Transport adapter
Implementation
The FEJ routing server lets you integrate custom third-party transports and use them for routing messages. To support this, the server provides several interfaces to implement:
Endpoint is the basic interface. It represents an entity that the routing engine can uniquely identify as a source or a target of messages. EndpointParams provides the unique id of the endpoint and additional attributes that may be useful for routing logic.
Code Block |
---|
|
public interface Endpoint {
EndpointParams getParams();
} |
The SourceEndpoint interface represents a provider of messages for the routing engine.
Code Block |
---|
|
public interface SourceEndpoint extends Endpoint {
void setListener(SourceMessageListener listener);
} |
It lets you register a SourceMessageListener, which receives the incoming messages from external systems.
Code Block |
---|
|
public interface SourceMessageListener {
void onNewMessage(FIXFieldList message);
} |
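To illustrate how a source hands messages to its listener, here is a minimal sketch. The interfaces are simplified stand-ins for the FEJ types: a plain String replaces FIXFieldList, getId() replaces getParams(), and the CallbackSource class with its onTransportMessage method is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the FEJ interfaces (assumption: String replaces FIXFieldList).
interface SourceMessageListener {
    void onNewMessage(String message);
}

interface SourceEndpoint {
    String getId(); // stands in for getParams()
    void setListener(SourceMessageListener listener);
}

// Hypothetical source that forwards messages received from an external system.
final class CallbackSource implements SourceEndpoint {
    private final String id;
    private volatile SourceMessageListener listener;

    CallbackSource(String id) {
        this.id = id;
    }

    @Override
    public String getId() {
        return id;
    }

    @Override
    public void setListener(SourceMessageListener listener) {
        this.listener = listener;
    }

    // Called by the transport when a message arrives. Returns false when no
    // listener is registered yet, so the caller can decide to buffer or drop.
    boolean onTransportMessage(String message) {
        SourceMessageListener current = listener;
        if (current == null) {
            return false;
        }
        current.onNewMessage(message);
        return true;
    }
}

final class SourceEndpointDemo {
    public static void main(String[] args) {
        List<String> routed = new ArrayList<>();
        CallbackSource source = new CallbackSource("ext_feed");
        source.setListener(routed::add); // in FEJ the routing engine sets the listener
        source.onTransportMessage("35=D|49=CLIENT|56=FEJ|");
        System.out.println(routed);
    }
}
```

In this sketch the source never routes messages itself; it only passes them to whatever listener has been set.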
DestinationEndpoint represents a target of routing rules. It lets you pass a routed message to a particular system.
Code Block |
---|
|
public interface DestinationEndpoint extends Endpoint {
/**
* Send a {@link FIXFieldList} to this adapter. If the message is sent successfully,
* the method returns {@code true}. If the message cannot be sent due to a
* non-fatal reason, the method returns {@code false}. The method may also
* throw a RuntimeException in case of non-recoverable errors.
* <p>This method may block indefinitely, depending on the implementation.
*
* @param message the message to send
* @return whether or not the message was sent
*/
boolean send(FIXFieldList message);
} |
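The send contract described in the Javadoc above (true on success, false on a non-fatal failure, RuntimeException on a non-recoverable error) can be sketched as follows. This is an illustration only: String replaces FIXFieldList, and BoundedDestination and SendWithRetry are hypothetical names, not FEJ classes.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Simplified stand-in: the real DestinationEndpoint takes a FIXFieldList.
interface DestinationEndpoint {
    boolean send(String message);
}

// Hypothetical bounded destination: a full buffer is treated as a non-fatal
// failure (returns false), a closed endpoint as a non-recoverable one (throws).
final class BoundedDestination implements DestinationEndpoint {
    private final Queue<String> buffer = new ArrayDeque<>();
    private final int capacity;
    private boolean closed;

    BoundedDestination(int capacity) {
        this.capacity = capacity;
    }

    void close() {
        closed = true;
    }

    int size() {
        return buffer.size();
    }

    @Override
    public boolean send(String message) {
        if (closed) {
            throw new IllegalStateException("destination is closed");
        }
        if (buffer.size() >= capacity) {
            return false;
        }
        buffer.add(message);
        return true;
    }
}

final class SendWithRetry {
    // Retries non-fatal failures up to maxAttempts; a RuntimeException propagates.
    static boolean send(DestinationEndpoint destination, String message, int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (destination.send(message)) {
                return true;
            }
        }
        return false;
    }
}
```

A caller can therefore treat false as "try again or route elsewhere" and reserve exception handling for failures that need operator attention.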
There is also a mechanism for registering sources and destinations with the server. After registration, the endpoints are accessible to the routing engine. Sources and destinations are registered independently, which means you can register a source endpoint and a destination endpoint with the same id. This is especially important for bidirectional transports such as FIX, where inbound and outbound connections are identified by the same parameters. The BidirectionalEndpoint interface exists for such transports.
Code Block |
---|
|
public interface BidirectionalEndpoint extends ConsumerEndpoint, ProducerEndpoint {
} |
There are two separate interfaces for registering sources and destinations:
Code Block |
---|
|
public interface SourceEndpointRegistry {
void registerConsumer(SourceEndpoint consumer);
void removeConsumer(String id);
}
public interface DestinationEndpointRegistry {
void registerProducer(DestinationEndpoint producer);
void removeProducer(String id);
} |
Both are implemented by the EndpointRegistryAdapter class, which is available via the Spring config:
Code Block |
---|
|
<bean id="endpointRegistry" class="com.epam.fej.routing.endpoint.EndpointRegistryAdapter"
c:destinationsRegister-ref="destinationsRegister"/> |
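The independence of source and destination registration can be illustrated with a small in-memory registry. This is a sketch under assumptions, not the EndpointRegistryAdapter implementation: the stand-in interfaces expose getId() directly, and InMemoryEndpointRegistry, FixSessionEndpoint, hasSource and hasDestination are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified stand-ins for the FEJ endpoint interfaces.
interface Endpoint {
    String getId();
}

interface SourceEndpoint extends Endpoint {
}

interface DestinationEndpoint extends Endpoint {
}

// Hypothetical registry that keeps sources and destinations in separate maps,
// so the same id can be used on both sides without a conflict.
final class InMemoryEndpointRegistry {
    private final Map<String, SourceEndpoint> sources = new ConcurrentHashMap<>();
    private final Map<String, DestinationEndpoint> destinations = new ConcurrentHashMap<>();

    void registerConsumer(SourceEndpoint consumer) {
        sources.put(consumer.getId(), consumer);
    }

    void removeConsumer(String id) {
        sources.remove(id);
    }

    void registerProducer(DestinationEndpoint producer) {
        destinations.put(producer.getId(), producer);
    }

    void removeProducer(String id) {
        destinations.remove(id);
    }

    boolean hasSource(String id) {
        return sources.containsKey(id);
    }

    boolean hasDestination(String id) {
        return destinations.containsKey(id);
    }
}

// One object playing both roles, as a bidirectional transport such as FIX would.
final class FixSessionEndpoint implements SourceEndpoint, DestinationEndpoint {
    private final String id;

    FixSessionEndpoint(String id) {
        this.id = id;
    }

    @Override
    public String getId() {
        return id;
    }
}
```

Removing the source side of "fix_client" in this sketch leaves the destination side registered, which is the behavior the text describes for bidirectional transports.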
JMS Endpoint
There are several ways to add JMS connectivity to the FEJ container. The fej-jms.xml configuration file already contains a basic configuration for the JMS adapter:
Code Block |
---|
|
<bean id="jmsProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean"
p:location="classpath:jms-adaptor.properties"/>
<bean id="jmsConfig" class="com.epam.fixengine.jms.config.Config"
c:prefix="jms.adaptor"
c:properties-ref="jmsProperties">
</bean>
<bean id="jmsConfigRegister" class="com.epam.fej.jms.DefaultJmsConfigsRegister"
p:jmsManager-ref="jmsAdaptorManager"
c:config-ref="jmsConfig" init-method="init"/>
<bean id="jmsClientFactory" class="com.epam.fixengine.jms.client.JMSClientFactory" factory-method="getInstance"/>
<bean id="jmsAdaptorManager" class="com.epam.fej.jms.JmsAdapterManager"
c:endpointRegistry-ref="endpointRegistry"
c:clientFactory-ref="jmsClientFactory"
depends-on="rulesConfigManager"/> |
The jms-adaptor.properties file contains parameters for JMS producers and consumers (FIXAJ JMS Adaptor properties). The jmsConfigRegister bean (com.epam.fej.jms.DefaultJmsConfigsRegister) is responsible for loading JMS session contexts (SessionContext) from the configuration file and registering them with jmsAdaptorManager (com.epam.fej.jms.JmsAdaptorManager) for the routing engine. JmsAdaptorManager builds source and destination endpoint adapters from the given SessionContext objects and registers them in the server.
Info |
---|
If you want to use your own configuration factory, you can also use a JmsManager implementation for building and registering SessionContext instances. |
DefaultJmsConfigsRegister produces SessionContext instances via a JmsContextFactory implementation. By default it uses com.epam.fej.jms.JndiJmsSessionContextFactory, but you can set your own implementation via DefaultJmsConfigsRegister.setJmsContextFactory(JmsContextFactory jmsContextFactory). You can also use com.epam.fej.jms.SimpleJmsContextFactory with your own javax.jms.ConnectionFactory definition.
Code Block |
---|
|
public SessionContext createSessionContext(JmsConfig config) {
final ConnectionInfo connectionInfo = config.getConnectionInfo();
final SessionInfo sessionInfo = config.getSessionInfo();
return new SessionContext(connectionInfo, sessionInfo, null, jmsContextFactory.createContext(config));
} |
Info |
---|
Please note that ConnectionInfo and SessionInfo classes support loading of custom properties from configuration files: Code Block |
---|
| final Properties additionalProperties = connectionInfo.getAdditionalProperties();
final Map<String, Object> additionalParams = sessionInfo.getAdditionalParams(); |
|
Custom connection factory instead of JNDI
A custom JMS connection factory can be used in several ways:
Declare a ConnectionFactory in Spring and inject it into SimpleJmsContextFactory:
Code Block |
---|
|
<bean id="jmsConfigRegister" class="com.epam.fej.jms.DefaultJmsConfigsRegister"
p:jmsManager-ref="jmsAdaptorManager"
p:jmsContextFactory-ref="jmsContextFactory"
c:config-ref="jmsConfig" init-method="init"/>
<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"
c:brokerURL="${jms.broker.url}"/>
<bean id="jmsContextFactory" class="com.epam.fej.jms.SimpleJmsContextFactory"
c:connectionFactory-ref="jmsConnectionFactory"/> |
Implement your own JmsContextFactory and pass it to DefaultJmsConfigsRegister:
Code Block |
---|
|
public class ActiveMqJmsContextFactory implements JmsContextFactory {
@Override
public JMSContext createContext(JmsConfig config) {
return new SimpleJmsContext(new ActiveMQConnectionFactory(
config.getConnectionInfo().getProviderURL()), config.getConnectionInfo());
}
} |
Code Block |
---|
|
<bean id="activeMqContextFactory" class="com.epam.fej.jms.ActiveMqJmsContextFactory"/>
<bean id="jmsConfigRegister" class="com.epam.fej.jms.DefaultJmsConfigsRegister"
p:jmsManager-ref="jmsAdaptorManager"
p:jmsContextFactory-ref="activeMqContextFactory"
c:config-ref="jmsConfig" init-method="init"/> |
Kafka Endpoint
Kafka Endpoint is a FEJ transport adaptor that provides connectivity to the Kafka streaming platform. Its implementation is based on the FIX Antenna Java Kafka Adapter and internally uses the Kafka Producer and Consumer APIs.
Inside the FIX Edge Java server, each Kafka producer instance is represented as a destination endpoint and each consumer instance as a source endpoint. After the server starts, they are available for use in routing rules to accept messages from topics and to send messages to Kafka.
Configuration
Basic Kafka parameters are described here.
There are several ways to add Kafka connectivity to the FIXEdge/J container. The fej-kafka.xml configuration file already contains a basic configuration for the Kafka adaptor:
Code Block |
---|
language | xml |
---|
title | fej-kafka.xml |
---|
|
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:p="http://www.springframework.org/schema/p"
xmlns:c="http://www.springframework.org/schema/c"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="kafkaProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean"
p:location="classpath:kafka-adaptor.properties"/>
<bean id="kafkaConfig" class="com.epam.fixengine.kafka.config.Config"
c:prefix="kafka"
c:properties-ref="kafkaProperties">
</bean>
<bean id="kafkaConfigRegister" class="com.epam.fej.kafka.DefaultKafkaConfigsRegister"
p:clientManager-ref="kafkaClientManager"
c:config-ref="kafkaConfig"/>
<bean id="kafkaClientFactory" class="com.epam.fixengine.kafka.client.ClientFactory" factory-method="getInstance"/>
<bean id="kafkaClientManager" class="com.epam.fej.kafka.DefaultKafkaClientManager"
c:endpointRegistry-ref="endpointRegistry"
c:clientFactory-ref="kafkaClientFactory"
c:messageEventPool-ref="messageEventPool"
depends-on="rulesConfigManager"/>
</beans> |
The kafkaConfigRegister bean is responsible for loading Kafka session contexts (SessionContext) from the configuration file and registering them with kafkaClientManager (com.epam.fej.kafka.DefaultKafkaClientManager) for the routing engine. kafkaClientManager builds source and destination endpoint adapters from the given SessionContext objects and registers them in the server.
Properties
Kafka endpoint parameters are defined in the conf/kafka-adaptor.properties file. The configuration should list the Kafka endpoints and link them to the corresponding Kafka topics. Serializers and deserializers should also be defined:
Code Block |
---|
|
# list of kafka endpoints (consumers and producer)
kafka.clients = KProducer, KConsumer
# main properties to specify producer's id and topic
kafka.producer.KProducer.client.id = KProducer
kafka.producer.KProducer.topic = PTopic1
# Type is class. Serializer class for key that implements the com.epam.fej.kafka.KafkaEndpointKeySerializer interface.
kafka.producer.key.serializer = com.epam.fej.kafka.FIXMessageEventSerializer
# Type is class. Serializer class for value that implements the com.epam.fej.kafka.KafkaEndpointValueSerializer interface.
kafka.producer.value.serializer = com.epam.fej.kafka.FIXMessageEventSerializer
# main properties to specify consumer's id and topic
kafka.consumer.KConsumer.client.id = KConsumer
kafka.consumer.KConsumer.topics = KTopic2
# Type is class. Deserializer class for key that implements the com.epam.fej.kafka.KafkaEndpointKeyDeserializer interface.
kafka.consumer.key.deserializer = org.apache.kafka.common.serialization.StringDeserializer
# Type is class. Deserializer class for value that implements the com.epam.fej.kafka.KafkaEndpointValueDeserializer interface.
kafka.consumer.value.deserializer = com.epam.fej.kafka.FIXMessageEventDeserializer
|
...
Code Block |
---|
|
# Apply specific batch size for KProducer producer client only
kafka.producer.KProducer.batch.size = 32768
# wait for the full set of in-sync replicas to acknowledge the record
kafka.producer.KProducer.acks = all
# Apply batch size for every producer (default value for all producers)
kafka.producer.batch.size = 16384
|
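The properties above combine a default for all producers with an override for one client. A two-level lookup of that kind could look like the sketch below. It is an assumption that the adaptor resolves values this way; the KafkaPropertyLookup class and its resolve method are hypothetical and only illustrate the naming scheme.

```java
import java.util.Properties;

// Hypothetical helper illustrating the naming scheme of kafka-adaptor.properties.
final class KafkaPropertyLookup {
    // Looks up kafka.<role>.<clientId>.<key> first and falls back to
    // kafka.<role>.<key>, which acts as the default for all clients of that role.
    static String resolve(Properties props, String role, String clientId, String key) {
        String specific = props.getProperty("kafka." + role + "." + clientId + "." + key);
        if (specific != null) {
            return specific;
        }
        return props.getProperty("kafka." + role + "." + key);
    }
}
```

With the values shown above, resolving "batch.size" for the producer KProducer would yield the client-specific setting, while any other producer would get the shared default.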
FEJ also allows adding options that affect endpoint behavior. They can be set separately for each client, in the same way as described above:
Code Block |
---|
|
# set 'groups' property for endpoint. KProducer is assigned to groups 'md_consumer' and 'usa1'
kafka.producer.KProducer.groups = md_consumer, usa1
# This file can also be extended with generic Kafka configuration options, prefixed with 'kafka.'
# A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
kafka.bootstrap.servers = localhost:9092 |
...
Business Rules
When FIXEdge/J acts as a producer and forwards messages to Kafka topics, use the following business rule:
Code Block |
---|
language | groovy |
---|
title | rules.groovy |
---|
|
[
rule("Route NewOrder to Kafka")
.sourceCondition({
//static filter for session
source -> source.id == "fix_client"
})
.condition({
// context filter - apply this rule for New Order - Single (D) messages
ctx -> ctx.getMessage().getTagValueAsString(35) == "D"
})
.action({
// action for rule - send message to Kafka client with id 'KProducer'
ctx ->
routingContext.getDestinationById("KProducer").send(ctx.messageEvent)
ctx.exit()
})
.build()
] |
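The condition in the rule above tests tag 35 (MsgType) for the value "D". As a plain-Java illustration of what that check amounts to on a raw FIX message, here is a small sketch. It does not use the FEJ message API; the FixTags class is hypothetical and assumes fields are separated by the standard SOH character.

```java
// Hypothetical helper for reading tag values from a raw FIX string.
final class FixTags {
    private static final char SOH = '\u0001';

    // Returns the value of the first occurrence of the tag, or null if absent.
    static String tagValue(String rawFix, int tag) {
        String prefix = tag + "=";
        int start = 0;
        while (start < rawFix.length()) {
            int end = rawFix.indexOf(SOH, start);
            if (end < 0) {
                end = rawFix.length();
            }
            // Match only at the start of a field, so tag 135 never matches tag 35.
            if (rawFix.startsWith(prefix, start)) {
                return rawFix.substring(start + prefix.length(), end);
            }
            start = end + 1;
        }
        return null;
    }

    // True for New Order - Single messages (MsgType 35=D).
    static boolean isNewOrderSingle(String rawFix) {
        return "D".equals(tagValue(rawFix, 35));
    }
}
```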
When FIXEdge/J acts as a consumer and subscribes to messages in Kafka topics, use the following business rule:
Code Block |
---|
language | groovy |
---|
title | rules.groovy |
---|
|
[
rule("Route all messages from Kafka")
.sourceCondition({
params ->
return params.getGroups().contains("kafka_consumer")
})
.action({
ctx->
def msg = ctx.getMessage()
logger.info("Processing message '{}' has been started", msg)
def sessionDestination = stringValue(msg, Header.DeliverToCompID)
send(routingContext, ctx, sessionDestination)
ctx.exit()
logger.info("Processing message '{}' has been finished: {}", msg, new java.util.Date().toInstant())
})
.build()
] |
Camel destination endpoint
The Camel framework is integrated into FIXEdge Java as a destination endpoint, which allows publishing messages through the integrated Camel components.
Configuration
Camel routes are configured through Spring configuration.
A Camel route uses two types of endpoints, 'from' and 'to'.
- The 'from' endpoint is preconfigured in FEJ and must be "<from uri="direct:routingRule"/>". The Camel endpoint uses it to send messages handled in Groovy rules.
- There can be as many 'to' endpoints as required. They are implemented in Camel and are described in the official Camel documentation.
Example of configuration
The configuration is located in fej-camel-context.xml file.
Code Block |
---|
title | Example of fej-camel-context.xml |
---|
|
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
">
<context:component-scan base-package="com.epam.fej.camel"/>
<camelContext id="<context_id>" xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="direct:routingRule"/>
<to uri="google-pubsub://<project_id>:<topic_name>"/>
<to uri="kafka:<topic_name>?brokers=<ip>:<port>"/>
</route>
</camelContext>
</beans> |
Here <context_id> is the identifier of the configuration; Groovy rules use it to route messages.
Example of rule
Code Block |
---|
|
rule("Route news messages to google pub/sub")
.action({
ctx ->
try {
send(routingContext, ctx, "<context_id>")
} catch (all) {
logger.error(all.getMessage(), all)
}
ctx.exit()
})
.build() |
Note: each Camel configuration must be started by the scheduler.
Code Block |
---|
title | Example of starting camel context in scheduler |
---|
collapse | true |
---|
|
<?xml version="1.0" encoding="UTF-8"?>
<schedules xmlns="http://epam.com/fej/schedules">
<schedule id="<context_id>">
<task name="start" onLoad="true"/>
</schedule>
</schedules>
|
SMTP Destination Endpoint
The JavaMail framework is integrated into FIXEdge Java in the form of SMTP destination endpoints, which make it possible to send emails to external SMTP servers.
Configuration
Spring configuration is used to configure SMTP destination endpoints, each with its own properties for connecting to an external SMTP server and a converter that prepares automatic emails from FIX messages.
Example of Spring configuration
Code Block |
---|
language | xml |
---|
title | Example of 'fej-smtp.xml' |
---|
|
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:p="http://www.springframework.org/schema/p"
xmlns:c="http://www.springframework.org/schema/c"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
<bean id="smtpProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean"
p:location="classpath:smtp-adaptor.properties"/>
<bean id="smtpConfigProvider" class="com.epam.fej.smtp.SMTPConfigProvider"
c:properties-ref="smtpProperties">
</bean>
<context:component-scan base-package="com.epam.fej.smtp"/>
</beans> |
SMTPConfigProvider prepares SMTP configuration objects from the 'smtp-adaptor.properties' file for use by SMTPAdapterManager when creating SMTP endpoints.
SMTPAdapterManager and SMTPDestinationEndpoint configuration
SMTPAdapterManager is created as a Spring configuration bean. On initialization it creates and registers SMTPDestinationEndpoint objects, which are FIXEdge Java destination endpoints that can be used in routing rules to send emails to external SMTP servers.
Each SMTPDestinationEndpoint contains:
- JavaMailSender - a prepared SMTP client for connecting to the configured SMTP server;
- SimpleMailMessage - a prepared template with the configured 'From/To/CC/BCC/Subject' values that are common to all emails from this endpoint;
- EmailConverter - a converter class with the client's logic that converts information from a MessageEvent (FIX message) and sets the email text for this endpoint's emails.
Code Block |
---|
language | java |
---|
title | SMTPAdapterManager initialization |
---|
|
public SMTPAdapterManager(final EndpointRegistry endpointRegistry,
final ApplicationContext applicationContext,
final MessageEventPool messageEventPool,
final SMTPConfigProvider configProvider) {
this.endpointRegistry = endpointRegistry;
this.applicationContext = applicationContext;
this.messageEventPool = messageEventPool;
this.configProvider = configProvider;
}
public void init() {
// get all SMTP configs from configProvider
Set<String> smtpConfigNames = configProvider.getConfigs();
for (String id : smtpConfigNames) {
SMTPConfig config = configProvider.getConfig(id);
GeneralEndpointParams endpointParams = new GeneralEndpointParams();
endpointParams.setId(id);
endpointParams.setGroups(config.getGroups());
endpointParams.setTargetCompId(id);
// prepare sender, template and converter
JavaMailSender sender = buildMailSender(config);
SimpleMailMessage template = buildTemplateMessage(config);
EmailConverter converter = buildEmailConverter(config);
// create and register SMTP destination endpoint
SMTPDestinationEndpoint smtpEndPoint =
new SMTPDestinationEndpoint(endpointParams, messageEventPool, sender, converter, template);
endpointRegistry.register(smtpEndPoint);
}
} |
Configuration properties
List of main FEJ SMTP endpoint properties:
...
List of main JavaMail properties, set with the "mail.<clientId>.properties" prefix (the full list of properties can be found here)
...
Code Block |
---|
language | yml |
---|
title | SMTP properties example |
---|
|
# comma separated list of SMTP clients
mail.clients=smtp1
# SMTP server connection details
mail.smtp1.host=smtp.gmail.com
mail.smtp1.port=587
mail.smtp1.username=smtp-user1
mail.smtp1.password=pass1
# default Subject/To/CC/BCC values
mail.smtp1.subject=Test subject
mail.smtp1.to=<to list>
mail.smtp1.cc=<cc list>
mail.smtp1.bcc=<bcc list>
mail.smtp1.protocol=smtp
# FEJ routing groups
mail.smtp1.groups=mail1-group
# converter class or Spring reference bean name
mail.smtp1.converterClass=com.epam.fej.smtp.TestEmailConverter
mail.smtp1.converterRef=testEmailConverter
# additional SMTP properties
mail.smtp1.properties.mail.smtp.auth=true
mail.smtp1.properties.mail.smtp.starttls.enable=true
|
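The example above uses two naming conventions: a comma-separated mail.clients list and a mail.<clientId>.properties prefix for JavaMail settings. The sketch below shows one way such entries could be read with java.util.Properties. It is an illustration only, not the SMTPConfigProvider code; the SmtpPropertiesSketch class and both method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical reader for the smtp-adaptor.properties naming conventions.
final class SmtpPropertiesSketch {
    // Splits the comma-separated mail.clients value into trimmed client ids.
    static List<String> clientIds(Properties all) {
        List<String> ids = new ArrayList<>();
        for (String id : all.getProperty("mail.clients", "").split(",")) {
            String trimmed = id.trim();
            if (!trimmed.isEmpty()) {
                ids.add(trimmed);
            }
        }
        return ids;
    }

    // Copies the mail.<clientId>.properties.* entries with the prefix removed,
    // leaving plain JavaMail keys such as mail.smtp.auth.
    static Properties javaMailProperties(Properties all, String clientId) {
        String prefix = "mail." + clientId + ".properties.";
        Properties result = new Properties();
        for (String name : all.stringPropertyNames()) {
            if (name.startsWith(prefix)) {
                result.setProperty(name.substring(prefix.length()), all.getProperty(name));
            }
        }
        return result;
    }
}
```

Under this reading, mail.smtp1.properties.mail.smtp.auth=true would reach JavaMail as mail.smtp.auth=true for the client smtp1 only.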
Converters
A converter must implement the EmailConverter interface:
Code Block |
---|
language | java |
---|
title | EmailConverter interface |
---|
|
package com.epam.fej.smtp;
import com.epam.fej.routing.MessageEvent;
import org.springframework.mail.SimpleMailMessage;
public interface EmailConverter {
SimpleMailMessage convert(MessageEvent from, SimpleMailMessage to);
} |
There are 3 possible ways to set the SMTP endpoint converter:
- FIXToEmailConverter - the default converter, which sets the email text to the FIX message
- use the 'converterClass' property - an instance of the converter is created when the SMTP endpoint is initialized
- use the 'converterRef' property - define a Spring bean that implements EmailConverter in the FEJ config and set a reference to it
A custom converter can implement client-specific logic to set or change email fields (Subject/To/CC/BCC/Text).
Code Block |
---|
language | java |
---|
title | TestEmailConverter example |
---|
|
package com.epam.fej.smtp;
import com.epam.fej.routing.MessageEvent;
import org.springframework.mail.SimpleMailMessage;
public class TestEmailConverter implements EmailConverter {
@Override
public SimpleMailMessage convert(MessageEvent from, SimpleMailMessage to) {
// change Text to static title
to.setText("Text from simple TestEmailConverter");
// set Text to FIX message from incoming MessageEvent
to.setText(from.getMessage().toPrintableString());
// Subject/To/CC/BCC fields can also be changed dynamically
to.setSubject(...);
to.setTo(...);
to.setCc(...);
to.setBcc(...);
return to;
}
} |
Gmail properties
Code Block |
---|
language | yml |
---|
title | SMTP properties to send emails from Gmail |
---|
|
mail.clients=smtp-gmail
mail.smtp-gmail.host=smtp.gmail.com
mail.smtp-gmail.port=587
mail.smtp-gmail.username=<user_name>
mail.smtp-gmail.password=<pass>
mail.smtp-gmail.subject=Test subject
mail.smtp-gmail.to=<to>
mail.smtp-gmail.cc=<cc>
mail.smtp-gmail.bcc=<bcc>
mail.smtp-gmail.protocol=smtp
mail.smtp-gmail.groups=gmail-group
#mail.smtp-gmail.converterClass=com.epam.fej.smtp.TestEmailConverter
#mail.smtp-gmail.converterRef=testEmailConverter
mail.smtp-gmail.properties.mail.smtp.auth=true
mail.smtp-gmail.properties.mail.smtp.starttls.enable=true
mail.smtp-gmail.properties.mail.smtp.host=smtp.gmail.com
mail.smtp-gmail.properties.mail.smtp.port=587
mail.smtp-gmail.properties.mail.smtp.socketFactory.port=587
mail.smtp-gmail.properties.mail.smtp.socketFactory.class=javax.net.ssl.SSLSocketFactory
#mail.smtp-gmail.properties.mail.debug=true |
Persistence API
The Persistence API provides an easy and fast way to store data. It supports two main storage types: PersistentSequence and PersistentQueue.
PersistentSequence
PersistentSequence provides functionality for storing sequential data (for example, logs). Each record must have a unique index. An Indexer implementation provides the index for each stored record. PersistentSequence also requires a custom Serializer implementation to serialize objects to a byte buffer and restore them.
Code Block |
---|
|
PersistenceFactory factory = ...
Indexer<MyRecord> indexer = new Indexer<MyRecord>() {
@Override
public long getIndex(MyRecord item) {
return item.getId();
}
};
Serializer<MyRecord> serializer = new Serializer<MyRecord>() {
@Override
public void serialize(MyRecord item, ElasticByteBuffer buffer) {
// serialize instance ...
}
@Override
public MyRecord deserialize(ElasticByteBuffer buffer, int length) {
MyRecord record = new MyRecord();
// load data to instance...
return record;
}
};
final PersistentSequence<MyRecord> sequence = factory.buildSequence("seq_sample", indexer, serializer);
//store record to sequence
sequence.append(record); |
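The serializer bodies above are left empty. As a sketch of what a serialize/deserialize pair might do, the example below writes a record to a buffer and reads it back. It uses java.nio.ByteBuffer as a stand-in for ElasticByteBuffer, whose API is not shown here; the MyRecord fields and the MyRecordCodec class are assumptions made for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical record type used only for this illustration.
final class MyRecord {
    final long id;
    final String text;

    MyRecord(long id, String text) {
        this.id = id;
        this.text = text;
    }
}

// Stand-in for a Serializer<MyRecord>, written against java.nio.ByteBuffer.
final class MyRecordCodec {
    // Layout: 8-byte id followed by the UTF-8 bytes of the text.
    static void serialize(MyRecord item, ByteBuffer buffer) {
        buffer.putLong(item.id);
        buffer.put(item.text.getBytes(StandardCharsets.UTF_8));
    }

    // length is the total number of bytes that were written for this record.
    static MyRecord deserialize(ByteBuffer buffer, int length) {
        long id = buffer.getLong();
        byte[] textBytes = new byte[length - Long.BYTES];
        buffer.get(textBytes);
        return new MyRecord(id, new String(textBytes, StandardCharsets.UTF_8));
    }
}
```

The length argument is what lets the text be stored without its own size prefix: everything after the id belongs to the text.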
In addition to storing data, PersistentSequence provides methods for retrieving records from storage. It supports reading a single item by index or iterating through items:
Code Block |
---|
|
//get single item with index 100
final MyRecord myRecord = sequence.get(100);
//iterate items from index 0 till 100
sequence.retrieveItems(0, 100, new RetrieveSequenceItemsListener<MyRecord>() {
@Override
public void onItem(long id, MyRecord record) {
//...
}
}, true); |
There is a possibility to review stored records and remove some of them from storage:
Code Block |
---|
|
sequence.cleanUp(new CleanupItemListener<MyRecord>() {
@Override
public boolean checkForClean(long id, MyRecord item) {
//return true to remove this record from storage
return false;
}
}); |
Or remove all and reset sequence:
Code Block |
---|
|
//remove all items and reset index
sequence.reset(); |
The default PersistentSequence implementation is optimized for writing data, so read operations may take a bit more time.
PersistentQueue
PersistentQueue works like a queue but persists all items to disk, so it can restore its state after an application restart. PersistentQueue has methods similar to those of java.util.Queue:
Code Block |
---|
|
final PersistentQueue<MyRecord> queue = factory.buildQueue("queue_sample", serializer);
//push item to the tail of queue
queue.add(record);
// read item from head without removing it
record = queue.peek();
// extract item from head and remove it from queue
record = queue.poll(); |
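Since these methods mirror java.util.Queue, their semantics can be tried out with an ordinary in-memory queue. The sketch below uses java.util.ArrayDeque and shows no persistence. It is an assumption that PersistentQueue, like java.util.Queue, returns null from peek and poll when it is empty; the QueueSemantics class is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Illustrates java.util.Queue semantics; works with any Queue implementation.
final class QueueSemantics {
    // Polls until the queue is empty and returns the items in removal order,
    // which for a FIFO queue is the order in which they were added.
    static List<String> drain(Queue<String> queue) {
        List<String> items = new ArrayList<>();
        for (String item = queue.poll(); item != null; item = queue.poll()) {
            items.add(item);
        }
        return items;
    }
}
```

For example, after adding "first" and "second" to a new ArrayDeque, peek returns "first" without changing the size, and drain returns both items in insertion order.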
PersistentQueue also allows iterating over all its items:
Code Block |
---|
|
queue.iterate(new RetrieveQueueItemsListener<MyRecord>() {
@Override
public void onItem(MyRecord record) {
//....
}
}); |
To remove all items from the queue, use the clear method:
Code Block |
---|
|
//remove all items and clean file
queue.clear(); |
Event API
Overview
An API is required for communicating and notifying about actions that occur in the components of the system. The Event API was introduced to avoid tight coupling between modules and their APIs. The API is also used for handling events in custom rules, which users can implement in Groovy.
Event bus API
The event bus is represented by the com.epam.fej.event.EventBus interface, which can be injected into any module that needs it and has the following methods.
...
It is also possible to register an event listener by marking it with the com.epam.fej.event.annotation.EventSubscriber annotation. The marked class should implement the java.util.function.Consumer interface and be registered as a Spring bean.
Event bus Implementation
The Event API is based on a custom implementation of the event bus interface because none of the analyzed libraries met the garbage-free requirement. The implementation is the com.epam.fej.event.EventBusImpl class, which uses a pool of events to avoid garbage. It is declared in com.epam.fej.event.SpringConfiguration. The implementation notifies subscribers asynchronously, using an executor with a single thread in its pool to keep events ordered.
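The ordering guarantee of a single-threaded executor can be illustrated with a short sketch. This is not EventBusImpl: it has no event pool and is not garbage free. The SingleThreadEventBus class and its methods are hypothetical and only show why one worker thread keeps asynchronous notifications in publication order.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical, simplified event bus with asynchronous, ordered delivery.
final class SingleThreadEventBus {
    private final List<Consumer<Object>> subscribers = new CopyOnWriteArrayList<>();
    // One worker thread: queued tasks run one at a time, in submission order.
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    void subscribe(Consumer<Object> subscriber) {
        subscribers.add(subscriber);
    }

    // Returns immediately; the worker thread notifies subscribers later.
    void publish(Object event) {
        executor.execute(() -> subscribers.forEach(s -> s.accept(event)));
    }

    // Stops accepting events and waits for the queued ones to be delivered.
    // Returns false on timeout or if the waiting thread is interrupted.
    boolean shutdownAndWait(long timeoutMillis) {
        executor.shutdown();
        try {
            return executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

With more than one worker thread, two events published back to back could reach a subscriber in either order, which is why a single thread is used when ordering matters.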
Event classes
...
Example of creating and pushing an event
...
...
Overview
FIXEdge Java functionality can be extended with custom modules if required. The new logic can be integrated via Spring configuration files. As a default extension point, use the conf/spring/custom-ext.xml configuration file. Add any custom Beans there.
The conf/spring/custom-ext.xml currently includes two Java Beans for extending Groovy rules with custom functionalities:
- Listing a bean with the customBLImports ID allows extending default imports for Groovy scripts:
Code Block |
---|
|
<util:list id="customImports" value-type="java.lang.String">
<!--Example of import value-->
<value>java.util.concurrent.atomic.AtomicInteger</value>
</util:list> |
- Mapping a Bean with the customBLBeans ID allows adding custom Beans as variables to Groovy scripts:
Code Block |
---|
|
<util:map id="customAdditionalProperties" key-type="java.lang.String">
<!--Example-->
<!-- exampleBean now is accessible in Groovy scripts -->
<entry key="exampleBean" value-ref="testBean"/>
</util:map>
<bean id="testBean" class="java.lang.Object"/> |