Async Iteration and Context Managers¶
Python supports asynchronous versions of for loops and with statements for working with async resources.
async for - Asynchronous Iteration¶
Async Iterators¶
An async iterator implements __aiter__ and __anext__:
import asyncio
class AsyncCounter:
def __init__(self, stop):
self.current = 0
self.stop = stop
def __aiter__(self):
return self
async def __anext__(self):
if self.current >= self.stop:
raise StopAsyncIteration
await asyncio.sleep(0.1) # Simulate async operation
self.current += 1
return self.current
async def main():
async for num in AsyncCounter(5):
print(num)
# 1, 2, 3, 4, 5 (with 0.1s delay between each)
asyncio.run(main())
Async Generators (Simpler)¶
async def async_range(stop):
for i in range(stop):
await asyncio.sleep(0.1)
yield i
async def main():
async for num in async_range(5):
print(num)
asyncio.run(main())
Practical Examples¶
1. Streaming API Results¶
async def fetch_pages(url, max_pages=10):
"""Yield pages as they're fetched."""
page = 1
while page <= max_pages:
async with aiohttp.ClientSession() as session:
async with session.get(f"{url}?page={page}") as resp:
data = await resp.json()
if not data['results']:
return
yield data['results']
page += 1
async def main():
async for results in fetch_pages("https://api.example.com/items"):
for item in results:
process(item)
2. Database Cursor¶
async def fetch_rows(query, batch_size=100):
"""Yield rows from database in batches."""
async with get_connection() as conn:
async with conn.cursor() as cursor:
await cursor.execute(query)
while True:
rows = await cursor.fetchmany(batch_size)
if not rows:
break
for row in rows:
yield row
async def main():
async for row in fetch_rows("SELECT * FROM users"):
print(row)
3. WebSocket Messages¶
async def read_messages(websocket):
"""Yield messages from WebSocket."""
async for message in websocket:
if message.type == aiohttp.WSMsgType.TEXT:
yield message.data
elif message.type == aiohttp.WSMsgType.ERROR:
break
async def main():
async with aiohttp.ClientSession() as session:
async with session.ws_connect('ws://example.com') as ws:
async for msg in read_messages(ws):
print(f"Received: {msg}")
4. File Lines (with aiofiles)¶
import aiofiles
async def read_lines(filename):
async with aiofiles.open(filename) as f:
async for line in f:
yield line.strip()
async def main():
async for line in read_lines("data.txt"):
print(line)
Async Comprehensions¶
# Async list comprehension
async def main():
results = [x async for x in async_range(5)]
print(results) # [0, 1, 2, 3, 4]
# With condition
evens = [x async for x in async_range(10) if x % 2 == 0]
print(evens) # [0, 2, 4, 6, 8]
# Async generator expression
gen = (x * 2 async for x in async_range(5))
async for val in gen:
print(val)
Combining with await¶
# await in comprehension
async def fetch(url):
await asyncio.sleep(0.1)
return f"data from {url}"
async def main():
urls = ["url1", "url2", "url3"]
# Sequential fetching
results = [await fetch(url) for url in urls]
# Both async for and await
async def url_generator():
for url in urls:
await asyncio.sleep(0.1)
yield url
results = [await fetch(url) async for url in url_generator()]
async with - Async Context Managers¶
Async Context Manager Protocol¶
Implements __aenter__ and __aexit__:
class AsyncResource:
async def __aenter__(self):
print("Acquiring resource...")
await asyncio.sleep(0.1) # Simulate async setup
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("Releasing resource...")
await asyncio.sleep(0.1) # Simulate async cleanup
return False # Don't suppress exceptions
async def do_work(self):
print("Working...")
async def main():
async with AsyncResource() as resource:
await resource.do_work()
asyncio.run(main())
Using @asynccontextmanager¶
from contextlib import asynccontextmanager
@asynccontextmanager
async def managed_resource(name):
print(f"Setting up {name}")
await asyncio.sleep(0.1)
try:
yield name
finally:
print(f"Cleaning up {name}")
await asyncio.sleep(0.1)
async def main():
async with managed_resource("database") as resource:
print(f"Using {resource}")
Practical Examples¶
1. Database Connection¶
class AsyncDBConnection:
def __init__(self, dsn):
self.dsn = dsn
self.conn = None
async def __aenter__(self):
self.conn = await asyncpg.connect(self.dsn)
return self.conn
async def __aexit__(self, *args):
await self.conn.close()
async def main():
async with AsyncDBConnection("postgresql://...") as conn:
result = await conn.fetch("SELECT * FROM users")
2. HTTP Session¶
import aiohttp
async def main():
async with aiohttp.ClientSession() as session:
async with session.get('https://api.example.com') as response:
data = await response.json()
print(data)
3. Lock¶
lock = asyncio.Lock()
async def critical_section():
async with lock:
# Only one coroutine at a time
await do_protected_work()
4. Semaphore for Rate Limiting¶
semaphore = asyncio.Semaphore(10)
async def rate_limited_fetch(url):
async with semaphore:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()
5. Transaction¶
@asynccontextmanager
async def transaction(conn):
await conn.execute("BEGIN")
try:
yield conn
await conn.execute("COMMIT")
except Exception:
await conn.execute("ROLLBACK")
raise
async def main():
async with get_connection() as conn:
async with transaction(conn):
await conn.execute("INSERT INTO ...")
await conn.execute("UPDATE ...")
6. Timeout Context¶
# Python 3.11+
async def main():
async with asyncio.timeout(5.0):
await long_running_operation()
# Pre-3.11
@asynccontextmanager
async def timeout(seconds):
task = asyncio.current_task()
loop = asyncio.get_running_loop()
def cancel():
task.cancel()
handle = loop.call_later(seconds, cancel)
try:
yield
finally:
handle.cancel()
Nested Async Context Managers¶
async def main():
async with aiohttp.ClientSession() as session:
async with session.get('https://api.example.com') as response:
async with aiofiles.open('output.json', 'w') as f:
data = await response.text()
await f.write(data)
Multiple Context Managers¶
# Using multiple managers
async def main():
async with (
get_database() as db,
get_cache() as cache,
get_queue() as queue
):
await process(db, cache, queue)
# Or using asyncio.gather for parallel setup
async def main():
db, cache, queue = await asyncio.gather(
get_database().__aenter__(),
get_cache().__aenter__(),
get_queue().__aenter__()
)
# Note: cleanup is more complex this way
Combining async for and async with¶
@asynccontextmanager
async def open_stream(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
yield response
async def stream_lines(url):
async with open_stream(url) as response:
async for line in response.content:
yield line.decode()
async def main():
async for line in stream_lines("https://example.com/stream"):
print(line)
Common Patterns¶
Async Resource Pool¶
class AsyncPool:
def __init__(self, factory, size):
self.factory = factory
self.pool = asyncio.Queue(maxsize=size)
self.size = size
async def initialize(self):
for _ in range(self.size):
resource = await self.factory()
await self.pool.put(resource)
@asynccontextmanager
async def acquire(self):
resource = await self.pool.get()
try:
yield resource
finally:
await self.pool.put(resource)
# Usage
pool = AsyncPool(create_connection, size=10)
await pool.initialize()
async with pool.acquire() as conn:
await conn.execute(...)
Key Takeaways¶
async foriterates over async iterators/generatorsasync withmanages async context managers- Use async generators for streaming data
- Use
@asynccontextmanagerfor simpler context manager creation - Common uses: HTTP sessions, database connections, locks, files
- Async comprehensions work with
async forandawait - Always clean up async resources properly in
__aexit__