Asyncio Patterns¶
Producer-Consumer Pattern¶
Basic Queue¶
import asyncio
async def producer(queue, n):
    """Put *n* items onto *queue*, then a ``None`` sentinel to signal completion."""
    for index in range(n):
        await asyncio.sleep(0.1)  # Simulate work
        await queue.put(f"item-{index}")
        print(f"Produced: item-{index}")
    # The sentinel tells the consumer nothing more is coming.
    await queue.put(None)
async def consumer(queue):
    """Drain *queue*, stopping when the ``None`` sentinel is received."""
    while True:
        item = await queue.get()
        if item is None:
            # Sentinel from the producer: we are done.
            return
        await asyncio.sleep(0.2)  # Simulate processing
        print(f"Consumed: {item}")
        queue.task_done()
async def main():
    """Run one producer and one consumer against a bounded queue."""
    # maxsize=5 applies backpressure: the producer blocks once five
    # unconsumed items are waiting.
    queue = asyncio.Queue(maxsize=5)
    producing = producer(queue, 10)
    consuming = consumer(queue)
    await asyncio.gather(producing, consuming)


asyncio.run(main())
Multiple Consumers¶
async def main():
    """Fan queued work out from one producer to several consumers."""
    queue = asyncio.Queue()
    # Consumers start first; they park on queue.get() until items arrive.
    consumers = [
        asyncio.create_task(consumer(queue, f"consumer-{i}"))
        for i in range(3)
    ]
    # Produce items
    await producer(queue, 20)
    # join() blocks until every queued item has been task_done()'d.
    await queue.join()
    # The consumers loop forever, so cancel them once the queue is drained.
    for consumer_task in consumers:
        consumer_task.cancel()
Semaphore (Limiting Concurrency)¶
Rate Limiting Requests¶
import asyncio
import aiohttp
async def fetch(session, url, sem):
    """GET *url* via *session*; *sem* caps how many fetches run concurrently."""
    # Acquire a semaphore slot first, then open the request; both are
    # released automatically when the block exits.
    async with sem, session.get(url) as response:
        return await response.text()
async def main():
    """Fetch 100 pages while keeping at most 10 requests in flight."""
    urls = [f"https://example.com/page{i}" for i in range(100)]
    # Limit to 10 concurrent requests
    sem = asyncio.Semaphore(10)
    async with aiohttp.ClientSession() as session:
        coros = [fetch(session, url, sem) for url in urls]
        results = await asyncio.gather(*coros)
    return results
Bounded Semaphore¶
# BoundedSemaphore raises error if released more than acquired
sem = asyncio.BoundedSemaphore(10)


async def safe_operation(sem):
    """Run do_work() while holding *sem*; the slot is freed even on error."""
    # The async-with protocol guarantees exactly one release per acquire.
    async with sem:
        await do_work()
Event (Signaling)¶
Simple Signal¶
async def waiter(event):
    """Block until *event* is set, logging before and after the wait."""
    print("Waiting for event...")
    # wait() returns immediately if the event is already set.
    await event.wait()
    print("Event received!")
async def setter(event):
    """Set *event* after a two-second delay, releasing all waiters at once."""
    await asyncio.sleep(2)
    print("Setting event")
    event.set()
async def main():
    """Two waiters are released by a single set() from the setter."""
    event = asyncio.Event()
    # One set() call wakes every coroutine blocked in event.wait().
    await asyncio.gather(waiter(event), waiter(event), setter(event))


asyncio.run(main())
Start Signal Pattern¶
async def worker(event, worker_id):
    """Wait for the shared start *event*, then run do_work()."""
    # Park here until the coordinator fires the start signal.
    await event.wait()
    print(f"Worker {worker_id} starting")
    await do_work()
async def main():
    """Create workers up front, then release them all simultaneously."""
    start_event = asyncio.Event()
    # Workers are scheduled immediately but idle inside event.wait().
    workers = [
        asyncio.create_task(worker(start_event, worker_id))
        for worker_id in range(5)
    ]
    # Setup phase
    await setup()
    # A single set() releases every waiting worker at once.
    start_event.set()
    await asyncio.gather(*workers)
Lock (Mutual Exclusion)¶
Protecting Shared State¶
class Counter:
    """An integer counter whose read-modify-write cycle is guarded by a lock."""

    def __init__(self):
        self.value = 0              # Shared mutable state.
        self.lock = asyncio.Lock()  # Serializes concurrent increments.

    async def increment(self):
        """Atomically add one to the counter."""
        async with self.lock:
            # The await between the read and the write is exactly where an
            # unguarded version would interleave and lose updates.
            snapshot = self.value
            await asyncio.sleep(0.01)  # Simulate delay
            self.value = snapshot + 1
async def main():
    """Demonstrate that 100 locked increments produce exactly 100."""
    counter = Counter()
    # Without the lock, interleaved read/write pairs would drop updates.
    increments = [counter.increment() for _ in range(100)]
    await asyncio.gather(*increments)
    print(f"Final value: {counter.value}")  # 100


asyncio.run(main())
Lock with Timeout¶
async def acquire_with_timeout(lock, timeout):
    """Run do_critical_work() under *lock*, giving up if the lock cannot be
    acquired within *timeout* seconds.

    Only the acquisition is guarded by the timeout handler. The original
    wrapped the critical section in the same ``try``, so an
    ``asyncio.TimeoutError`` raised *inside* do_critical_work() was
    silently misreported as a failed lock acquisition; here it propagates
    to the caller instead.
    """
    try:
        await asyncio.wait_for(lock.acquire(), timeout)
    except asyncio.TimeoutError:
        print("Could not acquire lock in time")
        return
    try:
        await do_critical_work()
    finally:
        # Always release: the lock was successfully acquired above.
        lock.release()
Condition (Wait/Notify)¶
Producer-Consumer with Condition¶
async def producer(condition, items):
    """Append one item to *items* and wake a single waiting consumer."""
    async with condition:
        items.append("new item")
        print("Produced item, notifying")
        # notify() requires holding the condition's lock and wakes at
        # most one waiter.
        condition.notify()
async def consumer(condition, items):
    """Wait until *items* is non-empty, then consume one item."""
    async with condition:
        # Re-check the predicate in a loop: wakeups can be stale.
        while not items:
            print("Waiting for items...")
            await condition.wait()
        consumed = items.pop()
        print(f"Consumed: {consumed}")
async def main():
    """Start a consumer that blocks, then produce the item it waits for."""
    condition = asyncio.Condition()
    items = []
    # Start consumer first (waits)
    consumer_task = asyncio.create_task(consumer(condition, items))
    await asyncio.sleep(1)
    # Producing appends an item and notifies, unblocking the consumer.
    await producer(condition, items)
    await consumer_task
Retry Pattern¶
Exponential Backoff¶
async def fetch_with_retry(url, max_retries=3, base_delay=1):
    """Fetch *url*, retrying client errors with exponential backoff.

    Waits base_delay * 2**attempt seconds between attempts; the final
    failure is re-raised to the caller.
    """
    last_attempt = max_retries - 1
    for attempt in range(max_retries):
        try:
            return await fetch(url)
        except aiohttp.ClientError as e:
            if attempt == last_attempt:
                raise  # Out of retries: surface the original error.
            delay = base_delay * (2 ** attempt)
            print(f"Attempt {attempt + 1} failed, retrying in {delay}s")
            await asyncio.sleep(delay)
Retry Decorator¶
def async_retry(max_retries=3, delay=1, exceptions=(Exception,)):
    """Decorator factory: retry an async function with exponential backoff.

    Args:
        max_retries: Total number of attempts (must be >= 1).
        delay: Base delay in seconds; attempt k waits delay * 2**k.
        exceptions: Exception types that trigger a retry; anything else
            propagates immediately.

    The wrapped coroutine re-raises the last caught exception once all
    attempts are exhausted. Fix over the original: ``functools.wraps``
    preserves the wrapped function's name/docstring for introspection.
    """
    import functools  # Local import keeps the snippet self-contained.

    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(max_retries):
                try:
                    return await func(*args, **kwargs)
                except exceptions as e:
                    last_exception = e
                    # Back off before every attempt except the last.
                    if attempt < max_retries - 1:
                        await asyncio.sleep(delay * (2 ** attempt))
            # Loop exhausted: every attempt failed, re-raise the last error.
            raise last_exception
        return wrapper
    return decorator
@async_retry(max_retries=3, exceptions=(aiohttp.ClientError,))
async def fetch_data(url):
    """GET *url* and decode the JSON body, retrying on client errors."""
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            payload = await response.json()
    return payload
Timeout Patterns¶
Context Manager Timeout¶
async def with_timeout(coro, timeout, default=None):
    """Await *coro*, returning *default* instead of raising on timeout."""
    try:
        result = await asyncio.wait_for(coro, timeout)
    except asyncio.TimeoutError:
        # Swallow the timeout and substitute the fallback value.
        return default
    return result
async def main():
    """Use the timeout wrapper with a string fallback value."""
    result = await with_timeout(slow_operation(), timeout=5, default="timed out")
Per-Task Timeouts in Batch¶
async def fetch_with_timeout(url, timeout=5):
    """Fetch *url*; on timeout, return an error record instead of raising."""
    try:
        return await asyncio.wait_for(fetch(url), timeout)
    except asyncio.TimeoutError:
        # Returning a marker keeps gather() results aligned with inputs.
        return {"url": url, "error": "timeout"}
async def main():
    """Fetch several URLs; a timeout on one does not abort the batch."""
    urls = ["url1", "url2", "url3"]
    coros = [fetch_with_timeout(url) for url in urls]
    results = await asyncio.gather(*coros)
Graceful Shutdown¶
Signal Handling¶
import signal
class Server:
    """A request loop that exits cleanly on SIGTERM or SIGINT."""

    def __init__(self):
        self.running = True  # Cleared by shutdown() to end the loop.

    async def run(self):
        """Serve requests until a shutdown signal arrives."""
        loop = asyncio.get_running_loop()
        # Route both termination signals to shutdown().
        # NOTE(review): loop.add_signal_handler is unavailable on Windows
        # event loops — confirm the target platform.
        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(sig, self.shutdown)
        # NOTE(review): handle_request is not defined in this snippet;
        # presumably supplied by a subclass.
        while self.running:
            await self.handle_request()

    def shutdown(self):
        """Request a graceful stop; run() exits after the current request."""
        print("Shutdown requested")
        self.running = False
async def main():
    """Construct the server and serve until a signal stops it."""
    server = Server()
    await server.run()
Cleanup All Tasks¶
async def shutdown(loop, signal=None):
    """Cancel all outstanding tasks and stop *loop*.

    The *signal* argument, if given, is used only for logging.
    """
    if signal:
        print(f"Received signal {signal.name}")
    print("Cancelling outstanding tasks...")
    # Everything except the task running this coroutine gets cancelled.
    pending = [
        task for task in asyncio.all_tasks()
        if task is not asyncio.current_task()
    ]
    for task in pending:
        task.cancel()
    # return_exceptions=True absorbs the CancelledErrors so gather()
    # completes instead of propagating the first cancellation.
    await asyncio.gather(*pending, return_exceptions=True)
    loop.stop()
Connection Pool Pattern¶
class ConnectionPool:
    """A bounded pool of reusable connections.

    The semaphore caps how many connections are checked out at once;
    the queue holds idle connections for reuse.
    """

    def __init__(self, max_connections=10):
        import contextvars  # Local import: per-task storage for `async with`.
        self.semaphore = asyncio.Semaphore(max_connections)
        self.connections = asyncio.Queue()
        # Per-task slot for the connection checked out via `async with`.
        # The original stored it on shared `self.conn`, which two tasks
        # using the pool concurrently would clobber, releasing the wrong
        # connection.
        self._checked_out = contextvars.ContextVar("pool_conn")
        self.conn = None  # Kept for backward compatibility; do not rely on it.

    async def get_connection(self):
        """Check out a connection, creating one if none are idle."""
        await self.semaphore.acquire()
        try:
            return self.connections.get_nowait()
        except asyncio.QueueEmpty:
            pass
        try:
            # NOTE(review): create_connection is defined elsewhere in the project.
            return await create_connection()
        except BaseException:
            # Fix: don't leak the semaphore slot if creation fails.
            self.semaphore.release()
            raise

    async def release_connection(self, conn):
        """Return *conn* to the idle queue and free its semaphore slot."""
        await self.connections.put(conn)
        self.semaphore.release()

    async def __aenter__(self):
        conn = await self.get_connection()
        self._checked_out.set(conn)
        self.conn = conn  # Backward-compatible mirror of the last checkout.
        return conn

    async def __aexit__(self, *args):
        await self.release_connection(self._checked_out.get())
# Usage
pool = ConnectionPool(max_connections=10)


async def query(sql):
    """Execute *sql* on a pooled connection."""
    # The async-with checkout guarantees the connection is returned.
    async with pool as conn:
        return await conn.execute(sql)
Batch Processing¶
Chunked Processing¶
async def process_batch(items, batch_size=10):
    """Process *items* in concurrent chunks of at most *batch_size*."""
    results = []
    for start in range(0, len(items), batch_size):
        chunk = items[start:start + batch_size]
        # Items within a chunk run concurrently; chunks run sequentially.
        chunk_results = await asyncio.gather(
            *(process_item(item) for item in chunk)
        )
        results.extend(chunk_results)
        # Optional: delay between batches
        await asyncio.sleep(0.1)
    return results
Async Iterator Processing¶
async def process_stream(async_iterator, concurrency=10):
    """Consume *async_iterator*, processing at most *concurrency* items at once."""
    sem = asyncio.Semaphore(concurrency)

    async def bounded_process(item):
        # The semaphore caps how many process_item calls run concurrently.
        async with sem:
            return await process_item(item)

    tasks = [
        asyncio.create_task(bounded_process(item))
        async for item in async_iterator
    ]
    return await asyncio.gather(*tasks)
Error Handling Patterns¶
Collect All Errors¶
async def process_all(items):
    """Process every item, separating successes from failures.

    Returns a (results, errors) pair; one failure never aborts the batch.
    """
    results, errors = [], []
    pending = [process(item) for item in items]
    # as_completed yields awaitables in completion order, not input order.
    for finished in asyncio.as_completed(pending):
        try:
            results.append(await finished)
        except Exception as e:
            errors.append(e)
    if errors:
        print(f"Completed with {len(errors)} errors")
    return results, errors
Circuit Breaker¶
class CircuitBreaker:
    """Fail fast after repeated errors, probing again after a cooldown.

    States: "closed" (normal), "open" (rejecting calls), and "half-open"
    (one probe call allowed once reset_timeout has elapsed).

    Fix over the original: the ``time`` module was used but never imported
    anywhere in the file; it is now imported locally. ``time.monotonic()``
    replaces ``time.time()`` so wall-clock adjustments cannot re-open or
    prematurely close the breaker.
    """

    def __init__(self, max_failures=5, reset_timeout=60):
        self.max_failures = max_failures        # Failures before opening.
        self.reset_timeout = reset_timeout      # Cooldown in seconds.
        self.failures = 0                       # Consecutive failure count.
        self.last_failure_time = None           # Monotonic timestamp.
        self.state = "closed"  # closed, open, half-open

    async def call(self, coro):
        """Await *coro* through the breaker.

        Raises:
            Exception: "Circuit breaker is open" while open and the
                cooldown has not elapsed.
            Whatever *coro* raises, after recording the failure.
        """
        import time  # Local import: `time` is not imported at file level.

        if self.state == "open":
            if time.monotonic() - self.last_failure_time > self.reset_timeout:
                self.state = "half-open"  # Allow a single probe call.
            else:
                raise Exception("Circuit breaker is open")
        try:
            result = await coro
        except Exception:
            self.failures += 1
            self.last_failure_time = time.monotonic()
            if self.failures >= self.max_failures:
                self.state = "open"
            raise
        # Any success fully resets the breaker.
        self.failures = 0
        self.state = "closed"
        return result
Summary¶
| Pattern | Use Case |
|---|---|
| Queue | Producer-consumer workflows |
| Semaphore | Limit concurrent operations |
| Event | Signal between coroutines |
| Lock | Protect shared state |
| Condition | Wait/notify coordination |
| Retry | Handle transient failures |
| Timeout | Prevent hanging operations |
| Connection Pool | Reuse expensive resources |
| Batch Processing | Handle large datasets |
| Circuit Breaker | Fail fast on repeated errors |
Key Takeaways:
- Use asyncio.Queue for producer-consumer patterns
- Use Semaphore to limit concurrency (rate limiting)
- Always implement timeouts for external calls
- Handle graceful shutdown with signal handlers
- Use retry with exponential backoff for reliability
- Batch operations to manage resource usage