ProcessPoolExecutor¶
ProcessPoolExecutor manages a pool of worker processes for parallel execution. Best suited for CPU-bound tasks that need to bypass the GIL.
Basic Usage¶
from concurrent.futures import ProcessPoolExecutor
import time


def compute_heavy(n):
    """CPU-intensive computation: sum of i*i for i in [0, n)."""
    return sum(i * i for i in range(n))


if __name__ == "__main__":
    numbers = [10_000_000, 20_000_000, 30_000_000, 40_000_000]

    # Sequential baseline
    start = time.perf_counter()
    results = [compute_heavy(n) for n in numbers]
    print(f"Sequential: {time.perf_counter() - start:.2f}s")

    # Parallel with processes: map fans the inputs out across workers
    start = time.perf_counter()
    with ProcessPoolExecutor() as executor:
        results = list(executor.map(compute_heavy, numbers))
    print(f"Parallel: {time.perf_counter() - start:.2f}s")
Important: Always use if __name__ == "__main__": guard with ProcessPoolExecutor.
Creating ProcessPoolExecutor¶
from concurrent.futures import ProcessPoolExecutor
import multiprocessing as mp

# NOTE(review): illustrative fragments — `setup_function`, `arg1`, and `arg2`
# must be defined elsewhere, and real code should shut these executors down
# (or use them as context managers).

# Default workers: os.cpu_count()
executor = ProcessPoolExecutor()

# Explicit worker count
executor = ProcessPoolExecutor(max_workers=4)

# With a specific start method ("spawn" is the default on Windows and macOS)
executor = ProcessPoolExecutor(
    max_workers=4,
    mp_context=mp.get_context("spawn"),
)

# With an initializer: runs once in each worker process before any task
executor = ProcessPoolExecutor(
    max_workers=4,
    initializer=setup_function,
    initargs=(arg1, arg2),
)

# Restart workers periodically (Python 3.11+)
executor = ProcessPoolExecutor(
    max_workers=4,
    max_tasks_per_child=100,  # Restart each worker after 100 tasks
)
Using map()¶
from concurrent.futures import ProcessPoolExecutor


def square(x):
    """Return x squared."""
    return x ** 2


if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        # map preserves input order regardless of completion order
        results = list(executor.map(square, range(10)))
    print(results)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
map() with Chunksize¶
For large datasets, chunksize reduces IPC overhead:
from concurrent.futures import ProcessPoolExecutor


def process(x):
    """Return x squared."""
    return x ** 2


if __name__ == "__main__":
    data = list(range(100_000))
    with ProcessPoolExecutor() as executor:
        # Without chunksize: many small transfers (one IPC round-trip per item)
        results1 = list(executor.map(process, data))
        # With chunksize: fewer, larger transfers
        results2 = list(executor.map(process, data, chunksize=1000))
map() with Multiple Arguments¶
from concurrent.futures import ProcessPoolExecutor


def power(base, exp):
    """Return base raised to exp."""
    return base ** exp


if __name__ == "__main__":
    bases = [2, 3, 4, 5]
    exps = [10, 10, 10, 10]
    with ProcessPoolExecutor() as executor:
        # map zips the iterables: power(bases[0], exps[0]), ...
        results = list(executor.map(power, bases, exps))
    print(results)  # [1024, 59049, 1048576, 9765625]
Using submit()¶
from concurrent.futures import ProcessPoolExecutor


def compute(x):
    """Return x squared."""
    return x ** 2


if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        # Submit individual tasks; each returns a Future immediately
        future1 = executor.submit(compute, 10)
        future2 = executor.submit(compute, 20)
        future3 = executor.submit(compute, 30)

        # result() blocks until the corresponding task finishes
        print(future1.result())  # 100
        print(future2.result())  # 400
        print(future3.result())  # 900
Processing Results as They Complete¶
from concurrent.futures import ProcessPoolExecutor, as_completed
import random


def variable_computation(task_id):
    """Computation with variable (random) duration.

    Returns (task_id, iterations, result) where result is the sum
    0 + 1 + ... + iterations - 1.
    """
    iterations = random.randint(1_000_000, 10_000_000)
    result = sum(i for i in range(iterations))
    return (task_id, iterations, result)


if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        # Map each future back to the task id that produced it
        futures = {executor.submit(variable_computation, i): i for i in range(10)}
        for future in as_completed(futures):
            task_id = futures[future]
            try:
                tid, iters, result = future.result()
                print(f"Task {tid}: {iters:,} iterations")
            except Exception as e:
                # Worker exceptions are re-raised here in the parent
                print(f"Task {task_id} failed: {e}")
Initializer for Worker Setup¶
from concurrent.futures import ProcessPoolExecutor
import os

# Global in each worker process; populated by init_worker before tasks run
model = None


def init_worker(model_path):
    """Load the model once per worker process."""
    global model
    # NOTE(review): `load_model` must be defined/imported elsewhere
    model = load_model(model_path)
    print(f"Worker {os.getpid()}: Model loaded")


def predict(data):
    """Use the pre-loaded per-process model."""
    return model.predict(data)


if __name__ == "__main__":
    with ProcessPoolExecutor(
        max_workers=4,
        initializer=init_worker,
        initargs=("model.pkl",),
    ) as executor:
        # NOTE(review): `test_data` must be defined elsewhere
        predictions = list(executor.map(predict, test_data))
Practical Examples¶
Parallel Number Crunching¶
from concurrent.futures import ProcessPoolExecutor
import math


def is_prime(n):
    """Check if n is prime by trial division up to sqrt(n)."""
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    # math.isqrt avoids float rounding for very large n
    for i in range(3, math.isqrt(n) + 1, 2):
        if n % i == 0:
            return False
    return True


def count_primes(start, end):
    """Count primes in the half-open range [start, end)."""
    return sum(1 for n in range(start, end) if is_prime(n))


if __name__ == "__main__":
    # Split the full range into one chunk per task
    ranges = [
        (0, 250_000),
        (250_000, 500_000),
        (500_000, 750_000),
        (750_000, 1_000_000),
    ]
    with ProcessPoolExecutor() as executor:
        # concurrent.futures executors have no starmap(); unzip the
        # (start, end) pairs into two parallel argument iterables for map()
        starts, ends = zip(*ranges)
        counts = list(executor.map(count_primes, starts, ends))
    print(f"Total primes: {sum(counts)}")
Parallel Image Processing¶
from concurrent.futures import ProcessPoolExecutor
from pathlib import Path
# from PIL import Image  # Uncomment for real usage


def process_image(image_path):
    """Resize and convert one image (PIL calls left commented out)."""
    # img = Image.open(image_path)
    # img = img.resize((256, 256))
    # output_path = image_path.with_stem(image_path.stem + "_thumb")
    # img.save(output_path)
    return f"Processed {image_path}"


if __name__ == "__main__":
    images = list(Path("images").glob("*.jpg"))
    with ProcessPoolExecutor() as executor:
        results = list(executor.map(process_image, images))
    print(f"Processed {len(results)} images")
Parallel Data Processing¶
from concurrent.futures import ProcessPoolExecutor

import numpy as np  # required below for np.array_split
import pandas as pd


def process_chunk(chunk_data):
    """Process one DataFrame chunk: apply the transformation and sum it."""
    # NOTE(review): `expensive_transformation` must be a module-level
    # function defined elsewhere so it can be pickled to the workers.
    result = chunk_data.apply(expensive_transformation)
    return result.sum()


if __name__ == "__main__":
    # Load and split data into one chunk per worker
    df = pd.read_csv("large_data.csv")
    chunks = np.array_split(df, 8)  # Split into 8 chunks
    with ProcessPoolExecutor(max_workers=8) as executor:
        results = list(executor.map(process_chunk, chunks))
    total = sum(results)
Monte Carlo Simulation¶
from concurrent.futures import ProcessPoolExecutor
import random


def monte_carlo_pi(num_samples):
    """Count random points in the unit square that fall inside the quarter circle."""
    inside = 0
    for _ in range(num_samples):
        x, y = random.random(), random.random()
        if x * x + y * y <= 1:
            inside += 1
    return inside


if __name__ == "__main__":
    num_workers = 8
    samples_per_worker = 10_000_000
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        # Same sample count per worker; each worker has its own RNG state
        results = list(executor.map(
            monte_carlo_pi,
            [samples_per_worker] * num_workers,
        ))
    total_inside = sum(results)
    total_samples = samples_per_worker * num_workers
    # P(inside quarter circle) = pi/4, so pi ≈ 4 * inside / total
    pi_estimate = 4 * total_inside / total_samples
    print(f"Pi estimate: {pi_estimate}")
Serialization Requirements¶
Objects passed to/from processes must be picklable:
from concurrent.futures import ProcessPoolExecutor


# Works: regular functions, basic types
def square(x):
    return x ** 2


# Fails: lambda functions
# executor.map(lambda x: x**2, data)  # PicklingError!


# Fails: classes defined inside a function body (shown at module level
# here only so this snippet parses — a truly local class is unpicklable)
class Local:
    pass
# executor.submit(func, Local())  # PicklingError!


# Works: module-level classes
class ModuleLevel:
    pass


if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        results = list(executor.map(square, [1, 2, 3]))
Common Pickling Issues¶
# Problem: Lambda functions can't be pickled
bad = lambda x: x ** 2


# Solution: Use regular function
def good(x):
    return x ** 2


# Problem: Instance methods need care (a bound method drags `self` along,
# so the instance itself must be picklable too)
class Processor:
    def process(self, x):
        return x ** 2


# Solution: Use module-level function or staticmethod
def process_item(x):
    return x ** 2
Error Handling¶
from concurrent.futures import ProcessPoolExecutor, as_completed


def risky_task(x):
    """Square x; raises ValueError for the poison value 5."""
    if x == 5:
        raise ValueError(f"Cannot process {x}")
    return x ** 2


if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        futures = {executor.submit(risky_task, i): i for i in range(10)}
        for future in as_completed(futures):
            task_id = futures[future]
            try:
                result = future.result()
                print(f"Task {task_id}: {result}")
            except Exception as e:
                # The worker's exception is re-raised here in the parent
                print(f"Task {task_id} failed: {e}")
Performance Considerations¶
Worker Count¶
import os

# NOTE(review): fragment — assumes ProcessPoolExecutor is already imported
# CPU-bound: match CPU cores
executor = ProcessPoolExecutor(max_workers=os.cpu_count())
# Leave some CPUs free for the OS / other processes (never below 1 worker)
executor = ProcessPoolExecutor(max_workers=max(1, os.cpu_count() - 2))
Chunksize for map()¶
# NOTE(review): fragment — `executor`, `func`, and the lists are placeholders
# Small data: the default chunksize (1) is fine
executor.map(func, small_list)
# Large data: set chunksize to batch many items per IPC round-trip
# Rule of thumb: len(data) // (workers * 4)
executor.map(func, large_list, chunksize=1000)
Startup Overhead¶
Process creation is slow. Avoid ProcessPoolExecutor for very short tasks, small datasets, or work that runs only once — the startup cost will dominate.
# Bad: Overhead dominates — and note the lambda can't be pickled by
# ProcessPoolExecutor anyway, so this would also raise at runtime
with ProcessPoolExecutor() as executor:
    result = executor.submit(lambda: 1 + 1).result()

# Good: Amortize overhead with batch processing
# NOTE(review): `compute` and `large_dataset` are placeholders
with ProcessPoolExecutor() as executor:
    results = list(executor.map(compute, large_dataset))
Comparison with ThreadPoolExecutor¶
| Aspect | ProcessPoolExecutor | ThreadPoolExecutor |
|---|---|---|
| Best for | CPU-bound | I/O-bound |
| GIL | Bypassed | Affected |
| Memory | Isolated (copies) | Shared |
| Overhead | Higher | Lower |
| Serialization | Required (pickle) | Not required |
| Startup | Slower | Faster |
| Max workers | ~CPU count | 10-50+ |
Key Takeaways¶
- Use ProcessPoolExecutor for CPU-bound tasks
- Always use the if __name__ == "__main__": guard
- Match worker count to CPU cores
- Use chunksize for large datasets to reduce overhead
- Objects must be picklable (no lambdas, local classes)
- Use initializer for expensive per-worker setup
- Process overhead is higher than threads — batch work
- Use a context manager for automatic cleanup
- For I/O-bound tasks, use ThreadPoolExecutor instead