Tasks and Coroutines¶
Mental Model
A coroutine is a recipe -- calling it creates an object that describes what to do, but doesn't do it yet. A Task is what actually runs that recipe by scheduling it on the event loop. You need create_task() to get true concurrency: without it, await-ing coroutines one by one runs them sequentially.
Coroutines¶
A coroutine is a function defined with async def. When called, it returns a coroutine object that must be awaited or scheduled.
```python async def my_coroutine(): await asyncio.sleep(1) return "result"
Calling creates a coroutine object (doesn't run it)¶
coro = my_coroutine()
print(type(coro)) #
Must run it¶
result = asyncio.run(my_coroutine()) ```
Coroutine States¶
Created → Running → Suspended → Running → ... → Completed
↑ │
└───────────┘
(await)
Tasks¶
A Task wraps a coroutine and schedules it for execution. Tasks enable concurrent execution.
```python async def main(): # Create a task - starts running immediately task = asyncio.create_task(my_coroutine())
# Do other work while task runs...
# Get the result when needed
result = await task
```
Coroutine vs Task¶
| Aspect | Coroutine | Task |
|---|---|---|
| Created by | async def call |
asyncio.create_task() |
| Starts running | When awaited | Immediately when created |
| Concurrent | No (sequential) | Yes |
| Can be cancelled | No | Yes |
Creating Tasks¶
asyncio.create_task() (Preferred)¶
```python async def fetch(url): await asyncio.sleep(1) return f"Data from {url}"
async def main(): # Create tasks - both start immediately task1 = asyncio.create_task(fetch("url1")) task2 = asyncio.create_task(fetch("url2"))
# Tasks are running concurrently now
print("Tasks created and running...")
# Await results
result1 = await task1
result2 = await task2
print(result1, result2)
asyncio.run(main()) ```
Named Tasks (Python 3.8+)¶
python
task = asyncio.create_task(fetch("url"), name="fetch_task")
print(task.get_name()) # "fetch_task"
asyncio.ensure_future() (Legacy)¶
```python
Works but create_task() is preferred¶
task = asyncio.ensure_future(my_coroutine()) ```
Running Multiple Tasks¶
asyncio.gather()¶
Run multiple coroutines concurrently and collect results:
```python async def fetch(url): await asyncio.sleep(1) return f"Data from {url}"
async def main(): # Run all concurrently, get results in order results = await asyncio.gather( fetch("url1"), fetch("url2"), fetch("url3") ) print(results) # ["Data from url1", "Data from url2", "Data from url3"]
asyncio.run(main()) # Takes ~1 second, not 3 ```
Handling Exceptions in gather()¶
```python async def might_fail(n): if n == 2: raise ValueError("Error!") return n
async def main(): # Default: first exception cancels others try: results = await asyncio.gather( might_fail(1), might_fail(2), might_fail(3) ) except ValueError as e: print(f"Caught: {e}")
# With return_exceptions=True: exceptions returned as results
results = await asyncio.gather(
might_fail(1),
might_fail(2),
might_fail(3),
return_exceptions=True
)
print(results) # [1, ValueError("Error!"), 3]
```
asyncio.wait()¶
More control over completion:
```python async def main(): tasks = [ asyncio.create_task(fetch("url1")), asyncio.create_task(fetch("url2")), asyncio.create_task(fetch("url3")) ]
# Wait for all
done, pending = await asyncio.wait(tasks)
for task in done:
print(task.result())
```
Wait Options¶
```python
Wait for first to complete¶
done, pending = await asyncio.wait( tasks, return_when=asyncio.FIRST_COMPLETED )
Wait for first exception¶
done, pending = await asyncio.wait( tasks, return_when=asyncio.FIRST_EXCEPTION )
Wait with timeout¶
done, pending = await asyncio.wait(tasks, timeout=5.0) ```
asyncio.as_completed()¶
Process results as they complete:
```python async def main(): tasks = [ asyncio.create_task(fetch("slow", 3)), asyncio.create_task(fetch("fast", 1)), asyncio.create_task(fetch("medium", 2)) ]
# Yields tasks in completion order
for coro in asyncio.as_completed(tasks):
result = await coro
print(f"Got result: {result}")
Output (order by completion):¶
Got result: Data from fast¶
Got result: Data from medium¶
Got result: Data from slow¶
```
Task Cancellation¶
Cancelling a Task¶
```python async def long_running(): try: await asyncio.sleep(10) return "completed" except asyncio.CancelledError: print("Task was cancelled!") raise # Re-raise to properly cancel
async def main(): task = asyncio.create_task(long_running())
await asyncio.sleep(1)
task.cancel() # Request cancellation
try:
await task
except asyncio.CancelledError:
print("Caught cancellation")
asyncio.run(main()) ```
Handling Cancellation¶
python
async def cleanup_on_cancel():
try:
await asyncio.sleep(10)
except asyncio.CancelledError:
# Perform cleanup
print("Cleaning up...")
await save_state()
raise # Always re-raise CancelledError
Shielding from Cancellation¶
```python async def critical_operation(): await asyncio.sleep(1) return "important result"
async def main(): task = asyncio.create_task(critical_operation())
# Shield prevents cancellation from propagating
try:
result = await asyncio.shield(task)
except asyncio.CancelledError:
# Original task continues running
result = await task
```
Task Introspection¶
Task Properties¶
```python async def main(): task = asyncio.create_task(fetch("url"), name="my_fetch")
print(task.get_name()) # "my_fetch"
print(task.done()) # False
print(task.cancelled()) # False
await task
print(task.done()) # True
print(task.result()) # "Data from url"
```
Getting All Tasks¶
```python async def main(): task1 = asyncio.create_task(fetch("url1")) task2 = asyncio.create_task(fetch("url2"))
# Get all running tasks
all_tasks = asyncio.all_tasks()
print(f"Running tasks: {len(all_tasks)}") # 3 (including main)
# Get current task
current = asyncio.current_task()
print(f"Current: {current.get_name()}")
```
Timeouts¶
asyncio.wait_for()¶
```python async def slow_operation(): await asyncio.sleep(10) return "result"
async def main(): try: result = await asyncio.wait_for(slow_operation(), timeout=2.0) except asyncio.TimeoutError: print("Operation timed out!")
asyncio.run(main()) ```
asyncio.timeout() (Python 3.11+)¶
python
async def main():
async with asyncio.timeout(2.0):
await slow_operation() # Raises TimeoutError if > 2 seconds
Timeout with Cleanup¶
```python async def main(): task = asyncio.create_task(slow_operation())
try:
result = await asyncio.wait_for(task, timeout=2.0)
except asyncio.TimeoutError:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
print("Cleaned up after timeout")
```
TaskGroup (Python 3.11+)¶
Modern way to manage multiple tasks with proper cleanup:
```python async def main(): async with asyncio.TaskGroup() as tg: task1 = tg.create_task(fetch("url1")) task2 = tg.create_task(fetch("url2")) task3 = tg.create_task(fetch("url3"))
# All tasks are done here
# If any task raises, all others are cancelled
print(task1.result(), task2.result(), task3.result())
```
TaskGroup vs gather()¶
| Feature | TaskGroup | gather() |
|---|---|---|
| Python version | 3.11+ | 3.4+ |
| Exception handling | Cancels all on first error | Configurable |
| Context manager | Yes | No |
| Structured concurrency | Yes | No |
Common Patterns¶
Fire and Forget (Background Task)¶
```python async def background_job(): await asyncio.sleep(5) print("Background job done")
async def main(): # Start task but don't await task = asyncio.create_task(background_job())
# Continue with other work
print("Main work...")
# Optionally wait at the end
await task
```
Task with Callback¶
```python def task_done_callback(task): if task.exception(): print(f"Task failed: {task.exception()}") else: print(f"Task result: {task.result()}")
async def main(): task = asyncio.create_task(fetch("url")) task.add_done_callback(task_done_callback) await task ```
Limiting Concurrency¶
```python async def fetch_with_semaphore(sem, url): async with sem: # Limit concurrent requests return await fetch(url)
async def main(): sem = asyncio.Semaphore(10) # Max 10 concurrent
urls = [f"url{i}" for i in range(100)]
tasks = [fetch_with_semaphore(sem, url) for url in urls]
results = await asyncio.gather(*tasks)
```
Summary¶
| Function | Purpose |
|---|---|
asyncio.create_task() |
Create task from coroutine |
asyncio.gather() |
Run multiple, collect results |
asyncio.wait() |
Run multiple, control completion |
asyncio.as_completed() |
Iterate in completion order |
asyncio.wait_for() |
Run with timeout |
asyncio.shield() |
Protect from cancellation |
task.cancel() |
Cancel a task |
task.result() |
Get task result |
asyncio.TaskGroup |
Structured concurrency (3.11+) |
Key Takeaways:
- Tasks enable concurrent execution of coroutines
create_task()starts execution immediatelygather()is the simplest way to run multiple tasks- Always handle
CancelledErrorin long-running tasks - Use timeouts to prevent hanging
- TaskGroup (3.11+) provides structured concurrency
Runnable Example: coroutine_prime_filter_example.py¶
```python """ Asyncio: Coroutines with asyncio.gather()
Demonstrates concurrent coroutines running cooperative tasks. Two async tasks (prime filtering and square mapping) run concurrently using asyncio.gather().
Topics covered: - async/await syntax - asyncio.gather() for concurrent coroutines - yield from for delegation to sub-generators - Cooperative multitasking with await asyncio.sleep()
Based on concepts from Python-100-Days example23 and ch14/asyncio materials. """
import asyncio from math import sqrt
=============================================================================¶
Example 1: Helper Functions¶
=============================================================================¶
def is_prime(num: int) -> bool: """Check if a number is prime.""" if num < 2: return False for factor in range(2, int(sqrt(num)) + 1): if num % factor == 0: return False return True
def number_range(start: int, end: int): """Generator that yields numbers in range.
Using 'yield from' to delegate to range().
"""
yield from range(start, end + 1)
=============================================================================¶
Example 2: Concurrent Coroutines¶
=============================================================================¶
async def prime_filter(start: int, end: int) -> tuple[int, ...]: """Async coroutine that filters prime numbers from a range.
The await asyncio.sleep(0) call yields control to the event loop,
allowing other coroutines to run. This is cooperative multitasking.
"""
primes = []
for n in number_range(start, end):
if is_prime(n):
primes.append(n)
# Yield control to event loop periodically
if n % 10 == 0:
await asyncio.sleep(0)
return tuple(primes)
async def square_mapper(start: int, end: int) -> list[int]: """Async coroutine that computes squares of numbers in a range.""" squares = [] for n in number_range(start, end): squares.append(n * n) if n % 10 == 0: await asyncio.sleep(0) return squares
=============================================================================¶
Example 3: Running Coroutines Concurrently¶
=============================================================================¶
async def demo_gather(): """Run multiple coroutines concurrently with asyncio.gather().
gather() starts all coroutines at once and returns results
in the same order as the coroutines were passed.
"""
print("=== asyncio.gather() Demo ===")
print("Running prime_filter and square_mapper concurrently...\n")
# Both coroutines run concurrently (interleaved at await points)
primes, squares = await asyncio.gather(
prime_filter(2, 50),
square_mapper(1, 10),
)
print(f"Primes (2-50): {primes}")
print(f"Squares (1-10): {squares}")
print()
=============================================================================¶
Example 4: Progress Reporting with Async¶
=============================================================================¶
async def prime_filter_with_progress(start: int, end: int) -> list[int]: """Prime filter that reports progress.""" primes = [] total = end - start + 1 for i, n in enumerate(number_range(start, end)): if is_prime(n): primes.append(n) # Report progress every 25% if (i + 1) % (total // 4) == 0: pct = (i + 1) / total * 100 print(f" Prime filter: {pct:.0f}% complete ({len(primes)} found)") await asyncio.sleep(0) return primes
async def countdown(name: str, n: int) -> str: """Simple countdown coroutine (runs alongside prime filter).""" for i in range(n, 0, -1): print(f" {name}: {i}") await asyncio.sleep(0.01) return f"{name} done"
async def demo_concurrent_progress(): """Show interleaved execution of concurrent coroutines.""" print("=== Concurrent Coroutines with Progress ===")
results = await asyncio.gather(
prime_filter_with_progress(2, 1000),
countdown("Timer-A", 5),
countdown("Timer-B", 3),
)
primes, msg_a, msg_b = results
print(f"\nFound {len(primes)} primes between 2 and 1000")
print(f"Messages: {msg_a}, {msg_b}")
print()
=============================================================================¶
Example 5: gather() vs wait() Comparison¶
=============================================================================¶
async def demo_comparison(): """Compare gather() and wait() approaches.""" print("=== gather() vs wait() ===") print(""" asyncio.gather(*coroutines): - Returns results in ORDER (same as input) - Simple API: results = await gather(a, b, c) - Best for: getting all results together
asyncio.wait(tasks):
- Returns (done, pending) sets
- Results in COMPLETION order (not input order)
- Supports timeout and FIRST_COMPLETED
- Best for: processing as tasks complete, timeouts
""")
# gather: ordered results
results = await asyncio.gather(
prime_filter(2, 30),
square_mapper(1, 5),
)
print(f"gather results (ordered): {results}")
print()
# wait: done/pending sets
tasks = [
asyncio.create_task(prime_filter(2, 30)),
asyncio.create_task(square_mapper(1, 5)),
]
done, pending = await asyncio.wait(tasks)
print(f"wait: {len(done)} done, {len(pending)} pending")
for task in done:
print(f" Result: {task.result()}")
=============================================================================¶
Main¶
=============================================================================¶
if name == 'main': asyncio.run(demo_gather()) asyncio.run(demo_concurrent_progress()) asyncio.run(demo_comparison()) ```
Exercises¶
Exercise 1.
Create 5 tasks using asyncio.create_task(), each sleeping for a different duration (0.1s to 0.5s) and returning its task name. Use asyncio.as_completed() to print results in completion order. Verify that shorter tasks complete first.
Solution to Exercise 1
```python
import asyncio
async def task(name, delay):
await asyncio.sleep(delay)
return name
async def main():
tasks = [
asyncio.create_task(task(f"task-{i}", 0.1 * (i + 1)))
for i in range(5)
]
for coro in asyncio.as_completed(tasks):
result = await coro
print(f"Completed: {result}")
asyncio.run(main())
```
Exercise 2.
Write a program that creates a long-running task (await asyncio.sleep(10)) and cancels it after 0.5 seconds. Catch asyncio.CancelledError in both the task and the caller. Print messages showing when the task starts, when it is cancelled, and when the caller handles the cancellation.
Solution to Exercise 2
```python
import asyncio
async def long_running():
try:
print("Task: started")
await asyncio.sleep(10)
return "done"
except asyncio.CancelledError:
print("Task: cancelled, cleaning up")
raise
async def main():
t = asyncio.create_task(long_running())
await asyncio.sleep(0.5)
t.cancel()
try:
await t
except asyncio.CancelledError:
print("Caller: handled cancellation")
asyncio.run(main())
```
Exercise 3.
Use asyncio.TaskGroup (Python 3.11+) to run 4 tasks concurrently. Three tasks should succeed (returning their ID squared), and one should raise a ValueError. Catch the resulting ExceptionGroup and print both the successful results and the error message.
Solution to Exercise 3
```python
import asyncio
async def compute(task_id):
await asyncio.sleep(0.1)
if task_id == 3:
raise ValueError(f"Task {task_id} failed")
return task_id ** 2
async def main():
try:
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(compute(i)) for i in range(1, 5)]
except* ValueError as eg:
print("Errors:")
for exc in eg.exceptions:
print(f" {exc}")
print("Results from successful tasks:")
for t in tasks:
if not t.cancelled() and t.exception() is None:
print(f" {t.result()}")
asyncio.run(main())
```