Common Concurrency Patterns¶
Practical patterns for concurrent and parallel programming in Python.
Mental Model
Concurrency patterns are battle-tested templates for organizing parallel work. Parallel map distributes independent items across workers, producer-consumer decouples generation from processing via a queue, and fan-out/fan-in splits a job into subtasks then merges results. Knowing which pattern fits your problem is half the solution.
Pattern 1: Parallel Map¶
Apply a function to each item in a collection:
```python from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
def process(item): return item ** 2
items = list(range(100))
I/O-bound: use threads¶
with ThreadPoolExecutor() as executor: results = list(executor.map(process, items))
CPU-bound: use processes¶
with ProcessPoolExecutor() as executor: results = list(executor.map(process, items)) ```
Pattern 2: Producer-Consumer¶
Separate data production from consumption:
```python from concurrent.futures import ThreadPoolExecutor import queue import threading
def producer(q, items): """Produce items into queue.""" for item in items: q.put(item) q.put(None) # Sentinel
def consumer(q, process_func): """Consume items from queue.""" results = [] while True: item = q.get() if item is None: break results.append(process_func(item)) return results
Using queue for decoupling¶
q = queue.Queue(maxsize=10) # Bounded queue
Start producer and consumer¶
with ThreadPoolExecutor(max_workers=2) as executor: producer_future = executor.submit(producer, q, range(100)) consumer_future = executor.submit(consumer, q, lambda x: x ** 2)
producer_future.result()
results = consumer_future.result()
```
Pattern 3: Pipeline¶
Chain processing stages:
```python from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor import queue
def stage1_fetch(urls): """I/O-bound: fetch data.""" with ThreadPoolExecutor(max_workers=10) as executor: return list(executor.map(fetch_url, urls))
def stage2_process(data_list): """CPU-bound: process data.""" with ProcessPoolExecutor() as executor: return list(executor.map(process_data, data_list))
def stage3_save(results): """I/O-bound: save results.""" with ThreadPoolExecutor(max_workers=5) as executor: return list(executor.map(save_result, results))
Run pipeline¶
urls = [...] raw_data = stage1_fetch(urls) processed = stage2_process(raw_data) saved = stage3_save(processed) ```
Pattern 4: Fan-Out / Fan-In¶
Distribute work, then aggregate results:
```python from concurrent.futures import ThreadPoolExecutor, as_completed
def fan_out_fan_in(items, process_func, aggregate_func, max_workers=10): """ Fan-out: Process items concurrently Fan-in: Aggregate all results """ with ThreadPoolExecutor(max_workers=max_workers) as executor: # Fan-out: submit all tasks futures = [executor.submit(process_func, item) for item in items]
# Fan-in: collect results
results = []
for future in as_completed(futures):
try:
results.append(future.result())
except Exception as e:
print(f"Task failed: {e}")
# Aggregate
return aggregate_func(results)
Usage¶
urls = [...] total_bytes = fan_out_fan_in( urls, process_func=lambda url: len(requests.get(url).content), aggregate_func=sum ) ```
Pattern 5: Worker Pool with Results¶
Manage workers and collect results:
```python from concurrent.futures import ThreadPoolExecutor, as_completed
def worker_pool_with_results(tasks, worker_func, max_workers=10): """Execute tasks and return results with task info.""" results = {} errors = {}
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# Submit all tasks with tracking
future_to_task = {
executor.submit(worker_func, task): task
for task in tasks
}
# Collect results
for future in as_completed(future_to_task):
task = future_to_task[future]
try:
results[task] = future.result()
except Exception as e:
errors[task] = str(e)
return results, errors
Usage¶
def fetch(url): return requests.get(url).status_code
results, errors = worker_pool_with_results(urls, fetch) print(f"Successes: {len(results)}, Failures: {len(errors)}") ```
Pattern 6: Timeout with Fallback¶
Handle slow tasks gracefully:
```python from concurrent.futures import ThreadPoolExecutor, TimeoutError
def with_timeout(func, args=(), timeout=5, fallback=None): """Execute function with timeout and fallback.""" with ThreadPoolExecutor(max_workers=1) as executor: future = executor.submit(func, *args) try: return future.result(timeout=timeout) except TimeoutError: future.cancel() return fallback
Usage¶
result = with_timeout( slow_api_call, args=(url,), timeout=5, fallback={"error": "timeout"} ) ```
Pattern 7: Rate Limiting¶
Control request rate:
```python from concurrent.futures import ThreadPoolExecutor import time import threading
class RateLimiter: def init(self, calls_per_second): self.delay = 1.0 / calls_per_second self.lock = threading.Lock() self.last_call = 0
def wait(self):
with self.lock:
now = time.time()
wait_time = self.last_call + self.delay - now
if wait_time > 0:
time.sleep(wait_time)
self.last_call = time.time()
def rate_limited_fetch(rate_limiter, url): rate_limiter.wait() return requests.get(url)
Usage: 10 requests per second max¶
limiter = RateLimiter(calls_per_second=10)
with ThreadPoolExecutor(max_workers=20) as executor: futures = [ executor.submit(rate_limited_fetch, limiter, url) for url in urls ] results = [f.result() for f in futures] ```
Pattern 8: Retry with Backoff¶
Retry failed tasks with exponential backoff:
```python from concurrent.futures import ThreadPoolExecutor import time import random
def retry_with_backoff(func, max_retries=3, base_delay=1): """Retry function with exponential backoff.""" for attempt in range(max_retries): try: return func() except Exception as e: if attempt == max_retries - 1: raise delay = base_delay * (2 ** attempt) + random.uniform(0, 1) print(f"Attempt {attempt + 1} failed, retrying in {delay:.1f}s") time.sleep(delay)
def robust_fetch(url): return retry_with_backoff(lambda: requests.get(url, timeout=5))
With executor¶
with ThreadPoolExecutor(max_workers=10) as executor: futures = [executor.submit(robust_fetch, url) for url in urls] results = [f.result() for f in futures] ```
Pattern 9: Progress Tracking¶
Track completion progress:
```python from concurrent.futures import ThreadPoolExecutor, as_completed import threading
class ProgressTracker: def init(self, total): self.total = total self.completed = 0 self.lock = threading.Lock()
def update(self):
with self.lock:
self.completed += 1
percent = 100 * self.completed / self.total
print(f"\rProgress: {self.completed}/{self.total} ({percent:.0f}%)", end="")
def finish(self):
print() # New line
def process_with_progress(items, func, max_workers=10): tracker = ProgressTracker(len(items))
def wrapped(item):
result = func(item)
tracker.update()
return result
with ThreadPoolExecutor(max_workers=max_workers) as executor:
results = list(executor.map(wrapped, items))
tracker.finish()
return results
Usage¶
results = process_with_progress(urls, fetch_url) ```
Pattern 10: Batch Processing¶
Process items in batches:
```python from concurrent.futures import ProcessPoolExecutor
def batch_process(items, func, batch_size=100, max_workers=None): """Process items in batches for better efficiency.""" def process_batch(batch): return [func(item) for item in batch]
# Create batches
batches = [
items[i:i + batch_size]
for i in range(0, len(items), batch_size)
]
# Process batches in parallel
with ProcessPoolExecutor(max_workers=max_workers) as executor:
batch_results = list(executor.map(process_batch, batches))
# Flatten results
return [item for batch in batch_results for item in batch]
Usage¶
results = batch_process(large_list, expensive_func, batch_size=1000) ```
Pattern 11: Graceful Shutdown¶
Handle shutdown cleanly:
```python from concurrent.futures import ThreadPoolExecutor import signal import threading
class GracefulExecutor: def init(self, max_workers=10): self.executor = ThreadPoolExecutor(max_workers=max_workers) self.shutdown_event = threading.Event()
# Handle Ctrl+C
signal.signal(signal.SIGINT, self._signal_handler)
def _signal_handler(self, signum, frame):
print("\nShutdown requested...")
self.shutdown_event.set()
self.executor.shutdown(wait=False, cancel_futures=True)
def submit(self, func, *args):
if self.shutdown_event.is_set():
return None
return self.executor.submit(func, *args)
def map(self, func, items):
futures = []
for item in items:
if self.shutdown_event.is_set():
break
futures.append(self.executor.submit(func, item))
return [f.result() for f in futures if f and not f.cancelled()]
def shutdown(self):
self.executor.shutdown(wait=True)
Usage¶
executor = GracefulExecutor() try: results = executor.map(slow_task, items) finally: executor.shutdown() ```
Pattern 12: Resource Pool¶
Manage limited resources:
```python from concurrent.futures import ThreadPoolExecutor import threading
class ResourcePool: def init(self, resources): self.resources = list(resources) self.semaphore = threading.Semaphore(len(resources)) self.lock = threading.Lock()
def acquire(self):
self.semaphore.acquire()
with self.lock:
return self.resources.pop()
def release(self, resource):
with self.lock:
self.resources.append(resource)
self.semaphore.release()
def use_pooled_resource(pool, task_func, args): """Use a resource from pool for task.""" resource = pool.acquire() try: return task_func(resource, args) finally: pool.release(resource)
Usage: Database connection pool¶
connections = [create_connection() for _ in range(5)] pool = ResourcePool(connections)
def query(conn, sql): return conn.execute(sql)
with ThreadPoolExecutor(max_workers=20) as executor: futures = [ executor.submit(use_pooled_resource, pool, query, sql) for sql in queries ] results = [f.result() for f in futures] ```
Key Takeaways¶
- Parallel Map: Simplest pattern for independent tasks
- Producer-Consumer: Decouple production from processing
- Pipeline: Chain stages with appropriate executors
- Fan-Out/Fan-In: Distribute work, aggregate results
- Timeout/Retry: Handle failures gracefully
- Rate Limiting: Control resource usage
- Progress Tracking: Monitor long-running jobs
- Batch Processing: Reduce overhead for many small tasks
- Graceful Shutdown: Clean termination handling
- Resource Pool: Manage limited resources safely
Exercises¶
Exercise 1.
Implement a fan-out/fan-in pattern using ThreadPoolExecutor. Submit 10 tasks that each compute the sum of squares up to a given number. Use as_completed() to collect results, then aggregate them with sum(). Print each partial result as it arrives and the final total.
Solution to Exercise 1
```python
from concurrent.futures import ThreadPoolExecutor, as_completed
def sum_of_squares(n):
return sum(i * i for i in range(n))
items = list(range(1000, 11000, 1000))
with ThreadPoolExecutor(max_workers=4) as executor:
futures = {executor.submit(sum_of_squares, n): n for n in items}
partial_results = []
for future in as_completed(futures):
n = futures[future]
result = future.result()
partial_results.append(result)
print(f"n={n}: {result}")
total = sum(partial_results)
print(f"\nAggregate total: {total}")
```
Exercise 2.
Write a ProgressTracker class with thread-safe update() and implement process_with_progress(items, func) that maps a function over a list using ThreadPoolExecutor while printing progress as a percentage. Test with 20 items where each item sleeps for 0.1s.
Solution to Exercise 2
```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
class ProgressTracker:
def __init__(self, total):
self.total = total
self.completed = 0
self.lock = threading.Lock()
def update(self):
with self.lock:
self.completed += 1
pct = 100 * self.completed / self.total
print(f"\rProgress: {self.completed}/{self.total} ({pct:.0f}%)", end="")
def process_with_progress(items, func, max_workers=5):
tracker = ProgressTracker(len(items))
def wrapped(item):
result = func(item)
tracker.update()
return result
with ThreadPoolExecutor(max_workers=max_workers) as executor:
results = list(executor.map(wrapped, items))
print()
return results
def slow_square(x):
time.sleep(0.1)
return x ** 2
results = process_with_progress(list(range(20)), slow_square)
print(f"Results: {results}")
```
Exercise 3.
Implement a producer-consumer pattern using threading, queue.Queue(maxsize=5), and two consumer threads. The producer generates 20 items. Each consumer processes items (simulated with 0.1s sleep) and appends results to a shared list. Use a None sentinel to signal consumers to stop. Print the final sorted results.
Solution to Exercise 3
```python
import threading
import queue
import time
def producer(q, n):
for i in range(n):
q.put(i)
q.put(None)
q.put(None)
def consumer(q, results, lock, name):
while True:
item = q.get()
if item is None:
break
time.sleep(0.1)
result = item ** 2
with lock:
results.append(result)
print(f"{name} processed {item} -> {result}")
q = queue.Queue(maxsize=5)
results = []
lock = threading.Lock()
threads = [
threading.Thread(target=producer, args=(q, 20)),
threading.Thread(target=consumer, args=(q, results, lock, "Consumer-A")),
threading.Thread(target=consumer, args=(q, results, lock, "Consumer-B")),
]
for t in threads:
t.start()
for t in threads:
t.join()
print(f"\nSorted results: {sorted(results)}")
```