Friday, 19 September 2014

ThreadPoolExecutor

ThreadPoolExecutor is an implementation of the ExecutorService interface.

To learn about thread pools, Executor, and ExecutorService, follow the links below.
https://self-learning-java-tutorial.blogspot.com/2014/03/thread-pools_8.html
https://self-learning-java-tutorial.blogspot.com/2014/03/executor-interface.html
https://self-learning-java-tutorial.blogspot.com/2014/03/executorservice.html

ThreadPoolExecutor executes each submitted task, whether Runnable or Callable, using one of its pooled threads. It provides the following constructors to create a ThreadPoolExecutor object.

1. ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)

2. ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)

3. ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)

4. ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
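To make the parameter list concrete, here is a minimal sketch that calls constructor 4 with every argument supplied. The class name ConstructorSketch, the helper namedFactory, the "worker" prefix, and the sizes used are assumptions made for illustration only; they are not part of ThreadPoolExecutor.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ConstructorSketch {

    // Hypothetical factory (not from the post): names threads prefix-1, prefix-2, ...
    static ThreadFactory namedFactory(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> new Thread(runnable, prefix + "-" + counter.incrementAndGet());
    }

    // Uses constructor 4: every parameter is supplied explicitly.
    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
                2,                                      // corePoolSize
                4,                                      // maximumPoolSize
                30L, TimeUnit.SECONDS,                  // keepAliveTime and its unit
                new ArrayBlockingQueue<>(10),           // bounded work queue
                namedFactory("worker"),                 // threadFactory
                new ThreadPoolExecutor.AbortPolicy());  // handler
    }

    // Runs one Callable and returns the name of the thread that executed it.
    static String runOne() {
        ThreadPoolExecutor pool = newPool();
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            return pool.submit(task).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown(); // let the worker threads exit
        }
    }

    public static void main(String[] args) {
        System.out.println(runOne()); // prints "worker-1"
    }
}
```

Constructors 1 to 3 take the same leading arguments and fall back to a default thread factory, the default AbortPolicy handler, or both.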

corePoolSize
When a new task is submitted to the thread pool and fewer than corePoolSize threads are running, a new thread is created to handle the request, even if other worker threads are idle. This means core threads are created and started only when new tasks arrive; you can override this behavior using the prestartCoreThread() or prestartAllCoreThreads() methods.

If at least corePoolSize but fewer than maximumPoolSize threads are running, a new thread is created only if the queue is full. You can change the corePoolSize and maximumPoolSize values dynamically using the setCorePoolSize(int) and setMaximumPoolSize(int) methods.
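The lazy creation of core threads and the prestart and setter methods can be observed directly. The following is a small sketch, assuming a pool with the same sizes as the example later in this post; the class name CorePoolSizeSketch and the method observe are made up for the illustration.

```java
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CorePoolSizeSketch {

    // Returns {threads before prestart, threads started, threads after prestart,
    //          new corePoolSize, new maximumPoolSize}.
    static int[] observe() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                3, 6, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        try {
            int before = pool.getPoolSize();             // no task submitted yet, so no threads
            int started = pool.prestartAllCoreThreads(); // starts all core threads eagerly
            int after = pool.getPoolSize();

            // Both limits can be changed while the pool is running.
            pool.setCorePoolSize(2);
            pool.setMaximumPoolSize(4);

            return new int[] {before, started, after,
                    pool.getCorePoolSize(), pool.getMaximumPoolSize()};
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(observe())); // prints "[0, 3, 3, 2, 4]"
    }
}
```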

maximumPoolSize
The maximum number of threads allowed in the pool.

keepAliveTime
If the pool currently has more than corePoolSize threads, excess threads are terminated once they have been idle for longer than keepAliveTime. If the pool needs more threads later, new threads are created on demand. By default, the keep-alive policy applies only when the pool has more than corePoolSize threads. You can apply this time-out policy to core threads as well using the allowCoreThreadTimeOut(boolean) method.
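As a rough illustration of the time-out policy applied to core threads, the sketch below starts two core threads, leaves them idle, and waits for them to be terminated. The class name KeepAliveSketch, the 100 ms keep-alive value, and the 5 second polling limit are arbitrary choices for the demonstration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class KeepAliveSketch {

    // Returns the pool size observed after the idle core threads had time to expire.
    static int poolSizeAfterIdle() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 100L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        pool.allowCoreThreadTimeOut(true); // core threads now obey keepAliveTime too
        pool.prestartAllCoreThreads();     // two idle core threads exist from here on

        try {
            // Poll for up to 5 seconds until the idle threads have been terminated.
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
                Thread.sleep(20);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        int size = pool.getPoolSize();
        pool.shutdown();
        return size;
    }

    public static void main(String[] args) {
        System.out.println("Threads left after idling: " + poolSizeAfterIdle());
    }
}
```

Without the allowCoreThreadTimeOut(true) call, the two core threads would stay alive for as long as the pool is running.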

BlockingQueue
It is used to transfer and hold the tasks submitted to the ThreadPoolExecutor. Three cases arise.

a. If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread to queuing when a new task is submitted.

b. If corePoolSize or more threads are running, the Executor always prefers queuing a request to adding a new thread.

c. If the queue is full and the pool has already reached maximumPoolSize, the task is rejected. If the queue is full but the pool is still below maximumPoolSize, a new thread is created instead.
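The three cases above can be traced with a deliberately tiny pool. This is a sketch under stated assumptions: corePoolSize 1, maximumPoolSize 2, and a queue that holds a single task; the class name QueueingSketch and the latch-based blocking task are illustration only. Every task blocks until the latch is released, so no thread becomes free while tasks are being submitted.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class QueueingSketch {

    // Submits four blocking tasks and records how the pool handled each one.
    static String trace() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        StringBuilder log = new StringBuilder();
        try {
            for (int i = 1; i <= 4; i++) {
                try {
                    pool.execute(blocker);
                    log.append("task ").append(i)
                       .append(": threads=").append(pool.getPoolSize())
                       .append(" queued=").append(pool.getQueue().size())
                       .append('\n');
                } catch (RejectedExecutionException e) {
                    // Default AbortPolicy: queue full and pool at maximumPoolSize.
                    log.append("task ").append(i).append(": rejected\n");
                }
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.print(trace());
        // task 1: threads=1 queued=0   (case a: new core thread)
        // task 2: threads=1 queued=1   (case b: queued)
        // task 3: threads=2 queued=1   (queue full, pool grows to maximumPoolSize)
        // task 4: rejected             (case c)
    }
}
```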

RejectedExecutionHandler
A task is rejected in two cases.
   a. When the Executor has been shut down.
   b. When the queue is full and the pool has already reached maximumPoolSize.
In either case, the execute method invokes the rejectedExecution(Runnable, ThreadPoolExecutor) method of its RejectedExecutionHandler.
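A custom handler makes the first case easy to see. In the sketch below, a handler that merely counts rejections is installed, the pool is shut down, and one more task is submitted. The class name RejectOnShutdownSketch and the counting handler are hypothetical and exist only for this demonstration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class RejectOnShutdownSketch {

    // Returns how many times the handler was invoked.
    static int rejectedCount() {
        AtomicInteger rejected = new AtomicInteger();

        // RejectedExecutionHandler has a single method, so a lambda can implement it.
        RejectedExecutionHandler counting =
                (task, executor) -> rejected.incrementAndGet();

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), counting);

        pool.shutdown();         // the executor no longer accepts tasks
        pool.execute(() -> { }); // handed to the handler instead of being run

        return rejected.get();
    }

    public static void main(String[] args) {
        System.out.println("Rejected tasks: " + rejectedCount()); // prints "Rejected tasks: 1"
    }
}
```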

There are four predefined policies provided by the ThreadPoolExecutor class for rejected tasks.

Policy                                    Description
ThreadPoolExecutor.AbortPolicy            The handler throws a runtime RejectedExecutionException upon rejection. This is the default policy.
ThreadPoolExecutor.CallerRunsPolicy       The thread that invokes execute runs the task itself.
ThreadPoolExecutor.DiscardPolicy          A task that cannot be executed is simply dropped.
ThreadPoolExecutor.DiscardOldestPolicy    If the executor is not shut down, the task at the head of the work queue is dropped, and then execution is retried.
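As one example of these policies, the sketch below saturates a one-thread pool that uses CallerRunsPolicy and checks which thread ends up running the overflow task. The class name CallerRunsSketch, the pool sizes, and the latch-based blocking task are assumptions chosen to force a rejection.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class CallerRunsSketch {

    // Returns true if the rejected task was executed by the submitting thread.
    static boolean overflowRanOnCaller() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 10L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        AtomicReference<Thread> ranOn = new AtomicReference<>();
        try {
            pool.execute(blocker); // occupies the only worker thread
            pool.execute(blocker); // fills the one-slot queue
            // Pool and queue are both full, so this task is rejected
            // and CallerRunsPolicy runs it on the calling thread.
            pool.execute(() -> ranOn.set(Thread.currentThread()));
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return ranOn.get() == Thread.currentThread();
    }

    public static void main(String[] args) {
        System.out.println("Ran on caller: " + overflowRanOnCaller()); // prints "Ran on caller: true"
    }
}
```

Because the submitting thread is busy running the task, it cannot submit more work in the meantime, which slows down the rate of new submissions.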

public class Task implements Runnable{
    
   int taskNum;
   
   Task(int taskNum){
       this.taskNum = taskNum;
   }
    
   @Override
   public void run(){
       System.out.println(Thread.currentThread().getName() +" Started " + taskNum);
       System.out.println(Thread.currentThread().getName() +" Finished " + taskNum);
   }
}

import java.util.concurrent.*;

public class ThreadPoolExecutorEx {
    public static void main(String[] args) {

        /* Parameters initialization for ThreadPoolExecutor constructor */
        int corePoolSize = 3;
        int maximumPoolSize = 6;
        long keepAliveTime = 10;
        TimeUnit unit = TimeUnit.SECONDS;

        /* Unbounded queue: it never fills up, so the pool never grows beyond corePoolSize */
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

        ThreadPoolExecutor pool;
        pool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, queue);

        for (int i = 0; i < 1000; i++) {
            pool.submit(new Task(i));
        }

        /* Stop accepting new tasks; queued tasks still run, and then the JVM can exit */
        pool.shutdown();
    }
}

Output (first lines of one run; the interleaving varies between runs)
pool-1-thread-1 Started 0
pool-1-thread-1 Finished 0
pool-1-thread-3 Started 2
pool-1-thread-3 Finished 2
pool-1-thread-1 Started 3
pool-1-thread-1 Finished 3
pool-1-thread-1 Started 5
pool-1-thread-1 Finished 5
pool-1-thread-1 Started 6
pool-1-thread-1 Finished 6
pool-1-thread-1 Started 7
pool-1-thread-1 Finished 7
pool-1-thread-2 Started 1
pool-1-thread-1 Started 8



