Configuring Confluent Cloud endpoints

This guide describes, step by step, how to integrate FIXEdge with Confluent Cloud.
The following list outlines the end-to-end process of connecting FIXEdge to your Confluent Cloud cluster.

  1. Create an API key in Confluent Cloud
    Generate a key and secret for authenticating your Confluent Cloud Kafka client.

  2. Create topic
    Create the Kafka topic in Confluent Cloud that FIXEdge will publish to and consume from.

  3. Get details from Confluent Cloud
    Retrieve the necessary cluster information, topic name, and endpoint details.

  4. Create a Confluent Cloud connection
    Configure the Confluent Cloud Kafka transport adapter (TA) in FIXEdge to establish a secure connection using the API credentials.

  5. Start an ingestion job
    Run FIXEdge and begin sending and receiving messages through the configured topic.


Create an API key in Confluent Cloud

In this section, you create an API key that FIXEdge will use to authenticate with Confluent Cloud.

  1. In the Confluent Cloud console, access your cluster and click API Keys in the left pane.

  2. Click +Add key and create a key with your chosen scope.

  3. Once created, download and save the Key and Secret.


Create topic

In this section, you create a Kafka topic in Confluent Cloud.

  1. Access your cluster and click Topics in the left pane.

  2. Click Add topic and create the topic that FIXEdge will use (a programmatic alternative is sketched after this step).

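If you prefer to script topic creation instead of using the console, the Kafka AdminClient can do the same thing. The sketch below is illustrative rather than part of the FIXEdge setup: the bootstrap address and the your_key/your_secret credentials are the same placeholders used later in this guide, and the partition count of 6 is an arbitrary choice (Confluent Cloud typically requires a replication factor of 3).

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;

    import java.util.List;
    import java.util.Properties;

    public class CreateTopicExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Placeholder endpoint and credentials; substitute your own values
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "xyz.us-east-2.aws.confluent.cloud:9092");
            props.put("security.protocol", "SASL_SSL");
            props.put("sasl.mechanism", "PLAIN");
            props.put("sasl.jaas.config",
                    "org.apache.kafka.common.security.plain.PlainLoginModule required "
                            + "username=\"your_key\" password=\"your_secret\";");

            try (AdminClient admin = AdminClient.create(props)) {
                // 6 partitions is an arbitrary example; replication factor 3 per Confluent Cloud
                admin.createTopics(List.of(new NewTopic("topic_0", 6, (short) 3))).all().get();
                System.out.println("Topic created");
            }
        }
    }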

Get details from Confluent Cloud

In this step, you retrieve the bootstrap server address and note the name of the topic that FIXEdge will read from and write to.

In the Confluent Cloud Console:

  1. Navigate to your cluster and select Topics from the left-hand menu.

  2. Copy and save the topic name you intend to ingest data from.

  3. Click Cluster settings in the left-hand menu, then copy and save the Bootstrap server value (you can verify it with the sketch below).

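As an optional sanity check on the values you just copied, a small AdminClient program can list the topics visible to your API key. This is a minimal sketch with the same placeholder endpoint and credentials; if it prints topic_0, both the bootstrap server and the credentials are correct:

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;

    import java.util.Properties;

    public class ListTopicsExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Placeholder endpoint and credentials; substitute your own values
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "xyz.us-east-2.aws.confluent.cloud:9092");
            props.put("security.protocol", "SASL_SSL");
            props.put("sasl.mechanism", "PLAIN");
            props.put("sasl.jaas.config",
                    "org.apache.kafka.common.security.plain.PlainLoginModule required "
                            + "username=\"your_key\" password=\"your_secret\";");

            try (AdminClient admin = AdminClient.create(props)) {
                // Prints every topic visible to this API key; topic_0 should appear
                admin.listTopics().names().get().forEach(System.out::println);
            }
        }
    }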

Create a Confluent Cloud connection

In this step, you configure the Confluent Cloud Kafka TA in FIXEdge to connect to the Confluent Cloud topic you created. For more information about Confluent Cloud Kafka TA configuration, refer to the Configuring Kafka endpoints page.

To do this, open the conf/kafka-adaptor.properties file and set up the configuration (a standalone connectivity check is sketched after these steps):

  1. Fill in the list of Confluent Cloud Kafka TA endpoints (consumers and producers) that will be used for the connection:

    kafka.clients = ConfluentCloudConsumer, ConfluentCloudProducer
  2. Configure connection properties:

    kafka.bootstrap.servers = xyz.us-east-2.aws.confluent.cloud:9092
  3. Configure the producer session:

    kafka.producer.ConfluentCloudProducer.client.id = ConfluentCloudProducer
    # use the name of the topic created in step 2
    kafka.producer.ConfluentCloudProducer.topic = topic_0
    # use the key and secret received in step 1 as username and password
    kafka.producer.ConfluentCloudProducer.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="your_key" password="your_secret";
    kafka.producer.ConfluentCloudProducer.sasl.mechanism = PLAIN
    kafka.producer.ConfluentCloudProducer.security.protocol = SASL_SSL
    kafka.producer.ConfluentCloudProducer.startOnload = true
    kafka.producer.ConfluentCloudProducer.transaction.timeout.ms = 5000
    kafka.producer.ConfluentCloudProducer.max.block.ms = 1000
    kafka.producer.ConfluentCloudProducer.groups = kafka_producer
    kafka.producer.key.serializer = com.epam.fej.kafka.FIXMessageEventSerializer
    kafka.producer.value.serializer = com.epam.fej.kafka.FIXMessageEventSerializer
  4. Configure the consumer session:

    kafka.consumer.ConfluentCloudConsumer.client.id = ConfluentCloudConsumer
    # use the name of the topic created in step 2
    kafka.consumer.ConfluentCloudConsumer.topics = topic_0
    # use the key and secret received in step 1 as username and password
    kafka.consumer.ConfluentCloudConsumer.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="your_key" password="your_secret";
    kafka.consumer.ConfluentCloudConsumer.sasl.mechanism = PLAIN
    kafka.consumer.ConfluentCloudConsumer.security.protocol = SASL_SSL
    kafka.consumer.ConfluentCloudConsumer.startOnload = true
    kafka.consumer.ConfluentCloudConsumer.fej.store.and.forward = true
    kafka.consumer.ConfluentCloudConsumer.topics.regexp = topic.*
    kafka.consumer.ConfluentCloudConsumer.groups = kafka_consumer
    kafka.consumer.key.deserializer = org.apache.kafka.common.serialization.StringDeserializer
    kafka.consumer.value.deserializer = com.epam.fej.kafka.FIXMessageEventDeserializer
    kafka.consumer.group.id = kafka_consumer
  5. Create a FIX session to route messages to and from Confluent Cloud Kafka TA (use instructions from Configuring FIX endpoints). Add routing rules to conf/rules.groovy (refer to Groovy DSL Rules for more information):

    import static dsl.CommonRulesDsl.rulesDSL
    import com.epam.fej.routing.RoutingContext

    rulesDSL(routingContext as RoutingContext) {
        messageRules {
            messageRule("Routing from Confluent Cloud") {
                source {
                    id "ConfluentCloudConsumer"
                }
                condition {
                    // add routing conditions if needed
                }
                action {
                    sendTo "FIX_session" // replace 'FIX_session' with your session name
                    context exit
                }
            }
            messageRule("Routing to Confluent Cloud") {
                source {
                    id "FIX_session" // replace 'FIX_session' with your session name
                }
                condition {
                    // add routing conditions if needed
                }
                action {
                    sendTo "ConfluentCloudProducer"
                    context exit
                }
            }
        }
    }
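
Before starting FIXEdge, you may want to confirm that the connection settings above (SASL_SSL, the PLAIN mechanism, and your key/secret pair) actually authenticate against the cluster. The following standalone sketch sends one test record to topic_0 with a plain Kafka producer; it is independent of FIXEdge and deliberately uses String serializers instead of the TA's FIXMessageEventSerializer:

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.util.Properties;

    public class ConnectionSmokeTest {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Same placeholder endpoint and credentials as in kafka-adaptor.properties
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "xyz.us-east-2.aws.confluent.cloud:9092");
            props.put("security.protocol", "SASL_SSL");
            props.put("sasl.mechanism", "PLAIN");
            props.put("sasl.jaas.config",
                    "org.apache.kafka.common.security.plain.PlainLoginModule required "
                            + "username=\"your_key\" password=\"your_secret\";");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // A successful send proves the endpoint, credentials, and topic are valid
                RecordMetadata meta = producer
                        .send(new ProducerRecord<>("topic_0", "smoke-test", "hello"))
                        .get();
                System.out.println("Delivered to partition " + meta.partition()
                        + " at offset " + meta.offset());
            }
        }
    }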

Start an ingestion job

After the configuration is ready, run FIXEdge and start sending and receiving messages.

Connect to the configured FIX_session and send a message to it. According to the rules above, the message will be routed to the ConfluentCloudProducer session and sent to topic_0 in Confluent Cloud. Once received, it will be consumed by ConfluentCloudConsumer and sent back to FIX_session.
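
To observe the routed messages in topic_0 directly, you can attach a plain Kafka consumer alongside FIXEdge. A minimal sketch with the same placeholder endpoint and credentials; note the separate group id (smoke-test-group is an arbitrary name) so this watcher receives its own copy of each message and does not take messages away from the TA's kafka_consumer group. FIX payloads serialized by the TA are read here as raw strings:

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    import java.time.Duration;
    import java.util.List;
    import java.util.Properties;

    public class TopicWatcher {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Placeholder endpoint and credentials; substitute your own values
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "xyz.us-east-2.aws.confluent.cloud:9092");
            props.put("security.protocol", "SASL_SSL");
            props.put("sasl.mechanism", "PLAIN");
            props.put("sasl.jaas.config",
                    "org.apache.kafka.common.security.plain.PlainLoginModule required "
                            + "username=\"your_key\" password=\"your_secret\";");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            // Separate group id so this watcher does not compete with the TA consumer
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "smoke-test-group");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(List.of("topic_0"));
                // Poll for roughly 30 seconds, then exit
                for (int i = 0; i < 6; i++) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
                    records.forEach(r ->
                            System.out.printf("offset=%d value=%s%n", r.offset(), r.value()));
                }
            }
        }
    }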