Monday, 31 May 2021

Spring Integration: PublishSubscribeChannel demo

By default, PublishSubscribeChannel delivers each message to all of its subscribers, one after another.

You can define a publish-subscribe channel using the PublishSubscribeChannel class.

 

Example

PublishSubscribeChannel channel = new PublishSubscribeChannel();
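
To make the default delivery behavior concrete, here is a minimal plain-Java sketch of the idea. It is only a model and does not use Spring Integration: the class SequentialPubSubModel and its methods are hypothetical names invented for this illustration, and the model assumes subscribers are called in the order they were registered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical plain-Java model of sequential publish-subscribe delivery.
// It does not use Spring Integration; the class and method names are made up.
class SequentialPubSubModel<T> {

	private final List<Consumer<T>> subscribers = new ArrayList<>();

	// Register a subscriber; in this model delivery follows registration order.
	void subscribe(Consumer<T> subscriber) {
		subscribers.add(subscriber);
	}

	// Deliver the message to every subscriber, one after another, on the caller's thread.
	// Returns the number of subscribers that received the message.
	int send(T message) {
		for (Consumer<T> subscriber : subscribers) {
			subscriber.accept(message);
		}
		return subscribers.size();
	}
}
```

Registering three subscribers and calling send("Msg 0") invokes all three in turn before send returns, which is the behavior the rest of this post demonstrates with the real channel.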

 

 

Follow the step-by-step procedure below to build a complete working application.

 

Step 1: Create a new Maven project ‘pub-sub-channel-demo’.

 

Step 2: Update pom.xml with the Maven dependencies.

 

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.sample.app</groupId>
	<artifactId>pub-sub-channel-demo</artifactId>
	<version>1</version>

	<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-parent -->
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.4.0</version>
	</parent>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-integration</artifactId>
		</dependency>

	</dependencies>

</project>

 

Step 3: Define the consumer endpoints.

 

ConsumerEndpoint1.java

package com.sample.app.endpoints;

import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
public class ConsumerEndpoint1 {

	@ServiceActivator(inputChannel = "pubSubChannel")
	public Message<String> consumeMessage(Message<String> message) {
		System.out.println("ConsumerEndpoint1 -> Received message from gateway : " + message.getPayload());
		return MessageBuilder.withPayload("Message '" + message.getPayload() + "' received by ConsumerEndpoint1")
				.build();
	}

}

ConsumerEndpoint2.java

package com.sample.app.endpoints;

import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
public class ConsumerEndpoint2 {

	@ServiceActivator(inputChannel = "pubSubChannel")
	public Message<String> consumeMessage(Message<String> message) {
		System.out.println("ConsumerEndpoint2 -> Received message from gateway : " + message.getPayload());
		return MessageBuilder.withPayload("Message '" + message.getPayload() + "' received by ConsumerEndpoint2")
				.build();
	}

}


ConsumerEndpoint3.java

package com.sample.app.endpoints;

import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
public class ConsumerEndpoint3 {

	@ServiceActivator(inputChannel = "pubSubChannel")
	public Message<String> consumeMessage(Message<String> message) {
		System.out.println("ConsumerEndpoint3 -> Received message from gateway : " + message.getPayload());
		return MessageBuilder.withPayload("Message '" + message.getPayload() + "' received by ConsumerEndpoint3")
				.build();
	}

}

 

Step 4: Define the gateway.

 

CustomGateway.java

package com.sample.app.endpoints;

import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.messaging.Message;

@MessagingGateway(name = "myGateway", defaultRequestChannel = "pubSubChannel")
public interface CustomGateway {

	@Gateway(requestChannel = "pubSubChannel")
	public Message<String> print(Message<String> message);

}

 

Step 5: Define the main application class.

 

App.java

package com.sample.app;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.channel.PublishSubscribeChannel;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;

import com.sample.app.endpoints.CustomGateway;

// @SpringBootApplication already includes @Configuration, so it is not repeated here.
@SpringBootApplication
public class App {

	@Autowired
	private CustomGateway customGateway;

	public static void main(String[] args) {
		SpringApplication.run(App.class, args);
	}

	// No executor is supplied, so subscribers are invoked one after another on the sender's thread.
	@Bean(name = "pubSubChannel")
	public PublishSubscribeChannel channel1() {
		return new PublishSubscribeChannel();
	}

	// Sends ten messages through the gateway and prints the reply to each one.
	@Bean
	public CommandLineRunner demo() {
		return (args) -> {

			for (int i = 0; i < 10; i++) {
				Message<String> message = MessageBuilder.withPayload("Msg " + i).build();
				Message<String> result = customGateway.print(message);
				System.out.println(result.getPayload());
			}

		};
	}

}

 

The complete project structure looks like this.


 

Run App.java; you will see the following messages in the console.

 

ConsumerEndpoint1 -> Received message from gateway : Msg 0
ConsumerEndpoint2 -> Received message from gateway : Msg 0
ConsumerEndpoint3 -> Received message from gateway : Msg 0
Message 'Msg 0' received by ConsumerEndpoint3
ConsumerEndpoint1 -> Received message from gateway : Msg 1
ConsumerEndpoint2 -> Received message from gateway : Msg 1
ConsumerEndpoint3 -> Received message from gateway : Msg 1
Message 'Msg 1' received by ConsumerEndpoint3
ConsumerEndpoint1 -> Received message from gateway : Msg 2
ConsumerEndpoint2 -> Received message from gateway : Msg 2
ConsumerEndpoint3 -> Received message from gateway : Msg 2
Message 'Msg 2' received by ConsumerEndpoint3
ConsumerEndpoint1 -> Received message from gateway : Msg 3
ConsumerEndpoint2 -> Received message from gateway : Msg 3
ConsumerEndpoint3 -> Received message from gateway : Msg 3
Message 'Msg 3' received by ConsumerEndpoint3
ConsumerEndpoint1 -> Received message from gateway : Msg 4
ConsumerEndpoint2 -> Received message from gateway : Msg 4
ConsumerEndpoint3 -> Received message from gateway : Msg 4
Message 'Msg 4' received by ConsumerEndpoint3
ConsumerEndpoint1 -> Received message from gateway : Msg 5
ConsumerEndpoint2 -> Received message from gateway : Msg 5
ConsumerEndpoint3 -> Received message from gateway : Msg 5
Message 'Msg 5' received by ConsumerEndpoint3
ConsumerEndpoint1 -> Received message from gateway : Msg 6
ConsumerEndpoint2 -> Received message from gateway : Msg 6
ConsumerEndpoint3 -> Received message from gateway : Msg 6
Message 'Msg 6' received by ConsumerEndpoint3
ConsumerEndpoint1 -> Received message from gateway : Msg 7
ConsumerEndpoint2 -> Received message from gateway : Msg 7
ConsumerEndpoint3 -> Received message from gateway : Msg 7
Message 'Msg 7' received by ConsumerEndpoint3
ConsumerEndpoint1 -> Received message from gateway : Msg 8
ConsumerEndpoint2 -> Received message from gateway : Msg 8
ConsumerEndpoint3 -> Received message from gateway : Msg 8
Message 'Msg 8' received by ConsumerEndpoint3
ConsumerEndpoint1 -> Received message from gateway : Msg 9
ConsumerEndpoint2 -> Received message from gateway : Msg 9
ConsumerEndpoint3 -> Received message from gateway : Msg 9
Message 'Msg 9' received by ConsumerEndpoint3

 

As the output shows, messages are delivered to the subscribers sequentially, and the gateway method receives the reply from the subscriber that was invoked last.
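
One way to picture why only the last reply reaches the caller is the plain-Java sketch below. It is a simplified model, not Spring Integration's actual code: the class LastReplyWinsModel and its methods are hypothetical, and it assumes that replies land in a single slot, so each reply overwrites the previous one before the caller reads it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical model of a request-reply call over a sequential publish-subscribe channel.
// Assumption: replies land in a single slot, so each reply overwrites the previous one.
class LastReplyWinsModel {

	private final List<Function<String, String>> subscribers = new ArrayList<>();

	// Register a subscriber that turns a request payload into a reply payload.
	void subscribe(Function<String, String> subscriber) {
		subscribers.add(subscriber);
	}

	// Invoke every subscriber in order and return only the reply that was produced last.
	// Returns null when there are no subscribers.
	String sendAndReceive(String payload) {
		String reply = null;
		for (Function<String, String> subscriber : subscribers) {
			reply = subscriber.apply(payload);
		}
		return reply;
	}
}
```

With three subscribers that reply in the same format as the endpoints above, sendAndReceive("Msg 0") in this model returns the reply built by the third subscriber, matching the console output.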

 

In my next post, I will explain how to deliver messages in parallel.

 

You can download the complete working application from the link below.

https://github.com/harikrishna553/springboot/tree/master/spring-integration/pub-sub-channel-demo

 

 

 


 

 


 

 

 

 

  

 
