Python BoundedBarrier tutorial shows how to synchronize Python threads using a custom BoundedBarrier for thread coordination.
last modified February 15, 2025
In this article we show how to synchronize Python threads using a custom BoundedBarrier.
A BoundedBarrier is a synchronization primitive that allows a fixed number of threads to wait for each other to reach a common barrier point. Unlike the built-in threading.Barrier, a BoundedBarrier can be implemented with additional constraints, such as limiting the maximum number of threads that can wait at the barrier.
This tutorial demonstrates how to create a custom BoundedBarrier using Python’s threading.Condition and threading.Lock.
The following example demonstrates how to implement a custom BoundedBarrier.
main.py
import threading
class BoundedBarrier: def init(self, max_threads): self.max_threads = max_threads self.count = 0 self.condition = threading.Condition()
def wait(self):
with self.condition:
self.count += 1
if self.count == self.max_threads:
self.condition.notify_all() # Notify all waiting threads
self.count = 0 # Reset the counter for reuse
else:
self.condition.wait() # Wait for other threads
def worker(barrier, thread_name): print(f"{thread_name} is starting") barrier.wait() # Wait at the barrier print(f"{thread_name} has passed the barrier")
def main(): max_threads = 3 barrier = BoundedBarrier(max_threads)
threads = []
for i in range(max_threads): # Create 3 threads
thread = threading.Thread(target=worker, args=(barrier, f"Thread-{i+1}"))
threads.append(thread)
thread.start()
for thread in threads:
thread.join() # Wait for all threads to complete
print("All threads have passed the barrier")
if name == “main”: main()
In this program, a custom BoundedBarrier is implemented using threading.Condition. The barrier allows a fixed number of threads to wait for each other before proceeding.
self.condition = threading.Condition()
The BoundedBarrier uses a Condition object to manage thread synchronization.
self.count += 1 if self.count == self.max_threads: self.condition.notify_all() # Notify all waiting threads self.count = 0 # Reset the counter for reuse else: self.condition.wait() # Wait for other threads
Each thread increments the counter when it reaches the barrier. If the counter reaches the maximum number of threads, all waiting threads are notified, and the counter is reset. Otherwise, the thread waits for other threads to arrive.
barrier = BoundedBarrier(max_threads)
The BoundedBarrier is initialized with the maximum number of threads that can wait at the barrier.
$ python main.py Thread-1 is starting Thread-2 is starting Thread-3 is starting Thread-1 has passed the barrier Thread-2 has passed the barrier Thread-3 has passed the barrier All threads have passed the barrier
The following example demonstrates how to reuse the BoundedBarrier for multiple synchronization points.
main.py
import threading import time
class BoundedBarrier: def init(self, max_threads): self.max_threads = max_threads self.count = 0 self.condition = threading.Condition()
def wait(self):
with self.condition:
self.count += 1
if self.count == self.max_threads:
self.condition.notify_all() # Notify all waiting threads
self.count = 0 # Reset the counter for reuse
else:
self.condition.wait() # Wait for other threads
def worker(barrier, thread_name, num_phases): for phase in range(num_phases): print(f"{thread_name} is working on phase {phase + 1}") time.sleep(1) # Simulate work for the phase print(f"{thread_name} has completed phase {phase + 1}") barrier.wait() # Wait at the barrier print(f"{thread_name} is moving to the next phase")
def main(): max_threads = 3 num_phases = 2 # Number of phases in the task barrier = BoundedBarrier(max_threads)
threads = []
for i in range(max_threads): # Create 3 threads
thread = threading.Thread(target=worker, args=(barrier, f"Thread-{i+1}", num_phases))
threads.append(thread)
thread.start()
for thread in threads:
thread.join() # Wait for all threads to complete
print("All phases completed by all threads")
if name == “main”: main()
In this program, the BoundedBarrier is reused for multiple synchronization points. Each thread works on two phases, and the barrier ensures that all threads complete one phase before moving to the next.
barrier.wait() # Wait at the barrier
Each thread calls the wait method on the barrier after completing a phase. This ensures that all threads finish the current phase before moving to the next one.
$ python main.py Thread-1 is working on phase 1 Thread-2 is working on phase 1 Thread-3 is working on phase 1 Thread-1 has completed phase 1 Thread-2 has completed phase 1 Thread-3 has completed phase 1 Thread-1 is moving to the next phase Thread-2 is moving to the next phase Thread-3 is moving to the next phase Thread-1 is working on phase 2 Thread-2 is working on phase 2 Thread-3 is working on phase 2 Thread-1 has completed phase 2 Thread-2 has completed phase 2 Thread-3 has completed phase 2 Thread-1 is moving to the next phase Thread-2 is moving to the next phase Thread-3 is moving to the next phase All phases completed by all threads
The following example demonstrates how to use a custom BoundedBarrier with a timeout to synchronize threads across multiple phases of execution. If a thread does not reach the barrier within the specified timeout, it will proceed without waiting for the other threads.
main.py
import threading import time
class BoundedBarrier: def init(self, max_threads): self.max_threads = max_threads self.count = 0 self.condition = threading.Condition()
def wait(self, timeout=None):
with self.condition:
self.count += 1
if self.count == self.max_threads:
self.condition.notify_all() # Notify all waiting threads
self.count = 0 # Reset the counter for reuse
return True # Barrier tripped
else:
if timeout is None:
self.condition.wait() # Wait indefinitely
else:
if not self.condition.wait(timeout): # Wait with timeout
self.count -= 1 # Decrement count if timeout occurs
return False # Barrier not tripped
return True # Barrier tripped
def worker(barrier, thread_name, num_phases): for phase in range(num_phases): print(f"{thread_name} is working on phase {phase + 1}") time.sleep(1) # Simulate work for the phase print(f"{thread_name} has completed phase {phase + 1}") if not barrier.wait(timeout=2): # Wait at the barrier with a timeout print(f"{thread_name} timed out waiting for the barrier in phase {phase + 1}") continue print(f"{thread_name} is moving to the next phase")
def main(): max_threads = 3 num_phases = 2 # Number of phases in the task barrier = BoundedBarrier(max_threads)
threads = []
for i in range(max_threads): # Create 3 threads
thread = threading.Thread(target=worker, args=(barrier, f"Thread-{i+1}", num_phases))
threads.append(thread)
thread.start()
for thread in threads:
thread.join() # Wait for all threads to complete
print("All phases completed by all threads")
if name == “main”: main()
In this program, the BoundedBarrier is used with a timeout to synchronize threads across multiple phases. If a thread does not reach the barrier within 2 seconds, it will proceed without waiting for the other threads.
def wait(self, timeout=None): with self.condition: self.count += 1 if self.count == self.max_threads: self.condition.notify_all() # Notify all waiting threads self.count = 0 # Reset the counter for reuse return True # Barrier tripped else: if timeout is None: self.condition.wait() # Wait indefinitely else: if not self.condition.wait(timeout): # Wait with timeout self.count -= 1 # Decrement count if timeout occurs return False # Barrier not tripped return True # Barrier tripped
The wait method is updated to support a timeout. If the timeout occurs, the thread decrements the counter and proceeds without waiting for the other threads.
if not barrier.wait(timeout=2): # Wait at the barrier with a timeout print(f"{thread_name} timed out waiting for the barrier in phase {phase + 1}") continue
Each thread calls the wait method with a timeout of 2 seconds. If the barrier is not tripped within this time, the thread proceeds without waiting.
$ python main.py Thread-1 is working on phase 1 Thread-2 is working on phase 1 Thread-3 is working on phase 1 Thread-1 has completed phase 1 Thread-2 has completed phase 1 Thread-3 has completed phase 1 Thread-1 is moving to the next phase Thread-2 is moving to the next phase Thread-3 is moving to the next phase Thread-1 is working on phase 2 Thread-2 is working on phase 2 Thread-3 is working on phase 2 Thread-1 has completed phase 2 Thread-2 has completed phase 2 Thread-3 has completed phase 2 Thread-1 is moving to the next phase Thread-2 is moving to the next phase Thread-3 is moving to the next phase All phases completed by all threads
Python threading - documentation
In this article we have shown how to synchronize Python threads using a custom BoundedBarrier.
My name is Jan Bodnar, and I am a passionate programmer with extensive programming experience. I have been writing programming articles since 2007. To date, I have authored over 1,400 articles and 8 e-books. I possess more than ten years of experience in teaching programming.
List all Python tutorials.