Async Iteration and Context Managers¶
Python supports asynchronous versions of for loops and with statements for working with async resources.
Mental Model
async for and async with are the async equivalents of their synchronous counterparts, designed for sources that produce items or manage resources over the network. Picture a streaming API where each item requires an await to arrive -- async for lets you loop over that stream naturally, while async with ensures connections and sessions are properly opened and closed even when pauses happen in between.
async for - Asynchronous Iteration¶
Async Iterators¶
An async iterator implements __aiter__ and __anext__:
```python import asyncio
class AsyncCounter: def init(self, stop): self.current = 0 self.stop = stop
def __aiter__(self):
return self
async def __anext__(self):
if self.current >= self.stop:
raise StopAsyncIteration
await asyncio.sleep(0.1) # Simulate async operation
self.current += 1
return self.current
async def main(): async for num in AsyncCounter(5): print(num) # 1, 2, 3, 4, 5 (with 0.1s delay between each)
asyncio.run(main()) ```
Async Generators (Simpler)¶
```python async def async_range(stop): for i in range(stop): await asyncio.sleep(0.1) yield i
async def main(): async for num in async_range(5): print(num)
asyncio.run(main()) ```
Practical Examples¶
1. Streaming API Results¶
```python async def fetch_pages(url, max_pages=10): """Yield pages as they're fetched.""" page = 1 while page <= max_pages: async with aiohttp.ClientSession() as session: async with session.get(f"{url}?page={page}") as resp: data = await resp.json()
if not data['results']:
return
yield data['results']
page += 1
async def main(): async for results in fetch_pages("https://api.example.com/items"): for item in results: process(item) ```
2. Database Cursor¶
```python async def fetch_rows(query, batch_size=100): """Yield rows from database in batches.""" async with get_connection() as conn: async with conn.cursor() as cursor: await cursor.execute(query)
while True:
rows = await cursor.fetchmany(batch_size)
if not rows:
break
for row in rows:
yield row
async def main(): async for row in fetch_rows("SELECT * FROM users"): print(row) ```
3. WebSocket Messages¶
```python async def read_messages(websocket): """Yield messages from WebSocket.""" async for message in websocket: if message.type == aiohttp.WSMsgType.TEXT: yield message.data elif message.type == aiohttp.WSMsgType.ERROR: break
async def main(): async with aiohttp.ClientSession() as session: async with session.ws_connect('ws://example.com') as ws: async for msg in read_messages(ws): print(f"Received: {msg}") ```
4. File Lines (with aiofiles)¶
```python import aiofiles
async def read_lines(filename): async with aiofiles.open(filename) as f: async for line in f: yield line.strip()
async def main(): async for line in read_lines("data.txt"): print(line) ```
Async Comprehensions¶
```python
Async list comprehension¶
async def main(): results = [x async for x in async_range(5)] print(results) # [0, 1, 2, 3, 4]
With condition¶
evens = [x async for x in async_range(10) if x % 2 == 0]
print(evens) # [0, 2, 4, 6, 8]
Async generator expression¶
gen = (x * 2 async for x in async_range(5))
async for val in gen:
print(val)
```
Combining with await¶
```python
await in comprehension¶
async def fetch(url): await asyncio.sleep(0.1) return f"data from {url}"
async def main(): urls = ["url1", "url2", "url3"]
# Sequential fetching
results = [await fetch(url) for url in urls]
# Both async for and await
async def url_generator():
for url in urls:
await asyncio.sleep(0.1)
yield url
results = [await fetch(url) async for url in url_generator()]
```
async with - Async Context Managers¶
Async Context Manager Protocol¶
Implements __aenter__ and __aexit__:
```python class AsyncResource: async def aenter(self): print("Acquiring resource...") await asyncio.sleep(0.1) # Simulate async setup return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("Releasing resource...")
await asyncio.sleep(0.1) # Simulate async cleanup
return False # Don't suppress exceptions
async def do_work(self):
print("Working...")
async def main(): async with AsyncResource() as resource: await resource.do_work()
asyncio.run(main()) ```
Using @asynccontextmanager¶
```python from contextlib import asynccontextmanager
@asynccontextmanager async def managed_resource(name): print(f"Setting up {name}") await asyncio.sleep(0.1)
try:
yield name
finally:
print(f"Cleaning up {name}")
await asyncio.sleep(0.1)
async def main(): async with managed_resource("database") as resource: print(f"Using {resource}") ```
Practical Examples¶
1. Database Connection¶
```python class AsyncDBConnection: def init(self, dsn): self.dsn = dsn self.conn = None
async def __aenter__(self):
self.conn = await asyncpg.connect(self.dsn)
return self.conn
async def __aexit__(self, *args):
await self.conn.close()
async def main(): async with AsyncDBConnection("postgresql://...") as conn: result = await conn.fetch("SELECT * FROM users") ```
2. HTTP Session¶
```python import aiohttp
async def main(): async with aiohttp.ClientSession() as session: async with session.get('https://api.example.com') as response: data = await response.json() print(data) ```
3. Lock¶
```python lock = asyncio.Lock()
async def critical_section(): async with lock: # Only one coroutine at a time await do_protected_work() ```
4. Semaphore for Rate Limiting¶
```python semaphore = asyncio.Semaphore(10)
async def rate_limited_fetch(url): async with semaphore: async with aiohttp.ClientSession() as session: async with session.get(url) as response: return await response.text() ```
5. Transaction¶
```python @asynccontextmanager async def transaction(conn): await conn.execute("BEGIN") try: yield conn await conn.execute("COMMIT") except Exception: await conn.execute("ROLLBACK") raise
async def main(): async with get_connection() as conn: async with transaction(conn): await conn.execute("INSERT INTO ...") await conn.execute("UPDATE ...") ```
6. Timeout Context¶
```python
Python 3.11+¶
async def main(): async with asyncio.timeout(5.0): await long_running_operation()
Pre-3.11¶
@asynccontextmanager async def timeout(seconds): task = asyncio.current_task() loop = asyncio.get_running_loop()
def cancel():
task.cancel()
handle = loop.call_later(seconds, cancel)
try:
yield
finally:
handle.cancel()
```
Nested Async Context Managers¶
python
async def main():
async with aiohttp.ClientSession() as session:
async with session.get('https://api.example.com') as response:
async with aiofiles.open('output.json', 'w') as f:
data = await response.text()
await f.write(data)
Multiple Context Managers¶
```python
Using multiple managers¶
async def main(): async with ( get_database() as db, get_cache() as cache, get_queue() as queue ): await process(db, cache, queue)
Or using asyncio.gather for parallel setup¶
async def main(): db, cache, queue = await asyncio.gather( get_database().aenter(), get_cache().aenter(), get_queue().aenter() ) # Note: cleanup is more complex this way ```
Combining async for and async with¶
```python @asynccontextmanager async def open_stream(url): async with aiohttp.ClientSession() as session: async with session.get(url) as response: yield response
async def stream_lines(url): async with open_stream(url) as response: async for line in response.content: yield line.decode()
async def main(): async for line in stream_lines("https://example.com/stream"): print(line) ```
Common Patterns¶
Async Resource Pool¶
```python class AsyncPool: def init(self, factory, size): self.factory = factory self.pool = asyncio.Queue(maxsize=size) self.size = size
async def initialize(self):
for _ in range(self.size):
resource = await self.factory()
await self.pool.put(resource)
@asynccontextmanager
async def acquire(self):
resource = await self.pool.get()
try:
yield resource
finally:
await self.pool.put(resource)
Usage¶
pool = AsyncPool(create_connection, size=10) await pool.initialize()
async with pool.acquire() as conn: await conn.execute(...) ```
Key Takeaways¶
async foriterates over async iterators/generatorsasync withmanages async context managers- Use async generators for streaming data
- Use
@asynccontextmanagerfor simpler context manager creation - Common uses: HTTP sessions, database connections, locks, files
- Async comprehensions work with
async forandawait - Always clean up async resources properly in
__aexit__
Exercises¶
Exercise 1.
Write an async generator async_range_squared(n) that yields the squares of numbers 0 through n-1, with a 0.05s delay between each. Collect the results using an async list comprehension and print them.
Solution to Exercise 1
```python
import asyncio
async def async_range_squared(n):
for i in range(n):
await asyncio.sleep(0.05)
yield i * i
async def main():
squares = [x async for x in async_range_squared(10)]
print(f"Squares: {squares}")
asyncio.run(main())
```
Exercise 2.
Create an async context manager class ManagedResource that prints "Acquired" on enter (after a 0.1s simulated setup) and "Released" on exit (after a 0.1s simulated teardown). The resource should provide a do_work() async method that prints "Working...". Demonstrate it with async with.
Solution to Exercise 2
```python
import asyncio
class ManagedResource:
async def __aenter__(self):
await asyncio.sleep(0.1)
print("Acquired")
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await asyncio.sleep(0.1)
print("Released")
return False
async def do_work(self):
print("Working...")
async def main():
async with ManagedResource() as resource:
await resource.do_work()
asyncio.run(main())
```
Exercise 3.
Write an async generator batched_counter(total, batch_size) that yields lists of consecutive integers in batches. For example, batched_counter(10, 3) yields [0,1,2], [3,4,5], [6,7,8], [9]. Add a 0.1s delay between batches. Collect all items into a flat list using async for and verify it equals list(range(total)).
Solution to Exercise 3
```python
import asyncio
async def batched_counter(total, batch_size):
for start in range(0, total, batch_size):
await asyncio.sleep(0.1)
yield list(range(start, min(start + batch_size, total)))
async def main():
flat = []
async for batch in batched_counter(10, 3):
print(f"Batch: {batch}")
flat.extend(batch)
assert flat == list(range(10))
print(f"All items: {flat}")
asyncio.run(main())
```