Sunday, 20 December 2015

Python: Bounded semaphore


threading.BoundedSemaphore(value=1)
You can use bounded semaphore to make sure you never call release() too many times. A bounded semaphore checks to make sure its current value doesn’t exceed its initial value. If it does, ValueError is raised.

For example, following program implemented using simple semaphore, allows maximum three threads at a time into critical section.

SemaphoreEx.py
import threading
import time

global threads
threads=[]

def process(sem):
 global threads
 name = threading.current_thread().getName()
 print("Waiting to start execution : ", name)
 
 #Critical section
 sem.acquire()
 print("Started execution : ", name)
 threads.append(name)
 time.sleep(1)
 print("Finished Execution : ", name)
 print(threads)
 sem.release()
 
 threads.remove(name)

semaphore=threading.Semaphore(value=3)
for i in range(10):
 t = threading.Thread(target=process, name='thread_'+str(i), args=(semaphore,))
 t.start()

$ python3 SemaphoreEx.py 
Waiting to start execution :  thread_0
Started execution :  thread_0
Waiting to start execution :  thread_1
Started execution :  thread_1
Waiting to start execution :  thread_2
Started execution :  thread_2
Waiting to start execution :  thread_3
Waiting to start execution :  thread_4
Waiting to start execution :  thread_5
Waiting to start execution :  thread_6
Waiting to start execution :  thread_7
Waiting to start execution :  thread_8
Waiting to start execution :  thread_9
Finished Execution :  thread_0
['thread_0', 'thread_1', 'thread_2']
Started execution :  thread_3
Finished Execution :  thread_1
['thread_1', 'thread_2', 'thread_3']
Finished Execution :  thread_2
['thread_2', 'thread_3']
Started execution :  thread_4
Started execution :  thread_5
Finished Execution :  thread_3
['thread_3', 'thread_4', 'thread_5']
Finished Execution :  thread_4
['thread_4', 'thread_5']
Started execution :  thread_6
Finished Execution :  thread_5
['thread_5', 'thread_6']
Started execution :  thread_8
Started execution :  thread_7
Finished Execution :  thread_6
['thread_6', 'thread_8', 'thread_7']
Finished Execution :  thread_8
Finished Execution :  thread_7
['thread_8', 'thread_7']
Started execution :  thread_9
['thread_8', 'thread_7']
Finished Execution :  thread_9
['thread_9']
Observe the output, at any time maximum 3 thread can able to run critical section of process method.

Suppose, if you call release method more than acquire methods, then your output collapse.


SemaphoreEx.py
import threading
import time

global threads
threads=[]

def process(sem):
 global threads
 name = threading.current_thread().getName()
 print("Waiting to start execution : ", name)
 
 #Critical section
 sem.acquire()
 print("Started execution : ", name)
 threads.append(name)
 time.sleep(1)
 print("Finished Execution : ", name)
 print(threads)
 sem.release()
 
 threads.remove(name)

semaphore=threading.Semaphore(value=3)
for i in range(10):
 t = threading.Thread(target=process, name='thread_'+str(i), args=(semaphore,))
 t.start()
 semaphore.release()


I just added ‘semaphore.release()’  at the end of file.
$ python3 SemaphoreEx.py 
Waiting to start execution :  thread_0
Started execution :  thread_0
Waiting to start execution :  thread_1
Started execution :  thread_1
Waiting to start execution :  thread_2
Started execution :  thread_2
Waiting to start execution :  thread_3
Started execution :  thread_3
Waiting to start execution :  thread_4
Started execution :  thread_4
Waiting to start execution :  thread_5
Started execution :  thread_5
Waiting to start execution :  thread_6
Started execution :  thread_6
Waiting to start execution :  thread_7
Started execution :  thread_7
Waiting to start execution :  thread_8
Started execution :  thread_8
Waiting to start execution :  thread_9
Started execution :  thread_9
Finished Execution :  thread_0
['thread_0', 'thread_1', 'thread_2', 'thread_3', 'thread_4', 'thread_5', 'thread_6', 'thread_7', 'thread_8', 'thread_9']
Finished Execution :  thread_3
Finished Execution :  thread_2
['thread_1', 'thread_2', 'thread_3', 'thread_4', 'thread_5', 'thread_6', 'thread_7', 'thread_8', 'thread_9']
Finished Execution :  thread_4
Finished Execution :  thread_1
['thread_1', 'thread_2', 'thread_3', 'thread_4', 'thread_5', 'thread_6', 'thread_7', 'thread_8', 'thread_9']
['thread_1', 'thread_2', 'thread_4', 'thread_5', 'thread_6', 'thread_7', 'thread_8', 'thread_9']
['thread_1', 'thread_2', 'thread_4', 'thread_5', 'thread_6', 'thread_7', 'thread_8', 'thread_9']
Finished Execution :  thread_6
Finished Execution :  thread_5
['thread_5', 'thread_6', 'thread_7', 'thread_8', 'thread_9']
Finished Execution :  thread_7
['thread_5', 'thread_6', 'thread_7', 'thread_8', 'thread_9']
['thread_5', 'thread_7', 'thread_8', 'thread_9']
Finished Execution :  thread_8
['thread_8', 'thread_9']
Finished Execution :  thread_9
['thread_9']

Calling release function more times causes more threads entered into critical section simultaneously.

By using BoundedSemaphore, we can solve these problems. A bounded semaphore checks to make sure its current value doesn’t exceed its initial value. If it does, ValueError is raised.


SemaphoreEx.py
import threading
import time

global threads
threads=[]

def process(sem):
 global threads
 name = threading.current_thread().getName()
 print("Waiting to start execution : ", name)
 
 #Critical section
 sem.acquire()
 print("Started execution : ", name)
 threads.append(name)
 time.sleep(1)
 print("Finished Execution : ", name)
 print(threads)
 sem.release()
 
 threads.remove(name)

semaphore=threading.BoundedSemaphore(value=3)
for i in range(10):
 t = threading.Thread(target=process, name='thread_'+str(i), args=(semaphore,))
 t.start()
 semaphore.release()

$ python3 SemaphoreEx.py 
Waiting to start execution :  thread_0
Started execution :  thread_0
Waiting to start execution :  thread_1
Started execution :  thread_1
Traceback (most recent call last):
  File "SemaphoreEx.py", line 27, in <module>
    semaphore.release()
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/threading.py", line 480, in release
    raise ValueError("Semaphore released too many times")
ValueError: Semaphore released too many times
Finished Execution :  thread_0
['thread_0', 'thread_1']
Finished Execution :  thread_1
['thread_1']
Exception in thread thread_1:
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/threading.py", line 923, in _bootstrap_inner
    self.run()
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/threading.py", line 871, in run
    self._target(*self._args, **self._kwargs)
  File "SemaphoreEx.py", line 19, in process
    sem.release()
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/threading.py", line 480, in release
    raise ValueError("Semaphore released too many times")
ValueError: Semaphore released too many times



Previous                                                 Next                                                 Home

No comments:

Post a Comment