In previous post, I explained how to broadcast messages to all queues. In this post I am
going to explain how to send messages to queues selectively. We can achieve
this by using exchange of type direct.
Direct
exchange routes messages with a routing key equal to the routing key declared
by the binding queue.
Suppose I am
maintaining 3 kind of log files.
a.
Information
log
b.
Warning
log
c.
Error
log
When
publisher sends message to exchange, If message is of type INFO, it should go
to information log, if message of type WARN, it should go to warning log, if message
is of type ERROR, it should go to error log.
Following
figure depicts this.
Step 1: Declare exchange of type direct.
exchangeDeclare(String
exchange, String type) throws IOException;
Ex:
channel.exchangeDeclare("log_exchanger",
"direct");
Step 2: Bind queues using queueBind method.
queueBind(String
queue, String exchange, String routingKey) throws IOException;
Ex:
channel.queueBind(INFO_QUEUE,
"log_exchanger", "INFO");
channel.queueBind(WARN_QUEUE,
"log_exchanger", "WARN");
channel.queueBind(ERROR_QUEUE,
"log_exchanger", "ERROR");
Step 3: Publish message by specifying routing key.
basicPublish(String
exchange, String routingKey, BasicProperties props, byte[] body)
Ex:
channel.basicPublish("log_exchanger",
"INFO", null,message.getBytes());
Step 4: At Consumer end, we need to bind queues.
channel.queueBind(queueName,
"log_exchanger", severity);
Following is the Producer program.
import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer { private final static String INFO_QUEUE = "info_queue"; private final static String ERROR_QUEUE = "error_queue"; private final static String WARN_QUEUE = "warn_queue"; private static final String HOST_NAME = "127.0.0.1"; /* get connection instance to connect to RabbitMQ broker */ private static Connection getConnection() throws IOException, TimeoutException { /* Get factory instance to open connection to RabbitMQ broker */ ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST_NAME); /* Create new broker connection */ Connection connection = factory.newConnection(); return connection; } public static void main(String[] argv) throws Exception { Connection connection = getConnection(); Channel channel = connection.createChannel(); /* Create exchanger */ channel.exchangeDeclare("log_exchanger", "direct"); channel.queueDeclare(INFO_QUEUE, false, false, false, null); channel.queueDeclare(WARN_QUEUE, false, false, false, null); channel.queueDeclare(ERROR_QUEUE, false, false, false, null); /* Bind queues to this exchanger */ channel.queueBind(INFO_QUEUE, "log_exchanger", "INFO"); channel.queueBind(WARN_QUEUE, "log_exchanger", "WARN"); channel.queueBind(ERROR_QUEUE, "log_exchanger", "ERROR"); for (int i = 1; i < 10; i++) { String infoMessage = "InforMation message " + i; String warnMessage = "Warning message " + i; String errorMessage = "Error message " + i; /* Public message to exchanger */ channel.basicPublish("log_exchanger", "INFO", null, infoMessage.getBytes()); channel.basicPublish("log_exchanger", "WARN", null, warnMessage.getBytes()); channel.basicPublish("log_exchanger", "ERROR", null, errorMessage.getBytes()); } /* Close channel and connection */ channel.close(); connection.close(); } }
Following is the Consumer program.
import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class Receiver { private static final String HOST_NAME = "127.0.0.1"; /* get connection instance to connect to RabbitMQ broker */ private static Connection getConnection() throws IOException, TimeoutException { /* Get factory instance to open connection to RabbitMQ broker */ ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST_NAME); /* Create new broker connection */ Connection connection = factory.newConnection(); return connection; } public static void main(String[] args) throws Exception { Connection connection = getConnection(); Channel channel = connection.createChannel(); String queueName = args[0]; String severity = args[1]; /* Create exchanger */ channel.exchangeDeclare("log_exchanger", "direct"); /* Create queue if not exist */ channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, "log_exchanger", severity); System.out.println("Waiting for tasks"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] " + message); } }; channel.basicConsume(queueName, consumer); } }
Compile Producer and Receiver files.
javac -cp
rabbitmq-client.jar Receiver.java Producer.java
Open new
terminal and run information messages receiver.
java -cp
.:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Receiver
info_queue INFO
Open new
terminal and run error messages receiver.
java -cp
.:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Receiver
error_queue ERROR
Open new
terminal and run warning messages receiver.
java -cp
.:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Receiver warn_queue
WARN
It is time
to run producer.
java -cp
.:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Producer
Once you ran
producer, you will get following outputs, in corresponding receiver windows.
$ java -cp
.:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Receiver
info_queue INFO
Waiting for
tasks
[x] InforMation message 1
[x] InforMation message 2
[x] InforMation message 3
[x] InforMation message 4
[x] InforMation message 5
[x] InforMation message 6
[x] InforMation message 7
[x] InforMation message 8
[x] InforMation message 9
$ java -cp
.:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Receiver
error_queue ERROR
Waiting for
tasks
[x] Error message 1
[x] Error message 2
[x] Error message 3
[x] Error message 4
[x] Error message 5
[x] Error message 6
[x] Error message 7
[x] Error message 8
[x] Error message 9
$ java -cp
.:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Receiver
warn_queue WARN
Waiting for
tasks
[x] Warning message 1
[x] Warning message 2
[x] Warning message 3
[x] Warning message 4
[x] Warning message 5
[x] Warning message 6
[x] Warning message 7
[x] Warning message 8
[x] Warning message 9
No comments:
Post a Comment