In this post, I will explain how to:
a. Create a new topic
b. Publish a message to the topic
c. Read messages from the topic
Setup
Go through the below post to install and set up Kafka.
https://self-learning-java-tutorial.blogspot.com/2019/10/install-and-setup-kafka-in-mac.html
Define KafkaTopicsConfig to add new topics
@Configuration
public class KafkaTopicsConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String kafkaBootstrapServers;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        return new KafkaAdmin(configs);
    }

    @Bean
    public NewTopic topic1() {
        return new NewTopic(AppConstants.NEW_TOPIC_NAME, 1, (short) 1);
    }
}
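The KafkaAdmin bean automatically registers any NewTopic beans it finds in the application context with the broker. As a sketch of an alternative (assuming Spring Kafka 2.6 or later, which provides org.springframework.kafka.config.TopicBuilder), the same topic can also be declared with the TopicBuilder fluent API:

@Bean
public NewTopic topic1() {
    // Sketch: equivalent topic declaration via TopicBuilder
    // (requires import org.springframework.kafka.config.TopicBuilder)
    return TopicBuilder.name(AppConstants.NEW_TOPIC_NAME)
            .partitions(1)
            .replicas(1)
            .build();
}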
Define KafkaProducerConfig
@Configuration
public class KafkaProducerConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String kafkaBootstrapServers;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
To send messages, we must first configure a ProducerFactory, which determines how Kafka Producer instances are created. We then need a KafkaTemplate, which wraps a Producer instance and provides convenience methods for sending messages to Kafka topics.
Kafka Producer instances are thread safe, so sharing a single instance throughout an application context gives better performance. KafkaTemplate instances are also thread safe, and using a single instance is recommended.
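Sending through a KafkaTemplate is asynchronous. As an illustrative sketch (assuming Spring Kafka 2.x, where send returns a ListenableFuture; the method name here is made up for the example), the result of a send can be observed with a callback:

public void sendWithCallback(KafkaTemplate<String, String> template, String message) {
    // send() returns immediately; the future completes when the broker
    // acknowledges (or the send fails)
    ListenableFuture<SendResult<String, String>> future =
            template.send(AppConstants.NEW_TOPIC_NAME, message);
    future.addCallback(
            result -> System.out.println("Sent to partition "
                    + result.getRecordMetadata().partition()),
            ex -> System.err.println("Send failed: " + ex.getMessage()));
}

This needs imports for org.springframework.util.concurrent.ListenableFuture and org.springframework.kafka.support.SendResult.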
Define KafkaConsumerConfig
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String kafkaBootstrapServers;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, AppConstants.NEW_GROUP_NAME);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @KafkaListener(topics = AppConstants.NEW_TOPIC_NAME, groupId = AppConstants.NEW_GROUP_NAME)
    public void listenOnTopic(String message) {
        System.out.println("Received Message : " + message);
    }
}
To receive messages, we need to configure a ConsumerFactory and a KafkaListenerContainerFactory. Once these beans are available in the Spring bean factory, POJO-based consumers can be configured using the @KafkaListener annotation.
The @EnableKafka annotation must be added to the configuration class so that Spring detects the @KafkaListener annotation on Spring-managed beans.
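When a listener needs message metadata such as the partition or offset, it can receive the full ConsumerRecord instead of just the payload. A sketch of such an alternative listener (the method name is illustrative; it would replace, not join, the listener above, since both share the same group):

// Sketch: receiving the full record to access metadata
// (requires import org.apache.kafka.clients.consumer.ConsumerRecord)
@KafkaListener(topics = AppConstants.NEW_TOPIC_NAME, groupId = AppConstants.NEW_GROUP_NAME)
public void listenWithMetadata(ConsumerRecord<String, String> record) {
    System.out.println("partition=" + record.partition()
            + ", offset=" + record.offset()
            + ", value=" + record.value());
}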
The complete working application follows.
Step 1: Create new maven project ‘spring-kafka-hello-world’.
Step 2: Update pom.xml with maven dependencies.
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>com.sample.app</groupId>
    <artifactId>spring-kafka-hello-world</artifactId>
    <version>1.0.0</version>

    <properties>
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.source>1.8</maven.compiler.source>
        <kafka.client.version>3.3.2</kafka.client.version>
        <gson.version>2.10.1</gson.version>
        <slf4j.version>2.0.6</slf4j.version>
    </properties>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.5</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springdoc</groupId>
            <artifactId>springdoc-openapi-ui</artifactId>
            <version>1.6.6</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
Step 3: Define JsonUtil class.
JsonUtil.java
package com.sample.app.util;

import java.io.IOException;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonUtil {

    // ObjectMapper is thread safe once configured, so a single shared
    // instance avoids creating a new mapper on every call
    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static String marshal(Object obj) throws JsonProcessingException {
        return MAPPER.writeValueAsString(obj);
    }

    public static <T> T unmarshal(Class<T> clazz, String json)
            throws JsonParseException, JsonMappingException, IOException {
        return MAPPER.readValue(json, clazz);
    }
}
Step 4: Define AppConstants class.
AppConstants.java
package com.sample.app.constants;

public class AppConstants {
    public static final String NEW_TOPIC_NAME = "myTopic1";
    public static final String NEW_GROUP_NAME = "myGroup1";
}
Step 5: Define KafkaTopicsConfig, KafkaProducerConfig and KafkaConsumerConfig classes.
KafkaTopicsConfig.java
package com.sample.app.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;

import com.sample.app.constants.AppConstants;

@Configuration
public class KafkaTopicsConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String kafkaBootstrapServers;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        return new KafkaAdmin(configs);
    }

    @Bean
    public NewTopic topic1() {
        return new NewTopic(AppConstants.NEW_TOPIC_NAME, 1, (short) 1);
    }
}
KafkaProducerConfig.java
package com.sample.app.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String kafkaBootstrapServers;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
KafkaConsumerConfig.java
package com.sample.app.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import com.sample.app.constants.AppConstants;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String kafkaBootstrapServers;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, AppConstants.NEW_GROUP_NAME);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @KafkaListener(topics = AppConstants.NEW_TOPIC_NAME, groupId = AppConstants.NEW_GROUP_NAME)
    public void listenOnTopic(String message) {
        System.out.println("Received Message : " + message);
    }
}
Step 6: Define PublisherController class.
PublisherController.java
package com.sample.app.controller;

import java.util.HashMap;
import java.util.Map;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.sample.app.constants.AppConstants;
import com.sample.app.util.JsonUtil;

import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.parameters.RequestBody;

@RestController
@RequestMapping("api/v1/publisher")
public class PublisherController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @PostMapping("/publish-message")
    public ResponseEntity<Void> sendMessage(
            @RequestBody(description = "Publish new message", required = true, content = @Content(schema = @Schema(implementation = HashMap.class))) @org.springframework.web.bind.annotation.RequestBody final Map<String, Object> payload)
            throws JsonProcessingException {
        String json = JsonUtil.marshal(payload);

        // KafkaTemplate.send is already asynchronous: it returns immediately
        // without waiting for the broker acknowledgement, so there is no need
        // to spawn a separate thread here
        kafkaTemplate.send(AppConstants.NEW_TOPIC_NAME, json);
        return ResponseEntity.ok().build();
    }
}
Step 7: Define main application class.
App.java
package com.sample.app;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class App {
    public static void main(String[] args) {
        SpringApplication.run(App.class, args);
    }
}
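The configuration classes above read the spring.kafka.bootstrap-servers property, so it must be supplied, for example in src/main/resources/application.properties. Assuming a local broker on Kafka's default port:

spring.kafka.bootstrap-servers=localhost:9092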
The overall project structure looks like below.
Navigate to the folder where pom.xml is located and execute the command ‘mvn clean install’.
$ls ./target
classes
generated-sources
generated-test-sources
maven-archiver
maven-status
spring-kafka-hello-world-1.0.0.jar
spring-kafka-hello-world-1.0.0.jar.original
test-classes
Execute the below command to run the application.
java -jar ./target/spring-kafka-hello-world-1.0.0.jar
Open the url ‘http://localhost:8080/swagger-ui/index.html’ in a browser.
Experiment with the API /publish-message using the below payload.
{"a" : 1, "b" : 2, "c" : 3}
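Instead of the Swagger UI, the endpoint can also be exercised from the command line (assuming the application is running on localhost:8080):

curl -X POST "http://localhost:8080/api/v1/publisher/publish-message" \
  -H "Content-Type: application/json" \
  -d '{"a" : 1, "b" : 2, "c" : 3}'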
You will see the same message logged to the console.
Received Message : {"a":1,"b":2,"c":3}
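The published message can also be verified with the console consumer that ships with Kafka (the script's location depends on your installation):

kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic myTopic1 --from-beginning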
You can download this application from this link.