Saturday, 19 September 2015

RabbitMQ: One Producer and multiple consumers

In the previous tutorial, I explained a simple Producer-Receiver application. In this post, you are going to learn the single-producer, multiple-consumer model.

To keep things simple, the Producer sends the numbers 1 to 10 to a message queue (assume number 1 represents task1, 2 represents task2 … 10 represents task10). Consumers read messages from the queue and execute the corresponding task.

Assume
task 1 takes 1 second to execute,
task 2 takes 2 seconds,
            ...
            ...
task 10 takes 10 seconds.

In the single-producer, single-consumer model, all tasks are executed by one consumer. In this model, the tasks are shared among several consumers.

If the producer submits 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 to the message queue and we have consumers C1, C2, C3, C4, then the tasks are given to the consumers in round-robin fashion.

C1 consumes task 1
C2 consumes task 2
C3 consumes task 3
C4 consumes task 4
C1 consumes task 5
C2 consumes task 6
C3 consumes task 7
C4 consumes task 8
C1 consumes task 9
C2 consumes task 10
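
The assignment above can be reproduced with a few lines of plain Java. This is only a sketch of the round-robin rule, not of how the broker is implemented; the class name RoundRobinSketch and its methods are made up for illustration. It also adds up how long each consumer stays busy, assuming task n takes n seconds.

```java
import java.util.ArrayList;
import java.util.List;

class RoundRobinSketch {

    /* Assign tasks 1..taskCount to consumers in round-robin order.
       Index 0 of the result holds the tasks of C1, index 1 those of C2, and so on. */
    static List<List<Integer>> assign(int taskCount, int consumerCount) {
        List<List<Integer>> perConsumer = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            perConsumer.add(new ArrayList<>());
        }
        for (int task = 1; task <= taskCount; task++) {
            perConsumer.get((task - 1) % consumerCount).add(task);
        }
        return perConsumer;
    }

    /* Total busy time in seconds, assuming task n takes n seconds */
    static int busySeconds(List<Integer> tasks) {
        int total = 0;
        for (int task : tasks) {
            total += task;
        }
        return total;
    }

    public static void main(String[] args) {
        List<List<Integer>> perConsumer = assign(10, 4);
        for (int c = 0; c < perConsumer.size(); c++) {
            System.out.println("C" + (c + 1) + " gets tasks " + perConsumer.get(c)
                    + ", busy for " + busySeconds(perConsumer.get(c)) + " seconds");
        }
    }
}
```

With 10 tasks and 4 consumers, C1 gets tasks 1, 5, 9 (15 seconds of work) and C2 gets tasks 2, 6, 10 (18 seconds), while C3 and C4 are busy for only 10 and 12 seconds. Round-robin balances the number of tasks per consumer, not the amount of work.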

The following code develops a Producer that generates tasks and submits them to the queue.


Producer.java

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {

 private final static String QUEUE_NAME = "hello";
 private static final String HOST_NAME = "127.0.0.1";

 /* Get connection instance to connect to RabbitMQ broker */
 private static Connection getConnection() throws IOException,
   TimeoutException {
  /* Get factory instance to open connection to RabbitMQ broker */
  ConnectionFactory factory = new ConnectionFactory();

  factory.setHost(HOST_NAME);

  /* Create new broker connection */
  Connection connection = factory.newConnection();

  return connection;
 }

 public static void main(String[] argv) throws Exception {
  Connection connection = getConnection();

  Channel channel = connection.createChannel();

  /* Create queue if it does not exist */
  channel.queueDeclare(QUEUE_NAME, false, false, false, null);

  /* Submit tasks 1 to 10, one message per task */
  for (int i = 1; i <= 10; i++) {
   String message = "" + i;

   /* Publish message to queue through the default exchange */
   channel.basicPublish("", QUEUE_NAME, null,
     message.getBytes("UTF-8"));

   System.out.println("Submitted  task " + i);
  }

  /* Close channel and connection */
  channel.close();
  connection.close();
 }
}


Receiver.java

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Receiver {
 private final static String QUEUE_NAME = "hello";
 private static final String HOST_NAME = "127.0.0.1";

 /* get connection instance to connect to RabbitMQ broker */
 private static Connection getConnection() throws IOException,
   TimeoutException {
  /* Get factory instance to open connection to RabbitMQ broker */
  ConnectionFactory factory = new ConnectionFactory();

  factory.setHost(HOST_NAME);

  /* Create new broker connection */
  Connection connection = factory.newConnection();

  return connection;
 }

 public static void main(String[] argv) throws Exception {
  Connection connection = getConnection();
  Channel channel = connection.createChannel();

  /* Create queue if it does not exist */
  channel.queueDeclare(QUEUE_NAME, false, false, false, null);

  System.out.println("Waiting for tasks");

  Consumer consumer = new DefaultConsumer(channel) {
   @Override
   public void handleDelivery(String consumerTag, Envelope envelope,
     AMQP.BasicProperties properties, byte[] body)
     throws IOException {
    String message = new String(body, "UTF-8");
    System.out.println(" [x] Received task " + message);
    try {
     Thread.sleep(Integer.parseInt(message) * 1000);
    } catch (NumberFormatException | InterruptedException e) {
     e.printStackTrace();
    }
   }
  };
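  /* Start consuming; the second argument (autoAck = true) means every
     message counts as acknowledged as soon as it is delivered */
  channel.basicConsume(QUEUE_NAME, true, consumer);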

 }
}


Compile the Producer and Receiver Java files.
javac -cp rabbitmq-client.jar Receiver.java Producer.java

Open 4 terminals and run a consumer process in each one using the following command.
java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Receiver

Open a new terminal and run the Producer process.
$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Producer
Submitted  task 1
Submitted  task 2
Submitted  task 3
Submitted  task 4
Submitted  task 5
Submitted  task 6
Submitted  task 7
Submitted  task 8
Submitted  task 9
Submitted  task 10

On terminal1 (where receiver1 is running), you will get the following output.
$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Receiver
Waiting for tasks
 [x] Received task 1
 [x] Received task 5
 [x] Received task 9

On terminal2 (where receiver2 is running), you will get the following output.
$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Receiver
Waiting for tasks
 [x] Received task 2
 [x] Received task 6
 [x] Received task 10

On terminal3 (where receiver3 is running), you will get the following output.
$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Receiver
Waiting for tasks
 [x] Received task 3
 [x] Received task 7

On terminal4 (where receiver4 is running), you will get the following output.
$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Receiver
Waiting for tasks
 [x] Received task 4
 [x] Received task 8

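Note:
1. The second argument of basicConsume is the autoAck flag. Passing true, as Receiver.java does, makes every message count as acknowledged as soon as it is delivered to the consumer.
channel.basicConsume(QUEUE_NAME, true, consumer);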
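
To see why the acknowledgement mode matters, here is a toy in-memory model written in plain Java. It does not use the RabbitMQ client at all; ToyQueue, AckModeSketch and their methods are hypothetical names made up for this sketch. It only imitates one behaviour: with automatic acknowledgement a delivered message is forgotten immediately, while with manual acknowledgement it is kept until the consumer confirms it and is put back on the queue if the consumer dies first.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class AckModeSketch {

    /* Toy in-memory model of a queue; this is NOT the RabbitMQ client API */
    static class ToyQueue {
        private final Deque<String> ready = new ArrayDeque<>();
        private final List<String> unacked = new ArrayList<>();
        private final boolean autoAck;

        ToyQueue(boolean autoAck) {
            this.autoAck = autoAck;
        }

        void publish(String message) {
            ready.addLast(message);
        }

        /* Hand the next message to a consumer (null if the queue is empty) */
        String deliver() {
            String message = ready.pollFirst();
            if (message != null && !autoAck) {
                unacked.add(message); // kept until the consumer acknowledges it
            }
            return message;
        }

        /* Consumer confirms that it finished the task */
        void ack(String message) {
            unacked.remove(message);
        }

        /* Consumer died: put its unacknowledged messages back on the queue */
        void consumerCrashed() {
            for (int i = unacked.size() - 1; i >= 0; i--) {
                ready.addFirst(unacked.get(i));
            }
            unacked.clear();
        }

        int readyCount() {
            return ready.size();
        }
    }

    /* Deliver one task, crash before finishing it, report how many tasks are left */
    static int remainingAfterCrash(boolean autoAck) {
        ToyQueue queue = new ToyQueue(autoAck);
        queue.publish("7");
        queue.deliver();         // consumer starts the 7-second task
        queue.consumerCrashed(); // and dies before finishing it
        return queue.readyCount();
    }

    public static void main(String[] args) {
        System.out.println("autoAck=true  -> tasks left: " + remainingAfterCrash(true));
        System.out.println("autoAck=false -> tasks left: " + remainingAfterCrash(false));
    }
}
```

In the model, the task is lost when autoAck is true and survives when it is false. With the real client, the manual variant would pass false as the second argument of basicConsume and call channel.basicAck(envelope.getDeliveryTag(), false) at the end of handleDelivery.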




1 comment:

  1. thanks for sharing

    java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:amqp-client-5.6.0.jar:slf4j-simple-1.7.26.jar:slf4j-api-1.7.26.jar Receiver

    for running
