Transport adapter API

Implementation

The FEJ routing server lets you integrate custom third-party transports and use them for routing messages. For this purpose, the server provides several interfaces to implement:

Endpoint is a basic interface that represents an entity that the routing engine can uniquely identify for receiving or delivering messages. EndpointParams provides the unique ID of that endpoint and additional attributes that may be useful for the routing logic.

public interface Endpoint {
    EndpointParams getParams();
}

The SourceEndpoint interface represents a provider of messages for the routing engine.

public interface SourceEndpoint extends Endpoint {
    void setListener(SourceMessageListener listener);
}

It lets you register a listener of the SourceMessageListener type that processes incoming messages from external systems.

public interface SourceMessageListener {
    void onNewMessage(FIXFieldList message);
}
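
The following is a minimal sketch of a custom source, assuming the interfaces shown above. The stand-in types at the top are simplified placeholders so that the snippet compiles on its own; a real adapter would import the FEJ types instead. InMemorySourceEndpoint, its publish method, the raw field, and the getId accessor are hypothetical names that do not come from the FEJ API.

```java
// Simplified stand-ins for the FEJ types, declared only so the sketch compiles alone.
class FIXFieldList {
    final String raw; // hypothetical field holding the raw message text
    FIXFieldList(String raw) { this.raw = raw; }
}

class EndpointParams {
    private final String id;
    EndpointParams(String id) { this.id = id; }
    String getId() { return id; } // assumed accessor name
}

interface Endpoint {
    EndpointParams getParams();
}

interface SourceMessageListener {
    void onNewMessage(FIXFieldList message);
}

interface SourceEndpoint extends Endpoint {
    void setListener(SourceMessageListener listener);
}

// Hypothetical source: hands every message received from an external system to the listener.
class InMemorySourceEndpoint implements SourceEndpoint {
    private final EndpointParams params;
    private volatile SourceMessageListener listener;

    InMemorySourceEndpoint(String id) {
        this.params = new EndpointParams(id);
    }

    @Override
    public EndpointParams getParams() {
        return params;
    }

    @Override
    public void setListener(SourceMessageListener listener) {
        this.listener = listener;
    }

    // Called by the transport-specific code when a message arrives.
    // Returns false if no listener has been set yet, so the caller can decide what to do.
    boolean publish(FIXFieldList message) {
        SourceMessageListener current = listener;
        if (current == null) {
            return false;
        }
        current.onNewMessage(message);
        return true;
    }
}
```

In this sketch the transport code calls publish for every incoming message, and the listener set through setListener receives it.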

The DestinationEndpoint interface represents a target of the routing rules. It allows you to pass the routed message to a certain system.

public interface DestinationEndpoint extends Endpoint {
    /**
      * Send a {@link FIXFieldList} to this adapter. If the message is sent successfully,
      * the method returns {@code true}. If the message cannot be sent due to a
      * non-fatal reason, the method returns {@code false}. The method may also
      * throw a RuntimeException in case of non-recoverable errors.
      * <p>This method may block indefinitely, depending on the implementation.
      *
      * @param message the message to send
      * @return whether or not the message was sent
      */
     boolean send(FIXFieldList message);
}
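
The send contract described in the Javadoc above can be illustrated with a minimal sketch. It assumes the interfaces shown in this section and replaces the FEJ types with simplified stand-ins so that it compiles on its own. QueueDestinationEndpoint, its close method, and the getId accessor are hypothetical names. The bounded queue is only one possible choice of a non-fatal failure condition.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Simplified stand-ins for the FEJ types, declared only so the sketch compiles alone.
class FIXFieldList {
    final String raw; // hypothetical field holding the raw message text
    FIXFieldList(String raw) { this.raw = raw; }
}

class EndpointParams {
    private final String id;
    EndpointParams(String id) { this.id = id; }
    String getId() { return id; } // assumed accessor name
}

interface Endpoint {
    EndpointParams getParams();
}

interface DestinationEndpoint extends Endpoint {
    boolean send(FIXFieldList message);
}

// Hypothetical destination that buffers messages in a bounded in-memory queue.
class QueueDestinationEndpoint implements DestinationEndpoint {
    private final EndpointParams params;
    private final BlockingQueue<FIXFieldList> queue;
    private volatile boolean closed;

    QueueDestinationEndpoint(String id, int capacity) {
        this.params = new EndpointParams(id);
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    @Override
    public EndpointParams getParams() {
        return params;
    }

    @Override
    public boolean send(FIXFieldList message) {
        if (closed) {
            // Non-recoverable: this endpoint will never accept messages again.
            throw new IllegalStateException("Endpoint " + params.getId() + " is closed");
        }
        // offer() does not block; it returns false when the queue is full (non-fatal).
        return queue.offer(message);
    }

    void close() {
        closed = true;
    }
}
```

A caller can retry or reroute when send returns false, while an exception signals that the endpoint should no longer be used.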

The server also provides a mechanism for registering such sources and destinations. After registration, the endpoints are accessible to the routing engine. Sources and destinations are registered independently, so you can register a source endpoint and a destination endpoint with the same ID. This is especially important for bidirectional transports like FIX, where the inbound and outbound connections are identified by the same parameters. The BidirectionalEndpoint interface exists for such transports.

public interface BidirectionalEndpoint extends SourceEndpoint, DestinationEndpoint {
}

Sources and destinations are registered through two separate interfaces:

public interface SourceEndpointRegistry {
    void registerConsumer(SourceEndpoint consumer);
    void removeConsumer(String id);
}

public interface DestinationEndpointRegistry {
    void registerProducer(DestinationEndpoint producer);
    void removeProducer(String id);
}
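
To illustrate why independent registration matters, here is a minimal sketch of a registry that keeps sources and destinations in separate maps. It does not describe how EndpointRegistryAdapter is actually implemented. InMemoryEndpointRegistry, FixSessionEndpoint, the hasSource/hasDestination helpers, and the getId accessor are hypothetical, and the FEJ types are replaced by simplified stand-ins so that the snippet compiles on its own.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified stand-ins for the FEJ types, declared only so the sketch compiles alone.
class FIXFieldList { }

class EndpointParams {
    private final String id;
    EndpointParams(String id) { this.id = id; }
    String getId() { return id; } // assumed accessor name
}

interface Endpoint { EndpointParams getParams(); }
interface SourceMessageListener { void onNewMessage(FIXFieldList message); }
interface SourceEndpoint extends Endpoint { void setListener(SourceMessageListener listener); }
interface DestinationEndpoint extends Endpoint { boolean send(FIXFieldList message); }

interface SourceEndpointRegistry {
    void registerConsumer(SourceEndpoint consumer);
    void removeConsumer(String id);
}

interface DestinationEndpointRegistry {
    void registerProducer(DestinationEndpoint producer);
    void removeProducer(String id);
}

// Hypothetical registry: one map per direction, so the same ID can be used once on each side.
class InMemoryEndpointRegistry implements SourceEndpointRegistry, DestinationEndpointRegistry {
    private final Map<String, SourceEndpoint> sources = new ConcurrentHashMap<>();
    private final Map<String, DestinationEndpoint> destinations = new ConcurrentHashMap<>();

    @Override
    public void registerConsumer(SourceEndpoint consumer) {
        sources.put(consumer.getParams().getId(), consumer);
    }

    @Override
    public void removeConsumer(String id) {
        sources.remove(id);
    }

    @Override
    public void registerProducer(DestinationEndpoint producer) {
        destinations.put(producer.getParams().getId(), producer);
    }

    @Override
    public void removeProducer(String id) {
        destinations.remove(id);
    }

    boolean hasSource(String id) { return sources.containsKey(id); }
    boolean hasDestination(String id) { return destinations.containsKey(id); }
}

// Hypothetical bidirectional endpoint, such as a FIX session used in both directions.
class FixSessionEndpoint implements SourceEndpoint, DestinationEndpoint {
    private final EndpointParams params;
    private volatile SourceMessageListener listener;

    FixSessionEndpoint(String id) { this.params = new EndpointParams(id); }

    @Override
    public EndpointParams getParams() { return params; }

    @Override
    public void setListener(SourceMessageListener listener) { this.listener = listener; }

    @Override
    public boolean send(FIXFieldList message) { return true; } // a real session would write to the wire
}
```

Registering the same FixSessionEndpoint through both registerConsumer and registerProducer makes it available under one ID in both directions, and removing it as a source leaves the destination registration untouched.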

Both of them are implemented by the EndpointRegistryAdapter class. It is accessible via the Spring config:

<bean id="endpointRegistry" class="com.epam.fej.routing.endpoint.EndpointRegistryAdapter"
         c:destinationsRegister-ref="destinationsRegister"/>

The Transport adapter API supports the extension of the following transport implementations:

JMS endpoint

The JMS endpoint is an implementation of the transport adapter.

There are several ways to add JMS connectivity into the FEJ container. The sysconf/fej-jms.xml configuration file already contains the basic configuration for the JMS adapter:

<bean id="jmsProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean"
      p:location="classpath:jms-adaptor.properties"/>

<bean id="jmsConfig" class="com.epam.fixengine.jms.config.Config"
      c:prefix="jms.adaptor"
      c:properties-ref="jmsProperties">
</bean>

<bean id="jmsConfigRegister" class="com.epam.fej.jms.DefaultJmsConfigsRegister"
      p:jmsManager-ref="jmsAdaptorManager"
      c:config-ref="jmsConfig" init-method="init"/>

<bean id="jmsClientFactory" class="com.epam.fixengine.jms.client.JMSClientFactory" factory-method="getInstance"/>

<bean id="jmsAdaptorManager" class="com.epam.fej.jms.JmsAdapterManager"
      c:endpointRegistry-ref="endpointRegistry"
      c:clientFactory-ref="jmsClientFactory"
      depends-on="rulesConfigManager"/>

The jms-adaptor.properties file contains parameters for JMS producers and consumers (FIXAJ JMS Adapter properties). The jmsConfigRegister bean (com.epam.fej.jms.DefaultJmsConfigsRegister) is responsible for loading JMS session contexts (SessionContext) from the configuration file and registering them with jmsAdaptorManager (com.epam.fej.jms.JmsAdaptorManager) for the routing engine. JmsAdaptorManager builds source and destination endpoint adapters from the given SessionContext objects and registers them on the server.

If you want to use your own configuration factory, you can also use a JmsManager implementation for building and registering SessionContext instances.

DefaultJmsConfigsRegister produces SessionContext instances via a JmsContextFactory implementation. By default, it uses com.epam.fej.jms.JndiJmsSessionContextFactory, but you can set your own implementation via DefaultJmsConfigsRegister.setJmsContextFactory(JmsContextFactory jmsContextFactory). You can also use com.epam.fej.jms.SimpleJmsContextFactory with your own javax.jms.ConnectionFactory.

public SessionContext createSessionContext(JmsConfig config) {
    final ConnectionInfo connectionInfo = config.getConnectionInfo();
    final SessionInfo sessionInfo = config.getSessionInfo();
    return new SessionContext(connectionInfo, sessionInfo, null, jmsContextFactory.createContext(config));
}

Please note that the ConnectionInfo and SessionInfo classes support the loading of custom properties from configuration files:

final Properties additionalProperties = connectionInfo.getAdditionalProperties();

final Map<String, Object> additionalParams = sessionInfo.getAdditionalParams();
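
As a minimal sketch of how such custom values might be consumed, the helper below reads one entry from each container and falls back to a default when the entry is missing. The class name CustomJmsSettings, the keys clientId and priority, and the default values are hypothetical; use the names that your own configuration defines.

```java
import java.util.Map;
import java.util.Properties;

// Hypothetical helper for reading custom values loaded from the configuration files.
class CustomJmsSettings {
    static final String CLIENT_ID_KEY = "clientId"; // hypothetical key
    static final String PRIORITY_KEY = "priority";  // hypothetical key

    // Reads a string property, falling back to a default when the key is absent.
    static String clientId(Properties additionalProperties) {
        return additionalProperties.getProperty(CLIENT_ID_KEY, "fej-default");
    }

    // Session parameters arrive as Object values, so convert defensively.
    static int priority(Map<String, Object> additionalParams) {
        Object value = additionalParams.get(PRIORITY_KEY);
        if (value == null) {
            return 4; // arbitrary default chosen for this sketch
        }
        if (value instanceof Number) {
            return ((Number) value).intValue();
        }
        return Integer.parseInt(value.toString().trim());
    }
}
```

A custom JmsContextFactory could call CustomJmsSettings.clientId(connectionInfo.getAdditionalProperties()) and CustomJmsSettings.priority(sessionInfo.getAdditionalParams()) while building its context.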

Custom connection factory instead of JNDI

A custom JMS connection factory can be used in one of the following ways:

  1. Declare a ConnectionFactory in Spring and inject it into SimpleJmsContextFactory:

    <bean id="jmsConfigRegister" class="com.epam.fej.jms.DefaultJmsConfigsRegister"
          p:jmsManager-ref="jmsAdaptorManager"
          p:jmsContextFactory-ref="jmsContextFactory"
          c:config-ref="jmsConfig" init-method="init"/>
    
    <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"
          c:brokerURL="${jms.broker.url}"/>
    
    <bean id="jmsContextFactory" class="com.epam.fej.jms.SimpleJmsContextFactory"
          c:connectionFactory-ref="jmsConnectionFactory"/>

  2. Implement your own JmsContextFactory and pass it to DefaultJmsConfigsRegister:

    public class ActiveMqJmsContextFactory implements JmsContextFactory {
        @Override
        public JMSContext createContext(JmsConfig config) {
            return new SimpleJmsContext(new ActiveMQConnectionFactory(
                    config.getConnectionInfo().getProviderURL()), config.getConnectionInfo());
        }
    }

    <bean id="activeMqContextFactory" class="com.epam.fej.jms.ActiveMqJmsContextFactory"/>
    
    <bean id="jmsConfigRegister" class="com.epam.fej.jms.DefaultJmsConfigsRegister"
          p:jmsManager-ref="jmsAdaptorManager"
          p:jmsContextFactory-ref="activeMqContextFactory"
          c:config-ref="jmsConfig" init-method="init"/>

Kafka endpoint

The Kafka endpoint is a FEJ transport adapter that provides connectivity to the Kafka streaming platform. Its implementation is based on the FIX Antenna Java Kafka adapter and it uses the Kafka Producer and Consumer API internally.
Inside the FIXEdge Java server, each Kafka producer instance is represented as a destination endpoint and each consumer instance as a source endpoint. After the server starts, they are available in the routing rules for accepting messages from Kafka topics and for sending messages to Kafka.

Configuration

Basic Kafka parameters are described here.

There are several ways to add Kafka connectivity into the FIXEdge/J container. The sysconf/fej-kafka.xml configuration file already contains the basic configuration for the Kafka adapter:

sysconf/fej-kafka.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:c="http://www.springframework.org/schema/c"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

    <bean id="kafkaProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean"
          p:location="classpath:kafka-adaptor.properties"/>

    <bean id="kafkaConfig" class="com.epam.fixengine.kafka.config.Config"
          c:prefix="kafka"
          c:properties-ref="kafkaProperties">
    </bean>

    <bean id="kafkaConfigRegister" class="com.epam.fej.kafka.DefaultKafkaConfigsRegister"
          p:clientManager-ref="kafkaClientManager"
          c:config-ref="kafkaConfig"/>

    <bean id="kafkaClientFactory" class="com.epam.fixengine.kafka.client.ClientFactory" factory-method="getInstance"/>

    <bean id="kafkaClientManager" class="com.epam.fej.kafka.DefaultKafkaClientManager"
          c:endpointRegistry-ref="endpointRegistry"
          c:clientFactory-ref="kafkaClientFactory"
          c:messageEventPool-ref="messageEventPool"
          depends-on="rulesConfigManager"/>

</beans>

The kafkaConfigRegister bean is responsible for loading Kafka session contexts (SessionContext) from the configuration file and registering them with kafkaClientManager (com.epam.fej.kafka.DefaultKafkaClientManager) for the routing engine. The kafkaClientManager bean builds source and destination endpoint adapters from the given SessionContext objects and registers them on the server.

Refer to the Kafka transport configuration.

Business Rules

When FIXEdge/J acts as a Producer and forwards messages to Kafka topics, please use the following business rule:

rules.groovy
[
	rule("Route NewOrder to Kafka")
		.sourceCondition({
		    // static filter for session
		    source -> source.id == "fix_client"
		})
		.condition({
			// context filter - apply this rule for New Order - Single (D) messages
			ctx -> ctx.getMessage().getTagValueAsString(35) == "D"
		})
		.action({
			// action for rule - send message to Kafka client with id 'KProducer'
			ctx ->
				routingContext.getDestinationById("KProducer").send(ctx.messageEvent)
				ctx.exit()
		})
		.build()

]

When FIXEdge/J acts as a Consumer and subscribes to messages in Kafka topics, please use the following business rule:

rules.groovy
[
    rule("Route all messages from Kafka")
            .sourceCondition({
                params ->
                    return params.getGroups().contains("kafka_consumer")
            })
            .action({
                ctx->
                    def msg = ctx.getMessage()
                    logger.info("Processing message '{}' has been started", msg)
                    def sessionDestination = stringValue(msg, Header.DeliverToCompID)
                    send(routingContext, ctx, sessionDestination)
                    ctx.exit()
                    logger.info("Processing message '{}' has been finished: {}", msg, new java.util.Date().toInstant())
            })
            .build()
]

Camel destination endpoint

The Camel framework is integrated into FIXEdge Java as a destination endpoint that allows publishing messages by using the integrated components.

FIXEdge Java provides a definition of the Camel source endpoint ("direct:routingRule"), which can receive messages from the FIXEdge business layer. 

Please refer to the official Camel documentation to extend the Camel integration further.


SMTP destination endpoint

The JavaMail framework is integrated into FIXEdge Java as SMTP destination endpoints and enables sending emails to external SMTP servers.

Configuration

The Spring configuration is used to configure SMTP destination endpoints with their own properties for connection to external SMTP servers and converters to prepare automatic emails from FIX messages.

Example of Spring configuration

Example of 'sysconf/fej-smtp.xml'
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:c="http://www.springframework.org/schema/c"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">

    <bean id="smtpProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean"
          p:location="classpath:smtp-adaptor.properties"/>

    <bean id="smtpConfigProvider" class="com.epam.fej.smtp.SMTPConfigProvider"
          c:properties-ref="smtpProperties">
    </bean>

    <util:properties id="velocityProperties">
        <prop key="resource.loader">class</prop>
        <prop key="class.resource.loader.class">org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader</prop>
    </util:properties>

    <bean id="velocityEngine" class="org.apache.velocity.app.VelocityEngine">
        <constructor-arg ref="velocityProperties"/>
    </bean>
    
    <bean id="simpleTemplateConverter" class="com.epam.fej.smtp.TemplateEmailConverter" scope="prototype"
          p:velocityEngine-ref="velocityEngine">
    </bean>

    <bean id="mimeTemplateConverter" class="com.epam.fej.smtp.TemplateMimeEmailConverter" scope="prototype"
          p:velocityEngine-ref="velocityEngine">
    </bean>

    <context:component-scan base-package="com.epam.fej.smtp"/>
</beans>

SMTPConfigProvider prepares SMTP configuration objects from the smtp-adaptor.properties file to use them in the SMTPAdapterManager for SMTP endpoint creation.

SMTPAdapterManager and SMTPDestinationEndpoint / SMTPMimeDestinationEndpoint configuration

SMTPAdapterManager is created as a Spring configuration bean. Upon initialization, it creates and registers SMTPDestinationEndpoint or SMTPMimeDestinationEndpoint objects, which are FIXEdge Java destination endpoints (for simple or MIME email messages) that can be used in routing rules to send emails to external SMTP servers.

Each simple or MIME SMTP destination endpoint internally uses:

  • JavaMailSender - a prepared SMTP client that connects to the configured SMTP server;
  • SimpleMailMessage - a prepared template with the configured 'From/To/CC/BCC/Subject' values that are common to all emails from this endpoint;
  • SimpleEmailConverter or MimeEmailConverter - a converter class with client logic that transforms a MessageEvent (FIX message) into an email message (sets the text of the emails sent from this endpoint).

For a list of SMTP endpoint properties, refer to the Configuring SMTP endpoints section.

Simple and MIME Converters

A converter class must implement either the SimpleEmailConverter or the MimeEmailConverter interface. Each interface works with one of two email message types: SimpleMailMessage (plain text without formatting) or MimeMailMessage (the message can contain HTML-formatted content).

SimpleEmailConverter interface
public interface SimpleEmailConverter {
    SimpleMailMessage convert(MessageEvent from, SimpleMailMessage to);
    
    default void init(SMTPConfig config) {}
}

MimeEmailConverter interface
public interface MimeEmailConverter {
    MimeMailMessage convert(MessageEvent from, MimeMailMessage to) throws MessagingException;

    default void init(SMTPConfig config) {}
}

There are three ways to set the SMTP endpoint converter, listed in the order in which FEJ checks the corresponding properties:

  • use the 'converterRef' property in the SMTP configuration - define a Spring bean that implements SimpleEmailConverter or MimeEmailConverter in the FEJ config and set a reference to it (you can define scope=prototype to get a separate instance for each reference)
  • use the 'converterClass' property in the SMTP configuration - an instance of the converter class, which must implement SimpleEmailConverter or MimeEmailConverter, is created when the SMTP endpoint is initialized
  • use the default converter from Spring's defaultConverterFactory (implements MimeEmailConverter)

A custom converter can implement the client's specific logic to set/change email fields (Subject/To/CC/BCC/Text).
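
As an illustration of such client-specific logic, the sketch below puts the FIX message type (tag 35) into the subject line. It is only a sketch under stated assumptions. FixMessage, MessageEvent, SimpleMailMessage and SimpleEmailConverter are simplified stand-ins declared locally so that the snippet compiles without FEJ or Spring on the classpath. MsgTypeSubjectConverter and the FixMessage.set helper are hypothetical. The snippet also assumes that the message carried by a MessageEvent exposes getTagValueAsString, as the routing rules in this document do for ctx.getMessage().

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for the FIX message type; only the methods used in this sketch are modelled.
class FixMessage {
    private final Map<Integer, String> tags = new HashMap<>();
    FixMessage set(int tag, String value) { tags.put(tag, value); return this; } // hypothetical helper
    String getTagValueAsString(int tag) { return tags.get(tag); }
    String toPrintableString() { return tags.toString(); }
}

// Stand-in for com.epam.fej.routing.MessageEvent.
class MessageEvent {
    private final FixMessage message;
    MessageEvent(FixMessage message) { this.message = message; }
    FixMessage getMessage() { return message; }
}

// Stand-in for org.springframework.mail.SimpleMailMessage.
class SimpleMailMessage {
    private String subject;
    private String text;
    void setSubject(String subject) { this.subject = subject; }
    String getSubject() { return subject; }
    void setText(String text) { this.text = text; }
    String getText() { return text; }
}

// Stand-in for the converter interface; the default init(SMTPConfig) method is omitted.
interface SimpleEmailConverter {
    SimpleMailMessage convert(MessageEvent from, SimpleMailMessage to);
}

// Hypothetical converter that derives the subject from the FIX message type.
class MsgTypeSubjectConverter implements SimpleEmailConverter {
    private static final int MSG_TYPE_TAG = 35;

    @Override
    public SimpleMailMessage convert(MessageEvent from, SimpleMailMessage to) {
        FixMessage message = from.getMessage();
        if (message == null) {
            // No FIX message attached: describe the event itself and keep the configured subject.
            to.setText(from.toString());
            return to;
        }
        String msgType = message.getTagValueAsString(MSG_TYPE_TAG);
        if (msgType != null) {
            // Overrides the subject configured for the endpoint.
            to.setSubject("FIX message, MsgType=" + msgType);
        }
        to.setText(message.toPrintableString());
        return to;
    }
}
```

A converter like this could be wired in through the 'converterRef' or 'converterClass' property described above.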

FIXToEmailConverter - default SimpleEmailConverter
package com.epam.fej.smtp;

import com.epam.fej.routing.MessageEvent;
import org.springframework.mail.SimpleMailMessage;

public class FIXToEmailConverter implements SimpleEmailConverter {
    @Override
    public SimpleMailMessage convert(MessageEvent from, SimpleMailMessage to) {
        if (from.getMessage() != null) {
            to.setText(from.getMessage().toPrintableString());
        } else {
            to.setText(from.toString());
        }
        return to;
    }
}

FIXToMimeEmailConverter - default MimeEmailConverter
package com.epam.fej.smtp;

import com.epam.fej.routing.MessageEvent;
import javax.mail.MessagingException;
import org.springframework.mail.javamail.MimeMailMessage;

public class FIXToMimeEmailConverter implements MimeEmailConverter {

    @Override
    public MimeMailMessage convert(MessageEvent from, MimeMailMessage to) throws MessagingException {
        if (from.getMessage() != null) {
            to.getMimeMessageHelper().setText("<b>" + from.getMessage().toPrintableString() + "</b>", true);
        } else {
            to.getMimeMessageHelper().setText("<b>" + from.toString() + "</b>", true);
        }
        return to;
    }
    
}

Apache Velocity Template Converters

FEJ supports Apache Velocity templates for Simple and MIME converters.

The Velocity engine uses objects from MessageEvent headers in its templates:

  • '$event' - in the template, this property will point to a MessageEvent object
  • '$source' - in the template, this property will point to an object that implements the AppEvent interface (ServerStateEvent, FIXSessionStateEvent, SnFEvent, etc.)

TemplateEmailConverter - SimpleEmailConverter for templates
    @Override
    public SimpleMailMessage convert(MessageEvent messageEvent, SimpleMailMessage mailMessage) {
        Serializable subject = messageEvent.getHeader(TemplateOptions.EVENT_HEADER_SUBJECT_PROPERTY);
        if (subject != null) {
            LOGGER.trace("Set subject from event header '{}': {}", EVENT_HEADER_SUBJECT_PROPERTY, subject);
            mailMessage.setSubject(subject.toString());
        }

        String template = (String) messageEvent.getHeader(EVENT_HEADER_TEMPLATE_PROPERTY);
        if (template != null) {
            LOGGER.trace("Use template from event header : {}", template);
        } else {
            LOGGER.trace("Use default template: {}", defaultTemplate);
            template = defaultTemplate;
        }
        if (template == null) {
            throw new IllegalStateException("Template isn't defined, configuration property '" + TEMPLATE_PROP + "' is empty");
        }

        mailMessage.setText(convertFromTemplate(template, messageEvent));

        return mailMessage;
    }

TemplateMimeEmailConverter - MimeEmailConverter for templates
    @Override
    public MimeMailMessage convert(MessageEvent messageEvent, MimeMailMessage mailMessage) throws MessagingException {
        Serializable subject = messageEvent.getHeader(EVENT_HEADER_SUBJECT_PROPERTY);
        if (subject != null) {
            LOGGER.trace("Set subject from event header '{}': {}", EVENT_HEADER_SUBJECT_PROPERTY, subject);
            mailMessage.getMimeMessageHelper().setSubject(subject.toString());
        }

        String template = (String) messageEvent.getHeader(EVENT_HEADER_TEMPLATE_PROPERTY);
        if (template != null) {
            LOGGER.trace("Use template from event header : {}", template);
        } else {
            LOGGER.trace("Use default template: {}", defaultTemplate);
            template = defaultTemplate;
        }
        if (template == null) {
            throw new IllegalStateException("Template isn't defined, configuration property '" + TEMPLATE_PROP + "' is empty");
        }

        mailMessage.getMimeMessageHelper().setText(convertFromTemplate(template, messageEvent), true);

        return mailMessage;
    }

Examples of templates:

simpleTemplate.vm
Simple Template Test
FIX: $event.getMessage()

mimeTemplate.vm
<b>MIME Template Test</b><br>
<i>FIX:</i> $event.getMessage()<br>

Example of rules that send all FIXSessionStateEvent events to an SMTP destination endpoint with templates:

event_rules.groovy
    // send AppEvent with default template from config
    eventRule("Sample event rule").
        eventType(FIXSessionStateEvent.class).
        condition({ event -> return true }).
        action({
            event -> rc.getDestinationById("smtp-adapter").send(event)
        }).
        build(),

    // send AppEvent with specific template
    eventRule("Sample event rule").
        eventType(FIXSessionStateEvent.class).
        condition({ event -> return true }).
        action({
            event -> rc.getDestinationById("smtp-adapter").send(event, "newTemplateForThisEvent.vm")
        }).
        build()