Model Deployment: From Notebook to Production

[Figure: MLOps Deployment Pipeline]

Your Model Is Useless in a Notebook

You’ve trained a great model. It achieves 95% accuracy! But it’s sitting in a Jupyter notebook on your laptop. To be useful, models need to be:
  • Saved and loaded
  • Served via an API
  • Monitored in production
  • Updated when needed

[Figure: Spotify ML Platform at Scale]

Step 1: Saving Models

import joblib
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split

# Train a model
iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
    iris.data, iris.target, test_size=0.2, random_state=42
)

model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)

print(f"Training accuracy: {model.score(X_train, y_train):.2%}")
print(f"Test accuracy: {model.score(X_test, y_test):.2%}")

# Save the model
joblib.dump(model, 'iris_classifier.joblib')
print("Model saved!")

Loading the Model

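import joblib
from sklearn.datasets import load_iris

# Load the model in any Python process with a compatible scikit-learn version.
# joblib files are pickle-based, so only load files you trust.
loaded_model = joblib.load('iris_classifier.joblib')

# Use it!
sample = [[5.1, 3.5, 1.4, 0.2]]
prediction = loaded_model.predict(sample)
probability = loaded_model.predict_proba(sample)

# Class names come from the dataset, not from the saved model
target_names = load_iris().target_names
print(f"Prediction: {target_names[prediction[0]]}")
print(f"Probabilities: {probability[0]}")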

Using Pickle (Built-in Python)

import pickle

# Save
with open('model.pkl', 'wb') as f:
    pickle.dump(model, f)

# Load
with open('model.pkl', 'rb') as f:
    loaded_model = pickle.load(f)

Step 2: Save the Full Pipeline

Don’t just save the model - save the entire preprocessing pipeline!
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier

# Create a complete pipeline
pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', RandomForestClassifier(n_estimators=100))
])

# Train
pipeline.fit(X_train, y_train)

# Save the ENTIRE pipeline (scaler + model)
joblib.dump(pipeline, 'complete_pipeline.joblib')

# Load and use
loaded_pipeline = joblib.load('complete_pipeline.joblib')
prediction = loaded_pipeline.predict([[5.1, 3.5, 1.4, 0.2]])

Common Mistake: Saving only the model, not the scaler. The model was trained on scaled values, so if the serving code feeds it raw inputs (or scales them differently), the predictions are wrong and nothing raises an error.
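
To make the mistake concrete, here is a minimal sketch that trains a pipeline on the iris data and then hands a raw, unscaled sample to the bare classifier, which is what happens when only the model is saved. The variable names (`bare_classifier`, `raw_sample`, and so on) are illustrative, and the pipeline is fitted on the full dataset only to keep the example short.

```python
from sklearn.datasets import load_iris
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

iris = load_iris()
pipeline = Pipeline([
    ("scaler", StandardScaler()),
    ("classifier", RandomForestClassifier(n_estimators=100, random_state=42)),
])
pipeline.fit(iris.data, iris.target)

raw_sample = [[5.1, 3.5, 1.4, 0.2]]  # a setosa flower, in centimetres

# Correct: the pipeline scales the input before the classifier sees it
pipeline_pred = pipeline.predict(raw_sample)[0]

# Simulated mistake: the classifier alone receives unscaled values
bare_classifier = pipeline.named_steps["classifier"]
bare_pred = bare_classifier.predict(raw_sample)[0]

# Manual repair: apply the fitted scaler yourself, then call the classifier
scaled_sample = pipeline.named_steps["scaler"].transform(raw_sample)
manual_pred = bare_classifier.predict(scaled_sample)[0]

print("Pipeline:           ", iris.target_names[pipeline_pred])
print("Classifier only:    ", iris.target_names[bare_pred])
print("Scaler + classifier:", iris.target_names[manual_pred])
```

The first and third lines always agree, because the pipeline is exactly "scale, then classify". The second line can name a different species, since raw centimetre values fall outside the standardized range the classifier was trained on.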

Step 3: Create an API with FastAPI

# Save this as app.py
from fastapi import FastAPI
from pydantic import BaseModel
import joblib
import numpy as np

# Load the model at startup
model = joblib.load('complete_pipeline.joblib')

# Create FastAPI app
app = FastAPI(title="Iris Classifier API")

# Define request body
class IrisFeatures(BaseModel):
    sepal_length: float
    sepal_width: float
    petal_length: float
    petal_width: float

# Define response
class Prediction(BaseModel):
    species: str
    probability: float

# Prediction endpoint
@app.post("/predict", response_model=Prediction)
def predict(features: IrisFeatures):
    # Convert to array
    X = np.array([[
        features.sepal_length,
        features.sepal_width,
        features.petal_length,
        features.petal_width
    ]])
    
    # Predict
    prediction = model.predict(X)[0]
    probability = model.predict_proba(X).max()
    
    species_names = ['setosa', 'versicolor', 'virginica']
    
    return Prediction(
        species=species_names[prediction],
        probability=float(probability)
    )

# Health check
@app.get("/health")
def health():
    return {"status": "healthy"}
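
The request model above accepts any float, including negative or absurdly large measurements. As a hedged sketch of the input validation the checklist later asks for, Pydantic `Field` constraints can reject such values before they reach the model. The class name `ValidatedIrisFeatures`, the helper `try_parse`, and the numeric bounds are assumptions chosen for illustration, not limits taken from the dataset.

```python
from pydantic import BaseModel, Field, ValidationError

class ValidatedIrisFeatures(BaseModel):
    # Bounds are illustrative assumptions: positive and at most 30 cm
    sepal_length: float = Field(gt=0, le=30)
    sepal_width: float = Field(gt=0, le=30)
    petal_length: float = Field(gt=0, le=30)
    petal_width: float = Field(gt=0, le=30)

def try_parse(payload: dict):
    """Return (features, None) on success or (None, error_text) on failure."""
    try:
        return ValidatedIrisFeatures(**payload), None
    except ValidationError as exc:
        return None, str(exc)

good, good_err = try_parse(
    {"sepal_length": 5.1, "sepal_width": 3.5, "petal_length": 1.4, "petal_width": 0.2}
)
bad, bad_err = try_parse(
    {"sepal_length": 5.1, "sepal_width": 3.5, "petal_length": 1.4, "petal_width": -1}
)
print(good)
print(bad_err)
```

When a model like this is used as the request body of a FastAPI endpoint, FastAPI performs the same validation automatically and answers invalid requests with HTTP 422, so the explicit `try`/`except` is only needed outside a request handler.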

Run the API

# Install FastAPI and uvicorn
pip install fastapi uvicorn

# Run the server
uvicorn app:app --reload --host 0.0.0.0 --port 8000

Test the API

import requests

# Make a prediction request
response = requests.post(
    "http://localhost:8000/predict",
    json={
        "sepal_length": 5.1,
        "sepal_width": 3.5,
        "petal_length": 1.4,
        "petal_width": 0.2
    }
)

print(response.json())
# Example output: {'species': 'setosa', 'probability': 1.0}

Step 4: Containerize with Docker

# Dockerfile
FROM python:3.10-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy model and code
COPY complete_pipeline.joblib .
COPY app.py .

# Run the API
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]

# requirements.txt
fastapi==0.104.1
uvicorn==0.24.0
scikit-learn==1.3.2
joblib==1.3.2
numpy==1.26.2

Build and Run

# Build the image
docker build -t iris-classifier .

# Run the container
docker run -p 8000:8000 iris-classifier

Step 5: Model Versioning

Track your models like you track code:
import joblib
from datetime import datetime
import json

def save_model_with_metadata(model, model_name, metrics, version=None):
    """Save model with metadata for tracking."""
    
    if version is None:
        version = datetime.now().strftime("%Y%m%d_%H%M%S")
    
    filename = f"{model_name}_v{version}"
    
    # Save model
    joblib.dump(model, f"{filename}.joblib")
    
    # Save metadata
    metadata = {
        "model_name": model_name,
        "version": version,
        "created_at": datetime.now().isoformat(),
        "metrics": metrics,
        "model_type": type(model).__name__
    }
    
    with open(f"{filename}_metadata.json", "w") as f:
        json.dump(metadata, f, indent=2)
    
    print(f"Saved {filename}")
    return filename

# Usage
save_model_with_metadata(
    model=pipeline,
    model_name="iris_classifier",
    metrics={"accuracy": 0.95, "f1_score": 0.94}
)
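
Saved versions are only useful if the serving code can find the right one. The sketch below is one possible companion to `save_model_with_metadata`: it scans a directory for the `*_metadata.json` files written above and returns the newest version. The function name `find_latest_model` is hypothetical, and it assumes every `created_at` value is an ISO timestamp in the same format, so that string comparison matches time order.

```python
import json
from pathlib import Path

def find_latest_model(directory, model_name):
    """Return (model_path, metadata) for the newest saved version, or None."""
    candidates = []
    for meta_path in Path(directory).glob(f"{model_name}_v*_metadata.json"):
        with open(meta_path) as f:
            metadata = json.load(f)
        candidates.append((metadata["created_at"], meta_path, metadata))

    if not candidates:
        return None

    # ISO timestamps in one format sort chronologically as plain strings
    _, meta_path, metadata = max(candidates, key=lambda item: item[0])

    # The model file shares the metadata file's prefix
    model_name_on_disk = meta_path.name.replace("_metadata.json", ".joblib")
    return meta_path.with_name(model_name_on_disk), metadata

# Usage (assumes models were saved into the current directory):
# result = find_latest_model(".", "iris_classifier")
# if result is not None:
#     model_path, metadata = result
#     model = joblib.load(model_path)
```

For anything beyond a single machine, a model registry does the same job with access control and audit history; this file-based lookup is only meant to show the idea.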

Step 6: Model Monitoring

Track model performance in production:
from datetime import datetime
import logging

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("model_monitor")

class ModelMonitor:
    def __init__(self):
        self.predictions = []
        self.latencies = []
    
    def log_prediction(self, input_data, prediction, latency_ms, actual=None):
        """Log a prediction for monitoring."""
        record = {
            "timestamp": datetime.now().isoformat(),
            "input": input_data,
            "prediction": prediction,
            "latency_ms": latency_ms,
            "actual": actual
        }
        self.predictions.append(record)
        self.latencies.append(latency_ms)
        
        # Alert if latency too high
        if latency_ms > 100:
            logger.warning(f"High latency: {latency_ms}ms")
        
        # Alert if unusual prediction distribution
        if len(self.predictions) > 100:
            recent_preds = [p["prediction"] for p in self.predictions[-100:]]
            if len(set(recent_preds)) == 1:
                logger.warning("Model predicting same class for all inputs!")
    
    def get_metrics(self):
        """Get monitoring metrics."""
        return {
            "total_predictions": len(self.predictions),
            "avg_latency_ms": sum(self.latencies) / len(self.latencies) if self.latencies else 0,
            "max_latency_ms": max(self.latencies) if self.latencies else 0
        }

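# Use in API (this replaces the earlier /predict endpoint in app.py)
import time

monitor = ModelMonitor()

@app.post("/predict")
def predict(features: IrisFeatures):
    # Convert the request into the 2D array the pipeline expects
    X = np.array([[
        features.sepal_length,
        features.sepal_width,
        features.petal_length,
        features.petal_width
    ]])
    
    # perf_counter is a monotonic clock intended for timing short intervals
    start = time.perf_counter()
    
    # Make prediction
    prediction = model.predict(X)[0]
    
    latency_ms = (time.perf_counter() - start) * 1000
    
    # Log for monitoring
    monitor.log_prediction(
        input_data=features.dict(),  # on Pydantic v2, prefer features.model_dump()
        prediction=int(prediction),
        latency_ms=latency_ms
    )
    
    return {"prediction": int(prediction)}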

Step 7: A/B Testing Models

Compare new models against production:
import random

class ModelABTest:
    def __init__(self, model_a, model_b, traffic_to_b=0.1):
        """
        A/B test between two models.
        traffic_to_b: fraction of traffic to send to new model
        """
        self.model_a = model_a  # Production model
        self.model_b = model_b  # Challenger model
        self.traffic_to_b = traffic_to_b
        self.results_a = []
        self.results_b = []
    
    def predict(self, X):
        """Route prediction to one of the models."""
        if random.random() < self.traffic_to_b:
            prediction = self.model_b.predict(X)
            self.results_b.append(prediction)
            model_used = "B"
        else:
            prediction = self.model_a.predict(X)
            self.results_a.append(prediction)
            model_used = "A"
        
        return prediction, model_used
    
    def get_stats(self):
        """Compare model performance."""
        return {
            "model_a_predictions": len(self.results_a),
            "model_b_predictions": len(self.results_b),
            "traffic_split": f"{100-self.traffic_to_b*100:.0f}% / {self.traffic_to_b*100:.0f}%"
        }

# Usage
ab_test = ModelABTest(
    model_a=joblib.load("model_v1.joblib"),
    model_b=joblib.load("model_v2.joblib"),
    traffic_to_b=0.05  # 5% to new model
)
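
`get_stats` above only counts requests, which is not enough to decide whether model B is actually better. Once ground-truth labels arrive, one common way to compare the two arms is a two-proportion z-test on their accuracies. The sketch below is an assumption about how that comparison could be done, not part of the `ModelABTest` class; the function name and the example counts are made up for illustration.

```python
import math

def two_proportion_z_test(correct_a, total_a, correct_b, total_b):
    """Two-sided z-test for a difference in accuracy between models A and B.

    Returns (z, p_value). A positive z means B was more accurate than A.
    """
    p_a = correct_a / total_a
    p_b = correct_b / total_b
    pooled = (correct_a + correct_b) / (total_a + total_b)
    std_err = math.sqrt(pooled * (1 - pooled) * (1 / total_a + 1 / total_b))
    if std_err == 0:
        return 0.0, 1.0  # both models were always right or always wrong
    z = (p_b - p_a) / std_err
    p_value = math.erfc(abs(z) / math.sqrt(2))  # two-sided normal tail area
    return z, p_value

# Hypothetical counts: A served 9,500 labelled requests, B served 500
z, p_value = two_proportion_z_test(
    correct_a=9000, total_a=9500, correct_b=480, total_b=500
)
print(f"z = {z:.2f}, p = {p_value:.3f}")
```

With only 5% of traffic going to B, its sample stays small for a long time, so a difference that looks large can still be noise. The normal approximation also needs enough correct and incorrect predictions in each arm to be trustworthy.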

Deployment Checklist

Before Deployment

  • Model tested on holdout data
  • Pipeline includes preprocessing
  • Model serialized (joblib/pickle)
  • API endpoints documented
  • Error handling added
  • Input validation in place

After Deployment

  • Health checks working
  • Logging configured
  • Latency monitored
  • Prediction distribution tracked
  • Rollback plan ready
  • Model version tracked

Cloud Deployment Options

Platform            Complexity   Best For
Heroku              Low          Quick prototypes
Railway             Low          Simple apps
AWS Lambda          Medium       Serverless, pay-per-use
Google Cloud Run    Medium       Container-based
AWS SageMaker       High         Enterprise ML
Azure ML            High         Enterprise ML
🚀 Mini Projects

Project 1: Model Serialization Pipeline

Create a complete pipeline that saves models with their preprocessing steps.

Project 2: Simple REST API

Build a prediction API using Flask.

Project 3: Model Versioning System

Create a simple model versioning and registry system.

Project 4: Monitoring Dashboard

Create a simple model monitoring system.

Key Takeaways

Save the Pipeline

Include all preprocessing with the model

API = Interface

FastAPI makes serving models easy

Docker = Portability

Same environment everywhere

Monitor = Trust

Know when your model degrades

What’s Next?

We have more advanced topics to explore! Let’s learn about time series forecasting.

Interview Deep-Dive

This is fundamentally an engineering problem, not an ML problem. The model accuracy is already decided at training time — deployment is about serving reliably at scale.
  • Model optimization first. Before touching infrastructure, I would profile the model. Inference cost for a Random Forest grows with the number of trees and their depth, so a forest with 500 trees at depth 20 does at least 10x the work per prediction of one with 50 trees at depth 10. I would benchmark the accuracy drop from model simplification and see if the trade-off is acceptable. Often, you can cut model size by 80% with less than 1% accuracy loss, but that has to be measured on your own validation set.
  • Batch predictions where possible. If predictions are not real-time (e.g., daily churn scoring), precompute them in a batch job and serve from a cache. This is the cheapest and most reliable path. Only build real-time serving if the use case demands it.
  • For real-time: containerize with a lightweight framework. I would use FastAPI or a gRPC service inside a Docker container, deployed on Kubernetes with horizontal pod autoscaling. The model loads into memory once at startup, and each request is just a numpy operation.
  • Connection pooling and async I/O. If features come from a database or feature store, the network call is usually the bottleneck, not the model. Use connection pooling and async requests to parallelize feature fetching.
  • Load testing before launch. Use Locust or k6 to simulate 10K RPS and measure P50, P95, and P99 latencies. If P99 exceeds 50ms, the usual culprits are garbage collection pauses, cold starts, or feature computation latency — not the model inference itself.
Follow-up: What happens when the model needs to be updated without downtime?
Blue-green deployment is the standard pattern. You maintain two identical environments. The “blue” environment runs the current model, the “green” environment gets the new model deployed and tested. Once the green model passes smoke tests and shadow-mode validation, you switch the load balancer to route traffic from blue to green. If anything goes wrong, you switch back instantly. The key detail most people miss: you also need to version your preprocessing pipeline. If the new model expects different feature transformations than the old model, switching models mid-flight will produce garbage predictions.
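
The switch-and-rollback logic can be sketched inside a single process. This is only an analogue of blue-green deployment, which really happens at the load balancer between two environments; the class `ModelSlot` and its methods are hypothetical names. Here "model" should be the whole saved pipeline, so the preprocessing is swapped together with the classifier.

```python
import threading

class ModelSlot:
    """Holds the live model and the previous one so a swap can be undone."""

    def __init__(self, model):
        self._lock = threading.Lock()
        self._live = model
        self._previous = None

    def predict(self, X):
        with self._lock:
            model = self._live  # take a reference, then release the lock
        return model.predict(X)

    def promote(self, candidate, smoke_test):
        """Switch to `candidate` only if smoke_test(candidate) returns True."""
        if not smoke_test(candidate):
            return False
        with self._lock:
            self._previous, self._live = self._live, candidate
        return True

    def rollback(self):
        """Restore the previous model; returns False if there is none."""
        with self._lock:
            if self._previous is None:
                return False
            self._live, self._previous = self._previous, None
        return True

# Usage (file names and the smoke test are assumptions):
# slot = ModelSlot(joblib.load("model_v1.joblib"))
# ok = slot.promote(joblib.load("model_v2.joblib"),
#                   smoke_test=lambda m: len(m.predict([[5.1, 3.5, 1.4, 0.2]])) == 1)
# if not ok or error_rate_too_high:
#     slot.rollback()
```

In-flight requests keep the model reference they already took, so a swap never changes a prediction halfway through a request.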
No code changes means the model itself has not changed, so the issue is almost certainly in the data. My debugging process follows a systematic funnel:
  • Step 1: Confirm the drop is real. Check if the evaluation data itself is reliable. Has the labeling process changed? Is there a new data source contributing noisier labels? A perceived accuracy drop could be a labeling quality issue, not a model issue.
  • Step 2: Check input feature distributions. Compare each feature’s distribution in the recent prediction window against the training data. Use Population Stability Index (PSI) or Kolmogorov-Smirnov tests. If a feature like “average_transaction_amount” shifted because of inflation or a pricing change, the model is seeing inputs outside its training domain.
  • Step 3: Check for upstream data pipeline issues. Missing values that were previously imputed, schema changes in upstream tables, timestamp timezone bugs, or a feature that silently started returning nulls — these are the most common silent killers. I have seen a model degrade because an upstream team changed a column from integer to float, which caused a downstream join to silently drop rows.
  • Step 4: Segment the performance drop. Is accuracy down across all segments, or just one? If it dropped only for a specific customer cohort or geographic region, you have a targeted drift problem. If it dropped everywhere uniformly, it is more likely a global data issue.
  • Step 5: Retrain on recent data and compare. If a freshly trained model on the last 3 months of data recovers the lost accuracy, the diagnosis is confirmed: data drift. Set up a regular retraining cadence and monitoring alerts.
Follow-up: How would you set up monitoring to catch this earlier next time?
I would implement three layers of monitoring.
  • Input monitoring: track feature distributions and alert when PSI exceeds a threshold (I typically start with 0.2).
  • Output monitoring: track the distribution of predicted probabilities. If the model suddenly starts predicting everything as low-risk, that is a red flag even before you get ground truth labels.
  • Performance monitoring: as soon as labels become available (even with a delay), compute accuracy, precision, and recall on a rolling window and alert on degradation.
The goal is to detect drift within days, not months.
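
As a rough sketch of the input-monitoring layer, the Population Stability Index for one numeric feature can be computed with NumPy by binning on the training data's quantiles and comparing bin proportions. The function name, the choice of 10 quantile bins, and the `eps` floor for empty bins are assumptions made for illustration; the synthetic data only stands in for a real feature.

```python
import numpy as np

def population_stability_index(expected, actual, n_bins=10, eps=1e-6):
    """PSI between a reference sample (training) and a recent sample (serving).

    PSI = sum over bins of (actual% - expected%) * ln(actual% / expected%)
    """
    expected = np.asarray(expected, dtype=float)
    actual = np.asarray(actual, dtype=float)

    # Interior bin edges come from quantiles of the reference sample
    edges = np.quantile(expected, np.linspace(0, 1, n_bins + 1))[1:-1]

    # searchsorted maps every value to a bin index in 0..n_bins-1
    exp_counts = np.bincount(np.searchsorted(edges, expected), minlength=n_bins)
    act_counts = np.bincount(np.searchsorted(edges, actual), minlength=n_bins)

    # Floor the proportions so empty bins do not produce log(0)
    exp_frac = np.clip(exp_counts / exp_counts.sum(), eps, None)
    act_frac = np.clip(act_counts / act_counts.sum(), eps, None)

    return float(np.sum((act_frac - exp_frac) * np.log(act_frac / exp_frac)))

rng = np.random.default_rng(0)
training_values = rng.normal(loc=0.0, scale=1.0, size=5000)
same_values = rng.normal(loc=0.0, scale=1.0, size=5000)
shifted_values = rng.normal(loc=1.0, scale=1.0, size=5000)  # mean moved by 1 std

psi_stable = population_stability_index(training_values, same_values)
psi_shifted = population_stability_index(training_values, shifted_values)
print(f"stable: {psi_stable:.3f}, shifted: {psi_shifted:.3f}")
```

In a monitoring job this would run per feature on a rolling window of logged inputs, with an alert whenever the value crosses the chosen threshold.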
Training-serving skew is one of the top reasons ML models fail silently in production, and it is notoriously hard to debug because the model does not throw errors — it just makes slightly worse predictions.
  • Root cause: dual code paths. During training, features are computed in batch with pandas on a data warehouse. During serving, the same features need to be computed in real-time, often in a different language or framework. Even minor differences — different null handling, different timestamp rounding, different string encoding — produce skew.
  • Solution 1: Feature stores. Use a feature store (Feast, Tecton, Hopsworks) that provides a single feature definition used identically for both training and serving. The feature store computes features once, stores them, and serves the same values to both the training pipeline and the prediction endpoint. This eliminates the dual code path problem entirely.
  • Solution 2: Pipeline as the single source of truth. If a feature store is too heavy, package your feature engineering as a shared library that both the training job and the serving endpoint import. One codebase, one behavior.
  • Solution 3: Log-and-compare. In the serving path, log the actual feature values fed to the model for a sample of requests. Periodically compare these logged features against what the training pipeline would produce for the same raw inputs. Any discrepancy is skew.
The worst kind of skew is the “off-by-one” temporal bug. For example, during training you compute a “last 7 days average” that accidentally includes the current day (leaking the label). At serving time, the current day is not yet complete, so the feature has a different value. The model was trained with a leak, so serving without the leak makes it worse. Fixing the leak and retraining will also lower the measured accuracy, because the leaked feature was supplying signal that the model could never have in production.
Follow-up: How do you detect training-serving skew in a model that is already in production?
The most reliable method is shadow scoring. Periodically take a batch of recent production requests, run them through both the serving pipeline and the training pipeline, and compare the feature vectors. Any systematic difference, even a small one, is skew. I would also compare the distribution of prediction scores between training data and recent serving data. If the training data has a mean prediction of 0.35 and serving data has 0.42, that gap could be skew, drift, or both, and it warrants investigation.
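
The comparison step of shadow scoring can be sketched as follows, assuming both pipelines have already been run on the same batch of raw requests and produced numeric feature matrices with the same column order. The function name `find_feature_skew`, the tolerances, and the feature names in the example are hypothetical.

```python
import numpy as np

def find_feature_skew(training_features, serving_features, feature_names,
                      rtol=1e-5, atol=1e-8):
    """Return {feature_name: mismatch_rate} for features whose values differ
    between the training and serving pipelines on the same raw inputs."""
    train = np.asarray(training_features, dtype=float)
    serve = np.asarray(serving_features, dtype=float)
    if train.shape != serve.shape:
        raise ValueError("Both pipelines must be run on the same requests")

    # equal_nan=True: a null produced by both code paths is not skew
    mismatches = ~np.isclose(train, serve, rtol=rtol, atol=atol, equal_nan=True)
    rates = mismatches.mean(axis=0)  # fraction of rows that differ, per feature
    return {name: float(rate)
            for name, rate in zip(feature_names, rates) if rate > 0}

# Two requests, two features; the 7-day average differs on the second request
training_side = [[1.0, 7.0], [2.0, 8.0]]
serving_side = [[1.0, 7.0], [2.0, 9.0]]
skew = find_feature_skew(training_side, serving_side, ["txn_count", "avg_7d"])
print(skew)  # {'avg_7d': 0.5}
```

A feature that mismatches on a large share of rows usually points to a systematic difference such as null handling or a time-window boundary, while a tiny mismatch rate is more often a rounding or timing edge case.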
Five milliseconds is tight. That is roughly the time for a single database round-trip, so every architectural decision matters.
  • Model choice is constrained. Deep ensembles with hundreds of trees are out. I would look at logistic regression, small gradient boosting models (under 50 trees, shallow depth), or even a precomputed lookup table if the feature space is discrete and bounded. A single decision tree with depth 6 can serve in microseconds.
  • Feature computation budget. If features require any I/O (database lookups, external API calls), that will blow the 5ms budget immediately. All features must either be precomputed and cached in memory, or computable from the request payload alone.
  • Model distillation. Train a complex model offline (XGBoost with 1000 trees), then use it to label a large dataset. Train a simple model (logistic regression) on those labels. The simple model learns to mimic the complex model and can serve in under 1ms. You lose some accuracy but gain massive latency improvement.
  • ONNX or compiled models. Convert the sklearn model to ONNX format with sklearn-onnx (skl2onnx) and serve it with ONNX Runtime. This removes most of the Python-level overhead from the prediction path and can give a 2-5x speedup, depending on the model. For extreme cases, compile tree ensembles to native code with a tool like Treelite.
  • Avoid Python’s GIL bottleneck. If you are serving many concurrent requests in Python, the Global Interpreter Lock serializes CPU-bound work. Consider a Rust or C++ serving layer, or use multiprocess workers instead of multithreaded ones.
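
The distillation idea from the list above can be sketched with scikit-learn alone. This is a simplified stand-in, not the exact setup described: `GradientBoostingClassifier` replaces XGBoost, the data is synthetic, and the student is trained on the teacher's labels for the training set, where in practice a larger unlabelled set would be used.

```python
from sklearn.datasets import make_classification
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split

# Synthetic binary classification data (an assumption for illustration)
X, y = make_classification(n_samples=2000, n_features=10, n_informative=5,
                           n_clusters_per_class=1, random_state=0)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.25, random_state=0
)

# Teacher: the larger, slower model trained on the true labels
teacher = GradientBoostingClassifier(n_estimators=100, random_state=0)
teacher.fit(X_train, y_train)

# Student: never sees y_train, only the teacher's predicted labels
teacher_labels = teacher.predict(X_train)
student = LogisticRegression(max_iter=1000)
student.fit(X_train, teacher_labels)

teacher_acc = teacher.score(X_test, y_test)
student_acc = student.score(X_test, y_test)
agreement = (student.predict(X_test) == teacher.predict(X_test)).mean()

print(f"Teacher accuracy: {teacher_acc:.2%}")
print(f"Student accuracy: {student_acc:.2%}")
print(f"Student agrees with teacher on {agreement:.2%} of test rows")
```

The gap between the two accuracy figures is the price paid for the simpler model, and it should be read alongside a latency measurement of both models.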
The fundamental trade-off is accuracy versus latency. I would quantify this explicitly: “If we use the full model at 50ms, we get 92% accuracy. If we use the distilled model at 3ms, we get 89% accuracy. Is that 3-point accuracy gain worth a roughly 17x latency cost?” That is a business decision, not a technical one.
Follow-up: How would you benchmark and validate that the simplified model meets the latency requirement under production load?
I would set up a realistic load test, not just a single-request benchmark. Single-request latency is misleading because it does not account for contention, garbage collection, or memory pressure under load. I would use a tool like Locust to simulate the expected concurrent request rate, measure P50, P95, and P99 latencies, and validate that P99 stays under 5ms. I would run this test for at least 10 minutes to catch GC pauses and memory leaks. The benchmark should also cover the full request path (network, deserialization, feature computation, model inference, and serialization), not just the model.predict() call.
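
Before a full load test, a quick in-process measurement can show whether a model is even close to the budget. The sketch below times sequential calls and reports P50, P95, and P99 using only the standard library. The function name `measure_latency_ms` and its parameters are hypothetical, and because the calls are sequential it has exactly the blind spots described above: it is a first filter, not a substitute for a Locust run.

```python
import statistics
import time

def measure_latency_ms(predict_fn, payload, n_requests=1000, warmup=50):
    """Call predict_fn(payload) repeatedly; return P50/P95/P99 in milliseconds."""
    # Warm-up calls keep one-time costs (caches, lazy imports) out of the numbers
    for _ in range(warmup):
        predict_fn(payload)

    latencies = []
    for _ in range(n_requests):
        start = time.perf_counter()
        predict_fn(payload)
        latencies.append((time.perf_counter() - start) * 1000)

    # quantiles(n=100) returns 99 cut points; index k-1 is the k-th percentile
    cuts = statistics.quantiles(latencies, n=100)
    return {"p50": cuts[49], "p95": cuts[94], "p99": cuts[98]}

# Usage with a loaded pipeline (the file name is an assumption):
# pipeline = joblib.load("complete_pipeline.joblib")
# stats = measure_latency_ms(pipeline.predict, [[5.1, 3.5, 1.4, 0.2]])
# print(stats)
```

If P99 from this measurement is already above the budget, no serving infrastructure will fix it, and the model or feature computation has to change first.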