ML at Scale
From Laptop to Production
Your model works on 10,000 samples. What happens with 10 million?

Reality Check: Most ML tutorials use tiny datasets. Production is a different beast:
- Training on terabytes of data
- Serving 10,000+ predictions per second
- Updating models without downtime
Estimated Time: 3-4 hours
Difficulty: Advanced
Prerequisites: All previous ML chapters, basic understanding of distributed systems
Focus: Real production patterns used at tech companies
Challenge 1: Data Doesn’t Fit in Memory
Solution: Batch Processing
import numpy as np
from sklearn.linear_model import SGDClassifier
from sklearn.datasets import make_classification

# Simulate a large dataset (in reality, this would be streaming from disk)
def data_generator(n_batches=100, batch_size=10000, n_features=100):
    """Simulate streaming data from disk/database.

    In production this would read parquet/CSV chunks instead.
    """
    for i in range(n_batches):
        X, y = make_classification(
            n_samples=batch_size,
            n_features=n_features,
            random_state=i
        )
        yield X, y

# Incremental learning with SGD
model = SGDClassifier(loss='log_loss', random_state=42)

# First pass: compute scaling parameters
print("Pass 1: Computing scaling parameters...")
n_samples = 0
mean_sum = None
var_sum = None
for X, y in data_generator(n_batches=10):
    n_samples += len(X)
    if mean_sum is None:
        mean_sum = X.sum(axis=0)
        var_sum = (X**2).sum(axis=0)
    else:
        mean_sum += X.sum(axis=0)
        var_sum += (X**2).sum(axis=0)

mean = mean_sum / n_samples
var = (var_sum / n_samples) - mean**2  # Var[X] = E[X^2] - E[X]^2
std = np.sqrt(var)
print(f"Computed stats from {n_samples} samples")

# Second pass: train the model incrementally
print("\nPass 2: Training model...")
for epoch in range(3):
    batch_num = 0
    for X, y in data_generator(n_batches=100):
        # Scale data with the statistics from pass 1
        X_scaled = (X - mean) / std
        # Partial fit (incremental learning)
        model.partial_fit(X_scaled, y, classes=[0, 1])
        batch_num += 1
        if batch_num % 20 == 0:
            print(f"Epoch {epoch+1}, Batch {batch_num}")
    # Evaluate on a held-out batch (a seed outside the training range,
    # so the test data never appears during training)
    X_test, y_test = make_classification(
        n_samples=10000, n_features=100, random_state=10_000
    )
    X_test_scaled = (X_test - mean) / std
    accuracy = model.score(X_test_scaled, y_test)
    print(f"Epoch {epoch+1} accuracy: {accuracy:.4f}")

print(f"\nTotal training samples: {100 * 10000 * 3:,}")
Solution: Dask for Out-of-Core Computing
# Using Dask for larger-than-memory datasets
import dask.array as da
import dask.dataframe as dd

# Read a large CSV in chunks
# df = dd.read_csv('huge_dataset_*.csv')

# Simulate a large dataset as a chunked dask array
# (in production, this would be read lazily from disk)
print("Creating simulated large dataset...")
n_total = 1_000_000
n_features = 50
X_large = da.random.random((n_total, n_features), chunks=(100_000, n_features))
y_large = da.random.randint(0, 2, n_total, chunks=100_000)

# Build the mean/std computation lazily...
mean = X_large.mean(axis=0)
std = X_large.std(axis=0)
print(f"Mean shape: {mean.shape}")

# ...then execute both in a single pass over the chunks
print("Computing statistics...")
mean_computed, std_computed = da.compute(mean, std)
print(f"Mean computed: {mean_computed[:5]}")

# For ML on dask collections, use dask-ml
from dask_ml.linear_model import LogisticRegression

# This works on datasets larger than memory!
# model = LogisticRegression()
# model.fit(X_large, y_large)
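The two approaches also combine naturally: iterate over a dask array's chunks and feed each one to an incremental scikit-learn model, so only one chunk is ever materialized in memory. A minimal sketch (the `.blocks` chunk-iteration pattern is standard dask; the model choice here is illustrative):

from sklearn.linear_model import SGDClassifier

sgd = SGDClassifier(loss='log_loss', random_state=42)
for i in range(X_large.numblocks[0]):
    # Materialize one chunk at a time; memory use stays bounded
    X_chunk = X_large.blocks[i, 0].compute()
    y_chunk = y_large.blocks[i].compute()
    sgd.partial_fit(X_chunk, y_chunk, classes=[0, 1])
print(f"Trained incrementally on {X_large.numblocks[0]} chunks")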
Challenge 2: Training Takes Too Long
Solution: Distributed Training
# Ray for distributed ML
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_classification
import time

# Single-machine baseline (n_jobs=-1 already parallelizes across cores)
X, y = make_classification(n_samples=100000, n_features=50, random_state=42)

start = time.time()
rf = RandomForestClassifier(n_estimators=100, n_jobs=-1)
rf.fit(X, y)
single_time = time.time() - start
print(f"Single machine: {single_time:.2f}s")

# For true multi-machine training, use Ray or Spark MLlib
# Example Ray pattern (conceptual, not executed here)
"""
import ray
from ray.train import ScalingConfig
from ray.train.sklearn import SklearnTrainer

ray.init()
trainer = SklearnTrainer(
    estimator=RandomForestClassifier(n_estimators=100),
    datasets={"train": train_data},
    scaling_config=ScalingConfig(
        num_workers=4,
        use_gpu=False
    )
)
result = trainer.fit()
"""

# GPU acceleration with RAPIDS cuML (conceptual, requires a GPU)
"""
from cuml import RandomForestClassifier as cuRF

# GPU-accelerated training - often 10-100x faster
rf_gpu = cuRF(n_estimators=100)
rf_gpu.fit(X_gpu, y_gpu)
"""
Solution: Model Selection at Scale
from sklearn.experimental import enable_halving_search_cv  # noqa: F401 (required to import HalvingRandomSearchCV)
from sklearn.model_selection import RandomizedSearchCV, HalvingRandomSearchCV
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.datasets import make_classification
from scipy.stats import randint, uniform
import time

X, y = make_classification(n_samples=10000, n_features=20, random_state=42)

param_dist = {
    'n_estimators': randint(50, 300),
    'max_depth': randint(3, 15),
    'learning_rate': uniform(0.01, 0.3),
    'subsample': uniform(0.6, 0.4)
}

# Standard RandomizedSearchCV (slower: every candidate sees all the data)
print("Standard RandomizedSearchCV...")
start = time.time()
random_search = RandomizedSearchCV(
    GradientBoostingClassifier(random_state=42),
    param_dist,
    n_iter=20,
    cv=3,
    random_state=42,
    n_jobs=-1
)
random_search.fit(X, y)
standard_time = time.time() - start
print(f"Time: {standard_time:.2f}s, Best: {random_search.best_score_:.4f}")

# HalvingRandomSearchCV (faster: successively halves the candidate pool,
# giving the survivors more data each round)
print("\nHalvingRandomSearchCV...")
start = time.time()
halving_search = HalvingRandomSearchCV(
    GradientBoostingClassifier(random_state=42),
    param_dist,
    factor=2,  # Halve candidates each round
    random_state=42,
    n_jobs=-1
)
halving_search.fit(X, y)
halving_time = time.time() - start
print(f"Time: {halving_time:.2f}s, Best: {halving_search.best_score_:.4f}")
print(f"Speedup: {standard_time/halving_time:.1f}x")
Challenge 3: Serving Predictions at Scale
Solution: Model Optimization
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_classification
import time
import pickle

# Train two models of very different sizes
X, y = make_classification(n_samples=10000, n_features=20, random_state=42)
X_test = np.random.randn(1000, 20)

# Large model
rf_large = RandomForestClassifier(n_estimators=500, max_depth=20, random_state=42)
rf_large.fit(X, y)

# Small model (distilled or simply capacity-limited)
rf_small = RandomForestClassifier(n_estimators=50, max_depth=10, random_state=42)
rf_small.fit(X, y)

def benchmark(model, X_test, name, n_iterations=100):
    """Benchmark prediction latency and serialized model size."""
    # Warmup
    model.predict(X_test[:10])
    times = []
    for _ in range(n_iterations):
        start = time.perf_counter()
        model.predict(X_test)
        times.append(time.perf_counter() - start)
    avg_time = np.mean(times) * 1000  # ms
    p99_time = np.percentile(times, 99) * 1000
    print(f"{name}:")
    print(f"  Avg latency: {avg_time:.2f}ms")
    print(f"  P99 latency: {p99_time:.2f}ms")
    print(f"  Throughput: {1000/avg_time * len(X_test):.0f} predictions/sec")
    # Model size
    model_bytes = len(pickle.dumps(model))
    print(f"  Model size: {model_bytes/1024:.1f}KB")
    return avg_time

t_large = benchmark(rf_large, X_test, "Large RF (500 trees, depth=20)")
print()
t_small = benchmark(rf_small, X_test, "Small RF (50 trees, depth=10)")
print(f"\nSpeedup: {t_large/t_small:.1f}x")
Solution: Batching for Throughput
import numpy as np
import time

# Simulate model prediction
class MockModel:
    def predict(self, X):
        # Simulate some computation
        return np.sum(X, axis=1) > 0

model = MockModel()

# Single predictions (slow: one call per sample)
def single_predictions(model, samples, n_samples):
    predictions = []
    for i in range(n_samples):
        pred = model.predict(samples[i:i+1])
        predictions.append(pred[0])
    return predictions

# Batched predictions (fast: one call per batch)
def batched_predictions(model, samples, batch_size=100):
    predictions = []
    for i in range(0, len(samples), batch_size):
        batch = samples[i:i+batch_size]
        preds = model.predict(batch)
        predictions.extend(preds)
    return predictions

# Benchmark
n_samples = 10000
samples = np.random.randn(n_samples, 100)

start = time.time()
single_predictions(model, samples, min(n_samples, 1000))  # Only 1000 for speed
single_time = time.time() - start

start = time.time()
batched_predictions(model, samples, batch_size=100)
batch_time = time.time() - start

print(f"Single predictions (1000 samples): {single_time:.4f}s")
print(f"Batched predictions ({n_samples} samples): {batch_time:.4f}s")
print(f"Estimated speedup: {(single_time * 10) / batch_time:.1f}x")
Solution: Caching Predictions
import hashlib
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_classification

class CachedPredictor:
    """Predictor with result caching for repeated inputs."""
    def __init__(self, model, cache_size=10000):
        self.model = model
        self.cache = {}
        self.cache_size = cache_size
        self.hits = 0
        self.misses = 0

    def _hash_input(self, x):
        """Create a hash key from the raw input bytes."""
        return hashlib.md5(x.tobytes()).hexdigest()

    def predict(self, X):
        """Predict with caching (only helps for exactly repeated inputs)."""
        results = []
        for i in range(len(X)):
            x = X[i:i+1]
            key = self._hash_input(x)
            if key in self.cache:
                self.hits += 1
                results.append(self.cache[key])
            else:
                self.misses += 1
                pred = self.model.predict(x)[0]
                # FIFO eviction (dicts preserve insertion order);
                # a true LRU would also re-order entries on access
                if len(self.cache) >= self.cache_size:
                    oldest_key = next(iter(self.cache))
                    del self.cache[oldest_key]
                self.cache[key] = pred
                results.append(pred)
        return np.array(results)

    def cache_stats(self):
        total = self.hits + self.misses
        hit_rate = self.hits / total if total > 0 else 0
        return {
            'hits': self.hits,
            'misses': self.misses,
            'hit_rate': hit_rate
        }

# Demo
X, y = make_classification(n_samples=1000, n_features=20, random_state=42)
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X, y)

cached_predictor = CachedPredictor(model)

# First pass (all misses)
predictions = cached_predictor.predict(X[:100])
print(f"First pass stats: {cached_predictor.cache_stats()}")

# Second pass (all hits, since the inputs repeat exactly)
predictions = cached_predictor.predict(X[:100])
print(f"Second pass stats: {cached_predictor.cache_stats()}")
Challenge 4: Model Updates Without Downtime
Solution: Blue-Green Deployment
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import make_classification

class BlueGreenDeployer:
    """
    Blue-green deployment pattern for ML models.

    Always have two model slots:
    - Blue: current production model
    - Green: new model being tested
    """
    def __init__(self):
        self.blue_model = None    # Current production
        self.green_model = None   # Staging/testing
        self.traffic_split = 1.0  # Fraction of traffic served by blue

    def deploy_to_green(self, new_model):
        """Deploy a new model to the green slot."""
        self.green_model = new_model
        print("New model deployed to GREEN slot")

    def canary_release(self, percentage):
        """
        Route a percentage of traffic to the green model.
        Gradually increase it to catch issues early.
        """
        self.traffic_split = 1.0 - (percentage / 100)
        print(f"Traffic: {100-percentage}% BLUE, {percentage}% GREEN")

    def promote_green(self):
        """Swap green into blue (the new model becomes production)."""
        self.blue_model = self.green_model
        self.green_model = None
        self.traffic_split = 1.0
        print("GREEN promoted to BLUE (production)")

    def rollback(self):
        """Rollback: route all traffic back to blue."""
        self.traffic_split = 1.0
        print("Rolled back to BLUE model")

    def predict(self, X):
        """Route a prediction to the appropriate model."""
        if np.random.random() < self.traffic_split:
            return self.blue_model.predict(X), 'blue'
        else:
            return self.green_model.predict(X), 'green'

# Demo
X, y = make_classification(n_samples=1000, random_state=42)

# Current production model
model_v1 = LogisticRegression()
model_v1.fit(X, y)

# New model version
model_v2 = LogisticRegression(C=0.5)
model_v2.fit(X, y)

# Deployment
deployer = BlueGreenDeployer()
deployer.blue_model = model_v1
deployer.deploy_to_green(model_v2)

# Canary release
deployer.canary_release(10)  # 10% of traffic to the new model

# Simulate predictions
results = {'blue': 0, 'green': 0}
for _ in range(1000):
    _, model_used = deployer.predict(X[:1])
    results[model_used] += 1
print(f"\nTraffic distribution: {results}")
Solution: Shadow Mode Testing
import numpy as np
from concurrent.futures import ThreadPoolExecutor
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class ShadowModePredictor:
    """
    Run a new model in shadow mode:
    - Production model serves real traffic
    - Shadow model runs in parallel; results are logged but never served
    """
    def __init__(self, production_model, shadow_model):
        self.production_model = production_model
        self.shadow_model = shadow_model
        self.discrepancies = []
        self.total_predictions = 0
        self.executor = ThreadPoolExecutor(max_workers=2)

    def predict(self, X):
        """Serve the production prediction while the shadow runs in parallel."""
        self.total_predictions += 1
        # Production prediction (served to the user)
        prod_pred = self.production_model.predict(X)
        # Shadow prediction (async, never served)
        self.executor.submit(self._shadow_predict, X, prod_pred)
        return prod_pred

    def _shadow_predict(self, X, prod_pred):
        """Run the shadow model and record discrepancies."""
        shadow_pred = self.shadow_model.predict(X)
        if not np.array_equal(prod_pred, shadow_pred):
            discrepancy = {
                'input_hash': hash(X.tobytes()),
                'prod_pred': prod_pred.tolist(),
                'shadow_pred': shadow_pred.tolist()
            }
            self.discrepancies.append(discrepancy)
            # In production: log to a monitoring system
            # logger.warning(f"Shadow discrepancy: {discrepancy}")

    def get_discrepancy_rate(self):
        """Fraction of predictions where the two models disagreed."""
        if self.total_predictions == 0:
            return 0.0
        return len(self.discrepancies) / self.total_predictions

# Demo (reuses model_v1 and model_v2 from the blue-green example)
shadow_predictor = ShadowModePredictor(model_v1, model_v2)

# Run predictions
for i in range(100):
    X_sample = np.random.randn(1, 20)
    pred = shadow_predictor.predict(X_sample)

# Let the shadow tasks finish before reading stats
shadow_predictor.executor.shutdown(wait=True)
print(f"Discrepancy rate: {shadow_predictor.get_discrepancy_rate():.2%}")
Challenge 5: Feature Engineering at Scale
Solution: Feature Store Pattern
import numpy as np

class SimpleFeatureStore:
    """
    Simplified feature store for ML at scale.

    Real feature stores (Feast, Tecton) also provide:
    - Consistent features for training and serving
    - Point-in-time correctness
    - Feature versioning
    """
    def __init__(self):
        self.feature_registry = {}
        self.feature_cache = {}

    def register_feature(self, name, computation_fn, dependencies=None):
        """Register a feature with its computation logic."""
        self.feature_registry[name] = {
            'fn': computation_fn,
            'dependencies': dependencies or []
        }
        print(f"Registered feature: {name}")

    def compute_feature(self, name, entity_data):
        """Compute a feature value for the given entities."""
        if name not in self.feature_registry:
            raise ValueError(f"Feature {name} not registered")
        # Check cache first
        cache_key = (name, hash(entity_data.tobytes()))
        if cache_key in self.feature_cache:
            return self.feature_cache[cache_key]
        # Compute dependencies first (recursively)
        feature_def = self.feature_registry[name]
        dep_values = {}
        for dep in feature_def['dependencies']:
            dep_values[dep] = self.compute_feature(dep, entity_data)
        # Compute the feature itself
        result = feature_def['fn'](entity_data, dep_values)
        # Cache the result
        self.feature_cache[cache_key] = result
        return result

    def get_features(self, feature_names, entity_data):
        """Get multiple features for the same entities."""
        features = {}
        for name in feature_names:
            features[name] = self.compute_feature(name, entity_data)
        return features

# Demo
store = SimpleFeatureStore()

# Register features
store.register_feature(
    'amount_zscore',
    lambda data, deps: (data[:, 0] - data[:, 0].mean()) / data[:, 0].std()
)
store.register_feature(
    'amount_log',
    lambda data, deps: np.log1p(np.abs(data[:, 0]))
)
store.register_feature(
    'amount_ratio',
    lambda data, deps: data[:, 0] / (data[:, 1] + 1)
)
# A derived feature that declares a dependency
store.register_feature(
    'amount_zscore_clipped',
    lambda data, deps: np.clip(deps['amount_zscore'], -3, 3),
    dependencies=['amount_zscore']
)

# Use features
entity_data = np.random.randn(100, 5) * 100

features = store.get_features(
    ['amount_zscore', 'amount_log', 'amount_ratio', 'amount_zscore_clipped'],
    entity_data
)
print("\nComputed features:")
for name, values in features.items():
    print(f"  {name}: shape={values.shape}, mean={values.mean():.4f}")
Summary: Production Checklist
Before Deployment
- Model fits in memory on target hardware
- Latency meets SLA requirements
- Throughput handles peak traffic
- Fallback strategy defined
- Monitoring and alerting set up
During Deployment
- Canary release (gradual rollout)
- Shadow mode testing
- A/B testing infrastructure
- Feature flag controls
After Deployment
- Monitor prediction latency (p50, p99; see the monitoring sketch below)
- Track prediction distribution drift
- Alert on model staleness
- Regular retraining pipeline
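A minimal sketch of the post-deployment checks above: track latency percentiles over a rolling window and flag drift in the prediction distribution. The PredictionMonitor class, its window size, and the thresholds are illustrative assumptions:

import numpy as np
from collections import deque

class PredictionMonitor:
    """Rolling latency percentiles plus a crude prediction-drift check."""
    def __init__(self, window=10_000, baseline_positive_rate=0.5):
        self.latencies = deque(maxlen=window)
        self.predictions = deque(maxlen=window)
        self.baseline_rate = baseline_positive_rate

    def record(self, latency_s, prediction):
        self.latencies.append(latency_s)
        self.predictions.append(prediction)

    def latency_report(self):
        lat_ms = np.array(self.latencies) * 1000
        return {
            'p50_ms': float(np.percentile(lat_ms, 50)),
            'p99_ms': float(np.percentile(lat_ms, 99)),
        }

    def drift_alert(self, tolerance=0.1):
        """Alert if the positive-prediction rate drifts from the baseline."""
        rate = float(np.mean(self.predictions))
        return abs(rate - self.baseline_rate) > tolerance

# Demo with synthetic latencies and predictions
monitor = PredictionMonitor(baseline_positive_rate=0.5)
for _ in range(5000):
    monitor.record(np.random.lognormal(-6, 0.5), np.random.rand() < 0.48)
print(monitor.latency_report())
print(f"Drift alert: {monitor.drift_alert()}")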
Key Insight: At scale, ML is 10% modeling and 90% engineering. The best model is worthless if it can’t serve predictions reliably.
Tools for Scale
| Challenge | Tools |
|---|---|
| Data Processing | Spark, Dask, Ray |
| Distributed Training | Ray Train, Horovod, TF Distributed |
| GPU Acceleration | RAPIDS cuML, NVIDIA Triton |
| Feature Store | Feast, Tecton, Hopsworks |
| Model Serving | TF Serving, Triton, BentoML |
| Orchestration | Kubeflow, MLflow, Airflow |
| Monitoring | Evidently, Grafana, Prometheus |
# Your scaling journey
print("""
┌─────────────────────────────────────────────────────────┐
│ ML Scaling Roadmap │
├─────────────────────────────────────────────────────────┤
│ 1. Laptop (0-100K samples) → scikit-learn │
│ 2. Single server (100K-10M) → Dask, joblib │
│ 3. Cluster (10M-1B) → Spark, Ray │
│ 4. Real-time serving → Triton, TF Serving │
│ 5. Global scale → Cloud ML platforms │
└─────────────────────────────────────────────────────────┘
""")
Start Simple: Don’t over-engineer. Most projects never need distributed training. Scale incrementally as data and traffic grow.