Wednesday 2 January 2019

Camel: Filtering messages to jms queues

In this post, I am going to explain, how can you filter the messages while sending to JMS Queues.



Let’s implement an example like below.
a.   Copy the files from local file system to an ActiveMQ queue ‘A’.
b.   Multicast all the data from queue ‘A’ to other Queues B, C, D.

Setup ActiveMQ
I would recommend you to go through my below post to setup the activeMQ.

Once you setup activeMQ, loginto the admin console by hitting below url.

Use below credentials to login to the admin console.
UserName: admin
Password: admin

You can see below kind of user interface.


Setup Eclipse Maven Project
Create new maven project 'camelMultiCasting'. Project structure looks like below.

Open pom.xml and update maven dependencies.

pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>
 <groupId>camelMultiCasting</groupId>
 <artifactId>camelMultiCasting</artifactId>
 <version>1</version>


 <dependencies>
  <!-- https://mvnrepository.com/artifact/org.apache.camel/camel-jms -->
  <dependency>
   <groupId>org.apache.camel</groupId>
   <artifactId>camel-jms</artifactId>
   <version>2.22.1</version>
  </dependency>

  <!-- https://mvnrepository.com/artifact/org.apache.camel/camel-core -->
  <dependency>
   <groupId>org.apache.camel</groupId>
   <artifactId>camel-core</artifactId>
   <version>2.22.1</version>
  </dependency>

  <!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all -->
  <dependency>
   <groupId>org.apache.activemq</groupId>
   <artifactId>activemq-all</artifactId>
   <version>5.15.6</version>
  </dependency>

 </dependencies>
</project>

Below statement moves the files from local folder to queue1 and then multicast all the items in queue to queue2, queue3, queue4.

from("file:C:\\Users\\Public\\demo?noop=true").to(queue1).multicast().to(queue2, queue3, queue4);

Create new package 'com.sample.app.routes' and define the class FileCopyRoute.

FileCopyRoute.java
package com.sample.app.routes;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;

public class FileCopyRoute extends RouteBuilder {

 @Override
 public void configure() throws Exception {
  String queue1 = "demoJMS:queue:A";
  String queue2 = "demoJMS:queue:B";
  String queue3 = "demoJMS:queue:C";
  String queue4 = "demoJMS:queue:D";

  from("file:C:\\Users\\Public\\demo?noop=true").to(queue1).multicast().to(queue2, queue3, queue4);

  from(queue2).process(new Processor() {
   @Override
   public void process(Exchange exchange) throws Exception {
    String fileName = (String) exchange.getIn().getHeader("CamelFileName");
    System.out.println("Queue2 received " + fileName);

   }
  });

  from(queue3).process(new Processor() {
   @Override
   public void process(Exchange exchange) throws Exception {
    String fileName = (String) exchange.getIn().getHeader("CamelFileName");
    System.out.println("Queue3 received " + fileName);

   }
  });

  from(queue4).process(new Processor() {
   @Override
   public void process(Exchange exchange) throws Exception {
    String fileName = (String) exchange.getIn().getHeader("CamelFileName");
    System.out.println("Queue4 received " + fileName);

   }
  });
 }
}


Application.java
package com.sample.app;

import java.util.concurrent.TimeUnit;

import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.impl.DefaultCamelContext;

import com.sample.app.routes.FileCopyRoute;

public class Application {
 public static void main(String args[]) throws Exception {
  CamelContext context = new DefaultCamelContext();

  ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");

  context.addComponent("demoJMS", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));

  context.addRoutes(new FileCopyRoute());

  context.start();

  TimeUnit.MINUTES.sleep(1);

  context.stop();
 }
}

In my case ‘demo’ folder has below documents.
BigFile.txt
Chapter1.docx
Hello.bat
sales.xlsx
SmallFile.txt

Run Application.java, you can see below messages in the console.

Queue2 received BigFile.txt
Queue3 received BigFile.txt
Queue4 received BigFile.txt
Queue2 received Chapter1.docx
Queue3 received Chapter1.docx
Queue4 received Chapter1.docx
Queue2 received Hello.bat
Queue3 received Hello.bat
Queue4 received Hello.bat
Queue2 received sales.xlsx
Queue3 received sales.xlsx
Queue4 received sales.xlsx
Queue2 received SmallFile.txt
Queue3 received SmallFile.txt

Queue4 received SmallFile.txt

See the above image, Queue A has 5 messages, but Queues B, C, and D has 0 pending messages. It is because, we processed the messages from B, C, D queues (while printing), but not from Queue A.

One more thing, when you observe the console output, messages are transmitted sequentially. If you want parallel behavior, you can use parallelProcessing() function.

from("file:C:\\Users\\Public\\demo?noop=true")
         .to(queue1)
         .multicast()
         .parallelProcessing()
         .to(queue2, queue3,queue4);

FileCopyRoute.java
package com.sample.app.routes;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;

public class FileCopyRoute extends RouteBuilder {

 @Override
 public void configure() throws Exception {
  String queue1 = "demoJMS:queue:A";
  String queue2 = "demoJMS:queue:B";
  String queue3 = "demoJMS:queue:C";
  String queue4 = "demoJMS:queue:D";

  from("file:C:\\Users\\Public\\demo?noop=true").to(queue1).multicast().parallelProcessing().to(queue2, queue3,
    queue4);

  from(queue2).process(new Processor() {
   @Override
   public void process(Exchange exchange) throws Exception {
    String fileName = (String) exchange.getIn().getHeader("CamelFileName");
    System.out.println("Queue2 received " + fileName);

   }
  });

  from(queue3).process(new Processor() {
   @Override
   public void process(Exchange exchange) throws Exception {
    String fileName = (String) exchange.getIn().getHeader("CamelFileName");
    System.out.println("Queue3 received " + fileName);

   }
  });

  from(queue4).process(new Processor() {
   @Override
   public void process(Exchange exchange) throws Exception {
    String fileName = (String) exchange.getIn().getHeader("CamelFileName");
    System.out.println("Queue4 received " + fileName);

   }
  });
 }
}


Rerun Application.java, you can see the parallel behavior in message transmission. In my case, I seen below messages in console.

Queue2 received BigFile.txt
Queue4 received BigFile.txt
Queue3 received BigFile.txt
Queue3 received Chapter1.docx
Queue2 received Chapter1.docx
Queue4 received Chapter1.docx
Queue4 received Hello.bat
Queue2 received Hello.bat
Queue3 received Hello.bat
Queue3 received sales.xlsx
Queue4 received sales.xlsx
Queue2 received sales.xlsx
Queue4 received SmallFile.txt
Queue2 received SmallFile.txt
Queue3 received SmallFile.txt

When you request camel to perform parallel processing, camel use a default thread pool of size 10. You can even pass another thread pool to perform this task by calling executorService() method.

ExecutorService executor = Executors.newFixedThreadPool(20);

from("file:C:\\Users\\Public\\demo?noop=true")
         .to(queue1)
         .multicast()
         .parallelProcessing()
         .executorService(executor)
         .to(queue2, queue3, queue4);

Above snippet increase the maximum number of threads to 20.


Previous                                                 Next                                                 Home

No comments:

Post a Comment