Tuesday 15 November 2016

Implement Thread Pool in Java

A thread pool is a fixed set of pre-created threads that stay alive to execute tasks. Creating a new thread for every new task is costly. With a thread pool, we define a fixed number of threads; each thread runs one task at a time and, once the task finishes, goes back to waiting in the pool. When a new task is submitted, the pool assigns it to an idle thread (one that is waiting in the pool). If all the threads are busy, the pool inserts the new task into a queue. Whenever a thread becomes free, the pool removes a task from the queue and assigns it to that thread.
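
To see this behaviour before building anything by hand, here is a minimal sketch that uses the JDK's own `Executors.newFixedThreadPool` rather than the custom pool developed in this post. The class name `FixedPoolSketch` and the method `distinctWorkers` are made up for illustration. It submits more tasks than there are threads and records which worker threads ran them; the surplus tasks simply wait in the pool's queue.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class FixedPoolSketch {

    /**
     * Runs taskCount short tasks on a pool of poolSize threads and returns
     * how many distinct worker threads ended up running them.
     */
    public static int distinctWorkers(int poolSize, int taskCount) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Set<String> workerNames = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < taskCount; i++) {
            // Tasks that find every thread busy wait in the pool's
            // internal queue until a worker becomes free.
            pool.execute(() -> workerNames.add(Thread.currentThread().getName()));
        }
        pool.shutdown(); // accept no new tasks, but finish the queued ones
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return workerNames.size();
    }

    public static void main(String[] args) {
        // 20 tasks share at most 3 threads
        System.out.println("Distinct worker threads: " + distinctWorkers(3, 20));
    }
}
```

However many tasks are submitted, the number of distinct worker threads never exceeds the pool size.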

One of my friends got this interview question. You need to implement a thread pool that satisfies the following requirements.
a.   You can define a pool with a fixed number of threads
b.   You can submit a task to the pool at any point of time
c.   You should be able to get the number of active threads at any point of time
d.   You should be able to remove a task from the thread pool, if it has not started execution
e.   You should be able to get the list of all currently running tasks
f.   You can interrupt a task
g.   You can stop all threads
h.   A task should return its execution status
i.   Tasks must execute in the order of their priority
j.   If two tasks have the same priority, the task with the lower sequence number should execute first
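
Requirements i and j are the least obvious ones, so here is a small standalone sketch of that ordering. It assumes a hypothetical `Job` class (name, priority, sequence number) that stands in for a real task, and it uses a `PriorityBlockingQueue` with a comparator; `PriorityOrderSketch`, `ORDER`, `drainOrder` and `demo` are illustrative names, not part of the pool itself.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

public class PriorityOrderSketch {

    /** Hypothetical stand-in for a task: a name, a priority and a sequence number. */
    static final class Job {
        final String name;
        final int priority;
        final int sequence;

        Job(String name, int priority, int sequence) {
            this.name = name;
            this.priority = priority;
            this.sequence = sequence;
        }
    }

    /** Higher priority first; on a tie, the lower sequence number first. */
    static final Comparator<Job> ORDER =
            Comparator.comparingInt((Job j) -> j.priority).reversed()
                      .thenComparingInt(j -> j.sequence);

    /** Queues the jobs, then returns their names in the order the queue releases them. */
    public static List<String> drainOrder(List<Job> jobs) {
        PriorityBlockingQueue<Job> queue = new PriorityBlockingQueue<>(11, ORDER);
        queue.addAll(jobs);
        List<String> names = new ArrayList<>();
        Job next;
        while ((next = queue.poll()) != null) { // poll() returns the head of the ordering
            names.add(next.name);
        }
        return names;
    }

    /** Two priority levels, two jobs each, submitted in sequence order. */
    public static List<String> demo() {
        return drainOrder(Arrays.asList(
                new Job("a", 2, 0),
                new Job("b", 5, 1),
                new Job("c", 2, 2),
                new Job("d", 5, 3)));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [b, d, a, c]
    }
}
```

The two priority-5 jobs come out first, in sequence order, followed by the two priority-2 jobs, again in sequence order.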

I implemented my thread pool as shown below. If you know a better implementation, feel free to add it in the comments.

ThreadPoolService.java
package threadPool;

import java.util.List;

public interface ThreadPoolService {
 /**
  * Return total active threads of the pool.
  * 
  * @return
  */
 public int totalActiveThreads();

 /**
  * Return the maximum number of threads that can run at any point of time.
  * 
  * @return
  */
 public int maxThreads();

 /**
  * Submit a task to the pool.
  * 
  * @param task
  * @return false if the task is null, else true
  */
 public boolean submitTask(Task task);

 /**
  * Remove a task that has not started execution.
  * 
  * @param task
  * @return true, if the task is removed, else false
  */
 public boolean removeTask(Task task);

 /**
  * Remove all occurrences of the given task. Since the pool is backed by a
  * queue, the same task may have been submitted more than once.
  * 
  * @param task
  * @return true if at least one occurrence is removed, else false
  */
 public boolean removeAllTasks(Task task);

 /**
  * Return all the current running tasks
  * 
  * @return
  */
 public List<Task> currentRunningTasks();

 /**
  * Interrupt given task.
  * 
  * @param task
  * @return true if the thread is interrupted, false if the task is null (or)
  *         no thread is running the given task. Use this method with caution.
  */
 public boolean interruptTask(Task task);

 /**
  * Start all the threads.
  * 
  */
 public void execute();

 /**
  * Ask all the threads to stop. A thread stops only after it finishes a
  * task. Once the threads are stopped, the instance of ThreadPoolService
  * is of no use.
  * 
  * @return
  */
 public boolean stopThreads();

}

Task.java
package threadPool;

public interface Task extends Runnable {

 /**
  * Get the task name
  * 
  * @return
  */
 public String getTaskName();

 /**
  * 
  * @return true if the task is active, else false
  */
 public boolean isActive();

 /**
  * @return priority of the task
  */
 public int getPriority();

 /**
  * 
  * @return the sequence number of the task. If two tasks have the same
  *         priority, the task with the lower sequence number executes first.
  */
 public int getSequenceNumber();
}

CustomTask.java
package threadPool;

import java.util.concurrent.atomic.AtomicInteger;

public class CustomTask implements Task {
 private String name;
 // volatile: written by the worker thread in run(), read by other threads in isActive()
 private volatile boolean active;
 private int priority = 0;
 private int sequenceNumber = Integer.MAX_VALUE;
 private static final AtomicInteger seq = new AtomicInteger(0);
 
 public CustomTask(String name) {
  this.name = name;
  this.priority = 0;
  this.sequenceNumber = seq.getAndIncrement();
 }
 
 public CustomTask(String name, int priority) {
  this.name = name;
  this.priority = priority;
  this.sequenceNumber = seq.getAndIncrement();
 }

 @Override
 public String getTaskName() {
  return name;
 }

 @Override
 public boolean isActive() {
  return active;
 }

 @Override
 public void run() {
  active = true;
  System.out.println("Executing the task " + name + ", priority " + priority);
  active = false;
 }

 @Override
 public int getPriority() {
  return priority;
 }

 @Override
 public int getSequenceNumber() {
  return sequenceNumber;
 }

}


TaskComparator.java
package threadPool;

import java.util.Comparator;

public class TaskComparator implements Comparator<Task> {

 @Override
 public int compare(Task task1, Task task2) {
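  // Higher priority first. Integer.compare avoids the overflow that
  // subtraction can produce for extreme values.
  int res = Integer.compare(task2.getPriority(), task1.getPriority());

  if (res == 0) {
   // Same priority: the lower sequence number goes first.
   return Integer.compare(task1.getSequenceNumber(), task2.getSequenceNumber());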
  }

  return res;
 }

}
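
A note on comparators of this kind: computing the result by subtracting two `int` values can overflow when the values are far apart, which flips the sign and therefore the ordering. The sketch below contrasts a subtraction-based comparator with `Integer.compare` on plain integers. `CompareOverflowSketch`, `BY_SUBTRACTION` and `BY_COMPARE` are names I made up for the illustration.

```java
import java.util.Comparator;

public class CompareOverflowSketch {

    // Subtraction-based comparison: wrong whenever a - b overflows.
    static final Comparator<Integer> BY_SUBTRACTION = (a, b) -> a - b;

    // Integer.compare never overflows.
    static final Comparator<Integer> BY_COMPARE = Integer::compare;

    public static void main(String[] args) {
        int small = Integer.MIN_VALUE;
        int large = 1;

        // MIN_VALUE - 1 wraps around to MAX_VALUE, so the result is
        // positive and claims that small is greater than large.
        System.out.println(BY_SUBTRACTION.compare(small, large) > 0); // prints true

        // Integer.compare reports the correct order: small is less than large.
        System.out.println(BY_COMPARE.compare(small, large) < 0);     // prints true
    }
}
```

For small priorities and sequence numbers both approaches give the same order; the difference only shows up near the limits of `int`.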

ThreadPoolServiceImpl.java
package threadPool;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;

public class ThreadPoolServiceImpl implements ThreadPoolService {
 private final List<CustomThread> workerThreads;
 private final BlockingQueue<Task> queue;
 private final int MAX_THREADS;
 private final List<Task> currentRunningTasks = Collections.synchronizedList(new ArrayList<Task>());

 public ThreadPoolServiceImpl(BlockingQueue<Task> queue, int maxThreads) {
  this.queue = queue;
  MAX_THREADS = maxThreads;
  workerThreads = new ArrayList<>(MAX_THREADS);
  initWorkerThreads();
 }

 private void initWorkerThreads() {
  for (int i = 0; i < MAX_THREADS; i++) {
   CustomThread thread = new CustomThread(queue, currentRunningTasks);
   workerThreads.add(thread);
  }
 }

 @Override
 public int totalActiveThreads() {
  // A thread is active while its task is in the running-task list.
  return currentRunningTasks.size();
 }

 @Override
 public int maxThreads() {
  return MAX_THREADS;
 }

 @Override
 public boolean submitTask(Task task) {
  if (task == null) {
   return false;
  }

  return queue.add(task);
 }

 @Override
 public boolean removeTask(Task task) {
  return queue.remove(task);
 }

 @Override
 public boolean removeAllTasks(Task task) {
  boolean status = false;
  while (queue.remove(task)) {
   status = true;
  }

  return status;
 }

 @Override
 public List<Task> currentRunningTasks() {
  return new ArrayList<>(currentRunningTasks);
 }

 @Override
 public boolean interruptTask(Task task) {
  if (task == null) {
   return false;
  }
  Optional<CustomThread> threadOpt = getTheThreadThatExecutingTheTask(task);

  if (!threadOpt.isPresent()) {
   return false;
  }

  CustomThread thread = threadOpt.get();
  thread.interrupt();
  return true;
 }

 /**
  * Return the thread that is running given task, if no thread is running
  * given task, then it returns empty.
  * 
  * @param task
  * @return
  */
 private Optional<CustomThread> getTheThreadThatExecutingTheTask(Task task) {
  for (CustomThread thread : workerThreads) {
   Task currentRunningTask = thread.getCurrentRunningTask();
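   // currentRunningTask is null when the thread is not running a task,
   // so call equals on the non-null argument.
   if (task.equals(currentRunningTask)) {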
    return Optional.of(thread);
   }
  }
  return Optional.empty();
 }

 public class CustomThread extends Thread {
  private final BlockingQueue<Task> queue;
  // volatile: read by other threads through getCurrentRunningTask()
  private volatile Task currentRunningTask;
  private final List<Task> currentRunningTasks;
  private volatile boolean isActive = false;
  private volatile boolean stopThread = false;

  public CustomThread(BlockingQueue<Task> queue, List<Task> currentRunningTasks) {
   this.queue = queue;
   this.currentRunningTasks = currentRunningTasks;
  }

  @Override
  public void run() {
   while (true) {
    /* Get the task from queue & execute */
    try {
     Task task = queue.take(); // blocks until a task is available
     currentRunningTask = task;
     isActive = true;
     currentRunningTasks.add(task);
     try {
      task.run();
     } finally {
      // Clean up even if the task throws
      currentRunningTasks.remove(task);
      currentRunningTask = null;
      isActive = false;
     }
    } catch (InterruptedException e) {
     // Interrupted while waiting on the queue
     e.printStackTrace();
    } catch (RuntimeException e) {
     // A failing task must not kill the worker thread
     e.printStackTrace();
    }

    if (stopThread) {
     System.out.println("Stopping the threads");
     break;
    }
   }
  }

  public Task getCurrentRunningTask() {
   return currentRunningTask;
  }

  public boolean isActive() {
   return isActive;
  }

  public synchronized void setStopThread(boolean stopThread) {
   this.stopThread = stopThread;
  }

 }

 @Override
 public void execute() {
  for (CustomThread thread : workerThreads) {
   System.out.println("Started the thread " + thread.getName());
   thread.start();
  }
 }

 @Override
 public boolean stopThreads() {
  for (CustomThread thread : workerThreads) {
   System.out.println("Stopping threads");
   thread.setStopThread(true);
  }
  return true;
 }

}
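
One thing worth knowing about this design: a worker that is waiting in `queue.take()` does not look at the stop flag until it receives another task. A possible alternative, sketched here with a hypothetical `Worker` class and a plain `LinkedBlockingQueue<Runnable>` rather than the classes above, is to interrupt the worker. A thread blocked in `BlockingQueue.take()` wakes up with an `InterruptedException`, which the worker can treat as the signal to exit.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class InterruptStopSketch {

    /** Hypothetical worker: runs queued tasks until it is interrupted. */
    static final class Worker extends Thread {
        private final BlockingQueue<Runnable> queue;

        Worker(BlockingQueue<Runnable> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    queue.take().run(); // blocks while the queue is empty
                }
            } catch (InterruptedException e) {
                // take() was interrupted while waiting: leave the loop and exit
            }
        }
    }

    /** Starts a worker on an empty queue, interrupts it, and reports whether it exited. */
    public static boolean stopsWhenInterrupted() {
        Worker worker = new Worker(new LinkedBlockingQueue<>());
        worker.start();
        try {
            Thread.sleep(100);  // give the worker time to block in take()
            worker.interrupt(); // take() throws InterruptedException
            worker.join(5000);  // wait up to 5 seconds for the worker to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("Worker stopped: " + stopsWhenInterrupted());
    }
}
```

In a pool that also uses interrupts to cancel individual tasks, the worker would still need a stop flag to tell the two cases apart, so treat this only as a sketch of the mechanism.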

Application.java
package threadPool;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;

public class Application {
 public static void main(String args[]) throws InterruptedException {
  TaskComparator taskComparator = new TaskComparator();
  BlockingQueue<Task> queue = new PriorityBlockingQueue<>(10, taskComparator);

  ThreadPoolService threadPool = new ThreadPoolServiceImpl(queue, 5);

  Task task1 = new CustomTask("task1", 2);
  Task task2 = new CustomTask("task2", 1);
  Task task3 = new CustomTask("task3", 3);
  Task task4 = new CustomTask("task4", 2);
  Task task5 = new CustomTask("task5", 5);
  Task task6 = new CustomTask("task6", 3);
  Task task7 = new CustomTask("task7", 7);
  Task task8 = new CustomTask("task8", 4);
  Task task9 = new CustomTask("task9", 9);
  Task task10 = new CustomTask("task10", 5);

  queue.add(task1);
  queue.add(task2);
  queue.add(task3);
  queue.add(task4);
  queue.add(task5);
  queue.add(task6);
  queue.add(task7);
  queue.add(task8);
  queue.add(task9);
  queue.add(task10);

  threadPool.execute();

  Thread.sleep(3000);

  threadPool.stopThreads();

  /*
   * stopThreads() only sets a flag, and a worker checks that flag after
   * it finishes a task. A worker that is blocked in queue.take() never
   * sees the flag, so add one more task per worker to wake it up.
   */
  queue.add(task6);
  queue.add(task7);
  queue.add(task8);
  queue.add(task9);
  queue.add(task10);

 }
}

Output
Started the thread Thread-0
Started the thread Thread-1
Started the thread Thread-2
Executing the task task9, priority 9
Started the thread Thread-3
Executing the task task7, priority 7
Executing the task task5, priority 5
Executing the task task10, priority 5
Executing the task task8, priority 4
Executing the task task3, priority 3
Executing the task task6, priority 3
Executing the task task1, priority 2
Executing the task task4, priority 2
Executing the task task2, priority 1
Started the thread Thread-4
Stopping threads
Stopping threads
Stopping threads
Stopping threads
Stopping threads
Executing the task task9, priority 9
Executing the task task10, priority 5
Executing the task task7, priority 7
Stopping the threads
Executing the task task8, priority 4
Stopping the threads
Executing the task task6, priority 3
Stopping the threads
Stopping the threads
Stopping the threads

