The following procedure explains how to create a consumer and consume messages from a topic.

Step 1: Initialize the properties that are used to instantiate the consumer.
Properties properties = new Properties();
// Populate consumer configurations
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
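Beyond these required settings, the consumer accepts many optional configurations. A minimal sketch of two commonly tuned ones; the values shown here are illustrative, not recommendations:

// Optional: disable auto-commit so offsets can be committed manually after processing
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// Optional: cap how many records a single poll() call can return
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");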
Step 2: Instantiate the Kafka consumer.

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
Step 3: Subscribe the consumer to the topics.
consumer.subscribe(topics);
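In recent client versions, subscribe is also overloaded to accept a regular expression, which is handy when topic names follow a convention. A minimal sketch; the pattern here is illustrative:

// Subscribe to every topic whose name starts with "myFirst"
consumer.subscribe(Pattern.compile("myFirst.*"));

This variant needs an import of java.util.regex.Pattern.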
Step 4: Poll records from the topic.

The snippet below polls the topic in an infinite loop; each poll call waits up to 200 milliseconds for new records.

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
    // process the records here
}
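Inside the loop you iterate over the returned batch. If auto-commit is disabled (the optional enable.auto.commit setting mentioned in Step 1), you can commit the offsets yourself once the batch is processed. A minimal sketch of such a loop body:

for (ConsumerRecord<String, String> record : records) {
    System.out.println(record.key() + " : " + record.value());
}
// Only needed when enable.auto.commit is set to false
consumer.commitSync();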
The complete working application is given below.

JavaKafkaProducer.java
package com.sample.app;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
public class JavaKafkaProducer {

    public void start(String bootstrapServer, String topicName) throws InterruptedException {
        Properties properties = new Properties();

        // Populate producer configurations
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.ACKS_CONFIG, "1");

        // Initialize KafkaProducer with the configurations. Since send() is
        // asynchronous, the producer must be flushed or closed before the data is
        // guaranteed to be published; the try-with-resources statement closes the
        // producer automatically.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            int i = 0;
            while (true) {
                String key = "key-" + (i % 3);
                String message = "Message : " + i;
                ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, message);
                producer.send(record);

                TimeUnit.SECONDS.sleep(5);
                i++;
            }
        }
    }
}
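Since send() is asynchronous, it also accepts a callback that is invoked once the broker acknowledges (or rejects) the record. A minimal sketch, reusing the record from the loop above:

producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace();
    } else {
        System.out.println("Published to partition " + metadata.partition() + " at offset " + metadata.offset());
    }
});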
JavaKafkaConsumer.java
package com.sample.app;
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
public class JavaKafkaConsumer {

    public void start(String bootstrapServer, String consumerGroup, List<String> topics) {
        Properties properties = new Properties();

        // Populate consumer configurations
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Instantiate the consumer
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            // Subscribe to the topics
            consumer.subscribe(topics);

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));

                for (ConsumerRecord<String, String> record : records) {
                    String key = record.key();
                    String value = record.value();
                    long offset = record.offset();

                    System.out.println("\n\nConsumer received : ");
                    System.out.println("----------------------");
                    System.out.println("key : " + key);
                    System.out.println("value : " + value);
                    System.out.println("offset : " + offset);
                }
            }
        }
    }
}
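As written, the poll loop runs forever. A common way to stop such a consumer cleanly is consumer.wakeup(): calling it from another thread (for example, in a shutdown hook) makes the blocked poll() throw a WakeupException, which can be used to break out of the loop. A minimal sketch, assuming the same consumer and loop as above:

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
        // process the records ...
    }
} catch (WakeupException e) {
    // Expected on shutdown; triggered by consumer.wakeup() from another thread
} finally {
    consumer.close();
}

WakeupException lives in the org.apache.kafka.common.errors package.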
App.java
package com.sample.app;
import java.util.Arrays;
public class App {

    public static void main(String[] args) throws InterruptedException {
        JavaKafkaProducer producer = new JavaKafkaProducer();
        JavaKafkaConsumer consumer = new JavaKafkaConsumer();

        String bootstrapServer = "127.0.0.1:9092";
        String consumerGroup = "myFirstConsumerGroup";
        String topic = "myFirstTopic";

        // Run the consumer on one thread
        Thread t1 = new Thread() {
            public void run() {
                consumer.start(bootstrapServer, consumerGroup, Arrays.asList(topic));
            }
        };

        // Run the producer on another thread
        Thread t2 = new Thread() {
            public void run() {
                try {
                    producer.start(bootstrapServer, topic);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        t1.start();
        t2.start();
    }
}
Run App.java and you should see messages like the following in the console.

Consumer received :
----------------------
key : key-0
value : Message : 0
offset : 1


Consumer received :
----------------------
key : key-0
value : Message : 0
offset : 2


Consumer received :
----------------------
key : key-1
value : Message : 1
offset : 3