JoinableQueue
is a sub class of multiprocessing.Queue, provides following additional methods.
Method
|
Description
|
task_done()
|
You must
call this method, whenever a task in queue is finished. If a join() is
currently blocking, it will resume when all items have been processed.
Whenever a
new task is added to queue, then the count of unfinished tasks is incremented
by one. Whenever you call task_done() method, the count of unfinished tasks
is decremented by one. When the count of unfinished tasks reaches to 0, then
join method unblocks.
|
join()
|
Block until
all items in the queue have been gotten and processed.
|
from multiprocessing import Process, JoinableQueue import time import random class MyConsumer(Process): def __init__(self, task_queue): Process.__init__(self) self.task_queue = task_queue def run(self): taskName=self.task_queue.get() print("Started processing task ", taskName) timeToFinish=random.randint(1, 5) print(taskName, " takes ", timeToFinish, " seconds") time.sleep(timeToFinish) # Call task_done() method after task finishes self.task_queue.task_done() print(taskName, " finished") def add_to_queue(queue, data): queue.put(data) if __name__ == '__main__': queue = JoinableQueue() queue.put("Task 1") queue.put("Task 2") queue.put("Task 3") queue.put("Task 4") consumer1 = MyConsumer(queue) consumer2 = MyConsumer(queue) consumer3 = MyConsumer(queue) consumer4 = MyConsumer(queue) consumer1.start() consumer2.start() consumer3.start() consumer4.start() queue.join() print("SUCCESSFULL")
Output
Started processing task Task 1 Task 1 takes 2 seconds Started processing task Task 2 Task 2 takes 1 seconds Started processing task Task 3 Task 3 takes 5 seconds Started processing task Task 4 Task 4 takes 1 seconds Task 2 finished Task 4 finished Task 1 finished Task 3 finished SUCCESSFUL
No comments:
Post a Comment