LastMsgSeqNumProcessed(369) processing with FAJ
Overview
LastMsgSeqNumProcessed(369) tag represents last MsgSeqNum(34) value received by the FIX engine and processed by the downstream application, such as trading engine or order routing system.
It can be specified on every message sent. It is useful for detecting a backlog with a counterparty.
The changes were introduced into FIX Antenna Java to support LastMsgSeqNumProcessed(369) tag processing. Improved mechanism of User Message handlers gives customers a flexible way to write and register own handlers for specific cases.
Sending LastMsgSeqNumProcessed(369) tag with FIX Antenna Java
FIX Antenna Java provides a built-in way to fill LastMsgSeqNumProcessed(369) tag for all outgoing messages. It needs to switch on the option ‘includeLastProcessed’ in FIX session configuration to enable this feature:
# Include last processed sequence 369 tag in every message for FIX versions > 4.2 # default value is "false" includeLastProcessed=true
Processing LastMsgSeqNumProcessed(369) tag
FIX Antenna Java provides a mechanism for extending engine’ message processing logic – user message handlers. It allows customizing engine’ behavior to different environments. This way also allows implementing processing of LastMsgSeqNumProcessed(369) tag for all incoming messages.
User Message handlers
User Message handlers register in FIX engine’ handlers chain to process all incoming messages. User Message handlers are placed in the chain before typed System Session message handlers but after Global System message handlers.
There 2 ways to register User Message handler:
- via engine/session configuration file
- with new API
Define User Message handler in configuration
To register User Message handler it needs to specify following lines in "fixengine.properties" file:
user.messagehandler.global.0=<user message handler 1 full class name> user.messagehandler.global.1=<user message handler 2 full class name>
This approach is simple but does not give enough flexibility and doesn’t allow to link handler with the rest application.
Register User Message handler with new API
New API (started from 2.21.0) can be used to register user message handler via code:
session.addUserGlobalMessageHandler(userMessageHandler1); session.addUserGlobalMessageHandler(userMessageHandler2);
User Message handler classes need to be extended from new AbstractUserGlobalMessageHandler class and implement processMessage(FIXFieldList message) method for user logic:
public class SampleUserMessageHandler extends AbstractUserGlobalMessageHandler { /** * Process input message in user message handler * * @param message * @return true - to process next handler in chain * false - stop further chain handlers processing */ public boolean processMessage(FIXFieldList message) { // user logic here to process incoming messages return true; // to process next handlers in chain // return false; // stop processing message in further chain handlers } }
This approach is more flexible and allows creating handlers with initial parameters or dynamically links them with other application parts.
Tag 369 processing with User Message handler mechanism
FIX Antenna Java provides implementations for pair of LastMsgSeqNumProcessed(369) tag processing cases:
- automatic sequence synchronization on Logout message by LastMsgSeqNumProcessed(369) tag
- slow consumer detection based on LastMsgSeqNumProcessed(369) tag values
Automatic sequence synchronization on Logout
Following sample can be used to synchronize sequences based on LastMsgSeqNumProcessed(369) tag value in Logout with User Message handler. Implementation of LastProcessedSequenceSyncMessageHandler is available started from FIX Antenna Java 2.21.0:
package com.epam.fixengine.session.messagehandler.user; import com.epam.fix.message.FIXFieldList; import com.epam.fix.message.constants.FIXT11; import com.epam.fixengine.session.messagehandler.AbstractUserGlobalMessageHandler; public class LastProcessedSequenceSyncMessageHandler extends AbstractUserGlobalMessageHandler { private final byte[] LOGOUT_MSG_TYPE = "5".getBytes(); @Override public boolean processMessage(FIXFieldList message) { if (message.hasTagValue(FIXT11.Header.LastMsgSeqNumProcessed)) { if (message.isTagValueEqual(FIXT11.Header.MsgType, LOGOUT_MSG_TYPE)) { try { getFIXSession().setOutSeqNum(message.getTagValueAsLong(FIXT11.Header.LastMsgSeqNumProcessed)); } catch (Exception ex) { logErrorToSession("Error to reset seq number for tag 369 in Logout", ex); } } } return true; } }
This handler can be registered through:
- fixengine.properties:
user.messagehandler.global.0=com.epam.fixengine.session.messagehandler.user.LastProcessedSequenceSyncMessageHandler
- API:
session.addUserGlobalMessageHandler(new LastProcessedSequenceSyncMessageHandler());
Slow consumer detection based on LastMsgSeqNumProcessed(369) tag values
Following sample can be used to check and detect slow consumer case based on LastMsgSeqNumProcessed(369) tag processing. Provided LastProcessedSlowConsumerCheckerMessageHandler compare session’ outgoing sequence number with received LastMsgSeqNumProcessed(369) tag value and report about a slow consumer in case of exceeding the provided threshold:
package com.epam.fixengine.session.messagehandler.user; import com.epam.fix.message.FIXFieldList; import com.epam.fix.message.constants.FIXT11; import com.epam.fixengine.session.AbstractFIXSession; import com.epam.fixengine.session.FIXSessionSlowConsumerListener; import com.epam.fixengine.session.SlowConsumerReason; import com.epam.fixengine.session.messagehandler.AbstractUserGlobalMessageHandler; public class LastProcessedSlowConsumerCheckerMessageHandler extends AbstractUserGlobalMessageHandler { private int gapSize; public LastProcessedSlowConsumerCheckerMessageHandler(final int gapSize) { this.gapSize = gapSize; } @Override public boolean processMessage(FIXFieldList message) { if (message.hasTagValue(FIXT11.Header.LastMsgSeqNumProcessed)) { FIXSessionSlowConsumerListener slowConsumerListener = getSlowConsumerListener(); if (slowConsumerListener != null) { long current = getFIXSession().getOutSeqNum(); long processed = message.getTagValueAsLong(FIXT11.Header.LastMsgSeqNumProcessed); if (current - processed > gapSize) { slowConsumerListener.onSlowConsumerDetected(SlowConsumerReason.LAST_PROCESSED_SEQ_RECEIVED, current, processed); } } } return true; } private FIXSessionSlowConsumerListener getSlowConsumerListener() { return ((AbstractFIXSession) getFIXSession()).getSlowConsumerListener(); } }
LastProcessedSlowConsumerCheckerMessageHandlern is also available started from FIX Antenna Java 2.21.0.
To enable this logic you need to create handler and register it with API:
// prepare user message handler instance with gap size in sequences to controll LastProcessedSlowConsumerCheckerMessageHandler userHandler = new LastProcessedSlowConsumerCheckerMessageHandler(100); // set slow consumer listener session.setSlowConsumerListener((reason, expected, current) -> { // user logic to react on slow consumer events System.out.println("Slow consumer detected. Session: " + session.getSessionParameters().getSessionID()); }); // register user message handler in session session.addUserGlobalMessageHandler(userHandler);