December 2025 Update: Practical strategies for reducing LLM costs by 50-90% while maintaining quality.

The Cost Challenge

LLM costs can explode in production:
| Model | Input Cost | Output Cost | Est. cost at 1M requests/month |
| --- | --- | --- | --- |
| GPT-4o | $2.50/1M | $10.00/1M | ~$5,000+ |
| GPT-4o-mini | $0.15/1M | $0.60/1M | ~$300 |
| Claude 3.5 Sonnet | $3.00/1M | $15.00/1M | ~$7,000+ |
| Claude 3.5 Haiku | $0.25/1M | $1.25/1M | ~$600 |
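
The monthly figures are rough estimates that depend heavily on tokens per request. A quick back-of-envelope sketch (the per-request token averages below are assumptions for illustration, not measurements):

# Rough monthly cost estimate; token averages are assumed, not measured
def monthly_cost(input_price: float, output_price: float,
                 requests: int = 1_000_000,
                 avg_input_tokens: int = 1_500,
                 avg_output_tokens: int = 150) -> float:
    input_cost = requests * avg_input_tokens / 1_000_000 * input_price
    output_cost = requests * avg_output_tokens / 1_000_000 * output_price
    return input_cost + output_cost

print(f"GPT-4o:      ${monthly_cost(2.50, 10.00):,.0f}")  # ~$5,250
print(f"GPT-4o-mini: ${monthly_cost(0.15, 0.60):,.0f}")   # ~$315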

Token Counting and Tracking

Understanding Token Costs

import tiktoken
from dataclasses import dataclass
from typing import Optional

# Pricing per 1M tokens (as of Dec 2024)
PRICING = {
    "gpt-4o": {"input": 2.50, "output": 10.00},
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
    "gpt-4-turbo": {"input": 10.00, "output": 30.00},
    "claude-3-5-sonnet": {"input": 3.00, "output": 15.00},
    "claude-3-5-haiku": {"input": 0.25, "output": 1.25},
}

@dataclass
class TokenUsage:
    input_tokens: int
    output_tokens: int
    model: str
    
    @property
    def total_tokens(self) -> int:
        return self.input_tokens + self.output_tokens
    
    @property
    def cost_usd(self) -> float:
        pricing = PRICING.get(self.model, {"input": 0, "output": 0})
        input_cost = (self.input_tokens / 1_000_000) * pricing["input"]
        output_cost = (self.output_tokens / 1_000_000) * pricing["output"]
        return input_cost + output_cost

class TokenCounter:
    """Count and track token usage"""
    
    def __init__(self):
        self.encoders = {}
        self.total_usage = {"input": 0, "output": 0, "cost": 0.0}
    
    def get_encoder(self, model: str):
        if model not in self.encoders:
            try:
                self.encoders[model] = tiktoken.encoding_for_model(model)
            except KeyError:
                self.encoders[model] = tiktoken.get_encoding("cl100k_base")
        return self.encoders[model]
    
    def count(self, text: str, model: str = "gpt-4o") -> int:
        """Count tokens in text"""
        encoder = self.get_encoder(model)
        return len(encoder.encode(text))
    
    def count_messages(
        self,
        messages: list[dict],
        model: str = "gpt-4o"
    ) -> int:
        """Count tokens in message list"""
        total = 0
        encoder = self.get_encoder(model)
        
        for message in messages:
            # Approximate per-message overhead (role/formatting tokens)
            total += 4
            total += len(encoder.encode(message.get("content", "")))
            
            if "name" in message:
                total += len(encoder.encode(message["name"]))
        
        total += 2  # Assistant prefix
        return total
    
    def record(self, usage: TokenUsage):
        """Record usage for tracking"""
        self.total_usage["input"] += usage.input_tokens
        self.total_usage["output"] += usage.output_tokens
        self.total_usage["cost"] += usage.cost_usd
    
    def get_summary(self) -> dict:
        return self.total_usage.copy()

# Usage
counter = TokenCounter()

# Before API call - estimate cost
messages = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "Explain quantum computing"}
]
estimated_input = counter.count_messages(messages)
print(f"Estimated input tokens: {estimated_input}")

# After the API call - record actual usage (response is the object
# returned by client.chat.completions.create)
usage = TokenUsage(
    input_tokens=response.usage.prompt_tokens,
    output_tokens=response.usage.completion_tokens,
    model="gpt-4o"
)
counter.record(usage)
print(f"Cost: ${usage.cost_usd:.6f}")

Model Routing

Route requests to the cheapest capable model:
from openai import OpenAI
from enum import Enum

client = OpenAI()

class TaskComplexity(Enum):
    SIMPLE = "simple"      # FAQ, basic Q&A
    MEDIUM = "medium"      # Summarization, analysis
    COMPLEX = "complex"    # Reasoning, coding, creative

class ModelRouter:
    """Route requests to appropriate models based on complexity"""
    
    MODEL_MAP = {
        TaskComplexity.SIMPLE: "gpt-4o-mini",
        TaskComplexity.MEDIUM: "gpt-4o-mini",
        TaskComplexity.COMPLEX: "gpt-4o"
    }
    
    def classify_complexity(self, query: str) -> TaskComplexity:
        """Classify query complexity"""
        # Use cheap model to classify
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {
                    "role": "system",
                    "content": """Classify the complexity of this task:
                    - simple: Basic Q&A, greetings, simple lookups
                    - medium: Summarization, explanation, simple analysis
                    - complex: Multi-step reasoning, coding, creative writing
                    
                    Respond with just: simple, medium, or complex"""
                },
                {"role": "user", "content": query}
            ],
            max_tokens=10
        )
        
        result = response.choices[0].message.content.lower().strip()
        
        if "complex" in result:
            return TaskComplexity.COMPLEX
        elif "medium" in result:
            return TaskComplexity.MEDIUM
        return TaskComplexity.SIMPLE
    
    def route(self, query: str) -> str:
        """Get appropriate model for query"""
        complexity = self.classify_complexity(query)
        return self.MODEL_MAP[complexity]

# Usage
router = ModelRouter()

def smart_chat(user_input: str) -> str:
    model = router.route(user_input)
    print(f"Using model: {model}")
    
    response = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": user_input}]
    )
    
    return response.choices[0].message.content

Rule-Based Routing

For lower overhead, use rules instead of LLM classification:
import re

class RuleBasedRouter:
    """Route based on patterns and keywords"""
    
    COMPLEX_PATTERNS = [
        r"write.*code",
        r"debug",
        r"explain.*step",
        r"analyze.*complex",
        r"compare.*and.*contrast",
        r"create.*story",
        r"design.*system",
    ]
    
    SIMPLE_PATTERNS = [
        r"^(hi|hello|hey)\b",
        r"what time",
        r"weather",
        r"define\s+\w+$",
        r"^(yes|no|ok|thanks)\b",
    ]
    
    def __init__(self):
        self.complex_regex = [
            re.compile(p, re.IGNORECASE) for p in self.COMPLEX_PATTERNS
        ]
        self.simple_regex = [
            re.compile(p, re.IGNORECASE) for p in self.SIMPLE_PATTERNS
        ]
    
    def route(self, query: str) -> str:
        # Check simple patterns first
        for pattern in self.simple_regex:
            if pattern.search(query):
                return "gpt-4o-mini"
        
        # Check complex patterns
        for pattern in self.complex_regex:
            if pattern.search(query):
                return "gpt-4o"
        
        # Default to cheaper model
        return "gpt-4o-mini"

Caching Strategies

Semantic Caching

Cache responses for semantically similar queries:
from openai import OpenAI
import numpy as np
from datetime import datetime, timedelta

client = OpenAI()

class SemanticCache:
    """Cache LLM responses with semantic similarity matching"""
    
    def __init__(
        self,
        similarity_threshold: float = 0.95,
        ttl_hours: int = 24
    ):
        self.similarity_threshold = similarity_threshold
        self.ttl = timedelta(hours=ttl_hours)
        self.cache: list[dict] = []  # In production, use Redis/DB
    
    def _get_embedding(self, text: str) -> np.ndarray:
        response = client.embeddings.create(
            model="text-embedding-3-small",
            input=text
        )
        return np.array(response.data[0].embedding)
    
    def _cosine_similarity(self, a: np.ndarray, b: np.ndarray) -> float:
        return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
    
    def get(self, query: str) -> tuple[str | None, bool]:
        """Get cached response if similar query exists"""
        query_embedding = self._get_embedding(query)
        
        now = datetime.now()
        
        for entry in self.cache:
            # Check TTL
            if now - entry["timestamp"] > self.ttl:
                continue
            
            # Check similarity
            similarity = self._cosine_similarity(
                query_embedding,
                entry["embedding"]
            )
            
            if similarity >= self.similarity_threshold:
                return entry["response"], True  # Cache hit
        
        return None, False
    
    def set(self, query: str, response: str):
        """Cache a query-response pair"""
        embedding = self._get_embedding(query)
        
        self.cache.append({
            "query": query,
            "response": response,
            "embedding": embedding,
            "timestamp": datetime.now()
        })
    
    def clear_expired(self):
        """Remove expired entries"""
        now = datetime.now()
        self.cache = [
            e for e in self.cache
            if now - e["timestamp"] <= self.ttl
        ]

# Usage
cache = SemanticCache(similarity_threshold=0.92)

def cached_chat(user_input: str) -> dict:
    # Check cache
    cached, hit = cache.get(user_input)
    
    if hit:
        return {
            "response": cached,
            "cached": True,
            "tokens_saved": True
        }
    
    # Generate new response
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": user_input}]
    )
    
    result = response.choices[0].message.content
    
    # Cache it
    cache.set(user_input, result)
    
    return {
        "response": result,
        "cached": False,
        "usage": response.usage
    }

Exact Match Caching

For deterministic queries (temperature=0), an exact-match cache avoids recomputing identical requests:
import hashlib
import json

class ExactCache:
    """Simple exact-match cache for deterministic queries"""
    
    def __init__(self, max_size: int = 10000):
        self.cache = {}
        self.max_size = max_size
    
    def _hash_key(self, model: str, messages: list, **kwargs) -> str:
        """Create deterministic hash for request"""
        key_data = {
            "model": model,
            "messages": messages,
            **kwargs
        }
        key_str = json.dumps(key_data, sort_keys=True)
        return hashlib.sha256(key_str.encode()).hexdigest()
    
    def get(self, model: str, messages: list, **kwargs) -> str | None:
        key = self._hash_key(model, messages, **kwargs)
        return self.cache.get(key)
    
    def set(self, model: str, messages: list, response: str, **kwargs):
        if len(self.cache) >= self.max_size:
            # Remove oldest entry (FIFO)
            oldest_key = next(iter(self.cache))
            del self.cache[oldest_key]
        
        key = self._hash_key(model, messages, **kwargs)
        self.cache[key] = response

# Usage with temperature=0 for deterministic responses
exact_cache = ExactCache()

def deterministic_chat(system: str, user: str) -> str:
    messages = [
        {"role": "system", "content": system},
        {"role": "user", "content": user}
    ]
    
    # Check cache
    cached = exact_cache.get("gpt-4o", messages, temperature=0)
    if cached:
        return cached
    
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        temperature=0  # Deterministic
    )
    
    result = response.choices[0].message.content
    exact_cache.set("gpt-4o", messages, result, temperature=0)
    
    return result
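
If the prompt template is fixed and the arguments are plain strings, Python's built-in functools.lru_cache can serve as an even lighter exact-match cache. A minimal sketch, assuming temperature=0:

from functools import lru_cache

@lru_cache(maxsize=10_000)
def cached_completion(system: str, user: str) -> str:
    # Keyed on the exact (system, user) strings; only safe with temperature=0
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": system},
            {"role": "user", "content": user}
        ],
        temperature=0
    )
    return response.choices[0].message.content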

Prompt Optimization

Reduce Prompt Length

import re

def optimize_prompt(prompt: str) -> str:
    """Reduce prompt tokens while preserving meaning"""
    # Literal phrase replacements
    replacements = [
        # Remove redundant phrases
        ("Please provide me with", "Give"),
        ("Please provide", "Give"),
        ("I would like you to", ""),
        ("Can you please", ""),
        ("It would be great if you could", ""),

        # Shorten instructions
        ("In the context of", "For"),
        ("With respect to", "For"),
        ("Make sure to", ""),
    ]

    result = prompt
    for old, new in replacements:
        result = result.replace(old, new)

    # Remove filler whitespace
    result = re.sub(r"\s+", " ", result)
    return result.strip()

# Example
long_prompt = """
Please provide me with a detailed analysis of the following text. 
I would like you to identify the main themes and summarize them.
It would be great if you could also highlight any key insights.
"""

short_prompt = optimize_prompt(long_prompt)
# "Give a detailed analysis of the following text. identify the main themes and summarize them. also highlight any key insights."
# (The naive replacements leave capitalization untouched; clean up as needed.)

Context Compression

class ContextCompressor:
    """Compress context to reduce tokens"""
    
    def compress_for_rag(
        self,
        documents: list[str],
        query: str,
        max_tokens: int = 2000
    ) -> str:
        """Compress retrieved documents to fit token budget"""
        
        # Get most relevant sentences
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {
                    "role": "system",
                    "content": f"""Extract the most relevant sentences from these documents for answering the query.
                    Keep only essential information. Target: {max_tokens} tokens max.
                    
                    Query: {query}"""
                },
                {
                    "role": "user",
                    "content": "\n\n".join(documents)
                }
            ]
        )
        
        return response.choices[0].message.content
    
    def summarize_history(
        self,
        messages: list[dict],
        max_tokens: int = 500
    ) -> str:
        """Summarize conversation history"""
        
        history = "\n".join([
            f"{m['role']}: {m['content']}"
            for m in messages
        ])
        
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {
                    "role": "system",
                    "content": f"Summarize this conversation in under {max_tokens} tokens, preserving key facts and decisions."
                },
                {"role": "user", "content": history}
            ]
        )
        
        return response.choices[0].message.content
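
A usage sketch: once a conversation grows past a token budget, summarize the older turns and keep only the most recent ones. The 3,000-token threshold and keep-last-two-turns policy below are arbitrary choices for illustration:

# Usage (thresholds are illustrative)
compressor = ContextCompressor()

def compact_history(messages: list[dict], counter: TokenCounter) -> list[dict]:
    if counter.count_messages(messages) <= 3000:
        return messages
    summary = compressor.summarize_history(messages[:-2], max_tokens=500)
    return [
        {"role": "system", "content": f"Conversation summary: {summary}"},
        *messages[-2:]
    ]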

Batching and Async

Batch Similar Requests

import asyncio
from openai import AsyncOpenAI

async_client = AsyncOpenAI()

class RequestBatcher:
    """Batch similar requests for efficiency"""
    
    def __init__(self, batch_size: int = 10, wait_time: float = 0.1):
        self.batch_size = batch_size
        self.wait_time = wait_time
        self.pending: list[tuple] = []
        self.lock = asyncio.Lock()
    
    async def add_request(
        self,
        messages: list[dict],
        model: str = "gpt-4o-mini"
    ) -> str:
        """Add request to batch and wait for result"""
        future = asyncio.Future()
        
        async with self.lock:
            self.pending.append((messages, model, future))
            
            if len(self.pending) >= self.batch_size:
                await self._process_batch()
        
        # Wait a bit for more requests to batch
        await asyncio.sleep(self.wait_time)
        
        async with self.lock:
            if self.pending:
                await self._process_batch()
        
        return await future
    
    async def _process_batch(self):
        """Process all pending requests"""
        if not self.pending:
            return
        
        batch = self.pending
        self.pending = []
        
        # Process in parallel
        tasks = [
            async_client.chat.completions.create(
                model=model,
                messages=messages
            )
            for messages, model, _ in batch
        ]
        
        responses = await asyncio.gather(*tasks)
        
        # Resolve futures
        for (_, _, future), response in zip(batch, responses):
            future.set_result(response.choices[0].message.content)

# Usage
batcher = RequestBatcher(batch_size=10)

async def batch_chat(queries: list[str]) -> list[str]:
    tasks = [
        batcher.add_request([{"role": "user", "content": q}])
        for q in queries
    ]
    return await asyncio.gather(*tasks)
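
Called from synchronous code (the queries are placeholders):

# Usage
queries = [
    "Summarize: LLM costs grow with token volume.",
    "Translate 'hello' to French.",
    "What is 2 + 2?"
]
results = asyncio.run(batch_chat(queries))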

Cost Monitoring Dashboard

from dataclasses import dataclass, field
from datetime import date
from collections import defaultdict

@dataclass
class CostTracker:
    """Track and analyze LLM costs"""
    
    daily_costs: dict = field(default_factory=lambda: defaultdict(float))
    model_costs: dict = field(default_factory=lambda: defaultdict(float))
    request_count: dict = field(default_factory=lambda: defaultdict(int))
    
    def record(
        self,
        model: str,
        input_tokens: int,
        output_tokens: int
    ):
        today = date.today().isoformat()
        
        usage = TokenUsage(input_tokens, output_tokens, model)
        cost = usage.cost_usd
        
        self.daily_costs[today] += cost
        self.model_costs[model] += cost
        self.request_count[model] += 1
    
    def get_daily_report(self) -> dict:
        return {
            "daily_costs": dict(self.daily_costs),
            "model_breakdown": dict(self.model_costs),
            "request_counts": dict(self.request_count),
            "total_cost": sum(self.daily_costs.values()),
            "avg_cost_per_request": (
                sum(self.daily_costs.values()) / 
                max(sum(self.request_count.values()), 1)
            )
        }
    
    def check_budget(
        self,
        daily_limit: float,
        alert_threshold: float = 0.8
    ) -> dict:
        today = date.today().isoformat()
        current = self.daily_costs.get(today, 0)
        
        return {
            "current_spend": current,
            "daily_limit": daily_limit,
            "remaining": daily_limit - current,
            "utilization": current / daily_limit,
            "alert": current >= daily_limit * alert_threshold
        }

# Usage
tracker = CostTracker()

def tracked_chat(user_input: str, model: str = "gpt-4o") -> str:
    response = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": user_input}]
    )
    
    # Track costs
    tracker.record(
        model=model,
        input_tokens=response.usage.prompt_tokens,
        output_tokens=response.usage.completion_tokens
    )
    
    # Check budget
    budget = tracker.check_budget(daily_limit=100.0)
    if budget["alert"]:
        print(f"⚠️ Budget alert: ${budget['current_spend']:.2f} / ${budget['daily_limit']}")
    
    return response.choices[0].message.content

Cost Optimization Checklist

- Use cheaper models: GPT-4o-mini is 15-30x cheaper than GPT-4o for many tasks.
- Implement caching: cache responses to avoid repeated API calls for similar queries.
- Compress context: reduce prompt and context size before sending it to the API.
- Set budgets: implement daily/monthly budget limits with alerts; a hard-limit sketch follows this list.
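
Building on the CostTracker above, a minimal enforcement sketch (the exception class, limit value, and guarded_chat wrapper are illustrative, not part of any library):

# Hypothetical hard budget limit on top of CostTracker
class BudgetExceededError(RuntimeError):
    pass

def guarded_chat(user_input: str, model: str = "gpt-4o-mini",
                 daily_limit: float = 100.0) -> str:
    budget = tracker.check_budget(daily_limit=daily_limit)
    if budget["remaining"] <= 0:
        # Refuse the request instead of just printing an alert
        raise BudgetExceededError(f"Daily budget of ${daily_limit:.2f} exhausted")
    return tracked_chat(user_input, model=model)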

Quick Wins

| Strategy | Effort | Savings |
| --- | --- | --- |
| Switch to mini models | Low | 50-80% |
| Add response caching | Medium | 20-50% |
| Prompt optimization | Low | 10-30% |
| Model routing | Medium | 30-50% |
| Context compression | High | 20-40% |
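
Note that savings from combined strategies multiply on the remaining spend rather than adding up. An illustrative calculation using mid-range values from the table (the percentages are assumptions, not measurements):

# Illustrative only: each strategy reduces whatever spend is left
remaining = 1.0
remaining *= (1 - 0.65)  # switch to mini models (~65% savings assumed)
remaining *= (1 - 0.35)  # add response caching (~35% savings assumed)
remaining *= (1 - 0.20)  # prompt optimization (~20% savings assumed)
print(f"Remaining spend: {remaining:.0%} of baseline")  # ~18%, i.e. ~82% total savings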

What’s Next

Multi-Agent Design Patterns

Learn advanced patterns for building multi-agent AI systems