‘PriorityChannel’ delivers messages in an order determined by a Comparator. The default comparator orders messages by the 'priority' message header.
How to set the priority?
Use the ‘setPriority’ method of MessageBuilder to set the priority of a Message.
Message<String> message = MessageBuilder.withPayload("msg " + i).setPriority(i).build();
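To make the ordering concrete, here is a minimal plain-JDK sketch of the idea. It is an analogy, not Spring's actual implementation: the Msg class and the deliveryOrder helper are hypothetical names introduced for illustration, and the comparator assumes that higher priority values are delivered first, which matches the console output of this demo.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

public class PriorityOrderSketch {

    // Hypothetical stand-in for a message: a payload plus a numeric priority.
    static final class Msg {
        final String payload;
        final int priority;

        Msg(String payload, int priority) {
            this.payload = payload;
            this.priority = priority;
        }
    }

    // Creates "msg 0", "msg 1", ... with the given priorities, enqueues them all,
    // then drains the queue and returns the payloads in delivery order.
    static List<String> deliveryOrder(int... priorities) {
        // Assumption: a higher priority value means earlier delivery.
        Comparator<Msg> higherFirst = (a, b) -> Integer.compare(b.priority, a.priority);
        PriorityBlockingQueue<Msg> queue = new PriorityBlockingQueue<>(100, higherFirst);

        for (int i = 0; i < priorities.length; i++) {
            queue.add(new Msg("msg " + i, priorities[i]));
        }

        List<String> order = new ArrayList<>();
        Msg next;
        while ((next = queue.poll()) != null) {
            order.add(next.payload);
        }
        return order;
    }

    public static void main(String[] args) {
        // Prints [msg 4, msg 3, msg 2, msg 1, msg 0]
        System.out.println(deliveryOrder(0, 1, 2, 3, 4));
    }
}
```

Passing a different comparator to the queue changes the delivery order, which is the same role the Comparator plays for a PriorityChannel.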
The following steps build a working application.
Step 1: Create a new Maven project ‘priority-channel-demo’.
Step 2: Update pom.xml with the Maven dependencies.
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.sample.app</groupId>
    <artifactId>priority-channel-demo</artifactId>
    <version>1</version>
    <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-parent -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.0</version>
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
    </dependencies>
</project>
Step 3: Create a package ‘com.sample.app.endpoints’ and define the ConsumerEndpoint1 class and the CustomGateway interface.
ConsumerEndpoint1.java
package com.sample.app.endpoints;

import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
public class ConsumerEndpoint1 {

    // Polls "myQueueChannel" with a 2000 ms fixed delay, taking at most 3 messages per poll.
    @ServiceActivator(inputChannel = "myQueueChannel", poller = {
            @Poller(maxMessagesPerPoll = "3", fixedDelay = "2000") })
    public Message<String> consumeMessage(Message<String> message) {
        System.out.println("ConsumerEndpoint1 -> Received from gateway : " + message.getPayload());

        // The returned message becomes the reply delivered to the gateway caller.
        return MessageBuilder.withPayload("Message '" + message.getPayload() + "' received by ConsumerEndpoint1")
                .build();
    }
}
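The poller settings on ConsumerEndpoint1 mean that each poll cycle handles at most three messages and that cycles are separated by a 2000 ms delay. The plain-JDK sketch below only imitates that batching on a priority-ordered queue; pollInBatches is a hypothetical helper, the delay is left out, and it assumes higher priority values are taken first.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

public class PollBatchSketch {

    // Drains the given priorities in batches of at most maxPerPoll, highest value first.
    static List<List<Integer>> pollInBatches(List<Integer> priorities, int maxPerPoll) {
        PriorityQueue<Integer> queue = new PriorityQueue<>(Collections.reverseOrder());
        queue.addAll(priorities);

        List<List<Integer>> batches = new ArrayList<>();
        while (!queue.isEmpty()) {
            List<Integer> batch = new ArrayList<>();
            // One poll cycle: take up to maxPerPoll messages, then stop until the next cycle.
            while (batch.size() < maxPerPoll && !queue.isEmpty()) {
                batch.add(queue.poll());
            }
            batches.add(batch);
            // A real poller would wait for the fixed delay here (2000 ms in the demo).
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> priorities = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            priorities.add(i);
        }
        // Prints [[9, 8, 7], [6, 5, 4], [3, 2, 1], [0]]
        System.out.println(pollInBatches(priorities, 3));
    }
}
```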
CustomGateway.java
package com.sample.app.endpoints;

import java.util.concurrent.Future;

import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.messaging.Message;

@MessagingGateway(name = "myGateway", defaultRequestChannel = "myQueueChannel")
public interface CustomGateway {

    // Returning a Future makes the call asynchronous: the caller gets the reply later via get().
    @Gateway(requestChannel = "myQueueChannel")
    public Future<Message<String>> print(Message<String> message);
}
Step 4: Define the App class in the com.sample.app package.
App.java
package com.sample.app;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.PriorityChannel;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;

import com.sample.app.endpoints.CustomGateway;

@SpringBootApplication
@Configuration
public class App {

    @Autowired
    @Qualifier("myQueueChannel")
    private PriorityChannel queueChannel;

    @Autowired
    private CustomGateway customGateway;

    public static void main(String[] args) {
        SpringApplication.run(App.class, args);
    }

    // Priority channel with a capacity of 100 messages, using the default comparator.
    @Bean(name = "myQueueChannel")
    public PriorityChannel channel1() {
        PriorityChannel channel = new PriorityChannel(100);
        return channel;
    }

    @Bean
    public CommandLineRunner demo() {
        return (args) -> {
            List<Future<Message<String>>> futures = new ArrayList<>();

            // Send 10 messages; message i carries priority i.
            for (int i = 0; i < 10; i++) {
                //System.out.println("Sending message number " + i);
                Message<String> message = MessageBuilder.withPayload("msg " + i).setPriority(i).build();
                Future<Message<String>> result = customGateway.print(message);
                futures.add(result);
            }

            System.out.println("\nAll the messages sent in asynchronous way");

            // Read the replies in the order the requests were submitted.
            for (Future<Message<String>> future : futures) {
                Message<String> message = future.get();
                System.out.println(message.getPayload());
            }
        };
    }
}
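The project consists of pom.xml, App.java in the com.sample.app package, and ConsumerEndpoint1.java and CustomGateway.java in the com.sample.app.endpoints package.
Run App.java; the console shows the following messages. Messages with a higher priority value are consumed first, while the replies are printed in the order in which the futures were created. The exact consumption order can vary slightly between runs (here msg 8 is consumed before msg 9), most likely because the gateway sends the messages asynchronously.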
All the messages sent in asynchronous way
ConsumerEndpoint1 -> Received from gateway : msg 8
ConsumerEndpoint1 -> Received from gateway : msg 9
ConsumerEndpoint1 -> Received from gateway : msg 7
ConsumerEndpoint1 -> Received from gateway : msg 6
ConsumerEndpoint1 -> Received from gateway : msg 5
ConsumerEndpoint1 -> Received from gateway : msg 4
ConsumerEndpoint1 -> Received from gateway : msg 3
ConsumerEndpoint1 -> Received from gateway : msg 2
ConsumerEndpoint1 -> Received from gateway : msg 1
ConsumerEndpoint1 -> Received from gateway : msg 0
Message 'msg 0' received by ConsumerEndpoint1
Message 'msg 1' received by ConsumerEndpoint1
Message 'msg 2' received by ConsumerEndpoint1
Message 'msg 3' received by ConsumerEndpoint1
Message 'msg 4' received by ConsumerEndpoint1
Message 'msg 5' received by ConsumerEndpoint1
Message 'msg 6' received by ConsumerEndpoint1
Message 'msg 7' received by ConsumerEndpoint1
Message 'msg 8' received by ConsumerEndpoint1
Message 'msg 9' received by ConsumerEndpoint1
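The replies appear in the order msg 0 to msg 9 even though the messages were consumed in priority order, because the runner walks the list of futures in submission order and blocks on each one. The following plain-JDK sketch illustrates that effect under simplified assumptions: readInSubmissionOrder is a hypothetical helper, CompletableFuture stands in for the gateway's futures, and join() is used instead of get() only to avoid checked exceptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class FutureOrderSketch {

    // Creates 'count' futures, completes them in reverse order,
    // then reads them back in the order they were created.
    static List<String> readInSubmissionOrder(int count) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            futures.add(new CompletableFuture<>());
        }

        // Complete the last request first, as if the highest priority were handled first.
        for (int i = count - 1; i >= 0; i--) {
            futures.get(i).complete("reply " + i);
        }

        // Reading follows list order, so completion order does not affect the result.
        List<String> replies = new ArrayList<>();
        for (CompletableFuture<String> future : futures) {
            replies.add(future.join());
        }
        return replies;
    }

    public static void main(String[] args) {
        // Prints [reply 0, reply 1, reply 2]
        System.out.println(readInSubmissionOrder(3));
    }
}
```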
You can download the complete working application from the link below.
https://github.com/harikrishna553/springboot/tree/master/spring-integration/priority-channel-demo