Executor Interface¶
concurrent.futures provides a high-level interface for asynchronously executing callables. It's the recommended approach for most concurrent programming in Python.
Why concurrent.futures?¶
Before: Low-Level Threading/Multiprocessing¶
import threading
import queue

# Manual thread management: the "before" picture that concurrent.futures replaces.
result_queue = queue.Queue()

def worker(x, q):
    """Square *x* and push the result onto queue *q*."""
    q.put(x ** 2)

threads = []
for n in range(10):
    thread = threading.Thread(target=worker, args=(n, result_queue))
    thread.start()
    threads.append(thread)

# Wait for every worker to finish before draining the queue.
for thread in threads:
    thread.join()

# Drain exactly as many results as threads were started (arrival order is arbitrary).
results = [result_queue.get() for _ in range(10)]
After: concurrent.futures¶
from concurrent.futures import ThreadPoolExecutor

def square(x):
    """Return *x* squared."""
    return x ** 2

# The pool handles thread creation, dispatch, and cleanup for us.
with ThreadPoolExecutor() as executor:
    results = list(executor.map(square, range(10)))
The Executor Interface¶
Both ThreadPoolExecutor and ProcessPoolExecutor share the same interface:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
# NOTE(review): this snippet is illustrative — `function`, `arg1`, `arg2`, and
# `iterable` are placeholders that must be defined before it can run.
# Same interface for both
executor = ThreadPoolExecutor(max_workers=4)
# or
executor = ProcessPoolExecutor(max_workers=4)
# Submit individual tasks
future = executor.submit(function, arg1, arg2)
# Map function over iterable
results = executor.map(function, iterable)
# Shutdown
executor.shutdown(wait=True)
Core Methods¶
submit() — Submit Single Task¶
from concurrent.futures import ThreadPoolExecutor

def compute(x, y):
    """Return the sum of *x* and *y*."""
    return x + y

with ThreadPoolExecutor() as executor:
    # submit() schedules the call and hands back a Future right away.
    future = executor.submit(compute, 10, 20)
    # result() blocks until the task has finished, then yields its value.
    result = future.result()
    print(result)  # 30
map() — Apply Function to Iterable¶
from concurrent.futures import ThreadPoolExecutor

def square(x):
    """Return *x* squared."""
    return x ** 2

with ThreadPoolExecutor() as executor:
    # map() yields results lazily, in the same order as the inputs.
    results = executor.map(square, [1, 2, 3, 4, 5])
    print(list(results))  # [1, 4, 9, 16, 25]
shutdown() — Stop Executor¶
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)

# Submit tasks...
future = executor.submit(lambda: 42)

# Shutdown options — calling shutdown() more than once is harmless:
executor.shutdown(wait=True)                       # Wait for pending tasks (default)
executor.shutdown(wait=False)                      # Return immediately
executor.shutdown(wait=True, cancel_futures=True)  # Cancel pending (Python 3.9+)
Context Manager (Recommended)¶
from concurrent.futures import ThreadPoolExecutor

def process(x):
    """Return *x* doubled."""
    return x * 2

# The with-block shuts the executor down automatically on exit.
with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(process, range(10)))

# executor is already shut down by the time we get here.
print(results)
Switching Between Threads and Processes¶
The unified interface makes it easy to switch:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def compute(x):
    """Return *x* squared (stands in for real work)."""
    return x ** 2

data = list(range(100))

# Process pools must only be created under a __main__ guard: with the "spawn"
# start method (the default on Windows and macOS) every worker re-imports the
# main module, and without the guard that re-import would try to create yet
# another pool and the program would fail or fork-bomb.
if __name__ == "__main__":
    # For I/O-bound tasks: use threads
    with ThreadPoolExecutor(max_workers=10) as executor:
        thread_results = list(executor.map(compute, data))

    # For CPU-bound tasks: use processes
    with ProcessPoolExecutor(max_workers=4) as executor:
        process_results = list(executor.map(compute, data))
Factory Pattern¶
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def get_executor(task_type, max_workers=None):
    """Get appropriate executor for task type."""
    # Dispatch with guard clauses; default worker counts differ per kind.
    if task_type == "cpu":
        return ProcessPoolExecutor(max_workers=max_workers)
    if task_type == "io":
        return ThreadPoolExecutor(max_workers=max_workers or 20)
    raise ValueError(f"Unknown task type: {task_type}")
# Usage: the factory keeps the call sites identical for both task types.
# NOTE(review): fetch_url/urls and heavy_computation/data are placeholders
# assumed to be defined elsewhere.
with get_executor("io") as executor:
    results = executor.map(fetch_url, urls)

with get_executor("cpu") as executor:
    results = executor.map(heavy_computation, data)
Executor Parameters¶
ThreadPoolExecutor¶
from concurrent.futures import ThreadPoolExecutor
# NOTE(review): `init_func`, `arg1`, and `arg2` are placeholders — define them
# before running this snippet.
executor = ThreadPoolExecutor(
max_workers=10, # Max threads (default: min(32, cpu_count + 4))
thread_name_prefix="Worker", # Prefix for thread names
initializer=init_func, # Called in each thread at start
initargs=(arg1, arg2), # Arguments for initializer
)
ProcessPoolExecutor¶
from concurrent.futures import ProcessPoolExecutor
import multiprocessing as mp
# NOTE(review): `init_func`, `arg1`, and `arg2` are placeholders — define them
# before running this snippet.
# NOTE(review): max_tasks_per_child is documented as incompatible with the
# "fork" start method — verify the chosen mp_context allows it.
executor = ProcessPoolExecutor(
max_workers=4, # Max processes (default: cpu_count)
mp_context=mp.get_context("spawn"), # Start method
initializer=init_func, # Called in each process at start
initargs=(arg1, arg2), # Arguments for initializer
max_tasks_per_child=100, # Restart worker after N tasks (Python 3.11+)
)
Initializer Functions¶
Run setup code in each worker:
from concurrent.futures import ThreadPoolExecutor
import threading

# Thread-local storage: each worker thread sees its own `local.db`.
local = threading.local()

def query(sql):
    """Use worker's database connection."""
    return local.db.execute(sql)

def init_worker(connection_string):
    """Initialize each worker thread."""
    # NOTE(review): create_connection is a placeholder assumed to be
    # defined elsewhere.
    local.db = create_connection(connection_string)
    print(f"Worker {threading.current_thread().name}: DB connected")

with ThreadPoolExecutor(
    max_workers=4,
    initializer=init_worker,
    initargs=("postgresql://localhost/mydb",),
) as executor:
    queries = ["SELECT * FROM users", "SELECT * FROM orders"]
    results = list(executor.map(query, queries))
Error Handling¶
Exceptions in submit()¶
from concurrent.futures import ThreadPoolExecutor

def risky_task(x):
    """Square *x*; reject negative input."""
    if x < 0:
        raise ValueError("Negative not allowed")
    return x ** 2

with ThreadPoolExecutor() as executor:
    future = executor.submit(risky_task, -5)
    try:
        # An exception raised inside the worker is re-raised here, at result().
        result = future.result()
    except ValueError as e:
        print(f"Task failed: {e}")
Exceptions in map()¶
from concurrent.futures import ThreadPoolExecutor

def risky_task(x):
    """Square *x*, but choke on the value 3."""
    if x == 3:
        raise ValueError(f"Cannot process {x}")
    return x ** 2

with ThreadPoolExecutor() as executor:
    try:
        # The worker's exception surfaces while iterating the map() results.
        results = list(executor.map(risky_task, range(5)))
    except ValueError as e:
        print(f"Error: {e}")
Handling Errors Per Task¶
from concurrent.futures import ThreadPoolExecutor

def safe_task(x):
    """Run the computation, converting any failure into an ("error", msg) tuple."""
    try:
        if x == 3:
            raise ValueError("Bad value")
        return ("success", x ** 2)
    except Exception as e:
        return ("error", str(e))

with ThreadPoolExecutor() as executor:
    results = list(executor.map(safe_task, range(5)))

# Every task produced a (status, payload) pair — report each one.
for status, value in results:
    if status != "success":
        print(f"Error: {value}")
    else:
        print(f"Result: {value}")
Timeout Support¶
result() with Timeout¶
from concurrent.futures import ThreadPoolExecutor, TimeoutError
import time

# Keep the demo delay short: after the timeout fires, the `with` block still
# waits for the running task to finish (Executor.__exit__ calls
# shutdown(wait=True)), so a long sleep here would stall the whole script
# long after the TimeoutError was handled.
TASK_SECONDS = 0.3

def slow_task():
    """Simulate a long-running task; returns "Done" after a short sleep."""
    time.sleep(TASK_SECONDS)
    return "Done"

with ThreadPoolExecutor() as executor:
    future = executor.submit(slow_task)
    try:
        # Wait at most 0.05 s for the result, then give up.
        result = future.result(timeout=0.05)
    except TimeoutError:
        print("Task timed out!")
        # cancel() cannot stop a task that is already running; it only
        # prevents a still-queued task from starting.
        future.cancel()
map() with Timeout¶
from concurrent.futures import ThreadPoolExecutor, TimeoutError
import time

def slow_task(x):
    """Sleep briefly in proportion to *x*, then return it.

    The sleep is scaled down (x * 0.1 s) so the demo stays fast: even after
    the timeout fires, Executor.__exit__ still waits for the running tasks
    to finish before the script can continue.
    """
    time.sleep(x * 0.1)
    return x

with ThreadPoolExecutor() as executor:
    try:
        # The timeout applies to the ENTIRE iteration, measured from the
        # original map() call — not to each element individually.
        results = list(executor.map(slow_task, [1, 2, 3], timeout=0.15))
    except TimeoutError:
        print("Map operation timed out!")
Comparison: map() vs submit()¶
| Feature | map() | submit() |
|---|---|---|
| Input | Iterable | Individual arguments |
| Returns | Iterator of results | Single Future |
| Order | Preserves input order | N/A |
| Error handling | First exception stops iteration | Per-task exception handling |
| Flexibility | Less (same function) | More (different functions) |
| Memory | Lazy (iterator) | Eager (all Futures at once) |
When to Use map()¶
# Same function applied to many inputs
# NOTE(review): `executor`, `process`, and `items` are placeholders assumed
# to be defined elsewhere.
results = executor.map(process, items)
When to Use submit()¶
# Different functions or complex handling
# NOTE(review): `executor`, `download`, `process`, `upload` and their
# arguments are placeholders assumed to be defined elsewhere.
futures = []
futures.append(executor.submit(download, url))
futures.append(executor.submit(process, data))
futures.append(executor.submit(upload, result))
Practical Example¶
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time

def fetch_data(url):
    """I/O-bound: fetch from URL."""
    time.sleep(0.5)  # Simulate network
    return f"Data from {url}"

def process_data(data):
    """CPU-bound: process data."""
    time.sleep(0.1)  # Simulate computation
    return f"Processed: {data}"

def main():
    """Two-stage pipeline: threaded fetch, then process-pool processing."""
    urls = [f"https://api.example.com/data/{i}" for i in range(10)]

    # Stage 1: Fetch (I/O-bound) — use threads
    with ThreadPoolExecutor(max_workers=10) as executor:
        raw_data = list(executor.map(fetch_data, urls))
    print(f"Fetched {len(raw_data)} items")

    # Stage 2: Process (CPU-bound) — use processes.  The worker functions must
    # live at module top level so "spawn" children can import them by name.
    with ProcessPoolExecutor() as executor:
        processed = list(executor.map(process_data, raw_data))
    print(f"Processed {len(processed)} items")

# The __main__ guard is required: ProcessPoolExecutor's "spawn" start method
# (default on Windows/macOS) re-imports this module in each worker process,
# and without the guard that re-import would launch the pipeline again.
if __name__ == "__main__":
    main()
Key Takeaways¶
- concurrent.futures provides a unified interface for threads and processes
- Use the context manager (with) for automatic cleanup
- submit() is for individual tasks and returns a Future
- map() applies a function to an iterable and returns an iterator
- The same code works with both ThreadPoolExecutor and ProcessPoolExecutor
- Switch between threads (I/O-bound) and processes (CPU-bound) easily
- Use initializer for per-worker setup (database connections, etc.)
- Handle exceptions via future.result() or wrap tasks in try/except