Thursday 3 June 2021

Spring integration: Messaging Bridge

Messaging bridge is used to connect to message channels or message adapters.

 

What is the usecase of Messaging Bridge?

Messaging bridge is used to throttle inbound messages.

 

For example, we can connect a PollableChannel to a SubscribableChannel. so that the subscribing endpoints do not aware of any polling configuration. Instead, the messaging bridge provides the polling configuration. Poller determines the rate at which messages arrive on the second channel.

 

How to define a bridge?

@BridgeFrom annotation is used to mark a bean method to produce a BridgeHandler and a consumer endpoint.

 

Example

@Bean(name="pubSubChannel")
@BridgeFrom(value = "pollableChannel", poller = @Poller(fixedDelay = "5000", maxMessagesPerPoll = "2"))
public SubscribableChannel directChanel() {
    return new DirectChannel();
}

 

Above snippet connect pollableChannel to pubSubChannel using a bridge.

 

Find the below working application.

 

Step 1: Create new maven project ‘messaging-bridge-demo’.

 

Step 2: Update pom.xml with maven dependencies.

 

pom.xml

 

<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.sample.app</groupId>
	<artifactId>messaging-bridge-demo</artifactId>
	<version>1</version>

	<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-parent -->
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.4.0</version>
	</parent>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-integration</artifactId>
		</dependency>

	</dependencies>

</project>

 

Step 3: Define service activator endpoint.

 

ConsumerEndpoint1.java

package com.sample.app.endpoints;

import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
public class ConsumerEndpoint1 {

	@ServiceActivator(inputChannel = "pubSubChannel")
	public Message<String> consumeMessage(Message<String> message) {
		System.out.println("ConsumerEndpoint1 -> Received message from pubsub channel : " + message.getPayload());
		return MessageBuilder.withPayload("Message '" + message.getPayload() + "' received by ConsumerEndpoint1")
				.build();
	}

}

 

Step 4: Define gateway.

 

CustomGateway.java

 

package com.sample.app.gateway;

import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.messaging.Message;

@MessagingGateway(name = "myGateway", defaultRequestChannel = "pollableChannel")
public interface CustomGateway {

	@Gateway(requestChannel = "pollableChannel")
	public void print(Message<String> message);

}

 

Step 5: Define main application class.

 

App.java

 

package com.sample.app;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.BridgeFrom;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.SubscribableChannel;

import com.sample.app.gateway.CustomGateway;

@SpringBootApplication
@Configuration
public class App {
	@Bean(name = "pollableChannel")
	public PollableChannel pollableChannel() {
		return new QueueChannel();
	}

	@Bean(name = "pubSubChannel")
	@BridgeFrom(value = "pollableChannel", poller = @Poller(fixedDelay = "5000", maxMessagesPerPoll = "2"))
	public SubscribableChannel directChanel() {
		return new DirectChannel();
	}

	@Autowired
	private CustomGateway customGateway;

	public static void main(String[] args) {
		SpringApplication.run(App.class, args);
	}

	@Bean
	public CommandLineRunner demo() {
		return (args) -> {

			for (int i = 0; i < 10; i++) {
				Message<String> message = MessageBuilder.withPayload("Msg " + i).build();
				customGateway.print(message);
			}

		};
	}

}

 

Total project structure looks like below.


 

Run App.java, you will see below messages in console.

 

ConsumerEndpoint1 -> Received message from pubsub channel : Msg 0
ConsumerEndpoint1 -> Received message from pubsub channel : Msg 1
ConsumerEndpoint1 -> Received message from pubsub channel : Msg 2
ConsumerEndpoint1 -> Received message from pubsub channel : Msg 3
ConsumerEndpoint1 -> Received message from pubsub channel : Msg 4
ConsumerEndpoint1 -> Received message from pubsub channel : Msg 5
ConsumerEndpoint1 -> Received message from pubsub channel : Msg 6
ConsumerEndpoint1 -> Received message from pubsub channel : Msg 7
ConsumerEndpoint1 -> Received message from pubsub channel : Msg 8
ConsumerEndpoint1 -> Received message from pubsub channel : Msg 9

 

You can download complete working application from below link.

https://github.com/harikrishna553/springboot/tree/master/spring-integration/messaging-bridge-demo

 

 

 

 

 

 

  

Previous                                                    Next                                                    Home

No comments:

Post a Comment