Saturday 12 January 2019

Camel: Implement custom protocol


In this post, I explain how to implement a custom protocol (component) using the Apache Camel library. As an example, let's implement a custom URI scheme "textFilesOnly" that takes a source folder and copies all the text files in it to a destination directory.

Implementing a component involves the following interfaces.
org.apache.camel.Component
org.apache.camel.Endpoint
org.apache.camel.Consumer
org.apache.camel.Producer
org.apache.camel.Exchange
org.apache.camel.Message

Once you have implemented the above interfaces, you must register the component with the Camel context. You can register the component in one of the following ways.
a.   Add the component to the Camel context using the CamelContext.addComponent() method, for example context.addComponent("textFilesOnly", new TextFileTransferComponent()).

b.   Specify the component class name in a file named META-INF/services/org/apache/camel/component/{protocolName}. The {protocolName} file must contain the class name of the component.
class=com.sample.app.component.TextFileTransferComponent
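
To make option b more concrete, here is a minimal sketch of the idea behind such a lookup: read the properties-style file, take the value of the 'class' key, and instantiate that class reflectively. This is a simplified illustration, not Camel's actual resolver code. The names ServiceFileLookupSketch, readClassName and instantiate are hypothetical, and java.lang.StringBuilder is used as a stand-in class so that the example runs without Camel on the classpath.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

final class ServiceFileLookupSketch {

    /** Reads the "class" key from service-file content and returns its value. */
    static String readClassName(String serviceFileContent) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(serviceFileContent));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String className = props.getProperty("class");
        if (className == null) {
            throw new IllegalArgumentException("service file has no 'class' entry");
        }
        return className.trim();
    }

    /** Instantiates the named class through its public no-argument constructor. */
    static Object instantiate(String className) {
        try {
            return Class.forName(className).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot create " + className, e);
        }
    }

    public static void main(String[] args) {
        // Stand-in content; a real service file would name the component class.
        String content = "class=java.lang.StringBuilder\n";
        Object instance = instantiate(readClassName(content));
        System.out.println(instance.getClass().getName());
    }
}
```

The point of the design is that the route only mentions the scheme name; the class behind it is discovered from the file at runtime, so no explicit addComponent() call is needed.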

This post is divided into the following sections.
a.   Implementing custom component
b.   Implementing custom endpoint
c.   Implementing custom consumer
d.   Implementing custom producer
e.   Implementing custom exchange
f.   Implementing custom message

Implementing custom component
To implement a custom component, you must implement the 'org.apache.camel.Component' interface.

public interface Component extends CamelContextAware {

    Endpoint createEndpoint(String uri) throws Exception;

    boolean useRawUri();

    default Collection<Class<? extends ComponentExtension>> getSupportedExtensions() {
        return Collections.emptyList();
    }

    default <T extends ComponentExtension> Optional<T> getExtension(Class<T> extensionType) {
        return Optional.empty();
    }
}


You can also create a custom component by extending the 'org.apache.camel.impl.DefaultComponent' class. It is an abstract class that implements the Component interface.

public class TextFileTransferComponent extends DefaultComponent{

	@Override
	protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
		// TODO Auto-generated method stub
		return null;
	}

}
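
The three arguments of createEndpoint correspond to parts of the endpoint URI: 'uri' is the full string, 'remaining' is roughly the part after the scheme and before the '?', and 'parameters' holds the query options. The sketch below mimics that split with plain string handling. It is a simplified assumption about the behaviour (Camel's own parsing also deals with encoding and other cases), and UriSplitSketch and its methods are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class UriSplitSketch {

    /** Returns the part after "scheme:" and before the first '?'. */
    static String remaining(String uri) {
        int colon = uri.indexOf(':');
        if (colon < 0) {
            throw new IllegalArgumentException("no scheme in " + uri);
        }
        int query = uri.indexOf('?', colon + 1);
        return query < 0 ? uri.substring(colon + 1) : uri.substring(colon + 1, query);
    }

    /** Returns the query options as an ordered key/value map. */
    static Map<String, Object> parameters(String uri) {
        Map<String, Object> params = new LinkedHashMap<>();
        int query = uri.indexOf('?');
        if (query < 0 || query == uri.length() - 1) {
            return params; // no query part at all
        }
        for (String pair : uri.substring(query + 1).split("&")) {
            int eq = pair.indexOf('=');
            if (eq < 0) {
                params.put(pair, "");
            } else {
                params.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return params;
    }

    public static void main(String[] args) {
        String uri = "textFilesOnly:data/demo?oneLevel=false";
        System.out.println(remaining(uri));   // data/demo
        System.out.println(parameters(uri));  // {oneLevel=false}
    }
}
```

In the real component, 'remaining' becomes the folder path of the endpoint, and options such as oneLevel are applied to the endpoint through its setters.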

Implementing custom endpoint
You need to implement the 'org.apache.camel.Endpoint' interface to develop a custom endpoint.

'DefaultEndpoint' is an abstract class that implements the Endpoint interface. I implemented 'TextFileTransferEndpoint' by extending the 'DefaultEndpoint' class.

public class TextFileTransferEndpoint extends DefaultEndpoint {

	public TextFileTransferEndpoint(String endpointUri, Component component) {
		super(endpointUri, component);
	}

	@Override
	public Producer createProducer() throws Exception {
		// TODO Auto-generated method stub
		return null;
	}

	@Override
	public Consumer createConsumer(Processor processor) throws Exception {
		// TODO Auto-generated method stub
		return null;
	}

	@Override
	public boolean isSingleton() {
		return true;
	}

}

Implementing Consumer Interface

You can create a custom consumer by implementing the 'org.apache.camel.Consumer' interface. Camel provides the 'org.apache.camel.impl.DefaultConsumer' class, which supplies a default implementation of the Consumer interface.

public class TextFileTransferConsumer extends DefaultConsumer {

	public TextFileTransferConsumer(Endpoint endpoint, Processor processor) {
		super(endpoint, processor);
	}

}

Implementing Producer Interface

You can create a custom producer by implementing the Producer interface. Camel provides the 'DefaultProducer' class, which implements the Producer interface and supplies a default implementation. You can extend the DefaultProducer class.

public class TextFileTransferProducer extends DefaultProducer {

	public TextFileTransferProducer(Endpoint endpoint) {
		super(endpoint);
		// TODO Auto-generated constructor stub
	}

	@Override
	public void process(Exchange exchange) throws Exception {
		
	}

}

Developing ‘textFilesOnly’ protocol
With the theory covered, let's implement the application, which copies text files from one directory to another.

Create a new Maven project 'camelTextFilesOnly'.

Open pom.xml and add the following Maven dependency.
<dependency>
         <groupId>org.apache.camel</groupId>
         <artifactId>camel-core</artifactId>
         <version>2.22.1</version>
</dependency>

Create a package 'com.sample.app.component' and define the class TextFileTransferComponent.java.


TextFileTransferComponent.java
package com.sample.app.component;

import java.util.Map;

import org.apache.camel.Endpoint;
import org.apache.camel.impl.DefaultComponent;

import com.sample.app.endpoint.TextFileTransferEndpoint;

public class TextFileTransferComponent extends DefaultComponent{

	@Override
	protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
		return new TextFileTransferEndpoint(uri, remaining, this);
	}

}

Create a package 'com.sample.app.endpoint' and define the class TextFileTransferEndpoint.


TextFileTransferEndpoint.java
package com.sample.app.endpoint;

import org.apache.camel.Component;
import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriPath;

import com.sample.app.consumer.TextFileTransferConsumer;
import com.sample.app.producer.TextFileTransferProducer;

@UriEndpoint(firstVersion = "1.0", scheme = "textFilesOnly", title = "TextFilesOnly", syntax = "textFilesOnly:filePath", consumerClass = TextFileTransferConsumer.class)
public class TextFileTransferEndpoint extends DefaultEndpoint {

	@UriPath(description = "Path to the folder")
	@Metadata(required = "true")
	private String filePath;

	@UriParam(label = "oneLevel")
	private boolean oneLevel;

	public TextFileTransferEndpoint(String endpointUri, String filePath, Component component) {
		super(endpointUri, component);
		this.filePath = filePath;
	}

	@Override
	public Producer createProducer() throws Exception {
		return new TextFileTransferProducer(this);
	}

	@Override
	public Consumer createConsumer(Processor processor) throws Exception {
		return new TextFileTransferConsumer(this, processor);
	}

	@Override
	public boolean isSingleton() {
		return true;
	}

	public String getFilePath() {
		return filePath;
	}

	public void setFilePath(String filePath) {
		this.filePath = filePath;
	}

	public boolean isOneLevel() {
		return oneLevel;
	}

	public void setOneLevel(boolean oneLevel) {
		this.oneLevel = oneLevel;
	}

}

Create a package 'com.sample.app.producer' and define the class TextFileTransferProducer.


TextFileTransferProducer.java
package com.sample.app.producer;

import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultProducer;

import com.sample.app.endpoint.TextFileTransferEndpoint;

public class TextFileTransferProducer extends DefaultProducer {

	private String rootDirectoryPath;

	public TextFileTransferProducer(Endpoint endpoint) {
		super(endpoint);
	}

	@Override
	public void process(Exchange exchange) throws Exception {
		// Let Camel's type converter turn the body into a String
		String content = exchange.getIn().getBody(String.class);
		exchange.getOut().copyFrom(exchange.getIn());

		TextFileTransferEndpoint textFileTransferEndPoint = (TextFileTransferEndpoint) super.getEndpoint();
		this.rootDirectoryPath = textFileTransferEndPoint.getFilePath();

		// The consumer stores the path relative to the source root in this header
		String relativePath = exchange.getIn().getHeader("relativePath", String.class);
		String fullPath = rootDirectoryPath + File.separator + relativePath;
		// Create parent directories if they do not exist
		new File(fullPath).getParentFile().mkdirs();
		Files.write(Paths.get(fullPath), content.getBytes());
	}

}
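
The producer's core job is to rebuild the file's relative location under the destination root and write the content there. The same steps can be sketched with java.nio only, outside of Camel. DestinationWriteSketch, writeUnder and demo are hypothetical names, and the UTF-8 encoding is an assumption (the producer above uses the platform default charset).

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

final class DestinationWriteSketch {

    /** Writes content to root/relativePath, creating parent directories as needed. */
    static Path writeUnder(Path root, String relativePath, String content) {
        Path target = root.resolve(relativePath);
        try {
            Path parent = target.getParent();
            if (parent != null) {
                Files.createDirectories(parent); // does nothing if it already exists
            }
            return Files.write(target, content.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Runs the sketch in a temporary directory and returns the text read back. */
    static String demo() {
        try {
            Path root = Files.createTempDirectory("demoTxtFiles");
            Path written = writeUnder(root, "notes/hello.txt", "hello");
            return new String(Files.readAllBytes(written), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // hello
    }
}
```

Using Path.resolve and Files.createDirectories avoids manual string concatenation with File.separator, which is one reason to consider this variant.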

Create a package 'com.sample.app.consumer' and define the class TextFileTransferConsumer.java.


TextFileTransferConsumer.java
package com.sample.app.consumer;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;

import com.sample.app.endpoint.TextFileTransferEndpoint;

public class TextFileTransferConsumer extends DefaultConsumer {
	private Endpoint endpoint;
	private String rootDirectoryPath;
	private File fileToCopy;
	private boolean oneLevel;

	public TextFileTransferConsumer(Endpoint endpoint, Processor processor) {
		super(endpoint, processor);
		this.endpoint = endpoint;
		TextFileTransferEndpoint textFileTransferEndPoint = (TextFileTransferEndpoint) endpoint;

		this.rootDirectoryPath = textFileTransferEndPoint.getFilePath();
		this.fileToCopy = new File(this.rootDirectoryPath);
		this.oneLevel = textFileTransferEndPoint.isOneLevel();
	}

	public static String getRelativePath(String parentPath, String childPath) {
		Path parentPATH = Paths.get(parentPath);
		Path childPATH = Paths.get(childPath);
		return parentPATH.relativize(childPATH).toString();
	}

	@Override
	protected void doStart() throws Exception {
		super.doStart();
		copyAllFiles(fileToCopy);
	}

	@Override
	protected void doStop() throws Exception {
		super.doStop();
	}

	public void copyAllFiles(final File folder) throws Exception {
		final File[] files = folder.listFiles();
		if (files == null) {
			// Not a directory, or it could not be read
			return;
		}
		for (final File file : files) {
			if (file.isDirectory()) {
				// Descend into sub-directories unless only one level is requested
				if (!oneLevel) {
					copyAllFiles(file);
				}
			} else if (file.getName().endsWith(".txt")) {
				System.out.println("Copying the file " + file.getName());
				Exchange exchange = createExchange(file);
				getProcessor().process(exchange);
			}
		}
	}

	private Exchange createExchange(File file) throws IOException {
		String relativePath = getRelativePath(rootDirectoryPath, file.getAbsolutePath());

		Exchange exchange = endpoint.createExchange();
		String content = new String(Files.readAllBytes(Paths.get(file.getAbsolutePath())));
		exchange.getIn().setBody(content);
		exchange.getIn().setHeader("relativePath", relativePath);
		return exchange;
	}

}
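
The traversal in the consumer can also be expressed with Files.walk, where the oneLevel flag maps to a maximum depth of 1. The following is a sketch of that alternative, not the code used in this post; TextFileWalkSketch, findTextFiles and demo are hypothetical names. It returns each match as a path relative to the root, which is the same value the consumer stores in the 'relativePath' header.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class TextFileWalkSketch {

    /** Lists .txt files under root as sorted paths relative to root. */
    static List<Path> findTextFiles(Path root, boolean oneLevel) {
        int maxDepth = oneLevel ? 1 : Integer.MAX_VALUE;
        try (Stream<Path> paths = Files.walk(root, maxDepth)) {
            return paths
                    .filter(Files::isRegularFile)
                    .filter(p -> p.getFileName().toString().endsWith(".txt"))
                    .map(root::relativize)
                    .sorted()
                    .collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Builds a small temporary tree and returns the matches found in it. */
    static List<Path> demo(boolean oneLevel) {
        try {
            Path root = Files.createTempDirectory("demo");
            Files.createDirectories(root.resolve("sub"));
            Files.write(root.resolve("a.txt"), "a".getBytes());
            Files.write(root.resolve("b.md"), "b".getBytes());
            Files.write(root.resolve("sub").resolve("c.txt"), "c".getBytes());
            return findTextFiles(root, oneLevel);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(true));  // only a.txt
        System.out.println(demo(false)); // a.txt and the c.txt inside sub
    }
}
```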

Create a package 'com.sample.app.route' and define the class FileCopyRoute.


FileCopyRoute.java
package com.sample.app.route;

import org.apache.camel.builder.RouteBuilder;

public class FileCopyRoute extends RouteBuilder {

	@Override
	public void configure() throws Exception {
		from("textFilesOnly:C:\\Users\\Public\\demo?oneLevel=false")
				.to("textFilesOnly:C:\\Users\\Public\\demoTxtFiles");
	}

}

Define the class Application in com.sample.app package.


Application.java
package com.sample.app;

import java.util.concurrent.TimeUnit;

import org.apache.camel.CamelContext;
import org.apache.camel.impl.DefaultCamelContext;

import com.sample.app.route.FileCopyRoute;

public class Application {

	public static void main(String args[]) throws Exception {
		CamelContext context = new DefaultCamelContext();

		context.addRoutes(new FileCopyRoute());

		context.start();

		TimeUnit.MINUTES.sleep(1);

		context.stop();
	}
}

Register the textFilesOnly protocol with Camel.
Create a file named 'textFilesOnly' under the folder hierarchy src\main\resources\META-INF\services\org\apache\camel\component.

The 'textFilesOnly' file contains the fully qualified class name of the component.

textFilesOnly
class=com.sample.app.component.TextFileTransferComponent


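The complete project structure looks like this.

camelTextFilesOnly
    pom.xml
    src/main/java/com/sample/app
        Application.java
        component/TextFileTransferComponent.java
        consumer/TextFileTransferConsumer.java
        endpoint/TextFileTransferEndpoint.java
        producer/TextFileTransferProducer.java
        route/FileCopyRoute.java
    src/main/resources/META-INF/services/org/apache/camel/component
        textFilesOnly
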
Run Application.java. It copies all the text files from 'C:\Users\Public\demo' to 'C:\Users\Public\demoTxtFiles'.



