
ML Pipelines

The Problem with Notebook Code

# Typical notebook workflow (PROBLEMS!)
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.feature_selection import SelectKBest
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split

# 1. Load data (assume the label column is called 'target')
df = pd.read_csv('data.csv')
X = df.drop('target', axis=1)
y = df['target']

# 2. Scale features (fit on ALL data - LEAK!)
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# 3. Train-test split (AFTER scaling - BAD!)
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y)

# 4. Feature selection (using ALL data - LEAK!)
selector = SelectKBest(k=10)
X_selected = selector.fit_transform(X_scaled, y)

# 5. Model (trained on features that have already seen the test data)
model = LogisticRegression()
model.fit(X_train, y_train)
Problems:
  • Data leakage from preprocessing on all data
  • Can’t reproduce the same transformations
  • Hard to deploy - need to remember all steps
  • Easy to make mistakes

The Pipeline Solution

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.feature_selection import SelectKBest
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split

# Split FIRST, before any preprocessing
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)

# All steps in one object
pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('selector', SelectKBest(k=10)),
    ('classifier', RandomForestClassifier(n_estimators=100))
])

# Fit on training data only
pipeline.fit(X_train, y_train)

# Predict (automatically applies all transformations)
predictions = pipeline.predict(X_test)

# Score
accuracy = pipeline.score(X_test, y_test)
Benefits:
  • ✅ No data leakage - each step fit only on training data
  • ✅ Reproducible - same transformations every time
  • ✅ Deployable - save and load entire pipeline
  • ✅ Clean - one object does everything

Building Pipelines Step by Step

Basic Pipeline

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression

# Simple pipeline
simple_pipeline = Pipeline([
    ('scale', StandardScaler()),
    ('classify', LogisticRegression())
])

# Use like a regular model
simple_pipeline.fit(X_train, y_train)
print(f"Accuracy: {simple_pipeline.score(X_test, y_test):.4f}")

Using make_pipeline (Auto-naming)

from sklearn.pipeline import make_pipeline

# Automatically names steps based on class names
auto_pipeline = make_pipeline(
    StandardScaler(),
    SelectKBest(k=10),
    RandomForestClassifier()
)

print(auto_pipeline.named_steps)
# {'standardscaler': StandardScaler(), 
#  'selectkbest': SelectKBest(k=10), 
#  'randomforestclassifier': RandomForestClassifier()}
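
The auto-generated step names matter as soon as you set or tune parameters: the lowercased class name becomes the prefix. A small sketch of both ways to use them:
# Change a step's parameter via its auto-generated name
auto_pipeline.set_params(selectkbest__k=5)

# The same prefixes work in a GridSearchCV parameter grid
param_grid = {
    'selectkbest__k': [5, 10],
    'randomforestclassifier__n_estimators': [100, 200]
}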

Column Transformer: Different Preprocessing for Different Features

Real data has mixed types:
import pandas as pd
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split

# Sample mixed data
df = pd.DataFrame({
    'age': [25, 30, None, 45, 35],
    'income': [50000, 60000, 75000, None, 55000],
    'city': ['NYC', 'LA', 'NYC', 'Chicago', 'LA'],
    'education': ['Bachelor', 'Master', 'PhD', 'Bachelor', 'Master'],
    'target': [0, 1, 1, 0, 1]
})

# Define feature groups
numeric_features = ['age', 'income']
categorical_features = ['city', 'education']

# Different preprocessing for each type
preprocessor = ColumnTransformer(
    transformers=[
        ('num', Pipeline([
            ('imputer', SimpleImputer(strategy='median')),
            ('scaler', StandardScaler())
        ]), numeric_features),
        
        ('cat', Pipeline([
            ('imputer', SimpleImputer(strategy='constant', fill_value='Unknown')),
            ('encoder', OneHotEncoder(handle_unknown='ignore'))
        ]), categorical_features)
    ]
)

# Full pipeline
full_pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', LogisticRegression())
])

X = df.drop('target', axis=1)
y = df['target']

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

full_pipeline.fit(X_train, y_train)
print(f"Accuracy: {full_pipeline.score(X_test, y_test):.4f}")

Complete Real-World Pipeline

import numpy as np
import pandas as pd
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder, FunctionTransformer
from sklearn.impute import SimpleImputer
from sklearn.feature_selection import SelectFromModel
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.model_selection import cross_val_score, GridSearchCV

# Custom feature engineering function
def create_features(X):
    """Add custom features."""
    X = X.copy()
    if 'income' in X.columns and 'age' in X.columns:
        X['income_per_age'] = X['income'] / (X['age'] + 1)
    return X

# Define column types (include the engineered 'income_per_age' so it is not dropped by remainder='drop')
numeric_features = ['age', 'income', 'tenure', 'income_per_age']
categorical_features = ['department', 'gender']
passthrough_features = ['is_manager']  # Already binary

# Build preprocessing
numeric_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])

categorical_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy='constant', fill_value='Unknown')),
    ('encoder', OneHotEncoder(drop='first', handle_unknown='ignore'))
])

preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features),
        ('pass', 'passthrough', passthrough_features)
    ],
    remainder='drop'  # Drop columns not specified
)

# Complete pipeline
complete_pipeline = Pipeline([
    ('feature_eng', FunctionTransformer(create_features)),
    ('preprocessor', preprocessor),
    ('feature_selection', SelectFromModel(
        RandomForestClassifier(n_estimators=50, random_state=42),
        threshold='median'
    )),
    ('classifier', GradientBoostingClassifier(random_state=42))
])
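
A quick sanity check before tuning is to cross-validate the whole object. This assumes X_train is a DataFrame containing the columns listed above and y_train is a binary target:
# Every step (feature engineering, preprocessing, selection, model) is refit inside each fold
scores = cross_val_score(complete_pipeline, X_train, y_train, cv=5, scoring='roc_auc')
print(f"CV ROC AUC: {scores.mean():.4f} (+/- {scores.std():.4f})")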

Hyperparameter Tuning with Pipelines

Access nested parameters with step__param syntax:
from sklearn.model_selection import GridSearchCV

# Define parameter grid
param_grid = {
    # Preprocessor parameters
    'preprocessor__num__imputer__strategy': ['mean', 'median'],
    
    # Feature selection parameters  
    'feature_selection__threshold': ['median', 'mean'],
    
    # Classifier parameters
    'classifier__n_estimators': [50, 100, 200],
    'classifier__max_depth': [3, 5, 7],
    'classifier__learning_rate': [0.01, 0.1]
}

# Grid search
grid_search = GridSearchCV(
    complete_pipeline,
    param_grid,
    cv=5,
    scoring='roc_auc',
    n_jobs=-1,
    verbose=1
)

# Fit (this searches all combinations)
grid_search.fit(X_train, y_train)

print(f"Best parameters: {grid_search.best_params_}")
print(f"Best CV score: {grid_search.best_score_:.4f}")

# Best pipeline is automatically selected
best_pipeline = grid_search.best_estimator_
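
Because refit=True by default, the best parameter combination is refit on all of X_train, so the best pipeline can be used directly. Assuming a held-out X_test/y_test:
# Evaluate the refit best pipeline on held-out data
print(f"Test accuracy: {best_pipeline.score(X_test, y_test):.4f}")

# grid_search itself delegates predict to the best pipeline
test_predictions = grid_search.predict(X_test)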

Cross-Validation with Pipelines

Pipelines ensure proper CV without data leakage:
from sklearn.model_selection import cross_val_score, StratifiedKFold

# Create pipeline
pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
])

# Cross-validation (preprocessing happens INSIDE each fold)
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
scores = cross_val_score(pipeline, X, y, cv=cv, scoring='roc_auc')

print(f"CV Scores: {scores}")
print(f"Mean: {scores.mean():.4f} (+/- {scores.std():.4f})")
Why this is correct:
  • Each fold: Fit preprocessor on train fold → transform test fold
  • No information from test fold leaks into preprocessing
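
Conceptually, this is what cross_val_score does with the pipeline. A minimal sketch, assuming X and y are NumPy arrays, using clone so each fold starts from a fresh, unfitted copy:
from sklearn.base import clone

fold_scores = []
for train_idx, test_idx in cv.split(X, y):
    fold_pipeline = clone(pipeline)                    # fresh, unfitted copy
    fold_pipeline.fit(X[train_idx], y[train_idx])      # scaler fit on the train fold only
    fold_scores.append(fold_pipeline.score(X[test_idx], y[test_idx]))  # accuracy, for brevity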

Custom Transformers

Create your own pipeline-compatible transformers:
from sklearn.base import BaseEstimator, TransformerMixin

class OutlierRemover(BaseEstimator, TransformerMixin):
    """Clip outliers to IQR-based bounds learned from the training data."""
    
    def __init__(self, factor=1.5):
        self.factor = factor
    
    def fit(self, X, y=None):
        """Calculate bounds from training data."""
        X = np.array(X)
        self.lower_ = np.percentile(X, 25, axis=0) - self.factor * self._iqr(X)
        self.upper_ = np.percentile(X, 75, axis=0) + self.factor * self._iqr(X)
        return self
    
    def transform(self, X):
        """Clip values to bounds."""
        X = np.array(X)
        return np.clip(X, self.lower_, self.upper_)
    
    def _iqr(self, X):
        return np.percentile(X, 75, axis=0) - np.percentile(X, 25, axis=0)


class FeatureAdder(BaseEstimator, TransformerMixin):
    """Add polynomial features."""
    
    def __init__(self, add_squares=True, add_interactions=False):
        self.add_squares = add_squares
        self.add_interactions = add_interactions
    
    def fit(self, X, y=None):
        return self  # Nothing to fit
    
    def transform(self, X):
        X = np.array(X)
        features = [X]
        
        if self.add_squares:
            features.append(X ** 2)
        
        if self.add_interactions and X.shape[1] >= 2:
            for i in range(X.shape[1]):
                for j in range(i + 1, X.shape[1]):
                    features.append((X[:, i] * X[:, j]).reshape(-1, 1))
        
        return np.hstack(features)


# Use in pipeline
custom_pipeline = Pipeline([
    ('outliers', OutlierRemover(factor=2.0)),
    ('features', FeatureAdder(add_squares=True)),
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression())
])
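
It is worth sanity-checking a custom transformer on its own before dropping it into a pipeline. A tiny example with made-up numbers:
import numpy as np

demo = np.array([[1.0], [2.0], [3.0], [100.0]])    # 100 is an obvious outlier

clipper = OutlierRemover(factor=1.5)
print(clipper.fit_transform(demo).ravel())         # 100 is clipped down to the upper IQR bound

adder = FeatureAdder(add_squares=True)
print(adder.fit_transform(demo).shape)             # (4, 2): original column plus its square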

Saving and Loading Pipelines

import joblib

# Save the entire fitted pipeline (here, full_pipeline from the ColumnTransformer example,
# since the new data below has the same mixed-type columns)
joblib.dump(full_pipeline, 'model_pipeline.pkl')

# Load pipeline (includes all preprocessing!)
loaded_pipeline = joblib.load('model_pipeline.pkl')

# Use directly - no need to remember preprocessing steps
new_data = pd.DataFrame({
    'age': [35],
    'income': [75000],
    'city': ['NYC'],
    'education': ['Master']
})

prediction = loaded_pipeline.predict(new_data)
probability = loaded_pipeline.predict_proba(new_data)

print(f"Prediction: {prediction[0]}")
print(f"Probability: {probability[0]}")

Pipeline Visualization

from sklearn import set_config
from sklearn.utils import estimator_html_repr

# Enable diagram display
set_config(display='diagram')

# Display pipeline structure
pipeline
Or get the HTML representation:
# Save as HTML
html = estimator_html_repr(pipeline)
with open('pipeline_diagram.html', 'w') as f:
    f.write(html)

Debugging Pipelines

Inspect Intermediate Results

# Access individual steps (here using full_pipeline from the ColumnTransformer example)
preprocessor = full_pipeline.named_steps['preprocessor']
classifier = full_pipeline.named_steps['classifier']

# Transform data through specific steps
X_preprocessed = full_pipeline[:-1].transform(X_test)  # All steps except the final classifier
print(f"Shape after preprocessing: {X_preprocessed.shape}")

# Check feature names after one-hot encoding
if hasattr(preprocessor, 'get_feature_names_out'):
    feature_names = preprocessor.get_feature_names_out()
    print(f"Feature names: {feature_names[:10]}...")

Memory and Caching

from sklearn.pipeline import Pipeline
from joblib import Memory

# Cache expensive transformations
memory = Memory(location='./cache', verbose=0)

cached_pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', RandomForestClassifier())
], memory=memory)

# First run: computes and caches preprocessor output
# Subsequent runs: loads from cache (fast!)
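
Caching pays off most when a grid search varies only the final estimator: the fitted preprocessor is cached per training fold and reused for every classifier candidate. A small sketch reusing cached_pipeline:
from sklearn.model_selection import GridSearchCV

search = GridSearchCV(
    cached_pipeline,
    {'classifier__n_estimators': [100, 200, 400]},  # only the classifier changes
    cv=5,
    n_jobs=-1
)
search.fit(X_train, y_train)  # preprocessing is computed once per fold, then loaded from cache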

Production Pipeline Template

"""Production ML Pipeline Template"""

from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.ensemble import GradientBoostingClassifier
import joblib
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def create_production_pipeline(numeric_features, categorical_features):
    """Create a production-ready pipeline."""
    
    numeric_transformer = Pipeline([
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler())
    ])
    
    categorical_transformer = Pipeline([
        ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
        ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
    ])
    
    preprocessor = ColumnTransformer([
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features)
    ])
    
    pipeline = Pipeline([
        ('preprocessor', preprocessor),
        ('classifier', GradientBoostingClassifier(
            n_estimators=100,
            max_depth=5,
            random_state=42
        ))
    ])
    
    return pipeline

def train_and_save(X_train, y_train, numeric_features, categorical_features, path):
    """Train pipeline and save to disk."""
    logger.info("Creating pipeline...")
    pipeline = create_production_pipeline(numeric_features, categorical_features)
    
    logger.info("Training pipeline...")
    pipeline.fit(X_train, y_train)
    
    logger.info(f"Saving to {path}...")
    joblib.dump({
        'pipeline': pipeline,
        'numeric_features': numeric_features,
        'categorical_features': categorical_features
    }, path)
    
    logger.info("Done!")
    return pipeline

def load_and_predict(path, X_new):
    """Load pipeline and make predictions."""
    logger.info(f"Loading from {path}...")
    artifacts = joblib.load(path)
    pipeline = artifacts['pipeline']
    
    logger.info("Making predictions...")
    predictions = pipeline.predict(X_new)
    probabilities = pipeline.predict_proba(X_new)
    
    return predictions, probabilities
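
A usage sketch tying the functions together; the file names, column lists, and the 'churned' target below are placeholders for your own data:
import pandas as pd

# Hypothetical training data (placeholder file and column names)
df = pd.read_csv('train.csv')
numeric_features = ['age', 'income', 'tenure']
categorical_features = ['department', 'gender']

X_train = df[numeric_features + categorical_features]
y_train = df['churned']

train_and_save(X_train, y_train, numeric_features, categorical_features, 'churn_pipeline.joblib')

# Later, e.g. in a serving process
predictions, probabilities = load_and_predict('churn_pipeline.joblib', X_train.head())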

Key Takeaways

  • Pipelines prevent leakage - preprocessing is fit only on training data
  • One object does everything - fit, transform, and predict in a single call
  • Easy deployment - save and load the entire workflow
  • Clean hyperparameter tuning - search across all pipeline parameters

What’s Next?

Let’s wrap up with a checklist of common ML mistakes to avoid!

Continue to ML Mistakes Checklist

Avoid the pitfalls that trip up even experienced practitioners