Overview
The Framework is a sample application that illustrates fulfillment of the following operations by the FIX Antenna HFT Engine:
- FIX sessions management (create, connect, disconnect, destroy);
- asynchronous message processing;
- recovery achievement (replay received but not processed messages);
distribution of CPU resources (setup thread affinity, priority);
The Framework creates FIX acceptor sessions and processes incoming FIX messages of User Request (MsgType = BE) and New Order - Single (MsgType = D) message types:
- User Request (MsgType = BE) message is used to perform user logon/logout;
- New Order - Single (MsgType = D) message is used to check whether user is logged on and send the message back indicating that incoming message was processed.
The following picture demonstrates the architecture and environment of the sample application:
The Framework can be used as a benchmark.
FIX sessions management
The Framework creates and configures FIX acceptor sessions according to its configuration file <HFT FA package>/samples/framework/bin/daily_conf_prod/trader_session.properties:
Trader.Sessions = s1,s2 Trader.Session.s1.Description = Test session1 Trader.Session.s1.Version = FIX42 Trader.Session.s1.HBI = 30 Trader.Session.s1.SenderCompID = framework Trader.Session.s1.SenderSubID = Trader.Session.s1.StartTime = 05:30 Trader.Session.s1.TargetCompID = target1 Trader.Session.s1.TargetSubID = Trader.Session.s1.TerminateTime = 21:30 Trader.Session.s1.Password = foobar Trader.Session.s1.Users = user1,user2,user3 Trader.Session.s1.User.user1.Password = pass1 Trader.Session.s1.User.user2.Password = pass2 Trader.Session.s1.User.user3.Password = pass3 Trader.Session.s2.Description = Test session2 ...
FIX Antenna HFT has the same session management API as FIX Antenna C++.
Description of the methods FixEngine::createSession, Session::setInSeqNum, Session::setOutSeqNum, Session::connect, Session::disconnect, Session::release,
etc is available in the FIX Antenna C++ documentation: http://corp-web.b2bits.com/fixacpp/doc/html/
The Framework uses this API to manage sessions according to the configured schedule.
Message processing
The Framework registers a separate Application object for each session in order to listen to session's events. So, when a message comes in, it is not needed to perform lookup for an actual session object or a bound data by the session name, because each Application stores direct link to required data.
When remote Initiator sends the first logon message, the onLogonEvent is fired for the Application:
OrderEntryApplication::onLogonEvent(const Engine::LogonEvent* event, const Engine::Session& sn)
The Framework uses this event for password verification and decision on connection acceptance or denial.
When connection is established, the FIX Antenna HFT passes incoming messages to the application using process callback:
OrderEntryApplication::process( Engine::FIXMessageProcessElem* work, const Engine::Session& sn )
The Application interface in the FIX Antenna HFT also has old style process callback like in FIX Antenna C++:
process(const Engine::FIXMessage& msg, const Engine::Session& sn)
The first callback option is used when session parameter UseLiteMessage is set to "true" and the second one - if "false". The default behavior can be configured in the configuration file <HFT FA package>/samples/framework/bin/conf/engine.properties.test:
Session.Default.UseLiteMessage = true
In addition, the same behavior can be configured per session during session creation time by SessionExtraParameters::useLiteMessage_ property:
SessionExtraParameters extraParams; extraParams.useLiteMessage_ = true; engine->createSession (application, senderCompID, targetCompID, protocolID, &extraParams, storageType);
The work parameter of the first callback contains fast LiteFixMessage object created in the message pool. The application can extract message from the work parameter and defer it for asynchronous processing.
The heavy synchronous processing in this callback can slow down other sessions (see the FIX Antenna HFT threading model of message processing)
The LiteFixMessage doesn't support validation, the application should perform validation programmatically or by parsing incoming message into regular FIXMessage:
int sz=0; const char* buf = liteMsg->toRaw(&sz); Engine::FIXMessage* regMsg = Engine::FIXMsgProcessor::singleton()->parse(buf, sz); regMsg->checkRequired( Parser::Options() );
The message object should be returned to the pool once processing is finished:
// Engine::FIXMessageProcessElem* work Engine::MsgPipeElem* elem = work->elem; elem->returnObj();
The queue is chosen by the session's user name. Thus, messages from different users can be processed in parallel, and messages from the same user will be processed strictly one by one. The hash function of user name is used to define which queue should be used for this user.
The next processing of the incoming message is performed in the processMsg method:
OrderEntryApplication::processMsg(Parser::LiteFixMessage& msg, ProcessMsgMode processMode, Engine::MsgPipeElem* elem)
The Framework expects "BE" and "D" messages here and validates them programmatically. Messages of all other types will be rejected.
The "BE" message is used to logon/logout the session with specific user.
8=FIX.4.2|9=117|35=BE|49=target1|56=framework|34=2|50=user1|97=Y|52=20160210-15:13:48.081|923=UserReqID001|924=1|553=dummy|554=pass1|10=016|
The user name is taken from tag 50 and the password is taken from the tag 554. The Framework compares user's password from the message with the configured one and sends result in the "BF" message.
8=FIX.4.2|9=127|35=BF|49=framework|56=target1|34=000000002|52=20160210-15:13:48.089|923=UserReqID001|553=user1|926=1|927=[FRAMEWORK] Logged In|10=145|
For each incoming "D" message there is a validation for user to be logged on. The framework echoes "D" message back to user if the test is passed, otherwise issues "j" message rejecting it.
8=FIX.4.2|9=164|35=D|49=target1|56=framework|34=4|50=user|97=Y|52=20160210-15:19:56.915|11=90001008|1=10030003|21=2|55=TESTA|54=1|60=20000809-18:20:32|38=4000|40=2|44=30|59=0|47=I|10=045|
8=FIX.4.2|9=114|35=j|49=framework|56=target1|34=000000004|52=20160210-15:19:56.944|45=4|372=D|380=6|58=[FRAMEWORK] Not authorized|10=020|
Recovery
The Framework processes incoming messages asynchronously and out of sequence. In case of unexpected interruption/crush it makes harder to define which message was processed and which was not. Because of this, the framework performs message Replay at start time.
Each message in the logs has unique GlobalMsgID - the counter assigned at receiving or sending time that allows to put all messages in correct order. Each message in the logs can also store some data attached by the user. If there is an outgoing message created in response to incoming one, the Framework puts GlobalMsgID of the incoming message in this data field of the outgoing message. The framework generates exactly one outgoing message for each incoming message, so incoming message is marked as processed if some outgoing message contains it's GlobalMsgID.
Messages that weren't marked as processed after messages had been replayed will be processed again during recovery process.
The recovery process is performed in the FixConnectionMgr::loadPersistedState
method.
Distribution of CPU resources
The Framework uses API provided by FIX Antenna HFT to setup priority and affinity of both FIX Antenna HFT and Framework threads.
The Framework threads are configured by means of the following functions:
void setThreadAffinityCPUMask_Verbose( uint64_t mask, pthread_t thread, const char* thread_name ); void setThreadRealtimePrio_Verbose( int realtimePrio, const char* thread_name );
To get access to the FIX Antenna HFT threads the Framework registers ThreadsPoolListener
during the engine initialization:
Engine::FixEngine::InitParameters params; //... params.threadPoolListener_ = listener; engine = Engine::FixEngine::init( params );
and setups properties in the callback:
void onNewThreadCreated(Utils::ThreadAttrs& threadAttrs) { threadAttrs.realtimePrio_ = prio; threadAttrs.affinityMask_ = mask; //...
The engine initialization is performed in FixConnectionMgr::init()
and ThreadsPoolListener
interface is implemented in the FixConnectionMgr.
The Framework also locks pages in RAM using mlockall( MCL_FUTURE ) at its launch and disables OpenOnload for old style dispatcher in onNewThreadCreated
callback.
Remote console
The FIX Antenna HFT provides interface for implementation of remote console for telnet client.
The application should register a command handler function and enable flag in order to activate this functionality.
void handlerCmd( void* sock, char* cmd_buf ) { if( strcmp(cmd_buf, "hello") == 0 ) { writeToSocket( (Socket*)sock, "echo" ); } } //... g_platformProps.TCPCmdShell_Enable = true; platform_initialize(pLogCategory, logLevel, exitHandler, handlerCmd);
Threading models of message processing in FIX Antenna C++ and FIX Antenna HFT
FIX Antenna C++ default mode
The default mode is used for session if session's parameter SocketOpPriority is not configured or is set to EVEN
Engine::SessionExtraParameters extraParams; extraParams.socketPriority_ = EVEN_SOCKET_OP_PRIORITY; session = engine->createSession(application, senderCompID, targetCompID, fixVersion, &extraParams);
In this mode session's socket polling is performed by single dispatcher thread using select (Windows) or poll (Linux). Session's recvs/sends are handled in the worker pool.
FIX Antenna C++ aggressive mode
The aggressive mode can be independently enabled for sending and receiving parts of a session. The session's parameter SocketOnPriority should be set to AGGRESSIVE_SEND or AGGRESSIVE_RECEIVE or AGGRESSIVE_SEND_AND_RECEIVE accordingly.
Engine::SessionExtraParameters extraParams; extraParams.socketPriority_ = AGGRESSIVE_SEND_AND_RECEIVE_SOCKET_OP_PRIORITY; session = engine->createSession(application, senderCompID, targetCompID, fixVersion, &extraParams);
Each aggressive side of each session creates dedicated thread to poll sockets and process messages. The single dispatcher is not used.
The AGGRESSIVE_SEND and AGGRESSIVE_SEND_AND_RECEIVE modes also enable direct message emission from user thread to the socket if socket is ready to send data.
FIX Antenna HFT
The FIX Antenna HFT uses configurable number of the dispatchers (TCP reader threads) to poll sockets that are ready to read. The WSAPoll
method is used on Windows and epoll
method - on Linux. These dispatchers also perform reading from the sockets and all processing of the incoming messages.
FIX Antenna HFT always uses direct sending from the user threads to the sockets if sockets are ready. Otherwise (if sockets are busy), it uses the old style single dispatcher to poll sockets until a socket gets ready (vacant).
Also, in order to reduce number of system calls if the sending activity is very intensive, FIX Antenna HFT can merge outgoing messages into one sending buffer.
The FIX Antenna HFT dispatchers are designed to be OpenOnload friendly:
- all sockets are created in the same thread that will poll them for reading.
- If there is a new socket for incoming connection created as a result of listening socket processing, it is possible to put this new socket to the same dispatcher thread that processes the "parent" listening socket (solarflare stack per thread mode).
Another possibility is to chose dispatcher for the new incoming connection using RoundRobin. The behavior is managed by the boolean property TCPDispatcher.IncomingConnectionOnloadStackAffinity in the <HFT FA package>/samples/framework/bin/conf/engine.properties.test configuration file.
Listening sockets at the engine start and outgoing connections always use RoundRobin.
Accessing FIX Antenna HFT LiteFixMessage fields
The root fields in the LiteFixMessage can be accessed like in FIX Antenna C++:
Parser::LiteFixMessage* msg = Engine::FIXMsgProcessor::singleton()->newLiteMessage(); msg->set( 8, "FIX.4.2" ); msg->set( 9, "000" ); msg->set( 35, "BF" ); msg->set( 49, senderCompID ); msg->set( 56, targetCompID ); msg->set( 34, "000000000" ); msg->set( 52, "20140101-01:01:01.001" ); Engine::FIXFieldValue val; bool exist = msg->get(FIXFields::SenderCompID, &val);
Unlike common FIXMessage, the LiteFixMessage doesn't have hierarchical access to the fields in groups. LiteFixMessage has flat structure. Hence, the range of fields, within which required field should be looked for, has to be bound by start-end indexes explicitly and manually:
Parser::LiteFixMessage& orderMsg = msg; const int leadfield = 552; int leadindex = orderMsg.getTagIndex(leadfield); if( leadindex != orderMsg.NOTFOUND ) { const int startfield = 11; int startindex = orderMsg.getTagIndex(startfield, leadindex + 1); int endindex; for( int num = orderMsg.getTagAsIntAtIndex(leadindex); 0 < num && startindex != orderMsg.NOTFOUND; startindex = endindex, num-- ) { endindex = orderMsg.getTagIndex(startfield, startindex + 1); const int leadfield = 555; int leadindex = orderMsg.getTagIndex(leadfield); if( leadindex != orderMsg.NOTFOUND ) { int startfield = orderMsg.whichTagAtIndex( leadindex + 1 ); if( startfield == orderMsg.NOTFOUND ) throw std::logic_error("Bad MLEG group"); // MLEG Cross int startindexMleg = orderMsg.getTagIndex(startfield, leadindex + 1); int endindexMleg; for( int num = orderMsg.getTagAsIntAtIndex(leadindex); 0 < num && startindexMleg != orderMsg.NOTFOUND; startindexMleg = endindexMleg, num-- ) { endindexMleg = orderMsg.getTagIndex(startfield, startindexMleg + 1); int i = orderMsg.getTagIndex( FIXFields::Account, startindex, endindex ); if(i != orderMsg.NOTFOUND) { Engine::AsciiString account = orderMsg.getTagAsStringAtIndex(i); } } } } }