Saturday 19 September 2015

RabbitMQ: Receive messages based on pattern

In previous tutorial, I explained how to deleiver messages using direct exchange. In this post I am going to explain, the usage of topic exchange.

Topic exchange is similar to direct exchange, only difference is routing key is a list of words delimited by dots. The words can be anything, for example, orders.asia.india, orders.asia.china, orders.asia.*, orders.#

*, # are special symbols, has special meaning.

‘*’ match exactly any one word.
‘#’ match zero or more words.

Suppose you are working for an global e-commerce company. You want to log orders country wise.

Routing key
Meaning
Orders.asia.india
Matches all order messages specific to india
orders.asia.china
Matches all order messages specific to china
orders.asia.*
Matches all order messages specific to asia
orders.#
Matches all order messages for this comapny


Following is the Producer program.

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {

 private final static String INDIA_QUEUE = "india_orders";
 private final static String CHINA_QUEUE = "china_orders";
 private final static String ASIA_QUEUE = "asia_orders";
 private final static String ALL_QUEUE = "aLL_orders";

 private static final String HOST_NAME = "127.0.0.1";

 /* get connection instance to connect to RabbitMQ broker */
 private static Connection getConnection() throws IOException,
   TimeoutException {
  /* Get factory instance to open connection to RabbitMQ broker */
  ConnectionFactory factory = new ConnectionFactory();

  factory.setHost(HOST_NAME);

  /* Create new broker connection */
  Connection connection = factory.newConnection();

  return connection;
 }

 public static void main(String[] argv) throws Exception {
  Connection connection = getConnection();

  Channel channel = connection.createChannel();

  /* Create exchanger */
  channel.exchangeDeclare("orders_exchange", "topic");

  channel.queueDeclare(INDIA_QUEUE, false, false, false, null);
  channel.queueDeclare(CHINA_QUEUE, false, false, false, null);
  channel.queueDeclare(ASIA_QUEUE, false, false, false, null);
  channel.queueDeclare(ALL_QUEUE, false, false, false, null);

  /* Bind queues to this exchanger */
  channel.queueBind(INDIA_QUEUE, "orders_exchange", "orders.asia.india");
  channel.queueBind(CHINA_QUEUE, "orders_exchange", "orders.asia.china");
  channel.queueBind(ASIA_QUEUE, "orders_exchange", "orders.asia.*");
  channel.queueBind(ALL_QUEUE, "orders_exchange", "orders.#");

  String messageIndia = "India orders 1M$";
  String messageChina = "China orders 2M$";
  String messageBangladesh = "Asia orders 0.3M$";
  String messageUK = "UK orders 10M$";

  /* Publish orders */
  channel.basicPublish("orders_exchange", "orders.asia.india", null,
    messageIndia.getBytes());
  channel.basicPublish("orders_exchange", "orders.asia.china", null,
    messageChina.getBytes());
  channel.basicPublish("orders_exchange", "orders.asia.bangladesh", null,
    messageBangladesh.getBytes());
  channel.basicPublish("orders_exchange", "orders.europe.uk", null,
    messageUK.getBytes());

  /* Close channel and connection */
  channel.close();
  connection.close();
 }
}


Following is the Receiver (consumer) program.

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Receiver {
 private static final String HOST_NAME = "127.0.0.1";

 /* get connection instance to connect to RabbitMQ broker */
 private static Connection getConnection() throws IOException,
   TimeoutException {
  /* Get factory instance to open connection to RabbitMQ broker */
  ConnectionFactory factory = new ConnectionFactory();

  factory.setHost(HOST_NAME);

  /* Create new broker connection */
  Connection connection = factory.newConnection();

  return connection;
 }

 public static void main(String[] args) throws Exception {
  Connection connection = getConnection();
  Channel channel = connection.createChannel();
  String queueName = args[0];
  String routingKey = args[1];

  /* Create exchanger */
  channel.exchangeDeclare("orders_exchange", "topic");

  /* Create queue if not exist */
  channel.queueDeclare(queueName, false, false, false, null);
  channel.queueBind(queueName, "orders_exchange", routingKey);

  System.out.println("Waiting for tasks");

  Consumer consumer = new DefaultConsumer(channel) {
   @Override
   public void handleDelivery(String consumerTag, Envelope envelope,
     AMQP.BasicProperties properties, byte[] body)
     throws IOException {
    String message = new String(body, "UTF-8");
    System.out.println(" [x] " + message);
   }
  };
  channel.basicConsume(queueName, consumer);

 }
}


Compile Producer and Receiver programs.
javac -cp rabbitmq-client.jar Receiver.java Producer.java

Open new terminal and run Receiver to receive orders information from India.
java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Receiver india_orders orders.asia.india

Open new terminal and run Receiver to receive orders information from china.

java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Receiver china_orders orders.asia.china

Open new terminal and run Receiver to receive orders information from all countries of asia.
java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Receiver asia_orders orders.asia.*

Open new terminal and run Receiver to receive orders information from all countries.
java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Receiver all_orders orders.#

Run Producer program you will get orders of India to india_orders consumer, orders of china to china_orders consumer, order of India, China, Bangladesh to asia_orders consumer, order of India, China, Bangladesh, UK to all_orders consumer.

Use following command to run producer program.

java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Producer

$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Receiver india_orders orders.asia.india
Waiting for tasks
 [x] India orders 1M$


$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Receiver china_orders orders.asia.china
Waiting for tasks
 [x] China orders 2M$


$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Receiver asia_orders orders.asia.*
Waiting for tasks
 [x] India orders 1M$
 [x] China orders 2M$
 [x] Asia orders 0.3M$


$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Receiver all_orders orders.#
Waiting for tasks
 [x] India orders 1M$
 [x] China orders 2M$
 [x] Asia orders 0.3M$
 [x] UK orders 10M$




Prevoius                                                 Next                                                 Home

No comments:

Post a Comment