Transport adapter API
Implementation
The FEJ routing server lets you integrate custom third-party transports and use them for routing messages. For this purpose, the server provides several interfaces to implement:
Endpoint
is the basic interface. It represents an entity that the routing engine can uniquely identify for receiving or delivering messages. EndpointParams
provides the unique ID of that endpoint and other additional attributes that may be useful for the routing logic.
public interface Endpoint {
    EndpointParams getParams();
}
The SourceEndpoint
interface represents a provider of messages for the routing engine.
public interface SourceEndpoint extends Endpoint {
    void setListener(SourceMessageListener listener);
}
It offers the ability to register listeners of the SourceMessageListener
 type for processing incoming messages from external systems.
public interface SourceMessageListener {
    void onNewMessage(FIXFieldList message);
}
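As an illustration, the sketch below shows one way a custom source could hand incoming messages to the routing engine. The three interfaces repeat the declarations above so that the snippet compiles on its own. FIXFieldList and EndpointParams are reduced to minimal stand-ins (including the assumed getId() accessor), and CallbackSourceEndpoint with its publish method is a hypothetical name, not part of FEJ.

```java
// Stand-ins for FEJ types (assumptions): the real classes carry far more state.
final class FIXFieldList {
    final String raw;
    FIXFieldList(String raw) { this.raw = raw; }
}

final class EndpointParams {
    private final String id;
    EndpointParams(String id) { this.id = id; }
    String getId() { return id; }
}

// Interfaces as described in the text above.
interface Endpoint { EndpointParams getParams(); }
interface SourceMessageListener { void onNewMessage(FIXFieldList message); }
interface SourceEndpoint extends Endpoint { void setListener(SourceMessageListener listener); }

// Hypothetical source: an external system calls publish(),
// and the endpoint forwards the message to the registered listener.
final class CallbackSourceEndpoint implements SourceEndpoint {
    private final EndpointParams params;
    private volatile SourceMessageListener listener;

    CallbackSourceEndpoint(String id) { this.params = new EndpointParams(id); }

    @Override
    public EndpointParams getParams() { return params; }

    @Override
    public void setListener(SourceMessageListener listener) { this.listener = listener; }

    /** Returns false when no listener is registered yet, so the caller can decide what to do. */
    boolean publish(FIXFieldList message) {
        SourceMessageListener current = listener; // read once to avoid a race with setListener
        if (current == null) {
            return false;
        }
        current.onNewMessage(message);
        return true;
    }
}
```

In this sketch the message is dropped (and reported as such) until a listener is attached; a real adapter might buffer or reject messages instead.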
The DestinationEndpoint
interface represents a target of the routing rules. It allows you to pass the routed message to a certain system.
public interface DestinationEndpoint extends Endpoint {
    /**
     * Send a {@link FIXFieldList} to this adapter. If the message is sent successfully,
     * the method returns {@code true}. If the message cannot be sent due to a
     * non-fatal reason, the method returns {@code false}. The method may also
     * throw a RuntimeException in case of non-recoverable errors.
     * <p>This method may block indefinitely, depending on the implementation.
     *
     * @param message the message to send
     * @return whether or not the message was sent
     */
    boolean send(FIXFieldList message);
}
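The contract of send distinguishes three outcomes: true, false for a non-fatal failure, and a RuntimeException for a non-recoverable one. A caller might react to them as in the following sketch. It assumes a trimmed-down DestinationEndpoint (only the send method) and a stand-in FIXFieldList so that it compiles on its own; RetryingSender is a hypothetical helper, not an FEJ class.

```java
// Stand-in for the FEJ message type (assumption).
final class FIXFieldList {
    final String raw;
    FIXFieldList(String raw) { this.raw = raw; }
}

// Trimmed to the single method that matters here; the real interface also extends Endpoint.
interface DestinationEndpoint {
    boolean send(FIXFieldList message);
}

// Hypothetical helper: retries non-fatal failures and lets fatal errors propagate.
final class RetryingSender {
    private final DestinationEndpoint destination;
    private final int maxAttempts;

    RetryingSender(DestinationEndpoint destination, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        this.destination = destination;
        this.maxAttempts = maxAttempts;
    }

    /**
     * Returns the number of the attempt that succeeded, or -1 if every attempt
     * returned false. A RuntimeException from the endpoint is not caught,
     * because the contract marks it as non-recoverable.
     */
    int send(FIXFieldList message) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (destination.send(message)) {
                return attempt;
            }
        }
        return -1;
    }
}
```

Because send may block indefinitely, a production caller would probably also want a timeout or a dedicated thread; the sketch leaves that out.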
There is also a mechanism for registering such sources and destinations with the server. After registration, the endpoints are accessible to the routing engine. Sources and destinations are registered independently, so you can register a source endpoint and a destination endpoint with the same ID. This is especially important for bidirectional transports like FIX, where inbound and outbound connections are identified by the same parameters. The BidirectionalEndpoint
interface exists for transports like this.
public interface BidirectionalEndpoint extends ConsumerEndpoint, ProducerEndpoint { }
Sources and destinations are registered through two separate interfaces:
public interface SourceEndpointRegistry {
    void registerConsumer(SourceEndpoint consumer);
    void removeConsumer(String id);
}

public interface DestinationEndpointRegistry {
    void registerProducer(DestinationEndpoint producer);
    void removeProducer(String id);
}
Both of them are implemented by the EndpointRegistryAdapter
class. It is accessible via the Spring config:
<bean id="endpointRegistry" class="com.epam.fej.routing.endpoint.EndpointRegistryAdapter"
      c:destinationsRegister-ref="destinationsRegister"/>
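To make the independence of the two registries concrete, here is a minimal in-memory sketch. It is not the real EndpointRegistryAdapter: InMemoryEndpointRegistry and its hasSource/hasDestination helpers are hypothetical, the endpoint interfaces are trimmed to getParams(), and EndpointParams is a stand-in with an assumed getId() accessor.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Stand-in for the FEJ class (assumption).
final class EndpointParams {
    private final String id;
    EndpointParams(String id) { this.id = id; }
    String getId() { return id; }
}

// Endpoint interfaces trimmed to what the registry needs.
interface Endpoint { EndpointParams getParams(); }
interface SourceEndpoint extends Endpoint { }
interface DestinationEndpoint extends Endpoint { }

// Registry interfaces as listed in the text above.
interface SourceEndpointRegistry {
    void registerConsumer(SourceEndpoint consumer);
    void removeConsumer(String id);
}

interface DestinationEndpointRegistry {
    void registerProducer(DestinationEndpoint producer);
    void removeProducer(String id);
}

// Hypothetical registry: two separate maps, so the same ID can appear in both.
final class InMemoryEndpointRegistry implements SourceEndpointRegistry, DestinationEndpointRegistry {
    private final Map<String, SourceEndpoint> sources = new ConcurrentHashMap<>();
    private final Map<String, DestinationEndpoint> destinations = new ConcurrentHashMap<>();

    @Override
    public void registerConsumer(SourceEndpoint consumer) {
        sources.put(consumer.getParams().getId(), consumer);
    }

    @Override
    public void removeConsumer(String id) { sources.remove(id); }

    @Override
    public void registerProducer(DestinationEndpoint producer) {
        destinations.put(producer.getParams().getId(), producer);
    }

    @Override
    public void removeProducer(String id) { destinations.remove(id); }

    boolean hasSource(String id) { return sources.containsKey(id); }

    boolean hasDestination(String id) { return destinations.containsKey(id); }
}
```

Removing the source registered under a given ID leaves the destination with the same ID untouched, which is the behavior a bidirectional transport relies on.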
The Transport adapter API supports the extension of the following transport implementations:
JMS endpoint
The JMS endpoint is an implementation of the transport adapter for JMS.
There are several ways to add JMS connectivity into the FEJ container. The sysconf/fej-jms.xml
configuration file already contains the basic configuration for the JMS adapter:
<bean id="jmsProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean"
      p:location="classpath:jms-adaptor.properties"/>

<bean id="jmsConfig" class="com.epam.fixengine.jms.config.Config"
      c:prefix="jms.adaptor"
      c:properties-ref="jmsProperties">
</bean>

<bean id="jmsConfigRegister" class="com.epam.fej.jms.DefaultJmsConfigsRegister"
      p:jmsManager-ref="jmsAdaptorManager"
      c:config-ref="jmsConfig"
      init-method="init"/>

<bean id="jmsClientFactory" class="com.epam.fixengine.jms.client.JMSClientFactory"
      factory-method="getInstance"/>

<bean id="jmsAdaptorManager" class="com.epam.fej.jms.JmsAdapterManager"
      c:endpointRegistry-ref="endpointRegistry"
      c:clientFactory-ref="jmsClientFactory"
      depends-on="rulesConfigManager"/>
The jms-adaptor.properties file contains parameters for JMS producers and consumers (FIXAJ JMS Adapter properties). The jmsConfigRegister
bean (com.epam.fej.jms.DefaultJmsConfigsRegister
) is responsible for loading JMS session contexts (SessionContext) from the configuration file and registering them with jmsAdaptorManager
(com.epam.fej.jms.JmsAdaptorManager) for the routing engine. JmsAdaptorManager
builds source and destination endpoint adapters from the given SessionContext objects and registers them on the server.
If you want to use your own configuration factory, you can also use the JmsManager
implementation to build and register SessionContext instances.
DefaultJmsConfigsRegister
produces SessionContext
via the JmsContextFactory
implementation. By default, it uses the com.epam.fej.jms.JndiJmsSessionContextFactory
implementation, but you can set your own implementation via DefaultJmsConfigsRegister.setJmsContextFactory(JmsContextFactory jmsContextFactory)
. You can also use com.epam.fej.jms.SimpleJmsContextFactory
with your definition of javax.jms.ConnectionFactory.
public SessionContext createSessionContext(JmsConfig config) {
    final ConnectionInfo connectionInfo = config.getConnectionInfo();
    final SessionInfo sessionInfo = config.getSessionInfo();
    return new SessionContext(connectionInfo, sessionInfo, null,
            jmsContextFactory.createContext(config));
}
Please note that the ConnectionInfo
and SessionInfo
classes support the loading of custom properties from configuration files:
final Properties additionalProperties = connectionInfo.getAdditionalProperties();
final Map<String, Object> additionalParams = sessionInfo.getAdditionalParams();
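Custom values obtained this way are untyped, so a context factory will usually want to read them defensively. The helper below is a sketch of that idea using only the JDK; AdditionalSettings and the key names used in the usage note are hypothetical and not part of FEJ.

```java
import java.util.Map;
import java.util.Properties;

// Hypothetical helper for reading typed values from the additional properties and parameters.
final class AdditionalSettings {
    private AdditionalSettings() {}

    /** Reads an integer property, falling back to the default when it is absent or malformed. */
    static int intProperty(Properties props, String key, int defaultValue) {
        String raw = props.getProperty(key);
        if (raw == null) {
            return defaultValue;
        }
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            return defaultValue;
        }
    }

    /** Reads a flag from a parameter map whose values may be Boolean or String. */
    static boolean flag(Map<String, Object> params, String key, boolean defaultValue) {
        Object value = params.get(key);
        if (value == null) {
            return defaultValue;
        }
        if (value instanceof Boolean) {
            return (Boolean) value;
        }
        return Boolean.parseBoolean(value.toString().trim());
    }
}
```

With it, a factory could write, for example, AdditionalSettings.intProperty(connectionInfo.getAdditionalProperties(), "reconnect.delay.ms", 1000), where "reconnect.delay.ms" is a made-up key.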
Custom connection factory instead of JNDI
A custom JMS connection factory can be used in either of two ways:
- Declare a ConnectionFactory in Spring and inject it into SimpleJmsContextFactory:

<bean id="jmsConfigRegister" class="com.epam.fej.jms.DefaultJmsConfigsRegister"
      p:jmsManager-ref="jmsAdaptorManager"
      p:jmsContextFactory-ref="jmsContextFactory"
      c:config-ref="jmsConfig"
      init-method="init"/>

<!-- brokerURL is a plain String value, so it is set without the -ref suffix -->
<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"
      c:brokerURL="${jms.broker.url}"/>

<bean id="jmsContextFactory" class="com.epam.fej.jms.SimpleJmsContextFactory"
      c:connectionFactory-ref="jmsConnectionFactory"/>
- Implement your own JmsContextFactory and pass it to DefaultJmsConfigsRegister:

public class ActiveMqJmsContextFactory implements JmsContextFactory {
    @Override
    public JMSContext createContext(JmsConfig config) {
        // Build the ActiveMQ connection factory from the provider URL in the JMS config
        return new SimpleJmsContext(
                new ActiveMQConnectionFactory(config.getConnectionInfo().getProviderURL()),
                config.getConnectionInfo());
    }
}

<bean id="activeMqContextFactory" class="com.epam.fej.jms.ActiveMqJmsContextFactory"/>

<bean id="jmsConfigRegister" class="com.epam.fej.jms.DefaultJmsConfigsRegister"
      p:jmsManager-ref="jmsAdaptorManager"
      p:jmsContextFactory-ref="activeMqContextFactory"
      c:config-ref="jmsConfig"
      init-method="init"/>
Kafka endpoint
The Kafka endpoint is a FEJ transport adapter that provides connectivity to the Kafka streaming platform. Its implementation is based on the FIX Antenna Java Kafka adapter and it uses the Kafka Producer and Consumer API internally.
Inside the FIXEdge Java server, each Kafka producer instance is represented as a destination endpoint and each consumer instance as a source endpoint. Once the server has started, they are available in the routing rules for accepting messages from Kafka topics and sending messages to Kafka.
Configuration
Basic Kafka parameters are described here.
There are several ways to add Kafka connectivity into the FIXEdge/J container. The sysconf/fej-kafka.xml configuration file already contains the basic configuration for the Kafka adapter:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:c="http://www.springframework.org/schema/c"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

    <bean id="kafkaProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean"
          p:location="classpath:kafka-adaptor.properties"/>

    <bean id="kafkaConfig" class="com.epam.fixengine.kafka.config.Config"
          c:prefix="kafka"
          c:properties-ref="kafkaProperties">
    </bean>

    <bean id="kafkaConfigRegister" class="com.epam.fej.kafka.DefaultKafkaConfigsRegister"
          p:clientManager-ref="kafkaClientManager"
          c:config-ref="kafkaConfig"/>

    <bean id="kafkaClientFactory" class="com.epam.fixengine.kafka.client.ClientFactory"
          factory-method="getInstance"/>

    <bean id="kafkaClientManager" class="com.epam.fej.kafka.DefaultKafkaClientManager"
          c:endpointRegistry-ref="endpointRegistry"
          c:clientFactory-ref="kafkaClientFactory"
          c:messageEventPool-ref="messageEventPool"
          depends-on="rulesConfigManager"/>
</beans>
The kafkaConfigRegister
bean is responsible for loading Kafka session contexts (SessionContext) from the configuration file and registering them with kafkaClientManager
 (com.epam.fej.kafka.DefaultKafkaClientManager
) for the routing engine. The kafkaClientManager
bean builds source and destination endpoint adapters from the given SessionContext objects and registers them on the server.
Refer to the Kafka transport configuration.
Business Rules
When FIXEdge/J acts as a Producer and forwards messages to Kafka topics, please use the following business rule:
[
    rule("Route NewOrder to Kafka")
        .sourceCondition({
            // static filter for session
            source -> source.id == "fix_client"
        })
        .condition({
            // context filter - apply this rule for New Order - Single (D) messages
            ctx -> ctx.getMessage().getTagValueAsString(35) == "D"
        })
        .action({
            // action for rule - send message to Kafka client with id 'KProducer'
            ctx ->
                routingContext.getDestinationById("KProducer").send(ctx.messageEvent)
                ctx.exit()
        })
        .build()
]
When FIXEdge/J acts as a Consumer and subscribes to messages in Kafka topics, please use the following business rule:
[
    rule("Route all messages from Kafka")
        .sourceCondition({
            params -> return params.getGroups().contains("kafka_consumer")
        })
        .action({
            ctx ->
                def msg = ctx.getMessage()
                logger.info("Processing message '{}' has been started", msg)
                def sessionDestination = stringValue(msg, Header.DeliverToCompID)
                send(routingContext, ctx, sessionDestination)
                ctx.exit()
                logger.info("Processing message '{}' has been finished: {}", msg, new java.util.Date().toInstant())
        })
        .build()
]
Camel destination endpoint
The Camel framework is integrated into FIXEdge Java as a destination endpoint that allows publishing messages by using the integrated components.
FIXEdge Java provides a definition of the Camel source endpoint ("direct:routingRule"
), which can receive messages from the FIXEdge business layer.
Refer to the official Camel documentation for details on extending the Camel configuration further.
SMTP destination endpoint
The JavaMail framework is integrated into FIXEdge Java as SMTP destination endpoints and enables sending emails to external SMTP servers.
Configuration
The Spring configuration is used to configure SMTP destination endpoints with their own properties for connection to external SMTP servers and converters to prepare automatic emails from FIX messages.
Example of Spring configuration
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:c="http://www.springframework.org/schema/c"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">

    <bean id="smtpProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean"
          p:location="classpath:smtp-adaptor.properties"/>

    <bean id="smtpConfigProvider" class="com.epam.fej.smtp.SMTPConfigProvider"
          c:properties-ref="smtpProperties">
    </bean>

    <!-- util:properties requires the util namespace declared on the beans element -->
    <util:properties id="velocityProperties">
        <prop key="resource.loader">class</prop>
        <prop key="class.resource.loader.class">org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader</prop>
    </util:properties>

    <bean id="velocityEngine" class="org.apache.velocity.app.VelocityEngine">
        <constructor-arg ref="velocityProperties"/>
    </bean>

    <bean id="simpleTemplateConverter" class="com.epam.fej.smtp.TemplateEmailConverter"
          scope="prototype"
          p:velocityEngine-ref="velocityEngine">
    </bean>

    <bean id="mimeTemplateConverter" class="com.epam.fej.smtp.TemplateMimeEmailConverter"
          scope="prototype"
          p:velocityEngine-ref="velocityEngine">
    </bean>

    <context:component-scan base-package="com.epam.fej.smtp"/>
</beans>
SMTPConfigProvider
prepares SMTP configuration objects from the smtp-adaptor.properties file to use them in the SMTPAdapterManager
for SMTP endpoint creation.
SMTPAdapterManager and SMTPDestinationEndpoint / SMTPMimeDestinationEndpoint configuration
SMTPAdapterManager
is created as a Spring configuration bean and, upon initialization, creates and registers SMTPDestinationEndpoint
or SMTPMimeDestinationEndpoint
objects, which are FIXEdge Java destination endpoints (for simple or MIME email messages) that can be used in routing rules to send emails to external SMTP servers.
Each simple or MIME SMTP destination endpoint internally uses:
- JavaMailSender - a prepared SMTP client that connects to the configured SMTP server;
- SimpleMailMessage - a prepared template with configured 'From/To/CC/BCC/Subject' values that are common to all emails from this endpoint;
- SimpleEmailConverter or MimeEmailConverter - a converter class with client logic that transforms a MessageEvent (FIX message) into an email message (sets the email text for this endpoint's emails).
For a list of SMTP endpoint properties, refer to the Configuring SMTP endpoints section.
Simple and MIME Converters
A converter class must implement either the SimpleEmailConverter
or the MimeEmailConverter
interface. Each interface works with one of two email message types: SimpleMailMessage (plain text without formatting) or MimeMailMessage (the message can contain HTML-formatted content).
public interface SimpleEmailConverter {
    SimpleMailMessage convert(MessageEvent from, SimpleMailMessage to);

    default void init(SMTPConfig config) {}
}

public interface MimeEmailConverter {
    MimeMailMessage convert(MessageEvent from, MimeMailMessage to) throws MessagingException;

    default void init(SMTPConfig config) {}
}
There are three ways to set the SMTP endpoint converter, listed in the order in which FEJ checks the corresponding properties:
- use the 'converterRef' property in the SMTP configuration: define a Spring bean in the FEJ config (implementing SimpleEmailConverter or MimeEmailConverter) and set a reference to it (you can define scope=prototype to get a separate instance for each reference)
- use the 'converterClass' property in the SMTP configuration: an instance of the converter class (implementing SimpleEmailConverter or MimeEmailConverter) is created when the SMTP endpoint is initialized
- use the default converter from Spring's defaultConverterFactory (implementing MimeEmailConverter)
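The lookup can be pictured roughly as follows. This is only a sketch under the assumption that the three options are tried in the order listed; ConverterResolution and its parameters are hypothetical and do not reflect the actual FEJ implementation. The bean lookup, class instantiation, and default factory are passed in as functions so that the snippet stays independent of Spring.

```java
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical helper, not FEJ code: checks the two configuration properties
// first and falls back to a default converter.
final class ConverterResolution {
    private ConverterResolution() {}

    static <T> T resolve(Map<String, String> smtpProperties,
                         Function<String, T> beanLookup,    // e.g. a Spring bean lookup by name
                         Function<String, T> classFactory,  // e.g. reflective instantiation
                         Supplier<T> defaultConverter) {
        String ref = smtpProperties.get("converterRef");
        if (ref != null && !ref.trim().isEmpty()) {
            return beanLookup.apply(ref.trim());
        }
        String className = smtpProperties.get("converterClass");
        if (className != null && !className.trim().isEmpty()) {
            return classFactory.apply(className.trim());
        }
        return defaultConverter.get();
    }
}
```

Under this assumption, a configuration that sets both 'converterRef' and 'converterClass' would end up with the referenced bean.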
A custom converter can implement the client's specific logic to set/change email fields (Subject/To/CC/BCC/Text).
package com.epam.fej.smtp;

import com.epam.fej.routing.MessageEvent;
import org.springframework.mail.SimpleMailMessage;

public class FIXToEmailConverter implements SimpleEmailConverter {
    @Override
    public SimpleMailMessage convert(MessageEvent from, SimpleMailMessage to) {
        if (from.getMessage() != null) {
            to.setText(from.getMessage().toPrintableString());
        } else {
            to.setText(from.toString());
        }
        return to;
    }
}

package com.epam.fej.smtp;

import com.epam.fej.routing.MessageEvent;
import javax.mail.MessagingException;
import org.springframework.mail.javamail.MimeMailMessage;

public class FIXToMimeEmailConverter implements MimeEmailConverter {
    @Override
    public MimeMailMessage convert(MessageEvent from, MimeMailMessage to) throws MessagingException {
        if (from.getMessage() != null) {
            to.getMimeMessageHelper().setText("<b>" + from.getMessage().toPrintableString() + "</b>", true);
        } else {
            to.getMimeMessageHelper().setText("<b>" + from.toString() + "</b>", true);
        }
        return to;
    }
}
Apache Velocity Template Converters
FEJ supports Apache Velocity templates for Simple and MIME converters.
The Velocity engine uses objects from MessageEvent headers in its templates:
- '$event' - in the template, this property points to a MessageEvent object
- '$source' - in the template, this property points to an object that implements the AppEvent interface (ServerStateEvent, FIXSessionStateEvent, SnFEvent, etc.)
@Override
public SimpleMailMessage convert(MessageEvent messageEvent, SimpleMailMessage mailMessage) {
    Serializable subject = messageEvent.getHeader(TemplateOptions.EVENT_HEADER_SUBJECT_PROPERTY);
    if (subject != null) {
        LOGGER.trace("Set subject from event header '{}': {}", EVENT_HEADER_SUBJECT_PROPERTY, subject);
        mailMessage.setSubject(subject.toString());
    }
    String template = (String) messageEvent.getHeader(EVENT_HEADER_TEMPLATE_PROPERTY);
    if (template != null) {
        LOGGER.trace("Use template from event header : {}", template);
    } else {
        LOGGER.trace("Use default template: {}", defaultTemplate);
        template = defaultTemplate;
    }
    if (template == null) {
        throw new IllegalStateException("Template isn't defined, configuration property '"
                + TEMPLATE_PROP + "' is empty");
    }
    mailMessage.setText(convertFromTemplate(template, messageEvent));
    return mailMessage;
}

@Override
public MimeMailMessage convert(MessageEvent messageEvent, MimeMailMessage mailMessage) throws MessagingException {
    Serializable subject = messageEvent.getHeader(EVENT_HEADER_SUBJECT_PROPERTY);
    if (subject != null) {
        LOGGER.trace("Set subject from event header '{}': {}", EVENT_HEADER_SUBJECT_PROPERTY, subject);
        mailMessage.getMimeMessageHelper().setSubject(subject.toString());
    }
    String template = (String) messageEvent.getHeader(EVENT_HEADER_TEMPLATE_PROPERTY);
    if (template != null) {
        LOGGER.trace("Use template from event header : {}", template);
    } else {
        LOGGER.trace("Use default template: {}", defaultTemplate);
        template = defaultTemplate;
    }
    if (template == null) {
        throw new IllegalStateException("Template isn't defined, configuration property '"
                + TEMPLATE_PROP + "' is empty");
    }
    mailMessage.getMimeMessageHelper().setText(convertFromTemplate(template, messageEvent), true);
    return mailMessage;
}
Examples of templates:
Simple Template Test FIX: $event.getMessage()
<b>MIME Template Test</b><br> <i>FIX:</i> $event.getMessage()<br>
Example of rules that send every FIXSessionStateEvent
to an SMTP destination endpoint using templates:
// send AppEvent with default template from config
eventRule("Sample event rule").
    eventType(FIXSessionStateEvent.class).
    condition({ event -> return true }).
    action({ event ->
        rc.getDestinationById("smtp-adapter").send(event)
    }).
    build(),

// send AppEvent with specific template
eventRule("Sample event rule").
    eventType(FIXSessionStateEvent.class).
    condition({ event -> return true }).
    action({ event ->
        rc.getDestinationById("smtp-adapter").send(event, "newTemplateForThisEvent.vm")
    }).
    build()