# Asyncio Patterns
Mental Model
Async patterns are recipes for coordinating coroutines that need to work together. Producer-consumer decouples data generation from processing, semaphores throttle concurrency to avoid overwhelming resources, and pipelines chain stages together. Each pattern solves a specific coordination problem that arises when many tasks share a single thread.
## Producer-Consumer Pattern
### Basic Queue
```python
import asyncio
async def producer(queue, n):
    """Put ``n`` labelled items on *queue*, followed by one ``None`` sentinel.

    Each item costs 0.1 s of simulated work.  On a bounded queue, ``put``
    waits while the queue is full, which throttles the producer.
    """
    for idx in range(n):
        await asyncio.sleep(0.1)  # pretend that producing the item takes time
        await queue.put(f"item-{idx}")
        print(f"Produced: item-{idx}")
    await queue.put(None)  # end-of-stream marker for the consumer
async def consumer(queue, name=None):
    """Consume items from *queue* until the ``None`` sentinel arrives.

    Args:
        queue: an ``asyncio.Queue`` whose producer finishes with ``None``.
        name: optional label for the log line, so several consumers that
            share one queue can be told apart.  ``None`` keeps the plain
            ``"Consumed: ..."`` message.

    ``task_done()`` is called once for every item taken from the queue,
    the sentinel included, so a concurrent ``await queue.join()`` can finish.
    """
    while True:
        item = await queue.get()
        try:
            if item is None:
                break
            await asyncio.sleep(0.2)  # Simulate processing
            if name is None:
                print(f"Consumed: {item}")
            else:
                print(f"{name} consumed: {item}")
        finally:
            # Balance every get(); otherwise queue.join() waits forever.
            queue.task_done()
async def main(): queue = asyncio.Queue(maxsize=5)
await asyncio.gather(
producer(queue, 10),
consumer(queue)
)
asyncio.run(main()) ```
### Multiple Consumers
```python async def main(): queue = asyncio.Queue()
# Start multiple consumers
consumers = [
asyncio.create_task(consumer(queue, f"consumer-{i}"))
for i in range(3)
]
# Produce items
await producer(queue, 20)
# Wait for queue to be processed
await queue.join()
# Cancel consumers
for c in consumers:
c.cancel()
```
## Semaphore (Limiting Concurrency)
### Rate Limiting Requests
```python
import asyncio
import aiohttp
async def fetch(session, url, sem):
    """Return the body of *url* as text, holding *sem* for the whole request.

    The semaphore caps how many requests are in flight at once.  It is
    released automatically when the ``async with`` block exits.
    """
    async with sem:
        request = session.get(url)
        async with request as response:
            body = await response.text()
    return body
async def main():
    """Download 100 pages while keeping at most 10 requests in flight."""
    urls = [f"https://example.com/page{i}" for i in range(100)]
    limit = asyncio.Semaphore(10)  # shared by every fetch() call below
    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(*(fetch(session, u, limit) for u in urls))
```
### Bounded Semaphore
```python
BoundedSemaphore raises error if released more than acquired¶
sem = asyncio.BoundedSemaphore(10)
async def safe_operation(sem): async with sem: await do_work() # Automatically released ```
## Event (Signaling)
### Simple Signal
```python async def waiter(event): print("Waiting for event...") await event.wait() print("Event received!")
async def setter(event, delay=2):
    """Wait *delay* seconds, then set *event*, waking every waiter at once.

    Args:
        event: the ``asyncio.Event`` to set.
        delay: seconds to sleep before setting it (default 2, the
            previously hard-coded value).
    """
    await asyncio.sleep(delay)
    print("Setting event")
    event.set()
async def main(): event = asyncio.Event()
await asyncio.gather(
waiter(event),
waiter(event),
setter(event)
)
asyncio.run(main()) ```
### Start Signal Pattern
```python async def worker(event, worker_id): await event.wait() # Wait for start signal print(f"Worker {worker_id} starting") await do_work()
async def main():
    """Create five workers up front, finish setup, then release them together."""
    start_event = asyncio.Event()

    # The tasks are scheduled now; each worker first awaits start_event.wait().
    crew = [asyncio.create_task(worker(start_event, n)) for n in range(5)]

    await setup()       # Setup phase: the workers stay blocked meanwhile
    start_event.set()   # a single set() wakes every waiting worker
    await asyncio.gather(*crew)
```
## Lock (Mutual Exclusion)
### Protecting Shared State
```python class Counter: def init(self): self.value = 0 self.lock = asyncio.Lock()
async def increment(self):
async with self.lock:
current = self.value
await asyncio.sleep(0.01) # Simulate delay
self.value = current + 1
async def main(): counter = Counter()
# Without lock, final value would be wrong
await asyncio.gather(*[counter.increment() for _ in range(100)])
print(f"Final value: {counter.value}") # 100
asyncio.run(main()) ```
### Lock with Timeout
python
async def acquire_with_timeout(lock, timeout):
try:
await asyncio.wait_for(lock.acquire(), timeout)
try:
await do_critical_work()
finally:
lock.release()
except asyncio.TimeoutError:
print("Could not acquire lock in time")
## Condition (Wait/Notify)
### Producer-Consumer with Condition
```python async def producer(condition, items): async with condition: items.append("new item") print("Produced item, notifying") condition.notify()
async def consumer(condition, items):
    """Wait under *condition* until *items* is non-empty, then pop and report one."""
    async with condition:
        # Re-check after every wake-up: wait() releases the lock while
        # blocked and re-acquires it before returning.
        while True:
            if items:
                break
            print("Waiting for items...")
            await condition.wait()
        taken = items.pop()
        print(f"Consumed: {taken}")
async def main():
    """Start a waiting consumer, pause, then produce the item it is waiting for."""
    condition = asyncio.Condition()
    items = []

    # The consumer is scheduled first and blocks in condition.wait().
    waiting = asyncio.create_task(consumer(condition, items))
    await asyncio.sleep(1)

    await producer(condition, items)  # appends an item and notifies
    await waiting
```
## Retry Pattern
### Exponential Backoff
```python async def fetch_with_retry(url, max_retries=3, base_delay=1): for attempt in range(max_retries): try: return await fetch(url) except aiohttp.ClientError as e: if attempt == max_retries - 1: raise
delay = base_delay * (2 ** attempt)
print(f"Attempt {attempt + 1} failed, retrying in {delay}s")
await asyncio.sleep(delay)
```
### Retry Decorator
```python def async_retry(max_retries=3, delay=1, exceptions=(Exception,)): def decorator(func): async def wrapper(args, kwargs): last_exception = None for attempt in range(max_retries): try: return await func(args, **kwargs) except exceptions as e: last_exception = e if attempt < max_retries - 1: await asyncio.sleep(delay * (2 ** attempt)) raise last_exception return wrapper return decorator
@async_retry(max_retries=3, exceptions=(aiohttp.ClientError,)) async def fetch_data(url): async with aiohttp.ClientSession() as session: async with session.get(url) as response: return await response.json() ```
## Timeout Patterns
### Context Manager Timeout
```python async def with_timeout(coro, timeout, default=None): try: return await asyncio.wait_for(coro, timeout) except asyncio.TimeoutError: return default
async def main(): result = await with_timeout(slow_operation(), timeout=5, default="timed out") ```
### Per-Task Timeouts in Batch
```python async def fetch_with_timeout(url, timeout=5): try: return await asyncio.wait_for(fetch(url), timeout) except asyncio.TimeoutError: return {"url": url, "error": "timeout"}
async def main(): urls = ["url1", "url2", "url3"] results = await asyncio.gather(*[ fetch_with_timeout(url) for url in urls ]) ```
## Graceful Shutdown
### Signal Handling
```python
import signal
class Server:
    """Toy request loop that stops cleanly on SIGTERM or SIGINT.

    ``handle_request`` is not defined in this snippet.  It is presumably an
    async method supplied elsewhere (e.g. by a subclass); confirm.
    """

    def __init__(self):
        # Must be the dunder ``__init__`` so construction actually sets the flag.
        self.running = True

    async def run(self):
        """Serve requests until ``shutdown`` clears ``running``.

        Signal handlers are installed on the running loop for the duration
        of the call and removed again on exit.  ``loop.add_signal_handler``
        is only available on Unix event loops.
        """
        loop = asyncio.get_running_loop()
        signals = (signal.SIGTERM, signal.SIGINT)

        # Setup signal handlers
        for sig in signals:
            loop.add_signal_handler(sig, self.shutdown)
        try:
            # The flag is checked between requests, so a request already in
            # progress finishes before the loop exits.
            while self.running:
                await self.handle_request()
        finally:
            for sig in signals:
                loop.remove_signal_handler(sig)

    def shutdown(self):
        """Request a stop; invoked by the signal handlers or called directly."""
        print("Shutdown requested")
        self.running = False
async def main(): server = Server() await server.run() ```
### Cleanup All Tasks
```python async def shutdown(loop, signal=None): if signal: print(f"Received signal {signal.name}")
print("Cancelling outstanding tasks...")
tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
loop.stop()
```
## Connection Pool Pattern
```python class ConnectionPool: def init(self, max_connections=10): self.semaphore = asyncio.Semaphore(max_connections) self.connections = asyncio.Queue()
async def get_connection(self):
await self.semaphore.acquire()
try:
return self.connections.get_nowait()
except asyncio.QueueEmpty:
return await create_connection()
async def release_connection(self, conn):
await self.connections.put(conn)
self.semaphore.release()
async def __aenter__(self):
self.conn = await self.get_connection()
return self.conn
async def __aexit__(self, *args):
await self.release_connection(self.conn)
Usage¶
pool = ConnectionPool(max_connections=10)
async def query(sql): async with pool as conn: return await conn.execute(sql) ```
## Batch Processing
### Chunked Processing
```python async def process_batch(items, batch_size=10): results = []
for i in range(0, len(items), batch_size):
batch = items[i:i + batch_size]
batch_results = await asyncio.gather(*[
process_item(item) for item in batch
])
results.extend(batch_results)
# Optional: delay between batches
await asyncio.sleep(0.1)
return results
```
### Async Iterator Processing
```python async def process_stream(async_iterator, concurrency=10): sem = asyncio.Semaphore(concurrency)
async def bounded_process(item):
async with sem:
return await process_item(item)
tasks = []
async for item in async_iterator:
tasks.append(asyncio.create_task(bounded_process(item)))
return await asyncio.gather(*tasks)
```
## Error Handling Patterns
### Collect All Errors
```python async def process_all(items): results = [] errors = []
for coro in asyncio.as_completed([process(item) for item in items]):
try:
result = await coro
results.append(result)
except Exception as e:
errors.append(e)
if errors:
print(f"Completed with {len(errors)} errors")
return results, errors
```
### Circuit Breaker
```python class CircuitBreaker: def init(self, max_failures=5, reset_timeout=60): self.max_failures = max_failures self.reset_timeout = reset_timeout self.failures = 0 self.last_failure_time = None self.state = "closed" # closed, open, half-open
async def call(self, coro):
if self.state == "open":
if time.time() - self.last_failure_time > self.reset_timeout:
self.state = "half-open"
else:
raise Exception("Circuit breaker is open")
try:
result = await coro
self.failures = 0
self.state = "closed"
return result
except Exception as e:
self.failures += 1
self.last_failure_time = time.time()
if self.failures >= self.max_failures:
self.state = "open"
raise
```
## Summary
| Pattern | Use Case |
|---|---|
| Queue | Producer-consumer workflows |
| Semaphore | Limit concurrent operations |
| Event | Signal between coroutines |
| Lock | Protect shared state |
| Condition | Wait/notify coordination |
| Retry | Handle transient failures |
| Timeout | Prevent hanging operations |
| Connection Pool | Reuse expensive resources |
| Batch Processing | Handle large datasets |
| Circuit Breaker | Fail fast on repeated errors |
Key Takeaways:
- Use `asyncio.Queue` for producer-consumer patterns
- Use `Semaphore` to limit concurrency (rate limiting)
- Always implement timeouts for external calls
- Handle graceful shutdown with signal handlers
- Use retry with exponential backoff for reliability
- Batch operations to manage resource usage
## Exercises
Exercise 1.
Implement an async producer-consumer pipeline with `asyncio.Queue(maxsize=5)`. The producer generates 15 items (0.05s per item). Two consumers each process items (0.1s per item). Put one `None` sentinel on the queue per consumer to signal each consumer to stop. Print which consumer handled each item.
Solution to Exercise 1
```python
import asyncio
async def producer(queue, n, num_consumers=2):
    """Put the integers ``0..n-1`` on *queue*, then one ``None`` per consumer.

    Args:
        queue: the shared ``asyncio.Queue``.
        n: number of items to produce (0.05 s of simulated work each).
        num_consumers: how many consumers read the queue.  Each needs its
            own sentinel to stop.  Defaults to 2, matching the exercise.
    """
    for i in range(n):
        await asyncio.sleep(0.05)
        await queue.put(i)
        print(f"Produced: {i}")
    for _ in range(num_consumers):
        await queue.put(None)
async def consumer(queue, name):
    """Process items from *queue* until a ``None`` sentinel arrives.

    ``task_done()`` is called for every item taken, the sentinel included,
    so ``await queue.join()`` on the same queue can complete.
    """
    while True:
        item = await queue.get()
        try:
            if item is None:
                break
            await asyncio.sleep(0.1)
            print(f"{name} consumed: {item}")
        finally:
            queue.task_done()
async def main():
    """Run one producer and two consumers over a queue bounded at five items."""
    queue = asyncio.Queue(maxsize=5)
    coros = [producer(queue, 15)]
    coros += [consumer(queue, label) for label in ("Consumer-A", "Consumer-B")]
    await asyncio.gather(*coros)


asyncio.run(main())
```
Exercise 2.
Write an `async_retry` decorator that makes up to 3 attempts at an async function, with exponential backoff between attempts (wait 0.1s, then 0.2s; the delay keeps doubling, so a further attempt would wait 0.4s). Test it with a function that fails twice then succeeds on the third attempt, printing each attempt number.
Solution to Exercise 2
```python
import asyncio
import functools
def async_retry(max_retries=3, base_delay=0.1):
    """Decorator factory: retry an async function with exponential backoff.

    Args:
        max_retries: total number of attempts; must be at least 1.
        base_delay: seconds to wait after the first failure.  The wait
            doubles after each further failure (0.1 s, 0.2 s, ... by default).

    Raises:
        ValueError: at decoration time if ``max_retries`` is less than 1,
            which would otherwise make the wrapper return ``None`` without
            ever calling the function.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(1, max_retries + 1):
                try:
                    print(f" Attempt {attempt}")
                    return await func(*args, **kwargs)
                except Exception:
                    if attempt == max_retries:
                        raise  # last attempt: propagate the original error
                    delay = base_delay * (2 ** (attempt - 1))
                    await asyncio.sleep(delay)
        return wrapper
    return decorator
call_count = 0  # module-level counter, shared by every call below


@async_retry(max_retries=3, base_delay=0.1)
async def flaky_operation():
    """Raise RuntimeError on the first two calls and succeed on the third."""
    global call_count
    call_count += 1
    if call_count >= 3:
        return "success"
    raise RuntimeError(f"Fail #{call_count}")
async def main():
    """Call the flaky operation through the retry decorator and show the result."""
    print(f"Result: {await flaky_operation()}")


asyncio.run(main())
```
Exercise 3.
Use `asyncio.Semaphore(3)` to limit concurrency when processing 10 tasks. Each task should print when it starts and finishes, sleeping for 0.2s. Check the timestamps in the output to verify that at most 3 tasks run simultaneously.
Solution to Exercise 3
```python
import asyncio
import time
async def limited_task(sem, task_id):
    """Hold one *sem* slot for 0.2 s, printing ``perf_counter`` timestamps.

    With ``Semaphore(3)``, at most three tasks can be between their
    "started" and "finished" lines at any moment.
    """
    async with sem:
        print(f"Task {task_id}: started at {time.perf_counter():.2f}")
        await asyncio.sleep(0.2)
        print(f"Task {task_id}: finished at {time.perf_counter():.2f}")
async def main():
    """Launch ten tasks that share a three-slot semaphore."""
    sem = asyncio.Semaphore(3)
    tasks = (limited_task(sem, task_id) for task_id in range(10))
    await asyncio.gather(*tasks)


asyncio.run(main())
```