Tuesday 25 May 2021

Spring Integration: Working with QueueChannel

QueueChannel implements the PollableChannel interface. Each message sent to a QueueChannel is placed in a blocking queue (the capacity of the queue may be specified at construction time), where it waits until a consumer polls for it.
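The capacity behaviour can be sketched with the JDK alone. By default a QueueChannel wraps a java.util.concurrent.LinkedBlockingQueue, so the example below uses that class directly rather than Spring. The class name BoundedQueueSketch and the method offerAll are hypothetical names chosen for illustration only.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// JDK-only sketch of a bounded blocking queue; not Spring Integration code.
class BoundedQueueSketch {

	// Offers each payload without blocking and returns how many were accepted.
	static int offerAll(BlockingQueue<String> queue, String... payloads) {
		int accepted = 0;
		for (String payload : payloads) {
			// offer returns false instead of blocking when the queue is full
			if (queue.offer(payload)) {
				accepted++;
			}
		}
		return accepted;
	}

	public static void main(String[] args) {
		// Capacity 2, analogous to new QueueChannel(2)
		BlockingQueue<String> queue = new LinkedBlockingQueue<>(2);
		System.out.println("accepted = " + offerAll(queue, "msg 0", "msg 1", "msg 2"));
		// poll removes the oldest element, or returns null when the queue is empty
		System.out.println("first polled = " + queue.poll());
		System.out.println("remaining = " + queue.size());
	}
}
```

With a capacity of 2, the third offer is rejected, which is why the demo application in this post sizes its channel at 100 for only 10 messages.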

 

Endpoints retrieve messages from a QueueChannel by calling its receive method. Let’s design an asynchronous application in which a gateway places messages in a queue channel and does not wait for the consumer’s response.


For example, let’s build an application where a gateway places messages in myQueueChannel and reads the responses asynchronously. That is, the application does not block while the consumer endpoints (ConsumerEndpoint1 and ConsumerEndpoint2) process the messages.

 

Define a gateway.

@MessagingGateway(name = "myGateway", defaultRequestChannel = "myQueueChannel")
public interface CustomGateway {

	@Gateway(requestChannel = "myQueueChannel")
	public Future<Message<String>> print(Message<String> message);

}

 

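As the definition of ‘CustomGateway’ shows, it has a print method that writes messages to myQueueChannel. Because print returns a Future, the call returns immediately and the reply is retrieved later through that Future.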
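The calling pattern that a Future-returning method gives the caller can be sketched with the JDK alone: submit all the work first, then collect the replies. This is an analogy using an ExecutorService, not a description of how Spring Integration implements the gateway. AsyncCallSketch and sendAll are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// JDK-only sketch of "send now, read the reply later"; not Spring Integration code.
class AsyncCallSketch {

	// Submits count tasks, then collects their replies in submission order.
	static List<String> sendAll(int count) {
		ExecutorService executor = Executors.newFixedThreadPool(2);
		try {
			List<Future<String>> futures = new ArrayList<>();
			for (int i = 0; i < count; i++) {
				final String payload = "msg " + i;
				// submit returns at once; the task runs on a pool thread
				futures.add(executor.submit(() -> "Message '" + payload + "' processed"));
			}
			List<String> replies = new ArrayList<>();
			for (Future<String> future : futures) {
				try {
					replies.add(future.get()); // blocks until this reply is ready
				} catch (InterruptedException e) {
					Thread.currentThread().interrupt();
					throw new IllegalStateException(e);
				} catch (ExecutionException e) {
					throw new IllegalStateException(e.getCause());
				}
			}
			return replies;
		} finally {
			executor.shutdown();
		}
	}

	public static void main(String[] args) {
		sendAll(3).forEach(System.out::println);
	}
}
```

Collecting the futures in a list and calling get only after every message has been submitted is the same structure the App class in this post uses.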

 

Define myQueueChannel.

@Bean(name = "myQueueChannel")
public QueueChannel channel1() {
	QueueChannel channel = new QueueChannel(100);
	return channel;
}

 

Let’s define consumer endpoints that poll messages from the QueueChannel.

@Component
public class ConsumerEndpoint1 {

	@ServiceActivator(inputChannel = "myQueueChannel", poller = {
			@Poller(maxMessagesPerPoll = "3", fixedRate = "3000") })
	public Message<String> consumeMessage(Message<String> message) {
		System.out.println("ConsumerEndpoint1 -> Received from gateway : " + message.getPayload());
		return MessageBuilder.withPayload("Message '" + message.getPayload() + "' received by ConsumerEndpoint1")
				.build();
	}

}

 

As the ServiceActivator definition shows, the poller’s fixedRate is set to 3000 milliseconds, which means the endpoint polls ‘myQueueChannel’ every 3 seconds and takes at most 3 messages per poll.
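The effect of the two poller settings can be sketched without Spring: a fixed-rate timer triggers a poll cycle, and each cycle takes at most a fixed number of messages. The sketch below is an approximation for illustration, not the framework’s poller. PollerSketch and pollOnce are hypothetical names, and the 200 millisecond interval is chosen only to keep the demo short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// JDK-only sketch of fixed-rate polling with a per-poll message limit.
class PollerSketch {

	// One poll cycle: takes at most maxMessagesPerPoll messages, stopping early if the queue is empty.
	static List<String> pollOnce(Queue<String> queue, int maxMessagesPerPoll) {
		List<String> batch = new ArrayList<>();
		for (int i = 0; i < maxMessagesPerPoll; i++) {
			String message = queue.poll();
			if (message == null) {
				break;
			}
			batch.add(message);
		}
		return batch;
	}

	public static void main(String[] args) throws InterruptedException {
		Queue<String> queue = new ConcurrentLinkedQueue<>();
		for (int i = 0; i < 7; i++) {
			queue.add("msg " + i);
		}
		ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
		CountDownLatch drained = new CountDownLatch(1);
		// Run a poll cycle every 200 ms, taking at most 3 messages each time
		scheduler.scheduleAtFixedRate(() -> {
			System.out.println("poll -> " + pollOnce(queue, 3));
			if (queue.isEmpty()) {
				drained.countDown();
			}
		}, 0, 200, TimeUnit.MILLISECONDS);
		drained.await();
		scheduler.shutdownNow();
	}
}
```

A smaller per-poll limit spreads the same backlog over more cycles, so the limit and the rate together bound how fast one endpoint can drain the channel.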

 

The complete working application is given below.

 

Step 1: Create a new Maven project ‘queue-channel-demo’.

 

Step 2: Update pom.xml with the Maven dependencies.

 

pom.xml

 

<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.sample.app</groupId>
	<artifactId>queue-channel-demo</artifactId>
	<version>1</version>

	<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-parent -->
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.4.0</version>
	</parent>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-integration</artifactId>
		</dependency>

	</dependencies>

</project>

 

Step 3: Create a package 'com.sample.app.endpoints' and define the ConsumerEndpoint1 and ConsumerEndpoint2 classes and the CustomGateway interface.

 

ConsumerEndpoint1.java

package com.sample.app.endpoints;

import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
public class ConsumerEndpoint1 {

	@ServiceActivator(inputChannel = "myQueueChannel", poller = {
			@Poller(maxMessagesPerPoll = "3", fixedRate = "3000") })
	public Message<String> consumeMessage(Message<String> message) {
		System.out.println("ConsumerEndpoint1 -> Received from gateway : " + message.getPayload());
		return MessageBuilder.withPayload("Message '" + message.getPayload() + "' received by ConsumerEndpoint1")
				.build();
	}

}

 

ConsumerEndpoint2.java

package com.sample.app.endpoints;

import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
public class ConsumerEndpoint2 {

	@ServiceActivator(inputChannel = "myQueueChannel", poller = {
			@Poller(maxMessagesPerPoll = "2", fixedRate = "2000") })
	public Message<String> consumeMessage(Message<String> message) {
		System.out.println("ConsumerEndpoint2 -> Received from gateway : " + message.getPayload());
		return MessageBuilder.withPayload("Message '" + message.getPayload() + "' received by ConsumerEndpoint2").build();
	}

}

 

CustomGateway.java

package com.sample.app.endpoints;

import java.util.concurrent.Future;

import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.messaging.Message;

@MessagingGateway(name = "myGateway", defaultRequestChannel = "myQueueChannel")
public interface CustomGateway {

	@Gateway(requestChannel = "myQueueChannel")
	public Future<Message<String>> print(Message<String> message);

}

 

Step 4: Define the App class in the com.sample.app package.

 

App.java

 

package com.sample.app;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;

import com.sample.app.endpoints.CustomGateway;

@SpringBootApplication
@Configuration
public class App {

	@Autowired
	@Qualifier("myQueueChannel")
	private QueueChannel queueChannel;

	@Autowired
	private CustomGateway customGateway;

	public static void main(String[] args) {
		SpringApplication.run(App.class, args);
	}

	@Bean(name = "myQueueChannel")
	public QueueChannel channel1() {
		QueueChannel channel = new QueueChannel(100);
		return channel;
	}

	@Bean
	public CommandLineRunner demo() {
		return (args) -> {

			List<Future<Message<String>>> futures = new ArrayList<> ();
			
			for (int i = 0; i < 10; i++) {
				System.out.println("Sending message number " + i);
				Message<String> message = MessageBuilder.withPayload("msg " + i).build();
				Future<Message<String>> result = customGateway.print(message);
				futures.add(result);
			}
			
			System.out.println("\nAll the messages sent in asynchronous way");
			
			for(Future<Message<String>> future: futures) {
				Message<String> message = future.get();
				
				System.out.println(message.getPayload());
			}

		};
	}

}


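The complete project structure (standard Maven layout) looks like this:

queue-channel-demo
	pom.xml
	src/main/java/com/sample/app
		App.java
		endpoints
			ConsumerEndpoint1.java
			ConsumerEndpoint2.java
			CustomGateway.java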

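Run App.java; you will see output similar to the following in the console. The order of the consumer lines varies between runs, because both endpoints poll the same channel and each message is handled by exactly one of them.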

Sending message number 0
Sending message number 1
Sending message number 2
Sending message number 3
Sending message number 4
Sending message number 5
Sending message number 6
Sending message number 7
Sending message number 8
Sending message number 9

All the messages sent in asynchronous way

ConsumerEndpoint1 -> Received from gateway : msg 0
ConsumerEndpoint2 -> Received from gateway : msg 1
ConsumerEndpoint2 -> Received from gateway : msg 8
Message 'msg 0' received by ConsumerEndpoint1
Message 'msg 1' received by ConsumerEndpoint2
ConsumerEndpoint1 -> Received from gateway : msg 5
ConsumerEndpoint1 -> Received from gateway : msg 2
Message 'msg 2' received by ConsumerEndpoint1
ConsumerEndpoint2 -> Received from gateway : msg 7
ConsumerEndpoint2 -> Received from gateway : msg 3
Message 'msg 3' received by ConsumerEndpoint2
ConsumerEndpoint1 -> Received from gateway : msg 9
ConsumerEndpoint1 -> Received from gateway : msg 4
ConsumerEndpoint1 -> Received from gateway : msg 6
Message 'msg 4' received by ConsumerEndpoint1
Message 'msg 5' received by ConsumerEndpoint1
Message 'msg 6' received by ConsumerEndpoint1
Message 'msg 7' received by ConsumerEndpoint2
Message 'msg 8' received by ConsumerEndpoint2
Message 'msg 9' received by ConsumerEndpoint1
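The output above shows every message being handled by exactly one of the two endpoints. That competing-consumer behaviour of a shared queue can be sketched with the JDK alone. The two plain threads below stand in for the polled endpoints and are an assumption for illustration, as are the names CompetingConsumersSketch and run.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// JDK-only sketch of two consumers draining one shared queue.
class CompetingConsumersSketch {

	// Returns a map from each payload to the name of the consumer that handled it.
	static Map<String, String> run(int messageCount) {
		BlockingQueue<String> queue = new LinkedBlockingQueue<>(100);
		for (int i = 0; i < messageCount; i++) {
			queue.add("msg " + i);
		}
		Map<String, String> handledBy = new ConcurrentHashMap<>();
		Runnable consumer = () -> {
			String name = Thread.currentThread().getName();
			String payload;
			// poll removes the element, so no other consumer can receive it
			while ((payload = queue.poll()) != null) {
				// a duplicate delivery would show up as "name,name" in the value
				handledBy.merge(payload, name, (a, b) -> a + "," + b);
			}
		};
		Thread first = new Thread(consumer, "consumer-1");
		Thread second = new Thread(consumer, "consumer-2");
		first.start();
		second.start();
		try {
			first.join();
			second.join();
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			throw new IllegalStateException(e);
		}
		return handledBy;
	}

	public static void main(String[] args) {
		run(10).forEach((payload, owner) -> System.out.println(payload + " -> " + owner));
	}
}
```

Which consumer ends up with which message depends on thread scheduling, so the mapping can differ from run to run, just as the console output of the demo application does.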

You can download the complete working application from the link below.

https://github.com/harikrishna553/springboot/tree/master/spring-integration/queue-channel-demo

 

 

 
