The 'multiprocessing.Pool' class is used to create a pool of worker processes. We can submit jobs to this pool and the workers execute them in parallel.
from multiprocessing import Pool
from time import sleep
import random

def sum(task, a, b):
    # simulate a task that takes between 1 and 4 seconds
    sleepTime = random.randint(1, 4)
    print(task, " requires ", sleepTime, " seconds to finish")
    sleep(sleepTime)
    return a+b

if __name__ == "__main__":
    myPool = Pool(5)
    # submit five tasks; apply_async returns immediately with an AsyncResult
    result1 = myPool.apply_async(sum, args=("task1", 10, 20,))
    result2 = myPool.apply_async(sum, args=("task2", 20, 30,))
    result3 = myPool.apply_async(sum, args=("task3", 30, 40,))
    result4 = myPool.apply_async(sum, args=("task4", 40, 50,))
    result5 = myPool.apply_async(sum, args=("task5", 50, 60,))
    print("Submitted tasks to pool")
    # get() blocks until the corresponding result is ready
    print(result1.get())
    print(result2.get())
    print(result3.get())
    print(result4.get())
    print(result5.get())
Sample Output

Submitted tasks to pool
task1 requires 1 seconds to finish
task2 requires 1 seconds to finish
task3 requires 1 seconds to finish
task4 requires 4 seconds to finish
task5 requires 3 seconds to finish
30
50
70
90
110
myPool = Pool(5)

Creates a pool of 5 worker processes.
result1 = myPool.apply_async(sum, args=("task1", 10, 20,))

Calls the function sum with the given arguments asynchronously and returns an AsyncResult object.
result1.get()

Returns the result when it arrives, blocking until the result is ready. You can specify a maximum timeout to wait: if timeout is not None and the result does not arrive within timeout seconds, multiprocessing.TimeoutError is raised.
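For example, here is a minimal sketch of waiting with a timeout; the slow_task helper is made up for illustration:

from multiprocessing import Pool, TimeoutError
from time import sleep

def slow_task(x):
    sleep(10)  # deliberately slower than the timeout below
    return x * 2

if __name__ == "__main__":
    with Pool(2) as pool:
        result = pool.apply_async(slow_task, args=(5,))
        try:
            print(result.get(timeout=3))  # wait at most 3 seconds
        except TimeoutError:
            print("Result did not arrive within 3 seconds")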
multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
Argument | Description
processes | Number of processes in the pool. If processes is None, the number returned by os.cpu_count() is used.
initializer | If initializer is not None, each worker process will call initializer(*initargs) when it starts.
maxtasksperchild | The number of tasks a worker process completes before it exits and is replaced by a fresh worker process. By default, worker processes live as long as the pool.
context | The context used for starting the worker processes.
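As a minimal sketch (the square helper and the chosen numbers are made up for illustration), maxtasksperchild can be used to recycle workers, which helps when tasks leak memory:

import os
from multiprocessing import Pool

def square(x):
    # return the worker's pid so the replacement of workers is visible
    return (os.getpid(), x * x)

if __name__ == "__main__":
    # each worker exits after completing 2 tasks and is replaced by a fresh one
    with Pool(processes=2, maxtasksperchild=2) as pool:
        for pid, value in pool.map(square, range(10), chunksize=1):
            print(pid, value)

The following example demonstrates the initializer and initargs arguments: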
from multiprocessing import Pool
from time import sleep
import random

def initPool(num):
    # runs once in each worker process as it starts
    print("Initializing pool with ", num, " processes")

def sum(task, a, b):
    sleepTime = random.randint(1, 4)
    print(task, " requires ", sleepTime, " seconds to finish")
    sleep(sleepTime)
    return a+b

if __name__ == "__main__":
    myPool = Pool(5, initializer=initPool, initargs=(5,))
    result1 = myPool.apply_async(sum, args=("task1", 10, 20,))
    result2 = myPool.apply_async(sum, args=("task2", 20, 30,))
    result3 = myPool.apply_async(sum, args=("task3", 30, 40,))
    result4 = myPool.apply_async(sum, args=("task4", 40, 50,))
    result5 = myPool.apply_async(sum, args=("task5", 50, 60,))
    print("Submitted tasks to pool")
    print(result1.get())
    print(result2.get())
    print(result3.get())
    print(result4.get())
    print(result5.get())
Output

Initializing pool with 5 processes
Initializing pool with 5 processes
Initializing pool with 5 processes
Initializing pool with 5 processes
Submitted tasks to pool
task2 requires 4 seconds to finish
task1 requires 2 seconds to finish
task3 requires 4 seconds to finish
Initializing pool with 5 processes
task4 requires 3 seconds to finish
task5 requires 3 seconds to finish
30
50
70
90
110
Observe the output: the initPool function is called once in each worker process as it starts.
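Beyond logging, a typical use of initializer, sketched below with hypothetical init_worker and query helpers, is setting up per-process state (such as a database connection) that cannot be shared across processes:

from multiprocessing import Pool

_conn = None  # per-process global, filled in by the initializer

def init_worker(db_name):
    global _conn
    _conn = "connection-to-" + db_name  # stand-in for a real connection object

def query(sql):
    # every worker sees the _conn created in its own process
    return (_conn, sql)

if __name__ == "__main__":
    with Pool(3, initializer=init_worker, initargs=("mydb",)) as pool:
        print(pool.map(query, ["select 1", "select 2"]))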
Following are the methods provided by the Pool class.
Method | Description
apply(func[, args[, kwds]]) | Calls a function with the given arguments and keyword arguments. It blocks until the result is ready.
apply_async(func[, args[, kwds[, callback[, error_callback]]]]) | Like apply, but it does not block, so it is better suited for performing work in parallel. apply_async also accepts a callback which, if supplied, is called when the function completes; this can be used instead of calling get(). The order of the results is not guaranteed to match the order of the calls to apply_async. You can also specify an error_callback, which is called when the function fails with an error. Both callback and error_callback accept a single argument (see the sketch after this table).
map(func, iterable[, chunksize]) | The parallel equivalent of the built-in map function. Applies the function to every item of the iterable and returns the results. The chunksize parameter causes the iterable to be split into pieces of approximately that size, and each piece is submitted as a separate task.
imap(func, iterable[, chunksize]) | A lazier version of map. It iterates over the iterable one element at a time and sends each to a worker process, so there are no memory overhead problems. One major advantage of imap is that you can start receiving results from processes as soon as they are ready, rather than having to wait for all of them to finish.
imap_unordered(func, iterable[, chunksize]) | Same as imap, but the results are yielded in whatever order they finish rather than in the order of the input.
starmap(func, iterable[, chunksize]) | Just like map, but lets us pass multiple arguments to the function. Hence an iterable of [(1, 2), (3, 4)] results in [func(1, 2), func(3, 4)].
starmap_async(func, iterable[, chunksize[, callback[, error_callback]]]) | A combination of starmap() and map_async().
close() | Closes the pool. After calling this method, the pool won't accept any new tasks. Once all the submitted tasks have completed, the worker processes exit.
terminate() | Stops the worker processes immediately, without completing outstanding work.
join() | Waits for the worker processes to exit. One must call close() or terminate() before using join().
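Below is a minimal sketch tying several of these methods together; the cube and add helpers and the printed values are made up for illustration:

from multiprocessing import Pool

def cube(x):
    return x * x * x

def add(a, b):
    return a + b

def on_done(value):
    # callback runs in the parent process when the task completes
    print("callback got", value)

def on_error(exc):
    print("task failed:", exc)

if __name__ == "__main__":
    pool = Pool(4)

    # apply_async with callback/error_callback instead of calling get()
    pool.apply_async(cube, args=(3,), callback=on_done, error_callback=on_error)

    # map blocks and preserves input order
    print(pool.map(cube, [1, 2, 3, 4]))          # [1, 8, 27, 64]

    # imap_unordered yields results as they finish, in any order
    for value in pool.imap_unordered(cube, range(5)):
        print(value)

    # starmap unpacks each tuple into multiple arguments
    print(pool.starmap(add, [(1, 2), (3, 4)]))   # [3, 7]

    pool.close()   # no new tasks accepted
    pool.join()    # wait for workers to exit (requires close/terminate first)

Note that using the pool as a context manager (with Pool(4) as pool:) calls terminate() on exit, so the explicit close()/join() pattern above is the one to use when all submitted work must be allowed to finish.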