Replication service API

Overview

The cluster implementation is scalable and customizable, and provides several endpoints for integration.

Quick setup

The easiest way to use replication is together with ClusterManager. ClusterManager starts and stops the leader or a backup replication instance depending on the node's role in the cluster. It can also automatically notify the replication service about available nodes and their addresses.
To use the replication service with ClusterManager, implement LocalNodeLeaderListener and LocalNodeBackupListener to receive and handle notifications about the cluster's state. Default implementations of these listeners are available for the replication service: they handle all events, and enable and configure the appropriate service depending on the node's role in the cluster.

Initialize replication leader

HazelcastClusterManager clusterManager = new HazelcastClusterManager();

// create a controller for enabling/disabling the leader replication role
ReplicationLeader replicationLeaderService = new ReplicationLeader(aeronTransport, correlationIdHolder,
        REPLICATION_LEADER_PORT,    //(1)
        REPLICATION_BROADCAST_PORT, //(2)
        "./logs/" + name);

// register a listener for enabling/disabling replication functionality for the leader
LocalNodeLeaderListener leaderListener = new ReplicationLeaderListener(replicationLeaderService);
clusterManager.addLocalNodeLeaderListener(leaderListener);
  1. REPLICATION_LEADER_PORT - the port on which the leader listens for synchronization requests from backups.
  2. REPLICATION_BROADCAST_PORT - the port on which each backup node listens for replication data from the leader.

Initialize replication backup

// create a controller for enabling/disabling the backup replication role
ReplicationBackup replicationBackupService = new ReplicationBackup(aeronTransport, correlationIdHolder,
        REPLICATION_LEADER_PORT, REPLICATION_BROADCAST_PORT, "./logs/" + name);

// register a listener for enabling/disabling replication functionality for the backup
LocalNodeBackupListener backupListener = new ReplicationBackupListener(replicationBackupService);
clusterManager.addLocalNodeBackupListener(backupListener);

Creating storage directly

// Request an instance of the persistence factory
ReplicatedPersistenceFactory factory = ReplicatedPersistenceFactoryHolder.getFactory().get();

// Request an instance of a named persistent sequence with synchronous replication (replication timeout - 10 millis)
PersistentSequence<String> sequence = factory.getOrCreateSequence("seqName", indexer, new StringSerializer(),
        10, TimeUnit.MILLISECONDS);
// Request an instance of a named persistent queue with asynchronous replication
PersistentQueue<String> queue = factory.getOrCreateQueue("queueName", new StringSerializer(),
        0, TimeUnit.MILLISECONDS);

Creating storage with Persistence API

// Request an instance of the persistence factory
ReplicatedPersistenceFactory factory = ReplicatedPersistenceFactoryHolder.getFactory().get();
// Enable synchronous replication by default (replication timeout - 10 millis)
factory.setSyncMode(10, TimeUnit.MILLISECONDS);

// Request an instance of a named persistent sequence
PersistentSequence<String> sequence = factory.buildSequence("seqName", indexer, new StringSerializer());
// Request an instance of a named persistent queue
PersistentQueue<String> queue = factory.buildQueue("queueName", new StringSerializer());

Replication service lifecycle

The goal of a replication service is to maintain full copies of the storages within a network. To do this, it must support at least two operations:

  • synchronization of data to restore the actual state

  • replication of data in runtime to maintain the state

The service can replicate data in two modes - synchronous and asynchronous.

In asynchronous mode, confirmation of the successful receipt and processing of messages by the other side is not expected. Replication runs in parallel with the thread that changes the data in the storages.

Synchronous replication expects confirmation of successful processing from at least one of the backup nodes. A processing timeout can be specified for a storage instance when it is created; otherwise, the default settings are used. The thread that changes the data in the storages is blocked until an acknowledgment is received or the timeout expires. In the latter case, a warning is logged signaling that the data was not successfully transmitted to any of the backup nodes within the specified period.

The replication mode is set separately for each storage, so it is possible to use a storage with synchronous replication and a storage with asynchronous replication at the same time.
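
The synchronous-mode blocking described above can be sketched with a plain CountDownLatch. This is only an illustration of the behavior, not the service's actual implementation; the class and method names are invented:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Illustrative model of synchronous replication: the thread that changed
// the storage blocks until at least one backup acknowledges the update,
// or until the configured replication timeout expires.
public class SyncReplicationSketch {

    static final class PendingUpdate {
        private final CountDownLatch ackLatch = new CountDownLatch(1);

        // Invoked by the transport thread when any backup acknowledges.
        void onAck() {
            ackLatch.countDown();
        }

        // Invoked by the writing thread; returns false on timeout.
        boolean awaitAck(long timeout, TimeUnit unit) throws InterruptedException {
            return ackLatch.await(timeout, unit);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PendingUpdate update = new PendingUpdate();
        update.onAck(); // a backup confirmed the update

        if (!update.awaitAck(10, TimeUnit.MILLISECONDS)) {
            // On a real timeout the service would log a warning like this.
            System.out.println("WARN: update not acknowledged within the replication timeout");
        }
    }
}
```

A timeout of zero in this model would correspond to asynchronous mode, where the writing thread never waits.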

Each backup node maintains two transport channels because Aeron transport is unidirectional by its nature:

  • outgoing: for sending synchronization requests and synchronous acknowledgments

  • incoming: for receiving information about data changes in the storages

A listening port is assigned to each node of the replication service depending on its role, and the ports are fixed per role rather than per node. The leader always listens on the same port, so the backups always know how to contact it; the same holds for the port the backups listen on.

Persistent storages are designed for incremental updates: the internal storage keeps a log of operations such as 'APPEND' and 'REMOVE'. During synchronization or replication, only the new updates need to be sent.
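
The incremental-update design can be illustrated with a tiny operation log. The types and method names below are invented for the sketch and are not the library's API:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative operation log: every change is recorded with an ordered
// index, so synchronization only needs to send the entries the backup
// has not applied yet.
public class OperationLogSketch {

    enum Op { APPEND, REMOVE }

    static final class LogEntry {
        final long index;
        final Op op;
        final String payload;

        LogEntry(long index, Op op, String payload) {
            this.index = index;
            this.op = op;
            this.payload = payload;
        }
    }

    static final class OperationLog {
        private final List<LogEntry> entries = new ArrayList<>();

        void record(Op op, String payload) {
            entries.add(new LogEntry(entries.size(), op, payload));
        }

        // Everything after the backup's last applied index.
        List<LogEntry> entriesAfter(long lastAppliedIndex) {
            return entries.subList((int) (lastAppliedIndex + 1), entries.size());
        }
    }

    public static void main(String[] args) {
        OperationLog log = new OperationLog();
        log.record(Op.APPEND, "a");
        log.record(Op.APPEND, "b");
        log.record(Op.REMOVE, "a");

        // A backup that already applied index 0 only needs the last two entries.
        System.out.println(log.entriesAfter(0).size()); // prints 2
    }
}
```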

Data synchronization procedure

After starting, the backup instance synchronizes its state with the leader.

The synchronization procedure is:

  1. The backup sends a GET_ALL_RESOURCES request to the leader right after its start.

  2. The leader responds with lists of the existing queues and sequences as 'RESOURCE_LIST' messages. A CorrelationId is also sent for every storage. The CorrelationId is a unique id for a storage and is used in all subsequent communication.

  3. The backup loads the existing storages and creates the missing ones.

  4. The backup sends a SYNC_REQ request for every storage, passing its index. For a sequence, the timestamp of the last reset is passed as well.

  5. The leader responds with a SYNC_REQ_ACCEPTED message. It compares indexes and timestamps and resends all the required data. For a sequence, it can also send a SEQUENCE_RESET message if a reset was missed. Finally, the leader sends a SYNC_FINISHED message to mark the end of the synchronization response.
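
Steps 4-5 can be sketched as a simple request/response exchange. The message names follow the list above; the data representation is a simplified stand-in for the real wire protocol:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative leader-side handling of a SYNC_REQ: accept the request,
// resend every update the backup is missing (judging by the index it
// passed), and close the response with SYNC_FINISHED.
public class SyncHandshakeSketch {

    static List<String> handleSyncReq(List<String> leaderLog, long backupIndex) {
        List<String> response = new ArrayList<>();
        response.add("SYNC_REQ_ACCEPTED");
        // Resend only the entries the backup has not seen yet.
        for (int i = (int) (backupIndex + 1); i < leaderLog.size(); i++) {
            response.add("DATA:" + leaderLog.get(i));
        }
        // Marks the end of the synchronization response.
        response.add("SYNC_FINISHED");
        return response;
    }

    public static void main(String[] args) {
        // A backup that has applied index 0 receives only the missing tail.
        System.out.println(handleSyncReq(List.of("a", "b", "c"), 0));
        // prints [SYNC_REQ_ACCEPTED, DATA:b, DATA:c, SYNC_FINISHED]
    }
}
```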

Data replication procedure

For every operation with persistent storage, the leader sends updates to all backups.
The replication procedure is:

  1. Upon adding a new item to the internal storage, the leader sends a QUEUE_ADD or SEQUENCE_APPEND message. The new data and an internal ordered index are sent with this message.

  2. The backup receives this message and compares the expected index with the received one.

  3. If the received index differs from the expected one, the backup starts the synchronization procedure (see Data synchronization procedure, step 4).

  4. If the leader indicates that this is a synchronous storage and expects acknowledgment, the backup sends back an ACK message.
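
The backup-side index check from steps 2-3 reduces to a single comparison. This sketch uses invented names to show the decision only:

```java
// Illustrative backup-side decision for an incoming replication message:
// an index that matches the expectation is applied; any gap means missed
// updates and triggers a new synchronization round.
public class BackupIndexCheckSketch {

    enum Action { APPLY, RESYNC }

    static Action onReplicationMessage(long expectedIndex, long receivedIndex) {
        return receivedIndex == expectedIndex ? Action.APPLY : Action.RESYNC;
    }

    public static void main(String[] args) {
        System.out.println(onReplicationMessage(5, 5)); // prints APPLY
        System.out.println(onReplicationMessage(5, 7)); // prints RESYNC
    }
}
```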

FIX session replication

FIX Antenna Java can also use replication storages through the Persistence API.

To enable replicated storage for a FIX session, set up a replication leader and a backup (see Initialize replication leader and Initialize replication backup), and use the following configuration options for FIX Antenna (in fixengine.properties):

storageFactory=com.epam.fixengine.storage.persistence.PersistenceEnableFactory                                   //(1)
storage.persistenceFactoryBuilder=com.epam.fixengine.storage.persistence.ReplicatedPersistenceFactoryBuilder     //(2)
replicationTimeout=10                                                                                            //(3)
  1. Use PersistenceEnableFactory for the storageFactory property. This factory allows using the Persistence API for storing a FIX session's state. PersistenceEnableFactory is based on FilesystemStorageFactory and delegates all operations to Persistence API objects. Working with this API requires an implementation of PersistentFactoryBuilder, which constructs an instance of PersistenceFactory.
  2. Define ReplicatedPersistenceFactoryBuilder as the factory builder for PersistenceEnableFactory. ReplicatedPersistenceFactoryBuilder implements PersistentFactoryBuilder and builds a replicated instance of the factory. It uses the replicationTimeout option from the FIX Antenna configuration (or from the session's Configuration instance) to configure synchronous or asynchronous replication for the FIX session.
  3. Define the replication timeout in milliseconds. A zero value enables asynchronous replication for the FIX session.