Tuesday, 12 November 2019

Java: Send messages to a Kafka topic


Step 1: Initialize the Kafka producer properties.
Properties properties = new Properties();

// Populate producer configurations
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.ACKS_CONFIG, "1");
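
The ProducerConfig constants are plain strings, so the same four settings can also be written with literal keys, which is handy when the values come from a .properties file. The following is a minimal sketch under that assumption; the class name ProducerProps and the method build are hypothetical and not part of the Kafka API.

```java
import java.util.Properties;

public class ProducerProps {
    // Builds the same four settings as Step 1, using the literal keys that
    // the ProducerConfig constants stand for.
    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // "1" means the partition leader alone acknowledges each write.
        props.setProperty("acks", "1");
        return props;
    }

    public static void main(String[] args) {
        // Print every configured key/value pair for a quick sanity check.
        build("127.0.0.1:9092").forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The returned Properties object can be passed to the KafkaProducer constructor in the same way as the one built with the constants.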

Step 2: Initialize KafkaProducer with the properties.
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

This defines a Kafka producer whose key and value are both of type String.

Step 3: Create a record to send to the Kafka topic.
ProducerRecord<String, String> record = new ProducerRecord<>("myFirstTopic", "My First Message");

Step 4: Send the record to the Kafka topic.
producer.send(record);

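Step 5: Because send() is asynchronous, call flush() to push any buffered records to the broker, then close() to release the producer. close() also waits for pending sends to complete, and the producer cannot be used after it is closed.
producer.flush();
producer.close();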
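
Because send() returns before the broker has acknowledged the record, a failure can go unnoticed. One way to surface it is to pass a callback to send(). The sketch below assumes the kafka-clients library is on the classpath; the class name SendWithCallback and the method sendAndFlush are hypothetical helpers, not part of the Kafka API.

```java
import java.util.concurrent.atomic.AtomicReference;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SendWithCallback {
    // Sends one message, waits for outstanding sends with flush(), and returns
    // the error reported to the callback, or null if none was reported.
    static Exception sendAndFlush(Producer<String, String> producer, String topic, String value) {
        AtomicReference<Exception> failure = new AtomicReference<>();
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, value);

        // The callback runs once the send has completed or failed.
        producer.send(record, (RecordMetadata metadata, Exception exception) -> {
            if (exception != null) {
                failure.set(exception);
            } else {
                System.out.println("Sent to " + metadata.topic() + "-" + metadata.partition()
                        + " at offset " + metadata.offset());
            }
        });

        // Block until every buffered record has been sent.
        producer.flush();
        return failure.get();
    }
}
```

The helper accepts the Producer interface, so it should work with the KafkaProducer from Step 2 and, for tests that need no broker, with the MockProducer class that ships with kafka-clients. The caller remains responsible for closing the producer.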

The complete working application is below.


App.java
package com.sample.app;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class App {
 public static void main(String[] args) {
  Properties properties = new Properties();

  // Populate producer configurations
  properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
  properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  properties.put(ProducerConfig.ACKS_CONFIG, "1");

  // Initialize KafkaProducer with configurations
  try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
   ProducerRecord<String, String> record = new ProducerRecord<>("myFirstTopic", "My First Message");

   producer.send(record);

   // send() is asynchronous, so the record must be flushed before the program
   // exits. No explicit producer.flush() is needed here: the try-with-resources
   // statement closes the producer, and close() waits for pending sends.

  }

 }
}

How to run the application?

Step 1: Create myFirstTopic

kafka-topics.sh --bootstrap-server localhost:9092 --topic myFirstTopic --create --partitions 3 --replication-factor 1
$kafka-topics.sh --bootstrap-server localhost:9092 --topic myFirstTopic --create --partitions 3 --replication-factor 1
$
$kafka-topics.sh --bootstrap-server localhost:9092 --list
__consumer_offsets
myFirstTopic

Step 2: Start a console consumer for myFirstTopic.
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myFirstTopic


Step 3: Run App.java. The published message appears in the consumer console.
$kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myFirstTopic
My First Message


