Kafka Transport Adapter
Overview
The Kafka Transport Adapter (hereinafter Kafka TA) provides the FIXEdge server connectivity to the Kafka distributed streaming platform.
Internally, its implementation uses the Kafka Consumer and Producer APIs. The broker API version must be 0.10.0.0 or higher. v version >= 0.10.0.0ersion >= 0.10.0.0
The Kafka TA acts as a FIXEdge plugin and establishes a connection to a Kafka single instance or a Kafka cluster. The adapter supports TLS connectivity.
The Kafka TA transfers FIX messages in raw or serialized formats from the FIXEdge Server to the Kafka platform and vice versa.
The Kafka TA performance is close to Kafka's native speed of processing. The average reading/writing rate of FIX messages is 30,000 per second or higher. This number refers to the overall throughput capacity for the FIXEdge server with the Kafka TA (from FIX session to Kafka server).
The Kafka TA automatically reconnects to Kafka if the connection to a message broker was unexpectedly terminated. Reconnection behavior can be configured by parameters TransportLayer.KafkaTA.reconnect.backoff.ms and TransportLayer.KafkaTA.reconnect.backoff.max.ms.
Multiple instances of the Kafka TA can be configured using one library instance, inside one instance of FIXEdge.
During FIXEdge server termination, the Kafka TA employs graceful termination, ensuring that message processing operations are executed correctly, completely, and without message loss. When the FIXEdge server fails and nongraceful termination occurs, messages will be retrieved from persistent message storage.
FIFO (First In First Out) ordering is guaranteed, meaning that the order in which messages were sent and received will be maintained upon the termination of the server instance.
Concept
The Kafka TA is intended to communicate FIX messages to other applications using the Kafka streaming platform as middleware.
Communication is provided between the FIXEdge server and a Kafka broker or cluster on the FIXEdge side.
The Kafka TA allows integration with Kafka via the Consumer/Producer API. FIXEdge clients, configured either as producers or consumers, push and pull messages to/from brokers housed in Kafka that organize and store messages according to topics. Brokers commit messages to disk storage for a customizable period of time or space, configurable according to the topic. Topics are further broken down into partitions, with each partition acting as a separate commit log. A broker administrator keeps track of consumers reading partitions by assigning offsets, thereby providing guaranteed-ordering. These features facilitate easy scaling and event replay, as well as the building of fault-tolerant systems.
The Kafka TA differentiates from other Transport Adapters in providing custom serialization/deserialization and durable message storage.
The schema below represents an interaction between FIXEdge and a customer's infrastructure via the Kafka solution.
Basic elements of the Kafka connectivity model
- Producer = publisher, the sender of messages
- Consumer = subscriber, the reader of messages
- Message/event = a single unit of data
- Broker = a server forming a storage layer containing one or more topics
- Topic = ordered collection of events that are organized and stored in a durable way
- Partition = buckets located within topics across which messages are distributed
- Commit log = a record of changes that is committed to disk and can be replayed by consumers
- Guaranteed ordering = a guarantee that a consumer of a given topic-partition will always read that partition's events in exactly the same order as they were written
- Offset = tracking feature that helps consumers remember its position when last reading a partition
Client behavior
When several clients (either one Producer or one Consumer, or both one Producer and one Consumer) are configured for the Kafka TA, all of them establish a connection to the Kafka platform when FIXEdge starts its work.
If one of the clients fails to establish a connection, the remaining clients are not affected by this failure.
Delivery
When the Kafka TA sends a message, it is first put into persistent storage, and then sent through the producer API to a Kafka broker. Once the Kafka platform sends a confirmation that the message was received, the Kafka TA marks the Kafka platform’s acknowledgement that the message was delivered in the persistent storage. These two steps implement a delivery guarantee. If the message couldn't be sent it will remain stored in the persistent storage. Once the FIXEdge server starts and Kafka connection is reestablished, any unsent messages in the persistent message storage will be processed before other runtime messages. After delivery, messages remain stored in the persistent message storage.
Message ordering:
- FIX messages sent to Kafka via the Producer API are sent in the same order as the messages that FIXEdge sends to the client.
- FIXEdge fetches FIX messages via the Consumer API in the same order that messages arrive at a particular Kafka topic.
- The Kafka TA services discipline is FIFO (First In First Out) in the scope of one Kafka partition (for messages that come from one client).
The connectivity mechanism preserves FIX messages even if critical situations occur, such as:
- FIXEdge server is terminated
- Kafka TA crashes or does not start due to invalid configuration
- FIX Session is terminated non-gracefully
- message broker is down
- network disorder occurs
The Kafka TA provides an at least once delivery guarantee, (as opposed to only once). This means that messages are guaranteed to be delivered at least once, with the possibility of duplicates.
Persistent Message Storage
Kafka TA persistent FIX messages in text format are stored as a file saved to a disk using memory mapping.
Storage is unlimited but should not exceed disk capacity.
All messages passing through the system are kept in the persistent message storage.
The TransportLayer.<KafkaNameTA>.Cleanup parameter provides the capability to cleanup the persistent message storage.
Committing to the Kafka platform
The TransportLayer.KafkaTA.<Session>.Consumer.Commit parameter provides three strategies that dictate how commit messages (acknowledgements of messages received) are sent to the Kafka platform.
To choose the best strategy for prioritizing either speed or reliability for commit messages, refer to the table below (options are shown in descending order):
Speed | Reliability |
---|---|
1) 'Auto' | 1) 'Sync' |
2) 'Async' | 2) 'Async' |
3) 'Sync' | 3) 'Auto' |
The three strategies are described in more detail here:
TransportLayer.KafkaTA.<Session>.Consumer.Commit | Description |
---|---|
'Auto' | Commit messages are sent by the Kafka TA to the Kafka platform as configured, for example, at least once during a defined period of time. This parameter is the best option if there are no specific requirements as far as reliability. 'Auto' is also the:
Since commit messages are not sent after every sent message, there is a possibility that duplicate messages will be sent by the Kafka TA. |
'Sync' | For every message read by the Kafka TA's consumer, the Kafka TA sends a commit message is to the corresponding broker confirming the value of the last read offset. The Kafka TA does not read the next message without first sending the commit message. If there is a message sending failure, FIXEdge will notice it immediately. This is the most reliable option because Kafka is constantly updated on the consumer's progress. It is also the slowest method. |
'Async' | The 'Async' option sends a commit message after each message, however, it does so in an asynchronous manner. Due to this, the Kafka TA does not block reading and delegates sending commit messages to an asynchronous thread. Unsuccessfully executed commits do not stop the process and are written into the log. This option is less reliable than the 'Sync' option but more reliable than the 'Auto' option. The number of lost messages might be smaller than the ‘Auto’ option. This option is faster than the 'Sync' option but slower than the 'Auto' option. |
'Sync' and 'Async' are chosen when there is less traffic, more time for processing, and the user wants to increase the reliability of message sending (avoiding duplicates, etc.)
Message handling
The Kafka TA employs the following messaging handling methods:
Sync message handling
- Receiving messages in the Kafka TA from the Kafka platform is regarded as sync messaging.
- Messages are not saved to any storage but become available at the FIXEdge Business Layer.
- Messages arrive at the FIXEdge Business Layer in the same order as they had arrived to the Kafka topic they were fetched from.
Async message handling
- Sending messages from the Kafka TA to the Kafka platform is regarded as async messaging.
- Messages are saved to persistent message storage.
Configuration steps
Given that FIXEdge has already been installed in the user's environment, use the following steps to step up the Kafka TA:
Enable a transport adapter to be used by FIXEdge:
In the ‘Transport Layer Section’ of the FIXEdge.properties, add the Kafka TA to the list of supported adapters:TransportLayer.TransportAdapters = TransportLayer.KafkaTA
Note: If you use other transport adapters, just add TransportLayer.KafkaTA to the end of the list:
TransportLayer.TransportAdapters = ..., TransportLayer.KafkaTA
Configure the Kafka TA by adding the Kafka TA section to the FIXEdge.properties file:
TransportLayer.KafkaTA.Description = Kafka Transport Adaptor TransportLayer.KafkaTA.DllName = bin/KafkaTA-vc10-MD-x64.dll TransportLayer.KafkaTA.Sessions = Kafka TransportLayer.KafkaTA.Kafka.bootstrap.servers = localhost:9092 TransportLayer.KafkaTA.Kafka.FIXVersion = FIX44 TransportLayer.KafkaTA.Kafka.Consumer.Commit = Auto TransportLayer.KafkaTA.Kafka.Consumer.Topics = outputTopic TransportLayer.KafkaTA.Kafka.Consumer.group.id = ID TransportLayer.KafkaTA.Kafka.Producer.Topic = topic
Note: Sample settings can be copied to the FIXEdge.properties file from the KafkaTA.properties file (located in the doc folder of the FIXEdge distribution package).
- Configure rules for message routing from the Kafka TA.
The Kafka TA Client is referred to the Business Layer (BL) by the ClientID name specified in the FIXEdge.properties file. For a Kafka Business Layer configuration sample, refer to the Configuration sample sub-section. - Restart the FIXEdge server to apply the changes.
Configuring multiple adapter instances
Multiple instances of the Kafka TA can be used during one library instance, inside one instance of FIXEdge.
To run multiple instances of the Kafka TA, each new instance must be assigned a new name and new session names. The name assigned to the new adapter instance must be consistent across parameters relating to that instance. New session names must be unique across all existing instances of the adapter. Topic names do not need to be unique.
TransportLayer.TransportAdapters = TransportLayer.Kafka1, TransportLayer.Kafka2 TransportLayer.Kafka1.Description = Kafka Transport Adaptor #1 TransportLayer.Kafka1.DllName = bin/KafkaTA-vc10-MD-x64.dll TransportLayer.Kafka1.Sessions = Kafka_1 TransportLayer.Kafka1.Kafka_1.bootstrap.servers = localhost:9092 TransportLayer.Kafka1.Kafka_1.FIXVersion = FIX44 TransportLayer.Kafka1.Kafka_1.Consumer.Commit = Auto TransportLayer.Kafka1.Kafka_1.Consumer.Topics = outputTopic TransportLayer.Kafka1.Kafka_1.Consumer.group.id = ID TransportLayer.Kafka1.Kafka_1.Producer.Topic = topic TransportLayer.Kafka2.Description = Kafka Transport Adaptor #2 TransportLayer.Kafka2.DllName = bin/KafkaTA-vc10-MD-x64_2.dll TransportLayer.Kafka2.Sessions = Kafka_2 TransportLayer.Kafka2.Kafka_2.bootstrap.servers = localhost:9093 TransportLayer.Kafka2.Kafka_2.FIXVersion = FIX44 TransportLayer.Kafka2.Kafka_2.Consumer.Commit = Auto TransportLayer.Kafka2.Kafka_2.Consumer.Topics = outputTopic TransportLayer.Kafka2.Kafka_2.Consumer.group.id = ID TransportLayer.Kafka2.Kafka_2.Producer.Topic = topic
In the above example, there 2 separated instances of Kafka adapters: "Kafka1" and "Kafka2". Session names are unique across all Kafka adapters
SSL Configuration Sample
listeners=PLAINTEXT://:9092,SSL://:9093 ssl.keystore.location=D:/SSL/kafka01.keystore.jks ssl.keystore.password=123456 ssl.key.password=123456 ssl.truststore.location=D:/SSL/kafka.truststore.jks ssl.truststore.password=123456 ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
TransportLayer.KafkaTA.Kafka.bootstrap.servers = localhost:9093 TransportLayer.KafkaTA.Kafka.security.protocol = SSL TransportLayer.KafkaTA.Kafka.ssl.ca.location = D:/SSL/root.pem
Configuration parameters
Connection to the Kafka platform can be established when the Kafka TA is properly configured with the Kafka IP address.
Property name | Description | Required | Default value | Example |
---|---|---|---|---|
Common properties | ||||
TransportLayer.KafkaTA.Description | Adapter name. Note: It is recommended that this parameter is not empty. This parameter is used in the logging of Kafka events. | N | Kafka Transport Adaptor | |
TransportLayer.KafkaTA.DllName | Contains path and name of the Kafka adapter DLL. In case this parameter is not specified, the TransportLayer.KafkaTA.AdapterId is applied to define the adapter's library by ID | Y | bin/KafkaTA-vc10-MD-x64.dll | |
TransportLayer.KafkaTA.AdapterId | The parameter to define the adapter's library by ID. In case this parameter is not specified, or TransportLayer.KafkaTA.DllName parameter is specified too, the TransportLayer.KafkaTA.DllName is applied This parameter is applicable since FIXEdge C++ version 6.14.0 | N | KAFKA | |
TransportLayer.KafkaTA.Sessions | A comma-separated list of session names. At least one session should be defined. The values from this parameter will be used in the BL description. | Y | ||
TransportLayer.<KafkaNameTA>.Cleanup | Provides the capability to clean the Kafka TA's persistent message storage. Acceptable values:
| N | true | |
Kafka specific parapameters | ||||
TransportLayer.KafkaTA.reconnect.backoff.ms | Delay after which the Kafka TA starts attempting to re-connect if the connection breaks. | N | 100 | |
TransportLayer.KafkaTA.reconnect.backoff.max.ms | Max delay time after which the Kafka TA stops attempts to re-connect if the connection breaks. | N | 10,000 | |
Session properties | ||||
TransportLayer.KafkaTA.<Session>.FIXVersion | In every session, FIX messages use a particular version of the FIX protocol. Use this parameter to set the version of the FIX protocol for a given session.
Example use cases:
| N | FIX44 | |
TransportLayer.KafkaTA.<Session>.ConnectTime | Scheduled time to connect to a Client The value should be in the cron time string format Local time zones will be used | N; Required if DisconnectTime parameter (see below) is configured | If a value is not specified the session is not using a schedule. | |
TransportLayer.KafkaTA.<Session>.DisconnectTime | Scheduled time to disconnect from a Client The value should be in the cron time string format Local time zones will be used | N; Required if ConnectTime parameter (see above) is configured | If a value is not specified the session is not using a schedule. | |
TransportLayer.KafkaTA.<Session>.DaysOff | This property is available since the FIXEdge C++ 6.16.0 version. The date and/or time when the ConnectTime and DisconnectTime properties configured for the Kafka TA session are ignored/not applied. The DaysOff property must be specified in the CRON format. Several CRON expressions separated by the ";" can be defined as value for the DaysOff property. If the DaysOff property is specified with the incorrect value for the Kafka TA session, then this Kafka TA session won't be started, and the following ERROR will be logged: ERROR Session '<Session>': [Consumer/Producer]: <error_message> | N | - | 0 0 9 * * 2-6 |
TransportLayer.KafkaTA.<Session>.Serializer | Serializer name. Acceptable values:
Serializer tasks can be set from an external plugin. Format: TestSerializer:KafkaSerializer, where:
ID values are set by the Plugin developer. | N | Raw | |
TransportLayer.KafkaTA.<Session>.bootstrap.servers | An initial list of brokers. | Y | ||
Secure connection properties | ||||
TransportLayer.KafkaTA.<Session>.sasl.username | This parameter only applies and is required if using PLAIN authentication. | N | johndoe | |
TransportLayer.KafkaTA.<Session>.sasl.password | This parameter only applies and is required if using PLAIN authentication. | N | password1234 | |
TransportLayer.KafkaTA.<Session>.ssl.key.location | File or directory path to SSL private key. This parameter only applies and is required if using SSL certificate authentication. | N | D:/SSL/kafka01.pem | |
TransportLayer.KafkaTA.<Session>.ssl.key.password | This parameter only applies and is required if using SSL certificate authentication. | N | ||
TransportLayer.KafkaTA.<Session>.ssl.ca.location | File or directory path to CA certificate(s) for verifying the broker's key. The parameter is required if the security protocol is SSL. | N | D:/SSL/root.pem | |
TransportLayer.KafkaTA.<Session>.sasl.mechanism | Valid values:
| N | If a value is not specified user authentication is not applied. | |
TransportLayer.KafkaTA.<Session>.security.protocol | The protocol used to communicate with brokers. Valid values:
| N | PLAINTEXT | |
TransportLayer.KafkaTA.<Session>.sasl.kerberos.service.name | Only applies and is required if using GSS_API authentication. In this case, use the Kerberos principal name that Kafka runs as. | N | ||
Consumer properties | ||||
TransportLayer.KafkaTA.<Session>.Consumer.Commit | Commit mode. Acceptable values:
For more information see Sending commit messages to the Kafka platform. | N | Auto | |
TransportLayer.KafkaTA.<Session>.Consumer.group.id | A unique string that identifies the consumer group that the given Consumer belongs to. This property is required if the Consumer uses either the group management functionality by using subscribe (Topic) or the Kafka-based offset management strategy. When the "group.id" parameter is not configured and its value is yet to be assigned, the session ID is used as the value. If the group.id value is identical across multiple sessions, and topic names are the same across multiple sessions, only one of the sessions is going to receive and process the data. This scenario is permissible when processing messages in parallel. | N | The default value is equal to the <Session> ID. | |
TransportLayer.KafkaTA.<Session>.Consumer.Topics | If a value is not specified, the Consumer is not in use. To use the Consumer, a value must be specified. In this case, use a comma separated list of topics that should be listened by the Consumer. | N | outputTopic | |
Producer properties | ||||
TransportLayer.KafkaTA.<Session>.Producer.KeyTag | If a value is not specified, then a key tag will not be used to fill a key value. To use this parameter, specify a tag number in a FIX message. The value of the tag will be used as a key when the message is sent to Kafka. | N | ||
TransportLayer.KafkaTA.<Session>.Producer.Topic | If a value is not specified, the Producer is not in use. To use the producer, specify the topic that the Producer should send FIX messages to. | N | ||
TransportLayer.KafkaTA.<Session>.Producer.RejectMessageWhileNoConnection | Provides the option to reject all further outgoing messages from FIXEdge from being sent by alerting the Producer when messages cannot be delivered to Kafka. Depending on the operating system, a disconnection event may be detected at different times. For example, on Windows, if the Kafka server is not started the disconnection will be detected immediately. However, on Linux, the disconnection will be detected only once a message attempts to send. If the Kafka TA is unable to deliver a message, this generates an OnUndeliveredMessageEvent in the BL. A user can define actions inside this event such as generating a reject message or logging the error. | N | false | |
TransportLayer.KafkaTA.<Session>.Producer.DisconnectionPeriodThreshold | The timeout period after which unsuccessful attempts at sending will be rejected if TransportLayer.KafkaTA.<Session>.Producer.RejectMessageWhileNoConnection=true.
| N | 0 |
Configuration sample
The samples below represent the Kafka TA's configuration with the minimal set of parameters required to run the adaptor:
Kafka TA configuration file:
Kafka Business Layer rules:
Authentication Configuration
SSL certificate authentication
To configure SSL authentication, follow these steps:
- Make sure the Kafka broker and adaptor are configured for SSL connection
Set client authentication as "required" in the server.properties file
Provide ssl.key details in the FIXEdge.properties file
SASL_PLAIN authentication
To configure SASL_PLAIN authentication:
Use the following files and corresponding settings to configure the Kafka broker
Use the following file and corresponding settings to configure the Kafka TA:
SASL_SSL authentication
This is a combination of an SSL connection with client authentication and SASL_PLAIN authentication.
To configure SASL_SSL authentication:
Use the following files and corresponding settings to configure the Kafka broker
Use the following file and corresponding settings to configure the Kafka TA:
SASL_GSSAPI authentication
To configure SASL_GSSAPI authentication:
Use the following files and corresponding settings to configure the Kafka broker
Use the following file and corresponding settings to configure the Kafka TA
Logging
Kafka exhaustive logging means that any action with a configuration parameter is logged.
List of logged actions:
- Adapter initialization
- Parsing validation errors
- Configuration parameters
- Sent and received messages
The following levels of logging are used for Kafka:
- ERROR - is always ON
- INFO - can be optionally ON
- WARN - can be optionally ON
- DEBUG - is turned off by default and is used in critical cases
Any configuration event is logged in the "Kafka_TA" log category in the log file via the INFO messages.
The table below specifies the most common logging messages for Kafka.
Log type | Event description | Logging message format |
---|---|---|
ERROR | Client error | "Session '<session_name>': [Consumer/Producer]: <error_message>" |
Outgoing message failed to be queued | "Session '<session_name>': Producer: Error handling the outgoing message: <native error>. Message: <message>" | |
Incoming message failed to be handled | "Session '<session_name>': Consumer: Error handling the incoming message: <native error>. Message (optional): <message>" | |
Start operation failed to be applied | "Session '<session_name>': [Consumer/Producer]: The session <session_name> can't be started: <description> Reason: Operation cannot be applied" | |
Stop operation failed to be applied | "Session '<session_name>': [Consumer/Producer]: The session <session_name> can't be stopped: <description> Reason: Operation cannot be applied" | |
INFO | Client creation / any config parameter change | |
WARN | Kafka TA fails to apply any config parameter | "Session '<session_name>': <warning_message>" |
The outgoing message is rejected because the client is not connected to Kafka | "Session '<session_name>': Producer: The outgoing message is rejected: <message>" is visible in the "Kafka_TA" | |
DEBUG | Creation of a Consumer or Producer | "Session '<session_name>': Producer: Error handling the outgoing message: <native error>. Message: <message>" |
Adding an outgoing message to the queue | "Session '<session_name>': Producer: The message is put to the outgoing queue: <message>" | |
Removing an outgoing message from the queue | "Session '<session_name>': Producer: Sending the message via the producer: <message>" | |
An incoming message is being received | "Session '<session_name>': Consumer: Receiving the message: <message>" |
Logging setup for saving messages processed by Kafka to a separate file
Kafka TA logs different events with different categories.
- Category 'Kafka_TA' is used for lifecycle, session state change, and errors events
- Categories with Kafka TA session name specified in TransportLayer.KafkaTA.Sessions are used for logging messages passing through the Kafka endpoints.
The following example shows how to save Kafka-related logs for multiple sessions to a separate file FIXEdge1/log/Kafka.log.
In the case of configuration with 3 sessions
TransportLayer.KafkaTA.Sessions = KafkaConsumer, KafkaNewOrderProducer, KafkaCancelOrderProducer
There would be 3 new log categories: KafkaConsumer, KafkaNewOrderProducer, KafkaCancelOrderProducer.
Therefore logging configuration is
Log.KafkaConsumer.Device = File Log.KafkaConsumer.DebugIsOn = true Log.KafkaConsumer.NoteIsOn = true Log.KafkaConsumer.File.Name = ../FIXEdge1/log/Kafka.log Log.KafkaNewOrderProducer.Device = File Log.KafkaNewOrderProducer.DebugIsOn = true Log.KafkaNewOrderProducer.NoteIsOn = true Log.KafkaNewOrderProducer.File.Name = ../FIXEdge1/log/Kafka.log Log.KafkaCancelOrderProducer.Device = File Log.KafkaCancelOrderProducer.DebugIsOn = true Log.KafkaCancelOrderProducer.NoteIsOn = true Log.KafkaCancelOrderProducer.File.Name = ../FIXEdge1/log/Kafka.log
And configuration for routing life cycle events to the same file as Kafka sessions
Log.Kafka_TA.Device = File Log.Kafka_TA.DebugIsOn = true Log.Kafka_TA.NoteIsOn = true Log.Kafka_TA.File.Name = ../FIXEdge1/log/Kafka.log
For example, KafkaConsumer session log:
FIXEdge replace SOHs delimiters in FIX message with pipes '|' for better visibility in these logs
Error messages in Kafka TA are logged under DEBUG severity.
Additional information about logging configuration can be found there:
- FIXEdge logs format
- How to redirect FIX Antenna and/or FIXEdge logging to Syslog instead of files
- How to divide different categories and severities of log files into different files in the Logging section
Scheduling
The schedule settings regulate the work of Kafka TA Producer/Consumer. The schedule can be configured as a cron expression or directly tied up to FIXEdge.
If the cron expression is used for scheduling, the Producer/Consumer starts using Client's connection to the Kafka platform at a scheduled time. At a scheduled disconnection time, the Producer/Consumer stops using the Client's connection to Kafka.
If the schedule settings are directly tied up to FIXEdge, the Producer/Consumer starts using Client's connection to the Kafka platform at the moment when the FIXEdge server starts. When the FIXEdge server stops, the Producer/Consumer stops using the Client's connection to Kafka.
Scheduling is represented by the following properties:
- TransportLayer.KafkaTA.<Session>.ConnectTime
- TransportLayer.KafkaTA.<Session>.DisconnectTime
Refer to the table in the Configuration parameters section above.
Start and Stop operations for Kafka TA
Available since FIXEdge 6.13.0 release.
FIXEdge C++ provides Start operation for Disconnected Kafka TA sessions and Stop operation for Connecting and Running Kafka TA sessions on request from the subscribed monitoring application.
Start operation
Start operation can be applied to Kafka TA sessions in the Disconnected state.
FIXEdge starts a particular Kafka TA session on the request from the subscribed monitoring application and if the operation has been applied replies with a successful response.
In case when there is no session with the requested name FIXEdge logs an error and replies to the subscribed monitoring application with an error response below:
Can't find session with id <session_name>
Stop operation
Stop operation can be applied to Kafka TA sessions in the Running and Connecting states.
FIXEdge stops a particular Kafka TA session on the request from the subscribed monitoring application and if the operation has been applied replies with a successful response.
In case when there is no session with the requested name FIXEdge logs an error and replies to the subscribed monitoring application with an error response below:
Can't find session with id <session_name>
The FIXEdge behavior in the case when Kafka TA Producer session is in the Disconnected state is determined by the RejectMessageWhileNoConnection parameter.
Custom serialization
When FIX messages are transferred in raw format (i.e. serialization is not applied), the Kafka TA implies a string containing the tag values has been converted to strings.
A serialized format of communication is fully supported by the Kafka TA. Custom serialization implies specially designed external code and includes the serialization of all Kafka message parts:
- Key
- Value
- Header(-s)
FIXEdge is supplied with a serialization plugin b2b_fixserver_kafka_test_serializer.dll (or on Linux libb2b_fixserver_kafka_test_serializer.so) that can be find in <FIXEdge installation>/plugins directory
Serialization on sending
When custom serialization is applied and FIXEdge sends the message to the particular Kafka topic, this message is serialized via the custom serializer and is sent to a particular Kafka topic via the Producer API. The following DEBUG message is logged in the client log category of the log file:
"The message content:
headers: <headers>,
key: <key>,
partition Id: <partition Id>,
value: <value>"
Serialization on receiving
When custom serialization is applied and the new message arrives at the particular Kafka topic, this message is received by FIXEdge via the Consumer API. The following DEBUG message is logged in the client log category of the log file:
"The message content:
headers: <headers>,
key: <key>,
partition Id: <partition Id>,
value: <value>"
The message is deserialized via the custom serializer.
Message key processing
In the case of default serialization, the Kafka TA produces a key using the value of the tag configured in the "KeyTag" parameter. The specified key is passed to the Kafka platform along with the message sent to the particular Kafka topic.
In the case of custom serialization, the custom plugin produces the key that is passed to the Kafka platform along with the message sent to the particular Kafka topic.
Custom partitioning
If custom partitioning is configured, the custom plugin produces the partition ID based on the tag values of a FIX message.
Message Content Wrapping
When FIXEdge sends a message to a particular Kafka topic and the BL Client is configured to work with XMLData (213), two flows are possible:
- The XMLData (213) field exists in the FIX message and is not empty.
In this case, the XML message extracted from the XMLData (213) field is sent to the particular Kafka topic via the Producer API. - The XMLData (213) field does not exist in the FIX message or is empty.
In this case, the corresponding error is sent to the Business Layer and nothing is sent to a Kafka topic via the Producer API.
When Kafka sends an XML message to FIXEdge and the BL Client is configured to work with XMLData (213), the FIX message with 35=n (XML message, JSON plain text, etc.) is received by FIXEdge, the XMLData (213) field is filled in with the received XML message, and the XMLDataLen (212) field is filled in with the received message length.
Kafka Adapter Monitoring
The Kafka Transport Adapter is integrated with a FIXICC monitoring feature.
Information about the configured TA's session parameters list and dynamic state is sent to the subscribed monitoring application when FIXEdge starts.
When the TA session dynamic state is changed FIXEdge sends the update to the subscribed monitoring application.
The adapter provides information about the following indicators:
Name shown | Description |
---|---|
Messages received during the last session | Number of incoming messages from Kafka handled per client, beginning from the client’s current working session |
Messages received total | Number of incoming messages from Kafka handled per client, beginning when FIXEdge started |
Messages sent during the last session | Number of outgoing messages to Kafka handled per client, beginning from the client’s current working session |
Messages sent total | Number of outgoing messages to Kafka handled per client, beginning when FIXEdge started |
Messages with errors during the last session | Number of errors related to the Kafka TA, beginning from the client’s current working session |
Messages with errors total | Number of errors related to the Kafka TA, beginning when FIXEdge started |
Producer’s queue depth | The size of the queue per producer |
Status | Status of the consumer and producer connections with Kafka |
Time of the last successful message sending | Time of the last successful message sending in UTC time zone |
Time of the last successful message receiving | Time of the last successful message receiving in UTC time zone |
FIXEdge Kafka TA monitors both consumer and producer connections.
When the consumer disconnects, Kafka can react to this by sending messages to other consumers in the same group as the disconnected consumer.
The lifetime of the session is defined by parameters: TransportLayer.KafkaTA.<Session>.ConnectTime and TransportLayer.KafkaTA.<Session>.DisconnectTime and start/stop operations provided by monitoring application for particular Kafka TA session.
Troubleshooting
Transport Adaptor 'KafkaTA' has failed to initialize
If the adapter fails to start up, there is an issue with a configuration parameter.
To resolve this issue, the configuration parameter specified in the error message must be corrected. Check the log for error messages.
Example
In this example, the path specified to the adapter dll is the incorrect one:
2020-11-25 15:51:39,019 UTC INFO [FixLayer_Version] 20936 Module 'FixLayer' version 0.2.1.5 was loaded. 2020-11-25 15:51:39,019 UTC INFO [Engine] 20936 Custom AdminApplication is registered. 2020-11-25 15:51:39,024 UTC ERROR [TransportLayer] 20936 Transport Adaptor 'KafkaTA' has failed to initialize: Error loading DLL 'd:\FIXEdge\bin\KafkaTA-vc10-MD-x64.dll'. . The specified module could not be found. (Error code = 126) 2020-11-25 15:51:39,024 UTC INFO [TransportLayer] 20936 Module 'TransportLayer' version 0.1.1.5 was loaded.
The session wasn't created
If one session doesn’t load successfully during start-up while others are created successfully, there is an issue with a configuration parameter.
To resolve this issue, the configuration parameter specified in the error message must be corrected. Check the log for error messages.
Example
In this example, two sessions ('Kafka' and 'Kafka2') are configured, and one was not created successfully.
In the first session, the wrong parameter, 'protocol', was used instead of the correct parameter, 'security.protocol', and the session was not created.
In the log file, an ERROR message saying, "Failed to set protocol..." appears instead of an INFO message saying, "Session 'Kafka': was created successfully".
2020-11-25 16:03:00,882 UTC INFO [Kafka_TA] 7504 process logon for session 'Kafka'
2020-11-25 16:03:00,882 UTC INFO [Kafka_TA] 7504 Session 'Kafka': Is about to be created with parameters:
Consumer.Topics = outputTopic
Consumer.group.id = ID
FIXVersion = FIX44
Producer.Topic = topic
Serializer = Raw
bootstrap.servers = localhost:9092
protocol = SSL
2020-11-25 16:03:00,894 UTC ERROR [Kafka_TA] 7504 Failed to set protocol: No such configuration property: "protocol"
2020-11-25 16:03:00,894 UTC INFO [Kafka_TA] 7504 process logon for session 'Kafka2'
2020-11-25 16:03:00,895 UTC INFO [Kafka_TA] 7504 Session 'Kafka2': Is about to be created with parameters:
Consumer.Topics = topic
Consumer.group.id = ID
FIXVersion = FIX44
Producer.Topic = outputTopic
Serializer = Raw
bootstrap.servers = localhost:9092
2020-11-25 16:03:00,922 UTC INFO [CC_Layer] 7504 Client Kafka2 has logged in
2020-11-25 16:03:00,936 UTC INFO [Kafka_TA] 7504 Session 'Kafka2': was created successfully
2020-11-25 16:03:00,936 UTC INFO [Kafka_TA] 7504 Kafka Adaptor v.0.1 started.
Adapter not sending or receiving messages
If sessions have been created successfully, but the adapter isn’t sending or receiving messages to/from the Kafka server, this issue has most likely occurred due to a problem with the connection.
If the adapter can’t connect to the Kafka server, it will continue to make connection attempts in given intervals until a reason for the error is established. Until this is done, the TA will not be able to send or receive messages. If the default level of logging doesn't explain the reason, you need to enable a deeper level of logging.
To establish what the error is, you must enable the DEBUG logging as follows:
- Open the FIXEdge.properties file
- Set the parameter Log.DebugIsOn = True
Once this is done, the log will show error messages stating previous connection attempts and the reason for the error. To resolve this issue, correct the configuration issue specified in the error message.
Example
In this example, the wrong server port, 9093 (for SSL connection), is configured instead of the correct one, 9092 (for PLAINTEXT connection).
With DEBUG logging enabled, we can see that the adapter is permanently trying to connect to the server, as well as an error message specifying the configuration issue.
2020-11-25 16:22:54,533 UTC DEBUG [Kafka_TA] 5808 Session 'Kafka': Producer: librdkafka ERROR: [thrd:app]: rdkafka#producer-2: localhost:9093/bootstrap: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 197ms in state APIVERSION_QUERY) 2020-11-25 16:22:54,534 UTC DEBUG [Kafka_TA] 10912 Session 'Kafka': Consumer: librdkafka ERROR: [thrd:localhost:9093/bootstrap]: 1/1 brokers are down 2020-11-25 16:22:54,534 UTC DEBUG [Kafka_TA] 22440 Session 'Kafka': Consumer: librdkafka ERROR: [thrd:app]: rdkafka#consumer-1: localhost:9093/bootstrap: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 1086ms in state APIVERSION_QUERY) 2020-11-25 16:22:54,761 UTC DEBUG [Kafka_TA] 10912 Session 'Kafka': Consumer: librdkafka ERROR: [thrd:localhost:9093/bootstrap]: 1/1 brokers are down 2020-11-25 16:22:55,033 UTC DEBUG [Kafka_TA] 24160 Session 'Kafka': Producer: librdkafka ERROR: [thrd:localhost:9093/bootstrap]: 1/1 brokers are down 2020-11-25 16:23:07,261 UTC DEBUG [Kafka_TA] 22440 Session 'Kafka': Consumer: librdkafka ERROR: [thrd:app]: rdkafka#consumer-1: localhost:9093/bootstrap: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 1ms in state APIVERSION_QUERY, 4 identical error(s) suppressed) 2020-11-25 16:23:07,261 UTC DEBUG [Kafka_TA] 10912 Session 'Kafka': Consumer: librdkafka ERROR: [thrd:localhost:9093/bootstrap]: 1/1 brokers are down 2020-11-25 16:23:15,441 UTC DEBUG [Kafka_TA] 24160 Session 'Kafka': Producer: librdkafka ERROR: [thrd:localhost:9093/bootstrap]: 1/1 brokers are down 2020-11-25 16:23:15,442 UTC DEBUG [Kafka_TA] 5808 Session 'Kafka': Producer: librdkafka ERROR: [thrd:app]: rdkafka#producer-2: localhost:9093/bootstrap: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 1ms in state APIVERSION_QUERY, 4 identical error(s) suppressed)
Messages are not delivered with the log stating: "Message processing failed"
The Kafka TA consumer receives a message but the message isn't delivered anywhere (for example, in a FIX session) and FixEdge.log states:
"No BL rules found for message with ClientID '<client id>', executing DefaultRule."
- "
Message processing failed
"
2020-11-25 16:35:36,545 UTC DEBUG [Kafka_TA] 21964 Session 'Kafka2': Consumer: Receiving the message: 8=FIX.4.49=15235=D49=SC56=FE34=252=20201125-16:35:35.936212=4213=test11=BTC/USD_LimitB_GTC55=BTC/USD54=160=20160401-15:15:58.08038=0.1540=244=630.159=110=078 2020-11-25 16:35:36,545 UTC DEBUG [BL_RoutingTable] 21964 No BL rules found for message with ClientID 'Kafka2', executing DefaultRule. 2020-11-25 16:35:36,546 UTC DEBUG [CC_Layer] 21964 BL has processed a message. Number of client IDs for delivery :0. Number or FIX sessions for delivery :0.. Number or sources identifiers for delivery :0. 2020-11-25 16:35:36,546 UTC DEBUG [Kafka_TA] 21964 Session 'Kafka2': Consumer: Message processing failed
In order for this information to be recorded in the logs, the user should enable DEBUG logging by setting Log.DebugIsOn = true in FIXEdge.properties.
Root cause
This happens if the business logic is not configured for handling the message from the adapter or if message processing doesn't fit rule conditions.
Solution
Check if there is a logic processing message from the Kafka TA Consumer.
For example:BL_Config.xml<Rule Description="Route order events from Kafka Consumer Adapter to active FIX-session"> <Source> <Client Name="KafkaConsumer"/> </Source> <Action> <Send> <FixSession/> </Send> </Action> </Rule>
- Check whether the message was filtered by rule conditions.
If the message was filtered, extend the rules/conditions.
Kafka server can't start because all log dirs have failed
Starting the kafka-server-start script has failed with the following error:
ERROR Shutdown broker because all log dirs in d:\tmp\kafka-logs have failed
Root cause
One of the possible reasons is that the Kafka logs are corrupted due to an unexpected shutdown.
Solution
Backup the Kafka and zookeeper logs according to the path in the error message (for this example it's d:\tmp\) for future investigation if needed.
Clean log dir by the specified log directories and restart the service.