Saturday 19 September 2015

RabbitMQ: Receiving messages selectively

In previous post, I explained how to broadcast messages to all queues. In this post I am going to explain how to send messages to queues selectively. We can achieve this by using exchange of type direct.

Direct exchange routes messages with a routing key equal to the routing key declared by the binding queue.

Suppose I am maintaining 3 kind of log files.
a.   Information log
b.   Warning log
c.    Error log

When publisher sends message to exchange, If message is of type INFO, it should go to information log, if message of type WARN, it should go to warning log, if message is of type ERROR, it should go to error log.

Following figure depicts this.
Step 1: Declare exchange of type direct.
exchangeDeclare(String exchange, String type) throws IOException;

Ex:
channel.exchangeDeclare("log_exchanger", "direct");

Step 2: Bind queues using queueBind method.
queueBind(String queue, String exchange, String routingKey) throws IOException;

Ex:
channel.queueBind(INFO_QUEUE, "log_exchanger", "INFO");
channel.queueBind(WARN_QUEUE, "log_exchanger", "WARN");
channel.queueBind(ERROR_QUEUE, "log_exchanger", "ERROR");

Step 3: Publish message by specifying routing key.
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)

Ex:
channel.basicPublish("log_exchanger", "INFO", null,message.getBytes());

Step 4: At Consumer end, we need to bind queues.
channel.queueBind(queueName, "log_exchanger", severity);


Following is the Producer program.
import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {

 private final static String INFO_QUEUE = "info_queue";
 private final static String ERROR_QUEUE = "error_queue";
 private final static String WARN_QUEUE = "warn_queue";

 private static final String HOST_NAME = "127.0.0.1";

 /* get connection instance to connect to RabbitMQ broker */
 private static Connection getConnection() throws IOException,
   TimeoutException {
  /* Get factory instance to open connection to RabbitMQ broker */
  ConnectionFactory factory = new ConnectionFactory();

  factory.setHost(HOST_NAME);

  /* Create new broker connection */
  Connection connection = factory.newConnection();

  return connection;
 }

 public static void main(String[] argv) throws Exception {
  Connection connection = getConnection();

  Channel channel = connection.createChannel();

  /* Create exchanger */
  channel.exchangeDeclare("log_exchanger", "direct");
  
  channel.queueDeclare(INFO_QUEUE, false, false, false, null);
  channel.queueDeclare(WARN_QUEUE, false, false, false, null);
  channel.queueDeclare(ERROR_QUEUE, false, false, false, null);

  /* Bind queues to this exchanger */
  channel.queueBind(INFO_QUEUE, "log_exchanger", "INFO");
  channel.queueBind(WARN_QUEUE, "log_exchanger", "WARN");
  channel.queueBind(ERROR_QUEUE, "log_exchanger", "ERROR");

  for (int i = 1; i < 10; i++) {
   String infoMessage = "InforMation message " + i;
   String warnMessage = "Warning message " + i;
   String errorMessage = "Error message " + i;

   /* Public message to exchanger */
   channel.basicPublish("log_exchanger", "INFO", null,
     infoMessage.getBytes());

   channel.basicPublish("log_exchanger", "WARN", null,
     warnMessage.getBytes());

   channel.basicPublish("log_exchanger", "ERROR", null,
     errorMessage.getBytes());
  }

  /* Close channel and connection */
  channel.close();
  connection.close();
 }
}


Following is the Consumer program.

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Receiver {
 private static final String HOST_NAME = "127.0.0.1";

 /* get connection instance to connect to RabbitMQ broker */
 private static Connection getConnection() throws IOException,
   TimeoutException {
  /* Get factory instance to open connection to RabbitMQ broker */
  ConnectionFactory factory = new ConnectionFactory();

  factory.setHost(HOST_NAME);

  /* Create new broker connection */
  Connection connection = factory.newConnection();

  return connection;
 }

 public static void main(String[] args) throws Exception {
  Connection connection = getConnection();
  Channel channel = connection.createChannel();
  String queueName = args[0];
  String severity = args[1];

  /* Create exchanger */
  channel.exchangeDeclare("log_exchanger", "direct");
  
  /* Create queue if not exist */
  channel.queueDeclare(queueName, false, false, false, null);
  channel.queueBind(queueName, "log_exchanger", severity);

  System.out.println("Waiting for tasks");

  Consumer consumer = new DefaultConsumer(channel) {
   @Override
   public void handleDelivery(String consumerTag, Envelope envelope,
     AMQP.BasicProperties properties, byte[] body)
     throws IOException {
    String message = new String(body, "UTF-8");
    System.out.println(" [x] " + message);
   }
  };
  channel.basicConsume(queueName, consumer);

 }
}


Compile Producer and Receiver files.
javac -cp rabbitmq-client.jar Receiver.java Producer.java

Open new terminal and run information messages receiver.
java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Receiver info_queue INFO

Open new terminal and run error messages receiver.
java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Receiver error_queue ERROR

Open new terminal and run warning messages receiver.
java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Receiver warn_queue WARN

It is time to run producer.
java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Producer

Once you ran producer, you will get following outputs, in corresponding receiver windows.

$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Receiver info_queue INFO
Waiting for tasks
 [x] InforMation message 1
 [x] InforMation message 2
 [x] InforMation message 3
 [x] InforMation message 4
 [x] InforMation message 5
 [x] InforMation message 6
 [x] InforMation message 7
 [x] InforMation message 8
 [x] InforMation message 9

$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Receiver error_queue ERROR
Waiting for tasks
 [x] Error message 1
 [x] Error message 2
 [x] Error message 3
 [x] Error message 4
 [x] Error message 5
 [x] Error message 6
 [x] Error message 7
 [x] Error message 8
 [x] Error message 9

$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Receiver warn_queue WARN
Waiting for tasks
 [x] Warning message 1
 [x] Warning message 2
 [x] Warning message 3
 [x] Warning message 4
 [x] Warning message 5
 [x] Warning message 6
 [x] Warning message 7
 [x] Warning message 8
 [x] Warning message 9




Prevoius                                                 Next                                                 Home

No comments:

Post a Comment