Senior Level: Fault tolerance is what separates good systems from great ones. Interviewers expect senior engineers to design for failure from day one.
Design for Failure Mindset
Redundancy Patterns
Active-Passive (Standby)
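A rough sketch of the active-passive idea (all names here are hypothetical): a watchdog health-checks the primary and promotes the standby after repeated failures.

import asyncio

async def failover_watchdog(primary, standby, dns, check_interval_s: float = 5.0,
                            failures_to_trip: int = 3):
    """Promote the standby after consecutive failed health checks on the primary."""
    consecutive_failures = 0
    while True:
        healthy = await primary.health_check()        # hypothetical client method
        consecutive_failures = 0 if healthy else consecutive_failures + 1
        if consecutive_failures >= failures_to_trip:
            await standby.promote()                   # hypothetical: standby becomes primary
            await dns.point_to(standby.address)       # hypothetical: repoint traffic
            return
        await asyncio.sleep(check_interval_s)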
Active-Active
Multi-Region Active-Active
Resilience Patterns
Circuit Breaker (Deep Dive)
from enum import Enum
from datetime import datetime, timedelta
import threading
class CircuitState(Enum):
CLOSED = "closed" # Normal operation
OPEN = "open" # Failing fast
HALF_OPEN = "half_open" # Testing if recovered
class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""
    pass

class CircuitBreaker:
"""
Production-grade circuit breaker with:
- Failure threshold
- Success threshold for recovery
- Timeout for open state
- Thread safety
"""
def __init__(
self,
failure_threshold: int = 5,
success_threshold: int = 3,
timeout_seconds: int = 30
):
self.failure_threshold = failure_threshold
self.success_threshold = success_threshold
self.timeout = timedelta(seconds=timeout_seconds)
self.state = CircuitState.CLOSED
self.failure_count = 0
self.success_count = 0
self.last_failure_time = None
self.lock = threading.Lock()
def call(self, func, *args, **kwargs):
with self.lock:
if not self._can_execute():
raise CircuitOpenError("Circuit is OPEN")
try:
result = func(*args, **kwargs)
self._on_success()
return result
except Exception as e:
self._on_failure()
raise
def _can_execute(self) -> bool:
if self.state == CircuitState.CLOSED:
return True
if self.state == CircuitState.OPEN:
# Check if timeout has passed
if datetime.now() - self.last_failure_time > self.timeout:
self.state = CircuitState.HALF_OPEN
self.success_count = 0
return True
return False
# HALF_OPEN: let trial requests through; success_threshold decides recovery
return True
def _on_success(self):
with self.lock:
if self.state == CircuitState.HALF_OPEN:
self.success_count += 1
if self.success_count >= self.success_threshold:
# Service recovered!
self.state = CircuitState.CLOSED
self.failure_count = 0
elif self.state == CircuitState.CLOSED:
self.failure_count = 0
def _on_failure(self):
with self.lock:
self.failure_count += 1
self.last_failure_time = datetime.now()
if self.state == CircuitState.HALF_OPEN:
# Failed during recovery test
self.state = CircuitState.OPEN
elif self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
# Usage with fallback
circuit = CircuitBreaker(failure_threshold=5, timeout_seconds=30)
def get_user_with_fallback(user_id):
try:
return circuit.call(user_service.get_user, user_id)
except CircuitOpenError:
# Return cached data or default
return cache.get(f"user:{user_id}") or {"id": user_id, "name": "Unknown"}
Retry Strategies
Python:
import asyncio
import random
import time
from functools import wraps
from typing import Callable, TypeVar, Type, Tuple, Optional, Any
from dataclasses import dataclass
from enum import Enum
import logging
T = TypeVar('T')
logger = logging.getLogger(__name__)
class RetryStrategy(Enum):
EXPONENTIAL = "exponential"
LINEAR = "linear"
FIBONACCI = "fibonacci"
DECORRELATED_JITTER = "decorrelated_jitter"
@dataclass
class RetryConfig:
max_retries: int = 5
base_delay: float = 1.0
max_delay: float = 60.0
strategy: RetryStrategy = RetryStrategy.EXPONENTIAL
jitter: bool = True
retryable_exceptions: Tuple[Type[Exception], ...] = (Exception,)
non_retryable_exceptions: Tuple[Type[Exception], ...] = ()
on_retry: Optional[Callable[[Exception, int, float], None]] = None
class RetryExhausted(Exception):
def __init__(self, last_exception: Exception, attempts: int):
self.last_exception = last_exception
self.attempts = attempts
super().__init__(f"All {attempts} retry attempts failed")
class RetryHandler:
"""Advanced retry handler with multiple strategies"""
def __init__(self, config: RetryConfig = None):
self.config = config or RetryConfig()
self._fib_cache = {0: 0, 1: 1}
self._last_delay = self.config.base_delay
def _fibonacci(self, n: int) -> int:
if n not in self._fib_cache:
self._fib_cache[n] = self._fibonacci(n - 1) + self._fibonacci(n - 2)
return self._fib_cache[n]
def _calculate_delay(self, attempt: int) -> float:
cfg = self.config
if cfg.strategy == RetryStrategy.EXPONENTIAL:
delay = cfg.base_delay * (2 ** attempt)
elif cfg.strategy == RetryStrategy.LINEAR:
delay = cfg.base_delay * (attempt + 1)
elif cfg.strategy == RetryStrategy.FIBONACCI:
delay = cfg.base_delay * self._fibonacci(attempt + 2)
elif cfg.strategy == RetryStrategy.DECORRELATED_JITTER:
# AWS recommended: sleep = min(cap, random(base, sleep * 3))
delay = random.uniform(cfg.base_delay, self._last_delay * 3)
self._last_delay = delay
else:
delay = cfg.base_delay
delay = min(delay, cfg.max_delay)
# Add standard jitter (not for decorrelated which has built-in)
if cfg.jitter and cfg.strategy != RetryStrategy.DECORRELATED_JITTER:
delay = delay * (0.5 + random.random())
return delay
def _should_retry(self, exception: Exception) -> bool:
# Check non-retryable first
if isinstance(exception, self.config.non_retryable_exceptions):
return False
return isinstance(exception, self.config.retryable_exceptions)
async def execute(self, func: Callable[[], T]) -> T:
"""Execute function with retry logic"""
last_exception = None
for attempt in range(self.config.max_retries):
            try:
                result = func()
                # A lambda wrapping an async function returns a coroutine (it is not
                # itself a coroutine function), so check the call's result as well.
                if asyncio.iscoroutine(result):
                    result = await result
                return result
except Exception as e:
last_exception = e
if not self._should_retry(e):
raise
if attempt == self.config.max_retries - 1:
break
delay = self._calculate_delay(attempt)
logger.warning(
f"Attempt {attempt + 1}/{self.config.max_retries} failed: {e}. "
f"Retrying in {delay:.2f}s"
)
if self.config.on_retry:
self.config.on_retry(e, attempt + 1, delay)
await asyncio.sleep(delay)
raise RetryExhausted(last_exception, self.config.max_retries)
def __call__(self, func: Callable) -> Callable:
"""Use as decorator"""
@wraps(func)
async def async_wrapper(*args, **kwargs):
return await self.execute(lambda: func(*args, **kwargs))
@wraps(func)
def sync_wrapper(*args, **kwargs):
return asyncio.run(self.execute(lambda: func(*args, **kwargs)))
return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper
# Retry with circuit breaker integration
class ResilientCaller:
"""Combines retry, circuit breaker, and timeout"""
def __init__(
self,
circuit_breaker: 'CircuitBreaker',
retry_config: RetryConfig = None,
timeout_seconds: float = 30.0
):
self.circuit = circuit_breaker
self.retry = RetryHandler(retry_config or RetryConfig(max_retries=3))
self.timeout = timeout_seconds
async def call(
self,
func: Callable,
fallback: Callable = None,
*args,
**kwargs
) -> Any:
"""Execute with full resilience pattern"""
        async def wrapped():
            async with asyncio.timeout(self.timeout):
                # Assumes an async-capable circuit breaker exposing call_async()
                return await self.circuit.call_async(
                    lambda: func(*args, **kwargs)
                )
        try:
            return await self.retry.execute(wrapped)
        except (RetryExhausted, CircuitOpenError) as e:
if fallback:
logger.warning(f"Using fallback due to: {e}")
return await fallback(*args, **kwargs) if asyncio.iscoroutinefunction(fallback) else fallback(*args, **kwargs)
raise
# Usage examples (aiohttp is required; metrics and payment_client are illustrative)
import aiohttp
@RetryHandler(RetryConfig(
max_retries=3,
strategy=RetryStrategy.EXPONENTIAL,
retryable_exceptions=(ConnectionError, TimeoutError)
))
async def fetch_data(url: str) -> dict:
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
return await resp.json()
# With metrics callback
def log_retry(exc: Exception, attempt: int, delay: float):
metrics.increment("retries", tags={"attempt": attempt})
retry_handler = RetryHandler(RetryConfig(
max_retries=5,
strategy=RetryStrategy.DECORRELATED_JITTER,
on_retry=log_retry
))
async def call_payment_api(amount: float):
return await retry_handler.execute(
lambda: payment_client.charge(amount)
)
JavaScript:
// Retry strategies
const RetryStrategy = {
EXPONENTIAL: 'exponential',
LINEAR: 'linear',
FIBONACCI: 'fibonacci',
DECORRELATED_JITTER: 'decorrelated_jitter'
};
class RetryExhaustedError extends Error {
constructor(lastError, attempts) {
super(`All ${attempts} retry attempts failed`);
this.name = 'RetryExhaustedError';
this.lastError = lastError;
this.attempts = attempts;
}
}
class RetryHandler {
constructor(options = {}) {
this.config = {
maxRetries: options.maxRetries || 5,
baseDelay: options.baseDelay || 1000,
maxDelay: options.maxDelay || 60000,
strategy: options.strategy || RetryStrategy.EXPONENTIAL,
jitter: options.jitter !== false,
retryableErrors: options.retryableErrors || [Error],
nonRetryableErrors: options.nonRetryableErrors || [],
onRetry: options.onRetry || null,
shouldRetry: options.shouldRetry || null
};
this.fibCache = { 0: 0, 1: 1 };
this.lastDelay = this.config.baseDelay;
}
fibonacci(n) {
if (!(n in this.fibCache)) {
this.fibCache[n] = this.fibonacci(n - 1) + this.fibonacci(n - 2);
}
return this.fibCache[n];
}
calculateDelay(attempt) {
const { baseDelay, maxDelay, strategy, jitter } = this.config;
let delay;
switch (strategy) {
case RetryStrategy.EXPONENTIAL:
delay = baseDelay * Math.pow(2, attempt);
break;
case RetryStrategy.LINEAR:
delay = baseDelay * (attempt + 1);
break;
case RetryStrategy.FIBONACCI:
delay = baseDelay * this.fibonacci(attempt + 2);
break;
case RetryStrategy.DECORRELATED_JITTER:
delay = Math.random() * (this.lastDelay * 3 - baseDelay) + baseDelay;
this.lastDelay = delay;
break;
default:
delay = baseDelay;
}
delay = Math.min(delay, maxDelay);
if (jitter && strategy !== RetryStrategy.DECORRELATED_JITTER) {
delay = delay * (0.5 + Math.random());
}
return delay;
}
shouldRetryError(error) {
// Check non-retryable first
for (const ErrorClass of this.config.nonRetryableErrors) {
if (error instanceof ErrorClass) return false;
}
// Custom predicate
if (this.config.shouldRetry) {
return this.config.shouldRetry(error);
}
// Check retryable
for (const ErrorClass of this.config.retryableErrors) {
if (error instanceof ErrorClass) return true;
}
return false;
}
async execute(fn) {
let lastError;
for (let attempt = 0; attempt < this.config.maxRetries; attempt++) {
try {
return await fn();
} catch (error) {
lastError = error;
if (!this.shouldRetryError(error)) {
throw error;
}
if (attempt === this.config.maxRetries - 1) {
break;
}
const delay = this.calculateDelay(attempt);
console.warn(
`Attempt ${attempt + 1}/${this.config.maxRetries} failed: ${error.message}. ` +
`Retrying in ${(delay / 1000).toFixed(2)}s`
);
if (this.config.onRetry) {
await this.config.onRetry(error, attempt + 1, delay);
}
await this.sleep(delay);
}
}
throw new RetryExhaustedError(lastError, this.config.maxRetries);
}
sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
// Use as wrapper
wrap(fn) {
return async (...args) => {
return this.execute(() => fn(...args));
};
}
}
// Resilient caller combining patterns
class ResilientCaller {
constructor({ circuitBreaker, retryConfig = {}, timeoutMs = 30000 }) {
this.circuit = circuitBreaker;
this.retry = new RetryHandler(retryConfig);
this.timeoutMs = timeoutMs;
}
async call(fn, { fallback = null, args = [], kwargs = {} } = {}) {
const wrapped = async () => {
return this.withTimeout(
this.circuit.execute(() => fn(...args)),
this.timeoutMs
);
};
try {
return await this.retry.execute(wrapped);
} catch (error) {
if (fallback && (error instanceof RetryExhaustedError ||
error.name === 'CircuitBreakerOpenError')) {
console.warn(`Using fallback due to: ${error.message}`);
return typeof fallback === 'function' ? fallback(...args) : fallback;
}
throw error;
}
}
withTimeout(promise, ms) {
return Promise.race([
promise,
new Promise((_, reject) =>
setTimeout(() => reject(new Error(`Timeout after ${ms}ms`)), ms)
)
]);
}
}
// Usage examples
const retry = new RetryHandler({
maxRetries: 3,
strategy: RetryStrategy.EXPONENTIAL,
shouldRetry: (error) => {
// Retry on network errors or 5xx
return error.code === 'ECONNREFUSED' ||
error.code === 'ETIMEDOUT' ||
(error.response && error.response.status >= 500);
},
onRetry: async (error, attempt, delay) => {
// Send to metrics
await metrics.increment('api.retry', { attempt, error: error.name });
}
});
const fetchWithRetry = retry.wrap(async (url) => {
const response = await fetch(url);
if (!response.ok) {
const error = new Error(`HTTP ${response.status}`);
error.response = response;
throw error;
}
return response.json();
});
// Full resilience pattern
const paymentCaller = new ResilientCaller({
circuitBreaker: new CircuitBreaker('payments', { failureThreshold: 3 }),
retryConfig: {
maxRetries: 2,
strategy: RetryStrategy.DECORRELATED_JITTER
},
timeoutMs: 5000
});
async function chargePayment(amount) {
return paymentCaller.call(
(amt) => paymentService.charge(amt),
{
args: [amount],
fallback: async (amt) => {
// Queue for later processing
await paymentQueue.add({ amount: amt, status: 'pending' });
return { status: 'queued', message: 'Payment will be processed shortly' };
}
}
);
}
Bulkhead Pattern
Python:
import asyncio
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from typing import Dict, Any, Optional, Callable
from enum import Enum
import time
import logging
logger = logging.getLogger(__name__)
class BulkheadFullError(Exception):
def __init__(self, bulkhead_name: str, current: int, max_size: int):
self.bulkhead_name = bulkhead_name
self.current = current
self.max_size = max_size
super().__init__(
f"Bulkhead '{bulkhead_name}' is full ({current}/{max_size})"
)
@dataclass
class BulkheadMetrics:
accepted: int = 0
rejected: int = 0
active: int = 0
peak_active: int = 0
total_wait_time: float = 0.0
class Bulkhead:
"""
Bulkhead pattern with semaphore and queue.
Isolates failures to prevent cascade effects.
"""
def __init__(
self,
name: str,
max_concurrent: int,
max_queue: int = 0,
queue_timeout: float = 10.0
):
self.name = name
self.max_concurrent = max_concurrent
self.max_queue = max_queue
self.queue_timeout = queue_timeout
self.semaphore = asyncio.Semaphore(max_concurrent)
self.metrics = BulkheadMetrics()
self._queue_size = 0
@asynccontextmanager
async def acquire(self, timeout: Optional[float] = None):
"""Acquire a slot in the bulkhead"""
timeout = timeout or self.queue_timeout
start = time.time()
        # If all slots are busy, try to queue (bounded by max_queue)
        queued = False
        if self.semaphore.locked():
            if self._queue_size >= self.max_queue:
                self.metrics.rejected += 1
                raise BulkheadFullError(
                    self.name,
                    self.metrics.active,
                    self.max_concurrent
                )
            self._queue_size += 1
            queued = True
        try:
            await asyncio.wait_for(
                self.semaphore.acquire(),
                timeout=timeout
            )
        except asyncio.TimeoutError:
            self.metrics.rejected += 1
            raise BulkheadFullError(
                self.name,
                self.metrics.active,
                self.max_concurrent
            )
        finally:
            # Only decrement the queue counter if this caller actually queued
            if queued:
                self._queue_size -= 1
wait_time = time.time() - start
self.metrics.total_wait_time += wait_time
self.metrics.active += 1
self.metrics.accepted += 1
self.metrics.peak_active = max(self.metrics.peak_active, self.metrics.active)
try:
yield
finally:
self.metrics.active -= 1
self.semaphore.release()
def __call__(self, func: Callable):
"""Use as decorator"""
async def wrapper(*args, **kwargs):
async with self.acquire():
return await func(*args, **kwargs)
return wrapper
def get_metrics(self) -> Dict[str, Any]:
return {
"name": self.name,
"active": self.metrics.active,
"max_concurrent": self.max_concurrent,
"queue_size": self._queue_size,
"max_queue": self.max_queue,
"accepted": self.metrics.accepted,
"rejected": self.metrics.rejected,
"rejection_rate": self.metrics.rejected / max(1, self.metrics.accepted + self.metrics.rejected),
"peak_active": self.metrics.peak_active,
"avg_wait_ms": (self.metrics.total_wait_time / max(1, self.metrics.accepted)) * 1000
}
class ThreadPoolBulkhead:
"""
Thread pool based bulkhead for CPU-bound or blocking operations.
Uses a dedicated thread pool to isolate work.
"""
def __init__(self, name: str, max_workers: int):
from concurrent.futures import ThreadPoolExecutor
self.name = name
self.executor = ThreadPoolExecutor(
max_workers=max_workers,
thread_name_prefix=f"bulkhead-{name}"
)
self.max_workers = max_workers
async def run(self, func: Callable, *args, **kwargs) -> Any:
"""Run blocking function in isolated thread pool"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
self.executor,
lambda: func(*args, **kwargs)
)
def shutdown(self):
self.executor.shutdown(wait=True)
class BulkheadManager:
"""Manage multiple bulkheads for different services"""
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance.bulkheads = {}
return cls._instance
def create(
self,
name: str,
max_concurrent: int,
max_queue: int = 0
) -> Bulkhead:
bulkhead = Bulkhead(name, max_concurrent, max_queue)
self.bulkheads[name] = bulkhead
return bulkhead
def get(self, name: str) -> Optional[Bulkhead]:
return self.bulkheads.get(name)
def get_all_metrics(self) -> Dict[str, Dict]:
return {name: bh.get_metrics() for name, bh in self.bulkheads.items()}
# Usage
bulkhead_manager = BulkheadManager()
# Different pools for different services
payment_bulkhead = bulkhead_manager.create("payment", max_concurrent=10, max_queue=20)
inventory_bulkhead = bulkhead_manager.create("inventory", max_concurrent=30)
notification_bulkhead = bulkhead_manager.create("notification", max_concurrent=50)
@payment_bulkhead
async def charge_payment(order_id: str, amount: float):
"""Payment calls isolated in their own pool"""
return await payment_client.charge(order_id, amount)
@inventory_bulkhead
async def reserve_inventory(items: list):
"""Inventory calls isolated - won't affect payments"""
return await inventory_client.reserve(items)
async def process_order(order):
"""Even if inventory is slow, payments still work"""
try:
# These run in isolated pools
payment = await charge_payment(order.id, order.total)
inventory = await reserve_inventory(order.items)
return {"status": "success", "payment": payment}
except BulkheadFullError as e:
# One service full doesn't crash everything
logger.warning(f"Bulkhead full: {e}")
return {"status": "retry_later", "reason": str(e)}
# FastAPI integration
from fastapi import FastAPI, Request, HTTPException
app = FastAPI()
@app.middleware("http")
async def bulkhead_metrics_middleware(request: Request, call_next):
response = await call_next(request)
# Add bulkhead metrics to response headers
metrics = bulkhead_manager.get_all_metrics()
response.headers["X-Bulkhead-Active"] = str(
sum(m["active"] for m in metrics.values())
)
return response
@app.get("/metrics/bulkheads")
async def get_bulkhead_metrics():
return bulkhead_manager.get_all_metrics()
JavaScript:
class BulkheadFullError extends Error {
constructor(bulkheadName, current, maxSize) {
super(`Bulkhead '${bulkheadName}' is full (${current}/${maxSize})`);
this.name = 'BulkheadFullError';
this.bulkheadName = bulkheadName;
this.current = current;
this.maxSize = maxSize;
}
}
class Bulkhead {
constructor(name, { maxConcurrent = 10, maxQueue = 0, queueTimeout = 10000 } = {}) {
this.name = name;
this.maxConcurrent = maxConcurrent;
this.maxQueue = maxQueue;
this.queueTimeout = queueTimeout;
this.active = 0;
this.queue = [];
this.metrics = {
accepted: 0,
rejected: 0,
peakActive: 0,
totalWaitTime: 0
};
}
async execute(fn) {
const start = Date.now();
// Check if at capacity
if (this.active >= this.maxConcurrent) {
if (this.queue.length >= this.maxQueue) {
this.metrics.rejected++;
throw new BulkheadFullError(this.name, this.active, this.maxConcurrent);
}
// Wait in queue
await this.waitInQueue();
}
const waitTime = Date.now() - start;
this.metrics.totalWaitTime += waitTime;
this.active++;
this.metrics.accepted++;
this.metrics.peakActive = Math.max(this.metrics.peakActive, this.active);
try {
return await fn();
} finally {
this.active--;
this.releaseNext();
}
}
waitInQueue() {
return new Promise((resolve, reject) => {
const timeout = setTimeout(() => {
const index = this.queue.findIndex(item => item.resolve === resolve);
if (index !== -1) {
this.queue.splice(index, 1);
}
this.metrics.rejected++;
reject(new BulkheadFullError(this.name, this.active, this.maxConcurrent));
}, this.queueTimeout);
this.queue.push({
resolve: () => {
clearTimeout(timeout);
resolve();
}
});
});
}
releaseNext() {
if (this.queue.length > 0 && this.active < this.maxConcurrent) {
const next = this.queue.shift();
next.resolve();
}
}
wrap(fn) {
return async (...args) => {
return this.execute(() => fn(...args));
};
}
getMetrics() {
return {
name: this.name,
active: this.active,
maxConcurrent: this.maxConcurrent,
queueSize: this.queue.length,
maxQueue: this.maxQueue,
...this.metrics,
rejectionRate: this.metrics.rejected /
Math.max(1, this.metrics.accepted + this.metrics.rejected),
avgWaitMs: this.metrics.totalWaitTime / Math.max(1, this.metrics.accepted)
};
}
}
// Worker pool bulkhead for CPU-bound tasks
class WorkerPoolBulkhead {
constructor(name, maxWorkers) {
this.name = name;
this.maxWorkers = maxWorkers;
this.workers = [];
this.taskQueue = [];
// Initialize worker pool (using worker_threads in Node.js)
this.initWorkers();
}
initWorkers() {
// In a real implementation, use worker_threads
// This is a simplified version using a semaphore pattern
this.activeWorkers = 0;
}
async execute(task) {
// Queue task if all workers busy
if (this.activeWorkers >= this.maxWorkers) {
return new Promise((resolve, reject) => {
this.taskQueue.push({ task, resolve, reject });
});
}
return this.runTask(task);
}
async runTask(task) {
this.activeWorkers++;
try {
return await task();
} finally {
this.activeWorkers--;
this.processQueue();
}
}
processQueue() {
if (this.taskQueue.length > 0 && this.activeWorkers < this.maxWorkers) {
const { task, resolve, reject } = this.taskQueue.shift();
this.runTask(task).then(resolve).catch(reject);
}
}
}
// Bulkhead manager singleton
class BulkheadManager {
constructor() {
if (BulkheadManager.instance) {
return BulkheadManager.instance;
}
this.bulkheads = new Map();
BulkheadManager.instance = this;
}
create(name, options = {}) {
const bulkhead = new Bulkhead(name, options);
this.bulkheads.set(name, bulkhead);
return bulkhead;
}
get(name) {
return this.bulkheads.get(name);
}
getAllMetrics() {
const metrics = {};
for (const [name, bulkhead] of this.bulkheads) {
metrics[name] = bulkhead.getMetrics();
}
return metrics;
}
}
// Usage
const manager = new BulkheadManager();
const paymentBulkhead = manager.create('payment', {
maxConcurrent: 10,
maxQueue: 20
});
const inventoryBulkhead = manager.create('inventory', {
maxConcurrent: 30
});
const notificationBulkhead = manager.create('notification', {
maxConcurrent: 50
});
// Wrap service calls
const chargePayment = paymentBulkhead.wrap(async (orderId, amount) => {
return paymentClient.charge(orderId, amount);
});
const reserveInventory = inventoryBulkhead.wrap(async (items) => {
return inventoryClient.reserve(items);
});
async function processOrder(order) {
try {
const [payment, inventory] = await Promise.all([
chargePayment(order.id, order.total),
reserveInventory(order.items)
]);
return { status: 'success', payment, inventory };
} catch (error) {
if (error instanceof BulkheadFullError) {
console.warn(`Bulkhead full: ${error.message}`);
return { status: 'retry_later', reason: error.message };
}
throw error;
}
}
// Express integration
const express = require('express');
const app = express();
app.use((req, res, next) => {
const metrics = manager.getAllMetrics();
res.set('X-Bulkhead-Active',
Object.values(metrics).reduce((sum, m) => sum + m.active, 0)
);
next();
});
app.get('/metrics/bulkheads', (req, res) => {
res.json(manager.getAllMetrics());
});
app.get('/orders/:id', async (req, res) => {
try {
const result = await processOrder({ id: req.params.id, items: [] });
res.json(result);
} catch (error) {
if (error instanceof BulkheadFullError) {
res.status(503).json({
error: 'Service temporarily unavailable',
retryAfter: 5
});
} else {
res.status(500).json({ error: error.message });
}
}
});
Health Checks
Health Check Types
┌─────────────────────────────────────────────────────────────────┐
│ Health Check Types │
├─────────────────────────────────────────────────────────────────┤
│ │
│ LIVENESS CHECK │
│ ───────────── │
│ "Is the process running?" │
│ If fails: Restart the container/process │
│ │
│ GET /health/live → 200 OK │
│ │
│ Check: │
│ • Process responding │
│ • Not deadlocked │
│ │
│ ───────────────────────────────────────────────────────────── │
│ │
│ READINESS CHECK │
│ ─────────────── │
│ "Can it handle traffic?" │
│ If fails: Remove from load balancer │
│ │
│ GET /health/ready → 200 OK or 503 Not Ready │
│ │
│ Check: │
│ • Database connection works │
│ • Cache connection works │
│ • Required dependencies reachable │
│ • Warmup complete │
│ │
│ ───────────────────────────────────────────────────────────── │
│ │
│ DEEP HEALTH CHECK (Use sparingly!) │
│ ──────────────── │
│ "Is everything working?" │
│ Used for: Monitoring dashboards, not load balancers │
│ │
│ GET /health/deep → { db: ok, cache: ok, queue: ok } │
│ │
│ Warning: Can be expensive, rate limit! │
│ │
└─────────────────────────────────────────────────────────────────┘
from fastapi import FastAPI, Response
from datetime import datetime
app = FastAPI()
@app.get("/health/live")
async def liveness():
"""Just checks if the process is alive"""
return {"status": "alive", "timestamp": datetime.utcnow().isoformat()}
@app.get("/health/ready")
async def readiness(response: Response):
"""Checks if we can handle traffic"""
checks = {}
# Check database
try:
await db.execute("SELECT 1")
checks["database"] = "ok"
except Exception as e:
checks["database"] = f"error: {str(e)}"
response.status_code = 503
# Check Redis
try:
await redis.ping()
checks["redis"] = "ok"
except Exception as e:
checks["redis"] = f"error: {str(e)}"
response.status_code = 503
# Check if warmup complete
if not app.state.warmup_complete:
checks["warmup"] = "in progress"
response.status_code = 503
else:
checks["warmup"] = "complete"
return {
"status": "ready" if response.status_code == 200 else "not ready",
"checks": checks
}
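The deep check from the diagram, sketched in the same style (use it sparingly and rate limit it; db, redis, and queue are the same illustrative clients as above):

@app.get("/health/deep")
async def deep_health():
    """Expensive end-to-end check for dashboards, not for load balancers."""
    results = {}
    probes = {
        "db": lambda: db.execute("SELECT 1"),
        "cache": lambda: redis.ping(),
        "queue": lambda: queue.ping(),   # illustrative message-queue client
    }
    for name, probe in probes.items():
        try:
            await probe()
            results[name] = "ok"
        except Exception as e:
            results[name] = f"error: {e}"
    return results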
Timeouts
Timeout Hierarchy
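The guiding rule: each layer's timeout should be shorter than its caller's, so a callee never keeps working after the caller has already given up. A minimal sketch with illustrative values:

# Illustrative timeout budget, tightening as the request goes deeper
TIMEOUTS_SECONDS = {
    "client_to_gateway": 10.0,    # end user -> API gateway
    "gateway_to_service": 8.0,    # leaves headroom for a gateway-level retry or fallback
    "service_to_service": 5.0,
    "service_to_database": 2.0,
    "service_to_cache": 0.2,
}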
Deadline Propagation
import asyncio
import time
from contextvars import ContextVar
from typing import Optional

class DeadlineExceeded(Exception):
    """Raised when a request's deadline has already passed or is exceeded."""
    pass

# Context variable holding the current request's deadline (absolute epoch seconds)
deadline_ctx: ContextVar[Optional[float]] = ContextVar('deadline', default=None)
def with_deadline(timeout_seconds: float):
"""Set deadline for current request"""
deadline = time.time() + timeout_seconds
deadline_ctx.set(deadline)
return deadline
def remaining_time() -> float:
"""Get remaining time until deadline"""
deadline = deadline_ctx.get()
if deadline is None:
return float('inf')
return max(0, deadline - time.time())
async def call_service(service_name: str, payload: dict):
"""Call service with propagated deadline"""
remaining = remaining_time()
if remaining <= 0:
raise DeadlineExceeded("Request deadline already passed")
# Use remaining time as timeout (with buffer)
timeout = min(remaining * 0.9, 30.0) # 90% of remaining, max 30s
try:
async with asyncio.timeout(timeout):
return await http_client.post(
f"http://{service_name}/api",
json=payload,
headers={"X-Deadline": str(deadline_ctx.get())}
)
except asyncio.TimeoutError:
raise DeadlineExceeded(f"Timeout calling {service_name}")
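On the receiving side, a service can adopt the caller's deadline before doing its own work (a sketch reusing deadline_ctx and with_deadline from above; the header name matches the client code):

from fastapi import FastAPI, Request

app = FastAPI()

@app.middleware("http")
async def adopt_deadline(request: Request, call_next):
    # Honor a propagated deadline if present; otherwise start our own budget.
    header = request.headers.get("X-Deadline")
    if header:
        deadline_ctx.set(float(header))
    else:
        with_deadline(timeout_seconds=30.0)
    return await call_next(request)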
Graceful Degradation
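The core idea: when a non-critical dependency fails, serve a reduced response instead of an error. A minimal sketch (catalog, recs, and reviews_service are illustrative clients):

async def get_product_page(product_id: str):
    """Serve a degraded page rather than failing the whole request."""
    product = await catalog.get(product_id)            # core data: required

    try:
        recommendations = await recs.get(product_id)   # nice-to-have
    except Exception:
        recommendations = []                           # degrade: empty section

    try:
        reviews = await reviews_service.get(product_id)
    except Exception:
        reviews = {"status": "temporarily unavailable"}

    return {"product": product, "recommendations": recommendations, "reviews": reviews}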
Senior Interview Questions
How do you achieve 99.99% availability?
Key components:
- Redundancy: At least 2 of everything (servers, DBs, regions)
- Load balancing: Automatic failover when node fails
- Health checks: Detect failures in seconds
- Auto-scaling: Handle traffic spikes
- Multi-region: Survive region outages
- Chaos engineering: Regularly test failure scenarios
The availability math (worked through in the sketch below):
- A single component at 99.9% can't deliver 99.99%
- Two independent components at 99.9% give 1 - 0.001^2 = 99.9999%
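A quick sanity check of those numbers (assuming failures are independent):

def parallel_availability(per_component: float, copies: int) -> float:
    """Availability of N redundant copies, assuming independent failures."""
    return 1 - (1 - per_component) ** copies

print(parallel_availability(0.999, 1))  # 0.999     -> roughly 8.8 hours of downtime/year
print(parallel_availability(0.999, 2))  # ~0.999999 -> roughly 32 seconds of downtime/year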
How do you handle partial failures in distributed transactions?
Saga Pattern:
- Each step has a compensating action
- If step N fails, run compensations for steps N-1 to 1
- Track saga state in a database (a minimal orchestrator sketch follows below)
1. Create order → Compensate: Cancel order
2. Reserve inventory → Compensate: Release inventory
3. Charge payment → Compensate: Refund payment
4. Ship order → Compensate: Cancel shipment
If step 3 fails:
- Refund payment (if partially charged)
- Release inventory
- Cancel order
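A minimal orchestration sketch of that idea (illustrative; a production saga would also persist its state so compensation survives a crash):

class SagaStep:
    def __init__(self, name, action, compensation):
        self.name = name
        self.action = action              # async callable performing the step
        self.compensation = compensation  # async callable undoing the step

async def run_saga(steps, context):
    """Run steps in order; on failure, compensate completed steps in reverse."""
    completed = []
    for step in steps:
        try:
            context[step.name] = await step.action(context)
            completed.append(step)
        except Exception:
            for done in reversed(completed):
                await done.compensation(context)   # best-effort undo
            raise
    return context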
How do you prevent cascading failures?
Defense layers:
- Circuit breakers: Stop calling failing service
- Timeouts: Don’t wait forever
- Bulkheads: Isolate failures to one service
- Rate limiting: Prevent overload
- Load shedding: Reject low-priority requests when near saturation (see the sketch below)
- Fallbacks: Degrade gracefully
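Most of these are shown above; load shedding, sketched with illustrative thresholds:

class LoadShedder:
    """Reject low-priority work when the service approaches saturation."""
    def __init__(self, max_inflight: int = 100, low_priority_fraction: float = 0.8):
        self.max_inflight = max_inflight
        self.low_priority_limit = int(max_inflight * low_priority_fraction)
        self.inflight = 0

    async def run(self, func, *args, priority: str = "low", **kwargs):
        # High-priority requests are rejected only at the hard limit;
        # low-priority requests are shed earlier to preserve headroom.
        limit = self.max_inflight if priority == "high" else self.low_priority_limit
        if self.inflight >= limit:
            raise RuntimeError(f"load shed: {priority} request rejected")
        self.inflight += 1
        try:
            return await func(*args, **kwargs)
        finally:
            self.inflight -= 1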
How do you test system reliability?
Chaos Engineering approach:
- Define steady state: Normal metrics (latency, error rate)
- Form hypothesis: “System handles server failure”
- Inject failure: Kill a server
- Observe: Did metrics stay within bounds?
- Fix and repeat
Common failures to inject (a small latency-injection sketch follows this list):
- Server crashes
- Network partitions
- High latency
- Disk full
- Memory exhaustion
- Clock skew
- Dependency outages
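For example, a minimal latency/error injection wrapper (illustrative; real experiments usually rely on tooling such as Chaos Monkey or service-mesh fault injection):

import asyncio
import random

class ChaosInjector:
    """Wraps an async dependency call and randomly injects latency or errors."""
    def __init__(self, injection_rate: float = 0.1, extra_latency_s: float = 2.0,
                 error_rate: float = 0.0, enabled: bool = False):
        self.injection_rate = injection_rate
        self.extra_latency_s = extra_latency_s
        self.error_rate = error_rate
        self.enabled = enabled   # keep off by default; enable only during experiments

    async def call(self, func, *args, **kwargs):
        if self.enabled:
            if random.random() < self.injection_rate:
                await asyncio.sleep(self.extra_latency_s)   # inject high latency
            if random.random() < self.error_rate:
                raise ConnectionError("chaos: injected dependency failure")
        return await func(*args, **kwargs)

# During an experiment, route a dependency call through the injector and verify
# that latency and error-rate metrics stay within the steady-state bounds.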