Skip to content

Executor Interface

concurrent.futures provides a high-level interface for asynchronously executing callables. It's the recommended approach for most concurrent programming in Python.


Why concurrent.futures?

Before: Low-Level Threading/Multiprocessing

import threading
import queue

# Hand-rolled fan-out: one thread per input, results gathered through a queue
result_queue = queue.Queue()

def worker(x, q):
    """Square ``x`` and put the result on queue ``q``."""
    q.put(x ** 2)

# Build every thread first, then start them all
threads = [
    threading.Thread(target=worker, args=(n, result_queue))
    for n in range(10)
]
for thread in threads:
    thread.start()

# Block until every worker has finished
for thread in threads:
    thread.join()

# One queue item per thread; items arrive in completion order
results = [result_queue.get() for _ in threads]

After: concurrent.futures

from concurrent.futures import ThreadPoolExecutor

def square(x):
    """Return ``x`` raised to the second power."""
    return pow(x, 2)

# The pool is shut down automatically when the block exits
with ThreadPoolExecutor() as executor:
    results = [*executor.map(square, range(10))]

The Executor Interface

Both ThreadPoolExecutor and ProcessPoolExecutor share the same interface:

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

# Same interface for both: pick ONE of the two constructors below
executor = ThreadPoolExecutor(max_workers=4)
# or
executor = ProcessPoolExecutor(max_workers=4)

# Submit individual tasks
# (`function`, `arg1`, `arg2` are placeholders for your own callable and arguments)
future = executor.submit(function, arg1, arg2)

# Map function over iterable (`iterable` is likewise a placeholder)
results = executor.map(function, iterable)

# Shutdown
executor.shutdown(wait=True)

Core Methods

submit() — Submit Single Task

from concurrent.futures import ThreadPoolExecutor

def compute(x, y):
    """Return the sum of ``x`` and ``y``."""
    total = x + y
    return total

with ThreadPoolExecutor() as executor:
    # submit() hands back a Future right away, without waiting for the work
    future = executor.submit(compute, 10, 20)

    # result() blocks until the task has finished, then returns its value
    result = future.result()
    print(result)  # 30

map() — Apply Function to Iterable

from concurrent.futures import ThreadPoolExecutor

def square(x):
    """Return ``x`` raised to the second power."""
    return pow(x, 2)

with ThreadPoolExecutor() as executor:
    # map() gives back an iterator that yields results in input order
    results = executor.map(square, [1, 2, 3, 4, 5])
    squares = list(results)
    print(squares)  # [1, 4, 9, 16, 25]

shutdown() — Stop Executor

from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)

# Submit tasks...
future = executor.submit(lambda: 42)

# Shutdown options (alternatives shown back to back for comparison;
# real code needs only one shutdown() call)
executor.shutdown(wait=True)   # Wait for pending tasks (default)
executor.shutdown(wait=False)  # Return immediately
executor.shutdown(wait=True, cancel_futures=True)  # Cancel pending (Python 3.9+)

from concurrent.futures import ThreadPoolExecutor

def process(x):
    """Return ``x`` doubled."""
    doubled = x * 2
    return doubled

# Leaving the `with` block shuts the executor down automatically
with ThreadPoolExecutor(max_workers=4) as executor:
    results = [*executor.map(process, range(10))]

# The pool is already shut down here; the collected results remain usable
print(results)

Switching Between Threads and Processes

The unified interface makes it easy to switch:

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def compute(x):
    """Return ``x`` squared."""
    return x ** 2

data = list(range(100))

# ProcessPoolExecutor workers must be able to import this module. Under the
# "spawn" start method (the default on Windows and macOS) unguarded top-level
# pool code would be re-executed inside every worker, so the code that
# creates the pools lives behind the main guard.
if __name__ == "__main__":
    # For I/O-bound tasks: use threads
    with ThreadPoolExecutor(max_workers=10) as executor:
        thread_results = list(executor.map(compute, data))

    # For CPU-bound tasks: use processes
    with ProcessPoolExecutor(max_workers=4) as executor:
        process_results = list(executor.map(compute, data))

Factory Pattern

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def get_executor(task_type, max_workers=None):
    """Get appropriate executor for task type.

    Args:
        task_type: "io" for a ThreadPoolExecutor, "cpu" for a
            ProcessPoolExecutor.
        max_workers: Pool size. When None, "io" pools use 20 threads and
            "cpu" pools use ProcessPoolExecutor's own default.

    Returns:
        A new, not-yet-shut-down executor; use it as a context manager.

    Raises:
        ValueError: If ``task_type`` is unknown, or if the executor itself
            rejects ``max_workers`` (for example 0).
    """
    if task_type == "io":
        # Test against None rather than truthiness so that an explicit 0 is
        # rejected by the executor instead of silently becoming 20, which
        # matches how the "cpu" branch treats the same argument.
        workers = 20 if max_workers is None else max_workers
        return ThreadPoolExecutor(max_workers=workers)
    if task_type == "cpu":
        return ProcessPoolExecutor(max_workers=max_workers)
    raise ValueError(f"Unknown task type: {task_type}")

# Usage
# (`fetch_url`, `urls`, `heavy_computation`, and `data` are placeholders
# for your own functions and inputs)
with get_executor("io") as executor:
    results = executor.map(fetch_url, urls)

with get_executor("cpu") as executor:
    results = executor.map(heavy_computation, data)

Executor Parameters

ThreadPoolExecutor

from concurrent.futures import ThreadPoolExecutor

# `init_func`, `arg1`, and `arg2` are placeholders for your own initializer
# and its arguments.
executor = ThreadPoolExecutor(
    max_workers=10,           # Max threads (default: min(32, cpu_count + 4))
    thread_name_prefix="Worker",  # Prefix for thread names
    initializer=init_func,    # Called in each thread at start
    initargs=(arg1, arg2),    # Arguments for initializer
)

ProcessPoolExecutor

from concurrent.futures import ProcessPoolExecutor
import multiprocessing as mp

# `init_func`, `arg1`, and `arg2` are placeholders for your own initializer
# and its arguments.
# NOTE: per the concurrent.futures documentation, max_tasks_per_child cannot
# be combined with the "fork" start method; "spawn" is used here.
executor = ProcessPoolExecutor(
    max_workers=4,            # Max processes (default: cpu_count)
    mp_context=mp.get_context("spawn"),  # Start method
    initializer=init_func,    # Called in each process at start
    initargs=(arg1, arg2),    # Arguments for initializer
    max_tasks_per_child=100,  # Restart worker after N tasks (Python 3.11+)
)

Initializer Functions

Run setup code in each worker:

from concurrent.futures import ThreadPoolExecutor
import threading

# Thread-local namespace: holds each worker thread's own `db` attribute
local = threading.local()

def init_worker(connection_string):
    """Open a database connection for the calling worker thread."""
    local.db = create_connection(connection_string)
    thread_name = threading.current_thread().name
    print(f"Worker {thread_name}: DB connected")

def query(sql):
    """Run ``sql`` on the calling thread's connection and return the result."""
    connection = local.db
    return connection.execute(sql)

queries = ["SELECT * FROM users", "SELECT * FROM orders"]

# The initializer runs in each worker thread, so every thread gets its own
# connection before it executes any query
with ThreadPoolExecutor(
    max_workers=4,
    initializer=init_worker,
    initargs=("postgresql://localhost/mydb",),
) as executor:
    results = list(executor.map(query, queries))

Error Handling

Exceptions in submit()

from concurrent.futures import ThreadPoolExecutor

def risky_task(x):
    """Return ``x`` squared; raise ValueError when ``x`` is negative."""
    if x < 0:
        raise ValueError("Negative not allowed")
    squared = x ** 2
    return squared

with ThreadPoolExecutor() as executor:
    future = executor.submit(risky_task, -5)

    # The worker's exception is stored on the Future and re-raised by result()
    try:
        result = future.result()
    except ValueError as exc:
        print(f"Task failed: {exc}")

Exceptions in map()

from concurrent.futures import ThreadPoolExecutor

def risky_task(x):
    """Return ``x`` squared; raise ValueError for the input 3."""
    if x == 3:
        raise ValueError(f"Cannot process {x}")
    squared = x ** 2
    return squared

with ThreadPoolExecutor() as executor:
    pending = executor.map(risky_task, range(5))
    try:
        # The task's exception surfaces while the iterator is consumed
        results = list(pending)
    except ValueError as exc:
        print(f"Error: {exc}")

Handling Errors Per Task

from concurrent.futures import ThreadPoolExecutor

def safe_task(x):
    """Square ``x`` and report the outcome as a ``(status, payload)`` tuple.

    Returns ``("success", x ** 2)`` normally, or ``("error", message)`` when
    anything inside the task raises.
    """
    try:
        if x == 3:
            raise ValueError("Bad value")
        squared = x ** 2
    except Exception as exc:
        return ("error", str(exc))
    else:
        return ("success", squared)

with ThreadPoolExecutor() as executor:
    results = list(executor.map(safe_task, range(5)))

    for status, value in results:
        if status != "success":
            print(f"Error: {value}")
            continue
        print(f"Result: {value}")

Timeout Support

result() with Timeout

from concurrent.futures import ThreadPoolExecutor, TimeoutError
import time

def slow_task():
    """Sleep for 10 seconds, then return "Done" (stands in for slow work)."""
    time.sleep(10)
    return "Done"

with ThreadPoolExecutor() as executor:
    future = executor.submit(slow_task)

    try:
        result = future.result(timeout=2)  # Wait max 2 seconds
    except TimeoutError:
        print("Task timed out!")
        future.cancel()  # Try to cancel (may not work if already running)
# NOTE: the timeout only limits how long result() waits. Leaving the `with`
# block shuts the executor down and waits for pending tasks, so the script
# still blocks here until slow_task's sleep finishes.

map() with Timeout

from concurrent.futures import ThreadPoolExecutor, TimeoutError

def slow_task(x):
    """Sleep for ``x`` seconds, then return ``x``."""
    import time
    time.sleep(x)
    return x

with ThreadPoolExecutor() as executor:
    try:
        # timeout is one deadline counted from the map() call itself,
        # not a separate limit for each item
        results = list(executor.map(slow_task, [1, 2, 3], timeout=2))
    except TimeoutError:
        print("Map operation timed out!")

Comparison: map() vs submit()

Feature map() submit()
Input Iterable Individual arguments
Returns Iterator of results Single Future
Order Preserves input order N/A
Error handling First exception stops iteration Per-task exception handling
Flexibility Less (same function) More (different functions)
Memory Lazy result iterator (by default all tasks are still submitted up front) One Future per submit() call

When to Use map()

# Same function applied to many inputs
# (`process` and `items` are placeholders; results come back in input order)
results = executor.map(process, items)

When to Use submit()

# Different callables (or per-task handling): submit each one and keep its Future
futures = [
    executor.submit(download, url),
    executor.submit(process, data),
    executor.submit(upload, result),
]

Practical Example

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time

def fetch_data(url):
    """I/O-bound: fetch from URL.

    Simulated with a 0.5 second sleep; returns a placeholder string that
    embeds ``url``.
    """
    time.sleep(0.5)  # Simulate network
    return f"Data from {url}"

def process_data(data):
    """CPU-bound: process data.

    Simulated with a 0.1 second sleep; returns a placeholder string that
    embeds ``data``.
    """
    time.sleep(0.1)  # Simulate computation
    return f"Processed: {data}"

urls = [f"https://api.example.com/data/{i}" for i in range(10)]

# ProcessPoolExecutor workers must be able to import this module. Under the
# "spawn" start method (the default on Windows and macOS) unguarded top-level
# pool code would be re-executed inside every worker, so the pipeline runs
# only when the file is executed as a script.
if __name__ == "__main__":
    # Stage 1: Fetch (I/O-bound) — use threads
    with ThreadPoolExecutor(max_workers=10) as executor:
        raw_data = list(executor.map(fetch_data, urls))
    print(f"Fetched {len(raw_data)} items")

    # Stage 2: Process (CPU-bound) — use processes
    with ProcessPoolExecutor() as executor:
        processed = list(executor.map(process_data, raw_data))
    print(f"Processed {len(processed)} items")

Key Takeaways

  • concurrent.futures provides a unified interface for threads and processes
  • Use a context manager (with) for automatic cleanup
  • submit() is for individual tasks and returns a Future
  • map() applies a function to an iterable and returns an iterator of results
  • The same code works with both ThreadPoolExecutor and ProcessPoolExecutor (with processes, the callable and its arguments must be picklable)
  • Switch between threads (I/O-bound) and processes (CPU-bound) easily
  • Use an initializer for per-worker setup (database connections, etc.)
  • Handle exceptions via future.result() or wrap the task body in try/except