Senior Level: Fault tolerance is what separates good systems from great ones. Interviewers expect senior engineers to design for failure from day one.

Design for Failure Mindset

Failure Scenarios

Redundancy Patterns

Active-Passive (Standby)

[Diagram: Active-Passive Pattern]
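
A minimal sketch of the failover side of this pattern, assuming illustrative async callables probe (health-check the primary) and promote_standby:

import asyncio
import logging

logger = logging.getLogger(__name__)

async def failover_monitor(probe, promote_standby, interval=2.0, max_misses=3):
    """Poll the primary; promote the standby after consecutive missed probes.

    probe() -> bool and promote_standby() are illustrative async callables.
    """
    misses = 0
    while True:
        try:
            healthy = await asyncio.wait_for(probe(), timeout=1.0)
        except (asyncio.TimeoutError, ConnectionError):
            healthy = False
        misses = 0 if healthy else misses + 1
        if misses >= max_misses:
            logger.warning("Primary missed %d probes, promoting standby", misses)
            await promote_standby()
            return
        await asyncio.sleep(interval)

Requiring several consecutive misses avoids flapping on a single dropped probe.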

Active-Active

[Diagram: Active-Active Pattern]
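
In active-active, every node serves traffic, so the client (or load balancer) only needs to spread requests and route around a dead replica. A sketch, where nodes and the async send transport are illustrative:

import itertools

class ActiveActiveClient:
    """Round-robin across replicas; rotate past a replica that errors."""

    def __init__(self, nodes, send):
        self.nodes = list(nodes)
        self.send = send                      # illustrative: async send(node, payload)
        self._rr = itertools.cycle(self.nodes)

    async def request(self, payload):
        last_exc = None
        for _ in self.nodes:                  # try each replica at most once
            node = next(self._rr)
            try:
                return await self.send(node, payload)
            except ConnectionError as e:
                last_exc = e                  # dead replica, move to the next
        raise ConnectionError("all replicas failed") from last_exc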

Multi-Region Active-Active

[Diagram: Multi-Region Deployment]

Resilience Patterns

Circuit Breaker (Deep Dive)

from enum import Enum
from datetime import datetime, timedelta
import threading

class CircuitState(Enum):
    CLOSED = "closed"      # Normal operation
    OPEN = "open"          # Failing fast
    HALF_OPEN = "half_open"  # Testing if recovered

class CircuitOpenError(Exception):
    """Raised when the breaker is OPEN and rejecting calls."""
    pass

class CircuitBreaker:
    """
    Production-grade circuit breaker with:
    - Failure threshold
    - Success threshold for recovery
    - Timeout for open state
    - Thread safety
    """
    
    def __init__(
        self,
        failure_threshold: int = 5,
        success_threshold: int = 3,
        timeout_seconds: int = 30
    ):
        self.failure_threshold = failure_threshold
        self.success_threshold = success_threshold
        self.timeout = timedelta(seconds=timeout_seconds)
        
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time = None
        self.lock = threading.Lock()
    
    def call(self, func, *args, **kwargs):
        with self.lock:
            if not self._can_execute():
                raise CircuitOpenError("Circuit is OPEN")
        
        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
        except Exception:
            self._on_failure()
            raise
    
    def _can_execute(self) -> bool:
        if self.state == CircuitState.CLOSED:
            return True
        
        if self.state == CircuitState.OPEN:
            # Check if timeout has passed
            if datetime.now() - self.last_failure_time > self.timeout:
                self.state = CircuitState.HALF_OPEN
                self.success_count = 0
                return True
            return False
        
        # HALF_OPEN: let trial requests through to test recovery
        return True
    
    def _on_success(self):
        with self.lock:
            if self.state == CircuitState.HALF_OPEN:
                self.success_count += 1
                if self.success_count >= self.success_threshold:
                    # Service recovered!
                    self.state = CircuitState.CLOSED
                    self.failure_count = 0
            elif self.state == CircuitState.CLOSED:
                self.failure_count = 0
    
    def _on_failure(self):
        with self.lock:
            self.failure_count += 1
            self.last_failure_time = datetime.now()
            
            if self.state == CircuitState.HALF_OPEN:
                # Failed during recovery test
                self.state = CircuitState.OPEN
            elif self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN


# Usage with fallback
circuit = CircuitBreaker(failure_threshold=5, timeout_seconds=30)

def get_user_with_fallback(user_id):
    try:
        return circuit.call(user_service.get_user, user_id)
    except CircuitOpenError:
        # Return cached data or default
        return cache.get(f"user:{user_id}") or {"id": user_id, "name": "Unknown"}
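
ResilientCaller further down calls circuit.call_async(...), which the class above does not define. A minimal async variant, reusing the same state machine, might look like this:

import asyncio

class AsyncCircuitBreaker(CircuitBreaker):
    """Async entry point on top of the synchronous breaker above."""

    async def call_async(self, func, *args, **kwargs):
        with self.lock:
            if not self._can_execute():
                raise CircuitOpenError("Circuit is OPEN")

        try:
            result = func(*args, **kwargs)
            if asyncio.iscoroutine(result):    # func may be async, or a lambda
                result = await result          # wrapping an async call
            self._on_success()
            return result
        except Exception:
            self._on_failure()
            raise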

Retry Strategies

import asyncio
import random
import time
from functools import wraps
from typing import Callable, TypeVar, Type, Tuple, Optional, Any
from dataclasses import dataclass
from enum import Enum
import logging

T = TypeVar('T')
logger = logging.getLogger(__name__)

class RetryStrategy(Enum):
    EXPONENTIAL = "exponential"
    LINEAR = "linear"
    FIBONACCI = "fibonacci"
    DECORRELATED_JITTER = "decorrelated_jitter"

@dataclass
class RetryConfig:
    max_retries: int = 5
    base_delay: float = 1.0
    max_delay: float = 60.0
    strategy: RetryStrategy = RetryStrategy.EXPONENTIAL
    jitter: bool = True
    retryable_exceptions: Tuple[Type[Exception], ...] = (Exception,)
    non_retryable_exceptions: Tuple[Type[Exception], ...] = ()
    on_retry: Optional[Callable[[Exception, int, float], None]] = None

class RetryExhausted(Exception):
    def __init__(self, last_exception: Exception, attempts: int):
        self.last_exception = last_exception
        self.attempts = attempts
        super().__init__(f"All {attempts} retry attempts failed")

class RetryHandler:
    """Advanced retry handler with multiple strategies"""
    
    def __init__(self, config: Optional[RetryConfig] = None):
        self.config = config or RetryConfig()
        self._fib_cache = {0: 0, 1: 1}
        self._last_delay = self.config.base_delay
    
    def _fibonacci(self, n: int) -> int:
        if n not in self._fib_cache:
            self._fib_cache[n] = self._fibonacci(n - 1) + self._fibonacci(n - 2)
        return self._fib_cache[n]
    
    def _calculate_delay(self, attempt: int) -> float:
        cfg = self.config
        
        if cfg.strategy == RetryStrategy.EXPONENTIAL:
            delay = cfg.base_delay * (2 ** attempt)
        elif cfg.strategy == RetryStrategy.LINEAR:
            delay = cfg.base_delay * (attempt + 1)
        elif cfg.strategy == RetryStrategy.FIBONACCI:
            delay = cfg.base_delay * self._fibonacci(attempt + 2)
        elif cfg.strategy == RetryStrategy.DECORRELATED_JITTER:
            # AWS recommended: sleep = min(cap, random(base, sleep * 3))
            delay = random.uniform(cfg.base_delay, self._last_delay * 3)
            self._last_delay = delay
        else:
            delay = cfg.base_delay
        
        delay = min(delay, cfg.max_delay)
        
        # Add standard jitter (not for decorrelated which has built-in)
        if cfg.jitter and cfg.strategy != RetryStrategy.DECORRELATED_JITTER:
            delay = delay * (0.5 + random.random())
        
        return delay
    
    def _should_retry(self, exception: Exception) -> bool:
        # Check non-retryable first
        if isinstance(exception, self.config.non_retryable_exceptions):
            return False
        return isinstance(exception, self.config.retryable_exceptions)
    
    async def execute(self, func: Callable[[], T]) -> T:
        """Execute function with retry logic"""
        last_exception = None
        
        for attempt in range(self.config.max_retries):
            try:
                result = func()
                # Handles sync functions and lambdas wrapping async calls alike
                if asyncio.iscoroutine(result):
                    result = await result
                return result
                
            except Exception as e:
                last_exception = e
                
                if not self._should_retry(e):
                    raise
                
                if attempt == self.config.max_retries - 1:
                    break
                
                delay = self._calculate_delay(attempt)
                
                logger.warning(
                    f"Attempt {attempt + 1}/{self.config.max_retries} failed: {e}. "
                    f"Retrying in {delay:.2f}s"
                )
                
                if self.config.on_retry:
                    self.config.on_retry(e, attempt + 1, delay)
                
                await asyncio.sleep(delay)
        
        raise RetryExhausted(last_exception, self.config.max_retries)
    
    def __call__(self, func: Callable) -> Callable:
        """Use as decorator"""
        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            return await self.execute(lambda: func(*args, **kwargs))
        
        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            return asyncio.run(self.execute(lambda: func(*args, **kwargs)))
        
        return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper


# Retry with circuit breaker integration
class ResilientCaller:
    """Combines retry, circuit breaker, and timeout"""
    
    def __init__(
        self,
        circuit_breaker: 'CircuitBreaker',
        retry_config: Optional[RetryConfig] = None,
        timeout_seconds: float = 30.0
    ):
        # Expects a breaker with an async entry point (see AsyncCircuitBreaker above)
        self.circuit = circuit_breaker
        self.retry = RetryHandler(retry_config or RetryConfig(max_retries=3))
        self.timeout = timeout_seconds
    
    async def call(
        self, 
        func: Callable, 
        fallback: Callable = None,
        *args, 
        **kwargs
    ) -> Any:
        """Execute with full resilience pattern"""
        
        async def wrapped():
            async with asyncio.timeout(self.timeout):
                return await self.circuit.call_async(
                    lambda: func(*args, **kwargs)
                )
        
        try:
            return await self.retry.execute(wrapped)
        except (RetryExhausted, CircuitOpenError) as e:
            if fallback:
                logger.warning(f"Using fallback due to: {e}")
                if asyncio.iscoroutinefunction(fallback):
                    return await fallback(*args, **kwargs)
                return fallback(*args, **kwargs)
            raise


# Usage examples (aiohttp is assumed installed for fetch_data below)
import aiohttp
@RetryHandler(RetryConfig(
    max_retries=3,
    strategy=RetryStrategy.EXPONENTIAL,
    retryable_exceptions=(ConnectionError, TimeoutError)
))
async def fetch_data(url: str) -> dict:
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.json()


# With metrics callback ('metrics' is an illustrative stats client)
def log_retry(exc: Exception, attempt: int, delay: float):
    metrics.increment("retries", tags={"attempt": attempt})
    
retry_handler = RetryHandler(RetryConfig(
    max_retries=5,
    strategy=RetryStrategy.DECORRELATED_JITTER,
    on_retry=log_retry
))

async def call_payment_api(amount: float):
    return await retry_handler.execute(
        lambda: payment_client.charge(amount)
    )

Bulkhead Pattern

import asyncio
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import Dict, Any, Optional, Callable
import time
import logging

logger = logging.getLogger(__name__)

class BulkheadFullError(Exception):
    def __init__(self, bulkhead_name: str, current: int, max_size: int):
        self.bulkhead_name = bulkhead_name
        self.current = current
        self.max_size = max_size
        super().__init__(
            f"Bulkhead '{bulkhead_name}' is full ({current}/{max_size})"
        )

@dataclass
class BulkheadMetrics:
    accepted: int = 0
    rejected: int = 0
    active: int = 0
    peak_active: int = 0
    total_wait_time: float = 0.0

class Bulkhead:
    """
    Bulkhead pattern with semaphore and queue.
    Isolates failures to prevent cascade effects.
    """
    
    def __init__(
        self,
        name: str,
        max_concurrent: int,
        max_queue: int = 0,
        queue_timeout: float = 10.0
    ):
        self.name = name
        self.max_concurrent = max_concurrent
        self.max_queue = max_queue
        self.queue_timeout = queue_timeout
        
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.metrics = BulkheadMetrics()
        self._queue_size = 0
    
    @asynccontextmanager
    async def acquire(self, timeout: Optional[float] = None):
        """Acquire a slot in the bulkhead"""
        timeout = timeout or self.queue_timeout
        start = time.time()
        
        # No free slot: either join the bounded queue or reject immediately
        queued = False
        if self.semaphore.locked():
            if self._queue_size >= self.max_queue:
                self.metrics.rejected += 1
                raise BulkheadFullError(
                    self.name, 
                    self.metrics.active, 
                    self.max_concurrent
                )
            self._queue_size += 1
            queued = True
        
        try:
            await asyncio.wait_for(
                self.semaphore.acquire(),
                timeout=timeout
            )
        except asyncio.TimeoutError:
            self.metrics.rejected += 1
            raise BulkheadFullError(
                self.name,
                self.metrics.active,
                self.max_concurrent
            )
        finally:
            # Leave the queue exactly once, whether we acquired or timed out
            if queued:
                self._queue_size -= 1
        
        wait_time = time.time() - start
        self.metrics.total_wait_time += wait_time
        self.metrics.active += 1
        self.metrics.accepted += 1
        self.metrics.peak_active = max(self.metrics.peak_active, self.metrics.active)
        
        try:
            yield
        finally:
            self.metrics.active -= 1
            self.semaphore.release()
    
    def __call__(self, func: Callable):
        """Use as decorator"""
        async def wrapper(*args, **kwargs):
            async with self.acquire():
                return await func(*args, **kwargs)
        return wrapper
    
    def get_metrics(self) -> Dict[str, Any]:
        return {
            "name": self.name,
            "active": self.metrics.active,
            "max_concurrent": self.max_concurrent,
            "queue_size": self._queue_size,
            "max_queue": self.max_queue,
            "accepted": self.metrics.accepted,
            "rejected": self.metrics.rejected,
            "rejection_rate": self.metrics.rejected / max(1, self.metrics.accepted + self.metrics.rejected),
            "peak_active": self.metrics.peak_active,
            "avg_wait_ms": (self.metrics.total_wait_time / max(1, self.metrics.accepted)) * 1000
        }


class ThreadPoolBulkhead:
    """
    Thread pool based bulkhead for CPU-bound or blocking operations.
    Uses a dedicated thread pool to isolate work.
    """
    
    def __init__(self, name: str, max_workers: int):
        from concurrent.futures import ThreadPoolExecutor
        self.name = name
        self.executor = ThreadPoolExecutor(
            max_workers=max_workers,
            thread_name_prefix=f"bulkhead-{name}"
        )
        self.max_workers = max_workers
    
    async def run(self, func: Callable, *args, **kwargs) -> Any:
        """Run blocking function in isolated thread pool"""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self.executor,
            lambda: func(*args, **kwargs)
        )
    
    def shutdown(self):
        self.executor.shutdown(wait=True)


class BulkheadManager:
    """Manage multiple bulkheads for different services"""
    
    _instance = None
    
    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.bulkheads = {}
        return cls._instance
    
    def create(
        self,
        name: str,
        max_concurrent: int,
        max_queue: int = 0
    ) -> Bulkhead:
        bulkhead = Bulkhead(name, max_concurrent, max_queue)
        self.bulkheads[name] = bulkhead
        return bulkhead
    
    def get(self, name: str) -> Optional[Bulkhead]:
        return self.bulkheads.get(name)
    
    def get_all_metrics(self) -> Dict[str, Dict]:
        return {name: bh.get_metrics() for name, bh in self.bulkheads.items()}


# Usage
bulkhead_manager = BulkheadManager()

# Different pools for different services
payment_bulkhead = bulkhead_manager.create("payment", max_concurrent=10, max_queue=20)
inventory_bulkhead = bulkhead_manager.create("inventory", max_concurrent=30)
notification_bulkhead = bulkhead_manager.create("notification", max_concurrent=50)

@payment_bulkhead
async def charge_payment(order_id: str, amount: float):
    """Payment calls isolated in their own pool"""
    return await payment_client.charge(order_id, amount)

@inventory_bulkhead
async def reserve_inventory(items: list):
    """Inventory calls isolated - won't affect payments"""
    return await inventory_client.reserve(items)

async def process_order(order):
    """Even if inventory is slow, payments still work"""
    try:
        # These run in isolated pools
        payment = await charge_payment(order.id, order.total)
        inventory = await reserve_inventory(order.items)
        return {"status": "success", "payment": payment}
    except BulkheadFullError as e:
        # One service full doesn't crash everything
        logger.warning(f"Bulkhead full: {e}")
        return {"status": "retry_later", "reason": str(e)}

# FastAPI integration
from fastapi import FastAPI, Request

app = FastAPI()

@app.middleware("http")
async def bulkhead_metrics_middleware(request: Request, call_next):
    response = await call_next(request)
    # Add bulkhead metrics to response headers
    metrics = bulkhead_manager.get_all_metrics()
    response.headers["X-Bulkhead-Active"] = str(
        sum(m["active"] for m in metrics.values())
    )
    return response

@app.get("/metrics/bulkheads")
async def get_bulkhead_metrics():
    return bulkhead_manager.get_all_metrics()

Health Checks

Health Check Types

┌─────────────────────────────────────────────────────────────────┐
│                   Health Check Types                            │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  LIVENESS CHECK                                                 │
│  ─────────────                                                  │
│  "Is the process running?"                                     │
│  If fails: Restart the container/process                       │
│                                                                 │
│  GET /health/live → 200 OK                                     │
│                                                                 │
│  Check:                                                        │
│  • Process responding                                          │
│  • Not deadlocked                                              │
│                                                                 │
│  ─────────────────────────────────────────────────────────────  │
│                                                                 │
│  READINESS CHECK                                                │
│  ───────────────                                                │
│  "Can it handle traffic?"                                      │
│  If fails: Remove from load balancer                           │
│                                                                 │
│  GET /health/ready → 200 OK or 503 Not Ready                   │
│                                                                 │
│  Check:                                                        │
│  • Database connection works                                   │
│  • Cache connection works                                      │
│  • Required dependencies reachable                             │
│  • Warmup complete                                             │
│                                                                 │
│  ─────────────────────────────────────────────────────────────  │
│                                                                 │
│  DEEP HEALTH CHECK (Use sparingly!)                            │
│  ────────────────                                               │
│  "Is everything working?"                                      │
│  Used for: Monitoring dashboards, not load balancers           │
│                                                                 │
│  GET /health/deep → { db: ok, cache: ok, queue: ok }          │
│                                                                 │
│  Warning: Can be expensive, rate limit!                        │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
from fastapi import FastAPI, Response
from datetime import datetime, timezone

app = FastAPI()

@app.get("/health/live")
async def liveness():
    """Just checks if the process is alive"""
    return {"status": "alive", "timestamp": datetime.now(timezone.utc).isoformat()}

@app.get("/health/ready")
async def readiness(response: Response):
    """Checks if we can handle traffic"""
    checks = {}
    
    # Check database
    try:
        await db.execute("SELECT 1")
        checks["database"] = "ok"
    except Exception as e:
        checks["database"] = f"error: {str(e)}"
        response.status_code = 503
    
    # Check Redis
    try:
        await redis.ping()
        checks["redis"] = "ok"
    except Exception as e:
        checks["redis"] = f"error: {str(e)}"
        response.status_code = 503
    
    # Check if warmup complete
    if not app.state.warmup_complete:
        checks["warmup"] = "in progress"
        response.status_code = 503
    else:
        checks["warmup"] = "complete"
    
    return {
        "status": "ready" if response.status_code == 200 else "not ready",
        "checks": checks
    }
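
The deep check from the diagram can hang off the same app; a sketch that caches the result briefly as a cheap rate limit (db, redis, and queue_client are illustrative handles, as above):

import time

_deep_cache = {"at": 0.0, "result": None}

@app.get("/health/deep")
async def deep_health():
    """Expensive dependency sweep; cache for 30s so dashboards can't hammer it."""
    if _deep_cache["result"] and time.time() - _deep_cache["at"] < 30:
        return _deep_cache["result"]
    
    result = {}
    for name, check in {
        "db": lambda: db.execute("SELECT 1"),
        "cache": lambda: redis.ping(),
        "queue": lambda: queue_client.ping(),   # illustrative queue client
    }.items():
        try:
            await check()
            result[name] = "ok"
        except Exception as e:
            result[name] = f"error: {e}"
    
    _deep_cache.update(at=time.time(), result=result)
    return result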

Timeouts

Timeout Hierarchy

[Diagram: Timeout Hierarchy]
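
The rule the diagram showed: every layer's timeout must be shorter than its caller's, otherwise the caller gives up while work keeps burning resources downstream. A sketch with nested asyncio.timeout (Python 3.11+; the handlers are illustrative):

import asyncio

# Each inner budget is strictly smaller than the outer one:
# request handler (10s) > downstream call (5s) > database query (2s)

async def handle_request():
    async with asyncio.timeout(10):       # edge / request timeout
        return await call_downstream()

async def call_downstream():
    async with asyncio.timeout(5):        # service-to-service timeout
        return await query_database()

async def query_database():
    async with asyncio.timeout(2):        # statement timeout
        await asyncio.sleep(0.1)          # stand-in for the real query
        return "row"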

Deadline Propagation

import asyncio
import time
from contextvars import ContextVar
from typing import Optional

class DeadlineExceeded(Exception):
    """Raised when a request's deadline has passed or a downstream call times out."""
    pass

# Context variable for deadline
deadline_ctx: ContextVar[Optional[float]] = ContextVar('deadline', default=None)

def with_deadline(timeout_seconds: float):
    """Set deadline for current request"""
    deadline = time.time() + timeout_seconds
    deadline_ctx.set(deadline)
    return deadline

def remaining_time() -> float:
    """Get remaining time until deadline"""
    deadline = deadline_ctx.get()
    if deadline is None:
        return float('inf')
    return max(0, deadline - time.time())

async def call_service(service_name: str, payload: dict):
    """Call service with propagated deadline"""
    remaining = remaining_time()
    
    if remaining <= 0:
        raise DeadlineExceeded("Request deadline already passed")
    
    # Use remaining time as timeout (with buffer)
    timeout = min(remaining * 0.9, 30.0)  # 90% of remaining, max 30s
    
    try:
        async with asyncio.timeout(timeout):
            return await http_client.post(
                f"http://{service_name}/api",
                json=payload,
                headers={"X-Deadline": str(deadline_ctx.get())}
            )
    except asyncio.TimeoutError:
        raise DeadlineExceeded(f"Timeout calling {service_name}")
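
On the receiving side, the service can adopt the propagated deadline. A sketch of a FastAPI middleware, reusing deadline_ctx and time from the snippet above and assuming the caller set a numeric X-Deadline header as call_service does:

from fastapi import FastAPI, Request

app = FastAPI()

@app.middleware("http")
async def deadline_middleware(request: Request, call_next):
    """Adopt the caller's deadline if present; otherwise start a fresh 30s budget."""
    header = request.headers.get("X-Deadline")
    deadline = float(header) if header else time.time() + 30.0
    token = deadline_ctx.set(deadline)
    try:
        return await call_next(request)
    finally:
        deadline_ctx.reset(token)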

Graceful Degradation

[Diagram: Graceful Degradation]
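
The idea: when a dependency is down, serve a reduced experience instead of an error. A sketch with illustrative helpers (recommender, cache, top_sellers):

async def get_recommendations(user_id: str) -> dict:
    """Degrade in steps: personalized -> cached -> popular -> empty, never a 500."""
    try:
        items = await recommender.personalized(user_id)      # full experience
        return {"items": items, "degraded": False}
    except Exception:
        pass
    cached = await cache.get(f"recs:{user_id}")              # stale but personal
    if cached:
        return {"items": cached, "degraded": True}
    try:
        return {"items": await top_sellers(), "degraded": True}   # generic
    except Exception:
        return {"items": [], "degraded": True}               # empty beats an error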

Senior Interview Questions

How would you design a system for 99.99% availability?

Key components:
  1. Redundancy: At least 2 of everything (servers, DBs, regions)
  2. Load balancing: Automatic failover when a node fails
  3. Health checks: Detect failures in seconds
  4. Auto-scaling: Handle traffic spikes
  5. Multi-region: Survive region outages
  6. Chaos engineering: Regularly test failure scenarios

Math: 99.99% allows roughly 52 minutes of downtime per year
  • A single component at 99.9% can't achieve 99.99%
  • Redundancy fixes this: two independent components at 99.9% give 99.9999% (worked out below)
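
The arithmetic, as a quick sanity check:

a = 0.999                                  # one component at 99.9%
redundant = 1 - (1 - a) ** 2               # two independent copies: 0.999999
downtime = (1 - 0.9999) * 365 * 24 * 60    # 99.99% allows ~52.6 minutes/year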
How do you handle a failure midway through a multi-step distributed transaction?

Saga Pattern:
  1. Each step has a compensating action
  2. If step N fails, run compensations for steps N-1 down to 1
  3. Track saga state in a database

Example:
1. Create order → Compensate: Cancel order
2. Reserve inventory → Compensate: Release inventory
3. Charge payment → Compensate: Refund payment
4. Ship order → Compensate: Cancel shipment

If step 3 fails:
- Refund payment (if partially charged)
- Release inventory
- Cancel order
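
A compact orchestrator sketch of that flow; the step and compensation callables are illustrative:

import logging

logger = logging.getLogger(__name__)

async def run_saga(steps):
    """steps: ordered (action, compensate) pairs of async callables.
    Run actions in order; on failure, compensate completed steps in reverse."""
    completed = []
    for action, compensate in steps:
        try:
            await action()
            completed.append(compensate)
        except Exception:
            for comp in reversed(completed):
                try:
                    await comp()
                except Exception:
                    # A failed compensation needs manual review
                    logger.exception("compensation failed")
            raise

# Illustrative wiring for the order flow above:
# await run_saga([
#     (create_order, cancel_order),
#     (reserve_inventory, release_inventory),
#     (charge_payment, refund_payment),
#     (ship_order, cancel_shipment),
# ])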
How do you prevent cascading failures?

Defense layers:
  1. Circuit breakers: Stop calling a failing service
  2. Timeouts: Don't wait forever
  3. Bulkheads: Isolate failures to one service
  4. Rate limiting: Prevent overload
  5. Load shedding: Reject low-priority requests (sketched below)
  6. Fallbacks: Degrade gracefully

Key insight: "Fast failure is better than slow failure. If a service is struggling, fail fast and use a fallback."
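
Most of these layers have code earlier on this page; load shedding is the exception, so here is a sketch (current_load is a stand-in for a real utilization signal):

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

app = FastAPI()

def current_load() -> float:
    """Stand-in: return utilization in [0, 1] (CPU, queue depth, etc.)."""
    return 0.0

@app.middleware("http")
async def load_shedding(request: Request, call_next):
    """Shed low-priority traffic first as load climbs; fail fast, not slow."""
    load = current_load()
    priority = request.headers.get("X-Priority", "low")
    if load > 0.9 or (load > 0.7 and priority == "low"):
        return JSONResponse(
            {"error": "overloaded, retry later"},
            status_code=503,
            headers={"Retry-After": "5"},
        )
    return await call_next(request)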
How do you verify that your fault tolerance actually works?

Chaos Engineering approach:
  1. Define steady state: Normal metrics (latency, error rate)
  2. Form a hypothesis: "The system handles a server failure"
  3. Inject the failure: Kill a server
  4. Observe: Did metrics stay within bounds?
  5. Fix and repeat

Types of failures to test (a minimal injector is sketched below):
  • Server crashes
  • Network partitions
  • High latency
  • Disk full
  • Memory exhaustion
  • Clock skew
  • Dependency outages
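
A tiny fault injector is enough to start running these experiments in staging; a sketch (keep it behind a flag so it can never fire in production):

import asyncio
import random

CHAOS_ENABLED = False   # flip on only during a staging experiment

async def chaos(failure_rate: float = 0.05, max_extra_latency: float = 2.0):
    """Randomly inject an error or extra latency before the real work runs."""
    if not CHAOS_ENABLED:
        return
    if random.random() < failure_rate:
        raise ConnectionError("chaos: injected failure")
    await asyncio.sleep(random.uniform(0, max_extra_latency))

# Usage: call at the top of a handler while the experiment runs, then check
# whether latency and error-rate metrics stay within the steady-state bounds.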