Thursday 14 November 2019

Java: Kafka: Read data from specific partition

Using the Consumer's assign method, you can read data from a specific partition.

For example, here is how to read from partition 1 of a given topic:
TopicPartition topicPartition = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(topicPartition));

Find the below working application.

JavaKafkaProducer.java
package com.sample.app;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class JavaKafkaProducer {

 /**
  * Publishes one message per second to the given topic until the thread is
  * interrupted.
  *
  * <p>Keys cycle through {@code key-0}, {@code key-1}, {@code key-2}, so the
  * messages are spread across (at most) three partitions by the default
  * key-hash partitioner.
  *
  * @param bootstrapServer Kafka bootstrap server, e.g. {@code "127.0.0.1:9092"}
  * @param topicName       topic to publish to
  * @throws InterruptedException if the thread is interrupted while sleeping
  *                              between sends
  */
 public void start(String bootstrapServer, String topicName) throws InterruptedException {
  Properties properties = new Properties();

  // Populate producer configurations
  properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
  properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  properties.put(ProducerConfig.ACKS_CONFIG, "1");

  // Initialize KafkaProducer with configurations. The try-with-resources
  // statement closes the producer (which flushes pending sends) when the
  // loop exits via interruption.
  try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
   int i = 0;
   while (true) {
    String key = "key-" + (i % 3);
    String message = "Message : " + i;

    ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, message);

    // send() is asynchronous; without a callback, broker-side failures
    // would be silently dropped. Log them instead.
    producer.send(record, (metadata, exception) -> {
     if (exception != null) {
      exception.printStackTrace();
     }
    });
    TimeUnit.SECONDS.sleep(1);
    i++;
   }
  }
 }

}

JavaKafkaConsumer.java
package com.sample.app;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class JavaKafkaConsumer {

 /**
  * Consumes messages from partition 1 of the given topic and prints each
  * record to stdout. Kept for backward compatibility; delegates to
  * {@link #start(String, String, String, int)}.
  *
  * @param bootstrapServer Kafka bootstrap server, e.g. {@code "127.0.0.1:9092"}
  * @param consumerGroup   consumer group id
  * @param topic           topic to read from
  */
 public void start(String bootstrapServer, String consumerGroup, String topic) {
  start(bootstrapServer, consumerGroup, topic, 1);
 }

 /**
  * Consumes messages from the given partition of the given topic, printing
  * each record's key, value, offset and partition. Runs until the thread is
  * stopped externally.
  *
  * @param bootstrapServer Kafka bootstrap server
  * @param consumerGroup   consumer group id
  * @param topic           topic to read from
  * @param partition       partition number to assign to this consumer
  */
 public void start(String bootstrapServer, String consumerGroup, String topic, int partition) {
  Properties properties = new Properties();

  // Populate consumer configurations
  properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
  properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
  properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

  // Instantiate consumer; try-with-resources guarantees it is closed.
  try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {

   // assign() (rather than subscribe()) pins the consumer to exactly one
   // partition, bypassing group rebalancing.
   TopicPartition topicPartition = new TopicPartition(topic, partition);
   consumer.assign(Arrays.asList(topicPartition));

   while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));

    for (ConsumerRecord<String, String> record : records) {
     System.out.println("\n\nConsumer received : ");
     System.out.println("----------------------");
     System.out.println("key : " + record.key());
     System.out.println("value : " + record.value());
     System.out.println("offset : " + record.offset());
     System.out.println("partition : " + record.partition());
    }
   }

  }

 }
}

App.java
package com.sample.app;

import java.util.concurrent.ExecutionException;

public class App {
 /**
  * Starts a consumer thread (pinned to partition 1 of the topic) and a
  * producer thread that publishes one message per second.
  */
 public static void main(String args[]) throws InterruptedException, ExecutionException {
  JavaKafkaProducer producer = new JavaKafkaProducer();
  JavaKafkaConsumer consumer = new JavaKafkaConsumer();

  String bootstrapServer = "127.0.0.1:9092";
  String consumerGroup = "myFirstConsumerGroup";
  String topic = "myFirstTopic";

  // Runnable lambdas instead of anonymous Thread subclasses.
  Thread t1 = new Thread(() -> consumer.start(bootstrapServer, consumerGroup, topic));

  Thread t2 = new Thread(() -> {
   try {
    producer.start(bootstrapServer, topic);
   } catch (InterruptedException e) {
    // Restore the interrupt flag so the thread terminates cleanly.
    Thread.currentThread().interrupt();
   }
  });

  t1.start();
  t2.start();

 }
}

Create a topic with 4 partitions.

kafka-topics.sh --bootstrap-server localhost:9092 --topic myFirstTopic --create --partitions 4 --replication-factor 1

Run App.java, you can see below messages in console.
Consumer received : 
----------------------
key : key-0
value : Message : 0
offset : 0
partition : 1


Consumer received : 
----------------------
key : key-0
value : Message : 3
offset : 1
partition : 1


Consumer received : 
----------------------
key : key-0
value : Message : 6
offset : 2
partition : 1


Consumer received : 
----------------------
key : key-0
value : Message : 9
offset : 3
partition : 1

As you can observe from the messages, the consumer read the messages only from partition 1.

Previous                                                    Next                                                    Home

No comments:

Post a Comment