Friday 12 September 2014

DelayQueue

DelayQueue is an unbounded blocking queue that stores elements of type 'java.util.concurrent.Delayed'. An element can be taken from a DelayQueue only after its delay has expired. Expiration occurs when the element's getDelay method returns a value less than or equal to zero.

If you want to place an element into a DelayQueue, that element must implement the Delayed interface.

/**
 * Contract for objects that can be stored in a DelayQueue.
 * Because it extends {@code Comparable<Delayed>}, implementations must also
 * provide {@code compareTo}.
 */
public interface Delayed extends Comparable<Delayed> {
    /**
     * Returns the remaining delay, expressed in the given time unit.
     * A value less than or equal to zero means the delay has expired.
     */
    long getDelay(TimeUnit unit);
}

The getDelay method returns the remaining delay. If the value returned by 'getDelay' is less than or equal to zero, the element is considered expired and is removed by the next take().

As you can see, the Delayed interface extends the Comparable interface, so you must also provide an implementation of the 'compareTo' method.
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayedElement implements Delayed{

    int id, delay;
    long startTime;
    
 
    public DelayedElement(int id, int delay) {
        this.id = id;
        this.startTime = System.currentTimeMillis() + delay;
        this.delay = delay;
    }
 
    @Override
    public long getDelay(TimeUnit unit) {
        long diff = startTime - System.currentTimeMillis();
        return unit.convert(diff, TimeUnit.MILLISECONDS);
    }
 
    @Override
    public int compareTo(Delayed o) {
        if (this.startTime < ((DelayedElement) o).startTime) {
            return -1;
        }
        if (this.startTime > ((DelayedElement) o).startTime) {
            return 1;
        }
        return 0;
    }
 
    @Override
    public String toString() {
        return "{" +
                "data='" + id + '\'' +
                ", startTime=" + startTime +
                '}';
    }
}

import java.util.concurrent.DelayQueue;
import java.util.Random;;

/**
 * Task that keeps adding {@link DelayedElement}s with random delays to a
 * shared {@link DelayQueue} until its thread is interrupted.
 */
public class Producer implements Runnable {

    /** Exclusive upper bound, in milliseconds, for the random delay of each element. */
    private static final int MAX_DELAY_MILLIS = 10000;

    /**
     * Pause between two insertions. DelayQueue is unbounded, so producing
     * without any pause would grow the queue until the heap is exhausted.
     */
    private static final long PRODUCE_INTERVAL_MILLIS = 500;

    final DelayQueue<DelayedElement> queue;
    final Random rand = new Random();

    /**
     * @param q the queue that receives the produced elements
     */
    Producer(DelayQueue<DelayedElement> q) {
        queue = q;
    }

    /**
     * Produces elements numbered 1, 2, 3, ... each with a random delay below
     * {@code MAX_DELAY_MILLIS}, printing every element before it is queued.
     * Returns when the running thread is interrupted.
     */
    @Override
    public void run() {
        int i = 0;

        while (!Thread.currentThread().isInterrupted()) {
            i++;
            int delay = rand.nextInt(MAX_DELAY_MILLIS);
            DelayedElement obj = new DelayedElement(i, delay);
            System.out.println("Producer : " + obj.id + " " + obj.delay);
            queue.put(obj);

            try {
                Thread.sleep(PRODUCE_INTERVAL_MILLIS);
            } catch (InterruptedException ex) {
                // Restore the flag so the loop condition sees it and the task ends.
                Thread.currentThread().interrupt();
            }
        }
    }
}

import java.util.concurrent.DelayQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Task that repeatedly takes expired {@link DelayedElement}s from a shared
 * {@link DelayQueue} and prints them, until its thread is interrupted.
 */
public class Consumer implements Runnable {

    final DelayQueue<DelayedElement> queue;

    /**
     * @param q the queue to take elements from
     */
    Consumer(DelayQueue<DelayedElement> q) {
        queue = q;
    }

    /**
     * Blocks on {@code take()} until an element's delay has expired, prints the
     * element, and repeats. Returns when the running thread is interrupted.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                DelayedElement obj = queue.take();
                System.out.println("Consumer : " + obj.id + " " + obj.delay);
            } catch (InterruptedException ex) {
                Logger.getLogger(Consumer.class.getName())
                        .log(Level.INFO, "Consumer interrupted; stopping", ex);
                // Catching InterruptedException clears the interrupt status;
                // restore it for the owner of this thread and stop consuming
                // instead of looping forever.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}


import java.util.concurrent.DelayQueue;

/**
 * Demo entry point: wires one {@link Producer} and one {@link Consumer} to the
 * same {@link DelayQueue} and runs each on its own thread.
 */
public class DelayedQueueEx {

    /**
     * Starts the producer and consumer threads.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {
        // Parameterized instead of raw, so the compiler checks the element type.
        DelayQueue<DelayedElement> q = new DelayQueue<DelayedElement>();

        Producer p1 = new Producer(q);
        Consumer con = new Consumer(q);

        Thread t1 = new Thread(p1);
        Thread t3 = new Thread(con);

        t1.start();
        t3.start();
    }
}


Previous                                                 Next                                                 Home

No comments:

Post a Comment