Caching is often the difference between a system that handles 100 QPS and one that handles 1 million QPS. This module covers everything from cache fundamentals to advanced distributed caching patterns.
Cache-Aside (lazy loading): the application manages the cache explicitly. This is the most common pattern.
┌──────────┐      1. Read      ┌───────────┐
│  Client  │ ────────────────► │   Cache   │
│          │ ◄──────────────── │  (Redis)  │
└────┬─────┘    2. Miss/Hit    └───────────┘
     │                               │
     │ 3. On miss,                   │
     │    read from DB               │
     ▼                               │
┌──────────┐                         │
│ Database │                         │
└────┬─────┘                         │
     │                               │
     └───── 4. Store in cache ───────┘
async def get_user(user_id: str) -> User:
    # 1. Try cache first
    cached = await cache.get(f"user:{user_id}")
    if cached:
        return User.from_json(cached)

    # 2. Cache miss - read from database
    user = await db.query("SELECT * FROM users WHERE id = ?", user_id)

    # 3. Populate cache for next time
    await cache.set(f"user:{user_id}", user.to_json(), ttl=3600)
    return user
Pros: Simple, only caches what’s needed, cache failures don’t break reads
Cons: Cache miss = slow first read, potential for stale data
Write-Through: write to the database first (the source of truth), then update the cache in the same request so subsequent reads see fresh data.

async def update_user(user_id: str, data: dict):
    # Write to database first (source of truth)
    await db.update("UPDATE users SET ... WHERE id = ?", user_id, data)

    # Then update cache
    user = await db.get_user(user_id)
    await cache.set(f"user:{user_id}", user.to_json(), ttl=3600)
    return user
Pros: Cache always consistent, no stale data
Cons: Higher write latency, both must succeed
Write-Invalidate: write to the database, then delete the cached entry (or let its TTL expire) so the next read repopulates it.
async def update_user(user_id: str, data: dict):
    # Write to database only
    await db.update("UPDATE users SET ... WHERE id = ?", user_id, data)

    # Either invalidate cache (let it refetch)
    await cache.delete(f"user:{user_id}")

    # Or let TTL expire naturally
    # (depends on staleness tolerance)
Pros: Simple, good for write-heavy infrequently-read data
Cons: First read after write is slow
CACHE INVALIDATION APPROACHES

1. TTL (Time To Live)
   Set an expiration time on each entry.
   Simple, but may serve stale data until expiry.

       cache.set("user:123", data, ttl=3600)  # Expires in 1 hour

2. EXPLICIT INVALIDATION
   Delete from the cache when data changes.
   Must track every place that caches the data.

       await db.update_user(user_id, data)
       await cache.delete(f"user:{user_id}")

3. EVENT-DRIVEN INVALIDATION
   Publish events when data changes; cache subscribers invalidate on those events.

       Database  →  Event Bus  →  Cache Invalidator
        (CDC)        (Kafka)        (Consumer)

4. VERSION-BASED INVALIDATION
   Include a version in the cache key; bump the version to invalidate all entries.

       cache.get(f"user:{user_id}:v{version}")
       # Change version to invalidate all user caches
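To make the version-based approach concrete, here is a minimal sketch. It assumes an async cache client exposing get/set/incr (Redis supports INCR natively); the VersionedCache wrapper and its method names are illustrative, not a standard API.

import json

class VersionedCache:
    """
    Namespace keys with a version counter. Bumping the counter makes every
    old key unreachable; the orphaned entries simply age out via TTL.
    """
    def __init__(self, cache):
        self.cache = cache  # assumed async client with get/set/incr

    async def _version(self, namespace: str) -> int:
        v = await self.cache.get(f"{namespace}:version")
        return int(v) if v is not None else 1

    async def get(self, namespace: str, key: str):
        v = await self._version(namespace)
        raw = await self.cache.get(f"{namespace}:v{v}:{key}")
        return json.loads(raw) if raw is not None else None

    async def set(self, namespace: str, key: str, value, ttl: int = 3600):
        v = await self._version(namespace)
        await self.cache.set(f"{namespace}:v{v}:{key}", json.dumps(value), ttl=ttl)

    async def invalidate_all(self, namespace: str):
        # One write invalidates every key in the namespace
        await self.cache.incr(f"{namespace}:version")

The trade-off is an extra round trip per read to fetch the version; in practice the version is usually cached in-process for a second or two.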
To prevent a cache stampede (many concurrent misses for the same key all hitting the database), deduplicate in-flight fetches:

import asyncio
from typing import Any, Dict

class SingleFlight:
    """
    Deduplicates concurrent requests for the same key.
    Only one fetch happens, all waiters get the result.
    """
    def __init__(self):
        self._in_flight: Dict[str, asyncio.Future] = {}

    async def do(self, key: str, fetch_func) -> Any:
        if key in self._in_flight:
            # Wait for in-flight request
            return await self._in_flight[key]

        # We're the first, create a future
        future = asyncio.Future()
        self._in_flight[key] = future
        try:
            result = await fetch_func()
            future.set_result(result)
            return result
        except Exception as e:
            future.set_exception(e)
            raise
        finally:
            del self._in_flight[key]

# Usage
single_flight = SingleFlight()

async def get_user(user_id: str):
    cached = await cache.get(f"user:{user_id}")
    if cached:
        return cached

    # All concurrent requests for the same user share one DB query
    user = await single_flight.do(
        f"user:{user_id}",
        lambda: db.get_user(user_id)
    )
    await cache.set(f"user:{user_id}", user, ttl=3600)
    return user
Pros: Prevents duplicate DB queries
Cons: All waiters blocked on one request
Probabilistic early refresh: refresh before expiry with a probability that grows as the entry approaches expiration.
import asyncio
import math
import random

async def get_with_probabilistic_refresh(
    key: str,
    fetch_func,
    ttl: int = 3600,
    beta: float = 1.0  # Controls refresh aggressiveness
):
    cached, remaining_ttl = await cache.get_with_ttl(key)

    if cached is None:
        # Cache miss
        value = await fetch_func()
        await cache.set(key, value, ttl=ttl)
        return value

    # Calculate probability of refresh
    # Higher probability as TTL approaches 0
    expiry_gap = remaining_ttl / ttl
    random_factor = random.random()
    should_refresh = random_factor > math.exp(-beta * math.log(1 / expiry_gap))

    if should_refresh:
        # Refresh in background
        asyncio.create_task(_refresh(key, fetch_func, ttl))

    return cached

async def _refresh(key: str, fetch_func, ttl: int):
    value = await fetch_func()
    await cache.set(key, value, ttl=ttl)
Pros: Spreads refreshes over time, no locks
Cons: May still have some concurrent refreshes
Use a distributed lock so only one process performs the refresh.
import asyncio

async def get_with_lock(key: str, fetch_func, ttl: int = 3600):
    cached = await cache.get(key)
    if cached:
        return cached

    lock_key = f"lock:{key}"

    # Try to acquire lock
    acquired = await cache.set_nx(lock_key, "1", ttl=10)  # 10s lock timeout

    if acquired:
        try:
            # We have the lock, fetch and cache
            value = await fetch_func()
            await cache.set(key, value, ttl=ttl)
            return value
        finally:
            await cache.delete(lock_key)
    else:
        # Another process is fetching, wait and retry
        for _ in range(10):  # Max 10 retries
            await asyncio.sleep(0.1)  # 100ms
            cached = await cache.get(key)
            if cached:
                return cached

        # Give up waiting, fetch ourselves
        return await fetch_func()
Pros: Only one fetch, works across processes
Cons: Lock overhead, potential deadlocks if not careful
Serve stale data while refreshing in background.
import asyncio
import time

async def get_stale_while_revalidate(
    key: str,
    fetch_func,
    ttl: int = 3600,
    stale_ttl: int = 7200  # Serve stale for 2 hours
):
    cached, remaining_ttl = await cache.get_with_ttl(key)

    if cached is None:
        # Complete miss
        value = await fetch_func()
        await cache.set(key, value, ttl=stale_ttl)
        await cache.set(f"{key}:fresh_until", time.time() + ttl, ttl=stale_ttl)
        return value

    fresh_until = await cache.get(f"{key}:fresh_until") or 0

    if time.time() > fresh_until:
        # Data is stale, refresh in background
        asyncio.create_task(_refresh(key, fetch_func, ttl, stale_ttl))

    # Return cached data (possibly stale)
    return cached

async def _refresh(key: str, fetch_func, ttl: int, stale_ttl: int):
    value = await fetch_func()
    await cache.set(key, value, ttl=stale_ttl)
    await cache.set(f"{key}:fresh_until", time.time() + ttl, ttl=stale_ttl)
Pros: Always fast, never blocks on refresh
Cons: May serve stale data, more complex
Two-level caching: put a small in-process L1 cache in front of the shared distributed L2 cache so hot keys never leave the process.

import time

class TwoLevelCache:
    """
    L1: In-process (fast, small, per-instance)
    L2: Distributed (slower, large, shared)
    """
    def __init__(self, l2_client, l1_max_size=1000, l1_ttl=10):
        self.l2 = l2_client
        self.l1_max_size = l1_max_size
        self.l1_ttl = l1_ttl
        self.l1 = {}  # {key: (value, expire_at)}

    async def get(self, key: str):
        # Check L1 first
        if key in self.l1:
            value, expire_at = self.l1[key]
            if time.time() < expire_at:
                return value
            del self.l1[key]

        # L1 miss, check L2
        value = await self.l2.get(key)
        if value is not None:
            # Store in L1 for next time (evict an arbitrary entry if full)
            if len(self.l1) >= self.l1_max_size:
                self.l1.pop(next(iter(self.l1)))
            self.l1[key] = (value, time.time() + self.l1_ttl)
        return value

# Hot keys served from memory, never hit Redis!
Key replication: spread a hot key across multiple logical replicas so reads fan out over the cache cluster instead of hammering one shard.

import asyncio
import random
from typing import Any

class ReplicatedKeyCache:
    """
    Replicate hot keys across multiple logical slots.
    Reads pick a random replica, writes update all.
    """
    def __init__(self, cache, replicas=10):
        self.cache = cache
        self.replicas = replicas
        self.hot_keys = set()  # Track known hot keys

    def _get_replica_key(self, key: str, replica: int = None) -> str:
        if replica is None:
            replica = random.randint(0, self.replicas - 1)
        return f"{key}:replica:{replica}"

    async def get(self, key: str):
        if key in self.hot_keys:
            # Pick random replica
            replica_key = self._get_replica_key(key)
            return await self.cache.get(replica_key)
        return await self.cache.get(key)

    async def set_hot(self, key: str, value: Any, ttl: int = 3600):
        """Set a known hot key across all replicas"""
        self.hot_keys.add(key)
        # Write to all replicas
        tasks = [
            self.cache.set(self._get_replica_key(key, i), value, ttl=ttl)
            for i in range(self.replicas)
        ]
        await asyncio.gather(*tasks)

    async def delete_hot(self, key: str):
        """Delete hot key from all replicas"""
        self.hot_keys.discard(key)
        tasks = [
            self.cache.delete(self._get_replica_key(key, i))
            for i in range(self.replicas)
        ]
        await asyncio.gather(*tasks)
Pros: Spreads load across cluster
Cons: More storage, complex invalidation
Batch concurrent requests for the same key.
from collections import defaultdict
import asyncio

class RequestCoalescer:
    """
    Coalesce concurrent requests for the same key.
    Instead of N cache hits, do 1 cache hit and broadcast the result.
    """
    def __init__(self, cache, batch_window_ms=5):
        self.cache = cache
        self.batch_window = batch_window_ms / 1000
        self.pending = defaultdict(list)  # key -> [(event, result_holder)]
        self._lock = asyncio.Lock()

    async def get(self, key: str):
        async with self._lock:
            if key in self.pending:
                # Join existing batch
                event = asyncio.Event()
                result_holder = {'value': None, 'error': None}
                self.pending[key].append((event, result_holder))
            else:
                # Start a new batch; we become the batch leader
                self.pending[key] = []
                event = None
                result_holder = None

        if event is not None:
            # Wait outside the lock for the leader to broadcast the result
            await event.wait()
            if result_holder['error']:
                raise result_holder['error']
            return result_holder['value']

        # Leader: wait for more requests to join the batch
        await asyncio.sleep(self.batch_window)

        # Get waiters and clear the batch
        async with self._lock:
            waiters = self.pending.pop(key, [])

        # Fetch once
        try:
            value = await self.cache.get(key)
            # Notify all waiters
            for waiter_event, holder in waiters:
                holder['value'] = value
                waiter_event.set()
            return value
        except Exception as e:
            for waiter_event, holder in waiters:
                holder['error'] = e
                waiter_event.set()
            raise
Pros: Reduces cache requests for hot keys by an order of magnitude
Cons: Adds small latency (batch window), complex
At Staff/Principal level, you must move beyond the “application-manages-cache” (Cache-Aside) model. In a microservices architecture, managing cache logic in every service leads to duplication and inconsistency.
Staff Tip: Sidecar caching is one of the most effective ways to scale caching in a polyglot microservices environment. It decouples the “how to cache” (retries, hashing) from the “what to cache” (business logic).
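As a rough sketch of what the application side looks like with a sidecar (the local port and proxy choice are assumptions, not prescribed): each pod runs a cache proxy next to the service, and business code only ever talks to localhost.

# Minimal sketch: the service sees a "local" cache; the sidecar proxy on
# 127.0.0.1:6380 (hypothetical port) owns cluster topology, consistent
# hashing, connection pooling, retries, and TTL defaults.
import redis.asyncio as redis

sidecar = redis.Redis(host="127.0.0.1", port=6380)

async def get_product(product_id: str):
    # Looks like a single-node cache call; all routing logic lives in the
    # sidecar, so every service in any language gets the same behavior.
    return await sidecar.get(f"product:{product_id}")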
Scenario 1: Global Read-Heavy API
Your company exposes a global read-heavy API (product catalog, content, or configuration) with users across multiple regions. You want sub-100ms p99 latencies while still keeping a single source-of-truth database.
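One way to meet these goals is cache-aside against a per-region Redis, backed by the single primary database, with event-driven invalidation as described earlier. A sketch of the read path, assuming hypothetical regional_cache and primary_db clients and an illustrative 5-minute TTL:

import json

REGIONAL_TTL = 300  # illustrative: tune per endpoint's staleness tolerance

async def get_catalog_item(item_id: str) -> dict:
    key = f"item:{item_id}"

    # 1. Regional cache: same region as the caller, single-digit-ms reads
    cached = await regional_cache.get(key)
    if cached:
        return json.loads(cached)

    # 2. Miss: cross-region read from the source-of-truth database (rare, slow)
    item = await primary_db.fetch_item(item_id)

    # 3. Repopulate the regional cache for subsequent readers in this region
    await regional_cache.set(key, json.dumps(item), ttl=REGIONAL_TTL)
    return item

# Writes go only to the primary DB; a CDC consumer publishes change events
# that each region's invalidator uses to delete the affected keys.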
Scenario 2: Configuration and Feature Flag Caching
You have a central configuration/feature flag service that many microservices depend on. Latency spikes or downtime in this service must not take down the entire fleet.
Requirements:
Services should start even if the config service is temporarily unavailable.
Each service instance should cache configuration locally and refresh in the background.
Changes should propagate within seconds, not minutes.
Architecture:
Source of truth: configuration stored in a durable DB (e.g., Postgres) behind a config service.
Distributed cache: Redis/Etcd used by the config service to cache hot config blobs (config:service_name).
Service-local cache: each microservice maintains an in-process cache of parsed config.
Notification channel: a message bus (Kafka, SNS/SQS, Redis Pub/Sub) carries “config changed” events.
import json

# In the config service
async def update_feature_flags(payload: dict):
    await db.update_flags(payload)
    await cache.set("config:features", json.dumps(payload), ttl=300)
    await event_bus.publish("config.updated", {"keys": ["features"]})

# In each service
async def config_watcher():
    async for event in event_bus.subscribe("config.updated"):
        for key in event["keys"]:
            await refresh_config_key(key)

async def refresh_config_key(key: str):
    raw = await cache.get(f"config:{key}")
    if raw is None:
        raw = await config_http_client.get(f"/config/{key}")
    LOCAL_CONFIG[key] = json.loads(raw)
Design notes:
On startup, services block briefly to fetch essential config; thereafter they serve from local memory.
If the config service and cache are both down, services can keep using the last known good config in memory.
For safety-sensitive flags, include expiry timestamps so services can disable risky behavior if config has not refreshed in too long (see the sketch below).
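A sketch of the startup and last-known-good behavior described in these notes; LOCAL_CONFIG and refresh_config_key come from the snippet above, while LAST_REFRESH and MAX_CONFIG_AGE_SEC are illustrative additions:

import time

LOCAL_CONFIG: dict = {}
LAST_REFRESH: dict = {}          # key -> unix timestamp of last successful refresh
MAX_CONFIG_AGE_SEC = 300         # illustrative: distrust config older than 5 minutes

async def load_initial_config(essential_keys):
    # Block briefly at startup for essential keys only; if the config service
    # and cache are both down, keep whatever last-known-good values we have.
    for key in essential_keys:
        try:
            await refresh_config_key(key)   # from the snippet above
            LAST_REFRESH[key] = time.time()
        except Exception:
            if key not in LOCAL_CONFIG:
                LOCAL_CONFIG[key] = {}      # or a baked-in default

def get_flag(key: str, flag: str, default=False):
    # Safety-sensitive flags: if this key hasn't refreshed recently,
    # fall back to the safe default instead of a possibly stale "on".
    if time.time() - LAST_REFRESH.get(key, 0) > MAX_CONFIG_AGE_SEC:
        return default
    return LOCAL_CONFIG.get(key, {}).get(flag, default)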
Scenario 3: Bounded-Staleness Caching on Top of a Strong Database
Sometimes you want strictly serializable writes but are happy to serve data up to T seconds old for reads. This is common for dashboards, search results, and secondary views.
Goal: “Reads may be stale by at most T seconds, never more. Writes must always see their own effect.”
Design:
Keep strong consistency in the primary DB (e.g., Spanner/CockroachDB/Postgres with serializable isolation).
Layer a cache in front with staleness metadata per key.
from dataclasses import dataclass
import time

@dataclass
class CachedValue:
    value: dict
    last_write_time: float  # Unix timestamp

MAX_STALENESS_SEC = 5.0

async def get_with_bounded_staleness(key: str) -> dict:
    now = time.time()
    wrapped: CachedValue | None = await cache.get(key)

    if wrapped and now - wrapped.last_write_time <= MAX_STALENESS_SEC:
        return wrapped.value

    # Too stale or missing: read from DB
    row = await db.fetch_row(key)
    wrapped = CachedValue(value=row, last_write_time=now)
    await cache.set(key, wrapped, ttl=60)
    return row
Write path:
On write, delete cache (or write a fresh CachedValue with current last_write_time).
If you have a write-heavy workload, combine this with a delayed double delete or CDC-backed invalidation to avoid stale repopulation races (sketched below).
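A sketch of that write path using a delayed double delete; the 200 ms delay is illustrative and should exceed your read path's typical DB latency:

import asyncio

REPOPULATION_RACE_DELAY_SEC = 0.2  # illustrative: > typical read-path DB latency

async def update_row(key: str, data: dict):
    # 1. First delete, so readers stop seeing the old cached value
    await cache.delete(key)

    # 2. Write to the source of truth (serializable in the primary DB)
    await db.update_row(key, data)

    # 3. Delayed second delete: catches a reader that fetched the old row just
    #    before the write committed and then repopulated the cache with it
    asyncio.create_task(_delete_later(key, REPOPULATION_RACE_DELAY_SEC))

async def _delete_later(key: str, delay: float):
    await asyncio.sleep(delay)
    await cache.delete(key)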
Trade-offs:
You can tighten or loosen MAX_STALENESS_SEC per endpoint: 1–2s for user-facing dashboards, 30–60s for admin analytics.
Reads that cannot tolerate any staleness bypass cache entirely and always hit the DB.