Anomaly Detection
Catching the Unusual
Your credit card company calls: “Did you just spend $5,000 at a jewelry store in another country?” That’s anomaly detection in action—finding the needle in the haystack, the fraudulent transaction among millions of legitimate ones.
Estimated Time: 3-4 hours
Difficulty: Intermediate
Prerequisites: Clustering, Dimensionality Reduction
Tools: scikit-learn, PyOD
Types of Anomalies
| Type | Description | Example |
|---|---|---|
| Point Anomaly | Single data point differs | One transaction of $10,000 when the average is $100 |
| Contextual Anomaly | Anomalous in context | High AC usage in winter |
| Collective Anomaly | Group of related points | Network intrusion pattern |
import numpy as np
import matplotlib.pyplot as plt
from sklearn.datasets import make_blobs
# Generate normal data with anomalies
np.random.seed(42)
X_normal, _ = make_blobs(n_samples=300, centers=1, cluster_std=0.5, random_state=42)
X_anomalies = np.random.uniform(-4, 4, (10, 2)) # Random outliers
X = np.vstack([X_normal, X_anomalies])
labels = np.array([0]*300 + [1]*10) # 0=normal, 1=anomaly
plt.figure(figsize=(10, 6))
plt.scatter(X[labels==0, 0], X[labels==0, 1], c='blue', label='Normal', alpha=0.5)
plt.scatter(X[labels==1, 0], X[labels==1, 1], c='red', s=100, label='Anomaly', marker='x')
plt.xlabel('Feature 1')
plt.ylabel('Feature 2')
plt.title('Point Anomalies in 2D Data')
plt.legend()
plt.show()
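The plot above shows point anomalies. Contextual anomalies only stand out once the context is considered; the sketch below uses hypothetical monthly energy-usage data (all values made up) and flags a winter reading with summer-level usage by computing z-scores within each month rather than globally:
import numpy as np
# Hypothetical data: usage is high in summer (months 5-8), low otherwise
np.random.seed(42)
months = np.random.choice(range(12), 500)
usage = np.where(np.isin(months, [5, 6, 7, 8]),
                 np.random.normal(300, 30, 500),
                 np.random.normal(80, 15, 500))
usage[0], months[0] = 320.0, 0  # inject a winter reading with summer-level usage
# Context-aware z-score: compare each reading only to readings from the same month
anomalous = np.zeros(len(usage), dtype=bool)
for m in range(12):
    mask = months == m
    z = (usage[mask] - usage[mask].mean()) / usage[mask].std()
    anomalous[mask] = np.abs(z) > 3
print(f"Contextual anomalies flagged: {anomalous.sum()}")  # should include the injected reading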
Approach 1: Statistical Methods
Z-Score (Standard Deviation)
Simple but effective for univariate data:

$$z_i = \frac{x_i - \mu}{\sigma}$$

Points with $|z| > 3$ are typically considered anomalies.
import numpy as np
import matplotlib.pyplot as plt
# Transaction amounts
np.random.seed(42)
normal_transactions = np.random.exponential(100, 1000) # Normal: mean ~$100
fraud_transactions = np.array([5000, 7500, 10000, 8000, 6000]) # Fraud
all_transactions = np.concatenate([normal_transactions, fraud_transactions])
# Z-score detection
mean = np.mean(all_transactions)
std = np.std(all_transactions)
z_scores = (all_transactions - mean) / std
threshold = 3
anomalies = np.abs(z_scores) > threshold
print(f"Mean: ${mean:.2f}, Std: ${std:.2f}")
print(f"Detected anomalies: {np.sum(anomalies)}")
print(f"Anomaly values: {all_transactions[anomalies]}")
# Visualize
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
# Distribution
axes[0].hist(all_transactions, bins=50, edgecolor='black', alpha=0.7)
axes[0].axvline(mean + threshold*std, color='red', linestyle='--', label=f'Z={threshold}')
axes[0].axvline(mean - threshold*std, color='red', linestyle='--')
axes[0].set_xlabel('Transaction Amount ($)')
axes[0].set_ylabel('Frequency')
axes[0].set_title('Transaction Distribution')
axes[0].legend()
# Z-scores
axes[1].scatter(range(len(z_scores)), z_scores, c=['red' if a else 'blue' for a in anomalies], alpha=0.5)
axes[1].axhline(threshold, color='red', linestyle='--')
axes[1].axhline(-threshold, color='red', linestyle='--')
axes[1].set_xlabel('Transaction Index')
axes[1].set_ylabel('Z-Score')
axes[1].set_title('Z-Scores with Anomalies')
plt.tight_layout()
plt.show()
Limitation: Z-score assumes normal distribution. For skewed data, consider IQR or robust methods.
IQR (Interquartile Range)
More robust to outliers:
def detect_anomalies_iqr(data, k=1.5):
    """
    IQR-based anomaly detection.
    k=1.5: moderate outliers
    k=3.0: extreme outliers
    """
    Q1 = np.percentile(data, 25)
    Q3 = np.percentile(data, 75)
    IQR = Q3 - Q1
    lower_bound = Q1 - k * IQR
    upper_bound = Q3 + k * IQR
    anomalies = (data < lower_bound) | (data > upper_bound)
    return anomalies, lower_bound, upper_bound
anomalies_iqr, lower, upper = detect_anomalies_iqr(all_transactions)
print(f"IQR bounds: ${lower:.2f} to ${upper:.2f}")
print(f"Detected: {np.sum(anomalies_iqr)} anomalies")
Approach 2: Isolation Forest
Key Insight: Anomalies are easier to isolate than normal points. Randomly select a feature, randomly split—anomalies get isolated in fewer steps.
from sklearn.ensemble import IsolationForest
import numpy as np
import matplotlib.pyplot as plt
# Generate data
np.random.seed(42)
X_normal = np.random.randn(300, 2) * 0.5
X_anomalies = np.random.uniform(-4, 4, (15, 2))
X = np.vstack([X_normal, X_anomalies])
true_labels = np.array([1]*300 + [-1]*15) # 1=normal, -1=anomaly
# Train Isolation Forest
iso_forest = IsolationForest(
    n_estimators=100,
    contamination=0.05,  # Expected fraction of anomalies
    random_state=42
)
predictions = iso_forest.fit_predict(X)
anomaly_scores = iso_forest.decision_function(X)
# Evaluate
from sklearn.metrics import confusion_matrix, classification_report
print("Confusion Matrix:")
print(confusion_matrix(true_labels, predictions))
print("\nClassification Report:")
print(classification_report(true_labels, predictions, target_names=['Anomaly', 'Normal']))
# Visualize
fig, axes = plt.subplots(1, 2, figsize=(14, 6))
# Predictions
colors = ['red' if p == -1 else 'blue' for p in predictions]
# Plot normal and anomalous points separately so the legend labels are correct
axes[0].scatter(X[predictions == 1, 0], X[predictions == 1, 1], c='blue', alpha=0.6, label='Normal')
axes[0].scatter(X[predictions == -1, 0], X[predictions == -1, 1], c='red', alpha=0.6, label='Anomaly')
axes[0].set_xlabel('Feature 1')
axes[0].set_ylabel('Feature 2')
axes[0].set_title('Isolation Forest Predictions')
axes[0].legend()
# Anomaly scores
xx, yy = np.meshgrid(np.linspace(-5, 5, 100), np.linspace(-5, 5, 100))
Z = iso_forest.decision_function(np.c_[xx.ravel(), yy.ravel()])
Z = Z.reshape(xx.shape)
axes[1].contourf(xx, yy, Z, levels=np.linspace(Z.min(), 0, 7), cmap='RdBu_r')
axes[1].contour(xx, yy, Z, levels=[0], linewidths=2, colors='black')
axes[1].scatter(X[:, 0], X[:, 1], c=colors, edgecolors='black', alpha=0.8)
axes[1].set_xlabel('Feature 1')
axes[1].set_ylabel('Feature 2')
axes[1].set_title('Anomaly Score Contours (Black=Boundary)')
plt.tight_layout()
plt.show()
Tuning Isolation Forest:
- contamination: Estimated fraction of anomalies (if unknown, start with 0.01-0.1); see the sketch below
- n_estimators: More trees = more stable (100 is usually enough)
- max_samples: Smaller samples = faster training, but might miss subtle patterns
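To see how contamination drives the number of flagged points, here is a quick sketch that reuses X from the Isolation Forest example above (the values tried are illustrative, not tuned):
from sklearn.ensemble import IsolationForest
for contamination in [0.01, 0.05, 0.10]:
    model = IsolationForest(n_estimators=100, contamination=contamination, random_state=42)
    flagged = (model.fit_predict(X) == -1).sum()
    print(f"contamination={contamination:.2f} -> {flagged} points flagged")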
Approach 3: Local Outlier Factor (LOF)
Key Insight: Compare each point’s density to its neighbors’ densities.
from sklearn.neighbors import LocalOutlierFactor
import numpy as np
import matplotlib.pyplot as plt
# Generate data with different density clusters
np.random.seed(42)
X1 = np.random.randn(100, 2) * 0.3 # Dense cluster
X2 = np.random.randn(100, 2) * 0.3 + [3, 3] # Another dense cluster
X_sparse = np.random.randn(20, 2) * 2 + [1.5, 1.5] # Sparse region
X_anomalies = np.array([[5, 5], [-2, 4], [4, -2], [6, 0]])
X = np.vstack([X1, X2, X_sparse, X_anomalies])
# Train LOF
lof = LocalOutlierFactor(n_neighbors=20, contamination=0.05)
predictions = lof.fit_predict(X)
scores = -lof.negative_outlier_factor_ # Higher = more anomalous
# Visualize
fig, axes = plt.subplots(1, 2, figsize=(14, 6))
# Predictions
colors = ['red' if p == -1 else 'blue' for p in predictions]
axes[0].scatter(X[:, 0], X[:, 1], c=colors, alpha=0.6)
axes[0].set_xlabel('Feature 1')
axes[0].set_ylabel('Feature 2')
axes[0].set_title('LOF Predictions')
# Anomaly scores as bubble sizes
sizes = (scores - scores.min()) / (scores.max() - scores.min()) * 500 + 10
axes[1].scatter(X[:, 0], X[:, 1], s=sizes, c=scores, cmap='RdYlBu_r', alpha=0.6)
plt.colorbar(axes[1].collections[0], ax=axes[1])
axes[1].set_xlabel('Feature 1')
axes[1].set_ylabel('Feature 2')
axes[1].set_title('LOF Scores (Larger = More Anomalous)')
plt.tight_layout()
plt.show()
print(f"Detected {np.sum(predictions == -1)} anomalies")
| Aspect | Isolation Forest | LOF |
|---|---|---|
| Speed | Fast | Slower (needs neighbors) |
| Local density | No | Yes |
| High dimensions | Good | May struggle |
| Best for | Global outliers | Local outliers in varied densities |
Approach 4: One-Class SVM
Train on normal data only, then find the boundary that encloses it:
from sklearn.svm import OneClassSVM
import numpy as np
import matplotlib.pyplot as plt
# Training data (normal only)
np.random.seed(42)
X_train = np.random.randn(200, 2) * 0.5
# Test data (normal + anomalies)
X_test_normal = np.random.randn(50, 2) * 0.5
X_test_anomaly = np.random.uniform(-3, 3, (10, 2))
X_test = np.vstack([X_test_normal, X_test_anomaly])
y_test = np.array([1]*50 + [-1]*10)
# Train One-Class SVM
oc_svm = OneClassSVM(kernel='rbf', gamma='auto', nu=0.1)
oc_svm.fit(X_train)
# Predict
predictions = oc_svm.predict(X_test)
# Evaluate
from sklearn.metrics import classification_report
print(classification_report(y_test, predictions, target_names=['Anomaly', 'Normal']))
# Visualize decision boundary
xx, yy = np.meshgrid(np.linspace(-4, 4, 100), np.linspace(-4, 4, 100))
Z = oc_svm.decision_function(np.c_[xx.ravel(), yy.ravel()])
Z = Z.reshape(xx.shape)
plt.figure(figsize=(10, 8))
plt.contourf(xx, yy, Z, levels=np.linspace(Z.min(), 0, 7), cmap='RdBu_r', alpha=0.5)
plt.contour(xx, yy, Z, levels=[0], linewidths=2, colors='black')
plt.scatter(X_train[:, 0], X_train[:, 1], c='blue', alpha=0.3, label='Training (Normal)')
plt.scatter(X_test[y_test==1, 0], X_test[y_test==1, 1], c='green', marker='o', label='Test Normal')
plt.scatter(X_test[y_test==-1, 0], X_test[y_test==-1, 1], c='red', marker='x', s=100, label='Test Anomaly')
plt.xlabel('Feature 1')
plt.ylabel('Feature 2')
plt.title('One-Class SVM Decision Boundary')
plt.legend()
plt.show()
One-Class SVM Use Case: When you have lots of normal data but few or no labeled anomalies. Train on normal, detect deviations.
Approach 5: Autoencoders
Neural networks that learn to compress and reconstruct. Anomalies have high reconstruction error.
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
# Create dataset
np.random.seed(42)
X_normal = np.random.randn(1000, 10)
X_anomaly = np.random.uniform(-5, 5, (50, 10))
X = np.vstack([X_normal, X_anomaly])
y = np.array([0]*1000 + [1]*50) # 0=normal, 1=anomaly
# Scale
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# Simple autoencoder with numpy (for demonstration)
class SimpleAutoencoder:
    def __init__(self, input_dim, encoding_dim=3):
        self.W1 = np.random.randn(input_dim, encoding_dim) * 0.1
        self.b1 = np.zeros(encoding_dim)
        self.W2 = np.random.randn(encoding_dim, input_dim) * 0.1
        self.b2 = np.zeros(input_dim)

    def encode(self, X):
        return np.tanh(X @ self.W1 + self.b1)

    def decode(self, Z):
        return Z @ self.W2 + self.b2

    def forward(self, X):
        Z = self.encode(X)
        return self.decode(Z)

    def train(self, X, epochs=100, lr=0.01):
        for epoch in range(epochs):
            # Forward
            Z = self.encode(X)
            X_reconstructed = self.decode(Z)
            # Loss
            loss = np.mean((X - X_reconstructed)**2)
            # Backward (simplified gradient descent)
            grad_output = 2 * (X_reconstructed - X) / len(X)
            # Encoder gradient uses the pre-update decoder weights
            grad_Z = grad_output @ self.W2.T
            grad_Z *= (1 - Z**2)  # tanh derivative
            # Update decoder weights
            self.W2 -= lr * Z.T @ grad_output
            self.b2 -= lr * grad_output.sum(axis=0)
            # Update encoder weights
            self.W1 -= lr * X.T @ grad_Z
            self.b1 -= lr * grad_Z.sum(axis=0)
            if epoch % 20 == 0:
                print(f"Epoch {epoch}: Loss = {loss:.4f}")

    def reconstruction_error(self, X):
        X_reconstructed = self.forward(X)
        return np.mean((X - X_reconstructed)**2, axis=1)
# Train on normal data only
ae = SimpleAutoencoder(input_dim=10, encoding_dim=3)
ae.train(X_scaled[:800], epochs=100, lr=0.01)
# Compute reconstruction errors
errors = ae.reconstruction_error(X_scaled)
# Threshold (95th percentile of training errors)
threshold = np.percentile(errors[:800], 95)
predictions = (errors > threshold).astype(int)
# Evaluate
from sklearn.metrics import classification_report, confusion_matrix
print(f"\nThreshold: {threshold:.4f}")
print("\nConfusion Matrix:")
print(confusion_matrix(y, predictions))
print("\nClassification Report:")
print(classification_report(y, predictions, target_names=['Normal', 'Anomaly']))
# Visualize
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.hist(errors[y==0], bins=30, alpha=0.7, label='Normal', color='blue')
plt.hist(errors[y==1], bins=30, alpha=0.7, label='Anomaly', color='red')
plt.axvline(threshold, color='black', linestyle='--', label='Threshold')
plt.xlabel('Reconstruction Error')
plt.ylabel('Frequency')
plt.title('Error Distribution by Class')
plt.legend()
plt.subplot(1, 2, 2)
plt.scatter(range(len(errors)), errors, c=['red' if e > threshold else 'blue' for e in errors], alpha=0.5)
plt.axhline(threshold, color='black', linestyle='--')
plt.xlabel('Sample Index')
plt.ylabel('Reconstruction Error')
plt.title('Reconstruction Errors')
plt.tight_layout()
plt.show()
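The NumPy class above is only for illustration. In practice an autoencoder is usually built in a deep-learning framework; below is a minimal Keras sketch (assumes TensorFlow is installed; the layer sizes mirror the example above but are not tuned), reusing X_scaled:
import numpy as np
from tensorflow import keras
from tensorflow.keras import layers
input_dim, encoding_dim = 10, 3
autoencoder = keras.Sequential([
    layers.Input(shape=(input_dim,)),
    layers.Dense(encoding_dim, activation='tanh'),  # encoder
    layers.Dense(input_dim, activation='linear'),   # decoder
])
autoencoder.compile(optimizer='adam', loss='mse')
# Train on normal data only, then score every sample by reconstruction error
autoencoder.fit(X_scaled[:800], X_scaled[:800], epochs=50, batch_size=32, verbose=0)
reconstructions = autoencoder.predict(X_scaled, verbose=0)
errors_keras = np.mean((X_scaled - reconstructions)**2, axis=1)
threshold_keras = np.percentile(errors_keras[:800], 95)
print(f"Flagged: {(errors_keras > threshold_keras).sum()} samples")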
Real-World Application: Credit Card Fraud
import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report, roc_auc_score
import matplotlib.pyplot as plt
# Simulate credit card transactions
np.random.seed(42)
# Normal transactions
n_normal = 10000
# Hourly probabilities peak during the day; normalized so they sum to 1
hour_probs = np.array([0.01]*6 + [0.06]*12 + [0.04]*6)
hour_probs = hour_probs / hour_probs.sum()
normal = {
    'amount': np.random.exponential(50, n_normal),  # Most transactions are small
    'hour': np.random.choice(range(24), n_normal, p=hour_probs),
    'distance_from_home': np.random.exponential(5, n_normal),
    'frequency_24h': np.random.poisson(3, n_normal),
}
# Fraudulent transactions
n_fraud = 100
fraud = {
    'amount': np.random.uniform(500, 5000, n_fraud),  # Large amounts
    'hour': np.random.choice(range(24), n_fraud),  # Random hours
    'distance_from_home': np.random.uniform(100, 1000, n_fraud),  # Far away
    'frequency_24h': np.random.poisson(10, n_fraud),  # Many transactions
}
# Combine
df_normal = pd.DataFrame(normal)
df_normal['is_fraud'] = 0
df_fraud = pd.DataFrame(fraud)
df_fraud['is_fraud'] = 1
df = pd.concat([df_normal, df_fraud], ignore_index=True)
df = df.sample(frac=1, random_state=42).reset_index(drop=True)
print(f"Dataset: {len(df)} transactions, {df['is_fraud'].sum()} fraud ({df['is_fraud'].mean()*100:.2f}%)")
# Features and labels
X = df.drop('is_fraud', axis=1)
y = df['is_fraud']
# Scale features
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# Train Isolation Forest
iso_forest = IsolationForest(
    n_estimators=100,
    contamination=0.01,
    random_state=42
)
predictions = iso_forest.fit_predict(X_scaled)
predictions = (predictions == -1).astype(int) # Convert to 0/1
# Evaluate
print("\nClassification Report:")
print(classification_report(y, predictions, target_names=['Normal', 'Fraud']))
# Anomaly scores for ROC
scores = -iso_forest.decision_function(X_scaled) # Higher = more anomalous
roc_auc = roc_auc_score(y, scores)
print(f"ROC-AUC: {roc_auc:.3f}")
# Visualize key features
fig, axes = plt.subplots(2, 2, figsize=(12, 10))
features = ['amount', 'distance_from_home', 'frequency_24h', 'hour']
for ax, feat in zip(axes.flatten(), features):
    ax.hist(df.loc[df['is_fraud']==0, feat], bins=30, alpha=0.7, label='Normal', density=True)
    ax.hist(df.loc[df['is_fraud']==1, feat], bins=30, alpha=0.7, label='Fraud', density=True)
    ax.set_xlabel(feat)
    ax.set_ylabel('Density')
    ax.set_title(f'{feat} Distribution')
    ax.legend()
plt.tight_layout()
plt.show()
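With roughly 1% fraud, ROC-AUC alone can look optimistic; on such imbalanced data a precision-recall curve is often more informative. A short addition reusing y and scores from the block above:
from sklearn.metrics import precision_recall_curve, average_precision_score
precision, recall, _ = precision_recall_curve(y, scores)
ap = average_precision_score(y, scores)
plt.figure(figsize=(6, 5))
plt.plot(recall, precision)
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.title(f'Precision-Recall Curve (AP = {ap:.3f})')
plt.show()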
Comparing Methods
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.svm import OneClassSVM
from sklearn.metrics import f1_score, precision_score, recall_score
import numpy as np
import pandas as pd
import time
# Use our fraud dataset
methods = {
    'Isolation Forest': IsolationForest(contamination=0.01, random_state=42),
    'LOF': LocalOutlierFactor(contamination=0.01, n_neighbors=20),
    'One-Class SVM': OneClassSVM(nu=0.01, kernel='rbf')
}
results = []
for name, model in methods.items():
    start = time.time()
    if name == 'LOF':
        preds = model.fit_predict(X_scaled)
    else:
        model.fit(X_scaled)
        preds = model.predict(X_scaled)
    preds = (preds == -1).astype(int)
    elapsed = time.time() - start
    results.append({
        'Method': name,
        'Precision': precision_score(y, preds),
        'Recall': recall_score(y, preds),
        'F1': f1_score(y, preds),
        'Time (s)': elapsed
    })
results_df = pd.DataFrame(results)
print(results_df.to_string(index=False))
Summary
| Method | Best For | Pros | Cons |
|---|---|---|---|
| Z-Score/IQR | Univariate, known distributions | Simple, interpretable | Assumes distribution |
| Isolation Forest | High-dimensional, fast | Scalable, no assumptions | Contamination param critical |
| LOF | Local density variations | Captures local patterns | Slow on large data |
| One-Class SVM | Training on normal only | Strong boundary | Slow, sensitive to kernel |
| Autoencoder | Complex patterns, deep learning | Learns complex patterns | Needs lots of data |
Best Practice: Start with Isolation Forest (fast, general-purpose), then try specialized methods if needed. Always validate with domain experts—what looks anomalous statistically may be normal business behavior!
# Quick template for anomaly detection
from sklearn.ensemble import IsolationForest
# Fit
detector = IsolationForest(contamination='auto', random_state=42)
detector.fit(X)
# Predict (-1 = anomaly, 1 = normal)
predictions = detector.predict(X_new)
# Anomaly scores (lower = more anomalous)
scores = detector.decision_function(X_new)
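The tools listed at the top include PyOD, which wraps many of these detectors behind a single interface. A minimal sketch of the same template using PyOD's Isolation Forest wrapper (assumes pip install pyod; X and X_new are the same arrays as in the template above):
from pyod.models.iforest import IForest
# Fit
detector = IForest(contamination=0.05, random_state=42)
detector.fit(X)
# Predict (0 = normal, 1 = anomaly; PyOD labels anomalies as 1, unlike scikit-learn's -1)
predictions = detector.predict(X_new)
# Anomaly scores (higher = more anomalous, the opposite of scikit-learn's convention)
scores = detector.decision_function(X_new)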