Monday, 6 February 2023

Spring Kafka: Hello world application

In this post, I am going to explain how to

a.   Create a new topic

b.   Submit a message to the topic

c.    Read the message from a topic

 

Setup

Go through the below post and set up Kafka.

https://self-learning-java-tutorial.blogspot.com/2019/10/install-and-setup-kafka-in-mac.html

 

 

Define KafkaTopicsConfig to add new topics

@Configuration
public class KafkaTopicsConfig {

	@Value(value = "${spring.kafka.bootstrap-servers}")
	private String kafkaBootstrapServers;

	@Bean
	public KafkaAdmin kafkaAdmin() {
		Map<String, Object> configs = new HashMap<>();
		configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
		return new KafkaAdmin(configs);
	}

	@Bean
	public NewTopic topic1() {
		return new NewTopic(AppConstants.NEW_TOPIC_NAME, 1, (short)1);
	}
}

 

Define KafkaProducerConfig

@Configuration
public class KafkaProducerConfig {
	@Value(value = "${spring.kafka.bootstrap-servers}")
	private String kafkaBootstrapServers;

	
	@Bean
	public ProducerFactory<String, String> producerFactory() {
		Map<String, Object> configProps = new HashMap<>();
		configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
		configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		return new DefaultKafkaProducerFactory<>(configProps);
	}

	@Bean
	public KafkaTemplate<String, String> kafkaTemplate() {
		return new KafkaTemplate<>(producerFactory());
	}
}

 

To send messages, we must configure a ProducerFactory, which determines how Kafka Producer instances are created. We then need a KafkaTemplate, which wraps a Producer instance and offers convenience methods for sending messages to Kafka topics.

 

Kafka Producer instances are thread safe, so sharing a single instance across the application context generally performs better than creating several. KafkaTemplate instances are also thread safe, and using just one instance is recommended.
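
The single shared instance idea can be tried without a broker. Below is a minimal sketch that uses only the JDK: CountingSender is a hypothetical stand-in for a thread-safe sender such as KafkaTemplate (it only counts messages), and the thread and message counts are arbitrary values chosen for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SharedSenderSketch {

    // Hypothetical stand-in for a thread-safe sender such as KafkaTemplate.
    // It only counts messages, so the sketch runs without a Kafka broker.
    static class CountingSender {
        private final AtomicInteger sent = new AtomicInteger();

        void send(String topic, String message) {
            sent.incrementAndGet();
        }

        int sentCount() {
            return sent.get();
        }
    }

    // Submits `messages` send tasks to a pool of `threads` workers.
    // Every worker uses the same sender instance.
    static int sendFromManyThreads(CountingSender sender, int threads, int messages) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < messages; i++) {
            final String payload = "message-" + i;
            pool.submit(() -> sender.send("myTopic1", payload));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return sender.sentCount();
    }

    public static void main(String[] args) {
        CountingSender shared = new CountingSender(); // one instance for the whole application
        int count = sendFromManyThreads(shared, 4, 100);
        System.out.println("Messages sent through the shared instance: " + count);
    }
}
```

In the real application, Spring plays the role of the shared owner: the KafkaTemplate bean is a singleton by default, so every class that injects it gets the same instance.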

 

Define KafkaConsumerConfig

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

	@Value(value = "${spring.kafka.bootstrap-servers}")
	private String kafkaBootstrapServers;

	@Bean
	public ConsumerFactory<String, String> consumerFactory() {
		Map<String, Object> props = new HashMap<>();
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
		props.put(ConsumerConfig.GROUP_ID_CONFIG, AppConstants.NEW_GROUP_NAME);
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		return new DefaultKafkaConsumerFactory<>(props);
	}

	@Bean
	public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {

		ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
		factory.setConsumerFactory(consumerFactory());
		return factory;
	}

	@KafkaListener(topics = AppConstants.NEW_TOPIC_NAME, groupId = AppConstants.NEW_GROUP_NAME)
	public void listenOnTopic(String message) {
		System.out.println("Received Message : " + message);
	}
}

 

To receive messages, the configuration of a ConsumerFactory and a KafkaListenerContainerFactory is necessary. After these beans are present in the Spring bean factory, POJO-based consumers can be set up using the @KafkaListener annotation.

 

The @EnableKafka annotation must be added to the configuration class to enable the recognition of the @KafkaListener annotation on beans managed by Spring.
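
The producer configuration uses StringSerializer and the consumer configuration uses the matching StringDeserializer, so a String value survives the trip through the topic unchanged. Below is a minimal JDK-only sketch of that round trip. It assumes the default UTF-8 encoding of both classes; the class and method names are hypothetical and only mimic what the Kafka classes do.

```java
import java.nio.charset.StandardCharsets;

class StringWireFormatSketch {

    // Mimics the producer side: a String value becomes UTF-8 bytes.
    static byte[] serialize(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Mimics the consumer side: UTF-8 bytes become a String again.
    static String deserialize(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String sent = "{\"a\":1,\"b\":2,\"c\":3}";
        byte[] wire = serialize(sent);
        String received = deserialize(wire);
        System.out.println("Bytes on the wire : " + wire.length);
        System.out.println("Received Message : " + received);
    }
}
```

If the producer and consumer were configured with different serializer and deserializer types, the listener would not get back the value that was sent, which is why the two configurations are kept in step.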

 

Find the working application below.

 

Step 1: Create a new Maven project ‘spring-kafka-hello-world’.

 

Step 2: Update pom.xml with Maven dependencies.

 

pom.xml

 

<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.sample.app</groupId>
	<artifactId>spring-kafka-hello-world</artifactId>
	<version>1.0.0</version>

	<properties>
		<maven.compiler.target>1.8</maven.compiler.target>
		<maven.compiler.source>1.8</maven.compiler.source>
		<kafka.client.version>3.3.2</kafka.client.version>
		<gson.version>2.10.1</gson.version>
		<slf4j.version>2.0.6</slf4j.version>
	</properties>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.7.5</version>
	</parent>


	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>


		<dependency>
			<groupId>org.springdoc</groupId>
			<artifactId>springdoc-openapi-ui</artifactId>
			<version>1.6.6</version>
		</dependency>

		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>

	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>

 

Step 3: Define JsonUtil class.

 

JsonUtil.java

package com.sample.app.util;

import java.io.IOException;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonUtil {
	// ObjectMapper is thread safe once configured, so one shared instance is reused.
	private static final ObjectMapper MAPPER = new ObjectMapper();

	public static String marshal(Object obj) throws JsonProcessingException {
		return MAPPER.writeValueAsString(obj);
	}

	public static <T> T unmarshal(Class<T> clazz, String json)
			throws JsonParseException, JsonMappingException, IOException {
		return MAPPER.readValue(json, clazz);
	}
}

Step 4: Define AppConstants class.

 

AppConstants.java

package com.sample.app.constants;

public class AppConstants {
	
	public static final String NEW_TOPIC_NAME = "myTopic1";
	public static final String NEW_GROUP_NAME = "myGroup1";

}

Step 5: Define KafkaTopicsConfig, KafkaProducerConfig and KafkaConsumerConfig classes.

 

KafkaTopicsConfig.java

package com.sample.app.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;

import com.sample.app.constants.AppConstants;

@Configuration
public class KafkaTopicsConfig {

	@Value(value = "${spring.kafka.bootstrap-servers}")
	private String kafkaBootstrapServers;

	@Bean
	public KafkaAdmin kafkaAdmin() {
		Map<String, Object> configs = new HashMap<>();
		configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
		return new KafkaAdmin(configs);
	}

	@Bean
	public NewTopic topic1() {
		return new NewTopic(AppConstants.NEW_TOPIC_NAME, 1, (short)1);
	}
}

 

KafkaProducerConfig.java

package com.sample.app.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {
	@Value(value = "${spring.kafka.bootstrap-servers}")
	private String kafkaBootstrapServers;

	
	@Bean
	public ProducerFactory<String, String> producerFactory() {
		Map<String, Object> configProps = new HashMap<>();
		configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
		configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		return new DefaultKafkaProducerFactory<>(configProps);
	}

	@Bean
	public KafkaTemplate<String, String> kafkaTemplate() {
		return new KafkaTemplate<>(producerFactory());
	}
}

 

KafkaConsumerConfig.java

package com.sample.app.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import com.sample.app.constants.AppConstants;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

	@Value(value = "${spring.kafka.bootstrap-servers}")
	private String kafkaBootstrapServers;

	@Bean
	public ConsumerFactory<String, String> consumerFactory() {
		Map<String, Object> props = new HashMap<>();
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
		props.put(ConsumerConfig.GROUP_ID_CONFIG, AppConstants.NEW_GROUP_NAME);
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		return new DefaultKafkaConsumerFactory<>(props);
	}

	@Bean
	public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {

		ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
		factory.setConsumerFactory(consumerFactory());
		return factory;
	}

	@KafkaListener(topics = AppConstants.NEW_TOPIC_NAME, groupId = AppConstants.NEW_GROUP_NAME)
	public void listenOnTopic(String message) {
		System.out.println("Received Message : " + message);
	}
}

 

Step 6: Define PublisherController class.

 

PublisherController.java

package com.sample.app.controller;

import java.util.HashMap;
import java.util.Map;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.sample.app.constants.AppConstants;
import com.sample.app.util.JsonUtil;

import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.parameters.RequestBody;

@RestController
@RequestMapping("api/v1/publisher")
public class PublisherController {

	@Autowired
	private KafkaTemplate<String, String> kafkaTemplate;

	@PostMapping("/publish-message")
	public ResponseEntity<Void> sendMessage(
			@RequestBody(description = "Publish new message", required = true, content = @Content(schema = @Schema(implementation = HashMap.class))) @org.springframework.web.bind.annotation.RequestBody final Map<String, Object> payload) {

		// Marshal and publish on a separate thread so the HTTP response is not delayed.
		Thread t1 = new Thread() {
			@Override
			public void run() {
				try {
					String json = JsonUtil.marshal(payload);
					kafkaTemplate.send(AppConstants.NEW_TOPIC_NAME, json);
				} catch (JsonProcessingException e) {
					// The payload could not be converted to JSON, so nothing is sent.
					e.printStackTrace();
				}

			}
		};

		t1.start();

		return ResponseEntity.ok().build();

	}

}

 

Step 7: Define main application class.

 

App.java

package com.sample.app;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class App {
	public static void main(String[] args) {

		SpringApplication.run(App.class, args);

	}
}
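
The configuration classes read the broker address from the spring.kafka.bootstrap-servers property through @Value, so the application needs that property at startup. Below is a minimal sketch of src/main/resources/application.properties. It assumes a single local broker on Kafka's default port 9092, so adjust the host and port to match your setup.

```properties
# Assumption: Kafka broker running locally on the default port
spring.kafka.bootstrap-servers=localhost:9092
```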

 

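The files defined in the steps above, following the standard Maven layout and the package declarations, are laid out like below.

spring-kafka-hello-world/pom.xml
spring-kafka-hello-world/src/main/java/com/sample/app/App.java
spring-kafka-hello-world/src/main/java/com/sample/app/config/KafkaConsumerConfig.java
spring-kafka-hello-world/src/main/java/com/sample/app/config/KafkaProducerConfig.java
spring-kafka-hello-world/src/main/java/com/sample/app/config/KafkaTopicsConfig.java
spring-kafka-hello-world/src/main/java/com/sample/app/constants/AppConstants.java
spring-kafka-hello-world/src/main/java/com/sample/app/controller/PublisherController.java
spring-kafka-hello-world/src/main/java/com/sample/app/util/JsonUtil.java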

 


 

Navigate to the directory where pom.xml is located and execute the command ‘mvn clean install’.

 

$ ls ./target
classes
generated-sources
generated-test-sources
maven-archiver
maven-status
spring-kafka-hello-world-1.0.0.jar
spring-kafka-hello-world-1.0.0.jar.original
test-classes

Execute the below command to run the application.

java -jar ./target/spring-kafka-hello-world-1.0.0.jar

Open the URL ‘http://localhost:8080/swagger-ui/index.html’ in a browser.




Try the API /publish-message with the below payload.

{"a" : 1, "b" : 2, "c" : 3}

 

You will see the same message logged to the console.

Received Message : {"a":1,"b":2,"c":3}
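
The same request can also be sent from code instead of the Swagger UI. Below is a minimal sketch using the JDK's built-in java.net.http.HttpClient. It assumes JDK 11 or later for this client (the application itself targets 1.8) and that the application is running on localhost:8080. The class name PublishMessageClient and the timeout values are hypothetical choices for illustration.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

class PublishMessageClient {

    // Builds a POST request for the publish-message endpoint of PublisherController.
    static HttpRequest buildRequest(String baseUrl, String jsonPayload) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/api/v1/publisher/publish-message"))
                .timeout(Duration.ofSeconds(5))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonPayload))
                .build();
    }

    public static void main(String[] args) throws InterruptedException {
        HttpRequest request = buildRequest("http://localhost:8080", "{\"a\" : 1, \"b\" : 2, \"c\" : 3}");
        HttpClient client = HttpClient.newBuilder()
                .connectTimeout(Duration.ofSeconds(2))
                .build();
        try {
            HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println("HTTP status: " + response.statusCode());
        } catch (IOException e) {
            // Typically means the application is not running on the assumed host and port.
            System.out.println("Request failed (is the application running?): " + e.getMessage());
        }
    }
}
```

The controller replies with an empty 200 response as soon as the publishing thread is started, so the status code alone does not confirm that the message reached Kafka. Check the application console for the "Received Message" line.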

 

You can download this application from this link.




 

 

 


 

 

 

 

