Friday 28 December 2018

Apache camel: Sending files from filesystem to jms queue

In this post, I am going to explain how to send files from filesystem to a jms queue using Apache Camel.


What is JMS?
JMS stands for Java Message Service, it is an open standard, that documents common way to create, send, receive and read an enterprise messaging system’s messages.

JMS documents, how the asynchronous communication between enterprise applications takes place.

Why JMS?
Prior to JMS specification, every messaging service vendor has their own implementation of messaging apis, it is quite difficult to a user to migrate from one messaging vendor to other. Developers end up in writing vendor specific implementation. JMS exists to address these issues.

By using JMS client, you can able to connect to any JMS compliant provider applications like Active MQ, Rabbit MQ etc.,

In this post, I am going to use ActiveMQ as the JMS queue.

Setting up ActiveMQ
I would recommend you to go through my below post to setup the activeMQ.

Once you setup activeMQ, loginto the admin console by hitting below url.

Use below credentials to login to the admin console.
UserName: admin
Password: admin

You can see below kind of user interface.



Click on the tab ‘Queues’.

Set queue name as 'camelFileTransfer', and click on Create button.
Queue gets created.


Setting up Eclipse project to transfer the files to ActiveMQ Queue
Open Eclipse.

Right click on Project Explorer -> New -> Other.


In the new wizard section, select the ‘Maven Project’
Click on Next button.
Select the checkbox ‘Create a simple project (skip archetype selection).
Click on Next button.
Set the Group Id, Artifact Id as ‘camelFileTransferToJMS´and click on Finish button.

It generates a project structure like below.
Open pom.xml and update the dependencies.

Final pom.xml file looks like below.

pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>
 <groupId>camelFileTransferToJMS</groupId>
 <artifactId>camelFileTransferToJMS</artifactId>
 <version>1</version>

 <dependencies>
  <!-- https://mvnrepository.com/artifact/org.apache.camel/camel-jms -->
  <dependency>
   <groupId>org.apache.camel</groupId>
   <artifactId>camel-jms</artifactId>
   <version>2.22.1</version>
  </dependency>

  <!-- https://mvnrepository.com/artifact/org.apache.camel/camel-core -->
  <dependency>
   <groupId>org.apache.camel</groupId>
   <artifactId>camel-core</artifactId>
   <version>2.22.1</version>
  </dependency>

  <!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all -->
  <dependency>
   <groupId>org.apache.activemq</groupId>
   <artifactId>activemq-all</artifactId>
   <version>5.15.6</version>
  </dependency>

 </dependencies>
</project>

Create a package ‘com.sample.app.routes’ and define the class FileCopyRoute.java.


FileCopyRoute.java
package com.sample.app.routes;

import org.apache.camel.builder.RouteBuilder;

public class FileCopyRoute extends RouteBuilder {

 @Override
 public void configure() throws Exception {
  this.from("file:C:\\Users\\Public\\demo?noop=true").to("demoJMS:queue:camelFileTransfer");
 }

}

Above route copy the files from the directory ‘C:\\Users\\Public\\demo’ to the queue camelFileTransfer.


Application.java
 package com.sample.app;

import java.util.concurrent.TimeUnit;

import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.impl.DefaultCamelContext;

import com.sample.app.routes.FileCopyRoute;
import org.apache.camel.component.jms.JmsComponent;

public class Application {
 public static void main(String args[]) throws Exception {
  CamelContext context = new DefaultCamelContext();

  ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");

  context.addComponent("demoJMS", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));

  context.addRoutes(new FileCopyRoute());

  context.start();

  TimeUnit.MINUTES.sleep(1);

  context.stop();
 }
}

Run Application.java, you can see below messages in the console.
INFO | Apache Camel 2.22.1 (CamelContext: camel-1) is starting
 INFO | JMX is enabled
 INFO | Type converters loaded (core: 195, classpath: 3)
 INFO | StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html
 INFO | Endpoint is configured with noop=true so forcing endpoint to be idempotent as well
 INFO | Using default memory based idempotent repository with cache max size: 1000
 INFO | Route: route1 started and consuming from: file://C:%5CUsers%5CPublic%5Cdemo?noop=true
 INFO | Total 1 routes, of which 1 are started
 INFO | Apache Camel 2.22.1 (CamelContext: camel-1) started in 1.705 seconds
 INFO | Apache Camel 2.22.1 (CamelContext: camel-1) is shutting down
 INFO | Starting to graceful shutdown 1 routes (timeout 300 seconds)
 INFO | Waiting as there are still 1 inflight and pending exchanges to complete, timeout in 300 seconds. Inflights per route: [route1 = 1]
 INFO | Route: route1 shutdown complete, was consuming from: file://C:%5CUsers%5CPublic%5Cdemo?noop=true
 INFO | Graceful shutdown of 1 routes completed in 1 seconds
 INFO | Apache Camel 2.22.1 (CamelContext: camel-1) uptime 1 minute
 INFO | Apache Camel 2.22.1 (CamelContext: camel-1) is shutdown in 1.058 seconds

Once the application executed successfully, recheck the queue status from activeMQ admin console.


As you see, now there are two messages in enqueued state.

‘file:C:\\Users\\Public\\demo’ folder has two files in it. It copied the files to the queue.

Click on the queue ‘camelFileTransfer’, you can see two messages

Click on the any message id, you can see more details about the message.


For example, one of the message contains below information like message body, file length, file source path etc.,

Source code analysis
Below statements add jms component to camel context.

CamelContext context = new DefaultCamelContext();
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
context.addComponent("demoJMS", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));

As you see, I added demoJMS component. In my case jms application is running on tcp port 61616.

Below statement move the files from demo folder the camelFileTransfer queue. You see the to method, I am using demoJMS component that I attached to camel context in previous step.

from("file:C:\\Users\\Public\\demo?noop=true").to("demoJMS:queue:camelFileTransfer");


How the mapping between file to jms queue happened?
You may wonder, how the mapping between a file system file and the jms queue is done. Camel has inbuilt converters that convert given input to specific jms type.

For example,
java.io.File is converted to javax.jms.BytesMessage
String is converted to javax.jms.BytesMessage

You can even process the message before transferring it to the jms queue.

For example, below statements print the file name and length of the file.

                  from("file:C:\\Users\\Public\\demo?noop=true").process(new Processor() {
                           public void process(Exchange exchange) throws Exception {
                                    String fileName = (String) exchange.getIn().getHeader("CamelFileName");
                                    long fileLength = (Long) exchange.getIn().getHeader("CamelFileLength");

                                    System.out.println(fileName + " of length " + fileLength + " is copied from file system to exchnage");
                           }
                  }).to("demoJMS:queue:camelFileTransfer");

FileCopyRoute.java
package com.sample.app.routes;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;

public class FileCopyRoute extends RouteBuilder {

 @Override
 public void configure() throws Exception {
  from("file:C:\\Users\\Public\\demo?noop=true").process(new Processor() {
   public void process(Exchange exchange) throws Exception {
    String fileName = (String) exchange.getIn().getHeader("CamelFileName");
    long fileLength = (Long) exchange.getIn().getHeader("CamelFileLength");

    System.out.println(fileName + " of length " + fileLength + " is copied from file system to exchnage");
   }
  }).to("demoJMS:queue:camelFileTransfer");
 }

}


Relaunch Application.java, you can see below kind of messages in console.

Hello.bat of length 321 is copied from file system to exchnage
sample.txt of length 11 is copied from file system to exchange


Previous                                                 Next                                                 Home

No comments:

Post a Comment