Info
The Kafka transport adapter is available starting from version 6.10.0 of FIXEdge C++.

...

The Kafka TA transfers FIX messages in raw or serialized formats from the FIXEdge Server to the Kafka platform and vice versa.

...

The Kafka TA automatically reconnects to Kafka if the connection to a message broker is unexpectedly terminated. Reconnection behavior can be configured using the TransportLayer.KafkaTA.reconnect.backoff.ms and TransportLayer.KafkaTA.reconnect.backoff.max.ms parameters.
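
As a minimal sketch, the two reconnection parameters could be set in FIXEdge.properties as shown below. The values are simply the defaults listed in the configuration parameters table, and the millisecond unit is inferred from the ".ms" suffix of the parameter names.

```text
# Reconnection settings for the Kafka TA (values shown are the documented defaults)
TransportLayer.KafkaTA.reconnect.backoff.ms = 100
TransportLayer.KafkaTA.reconnect.backoff.max.ms = 10000
```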

Multiple instances of the Kafka TA can be configured using one library instance, inside one instance of FIXEdge. 

...

Committing to the Kafka platform 

The TransportLayer.KafkaTA.<Session>.Consumer.Commit parameter provides three strategies that dictate how commit messages (acknowledgements of messages received) are sent to the Kafka platform.
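
For illustration, a consumer session that commits synchronously after each received message might be configured as follows. The session name Kafka, the topic, and the group ID are taken from the sample configuration on this page and are placeholders only.

```text
# Consumer for the session named "Kafka" (placeholder values)
TransportLayer.KafkaTA.Kafka.Consumer.Topics = outputTopic
TransportLayer.KafkaTA.Kafka.Consumer.group.id = ID
# Acceptable values: Auto, Sync, Async
TransportLayer.KafkaTA.Kafka.Consumer.Commit = Sync
```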

...

  1. Enable a transport adapter to be used by FIXEdge:
    In the ‘Transport Layer Section’ of the FIXEdge.properties file, add the Kafka TA to the list of supported adapters:

    Code Block
    TransportLayer.TransportAdapters = TransportLayer.KafkaTA

    Note: If you use other transport adapters, just add TransportLayer.KafkaTA to the end of the list:

    Code Block
    TransportLayer.TransportAdapters = ..., TransportLayer.KafkaTA
  2. Configure the Kafka TA by adding the Kafka TA section to the FIXEdge.properties file:

    Code Block
    TransportLayer.KafkaTA.Description = Kafka Transport Adaptor
    TransportLayer.KafkaTA.DllName = bin/KafkaTA-vc10-MD-x64.dll
    TransportLayer.KafkaTA.Sessions = Kafka
     
    TransportLayer.KafkaTA.Kafka.bootstrap.servers = localhost:9092
    TransportLayer.KafkaTA.Kafka.FIXVersion = FIX44
    
    TransportLayer.KafkaTA.Kafka.Consumer.Commit = Auto
    TransportLayer.KafkaTA.Kafka.Consumer.Topics = outputTopic
    TransportLayer.KafkaTA.Kafka.Consumer.group.id = ID
    
    TransportLayer.KafkaTA.Kafka.Producer.Topic = topic

    Note: Sample settings can be copied to the FIXEdge.properties file from the KafkaTA.properties file (located in the doc folder of the FIXEdge distribution package).

  3. Configure rules for message routing from the Kafka TA.
    The Kafka TA Client is referenced in the Business Layer (BL) by the ClientID name specified in the FIXEdge.properties file. For a Kafka Business Layer configuration sample, refer to the Configuration sample sub-section.

  4. Restart the FIXEdge server to apply the changes.
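
Because TransportLayer.KafkaTA.Sessions accepts a comma-separated list, a configuration with two sessions could look roughly like the sketch below. The session names Orders and Quotes and all topic names are hypothetical and are used only for illustration.

```text
TransportLayer.KafkaTA.Description = Kafka Transport Adaptor
TransportLayer.KafkaTA.DllName = bin/KafkaTA-vc10-MD-x64.dll
# Two sessions (hypothetical names)
TransportLayer.KafkaTA.Sessions = Orders, Quotes

TransportLayer.KafkaTA.Orders.bootstrap.servers = localhost:9092
TransportLayer.KafkaTA.Orders.FIXVersion = FIX44
TransportLayer.KafkaTA.Orders.Consumer.Topics = ordersIn
TransportLayer.KafkaTA.Orders.Producer.Topic = ordersOut

TransportLayer.KafkaTA.Quotes.bootstrap.servers = localhost:9092
TransportLayer.KafkaTA.Quotes.FIXVersion = FIX44
TransportLayer.KafkaTA.Quotes.Consumer.Topics = quotesIn
TransportLayer.KafkaTA.Quotes.Producer.Topic = quotesOut
```

In this sketch Consumer.group.id is left unset for both sessions, so according to the configuration parameters table each session would use its own session ID as the consumer group.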

...

Property name

Description

Required

Default value

Example

Common properties

TransportLayer.KafkaTA.Description

Adapter name. 

Note: It is recommended that this parameter is not empty.

This parameter is used in the logging of Kafka events.

N
Kafka Transport Adaptor

TransportLayer.KafkaTA.DllName

Path and name of the Kafka adapter DLL. If this parameter is not specified, TransportLayer.KafkaTA.AdapterId is used to identify the adapter's library by ID.

Y
bin/KafkaTA-vc10-MD-x64.dll

TransportLayer.KafkaTA.AdapterId

Identifies the adapter's library by ID. If this parameter is not specified, or if TransportLayer.KafkaTA.DllName is also specified, TransportLayer.KafkaTA.DllName is used.

Info

This parameter is available starting from version 6.14.0 of FIXEdge C++.

N
KAFKA

TransportLayer.KafkaTA.Sessions

A comma-separated list of session names.

At least one session should be defined.

The values from this parameter will be used in the BL description.

Y

TransportLayer.<KafkaNameTA>.Cleanup

Provides the capability to clean the Kafka TA's persistent message storage.

Acceptable values:

  • true - cleaning of persistent message storage (on TA startup) is ON
    On Kafka TA startup:
    • Kafka storage moves to the Archive folder
    • New Kafka storage is created and includes only undelivered messages (copies the undelivered messages from a backup file to the new storage)
  • false - cleaning of persistent message storage is OFF
N
true

Kafka-specific parameters





TransportLayer.KafkaTA.reconnect.backoff.ms

Delay after which the Kafka TA starts attempting to reconnect if the connection breaks.

N
100

TransportLayer.KafkaTA.reconnect.backoff.max.ms

Maximum delay after which the Kafka TA stops attempting to reconnect if the connection breaks.

N
10,000

Session properties





TransportLayer.KafkaTA.<Session>.FIXVersion

In every session, FIX messages use a particular version of the FIX protocol. Use this parameter to set the version of the FIX protocol for a given session.
 
Acceptable values:
FIX40, FIX41, FIX42, FIX43, FIX44, FIX50, FIX50SP1, FIX50SP2.
 
The property is used in the following cases:

  • to specify the supported FIX-version for messages stored in internal storage
  • to deserialize messages received from Kafka to FIX-format

Example use cases:

  • When FIX-messages are transferred in raw format (i.e., serialization is not applied, <TransportLayer.KafkaTA.<Session>.Serializer> = RAW), the <TransportLayer.KafkaTA.<Session>.FIXVersion> property specifies the preferred version of the FIX-protocol used when parsing input messages to FIX-format.
  • When FIX-messages are transferred and serialization is applied (see examples below), the <TransportLayer.KafkaTA.<Session>.FIXVersion> property specifies the FIX-version of the messages that are generated.
    • JSON-serialization (<TransportLayer.KafkaTA.<Session>.Serializer> = JSON)
    • XML-serialization (<TransportLayer.KafkaTA.<Session>.Serializer> = XmlWrapper)
N
FIX44

TransportLayer.KafkaTA.<Session>.ConnectTime

Scheduled time to connect to a Client.

The value should be in the cron time string format.

The local time zone is used.

N; required if the DisconnectTime parameter (see below) is configured.

If a value is not specified, the session does not use a schedule.


TransportLayer.KafkaTA.<Session>.DisconnectTime

Scheduled time to disconnect from a Client.

The value should be in the cron time string format.

The local time zone is used.

N; required if the ConnectTime parameter (see above) is configured.

If a value is not specified, the session does not use a schedule.

TransportLayer.KafkaTA.<Session>.DaysOff


Info

This property is available starting from version 6.16.0 of FIXEdge C++.

The date and/or time during which the ConnectTime and DisconnectTime properties configured for the Kafka TA session are not applied.

The DaysOff property must be specified in the CRON format.

Several CRON expressions separated by ";" can be specified as the value of the DaysOff property.

Info

If the DaysOff property is specified with an incorrect value for a Kafka TA session, that session is not started, and the following ERROR is logged:

Code Block
ERROR Session '<Session>': [Consumer/Producer]: <error_message>



N

-

0 0 9 * * 2-6

TransportLayer.KafkaTA.<Session>.Serializer

Serializer name.  Acceptable values:

  • Raw
  • XmlWrapper
  • JSON

Serializer tasks can be set from an external plugin.

Format: 

TestSerializer:KafkaSerializer,

where: 

  • TestSerializer is a serializer Class ID inside the plugin
  • KafkaSerializer is a Plugin ID

ID values are set by the Plugin developer.

N

Raw


TransportLayer.KafkaTA.<Session>.bootstrap.servers

An initial list of brokers.

Y

Secure connection properties





TransportLayer.KafkaTA.Kafka.sasl.username

This parameter applies, and is required, only when PLAIN authentication is used.

N
johndoe

TransportLayer.KafkaTA.Kafka.sasl.password

This parameter applies, and is required, only when PLAIN authentication is used.

N
password1234

TransportLayer.KafkaTA.<Session>.ssl.key.location

File or directory path to SSL private key.

This parameter only applies and is required if using SSL certificate authentication.

N
D:/SSL/kafka01.pem

TransportLayer.KafkaTA.<Session>.ssl.key.password

This parameter only applies and is required if using SSL certificate authentication.

N

TransportLayer.KafkaTA.<Session>.ssl.ca.location

File or directory path to CA certificate(s) for verifying the broker's key.

The parameter is required if the security protocol is SSL.

N
D:/SSL/root.pem

TransportLayer.KafkaTA.<Session>.sasl.mechanism

Valid values:

  • (empty) - in this case, neither PLAIN nor GSSAPI authentications are in use
  • PLAIN - to use PLAIN authentication
  • GSSAPI - to use GSSAPI authentication
N
If a value is not specified, user authentication is not applied.

TransportLayer.KafkaTA.<Session>.security.protocol

The protocol used to communicate with brokers. 

Valid values:

  • PLAINTEXT - using an unsecured connection
  • SSL - using a secured connection
  • SASL_PLAINTEXT - using authentication and an unsecured connection
  • SASL_SSL - using authentication and secured connection
N

PLAINTEXT


TransportLayer.KafkaTA.Kafka.sasl.kerberos.service.name

This parameter applies, and is required, only when GSSAPI authentication is used. In this case, use the Kerberos principal name that Kafka runs as.

N

Consumer properties

TransportLayer.KafkaTA.<Session>.Consumer.Commit

Commit mode. Acceptable values:

  • Auto   - automatically, according to time interval expiration
  • Sync   - synchronously, after each received message
  • Async  - asynchronously, after each received message

For more information, see Sending commit messages to the Kafka platform.

N
Auto

TransportLayer.KafkaTA.<Session>.Consumer.group.id

A unique string that identifies the consumer group that the given Consumer belongs to. This property is required if the Consumer uses either the group management functionality by using subscribe (Topic) or the Kafka-based offset management strategy.

If the group.id parameter is not configured, the session ID is used as its value.

If the group.id value is identical across multiple sessions, and topic names are the same across multiple sessions, only one of the sessions receives and processes the data. This scenario is permissible when processing messages in parallel.

N
The default value is equal to the <Session> ID.

TransportLayer.KafkaTA.<Session>.Consumer.Topics

If a value is not specified, the Consumer is not in use. 

To use the Consumer, a value must be specified. In this case, use a comma-separated list of topics that the Consumer should listen to.

N
outputTopic

Producer properties

TransportLayer.KafkaTA.<Session>.Producer.KeyTag

If a value is not specified, then a key tag will not be used to fill a key value.

To use this parameter, specify a tag number in a FIX message. The value of the tag will be used as a key when the message is sent to Kafka.

N

TransportLayer.KafkaTA.<Session>.Producer.Topic

If a value is not specified, the Producer is not in use. 

To use the producer, specify the topic that the Producer should send FIX messages to.

N

TransportLayer.KafkaTA.<Session>.Producer.RejectMessageWhileNoConnection

Provides the option to reject all further outgoing messages from FIXEdge by alerting the Producer when messages cannot be delivered to Kafka.

Depending on the operating system, a disconnection event may be detected at different times. For example, on Windows, if the Kafka server is not started, the disconnection is detected immediately. On Linux, however, the disconnection is detected only when an attempt is made to send a message.

Info

If the Kafka TA is unable to deliver a message, this generates an OnUndeliveredMessageEvent in the BL. A user can define actions inside this event such as generating a reject message or logging the error.

N
false

TransportLayer.KafkaTA.<Session>.Producer.DisconnectionPeriodThreshold

The timeout period after which unsuccessful attempts at sending will be rejected if TransportLayer.KafkaTA.<Session>.Producer.RejectMessageWhileNoConnection=true.

  • This parameter is useful when a disconnection happens for a short period of time. If the connection is restored within the specified threshold (for example, 5 seconds), message processing doesn't stop during the connection gap and messages continue to be sent to the Kafka platform broker.
  • Alternatively, if the disconnection lasts longer than the specified threshold, the connection is considered broken, the sender is informed that messages cannot be delivered to the Kafka platform, and reject messages are sent back to the sender.
N
0
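
As an illustration of how several of the session and Producer properties above combine, the sketch below configures a session that serializes outgoing messages to JSON and uses a FIX tag value as the Kafka message key. The session name Kafka and the topic are taken from the sample configuration on this page; tag 55 (Symbol in the FIX specification) is chosen only as an example.

```text
TransportLayer.KafkaTA.Kafka.FIXVersion = FIX44
# Acceptable built-in values: Raw, XmlWrapper, JSON
TransportLayer.KafkaTA.Kafka.Serializer = JSON

TransportLayer.KafkaTA.Kafka.Producer.Topic = topic
# Use the value of tag 55 (Symbol) as the key of each message sent to Kafka
TransportLayer.KafkaTA.Kafka.Producer.KeyTag = 55
```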

Configuration sample

The samples below represent the Kafka TA's configuration with the minimal set of parameters required to run the adapter:

...

  1. Make sure the Kafka broker and adapter are configured for an SSL connection.
  2. Set client authentication as "required" in the server.properties file:

    Code Block
    Example
    listeners=PLAINTEXT://:9092,SSL://:9093
    ssl.keystore.location=D:/SSL/kafka01.keystore.jks
    ssl.keystore.password=123456
    ssl.key.password=123456
    ssl.truststore.location=D:/SSL/kafka.truststore.jks
    ssl.truststore.password=123456
    ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
    ssl.client.auth = required
  3. Provide the ssl.key details in the FIXEdge.properties file:

    Code Block
    Example
    TransportLayer.KafkaTA.Kafka.bootstrap.servers = localhost:9093
    TransportLayer.KafkaTA.Kafka.security.protocol = SSL
    TransportLayer.KafkaTA.Kafka.ssl.ca.location = D:/SSL/root.pem
    TransportLayer.KafkaTA.Kafka.ssl.key.location = D:/SSL/kafka01.pem
    TransportLayer.KafkaTA.Kafka.ssl.key.password = 123456
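
For comparison, a session that authenticates with PLAIN over a secured connection might be sketched as follows, using only properties from the secure connection section of the configuration parameters table. The credentials are the example values from that table, and the port is a placeholder: this assumes the broker exposes a SASL_SSL listener on that port, which the server.properties example above does not configure.

```text
# Assumes a SASL_SSL listener is available on the broker at this address
TransportLayer.KafkaTA.Kafka.bootstrap.servers = localhost:9093
TransportLayer.KafkaTA.Kafka.security.protocol = SASL_SSL
TransportLayer.KafkaTA.Kafka.sasl.mechanism = PLAIN
TransportLayer.KafkaTA.Kafka.sasl.username = johndoe
TransportLayer.KafkaTA.Kafka.sasl.password = password1234
# CA certificate used to verify the broker's key
TransportLayer.KafkaTA.Kafka.ssl.ca.location = D:/SSL/root.pem
```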

...

Log type | Event description | Logging message format
ERROR | Client error | "Session '<session_name>': [Consumer/Producer]: <error_message>"
ERROR | Outgoing message failed to be queued | "Session '<session_name>': Producer: Error handling the outgoing message: <native error>. Message: <message>"
ERROR | Incoming message failed to be handled | "Session '<session_name>': Consumer: Error handling the incoming message: <native error>. Message (optional): <message>"
ERROR | Start operation failed to be applied | "Session '<session_name>': [Consumer/Producer]: The session <session_name> can't be started: <description> Reason: Operation cannot be applied"
ERROR | Stop operation failed to be applied | "Session '<session_name>': [Consumer/Producer]: The session <session_name> can't be stopped: <description> Reason: Operation cannot be applied"
INFO | Client creation / any config parameter change |
WARN | Kafka TA fails to apply any config parameter | "Session '<session_name>': <warning_message>"
WARN | The outgoing message is rejected because the client is not connected to Kafka | "Session '<session_name>': Producer: The outgoing message is rejected: <message>" is visible in the "Kafka_TA"
DEBUG | Creation of a Consumer or Producer | "Session '<session_name>': Producer: Error handling the outgoing message: <native error>. Message: <message>"
DEBUG | Adding an outgoing message to the queue | "Session '<session_name>': Producer: The message is put to the outgoing queue: <message>"
DEBUG | Removing an outgoing message from the queue | "Session '<session_name>': Producer: Sending the message via the producer: <message>"
DEBUG | An incoming message is being received | "Session '<session_name>': Consumer: Receiving the message: <message>"

...

  • Category 'Kafka_TA' is used for lifecycle, session state change, and error events.
  • Categories with Kafka TA session name specified in TransportLayer.KafkaTA.Sessions are used for logging messages passing through the Kafka endpoints.

The following example shows how to save Kafka-related logs for multiple sessions to a separate file FIXEdge1/log/Kafka.log.

In the case of configuration with 3 sessions

...

Additional information about logging configuration can be found here:

Scheduling

Info
(Available starting from version 6.11.0 of FIXEdge C++)

...

Refer to the table in the Configuration parameters section above.
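
As a rough sketch, a session scheduled to connect in the morning and disconnect in the evening could be configured as shown below. The six-field cron layout (seconds, minutes, hours, day of month, month, day of week) is an assumption inferred from the DaysOff example in the configuration parameters table, and the times themselves are arbitrary.

```text
# Assumed cron layout: seconds minutes hours day-of-month month day-of-week
# Connect at 08:00:00 and disconnect at 18:00:00, local time
TransportLayer.KafkaTA.Kafka.ConnectTime = 0 0 8 * * *
TransportLayer.KafkaTA.Kafka.DisconnectTime = 0 0 18 * * *
```

Both parameters are set together because, according to the table, each one is required when the other is configured.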

Start and Stop operations for Kafka TA

...

FIXEdge behavior when a Kafka TA Producer session is in the Disconnected state is determined by the RejectMessageWhileNoConnection parameter.
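
For example, a Producer session that rejects outgoing messages while disconnected might be sketched like this. The session name Kafka and the topic are placeholders from the sample configuration, and DisconnectionPeriodThreshold is shown with its documented default.

```text
TransportLayer.KafkaTA.Kafka.Producer.Topic = topic
# Reject outgoing messages when they cannot be delivered to Kafka (default is false)
TransportLayer.KafkaTA.Kafka.Producer.RejectMessageWhileNoConnection = true
# Documented default; a larger value tolerates short connection gaps before rejecting
TransportLayer.KafkaTA.Kafka.Producer.DisconnectionPeriodThreshold = 0
```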

Custom serialization

...

Info

The lifetime of a session is defined by the TransportLayer.KafkaTA.<Session>.ConnectTime and TransportLayer.KafkaTA.<Session>.DisconnectTime parameters and by the start/stop operations that the monitoring application provides for a particular Kafka TA session.

...

In order for this information to be recorded in the logs, the user should enable DEBUG logging by setting Log.DebugIsOn = true in FIXEdge.properties.

...