Thread Synchronization¶
When multiple threads access shared data, synchronization is essential to prevent race conditions and ensure data integrity.
The Problem: Race Conditions¶
Without Synchronization¶
import threading
counter = 0  # shared by both threads and deliberately left unprotected


def increment():
    """Add 1 to the module-level ``counter`` 100,000 times, without locking."""
    global counter
    for _ in range(100_000):
        counter += 1  # Not atomic!


# Run the same function on two threads at once
workers = [threading.Thread(target=increment) for _ in range(2)]
for worker in workers:
    worker.start()
for worker in workers:
    worker.join()

print("Expected: 200000")
print(f"Actual: {counter}")  # Often less than 200000!
Why It Happens¶
counter += 1 is not atomic — it's actually three operations:
# What looks like one statement:
counter += 1
# is really a read-modify-write sequence:
temp = counter # 1. Read current value
temp = temp + 1 # 2. Increment the private copy
counter = temp # 3. Write the result back
# One interleaving of two threads that loses an update (counter starts at 0):
# Thread 1: Read counter (0)
# Thread 2: Read counter (0) ← Same value!
# Thread 1: Write counter (1)
# Thread 2: Write counter (1) ← Overwrites Thread 1's work!
# Both threads incremented, yet counter ends at 1 instead of 2.
Lock (Mutex)¶
A Lock ensures only one thread can execute a section of code at a time.
Basic Lock Usage¶
import threading
counter = 0
lock = threading.Lock()  # guards every update of ``counter``


def increment():
    """Add 1 to ``counter`` 100,000 times, holding ``lock`` for each update."""
    global counter
    for _ in range(100_000):
        lock.acquire()  # Get the lock
        try:
            counter += 1
        finally:
            lock.release()  # Always release!


workers = [threading.Thread(target=increment) for _ in range(2)]
for worker in workers:
    worker.start()
for worker in workers:
    worker.join()

print(f"Counter: {counter}")  # Always 200000
Lock as Context Manager (Recommended)¶
import threading
counter = 0
lock = threading.Lock()  # guards every update of ``counter``


def increment():
    """Add 1 to ``counter`` 100,000 times inside a ``with lock`` block."""
    global counter
    for _ in range(100_000):
        with lock:  # Automatically acquires and releases
            counter += 1


workers = [threading.Thread(target=increment) for _ in range(2)]
for worker in workers:
    worker.start()
for worker in workers:
    worker.join()

print(f"Counter: {counter}")  # Always 200000
Non-Blocking Lock Acquisition¶
import threading
import time
lock = threading.Lock()


def try_acquire():
    """Do the work only if ``lock`` is free right now; otherwise skip it."""
    me = threading.current_thread().name
    if not lock.acquire(blocking=False):
        print(f"{me}: Lock busy, skipping")
        return
    try:
        print(f"{me}: Got lock")
        time.sleep(1)
    finally:
        lock.release()


# With timeout
def try_acquire_timeout():
    """Wait up to half a second for ``lock`` before giving up."""
    me = threading.current_thread().name
    if not lock.acquire(timeout=0.5):
        print(f"{me}: Timeout")
        return
    try:
        print(f"{me}: Got lock")
        time.sleep(1)
    finally:
        lock.release()
RLock (Reentrant Lock)¶
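A regular Lock is not reentrant: if the thread that already holds it tries to acquire it again, the second acquire blocks forever. An RLock allows the same thread to acquire it multiple times, as long as it is released the same number of times.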
The Problem with Regular Lock¶
import threading
lock = threading.Lock()  # plain Lock: not reentrant


def inner():
    """Take ``lock``; hangs if the calling thread already holds it."""
    with lock:  # Blocks forever waiting for lock
        print("Inner acquired lock")


def outer():
    """Take ``lock`` and then call inner(), which wants the same lock."""
    with lock:
        print("Outer acquired lock")
        inner()  # Deadlock! Lock already held


# Calling outer() will deadlock with a regular Lock
RLock Solution¶
import threading
rlock = threading.RLock()  # reentrant: the owning thread may take it again


def inner():
    """Take ``rlock``; succeeds even when the caller already holds it."""
    with rlock:  # Same thread can acquire again
        print("Inner acquired lock")


def outer():
    """Take ``rlock`` and call inner() while still holding it."""
    with rlock:
        print("Outer acquired lock")
        inner()


outer()  # Works!
# Output:
# Outer acquired lock
# Inner acquired lock
When to Use RLock¶
import threading
class BankAccount:
    """A bank account whose balance may be updated from several threads.

    Every method that touches ``balance`` holds ``self.lock``. The lock is an
    RLock because ``transfer`` calls ``withdraw`` while already holding it.
    """

    def __init__(self, balance):
        self.balance = balance
        self.lock = threading.RLock()  # Use RLock for recursive access

    def deposit(self, amount):
        """Add ``amount`` to the balance."""
        with self.lock:
            self.balance += amount

    def withdraw(self, amount):
        """Remove ``amount`` from the balance if it is covered.

        Returns:
            True if the money was withdrawn, False on insufficient funds.
        """
        with self.lock:
            if self.balance >= amount:
                self.balance -= amount
                return True
            return False

    def transfer(self, other, amount):
        """Move ``amount`` from this account to ``other``.

        Both accounts' locks are taken before any money moves, and always in
        the same global order (by ``id``). Without that ordering, two
        opposite transfers (A -> B and B -> A) could each hold their own lock
        while waiting for the other's, and deadlock.

        Returns:
            True if the transfer happened, False on insufficient funds.
        """
        first, second = sorted((self, other), key=id)
        # RLock makes this safe even when ``other`` is ``self``.
        with first.lock, second.lock:
            if self.withdraw(amount):  # Re-acquires self.lock (reentrant)
                other.deposit(amount)
                return True
            return False
Semaphore¶
A Semaphore allows a limited number of threads to access a resource simultaneously.
Basic Semaphore¶
import threading
import time
# At most 3 workers may hold the semaphore at the same time
semaphore = threading.Semaphore(3)


def worker(worker_id):
    """Wait for a semaphore permit, simulate two seconds of work, release."""
    tag = f"Worker {worker_id}"
    print(f"{tag}: Waiting for semaphore")
    with semaphore:
        print(f"{tag}: Acquired semaphore")
        time.sleep(2)  # Simulate work
        print(f"{tag}: Releasing semaphore")


# Start 10 workers, but only 3 run at a time
threads = []
for worker_id in range(10):
    threads.append(threading.Thread(target=worker, args=(worker_id,)))
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
Practical Example: Connection Pool¶
import threading
import time
import random
class ConnectionPool:
    """Fixed-size pool of connection names.

    The semaphore holds one permit per connection, so a caller that has
    acquired a permit always finds a connection in the list. The lock
    protects the list itself while it is being modified.
    """

    def __init__(self, max_connections):
        self.semaphore = threading.Semaphore(max_connections)
        self.connections = [f"conn_{index}" for index in range(max_connections)]
        self.lock = threading.Lock()

    def get_connection(self):
        """Block until a connection is free, then hand it out."""
        self.semaphore.acquire()
        with self.lock:
            return self.connections.pop()

    def release_connection(self, conn):
        """Put ``conn`` back in the pool and free one semaphore permit."""
        with self.lock:
            self.connections.append(conn)
        self.semaphore.release()
pool = ConnectionPool(3)


def database_query(query_id):
    """Borrow a connection, simulate a query, and always give it back."""
    tag = f"Query {query_id}"
    conn = pool.get_connection()
    try:
        print(f"{tag}: Using {conn}")
        time.sleep(random.uniform(0.5, 1.5))
    finally:
        pool.release_connection(conn)
        print(f"{tag}: Released {conn}")


# Ten queries compete for the three pooled connections
threads = []
for query_id in range(10):
    threads.append(threading.Thread(target=database_query, args=(query_id,)))
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
BoundedSemaphore¶
A BoundedSemaphore raises ValueError if it is released more times than it has been acquired:
import threading
# Regular semaphore: no error on extra release
sem = threading.Semaphore(2)
for _ in range(3):
    sem.release()  # No error, but the counter silently grows: 2 + 3 = 5!

# BoundedSemaphore: catches bugs
bsem = threading.BoundedSemaphore(2)
try:
    # Nothing was acquired, so this would push the counter past its
    # initial value of 2.
    bsem.release()
except ValueError as exc:
    # Caught so the snippet runs to completion instead of crashing.
    print(f"ValueError: {exc}")  # ValueError: Semaphore released too many times
Event¶
An Event is a simple flag that threads can wait on.
Basic Event Usage¶
import threading
import time
event = threading.Event()  # starts unset


def waiter(name):
    """Block until ``event`` is set, reporting before and after."""
    print(f"{name}: Waiting for event...")
    event.wait()  # Block until event is set
    print(f"{name}: Event received!")


def setter():
    """Set ``event`` after a two-second delay, releasing every waiter."""
    print("Setter: Preparing...")
    time.sleep(2)
    print("Setter: Setting event")
    event.set()


# Start waiters
for index in range(3):
    waiter_name = f"Waiter-{index}"
    threading.Thread(target=waiter, args=(waiter_name,)).start()

# Start setter
threading.Thread(target=setter).start()
Event Methods¶
import threading
# A new Event starts out unset.
event = threading.Event()
# Check if set
print(event.is_set()) # False
# Wait with timeout: blocks for at most 1 second. Nothing sets the event
# here, so this call times out and result is False.
result = event.wait(timeout=1.0) # Returns True if set, False on timeout
# Set the event (wake all waiting threads); it stays set until cleared
event.set()
print(event.is_set()) # True
# Clear the event so that later wait() calls block again
event.clear()
print(event.is_set()) # False
Practical Example: Startup Coordination¶
import threading
import time
db_ready = threading.Event()
cache_ready = threading.Event()


def init_database():
    """Simulate database start-up, then set ``db_ready``."""
    print("Database: Initializing...")
    time.sleep(2)
    print("Database: Ready")
    db_ready.set()


def init_cache():
    """Wait for the database, simulate cache start-up, set ``cache_ready``."""
    print("Cache: Waiting for database...")
    db_ready.wait()  # Cache depends on database
    print("Cache: Initializing...")
    time.sleep(1)
    print("Cache: Ready")
    cache_ready.set()


def main_app():
    """Start only once both the database and the cache are ready."""
    print("App: Waiting for all services...")
    for service_ready in (db_ready, cache_ready):
        service_ready.wait()
    print("App: All services ready, starting...")


# The events enforce the start-up order regardless of thread scheduling
for stage in (init_database, init_cache, main_app):
    threading.Thread(target=stage).start()
Condition¶
A Condition combines a lock with the ability to wait for a condition to be true.
Basic Condition Usage¶
import threading
import time
condition = threading.Condition()
items = []  # shared queue, only touched while holding ``condition``


def producer():
    """Append five items, half a second apart, notifying after each one."""
    for seq in range(5):
        time.sleep(0.5)
        label = f"item-{seq}"
        with condition:
            items.append(label)
            print(f"Produced: {label}")
            condition.notify()  # Wake up one waiting consumer


def consumer():
    """Consume items as they arrive and stop after seeing "item-4"."""
    latest = None
    while latest != "item-4":
        with condition:
            # Re-check in a loop: wait() may return with the list still empty
            while not items:  # Wait until items available
                print("Consumer: Waiting...")
                condition.wait()
            latest = items.pop(0)
            print(f"Consumed: {latest}")


for role in (producer, consumer):
    threading.Thread(target=role).start()
Producer-Consumer Pattern¶
import threading
import time
import random
class BoundedBuffer:
    """FIFO buffer with a maximum size, shared between threads.

    ``put`` blocks while the buffer is full and ``get`` blocks while it is
    empty. Both wait on the same Condition and wake all waiters after every
    change, so producers and consumers each re-check their own predicate.
    """

    def __init__(self, capacity):
        self.capacity = capacity
        self.buffer = []
        self.condition = threading.Condition()

    def put(self, item):
        """Append ``item``, waiting first if the buffer is at capacity."""
        cond = self.condition
        with cond:
            while not len(self.buffer) < self.capacity:
                print("Buffer full, producer waiting...")
                cond.wait()
            self.buffer.append(item)
            print(f"Produced: {item}, buffer size: {len(self.buffer)}")
            cond.notify_all()

    def get(self):
        """Remove and return the oldest item, waiting if the buffer is empty."""
        cond = self.condition
        with cond:
            while not self.buffer:
                print("Buffer empty, consumer waiting...")
                cond.wait()
            oldest = self.buffer.pop(0)
            print(f"Consumed: {oldest}, buffer size: {len(self.buffer)}")
            cond.notify_all()
            return oldest
buffer = BoundedBuffer(capacity=3)


def producer(producer_id):
    """Put five labelled items into the shared buffer at random intervals."""
    for seq in range(5):
        time.sleep(random.uniform(0.1, 0.5))
        buffer.put(f"P{producer_id}-{seq}")


def consumer(consumer_id):
    """Take five items out of the shared buffer at random intervals."""
    taken = 0
    while taken < 5:
        time.sleep(random.uniform(0.1, 0.5))
        buffer.get()
        taken += 1


# 2 producers, 2 consumers
threads = [
    threading.Thread(target=role, args=(index,))
    for index in range(2)
    for role in (producer, consumer)
]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
Barrier¶
A Barrier synchronizes a fixed number of threads at a certain point.
import threading
import time
import random
barrier = threading.Barrier(3)  # releases once 3 threads are waiting


def worker(worker_id):
    """Work for a random time, wait at the barrier, then continue."""
    tag = f"Worker {worker_id}"

    # Phase 1: Each thread does independent work
    work_time = random.uniform(0.5, 2.0)
    print(f"{tag}: Working for {work_time:.1f}s")
    time.sleep(work_time)

    # Synchronization point
    print(f"{tag}: Reached barrier, waiting...")
    barrier.wait()  # All threads must reach here before any continue

    # Phase 2: Continue after all threads synchronized
    print(f"{tag}: Continuing after barrier")


threads = []
for worker_id in range(3):
    threads.append(threading.Thread(target=worker, args=(worker_id,)))
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
Summary: Synchronization Primitives¶
| Primitive | Purpose | Use Case |
|---|---|---|
| Lock | Mutual exclusion | Protect shared data |
| RLock | Reentrant mutual exclusion | Recursive functions with locks |
| Semaphore | Limit concurrent access | Connection pools, rate limiting |
| Event | Simple signaling | Start/stop signals, coordination |
| Condition | Wait for complex conditions | Producer-consumer, state changes |
| Barrier | Synchronize multiple threads | Phased computation |
Key Takeaways¶
- Always use synchronization when multiple threads access shared mutable data
- Use `with lock:` syntax for automatic acquisition and release
- `Lock` for simple mutual exclusion, `RLock` for recursive access
- `Semaphore` to limit concurrent resource access
- `Event` for simple signaling between threads
- `Condition` for complex waiting conditions (producer-consumer)
- `Barrier` to synchronize threads at specific points
- Avoid holding locks longer than necessary to prevent contention
Runnable Example: thread_safety_tutorial.py¶
"""
Topic 45.5 - Thread Safety and Synchronization
Complete guide to making your code thread-safe using locks, semaphores,
events, and other synchronization primitives.
Learning Objectives:
- Understand race conditions
- Use Locks and RLocks for mutual exclusion
- Apply Semaphores for resource limiting
- Use Events for signaling
- Implement Conditions for complex coordination
- Understand Barriers for synchronization points
- Thread-safe data structures
Author: Python Educator
Date: 2024
"""
import threading
import time
import random
from queue import Queue
from threading import Lock, RLock, Semaphore, Event, Condition, Barrier
# ============================================================================
# PART 1: BEGINNER - Race Conditions and Locks
# ============================================================================
def demonstrate_race_condition():
    """
    Run five threads that increment one unprotected counter and report
    how many of the 500,000 expected increments were lost.
    """
    rule = "=" * 70
    print(rule)
    print("BEGINNER: Understanding Race Conditions")
    print(rule)

    # Shared counter (NOT thread-safe!)
    counter = 0
    expected = 500000

    def increment_counter():
        """Increment shared counter 100,000 times"""
        nonlocal counter
        for _ in range(100000):
            # Read, add 1, write back: three steps, and another thread
            # can run in between any two of them.
            counter += 1

    print("\n❌ WITHOUT synchronization:")
    print(" Running 5 threads, each incrementing counter 100,000 times")
    print(" Expected final value: 500,000\n")

    workers = [threading.Thread(target=increment_counter) for _ in range(5)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()

    print(f" Actual final value: {counter}")
    if counter == expected:
        print(" ✓ Got lucky this time, but this is still unsafe!")
    else:
        print(f" ⚠️ Lost updates! {expected - counter} increments were lost!")
        print(" This is a RACE CONDITION - multiple threads racing to update")

    print("\n💡 What went wrong:")
    for line in (
        " Thread A: reads 100, adds 1, writes 101",
        " Thread B: reads 100 (before A wrote!), adds 1, writes 101",
        " Result: Only increased by 1 instead of 2!",
    ):
        print(line)
    print("\n" + rule + "\n")
def fixing_with_lock():
    """
    Repeat the five-thread counter experiment with every increment
    guarded by a Lock, using explicit acquire()/release() calls.
    """
    rule = "=" * 70
    print(rule)
    print("BEGINNER: Fixing Race Conditions with Lock")
    print(rule)

    # Shared counter and the lock that guards it
    counter = 0
    counter_lock = Lock()

    def increment_counter_safe():
        """Increment counter safely using lock"""
        nonlocal counter
        for _ in range(100000):
            counter_lock.acquire()  # wait for exclusive access
            try:
                counter += 1  # Only one thread can be here at a time
            finally:
                counter_lock.release()  # ALWAYS release the lock

    print("\n✓ WITH Lock synchronization:")
    print(" Running 5 threads, each incrementing counter 100,000 times")
    print(" Expected final value: 500,000\n")

    workers = [threading.Thread(target=increment_counter_safe) for _ in range(5)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()

    print(f" Final value: {counter}")
    if counter == 500000:
        print(" ✓ Perfect! Lock prevented race conditions")

    print("\n💡 How Lock works:")
    for line in (
        " • Only ONE thread can hold the lock at a time",
        " • Other threads WAIT until lock is released",
        " • Ensures mutual exclusion (mutex)",
    ):
        print(line)
    print("\n" + rule + "\n")
def lock_with_context_manager():
    """
    Guard a shared balance with ``with lock:`` while five threads each try
    to withdraw $300 from an initial $1000.
    """
    rule = "=" * 70
    print(rule)
    print("BEGINNER: Lock with Context Manager (Best Practice)")
    print(rule)

    # Shared resource and its lock
    balance = 1000
    balance_lock = Lock()

    def withdraw_money(amount):
        """Safely withdraw money from balance"""
        nonlocal balance
        who = threading.current_thread().name
        # The check and the update form one critical section; the lock is
        # released on every exit path from the with block.
        with balance_lock:
            if balance < amount:
                print(f" [{who}] Insufficient funds!")
                return False
            print(f" [{who}] Withdrawing ${amount}")
            time.sleep(0.1)  # Simulate processing
            balance -= amount
            print(f" [{who}] Remaining: ${balance}")
            return True

    print(f"\n💰 Initial balance: ${balance}")
    print(" Multiple threads trying to withdraw...\n")

    # Multiple threads try to withdraw
    customers = []
    for number in range(1, 6):
        customer = threading.Thread(
            target=withdraw_money,
            args=(300,),
            name=f"Customer-{number}",
        )
        customers.append(customer)
        customer.start()
    for customer in customers:
        customer.join()

    print(f"\n💰 Final balance: ${balance}")
    print("\n💡 Context Manager Benefits:")
    for line in (
        " ✓ Lock automatically released (even if exception occurs)",
        " ✓ Cleaner, more readable code",
        " ✓ Can't forget to release lock",
        " ✓ ALWAYS use 'with lock:' instead of acquire/release",
    ):
        print(line)
    print("\n" + rule + "\n")
# ============================================================================
# PART 2: INTERMEDIATE - Advanced Synchronization Primitives
# ============================================================================
def reentrant_lock_example():
    """
    RLock (Reentrant Lock) can be acquired multiple times by same thread.
    Useful for recursive functions or nested lock acquisitions.

    Runs a recursive function that re-acquires one RLock at every level and
    prints how many times the lock is held at that point.
    """
    print("=" * 70)
    print("INTERMEDIATE: RLock (Reentrant Lock)")
    print("=" * 70)

    # Regular Lock vs RLock
    regular_lock = Lock()
    rlock = RLock()

    print("\n📝 Regular Lock - DEADLOCK if acquired twice:")

    def bad_recursive(n):
        """This will deadlock with regular Lock! (illustration only, never called)"""
        with regular_lock:
            if n > 0:
                # Can't acquire lock again - deadlock!
                # Uncomment to see deadlock:
                # with regular_lock:
                #     bad_recursive(n - 1)
                pass

    print(" (Commented out to avoid deadlock)")
    print("\n📝 RLock - OK to acquire multiple times:")

    start_level = 3

    def good_recursive(n):
        """Works fine with RLock"""
        with rlock:
            if n > 0:
                # Each level adds one acquisition on top of those still held
                # by the callers above it: 1 at the start level, 2 at the
                # next level down, and so on.
                held = start_level - n + 1
                print(f" Recursion level {n} (lock acquired {held} times)")
                good_recursive(n - 1)

    good_recursive(start_level)

    print("\n💡 When to use RLock:")
    print(" ✓ Recursive functions that need locking")
    print(" ✓ Methods that call other locked methods")
    print(" ✓ Same thread needs to acquire lock multiple times")
    print(" ✗ Slightly slower than regular Lock")
    print("\n" + "=" * 70 + "\n")
def semaphore_resource_limiting():
    """
    Let ten simulated users share a resource that a Semaphore limits to
    three concurrent holders.
    """
    rule = "=" * 70
    print(rule)
    print("INTERMEDIATE: Semaphore for Resource Limiting")
    print(rule)

    # Allow max 3 concurrent database connections
    db_connections = Semaphore(3)

    def access_database(user_id):
        """
        Simulate database access with limited connections.
        Args:
            user_id: User identifier
        """
        tag = f" User {user_id}"
        print(f"{tag}: Requesting database access...")
        with db_connections:  # Takes one permit; given back on exit
            print(f"{tag}: ✓ Connected to database")
            time.sleep(random.uniform(1.0, 2.0))  # Simulate query
            print(f"{tag}: Disconnected")

    print("\n⚙️ 10 users trying to access database")
    print(" (Only 3 concurrent connections allowed)\n")

    sessions = []
    for user_id in range(1, 11):
        session = threading.Thread(target=access_database, args=(user_id,))
        sessions.append(session)
        session.start()
        time.sleep(0.2)  # Stagger requests
    for session in sessions:
        session.join()

    print("\n💡 Semaphore use cases:")
    for line in (
        " ✓ Limit concurrent API calls",
        " ✓ Database connection pool",
        " ✓ Rate limiting",
        " ✓ Resource pool management",
    ):
        print(line)
    print("\n" + rule + "\n")
def event_signaling():
    """
    Event allows threads to signal each other.
    One thread waits for an event, another thread sets it.

    A producer sets ``data_ready`` to release three consumers, then waits on
    ``ready_event``, which is set by whichever consumer finishes last.
    """
    print("=" * 70)
    print("INTERMEDIATE: Event for Thread Signaling")
    print("=" * 70)

    num_consumers = 3

    # Event starts as "not set"
    ready_event = Event()
    data_ready = Event()

    # Completion counter. Consumers finish in no particular order, so
    # "last to finish" must be counted rather than assumed from the id.
    finished = 0
    finished_lock = Lock()

    def producer():
        """Produce data and signal when ready"""
        print(" [Producer] Preparing data...")
        time.sleep(2)  # Simulate preparation
        print(" [Producer] Data ready! Signaling consumers...")
        data_ready.set()  # Signal that data is ready
        # Wait for consumers to be ready for next batch
        ready_event.wait()
        print(" [Producer] Consumers ready for next batch")

    def consumer(consumer_id):
        """Wait for data, then process it"""
        nonlocal finished
        print(f" [Consumer {consumer_id}] Waiting for data...")
        # Wait for data_ready event
        data_ready.wait()
        print(f" [Consumer {consumer_id}] Got data! Processing...")
        time.sleep(1)
        print(f" [Consumer {consumer_id}] Done processing")
        # Signal ready for next batch only once every consumer is done
        with finished_lock:
            finished += 1
            everyone_done = finished == num_consumers
        if everyone_done:
            ready_event.set()

    print("\n⚙️ Starting producer-consumer with Event:\n")

    # Start producer
    prod_thread = threading.Thread(target=producer)
    prod_thread.start()

    # Start consumers
    cons_threads = []
    for consumer_id in range(1, num_consumers + 1):
        thread = threading.Thread(target=consumer, args=(consumer_id,))
        cons_threads.append(thread)
        thread.start()

    # Wait for all
    prod_thread.join()
    for thread in cons_threads:
        thread.join()

    print("\n💡 Event methods:")
    print(" • set() - Signal the event (wake waiters)")
    print(" • clear() - Reset the event")
    print(" • wait() - Block until event is set")
    print(" • is_set() - Check if event is set")
    print("\n" + "=" * 70 + "\n")
def condition_variable_example():
    """
    Pass 15 items from one producer to one consumer through a list capped
    at five entries, coordinating both sides with a single Condition.
    """
    rule = "=" * 70
    print(rule)
    print("INTERMEDIATE: Condition for Complex Coordination")
    print(rule)

    # Shared data with condition
    buffer = []
    max_size = 5
    condition = Condition()

    def producer(items):
        """Produce items when buffer has space"""
        for entry in items:
            with condition:
                # wait() releases the lock while blocked; re-check after
                # waking because the buffer may still be full
                while not len(buffer) < max_size:
                    print(" [Producer] Buffer full, waiting...")
                    condition.wait()
                buffer.append(entry)
                print(f" [Producer] Added {entry}. Buffer: {len(buffer)}")
                condition.notify()  # Wake one waiting thread
        print(" [Producer] Finished")

    def consumer(num_items):
        """Consume items when buffer has data"""
        for _ in range(num_items):
            with condition:
                while not buffer:
                    print(" [Consumer] Buffer empty, waiting...")
                    condition.wait()
                entry = buffer.pop(0)
                print(f" [Consumer] Got {entry}. Buffer: {len(buffer)}")
                condition.notify()  # Wake one waiting thread
        print(" [Consumer] Finished")

    print("\n⚙️ Producer-Consumer with Condition:\n")

    items_to_produce = [f"Item-{number}" for number in range(15)]
    cons = threading.Thread(target=consumer, args=(len(items_to_produce),))
    prod = threading.Thread(target=producer, args=(items_to_produce,))
    cons.start()
    time.sleep(0.5)  # Let consumer start waiting
    prod.start()
    for thread in (prod, cons):
        thread.join()

    print("\n💡 Condition advantages:")
    for line in (
        " ✓ Efficient waiting (no busy polling)",
        " ✓ notify() to wake specific waiters",
        " ✓ notify_all() to wake all waiters",
        " ✓ Perfect for producer-consumer with bounded buffer",
    ):
        print(line)
    print("\n" + rule + "\n")
# ============================================================================
# PART 3: ADVANCED - Complex Synchronization Patterns
# ============================================================================
def barrier_synchronization():
    """
    Run four workers through three phases, with a Barrier after each phase
    so that no worker moves on until all of them have arrived.
    """
    rule = "=" * 70
    print(rule)
    print("ADVANCED: Barrier for Synchronization Points")
    print(rule)

    num_workers = 4
    barrier = Barrier(num_workers)

    # One row per phase:
    # (start message, (min, max) seconds, finish message, after-barrier message)
    phases = (
        ("Phase 1: Loading data...", (0.5, 1.5), "Data loaded",
         "✓ Phase 1 complete for all"),
        ("Phase 2: Processing...", (0.5, 1.5), "Processing done",
         "✓ Phase 2 complete for all"),
        ("Phase 3: Saving results...", (0.3, 0.8), "Results saved",
         "✓ All phases complete!"),
    )

    def parallel_phase_worker(worker_id):
        """
        Worker that goes through multiple synchronized phases.
        Args:
            worker_id: Worker identifier
        """
        tag = f" [Worker {worker_id}]"
        for start_msg, (shortest, longest), done_msg, synced_msg in phases:
            print(f"{tag} {start_msg}")
            time.sleep(random.uniform(shortest, longest))
            print(f"{tag} {done_msg}")
            # Wait for every worker to finish this phase
            barrier.wait()
            print(f"{tag} {synced_msg}")

    print(f"\n⚙️ Running {num_workers} workers through 3 phases:\n")

    workers = []
    for worker_id in range(1, num_workers + 1):
        worker = threading.Thread(target=parallel_phase_worker, args=(worker_id,))
        workers.append(worker)
        worker.start()
    for worker in workers:
        worker.join()

    print("\n💡 Barrier use cases:")
    for line in (
        " ✓ Parallel algorithms with phases",
        " ✓ Map-reduce operations",
        " ✓ Distributed computing checkpoints",
        " ✓ Game loop synchronization",
    ):
        print(line)
    print("\n" + rule + "\n")
def thread_safe_singleton_pattern():
    """
    Implement thread-safe Singleton pattern with double-checked locking.

    Five threads construct ``DatabaseConnection`` concurrently; only the
    first to take the lock builds the instance and all of them get it back.
    """
    print("=" * 70)
    print("ADVANCED: Thread-Safe Singleton Pattern")
    print("=" * 70)

    class DatabaseConnection:
        """Thread-safe singleton database connection"""
        _instance = None
        _lock = Lock()

        def __new__(cls):
            # First check (without lock for performance)
            if cls._instance is None:
                # Acquire lock
                with cls._lock:
                    # Second check (with lock for safety)
                    if cls._instance is None:
                        print(" Creating NEW database connection...")
                        time.sleep(0.5)  # Simulate slow initialization
                        instance = super().__new__(cls)
                        instance.connection_id = random.randint(1000, 9999)
                        # Publish last: the first check above runs without
                        # the lock, so another thread must never be able to
                        # see an instance that has no connection_id yet.
                        cls._instance = instance
            return cls._instance

        def query(self, sql):
            """Execute a query"""
            return f"Query result from connection {self.connection_id}"

    def worker(worker_id):
        """Worker that gets database connection"""
        print(f" [Worker {worker_id}] Getting database connection...")
        db = DatabaseConnection()
        print(f" [Worker {worker_id}] Got connection: {db.connection_id}")
        # Use connection
        result = db.query("SELECT * FROM users")
        print(f" [Worker {worker_id}] {result}")

    print("\n⚙️ Multiple threads requesting singleton:\n")

    threads = []
    for i in range(5):
        thread = threading.Thread(target=worker, args=(i + 1,))
        threads.append(thread)
        thread.start()

    # Wait for all
    for thread in threads:
        thread.join()

    print("\n💡 Double-checked locking:")
    print(" ✓ First check avoids lock overhead")
    print(" ✓ Second check ensures thread safety")
    print(" ✓ Only one instance created despite concurrent access")
    print("\n" + "=" * 70 + "\n")
def deadlock_example_and_prevention():
    """
    Describe a two-lock deadlock without running it, then run two threads
    that avoid it by taking the locks in the same order.
    """
    rule = "=" * 70
    print(rule)
    print("ADVANCED: Deadlock and Prevention")
    print(rule)

    lock_a = Lock()
    lock_b = Lock()

    # The two *_bad functions take the locks in opposite orders. They are
    # defined for illustration only and are never started.
    def thread1_bad():
        """Can cause deadlock"""
        with lock_a:
            print(" Thread1: Acquired Lock A")
            time.sleep(0.1)
            # Trying to acquire lock B while holding A
            with lock_b:
                print(" Thread1: Acquired Lock B")

    def thread2_bad():
        """Can cause deadlock"""
        with lock_b:
            print(" Thread2: Acquired Lock B")
            time.sleep(0.1)
            # Trying to acquire lock A while holding B
            with lock_a:
                print(" Thread2: Acquired Lock A")

    print("\n❌ Deadlock scenario (NOT executed to avoid hanging):")
    for line in (
        " Thread 1: locks A, waits for B",
        " Thread 2: locks B, waits for A",
        " → Both threads wait forever!",
    ):
        print(line)

    # Good: every thread takes A first and B second
    def ordered_worker(label):
        """Deadlock-free version"""
        with lock_a:  # Always lock A first
            print(f" {label}: Acquired Lock A")
            time.sleep(0.1)
            with lock_b:  # Then lock B
                print(f" {label}: Acquired Lock B")
        print(f" {label}: ✓ Completed safely")

    print("\n✓ Deadlock-free execution (same lock order):\n")

    runners = [
        threading.Thread(target=ordered_worker, args=(label,))
        for label in ("Thread1", "Thread2")
    ]
    for runner in runners:
        runner.start()
    for runner in runners:
        runner.join()

    print("\n💡 Deadlock prevention strategies:")
    for line in (
        " 1. Always acquire locks in the same order",
        " 2. Use timeout on lock acquisition",
        " 3. Use a single lock instead of multiple",
        " 4. Avoid nested locking when possible",
        " 5. Use higher-level abstractions (Queue, etc.)",
    ):
        print(line)
    print("\n" + rule + "\n")
# ============================================================================
# MAIN EXECUTION
# ============================================================================
def main():
    """Run all thread safety demonstrations."""
    rule = "=" * 70

    print("\n" + rule)
    print(" " * 18 + "THREAD SAFETY")
    print(" " * 12 + "Synchronization and Race Conditions")
    print(rule + "\n")

    demos = (
        # Beginner level
        demonstrate_race_condition,
        fixing_with_lock,
        lock_with_context_manager,
        # Intermediate level
        reentrant_lock_example,
        semaphore_resource_limiting,
        event_signaling,
        condition_variable_example,
        # Advanced level
        barrier_synchronization,
        thread_safe_singleton_pattern,
        deadlock_example_and_prevention,
    )
    for demo in demos:
        demo()

    print("\n" + rule)
    print("Thread Safety Tutorial Complete!")
    print(rule)
    print("\n💡 Key Takeaways:")
    for takeaway in (
        "1. Always use locks for shared mutable state",
        "2. Use 'with lock:' for automatic release",
        "3. RLock for recursive/nested locking",
        "4. Semaphore for limiting concurrent access",
        "5. Event for simple signaling",
        "6. Condition for complex coordination",
        "7. Barrier for phase synchronization",
        "8. Prevent deadlocks with consistent lock ordering",
    ):
        print(takeaway)
    print(rule + "\n")


# Run the demonstrations only when executed as a script, not on import
if __name__ == "__main__":
    main()