A thread pool is a predefined pool of
threads, which always exists to perform a task. Instantiating a new thread for
every new task is very costly. By using thread pool, we can define fixed set of
threads, where each thread is assigned with a task, and once thread finishes
its execution, it waits in the pool. If any new task submits to the thread
pool, pool assign the task to any idle thread (that is waiting in the pool), if
all the threads are busy in execution, then pool insert the new task to a
queue. Whenever pool finds a free thread, it removes a task from pool queue and
assign it to the thread.
One
of my friend got this interview question. You need to implement a thread pool,
which satisfies following requirements.
a.
You
can define a pool of fixed threads
b.
You
can submit a task to pool at an point of time
c.
You
should get the number of active threads at any point of time
d.
You
should remove a task from thread pool, if it is not started execution.
e.
You
should get list of all current running tasks.
f.
Interrupt
a task
g.
Stop
all threads
h.
Task
should return its execution status
i.
Tasks
must execute in the order of their priority
j.
If
any two tasks has same priority, then task with less sequence number should execute
first
I
implemented my thread pool like below. If you find any better implementation,
feel free to add it in comments.
ThreadPoolService.java
package threadPool; import java.util.List; public interface ThreadPoolService { /** * Return total active threads of the pool. * * @return */ public int totalActiveThreads(); /** * Return the maximum number of threads that can run at any point of time. * * @return */ public int maxThreads(); /** * * @param task * @return false if the task is null, else true */ public boolean submitTask(Task task); /** * * @return true, if the task is removed, else false */ public boolean removeTask(Task task); /** * Remove all the tasks. Since this is queue implementation, there is a * possibility that same task existance. * * @param task * @return */ public boolean removeAllTasks(Task task); /** * Return all the current running tasks * * @return */ public List<Task> currentRunningTasks(); /** * Interrupt given task. * * @param task * @return true if the thread is interrupted, false if the task is null (or) * no thread is running given task. use this method in caution. */ public boolean interruptTask(Task task); /** * Start all the threads. * */ public void execute(); /** * It stops all the threads execution, Once the threads are stopped, the * instance of ThreadPoolService is of no use. * * @return */ public boolean stopThreads(); }
Task.java
package threadPool; public interface Task extends Runnable { /** * Get the task name * * @return */ public String getTaskName(); /** * * @return true if the task is in active, else false */ public boolean isActive(); /** * @return priority of the task */ public int getPriority(); /** * * @return the sequence number of the task. If two tasks has same priority, * then the task with less sequence number executes first. */ public int getSequenceNumber(); }
CustomTask.java
package threadPool; import java.util.concurrent.atomic.AtomicInteger; public class CustomTask implements Task { private String name; private boolean active; private int priority = 0; private int sequenceNumber = Integer.MAX_VALUE; private static final AtomicInteger seq = new AtomicInteger(0); public CustomTask(String name) { this.name = name; this.priority = 0; this.sequenceNumber = seq.getAndIncrement(); } public CustomTask(String name, int priority) { this.name = name; this.priority = priority; this.sequenceNumber = seq.getAndIncrement(); } @Override public String getTaskName() { return name; } @Override public boolean isActive() { return active; } @Override public void run() { active = true; System.out.println("Executing the task " + name + ", priority " + priority); active = false; } @Override public int getPriority() { return priority; } @Override public int getSequenceNumber() { return sequenceNumber; } }
TaskComparator.java
package threadPool; import java.util.Comparator; public class TaskComparator implements Comparator<Task> { @Override public int compare(Task task1, Task task2) { int res = task2.getPriority() - task1.getPriority(); if (res == 0) { return (task1.getSequenceNumber() - task2.getSequenceNumber()); } return res; } }
ThreadPoolServiceImpl.java
package threadPool; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.concurrent.BlockingQueue; public class ThreadPoolServiceImpl implements ThreadPoolService { private final List<CustomThread> workerThreads; private final BlockingQueue<Task> queue; private int activeThreads = 0; private final int MAX_THREADS; private List<Task> currentRunningTasks = Collections.synchronizedList(new ArrayList<Task>()); public ThreadPoolServiceImpl(BlockingQueue<Task> queue, int maxThreads) { this.queue = queue; MAX_THREADS = maxThreads; workerThreads = new ArrayList<>(MAX_THREADS); initWorkerThreads(); } private void initWorkerThreads() { for (int i = 0; i < MAX_THREADS; i++) { CustomThread thread = new CustomThread(queue, currentRunningTasks); workerThreads.add(thread); } } @Override public int totalActiveThreads() { return activeThreads; } @Override public int maxThreads() { return MAX_THREADS; } @Override public boolean submitTask(Task task) { if (task == null) { return false; } return queue.add(task); } @Override public boolean removeTask(Task task) { return queue.remove(task); } @Override public boolean removeAllTasks(Task task) { boolean status = false; while (queue.remove(task)) { status = true; } return status; } @Override public List<Task> currentRunningTasks() { return new ArrayList<>(currentRunningTasks); } @Override public boolean interruptTask(Task task) { if (task == null) { return false; } Optional<CustomThread> threadOpt = getTheThreadThatExecutingTheTask(task); if (!threadOpt.isPresent()) { return false; } CustomThread thread = threadOpt.get(); thread.interrupt(); return true; } /** * Return the thread that is running given task, if no thread is running * given task, then it returns empty. * * @param task * @return */ private Optional<CustomThread> getTheThreadThatExecutingTheTask(Task task) { for (CustomThread thread : workerThreads) { Task currentRunningTask = thread.getCurrentRunningTask(); if (currentRunningTask.equals(task)) { return Optional.of(thread); } } return Optional.empty(); } public class CustomThread extends Thread { private BlockingQueue<Task> queue; private Task currentRunningTask; private List<Task> currentRunningTasks; private boolean isActive = false; private volatile boolean stopThread = false; public CustomThread(BlockingQueue<Task> queue, List<Task> currentRunningTasks) { this.queue = queue; this.currentRunningTasks = currentRunningTasks; } @Override public void run() { while (true) { /* Get the task from queue & execute */ try { isActive = true; currentRunningTask = queue.take(); currentRunningTasks.add(currentRunningTask); currentRunningTask.run(); currentRunningTasks.remove(currentRunningTask); } catch (InterruptedException e) { e.printStackTrace(); } finally { isActive = false; } if (stopThread) { System.out.println("Stopping the threads"); break; } } } public Task getCurrentRunningTask() { return currentRunningTask; } public boolean isActive() { return isActive; } public synchronized void setStopThread(boolean stopThread) { this.stopThread = stopThread; } } @Override public void execute() { for (CustomThread thread : workerThreads) { System.out.println("Started the thread " + thread.getName()); thread.start(); } } @Override public boolean stopThreads() { for (CustomThread thread : workerThreads) { System.out.println("Stopping threads"); thread.setStopThread(true); } return true; } }
Application.java
package threadPool; import java.util.concurrent.BlockingQueue; import java.util.concurrent.PriorityBlockingQueue; public class Application { public static void main(String args[]) throws InterruptedException { TaskComparator taskComparator = new TaskComparator(); BlockingQueue<Task> queue = new PriorityBlockingQueue<>(10, taskComparator); ThreadPoolService threadPool = new ThreadPoolServiceImpl(queue, 5); Task task1 = new CustomTask("task1", 2); Task task2 = new CustomTask("task2", 1); Task task3 = new CustomTask("task3", 3); Task task4 = new CustomTask("task4", 2); Task task5 = new CustomTask("task5", 5); Task task6 = new CustomTask("task6", 3); Task task7 = new CustomTask("task7", 7); Task task8 = new CustomTask("task8", 4); Task task9 = new CustomTask("task9", 9); Task task10 = new CustomTask("task10", 5); queue.add(task1); queue.add(task2); queue.add(task3); queue.add(task4); queue.add(task5); queue.add(task6); queue.add(task7); queue.add(task8); queue.add(task9); queue.add(task10); threadPool.execute(); Thread.sleep(3000); threadPool.stopThreads(); /* * Since I am using priority blocking queue implementation, I need to * make sure the threads are not blocked on queue. If thread goes to * blocking state, we can't interrupt. */ queue.add(task6); queue.add(task7); queue.add(task8); queue.add(task9); queue.add(task10); } }
Output
Started the thread Thread-0 Started the thread Thread-1 Started the thread Thread-2 Executing the task task9, priority 9 Started the thread Thread-3 Executing the task task7, priority 7 Executing the task task5, priority 5 Executing the task task10, priority 5 Executing the task task8, priority 4 Executing the task task3, priority 3 Executing the task task6, priority 3 Executing the task task1, priority 2 Executing the task task4, priority 2 Executing the task task2, priority 1 Started the thread Thread-4 Stopping threads Stopping threads Stopping threads Stopping threads Stopping threads Executing the task task9, priority 9 Executing the task task10, priority 5 Executing the task task7, priority 7 Stopping the threads Executing the task task8, priority 4 Stopping the threads Executing the task task6, priority 3 Stopping the threads Stopping the threads Stopping the threads
No comments:
Post a Comment