Transport adapter

Implementation

...

Code Block
languagexml
<bean id="endpointRegistry" class="com.epam.fej.routing.endpoint.EndpointRegistryAdapter"
         c:destinationsRegister-ref="destinationsRegister"/>

JMS

...

Endpoint

There are several ways to add JMS connectivity to the FEJ container. The fej-jms.xml configuration file already contains a basic configuration for the JMS adapter:

...

  1. Declare a ConnectionFactory in Spring and inject it into SimpleJmsContextFactory:

    Code Block
    languagexml
    <bean id="jmsConfigRegister" class="com.epam.fej.jms.DefaultJmsConfigsRegister"
          p:jmsManager-ref="jmsAdaptorManager"
          p:jmsContextFactory-ref="jmsContextFactory"
          c:config-ref="jmsConfig" init-method="init"/>
    
    <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"
          c:brokerURL="${jms.broker.url}"/>
    
    <bean id="jmsContextFactory" class="com.epam.fej.jms.SimpleJmsContextFactory"
          c:connectionFactory-ref="jmsConnectionFactory"/>
  2. Implement your own JmsContextFactory and pass it to DefaultJmsConfigsRegister:

    Code Block
    languagejava
    public class ActiveMqJmsContextFactory implements JmsContextFactory {
        @Override
        public JMSContext createContext(JmsConfig config) {
            return new SimpleJmsContext(new ActiveMQConnectionFactory(
                    config.getConnectionInfo().getProviderURL()), config.getConnectionInfo());
        }
    }
    Code Block
    languagexml
    <bean id="activeMqContextFactory" class="com.epam.fej.jms.ActiveMqJmsContextFactory"/>
    
    <bean id="jmsConfigRegister" class="com.epam.fej.jms.DefaultJmsConfigsRegister"
          p:jmsManager-ref="jmsAdaptorManager"
          p:jmsContextFactory-ref="activeMqContextFactory"
          c:config-ref="jmsConfig" init-method="init"/>

Kafka Endpoint

Kafka Endpoint is an FEJ transport adaptor that provides connectivity to the Kafka streaming platform. It is built on the FIX Antenna Java Kafka Adapter and uses the Kafka Producer and Consumer APIs internally.
Inside the FIX Edge Java server, each Kafka producer instance is represented as a destination endpoint and each consumer instance as a source endpoint. After the server starts, these endpoints can be used in routing rules to accept messages from topics and to send messages to Kafka.

Configuration

Basic Kafka parameters are described here.

There are several ways to add Kafka connectivity to the FIXEdge/J container. The fej-kafka.xml configuration file already contains a basic configuration for the Kafka adaptor:

Code Block
languagexml
titlefej-kafka.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:c="http://www.springframework.org/schema/c"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

    <bean id="kafkaProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean"
          p:location="classpath:kafka-adaptor.properties"/>

    <bean id="kafkaConfig" class="com.epam.fixengine.kafka.config.Config"
          c:prefix="kafka"
          c:properties-ref="kafkaProperties">
    </bean>

    <bean id="kafkaConfigRegister" class="com.epam.fej.kafka.DefaultKafkaConfigsRegister"
          p:clientManager-ref="kafkaClientManager"
          c:config-ref="kafkaConfig"/>

    <bean id="kafkaClientFactory" class="com.epam.fixengine.kafka.client.ClientFactory" factory-method="getInstance"/>

    <bean id="kafkaClientManager" class="com.epam.fej.kafka.DefaultKafkaClientManager"
          c:endpointRegistry-ref="endpointRegistry"
          c:clientFactory-ref="kafkaClientFactory"
          c:messageEventPool-ref="messageEventPool"
          depends-on="rulesConfigManager"/>

</beans>

The kafkaConfigRegister bean loads Kafka session contexts (SessionContext) from the configuration file and registers them with kafkaClientManager (com.epam.fej.kafka.DefaultKafkaClientManager) for the routing engine. kafkaClientManager builds source and destination endpoint adapters from the given SessionContext objects and registers them in the server.

Properties

Kafka endpoint parameters are defined in the conf/kafka-adaptor.properties file. The configuration should list the Kafka endpoints and link them to the corresponding Kafka topics. Serializers and deserializers should also be defined:

Code Block
languagebash
# list of kafka endpoints (consumers and producers)
kafka.clients = KProducer, KConsumer

# main properties to specify producer's id and topic
kafka.producer.KProducer.client.id = KProducer
kafka.producer.KProducer.topic = PTopic1
# Type is class. Serializer class for key that implements the com.epam.fej.kafka.KafkaEndpointKeySerializer interface.
kafka.producer.key.serializer = com.epam.fej.kafka.FIXMessageEventSerializer
# Type is class. Serializer class for value that implements the com.epam.fej.kafka.KafkaEndpointValueSerializer interface.
kafka.producer.value.serializer = com.epam.fej.kafka.FIXMessageEventSerializer


# main properties to specify consumer's id and topic
kafka.consumer.KConsumer.client.id = KConsumer
kafka.consumer.KConsumer.topics = KTopic2
# Type is class. Deserializer class for key that implements the com.epam.fej.kafka.KafkaEndpointKeyDeserializer interface.
kafka.consumer.key.deserializer = org.apache.kafka.common.serialization.StringDeserializer
# Type is class. Deserializer class for value that implements the com.epam.fej.kafka.KafkaEndpointValueDeserializer interface.
kafka.consumer.value.deserializer = com.epam.fej.kafka.FIXMessageEventDeserializer


Each Kafka endpoint (producer or consumer) can additionally be configured with native Kafka Producer or Consumer configuration options by adding them after the client's prefix:

Code Block
languagebash
# Apply specific batch size for KProducer producer client only
kafka.producer.KProducer.batch.size = 32768

# wait for the full set of in-sync replicas to acknowledge the record
kafka.producer.KProducer.acks = all

# Apply batch size for every producer (default value for all producers)
kafka.producer.batch.size = 16384
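
The precedence between a client-specific option and the default for all producers can be illustrated in plain Java. This is only a sketch of prefix-based lookup, assuming the client-specific key wins over the shared default; the class and method names (PrefixedLookup, resolve) are hypothetical and are not part of FEJ.

```java
import java.util.Properties;

final class PrefixedLookup {

    // Looks up an option for one producer: the client-specific key is checked first,
    // then the default shared by all producers. Returns null when neither is set.
    static String resolve(Properties props, String clientId, String option) {
        String specific = props.getProperty("kafka.producer." + clientId + "." + option);
        if (specific != null) {
            return specific;
        }
        return props.getProperty("kafka.producer." + option);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("kafka.producer.batch.size", "16384");
        props.setProperty("kafka.producer.KProducer.batch.size", "32768");

        // KProducer has its own value; any other producer falls back to the default
        System.out.println(resolve(props, "KProducer", "batch.size"));
        System.out.println(resolve(props, "OtherProducer", "batch.size"));
    }
}
```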


FEJ also supports a set of options that affect endpoint behavior. They are added in the same way as described above, separately for each client:

Code Block
languagebash
# set 'groups' property for endpoint. KProducer is assigned to groups 'md_consumer' and 'usa1'
kafka.producer.KProducer.groups = md_consumer, usa1

Also, this file can be extended with additional generic Kafka configuration options placed after the 'kafka.' prefix:

Code Block
languagebash
# A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
kafka.bootstrap.servers = localhost:9092


The Kafka Producer and Consumer APIs used in FEJ allow defining a serializer/deserializer for each client, so it is possible to transform messages to Avro format with custom code. Note that FEJ sends, and expects as input, FIX messages (key-value pairs) serialized to a byte array.
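
To make the "FIX message as a byte array" idea concrete, the following sketch round-trips tag=value pairs through a byte array using the standard FIX SOH (0x01) field delimiter. It is not the FEJ FIXMessageEventSerializer; the TagValueCodec class and its methods are hypothetical, and a custom serializer would still have to implement the FEJ interfaces named in the properties above.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

final class TagValueCodec {
    // FIX fields are separated by the SOH control character (0x01)
    private static final char SOH = (char) 0x01;

    // Encodes fields as "tag=value<SOH>" in iteration order
    static byte[] encode(Map<Integer, String> fields) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<Integer, String> field : fields.entrySet()) {
            sb.append(field.getKey()).append('=').append(field.getValue()).append(SOH);
        }
        return sb.toString().getBytes(StandardCharsets.US_ASCII);
    }

    // Decodes bytes produced by encode() back into an ordered tag -> value map
    static Map<Integer, String> decode(byte[] bytes) {
        String text = new String(bytes, StandardCharsets.US_ASCII);
        Map<Integer, String> fields = new LinkedHashMap<>();
        int start = 0;
        while (start < text.length()) {
            int end = text.indexOf(SOH, start);
            if (end < 0) {
                end = text.length();
            }
            String field = text.substring(start, end);
            int eq = field.indexOf('=');
            if (eq > 0) {
                fields.put(Integer.parseInt(field.substring(0, eq)), field.substring(eq + 1));
            }
            start = end + 1;
        }
        return fields;
    }

    public static void main(String[] args) {
        Map<Integer, String> order = new LinkedHashMap<>();
        order.put(35, "D");   // MsgType = New Order - Single
        order.put(55, "IBM"); // Symbol
        System.out.println(decode(encode(order)));
    }
}
```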

Business Rules

In the case when the FIXEdge/J acts as a producer and forwards messages to Kafka topics, please use the following business rule:

Code Block
languagegroovy
titlerules.groovy
[
	rule("Route NewOrder to Kafka")
		.sourceCondition({
		    //static filter for session
		    source -> source.id == "fix_client"
		})
		.condition({
			// context filter - apply this rule for New Order - Single (D) messages
			ctx -> ctx.getMessage().getTagValueAsString(35) == "D"
		})
		.action({
			// action for rule - send message to Kafka client with id 'KProducer'
			ctx ->
				routingContext.getDestinationById("KProducer").send(ctx.messageEvent)
				ctx.exit()
		})
		.build()

]


In the case when the FIXEdge/J acts as a consumer and subscribes for messages in Kafka topics, please use the following business rule:

Code Block
languagegroovy
titlerules.groovy
[
    rule("Route all messages from Kafka")
            .sourceCondition({
                params ->
                    return params.getGroups().contains("kafka_consumer")
            })
            .action({
                ctx->
                    def msg = ctx.getMessage()
                    logger.info("Processing message '{}' has been started", msg)
                    def sessionDestination = stringValue(msg, Header.DeliverToCompID)
                    send(routingContext, ctx, sessionDestination)
                    ctx.exit()
                    logger.info("Processing message '{}' has been finished: {}", msg, new java.util.Date().toInstant())
            })
            .build()
]

Camel destination endpoint

The Camel framework is integrated into FIXEdge Java as a destination endpoint, which allows publishing messages through the integrated Camel components.

Configuration

Spring configuration is used for the configuration of camel routes.

Two types of endpoints are used in a Camel route: 'from' and 'to'.

  • The 'from' endpoint is preconfigured in FEJ and must be exactly <from uri="direct:routingRule"/>. The Camel endpoint uses it to send messages handled in Groovy rules.
  • There can be as many 'to' endpoints as required. They are implemented by Camel and are described in the official Camel documentation.

Example of configuration

The configuration is located in fej-camel-context.xml file.

Code Block
titleExample of fej-camel-context.xml
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context.xsd
       ">

    <context:component-scan base-package="com.epam.fej.camel"/>

    <camelContext id="<context_id>" xmlns="http://camel.apache.org/schema/spring">
        <route>
            <from uri="direct:routingRule"/>
            <to uri="google-pubsub://<project_id>:<topic_name>"/>
            <to uri="kafka:<topic_name>?brokers=<ip>:<port>"/>
        </route>
    </camelContext>

</beans>

Here <context_id> is the identifier of the configuration; Groovy rules use it to route messages.

Example of rule

Code Block
languagegroovy
rule("Route news messages to google pub/sub")
                .action({
                    ctx ->
                        try {
                            send(routingContext, ctx, "<context_id>")
                        } catch (all) {
                            logger.error(all.getMessage(), all)
                        }
                        ctx.exit()
                })
        .build()


Note: each Camel configuration must be started by the scheduler.

Code Block
titleExample of starting camel context in scheduler
collapsetrue
<?xml version="1.0" encoding="UTF-8"?>
<schedules xmlns="http://epam.com/fej/schedules">
    <schedule id="<context_id>">
        <task name="start" onLoad="true"/>
    </schedule>
</schedules>


SMTP Destination Endpoint

The JavaMail framework is integrated into FIXEdge Java in the form of SMTP destination endpoints, which make it possible to send emails to external SMTP servers.

Configuration

Spring configuration is used to configure SMTP destination endpoints, each with its own properties for connecting to an external SMTP server and a converter that prepares emails from FIX messages.

Example of Spring configuration

Code Block
languagexml
titleExample of 'fej-smtp.xml'
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:c="http://www.springframework.org/schema/c"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <bean id="smtpProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean"
          p:location="classpath:smtp-adaptor.properties"/>

    <bean id="smtpConfigProvider" class="com.epam.fej.smtp.SMTPConfigProvider"
          c:properties-ref="smtpProperties">
    </bean>

    <context:component-scan base-package="com.epam.fej.smtp"/>
</beans>

SMTPConfigProvider builds SMTP configuration objects from the 'smtp-adaptor.properties' file; SMTPAdapterManager uses them to create SMTP endpoints.

SMTPAdapterManager and SMTPDestinationEndpoint configuration

SMTPAdapterManager is created as a Spring configuration bean. On initialization it creates and registers SMTPDestinationEndpoint objects: FIXEdge Java destination endpoints that can be used in routing rules to send emails to external SMTP servers.

Each SMTPDestinationEndpoint contains:

  • JavaMailSender - a prepared SMTP client for connecting to the configured SMTP server;
  • SimpleMailMessage - a prepared template with the configured 'From/To/CC/BCC/Subject' values that are common to all emails from this endpoint;
  • EmailConverter - a converter class with client-specific logic that takes data from a MessageEvent (FIX message) and sets the text of emails sent from this endpoint.
Code Block
languagejava
titleSMTPAdapterManager initialization
    public SMTPAdapterManager(final EndpointRegistry endpointRegistry,
                              final ApplicationContext applicationContext,
                              final MessageEventPool messageEventPool,
                              final SMTPConfigProvider configProvider) {
        this.endpointRegistry = endpointRegistry;
        this.applicationContext = applicationContext;
        this.messageEventPool = messageEventPool;
        this.configProvider = configProvider;
    }

    public void init() {
        // get all SMTP configs from configProvider
        Set<String> smtpConfigNames = configProvider.getConfigs();
        for (String id : smtpConfigNames) {
            SMTPConfig config = configProvider.getConfig(id);
            GeneralEndpointParams endpointParams = new GeneralEndpointParams();
            endpointParams.setId(id);
            endpointParams.setGroups(config.getGroups());
            endpointParams.setTargetCompId(id);

            // prepare sender, template and converter
            JavaMailSender sender = buildMailSender(config);
            SimpleMailMessage template = buildTemplateMessage(config);
            EmailConverter converter = buildEmailConverter(config);

            // create and register SMTP destination endpoint
            SMTPDestinationEndpoint smtpEndPoint =
                    new SMTPDestinationEndpoint(endpointParams, messageEventPool, sender, converter, template);
            endpointRegistry.register(smtpEndPoint);
        }
    }

Configuration properties

List of main FEJ SMTP endpoint properties:

Property | Required | Description
mail.clients | mandatory | Comma-separated list of FEJ SMTP client ids
mail.<clientId>.host | mandatory | The SMTP server to connect to
mail.<clientId>.port | mandatory | The SMTP server port to connect to
mail.<clientId>.username | mandatory | The SMTP user name to connect with
mail.<clientId>.password | optional | The SMTP user's password; optional if the SMTP server allows user access without a password
mail.<clientId>.protocol | optional | Protocol for the connection to the SMTP server. 'smtp' is supported
mail.<clientId>.subject | mandatory | Default email subject
mail.<clientId>.to | mandatory | Default comma-separated 'To' list
mail.<clientId>.cc | optional | Default comma-separated 'Cc' list
mail.<clientId>.bcc | optional | Default comma-separated 'Bcc' list
mail.<clientId>.groups | optional | FEJ groups for routing to this SMTP endpoint
mail.<clientId>.converterRef | optional | Converter instance, referenced by Spring bean name, used to convert an incoming MessageEvent to email fields
mail.<clientId>.converterClass | optional | Converter class used to create a converter instance on endpoint initialization

List of main JavaMail properties, set with the "mail.<clientId>.properties." prefix (the full list of properties can be found here):

Property | Description
mail.smtp.from | Email address to use for SMTP MAIL command. This sets the envelope return address.
mail.smtp.auth | If true, attempt to authenticate the user using the AUTH command.
mail.smtp.socketFactory.port | Specifies the port to connect to when using the specified socket factory.
mail.smtp.socketFactory.class | If set, specifies the name of a class that implements the javax.net.SocketFactory interface. This class will be used to create SMTP sockets.
mail.smtp.starttls.enable | If true, enables the use of the STARTTLS command (if supported by the server) to switch the connection to a TLS-protected connection before issuing any login commands. If the server does not support STARTTLS, the connection continues without the use of TLS; see the mail.smtp.starttls.required property to fail if STARTTLS isn't supported.
mail.smtp.starttls.required | If true, requires the use of the STARTTLS command. If the server doesn't support the STARTTLS command, or the command fails, the connect method will fail.
Code Block
languageyml
titleSMTP properties example
# comma separated list of SMTP clients
mail.clients=smtp1

# SMTP server connection details
mail.smtp1.host=smtp.gmail.com
mail.smtp1.port=587
mail.smtp1.username=smtp-user1
mail.smtp1.password=pass1

# default Subject/To/CC/BCC values
mail.smtp1.subject=Test subject
mail.smtp1.to=<to list>
mail.smtp1.cc=<cc list>
mail.smtp1.bcc=<bcc list>
mail.smtp1.protocol=smtp

# FEJ routing groups
mail.smtp1.groups=mail1-group

# converter class or Spring reference bean name
mail.smtp1.converterClass=com.epam.fej.smtp.TestEmailConverter
mail.smtp1.converterRef=testEmailConverter

# additional SMTP properties
mail.smtp1.properties.mail.smtp.auth=true
mail.smtp1.properties.mail.smtp.starttls.enable=true
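
The naming scheme above (a client list plus per-client keys, with JavaMail options under the "mail.<clientId>.properties." prefix) can be read with a few lines of plain Java. The sketch below only illustrates the prefix convention; MailPropertiesSketch and its methods are hypothetical and do not reproduce SMTPConfigProvider.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

final class MailPropertiesSketch {

    // Splits the comma-separated 'mail.clients' value into trimmed client ids
    static List<String> clientIds(Properties all) {
        List<String> ids = new ArrayList<>();
        for (String id : all.getProperty("mail.clients", "").split(",")) {
            if (!id.trim().isEmpty()) {
                ids.add(id.trim());
            }
        }
        return ids;
    }

    // Collects keys under 'mail.<clientId>.properties.' and strips that prefix,
    // leaving plain JavaMail keys such as 'mail.smtp.auth'
    static Properties javaMailProperties(Properties all, String clientId) {
        String prefix = "mail." + clientId + ".properties.";
        Properties result = new Properties();
        for (String key : all.stringPropertyNames()) {
            if (key.startsWith(prefix)) {
                result.setProperty(key.substring(prefix.length()), all.getProperty(key));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Properties all = new Properties();
        all.setProperty("mail.clients", "smtp1");
        all.setProperty("mail.smtp1.host", "smtp.gmail.com");
        all.setProperty("mail.smtp1.properties.mail.smtp.auth", "true");

        for (String id : clientIds(all)) {
            System.out.println(id + " -> " + javaMailProperties(all, id));
        }
    }
}
```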

Converters

A converter must implement the EmailConverter interface:

Code Block
languagejava
titleEmailConverter interface
package com.epam.fej.smtp;

import com.epam.fej.routing.MessageEvent;
import org.springframework.mail.SimpleMailMessage;

public interface EmailConverter {
    SimpleMailMessage convert(MessageEvent from, SimpleMailMessage to);
}

There are three ways to set the converter for an SMTP endpoint:

  • FIXToEmailConverter - the default converter, which sets the email text to the FIX message
  • the 'converterClass' property - a converter instance is created when the SMTP endpoint is initialized
  • the 'converterRef' property - define a Spring bean that implements EmailConverter in the FEJ configuration and reference it by name

A custom converter can implement client-specific logic to set or change email fields (Subject/To/CC/BCC/Text).

Code Block
languagejava
titleTestEmailConverter example
package com.epam.fej.smtp;

import com.epam.fej.routing.MessageEvent;
import org.springframework.mail.SimpleMailMessage;

public class TestEmailConverter implements EmailConverter {

    @Override
    public SimpleMailMessage convert(MessageEvent from, SimpleMailMessage to) {
        // option 1: set Text to a static value
        to.setText("Text from simple TestEmailConverter");
        
        // option 2: set Text to the FIX message from the incoming MessageEvent
        // (this call overrides the text set above)
        to.setText(from.getMessage().toPrintableString());

        // Subject/To/CC/BCC fields can also be changed dynamically
        to.setSubject(...);
        to.setTo(...);
        to.setCc(...);
        to.setBcc(...);

        return to;
    }
    
}

Gmail properties

Code Block
languageyml
titleSMTP properties to send emails from Gmail
mail.clients=smtp-gmail

mail.smtp-gmail.host=smtp.gmail.com
mail.smtp-gmail.port=587
mail.smtp-gmail.username=<user_name>
mail.smtp-gmail.password=<pass>

mail.smtp-gmail.subject=Test subject
mail.smtp-gmail.to=<to>
mail.smtp-gmail.cc=<cc>
mail.smtp-gmail.bcc=<bcc>
mail.smtp-gmail.protocol=smtp

mail.smtp-gmail.groups=gmail-group
#mail.smtp-gmail.converterClass=com.epam.fej.smtp.TestEmailConverter
#mail.smtp-gmail.converterRef=testEmailConverter

mail.smtp-gmail.properties.mail.smtp.auth=true
mail.smtp-gmail.properties.mail.smtp.starttls.enable=true
mail.smtp-gmail.properties.mail.smtp.host=smtp.gmail.com
mail.smtp-gmail.properties.mail.smtp.port=587
mail.smtp-gmail.properties.mail.smtp.socketFactory.port=587
mail.smtp-gmail.properties.mail.smtp.socketFactory.class=javax.net.ssl.SSLSocketFactory
#mail.smtp-gmail.properties.mail.debug=true


Persistence API

The Persistence API provides an easy and fast way to store data. It supports two main storage types: PersistentSequence and PersistentQueue.

PersistentSequence

PersistentSequence provides functionality for storing sequential data (for example, logs). Each record must have a unique index. An Indexer implementation supplies the index for each stored record. PersistentSequence also requires a custom Serializer implementation to serialize objects to a byte buffer and restore them.

Code Block
languagejava
PersistenceFactory factory = ...

Indexer<MyRecord> indexer = new Indexer<MyRecord>() {
    @Override
    public long getIndex(MyRecord item) {
        return item.getId();
    }
};

Serializer<MyRecord> serializer = new Serializer<MyRecord>() {
    @Override
    public void serialize(MyRecord item, ElasticByteBuffer buffer) {
        // serialize instance ...
    }

    @Override
    public MyRecord deserialize(ElasticByteBuffer buffer, int length) {
        MyRecord record = new MyRecord();
        // load data to instance...
        return record;
    }
};

final PersistentSequence<MyRecord> sequence = factory.buildSequence("seq_sample", indexer, serializer);

//store record to sequence
sequence.append(record);
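
The serialize/deserialize pair left as comments above could look roughly like the following. This is a sketch under stated assumptions: java.nio.ByteBuffer stands in for ElasticByteBuffer (whose API is not shown here), and the MyRecord fields (id, text) are hypothetical because the original text does not define them.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class MyRecordCodec {

    // Hypothetical record type with an index and a text payload
    static final class MyRecord {
        final long id;
        final String text;

        MyRecord(long id, String text) {
            this.id = id;
            this.text = text;
        }
    }

    // Writes the id, then the payload length, then the payload bytes
    static void serialize(MyRecord item, ByteBuffer buffer) {
        byte[] text = item.text.getBytes(StandardCharsets.UTF_8);
        buffer.putLong(item.id);
        buffer.putInt(text.length);
        buffer.put(text);
    }

    // Reads the fields back in the same order they were written
    static MyRecord deserialize(ByteBuffer buffer) {
        long id = buffer.getLong();
        byte[] text = new byte[buffer.getInt()];
        buffer.get(text);
        return new MyRecord(id, new String(text, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(64);
        serialize(new MyRecord(100L, "order accepted"), buffer);
        buffer.flip(); // switch the buffer from writing to reading
        MyRecord restored = deserialize(buffer);
        System.out.println(restored.id + " " + restored.text);
    }
}
```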

In addition to storing data, PersistentSequence provides methods for retrieving records from storage. It supports reading a single item by index or iterating through items:

Code Block
languagejava
//get single item with index 100
final MyRecord myRecord = sequence.get(100);

//iterate items from index 0 till 100
sequence.retrieveItems(0, 100, new RetrieveSequenceItemsListener<MyRecord>() {
    @Override
    public void onItem(long id, MyRecord record) {
        //...
    }
}, true);

Stored records can be reviewed and some of them removed from storage:

Code Block
languagejava
sequence.cleanUp(new CleanupItemListener<MyRecord>() {
    @Override
    public boolean checkForClean(long id, MyRecord item) {
        //return true to remove this record from storage
        return false;
    }
});

Or remove all and reset sequence:

Code Block
languagejava
//remove all items and reset index
sequence.reset();

The default PersistentSequence implementation is optimized for writing data, so read operations may take a bit more time.

PersistentQueue

PersistentQueue works like a queue but persists all items to disk, so it can restore its state after an application restart. PersistentQueue has methods similar to those of java.util.Queue:

Code Block
languagejava
final PersistentQueue<MyRecord> queue = factory.buildQueue("queue_sample", serializer);

//push item to the tail of queue
queue.add(record);

// read item from head without removing it
record = queue.peek();

// extract item from head and remove it from queue
record = queue.poll();

PersistentQueue also allows iterating over all its items:

Code Block
languagejava
queue.iterate(new RetrieveQueueItemsListener<MyRecord>() {
    @Override
    public void onItem(MyRecord record) {
        //....
    }
});

To remove all items from the queue, use the clear method:

Code Block
languagejava
//remove all items and clean file
queue.clear();

Event API

Overview

An API is required for communication between components and for notification about actions that occur in them. The Event API was introduced to avoid tight coupling between modules and their APIs. It is also used to handle events in custom rules that users can implement in Groovy.

Event bus API

The event bus is represented by the com.epam.fej.event.EventBus interface, which can be injected into any module that needs it and has the following methods:

Method | Description
<T extends AppEvent> void publish(T appEvent) | Publishes the event provided to subscribers.
<T extends AppEvent> void publish(T appEvent, boolean sync) | The same as the previous method but with the ability to specify the publishing mode.
<T extends AppEvent> void publish(T appEvent, boolean sync, Predicate<T> afterRuleCallback, Consumer<T> afterPublishingCallback) | afterRuleCallback - callback that decides whether further rule processing is executed; afterPublishingCallback - callback that is called either after all applicable rules have been processed or at the end of the method if there are no rules for the event.
<T extends AppEvent> void subscribe(Consumer<T> consumer, Class<T> type) | Subscribes for the specified type of event.

It is also possible to register an event listener by marking it with the com.epam.fej.event.annotation.EventSubscriber annotation. The marked class should implement the java.util.function.Consumer interface and be registered as a Spring bean.

Event bus Implementation

The Event API is based on a custom implementation of the event bus interface because none of the analyzed libraries met the garbage-free requirement. The implementation is the com.epam.fej.event.EventBusImpl class, which uses a pool of events to avoid garbage. It is declared in com.epam.fej.event.SpringConfiguration. The implementation notifies subscribers asynchronously using an executor with a single thread in its pool, which keeps events ordered.
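
The ordering guarantee that comes from a single-threaded executor can be shown with a small stand-alone sketch. It is not EventBusImpl: the OrderedEventBus class is hypothetical, it accepts any object rather than AppEvent, and it omits event pooling entirely. It only illustrates why one worker thread delivers events in publication order.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class OrderedEventBus {
    private final Map<Class<?>, List<Consumer<Object>>> subscribers = new ConcurrentHashMap<>();
    // One worker thread: tasks run in the order they were submitted
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    // Registers a consumer for events whose class is exactly 'type'
    <T> void subscribe(Consumer<T> consumer, Class<T> type) {
        subscribers.computeIfAbsent(type, key -> new CopyOnWriteArrayList<>())
                .add(event -> consumer.accept(type.cast(event)));
    }

    // Queues the event; subscribers are notified asynchronously on the worker thread
    void publish(Object event) {
        executor.execute(() -> {
            List<Consumer<Object>> targets =
                    subscribers.getOrDefault(event.getClass(), Collections.emptyList());
            for (Consumer<Object> target : targets) {
                target.accept(event);
            }
        });
    }

    // Stops accepting events and waits for queued ones; returns true if all were delivered in time
    boolean shutdown() {
        executor.shutdown();
        try {
            return executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        OrderedEventBus bus = new OrderedEventBus();
        bus.subscribe(text -> System.out.println("received " + text), String.class);
        bus.publish("first");
        bus.publish("second");
        bus.shutdown();
    }
}
```

A single worker trades throughput for ordering: a slow subscriber delays every event queued behind it.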

Event classes

Implementations of the com.epam.fej.event.AppEvent interface are expected as events for the event bus. This implies that instances are reused and created only from a specific pool.

com.epam.fej.event.EventPool is a static class that creates an event instance or retrieves one from the pool via the getEvent(Class<T> type) method.
Note: you do not need to release the objects; the event bus does it automatically after notifying all subscribers.

Example of creating and pushing an event

Code Block
languagejava
SchedulerEvent schedulerEvent = EventPool.getEvent(SchedulerEvent.class);
schedulerEvent.setId(id);
eventBus.publish(schedulerEvent);