RMQ Transport Adaptor Installation Guide

Overview

The RabbitMQ Transport Adaptor (hereinafter the RMQ TA) is intended to exchange FIX messages with other applications using RabbitMQ as middleware. This document describes the steps required to install the RMQ TA for FIXEdge.

RMQ TA Functionality

The RMQ TA can have one or more producers and consumers.

A producer transfers messages from FIXEdge to a queue on the configured RabbitMQ server. The producer connects to the queue immediately or at the configured time (see the "StartTime" parameter), provided the connection is available and the queue exists.

A consumer listens for incoming messages from the MQ and publishes them to FIXEdge. If heartbeat processing is disabled (see the "HeartBeat.Enable" parameter), the consumer connects to the queue immediately or at the configured time (see the "StartTime" parameter), provided the connection is available and the queue exists. Otherwise, the consumer connects to the queue when the first message (a heartbeat or any other message) is received.

Heartbeat Processing

The RMQ TA supports internal heartbeat processing, which makes it possible to detect whether counterparties are inactive.

The format of the heartbeat message is configurable via the "HeartBeat.MessageFile" property. The content of the file is read as a byte sequence and compared with each incoming message as is.
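
The byte-wise comparison described above can be illustrated with a short Python sketch. It is not the adaptor's code: the function names load_heartbeat and is_heartbeat are hypothetical, and the sketch only shows what "compared as is" implies, namely that no trimming, decoding, or FIX parsing takes place.

```python
from pathlib import Path


def load_heartbeat(message_file: str) -> bytes:
    """Read the heartbeat template exactly as stored, without decoding."""
    return Path(message_file).read_bytes()


def is_heartbeat(incoming: bytes, heartbeat: bytes) -> bool:
    """Exact byte-for-byte match: no trimming, no case folding, no parsing."""
    return incoming == heartbeat
```

One practical consequence, under this reading, is that a trailing newline added to the message file by a text editor would make the stored bytes differ from a heartbeat sent without it.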

Reconnection Procedure

Producer

The reconnection procedure starts (see the "Reconnect" parameter) if the connection is unavailable or the queue does not exist. The producer attempts to reconnect the configured number of times at the configured interval (see the "ReconnectTries" and "ReconnectInterval" parameters).

Consumer

The reconnection procedure starts (see the "Reconnect" parameter) if the connection becomes unavailable or no message is received within the configured time interval. The consumer attempts to reconnect the configured number of times at the configured interval (see the "ReconnectTries" and "ReconnectInterval" parameters).
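
The retry behaviour described for both producers and consumers can be sketched in Python as follows. This is an illustration under assumptions, not the adaptor's implementation: the reconnect function and its connect callback are hypothetical, and the sketch assumes the wait happens between failed attempts. The tries and interval_ms arguments mirror "ReconnectTries" (where -1 means unlimited) and "ReconnectInterval" (milliseconds).

```python
import time
from typing import Callable


def reconnect(connect: Callable[[], bool], tries: int, interval_ms: int,
              sleep: Callable[[float], None] = time.sleep) -> bool:
    """Call connect() until it succeeds or the attempts are exhausted.

    tries       -- number of attempts; -1 means retry without limit
    interval_ms -- pause between failed attempts, in milliseconds
    """
    attempt = 0
    while tries == -1 or attempt < tries:
        attempt += 1
        if connect():
            return True
        # Wait only if another attempt will follow.
        if tries == -1 or attempt < tries:
            sleep(interval_ms / 1000.0)
    return False
```

With the sample values ReconnectTries = 300 and ReconnectInterval = 30000, a loop of this shape would keep trying for roughly two and a half hours before giving up.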

Client Groups

Clients can be grouped via the "ClientGroups" property. Grouped clients are available for work only if all clients in the group are connected to their queues. In other words, if one client is disconnected, all other members of the group are also put into the Disconnected state.

Typically this is used for Producer-Consumer pairs, where it makes no sense to leave one client in the Connected state while the other is disconnected.
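
The all-or-nothing rule for grouped clients can be expressed as a small Python sketch. The function effective_states and the group name used in the usage note are hypothetical; the sketch only models the rule stated above and says nothing about how the adaptor tracks state internally.

```python
from typing import Dict, List, Set


def effective_states(groups: Dict[str, List[str]], connected: Set[str],
                     clients: List[str]) -> Dict[str, bool]:
    """Return, per client, whether it is usable once group rules are applied."""
    # Start from each client's own connection state.
    usable = {name: name in connected for name in clients}
    for members in groups.values():
        # One disconnected member takes the whole group down.
        if not all(member in connected for member in members):
            for member in members:
                usable[member] = False
    return usable
```

For example, with a hypothetical group {"TradeFlow": ["TradeFlowMQProducer", "TradeFlowMQConsumer"]} and only the producer connected, both clients come out as unavailable.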

RMQ TA Configuration

The TA distribution package contains the following folders:

  • etc – contains the configuration files;

  • lib – contains the libraries required by the RMQ TA.

To set up the RMQ TA for FIXEdge:

  1. Copy the TA distribution package to the FIXEdge folder:

    1. Copy the lib folder from the distribution package to the conf directory, e.g. to FIXGW/FIXEdge1/conf/amqp-ta-distribution.

    2. Copy the log4j.properties and JVM_Options.jvmopts configuration files from the etc folder to the FIXEdge configuration directory, e.g. FIXGW/FIXEdge1/conf.

    3. Check the paths to the libraries and to log4j.properties in the JVM_Options.jvmopts configuration file and correct them for your environment.

    4. On Windows, check that the PATH environment variable contains the path to the jvm folder. On Unix, check the path in FixEdge1.run.sh, which is located in the bin directory of FIXEdge.

  2. Configure the TA:

    1. In the 'Transport Layer Section' of FIXEdge.properties, add the RMQ TA to the list of supported adapters:

      TransportLayer.TransportAdapters = TransportLayer.RMQTA

    2. Add the RMQ TA section to FIXEdge.properties, which is located in the conf directory of FIXEdge.
      A sample set of properties is given below:

      TransportLayer.JVMOptionsFile = ../FixEdge1/conf/JVM_Options.jvmopts
      TransportLayer.RMQTA.DllName = libUniversalTADll-MD-x64.so
      TransportLayer.RMQTA.Description = RMQ Transport Adaptor
      TransportLayer.RMQTA.JavaClass = com.epam.fe.jms.jni.JMSAdaptor
      TransportLayer.RMQTA.AllowRejectMessages = true
      TransportLayer.RMQTA.ConnectionNames = Connection1
      TransportLayer.RMQTA.ClientNames = TradeFlowMQProducer, TradeFlowMQConsumer

      TransportLayer.RMQTA.Connection.Connection1.ProviderURI = amqp://127.0.0.1:5672
      TransportLayer.RMQTA.Connection.Connection1.User = sam1203
      TransportLayer.RMQTA.Connection.Connection1.Password = test
      TransportLayer.RMQTA.Connection.Connection1.Reconnect = true
      TransportLayer.RMQTA.Connection.Connection1.ReconnectTries = 300
      TransportLayer.RMQTA.Connection.Connection1.ReconnectInterval = 30000

      TransportLayer.RMQTA.Client.TradeFlowMQProducer.ConnectionName = Connection1
      TransportLayer.RMQTA.Client.TradeFlowMQProducer.StorageDir = ../FixEdge1/log
      TransportLayer.RMQTA.Client.TradeFlowMQProducer.SessionType = Producer
      TransportLayer.RMQTA.Client.TradeFlowMQProducer.DestinationURI = TradeFlow
      TransportLayer.RMQTA.Client.TradeFlowMQProducer.ExchangeName = exchange.OUT
      TransportLayer.RMQTA.Client.TradeFlowMQProducer.HeartBeat.Enable = true
      TransportLayer.RMQTA.Client.TradeFlowMQProducer.HeartBeat.Interval = 20
      TransportLayer.RMQTA.Client.TradeFlowMQProducer.HeartBeat.MessageFile = ../FixEdge1/conf/rabbitMQHB.msg
      TransportLayer.RMQTA.Client.TradeFlowMQProducer.MessageProcessingOnMQDisconnect = reject
      TransportLayer.RMQTA.Client.TradeFlowMQProducer.StartTime = 09:00
      TransportLayer.RMQTA.Client.TradeFlowMQProducer.EndTime = 18:00

      TransportLayer.RMQTA.Client.TradeFlowMQConsumer.ConnectionName = Connection1
      TransportLayer.RMQTA.Client.TradeFlowMQConsumer.StorageDir = ../FixEdge1/log
      TransportLayer.RMQTA.Client.TradeFlowMQConsumer.SessionType = Consumer
      TransportLayer.RMQTA.Client.TradeFlowMQConsumer.DestinationURI = TradeFlow
      TransportLayer.RMQTA.Client.TradeFlowMQConsumer.HeartBeat.Enable = true
      TransportLayer.RMQTA.Client.TradeFlowMQConsumer.HeartBeat.Interval = 20
      TransportLayer.RMQTA.Client.TradeFlowMQConsumer.HeartBeat.MessageFile = ../FixEdge1/conf/rabbitMQHB.msg
      TransportLayer.RMQTA.Client.TradeFlowMQConsumer.HeartBeat.MissedCountBeforeDisconnect = 3

    3. Make sure the RabbitMQ user, password, and queue you use are configured on the server side.

    4. Change the TA logging settings in log4j.properties if necessary. Paths to the log directory must be absolute.

  3. Restart FIXEdge to apply the changes.
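
Before restarting, it can help to check that every name listed in ConnectionNames and ClientNames has a matching section. The Python sketch below is a hypothetical helper, not part of the distribution. It assumes simple "key = value" lines (Java properties features such as line continuations and escapes are not handled), assumes that SessionType values are written exactly as "Producer" and "Consumer", and applies only the requirements stated in the parameter tables of this guide.

```python
from typing import Dict, List

PREFIX = "TransportLayer.RMQTA."


def parse_properties(text: str) -> Dict[str, str]:
    """Parse simple 'key = value' lines; blank lines and '#' comments are skipped."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def listed(props: Dict[str, str], key: str) -> List[str]:
    """Split a comma-delimited list property into trimmed names."""
    return [n.strip() for n in props.get(PREFIX + key, "").split(",") if n.strip()]


def check(props: Dict[str, str]) -> List[str]:
    """Return a list of human-readable problems; an empty list means none found."""
    problems = []
    connections = listed(props, "ConnectionNames")
    for conn in connections:
        base = f"{PREFIX}Connection.{conn}."
        for key in ("User", "Password"):
            if base + key not in props:
                problems.append(f"{base}{key} is missing")
        if base + "ProviderURI" not in props and base + "Addresses" not in props:
            problems.append(f"{base}ProviderURI or {base}Addresses must be set")
    for client in listed(props, "ClientNames"):
        base = f"{PREFIX}Client.{client}."
        for key in ("ConnectionName", "SessionType", "StorageDir"):
            if base + key not in props:
                problems.append(f"{base}{key} is missing")
        conn = props.get(base + "ConnectionName")
        if conn is not None and conn not in connections:
            problems.append(f"{base}ConnectionName refers to unknown connection {conn!r}")
        session_type = props.get(base + "SessionType")
        has_queue = base + "DestinationURI" in props
        has_exchange = base + "ExchangeName" in props
        if session_type == "Consumer" and not has_queue:
            problems.append(f"{base}DestinationURI is required for a Consumer")
        if session_type == "Producer" and not (has_queue or has_exchange):
            problems.append(f"{base}DestinationURI or {base}ExchangeName is required for a Producer")
    return problems
```

A typical use would be to read FIXEdge.properties into a string, pass it through parse_properties, and print whatever check returns. The sketch does not contact the RabbitMQ server, so the user, password, and queue still have to be verified on the server side.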

Routing Messages to/from RMQ TA Clients

RMQ clients can be referenced in the business layer (BL) by the names specified in the FIXEdge.properties file. Below is an example of BL_Config.xml for the two RMQ clients specified above.

BL_Config.xml
<?xml version="1.0" encoding="UTF-8" ?>
<!--
  FIXEdge - the XML Configuration file
  $Revision: 1.17.2.7 $
  <!DOCTYPE FIXEdge SYSTEM "BusinessLayer.dtd">
-->
<FIXEdge>
  <BusinessLayer>
    <Rule>
      <Source>
        <FixSession SenderCompID="FIXCLIENT" TargetCompID="FIXEDGE"/>
      </Source>
      <Condition>
        <MatchField Field="35" Value="D|G|F|n" />
      </Condition>
      <Action>
        <Send Name="TradeFlowMQProducer" />
      </Action>
    </Rule>
    <Rule>
      <Source Name="TradeFlowMQConsumer"/>
      <Condition>
        <MatchField Field="35" Value="8|9|j|n" />
      </Condition>
      <Action>
        <Send>
          <FixSession SenderCompID="FIXEDGE" TargetCompID="FIXCLIENT"/>
        </Send>
      </Action>
    </Rule>
    <DefaultRule>
      <Action>
        <DoNothing/>
      </Action>
    </DefaultRule>
  </BusinessLayer>
</FIXEdge>
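
To review which endpoints the rules connect, the file can be summarised with a few lines of Python. The sketch below is a hypothetical review aid, not a FIXEdge tool: the function names are invented, SAMPLE is a trimmed copy of the example without the header comment, and only the element and attribute names that appear in that example are handled.

```python
import xml.etree.ElementTree as ET

# Trimmed copy of the BL_Config.xml example (header comment omitted).
SAMPLE = """
<FIXEdge>
  <BusinessLayer>
    <Rule>
      <Source>
        <FixSession SenderCompID="FIXCLIENT" TargetCompID="FIXEDGE"/>
      </Source>
      <Condition>
        <MatchField Field="35" Value="D|G|F|n"/>
      </Condition>
      <Action>
        <Send Name="TradeFlowMQProducer"/>
      </Action>
    </Rule>
    <Rule>
      <Source Name="TradeFlowMQConsumer"/>
      <Condition>
        <MatchField Field="35" Value="8|9|j|n"/>
      </Condition>
      <Action>
        <Send>
          <FixSession SenderCompID="FIXEDGE" TargetCompID="FIXCLIENT"/>
        </Send>
      </Action>
    </Rule>
    <DefaultRule>
      <Action>
        <DoNothing/>
      </Action>
    </DefaultRule>
  </BusinessLayer>
</FIXEdge>
"""


def endpoint(element):
    """Name an endpoint: a TA client (Name attribute) or a nested FixSession."""
    if element is None:
        return None
    if element.get("Name"):
        return element.get("Name")
    session = element.find("FixSession")
    if session is not None:
        return f'{session.get("SenderCompID")}->{session.get("TargetCompID")}'
    return None


def describe_rules(xml_text):
    """Return (source, field, value, target) for every <Rule> element."""
    root = ET.fromstring(xml_text)
    rows = []
    for rule in root.iter("Rule"):
        match = rule.find("Condition/MatchField")
        rows.append((
            endpoint(rule.find("Source")),
            match.get("Field") if match is not None else None,
            match.get("Value") if match is not None else None,
            endpoint(rule.find("Action/Send")),
        ))
    return rows


for row in describe_rules(SAMPLE):
    print(row)
```

For the example, the first row pairs the FIXCLIENT session with the TradeFlowMQProducer client and the second pairs the TradeFlowMQConsumer client with the FIXCLIENT session, which makes a mistyped client name easy to spot.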

RMQ TA Configuration Parameters

The RMQ TA configuration contains the list of registered RMQ client names and other parameters that take part in message routing.

The following parameters can be present in the RMQ TA configuration:

| Property Name | Description | Required | Default Value |
| --- | --- | --- | --- |
| TransportLayer.RMQTA.Description | TA description. | Y | RMQ Transport Adaptor |
| TransportLayer.RMQTA.DllName | TA library file name. libUniversalTADll-MD-x64.so should be used. | Y | |
| TransportLayer.RMQTA.JVMOptionsFile | Path to the TA configuration file. | Y | |
| TransportLayer.RMQTA.JavaClass | Java class to be used. com.epam.fe.jms.jni.JMSAdaptor should be used. | Y | |
| TransportLayer.RMQTA.AllowRejectMessages | When true, FE rejects messages if it is unable to send them to the MQ or an error occurs. | N | false |
| TransportLayer.RMQTA.AllowRepeatedStatusNotification | When true, FE is allowed to call the onLogout() callback even if no onLogon() callback was called. | N | false |
| TransportLayer.RMQTA.ConnectionNames | Comma-delimited list of TA connections. A separate configuration section should be specified for each listed connection. | Y | |
| TransportLayer.RMQTA.FIXVersion | Version of the FIX protocol used by FIX messages in a given session. Acceptable values: FIX40, FIX41, FIX42, FIX43, FIX44, FIX50, FIX50SP1, FIX50SP2, FIXLatest, custom dictionaries. | N | FIX44 |
| TransportLayer.RMQTA.ClientNames | Comma-delimited list of TA clients. A separate configuration section should be specified for each listed client. | Y | |
| TransportLayer.RMQTA.ClientGroups | Comma-delimited list of client groups. | N | |
| TransportLayer.RMQTA.[GroupName].Clients | Comma-delimited list of TA clients that belong to the group. | N | |



Connection Parameters

| Property Name | Description | Required | Default Value |
| --- | --- | --- | --- |
| TransportLayer.RMQTA.Connection.[ConnectionName].ProviderURI | URI in the format amqp://<host>:<port>/<vhost>. Default values: port 5672, vhost "/". Examples: amqp://localhost, amqp://localhost:8734/vhost. | N | |
| TransportLayer.RMQTA.Connection.[ConnectionName].Addresses | Address list. Needed for clustered solutions to switch between addresses on disconnect. Note: reconnect attempts and intervals are not configurable here; when the first connection goes down, the adaptor switches to the next address immediately. Example (RMQ.properties; addresses are listed without amqp://): TransportLayer.RMQTA.Connection.rabbit.Addresses = mqprod1:5672, mqprod2:5672. One of the two parameters, ProviderURI or Addresses, should be specified. | N | |
| TransportLayer.RMQTA.Connection.[ConnectionName].User | User name. | Y | |
| TransportLayer.RMQTA.Connection.[ConnectionName].Password | User password. | Y | |
| TransportLayer.RMQTA.Connection.[ConnectionName].Reconnect | Enables or disables the reconnect procedure used to restore the connection. | N | false |
| TransportLayer.RMQTA.Connection.[ConnectionName].ReconnectInterval | Fixed interval in milliseconds between reconnection attempts. | N | 2000 |
| TransportLayer.RMQTA.Connection.[ConnectionName].ReconnectTries | Number of reconnect attempts, or -1 for an infinite number of attempts. | N | 3 |
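
The ProviderURI format and its defaults can be illustrated with a Python sketch. It is not the adaptor's parser: the function names are hypothetical, a bare trailing "/" is treated the same as a missing vhost (an assumption), and the fallback to port 5672 for an Addresses entry without a port is also an assumption, since the guide shows only entries with explicit ports.

```python
from typing import List, Tuple
from urllib.parse import urlsplit

DEFAULT_PORT = 5672
DEFAULT_VHOST = "/"


def parse_provider_uri(uri: str) -> Tuple[str, int, str]:
    """Split amqp://<host>:<port>/<vhost> into (host, port, vhost)."""
    parts = urlsplit(uri)
    if parts.scheme != "amqp" or not parts.hostname:
        raise ValueError(f"not an amqp URI: {uri!r}")
    port = parts.port or DEFAULT_PORT
    # Assumption: an empty path and a bare "/" both mean the default vhost.
    vhost = parts.path[1:] or DEFAULT_VHOST
    return parts.hostname, port, vhost


def parse_addresses(value: str) -> List[Tuple[str, int]]:
    """Split 'host:port, host:port' (no amqp:// prefix) into (host, port) pairs."""
    result = []
    for item in value.split(","):
        item = item.strip()
        if not item:
            continue
        host, sep, port = item.partition(":")
        # Assumption: an entry without a port uses the default port.
        result.append((host, int(port) if sep else DEFAULT_PORT))
    return result
```

Under these assumptions, amqp://localhost resolves to host localhost, port 5672, and vhost "/", while amqp://localhost:8734/vhost resolves to port 8734 and vhost "vhost".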

Client Parameters

| Property Name | Description | Required | Default Value |
| --- | --- | --- | --- |
| TransportLayer.RMQTA.Client.[ClientName].ConnectionName | Name of the primary connection used by the client. The connection should be registered in the ConnectionNames list and have all required parameters. | Y | |
| TransportLayer.RMQTA.Client.[ClientName].SessionType | Session type: Producer (the session is a message producer) or Consumer (the session is a message consumer). | Y | |
| TransportLayer.RMQTA.Client.[ClientName].DestinationURI | URI of the session destination (queue name). Required for the Consumer session type. For the Producer session type, DestinationURI, ExchangeName, or both should be specified. | N | |
| TransportLayer.RMQTA.Client.[ClientName].ExchangeName | Name of the exchange. See the RabbitMQ tutorial for details: https://www.rabbitmq.com/tutorials/amqp-concepts.html. Only for the Producer session type. Works with the "topic" and "direct" exchange types: in RabbitMQ, queues can be bound to an exchange, in which case the DestinationURI parameter is used as the routing key. | N | |
| TransportLayer.RMQTA.Client.[ClientName].StorageDir | Directory where the persistence file is stored in case of communication problems. | Y | |
| TransportLayer.RMQTA.Client.[ClientName].HeartBeat.Enable | If true, the TA sends and expects heartbeat messages at the configured time interval. | N | false |
| TransportLayer.RMQTA.Client.[ClientName].HeartBeat.Interval | Time interval in seconds before sending a heartbeat message or expecting an incoming one. | N | 30 |
| TransportLayer.RMQTA.Client.[ClientName].HeartBeat.MessageFile | Path to the binary file that contains the heartbeat message. | Y | |
| TransportLayer.RMQTA.Client.[ClientName].HeartBeat.MissedCountBeforeDisconnect | Number of missed messages before disconnect. | N | 3 |
| TransportLayer.RMQTA.Client.[ClientName].MessageProcessingOnMQDisconnect | Action the TA takes when it is unable to deliver outgoing messages to the queue: Save (save to the storage file and resend after reconnect) or Reject (send a reject event to FIXEdge). | N | reject |
| TransportLayer.RMQTA.Client.[ClientName].ChannelTransacted | Turns on transactions to ensure that messages are stored on disk. Values: true or false. NOTE: This property affects performance. | N | false |
| TransportLayer.RMQTA.Client.[ClientName].StartTime | Client start time. Supported masks: HH:mm:ss, HH:mm. | N | |
| TransportLayer.RMQTA.Client.[ClientName].EndTime | Client stop time. Supported masks: HH:mm:ss, HH:mm. | N | |
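
The two time masks accepted by StartTime and EndTime can be illustrated with a Python sketch. The function names are hypothetical, and in_window is an assumption about how the two values relate: it treats the client as active from StartTime up to, but not including, EndTime on the same day, and it does not model windows that cross midnight, which this guide does not describe.

```python
from datetime import datetime, time


def parse_client_time(value: str) -> time:
    """Parse a value written as HH:mm:ss or HH:mm."""
    value = value.strip()
    for fmt in ("%H:%M:%S", "%H:%M"):
        try:
            return datetime.strptime(value, fmt).time()
        except ValueError:
            continue
    raise ValueError(f"unsupported time value: {value!r}")


def in_window(now: time, start: time, end: time) -> bool:
    """Same-day window check (assumed semantics): start <= now < end."""
    return start <= now < end
```

With the sample values StartTime = 09:00 and EndTime = 18:00, a check at 12:00 falls inside the window and a check at 18:00 falls outside it.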