Kafka supports a callback mechanism: you can be notified once a record is published to a Kafka topic. The callback receives the record's metadata on success, or an exception on failure.
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace();
        return;
    }
    System.out.println("Topic : " + metadata.topic());
    System.out.println("Partition : " + metadata.partition());
    System.out.println("Offset : " + metadata.offset());
    System.out.println("Key Size : " + metadata.serializedKeySize());
    System.out.println("Value Size : " + metadata.serializedValueSize());
    System.out.println("Time Stamp : " + metadata.timestamp());
});
Find the working application below.
App.java
package com.sample.app;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class App {

    public static void main(String[] args) {
        Properties properties = new Properties();

        // Populate producer configurations
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.ACKS_CONFIG, "1");

        // Initialize KafkaProducer with configurations
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            ProducerRecord<String, String> record = new ProducerRecord<>("myFirstTopic", "My First Message");

            // The callback runs once the send completes, with either metadata or an exception
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    exception.printStackTrace();
                    return;
                }
                System.out.println("Topic : " + metadata.topic());
                System.out.println("Partition : " + metadata.partition());
                System.out.println("Offset : " + metadata.offset());
                System.out.println("Key Size : " + metadata.serializedKeySize());
                System.out.println("Value Size : " + metadata.serializedValueSize());
                System.out.println("Time Stamp : " + metadata.timestamp());
            });

            // send() is asynchronous, so you need to either flush or close the
            // producer to publish the data. The try-with-resources statement
            // closes the producer automatically, so no explicit producer.flush()
            // is needed here.
        }
    }
}
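To see why the closing comment in App.java matters, here is a minimal analogy written with the standard library only. TinyAsyncSender and sendAll are hypothetical names and are not part of the Kafka client. The sketch assumes just two things it shares with the producer: send returns before the work is done, and close waits for pending work, so callbacks are not lost when the program exits.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

// Hypothetical stand-in for a producer; NOT part of the Kafka client library.
public class TinyAsyncSender implements AutoCloseable {

    // A single background thread plays the role of the producer's sender thread.
    private final ExecutorService sender = Executors.newSingleThreadExecutor();
    private long nextOffset = 0; // touched only by the sender thread

    // Returns immediately. Later, the callback receives (offset, null) on
    // success or (null, exception) on failure.
    public void send(String value, BiConsumer<Long, Exception> callback) {
        sender.execute(() -> {
            if (value == null) {
                callback.accept(null, new IllegalArgumentException("value is null"));
            } else {
                callback.accept(nextOffset++, null);
            }
        });
    }

    // Waits for queued sends to finish, so that every callback gets to run.
    @Override
    public void close() {
        sender.shutdown();
        try {
            sender.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Sends the given values and returns what the callbacks reported.
    public static List<String> sendAll(String... values) {
        List<String> log = new CopyOnWriteArrayList<>();
        try (TinyAsyncSender s = new TinyAsyncSender()) {
            for (String v : values) {
                s.send(v, (offset, error) -> log.add(
                        error != null ? "error: " + error.getMessage() : "offset: " + offset));
            }
        } // close() blocks here until the queued callbacks have run
        return log;
    }

    public static void main(String[] args) {
        sendAll("My First Message", null).forEach(System.out::println);
    }
}
```

If close() were skipped and the process exited right after send, the callback might never run. That is the situation the try-with-resources block in App.java avoids.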
How to run this application?
Step 1: Create myFirstTopic.
kafka-topics.sh --bootstrap-server localhost:9092 --topic myFirstTopic --create --partitions 3 --replication-factor 1
$kafka-topics.sh --bootstrap-server localhost:9092 --topic myFirstTopic --create --partitions 3 --replication-factor 1
$
$kafka-topics.sh --bootstrap-server localhost:9092 --list
__consumer_offsets
myFirstTopic
Step 2: Create a consumer for myFirstTopic.
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myFirstTopic
$kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myFirstTopic
My First Message
App.java console output

First Run
Topic : myFirstTopic
Partition : 0
Offset : 0
Key Size : -1
Value Size : 16
Time Stamp : 1570763932061

Second Run
Topic : myFirstTopic
Partition : 1
Offset : 0
Key Size : -1
Value Size : 16
Time Stamp : 1570764003664

Third Run
Topic : myFirstTopic
Partition : 1
Offset : 1
Key Size : -1
Value Size : 16
Time Stamp : 1570764022578
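The Time Stamp values are milliseconds since the Unix epoch, which is hard to read at a glance. The small sketch below converts them to UTC using only java.time. TimestampDemo and toUtc are hypothetical names chosen for illustration.

```java
import java.time.Instant;

public class TimestampDemo {

    // Converts a record timestamp (milliseconds since the Unix epoch)
    // to an ISO-8601 string in UTC.
    public static String toUtc(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).toString();
    }

    public static void main(String[] args) {
        // Timestamps copied from the first and second runs
        System.out.println(toUtc(1570763932061L)); // 2019-10-11T03:18:52.061Z
        System.out.println(toUtc(1570764003664L)); // 2019-10-11T03:20:03.664Z
    }
}
```

Inside the callback, the same conversion could be applied to metadata.timestamp() before printing.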
Since we did not specify a key, the key size is reported as -1. In my next post, I will explain how to send messages with keys.
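The sizes in the output can be checked by hand. The sketch below assumes the value is encoded as UTF-8 (the default for StringSerializer) and mirrors the -1 convention for a missing key. SizeDemo and serializedSize are hypothetical names; this is not the producer's own code.

```java
import java.nio.charset.StandardCharsets;

public class SizeDemo {

    // Mirrors the convention seen in the output: -1 when the key or value
    // is absent, otherwise the length of its UTF-8 encoding in bytes.
    public static int serializedSize(String s) {
        return s == null ? -1 : s.getBytes(StandardCharsets.UTF_8).length;
    }

    public static void main(String[] args) {
        System.out.println("Key Size : " + serializedSize(null));                 // -1
        System.out.println("Value Size : " + serializedSize("My First Message")); // 16
    }
}
```

Note that the size counts bytes, not characters, so a value containing non-ASCII characters would report a size larger than its length.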