Saturday, 19 September 2015

RabbitMQ: Deliver same message to multiple consumers

In this post, I am going to explain how to send same message to multiple consumers. In this post, you are going to learn new term called ‘exchanges’.

Exchange
In previous posts, we are sending messages to the queue directly. In basic model of RabbitMQ, producer sends messages to exchanges. Exchange is responsible for sending messages to one/more queues that are registered to this exchange.

The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules are defined by exchange type.

Currently RabbitMQ support following types of exchanges.
a.   Direct
b.   Fanout
c.    Topic and
d.   Headers.

Direct Exchange
It routes messages with a routing key equal to the routing key declared by the binding queue.
        
As shown in above figure, if producer sent a message with routing key advertisement, to direct exchange it sends to advertisement queue. Every queue is automatically bound to a default exchange using a routing key equal to the queue name. 

Fanout Exchange
Fanout exchange routes messages to all the registered queues.
Topic Exchange
The Topic exchange type routes messages to queues whose routing key matches to all, or a portion of a routing key.
Header Exchanges
The Headers exchange type routes messages based upon a matching of message headers to the expected headers specified by the binding queue.

Deliver same message to multiple consumers
We can deliver same message to multiple consumers by using fanout exchange.

a. Declare fanout exchange
You can declare an exchange by using following statement.
channel.exchangeDeclare("simple_fanout_exchange", "fanout");

First parameter is the exchange name and second parameter is the type of exchange. Fanout exchange simply broadcast the messages to all the queues it knows.

b. Bind queues to this exchange
channel.queueBind(queueName, "simple_fanout_exchange ", "");

c. Publish messages to exchange
After creating an exchange, we need to publish messages to exchange.

channel.basicPublish( "simple_fanout_exchange ", "", null, message.getBytes());

Above statement publish messages to exchange.


Following Producer program, creates an exchanger of type fanout and register 3 queues (queue1, queue2, queue3) to this exchanger. Whenever producer sends message to this exchanger, it is delivered to all the queues.
import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {

 private final static String QUEUE_NAME_1 = "queue1";
 private final static String QUEUE_NAME_2 = "queue2";
 private final static String QUEUE_NAME_3 = "queue3";

 private static final String HOST_NAME = "127.0.0.1";

 /* get connection instance to connect to RabbitMQ broker */
 private static Connection getConnection() throws IOException,
   TimeoutException {
  /* Get factory instance to open connection to RabbitMQ broker */
  ConnectionFactory factory = new ConnectionFactory();

  factory.setHost(HOST_NAME);

  /* Create new broker connection */
  Connection connection = factory.newConnection();

  return connection;
 }

 public static void main(String[] argv) throws Exception {
  Connection connection = getConnection();

  Channel channel = connection.createChannel();

  /* Create exchanger */
  channel.exchangeDeclare("simple_fanout_exchange", "fanout");

  channel.queueBind(QUEUE_NAME_1, "simple_fanout_exchange", "");
  channel.queueBind(QUEUE_NAME_2, "simple_fanout_exchange", "");
  channel.queueBind(QUEUE_NAME_3, "simple_fanout_exchange", "");

  String message = "Hello world";

  /* Public message to exchanger */
  channel.basicPublish("simple_fanout_exchange", "", null,
    message.getBytes());

  /* Close channel and connection */
  channel.close();
  connection.close();
 }
}


Following is a consumer program, which takes queue name as command line argument and waiting on that queue.
import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Receiver {
 private static final String HOST_NAME = "127.0.0.1";

 /* get connection instance to connect to RabbitMQ broker */
 private static Connection getConnection() throws IOException,
   TimeoutException {
  /* Get factory instance to open connection to RabbitMQ broker */
  ConnectionFactory factory = new ConnectionFactory();

  factory.setHost(HOST_NAME);

  /* Create new broker connection */
  Connection connection = factory.newConnection();

  return connection;
 }

 public static void main(String[] args) throws Exception {
  Connection connection = getConnection();
  Channel channel = connection.createChannel();
  String queueName = args[0];

  /* Create queue if not exist */
  channel.queueDeclare(queueName, false, false, false, null);

  System.out.println("Waiting for tasks");

  Consumer consumer = new DefaultConsumer(channel) {
   @Override
   public void handleDelivery(String consumerTag, Envelope envelope,
     AMQP.BasicProperties properties, byte[] body)
     throws IOException {
    String message = new String(body, "UTF-8");
    System.out.println(" [x] Received task " + message);
   }
  };
  channel.basicConsume(queueName, consumer);
 

 }
}


Compile Producer and Receiver programs.
javac -cp rabbitmq-client.jar Receiver.java Producer.java

Open new terminal and runs Receiver program by passing queue1 as argument.

java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Receiver queue1

Open new terminal and runs Receiver program by passing queue2 as argument.

java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Receiver queue2
  
Open new terminal and runs Receiver program by passing queue3 as argument.

java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Receiver queue3

Run Producer program, you will get HelloWorld message on all consumer terminals.

java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Producer



Prevoius                                                 Next                                                 Home

No comments:

Post a Comment