Tuesday 8 January 2019

Camel: Exception handling vs Multicasting the messages


This is continuation to my previous post. In my previous post, I explained how to multicast the messages. By default, camel continues to multicasting the messages to destinations, even it fails to deliver to one or more destinations.

How can you stop the multicasting on exception?
If you want to stop the multicasting of messages in exceptional case, you can do that by using stopOnException() method.

from("file:C:\\Users\\Public\\demo?noop=true")
         .to(queue1)
         .multicast()
         .stopOnException()
         .parallelProcessing()
         .executorService(executor)
         .to(queue2, queue3, queue4);

FileCopyRoute.java

package com.sample.app.routes;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;

public class FileCopyRoute extends RouteBuilder {

	@Override
	public void configure() throws Exception {
		String queue1 = "demoJMS:queue:A";
		String queue2 = "demoJMS:queue:B";
		String queue3 = "demoJMS:queue:C";
		String queue4 = "demoJMS:queue:D";

		ExecutorService executor = Executors.newFixedThreadPool(20);

		from("file:C:\\Users\\Public\\demo?noop=true").to(queue1).multicast().stopOnException().parallelProcessing()
				.executorService(executor).to(queue2, queue3, queue4);

		from(queue2).process(new Processor() {
			@Override
			public void process(Exchange exchange) throws Exception {
				String fileName = (String) exchange.getIn().getHeader("CamelFileName");
				System.out.println("Queue2 received " + fileName);

			}
		});

		from(queue3).process(new Processor() {
			@Override
			public void process(Exchange exchange) throws Exception {
				String fileName = (String) exchange.getIn().getHeader("CamelFileName");
				System.out.println("Queue3 received " + fileName);

			}
		});

		from(queue4).process(new Processor() {
			@Override
			public void process(Exchange exchange) throws Exception {
				String fileName = (String) exchange.getIn().getHeader("CamelFileName");
				System.out.println("Queue4 received " + fileName);

			}
		});
	}
}


Application.java

package com.sample.app;

import java.util.concurrent.TimeUnit;

import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.impl.DefaultCamelContext;

import com.sample.app.routes.FileCopyRoute;

public class Application {
	public static void main(String args[]) throws Exception {
		CamelContext context = new DefaultCamelContext();

		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");

		context.addComponent("demoJMS", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));

		context.addRoutes(new FileCopyRoute());

		context.start();

		TimeUnit.MINUTES.sleep(1);

		context.stop();
	}
}



Previous                                                 Next                                                 Home<

No comments:

Post a Comment