Thursday 14 November 2019

Java: Kafka: Create a consumer: Consume messages from topic


Below procedure explains how to create a consumer and consume messages from a topic.

Step 1: Initialize properties that are used to instantiate consumer.
Properties properties = new Properties();

// Populate consumer configurations
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstratServer);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

Step 2: Instantiate kafka consumer.
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)

Step 3: Subscribe the consumer to topics.
consumer.subscribe(topics);

Step 4: Poll the records from topic.


Below snippet poll the records from topic for every 200 milliseconds.
while(true){
 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
 ..........
}

Find the below working application.

JavaKafkaProducer.java
package com.sample.app;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class JavaKafkaProducer {

 public void start(String bootstratServer, String topicName) throws InterruptedException {
  Properties properties = new Properties();

  // Populate producer configurations
  properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstratServer);
  properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  properties.put(ProducerConfig.ACKS_CONFIG, "1");

  // Initialize KafkaProducer with configurations
  try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
   int i = 0;
   while (true) {
    String key = "key-" + (i % 3);
    String message = "Message : " + i;

    ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, message);

    producer.send(record);
    TimeUnit.SECONDS.sleep(5);
    i++;

   }

   // producer.flush(): Since this operation is asynchronous, you need to either
   // flush or close the producer to publish the data. try-with-resource statement
   // automatically close the prodcuer
  }
 }

}


JavaKafkaConsumer.java
package com.sample.app;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class JavaKafkaConsumer {

 public void start(String bootstratServer, String consumerGroup, List<String> topics) {
  Properties properties = new Properties();

  // Populate consumer configurations
  properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstratServer);
  properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
  properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

  // Instantiate consumer
  try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
   // Subscribe to the topics
   consumer.subscribe(topics);

   while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));

    for (ConsumerRecord<String, String> record : records) {
     String key = record.key();
     String value = record.value();
     long offset = record.offset();

     System.out.println("\n\nConsumer received : ");
     System.out.println("----------------------");
     System.out.println("key : " + key);
     System.out.println("value : " + value);
     System.out.println("offset : " + offset);

    }
   }

  }

 }
}


App.java
package com.sample.app;

import java.util.Arrays;
import java.util.concurrent.ExecutionException;

public class App {
 public static void main(String args[]) throws InterruptedException, ExecutionException {
  JavaKafkaProducer producer = new JavaKafkaProducer();
  JavaKafkaConsumer consumer = new JavaKafkaConsumer();

  String bootstratServer = "127.0.0.1:9092";
  String consumerGroup = "myFirstConsumerGroup";
  String topic = "myFirstTopic";

  Thread t1 = new Thread() {
   public void run() {
    consumer.start(bootstratServer, consumerGroup, Arrays.asList(topic));
   }
  };

  Thread t2 = new Thread() {
   public void run() {
    try {
     producer.start(bootstratServer, topic);
    } catch (InterruptedException e) {
     // TODO Auto-generated catch block
     e.printStackTrace();
    }
   }
  };

  t1.start();
  t2.start();

 }
}


Run App.java, you can see below messages in console.
Consumer received : 
----------------------
key : key-0
value : Message : 0
offset : 1


Consumer received : 
----------------------
key : key-0
value : Message : 0
offset : 2


Consumer received : 
----------------------
key : key-1
value : Message : 1
offset : 3


Previous                                                    Next                                                    Home

No comments:

Post a Comment