December 2025 Update: Production-ready caching patterns including semantic caching, Redis integration, and OpenAI’s prompt caching.
Why Cache LLM Responses?
LLM calls are expensive and slow:

| Metric | Without Caching | With Caching |
|---|---|---|
| Latency | 500-3000ms | 5-50ms |
| Cost | $0.01-0.10/call | $0 for cache hits |
| Rate Limits | Easily hit | Reduced pressure |
The payoff scales with your hit rate:

| Cache Hit Rate | Cost Savings | Latency Improvement |
|---|---|---|
| 50% | ~50% | ~10x |
| 80% | ~80% | ~50x |
| 95% | ~95% | ~100x |
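As a back-of-the-envelope check on the cost column (a minimal sketch, assuming cache hits cost roughly nothing and an illustrative $0.05 per uncached call):

```python
def expected_cost_per_request(hit_rate: float, llm_cost_per_call: float = 0.05) -> float:
    """Expected spend per request when cache hits are effectively free (illustrative numbers)."""
    return (1 - hit_rate) * llm_cost_per_call

for rate in (0.5, 0.8, 0.95):
    print(f"{rate:.0%} hit rate -> ${expected_cost_per_request(rate):.4f} per request")
```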
OpenAI Prompt Caching (Built-in)
OpenAI automatically caches prompts with shared prefixes:
```python
from openai import OpenAI

client = OpenAI()

# Long system prompt - gets cached after the first call
SYSTEM_PROMPT = """You are an expert customer service agent for TechCorp.
[Insert 2000+ tokens of product documentation, FAQs, policies...]
Always be helpful, accurate, and follow company guidelines.
"""

# First call: full price
response1 = client.chat.completions.create(
    model="gpt-4o",
    messages=[
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": "What's your return policy?"}
    ]
)

# Second call: cached prefix = 50% discount on the cached tokens
response2 = client.chat.completions.create(
    model="gpt-4o",
    messages=[
        {"role": "system", "content": SYSTEM_PROMPT},  # Cached!
        {"role": "user", "content": "How do I track my order?"}
    ]
)

# Check cache usage
print(f"Cached tokens: {response2.usage.prompt_tokens_details.cached_tokens}")
```
Maximizing Prompt Cache Hits
```python
class PromptCacheOptimizer:
    """Optimize prompts for OpenAI's prompt caching"""

    def __init__(self, base_system_prompt: str):
        # Static content at the beginning (gets cached)
        self.static_prefix = base_system_prompt

    def build_prompt(
        self,
        dynamic_context: str,
        user_message: str
    ) -> list[dict]:
        """Build a prompt with a cacheable prefix"""
        # Structure: [Static (cached)] + [Dynamic] + [User]
        return [
            {
                "role": "system",
                "content": self.static_prefix + "\n\n" + dynamic_context
            },
            {"role": "user", "content": user_message}
        ]

# Usage
optimizer = PromptCacheOptimizer("""
You are an expert assistant with deep knowledge of our products.

# Product Catalog
[2000+ tokens of static product info...]

# Company Policies
[1000+ tokens of static policies...]

# Response Guidelines
- Be concise and helpful
- Always cite sources
- Admit when unsure
""")

# The static prefix will be cached across all calls
messages = optimizer.build_prompt(
    dynamic_context="Customer is a VIP member since 2020",
    user_message="Can I get a discount?"
)
```
Exact Match Caching
Cache identical requests with deterministic settings:
```python
import hashlib
import json
from datetime import timedelta
from typing import Optional

import redis

class ExactMatchCache:
    """Cache exact query matches"""

    def __init__(
        self,
        redis_url: str = "redis://localhost:6379",
        ttl_hours: int = 24
    ):
        self.redis = redis.from_url(redis_url)
        self.ttl = timedelta(hours=ttl_hours)

    def _hash_request(
        self,
        model: str,
        messages: list[dict],
        temperature: float = 0.0,
        **kwargs
    ) -> str:
        """Create a deterministic hash of the request"""
        key_data = {
            "model": model,
            "messages": messages,
            "temperature": temperature,
            **{k: v for k, v in sorted(kwargs.items())}
        }
        key_str = json.dumps(key_data, sort_keys=True)
        return hashlib.sha256(key_str.encode()).hexdigest()

    def get(
        self,
        model: str,
        messages: list[dict],
        **kwargs
    ) -> Optional[str]:
        """Get a cached response"""
        key = self._hash_request(model, messages, **kwargs)
        cached = self.redis.get(f"llm:{key}")
        return cached.decode() if cached else None

    def set(
        self,
        model: str,
        messages: list[dict],
        response: str,
        **kwargs
    ):
        """Cache a response"""
        key = self._hash_request(model, messages, **kwargs)
        self.redis.setex(
            f"llm:{key}",
            self.ttl,
            response
        )

    def get_stats(self) -> dict:
        """Get cache statistics (note: Redis keyspace stats are server-wide)"""
        info = self.redis.info()
        hits = info.get("keyspace_hits", 0)
        misses = info.get("keyspace_misses", 0)
        return {
            "hits": hits,
            "misses": misses,
            "hit_rate": hits / max(hits + misses, 1)
        }

# Usage with OpenAI
from openai import OpenAI

client = OpenAI()
cache = ExactMatchCache()

def cached_completion(messages: list[dict], **kwargs) -> str:
    # Only serve from cache for deterministic requests (temperature == 0)
    if kwargs.get("temperature", 1.0) == 0:
        cached = cache.get("gpt-4o", messages, **kwargs)
        if cached:
            return cached

    response = client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        **kwargs
    )
    result = response.choices[0].message.content

    # Cache deterministic responses
    if kwargs.get("temperature", 1.0) == 0:
        cache.set("gpt-4o", messages, result, **kwargs)

    return result
```
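A quick usage sketch: the cache is only consulted when `temperature=0` is passed explicitly, so the second identical call below is served from Redis instead of the API.

```python
question = [{"role": "user", "content": "Summarize our refund policy in one sentence."}]

first = cached_completion(question, temperature=0)   # miss: calls the API, then caches
second = cached_completion(question, temperature=0)  # hit: returned straight from Redis

assert first == second
```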
Semantic Caching
Cache based on meaning, not exact match:
```python
from openai import OpenAI
import numpy as np
from typing import Optional

class SemanticCache:
    """Cache based on semantic similarity"""

    def __init__(
        self,
        similarity_threshold: float = 0.95,
        embedding_model: str = "text-embedding-3-small"
    ):
        self.client = OpenAI()
        self.threshold = similarity_threshold
        self.embedding_model = embedding_model
        # In-memory store (use Redis/Pinecone in production)
        self.cache: list[dict] = []

    def _get_embedding(self, text: str) -> np.ndarray:
        response = self.client.embeddings.create(
            model=self.embedding_model,
            input=text
        )
        return np.array(response.data[0].embedding)

    def _cosine_similarity(self, a: np.ndarray, b: np.ndarray) -> float:
        return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))

    def get(self, query: str) -> Optional[str]:
        """Find a semantically similar cached response"""
        if not self.cache:
            return None

        query_embedding = self._get_embedding(query)
        best_match = None
        best_score = 0.0

        for entry in self.cache:
            similarity = self._cosine_similarity(
                query_embedding,
                entry["embedding"]
            )
            if similarity > best_score and similarity >= self.threshold:
                best_score = similarity
                best_match = entry

        return best_match["response"] if best_match else None

    def set(self, query: str, response: str):
        """Cache a query-response pair"""
        embedding = self._get_embedding(query)
        self.cache.append({
            "query": query,
            "response": response,
            "embedding": embedding
        })

# Usage
semantic_cache = SemanticCache(similarity_threshold=0.92)

def smart_completion(user_query: str) -> str:
    # Check the semantic cache first
    cached = semantic_cache.get(user_query)
    if cached:
        print("🎯 Semantic cache hit!")
        return cached

    # Call the LLM
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": user_query}]
    )
    result = response.choices[0].message.content

    # Cache the response
    semantic_cache.set(user_query, result)
    return result

# These would likely hit the same cache entry:
# "What is machine learning?"
# "Can you explain machine learning?"
# "What's ML?"
```
Production Semantic Cache with Redis
```python
import hashlib
import json
from typing import Optional

import numpy as np
import redis
from openai import OpenAI

class ProductionSemanticCache:
    """Production-ready semantic cache with Redis"""

    def __init__(
        self,
        redis_url: str,
        similarity_threshold: float = 0.93,
        max_entries: int = 10000
    ):
        self.redis = redis.from_url(redis_url)
        self.threshold = similarity_threshold
        self.max_entries = max_entries
        self.client = OpenAI()

    def _get_embedding(self, text: str) -> list[float]:
        response = self.client.embeddings.create(
            model="text-embedding-3-small",
            input=text
        )
        return response.data[0].embedding

    def get(self, query: str, context_key: str = "default") -> Optional[str]:
        """Get a cached response with context isolation"""
        query_embedding = np.array(self._get_embedding(query))

        # Get all cache entries for this context
        # (KEYS is O(n); consider SCAN or a vector index for large deployments)
        cache_keys = self.redis.keys(f"semantic:{context_key}:*")

        best_match = None
        best_score = 0.0

        for key in cache_keys:
            raw = self.redis.get(key)
            if raw is None:
                # Entry expired between KEYS and GET
                continue
            entry = json.loads(raw)
            cached_embedding = np.array(entry["embedding"])
            similarity = np.dot(query_embedding, cached_embedding) / (
                np.linalg.norm(query_embedding) * np.linalg.norm(cached_embedding)
            )
            if similarity > best_score and similarity >= self.threshold:
                best_score = similarity
                best_match = entry

        return best_match["response"] if best_match else None

    def set(
        self,
        query: str,
        response: str,
        context_key: str = "default",
        ttl_seconds: int = 86400
    ):
        """Cache with TTL and context isolation"""
        embedding = self._get_embedding(query)
        entry = {
            "query": query,
            "response": response,
            "embedding": embedding
        }

        # Use a stable hash of the query as the key
        # (built-in hash() is randomized per process, which would fragment the cache)
        query_hash = hashlib.sha256(query.encode()).hexdigest()
        key = f"semantic:{context_key}:{query_hash}"
        self.redis.setex(key, ttl_seconds, json.dumps(entry))

        # Enforce the max entry limit
        self._enforce_limit(context_key)

    def _enforce_limit(self, context_key: str):
        """Remove entries if over the limit"""
        keys = self.redis.keys(f"semantic:{context_key}:*")
        if len(keys) > self.max_entries:
            # Trim back down to 90% of the limit
            to_remove = len(keys) - int(self.max_entries * 0.9)
            for key in keys[:to_remove]:
                self.redis.delete(key)
```
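A usage sketch (the Redis URL and the `tenant_42` context key are placeholders; per-tenant context keys keep one customer's cached answers from being served to another):

```python
prod_cache = ProductionSemanticCache(redis_url="redis://localhost:6379")

query = "How do I reset my password?"
answer = prod_cache.get(query, context_key="tenant_42")
if answer is None:
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": query}]
    )
    answer = response.choices[0].message.content
    prod_cache.set(query, answer, context_key="tenant_42", ttl_seconds=3600)
```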
Multi-Layer Caching
Combine caching strategies for maximum efficiency:
```python
from abc import ABC, abstractmethod
from typing import Optional
import time

class CacheLayer(ABC):
    @abstractmethod
    def get(self, key: str) -> Optional[str]:
        pass

    @abstractmethod
    def set(self, key: str, value: str):
        pass

class L1MemoryCache(CacheLayer):
    """In-memory cache for hot data"""

    def __init__(self, max_size: int = 1000, ttl_seconds: int = 300):
        self.cache = {}
        self.max_size = max_size
        self.ttl = ttl_seconds

    def get(self, key: str) -> Optional[str]:
        entry = self.cache.get(key)
        if entry and time.time() - entry["time"] < self.ttl:
            return entry["value"]
        return None

    def set(self, key: str, value: str):
        if len(self.cache) >= self.max_size:
            # Evict the oldest entry
            oldest = min(self.cache, key=lambda k: self.cache[k]["time"])
            del self.cache[oldest]
        self.cache[key] = {"value": value, "time": time.time()}

class L2RedisCache(CacheLayer):
    """Redis cache for shared state"""

    def __init__(self, redis_client, ttl_seconds: int = 3600):
        self.redis = redis_client
        self.ttl = ttl_seconds

    def get(self, key: str) -> Optional[str]:
        value = self.redis.get(f"l2:{key}")
        return value.decode() if value else None

    def set(self, key: str, value: str):
        self.redis.setex(f"l2:{key}", self.ttl, value)

class L3SemanticCache(CacheLayer):
    """Semantic similarity cache"""

    def __init__(self, semantic_cache: SemanticCache):
        self.cache = semantic_cache

    def get(self, key: str) -> Optional[str]:
        return self.cache.get(key)

    def set(self, key: str, value: str):
        self.cache.set(key, value)

class MultiLayerCache:
    """Tiered caching system"""

    def __init__(self, layers: list[CacheLayer]):
        self.layers = layers

    def get(self, key: str) -> tuple[Optional[str], int]:
        """Get a value; returns (value, layer_index) or (None, -1)"""
        for i, layer in enumerate(self.layers):
            value = layer.get(key)
            if value:
                # Backfill the faster layers above this one
                for j in range(i):
                    self.layers[j].set(key, value)
                return value, i
        return None, -1

    def set(self, key: str, value: str):
        """Set in all layers"""
        for layer in self.layers:
            layer.set(key, value)

# Usage (semantic_cache is the instance from the earlier example)
import redis
redis_client = redis.from_url("redis://localhost:6379")

cache = MultiLayerCache([
    L1MemoryCache(max_size=100, ttl_seconds=60),   # Hot cache
    L2RedisCache(redis_client, ttl_seconds=3600),  # Shared cache
    L3SemanticCache(semantic_cache)                # Semantic matching
])

def cached_llm_call(query: str) -> str:
    # Check the cache layers in order
    cached, layer = cache.get(query)
    if cached:
        print(f"Cache hit at L{layer + 1}")
        return cached

    # Call the LLM
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": query}]
    )
    result = response.choices[0].message.content

    # Populate all cache layers
    cache.set(query, result)
    return result
```
Cache Invalidation Strategies
```python
from datetime import timedelta
import re

class SmartCacheInvalidator:
    """Intelligent cache invalidation"""

    def __init__(self, cache: MultiLayerCache):
        self.cache = cache
        self.invalidation_rules = []

    def add_time_rule(
        self,
        pattern: str,
        ttl: timedelta
    ):
        """Invalidate entries matching a pattern after a TTL"""
        self.invalidation_rules.append({
            "type": "time",
            "pattern": re.compile(pattern),
            "ttl": ttl
        })

    def add_event_rule(
        self,
        event_type: str,
        pattern: str
    ):
        """Invalidate on specific events"""
        self.invalidation_rules.append({
            "type": "event",
            "event": event_type,
            "pattern": re.compile(pattern)
        })

    def on_event(self, event_type: str, data: dict):
        """Handle invalidation events"""
        for rule in self.invalidation_rules:
            if rule["type"] == "event" and rule["event"] == event_type:
                # Invalidate matching cache entries
                self._invalidate_pattern(rule["pattern"])

    def _invalidate_pattern(self, pattern: re.Pattern):
        """Invalidate all entries matching the pattern"""
        # Implementation depends on the cache backend
        pass

# Usage
invalidator = SmartCacheInvalidator(cache)

# Invalidate product queries after a product update
invalidator.add_event_rule("product_updated", r".*product.*")

# On a product update event
invalidator.on_event("product_updated", {"product_id": "123"})
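The `_invalidate_pattern` stub is backend-specific. Here is a minimal sketch against the multi-layer cache above, assuming (as in `cached_llm_call`) that entries are keyed by the raw query text; `RegexInvalidator` is a hypothetical subclass for illustration, not part of any library:

```python
class RegexInvalidator(SmartCacheInvalidator):
    """Hypothetical invalidator wired to the MultiLayerCache layers above."""

    def _invalidate_pattern(self, pattern: re.Pattern):
        for layer in self.cache.layers:
            if isinstance(layer, L1MemoryCache):
                # Keys in the memory layer are the raw query strings
                for key in [k for k in layer.cache if pattern.search(k)]:
                    del layer.cache[key]
            elif isinstance(layer, L2RedisCache):
                # SCAN avoids blocking Redis the way KEYS can
                for raw_key in layer.redis.scan_iter("l2:*"):
                    if pattern.search(raw_key.decode()[len("l2:"):]):
                        layer.redis.delete(raw_key)
            # The semantic layer would need its own delete-by-query support
```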
Key Takeaways
- Use OpenAI's cache: 50% discount on cached prompt prefixes, so structure prompts accordingly.
- Layer your caches: memory → Redis → semantic for optimal hit rates.
- Semantic caching for flexibility: similar questions get cached answers, improving the hit rate.
- Invalidate smartly: time-based plus event-based invalidation keeps the cache fresh.
What’s Next
Embeddings Deep Dive: master embedding models and similarity search.