PublishSubscribeChannel delivers each message to all of its subscribers, in sequence by default.
You can define a publish-subscribe channel using the PublishSubscribeChannel class.
Example
PublishSubscribeChannel channel = new PublishSubscribeChannel();
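To make the "all subscribers, in sequence" behaviour concrete, here is a minimal plain-Java sketch. It is only a model of the idea, not Spring Integration's implementation: the class SequentialFanOut and its subscribe, send, and demo methods are hypothetical names invented for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a publish-subscribe channel (not Spring's class).
class SequentialFanOut {

    private final List<Consumer<String>> subscribers = new ArrayList<>();

    // Registers a subscriber; subscription order is preserved.
    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    // Hands the same payload to every subscriber, one after another,
    // on the calling thread. Returns the number of subscribers invoked.
    int send(String payload) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(payload);
        }
        return subscribers.size();
    }

    // Wires three subscribers and returns the order in which they ran.
    static List<String> demo(String payload) {
        List<String> log = new ArrayList<>();
        SequentialFanOut channel = new SequentialFanOut();
        channel.subscribe(p -> log.add("subscriber1:" + p));
        channel.subscribe(p -> log.add("subscriber2:" + p));
        channel.subscribe(p -> log.add("subscriber3:" + p));
        channel.send(payload);
        return log;
    }

    public static void main(String[] args) {
        demo("Msg 0").forEach(System.out::println);
    }
}
```

Every subscriber receives the same payload, and the second subscriber does not start until the first one has returned.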
Follow the step-by-step procedure below to build a complete working application.
Step 1: Create a new Maven project ‘pub-sub-channel-demo’.
Step 2: Update pom.xml with the Maven dependencies.
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.sample.app</groupId>
    <artifactId>pub-sub-channel-demo</artifactId>
    <version>1</version>

    <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-parent -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.0</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
    </dependencies>
</project>
Step 3: Define consumer endpoints.
ConsumerEndpoint1.java
package com.sample.app.endpoints;

import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
public class ConsumerEndpoint1 {

    // Subscribes this method to pubSubChannel; the returned message is sent back as the reply.
    @ServiceActivator(inputChannel = "pubSubChannel")
    public Message<String> consumeMessage(Message<String> message) {
        System.out.println("ConsumerEndpoint1 -> Received message from gateway : " + message.getPayload());
        return MessageBuilder.withPayload("Message '" + message.getPayload() + "' received by ConsumerEndpoint1")
                .build();
    }
}
ConsumerEndpoint2.java
package com.sample.app.endpoints;

import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
public class ConsumerEndpoint2 {

    // Subscribes this method to pubSubChannel; the returned message is sent back as the reply.
    @ServiceActivator(inputChannel = "pubSubChannel")
    public Message<String> consumeMessage(Message<String> message) {
        System.out.println("ConsumerEndpoint2 -> Received message from gateway : " + message.getPayload());
        return MessageBuilder.withPayload("Message '" + message.getPayload() + "' received by ConsumerEndpoint2")
                .build();
    }
}
ConsumerEndpoint3.java
package com.sample.app.endpoints;

import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
public class ConsumerEndpoint3 {

    // Subscribes this method to pubSubChannel; the returned message is sent back as the reply.
    @ServiceActivator(inputChannel = "pubSubChannel")
    public Message<String> consumeMessage(Message<String> message) {
        System.out.println("ConsumerEndpoint3 -> Received message from gateway : " + message.getPayload());
        return MessageBuilder.withPayload("Message '" + message.getPayload() + "' received by ConsumerEndpoint3")
                .build();
    }
}
Step 4: Define the gateway.
CustomGateway.java
package com.sample.app.endpoints;

import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.messaging.Message;

@MessagingGateway(name = "myGateway", defaultRequestChannel = "pubSubChannel")
public interface CustomGateway {

    // Sends the message to pubSubChannel and waits for a reply.
    @Gateway(requestChannel = "pubSubChannel")
    Message<String> print(Message<String> message);
}
Step 5: Define the main application class.
App.java
package com.sample.app;

import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.channel.PublishSubscribeChannel;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;

import com.sample.app.endpoints.CustomGateway;

@SpringBootApplication
public class App {

    public static void main(String[] args) {
        SpringApplication.run(App.class, args);
    }

    // Registers the channel under the name used by the gateway and the service activators.
    @Bean(name = "pubSubChannel")
    public PublishSubscribeChannel channel1() {
        return new PublishSubscribeChannel();
    }

    // The gateway is injected as a method parameter, so no field injection is needed.
    @Bean
    public CommandLineRunner demo(CustomGateway customGateway) {
        return (args) -> {
            for (int i = 0; i < 10; i++) {
                Message<String> message = MessageBuilder.withPayload("Msg " + i).build();
                Message<String> result = customGateway.print(message);
                System.out.println(result.getPayload());
            }
        };
    }
}
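The complete project consists of pom.xml, App.java in the com.sample.app package, and CustomGateway.java plus the three ConsumerEndpoint classes in the com.sample.app.endpoints package.
Run App.java. You will see the following messages in the console.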
ConsumerEndpoint1 -> Received message from gateway : Msg 0
ConsumerEndpoint2 -> Received message from gateway : Msg 0
ConsumerEndpoint3 -> Received message from gateway : Msg 0
Message 'Msg 0' received by ConsumerEndpoint3
ConsumerEndpoint1 -> Received message from gateway : Msg 1
ConsumerEndpoint2 -> Received message from gateway : Msg 1
ConsumerEndpoint3 -> Received message from gateway : Msg 1
Message 'Msg 1' received by ConsumerEndpoint3
ConsumerEndpoint1 -> Received message from gateway : Msg 2
ConsumerEndpoint2 -> Received message from gateway : Msg 2
ConsumerEndpoint3 -> Received message from gateway : Msg 2
Message 'Msg 2' received by ConsumerEndpoint3
ConsumerEndpoint1 -> Received message from gateway : Msg 3
ConsumerEndpoint2 -> Received message from gateway : Msg 3
ConsumerEndpoint3 -> Received message from gateway : Msg 3
Message 'Msg 3' received by ConsumerEndpoint3
ConsumerEndpoint1 -> Received message from gateway : Msg 4
ConsumerEndpoint2 -> Received message from gateway : Msg 4
ConsumerEndpoint3 -> Received message from gateway : Msg 4
Message 'Msg 4' received by ConsumerEndpoint3
ConsumerEndpoint1 -> Received message from gateway : Msg 5
ConsumerEndpoint2 -> Received message from gateway : Msg 5
ConsumerEndpoint3 -> Received message from gateway : Msg 5
Message 'Msg 5' received by ConsumerEndpoint3
ConsumerEndpoint1 -> Received message from gateway : Msg 6
ConsumerEndpoint2 -> Received message from gateway : Msg 6
ConsumerEndpoint3 -> Received message from gateway : Msg 6
Message 'Msg 6' received by ConsumerEndpoint3
ConsumerEndpoint1 -> Received message from gateway : Msg 7
ConsumerEndpoint2 -> Received message from gateway : Msg 7
ConsumerEndpoint3 -> Received message from gateway : Msg 7
Message 'Msg 7' received by ConsumerEndpoint3
ConsumerEndpoint1 -> Received message from gateway : Msg 8
ConsumerEndpoint2 -> Received message from gateway : Msg 8
ConsumerEndpoint3 -> Received message from gateway : Msg 8
Message 'Msg 8' received by ConsumerEndpoint3
ConsumerEndpoint1 -> Received message from gateway : Msg 9
ConsumerEndpoint2 -> Received message from gateway : Msg 9
ConsumerEndpoint3 -> Received message from gateway : Msg 9
Message 'Msg 9' received by ConsumerEndpoint3
As the output shows, each message is delivered to the subscribers sequentially, and the gateway method returns the response from the last subscriber invoked.
In my next post, I will explain how to deliver the messages in parallel.
You can download the complete working application from the link below.
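One way to picture why only the last response reaches the caller is a single reply slot that every subscriber writes into before the caller reads it. The plain-Java sketch below models that idea under this assumption. It is not how Spring Integration is implemented; the class LastReplyWins and its sendAndReceive and demo methods are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

// Hypothetical model of a request/reply call over a sequential fan-out.
class LastReplyWins {

    // Invokes every subscriber in order on the calling thread. Each reply is
    // written to the same slot, so a later reply overwrites an earlier one.
    static String sendAndReceive(List<Function<String, String>> subscribers, String payload) {
        AtomicReference<String> replySlot = new AtomicReference<>();
        for (Function<String, String> subscriber : subscribers) {
            replySlot.set(subscriber.apply(payload));
        }
        // The caller reads the slot only after all subscribers have finished.
        return replySlot.get();
    }

    // Three subscribers that answer the way the consumer endpoints in this post do.
    static String demo(String payload) {
        List<Function<String, String>> subscribers = new ArrayList<>();
        subscribers.add(p -> "Message '" + p + "' received by ConsumerEndpoint1");
        subscribers.add(p -> "Message '" + p + "' received by ConsumerEndpoint2");
        subscribers.add(p -> "Message '" + p + "' received by ConsumerEndpoint3");
        return sendAndReceive(subscribers, payload);
    }

    public static void main(String[] args) {
        System.out.println(demo("Msg 0"));
    }
}
```

In this model the replies from the first two subscribers are produced but never seen by the caller, which matches the console output where only ConsumerEndpoint3's response is printed.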
https://github.com/harikrishna553/springboot/tree/master/spring-integration/pub-sub-channel-demo