You can
add a key to a message. Messages with same key always go to same partition
(until you change number of partitions).
Example
ProducerRecord<String,
String> record = new ProducerRecord<>(topicName, key, message);
Find the
below working application.
package com.sample.app;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
public class App {
public static void main(String args[]) throws InterruptedException, ExecutionException {
Properties properties = new Properties();
// Populate producer configurations
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.ACKS_CONFIG, "1");
String topicName = "myFirstTopic";
// Initialize KafkaProducer with configurations
try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
for (int i = 0; i < 10; i++) {
String key = "key-" + (i % 3);
String message = "Message : " + i;
ProducerRecord<String, String> record = new ProducerRecord<>(topicName, key, message);
// Do not use get(), it is synchronous leads to performance issues in production
RecordMetadata recordMetadata = producer.send(record).get();
System.out.println("*************************************************");
System.out.println("Sent <" + key + "," + message + ">");
//System.out.println("Topic : " + recordMetadata.topic());
System.out.println("Partition : " + recordMetadata.partition());
/*System.out.println("Offset : " + recordMetadata.offset());
System.out.println("Key Size : " + recordMetadata.serializedKeySize());
System.out.println("Value Size : " + recordMetadata.serializedValueSize());
System.out.println("Time Stamp : " + recordMetadata.timestamp());*/
System.out.println("*************************************************\n");
}
// producer.flush(): Since this operation is asynchronous, you need to either
// flush or close the producer to publish the data. try-with-resource statement
// automatically close the prodcuer
}
}
}
How to
run the application?
Step 1:
Create myFirstTopic
kafka-topics.sh
--bootstrap-server localhost:9092 --topic myFirstTopic --create --partitions 3
--replication-factor 1
$kafka-topics.sh --bootstrap-server localhost:9092 --topic myFirstTopic --create --partitions 3 --replication-factor 1
$
$kafka-topics.sh --bootstrap-server localhost:9092 --list
__consumer_offsets
myFirstTopic
Step 2:
Create a consumer for
myFirstTopic.
kafka-console-consumer.sh
--bootstrap-server localhost:9092 --topic myFirstTopic
Run
App.java, you can see below messages in console.
************************************************* Sent <key-0,Message : 0> Partition : 1 ************************************************* ************************************************* Sent <key-1,Message : 1> Partition : 0 ************************************************* ************************************************* Sent <key-2,Message : 2> Partition : 2 ************************************************* ************************************************* Sent <key-0,Message : 3> Partition : 1 ************************************************* ************************************************* Sent <key-1,Message : 4> Partition : 0 ************************************************* ************************************************* Sent <key-2,Message : 5> Partition : 2 ************************************************* ************************************************* Sent <key-0,Message : 6> Partition : 1 ************************************************* ************************************************* Sent <key-1,Message : 7> Partition : 0 ************************************************* ************************************************* Sent <key-2,Message : 8> Partition : 2 ************************************************* ************************************************* Sent <key-0,Message : 9> Partition : 1 *************************************************
As you
observe console messages
a.
Messages
with key-0 went to partiton 1
b.
Messages
with key-1 went to partiton 0
c.
Messages
with key-2 went to partiton 2
No comments:
Post a Comment