Sunday 20 December 2015

Python: multiprocessing: pool: Using pool of workers


The 'multiprocessing.Pool' class is used to create a pool of worker processes, to which we can submit jobs.
from multiprocessing import Pool
from time import sleep
import random


def sum(task, a, b):
    # simulate a task by sleeping for a random amount of time
    sleepTime = random.randint(1, 4)
    print(task, " requires ", sleepTime, " seconds to finish")
    sleep(sleepTime)
    return a+b

if __name__=="__main__":
    myPool = Pool(5)

    result1 = myPool.apply_async(sum, args=("task1", 10, 20,))
    result2 = myPool.apply_async(sum, args=("task2", 20, 30,))
    result3 = myPool.apply_async(sum, args=("task3", 30, 40,))
    result4 = myPool.apply_async(sum, args=("task4", 40, 50,))
    result5 = myPool.apply_async(sum, args=("task5", 50, 60,))

    print("Submitted tasks to pool")
    print(result1.get())
    print(result2.get())
    print(result3.get())
    print(result4.get())
    print(result5.get())


Sample Output
Submitted tasks to pool
task1  requires  1  seconds to finish
task2  requires  1  seconds to finish
task3  requires  1  seconds to finish
task4  requires  4  seconds to finish
task5  requires  3  seconds to finish
30
50
70
90
110

myPool = Pool(5)
Creates a pool of 5 processes.

result1 = myPool.apply_async(sum, args=("task1", 10, 20,))
Submits the sum function with the given arguments to the pool and immediately returns an AsyncResult object, without blocking.

result1.get()
Returns the result when it arrives, blocking until it is ready. You can also specify a maximum time to wait: if timeout is not None and the result does not arrive within timeout seconds, multiprocessing.TimeoutError is raised.
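As a minimal sketch of the timeout behaviour (the slowTask function, the pool size, and the 2-second timeout here are made up for illustration):

from multiprocessing import Pool, TimeoutError
from time import sleep

def slowTask(a, b):
    sleep(5)  # deliberately longer than the timeout used below
    return a + b

if __name__ == "__main__":
    myPool = Pool(1)
    result = myPool.apply_async(slowTask, args=(10, 20))
    try:
        print(result.get(timeout=2))  # blocks for at most 2 seconds
    except TimeoutError:
        print("Result did not arrive within 2 seconds")
    myPool.close()
    myPool.join()

Note that the task itself keeps running in the worker; the timeout only limits how long get() waits.
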
multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

processes
The number of processes in the pool. If processes is None, the number returned by os.cpu_count() is used.

initializer
If initializer is not None, each worker process will call initializer(*initargs) when it starts.

initargs
The arguments passed to initializer.

maxtasksperchild
The number of tasks a worker process can complete before it exits and is replaced by a fresh worker process. By default, worker processes live as long as the pool.

context
The context used for starting the worker processes (see the sketch below).
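
As a rough sketch of maxtasksperchild and context together (the work function and the numbers are made up for illustration; get_context is available from Python 3.4 onwards):

import multiprocessing
import os

def work(x):
    # return the worker's pid so worker replacement is visible in the output
    return (os.getpid(), x * x)

if __name__ == "__main__":
    # create the pool through a context object; "spawn" starts fresh interpreters
    ctx = multiprocessing.get_context("spawn")
    # each worker exits after completing 2 tasks and is replaced by a new one
    myPool = ctx.Pool(processes=2, maxtasksperchild=2)
    for pid, square in myPool.map(work, range(8)):
        print(pid, square)
    myPool.close()
    myPool.join()

Because maxtasksperchild=2, the pids in the output change as workers are retired and replaced. The next example demonstrates the initializer and initargs arguments: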


from multiprocessing import Pool
from time import sleep
import random

def initPool(num):
    print("Initializing pool with ", num,  " processes")

def sum(task, a, b):
    sleepTime = random.randint(1, 4)
    print(task, " requires ", sleepTime, " seconds to finish")
    sleep(sleepTime)
    return a+b

if __name__=="__main__":
    myPool = Pool(5, initializer=initPool, initargs=(5, ))

    result1 = myPool.apply_async(sum, args=("task1", 10, 20,))
    result2 = myPool.apply_async(sum, args=("task2", 20, 30,))
    result3 = myPool.apply_async(sum, args=("task3", 30, 40,))
    result4 = myPool.apply_async(sum, args=("task4", 40, 50,))
    result5 = myPool.apply_async(sum, args=("task5", 50, 60,))

    print("Submitted tasks to pool")
    print(result1.get())
    print(result2.get())
    print(result3.get())
    print(result4.get())
    print(result5.get())


Output
Initializing pool with  5  processes
Initializing pool with  5  processes
Initializing pool with  5  processes
Initializing pool with  5  processes
Submitted tasks to pool
task2  requires  4  seconds to finish
task1  requires  2  seconds to finish
task3  requires  4  seconds to finish
Initializing pool with  5  processes
task4  requires  3  seconds to finish
task5  requires  3  seconds to finish
30
50
70
90
110

Observe the output: initPool is called once as each worker process starts (five times in total for a pool of five processes).

The following methods are provided by the Pool class; several of them are demonstrated in the sketch after this list.

apply(func[, args[, kwds]])
Calls func with the given arguments and keyword arguments. It blocks until the result is ready.

apply_async(func[, args[, kwds[, callback[, error_callback]]]])
Like apply(), but it does not block, which makes it better suited for performing work in parallel. If callback is supplied, it is called when the function completes; this can be used instead of calling get(). The order of the results is not guaranteed to match the order of the calls to apply_async(). You can also specify error_callback, which is called when the function fails with an error. Both callback and error_callback accept a single argument (see the sketch after this list).

map(func, iterable[, chunksize])
The parallel equivalent of the built-in map() function. Applies func to every item of iterable and returns the results. The chunksize parameter causes the iterable to be split into pieces of approximately that size, and each piece is submitted as a separate task.

imap(func, iterable[, chunksize])
A lazier version of map(). It iterates over the iterable one element at a time and sends each item to a worker process, so the whole input never has to be held in memory at once. A major advantage of imap() is that you can start receiving results as soon as they are ready, rather than having to wait for all of them to finish.

imap_unordered(func, iterable[, chunksize])
Same as imap(), but the results are yielded in whatever order they finish rather than the order of the input.

starmap(func, iterable[, chunksize])
Like map(), except that each element of the iterable is unpacked as the arguments of func. Hence an iterable of [(1, 2), (3, 4)] results in [func(1, 2), func(3, 4)].

starmap_async(func, iterable[, chunksize[, callback[, error_callback]]])
A combination of starmap() and map_async().

close()
Closes the pool. After calling this method, the pool won't accept any new tasks. Once all the submitted tasks have been completed, the worker processes exit.

terminate()
Stops the worker processes immediately, without completing outstanding work.

join()
Waits for the worker processes to exit. You must call close() or terminate() before using join().
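
To make these methods concrete, here is a small sketch (the square, add, and fail functions are made up for illustration) exercising apply_async with callback and error_callback, map, imap_unordered, and starmap:

from multiprocessing import Pool

def square(x):
    return x * x

def add(a, b):
    return a + b

def fail(x):
    raise ValueError("deliberate failure in the worker")

def onResult(value):
    # callback receives the single result value
    print("callback got", value)

def onError(exc):
    # error_callback receives the exception raised in the worker
    print("error_callback got", exc)

if __name__ == "__main__":
    myPool = Pool(3)

    # callbacks fire when the results are ready; no need to call get()
    myPool.apply_async(square, args=(6,), callback=onResult)
    myPool.apply_async(fail, args=(1,), error_callback=onError)

    # map blocks until all results are ready and preserves input order
    print(myPool.map(square, [1, 2, 3, 4]))

    # imap_unordered yields each result as soon as it is ready, in any order
    for value in myPool.imap_unordered(square, [5, 6, 7]):
        print(value)

    # starmap unpacks each tuple as the arguments of add
    print(myPool.starmap(add, [(1, 2), (3, 4)]))

    myPool.close()   # no new tasks accepted after this point
    myPool.join()    # wait for the workers to exit

For example, the starmap call above returns [3, 7], since each tuple is unpacked into the two arguments of add.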



