December 2025 Update: Production-ready caching patterns including semantic caching, Redis integration, and OpenAI’s prompt caching.

Why Cache LLM Responses?

LLM calls are expensive and slow:
Metric        Without Caching      With Caching
Latency       500-3000ms           5-50ms
Cost          $0.01-0.10/call      $0 for cache hits
Rate limits   Easily hit           Reduced pressure

Cache Hit Rate   Cost Savings   Avg Latency Improvement*
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
   50%              50%              ~2x
   80%              80%              ~5x
   95%              95%              ~20x

* Assuming cache hits (5-50ms) are near-instant compared to a full LLM call.
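
The relationship is simple to model: expected cost scales with the miss rate, and average latency is a blend of hit and miss latencies. A back-of-the-envelope sketch with assumed per-call numbers (5ms hits, 1500ms misses, $0.05 per uncached call); swap in your own measurements:

def cache_impact(
    hit_rate: float,
    hit_latency_ms: float = 5.0,      # assumed cache-hit latency
    miss_latency_ms: float = 1500.0,  # assumed LLM-call latency
    cost_per_call: float = 0.05,      # assumed cost of an uncached call
) -> dict:
    """Rough model of expected savings at a given cache hit rate."""
    avg_latency = hit_rate * hit_latency_ms + (1 - hit_rate) * miss_latency_ms
    return {
        "cost_savings_pct": hit_rate * 100,                      # cache hits cost ~$0
        "avg_cost_per_request": (1 - hit_rate) * cost_per_call,
        "avg_latency_ms": round(avg_latency, 1),
        "latency_speedup": round(miss_latency_ms / avg_latency, 1),
    }

for rate in (0.5, 0.8, 0.95):
    print(f"{rate:.0%}: {cache_impact(rate)}")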

OpenAI Prompt Caching (Built-in)

OpenAI automatically caches prompt prefixes of 1,024 tokens or more, so repeated calls that share a long prefix get discounted cached input tokens:
from openai import OpenAI

client = OpenAI()

# Long system prompt - gets cached after first call
SYSTEM_PROMPT = """You are an expert customer service agent for TechCorp.

[Insert 2000+ tokens of product documentation, FAQs, policies...]

Always be helpful, accurate, and follow company guidelines.
"""

# First call: Full price
response1 = client.chat.completions.create(
    model="gpt-4o",
    messages=[
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": "What's your return policy?"}
    ]
)

# Second call: Cached prefix = 50% discount on cached tokens!
response2 = client.chat.completions.create(
    model="gpt-4o",
    messages=[
        {"role": "system", "content": SYSTEM_PROMPT},  # Cached!
        {"role": "user", "content": "How do I track my order?"}
    ]
)

# Check cache usage
print(f"Cached tokens: {response2.usage.prompt_tokens_details.cached_tokens}")

Maximizing Prompt Cache Hits

class PromptCacheOptimizer:
    """Optimize prompts for OpenAI's prompt caching"""
    
    def __init__(self, base_system_prompt: str):
        # Static content at the beginning (gets cached)
        self.static_prefix = base_system_prompt
        
    def build_prompt(
        self,
        dynamic_context: str,
        user_message: str
    ) -> list[dict]:
        """Build prompt with cacheable prefix"""
        
        # Structure: [Static (cached)] + [Dynamic] + [User]
        return [
            {
                "role": "system",
                "content": self.static_prefix + "\n\n" + dynamic_context
            },
            {"role": "user", "content": user_message}
        ]

# Usage
optimizer = PromptCacheOptimizer("""
You are an expert assistant with deep knowledge of our products.

# Product Catalog
[2000+ tokens of static product info...]

# Company Policies  
[1000+ tokens of static policies...]

# Response Guidelines
- Be concise and helpful
- Always cite sources
- Admit when unsure
""")

# The static prefix will be cached across all calls
messages = optimizer.build_prompt(
    dynamic_context="Customer is a VIP member since 2020",
    user_message="Can I get a discount?"
)
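
To confirm the static prefix is actually being reused, check the cached token count on the second and subsequent calls, using the same usage field shown earlier; the dynamic context and user turn still count as uncached input. A quick check, reusing the client from above:

response = client.chat.completions.create(model="gpt-4o", messages=messages)

# cached_tokens should climb to roughly the size of the static prefix once the
# same prefix has been seen recently (it may be 0 on the first call)
details = response.usage.prompt_tokens_details
print(f"Cached: {details.cached_tokens} of {response.usage.prompt_tokens} prompt tokens")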

Exact Match Caching

Cache identical requests with deterministic settings:
import hashlib
import json
from datetime import datetime, timedelta
from typing import Optional
import redis

class ExactMatchCache:
    """Cache exact query matches"""
    
    def __init__(
        self,
        redis_url: str = "redis://localhost:6379",
        ttl_hours: int = 24
    ):
        self.redis = redis.from_url(redis_url)
        self.ttl = timedelta(hours=ttl_hours)
    
    def _hash_request(
        self,
        model: str,
        messages: list[dict],
        temperature: float = 0.0,  # default matches the deterministic setting we cache
        **kwargs
    ) -> str:
        """Create deterministic hash of request"""
        key_data = {
            "model": model,
            "messages": messages,
            "temperature": temperature,
            **{k: v for k, v in sorted(kwargs.items())}
        }
        key_str = json.dumps(key_data, sort_keys=True)
        return hashlib.sha256(key_str.encode()).hexdigest()
    
    def get(
        self,
        model: str,
        messages: list[dict],
        **kwargs
    ) -> Optional[str]:
        """Get cached response"""
        key = self._hash_request(model, messages, **kwargs)
        cached = self.redis.get(f"llm:{key}")
        return cached.decode() if cached else None
    
    def set(
        self,
        model: str,
        messages: list[dict],
        response: str,
        **kwargs
    ):
        """Cache a response"""
        key = self._hash_request(model, messages, **kwargs)
        self.redis.setex(
            f"llm:{key}",
            self.ttl,
            response
        )
    
    def get_stats(self) -> dict:
        """Get cache statistics (Redis keyspace hits/misses are server-wide)"""
        info = self.redis.info()
        return {
            "hits": info.get("keyspace_hits", 0),
            "misses": info.get("keyspace_misses", 0),
            "hit_rate": info.get("keyspace_hits", 0) / 
                       max(info.get("keyspace_hits", 0) + info.get("keyspace_misses", 0), 1)
        }

# Usage with OpenAI
from openai import OpenAI

client = OpenAI()
cache = ExactMatchCache()

def cached_completion(messages: list[dict], **kwargs) -> str:
    # Only cache deterministic requests
    if kwargs.get("temperature", 1.0) == 0:
        cached = cache.get("gpt-4o", messages, **kwargs)
        if cached:
            return cached
    
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        **kwargs
    )
    
    result = response.choices[0].message.content
    
    # Cache deterministic responses
    if kwargs.get("temperature", 1.0) == 0:
        cache.set("gpt-4o", messages, result, **kwargs)
    
    return result

Semantic Caching

Cache based on meaning, not exact match:
from openai import OpenAI
import numpy as np
from typing import Optional
import json

class SemanticCache:
    """Cache based on semantic similarity"""
    
    def __init__(
        self,
        similarity_threshold: float = 0.95,
        embedding_model: str = "text-embedding-3-small"
    ):
        self.client = OpenAI()
        self.threshold = similarity_threshold
        self.embedding_model = embedding_model
        
        # In-memory store (use Redis/Pinecone in production)
        self.cache: list[dict] = []
    
    def _get_embedding(self, text: str) -> np.ndarray:
        response = self.client.embeddings.create(
            model=self.embedding_model,
            input=text
        )
        return np.array(response.data[0].embedding)
    
    def _cosine_similarity(self, a: np.ndarray, b: np.ndarray) -> float:
        return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
    
    def get(self, query: str) -> Optional[str]:
        """Find semantically similar cached response"""
        if not self.cache:
            return None
        
        query_embedding = self._get_embedding(query)
        
        best_match = None
        best_score = 0
        
        for entry in self.cache:
            similarity = self._cosine_similarity(
                query_embedding, 
                entry["embedding"]
            )
            if similarity > best_score and similarity >= self.threshold:
                best_score = similarity
                best_match = entry
        
        if best_match:
            return best_match["response"]
        return None
    
    def set(self, query: str, response: str):
        """Cache a query-response pair"""
        embedding = self._get_embedding(query)
        
        self.cache.append({
            "query": query,
            "response": response,
            "embedding": embedding
        })

# Usage
semantic_cache = SemanticCache(similarity_threshold=0.92)

def smart_completion(user_query: str) -> str:
    # Check semantic cache
    cached = semantic_cache.get(user_query)
    if cached:
        print("🎯 Semantic cache hit!")
        return cached
    
    # Call LLM
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": user_query}]
    )
    
    result = response.choices[0].message.content
    
    # Cache the response
    semantic_cache.set(user_query, result)
    
    return result

# These would likely hit the same cache entry:
# "What is machine learning?"
# "Can you explain machine learning?"
# "What's ML?"

Production Semantic Cache with Redis

import hashlib
import json
from typing import Optional

import numpy as np
import redis
from openai import OpenAI

class ProductionSemanticCache:
    """Production-ready semantic cache with Redis"""
    
    def __init__(
        self,
        redis_url: str,
        similarity_threshold: float = 0.93,
        max_entries: int = 10000
    ):
        self.redis = redis.from_url(redis_url)
        self.threshold = similarity_threshold
        self.max_entries = max_entries
        self.client = OpenAI()
    
    def _get_embedding(self, text: str) -> list[float]:
        response = self.client.embeddings.create(
            model="text-embedding-3-small",
            input=text
        )
        return response.data[0].embedding
    
    def get(self, query: str, context_key: str = "default") -> Optional[str]:
        """Get cached response with context isolation"""
        query_embedding = np.array(self._get_embedding(query))
        
        # Get all cache entries for this context
        # (KEYS scans the whole keyspace; prefer SCAN or a vector index at scale)
        cache_keys = self.redis.keys(f"semantic:{context_key}:*")
        
        best_match = None
        best_score = 0
        
        for key in cache_keys:
            entry = json.loads(self.redis.get(key))
            cached_embedding = np.array(entry["embedding"])
            
            similarity = np.dot(query_embedding, cached_embedding) / (
                np.linalg.norm(query_embedding) * np.linalg.norm(cached_embedding)
            )
            
            if similarity > best_score and similarity >= self.threshold:
                best_score = similarity
                best_match = entry
        
        return best_match["response"] if best_match else None
    
    def set(
        self,
        query: str,
        response: str,
        context_key: str = "default",
        ttl_seconds: int = 86400
    ):
        """Cache with TTL and context isolation"""
        embedding = self._get_embedding(query)
        
        entry = {
            "query": query,
            "response": response,
            "embedding": embedding
        }
        
        # Use a stable hash of the query as the key (built-in hash() varies between processes)
        key = f"semantic:{context_key}:{hashlib.sha256(query.encode()).hexdigest()}"
        self.redis.setex(key, ttl_seconds, json.dumps(entry))
        
        # Enforce max entries
        self._enforce_limit(context_key)
    
    def _enforce_limit(self, context_key: str):
        """Evict entries when over the limit (approximate; KEYS order is arbitrary)"""
        keys = self.redis.keys(f"semantic:{context_key}:*")
        if len(keys) > self.max_entries:
            # Trim back down to ~90% of max_entries
            to_remove = len(keys) - int(self.max_entries * 0.9)
            for key in keys[:to_remove]:
                self.redis.delete(key)

Multi-Layer Caching

Combine caching strategies for maximum efficiency:
from abc import ABC, abstractmethod
from typing import Optional
import time

class CacheLayer(ABC):
    @abstractmethod
    def get(self, key: str) -> Optional[str]:
        pass
    
    @abstractmethod
    def set(self, key: str, value: str):
        pass

class L1MemoryCache(CacheLayer):
    """In-memory cache for hot data"""
    
    def __init__(self, max_size: int = 1000, ttl_seconds: int = 300):
        self.cache = {}
        self.max_size = max_size
        self.ttl = ttl_seconds
    
    def get(self, key: str) -> Optional[str]:
        entry = self.cache.get(key)
        if entry and time.time() - entry["time"] < self.ttl:
            return entry["value"]
        return None
    
    def set(self, key: str, value: str):
        if len(self.cache) >= self.max_size:
            # Remove oldest
            oldest = min(self.cache, key=lambda k: self.cache[k]["time"])
            del self.cache[oldest]
        
        self.cache[key] = {"value": value, "time": time.time()}

class L2RedisCache(CacheLayer):
    """Redis cache for shared state"""
    
    def __init__(self, redis_client, ttl_seconds: int = 3600):
        self.redis = redis_client
        self.ttl = ttl_seconds
    
    def get(self, key: str) -> Optional[str]:
        value = self.redis.get(f"l2:{key}")
        return value.decode() if value else None
    
    def set(self, key: str, value: str):
        self.redis.setex(f"l2:{key}", self.ttl, value)

class L3SemanticCache(CacheLayer):
    """Semantic similarity cache"""
    
    def __init__(self, semantic_cache: SemanticCache):
        self.cache = semantic_cache
    
    def get(self, key: str) -> Optional[str]:
        return self.cache.get(key)
    
    def set(self, key: str, value: str):
        self.cache.set(key, value)

class MultiLayerCache:
    """Tiered caching system"""
    
    def __init__(self, layers: list[CacheLayer]):
        self.layers = layers
    
    def get(self, key: str) -> tuple[Optional[str], int]:
        """Get value, returns (value, layer_index) or (None, -1)"""
        for i, layer in enumerate(self.layers):
            value = layer.get(key)
            if value:
                # Backfill higher layers
                for j in range(i):
                    self.layers[j].set(key, value)
                return value, i
        return None, -1
    
    def set(self, key: str, value: str):
        """Set in all layers"""
        for layer in self.layers:
            layer.set(key, value)

# Usage (reusing the redis and SemanticCache setup from earlier sections)
redis_client = redis.from_url("redis://localhost:6379")

cache = MultiLayerCache([
    L1MemoryCache(max_size=100, ttl_seconds=60),     # Hot cache
    L2RedisCache(redis_client, ttl_seconds=3600),    # Shared cache
    L3SemanticCache(semantic_cache)                  # Semantic matching
])

def cached_llm_call(query: str) -> str:
    # Check cache layers
    cached, layer = cache.get(query)
    if cached:
        print(f"Cache hit at L{layer + 1}")
        return cached
    
    # Call LLM
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": query}]
    )
    result = response.choices[0].message.content
    
    # Populate all cache layers
    cache.set(query, result)
    
    return result

Cache Invalidation Strategies

from datetime import timedelta
import re

class SmartCacheInvalidator:
    """Intelligent cache invalidation"""
    
    def __init__(self, cache: MultiLayerCache):
        self.cache = cache
        self.invalidation_rules = []
    
    def add_time_rule(
        self,
        pattern: str,
        ttl: timedelta
    ):
        """Invalidate entries matching pattern after TTL"""
        self.invalidation_rules.append({
            "type": "time",
            "pattern": re.compile(pattern),
            "ttl": ttl
        })
    
    def add_event_rule(
        self,
        event_type: str,
        pattern: str
    ):
        """Invalidate on specific events"""
        self.invalidation_rules.append({
            "type": "event",
            "event": event_type,
            "pattern": re.compile(pattern)
        })
    
    def on_event(self, event_type: str, data: dict):
        """Handle invalidation events"""
        for rule in self.invalidation_rules:
            if rule["type"] == "event" and rule["event"] == event_type:
                # Invalidate matching cache entries
                self._invalidate_pattern(rule["pattern"])
    
    def _invalidate_pattern(self, pattern: re.Pattern):
        """Invalidate all entries matching pattern"""
        # Implementation depends on cache backend
        pass

# Usage
invalidator = SmartCacheInvalidator(cache)

# Invalidate product queries after product update
invalidator.add_event_rule("product_updated", r".*product.*")

# On product update event
invalidator.on_event("product_updated", {"product_id": "123"})
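
# Time-based rules are registered the same way; actual enforcement belongs in
# _invalidate_pattern, wired to your cache backend (e.g. deleting matching Redis keys).
# A hypothetical rule that expires pricing-related answers after an hour:
invalidator.add_time_rule(r".*(price|pricing).*", ttl=timedelta(hours=1))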

Key Takeaways

Use OpenAI's Cache

50% discount on cached prompt prefixes - structure prompts accordingly

Layer Your Caches

Memory → Redis → Semantic for optimal hit rates

Semantic for Flexibility

Similar questions get cached answers, improving hit rate

Invalidate Smartly

Time-based + event-based invalidation keeps cache fresh

What’s Next

Embeddings Deep Dive

Master embedding models and similarity search