Messaging bridge is used to connect to message channels or message adapters.
What is the usecase of Messaging Bridge?
Messaging bridge is used to throttle inbound messages.
For example, we can connect a PollableChannel to a SubscribableChannel. so that the subscribing endpoints do not aware of any polling configuration. Instead, the messaging bridge provides the polling configuration. Poller determines the rate at which messages arrive on the second channel.
How to define a bridge?
@BridgeFrom annotation is used to mark a bean method to produce a BridgeHandler and a consumer endpoint.
Example
@Bean(name="pubSubChannel")
@BridgeFrom(value = "pollableChannel", poller = @Poller(fixedDelay = "5000", maxMessagesPerPoll = "2"))
public SubscribableChannel directChanel() {
return new DirectChannel();
}
Above snippet connect pollableChannel to pubSubChannel using a bridge.
Find the below working application.
Step 1: Create new maven project ‘messaging-bridge-demo’.
Step 2: Update pom.xml with maven dependencies.
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.sample.app</groupId>
<artifactId>messaging-bridge-demo</artifactId>
<version>1</version>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-parent -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.0</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
</dependencies>
</project>
Step 3: Define service activator endpoint.
ConsumerEndpoint1.java
package com.sample.app.endpoints;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
@Component
public class ConsumerEndpoint1 {
@ServiceActivator(inputChannel = "pubSubChannel")
public Message<String> consumeMessage(Message<String> message) {
System.out.println("ConsumerEndpoint1 -> Received message from pubsub channel : " + message.getPayload());
return MessageBuilder.withPayload("Message '" + message.getPayload() + "' received by ConsumerEndpoint1")
.build();
}
}
Step 4: Define gateway.
CustomGateway.java
package com.sample.app.gateway;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.messaging.Message;
@MessagingGateway(name = "myGateway", defaultRequestChannel = "pollableChannel")
public interface CustomGateway {
@Gateway(requestChannel = "pollableChannel")
public void print(Message<String> message);
}
Step 5: Define main application class.
App.java
package com.sample.app;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.BridgeFrom;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.SubscribableChannel;
import com.sample.app.gateway.CustomGateway;
@SpringBootApplication
@Configuration
public class App {
@Bean(name = "pollableChannel")
public PollableChannel pollableChannel() {
return new QueueChannel();
}
@Bean(name = "pubSubChannel")
@BridgeFrom(value = "pollableChannel", poller = @Poller(fixedDelay = "5000", maxMessagesPerPoll = "2"))
public SubscribableChannel directChanel() {
return new DirectChannel();
}
@Autowired
private CustomGateway customGateway;
public static void main(String[] args) {
SpringApplication.run(App.class, args);
}
@Bean
public CommandLineRunner demo() {
return (args) -> {
for (int i = 0; i < 10; i++) {
Message<String> message = MessageBuilder.withPayload("Msg " + i).build();
customGateway.print(message);
}
};
}
}
Total project structure looks like below.
Run App.java, you will see below messages in console.
ConsumerEndpoint1 -> Received message from pubsub channel : Msg 0 ConsumerEndpoint1 -> Received message from pubsub channel : Msg 1 ConsumerEndpoint1 -> Received message from pubsub channel : Msg 2 ConsumerEndpoint1 -> Received message from pubsub channel : Msg 3 ConsumerEndpoint1 -> Received message from pubsub channel : Msg 4 ConsumerEndpoint1 -> Received message from pubsub channel : Msg 5 ConsumerEndpoint1 -> Received message from pubsub channel : Msg 6 ConsumerEndpoint1 -> Received message from pubsub channel : Msg 7 ConsumerEndpoint1 -> Received message from pubsub channel : Msg 8 ConsumerEndpoint1 -> Received message from pubsub channel : Msg 9
You can download complete working application from below link.
https://github.com/harikrishna553/springboot/tree/master/spring-integration/messaging-bridge-demo
Previous Next Home
No comments:
Post a Comment