RMQ Transport Adaptor Installation Guide

Overview

The RabbitMQ Transport Adaptor (hereinafter RMQ TA) exchanges FIX messages with other applications using RabbitMQ as middleware. This document describes the steps required to install and configure RMQ TA for FIXEdge.

RMQ TA Functionality

RMQ TA can have one or more producers and consumers.

A producer transfers messages from FIXEdge to a queue on the configured RabbitMQ server. The producer connects to the queue immediately, or at the configured time (see the "StartTime" parameter), provided that the connection is available and the queue exists.

A consumer listens for incoming messages from the MQ and publishes them to FIXEdge. If heartbeat processing is disabled (see the "HeartBeat.Enable" parameter), the consumer connects to the queue immediately, or at the configured time (see the "StartTime" parameter), when the connection is available and the queue exists. Otherwise, the consumer connects to the queue when the first message (a heartbeat or any other message) is received.

Heartbeat Processing

RMQ TA supports internal heartbeat processing, which makes it possible to detect whether counterparties are inactive.

The format of the heartbeat message is configurable via the "HeartBeat.MessageFile" property. The content of the file is read as a byte sequence and compared with each incoming message as is.
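The byte-for-byte comparison described above can be illustrated with a short Python sketch. This is not the adaptor's code: the function names and the sample payload are hypothetical, and the sketch assumes only what the text states, namely that the file content is compared with the incoming message as is.

```python
from pathlib import Path


def load_heartbeat_template(path: str) -> bytes:
    """Read the heartbeat file as raw bytes, with no decoding or trimming."""
    return Path(path).read_bytes()


def is_heartbeat(message: bytes, template: bytes) -> bool:
    """Return True only when the incoming message equals the template exactly."""
    return message == template


# Hypothetical payload; the real content is whatever HeartBeat.MessageFile holds.
template = b"HEARTBEAT"
print(is_heartbeat(b"HEARTBEAT", template))    # True: exact match
print(is_heartbeat(b"HEARTBEAT\n", template))  # False: trailing newline differs
```

If the comparison is indeed exact, a trailing newline added to the heartbeat file by a text editor becomes part of the expected message, so the file is best created with a tool that writes exactly the intended bytes.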

Reconnection Procedure

Producer

If enabled (see the "Reconnect" parameter), the reconnection procedure starts when the connection is unavailable or the queue does not exist. The producer makes the configured number of reconnection attempts at the configured interval (see the "ReconnectTries" and "ReconnectInterval" parameters).

Consumer

If enabled (see the "Reconnect" parameter), the reconnection procedure starts when the connection becomes unavailable or no message is received within the configured time interval. The consumer makes the configured number of reconnection attempts at the configured interval (see the "ReconnectTries" and "ReconnectInterval" parameters).
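The retry behaviour shared by producers and consumers can be sketched in Python as follows. This is an illustration, not the adaptor's implementation: the reconnect function and its connect and sleep arguments are hypothetical, and the sketch assumes that ReconnectTries counts attempts (with -1 meaning unlimited) and that ReconnectInterval is a fixed pause in milliseconds between attempts.

```python
import time
from typing import Callable


def reconnect(connect: Callable[[], bool], tries: int, interval_ms: int,
              sleep: Callable[[float], None] = time.sleep) -> bool:
    """Call connect() until it succeeds or the tries are used up.

    tries mirrors ReconnectTries (-1 means retry forever) and interval_ms
    mirrors ReconnectInterval (milliseconds between attempts).
    """
    attempt = 0
    while tries == -1 or attempt < tries:
        if attempt > 0:
            # Pause only between attempts, not before the first one.
            sleep(interval_ms / 1000.0)
        attempt += 1
        if connect():
            return True
    return False
```

Passing the sleep function as an argument keeps the sketch easy to exercise without real delays; with the sample values shown later in this guide (300 tries, 30000 ms) a full cycle would otherwise take about two and a half hours.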

Client Groups

Clients can be grouped via the "ClientGroups" property. Grouped clients are available for work only if all clients in the group are connected to their queues. In other words, if one client is disconnected, all members of the group are also put in the Disconnected state.

Typically this is used for Producer-Consumer pairs, where it makes no sense to leave one client in the Connected state when the other is disconnected.
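The all-or-nothing rule for groups can be expressed as a small Python sketch. The function names and the group name "TradeFlow" are hypothetical; the sketch only models the rule stated above and says nothing about how the adaptor tracks connection state internally.

```python
from typing import Dict, List


def group_available(group_clients: List[str], connected: Dict[str, bool]) -> bool:
    """A group is available only if every member is connected to its queue."""
    return all(connected.get(name, False) for name in group_clients)


def effective_states(groups: Dict[str, List[str]],
                     connected: Dict[str, bool]) -> Dict[str, str]:
    """Derive the state each client reports once group membership is applied."""
    states = {name: "Connected" if up else "Disconnected"
              for name, up in connected.items()}
    for members in groups.values():
        if not group_available(members, connected):
            # One disconnected member puts the whole group in Disconnected state.
            for name in members:
                states[name] = "Disconnected"
    return states


groups = {"TradeFlow": ["TradeFlowMQProducer", "TradeFlowMQConsumer"]}
connected = {"TradeFlowMQProducer": True, "TradeFlowMQConsumer": False}
print(effective_states(groups, connected))
```

In this example the producer's own connection is up, but it is still reported as Disconnected because its paired consumer is down.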

RMQ TA Configuration

The TA distribution package contains the following folders:

  • etc – contains the configuration files;
  • lib – contains the libraries required by RMQ TA.

To set up RMQ TA for FIXEdge:

  1. Copy the TA distribution package to the FIXEdge folder
    1. Copy the lib folder from the distribution package to the conf directory, e.g. to FIXGW/FIXEdge1/conf/amqp-ta-distribution.
    2. Copy the log4j.properties and JVM_Options.jvmopts configuration files from the etc folder to the FIXEdge configuration directory, e.g. FIXGW/FIXEdge1/conf.
    3. Check and correct the paths to the libraries and to log4j.properties in the JVM_Options.jvmopts configuration file according to your workspace.
    4. On Windows, check the path to the jvm folder in the PATH environment variable. On Unix, check the path in FixEdge1.run.sh, which is located in the bin directory of FIXEdge.
  2. Configure TA
    1. In the ‘Transport Layer Section’ of FIXEdge.properties, add RMQ TA to the list of supported adapters:

      TransportLayer.TransportAdapters = TransportLayer.RMQTA
    2. Add the RMQ TA section to FIXEdge.properties, which is located in the conf directory of FIXEdge.
      A sample set of properties is given below:

      TransportLayer.JVMOptionsFile = ../FixEdge1/conf/JVM_Options.jvmopts
      
      TransportLayer.RMQTA.DllName = libUniversalTADll-MD-x64.so
      TransportLayer.RMQTA.Description = RMQ Transport Adaptor
      TransportLayer.RMQTA.JavaClass = com.epam.fe.jms.jni.JMSAdaptor
      TransportLayer.RMQTA.AllowRejectMessages = true
      TransportLayer.RMQTA.ConnectionNames = Connection1
      TransportLayer.RMQTA.ClientNames = TradeFlowMQProducer, TradeFlowMQConsumer
       
      TransportLayer.RMQTA.Connection.Connection1.ProviderURI = amqp://127.0.0.1:5672
      TransportLayer.RMQTA.Connection.Connection1.User = sam1203
      TransportLayer.RMQTA.Connection.Connection1.Password = test
      TransportLayer.RMQTA.Connection.Connection1.Reconnect = true
      TransportLayer.RMQTA.Connection.Connection1.ReconnectTries = 300
      TransportLayer.RMQTA.Connection.Connection1.ReconnectInterval = 30000
       
      TransportLayer.RMQTA.Client.TradeFlowMQProducer.ConnectionName = Connection1
      TransportLayer.RMQTA.Client.TradeFlowMQProducer.StorageDir = ../FixEdge1/log
      TransportLayer.RMQTA.Client.TradeFlowMQProducer.SessionType = Producer
      TransportLayer.RMQTA.Client.TradeFlowMQProducer.DestinationURI = TradeFlow
      TransportLayer.RMQTA.Client.TradeFlowMQProducer.ExchangeName = exchange.OUT
      TransportLayer.RMQTA.Client.TradeFlowMQProducer.HeartBeat.Enable = true
      TransportLayer.RMQTA.Client.TradeFlowMQProducer.HeartBeat.Interval = 20
      TransportLayer.RMQTA.Client.TradeFlowMQProducer.HeartBeat.MessageFile = ../FixEdge1/conf/rabbitMQHB.msg
      TransportLayer.RMQTA.Client.TradeFlowMQProducer.MessageProcessingOnMQDisconnect = reject
      TransportLayer.RMQTA.Client.TradeFlowMQProducer.StartTime = 09:00
      TransportLayer.RMQTA.Client.TradeFlowMQProducer.EndTime = 18:00
       
      TransportLayer.RMQTA.Client.TradeFlowMQConsumer.ConnectionName = Connection1
      TransportLayer.RMQTA.Client.TradeFlowMQConsumer.StorageDir = ../FixEdge1/log
      TransportLayer.RMQTA.Client.TradeFlowMQConsumer.SessionType = Consumer
      TransportLayer.RMQTA.Client.TradeFlowMQConsumer.DestinationURI = TradeFlow
      TransportLayer.RMQTA.Client.TradeFlowMQConsumer.HeartBeat.Enable = true
      TransportLayer.RMQTA.Client.TradeFlowMQConsumer.HeartBeat.Interval = 20
      TransportLayer.RMQTA.Client.TradeFlowMQConsumer.HeartBeat.MessageFile = ../FixEdge1/conf/rabbitMQHB.msg
      TransportLayer.RMQTA.Client.TradeFlowMQConsumer.HeartBeat.MissedCountBeforeDisconnect = 3
    3. Make sure the RabbitMQ user, password, and queue you use are configured on the server side.
    4. Change the TA logging settings in log4j.properties if necessary. Paths to the log directory must be absolute.
  3. Restart FIXEdge to apply the changes.
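Before restarting, it can help to sanity-check the RMQ TA section for the most common omissions. The Python sketch below is a hypothetical helper, not part of the distribution. It assumes a simplified "key = value" properties syntax (no line continuations or ":" separators), exact spelling of the SessionType values, and the requirements listed in the parameter table later in this guide: each client references a listed connection, a Consumer needs DestinationURI, and a Producer needs DestinationURI or ExchangeName.

```python
from typing import Dict, List

PREFIX = "TransportLayer.RMQTA."


def parse_properties(text: str) -> Dict[str, str]:
    """Parse 'key = value' lines, skipping blanks and '#' comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def split_list(value: str) -> List[str]:
    """Split a comma-delimited property value into trimmed items."""
    return [item.strip() for item in value.split(",") if item.strip()]


def check_clients(props: Dict[str, str]) -> List[str]:
    """Return a list of human-readable problems; empty means none found."""
    problems = []
    connections = split_list(props.get(PREFIX + "ConnectionNames", ""))
    for client in split_list(props.get(PREFIX + "ClientNames", "")):
        base = f"{PREFIX}Client.{client}."
        connection = props.get(base + "ConnectionName")
        if connection not in connections:
            problems.append(f"{client}: ConnectionName {connection!r} is not in ConnectionNames")
        session = props.get(base + "SessionType")
        has_queue = bool(props.get(base + "DestinationURI"))
        has_exchange = bool(props.get(base + "ExchangeName"))
        if session == "Consumer" and not has_queue:
            problems.append(f"{client}: Consumer requires DestinationURI")
        elif session == "Producer" and not (has_queue or has_exchange):
            problems.append(f"{client}: Producer requires DestinationURI or ExchangeName")
        elif session not in ("Producer", "Consumer"):
            problems.append(f"{client}: SessionType must be Producer or Consumer")
    return problems
```

A typical use would be to read FIXEdge.properties into a string, call check_clients(parse_properties(text)), and print any returned problems before restarting.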

Routing Messages to/from RMQ TA Clients

RMQ clients can be referenced in the Business Layer (BL) by the names specified in the FIXEdge.properties file. Below is an example of BL_Config.xml for the two RMQ clients configured above.

BL_Config.xml
<?xml version="1.0" encoding="UTF-8" ?>
<!--
		FIXEdge - the XML Configuration file
		$Revision: 1.17.2.7 $
		<!DOCTYPE FIXEdge SYSTEM "BusinessLayer.dtd">
-->
<FIXEdge>
		<BusinessLayer>
			<Rule>
				<Source>
						<FixSession SenderCompID="FIXCLIENT" TargetCompID="FIXEDGE"/>
				</Source>
				<Condition>
						<MatchField Field="35" Value="D|G|F|n" />
				</Condition>
				<Action>
						<Send Name="TradeFlowMQProducer" />
				</Action>
			</Rule>
			<Rule>
				<Source Name="TradeFlowMQConsumer"/>
				<Condition>
						<MatchField Field="35" Value="8|9|j|n" />
				</Condition>
				<Action>
						<Send>
							<FixSession SenderCompID="FIXEDGE" TargetCompID="FIXCLIENT"/>
						</Send>
				</Action>
			</Rule>
			<DefaultRule>
				<Action>
						<DoNothing/>
				</Action>
			</DefaultRule>
		</BusinessLayer>
</FIXEdge>
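The effect of the two rules and the default rule above can be summarised with a short Python sketch. It is only a reading aid: the route function and the source labels are hypothetical, and the sketch assumes that each "|"-separated MatchField value lists alternative MsgType (tag 35) values that must match exactly.

```python
from typing import Optional

# Message types taken from the MatchField values in the two rules above.
TO_RABBITMQ = {"D", "G", "F", "n"}
FROM_RABBITMQ = {"8", "9", "j", "n"}


def route(source: str, msg_type: str) -> Optional[str]:
    """Return the destination for a message, or None when DefaultRule applies."""
    # Rule 1: messages from the FIX session go to the RabbitMQ producer client.
    if source == "FIXCLIENT->FIXEDGE" and msg_type in TO_RABBITMQ:
        return "TradeFlowMQProducer"
    # Rule 2: messages from the RabbitMQ consumer client go back to the FIX session.
    if source == "TradeFlowMQConsumer" and msg_type in FROM_RABBITMQ:
        return "FIXEDGE->FIXCLIENT"
    # DefaultRule: DoNothing.
    return None


print(route("FIXCLIENT->FIXEDGE", "D"))   # TradeFlowMQProducer
print(route("TradeFlowMQConsumer", "8"))  # FIXEDGE->FIXCLIENT
print(route("TradeFlowMQConsumer", "D"))  # None
```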

RMQ TA Configuration Parameters

The RMQ TA configuration contains the list of registered RMQ client names and other parameters that take part in message routing.

The RMQ TA configuration can include the following parameters:

| Property Name | Description | Required | Default Value |
|---|---|---|---|
| TransportLayer.RMQTA.Description | TA description. | Y | RMQ Transport Adaptor |
| TransportLayer.RMQTA.DllName | TA library file name. libUniversalTADll-MD-x64.so should be used. | Y | |
| TransportLayer.RMQTA.JVMOptionsFile | Path to the TA configuration file. | Y | |
| TransportLayer.RMQTA.JavaClass | Java class to be used. com.epam.fe.jms.jni.JMSAdaptor should be used. | Y | |
| TransportLayer.RMQTA.AllowRejectMessages | When true, FE rejects messages if it is unable to send them to the MQ or an error occurred. | N | false |
| TransportLayer.RMQTA.AllowRepeatedStatusNotification | When true, FE allows the onLogout() callback to be called even if no onLogon() callback was called. | N | false |
| TransportLayer.RMQTA.ConnectionNames | Comma-delimited list of TA connections. A separate configuration section must be specified for each listed connection. | Y | |
| TransportLayer.RMQTA.FIXVersion | In every session, FIX messages use a particular version of the FIX protocol. Use this parameter to set the FIX protocol version for a given session. Acceptable values: FIX40, FIX41, FIX42, FIX43, FIX44, FIX50, FIX50SP1, FIX50SP2. | N | FIX44 |
| TransportLayer.RMQTA.ClientNames | Comma-delimited list of TA clients. A separate configuration section must be specified for each listed client. | Y | |
| TransportLayer.RMQTA.ClientGroups | Comma-delimited list of client groups. | N | |
| TransportLayer.RMQTA.[GroupName].Clients | Comma-delimited list of TA clients that belong to the group. | N | |
| Connection Parameters | | | |
| TransportLayer.RMQTA.Connection.[ConnectionName].ProviderURI | URI in the following format: amqp://<host>:<port>/<vhost>. Default values: port 5672; vhost "/". For example: amqp://localhost, amqp://localhost:8734/vhost. | N | |
| TransportLayer.RMQTA.Connection.[ConnectionName].Addresses | List of addresses. Needed for clustered solutions to switch between addresses in case of disconnects. Note: reconnect attempts and intervals are not configurable for this switch; when the first connection goes down, the adapter switches to the next one immediately. Example (RMQ.properties, addresses listed without the amqp:// prefix): TransportLayer.RMQTA.Connection.rabbit.Addresses = mqprod1:5672, mqprod2:5672. Either ProviderURI or Addresses must be specified. | N | |
| TransportLayer.RMQTA.Connection.[ConnectionName].User | User name. | Y | |
| TransportLayer.RMQTA.Connection.[ConnectionName].Password | User password. | Y | |
| TransportLayer.RMQTA.Connection.[ConnectionName].Reconnect | Enables or disables the reconnect procedure used to restore the connection. | N | false |
| TransportLayer.RMQTA.Connection.[ConnectionName].ReconnectInterval | Fixed interval in milliseconds between reconnection attempts. | N | 2000 |
| TransportLayer.RMQTA.Connection.[ConnectionName].ReconnectTries | Number of reconnect tries, or -1 for an infinite number of attempts. | N | 3 |
| Client Parameters | | | |
| TransportLayer.RMQTA.Client.[ClientName].ConnectionName | Name of the primary connection used by the client. The connection must be registered in ConnectionNames and have all required parameters. | Y | |
| TransportLayer.RMQTA.Client.[ClientName].SessionType | Session type: Producer (the session is a message producer) or Consumer (the session is a message consumer). | Y | |
| TransportLayer.RMQTA.Client.[ClientName].DestinationURI | URI of the session destination (queue name). Required for the Consumer session type. For the Producer session type, DestinationURI, ExchangeName, or both must be specified. | N | |
| TransportLayer.RMQTA.Client.[ClientName].ExchangeName | Name of the exchange. See the RabbitMQ tutorial for details: https://www.rabbitmq.com/tutorials/amqp-concepts.html. Only for the Producer session type. Works with "topic" and "direct" exchange types: on RabbitMQ, queues can be bound to an exchange, in which case the DestinationURI parameter is used as the routing key. | N | |
| TransportLayer.RMQTA.Client.[ClientName].StorageDir | Directory where the persistence file is stored in case of communication problems. | Y | |
| TransportLayer.RMQTA.Client.[ClientName].HeartBeat.Enable | If true, TA sends and expects HeartBeat messages at the configured time interval. | N | false |
| TransportLayer.RMQTA.Client.[ClientName].HeartBeat.Interval | Time interval in seconds before sending or expecting an incoming HeartBeat message. | N | 30 |
| TransportLayer.RMQTA.Client.[ClientName].HeartBeat.MessageFile | Path to the binary file that contains the HeartBeat message. | Y | |
| TransportLayer.RMQTA.Client.[ClientName].HeartBeat.MissedCountBeforeDisconnect | Number of missed messages before disconnect. | N | 3 |
| TransportLayer.RMQTA.Client.[ClientName].MessageProcessingOnMQDisconnect | Action that TA performs when it is unable to deliver outgoing messages to the queue: Save (save to the storage file and resend after reconnect) or Reject (send a reject event to FIXEdge). | N | reject |
| TransportLayer.RMQTA.Client.[ClientName].ChannelTransacted | Turns on transactions to ensure that messages are stored on disk. Values: true or false. NOTE: This property affects performance. | N | false |
| TransportLayer.RMQTA.Client.[ClientName].StartTime | Client start time. Supported masks: HH:mm:ss, HH:mm. | N | |
| TransportLayer.RMQTA.Client.[ClientName].EndTime | Client stop time. Supported masks: HH:mm:ss, HH:mm. | N | |
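Two of the value formats listed above, the ProviderURI layout and the StartTime/EndTime masks, can be illustrated with a short Python sketch. The function names are hypothetical and the code is not taken from the adaptor. It assumes the defaults given in the table (port 5672, vhost "/"), treats an empty vhost the same as an omitted one, and maps the HH:mm:ss and HH:mm masks to the equivalent Python format strings.

```python
from datetime import datetime, time
from typing import Tuple
from urllib.parse import urlsplit


def parse_provider_uri(uri: str) -> Tuple[str, int, str]:
    """Split amqp://<host>:<port>/<vhost> into (host, port, vhost)."""
    parts = urlsplit(uri)
    if parts.scheme != "amqp" or not parts.hostname:
        raise ValueError(f"not an amqp:// URI: {uri!r}")
    port = parts.port or 5672      # default port from the table
    vhost = parts.path[1:] or "/"  # default vhost from the table
    return parts.hostname, port, vhost


def parse_client_time(value: str) -> time:
    """Parse a StartTime/EndTime value given as HH:mm:ss or HH:mm."""
    for fmt in ("%H:%M:%S", "%H:%M"):
        try:
            return datetime.strptime(value.strip(), fmt).time()
        except ValueError:
            continue
    raise ValueError(f"unsupported time format: {value!r}")


print(parse_provider_uri("amqp://localhost"))             # ('localhost', 5672, '/')
print(parse_provider_uri("amqp://localhost:8734/vhost"))  # ('localhost', 8734, 'vhost')
print(parse_client_time("09:00"))                         # 09:00:00
```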