Info |
---|
The Kafka transport adapter is available starting from version 6.10.0 of FIXEdge C++ |
...
The Kafka TA transfers FIX messages in raw or serialized formats from the FIXEdge Server to the Kafka platform and vice versa.
...
The Kafka TA automatically reconnects to Kafka if the connection to a message broker was unexpectedly terminated. Reconnection behavior can be configured by parameters TransportLayer.KafkaTA.reconnect.backoff.ms and TransportLayer.KafkaTA.reconnect.backoff.max.ms.
Multiple instances of the Kafka TA can be configured using one library instance, inside one instance of FIXEdge.
...
Committing to the Kafka platform
The TransportLayer.KafkaTA.<Session>.Consumer.Commit parameter provides three strategies that dictate how commit messages (acknowledgements of messages received) are sent to the Kafka platform.
...
Enable a transport adapter to be used by FIXEdge:
In the ‘Transport Layer Section’ of the FIXEdge.properties, add the Kafka TA to the list of supported adapters:Code Block language text TransportLayer.TransportAdapters = TransportLayer.KafkaTA
Note: If you use other transport adapters, just add TransportLayer.KafkaTA to the end of the list:
Code Block language text TransportLayer.TransportAdapters = ..., TransportLayer.KafkaTA
Configure the Kafka TA by adding the Kafka TA section to the FIXEdge.properties file:
Code Block language text TransportLayer.KafkaTA.Description = Kafka Transport Adaptor TransportLayer.KafkaTA.DllName = bin/KafkaTA-vc10-MD-x64.dll TransportLayer.KafkaTA.Sessions = Kafka TransportLayer.KafkaTA.Kafka.bootstrap.servers = localhost:9092 TransportLayer.KafkaTA.Kafka.FIXVersion = FIX44 TransportLayer.KafkaTA.Kafka.Consumer.Commit = Auto TransportLayer.KafkaTA.Kafka.Consumer.Topics = outputTopic TransportLayer.KafkaTA.Kafka.Consumer.group.id = ID TransportLayer.KafkaTA.Kafka.Producer.Topic = topic
Note: Sample settings can be copied to the FIXEdge.properties file from the KafkaTA.properties file (located in the doc folder of the FIXEdge distribution package).
- Configure rules for message routing from the Kafka TA.
The Kafka TA Client is referred to the Business Layer (BL) by the ClientID name specified in the FIXEdge.properties file. For a Kafka Business Layer configuration sample, refer to the Configuration sample sub-section. - Restart the FIXEdge server to apply the changes.
...
Property name | Description | Required | Default value | Example | ||||||
---|---|---|---|---|---|---|---|---|---|---|
Common properties | ||||||||||
TransportLayer.KafkaTA.Description | Adapter name. Note: It is recommended that this parameter is not empty. This parameter is used in the logging of Kafka events. | N | Kafka Transport Adaptor | |||||||
TransportLayer.KafkaTA.DllName | Contains path and name of the Kafka adapter DLL. In case this parameter is not specified, the TransportLayer.KafkaTA.AdapterId is applied to define the adapter's library by ID | Y | bin/KafkaTA-vc10-MD-x64.dll | |||||||
TransportLayer.KafkaTA.AdapterId | The parameter to define the adapter's library by ID. In case this parameter is not specified, or TransportLayer.KafkaTA.DllName parameter is specified too, the TransportLayer.KafkaTA.DllName is applied
| N | KAFKA | |||||||
TransportLayer.KafkaTA.Sessions | A comma-separated list of session names. At least one session should be defined. The values from this parameter will be used in the BL description. | Y | ||||||||
TransportLayer.<KafkaNameTA>.Cleanup | Provides the capability to clean the Kafka TA's persistent message storage. Acceptable values:
| N | true | |||||||
Kafka specific parapameters | ||||||||||
TransportLayer.KafkaTA.reconnect.backoff.ms | Delay after which the Kafka TA starts attempting to re-connect if the connection breaks. | N | 100 | |||||||
TransportLayer.KafkaTA.reconnect.backoff.max.ms | Max delay time after which the Kafka TA stops attempts to re-connect if the connection breaks. | N | 10,000 | |||||||
Session properties | ||||||||||
TransportLayer.KafkaTA.<Session>.FIXVersion | In every session, FIX messages use a particular version of the FIX protocol. Use this parameter to set the version of the FIX protocol for a given session.
Example use cases:
| N | FIX44 | |||||||
TransportLayer.KafkaTA.<Session>.ConnectTime | Scheduled time to connect to a Client The value should be in the cron time string format Local time zones will be used | N; Required if DisconnectTime parameter (see below) is configured | If a value is not specified the session is not using a schedule. | |||||||
TransportLayer.KafkaTA.<Session>.DisconnectTime | Scheduled time to disconnect from a Client The value should be in the cron time string format Local time zones will be used | N; Required if ConnectTime parameter (see above) is configured | If a value is not specified the session is not using a schedule. | |||||||
TransportLayer.KafkaTA.<Session>.DaysOff |
The date and/or time when the ConnectTime and DisconnectTime properties configured for the Kafka TA session are ignored/not applied. The DaysOff property must be specified in the CRON format. Several CRON expressions separated by the ";" can be defined as value for the DaysOff property.
| N | - | 0 0 9 * * 2-6 | ||||||
TransportLayer.KafkaTA.<Session>.Serializer | Serializer name. Acceptable values:
Serializer tasks can be set from an external plugin. Format: TestSerializer:KafkaSerializer, where:
ID values are set by the Plugin developer. | N | Raw | |||||||
TransportLayer.KafkaTA.<Session>.bootstrap.servers | An initial list of brokers. | Y | ||||||||
Secure connection properties | ||||||||||
TransportLayer.KafkaTA.Kafka.sasl.username | This parameter only applies and is required if using PLAIN authentication. | N | johndoe | |||||||
TransportLayer.KafkaTA.Kafka.sasl.password | This parameter only applies and is required if using PLAIN authentication. | N | password1234 | |||||||
TransportLayer.KafkaTA.<Session>.ssl.key.location | File or directory path to SSL private key. This parameter only applies and is required if using SSL certificate authentication. | N | D:/SSL/kafka01.pem | |||||||
TransportLayer.KafkaTA.<Session>.ssl.key.password | This parameter only applies and is required if using SSL certificate authentication. | N | ||||||||
TransportLayer.KafkaTA.<Session>.ssl.ca.location | File or directory path to CA certificate(s) for verifying the broker's key. The parameter is required if the security protocol is SSL. | N | D:/SSL/root.pem | |||||||
TransportLayer.KafkaTA.<Session>.sasl.mechanism | Valid values:
| N | If a value is not specified user authentication is not applied. | |||||||
TransportLayer.KafkaTA.<Session>.security.protocol | The protocol used to communicate with brokers. Valid values:
| N | PLAINTEXT | |||||||
TransportLayer.KafkaTA.Kafka.sasl.kerberos.service.name | Only applies and is required if using GSS_API authentication. In this case, use the Kerberos principal name that Kafka runs as. | N | ||||||||
Consumer properties | ||||||||||
TransportLayer.KafkaTA.<Session>.Consumer.Commit | Commit mode. Acceptable values:
For more information see Sending commit messages to the Kafka platform. | N | Auto | |||||||
TransportLayer.KafkaTA.<Session>.Consumer.group.id | A unique string that identifies the consumer group that the given Consumer belongs to. This property is required if the Consumer uses either the group management functionality by using subscribe (Topic) or the Kafka-based offset management strategy. When the "group.id" parameter is not configured and its value is yet to be assigned, the session ID is used as the value. If the group.id value is identical across multiple sessions, and topic names are the same across multiple sessions, only one of the sessions is going to receive and process the data. This scenario is permissible when processing messages in parallel. | N | The default value is equal to the <Session> ID. | |||||||
TransportLayer.KafkaTA.<Session>.Consumer.Topics | If a value is not specified, the Consumer is not in use. To use the Consumer, a value must be specified. In this case, use a comma separated list of topics that should be listened by the Consumer. | N | outputTopic | |||||||
Producer properties | ||||||||||
TransportLayer.KafkaTA.<Session>.Producer.KeyTag | If a value is not specified, then a key tag will not be used to fill a key value. To use this parameter, specify a tag number in a FIX message. The value of the tag will be used as a key when the message is sent to Kafka. | N | ||||||||
TransportLayer.KafkaTA.<Session>.Producer.Topic | If a value is not specified, the Producer is not in use. To use the producer, specify the topic that the Producer should send FIX messages to. | N | ||||||||
TransportLayer.KafkaTA.<Session>.Producer.RejectMessageWhileNoConnection | Provides the option to reject all further outgoing messages from FIXEdge from being sent by alerting the Producer when messages cannot be delivered to Kafka. Depending on the operating system, a disconnection event may be detected at different times. For example, on Windows, if the Kafka server is not started the disconnection will be detected immediately. However, on Linux, the disconnection will be detected only once a message attempts to send.
| N | false | |||||||
TransportLayer.KafkaTA.<Session>.Producer.DisconnectionPeriodThreshold | The timeout period after which unsuccessful attempts at sending will be rejected if TransportLayer.KafkaTA.<Session>.Producer.RejectMessageWhileNoConnection=true.
| N | 0 |
Configuration sample
The samples below represent the Kafka TA's configuration with the minimal set of parameters required to run the adaptor:
...
- Make sure the Kafka broker and adaptor are configured for SSL connection
Set client authentication as "required" in the server.properties file
Code Block title Example collapse true listeners=PLAINTEXT://:9092,SSL://:9093 ssl.keystore.location=D:/SSL/kafka01.keystore.jks ssl.keystore.password=123456 ssl.key.password=123456 ssl.truststore.location=D:/SSL/kafka.truststore.jks ssl.truststore.password=123456 ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1 ssl.client.auth = required
Provide ssl.key details in the FIXEdge.properties file
Code Block title Example collapse true TransportLayer.KafkaTA.Kafka.bootstrap.servers = localhost:9093 TransportLayer.KafkaTA.Kafka.security.protocol = SSL TransportLayer.KafkaTA.Kafka.ssl.ca.location = D:/SSL/root.pem TransportLayer.KafkaTA.Kafka.ssl.key.location = D:/SSL/kafka01.pem TransportLayer.KafkaTA.Kafka.ssl.key.password = 123456
...
Log type | Event description | Logging message format |
---|---|---|
ERROR | Client error | "Session '<session_name>': [Consumer/Producer]: <error_message>" |
Outgoing message failed to be queued | "Session '<session_name>': Producer: Error handling the outgoing message: <native error>. Message: <message>" | |
Incoming message failed to be handled | "Session '<session_name>': Consumer: Error handling the incoming message: <native error>. Message (optional): <message>" | |
Start operation failed to be applied | "Session '<session_name>': [Consumer/Producer]: The session <session_name> can't be started: <description> Reason: Operation cannot be applied" | |
Stop operation failed to be applied | "Session '<session_name>': [Consumer/Producer]: The session <session_name> can't be stopped: <description> Reason: Operation cannot be applied" | |
INFO | Client creation / any config parameter change | |
WARN | Kafka TA fails to apply any config parameter | "Session '<session_name>': <warning_message>" |
The outgoing message is rejected because the client is not connected to Kafka | "Session '<session_name>': Producer: The outgoing message is rejected: <message>" is visible in the "Kafka_TA" | |
DEBUG | Creation of a Consumer or Producer | "Session '<session_name>': Producer: Error handling the outgoing message: <native error>. Message: <message>" |
Adding an outgoing message to the queue | "Session '<session_name>': Producer: The message is put to the outgoing queue: <message>" | |
Removing an outgoing message from the queue | "Session '<session_name>': Producer: Sending the message via the producer: <message>" | |
An incoming message is being received | "Session '<session_name>': Consumer: Receiving the message: <message>" |
...
- Category 'Kafka_TA' is used for lifecycle, session state change, and errors events
- Categories with Kafka TA session name specified in TransportLayer.KafkaTA.Sessions are used for logging messages passing through the Kafka endpoints.
The following example shows how to save Kafka-related logs for multiple sessions to a separate file FIXEdge1/log/Kafka.log.
In the case of configuration with 3 sessions
...
Additional information about logging configuration can be found there:
- FIXEdge logs format
- How to redirect FIX Antenna and/or FIXEdge logging to Syslog instead of files
- How to divide different categories and severities of log files into different files in the Logging section
Scheduling
Info |
---|
(Available starting from version 6.11.0 of FIXEdge C++) |
...
Refer to the table in the Configuration parameters section above.
Start and Stop operations for Kafka TA
...
The FIXEdge behavior in the case when Kafka TA Producer session is in the Disconnected state is determined by the RejectMessageWhileNoConnection parameter.
Custom serialization
...
Info |
---|
The lifetime of the session is defined by parameters: TransportLayer.KafkaTA.<Session>.ConnectTime and TransportLayer.KafkaTA.<Session>.DisconnectTime and start/stop operations provided by monitoring application for particular Kafka TA session. |
...
In order for this information to be recorded in the logs, the user should enable DEBUG logging by setting Log.DebugIsOn = true in FIXEdge.properties.
...