ML at Scale

From Laptop to Production

Your model works on 10,000 samples. What happens with 10 million?
Reality Check: Most ML tutorials use tiny datasets. Production is a different beast:
  • Training on terabytes of data
  • Serving 10,000+ predictions per second
  • Updating models without downtime
Estimated Time: 3-4 hours
Difficulty: Advanced
Prerequisites: All previous ML chapters, basic understanding of distributed systems
Focus: Real production patterns used at tech companies

Challenge 1: Data Doesn’t Fit in Memory

Solution: Batch Processing

import numpy as np
from sklearn.linear_model import SGDClassifier
from sklearn.datasets import make_classification

# Simulate large dataset (in reality, this would be streaming from disk)
def data_generator(n_batches=100, batch_size=10000, n_features=100):
    """Simulate streaming data from disk/database."""
    for i in range(n_batches):
        X, y = make_classification(
            n_samples=batch_size,
            n_features=n_features,
            random_state=i
        )
        yield X, y
        # In production: read from parquet/csv chunks

# Incremental learning with SGD
model = SGDClassifier(loss='log_loss', random_state=42)

# First pass: compute scaling parameters
print("Pass 1: Computing scaling parameters...")
n_samples = 0
mean_sum = None
var_sum = None

for X, y in data_generator(n_batches=10):
    n_samples += len(X)
    if mean_sum is None:
        mean_sum = X.sum(axis=0)
        var_sum = (X**2).sum(axis=0)
    else:
        mean_sum += X.sum(axis=0)
        var_sum += (X**2).sum(axis=0)

mean = mean_sum / n_samples
var = (var_sum / n_samples) - mean**2
std = np.sqrt(np.maximum(var, 1e-12))  # guard against tiny negative variance from rounding

print(f"Computed stats from {n_samples} samples")

# Second pass: train model
print("\nPass 2: Training model...")
for epoch in range(3):
    batch_num = 0
    for X, y in data_generator(n_batches=100):
        # Scale data
        X_scaled = (X - mean) / std
        
        # Partial fit (incremental learning)
        model.partial_fit(X_scaled, y, classes=[0, 1])
        
        batch_num += 1
        if batch_num % 20 == 0:
            print(f"Epoch {epoch+1}, Batch {batch_num}")
    
    # Evaluate on a held-out batch (seed not used by the training generator)
    X_test, y_test = make_classification(
        n_samples=10000, n_features=100, random_state=999
    )
    X_test_scaled = (X_test - mean) / std
    accuracy = model.score(X_test_scaled, y_test)
    print(f"Epoch {epoch+1} accuracy: {accuracy:.4f}")

print(f"\nTotal training samples: {100 * 10000 * 3:,}")

Solution: Dask for Out-of-Core Computing

# Using Dask for larger-than-memory datasets
import dask.dataframe as dd
import dask.array as da

# Read large CSV in chunks
# df = dd.read_csv('huge_dataset_*.csv')


# Create large dataset
print("Creating simulated large dataset...")
n_total = 1_000_000
n_features = 50

# In production, this would be reading from disk
X_large = da.random.random((n_total, n_features), chunks=(100_000, n_features))
y_large = da.random.randint(0, 2, n_total, chunks=100_000)

# Compute mean and std lazily
mean = X_large.mean(axis=0)
std = X_large.std(axis=0)

print(f"Mean shape: {mean.shape}")
print(f"Computing statistics...")
mean_computed, std_computed = da.compute(mean, std)
print(f"Mean computed: {mean_computed[:5]}")

# For ML, use dask-ml
from dask_ml.linear_model import LogisticRegression
from dask_ml.model_selection import train_test_split

# This works on datasets larger than memory!
# model = LogisticRegression()
# model.fit(X_large, y_large)
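
# A minimal end-to-end sketch of the same idea (assumes dask-ml is installed;
# kept commented out, like the fit above, so this cell runs without it):
# X_scaled = (X_large - mean) / std                       # still lazy
# X_train, X_test, y_train, y_test = train_test_split(X_scaled, y_large)
# clf = LogisticRegression()
# clf.fit(X_train, y_train)                               # trains chunk by chunk
# acc = (clf.predict(X_test) == y_test).mean().compute()  # only now does work happen
# print(f"Held-out accuracy: {acc:.3f}")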

Challenge 2: Training Takes Too Long

Solution: Distributed Training

# Ray for distributed ML
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_classification
import time

# Single machine baseline
X, y = make_classification(n_samples=100000, n_features=50, random_state=42)

start = time.time()
rf = RandomForestClassifier(n_estimators=100, n_jobs=-1)
rf.fit(X, y)
single_time = time.time() - start
print(f"Single machine: {single_time:.2f}s")

# With parallel trees (already using n_jobs=-1 above)
# For true distributed, use Ray or Spark MLlib

# Example Ray pattern (conceptual)
"""
import ray
from ray.train import ScalingConfig
from ray.train.sklearn import SklearnTrainer

ray.init()

trainer = SklearnTrainer(
    estimator=RandomForestClassifier(n_estimators=100),
    datasets={"train": train_data},
    scaling_config=ScalingConfig(
        num_workers=4,
        use_gpu=False
    )
)

result = trainer.fit()
"""

# GPU Acceleration with RAPIDS cuML (if GPU available)
"""
from cuml import RandomForestClassifier as cuRF

# GPU-accelerated training - typically 10-100x faster, depending on data size and GPU
rf_gpu = cuRF(n_estimators=100)
rf_gpu.fit(X_gpu, y_gpu)
"""

Solution: Model Selection at Scale

from sklearn.experimental import enable_halving_search_cv  # noqa: F401, required for HalvingRandomSearchCV
from sklearn.model_selection import RandomizedSearchCV, HalvingRandomSearchCV
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.datasets import make_classification
from scipy.stats import randint, uniform
import time

X, y = make_classification(n_samples=10000, n_features=20, random_state=42)

param_dist = {
    'n_estimators': randint(50, 300),
    'max_depth': randint(3, 15),
    'learning_rate': uniform(0.01, 0.3),
    'subsample': uniform(0.6, 0.4)
}

# Standard RandomizedSearchCV (slow)
print("Standard RandomizedSearchCV...")
start = time.time()
random_search = RandomizedSearchCV(
    GradientBoostingClassifier(random_state=42),
    param_dist,
    n_iter=20,
    cv=3,
    random_state=42,
    n_jobs=-1
)
random_search.fit(X, y)
standard_time = time.time() - start
print(f"Time: {standard_time:.2f}s, Best: {random_search.best_score_:.4f}")

# HalvingRandomSearchCV (faster - successively halves candidates)
print("\nHalvingRandomSearchCV...")
start = time.time()
halving_search = HalvingRandomSearchCV(
    GradientBoostingClassifier(random_state=42),
    param_dist,
    factor=2,  # Halve candidates each round
    random_state=42,
    n_jobs=-1
)
halving_search.fit(X, y)
halving_time = time.time() - start
print(f"Time: {halving_time:.2f}s, Best: {halving_search.best_score_:.4f}")
print(f"Speedup: {standard_time/halving_time:.1f}x")

Challenge 3: Serving Predictions at Scale

Solution: Model Optimization

import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.datasets import make_classification
import time
import pickle

# Train a model
X, y = make_classification(n_samples=10000, n_features=20, random_state=42)
X_test = np.random.randn(1000, 20)

# Large model
rf_large = RandomForestClassifier(n_estimators=500, max_depth=20, random_state=42)
rf_large.fit(X, y)

# Smaller model (fewer, shallower trees trained on the same data)
rf_small = RandomForestClassifier(n_estimators=50, max_depth=10, random_state=42)
rf_small.fit(X, y)

# Compare
def benchmark(model, X_test, name, n_iterations=100):
    """Benchmark prediction latency."""
    # Warmup
    model.predict(X_test[:10])
    
    times = []
    for _ in range(n_iterations):
        start = time.perf_counter()
        model.predict(X_test)
        times.append(time.perf_counter() - start)
    
    avg_time = np.mean(times) * 1000  # ms
    p99_time = np.percentile(times, 99) * 1000
    
    print(f"{name}:")
    print(f"  Avg latency: {avg_time:.2f}ms")
    print(f"  P99 latency: {p99_time:.2f}ms")
    print(f"  Throughput: {1000/avg_time * len(X_test):.0f} predictions/sec")
    
    # Model size
    model_bytes = len(pickle.dumps(model))
    print(f"  Model size: {model_bytes/1024:.1f}KB")
    
    return avg_time

t_large = benchmark(rf_large, X_test, "Large RF (500 trees, depth=20)")
print()
t_small = benchmark(rf_small, X_test, "Small RF (50 trees, depth=10)")

print(f"\nSpeedup: {t_large/t_small:.1f}x")

Solution: Batching for Throughput

import numpy as np
import time

# Simulate model prediction
class MockModel:
    def predict(self, X):
        # Simulate some computation
        return np.sum(X, axis=1) > 0

model = MockModel()

# Single predictions (slow)
def single_predictions(model, samples, n_samples):
    predictions = []
    for i in range(n_samples):
        pred = model.predict(samples[i:i+1])
        predictions.append(pred[0])
    return predictions

# Batched predictions (fast)
def batched_predictions(model, samples, batch_size=100):
    predictions = []
    for i in range(0, len(samples), batch_size):
        batch = samples[i:i+batch_size]
        preds = model.predict(batch)
        predictions.extend(preds)
    return predictions

# Benchmark
n_samples = 10000
samples = np.random.randn(n_samples, 100)

start = time.time()
single_predictions(model, samples, min(n_samples, 1000))  # Only 1000 for speed
single_time = time.time() - start

start = time.time()
batched_predictions(model, samples, batch_size=100)
batch_time = time.time() - start

print(f"Single predictions (1000 samples): {single_time:.4f}s")
print(f"Batched predictions ({n_samples} samples): {batch_time:.4f}s")
print(f"Estimated speedup: {(single_time * 10) / batch_time:.1f}x")

Solution: Caching Predictions

import hashlib
import numpy as np
from sklearn.datasets import make_classification

class CachedPredictor:
    """Predictor with result caching for repeated inputs."""
    
    def __init__(self, model, cache_size=10000):
        self.model = model
        self.cache = {}
        self.cache_size = cache_size
        self.hits = 0
        self.misses = 0
    
    def _hash_input(self, x):
        """Create hash key from input."""
        return hashlib.md5(x.tobytes()).hexdigest()
    
    def predict(self, X):
        """Predict with caching."""
        results = []
        
        for i in range(len(X)):
            x = X[i:i+1]
            key = self._hash_input(x)
            
            if key in self.cache:
                self.hits += 1
                results.append(self.cache[key])
            else:
                self.misses += 1
                pred = self.model.predict(x)[0]
                
                # FIFO eviction (simplified; a real LRU would reorder entries on hits)
                if len(self.cache) >= self.cache_size:
                    oldest_key = next(iter(self.cache))
                    del self.cache[oldest_key]
                
                self.cache[key] = pred
                results.append(pred)
        
        return np.array(results)
    
    def cache_stats(self):
        total = self.hits + self.misses
        hit_rate = self.hits / total if total > 0 else 0
        return {
            'hits': self.hits,
            'misses': self.misses,
            'hit_rate': hit_rate
        }

# Demo
from sklearn.ensemble import RandomForestClassifier

X, y = make_classification(n_samples=1000, n_features=20, random_state=42)
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X, y)

cached_predictor = CachedPredictor(model)

# First pass (all misses)
predictions = cached_predictor.predict(X[:100])
print(f"First pass stats: {cached_predictor.cache_stats()}")

# Second pass (all hits if repeated inputs)
predictions = cached_predictor.predict(X[:100])
print(f"Second pass stats: {cached_predictor.cache_stats()}")

Challenge 4: Model Updates Without Downtime

Solution: Blue-Green Deployment

class BlueGreenDeployer:
    """
    Blue-Green deployment pattern for ML models.
    
    Always have two model versions:
    - Blue: current production model
    - Green: new model being tested
    """
    
    def __init__(self):
        self.blue_model = None  # Current production
        self.green_model = None  # Staging/testing
        self.active = 'blue'
        self.traffic_split = 1.0  # 100% to active model
    
    def deploy_to_green(self, new_model):
        """Deploy new model to green slot."""
        self.green_model = new_model
        print("New model deployed to GREEN slot")
    
    def canary_release(self, percentage):
        """
        Route percentage of traffic to green model.
        Gradually increase to catch issues early.
        """
        self.traffic_split = 1.0 - (percentage / 100)
        print(f"Traffic: {100-percentage}% BLUE, {percentage}% GREEN")
    
    def promote_green(self):
        """Swap green to blue (new becomes production)."""
        self.blue_model = self.green_model
        self.green_model = None
        self.traffic_split = 1.0
        print("GREEN promoted to BLUE (production)")
    
    def rollback(self):
        """Rollback: route all traffic to blue."""
        self.traffic_split = 1.0
        print("Rolled back to BLUE model")
    
    def predict(self, X):
        """Route prediction to appropriate model."""
        import numpy as np
        
        if np.random.random() < self.traffic_split:
            return self.blue_model.predict(X), 'blue'
        else:
            return self.green_model.predict(X), 'green'

# Demo
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import make_classification

X, y = make_classification(n_samples=1000, random_state=42)

# Current production model
model_v1 = LogisticRegression()
model_v1.fit(X, y)

# New model version
model_v2 = LogisticRegression(C=0.5)
model_v2.fit(X, y)

# Deployment
deployer = BlueGreenDeployer()
deployer.blue_model = model_v1
deployer.deploy_to_green(model_v2)

# Canary release
deployer.canary_release(10)  # 10% to new model

# Simulate predictions
results = {'blue': 0, 'green': 0}
for _ in range(1000):
    _, model_used = deployer.predict(X[:1])
    results[model_used] += 1

print(f"\nTraffic distribution: {results}")

Solution: Shadow Mode Testing

import numpy as np
from concurrent.futures import ThreadPoolExecutor
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class ShadowModePredictor:
    """
    Run new model in shadow mode:
    - Production model serves real traffic
    - Shadow model runs in parallel, results logged but not served
    """
    
    def __init__(self, production_model, shadow_model):
        self.production_model = production_model
        self.shadow_model = shadow_model
        self.discrepancies = []
        self.total_predictions = 0
        self.executor = ThreadPoolExecutor(max_workers=2)
    
    def predict(self, X):
        """
        Serve production prediction while running shadow in parallel.
        """
        # Production prediction (served to the user)
        self.total_predictions += 1
        prod_pred = self.production_model.predict(X)

        # Shadow prediction runs asynchronously; its result is logged, never served
        self.executor.submit(self._shadow_predict, X, prod_pred)
        
        return prod_pred
    
    def _shadow_predict(self, X, prod_pred):
        """Run shadow model and log discrepancies."""
        shadow_pred = self.shadow_model.predict(X)
        
        # Check for discrepancies
        if not np.array_equal(prod_pred, shadow_pred):
            discrepancy = {
                'input_hash': hash(X.tobytes()),
                'prod_pred': prod_pred.tolist(),
                'shadow_pred': shadow_pred.tolist()
            }
            self.discrepancies.append(discrepancy)
            
            # In production: log to monitoring system
            # logger.warning(f"Shadow discrepancy: {discrepancy}")
    
    def get_discrepancy_rate(self):
        """Fraction of prediction calls where the two models disagreed."""
        if self.total_predictions == 0:
            return 0.0
        return len(self.discrepancies) / self.total_predictions

# Demo
shadow_predictor = ShadowModePredictor(model_v1, model_v2)

# Run predictions
for i in range(100):
    X_sample = np.random.randn(1, 20)
    pred = shadow_predictor.predict(X_sample)

print(f"Discrepancies found: {shadow_predictor.get_discrepancy_rate()}")

Challenge 5: Feature Engineering at Scale

Solution: Feature Store Pattern

import numpy as np

class SimpleFeatureStore:
    """
    Simplified feature store for ML at scale.
    
    Real feature stores (Feast, Tecton) provide:
    - Consistent features for training and serving
    - Point-in-time correctness
    - Feature versioning
    """
    
    def __init__(self):
        self.feature_registry = {}
        self.feature_cache = {}
    
    def register_feature(self, name, computation_fn, dependencies=None):
        """Register a feature with its computation logic."""
        self.feature_registry[name] = {
            'fn': computation_fn,
            'dependencies': dependencies or []
        }
        print(f"Registered feature: {name}")
    
    def compute_feature(self, name, entity_data):
        """Compute feature value for given entities."""
        if name not in self.feature_registry:
            raise ValueError(f"Feature {name} not registered")
        
        # Check cache first
        cache_key = (name, hash(entity_data.tobytes()))
        if cache_key in self.feature_cache:
            return self.feature_cache[cache_key]
        
        # Compute dependencies first
        feature_def = self.feature_registry[name]
        dep_values = {}
        for dep in feature_def['dependencies']:
            dep_values[dep] = self.compute_feature(dep, entity_data)
        
        # Compute feature
        result = feature_def['fn'](entity_data, dep_values)
        
        # Cache result
        self.feature_cache[cache_key] = result
        
        return result
    
    def get_features(self, feature_names, entity_data):
        """Get multiple features for entities."""
        features = {}
        for name in feature_names:
            features[name] = self.compute_feature(name, entity_data)
        return features

# Demo
store = SimpleFeatureStore()

# Register features
store.register_feature(
    'amount_zscore',
    lambda data, deps: (data[:, 0] - data[:, 0].mean()) / data[:, 0].std()
)

store.register_feature(
    'amount_log',
    lambda data, deps: np.log1p(np.abs(data[:, 0]))
)

store.register_feature(
    'amount_ratio',
    lambda data, deps: data[:, 0] / (data[:, 1] + 1),
    dependencies=[]
)

# Use features
entity_data = np.random.randn(100, 5) * 100
features = store.get_features(['amount_zscore', 'amount_log', 'amount_ratio'], entity_data)

print("\nComputed features:")
for name, values in features.items():
    print(f"  {name}: shape={values.shape}, mean={values.mean():.4f}")

Summary: Production Checklist

Before Deployment

  • Model fits in memory on target hardware
  • Latency meets SLA requirements
  • Throughput handles peak traffic
  • Fallback strategy defined
  • Monitoring and alerting set up

During Deployment

  • Canary release (gradual rollout)
  • Shadow mode testing
  • A/B testing infrastructure
  • Feature flag controls

After Deployment

  • Monitor prediction latency (p50, p99)
  • Track prediction distribution drift (see the drift-check sketch below)
  • Alert on model staleness
  • Regular retraining pipeline
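
A minimal drift-check sketch for that bullet (assumes scipy is available; the score arrays below are simulated stand-ins for logged prediction scores):

import numpy as np
from scipy.stats import ks_2samp

reference_scores = np.random.beta(2, 5, size=5000)   # scores logged at training time
live_scores = np.random.beta(2, 4, size=5000)        # scores from recent traffic

stat, p_value = ks_2samp(reference_scores, live_scores)
if p_value < 0.01:
    print(f"Possible drift: KS statistic={stat:.3f}, p={p_value:.2g}")
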
Key Insight: At scale, ML is 10% modeling and 90% engineering. The best model is worthless if it can’t serve predictions reliably.

Tools for Scale

Challenge               Tools
Data Processing         Spark, Dask, Ray
Distributed Training    Ray Train, Horovod, TF Distributed
GPU Acceleration        RAPIDS cuML, NVIDIA Triton
Feature Store           Feast, Tecton, Hopsworks
Model Serving           TF Serving, Triton, BentoML
Orchestration           Kubeflow, MLflow, Airflow
Monitoring              Evidently, Grafana, Prometheus

# Your scaling journey
print("""
┌─────────────────────────────────────────────────────────┐
│                  ML Scaling Roadmap                     │
├─────────────────────────────────────────────────────────┤
│  1. Laptop (0-100K samples)      → scikit-learn        │
│  2. Single server (100K-10M)     → Dask, joblib        │
│  3. Cluster (10M-1B)             → Spark, Ray          │
│  4. Real-time serving            → Triton, TF Serving  │
│  5. Global scale                 → Cloud ML platforms  │
└─────────────────────────────────────────────────────────┘
""")
Start Simple: Don’t over-engineer. Most projects never need distributed training. Scale incrementally as data and traffic grow.