Sunday, 30 December 2018

Apache Camel: Content-based routing using JMS

In my previous post, I explained how to route data based on its content within the file system. In this post, I am going to explain how to route the content to JMS queues.
The flow is as follows: all the files are copied from the file system to ‘Raw Queue’; after that, the .xlsx files are moved to the xls queue, the .txt files are moved to the txt queue, and all the remaining files are moved to Other Queue.

To demo this application, I am using ActiveMQ as the JMS provider.

What is JMS?
JMS stands for Java Message Service. It is an open standard that defines a common way to create, send, receive and read an enterprise messaging system’s messages.

JMS specifies how asynchronous communication between enterprise applications takes place.

Why JMS?
Prior to the JMS specification, every messaging vendor had its own messaging API, so it was quite difficult for a user to migrate from one messaging vendor to another. Developers ended up writing vendor-specific implementations. JMS exists to address these issues.

By using a JMS client, you can connect to any JMS-compliant provider, such as ActiveMQ, or RabbitMQ through its JMS client library.

In this post, I am going to use ActiveMQ as the JMS provider.

Setting up ActiveMQ
I recommend going through my earlier post on setting up ActiveMQ.

Once ActiveMQ is set up, log in to its web admin console.

Use the credentials below to log in to the admin console.
UserName: admin
Password: admin

After logging in, the admin console home page opens.

Click on the ‘Queues’ tab.

Create the 4 queues below.
a.   RawQueue
b.   XlsQueue
c.   TxtQueue
d.   OtherQueue


Setting Up Eclipse Project
Create a new Maven project ‘camelContentRoutingToJMS’.

Update pom.xml with the Maven dependencies below.

pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>
 <groupId>camelContentRoutingToJMS</groupId>
 <artifactId>camelContentRoutingToJMS</artifactId>
 <version>1</version>

 <dependencies>
  <!-- https://mvnrepository.com/artifact/org.apache.camel/camel-jms -->
  <dependency>
   <groupId>org.apache.camel</groupId>
   <artifactId>camel-jms</artifactId>
   <version>2.22.1</version>
  </dependency>

  <!-- https://mvnrepository.com/artifact/org.apache.camel/camel-core -->
  <dependency>
   <groupId>org.apache.camel</groupId>
   <artifactId>camel-core</artifactId>
   <version>2.22.1</version>
  </dependency>

  <!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all -->
  <dependency>
   <groupId>org.apache.activemq</groupId>
   <artifactId>activemq-all</artifactId>
   <version>5.15.6</version>
  </dependency>

 </dependencies>
</project>

Create a package ‘com.sample.app.routes’ and define the class ‘FileCopyRoute’ as shown below.


FileCopyRoute.java
package com.sample.app.routes;

import org.apache.camel.Exchange;
import org.apache.camel.Predicate;
import org.apache.camel.builder.RouteBuilder;

public class FileCopyRoute extends RouteBuilder {

 @Override
 public void configure() throws Exception {
  String sourceFolder = "file:C:\\Users\\Public\\demo?noop=true";
  String rawQueue = "demoJMS:queue:RawQueue";
  String xlsQueue = "demoJMS:queue:XlsQueue";
  String txtQueue = "demoJMS:queue:TxtQueue";
  String otherQueue = "demoJMS:queue:OtherQueue";

  Predicate nonXlsAndTxtPredicate = new Predicate() {

   @Override
   public boolean matches(Exchange exchange) {
    String fileName = (String) exchange.getIn().getHeader("CamelFileName");
    return !fileName.endsWith(".xlsx") && !fileName.endsWith(".txt");
   }

  };

  from(sourceFolder).to(rawQueue).choice().when(header("CamelFileName").endsWith(".xlsx")).to(xlsQueue)
    .when(header("CamelFileName").endsWith(".txt")).to(txtQueue).when(nonXlsAndTxtPredicate).to(otherQueue);

 }

}
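One thing to keep in mind: the hand-written predicate above casts the CamelFileName header to String and calls endsWith on it directly. If a message ever arrives without that header, the predicate would throw a NullPointerException, and a file named REPORT.TXT would not be treated as a txt file. Below is a minimal plain-Java sketch of a more defensive check. The helper class FileNames is hypothetical (it is not part of Camel), and whether you need case-insensitive matching is an assumption about your use case.

```java
import java.util.Locale;

// Hypothetical helper (not part of Camel): null-safe, case-insensitive extension check.
final class FileNames {

    private FileNames() {
    }

    // Returns true when fileName ends with the given extension, ignoring case.
    // A null file name (for example, a missing CamelFileName header) never matches.
    static boolean hasExtension(String fileName, String extension) {
        if (fileName == null || extension == null) {
            return false;
        }
        return fileName.toLowerCase(Locale.ROOT).endsWith(extension.toLowerCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        System.out.println(hasExtension("sales.xlsx", ".xlsx")); // true
        System.out.println(hasExtension("REPORT.TXT", ".txt"));  // true
        System.out.println(hasExtension(null, ".txt"));          // false
    }
}
```

Inside matches(), such a helper could be called with the header value instead of calling endsWith on it directly.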

Define Application.java under the ‘com.sample.app’ package.


Application.java
package com.sample.app;

import java.util.concurrent.TimeUnit;

import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.impl.DefaultCamelContext;

import com.sample.app.routes.FileCopyRoute;

public class Application {
 public static void main(String args[]) throws Exception {
  CamelContext context = new DefaultCamelContext();

  ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");

  context.addComponent("demoJMS", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));

  context.addRoutes(new FileCopyRoute());

  context.start();

  TimeUnit.MINUTES.sleep(1);

  context.stop();
 }
}

Assume the source folder has the files below.
abc.txt
Chapter1.docx
Hello.bat
sales.xlsx
sample.txt

Once you run Application.java, you can observe the following.
a.   OtherQueue has 2 messages.
b.   RawQueue has 5 messages.
c.   TxtQueue has 2 messages.
d.   XlsQueue has 1 message.
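
The counts for the three destination queues follow directly from the file extensions. Here is a small plain-Java sketch (no Camel or JMS involved; the class and method names are made up for illustration) that applies the same extension rule to the five sample files.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: mirrors the extension rule of the route, without Camel or JMS.
final class QueueCounts {

    private QueueCounts() {
    }

    // Picks the destination queue name for a file name.
    static String queueFor(String fileName) {
        if (fileName.endsWith(".xlsx")) {
            return "XlsQueue";
        }
        if (fileName.endsWith(".txt")) {
            return "TxtQueue";
        }
        return "OtherQueue";
    }

    // Counts how many of the given files end up in each destination queue.
    static Map<String, Integer> countByQueue(List<String> fileNames) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String fileName : fileNames) {
            counts.merge(queueFor(fileName), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> files = Arrays.asList("abc.txt", "Chapter1.docx", "Hello.bat", "sales.xlsx", "sample.txt");
        // prints {TxtQueue=2, OtherQueue=2, XlsQueue=1}
        System.out.println(countByQueue(files));
    }
}
```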
If you observe closely, there is a bug in the above code: the messages in the RawQueue are never consumed, so all 5 of them remain there. It is because of the snippet below.

from(sourceFolder)
.to(rawQueue)
.choice()
.when(header("CamelFileName")
.endsWith(".xlsx"))
.to(xlsQueue)
.when(header("CamelFileName")
.endsWith(".txt"))
.to(txtQueue)
.when(nonXlsAndTxtPredicate)
.to(otherQueue);

The above snippet populates all the queues directly from the sourceFolder; nothing ever consumes from RawQueue.

But our use case is as follows.
a.   Populate RawQueue from sourceFolder.
b.   Populate XlsQueue, TxtQueue and OtherQueue from RawQueue.

We should write the logic like below.

from(sourceFolder).to(rawQueue);
                 
from(rawQueue)
.choice()
.when(header("CamelFileName")
.endsWith(".xlsx"))
.to(xlsQueue)
.when(header("CamelFileName")
.endsWith(".txt"))
.to(txtQueue)
.when(nonXlsAndTxtPredicate)
.to(otherQueue);
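
The effect of splitting the route in two can be modelled without a broker. The sketch below is only an illustration: in-memory ArrayDeque objects stand in for the JMS queues, and the class TwoStageModel is hypothetical. Stage one only fills the raw queue, and stage two consumes from it, which is why the raw queue ends up empty.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative model only: in-memory deques stand in for the JMS queues.
final class TwoStageModel {

    final Deque<String> rawQueue = new ArrayDeque<>();
    final Map<String, Deque<String>> destinations = new LinkedHashMap<>();

    TwoStageModel() {
        destinations.put("XlsQueue", new ArrayDeque<>());
        destinations.put("TxtQueue", new ArrayDeque<>());
        destinations.put("OtherQueue", new ArrayDeque<>());
    }

    // Stage 1: corresponds to from(sourceFolder).to(rawQueue).
    void copyToRaw(List<String> fileNames) {
        rawQueue.addAll(fileNames);
    }

    // Stage 2: corresponds to from(rawQueue).choice()...
    // Polling removes each message from the raw queue before it is forwarded.
    void drainRaw() {
        String fileName;
        while ((fileName = rawQueue.poll()) != null) {
            destinations.get(queueFor(fileName)).add(fileName);
        }
    }

    static String queueFor(String fileName) {
        if (fileName.endsWith(".xlsx")) {
            return "XlsQueue";
        }
        if (fileName.endsWith(".txt")) {
            return "TxtQueue";
        }
        return "OtherQueue";
    }

    public static void main(String[] args) {
        TwoStageModel model = new TwoStageModel();
        model.copyToRaw(Arrays.asList("abc.txt", "Chapter1.docx", "Hello.bat", "sales.xlsx", "sample.txt"));
        System.out.println("Raw after stage 1: " + model.rawQueue.size()); // 5
        model.drainRaw();
        System.out.println("Raw after stage 2: " + model.rawQueue.size()); // 0
        System.out.println(model.destinations);
    }
}
```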

Delete all the queues, then update FileCopyRoute.java with the above snippet.


FileCopyRoute.java
package com.sample.app.routes;

import org.apache.camel.Exchange;
import org.apache.camel.Predicate;
import org.apache.camel.builder.RouteBuilder;

public class FileCopyRoute extends RouteBuilder {

 @Override
 public void configure() throws Exception {
  String sourceFolder = "file:C:\\Users\\Public\\demo?noop=true";
  String rawQueue = "demoJMS:queue:RawQueue";
  String xlsQueue = "demoJMS:queue:XlsQueue";
  String txtQueue = "demoJMS:queue:TxtQueue";
  String otherQueue = "demoJMS:queue:OtherQueue";

  Predicate nonXlsAndTxtPredicate = new Predicate() {

   @Override
   public boolean matches(Exchange exchange) {
    String fileName = (String) exchange.getIn().getHeader("CamelFileName");
    return !fileName.endsWith(".xlsx") && !fileName.endsWith(".txt");
   }

  };

  from(sourceFolder).to(rawQueue);
  
  from(rawQueue).choice().when(header("CamelFileName").endsWith(".xlsx")).to(xlsQueue)
    .when(header("CamelFileName").endsWith(".txt")).to(txtQueue).when(nonXlsAndTxtPredicate).to(otherQueue);

 }

}


Rerun Application.java. You can observe that the messages in the raw queue are dequeued and distributed to the other queues.
Let’s process the messages in OtherQueue, TxtQueue and XlsQueue.

from(xlsQueue).process(new Processor() {

         @Override
         public void process(Exchange exchange) throws Exception {
                  String fileName = (String) exchange.getIn().getHeader("CamelFileName");
                  System.out.println("XlsQueue  received " + fileName);

         }

});

from(txtQueue).process(new Processor() {

         @Override
         public void process(Exchange exchange) throws Exception {
                  String fileName = (String) exchange.getIn().getHeader("CamelFileName");
                  System.out.println("TxtQueue  received " + fileName);

         }

});

from(otherQueue).process(new Processor() {

         @Override
         public void process(Exchange exchange) throws Exception {
                  String fileName = (String) exchange.getIn().getHeader("CamelFileName");
                  System.out.println("OtherQueue  received " + fileName);

         }

});

Update FileCopyRoute.java as shown below.

FileCopyRoute.java
package com.sample.app.routes;

import org.apache.camel.Exchange;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;

public class FileCopyRoute extends RouteBuilder {

 @Override
 public void configure() throws Exception {
  String sourceFolder = "file:C:\\Users\\Public\\demo?noop=true";
  String rawQueue = "demoJMS:queue:RawQueue";
  String xlsQueue = "demoJMS:queue:XlsQueue";
  String txtQueue = "demoJMS:queue:TxtQueue";
  String otherQueue = "demoJMS:queue:OtherQueue";

  Predicate nonXlsAndTxtPredicate = new Predicate() {

   @Override
   public boolean matches(Exchange exchange) {
    String fileName = (String) exchange.getIn().getHeader("CamelFileName");
    return !fileName.endsWith(".xlsx") && !fileName.endsWith(".txt");
   }

  };

  from(sourceFolder).to(rawQueue);

  from(rawQueue).choice().when(header("CamelFileName").endsWith(".xlsx")).to(xlsQueue)
    .when(header("CamelFileName").endsWith(".txt")).to(txtQueue).when(nonXlsAndTxtPredicate).to(otherQueue);

  from(xlsQueue).process(new Processor() {

   @Override
   public void process(Exchange exchange) throws Exception {
    String fileName = (String) exchange.getIn().getHeader("CamelFileName");
    System.out.println("XlsQueue  received " + fileName);

   }

  });

  from(txtQueue).process(new Processor() {

   @Override
   public void process(Exchange exchange) throws Exception {
    String fileName = (String) exchange.getIn().getHeader("CamelFileName");
    System.out.println("TxtQueue  received " + fileName);

   }

  });

  from(otherQueue).process(new Processor() {

   @Override
   public void process(Exchange exchange) throws Exception {
    String fileName = (String) exchange.getIn().getHeader("CamelFileName");
    System.out.println("OtherQueue  received " + fileName);

   }

  });

 }

}


Delete all the queues and rerun Application.java. You can see the messages below in the console.

INFO | Route: route1 started and consuming from: file://C:%5CUsers%5CPublic%5Cdemo?noop=true
 INFO | Route: route2 started and consuming from: demoJMS://queue:RawQueue
 INFO | Route: route3 started and consuming from: demoJMS://queue:XlsQueue
 INFO | Route: route4 started and consuming from: demoJMS://queue:TxtQueue
 INFO | Route: route5 started and consuming from: demoJMS://queue:OtherQueue
 INFO | Total 5 routes, of which 5 are started
 INFO | Apache Camel 2.22.1 (CamelContext: camel-1) started in 1.220 seconds
TxtQueue  received abc.txt
OtherQueue  received Chapter1.docx
OtherQueue  received Hello.bat
XlsQueue  received sales.xlsx
TxtQueue  received sample.txt

Further enhancing the above application using the otherwise() method
I wrote my own predicate to match all the files other than .xlsx and .txt, like below.
Predicate nonXlsAndTxtPredicate = new Predicate() {

         @Override
         public boolean matches(Exchange exchange) {
                  String fileName = (String) exchange.getIn().getHeader("CamelFileName");
                  return !fileName.endsWith(".xlsx") && !fileName.endsWith(".txt");
         }

};
But by using the otherwise() method, I can route all the messages that do not match any of the preceding predicates.

from(rawQueue)
.choice()
.when(header("CamelFileName")
.endsWith(".xlsx"))
.to(xlsQueue)
.when(header("CamelFileName")
.endsWith(".txt"))
.to(txtQueue)
.otherwise()
.to(otherQueue);
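
The reason this works is that the hand-written predicate is simply the negation of the two when conditions. Here is a plain-Java sketch of that equivalence. Note that it uses java.util.function.Predicate as a stand-in for Camel's Predicate, and the class name is made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Illustrative only: java.util.function.Predicate stands in for Camel's Predicate.
final class OtherwiseEquivalence {

    static final Predicate<String> IS_XLSX = name -> name.endsWith(".xlsx");
    static final Predicate<String> IS_TXT = name -> name.endsWith(".txt");

    // The hand-written predicate from the earlier version of the route.
    static final Predicate<String> NON_XLS_AND_TXT =
            name -> !name.endsWith(".xlsx") && !name.endsWith(".txt");

    // What otherwise() amounts to: none of the preceding conditions matched.
    static final Predicate<String> OTHERWISE = IS_XLSX.or(IS_TXT).negate();

    private OtherwiseEquivalence() {
    }

    public static void main(String[] args) {
        List<String> files = Arrays.asList("abc.txt", "Chapter1.docx", "Hello.bat", "sales.xlsx", "sample.txt");
        for (String file : files) {
            // Both columns agree for every file name.
            System.out.println(file + " -> " + NON_XLS_AND_TXT.test(file) + " / " + OTHERWISE.test(file));
        }
    }
}
```

A practical benefit of otherwise() is that the fallback narrows automatically when a new when clause is added, whereas the hand-written predicate has to be edited every time.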


FileCopyRoute.java
package com.sample.app.routes;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;

public class FileCopyRoute extends RouteBuilder {

 @Override
 public void configure() throws Exception {
  String sourceFolder = "file:C:\\Users\\Public\\demo?noop=true";
  String rawQueue = "demoJMS:queue:RawQueue";
  String xlsQueue = "demoJMS:queue:XlsQueue";
  String txtQueue = "demoJMS:queue:TxtQueue";
  String otherQueue = "demoJMS:queue:OtherQueue";

  from(sourceFolder).to(rawQueue);

  from(rawQueue).choice().when(header("CamelFileName").endsWith(".xlsx")).to(xlsQueue)
    .when(header("CamelFileName").endsWith(".txt")).to(txtQueue).otherwise().to(otherQueue);

  from(xlsQueue).process(new Processor() {

   @Override
   public void process(Exchange exchange) throws Exception {
    String fileName = (String) exchange.getIn().getHeader("CamelFileName");
    System.out.println("XlsQueue  received " + fileName);

   }

  });

  from(txtQueue).process(new Processor() {

   @Override
   public void process(Exchange exchange) throws Exception {
    String fileName = (String) exchange.getIn().getHeader("CamelFileName");
    System.out.println("TxtQueue  received " + fileName);

   }

  });

  from(otherQueue).process(new Processor() {

   @Override
   public void process(Exchange exchange) throws Exception {
    String fileName = (String) exchange.getIn().getHeader("CamelFileName");
    System.out.println("OtherQueue  received " + fileName);

   }

  });

 }

}

Let’s further enhance the above flow. After the messages are added to the xls, txt and Other queues, I want all of them to be added to Final Queue as well.
We need to end the choice using the end() method and then add the other destination, FinalQueue, using the to() method like below.

from(rawQueue)
  .choice()
    .when(header("CamelFileName").endsWith(".xlsx"))
      .to(xlsQueue)
    .when(header("CamelFileName").endsWith(".txt"))
      .to(txtQueue)
    .otherwise()
      .to(otherQueue)
  .end()
.to(finalQueue);
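
The placement of end() matters here. In the Java DSL, steps chained after otherwise() stay inside the otherwise branch until the choice is closed. The plain-Java sketch below models this with ordinary if/else blocks. It is only an analogy (the class EndPlacement is hypothetical and no Camel code is involved), and each method returns the queues a message would reach.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model only: plain if/else stands in for choice()/when()/otherwise().
final class EndPlacement {

    private EndPlacement() {
    }

    // With end(): the choice is closed, so FinalQueue is reached by every message.
    static List<String> withEnd(String fileName) {
        List<String> reached = new ArrayList<>();
        if (fileName.endsWith(".xlsx")) {
            reached.add("XlsQueue");
        } else if (fileName.endsWith(".txt")) {
            reached.add("TxtQueue");
        } else {
            reached.add("OtherQueue");
        }
        reached.add("FinalQueue");
        return reached;
    }

    // Without end(): to(finalQueue) stays inside the otherwise() branch.
    static List<String> withoutEnd(String fileName) {
        List<String> reached = new ArrayList<>();
        if (fileName.endsWith(".xlsx")) {
            reached.add("XlsQueue");
        } else if (fileName.endsWith(".txt")) {
            reached.add("TxtQueue");
        } else {
            reached.add("OtherQueue");
            reached.add("FinalQueue");
        }
        return reached;
    }

    public static void main(String[] args) {
        System.out.println(withEnd("abc.txt"));      // [TxtQueue, FinalQueue]
        System.out.println(withoutEnd("abc.txt"));   // [TxtQueue]
        System.out.println(withoutEnd("Hello.bat")); // [OtherQueue, FinalQueue]
    }
}
```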

Update FileCopyRoute.java as shown below.

FileCopyRoute.java
package com.sample.app.routes;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;

public class FileCopyRoute extends RouteBuilder {

 @Override
 public void configure() throws Exception {
  String sourceFolder = "file:C:\\Users\\Public\\demo?noop=true";
  String rawQueue = "demoJMS:queue:RawQueue";
  String xlsQueue = "demoJMS:queue:XlsQueue";
  String txtQueue = "demoJMS:queue:TxtQueue";
  String otherQueue = "demoJMS:queue:OtherQueue";
  String finalQueue = "demoJMS:queue:FinalQueue";

  from(sourceFolder).to(rawQueue);

  from(rawQueue).choice().when(header("CamelFileName").endsWith(".xlsx")).to(xlsQueue)
    .when(header("CamelFileName").endsWith(".txt")).to(txtQueue).otherwise().to(otherQueue).end()
    .to(finalQueue);

  from(finalQueue).process(new Processor() {

   @Override
   public void process(Exchange exchange) throws Exception {
    String fileName = (String) exchange.getIn().getHeader("CamelFileName");
    System.out.println("FinalQueue  received " + fileName);

   }

  });

 }

}


Rerun Application.java. You can see the messages below in the console.

FinalQueue  received abc.txt
FinalQueue  received Chapter1.docx
FinalQueue  received Hello.bat
FinalQueue  received sales.xlsx
FinalQueue  received sample.txt
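Note that the messages in XlsQueue, TxtQueue and OtherQueue are not dequeued, because this version of the route defines no consumers for those queues; each message is additionally routed to FinalQueue.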

What about stopping certain kinds of messages?
Suppose FinalQueue should not receive .txt messages. You can achieve this by calling the stop() method.

from(rawQueue)
  .choice()
    .when(header("CamelFileName").endsWith(".xlsx")).to(xlsQueue)
    .when(header("CamelFileName").endsWith(".txt")).to(txtQueue).stop()
    .otherwise().to(otherQueue)
  .end()
.to(finalQueue);

In the above snippet, stop() is called right after the message is sent to TxtQueue. The stop() method stops further routing of the current Exchange and marks it as completed.
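
To see which files still reach FinalQueue, here is a small plain-Java model of that behaviour. It is only a sketch under stated assumptions: the class StopModel is hypothetical, a List stands in for FinalQueue, and a continue statement plays the role of stop().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative model only: a List stands in for FinalQueue, "continue" stands in for stop().
final class StopModel {

    private StopModel() {
    }

    // Returns the file names that reach FinalQueue when .txt messages are stopped after TxtQueue.
    static List<String> finalQueueContents(List<String> fileNames) {
        List<String> finalQueue = new ArrayList<>();
        for (String fileName : fileNames) {
            if (fileName.endsWith(".txt")) {
                // to(txtQueue).stop(): routing of this message ends here.
                continue;
            }
            // .xlsx files and all other files leave their branch and continue to FinalQueue.
            finalQueue.add(fileName);
        }
        return finalQueue;
    }

    public static void main(String[] args) {
        List<String> files = Arrays.asList("abc.txt", "Chapter1.docx", "Hello.bat", "sales.xlsx", "sample.txt");
        // prints [Chapter1.docx, Hello.bat, sales.xlsx]
        System.out.println(finalQueueContents(files));
    }
}
```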

FileCopyRoute.java

package com.sample.app.routes;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;

public class FileCopyRoute extends RouteBuilder {

 @Override
 public void configure() throws Exception {
  String sourceFolder = "file:C:\\Users\\Public\\demo?noop=true";
  String rawQueue = "demoJMS:queue:RawQueue";
  String xlsQueue = "demoJMS:queue:XlsQueue";
  String txtQueue = "demoJMS:queue:TxtQueue";
  String otherQueue = "demoJMS:queue:OtherQueue";
  String finalQueue = "demoJMS:queue:FinalQueue";

  from(sourceFolder).to(rawQueue);

  from(rawQueue).choice().when(header("CamelFileName").endsWith(".xlsx")).to(xlsQueue)
    .when(header("CamelFileName").endsWith(".txt")).to(txtQueue).stop().otherwise().to(otherQueue).end()
    .to(finalQueue);

  from(finalQueue).process(new Processor() {

   @Override
   public void process(Exchange exchange) throws Exception {
    String fileName = (String) exchange.getIn().getHeader("CamelFileName");
    System.out.println("FinalQueue  received " + fileName);

   }

  });

 }

}


Delete all the existing queues and rerun Application.java. You can see the messages below in the console.

FinalQueue  received Chapter1.docx
FinalQueue  received Hello.bat
FinalQueue  received sales.xlsx


