Transport adapter

Implementation

The FEJ routing server lets you integrate custom third-party transports and use them for routing messages. To support this, the server provides several interfaces for you to implement:

Endpoint is the basic interface. It represents an entity that the routing engine can uniquely identify as a source or target of messages. EndpointParams provides the unique id of the endpoint along with additional attributes that may be useful for routing logic.

Code Block
languagejava
public interface Endpoint {
    EndpointParams getParams();
}

The SourceEndpoint interface represents a provider of messages for the routing engine.

Code Block
languagejava
public interface SourceEndpoint extends Endpoint {
    void setListener(SourceMessageListener listener);
}

It lets you register a SourceMessageListener, which receives the messages that arrive from the external system.

Code Block
languagejava
public interface SourceMessageListener {
    void onNewMessage(FIXFieldList message);
}

DestinationEndpoint represents a target of routing rules. It lets you pass a routed message to a particular system.

Code Block
languagejava
public interface DestinationEndpoint extends Endpoint {
    /**
      * Send a {@link FIXFieldList} to this adapter. If the message is sent successfully,
      * the method returns {@code true}. If the message cannot be sent due to a
      * non-fatal reason, the method returns {@code false}. The method may also
      * throw a RuntimeException in case of non-recoverable errors.
      * <p>This method may block indefinitely, depending on the implementation.
      *
      * @param message the message to send
      * @return whether or not the message was sent
      */
     boolean send(FIXFieldList message);
}
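The following is a minimal sketch of how the source and destination contracts above could be implemented, not the way FEJ itself implements them. The nested interfaces are simplified local stand-ins for the FEJ types so that the example compiles on its own; `EndpointParams.getId()`, `InMemorySource`, `InMemoryDestination`, `publish` and the capacity limit are all hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class EndpointSketch {
    // Simplified local stand-ins for the FEJ types, declared only so the sketch compiles alone.
    interface FIXFieldList { String getTagValueAsString(int tag); }
    interface EndpointParams { String getId(); } // accessor name is an assumption
    interface Endpoint { EndpointParams getParams(); }
    interface SourceMessageListener { void onNewMessage(FIXFieldList message); }
    interface SourceEndpoint extends Endpoint { void setListener(SourceMessageListener listener); }
    interface DestinationEndpoint extends Endpoint { boolean send(FIXFieldList message); }

    /** Hypothetical source: the external system calls publish() to feed the routing engine. */
    static final class InMemorySource implements SourceEndpoint {
        private final EndpointParams params;
        private volatile SourceMessageListener listener;

        InMemorySource(String id) { this.params = () -> id; }

        @Override public EndpointParams getParams() { return params; }
        @Override public void setListener(SourceMessageListener listener) { this.listener = listener; }

        /** Forwards a message to the listener; returns false if none is registered yet. */
        boolean publish(FIXFieldList message) {
            SourceMessageListener current = listener;
            if (current == null) {
                return false;
            }
            current.onNewMessage(message);
            return true;
        }
    }

    /** Hypothetical destination that keeps at most {@code capacity} messages in memory. */
    static final class InMemoryDestination implements DestinationEndpoint {
        private final EndpointParams params;
        private final int capacity;
        private final List<FIXFieldList> delivered = new ArrayList<>();

        InMemoryDestination(String id, int capacity) {
            this.params = () -> id;
            this.capacity = capacity;
        }

        @Override public EndpointParams getParams() { return params; }

        @Override public synchronized boolean send(FIXFieldList message) {
            if (message == null) {
                // Programming error: reported through a RuntimeException.
                throw new IllegalArgumentException("message must not be null");
            }
            if (delivered.size() >= capacity) {
                return false; // non-fatal: the caller may retry later
            }
            delivered.add(message);
            return true;
        }

        synchronized int deliveredCount() { return delivered.size(); }
    }

    public static void main(String[] args) {
        InMemorySource source = new InMemorySource("demo");
        InMemoryDestination destination = new InMemoryDestination("demo", 1);
        // Stand-in for the routing engine: forward every incoming message to one destination.
        source.setListener(destination::send);

        FIXFieldList order = tag -> tag == 35 ? "D" : null;
        System.out.println(source.publish(order));   // listener registered, message forwarded
        System.out.println(destination.send(order)); // capacity reached, so send reports false
    }
}
```

The destination distinguishes the two failure modes described in the `send` Javadoc: a full buffer is reported with `false`, while a null message is treated as an error and raised as a `RuntimeException`.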

There is also a mechanism for registering sources and destinations with the server. Once registered, the endpoints are accessible to the routing engine. Sources and destinations are registered independently, so you can register a source endpoint and a destination endpoint with the same id. This is especially important for bidirectional transports such as FIX, where inbound and outbound connections are identified by the same parameters. The BidirectionalEndpoint interface exists for such transports.

Code Block
languagejava
public interface BidirectionalEndpoint extends ConsumerEndpoint, ProducerEndpoint {
}

In either case, there are two separate interfaces for registering sources and destinations:

Code Block
languagejava
public interface SourceEndpointRegistry {
    void registerConsumer(SourceEndpoint consumer);
    void removeConsumer(String id);
}

public interface DestinationEndpointRegistry {
    void registerProducer(DestinationEndpoint producer);
    void removeProducer(String id);
}
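To illustrate why independent registration matters, here is a small sketch of a registry that keeps one map per direction. It is not the EndpointRegistryAdapter implementation; `InMemoryRegistry`, `hasSource`, `hasDestination` and `EndpointParams.getId()` are hypothetical, and the endpoint interfaces are trimmed local stand-ins so the example is self-contained.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class RegistrySketch {
    // Trimmed local stand-ins for the FEJ interfaces (listener and send methods omitted).
    interface EndpointParams { String getId(); } // accessor name is an assumption
    interface Endpoint { EndpointParams getParams(); }
    interface SourceEndpoint extends Endpoint { }
    interface DestinationEndpoint extends Endpoint { }

    interface SourceEndpointRegistry {
        void registerConsumer(SourceEndpoint consumer);
        void removeConsumer(String id);
    }

    interface DestinationEndpointRegistry {
        void registerProducer(DestinationEndpoint producer);
        void removeProducer(String id);
    }

    /** Hypothetical registry: one map per direction, so ids never clash across directions. */
    static final class InMemoryRegistry implements SourceEndpointRegistry, DestinationEndpointRegistry {
        private final Map<String, SourceEndpoint> sources = new ConcurrentHashMap<>();
        private final Map<String, DestinationEndpoint> destinations = new ConcurrentHashMap<>();

        @Override public void registerConsumer(SourceEndpoint consumer) {
            sources.put(consumer.getParams().getId(), consumer);
        }

        @Override public void removeConsumer(String id) { sources.remove(id); }

        @Override public void registerProducer(DestinationEndpoint producer) {
            destinations.put(producer.getParams().getId(), producer);
        }

        @Override public void removeProducer(String id) { destinations.remove(id); }

        boolean hasSource(String id) { return sources.containsKey(id); }
        boolean hasDestination(String id) { return destinations.containsKey(id); }
    }

    public static void main(String[] args) {
        InMemoryRegistry registry = new InMemoryRegistry();
        EndpointParams params = () -> "FIX-1";
        SourceEndpoint inbound = () -> params;
        DestinationEndpoint outbound = () -> params;

        // Both directions share the id "FIX-1" without conflict.
        registry.registerConsumer(inbound);
        registry.registerProducer(outbound);

        // Removing the source leaves the destination with the same id untouched.
        registry.removeConsumer("FIX-1");
        System.out.println(registry.hasSource("FIX-1"));      // prints false
        System.out.println(registry.hasDestination("FIX-1")); // prints true
    }
}
```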

Both are implemented by the EndpointRegistryAdapter class, which is available via the Spring configuration:

Code Block
languagexml
<bean id="endpointRegistry" class="com.epam.fej.routing.endpoint.EndpointRegistryAdapter"
         c:destinationsRegister-ref="destinationsRegister"/>

JMS Transport Adapter

There are several ways to add JMS connectivity to the FEJ container. The fej-jms.xml configuration file already contains a basic configuration for the JMS adapter:

Code Block
languagexml
<bean id="jmsProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean"
      p:location="classpath:jms-adaptor.properties"/>

<bean id="jmsConfig" class="com.epam.fixengine.jms.config.Config"
      c:prefix="jms.adaptor"
      c:properties-ref="jmsProperties">
</bean>

<bean id="jmsConfigRegister" class="com.epam.fej.jms.DefaultJmsConfigsRegister"
      p:jmsManager-ref="jmsAdaptorManager"
      c:config-ref="jmsConfig" init-method="init"/>

<bean id="jmsClientFactory" class="com.epam.fixengine.jms.client.JMSClientFactory" factory-method="getInstance"/>

<bean id="jmsAdaptorManager" class="com.epam.fej.jms.JmsAdapterManager"
      c:endpointRegistry-ref="endpointRegistry"
      c:clientFactory-ref="jmsClientFactory"
      depends-on="rulesConfigManager"/>

The jms-adaptor.properties file contains parameters for JMS producers and consumers (FIXAJ JMS Adaptor properties). The jmsConfigRegister bean (com.epam.fej.jms.DefaultJmsConfigsRegister) is responsible for loading JMS session contexts (SessionContext) from the configuration file and registering them with jmsAdaptorManager (com.epam.fej.jms.JmsAdaptorManager) for the routing engine. JmsAdaptorManager builds source and destination endpoint adapters from the given SessionContext objects and registers them with the server.

Info

If you want to use your own configuration factory, you can also use a JmsManager implementation to build and register SessionContext instances.

DefaultJmsConfigsRegister produces a SessionContext via a JmsContextFactory implementation. By default it uses com.epam.fej.jms.JndiJmsSessionContextFactory, but you can set your own implementation via DefaultJmsConfigsRegister.setJmsContextFactory(JmsContextFactory jmsContextFactory). You can also use com.epam.fej.jms.SimpleJmsContextFactory with your own javax.jms.ConnectionFactory definition.

Code Block
languagejava
public SessionContext createSessionContext(JmsConfig config) {
    final ConnectionInfo connectionInfo = config.getConnectionInfo();
    final SessionInfo sessionInfo = config.getSessionInfo();
    return new SessionContext(connectionInfo, sessionInfo, null, jmsContextFactory.createContext(config));
}

Info

Please note that ConnectionInfo and SessionInfo classes support loading of custom properties from configuration files:

Code Block
languagejava
final Properties additionalProperties = connectionInfo.getAdditionalProperties();

final Map<String, Object> additionalParams = sessionInfo.getAdditionalParams();

Custom connection factory instead of JNDI

A custom JMS connection factory can be used in several ways:

  1. Declare a ConnectionFactory in Spring and inject it into SimpleJmsContextFactory:

    Code Block
    languagexml
    <bean id="jmsConfigRegister" class="com.epam.fej.jms.DefaultJmsConfigsRegister"
          p:jmsManager-ref="jmsAdaptorManager"
          p:jmsContextFactory-ref="jmsContextFactory"
          c:config-ref="jmsConfig" init-method="init"/>
    
    <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"
          c:brokerURL="${jms.broker.url}"/>
    
    <bean id="jmsContextFactory" class="com.epam.fej.jms.SimpleJmsContextFactory"
          c:connectionFactory-ref="jmsConnectionFactory"/>

  2. Implement your own JmsContextFactory and pass it to DefaultJmsConfigsRegister:

    Code Block
    languagejava
    public class ActiveMqJmsContextFactory implements JmsContextFactory {
        @Override
        public JMSContext createContext(JmsConfig config) {
            return new SimpleJmsContext(new ActiveMQConnectionFactory(
                    config.getConnectionInfo().getProviderURL()), config.getConnectionInfo());
        }
    }

    Code Block
    languagexml
    <bean id="activeMqContextFactory" class="com.epam.fej.jms.ActiveMqJmsContextFactory"/>
    
    <bean id="jmsConfigRegister" class="com.epam.fej.jms.DefaultJmsConfigsRegister"
          p:jmsManager-ref="jmsAdaptorManager"
          p:jmsContextFactory-ref="activeMqContextFactory"
          c:config-ref="jmsConfig" init-method="init"/>

Persistence API

The Persistence API provides an easy and fast way to store data. It supports two main storage types: PersistentSequence and PersistentQueue.

PersistentSequence

PersistentSequence provides functionality for storing sequential data (for example, logs). Each record must have a unique index. An Indexer implementation supplies the index for each stored record. PersistentSequence also requires a custom Serializer implementation to serialize objects to a byte buffer and restore them.

Code Block
languagejava
PersistenceFactory factory = ...

Indexer<MyRecord> indexer = new Indexer<MyRecord>() {
    @Override
    public long getIndex(MyRecord item) {
        return item.getId();
    }
};

Serializer<MyRecord> serializer = new Serializer<MyRecord>() {
    @Override
    public void serialize(MyRecord item, ElasticByteBuffer buffer) {
        // serialize instance ...
    }

    @Override
    public MyRecord deserialize(ElasticByteBuffer buffer, int length) {
        MyRecord record = new MyRecord();
        // load data to instance...
        return record;
    }
};

final PersistentSequence<MyRecord> sequence = factory.buildSequence("seq_sample", indexer, serializer);

//store record to sequence
sequence.append(record);
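The serialize and deserialize bodies are left open in the snippet above. As a rough sketch of what such a round trip can look like, the example below uses `java.nio.ByteBuffer` as a stand-in, because the ElasticByteBuffer API is not shown here. The `MyRecord` fields are hypothetical, and the sketch assumes that the `length` argument is the total size of the stored record in bytes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class SerializerSketch {
    /** Hypothetical record; the fields of the document's MyRecord are not shown. */
    static final class MyRecord {
        final long id;
        final String text;

        MyRecord(long id, String text) {
            this.id = id;
            this.text = text;
        }
    }

    /** Writes the id (8 bytes) followed by the UTF-8 bytes of the text. */
    static void serialize(MyRecord item, ByteBuffer buffer) {
        buffer.putLong(item.id);
        buffer.put(item.text.getBytes(StandardCharsets.UTF_8));
    }

    /** Reads one record; length is assumed to be the record's total size in bytes. */
    static MyRecord deserialize(ByteBuffer buffer, int length) {
        long id = buffer.getLong();
        byte[] text = new byte[length - Long.BYTES]; // everything after the id is text
        buffer.get(text);
        return new MyRecord(id, new String(text, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(64);
        serialize(new MyRecord(42L, "8=FIX.4.4"), buffer);
        int length = buffer.position(); // bytes written for this record
        buffer.flip();                  // switch the buffer from writing to reading
        MyRecord restored = deserialize(buffer, length);
        System.out.println(restored.id + " " + restored.text); // prints 42 8=FIX.4.4
    }
}
```

Because the record length is supplied on read, the text needs no length prefix of its own. A record with several variable-length fields would need a prefix or delimiter for each field.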

In addition to storing data, PersistentSequence provides methods for retrieving records from storage. It supports reading a single item by index or iterating through items:

Code Block
languagejava
//get single item with index 100
final MyRecord myRecord = sequence.get(100);

//iterate items from index 0 till 100
sequence.retrieveItems(0, 100, new RetrieveSequenceItemsListener<MyRecord>() {
    @Override
    public void onItem(long id, MyRecord record) {
        //...
    }
}, true);

You can also review stored records and remove some of them from storage:

Code Block
languagejava
sequence.cleanUp(new CleanupItemListener<MyRecord>() {
    @Override
    public boolean checkForClean(long id, MyRecord item) {
        //return true to remove this record from storage
        return false;
    }
});

Or remove all and reset sequence:

Code Block
languagejava
//remove all items and reset index
sequence.reset();

The default PersistentSequence implementation is optimized for writing, so read operations may take somewhat longer.

PersistentQueue

PersistentQueue works like a queue but persists all items to disk, so it can restore its state after an application restart. Its methods are similar to those of java.util.Queue:

Code Block
languagejava
final PersistentQueue<MyRecord> queue = factory.buildQueue("queue_sample", serializer);

//push item to the tail of queue
queue.add(record);

// read item from head without removing it
record = queue.peek();

// extract item from head and remove it from queue
record = queue.poll();

PersistentQueue also lets you iterate over all its items:

Code Block
languagejava
queue.iterate(new RetrieveQueueItemsListener<MyRecord>() {
    @Override
    public void onItem(MyRecord record) {
        //....
    }
});

To remove all items from the queue, use the clear method:

Code Block
languagejava
//remove all items and clean file
queue.clear();
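One common way to combine peek and poll with a queue that survives restarts is to remove an item only after it has been handled successfully, so that a failure leaves the item at the head for a later retry. The sketch below shows that pattern against the standard `java.util.Queue` interface, with `ArrayDeque` as an in-memory stand-in for PersistentQueue. The `drain` helper is hypothetical and not part of the Persistence API.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Predicate;

class QueueDrainSketch {
    /**
     * Processes items from the head. An item is removed only after the handler
     * accepts it; on the first rejection the item stays at the head and draining stops.
     * Returns the number of items processed.
     */
    static <T> int drain(Queue<T> queue, Predicate<? super T> handler) {
        int processed = 0;
        T head;
        while ((head = queue.peek()) != null) {
            if (!handler.test(head)) {
                break; // keep the item for a later retry
            }
            queue.poll(); // handled successfully, now safe to remove
            processed++;
        }
        return processed;
    }

    public static void main(String[] args) {
        Queue<String> queue = new ArrayDeque<>(); // in-memory stand-in for PersistentQueue
        queue.add("a");
        queue.add("b");
        queue.add("c");

        // The handler rejects "b", so only "a" is processed and removed.
        int processed = drain(queue, item -> !item.equals("b"));
        System.out.println(processed + " processed, head is " + queue.peek()); // prints 1 processed, head is b
    }
}
```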

Routing Rules

FIX Edge Java provides the RoutingRule unit as an abstraction for an internal message routing element. FEJ supports pure Java and Groovy implementations of routing rules.

RoutingRule requires several components for its instantiation:

Code Block
languagejava
public RoutingRule(java.lang.String description,
                   SourceCondition sourceFilter,
                   RuleCondition contextFilter,
                   RuleAction action)

  • description - a string with a free-text description of the rule.

  • source filter - checks whether the rule should be applied to messages from a certain source. It is kept as a separate filter for optimization purposes: it can be applied statically, without additional cost at runtime. The source filter is a SourceCondition implementation and can be null if you’d like to ignore it.

  • context filter - a dynamic filter that can check, at the same time, whether the rule applies based on message content and source attributes. The context filter is a RuleCondition implementation and can be null if you’d like to ignore it.

  • action - an implementation of RuleAction that describes the main goal of the rule. It can be a transformation, a modification, or simply resending the message to the required destination.
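The way these four components interact can be sketched in plain Java as follows. This is an illustration of the filter-then-act flow only, not the FEJ RoutingRule class: `Context`, `Rule`, `tryApply` and the method names on the three interfaces are hypothetical stand-ins. Unlike the real engine, which can apply the source filter statically, the sketch evaluates both filters on every call.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

class RuleSketch {
    /** Hypothetical context: source id, message as tag -> value, and a record of sends. */
    static final class Context {
        final String sourceId;
        final Map<Integer, String> message;
        final List<String> sentTo = new ArrayList<>();

        Context(String sourceId, Map<Integer, String> message) {
            this.sourceId = sourceId;
            this.message = message;
        }
    }

    // Stand-ins for the FEJ interfaces; the method names are assumptions.
    interface SourceCondition { boolean accepts(String sourceId); }
    interface RuleCondition { boolean matches(Context ctx); }
    interface RuleAction { void execute(Context ctx); }

    static final class Rule {
        final String description;
        final SourceCondition sourceFilter;
        final RuleCondition contextFilter;
        final RuleAction action;

        Rule(String description, SourceCondition sourceFilter,
             RuleCondition contextFilter, RuleAction action) {
            this.description = description;
            this.sourceFilter = sourceFilter;
            this.contextFilter = contextFilter;
            this.action = action;
        }

        /** Runs the action if both filters pass; a null filter is ignored. */
        boolean tryApply(Context ctx) {
            if (sourceFilter != null && !sourceFilter.accepts(ctx.sourceId)) {
                return false;
            }
            if (contextFilter != null && !contextFilter.matches(ctx)) {
                return false;
            }
            action.execute(ctx);
            return true;
        }
    }

    public static void main(String[] args) {
        // No source filter; match New Order - Single (35=D); record a send to "orders".
        Rule rule = new Rule("route new orders", null,
                ctx -> "D".equals(ctx.message.get(35)),
                ctx -> ctx.sentTo.add("orders"));

        Context order = new Context("session1", Collections.singletonMap(35, "D"));
        Context report = new Context("session1", Collections.singletonMap(35, "8"));
        System.out.println(rule.tryApply(order));  // prints true
        System.out.println(rule.tryApply(report)); // prints false
    }
}
```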

Sample of routing rule:

Code Block
languagegroovy
import com.epam.fej.routing.RoutingContext
import com.epam.fej.routing.rules.RoutingRule
import com.epam.fej.routing.rules.RuleAction
import com.epam.fej.routing.rules.RuleCondition

import static com.epam.fej.routing.CustomRoutingRules.getDefaultRules
import static com.epam.fej.routing.CustomRoutingRules.getRejectionRule

RoutingContext rc = routingContext as RoutingContext;
[
        new RoutingRule(
                // rule description
                "some Rule",
                //source filter - ignore for this rule
                null,
                // context filter - apply this rule for New Order - Single (D) messages
                { ctx -> ctx.getMessage().getTagValueAsString(35) == "D" } as RuleCondition,
                // action for rule - resend message to all sessions within the same group
                // and stop message processing
                { ctx ->
                    rc.getDestinationsByGroup(ctx.sourceParams.groups).each { adapter ->
                        adapter.send(ctx.message)
                        ctx.exit()
                    }
                } as RuleAction),

        // append system rejection rules for unprocessed messages
        getRejectionRule(rc)
]