Fault Tolerance Patterns
In distributed systems, failures are not exceptional events; they are expected. This module covers battle-tested patterns for building systems that survive and thrive despite failures.
Track Duration: 12-16 hours
Key Topics: Circuit Breakers, Bulkheads, Retries, Timeouts, Health Checks, Graceful Degradation
Interview Focus: Netflix Hystrix patterns, cascading failure prevention, failure domain isolation
The Reality of Failure
"Everything fails, all the time." — Werner Vogels, CTO of Amazon
┌─────────────────────────────────────────────────────────────────────────────┐
│ FAILURE STATISTICS AT SCALE │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ GOOGLE (typical year): │
│ ───────────────────── │
│ • 12 network rewires │
│ • 3 router failures │
│ • 1,800 individual machine failures │
│ • 1,000+ disk failures per datacenter │
│ • Thousands of hard drive failures │
│ │
│ AWS (any given day): │
│ ──────────────────── │
│ • Individual instance failures │
│ • AZ connectivity issues │
│ • EBS volume degradation │
│ • Network partition events │
│ │
│ YOUR SYSTEM: │
│ ──────────── │
│ • Deployments cause brief unavailability │
│ • Dependencies fail (databases, caches, third-party APIs) │
│ • Network blips cause timeouts │
│ • Bugs cause crashes │
│ • Resource exhaustion (memory, connections, file descriptors) │
│ │
│ IF YOU'RE NOT DESIGNING FOR FAILURE, YOU'RE DESIGNING TO FAIL │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Timeout Patterns
The Foundation of Fault Tolerance
Without timeouts, a slow dependency can block your entire system forever.
┌─────────────────────────────────────────────────────────────────────────────┐
│ TIMEOUT CATEGORIES │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ CONNECTION TIMEOUT: │
│ ─────────────────── │
│ Time to establish a connection (TCP handshake) │
│ • Typical: 1-5 seconds │
│ • If exceeded: Server is down or network is partitioned │
│ │
│ READ/WRITE TIMEOUT: │
│ ────────────────── │
│ Time to complete a read/write operation │
│ • Typical: 5-30 seconds (depends on operation) │
│ • If exceeded: Server is overloaded or processing is stuck │
│ │
│ REQUEST TIMEOUT: │
│ ──────────────── │
│ Total time for the entire request (including retries) │
│ • Typical: 10-60 seconds │
│ • Should be communicated to caller │
│ │
│ IDLE TIMEOUT: │
│ ───────────── │
│ Close connection if no activity │
│ • Prevents resource leaks │
│ • Typical: 30-300 seconds │
│ │
│ DEADLINE PROPAGATION: │
│ ───────────────────── │
│ Pass remaining time to downstream services │
│ If only 500ms left, don't start a 2-second operation │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
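Most HTTP clients expose the first two categories directly. Below is a minimal sketch of setting them, assuming the httpx library; other clients (aiohttp, requests) use different parameter names, so treat names and values as illustrative.

import httpx

# Sketch: mapping the timeout categories onto an HTTP client's settings.
timeouts = httpx.Timeout(
    30.0,         # default ceiling for anything not overridden below
    connect=3.0,  # connection timeout: TCP handshake (and TLS)
    read=10.0,    # read timeout: waiting for response bytes
)

async def fetch_profile(user_id: str) -> dict:
    async with httpx.AsyncClient(timeout=timeouts) as client:
        # The total request timeout (including retries) is enforced one level
        # up, e.g. with asyncio.timeout() around the whole operation.
        response = await client.get(f"https://api.example.com/users/{user_id}")
        response.raise_for_status()
        return response.json()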
Timeout Implementation
import asyncio
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import Optional
import time
@dataclass
class Deadline:
"""Represents an absolute deadline for an operation"""
absolute_time: float # Unix timestamp
@classmethod
def from_timeout(cls, timeout_seconds: float) -> 'Deadline':
return cls(time.time() + timeout_seconds)
def remaining(self) -> float:
"""Returns remaining time until deadline"""
return max(0, self.absolute_time - time.time())
def is_exceeded(self) -> bool:
return time.time() >= self.absolute_time
def with_buffer(self, buffer_seconds: float) -> 'Deadline':
"""Create a shorter deadline with buffer for processing"""
return Deadline(self.absolute_time - buffer_seconds)
class TimeoutContext:
"""Context manager for propagating deadlines through service calls"""
_current_deadline: Optional[Deadline] = None
@classmethod
@asynccontextmanager
async def with_timeout(cls, timeout_seconds: float):
"""Set a deadline for the current operation context"""
old_deadline = cls._current_deadline
new_deadline = Deadline.from_timeout(timeout_seconds)
# Take the earlier deadline
if old_deadline and old_deadline.absolute_time < new_deadline.absolute_time:
new_deadline = old_deadline
cls._current_deadline = new_deadline
try:
yield new_deadline
finally:
cls._current_deadline = old_deadline
@classmethod
def remaining(cls) -> Optional[float]:
"""Get remaining time from current deadline"""
if cls._current_deadline:
return cls._current_deadline.remaining()
return None
@classmethod
def check_deadline(cls):
"""Raise TimeoutError if deadline exceeded"""
if cls._current_deadline and cls._current_deadline.is_exceeded():
raise TimeoutError("Deadline exceeded")
async def make_downstream_call(service: str, deadline: Deadline):
"""Make a call with deadline propagation"""
remaining = deadline.remaining()
if remaining <= 0:
raise TimeoutError(f"Deadline already exceeded before calling {service}")
# Reserve some time for local processing
downstream_timeout = min(remaining - 0.1, remaining * 0.9)
# Pass deadline to downstream service (e.g., via gRPC metadata)
async with asyncio.timeout(downstream_timeout):
# Make the actual call
result = await call_service(service)
return result
Timeout Best Practices
┌─────────────────────────────────────────────────────────────────────────────┐
│ TIMEOUT ANTI-PATTERNS │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ✗ NO TIMEOUT │
│ One slow call blocks thread forever │
│ Thread pool exhausted → entire service down │
│ │
│ ✗ TIMEOUT TOO LONG │
│ 30s timeout × 100 threads = 100 threads blocked for 30s │
│ Service appears slow to users │
│ │
│ ✗ TIMEOUT TOO SHORT │
│ False failures during normal load spikes │
│ Increased retry load makes things worse │
│ │
│ ✗ IGNORING PARENT DEADLINE │
│ Parent has 500ms left, you start 2s operation │
│ Wasted work that will be discarded │
│ │
│ ───────────────────────────────────────────────────────────────────────── │
│ │
│ ✓ ADAPTIVE TIMEOUTS │
│ Base timeout on p99 latency + buffer │
│ Adjust based on current system load │
│ │
│ ✓ DEADLINE PROPAGATION │
│ Pass remaining time to downstream calls │
│ gRPC does this automatically with deadlines │
│ │
│ ✓ TIMEOUT BUDGETS │
│ Total timeout divided among downstream calls │
│ Early calls use less budget to leave room for later │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
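To make timeout budgets concrete, here is a minimal sketch that carves one request deadline into per-call slices, reusing the Deadline class and asyncio import from the implementation above; fetch_user_profile and fetch_pricing are hypothetical downstream calls.

async def handle_checkout(order_id: str) -> dict:
    # One budget for the whole request; each downstream call gets only what
    # is left, minus a small reserve for local processing.
    deadline = Deadline.from_timeout(2.0)

    async with asyncio.timeout(deadline.remaining()):
        user = await fetch_user_profile(order_id)   # hypothetical call

    remaining = deadline.with_buffer(0.1).remaining()
    if remaining <= 0:
        raise TimeoutError("Budget exhausted before the pricing call")

    async with asyncio.timeout(remaining):
        price = await fetch_pricing(order_id)       # hypothetical call

    return {"user": user, "price": price}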
Retry Patterns
When and How to Retry
┌─────────────────────────────────────────────────────────────────────────────┐
│ RETRY DECISION TREE │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ Is the error transient? │
│ │ │
│ ├── YES: Network timeout, 503 Service Unavailable, connection reset │
│ │ → RETRY (with exponential backoff) │
│ │ │
│ └── NO: 400 Bad Request, 404 Not Found, 401 Unauthorized │
│ → DON'T RETRY (will fail again) │
│ │
│ Is the operation idempotent? │
│ │ │
│ ├── YES: GET, PUT (with version), DELETE (with ID) │
│ │ → SAFE TO RETRY │
│ │ │
│ └── NO: POST (create), non-idempotent updates │
│ → RETRY ONLY WITH IDEMPOTENCY KEY │
│ │
│ Is there time/budget remaining? │
│ │ │
│ ├── YES: Continue with retry │
│ │ │
│ └── NO: Give up, return error to caller │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
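For the non-idempotent branch, here is a minimal sketch of retrying a POST behind a client-generated idempotency key. The Idempotency-Key header name and the http_client object are assumptions; the server must actually deduplicate on that key for this to be safe.

import asyncio
import uuid

async def create_order_with_retry(payload: dict, max_attempts: int = 3) -> dict:
    # The same key is sent on every attempt, so a retry of a request that
    # actually succeeded the first time is recognized and not applied twice.
    idempotency_key = str(uuid.uuid4())

    for attempt in range(max_attempts):
        try:
            response = await http_client.post(
                "/orders",
                json=payload,
                headers={"Idempotency-Key": idempotency_key},  # header name assumed
            )
            if response.status_code < 500:
                return response.json()   # success or a non-retryable 4xx
        except (TimeoutError, ConnectionError):
            pass                         # transient; retry with the same key
        await asyncio.sleep(0.1 * (2 ** attempt))  # simple exponential backoff
    raise RuntimeError("Order creation failed after retries")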
Exponential Backoff with Jitter
import random
import asyncio
from typing import Callable, TypeVar, Optional
from dataclasses import dataclass
from enum import Enum
class RetryableError(Exception):
"""Mark an error as retryable"""
pass
class BackoffStrategy(Enum):
EXPONENTIAL = "exponential"
LINEAR = "linear"
CONSTANT = "constant"
@dataclass
class RetryConfig:
max_attempts: int = 3
base_delay_ms: int = 100
max_delay_ms: int = 10000
strategy: BackoffStrategy = BackoffStrategy.EXPONENTIAL
jitter_factor: float = 0.2 # ±20%
T = TypeVar('T')
class Retry:
"""
Retry with exponential backoff and jitter.
Jitter prevents "thundering herd" when many clients retry simultaneously.
"""
def __init__(self, config: RetryConfig):
self.config = config
def _calculate_delay(self, attempt: int) -> float:
"""Calculate delay for given attempt number"""
if self.config.strategy == BackoffStrategy.EXPONENTIAL:
delay = self.config.base_delay_ms * (2 ** attempt)
elif self.config.strategy == BackoffStrategy.LINEAR:
delay = self.config.base_delay_ms * (attempt + 1)
else:
delay = self.config.base_delay_ms
# Cap at max delay
delay = min(delay, self.config.max_delay_ms)
# Add jitter: delay ± jitter_factor
jitter_range = delay * self.config.jitter_factor
delay += random.uniform(-jitter_range, jitter_range)
return max(0, delay) / 1000 # Convert to seconds
async def execute(
self,
operation: Callable[[], T],
is_retryable: Optional[Callable[[Exception], bool]] = None
) -> T:
"""Execute operation with retry logic"""
if is_retryable is None:
is_retryable = lambda e: isinstance(e, (RetryableError, TimeoutError, ConnectionError))
last_exception = None
for attempt in range(self.config.max_attempts):
try:
return await operation()
except Exception as e:
last_exception = e
if not is_retryable(e):
raise # Non-retryable, fail immediately
if attempt < self.config.max_attempts - 1:
delay = self._calculate_delay(attempt)
print(f"Attempt {attempt + 1} failed, retrying in {delay:.2f}s: {e}")
await asyncio.sleep(delay)
raise last_exception
# Usage example
async def fetch_user(user_id: str) -> dict:
retry = Retry(RetryConfig(
max_attempts=3,
base_delay_ms=100,
max_delay_ms=5000,
strategy=BackoffStrategy.EXPONENTIAL
))
async def _fetch():
response = await http_client.get(f"/users/{user_id}")
if response.status_code >= 500:
raise RetryableError(f"Server error: {response.status_code}")
return response.json()
return await retry.execute(_fetch)
Retry Amplification Problem
┌─────────────────────────────────────────────────────────────────────────────┐
│ RETRY AMPLIFICATION │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ SCENARIO: 3-tier system, each tier retries 3 times │
│ │
│ Client → Gateway → Service → Database │
│ (3x) (3x) (3x) │
│ │
│ One database timeout can cause: │
│ Gateway: 3 retries │
│ Service: 3 × 3 = 9 retries │
│ Database: 3 × 3 × 3 = 27 queries! │
│ │
│ EXPONENTIAL AMPLIFICATION = disaster during outages │
│ │
│ ───────────────────────────────────────────────────────────────────────── │
│ │
│ SOLUTIONS: │
│ │
│ 1. RETRY ONLY AT EDGES │
│ • Client retries │
│ • Internal services: fast-fail, no retry │
│ │
│ 2. RETRY BUDGETS │
│ • Each service tracks retry ratio │
│ • If >20% of requests are retries, stop retrying │
│ • "Retry token bucket" │
│ │
│ 3. DEADLINE PROPAGATION │
│ • If deadline near, don't bother retrying │
│ • Fail fast instead │
│ │
│ 4. HEDGED REQUESTS │
│ • Send to multiple replicas simultaneously │
│ • Take first response, cancel others │
│ • Prevents retry cascade │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
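Solution 4 can be sketched with plain asyncio: issue the same idempotent read to a second replica only if the first is slow, take whichever finishes first, and cancel the rest. call_replica is a hypothetical downstream call, and error handling is omitted.

import asyncio

async def hedged_get(key: str, replicas: list[str], hedge_delay: float = 0.05):
    # Start the primary immediately and the hedge after a short delay,
    # so the extra request is only sent when the primary is already slow.
    async def attempt(replica: str, delay: float):
        await asyncio.sleep(delay)
        return await call_replica(replica, key)   # hypothetical call

    tasks = [
        asyncio.create_task(attempt(replica, i * hedge_delay))
        for i, replica in enumerate(replicas[:2])
    ]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()                              # drop the slower attempt
    return done.pop().result()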
Circuit Breaker Pattern
The circuit breaker prevents cascading failures by failing fast when a dependency is unhealthy.
┌─────────────────────────────────────────────────────────────────────────────┐
│ CIRCUIT BREAKER STATES │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ CLOSED │ │
│ │ (Normal operation) │ │
│ │ │ │
│ │ Requests flow through normally │ │
│ │ Track failure rate in sliding window │ │
│ │ If failure rate > threshold → OPEN │ │
│ │ │ │
│ └────────────────────────────┬────────────────────────────────────────┘ │
│ │ │
│ failure rate > threshold │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ OPEN │ │
│ │ (Fail immediately) │ │
│ │ │ │
│ │ All requests fail immediately │ │
│ │ No load on failing dependency │ │
│ │ After timeout → HALF-OPEN │ │
│ │ │ │
│ └────────────────────────────┬────────────────────────────────────────┘ │
│ │ │
│ timeout expires │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ HALF-OPEN │ │
│ │ (Test recovery) │ │
│ │ │ │
│ │ Allow limited requests through │ │
│ │ If they succeed → CLOSED │ │
│ │ If they fail → OPEN │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Circuit Breaker Implementation
import time
import threading
from enum import Enum
from dataclasses import dataclass, field
from typing import Callable, TypeVar, Optional, Deque
from collections import deque
import asyncio
class CircuitState(Enum):
CLOSED = "closed"
OPEN = "open"
HALF_OPEN = "half_open"
@dataclass
class CircuitBreakerConfig:
failure_threshold: float = 0.5 # 50% failure rate
success_threshold: int = 3 # Successes needed in half-open
window_size: int = 10 # Sliding window size
open_timeout_seconds: float = 30.0
half_open_max_calls: int = 3
class CircuitOpenError(Exception):
"""Raised when circuit breaker is open"""
def __init__(self, message: str, retry_after: float):
super().__init__(message)
self.retry_after = retry_after
T = TypeVar('T')
class CircuitBreaker:
"""
Thread-safe circuit breaker implementation.
Based on Netflix Hystrix patterns.
"""
def __init__(self, name: str, config: CircuitBreakerConfig):
self.name = name
self.config = config
self._state = CircuitState.CLOSED
self._lock = threading.RLock()
# Sliding window of recent results (True = success, False = failure)
self._results: Deque[bool] = deque(maxlen=config.window_size)
# Timing and counters
self._opened_at: Optional[float] = None
self._half_open_successes = 0
self._half_open_calls = 0
@property
def state(self) -> CircuitState:
with self._lock:
return self._state
def _failure_rate(self) -> float:
"""Calculate current failure rate"""
if len(self._results) == 0:
return 0.0
failures = sum(1 for r in self._results if not r)
return failures / len(self._results)
def _should_open(self) -> bool:
"""Check if circuit should open"""
if len(self._results) < self.config.window_size:
return False # Not enough data
return self._failure_rate() >= self.config.failure_threshold
def _should_close(self) -> bool:
"""Check if circuit should close from half-open"""
return self._half_open_successes >= self.config.success_threshold
def _should_allow_half_open(self) -> bool:
"""Check if we should transition from open to half-open"""
if self._opened_at is None:
return False
return time.time() - self._opened_at >= self.config.open_timeout_seconds
def _can_execute_in_half_open(self) -> bool:
"""Check if we can allow another half-open call"""
return self._half_open_calls < self.config.half_open_max_calls
async def execute(
self,
operation: Callable[[], T],
fallback: Optional[Callable[[], T]] = None
) -> T:
"""Execute operation with circuit breaker protection"""
with self._lock:
# Check if we should transition state
if self._state == CircuitState.OPEN:
if self._should_allow_half_open():
self._state = CircuitState.HALF_OPEN
self._half_open_successes = 0
self._half_open_calls = 0
else:
retry_after = self.config.open_timeout_seconds - (time.time() - self._opened_at)
if fallback:
return await fallback()
raise CircuitOpenError(
f"Circuit {self.name} is OPEN",
retry_after=retry_after
)
if self._state == CircuitState.HALF_OPEN:
if not self._can_execute_in_half_open():
if fallback:
return await fallback()
raise CircuitOpenError(
f"Circuit {self.name} is HALF_OPEN, max calls reached",
retry_after=1.0
)
self._half_open_calls += 1
# Execute the operation
try:
result = await operation()
self._on_success()
return result
except Exception as e:
self._on_failure()
raise
def _on_success(self):
"""Record successful execution"""
with self._lock:
self._results.append(True)
if self._state == CircuitState.HALF_OPEN:
self._half_open_successes += 1
if self._should_close():
self._state = CircuitState.CLOSED
self._results.clear()
print(f"Circuit {self.name} CLOSED (recovered)")
def _on_failure(self):
"""Record failed execution"""
with self._lock:
self._results.append(False)
if self._state == CircuitState.HALF_OPEN:
# Any failure in half-open reopens
self._state = CircuitState.OPEN
self._opened_at = time.time()
print(f"Circuit {self.name} OPEN (half-open failure)")
elif self._state == CircuitState.CLOSED:
if self._should_open():
self._state = CircuitState.OPEN
self._opened_at = time.time()
print(f"Circuit {self.name} OPEN (failure threshold)")
def get_metrics(self) -> dict:
"""Get current circuit breaker metrics"""
with self._lock:
return {
"name": self.name,
"state": self._state.value,
"failure_rate": self._failure_rate(),
"window_size": len(self._results),
"half_open_successes": self._half_open_successes
}
# Usage example
payment_circuit = CircuitBreaker("payment-service", CircuitBreakerConfig(
failure_threshold=0.5,
success_threshold=3,
window_size=10,
open_timeout_seconds=30.0
))
async def process_payment(order_id: str, amount: float) -> dict:
async def _call_payment_service():
response = await http_client.post(
"/payments",
json={"order_id": order_id, "amount": amount}
)
if response.status_code >= 500:
raise Exception(f"Payment service error: {response.status_code}")
return response.json()
async def _fallback():
# Queue for later processing
await queue.enqueue("payment-retry", {"order_id": order_id, "amount": amount})
return {"status": "queued", "message": "Payment queued for processing"}
return await payment_circuit.execute(_call_payment_service, fallback=_fallback)
Bulkhead Pattern
Isolate components so a failure in one doesn’t sink the entire ship.
┌─────────────────────────────────────────────────────────────────────────────┐
│ BULKHEAD PATTERN │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ PROBLEM WITHOUT BULKHEADS: │
│ ────────────────────────── │
│ │
│ ┌────────────────────────────────────────────────────┐ │
│ │ SHARED THREAD POOL │ │
│ │ [Thread 1] [Thread 2] [Thread 3] ... [Thread N] │ │
│ │ ↓ ↓ ↓ ↓ │ │
│ │ Slow Slow Slow Slow │ ← All threads │
│ │ Service Service Service Service │ stuck on slow │
│ │ │ dependency │
│ └────────────────────────────────────────────────────┘ │
│ │
│ Result: ALL services degraded because one is slow │
│ │
│ ═══════════════════════════════════════════════════════════════════════ │
│ │
│ WITH BULKHEADS: │
│ ─────────────── │
│ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Pool A (10) │ │ Pool B (20) │ │ Pool C (15) │ │
│ │ Payment Svc │ │ User Service │ │ Product Svc │ │
│ │ [T1][T2].. │ │ [T1][T2].. │ │ [T1][T2].. │ │
│ │ ↓ │ │ ↓ │ │ ↓ │ │
│ │ SLOW! │ │ Normal │ │ Normal │ │
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │
│ │
│ Result: Payment is slow, but User and Product work fine! │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Bulkhead Types
Thread Pool Bulkhead
Separate thread pools for different dependencies (sketched below).
Pros: Strong isolation, works for blocking calls
Cons: Resource overhead, context switching
Use for: HTTP clients, database connections
Semaphore Bulkhead
Limit concurrent calls with semaphores.
Pros: Lightweight, works with async
Cons: Weaker isolation, no queue
Use for: Rate limiting, quick operations
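The semaphore variant is implemented below. The thread-pool variant can be sketched with one dedicated executor per blocking dependency, so a hung dependency can only exhaust its own threads; blocking_db_query and blocking_payment_call are hypothetical blocking functions.

import asyncio
from concurrent.futures import ThreadPoolExecutor

# One pool per dependency: the bulkhead walls are the pool boundaries.
db_pool = ThreadPoolExecutor(max_workers=10, thread_name_prefix="db")
payment_pool = ThreadPoolExecutor(max_workers=5, thread_name_prefix="payment")

async def query_orders(user_id: str):
    loop = asyncio.get_running_loop()
    # Blocking database call runs only on the database pool's threads.
    return await loop.run_in_executor(db_pool, blocking_db_query, user_id)

async def charge_card(payment: dict):
    loop = asyncio.get_running_loop()
    # If the payment provider hangs, at most 5 threads are tied up.
    return await loop.run_in_executor(payment_pool, blocking_payment_call, payment)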
import asyncio
from dataclasses import dataclass
from typing import Callable, TypeVar, Dict
from contextlib import asynccontextmanager
T = TypeVar('T')
@dataclass
class BulkheadConfig:
max_concurrent: int = 10
max_wait_seconds: float = 5.0
name: str = "default"
class BulkheadFullError(Exception):
"""Raised when bulkhead is at capacity"""
pass
class SemaphoreBulkhead:
"""
Semaphore-based bulkhead for async code.
Limits concurrent calls to a dependency.
"""
def __init__(self, config: BulkheadConfig):
self.config = config
self._semaphore = asyncio.Semaphore(config.max_concurrent)
self._waiting = 0
self._active = 0
@asynccontextmanager
async def acquire(self):
"""Acquire a slot in the bulkhead"""
self._waiting += 1
try:
acquired = await asyncio.wait_for(
self._semaphore.acquire(),
timeout=self.config.max_wait_seconds
)
except asyncio.TimeoutError:
self._waiting -= 1
raise BulkheadFullError(
f"Bulkhead {self.config.name} is full "
f"(active: {self._active}, waiting: {self._waiting})"
)
self._waiting -= 1
self._active += 1
try:
yield
finally:
self._active -= 1
self._semaphore.release()
async def execute(self, operation: Callable[[], T]) -> T:
"""Execute operation within bulkhead limits"""
async with self.acquire():
return await operation()
def get_metrics(self) -> dict:
return {
"name": self.config.name,
"active": self._active,
"waiting": self._waiting,
"available": self.config.max_concurrent - self._active
}
class BulkheadRegistry:
"""Registry for managing multiple bulkheads"""
def __init__(self):
self._bulkheads: Dict[str, SemaphoreBulkhead] = {}
def get_or_create(
self,
name: str,
max_concurrent: int = 10,
max_wait_seconds: float = 5.0
) -> SemaphoreBulkhead:
if name not in self._bulkheads:
self._bulkheads[name] = SemaphoreBulkhead(BulkheadConfig(
name=name,
max_concurrent=max_concurrent,
max_wait_seconds=max_wait_seconds
))
return self._bulkheads[name]
def get_all_metrics(self) -> list:
return [b.get_metrics() for b in self._bulkheads.values()]
# Global registry
bulkheads = BulkheadRegistry()
# Usage
async def get_user(user_id: str) -> dict:
user_bulkhead = bulkheads.get_or_create("user-service", max_concurrent=20)
async def _fetch():
return await http_client.get(f"/users/{user_id}")
return await user_bulkhead.execute(_fetch)
async def process_payment(payment: dict) -> dict:
payment_bulkhead = bulkheads.get_or_create("payment-service", max_concurrent=5)
async def _process():
return await http_client.post("/payments", json=payment)
return await payment_bulkhead.execute(_process)
Health Checks
Levels of Health Checking
┌─────────────────────────────────────────────────────────────────────────────┐
│ HEALTH CHECK HIERARCHY │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ LEVEL 1: LIVENESS │
│ ───────────────── │
│ "Is the process alive?" │
│ • Can it respond to HTTP? │
│ • Is the main loop running? │
│ │
│ USE: Kubernetes liveness probe → restart if failing │
│ ENDPOINT: /health/live │
│ CHECK: Simple, fast, no dependencies │
│ │
│ ───────────────────────────────────────────────────────────────────────── │
│ │
│ LEVEL 2: READINESS │
│ ────────────────── │
│ "Can this instance handle traffic?" │
│ • Are dependencies available? │
│ • Is initialization complete? │
│ • Is the instance warmed up? │
│ │
│ USE: Kubernetes readiness probe → remove from load balancer │
│ ENDPOINT: /health/ready │
│ CHECK: Dependencies, but with timeout │
│ │
│ ───────────────────────────────────────────────────────────────────────── │
│ │
│ LEVEL 3: DEEP HEALTH │
│ ──────────────────── │
│ "Is everything working correctly?" │
│ • Can we read/write to database? │
│ • Is message queue responding? │
│ • Are all required services up? │
│ │
│ USE: Monitoring, diagnostics │
│ ENDPOINT: /health/deep │
│ CHECK: Full dependency check (expensive, don't use for LB) │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Health Check Implementation
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List, Optional, Callable, Awaitable
import asyncio
import time
class HealthStatus(Enum):
HEALTHY = "healthy"
DEGRADED = "degraded"
UNHEALTHY = "unhealthy"
@dataclass
class CheckResult:
name: str
status: HealthStatus
message: Optional[str] = None
latency_ms: Optional[float] = None
details: Optional[dict] = None
@dataclass
class HealthReport:
status: HealthStatus
checks: List[CheckResult]
timestamp: float
def to_dict(self) -> dict:
return {
"status": self.status.value,
"timestamp": self.timestamp,
"checks": [
{
"name": c.name,
"status": c.status.value,
"message": c.message,
"latency_ms": c.latency_ms,
"details": c.details
}
for c in self.checks
]
}
class HealthChecker:
"""
Comprehensive health checking system.
Supports liveness, readiness, and deep health checks.
"""
def __init__(self):
self._liveness_checks: Dict[str, Callable[[], Awaitable[CheckResult]]] = {}
self._readiness_checks: Dict[str, Callable[[], Awaitable[CheckResult]]] = {}
self._deep_checks: Dict[str, Callable[[], Awaitable[CheckResult]]] = {}
def register_liveness(
self,
name: str,
check: Callable[[], Awaitable[CheckResult]]
):
self._liveness_checks[name] = check
def register_readiness(
self,
name: str,
check: Callable[[], Awaitable[CheckResult]]
):
self._readiness_checks[name] = check
def register_deep(
self,
name: str,
check: Callable[[], Awaitable[CheckResult]]
):
self._deep_checks[name] = check
async def _run_checks(
self,
checks: Dict[str, Callable[[], Awaitable[CheckResult]]],
timeout: float = 5.0
) -> List[CheckResult]:
"""Run checks concurrently with timeout"""
results = []
async def run_single(name: str, check: Callable):
start = time.time()
try:
result = await asyncio.wait_for(check(), timeout=timeout)
result.latency_ms = (time.time() - start) * 1000
return result
except asyncio.TimeoutError:
return CheckResult(
name=name,
status=HealthStatus.UNHEALTHY,
message=f"Check timed out after {timeout}s",
latency_ms=(time.time() - start) * 1000
)
except Exception as e:
return CheckResult(
name=name,
status=HealthStatus.UNHEALTHY,
message=str(e),
latency_ms=(time.time() - start) * 1000
)
tasks = [run_single(name, check) for name, check in checks.items()]
results = await asyncio.gather(*tasks)
return list(results)
def _aggregate_status(self, results: List[CheckResult]) -> HealthStatus:
"""Aggregate individual check results into overall status"""
if not results:
return HealthStatus.HEALTHY
statuses = [r.status for r in results]
if any(s == HealthStatus.UNHEALTHY for s in statuses):
return HealthStatus.UNHEALTHY
if any(s == HealthStatus.DEGRADED for s in statuses):
return HealthStatus.DEGRADED
return HealthStatus.HEALTHY
async def check_liveness(self) -> HealthReport:
"""Fast liveness check - is the process responding?"""
results = await self._run_checks(self._liveness_checks, timeout=1.0)
return HealthReport(
status=self._aggregate_status(results),
checks=results,
timestamp=time.time()
)
async def check_readiness(self) -> HealthReport:
"""Readiness check - can this instance handle traffic?"""
results = await self._run_checks(self._readiness_checks, timeout=5.0)
return HealthReport(
status=self._aggregate_status(results),
checks=results,
timestamp=time.time()
)
async def check_deep(self) -> HealthReport:
"""Deep health check - full system validation"""
all_checks = {
**self._liveness_checks,
**self._readiness_checks,
**self._deep_checks
}
results = await self._run_checks(all_checks, timeout=30.0)
return HealthReport(
status=self._aggregate_status(results),
checks=results,
timestamp=time.time()
)
# Example usage
health = HealthChecker()
# Liveness: just check if we can respond
async def basic_liveness() -> CheckResult:
return CheckResult(name="basic", status=HealthStatus.HEALTHY)
health.register_liveness("basic", basic_liveness)
# Readiness: check database connection
async def database_ready() -> CheckResult:
try:
await db.execute("SELECT 1")
return CheckResult(name="database", status=HealthStatus.HEALTHY)
except Exception as e:
return CheckResult(
name="database",
status=HealthStatus.UNHEALTHY,
message=str(e)
)
health.register_readiness("database", database_ready)
# Deep: check full write path
async def database_write() -> CheckResult:
try:
await db.execute("INSERT INTO health_check (ts) VALUES (NOW())")
await db.execute("DELETE FROM health_check WHERE ts < NOW() - INTERVAL '1 hour'")
return CheckResult(name="database_write", status=HealthStatus.HEALTHY)
except Exception as e:
return CheckResult(
name="database_write",
status=HealthStatus.UNHEALTHY,
message=str(e)
)
health.register_deep("database_write", database_write)
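# Wiring the three levels to HTTP endpoints is framework-specific. A minimal
# sketch assuming FastAPI follows; the status code (200 vs 503) is what the
# orchestrator or load balancer actually acts on.
from fastapi import FastAPI, Response

app = FastAPI()

def _status_code(report: HealthReport) -> int:
    # Probes generally key off the status code, so map UNHEALTHY to 503.
    return 503 if report.status == HealthStatus.UNHEALTHY else 200

@app.get("/health/live")
async def live(response: Response):
    report = await health.check_liveness()
    response.status_code = _status_code(report)
    return report.to_dict()

@app.get("/health/ready")
async def ready(response: Response):
    report = await health.check_readiness()
    response.status_code = _status_code(report)
    return report.to_dict()

@app.get("/health/deep")
async def deep(response: Response):
    report = await health.check_deep()
    response.status_code = _status_code(report)
    return report.to_dict()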
Graceful Degradation
When things fail, fail gracefully instead of completely.
┌─────────────────────────────────────────────────────────────────────────────┐
│ GRACEFUL DEGRADATION STRATEGIES │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ 1. FALLBACK TO CACHE │
│ ──────────────────── │
│ Primary: Fresh data from database │
│ Fallback: Stale data from cache │
│ Better stale than nothing! │
│ │
│ 2. FEATURE DEGRADATION │
│ ───────────────────── │
│ Primary: Personalized recommendations │
│ Fallback: Popular/trending items │
│ Users still see content │
│ │
│ 3. STATIC FALLBACK │
│ ──────────────── │
│ Primary: Dynamic content │
│ Fallback: Cached static page │
│ Site stays up │
│ │
│ 4. ASYNC QUEUE │
│ ───────────── │
│ Primary: Synchronous processing │
│ Fallback: Queue for later processing │
│ Operation will complete eventually │
│ │
│ 5. REDUCED QUALITY │
│ ──────────────── │
│ Primary: High-resolution images │
│ Fallback: Lower resolution or placeholders │
│ Page loads faster │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Implementing Graceful Degradation
from dataclasses import dataclass
from typing import Optional, TypeVar, Generic, Callable, Awaitable
from enum import Enum
import logging
T = TypeVar('T')
class FallbackReason(Enum):
PRIMARY_TIMEOUT = "primary_timeout"
PRIMARY_ERROR = "primary_error"
PRIMARY_UNAVAILABLE = "primary_unavailable"
CIRCUIT_OPEN = "circuit_open"
@dataclass
class DegradedResponse(Generic[T]):
"""Response that may be degraded"""
value: T
is_degraded: bool
fallback_reason: Optional[FallbackReason] = None
message: Optional[str] = None
class GracefulDegrader:
"""
Implements graceful degradation pattern.
Returns fallback values when primary source fails.
"""
def __init__(self, name: str, circuit_breaker: Optional[CircuitBreaker] = None):
self.name = name
self.circuit_breaker = circuit_breaker
self.logger = logging.getLogger(f"degrader.{name}")
async def execute(
self,
primary: Callable[[], Awaitable[T]],
fallback: Callable[[], Awaitable[T]],
fallback_description: str = "fallback"
) -> DegradedResponse[T]:
"""
Execute primary operation, fall back if it fails.
Args:
primary: Primary operation to try first
fallback: Fallback operation if primary fails
fallback_description: Human-readable description for logging
Returns:
DegradedResponse containing the result and degradation status
"""
# Try primary operation
try:
if self.circuit_breaker:
result = await self.circuit_breaker.execute(primary)
else:
result = await primary()
return DegradedResponse(
value=result,
is_degraded=False
)
except CircuitOpenError as e:
reason = FallbackReason.CIRCUIT_OPEN
message = f"Circuit open, using {fallback_description}"
except TimeoutError as e:
reason = FallbackReason.PRIMARY_TIMEOUT
message = f"Primary timed out, using {fallback_description}"
except Exception as e:
reason = FallbackReason.PRIMARY_ERROR
message = f"Primary failed ({type(e).__name__}), using {fallback_description}"
# Execute fallback
self.logger.warning(f"{self.name}: {message}")
try:
fallback_result = await fallback()
return DegradedResponse(
value=fallback_result,
is_degraded=True,
fallback_reason=reason,
message=message
)
except Exception as e:
self.logger.error(f"{self.name}: Fallback also failed: {e}")
raise
# Example usage
async def get_product_recommendations(user_id: str) -> DegradedResponse[list]:
degrader = GracefulDegrader("recommendations")
async def personalized():
# Call ML service for personalized recommendations
return await recommendation_service.get_personalized(user_id)
async def popular():
# Return cached popular items
return await cache.get("popular_products") or []
return await degrader.execute(
primary=personalized,
fallback=popular,
fallback_description="popular products"
)
# In your controller
async def product_page(request):
user_id = request.user.id
recommendations = await get_product_recommendations(user_id)
if recommendations.is_degraded:
# Maybe show a small banner
return render("product.html", {
"recommendations": recommendations.value,
"personalized": False,
"message": "Showing popular products"
})
else:
return render("product.html", {
"recommendations": recommendations.value,
"personalized": True
})
Load Shedding
When overwhelmed, strategically drop load to preserve core functionality.
┌─────────────────────────────────────────────────────────────────────────────┐
│ LOAD SHEDDING STRATEGIES │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ 1. LIFO (Last In, First Out) │
│ ──────────────────────────── │
│ Reject newest requests first │
│ Rationale: They waited least, less frustration │
│ │
│ 2. PRIORITY-BASED │
│ ──────────────── │
│ Keep: Premium users, critical operations │
│ Shed: Free users, non-critical operations │
│ │
│ 3. ADAPTIVE (CoDel - Controlled Delay) │
│ ─────────────────────────────────────── │
│ Monitor queue latency │
│ If consistently > target (e.g., 5ms), start dropping │
│ Smarter than fixed thresholds │
│ │
│ 4. DEADLINE-BASED │
│ ──────────────── │
│ Drop requests whose deadline has passed │
│ No point processing expired requests │
│ │
│ 5. TOKEN BUCKET / RATE LIMITING │
│ ─────────────────────────────── │
│ Fixed rate of allowed requests │
│ Excess immediately rejected │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
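Strategy 5 is the simplest to implement. Here is a minimal token-bucket sketch that admits a steady rate plus a bounded burst and sheds everything else immediately.

import time

class TokenBucket:
    """Admit up to `rate` requests/second with bursts up to `capacity`;
    anything beyond that is shed immediately."""

    def __init__(self, rate: float, capacity: float):
        self.rate = rate                      # tokens refilled per second
        self.capacity = capacity              # maximum burst size
        self._tokens = capacity
        self._last_refill = time.monotonic()

    def try_acquire(self) -> bool:
        now = time.monotonic()
        elapsed = now - self._last_refill
        # Refill proportionally to elapsed time, capped at capacity.
        self._tokens = min(self.capacity, self._tokens + elapsed * self.rate)
        self._last_refill = now
        if self._tokens >= 1:
            self._tokens -= 1
            return True
        return False   # shed: caller should reject (e.g. HTTP 429) right away

# Usage: check the bucket before doing any expensive work.
request_bucket = TokenBucket(rate=100, capacity=200)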
import asyncio
import time
from dataclasses import dataclass
from typing import Optional, Dict
from collections import deque
@dataclass
class Request:
id: str
priority: int # Higher = more important
deadline: float # Unix timestamp
enqueued_at: float
class AdaptiveLoadShedder:
"""
Adaptive load shedding using CoDel-inspired algorithm.
Drops requests when queue latency consistently exceeds target.
"""
def __init__(
self,
target_latency_ms: float = 5.0,
interval_ms: float = 100.0,
max_queue_size: int = 1000
):
self.target_latency = target_latency_ms / 1000
self.interval = interval_ms / 1000
self.max_queue_size = max_queue_size
self._queue: deque[Request] = deque()
self._first_above_time: Optional[float] = None
self._drop_next = False
self._drop_count = 0
# Metrics
self._total_requests = 0
self._dropped_requests = 0
def _queue_latency(self) -> float:
"""Get current queue latency (time oldest request has waited)"""
if not self._queue:
return 0
return time.time() - self._queue[0].enqueued_at
def _should_drop(self) -> bool:
"""CoDel-inspired drop decision"""
latency = self._queue_latency()
now = time.time()
if latency < self.target_latency:
self._first_above_time = None
return False
if self._first_above_time is None:
self._first_above_time = now + self.interval
return False
if now >= self._first_above_time:
self._drop_count += 1
# Drop more aggressively as queue stays above target
next_drop_time = self.interval / (self._drop_count ** 0.5)
self._first_above_time = now + next_drop_time
return True
return False
async def enqueue(self, request: Request) -> bool:
"""
Try to enqueue a request.
Returns True if accepted, False if shed.
"""
self._total_requests += 1
# Hard limit on queue size
if len(self._queue) >= self.max_queue_size:
self._dropped_requests += 1
return False
# Check if deadline already passed
if time.time() > request.deadline:
self._dropped_requests += 1
return False
# CoDel-style dropping
if self._should_drop():
self._dropped_requests += 1
return False
self._queue.append(request)
return True
async def dequeue(self) -> Optional[Request]:
"""Get next request, skipping any with passed deadlines"""
while self._queue:
request = self._queue.popleft()
# Skip expired requests
if time.time() > request.deadline:
self._dropped_requests += 1
continue
return request
return None
def get_metrics(self) -> dict:
return {
"queue_size": len(self._queue),
"queue_latency_ms": self._queue_latency() * 1000,
"total_requests": self._total_requests,
"dropped_requests": self._dropped_requests,
"drop_rate": self._dropped_requests / max(1, self._total_requests)
}
# Priority-based load shedder
class PriorityLoadShedder:
"""
Priority-based load shedding.
When overwhelmed, drop lower priority requests first.
"""
def __init__(self, max_load: float = 0.8):
self.max_load = max_load
self._current_load = 0.0
# Priority thresholds (higher = more important)
self.CRITICAL = 100
self.HIGH = 75
self.NORMAL = 50
self.LOW = 25
def update_load(self, load: float):
"""Update current system load (0.0 to 1.0)"""
self._current_load = load
def _min_priority_allowed(self) -> int:
"""Calculate minimum priority to accept based on load"""
if self._current_load < self.max_load:
return 0 # Accept all
# Progressively increase threshold
overload = (self._current_load - self.max_load) / (1.0 - self.max_load)
if overload < 0.25:
return self.LOW # Drop LOW priority
elif overload < 0.5:
return self.NORMAL # Drop NORMAL and below
elif overload < 0.75:
return self.HIGH # Drop HIGH and below
else:
return self.CRITICAL # Only CRITICAL
def should_accept(self, priority: int) -> bool:
"""Check if request with given priority should be accepted"""
return priority >= self._min_priority_allowed()
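# Usage sketch for the priority shedder (hypothetical handler): feed it a load
# metric, then gate requests on their priority. current_cpu_load() and
# process() are assumed helpers, not part of the implementation above.
shedder = PriorityLoadShedder(max_load=0.8)

async def handle_request(request: dict, priority: int) -> dict:
    shedder.update_load(current_cpu_load())
    if not shedder.should_accept(priority):
        return {"status": 503, "message": "Overloaded, request shed"}
    return await process(request)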
Combining Patterns
Real systems use multiple patterns together:
┌─────────────────────────────────────────────────────────────────────────────┐
│ FAULT TOLERANCE STACK │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────────┐ │
│ │ Load Balancer │ │
│ │ (Health Checks) │ │
│ └──────────┬───────────┘ │
│ │ │
│ ┌──────────▼───────────┐ │
│ │ Rate Limiter │ ← Prevent overload │
│ │ (Load Shedding) │ │
│ └──────────┬───────────┘ │
│ │ │
│ ┌───────────────────────────────┼───────────────────────────────────┐ │
│ │ │ │ │
│ │ ┌─────────────────────────▼────────────────────────────┐ │ │
│ │ │ Circuit Breaker │ │ │
│ │ └─────────────────────────┬────────────────────────────┘ │ │
│ │ │ │ │
│ │ ┌─────────────────────────▼────────────────────────────┐ │ │
│ │ │ Bulkhead │ │ │
│ │ │ (Separate pools per dependency) │ │ │
│ │ └─────────────────────────┬────────────────────────────┘ │ │
│ │ │ │ │
│ │ ┌─────────────────────────▼────────────────────────────┐ │ │
│ │ │ Retry with Backoff │ │ │
│ │ │ (Exponential + Jitter) │ │ │
│ │ └─────────────────────────┬────────────────────────────┘ │ │
│ │ │ │ │
│ │ ┌─────────────────────────▼────────────────────────────┐ │ │
│ │ │ Timeout │ │ │
│ │ │ (Connection + Read + Deadline propagation) │ │ │
│ │ └─────────────────────────┬────────────────────────────┘ │ │
│ │ │ │ │
│ │ YOUR SERVICE │ │
│ │ │ │
│ └────────────────────────────────────────────────────────────────────┘ │
│ │
│ If all fails → Graceful Degradation (fallback, cache, static) │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
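A minimal sketch of composing the layers in code, in the same order as the stack above (circuit breaker, then bulkhead, then retry with backoff, then a per-attempt timeout), reusing the classes defined earlier in this module; http_client is assumed as in the earlier examples.

inventory_bulkhead = bulkheads.get_or_create("inventory-service", max_concurrent=20)
inventory_circuit = CircuitBreaker("inventory-service", CircuitBreakerConfig())
inventory_retry = Retry(RetryConfig(max_attempts=2))

async def get_stock(sku: str) -> dict:
    async def _attempt():
        # Innermost: per-attempt timeout so one slow call cannot eat the budget.
        async with asyncio.timeout(1.0):
            response = await http_client.get(f"/inventory/{sku}")
            if response.status_code >= 500:
                raise RetryableError(f"Inventory error: {response.status_code}")
            return response.json()

    async def _with_retry():
        return await inventory_retry.execute(_attempt)

    async def _with_bulkhead():
        return await inventory_bulkhead.execute(_with_retry)

    # Outermost: circuit breaker, matching the stack diagram above.
    return await inventory_circuit.execute(_with_bulkhead)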
Interview Questions
Q: Design a resilient API gateway
Key points to cover:
- Timeouts: Connection and request timeouts per backend
- Circuit Breakers: Per-backend, with different thresholds
- Bulkheads: Separate thread/connection pools per backend
- Retries: Only for idempotent operations, with backoff
- Rate Limiting: Per-client and global limits
- Health Checks: Remove unhealthy backends from rotation
- Fallbacks: Static responses, cached data, or error pages
Q: What's the retry amplification problem and how do you solve it?
Problem:
In a multi-tier system, each tier retrying multiplies the total attempts.
3 tiers × 3 retries each = 3³ = 27 actual requests for 1 user request.
Solutions:
- Retry at edge only: Internal services fail fast
- Retry budgets: Track % of retries, stop if too high
- Deadline propagation: Don’t retry if deadline passed
- Hedged requests: Send to multiple replicas, take first
Q: How would you implement graceful degradation for a search feature?
Degradation levels:
- Full functionality: Real-time search with personalization
- Cached results: Return last known good results for query
- Popular results: Return trending/popular items
- Static results: Return hardcoded fallback results
- Maintenance mode: Show “search temporarily unavailable”
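A minimal sketch of walking these levels in order, assuming hypothetical async helpers search_live, cached_results, and popular_items; each step is a strictly cheaper fallback.

async def degraded_search(query: str) -> dict:
    levels = [
        ("personalized", lambda: search_live(query)),   # hypothetical helpers
        ("cached", lambda: cached_results(query)),
        ("popular", lambda: popular_items()),
    ]
    for level, fetch in levels:
        try:
            results = await fetch()
            if results:
                return {"level": level, "results": results}
        except Exception:
            continue   # fall through to the next, cheaper level
    return {"level": "unavailable", "results": [],
            "message": "Search temporarily unavailable"}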
Key Takeaways
Design for Failure
Assume everything will fail. Build systems that survive failures gracefully.
Fail Fast
Use timeouts, circuit breakers, and health checks to detect failures quickly.
Isolate Failures
Use bulkheads to prevent failures from spreading. Contain the blast radius.
Degrade Gracefully
When things break, return something useful. Stale data beats no data.