Callback Patterns¶
A callback is a function passed as an argument to another function, to be called later. Callbacks are fundamental to event-driven programming, asynchronous operations, and flexible APIs.
Mental Model
A callback is a "call me when you're done" contract: you hand a function to someone else's code, and that code invokes it at the right moment. The caller decides what to do; the callee decides when. This inversion of control is the foundation of event-driven and asynchronous programming.
What is a Callback?¶
def process_data(data, on_complete):
    """Process data and call on_complete when done.

    Args:
        data: Iterable of values that support ``* 2``.
        on_complete: Callable invoked exactly once with the list of
            doubled values.
    """
    result = [x * 2 for x in data]
    on_complete(result)  # Callback invocation


def print_result(result):
    """Example callback: print the processed result."""
    print(f"Processing complete: {result}")


# Pass function as callback
process_data([1, 2, 3], print_result)
# Output: Processing complete: [2, 4, 6]
The key insight: functions are first-class objects in Python, so they can be passed around like any other value.
Basic Callback Patterns¶
Simple Callback¶
def fetch_data(url, callback):
    """Simulate fetching data and calling back with result.

    Args:
        url: Address echoed back in the simulated response.
        callback: Callable invoked once with the response dict
            ``{"status": "ok", "url": url}``.
    """
    # Simulate network request
    data = {"status": "ok", "url": url}
    callback(data)


def handle_response(data):
    """Example callback: print the received response."""
    print(f"Received: {data}")


fetch_data("https://api.example.com", handle_response)
Callback with Lambda¶
def calculate(x, y, operation):
    """Apply operation callback to x and y.

    Args:
        x: First operand.
        y: Second operand.
        operation: Two-argument callable.

    Returns:
        Whatever ``operation(x, y)`` returns.
    """
    return operation(x, y)


# Using lambdas as callbacks
calculate(10, 5, lambda a, b: a + b)   # 15
calculate(10, 5, lambda a, b: a * b)   # 50
calculate(10, 5, lambda a, b: a ** b)  # 100000
Multiple Callbacks¶
def process(data, on_success, on_error):
    """Process with separate success and error callbacks.

    Args:
        data: Iterable of values that support ``* 2``.
        on_success: Callable invoked with the list of doubled values.
        on_error: Callable invoked with the exception when processing
            (or ``on_success`` itself, since it runs inside the ``try``)
            raises.
    """
    try:
        result = [x * 2 for x in data]
        on_success(result)
    except Exception as e:
        on_error(e)


def success_handler(result):
    """Example success callback."""
    print(f"Success: {result}")


def error_handler(error):
    """Example error callback."""
    print(f"Error: {error}")


process([1, 2, 3], success_handler, error_handler)
# Success: [2, 4, 6]

# A string would NOT fail here: it is iterable and each character can be
# doubled. Pass something that is not iterable to trigger the error path.
process(None, success_handler, error_handler)
# Error: 'NoneType' object is not iterable
Callback with Context¶
Passing Extra Arguments¶
def notify(message, callback, **context):
    """Notify with additional context.

    Args:
        message: Text handed to the callback as its first argument.
        callback: Callable accepting ``message`` plus the keyword
            arguments supplied as context.
        **context: Extra keyword arguments forwarded unchanged to
            ``callback``.
    """
    callback(message, **context)


def log_message(message, level="INFO", timestamp=None):
    """Example callback: print the message with its level and timestamp."""
    print(f"[{level}] {timestamp}: {message}")


notify("Server started", log_message, level="INFO", timestamp="10:30")
# [INFO] 10:30: Server started
Using partial for Context¶
from functools import partial


def send_email(to, subject, body):
    """Print a simulated e-mail."""
    print(f"To: {to}\nSubject: {subject}\n{body}")


def process_order(order_id, on_complete):
    """Process an order, then report its id to ``on_complete``.

    Args:
        order_id: Identifier of the order being processed.
        on_complete: Callable invoked with ``order_id`` when done.
    """
    # Process order...
    on_complete(order_id)


# Create callback with pre-filled arguments
email_callback = partial(
    send_email,
    to="customer@example.com",
    subject="Order Confirmation",
)


def notify_order(order_id):
    """Adapter: supply the remaining ``body`` argument for the e-mail."""
    email_callback(body=f"Your order #{order_id} is confirmed!")


process_order(12345, notify_order)
Closure for Context¶
def create_logger(prefix):
    """Create a callback with embedded context.

    Args:
        prefix: Label captured by the returned closure.

    Returns:
        A one-argument function that prints ``[prefix] message``.
    """
    def log(message):
        print(f"[{prefix}] {message}")

    return log


debug_log = create_logger("DEBUG")
error_log = create_logger("ERROR")


def process(data, logger):
    """Report progress on ``data`` through the ``logger`` callback.

    Args:
        data: Sized collection to process.
        logger: Callable accepting a single message string.
    """
    logger(f"Processing {len(data)} items")
    # ... process data
    logger("Complete")


process([1, 2, 3], debug_log)
# [DEBUG] Processing 3 items
# [DEBUG] Complete
Event Handler Pattern¶
Simple Event System¶
from datetime import datetime


class EventEmitter:
    """Minimal event system: register callbacks per event name, then emit."""

    def __init__(self):
        # Maps event name -> list of callbacks in registration order.
        self._callbacks = {}

    def on(self, event, callback):
        """Register a callback for an event."""
        self._callbacks.setdefault(event, []).append(callback)

    def emit(self, event, *args, **kwargs):
        """Trigger all callbacks for an event.

        Positional and keyword arguments are forwarded to every callback.
        Emitting an event with no registered callbacks is a no-op.
        """
        # Iterate over a snapshot so a callback may register further
        # handlers without changing the current dispatch.
        for callback in tuple(self._callbacks.get(event, ())):
            callback(*args, **kwargs)


# Usage
emitter = EventEmitter()


def on_user_login(user):
    print(f"User logged in: {user}")


def on_user_login_log(user):
    print(f"Logging: {user} login at {datetime.now()}")


emitter.on("login", on_user_login)
emitter.on("login", on_user_login_log)

emitter.emit("login", "alice")
# User logged in: alice
# Logging: alice login at 2024-01-15 10:30:00
With Decorator Registration¶
class EventSystem:
    """Class-level event registry populated through a decorator."""

    # Shared by the whole class: event name -> handlers in registration order.
    _handlers = {}

    @classmethod
    def on(cls, event):
        """Decorator to register event handler.

        The decorated function is returned unchanged, so it can still be
        called directly.
        """
        def decorator(func):
            cls._handlers.setdefault(event, []).append(func)
            return func

        return decorator

    @classmethod
    def emit(cls, event, *args, **kwargs):
        """Call every handler registered for ``event`` with the arguments."""
        for handler in cls._handlers.get(event, []):
            handler(*args, **kwargs)


# Register handlers with decorator
@EventSystem.on("order_placed")
def send_confirmation(order):
    print(f"Sending confirmation for order {order['id']}")


@EventSystem.on("order_placed")
def update_inventory(order):
    print(f"Updating inventory for {order['items']}")


# Trigger event
EventSystem.emit("order_placed", {"id": 123, "items": ["book", "pen"]})
Observer Pattern¶
class Subject:
    """Observable that notifies observers of state changes."""

    def __init__(self):
        self._observers = []
        self._state = None

    def attach(self, observer):
        """Add an observer callback."""
        self._observers.append(observer)

    def detach(self, observer):
        """Remove an observer callback.

        Raises:
            ValueError: If ``observer`` is not currently attached.
        """
        self._observers.remove(observer)

    def notify(self):
        """Notify all observers of state change."""
        # Iterate over a snapshot: an observer that detaches itself while
        # being notified must not cause the next observer to be skipped.
        for observer in tuple(self._observers):
            observer(self._state)

    @property
    def state(self):
        """Current state value."""
        return self._state

    @state.setter
    def state(self, value):
        # Every assignment notifies, even when the value is unchanged.
        self._state = value
        self.notify()


# Usage
def observer_a(state):
    print(f"Observer A: state changed to {state}")


def observer_b(state):
    print(f"Observer B: state changed to {state}")


subject = Subject()
subject.attach(observer_a)
subject.attach(observer_b)

subject.state = "active"
# Observer A: state changed to active
# Observer B: state changed to active

subject.detach(observer_a)
subject.state = "inactive"
# Observer B: state changed to inactive
Progress Callback Pattern¶
def process_items(items, on_progress=None):
    """Process items with optional progress callback.

    Args:
        items: Sized sequence of values that support ``* 2``.
        on_progress: Optional callable invoked after each item as
            ``on_progress(percent, item, result)``, where ``percent`` is
            the share of items completed so far (0-100).

    Returns:
        List with every item doubled.
    """
    results = []
    total = len(items)

    for done, item in enumerate(items, start=1):
        # Process item
        result = item * 2
        results.append(result)

        # Report progress
        if on_progress is not None:
            on_progress(done / total * 100, item, result)

    return results


def print_progress(percent, item, result):
    """Example progress callback."""
    print(f"{percent:.0f}% - Processed {item} -> {result}")


process_items([1, 2, 3, 4, 5], on_progress=print_progress)
# 20% - Processed 1 -> 2
# 40% - Processed 2 -> 4
# 60% - Processed 3 -> 6
# 80% - Processed 4 -> 8
# 100% - Processed 5 -> 10
Progress with Cancellation¶
def process_items(items, on_progress=None):
    """Process items; callback returns False to cancel.

    Args:
        items: Sized sequence of values that support ``* 2``.
        on_progress: Optional callable invoked after each item as
            ``on_progress(percent, item, result)``. Returning exactly
            ``False`` stops processing; any other return value
            (including ``None``) continues.

    Returns:
        List of results produced before completion or cancellation. The
        item that triggered the cancellation is included.
    """
    results = []
    total = len(items)

    for done, item in enumerate(items, start=1):
        result = item * 2
        results.append(result)

        if on_progress is not None:
            should_continue = on_progress(done / total * 100, item, result)
            # Identity check on purpose: a callback returning None keeps going.
            if should_continue is False:
                print("Processing cancelled")
                break

    return results


def limited_progress(percent, item, result):
    """Example callback that cancels once progress reaches 50%."""
    print(f"{percent:.0f}%")
    return percent < 50  # Cancel after 50%


process_items([1, 2, 3, 4, 5], on_progress=limited_progress)
# 20%
# 40%
# 60%
# Processing cancelled
Retry with Callback¶
import random
import time


def retry_operation(operation, max_attempts=3, on_retry=None, base_delay=0.1):
    """Retry operation with callback on each retry.

    Args:
        operation: Zero-argument callable to run.
        max_attempts: Maximum number of calls to ``operation`` (>= 1).
        on_retry: Optional callable invoked as ``on_retry(attempt, error)``
            after a failed attempt that will be followed by another one.
        base_delay: Seconds multiplied by the attempt number to get the
            pause before the next attempt.

    Returns:
        The first successful result of ``operation()``.

    Raises:
        ValueError: If ``max_attempts`` is smaller than 1.
        Exception: The error of the last attempt when every attempt fails.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except Exception as e:
            last_error = e
            # Only notify and wait when another attempt will follow.
            if attempt < max_attempts:
                if on_retry is not None:
                    on_retry(attempt, e)
                time.sleep(base_delay * attempt)  # Linear backoff

    raise last_error


def unreliable_operation():
    """Fail about 70% of the time to simulate a flaky network call."""
    if random.random() < 0.7:
        raise ConnectionError("Network timeout")
    return "Success!"


def on_retry(attempt, error):
    """Example retry callback."""
    print(f"Attempt {attempt} failed: {error}. Retrying...")


if __name__ == "__main__":
    # All five attempts can still fail, so the final error must be handled.
    try:
        result = retry_operation(unreliable_operation, max_attempts=5, on_retry=on_retry)
        print(result)
    except ConnectionError as error:
        print(f"All attempts failed: {error}")
Validation Callback Pattern¶
def create_user(username, email, validators=None):
    """Create user with validation callbacks.

    Args:
        username: Requested user name.
        email: Requested e-mail address.
        validators: Optional iterable of callables. Each receives the
            user dict and returns an error message, or a falsy value
            when the data is acceptable.

    Returns:
        Dict with ``username`` and ``email`` keys.

    Raises:
        ValueError: If at least one validator reports an error; the
            message lists every reported error.
    """
    validators = validators or []
    data = {"username": username, "email": email}

    # Run all validators and keep only the ones that reported a problem.
    errors = [error for error in (check(data) for check in validators) if error]

    if errors:
        raise ValueError(f"Validation failed: {errors}")

    return data


# Validator callbacks
def validate_username(data):
    if len(data["username"]) < 3:
        return "Username must be at least 3 characters"
    return None


def validate_email(data):
    if "@" not in data["email"]:
        return "Invalid email format"
    return None


def validate_no_spaces(data):
    if " " in data["username"]:
        return "Username cannot contain spaces"
    return None


# Usage
validators = [validate_username, validate_email, validate_no_spaces]

try:
    user = create_user("ab", "invalid", validators)
except ValueError as e:
    print(e)
# Validation failed: ['Username must be at least 3 characters', 'Invalid email format']
Middleware Pattern¶
def apply_middleware(data, middlewares):
    """Apply chain of middleware callbacks.

    Args:
        data: Initial value handed to the first middleware.
        middlewares: Iterable of one-argument callables. Each receives
            the previous result and returns the next one.

    Returns:
        The result of the last middleware, or ``None`` as soon as any
        middleware returns ``None`` (the remaining ones are skipped).
    """
    result = data
    for middleware in middlewares:
        result = middleware(result)
        if result is None:
            break  # Middleware can halt chain
    return result


# Middleware callbacks
# Each one returns a new dict so the caller's input is never modified.
def add_timestamp(data):
    return {**data, "timestamp": "2024-01-15"}


def add_user_id(data):
    return {**data, "user_id": 123}


def validate_data(data):
    if "name" not in data:
        print("Validation failed: missing name")
        return None  # Halt chain
    return data


# Apply middleware chain
middlewares = [add_timestamp, add_user_id, validate_data]

result = apply_middleware({"name": "test"}, middlewares)
print(result)
# {'name': 'test', 'timestamp': '2024-01-15', 'user_id': 123}

result = apply_middleware({}, middlewares)
print(result)
# Validation failed: missing name
# None
Async Callback Simulation¶
import threading
import time


def async_fetch(url, on_complete, on_error=None, delay=1.0):
    """Simulate async operation with callbacks.

    Args:
        url: Address echoed back in the simulated response.
        on_complete: Callable invoked on the worker thread with the
            response dict ``{"url": url, "status": "ok"}``.
        on_error: Optional callable invoked on the worker thread with
            any exception raised while fetching or inside
            ``on_complete``.
        delay: Seconds to sleep to simulate network latency.

    Returns:
        The already-started ``threading.Thread``; ``join()`` it to wait.
    """
    def do_fetch():
        try:
            time.sleep(delay)  # Simulate network delay
            data = {"url": url, "status": "ok"}
            on_complete(data)
        except Exception as e:
            if on_error is None:
                # No handler supplied: re-raise so threading's excepthook
                # reports the failure instead of hiding it.
                raise
            on_error(e)

    thread = threading.Thread(target=do_fetch)
    thread.start()
    return thread


def handle_result(data):
    """Example completion callback."""
    print(f"Received: {data}")


def handle_error(error):
    """Example error callback."""
    print(f"Error: {error}")


if __name__ == "__main__":
    print("Starting async fetch...")
    thread = async_fetch("https://api.example.com", handle_result, handle_error)
    print("Continuing with other work...")
    thread.join()  # Wait for completion
    # Starting async fetch...
    # Continuing with other work...
    # (1 second later)
    # Received: {'url': 'https://api.example.com', 'status': 'ok'}
Best Practices¶
Use Type Hints¶
from typing import Callable, Optional


def process(
    data: list,
    on_complete: Callable[[list], None],
    on_error: Optional[Callable[[Exception], None]] = None,
) -> None:
    """Process with typed callbacks.

    Args:
        data: Values that support ``* 2``.
        on_complete: Called with the list of doubled values.
        on_error: Optional; called with the exception raised while
            processing or inside ``on_complete``. Without it the
            exception is dropped.
    """
    try:
        result = [x * 2 for x in data]
        on_complete(result)
    except Exception as e:
        if on_error:
            on_error(e)
Default No-Op Callback¶
python
def process(data, callback=None):
    """Use no-op default instead of None check.

    Args:
        data: Iterable of values that support ``* 2``.
        callback: Optional callable invoked with the list of doubled
            values. When omitted, a do-nothing function is used so the
            call site needs no conditional.

    Returns:
        List with every item doubled.
    """
    # Compare against None explicitly: ``callback or ...`` would discard a
    # perfectly valid callable object that happens to be falsy.
    if callback is None:
        callback = lambda x: None
    result = [x * 2 for x in data]
    callback(result)
    return result
Document Callback Signatures¶
def fetch_data(url, callback):
    """
    Fetch data from URL.

    Args:
        url: The URL to fetch
        callback: Function called with (data, error) where:
            - data: The response dict if successful, None on error
            - error: Exception if failed, None on success
    """
    # Placeholder body: this example only illustrates how to document
    # the callback's signature.
    pass
Summary¶
| Pattern | Use Case |
|---|---|
| Simple callback | Basic async completion |
| Success/Error callbacks | Error handling |
| Event emitter | Multiple listeners |
| Observer | State change notifications |
| Progress callback | Long-running operations |
| Validation callbacks | Pluggable validation |
| Middleware | Request/response processing |
Key Takeaways:
- Callbacks enable flexible, extensible APIs
- Use multiple callbacks for success/error handling
- `functools.partial` and closures provide context
- Event systems allow multiple subscribers
- Progress callbacks should support cancellation
- Type hints document expected callback signatures
- Consider `async`/`await` for complex async patterns
Exercises¶
Exercise 1.
Write a download_file(url, on_progress, on_complete) function that simulates downloading by iterating 10 times. On each iteration, call on_progress(percent) with the current percentage (10, 20, ..., 100). After the loop, call on_complete(url). Demonstrate with callbacks that print progress and a completion message.
Solution to Exercise 1
def download_file(url, on_progress, on_complete):
    """Simulate a download in ten steps, reporting through callbacks.

    Args:
        url: Address of the simulated download.
        on_progress: Callable invoked with the percentage after each
            step (10, 20, ..., 100).
        on_complete: Callable invoked with ``url`` after the last step.
    """
    for percent in range(10, 101, 10):
        on_progress(percent)
    on_complete(url)


download_file(
    "https://example.com/file.zip",
    on_progress=lambda p: print(f"  Progress: {p}%"),
    on_complete=lambda url: print(f"  Download complete: {url}"),
)
Exercise 2.
Create a simple EventEmitter class with on(event, callback) to register callbacks and emit(event, *args) to trigger all callbacks for that event. Register multiple callbacks for a "data" event and demonstrate that emitting the event calls all of them in order.
Solution to Exercise 2
class EventEmitter:
    """Simple event emitter: callbacks run in registration order."""

    def __init__(self):
        # Maps event name -> list of callbacks in registration order.
        self.listeners = {}

    def on(self, event, callback):
        """Register ``callback`` for ``event``."""
        self.listeners.setdefault(event, []).append(callback)

    def emit(self, event, *args):
        """Call every callback registered for ``event`` with ``args``.

        Emitting an event without listeners does nothing.
        """
        for callback in self.listeners.get(event, []):
            callback(*args)


emitter = EventEmitter()
emitter.on("data", lambda d: print(f"Logger: {d}"))
emitter.on("data", lambda d: print(f"Processor: {d}"))
emitter.on("data", lambda d: print(f"Archiver: {d}"))
emitter.emit("data", {"temperature": 22.5})
Exercise 3.
Write a retry_with_callback(func, max_attempts, on_failure, on_success) function that calls func() up to max_attempts times. On each failure, call on_failure(attempt, exception). On success, call on_success(result) and return the result. If all attempts fail, raise the last exception.
Solution to Exercise 3
def retry_with_callback(func, max_attempts, on_failure, on_success):
    """Call ``func`` until it succeeds or the attempts are used up.

    Args:
        func: Zero-argument callable to run.
        max_attempts: Maximum number of calls to ``func`` (>= 1).
        on_failure: Callable invoked as ``on_failure(attempt, exception)``
            after every failed attempt.
        on_success: Callable invoked with the result of the first
            successful attempt.

    Returns:
        The result of the first successful ``func()`` call.

    Raises:
        ValueError: If ``max_attempts`` is smaller than 1.
        Exception: The exception of the last attempt when all fail.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    last_exc = None
    for attempt in range(1, max_attempts + 1):
        try:
            result = func()
        except Exception as e:
            last_exc = e
            on_failure(attempt, e)
        else:
            # Outside the try on purpose: an error raised by the success
            # callback is not a failure of ``func`` and must not cause
            # ``func`` to be retried.
            on_success(result)
            return result
    raise last_exc


call_count = 0


def flaky_operation():
    """Fail on the first two calls, then succeed."""
    global call_count
    call_count += 1
    if call_count < 3:
        raise ConnectionError("Timeout")
    return "OK"


retry_with_callback(
    flaky_operation,
    max_attempts=5,
    on_failure=lambda a, e: print(f"  Attempt {a} failed: {e}"),
    on_success=lambda r: print(f"  Success: {r}"),
)