Monday, 15 September 2014

SynchronousQueue

SynchronousQueue implements BlockingQueue. The key thing to remember is that it has no capacity at all: it cannot hold even a single element. In a SynchronousQueue, each insert operation must wait for a corresponding remove operation by another thread, and vice versa. A take from the queue succeeds only when another thread is currently trying to put into the queue.

For Example:
Thread t1 tries to put an item into the queue and blocks. Thread t2 then tries to take from the queue. Since t1 is waiting to put, the item is transferred from t1 to t2, and both threads are unblocked.

Thread t2 tries to take from the queue, but no one is trying to put, so t2 is now blocked. Thread t1 now wants to put an item. The item is transferred from t1 over to t2, and t1 and t2 are no longer blocked.

import java.util.concurrent.SynchronousQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Task that offers the integers 1, 2, 3, ... to a {@link SynchronousQueue},
 * printing the current thread's name and the value after each successful
 * hand-off.
 *
 * <p>The task runs until its thread is interrupted; on interruption it
 * restores the interrupt flag and returns instead of looping forever.
 */
public class Producer implements Runnable{
    private final SynchronousQueue<Integer> queue;
    
    /**
     * Creates a producer bound to the given queue.
     *
     * @param q queue that receives the generated integers
     */
    Producer(SynchronousQueue<Integer> q){
        queue = q;
    }
    
    /**
     * Puts consecutive integers, starting at 1, into the queue until the
     * running thread is interrupted.
     */
    @Override
    public void run(){
        int i=0;
        while(true){
            i++;
            try {
                // Blocks until another thread takes the element.
                queue.put(i);
                System.out.println(Thread.currentThread().getName()+ " :"+  i);
            } catch (InterruptedException ex) {
                Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex);
                // Preserve the interrupt for whoever owns this thread and stop;
                // continuing the loop would make the task impossible to cancel.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}

import java.util.concurrent.SynchronousQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Task that repeatedly sleeps for one second and then takes one element from
 * a {@link SynchronousQueue}, printing the value it received.
 *
 * <p>The task runs until its thread is interrupted; on interruption it
 * restores the interrupt flag and returns instead of looping forever.
 */
public class Consumer implements Runnable{
    private final SynchronousQueue<Integer> queue;
    
    /**
     * Creates a consumer bound to the given queue.
     *
     * @param q queue the integers are taken from
     */
    Consumer(SynchronousQueue<Integer> q){
        queue = q;
    }
    
    /**
     * Takes and prints one element per second until the running thread is
     * interrupted.
     */
    @Override
    public void run(){
        while(true){
            try {
                // Throttle consumption so the hand-offs are easy to follow.
                Thread.sleep(1000);
                // Blocks until some thread is putting an element.
                System.out.println("Consumer : " + queue.take());
            } catch (InterruptedException ex) {
                Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
                // Preserve the interrupt for whoever owns this thread and stop;
                // continuing the loop would make the task impossible to cancel.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}

import java.util.concurrent.SynchronousQueue;

/**
 * Demo entry point: starts two {@code Producer} threads and one
 * {@code Consumer} thread that share a single fair {@link SynchronousQueue}.
 */
public class SynchronousQueueEx {
    /**
     * Wires the producers and the consumer to one queue and starts them.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args){
        // "true" selects the fair policy: waiting threads are served in FIFO order.
        SynchronousQueue<Integer> q = new SynchronousQueue<Integer>(true);
        
        Producer p1 = new Producer(q);
        Producer p2 = new Producer(q);
        
        Consumer con = new Consumer(q);
        
        Thread t1 = new Thread(p1);
        Thread t2 = new Thread(p2);
        
        Thread t3 = new Thread(con);
        
        // The producers print their thread name, so give them readable ones.
        t1.setName("Producer1");
        t2.setName("Producer2");
        
        t1.start();
        t2.start();
        t3.start();
    }
}

Output
Consumer : 1
Producer1 :1
Consumer : 1
Producer2 :1
Producer1 :2
Consumer : 2
Consumer : 2
Producer2 :2
Consumer : 3
Producer1 :3







Previous                                                 Next                                                 Home

No comments:

Post a Comment