Skip to content

Method Chaining

Fluent Interface

1. Concept

Method chaining returns self or new object:

import pandas as pd

df = (pd.read_csv('data.csv')
      .dropna()
      .query('age > 25')
      .sort_values('name')
      .reset_index(drop=True))

2. Benefits

  • Readable pipeline
  • No intermediate variables
  • Functional style

3. Design Pattern

Methods return DataFrame/Series:

class DataFrame:
    def dropna(self):
        # ... operation
        return new_dataframe

Common Chains

1. Cleaning Pipeline

df_clean = (df
    .drop_duplicates()
    .dropna(subset=['key_column'])
    .replace({'old': 'new'})
    .reset_index(drop=True))

2. Transformation

result = (df
    .assign(total=lambda x: x['a'] + x['b'])
    .pipe(lambda x: x[x['total'] > 10])
    .groupby('category')['total']
    .mean())

3. Aggregation

summary = (df
    .groupby(['year', 'month'])
    .agg({'sales': 'sum', 'profit': 'mean'})
    .round(2))

Pipe Method

The pipe() method enables clean functional programming with DataFrames by allowing any function to be called in a method chain.

1. Basic Syntax

# Without pipe
result = custom_function(df)

# With pipe (chainable)
result = df.pipe(custom_function)

2. Custom Functions with pipe

def remove_outliers(df, column, n_std=2):
    """Remove rows where column value is beyond n standard deviations."""
    mean = df[column].mean()
    std = df[column].std()
    return df[abs(df[column] - mean) < n_std * std]

def add_calculated_columns(df):
    """Add derived columns."""
    return df.assign(
        total=df['quantity'] * df['price'],
        tax=df['quantity'] * df['price'] * 0.1
    )

def format_currency(df, columns):
    """Format columns as currency strings."""
    df = df.copy()
    for col in columns:
        df[col] = df[col].apply(lambda x: f"${x:,.2f}")
    return df

# Use in a pipeline
result = (df
    .pipe(remove_outliers, 'price', n_std=3)
    .pipe(add_calculated_columns)
    .pipe(format_currency, ['total', 'tax']))

3. Passing Arguments to pipe

# Function with multiple arguments
def filter_by_date_range(df, start, end, date_col='date'):
    mask = (df[date_col] >= start) & (df[date_col] <= end)
    return df[mask]

# Pass keyword arguments
result = df.pipe(filter_by_date_range, '2024-01-01', '2024-12-31')

# With explicit date column
result = df.pipe(filter_by_date_range, '2024-01-01', '2024-12-31', date_col='order_date')

4. Lambda Functions

result = (df
    .pipe(lambda x: x[x['age'] > 18])
    .pipe(lambda x: x.assign(adult=True))
    .pipe(lambda x: x.sort_values('name')))

5. Alternative Syntax with Tuple

When your DataFrame is not the first argument:

def merge_with_lookup(lookup_df, main_df, key):
    return main_df.merge(lookup_df, on=key)

# Using tuple: (function, arg_name_for_df)
result = df.pipe((merge_with_lookup, 'main_df'), lookup_table, key='id')

6. Debugging with pipe

def debug_step(df, message=''):
    """Print debug info without modifying DataFrame."""
    print(f"{message}")
    print(f"  Shape: {df.shape}")
    print(f"  Columns: {list(df.columns)}")
    print(f"  Memory: {df.memory_usage(deep=True).sum() / 1e6:.2f} MB")
    return df

result = (df
    .pipe(debug_step, 'After loading')
    .query('value > 0')
    .pipe(debug_step, 'After filtering')
    .groupby('category').sum()
    .pipe(debug_step, 'After aggregation'))

7. Method Injection

df.pipe(print)  # Debug intermediate state

# Log to file
def log_to_file(df, filename):
    with open(filename, 'a') as f:
        f.write(f"Shape: {df.shape}, Columns: {list(df.columns)}\n")
    return df

result = df.pipe(log_to_file, 'pipeline.log').query('x > 0')

8. Reusable Pipeline Functions

def standard_cleaning_pipeline(df):
    """Standard data cleaning operations."""
    return (df
        .drop_duplicates()
        .dropna(subset=['id'])
        .assign(
            created_at=lambda x: pd.to_datetime(x['created_at']),
            updated_at=lambda x: pd.to_datetime(x['updated_at'])
        )
        .sort_values('created_at')
        .reset_index(drop=True))

# Apply to any DataFrame
clean_df = raw_df.pipe(standard_cleaning_pipeline)

9. Conditional Operations with pipe

def maybe_filter(df, condition, column, threshold):
    """Conditionally apply filter."""
    if condition:
        return df[df[column] > threshold]
    return df

# Apply filter only if flag is True
result = df.pipe(maybe_filter, apply_filter, 'value', 100)

10. Financial Example

def calculate_returns(df, price_col='close'):
    """Add return columns."""
    return df.assign(
        daily_return=df[price_col].pct_change(),
        cumulative_return=(1 + df[price_col].pct_change()).cumprod() - 1
    )

def add_moving_averages(df, windows=[20, 50], price_col='close'):
    """Add moving average columns."""
    for w in windows:
        df = df.assign(**{f'ma_{w}': df[price_col].rolling(w).mean()})
    return df

def flag_signals(df):
    """Add trading signals."""
    return df.assign(
        golden_cross=(df['ma_20'] > df['ma_50']) & (df['ma_20'].shift(1) <= df['ma_50'].shift(1)),
        death_cross=(df['ma_20'] < df['ma_50']) & (df['ma_20'].shift(1) >= df['ma_50'].shift(1))
    )

# Complete analysis pipeline
analysis = (stock_df
    .pipe(calculate_returns)
    .pipe(add_moving_averages, [20, 50, 200])
    .pipe(flag_signals)
    .dropna())

Why Use pipe?

Without pipe With pipe
Nested function calls Flat, readable chain
f3(f2(f1(df))) df.pipe(f1).pipe(f2).pipe(f3)
Hard to debug Easy to insert debug steps
Difficult to reorder Simple to rearrange

Best Practices

  1. Keep functions pure: Return new DataFrames, don't modify in place
  2. Single responsibility: Each pipe function does one thing
  3. Document functions: Add docstrings for complex operations
  4. Test independently: Functions can be unit tested separately
  5. Use for clarity: Don't pipe trivial operations
# Good: Complex, reusable operation
df.pipe(standardize_column_names)

# Not needed: Simple operation
df.pipe(lambda x: x.head())  # Just use df.head()