# Model Deployment

> Take your model from notebook to production

# Model Deployment: From Notebook to Production

<Frame>
  <img src="https://mintcdn.com/devweeekends/1cs3K7TO-w20cKuc/images/courses/ml-mastery/model-deployment-concept.svg?fit=max&auto=format&n=1cs3K7TO-w20cKuc&q=85&s=80deb433d0a82f282fe607d718ed0c42" alt="MLOps Deployment Pipeline" width="1080" height="1080" data-path="images/courses/ml-mastery/model-deployment-concept.svg" />
</Frame>

## Your Model Is Useless in a Notebook

You've trained a great model. It achieves 95% accuracy!

But it's sitting in a Jupyter notebook on your laptop.

**To be useful, models need to be:**

* Saved and loaded
* Served via an API
* Monitored in production
* Updated when needed

<Frame>
  <img src="https://mintcdn.com/devweeekends/1cs3K7TO-w20cKuc/images/courses/ml-mastery/model-deployment-real-world.svg?fit=max&auto=format&n=1cs3K7TO-w20cKuc&q=85&s=32b3f34d29067bcccc0b6c9fc2e81f5b" alt="Spotify ML Platform at Scale" width="1080" height="1080" data-path="images/courses/ml-mastery/model-deployment-real-world.svg" />
</Frame>

***

## Step 1: Saving Models

### Using Joblib (Recommended for scikit-learn)

```python theme={null}
import joblib
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split

# Train a model
iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
    iris.data, iris.target, test_size=0.2, random_state=42
)

model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)

print(f"Training accuracy: {model.score(X_train, y_train):.2%}")
print(f"Test accuracy: {model.score(X_test, y_test):.2%}")

# Save the model
joblib.dump(model, 'iris_classifier.joblib')
print("Model saved!")
```

### Loading the Model

```python theme={null}
# Load the model in any Python process (use the same scikit-learn version it was saved with)
loaded_model = joblib.load('iris_classifier.joblib')

# Use it!
sample = [[5.1, 3.5, 1.4, 0.2]]
prediction = loaded_model.predict(sample)
probability = loaded_model.predict_proba(sample)

print(f"Prediction: {iris.target_names[prediction[0]]}")
print(f"Probabilities: {probability[0]}")
```

### Using Pickle (Built-in Python)

```python theme={null}
import pickle

# Save
with open('model.pkl', 'wb') as f:
    pickle.dump(model, f)

# Load
with open('model.pkl', 'rb') as f:
    loaded_model = pickle.load(f)
```
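Both `pickle.load` and `joblib.load` (which builds on pickle) can run arbitrary code stored in the file, so only load artifacts you trust. One way to notice a corrupted or accidentally swapped file is to record a checksum when saving and verify it before loading. The sketch below is a minimal illustration: `file_sha256`, `save_with_checksum`, `load_verified`, and the `.sha256` sidecar file are hypothetical names, not part of pickle or joblib. A sidecar stored next to the model does not stop an attacker who can rewrite both files.

```python
import hashlib
import pickle
from pathlib import Path


def file_sha256(path):
    """Return the hex SHA-256 digest of a file, read in chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(8192), b""):
            digest.update(chunk)
    return digest.hexdigest()


def save_with_checksum(obj, path):
    """Pickle obj to path and write its digest to a sidecar file (hypothetical helper)."""
    path = Path(path)
    with open(path, "wb") as f:
        pickle.dump(obj, f)
    checksum_path = path.with_name(path.name + ".sha256")
    checksum_path.write_text(file_sha256(path))
    return checksum_path


def load_verified(path):
    """Unpickle path only if its digest matches the recorded one (hypothetical helper)."""
    path = Path(path)
    expected = path.with_name(path.name + ".sha256").read_text().strip()
    if file_sha256(path) != expected:
        raise ValueError(f"Checksum mismatch for {path}")
    with open(path, "rb") as f:
        return pickle.load(f)
```

Calling `save_with_checksum(model, 'model.pkl')` and later `load_verified('model.pkl')` would replace the plain save and load calls above.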

***

## Step 2: Save the Full Pipeline

Don't just save the model - save the **entire preprocessing pipeline**!

```python theme={null}
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier

# Create a complete pipeline
pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
])

# Train
pipeline.fit(X_train, y_train)

# Save the ENTIRE pipeline (scaler + model)
joblib.dump(pipeline, 'complete_pipeline.joblib')

# Load and use
loaded_pipeline = joblib.load('complete_pipeline.joblib')
prediction = loaded_pipeline.predict([[5.1, 3.5, 1.4, 0.2]])
```

<Warning>
  **Common Mistake**: Saving only the model and not the scaler. At inference time the model then receives unscaled inputs, so its predictions can be wrong even though no error is raised.
</Warning>
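To see the mistake in action, the sketch below feeds the same raw samples to the full pipeline and to the bare classifier step. It swaps in `LogisticRegression` for the random forest used above, on the assumption that a linear model shows the effect more clearly, since tree-based models are far less sensitive to feature scaling. The variable names are illustrative only.

```python
from sklearn.datasets import load_iris
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

X, y = load_iris(return_X_y=True)

# Pipeline: the scaler is fitted and applied together with the model
demo_pipeline = Pipeline([
    ("scaler", StandardScaler()),
    ("classifier", LogisticRegression(max_iter=1000)),
])
demo_pipeline.fit(X, y)

# Pull the two steps apart to mimic "saved only the model"
scaler = demo_pipeline.named_steps["scaler"]
bare_model = demo_pipeline.named_steps["classifier"]

raw_samples = X[::50]  # one sample from each class

correct = demo_pipeline.predict(raw_samples)                # scaler applied automatically
manual = bare_model.predict(scaler.transform(raw_samples))  # scaler applied by hand
unscaled = bare_model.predict(raw_samples)                  # scaler skipped (the mistake)

print("pipeline:      ", correct)
print("manual scaling:", manual)
print("scaler skipped:", unscaled)
```

The first two lines always match, because the pipeline does exactly the manual steps. The last line comes from a model that was trained on standardized values but is given raw centimeters, so it will typically disagree without raising any error.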

***

## Step 3: Create an API with FastAPI

```python theme={null}
# Save this as app.py
from fastapi import FastAPI
from pydantic import BaseModel
import joblib
import numpy as np

# Load the model at startup
model = joblib.load('complete_pipeline.joblib')

# Create FastAPI app
app = FastAPI(title="Iris Classifier API")

# Define request body
class IrisFeatures(BaseModel):
    sepal_length: float
    sepal_width: float
    petal_length: float
    petal_width: float

# Define response
class Prediction(BaseModel):
    species: str
    probability: float

# Prediction endpoint
@app.post("/predict", response_model=Prediction)
def predict(features: IrisFeatures):
    # Convert to array
    X = np.array([[
        features.sepal_length,
        features.sepal_width,
        features.petal_length,
        features.petal_width
    ]])
    
    # Predict
    prediction = model.predict(X)[0]
    probability = model.predict_proba(X).max()
    
    species_names = ['setosa', 'versicolor', 'virginica']
    
    return Prediction(
        species=species_names[prediction],
        probability=float(probability)
    )

# Health check
@app.get("/health")
def health():
    return {"status": "healthy"}
```

### Run the API

```bash theme={null}
# Install FastAPI and uvicorn
pip install fastapi uvicorn

# Run the server
uvicorn app:app --reload --host 0.0.0.0 --port 8000
```

### Test the API

```python theme={null}
import requests

# Make a prediction request
response = requests.post(
    "http://localhost:8000/predict",
    json={
        "sepal_length": 5.1,
        "sepal_width": 3.5,
        "petal_length": 1.4,
        "petal_width": 0.2
    }
)

print(response.json())
# Example output: {"species": "setosa", "probability": 1.0}
```

***

## Step 4: Containerize with Docker

```dockerfile theme={null}
# Dockerfile
FROM python:3.10-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy model and code
COPY complete_pipeline.joblib .
COPY app.py .

# Run the API
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]
```

```txt theme={null}
# requirements.txt
fastapi==0.104.1
uvicorn==0.24.0
scikit-learn==1.3.2
joblib==1.3.2
numpy==1.26.2
```

### Build and Run

```bash theme={null}
# Build the image
docker build -t iris-classifier .

# Run the container
docker run -p 8000:8000 iris-classifier
```
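Once the container is running, a quick smoke check against the `/health` endpoint from `app.py` confirms that the API is reachable through the published port. The sketch below uses only the standard library; `wait_for_health` is a hypothetical helper, and the URL in the usage note assumes the `-p 8000:8000` mapping shown above.

```python
import json
import time
import urllib.request


def wait_for_health(url, timeout_s=30.0, interval_s=1.0):
    """Poll url until it answers HTTP 200, then return the parsed JSON body.

    Raises TimeoutError if the service is not healthy within timeout_s.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        try:
            with urllib.request.urlopen(url, timeout=5) as resp:
                if resp.status == 200:
                    return json.loads(resp.read().decode("utf-8"))
        except OSError:
            # Covers connection refused, HTTP errors, and socket timeouts:
            # the server is not ready yet, so retry
            pass
        if time.monotonic() >= deadline:
            raise TimeoutError(f"{url} not healthy after {timeout_s}s")
        time.sleep(interval_s)
```

For example, `wait_for_health("http://localhost:8000/health")` should return the JSON body of the health endpoint once the container has finished starting.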

***

## Step 5: Model Versioning

Track your models like you track code:

```python theme={null}
import joblib
from datetime import datetime
import json

def save_model_with_metadata(model, model_name, metrics, version=None):
    """Save model with metadata for tracking."""
    
    if version is None:
        version = datetime.now().strftime("%Y%m%d_%H%M%S")
    
    filename = f"{model_name}_v{version}"
    
    # Save model
    joblib.dump(model, f"{filename}.joblib")
    
    # Save metadata
    metadata = {
        "model_name": model_name,
        "version": version,
        "created_at": datetime.now().isoformat(),
        "metrics": metrics,
        "model_type": type(model).__name__
    }
    
    with open(f"{filename}_metadata.json", "w") as f:
        json.dump(metadata, f, indent=2)
    
    print(f"Saved {filename}")
    return filename

# Usage
save_model_with_metadata(
    model=pipeline,
    model_name="iris_classifier",
    metrics={"accuracy": 0.95, "f1_score": 0.94}
)
```
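With a metadata file written next to each artifact, a loader can pick the newest version instead of hard-coding a filename. The sketch below assumes the `{model_name}_v{version}_metadata.json` naming produced by `save_model_with_metadata` above; `find_latest_version` is a hypothetical helper, and it relies on the `created_at` timestamps all coming from `datetime.now().isoformat()` on the same machine.

```python
import json
from pathlib import Path


def find_latest_version(model_name, directory="."):
    """Return (metadata, model_path) for the most recently created version.

    Assumes <model_name>_v<version>.joblib sits next to
    <model_name>_v<version>_metadata.json in the given directory.
    """
    candidates = []
    for meta_path in Path(directory).glob(f"{model_name}_v*_metadata.json"):
        with open(meta_path) as f:
            metadata = json.load(f)
        candidates.append((metadata["created_at"], metadata, meta_path))

    if not candidates:
        raise FileNotFoundError(f"No saved versions of {model_name!r} in {directory}")

    # ISO 8601 strings in the same format sort chronologically as plain strings
    _, metadata, meta_path = max(candidates, key=lambda c: c[0])
    model_path = meta_path.with_name(
        meta_path.name.replace("_metadata.json", ".joblib")
    )
    return metadata, model_path
```

The returned `model_path` can be passed straight to `joblib.load`, and the metadata tells you which version and metrics you are about to serve.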

***

## Step 6: Model Monitoring

Track model performance in production:

```python theme={null}
from datetime import datetime
import logging

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("model_monitor")

class ModelMonitor:
    def __init__(self):
        self.predictions = []
        self.latencies = []
    
    def log_prediction(self, input_data, prediction, latency_ms, actual=None):
        """Log a prediction for monitoring."""
        record = {
            "timestamp": datetime.now().isoformat(),
            "input": input_data,
            "prediction": prediction,
            "latency_ms": latency_ms,
            "actual": actual
        }
        self.predictions.append(record)
        self.latencies.append(latency_ms)
        
        # Alert if latency too high
        if latency_ms > 100:
            logger.warning(f"High latency: {latency_ms}ms")
        
        # Alert if unusual prediction distribution
        if len(self.predictions) > 100:
            recent_preds = [p["prediction"] for p in self.predictions[-100:]]
            if len(set(recent_preds)) == 1:
                logger.warning("Model predicting same class for all inputs!")
    
    def get_metrics(self):
        """Get monitoring metrics."""
        return {
            "total_predictions": len(self.predictions),
            "avg_latency_ms": sum(self.latencies) / len(self.latencies) if self.latencies else 0,
            "max_latency_ms": max(self.latencies) if self.latencies else 0
        }

# Use in API
monitor = ModelMonitor()

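@app.post("/predict")
def predict(features: IrisFeatures):
    import time
    start = time.time()

    # Build the feature array in the same order used for training
    # (np, model, app, and IrisFeatures come from app.py)
    X = np.array([[
        features.sepal_length,
        features.sepal_width,
        features.petal_length,
        features.petal_width
    ]])

    # Make prediction
    prediction = model.predict(X)[0]

    latency_ms = (time.time() - start) * 1000

    # Log for monitoring
    # (on Pydantic v2, features.model_dump() is the preferred spelling of .dict())
    monitor.log_prediction(
        input_data=features.dict(),
        prediction=int(prediction),
        latency_ms=latency_ms
    )

    return {"prediction": int(prediction)}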
```
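The `ModelMonitor` above keeps every record in plain lists, which grow for as long as the process runs, and it reports only average and maximum latency. The sketch below shows one possible variation: a fixed-size window that also reports a 95th-percentile latency and the share of each predicted class. `RollingStats` and its method names are hypothetical, and the window size is an arbitrary default.

```python
import statistics
from collections import Counter, deque


class RollingStats:
    """Keep only the most recent `window` predictions and latencies."""

    def __init__(self, window=1000):
        # deque(maxlen=...) silently drops the oldest item when full
        self.latencies = deque(maxlen=window)
        self.predictions = deque(maxlen=window)

    def record(self, prediction, latency_ms):
        self.predictions.append(prediction)
        self.latencies.append(latency_ms)

    def p95_latency_ms(self):
        """95th-percentile latency over the window (None with fewer than 2 samples)."""
        if len(self.latencies) < 2:
            return None
        # quantiles(n=20) returns 19 cut points; the last one is the 95th percentile
        return statistics.quantiles(self.latencies, n=20)[-1]

    def class_distribution(self):
        """Fraction of recent predictions that fall in each class."""
        total = len(self.predictions)
        if total == 0:
            return {}
        counts = Counter(self.predictions)
        return {label: count / total for label, count in counts.items()}
```

A percentile is usually a more useful alert signal than the maximum, because a single slow request does not dominate it, and a shifting class distribution is an early hint that the inputs have changed.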

***

## Step 7: A/B Testing Models

Compare new models against production:

```python theme={null}
import random

import joblib

class ModelABTest:
    def __init__(self, model_a, model_b, traffic_to_b=0.1):
        """
        A/B test between two models.
        traffic_to_b: fraction of traffic to send to new model
        """
        self.model_a = model_a  # Production model
        self.model_b = model_b  # Challenger model
        self.traffic_to_b = traffic_to_b
        self.results_a = []
        self.results_b = []
    
    def predict(self, X):
        """Route prediction to one of the models."""
        if random.random() < self.traffic_to_b:
            prediction = self.model_b.predict(X)
            self.results_b.append(prediction)
            model_used = "B"
        else:
            prediction = self.model_a.predict(X)
            self.results_a.append(prediction)
            model_used = "A"
        
        return prediction, model_used
    
    def get_stats(self):
        """Compare model performance."""
        return {
            "model_a_predictions": len(self.results_a),
            "model_b_predictions": len(self.results_b),
            "traffic_split": f"{100-self.traffic_to_b*100:.0f}% / {self.traffic_to_b*100:.0f}%"
        }

# Usage
ab_test = ModelABTest(
    model_a=joblib.load("model_v1.joblib"),
    model_b=joblib.load("model_v2.joblib"),
    traffic_to_b=0.05  # 5% to new model
)
```
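Routing with `random.random()` means the same caller can hit model A on one request and model B on the next. If you have a stable identifier per user or session, a common alternative is to hash it so each caller always lands on the same variant. The sketch below is one way to do that under stated assumptions: `assign_variant`, the `user_id` argument, and the `salt` value are all hypothetical and not part of the `ModelABTest` class above.

```python
import hashlib


def assign_variant(user_id, traffic_to_b=0.1, salt="iris-ab-test"):
    """Deterministically map a user ID to variant "A" or "B".

    The same (salt, user_id) pair always yields the same variant, and
    roughly traffic_to_b of all IDs are assigned to "B".
    """
    digest = hashlib.sha256(f"{salt}:{user_id}".encode("utf-8")).digest()
    # Interpret the first 8 bytes as an integer in [0, 2**64)
    bucket = int.from_bytes(digest[:8], "big")
    return "B" if bucket < traffic_to_b * 2**64 else "A"
```

Changing the salt reshuffles the assignment, which is useful when starting a new experiment so that the same users are not always in the challenger group.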

***

## Deployment Checklist

<CardGroup cols={2}>
  <Card title="Before Deployment" icon="list-check">
    * [ ] Model tested on holdout data
    * [ ] Pipeline includes preprocessing
    * [ ] Model serialized (joblib/pickle)
    * [ ] API endpoints documented
    * [ ] Error handling added
    * [ ] Input validation in place
  </Card>

  <Card title="After Deployment" icon="radar">
    * [ ] Health checks working
    * [ ] Logging configured
    * [ ] Latency monitored
    * [ ] Prediction distribution tracked
    * [ ] Rollback plan ready
    * [ ] Model version tracked
  </Card>
</CardGroup>

***

## Cloud Deployment Options

| Platform             | Complexity | Best For                |
| -------------------- | ---------- | ----------------------- |
| **Heroku**           | Low        | Quick prototypes        |
| **Railway**          | Low        | Simple apps             |
| **AWS Lambda**       | Medium     | Serverless, pay-per-use |
| **Google Cloud Run** | Medium     | Container-based         |
| **AWS SageMaker**    | High       | Enterprise ML           |
| **Azure ML**         | High       | Enterprise ML           |

***

## 🚀 Mini Projects

<CardGroup cols={2}>
  <Card title="Project 1: Model Serialization Pipeline" icon="box-archive">
    Save and load models with preprocessing
  </Card>

  <Card title="Project 2: Simple REST API" icon="server">
    Build a prediction API with Flask
  </Card>

  <Card title="Project 3: Model Versioning System" icon="code-branch">
    Track different model versions
  </Card>

  <Card title="Project 4: Monitoring Dashboard" icon="chart-mixed">
    Monitor model performance in production
  </Card>
</CardGroup>

### Project 1: Model Serialization Pipeline

Create a complete pipeline that saves models with their preprocessing steps.

<details>
  <summary>View Complete Solution</summary>

  ```python theme={null}
  import numpy as np
  import pandas as pd
  import joblib
  import json
  from datetime import datetime
  from sklearn.datasets import load_breast_cancer
  from sklearn.model_selection import train_test_split
  from sklearn.preprocessing import StandardScaler
  from sklearn.ensemble import RandomForestClassifier
  from sklearn.pipeline import Pipeline
  from sklearn.metrics import accuracy_score, classification_report
  import os

  # Step 1: Create model artifact class
  class ModelArtifact:
      """Complete model artifact with metadata and preprocessing"""
      
      def __init__(self, name, version="1.0.0"):
          self.name = name
          self.version = version
          self.pipeline = None
          self.metadata = {}
          self.feature_names = None
          
      def build_pipeline(self, model, preprocessor=None):
          """Build sklearn pipeline"""
          steps = []
          if preprocessor:
              steps.append(('preprocessor', preprocessor))
          steps.append(('model', model))
          self.pipeline = Pipeline(steps)
          return self
      
      def train(self, X, y, feature_names=None):
          """Train the pipeline"""
          self.feature_names = feature_names
          self.pipeline.fit(X, y)
          
          # Store metadata
          self.metadata = {
              'name': self.name,
              'version': self.version,
              'trained_at': datetime.now().isoformat(),
              'n_samples': len(X),
              'n_features': X.shape[1],
              'feature_names': list(feature_names) if feature_names is not None else None,
              'model_type': type(self.pipeline.named_steps['model']).__name__
          }
          
          # Training metrics
          y_pred = self.pipeline.predict(X)
          self.metadata['train_accuracy'] = float(accuracy_score(y, y_pred))
          
          return self
      
      def predict(self, X):
          """Make predictions"""
          return self.pipeline.predict(X)
      
      def predict_proba(self, X):
          """Predict probabilities"""
          if hasattr(self.pipeline, 'predict_proba'):
              return self.pipeline.predict_proba(X)
          return None
      
      def save(self, directory):
          """Save model artifact"""
          os.makedirs(directory, exist_ok=True)
          
          # Save pipeline
          pipeline_path = os.path.join(directory, 'pipeline.joblib')
          joblib.dump(self.pipeline, pipeline_path)
          
          # Save metadata
          metadata_path = os.path.join(directory, 'metadata.json')
          with open(metadata_path, 'w') as f:
              json.dump(self.metadata, f, indent=2)
          
          print(f"✅ Model saved to {directory}")
          return directory
      
      @classmethod
      def load(cls, directory):
          """Load model artifact"""
          # Load metadata
          metadata_path = os.path.join(directory, 'metadata.json')
          with open(metadata_path, 'r') as f:
              metadata = json.load(f)
          
          # Create instance
          artifact = cls(metadata['name'], metadata['version'])
          artifact.metadata = metadata
          artifact.feature_names = metadata.get('feature_names')
          
          # Load pipeline
          pipeline_path = os.path.join(directory, 'pipeline.joblib')
          artifact.pipeline = joblib.load(pipeline_path)
          
          print(f"✅ Model loaded from {directory}")
          return artifact

  # Step 2: Train and save model
  print("="*60)
  print("📦 MODEL SERIALIZATION PIPELINE")
  print("="*60)

  # Load data
  cancer = load_breast_cancer()
  X_train, X_test, y_train, y_test = train_test_split(
      cancer.data, cancer.target, test_size=0.2, random_state=42
  )

  # Create artifact
  artifact = ModelArtifact(
      name="BreastCancerClassifier",
      version="1.0.0"
  )

  # Build pipeline with preprocessing
  artifact.build_pipeline(
      model=RandomForestClassifier(n_estimators=100, random_state=42),
      preprocessor=StandardScaler()
  )

  # Train
  artifact.train(X_train, y_train, feature_names=cancer.feature_names)
  print(f"\nTraining accuracy: {artifact.metadata['train_accuracy']:.4f}")

  # Evaluate on test
  y_pred = artifact.predict(X_test)
  test_accuracy = accuracy_score(y_test, y_pred)
  print(f"Test accuracy: {test_accuracy:.4f}")

  # Save
  artifact.save('models/breast_cancer_v1')

  # Step 3: Load and use model
  print("\n" + "="*60)
  print("🔄 LOADING AND USING MODEL")
  print("="*60)

  loaded_artifact = ModelArtifact.load('models/breast_cancer_v1')

  print(f"\nModel: {loaded_artifact.metadata['name']} v{loaded_artifact.metadata['version']}")
  print(f"Trained at: {loaded_artifact.metadata['trained_at']}")
  print(f"Model type: {loaded_artifact.metadata['model_type']}")

  # Make predictions with loaded model
  new_predictions = loaded_artifact.predict(X_test)
  loaded_accuracy = accuracy_score(y_test, new_predictions)
  print(f"Loaded model accuracy: {loaded_accuracy:.4f}")

  # Step 4: Model versioning
  print("\n" + "="*60)
  print("🏷️ MODEL VERSIONING")
  print("="*60)

  # Create v2 with different hyperparameters
  artifact_v2 = ModelArtifact(
      name="BreastCancerClassifier",
      version="2.0.0"
  )

  artifact_v2.build_pipeline(
      model=RandomForestClassifier(n_estimators=200, max_depth=10, random_state=42),
      preprocessor=StandardScaler()
  )

  artifact_v2.train(X_train, y_train, feature_names=cancer.feature_names)
  artifact_v2.save('models/breast_cancer_v2')

  # Compare versions (training accuracy only; use held-out data for a fair comparison)
  print("\nVersion comparison:")
  print(f"v1 train accuracy: {artifact.metadata['train_accuracy']:.4f}")
  print(f"v2 train accuracy: {artifact_v2.metadata['train_accuracy']:.4f}")

  print("\n✅ Model serialization complete!")
  ```

  **What you learned:**

  * Always save preprocessing with the model
  * Metadata helps track model lineage
  * Versioning enables rollback if needed
</details>

### Project 2: Simple REST API

Build a prediction API using Flask.

<details>
  <summary>View Complete Solution</summary>

  ```python theme={null}
  # File: app.py
  """
  Simple ML Prediction API

  To run: python app.py
  To test: curl -X POST http://localhost:5000/predict/breast_cancer -H "Content-Type: application/json" -d '{"features": [1.0, 2.0, ...]}'
  """

  from flask import Flask, request, jsonify
  import numpy as np
  import joblib
  from datetime import datetime
  import logging

  # Setup logging
  logging.basicConfig(level=logging.INFO)
  logger = logging.getLogger(__name__)

  app = Flask(__name__)

  # Global model storage
  model_registry = {}

  def create_mock_model():
      """Create a mock model for demonstration"""
      from sklearn.datasets import load_breast_cancer
      from sklearn.model_selection import train_test_split
      from sklearn.preprocessing import StandardScaler
      from sklearn.ensemble import RandomForestClassifier
      from sklearn.pipeline import Pipeline
      
      cancer = load_breast_cancer()
      X_train, _, y_train, _ = train_test_split(
          cancer.data, cancer.target, test_size=0.2, random_state=42
      )
      
      pipeline = Pipeline([
          ('scaler', StandardScaler()),
          ('model', RandomForestClassifier(n_estimators=100, random_state=42))
      ])
      pipeline.fit(X_train, y_train)
      
      return pipeline, cancer.feature_names.tolist()

  # Load model at startup (train once and reuse both return values)
  print("Loading model...")
  _pipeline, _feature_names = create_mock_model()
  model_registry['breast_cancer'] = {
      'model': _pipeline,
      'feature_names': _feature_names,
      'version': '1.0.0',
      'loaded_at': datetime.now().isoformat()
  }
  print("Model loaded!")

  @app.route('/health', methods=['GET'])
  def health():
      """Health check endpoint"""
      return jsonify({
          'status': 'healthy',
          'timestamp': datetime.now().isoformat(),
          'models_loaded': list(model_registry.keys())
      })

  @app.route('/models', methods=['GET'])
  def list_models():
      """List available models"""
      models = []
      for name, info in model_registry.items():
          models.append({
              'name': name,
              'version': info['version'],
              'loaded_at': info['loaded_at'],
              'n_features': len(info['feature_names'])
          })
      return jsonify({'models': models})

  @app.route('/models/<model_name>/info', methods=['GET'])
  def model_info(model_name):
      """Get model information"""
      if model_name not in model_registry:
          return jsonify({'error': f'Model {model_name} not found'}), 404
      
      info = model_registry[model_name]
      return jsonify({
          'name': model_name,
          'version': info['version'],
          'feature_names': info['feature_names'],
          'loaded_at': info['loaded_at']
      })

  @app.route('/predict/<model_name>', methods=['POST'])
  def predict(model_name):
      """Make predictions"""
      start_time = datetime.now()
      
      # Validate model exists
      if model_name not in model_registry:
          return jsonify({'error': f'Model {model_name} not found'}), 404
      
      # Parse request
      data = request.get_json()
      if not data or 'features' not in data:
          return jsonify({'error': 'Missing features in request'}), 400
      
      try:
          features = np.array(data['features'])
          
          # Handle single sample
          if len(features.shape) == 1:
              features = features.reshape(1, -1)
          
          # Get model
          model_info = model_registry[model_name]
          model = model_info['model']
          
          # Validate feature count
          expected_features = len(model_info['feature_names'])
          if features.shape[1] != expected_features:
              return jsonify({
                  'error': f'Expected {expected_features} features, got {features.shape[1]}'
              }), 400
          
          # Make prediction
          predictions = model.predict(features).tolist()
          probabilities = None
          if hasattr(model, 'predict_proba'):
              probabilities = model.predict_proba(features).tolist()
          
          # Calculate latency
          latency_ms = (datetime.now() - start_time).total_seconds() * 1000
          
          logger.info(f"Prediction made: {predictions}, latency: {latency_ms:.2f}ms")
          
          response = {
              'predictions': predictions,
              'model_version': model_info['version'],
              'latency_ms': latency_ms,
              'timestamp': datetime.now().isoformat()
          }
          
          if probabilities:
              response['probabilities'] = probabilities
          
          return jsonify(response)
          
      except Exception as e:
          logger.error(f"Prediction error: {str(e)}")
          return jsonify({'error': str(e)}), 500

  @app.route('/predict/<model_name>/batch', methods=['POST'])
  def predict_batch(model_name):
      """Batch predictions"""
      if model_name not in model_registry:
          return jsonify({'error': f'Model {model_name} not found'}), 404
      
      data = request.get_json()
      if not data or 'samples' not in data:
          return jsonify({'error': 'Missing samples in request'}), 400
      
      try:
          samples = np.array(data['samples'])
          model = model_registry[model_name]['model']
          
          predictions = model.predict(samples).tolist()
          
          return jsonify({
              'predictions': predictions,
              'n_samples': len(samples)
          })
          
      except Exception as e:
          return jsonify({'error': str(e)}), 500

  # Demo client code
  def demo_client():
      """Demonstrate API usage"""
      import requests
      
      base_url = 'http://localhost:5000'
      
      # Health check
      print("\n1. Health check:")
      r = requests.get(f'{base_url}/health')
      print(r.json())
      
      # List models
      print("\n2. List models:")
      r = requests.get(f'{base_url}/models')
      print(r.json())
      
      # Model info
      print("\n3. Model info:")
      r = requests.get(f'{base_url}/models/breast_cancer/info')
      print(r.json())
      
      # Single prediction
      print("\n4. Single prediction:")
      sample = list(np.random.randn(30))  # 30 features
      r = requests.post(
          f'{base_url}/predict/breast_cancer',
          json={'features': sample}
      )
      print(r.json())
      
      # Batch prediction
      print("\n5. Batch prediction:")
      samples = [list(np.random.randn(30)) for _ in range(5)]
      r = requests.post(
          f'{base_url}/predict/breast_cancer/batch',
          json={'samples': samples}
      )
      print(r.json())

  if __name__ == '__main__':
      print("="*60)
      print("🚀 ML PREDICTION API")
      print("="*60)
      print("\nEndpoints:")
      print("  GET  /health           - Health check")
      print("  GET  /models           - List models")
      print("  GET  /models/<name>/info - Model info")
      print("  POST /predict/<name>   - Single prediction")
      print("  POST /predict/<name>/batch - Batch predictions")
      print("\nStarting server...")
      
      # In production, use gunicorn instead:
      # gunicorn -w 4 -b 0.0.0.0:5000 app:app
      app.run(debug=True, port=5000)
  ```

  **What you learned:**

  * REST APIs make models accessible to other applications
  * Always include health checks and proper error handling
  * Batch endpoints improve throughput for multiple predictions
</details>

### Project 3: Model Versioning System

Create a simple model versioning and registry system.

<details>
  <summary>View Complete Solution</summary>

  ```python theme={null}
  import numpy as np
  import pandas as pd
  import joblib
  import json
  import os
  import hashlib
  from datetime import datetime
  from sklearn.datasets import load_breast_cancer
  from sklearn.model_selection import train_test_split, cross_val_score
  from sklearn.ensemble import RandomForestClassifier
  from sklearn.linear_model import LogisticRegression
  from sklearn.preprocessing import StandardScaler
  from sklearn.pipeline import Pipeline
  from sklearn.metrics import accuracy_score

  class ModelRegistry:
      """Simple model registry with versioning"""
      
      def __init__(self, registry_path='model_registry'):
          self.registry_path = registry_path
          self.registry_file = os.path.join(registry_path, 'registry.json')
          os.makedirs(registry_path, exist_ok=True)
          self._load_registry()
      
      def _load_registry(self):
          """Load or create registry"""
          if os.path.exists(self.registry_file):
              with open(self.registry_file, 'r') as f:
                  self.registry = json.load(f)
          else:
              self.registry = {'models': {}}
      
      def _save_registry(self):
          """Save registry to disk"""
          with open(self.registry_file, 'w') as f:
              json.dump(self.registry, f, indent=2)
      
      def _compute_hash(self, pipeline):
          """Compute model hash for versioning"""
          import pickle
          model_bytes = pickle.dumps(pipeline)
          return hashlib.md5(model_bytes).hexdigest()[:8]
      
      def register_model(self, name, pipeline, metrics, tags=None):
          """Register a new model version"""
          # Create model entry if doesn't exist
          if name not in self.registry['models']:
              self.registry['models'][name] = {
                  'versions': [],
                  'production': None,
                  'staging': None
              }
          
          # Generate version
          existing_versions = len(self.registry['models'][name]['versions'])
          version = f"v{existing_versions + 1}"
          model_hash = self._compute_hash(pipeline)
          
          # Save model file
          model_dir = os.path.join(self.registry_path, name, version)
          os.makedirs(model_dir, exist_ok=True)
          model_path = os.path.join(model_dir, 'model.joblib')
          joblib.dump(pipeline, model_path)
          
          # Create version entry
          version_entry = {
              'version': version,
              'hash': model_hash,
              'created_at': datetime.now().isoformat(),
              'model_path': model_path,
              'metrics': metrics,
              'tags': tags or {},
              'stage': 'development'
          }
          
          self.registry['models'][name]['versions'].append(version_entry)
          self._save_registry()
          
          print(f"✅ Registered {name} {version} (hash: {model_hash})")
          return version
      
      def promote_to_staging(self, name, version):
          """Promote model to staging"""
          if name not in self.registry['models']:
              raise ValueError(f"Model {name} not found")
          
          self.registry['models'][name]['staging'] = version
          
          # Update stage in version entry
          for v in self.registry['models'][name]['versions']:
              if v['version'] == version:
                  v['stage'] = 'staging'
          
          self._save_registry()
          print(f"📦 {name} {version} promoted to staging")
      
      def promote_to_production(self, name, version):
          """Promote model to production"""
          if name not in self.registry['models']:
              raise ValueError(f"Model {name} not found")
          
          # Demote current production
          current_prod = self.registry['models'][name]['production']
          if current_prod:
              for v in self.registry['models'][name]['versions']:
                  if v['version'] == current_prod:
                      v['stage'] = 'archived'
          
          self.registry['models'][name]['production'] = version
          
          # Update stage
          for v in self.registry['models'][name]['versions']:
              if v['version'] == version:
                  v['stage'] = 'production'
          
          self._save_registry()
          print(f"🚀 {name} {version} promoted to production")
      
      def get_production_model(self, name):
          """Load production model"""
          if name not in self.registry['models']:
              raise ValueError(f"Model {name} not found")
          
          prod_version = self.registry['models'][name]['production']
          if not prod_version:
              raise ValueError(f"No production model for {name}")
          
          return self.load_model(name, prod_version)
      
      def load_model(self, name, version):
          """Load specific model version"""
          for v in self.registry['models'][name]['versions']:
              if v['version'] == version:
                  return joblib.load(v['model_path'])
          raise ValueError(f"Version {version} not found for {name}")
      
      def list_versions(self, name):
          """List all versions of a model"""
          if name not in self.registry['models']:
              return []
          
          versions = []
          for v in self.registry['models'][name]['versions']:
              versions.append({
                  'version': v['version'],
                  'stage': v['stage'],
                  'created_at': v['created_at'],
                  'metrics': v['metrics']
              })
          return versions
      
      def compare_versions(self, name, X_test, y_test):
          """Compare all versions on test data"""
          results = []
          for v in self.registry['models'][name]['versions']:
              model = joblib.load(v['model_path'])
              y_pred = model.predict(X_test)
              accuracy = accuracy_score(y_test, y_pred)
              results.append({
                  'version': v['version'],
                  'stage': v['stage'],
                  'test_accuracy': accuracy,
                  'registered_accuracy': v['metrics'].get('cv_accuracy', 'N/A')
              })
          return pd.DataFrame(results)

  # Step 1: Create and use registry
  print("="*60)
  print("🏷️ MODEL VERSIONING SYSTEM")
  print("="*60)

  registry = ModelRegistry()

  # Load data
  cancer = load_breast_cancer()
  X_train, X_test, y_train, y_test = train_test_split(
      cancer.data, cancer.target, test_size=0.2, random_state=42
  )

  # Step 2: Train and register multiple versions
  print("\n1️⃣ TRAINING MODELS")
  print("-"*40)

  # Version 1: Simple Random Forest
  pipeline_v1 = Pipeline([
      ('scaler', StandardScaler()),
      ('model', RandomForestClassifier(n_estimators=50, random_state=42))
  ])
  pipeline_v1.fit(X_train, y_train)
  cv_scores = cross_val_score(pipeline_v1, X_train, y_train, cv=5)

  registry.register_model(
      name='breast_cancer_classifier',
      pipeline=pipeline_v1,
      metrics={'cv_accuracy': cv_scores.mean(), 'cv_std': cv_scores.std()},
      tags={'algorithm': 'random_forest', 'n_estimators': 50}
  )

  # Version 2: More trees
  pipeline_v2 = Pipeline([
      ('scaler', StandardScaler()),
      ('model', RandomForestClassifier(n_estimators=200, random_state=42))
  ])
  pipeline_v2.fit(X_train, y_train)
  cv_scores = cross_val_score(pipeline_v2, X_train, y_train, cv=5)

  registry.register_model(
      name='breast_cancer_classifier',
      pipeline=pipeline_v2,
      metrics={'cv_accuracy': cv_scores.mean(), 'cv_std': cv_scores.std()},
      tags={'algorithm': 'random_forest', 'n_estimators': 200}
  )

  # Version 3: Logistic Regression
  pipeline_v3 = Pipeline([
      ('scaler', StandardScaler()),
      ('model', LogisticRegression(max_iter=1000, random_state=42))
  ])
  pipeline_v3.fit(X_train, y_train)
  cv_scores = cross_val_score(pipeline_v3, X_train, y_train, cv=5)

  registry.register_model(
      name='breast_cancer_classifier',
      pipeline=pipeline_v3,
      metrics={'cv_accuracy': cv_scores.mean(), 'cv_std': cv_scores.std()},
      tags={'algorithm': 'logistic_regression'}
  )

  # Step 3: List versions
  print("\n2️⃣ MODEL VERSIONS")
  print("-"*40)
  versions = registry.list_versions('breast_cancer_classifier')
  for v in versions:
      print(f"  {v['version']}: {v['stage']} - CV Accuracy: {v['metrics']['cv_accuracy']:.4f}")

  # Step 4: Compare versions
  print("\n3️⃣ VERSION COMPARISON")
  print("-"*40)
  comparison = registry.compare_versions('breast_cancer_classifier', X_test, y_test)
  print(comparison.to_string(index=False))

  # Step 5: Promote best version
  print("\n4️⃣ PROMOTION WORKFLOW")
  print("-"*40)

  best_version = comparison.loc[comparison['test_accuracy'].idxmax(), 'version']
  print(f"Best version: {best_version}")

  registry.promote_to_staging('breast_cancer_classifier', best_version)
  registry.promote_to_production('breast_cancer_classifier', best_version)

  # Step 6: Use production model
  print("\n5️⃣ USING PRODUCTION MODEL")
  print("-"*40)

  prod_model = registry.get_production_model('breast_cancer_classifier')
  prod_accuracy = accuracy_score(y_test, prod_model.predict(X_test))
  print(f"Production model accuracy: {prod_accuracy:.4f}")

  print("\n✅ Model versioning complete!")
  ```

  **What you learned:**

  * Model versioning enables experimentation and rollback
  * Staging environments catch issues before production
  * Registries track model lineage and metrics
</details>
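
Rollback is the payoff of keeping archived versions around. As a rough sketch (not part of the `ModelRegistry` class in the solution), a rollback can be written against the same dictionary layout the registry uses: a `'versions'` list with a `'stage'` per entry, plus a `'production'` pointer. The `rollback_production` helper and the sample data are hypothetical, and the sketch assumes the last archived entry in the list is the one to restore.

```python
def rollback_production(registry, name):
    """Restore the last archived version in the list as production.

    Hypothetical helper; assumes the dict layout used by the registry.
    """
    entry = registry['models'][name]
    current = entry['production']
    archived = [v for v in entry['versions'] if v['stage'] == 'archived']
    if not archived:
        raise ValueError(f"No archived version to roll back to for {name}")

    target = archived[-1]  # assumption: list order reflects registration order
    for v in entry['versions']:
        if v['version'] == current:
            v['stage'] = 'archived'  # demote the version being rolled back
    target['stage'] = 'production'
    entry['production'] = target['version']
    return target['version']


# Illustrative data only
registry = {
    'models': {
        'demo': {
            'production': 'v2',
            'versions': [
                {'version': 'v1', 'stage': 'archived'},
                {'version': 'v2', 'stage': 'production'},
            ],
        }
    }
}
restored = rollback_production(registry, 'demo')
print(f"Rolled back to {restored}")
```

A real registry would also persist the change (the solution does this with `_save_registry()`) and record who triggered the rollback and why.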

### Project 4: Monitoring Dashboard

Create a simple model monitoring system.

<details>
  <summary>View Complete Solution</summary>

  ```python theme={null}
  import numpy as np
  import pandas as pd
  import matplotlib.pyplot as plt
  from datetime import datetime, timedelta
  from collections import deque
  from sklearn.datasets import load_breast_cancer
  from sklearn.model_selection import train_test_split
  from sklearn.ensemble import RandomForestClassifier
  from sklearn.preprocessing import StandardScaler
  from sklearn.pipeline import Pipeline
  import json

  class ModelMonitor:
      """Simple model monitoring system"""
      
      def __init__(self, model, reference_data, max_history=1000):
          self.model = model
          self.reference_data = reference_data
          self.reference_predictions = model.predict(reference_data)
          self.reference_probas = model.predict_proba(reference_data)[:, 1]
          
          # Prediction history
          self.predictions = deque(maxlen=max_history)
          self.probabilities = deque(maxlen=max_history)
          self.timestamps = deque(maxlen=max_history)
          self.latencies = deque(maxlen=max_history)
          
          # Alerts
          self.alerts = []
          
          # Thresholds
          self.latency_threshold_ms = 100
          self.drift_threshold = 0.1
      
      def predict_with_monitoring(self, X):
          """Make prediction with monitoring"""
          start_time = datetime.now()
          
          # Make prediction
          predictions = self.model.predict(X)
          probabilities = self.model.predict_proba(X)[:, 1]
          
          # Calculate latency
          latency_ms = (datetime.now() - start_time).total_seconds() * 1000
          
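          # Store metrics (latency is recorded per row: batch latency / batch size,
          # while _check_latency below compares the whole-batch latency to the threshold)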
          for pred, prob in zip(predictions, probabilities):
              self.predictions.append(pred)
              self.probabilities.append(prob)
              self.timestamps.append(datetime.now())
              self.latencies.append(latency_ms / len(predictions))
          
          # Check for issues
          self._check_latency(latency_ms)
          self._check_prediction_drift()
          
          return predictions
      
      def _check_latency(self, latency_ms):
          """Check for latency issues"""
          if latency_ms > self.latency_threshold_ms:
              self.alerts.append({
                  'type': 'latency',
                  'message': f'High latency: {latency_ms:.2f}ms',
                  'timestamp': datetime.now().isoformat(),
                  'severity': 'warning'
              })
      
      def _check_prediction_drift(self):
          """Check for prediction distribution drift"""
          if len(self.predictions) < 100:
              return
          
          recent_predictions = list(self.predictions)[-100:]
          recent_positive_rate = np.mean(recent_predictions)
          reference_positive_rate = np.mean(self.reference_predictions)
          
          drift = abs(recent_positive_rate - reference_positive_rate)
          
          if drift > self.drift_threshold:
              self.alerts.append({
                  'type': 'drift',
                  'message': f'Prediction drift detected: {drift:.2%}',
                  'timestamp': datetime.now().isoformat(),
                  'severity': 'critical' if drift > 0.2 else 'warning',
                  'details': {
                      'reference_rate': reference_positive_rate,
                      'current_rate': recent_positive_rate
                  }
              })
      
      def get_metrics(self):
          """Get current metrics"""
          if not self.predictions:
              return {}
          
          predictions_list = list(self.predictions)
          probabilities_list = list(self.probabilities)
          latencies_list = list(self.latencies)
          
          return {
              'total_predictions': len(predictions_list),
              'positive_rate': np.mean(predictions_list),
              'avg_probability': np.mean(probabilities_list),
              'avg_latency_ms': np.mean(latencies_list),
              'p99_latency_ms': np.percentile(latencies_list, 99) if len(latencies_list) > 10 else None,
              'last_prediction_time': self.timestamps[-1].isoformat() if self.timestamps else None
          }
      
      def get_alerts(self, severity=None):
          """Get alerts, optionally filtered by severity"""
          if severity:
              return [a for a in self.alerts if a['severity'] == severity]
          return self.alerts
      
      def generate_report(self):
          """Generate monitoring report"""
          metrics = self.get_metrics()
          
          report = {
              'generated_at': datetime.now().isoformat(),
              'metrics': metrics,
              'alerts': {
                  'total': len(self.alerts),
                  'critical': len([a for a in self.alerts if a['severity'] == 'critical']),
                  'warning': len([a for a in self.alerts if a['severity'] == 'warning'])
              },
              'reference_stats': {
                  'positive_rate': float(np.mean(self.reference_predictions)),
                  'avg_probability': float(np.mean(self.reference_probas))
              }
          }
          
          return report
      
      def plot_dashboard(self):
          """Create monitoring dashboard"""
          fig, axes = plt.subplots(2, 2, figsize=(14, 10))
          
          # Prediction distribution over time
          ax1 = axes[0, 0]
          if len(self.predictions) > 0:
              predictions_list = list(self.predictions)
              window_size = min(50, len(predictions_list))
              rolling_positive_rate = pd.Series(predictions_list).rolling(window_size).mean()
              ax1.plot(rolling_positive_rate, 'b-', label='Rolling positive rate')
              ax1.axhline(y=np.mean(self.reference_predictions), color='r', linestyle='--', 
                         label='Reference rate')
              ax1.set_ylabel('Positive Rate')
              ax1.set_title('Prediction Distribution Over Time')
              ax1.legend()
              ax1.set_ylim(0, 1)
          
          # Latency over time
          ax2 = axes[0, 1]
          if len(self.latencies) > 0:
              ax2.plot(list(self.latencies), 'g-', alpha=0.7)
              ax2.axhline(y=self.latency_threshold_ms, color='r', linestyle='--', 
                         label=f'Threshold ({self.latency_threshold_ms}ms)')
              ax2.set_ylabel('Latency (ms)')
              ax2.set_title('Prediction Latency')
              ax2.legend()
          
          # Probability distribution
          ax3 = axes[1, 0]
          if len(self.probabilities) > 0:
              # density=True keeps the histograms comparable when sample sizes differ
              ax3.hist(list(self.probabilities), bins=20, alpha=0.7, label='Recent', color='blue', density=True)
              ax3.hist(self.reference_probas, bins=20, alpha=0.5, label='Reference', color='red', density=True)
              ax3.set_xlabel('Probability')
              ax3.set_ylabel('Density')
              ax3.set_title('Probability Distribution Comparison')
              ax3.legend()
          
          # Alerts timeline
          ax4 = axes[1, 1]
          if self.alerts:
              alert_times = [datetime.fromisoformat(a['timestamp']) for a in self.alerts[-50:]]
              alert_types = [a['type'] for a in self.alerts[-50:]]
              colors = ['red' if a['severity'] == 'critical' else 'orange' for a in self.alerts[-50:]]
              
              ax4.scatter(range(len(alert_times)), [0.5]*len(alert_times), c=colors, s=100, marker='s')
              for i, atype in enumerate(alert_types):
                  ax4.annotate(atype, (i, 0.6), rotation=45, fontsize=8)
              ax4.set_yticks([])
              ax4.set_title('Alert Timeline')
          else:
              ax4.text(0.5, 0.5, 'No alerts', ha='center', va='center', fontsize=14)
              ax4.set_title('Alert Timeline')
          
          plt.tight_layout()
          plt.savefig('monitoring_dashboard.png', dpi=150)
          return fig

  # Step 1: Setup
  print("="*60)
  print("📊 MODEL MONITORING DASHBOARD")
  print("="*60)

  # Train model
  cancer = load_breast_cancer()
  X_train, X_test, y_train, y_test = train_test_split(
      cancer.data, cancer.target, test_size=0.2, random_state=42
  )

  pipeline = Pipeline([
      ('scaler', StandardScaler()),
      ('model', RandomForestClassifier(n_estimators=100, random_state=42))
  ])
  pipeline.fit(X_train, y_train)

  # Create monitor
  monitor = ModelMonitor(pipeline, X_train)

  # Step 2: Simulate production traffic
  print("\n1️⃣ SIMULATING PRODUCTION TRAFFIC")
  print("-"*40)

  # Normal traffic (seeded so the simulated run is reproducible)
  np.random.seed(42)
  print("Normal traffic...")
  for _ in range(500):
      idx = np.random.randint(0, len(X_test), size=5)
      monitor.predict_with_monitoring(X_test[idx])

  # Simulate drift (modify data)
  print("Simulating data drift...")
  X_drifted = X_test.copy()
  X_drifted[:, 0] += 5  # Shift first feature

  for _ in range(200):
      idx = np.random.randint(0, len(X_drifted), size=5)
      monitor.predict_with_monitoring(X_drifted[idx])

  # Step 3: Get metrics
  print("\n2️⃣ CURRENT METRICS")
  print("-"*40)
  metrics = monitor.get_metrics()
  for key, value in metrics.items():
      print(f"  {key}: {value}")

  # Step 4: Check alerts
  print("\n3️⃣ ALERTS")
  print("-"*40)
  alerts = monitor.get_alerts()
  print(f"Total alerts: {len(alerts)}")
  print(f"Critical: {len([a for a in alerts if a['severity'] == 'critical'])}")
  print(f"Warning: {len([a for a in alerts if a['severity'] == 'warning'])}")

  if alerts:
      print("\nRecent alerts:")
      for alert in alerts[-5:]:
          print(f"  [{alert['severity'].upper()}] {alert['type']}: {alert['message']}")

  # Step 5: Generate report
  print("\n4️⃣ MONITORING REPORT")
  print("-"*40)
  report = monitor.generate_report()
  print(json.dumps(report, indent=2, default=str))

  # Step 6: Create dashboard
  print("\n5️⃣ CREATING DASHBOARD")
  print("-"*40)
  monitor.plot_dashboard()
  print("Dashboard saved to monitoring_dashboard.png")

  print("\n✅ Monitoring complete!")
  ```

  **What you learned:**

  * Monitor prediction distributions to catch data drift
  * Track latency to ensure SLA compliance
  * Alerts enable proactive problem detection
</details>

***

## Key Takeaways

<CardGroup cols={2}>
  <Card title="Save the Pipeline" icon="box">
    Include all preprocessing with the model
  </Card>

  <Card title="API = Interface" icon="plug">
    FastAPI makes serving models easy
  </Card>

  <Card title="Docker = Portability" icon="docker">
    Same environment everywhere
  </Card>

  <Card title="Monitor = Trust" icon="chart-line">
    Know when your model degrades
  </Card>
</CardGroup>

***

## What's Next?

We have more advanced topics to explore! Let's learn about time series forecasting.

<Card title="Continue to Time Series" icon="arrow-right" href="/courses/ml-mastery/15-time-series">
  Predict the future from sequential data
</Card>

***

## Interview Deep-Dive

<AccordionGroup>
  <Accordion title="Walk me through how you would deploy an ML model that needs to serve 10,000 predictions per second with P99 latency under 50ms.">
    This is fundamentally an engineering problem, not an ML problem. The model accuracy is already decided at training time -- deployment is about serving reliably at scale.

    * **Model optimization first.** Before touching infrastructure, I would profile the model. Inference cost for a tree ensemble grows with the number of trees and their depth, so a Random Forest with 500 trees at depth 20 can easily be 10x slower than one with 50 trees at depth 10. I would benchmark the accuracy drop from model simplification and see if the trade-off is acceptable. It is often possible to cut model size by around 80% for roughly 1% accuracy loss, but that has to be measured on a held-out set rather than assumed.
    * **Batch predictions where possible.** If predictions are not real-time (e.g., daily churn scoring), precompute them in a batch job and serve from a cache. This is the cheapest and most reliable path. Only build real-time serving if the use case demands it.
    * **For real-time: containerize with a lightweight framework.** I would use FastAPI or a gRPC service inside a Docker container, deployed on Kubernetes with horizontal pod autoscaling. The model loads into memory once at startup, and each request is then a single in-memory `predict` call on a numpy array.
    * **Connection pooling and async I/O.** If features come from a database or feature store, the network call is usually the bottleneck, not the model. Use connection pooling and async requests to parallelize feature fetching.
    * **Load testing before launch.** Use Locust or k6 to simulate 10K RPS and measure P50, P95, and P99 latencies. If P99 exceeds 50ms, the usual culprits are garbage collection pauses, cold starts, or feature computation latency -- not the model inference itself.

    **Follow-up: What happens when the model needs to be updated without downtime?**

    Blue-green deployment is the standard pattern. You maintain two identical environments. The "blue" environment runs the current model, the "green" environment gets the new model deployed and tested. Once the green model passes smoke tests and shadow-mode validation, you switch the load balancer to route traffic from blue to green. If anything goes wrong, you switch back instantly. The key detail most people miss: you also need to version your preprocessing pipeline. If the new model expects different feature transformations than the old model, switching models mid-flight will produce garbage predictions.
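
The switch-and-rollback mechanics can be sketched in a few lines. This is a toy, in-process stand-in for what a load balancer does across two environments; the `BlueGreenRouter` class and the lambda "models" are hypothetical. Each slot holds a `(preprocess, model)` pair, so the preprocessing is versioned and switched together with the model.

```python
import threading


class BlueGreenRouter:
    """Route predictions to the live slot and swap slots atomically (sketch)."""

    def __init__(self, blue):
        self._slots = {'blue': blue, 'green': None}
        self._live = 'blue'
        self._lock = threading.Lock()

    def _idle(self):
        return 'green' if self._live == 'blue' else 'blue'

    def deploy_to_idle(self, bundle):
        """Load a new (preprocess, model) bundle into the idle slot."""
        with self._lock:
            self._slots[self._idle()] = bundle

    def switch(self):
        """Point traffic at the idle slot; calling it again rolls back."""
        with self._lock:
            idle = self._idle()
            if self._slots[idle] is None:
                raise RuntimeError("Idle slot is empty")
            self._live = idle

    def predict(self, raw):
        # Grab both halves of the bundle under the lock so a request never
        # mixes old preprocessing with a new model.
        with self._lock:
            preprocess, model = self._slots[self._live]
        return model(preprocess(raw))


old = (lambda x: x, lambda x: x * 2)        # stand-in for pipeline v1
new = (lambda x: x / 10, lambda x: x + 1)   # v2 expects rescaled input

router = BlueGreenRouter(old)
before = router.predict(10)
router.deploy_to_idle(new)
router.switch()                 # green goes live
after = router.predict(10)
router.switch()                 # instant rollback to blue
rolled_back = router.predict(10)
print(before, after, rolled_back)
```

Because the old bundle stays loaded in the idle slot, rollback is just another `switch()` with no reload.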
  </Accordion>

  <Accordion title="Your deployed model's accuracy has dropped 8% over the past month but no code changes were made. What is your debugging process?">
    No code changes means the model itself has not changed, so the issue is almost certainly in the data. My debugging process follows a systematic funnel:

    * **Step 1: Confirm the drop is real.** Check if the evaluation data itself is reliable. Has the labeling process changed? Is there a new data source contributing noisier labels? A perceived accuracy drop could be a labeling quality issue, not a model issue.
    * **Step 2: Check input feature distributions.** Compare each feature's distribution in the recent prediction window against the training data. Use Population Stability Index (PSI) or Kolmogorov-Smirnov tests. If a feature like "average\_transaction\_amount" shifted because of inflation or a pricing change, the model is seeing inputs outside its training domain.
    * **Step 3: Check for upstream data pipeline issues.** Missing values that were previously imputed, schema changes in upstream tables, timestamp timezone bugs, or a feature that silently started returning nulls -- these are the most common silent killers. I have seen a model degrade because an upstream team changed a column from integer to float, which caused a downstream join to silently drop rows.
    * **Step 4: Segment the performance drop.** Is accuracy down across all segments, or just one? If it dropped only for a specific customer cohort or geographic region, you have a targeted drift problem. If it dropped everywhere uniformly, it is more likely a global data issue.
    * **Step 5: Retrain on recent data and compare.** If a freshly trained model on the last 3 months of data recovers the lost accuracy, the diagnosis is confirmed: data drift. Set up a regular retraining cadence and monitoring alerts.
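
Step 4 is easy to run as soon as each logged prediction carries a segment label. A minimal sketch, assuming records arrive as `(segment, y_true, y_pred)` tuples; the function name and the data are made up for illustration:

```python
from collections import defaultdict


def accuracy_by_segment(records):
    """Compute accuracy per segment from (segment, y_true, y_pred) tuples."""
    hits = defaultdict(int)
    totals = defaultdict(int)
    for segment, y_true, y_pred in records:
        totals[segment] += 1
        hits[segment] += int(y_true == y_pred)
    return {seg: hits[seg] / totals[seg] for seg in totals}


# Made-up records for illustration
records = [
    ('EU', 1, 1), ('EU', 0, 0), ('EU', 1, 1), ('EU', 0, 0),
    ('US', 1, 0), ('US', 0, 0), ('US', 1, 0), ('US', 0, 1),
]
per_segment = accuracy_by_segment(records)
print(per_segment)
```

Here the overall accuracy looks mediocre, but the breakdown shows one segment is fine and the other carries the whole drop, which points at targeted drift rather than a global data issue.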

    **Follow-up: How would you set up monitoring to catch this earlier next time?**

    I would implement three layers of monitoring. First, input monitoring -- track feature distributions and alert when PSI exceeds a threshold (I typically start with 0.2). Second, output monitoring -- track the distribution of predicted probabilities. If the model suddenly starts predicting everything as low-risk, that is a red flag even before you get ground truth labels. Third, performance monitoring -- as soon as labels become available (even with a delay), compute accuracy, precision, and recall on a rolling window and alert on degradation. The goal is to detect drift within days, not months.
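
For the input-monitoring layer, PSI is simple enough to compute by hand. The sketch below is one common formulation (equal-width bins over the reference range, with a small `eps` to avoid `log(0)`); binning choices vary between teams, and the function name and synthetic data are assumptions for illustration.

```python
import math
import random


def psi(reference, current, n_bins=10, eps=1e-6):
    """Population Stability Index between two 1-D samples.

    Assumes the reference sample is not constant (non-zero range).
    """
    lo, hi = min(reference), max(reference)
    width = (hi - lo) / n_bins

    def proportions(values):
        counts = [0] * n_bins
        for x in values:
            idx = int((x - lo) / width)
            # Clamp out-of-range values into the first or last bin
            counts[min(max(idx, 0), n_bins - 1)] += 1
        return [max(c / len(values), eps) for c in counts]

    ref_p = proportions(reference)
    cur_p = proportions(current)
    return sum((c - r) * math.log(c / r) for r, c in zip(ref_p, cur_p))


rng = random.Random(0)
train = [rng.gauss(0, 1) for _ in range(5000)]      # training-time feature
same = [rng.gauss(0, 1) for _ in range(5000)]       # production, no drift
shifted = [rng.gauss(1, 1) for _ in range(5000)]    # production, mean shifted

print(f"PSI (no drift): {psi(train, same):.3f}")
print(f"PSI (shifted):  {psi(train, shifted):.3f}")
```

Run per feature on a schedule, this gives a single number to compare against the alert threshold.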
  </Accordion>

  <Accordion title="How do you handle the training-serving skew problem -- where features computed during training differ subtly from features computed at serving time?">
    Training-serving skew is one of the top reasons ML models fail silently in production, and it is notoriously hard to debug because the model does not throw errors -- it just makes slightly worse predictions.

    * **Root cause: dual code paths.** During training, features are computed in batch with pandas on a data warehouse. During serving, the same features need to be computed in real-time, often in a different language or framework. Even minor differences -- different null handling, different timestamp rounding, different string encoding -- produce skew.
    * **Solution 1: Feature stores.** Use a feature store (Feast, Tecton, Hopsworks) that provides a single feature definition used identically for both training and serving. The feature store computes features once, stores them, and serves the same values to both the training pipeline and the prediction endpoint. For every feature defined in the store, this removes the dual code path.
    * **Solution 2: Pipeline as the single source of truth.** If a feature store is too heavy, package your feature engineering as a shared library that both the training job and the serving endpoint import. One codebase, one behavior.
    * **Solution 3: Log-and-compare.** In the serving path, log the actual feature values fed to the model for a sample of requests. Periodically compare these logged features against what the training pipeline would produce for the same raw inputs. Any discrepancy is skew.

    The worst kind of skew is the "off-by-one" temporal bug. For example, during training you compute a "last 7 days average" that accidentally includes the current day (leaking the label). At serving time, the current day is not yet complete, so the feature has a different value. The model was trained with a leak, so serving without the leak makes its predictions worse. Fixing the leak and retraining will also lower the measured accuracy, because the original number was inflated by the leak and the model never learned the real signal.
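
The off-by-one is easy to see with numbers. In this sketch, `trailing_mean` is a hypothetical feature function and the spend values are made up; the only difference between the two calls is whether the window includes the current day.

```python
def trailing_mean(values, day, window=7, include_current=False):
    """Mean of up to `window` daily values ending at index `day`.

    include_current=True reproduces the leaky variant that also uses
    the value of `day` itself.
    """
    end = day + 1 if include_current else day
    start = max(0, end - window)
    chunk = values[start:end]
    if not chunk:
        raise ValueError("No history available before this day")
    return sum(chunk) / len(chunk)


daily_spend = [10, 10, 10, 10, 10, 10, 10, 80]  # index 7 is "today"

leaky = trailing_mean(daily_spend, day=7, include_current=True)
correct = trailing_mean(daily_spend, day=7)
print(f"leaky feature: {leaky}, correct feature: {correct}")
```

The leaky version already "knows" about today's spike, which is exactly the information the model will not have at serving time.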

    **Follow-up: How do you detect training-serving skew in a model that is already in production?**

    The most reliable method is shadow scoring. Periodically take a batch of recent production requests, run them through both the serving pipeline and the training pipeline, and compare the feature vectors. Any systematic difference -- even small ones -- is skew. I would also compare the distribution of prediction scores between training data and recent serving data. If the training data has a mean prediction of 0.35 and serving data has 0.42, that gap could be skew, drift, or both -- but it warrants investigation.
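
The feature-vector comparison can be sketched as follows, assuming both pipelines can emit their features as `request_id -> {feature_name: value}` dictionaries. The `find_skew` name, the tolerances, and the sample rows are illustrative assumptions.

```python
import math


def find_skew(training_rows, serving_rows, rel_tol=1e-6, abs_tol=1e-9):
    """Return (request_id, feature, training_value, serving_value) mismatches."""
    mismatches = []
    for request_id, train_feats in training_rows.items():
        serve_feats = serving_rows.get(request_id, {})
        for name, t_val in train_feats.items():
            s_val = serve_feats.get(name)
            if t_val is None or s_val is None:
                # A null on one side only is skew (e.g. different null handling)
                same = t_val is s_val
            else:
                same = math.isclose(t_val, s_val, rel_tol=rel_tol, abs_tol=abs_tol)
            if not same:
                mismatches.append((request_id, name, t_val, s_val))
    return mismatches


# Made-up feature rows for the same two requests
training = {
    'r1': {'avg_7d': 10.0, 'age': 31.0},
    'r2': {'avg_7d': 12.5, 'age': None},
}
serving = {
    'r1': {'avg_7d': 20.0, 'age': 31.0},   # window computed differently
    'r2': {'avg_7d': 12.5, 'age': 0.0},    # null imputed as 0 at serving time
}

skew = find_skew(training, serving)
for row in skew:
    print(row)
```

Grouping the mismatches by feature name usually points straight at the code path that diverged.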
  </Accordion>

  <Accordion title="You need to deploy a model in an environment where the prediction latency budget is only 5ms. What trade-offs would you consider?">
    Five milliseconds is tight. That is roughly the time for a single database round-trip, so every architectural decision matters.

    * **Model choice is constrained.** Deep ensembles with hundreds of trees are out. I would look at logistic regression, small gradient boosting models (under 50 trees, shallow depth), or even a precomputed lookup table if the feature space is discrete and bounded. A single decision tree with depth 6 can serve in microseconds.
    * **Feature computation budget.** If features require any I/O (database lookups, external API calls), that will blow the 5ms budget immediately. All features must either be precomputed and cached in memory, or computable from the request payload alone.
    * **Model distillation.** Train a complex model offline (XGBoost with 1000 trees), then use it to label a large dataset. Train a simple model (logistic regression) on those labels. The simple model learns to mimic the complex model and can serve in under 1ms. You lose some accuracy but gain massive latency improvement.
    * **ONNX or compiled models.** Convert the sklearn model to ONNX format with sklearn-onnx (`skl2onnx`) and serve it with ONNX Runtime. This removes much of the Python-level overhead in `predict` and can give a 2-5x speedup, depending on the model and batch size. For extreme cases, compile tree ensembles to native code with a tool like Treelite or TL2cgen.
    * **Avoid Python's GIL bottleneck.** If you are serving many concurrent requests in Python, the Global Interpreter Lock serializes CPU-bound work. Consider a Rust or C++ serving layer, or use multiprocess workers instead of multithreaded ones.

    The fundamental trade-off is accuracy versus latency. I would quantify this explicitly: "If we use the full model at 50ms, we get 92% accuracy. If we use the distilled model at 3ms, we get 89% accuracy. Are those 3 percentage points worth roughly 17x the latency?" That is a business decision, not a technical one.
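
A minimal distillation sketch with scikit-learn, under a few simplifications: a Random Forest stands in for the XGBoost teacher to keep dependencies to sklearn, the teacher labels the training set rather than a larger unlabeled pool, and the student learns from hard labels only. The dataset and hyperparameters are illustrative.

```python
from sklearn.datasets import load_breast_cancer
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

X, y = load_breast_cancer(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Teacher: the larger, slower model trained on the real labels
teacher = RandomForestClassifier(n_estimators=500, random_state=42)
teacher.fit(X_train, y_train)

# Student: a small model trained on the teacher's predictions, not on y_train
student = make_pipeline(StandardScaler(), LogisticRegression(max_iter=1000))
student.fit(X_train, teacher.predict(X_train))

teacher_acc = teacher.score(X_test, y_test)
student_acc = student.score(X_test, y_test)
agreement = (student.predict(X_test) == teacher.predict(X_test)).mean()

print(f"Teacher accuracy: {teacher_acc:.3f}")
print(f"Student accuracy: {student_acc:.3f}")
print(f"Student/teacher agreement: {agreement:.3f}")
```

The agreement rate shows how faithfully the student mimics the teacher, and the accuracy gap is the number to bring to the business discussion alongside the measured latency of each model.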

    **Follow-up: How would you benchmark and validate that the simplified model meets the latency requirement under production load?**

    I would set up a realistic load test, not just a single-request benchmark. Single-request latency is misleading because it does not account for contention, garbage collection, or memory pressure under load. I would use a tool like Locust to simulate the expected concurrent request rate, measure P50, P95, and P99 latencies, and validate that P99 stays under 5ms. I would run this test for at least 10 minutes to catch GC pauses and memory leaks. The benchmark should also include the full request path -- network, deserialization, feature computation, model inference, and serialization -- not just the model.predict() call.
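
The percentile bookkeeping itself is small. The sketch below times a handler in a single process with `time.perf_counter` and summarizes with `statistics.quantiles`; it does not replace a concurrent load test with Locust or k6, and `fake_handler` is a hypothetical stand-in for the full request path.

```python
import statistics
import time


def measure_latencies_ms(handler, payloads):
    """Time each handler(payload) call and return latencies in milliseconds."""
    latencies = []
    for payload in payloads:
        start = time.perf_counter()
        handler(payload)
        latencies.append((time.perf_counter() - start) * 1000)
    return latencies


def summarize(latencies_ms):
    """Return P50, P95 and P99 from a list of latencies."""
    # n=100 yields 99 cut points; index i holds the (i + 1)th percentile
    cuts = statistics.quantiles(latencies_ms, n=100, method='inclusive')
    return {'p50': cuts[49], 'p95': cuts[94], 'p99': cuts[98]}


def fake_handler(payload):
    # Stand-in for deserialization + feature computation + model.predict()
    return sum(x * x for x in payload)


latencies = measure_latencies_ms(fake_handler, [list(range(100))] * 1000)
stats = summarize(latencies)
budget_ms = 5
print(stats)
print("P99 within budget:", stats['p99'] < budget_ms)
```

Reporting all three percentiles matters because a healthy P50 can hide a P99 that blows the budget.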
  </Accordion>
</AccordionGroup>
