Thursday, 20 May 2021

Spring integration: Message Channels Demo

In Spring integration, message channels are used to transfer message from one endpoint to other endpoint. Producers send message to a channel and consumers receive messages from a channel.

By combining multiple endpoints and channels we can form a complex workflow.


Channels are used to transport the messages between endpoints, so endpoints remain loosely coupled.

 

MessageChannel is the core interface provided by spring integration, all other channels implements this interface.

@FunctionalInterface
public interface MessageChannel {
	long INDEFINITE_TIMEOUT = -1;

	default boolean send(Message<?> message) {
		return send(message, INDEFINITE_TIMEOUT);
	}

	boolean send(Message<?> message, long timeout);

}

 

MessageChannel support both point-to-point and Publish-Subscribe semantics.

 

In Point-to-point channel model, one consumer receive the message from a channel at any point of time, that means a message is delivered to once  consumer atmost.

 

In publish-subscribe channel model, a message is delivered to all the subscribers.

 

Can a channel buffer the messages?

Yes, a Pollable channel can buffer the message within a queue. Buffering helps in throttling messages. For example, if producer is capabale of generating 10 messages and consumer is capable of consuming 6 messages per second, we can use buffer to prevent overloading a consumer.

 

In case of Pollable channel, you need to configure a poller and specify how frequently you want to poll the channel and how many messages you want to consume for each poll.

 

Example

@ServiceActivator(inputChannel = "myQueueChannel", poller = {@Poller(maxMessagesPerPoll = "3", fixedRate = "3000") })
public Message<String> consumeMessage(Message<String> message) {
	System.out.println("ConsumerEndpoint1 -> Received from gateway : " + message.getPayload());
	return MessageBuilder.withPayload("Message '" + message.getPayload() + "' received by ConsumerEndpoint1").build();
}

Let’s create a channel using DirectChannel class which invokes a single subscriber for each sent Message.

 

Step 1: Create a channel instance.

DirectChannel messageChannel = new DirectChannel();

 

Step 2: Add a subscriber to the message channel

messageChannel.subscribe(new MessageHandler() {

	@Override
	public void handleMessage(Message<?> message) throws MessagingException {
		String payload = (String) message.getPayload();
		MessageHeaders messageHeaders = message.getHeaders();

		System.out.println("Headers");
		Set<Map.Entry<String, Object>> messageHeadersSet = messageHeaders.entrySet();

		for (Map.Entry<String, Object> headerEntry : messageHeadersSet) {
			System.out.println(headerEntry.getKey() + " -> " + headerEntry.getValue());
		}

		System.out.println("\nPayload");
		System.out.println(payload);
	}
});

Step 3: Send a message to the channel, you can observe that the message is received to the subscriber.

messageChannel.send(message);

 

Find the below working application.

 

Step 1: Create new maven project ‘channel-demo’.

 

Step 2: Update pom.xml with maven dependencies.

 

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.sample.app</groupId>
	<artifactId>channel-demo</artifactId>
	<version>1</version>

	<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-parent -->
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.4.0</version>
	</parent>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-integration</artifactId>
		</dependency>

	</dependencies>

</project>

Step 3: Create a package ‘com.sample.app’ and define App class.

 

App.java

package com.sample.app;

import java.util.Map;
import java.util.Set;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;

@SpringBootApplication
@Configuration
public class App {

	@Autowired
	private DirectChannel messageChannel;

	public static void main(String[] args) {
		SpringApplication.run(App.class, args);
	}

	@Bean
	public DirectChannel channel() {
		DirectChannel channel = new DirectChannel();
		return channel;
	}

	@Bean
	public CommandLineRunner demo() {
		return (args) -> {
			messageChannel.subscribe(new MessageHandler() {

				@Override
				public void handleMessage(Message<?> message) throws MessagingException {
					String payload = (String) message.getPayload();
					MessageHeaders messageHeaders = message.getHeaders();

					System.out.println("Headers");
					Set<Map.Entry<String, Object>> messageHeadersSet = messageHeaders.entrySet();

					for (Map.Entry<String, Object> headerEntry : messageHeadersSet) {
						System.out.println(headerEntry.getKey() + " -> " + headerEntry.getValue());
					}

					System.out.println("\nPayload");
					System.out.println(payload);
				}
			});
			
			

			String json = "{\"name\" : \"Krishna\"}";

			Message<String> message = MessageBuilder.withPayload(json).setHeader("my-content-type", "application/json")
					.setHeader("my-origin", "localhost").build();

			messageChannel.send(message);

		};
	}

}

Total project structure looks like below.


Run App.java, you can see below messages in console.

Headers
my-content-type -> application/json
id -> 05101185-a8a2-bf96-88d9-1fcecb0956c2
my-origin -> localhost
timestamp -> 1617360717242

Payload
{"name" : "Krishna"}

You can download the application from below link.

https://github.com/harikrishna553/springboot/tree/master/spring-integration/channel-demo


 

Previous                                                    Next                                                    Home

No comments:

Post a Comment