Saturday, 13 September 2014

PriorityBlockingQueue

What is Priority Queue?
Priority Queue is a queue kind of data structure, each element has a "priority" associated with it. In a priority queue, an element with high priority is served before an element with low priority. If two elements have the same priority, they are served according to their order in the queue.

What is priority Blocking Queue?
Priority blocking queue is thread safe implementation of priority queue.

In this post, I am going to explain Java Priority Blocking queue implementation. Java Collections has 'java.util.concurrent.PriorityBlockingQueue<E>' class that implement priority blocking queue.

Is java.util.concurrent.PriorityBlockingQueue support null elements?
No PriorityBlockingQueue don’t support null elements.

PriorityBlockingQueue() class provides following constructors to instantiate.
Constructor
Description
PriorityBlockingQueue()
Creates a PriorityBlockingQueue with the default initial capacity (11) that orders its elements according to their natural ordering.
PriorityBlockingQueue(Collection<? extends E> c)
Creates a PriorityBlockingQueue containing the elements in the specified collection.
PriorityBlockingQueue(int initialCapacity)
Creates a PriorityBlockingQueue with the specified initial capacity that orders its elements according to their natural ordering.
PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator)
Creates a PriorityBlockingQueue with the specified initial capacity that orders its elements according to the specified comparator.

Let me explain with an example, suppose you want to process collection of events, where each event has priority. To demonstrate our example, I am assuming there are 5 different kinds of priorities.

a.   LOWEST
b.    LOWER
c.    MEDIUM
d.    HIGHER
e.    HIGHEST

The events with HIGHEST priority must execute first followed by HIGHER events followed by MEDIUM events followed by LOWER events followed by LOWEST events. In this kinds of scenarios, we require priority queue kind of implementation to solve.


Following is the complete working application.
public enum EventPriorities {
 LOWEST(0), LOWER(1), MEDIUM(2), HIGHER(3), HIGHEST(4);

 int priority;

 EventPriorities(int priority) {
  this.priority = priority;
 }

 public int getPriority() {
  return priority;
 }

 public void setPriority(int priority) {
  this.priority = priority;
 }

}

public class Event {
 private EventPriorities eventPriority;
 private String eventName;

 public Event(EventPriorities eventPriority, String eventName) {
  this.eventPriority = eventPriority;
  this.eventName = eventName;
 }

 public EventPriorities getEventPriority() {
  return eventPriority;
 }

 @Override
 public String toString() {
  StringBuilder builder = new StringBuilder();
  builder.append("Event [eventName=").append(eventName).append("] Executed");
  return builder.toString();
 }

}

import java.util.Comparator;

public class EventComparator implements Comparator<Event> {

 public int compare(Event event1, Event event2) {
  return event2.getEventPriority().getPriority() - event1.getEventPriority().getPriority();
 }

}

import java.util.concurrent.PriorityBlockingQueue;

public class EventPriorityQueue<T> {

 private PriorityBlockingQueue<T> queue = new PriorityBlockingQueue(50, new EventComparator());

 public boolean enqueEvent(T event) {
  return queue.add(event);
 }

 public T getEvent() throws InterruptedException {
  return queue.take();
 }
}

public class PriorityBlockingQueueTest {
 
 private static void printEvent(Event event){
  if(event.getEventPriority() == EventPriorities.LOWEST){
   System.out.println("Lowest priority event " + event);
  }else if(event.getEventPriority() == EventPriorities.LOWER){
   System.out.println("Lower priority event" + event);
  }else if(event.getEventPriority() == EventPriorities.MEDIUM){
   System.out.println("Medium priority event" + event);
  }else if(event.getEventPriority() == EventPriorities.HIGHER){
   System.out.println("Higher priority event" + event);
  }else if(event.getEventPriority() == EventPriorities.HIGHEST){
   System.out.println("Highest priority event" + event);
  }else{
   System.out.println("Wrong Event");
  }
 }

 public static void main(String args[]) throws InterruptedException {
  Event lowest = new Event(EventPriorities.LOWEST, "event_1");
  Event lower = new Event(EventPriorities.LOWER, "event_2");
  Event middle = new Event(EventPriorities.MEDIUM, "event_3");
  Event higher = new Event(EventPriorities.HIGHER, "event_4");
  Event highest = new Event(EventPriorities.HIGHEST, "event_5");

  EventPriorityQueue<Event> priorityQueue = new EventPriorityQueue<>();

  priorityQueue.enqueEvent(lowest);
  priorityQueue.enqueEvent(lower);
  priorityQueue.enqueEvent(middle);
  priorityQueue.enqueEvent(higher);
  priorityQueue.enqueEvent(highest);
  
  while(true){
   Event event = priorityQueue.getEvent();
   printEvent(event);
  }

 }
}


Run ‘PriorityBlockingQueueTest.java’, you can able to see following output.
Highest priority eventEvent [eventName=event_5] Executed
Higher priority eventEvent [eventName=event_4] Executed
Medium priority eventEvent [eventName=event_3] Executed
Lower priority eventEvent [eventName=event_2] Executed
Lowest priority event Event [eventName=event_1] Executed

What is the problem with above application?
There is a problem with above application, what if more than one events has same priority. Let’s give a try.


Update ‘PriorityBlockingQueueTest’ class like below and re run the application.
public class PriorityBlockingQueueTest {

 private static void printEvent(Event event) {
  if (event.getEventPriority() == EventPriorities.LOWEST) {
   System.out.println("Lowest priority event " + event);
  } else if (event.getEventPriority() == EventPriorities.LOWER) {
   System.out.println("Lower priority event" + event);
  } else if (event.getEventPriority() == EventPriorities.MEDIUM) {
   System.out.println("Medium priority event" + event);
  } else if (event.getEventPriority() == EventPriorities.HIGHER) {
   System.out.println("Higher priority event" + event);
  } else if (event.getEventPriority() == EventPriorities.HIGHEST) {
   System.out.println("Highest priority event" + event);
  } else {
   System.out.println("Wrong Event");
  }
 }

 public static void main(String args[]) throws InterruptedException {
  Event event1 = new Event(EventPriorities.HIGHEST, "event_1");
  Event event2 = new Event(EventPriorities.HIGHEST, "event_2");
  Event event3 = new Event(EventPriorities.HIGHEST, "event_3");
  Event event4 = new Event(EventPriorities.HIGHEST, "event_4");
  Event event5 = new Event(EventPriorities.HIGHEST, "event_5");

  EventPriorityQueue<Event> priorityQueue = new EventPriorityQueue<>();

  priorityQueue.enqueEvent(event1);
  priorityQueue.enqueEvent(event2);
  priorityQueue.enqueEvent(event3);
  priorityQueue.enqueEvent(event4);
  priorityQueue.enqueEvent(event5);

  while (true) {
   Event event = priorityQueue.getEvent();
   printEvent(event);
  }

 }
}


I got following kind of output.
Highest priority eventEvent [eventName=event_1] Executed
Highest priority eventEvent [eventName=event_5] Executed
Highest priority eventEvent [eventName=event_4] Executed
Highest priority eventEvent [eventName=event_3] Executed
Highest priority eventEvent [eventName=event_2] Executed

But I inserted events in the order 1, 2, 3, 4, 5. But the execution order is 1, 5, 4, 3 and 2. To solve this problem, you need to give sequence number to the events and if any two events has same priority, you need to use this sequence number to resolve which event should execute first.


Update Event and EventComparator classes like below and re run PriorityBlockingQueueTest class.
import java.util.concurrent.atomic.AtomicInteger;

public class Event {
 private EventPriorities eventPriority;
 private String eventName;
 private long sequenceNumber;
 private static final AtomicInteger seq = new AtomicInteger(0);

 public Event(EventPriorities eventPriority, String eventName) {
  this.eventPriority = eventPriority;
  this.eventName = eventName;
  this.sequenceNumber = seq.getAndIncrement();
 }

 public EventPriorities getEventPriority() {
  return eventPriority;
 }

 public long getSequenceNumber() {
  return sequenceNumber;
 }

 public void setSequenceNumber(long sequenceNumber) {
  this.sequenceNumber = sequenceNumber;
 }

 @Override
 public String toString() {
  StringBuilder builder = new StringBuilder();
  builder.append("Event [eventName=").append(eventName).append("] Executed");
  return builder.toString();
 }

}

import java.util.Comparator;

public class EventComparator implements Comparator<Event> {

 public int compare(Event event1, Event event2) {
  int res = event2.getEventPriority().getPriority() - event1.getEventPriority().getPriority();

  if (res == 0) {
   return (int) (event1.getSequenceNumber() - event2.getSequenceNumber());
  }

  return res;
 }

}

Re run ‘PriorityBlockingQueueTest.java’, you can able to see following output.
Highest priority eventEvent [eventName=event_1] Executed
Highest priority eventEvent [eventName=event_2] Executed
Highest priority eventEvent [eventName=event_3] Executed
Highest priority eventEvent [eventName=event_4] Executed
Highest priority eventEvent [eventName=event_5] Executed





Prevoius                                                 Next                                                 Home

No comments:

Post a Comment