In the previous tutorial, I explained how to deliver messages using a direct exchange. In this
post, I am going to explain how to use a topic exchange.
A topic exchange is similar to a direct exchange. The only difference is that the routing key is a
list of words delimited by dots, and the key used to bind a queue may contain wildcards. The words
can be anything, for example orders.asia.india, orders.asia.china, orders.asia.*, orders.#
* and # are special symbols with special meanings.
‘*’ matches exactly one word.
‘#’ matches zero or more words.
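To make the two wildcards concrete, here is a minimal sketch of topic matching in plain Java. It is not RabbitMQ's own implementation (the broker does the matching for you), and the class name TopicMatchSketch and its matches method are hypothetical names chosen only for illustration.

```java
public class TopicMatchSketch {

    /* Returns true if the routing key matches the binding pattern. */
    public static boolean matches(String pattern, String routingKey) {
        return matches(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    /* Compare word by word; p holds the pattern words, k the routing key words. */
    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            /* Pattern used up: it is a match only if the key is used up too */
            return j == k.length;
        }
        if (p[i].equals("#")) {
            /* '#' absorbs zero words, or one word and then tries again */
            return matches(p, i + 1, k, j)
                    || (j < k.length && matches(p, i, k, j + 1));
        }
        if (j == k.length) {
            /* Pattern still expects a word but the key has none left */
            return false;
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            /* '*' or an identical word consumes exactly one word */
            return matches(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("orders.asia.*", "orders.asia.india"));       // true
        System.out.println(matches("orders.asia.*", "orders.asia"));             // false
        System.out.println(matches("orders.asia.*", "orders.asia.india.delhi")); // false
        System.out.println(matches("orders.#", "orders"));                       // true
        System.out.println(matches("orders.#", "orders.europe.uk"));             // true
    }
}
```

The second and third calls show that ‘*’ stands for one word, no more and no less, while the fourth shows that ‘#’ can stand for no words at all.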
Suppose you are working for a global e-commerce company and you want to log orders
country-wise.
Routing key | Meaning
orders.asia.india | Matches all order messages specific to India
orders.asia.china | Matches all order messages specific to China
orders.asia.* | Matches all order messages specific to Asia
orders.# | Matches all order messages for this company
Following is the Producer program.

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
    private final static String INDIA_QUEUE = "india_orders";
    private final static String CHINA_QUEUE = "china_orders";
    private final static String ASIA_QUEUE = "asia_orders";
    private final static String ALL_QUEUE = "all_orders";

    private static final String HOST_NAME = "127.0.0.1";

    /* get connection instance to connect to RabbitMQ broker */
    private static Connection getConnection() throws IOException, TimeoutException {
        /* Get factory instance to open connection to RabbitMQ broker */
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST_NAME);

        /* Create new broker connection */
        Connection connection = factory.newConnection();
        return connection;
    }

    public static void main(String[] argv) throws Exception {
        Connection connection = getConnection();
        Channel channel = connection.createChannel();

        /* Create exchange */
        channel.exchangeDeclare("orders_exchange", "topic");

        /* Create queues (non-durable, non-exclusive, no auto-delete) */
        channel.queueDeclare(INDIA_QUEUE, false, false, false, null);
        channel.queueDeclare(CHINA_QUEUE, false, false, false, null);
        channel.queueDeclare(ASIA_QUEUE, false, false, false, null);
        channel.queueDeclare(ALL_QUEUE, false, false, false, null);

        /* Bind queues to this exchange */
        channel.queueBind(INDIA_QUEUE, "orders_exchange", "orders.asia.india");
        channel.queueBind(CHINA_QUEUE, "orders_exchange", "orders.asia.china");
        channel.queueBind(ASIA_QUEUE, "orders_exchange", "orders.asia.*");
        channel.queueBind(ALL_QUEUE, "orders_exchange", "orders.#");

        String messageIndia = "India orders 1M$";
        String messageChina = "China orders 2M$";
        String messageBangladesh = "Asia orders 0.3M$";
        String messageUK = "UK orders 10M$";

        /* Publish orders, encoded as UTF-8 to match the Receiver */
        channel.basicPublish("orders_exchange", "orders.asia.india", null,
                messageIndia.getBytes(StandardCharsets.UTF_8));
        channel.basicPublish("orders_exchange", "orders.asia.china", null,
                messageChina.getBytes(StandardCharsets.UTF_8));
        channel.basicPublish("orders_exchange", "orders.asia.bangladesh", null,
                messageBangladesh.getBytes(StandardCharsets.UTF_8));
        channel.basicPublish("orders_exchange", "orders.europe.uk", null,
                messageUK.getBytes(StandardCharsets.UTF_8));

        /* Close channel and connection */
        channel.close();
        connection.close();
    }
}
Following is the Receiver (consumer) program.

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Receiver {
    private static final String HOST_NAME = "127.0.0.1";

    /* get connection instance to connect to RabbitMQ broker */
    private static Connection getConnection() throws IOException, TimeoutException {
        /* Get factory instance to open connection to RabbitMQ broker */
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST_NAME);

        /* Create new broker connection */
        Connection connection = factory.newConnection();
        return connection;
    }

    public static void main(String[] args) throws Exception {
        Connection connection = getConnection();
        Channel channel = connection.createChannel();

        /* First argument is the queue name, second is the key to bind it with */
        String queueName = args[0];
        String routingKey = args[1];

        /* Create exchange */
        channel.exchangeDeclare("orders_exchange", "topic");

        /* Create queue if it does not exist */
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, "orders_exchange", routingKey);

        System.out.println("Waiting for tasks");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] " + message);
            }
        };

        /* autoAck = true, because this consumer never acknowledges messages itself */
        channel.basicConsume(queueName, true, consumer);
    }
}
Compile the Producer and Receiver programs.
javac -cp rabbitmq-client.jar Receiver.java Producer.java
Open a new terminal and run Receiver to receive order information from India.
java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Receiver india_orders orders.asia.india
Open a new terminal and run Receiver to receive order information from China.
java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Receiver china_orders orders.asia.china
Open a new terminal and run Receiver to receive order information from all countries of
Asia. The key is quoted so that the shell passes the wildcard to the program unchanged.
java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Receiver asia_orders 'orders.asia.*'
Open a new terminal and run Receiver to receive order information from all countries.
java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Receiver all_orders 'orders.#'
Now run the Producer program. The india_orders consumer receives the orders of India, the
china_orders consumer receives the orders of China, the asia_orders consumer receives the orders
of India, China, and Bangladesh, and the all_orders consumer receives the orders of India, China,
Bangladesh, and the UK.
Use the following command to run the Producer program.
java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Producer
$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Receiver india_orders orders.asia.india
Waiting for tasks
 [x] India orders 1M$

$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Receiver china_orders orders.asia.china
Waiting for tasks
 [x] China orders 2M$

$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Receiver asia_orders 'orders.asia.*'
Waiting for tasks
 [x] India orders 1M$
 [x] China orders 2M$
 [x] Asia orders 0.3M$

$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Receiver all_orders 'orders.#'
Waiting for tasks
 [x] India orders 1M$
 [x] China orders 2M$
 [x] Asia orders 0.3M$
 [x] UK orders 10M$
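
The per-queue results above can be cross-checked without a broker. The following is a rough in-memory model of the four bindings and four published messages from this post. It only imitates how a topic exchange copies a message to every queue whose key matches; the class TopicRoutingSketch and its methods are hypothetical names, not part of the RabbitMQ client.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TopicRoutingSketch {

    /* Word-by-word topic match: '*' is exactly one word, '#' is zero or more. */
    static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length;
        }
        if (p[i].equals("#")) {
            return matches(p, i + 1, k, j)
                    || (j < k.length && matches(p, i, k, j + 1));
        }
        return j < k.length
                && (p[i].equals("*") || p[i].equals(k[j]))
                && matches(p, i + 1, k, j + 1);
    }

    /* Returns, per queue, the messages it would receive under the post's bindings. */
    public static Map<String, List<String>> route() {
        /* Queue name -> key it is bound with (same values as in Producer) */
        Map<String, String> bindings = new LinkedHashMap<>();
        bindings.put("india_orders", "orders.asia.india");
        bindings.put("china_orders", "orders.asia.china");
        bindings.put("asia_orders", "orders.asia.*");
        bindings.put("all_orders", "orders.#");

        /* Routing key and message body, in the order Producer publishes them */
        String[][] published = {
            {"orders.asia.india", "India orders 1M$"},
            {"orders.asia.china", "China orders 2M$"},
            {"orders.asia.bangladesh", "Asia orders 0.3M$"},
            {"orders.europe.uk", "UK orders 10M$"},
        };

        Map<String, List<String>> queues = new LinkedHashMap<>();
        for (String queue : bindings.keySet()) {
            queues.put(queue, new ArrayList<>());
        }

        /* A message is copied to every queue whose key matches its routing key */
        for (String[] message : published) {
            String[] keyWords = message[0].split("\\.");
            for (Map.Entry<String, String> binding : bindings.entrySet()) {
                if (matches(binding.getValue().split("\\."), 0, keyWords, 0)) {
                    queues.get(binding.getKey()).add(message[1]);
                }
            }
        }
        return queues;
    }

    public static void main(String[] args) {
        route().forEach((queue, messages) -> System.out.println(queue + " <- " + messages));
    }
}
```

Running it lists one message for india_orders and china_orders, three for asia_orders, and four for all_orders, which agrees with the Receiver output shown above.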