You can supply a custom comparator when defining a PriorityChannel. The PriorityChannel uses this comparator to determine the priority of each message.
Step 1: Define a comparator.
private static Comparator<Message<?>> msgComparator = new Comparator<Message<?>>() {
    @Override
    public int compare(Message<?> o1, Message<?> o2) {
        // Payloads are Integers in this demo.
        boolean even1 = isEven((Integer) o1.getPayload());
        boolean even2 = isEven((Integer) o2.getPayload());
        if (even1 == even2) {
            // Same parity (both even or both odd): equal priority.
            return 0;
        }
        // Even payloads sort first, so they are received before odd ones.
        return even1 ? -1 : 1;
    }
};
This comparator gives priority to a message whose payload is an even number.
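The ordering rule can be tried without Spring. The following is a minimal sketch, not the application's code: the class name ParityComparatorSketch, the constant EVEN_FIRST, and the use of plain Integer values in place of Message payloads are assumptions made for illustration. It sorts the numbers 0 to 9 with an even-first comparator and prints the sign of a few comparisons.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class ParityComparatorSketch {

    // Hypothetical stand-in for the message comparator:
    // even numbers sort before odd numbers, same parity ties (returns 0).
    static final Comparator<Integer> EVEN_FIRST =
            (a, b) -> Boolean.compare(a % 2 != 0, b % 2 != 0);

    // Returns 0..count-1 ordered by EVEN_FIRST.
    // List.sort is stable, so ties keep their insertion order.
    static List<Integer> sortedByParity(int count) {
        List<Integer> numbers = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            numbers.add(i);
        }
        numbers.sort(EVEN_FIRST);
        return numbers;
    }

    public static void main(String[] args) {
        System.out.println(sortedByParity(10));        // [0, 2, 4, 6, 8, 1, 3, 5, 7, 9]
        System.out.println(EVEN_FIRST.compare(2, 3));  // -1 (even before odd)
        System.out.println(EVEN_FIRST.compare(3, 2));  // 1
        System.out.println(EVEN_FIRST.compare(1, 3));  // 0 (same parity)
    }
}
```

Note that compare(a, b) and compare(b, a) always have opposite signs here, which is what the Comparator contract requires; returning a nonzero value for two odd numbers in both directions would break that rule.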
Step 2: Define a PriorityChannel that uses this comparator. The first constructor argument (100) is the channel capacity.
PriorityChannel channel = new PriorityChannel(100, msgComparator);
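To get a feel for how a priority queue behaves with such a comparator, here is a small sketch that uses java.util.concurrent.PriorityBlockingQueue as a rough stand-in for the channel. The class name PriorityQueueSketch, the constant EVEN_FIRST, and the Integer elements are illustrative assumptions; a real PriorityChannel also enforces its capacity and works with Message objects, which this sketch does not model.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

public class PriorityQueueSketch {

    // Hypothetical even-first comparator on plain Integers.
    static final Comparator<Integer> EVEN_FIRST =
            (a, b) -> Boolean.compare(a % 2 != 0, b % 2 != 0);

    // Offers 0..count-1 to a priority queue, then polls until it is empty.
    static List<Integer> offerThenDrain(int count) {
        // 100 is only an initial size here; PriorityBlockingQueue itself is unbounded.
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>(100, EVEN_FIRST);
        for (int i = 0; i < count; i++) {
            queue.offer(i);
        }
        List<Integer> received = new ArrayList<>();
        Integer next;
        while ((next = queue.poll()) != null) {
            received.add(next);
        }
        return received;
    }

    public static void main(String[] args) {
        // All even numbers come out before any odd number.
        // The order among numbers of the same parity is not guaranteed by this queue.
        System.out.println(offerThenDrain(10));
    }
}
```

The sketch only demonstrates the grouping: every even element is polled before any odd element, while elements that compare as equal may come out in any order.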
The complete working application follows.
Step 1: Create a new Maven project ‘priority-channel-comparator-demo’.
Step 2: Update pom.xml with the Maven dependencies.
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.sample.app</groupId>
    <artifactId>priority-channel-comparator-demo</artifactId>
    <version>1</version>
    <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-parent -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.0</version>
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
    </dependencies>
</project>
Step 3: Create a package ‘com.sample.app.endpoints’ and define the CustomGateway and ConsumerEndpoint1 classes.
CustomGateway.java
package com.sample.app.endpoints;

import java.util.concurrent.Future;

import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.messaging.Message;

@MessagingGateway(name = "myGateway", defaultRequestChannel = "myQueueChannel")
public interface CustomGateway {

    // Returning a Future makes the gateway call asynchronous.
    @Gateway(requestChannel = "myQueueChannel")
    public Future<Message<Integer>> print(Message<Integer> message);
}
ConsumerEndpoint1.java
package com.sample.app.endpoints;

import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
public class ConsumerEndpoint1 {

    // Polls the channel for one message at a time, with a 2000 ms delay between polls.
    @ServiceActivator(inputChannel = "myQueueChannel", poller = {
            @Poller(maxMessagesPerPoll = "1", fixedDelay = "2000") })
    public Message<String> consumeMessage(Message<Integer> message) {
        System.out.println("ConsumerEndpoint1 -> Received from gateway : " + message.getPayload());
        return MessageBuilder.withPayload("Message '" + message.getPayload() + "' received by ConsumerEndpoint1")
                .build();
    }
}
Step 4: Define the App class in the com.sample.app package.
App.java
package com.sample.app;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.Future;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.PriorityChannel;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;

import com.sample.app.endpoints.CustomGateway;

@SpringBootApplication
@Configuration
public class App {

    @Autowired
    @Qualifier("myQueueChannel")
    private PriorityChannel queueChannel;

    @Autowired
    private CustomGateway customGateway;

    public static void main(String[] args) {
        SpringApplication.run(App.class, args);
    }

    // Channel with capacity 100 that orders messages using msgComparator.
    @Bean(name = "myQueueChannel")
    public PriorityChannel channel1() {
        PriorityChannel channel = new PriorityChannel(100, msgComparator);
        return channel;
    }

    @Bean
    public CommandLineRunner demo() {
        return (args) -> {
            List<Future<Message<Integer>>> futures = new ArrayList<>();

            // Send payloads 0..9 through the gateway without waiting for replies.
            for (int i = 0; i < 10; i++) {
                Message<Integer> message = MessageBuilder.withPayload(i).build();
                Future<Message<Integer>> result = customGateway.print(message);
                futures.add(result);
            }

            System.out.println("\nAll the messages sent in asynchronous way");

            // Wait for each reply in the order the messages were sent.
            for (Future<Message<Integer>> future : futures) {
                Message<Integer> message = future.get();
                System.out.println(message.getPayload());
            }
        };
    }

    private static Comparator<Message<?>> msgComparator = new Comparator<Message<?>>() {
        @Override
        public int compare(Message<?> o1, Message<?> o2) {
            // Payloads are Integers in this demo.
            boolean even1 = isEven((Integer) o1.getPayload());
            boolean even2 = isEven((Integer) o2.getPayload());
            if (even1 == even2) {
                // Same parity (both even or both odd): equal priority.
                return 0;
            }
            // Even payloads sort first, so they are received before odd ones.
            return even1 ? -1 : 1;
        }
    };

    private static boolean isEven(Integer num) {
        return num % 2 == 0;
    }
}
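The complete project consists of pom.xml at the project root and, under src/main/java, the classes com.sample.app.App, com.sample.app.endpoints.CustomGateway, and com.sample.app.endpoints.ConsumerEndpoint1.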
Run App.java; you will see the following messages in the console.
All the messages sent in asynchronous way
ConsumerEndpoint1 -> Received from gateway : 0
Message '0' received by ConsumerEndpoint1
ConsumerEndpoint1 -> Received from gateway : 2
ConsumerEndpoint1 -> Received from gateway : 8
ConsumerEndpoint1 -> Received from gateway : 4
ConsumerEndpoint1 -> Received from gateway : 6
ConsumerEndpoint1 -> Received from gateway : 7
ConsumerEndpoint1 -> Received from gateway : 9
ConsumerEndpoint1 -> Received from gateway : 5
ConsumerEndpoint1 -> Received from gateway : 1
Message '1' received by ConsumerEndpoint1
Message '2' received by ConsumerEndpoint1
ConsumerEndpoint1 -> Received from gateway : 3
Message '3' received by ConsumerEndpoint1
Message '4' received by ConsumerEndpoint1
Message '5' received by ConsumerEndpoint1
Message '6' received by ConsumerEndpoint1
Message '7' received by ConsumerEndpoint1
Message '8' received by ConsumerEndpoint1
Message '9' received by ConsumerEndpoint1
From the output, you can confirm that messages with even payloads (0, 2, 8, 4, 6) are delivered to the consumer before messages with odd payloads (7, 9, 5, 1, 3).
You can download the complete working application from the link below.
https://github.com/harikrishna553/springboot/tree/master/spring-integration/priority-channel-comparator-demo