In
this post, I am going to explain, how can you filter the messages while sending
to JMS Queues.
FileCopyRoute.java
Let’s
implement an example like below.
a. Copy the files from
local file system to an ActiveMQ queue ‘A’.
b. Multicast all the
data from queue ‘A’ to other Queues B, C, D.
Setup ActiveMQ
I
would recommend you to go through my below post to setup the activeMQ.
Once
you setup activeMQ, loginto the admin console by hitting below url.
Use
below credentials to login to the admin console.
UserName: admin
Password: admin
You
can see below kind of user interface.
Setup Eclipse Maven Project
Create
new maven project 'camelMultiCasting'. Project structure looks like below.
Open
pom.xml and update maven dependencies.
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>camelMultiCasting</groupId> <artifactId>camelMultiCasting</artifactId> <version>1</version> <dependencies> <!-- https://mvnrepository.com/artifact/org.apache.camel/camel-jms --> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-jms</artifactId> <version>2.22.1</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.camel/camel-core --> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-core</artifactId> <version>2.22.1</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.6</version> </dependency> </dependencies> </project>
Below
statement moves the files from local folder to queue1 and then multicast all the
items in queue to queue2, queue3, queue4.
from("file:C:\\Users\\Public\\demo?noop=true").to(queue1).multicast().to(queue2,
queue3, queue4);
Create
new package 'com.sample.app.routes' and define the class FileCopyRoute.
package com.sample.app.routes; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; public class FileCopyRoute extends RouteBuilder { @Override public void configure() throws Exception { String queue1 = "demoJMS:queue:A"; String queue2 = "demoJMS:queue:B"; String queue3 = "demoJMS:queue:C"; String queue4 = "demoJMS:queue:D"; from("file:C:\\Users\\Public\\demo?noop=true").to(queue1).multicast().to(queue2, queue3, queue4); from(queue2).process(new Processor() { @Override public void process(Exchange exchange) throws Exception { String fileName = (String) exchange.getIn().getHeader("CamelFileName"); System.out.println("Queue2 received " + fileName); } }); from(queue3).process(new Processor() { @Override public void process(Exchange exchange) throws Exception { String fileName = (String) exchange.getIn().getHeader("CamelFileName"); System.out.println("Queue3 received " + fileName); } }); from(queue4).process(new Processor() { @Override public void process(Exchange exchange) throws Exception { String fileName = (String) exchange.getIn().getHeader("CamelFileName"); System.out.println("Queue4 received " + fileName); } }); } }
Application.java
package com.sample.app; import java.util.concurrent.TimeUnit; import javax.jms.ConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.camel.CamelContext; import org.apache.camel.component.jms.JmsComponent; import org.apache.camel.impl.DefaultCamelContext; import com.sample.app.routes.FileCopyRoute; public class Application { public static void main(String args[]) throws Exception { CamelContext context = new DefaultCamelContext(); ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); context.addComponent("demoJMS", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory)); context.addRoutes(new FileCopyRoute()); context.start(); TimeUnit.MINUTES.sleep(1); context.stop(); } }
In
my case ‘demo’ folder has below documents.
BigFile.txt
Chapter1.docx
Hello.bat
sales.xlsx
SmallFile.txt
Run
Application.java, you can see below messages in the console.
Queue2
received BigFile.txt
Queue3
received BigFile.txt
Queue4
received BigFile.txt
Queue2
received Chapter1.docx
Queue3
received Chapter1.docx
Queue4
received Chapter1.docx
Queue2
received Hello.bat
Queue3
received Hello.bat
Queue4
received Hello.bat
Queue2
received sales.xlsx
Queue3
received sales.xlsx
Queue4
received sales.xlsx
Queue2
received SmallFile.txt
Queue3
received SmallFile.txt
Queue4
received SmallFile.txt
See
the above image, Queue A has 5 messages, but Queues B, C, and D has 0 pending
messages. It is because, we processed the messages from B, C, D queues (while
printing), but not from Queue A.
One
more thing, when you observe the console output, messages are transmitted
sequentially. If you want parallel behavior, you can use parallelProcessing()
function.
from("file:C:\\Users\\Public\\demo?noop=true")
.to(queue1)
.multicast()
.parallelProcessing()
.to(queue2, queue3,queue4);
FileCopyRoute.java
package com.sample.app.routes; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; public class FileCopyRoute extends RouteBuilder { @Override public void configure() throws Exception { String queue1 = "demoJMS:queue:A"; String queue2 = "demoJMS:queue:B"; String queue3 = "demoJMS:queue:C"; String queue4 = "demoJMS:queue:D"; from("file:C:\\Users\\Public\\demo?noop=true").to(queue1).multicast().parallelProcessing().to(queue2, queue3, queue4); from(queue2).process(new Processor() { @Override public void process(Exchange exchange) throws Exception { String fileName = (String) exchange.getIn().getHeader("CamelFileName"); System.out.println("Queue2 received " + fileName); } }); from(queue3).process(new Processor() { @Override public void process(Exchange exchange) throws Exception { String fileName = (String) exchange.getIn().getHeader("CamelFileName"); System.out.println("Queue3 received " + fileName); } }); from(queue4).process(new Processor() { @Override public void process(Exchange exchange) throws Exception { String fileName = (String) exchange.getIn().getHeader("CamelFileName"); System.out.println("Queue4 received " + fileName); } }); } }
Rerun
Application.java, you can see the parallel behavior in message transmission. In
my case, I seen below messages in console.
Queue2
received BigFile.txt
Queue4
received BigFile.txt
Queue3
received BigFile.txt
Queue3
received Chapter1.docx
Queue2
received Chapter1.docx
Queue4
received Chapter1.docx
Queue4
received Hello.bat
Queue2
received Hello.bat
Queue3
received Hello.bat
Queue3
received sales.xlsx
Queue4
received sales.xlsx
Queue2
received sales.xlsx
Queue4
received SmallFile.txt
Queue2
received SmallFile.txt
Queue3
received SmallFile.txt
When
you request camel to perform parallel processing, camel use a default thread
pool of size 10. You can even pass another thread pool to perform this task by
calling executorService() method.
ExecutorService
executor = Executors.newFixedThreadPool(20);
from("file:C:\\Users\\Public\\demo?noop=true")
.to(queue1)
.multicast()
.parallelProcessing()
.executorService(executor)
.to(queue2, queue3, queue4);
Above
snippet increase the maximum number of threads to 20.
No comments:
Post a Comment