Sunday 20 December 2015

Python: multiprocessing: pool: map example

map(func, iterable[, chunksize])

It is parallel equivalent of map function. Apply the function to every item of iterable and return the result. The chunksize parameter will cause the iterable to be split into pieces of approximately that size, and each piece is submitted as a separate task.
from multiprocessing import Pool
from time import sleep


def process(task):
    print("Started ", task)
    sleep(2)
    return task+" Finished"

if __name__=="__main__":
    myPool = Pool(5)

    tasks=[]

    for i in range(20):
        tasks.append("task"+str(i))

    print("Submitted tasks to pool")
    results = myPool.map(process, tasks, 4)
    print("Got the results")

    for result in results:
        print(result)


Output
Submitted tasks to pool
Started  task0
Started  task4
Started  task8
Started  task12
Started  task16
Started  task1
Started  task5
Started  task9
Started  task17
Started  task13
Started  task2
Started  task10
Started  task6
Started  task14
Started  task18
Started  task11
Started  task3
Started  task7
Started  task15
Started  task19
Got the results
task0 Finished
task1 Finished
task2 Finished
task3 Finished
task4 Finished
task5 Finished
task6 Finished
task7 Finished
task8 Finished
task9 Finished
task10 Finished
task11 Finished
task12 Finished
task13 Finished
task14 Finished
task15 Finished
task16 Finished
task17 Finished
task18 Finished
task19 Finished


I submitted 20 tasks from 0 to 19 and chunk size is 4, number of process are 5.

process1 execute tasks 0, 1, 2, 3
process2 execute tasks 4, 5, 6, 7
process3 execute tasks 8, 9, 10, 11
process4 execute tasks 12, 13, 14, 15
process5 execute tasks 16, 17, 18, 19

You can easily get what happened while observing the output.

imap(func, iterable[, chunksize])
It is lazier version of map. It will iterate over the iterable one element at a time, and send them each to a worker process. That means no memory overhead problems.

One major advantage of imap is, you can start receiving results from processes as soon as they're ready, rather than having to wait for all of them to be finished.
from multiprocessing import Pool, Process
from time import sleep
import os


def process(task):
    print("Started task ", task, " PID :", os.getpid())
    sleep(task)
    return str(task)+" Finished"

if __name__=="__main__":
    myPool = Pool(5)

    tasks=[]

    for i in range(20):
        tasks.append(i)

    print("Submitted tasks to pool")
    results = myPool.imap(process, tasks)
    print("Got the results")

    for result in results:
        print(result)


Output
Submitted tasks to pool
Got the results
Started task  0  PID : 91600
Started task  1  PID : 91601
Started task  2  PID : 91602
0 Finished
Started task  5  PID : 91600
Started task  3  PID : 91603
Started task  4  PID : 91604
Started task  6  PID : 91601
1 Finished
Started task  7  PID : 91602
2 Finished
Started task  8  PID : 91603
3 Finished
Started task  9  PID : 91604
4 Finished
Started task  10  PID : 91600
5 Finished
Started task  11  PID : 91601
6 Finished
Started task  12  PID : 91602
7 Finished
Started task  13  PID : 91603
8 Finished
Started task  14  PID : 91604
9 Finished
Started task  15  PID : 91600
10 Finished
Started task  16  PID : 91601
11 Finished
Started task  17  PID : 91602
12 Finished
Started task  18  PID : 91603
13 Finished
Started task  19  PID : 91604
14 Finished
15 Finished
16 Finished
17 Finished
18 Finished
19 Finished


As you observe the output, the results will be returned as soon as they're ready, while still preserving the ordering of the input iterable. With imap_unordered, results will be yielded as soon as they're ready, regardless of the order of the input iterable.

For example
from multiprocessing import Pool, Process
from time import sleep
import os


def process(task):
    print("Started task ", task, " PID :", os.getpid())
    sleep(4)
    return str(task)+" Finished"

if __name__=="__main__":
    myPool = Pool(5)

    tasks=[]

    for i in range(20):
        tasks.append(i)

    print("Submitted tasks to pool")
    results = myPool.imap(process, tasks)
    print("Got the results")

    for result in results:
        print(result)


Output
Submitted tasks to pool
Got the results
Started task  0  PID : 91693
Started task  1  PID : 91694
Started task  2  PID : 91695
Started task  3  PID : 91696
Started task  4  PID : 91697
Started task  5  PID : 91693
Started task  6  PID : 91695
0 Finished
Started task  7  PID : 91697
Started task  8  PID : 91696
Started task  9  PID : 91694
1 Finished
2 Finished
3 Finished
4 Finished
Started task  10  PID : 91694
Started task  11  PID : 91695
Started task  12  PID : 91693
5 Finished
Started task  13  PID : 91697
6 Finished
7 Finished
Started task  14  PID : 91696
8 Finished
9 Finished
Started task  15  PID : 91696
Started task  16  PID : 91694
Started task  17  PID : 91695
Started task  18  PID : 91693
10 Finished
11 Finished
Started task  19  PID : 91697
12 Finished
13 Finished
14 Finished
15 Finished
16 Finished
17 Finished
18 Finished
19 Finished


Observe the output, results appeared in the order they submitted.

Update the program with imap_unordered.
from multiprocessing import Pool, Process
from time import sleep
import os


def process(task):
    print("Started task ", task, " PID :", os.getpid())
    sleep(4)
    return str(task)+" Finished"

if __name__=="__main__":
    myPool = Pool(5)

    tasks=[]

    for i in range(20):
        tasks.append(i)

    print("Submitted tasks to pool")
    results = myPool.imap_unordered(process, tasks)
    print("Got the results")

    for result in results:
        print(result)


Output
Submitted tasks to pool
Got the results
Started task  0  PID : 91749
Started task  1  PID : 91750
Started task  2  PID : 91751
Started task  3  PID : 91752
Started task  4  PID : 91753
Started task  5  PID : 91749
Started task  6  PID : 91750
0 Finished
1 Finished
Started task  7  PID : 91751
2 Finished
Started task  8  PID : 91753
4 Finished
3 Finished
Started task  9  PID : 91752
Started task  10  PID : 91753
Started task  11  PID : 91750
Started task  12  PID : 91749
Started task  13  PID : 91751
Started task  14  PID : 91752
8 Finished
6 Finished
5 Finished
7 Finished
9 Finished
Started task  15  PID : 91749
Started task  16  PID : 91750
12 Finished
11 Finished
Started task  17  PID : 91753
Started task  18  PID : 91751
Started task  19  PID : 91752
10 Finished
13 Finished
14 Finished
18 Finished
19 Finished
17 Finished
15 Finished
16 Finished


Observe the output, results are not appeared in order.




Previous                                                 Next                                                 Home

No comments:

Post a Comment