ML Pipelines
The Problem with Notebook Code
# Typical notebook workflow (PROBLEMS!)
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.feature_selection import SelectKBest
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
# 1. Load data ('target' column name is illustrative)
df = pd.read_csv('data.csv')
X = df.drop('target', axis=1)
y = df['target']
# 2. Scale features (fit on ALL data - LEAK!)
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# 3. Train-test split (AFTER scaling - BAD!)
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y)
# 4. Feature selection (using ALL data - LEAK!)
selector = SelectKBest(k=10)
X_selected = selector.fit_transform(X_scaled, y)
# 5. Train the model on yet another version of the data
model = LogisticRegression()
model.fit(X_train, y_train)
- Data leakage from preprocessing on all data
- Can’t reproduce the same transformations
- Hard to deploy - need to remember all steps
- Easy to make mistakes
The Pipeline Solution
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.feature_selection import SelectKBest
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split

# Split FIRST, on the raw data - the pipeline handles all preprocessing
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)

# All steps in one object
pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('selector', SelectKBest(k=10)),
    ('classifier', RandomForestClassifier(n_estimators=100))
])

# Fit on training data only
pipeline.fit(X_train, y_train)
# Predict (automatically applies all transformations)
predictions = pipeline.predict(X_test)
# Score
accuracy = pipeline.score(X_test, y_test)
- ✅ No data leakage - each step fit only on training data
- ✅ Reproducible - same transformations every time (see the snippet below)
- ✅ Deployable - save and load entire pipeline
- ✅ Clean - one object does everything
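Everything the preprocessing learns is stored on the fitted pipeline itself, which is what makes it reproducible and deployable. A small illustration, using the pipeline fit above:

# The fitted scaler's learned statistics live inside the pipeline
fitted_scaler = pipeline.named_steps['scaler']
print(fitted_scaler.mean_)   # per-feature means learned from X_train only
print(fitted_scaler.scale_)  # per-feature standard deviations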
Building Pipelines Step by Step
Basic Pipeline
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
# Simple pipeline
simple_pipeline = Pipeline([
    ('scale', StandardScaler()),
    ('classify', LogisticRegression())
])
# Use like a regular model
simple_pipeline.fit(X_train, y_train)
print(f"Accuracy: {simple_pipeline.score(X_test, y_test):.4f}")
Using make_pipeline (Auto-naming)
from sklearn.pipeline import make_pipeline
# Automatically names steps based on class names
auto_pipeline = make_pipeline(
    StandardScaler(),
    SelectKBest(k=10),
    RandomForestClassifier()
)
print(auto_pipeline.named_steps)
# {'standardscaler': StandardScaler(),
# 'selectkbest': SelectKBest(k=10),
# 'randomforestclassifier': RandomForestClassifier()}
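These auto-generated names are what you use to reach nested parameters later, for example in a grid search. A quick illustration:

# The generated step names double as prefixes for nested parameters
auto_pipeline.set_params(selectkbest__k=5, randomforestclassifier__n_estimators=200)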
Column Transformer: Different Preprocessing for Different Features
Real data has mixed types, so different columns need different preprocessing:
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.model_selection import train_test_split

# Sample mixed data
df = pd.DataFrame({
    'age': [25, 30, None, 45, 35],
    'income': [50000, 60000, 75000, None, 55000],
    'city': ['NYC', 'LA', 'NYC', 'Chicago', 'LA'],
    'education': ['Bachelor', 'Master', 'PhD', 'Bachelor', 'Master'],
    'target': [0, 1, 1, 0, 1]
})

# Define feature groups
numeric_features = ['age', 'income']
categorical_features = ['city', 'education']

# Different preprocessing for each type
preprocessor = ColumnTransformer(
    transformers=[
        ('num', Pipeline([
            ('imputer', SimpleImputer(strategy='median')),
            ('scaler', StandardScaler())
        ]), numeric_features),
        ('cat', Pipeline([
            ('imputer', SimpleImputer(strategy='constant', fill_value='Unknown')),
            ('encoder', OneHotEncoder(handle_unknown='ignore'))
        ]), categorical_features)
    ]
)

# Full pipeline
full_pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', LogisticRegression())
])

X = df.drop('target', axis=1)
y = df['target']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

full_pipeline.fit(X_train, y_train)
print(f"Accuracy: {full_pipeline.score(X_test, y_test):.4f}")
Complete Real-World Pipeline
import numpy as np
import pandas as pd
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder, FunctionTransformer
from sklearn.impute import SimpleImputer
from sklearn.feature_selection import SelectFromModel
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.model_selection import cross_val_score, GridSearchCV
# Custom feature engineering function
def create_features(X):
    """Add custom features."""
    X = X.copy()
    if 'income' in X.columns and 'age' in X.columns:
        X['income_per_age'] = X['income'] / (X['age'] + 1)
    return X

# Define column types
numeric_features = ['age', 'income', 'tenure', 'income_per_age']  # include the engineered column so it isn't dropped
categorical_features = ['department', 'gender']
passthrough_features = ['is_manager']  # Already binary
# Build preprocessing
numeric_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])

categorical_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy='constant', fill_value='Unknown')),
    ('encoder', OneHotEncoder(drop='first', handle_unknown='ignore'))
])

preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features),
        ('pass', 'passthrough', passthrough_features)
    ],
    remainder='drop'  # Drop columns not specified
)
# Complete pipeline
complete_pipeline = Pipeline([
    ('feature_eng', FunctionTransformer(create_features)),
    ('preprocessor', preprocessor),
    ('feature_selection', SelectFromModel(
        RandomForestClassifier(n_estimators=50, random_state=42),
        threshold='median'
    )),
    ('classifier', GradientBoostingClassifier(random_state=42))
])
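Assuming X_train and y_train carry the columns named above (age, income, tenure, department, gender, is_manager), the whole stack trains and evaluates like any single estimator:

# Illustrative usage - assumes X_train / y_train with the columns listed above
scores = cross_val_score(complete_pipeline, X_train, y_train, cv=5, scoring='roc_auc')
print(f"CV ROC-AUC: {scores.mean():.4f} (+/- {scores.std():.4f})")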
Hyperparameter Tuning with Pipelines
Access nested parameters with the step__param (double-underscore) syntax:
from sklearn.model_selection import GridSearchCV
# Define parameter grid
param_grid = {
    # Preprocessor parameters
    'preprocessor__num__imputer__strategy': ['mean', 'median'],
    # Feature selection parameters
    'feature_selection__threshold': ['median', 'mean'],
    # Classifier parameters
    'classifier__n_estimators': [50, 100, 200],
    'classifier__max_depth': [3, 5, 7],
    'classifier__learning_rate': [0.01, 0.1]
}
# Grid search
grid_search = GridSearchCV(
    complete_pipeline,
    param_grid,
    cv=5,
    scoring='roc_auc',
    n_jobs=-1,
    verbose=1
)
# Fit (this searches all combinations)
grid_search.fit(X_train, y_train)
print(f"Best parameters: {grid_search.best_params_}")
print(f"Best CV score: {grid_search.best_score_:.4f}")
# Best pipeline is automatically selected
best_pipeline = grid_search.best_estimator_
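With the default refit=True, GridSearchCV retrains the winning pipeline on the entire training set, so best_pipeline can be used for prediction right away.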
Cross-Validation with Pipelines
Pipelines ensure proper cross-validation without data leakage:
from sklearn.model_selection import cross_val_score, StratifiedKFold
# Create pipeline
pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
])
# Cross-validation (preprocessing happens INSIDE each fold)
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
scores = cross_val_score(pipeline, X, y, cv=cv, scoring='roc_auc')
print(f"CV Scores: {scores}")
print(f"Mean: {scores.mean():.4f} (+/- {scores.std():.4f})")
- Each fold: fit the preprocessor on the train fold → transform the test fold (a rough sketch of this loop follows below)
- No information from the test fold leaks into preprocessing
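A sketch of what happens inside each fold, assuming X and y are NumPy arrays (use .iloc for DataFrames):

from sklearn.base import clone

# Roughly what cross_val_score does with the pipeline above
for train_idx, test_idx in cv.split(X, y):
    fold_pipeline = clone(pipeline)                    # fresh, unfitted copy
    fold_pipeline.fit(X[train_idx], y[train_idx])      # scaler fit on the train fold only
    fold_score = fold_pipeline.score(X[test_idx], y[test_idx])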
Custom Transformers
Create your own pipeline-compatible transformers by subclassing BaseEstimator and TransformerMixin:
from sklearn.base import BaseEstimator, TransformerMixin
import numpy as np

class OutlierRemover(BaseEstimator, TransformerMixin):
    """Clip outliers to IQR-based bounds learned from the training data."""

    def __init__(self, factor=1.5):
        self.factor = factor

    def fit(self, X, y=None):
        """Calculate bounds from training data."""
        X = np.array(X)
        self.lower_ = np.percentile(X, 25, axis=0) - self.factor * self._iqr(X)
        self.upper_ = np.percentile(X, 75, axis=0) + self.factor * self._iqr(X)
        return self

    def transform(self, X):
        """Clip values to the learned bounds."""
        X = np.array(X)
        return np.clip(X, self.lower_, self.upper_)

    def _iqr(self, X):
        return np.percentile(X, 75, axis=0) - np.percentile(X, 25, axis=0)
class FeatureAdder(BaseEstimator, TransformerMixin):
    """Add polynomial features."""

    def __init__(self, add_squares=True, add_interactions=False):
        self.add_squares = add_squares
        self.add_interactions = add_interactions

    def fit(self, X, y=None):
        return self  # Nothing to fit

    def transform(self, X):
        X = np.array(X)
        features = [X]
        if self.add_squares:
            features.append(X ** 2)
        if self.add_interactions and X.shape[1] >= 2:
            for i in range(X.shape[1]):
                for j in range(i + 1, X.shape[1]):
                    features.append((X[:, i] * X[:, j]).reshape(-1, 1))
        return np.hstack(features)
# Use in pipeline
custom_pipeline = Pipeline([
    ('outliers', OutlierRemover(factor=2.0)),
    ('features', FeatureAdder(add_squares=True)),
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression())
])
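A quick sanity check on synthetic data (purely illustrative) shows that the custom steps drop into a pipeline just like the built-in ones:

# Fit the custom pipeline on made-up numeric data
rng = np.random.default_rng(42)
X_demo = rng.normal(size=(200, 3))
y_demo = (X_demo[:, 0] + X_demo[:, 1] > 0).astype(int)

custom_pipeline.fit(X_demo, y_demo)
print(f"Training accuracy: {custom_pipeline.score(X_demo, y_demo):.4f}")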
Saving and Loading Pipelines
import joblib
# Save the entire fitted pipeline (here, the full_pipeline from the ColumnTransformer example)
joblib.dump(full_pipeline, 'model_pipeline.pkl')
# Load pipeline (includes all preprocessing!)
loaded_pipeline = joblib.load('model_pipeline.pkl')
# Use directly - no need to remember preprocessing steps
new_data = pd.DataFrame({
    'age': [35],
    'income': [75000],
    'city': ['NYC'],
    'education': ['Master']
})
prediction = loaded_pipeline.predict(new_data)
probability = loaded_pipeline.predict_proba(new_data)
print(f"Prediction: {prediction[0]}")
print(f"Probability: {probability[0]}")
Pipeline Visualization
from sklearn import set_config
from sklearn.utils import estimator_html_repr
# Enable diagram display
set_config(display='diagram')
# Display pipeline structure
pipeline
# Save as HTML
html = estimator_html_repr(pipeline)
with open('pipeline_diagram.html', 'w') as f:
    f.write(html)
Debugging Pipelines
Inspect Intermediate Results
# Access individual steps
preprocessor = pipeline.named_steps['preprocessor']
classifier = pipeline.named_steps['classifier']
# Transform data through specific steps
X_preprocessed = pipeline[:-1].transform(X_test) # All but last step
print(f"Shape after preprocessing: {X_preprocessed.shape}")
# Check feature names after one-hot encoding
if hasattr(preprocessor, 'get_feature_names_out'):
    feature_names = preprocessor.get_feature_names_out()
    print(f"Feature names: {feature_names[:10]}...")
Memory and Caching
from sklearn.pipeline import Pipeline
from joblib import Memory
# Cache expensive transformations
memory = Memory(location='./cache', verbose=0)
cached_pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', RandomForestClassifier())
], memory=memory)

# First fit: the preprocessor is fit and the fitted transformer is cached
# Later fits with the same data and parameters: loaded from cache (fast!)
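The cache is most valuable inside GridSearchCV: parameter combinations that only change the classifier reuse the cached, already-fitted preprocessor instead of refitting it for every candidate.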
Production Pipeline Template
"""Production ML Pipeline Template"""
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.ensemble import GradientBoostingClassifier
import joblib
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def create_production_pipeline(numeric_features, categorical_features):
    """Create a production-ready pipeline."""
    numeric_transformer = Pipeline([
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler())
    ])

    categorical_transformer = Pipeline([
        ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
        ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
    ])

    preprocessor = ColumnTransformer([
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features)
    ])

    pipeline = Pipeline([
        ('preprocessor', preprocessor),
        ('classifier', GradientBoostingClassifier(
            n_estimators=100,
            max_depth=5,
            random_state=42
        ))
    ])
    return pipeline


def train_and_save(X_train, y_train, numeric_features, categorical_features, path):
    """Train pipeline and save to disk."""
    logger.info("Creating pipeline...")
    pipeline = create_production_pipeline(numeric_features, categorical_features)

    logger.info("Training pipeline...")
    pipeline.fit(X_train, y_train)

    logger.info(f"Saving to {path}...")
    joblib.dump({
        'pipeline': pipeline,
        'numeric_features': numeric_features,
        'categorical_features': categorical_features
    }, path)
    logger.info("Done!")
    return pipeline


def load_and_predict(path, X_new):
    """Load pipeline and make predictions."""
    logger.info(f"Loading from {path}...")
    artifacts = joblib.load(path)
    pipeline = artifacts['pipeline']

    logger.info("Making predictions...")
    predictions = pipeline.predict(X_new)
    probabilities = pipeline.predict_proba(X_new)
    return predictions, probabilities
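A sketch of how these pieces might be wired together (the feature names, data variables, and file path are illustrative):

# Hypothetical usage of the template above
numeric_features = ['age', 'income', 'tenure']
categorical_features = ['department', 'gender']

pipeline = train_and_save(X_train, y_train, numeric_features, categorical_features, 'model.joblib')
predictions, probabilities = load_and_predict('model.joblib', X_test)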
Key Takeaways
- Pipelines prevent leakage: preprocessing is fit only on training data
- One object does it all: fit, transform, and predict in one call
- Easy deployment: save and load the entire workflow
- Clean hyperparameter tuning: search across all pipeline parameters
What’s Next?
Let’s wrap up with a checklist of common ML mistakes to avoid! Continue to the ML Mistakes Checklist to learn how to avoid the pitfalls that trip up even experienced practitioners.