In Spring integration, message channels are used to transfer message from one endpoint to other endpoint. Producers send message to a channel and consumers receive messages from a channel.
By combining multiple endpoints and channels we can form a complex workflow.
Channels are used to transport the messages between endpoints, so endpoints remain loosely coupled.
MessageChannel is the core interface provided by spring integration, all other channels implements this interface.
@FunctionalInterface
public interface MessageChannel {
long INDEFINITE_TIMEOUT = -1;
default boolean send(Message<?> message) {
return send(message, INDEFINITE_TIMEOUT);
}
boolean send(Message<?> message, long timeout);
}
MessageChannel support both point-to-point and Publish-Subscribe semantics.
In Point-to-point channel model, one consumer receive the message from a channel at any point of time, that means a message is delivered to once consumer atmost.
In publish-subscribe channel model, a message is delivered to all the subscribers.
Can a channel buffer the messages?
Yes, a Pollable channel can buffer the message within a queue. Buffering helps in throttling messages. For example, if producer is capabale of generating 10 messages and consumer is capable of consuming 6 messages per second, we can use buffer to prevent overloading a consumer.
In case of Pollable channel, you need to configure a poller and specify how frequently you want to poll the channel and how many messages you want to consume for each poll.
Example
@ServiceActivator(inputChannel = "myQueueChannel", poller = {@Poller(maxMessagesPerPoll = "3", fixedRate = "3000") })
public Message<String> consumeMessage(Message<String> message) {
System.out.println("ConsumerEndpoint1 -> Received from gateway : " + message.getPayload());
return MessageBuilder.withPayload("Message '" + message.getPayload() + "' received by ConsumerEndpoint1").build();
}
Let’s create a channel using DirectChannel class which invokes a single subscriber for each sent Message.
Step 1: Create a channel instance.
DirectChannel messageChannel = new DirectChannel();
Step 2: Add a subscriber to the message channel
messageChannel.subscribe(new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
String payload = (String) message.getPayload();
MessageHeaders messageHeaders = message.getHeaders();
System.out.println("Headers");
Set<Map.Entry<String, Object>> messageHeadersSet = messageHeaders.entrySet();
for (Map.Entry<String, Object> headerEntry : messageHeadersSet) {
System.out.println(headerEntry.getKey() + " -> " + headerEntry.getValue());
}
System.out.println("\nPayload");
System.out.println(payload);
}
});
Step 3: Send a message to the channel, you can observe that the message is received to the subscriber.
messageChannel.send(message);
Find the below working application.
Step 1: Create new maven project ‘channel-demo’.
Step 2: Update pom.xml with maven dependencies.
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.sample.app</groupId>
<artifactId>channel-demo</artifactId>
<version>1</version>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-parent -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.0</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
</dependencies>
</project>
Step 3: Create a package ‘com.sample.app’ and define App class.
App.java
package com.sample.app;
import java.util.Map;
import java.util.Set;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
@SpringBootApplication
@Configuration
public class App {
@Autowired
private DirectChannel messageChannel;
public static void main(String[] args) {
SpringApplication.run(App.class, args);
}
@Bean
public DirectChannel channel() {
DirectChannel channel = new DirectChannel();
return channel;
}
@Bean
public CommandLineRunner demo() {
return (args) -> {
messageChannel.subscribe(new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
String payload = (String) message.getPayload();
MessageHeaders messageHeaders = message.getHeaders();
System.out.println("Headers");
Set<Map.Entry<String, Object>> messageHeadersSet = messageHeaders.entrySet();
for (Map.Entry<String, Object> headerEntry : messageHeadersSet) {
System.out.println(headerEntry.getKey() + " -> " + headerEntry.getValue());
}
System.out.println("\nPayload");
System.out.println(payload);
}
});
String json = "{\"name\" : \"Krishna\"}";
Message<String> message = MessageBuilder.withPayload(json).setHeader("my-content-type", "application/json")
.setHeader("my-origin", "localhost").build();
messageChannel.send(message);
};
}
}
Total project structure looks like below.
Run App.java, you can see below messages in console.
Headers my-content-type -> application/json id -> 05101185-a8a2-bf96-88d9-1fcecb0956c2 my-origin -> localhost timestamp -> 1617360717242 Payload {"name" : "Krishna"}
You can download the application from below link.
https://github.com/harikrishna553/springboot/tree/master/spring-integration/channel-demo
Previous Next Home
No comments:
Post a Comment