Framework Sample

Overview

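The Framework is a sample application that illustrates how the FIX Antenna HFT Engine handles the operations described in the sections below: FIX session management, message processing, recovery, distribution of CPU resources, and the remote console.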

The Framework creates FIX acceptor sessions and processes incoming FIX messages of two types, User Request (MsgType = BE) and New Order - Single (MsgType = D):

  • The User Request (MsgType = BE) message is used to perform user logon/logout;
  • The New Order - Single (MsgType = D) message is used to check whether the user is logged on; the message is sent back to indicate that the incoming message was processed.

The following picture demonstrates the architecture and environment of the sample application:


The Framework can be used as a benchmark.

FIX sessions management

The Framework creates and configures FIX acceptor sessions according to its <HFT FA package>/samples/framework/bin/daily_conf_prod/trader_session.properties configuration file:

trader_session.properties
Trader.Sessions = s1,s2
Trader.Session.s1.Description = Test session1
Trader.Session.s1.Version = FIX42
Trader.Session.s1.HBI = 30
Trader.Session.s1.SenderCompID = framework
Trader.Session.s1.SenderSubID =
Trader.Session.s1.StartTime = 05:30
Trader.Session.s1.TargetCompID = target1
Trader.Session.s1.TargetSubID =
Trader.Session.s1.TerminateTime = 21:30
Trader.Session.s1.Password = foobar
Trader.Session.s1.Users = user1,user2,user3
Trader.Session.s1.User.user1.Password = pass1
Trader.Session.s1.User.user2.Password = pass2
Trader.Session.s1.User.user3.Password = pass3

Trader.Session.s2.Description = Test session2
...

FIX Antenna HFT has the same session management API as FIX Antenna C++.

Descriptions of the FixEngine::createSession, Session::setInSeqNum, Session::setOutSeqNum, Session::connect, Session::disconnect, Session::release, etc. methods are available in the FIX Antenna C++ documentation.

The Framework uses this API to manage sessions according to the configured schedule.
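
As an illustration of the schedule check, the sketch below tests whether a time of day falls between a session's StartTime and TerminateTime values, written in the HH:MM form used in trader_session.properties. The helper names parseHhMm and isWithinSchedule are hypothetical and are not part of the FIX Antenna API. Treating a window whose start is later than its end as one that crosses midnight is an assumption.

```cpp
#include <cassert>
#include <string>

// Hypothetical helper: converts "HH:MM" to minutes since midnight.
// Returns -1 if the text is not a valid time of day.
int parseHhMm(const std::string& text)
{
    if (text.size() != 5 || text[2] != ':')
        return -1;
    const int digitPos[] = {0, 1, 3, 4};
    for (int i : digitPos)
        if (text[i] < '0' || text[i] > '9')
            return -1;
    const int hours = (text[0] - '0') * 10 + (text[1] - '0');
    const int minutes = (text[3] - '0') * 10 + (text[4] - '0');
    if (hours > 23 || minutes > 59)
        return -1;
    return hours * 60 + minutes;
}

// Hypothetical helper: true if `now` lies in [start, terminate).
// Assumption: if start is later than terminate, the window crosses midnight.
bool isWithinSchedule(const std::string& start,
                      const std::string& terminate,
                      const std::string& now)
{
    const int s = parseHhMm(start);
    const int t = parseHhMm(terminate);
    const int n = parseHhMm(now);
    assert(s >= 0 && t >= 0 && n >= 0);
    if (s <= t)
        return s <= n && n < t;
    return n >= s || n < t;
}
```

With the values configured for session s1, isWithinSchedule("05:30", "21:30", now) would tell the application whether the session should be active at the given time.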

Message processing

The framework registers a separate application object for each session to listen to the session's events. When a message comes in, there is therefore no need to look up the session object or its bound data by session name, because each application stores a direct link to the required data.

When the remote Initiator sends the first Logon message, the onLogonEvent is fired for the application:

onLogonEvent
OrderEntryApplication::onLogonEvent(const Engine::LogonEvent* event, const Engine::Session& sn)

The framework uses this event to verify the password and to decide whether to accept or deny the connection. Once the connection is established, FIX Antenna HFT passes incoming messages to the application using the process callback:

process callback
OrderEntryApplication::process( Engine::FIXMessageProcessElem* work, const Engine::Session& sn )

The application interface in FIX Antenna HFT also has the old-style process callback, as in FIX Antenna C++:

process(const Engine::FIXMessage& msg, const Engine::Session& sn)

The first callback is used when the UseLiteMessage session parameter is set to "true", and the second one when it is set to "false". The default behavior can be configured in the <HFT FA package>/samples/framework/bin/conf/engine.properties.test configuration file:

Session.Default.UseLiteMessage = true

In addition, the same behavior can be configured per session at session creation time using the SessionExtraParameters::useLiteMessage_ property:

SessionExtraParameters extraParams;
extraParams.useLiteMessage_ = true;
engine->createSession (application, senderCompID, targetCompID, protocolID, &extraParams, storageType);

The work parameter of the first callback contains a fast LiteFixMessage object created in the message pool. The application can extract the message from the work parameter and defer it for asynchronous processing. Heavy synchronous processing in this callback can slow down other sessions.

The LiteFixMessage doesn't support validation. The application should perform validation either programmatically or by parsing incoming messages into a regular FIXMessage:

int sz=0;
const char* buf = liteMsg->toRaw(&sz);
Engine::FIXMessage* regMsg = Engine::FIXMsgProcessor::singleton()->parse(buf, sz);
regMsg->checkRequired( Parser::Options() );

The message object should be returned to the pool once processing is finished:

// Engine::FIXMessageProcessElem* work
Engine::MsgPipeElem* elem = work->elem;
elem->returnObj();
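
One way to make sure the element is returned on every code path is a small scope guard. The sketch below is an assumption about how an application might wrap the call and is not part of FIX Antenna HFT. PoolReturnGuard and FakeElem are hypothetical names. The template only requires that the element type has a returnObj() method, as Engine::MsgPipeElem does in the snippet above.

```cpp
#include <cassert>

// Hypothetical scope guard: calls returnObj() on the element when the guard
// goes out of scope, unless ownership was handed over with release().
template <typename Elem>
class PoolReturnGuard
{
public:
    explicit PoolReturnGuard(Elem* elem) : elem_(elem) { assert(elem != nullptr); }
    ~PoolReturnGuard() { if (elem_) elem_->returnObj(); }

    // Hands the element over, e.g. to a worker queue that returns it later.
    Elem* release() { Elem* e = elem_; elem_ = nullptr; return e; }

    PoolReturnGuard(const PoolReturnGuard&) = delete;
    PoolReturnGuard& operator=(const PoolReturnGuard&) = delete;

private:
    Elem* elem_;
};

// Stand-in for a pooled element, used only to exercise the guard.
struct FakeElem
{
    int returned = 0;
    void returnObj() { ++returned; }
};
```

If the message is deferred for asynchronous processing, release() passes the responsibility for returnObj() to whichever code finishes the processing.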

The queue is chosen by the session's user name. Thus, messages from different users can be processed in parallel, and messages from the same user will be processed strictly one by one. The hash function of the user name is used to define which queue should be used for this user.
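
The queue selection described above can be sketched as follows. The function name queueIndexForUser is hypothetical, and std::hash is an assumption: the hash function actually used by the framework is not shown here.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>

// Hypothetical helper: maps a user name to one of queueCount queues.
// The same name always maps to the same queue within one process run,
// so messages of one user keep their order.
std::size_t queueIndexForUser(const std::string& userName, std::size_t queueCount)
{
    assert(queueCount > 0);
    return std::hash<std::string>()(userName) % queueCount;
}
```

Different users may still share a queue when their hashes collide modulo the queue count; this only reduces parallelism and does not affect per-user ordering.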

Further processing of the incoming message is performed in the processMsg method:

processMsg
OrderEntryApplication::processMsg(Parser::LiteFixMessage& msg, ProcessMsgMode processMode, Engine::MsgPipeElem* elem)

The Framework expects BE and D types of messages here and validates them programmatically. Messages of all other types will be rejected.

The BE message carries a logon or logout request for a specific user of the session.

"BE" message example
8=FIX.4.2|9=117|35=BE|49=target1|56=framework|34=2|50=user1|97=Y|52=20160210-15:13:48.081|923=UserReqID001|924=1|553=dummy|554=pass1|10=016|

The user name is taken from the 50 tag and the password is taken from the 554 tag. The Framework compares the user's password from the message with the configured one and sends the result in the BF message.
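
The tag extraction and password comparison can be sketched on the pipe-delimited text form used in the examples here. The helpers findTag and passwordMatches are hypothetical and work on a plain string; the Framework itself reads the fields from a LiteFixMessage. The map of configured passwords stands in for the Trader.Session.<name>.User.<user>.Password settings.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>

// Hypothetical helper: finds the first "tag=value" field in a FIX string whose
// fields are separated by `delim` ('|' in the examples here, SOH on the wire).
bool findTag(const std::string& msg, int tag, std::string& value, char delim = '|')
{
    assert(tag > 0);
    const std::string key = std::to_string(tag) + "=";
    std::size_t pos = 0;
    while (pos < msg.size())
    {
        std::size_t end = msg.find(delim, pos);
        if (end == std::string::npos)
            end = msg.size();
        // A field matches only if it starts with "<tag>=".
        if (msg.compare(pos, key.size(), key) == 0)
        {
            value = msg.substr(pos + key.size(), end - pos - key.size());
            return true;
        }
        pos = end + 1;
    }
    return false;
}

// Hypothetical check: compares the password in tag 554 with the one
// configured for the user named in tag 50.
bool passwordMatches(const std::string& userRequest,
                     const std::map<std::string, std::string>& configured)
{
    std::string user, password;
    if (!findTag(userRequest, 50, user) || !findTag(userRequest, 554, password))
        return false;
    const std::map<std::string, std::string>::const_iterator it = configured.find(user);
    return it != configured.end() && it->second == password;
}
```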

"BF" message example
8=FIX.4.2|9=127|35=BF|49=framework|56=target1|34=000000002|52=20160210-15:13:48.089|923=UserReqID001|553=user1|926=1|927=[FRAMEWORK] Logged In|10=145|

For each incoming D message, the framework checks that the user is logged on. If the check passes, the framework echoes the D message back to the user; otherwise it rejects the message with a j message.
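
A minimal sketch of this decision is shown below. LoggedOnUsers and replyTypeForOrder are hypothetical names, and the sketch ignores thread safety; it only illustrates that the reply type depends on the logon state recorded from earlier BE messages.

```cpp
#include <cassert>
#include <set>
#include <string>

// Hypothetical registry of users whose User Request logon was accepted.
class LoggedOnUsers
{
public:
    void logon(const std::string& user) { users_.insert(user); }
    void logout(const std::string& user) { users_.erase(user); }
    bool isLoggedOn(const std::string& user) const { return users_.count(user) != 0; }

private:
    std::set<std::string> users_;
};

// Returns the MsgType of the reply to a New Order - Single sent by `user`:
// "D" (the order is echoed back) if the user is logged on,
// "j" (Business Message Reject) otherwise.
std::string replyTypeForOrder(const LoggedOnUsers& users, const std::string& user)
{
    assert(!user.empty());
    return users.isLoggedOn(user) ? "D" : "j";
}
```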

"D" message example
8=FIX.4.2|9=164|35=D|49=target1|56=framework|34=4|50=user|97=Y|52=20160210-15:19:56.915|11=90001008|1=10030003|21=2|55=TESTA|54=1|60=20000809-18:20:32|38=4000|40=2|44=30|59=0|47=I|10=045|
"j" message example
8=FIX.4.2|9=114|35=j|49=framework|56=target1|34=000000004|52=20160210-15:19:56.944|45=4|372=D|380=6|58=[FRAMEWORK] Not authorized|10=020|

Recovery

The framework processes incoming messages asynchronously and out of sequence. After an unexpected interruption or crash, this makes it harder to determine which messages were processed and which were not. Because of this, the framework replays messages at start time.

Each message in the logs has a unique GlobalMsgID, a counter assigned at receive or send time that allows all messages to be put in the correct order. Each message in the logs can also store data attached by the user. When an outgoing message is created in response to an incoming one, the framework puts the GlobalMsgID of the incoming message into this data field of the outgoing message. The framework generates exactly one outgoing message for each incoming message, so an incoming message is marked as processed if some outgoing message contains its GlobalMsgID. Messages that are still not marked as processed after the replay are processed again during recovery.
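
The matching rule can be sketched on an in-memory list of log records. The LogRecord structure and the findUnprocessed function are hypothetical; the real storage format and the way the attached data is encoded are not shown here.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <set>
#include <vector>

// Hypothetical in-memory view of one persisted log record.
struct LogRecord
{
    std::uint64_t globalMsgId;   // counter assigned at receive or send time
    bool incoming;               // true for received messages
    std::uint64_t inResponseTo;  // outgoing records only: GlobalMsgID kept in the attached data
};

// Returns the GlobalMsgIDs of incoming messages that no outgoing message
// refers to, in ascending order. These are the messages to process again.
std::vector<std::uint64_t> findUnprocessed(const std::vector<LogRecord>& records)
{
    std::set<std::uint64_t> answered;
    for (const LogRecord& r : records)
        if (!r.incoming)
            answered.insert(r.inResponseTo);

    std::vector<std::uint64_t> pending;
    for (const LogRecord& r : records)
        if (r.incoming && answered.count(r.globalMsgId) == 0)
            pending.push_back(r.globalMsgId);

    std::sort(pending.begin(), pending.end());
    // GlobalMsgIDs are expected to be unique.
    assert(std::adjacent_find(pending.begin(), pending.end()) == pending.end());
    return pending;
}
```

Sorting by GlobalMsgID restores the original receive order, so the pending messages can be reprocessed in the order in which they arrived.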

The recovery process is performed in the FixConnectionMgr::loadPersistedState method.

Distribution of CPU resources

The framework uses the API provided by FIX Antenna HFT to set up the priority and affinity of both FIX Antenna HFT and framework threads.

The framework threads are configured using the following functions:

void setThreadAffinityCPUMask_Verbose( uint64_t mask, pthread_t thread, const char* thread_name );
void setThreadRealtimePrio_Verbose( int realtimePrio, const char* thread_name );
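
The 64-bit mask argument can be illustrated with two small helpers. They are hypothetical and rest on the common convention that bit i of the mask stands for CPU i; the exact interpretation used by setThreadAffinityCPUMask_Verbose is an assumption here.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical helper: lists the CPU indexes selected by an affinity mask,
// assuming bit i stands for CPU i.
std::vector<int> cpusInMask(std::uint64_t mask)
{
    std::vector<int> cpus;
    for (int cpu = 0; cpu < 64; ++cpu)
        if (mask & (std::uint64_t(1) << cpu))
            cpus.push_back(cpu);
    return cpus;
}

// Hypothetical helper: builds an affinity mask from a list of CPU indexes.
std::uint64_t maskForCpus(const std::vector<int>& cpus)
{
    std::uint64_t mask = 0;
    for (int cpu : cpus)
    {
        assert(cpu >= 0 && cpu < 64);
        mask |= std::uint64_t(1) << cpu;
    }
    return mask;
}
```

Under this convention, a mask of 0x5 selects CPUs 0 and 2.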

To get access to the FIX Antenna HFT threads, the framework registers a ThreadsPoolListener during engine initialization:

Engine::FixEngine::InitParameters params;
//...
params.threadPoolListener_ = listener;
engine = Engine::FixEngine::init( params );

and sets properties in the callback:

void onNewThreadCreated(Utils::ThreadAttrs& threadAttrs)
{
	// prio and mask are the values the application has chosen for this thread
	threadAttrs.realtimePrio_ = prio;
	threadAttrs.affinityMask_ = mask;
	//...
}

The engine initialization is performed in FixConnectionMgr::init(), and the ThreadsPoolListener interface is implemented by FixConnectionMgr.

The framework also locks pages in RAM using mlockall( MCL_FUTURE ) at launch and disables OpenOnload for the old-style dispatcher in the onNewThreadCreated callback.

Remote console

The FIX Antenna HFT provides an interface for the implementation of a remote console for telnet clients.

The application should register a command handler function and enable a flag to activate this functionality.

void handlerCmd( void* sock, char* cmd_buf )
{
	if( strcmp(cmd_buf, "hello") == 0 )
	{
		writeToSocket( (Socket*)sock, "echo" );
	}
}
//...
g_platformProps.TCPCmdShell_Enable = true;
platform_initialize(pLogCategory, logLevel, exitHandler, handlerCmd);
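
Telnet clients commonly terminate each line with CR LF. Whether the engine strips the line ending before calling the handler is not stated here, so the sketch below assumes it may still be present and removes it before comparing. The helpers trimLineEnding and replyFor are hypothetical and independent of the FIX Antenna HFT API.

```cpp
#include <cassert>
#include <cstring>
#include <string>

// Hypothetical helper: copies the command and drops trailing CR/LF characters.
std::string trimLineEnding(const char* cmdBuf)
{
    assert(cmdBuf != nullptr);
    std::size_t len = std::strlen(cmdBuf);
    while (len > 0 && (cmdBuf[len - 1] == '\r' || cmdBuf[len - 1] == '\n'))
        --len;
    return std::string(cmdBuf, len);
}

// Hypothetical command table: returns the reply text for a command.
std::string replyFor(const std::string& command)
{
    if (command == "hello")
        return "echo";
    return "unknown command: " + command;
}
```

A handler could then pass replyFor(trimLineEnding(cmd_buf)).c_str() to writeToSocket, which keeps the comparison logic separate from the socket code and easy to test.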

Threading models of message processing in FIX Antenna C++ and FIX Antenna HFT

FIX Antenna C++ default mode

The default mode is used for a session if the SocketOpPriority session parameter is not configured or is set to 'EVEN':

Engine::SessionExtraParameters extraParams;
extraParams.socketPriority_ = EVEN_SOCKET_OP_PRIORITY;
session = engine->createSession(application, senderCompID, targetCompID, fixVersion, &extraParams);

In this mode, polling of the session's socket is performed by a single dispatcher thread using select (Windows) or poll (Linux). The session's receive and send operations are handled in the worker pool.

FIX Antenna C++ aggressive mode

The aggressive mode can be enabled independently for the sending and receiving parts of a session. The SocketOpPriority session parameter should be set to 'AGGRESSIVE_SEND', 'AGGRESSIVE_RECEIVE', or 'AGGRESSIVE_SEND_AND_RECEIVE', respectively:

Engine::SessionExtraParameters extraParams;
extraParams.socketPriority_ = AGGRESSIVE_SEND_AND_RECEIVE_SOCKET_OP_PRIORITY;
session = engine->createSession(application, senderCompID, targetCompID, fixVersion, &extraParams);

Each aggressive side of each session creates a dedicated thread to poll sockets and process messages. The single dispatcher is not used.

The AGGRESSIVE_SEND and AGGRESSIVE_SEND_AND_RECEIVE modes also enable direct message emission from the user thread to the socket if the socket is ready to send data.

FIX Antenna HFT

The FIX Antenna HFT uses a configurable number of dispatchers (TCP reader threads) to poll sockets that are ready to read. The WSAPoll method is used on Windows and the epoll method on Linux. These dispatchers also perform reading from the sockets and all processing of the incoming messages.

FIX Antenna HFT always uses direct sending from the user threads to the sockets if the sockets are ready. Otherwise (if sockets are busy), it uses the old-style single dispatcher to poll sockets until a socket gets ready (vacant).

To reduce the number of system calls when sending activity is very intensive, FIX Antenna HFT can also merge outgoing messages into one sending buffer.
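
The idea of merging messages can be sketched as a small batcher. SendBatcher is a hypothetical class, and the size threshold is an assumption; the actual merging policy of FIX Antenna HFT is not described here.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical batcher: collects outgoing messages and hands them over as one buffer.
class SendBatcher
{
public:
    explicit SendBatcher(std::size_t maxBytes) : maxBytes_(maxBytes) { assert(maxBytes > 0); }

    // Appends a serialized message. Returns true when the buffer has reached
    // maxBytes and should be flushed.
    bool append(const std::string& rawMessage)
    {
        buffer_ += rawMessage;
        return buffer_.size() >= maxBytes_;
    }

    // Returns everything collected so far and empties the batcher.
    // A real sender would pass this buffer to a single send call.
    std::string flush()
    {
        std::string out;
        out.swap(buffer_);
        return out;
    }

    std::size_t pendingBytes() const { return buffer_.size(); }

private:
    std::size_t maxBytes_;
    std::string buffer_;
};
```

Three small messages appended to the batcher leave it as one buffer, so they cost one system call instead of three.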

The FIX Antenna HFT dispatchers are designed to be OpenOnload friendly:

  • All sockets are created in the same thread that will poll them for reading.
  • When a new socket for an incoming connection is created as a result of listening socket processing, it can be assigned to the same dispatcher thread that processes the 'parent' listening socket (Solarflare stack per thread mode).

Another possibility is to choose a dispatcher for the new incoming connection using RoundRobin. The behavior is managed by the TCPDispatcher.IncomingConnectionOnloadStackAffinity boolean property in the <HFT FA package>/samples/framework/bin/conf/engine.properties.test configuration file.

Listening sockets at the engine start and outgoing connections always use RoundRobin.
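
The two placement strategies can be sketched as follows. RoundRobinSelector and dispatcherForAccepted are hypothetical names, and the mapping of a "true" property value to the same-dispatcher behavior is an assumption based on the description above.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

// Hypothetical selector: hands out dispatcher indexes 0, 1, ..., count-1, 0, 1, ...
class RoundRobinSelector
{
public:
    explicit RoundRobinSelector(std::size_t dispatcherCount)
        : count_(dispatcherCount), next_(0)
    {
        assert(dispatcherCount > 0);
    }

    // Safe to call from several threads; each call returns the next index.
    std::size_t next() { return next_.fetch_add(1, std::memory_order_relaxed) % count_; }

private:
    const std::size_t count_;
    std::atomic<std::size_t> next_;
};

// Hypothetical decision: with stack affinity on, an accepted socket stays on the
// dispatcher of its listening socket; otherwise the next dispatcher is taken in turn.
std::size_t dispatcherForAccepted(bool stackAffinity,
                                  std::size_t listenerDispatcher,
                                  RoundRobinSelector& roundRobin)
{
    return stackAffinity ? listenerDispatcher : roundRobin.next();
}
```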

Accessing FIX Antenna HFT LiteFixMessage fields

The root fields in the LiteFixMessage can be accessed as in FIX Antenna C++:

Parser::LiteFixMessage* msg = Engine::FIXMsgProcessor::singleton()->newLiteMessage();
msg->set( 8, "FIX.4.2" );
msg->set( 9, "000" );
msg->set( 35, "BF" );
msg->set( 49, senderCompID );
msg->set( 56, targetCompID );
msg->set( 34, "000000000" );
msg->set( 52, "20140101-01:01:01.001" );
Engine::FIXFieldValue val;
bool exist = msg->get(FIXFields::SenderCompID, &val);

Unlike the regular FIXMessage, the LiteFixMessage doesn't provide hierarchical access to the fields in groups. The LiteFixMessage has a flat structure, so the range of fields in which the required field is searched for has to be bounded explicitly by start and end indexes:

Parser::LiteFixMessage& orderMsg = msg;
const int leadfield = 552;
int leadindex = orderMsg.getTagIndex(leadfield);
if( leadindex != orderMsg.NOTFOUND )
{
	const int startfield = 11;
	int startindex = orderMsg.getTagIndex(startfield, leadindex + 1);
	int endindex;
	for( int num = orderMsg.getTagAsIntAtIndex(leadindex); 0 < num && startindex != orderMsg.NOTFOUND; startindex = endindex, num-- )
	{
		endindex = orderMsg.getTagIndex(startfield, startindex + 1);
		const int leadfield = 555;
		int leadindex = orderMsg.getTagIndex(leadfield);
		if( leadindex != orderMsg.NOTFOUND )
		{
			int startfield = orderMsg.whichTagAtIndex( leadindex + 1 );
			if( startfield == orderMsg.NOTFOUND )
				throw std::logic_error("Bad MLEG group");
			// MLEG Cross
			int startindexMleg = orderMsg.getTagIndex(startfield, leadindex + 1);
			int endindexMleg;
			for( int num = orderMsg.getTagAsIntAtIndex(leadindex); 0 < num && startindexMleg != orderMsg.NOTFOUND; startindexMleg = endindexMleg, num-- )
			{
				endindexMleg = orderMsg.getTagIndex(startfield, startindexMleg + 1);
				int i = orderMsg.getTagIndex( FIXFields::Account, startindex, endindex );
				if(i != orderMsg.NOTFOUND)
				{
					Engine::AsciiString account = orderMsg.getTagAsStringAtIndex(i);
				}
			}
		}
	}
}
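
The index-bounding technique can also be shown without the engine, on a plain list of (tag, value) pairs. The sketch below is a simplified, single-level illustration; FlatMessage, findTagIndex and collectFromGroup are hypothetical names and do not mirror the LiteFixMessage API. As a simplification, the last group entry is assumed to run to the end of the message.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical flat message: fields in wire order as (tag, value) pairs.
typedef std::vector<std::pair<int, std::string> > FlatMessage;

const std::size_t NOT_FOUND = static_cast<std::size_t>(-1);

// Finds `tag` in the index range [from, to); returns NOT_FOUND if absent.
std::size_t findTagIndex(const FlatMessage& msg, int tag, std::size_t from, std::size_t to)
{
    assert(from <= to);
    if (to > msg.size())
        to = msg.size();
    for (std::size_t i = from; i < to; ++i)
        if (msg[i].first == tag)
            return i;
    return NOT_FOUND;
}

// Collects the value of `wantedTag` from each entry of a repeating group.
// countTag is the group's counter field; delimiterTag is the first field of every entry.
std::vector<std::string> collectFromGroup(const FlatMessage& msg, int countTag,
                                          int delimiterTag, int wantedTag)
{
    std::vector<std::string> values;
    const std::size_t lead = findTagIndex(msg, countTag, 0, msg.size());
    if (lead == NOT_FOUND)
        return values;

    int remaining = std::stoi(msg[lead].second);
    std::size_t start = findTagIndex(msg, delimiterTag, lead + 1, msg.size());
    while (remaining-- > 0 && start != NOT_FOUND)
    {
        // The entry ends where the next entry starts, or at the end of the message.
        const std::size_t next = findTagIndex(msg, delimiterTag, start + 1, msg.size());
        const std::size_t end = (next == NOT_FOUND) ? msg.size() : next;
        const std::size_t hit = findTagIndex(msg, wantedTag, start, end);
        if (hit != NOT_FOUND)
            values.push_back(msg[hit].second);
        start = next;
    }
    return values;
}
```

For a message with 552=2 followed by two entries that each start with tag 11, collectFromGroup(msg, 552, 11, 1) returns the Account value of each entry.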