The recipient list pattern routes a message to a dynamic number of
recipients. The list of recipients for a message is calculated at run time.
How to implement the recipient list pattern?
Camel provides the recipientList method to implement the recipient list pattern.
For example, suppose you have two JMS queues:
a. HighPrioOrders
b. NormalOrders
If an order comes from a big customer, it should go to both queues. If it
comes from a normal customer, it should go only to the NormalOrders queue.
In this example, I treat customers such as ‘ABC Corp’ as high-priority customers.
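The routing rule above can be sketched in plain Java, independent of Camel. This is only an illustration of the decision, not the route itself: the class name OrderRecipients, the method recipientsFor, and the hard-coded set of high-priority customers are assumptions made for this sketch.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

class OrderRecipients {

    // Hypothetical set of high-priority customers; the post only names 'ABC Corp'.
    static final Set<String> HIGH_PRIORITY_CUSTOMERS =
            new HashSet<>(Arrays.asList("ABC Corp"));

    // Returns the target queues as a comma-separated list.
    static String recipientsFor(String customer) {
        // Every order goes to the normal queue.
        String recipients = "NormalOrders";
        // Orders from high-priority customers also go to the high-priority queue.
        if (HIGH_PRIORITY_CUSTOMERS.contains(customer)) {
            recipients = recipients + ",HighPrioOrders";
        }
        return recipients;
    }
}
```

With this sketch, recipientsFor("ABC Corp") yields NormalOrders,HighPrioOrders, while any other customer yields only NormalOrders.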
Setup ActiveMQ
I recommend going through my post on setting up ActiveMQ first.
Once ActiveMQ is set up, open the admin console URL in a browser and
log in with the credentials below.
UserName: admin
Password: admin
After logging in, you should see the admin console user interface.
Setup Eclipse Maven Project
Create a new Maven project named 'camelRecipientList'. The project contains
pom.xml and two classes, com.sample.app.Application and
com.sample.app.routes.FileCopyRoute.
Update pom.xml with the Maven dependencies.
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>camelRecipientList</groupId>
    <artifactId>camelRecipientList</artifactId>
    <version>1</version>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.camel/camel-jms -->
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-jms</artifactId>
            <version>2.22.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.camel/camel-core -->
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-core</artifactId>
            <version>2.22.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.6</version>
        </dependency>
    </dependencies>
</project>
Create a package ‘com.sample.app.routes’ and define the class FileCopyRoute in it.
FileCopyRoute.java
package com.sample.app.routes;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;

public class FileCopyRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        String allOrdersQ = "demoJMS:queue:AllOrdersQ";
        String highPrioOrdersQ = "demoJMS:queue:HighPrioOrdersQ";
        String normalPrioOrdersQ = "demoJMS:queue:NormalPrioordersQ";

        from("file:C:\\Users\\Public\\demo?noop=true").process(new Processor() {
            @Override
            public void process(Exchange exchange) throws Exception {
                exchange.getIn().setHeader("customer", "");
                String fileName = (String) exchange.getIn().getHeader("CamelFileName");
                if (fileName.endsWith(".txt")) {
                    exchange.getIn().setHeader("customer", "ABC Corp");
                }
            }
        }).to(allOrdersQ);

        from(allOrdersQ).process(new Processor() {
            @Override
            public void process(Exchange exchange) throws Exception {
                String recipients = normalPrioOrdersQ;
                String customer = exchange.getIn().getHeader("customer", String.class);
                String fileName = (String) exchange.getIn().getHeader("CamelFileName");
                if ("ABC Corp".equals(customer)) {
                    System.out.println(fileName + " should sent to both the queues ");
                    recipients = recipients + "," + highPrioOrdersQ;
                }
                exchange.getIn().setHeader("recipients", recipients);
            }
        }).recipientList(header("recipients"));

        from(highPrioOrdersQ).process(new Processor() {
            @Override
            public void process(Exchange exchange) throws Exception {
                String fileName = (String) exchange.getIn().getHeader("CamelFileName");
                System.out.println("High prio order queue received " + fileName);
            }
        });

        from(normalPrioOrdersQ).process(new Processor() {
            @Override
            public void process(Exchange exchange) throws Exception {
                String fileName = (String) exchange.getIn().getHeader("CamelFileName");
                System.out.println("Normal prio order queue received " + fileName);
            }
        });
    }
}
Define the class Application in the com.sample.app package.
Application.java
package com.sample.app;

import java.util.concurrent.TimeUnit;

import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.impl.DefaultCamelContext;

import com.sample.app.routes.FileCopyRoute;

public class Application {

    public static void main(String args[]) throws Exception {
        CamelContext context = new DefaultCamelContext();

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        context.addComponent("demoJMS", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));

        context.addRoutes(new FileCopyRoute());

        context.start();
        TimeUnit.MINUTES.sleep(1);
        context.stop();
    }
}
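Assume the ‘C:\Users\Public\demo’ directory contains the documents below.
The first route sets the customer header to ‘ABC Corp’ for every .txt file
and leaves it empty for all other files.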
ABC Quotation for interios.txt
ABC sales Q1.txt
Homio Orders.docx
XYZ Corp orders.xlsx
Run Application.java. You should see messages like the following in the console.
ABC Quotation for interios.txt should sent to both the queues
Normal prio order queue received ABC Quotation for interios.txt
High prio order queue received ABC Quotation for interios.txt
ABC sales Q1.txt should sent to both the queues
Normal prio order queue received ABC sales Q1.txt
High prio order queue received ABC sales Q1.txt
Normal prio order queue received Homio Orders.docx
Normal prio order queue received XYZ Corp orders.xlsx
I passed the recipient list as a comma-separated String. You can also use
any iterable as the dynamic recipient list.
The following types are valid for recipient lists:
a. java.util.Collection
b. java.util.Iterator
c. arrays
d. org.w3c.dom.NodeList
e. a single String with values separated by commas
f. any other type, which is regarded as a single value
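To make the list above concrete, here is a rough plain-Java sketch of how such values could be flattened into a list of endpoint URIs. It is an illustration only and not Camel's actual implementation: the class name RecipientValues and the method toEndpointUris are hypothetical, trimming of whitespace is an assumption, and org.w3c.dom.NodeList is left out to keep the sketch short.

```java
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;

class RecipientValues {

    // Flattens a recipient-list value into individual endpoint URI strings.
    static List<String> toEndpointUris(Object value) {
        List<String> uris = new ArrayList<>();
        if (value == null) {
            return uris;
        }
        if (value instanceof Collection) {
            // a. java.util.Collection: one recipient per element
            for (Object item : (Collection<?>) value) {
                uris.add(String.valueOf(item).trim());
            }
        } else if (value instanceof Iterator) {
            // b. java.util.Iterator: consume every remaining element
            Iterator<?> it = (Iterator<?>) value;
            while (it.hasNext()) {
                uris.add(String.valueOf(it.next()).trim());
            }
        } else if (value.getClass().isArray()) {
            // c. arrays: read elements reflectively so any array type works
            int length = Array.getLength(value);
            for (int i = 0; i < length; i++) {
                uris.add(String.valueOf(Array.get(value, i)).trim());
            }
        } else if (value instanceof String) {
            // e. a single String: split on commas, skipping empty parts
            for (String part : ((String) value).split(",")) {
                if (!part.trim().isEmpty()) {
                    uris.add(part.trim());
                }
            }
        } else {
            // f. any other type: treated as a single value
            uris.add(String.valueOf(value));
        }
        return uris;
    }
}
```

Under these assumptions, the String "a, b", the array {"a", "b"}, and the list ["a", "b"] all produce the same two recipients.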
For example, the snippet below uses ‘java.util.List’ for the recipient list.
from(allOrdersQ).process(new Processor() {
    @Override
    public void process(Exchange exchange) throws Exception {
        List<String> recipients = Arrays.asList(normalPrioOrdersQ);
        String customer = exchange.getIn().getHeader("customer", String.class);
        String fileName = (String) exchange.getIn().getHeader("CamelFileName");
        if ("ABC Corp".equals(customer)) {
            System.out.println(fileName + " should sent to both the queues ");
            recipients = Arrays.asList(normalPrioOrdersQ, highPrioOrdersQ);
        }
        exchange.getIn().setHeader("recipients", recipients);
    }
}).recipientList(header("recipients"));
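Arrays.asList returns a fixed-size list, which is why the snippet above builds a second list for the high-priority case instead of appending to the first. One alternative is to start from a mutable ArrayList and add recipients to it. The sketch below shows that idea in plain Java; the class name RecipientListBuilder and the method buildRecipients are hypothetical, and the endpoint URIs are the ones used in this post.

```java
import java.util.ArrayList;
import java.util.List;

class RecipientListBuilder {

    static final String NORMAL_Q = "demoJMS:queue:NormalPrioordersQ";
    static final String HIGH_Q = "demoJMS:queue:HighPrioOrdersQ";

    // Builds the recipient list for the given customer header value.
    static List<String> buildRecipients(String customer) {
        // ArrayList is growable, unlike the list returned by Arrays.asList.
        List<String> recipients = new ArrayList<>();
        recipients.add(NORMAL_Q);
        if ("ABC Corp".equals(customer)) {
            recipients.add(HIGH_Q);
        }
        return recipients;
    }
}
```

The returned list could be stored in the "recipients" header in the same way as in the snippet above.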
FileCopyRoute.java
package com.sample.app.routes;

import java.util.Arrays;
import java.util.List;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;

public class FileCopyRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        String allOrdersQ = "demoJMS:queue:AllOrdersQ";
        String highPrioOrdersQ = "demoJMS:queue:HighPrioOrdersQ";
        String normalPrioOrdersQ = "demoJMS:queue:NormalPrioordersQ";

        from("file:C:\\Users\\Public\\demo?noop=true").process(new Processor() {
            @Override
            public void process(Exchange exchange) throws Exception {
                exchange.getIn().setHeader("customer", "");
                String fileName = (String) exchange.getIn().getHeader("CamelFileName");
                if (fileName.endsWith(".txt")) {
                    exchange.getIn().setHeader("customer", "ABC Corp");
                }
            }
        }).to(allOrdersQ);

        from(allOrdersQ).process(new Processor() {
            @Override
            public void process(Exchange exchange) throws Exception {
                List<String> recipients = Arrays.asList(normalPrioOrdersQ);
                String customer = exchange.getIn().getHeader("customer", String.class);
                String fileName = (String) exchange.getIn().getHeader("CamelFileName");
                if ("ABC Corp".equals(customer)) {
                    System.out.println(fileName + " should sent to both the queues ");
                    recipients = Arrays.asList(normalPrioOrdersQ, highPrioOrdersQ);
                }
                exchange.getIn().setHeader("recipients", recipients);
            }
        }).recipientList(header("recipients"));

        from(highPrioOrdersQ).process(new Processor() {
            @Override
            public void process(Exchange exchange) throws Exception {
                String fileName = (String) exchange.getIn().getHeader("CamelFileName");
                System.out.println("High prio order queue received " + fileName);
            }
        });

        from(normalPrioOrdersQ).process(new Processor() {
            @Override
            public void process(Exchange exchange) throws Exception {
                String fileName = (String) exchange.getIn().getHeader("CamelFileName");
                System.out.println("Normal prio order queue received " + fileName);
            }
        });
    }
}