

ML Pipelines

The Problem with Notebook Code

This is what most ML code looks like in tutorials and Jupyter notebooks. Can you spot the problems? (Hint: there are at least four.)
# Typical notebook workflow (PROBLEMS!)

# 1. Load data
df = pd.read_csv('data.csv')

# 2. Scale features (fit on ALL data - LEAK!)
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)  # Mean/std computed from test data too!

# 3. Train-test split (AFTER scaling - BAD!)
X_train, X_test = train_test_split(X_scaled)  # Too late -- damage already done

# 4. Feature selection (using ALL data - LEAK!)
selector = SelectKBest(k=10)
X_selected = selector.fit_transform(X_scaled, y)  # Test labels influence selection!

# 5. Model
model.fit(X_train, y_train)
# And when you deploy this... how do you replicate steps 2-4 on new data?
Problems:
  • Data leakage: Preprocessing uses test data statistics, inflating your evaluation scores
  • Not reproducible: If you change one step, you need to manually re-run everything downstream
  • Not deployable: In production, you need to apply the exact same transformations in the exact same order. Good luck remembering which scaler was fit on which data six months later.
  • Fragile: Easy to make a mistake when manually chaining steps — and these bugs are silent (no error, just wrong results)

The Pipeline Solution

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.feature_selection import SelectKBest
from sklearn.ensemble import RandomForestClassifier

# All steps in one object
pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('selector', SelectKBest(k=10)),
    ('classifier', RandomForestClassifier(n_estimators=100))
])

# Fit on training data only
pipeline.fit(X_train, y_train)

# Predict (automatically applies all transformations)
predictions = pipeline.predict(X_test)

# Score
accuracy = pipeline.score(X_test, y_test)
Benefits:
  • No data leakage — each step is fit only on training data during fit(), then applied to test data during predict()
  • Reproducible — the exact same sequence of transformations, every time, no manual steps
  • Deployable — save one object, load it in production, call predict() on raw data
  • Clean — one object replaces dozens of lines of manual transformation code
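The leakage benefit can be checked on pure noise. The sketch below assumes a synthetic dataset (random features, random labels; the sizes and `k=20` are arbitrary choices for illustration) on which no model should beat chance. Selecting features on all rows before cross-validation tends to report an optimistic score, while the same selector placed inside a Pipeline should stay close to 50%.

```python
import numpy as np
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
from sklearn.pipeline import Pipeline

# Synthetic noise: the labels are independent of every feature (illustrative sizes)
rng = np.random.default_rng(0)
X = rng.normal(size=(100, 2000))
y = rng.integers(0, 2, size=100)

# Leaky: the selector sees every label, including those of future test folds
X_leaky = SelectKBest(f_classif, k=20).fit_transform(X, y)
leaky_score = cross_val_score(LogisticRegression(max_iter=1000), X_leaky, y, cv=5).mean()

# Clean: the selector is refit inside each training fold
clean_pipeline = Pipeline([
    ('selector', SelectKBest(f_classif, k=20)),
    ('classifier', LogisticRegression(max_iter=1000)),
])
clean_score = cross_val_score(clean_pipeline, X, y, cv=5).mean()

print(f"Selection outside CV: {leaky_score:.2f}")
print(f"Selection inside CV:  {clean_score:.2f}")
```

The only difference between the two runs is where the selector is fit, so any gap between the scores comes from leakage rather than from the model.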

Building Pipelines Step by Step

Basic Pipeline

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression

# Simple pipeline
simple_pipeline = Pipeline([
    ('scale', StandardScaler()),
    ('classify', LogisticRegression())
])

# Use like a regular model
simple_pipeline.fit(X_train, y_train)
print(f"Accuracy: {simple_pipeline.score(X_test, y_test):.4f}")

Using make_pipeline (Auto-naming)

from sklearn.pipeline import make_pipeline

# Automatically names steps based on class names
auto_pipeline = make_pipeline(
    StandardScaler(),
    SelectKBest(k=10),
    RandomForestClassifier()
)

print(auto_pipeline.named_steps)
# {'standardscaler': StandardScaler(), 
#  'selectkbest': SelectKBest(k=10), 
#  'randomforestclassifier': RandomForestClassifier()}

Column Transformer: Different Preprocessing for Different Features

Real data is messy. You have numeric columns (age, income) that need scaling, categorical columns (city, education) that need encoding, and maybe binary columns that need nothing at all. ColumnTransformer lets you define different preprocessing recipes for different columns and bundles them into a single step.
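import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split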

# Sample mixed data
df = pd.DataFrame({
    'age': [25, 30, None, 45, 35],
    'income': [50000, 60000, 75000, None, 55000],
    'city': ['NYC', 'LA', 'NYC', 'Chicago', 'LA'],
    'education': ['Bachelor', 'Master', 'PhD', 'Bachelor', 'Master'],
    'target': [0, 1, 1, 0, 1]
})

# Define feature groups
numeric_features = ['age', 'income']
categorical_features = ['city', 'education']

# Different preprocessing for each type
preprocessor = ColumnTransformer(
    transformers=[
        ('num', Pipeline([
            ('imputer', SimpleImputer(strategy='median')),
            ('scaler', StandardScaler())
        ]), numeric_features),
        
        ('cat', Pipeline([
            ('imputer', SimpleImputer(strategy='constant', fill_value='Unknown')),
            ('encoder', OneHotEncoder(handle_unknown='ignore'))
        ]), categorical_features)
    ]
)

# Full pipeline
full_pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', LogisticRegression())
])

X = df.drop('target', axis=1)
y = df['target']

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

full_pipeline.fit(X_train, y_train)
print(f"Accuracy: {full_pipeline.score(X_test, y_test):.4f}")

Complete Real-World Pipeline

import numpy as np
import pandas as pd
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder, FunctionTransformer
from sklearn.impute import SimpleImputer
from sklearn.feature_selection import SelectFromModel
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.model_selection import cross_val_score, GridSearchCV

# Custom feature engineering function
def create_features(X):
    """Add custom features."""
    X = X.copy()
    if 'income' in X.columns and 'age' in X.columns:
        X['income_per_age'] = X['income'] / (X['age'] + 1)
    return X

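# Define column types
# (list the engineered 'income_per_age' column here, otherwise remainder='drop' discards it)
numeric_features = ['age', 'income', 'tenure', 'income_per_age']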
categorical_features = ['department', 'gender']
passthrough_features = ['is_manager']  # Already binary

# Build preprocessing
numeric_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])

categorical_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy='constant', fill_value='Unknown')),
    ('encoder', OneHotEncoder(drop='first', handle_unknown='ignore'))
])

preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features),
        ('pass', 'passthrough', passthrough_features)
    ],
    remainder='drop'  # Drop columns not specified
)

# Complete pipeline
complete_pipeline = Pipeline([
    ('feature_eng', FunctionTransformer(create_features)),
    ('preprocessor', preprocessor),
    ('feature_selection', SelectFromModel(
        RandomForestClassifier(n_estimators=50, random_state=42),
        threshold='median'
    )),
    ('classifier', GradientBoostingClassifier(random_state=42))
])
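One detail of this layout is easy to miss: because the preprocessor uses `remainder='drop'`, a column created by an earlier step only survives if it is named in one of the ColumnTransformer column lists. Here is a small sketch with a made-up two-column frame; `add_ratio` and `build` are hypothetical helpers written for this illustration.

```python
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import FunctionTransformer, StandardScaler

def add_ratio(X):
    """Stateless feature engineering: add income per year of age."""
    X = X.copy()
    X['income_per_age'] = X['income'] / (X['age'] + 1)
    return X

df = pd.DataFrame({'age': [25, 30, 45, 35], 'income': [50000, 60000, 75000, 55000]})

def build(columns):
    """Pipeline that engineers the ratio, then scales only the listed columns."""
    return Pipeline([
        ('feature_eng', FunctionTransformer(add_ratio)),
        ('preprocessor', ColumnTransformer(
            [('num', StandardScaler(), columns)],
            remainder='drop',
        )),
    ])

without_ratio = build(['age', 'income']).fit_transform(df)
with_ratio = build(['age', 'income', 'income_per_age']).fit_transform(df)

print(without_ratio.shape)  # engineered column was silently dropped
print(with_ratio.shape)     # engineered column kept
```

Checking the output shape after the preprocessing step is a cheap way to confirm that engineered features actually reach the model.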

Hyperparameter Tuning with Pipelines

One of the most powerful features of pipelines: you can tune hyperparameters across ALL steps (preprocessing and model) in a single grid search. Access nested parameters with step__param syntax (double underscore). This means you can search over “should I use mean or median imputation?” alongside “what learning rate works best?” — the grid search will find the best combination automatically.
from sklearn.model_selection import GridSearchCV

# Define parameter grid
param_grid = {
    # Preprocessor parameters
    'preprocessor__num__imputer__strategy': ['mean', 'median'],
    
    # Feature selection parameters  
    'feature_selection__threshold': ['median', 'mean'],
    
    # Classifier parameters
    'classifier__n_estimators': [50, 100, 200],
    'classifier__max_depth': [3, 5, 7],
    'classifier__learning_rate': [0.01, 0.1]
}

# Grid search
grid_search = GridSearchCV(
    complete_pipeline,
    param_grid,
    cv=5,
    scoring='roc_auc',
    n_jobs=-1,
    verbose=1
)

# Fit (this searches all combinations)
grid_search.fit(X_train, y_train)

print(f"Best parameters: {grid_search.best_params_}")
print(f"Best CV score: {grid_search.best_score_:.4f}")

# Best pipeline is automatically selected
best_pipeline = grid_search.best_estimator_
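If a parameter name in the grid is misspelled, the search fails with an invalid-parameter error. A quick way to see the valid names is `get_params()`, shown here on a small stand-in pipeline (not the complete pipeline above):

```python
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression()),
])

# Every tunable name, in the step__param form that param_grid expects
tunable = sorted(name for name in pipe.get_params() if '__' in name)
print(tunable)

# set_params accepts the same names, which is what the grid search uses internally
pipe.set_params(classifier__C=0.1, scaler__with_mean=False)
print(pipe.named_steps['classifier'].C)
```

The same call works on nested objects, so names such as `preprocessor__num__imputer__strategy` can be copied from the output rather than typed from memory.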

Cross-Validation with Pipelines

Pipelines ensure proper CV without data leakage:
from sklearn.model_selection import cross_val_score, StratifiedKFold

# Create pipeline
pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
])

# Cross-validation (preprocessing happens INSIDE each fold)
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
scores = cross_val_score(pipeline, X, y, cv=cv, scoring='roc_auc')

print(f"CV Scores: {scores}")
print(f"Mean: {scores.mean():.4f} (+/- {scores.std():.4f})")
Why this is correct:
  • Each fold: Fit preprocessor on train fold → transform test fold
  • No information from test fold leaks into preprocessing
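To make the per-fold behavior concrete, the loop below is a rough sketch of what `cross_val_score` does with a pipeline, written out by hand on synthetic data (the dataset and the LogisticRegression classifier are assumptions chosen to keep the run deterministic).

```python
import numpy as np
from sklearn.base import clone
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import StratifiedKFold, cross_val_score
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

X, y = make_classification(n_samples=200, n_features=10, random_state=0)
pipe = Pipeline([('scaler', StandardScaler()), ('classifier', LogisticRegression())])
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

manual_scores = []
for train_idx, test_idx in cv.split(X, y):
    fold_pipe = clone(pipe)                    # fresh, unfitted copy for each fold
    fold_pipe.fit(X[train_idx], y[train_idx])  # scaler statistics come from the train fold only
    manual_scores.append(fold_pipe.score(X[test_idx], y[test_idx]))

auto_scores = cross_val_score(pipe, X, y, cv=cv)
print(np.allclose(manual_scores, auto_scores))
```

Because the scaler lives inside the cloned pipeline, it is refit on every training fold and never sees the held-out rows.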

Custom Transformers

When sklearn’s built-in transformers are not enough, you can create your own. The key is inheriting from BaseEstimator and TransformerMixin, then implementing fit() (learn from training data) and transform() (apply to any data). This pattern ensures your custom logic works seamlessly inside pipelines, cross-validation, and grid search — no special cases needed.
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin

class OutlierRemover(BaseEstimator, TransformerMixin):
    """
    Cap outliers using the IQR method (values are clipped, rows are not removed).
    
    Clips values to [Q1 - factor*IQR, Q3 + factor*IQR].
    Bounds are learned from training data only (fit), then applied
    to any data (transform) -- no leakage even in cross-validation.
    """
    
    def __init__(self, factor=1.5):
        self.factor = factor
    
    def fit(self, X, y=None):
        """Calculate bounds from training data only."""
        X = np.array(X)
        self.lower_ = np.percentile(X, 25, axis=0) - self.factor * self._iqr(X)
        self.upper_ = np.percentile(X, 75, axis=0) + self.factor * self._iqr(X)
        return self  # Must return self for pipeline chaining
    
    def transform(self, X):
        """Clip values to learned bounds (does not recompute from X)."""
        X = np.array(X)
        return np.clip(X, self.lower_, self.upper_)
    
    def _iqr(self, X):
        return np.percentile(X, 75, axis=0) - np.percentile(X, 25, axis=0)


class FeatureAdder(BaseEstimator, TransformerMixin):
    """Add polynomial features."""
    
    def __init__(self, add_squares=True, add_interactions=False):
        self.add_squares = add_squares
        self.add_interactions = add_interactions
    
    def fit(self, X, y=None):
        return self  # Nothing to fit
    
    def transform(self, X):
        X = np.array(X)
        features = [X]
        
        if self.add_squares:
            features.append(X ** 2)
        
        if self.add_interactions and X.shape[1] >= 2:
            for i in range(X.shape[1]):
                for j in range(i + 1, X.shape[1]):
                    features.append((X[:, i] * X[:, j]).reshape(-1, 1))
        
        return np.hstack(features)


# Use in pipeline
custom_pipeline = Pipeline([
    ('outliers', OutlierRemover(factor=2.0)),
    ('features', FeatureAdder(add_squares=True)),
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression())
])
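The two base classes supply most of the plumbing. The sketch below uses a hypothetical `QuantileClipper` (not one of the classes above) to show what comes for free: `get_params()` and cloning from BaseEstimator, and `fit_transform()` from TransformerMixin.

```python
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin, clone

class QuantileClipper(BaseEstimator, TransformerMixin):
    """Hypothetical example: clip each column to quantiles learned in fit()."""

    def __init__(self, lower=0.05, upper=0.95):
        # Store constructor arguments unchanged so get_params() and clone() can read them
        self.lower = lower
        self.upper = upper

    def fit(self, X, y=None):
        X = np.asarray(X, dtype=float)
        self.low_ = np.quantile(X, self.lower, axis=0)
        self.high_ = np.quantile(X, self.upper, axis=0)
        return self

    def transform(self, X):
        return np.clip(np.asarray(X, dtype=float), self.low_, self.high_)

clipper = QuantileClipper(lower=0.1)
print(clipper.get_params())               # provided by BaseEstimator
unfitted_copy = clone(clipper)            # same parameters, no learned state

X_train = np.arange(101, dtype=float).reshape(-1, 1)
clipped = clipper.fit_transform(X_train)  # provided by TransformerMixin
extreme = clipper.transform([[1000.0]])   # capped at the bound learned from X_train
print(extreme)
```

Keeping `__init__` free of logic (only assigning arguments to attributes of the same name) is what lets grid search and cross-validation copy the transformer safely.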

Saving and Loading Pipelines

import joblib

# Save the entire fitted pipeline (full_pipeline from the ColumnTransformer section)
joblib.dump(full_pipeline, 'model_pipeline.pkl')

# Load pipeline (includes all preprocessing!)
loaded_pipeline = joblib.load('model_pipeline.pkl')

# Use directly - no need to remember preprocessing steps
new_data = pd.DataFrame({
    'age': [35],
    'income': [75000],
    'city': ['NYC'],
    'education': ['Master']
})

prediction = loaded_pipeline.predict(new_data)
probability = loaded_pipeline.predict_proba(new_data)

print(f"Prediction: {prediction[0]}")
print(f"Probability: {probability[0]}")
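A round-trip check is a cheap safeguard before shipping the file. The sketch below uses a small synthetic pipeline and a temporary directory; storing the scikit-learn version next to the model is an illustrative convention, not something the library requires. It is useful because pickled estimators are not guaranteed to load correctly under a different scikit-learn version.

```python
import os
import tempfile

import joblib
import numpy as np
import sklearn
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

X, y = make_classification(n_samples=100, n_features=5, random_state=0)
pipe = Pipeline([('scaler', StandardScaler()), ('classifier', LogisticRegression())]).fit(X, y)

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, 'model_pipeline.pkl')
    # Record the library version next to the model (illustrative convention)
    joblib.dump({'pipeline': pipe, 'sklearn_version': sklearn.__version__}, path)
    artifact = joblib.load(path)

if artifact['sklearn_version'] != sklearn.__version__:
    print("Warning: pipeline was saved with a different scikit-learn version")

same = np.array_equal(pipe.predict(X), artifact['pipeline'].predict(X))
print(f"Predictions identical after reload: {same}")
```

Only load pickle or joblib files from sources you trust, since loading can execute arbitrary code.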

Pipeline Visualization

from sklearn import set_config
from sklearn.utils import estimator_html_repr

# Enable diagram display
set_config(display='diagram')

# Display pipeline structure
pipeline
Or get the HTML representation:
# Save as HTML
html = estimator_html_repr(pipeline)
with open('pipeline_diagram.html', 'w') as f:
    f.write(html)

Debugging Pipelines

Inspect Intermediate Results

# Access individual steps (full_pipeline is the fitted pipeline from the ColumnTransformer section)
preprocessor = full_pipeline.named_steps['preprocessor']
classifier = full_pipeline.named_steps['classifier']

# Transform data through specific steps
X_preprocessed = full_pipeline[:-1].transform(X_test)  # All but last step
print(f"Shape after preprocessing: {X_preprocessed.shape}")

# Check feature names after one-hot encoding
if hasattr(preprocessor, 'get_feature_names_out'):
    feature_names = preprocessor.get_feature_names_out()
    print(f"Feature names: {feature_names[:10]}...")

Memory and Caching

from sklearn.pipeline import Pipeline
from joblib import Memory

# Cache expensive transformations
memory = Memory(location='./cache', verbose=0)

cached_pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', RandomForestClassifier())
], memory=memory)

# First fit: fits the preprocessor and stores the result in ./cache
# Later fits with the same preprocessor parameters and training data: loaded from cache (fast!)
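One side effect of caching is worth knowing when debugging: with `memory` set, the pipeline fits clones of the transformers, so the instance originally passed in stays unfitted. A minimal sketch on synthetic data, using a temporary directory as the cache location:

```python
import tempfile

from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

X, y = make_classification(n_samples=100, n_features=5, random_state=0)
scaler = StandardScaler()

with tempfile.TemporaryDirectory() as cache_dir:
    cached = Pipeline(
        [('scaler', scaler), ('classifier', LogisticRegression())],
        memory=cache_dir,  # a directory path works as well as a joblib.Memory object
    )
    cached.fit(X, y)
    cached.fit(X, y)  # same data and parameters: the fitted scaler is read from the cache

    fitted_scaler = cached.named_steps['scaler']
    print(hasattr(scaler, 'mean_'))         # original instance was cloned, so it stays unfitted
    print(hasattr(fitted_scaler, 'mean_'))  # the fitted copy lives inside the pipeline
```

When caching is enabled, inspect fitted transformers through `named_steps` rather than through the variables used to build the pipeline.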

Production Pipeline Template

"""Production ML Pipeline Template"""

from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.ensemble import GradientBoostingClassifier
import joblib
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def create_production_pipeline(numeric_features, categorical_features):
    """Create a production-ready pipeline."""
    
    numeric_transformer = Pipeline([
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler())
    ])
    
    categorical_transformer = Pipeline([
        ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
        ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
    ])
    
    preprocessor = ColumnTransformer([
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features)
    ])
    
    pipeline = Pipeline([
        ('preprocessor', preprocessor),
        ('classifier', GradientBoostingClassifier(
            n_estimators=100,
            max_depth=5,
            random_state=42
        ))
    ])
    
    return pipeline

def train_and_save(X_train, y_train, numeric_features, categorical_features, path):
    """Train pipeline and save to disk."""
    logger.info("Creating pipeline...")
    pipeline = create_production_pipeline(numeric_features, categorical_features)
    
    logger.info("Training pipeline...")
    pipeline.fit(X_train, y_train)
    
    logger.info(f"Saving to {path}...")
    joblib.dump({
        'pipeline': pipeline,
        'numeric_features': numeric_features,
        'categorical_features': categorical_features
    }, path)
    
    logger.info("Done!")
    return pipeline

def load_and_predict(path, X_new):
    """Load pipeline and make predictions."""
    logger.info(f"Loading from {path}...")
    artifacts = joblib.load(path)
    pipeline = artifacts['pipeline']
    
    logger.info("Making predictions...")
    predictions = pipeline.predict(X_new)
    probabilities = pipeline.predict_proba(X_new)
    
    return predictions, probabilities
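The template stores the feature lists next to the pipeline, which makes it possible to check incoming data before predicting. `validate_columns` below is a hypothetical helper sketched for this purpose; it is not part of scikit-learn or of the template above.

```python
import pandas as pd

def validate_columns(X_new, numeric_features, categorical_features):
    """Hypothetical helper: fail fast when expected input columns are missing."""
    expected = list(numeric_features) + list(categorical_features)
    missing = [col for col in expected if col not in X_new.columns]
    if missing:
        raise ValueError(f"Missing required columns: {missing}")
    # Return only the expected columns, in a fixed order
    return X_new[expected]

# Example feature lists, standing in for the ones saved in the artifact
numeric_features = ['age', 'income']
categorical_features = ['city']

good = pd.DataFrame({'city': ['NYC'], 'income': [75000], 'age': [35]})
checked = validate_columns(good, numeric_features, categorical_features)
print(checked.columns.tolist())

bad = pd.DataFrame({'age': [35]})
try:
    validate_columns(bad, numeric_features, categorical_features)
except ValueError as err:
    print(err)
```

Calling a check like this at the top of `load_and_predict` turns a confusing error deep inside the ColumnTransformer into a clear message about the input.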

Key Takeaways

  • Pipelines Prevent Leakage: Preprocessing is fit only on training data
  • One Object Does All: Fit, transform, predict in one call
  • Easy Deployment: Save/load entire workflow
  • Clean Hyperparameter Tuning: Search across all pipeline parameters

What’s Next?

Let’s wrap up with a checklist of common ML mistakes to avoid!

Continue to ML Mistakes Checklist

Avoid the pitfalls that trip up even experienced practitioners

Interview Deep-Dive

The key design principle is that every data transformation must live inside the pipeline so that cross-validation and deployment are both correct by construction.
  • ColumnTransformer for mixed data types. Define separate preprocessing paths for numeric features (imputation then scaling), categorical features (imputation then one-hot encoding), and any passthrough features (binary flags that need no transformation). ColumnTransformer routes each feature to its appropriate path and concatenates the results.
  • Custom transformers for domain-specific feature engineering. Inherit from BaseEstimator and TransformerMixin to create custom steps. The critical design rule: the fit() method learns parameters from training data, and transform() applies those parameters to any data. For example, a “TargetEncoder” custom transformer would compute mean target values per category in fit() and apply those mappings in transform(). This prevents leakage because the target means are computed only on training folds.
  • Pipeline composes everything sequentially. The full pipeline chains: FunctionTransformer (for stateless feature engineering like ratios), then ColumnTransformer (for type-specific preprocessing), then a feature selection step (SelectFromModel), then the estimator. Each step’s fit() is called only on training data during cross-validation.
  • Hyperparameter tuning across the entire pipeline. GridSearchCV or RandomizedSearchCV with the double-underscore notation (preprocessor__num__imputer__strategy, classifier__max_depth) searches across preprocessing and model parameters simultaneously. This is powerful because the optimal imputation strategy might depend on the model — a tree model might prefer median imputation while a linear model might prefer mean.
  • Caching for expensive transformations. If the preprocessor is expensive (e.g., large text vectorization), use Pipeline’s memory parameter with joblib caching. During grid search, the fitted preprocessor is then computed once per training fold and preprocessing configuration, and reused across all classifier parameter combinations.
Follow-up: How do you handle a custom feature that depends on external data at prediction time, like “current weather”?
This breaks the standard pipeline pattern because the feature is not derivable from the input row alone. I would handle it with a custom transformer that accepts a “context provider” at initialization: an object that fetches current weather data from an API or cache. In training, the context provider returns historical weather. In serving, it returns live data. The transformer’s transform() method merges the external data with the input features. The key is that the context provider interface is the same in both environments, preventing training-serving skew. In production, I would wrap the weather API behind a caching layer with a fallback to historical averages if the API is down, because you never want a model to fail when an external service is unavailable.
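The context-provider idea might look roughly like the sketch below. Everything in it is hypothetical: `ContextFeatureAdder`, the two provider functions, and the temperature values are stand-ins for illustration, and a real provider would call a weather service or a feature store.

```python
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin

class ContextFeatureAdder(BaseEstimator, TransformerMixin):
    """Hypothetical transformer: append features fetched from an injected provider."""

    def __init__(self, provider, fallback=None):
        self.provider = provider  # callable: DataFrame -> DataFrame of extra columns
        self.fallback = fallback  # dict of default values used if the provider fails

    def fit(self, X, y=None):
        return self  # stateless: nothing is learned from the data

    def transform(self, X):
        try:
            extra = self.provider(X)
        except Exception:
            if self.fallback is None:
                raise
            extra = pd.DataFrame(self.fallback, index=X.index)
        return X.join(extra)

def historical_weather(X):
    """Stand-in for a lookup of recorded weather (made-up values)."""
    return pd.DataFrame({'temperature': [18.0] * len(X)}, index=X.index)

def broken_live_weather(X):
    """Stand-in for a live API call that is currently failing."""
    raise ConnectionError("weather service unavailable")

rows = pd.DataFrame({'store_id': [1, 2]})
train_view = ContextFeatureAdder(historical_weather).transform(rows)
serve_view = ContextFeatureAdder(broken_live_weather, fallback={'temperature': 15.0}).transform(rows)
print(train_view)
print(serve_view)
```

Both environments go through the same `transform()` code path, and the fallback keeps predictions flowing when the external call fails.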
Pipeline leakage audits are methodical. I follow a specific checklist that catches the most common leakage patterns.
  • Audit 1: Check the order of operations. Is there any preprocessing done BEFORE the data enters the pipeline? If the scaler was fit on the full dataset and then the scaled data was passed to the pipeline, the pipeline is clean but the data is already leaky. Grep the notebook for any fit_transform() calls that happen before the train-test split.
  • Audit 2: Check for fit_transform() on full data inside custom transformers. A custom transformer might internally call fit on the input data in its transform() method rather than using parameters learned during fit(). For example, a ZScoreNormalizer that computes mean and std in transform() instead of using self.mean_ and self.std_ from fit(). This is leakage because test data statistics influence the transformation.
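  • Audit 3: Run the permutation test. Replace the target variable with random noise (y_random = np.random.randint(0, 2, len(y))). Run the full workflow including cross-validation. If accuracy is significantly above chance (50% for balanced binary labels), some step is using the labels outside the training folds, for example feature selection or target encoding fit on the full dataset. A leak-free workflow should score at chance on random labels. This test does not catch leaks that never touch the labels, such as a scaler fit on all rows, so it complements Audits 1 and 2 rather than replacing them.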
  • Audit 4: Compare CV score to fresh holdout score. If the pipeline reports 96% CV accuracy but a completely fresh holdout (data set aside before any analysis) scores 82%, there is leakage somewhere in the pipeline or the CV procedure. A gap of more than 3-5% warrants investigation.
  • Audit 5: Check for target encoding leakage. If any feature is derived from the target variable (e.g., mean target per category), verify that this encoding is done inside the pipeline (computed fresh in each CV fold) and not precomputed on the full dataset.
  • Audit 6: Feature importance sanity check. If a feature that should not logically be predictive (like row_id, timestamp, or file_name) shows up as the most important feature, that is almost certainly leakage. The feature is acting as a key to look up the answer rather than learning a pattern.
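Audit 2 is easier to recognize with both versions side by side. The classes below are illustrative sketches of the ZScoreNormalizer example (the class names and the toy data are assumptions): one recomputes statistics inside transform(), the other reuses what fit() learned.

```python
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin

class LeakyZScore(BaseEstimator, TransformerMixin):
    """Anti-pattern: statistics are recomputed from whatever data transform() receives."""

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        X = np.asarray(X, dtype=float)
        return (X - X.mean(axis=0)) / X.std(axis=0)

class ZScoreNormalizer(BaseEstimator, TransformerMixin):
    """Correct: statistics are learned in fit() and reused in transform()."""

    def fit(self, X, y=None):
        X = np.asarray(X, dtype=float)
        self.mean_ = X.mean(axis=0)
        self.std_ = X.std(axis=0)
        return self

    def transform(self, X):
        return (np.asarray(X, dtype=float) - self.mean_) / self.std_

X_train = np.array([[0.0], [10.0], [20.0]])
X_test = np.array([[100.0], [110.0], [120.0]])  # shifted far from the training range

leaky = LeakyZScore().fit(X_train).transform(X_test)
correct = ZScoreNormalizer().fit(X_train).transform(X_test)
print(leaky.ravel())    # centered on the test batch itself, so the shift disappears
print(correct.ravel())  # expressed in training units, so the shift is visible
```

A practical check during an audit is to transform one row alone and again as part of a batch: a correct transformer gives the same output both times, a leaky one does not.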
Follow-up: The audit reveals the pipeline is clean but 96% still seems too good. What else could explain it?
If the pipeline is genuinely leak-free, consider these possibilities. The problem might just be easy: some classification tasks legitimately have very high accuracy (e.g., MNIST digit recognition). Check published benchmarks for this dataset. Alternatively, there might be near-duplicate samples in the data (e.g., augmented copies or repeated measurements of the same entity). Train-test overlap of this kind inflates accuracy without being classic leakage. Run a deduplication check on feature vectors. Finally, the features themselves might contain implicit leakage from the data collection process, like the portable X-ray example from medical imaging where metadata in the image correlates with the diagnosis. Audit the feature definitions with a domain expert who understands the data generation process.
This is one of the most common real-world tasks for ML engineers, and it requires disciplined engineering rather than ML cleverness.
  • Step 1: Reproduce the notebook results exactly. Before changing anything, run the notebook end-to-end and record every metric: accuracy, precision, recall, feature importance rankings, prediction distribution. These become your regression tests. If you cannot reproduce the notebook results, stop — there is a hidden dependency (random seed, data version, library version) that must be identified first.
  • Step 2: Extract preprocessing into pipeline steps. Go through the notebook cell by cell. Every data transformation (scaling, imputation, encoding, feature engineering) becomes either a built-in sklearn transformer or a custom transformer class. The critical rule: the order and logic of transformations must be identical to the notebook. Do not “improve” anything yet — the goal is exact behavioral equivalence.
  • Step 3: Verify equivalence numerically. After building the pipeline, pass the same training data through both the notebook code and the pipeline. Compare intermediate outputs (feature matrices after preprocessing) using np.allclose(). Any difference beyond floating-point tolerance must be investigated, because it usually means a step is ordered or parameterized differently, and small discrepancies can compound downstream.
  • Step 4: Run the same cross-validation. The pipeline’s CV score should match the notebook’s CV score within floating-point tolerance. If they differ by more than 0.1%, something in the pipeline differs from the notebook logic.
  • Step 5: Only then start improving. Now that you have a clean, tested pipeline that exactly reproduces the notebook, you can safely refactor. Add proper train-test splits, fix any leakage the notebook had, add monitoring hooks. Each change is validated against the regression tests.
  • Step 6: Version everything. Pin the library versions (requirements.txt or conda environment), version the pipeline code (git), and version the trained model artifact. If anything breaks in production, you can trace exactly which change caused it.
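Step 3 can be sketched as follows. The hand-written standardization stands in for the notebook code, and the data is synthetic; in a real migration both sides would receive the actual training data.

```python
import numpy as np
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

X, y = make_classification(n_samples=100, n_features=5, random_state=0)

# "Notebook" version: standardization written by hand
notebook_features = (X - X.mean(axis=0)) / X.std(axis=0)

# Pipeline version of the same logic
pipe = Pipeline([('scaler', StandardScaler()), ('classifier', LogisticRegression())]).fit(X, y)
pipeline_features = pipe[:-1].transform(X)  # output of every step except the estimator

equivalent = np.allclose(notebook_features, pipeline_features)
print(f"Preprocessing matches: {equivalent}")
```

A common source of mismatches in this kind of check is the standard deviation convention: NumPy and StandardScaler divide by n, while pandas `.std()` divides by n - 1 by default.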
Follow-up: The notebook uses a custom function that the data scientist wrote with a bug in the feature engineering. Do you fix the bug?
Not immediately. If the model was trained with the bug, the bug is now part of the model’s behavior. Fixing the bug changes the feature values, which changes the model’s predictions, potentially for the worse. I would first migrate the pipeline with the bug intact (maintaining behavioral equivalence), deploy it, and verify it works. Then I would fix the bug, retrain the model on corrected features, evaluate the new model, and deploy it as a new version through the standard model update process with A/B testing. The worst mistake is fixing the bug in the serving pipeline without retraining: the model was trained on buggy features, so serving it correct features gives it data it has never seen before.