ProcessPoolExecutor¶
ProcessPoolExecutor manages a pool of worker processes for parallel execution. Best suited for CPU-bound tasks that need to bypass the GIL.
Mental Model
A ProcessPoolExecutor is a team of independent workers, each with their own Python interpreter and GIL. Because they run in separate processes, they achieve true CPU parallelism -- but communication between them requires serialization (pickling). Use it when the computation per task is heavy enough to justify the overhead of copying data across process boundaries.
Basic Usage¶
```python from concurrent.futures import ProcessPoolExecutor import time
def compute_heavy(n): """CPU-intensive computation.""" return sum(i * i for i in range(n))
if name == "main": numbers = [10_000_000, 20_000_000, 30_000_000, 40_000_000]
# Sequential
start = time.perf_counter()
results = [compute_heavy(n) for n in numbers]
print(f"Sequential: {time.perf_counter() - start:.2f}s")
# Parallel with processes
start = time.perf_counter()
with ProcessPoolExecutor() as executor:
results = list(executor.map(compute_heavy, numbers))
print(f"Parallel: {time.perf_counter() - start:.2f}s")
```
Important: Always use if __name__ == "__main__": guard with ProcessPoolExecutor.
Creating ProcessPoolExecutor¶
```python from concurrent.futures import ProcessPoolExecutor import multiprocessing as mp
Default workers: os.cpu_count()¶
executor = ProcessPoolExecutor()
Explicit worker count¶
executor = ProcessPoolExecutor(max_workers=4)
With specific start method¶
executor = ProcessPoolExecutor( max_workers=4, mp_context=mp.get_context("spawn") )
With initializer¶
executor = ProcessPoolExecutor( max_workers=4, initializer=setup_function, initargs=(arg1, arg2) )
Restart workers periodically (Python 3.11+)¶
executor = ProcessPoolExecutor( max_workers=4, max_tasks_per_child=100 # Restart after 100 tasks ) ```
Using map()¶
```python from concurrent.futures import ProcessPoolExecutor
def square(x): return x ** 2
if name == "main": with ProcessPoolExecutor() as executor: results = list(executor.map(square, range(10))) print(results) # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] ```
map() with Chunksize¶
For large datasets, chunksize reduces IPC overhead:
```python from concurrent.futures import ProcessPoolExecutor
def process(x): return x ** 2
if name == "main": data = list(range(100_000))
with ProcessPoolExecutor() as executor:
# Without chunksize: many small transfers
results1 = list(executor.map(process, data))
# With chunksize: fewer, larger transfers
results2 = list(executor.map(process, data, chunksize=1000))
```
map() with Multiple Arguments¶
```python from concurrent.futures import ProcessPoolExecutor
def power(base, exp): return base ** exp
if name == "main": bases = [2, 3, 4, 5] exps = [10, 10, 10, 10]
with ProcessPoolExecutor() as executor:
results = list(executor.map(power, bases, exps))
print(results) # [1024, 59049, 1048576, 9765625]
```
Using submit()¶
```python from concurrent.futures import ProcessPoolExecutor
def compute(x): return x ** 2
if name == "main": with ProcessPoolExecutor() as executor: # Submit individual tasks future1 = executor.submit(compute, 10) future2 = executor.submit(compute, 20) future3 = executor.submit(compute, 30)
print(future1.result()) # 100
print(future2.result()) # 400
print(future3.result()) # 900
```
Processing Results as They Complete¶
```python from concurrent.futures import ProcessPoolExecutor, as_completed import random
def variable_computation(task_id): """Computation with variable duration.""" iterations = random.randint(1_000_000, 10_000_000) result = sum(i for i in range(iterations)) return (task_id, iterations, result)
if name == "main": with ProcessPoolExecutor() as executor: futures = {executor.submit(variable_computation, i): i for i in range(10)}
for future in as_completed(futures):
task_id = futures[future]
try:
tid, iters, result = future.result()
print(f"Task {tid}: {iters:,} iterations")
except Exception as e:
print(f"Task {task_id} failed: {e}")
```
Initializer for Worker Setup¶
```python from concurrent.futures import ProcessPoolExecutor import os
Global variable in each worker¶
model = None
def init_worker(model_path): """Load model once per worker process.""" global model model = load_model(model_path) print(f"Worker {os.getpid()}: Model loaded")
def predict(data): """Use pre-loaded model.""" return model.predict(data)
if name == "main": with ProcessPoolExecutor( max_workers=4, initializer=init_worker, initargs=("model.pkl",) ) as executor: predictions = list(executor.map(predict, test_data)) ```
Practical Examples¶
Parallel Number Crunching¶
```python from concurrent.futures import ProcessPoolExecutor import math
def is_prime(n): """Check if n is prime.""" if n < 2: return False if n == 2: return True if n % 2 == 0: return False for i in range(3, int(math.sqrt(n)) + 1, 2): if n % i == 0: return False return True
def count_primes(start, end): """Count primes in range.""" return sum(1 for n in range(start, end) if is_prime(n))
if name == "main": # Split range into chunks ranges = [ (0, 250_000), (250_000, 500_000), (500_000, 750_000), (750_000, 1_000_000), ]
with ProcessPoolExecutor() as executor:
counts = list(executor.starmap(count_primes, ranges))
print(f"Total primes: {sum(counts)}")
```
Parallel Image Processing¶
```python from concurrent.futures import ProcessPoolExecutor from pathlib import Path
from PIL import Image # Uncomment for real usage¶
def process_image(image_path): """Resize and convert image.""" # img = Image.open(image_path) # img = img.resize((256, 256)) # output_path = image_path.with_stem(image_path.stem + "_thumb") # img.save(output_path) return f"Processed {image_path}"
if name == "main": images = list(Path("images").glob("*.jpg"))
with ProcessPoolExecutor() as executor:
results = list(executor.map(process_image, images))
print(f"Processed {len(results)} images")
```
Parallel Data Processing¶
```python from concurrent.futures import ProcessPoolExecutor import pandas as pd
def process_chunk(chunk_data): """Process a chunk of data.""" # Heavy computation on chunk result = chunk_data.apply(expensive_transformation) return result.sum()
if name == "main": # Load and split data df = pd.read_csv("large_data.csv") chunks = np.array_split(df, 8) # Split into 8 chunks
with ProcessPoolExecutor(max_workers=8) as executor:
results = list(executor.map(process_chunk, chunks))
total = sum(results)
```
Monte Carlo Simulation¶
```python from concurrent.futures import ProcessPoolExecutor import random
def monte_carlo_pi(num_samples): """Estimate pi using Monte Carlo method.""" inside = 0 for _ in range(num_samples): x, y = random.random(), random.random() if xx + yy <= 1: inside += 1 return inside
if name == "main": num_workers = 8 samples_per_worker = 10_000_000
with ProcessPoolExecutor(max_workers=num_workers) as executor:
results = list(executor.map(
monte_carlo_pi,
[samples_per_worker] * num_workers
))
total_inside = sum(results)
total_samples = samples_per_worker * num_workers
pi_estimate = 4 * total_inside / total_samples
print(f"Pi estimate: {pi_estimate}")
```
Serialization Requirements¶
Objects passed to/from processes must be picklable:
```python from concurrent.futures import ProcessPoolExecutor
Works: regular functions, basic types¶
def square(x): return x ** 2
Fails: lambda functions¶
executor.map(lambda x: x**2, data) # PicklingError!¶
Fails: local classes¶
class Local: pass
executor.submit(func, Local()) # PicklingError!¶
Works: module-level classes¶
class ModuleLevel: pass
if name == "main": with ProcessPoolExecutor() as executor: results = list(executor.map(square, [1, 2, 3])) ```
Common Pickling Issues¶
```python
Problem: Lambda functions can't be pickled¶
bad = lambda x: x ** 2
Solution: Use regular function¶
def good(x): return x ** 2
Problem: Instance methods need care¶
class Processor: def process(self, x): return x ** 2
Solution: Use module-level function or staticmethod¶
def process_item(x): return x ** 2 ```
Error Handling¶
```python from concurrent.futures import ProcessPoolExecutor, as_completed
def risky_task(x): if x == 5: raise ValueError(f"Cannot process {x}") return x ** 2
if name == "main": with ProcessPoolExecutor() as executor: futures = {executor.submit(risky_task, i): i for i in range(10)}
for future in as_completed(futures):
task_id = futures[future]
try:
result = future.result()
print(f"Task {task_id}: {result}")
except Exception as e:
print(f"Task {task_id} failed: {e}")
```
Performance Considerations¶
Worker Count¶
```python import os
CPU-bound: match CPU cores¶
executor = ProcessPoolExecutor(max_workers=os.cpu_count())
Leave some CPUs free¶
executor = ProcessPoolExecutor(max_workers=max(1, os.cpu_count() - 2)) ```
Chunksize for map()¶
```python
Small data: default is fine¶
executor.map(func, small_list)
Large data: set chunksize¶
Rule of thumb: len(data) // (workers * 4)¶
executor.map(func, large_list, chunksize=1000) ```
Startup Overhead¶
Process creation is slow. Avoid for:
- Very short tasks
- Small datasets
- Tasks that run once
```python
Bad: Overhead dominates¶
with ProcessPoolExecutor() as executor: result = executor.submit(lambda: 1 + 1).result()
Good: Amortize overhead with batch processing¶
with ProcessPoolExecutor() as executor: results = list(executor.map(compute, large_dataset)) ```
Comparison with ThreadPoolExecutor¶
| Aspect | ProcessPoolExecutor | ThreadPoolExecutor |
|---|---|---|
| Best for | CPU-bound | I/O-bound |
| GIL | Bypassed | Affected |
| Memory | Isolated (copies) | Shared |
| Overhead | Higher | Lower |
| Serialization | Required (pickle) | Not required |
| Startup | Slower | Faster |
| Max workers | ~CPU count | 10-50+ |
Key Takeaways¶
- Use
ProcessPoolExecutorfor CPU-bound tasks - Always use
if __name__ == "__main__":guard - Match worker count to CPU cores
- Use
chunksizefor large datasets to reduce overhead - Objects must be picklable (no lambdas, local classes)
- Use
initializerfor expensive per-worker setup - Process overhead is higher than threads — batch work
- Use context manager for automatic cleanup
- For I/O-bound tasks, use
ThreadPoolExecutorinstead
Exercises¶
Exercise 1.
Use ProcessPoolExecutor.map() with chunksize=100 to compute the sum of cubes for each number in range(10_000). Compare the elapsed time with chunksize=1 and chunksize=100. Print both times and the speedup.
Solution to Exercise 1
```python
import time
from concurrent.futures import ProcessPoolExecutor
def sum_cubes(n):
return sum(i ** 3 for i in range(n))
if __name__ == "__main__":
data = list(range(10_000))
for cs in [1, 100]:
start = time.perf_counter()
with ProcessPoolExecutor() as ex:
list(ex.map(sum_cubes, data, chunksize=cs))
elapsed = time.perf_counter() - start
print(f"chunksize={cs:3d}: {elapsed:.2f}s")
```
Exercise 2.
Write a Monte Carlo estimation of pi using ProcessPoolExecutor. Each of 8 workers generates 1,000,000 random (x, y) points and counts how many fall inside the unit circle. Combine results to estimate pi and print the estimate with the elapsed time.
Solution to Exercise 2
```python
import time
import random
from concurrent.futures import ProcessPoolExecutor
def monte_carlo(n_samples):
inside = 0
for _ in range(n_samples):
x, y = random.random(), random.random()
if x * x + y * y <= 1:
inside += 1
return inside
if __name__ == "__main__":
workers = 8
samples = 1_000_000
start = time.perf_counter()
with ProcessPoolExecutor(max_workers=workers) as ex:
counts = list(ex.map(monte_carlo, [samples] * workers))
elapsed = time.perf_counter() - start
pi = 4 * sum(counts) / (workers * samples)
print(f"Pi estimate: {pi:.6f} (elapsed {elapsed:.2f}s)")
```
Exercise 3.
Use ProcessPoolExecutor with submit() and as_completed() to factorize 10 large numbers. Map each future back to its input using a dictionary. Print results in completion order, showing the input number and its smallest factor (or "prime" if none found below its square root).
Solution to Exercise 3
```python
import math
from concurrent.futures import ProcessPoolExecutor, as_completed
def smallest_factor(n):
if n < 2:
return (n, None)
for i in range(2, int(math.sqrt(n)) + 1):
if n % i == 0:
return (n, i)
return (n, None)
if __name__ == "__main__":
numbers = [
104729, 1000003, 999983, 7919 * 7919,
1299827, 15485863, 49979687, 67867979,
104395303, 982451653,
]
with ProcessPoolExecutor() as ex:
futs = {ex.submit(smallest_factor, n): n for n in numbers}
for f in as_completed(futs):
n, factor = f.result()
if factor:
print(f"{n}: smallest factor = {factor}")
else:
print(f"{n}: prime")
```