The RabbitMQ Transport Adaptor (hereinafter the RMQ TA) exchanges FIX messages with other applications using RabbitMQ as middleware. This document describes the steps required to install and configure the RMQ TA for FIXEdge.
The RMQ TA can have one or more producers and consumers.
A Producer transfers messages from FIXEdge to a queue on the configured RabbitMQ server. The Producer connects to the queue immediately or at the configured time (see parameter "StartTime"), provided that the connection is available and the queue exists.
A Consumer listens for incoming messages from the MQ and publishes them to FIXEdge. If heartbeat processing is disabled (see parameter "HeartBeat.Enable"), the Consumer connects to the queue immediately or at the configured time (see parameter "StartTime"), provided that the connection is available and the queue exists. Otherwise, the Consumer connects to the queue when the first message (a heartbeat or any other message) is received.
The RMQ TA supports internal heartbeat processing, which makes it possible to detect inactive counterparties.
The format of the heartbeat message is configurable via the "HeartBeat.MessageFile" property. The content of the file is read as a byte sequence and compared with each incoming message as is.
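The byte-for-byte comparison described above can be illustrated with a minimal Python sketch. It is not the adaptor's implementation: the function name `is_heartbeat` and the sample payload are assumptions made purely for illustration.

```python
from pathlib import Path
import tempfile


def is_heartbeat(incoming: bytes, heartbeat_file: Path) -> bool:
    """Return True when the incoming payload equals the file content byte for byte."""
    # The file is read as a raw byte sequence; nothing is decoded or trimmed.
    return incoming == heartbeat_file.read_bytes()


# Hypothetical heartbeat payload written to a temporary file for the demonstration.
with tempfile.TemporaryDirectory() as tmp:
    hb_file = Path(tmp) / "rabbitMQHB.msg"
    hb_file.write_bytes(b"HEARTBEAT")
    exact_match = is_heartbeat(b"HEARTBEAT", hb_file)
    trailing_newline = is_heartbeat(b"HEARTBEAT\n", hb_file)
```

Under this reading of "as is", even a trailing newline added to the file by a text editor would stop heartbeats from being recognized, so it may be worth checking the file content at the byte level.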
For a Producer, the reconnection procedure starts (see parameter "Reconnect") if the connection is unavailable or the queue does not exist. The Producer attempts to reconnect the configured number of times at the configured intervals (see parameters "ReconnectTries" and "ReconnectInterval").
For a Consumer, the reconnection procedure starts (see parameter "Reconnect") if the connection becomes unavailable or no message is received within the configured time interval. The Consumer attempts to reconnect the configured number of times at the configured intervals (see parameters "ReconnectTries" and "ReconnectInterval").
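The interplay of "ReconnectTries" and "ReconnectInterval" can be sketched as a bounded retry loop. This is a simplified model in Python, not the adaptor's code: the `reconnect` function, its arguments, and the choice to wait only between attempts are assumptions. The value -1 is treated as an unlimited number of attempts, as stated in the parameter table.

```python
import time
from typing import Callable


def reconnect(connect: Callable[[], bool], tries: int, interval_ms: int,
              sleep: Callable[[float], None] = time.sleep) -> bool:
    """Call connect() until it succeeds or the attempt budget is used up.

    tries == -1 means an unlimited number of attempts.
    """
    attempt = 0
    while tries == -1 or attempt < tries:
        if attempt > 0:
            # Wait the fixed interval between consecutive attempts.
            sleep(interval_ms / 1000.0)
        attempt += 1
        if connect():
            return True
    return False


# Demonstration with a fake connect() that succeeds on the third call.
outcomes = iter([False, False, True])
waits = []  # records the sleep durations instead of actually waiting
restored = reconnect(lambda: next(outcomes), tries=3, interval_ms=2000, sleep=waits.append)

# A broker that never comes back exhausts the budget.
never_restored = reconnect(lambda: False, tries=3, interval_ms=2000, sleep=lambda _: None)
```

Injecting the `sleep` function keeps the sketch testable without real delays.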
Clients can be grouped via the "ClientGroups" property. Grouped clients are available for work only if all clients in the group are connected to their queues. In other words, if one client is disconnected, all members of the group are also put in the Disconnected state.
Typically this is used for Producer-Consumer pairs, where it makes no sense to leave one client in the Connected state when the other one is disconnected.
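The group rule can be modelled in a few lines of Python. This is only an illustration of the "all or nothing" behaviour described above. The function `effective_states` and the names "TradeFlowGroup" and "AuditProducer" are hypothetical.

```python
from typing import Dict, List


def effective_states(connected: Dict[str, bool],
                     groups: Dict[str, List[str]]) -> Dict[str, str]:
    """Map each client to the state it would report once grouping is applied."""
    # Start from each client's own connection status.
    available = dict(connected)
    for members in groups.values():
        # A group is usable only when every member is connected to its queue.
        group_ok = all(connected.get(name, False) for name in members)
        for name in members:
            available[name] = available.get(name, False) and group_ok
    return {name: "Connected" if ok else "Disconnected"
            for name, ok in available.items()}


# "TradeFlowGroup" and "AuditProducer" are hypothetical names for the demonstration.
states = effective_states(
    connected={
        "TradeFlowMQProducer": True,
        "TradeFlowMQConsumer": False,
        "AuditProducer": True,
    },
    groups={"TradeFlowGroup": ["TradeFlowMQProducer", "TradeFlowMQConsumer"]},
)
```

Here the Producer is reported as Disconnected even though its own connection is up, because the Consumer in the same group is down. The ungrouped client is unaffected.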
The TA distribution package contains the following folders:
To set up the RMQ TA for FIXEdge, do the following:
In the 'Transport Layer Section' of FIXEdge.properties, add the RMQ TA to the list of supported adapters:
TransportLayer.TransportAdapters = TransportLayer.RMQTA
Add the RMQ TA section to the FIXEdge.properties file, which is located in the conf directory of FIXEdge.
A sample set of properties is given below:
TransportLayer.JVMOptionsFile = ../FixEdge1/conf/JVM_Options.jvmopts
TransportLayer.RMQTA.DllName = libUniversalTADll-MD-x64.so
TransportLayer.RMQTA.Description = RMQ Transport Adaptor
TransportLayer.RMQTA.JavaClass = com.epam.fe.jms.jni.JMSAdaptor
TransportLayer.RMQTA.AllowRejectMessages = true
TransportLayer.RMQTA.ConnectionNames = Connection1
TransportLayer.RMQTA.ClientNames = TradeFlowMQProducer, TradeFlowMQConsumer

TransportLayer.RMQTA.Connection.Connection1.ProviderURI = amqp://127.0.0.1:5672
TransportLayer.RMQTA.Connection.Connection1.User = sam1203
TransportLayer.RMQTA.Connection.Connection1.Password = test
TransportLayer.RMQTA.Connection.Connection1.Reconnect = true
TransportLayer.RMQTA.Connection.Connection1.ReconnectTries = 300
TransportLayer.RMQTA.Connection.Connection1.ReconnectInterval = 30000

TransportLayer.RMQTA.Client.TradeFlowMQProducer.ConnectionName = Connection1
TransportLayer.RMQTA.Client.TradeFlowMQProducer.StorageDir = ../FixEdge1/log
TransportLayer.RMQTA.Client.TradeFlowMQProducer.SessionType = Producer
TransportLayer.RMQTA.Client.TradeFlowMQProducer.DestinationURI = TradeFlow
TransportLayer.RMQTA.Client.TradeFlowMQProducer.ExchangeName = exchange.OUT
TransportLayer.RMQTA.Client.TradeFlowMQProducer.HeartBeat.Enable = true
TransportLayer.RMQTA.Client.TradeFlowMQProducer.HeartBeat.Interval = 20
TransportLayer.RMQTA.Client.TradeFlowMQProducer.HeartBeat.MessageFile = ../FixEdge1/conf/rabbitMQHB.msg
TransportLayer.RMQTA.Client.TradeFlowMQProducer.MessageProcessingOnMQDisconnect = reject
TransportLayer.RMQTA.Client.TradeFlowMQProducer.StartTime = 09:00
TransportLayer.RMQTA.Client.TradeFlowMQProducer.EndTime = 18:00

TransportLayer.RMQTA.Client.TradeFlowMQConsumer.ConnectionName = Connection1
TransportLayer.RMQTA.Client.TradeFlowMQConsumer.StorageDir = ../FixEdge1/log
TransportLayer.RMQTA.Client.TradeFlowMQConsumer.SessionType = Consumer
TransportLayer.RMQTA.Client.TradeFlowMQConsumer.DestinationURI = TradeFlow
TransportLayer.RMQTA.Client.TradeFlowMQConsumer.HeartBeat.Enable = true
TransportLayer.RMQTA.Client.TradeFlowMQConsumer.HeartBeat.Interval = 20
TransportLayer.RMQTA.Client.TradeFlowMQConsumer.HeartBeat.MessageFile = ../FixEdge1/conf/rabbitMQHB.msg
TransportLayer.RMQTA.Client.TradeFlowMQConsumer.HeartBeat.MissedCountBeforeDisconnect = 3
RMQ clients can be referred to on the Business Layer (BL) by the names specified in the FIXEdge.properties file. Below is an example of BL_Config.xml for the two RMQ clients specified above.
<?xml version="1.0" encoding="UTF-8" ?>
<!--
  FIXEdge - the XML Configuration file
  $Revision: 1.17.2.7 $
  <!DOCTYPE FIXEdge SYSTEM "BusinessLayer.dtd">
-->
<FIXEdge>
  <BusinessLayer>
    <Rule>
      <Source>
        <FixSession SenderCompID="FIXCLIENT" TargetCompID="FIXEDGE"/>
      </Source>
      <Condition>
        <MatchField Field="35" Value="D|G|F|n" />
      </Condition>
      <Action>
        <Send Name="TradeFlowMQProducer" />
      </Action>
    </Rule>
    <Rule>
      <Source Name="TradeFlowMQConsumer"/>
      <Condition>
        <MatchField Field="35" Value="8|9|j|n" />
      </Condition>
      <Action>
        <Send>
          <FixSession SenderCompID="FIXEDGE" TargetCompID="FIXCLIENT"/>
        </Send>
      </Action>
    </Rule>
    <DefaultRule>
      <Action>
        <DoNothing/>
      </Action>
    </DefaultRule>
  </BusinessLayer>
</FIXEdge>
The configuration of the RMQ TA contains the list of registered RMQ client names and other parameters that take part in message routing.
The following parameters can be present in the RMQ TA configuration:
Property Name | Description | Required | Default Value | |
---|---|---|---|---|
TransportLayer.RMQTA.Description | TA Description | Y | RMQ Transport Adaptor | |
TransportLayer.RMQTA.DllName | TA library file name. libUniversalTADll-MD-x64.so should be used. | Y | ||
TransportLayer.RMQTA.JVMOptionsFile | Path to the TA configuration file (JVM options file) | Y | ||
TransportLayer.RMQTA.JavaClass | Java class to be used. com.epam.fe.jms.jni.JMSAdaptor should be used. | Y | ||
TransportLayer.RMQTA.AllowRejectMessages | When true, FE rejects messages that it is unable to send to the MQ or whose sending raised an error. | N | false | |
TransportLayer.RMQTA.AllowRepeatedStatusNotification | When true, FE allows the onLogout() callback to be called even if no onLogon() callback was called. | N | false | |
TransportLayer.RMQTA.ConnectionNames | Comma delimited list of TA connections. | Y | ||
TransportLayer.RMQTA.FIXVersion | In every session, FIX messages use a particular version of the FIX protocol. Use this parameter to set the version of the FIX protocol for a given session. Acceptable values: FIX40, FIX41, FIX42, FIX43, FIX44, FIX50, FIX50SP1, FIX50SP2, FIXLatest, custom dictionaries. | N | FIX44 | |
TransportLayer.RMQTA.ClientNames | Comma delimited list of TA clients. | Y | ||
TransportLayer.RMQTA.ClientGroups | Comma-delimited list of client groups. | N | ||
TransportLayer.RMQTA.[GroupName].Clients | Comma-delimited list of TA clients that belong to the group. | N | ||
Connection Parameters | ||||
TransportLayer.RMQTA.Connection.[ConnectionName].ProviderURI | URI has the following format: amqp://<host>:<port>/<vhost> | N | ||
TransportLayer.RMQTA.Connection.[ConnectionName].Addresses | List of addresses. Needed for clustered solutions to switch between addresses in case of disconnects. Note that reconnect attempts and intervals are not configurable here: when the first connection goes down, the adapter switches to the next one immediately. Either ProviderURI or Addresses must be specified. | N | ||
TransportLayer.RMQTA.Connection.[ConnectionName].User | User name | Y | ||
TransportLayer.RMQTA.Connection.[ConnectionName].Password | User Password | Y | ||
TransportLayer.RMQTA.Connection.[ConnectionName].Reconnect | Enables or disables the reconnection procedure used to restore the connection | N | false | |
TransportLayer.RMQTA.Connection.[ConnectionName].ReconnectInterval | Fixed interval in milliseconds between reconnection attempts. | N | 2000 | |
TransportLayer.RMQTA.Connection.[ConnectionName].ReconnectTries | Number of reconnect tries or -1 for an infinite number of attempts | N | 3 | |
Client Parameters | ||||
TransportLayer.RMQTA.Client.[ClientName].ConnectionName | Name of the primary connection used by the client. The connection must be listed in ConnectionNames and have all required parameters set | Y | ||
TransportLayer.RMQTA.Client.[ClientName].SessionType | Session type: Producer or Consumer. | Y | ||
TransportLayer.RMQTA.Client.[ClientName].DestinationURI | URI of the session destination (queue name). Required for the Consumer session type. For the Producer session type, at least one of DestinationURI and ExchangeName must be specified; both can be set. | N | ||
TransportLayer.RMQTA.Client.[ClientName].ExchangeName | Name of the exchange. See the RabbitMQ tutorial for details: https://www.rabbitmq.com/tutorials/amqp-concepts.html. Applies only to the Producer session type. Works with the "topic" and "direct" exchange types: on RabbitMQ, queues can be bound to an exchange, in which case the DestinationURI parameter is used as the routing key. | N | ||
TransportLayer.RMQTA.Client.[ClientName].StorageDir | Directory where the persistence file is stored in case of communication problems | Y | ||
TransportLayer.RMQTA.Client.[ClientName].HeartBeat.Enable | If true, the TA sends and expects HeartBeat messages at the configured time interval | N | false | |
TransportLayer.RMQTA.Client.[ClientName].HeartBeat.Interval | Time interval, in seconds, before a HeartBeat message is sent or an incoming one is expected | N | 30 | |
TransportLayer.RMQTA.Client.[ClientName].HeartBeat.MessageFile | Path to the binary file that contains the HeartBeat message | Y | ||
TransportLayer.RMQTA.Client.[ClientName].HeartBeat.MissedCountBeforeDisconnect | Number of missed HeartBeat messages before disconnect | N | 3 | |
TransportLayer.RMQTA.Client.[ClientName].MessageProcessingOnMQDisconnect | Action that the TA takes when it is unable to deliver outgoing messages to the queue. | N | reject | |
TransportLayer.RMQTA.Client.[ClientName].ChannelTransacted | Turns on transactions to ensure that messages are stored on disk. Values: true or false. NOTE: This property affects performance. | N | false | |
TransportLayer.RMQTA.Client.[ClientName].StartTime | Client start time. | N | ||
TransportLayer.RMQTA.Client.[ClientName].EndTime | Client stop time. | N | ||
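
Some of the constraints in the table span several properties: a connection needs ProviderURI or Addresses, a Consumer needs DestinationURI, and a Producer needs DestinationURI or ExchangeName. The following Python sketch shows how such rules could be checked before FIXEdge is started. It is not part of FIXEdge. The `validate` function, its error messages, and the "at least one" reading of the ProviderURI/Addresses rule are assumptions.

```python
from typing import Dict, List

PREFIX = "TransportLayer.RMQTA."


def split_names(value: str) -> List[str]:
    """Split a comma-delimited list and drop surrounding whitespace."""
    return [name.strip() for name in value.split(",") if name.strip()]


def validate(props: Dict[str, str]) -> List[str]:
    """Return human-readable problems found in the cross-property rules."""
    errors = []
    for conn in split_names(props.get(PREFIX + "ConnectionNames", "")):
        base = f"{PREFIX}Connection.{conn}."
        # Assumption: at least one of the two address properties must be set.
        if not (props.get(base + "ProviderURI") or props.get(base + "Addresses")):
            errors.append(f"{conn}: set ProviderURI or Addresses")
    for client in split_names(props.get(PREFIX + "ClientNames", "")):
        base = f"{PREFIX}Client.{client}."
        session_type = props.get(base + "SessionType", "")
        has_destination = bool(props.get(base + "DestinationURI"))
        has_exchange = bool(props.get(base + "ExchangeName"))
        if session_type == "Consumer" and not has_destination:
            errors.append(f"{client}: Consumer requires DestinationURI")
        if session_type == "Producer" and not (has_destination or has_exchange):
            errors.append(f"{client}: Producer requires DestinationURI or ExchangeName")
    return errors


# A reduced property set: the Consumer deliberately lacks DestinationURI.
sample = {
    PREFIX + "ConnectionNames": "Connection1",
    PREFIX + "ClientNames": "TradeFlowMQProducer, TradeFlowMQConsumer",
    PREFIX + "Connection.Connection1.ProviderURI": "amqp://127.0.0.1:5672",
    PREFIX + "Client.TradeFlowMQProducer.SessionType": "Producer",
    PREFIX + "Client.TradeFlowMQProducer.ExchangeName": "exchange.OUT",
    PREFIX + "Client.TradeFlowMQConsumer.SessionType": "Consumer",
}
problems = validate(sample)
```

The properties are passed in as a plain dictionary so the sketch does not depend on any particular parser for the FIXEdge.properties file.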