Tuesday 25 May 2021

Spring integration: PriorityChannel: set priority using priority header

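‘PriorityChannel’ is a queue channel that orders its buffered messages with a Comparator instead of first-in, first-out. The default comparator uses the 'priority' message header, so messages with a higher priority value are received first.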

 

How to set the priority?

Use the ‘setPriority’ method of MessageBuilder to set the priority header on a Message.

Message<String> message = MessageBuilder.withPayload("msg " + i).setPriority(i).build();
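
To see what those priority values do to the delivery order, here is a minimal plain-JDK sketch. It is an analogy, not Spring Integration code: the Msg class and the drainByPriority method are hypothetical names made up for illustration, and a java.util.concurrent.PriorityBlockingQueue with a "highest priority first" comparator stands in for the channel.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

public class PriorityOrderSketch {

	/** Hypothetical stand-in for a message: a payload plus a priority value. */
	static final class Msg {
		final String payload;
		final int priority;

		Msg(String payload, int priority) {
			this.payload = payload;
			this.priority = priority;
		}
	}

	/** Queues "msg i" with priority i, then drains the queue and returns the payloads in drain order. */
	public static List<String> drainByPriority(int count) {
		// Assumption for this sketch: a higher priority value should come out first.
		Comparator<Msg> highestFirst = Comparator.comparingInt((Msg m) -> m.priority).reversed();
		PriorityBlockingQueue<Msg> queue = new PriorityBlockingQueue<>(11, highestFirst);

		for (int i = 0; i < count; i++) {
			queue.add(new Msg("msg " + i, i));
		}

		List<String> drained = new ArrayList<>();
		Msg next;
		while ((next = queue.poll()) != null) {
			drained.add(next.payload);
		}
		return drained;
	}

	public static void main(String[] args) {
		// prints [msg 3, msg 2, msg 1, msg 0]
		System.out.println(drainByPriority(4));
	}
}
```

The messages go in as msg 0 to msg 3 and come out in the reverse order, because the queue hands out the highest priority first regardless of insertion order.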

 

The following is a complete working application.

 

Step 1: Create a new Maven project ‘priority-channel-demo’.

 

Step 2: Update pom.xml with the Maven dependencies.

 

pom.xml

 

<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.sample.app</groupId>
	<artifactId>priority-channel-demo</artifactId>
	<version>1</version>

	<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-parent -->
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.4.0</version>
	</parent>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-integration</artifactId>
		</dependency>

	</dependencies>

</project>

 

Step 3: Create a package ‘com.sample.app.endpoints’ and define the CustomGateway interface and the ConsumerEndpoint1 class.

 

ConsumerEndpoint1.java

 

package com.sample.app.endpoints;

import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
public class ConsumerEndpoint1 {

	// Poll 'myQueueChannel' with a 2 second delay between polls, taking at most 3 messages per poll.
	@ServiceActivator(inputChannel = "myQueueChannel", poller = {
			@Poller(maxMessagesPerPoll = "3", fixedDelay = "2000") })
	public Message<String> consumeMessage(Message<String> message) {
		System.out.println("ConsumerEndpoint1 -> Received from gateway : " + message.getPayload());
		return MessageBuilder.withPayload("Message '" + message.getPayload() + "' received by ConsumerEndpoint1")
				.build();
	}

}

 

CustomGateway.java

package com.sample.app.endpoints;

import java.util.concurrent.Future;

import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.messaging.Message;

@MessagingGateway(name = "myGateway", defaultRequestChannel = "myQueueChannel")
public interface CustomGateway {

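	// Returning a Future makes the gateway call asynchronous; the Future completes with the consumer's reply.
	@Gateway(requestChannel = "myQueueChannel")
	public Future<Message<String>> print(Message<String> message);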

}


Step 4: Define the App class in the com.sample.app package.

 

App.java

package com.sample.app;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.channel.PriorityChannel;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;

import com.sample.app.endpoints.CustomGateway;

// @SpringBootApplication already includes @Configuration.
@SpringBootApplication
public class App {

	@Autowired
	private CustomGateway customGateway;

	public static void main(String[] args) {
		SpringApplication.run(App.class, args);
	}

	@Bean(name = "myQueueChannel")
	public PriorityChannel channel1() {
		// Buffer up to 100 messages, ordered by the default 'priority' header comparator.
		return new PriorityChannel(100);
	}

	@Bean
	public CommandLineRunner demo() {
		return (args) -> {

			// Futures for the replies, kept in the order the messages were sent.
			List<Future<Message<String>>> futures = new ArrayList<>();

			for (int i = 0; i < 10; i++) {
				// The loop index doubles as the priority, so "msg 9" has the highest priority.
				Message<String> message = MessageBuilder.withPayload("msg " + i).setPriority(i).build();
				Future<Message<String>> result = customGateway.print(message);
				futures.add(result);
			}

			System.out.println("\nAll the messages sent in asynchronous way");

			// Block on each reply in send order, so the replies print as msg 0 to msg 9.
			for (Future<Message<String>> future : futures) {
				Message<String> message = future.get();
				System.out.println(message.getPayload());
			}

		};
	}

}

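The complete project structure, assuming the standard Maven layout, looks like below.

priority-channel-demo
	pom.xml
	src/main/java/com/sample/app/App.java
	src/main/java/com/sample/app/endpoints/ConsumerEndpoint1.java
	src/main/java/com/sample/app/endpoints/CustomGateway.java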


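Run App.java and you will see the messages below in the console. The consumer receives the messages from the highest priority down; because the gateway sends asynchronously, the order of the first few may vary between runs.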

All the messages sent in asynchronous way

ConsumerEndpoint1 -> Received from gateway : msg 8
ConsumerEndpoint1 -> Received from gateway : msg 9
ConsumerEndpoint1 -> Received from gateway : msg 7
ConsumerEndpoint1 -> Received from gateway : msg 6
ConsumerEndpoint1 -> Received from gateway : msg 5
ConsumerEndpoint1 -> Received from gateway : msg 4
ConsumerEndpoint1 -> Received from gateway : msg 3
ConsumerEndpoint1 -> Received from gateway : msg 2
ConsumerEndpoint1 -> Received from gateway : msg 1
ConsumerEndpoint1 -> Received from gateway : msg 0
Message 'msg 0' received by ConsumerEndpoint1
Message 'msg 1' received by ConsumerEndpoint1
Message 'msg 2' received by ConsumerEndpoint1
Message 'msg 3' received by ConsumerEndpoint1
Message 'msg 4' received by ConsumerEndpoint1
Message 'msg 5' received by ConsumerEndpoint1
Message 'msg 6' received by ConsumerEndpoint1
Message 'msg 7' received by ConsumerEndpoint1
Message 'msg 8' received by ConsumerEndpoint1
Message 'msg 9' received by ConsumerEndpoint1
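
The 'Received from gateway' lines above follow the priority order, while the reply lines run from msg 0 to msg 9. This happens because the runner calls get() on the futures in the order the messages were sent. The plain-JDK sketch below imitates that pattern on a single thread. It is a simplification, not the application's code: FutureOrderSketch and run are hypothetical names, and CompletableFuture stands in for the futures returned by the gateway.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class FutureOrderSketch {

	/** Returns the log lines produced when replies complete highest first but are read in send order. */
	public static List<String> run(int count) {
		List<String> log = new ArrayList<>();

		// One pending reply per sent message, stored in send order.
		List<CompletableFuture<String>> futures = new ArrayList<>();
		for (int i = 0; i < count; i++) {
			futures.add(new CompletableFuture<>());
		}

		// Simulated consumer: handles the highest priority (largest index) first.
		for (int i = count - 1; i >= 0; i--) {
			log.add("received msg " + i);
			futures.get(i).complete("reply for msg " + i);
		}

		// The caller reads the replies in send order, whatever order they completed in.
		for (CompletableFuture<String> future : futures) {
			log.add(future.join());
		}
		return log;
	}

	public static void main(String[] args) {
		run(3).forEach(System.out::println);
	}
}
```

With a count of 3, the 'received' lines run from msg 2 down to msg 0 and the reply lines from msg 0 up to msg 2, the same shape as the console output above.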

 

You can download the complete working application from the link below.

https://github.com/harikrishna553/springboot/tree/master/spring-integration/priority-channel-demo

 

 
