Kafka Transport Adapter

The Kafka Transport Adapter is available starting from version 6.10.0 of FIXEdge C++.

Overview

The Kafka Transport Adapter (hereinafter Kafka TA) provides the FIXEdge server with connectivity to the Kafka distributed streaming platform.

Internally, its implementation uses the Kafka Consumer and Producer APIs. The broker API version must be 0.10.0.0 or higher.

The Kafka TA acts as a FIXEdge plugin and establishes a connection to a Kafka single instance or a Kafka cluster. The adapter supports TLS connectivity.

The Kafka TA transfers FIX messages in raw or serialized formats from the FIXEdge Server to the Kafka platform and vice versa.

The Kafka TA performance is close to Kafka's native speed of processing. The average reading/writing rate of FIX messages is 30,000 per second or higher. This number refers to the overall throughput capacity for the FIXEdge server with the Kafka TA (from FIX session to Kafka server).

The Kafka TA automatically reconnects to Kafka if the connection to a message broker is unexpectedly terminated. Reconnection behavior can be configured using the TransportLayer.KafkaTA.reconnect.backoff.ms and TransportLayer.KafkaTA.reconnect.backoff.max.ms parameters.

Multiple instances of the Kafka TA can be configured using one library instance, inside one instance of FIXEdge. 

During FIXEdge server termination, the Kafka TA employs graceful termination, ensuring that message processing operations are executed correctly, completely, and without message loss. When the FIXEdge server fails and non-graceful termination occurs, messages are retrieved from the persistent message storage.

FIFO (First In First Out) ordering is guaranteed, meaning that the order in which messages were sent and received will be maintained upon the termination of the server instance.

Concept

The Kafka TA is intended to communicate FIX messages to other applications using the Kafka streaming platform as middleware.

Communication is provided between the FIXEdge server and a Kafka broker or cluster on the FIXEdge side.

The Kafka TA allows integration with Kafka via the Consumer/Producer API. FIXEdge clients, configured either as producers or consumers, push messages to and pull messages from Kafka brokers, which organize and store messages by topic. Brokers commit messages to disk storage and retain them for a period of time or up to a size limit, both configurable per topic. Topics are further broken down into partitions, with each partition acting as a separate commit log. A broker administrator keeps track of consumers reading partitions by assigning offsets, thereby providing guaranteed ordering. These features facilitate easy scaling and event replay, as well as the building of fault-tolerant systems.

The Kafka TA differs from other Transport Adapters in that it provides custom serialization/deserialization and durable message storage.

The schema below represents an interaction between FIXEdge and a customer's infrastructure via the Kafka solution.

Basic elements of the Kafka connectivity model

  • Producer = publisher, the sender of messages

  • Consumer = subscriber, the reader of messages

  • Message/event = a single unit of data

  • Broker = a server forming a storage layer containing one or more topics

  • Topic = ordered collection of events that are organized and stored in a durable way

  • Partition = buckets located within topics across which messages are distributed

  • Commit log = a record of changes that is committed to disk and can be replayed by consumers

  • Guaranteed ordering = a guarantee that a consumer of a given topic-partition will always read that partition's events in exactly the same order as they were written

  • Offset = tracking feature that helps a consumer remember its position when it last read a partition
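
The relationship between a partition, its commit log, and a consumer's offset can be illustrated with a toy model. This is only a sketch in Python for intuition: the Partition and Consumer classes are hypothetical and do not correspond to the Kafka client API or to the Kafka TA's internals.

```python
class Partition:
    """Append-only commit log; an event's offset is its position in the log."""

    def __init__(self) -> None:
        self._log = []

    def append(self, event: str) -> int:
        self._log.append(event)
        return len(self._log) - 1  # offset assigned to the new event

    def read(self, offset: int, max_events: int = 10) -> list:
        # Events are always returned in the order they were written.
        return self._log[offset:offset + max_events]


class Consumer:
    """Reads one partition and remembers the next offset to read."""

    def __init__(self, partition: Partition) -> None:
        self._partition = partition
        self.offset = 0

    def poll(self, max_events: int = 10) -> list:
        events = self._partition.read(self.offset, max_events)
        self.offset += len(events)
        return events
```

Resetting a consumer's offset to an earlier value makes it read the same events again in the same order, which is what makes event replay possible.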

Client behavior

When several clients (a Producer, a Consumer, or both a Producer and a Consumer) are configured for the Kafka TA, all of them establish a connection to the Kafka platform when FIXEdge starts.
If one of the clients fails to establish a connection, the remaining clients are not affected by this failure.

Delivery

When the Kafka TA sends a message, it first puts the message into persistent storage and then sends it through the Producer API to a Kafka broker. Once the Kafka platform confirms that the message was received, the Kafka TA marks the message as delivered in the persistent storage. These two steps implement the delivery guarantee. If the message cannot be sent, it remains in the persistent storage. Once the FIXEdge server starts and the Kafka connection is re-established, any unsent messages in the persistent message storage are processed before other runtime messages. After delivery, messages remain stored in the persistent message storage.
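
The store-then-send sequence described above can be sketched as follows. This is a minimal illustration in Python, not the adapter's implementation: PersistentStore, send, and resend_pending are hypothetical names, the in-memory list stands in for the on-disk storage, and the produce callback stands in for a Producer API call that returns True once the broker acknowledges the message.

```python
from dataclasses import dataclass, field
from typing import Callable, List


@dataclass
class StoredMessage:
    payload: str
    delivered: bool = False  # set once the broker acknowledges the message


@dataclass
class PersistentStore:
    """In-memory stand-in for the persistent message storage (assumption)."""
    records: List[StoredMessage] = field(default_factory=list)

    def append(self, payload: str) -> int:
        self.records.append(StoredMessage(payload))
        return len(self.records) - 1

    def mark_delivered(self, index: int) -> None:
        self.records[index].delivered = True

    def undelivered(self) -> List[int]:
        return [i for i, r in enumerate(self.records) if not r.delivered]


def send(store: PersistentStore, payload: str, produce: Callable[[str], bool]) -> bool:
    """Persist first, then produce; mark delivered only on acknowledgement."""
    index = store.append(payload)
    acked = produce(payload)
    if acked:
        store.mark_delivered(index)
    return acked


def resend_pending(store: PersistentStore, produce: Callable[[str], bool]) -> int:
    """After a restart or reconnect, replay unsent messages in original order."""
    resent = 0
    for index in store.undelivered():
        if not produce(store.records[index].payload):
            break  # stop at the first failure so ordering is preserved
        store.mark_delivered(index)
        resent += 1
    return resent
```

In this sketch, delivered messages are only flagged, never removed, mirroring the statement that messages remain in the persistent storage after delivery.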

Message ordering:

  • FIX messages sent to Kafka via the Producer API are sent in the same order as the messages that FIXEdge sends to the client.

  • FIXEdge fetches FIX messages via the Consumer API in the same order that messages arrive at a particular Kafka topic.

  • The Kafka TA service discipline is FIFO (First In First Out) within the scope of one Kafka partition (for messages that come from one client).

The connectivity mechanism preserves FIX messages even if critical situations occur, such as:

  • FIXEdge server is terminated

  • Kafka TA crashes or does not start due to invalid configuration

  • FIX Session is terminated non-gracefully

  • message broker is down

  • network disorder occurs

The Kafka TA provides an at-least-once delivery guarantee (as opposed to exactly-once). This means that messages are guaranteed to be delivered at least once, with the possibility of duplicates.
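
Because duplicates are possible, an application that consumes these messages may want to filter them. The sketch below shows one possible approach in Python and is not part of the Kafka TA. It assumes that messages arrive in raw FIX format and that the pair of SenderCompID (tag 49) and MsgSeqNum (tag 34) identifies a message uniquely, which may not hold in every deployment. DuplicateFilter and fix_fields are hypothetical names.

```python
SOH = "\x01"  # standard FIX field delimiter


def fix_fields(raw: str) -> dict:
    """Split a raw FIX message into a {tag: value} dictionary."""
    return dict(part.split("=", 1) for part in raw.split(SOH) if part)


class DuplicateFilter:
    """Rejects messages whose (SenderCompID, MsgSeqNum) pair was already seen."""

    def __init__(self) -> None:
        self._seen = set()

    def accept(self, raw: str) -> bool:
        fields = fix_fields(raw)
        key = (fields.get("49"), fields.get("34"))  # assumed identity key
        if key in self._seen:
            return False
        self._seen.add(key)
        return True
```

The set of seen keys grows without bound in this sketch; a real filter would likely need to expire old entries.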

Persistent Message Storage

The Kafka TA stores persistent FIX messages in text format in a file on disk, which is accessed using memory mapping.

Storage is unlimited but should not exceed disk capacity.

All messages passing through the system are kept in the persistent message storage.

The TransportLayer.<KafkaNameTA>.Cleanup parameter provides the capability to clean up the persistent message storage.

Committing to the Kafka platform 

The TransportLayer.KafkaTA.<Session>.Consumer.Commit parameter provides three strategies that dictate how commit messages (acknowledgements of messages received) are sent to the Kafka platform.

To choose the best strategy for prioritizing either speed or reliability for commit messages, refer to the table below (options are shown in descending order):

Speed         Reliability
1) 'Auto'     1) 'Sync'
2) 'Async'    2) 'Async'
3) 'Sync'     3) 'Auto'

The three strategies are described in more detail here:

TransportLayer.KafkaTA.<Session>.Consumer.Commit

Description

'Auto'

Commit messages are sent by the Kafka TA to the Kafka platform as configured, for example, at least once during a defined period of time.

This value is the best option if there are no specific reliability requirements.

'Auto' is also the:

  • Default value 

  • Fastest and most efficient option because the number of requests is reduced

  • Least reliable method

Since commit messages are not sent after every sent message, there is a possibility that duplicate messages will be sent by the Kafka TA.

'Sync'

For every message read by the Kafka TA's consumer, the Kafka TA sends a commit message to the corresponding broker confirming the value of the last read offset. The Kafka TA does not read the next message without first sending the commit message.

If there is a message sending failure, FIXEdge will notice it immediately.

This is the most reliable option because Kafka is constantly updated on the consumer's progress. It is also the slowest method.

'Async'

The 'Async' option sends a commit message after each message, but it does so asynchronously. As a result, the Kafka TA does not block reading and delegates the sending of commit messages to an asynchronous thread.

Unsuccessfully executed commits do not stop the process and are written into the log.

This option is less reliable than the 'Sync' option but more reliable than the 'Auto' option. The number of lost messages might be smaller than with the 'Auto' option.

This option is faster than the 'Sync' option but slower than the 'Auto' option.

'Sync' and 'Async' are typically chosen when there is less traffic, more time for processing, and the user wants to increase the reliability of message sending (avoiding duplicates, etc.).
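
The reliability trade-off between the three strategies can be illustrated with a toy model. The Python sketch below is a simplification, not the adapter's logic: it assumes that 'Auto' commits once every auto_every messages (standing in for a time interval), that 'Async' always has async_in_flight commits that have not reached the broker yet, and that a crash happens right after the given number of messages has been processed. Both function names and both tuning parameters are hypothetical.

```python
def committed_offset(strategy: str, processed: int,
                     auto_every: int = 100, async_in_flight: int = 2) -> int:
    """Offset known to the broker after `processed` messages (toy model)."""
    if strategy == "Sync":
        # Every message is committed before the next one is read.
        return processed
    if strategy == "Async":
        # Commits are issued per message, but the latest ones may still be pending.
        return max(0, processed - async_in_flight)
    if strategy == "Auto":
        # Commits happen periodically, modeled here as every `auto_every` messages.
        return (processed // auto_every) * auto_every
    raise ValueError(f"unknown commit strategy: {strategy}")


def redelivered_after_crash(strategy: str, processed: int, **model) -> int:
    """Messages read again after a restart: processed but not yet committed."""
    return processed - committed_offset(strategy, processed, **model)
```

Under these assumptions, a crash after 250 messages leads to 0 re-read messages with 'Sync', 2 with 'Async', and 50 with 'Auto', which matches the reliability ordering given above.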

Message handling

The Kafka TA employs the following messaging handling methods:

  • Sync message handling

    • Receiving messages in the Kafka TA from the Kafka platform is regarded as sync messaging.

    • Messages are not saved to any storage but become available at the FIXEdge Business Layer.  

    • Messages arrive at the FIXEdge Business Layer in the same order as they had arrived to the Kafka topic they were fetched from.

  • Async message handling

    • Sending messages from the Kafka TA to the Kafka platform is regarded as async messaging.

    • Messages are saved to persistent message storage.

Configuration steps

Given that FIXEdge has already been installed in the user's environment, use the following steps to set up the Kafka TA:

  1. Enable a transport adapter to be used by FIXEdge:
    In the ‘Transport Layer Section’ of the FIXEdge.properties, add the Kafka TA to the list of supported adapters:

    TransportLayer.TransportAdapters = TransportLayer.KafkaTA

    Note: If you use other transport adapters, just add TransportLayer.KafkaTA to the end of the list:

    TransportLayer.TransportAdapters = ..., TransportLayer.KafkaTA
  2. Configure the Kafka TA by adding the Kafka TA section to the FIXEdge.properties file:

    TransportLayer.KafkaTA.Description = Kafka Transport Adaptor
    TransportLayer.KafkaTA.DllName = bin/KafkaTA-vc10-MD-x64.dll
    TransportLayer.KafkaTA.Sessions = Kafka
    TransportLayer.KafkaTA.Kafka.bootstrap.servers = localhost:9092
    TransportLayer.KafkaTA.Kafka.FIXVersion = FIX44
    TransportLayer.KafkaTA.Kafka.Consumer.Commit = Auto
    TransportLayer.KafkaTA.Kafka.Consumer.Topics = outputTopic
    TransportLayer.KafkaTA.Kafka.Consumer.group.id = ID
    TransportLayer.KafkaTA.Kafka.Producer.Topic = topic

    Note: Sample settings can be copied to the FIXEdge.properties file from the KafkaTA.properties file (located in the doc folder of the FIXEdge distribution package).

  3. Configure rules for message routing from the Kafka TA.
    A Kafka TA client is referenced in the Business Layer (BL) by the ClientID name specified in the FIXEdge.properties file. For a Kafka Business Layer configuration sample, refer to the Configuration sample sub-section.

  4. Restart the FIXEdge server to apply the changes.
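
Before restarting the server, it can be useful to check that the properties marked as required are present. The Python sketch below is a hypothetical helper, not a FIXEdge tool. It assumes that FIXEdge.properties consists of 'key = value' lines and that lines starting with '#' are comments; it checks only the adapter list, DllName (or AdapterId), Sessions, and each session's bootstrap.servers.

```python
def parse_properties(text: str) -> dict:
    """Parse 'key = value' lines, skipping blank lines and '#' comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def check_kafka_ta(props: dict, adapter: str = "TransportLayer.KafkaTA") -> list:
    """Return a list of problems found for one Kafka TA instance."""
    problems = []
    listed = props.get("TransportLayer.TransportAdapters", "")
    adapters = [a.strip() for a in listed.split(",")]
    if adapter not in adapters:
        problems.append(f"{adapter} is not listed in TransportLayer.TransportAdapters")
    if not props.get(f"{adapter}.DllName") and not props.get(f"{adapter}.AdapterId"):
        problems.append(f"{adapter}.DllName (or AdapterId) is missing")
    names = props.get(f"{adapter}.Sessions", "")
    sessions = [s.strip() for s in names.split(",") if s.strip()]
    if not sessions:
        problems.append(f"{adapter}.Sessions must name at least one session")
    for session in sessions:
        if not props.get(f"{adapter}.{session}.bootstrap.servers"):
            problems.append(f"{adapter}.{session}.bootstrap.servers is missing")
    return problems
```

An empty result means that none of these specific checks failed; it does not guarantee that the adapter will start.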

Configuring multiple adapter instances

Multiple instances of the Kafka TA can be used with one library instance, inside one instance of FIXEdge.

To run multiple instances of the Kafka TA, each new instance must be assigned a new name and new session names. The name assigned to the new adapter instance must be consistent across parameters relating to that instance. New session names must be unique across all existing instances of the adapter. Topic names do not need to be unique.

TransportLayer.TransportAdapters = TransportLayer.Kafka1, TransportLayer.Kafka2

TransportLayer.Kafka1.Description = Kafka Transport Adaptor #1
TransportLayer.Kafka1.DllName = bin/KafkaTA-vc10-MD-x64.dll
TransportLayer.Kafka1.Sessions = Kafka_1
TransportLayer.Kafka1.Kafka_1.bootstrap.servers = localhost:9092
TransportLayer.Kafka1.Kafka_1.FIXVersion = FIX44
TransportLayer.Kafka1.Kafka_1.Consumer.Commit = Auto
TransportLayer.Kafka1.Kafka_1.Consumer.Topics = outputTopic
TransportLayer.Kafka1.Kafka_1.Consumer.group.id = ID
TransportLayer.Kafka1.Kafka_1.Producer.Topic = topic

TransportLayer.Kafka2.Description = Kafka Transport Adaptor #2
TransportLayer.Kafka2.DllName = bin/KafkaTA-vc10-MD-x64_2.dll
TransportLayer.Kafka2.Sessions = Kafka_2
TransportLayer.Kafka2.Kafka_2.bootstrap.servers = localhost:9093
TransportLayer.Kafka2.Kafka_2.FIXVersion = FIX44
TransportLayer.Kafka2.Kafka_2.Consumer.Commit = Auto
TransportLayer.Kafka2.Kafka_2.Consumer.Topics = outputTopic
TransportLayer.Kafka2.Kafka_2.Consumer.group.id = ID
TransportLayer.Kafka2.Kafka_2.Producer.Topic = topic

In the above example, there are two separate instances of the Kafka adapter: "Kafka1" and "Kafka2". Session names are unique across all Kafka adapters.
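
The uniqueness rule for session names can be checked mechanically. The Python sketch below is a hypothetical helper that takes the configuration as a dictionary of property names to values and reports session names used by more than one adapter. It assumes that every adapter listed in TransportLayer.TransportAdapters is a Kafka TA instance.

```python
from collections import defaultdict


def duplicate_sessions(props: dict) -> dict:
    """Map each session name used by more than one adapter to those adapters."""
    owners = defaultdict(list)
    listed = props.get("TransportLayer.TransportAdapters", "")
    adapters = [a.strip() for a in listed.split(",") if a.strip()]
    for adapter in adapters:
        for session in props.get(f"{adapter}.Sessions", "").split(","):
            if session.strip():
                owners[session.strip()].append(adapter)
    return {name: used_by for name, used_by in owners.items() if len(used_by) > 1}
```

For the sample configuration above, the function returns an empty dictionary because Kafka_1 and Kafka_2 each belong to a single adapter.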

SSL Configuration Sample

Add the following values to the server.properties file of the Kafka broker installation:
listeners=PLAINTEXT://:9092,SSL://:9093
ssl.keystore.location=D:/SSL/kafka01.keystore.jks
ssl.keystore.password=123456
ssl.key.password=123456
ssl.truststore.location=D:/SSL/kafka.truststore.jks
ssl.truststore.password=123456
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
Change the broker port to the SSL port, and set the security.protocol and ssl.ca.location properties for the TA sessions:
TransportLayer.KafkaTA.Kafka.bootstrap.servers = localhost:9093
TransportLayer.KafkaTA.Kafka.security.protocol = SSL
TransportLayer.KafkaTA.Kafka.ssl.ca.location = D:/SSL/root.pem
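
The dependencies between the security-related session properties can be expressed as a small consistency check. The Python sketch below is a hypothetical helper, not part of FIXEdge. It takes one session's properties with the TransportLayer.KafkaTA.<Session>. prefix removed and encodes only the requirements stated in the parameter descriptions on this page.

```python
VALID_PROTOCOLS = {"PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"}


def check_security(session: dict) -> list:
    """Return problems found in one session's security settings."""
    problems = []
    protocol = session.get("security.protocol", "PLAINTEXT")  # documented default
    mechanism = session.get("sasl.mechanism", "")
    if protocol not in VALID_PROTOCOLS:
        problems.append(f"unknown security.protocol: {protocol}")
    if protocol == "SSL" and not session.get("ssl.ca.location"):
        problems.append("ssl.ca.location is required when security.protocol is SSL")
    if mechanism == "PLAIN":
        for key in ("sasl.username", "sasl.password"):
            if not session.get(key):
                problems.append(f"{key} is required for PLAIN authentication")
    if mechanism == "GSSAPI" and not session.get("sasl.kerberos.service.name"):
        problems.append("sasl.kerberos.service.name is required for GSSAPI authentication")
    return problems
```

For the TA session settings in the sample above (security.protocol = SSL with ssl.ca.location set), the check reports no problems.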

 

Configuration parameters

Connection to the Kafka platform can be established when the Kafka TA is properly configured with the Kafka IP address.

Property name

Description

Required

Default value

Example

Common properties

TransportLayer.KafkaTA.Description

Adapter name. 

Note: It is recommended that this parameter is not empty.

This parameter is used in the logging of Kafka events.

N

 

Kafka Transport Adaptor

TransportLayer.KafkaTA.DllName

Path and name of the Kafka adapter DLL. If this parameter is not specified, TransportLayer.KafkaTA.AdapterId is used to identify the adapter's library by ID.

Y

 

bin/KafkaTA-vc10-MD-x64.dll

TransportLayer.KafkaTA.AdapterId

Identifies the adapter's library by ID. If this parameter is not specified, or if TransportLayer.KafkaTA.DllName is also specified, TransportLayer.KafkaTA.DllName is used.

This parameter is applicable since FIXEdge C++ version 6.14.0

N

 

KAFKA

TransportLayer.KafkaTA.Sessions

A comma-separated list of session names.

At least one session should be defined.

The values from this parameter will be used in the BL description.

Y

 

 

TransportLayer.<KafkaNameTA>.Cleanup

The behavior is changed starting from FIXEdge C++ version 7.0.0.

Adapter-wide Cleanup setting that applies to all sessions of the Kafka TA. It can be overridden at the session level.

See TransportLayer.KafkaTA.<Session>.Cleanup for details.

N

true

 

Kafka-specific parameters

 

 

 

 

TransportLayer.KafkaTA.reconnect.backoff.ms

Delay after which the Kafka TA starts attempting to re-connect if the connection breaks.

N

100

 

TransportLayer.KafkaTA.reconnect.backoff.max.ms

Maximum delay after which the Kafka TA stops attempting to reconnect if the connection breaks.

N

10,000

 

Session properties

 

 

 

 

TransportLayer.KafkaTA.<Session>.FIXVersion

In every session, FIX messages use a particular version of the FIX protocol. Use this parameter to set the version of the FIX protocol for a given session.
 
Acceptable values:
FIX40, FIX41, FIX42, FIX43, FIX44, FIX50, FIX50SP1, FIX50SP2, FIXLatest, custom dictionaries.
 
The property is used in the following cases:

  • to specify the supported FIX-version for messages stored in internal storage

  • to deserialize messages received from Kafka to FIX-format

Example use cases:

  • When FIX-messages are transferred in raw format, (i.e. serialization is not applied, <TransportLayer.KafkaTA.<Session>.Serializer> = RAW), the <TransportLayer.KafkaTA.<Session>.FIXVersion> property specifies the preferred version of the FIX-protocol while parsing input messages to FIX-format.

  • When FIX-messages are transferred and serialization is applied (see examples below), the <TransportLayer.KafkaTA.<Session>.FIXVersion> property specifies the FIX-version of the messages that are generated.

    • JSON-serialization (<TransportLayer.KafkaTA.<Session>.Serializer> = JSON)

    • XML-serialization (<TransportLayer.KafkaTA.<Session>.Serializer> = XmlWrapper)

N

FIX44

 

TransportLayer.KafkaTA.<Session>.ConnectTime

Scheduled time to connect to a Client

The value should be in the cron time string format

Local time zones will be used

N;

Required if DisconnectTime parameter (see below) is configured

If a value is not specified the session is not using a schedule.

 

TransportLayer.KafkaTA.<Session>.DisconnectTime

Scheduled time to disconnect from a Client

The value should be in the cron time string format

Local time zones will be used

N;

Required if ConnectTime parameter (see above) is configured

If a value is not specified the session is not using a schedule.

 

TransportLayer.KafkaTA.<Session>.DaysOff

 

This property is available since the FIXEdge C++ 6.16.0 version.

The date and/or time when the ConnectTime and DisconnectTime properties configured for the Kafka TA session are ignored/not applied.

The DaysOff property must be specified in the cron format. 

If the DaysOff property is specified with the incorrect value for the Kafka TA session, then this Kafka TA session won't be started, and the following ERROR will be logged: 

ERROR Session '<Session>': [Consumer/Producer]: <error_message>

N

-

0 0 9 * * 2-6

TransportLayer.KafkaTA.<Session>.Cleanup

This property is available since FIXEdge C++ 7.0.0 version.

Provides the capability to clean up the Kafka TA session's persistent message storage.

Acceptable values:

  • true - cleaning of persistent message storage (when session is stopped) is ON
    When the Kafka TA session is stopped:

    • Kafka TA session's storage is moved to the Archive folder.

    • New Kafka TA session's storage is created and includes only undelivered messages (copies the undelivered messages from a backup file to the new storage)

  • false - cleaning of Kafka TA session's persistent message storage is OFF

N

The value taken from

TransportLayer.<KafkaNameTA>.Cleanup 

or its default.

 

TransportLayer.KafkaTA.<Session>.Serializer

Serializer name.  Acceptable values:

  • Raw

  • XmlWrapper

  • JSON

Serializer tasks can be set from an external plugin.

Format: 

TestSerializer:KafkaSerializer,

where: 

  • TestSerializer is a serializer Class ID inside the plugin

  • KafkaSerializer is a Plugin ID

ID values are set by the Plugin developer.

N

 

 

TransportLayer.KafkaTA.<Session>.bootstrap.servers

An initial list of brokers.

Y

 

 

Secure connection properties

 

 

 

 

TransportLayer.KafkaTA.<Session>.sasl.username

This parameter only applies and is required if using PLAIN authentication.

The ability to export the value using a script is available since the FIXEdge C++ 7.0.0 release. See the link for details.

N

 

johndoe

TransportLayer.KafkaTA.<Session>.sasl.password

This parameter only applies and is required if using PLAIN authentication.

The ability to export the value using a script is available since the FIXEdge C++ 7.0.0 release. See the link for details.

N

 

password1234

TransportLayer.KafkaTA.<Session>.ssl.key.location

File or directory path to SSL private key.

This parameter only applies and is required if using SSL certificate authentication.

N

 

D:/SSL/kafka01.pem

TransportLayer.KafkaTA.<Session>.ssl.key.password

This parameter only applies and is required if using SSL certificate authentication.

The ability to export the value using a script is available since the FIXEdge C++ 7.0.0 release. See the link for details.

N

 

 

TransportLayer.KafkaTA.<Session>.ssl.ca.location

File or directory path to CA certificate(s) for verifying the broker's key.

The parameter is required if the security protocol is SSL.

N

 

D:/SSL/root.pem

TransportLayer.KafkaTA.<Session>.sasl.mechanism

Valid values:

  • (empty) - in this case, neither PLAIN nor GSSAPI authentications are in use

  • PLAIN - to use PLAIN authentication

  • GSSAPI - to use GSSAPI authentication

N

If a value is not specified user authentication is not applied.

 

TransportLayer.KafkaTA.<Session>.security.protocol

The protocol used to communicate with brokers. 

Valid values:

  • PLAINTEXT - using an unsecured connection

  • SSL - using a secured connection

  • SASL_PLAINTEXT - using authentication and an unsecured connection

  • SASL_SSL - using authentication and secured connection

N

PLAINTEXT

 

TransportLayer.KafkaTA.<Session>.sasl.kerberos.service.name

This parameter only applies and is required if using GSSAPI authentication. In this case, use the Kerberos principal name that Kafka runs as.

N

 

 

Consumer properties

TransportLayer.KafkaTA.<Session>.Consumer.Commit

Commit mode. Acceptable values:

  • Auto   - automatically, according to time interval expiration

  • Sync   - synchronously, after each received message

  • Async  - asynchronously, after each received message

For more information, see Committing to the Kafka platform.

N

Auto

 

TransportLayer.KafkaTA.<Session>.Consumer.group.id

A unique string that identifies the consumer group that the given Consumer belongs to. This property is required if the Consumer uses either the group management functionality by using subscribe (Topic) or the Kafka-based offset management strategy.