Thread Communication¶
Threads need to communicate and share data safely. This page covers patterns for passing data between threads.
Thread-Safe Queue¶
The queue module provides thread-safe queues — the recommended way for threads to communicate.
Basic Queue Usage¶
import threading
import queue
import time
q = queue.Queue()
def producer():
for i in range(5):
item = f"item-{i}"
q.put(item)
print(f"Produced: {item}")
time.sleep(0.5)
q.put(None) # Sentinel to signal completion
def consumer():
while True:
item = q.get() # Blocks until item available
if item is None:
break
print(f"Consumed: {item}")
q.task_done()
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)
producer_thread.start()
consumer_thread.start()
producer_thread.join()
consumer_thread.join()
Queue Methods¶
import queue
q = queue.Queue()
# Put items
q.put("item1") # Block until space available
q.put("item2", block=False) # Raise queue.Full if full
q.put("item3", timeout=1) # Raise queue.Full after timeout
# Get items
item = q.get() # Block until item available
item = q.get(block=False) # Raise queue.Empty if empty
item = q.get(timeout=1) # Raise queue.Empty after timeout
# Check state
q.empty() # True if empty (approximate)
q.full() # True if full (approximate)
q.qsize() # Approximate size
# Task tracking
q.task_done() # Mark task as complete
q.join() # Block until all tasks done
Queue with Size Limit¶
import threading
import queue
import time
# Limited capacity queue
q = queue.Queue(maxsize=3)
def producer():
for i in range(10):
print(f"Producing item-{i}...")
q.put(f"item-{i}") # Blocks when queue is full
print(f"Produced item-{i}")
def consumer():
time.sleep(2) # Slow consumer
while True:
try:
item = q.get(timeout=3)
print(f"Consumed: {item}")
time.sleep(0.5)
q.task_done()
except queue.Empty:
break
threading.Thread(target=producer).start()
threading.Thread(target=consumer).start()
Queue Types¶
FIFO Queue (Default)¶
import queue
q = queue.Queue() # First-In-First-Out
q.put(1)
q.put(2)
q.put(3)
print(q.get()) # 1
print(q.get()) # 2
print(q.get()) # 3
LIFO Queue (Stack)¶
import queue
q = queue.LifoQueue() # Last-In-First-Out
q.put(1)
q.put(2)
q.put(3)
print(q.get()) # 3
print(q.get()) # 2
print(q.get()) # 1
Priority Queue¶
import queue
q = queue.PriorityQueue()
q.put((3, "low priority"))
q.put((1, "high priority"))
q.put((2, "medium priority"))
print(q.get()) # (1, 'high priority')
print(q.get()) # (2, 'medium priority')
print(q.get()) # (3, 'low priority')
Producer-Consumer Pattern¶
Single Producer, Single Consumer¶
import threading
import queue
import time
def producer(q, num_items):
for i in range(num_items):
item = f"item-{i}"
q.put(item)
print(f"[Producer] Created {item}")
time.sleep(0.1)
q.put(None) # Sentinel
def consumer(q):
while True:
item = q.get()
if item is None:
break
print(f"[Consumer] Processing {item}")
time.sleep(0.2)
q.task_done()
q = queue.Queue()
producer_t = threading.Thread(target=producer, args=(q, 10))
consumer_t = threading.Thread(target=consumer, args=(q,))
producer_t.start()
consumer_t.start()
producer_t.join()
consumer_t.join()
Multiple Producers, Multiple Consumers¶
import threading
import queue
import time
import random
def producer(q, producer_id, num_items):
for i in range(num_items):
item = f"P{producer_id}-item-{i}"
q.put(item)
print(f"[Producer {producer_id}] Created {item}")
time.sleep(random.uniform(0.05, 0.15))
def consumer(q, consumer_id, stop_event):
while not stop_event.is_set() or not q.empty():
try:
item = q.get(timeout=0.5)
print(f"[Consumer {consumer_id}] Processing {item}")
time.sleep(random.uniform(0.1, 0.3))
q.task_done()
except queue.Empty:
continue
q = queue.Queue()
stop_event = threading.Event()
# Start 3 producers
producers = []
for i in range(3):
t = threading.Thread(target=producer, args=(q, i, 5))
t.start()
producers.append(t)
# Start 2 consumers
consumers = []
for i in range(2):
t = threading.Thread(target=consumer, args=(q, i, stop_event))
t.start()
consumers.append(t)
# Wait for producers to finish
for p in producers:
p.join()
# Wait for queue to empty
q.join()
# Signal consumers to stop
stop_event.set()
# Wait for consumers
for c in consumers:
c.join()
print("All done!")
Worker Pool Pattern¶
Thread Pool with Queue¶
import threading
import queue
import time
class ThreadPool:
def __init__(self, num_workers):
self.tasks = queue.Queue()
self.results = queue.Queue()
self.workers = []
for _ in range(num_workers):
worker = threading.Thread(target=self._worker)
worker.daemon = True
worker.start()
self.workers.append(worker)
def _worker(self):
while True:
func, args, kwargs = self.tasks.get()
if func is None:
break
try:
result = func(*args, **kwargs)
self.results.put(("success", result))
except Exception as e:
self.results.put(("error", e))
finally:
self.tasks.task_done()
def submit(self, func, *args, **kwargs):
self.tasks.put((func, args, kwargs))
def wait(self):
self.tasks.join()
def get_results(self):
results = []
while not self.results.empty():
results.append(self.results.get())
return results
def shutdown(self):
for _ in self.workers:
self.tasks.put((None, None, None))
for w in self.workers:
w.join()
# Usage
def process(x):
time.sleep(0.1)
return x * 2
pool = ThreadPool(4)
for i in range(20):
pool.submit(process, i)
pool.wait()
results = pool.get_results()
print(results)
pool.shutdown()
Sharing Data with Locks¶
Thread-Safe Counter¶
import threading
class ThreadSafeCounter:
def __init__(self):
self._value = 0
self._lock = threading.Lock()
def increment(self):
with self._lock:
self._value += 1
def decrement(self):
with self._lock:
self._value -= 1
@property
def value(self):
with self._lock:
return self._value
counter = ThreadSafeCounter()
def worker():
for _ in range(10000):
counter.increment()
threads = [threading.Thread(target=worker) for _ in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
print(counter.value) # Always 100000
Thread-Safe Dictionary¶
import threading
class ThreadSafeDict:
def __init__(self):
self._dict = {}
self._lock = threading.RLock()
def __setitem__(self, key, value):
with self._lock:
self._dict[key] = value
def __getitem__(self, key):
with self._lock:
return self._dict[key]
def __contains__(self, key):
with self._lock:
return key in self._dict
def get(self, key, default=None):
with self._lock:
return self._dict.get(key, default)
def items(self):
with self._lock:
return list(self._dict.items())
# Usage
d = ThreadSafeDict()
def writer(writer_id):
for i in range(100):
d[f"key-{writer_id}-{i}"] = i
threads = [threading.Thread(target=writer, args=(i,)) for i in range(5)]
for t in threads:
t.start()
for t in threads:
t.join()
print(f"Total items: {len(d.items())}") # 500
Thread-Local Data¶
threading.local() provides data that is specific to each thread.
import threading
import time
# Thread-local storage
local_data = threading.local()
def worker(name):
# Each thread gets its own 'name' attribute
local_data.name = name
# Simulate work
time.sleep(0.1)
# Access thread-local data
print(f"Thread {local_data.name}: processing")
time.sleep(0.1)
print(f"Thread {local_data.name}: done")
threads = []
for i in range(3):
t = threading.Thread(target=worker, args=(f"Worker-{i}",))
t.start()
threads.append(t)
for t in threads:
t.join()
Practical Example: Database Connection per Thread¶
import threading
class DatabaseConnection:
"""Simulated database connection."""
def __init__(self, thread_name):
self.thread_name = thread_name
print(f"Created connection for {thread_name}")
def query(self, sql):
return f"Result from {self.thread_name}"
# Thread-local connection
_connections = threading.local()
def get_connection():
"""Get or create connection for current thread."""
if not hasattr(_connections, 'conn'):
_connections.conn = DatabaseConnection(
threading.current_thread().name
)
return _connections.conn
def worker():
conn = get_connection() # Each thread gets its own connection
print(conn.query("SELECT * FROM users"))
conn2 = get_connection() # Same connection returned
print(f"Same connection: {conn is conn2}")
threads = []
for i in range(3):
t = threading.Thread(target=worker, name=f"Thread-{i}")
t.start()
threads.append(t)
for t in threads:
t.join()
Pipe Pattern with Queues¶
Chain multiple processing stages:
import threading
import queue
import time
def stage1(input_q, output_q):
"""Read raw data, output preprocessed."""
while True:
item = input_q.get()
if item is None:
output_q.put(None)
break
result = f"preprocessed({item})"
output_q.put(result)
input_q.task_done()
def stage2(input_q, output_q):
"""Process preprocessed data."""
while True:
item = input_q.get()
if item is None:
output_q.put(None)
break
result = f"processed({item})"
output_q.put(result)
input_q.task_done()
def stage3(input_q, results):
"""Final stage, collect results."""
while True:
item = input_q.get()
if item is None:
break
results.append(f"final({item})")
input_q.task_done()
# Create queues for pipeline
q1 = queue.Queue()
q2 = queue.Queue()
q3 = queue.Queue()
results = []
# Start pipeline stages
threading.Thread(target=stage1, args=(q1, q2)).start()
threading.Thread(target=stage2, args=(q2, q3)).start()
threading.Thread(target=stage3, args=(q3, results)).start()
# Feed input
for i in range(5):
q1.put(f"item-{i}")
q1.put(None) # Sentinel
# Wait for completion
time.sleep(1) # Allow pipeline to complete
print("Results:", results)
# ['final(processed(preprocessed(item-0)))', ...]
Summary: Communication Patterns¶
| Pattern | Use Case | Module |
|---|---|---|
| Queue | Producer-consumer, task distribution | queue.Queue |
| Priority Queue | Tasks with priorities | queue.PriorityQueue |
| Shared variable + Lock | Simple shared state | threading.Lock |
| Thread-local | Per-thread data (connections, context) | threading.local() |
| Event | Simple signaling | threading.Event |
| Condition | Complex waiting conditions | threading.Condition |
Key Takeaways¶
- Queue is the safest way for threads to communicate
- Use
q.put()/q.get()— they handle locking automatically - Use sentinel values (
None) to signal termination threading.local()for per-thread data (database connections, etc.)- Protect shared mutable data with locks
- The producer-consumer pattern scales well with multiple workers
q.join()waits for allq.task_done()calls to matchq.put()calls- Choose queue type based on ordering needs (FIFO, LIFO, Priority)
Runnable Example: bank_account_condition_example.py¶
"""
Thread Synchronization: Bank Account with Condition Variables
A practical example of thread coordination using threading.Condition.
Multiple depositor and withdrawer threads operate on a shared bank
account, with withdrawals waiting until sufficient funds are available.
Topics covered:
- threading.Condition for wait/notify coordination
- Producer-consumer pattern (depositors produce, withdrawers consume)
- threading.Lock for mutual exclusion
- ThreadPoolExecutor for managing thread pools
- Thread-safe balance updates
Based on concepts from Python-100-Days example21 and ch14/threading materials.
"""
import threading
from concurrent.futures import ThreadPoolExecutor
from random import randint
from time import sleep
# =============================================================================
# Example 1: Thread-Safe Bank Account
# =============================================================================
class BankAccount:
"""A bank account with thread-safe deposit and withdrawal.
Uses threading.Condition (which wraps a Lock) to coordinate:
- Deposits: add money and notify waiting withdrawals
- Withdrawals: wait until sufficient balance is available
"""
def __init__(self, initial_balance: float = 0.0):
self._balance = initial_balance
self._lock = threading.Lock()
self._condition = threading.Condition(self._lock)
@property
def balance(self) -> float:
return self._balance
def deposit(self, amount: float) -> None:
"""Deposit money and notify any waiting withdrawals.
After depositing, notify_all() wakes up threads that
are waiting in withdraw() so they can re-check balance.
"""
with self._condition:
self._balance += amount
self._condition.notify_all() # Wake up waiting withdrawers
def withdraw(self, amount: float) -> None:
"""Withdraw money, waiting if balance is insufficient.
The while loop re-checks the condition after being notified,
because another thread might have consumed the funds first
(spurious wakeup protection).
"""
with self._condition:
while amount > self._balance:
self._condition.wait() # Release lock and wait
self._balance -= amount
# =============================================================================
# Example 2: Depositor and Withdrawer Workers
# =============================================================================
def depositor(account: BankAccount, name: str, rounds: int = 5) -> None:
"""Worker that makes random deposits."""
for i in range(rounds):
amount = randint(50, 200)
account.deposit(amount)
print(f" {name} deposited ${amount:>3} -> balance: ${account.balance:.0f}")
sleep(0.1)
def withdrawer(account: BankAccount, name: str, rounds: int = 3) -> None:
"""Worker that makes random withdrawals (waits if insufficient funds)."""
for i in range(rounds):
amount = randint(100, 300)
print(f" {name} wants to withdraw ${amount}...")
account.withdraw(amount)
print(f" {name} withdrew ${amount:>3} -> balance: ${account.balance:.0f}")
sleep(0.2)
# =============================================================================
# Example 3: Running the Simulation
# =============================================================================
def run_simulation():
"""Run the bank account simulation with multiple threads."""
print("=== Bank Account Thread Simulation ===")
print("3 depositors (5 rounds each) + 3 withdrawers (3 rounds each)")
print()
account = BankAccount(initial_balance=100)
print(f"Initial balance: ${account.balance:.0f}")
print()
with ThreadPoolExecutor(max_workers=6) as pool:
# Submit depositor tasks
for i in range(3):
pool.submit(depositor, account, f"Depositor-{i+1}")
# Submit withdrawer tasks
for i in range(3):
pool.submit(withdrawer, account, f"Withdrawer-{i+1}")
print(f"\nFinal balance: ${account.balance:.0f}")
print()
# =============================================================================
# Example 4: Understanding Condition vs Lock
# =============================================================================
def demo_condition_vs_lock():
"""Explain when to use Condition vs plain Lock."""
print("=== Condition vs Lock ===")
print("""
threading.Lock:
- Mutual exclusion only
- One thread at a time in critical section
- Use when: protecting shared data from concurrent access
threading.Condition:
- Mutual exclusion + wait/notify
- Threads can WAIT for a condition to become true
- Other threads NOTIFY when condition might have changed
- Use when: threads need to coordinate (producer-consumer)
Pattern:
# Waiting thread:
with condition:
while not ready: # Always use while (not if)
condition.wait() # Releases lock, blocks, reacquires lock
# ... proceed ...
# Notifying thread:
with condition:
# ... make changes ...
condition.notify_all() # Wake up waiting threads
""")
# =============================================================================
# Main
# =============================================================================
if __name__ == '__main__':
run_simulation()
demo_condition_vs_lock()