Process Pool¶
A Pool manages a collection of worker processes, providing a simple way to parallelize function execution across multiple inputs.
Creating a Pool¶
Basic Pool Usage¶
from multiprocessing import Pool
import os
def square(x):
print(f"Process {os.getpid()}: computing {x}²")
return x ** 2
if __name__ == "__main__":
# Create pool with 4 worker processes
with Pool(4) as pool:
results = pool.map(square, [1, 2, 3, 4, 5])
print(results) # [1, 4, 9, 16, 25]
Default Worker Count¶
from multiprocessing import Pool, cpu_count
# Default: uses cpu_count() workers
with Pool() as pool:
print(f"Workers: {pool._processes}") # Number of CPUs
# Explicit count
with Pool(processes=8) as pool:
pass
Pool Methods¶
map() — Parallel Map¶
Apply function to each item in iterable, return results in order:
from multiprocessing import Pool
import time
def slow_square(x):
time.sleep(0.5)
return x ** 2
if __name__ == "__main__":
numbers = list(range(10))
# Sequential
start = time.perf_counter()
results = list(map(slow_square, numbers))
print(f"Sequential: {time.perf_counter() - start:.2f}s")
# Parallel
start = time.perf_counter()
with Pool(4) as pool:
results = pool.map(slow_square, numbers)
print(f"Parallel: {time.perf_counter() - start:.2f}s")
print(results)
map() with Chunksize¶
from multiprocessing import Pool
def process(x):
return x * 2
if __name__ == "__main__":
data = list(range(1000))
with Pool(4) as pool:
# Default: pool divides work automatically
results = pool.map(process, data)
# Chunksize: send N items to each worker at a time
# Larger chunks = less IPC overhead, but less load balancing
results = pool.map(process, data, chunksize=100)
starmap() — Multiple Arguments¶
from multiprocessing import Pool
def power(base, exp):
return base ** exp
if __name__ == "__main__":
# starmap unpacks argument tuples
args = [(2, 3), (3, 4), (4, 5), (5, 6)]
with Pool(4) as pool:
results = pool.starmap(power, args)
print(results) # [8, 81, 1024, 15625]
imap() — Lazy Iterator¶
Returns results as they complete (memory efficient for large datasets):
from multiprocessing import Pool
import time
def slow_process(x):
time.sleep(0.5)
return x ** 2
if __name__ == "__main__":
with Pool(4) as pool:
# imap returns iterator, not list
results = pool.imap(slow_process, range(10))
# Process results as they become available
for result in results:
print(f"Got result: {result}")
imap_unordered() — Fastest Results First¶
Returns results in completion order (not input order):
from multiprocessing import Pool
import time
import random
def variable_task(x):
delay = random.uniform(0.1, 1.0)
time.sleep(delay)
return (x, delay)
if __name__ == "__main__":
with Pool(4) as pool:
# Results come in completion order
for result in pool.imap_unordered(variable_task, range(10)):
print(f"Completed: {result}")
apply() — Single Function Call¶
from multiprocessing import Pool
def compute(x, y, z):
return x + y + z
if __name__ == "__main__":
with Pool(4) as pool:
# apply() blocks until result ready
result = pool.apply(compute, args=(1, 2, 3))
print(result) # 6
apply_async() — Asynchronous Single Call¶
from multiprocessing import Pool
import time
def slow_compute(x):
time.sleep(1)
return x ** 2
if __name__ == "__main__":
with Pool(4) as pool:
# Submit task, get AsyncResult immediately
async_result = pool.apply_async(slow_compute, args=(10,))
print("Doing other work...")
time.sleep(0.5)
# Get result (blocks if not ready)
result = async_result.get()
print(f"Result: {result}")
map_async() — Asynchronous Map¶
from multiprocessing import Pool
import time
def process(x):
time.sleep(0.5)
return x ** 2
if __name__ == "__main__":
with Pool(4) as pool:
# Submit all tasks, get AsyncResult
async_result = pool.map_async(process, range(10))
print("Tasks submitted, doing other work...")
# Check if ready
while not async_result.ready():
print("Still working...")
time.sleep(0.3)
# Get all results
results = async_result.get()
print(results)
AsyncResult Object¶
Methods available on AsyncResult returned by async methods:
from multiprocessing import Pool
import time
def slow_task(x):
time.sleep(2)
return x ** 2
if __name__ == "__main__":
with Pool(4) as pool:
async_result = pool.apply_async(slow_task, args=(10,))
# Check if complete
print(f"Ready: {async_result.ready()}") # False
# Wait with timeout
async_result.wait(timeout=1)
print(f"Ready after wait: {async_result.ready()}") # Still False
# Check if successful (blocks until complete)
# print(f"Successful: {async_result.successful()}")
# Get result (blocks until complete)
result = async_result.get(timeout=5)
print(f"Result: {result}")
Error Handling in Pools¶
Handling Exceptions¶
from multiprocessing import Pool
def risky_task(x):
if x == 3:
raise ValueError(f"Cannot process {x}")
return x ** 2
if __name__ == "__main__":
with Pool(4) as pool:
try:
# Exception raised when iterating results
results = pool.map(risky_task, [1, 2, 3, 4, 5])
except ValueError as e:
print(f"Error: {e}")
Handling Exceptions with imap¶
from multiprocessing import Pool
def risky_task(x):
if x == 3:
raise ValueError(f"Cannot process {x}")
return x ** 2
if __name__ == "__main__":
with Pool(4) as pool:
results = pool.imap(risky_task, range(5))
for i in range(5):
try:
result = next(results)
print(f"Result: {result}")
except ValueError as e:
print(f"Error: {e}")
except StopIteration:
break
Error Callback with apply_async¶
from multiprocessing import Pool
def task(x):
if x < 0:
raise ValueError("Negative!")
return x ** 2
def on_success(result):
print(f"Success: {result}")
def on_error(error):
print(f"Error: {error}")
if __name__ == "__main__":
with Pool(4) as pool:
# Success case
pool.apply_async(task, args=(5,), callback=on_success, error_callback=on_error)
# Error case
pool.apply_async(task, args=(-1,), callback=on_success, error_callback=on_error)
pool.close()
pool.join()
Pool Lifecycle¶
Manual Management¶
from multiprocessing import Pool
def task(x):
return x ** 2
if __name__ == "__main__":
pool = Pool(4)
try:
results = pool.map(task, range(10))
print(results)
finally:
pool.close() # No more tasks accepted
pool.join() # Wait for workers to finish
Context Manager (Recommended)¶
from multiprocessing import Pool
def task(x):
return x ** 2
if __name__ == "__main__":
with Pool(4) as pool:
results = pool.map(task, range(10))
print(results)
# Pool automatically closed and joined
terminate() vs close()¶
from multiprocessing import Pool
def task(x):
return x ** 2
if __name__ == "__main__":
pool = Pool(4)
# close(): No new tasks, wait for existing to finish
pool.close()
pool.join()
# terminate(): Kill workers immediately
pool2 = Pool(4)
pool2.terminate() # Don't wait for tasks
pool2.join()
Initializer Functions¶
Run setup code in each worker process:
from multiprocessing import Pool
import os
# Global variable in worker processes
worker_data = None
def init_worker(shared_data):
"""Called once when each worker starts."""
global worker_data
worker_data = shared_data
print(f"Worker {os.getpid()} initialized with {shared_data}")
def process_item(x):
"""Use initialized data."""
return x * worker_data
if __name__ == "__main__":
# Pass initializer and its arguments
with Pool(4, initializer=init_worker, initargs=(10,)) as pool:
results = pool.map(process_item, [1, 2, 3, 4, 5])
print(results) # [10, 20, 30, 40, 50]
Database Connection per Worker¶
from multiprocessing import Pool
import os
# Worker-local database connection
db_connection = None
def init_db():
"""Initialize database connection for this worker."""
global db_connection
db_connection = create_database_connection()
print(f"Worker {os.getpid()}: DB connected")
def query(sql):
"""Use worker's database connection."""
return db_connection.execute(sql)
if __name__ == "__main__":
with Pool(4, initializer=init_db) as pool:
queries = ["SELECT * FROM users", "SELECT * FROM orders"]
results = pool.map(query, queries)
Practical Examples¶
Parallel File Processing¶
from multiprocessing import Pool
from pathlib import Path
def process_file(filepath):
"""Process a single file."""
content = Path(filepath).read_text()
word_count = len(content.split())
return (filepath, word_count)
if __name__ == "__main__":
files = list(Path(".").glob("*.txt"))
with Pool() as pool:
results = pool.map(process_file, files)
for filepath, count in results:
print(f"{filepath}: {count} words")
Parallel Image Processing¶
from multiprocessing import Pool
from pathlib import Path
# from PIL import Image # Uncomment if using PIL
def resize_image(args):
"""Resize a single image."""
input_path, output_path, size = args
# img = Image.open(input_path)
# img = img.resize(size)
# img.save(output_path)
return f"Resized {input_path}"
if __name__ == "__main__":
images = [
("img1.jpg", "out1.jpg", (100, 100)),
("img2.jpg", "out2.jpg", (100, 100)),
("img3.jpg", "out3.jpg", (100, 100)),
]
with Pool() as pool:
results = pool.map(resize_image, images)
print(results)
Parallel Web Scraping (I/O + CPU)¶
from multiprocessing import Pool
import time
def fetch_and_parse(url):
"""Fetch URL and parse content."""
# response = requests.get(url)
# soup = BeautifulSoup(response.text, 'html.parser')
# return len(soup.find_all('a'))
time.sleep(0.5) # Simulate
return f"Parsed {url}"
if __name__ == "__main__":
urls = [f"https://example.com/page{i}" for i in range(20)]
with Pool(8) as pool:
results = pool.map(fetch_and_parse, urls)
print(results)
Pool Method Summary¶
| Method | Blocking | Ordered | Use Case |
|---|---|---|---|
map() |
Yes | Yes | Simple parallel map |
starmap() |
Yes | Yes | Multiple arguments |
imap() |
Iterator | Yes | Memory efficient |
imap_unordered() |
Iterator | No | Fastest results first |
apply() |
Yes | N/A | Single function call |
apply_async() |
No | N/A | Async single call |
map_async() |
No | Yes | Async parallel map |
Key Takeaways¶
- Use
Poolfor simple parallel function application map()is the most common method — parallel version of built-inmap()starmap()for functions with multiple argumentsimap_unordered()for best performance when order doesn't matter- Use context manager (
with Pool() as pool:) for automatic cleanup initializersets up per-worker resources (database connections, etc.)- Match pool size to CPU count for CPU-bound tasks
- Use
chunksizeto tune performance for large datasets
Runnable Example: pool_tutorial.py¶
"""
Topic 45.4 - Process Pools with multiprocessing.Pool
Complete guide to using process pools for efficient parallel task execution.
Pools manage a set of worker processes and distribute work automatically.
Learning Objectives:
- Create and use process pools
- Use map, imap, starmap for parallel execution
- Handle async operations with apply_async and map_async
- Manage pool lifecycle
- Error handling in pools
- Performance optimization
Author: Python Educator
Date: 2024
"""
import multiprocessing
from multiprocessing import Pool, cpu_count
import time
import random
import math
# ============================================================================
# PART 1: BEGINNER - Pool Basics and map()
# ============================================================================
def basic_pool_usage():
"""
The simplest way to use a pool: map() a function over data.
This is like built-in map() but runs in parallel across processes.
"""
print("=" * 70)
print("BEGINNER: Basic Pool with map()")
print("=" * 70)
def square(x):
"""
Calculate square of a number.
Args:
x: Number to square
Returns:
Square of x
"""
# Add delay to simulate real work
time.sleep(0.1)
return x ** 2
# Input data
numbers = list(range(10))
print(f"\n📝 Input: {numbers}")
print(f" We want to square each number in parallel\n")
# Method 1: Sequential (for comparison)
print("⏱️ Sequential execution:")
start = time.time()
results_sequential = [square(x) for x in numbers]
seq_time = time.time() - start
print(f" Time: {seq_time:.2f}s")
print(f" Results: {results_sequential}")
# Method 2: Parallel with Pool
print("\n⏱️ Parallel execution with Pool:")
start = time.time()
# Create a pool with 4 worker processes
with Pool(processes=4) as pool:
# Map the function over the data
results_parallel = pool.map(square, numbers)
parallel_time = time.time() - start
print(f" Time: {parallel_time:.2f}s")
print(f" Results: {results_parallel}")
# Analysis
print(f"\n📊 Speedup: {seq_time/parallel_time:.2f}x faster!")
print(f" 4 workers can process multiple items simultaneously")
print("\n💡 Pool.map() advantages:")
print(" ✓ Automatic work distribution")
print(" ✓ Process reuse (no startup overhead)")
print(" ✓ Simple API (like built-in map)")
print(" ✓ Handles all the complexity for you")
print("\n" + "=" * 70 + "\n")
def pool_with_different_sizes():
"""
Experiment with different pool sizes to find optimal performance.
"""
print("=" * 70)
print("BEGINNER: Choosing Pool Size")
print("=" * 70)
def cpu_task(x):
"""CPU-intensive task"""
# Calculate factorial (CPU work)
result = math.factorial(x % 15 + 5)
time.sleep(0.05) # Small delay
return result % 1000
numbers = list(range(40))
print(f"\n🖥️ Your system has {cpu_count()} CPU cores")
print(f" Testing different pool sizes on {len(numbers)} tasks:\n")
# Test different pool sizes
for pool_size in [1, 2, 4, cpu_count(), cpu_count() * 2]:
start = time.time()
with Pool(processes=pool_size) as pool:
results = pool.map(cpu_task, numbers)
elapsed = time.time() - start
print(f" {pool_size:2d} processes: {elapsed:.3f}s")
print("\n💡 Guidelines for pool size:")
print(" • CPU-bound: pool_size = cpu_count()")
print(" • I/O-bound: pool_size = cpu_count() * 2 or more")
print(" • Mixed: Start with cpu_count() and experiment")
print(" • Too many processes = overhead from context switching")
print("\n" + "=" * 70 + "\n")
def pool_context_manager():
"""
Demonstrate proper pool lifecycle management with context manager.
"""
print("=" * 70)
print("BEGINNER: Pool Lifecycle Management")
print("=" * 70)
def worker(x):
"""Simple worker function"""
return x * 2
print("\n📝 Recommended: Use context manager (with statement)")
print(" with Pool(4) as pool:")
print(" results = pool.map(worker, data)")
print(" # Pool automatically closed and terminated")
# Good practice: context manager
with Pool(4) as pool:
results = pool.map(worker, range(10))
print(f"\n✓ Results: {results}")
print("✓ Pool automatically cleaned up\n")
print("📝 Alternative: Manual management")
print(" pool = Pool(4)")
print(" results = pool.map(worker, data)")
print(" pool.close() # No more tasks accepted")
print(" pool.join() # Wait for workers to finish")
# Manual management (less preferred)
pool = Pool(4)
results = pool.map(worker, range(10, 20))
print(f"\n✓ Results: {results}")
pool.close() # Stop accepting new tasks
pool.join() # Wait for completion
print("✓ Pool manually cleaned up")
print("\n💡 Best Practice:")
print(" Always use 'with Pool() as pool:' for automatic cleanup")
print("\n" + "=" * 70 + "\n")
# ============================================================================
# PART 2: INTERMEDIATE - Advanced Mapping Functions
# ============================================================================
def pool_starmap_multiple_arguments():
"""
Use starmap() when your function takes multiple arguments.
starmap unpacks argument tuples for you.
"""
print("=" * 70)
print("INTERMEDIATE: starmap() for Multiple Arguments")
print("=" * 70)
def calculate_power(base, exponent):
"""
Calculate base raised to exponent.
Args:
base: Base number
exponent: Exponent
Returns:
base ** exponent
"""
time.sleep(0.1)
return base ** exponent
# Data: list of (base, exponent) tuples
tasks = [
(2, 3), # 2^3 = 8
(3, 4), # 3^4 = 81
(5, 2), # 5^2 = 25
(10, 3), # 10^3 = 1000
(7, 2), # 7^2 = 49
]
print(f"\n📝 Tasks: {tasks}")
print(" Each tuple is (base, exponent)\n")
# Use starmap to unpack tuples
with Pool(3) as pool:
results = pool.starmap(calculate_power, tasks)
print("📊 Results:")
for (base, exp), result in zip(tasks, results):
print(f" {base}^{exp} = {result}")
print("\n💡 starmap vs map:")
print(" map(f, [x1, x2]) → f(x1), f(x2)")
print(" starmap(f, [(a,b)]) → f(a, b) # unpacks tuple")
print("\n" + "=" * 70 + "\n")
def pool_imap_lazy_iteration():
"""
Use imap() for lazy iteration over results.
Unlike map(), imap() returns results as they complete.
"""
print("=" * 70)
print("INTERMEDIATE: imap() for Lazy Results")
print("=" * 70)
def slow_square(x):
"""Square with variable delay"""
delay = random.uniform(0.5, 1.5)
time.sleep(delay)
return x ** 2, delay
numbers = list(range(8))
print(f"\n📝 Processing {len(numbers)} items with variable delays\n")
# Method 1: map() - waits for ALL results
print("⏱️ Using map() (blocks until all complete):")
start = time.time()
with Pool(4) as pool:
results = pool.map(slow_square, numbers)
elapsed = time.time() - start
print(f" Got all results after {elapsed:.2f}s")
print(f" Results: {[r[0] for r in results]}")
# Method 2: imap() - yields results as they arrive
print("\n⏱️ Using imap() (yields results incrementally):")
start = time.time()
with Pool(4) as pool:
# imap returns an iterator
for i, (result, delay) in enumerate(pool.imap(slow_square, numbers)):
elapsed = time.time() - start
print(f" [{elapsed:.2f}s] Got result #{i}: {result} (took {delay:.2f}s)")
print("\n💡 When to use imap():")
print(" ✓ Process results as they complete")
print(" ✓ Show progress updates")
print(" ✓ Lower memory usage (streaming)")
print(" ✓ Start processing early results while others compute")
print("\n" + "=" * 70 + "\n")
def pool_imap_unordered():
"""
Use imap_unordered() when result order doesn't matter.
This can be faster as it returns results immediately.
"""
print("=" * 70)
print("INTERMEDIATE: imap_unordered() for Faster Results")
print("=" * 70)
def process_item(x):
"""Process with random delay"""
delay = random.uniform(0.1, 1.0)
time.sleep(delay)
return x, delay
numbers = list(range(12))
print(f"\n📝 Processing {len(numbers)} items\n")
# Ordered iteration
print("⏱️ imap() - Maintains order:")
with Pool(4) as pool:
start = time.time()
for i, (num, delay) in enumerate(pool.imap(process_item, numbers)):
elapsed = time.time() - start
print(f" [{elapsed:.2f}s] Position {i}: item {num}")
# Unordered iteration
print("\n⏱️ imap_unordered() - Returns as completed:")
with Pool(4) as pool:
start = time.time()
for i, (num, delay) in enumerate(pool.imap_unordered(process_item, numbers)):
elapsed = time.time() - start
print(f" [{elapsed:.2f}s] Completed #{i}: item {num}")
print("\n💡 imap_unordered() advantages:")
print(" ✓ Faster - returns results immediately")
print(" ✓ No waiting for slow tasks to maintain order")
print(" ✓ Good for independent tasks")
print("\n" + "=" * 70 + "\n")
# ============================================================================
# PART 3: ADVANCED - Async Operations and Error Handling
# ============================================================================
def pool_apply_async():
"""
Use apply_async() for single tasks with callbacks.
More flexible than map, but requires manual result handling.
"""
print("=" * 70)
print("ADVANCED: apply_async() with Callbacks")
print("=" * 70)
def compute_factorial(n):
"""Compute factorial of n"""
time.sleep(0.5)
result = math.factorial(n)
return n, result
def success_callback(result):
"""Called when task completes successfully"""
n, factorial = result
print(f" ✓ Success: {n}! = {factorial}")
def error_callback(error):
"""Called when task raises exception"""
print(f" ✗ Error: {error}")
print("\n⚙️ Submitting async tasks with callbacks:\n")
with Pool(3) as pool:
# Submit multiple async tasks
async_results = []
for n in [5, 10, 15, 20]:
result = pool.apply_async(
compute_factorial,
args=(n,),
callback=success_callback,
error_callback=error_callback
)
async_results.append(result)
# Wait for all tasks
pool.close()
pool.join()
print("\n💡 apply_async() features:")
print(" ✓ Submit individual tasks")
print(" ✓ Callbacks for success/error")
print(" ✓ Non-blocking submission")
print(" ✓ Flexible task management")
print("\n" + "=" * 70 + "\n")
def pool_map_async_with_progress():
"""
Use map_async() for non-blocking batch operations with progress tracking.
"""
print("=" * 70)
print("ADVANCED: map_async() with Progress Tracking")
print("=" * 70)
def heavy_computation(x):
"""CPU-intensive task"""
time.sleep(0.3)
return x ** 3
numbers = list(range(20))
print(f"\n⚙️ Processing {len(numbers)} items asynchronously...\n")
with Pool(4) as pool:
# Submit all tasks at once (non-blocking)
result = pool.map_async(heavy_computation, numbers)
# Do other work while tasks execute
while not result.ready():
remaining = result._number_left
print(f" Tasks remaining: {remaining}")
time.sleep(0.5)
# Get final results (will wait if not ready)
final_results = result.get()
print(f"\n✓ All tasks completed!")
print(f" First 5 results: {final_results[:5]}")
print("\n💡 map_async() advantages:")
print(" ✓ Non-blocking submission")
print(" ✓ Can check progress with ready()")
print(" ✓ Can track remaining tasks")
print(" ✓ Main thread free for other work")
print("\n" + "=" * 70 + "\n")
def pool_error_handling():
"""
Handle errors in pool tasks gracefully.
"""
print("=" * 70)
print("ADVANCED: Error Handling in Pools")
print("=" * 70)
def risky_division(args):
"""
Division that might fail.
Args:
args: (numerator, denominator) tuple
Returns:
Result of division
Raises:
ZeroDivisionError: If denominator is 0
"""
numerator, denominator = args
time.sleep(0.1)
# This will raise exception for denominator=0
return numerator / denominator
# Some operations will fail
tasks = [
(10, 2), # OK: 5.0
(20, 4), # OK: 5.0
(15, 0), # ERROR: division by zero
(30, 6), # OK: 5.0
(25, 0), # ERROR: division by zero
]
print("\n📝 Method 1: Let exceptions propagate (default)")
try:
with Pool(2) as pool:
# This will raise exception when it encounters error
results = pool.starmap(risky_division, tasks)
except Exception as e:
print(f" ✗ Caught exception: {type(e).__name__}: {e}")
print("\n📝 Method 2: Handle errors individually")
def safe_division(args):
"""Wrap risky function with error handling"""
try:
return risky_division(args), None
except Exception as e:
return None, str(e)
with Pool(2) as pool:
results = pool.starmap(safe_division, tasks)
print("\n📊 Results:")
for (num, denom), (result, error) in zip(tasks, results):
if error:
print(f" {num}/{denom}: ✗ Error: {error}")
else:
print(f" {num}/{denom}: ✓ {result}")
print("\n💡 Error handling strategies:")
print(" 1. Try-except around pool.map() - stops on first error")
print(" 2. Wrap worker in try-except - continue on errors")
print(" 3. Use error_callback in apply_async()")
print("\n" + "=" * 70 + "\n")
def pool_chunksize_optimization():
"""
Optimize performance with chunksize parameter.
"""
print("=" * 70)
print("ADVANCED: Chunksize Optimization")
print("=" * 70)
def quick_task(x):
"""Very quick task"""
return x * 2
# Many small tasks
numbers = list(range(1000))
print(f"\n⏱️ Processing {len(numbers)} quick tasks:")
print(" Testing different chunksizes...\n")
# Test different chunksizes
for chunksize in [1, 10, 50, 100]:
start = time.time()
with Pool(4) as pool:
results = pool.map(quick_task, numbers, chunksize=chunksize)
elapsed = time.time() - start
print(f" Chunksize {chunksize:3d}: {elapsed:.4f}s")
print("\n💡 Chunksize guidelines:")
print(" • Default: chunksize = len(data) / (processes * 4)")
print(" • Many quick tasks: larger chunksize (less overhead)")
print(" • Few slow tasks: smaller chunksize (better distribution)")
print(" • Experiment to find optimal value")
print("\n" + "=" * 70 + "\n")
def real_world_example_image_processing():
"""
Realistic example: Parallel image processing simulation.
"""
print("=" * 70)
print("ADVANCED: Real-World Example - Batch Processing")
print("=" * 70)
def process_image(image_id):
"""
Simulate image processing.
Args:
image_id: Image identifier
Returns:
Processing result
"""
# Simulate different processing times
time.sleep(random.uniform(0.2, 0.8))
# Simulate processing operations
operations = ["resize", "filter", "compress"]
return {
'id': image_id,
'operations': operations,
'size_mb': random.uniform(1.0, 5.0),
'status': 'success'
}
# Simulate 50 images to process
image_ids = [f"IMG_{i:04d}" for i in range(50)]
print(f"\n📷 Processing {len(image_ids)} images...\n")
start = time.time()
# Process with pool
with Pool(cpu_count()) as pool:
# Use imap_unordered for best performance
results = list(pool.imap_unordered(
process_image,
image_ids,
chunksize=5 # Process 5 images per worker at a time
))
elapsed = time.time() - start
# Statistics
total_size = sum(r['size_mb'] for r in results)
avg_size = total_size / len(results)
print(f"✓ Processed {len(results)} images in {elapsed:.2f}s")
print(f" Total size: {total_size:.1f} MB")
print(f" Average size: {avg_size:.2f} MB")
print(f" Throughput: {len(results)/elapsed:.1f} images/sec")
print("\n💡 This pattern works for:")
print(" • Image/video processing")
print(" • Data transformation")
print(" • File conversion")
print(" • API requests")
print(" • Report generation")
print("\n" + "=" * 70 + "\n")
# ============================================================================
# MAIN EXECUTION
# ============================================================================
def main():
"""Run all pool demonstrations."""
print("\n" + "=" * 70)
print(" " * 20 + "PROCESS POOLS")
print(" " * 15 + "multiprocessing.Pool Tutorial")
print("=" * 70 + "\n")
# Beginner level
basic_pool_usage()
pool_with_different_sizes()
pool_context_manager()
# Intermediate level
pool_starmap_multiple_arguments()
pool_imap_lazy_iteration()
pool_imap_unordered()
# Advanced level
pool_apply_async()
pool_map_async_with_progress()
pool_error_handling()
pool_chunksize_optimization()
real_world_example_image_processing()
print("\n" + "=" * 70)
print("Process Pools Tutorial Complete!")
print("=" * 70)
print("\n💡 Key Takeaways:")
print("1. Pool manages worker processes automatically")
print("2. map() is simplest - like built-in map")
print("3. starmap() for functions with multiple arguments")
print("4. imap() yields results incrementally")
print("5. imap_unordered() returns results faster")
print("6. apply_async() for fine-grained control")
print("7. Use chunksize to optimize performance")
print("8. Always use context manager for cleanup")
print("=" * 70 + "\n")
if __name__ == "__main__":
# IMPORTANT: This guard is required on Windows
main()
Runnable Example: prime_validation_pool.py¶
"""
Prime Number Validation: Serial vs Multiprocessing Pool
This tutorial demonstrates how to use multiprocessing for CPU-bound tasks.
WHAT IS CPU-BOUND WORK?
Tasks that spend most time doing computation (not waiting for I/O).
Examples: prime checking, calculations, data processing, compression.
WHY MULTIPROCESSING HELPS:
- Python's GIL (Global Interpreter Lock) prevents true parallelism with threads
- Multiprocessing creates separate processes (each has own GIL)
- On multi-core CPUs, processes can run truly in parallel
- Can achieve near-linear speedup with N cores
THE TASK:
Check if large numbers are prime. This is computationally expensive:
- Even with optimization, checking a large number takes millions of divisions
- No I/O involved, so it's pure CPU work
- Perfect for multiprocessing!
TECHNIQUES:
1. Serial: Check primes one at a time in main process
2. Pool: Use multiprocessing.Pool to distribute work across cores
Learning Goals:
- Understand when multiprocessing is appropriate
- Learn to use multiprocessing.Pool
- See real speedup from parallelization
- Understand the overhead involved
"""
import math
import time
from multiprocessing import Pool, cpu_count
if __name__ == "__main__":
print("=" * 70)
print("PRIME VALIDATION: SERIAL vs MULTIPROCESSING")
print("=" * 70)
# ============ EXAMPLE 1: Understanding Prime Checking ============
print("\n" + "=" * 70)
print("EXAMPLE 1: Understanding Prime Number Checking")
print("=" * 70)
print("""
A prime number is only divisible by 1 and itself.
NAIVE APPROACH: Check all numbers up to n
Too slow! For n = 10^18, this would take forever.
OPTIMIZED APPROACH (used here):
1. If n is even, it's not prime (except 2)
2. Only check odd divisors from 3 to sqrt(n)
3. If no divisor found, n is prime
WHY ONLY UP TO sqrt(n)?
If n = a * b where a <= b, then a <= sqrt(n).
So if n has a factor, we'll find one <= sqrt(n).
Example: Is 97 prime?
- sqrt(97) ≈ 9.8
- Check: 97 % 3, 97 % 5, 97 % 7, 97 % 9
- None divide evenly, so 97 is prime!
Example: Is 99 prime?
- sqrt(99) ≈ 9.9
- Check: 99 % 3 = 0, so 99 = 3 * 33, not prime!
Even with this optimization, checking large primes (15+ digits) takes
significant CPU time. This is why it's good for demonstrating multiprocessing.
""")
# ============ EXAMPLE 2: Prime Checking Function ============
print("\n" + "=" * 70)
print("EXAMPLE 2: Implement Prime Checking Function")
print("=" * 70)
def check_prime(n):
"""
Check if n is a prime number.
Algorithm:
1. Even numbers (except 2) are not prime
2. Check odd divisors from 3 to sqrt(n)
3. Use step of 2 to skip even numbers
This is CPU-bound work: pure computation, no I/O.
Time complexity: O(sqrt(n))
For n ~ 10^18, sqrt(n) ~ 10^9, so millions of checks.
"""
if n % 2 == 0:
return False
# Only need to check up to sqrt(n)
from_i = 3
to_i = math.sqrt(n) + 1
# Check odd numbers only (step by 2)
for i in range(from_i, int(to_i), 2):
if n % i == 0:
return False
return True
# Test with some examples
test_numbers = [
(2, True),
(97, True),
(100, False),
(10007, True),
]
print("\nTesting check_prime():")
for num, expected in test_numbers:
result = check_prime(num)
status = "PASS" if result == expected else "FAIL"
print(f" check_prime({num:6}) = {result:5} (expected {expected:5}) [{status}]")
# ============ EXAMPLE 3: Expensive Test Numbers ============
print("\n" + "=" * 70)
print("EXAMPLE 3: Test with Large Numbers")
print("=" * 70)
print("""
We'll test with large numbers that take significant time to check.
These are actual large primes and non-primes from cryptography:
- Large primes are expensive to verify
- Large composites are also expensive (must check many divisors before confirming)
Expected checking times (on modern CPU):
- 12-15 digit numbers: milliseconds
- 18 digit numbers: seconds
- 20+ digit numbers: tens of seconds
""")
# Test numbers - these are real cryptographic numbers
test_cases = [
("trivial non-prime", 112272535095295),
("15-digit composite", 100109100129100369),
("15-digit composite 2", 100109100129101027),
("18-digit prime", 100109100129100151),
("18-digit prime 2", 100109100129162907),
]
print(f"\nQuick validation on one number:")
num, label = test_cases[0]
print(f" Testing {label}: {num}")
start = time.time()
is_prime = check_prime(num)
elapsed = time.time() - start
print(f" Result: {is_prime} (checked in {elapsed:.4f}s)")
# ============ EXAMPLE 4: Serial Processing ============
print("\n" + "=" * 70)
print("EXAMPLE 4: Serial Processing (One at a time)")
print("=" * 70)
print("""
Process each number one at a time in the main process.
WHY IT'S SLOW:
- CPU can only do one thing at a time
- No parallelism at all
- Wastes cores on multi-core machines
Complexity:
- 4 numbers, each taking 1-2 seconds
- Total: ~4-8 seconds
- Only using 1 of your N cores
""")
print(f"\nProcessing {len(test_cases)} numbers serially:")
print(f"Current CPU count: {cpu_count()} cores\n")
results_serial = []
start_total = time.time()
for label, number in test_cases:
start = time.time()
is_prime = check_prime(number)
elapsed = time.time() - start
results_serial.append((label, number, is_prime, elapsed))
print(f" {label:20} ({number}): {is_prime:5} ({elapsed:.4f}s)")
elapsed_total_serial = time.time() - start_total
print(f"\nTotal serial time: {elapsed_total_serial:.4f}s")
print(f"Average per number: {elapsed_total_serial/len(test_cases):.4f}s")
# ============ EXAMPLE 5: Multiprocessing Pool ============
print("\n" + "=" * 70)
print("EXAMPLE 5: Multiprocessing Pool (Parallel)")
print("=" * 70)
print("""
Use multiprocessing.Pool to distribute work across multiple processes.
HOW IT WORKS:
1. Create a Pool with N worker processes (default: CPU count)
2. Submit jobs using pool.apply_async() or pool.map()
3. Each worker process runs jobs in parallel
4. Collect results
WHY IT'S FASTER:
- Each process runs on a separate CPU core
- True parallelism (not limited by GIL)
- Can check multiple numbers simultaneously
- Speedup approximately = min(num_jobs, num_cores)
OVERHEAD:
- Creating processes takes time (~0.1s each)
- Serializing data to pass to processes (pickling)
- For small jobs, overhead might exceed speedup!
- For large jobs, speedup dominates
In this case:
- Each job takes 1-2 seconds
- Process creation overhead (~0.5s total) is small
- Clear speedup expected
""")
print(f"\nProcessing {len(test_cases)} numbers with multiprocessing Pool:")
start_total = time.time()
# Create a Pool with default number of workers (CPU count)
with Pool() as pool:
# Use map() to apply check_prime to each number
# Returns results in same order as input
numbers = [num for label, num in test_cases]
results_list = pool.map(check_prime, numbers)
elapsed_total_parallel = time.time() - start_total
# Print results
for (label, number), is_prime in zip(test_cases, results_list):
print(f" {label:20} ({number}): {is_prime:5}")
print(f"\nTotal parallel time: {elapsed_total_parallel:.4f}s")
print(f"Average per number: {elapsed_total_parallel/len(test_cases):.4f}s")
# ============ EXAMPLE 6: Performance Comparison ============
print("\n" + "=" * 70)
print("EXAMPLE 6: Performance Comparison")
print("=" * 70)
speedup = elapsed_total_serial / elapsed_total_parallel
num_cores = cpu_count()
print(f"\nSpeedup Analysis:")
print(f" Serial time: {elapsed_total_serial:.4f}s")
print(f" Parallel time: {elapsed_total_parallel:.4f}s")
print(f" Speedup: {speedup:.2f}x")
print(f" CPU cores: {num_cores}")
print(f" Efficiency: {speedup/num_cores*100:.1f}% (speedup / cores)")
print(f"\nInterpretation:")
if speedup > 1:
print(f" Multiprocessing is {speedup:.1f}x faster!")
if speedup > num_cores * 0.8:
print(f" Good efficiency: Using cores well (>80%)")
else:
print(f" Okay efficiency: Using cores okay (overhead present)")
else:
print(f" Serial is faster (overhead > benefit)")
print(f" This can happen with small jobs or I/O-bound work")
print(f"\n{'*' * 70}")
print("WHEN MULTIPROCESSING HELPS")
print("{'*' * 70}")
print(f"""
GOOD FOR MULTIPROCESSING:
- CPU-bound tasks (pure computation)
- Long-running jobs (seconds or more)
- Many independent items to process
- Available CPU cores to distribute to
BAD FOR MULTIPROCESSING:
- I/O-bound tasks (use threading or async instead)
- Quick jobs (overhead > benefit)
- Need to share mutable state
- On single-core machines
FOR THIS TASK (prime checking):
- Job time: 1-2 seconds each (large numbers)
- CPU-bound: Pure math, no I/O
- Independent: Each number is independent
- Result: Multiprocessing is clearly beneficial
""")
# ============ EXAMPLE 7: Advanced Pool Usage ============
print("\n" + "=" * 70)
print("EXAMPLE 7: Advanced Pool Usage - apply_async()")
print("=" * 70)
print("""
Different ways to use Pool:
1. pool.map(func, iterable)
- Apply func to each item
- Blocks until all results ready
- Returns list of results
2. pool.apply_async(func, args)
- Submit single job
- Returns immediately with AsyncResult object
- Get result later with .get()
3. pool.imap(func, iterable)
- Like map, but returns iterator
- Results available as they complete
For this example, map() is fine. For more complex scenarios,
async methods let you submit jobs without waiting.
""")
print(f"\nDemonstration with apply_async():")
start_total = time.time()
with Pool() as pool:
# Submit all jobs without waiting
async_results = []
for label, number in test_cases:
async_result = pool.apply_async(check_prime, (number,))
async_results.append((label, number, async_result))
# Collect results as they complete
print("Jobs submitted, waiting for results...\n")
for label, number, async_result in async_results:
is_prime = async_result.get() # Blocks until this specific job completes
print(f" {label:20} ({number}): {is_prime:5}")
elapsed_async = time.time() - start_total
print(f"\nTotal time with async: {elapsed_async:.4f}s")
# ============ EXAMPLE 8: Overhead Analysis ============
print("\n" + "=" * 70)
print("EXAMPLE 8: Understanding Multiprocessing Overhead")
print("=" * 70)
print("""
Multiprocessing has overhead:
1. PROCESS CREATION: ~0.1-0.2s per process
- Creating processes is expensive
- Pool reuses processes to amortize this
2. PICKLING/SERIALIZATION: Time to send data to processes
- Data must be converted to bytes and sent via IPC
- Getting results back has same cost
- Small data: negligible
- Large data: can be significant
3. CONTEXT SWITCHING: OS switching between processes
- Not a huge factor on modern systems
WHEN OVERHEAD DOMINATES:
- Small jobs (microseconds)
- Frequent small submissions
- Large data to transfer
WHEN SPEEDUP DOMINATES:
- Long jobs (seconds+)
- Batch submissions
- Small data transfer
FOR THIS TASK:
- Process creation: ~0.5s total (5 processes, once at start)
- Per-job overhead: Negligible (just int, bool transfer)
- Job time: 1-2 seconds each
- Result: Overhead is <20% of time, speedup dominates!
""")
print("\n" + "=" * 70)
print("KEY TAKEAWAY")
print("=" * 70)
print(f"""
Multiprocessing provides {speedup:.1f}x speedup for this CPU-bound task.
USE MULTIPROCESSING WHEN:
1. Task is CPU-bound (computation, not I/O)
2. Jobs take significant time (seconds+)
3. You have multiple cores available
4. Data transfer overhead is small
USE THREADING FOR:
- I/O-bound tasks (network, disk, databases)
- Quick tasks with frequent context switches
USE ASYNCIO FOR:
- I/O-bound tasks with many concurrent operations
- Modern, efficient I/O multiplexing
FOR PEAK PERFORMANCE:
1. Profile to confirm it's worth optimizing
2. Start with simple Pool.map() approach
3. Use apply_async() for more control if needed
4. Consider process count (default: CPU count)
5. Measure actual speedup vs single-threaded baseline
Remember: Don't optimize prematurely! Use multiprocessing
only when profiling shows it's needed for CPU-bound work.
""")