This is a continuation of my previous post. In my previous
post, I explained how to multicast messages. By default, Camel continues
multicasting a message to the remaining destinations even if it fails to deliver
to one or more of them.
How can you stop
multicasting when an exception occurs?
If you want to stop multicasting a message as soon as an exception
occurs, you can do so by calling the stopOnException() method.
from("file:C:\\Users\\Public\\demo?noop=true")
    .to(queue1)
    // Everything below configures the multicast that follows queue1.
    .multicast()
        // Stop sending to further destinations once one of them fails.
        .stopOnException()
        // Deliver to the destinations concurrently, on the supplied executor.
        .parallelProcessing()
        .executorService(executor)
        .to(queue2, queue3, queue4);
FileCopyRoute.java
package com.sample.app.routes;

import java.util.concurrent.ExecutorService;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;

/**
 * Routes files from a local directory to JMS queue A and then multicasts them
 * to queues B, C and D, stopping the multicast as soon as one delivery fails.
 * Each of the three multicast queues is consumed by a route that prints the
 * name of the file it received.
 */
public class FileCopyRoute extends RouteBuilder {

    /** Number of worker threads used to deliver the multicast in parallel. */
    private static final int MULTICAST_POOL_SIZE = 20;

    /**
     * Defines the file-to-queue route, the multicast, and one logging consumer
     * per multicast destination.
     *
     * @throws Exception if the routes cannot be configured
     */
    @Override
    public void configure() throws Exception {
        String queue1 = "demoJMS:queue:A";
        String queue2 = "demoJMS:queue:B";
        String queue3 = "demoJMS:queue:C";
        String queue4 = "demoJMS:queue:D";

        // Ask the CamelContext for the pool instead of calling
        // Executors.newFixedThreadPool directly: Camel does not shut down an
        // executor it is merely handed, so a hand-made pool would leave
        // non-daemon threads running after the context stops. Pools created
        // through the ExecutorServiceManager are shut down with the context.
        ExecutorService executor = getContext().getExecutorServiceManager()
                .newFixedThreadPool(this, "FileCopyMulticast", MULTICAST_POOL_SIZE);

        from("file:C:\\Users\\Public\\demo?noop=true")
            .to(queue1)
            .multicast()
                .stopOnException()
                .parallelProcessing()
                .executorService(executor)
                .to(queue2, queue3, queue4);

        from(queue2).process(logReceived("Queue2"));
        from(queue3).process(logReceived("Queue3"));
        from(queue4).process(logReceived("Queue4"));
    }

    /**
     * Creates a processor that prints which queue received which file.
     *
     * @param label name of the queue to show in the printed message
     * @return a processor printing {@code "<label> received <file name>"}
     */
    private static Processor logReceived(final String label) {
        return new Processor() {
            @Override
            public void process(Exchange exchange) throws Exception {
                // Typed lookup lets Camel convert the header value instead of
                // relying on an unchecked cast.
                String fileName = exchange.getIn().getHeader(Exchange.FILE_NAME, String.class);
                System.out.println(label + " received " + fileName);
            }
        };
    }
}
Application.java
package com.sample.app;

import java.util.concurrent.TimeUnit;

import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.impl.DefaultCamelContext;

import com.sample.app.routes.FileCopyRoute;

/**
 * Starts a Camel context wired to an ActiveMQ broker at
 * {@code tcp://localhost:61616}, runs {@link FileCopyRoute} for one minute,
 * and then stops the context.
 */
public class Application {

    /**
     * Entry point.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the context cannot be configured or started, or if
     *                   the waiting thread is interrupted
     */
    public static void main(String[] args) throws Exception {
        CamelContext context = new DefaultCamelContext();
        try {
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
            context.addComponent("demoJMS", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
            context.addRoutes(new FileCopyRoute());

            context.start();
            // Keep the JVM alive long enough for the routes to do their work.
            TimeUnit.MINUTES.sleep(1);
        } finally {
            // Always release the context, even if setup, start-up or the
            // sleep above fails.
            context.stop();
        }
    }
}
No comments:
Post a Comment