The Kafka transport adapter is available starting from version 6.10.0 of FIXEdge C++
The Kafka Transport Adapter (hereinafter Kafka TA) provides the FIXEdge server connectivity to the Kafka distributed streaming platform.
Internally, its implementation uses the Kafka Consumer and Producer APIs. The broker API version must be 0.10.0.0 or higher.
The Kafka TA acts as a FIXEdge plugin and establishes a connection to a Kafka single instance or a Kafka cluster. The adapter supports TLS connectivity.
The Kafka TA transfers FIX messages in raw or serialized formats from the FIXEdge Server to the Kafka platform and vice versa.
The Kafka TA performance is close to Kafka's native speed of processing. The average reading/writing rate of FIX messages is 30,000 per second or higher. This number refers to the overall throughput capacity for the FIXEdge server with the Kafka TA (from FIX session to Kafka server).
The Kafka TA automatically reconnects to Kafka if the connection to a message broker was unexpectedly terminated. Reconnection behavior can be configured by parameters TransportLayer.KafkaTA.reconnect.backoff.ms and TransportLayer.KafkaTA.reconnect.backoff.max.ms.
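The reconnection delay grows from `reconnect.backoff.ms` up to the `reconnect.backoff.max.ms` cap. The sketch below is illustrative Python, not the adapter's code (the underlying client also applies jitter), showing the doubling-with-cap behaviour implied by these two parameters:

```python
# Illustrative sketch of exponential reconnect backoff: the delay doubles
# from reconnect.backoff.ms and is capped at reconnect.backoff.max.ms.
def backoff_delays(initial_ms, max_ms, attempts):
    """Yield the delay (in ms) before each reconnect attempt."""
    delay = initial_ms
    for _ in range(attempts):
        yield delay
        delay = min(delay * 2, max_ms)

# With the defaults (100 ms initial, 10,000 ms cap):
delays = list(backoff_delays(100, 10_000, 9))
# delays == [100, 200, 400, 800, 1600, 3200, 6400, 10000, 10000]
```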
Multiple instances of the Kafka TA can be configured using one library instance, inside one instance of FIXEdge.
During FIXEdge server termination, the Kafka TA employs graceful termination, ensuring that message processing operations are executed correctly, completely, and without message loss. When the FIXEdge server fails and nongraceful termination occurs, messages will be retrieved from persistent message storage.
FIFO (First In First Out) ordering is guaranteed, meaning that the order in which messages were sent and received will be maintained upon the termination of the server instance.
The Kafka TA is intended to communicate FIX messages to other applications using the Kafka streaming platform as middleware.
Communication is provided between the FIXEdge server and a Kafka broker or cluster on the FIXEdge side.
The Kafka TA allows integration with Kafka via the Consumer/Producer API. FIXEdge clients, configured either as producers or consumers, push and pull messages to/from brokers housed in Kafka that organize and store messages by topic. Brokers retain messages on disk for a configurable period of time or amount of space, set per topic. Topics are further broken down into partitions, with each partition acting as a separate commit log. Kafka tracks each consumer's position in a partition via offsets, thereby providing guaranteed ordering. These features facilitate easy scaling and event replay, as well as the building of fault-tolerant systems.
The Kafka TA differs from other Transport Adapters in providing custom serialization/deserialization and durable message storage.
The schema below represents an interaction between FIXEdge and a customer's infrastructure via the Kafka solution.
When several clients (producers, consumers, or both) are configured for the Kafka TA, all of them establish a connection to the Kafka platform when FIXEdge starts its work.
If one of the clients fails to establish a connection, the remaining clients are not affected by this failure.
When the Kafka TA sends a message, it is first put into persistent storage and then sent through the Producer API to a Kafka broker. Once the Kafka platform confirms that the message was received, the Kafka TA marks the message as delivered in the persistent storage. These two steps implement the delivery guarantee. If a message couldn't be sent, it remains in the persistent storage. Once the FIXEdge server starts and the Kafka connection is re-established, any unsent messages in the persistent message storage are processed before other runtime messages. After delivery, messages remain stored in the persistent message storage.
Message ordering:
The connectivity mechanism preserves FIX messages even if critical situations occur, such as:
The Kafka TA provides an at-least-once delivery guarantee (as opposed to exactly-once). This means that messages are guaranteed to be delivered at least once, with the possibility of duplicates.
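Because delivery is at-least-once, downstream consumers may see duplicates. A common mitigation, sketched below as illustrative Python rather than adapter functionality, is idempotent handling keyed on a unique message attribute such as the FIX MsgSeqNum (tag 34):

```python
# Sketch of consumer-side deduplication for at-least-once delivery.
def deduplicate(messages, key):
    """Keep only the first occurrence of each key, preserving order."""
    seen, unique = set(), []
    for msg in messages:
        k = key(msg)
        if k not in seen:
            seen.add(k)
            unique.append(msg)
    return unique

batch = ["34=2|35=D", "34=3|35=D", "34=2|35=D"]        # one redelivery
unique = deduplicate(batch, key=lambda m: m.split("|")[0])
# unique == ["34=2|35=D", "34=3|35=D"]
```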
The Kafka TA's persistent FIX messages are stored in text format in a memory-mapped file on disk.
Storage is unlimited but should not exceed disk capacity.
All messages passing through the system are kept in the persistent message storage.
The TransportLayer.<KafkaNameTA>.Cleanup parameter provides the capability to clean up the persistent message storage.
The TransportLayer.KafkaTA.<Session>.Consumer.Commit parameter provides three strategies that dictate how commit messages (acknowledgements of messages received) are sent to the Kafka platform.
To choose the best strategy for prioritizing either speed or reliability for commit messages, refer to the table below (options are shown in descending order):
| Speed | Reliability |
|---|---|
| 1) 'Auto' | 1) 'Sync' |
| 2) 'Async' | 2) 'Async' |
| 3) 'Sync' | 3) 'Auto' |
The three strategies are described in more detail here:
| TransportLayer.KafkaTA.<Session>.Consumer.Commit | Description |
|---|---|
| 'Auto' | Commit messages are sent by the Kafka TA to the Kafka platform as configured, for example, at least once during a defined period of time. This is the best option if there are no specific reliability requirements; 'Auto' is also the default and the fastest option. Since commit messages are not sent after every message, there is a possibility that duplicate messages will be sent by the Kafka TA. |
| 'Sync' | For every message read by the Kafka TA's consumer, the Kafka TA sends a commit message to the corresponding broker confirming the value of the last read offset. The Kafka TA does not read the next message without first sending the commit message. If there is a message sending failure, FIXEdge notices it immediately. This is the most reliable option because Kafka is constantly updated on the consumer's progress. It is also the slowest method. |
| 'Async' | The 'Async' option sends a commit message after each message, but does so asynchronously: the Kafka TA does not block reading and delegates sending commit messages to an asynchronous thread. Unsuccessfully executed commits do not stop the process and are written to the log. This option is less reliable than 'Sync' but more reliable than 'Auto' (the number of lost messages may be smaller than with 'Auto'). It is faster than 'Sync' but slower than 'Auto'. |

'Sync' and 'Async' are chosen when there is less traffic, more time for processing, and the user wants to increase the reliability of message sending (avoiding duplicates, etc.)
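The behavioural difference between the three commit modes can be modelled with a small sketch. This is illustrative Python, not the adapter's source; it only shows when commits are issued for a batch of reads:

```python
# Illustrative model of offset-commit timing under each strategy.
def commit_events(mode, messages_read):
    """Return the commits issued while reading `messages_read` messages."""
    if mode == "Auto":
        return ["periodic commit"]                     # timer-driven, batched
    if mode == "Sync":
        return ["blocking commit"] * messages_read     # one per message, blocks reads
    if mode == "Async":
        return ["async commit"] * messages_read        # one per message, non-blocking
    raise ValueError(f"unknown commit mode: {mode}")

# 'Sync' commits after every message; 'Auto' commits on a timer regardless
# of how many messages were read, which is why duplicates are possible.
```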
The Kafka TA employs the following message handling methods:
Given that FIXEdge has already been installed in the user's environment, use the following steps to set up the Kafka TA:
Enable a transport adapter to be used by FIXEdge:
In the ‘Transport Layer Section’ of the FIXEdge.properties file, add the Kafka TA to the list of supported adapters:
TransportLayer.TransportAdapters = TransportLayer.KafkaTA
Note: If you use other transport adapters, just add TransportLayer.KafkaTA to the end of the list:
TransportLayer.TransportAdapters = ..., TransportLayer.KafkaTA
Configure the Kafka TA by adding the Kafka TA section to the FIXEdge.properties file:
TransportLayer.KafkaTA.Description = Kafka Transport Adaptor
TransportLayer.KafkaTA.DllName = bin/KafkaTA-vc10-MD-x64.dll
TransportLayer.KafkaTA.Sessions = Kafka
TransportLayer.KafkaTA.Kafka.bootstrap.servers = localhost:9092
TransportLayer.KafkaTA.Kafka.FIXVersion = FIX44
TransportLayer.KafkaTA.Kafka.Consumer.Commit = Auto
TransportLayer.KafkaTA.Kafka.Consumer.Topics = outputTopic
TransportLayer.KafkaTA.Kafka.Consumer.group.id = ID
TransportLayer.KafkaTA.Kafka.Producer.Topic = topic
Note: Sample settings can be copied to the FIXEdge.properties file from the KafkaTA.properties file (located in the doc folder of the FIXEdge distribution package).
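A small helper, illustrative and not part of FIXEdge, that reads the `key = value` format shown above into a Python dict. It can be handy for sanity-checking a configuration before deployment:

```python
# Parse FIXEdge.properties-style "key = value" lines into a dict,
# skipping blank lines and '#' comments (illustrative helper only).
def parse_properties(text):
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props

sample = """
# List of adaptor sessions
TransportLayer.KafkaTA.Sessions = Kafka
TransportLayer.KafkaTA.Kafka.bootstrap.servers = localhost:9092
"""
props = parse_properties(sample)
# props["TransportLayer.KafkaTA.Sessions"] == "Kafka"
```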
Multiple instances of the Kafka TA can be used with one library instance, inside one instance of FIXEdge.
To run multiple instances of the Kafka TA, each new instance must be assigned a new name and new session names. The name assigned to the new adapter instance must be consistent across parameters relating to that instance. New session names must be unique across all existing instances of the adapter. Topic names do not need to be unique.
TransportLayer.TransportAdapters = TransportLayer.Kafka1, TransportLayer.Kafka2

TransportLayer.Kafka1.Description = Kafka Transport Adaptor #1
TransportLayer.Kafka1.DllName = bin/KafkaTA-vc10-MD-x64.dll
TransportLayer.Kafka1.Sessions = Kafka_1
TransportLayer.Kafka1.Kafka_1.bootstrap.servers = localhost:9092
TransportLayer.Kafka1.Kafka_1.FIXVersion = FIX44
TransportLayer.Kafka1.Kafka_1.Consumer.Commit = Auto
TransportLayer.Kafka1.Kafka_1.Consumer.Topics = outputTopic
TransportLayer.Kafka1.Kafka_1.Consumer.group.id = ID
TransportLayer.Kafka1.Kafka_1.Producer.Topic = topic

TransportLayer.Kafka2.Description = Kafka Transport Adaptor #2
TransportLayer.Kafka2.DllName = bin/KafkaTA-vc10-MD-x64_2.dll
TransportLayer.Kafka2.Sessions = Kafka_2
TransportLayer.Kafka2.Kafka_2.bootstrap.servers = localhost:9093
TransportLayer.Kafka2.Kafka_2.FIXVersion = FIX44
TransportLayer.Kafka2.Kafka_2.Consumer.Commit = Auto
TransportLayer.Kafka2.Kafka_2.Consumer.Topics = outputTopic
TransportLayer.Kafka2.Kafka_2.Consumer.group.id = ID
TransportLayer.Kafka2.Kafka_2.Producer.Topic = topic
In the above example, there are two separate instances of the Kafka adapter: "Kafka1" and "Kafka2". Session names are unique across all Kafka adapters.
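The uniqueness requirement can be checked mechanically. The function below is a hypothetical helper, not part of FIXEdge, that reports session names reused across adapter instances:

```python
# Report session names that appear in more than one adapter instance
# (illustrative helper; adapter and session names are examples).
def duplicate_sessions(adapters):
    """adapters: mapping of adapter name -> list of its session names."""
    seen, dups = {}, []
    for adapter, sessions in adapters.items():
        for session in sessions:
            if session in seen:
                dups.append(session)        # already claimed by another adapter
            seen[session] = adapter
    return dups

ok = duplicate_sessions({"Kafka1": ["Kafka_1"], "Kafka2": ["Kafka_2"]})
# ok == []  (valid: session names are unique)
bad = duplicate_sessions({"Kafka1": ["Kafka"], "Kafka2": ["Kafka"]})
# bad == ["Kafka"]  (invalid: the name is reused)
```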
listeners=PLAINTEXT://:9092,SSL://:9093
ssl.keystore.location=D:/SSL/kafka01.keystore.jks
ssl.keystore.password=123456
ssl.key.password=123456
ssl.truststore.location=D:/SSL/kafka.truststore.jks
ssl.truststore.password=123456
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
TransportLayer.KafkaTA.Kafka.bootstrap.servers = localhost:9093
TransportLayer.KafkaTA.Kafka.security.protocol = SSL
TransportLayer.KafkaTA.Kafka.ssl.ca.location = D:/SSL/root.pem
Connection to the Kafka platform can be established when the Kafka TA is properly configured with the Kafka IP address.
| Property name | Description | Required | Default value | Example |
|---|---|---|---|---|
| **Common properties** | | | | |
| TransportLayer.KafkaTA.Description | Adapter name. Note: it is recommended that this parameter is not empty, since it is used in the logging of Kafka events. | N | Kafka Transport Adaptor | |
| TransportLayer.KafkaTA.DllName | Contains the path and name of the Kafka adapter DLL. If this parameter is not specified, TransportLayer.KafkaTA.AdapterId is applied to define the adapter's library by ID. | Y | | bin/KafkaTA-vc10-MD-x64.dll |
| TransportLayer.KafkaTA.AdapterId | Defines the adapter's library by ID. If this parameter is not specified, or if TransportLayer.KafkaTA.DllName is specified as well, TransportLayer.KafkaTA.DllName is applied. | N | KAFKA | |
| TransportLayer.KafkaTA.Sessions | A comma-separated list of session names. At least one session should be defined. The values from this parameter are used in the BL description. | Y | | |
| TransportLayer.<KafkaNameTA>.Cleanup | Provides the capability to clean the Kafka TA's persistent message storage. Acceptable values: true, false. | N | true | |
| **Kafka-specific parameters** | | | | |
| TransportLayer.KafkaTA.reconnect.backoff.ms | The delay after which the Kafka TA starts attempting to reconnect if the connection breaks. | N | 100 | |
| TransportLayer.KafkaTA.reconnect.backoff.max.ms | The maximum delay between reconnection attempts if the connection breaks. | N | 10,000 | |
| **Session properties** | | | | |
| TransportLayer.KafkaTA.<Session>.FIXVersion | In every session, FIX messages use a particular version of the FIX protocol. Use this parameter to set the FIX protocol version for a given session. | N | FIX44 | |
| TransportLayer.KafkaTA.<Session>.ConnectTime | Scheduled time to connect to a Client. The value should be in the cron time string format. Local time zones are used. | N; required if the DisconnectTime parameter (see below) is configured | If a value is not specified, the session does not use a schedule. | |
| TransportLayer.KafkaTA.<Session>.DisconnectTime | Scheduled time to disconnect from a Client. The value should be in the cron time string format. Local time zones are used. | N; required if the ConnectTime parameter (see above) is configured | If a value is not specified, the session does not use a schedule. | |
| TransportLayer.KafkaTA.<Session>.DaysOff | The date and/or time when the ConnectTime and DisconnectTime properties configured for the Kafka TA session are ignored/not applied. The DaysOff property must be specified in the CRON format. Several CRON expressions separated by ";" can be defined as the value of the DaysOff property. | N | - | 0 0 9 * * 2-6 |
| TransportLayer.KafkaTA.<Session>.Serializer | Serializer name. Serializer tasks can be set from an external plugin, in the format TestSerializer:KafkaSerializer. ID values are set by the plugin developer. | N | Raw | |
| TransportLayer.KafkaTA.<Session>.bootstrap.servers | An initial list of brokers. | Y | | |
| **Secure connection properties** | | | | |
| TransportLayer.KafkaTA.<Session>.sasl.username | This parameter only applies, and is required, if PLAIN authentication is used. | N | | johndoe |
| TransportLayer.KafkaTA.<Session>.sasl.password | This parameter only applies, and is required, if PLAIN authentication is used. | N | | password1234 |
| TransportLayer.KafkaTA.<Session>.ssl.key.location | File or directory path to the SSL private key. This parameter only applies, and is required, if SSL certificate authentication is used. | N | | D:/SSL/kafka01.pem |
| TransportLayer.KafkaTA.<Session>.ssl.key.password | This parameter only applies, and is required, if SSL certificate authentication is used. | N | | |
| TransportLayer.KafkaTA.<Session>.ssl.ca.location | File or directory path to CA certificate(s) for verifying the broker's key. The parameter is required if the security protocol is SSL. | N | | D:/SSL/root.pem |
| TransportLayer.KafkaTA.<Session>.sasl.mechanism | Valid values: PLAIN, GSSAPI. | N | If a value is not specified, user authentication is not applied. | |
| TransportLayer.KafkaTA.<Session>.security.protocol | The protocol used to communicate with brokers. Valid values: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. | N | PLAINTEXT | |
| TransportLayer.KafkaTA.<Session>.sasl.kerberos.service.name | Only applies, and is required, if GSSAPI authentication is used. In this case, use the Kerberos principal name that Kafka runs as. | N | | |
| **Consumer properties** | | | | |
| TransportLayer.KafkaTA.<Session>.Consumer.Commit | Commit mode. Acceptable values: Auto, Sync, Async. For more information see "Sending commit messages to the Kafka platform". | N | Auto | |
| TransportLayer.KafkaTA.<Session>.Consumer.group.id | A unique string that identifies the consumer group that the given Consumer belongs to. This property is required if the Consumer uses either the group management functionality via subscribe (Topic) or the Kafka-based offset management strategy. When the group.id parameter is not configured, the session ID is used as its value. If the group.id value is identical across multiple sessions, and topic names are the same across those sessions, only one of the sessions will receive and process the data. This scenario is permissible when processing messages in parallel. | N | The default value is equal to the <Session> ID. | |
| TransportLayer.KafkaTA.<Session>.Consumer.Topics | If a value is not specified, the Consumer is not in use. To use the Consumer, specify a comma-separated list of topics that should be listened to by the Consumer. | N | | outputTopic |
| **Producer properties** | | | | |
| TransportLayer.KafkaTA.<Session>.Producer.KeyTag | If a value is not specified, a key tag is not used to fill a key value. To use this parameter, specify a tag number in a FIX message. The value of that tag is used as the key when the message is sent to Kafka. | N | | |
| TransportLayer.KafkaTA.<Session>.Producer.Topic | If a value is not specified, the Producer is not in use. To use the Producer, specify the topic that the Producer should send FIX messages to. | N | | |
| TransportLayer.KafkaTA.<Session>.Producer.RejectMessageWhileNoConnection | Provides the option to reject all further outgoing messages from FIXEdge when messages cannot be delivered to Kafka, alerting the Producer. Depending on the operating system, a disconnection event may be detected at different times. For example, on Windows, if the Kafka server is not started, the disconnection is detected immediately; on Linux, it is detected only when a message send is attempted. | N | false | |
| TransportLayer.KafkaTA.<Session>.Producer.DisconnectionPeriodThreshold | The timeout period after which unsuccessful attempts at sending will be rejected if TransportLayer.KafkaTA.<Session>.Producer.RejectMessageWhileNoConnection = true. | N | 0 | |
The samples below represent the Kafka TA's configuration with the minimal set of parameters required to run the adapter:
Kafka TA configuration file:
#------------------------------------------------------------
# Transport Layer Section
#------------------------------------------------------------

# A comma-separated list of identifiers of Transport Adapters that should be loaded.
TransportLayer.TransportAdapters = TransportLayer.KafkaTA

#------------------------------------------------------------
# The Kafka Transport Adaptor (KafkaTA) configuration file.
#------------------------------------------------------------

# Adaptor's name. The property is required.
TransportLayer.KafkaTA.Description = Kafka Transport Adaptor

# Contains the path and name of the Kafka adaptor dll. The property is required.
TransportLayer.KafkaTA.DllName = bin/KafkaTA-vc10-MD-x64.dll

# List of adaptor sessions
TransportLayer.KafkaTA.Sessions = Kafka

# The following are parameters for each session
TransportLayer.KafkaTA.Kafka.bootstrap.servers = localhost:9092
TransportLayer.KafkaTA.Kafka.FIXVersion = FIX44
TransportLayer.KafkaTA.Kafka.Consumer.Commit = Auto
TransportLayer.KafkaTA.Kafka.Consumer.Topics = outputTopic
TransportLayer.KafkaTA.Kafka.Consumer.group.id = ID
TransportLayer.KafkaTA.Kafka.Producer.Topic = topic
Kafka Business Layer rules:
<FIXEdge>
  <BusinessLayer>
    <Rule>
      <Source>
        <FixSession SenderCompID="SC" TargetCompID="FE"/>
      </Source>
      <Action>
        <Send><Client Name="Kafka"/></Send>
      </Action>
    </Rule>
    <Rule>
      <Source>
        <Client Name="Kafka"/>
      </Source>
      <Action>
        <Send>
          <FixSession SenderCompID="FE" TargetCompID="SC"/>
        </Send>
      </Action>
    </Rule>
    <DefaultRule>
      <Action>
        <DoNothing/>
      </Action>
    </DefaultRule>
  </BusinessLayer>
</FIXEdge>
To configure SSL authentication, follow these steps:
Set client authentication as "required" in the server.properties file
listeners=PLAINTEXT://:9092,SSL://:9093
ssl.keystore.location=D:/SSL/kafka01.keystore.jks
ssl.keystore.password=123456
ssl.key.password=123456
ssl.truststore.location=D:/SSL/kafka.truststore.jks
ssl.truststore.password=123456
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.client.auth = required
Provide ssl.key details in the FIXEdge.properties file
TransportLayer.KafkaTA.Kafka.bootstrap.servers = localhost:9093
TransportLayer.KafkaTA.Kafka.security.protocol = SSL
TransportLayer.KafkaTA.Kafka.ssl.ca.location = D:/SSL/root.pem
TransportLayer.KafkaTA.Kafka.ssl.key.location = D:/SSL/kafka01.pem
TransportLayer.KafkaTA.Kafka.ssl.key.password = 123456
To configure SASL_PLAIN authentication:
Use the following files and corresponding settings to configure the Kafka broker
listeners=PLAINTEXT://:9092,SASL_PLAINTEXT://:9093
advertised.listeners=PLAINTEXT://:9092,SASL_PLAINTEXT://:9093
sasl.enabled.mechanisms=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true
sasl.jaas.config= \
    org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="admin" \
    password="admin-secret" \
    user_admin="admin-secret";
Use the following file and corresponding settings to configure the Kafka TA:
TransportLayer.KafkaTA.Kafka.bootstrap.servers = localhost:9092
TransportLayer.KafkaTA.Kafka.security.protocol = SASL_PLAINTEXT
TransportLayer.KafkaTA.Kafka.sasl.mechanism = PLAIN
TransportLayer.KafkaTA.Kafka.sasl.username = admin
TransportLayer.KafkaTA.Kafka.sasl.password = admin-secret
This is a combination of an SSL connection with client authentication and SASL_PLAIN authentication.
To configure SASL_SSL authentication:
Use the following files and corresponding settings to configure the Kafka broker
listeners=PLAINTEXT://:9092,SASL_SSL://:9093
advertised.listeners=PLAINTEXT://:9092,SASL_SSL://:9093
ssl.keystore.location=D:/SSL/kafka01.keystore.jks
ssl.keystore.password=123456
ssl.key.password=123456
ssl.truststore.location=D:/SSL/kafka.truststore.jks
ssl.truststore.password=123456
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.client.auth = required
sasl.enabled.mechanisms=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true
sasl.jaas.config= \
    org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="admin" \
    password="admin-secret" \
    user_admin="admin-secret";
Use the following file and corresponding settings to configure the Kafka TA:
TransportLayer.KafkaTA.Kafka.bootstrap.servers = localhost:9093
TransportLayer.KafkaTA.Kafka.security.protocol = SASL_SSL
TransportLayer.KafkaTA.Kafka.ssl.ca.location = D:/SSL/root.pem
TransportLayer.KafkaTA.Kafka.ssl.key.location = D:/SSL/kafka01.pem
TransportLayer.KafkaTA.Kafka.ssl.key.password = 123456
TransportLayer.KafkaTA.Kafka.sasl.mechanism = PLAIN
TransportLayer.KafkaTA.Kafka.sasl.username = admin
TransportLayer.KafkaTA.Kafka.sasl.password = admin-secret
To configure SASL_GSSAPI authentication:
Use the following files and corresponding settings to configure the Kafka broker
listeners=PLAINTEXT://:9092,SASL_PLAINTEXT://:9093
advertised.listeners=PLAINTEXT://:9092,SASL_PLAINTEXT://:9093
sasl.enabled.mechanisms=GSSAPI
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.kerberos.service.name=kafka
listener.name.sasl_plaintext.gssapi.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
    useKeyTab=true \
    storeKey=true \
    keyTab="D:/SSL/kafka_server.keytab" \
    principal="kafka/kafka_server@example.com";
Use the following file and corresponding settings to configure the Kafka TA
TransportLayer.KafkaTA.Kafka.bootstrap.servers = localhost:9093
TransportLayer.KafkaTA.Kafka.security.protocol = SASL_PLAINTEXT
TransportLayer.KafkaTA.Kafka.sasl.mechanism = GSSAPI
TransportLayer.KafkaTA.Kafka.sasl.kerberos.service.name = kafka
The Kafka TA provides exhaustive logging: any action involving a configuration parameter is logged.
List of logged actions:
The following levels of logging are used for Kafka:
Any configuration event is logged in the "Kafka_TA" log category of the log file via INFO messages.
The table below specifies the most common logging messages for Kafka.
| Log type | Event description | Logging message format |
|---|---|---|
| ERROR | Client error | "Session '<session_name>': [Consumer/Producer]: <error_message>" |
| | Outgoing message failed to be queued | "Session '<session_name>': Producer: Error handling the outgoing message: <native error>. Message: <message>" |
| | Incoming message failed to be handled | "Session '<session_name>': Consumer: Error handling the incoming message: <native error>. Message (optional): <message>" |
| | Start operation failed to be applied | "Session '<session_name>': [Consumer/Producer]: The session <session_name> can't be started: <description> Reason: Operation cannot be applied" |
| | Stop operation failed to be applied | "Session '<session_name>': [Consumer/Producer]: The session <session_name> can't be stopped: <description> Reason: Operation cannot be applied" |
| INFO | Client creation / any config parameter change | |
| WARN | Kafka TA fails to apply any config parameter | "Session '<session_name>': <warning_message>" |
| | The outgoing message is rejected because the client is not connected to Kafka | "Session '<session_name>': Producer: The outgoing message is rejected: <message>" (visible in the "Kafka_TA" log category) |
| DEBUG | Creation of a Consumer or Producer | "Session '<session_name>': Producer: Error handling the outgoing message: <native error>. Message: <message>" |
| | Adding an outgoing message to the queue | "Session '<session_name>': Producer: The message is put to the outgoing queue: <message>" |
| | Removing an outgoing message from the queue | "Session '<session_name>': Producer: Sending the message via the producer: <message>" |
| | An incoming message is being received | "Session '<session_name>': Consumer: Receiving the message: <message>" |
Kafka TA logs different events with different categories.
The following example shows how to save Kafka-related logs for multiple sessions to a separate file FIXEdge1/log/Kafka.log.
In the case of a configuration with 3 sessions:
TransportLayer.KafkaTA.Sessions = KafkaConsumer, KafkaNewOrderProducer, KafkaCancelOrderProducer
There would be 3 new log categories: KafkaConsumer, KafkaNewOrderProducer, KafkaCancelOrderProducer.
Therefore, the logging configuration is:
Log.KafkaConsumer.Device = File
Log.KafkaConsumer.DebugIsOn = true
Log.KafkaConsumer.NoteIsOn = true
Log.KafkaConsumer.File.Name = ../FIXEdge1/log/Kafka.log

Log.KafkaNewOrderProducer.Device = File
Log.KafkaNewOrderProducer.DebugIsOn = true
Log.KafkaNewOrderProducer.NoteIsOn = true
Log.KafkaNewOrderProducer.File.Name = ../FIXEdge1/log/Kafka.log

Log.KafkaCancelOrderProducer.Device = File
Log.KafkaCancelOrderProducer.DebugIsOn = true
Log.KafkaCancelOrderProducer.NoteIsOn = true
Log.KafkaCancelOrderProducer.File.Name = ../FIXEdge1/log/Kafka.log
And the configuration for routing life cycle events to the same file as the Kafka sessions:
Log.Kafka_TA.Device = File
Log.Kafka_TA.DebugIsOn = true
Log.Kafka_TA.NoteIsOn = true
Log.Kafka_TA.File.Name = ../FIXEdge1/log/Kafka.log
For example, KafkaConsumer session log:
2020-12-07 11:12:35,093 UTC DEBUG [KafkaConsumer] 20132 The message content: headers: , key: , partition Id: 0, value: 8=FIX.4.4|9=146|35=D|49=FIXEDGE|56=Sender|34=2|52=20201207-11:12:35.065|11=BTC/USD_LimitB_GTC|55=BTC/USD|54=1|60=20160401-15:15:58.080|38=0.15|40=2|44=630.1|59=1|10=226|
FIXEdge replaces SOH delimiters in FIX messages with pipes ('|') for better readability in these logs.
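The substitution itself is trivial; the sketch below is illustrative Python (not FIXEdge code) showing the SOH-to-pipe replacement applied for log readability:

```python
# FIX uses the SOH character (0x01) as the field delimiter; replace it
# with '|' to make a raw message printable in logs.
SOH = "\x01"

def printable_fix(raw):
    return raw.replace(SOH, "|")

raw = "8=FIX.4.4" + SOH + "35=D" + SOH + "10=226" + SOH
# printable_fix(raw) == "8=FIX.4.4|35=D|10=226|"
```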
2021-06-09 17:44:49,522 UTC INFO [Kafka_TA] 34188 Kafka Adaptor v.0.1 started.
Error messages in the Kafka TA are logged under DEBUG severity.
Additional information about logging configuration can be found here:
The Kafka Transport Adapter provides detailed debug-level logging for Kafka Producers, including the sequence number (SeqNum) of FIX messages. This functionality aids in debugging and message traceability, allowing users to track individual FIX messages as they are processed.
Specifically, the adapter logs the following messages at the DEBUG level:
Successful Message Delivery: This message indicates successful delivery of a FIX message to the Kafka platform, with the SeqNum included for verification.
"Session '<session_name>': Producer: Message (SeqNum: {SeqNum}) delivered successfully"
Message Delivery Error: This message is logged when a delivery error occurs, providing the SeqNum of the affected message and a description of the error for troubleshooting.
Session '<session_name>': Producer: Message (SeqNum: {SeqNum}) delivery error: <error_description> |
These log messages offer valuable insights into message flow and delivery status, particularly for debugging complex scenarios or investigating message loss. To utilize these logs, ensure DEBUG level logging is enabled in your FIXEdge.properties file.
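The delivery log lines above embed the message's SeqNum (FIX tag 34). The sketch below, illustrative Python with hypothetical helper names, shows how such a line can be produced from a raw (pipe-delimited) FIX message:

```python
# Extract a FIX tag value and format a delivery log line like the one above
# (illustrative; not the adapter's source).
def fix_tag(raw, tag, sep="|"):
    """Return the value of `tag` in a delimited FIX message, or None."""
    for field in raw.split(sep):
        if field.startswith(f"{tag}="):
            return field.split("=", 1)[1]
    return None

def delivery_log(session, raw):
    seqnum = fix_tag(raw, 34)
    return (f"Session '{session}': Producer: "
            f"Message (SeqNum: {seqnum}) delivered successfully")

msg = "8=FIX.4.4|35=D|34=2|49=FIXEDGE|10=226|"
# delivery_log("Kafka", msg)
# == "Session 'Kafka': Producer: Message (SeqNum: 2) delivered successfully"
```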
(Available starting from version 6.11.0 of FIXEdge C++)
The schedule settings regulate the work of the Kafka TA Producer/Consumer. The schedule can be configured as a cron expression or tied directly to the FIXEdge server lifecycle.

If a cron expression is used for scheduling, the Producer/Consumer starts using the Client's connection to the Kafka platform at the scheduled connection time and stops using it at the scheduled disconnection time.

If the schedule settings are tied directly to FIXEdge, the Producer/Consumer starts using the Client's connection to the Kafka platform when the FIXEdge server starts, and stops using it when the FIXEdge server stops.
Scheduling is represented by the following properties:
Refer to the table in the Configuration parameters section above.
Available since the FIXEdge 6.13.0 release.
FIXEdge C++ provides a Start operation for Disconnected Kafka TA sessions and a Stop operation for Connecting and Running Kafka TA sessions, on request from a subscribed monitoring application.
The Start operation can be applied to Kafka TA sessions in the Disconnected state.

FIXEdge starts a particular Kafka TA session on request from the subscribed monitoring application and, if the operation has been applied, replies with a successful response.

If there is no session with the requested name, FIXEdge logs an error and replies to the subscribed monitoring application with the error response below:

Can't find session with id <session_name>
The Stop operation can be applied to Kafka TA sessions in the Running and Connecting states.

FIXEdge stops a particular Kafka TA session on request from the subscribed monitoring application and, if the operation has been applied, replies with a successful response.

If there is no session with the requested name, FIXEdge logs an error and replies to the subscribed monitoring application with the error response below:

Can't find session with id <session_name>
The FIXEdge behavior in the case when Kafka TA Producer session is in the Disconnected state is determined by the RejectMessageWhileNoConnection parameter.
When FIX messages are transferred in raw format (i.e. serialization is not applied), the Kafka TA transfers each message as a plain string containing the tag values.
A serialized format of communication is fully supported by the Kafka TA. Custom serialization implies specially designed external code and includes the serialization of all Kafka message parts: headers, key, partition Id, and value.
FIXEdge is supplied with a serialization plugin b2b_fixserver_kafka_test_serializer.dll (or libb2b_fixserver_kafka_test_serializer.so on Linux) that can be found in the <FIXEdge installation>/plugins directory.
Serialization on sending
When custom serialization is applied and FIXEdge sends the message to the particular Kafka topic, this message is serialized via the custom serializer and is sent to a particular Kafka topic via the Producer API. The following DEBUG message is logged in the client log category of the log file:
"The message content:
headers: <headers>,
key: <key>,
partition Id: <partition Id>,
value: <value>"
When custom serialization is applied and the new message arrives at the particular Kafka topic, this message is received by FIXEdge via the Consumer API. The following DEBUG message is logged in the client log category of the log file:
"The message content:
headers: <headers>,
key: <key>,
partition Id: <partition Id>,
value: <value>"
The message is deserialized via the custom serializer.
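The responsibilities of a custom serializer, producing headers, key, partition Id, and value from a FIX message and recovering the message on the way back, can be sketched as follows. The real plugin API is native code; this Python sketch with hypothetical names only illustrates the shape:

```python
# Hypothetical shape of a custom serializer plugin (illustrative only).
class CustomSerializer:
    def serialize(self, fix_message):
        """Build all Kafka message parts from a pipe-delimited FIX message."""
        return {
            "headers": {"source": "FIXEdge"},       # example header
            "key": self._key_of(fix_message),
            "partition_id": 0,                      # custom partitioning could go here
            "value": fix_message.encode("utf-8"),
        }

    def deserialize(self, record):
        """Recover the FIX message from a Kafka record's value."""
        return record["value"].decode("utf-8")

    def _key_of(self, fix_message):
        # Example choice: derive the key from ClOrdID (tag 11).
        for field in fix_message.split("|"):
            if field.startswith("11="):
                return field[3:]
        return None

s = CustomSerializer()
rec = s.serialize("8=FIX.4.4|35=D|11=BTC/USD_LimitB_GTC|10=226|")
# rec["key"] == "BTC/USD_LimitB_GTC"
# s.deserialize(rec) round-trips the original message
```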
In the case of default serialization, the Kafka TA produces a key using the value of the tag configured in the "KeyTag" parameter. The specified key is passed to the Kafka platform along with the message sent to the particular Kafka topic.
In the case of custom serialization, the custom plugin produces the key that is passed to the Kafka platform along with the message sent to the particular Kafka topic.
If custom partitioning is configured, the custom plugin produces the partition ID based on the tag values of a FIX message.
When FIXEdge sends a message to a particular Kafka topic and the BL Client is configured to work with XMLData (213), two flows are possible:
When Kafka sends an XML message to FIXEdge and the BL Client is configured to work with XMLData (213), the FIX message with 35=n (XML message, JSON plain text, etc.) is received by FIXEdge, the XMLData (213) field is filled in with the received XML message, and the XMLDataLen (212) field is filled in with the received message length.
The Kafka Transport Adapter is integrated with the FIXICC monitoring feature.
Information about the configured TA session's parameter list and dynamic state is sent to the subscribed monitoring application when FIXEdge starts. When the TA session's dynamic state changes, FIXEdge sends an update to the subscribed monitoring application. |
The adapter provides information about the following indicators:
Name shown | Description |
---|---|
Messages received during the last session | Number of incoming messages from Kafka handled per client, beginning from the client’s current working session |
Messages received total | Number of incoming messages from Kafka handled per client, beginning when FIXEdge started |
Messages sent during the last session | Number of outgoing messages to Kafka handled per client, beginning from the client’s current working session |
Messages sent total | Number of outgoing messages to Kafka handled per client, beginning when FIXEdge started |
Messages with errors during the last session | Number of errors related to the Kafka TA, beginning from the client’s current working session |
Messages with errors total | Number of errors related to the Kafka TA, beginning when FIXEdge started |
Producer’s queue depth | The size of the queue per producer |
Status | Status of the consumer and producer connections with Kafka |
Time of the last successful message sending | Time of the last successful message sending in UTC time zone |
Time of the last successful message receiving | Time of the last successful message receiving in UTC time zone |
FIXEdge Kafka TA monitors both consumer and producer connections. When the consumer disconnects, Kafka can react to this by sending messages to other consumers in the same group as the disconnected consumer. |
The lifetime of a session is defined by the TransportLayer.KafkaTA.<Session>.ConnectTime and TransportLayer.KafkaTA.<Session>.DisconnectTime parameters, as well as by the start/stop operations provided by the monitoring application for a particular Kafka TA session. |
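For instance, a session could be scheduled to connect and disconnect daily with a fragment like the following. This is a hypothetical sketch: the session name 'Kafka' and the time format are assumptions; check the supported format for your FIXEdge version.

```properties
# Assumed scheduling sketch; times interpreted by FIXEdge's scheduler.
TransportLayer.KafkaTA.Kafka.ConnectTime = 08:00:00
TransportLayer.KafkaTA.Kafka.DisconnectTime = 17:00:00
```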
If the adapter fails to start up, there is most likely an issue with a configuration parameter.
To resolve this issue, check the log for error messages and correct the configuration parameter specified in the error message.
In this example, the path specified to the adapter dll is the incorrect one:
2020-11-25 15:51:39,019 UTC INFO [FixLayer_Version] 20936 Module 'FixLayer' version 0.2.1.5 was loaded. 2020-11-25 15:51:39,019 UTC INFO [Engine] 20936 Custom AdminApplication is registered. 2020-11-25 15:51:39,024 UTC ERROR [TransportLayer] 20936 Transport Adaptor 'KafkaTA' has failed to initialize: Error loading DLL 'd:\FIXEdge\bin\KafkaTA-vc10-MD-x64.dll'. . The specified module could not be found. (Error code = 126) 2020-11-25 15:51:39,024 UTC INFO [TransportLayer] 20936 Module 'TransportLayer' version 0.1.1.5 was loaded. |
If one session fails to load during start-up while others are created successfully, there is an issue with a configuration parameter.
To resolve this issue, check the log for error messages and correct the configuration parameter specified in the error message.
In this example, two sessions ('Kafka' and 'Kafka2') are configured, and one was not created successfully.
In the first session, the wrong parameter, 'protocol', was used instead of the correct parameter, 'security.protocol', and the session was not created.
In the log file, an ERROR message saying, "Failed to set protocol..." appears instead of an INFO message saying, "Session 'Kafka': was created successfully".
2020-11-25 16:03:00,882 UTC INFO [Kafka_TA] 7504 process logon for session 'Kafka' Consumer.Topics = outputTopic 2020-11-25 16:03:00,894 UTC ERROR [Kafka_TA] 7504 Failed to set protocol: No such configuration property: "protocol" Consumer.Topics = topic 2020-11-25 16:03:00,922 UTC INFO [CC_Layer] 7504 Client Kafka2 has logged in 2020-11-25 16:03:00,936 UTC INFO [Kafka_TA] 7504 Session 'Kafka2': was created successfully |
If sessions have been created successfully but the adapter isn't sending or receiving messages to/from the Kafka server, the issue has most likely occurred due to a problem with the connection.
If the adapter can't connect to the Kafka server, it will keep making connection attempts at the configured intervals. Until the cause of the error is resolved, the TA will not be able to send or receive messages. If the default level of logging doesn't reveal the reason, you need to enable a deeper level of logging.
To establish what the error is, you must enable the DEBUG logging as follows:
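DEBUG logging is controlled via FIXEdge.properties, for example:

```properties
# Enable DEBUG-level logging in FIXEdge.properties
Log.DebugIsOn = true
```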
Once this is done, the log will show error messages stating previous connection attempts and the reason for the error. To resolve this issue, correct the configuration issue specified in the error message.
In this example, the wrong server port, 9093 (for SSL connection), is configured instead of the correct one, 9092 (for PLAINTEXT connection).
With DEBUG logging enabled, we can see that the adapter is permanently trying to connect to the server, as well as an error message specifying the configuration issue.
2020-11-25 16:22:54,533 UTC DEBUG [Kafka_TA] 5808 Session 'Kafka': Producer: librdkafka ERROR: [thrd:app]: rdkafka#producer-2: localhost:9093/bootstrap: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 197ms in state APIVERSION_QUERY) 2020-11-25 16:22:54,534 UTC DEBUG [Kafka_TA] 10912 Session 'Kafka': Consumer: librdkafka ERROR: [thrd:localhost:9093/bootstrap]: 1/1 brokers are down 2020-11-25 16:22:54,534 UTC DEBUG [Kafka_TA] 22440 Session 'Kafka': Consumer: librdkafka ERROR: [thrd:app]: rdkafka#consumer-1: localhost:9093/bootstrap: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 1086ms in state APIVERSION_QUERY) 2020-11-25 16:22:54,761 UTC DEBUG [Kafka_TA] 10912 Session 'Kafka': Consumer: librdkafka ERROR: [thrd:localhost:9093/bootstrap]: 1/1 brokers are down 2020-11-25 16:22:55,033 UTC DEBUG [Kafka_TA] 24160 Session 'Kafka': Producer: librdkafka ERROR: [thrd:localhost:9093/bootstrap]: 1/1 brokers are down 2020-11-25 16:23:07,261 UTC DEBUG [Kafka_TA] 22440 Session 'Kafka': Consumer: librdkafka ERROR: [thrd:app]: rdkafka#consumer-1: localhost:9093/bootstrap: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) 
or broker version is < 0.10 (see api.version.request) (after 1ms in state APIVERSION_QUERY, 4 identical error(s) suppressed) 2020-11-25 16:23:07,261 UTC DEBUG [Kafka_TA] 10912 Session 'Kafka': Consumer: librdkafka ERROR: [thrd:localhost:9093/bootstrap]: 1/1 brokers are down 2020-11-25 16:23:15,441 UTC DEBUG [Kafka_TA] 24160 Session 'Kafka': Producer: librdkafka ERROR: [thrd:localhost:9093/bootstrap]: 1/1 brokers are down 2020-11-25 16:23:15,442 UTC DEBUG [Kafka_TA] 5808 Session 'Kafka': Producer: librdkafka ERROR: [thrd:app]: rdkafka#producer-2: localhost:9093/bootstrap: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 1ms in state APIVERSION_QUERY, 4 identical error(s) suppressed) |
The Kafka TA consumer receives a message, but the message isn't delivered anywhere (for example, to a FIX session), and FixEdge.log states:
"No BL rules found for message with ClientID '<client id>', executing DefaultRule."
Message processing failed
"2020-11-25 16:35:36,545 UTC DEBUG [Kafka_TA] 21964 Session 'Kafka2': Consumer: Receiving the message: 8=FIX.4.49=15235=D49=SC56=FE34=252=20201125-16:35:35.936212=4213=test11=BTC/USD_LimitB_GTC55=BTC/USD54=160=20160401-15:15:58.08038=0.1540=244=630.159=110=078 2020-11-25 16:35:36,545 UTC DEBUG [BL_RoutingTable] 21964 No BL rules found for message with ClientID 'Kafka2', executing DefaultRule. 2020-11-25 16:35:36,546 UTC DEBUG [CC_Layer] 21964 BL has processed a message. Number of client IDs for delivery :0. Number or FIX sessions for delivery :0.. Number or sources identifiers for delivery :0. 2020-11-25 16:35:36,546 UTC DEBUG [Kafka_TA] 21964 Session 'Kafka2': Consumer: Message processing failed |
For this information to be recorded in the logs, enable DEBUG logging by setting Log.DebugIsOn = true in FIXEdge.properties.
Root cause
This happens if the business logic is not configured for handling the message from the adapter or if message processing doesn't fit rule conditions.
Solution
Check whether there is business logic that processes messages from the Kafka TA Consumer.
For example:
<Rule Description="Route order events from Kafka Consumer Adapter to active FIX-session">
    <Source>
        <Client Name="KafkaConsumer"/>
    </Source>
    <Action>
        <Send>
            <FixSession/>
        </Send>
    </Action>
</Rule> |
The kafka-server-start script fails with the following error:
ERROR Shutdown broker because all log dirs in d:\tmp\kafka-logs have failed |
Root cause
One of the possible reasons is that the Kafka logs are corrupted due to an unexpected shutdown.
Solution
Back up the Kafka and ZooKeeper logs according to the path in the error message (in this example, d:\tmp\) for future investigation if needed.
Clean the specified log directories and restart the service.
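The recovery steps can be sketched as the shell commands below. The paths here are throwaway demo paths; in a real recovery, substitute the directories named in the broker error message (d:\tmp\kafka-logs in this example) and stop the broker first.

```shell
# Sketch of the recovery steps, demonstrated on throwaway paths.
# Replace KAFKA_LOGS with the directory from the broker error message.
KAFKA_LOGS="$(mktemp -d)/kafka-logs"
mkdir -p "$KAFKA_LOGS"
echo "corrupt segment" > "$KAFKA_LOGS/00000000.log"

BACKUP="${KAFKA_LOGS}-backup"
cp -r "$KAFKA_LOGS" "$BACKUP"    # 1. back up the logs for later investigation
rm -rf "${KAFKA_LOGS:?}"/*       # 2. clean the failed log directory
# 3. restart the Kafka (and ZooKeeper) services
```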