Tuesday 12 November 2019

Java: Kafka producer call backs

Kafka support call back mechanism, where you can get notified once a record is published to Kafka topic.

Example
producer.send(record, (metadata, exception) -> {
 if(exception != null) {
  exception.printStackTrace();
  return;
 }
 
 System.out.println("Topic : " + metadata.topic());
 System.out.println("Partition : " + metadata.partition());
 System.out.println("Offset : " + metadata.offset());
 System.out.println("Key Size : " + metadata.serializedKeySize());
 System.out.println("Value Size : " + metadata.serializedValueSize());
 System.out.println("Time Stamp : " + metadata.timestamp());
 
});

Find the below working application.

App.java
package com.sample.app;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class App {
 public static void main(String args[]) {
  Properties properties = new Properties();

  // Populate producer configurations
  properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
  properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  properties.put(ProducerConfig.ACKS_CONFIG, "1");

  // Initialize KafkaProducer with configurations
  try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
   ProducerRecord<String, String> record = new ProducerRecord<>("myFirstTopic", "My First Message");

   producer.send(record, (metadata, exception) -> {
    if(exception != null) {
     exception.printStackTrace();
     return;
    }
    
    System.out.println("Topic : " + metadata.topic());
    System.out.println("Partition : " + metadata.partition());
    System.out.println("Offset : " + metadata.offset());
    System.out.println("Key Size : " + metadata.serializedKeySize());
    System.out.println("Value Size : " + metadata.serializedValueSize());
    System.out.println("Time Stamp : " + metadata.timestamp());
    
   });

   // producer.flush(): Since this operation is asynchronous, you need to either
   // flush or close the producer to publish the data. try-with-resource statement
   // automatically close the prodcuer

  }

 }
}

How to run this application?

Step 1: Create myFirstTopic
kafka-topics.sh --bootstrap-server localhost:9092 --topic myFirstTopic --create --partitions 3 --replication-factor 1

$kafka-topics.sh --bootstrap-server localhost:9092 --topic myFirstTopic --create --partitions 3 --replication-factor 1
$
$kafka-topics.sh --bootstrap-server localhost:9092 --list
__consumer_offsets
myFirstTopic

Step 2: Create a consumer for myFirstTopic.
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myFirstTopic

Run App.java, you can see the published message in consumer console.    
$kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myFirstTopic
My First Message

App.java console output
First Run
Topic : myFirstTopic
Partition : 0
Offset : 0
Key Size : -1
Value Size : 16
Time Stamp : 1570763932061

Second Run
Topic : myFirstTopic
Partition : 1
Offset : 0
Key Size : -1
Value Size : 16
Time Stamp : 1570764003664

Third Run
Topic : myFirstTopic
Partition : 1
Offset : 1
Key Size : -1
Value Size : 16
Time Stamp : 1570764022578

Since we do not specify any key here, size of the key is -1. In my next post, I will explain how to send message with keys.


Previous                                                    Next                                                    Home

No comments:

Post a Comment