Wednesday 26 May 2021

Spring Integration: PriorityChannel with a custom comparator

You can provide a custom comparator when defining a PriorityChannel. The PriorityChannel uses this comparator to determine the relative priority of the messages it holds.

 

Step 1: Define a comparator.

private static Comparator<Message<?>> msgComparator = new Comparator<Message<?>>() {

	@Override
	public int compare(Message<?> o1, Message<?> o2) {
		boolean even1 = isEven((Integer) o1.getPayload());
		boolean even2 = isEven((Integer) o2.getPayload());

		// Two even payloads, or two odd payloads, have the same priority.
		if (even1 == even2) {
			return 0;
		}

		// Otherwise the message with the even payload comes first.
		return even1 ? -1 : 1;
	}

};

 

This comparator gives priority to a message whose payload is an even number.
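To see the ordering rule in isolation, here is a minimal sketch that applies the same even-first idea to plain integers instead of Spring messages. The class name EvenFirstDemo and the helper byPriority are made up for this illustration. Note that two payloads of the same parity compare as equal, which keeps the comparator symmetric.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical demo class, not part of the project in this post.
class EvenFirstDemo {

	// Evens (key false) sort before odds (key true); equal parity compares as 0.
	static final Comparator<Integer> EVEN_FIRST =
			Comparator.comparing((Integer n) -> n % 2 != 0);

	// Returns a copy of the payloads ordered by EVEN_FIRST.
	// List.sort is stable, so payloads of equal priority keep their input order.
	static List<Integer> byPriority(List<Integer> payloads) {
		List<Integer> sorted = new ArrayList<>(payloads);
		sorted.sort(EVEN_FIRST);
		return sorted;
	}

	public static void main(String[] args) {
		System.out.println(EVEN_FIRST.compare(2, 4)); // 0: both even
		System.out.println(EVEN_FIRST.compare(3, 5)); // 0: both odd
		System.out.println(byPriority(Arrays.asList(5, 2, 9, 4, 1, 8))); // [2, 4, 8, 5, 9, 1]
	}
}
```

A priority queue, unlike a stable sort, does not promise any particular order among elements that compare as equal.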

 

Step 2: Define PriorityChannel using this comparator.

PriorityChannel channel = new PriorityChannel(100, msgComparator);
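The first constructor argument (100) is the capacity of the channel, that is, how many messages can wait in it at once. The second is the comparator. As a rough plain-Java analogy, you can think of it as a priority queue plus a limit on its size. The sketch below is only that analogy: BoundedPriorityBuffer, send and receive are hypothetical names, and this is not how Spring Integration implements the channel.

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.Semaphore;

// Hypothetical stand-in for a capacity-bounded priority buffer; not Spring code.
class BoundedPriorityBuffer<T> {

	private final PriorityBlockingQueue<T> queue;
	private final Semaphore freeSlots;

	BoundedPriorityBuffer(int capacity, Comparator<? super T> comparator) {
		// PriorityBlockingQueue is unbounded, so the capacity is enforced separately.
		this.queue = new PriorityBlockingQueue<>(Math.max(1, capacity), comparator);
		this.freeSlots = new Semaphore(capacity);
	}

	// Returns false instead of blocking when the buffer is full.
	boolean send(T item) {
		if (!freeSlots.tryAcquire()) {
			return false;
		}
		queue.add(item);
		return true;
	}

	// Returns the highest-priority item, or null when the buffer is empty.
	T receive() {
		T item = queue.poll();
		if (item != null) {
			freeSlots.release();
		}
		return item;
	}

	public static void main(String[] args) {
		BoundedPriorityBuffer<Integer> buffer =
				new BoundedPriorityBuffer<>(2, Comparator.<Integer>naturalOrder());
		System.out.println(buffer.send(7));   // true
		System.out.println(buffer.send(3));   // true
		System.out.println(buffer.send(1));   // false: capacity of 2 reached
		System.out.println(buffer.receive()); // 3: smallest value has highest priority
	}
}
```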

 

The complete working application is given below.

 

Step 1: Create a new Maven project ‘priority-channel-comparator-demo’.

 

Step 2: Update pom.xml with the Maven dependencies.

 

pom.xml

 

<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.sample.app</groupId>
	<artifactId>priority-channel-comparator-demo</artifactId>
	<version>1</version>

	<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-parent -->
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.4.0</version>
	</parent>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-integration</artifactId>
		</dependency>

	</dependencies>

</project>

 

Step 3: Create a package ‘com.sample.app.endpoints’ and define the CustomGateway and ConsumerEndpoint1 classes.

 

CustomGateway.java

 

package com.sample.app.endpoints;

import java.util.concurrent.Future;

import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.messaging.Message;

@MessagingGateway(name = "myGateway", defaultRequestChannel = "myQueueChannel")
public interface CustomGateway {

	@Gateway(requestChannel = "myQueueChannel")
	public Future<Message<Integer>> print(Message<Integer> message);

}

 

ConsumerEndpoint1.java

package com.sample.app.endpoints;

import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
public class ConsumerEndpoint1 {

	@ServiceActivator(inputChannel = "myQueueChannel", poller = {
			@Poller(maxMessagesPerPoll = "1", fixedDelay = "2000") })
	public Message<String> consumeMessage(Message<Integer> message) {
		System.out.println("ConsumerEndpoint1 -> Received from gateway : " + message.getPayload());
		return MessageBuilder.withPayload("Message '" + message.getPayload() + "' received by ConsumerEndpoint1")
				.build();
	}

}

 

Step 4: Define the App class in the com.sample.app package.

 

App.java

 

package com.sample.app;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.Future;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.PriorityChannel;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;

import com.sample.app.endpoints.CustomGateway;

@SpringBootApplication
@Configuration
public class App {

	@Autowired
	@Qualifier("myQueueChannel")
	private PriorityChannel queueChannel;

	@Autowired
	private CustomGateway customGateway;

	public static void main(String[] args) {
		SpringApplication.run(App.class, args);
	}

	@Bean(name = "myQueueChannel")
	public PriorityChannel channel1() {
		PriorityChannel channel = new PriorityChannel(100, msgComparator);
		return channel;
	}

	@Bean
	public CommandLineRunner demo() {
		return (args) -> {

			List<Future<Message<Integer>>> futures = new ArrayList<>();

			for (int i = 0; i < 10; i++) {
				Message<Integer> message = MessageBuilder.withPayload(i).build();
				Future<Message<Integer>> result = customGateway.print(message);
				futures.add(result);
			}

			System.out.println("\nAll the messages sent in asynchronous way");

			for (Future<Message<Integer>> future : futures) {
				Message<Integer> message = future.get();

				System.out.println(message.getPayload());
			}

		};
	}

	private static Comparator<Message<?>> msgComparator = new Comparator<Message<?>>() {

		@Override
		public int compare(Message<?> o1, Message<?> o2) {
			boolean even1 = isEven((Integer) o1.getPayload());
			boolean even2 = isEven((Integer) o2.getPayload());

			// Two even payloads, or two odd payloads, have the same priority.
			if (even1 == even2) {
				return 0;
			}

			// Otherwise the message with the even payload comes first.
			return even1 ? -1 : 1;
		}

	};
	
	private static boolean isEven(Integer num) {
		return num % 2 ==0;
	}

}

 

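The complete project structure, assuming the standard Maven source layout, looks like this.

priority-channel-comparator-demo
├── pom.xml
└── src/main/java/com/sample/app
    ├── App.java
    └── endpoints
        ├── ConsumerEndpoint1.java
        └── CustomGateway.java
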

Run App.java. You will see output similar to the following in the console; the exact interleaving of lines can vary between runs.

 

All the messages sent in asynchronous way

ConsumerEndpoint1 -> Received from gateway : 0
Message '0' received by ConsumerEndpoint1
ConsumerEndpoint1 -> Received from gateway : 2
ConsumerEndpoint1 -> Received from gateway : 8
ConsumerEndpoint1 -> Received from gateway : 4
ConsumerEndpoint1 -> Received from gateway : 6
ConsumerEndpoint1 -> Received from gateway : 7
ConsumerEndpoint1 -> Received from gateway : 9
ConsumerEndpoint1 -> Received from gateway : 5
ConsumerEndpoint1 -> Received from gateway : 1
Message '1' received by ConsumerEndpoint1
Message '2' received by ConsumerEndpoint1
ConsumerEndpoint1 -> Received from gateway : 3
Message '3' received by ConsumerEndpoint1
Message '4' received by ConsumerEndpoint1
Message '5' received by ConsumerEndpoint1
Message '6' received by ConsumerEndpoint1
Message '7' received by ConsumerEndpoint1
Message '8' received by ConsumerEndpoint1
Message '9' received by ConsumerEndpoint1

 

From the output, you can confirm that messages with even payloads are delivered before messages with odd payloads.
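The order within the even group and within the odd group is a separate question. A heap-based priority queue such as java.util.PriorityQueue breaks ties arbitrarily, so if you need messages of equal priority to come out in arrival order, one option is to add a tie-breaker such as a sequence number. The following is a minimal plain-Java sketch of that idea; StableEvenFirstDemo, Entry and deliveryOrder are hypothetical names, and it does not describe what PriorityChannel does internally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical demo class; all names here are illustrative.
class StableEvenFirstDemo {

	// Pairs a payload with the position at which it arrived.
	static final class Entry {
		final int payload;
		final long sequence;

		Entry(int payload, long sequence) {
			this.payload = payload;
			this.sequence = sequence;
		}
	}

	// Even payloads first; within the same parity, earlier arrivals first.
	static final Comparator<Entry> EVEN_FIRST_THEN_ARRIVAL =
			Comparator.comparing((Entry e) -> e.payload % 2 != 0)
					.thenComparingLong(e -> e.sequence);

	// Queues the payloads in the given order and returns the order they come out in.
	static List<Integer> deliveryOrder(List<Integer> payloads) {
		PriorityQueue<Entry> queue = new PriorityQueue<>(EVEN_FIRST_THEN_ARRIVAL);
		long sequence = 0;
		for (int payload : payloads) {
			queue.add(new Entry(payload, sequence++));
		}

		List<Integer> order = new ArrayList<>();
		while (!queue.isEmpty()) {
			order.add(queue.poll().payload);
		}
		return order;
	}

	public static void main(String[] args) {
		// Prints [0, 2, 4, 6, 8, 1, 3, 5, 7, 9]
		System.out.println(deliveryOrder(Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)));
	}
}
```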

 

You can download the complete working application from the link below.

https://github.com/harikrishna553/springboot/tree/master/spring-integration/priority-channel-comparator-demo


