Model Deployment: From Notebook to Production
Your Model Is Useless in a Notebook
You’ve trained a great model. It achieves 95% accuracy!
But it’s sitting in a Jupyter notebook on your laptop.
To be useful, models need to be:
- Saved and loaded
- Served via an API
- Monitored in production
- Updated when needed
Step 1: Saving Models
Using Joblib (Recommended for scikit-learn)
import joblib
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
# Train a model
iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
    iris.data, iris.target, test_size=0.2, random_state=42
)
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
print(f"Training accuracy: {model.score(X_train, y_train):.2%}")
print(f"Test accuracy: {model.score(X_test, y_test):.2%}")
# Save the model
joblib.dump(model, 'iris_classifier.joblib')
print("Model saved!")
Loading the Model
# Load the model (anywhere, anytime)
loaded_model = joblib.load('iris_classifier.joblib')
# Use it!
sample = [[5.1, 3.5, 1.4, 0.2]]
prediction = loaded_model.predict(sample)
probability = loaded_model.predict_proba(sample)
print(f"Prediction: {iris.target_names[prediction[0]]}")
print(f"Probabilities: {probability[0]}")
Using Pickle (Built-in Python)
import pickle
# Save
with open('model.pkl', 'wb') as f:
    pickle.dump(model, f)
# Load
with open('model.pkl', 'rb') as f:
    loaded_model = pickle.load(f)
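One caveat with both pickle and joblib: a model saved with one scikit-learn version is not guaranteed to load correctly under another. A minimal, optional sketch (the file name model_with_version.pkl is just an example) that stores the training-time version alongside the model and warns on a mismatch:
import pickle
import sklearn

# Save the model together with the scikit-learn version used to train it
with open('model_with_version.pkl', 'wb') as f:
    pickle.dump({"model": model, "sklearn_version": sklearn.__version__}, f)

# On load, warn if the installed version differs from the training version
with open('model_with_version.pkl', 'rb') as f:
    bundle = pickle.load(f)
if bundle["sklearn_version"] != sklearn.__version__:
    print(f"Warning: trained with scikit-learn {bundle['sklearn_version']}, "
          f"running {sklearn.__version__}")
loaded_model = bundle["model"]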
Step 2: Save the Full Pipeline
Don’t just save the model - save the entire preprocessing pipeline!
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
# Create a complete pipeline
pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', RandomForestClassifier(n_estimators=100))
])
# Train
pipeline.fit(X_train, y_train)
# Save the ENTIRE pipeline (scaler + model)
joblib.dump(pipeline, 'complete_pipeline.joblib')
# Load and use
loaded_pipeline = joblib.load('complete_pipeline.joblib')
prediction = loaded_pipeline.predict([[5.1, 3.5, 1.4, 0.2]])
Common Mistake: Saving only the model, not the scaler. Then your predictions are wrong because the data isn’t scaled the same way!
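To make that mistake concrete, here is a short sketch of the broken pattern next to the fix. It reuses X_train, y_train, and pipeline from the code above, and uses a scale-sensitive LogisticRegression (tree ensembles are less affected by scaling); the file names are illustrative:
from sklearn.linear_model import LogisticRegression

# The mistake: scale manually, then save only the classifier
scaler = StandardScaler().fit(X_train)
clf = LogisticRegression(max_iter=1000).fit(scaler.transform(X_train), y_train)
joblib.dump(clf, 'model_only.joblib')  # the fitted scaler is lost

# In production, raw unscaled input reaches a model trained on scaled data
loaded = joblib.load('model_only.joblib')
loaded.predict([[5.1, 3.5, 1.4, 0.2]])  # never passed through the scaler -> unreliable

# The fix: save the Pipeline, which applies the scaler inside predict()
joblib.dump(pipeline, 'complete_pipeline.joblib')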
Step 3: Create an API with FastAPI
# Save this as app.py
from fastapi import FastAPI
from pydantic import BaseModel
import joblib
import numpy as np
# Load the model at startup
model = joblib.load('complete_pipeline.joblib')
# Create FastAPI app
app = FastAPI(title="Iris Classifier API")
# Define request body
class IrisFeatures(BaseModel):
    sepal_length: float
    sepal_width: float
    petal_length: float
    petal_width: float
# Define response
class Prediction(BaseModel):
    species: str
    probability: float
# Prediction endpoint
@app.post("/predict", response_model=Prediction)
def predict(features: IrisFeatures):
    # Convert to array
    X = np.array([[
        features.sepal_length,
        features.sepal_width,
        features.petal_length,
        features.petal_width
    ]])

    # Predict
    prediction = model.predict(X)[0]
    probability = model.predict_proba(X).max()

    species_names = ['setosa', 'versicolor', 'virginica']

    return Prediction(
        species=species_names[prediction],
        probability=float(probability)
    )
# Health check
@app.get("/health")
def health():
    return {"status": "healthy"}
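Before starting a server, you can exercise the endpoint in-process with FastAPI's TestClient (it requires the httpx package); a quick sketch, assuming the file above is saved as app.py:
# Quick in-process test of the endpoint, no running server required
from fastapi.testclient import TestClient
from app import app

client = TestClient(app)
response = client.post("/predict", json={
    "sepal_length": 5.1, "sepal_width": 3.5,
    "petal_length": 1.4, "petal_width": 0.2
})
print(response.json())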
Run the API
# Install FastAPI and uvicorn
pip install fastapi uvicorn
# Run the server
uvicorn app:app --reload --host 0.0.0.0 --port 8000
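If you prefer launching the server from Python rather than the shell, uvicorn can also be started programmatically; a minimal sketch (the file name run.py is arbitrary):
# run.py - start the API from Python instead of the shell
import uvicorn

if __name__ == "__main__":
    # "app:app" refers to the `app` object inside app.py
    uvicorn.run("app:app", host="0.0.0.0", port=8000, reload=True)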
Test the API
import requests
# Make a prediction request
response = requests.post(
    "http://localhost:8000/predict",
    json={
        "sepal_length": 5.1,
        "sepal_width": 3.5,
        "petal_length": 1.4,
        "petal_width": 0.2
    }
)
print(response.json())
# Output: {"species": "setosa", "probability": 1.0}
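It is also worth timing a handful of requests before going live, since Step 6 will alert on latency; a small sketch that reuses the same payload:
import time
import requests

payload = {"sepal_length": 5.1, "sepal_width": 3.5,
           "petal_length": 1.4, "petal_width": 0.2}

# Send a few requests and measure round-trip latency
latencies = []
for _ in range(20):
    start = time.time()
    requests.post("http://localhost:8000/predict", json=payload)
    latencies.append((time.time() - start) * 1000)

print(f"avg latency: {sum(latencies) / len(latencies):.1f} ms")
print(f"max latency: {max(latencies):.1f} ms")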
Step 4: Containerize with Docker
# Dockerfile
FROM python:3.10-slim
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy model and code
COPY complete_pipeline.joblib .
COPY app.py .
# Run the API
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]
# requirements.txt
fastapi==0.104.1
uvicorn==0.24.0
scikit-learn==1.3.2
joblib==1.3.2
numpy==1.26.2
Build and Run
# Build the image
docker build -t iris-classifier .
# Run the container
docker run -p 8000:8000 iris-classifier
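A quick smoke test after the container starts is to poll the health endpoint from Python until it responds; a sketch (adjust the retry count to taste):
import time
import requests

# Poll the health endpoint until the container is ready (or we give up)
for attempt in range(10):
    try:
        resp = requests.get("http://localhost:8000/health", timeout=2)
        print(resp.json())  # expected: {"status": "healthy"}
        break
    except requests.exceptions.ConnectionError:
        time.sleep(1)
else:
    print("Container did not become healthy in time")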
Step 5: Model Versioning
Track your models like you track code:
import joblib
from datetime import datetime
import json
def save_model_with_metadata(model, model_name, metrics, version=None):
    """Save model with metadata for tracking."""
    if version is None:
        version = datetime.now().strftime("%Y%m%d_%H%M%S")

    filename = f"{model_name}_v{version}"

    # Save model
    joblib.dump(model, f"{filename}.joblib")

    # Save metadata
    metadata = {
        "model_name": model_name,
        "version": version,
        "created_at": datetime.now().isoformat(),
        "metrics": metrics,
        "model_type": type(model).__name__
    }
    with open(f"{filename}_metadata.json", "w") as f:
        json.dump(metadata, f, indent=2)

    print(f"Saved {filename}")
    return filename
# Usage
save_model_with_metadata(
    model=pipeline,
    model_name="iris_classifier",
    metrics={"accuracy": 0.95, "f1_score": 0.94}
)
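A matching loader, assuming the file layout that save_model_with_metadata produces, could look like this (load_model_with_metadata is a helper of our own, not a library function):
def load_model_with_metadata(model_name, version):
    """Load a saved model together with its metadata."""
    filename = f"{model_name}_v{version}"
    model = joblib.load(f"{filename}.joblib")
    with open(f"{filename}_metadata.json") as f:
        metadata = json.load(f)
    return model, metadata

# Usage: pass whatever version string save_model_with_metadata printed
# model, metadata = load_model_with_metadata("iris_classifier", "20240101_120000")
# print(metadata["metrics"])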
Step 6: Model Monitoring
Track model performance in production:
from datetime import datetime
import logging
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("model_monitor")
class ModelMonitor:
    def __init__(self):
        self.predictions = []
        self.latencies = []

    def log_prediction(self, input_data, prediction, latency_ms, actual=None):
        """Log a prediction for monitoring."""
        record = {
            "timestamp": datetime.now().isoformat(),
            "input": input_data,
            "prediction": prediction,
            "latency_ms": latency_ms,
            "actual": actual
        }
        self.predictions.append(record)
        self.latencies.append(latency_ms)

        # Alert if latency too high
        if latency_ms > 100:
            logger.warning(f"High latency: {latency_ms}ms")

        # Alert if unusual prediction distribution
        if len(self.predictions) > 100:
            recent_preds = [p["prediction"] for p in self.predictions[-100:]]
            if len(set(recent_preds)) == 1:
                logger.warning("Model predicting same class for all inputs!")

    def get_metrics(self):
        """Get monitoring metrics."""
        return {
            "total_predictions": len(self.predictions),
            "avg_latency_ms": sum(self.latencies) / len(self.latencies) if self.latencies else 0,
            "max_latency_ms": max(self.latencies) if self.latencies else 0
        }
# Use in API
monitor = ModelMonitor()
@app.post("/predict")
def predict(features: IrisFeatures):
    import time
    start = time.time()

    # Convert the request into the model's input format
    X = np.array([[features.sepal_length, features.sepal_width,
                   features.petal_length, features.petal_width]])

    # Make prediction
    prediction = model.predict(X)[0]
    latency_ms = (time.time() - start) * 1000

    # Log for monitoring
    monitor.log_prediction(
        input_data=features.dict(),
        prediction=int(prediction),
        latency_ms=latency_ms
    )

    return {"prediction": int(prediction)}
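To make these numbers visible from outside, you can expose them through the API as well; a minimal sketch added to the same app:
# Expose monitoring metrics over HTTP so they can be checked or scraped
@app.get("/metrics")
def metrics():
    return monitor.get_metrics()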
Step 7: A/B Testing Models
Compare new models against production:
import random
class ModelABTest:
    def __init__(self, model_a, model_b, traffic_to_b=0.1):
        """
        A/B test between two models.

        traffic_to_b: fraction of traffic to send to the new model
        """
        self.model_a = model_a  # Production model
        self.model_b = model_b  # Challenger model
        self.traffic_to_b = traffic_to_b
        self.results_a = []
        self.results_b = []

    def predict(self, X):
        """Route prediction to one of the models."""
        if random.random() < self.traffic_to_b:
            prediction = self.model_b.predict(X)
            self.results_b.append(prediction)
            model_used = "B"
        else:
            prediction = self.model_a.predict(X)
            self.results_a.append(prediction)
            model_used = "A"
        return prediction, model_used

    def get_stats(self):
        """Compare model performance."""
        return {
            "model_a_predictions": len(self.results_a),
            "model_b_predictions": len(self.results_b),
            "traffic_split": f"{100 - self.traffic_to_b * 100:.0f}% / {self.traffic_to_b * 100:.0f}%"
        }
# Usage
ab_test = ModelABTest(
    model_a=joblib.load("model_v1.joblib"),
    model_b=joblib.load("model_v2.joblib"),
    traffic_to_b=0.05  # 5% to new model
)
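One way to wire the A/B test into the API (a sketch that assumes it lives in app.py next to the earlier imports; the /predict_ab path is just an example):
@app.post("/predict_ab")
def predict_ab(features: IrisFeatures):
    X = np.array([[features.sepal_length, features.sepal_width,
                   features.petal_length, features.petal_width]])
    prediction, model_used = ab_test.predict(X)
    return {"prediction": int(prediction[0]), "model": model_used}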
Deployment Checklist
Before you ship, confirm that:
- The full preprocessing + model pipeline is saved (not just the estimator)
- The API serves predictions and exposes a health check
- The service is containerized so it runs the same everywhere
- Each model version is saved with metadata and evaluation metrics
- Monitoring is in place for latency and prediction drift
Cloud Deployment Options
| Platform | Complexity | Best For |
|---|---|---|
| Heroku | Low | Quick prototypes |
| Railway | Low | Simple apps |
| AWS Lambda | Medium | Serverless, pay-per-use |
| Google Cloud Run | Medium | Container-based |
| AWS SageMaker | High | Enterprise ML |
| Azure ML | High | Enterprise ML |
🚀 Mini Projects
Project 1: Model Serialization Pipeline
Create a complete pipeline that saves models with their preprocessing steps.
Project 2: Simple REST API
Build a prediction API using Flask.
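A starter sketch to build on (route names, port, and file names are up to you):
# Save this as flask_app.py
from flask import Flask, request, jsonify
import joblib
import numpy as np

app = Flask(__name__)
model = joblib.load('complete_pipeline.joblib')

@app.route("/predict", methods=["POST"])
def predict():
    data = request.get_json()
    X = np.array([[data["sepal_length"], data["sepal_width"],
                   data["petal_length"], data["petal_width"]]])
    prediction = model.predict(X)[0]
    return jsonify({"prediction": int(prediction)})

if __name__ == "__main__":
    app.run(port=5000)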
Project 3: Model Versioning System
Create a simple model versioning and registry system.
Project 4: Monitoring Dashboard
Create a simple model monitoring system.
Key Takeaways
- Save the Pipeline: include all preprocessing with the model
- API = Interface: FastAPI makes serving models easy
- Docker = Portability: same environment everywhere
- Monitor = Trust: know when your model degrades
What’s Next?
We have more advanced topics to explore! Let’s learn about time series forecasting.
Continue to Time Series: predict the future from sequential data.