December 2025 Update: Practical strategies for reducing LLM costs by 50-90% while maintaining quality.
The Cost Challenge
LLM costs can explode in production:

| Model | Input (per 1M tokens) | Output (per 1M tokens) | Est. cost at 1M requests/month |
|---|---|---|---|
| GPT-4o | $2.50/1M | $10.00/1M | ~$5,000+ |
| GPT-4o-mini | $0.15/1M | $0.60/1M | ~$300 |
| Claude 3.5 Sonnet | $3.00/1M | $15.00/1M | ~$7,000+ |
| Claude 3.5 Haiku | $0.25/1M | $1.25/1M | ~$600 |
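These monthly figures are order-of-magnitude estimates and depend heavily on how many tokens each request uses. As a rough sanity check, the sketch below reproduces them assuming about 1,000 input and 300 output tokens per request (illustrative assumptions, not measurements):

```python
# Back-of-the-envelope monthly cost estimate.
# The per-request token counts below are illustrative assumptions.
PRICE_PER_1M_TOKENS = {
    "gpt-4o": (2.50, 10.00),      # (input, output) USD per 1M tokens
    "gpt-4o-mini": (0.15, 0.60),
}

def monthly_cost(model: str, requests: int = 1_000_000,
                 input_tokens: int = 1_000, output_tokens: int = 300) -> float:
    in_price, out_price = PRICE_PER_1M_TOKENS[model]
    per_request = (input_tokens / 1_000_000) * in_price + (output_tokens / 1_000_000) * out_price
    return requests * per_request

print(f"gpt-4o:      ${monthly_cost('gpt-4o'):,.0f}")       # ~$5,500
print(f"gpt-4o-mini: ${monthly_cost('gpt-4o-mini'):,.0f}")  # ~$330
```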
Token Counting and Tracking
Understanding Token Costs
import tiktoken
from dataclasses import dataclass
from openai import OpenAI
# Pricing per 1M tokens (as of Dec 2024)
PRICING = {
"gpt-4o": {"input": 2.50, "output": 10.00},
"gpt-4o-mini": {"input": 0.15, "output": 0.60},
"gpt-4-turbo": {"input": 10.00, "output": 30.00},
"claude-3-5-sonnet": {"input": 3.00, "output": 15.00},
"claude-3-5-haiku": {"input": 0.25, "output": 1.25},
}
@dataclass
class TokenUsage:
input_tokens: int
output_tokens: int
model: str
@property
def total_tokens(self) -> int:
return self.input_tokens + self.output_tokens
@property
def cost_usd(self) -> float:
pricing = PRICING.get(self.model, {"input": 0, "output": 0})
input_cost = (self.input_tokens / 1_000_000) * pricing["input"]
output_cost = (self.output_tokens / 1_000_000) * pricing["output"]
return input_cost + output_cost
class TokenCounter:
"""Count and track token usage"""
def __init__(self):
self.encoders = {}
self.total_usage = {"input": 0, "output": 0, "cost": 0.0}
def get_encoder(self, model: str):
if model not in self.encoders:
try:
self.encoders[model] = tiktoken.encoding_for_model(model)
except KeyError:
self.encoders[model] = tiktoken.get_encoding("cl100k_base")
return self.encoders[model]
def count(self, text: str, model: str = "gpt-4o") -> int:
"""Count tokens in text"""
encoder = self.get_encoder(model)
return len(encoder.encode(text))
def count_messages(
self,
messages: list[dict],
model: str = "gpt-4o"
) -> int:
"""Count tokens in message list"""
total = 0
encoder = self.get_encoder(model)
for message in messages:
# Message overhead
total += 4 # role, content, etc.
total += len(encoder.encode(message.get("content", "")))
if "name" in message:
total += len(encoder.encode(message["name"]))
total += 2 # Assistant prefix
return total
def record(self, usage: TokenUsage):
"""Record usage for tracking"""
self.total_usage["input"] += usage.input_tokens
self.total_usage["output"] += usage.output_tokens
self.total_usage["cost"] += usage.cost_usd
def get_summary(self) -> dict:
return self.total_usage.copy()
# Usage
counter = TokenCounter()
# Before API call - estimate cost
messages = [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Explain quantum computing"}
]
estimated_input = counter.count_messages(messages)
print(f"Estimated input tokens: {estimated_input}")
# Make the API call, then record the actual usage reported by the API
client = OpenAI()
response = client.chat.completions.create(model="gpt-4o", messages=messages)
usage = TokenUsage(
input_tokens=response.usage.prompt_tokens,
output_tokens=response.usage.completion_tokens,
model="gpt-4o"
)
counter.record(usage)
print(f"Cost: ${usage.cost_usd:.6f}")
Model Routing
Route requests to the cheapest capable model:
from openai import OpenAI
from enum import Enum
client = OpenAI()
class TaskComplexity(Enum):
SIMPLE = "simple" # FAQ, basic Q&A
MEDIUM = "medium" # Summarization, analysis
COMPLEX = "complex" # Reasoning, coding, creative
class ModelRouter:
"""Route requests to appropriate models based on complexity"""
MODEL_MAP = {
TaskComplexity.SIMPLE: "gpt-4o-mini",
TaskComplexity.MEDIUM: "gpt-4o-mini",
TaskComplexity.COMPLEX: "gpt-4o"
}
def classify_complexity(self, query: str) -> TaskComplexity:
"""Classify query complexity"""
# Use cheap model to classify
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{
"role": "system",
"content": """Classify the complexity of this task:
- simple: Basic Q&A, greetings, simple lookups
- medium: Summarization, explanation, simple analysis
- complex: Multi-step reasoning, coding, creative writing
Respond with just: simple, medium, or complex"""
},
{"role": "user", "content": query}
],
max_tokens=10
)
result = response.choices[0].message.content.lower().strip()
if "complex" in result:
return TaskComplexity.COMPLEX
elif "medium" in result:
return TaskComplexity.MEDIUM
return TaskComplexity.SIMPLE
def route(self, query: str) -> str:
"""Get appropriate model for query"""
complexity = self.classify_complexity(query)
return self.MODEL_MAP[complexity]
# Usage
router = ModelRouter()
def smart_chat(user_input: str) -> str:
model = router.route(user_input)
print(f"Using model: {model}")
response = client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": user_input}]
)
return response.choices[0].message.content
Rule-Based Routing
For lower overhead, use rules instead of LLM classification:
import re
class RuleBasedRouter:
"""Route based on patterns and keywords"""
COMPLEX_PATTERNS = [
r"write.*code",
r"debug",
r"explain.*step",
r"analyze.*complex",
r"compare.*and.*contrast",
r"create.*story",
r"design.*system",
]
SIMPLE_PATTERNS = [
r"^(hi|hello|hey)\b",
r"what time",
r"weather",
r"define\s+\w+$",
r"^(yes|no|ok|thanks)\b",
]
def __init__(self):
self.complex_regex = [
re.compile(p, re.IGNORECASE) for p in self.COMPLEX_PATTERNS
]
self.simple_regex = [
re.compile(p, re.IGNORECASE) for p in self.SIMPLE_PATTERNS
]
def route(self, query: str) -> str:
# Check simple patterns first
for pattern in self.simple_regex:
if pattern.search(query):
return "gpt-4o-mini"
# Check complex patterns
for pattern in self.complex_regex:
if pattern.search(query):
return "gpt-4o"
# Default to cheaper model
return "gpt-4o-mini"
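A quick usage sketch (the example queries are illustrative):

```python
# Usage
rule_router = RuleBasedRouter()

print(rule_router.route("hi there"))                        # gpt-4o-mini (simple pattern)
print(rule_router.route("Write some code to parse a CSV"))  # gpt-4o (complex pattern)
print(rule_router.route("Summarize this article for me"))   # gpt-4o-mini (default)
```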
Caching Strategies
Semantic Caching
Cache responses for semantically similar queries:
import hashlib
import json
from openai import OpenAI
import numpy as np
from datetime import datetime, timedelta
client = OpenAI()
class SemanticCache:
"""Cache LLM responses with semantic similarity matching"""
def __init__(
self,
similarity_threshold: float = 0.95,
ttl_hours: int = 24
):
self.similarity_threshold = similarity_threshold
self.ttl = timedelta(hours=ttl_hours)
self.cache: list[dict] = [] # In production, use Redis/DB
def _get_embedding(self, text: str) -> np.ndarray:
response = client.embeddings.create(
model="text-embedding-3-small",
input=text
)
return np.array(response.data[0].embedding)
def _cosine_similarity(self, a: np.ndarray, b: np.ndarray) -> float:
return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
def get(self, query: str) -> tuple[str | None, bool]:
"""Get cached response if similar query exists"""
query_embedding = self._get_embedding(query)
now = datetime.now()
for entry in self.cache:
# Check TTL
if now - entry["timestamp"] > self.ttl:
continue
# Check similarity
similarity = self._cosine_similarity(
query_embedding,
entry["embedding"]
)
if similarity >= self.similarity_threshold:
return entry["response"], True # Cache hit
return None, False
def set(self, query: str, response: str):
"""Cache a query-response pair"""
embedding = self._get_embedding(query)
self.cache.append({
"query": query,
"response": response,
"embedding": embedding,
"timestamp": datetime.now()
})
def clear_expired(self):
"""Remove expired entries"""
now = datetime.now()
self.cache = [
e for e in self.cache
if now - e["timestamp"] <= self.ttl
]
# Usage
cache = SemanticCache(similarity_threshold=0.92)
def cached_chat(user_input: str) -> dict:
# Check cache
cached, hit = cache.get(user_input)
if hit:
return {
"response": cached,
"cached": True,
"tokens_saved": True
}
# Generate new response
response = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": user_input}]
)
result = response.choices[0].message.content
# Cache it
cache.set(user_input, result)
return {
"response": result,
"cached": False,
"usage": response.usage
}
Exact Match Caching
For deterministic queries:
import hashlib
import json
class ExactCache:
"""Simple exact-match cache for deterministic queries"""
def __init__(self, max_size: int = 10000):
self.cache = {}
self.max_size = max_size
def _hash_key(self, model: str, messages: list, **kwargs) -> str:
"""Create deterministic hash for request"""
key_data = {
"model": model,
"messages": messages,
**kwargs
}
key_str = json.dumps(key_data, sort_keys=True)
return hashlib.sha256(key_str.encode()).hexdigest()
def get(self, model: str, messages: list, **kwargs) -> str | None:
key = self._hash_key(model, messages, **kwargs)
return self.cache.get(key)
def set(self, model: str, messages: list, response: str, **kwargs):
if len(self.cache) >= self.max_size:
# Remove oldest entry (FIFO)
oldest_key = next(iter(self.cache))
del self.cache[oldest_key]
key = self._hash_key(model, messages, **kwargs)
self.cache[key] = response
# Usage with temperature=0 for (near-)deterministic responses
exact_cache = ExactCache()
def deterministic_chat(system: str, user: str) -> str:
messages = [
{"role": "system", "content": system},
{"role": "user", "content": user}
]
# Check cache
cached = exact_cache.get("gpt-4o", messages, temperature=0)
if cached:
return cached
response = client.chat.completions.create(
model="gpt-4o",
messages=messages,
temperature=0 # Deterministic
)
result = response.choices[0].message.content
exact_cache.set("gpt-4o", messages, result, temperature=0)
return result
Prompt Optimization
Reduce Prompt Length
def optimize_prompt(prompt: str) -> str:
"""Reduce prompt tokens while preserving meaning"""
optimizations = [
        # Remove redundant phrases
        ("Please provide me with", "Give"),
        ("Please provide", "Give"),
("I would like you to", ""),
("Can you please", ""),
("It would be great if you could", ""),
# Shorten instructions
("In the context of", "For"),
("With respect to", "For"),
("Make sure to", ""),
# Remove filler
(r"\s+", " "), # Multiple spaces
(r"^\s+|\s+$", ""), # Trim
]
result = prompt
for old, new in optimizations:
if old.startswith("^") or old.startswith(r"\s"):
import re
result = re.sub(old, new, result)
else:
result = result.replace(old, new)
return result.strip()
# Example
long_prompt = """
Please provide me with a detailed analysis of the following text.
I would like you to identify the main themes and summarize them.
It would be great if you could also highlight any key insights.
"""
short_prompt = optimize_prompt(long_prompt)
# -> "Give a detailed analysis of the following text. identify the main themes and summarize them. also highlight any key insights."
# (simple find-and-replace does not re-capitalize the remaining text)
Context Compression
class ContextCompressor:
"""Compress context to reduce tokens"""
def compress_for_rag(
self,
documents: list[str],
query: str,
max_tokens: int = 2000
) -> str:
"""Compress retrieved documents to fit token budget"""
# Get most relevant sentences
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{
"role": "system",
"content": f"""Extract the most relevant sentences from these documents for answering the query.
Keep only essential information. Target: {max_tokens} tokens max.
Query: {query}"""
},
{
"role": "user",
"content": "\n\n".join(documents)
}
]
)
return response.choices[0].message.content
def summarize_history(
self,
messages: list[dict],
max_tokens: int = 500
) -> str:
"""Summarize conversation history"""
history = "\n".join([
f"{m['role']}: {m['content']}"
for m in messages
])
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{
"role": "system",
"content": f"Summarize this conversation in under {max_tokens} tokens, preserving key facts and decisions."
},
{"role": "user", "content": history}
]
)
return response.choices[0].message.content
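A usage sketch for compress_for_rag, reusing the client created earlier; the documents and query are made up for illustration:

```python
# Usage (illustrative documents and query)
compressor = ContextCompressor()

docs = [
    "Our refund policy allows returns within 30 days of purchase for a full refund...",
    "Shipping typically takes 3-5 business days within the continental US...",
]
query = "What is the refund window?"

# Compress the retrieved documents, then answer from the compressed context
context = compressor.compress_for_rag(docs, query, max_tokens=500)

response = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[
        {"role": "system", "content": f"Answer using only this context:\n{context}"},
        {"role": "user", "content": query},
    ],
)
print(response.choices[0].message.content)
```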
Batching and Async
Batch Similar Requests
import asyncio
from openai import AsyncOpenAI
async_client = AsyncOpenAI()
class RequestBatcher:
"""Batch similar requests for efficiency"""
def __init__(self, batch_size: int = 10, wait_time: float = 0.1):
self.batch_size = batch_size
self.wait_time = wait_time
self.pending: list[tuple] = []
self.lock = asyncio.Lock()
async def add_request(
self,
messages: list[dict],
model: str = "gpt-4o-mini"
) -> str:
"""Add request to batch and wait for result"""
future = asyncio.Future()
async with self.lock:
self.pending.append((messages, model, future))
if len(self.pending) >= self.batch_size:
await self._process_batch()
# Wait a bit for more requests to batch
await asyncio.sleep(self.wait_time)
async with self.lock:
if self.pending:
await self._process_batch()
return await future
async def _process_batch(self):
"""Process all pending requests"""
if not self.pending:
return
batch = self.pending
self.pending = []
# Process in parallel
tasks = [
async_client.chat.completions.create(
model=model,
messages=messages
)
for messages, model, _ in batch
]
responses = await asyncio.gather(*tasks)
# Resolve futures
for (_, _, future), response in zip(batch, responses):
future.set_result(response.choices[0].message.content)
# Usage
batcher = RequestBatcher(batch_size=10)
async def batch_chat(queries: list[str]) -> list[str]:
tasks = [
batcher.add_request([{"role": "user", "content": q}])
for q in queries
]
return await asyncio.gather(*tasks)
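A minimal driver for the batcher; with fewer queries than batch_size, the pending requests are flushed after wait_time. The example queries are illustrative:

```python
# Run a handful of queries through the batcher
queries = ["What is Python?", "Define recursion", "What is an API?"]

async def main():
    answers = await batch_chat(queries)
    for q, a in zip(queries, answers):
        print(f"{q} -> {a[:60]}...")

asyncio.run(main())
```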
Cost Monitoring Dashboard
from dataclasses import dataclass, field
from datetime import datetime, date
from collections import defaultdict
import json
@dataclass
class CostTracker:
"""Track and analyze LLM costs"""
daily_costs: dict = field(default_factory=lambda: defaultdict(float))
model_costs: dict = field(default_factory=lambda: defaultdict(float))
request_count: dict = field(default_factory=lambda: defaultdict(int))
def record(
self,
model: str,
input_tokens: int,
output_tokens: int
):
today = date.today().isoformat()
usage = TokenUsage(input_tokens, output_tokens, model)
cost = usage.cost_usd
self.daily_costs[today] += cost
self.model_costs[model] += cost
self.request_count[model] += 1
def get_daily_report(self) -> dict:
return {
"daily_costs": dict(self.daily_costs),
"model_breakdown": dict(self.model_costs),
"request_counts": dict(self.request_count),
"total_cost": sum(self.daily_costs.values()),
"avg_cost_per_request": (
sum(self.daily_costs.values()) /
max(sum(self.request_count.values()), 1)
)
}
def check_budget(
self,
daily_limit: float,
alert_threshold: float = 0.8
) -> dict:
today = date.today().isoformat()
current = self.daily_costs.get(today, 0)
return {
"current_spend": current,
"daily_limit": daily_limit,
"remaining": daily_limit - current,
"utilization": current / daily_limit,
"alert": current >= daily_limit * alert_threshold
}
# Usage
tracker = CostTracker()
def tracked_chat(user_input: str, model: str = "gpt-4o") -> str:
response = client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": user_input}]
)
# Track costs
tracker.record(
model=model,
input_tokens=response.usage.prompt_tokens,
output_tokens=response.usage.completion_tokens
)
# Check budget
budget = tracker.check_budget(daily_limit=100.0)
if budget["alert"]:
print(f"⚠️ Budget alert: ${budget['current_spend']:.2f} / ${budget['daily_limit']}")
return response.choices[0].message.content
Cost Optimization Checklist
- Use cheaper models: GPT-4o-mini is roughly 17x cheaper than GPT-4o (at the prices above) and handles many everyday tasks
- Implement caching: cache responses to avoid repeated API calls for similar queries
- Compress context: reduce prompt and context size before sending it to the API
- Set budgets: enforce daily/monthly spending limits with alerts (see the sketch after this list)
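The CostTracker above only raises an alert once spending crosses the threshold. If you want a hard stop, wrap calls in a simple guard. A minimal sketch reusing tracker and tracked_chat from the monitoring section (the BudgetExceededError name is our own):

```python
class BudgetExceededError(Exception):
    """Raised when the daily LLM budget has been spent."""

def budgeted_chat(user_input: str, model: str = "gpt-4o-mini",
                  daily_limit: float = 100.0) -> str:
    # Refuse the call outright once the daily budget is exhausted
    budget = tracker.check_budget(daily_limit=daily_limit)
    if budget["remaining"] <= 0:
        raise BudgetExceededError(
            f"Daily limit ${daily_limit:.2f} reached "
            f"(spent ${budget['current_spend']:.2f})"
        )
    return tracked_chat(user_input, model=model)
```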
Quick Wins
| Strategy | Effort | Savings |
|---|---|---|
| Switch to mini models | Low | 50-80% |
| Add response caching | Medium | 20-50% |
| Prompt optimization | Low | 10-30% |
| Model routing | Medium | 30-50% |
| Context compression | High | 20-40% |
What’s Next
Multi-Agent Design Patterns
Learn advanced patterns for building multi-agent AI systems