Tuesday, 8 January 2019

Camel: Recipient lists


The recipient list pattern is used to route a message to a dynamic number of recipients. In this pattern, the list of recipients for a message is calculated at run time.

How to implement Recipient list pattern?
Camel provides recipientList method to implement Recipient list pattern.

For example, suppose you have two JMS queues (named as in the code below):
a.   HighPrioOrdersQ
b.   NormalPrioordersQ

If an order comes from a high priority (big) customer, it should go to both queues. If an order comes from a normal customer, it should go only to the NormalPrioordersQ queue.

In my example, I am considering customers like ‘ABC Corp’ as high priority customers.

Setup ActiveMQ
I recommend going through my post below to set up ActiveMQ.

Once you have set up ActiveMQ, log in to the admin console by opening the URL below (with a default local installation this is http://localhost:8161/admin).

Use below credentials to login to the admin console.
UserName: admin
Password: admin

You will see a user interface like the one below.


Setup Eclipse Maven Project
Create new maven project 'camelRecipientList'. Project structure looks like below.




Update pom.xml for maven dependencies.

pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>camelRecipientList</groupId>
	<artifactId>camelRecipientList</artifactId>
	<version>1</version>

	<!--
		Pin the Java language level explicitly. The route code in this project
		captures effectively-final local variables inside processors, which needs
		Java 8 or later; without these properties the maven-compiler-plugin falls
		back to its own (older) default source/target level.
	-->
	<properties>
		<maven.compiler.source>1.8</maven.compiler.source>
		<maven.compiler.target>1.8</maven.compiler.target>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	</properties>

	<dependencies>
		<!-- JMS component used for the "demoJMS:queue:..." endpoints -->
		<!-- https://mvnrepository.com/artifact/org.apache.camel/camel-jms -->
		<dependency>
			<groupId>org.apache.camel</groupId>
			<artifactId>camel-jms</artifactId>
			<version>2.22.1</version>
		</dependency>

		<!-- Camel core: CamelContext, RouteBuilder, Processor, Exchange -->
		<!-- https://mvnrepository.com/artifact/org.apache.camel/camel-core -->
		<dependency>
			<groupId>org.apache.camel</groupId>
			<artifactId>camel-core</artifactId>
			<version>2.22.1</version>
		</dependency>

		<!-- ActiveMQ client classes (ActiveMQConnectionFactory) -->
		<!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all -->
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-all</artifactId>
			<version>5.15.6</version>
		</dependency>

	</dependencies>
</project>

Create a package ‘com.sample.app.routes’ and define the class FileCopyRoute.java.


FileCopyRoute.java
package com.sample.app.routes;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;

/**
 * Routes that demonstrate the recipient list pattern using a comma separated
 * string of endpoint URIs.
 *
 * <ol>
 * <li>Files from the demo directory are tagged with a "customer" header and
 * sent to the AllOrdersQ queue.</li>
 * <li>Messages from AllOrdersQ get a "recipients" header computed per message
 * and are dispatched with {@code recipientList}.</li>
 * <li>Each of the two target queues has a consumer that prints the received
 * file name.</li>
 * </ol>
 */
public class FileCopyRoute extends RouteBuilder {

	@Override
	public void configure() throws Exception {
		final String incomingOrders = "demoJMS:queue:AllOrdersQ";
		final String highPriority = "demoJMS:queue:HighPrioOrdersQ";
		final String normalPriority = "demoJMS:queue:NormalPrioordersQ";

		// Marks every ".txt" file as coming from "ABC Corp"; all other files
		// keep an empty "customer" header.
		Processor tagCustomer = exchange -> {
			exchange.getIn().setHeader("customer", "");

			String name = (String) exchange.getIn().getHeader("CamelFileName");
			if (name.endsWith(".txt")) {
				exchange.getIn().setHeader("customer", "ABC Corp");
			}
		};

		// Computes the recipients for one message: always the normal queue,
		// plus the high priority queue for "ABC Corp".
		Processor pickRecipients = exchange -> {
			String customer = exchange.getIn().getHeader("customer", String.class);
			String name = (String) exchange.getIn().getHeader("CamelFileName");

			boolean bothQueues = "ABC Corp".equals(customer);
			if (bothQueues) {
				System.out.println(name + " should sent to both the queues ");
			}

			String targets = bothQueues ? normalPriority + "," + highPriority : normalPriority;
			exchange.getIn().setHeader("recipients", targets);
		};

		from("file:C:\\Users\\Public\\demo?noop=true").process(tagCustomer).to(incomingOrders);

		from(incomingOrders).process(pickRecipients).recipientList(header("recipients"));

		from(highPriority).process(announce("High prio order queue received "));

		from(normalPriority).process(announce("Normal prio order queue received "));
	}

	/**
	 * Builds a processor that prints the given prefix followed by the value of
	 * the "CamelFileName" header of the incoming message.
	 *
	 * @param prefix text printed in front of the file name
	 * @return processor that writes one line to standard output per message
	 */
	private static Processor announce(String prefix) {
		return exchange -> {
			String name = (String) exchange.getIn().getHeader("CamelFileName");
			System.out.println(prefix + name);
		};
	}
}

Define the class Application.java in com.sample.app package.


Application.java
package com.sample.app;

import java.util.concurrent.TimeUnit;

import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.impl.DefaultCamelContext;

import com.sample.app.routes.FileCopyRoute;

/**
 * Entry point of the demo: registers an ActiveMQ backed JMS component under
 * the name "demoJMS", adds the routes of {@link FileCopyRoute}, runs the
 * Camel context for one minute and then stops it.
 */
public class Application {

	/**
	 * Starts the Camel context, lets the routes run for one minute and shuts
	 * the context down again.
	 *
	 * @param args command line arguments (not used)
	 * @throws Exception if the routes cannot be added, the context fails to
	 *                   start or stop, or the waiting thread is interrupted
	 */
	public static void main(String[] args) throws Exception {
		CamelContext context = new DefaultCamelContext();

		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");

		context.addComponent("demoJMS", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));

		context.addRoutes(new FileCopyRoute());

		try {
			context.start();

			// Keep the JVM alive long enough for the routes to process the files.
			TimeUnit.MINUTES.sleep(1);
		} finally {
			// Always stop the context, even when start() or the sleep throws,
			// so that consumers and JMS connections are released.
			context.stop();
		}
	}
}

Assume the ‘C:\Users\Public\demo’ directory contains the documents below.

ABC Quotation for interios.txt
ABC sales Q1.txt
Homio Orders.docx
XYZ Corp orders.xlsx


Run Application.java; you will see messages like the following in the console.

ABC Quotation for interios.txt should sent to both the queues
Normal prio order queue received ABC Quotation for interios.txt
High prio order queue received ABC Quotation for interios.txt
ABC sales Q1.txt should sent to both the queues
Normal prio order queue received ABC sales Q1.txt
High prio order queue received ABC sales Q1.txt
Normal prio order queue received Homio Orders.docx
Normal prio order queue received XYZ Corp orders.xlsx 

In the example above, I passed the recipient list as a comma-separated string. You can also use any iterable as a dynamic recipient list.
The following types are valid for recipient lists.

a.   java.util.Collection
b.   java.util.Iterator
c.   arrays
d.   org.w3c.dom.NodeList
e.   a single String with values separated by commas
f.   any other type, which is regarded as a single value

For example, below snippet uses ‘java.util.List’ for recipient list.

from(allOrdersQ).process(new Processor() {

         @Override
         public void process(Exchange exchange) throws Exception {
                  List<String> recipients = Arrays.asList(normalPrioOrdersQ);

                  String customer = exchange.getIn().getHeader("customer", String.class);
                  String fileName = (String) exchange.getIn().getHeader("CamelFileName");

                  if ("ABC Corp".equals(customer)) {
                           System.out.println(fileName + " should sent to both the queues ");
                           recipients = Arrays.asList(normalPrioOrdersQ, highPrioOrdersQ);
                  }
                  exchange.getIn().setHeader("recipients", recipients);
         }

}).recipientList(header("recipients"));


FileCopyRoute.java

package com.sample.app.routes;

import java.util.Arrays;
import java.util.List;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;

/**
 * Routes that demonstrate the recipient list pattern using a
 * {@code java.util.List} of endpoint URIs as the "recipients" header.
 *
 * <ol>
 * <li>Files from the demo directory are tagged with a "customer" header and
 * sent to the AllOrdersQ queue.</li>
 * <li>Messages from AllOrdersQ get a "recipients" header computed per message
 * and are dispatched with {@code recipientList}.</li>
 * <li>Each of the two target queues has a consumer that prints the received
 * file name.</li>
 * </ol>
 */
public class FileCopyRoute extends RouteBuilder {

	@Override
	public void configure() throws Exception {
		final String incomingOrders = "demoJMS:queue:AllOrdersQ";
		final String highPriority = "demoJMS:queue:HighPrioOrdersQ";
		final String normalPriority = "demoJMS:queue:NormalPrioordersQ";

		// Marks every ".txt" file as coming from "ABC Corp"; all other files
		// keep an empty "customer" header.
		Processor tagCustomer = exchange -> {
			exchange.getIn().setHeader("customer", "");

			String name = (String) exchange.getIn().getHeader("CamelFileName");
			if (name.endsWith(".txt")) {
				exchange.getIn().setHeader("customer", "ABC Corp");
			}
		};

		// Computes the recipients for one message: always the normal queue,
		// plus the high priority queue for "ABC Corp".
		Processor pickRecipients = exchange -> {
			String customer = exchange.getIn().getHeader("customer", String.class);
			String name = (String) exchange.getIn().getHeader("CamelFileName");

			boolean bothQueues = "ABC Corp".equals(customer);
			if (bothQueues) {
				System.out.println(name + " should sent to both the queues ");
			}

			List<String> targets = bothQueues
					? Arrays.asList(normalPriority, highPriority)
					: Arrays.asList(normalPriority);
			exchange.getIn().setHeader("recipients", targets);
		};

		from("file:C:\\Users\\Public\\demo?noop=true").process(tagCustomer).to(incomingOrders);

		from(incomingOrders).process(pickRecipients).recipientList(header("recipients"));

		from(highPriority).process(announce("High prio order queue received "));

		from(normalPriority).process(announce("Normal prio order queue received "));
	}

	/**
	 * Builds a processor that prints the given prefix followed by the value of
	 * the "CamelFileName" header of the incoming message.
	 *
	 * @param prefix text printed in front of the file name
	 * @return processor that writes one line to standard output per message
	 */
	private static Processor announce(String prefix) {
		return exchange -> {
			String name = (String) exchange.getIn().getHeader("CamelFileName");
			System.out.println(prefix + name);
		};
	}
}


Previous                                                 Next                                                 Home

No comments:

Post a Comment