Thursday, 3 June 2021

Spring integration: Channel interceptor demo

Channel interceptors are used to log the information and message modification etc.,

 

How to define channel interceptor?

By implementing ChannelInterceptor interface, you can define custom intercetptor.

 

ChannelInterceptor interface provides following methods.

 

Method

Description

default Message<?> preSend(Message<?> message, MessageChannel channel)

Invoked before the Message is actually sent to the channel.

default void postSend(Message<?> message, MessageChannel channel, boolean sent)

Invoked immediately after the send invocation.

default void afterSendCompletion

Invoked after the completion of a send regardless of any exception that have been raised thus allowing for proper resource cleanup.

default boolean preReceive(MessageChannel channel)

Invoked as soon as receive is called and before a Message is actually retrieved.

default Message<?> postReceive(Message<?> message, MessageChannel channel)

Invoked immediately after a Message has been retrieved but before it is returned to the caller.

default void afterReceiveCompletion(@Nullable Message<?> message, MessageChannel channel,

                           @Nullable Exception ex)

Invoked after the completion of a receive regardless of any exception that have been raised thus allowing for proper resource cleanup.

 

How to add interceptor to a channel?

Using 'setInterceptors' method, you can set interceptors.

 

Example

QueueChannel channel = new QueueChannel();

channel.setInterceptors(Arrays.asList(new CustomChannelInterceptor()));

 

Find the below working application.

 

Step 1: Create new maven project 'channel-interceptor-demo'.

 

Step 2: Update pom.xml with maven dependencies.

 

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.sample.app</groupId>
	<artifactId>channel-interceptor-demo</artifactId>
	<version>1</version>

	<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-parent -->
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.4.0</version>
	</parent>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-integration</artifactId>
		</dependency>

	</dependencies>

</project>

 

Step 3: Define custom channel interceptor.

 

CustomChannelInterceptor.java

 

package com.sample.app.interceptor;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.ChannelInterceptor;

public class CustomChannelInterceptor implements ChannelInterceptor {

	@Override
	public Message<?> preSend(Message<?> message, MessageChannel channel) {
		System.out.println("preSend: Converting the data to uppercase");

		String payloadInUppercase = message.getPayload().toString().toUpperCase();
		Map<String, Object> headers = getHeaders(message);
		Message msg = MessageBuilder.withPayload(payloadInUppercase).copyHeaders(headers).build();

		return ChannelInterceptor.super.preSend(msg, channel);
	}

	private static Map<String, Object> getHeaders(Message source) {
		Set<Map.Entry<String, Object>> headersEntrySet = source.getHeaders().entrySet();
		Map<String, Object> headersMap = new HashMap<>();

		for (Map.Entry<String, Object> entry : headersEntrySet) {
			headersMap.put(entry.getKey(), entry.getValue());
		}
		return headersMap;

	}
}

 

Step 4: Define gate way.

CustomGateway.java

package com.sample.app.gateway;

import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.messaging.Message;

@MessagingGateway(name = "myGateway", defaultRequestChannel = "pollableChannel")
public interface CustomGateway {

	@Gateway(requestChannel = "pollableChannel")
	public void print(Message<String> message);

}

 

Step 5: Define endpoint.

 

ConsumerEndpoint1.java

package com.sample.app.endpoints;

import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import org.springframework.integration.annotation.*;

@Component
public class ConsumerEndpoint1 {

	@ServiceActivator(inputChannel = "pollableChannel", poller = {
			@Poller(maxMessagesPerPoll = "3", fixedRate = "3000") })
	public Message<String> consumeMessage(Message<String> message) {
		System.out.println("ConsumerEndpoint1 -> Received message from pollableChannel channel : " + message.getPayload());
		return MessageBuilder.withPayload("Message '" + message.getPayload() + "' received by ConsumerEndpoint1")
				.build();
	}

}

 

Step 6: Define main application class.

 

App.java

package com.sample.app;

import java.util.Arrays;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.PollableChannel;

import com.sample.app.gateway.CustomGateway;
import com.sample.app.interceptor.CustomChannelInterceptor;

@SpringBootApplication
@Configuration
public class App {
	@Bean(name = "pollableChannel")
	public PollableChannel pollableChannel() {
		QueueChannel channel = new QueueChannel();
		channel.setInterceptors(Arrays.asList(new CustomChannelInterceptor()));
		return channel;
	}

	@Autowired
	private CustomGateway customGateway;

	public static void main(String[] args) {
		SpringApplication.run(App.class, args);
	}

	@Bean
	public CommandLineRunner demo() {
		return (args) -> {

			for (int i = 0; i < 10; i++) {
				Message<String> message = MessageBuilder.withPayload("secret message " + i).build();
				customGateway.print(message);
			}

		};
	}

}

 

Total project structure looks like below.


 

Run App.java, you will see below messages in console.


 

preSend: Converting the data to uppercase
preSend: Converting the data to uppercase
preSend: Converting the data to uppercase
preSend: Converting the data to uppercase
preSend: Converting the data to uppercase
preSend: Converting the data to uppercase
preSend: Converting the data to uppercase
preSend: Converting the data to uppercase
preSend: Converting the data to uppercase
preSend: Converting the data to uppercase
ConsumerEndpoint1 -> Received message from pollableChannel channel : SECRET MESSAGE 0
ConsumerEndpoint1 -> Received message from pollableChannel channel : SECRET MESSAGE 1
ConsumerEndpoint1 -> Received message from pollableChannel channel : SECRET MESSAGE 2
ConsumerEndpoint1 -> Received message from pollableChannel channel : SECRET MESSAGE 3
ConsumerEndpoint1 -> Received message from pollableChannel channel : SECRET MESSAGE 4
ConsumerEndpoint1 -> Received message from pollableChannel channel : SECRET MESSAGE 5
ConsumerEndpoint1 -> Received message from pollableChannel channel : SECRET MESSAGE 6
ConsumerEndpoint1 -> Received message from pollableChannel channel : SECRET MESSAGE 7
ConsumerEndpoint1 -> Received message from pollableChannel channel : SECRET MESSAGE 8
ConsumerEndpoint1 -> Received message from pollableChannel channel : SECRET MESSAGE 9


You can download complete working application from below link.

https://github.com/harikrishna553/springboot/tree/master/spring-integration/channel-interceptor-demo


 

 

Previous                                                    Next                                                    Home

No comments:

Post a Comment