Camel support multicasting of the messages to different
destinations.
Let’s implement an example like below.
a.
Copy the files from local file system to an
ActiveMQ queue ‘A’.
b.
Multicast all the data from queue ‘A’ to
other Queues B, C, D.
Setup ActiveMQ
I would recommend you to go through my below post to
setup the activeMQ.
Once you setup activeMQ, loginto the admin console by
hitting below url.
Use below credentials to login to the admin console.
UserName: admin
Password: admin
You can see below kind of user interface.
Setup Eclipse Maven
Project
Create new maven project 'camelMultiCasting'. Project
structure looks like below.
Open pom.xml and update maven dependencies.
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>camelMultiCasting</groupId> <artifactId>camelMultiCasting</artifactId> <version>1</version> <dependencies> <!-- https://mvnrepository.com/artifact/org.apache.camel/camel-jms --> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-jms</artifactId> <version>2.22.1</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.camel/camel-core --> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-core</artifactId> <version>2.22.1</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.6</version> </dependency> </dependencies> </project>
Below statement moves the files from local folder to
queue1 and then multicast all the items in queue to queue2, queue3, queue4.
from("file:C:\\Users\\Public\\demo?noop=true").to(queue1).multicast().to(queue2,
queue3, queue4);
Create new package 'com.sample.app.routes' and define the
class FileCopyRoute.
FileCopyRoute.java
package com.sample.app.routes; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; public class FileCopyRoute extends RouteBuilder { @Override public void configure() throws Exception { String queue1 = "demoJMS:queue:A"; String queue2 = "demoJMS:queue:B"; String queue3 = "demoJMS:queue:C"; String queue4 = "demoJMS:queue:D"; from("file:C:\\Users\\Public\\demo?noop=true").to(queue1).multicast().to(queue2, queue3, queue4); from(queue2).process(new Processor() { @Override public void process(Exchange exchange) throws Exception { String fileName = (String) exchange.getIn().getHeader("CamelFileName"); System.out.println("Queue2 received " + fileName); } }); from(queue3).process(new Processor() { @Override public void process(Exchange exchange) throws Exception { String fileName = (String) exchange.getIn().getHeader("CamelFileName"); System.out.println("Queue3 received " + fileName); } }); from(queue4).process(new Processor() { @Override public void process(Exchange exchange) throws Exception { String fileName = (String) exchange.getIn().getHeader("CamelFileName"); System.out.println("Queue4 received " + fileName); } }); } }
Application.java
package com.sample.app; import java.util.concurrent.TimeUnit; import javax.jms.ConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.camel.CamelContext; import org.apache.camel.component.jms.JmsComponent; import org.apache.camel.impl.DefaultCamelContext; import com.sample.app.routes.FileCopyRoute; public class Application { public static void main(String args[]) throws Exception { CamelContext context = new DefaultCamelContext(); ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); context.addComponent("demoJMS", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory)); context.addRoutes(new FileCopyRoute()); context.start(); TimeUnit.MINUTES.sleep(1); context.stop(); } }
In my case ‘demo’ folder has below documents.
BigFile.txt
Chapter1.docx
Hello.bat
sales.xlsx
SmallFile.txt
Run Application.java, you can see below messages in the
console.
Queue2 received BigFile.txt
Queue3 received BigFile.txt
Queue4 received BigFile.txt
Queue2 received Chapter1.docx
Queue3 received Chapter1.docx
Queue4 received Chapter1.docx
Queue2 received Hello.bat
Queue3 received Hello.bat
Queue4 received Hello.bat
Queue2 received sales.xlsx
Queue3 received sales.xlsx
Queue4 received sales.xlsx
Queue2 received SmallFile.txt
Queue3 received SmallFile.txt
Queue4 received SmallFile.txt
See the above image, Queue A has 5 messages, but Queues
B, C, and D has 0 pending messages. It is because, we processed the messages
from B, C, D queues (while printing), but not from Queue A.
One more thing, when you observe the console output,
messages are transmitted sequentially. If you want parallel behavior, you can
use parallelProcessing() function.
from("file:C:\\Users\\Public\\demo?noop=true")
.to(queue1)
.multicast()
.parallelProcessing()
.to(queue2,
queue3,queue4);
FileCopyRoute.java
package com.sample.app.routes; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; public class FileCopyRoute extends RouteBuilder { @Override public void configure() throws Exception { String queue1 = "demoJMS:queue:A"; String queue2 = "demoJMS:queue:B"; String queue3 = "demoJMS:queue:C"; String queue4 = "demoJMS:queue:D"; from("file:C:\\Users\\Public\\demo?noop=true").to(queue1).multicast().parallelProcessing().to(queue2, queue3, queue4); from(queue2).process(new Processor() { @Override public void process(Exchange exchange) throws Exception { String fileName = (String) exchange.getIn().getHeader("CamelFileName"); System.out.println("Queue2 received " + fileName); } }); from(queue3).process(new Processor() { @Override public void process(Exchange exchange) throws Exception { String fileName = (String) exchange.getIn().getHeader("CamelFileName"); System.out.println("Queue3 received " + fileName); } }); from(queue4).process(new Processor() { @Override public void process(Exchange exchange) throws Exception { String fileName = (String) exchange.getIn().getHeader("CamelFileName"); System.out.println("Queue4 received " + fileName); } }); } }
Rerun Application.java, you can see the parallel behavior
in message transmission. In my case, I seen below messages in console.
Queue2 received BigFile.txt
Queue4 received BigFile.txt
Queue3 received BigFile.txt
Queue3 received Chapter1.docx
Queue2 received Chapter1.docx
Queue4 received Chapter1.docx
Queue4 received Hello.bat
Queue2 received Hello.bat
Queue3 received Hello.bat
Queue3 received sales.xlsx
Queue4 received sales.xlsx
Queue2 received sales.xlsx
Queue4 received SmallFile.txt
Queue2 received SmallFile.txt
Queue3 received SmallFile.txt
When you request camel to perform parallel processing,
camel use a default thread pool of size 10. You can even pass another thread
pool to perform this task by calling executorService() method.
ExecutorService executor = Executors.newFixedThreadPool(20);
from("file:C:\\Users\\Public\\demo?noop=true")
.to(queue1)
.multicast()
.parallelProcessing()
.executorService(executor)
.to(queue2,
queue3, queue4);
Above snippet increase the maximum number of threads to
20.
No comments:
Post a Comment