Thread Synchronization¶
When multiple threads access shared data, synchronization is essential to prevent race conditions and ensure data integrity.
Mental Model
Synchronization primitives are traffic signals for threads. A Lock is a single-occupancy bathroom door -- only one thread enters at a time. An RLock lets the same thread re-enter. A Semaphore is a parking garage with limited spots. An Event is a starting gun that all waiting threads hear at once. Pick the lightest primitive that solves your coordination problem.
The Problem: Race Conditions¶
Without Synchronization¶
```python import threading
counter = 0
def increment(): global counter for _ in range(100_000): counter += 1 # Not atomic!
Create two threads¶
t1 = threading.Thread(target=increment) t2 = threading.Thread(target=increment)
t1.start() t2.start() t1.join() t2.join()
print(f"Expected: 200000") print(f"Actual: {counter}") # Often less than 200000! ```
Why It Happens¶
counter += 1 is not atomic — it's actually three operations:
```python
What looks like one operation:¶
counter += 1
Is actually:¶
temp = counter # 1. Read current value temp = temp + 1 # 2. Increment counter = temp # 3. Write back
Thread 1: Read counter (0)¶
Thread 2: Read counter (0) ← Same value!¶
Thread 1: Write counter (1)¶
Thread 2: Write counter (1) ← Overwrites Thread 1's work!¶
```
Lock (Mutex)¶
A Lock ensures only one thread can execute a section of code at a time.
Basic Lock Usage¶
```python import threading
counter = 0 lock = threading.Lock()
def increment(): global counter for _ in range(100_000): lock.acquire() # Get the lock try: counter += 1 finally: lock.release() # Always release!
t1 = threading.Thread(target=increment) t2 = threading.Thread(target=increment)
t1.start() t2.start() t1.join() t2.join()
print(f"Counter: {counter}") # Always 200000 ```
Lock as Context Manager (Recommended)¶
```python import threading
counter = 0 lock = threading.Lock()
def increment(): global counter for _ in range(100_000): with lock: # Automatically acquires and releases counter += 1
t1 = threading.Thread(target=increment) t2 = threading.Thread(target=increment)
t1.start() t2.start() t1.join() t2.join()
print(f"Counter: {counter}") # Always 200000 ```
Non-Blocking Lock Acquisition¶
```python import threading import time
lock = threading.Lock()
def try_acquire(): if lock.acquire(blocking=False): try: print(f"{threading.current_thread().name}: Got lock") time.sleep(1) finally: lock.release() else: print(f"{threading.current_thread().name}: Lock busy, skipping")
With timeout¶
def try_acquire_timeout(): if lock.acquire(timeout=0.5): try: print(f"{threading.current_thread().name}: Got lock") time.sleep(1) finally: lock.release() else: print(f"{threading.current_thread().name}: Timeout") ```
RLock (Reentrant Lock)¶
A regular Lock cannot be acquired twice by the same thread. RLock allows the same thread to acquire it multiple times.
The Problem with Regular Lock¶
```python import threading
lock = threading.Lock()
def outer(): with lock: print("Outer acquired lock") inner() # Deadlock! Lock already held
def inner(): with lock: # Blocks forever waiting for lock print("Inner acquired lock")
This will deadlock with regular Lock¶
```
RLock Solution¶
```python import threading
rlock = threading.RLock()
def outer(): with rlock: print("Outer acquired lock") inner()
def inner(): with rlock: # Same thread can acquire again print("Inner acquired lock")
outer() # Works!
Output:¶
Outer acquired lock¶
Inner acquired lock¶
```
When to Use RLock¶
```python import threading
class BankAccount: def init(self, balance): self.balance = balance self.lock = threading.RLock() # Use RLock for recursive access
def deposit(self, amount):
with self.lock:
self.balance += amount
def withdraw(self, amount):
with self.lock:
if self.balance >= amount:
self.balance -= amount
return True
return False
def transfer(self, other, amount):
with self.lock: # First acquisition
if self.withdraw(amount): # Second acquisition (same lock)
other.deposit(amount)
return True
return False
```
Semaphore¶
A Semaphore allows a limited number of threads to access a resource simultaneously.
Basic Semaphore¶
```python import threading import time
Allow max 3 concurrent accesses¶
semaphore = threading.Semaphore(3)
def worker(worker_id): print(f"Worker {worker_id}: Waiting for semaphore") with semaphore: print(f"Worker {worker_id}: Acquired semaphore") time.sleep(2) # Simulate work print(f"Worker {worker_id}: Releasing semaphore")
Start 10 workers, but only 3 run at a time¶
threads = [threading.Thread(target=worker, args=(i,)) for i in range(10)] for t in threads: t.start() for t in threads: t.join() ```
Practical Example: Connection Pool¶
```python import threading import time import random
class ConnectionPool: def init(self, max_connections): self.semaphore = threading.Semaphore(max_connections) self.connections = [f"conn_{i}" for i in range(max_connections)] self.lock = threading.Lock()
def get_connection(self):
self.semaphore.acquire()
with self.lock:
conn = self.connections.pop()
return conn
def release_connection(self, conn):
with self.lock:
self.connections.append(conn)
self.semaphore.release()
pool = ConnectionPool(3)
def database_query(query_id): conn = pool.get_connection() try: print(f"Query {query_id}: Using {conn}") time.sleep(random.uniform(0.5, 1.5)) finally: pool.release_connection(conn) print(f"Query {query_id}: Released {conn}")
threads = [threading.Thread(target=database_query, args=(i,)) for i in range(10)] for t in threads: t.start() for t in threads: t.join() ```
BoundedSemaphore¶
Raises error if released more times than acquired:
```python import threading
Regular semaphore: no error on extra release¶
sem = threading.Semaphore(2) sem.release() sem.release() sem.release() # No error, but counter is now 5!
BoundedSemaphore: catches bugs¶
bsem = threading.BoundedSemaphore(2) bsem.release() # ValueError: Semaphore released too many times ```
Event¶
An Event is a simple flag that threads can wait on.
Basic Event Usage¶
```python import threading import time
event = threading.Event()
def waiter(name): print(f"{name}: Waiting for event...") event.wait() # Block until event is set print(f"{name}: Event received!")
def setter(): print("Setter: Preparing...") time.sleep(2) print("Setter: Setting event") event.set()
Start waiters¶
for i in range(3): threading.Thread(target=waiter, args=(f"Waiter-{i}",)).start()
Start setter¶
threading.Thread(target=setter).start() ```
Event Methods¶
```python import threading
event = threading.Event()
Check if set¶
print(event.is_set()) # False
Wait with timeout¶
result = event.wait(timeout=1.0) # Returns True if set, False on timeout
Set the event (wake all waiting threads)¶
event.set() print(event.is_set()) # True
Clear the event¶
event.clear() print(event.is_set()) # False ```
Practical Example: Startup Coordination¶
```python import threading import time
db_ready = threading.Event() cache_ready = threading.Event()
def init_database(): print("Database: Initializing...") time.sleep(2) print("Database: Ready") db_ready.set()
def init_cache(): print("Cache: Waiting for database...") db_ready.wait() # Cache depends on database print("Cache: Initializing...") time.sleep(1) print("Cache: Ready") cache_ready.set()
def main_app(): print("App: Waiting for all services...") db_ready.wait() cache_ready.wait() print("App: All services ready, starting...")
threading.Thread(target=init_database).start() threading.Thread(target=init_cache).start() threading.Thread(target=main_app).start() ```
Condition¶
A Condition combines a lock with the ability to wait for a condition to be true.
Basic Condition Usage¶
```python import threading import time
condition = threading.Condition() items = []
def producer(): for i in range(5): time.sleep(0.5) with condition: items.append(f"item-{i}") print(f"Produced: item-{i}") condition.notify() # Wake up one waiting consumer
def consumer(): while True: with condition: while not items: # Wait until items available print("Consumer: Waiting...") condition.wait() item = items.pop(0) print(f"Consumed: {item}")
if item == "item-4":
break
threading.Thread(target=producer).start() threading.Thread(target=consumer).start() ```
Producer-Consumer Pattern¶
```python import threading import time import random
class BoundedBuffer: def init(self, capacity): self.capacity = capacity self.buffer = [] self.condition = threading.Condition()
def put(self, item):
with self.condition:
while len(self.buffer) >= self.capacity:
print(f"Buffer full, producer waiting...")
self.condition.wait()
self.buffer.append(item)
print(f"Produced: {item}, buffer size: {len(self.buffer)}")
self.condition.notify_all()
def get(self):
with self.condition:
while len(self.buffer) == 0:
print(f"Buffer empty, consumer waiting...")
self.condition.wait()
item = self.buffer.pop(0)
print(f"Consumed: {item}, buffer size: {len(self.buffer)}")
self.condition.notify_all()
return item
buffer = BoundedBuffer(capacity=3)
def producer(producer_id): for i in range(5): time.sleep(random.uniform(0.1, 0.5)) buffer.put(f"P{producer_id}-{i}")
def consumer(consumer_id): for _ in range(5): time.sleep(random.uniform(0.1, 0.5)) buffer.get()
2 producers, 2 consumers¶
threads = [] for i in range(2): threads.append(threading.Thread(target=producer, args=(i,))) threads.append(threading.Thread(target=consumer, args=(i,)))
for t in threads: t.start() for t in threads: t.join() ```
Barrier¶
A Barrier synchronizes a fixed number of threads at a certain point.
```python import threading import time import random
barrier = threading.Barrier(3)
def worker(worker_id): # Phase 1: Each thread does independent work work_time = random.uniform(0.5, 2.0) print(f"Worker {worker_id}: Working for {work_time:.1f}s") time.sleep(work_time)
# Synchronization point
print(f"Worker {worker_id}: Reached barrier, waiting...")
barrier.wait() # All threads must reach here before any continue
# Phase 2: Continue after all threads synchronized
print(f"Worker {worker_id}: Continuing after barrier")
threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)] for t in threads: t.start() for t in threads: t.join() ```
Summary: Synchronization Primitives¶
| Primitive | Purpose | Use Case |
|---|---|---|
| Lock | Mutual exclusion | Protect shared data |
| RLock | Reentrant mutual exclusion | Recursive functions with locks |
| Semaphore | Limit concurrent access | Connection pools, rate limiting |
| Event | Simple signaling | Start/stop signals, coordination |
| Condition | Wait for complex conditions | Producer-consumer, state changes |
| Barrier | Synchronize multiple threads | Phased computation |
Key Takeaways¶
- Always use synchronization when multiple threads access shared mutable data
- Use
with lock:syntax for automatic acquisition and release Lockfor simple mutual exclusion,RLockfor recursive accessSemaphoreto limit concurrent resource accessEventfor simple signaling between threadsConditionfor complex waiting conditions (producer-consumer)Barrierto synchronize threads at specific points- Avoid holding locks longer than necessary to prevent contention
Runnable Example: thread_safety_tutorial.py¶
```python """ Topic 45.5 - Thread Safety and Synchronization
Complete guide to making your code thread-safe using locks, semaphores, events, and other synchronization primitives.
Learning Objectives: - Understand race conditions - Use Locks and RLocks for mutual exclusion - Apply Semaphores for resource limiting - Use Events for signaling - Implement Conditions for complex coordination - Understand Barriers for synchronization points - Thread-safe data structures
Author: Python Educator Date: 2024 """
import threading import time import random from queue import Queue from threading import Lock, RLock, Semaphore, Event, Condition, Barrier
============================================================================¶
PART 1: BEGINNER - Race Conditions and Locks¶
============================================================================¶
def demonstrate_race_condition(): """ Show what happens WITHOUT thread safety - race conditions occur! """ print("=" * 70) print("BEGINNER: Understanding Race Conditions") print("=" * 70)
# Shared counter (NOT thread-safe!)
counter = 0
def increment_counter():
"""Increment shared counter 100,000 times"""
nonlocal counter
for _ in range(100000):
# This is NOT atomic! It's actually:
# 1. Read counter
# 2. Add 1
# 3. Write back
# Threads can interleave between these steps!
counter += 1
print("\n❌ WITHOUT synchronization:")
print(" Running 5 threads, each incrementing counter 100,000 times")
print(" Expected final value: 500,000\n")
# Reset counter
counter = 0
# Create threads
threads = []
for _ in range(5):
thread = threading.Thread(target=increment_counter)
threads.append(thread)
thread.start()
# Wait for all
for thread in threads:
thread.join()
print(f" Actual final value: {counter}")
if counter != 500000:
print(f" ⚠️ Lost updates! {500000 - counter} increments were lost!")
print(" This is a RACE CONDITION - multiple threads racing to update")
else:
print(" ✓ Got lucky this time, but this is still unsafe!")
print("\n💡 What went wrong:")
print(" Thread A: reads 100, adds 1, writes 101")
print(" Thread B: reads 100 (before A wrote!), adds 1, writes 101")
print(" Result: Only increased by 1 instead of 2!")
print("\n" + "=" * 70 + "\n")
def fixing_with_lock(): """ Fix race conditions using a Lock for mutual exclusion. """ print("=" * 70) print("BEGINNER: Fixing Race Conditions with Lock") print("=" * 70)
# Shared counter with lock
counter = 0
counter_lock = Lock() # Create a lock
def increment_counter_safe():
"""Increment counter safely using lock"""
nonlocal counter
for _ in range(100000):
# Acquire lock before accessing shared resource
counter_lock.acquire()
try:
counter += 1 # Only one thread can be here at a time
finally:
# ALWAYS release the lock
counter_lock.release()
print("\n✓ WITH Lock synchronization:")
print(" Running 5 threads, each incrementing counter 100,000 times")
print(" Expected final value: 500,000\n")
# Create threads
threads = []
for _ in range(5):
thread = threading.Thread(target=increment_counter_safe)
threads.append(thread)
thread.start()
# Wait for all
for thread in threads:
thread.join()
print(f" Final value: {counter}")
if counter == 500000:
print(" ✓ Perfect! Lock prevented race conditions")
print("\n💡 How Lock works:")
print(" • Only ONE thread can hold the lock at a time")
print(" • Other threads WAIT until lock is released")
print(" • Ensures mutual exclusion (mutex)")
print("\n" + "=" * 70 + "\n")
def lock_with_context_manager(): """ Use 'with' statement for automatic lock management. This is the recommended way to use locks! """ print("=" * 70) print("BEGINNER: Lock with Context Manager (Best Practice)") print("=" * 70)
# Shared resource
balance = 1000
balance_lock = Lock()
def withdraw_money(amount):
"""Safely withdraw money from balance"""
nonlocal balance
# Context manager automatically acquires and releases lock
with balance_lock:
# Critical section
if balance >= amount:
print(f" [{threading.current_thread().name}] "
f"Withdrawing ${amount}")
time.sleep(0.1) # Simulate processing
balance -= amount
print(f" [{threading.current_thread().name}] "
f"Remaining: ${balance}")
return True
else:
print(f" [{threading.current_thread().name}] "
f"Insufficient funds!")
return False
# Lock automatically released here!
print(f"\n💰 Initial balance: ${balance}")
print(" Multiple threads trying to withdraw...\n")
# Multiple threads try to withdraw
threads = []
for i in range(5):
thread = threading.Thread(
target=withdraw_money,
args=(300,),
name=f"Customer-{i+1}"
)
threads.append(thread)
thread.start()
# Wait for all
for thread in threads:
thread.join()
print(f"\n💰 Final balance: ${balance}")
print("\n💡 Context Manager Benefits:")
print(" ✓ Lock automatically released (even if exception occurs)")
print(" ✓ Cleaner, more readable code")
print(" ✓ Can't forget to release lock")
print(" ✓ ALWAYS use 'with lock:' instead of acquire/release")
print("\n" + "=" * 70 + "\n")
============================================================================¶
PART 2: INTERMEDIATE - Advanced Synchronization Primitives¶
============================================================================¶
def reentrant_lock_example(): """ RLock (Reentrant Lock) can be acquired multiple times by same thread. Useful for recursive functions or nested lock acquisitions. """ print("=" * 70) print("INTERMEDIATE: RLock (Reentrant Lock)") print("=" * 70)
# Regular Lock vs RLock
regular_lock = Lock()
rlock = RLock()
print("\n📝 Regular Lock - DEADLOCK if acquired twice:")
def bad_recursive(n):
"""This will deadlock with regular Lock!"""
with regular_lock:
if n > 0:
# Can't acquire lock again - deadlock!
# Uncomment to see deadlock:
# with regular_lock:
# bad_recursive(n - 1)
pass
print(" (Commented out to avoid deadlock)")
print("\n📝 RLock - OK to acquire multiple times:")
def good_recursive(n):
"""Works fine with RLock"""
with rlock:
if n > 0:
print(f" Recursion level {n} (lock acquired {4-n+1} times)")
good_recursive(n - 1)
good_recursive(3)
print("\n💡 When to use RLock:")
print(" ✓ Recursive functions that need locking")
print(" ✓ Methods that call other locked methods")
print(" ✓ Same thread needs to acquire lock multiple times")
print(" ✗ Slightly slower than regular Lock")
print("\n" + "=" * 70 + "\n")
def semaphore_resource_limiting(): """ Semaphore limits concurrent access to a resource. Like a lock, but allows N threads instead of just 1. """ print("=" * 70) print("INTERMEDIATE: Semaphore for Resource Limiting") print("=" * 70)
# Allow max 3 concurrent database connections
db_connections = Semaphore(3)
def access_database(user_id):
"""
Simulate database access with limited connections.
Args:
user_id: User identifier
"""
print(f" User {user_id}: Requesting database access...")
with db_connections: # Acquire one permit
print(f" User {user_id}: ✓ Connected to database")
time.sleep(random.uniform(1.0, 2.0)) # Simulate query
print(f" User {user_id}: Disconnected")
# Permit automatically released
print("\n⚙️ 10 users trying to access database")
print(" (Only 3 concurrent connections allowed)\n")
threads = []
for i in range(10):
thread = threading.Thread(target=access_database, args=(i+1,))
threads.append(thread)
thread.start()
time.sleep(0.2) # Stagger requests
# Wait for all
for thread in threads:
thread.join()
print("\n💡 Semaphore use cases:")
print(" ✓ Limit concurrent API calls")
print(" ✓ Database connection pool")
print(" ✓ Rate limiting")
print(" ✓ Resource pool management")
print("\n" + "=" * 70 + "\n")
def event_signaling(): """ Event allows threads to signal each other. One thread waits for an event, another thread sets it. """ print("=" * 70) print("INTERMEDIATE: Event for Thread Signaling") print("=" * 70)
# Event starts as "not set"
ready_event = Event()
data_ready = Event()
def producer():
"""Produce data and signal when ready"""
print(" [Producer] Preparing data...")
time.sleep(2) # Simulate preparation
print(" [Producer] Data ready! Signaling consumers...")
data_ready.set() # Signal that data is ready
# Wait for consumers to be ready for next batch
ready_event.wait()
print(" [Producer] Consumers ready for next batch")
def consumer(consumer_id):
"""Wait for data, then process it"""
print(f" [Consumer {consumer_id}] Waiting for data...")
# Wait for data_ready event
data_ready.wait()
print(f" [Consumer {consumer_id}] Got data! Processing...")
time.sleep(1)
print(f" [Consumer {consumer_id}] Done processing")
# Signal ready for next batch
if consumer_id == 3: # Last consumer signals
ready_event.set()
print("\n⚙️ Starting producer-consumer with Event:\n")
# Start producer
prod_thread = threading.Thread(target=producer)
prod_thread.start()
# Start consumers
cons_threads = []
for i in range(3):
thread = threading.Thread(target=consumer, args=(i+1,))
cons_threads.append(thread)
thread.start()
# Wait for all
prod_thread.join()
for thread in cons_threads:
thread.join()
print("\n💡 Event methods:")
print(" • set() - Signal the event (wake waiters)")
print(" • clear() - Reset the event")
print(" • wait() - Block until event is set")
print(" • is_set() - Check if event is set")
print("\n" + "=" * 70 + "\n")
def condition_variable_example(): """ Condition allows complex wait conditions with notify. More powerful than Event for producer-consumer patterns. """ print("=" * 70) print("INTERMEDIATE: Condition for Complex Coordination") print("=" * 70)
# Shared data with condition
buffer = []
max_size = 5
condition = Condition()
def producer(items):
"""Produce items when buffer has space"""
for item in items:
with condition:
# Wait while buffer is full
while len(buffer) >= max_size:
print(f" [Producer] Buffer full, waiting...")
condition.wait() # Release lock and wait
# Add item
buffer.append(item)
print(f" [Producer] Added {item}. Buffer: {len(buffer)}")
# Notify consumers
condition.notify() # Wake one waiting thread
print(" [Producer] Finished")
def consumer(num_items):
"""Consume items when buffer has data"""
consumed = 0
while consumed < num_items:
with condition:
# Wait while buffer is empty
while len(buffer) == 0:
print(f" [Consumer] Buffer empty, waiting...")
condition.wait()
# Remove item
item = buffer.pop(0)
consumed += 1
print(f" [Consumer] Got {item}. Buffer: {len(buffer)}")
# Notify producers
condition.notify() # Wake one waiting thread
print(" [Consumer] Finished")
print("\n⚙️ Producer-Consumer with Condition:\n")
# Start threads
items_to_produce = [f"Item-{i}" for i in range(15)]
prod = threading.Thread(target=producer, args=(items_to_produce,))
cons = threading.Thread(target=consumer, args=(len(items_to_produce),))
cons.start()
time.sleep(0.5) # Let consumer start waiting
prod.start()
prod.join()
cons.join()
print("\n💡 Condition advantages:")
print(" ✓ Efficient waiting (no busy polling)")
print(" ✓ notify() to wake specific waiters")
print(" ✓ notify_all() to wake all waiters")
print(" ✓ Perfect for producer-consumer with bounded buffer")
print("\n" + "=" * 70 + "\n")
============================================================================¶
PART 3: ADVANCED - Complex Synchronization Patterns¶
============================================================================¶
def barrier_synchronization(): """ Barrier ensures all threads reach a certain point before continuing. Useful for phases of parallel algorithms. """ print("=" * 70) print("ADVANCED: Barrier for Synchronization Points") print("=" * 70)
num_workers = 4
barrier = Barrier(num_workers)
def parallel_phase_worker(worker_id):
"""
Worker that goes through multiple synchronized phases.
Args:
worker_id: Worker identifier
"""
# Phase 1: Data loading
print(f" [Worker {worker_id}] Phase 1: Loading data...")
time.sleep(random.uniform(0.5, 1.5))
print(f" [Worker {worker_id}] Data loaded")
# Wait for all workers to finish Phase 1
barrier.wait()
print(f" [Worker {worker_id}] ✓ Phase 1 complete for all")
# Phase 2: Processing
print(f" [Worker {worker_id}] Phase 2: Processing...")
time.sleep(random.uniform(0.5, 1.5))
print(f" [Worker {worker_id}] Processing done")
# Wait for all workers to finish Phase 2
barrier.wait()
print(f" [Worker {worker_id}] ✓ Phase 2 complete for all")
# Phase 3: Saving results
print(f" [Worker {worker_id}] Phase 3: Saving results...")
time.sleep(random.uniform(0.3, 0.8))
print(f" [Worker {worker_id}] Results saved")
# Final barrier
barrier.wait()
print(f" [Worker {worker_id}] ✓ All phases complete!")
print(f"\n⚙️ Running {num_workers} workers through 3 phases:\n")
threads = []
for i in range(num_workers):
thread = threading.Thread(target=parallel_phase_worker, args=(i+1,))
threads.append(thread)
thread.start()
# Wait for all
for thread in threads:
thread.join()
print("\n💡 Barrier use cases:")
print(" ✓ Parallel algorithms with phases")
print(" ✓ Map-reduce operations")
print(" ✓ Distributed computing checkpoints")
print(" ✓ Game loop synchronization")
print("\n" + "=" * 70 + "\n")
def thread_safe_singleton_pattern(): """ Implement thread-safe Singleton pattern with double-checked locking. """ print("=" * 70) print("ADVANCED: Thread-Safe Singleton Pattern") print("=" * 70)
class DatabaseConnection:
"""Thread-safe singleton database connection"""
_instance = None
_lock = Lock()
def __new__(cls):
# First check (without lock for performance)
if cls._instance is None:
# Acquire lock
with cls._lock:
# Second check (with lock for safety)
if cls._instance is None:
print(" Creating NEW database connection...")
time.sleep(0.5) # Simulate slow initialization
cls._instance = super().__new__(cls)
cls._instance.connection_id = random.randint(1000, 9999)
return cls._instance
def query(self, sql):
"""Execute a query"""
return f"Query result from connection {self.connection_id}"
def worker(worker_id):
"""Worker that gets database connection"""
print(f" [Worker {worker_id}] Getting database connection...")
db = DatabaseConnection()
print(f" [Worker {worker_id}] Got connection: {db.connection_id}")
# Use connection
result = db.query("SELECT * FROM users")
print(f" [Worker {worker_id}] {result}")
print("\n⚙️ Multiple threads requesting singleton:\n")
threads = []
for i in range(5):
thread = threading.Thread(target=worker, args=(i+1,))
threads.append(thread)
thread.start()
# Wait for all
for thread in threads:
thread.join()
print("\n💡 Double-checked locking:")
print(" ✓ First check avoids lock overhead")
print(" ✓ Second check ensures thread safety")
print(" ✓ Only one instance created despite concurrent access")
print("\n" + "=" * 70 + "\n")
def deadlock_example_and_prevention(): """ Demonstrate deadlock and how to prevent it. """ print("=" * 70) print("ADVANCED: Deadlock and Prevention") print("=" * 70)
lock_a = Lock()
lock_b = Lock()
def thread1_bad():
"""Can cause deadlock"""
with lock_a:
print(" Thread1: Acquired Lock A")
time.sleep(0.1)
# Trying to acquire lock B while holding A
with lock_b:
print(" Thread1: Acquired Lock B")
def thread2_bad():
"""Can cause deadlock"""
with lock_b:
print(" Thread2: Acquired Lock B")
time.sleep(0.1)
# Trying to acquire lock A while holding B
with lock_a:
print(" Thread2: Acquired Lock A")
print("\n❌ Deadlock scenario (NOT executed to avoid hanging):")
print(" Thread 1: locks A, waits for B")
print(" Thread 2: locks B, waits for A")
print(" → Both threads wait forever!")
# Good: Always acquire locks in same order
def thread1_good():
"""Deadlock-free version"""
with lock_a: # Always lock A first
print(" Thread1: Acquired Lock A")
time.sleep(0.1)
with lock_b: # Then lock B
print(" Thread1: Acquired Lock B")
print(" Thread1: ✓ Completed safely")
def thread2_good():
"""Deadlock-free version"""
with lock_a: # Always lock A first (same order!)
print(" Thread2: Acquired Lock A")
time.sleep(0.1)
with lock_b: # Then lock B
print(" Thread2: Acquired Lock B")
print(" Thread2: ✓ Completed safely")
print("\n✓ Deadlock-free execution (same lock order):\n")
t1 = threading.Thread(target=thread1_good)
t2 = threading.Thread(target=thread2_good)
t1.start()
t2.start()
t1.join()
t2.join()
print("\n💡 Deadlock prevention strategies:")
print(" 1. Always acquire locks in the same order")
print(" 2. Use timeout on lock acquisition")
print(" 3. Use a single lock instead of multiple")
print(" 4. Avoid nested locking when possible")
print(" 5. Use higher-level abstractions (Queue, etc.)")
print("\n" + "=" * 70 + "\n")
============================================================================¶
MAIN EXECUTION¶
============================================================================¶
def main(): """Run all thread safety demonstrations.""" print("\n" + "=" * 70) print(" " * 18 + "THREAD SAFETY") print(" " * 12 + "Synchronization and Race Conditions") print("=" * 70 + "\n")
# Beginner level
demonstrate_race_condition()
fixing_with_lock()
lock_with_context_manager()
# Intermediate level
reentrant_lock_example()
semaphore_resource_limiting()
event_signaling()
condition_variable_example()
# Advanced level
barrier_synchronization()
thread_safe_singleton_pattern()
deadlock_example_and_prevention()
print("\n" + "=" * 70)
print("Thread Safety Tutorial Complete!")
print("=" * 70)
print("\n💡 Key Takeaways:")
print("1. Always use locks for shared mutable state")
print("2. Use 'with lock:' for automatic release")
print("3. RLock for recursive/nested locking")
print("4. Semaphore for limiting concurrent access")
print("5. Event for simple signaling")
print("6. Condition for complex coordination")
print("7. Barrier for phase synchronization")
print("8. Prevent deadlocks with consistent lock ordering")
print("=" * 70 + "\n")
if name == "main": main() ```
Exercises¶
Exercise 1.
Create a BoundedSemaphore(3) that limits access to a simulated resource. Spawn 10 threads that each acquire the semaphore, print their worker ID and a timestamp, sleep for 0.5 seconds, then release. Verify (by inspecting timestamps) that no more than 3 workers are active simultaneously.
Solution to Exercise 1
```python
import threading
import time
sem = threading.BoundedSemaphore(3)
def worker(wid):
sem.acquire()
try:
print(f"Worker {wid} entered at {time.perf_counter():.2f}")
time.sleep(0.5)
print(f"Worker {wid} leaving at {time.perf_counter():.2f}")
finally:
sem.release()
threads = [threading.Thread(target=worker, args=(i,)) for i in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
print("All workers done.")
```
Exercise 2.
Use a threading.Barrier(4) to coordinate 4 worker threads that each go through two phases: (1) "data loading" (random sleep 0.5-1.5s), barrier wait, (2) "processing" (random sleep 0.3-1.0s), barrier wait. Print start and finish messages for each phase so you can confirm all workers synchronize between phases.
Solution to Exercise 2
```python
import threading
import time
import random
barrier = threading.Barrier(4)
def worker(wid):
# Phase 1
print(f"Worker {wid}: loading data...")
time.sleep(random.uniform(0.5, 1.5))
print(f"Worker {wid}: data loaded, waiting at barrier")
barrier.wait()
print(f"Worker {wid}: Phase 1 complete for all")
# Phase 2
print(f"Worker {wid}: processing...")
time.sleep(random.uniform(0.3, 1.0))
print(f"Worker {wid}: processing done, waiting at barrier")
barrier.wait()
print(f"Worker {wid}: Phase 2 complete for all")
threads = [threading.Thread(target=worker, args=(i,)) for i in range(4)]
for t in threads:
t.start()
for t in threads:
t.join()
print("All phases done.")
```
Exercise 3.
Implement a thread-safe bounded buffer using threading.Condition. The buffer has a maximum size of 5. Write a producer that inserts 20 items and a consumer that removes 20 items. The producer waits when the buffer is full; the consumer waits when the buffer is empty. Print the buffer size after each operation and verify no items are lost.
Solution to Exercise 3
```python
import threading
import time
class BoundedBuffer:
def __init__(self, capacity):
self.capacity = capacity
self.buffer = []
self.condition = threading.Condition()
def put(self, item):
with self.condition:
while len(self.buffer) >= self.capacity:
self.condition.wait()
self.buffer.append(item)
print(f" Produced {item}, size={len(self.buffer)}")
self.condition.notify()
def get(self):
with self.condition:
while len(self.buffer) == 0:
self.condition.wait()
item = self.buffer.pop(0)
print(f" Consumed {item}, size={len(self.buffer)}")
self.condition.notify()
return item
buf = BoundedBuffer(5)
def producer():
for i in range(20):
buf.put(i)
time.sleep(0.02)
def consumer():
for _ in range(20):
buf.get()
time.sleep(0.03)
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start()
t2.start()
t1.join()
t2.join()
print(f"Buffer empty: {len(buf.buffer) == 0}")
```