Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Info
The Kafka transport adapter is available starting from version 6.10.0 of FIXEdge C++

...

Property name

Description

RequiredDefault valueExample

Common properties

TransportLayer.KafkaTA.Description

Adapter name. 

Note: It is recommended that this parameter is not empty.

This parameter is used in the logging of Kafka events.

N
Kafka Transport Adaptor

TransportLayer.KafkaTA.DllName

Contains path and name of the Kafka adapter DLL. In case this parameter is not specified, the TransportLayer.KafkaTA.AdapterId is applied to define the adapter's library by ID

Y
bin/KafkaTA-vc10-MD-x64.dll

TransportLayer.KafkaTA.AdapterId

The parameter to define the adapter's library by ID. In case this parameter is not specified, or TransportLayer.KafkaTA.DllName parameter is specified too, the TransportLayer.KafkaTA.DllName is applied

Info

This parameter is applicable since FIXEdge C++ version 6.14.0

N
KAFKA

TransportLayer.KafkaTA.Sessions

A comma-separated list of session names.

At least one session should be defined.

The values from this parameter will be used in the BL description.

Y

TransportLayer.<KafkaNameTA>.Cleanup

Provides the capability to clean the Kafka TA's persistent message storage.

Acceptable values:

  • true - cleaning of persistent message storage (on TA startup) is ON
    On Kafka TA startup:
    • Kafka storage moves to the Archive folder
    • New Kafka storage is created and includes only undelivered messages (copies the undelivered messages from a backup file to the new storage)
  • false - cleaning of persistent message storage is OFF
Ntrue

Kafka specific parapameters





TransportLayer.KafkaTA.reconnect.backoff.ms

Delay after which the Kafka TA starts attempting to re-connect if the connection breaks.N100

TransportLayer.KafkaTA.reconnect.backoff.max.ms

Max delay time after which the Kafka TA stops attempts to re-connect if the connection breaks.N10,000

Session properties





TransportLayer.KafkaTA.<Session>.FIXVersion

In every session, FIX messages use a particular version of the FIX protocol. Use this parameter to set the version of the FIX protocol for a given session.
 
Acceptable values:
FIX40, FIX41, FIX42, FIX43, FIX44, FIX50, FIX50SP1, FIX50SP2.
 
The property is used in the following cases:

  • to specify the supported FIX-version for messages stored in internal storage
  • to deserialize messages received from Kafka to FIX-format

Example use cases:

  • When FIX-messages are transferred in raw format, (i.e. serialization is not applied, <TransportLayer.KafkaTA.<Session>.Serializer> = RAW), the <TransportLayer.KafkaTA.<Session>.FIXVersion> property specifies the preferred version of the FIX-protocol while parsing input messages to FIX-format.
  • When FIX-messages are transferred and serialization is applied (see examples below), the <TransportLayer.KafkaTA.<Session>.FIXVersion> property specifies the FIX-version of the messages that are generated.
    • JSON-serialization (<TransportLayer.KafkaTA.<Session>.Serializer> = JSON)
    • XML-serialization (<TransportLayer.KafkaTA.<Session>.Serializer> = XmlWrapper)
NFIX44

TransportLayer.KafkaTA.<Session>.ConnectTime

Scheduled time to connect to a Client

The value should be in the cron time string format

Local time zones will be used

N;

Required if DisconnectTime parameter (see below) is configured

If a value is not specified the session is not using a schedule.


TransportLayer.KafkaTA.<Session>.DisconnectTime

Scheduled time to disconnect from a Client

The value should be in the cron time string format

Local time zones will be used

N;

Required if ConnectTime parameter (see above) is configured

If a value is not specified the session is not using a schedule.

TransportLayer.KafkaTA.<Session>.Serializer

Serializer name.  Acceptable values:

  • Raw
  • XmlWrapper
  • JSON

Serializer tasks can be set from an external plugin.

Format: 

TestSerializer:KafkaSerializer,

where: 

  • TestSerializer is a serializer Class ID inside the plugin
  • KafkaSerializer is a Plugin ID

ID values are set by the Plugin developer.

N

Raw


TransportLayer.KafkaTA.<Session>.bootstrap.servers

An initial list of brokers.

Y

Secure connection properties





TransportLayer.KafkaTA.Kafka.sasl.username

This parameter only applies and is required if using PLAIN authentication.N
johndoe

TransportLayer.KafkaTA.Kafka.sasl.password

This parameter only applies and is required if using PLAIN authentication.N
password1234

TransportLayer.KafkaTA.<Session>.ssl.key.location

File or directory path to SSL private key.

This parameter only applies and is required if using SSL certificate authentication.

N
D:/SSL/kafka01.pem

TransportLayer.KafkaTA.<Session>.ssl.key.password

This parameter only applies and is required if using SSL certificate authentication.N

TransportLayer.KafkaTA.<Session>.ssl.ca.location

File or directory path to CA certificate(s) for verifying the broker's key.

The parameter is required if the security protocol is SSL.

N
D:/SSL/root.pem

TransportLayer.KafkaTA.<Session>.sasl.mechanism

Valid values:

  • (empty) - in this case, neither PLAIN nor GSSAPI authentications are in use
  • PLAIN - to use PLAIN authentication
  • GSSAPI - to use GSSAPI authentication
NIf a value is not specified user authentication is not applied.

TransportLayer.KafkaTA.<Session>.security.protocol

The protocol used to communicate with brokers. 

Valid values:

  • PLAINTEXT - using an unsecured connection
  • SSL - using a secured connection
  • SASL_PLAINTEXT - using authentication and an unsecured connection
  • SASL_SSL - using authentication and secured connection
N

PLAINTEXT


TransportLayer.KafkaTA.Kafka.sasl.kerberos.service.name

Only applies and is required if using GSS_API authentication. In this case, use the Kerberos principal name that Kafka runs as.N

Consumer properties

TransportLayer.KafkaTA.<Session>.Consumer.Commit

Commit mode. Acceptable values:

  • Auto   - automatically, according to time interval expiration
  • Sync   - synchronously, after each received message
  • Async  - asynchronously, after each received message

For more information see Sending commit messages to the Kafka platform.

NAuto

TransportLayer.KafkaTA.<Session>.Consumer.group.id

A unique string that identifies the consumer group that the given Consumer belongs to. This property is required if the Consumer uses either the group management functionality by using subscribe (Topic) or the Kafka-based offset management strategy.

When the "group.id" parameter is not configured and its value is yet to be assigned, the session ID is used as the value.

If the group.id value is identical across multiple sessions, and topic names are the same across multiple sessions, only one of the sessions is going to receive and process the data. This scenario is permissible when processing messages in parallel.

NThe default value is equal to the <Session> ID.

TransportLayer.KafkaTA.<Session>.Consumer.Topics

If a value is not specified, the Consumer is not in use. 

To use the Consumer, a value must be specified. In this case, use a comma separated list of topics that should be listened by the Consumer.

N
outputTopic

Producer properties

TransportLayer.KafkaTA.<Session>.Producer.KeyTag

If a value is not specified, then a key tag will not be used to fill a key value.

To use this parameter, specify a tag number in a FIX message. The value of the tag will be used as a key when the message is sent to Kafka.

N

TransportLayer.KafkaTA.<Session>.Producer.Topic

If a value is not specified, the Producer is not in use. 

To use the producer, specify the topic that the Producer should send FIX messages to.

N

TransportLayer.KafkaTA.<Session>.Producer.RejectMessageWhileNoConnection

Provides the option to reject all further outgoing messages from FIXEdge from being sent by alerting the Producer when messages cannot be delivered to Kafka.

Depending on the operating system, a disconnection event may be detected at different times. For example, on Windows, if the Kafka server is not started the disconnection will be detected immediately. However, on Linux, the disconnection will be detected only once a message attempts to send.

Info

If the Kafka TA is unable to deliver a message, this generates an OnUndeliveredMessageEvent in the BL. A user can define actions inside this event such as generating a reject message or logging the error.

Nfalse

TransportLayer.KafkaTA.<Session>.Producer.DisconnectionPeriodThreshold

The timeout period after which unsuccessful attempts at sending will be rejected if TransportLayer.KafkaTA.<Session>.Producer.RejectMessageWhileNoConnection=true.

  • This parameter is useful when a disconnection happens for a short period of time. If connection is restored during the specified threshold, (for example, 5 seconds), message processing doesn't stop during the connection gap and messages continue to be sent to the Kafka platform broker.
  • Alternatively, if the disconnection lasts for longer than the specified threshold, the connection is considered broken, the sender is informed that messages cannot be delivered to the Kafka platform, and reject messages are sent back to the sender.
N0

...

The following example shows how to save Kafka-related logs for multiple sessions to a separate file FIXEdge1/log/Kafka.log.

In the case of configuration with 3 sessions

...

FIXEdge C++ provides Start operation for Disconnected Kafka TA sessions and Stop operation for Connecting and Running Kafka TA sessions on request from the subscribed monitoring application.Please refer to the Transport adaptor operations for FIXEdge C++ page for additional details about Start and Stop operations.

Start operation

Start operation can be applied to Kafka TA sessions in the Disconnected state.

...