Using the consumer's assign method, you can read data from a specific partition. For example, to read from partition 1 of a given topic:

TopicPartition topicPartition = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(topicPartition));
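assign also accepts more than one partition, and you can seek before polling. A small sketch using the same KafkaConsumer API (partition numbers here are just examples):

```java
// Assign partitions 1 and 3 of the topic explicitly.
// Note: assign() bypasses consumer-group rebalancing entirely.
TopicPartition p1 = new TopicPartition(topic, 1);
TopicPartition p3 = new TopicPartition(topic, 3);
consumer.assign(Arrays.asList(p1, p3));

// Optionally rewind all assigned partitions to the earliest available offset
consumer.seekToBeginning(consumer.assignment());
```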
Find the complete working application below.

JavaKafkaProducer.java
package com.sample.app;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class JavaKafkaProducer {

	public void start(String bootstrapServer, String topicName) throws InterruptedException {
		Properties properties = new Properties();

		// Populate producer configurations
		properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
		properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
		properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
		properties.put(ProducerConfig.ACKS_CONFIG, "1");

		// Initialize KafkaProducer with the configurations. send() is asynchronous,
		// so the producer must be flushed or closed for the data to be published;
		// the try-with-resources statement closes (and thereby flushes) the producer.
		try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
			int i = 0;
			while (true) {
				String key = "key-" + (i % 3);
				String message = "Message : " + i;
				ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, message);
				producer.send(record);
				TimeUnit.SECONDS.sleep(1);
				i++;
			}
		}
	}
}
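If you want to confirm which partition each record actually went to on the producer side, send accepts a Callback that receives the RecordMetadata. A sketch (drop-in replacement for the plain producer.send(record) call above):

```java
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        // The send failed; in a real application handle or retry as appropriate
        exception.printStackTrace();
    } else {
        // Metadata reports the partition and offset the broker assigned
        System.out.println("Sent to partition " + metadata.partition()
                + " at offset " + metadata.offset());
    }
});
```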
JavaKafkaConsumer.java
package com.sample.app;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class JavaKafkaConsumer {

	public void start(String bootstrapServer, String consumerGroup, String topic) {
		Properties properties = new Properties();

		// Populate consumer configurations
		properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
		properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
		properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

		// Instantiate the consumer and assign it partition 1 of the topic
		try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
			TopicPartition topicPartition = new TopicPartition(topic, 1);
			consumer.assign(Arrays.asList(topicPartition));

			while (true) {
				ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));

				for (ConsumerRecord<String, String> record : records) {
					String key = record.key();
					String value = record.value();
					long offset = record.offset();
					int partition = record.partition();

					System.out.println("\n\nConsumer received : ");
					System.out.println("----------------------");
					System.out.println("key : " + key);
					System.out.println("value : " + value);
					System.out.println("offset : " + offset);
					System.out.println("partition : " + partition);
				}
			}
		}
	}
}
App.java
package com.sample.app;

public class App {

	public static void main(String[] args) {
		JavaKafkaProducer producer = new JavaKafkaProducer();
		JavaKafkaConsumer consumer = new JavaKafkaConsumer();

		String bootstrapServer = "127.0.0.1:9092";
		String consumerGroup = "myFirstConsumerGroup";
		String topic = "myFirstTopic";

		// Run the consumer and producer concurrently
		Thread t1 = new Thread() {
			public void run() {
				consumer.start(bootstrapServer, consumerGroup, topic);
			}
		};

		Thread t2 = new Thread() {
			public void run() {
				try {
					producer.start(bootstrapServer, topic);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		};

		t1.start();
		t2.start();
	}
}
Create a topic with 4 partitions.

kafka-topics.sh --bootstrap-server localhost:9092 --topic myFirstTopic --create --partitions 4 --replication-factor 1
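You can confirm the topic was created with the expected partition count using the describe option of the same tool:

```shell
kafka-topics.sh --bootstrap-server localhost:9092 --topic myFirstTopic --describe
```

The output lists one line per partition, including its leader and replicas.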
Run App.java; you should see messages like the below in the console.
Consumer received : 
----------------------
key : key-0
value : Message : 0
offset : 0
partition : 1

Consumer received : 
----------------------
key : key-0
value : Message : 3
offset : 1
partition : 1

Consumer received : 
----------------------
key : key-0
value : Message : 6
offset : 2
partition : 1

Consumer received : 
----------------------
key : key-0
value : Message : 9
offset : 3
partition : 1
As you can observe, the consumer reads messages only from partition 1. Because the default partitioner maps each key to a fixed partition, every message with key key-0 lands on the same partition.
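Why do all the key-0 messages end up on the same partition? Kafka's default partitioner hashes the serialized key with murmur2, makes the result non-negative, and takes it modulo the partition count (the real implementation lives in Kafka's org.apache.kafka.common.utils.Utils). Below is a self-contained sketch re-implementing that logic for illustration only; it assumes the standard murmur2 variant Kafka uses, so treat the printed partition numbers as approximate rather than authoritative:

```java
import java.nio.charset.StandardCharsets;

public class DefaultPartitionerSketch {

    // Sketch of Kafka's 32-bit murmur2 hash (seed 0x9747b28c)
    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;

        int h = seed ^ length;
        int length4 = length / 4;

        // Mix the input four bytes at a time
        for (int i = 0; i < length4; i++) {
            final int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }

        // Handle the remaining 1-3 bytes (fall-through is intentional)
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }

        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // partition = non-negative(murmur2(keyBytes)) % numPartitions
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        for (String key : new String[] { "key-0", "key-1", "key-2" }) {
            System.out.println(key + " -> partition " + partitionFor(key, 4));
        }
    }
}
```

The key point is that the mapping is deterministic: the same key always hashes to the same partition, which is why the consumer assigned to partition 1 above sees every key-0 message and none of the others.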