In the previous tutorial, I explained a simple Producer-Receiver application. In this post, you
are going to learn the single producer and multiple consumers model.
For simplicity, the Producer generates the numbers 1 to 10 in order (assume
number 1 represents task1, 2 represents task2, ..., 10 represents task10) and sends
each one to the message queue. Consumers read messages from the queue and execute
the corresponding task.
Assume
task 1 takes 1 second to execute.
task 2 takes 2 seconds.
...
...
task 10 takes 10 seconds.
In the single producer and single consumer model, all tasks are executed by one consumer.
In the multiple consumers model, tasks are shared among the consumers.
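As a baseline for comparison, the time a single consumer needs can be worked out from the durations assumed above. This is only a small sketch in plain Java; the class name SingleConsumerBaseline and the method totalSeconds are made up for illustration and are not part of the RabbitMQ example.

```java
import java.util.stream.IntStream;

// Hypothetical helper, not part of the tutorial's Producer/Receiver code.
class SingleConsumerBaseline {

    /* Total seconds one consumer needs when task n takes n seconds */
    static int totalSeconds(int taskCount) {
        return IntStream.rangeClosed(1, taskCount).sum();
    }

    public static void main(String[] args) {
        /* 1 + 2 + ... + 10 = 55 */
        System.out.println("Single consumer needs " + totalSeconds(10) + " seconds");
    }
}
```

With one consumer the ten tasks run back to back, so the total is the sum of all durations.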
If the producer submits 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 to the message queue and we have
consumers C1, C2, C3, C4, then tasks are given to the consumers in round-robin fashion.
C1 consumes task 1
C2 consumes task 2
C3 consumes task 3
C4 consumes task 4
C1 consumes task 5
C2 consumes task 6
C3 consumes task 7
C4 consumes task 8
C1 consumes task 9
C2 consumes task 10
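The assignment above can be reproduced with a small simulation in plain Java. This is a sketch of the dispatch order only, not of RabbitMQ itself; the class RoundRobinDemo and its methods are hypothetical names, and it assumes all four consumers are already waiting before the first task is submitted.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation; the real dispatching is done by the broker.
class RoundRobinDemo {

    /* Assign tasks 1..taskCount to consumers C1..Cn in round-robin order */
    static Map<String, List<Integer>> assign(int taskCount, int consumerCount) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (int c = 1; c <= consumerCount; c++) {
            assignment.put("C" + c, new ArrayList<>());
        }
        for (int task = 1; task <= taskCount; task++) {
            /* Task 1 goes to C1, task 2 to C2, ... then wrap around */
            int consumerIndex = (task - 1) % consumerCount + 1;
            assignment.get("C" + consumerIndex).add(task);
        }
        return assignment;
    }

    /* Busy time in seconds, assuming task n takes n seconds */
    static int busySeconds(List<Integer> tasks) {
        return tasks.stream().mapToInt(Integer::intValue).sum();
    }

    public static void main(String[] args) {
        assign(10, 4).forEach((consumer, tasks) ->
            System.out.println(consumer + " -> " + tasks + " (" + busySeconds(tasks) + " s)"));
    }
}
```

Under these assumptions C2 is the busiest consumer (tasks 2, 6 and 10, 18 seconds in total), because round-robin hands out tasks by count and does not look at how long each task takes.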
The following code develops a Producer that generates tasks and submits them to the queue.
Producer.java
import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
    private final static String QUEUE_NAME = "hello";
    private static final String HOST_NAME = "127.0.0.1";

    /* Get connection instance to connect to RabbitMQ broker */
    private static Connection getConnection() throws IOException, TimeoutException {
        /* Get factory instance to open connection to RabbitMQ broker */
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST_NAME);

        /* Create new broker connection */
        Connection connection = factory.newConnection();
        return connection;
    }

    public static void main(String[] argv) throws Exception {
        Connection connection = getConnection();
        Channel channel = connection.createChannel();

        /* Create queue if it does not exist */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        /* Submit tasks 1 to 10 in order */
        for (int i = 1; i <= 10; i++) {
            String message = "" + i;

            /* Publish message to queue */
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
            System.out.println("Submitted task " + i);
        }

        /* Close channel and connection */
        channel.close();
        connection.close();
    }
}
Receiver.java
import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Receiver {
    private final static String QUEUE_NAME = "hello";
    private static final String HOST_NAME = "127.0.0.1";

    /* Get connection instance to connect to RabbitMQ broker */
    private static Connection getConnection() throws IOException, TimeoutException {
        /* Get factory instance to open connection to RabbitMQ broker */
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST_NAME);

        /* Create new broker connection */
        Connection connection = factory.newConnection();
        return connection;
    }

    public static void main(String[] argv) throws Exception {
        Connection connection = getConnection();
        Channel channel = connection.createChannel();

        /* Create queue if it does not exist */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println("Waiting for tasks");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received task " + message);

                /* Task n is simulated by sleeping for n seconds */
                try {
                    Thread.sleep(Integer.parseInt(message) * 1000);
                } catch (NumberFormatException | InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}
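The message format shared by Producer and Receiver (a task number sent as UTF-8 text, turned back into a sleep time on the other side) can be tried out without a broker. The sketch below uses only the JDK; the class TaskMessage and its methods encode and sleepMillis are hypothetical names that mirror what the two classes above do inline.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper mirroring the inline logic of Producer and Receiver.
class TaskMessage {

    /* Producer side: task number -> message body */
    static byte[] encode(int taskNumber) {
        return ("" + taskNumber).getBytes(StandardCharsets.UTF_8);
    }

    /* Receiver side: message body -> sleep time, task n takes n seconds */
    static long sleepMillis(byte[] body) {
        String message = new String(body, StandardCharsets.UTF_8);
        return Integer.parseInt(message) * 1000L;
    }

    public static void main(String[] args) {
        byte[] body = encode(3);
        System.out.println("Task 3 sleeps for " + sleepMillis(body) + " ms");
    }
}
```

A body that is not a number makes Integer.parseInt throw NumberFormatException, which is why the Receiver catches that exception around its Thread.sleep call.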
Compile the Producer and Receiver Java files.
javac -cp rabbitmq-client.jar Receiver.java Producer.java
Open 4 terminals and run a consumer process in each one using the following command.
java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Receiver
Open a new terminal and run the Producer process.
$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Producer
Submitted task 1
Submitted task 2
Submitted task 3
Submitted task 4
Submitted task 5
Submitted task 6
Submitted task 7
Submitted task 8
Submitted task 9
Submitted task 10
On terminal 1 (where receiver 1 is running), you will get the following output.
$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Receiver
Waiting for tasks
[x] Received task 1
[x] Received task 5
[x] Received task 9
On terminal 2 (where receiver 2 is running), you will get the following output.
$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Receiver
Waiting for tasks
[x] Received task 2
[x] Received task 6
[x] Received task 10
On terminal 3 (where receiver 3 is running), you will get the following output.
$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Receiver
Waiting for tasks
[x] Received task 3
[x] Received task 7
On terminal 4 (where receiver 4 is running), you will get the following output.
$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Receiver
Waiting for tasks
[x] Received task 4
[x] Received task 8
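Note:
1. The second argument of basicConsume is the autoAck flag. Passing true, as in the
following statement, makes messages count as acknowledged automatically as soon as
they are delivered to the consumer.
channel.basicConsume(QUEUE_NAME, true, consumer);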