In this post, I explain how to send the same message to multiple consumers. Along the way, you will learn a new term: exchanges.
Exchange
In the previous posts, we sent messages to a queue directly. In the basic model of RabbitMQ, however, a producer sends messages to an exchange. The exchange is responsible for routing each message to one or more of the queues that are bound to it.
The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it be discarded? The rules are defined by the exchange type.
Currently, RabbitMQ supports the following types of exchanges.
a. Direct
b. Fanout
c. Topic
d. Headers
Direct Exchange
A direct exchange routes a message to the queues whose binding key is exactly equal to the routing key of the message.
As shown in the above figure, if a producer sends a message with the routing key advertisement to a direct exchange, the exchange delivers it to the advertisement queue. Every queue is automatically bound to the default exchange with a routing key equal to the queue name.
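The direct routing rule can be sketched with a small in-memory model. This is only an illustration of the rule and not RabbitMQ client code; the class name DirectRoutingSketch, its methods, and the billing/invoice binding are hypothetical names chosen for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of direct-exchange routing (illustration only). */
public class DirectRoutingSketch {
    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    /** Registers a queue under the given binding key. */
    public void bind(String queue, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queue);
    }

    /** Returns the queues that would receive a message with this routing key. */
    public List<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.<String>emptyList());
    }

    public static void main(String[] args) {
        DirectRoutingSketch exchange = new DirectRoutingSketch();
        exchange.bind("advertisement", "advertisement");
        exchange.bind("billing", "invoice"); // hypothetical second binding

        System.out.println(exchange.route("advertisement")); // [advertisement]
        System.out.println(exchange.route("unknown"));       // []
    }
}
```

A message whose routing key matches no binding is routed to no queue in this model, which corresponds to the "discarded" case described earlier.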
Fanout Exchange
A fanout exchange routes messages to all the queues bound to it, ignoring the routing key.
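To make the broadcast behaviour concrete, here is a minimal in-memory sketch. It does not talk to a broker; FanoutRoutingSketch and its methods are hypothetical names used only to model the rule that every bound queue gets its own copy of each message.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of fanout-exchange routing (illustration only). */
public class FanoutRoutingSketch {
    // queue name -> messages currently held by that queue
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    /** Binds a queue to the exchange; a fanout binding needs no key. */
    public void bind(String queue) {
        queues.putIfAbsent(queue, new ArrayList<>());
    }

    /** Copies the message into every bound queue; the routing key is ignored. */
    public void publish(String routingKey, String message) {
        for (List<String> queue : queues.values()) {
            queue.add(message);
        }
    }

    /** Returns the messages held by a queue, or an empty list if it is not bound. */
    public List<String> messagesIn(String queue) {
        return queues.getOrDefault(queue, Collections.<String>emptyList());
    }

    public static void main(String[] args) {
        FanoutRoutingSketch exchange = new FanoutRoutingSketch();
        exchange.bind("queue1");
        exchange.bind("queue2");
        exchange.bind("queue3");

        exchange.publish("any.key", "Hello world");

        for (String name : new String[] {"queue1", "queue2", "queue3"}) {
            // Prints, for example: queue1 -> [Hello world]
            System.out.println(name + " -> " + exchange.messagesIn(name));
        }
    }
}
```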
Topic Exchange
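The topic exchange type routes messages to queues whose binding pattern matches the routing key of the message. A routing key is a list of words separated by dots; in a binding pattern, * matches exactly one word and # matches zero or more words.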
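Topic matching is easier to see in code. The sketch below models the wildcard rules of topic bindings, where * stands for exactly one dot-separated word and # stands for zero or more words. It is a simplified model, not the broker's implementation; the class name TopicMatchSketch and the sample keys are assumptions made for the example.

```java
/** Simplified model of topic-exchange pattern matching (illustration only). */
public class TopicMatchSketch {

    /** Returns true if the routing key matches the binding pattern. */
    public static boolean matches(String pattern, String routingKey) {
        // The -1 limit keeps trailing empty words, so "a." is treated as two words.
        return match(pattern.split("\\.", -1), 0, routingKey.split("\\.", -1), 0);
    }

    private static boolean match(String[] pattern, int p, String[] key, int k) {
        if (p == pattern.length) {
            // Pattern exhausted: it is a match only if the key is exhausted too.
            return k == key.length;
        }
        if (pattern[p].equals("#")) {
            // '#' absorbs zero or more words: try every possible number of words.
            for (int next = k; next <= key.length; next++) {
                if (match(pattern, p + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (k == key.length) {
            return false; // pattern still expects a word, but the key has none left
        }
        if (pattern[p].equals("*") || pattern[p].equals(key[k])) {
            return match(pattern, p + 1, key, k + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("*.orange.*", "quick.orange.rabbit"));      // true
        System.out.println(matches("*.orange.*", "quick.orange.male.rabbit")); // false
        System.out.println(matches("lazy.#", "lazy.pink.rabbit"));             // true
        System.out.println(matches("lazy.#", "lazy"));                         // true
    }
}
```

Note that a pattern consisting only of # matches every routing key, so a queue bound that way behaves as if it were bound to a fanout exchange.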
Headers Exchange
The headers exchange type routes messages by matching the message headers against the expected headers specified in the queue binding.
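As a rough illustration of header matching, the sketch below compares the binding arguments with the message headers. It assumes the x-match binding argument, where all (the default) requires every listed header to match and any requires at least one. This is a simplified model rather than the broker's exact algorithm, and HeadersMatchSketch and the sample header names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Simplified model of headers-exchange matching (illustration only). */
public class HeadersMatchSketch {

    /** Returns true if the message headers satisfy the binding arguments. */
    public static boolean matches(Map<String, Object> bindingArgs,
                                  Map<String, Object> messageHeaders) {
        boolean matchAny = "any".equals(bindingArgs.get("x-match"));
        for (Map.Entry<String, Object> expected : bindingArgs.entrySet()) {
            if (expected.getKey().equals("x-match")) {
                continue; // the mode flag itself is not a header to compare
            }
            boolean equal = messageHeaders.containsKey(expected.getKey())
                    && Objects.equals(expected.getValue(),
                                      messageHeaders.get(expected.getKey()));
            if (matchAny && equal) {
                return true;  // "any": one matching header is enough
            }
            if (!matchAny && !equal) {
                return false; // "all": one mismatch rejects the message
            }
        }
        // "all" with no mismatch is a match; "any" with no hit is not.
        return !matchAny;
    }

    public static void main(String[] args) {
        Map<String, Object> binding = new HashMap<>();
        binding.put("x-match", "all");
        binding.put("format", "pdf");
        binding.put("type", "report");

        Map<String, Object> headers = new HashMap<>();
        headers.put("format", "pdf");
        headers.put("type", "log");

        System.out.println(matches(binding, headers)); // false: "type" differs

        binding.put("x-match", "any");
        System.out.println(matches(binding, headers)); // true: "format" matches
    }
}
```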
Deliver same message to multiple consumers
We can deliver the same message to multiple consumers by using a fanout exchange.
a. Declare fanout exchange
You can declare an exchange by using the following statement.
channel.exchangeDeclare("simple_fanout_exchange", "fanout");
The first parameter is the exchange name and the second parameter is the exchange type. A fanout exchange simply broadcasts messages to all the queues it knows.
b. Bind queues to this exchange
channel.queueBind(queueName, "simple_fanout_exchange", "");
The third parameter is the routing key, which a fanout exchange ignores, so it is left empty.
c. Publish messages to exchange
After creating the exchange and binding the queues, we need to publish messages to the exchange.
channel.basicPublish("simple_fanout_exchange", "", null, message.getBytes());
The above statement publishes the message to the exchange. The second parameter is the routing key, which is left empty here.
The following Producer program creates an exchange of type fanout and binds three queues (queue1, queue2, queue3) to it. Whenever the producer sends a message to this exchange, it is delivered to all three queues.
import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
    private static final String EXCHANGE_NAME = "simple_fanout_exchange";
    private static final String QUEUE_NAME_1 = "queue1";
    private static final String QUEUE_NAME_2 = "queue2";
    private static final String QUEUE_NAME_3 = "queue3";
    private static final String HOST_NAME = "127.0.0.1";

    /* Get connection instance to connect to RabbitMQ broker */
    private static Connection getConnection() throws IOException, TimeoutException {
        /* Get factory instance to open connection to RabbitMQ broker */
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST_NAME);

        /* Create new broker connection */
        return factory.newConnection();
    }

    public static void main(String[] argv) throws Exception {
        Connection connection = getConnection();
        Channel channel = connection.createChannel();

        /* Create the fanout exchange */
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        for (String queueName : new String[] { QUEUE_NAME_1, QUEUE_NAME_2, QUEUE_NAME_3 }) {
            /* Create the queue if it does not exist, using the same settings as
               the Receiver, so that binding does not fail on a missing queue */
            channel.queueDeclare(queueName, false, false, false, null);

            /* Bind the queue to the exchange; fanout ignores the routing key */
            channel.queueBind(queueName, EXCHANGE_NAME, "");
        }

        String message = "Hello world";

        /* Publish message to the exchange */
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());

        /* Close channel and connection */
        channel.close();
        connection.close();
    }
}
Following is a consumer program, which takes a queue name as a command-line argument and waits for messages on that queue.
import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Receiver {
    private static final String HOST_NAME = "127.0.0.1";

    /* Get connection instance to connect to RabbitMQ broker */
    private static Connection getConnection() throws IOException, TimeoutException {
        /* Get factory instance to open connection to RabbitMQ broker */
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST_NAME);

        /* Create new broker connection */
        return factory.newConnection();
    }

    public static void main(String[] args) throws Exception {
        Connection connection = getConnection();
        Channel channel = connection.createChannel();

        String queueName = args[0];

        /* Create queue if it does not exist */
        channel.queueDeclare(queueName, false, false, false, null);

        System.out.println("Waiting for tasks");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received task " + message);
            }
        };

        /* autoAck = true: the broker treats a message as acknowledged once it is
           delivered, so messages are not left unacknowledged on the queue */
        channel.basicConsume(queueName, true, consumer);
    }
}
Compile the Producer and Receiver programs.
javac -cp rabbitmq-client.jar Receiver.java Producer.java
Open a new terminal and run the Receiver program, passing queue1 as the argument.
java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Receiver queue1
Open a new terminal and run the Receiver program, passing queue2 as the argument.
java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Receiver queue2
Open a new terminal and run the Receiver program, passing queue3 as the argument.
java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Receiver queue3
Run the Producer program. The "Hello world" message appears on all three consumer terminals.
java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Producer