Process Pool¶
A Pool manages a collection of worker processes, providing a simple way to parallelize function execution across multiple inputs.
Mental Model
A Pool is a batch-processing factory: you feed it a function and a list of inputs, and it distributes the items across worker processes automatically. Think map() but parallel -- each worker grabs the next item, computes the result, and returns it. The pool handles process creation, work distribution, and result collection so you can focus on the computation itself.
Creating a Pool¶
Basic Pool Usage¶
```python from multiprocessing import Pool import os
def square(x): print(f"Process {os.getpid()}: computing {x}²") return x ** 2
if name == "main": # Create pool with 4 worker processes with Pool(4) as pool: results = pool.map(square, [1, 2, 3, 4, 5])
print(results) # [1, 4, 9, 16, 25]
```
Default Worker Count¶
```python from multiprocessing import Pool, cpu_count
Default: uses cpu_count() workers¶
with Pool() as pool: print(f"Workers: {pool._processes}") # Number of CPUs
Explicit count¶
with Pool(processes=8) as pool: pass ```
Pool Methods¶
map() — Parallel Map¶
Apply function to each item in iterable, return results in order:
```python from multiprocessing import Pool import time
def slow_square(x): time.sleep(0.5) return x ** 2
if name == "main": numbers = list(range(10))
# Sequential
start = time.perf_counter()
results = list(map(slow_square, numbers))
print(f"Sequential: {time.perf_counter() - start:.2f}s")
# Parallel
start = time.perf_counter()
with Pool(4) as pool:
results = pool.map(slow_square, numbers)
print(f"Parallel: {time.perf_counter() - start:.2f}s")
print(results)
```
map() with Chunksize¶
```python from multiprocessing import Pool
def process(x): return x * 2
if name == "main": data = list(range(1000))
with Pool(4) as pool:
# Default: pool divides work automatically
results = pool.map(process, data)
# Chunksize: send N items to each worker at a time
# Larger chunks = less IPC overhead, but less load balancing
results = pool.map(process, data, chunksize=100)
```
starmap() — Multiple Arguments¶
```python from multiprocessing import Pool
def power(base, exp): return base ** exp
if name == "main": # starmap unpacks argument tuples args = [(2, 3), (3, 4), (4, 5), (5, 6)]
with Pool(4) as pool:
results = pool.starmap(power, args)
print(results) # [8, 81, 1024, 15625]
```
imap() — Lazy Iterator¶
Returns results as they complete (memory efficient for large datasets):
```python from multiprocessing import Pool import time
def slow_process(x): time.sleep(0.5) return x ** 2
if name == "main": with Pool(4) as pool: # imap returns iterator, not list results = pool.imap(slow_process, range(10))
# Process results as they become available
for result in results:
print(f"Got result: {result}")
```
imap_unordered() — Fastest Results First¶
Returns results in completion order (not input order):
```python from multiprocessing import Pool import time import random
def variable_task(x): delay = random.uniform(0.1, 1.0) time.sleep(delay) return (x, delay)
if name == "main": with Pool(4) as pool: # Results come in completion order for result in pool.imap_unordered(variable_task, range(10)): print(f"Completed: {result}") ```
apply() — Single Function Call¶
```python from multiprocessing import Pool
def compute(x, y, z): return x + y + z
if name == "main": with Pool(4) as pool: # apply() blocks until result ready result = pool.apply(compute, args=(1, 2, 3)) print(result) # 6 ```
apply_async() — Asynchronous Single Call¶
```python from multiprocessing import Pool import time
def slow_compute(x): time.sleep(1) return x ** 2
if name == "main": with Pool(4) as pool: # Submit task, get AsyncResult immediately async_result = pool.apply_async(slow_compute, args=(10,))
print("Doing other work...")
time.sleep(0.5)
# Get result (blocks if not ready)
result = async_result.get()
print(f"Result: {result}")
```
map_async() — Asynchronous Map¶
```python from multiprocessing import Pool import time
def process(x): time.sleep(0.5) return x ** 2
if name == "main": with Pool(4) as pool: # Submit all tasks, get AsyncResult async_result = pool.map_async(process, range(10))
print("Tasks submitted, doing other work...")
# Check if ready
while not async_result.ready():
print("Still working...")
time.sleep(0.3)
# Get all results
results = async_result.get()
print(results)
```
AsyncResult Object¶
Methods available on AsyncResult returned by async methods:
```python from multiprocessing import Pool import time
def slow_task(x): time.sleep(2) return x ** 2
if name == "main": with Pool(4) as pool: async_result = pool.apply_async(slow_task, args=(10,))
# Check if complete
print(f"Ready: {async_result.ready()}") # False
# Wait with timeout
async_result.wait(timeout=1)
print(f"Ready after wait: {async_result.ready()}") # Still False
# Check if successful (blocks until complete)
# print(f"Successful: {async_result.successful()}")
# Get result (blocks until complete)
result = async_result.get(timeout=5)
print(f"Result: {result}")
```
Error Handling in Pools¶
Handling Exceptions¶
```python from multiprocessing import Pool
def risky_task(x): if x == 3: raise ValueError(f"Cannot process {x}") return x ** 2
if name == "main": with Pool(4) as pool: try: # Exception raised when iterating results results = pool.map(risky_task, [1, 2, 3, 4, 5]) except ValueError as e: print(f"Error: {e}") ```
Handling Exceptions with imap¶
```python from multiprocessing import Pool
def risky_task(x): if x == 3: raise ValueError(f"Cannot process {x}") return x ** 2
if name == "main": with Pool(4) as pool: results = pool.imap(risky_task, range(5))
for i in range(5):
try:
result = next(results)
print(f"Result: {result}")
except ValueError as e:
print(f"Error: {e}")
except StopIteration:
break
```
Error Callback with apply_async¶
```python from multiprocessing import Pool
def task(x): if x < 0: raise ValueError("Negative!") return x ** 2
def on_success(result): print(f"Success: {result}")
def on_error(error): print(f"Error: {error}")
if name == "main": with Pool(4) as pool: # Success case pool.apply_async(task, args=(5,), callback=on_success, error_callback=on_error)
# Error case
pool.apply_async(task, args=(-1,), callback=on_success, error_callback=on_error)
pool.close()
pool.join()
```
Pool Lifecycle¶
Manual Management¶
```python from multiprocessing import Pool
def task(x): return x ** 2
if name == "main": pool = Pool(4)
try:
results = pool.map(task, range(10))
print(results)
finally:
pool.close() # No more tasks accepted
pool.join() # Wait for workers to finish
```
Context Manager (Recommended)¶
```python from multiprocessing import Pool
def task(x): return x ** 2
if name == "main": with Pool(4) as pool: results = pool.map(task, range(10)) print(results) # Pool automatically closed and joined ```
terminate() vs close()¶
```python from multiprocessing import Pool
def task(x): return x ** 2
if name == "main": pool = Pool(4)
# close(): No new tasks, wait for existing to finish
pool.close()
pool.join()
# terminate(): Kill workers immediately
pool2 = Pool(4)
pool2.terminate() # Don't wait for tasks
pool2.join()
```
Initializer Functions¶
Run setup code in each worker process:
```python from multiprocessing import Pool import os
Global variable in worker processes¶
worker_data = None
def init_worker(shared_data): """Called once when each worker starts.""" global worker_data worker_data = shared_data print(f"Worker {os.getpid()} initialized with {shared_data}")
def process_item(x): """Use initialized data.""" return x * worker_data
if name == "main": # Pass initializer and its arguments with Pool(4, initializer=init_worker, initargs=(10,)) as pool: results = pool.map(process_item, [1, 2, 3, 4, 5])
print(results) # [10, 20, 30, 40, 50]
```
Database Connection per Worker¶
```python from multiprocessing import Pool import os
Worker-local database connection¶
db_connection = None
def init_db(): """Initialize database connection for this worker.""" global db_connection db_connection = create_database_connection() print(f"Worker {os.getpid()}: DB connected")
def query(sql): """Use worker's database connection.""" return db_connection.execute(sql)
if name == "main": with Pool(4, initializer=init_db) as pool: queries = ["SELECT * FROM users", "SELECT * FROM orders"] results = pool.map(query, queries) ```
Practical Examples¶
Parallel File Processing¶
```python from multiprocessing import Pool from pathlib import Path
def process_file(filepath): """Process a single file.""" content = Path(filepath).read_text() word_count = len(content.split()) return (filepath, word_count)
if name == "main": files = list(Path(".").glob("*.txt"))
with Pool() as pool:
results = pool.map(process_file, files)
for filepath, count in results:
print(f"{filepath}: {count} words")
```
Parallel Image Processing¶
```python from multiprocessing import Pool from pathlib import Path
from PIL import Image # Uncomment if using PIL¶
def resize_image(args): """Resize a single image.""" input_path, output_path, size = args # img = Image.open(input_path) # img = img.resize(size) # img.save(output_path) return f"Resized {input_path}"
if name == "main": images = [ ("img1.jpg", "out1.jpg", (100, 100)), ("img2.jpg", "out2.jpg", (100, 100)), ("img3.jpg", "out3.jpg", (100, 100)), ]
with Pool() as pool:
results = pool.map(resize_image, images)
print(results)
```
Parallel Web Scraping (I/O + CPU)¶
```python from multiprocessing import Pool import time
def fetch_and_parse(url): """Fetch URL and parse content.""" # response = requests.get(url) # soup = BeautifulSoup(response.text, 'html.parser') # return len(soup.find_all('a')) time.sleep(0.5) # Simulate return f"Parsed {url}"
if name == "main": urls = [f"https://example.com/page{i}" for i in range(20)]
with Pool(8) as pool:
results = pool.map(fetch_and_parse, urls)
print(results)
```
Pool Method Summary¶
| Method | Blocking | Ordered | Use Case |
|---|---|---|---|
map() |
Yes | Yes | Simple parallel map |
starmap() |
Yes | Yes | Multiple arguments |
imap() |
Iterator | Yes | Memory efficient |
imap_unordered() |
Iterator | No | Fastest results first |
apply() |
Yes | N/A | Single function call |
apply_async() |
No | N/A | Async single call |
map_async() |
No | Yes | Async parallel map |
Key Takeaways¶
- Use
Poolfor simple parallel function application map()is the most common method — parallel version of built-inmap()starmap()for functions with multiple argumentsimap_unordered()for best performance when order doesn't matter- Use context manager (
with Pool() as pool:) for automatic cleanup initializersets up per-worker resources (database connections, etc.)- Match pool size to CPU count for CPU-bound tasks
- Use
chunksizeto tune performance for large datasets
Runnable Example: pool_tutorial.py¶
```python """ Topic 45.4 - Process Pools with multiprocessing.Pool
Complete guide to using process pools for efficient parallel task execution. Pools manage a set of worker processes and distribute work automatically.
Learning Objectives: - Create and use process pools - Use map, imap, starmap for parallel execution - Handle async operations with apply_async and map_async - Manage pool lifecycle - Error handling in pools - Performance optimization
Author: Python Educator Date: 2024 """
import multiprocessing from multiprocessing import Pool, cpu_count import time import random import math
============================================================================¶
PART 1: BEGINNER - Pool Basics and map()¶
============================================================================¶
def basic_pool_usage(): """ The simplest way to use a pool: map() a function over data. This is like built-in map() but runs in parallel across processes. """ print("=" * 70) print("BEGINNER: Basic Pool with map()") print("=" * 70)
def square(x):
"""
Calculate square of a number.
Args:
x: Number to square
Returns:
Square of x
"""
# Add delay to simulate real work
time.sleep(0.1)
return x ** 2
# Input data
numbers = list(range(10))
print(f"\n📝 Input: {numbers}")
print(f" We want to square each number in parallel\n")
# Method 1: Sequential (for comparison)
print("⏱️ Sequential execution:")
start = time.time()
results_sequential = [square(x) for x in numbers]
seq_time = time.time() - start
print(f" Time: {seq_time:.2f}s")
print(f" Results: {results_sequential}")
# Method 2: Parallel with Pool
print("\n⏱️ Parallel execution with Pool:")
start = time.time()
# Create a pool with 4 worker processes
with Pool(processes=4) as pool:
# Map the function over the data
results_parallel = pool.map(square, numbers)
parallel_time = time.time() - start
print(f" Time: {parallel_time:.2f}s")
print(f" Results: {results_parallel}")
# Analysis
print(f"\n📊 Speedup: {seq_time/parallel_time:.2f}x faster!")
print(f" 4 workers can process multiple items simultaneously")
print("\n💡 Pool.map() advantages:")
print(" ✓ Automatic work distribution")
print(" ✓ Process reuse (no startup overhead)")
print(" ✓ Simple API (like built-in map)")
print(" ✓ Handles all the complexity for you")
print("\n" + "=" * 70 + "\n")
def pool_with_different_sizes(): """ Experiment with different pool sizes to find optimal performance. """ print("=" * 70) print("BEGINNER: Choosing Pool Size") print("=" * 70)
def cpu_task(x):
"""CPU-intensive task"""
# Calculate factorial (CPU work)
result = math.factorial(x % 15 + 5)
time.sleep(0.05) # Small delay
return result % 1000
numbers = list(range(40))
print(f"\n🖥️ Your system has {cpu_count()} CPU cores")
print(f" Testing different pool sizes on {len(numbers)} tasks:\n")
# Test different pool sizes
for pool_size in [1, 2, 4, cpu_count(), cpu_count() * 2]:
start = time.time()
with Pool(processes=pool_size) as pool:
results = pool.map(cpu_task, numbers)
elapsed = time.time() - start
print(f" {pool_size:2d} processes: {elapsed:.3f}s")
print("\n💡 Guidelines for pool size:")
print(" • CPU-bound: pool_size = cpu_count()")
print(" • I/O-bound: pool_size = cpu_count() * 2 or more")
print(" • Mixed: Start with cpu_count() and experiment")
print(" • Too many processes = overhead from context switching")
print("\n" + "=" * 70 + "\n")
def pool_context_manager(): """ Demonstrate proper pool lifecycle management with context manager. """ print("=" * 70) print("BEGINNER: Pool Lifecycle Management") print("=" * 70)
def worker(x):
"""Simple worker function"""
return x * 2
print("\n📝 Recommended: Use context manager (with statement)")
print(" with Pool(4) as pool:")
print(" results = pool.map(worker, data)")
print(" # Pool automatically closed and terminated")
# Good practice: context manager
with Pool(4) as pool:
results = pool.map(worker, range(10))
print(f"\n✓ Results: {results}")
print("✓ Pool automatically cleaned up\n")
print("📝 Alternative: Manual management")
print(" pool = Pool(4)")
print(" results = pool.map(worker, data)")
print(" pool.close() # No more tasks accepted")
print(" pool.join() # Wait for workers to finish")
# Manual management (less preferred)
pool = Pool(4)
results = pool.map(worker, range(10, 20))
print(f"\n✓ Results: {results}")
pool.close() # Stop accepting new tasks
pool.join() # Wait for completion
print("✓ Pool manually cleaned up")
print("\n💡 Best Practice:")
print(" Always use 'with Pool() as pool:' for automatic cleanup")
print("\n" + "=" * 70 + "\n")
============================================================================¶
PART 2: INTERMEDIATE - Advanced Mapping Functions¶
============================================================================¶
def pool_starmap_multiple_arguments(): """ Use starmap() when your function takes multiple arguments. starmap unpacks argument tuples for you. """ print("=" * 70) print("INTERMEDIATE: starmap() for Multiple Arguments") print("=" * 70)
def calculate_power(base, exponent):
"""
Calculate base raised to exponent.
Args:
base: Base number
exponent: Exponent
Returns:
base ** exponent
"""
time.sleep(0.1)
return base ** exponent
# Data: list of (base, exponent) tuples
tasks = [
(2, 3), # 2^3 = 8
(3, 4), # 3^4 = 81
(5, 2), # 5^2 = 25
(10, 3), # 10^3 = 1000
(7, 2), # 7^2 = 49
]
print(f"\n📝 Tasks: {tasks}")
print(" Each tuple is (base, exponent)\n")
# Use starmap to unpack tuples
with Pool(3) as pool:
results = pool.starmap(calculate_power, tasks)
print("📊 Results:")
for (base, exp), result in zip(tasks, results):
print(f" {base}^{exp} = {result}")
print("\n💡 starmap vs map:")
print(" map(f, [x1, x2]) → f(x1), f(x2)")
print(" starmap(f, [(a,b)]) → f(a, b) # unpacks tuple")
print("\n" + "=" * 70 + "\n")
def pool_imap_lazy_iteration(): """ Use imap() for lazy iteration over results. Unlike map(), imap() returns results as they complete. """ print("=" * 70) print("INTERMEDIATE: imap() for Lazy Results") print("=" * 70)
def slow_square(x):
"""Square with variable delay"""
delay = random.uniform(0.5, 1.5)
time.sleep(delay)
return x ** 2, delay
numbers = list(range(8))
print(f"\n📝 Processing {len(numbers)} items with variable delays\n")
# Method 1: map() - waits for ALL results
print("⏱️ Using map() (blocks until all complete):")
start = time.time()
with Pool(4) as pool:
results = pool.map(slow_square, numbers)
elapsed = time.time() - start
print(f" Got all results after {elapsed:.2f}s")
print(f" Results: {[r[0] for r in results]}")
# Method 2: imap() - yields results as they arrive
print("\n⏱️ Using imap() (yields results incrementally):")
start = time.time()
with Pool(4) as pool:
# imap returns an iterator
for i, (result, delay) in enumerate(pool.imap(slow_square, numbers)):
elapsed = time.time() - start
print(f" [{elapsed:.2f}s] Got result #{i}: {result} (took {delay:.2f}s)")
print("\n💡 When to use imap():")
print(" ✓ Process results as they complete")
print(" ✓ Show progress updates")
print(" ✓ Lower memory usage (streaming)")
print(" ✓ Start processing early results while others compute")
print("\n" + "=" * 70 + "\n")
def pool_imap_unordered(): """ Use imap_unordered() when result order doesn't matter. This can be faster as it returns results immediately. """ print("=" * 70) print("INTERMEDIATE: imap_unordered() for Faster Results") print("=" * 70)
def process_item(x):
"""Process with random delay"""
delay = random.uniform(0.1, 1.0)
time.sleep(delay)
return x, delay
numbers = list(range(12))
print(f"\n📝 Processing {len(numbers)} items\n")
# Ordered iteration
print("⏱️ imap() - Maintains order:")
with Pool(4) as pool:
start = time.time()
for i, (num, delay) in enumerate(pool.imap(process_item, numbers)):
elapsed = time.time() - start
print(f" [{elapsed:.2f}s] Position {i}: item {num}")
# Unordered iteration
print("\n⏱️ imap_unordered() - Returns as completed:")
with Pool(4) as pool:
start = time.time()
for i, (num, delay) in enumerate(pool.imap_unordered(process_item, numbers)):
elapsed = time.time() - start
print(f" [{elapsed:.2f}s] Completed #{i}: item {num}")
print("\n💡 imap_unordered() advantages:")
print(" ✓ Faster - returns results immediately")
print(" ✓ No waiting for slow tasks to maintain order")
print(" ✓ Good for independent tasks")
print("\n" + "=" * 70 + "\n")
============================================================================¶
PART 3: ADVANCED - Async Operations and Error Handling¶
============================================================================¶
def pool_apply_async(): """ Use apply_async() for single tasks with callbacks. More flexible than map, but requires manual result handling. """ print("=" * 70) print("ADVANCED: apply_async() with Callbacks") print("=" * 70)
def compute_factorial(n):
"""Compute factorial of n"""
time.sleep(0.5)
result = math.factorial(n)
return n, result
def success_callback(result):
"""Called when task completes successfully"""
n, factorial = result
print(f" ✓ Success: {n}! = {factorial}")
def error_callback(error):
"""Called when task raises exception"""
print(f" ✗ Error: {error}")
print("\n⚙️ Submitting async tasks with callbacks:\n")
with Pool(3) as pool:
# Submit multiple async tasks
async_results = []
for n in [5, 10, 15, 20]:
result = pool.apply_async(
compute_factorial,
args=(n,),
callback=success_callback,
error_callback=error_callback
)
async_results.append(result)
# Wait for all tasks
pool.close()
pool.join()
print("\n💡 apply_async() features:")
print(" ✓ Submit individual tasks")
print(" ✓ Callbacks for success/error")
print(" ✓ Non-blocking submission")
print(" ✓ Flexible task management")
print("\n" + "=" * 70 + "\n")
def pool_map_async_with_progress(): """ Use map_async() for non-blocking batch operations with progress tracking. """ print("=" * 70) print("ADVANCED: map_async() with Progress Tracking") print("=" * 70)
def heavy_computation(x):
"""CPU-intensive task"""
time.sleep(0.3)
return x ** 3
numbers = list(range(20))
print(f"\n⚙️ Processing {len(numbers)} items asynchronously...\n")
with Pool(4) as pool:
# Submit all tasks at once (non-blocking)
result = pool.map_async(heavy_computation, numbers)
# Do other work while tasks execute
while not result.ready():
remaining = result._number_left
print(f" Tasks remaining: {remaining}")
time.sleep(0.5)
# Get final results (will wait if not ready)
final_results = result.get()
print(f"\n✓ All tasks completed!")
print(f" First 5 results: {final_results[:5]}")
print("\n💡 map_async() advantages:")
print(" ✓ Non-blocking submission")
print(" ✓ Can check progress with ready()")
print(" ✓ Can track remaining tasks")
print(" ✓ Main thread free for other work")
print("\n" + "=" * 70 + "\n")
def pool_error_handling(): """ Handle errors in pool tasks gracefully. """ print("=" * 70) print("ADVANCED: Error Handling in Pools") print("=" * 70)
def risky_division(args):
"""
Division that might fail.
Args:
args: (numerator, denominator) tuple
Returns:
Result of division
Raises:
ZeroDivisionError: If denominator is 0
"""
numerator, denominator = args
time.sleep(0.1)
# This will raise exception for denominator=0
return numerator / denominator
# Some operations will fail
tasks = [
(10, 2), # OK: 5.0
(20, 4), # OK: 5.0
(15, 0), # ERROR: division by zero
(30, 6), # OK: 5.0
(25, 0), # ERROR: division by zero
]
print("\n📝 Method 1: Let exceptions propagate (default)")
try:
with Pool(2) as pool:
# This will raise exception when it encounters error
results = pool.starmap(risky_division, tasks)
except Exception as e:
print(f" ✗ Caught exception: {type(e).__name__}: {e}")
print("\n📝 Method 2: Handle errors individually")
def safe_division(args):
"""Wrap risky function with error handling"""
try:
return risky_division(args), None
except Exception as e:
return None, str(e)
with Pool(2) as pool:
results = pool.starmap(safe_division, tasks)
print("\n📊 Results:")
for (num, denom), (result, error) in zip(tasks, results):
if error:
print(f" {num}/{denom}: ✗ Error: {error}")
else:
print(f" {num}/{denom}: ✓ {result}")
print("\n💡 Error handling strategies:")
print(" 1. Try-except around pool.map() - stops on first error")
print(" 2. Wrap worker in try-except - continue on errors")
print(" 3. Use error_callback in apply_async()")
print("\n" + "=" * 70 + "\n")
def pool_chunksize_optimization(): """ Optimize performance with chunksize parameter. """ print("=" * 70) print("ADVANCED: Chunksize Optimization") print("=" * 70)
def quick_task(x):
"""Very quick task"""
return x * 2
# Many small tasks
numbers = list(range(1000))
print(f"\n⏱️ Processing {len(numbers)} quick tasks:")
print(" Testing different chunksizes...\n")
# Test different chunksizes
for chunksize in [1, 10, 50, 100]:
start = time.time()
with Pool(4) as pool:
results = pool.map(quick_task, numbers, chunksize=chunksize)
elapsed = time.time() - start
print(f" Chunksize {chunksize:3d}: {elapsed:.4f}s")
print("\n💡 Chunksize guidelines:")
print(" • Default: chunksize = len(data) / (processes * 4)")
print(" • Many quick tasks: larger chunksize (less overhead)")
print(" • Few slow tasks: smaller chunksize (better distribution)")
print(" • Experiment to find optimal value")
print("\n" + "=" * 70 + "\n")
def real_world_example_image_processing(): """ Realistic example: Parallel image processing simulation. """ print("=" * 70) print("ADVANCED: Real-World Example - Batch Processing") print("=" * 70)
def process_image(image_id):
"""
Simulate image processing.
Args:
image_id: Image identifier
Returns:
Processing result
"""
# Simulate different processing times
time.sleep(random.uniform(0.2, 0.8))
# Simulate processing operations
operations = ["resize", "filter", "compress"]
return {
'id': image_id,
'operations': operations,
'size_mb': random.uniform(1.0, 5.0),
'status': 'success'
}
# Simulate 50 images to process
image_ids = [f"IMG_{i:04d}" for i in range(50)]
print(f"\n📷 Processing {len(image_ids)} images...\n")
start = time.time()
# Process with pool
with Pool(cpu_count()) as pool:
# Use imap_unordered for best performance
results = list(pool.imap_unordered(
process_image,
image_ids,
chunksize=5 # Process 5 images per worker at a time
))
elapsed = time.time() - start
# Statistics
total_size = sum(r['size_mb'] for r in results)
avg_size = total_size / len(results)
print(f"✓ Processed {len(results)} images in {elapsed:.2f}s")
print(f" Total size: {total_size:.1f} MB")
print(f" Average size: {avg_size:.2f} MB")
print(f" Throughput: {len(results)/elapsed:.1f} images/sec")
print("\n💡 This pattern works for:")
print(" • Image/video processing")
print(" • Data transformation")
print(" • File conversion")
print(" • API requests")
print(" • Report generation")
print("\n" + "=" * 70 + "\n")
============================================================================¶
MAIN EXECUTION¶
============================================================================¶
def main(): """Run all pool demonstrations.""" print("\n" + "=" * 70) print(" " * 20 + "PROCESS POOLS") print(" " * 15 + "multiprocessing.Pool Tutorial") print("=" * 70 + "\n")
# Beginner level
basic_pool_usage()
pool_with_different_sizes()
pool_context_manager()
# Intermediate level
pool_starmap_multiple_arguments()
pool_imap_lazy_iteration()
pool_imap_unordered()
# Advanced level
pool_apply_async()
pool_map_async_with_progress()
pool_error_handling()
pool_chunksize_optimization()
real_world_example_image_processing()
print("\n" + "=" * 70)
print("Process Pools Tutorial Complete!")
print("=" * 70)
print("\n💡 Key Takeaways:")
print("1. Pool manages worker processes automatically")
print("2. map() is simplest - like built-in map")
print("3. starmap() for functions with multiple arguments")
print("4. imap() yields results incrementally")
print("5. imap_unordered() returns results faster")
print("6. apply_async() for fine-grained control")
print("7. Use chunksize to optimize performance")
print("8. Always use context manager for cleanup")
print("=" * 70 + "\n")
if name == "main": # IMPORTANT: This guard is required on Windows main() ```
Runnable Example: prime_validation_pool.py¶
```python """ Prime Number Validation: Serial vs Multiprocessing Pool
This tutorial demonstrates how to use multiprocessing for CPU-bound tasks.
WHAT IS CPU-BOUND WORK? Tasks that spend most time doing computation (not waiting for I/O). Examples: prime checking, calculations, data processing, compression.
WHY MULTIPROCESSING HELPS: - Python's GIL (Global Interpreter Lock) prevents true parallelism with threads - Multiprocessing creates separate processes (each has own GIL) - On multi-core CPUs, processes can run truly in parallel - Can achieve near-linear speedup with N cores
THE TASK: Check if large numbers are prime. This is computationally expensive: - Even with optimization, checking a large number takes millions of divisions - No I/O involved, so it's pure CPU work - Perfect for multiprocessing!
TECHNIQUES: 1. Serial: Check primes one at a time in main process 2. Pool: Use multiprocessing.Pool to distribute work across cores
Learning Goals: - Understand when multiprocessing is appropriate - Learn to use multiprocessing.Pool - See real speedup from parallelization - Understand the overhead involved """
import math import time from multiprocessing import Pool, cpu_count
if name == "main":
print("=" * 70)
print("PRIME VALIDATION: SERIAL vs MULTIPROCESSING")
print("=" * 70)
# ============ EXAMPLE 1: Understanding Prime Checking ============
print("\n" + "=" * 70)
print("EXAMPLE 1: Understanding Prime Number Checking")
print("=" * 70)
print("""
A prime number is only divisible by 1 and itself.
NAIVE APPROACH: Check all numbers up to n
Too slow! For n = 10^18, this would take forever.
OPTIMIZED APPROACH (used here):
1. If n is even, it's not prime (except 2)
2. Only check odd divisors from 3 to sqrt(n)
3. If no divisor found, n is prime
WHY ONLY UP TO sqrt(n)?
If n = a * b where a <= b, then a <= sqrt(n).
So if n has a factor, we'll find one <= sqrt(n).
Example: Is 97 prime?
- sqrt(97) ≈ 9.8
- Check: 97 % 3, 97 % 5, 97 % 7, 97 % 9
- None divide evenly, so 97 is prime!
Example: Is 99 prime?
- sqrt(99) ≈ 9.9
- Check: 99 % 3 = 0, so 99 = 3 * 33, not prime!
Even with this optimization, checking large primes (15+ digits) takes
significant CPU time. This is why it's good for demonstrating multiprocessing.
""")
# ============ EXAMPLE 2: Prime Checking Function ============
print("\n" + "=" * 70)
print("EXAMPLE 2: Implement Prime Checking Function")
print("=" * 70)
def check_prime(n):
"""
Check if n is a prime number.
Algorithm:
1. Even numbers (except 2) are not prime
2. Check odd divisors from 3 to sqrt(n)
3. Use step of 2 to skip even numbers
This is CPU-bound work: pure computation, no I/O.
Time complexity: O(sqrt(n))
For n ~ 10^18, sqrt(n) ~ 10^9, so millions of checks.
"""
if n % 2 == 0:
return False
# Only need to check up to sqrt(n)
from_i = 3
to_i = math.sqrt(n) + 1
# Check odd numbers only (step by 2)
for i in range(from_i, int(to_i), 2):
if n % i == 0:
return False
return True
# Test with some examples
test_numbers = [
(2, True),
(97, True),
(100, False),
(10007, True),
]
print("\nTesting check_prime():")
for num, expected in test_numbers:
result = check_prime(num)
status = "PASS" if result == expected else "FAIL"
print(f" check_prime({num:6}) = {result:5} (expected {expected:5}) [{status}]")
# ============ EXAMPLE 3: Expensive Test Numbers ============
print("\n" + "=" * 70)
print("EXAMPLE 3: Test with Large Numbers")
print("=" * 70)
print("""
We'll test with large numbers that take significant time to check.
These are actual large primes and non-primes from cryptography:
- Large primes are expensive to verify
- Large composites are also expensive (must check many divisors before confirming)
Expected checking times (on modern CPU):
- 12-15 digit numbers: milliseconds
- 18 digit numbers: seconds
- 20+ digit numbers: tens of seconds
""")
# Test numbers - these are real cryptographic numbers
test_cases = [
("trivial non-prime", 112272535095295),
("15-digit composite", 100109100129100369),
("15-digit composite 2", 100109100129101027),
("18-digit prime", 100109100129100151),
("18-digit prime 2", 100109100129162907),
]
print(f"\nQuick validation on one number:")
num, label = test_cases[0]
print(f" Testing {label}: {num}")
start = time.time()
is_prime = check_prime(num)
elapsed = time.time() - start
print(f" Result: {is_prime} (checked in {elapsed:.4f}s)")
# ============ EXAMPLE 4: Serial Processing ============
print("\n" + "=" * 70)
print("EXAMPLE 4: Serial Processing (One at a time)")
print("=" * 70)
print("""
Process each number one at a time in the main process.
WHY IT'S SLOW:
- CPU can only do one thing at a time
- No parallelism at all
- Wastes cores on multi-core machines
Complexity:
- 4 numbers, each taking 1-2 seconds
- Total: ~4-8 seconds
- Only using 1 of your N cores
""")
print(f"\nProcessing {len(test_cases)} numbers serially:")
print(f"Current CPU count: {cpu_count()} cores\n")
results_serial = []
start_total = time.time()
for label, number in test_cases:
start = time.time()
is_prime = check_prime(number)
elapsed = time.time() - start
results_serial.append((label, number, is_prime, elapsed))
print(f" {label:20} ({number}): {is_prime:5} ({elapsed:.4f}s)")
elapsed_total_serial = time.time() - start_total
print(f"\nTotal serial time: {elapsed_total_serial:.4f}s")
print(f"Average per number: {elapsed_total_serial/len(test_cases):.4f}s")
# ============ EXAMPLE 5: Multiprocessing Pool ============
print("\n" + "=" * 70)
print("EXAMPLE 5: Multiprocessing Pool (Parallel)")
print("=" * 70)
print("""
Use multiprocessing.Pool to distribute work across multiple processes.
HOW IT WORKS:
1. Create a Pool with N worker processes (default: CPU count)
2. Submit jobs using pool.apply_async() or pool.map()
3. Each worker process runs jobs in parallel
4. Collect results
WHY IT'S FASTER:
- Each process runs on a separate CPU core
- True parallelism (not limited by GIL)
- Can check multiple numbers simultaneously
- Speedup approximately = min(num_jobs, num_cores)
OVERHEAD:
- Creating processes takes time (~0.1s each)
- Serializing data to pass to processes (pickling)
- For small jobs, overhead might exceed speedup!
- For large jobs, speedup dominates
In this case:
- Each job takes 1-2 seconds
- Process creation overhead (~0.5s total) is small
- Clear speedup expected
""")
print(f"\nProcessing {len(test_cases)} numbers with multiprocessing Pool:")
start_total = time.time()
# Create a Pool with default number of workers (CPU count)
with Pool() as pool:
# Use map() to apply check_prime to each number
# Returns results in same order as input
numbers = [num for label, num in test_cases]
results_list = pool.map(check_prime, numbers)
elapsed_total_parallel = time.time() - start_total
# Print results
for (label, number), is_prime in zip(test_cases, results_list):
print(f" {label:20} ({number}): {is_prime:5}")
print(f"\nTotal parallel time: {elapsed_total_parallel:.4f}s")
print(f"Average per number: {elapsed_total_parallel/len(test_cases):.4f}s")
# ============ EXAMPLE 6: Performance Comparison ============
print("\n" + "=" * 70)
print("EXAMPLE 6: Performance Comparison")
print("=" * 70)
speedup = elapsed_total_serial / elapsed_total_parallel
num_cores = cpu_count()
print(f"\nSpeedup Analysis:")
print(f" Serial time: {elapsed_total_serial:.4f}s")
print(f" Parallel time: {elapsed_total_parallel:.4f}s")
print(f" Speedup: {speedup:.2f}x")
print(f" CPU cores: {num_cores}")
print(f" Efficiency: {speedup/num_cores*100:.1f}% (speedup / cores)")
print(f"\nInterpretation:")
if speedup > 1:
print(f" Multiprocessing is {speedup:.1f}x faster!")
if speedup > num_cores * 0.8:
print(f" Good efficiency: Using cores well (>80%)")
else:
print(f" Okay efficiency: Using cores okay (overhead present)")
else:
print(f" Serial is faster (overhead > benefit)")
print(f" This can happen with small jobs or I/O-bound work")
print(f"\n{'*' * 70}")
print("WHEN MULTIPROCESSING HELPS")
print("{'*' * 70}")
print(f"""
GOOD FOR MULTIPROCESSING:
- CPU-bound tasks (pure computation)
- Long-running jobs (seconds or more)
- Many independent items to process
- Available CPU cores to distribute to
BAD FOR MULTIPROCESSING:
- I/O-bound tasks (use threading or async instead)
- Quick jobs (overhead > benefit)
- Need to share mutable state
- On single-core machines
FOR THIS TASK (prime checking):
- Job time: 1-2 seconds each (large numbers)
- CPU-bound: Pure math, no I/O
- Independent: Each number is independent
- Result: Multiprocessing is clearly beneficial
""")
# ============ EXAMPLE 7: Advanced Pool Usage ============
print("\n" + "=" * 70)
print("EXAMPLE 7: Advanced Pool Usage - apply_async()")
print("=" * 70)
print("""
Different ways to use Pool:
1. pool.map(func, iterable)
- Apply func to each item
- Blocks until all results ready
- Returns list of results
2. pool.apply_async(func, args)
- Submit single job
- Returns immediately with AsyncResult object
- Get result later with .get()
3. pool.imap(func, iterable)
- Like map, but returns iterator
- Results available as they complete
For this example, map() is fine. For more complex scenarios,
async methods let you submit jobs without waiting.
""")
print(f"\nDemonstration with apply_async():")
start_total = time.time()
with Pool() as pool:
# Submit all jobs without waiting
async_results = []
for label, number in test_cases:
async_result = pool.apply_async(check_prime, (number,))
async_results.append((label, number, async_result))
# Collect results as they complete
print("Jobs submitted, waiting for results...\n")
for label, number, async_result in async_results:
is_prime = async_result.get() # Blocks until this specific job completes
print(f" {label:20} ({number}): {is_prime:5}")
elapsed_async = time.time() - start_total
print(f"\nTotal time with async: {elapsed_async:.4f}s")
# ============ EXAMPLE 8: Overhead Analysis ============
print("\n" + "=" * 70)
print("EXAMPLE 8: Understanding Multiprocessing Overhead")
print("=" * 70)
print("""
Multiprocessing has overhead:
1. PROCESS CREATION: ~0.1-0.2s per process
- Creating processes is expensive
- Pool reuses processes to amortize this
2. PICKLING/SERIALIZATION: Time to send data to processes
- Data must be converted to bytes and sent via IPC
- Getting results back has same cost
- Small data: negligible
- Large data: can be significant
3. CONTEXT SWITCHING: OS switching between processes
- Not a huge factor on modern systems
WHEN OVERHEAD DOMINATES:
- Small jobs (microseconds)
- Frequent small submissions
- Large data to transfer
WHEN SPEEDUP DOMINATES:
- Long jobs (seconds+)
- Batch submissions
- Small data transfer
FOR THIS TASK:
- Process creation: ~0.5s total (5 processes, once at start)
- Per-job overhead: Negligible (just int, bool transfer)
- Job time: 1-2 seconds each
- Result: Overhead is <20% of time, speedup dominates!
""")
print("\n" + "=" * 70)
print("KEY TAKEAWAY")
print("=" * 70)
print(f"""
Multiprocessing provides {speedup:.1f}x speedup for this CPU-bound task.
USE MULTIPROCESSING WHEN:
1. Task is CPU-bound (computation, not I/O)
2. Jobs take significant time (seconds+)
3. You have multiple cores available
4. Data transfer overhead is small
USE THREADING FOR:
- I/O-bound tasks (network, disk, databases)
- Quick tasks with frequent context switches
USE ASYNCIO FOR:
- I/O-bound tasks with many concurrent operations
- Modern, efficient I/O multiplexing
FOR PEAK PERFORMANCE:
1. Profile to confirm it's worth optimizing
2. Start with simple Pool.map() approach
3. Use apply_async() for more control if needed
4. Consider process count (default: CPU count)
5. Measure actual speedup vs single-threaded baseline
Remember: Don't optimize prematurely! Use multiprocessing
only when profiling shows it's needed for CPU-bound work.
""")
```
Exercises¶
Exercise 1.
Use Pool.map() with 4 workers to compute the factorial of numbers 1 through 20. Print each result. Compare the wall-clock time of the pool version against the sequential map(math.factorial, range(1, 21)).
Solution to Exercise 1
```python
import math
import time
from multiprocessing import Pool
if __name__ == "__main__":
nums = list(range(1, 21))
# Sequential
start = time.perf_counter()
seq = list(map(math.factorial, nums))
seq_t = time.perf_counter() - start
# Parallel
start = time.perf_counter()
with Pool(4) as pool:
par = pool.map(math.factorial, nums)
par_t = time.perf_counter() - start
for n, f in zip(nums, par):
print(f"{n}! = {f}")
print(f"\nSequential: {seq_t:.4f}s")
print(f"Pool: {par_t:.4f}s")
```
Exercise 2.
Use Pool.starmap() to compute base ** exponent for 10 (base, exponent) pairs. Include a 0.2-second sleep in the worker to simulate heavy computation. Print each result and the total elapsed time.
Solution to Exercise 2
```python
import time
from multiprocessing import Pool
def power(base, exp):
time.sleep(0.2)
return base ** exp
if __name__ == "__main__":
pairs = [(2, 10), (3, 5), (5, 3), (7, 4), (10, 6),
(2, 20), (3, 10), (4, 8), (6, 5), (9, 3)]
start = time.perf_counter()
with Pool(4) as pool:
results = pool.starmap(power, pairs)
elapsed = time.perf_counter() - start
for (b, e), r in zip(pairs, results):
print(f"{b}^{e} = {r}")
print(f"\nElapsed: {elapsed:.2f}s")
```
Exercise 3.
Write an error-tolerant pool pipeline. Define a safe_divide(a, b) function that returns (result, None) on success and (None, error_message) on failure. Use Pool.starmap() on a list of 10 (a, b) pairs where some have b=0. Print successes and failures separately, and report the count of each.
Solution to Exercise 3
```python
from multiprocessing import Pool
def safe_divide(a, b):
try:
return (a / b, None)
except ZeroDivisionError as e:
return (None, str(e))
if __name__ == "__main__":
pairs = [(10, 2), (20, 0), (30, 5), (40, 0), (50, 10),
(60, 3), (70, 0), (80, 4), (90, 9), (100, 0)]
with Pool(4) as pool:
results = pool.starmap(safe_divide, pairs)
successes = 0
failures = 0
for (a, b), (result, error) in zip(pairs, results):
if error:
print(f" {a}/{b}: FAIL — {error}")
failures += 1
else:
print(f" {a}/{b}: OK — {result}")
successes += 1
print(f"\nSuccesses: {successes}, Failures: {failures}")
```