Channel interceptors are used to log the information and message modification etc.,
How to define channel interceptor?
By implementing ChannelInterceptor interface, you can define custom intercetptor.
ChannelInterceptor interface provides following methods.
Method |
Description |
default Message<?> preSend(Message<?> message, MessageChannel channel) |
Invoked before the Message is actually sent to the channel. |
default void postSend(Message<?> message, MessageChannel channel, boolean sent) |
Invoked immediately after the send invocation. |
default void afterSendCompletion |
Invoked after the completion of a send regardless of any exception that have been raised thus allowing for proper resource cleanup. |
default boolean preReceive(MessageChannel channel) |
Invoked as soon as receive is called and before a Message is actually retrieved. |
default Message<?> postReceive(Message<?> message, MessageChannel channel) |
Invoked immediately after a Message has been retrieved but before it is returned to the caller. |
default void afterReceiveCompletion(@Nullable Message<?> message, MessageChannel channel, @Nullable Exception ex) |
Invoked after the completion of a receive regardless of any exception that have been raised thus allowing for proper resource cleanup. |
How to add interceptor to a channel?
Using 'setInterceptors' method, you can set interceptors.
Example
QueueChannel channel = new QueueChannel();
channel.setInterceptors(Arrays.asList(new CustomChannelInterceptor()));
Find the below working application.
Step 1: Create new maven project 'channel-interceptor-demo'.
Step 2: Update pom.xml with maven dependencies.
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.sample.app</groupId>
<artifactId>channel-interceptor-demo</artifactId>
<version>1</version>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-parent -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.0</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
</dependencies>
</project>
Step 3: Define custom channel interceptor.
CustomChannelInterceptor.java
package com.sample.app.interceptor;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.ChannelInterceptor;
public class CustomChannelInterceptor implements ChannelInterceptor {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
System.out.println("preSend: Converting the data to uppercase");
String payloadInUppercase = message.getPayload().toString().toUpperCase();
Map<String, Object> headers = getHeaders(message);
Message msg = MessageBuilder.withPayload(payloadInUppercase).copyHeaders(headers).build();
return ChannelInterceptor.super.preSend(msg, channel);
}
private static Map<String, Object> getHeaders(Message source) {
Set<Map.Entry<String, Object>> headersEntrySet = source.getHeaders().entrySet();
Map<String, Object> headersMap = new HashMap<>();
for (Map.Entry<String, Object> entry : headersEntrySet) {
headersMap.put(entry.getKey(), entry.getValue());
}
return headersMap;
}
}
Step 4: Define gate way.
CustomGateway.java
package com.sample.app.gateway;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.messaging.Message;
@MessagingGateway(name = "myGateway", defaultRequestChannel = "pollableChannel")
public interface CustomGateway {
@Gateway(requestChannel = "pollableChannel")
public void print(Message<String> message);
}
Step 5: Define endpoint.
ConsumerEndpoint1.java
package com.sample.app.endpoints;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import org.springframework.integration.annotation.*;
@Component
public class ConsumerEndpoint1 {
@ServiceActivator(inputChannel = "pollableChannel", poller = {
@Poller(maxMessagesPerPoll = "3", fixedRate = "3000") })
public Message<String> consumeMessage(Message<String> message) {
System.out.println("ConsumerEndpoint1 -> Received message from pollableChannel channel : " + message.getPayload());
return MessageBuilder.withPayload("Message '" + message.getPayload() + "' received by ConsumerEndpoint1")
.build();
}
}
Step 6: Define main application class.
App.java
package com.sample.app;
import java.util.Arrays;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.PollableChannel;
import com.sample.app.gateway.CustomGateway;
import com.sample.app.interceptor.CustomChannelInterceptor;
@SpringBootApplication
@Configuration
public class App {
@Bean(name = "pollableChannel")
public PollableChannel pollableChannel() {
QueueChannel channel = new QueueChannel();
channel.setInterceptors(Arrays.asList(new CustomChannelInterceptor()));
return channel;
}
@Autowired
private CustomGateway customGateway;
public static void main(String[] args) {
SpringApplication.run(App.class, args);
}
@Bean
public CommandLineRunner demo() {
return (args) -> {
for (int i = 0; i < 10; i++) {
Message<String> message = MessageBuilder.withPayload("secret message " + i).build();
customGateway.print(message);
}
};
}
}
Total project structure looks like below.
Run App.java, you will see below messages in console.
preSend: Converting the data to uppercase preSend: Converting the data to uppercase preSend: Converting the data to uppercase preSend: Converting the data to uppercase preSend: Converting the data to uppercase preSend: Converting the data to uppercase preSend: Converting the data to uppercase preSend: Converting the data to uppercase preSend: Converting the data to uppercase preSend: Converting the data to uppercase ConsumerEndpoint1 -> Received message from pollableChannel channel : SECRET MESSAGE 0 ConsumerEndpoint1 -> Received message from pollableChannel channel : SECRET MESSAGE 1 ConsumerEndpoint1 -> Received message from pollableChannel channel : SECRET MESSAGE 2 ConsumerEndpoint1 -> Received message from pollableChannel channel : SECRET MESSAGE 3 ConsumerEndpoint1 -> Received message from pollableChannel channel : SECRET MESSAGE 4 ConsumerEndpoint1 -> Received message from pollableChannel channel : SECRET MESSAGE 5 ConsumerEndpoint1 -> Received message from pollableChannel channel : SECRET MESSAGE 6 ConsumerEndpoint1 -> Received message from pollableChannel channel : SECRET MESSAGE 7 ConsumerEndpoint1 -> Received message from pollableChannel channel : SECRET MESSAGE 8 ConsumerEndpoint1 -> Received message from pollableChannel channel : SECRET MESSAGE 9
You can download complete working application from below link.
https://github.com/harikrishna553/springboot/tree/master/spring-integration/channel-interceptor-demo
No comments:
Post a Comment