Tasks and Coroutines¶
Coroutines¶
A coroutine is a function defined with async def. When called, it returns a coroutine object that must be awaited or scheduled.
async def my_coroutine():
await asyncio.sleep(1)
return "result"
# Calling creates a coroutine object (doesn't run it)
coro = my_coroutine()
print(type(coro)) # <class 'coroutine'>
# Must run it
result = asyncio.run(my_coroutine())
Coroutine States¶
Created → Running → Suspended → Running → ... → Completed
               ↑           │
               └───────────┘
                  (await)
Tasks¶
A Task wraps a coroutine and schedules it for execution. Tasks enable concurrent execution.
async def main():
# Create a task - starts running immediately
task = asyncio.create_task(my_coroutine())
# Do other work while task runs...
# Get the result when needed
result = await task
Coroutine vs Task¶
| Aspect | Coroutine | Task |
|---|---|---|
| Created by | Calling an async def function | asyncio.create_task() |
| Starts running | When awaited | Immediately when created |
| Concurrent | No (sequential) | Yes |
| Can be cancelled | No | Yes |
Creating Tasks¶
asyncio.create_task() (Preferred)¶
async def fetch(url):
await asyncio.sleep(1)
return f"Data from {url}"
async def main():
# Create tasks - both start immediately
task1 = asyncio.create_task(fetch("url1"))
task2 = asyncio.create_task(fetch("url2"))
# Tasks are running concurrently now
print("Tasks created and running...")
# Await results
result1 = await task1
result2 = await task2
print(result1, result2)
asyncio.run(main())
Named Tasks (Python 3.8+)¶
task = asyncio.create_task(fetch("url"), name="fetch_task")
print(task.get_name()) # "fetch_task"
asyncio.ensure_future() (Legacy)¶
# Works but create_task() is preferred
task = asyncio.ensure_future(my_coroutine())
Running Multiple Tasks¶
asyncio.gather()¶
Run multiple coroutines concurrently and collect results:
async def fetch(url):
await asyncio.sleep(1)
return f"Data from {url}"
async def main():
# Run all concurrently, get results in order
results = await asyncio.gather(
fetch("url1"),
fetch("url2"),
fetch("url3")
)
print(results) # ["Data from url1", "Data from url2", "Data from url3"]
asyncio.run(main()) # Takes ~1 second, not 3
Handling Exceptions in gather()¶
async def might_fail(n):
if n == 2:
raise ValueError("Error!")
return n
async def main():
# Default: first exception cancels others
try:
results = await asyncio.gather(
might_fail(1),
might_fail(2),
might_fail(3)
)
except ValueError as e:
print(f"Caught: {e}")
# With return_exceptions=True: exceptions returned as results
results = await asyncio.gather(
might_fail(1),
might_fail(2),
might_fail(3),
return_exceptions=True
)
print(results) # [1, ValueError("Error!"), 3]
asyncio.wait()¶
More control over completion:
async def main():
tasks = [
asyncio.create_task(fetch("url1")),
asyncio.create_task(fetch("url2")),
asyncio.create_task(fetch("url3"))
]
# Wait for all
done, pending = await asyncio.wait(tasks)
for task in done:
print(task.result())
Wait Options¶
# Wait for first to complete
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.FIRST_COMPLETED
)
# Wait for first exception
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.FIRST_EXCEPTION
)
# Wait with timeout
done, pending = await asyncio.wait(tasks, timeout=5.0)
asyncio.as_completed()¶
Process results as they complete:
async def main():
tasks = [
asyncio.create_task(fetch("slow", 3)),
asyncio.create_task(fetch("fast", 1)),
asyncio.create_task(fetch("medium", 2))
]
# Yields awaitables in completion order (not the original Task objects)
for coro in asyncio.as_completed(tasks):
result = await coro
print(f"Got result: {result}")
# Output (order by completion):
# Got result: Data from fast
# Got result: Data from medium
# Got result: Data from slow
Task Cancellation¶
Cancelling a Task¶
async def long_running():
try:
await asyncio.sleep(10)
return "completed"
except asyncio.CancelledError:
print("Task was cancelled!")
raise # Re-raise to properly cancel
async def main():
task = asyncio.create_task(long_running())
await asyncio.sleep(1)
task.cancel() # Request cancellation
try:
await task
except asyncio.CancelledError:
print("Caught cancellation")
asyncio.run(main())
Handling Cancellation¶
async def cleanup_on_cancel():
try:
await asyncio.sleep(10)
except asyncio.CancelledError:
# Perform cleanup
print("Cleaning up...")
await save_state()
raise # Always re-raise CancelledError
Shielding from Cancellation¶
async def critical_operation():
await asyncio.sleep(1)
return "important result"
async def main():
task = asyncio.create_task(critical_operation())
# Shield prevents cancellation from propagating
try:
result = await asyncio.shield(task)
except asyncio.CancelledError:
# Original task continues running
result = await task
Task Introspection¶
Task Properties¶
async def main():
task = asyncio.create_task(fetch("url"), name="my_fetch")
print(task.get_name()) # "my_fetch"
print(task.done()) # False
print(task.cancelled()) # False
await task
print(task.done()) # True
print(task.result()) # "Data from url"
Getting All Tasks¶
async def main():
task1 = asyncio.create_task(fetch("url1"))
task2 = asyncio.create_task(fetch("url2"))
# Get all running tasks
all_tasks = asyncio.all_tasks()
print(f"Running tasks: {len(all_tasks)}") # 3 (including main)
# Get current task
current = asyncio.current_task()
print(f"Current: {current.get_name()}")
Timeouts¶
asyncio.wait_for()¶
async def slow_operation():
await asyncio.sleep(10)
return "result"
async def main():
try:
result = await asyncio.wait_for(slow_operation(), timeout=2.0)
except asyncio.TimeoutError:
print("Operation timed out!")
asyncio.run(main())
asyncio.timeout() (Python 3.11+)¶
async def main():
async with asyncio.timeout(2.0):
await slow_operation() # Raises TimeoutError if > 2 seconds
Timeout with Cleanup¶
async def main():
task = asyncio.create_task(slow_operation())
try:
result = await asyncio.wait_for(task, timeout=2.0)
except asyncio.TimeoutError:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
print("Cleaned up after timeout")
TaskGroup (Python 3.11+)¶
Modern way to manage multiple tasks with proper cleanup:
async def main():
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(fetch("url1"))
task2 = tg.create_task(fetch("url2"))
task3 = tg.create_task(fetch("url3"))
# All tasks are done here
# If any task raises, all others are cancelled
print(task1.result(), task2.result(), task3.result())
TaskGroup vs gather()¶
| Feature | TaskGroup | gather() |
|---|---|---|
| Python version | 3.11+ | 3.4+ |
| Exception handling | Cancels all on first error | Configurable |
| Context manager | Yes | No |
| Structured concurrency | Yes | No |
Common Patterns¶
Fire and Forget (Background Task)¶
async def background_job():
await asyncio.sleep(5)
print("Background job done")
async def main():
# Start task but don't await
task = asyncio.create_task(background_job())
# Continue with other work
print("Main work...")
# Optionally wait at the end
await task
Task with Callback¶
def task_done_callback(task):
if task.exception():
print(f"Task failed: {task.exception()}")
else:
print(f"Task result: {task.result()}")
async def main():
task = asyncio.create_task(fetch("url"))
task.add_done_callback(task_done_callback)
await task
Limiting Concurrency¶
async def fetch_with_semaphore(sem, url):
async with sem: # Limit concurrent requests
return await fetch(url)
async def main():
sem = asyncio.Semaphore(10) # Max 10 concurrent
urls = [f"url{i}" for i in range(100)]
tasks = [fetch_with_semaphore(sem, url) for url in urls]
results = await asyncio.gather(*tasks)
Summary¶
| Function | Purpose |
|---|---|
| asyncio.create_task() | Create task from coroutine |
| asyncio.gather() | Run multiple, collect results |
| asyncio.wait() | Run multiple, control completion |
| asyncio.as_completed() | Iterate in completion order |
| asyncio.wait_for() | Run with timeout |
| asyncio.shield() | Protect from cancellation |
| task.cancel() | Cancel a task |
| task.result() | Get task result |
| asyncio.TaskGroup | Structured concurrency (3.11+) |
Key Takeaways:
- Tasks enable concurrent execution of coroutines
- create_task() starts execution immediately
- gather() is the simplest way to run multiple tasks
- Always handle CancelledError in long-running tasks
- Use timeouts to prevent hanging
- TaskGroup (3.11+) provides structured concurrency
Runnable Example: coroutine_prime_filter_example.py¶
"""
Asyncio: Coroutines with asyncio.gather()
Demonstrates concurrent coroutines running cooperative tasks.
Two async tasks (prime filtering and square mapping) run
concurrently using asyncio.gather().
Topics covered:
- async/await syntax
- asyncio.gather() for concurrent coroutines
- yield from for delegation to sub-generators
- Cooperative multitasking with await asyncio.sleep()
Based on concepts from Python-100-Days example23 and ch14/asyncio materials.
"""
import asyncio
from math import isqrt, sqrt
# =============================================================================
# Example 1: Helper Functions
# =============================================================================
def is_prime(num: int) -> bool:
    """Return True if *num* is prime, False otherwise.

    Trial division up to the integer square root. Uses math.isqrt
    instead of int(sqrt(num)): isqrt is exact for arbitrarily large
    integers, whereas the float-based sqrt can round incorrectly for
    very large values.
    """
    if num < 2:
        return False
    for factor in range(2, isqrt(num) + 1):
        if num % factor == 0:
            return False
    return True
def number_range(start: int, end: int):
    """Generator yielding each integer from *start* through *end*, inclusive.

    Produces nothing when start > end, matching range() semantics.
    """
    current = start
    while current <= end:
        yield current
        current += 1
# =============================================================================
# Example 2: Concurrent Coroutines
# =============================================================================
async def prime_filter(start: int, end: int) -> tuple[int, ...]:
    """Return the primes in [start, end] as a tuple.

    After every tenth candidate the coroutine awaits asyncio.sleep(0),
    which hands control back to the event loop so sibling coroutines
    can run — cooperative multitasking in action.
    """
    found: list[int] = []
    for candidate in number_range(start, end):
        if is_prime(candidate):
            found.append(candidate)
        if candidate % 10 == 0:
            # Cooperative yield point for the event loop.
            await asyncio.sleep(0)
    return tuple(found)
async def square_mapper(start: int, end: int) -> list[int]:
    """Return the squares of every integer in [start, end].

    Periodically yields to the event loop (every tenth number) so it
    can interleave with other coroutines.
    """
    out: list[int] = []
    for value in number_range(start, end):
        out.append(value * value)
        if value % 10 == 0:
            # Give other coroutines a chance to run.
            await asyncio.sleep(0)
    return out
# =============================================================================
# Example 3: Running Coroutines Concurrently
# =============================================================================
async def demo_gather():
    """Demonstrate asyncio.gather(): run two coroutines concurrently.

    gather() schedules every awaitable at once and returns their
    results in the same order the awaitables were passed in.
    """
    print("=== asyncio.gather() Demo ===")
    print("Running prime_filter and square_mapper concurrently...\n")
    # Execution interleaves at each await point inside the coroutines.
    results = await asyncio.gather(
        prime_filter(2, 50),
        square_mapper(1, 10),
    )
    primes, squares = results
    print(f"Primes (2-50): {primes}")
    print(f"Squares (1-10): {squares}")
    print()
# =============================================================================
# Example 4: Progress Reporting with Async
# =============================================================================
async def prime_filter_with_progress(start: int, end: int) -> list[int]:
    """Return primes in [start, end], printing progress at ~25% intervals.

    Fix: the original computed ``(i + 1) % (total // 4)``, which raises
    ZeroDivisionError whenever the range contains fewer than four
    numbers (total // 4 == 0). The step is now clamped to at least 1.

    Each progress report is also a cooperative yield point
    (await asyncio.sleep(0)).
    """
    primes: list[int] = []
    total = end - start + 1
    # Clamp to >= 1 so tiny ranges don't divide by zero.
    step = max(1, total // 4)
    for i, n in enumerate(number_range(start, end)):
        if is_prime(n):
            primes.append(n)
        # Report progress every ~25% of the range.
        if (i + 1) % step == 0:
            pct = (i + 1) / total * 100
            print(f" Prime filter: {pct:.0f}% complete ({len(primes)} found)")
            await asyncio.sleep(0)
    return primes
async def countdown(name: str, n: int) -> str:
    """Count down from *n* to 1, printing each step, then report completion.

    The short sleep between steps lets other coroutines interleave.
    """
    remaining = n
    while remaining > 0:
        print(f" {name}: {remaining}")
        await asyncio.sleep(0.01)
        remaining -= 1
    return f"{name} done"
async def demo_concurrent_progress():
    """Run three coroutines at once to show interleaved output."""
    print("=== Concurrent Coroutines with Progress ===")
    # gather() preserves input order, so unpacking is safe.
    primes, msg_a, msg_b = await asyncio.gather(
        prime_filter_with_progress(2, 1000),
        countdown("Timer-A", 5),
        countdown("Timer-B", 3),
    )
    print(f"\nFound {len(primes)} primes between 2 and 1000")
    print(f"Messages: {msg_a}, {msg_b}")
    print()
# =============================================================================
# Example 5: gather() vs wait() Comparison
# =============================================================================
async def demo_comparison():
    """Contrast asyncio.gather() with asyncio.wait() on the same work."""
    print("=== gather() vs wait() ===")
    print("""
asyncio.gather(*coroutines):
- Returns results in ORDER (same as input)
- Simple API: results = await gather(a, b, c)
- Best for: getting all results together
asyncio.wait(tasks):
- Returns (done, pending) sets
- Results in COMPLETION order (not input order)
- Supports timeout and FIRST_COMPLETED
- Best for: processing as tasks complete, timeouts
""")
    # gather(): one await, results come back in input order.
    results = await asyncio.gather(
        prime_filter(2, 30),
        square_mapper(1, 5),
    )
    print(f"gather results (ordered): {results}")
    print()
    # wait(): takes Task objects, hands back (done, pending) sets.
    scheduled = [
        asyncio.create_task(prime_filter(2, 30)),
        asyncio.create_task(square_mapper(1, 5)),
    ]
    done, pending = await asyncio.wait(scheduled)
    print(f"wait: {len(done)} done, {len(pending)} pending")
    for finished in done:
        print(f" Result: {finished.result()}")
# =============================================================================
# Main
# =============================================================================
if __name__ == '__main__':
    # Each demo gets its own fresh event loop via asyncio.run().
    for demo in (demo_gather, demo_concurrent_progress, demo_comparison):
        asyncio.run(demo())