Iterator Chaining¶
Iterator chaining combines multiple iterators or transformation functions into a single pipeline, where each stage lazily consumes the output of the previous one. The itertools module provides powerful tools for building complex iteration patterns with minimal memory overhead.
Basic Chaining¶
Composing Generators¶
def add_one(iterable):
    for value in iterable:
        yield value + 1

def double(iterable):
    for value in iterable:
        yield value * 2

numbers = range(1, 4)
result = double(add_one(numbers))
print(list(result))
Output:
[4, 6, 8]
Iterator Chaining Order¶
numbers = [1, 2, 3]
# Different order, different result
result1 = double(add_one(numbers))
result2 = add_one(double(numbers))
print(f"Add then double: {list(result1)}")
print(f"Double then add: {list(result2)}")
Output:
Add then double: [4, 6, 8]
Double then add: [3, 5, 7]
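Nesting the calls reads inside-out, which gets awkward as pipelines grow. A small helper (a hypothetical `pipeline` function sketched here with functools.reduce, not part of the standard library) expresses the same composition left to right:

```python
from functools import reduce

def add_one(iterable):
    for value in iterable:
        yield value + 1

def double(iterable):
    for value in iterable:
        yield value * 2

def pipeline(source, *stages):
    """Thread `source` through each stage in order (hypothetical helper)."""
    return reduce(lambda stream, stage: stage(stream), stages, source)

print(list(pipeline(range(1, 4), add_one, double)))  # [4, 6, 8]
print(list(pipeline(range(1, 4), double, add_one)))  # [3, 5, 7]
```

Stages are listed in the order data flows through them, so reordering a pipeline is just reordering arguments.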
itertools Chains¶
Chaining Iterables¶
from itertools import chain
list1 = [1, 2, 3]
list2 = [4, 5, 6]
list3 = [7, 8, 9]
combined = chain(list1, list2, list3)
print(list(combined))
Output:
[1, 2, 3, 4, 5, 6, 7, 8, 9]
Chain from Iterable¶
from itertools import chain
nested = [[1, 2], [3, 4], [5, 6]]
flattened = chain.from_iterable(nested)
print(list(flattened))
Output:
[1, 2, 3, 4, 5, 6]
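One caveat worth knowing: chain.from_iterable flattens exactly one level, and because strings are themselves iterable, a sequence of strings flattens into individual characters:

```python
from itertools import chain

# Only one level is flattened; inner lists pass through unchanged.
nested = [[1, [2, 3]], [4]]
print(list(chain.from_iterable(nested)))  # [1, [2, 3], 4]

# Strings are iterables of characters, so they get split up.
words = ["ab", "cd"]
print(list(chain.from_iterable(words)))   # ['a', 'b', 'c', 'd']
```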
Complex Pipelines¶
Multi-Step Processing¶
# filter() and map() are built-ins; no import is needed
numbers = range(1, 11)
result = filter(lambda x: x % 2 == 0, map(lambda x: x ** 2, numbers))
print(list(result))
Output:
[4, 16, 36, 64, 100]
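The same pipeline can be written as a single generator expression, which many readers find clearer. Since squaring preserves parity, filtering even x before squaring is equivalent to filtering even squares afterwards:

```python
numbers = range(1, 11)
# x ** 2 is even exactly when x is even, so the filter can move before the map.
result = (x ** 2 for x in numbers if x % 2 == 0)
print(list(result))  # [4, 16, 36, 64, 100]
```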
Real-World Example¶
def read_lines():
    data = ["hello world", "foo bar", "test data"]
    for line in data:
        yield line

def split_words(lines):
    for line in lines:
        yield from line.split()

def uppercase(words):
    for word in words:
        yield word.upper()

pipeline = uppercase(split_words(read_lines()))
print(list(pipeline))
Output:
['HELLO', 'WORLD', 'FOO', 'BAR', 'TEST', 'DATA']
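Because every stage is lazy, the same pipeline style works on unbounded sources; nothing runs until values are requested. A sketch using itertools.count as an infinite source, capped with islice:

```python
from itertools import count, islice

def evens(numbers):
    for n in numbers:
        if n % 2 == 0:
            yield n

def squares(numbers):
    for n in numbers:
        yield n * n

# count(1) never terminates; islice pulls only the first five results
# through the pipeline, so only ten input numbers are ever examined.
first_five = islice(squares(evens(count(1))), 5)
print(list(first_five))  # [4, 16, 36, 64, 100]
```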
Runnable Example: practical_applications.py¶
"""
PYTHON GENERATORS & ITERATORS - PRACTICAL APPLICATIONS
======================================================
Topic: Real-World Generator Applications
----------------------------------------
This module covers:
1. File processing and streaming data
2. Working with large datasets
3. Memory-efficient data pipelines
4. Custom iteration protocols
5. Integration with standard library
6. Performance optimization techniques
Learning Objectives:
- Apply generators to real-world problems
- Build efficient data processing pipelines
- Handle large files and datasets
- Integrate generators with Python stdlib
- Optimize memory usage and performance
- Implement custom iteration patterns
Prerequisites:
- Completion of beginner, intermediate, and advanced levels
- Strong understanding of all generator concepts
- Familiarity with Python standard library
- Understanding of file I/O and data processing
"""
import csv
import json
import itertools
from collections import deque
import time
import sys
# ============================================================================
# SECTION 1: FILE PROCESSING
# ============================================================================
if __name__ == "__main__":
    print("=" * 70)
    print("SECTION 1: FILE PROCESSING")
    print("=" * 70)

    """
    Generators are ideal for file processing:
    - Process files larger than available memory
    - Stream data without loading entire file
    - Memory efficient line-by-line processing
    - Easy to chain processing operations
    """

    # Example 1.1: Reading large files
    print("\n--- Example 1.1: Memory-Efficient File Reading ---")

    def read_large_file(filepath, chunk_size=8192):
        """
        Read a large file in chunks.

        Args:
            filepath: Path to file
            chunk_size: Size of each chunk in bytes

        Yields:
            Chunks of file content
        """
        with open(filepath, 'r') as file:
            while True:
                chunk = file.read(chunk_size)
                if not chunk:
                    break
                yield chunk

    def read_lines(filepath):
        """
        Read file line by line.
        More memory efficient than readlines() for large files.
        """
        with open(filepath, 'r') as file:
            for line in file:
                yield line.strip()

    def read_lines_filtered(filepath, filter_func):
        """
        Read file with filtering.

        Args:
            filepath: Path to file
            filter_func: Function to filter lines (returns bool)

        Yields:
            Filtered lines
        """
        for line in read_lines(filepath):
            if filter_func(line):
                yield line

    print("File reading generators defined (see code)")
    # Example 1.2: Processing CSV files
    print("\n--- Example 1.2: CSV Processing ---")

    def csv_reader(filepath, skip_header=True):
        """
        Generator for reading CSV files row by row.
        Memory efficient for large CSV files.
        """
        with open(filepath, 'r') as file:
            reader = csv.reader(file)
            if skip_header:
                next(reader, None)
            for row in reader:
                yield row

    def csv_dict_reader(filepath):
        """
        Read CSV as dictionaries with column names as keys.
        """
        with open(filepath, 'r') as file:
            reader = csv.DictReader(file)
            for row in reader:
                yield dict(row)

    def filtered_csv(filepath, column_index, filter_value):
        """
        Filter CSV rows based on column value.
        Demonstrates combining file reading with filtering.
        """
        for row in csv_reader(filepath):
            if len(row) > column_index and row[column_index] == filter_value:
                yield row

    print("CSV processing generators defined")
    # Example 1.3: JSON streaming
    print("\n--- Example 1.3: JSON Line Processing ---")

    def json_lines_reader(filepath):
        """
        Read a JSONL (JSON Lines) file.
        Each line is a separate JSON object.
        Memory efficient for large JSON datasets.
        """
        with open(filepath, 'r') as file:
            for line in file:
                try:
                    yield json.loads(line.strip())
                except json.JSONDecodeError:
                    continue

    def json_array_items(filepath):
        """
        Stream items from a JSON array file.
        Note: This simplified version loads the whole file.
        For true streaming of large JSON arrays, use the ijson library.
        """
        with open(filepath, 'r') as file:
            data = json.load(file)
            if isinstance(data, list):
                for item in data:
                    yield item

    print("JSON streaming generators defined")
    # ========================================================================
    # SECTION 2: DATA PIPELINES
    # ========================================================================
    print("\n" + "=" * 70)
    print("SECTION 2: DATA PROCESSING PIPELINES")
    print("=" * 70)

    """
    Build complex data processing pipelines using generator composition.
    Each stage processes data and passes it to the next stage.
    """

    # Example 2.1: ETL Pipeline (Extract, Transform, Load)
    print("\n--- Example 2.1: ETL Pipeline ---")

    def extract_data(source):
        """
        Extract stage: Read data from source.
        """
        for item in source:
            yield item

    def transform_data(data_stream, transform_func):
        """
        Transform stage: Apply transformation to each item.
        """
        for item in data_stream:
            try:
                yield transform_func(item)
            except Exception:
                # Skip items that fail transformation
                continue

    def filter_data(data_stream, filter_func):
        """
        Filter stage: Keep only items matching condition.
        """
        for item in data_stream:
            if filter_func(item):
                yield item

    def load_data(data_stream, batch_size=100):
        """
        Load stage: Batch data for efficient loading.
        """
        batch = []
        for item in data_stream:
            batch.append(item)
            if len(batch) >= batch_size:
                yield batch
                batch = []
        if batch:
            yield batch

    # Example usage
    print("ETL Pipeline example:")
    # Simulate a data source
    source_data = range(1, 21)
    # Build the pipeline
    extracted = extract_data(source_data)
    transformed = transform_data(extracted, lambda x: x * 2)
    filtered = filter_data(transformed, lambda x: x > 10)
    loaded = load_data(filtered, batch_size=5)
    # Process the data
    for batch_num, batch in enumerate(loaded, 1):
        print(f"Batch {batch_num}: {batch}")
    # Example 2.2: Chained transformations
    print("\n--- Example 2.2: Chained Transformations ---")

    def map_values(data_stream, func):
        """Apply function to each item."""
        for item in data_stream:
            yield func(item)

    def filter_values(data_stream, predicate):
        """Keep items matching predicate."""
        for item in data_stream:
            if predicate(item):
                yield item

    def take_n(data_stream, n):
        """Take the first n items."""
        for i, item in enumerate(data_stream):
            if i >= n:
                break
            yield item

    def skip_n(data_stream, n):
        """Skip the first n items."""
        for i, item in enumerate(data_stream):
            if i >= n:
                yield item

    # Chain multiple operations
    print("Chained operations:")
    data = range(1, 51)
    result = take_n(
        map_values(
            filter_values(data, lambda x: x % 2 == 0),
            lambda x: x ** 2
        ),
        5
    )
    print(f"Result: {list(result)}")
    # Example 2.3: Parallel pipelines
    print("\n--- Example 2.3: Parallel Processing Streams ---")

    def split_stream(data_stream, condition):
        """
        Split a data stream into two based on a condition.
        Note: this buffers the entire stream in memory before returning.

        Returns:
            Two iterators: (true_stream, false_stream)
        """
        true_items = []
        false_items = []
        for item in data_stream:
            if condition(item):
                true_items.append(item)
            else:
                false_items.append(item)
        return iter(true_items), iter(false_items)

    def merge_streams(*streams):
        """
        Merge multiple streams into one.
        """
        for stream in streams:
            yield from stream

    print("Stream splitting and merging example:")
    data = range(1, 11)
    evens, odds = split_stream(data, lambda x: x % 2 == 0)
    print(f"Evens: {list(evens)}")
    print(f"Odds: {list(odds)}")
    # ========================================================================
    # SECTION 3: WORKING WITH LARGE DATASETS
    # ========================================================================
    print("\n" + "=" * 70)
    print("SECTION 3: LARGE DATASET HANDLING")
    print("=" * 70)

    """
    Techniques for efficiently processing datasets that don't fit in memory.
    """

    # Example 3.1: Sliding window analysis
    print("\n--- Example 3.1: Sliding Window ---")

    def sliding_window(iterable, window_size):
        """
        Create a sliding window view of the data.
        Useful for time-series analysis and moving averages.
        """
        iterator = iter(iterable)
        window = deque(maxlen=window_size)
        # Fill the initial window
        for _ in range(window_size):
            try:
                window.append(next(iterator))
            except StopIteration:
                return
        yield tuple(window)
        # Slide the window
        for item in iterator:
            window.append(item)
            yield tuple(window)

    def moving_average(data_stream, window_size):
        """
        Calculate a moving average.
        Memory efficient - only keeps window_size items in memory.
        """
        for window in sliding_window(data_stream, window_size):
            yield sum(window) / len(window)

    # Example usage
    print("Moving average example:")
    data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    print(f"Data: {data}")
    print(f"3-period moving average: {list(moving_average(data, 3))}")
    # Example 3.2: Chunking large datasets
    print("\n--- Example 3.2: Processing in Chunks ---")

    def chunk_data(iterable, chunk_size):
        """
        Split data into chunks of the specified size.
        Useful for batch processing and API calls with rate limits.
        """
        chunk = []
        for item in iterable:
            chunk.append(item)
            if len(chunk) >= chunk_size:
                yield chunk
                chunk = []
        if chunk:
            yield chunk

    def process_chunks(data_source, chunk_size, process_func):
        """
        Process data in chunks.

        Args:
            data_source: Iterable data source
            chunk_size: Size of each chunk
            process_func: Function to apply to each chunk
        """
        for chunk in chunk_data(data_source, chunk_size):
            yield process_func(chunk)

    # Example usage
    print("Chunk processing example:")
    data = range(1, 26)
    chunks = chunk_data(data, 5)
    print("Chunks of 5:")
    for chunk_num, chunk in enumerate(chunks, 1):
        print(f" Chunk {chunk_num}: {chunk}")
    # Example 3.3: Sampling large datasets
    print("\n--- Example 3.3: Data Sampling ---")

    def reservoir_sample(data_stream, k):
        """
        Reservoir sampling: get k random samples from a stream.

        Useful when:
        - You don't know the total size in advance
        - The stream is too large to fit in memory
        - You need uniform random sampling

        The algorithm guarantees each item has an equal probability
        of selection.
        """
        import random
        reservoir = []
        for i, item in enumerate(data_stream):
            if i < k:
                reservoir.append(item)
            else:
                # Random replacement
                j = random.randint(0, i)
                if j < k:
                    reservoir[j] = item
        return reservoir

    def every_nth(data_stream, n):
        """
        Sample every nth item from a stream.
        Simple deterministic sampling.
        """
        for i, item in enumerate(data_stream):
            if i % n == 0:
                yield item

    print("Sampling examples:")
    data = range(1, 101)
    print(f"Every 10th item: {list(every_nth(data, 10))}")

    # Reservoir sampling
    import random
    random.seed(42)
    sample = reservoir_sample(range(1, 101), 5)
    print(f"5 random samples: {sample}")
    # ========================================================================
    # SECTION 4: INTEGRATION WITH STANDARD LIBRARY
    # ========================================================================
    print("\n" + "=" * 70)
    print("SECTION 4: ITERTOOLS INTEGRATION")
    print("=" * 70)

    """
    Combine generators with itertools for powerful data processing.
    """

    # Example 4.1: Common itertools patterns
    print("\n--- Example 4.1: Itertools Patterns ---")

    def demonstrate_itertools():
        """
        Show common itertools usage with generators.
        """
        data = range(1, 11)

        # islice - take a slice without creating a list
        print("First 5 items:")
        print(list(itertools.islice(data, 5)))

        # chain - concatenate iterables
        gen1 = (x for x in range(1, 4))
        gen2 = (x for x in range(4, 7))
        print("\nChained generators:")
        print(list(itertools.chain(gen1, gen2)))

        # groupby - group consecutive items
        data = [1, 1, 2, 2, 2, 3, 3, 1, 1]
        print("\nGrouped by value:")
        for key, group in itertools.groupby(data):
            print(f" {key}: {list(group)}")

        # cycle - repeat indefinitely
        print("\nFirst 10 from cycled [1, 2, 3]:")
        cycled = itertools.cycle([1, 2, 3])
        print(list(itertools.islice(cycled, 10)))

        # accumulate - running totals
        print("\nAccumulated sum of [1, 2, 3, 4, 5]:")
        print(list(itertools.accumulate([1, 2, 3, 4, 5])))

    demonstrate_itertools()
    # Example 4.2: Combining multiple iterables
    print("\n--- Example 4.2: Combining Iterables ---")

    def demonstrate_combinations():
        """Show various ways to combine iterables."""
        # zip_longest - zip with padding
        a = [1, 2, 3]
        b = ['a', 'b']
        print("zip_longest with padding:")
        result = itertools.zip_longest(a, b, fillvalue='X')
        print(list(result))

        # product - cartesian product
        print("\nCartesian product of [1,2] and ['a','b']:")
        result = itertools.product([1, 2], ['a', 'b'])
        print(list(result))

        # combinations and permutations
        print("\nCombinations of [1,2,3] taken 2 at a time:")
        print(list(itertools.combinations([1, 2, 3], 2)))
        print("\nPermutations of [1,2,3] taken 2 at a time:")
        print(list(itertools.permutations([1, 2, 3], 2)))

    demonstrate_combinations()
    # Example 4.3: Custom generator with itertools
    print("\n--- Example 4.3: Advanced Pipeline ---")

    def unique_items(data_stream):
        """
        Yield unique items from a stream (maintains order).
        Memory efficient for streams with many duplicates.
        """
        seen = set()
        for item in data_stream:
            if item not in seen:
                seen.add(item)
                yield item

    def pairwise(iterable):
        """
        Generate pairs of consecutive items.
        s -> (s0, s1), (s1, s2), (s2, s3), ...
        (Python 3.10+ ships this as itertools.pairwise.)
        """
        a, b = itertools.tee(iterable)
        next(b, None)
        return zip(a, b)

    # Example usage
    print("Unique items from [1, 2, 2, 3, 1, 4, 3, 5]:")
    data = [1, 2, 2, 3, 1, 4, 3, 5]
    print(list(unique_items(data)))
    print("\nPairs from [1, 2, 3, 4, 5]:")
    print(list(pairwise([1, 2, 3, 4, 5])))
    # ========================================================================
    # SECTION 5: PERFORMANCE OPTIMIZATION
    # ========================================================================
    print("\n" + "=" * 70)
    print("SECTION 5: PERFORMANCE OPTIMIZATION")
    print("=" * 70)

    """
    Techniques for optimizing generator performance.
    """

    # Example 5.1: Generator vs list performance
    print("\n--- Example 5.1: Performance Comparison ---")

    def measure_performance():
        """
        Compare the performance of different approaches.
        """
        n = 100000

        # Method 1: List comprehension
        start = time.perf_counter()
        result = sum([x ** 2 for x in range(n)])
        time_list = time.perf_counter() - start

        # Method 2: Generator expression
        start = time.perf_counter()
        result = sum(x ** 2 for x in range(n))
        time_gen = time.perf_counter() - start

        # Method 3: Generator function
        def squares(n):
            for i in range(n):
                yield i ** 2

        start = time.perf_counter()
        result = sum(squares(n))
        time_gen_func = time.perf_counter() - start

        print(f"List comprehension: {time_list:.4f}s")
        print(f"Generator expression: {time_gen:.4f}s")
        print(f"Generator function: {time_gen_func:.4f}s")

    measure_performance()
    # Example 5.2: Memory efficiency
    print("\n--- Example 5.2: Memory Efficiency ---")

    def memory_comparison():
        """
        Compare memory usage.
        """
        n = 1000000

        # List - the container alone holds n object pointers
        # (sys.getsizeof does not count the int objects themselves)
        list_obj = list(range(n))
        list_size = sys.getsizeof(list_obj)

        # Generator - stores only its state
        gen_obj = (x for x in range(n))
        gen_size = sys.getsizeof(gen_obj)

        print(f"List memory: {list_size:,} bytes")
        print(f"Generator memory: {gen_size:,} bytes")
        print(f"Savings: {list_size / gen_size:.0f}x")

    memory_comparison()
    # Example 5.3: Lazy evaluation benefits
    print("\n--- Example 5.3: Lazy Evaluation Benefits ---")

    def expensive_computation(x):
        """Simulate an expensive operation."""
        return x ** 2

    def eager_approach(data):
        """Process all data immediately."""
        return [expensive_computation(x) for x in data]

    def lazy_approach(data):
        """Process data on demand."""
        return (expensive_computation(x) for x in data)

    # Compare when we only need the first few items
    print("When processing only the first 3 items:")
    data = range(10000)

    start = time.perf_counter()
    eager = eager_approach(data)
    result = eager[:3]
    time_eager = time.perf_counter() - start

    start = time.perf_counter()
    lazy = lazy_approach(data)
    result = list(itertools.islice(lazy, 3))
    time_lazy = time.perf_counter() - start

    print(f"Eager: {time_eager:.4f}s (processed all items)")
    print(f"Lazy: {time_lazy:.4f}s (processed only 3 items)")
    # ========================================================================
    # SECTION 6: REAL-WORLD EXAMPLES
    # ========================================================================
    print("\n" + "=" * 70)
    print("SECTION 6: REAL-WORLD EXAMPLES")
    print("=" * 70)

    # Example 6.1: Log file analyzer
    print("\n--- Example 6.1: Log File Analyzer ---")

    def parse_log_line(line):
        """Parse a log line into structured data."""
        # Simplified parser
        parts = line.split(' - ')
        if len(parts) >= 3:
            return {
                'timestamp': parts[0],
                'level': parts[1],
                'message': parts[2]
            }
        return None

    def filter_log_level(log_stream, level):
        """Filter logs by level."""
        for entry in log_stream:
            if entry and entry.get('level') == level:
                yield entry

    def log_analyzer(filepath):
        """
        Analyze a log file efficiently.
        Demonstrates a real-world file processing pipeline.
        """
        # Read the file line by line
        with open(filepath, 'r') as file:
            # Parse each line
            parsed = (parse_log_line(line.strip()) for line in file)
            # Filter out None values
            valid = (entry for entry in parsed if entry is not None)
            # The stream can now be processed further
            yield from valid

    print("Log analyzer pattern defined (see code)")
    # Example 6.2: Data aggregation
    print("\n--- Example 6.2: Data Aggregation ---")

    def aggregate_by_key(data_stream, key_func):
        """
        Aggregate data by key.
        Note: itertools.groupby groups only *consecutive* items,
        so the stream must already be sorted by the grouping key.

        Args:
            data_stream: Stream of dictionaries
            key_func: Function to extract the grouping key

        Yields:
            (key, group) tuples
        """
        for key, group in itertools.groupby(data_stream, key_func):
            yield key, list(group)

    def sum_by_key(data_stream, key_func, value_func):
        """
        Sum values grouped by key.
        """
        for key, group in aggregate_by_key(data_stream, key_func):
            total = sum(value_func(item) for item in group)
            yield key, total

    # Example usage (the data is already sorted by date)
    print("Aggregation example:")
    sales_data = [
        {'date': '2024-01-01', 'amount': 100},
        {'date': '2024-01-01', 'amount': 150},
        {'date': '2024-01-02', 'amount': 200},
        {'date': '2024-01-02', 'amount': 250},
    ]
    totals = sum_by_key(
        sales_data,
        key_func=lambda x: x['date'],
        value_func=lambda x: x['amount']
    )
    for date, total in totals:
        print(f" {date}: ${total}")
    # Example 6.3: Event stream processing
    print("\n--- Example 6.3: Event Stream Processor ---")

    def event_stream_processor(events):
        """
        Process a stream of events.
        Demonstrates a real-time event processing pattern.
        """
        window = deque(maxlen=100)  # Keep the last 100 events
        for event in events:
            window.append(event)
            # Check for patterns
            if len(window) >= 3:
                last_three = list(window)[-3:]
                # Check for a pattern (simplified)
                if all(e.get('type') == 'error' for e in last_three):
                    yield {
                        'alert': 'Multiple consecutive errors',
                        'events': last_three
                    }

    print("Event stream processor defined (see code)")
    # ========================================================================
    # SUMMARY AND BEST PRACTICES
    # ========================================================================
    print("\n" + "=" * 70)
    print("SUMMARY: PRACTICAL APPLICATIONS")
    print("=" * 70)
    print("""
KEY APPLICATIONS:
1. FILE PROCESSING:
- Line-by-line reading for large files
- CSV/JSON streaming
- Log file analysis
- Memory-efficient parsing
2. DATA PIPELINES:
- ETL workflows
- Chained transformations
- Filtering and mapping
- Batch processing
3. LARGE DATASETS:
- Sliding windows
- Chunking
- Sampling
- Streaming aggregation
4. ITERTOOLS INTEGRATION:
- islice, chain, groupby
- zip_longest, product
- combinations, permutations
- accumulate
5. PERFORMANCE:
- Lazy evaluation
- Memory efficiency
- Process on-demand
- Avoid unnecessary computation
BEST PRACTICES:
1. Use generators for:
- Large files
- Streaming data
- Memory constraints
- One-pass processing
2. Combine with itertools:
- Built-in optimizations
- Tested implementations
- Composable operations
3. Build pipelines:
- Small, focused generators
- Easy to test
- Reusable components
- Clear data flow
4. Handle resources:
- Use context managers
- Close generators properly
- Clean up in finally blocks
5. Optimize performance:
- Profile first
- Use generator expressions
- Avoid premature conversion to lists
- Process in chunks
COMMON PATTERNS:
1. File Processing:
with open(file) as f:
for line in f:
yield process(line)
2. Pipeline:
stage3(stage2(stage1(source)))
3. Filter-Map:
(transform(x) for x in data if condition(x))
4. Batching:
for chunk in chunks(data, size):
yield process_batch(chunk)
5. Aggregation:
for key, group in groupby(data, key_func):
yield key, summarize(group)
REMEMBER:
- Generators excel at streaming data
- Build pipelines for complex processing
- Use lazy evaluation for efficiency
- Integrate with itertools
- Profile before optimizing
- Keep generators focused and simple
""")
    print("\n" + "=" * 70)
    print("END OF PRACTICAL APPLICATIONS")
    print("Now apply these patterns to the exercises!")
    print("=" * 70)