Kafka Transport Adapter
The Kafka transport adapter is available starting from version 6.10.0 of FIXEdge C++
- 1 Overview
- 2 Concept
- 3 Delivery
- 4 Configuration steps
- 5 Configuration parameters
- 5.1.1 Common properties
- 5.1.2 Kafka-specific parameters
- 5.1.3 Session properties
- 5.1.3.1 TransportLayer.KafkaTA.<Session>.FIXVersion
- 5.1.3.2 TransportLayer.KafkaTA.<Session>.ConnectTime
- 5.1.3.3 TransportLayer.KafkaTA.<Session>.DisconnectTime
- 5.1.3.4 TransportLayer.KafkaTA.<Session>.DaysOff
- 5.1.3.5 TransportLayer.KafkaTA.<Session>.Cleanup
- 5.1.3.6 TransportLayer.<KafkaNameTA>.Cleanup
- 5.1.3.7 TransportLayer.KafkaTA.<Session>.Serializer
- 5.1.3.8 TransportLayer.KafkaTA.<Session>.bootstrap.servers
- 5.1.4 Secure connection properties
- 5.1.4.1 TransportLayer.KafkaTA.<Session>.sasl.username
- 5.1.4.2 TransportLayer.KafkaTA.<Session>.sasl.password
- 5.1.4.3 TransportLayer.KafkaTA.<Session>.ssl.key.location
- 5.1.4.4 TransportLayer.KafkaTA.<Session>.ssl.key.password
- 5.1.4.5 TransportLayer.KafkaTA.<Session>.ssl.ca.location
- 5.1.4.6 TransportLayer.KafkaTA.<Session>.sasl.mechanism
- 5.1.4.7 TransportLayer.KafkaTA.<Session>.security.protocol
- 5.1.4.8 TransportLayer.KafkaTA.<Session>.sasl.kerberos.service.name
- 5.1.5 Consumer properties
- 5.1.6 Producer properties
- 5.2 Configuration sample
- 6 Authentication Configuration
- 7 Logging
- 8 Scheduling
- 9 Start and Stop operations for Kafka TA
- 9.1 Start operation
- 9.2 Stop operation
- 10 Custom serialization
- 11 Message Content Wrapping
- 12 Kafka Adapter Monitoring
- 13 Troubleshooting
- 13.1 Transport Adaptor 'KafkaTA' has failed to initialize
- 13.1.1 Example
- 13.2 The session wasn't created
- 13.2.1 Example
- 13.3 Adapter not sending or receiving messages
- 13.3.1 Example
- 13.4 Messages are not delivered with the log stating: "Message processing failed"
- 13.4.1 FIXEdge.properties
- 13.5 Kafka server can't start because all log dirs have failed
Overview
The Kafka Transport Adapter (hereinafter Kafka TA) provides the FIXEdge server connectivity to the Kafka distributed streaming platform.
Internally, its implementation uses the Kafka Consumer and Producer APIs. The broker API version must be 0.10.0.0 or higher.
The Kafka TA acts as a FIXEdge plugin and establishes a connection to a Kafka single instance or a Kafka cluster. The adapter supports TLS connectivity.
The Kafka TA transfers FIX messages in raw or serialized formats from the FIXEdge Server to the Kafka platform and vice versa.
The Kafka TA performance is close to Kafka's native speed of processing. The average reading/writing rate of FIX messages is 30,000 per second or higher. This number refers to the overall throughput capacity for the FIXEdge server with the Kafka TA (from FIX session to Kafka server).
The Kafka TA automatically reconnects to Kafka if the connection to a message broker was unexpectedly terminated. Reconnection behavior can be configured by parameters TransportLayer.KafkaTA.reconnect.backoff.ms and TransportLayer.KafkaTA.reconnect.backoff.max.ms.
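For example, the reconnection pacing could be tuned in FIXEdge.properties as follows. The values below are illustrative, not recommendations (the defaults are 100 ms and 10,000 ms, see Configuration parameters):
TransportLayer.KafkaTA.reconnect.backoff.ms = 500
TransportLayer.KafkaTA.reconnect.backoff.max.ms = 30000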
Multiple instances of the Kafka TA can be configured using one library instance, inside one instance of FIXEdge.
During FIXEdge server termination, the Kafka TA employs graceful termination, ensuring that message processing operations are executed correctly, completely, and without message loss. If the FIXEdge server fails and non-graceful termination occurs, messages are retrieved from persistent message storage.
FIFO (First In First Out) ordering is guaranteed, meaning that the order in which messages were sent and received will be maintained upon the termination of the server instance.
Concept
The Kafka TA is intended to communicate FIX messages to other applications using the Kafka streaming platform as middleware.
Communication is provided between the FIXEdge server, where the adapter resides, and a Kafka broker or cluster.
The Kafka TA allows integration with Kafka via the Consumer/Producer API. FIXEdge clients, configured either as producers or consumers, push and pull messages to/from brokers housed in Kafka that organize and store messages according to topics. Brokers commit messages to disk and retain them for a configurable period of time or amount of storage space, configurable per topic. Topics are further broken down into partitions, with each partition acting as a separate commit log. A broker administrator keeps track of consumers reading partitions by assigning offsets, thereby providing guaranteed ordering. These features facilitate easy scaling and event replay, as well as the building of fault-tolerant systems.
The Kafka TA differentiates from other Transport Adapters in providing custom serialization/deserialization and durable message storage.
The schema below represents an interaction between FIXEdge and a customer's infrastructure via the Kafka solution.
Basic elements of the Kafka connectivity model
Producer = publisher, the sender of messages
Consumer = subscriber, the reader of messages
Message/event = a single unit of data
Broker = a server forming a storage layer containing one or more topics
Topic = ordered collection of events that are organized and stored in a durable way
Partition = buckets located within topics across which messages are distributed
Commit log = a record of changes that is committed to disk and can be replayed by consumers
Guaranteed ordering = a guarantee that a consumer of a given topic-partition will always read that partition's events in exactly the same order as they were written
Offset = tracking feature that helps a consumer remember its last read position in a partition
Client behavior
When several clients (one Producer, one Consumer, or both) are configured for the Kafka TA, all of them establish a connection to the Kafka platform when FIXEdge starts.
If one of the clients fails to establish a connection, the remaining clients are not affected by this failure.
Delivery
When the Kafka TA sends a message, it is first put into persistent storage and then sent through the Producer API to a Kafka broker. Once the Kafka platform confirms that the message was received, the Kafka TA marks the message as delivered in persistent storage. These two steps implement a delivery guarantee. If a message cannot be sent, it remains in persistent storage. Once the FIXEdge server starts and the Kafka connection is re-established, any unsent messages in the persistent message storage are processed before other runtime messages. After delivery, messages remain stored in the persistent message storage.
Message ordering:
FIX messages sent to Kafka via the Producer API are sent in the same order as the messages that FIXEdge sends to the client.
FIXEdge fetches FIX messages via the Consumer API in the same order that messages arrive at a particular Kafka topic.
The Kafka TA's service discipline is FIFO (First In, First Out) within the scope of one Kafka partition (for messages that come from one client).
The connectivity mechanism preserves FIX messages even if critical situations occur, such as:
FIXEdge server is terminated
Kafka TA crashes or does not start due to invalid configuration
FIX Session is terminated non-gracefully
message broker is down
a network failure occurs
The Kafka TA provides an at-least-once delivery guarantee (as opposed to exactly-once). This means that messages are guaranteed to be delivered at least once, with the possibility of duplicates.
Persistent Message Storage
The Kafka TA stores persistent FIX messages in text format in a memory-mapped file on disk.
Storage size is not limited by the adapter, but it must not exceed disk capacity.
All messages passing through the system are kept in the persistent message storage.
The TransportLayer.<KafkaNameTA>.Cleanup parameter provides the capability to clean up the persistent message storage.
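As a sketch, assuming an adapter instance named KafkaTA with a session named Kafka, cleanup could be enabled adapter-wide and then overridden for one session (true/false values per the parameter descriptions below):
TransportLayer.KafkaTA.Cleanup = true
TransportLayer.KafkaTA.Kafka.Cleanup = false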
Committing to the Kafka platform
The TransportLayer.KafkaTA.<Session>.Consumer.Commit parameter provides three strategies that dictate how commit messages (acknowledgements of messages received) are sent to the Kafka platform.
To choose the best strategy for prioritizing either speed or reliability for commit messages, refer to the table below (options are shown in descending order):
Speed | Reliability |
|---|---|
1) 'Auto' | 1) 'Sync' |
2) 'Async' | 2) 'Async' |
3) 'Sync' | 3) 'Auto' |
The three strategies are described in more detail here:
TransportLayer.KafkaTA.<Session>.Consumer.Commit | Description |
|---|---|
'Auto' | Commit messages are sent by the Kafka TA to the Kafka platform as configured, for example, at least once during a defined period of time. This is the best option if there are no specific reliability requirements, and it is also the fastest. Since commit messages are not sent after every message, there is a possibility that duplicate messages will be sent by the Kafka TA. |
'Sync' | For every message read by the Kafka TA's consumer, the Kafka TA sends a commit message to the corresponding broker confirming the value of the last read offset. The Kafka TA does not read the next message without first sending the commit message. If there is a message sending failure, FIXEdge will notice it immediately. This is the most reliable option because Kafka is constantly updated on the consumer's progress. It is also the slowest method. |
'Async' | The 'Async' option sends a commit message after each message; however, it does so in an asynchronous manner. Due to this, the Kafka TA does not block reading and delegates sending commit messages to an asynchronous thread. Unsuccessfully executed commits do not stop the process and are written into the log. This option is less reliable than the 'Sync' option but more reliable than the 'Auto' option: the number of lost messages might be smaller than with the 'Auto' option. It is faster than the 'Sync' option but slower than the 'Auto' option. |
'Sync' and 'Async' are chosen when there is less traffic, more time for processing, and the user wants to increase the reliability of message sending (avoiding duplicates, etc.)
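For example, a low-traffic session that prioritizes reliability could select synchronous commits (a sketch; 'Kafka' is the session name used in the configuration sample in this document):
TransportLayer.KafkaTA.Kafka.Consumer.Commit = Sync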
Message handling
The Kafka TA employs the following messaging handling methods:
Sync message handling
Receiving messages in the Kafka TA from the Kafka platform is regarded as sync messaging.
Messages are not saved to any storage but become available at the FIXEdge Business Layer.
Messages arrive at the FIXEdge Business Layer in the same order as they had arrived to the Kafka topic they were fetched from.
Async message handling
Sending messages from the Kafka TA to the Kafka platform is regarded as async messaging.
Messages are saved to persistent message storage.
Configuration steps
Given that FIXEdge has already been installed in the user's environment, use the following steps to set up the Kafka TA:
Enable a transport adapter to be used by FIXEdge:
In the ‘Transport Layer Section’ of the FIXEdge.properties file, add the Kafka TA to the list of supported adapters:
TransportLayer.TransportAdapters = TransportLayer.KafkaTA
Note: If you use other transport adapters, just add TransportLayer.KafkaTA to the end of the list:
TransportLayer.TransportAdapters = ..., TransportLayer.KafkaTA
Configure the Kafka TA by adding the Kafka TA section to the FIXEdge.properties file:
TransportLayer.KafkaTA.Description = Kafka Transport Adaptor
TransportLayer.KafkaTA.DllName = bin/KafkaTA-vc10-MD-x64.dll
TransportLayer.KafkaTA.Sessions = Kafka
TransportLayer.KafkaTA.Kafka.bootstrap.servers = localhost:9092
TransportLayer.KafkaTA.Kafka.FIXVersion = FIX44
TransportLayer.KafkaTA.Kafka.Consumer.Commit = Auto
TransportLayer.KafkaTA.Kafka.Consumer.Topics = outputTopic
TransportLayer.KafkaTA.Kafka.Consumer.group.id = ID
TransportLayer.KafkaTA.Kafka.Producer.Topic = topic
Note: Sample settings can be copied to the FIXEdge.properties file from the KafkaTA.properties file (located in the doc folder of the FIXEdge distribution package).
Configure rules for message routing from the Kafka TA.
The Kafka TA client is referenced in the Business Layer (BL) by the ClientID name specified in the FIXEdge.properties file. For a Kafka Business Layer configuration sample, refer to the Configuration sample sub-section.
Restart the FIXEdge server to apply the changes.
Configuring multiple adapter instances
Multiple instances of the Kafka TA can be used with one library instance, inside one instance of FIXEdge.
To run multiple instances of the Kafka TA, each new instance must be assigned a new name and new session names. The name assigned to the new adapter instance must be consistent across parameters relating to that instance. New session names must be unique across all existing instances of the adapter. Topic names do not need to be unique.
TransportLayer.TransportAdapters = TransportLayer.Kafka1, TransportLayer.Kafka2
TransportLayer.Kafka1.Description = Kafka Transport Adaptor #1
TransportLayer.Kafka1.DllName = bin/KafkaTA-vc10-MD-x64.dll
TransportLayer.Kafka1.Sessions = Kafka_1
TransportLayer.Kafka1.Kafka_1.bootstrap.servers = localhost:9092
TransportLayer.Kafka1.Kafka_1.FIXVersion = FIX44
TransportLayer.Kafka1.Kafka_1.Consumer.Commit = Auto
TransportLayer.Kafka1.Kafka_1.Consumer.Topics = outputTopic
TransportLayer.Kafka1.Kafka_1.Consumer.group.id = ID
TransportLayer.Kafka1.Kafka_1.Producer.Topic = topic
TransportLayer.Kafka2.Description = Kafka Transport Adaptor #2
TransportLayer.Kafka2.DllName = bin/KafkaTA-vc10-MD-x64_2.dll
TransportLayer.Kafka2.Sessions = Kafka_2
TransportLayer.Kafka2.Kafka_2.bootstrap.servers = localhost:9093
TransportLayer.Kafka2.Kafka_2.FIXVersion = FIX44
TransportLayer.Kafka2.Kafka_2.Consumer.Commit = Auto
TransportLayer.Kafka2.Kafka_2.Consumer.Topics = outputTopic
TransportLayer.Kafka2.Kafka_2.Consumer.group.id = ID
TransportLayer.Kafka2.Kafka_2.Producer.Topic = topic
In the above example, there are 2 separate instances of Kafka adapters: "Kafka1" and "Kafka2". Session names are unique across all Kafka adapters.
SSL Configuration Sample
Add the following values to server.properties file in Kafka broker installation:
listeners=PLAINTEXT://:9092,SSL://:9093
ssl.keystore.location=D:/SSL/kafka01.keystore.jks
ssl.keystore.password=123456
ssl.key.password=123456
ssl.truststore.location=D:/SSL/kafka.truststore.jks
ssl.truststore.password=123456
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
Change the broker port to SSL, and set the security.protocol and ssl.ca.location properties for TA sessions:
TransportLayer.KafkaTA.Kafka.bootstrap.servers = localhost:9093
TransportLayer.KafkaTA.Kafka.security.protocol = SSL
TransportLayer.KafkaTA.Kafka.ssl.ca.location = D:/SSL/root.pem
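Similarly, a SASL-authenticated connection could be sketched as follows. The values are illustrative, and SASL_SSL/PLAIN are assumptions about the broker's configuration; adjust the mechanism and credentials to match your environment (see the Secure connection properties below):
TransportLayer.KafkaTA.Kafka.bootstrap.servers = localhost:9093
TransportLayer.KafkaTA.Kafka.security.protocol = SASL_SSL
TransportLayer.KafkaTA.Kafka.sasl.mechanism = PLAIN
TransportLayer.KafkaTA.Kafka.sasl.username = johndoe
TransportLayer.KafkaTA.Kafka.sasl.password = password1234
TransportLayer.KafkaTA.Kafka.ssl.ca.location = D:/SSL/root.pem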
Configuration parameters
Connection to the Kafka platform can be established when the Kafka TA is properly configured with the Kafka IP address.
Property name | Description | Required | Default value | Example |
|---|---|---|---|---|
Common properties | ||||
TransportLayer.KafkaTA.Description | Adapter name. Note: It is recommended that this parameter not be left empty, as it is used in the logging of Kafka events. | N |
| Kafka Transport Adaptor |
TransportLayer.KafkaTA.DllName | Contains the path and name of the Kafka adapter DLL. If this parameter is not specified, TransportLayer.KafkaTA.AdapterId is applied to define the adapter's library by ID. | Y |
| bin/KafkaTA-vc10-MD-x64.dll |
TransportLayer.KafkaTA.AdapterId | Defines the adapter's library by ID. If this parameter is not specified, or if the TransportLayer.KafkaTA.DllName parameter is also specified, TransportLayer.KafkaTA.DllName is applied. This parameter is applicable since FIXEdge C++ version 6.14.0. | N |
| KAFKA |
TransportLayer.KafkaTA.Sessions | A comma-separated list of session names. At least one session should be defined. The values from this parameter will be used in the BL description. | Y |
|
|
TransportLayer.<KafkaNameTA>.Cleanup | The behavior is changed starting from FIXEdge C++ version 7.0.0. Kafka TA's adapter wide scope Cleanup setting for all sessions. Can be overridden on session level. See TransportLayer.KafkaTA.<Session>.Cleanup for details. | N | true |
|
Kafka-specific parameters |
|
|
|
|
TransportLayer.KafkaTA.reconnect.backoff.ms | Delay after which the Kafka TA starts attempting to re-connect if the connection breaks. | N | 100 |
|
TransportLayer.KafkaTA.reconnect.backoff.max.ms | The maximum delay between the Kafka TA's attempts to re-connect if the connection breaks. | N | 10,000 |
|
Session properties |
|
|
|
|
TransportLayer.KafkaTA.<Session>.FIXVersion | In every session, FIX messages use a particular version of the FIX protocol. Use this parameter to set the version of the FIX protocol for a given session.
Example use cases:
| N | FIX44 |
|
TransportLayer.KafkaTA.<Session>.ConnectTime | Scheduled time to connect to a Client. The value should be in the cron time string format. The local time zone is used. | N; Required if the DisconnectTime parameter (see below) is configured | If a value is not specified, the session does not use a schedule. |
|
TransportLayer.KafkaTA.<Session>.DisconnectTime | Scheduled time to disconnect from a Client. The value should be in the cron time string format. The local time zone is used. | N; Required if the ConnectTime parameter (see above) is configured | If a value is not specified, the session does not use a schedule. |
|
TransportLayer.KafkaTA.<Session>.DaysOff |
This property is available since the FIXEdge C++ 6.16.0 version. The date and/or time when the ConnectTime and DisconnectTime properties configured for the Kafka TA session are ignored/not applied. The DaysOff property must be specified in the cron format. If the DaysOff property is specified with an incorrect value for the Kafka TA session, then this Kafka TA session won't be started, and the following ERROR will be logged: ERROR Session '<Session>': [Consumer/Producer]: <error_message> | N | - | 0 0 9 * * 2-6 |
TransportLayer.KafkaTA.<Session>.Cleanup | This property is available since FIXEdge C++ 7.0.0 version. Provides the capability to clean up the Kafka TA session's persistent message storage. Acceptable values:
| N | The value is taken from TransportLayer.<KafkaNameTA>.Cleanup or its default. |
|
TransportLayer.KafkaTA.<Session>.Serializer | Serializer name. Acceptable values:
Serializer tasks can be set from an external plugin. Format: TestSerializer:KafkaSerializer, where:
ID values are set by the Plugin developer. | N |
|
|
TransportLayer.KafkaTA.<Session>.bootstrap.servers | An initial list of brokers. | Y |
|
|
Secure connection properties |
|
|
|
|
TransportLayer.KafkaTA.<Session>.sasl.username | This parameter only applies and is required if using PLAIN authentication. The ability to export the value using a script is available since the FIXEdge C++ 7.0.0 release. See the link for details. | N |
| johndoe |
TransportLayer.KafkaTA.<Session>.sasl.password | This parameter only applies and is required if using PLAIN authentication. The ability to export the value using a script is available since the FIXEdge C++ 7.0.0 release. See the link for details. | N |
| password1234 |
TransportLayer.KafkaTA.<Session>.ssl.key.location | File or directory path to SSL private key. This parameter only applies and is required if using SSL certificate authentication. | N |
| D:/SSL/kafka01.pem |
TransportLayer.KafkaTA.<Session>.ssl.key.password | This parameter only applies and is required if using SSL certificate authentication. The ability to export the value using a script is available since the FIXEdge C++ 7.0.0 release. See the link for details. | N |
|
|
TransportLayer.KafkaTA.<Session>.ssl.ca.location | File or directory path to CA certificate(s) for verifying the broker's key. The parameter is required if the security protocol is SSL. | N |
| D:/SSL/root.pem |
TransportLayer.KafkaTA.<Session>.sasl.mechanism | Valid values:
| N | If a value is not specified user authentication is not applied. |
|
TransportLayer.KafkaTA.<Session>.security.protocol | The protocol used to communicate with brokers. Valid values:
| N | PLAINTEXT |
|
TransportLayer.KafkaTA.<Session>.sasl.kerberos.service.name | Only applies and is required if using GSS_API authentication. In this case, use the Kerberos principal name that Kafka runs as. | N |
|
|
Consumer properties | ||||
TransportLayer.KafkaTA.<Session>.Consumer.Commit | Commit mode. Acceptable values:
For more information, see Committing to the Kafka platform. | N | Auto |
|
TransportLayer.KafkaTA.<Session>.Consumer.group.id | A unique string that identifies the consumer group that the given Consumer belongs to. This property is required if the Consumer uses either the group management functionality by using subscribe (Topic) or the Kafka-based offset management strategy. | |||