Wednesday, 13 November 2019

Java: Kafka: Produce message with keys

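You can attach a key to each message. With the default partitioner, the producer hashes the key to choose a partition, so messages with the same key always go to the same partition, as long as the number of partitions in the topic does not change.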

Example
ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, message);
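
To make the key-to-partition rule concrete, here is a minimal, dependency-free sketch of the "hash the key, then take it modulo the partition count" idea. It is not Kafka's actual code: the default partitioner applies a murmur2 hash to the serialized key, whereas this sketch swaps in Arrays.hashCode as a stand-in, so the partition numbers it prints will not match a real cluster. The class name KeyPartitionSketch and the method partitionFor are hypothetical names used only for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class KeyPartitionSketch {

  // Assumption: Arrays.hashCode stands in for the murmur2 hash that Kafka's
  // default partitioner applies to the serialized key bytes.
  static int partitionFor(String key, int numPartitions) {
    byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
    int hash = Arrays.hashCode(keyBytes);
    // Clear the sign bit so the modulo result is never negative
    return (hash & 0x7fffffff) % numPartitions;
  }

  public static void main(String[] args) {
    // Same keys, two different partition counts
    for (int numPartitions : new int[] { 3, 4 }) {
      for (String key : new String[] { "key-0", "key-1", "key-2" }) {
        System.out.println(numPartitions + " partitions: " + key
            + " -> partition " + partitionFor(key, numPartitions));
      }
    }
  }
}
```

For a fixed partition count the same key always yields the same partition. Because the result depends on the partition count, adding partitions to a topic can move a key to a different partition.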

The complete working application is given below.

App.java
package com.sample.app;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class App {
  public static void main(String[] args) throws InterruptedException, ExecutionException {
    Properties properties = new Properties();

    // Populate producer configurations
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.ACKS_CONFIG, "1");

    String topicName = "myFirstTopic";

    // Initialize KafkaProducer with configurations
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {

      for (int i = 0; i < 10; i++) {
        String key = "key-" + (i % 3);
        String message = "Message : " + i;

        ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, message);

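        // get() blocks until the broker acknowledges the record. It is used here only
        // to print the partition. Blocking on every send hurts throughput in
        // production, so prefer send(record, callback) there.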
        RecordMetadata recordMetadata = producer.send(record).get();

        System.out.println("*************************************************");
        System.out.println("Sent <" + key + "," + message + ">");
        //System.out.println("Topic : " + recordMetadata.topic());
        System.out.println("Partition : " + recordMetadata.partition());
        /*System.out.println("Offset : " + recordMetadata.offset());
        System.out.println("Key Size : " + recordMetadata.serializedKeySize());
        System.out.println("Value Size : " + recordMetadata.serializedValueSize());
        System.out.println("Time Stamp : " + recordMetadata.timestamp());*/
        System.out.println("*************************************************\n");

      }

      // send() is asynchronous, so buffered records are published only when you call
      // producer.flush() or close the producer. Here get() already waits for each
      // record, and the try-with-resources statement closes the producer automatically.

    }

  }
}

How to run the application?
Step 1: Create myFirstTopic
kafka-topics.sh --bootstrap-server localhost:9092 --topic myFirstTopic --create --partitions 3 --replication-factor 1
$kafka-topics.sh --bootstrap-server localhost:9092 --topic myFirstTopic --create --partitions 3 --replication-factor 1
$
$kafka-topics.sh --bootstrap-server localhost:9092 --list
__consumer_offsets
myFirstTopic

Step 2: Create a consumer for myFirstTopic.
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myFirstTopic

Step 3: Run App.java. You can see the following messages in the console.
*************************************************
Sent <key-0,Message : 0>
Partition : 1
*************************************************

*************************************************
Sent <key-1,Message : 1>
Partition : 0
*************************************************

*************************************************
Sent <key-2,Message : 2>
Partition : 2
*************************************************

*************************************************
Sent <key-0,Message : 3>
Partition : 1
*************************************************

*************************************************
Sent <key-1,Message : 4>
Partition : 0
*************************************************

*************************************************
Sent <key-2,Message : 5>
Partition : 2
*************************************************

*************************************************
Sent <key-0,Message : 6>
Partition : 1
*************************************************

*************************************************
Sent <key-1,Message : 7>
Partition : 0
*************************************************

*************************************************
Sent <key-2,Message : 8>
Partition : 2
*************************************************

*************************************************
Sent <key-0,Message : 9>
Partition : 1
*************************************************

As the console output shows:
a. Messages with key-0 went to partition 1
b. Messages with key-1 went to partition 0
c. Messages with key-2 went to partition 2
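
The same observation can be checked mechanically rather than by eye. The sketch below is one possible way to do it: it takes the (key, partition) pairs copied from the console output above and groups the partitions seen for each key. The class name KeyPartitionCheck and the method partitionsByKey are hypothetical names introduced for this illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class KeyPartitionCheck {

  // Collects, for each key, the set of partitions on which it was observed
  static Map<String, Set<Integer>> partitionsByKey(String[][] observed) {
    Map<String, Set<Integer>> result = new LinkedHashMap<>();
    for (String[] pair : observed) {
      result.computeIfAbsent(pair[0], k -> new TreeSet<>()).add(Integer.parseInt(pair[1]));
    }
    return result;
  }

  public static void main(String[] args) {
    // (key, partition) pairs in the order they appear in the console output
    String[][] observed = {
      { "key-0", "1" }, { "key-1", "0" }, { "key-2", "2" }, { "key-0", "1" }, { "key-1", "0" },
      { "key-2", "2" }, { "key-0", "1" }, { "key-1", "0" }, { "key-2", "2" }, { "key-0", "1" }
    };

    // A key is consistent when it was seen on exactly one partition
    partitionsByKey(observed).forEach((key, partitions) ->
        System.out.println(key + " -> " + partitions
            + (partitions.size() == 1 ? " (consistent)" : " (spread across partitions)")));
  }
}
```

For the output above, every key maps to a single partition, so each line ends with "(consistent)".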


