RMQ Transport Adaptor Installation Guide
Overview
The RabbitMQ Transport Adaptor (hereinafter the RMQ TA) is intended to communicate FIX messages to other applications using RabbitMQ as middleware. The document describes common steps required to install Transport for the FIX Edge.
RMQ TA Functionality
RMQTA is able to have one or more producers and consumers.
Producer is used to transfer messages from FIXEdge to the configured RabbitMQ server’s queue. Producer connects to the queue immediately or at configured time (see parameter "StartTime") if connection is available and the queue exists.
Consumer listens for the incoming messages from MQ and publishes these messages to the FIXEdge. If heartbeat processing is disabled (see parameter "HeartBeat.Enable"), the consumer connects to the queue immediately or at configured time (refer to parameter "StartTime" for details) when connection is available and the queue exists. Otherwise, consumer connects to the queue when first message (no matter heartbeat or any other message) is received.
Heartbeat Processing
RMQ TA supports internal heartbeat processing that allows to determine if counterparties are in the state of inactivity.
The format of Heartbeat message is configurable via "HeartBeat.MessageFile" property. The content of the file is read as byte sequence that is compared with incoming message as is.
Reconnection Procedure
Producer
Reconnection procedure starts (see parameter "Reconnect") if connection is unavailable or the queue does not exist. Producer attempts to reconnect configured number of tries in the configured time intervals (see parameters "ReconnectTries" and "ReconnectInterval" for details).
Consumer
Reconnection procedure starts (see parameter "Reconnect") if connection becomes unavailable or no message is received in the configured time interval. Consumer attempts to reconnect configured number of tries in the configured time intervals (see parameters "ReconnectTries" and "ReconnectInterval" for details).
Client Groups
Clients can be grouped via "ClientGroups" property. Grouped clients are available for work if all clients in this group are connected to their queues. In other words, if one client is disconnected – all clients-members of the group will be also in Disconnected state.
Typically it is used for pairs of Producer-Consumer when there is no sense to leave one of the clients in Connected state when another one is disconnected.
RMQ TA Configuration
TA distribution package contains following folders:
- etc – contains the configuration files;
- lib – contains the libraries for working RMQ TA.
In order to setup RMQ TA for FIX Edge you need:
- Copy TA distribution package to the FIX Edge folder
- Copy lib folder from distribution package to the conf directory, e.g. to the FIXGW/FIXEdge1/conf/amqp-ta-distribution.
- Copy log4j.properties and JVM_Options.jvmopts configuration files from the etc to the configuration directory of the FIX Edge, e.g. FIXGW/FIXEdge1/conf.
- Check and correct the paths in the JVM_Options.jvmopts configuration file to the libs and log4j.properties according your workspace.
- Check path to the jvm folder in path environment variable If your operating system is the windows. If your system is unix check the path in FixEdge1.run.sh which located in bin directory of the FIXEdge.
- Configure TA
In the ‘Transport Layer Section’ of the FIXEdge.properties add RMQ TA to the list of supported adapters:
TransportLayer.TransportAdapters = TransportLayer.RMQTA
Add RMQ TA section to the FIXEdge.properties which located in conf directory of the FIXEdge.
Sample set of properties is given below:TransportLayer.JVMOptionsFile = ../FixEdge1/conf/JVM_Options.jvmopts TransportLayer.RMQTA.DllName = libUniversalTADll-MD-x64.so TransportLayer.RMQTA.Description = RMQ Transport Adaptor TransportLayer.RMQTA.JavaClass = com.epam.fe.jms.jni.JMSAdaptor TransportLayer.RMQTA.AllowRejectMessages = true TransportLayer.RMQTA.ConnectionNames = Connection1 TransportLayer.RMQTA.ClientNames = TradeFlowMQProducer, TradeFlowMQConsumer TransportLayer.RMQTA.Connection.Connection1.ProviderURI = amqp://127.0.0.1:5672 TransportLayer.RMQTA.Connection.Connection1.User = sam1203 TransportLayer.RMQTA.Connection.Connection1.Password = test TransportLayer.RMQTA.Connection.Connection1.Reconnect = true TransportLayer.RMQTA.Connection.Connection1.ReconnectTries = 300 TransportLayer.RMQTA.Connection.Connection1.ReconnectInterval = 30000 TransportLayer.RMQTA.Client.TradeFlowMQProducer.ConnectionName = Connection1 TransportLayer.RMQTA.Client.TradeFlowMQProducer.StorageDir = ../FixEdge1/log TransportLayer.RMQTA.Client.TradeFlowMQProducer.SessionType = Producer TransportLayer.RMQTA.Client.TradeFlowMQProducer.DestinationURI = TradeFlow TransportLayer.RMQTA.Client.TradeFlowMQProducer.ExchangeName = exchange.OUT TransportLayer.RMQTA.Client.TradeFlowMQProducer.HeartBeat.Enable = true TransportLayer.RMQTA.Client.TradeFlowMQProducer.HeartBeat.Interval = 20 TransportLayer.RMQTA.Client.TradeFlowMQProducer.HeartBeat.MessageFile = ../FixEdge1/conf/rabbitMQHB.msg TransportLayer.RMQTA.Client.TradeFlowMQProducer.MessageProcessingOnMQDisconnect = reject TransportLayer.RMQTA.Client.TradeFlowMQProducer.StartTime= 09:00 TransportLayer.RMQTA.Client.TradeFlowMQProducer.EndTime= 18:00 TransportLayer.RMQTA.Client.TradeFlowMQConsumer.ConnectionName = Connection1 TransportLayer.RMQTA.Client.TradeFlowMQConsumer.StorageDir = ../FixEdge1/log TransportLayer.RMQTA.Client.TradeFlowMQConsumer.SessionType = Consumer TransportLayer.RMQTA.Client.TradeFlowMQConsumer.DestinationURI = TradeFlow TransportLayer.RMQTA.Client.TradeFlowMQConsumer.HeartBeat.Enable = true TransportLayer.RMQTA.Client.TradeFlowMQConsumer.HeartBeat.Interval = 20 TransportLayer.RMQTA.Client.TradeFlowMQConsumer.HeartBeat.MessageFile = ../FixEdge1/conf/rabbitMQHB.msg TransportLayer.RMQTA.Client.TradeFlowMQConsumer.HeartBeat.MissedCountBeforeDisconnect = 3
- Make sure the RabbitMQ’s user, password and queue you use are configured on the server side.
- Change TA logging settings in the log4j.properties if necessary. Paths to logs dir must be absolute.
- Restart FIX Edge to apply the changes.
Routing Messages to/from RMQ TA Clients
RMQ Clients can be referred on the business layer (BL) by their names specified in the FIXEdge.properties file. Below is an example of the BL_Config.xml for the two RMQ clients specified above.
<?xml version="1.0" encoding="UTF-8" ?> <!-- FIXEdge - the XML Configuration file $Revision: 1.17.2.7 $ <!DOCTYPE FIXEdge SYSTEM "BusinessLayer.dtd"> --> <FIXEdge> <BusinessLayer> <Rule> <Source> <FixSession SenderCompID="FIXCLIENT" TargetCompID="FIXEDGE"/> </Source> <Condition> <MatchField Field="35" Value="D|G|F|n" /> </Condition> <Action> <Send Name="TradeFlowMQProducer" /> </Action> </Rule> <Rule> <Source Name="TradeFlowMQConsumer"/> <Condition> <MatchField Field="35" Value="8|9|j|n" /> </Condition> <Action> <Send> <FixSession SenderCompID="FIXEDGE" TargetCompID="FIXCLIENT"/> </Send> </Action> </Rule> <DefaultRule> <Action> <DoNothing/> </Action> </DefaultRule> </BusinessLayer> </FIXEdge>
RMQ TA Configuration Parameters
Configuration of RMQ Adaptor contains list of registered RMQ client names and other parameters which takes part in message routing.
In configuration of RMQ TA can be present default parameters:
Property Name | Description | Required | Default Value |
---|---|---|---|
TransportLayer.RMQTA.Description | TA Description | Y | RMQ Transport Adaptor |
TransportLayer.RMQTA.DllName | TA library file name. libUniversalTADll-MD-x64.so should be used. | Y | |
TransportLayer.RMQTA.JVMOptionsFile | Path to the TA configuration file name | Y | |
TransportLayer.RMQTA.JavaClass | Java class to be used. com.epam.fe.jms.jni.JMSAdaptor should be used. | Y | |
TransportLayer.RMQTA.AllowRejectMessages | When option is true, FE rejects messages if unable to send to the MQ or error was fired. False by default. | N | false |
TransportLayer.RMQTA.AllowRepeatedStatusNotification | When option is true, FE allows to call onLogout() callback if no onLogon() callback was called. False by default. | N | false |
TransportLayer.RMQTA.ConnectionNames | Comma delimited list of TA connections. | Y | |
TransportLayer.RMQTA.FIXVersion | In every session, FIX messages use a particular version of the FIX protocol. Use this parameter to set the version of the FIX protocol for a given session. Acceptable values: FIX40, FIX41, FIX42, FIX43, FIX44, FIX50, FIX50SP1, FIX50SP2. | N | FIX44 |
TransportLayer.RMQTA.ClientNames | Comma delimited list of TA clients. | Y | |
TransportLayer.RMQTA.ClientGroups | Comma delimited list of clients group. | N | |
TransportLayer.RMQTA.[GroupName].Clients | Comma delimited list of TA clients that are grouped. | N | |
Connection Parameters | |||
TransportLayer.RMQTA.Connection.[ConnectionName].ProviderURI | URI has the following format: amqp://<host>:<port>/<vhost> | N | |
TransportLayer.RMQTA.Connection.[ConnectionName].Addresses | Addresses list. Necessary for clustered solutions to switch between addresses in case of disconnects. Note, reconnect attempts or intervals are not configurable - when first connection is down, adapter switches to the next connection immediately. Example: RMQ.properties #Addresses list without amqp:// TransportLayer.RMQTA.Connection.rabbit.Addresses = mqprod1:5672, mqprod2:5672 Should be specified one of the parameters - ProviderURI or Addresses. | N | |
TransportLayer.RMQTA.Connection.[ConnectionName].User | User name | Y | |
TransportLayer.RMQTA.Connection.[ConnectionName].Password | User Password | Y | |
TransportLayer.RMQTA.Connection.[ConnectionName].Reconnect | Enables or disables reconnect procedure for connection restore | N | false |
TransportLayer.RMQTA.Connection.[ConnectionName].ReconnectInterval | Fixed interval in milliseconds between reconnection attempts. | N | 2000 |
TransportLayer.RMQTA.Connection.[ConnectionName].ReconnectTries | Number of reconnect tries or -1 for an infinite number of attempts | N | 3 |
Client Parameters | |||
TransportLayer.RMQTA.Client.[ClientName].ConnectionName | Name of primary connection used by client. Connection should be registered in ConnectionNames enumeration and has all required parameters | Y | |
TransportLayer.RMQTA.Client.[ClientName].SessionType | Session type:
| Y | |
TransportLayer.RMQTA.Client.[ClientName].DestinationURI | URI of session destination (Queue name). Required for Consumer session type. For Producer session type should be specified one of the parameters - DestinationURI or ExchangeName - or both of them. | N | |
TransportLayer.RMQTA.Client.[ClientName].ExchangeName | Name of the Exchange. See RabbitMQ tutorial for details: https://www.rabbitmq.com/tutorials/amqp-concepts.html. Only for Producer session type. Works with "topic" and "direct" exchange types - on RabbitMQ queues can be bound to an exchange, in this case DestinationURI parameter will be used as routing key. | N | |
TransportLayer.RMQTA.Client.[ClientName].StorageDir | Directory where persistence file is stored in case the communication problem | Y | |
TransportLayer.RMQTA.Client.[ClientName].HeartBeat.Enable | If parameter is true then TA sends and expects HeartBeat messages in configured time interval | N | false |
TransportLayer.RMQTA.Client.[ClientName].HeartBeat.Interval | Time interval in seconds before sending or expecting incoming HeartBeat message | N | 30 |
TransportLayer.RMQTA.Client.[ClientName].HeartBeat.MessageFile | Path to binary file that contains HeartBeat message | Y | |
TransportLayer.RMQTA.Client.[ClientName].HeartBeat.MissedCountBeforeDisconnect | Number of missed messages before disconnect | N | 3 |
TransportLayer.RMQTA.Client.[ClientName].MessageProcessingOnMQDisconnect | Type of action that TA executes in case it's unable to deliver outgoing messages to the queue:
| N | reject |
TransportLayer.RMQTA.Client.[ClientName].ChannelTransacted | The property turns on transactions to ensure that the messages are stored on the disk. Values: true | false NOTE: This property affects performance. | N | false |
TransportLayer.RMQTA.Client.[ClientName].StartTime | Client start time.
| N | |
TransportLayer.RMQTA.Client.[ClientName].EndTime | Client stop time.
| N |