Iterator Chaining

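Iterator chaining combines multiple iterators or transformation functions into a pipeline. Each stage pulls one item at a time from the stage before it, so the pipeline holds only the item in flight rather than the whole dataset. The itertools module supplies ready-made building blocks, such as chain, for composing these patterns with minimal memory overhead.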


Basic Chaining

Composing Generators

def add_one(iterable):
    for value in iterable:
        yield value + 1

def double(iterable):
    for value in iterable:
        yield value * 2

numbers = range(1, 4)
result = double(add_one(numbers))
print(list(result))

Output:

[4, 6, 8]
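
Nested calls read inside-out, which gets harder to follow as stages accumulate. One possible way to list stages in the order they run is a small helper built on functools.reduce. The name `pipeline` below is hypothetical and not part of the standard library; treat this as a sketch rather than an established idiom.

```python
from functools import reduce

def add_one(iterable):
    for value in iterable:
        yield value + 1

def double(iterable):
    for value in iterable:
        yield value * 2

def pipeline(source, *stages):
    """Feed source through each stage in order (hypothetical helper)."""
    # reduce wraps the stream one stage at a time: double(add_one(source))
    return reduce(lambda stream, stage: stage(stream), stages, source)

result = list(pipeline(range(1, 4), add_one, double))
print(result)  # [4, 6, 8]
```

Nothing is computed until list() starts pulling items, so the helper keeps the lazy behaviour of the nested form.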

Iterator Chaining Order

numbers = [1, 2, 3]

# Different order, different result
result1 = double(add_one(numbers))
result2 = add_one(double(numbers))

print(f"Add then double: {list(result1)}")
print(f"Double then add: {list(result2)}")

Output:

Add then double: [4, 6, 8]
Double then add: [3, 5, 7]

itertools Chains

Chaining Iterables

from itertools import chain

list1 = [1, 2, 3]
list2 = [4, 5, 6]
list3 = [7, 8, 9]

combined = chain(list1, list2, list3)
print(list(combined))

Output:

[1, 2, 3, 4, 5, 6, 7, 8, 9]
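
chain is itself lazy: it moves on to the next iterable only when the current one is exhausted. The sketch below illustrates this by chaining a list with an infinite counter and taking a bounded slice. The variable names are illustrative only.

```python
from itertools import chain, count, islice

header = ["id"]
ids = count(1)  # infinite iterator: 1, 2, 3, ...

# chain never tries to exhaust `ids`; islice stops after four items
combined = chain(header, ids)
first_four = list(islice(combined, 4))
print(first_four)  # ['id', 1, 2, 3]
```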

Chain from Iterable

from itertools import chain

nested = [[1, 2], [3, 4], [5, 6]]
flattened = chain.from_iterable(nested)
print(list(flattened))

Output:

[1, 2, 3, 4, 5, 6]
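
Two details of chain.from_iterable are easy to miss: it accepts a lazy outer iterable, and it flattens exactly one level. The sketch below uses a hypothetical generator `rows` to show both. Note that a string is treated as an iterable of characters.

```python
from itertools import chain

def rows():
    """Hypothetical lazy source of inner iterables."""
    yield [1, [2, 3]]   # the inner list [2, 3] is NOT flattened further
    yield (4,)
    yield "ab"          # strings are iterated character by character

flattened = list(chain.from_iterable(rows()))
print(flattened)  # [1, [2, 3], 4, 'a', 'b']
```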

Complex Pipelines

Multi-Step Processing

# filter() and map() are built-ins in Python 3; itertools does not export them, so no import is needed

numbers = range(1, 11)
result = filter(lambda x: x % 2 == 0, map(lambda x: x ** 2, numbers))
print(list(result))

Output:

[4, 16, 36, 64, 100]
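
The same two-step pipeline can also be written with generator expressions, which some readers find easier to scan than nested lambda calls. This is an equivalent sketch, not a replacement for the filter/map form. The intermediate names `squares` and `even_squares` are illustrative.

```python
numbers = range(1, 11)

# Stage 1: square each number (lazy)
squares = (x ** 2 for x in numbers)

# Stage 2: keep only the even squares (lazy)
even_squares = (sq for sq in squares if sq % 2 == 0)

result = list(even_squares)
print(result)  # [4, 16, 36, 64, 100]
```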

Real-World Example

import itertools

def read_lines():
    data = ["hello world", "foo bar", "test data"]
    for line in data:
        yield line

def split_words(lines):
    for line in lines:
        yield from line.split()

def uppercase(words):
    for word in words:
        yield word.upper()

pipeline = uppercase(split_words(read_lines()))
print(list(pipeline))

Output:

['HELLO', 'WORLD', 'FOO', 'BAR', 'TEST', 'DATA']
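
To see that the pipeline pulls data only on demand, the sketch below records which lines the source actually produces when just the first three words are requested. The `lines_read` list is added purely for tracing and is not part of the original example.

```python
from itertools import islice

lines_read = []  # tracing aid: which lines the source has produced so far

def read_lines():
    data = ["hello world", "foo bar", "test data"]
    for line in data:
        lines_read.append(line)
        yield line

def split_words(lines):
    for line in lines:
        yield from line.split()

def uppercase(words):
    for word in words:
        yield word.upper()

pipeline = uppercase(split_words(read_lines()))
first_three = list(islice(pipeline, 3))

print(first_three)  # ['HELLO', 'WORLD', 'FOO']
print(lines_read)   # ['hello world', 'foo bar']
```

The third line, "test data", is never read because no stage downstream asked for a fourth word.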


Runnable Example: practical_applications.py

"""
PYTHON GENERATORS & ITERATORS - PRACTICAL APPLICATIONS
======================================================

Topic: Real-World Generator Applications
----------------------------------------

This module covers:
1. File processing and streaming data
2. Working with large datasets
3. Memory-efficient data pipelines
4. Custom iteration protocols
5. Integration with standard library
6. Performance optimization techniques

Learning Objectives:
- Apply generators to real-world problems
- Build efficient data processing pipelines
- Handle large files and datasets
- Integrate generators with Python stdlib
- Optimize memory usage and performance
- Implement custom iteration patterns

Prerequisites:
- Completion of beginner, intermediate, and advanced levels
- Strong understanding of all generator concepts
- Familiarity with Python standard library
- Understanding of file I/O and data processing
"""

import os
import csv
import json
import itertools
import functools
from collections import deque
import time
import sys

# ============================================================================
# SECTION 1: FILE PROCESSING
# ============================================================================

if __name__ == "__main__":

    print("=" * 70)
    print("SECTION 1: FILE PROCESSING")
    print("=" * 70)

    """
    Generators are ideal for file processing:
    - Process files larger than available memory
    - Stream data without loading entire file
    - Memory efficient line-by-line processing
    - Easy to chain processing operations
    """

    # Example 1.1: Reading large files
    print("\n--- Example 1.1: Memory-Efficient File Reading ---")


    def read_large_file(filepath, chunk_size=8192):
        """
        Read a large file in chunks.

        Args:
            filepath: Path to file
            chunk_size: Size of each chunk in bytes

        Yields:
            Chunks of file content
        """
        with open(filepath, 'r') as file:
            while True:
                chunk = file.read(chunk_size)
                if not chunk:
                    break
                yield chunk


    def read_lines(filepath):
        """
        Read file line by line.

        More memory efficient than readlines() for large files.
        """
        with open(filepath, 'r') as file:
            for line in file:
                yield line.strip()


    def read_lines_filtered(filepath, filter_func):
        """
        Read file with filtering.

        Args:
            filepath: Path to file
            filter_func: Function to filter lines (returns bool)

        Yields:
            Filtered lines
        """
        for line in read_lines(filepath):
            if filter_func(line):
                yield line


    print("File reading generators defined (see code)")


    # Example 1.2: Processing CSV files
    print("\n--- Example 1.2: CSV Processing ---")


    def csv_reader(filepath, skip_header=True):
        """
        Generator for reading CSV files row by row.

        Memory efficient for large CSV files.
        """
        # newline='' lets the csv module handle line endings itself, as the csv docs recommend
        with open(filepath, 'r', newline='') as file:
            reader = csv.reader(file)

            if skip_header:
                next(reader, None)

            for row in reader:
                yield row


    def csv_dict_reader(filepath):
        """
        Read CSV as dictionaries with column names as keys.
        """
        # newline='' lets the csv module handle line endings itself, as the csv docs recommend
        with open(filepath, 'r', newline='') as file:
            reader = csv.DictReader(file)
            for row in reader:
                yield dict(row)


    def filtered_csv(filepath, column_index, filter_value):
        """
        Filter CSV rows based on column value.

        Demonstrates combining file reading with filtering.
        """
        for row in csv_reader(filepath):
            if len(row) > column_index and row[column_index] == filter_value:
                yield row


    print("CSV processing generators defined")


    # Example 1.3: JSON streaming
    print("\n--- Example 1.3: JSON Line Processing ---")


    def json_lines_reader(filepath):
        """
        Read JSONL (JSON Lines) file.

        Each line is a separate JSON object.
        Memory efficient for large JSON datasets.
        """
        with open(filepath, 'r') as file:
            for line in file:
                try:
                    yield json.loads(line.strip())
                except json.JSONDecodeError:
                    continue


    def json_array_items(filepath):
        """
        Stream items from a JSON array file.

        Note: This simplified version loads the whole file.
        For true streaming of large JSON arrays, use ijson library.
        """
        with open(filepath, 'r') as file:
            data = json.load(file)
            if isinstance(data, list):
                for item in data:
                    yield item


    print("JSON streaming generators defined")


    # ============================================================================
    # SECTION 2: DATA PIPELINES
    # ============================================================================

    print("\n" + "=" * 70)
    print("SECTION 2: DATA PROCESSING PIPELINES")
    print("=" * 70)

    """
    Build complex data processing pipelines using generator composition.
    Each stage processes data and passes to the next stage.
    """

    # Example 2.1: ETL Pipeline (Extract, Transform, Load)
    print("\n--- Example 2.1: ETL Pipeline ---")


    def extract_data(source):
        """
        Extract stage: Read data from source.
        """
        for item in source:
            yield item


    def transform_data(data_stream, transform_func):
        """
        Transform stage: Apply transformation to each item.
        """
        for item in data_stream:
            try:
                yield transform_func(item)
            except Exception:
                # Skip items that fail transformation
                continue


    def filter_data(data_stream, filter_func):
        """
        Filter stage: Keep only items matching condition.
        """
        for item in data_stream:
            if filter_func(item):
                yield item


    def load_data(data_stream, batch_size=100):
        """
        Load stage: Batch data for efficient loading.
        """
        batch = []
        for item in data_stream:
            batch.append(item)
            if len(batch) >= batch_size:
                yield batch
                batch = []

        if batch:
            yield batch


    # Example usage
    print("ETL Pipeline example:")

    # Simulate data source
    source_data = range(1, 21)

    # Build pipeline
    extracted = extract_data(source_data)
    transformed = transform_data(extracted, lambda x: x * 2)
    filtered = filter_data(transformed, lambda x: x > 10)
    loaded = load_data(filtered, batch_size=5)

    # Process data
    for batch_num, batch in enumerate(loaded, 1):
        print(f"Batch {batch_num}: {batch}")


    # Example 2.2: Chained transformations
    print("\n--- Example 2.2: Chained Transformations ---")


    def map_values(data_stream, func):
        """Apply function to each item."""
        for item in data_stream:
            yield func(item)


    def filter_values(data_stream, predicate):
        """Keep items matching predicate."""
        for item in data_stream:
            if predicate(item):
                yield item


    def take_n(data_stream, n):
        """Take first n items."""
        for i, item in enumerate(data_stream):
            if i >= n:
                break
            yield item


    def skip_n(data_stream, n):
        """Skip first n items."""
        for i, item in enumerate(data_stream):
            if i >= n:
                yield item


    # Chain multiple operations
    print("Chained operations:")
    data = range(1, 51)
    result = take_n(
        map_values(
            filter_values(data, lambda x: x % 2 == 0),
            lambda x: x ** 2
        ),
        5
    )
    print(f"Result: {list(result)}")


    # Example 2.3: Parallel pipelines
    print("\n--- Example 2.3: Parallel Processing Streams ---")


    def split_stream(data_stream, condition):
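        """
        Split data stream into two based on condition.

        Note: this simplified version is eager. It consumes the whole
        input and buffers both halves in lists before returning, so it
        is not suitable for unbounded streams.

        Returns:
            Two iterators: (true_stream, false_stream)
        """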
        true_items = []
        false_items = []

        for item in data_stream:
            if condition(item):
                true_items.append(item)
            else:
                false_items.append(item)

        return iter(true_items), iter(false_items)


    def merge_streams(*streams):
        """
        Merge multiple streams into one.
        """
        for stream in streams:
            for item in stream:
                yield item


    print("Stream splitting and merging example:")
    data = range(1, 11)
    evens, odds = split_stream(data, lambda x: x % 2 == 0)

    print(f"Evens: {list(evens)}")
    print(f"Odds: {list(odds)}")


    # ============================================================================
    # SECTION 3: WORKING WITH LARGE DATASETS
    # ============================================================================

    print("\n" + "=" * 70)
    print("SECTION 3: LARGE DATASET HANDLING")
    print("=" * 70)

    """
    Techniques for efficiently processing datasets that don't fit in memory.
    """

    # Example 3.1: Sliding window analysis
    print("\n--- Example 3.1: Sliding Window ---")


    def sliding_window(iterable, window_size):
        """
        Create sliding window view of data.

        Useful for time-series analysis, moving averages.
        """
        iterator = iter(iterable)
        window = deque(maxlen=window_size)

        # Fill initial window
        for _ in range(window_size):
            try:
                window.append(next(iterator))
            except StopIteration:
                return

        yield tuple(window)

        # Slide the window
        for item in iterator:
            window.append(item)
            yield tuple(window)


    def moving_average(data_stream, window_size):
        """
        Calculate moving average.

        Memory efficient - only keeps window_size items in memory.
        """
        for window in sliding_window(data_stream, window_size):
            yield sum(window) / len(window)


    # Example usage
    print("Moving average example:")
    data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    print(f"Data: {data}")
    print(f"3-period moving average: {list(moving_average(data, 3))}")


    # Example 3.2: Chunking large datasets
    print("\n--- Example 3.2: Processing in Chunks ---")


    def chunk_data(iterable, chunk_size):
        """
        Split data into chunks of specified size.

        Useful for batch processing, API calls with rate limits.
        """
        chunk = []
        for item in iterable:
            chunk.append(item)
            if len(chunk) >= chunk_size:
                yield chunk
                chunk = []

        if chunk:
            yield chunk


    def process_chunks(data_source, chunk_size, process_func):
        """
        Process data in chunks.

        Args:
            data_source: Iterable data source
            chunk_size: Size of each chunk
            process_func: Function to apply to each chunk
        """
        for chunk in chunk_data(data_source, chunk_size):
            yield process_func(chunk)


    # Example usage
    print("Chunk processing example:")
    data = range(1, 26)
    chunks = chunk_data(data, 5)
    print("Chunks of 5:")
    for chunk_num, chunk in enumerate(chunks, 1):
        print(f"  Chunk {chunk_num}: {chunk}")


    # Example 3.3: Sampling large datasets
    print("\n--- Example 3.3: Data Sampling ---")


    def reservoir_sample(data_stream, k):
        """
        Reservoir sampling: Get k random samples from stream.

        Useful when:
        - Don't know total size in advance
        - Stream is too large to fit in memory
        - Need uniform random sampling

        Algorithm guarantees each item has equal probability of selection.
        """
        import random

        reservoir = []

        for i, item in enumerate(data_stream):
            if i < k:
                reservoir.append(item)
            else:
                # Random replacement
                j = random.randint(0, i)
                if j < k:
                    reservoir[j] = item

        return reservoir


    def every_nth(data_stream, n):
        """
        Sample every nth item from stream.

        Simple deterministic sampling.
        """
        for i, item in enumerate(data_stream):
            if i % n == 0:
                yield item


    print("Sampling examples:")
    data = range(1, 101)

    print(f"Every 10th item: {list(every_nth(data, 10))}")

    # Reservoir sampling
    import random
    random.seed(42)
    sample = reservoir_sample(range(1, 101), 5)
    print(f"5 random samples: {sample}")


    # ============================================================================
    # SECTION 4: INTEGRATION WITH STANDARD LIBRARY
    # ============================================================================

    print("\n" + "=" * 70)
    print("SECTION 4: ITERTOOLS INTEGRATION")
    print("=" * 70)

    """
    Combine generators with itertools for powerful data processing.
    """

    # Example 4.1: Common itertools patterns
    print("\n--- Example 4.1: Itertools Patterns ---")


    def demonstrate_itertools():
        """
        Show common itertools usage with generators.
        """
        data = range(1, 11)

        # islice - take a slice without creating list
        print("First 5 items:")
        print(list(itertools.islice(data, 5)))

        # chain - concatenate iterables
        gen1 = (x for x in range(1, 4))
        gen2 = (x for x in range(4, 7))
        print("\nChained generators:")
        print(list(itertools.chain(gen1, gen2)))

        # groupby - group consecutive items
        data = [1, 1, 2, 2, 2, 3, 3, 1, 1]
        print("\nGrouped by value:")
        for key, group in itertools.groupby(data):
            print(f"  {key}: {list(group)}")

        # cycle - repeat indefinitely
        print("\nFirst 10 from cycled [1, 2, 3]:")
        cycled = itertools.cycle([1, 2, 3])
        print(list(itertools.islice(cycled, 10)))

        # accumulate - running totals
        print("\nAccumulated sum of [1, 2, 3, 4, 5]:")
        print(list(itertools.accumulate([1, 2, 3, 4, 5])))


    demonstrate_itertools()


    # Example 4.2: Combining multiple iterables
    print("\n--- Example 4.2: Combining Iterables ---")


    def demonstrate_combinations():
        """Show various ways to combine iterables."""

        # zip_longest - zip with padding
        a = [1, 2, 3]
        b = ['a', 'b']
        print("zip_longest with padding:")
        result = itertools.zip_longest(a, b, fillvalue='X')
        print(list(result))

        # product - cartesian product
        print("\nCartesian product of [1,2] and ['a','b']:")
        result = itertools.product([1, 2], ['a', 'b'])
        print(list(result))

        # combinations and permutations
        print("\nCombinations of [1,2,3] taken 2 at a time:")
        print(list(itertools.combinations([1, 2, 3], 2)))

        print("\nPermutations of [1,2,3] taken 2 at a time:")
        print(list(itertools.permutations([1, 2, 3], 2)))


    demonstrate_combinations()


    # Example 4.3: Custom generator with itertools
    print("\n--- Example 4.3: Advanced Pipeline ---")


    def unique_items(data_stream):
        """
        Yield unique items from stream (maintains order).

        Items must be hashable. Memory grows with the number of
        distinct items seen, not with the length of the stream.
        """
        seen = set()
        for item in data_stream:
            if item not in seen:
                seen.add(item)
                yield item


    def pairwise(iterable):
        """
        Generate pairs of consecutive items.

        s -> (s0,s1), (s1,s2), (s2, s3), ...

        Python 3.10+ ships this as itertools.pairwise.
        """
        a, b = itertools.tee(iterable)
        next(b, None)
        return zip(a, b)


    # Example usage
    print("Unique items from [1, 2, 2, 3, 1, 4, 3, 5]:")
    data = [1, 2, 2, 3, 1, 4, 3, 5]
    print(list(unique_items(data)))

    print("\nPairs from [1, 2, 3, 4, 5]:")
    print(list(pairwise([1, 2, 3, 4, 5])))


    # ============================================================================
    # SECTION 5: PERFORMANCE OPTIMIZATION
    # ============================================================================

    print("\n" + "=" * 70)
    print("SECTION 5: PERFORMANCE OPTIMIZATION")
    print("=" * 70)

    """
    Techniques for optimizing generator performance.
    """

    # Example 5.1: Generator vs list performance
    print("\n--- Example 5.1: Performance Comparison ---")


    def measure_performance():
        """
        Compare performance of different approaches.
        """
        n = 100000

        # perf_counter() is a monotonic, high-resolution clock intended for timing

        # Method 1: List comprehension
        start = time.perf_counter()
        result = sum([x ** 2 for x in range(n)])
        time_list = time.perf_counter() - start

        # Method 2: Generator expression
        start = time.perf_counter()
        result = sum(x ** 2 for x in range(n))
        time_gen = time.perf_counter() - start

        # Method 3: Generator function
        def squares(n):
            for i in range(n):
                yield i ** 2

        start = time.perf_counter()
        result = sum(squares(n))
        time_gen_func = time.perf_counter() - start

        print(f"List comprehension: {time_list:.4f}s")
        print(f"Generator expression: {time_gen:.4f}s")
        print(f"Generator function: {time_gen_func:.4f}s")


    measure_performance()


    # Example 5.2: Memory efficiency
    print("\n--- Example 5.2: Memory Efficiency ---")


    def memory_comparison():
        """
        Compare memory usage.
        """
        n = 1000000

        # List - stores a reference to every value
        # (getsizeof counts only the list's own pointer array, not the int objects)
        list_obj = [x for x in range(n)]
        list_size = sys.getsizeof(list_obj)

        # Generator - stores only state
        gen_obj = (x for x in range(n))
        gen_size = sys.getsizeof(gen_obj)

        print(f"List memory: {list_size:,} bytes")
        print(f"Generator memory: {gen_size:,} bytes")
        print(f"Savings: {list_size / gen_size:.0f}x")


    memory_comparison()


    # Example 5.3: Lazy evaluation benefits
    print("\n--- Example 5.3: Lazy Evaluation Benefits ---")


    def expensive_computation(x):
        """Simulate expensive operation."""
        return x ** 2


    def eager_approach(data):
        """Process all data immediately."""
        return [expensive_computation(x) for x in data]


    def lazy_approach(data):
        """Process data on-demand."""
        return (expensive_computation(x) for x in data)


    # Compare when we only need first few items
    print("When processing only first 3 items:")

    data = range(10000)

    start = time.perf_counter()
    eager = eager_approach(data)
    result = eager[:3]
    time_eager = time.perf_counter() - start

    start = time.perf_counter()
    lazy = lazy_approach(data)
    result = list(itertools.islice(lazy, 3))
    time_lazy = time.perf_counter() - start

    print(f"Eager: {time_eager:.4f}s (processed all items)")
    print(f"Lazy: {time_lazy:.4f}s (processed only 3 items)")


    # ============================================================================
    # SECTION 6: REAL-WORLD EXAMPLES
    # ============================================================================

    print("\n" + "=" * 70)
    print("SECTION 6: REAL-WORLD EXAMPLES")
    print("=" * 70)

    # Example 6.1: Log file analyzer
    print("\n--- Example 6.1: Log File Analyzer ---")


    def parse_log_line(line):
        """Parse a log line into structured data."""
        # Simplified parser
        parts = line.split(' - ')
        if len(parts) >= 3:
            return {
                'timestamp': parts[0],
                'level': parts[1],
                'message': parts[2]
            }
        return None


    def filter_log_level(log_stream, level):
        """Filter logs by level."""
        for entry in log_stream:
            if entry and entry.get('level') == level:
                yield entry


    def log_analyzer(filepath):
        """
        Analyze log file efficiently.

        Demonstrates real-world file processing pipeline.
        """
        # Read file line by line
        with open(filepath, 'r') as file:
            # Parse each line
            parsed = (parse_log_line(line.strip()) for line in file)

            # Filter out None values
            valid = (entry for entry in parsed if entry is not None)

            # Can now process stream
            for entry in valid:
                yield entry


    print("Log analyzer pattern defined (see code)")


    # Example 6.2: Data aggregation
    print("\n--- Example 6.2: Data Aggregation ---")


    def aggregate_by_key(data_stream, key_func):
        """
        Aggregate data by key.

        Args:
            data_stream: Stream of dictionaries
            key_func: Function to extract grouping key

        Yields:
            (key, group) tuples
        """
        # groupby only merges consecutive items with equal keys, so the
        # stream must already be sorted (or at least grouped) by key
        for key, group in itertools.groupby(data_stream, key_func):
            yield key, list(group)


    def sum_by_key(data_stream, key_func, value_func):
        """
        Sum values grouped by key.
        """
        for key, group in aggregate_by_key(data_stream, key_func):
            total = sum(value_func(item) for item in group)
            yield key, total


    # Example usage
    print("Aggregation example:")
    sales_data = [
        {'date': '2024-01-01', 'amount': 100},
        {'date': '2024-01-01', 'amount': 150},
        {'date': '2024-01-02', 'amount': 200},
        {'date': '2024-01-02', 'amount': 250},
    ]

    totals = sum_by_key(
        sales_data,
        key_func=lambda x: x['date'],
        value_func=lambda x: x['amount']
    )

    for date, total in totals:
        print(f"  {date}: ${total}")


    # Example 6.3: Event stream processing
    print("\n--- Example 6.3: Event Stream Processor ---")


    def event_stream_processor(events):
        """
        Process stream of events.

        Demonstrates real-time event processing pattern.
        """
        window = deque(maxlen=100)  # Keep last 100 events

        for event in events:
            window.append(event)

            # Check for patterns
            if len(window) >= 3:
                last_three = list(window)[-3:]
                # Check for pattern (simplified)
                if all(e.get('type') == 'error' for e in last_three):
                    yield {
                        'alert': 'Multiple consecutive errors',
                        'events': last_three
                    }


    print("Event stream processor defined (see code)")


    # ============================================================================
    # SUMMARY AND BEST PRACTICES
    # ============================================================================

    print("\n" + "=" * 70)
    print("SUMMARY: PRACTICAL APPLICATIONS")
    print("=" * 70)

    print("""
    KEY APPLICATIONS:

    1. FILE PROCESSING:
       - Line-by-line reading for large files
       - CSV/JSON streaming
       - Log file analysis
       - Memory-efficient parsing

    2. DATA PIPELINES:
       - ETL workflows
       - Chained transformations
       - Filtering and mapping
       - Batch processing

    3. LARGE DATASETS:
       - Sliding windows
       - Chunking
       - Sampling
       - Streaming aggregation

    4. ITERTOOLS INTEGRATION:
       - islice, chain, groupby
       - zip_longest, product
       - combinations, permutations
       - accumulate

    5. PERFORMANCE:
       - Lazy evaluation
       - Memory efficiency
       - Process on-demand
       - Avoid unnecessary computation

    BEST PRACTICES:

    1. Use generators for:
       - Large files
       - Streaming data
       - Memory constraints
       - One-pass processing

    2. Combine with itertools:
       - Built-in optimizations
       - Tested implementations
       - Composable operations

    3. Build pipelines:
       - Small, focused generators
       - Easy to test
       - Reusable components
       - Clear data flow

    4. Handle resources:
       - Use context managers
       - Close generators properly
       - Clean up in finally blocks

    5. Optimize performance:
       - Profile first
       - Use generator expressions
       - Avoid premature conversion to lists
       - Process in chunks

    COMMON PATTERNS:

    1. File Processing:
       with open(file) as f:
           for line in f:
               yield process(line)

    2. Pipeline:
       stage3(stage2(stage1(source)))

    3. Filter-Map:
       (transform(x) for x in data if condition(x))

    4. Batching:
       for chunk in chunks(data, size):
           yield process_batch(chunk)

    5. Aggregation:
       for key, group in groupby(data, key_func):
           yield key, summarize(group)

    REMEMBER:
    - Generators excel at streaming data
    - Build pipelines for complex processing
    - Use lazy evaluation for efficiency
    - Integrate with itertools
    - Profile before optimizing
    - Keep generators focused and simple
    """)

    print("\n" + "=" * 70)
    print("END OF PRACTICAL APPLICATIONS")
    print("Now apply these patterns to the exercises!")
    print("=" * 70)
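
The `split_stream` function in the file above reads the whole input into two lists before returning. A lazy variant can be sketched with itertools.tee, as below. The names `split_stream_lazy` and `is_even` are hypothetical. One caveat: tee buffers every item that one branch has consumed and the other has not, so draining one branch completely before touching the other still holds the whole input in memory.

```python
from itertools import tee

def split_stream_lazy(data_stream, condition):
    """Split a stream into (matching, non_matching) without reading it up front."""
    # tee gives two independent iterators over the same underlying stream
    true_src, false_src = tee(data_stream)
    true_stream = (item for item in true_src if condition(item))
    false_stream = (item for item in false_src if not condition(item))
    return true_stream, false_stream

def is_even(x):
    return x % 2 == 0

evens, odds = split_stream_lazy(range(1, 11), is_even)

# Consuming the branches in step keeps the tee buffer small
pairs = list(zip(evens, odds))
print(pairs)  # [(2, 1), (4, 3), (6, 5), (8, 7), (10, 9)]
```

Interleaved consumption, as with zip here, is the case where the lazy variant pays off. If each half is processed separately and in full, the eager list-based version is simpler and costs about the same memory.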