Sunday, 5 August 2018

JMS: Message Selectors

Message selectors filter messages. When you create a consumer, you can specify a selection criterion, and that consumer then receives only the messages that match it.

For example, suppose an organization has an order processing queue.

Consumer 'C1' is interested in processing orders of <= $100.

Consumer 'C2' is interested in processing orders of > $100 and < $1000.

Consumer 'C3' is interested in processing orders of >= $1000.

By specifying the selection criterion when it is created, each consumer receives only the messages it is interested in. The JMS provider (such as ActiveMQ) does the filtering and delivers only the matching messages to the consumer. A selector is evaluated against message headers and properties, not against the message body.

Example:
MessageConsumer C3 = session.createConsumer(destination, "orderCost >= 1000");
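The three consumers described above could use selector strings like the ones in the sketch below. This is plain Java rather than JMS code: the class OrderSelectors and the method consumerFor are hypothetical names, and the lambdas are hand-written equivalents of the selector strings, included only to show which consumer each sample order cost would reach. In a real application the strings would be passed to session.createConsumer and the provider would do the matching.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.DoublePredicate;

public class OrderSelectors {
    // Selector expressions in JMS selector syntax, one per consumer.
    static final Map<String, String> SELECTORS = new LinkedHashMap<>();

    // Hand-written Java equivalents of the selectors (illustration only;
    // a JMS provider parses and evaluates the selector string itself).
    static final Map<String, DoublePredicate> MATCHERS = new LinkedHashMap<>();

    static {
        SELECTORS.put("C1", "orderCost <= 100");
        SELECTORS.put("C2", "orderCost > 100 AND orderCost < 1000");
        SELECTORS.put("C3", "orderCost >= 1000");

        MATCHERS.put("C1", cost -> cost <= 100);
        MATCHERS.put("C2", cost -> cost > 100 && cost < 1000);
        MATCHERS.put("C3", cost -> cost >= 1000);
    }

    // Returns the name of the consumer whose criterion matches the cost.
    static String consumerFor(double orderCost) {
        for (Map.Entry<String, DoublePredicate> entry : MATCHERS.entrySet()) {
            if (entry.getValue().test(orderCost)) {
                return entry.getKey();
            }
        }
        throw new IllegalArgumentException("No consumer matches " + orderCost);
    }

    public static void main(String[] args) {
        double[] costs = {100.05, 2100.12, 200.56, 1100.98, 300.87};
        for (double cost : costs) {
            String consumer = consumerFor(cost);
            System.out.println(cost + " -> " + consumer + " [" + SELECTORS.get(consumer) + "]");
        }
    }
}
```

The three ranges do not overlap, so every order matches exactly one selector. Note that 100.05 goes to C2, not C1, because it is greater than 100.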

JMSProducer.java
package com.sample.producer;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class JMSProducer {
 private static final String JMS_PROVIDER_URL = "tcp://127.0.0.1:5050";
 private static final String QUEUE = "DEMO_QUEUE";

 /* Initialized using double brace initialization */
 private static Map<String, Double> orderMap = new HashMap<String, Double>() {
  {

   put("CustomerId:C123", 100.05);
   put("CustomerId:C124", 2100.12);
   put("CustomerId:C125", 200.56);
   put("CustomerId:C126", 1100.98);
   put("CustomerId:C127", 300.87);

  }
 };

 public static void main(String[] args) throws JMSException {
  ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(JMS_PROVIDER_URL);
  Connection connection = connectionFactory.createConnection();
  connection.start();

  Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  Destination destination = session.createQueue(QUEUE);

  MessageProducer producer = session.createProducer(destination);

  Set<String> customerIds = orderMap.keySet();

  for (String customerId : customerIds) {
   TextMessage message = session.createTextMessage("CustomerId:" + customerId);
   System.out.println(orderMap.get(customerId));
   double orderCost = orderMap.get(customerId);
   // Selectors match on message properties (and headers), not on the body
   message.setDoubleProperty("orderCost", orderCost);
   producer.send(message);
  }

  connection.close();

 }
}

JMSConsumer.java

package com.sample.consumer;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class JMSConsumer {
 private static final String JMS_PROVIDER_URL = "tcp://127.0.0.1:5050";
 private static final String QUEUE = "DEMO_QUEUE";

 public static void main(String[] args) throws JMSException {

  ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(JMS_PROVIDER_URL);
  Connection connection = connectionFactory.createConnection();
  connection.start();

  Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

  Destination destination = session.createQueue(QUEUE);

  MessageConsumer consumer = session.createConsumer(destination, "orderCost >= 1000");

  System.out.println("CustomerId : OrderCost");
  try {
   while (true) {
    Message message = consumer.receive();

    if (message instanceof TextMessage) {
     TextMessage textMessage = (TextMessage) message;
     String msg = textMessage.getText();
     String customerId = msg.split(":")[1];
     double orderCost = message.getDoubleProperty("orderCost");
     
     System.out.println(customerId + " : " + orderCost);
    }
   }
  } finally {
   connection.close();
  }

 }

}

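Run JMSProducer first and then JMSConsumer. The consumer prints only the two orders whose orderCost is at least 1000 (their order may vary, since HashMap iteration order is not guaranteed). The first column shows the literal text CustomerId rather than an ID such as C124, because the map keys already begin with "CustomerId:" and the producer prepends the same prefix again, so split(":")[1] returns the prefix.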

CustomerId : OrderCost
CustomerId : 2100.12
CustomerId : 1100.98
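The first column of this output can be reproduced without a broker. The sketch below is plain Java, not JMS, and the class and method names are hypothetical. It rebuilds the message text the same way JMSProducer does, then compares the consumer's split(":")[1] with one possible alternative that takes everything after the last colon.

```java
public class CustomerIdParsing {
    // What JMSConsumer does: take the second ':'-separated field.
    static String secondField(String body) {
        return body.split(":")[1];
    }

    // One possible alternative: take everything after the last ':'.
    // If there is no ':' at all, the whole string is returned.
    static String afterLastColon(String body) {
        return body.substring(body.lastIndexOf(':') + 1);
    }

    public static void main(String[] args) {
        String key = "CustomerId:C124";     // a map key from JMSProducer
        String body = "CustomerId:" + key;  // the text JMSProducer sends

        System.out.println(body);
        System.out.println(secondField(body));
        System.out.println(afterLastColon(body));
    }
}
```

Another option would be to store bare IDs such as C124 as the map keys, in which case the consumer's existing split would return the ID.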


Note
a.   On queues, a consumer receives only the messages that match its selector. The others stay in the queue (and thus can be read by a MessageConsumer with a different selector).
b.   On topics, messages that do not match a subscriber's selector are never delivered to that subscriber, as if they had not been published.
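The difference between the two cases can be illustrated with a small in-memory simulation. This is only a sketch of the semantics described in the note, not how a JMS provider is implemented; the class SelectorSemantics and its methods are hypothetical, and order costs stand in for whole messages.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.DoublePredicate;

public class SelectorSemantics {
    // Queue: remove and return the first message that matches the selector.
    // Non-matching messages are left in the queue for other consumers.
    static Double receiveFromQueue(List<Double> queue, DoublePredicate selector) {
        Iterator<Double> it = queue.iterator();
        while (it.hasNext()) {
            Double cost = it.next();
            if (selector.test(cost)) {
                it.remove();
                return cost;
            }
        }
        return null; // nothing in the queue matches right now
    }

    // Topic: the subscriber sees only the matching messages; for this
    // subscriber the others might as well not have been published.
    static List<Double> deliverToSubscriber(List<Double> published, DoublePredicate selector) {
        List<Double> delivered = new ArrayList<>();
        for (double cost : published) {
            if (selector.test(cost)) {
                delivered.add(cost);
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        DoublePredicate atLeast1000 = cost -> cost >= 1000;

        List<Double> queue = new ArrayList<>(Arrays.asList(100.05, 2100.12, 200.56, 1100.98, 300.87));
        System.out.println("received: " + receiveFromQueue(queue, atLeast1000));
        System.out.println("received: " + receiveFromQueue(queue, atLeast1000));
        System.out.println("left in queue: " + queue);

        List<Double> published = Arrays.asList(100.05, 2100.12, 200.56, 1100.98, 300.87);
        System.out.println("topic subscriber saw: " + deliverToSubscriber(published, atLeast1000));
    }
}
```

In the queue case the three cheaper orders remain available, so a second consumer with a selector such as "orderCost < 1000" could still pick them up.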


In my next post, I am going to explain message selector syntax.



