In my previous post, I explained how to route data based on its content within the file system. In this post, I explain how to route the content to JMS queues.
As the image above shows, I copy all the files from the file system to 'Raw Queue'. After that, the xls files are moved to the xls queue, the txt files are moved to the txt queue, and all remaining files are moved to Other Queue.
To demo this application, I am using ActiveMQ as the JMS provider.
What is JMS?
JMS stands for Java Message Service. It is an open standard that defines a common way to create, send, receive and read the messages of an enterprise messaging system.
JMS specifies how asynchronous communication between enterprise applications takes place.
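The asynchronous, queue-based style that JMS standardises can be pictured with a small in-memory model. The sketch below is not the JMS API: it assumes a java.util.concurrent.BlockingQueue as a stand-in for a broker queue, and the names AsyncQueueSketch, sendAll and receive are made up for illustration. It only shows that the sender returns without waiting for the receiver.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// In-memory stand-in for a message queue; this is NOT the JMS API.
final class AsyncQueueSketch {

    // The sender puts messages on the queue and returns immediately.
    static void sendAll(BlockingQueue<String> queue, List<String> messages) {
        for (String message : messages) {
            queue.offer(message); // unbounded queue, so offer always succeeds
        }
    }

    // The receiver picks up whatever is pending, later and at its own pace.
    static List<String> receive(BlockingQueue<String> queue, int maxMessages) {
        List<String> received = new ArrayList<>();
        for (int i = 0; i < maxMessages; i++) {
            String message = queue.poll(); // null when the queue is empty
            if (message == null) {
                break;
            }
            received.add(message);
        }
        return received;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        sendAll(queue, Arrays.asList("order-1", "order-2"));
        System.out.println("Pending after send: " + queue.size());
        System.out.println("Received: " + receive(queue, 10));
    }
}
```

With a real JMS provider the queue lives in the broker process, so the sender and the receiver can be separate applications.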
Why JMS?
Prior to the JMS specification, every messaging vendor had its own implementation of the messaging APIs, so it was quite difficult for a user to migrate from one messaging vendor to another. Developers ended up writing vendor-specific implementations. JMS exists to address these issues.
With a JMS client, you can connect to any JMS-compliant provider, such as ActiveMQ, or RabbitMQ through its JMS client library.
In this post, I use ActiveMQ as the JMS provider.
Setting up ActiveMQ
I recommend going through my post below to set up ActiveMQ.
Once ActiveMQ is set up, log in to the admin console by opening the URL below.
Use the credentials below to log in to the admin console.
UserName: admin
Password: admin
You should see a user interface like the one below.
Click the 'Queues' tab.
Create the following 4 queues.
a. RawQueue
b. XlsQueue
c. TxtQueue
d. OtherQueue
Setting Up Eclipse Project
Create a new Maven project 'camelContentRoutingToJMS'.
Update pom.xml with the Maven dependencies.
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>camelContentRoutingToJMS</groupId>
    <artifactId>camelContentRoutingToJMS</artifactId>
    <version>1</version>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.camel/camel-jms -->
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-jms</artifactId>
            <version>2.22.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.camel/camel-core -->
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-core</artifactId>
            <version>2.22.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.6</version>
        </dependency>
    </dependencies>
</project>
Create a package 'com.sample.app.routes' and define the class 'FileCopyRoute.java' as shown below.
FileCopyRoute.java
package com.sample.app.routes;

import org.apache.camel.Exchange;
import org.apache.camel.Predicate;
import org.apache.camel.builder.RouteBuilder;

public class FileCopyRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        String sourceFolder = "file:C:\\Users\\Public\\demo?noop=true";
        String rawQueue = "demoJMS:queue:RawQueue";
        String xlsQueue = "demoJMS:queue:XlsQueue";
        String txtQueue = "demoJMS:queue:TxtQueue";
        String otherQueue = "demoJMS:queue:OtherQueue";

        Predicate nonXlsAndTxtPredicate = new Predicate() {
            @Override
            public boolean matches(Exchange exchange) {
                String fileName = (String) exchange.getIn().getHeader("CamelFileName");
                return !fileName.endsWith(".xlsx") && !fileName.endsWith(".txt");
            }
        };

        from(sourceFolder)
            .to(rawQueue)
            .choice()
                .when(header("CamelFileName").endsWith(".xlsx")).to(xlsQueue)
                .when(header("CamelFileName").endsWith(".txt")).to(txtQueue)
                .when(nonXlsAndTxtPredicate).to(otherQueue);
    }
}
Define Application.java under the 'com.sample.app' package.
Application.java
package com.sample.app;

import java.util.concurrent.TimeUnit;

import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.impl.DefaultCamelContext;

import com.sample.app.routes.FileCopyRoute;

public class Application {

    public static void main(String[] args) throws Exception {
        CamelContext context = new DefaultCamelContext();

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        context.addComponent("demoJMS", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));

        context.addRoutes(new FileCopyRoute());

        context.start();
        TimeUnit.MINUTES.sleep(1);
        context.stop();
    }
}
Assume the source folder has the files below.
abc.txt
Chapter1.docx
Hello.bat
sales.xlsx
sample.txt
Once you run Application.java, you can observe the following.
a. OtherQueue has 2 messages.
b. RawQueue has 5 messages.
c. TxtQueue has 2 messages.
d. XlsQueue has 1 message.
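These counts follow directly from the file extensions. As a quick check, here is a minimal plain-Java sketch of the same extension test applied to the five sample file names. It does not use Camel or JMS, and the class and method names (ExtensionCounts, queueFor, countByQueue) are hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the extension check; queue names mirror the post.
final class ExtensionCounts {

    // Returns the name of the queue a file name would be routed to.
    // Like String.endsWith in the route, the check is case-sensitive.
    static String queueFor(String fileName) {
        if (fileName.endsWith(".xlsx")) {
            return "XlsQueue";
        }
        if (fileName.endsWith(".txt")) {
            return "TxtQueue";
        }
        return "OtherQueue";
    }

    // Counts how many file names land in each queue.
    static Map<String, Integer> countByQueue(List<String> fileNames) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String fileName : fileNames) {
            counts.merge(queueFor(fileName), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> files = Arrays.asList(
                "abc.txt", "Chapter1.docx", "Hello.bat", "sales.xlsx", "sample.txt");
        System.out.println(countByQueue(files));
    }
}
```

Because the check is case-sensitive, a file named notes.TXT would be counted under OtherQueue in this model.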
If you observe closely, there is a bug in the above code. As the image above shows, the items in the RawQueue are not cleared. This is because of the snippet below.
from(sourceFolder)
    .to(rawQueue)
    .choice()
        .when(header("CamelFileName").endsWith(".xlsx"))
            .to(xlsQueue)
        .when(header("CamelFileName").endsWith(".txt"))
            .to(txtQueue)
        .when(nonXlsAndTxtPredicate)
            .to(otherQueue);
The above snippet populates all the queues from the sourceFolder, and nothing ever consumes from RawQueue.
But our use case is as follows.
a. Populate RawQueue from sourceFolder.
b. Populate XlsQueue, TxtQueue and OtherQueue from RawQueue.
We should write the logic as shown below.
from(sourceFolder).to(rawQueue);

from(rawQueue)
    .choice()
        .when(header("CamelFileName").endsWith(".xlsx"))
            .to(xlsQueue)
        .when(header("CamelFileName").endsWith(".txt"))
            .to(txtQueue)
        .when(nonXlsAndTxtPredicate)
            .to(otherQueue);
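The difference between the two layouts can be seen in a small in-memory model. The sketch below is not Camel or JMS code: queues are plain java.util.Deque instances, and the names RawQueueModel, singleRoute and twoRoutes are assumptions made for illustration. In the single-route layout nothing reads from RawQueue, so its messages stay there; in the two-route layout the second route consumes them.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory model of the two route layouts; this is NOT Camel or JMS code.
final class RawQueueModel {

    static Map<String, Deque<String>> newQueues() {
        Map<String, Deque<String>> queues = new LinkedHashMap<>();
        for (String name : Arrays.asList("RawQueue", "XlsQueue", "TxtQueue", "OtherQueue")) {
            queues.put(name, new ArrayDeque<>());
        }
        return queues;
    }

    static String target(String fileName) {
        if (fileName.endsWith(".xlsx")) {
            return "XlsQueue";
        }
        return fileName.endsWith(".txt") ? "TxtQueue" : "OtherQueue";
    }

    // Single route: each file is sent to RawQueue and then on to a typed queue.
    // Nothing consumes from RawQueue, so its messages are never removed.
    static Map<String, Deque<String>> singleRoute(List<String> files) {
        Map<String, Deque<String>> queues = newQueues();
        for (String file : files) {
            queues.get("RawQueue").add(file);
            queues.get(target(file)).add(file);
        }
        return queues;
    }

    // Two routes: the first fills RawQueue, the second consumes from it.
    static Map<String, Deque<String>> twoRoutes(List<String> files) {
        Map<String, Deque<String>> queues = newQueues();
        Deque<String> raw = queues.get("RawQueue");
        raw.addAll(files);
        while (!raw.isEmpty()) {
            String file = raw.poll(); // consuming removes the message
            queues.get(target(file)).add(file);
        }
        return queues;
    }

    public static void main(String[] args) {
        List<String> files = Arrays.asList(
                "abc.txt", "Chapter1.docx", "Hello.bat", "sales.xlsx", "sample.txt");
        System.out.println("Single route, RawQueue size: " + singleRoute(files).get("RawQueue").size());
        System.out.println("Two routes, RawQueue size: " + twoRoutes(files).get("RawQueue").size());
    }
}
```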
Delete all the queues, then update FileCopyRoute.java with the above snippet.
FileCopyRoute.java
package com.sample.app.routes;

import org.apache.camel.Exchange;
import org.apache.camel.Predicate;
import org.apache.camel.builder.RouteBuilder;

public class FileCopyRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        String sourceFolder = "file:C:\\Users\\Public\\demo?noop=true";
        String rawQueue = "demoJMS:queue:RawQueue";
        String xlsQueue = "demoJMS:queue:XlsQueue";
        String txtQueue = "demoJMS:queue:TxtQueue";
        String otherQueue = "demoJMS:queue:OtherQueue";

        Predicate nonXlsAndTxtPredicate = new Predicate() {
            @Override
            public boolean matches(Exchange exchange) {
                String fileName = (String) exchange.getIn().getHeader("CamelFileName");
                return !fileName.endsWith(".xlsx") && !fileName.endsWith(".txt");
            }
        };

        from(sourceFolder).to(rawQueue);

        from(rawQueue)
            .choice()
                .when(header("CamelFileName").endsWith(".xlsx")).to(xlsQueue)
                .when(header("CamelFileName").endsWith(".txt")).to(txtQueue)
                .when(nonXlsAndTxtPredicate).to(otherQueue);
    }
}
Rerun Application.java. You can observe that the messages in the raw queue are dequeued and distributed to the other queues.
Let's process the messages in OtherQueue, TxtQueue and XlsQueue.
from(xlsQueue).process(new Processor() {
    @Override
    public void process(Exchange exchange) throws Exception {
        String fileName = (String) exchange.getIn().getHeader("CamelFileName");
        System.out.println("XlsQueue received " + fileName);
    }
});

from(txtQueue).process(new Processor() {
    @Override
    public void process(Exchange exchange) throws Exception {
        String fileName = (String) exchange.getIn().getHeader("CamelFileName");
        System.out.println("TxtQueue received " + fileName);
    }
});

from(otherQueue).process(new Processor() {
    @Override
    public void process(Exchange exchange) throws Exception {
        String fileName = (String) exchange.getIn().getHeader("CamelFileName");
        System.out.println("OtherQueue received " + fileName);
    }
});
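The three processors differ only in the queue name they print, so the repeated logic could come from one parameterised helper. The sketch below shows the idea in plain Java only: it models the message headers as a Map and uses java.util.function.Function instead of Camel's Processor, and the names ReceiverSketch and describer are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of a parameterised handler; this is NOT the Camel Processor API.
final class ReceiverSketch {

    // Builds a function that formats the line each processor in the post prints,
    // reading the file name from the "CamelFileName" header.
    static Function<Map<String, Object>, String> describer(String queueName) {
        return headers -> queueName + " received " + headers.get("CamelFileName");
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("CamelFileName", "sales.xlsx");

        // prints "XlsQueue received sales.xlsx"
        System.out.println(describer("XlsQueue").apply(headers));
    }
}
```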
Update FileCopyRoute.java as shown below.
FileCopyRoute.java
package com.sample.app.routes;

import org.apache.camel.Exchange;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;

public class FileCopyRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        String sourceFolder = "file:C:\\Users\\Public\\demo?noop=true";
        String rawQueue = "demoJMS:queue:RawQueue";
        String xlsQueue = "demoJMS:queue:XlsQueue";
        String txtQueue = "demoJMS:queue:TxtQueue";
        String otherQueue = "demoJMS:queue:OtherQueue";

        Predicate nonXlsAndTxtPredicate = new Predicate() {
            @Override
            public boolean matches(Exchange exchange) {
                String fileName = (String) exchange.getIn().getHeader("CamelFileName");
                return !fileName.endsWith(".xlsx") && !fileName.endsWith(".txt");
            }
        };

        from(sourceFolder).to(rawQueue);

        from(rawQueue)
            .choice()
                .when(header("CamelFileName").endsWith(".xlsx")).to(xlsQueue)
                .when(header("CamelFileName").endsWith(".txt")).to(txtQueue)
                .when(nonXlsAndTxtPredicate).to(otherQueue);

        from(xlsQueue).process(new Processor() {
            @Override
            public void process(Exchange exchange) throws Exception {
                String fileName = (String) exchange.getIn().getHeader("CamelFileName");
                System.out.println("XlsQueue received " + fileName);
            }
        });

        from(txtQueue).process(new Processor() {
            @Override
            public void process(Exchange exchange) throws Exception {
                String fileName = (String) exchange.getIn().getHeader("CamelFileName");
                System.out.println("TxtQueue received " + fileName);
            }
        });

        from(otherQueue).process(new Processor() {
            @Override
            public void process(Exchange exchange) throws Exception {
                String fileName = (String) exchange.getIn().getHeader("CamelFileName");
                System.out.println("OtherQueue received " + fileName);
            }
        });
    }
}
Delete all the queues and rerun Application.java. You can see the messages below in the console.
INFO | Route: route1 started and consuming from: file://C:%5CUsers%5CPublic%5Cdemo?noop=true
INFO | Route: route2 started and consuming from: demoJMS://queue:RawQueue
INFO | Route: route3 started and consuming from: demoJMS://queue:XlsQueue
INFO | Route: route4 started and consuming from: demoJMS://queue:TxtQueue
INFO | Route: route5 started and consuming from: demoJMS://queue:OtherQueue
INFO | Total 5 routes, of which 5 are started
INFO | Apache Camel 2.22.1 (CamelContext: camel-1) started in 1.220 seconds
TxtQueue received abc.txt
OtherQueue received Chapter1.docx
OtherQueue received Hello.bat
XlsQueue received sales.xlsx
TxtQueue received sample.txt
Further enhancing the above application using the otherwise() method
I wrote my own predicate to match all the files that are neither xls nor txt, as shown below.
Predicate nonXlsAndTxtPredicate = new Predicate() {
    @Override
    public boolean matches(Exchange exchange) {
        String fileName = (String) exchange.getIn().getHeader("CamelFileName");
        return !fileName.endsWith(".xlsx") && !fileName.endsWith(".txt");
    }
};
But by using the otherwise() method, I can route all the messages that do not match any of the when() predicates, without writing a custom predicate.
from(rawQueue)
    .choice()
        .when(header("CamelFileName").endsWith(".xlsx"))
            .to(xlsQueue)
        .when(header("CamelFileName").endsWith(".txt"))
            .to(txtQueue)
        .otherwise()
            .to(otherQueue);
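The first-match-wins behaviour with a fallback can be sketched in plain Java. The class below is a toy model, not Camel's implementation: ChoiceSketch and its when, otherwise and route methods are hypothetical names that only mirror the shape of the DSL, and predicates are evaluated on the file name instead of on an Exchange.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Toy model of choice()/when()/otherwise(); this is NOT Camel's implementation.
final class ChoiceSketch {

    private final List<Predicate<String>> tests = new ArrayList<>();
    private final List<String> destinations = new ArrayList<>();
    private String fallback; // stays null when otherwise(...) is never called

    ChoiceSketch when(Predicate<String> test, String destination) {
        tests.add(test);
        destinations.add(destination);
        return this;
    }

    ChoiceSketch otherwise(String destination) {
        this.fallback = destination;
        return this;
    }

    // The first matching when(...) wins; otherwise(...) handles everything else.
    String route(String fileName) {
        for (int i = 0; i < tests.size(); i++) {
            if (tests.get(i).test(fileName)) {
                return destinations.get(i);
            }
        }
        return fallback;
    }

    public static void main(String[] args) {
        ChoiceSketch choice = new ChoiceSketch()
                .when(name -> name.endsWith(".xlsx"), "XlsQueue")
                .when(name -> name.endsWith(".txt"), "TxtQueue")
                .otherwise("OtherQueue");

        for (String file : new String[] {"sales.xlsx", "abc.txt", "Hello.bat"}) {
            System.out.println(file + " -> " + choice.route(file));
        }
    }
}
```

The fallback needs no knowledge of the other branches, which is why the custom predicate becomes unnecessary.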
FileCopyRoute.java
package com.sample.app.routes;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;

public class FileCopyRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        String sourceFolder = "file:C:\\Users\\Public\\demo?noop=true";
        String rawQueue = "demoJMS:queue:RawQueue";
        String xlsQueue = "demoJMS:queue:XlsQueue";
        String txtQueue = "demoJMS:queue:TxtQueue";
        String otherQueue = "demoJMS:queue:OtherQueue";

        from(sourceFolder).to(rawQueue);

        from(rawQueue)
            .choice()
                .when(header("CamelFileName").endsWith(".xlsx")).to(xlsQueue)
                .when(header("CamelFileName").endsWith(".txt")).to(txtQueue)
                .otherwise().to(otherQueue);

        from(xlsQueue).process(new Processor() {
            @Override
            public void process(Exchange exchange) throws Exception {
                String fileName = (String) exchange.getIn().getHeader("CamelFileName");
                System.out.println("XlsQueue received " + fileName);
            }
        });

        from(txtQueue).process(new Processor() {
            @Override
            public void process(Exchange exchange) throws Exception {
                String fileName = (String) exchange.getIn().getHeader("CamelFileName");
                System.out.println("TxtQueue received " + fileName);
            }
        });

        from(otherQueue).process(new Processor() {
            @Override
            public void process(Exchange exchange) throws Exception {
                String fileName = (String) exchange.getIn().getHeader("CamelFileName");
                System.out.println("OtherQueue received " + fileName);
            }
        });
    }
}
Let's enhance the above flow further. After the messages are added to the xls, txt and other queues, I want all of them to be added to a final queue, FinalQueue.
We need to end the choice block with end() and add FinalQueue as another destination using the to() method, as shown below.
from(rawQueue)
.choice()
.when(header("CamelFileName").endsWith(".xlsx"))
.to(xlsQueue)
.when(header("CamelFileName").endsWith(".txt"))
.to(txtQueue)
.otherwise()
.to(otherQueue)
.end()
.to(finalQueue);
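In effect, every message first goes to the queue of its matching branch and then, once the choice block has ended, continues to FinalQueue. A minimal plain-Java sketch of that flow follows; it is an in-memory model rather than Camel code, and the names EndThenFinalSketch, branchFor and route are assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory model of choice()...end().to(finalQueue); this is NOT Camel code.
final class EndThenFinalSketch {

    static String branchFor(String fileName) {
        if (fileName.endsWith(".xlsx")) {
            return "XlsQueue";
        }
        return fileName.endsWith(".txt") ? "TxtQueue" : "OtherQueue";
    }

    static Map<String, List<String>> route(List<String> files) {
        Map<String, List<String>> queues = new LinkedHashMap<>();
        for (String file : files) {
            // Inside the choice block: exactly one branch receives the message.
            queues.computeIfAbsent(branchFor(file), k -> new ArrayList<>()).add(file);
            // After end(): the message continues to FinalQueue regardless of branch.
            queues.computeIfAbsent("FinalQueue", k -> new ArrayList<>()).add(file);
        }
        return queues;
    }

    public static void main(String[] args) {
        List<String> files = Arrays.asList(
                "abc.txt", "Chapter1.docx", "Hello.bat", "sales.xlsx", "sample.txt");
        System.out.println("FinalQueue: " + route(files).get("FinalQueue"));
    }
}
```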
Update FileCopyRoute.java as shown below.
FileCopyRoute.java
package com.sample.app.routes;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;

public class FileCopyRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        String sourceFolder = "file:C:\\Users\\Public\\demo?noop=true";
        String rawQueue = "demoJMS:queue:RawQueue";
        String xlsQueue = "demoJMS:queue:XlsQueue";
        String txtQueue = "demoJMS:queue:TxtQueue";
        String otherQueue = "demoJMS:queue:OtherQueue";
        String finalQueue = "demoJMS:queue:FinalQueue";

        from(sourceFolder).to(rawQueue);

        from(rawQueue)
            .choice()
                .when(header("CamelFileName").endsWith(".xlsx")).to(xlsQueue)
                .when(header("CamelFileName").endsWith(".txt")).to(txtQueue)
                .otherwise().to(otherQueue)
            .end()
            .to(finalQueue);

        from(finalQueue).process(new Processor() {
            @Override
            public void process(Exchange exchange) throws Exception {
                String fileName = (String) exchange.getIn().getHeader("CamelFileName");
                System.out.println("FinalQueue received " + fileName);
            }
        });
    }
}
Rerun Application.java. You can see the messages below in the console.
FinalQueue received abc.txt
FinalQueue received Chapter1.docx
FinalQueue received Hello.bat
FinalQueue received sales.xlsx
FinalQueue received sample.txt
As the image above shows, the messages are not actually dequeued from the xls, txt and other queues; they are also routed to FinalQueue.
What about stopping some kinds of messages?
Suppose FinalQueue is not interested in processing .txt messages. You can achieve this by calling the stop() method.
from(rawQueue)
.choice()
.when(header("CamelFileName").endsWith(".xlsx")).to(xlsQueue)
.when(header("CamelFileName").endsWith(".txt")).to(txtQueue).stop()
.otherwise().to(otherQueue)
.end()
.to(finalQueue);
As the above snippet shows, I call the stop() method after sending to txtQueue. The stop() method stops further routing of the current Exchange and marks it as completed.
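The effect of stop() on the flow can be modelled in plain Java. The sketch below is an in-memory approximation, not Camel code: the names StopSketch and route, and the stoppingBranches parameter, are hypothetical. A message whose branch is marked as stopping is delivered to its branch queue and never reaches FinalQueue.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// In-memory model of stop() inside one branch of a choice; this is NOT Camel code.
final class StopSketch {

    static String branchFor(String fileName) {
        if (fileName.endsWith(".xlsx")) {
            return "XlsQueue";
        }
        return fileName.endsWith(".txt") ? "TxtQueue" : "OtherQueue";
    }

    static Map<String, List<String>> route(List<String> files, Set<String> stoppingBranches) {
        Map<String, List<String>> queues = new LinkedHashMap<>();
        for (String file : files) {
            String branch = branchFor(file);
            queues.computeIfAbsent(branch, k -> new ArrayList<>()).add(file);
            if (stoppingBranches.contains(branch)) {
                continue; // models stop(): routing of this message ends here
            }
            queues.computeIfAbsent("FinalQueue", k -> new ArrayList<>()).add(file);
        }
        return queues;
    }

    public static void main(String[] args) {
        List<String> files = Arrays.asList(
                "abc.txt", "Chapter1.docx", "Hello.bat", "sales.xlsx", "sample.txt");
        Map<String, List<String>> queues = route(files, Collections.singleton("TxtQueue"));
        System.out.println("TxtQueue: " + queues.get("TxtQueue"));
        System.out.println("FinalQueue: " + queues.get("FinalQueue"));
    }
}
```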
FileCopyRoute.java
package com.sample.app.routes;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;

public class FileCopyRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        String sourceFolder = "file:C:\\Users\\Public\\demo?noop=true";
        String rawQueue = "demoJMS:queue:RawQueue";
        String xlsQueue = "demoJMS:queue:XlsQueue";
        String txtQueue = "demoJMS:queue:TxtQueue";
        String otherQueue = "demoJMS:queue:OtherQueue";
        String finalQueue = "demoJMS:queue:FinalQueue";

        from(sourceFolder).to(rawQueue);

        from(rawQueue)
            .choice()
                .when(header("CamelFileName").endsWith(".xlsx")).to(xlsQueue)
                .when(header("CamelFileName").endsWith(".txt")).to(txtQueue).stop()
                .otherwise().to(otherQueue)
            .end()
            .to(finalQueue);

        from(finalQueue).process(new Processor() {
            @Override
            public void process(Exchange exchange) throws Exception {
                String fileName = (String) exchange.getIn().getHeader("CamelFileName");
                System.out.println("FinalQueue received " + fileName);
            }
        });
    }
}
Delete all the existing queues and rerun Application.java. You can see the messages below in the console.
FinalQueue received Chapter1.docx
FinalQueue received Hello.bat
FinalQueue received sales.xlsx