LLM orchestration is the infrastructure layer that sits between your application code and the zoo of LLM providers. Think of it like an ORM for LLMs — just as SQLAlchemy lets you swap between PostgreSQL and MySQL without rewriting queries, an orchestration layer like LiteLLM lets you swap between OpenAI, Anthropic, and Groq without touching business logic. Without orchestration, every provider switch means updating API calls, response parsing, error handling, and retry logic throughout your codebase. With it, you change a single model string.

LiteLLM Overview

LiteLLM provides a unified interface for 100+ LLM providers. Under the hood, it translates the OpenAI-compatible request format into each provider’s native format (Anthropic’s separate system parameter, Bedrock’s AWS auth, etc.) and normalizes the responses back:
from litellm import completion, acompletion
import litellm

# Works the same way for any provider
response = completion(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello"}]
)

# Switch providers by changing model string
response = completion(
    model="claude-3-5-sonnet-20241022",
    messages=[{"role": "user", "content": "Hello"}]
)

response = completion(
    model="groq/llama-3.3-70b-versatile",
    messages=[{"role": "user", "content": "Hello"}]
)

response = completion(
    model="together_ai/meta-llama/Llama-3.3-70B-Instruct-Turbo",
    messages=[{"role": "user", "content": "Hello"}]
)

Async Support

For production workloads, async calls are essential. Without async, each LLM call blocks your server for 1-3 seconds — meaning a single-threaded server can only handle one request at a time.
import asyncio

async def query_llm(prompt: str, model: str = "gpt-4o") -> str:
    response = await acompletion(
        model=model,
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

# Run concurrent queries
async def parallel_queries():
    prompts = ["Explain AI", "Explain ML", "Explain DL"]
    tasks = [query_llm(p) for p in prompts]
    results = await asyncio.gather(*tasks)
    return results

Provider Configuration

Environment Variables

# OpenAI
export OPENAI_API_KEY="sk-..."

# Anthropic
export ANTHROPIC_API_KEY="sk-ant-..."

# Google
export GEMINI_API_KEY="..."

# Azure OpenAI
export AZURE_API_KEY="..."
export AZURE_API_BASE="https://your-resource.openai.azure.com"
export AZURE_API_VERSION="2024-02-01"

# AWS Bedrock
export AWS_ACCESS_KEY_ID="..."
export AWS_SECRET_ACCESS_KEY="..."
export AWS_REGION_NAME="us-east-1"

Programmatic Configuration

import litellm

# Set API keys programmatically
litellm.api_key = "sk-..."
litellm.anthropic_key = "sk-ant-..."

# Configure Azure
response = completion(
    model="azure/gpt-4o",
    messages=[{"role": "user", "content": "Hello"}],
    api_base="https://your-resource.openai.azure.com",
    api_version="2024-02-01",
    api_key="your-azure-key"
)

# Configure Bedrock
response = completion(
    model="bedrock/anthropic.claude-3-sonnet-20240229-v1:0",
    messages=[{"role": "user", "content": "Hello"}],
    aws_access_key_id="...",
    aws_secret_access_key="...",
    aws_region_name="us-east-1"
)

Fallback Configuration

Fallbacks are your safety net. When the primary model returns an error (rate limit, timeout, outage), the system automatically tries the next model in the chain. This is the single most impactful reliability pattern for production LLM applications.
Pitfall: Not testing your fallback path. Many teams configure fallbacks but never verify they work. Periodically inject failures in staging to confirm your fallback chain activates correctly and produces acceptable output quality from secondary models.
from litellm import completion
import litellm

# Enable verbose logging so failed attempts are visible while debugging
litellm.set_verbose = True

# Define fallback models
fallback_models = [
    "gpt-4o",
    "claude-3-5-sonnet-20241022",
    "groq/llama-3.3-70b-versatile"
]

def completion_with_fallback(messages: list, **kwargs) -> str:
    """Try each model in sequence until one succeeds.
    
    This manual approach gives you full control over fallback
    behavior. For most cases, LiteLLM's Router (shown below)
    handles this automatically with less boilerplate.
    """
    
    last_error = None
    
    for model in fallback_models:
        try:
            response = completion(
                model=model,
                messages=messages,
                **kwargs
            )
            return response.choices[0].message.content
        except Exception as e:
            last_error = e
            print(f"Model {model} failed: {e}")
            continue
    
    raise last_error

# Using LiteLLM Router for automatic fallback
from litellm import Router

router = Router(
    model_list=[
        {
            "model_name": "primary",
            "litellm_params": {
                "model": "gpt-4o",
                "api_key": "sk-..."
            }
        },
        {
            "model_name": "secondary",  # Distinct name: used only as a fallback
            "litellm_params": {
                "model": "claude-3-5-sonnet-20241022",
                "api_key": "sk-ant-..."
            }
        },
        {
            "model_name": "tertiary",
            "litellm_params": {
                "model": "groq/llama-3.3-70b-versatile",
                "api_key": "gsk_..."
            }
        }
    ],
    # Deployments that share a model_name are load balanced, not chained,
    # so each fallback target needs its own name.
    fallbacks=[
        {"primary": ["secondary", "tertiary"]}  # Tried in order if "primary" fails
    ],
    num_retries=2
)

# Router automatically handles fallback
response = router.completion(
    model="primary",
    messages=[{"role": "user", "content": "Hello"}]
)
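
The manual loop above moves to the next model on any exception, including ones that will fail identically everywhere. A minimal sketch of a stricter variant follows, assuming errors expose an HTTP-style `status_code` attribute (verify this for the exception classes in your LiteLLM version). `FakeAPIError` and the `call_model` callable are hypothetical stand-ins so the snippet runs without network access.

```python
from typing import Callable, Optional, Sequence, Tuple

# Statuses worth trying again on another model. Errors such as 400 or 401
# are treated as non-retryable and raised immediately.
RETRYABLE_STATUS = {408, 429, 500, 502, 503, 504}


class FakeAPIError(Exception):
    """Hypothetical stand-in for a provider error carrying an HTTP status."""

    def __init__(self, status_code: int, message: str = ""):
        super().__init__(message or f"HTTP {status_code}")
        self.status_code = status_code


def is_retryable(error: Exception) -> bool:
    """Errors without a status code are treated as non-retryable."""
    return getattr(error, "status_code", None) in RETRYABLE_STATUS


def complete_with_failover(
    models: Sequence[str],
    call_model: Callable[[str], str],
) -> Tuple[str, str]:
    """Return (model_used, content) from the first model that succeeds."""
    if not models:
        raise ValueError("models must not be empty")
    last_error: Optional[Exception] = None
    for model in models:
        try:
            return model, call_model(model)
        except Exception as error:
            if not is_retryable(error):
                raise  # bad request or auth failure: stop here
            last_error = error
    assert last_error is not None
    raise last_error
```

Returning the model name alongside the content makes it easy to log whether a response came from the primary model or a fallback.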

Load Balancing

When you have multiple deployments of the same model (e.g., two Azure OpenAI regions plus direct OpenAI), load balancing distributes requests to avoid hitting rate limits on any single endpoint. This is especially important for Azure, where each deployment has independent TPM/RPM quotas.
from litellm import Router

# Configure load balancing across deployments
router = Router(
    model_list=[
        {
            "model_name": "gpt-4o",
            "litellm_params": {
                "model": "azure/gpt-4o-deployment-1",
                "api_base": "https://region1.openai.azure.com",
                "api_key": "key1"
            },
            "tpm": 100000,  # Tokens per minute
            "rpm": 1000     # Requests per minute
        },
        {
            "model_name": "gpt-4o",
            "litellm_params": {
                "model": "azure/gpt-4o-deployment-2",
                "api_base": "https://region2.openai.azure.com",
                "api_key": "key2"
            },
            "tpm": 100000,
            "rpm": 1000
        },
        {
            "model_name": "gpt-4o",
            "litellm_params": {
                "model": "openai/gpt-4o",
                "api_key": "sk-..."
            },
            "tpm": 150000,
            "rpm": 500
        }
    ],
    routing_strategy="least-busy",  # or "simple-shuffle", "latency-based-routing"
    enable_pre_call_checks=True
)

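# Requests are distributed automatically. `await` is only valid inside a
# coroutine, so wrap the loop and start it with asyncio.run().
import asyncio

async def send_queries():
    for i in range(100):
        response = await router.acompletion(
            model="gpt-4o",
            messages=[{"role": "user", "content": f"Query {i}"}]
        )

asyncio.run(send_queries())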

Rate Limiting

Rate limiting protects you from two things: exceeding provider quotas (which causes errors) and runaway costs from bugs or attacks. A single misconfigured loop can generate thousands of API calls in seconds — rate limiting is your circuit breaker.
from litellm import Router

# Redis-backed usage tracking: pass the connection details to the Router
# so tpm/rpm counters are shared across instances

router = Router(
    model_list=[
        {
            "model_name": "gpt-4o",
            "litellm_params": {"model": "gpt-4o"},
            "tpm": 90000,
            "rpm": 500
        }
    ],
    redis_host="localhost",
    redis_port=6379,
    routing_strategy="usage-based-routing"
)

# Custom rate limiting
from dataclasses import dataclass
from datetime import datetime, timedelta
import asyncio

@dataclass
class RateLimit:
    requests: int
    tokens: int
    window_seconds: int = 60

class RateLimitedRouter:
    """Router with custom rate limiting.
    
    Wraps a LiteLLM Router with per-model request and token
    budgets. If a model exceeds its budget within the time
    window, requests are held until capacity frees up.
    """
    
    def __init__(
        self,
        router: Router,
        limits: dict[str, RateLimit]
    ):
        self.router = router
        self.limits = limits
        self.usage = {}
        self.lock = asyncio.Lock()
    
    async def _check_limit(self, model: str) -> bool:
        """Check if within rate limits"""
        limit = self.limits.get(model)
        if not limit:
            return True
        
        now = datetime.now()
        window_start = now - timedelta(seconds=limit.window_seconds)
        
        async with self.lock:
            # Clean old entries
            if model in self.usage:
                self.usage[model] = [
                    u for u in self.usage[model]
                    if u["time"] > window_start
                ]
            else:
                self.usage[model] = []
            
            # Check limits
            current_requests = len(self.usage[model])
            current_tokens = sum(u["tokens"] for u in self.usage[model])
            
            return (
                current_requests < limit.requests and
                current_tokens < limit.tokens
            )
    
    async def _record_usage(self, model: str, tokens: int):
        """Record usage"""
        async with self.lock:
            if model not in self.usage:
                self.usage[model] = []
            
            self.usage[model].append({
                "time": datetime.now(),
                "tokens": tokens
            })
    
    async def completion(self, model: str, **kwargs):
        """Rate-limited completion"""
        while not await self._check_limit(model):
            await asyncio.sleep(0.1)
        
        response = await self.router.acompletion(model=model, **kwargs)
        
        total_tokens = response.usage.total_tokens
        await self._record_usage(model, total_tokens)
        
        return response

Caching Integration

from litellm import completion
import litellm
import redis

# Enable Redis caching
litellm.cache = litellm.Cache(
    type="redis",
    host="localhost",
    port=6379,
    ttl=3600  # 1 hour
)

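# Semantic caching with embeddings (this assignment replaces the exact-match
# cache above). Assumes a LiteLLM version that provides the "redis-semantic"
# cache type; check the caching docs for your release.
litellm.cache = litellm.Cache(
    type="redis-semantic",
    host="localhost",
    port=6379,
    similarity_threshold=0.8,  # Return cached response if similarity > 0.8
    supported_call_types=["completion", "acompletion"]
)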

# Request with caching
response = completion(
    model="gpt-4o",
    messages=[{"role": "user", "content": "What is AI?"}],
    caching=True
)

# Check if response was cached
if hasattr(response, "_hidden_params"):
    was_cached = response._hidden_params.get("cache_hit", False)
    print(f"Cache hit: {was_cached}")

# Disable caching for specific request
response = completion(
    model="gpt-4o",
    messages=[{"role": "user", "content": "What time is it?"}],
    caching=False  # Skip cache for time-sensitive queries
)

Custom Provider Wrapper

When LiteLLM’s built-in abstraction is not enough — for example, you need cost tracking, custom logging, or semantic model aliases (“fast”, “smart”, “cheap”) — wrap it in a thin client that adds your business logic.
from typing import Optional, Dict, Any, List
from dataclasses import dataclass
import litellm
from litellm import completion, acompletion

@dataclass
class ModelConfig:
    model: str
    provider: str
    max_tokens: int = 4096
    temperature: float = 0.7
    cost_per_1k_input: float = 0.01
    cost_per_1k_output: float = 0.03

class UnifiedLLMClient:
    """Unified client for LLM operations.
    
    Maps semantic aliases ("fast", "smart", "cheap", "creative")
    to specific provider/model combinations. This lets application
    code express intent ("I need a fast response") rather than
    hard-coding model names that change every few months.
    """
    
    MODELS = {
        "fast": ModelConfig(
            model="groq/llama-3.3-70b-versatile",
            provider="groq",
            cost_per_1k_input=0.00059,
            cost_per_1k_output=0.00079
        ),
        "smart": ModelConfig(
            model="gpt-4o",
            provider="openai",
            cost_per_1k_input=0.0025,
            cost_per_1k_output=0.010
        ),
        "cheap": ModelConfig(
            model="gpt-4o-mini",
            provider="openai",
            cost_per_1k_input=0.00015,
            cost_per_1k_output=0.0006
        ),
        "creative": ModelConfig(
            model="claude-3-5-sonnet-20241022",
            provider="anthropic",
            cost_per_1k_input=0.003,
            cost_per_1k_output=0.015
        )
    }
    
    def __init__(self, default_model: str = "smart"):
        self.default_model = default_model
        self.total_cost = 0.0
        self.request_count = 0
    
    def _get_config(self, model_key: str) -> ModelConfig:
        if model_key in self.MODELS:
            return self.MODELS[model_key]
        # Treat as raw model name
        return ModelConfig(model=model_key, provider="custom")
    
    def _calculate_cost(
        self,
        config: ModelConfig,
        input_tokens: int,
        output_tokens: int
    ) -> float:
        return (
            (input_tokens / 1000) * config.cost_per_1k_input +
            (output_tokens / 1000) * config.cost_per_1k_output
        )
    
    async def complete(
        self,
        messages: List[Dict[str, str]],
        model: Optional[str] = None,
        **kwargs
    ) -> Dict[str, Any]:
        """Unified completion with tracking"""
        
        model_key = model or self.default_model
        config = self._get_config(model_key)
        
        response = await acompletion(
            model=config.model,
            messages=messages,
            max_tokens=kwargs.get("max_tokens", config.max_tokens),
            temperature=kwargs.get("temperature", config.temperature),
            **{k: v for k, v in kwargs.items() 
               if k not in ["max_tokens", "temperature"]}
        )
        
        # Track usage
        usage = response.usage
        cost = self._calculate_cost(
            config,
            usage.prompt_tokens,
            usage.completion_tokens
        )
        self.total_cost += cost
        self.request_count += 1
        
        return {
            "content": response.choices[0].message.content,
            "model": config.model,
            "usage": {
                "input_tokens": usage.prompt_tokens,
                "output_tokens": usage.completion_tokens,
                "cost": cost
            }
        }
    
    def get_stats(self) -> Dict[str, Any]:
        return {
            "total_requests": self.request_count,
            "total_cost": round(self.total_cost, 4)
        }

# Usage (`await` needs a running event loop, so wrap the calls in a coroutine)
import asyncio

async def main():
    client = UnifiedLLMClient(default_model="smart")

    # Use semantic model names
    response = await client.complete(
        messages=[{"role": "user", "content": "Quick question"}],
        model="fast"  # Uses Groq for speed
    )

    response = await client.complete(
        messages=[{"role": "user", "content": "Write a story"}],
        model="creative"  # Uses Claude for creativity
    )

    print(client.get_stats())

asyncio.run(main())

Streaming with Router

from litellm import Router

router = Router(model_list=[...])

async def stream_completion(messages: list):
    """Stream responses through router"""
    
    response = await router.acompletion(
        model="gpt-4o",
        messages=messages,
        stream=True
    )
    
    # An async generator cannot return a value, so yield each piece and let
    # the caller accumulate the full text if it needs it.
    async for chunk in response:
        content = chunk.choices[0].delta.content
        if content:
            yield content

# FastAPI streaming endpoint
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.post("/chat/stream")
async def chat_stream(request: dict):
    async def generate():
        async for chunk in stream_completion(request["messages"]):
            yield f"data: {chunk}\n\n"
        yield "data: [DONE]\n\n"
    
    return StreamingResponse(
        generate(),
        media_type="text/event-stream"
    )

Observability Integration

You cannot optimize what you cannot measure. LLM observability means tracking every call’s latency, token usage, cost, model, and success/failure status. Without this, you are flying blind — you will not know which model is slow, which prompts are expensive, or when your error rate spikes.
import litellm
from litellm.integrations.custom_logger import CustomLogger

class LLMLogger(CustomLogger):
    """Custom logger for LLM calls"""
    
    def log_pre_api_call(self, model, messages, kwargs):
        print(f"Calling {model} with {len(messages)} messages")
    
    def log_success_event(self, kwargs, response_obj, start_time, end_time):
        # start_time and end_time are datetimes; convert the delta to seconds
        duration = (end_time - start_time).total_seconds()
        tokens = response_obj.usage.total_tokens
        print(f"Success: {tokens} tokens in {duration:.2f}s")
    
    def log_failure_event(self, kwargs, response_obj, start_time, end_time):
        print(f"Failure: {response_obj}")
    
    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
        # Async logging (e.g., to database)
        pass

# Register logger
litellm.callbacks = [LLMLogger()]

# Or use built-in integrations
litellm.success_callback = ["langfuse"]  # Send to Langfuse
litellm.failure_callback = ["sentry"]     # Errors to Sentry

Common Pitfalls

Building a full orchestration layer for an app that only uses one model is premature. Start with direct API calls, add a thin wrapper when you need a second provider, and adopt LiteLLM Router when you genuinely need load balancing or fallbacks. Abstraction has a learning and debugging cost.
Using gpt-4o instead of a dated version like gpt-4o-2024-08-06 means your application behavior can change without any code deployment when the provider updates the model alias. In production, always pin model versions and test before upgrading.
Enabling semantic caching globally means time-sensitive queries (“What time is my meeting?”) or personalized queries (“What are my recent orders?”) return stale or wrong results. Cache selectively: factual reference queries yes, user-specific or time-sensitive queries no.
Routing strategies based on historical latency may not account for cold-start effects. A provider that has not received traffic in minutes may have higher latency on the next request. Consider adding a “keep-warm” mechanism for critical providers.

Model Comparison

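| Provider | Model | Speed | Quality | Cost |
| --- | --- | --- | --- | --- |
| Groq | llama-3.3-70b | Fastest | Good | Low |
| OpenAI | gpt-4o-mini | Fast | Good | Low |
| OpenAI | gpt-4o | Medium | Excellent | Medium |
| Anthropic | claude-3-5-sonnet | Medium | Excellent | Medium |
| Google | gemini-1.5-pro | Medium | Excellent | Medium |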

Interview Deep-Dive

Strong Answer:
  • The first concern is provider abstraction. You need a unified interface so that switching from OpenAI to Anthropic to a self-hosted model does not require changes in your application code. This means normalizing the request format (messages array, temperature, max_tokens), the response format (content, usage, finish_reason), and the error taxonomy (rate limit, auth failure, context overflow). LiteLLM does this well out of the box, but in production I have found you often need a thin wrapper on top for your own cost tracking and routing logic.
  • The second concern is failover and resilience. LLM APIs go down more often than people expect — OpenAI has had multiple multi-hour outages. You need automatic failover with a priority chain: try GPT-4o first, fall back to Claude 3.5 Sonnet, then to a self-hosted Llama model as the last resort. The failover logic needs to distinguish between retryable errors (429 rate limit, 500 server error) and non-retryable errors (400 bad request, 401 auth failure). Retrying on a non-retryable error wastes time and money.
  • The third concern is cost-aware routing. Not every query needs your most expensive model. I build a routing layer that classifies incoming requests by complexity: simple extraction goes to gpt-4o-mini at $0.15/M input tokens, complex reasoning goes to gpt-4o at $2.50/M. This classification itself can be a lightweight rule-based system or a small model. In one system I worked on, this routing saved about 60% on the monthly API bill without measurable quality degradation.
  • The fourth concern is observability. Every request through the orchestration layer must be logged with: provider, model, latency, token counts, cost, and whether it was a primary call or a fallback. Without this, you cannot debug failures, optimize costs, or detect quality regressions after a model update.
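
The cost-aware routing idea from the third point can be sketched with a rule-based classifier. This is an illustration under stated assumptions: the keyword list and the length cutoff are hypothetical heuristics, and the prices are the per-million input-token figures quoted above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Tier:
    model: str
    usd_per_million_input: float


CHEAP = Tier("gpt-4o-mini", 0.15)
STRONG = Tier("gpt-4o", 2.50)

# Hypothetical hints that a prompt needs multi-step reasoning.
REASONING_HINTS = ("why", "prove", "compare", "analyze", "step by step", "trade-off")


def pick_tier(prompt: str, long_prompt_chars: int = 2000) -> Tier:
    """Send long or reasoning-heavy prompts to the strong model."""
    text = prompt.lower()
    if len(text) > long_prompt_chars or any(hint in text for hint in REASONING_HINTS):
        return STRONG
    return CHEAP


def input_cost_usd(tier: Tier, input_tokens: int) -> float:
    """Input-side cost only; output tokens are priced separately."""
    return input_tokens / 1_000_000 * tier.usd_per_million_input
```

A rule-based classifier like this is cheap to run and easy to audit; swapping in a small model later only requires replacing `pick_tier`.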
Follow-up: How do you handle the semantic differences between providers, for example Anthropic’s system prompt handling versus OpenAI’s?

This is where naive orchestration breaks down. Each provider has subtle differences that a unified interface can hide but not eliminate. Anthropic requires the system prompt as a separate system parameter, not as a message with role “system.” OpenAI supports response_format for JSON mode, while Anthropic uses tool-use with a specific schema. Function calling schemas differ between providers. My approach is a provider-specific adapter layer beneath the unified interface. The adapter translates the canonical request format into the provider-specific format on the way in, and normalizes the response on the way out. The critical thing is to have integration tests per provider that validate the adapter behavior, because providers change their APIs without warning. I have been burned by Anthropic changing their message format validation rules in a minor version update that broke our adapter silently.
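
The inbound half of such an adapter can be sketched as a pure function that lifts system messages out of an OpenAI-style message list into a separate `system` field. The function name and the `max_tokens` default are assumptions for illustration; a real adapter would also cover tools, images, and response normalization.

```python
from typing import Any, Dict, List


def to_anthropic_payload(
    model: str,
    messages: List[Dict[str, str]],
    max_tokens: int = 1024,
) -> Dict[str, Any]:
    """Translate OpenAI-style chat messages into an Anthropic-style payload."""
    # Collect every system message; the target format takes one system string.
    system_parts = [m["content"] for m in messages if m["role"] == "system"]
    # Everything else stays in the conversation, in its original order.
    chat = [m for m in messages if m["role"] != "system"]

    payload: Dict[str, Any] = {
        "model": model,
        "max_tokens": max_tokens,
        "messages": chat,
    }
    if system_parts:
        payload["system"] = "\n\n".join(system_parts)
    return payload
```

Because the function is pure, the per-provider integration tests mentioned above can start as plain unit tests over dictionaries before any network call is involved.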
Strong Answer:
  • Round-robin is the simplest: you cycle through your available endpoints in order. Its strength is simplicity and even distribution. Its weakness is that it is completely blind to the actual state of each endpoint. If one Azure deployment is overloaded and responding in 5 seconds while another is idle at 200ms, round-robin still sends them equal traffic. In practice, round-robin works fine when all your deployments have identical capacity and are healthy, which is often the case in steady state.
  • Least-busy routing sends each request to the deployment with the fewest in-flight requests. This adapts naturally to varying processing speeds — a slow deployment accumulates in-flight requests and naturally receives less new traffic. The trade-off is that you need to track in-flight counts accurately across your routing layer, which in a distributed system means shared state or approximate counting. The failure mode is thundering herd: if a deployment recovers from being slow, it suddenly has zero in-flight requests and gets hammered with all new traffic simultaneously.
  • Latency-based routing tracks the rolling average (or P95) latency of each deployment and prefers the fastest one. This is the most sophisticated and usually gives the best user experience because it directly optimizes for what users care about — response time. The trade-off is cold-start bias: a deployment that has not been used recently has no latency data, so you need an exploration strategy. I typically use epsilon-greedy: 90% of traffic goes to the lowest-latency endpoint, 10% is randomly distributed to keep latency estimates fresh for all endpoints.
  • In production with multiple Azure OpenAI deployments, I use a combination: latency-based routing as the primary strategy, with a fallback to round-robin when latency data is stale (no requests in the last 60 seconds). I also layer on rate-limit-aware routing — if a deployment returns a 429 with a Retry-After header, I remove it from the pool for that duration. This combination handles the common failure modes: regional outages, temporary rate limits, and gradual performance degradation.
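
The epsilon-greedy strategy described for latency-based routing can be sketched as below. The class name, the window size, and the rule of probing unmeasured deployments first are assumptions made for the example, not LiteLLM behavior.

```python
import random
from collections import deque
from typing import Deque, Dict, Optional, Sequence


class EpsilonGreedyRouter:
    """Prefer the lowest mean latency, but explore with probability epsilon."""

    def __init__(
        self,
        deployments: Sequence[str],
        epsilon: float = 0.1,
        window: int = 50,
        rng: Optional[random.Random] = None,
    ):
        if not deployments:
            raise ValueError("deployments must not be empty")
        self.deployments = list(deployments)
        self.epsilon = epsilon
        # Keep only the most recent `window` samples per deployment.
        self.latencies: Dict[str, Deque[float]] = {
            d: deque(maxlen=window) for d in self.deployments
        }
        self.rng = rng or random.Random()

    def record(self, deployment: str, seconds: float) -> None:
        self.latencies[deployment].append(seconds)

    def _mean(self, deployment: str) -> float:
        samples = self.latencies[deployment]
        return sum(samples) / len(samples)

    def choose(self) -> str:
        # Deployments with no data are probed first (cold-start handling).
        unmeasured = [d for d in self.deployments if not self.latencies[d]]
        if unmeasured:
            return self.rng.choice(unmeasured)
        # Explore: a random deployment keeps latency estimates fresh.
        if self.rng.random() < self.epsilon:
            return self.rng.choice(self.deployments)
        # Exploit: the deployment with the lowest rolling mean latency.
        return min(self.deployments, key=self._mean)
```

Call `record()` after every response so the rolling means stay current; a P95 estimate could replace `_mean` without changing the rest of the class.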
Follow-up: How does token-per-minute (TPM) rate limiting interact with your load balancing strategy?

TPM limits are the hidden constraint that breaks naive load balancing. Each Azure OpenAI deployment has a TPM limit, say 100K tokens per minute. A round-robin strategy that distributes requests evenly by count can still hit TPM limits if the requests have wildly different token counts. One summarization request consuming 50K input tokens can eat half your TPM budget. My approach is to track estimated token consumption per deployment on a rolling 60-second window. Before routing a request, I estimate its token count (input tokens from the message length, output tokens from max_tokens), check which deployments have enough remaining TPM headroom, and route to the lowest-latency deployment among those with sufficient budget. If no deployment has headroom, I queue the request with a short backoff rather than sending it and getting a 429. This token-aware routing is what separates a production-grade orchestration layer from a demo.
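
A minimal sketch of that token-aware selection step follows. The four-characters-per-token estimate is a rough assumption (a real tokenizer would be more accurate), and `TokenBudget` and `pick_deployment` are hypothetical names.

```python
import time
from collections import deque
from typing import Deque, Dict, List, Optional, Tuple


class TokenBudget:
    """Rolling-window token counter for one deployment."""

    def __init__(self, tpm_limit: int, window_seconds: float = 60.0):
        self.tpm_limit = tpm_limit
        self.window_seconds = window_seconds
        self.events: Deque[Tuple[float, int]] = deque()  # (timestamp, tokens)

    def headroom(self, now: float) -> int:
        # Drop reservations that have aged out of the window.
        while self.events and self.events[0][0] <= now - self.window_seconds:
            self.events.popleft()
        return self.tpm_limit - sum(tokens for _, tokens in self.events)

    def reserve(self, tokens: int, now: float) -> None:
        self.events.append((now, tokens))


def estimate_tokens(messages: List[Dict[str, str]], max_tokens: int) -> int:
    """Crude estimate: about 4 characters per input token, plus max output."""
    chars = sum(len(m["content"]) for m in messages)
    return chars // 4 + max_tokens


def pick_deployment(
    budgets: Dict[str, TokenBudget],
    latencies: Dict[str, float],
    needed: int,
    now: Optional[float] = None,
) -> Optional[str]:
    """Lowest-latency deployment with enough headroom, or None to queue."""
    now = time.monotonic() if now is None else now
    eligible = [name for name, b in budgets.items() if b.headroom(now) >= needed]
    if not eligible:
        return None  # caller should back off instead of provoking a 429
    choice = min(eligible, key=lambda name: latencies.get(name, float("inf")))
    budgets[choice].reserve(needed, now)
    return choice
```

After the response arrives, the reservation could be corrected with the actual `usage.total_tokens`; that refinement is left out here to keep the sketch short.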
Strong Answer:
  • Exact-match caching is straightforward: hash the request (model, messages, temperature, etc.) and use that as a cache key. If the exact same request comes in again, serve the cached response. This is safe and deterministic — you will never serve a wrong cached response. The limitation is that “What is machine learning?” and “Explain machine learning to me” are completely different cache keys despite being semantically identical. Hit rates for exact-match caching are typically 5-15% for conversational applications and 30-50% for applications with templated queries.
  • Semantic caching embeds the query, compares it against cached query embeddings using cosine similarity, and returns the cached response if similarity exceeds a threshold. This dramatically increases hit rates (30-60% in typical applications) because it catches paraphrases and minor variations. The cost savings can be substantial: if your average request costs $0.02 and you serve 40% from semantic cache, you are saving $0.008 per request.
  • The risk of semantic caching is serving stale or wrong responses. A similarity threshold of 0.95 feels safe, but embedding models can assign 0.95+ similarity to queries that are semantically related but require different answers. “What is the refund policy for electronics?” and “What is the refund policy for software?” might score 0.96 similarity but have completely different correct answers. You have essentially introduced a new class of bug: the cache collision.
  • The second risk is temporal staleness. If your underlying data changes — a product price updates, a policy changes — the cached response is now wrong and will be served confidently. You need cache invalidation tied to your data freshness, not just a TTL.
  • My recommendation: use exact-match caching as the default for all applications. Layer semantic caching on top only for specific query patterns where you have validated that the similarity threshold does not produce false matches in your domain. Always log cache hits so you can audit them for correctness.
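
Exact-match caching as described in the first point can be sketched with a canonical JSON hash. The in-memory dictionary and the hit/miss counters are assumptions for illustration; a production setup would typically sit on Redis or similar.

```python
import hashlib
import json
from typing import Any, Dict, List, Optional


def cache_key(model: str, messages: List[Dict[str, str]], **params: Any) -> str:
    """Hash the full request so only identical requests share a key."""
    payload = {"model": model, "messages": messages, "params": params}
    # sort_keys makes the key independent of keyword-argument order.
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


class ExactMatchCache:
    """In-memory cache with counters so hit rates can be audited."""

    def __init__(self) -> None:
        self._store: Dict[str, str] = {}
        self.hits = 0
        self.misses = 0

    def get(self, key: str) -> Optional[str]:
        if key in self._store:
            self.hits += 1
            return self._store[key]
        self.misses += 1
        return None

    def put(self, key: str, response: str) -> None:
        self._store[key] = response
```

Paraphrases such as "What is machine learning?" and "Explain machine learning to me" produce different keys here, which is exactly the limitation that semantic caching trades safety to overcome.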
Follow-up: How would you implement cache invalidation for a RAG system where the underlying documents change frequently?

The cleanest approach is content-addressed caching. Instead of just hashing the query, you also include a hash of the retrieved document IDs (or their content hashes) in the cache key. When a document is updated, its content hash changes, which automatically invalidates any cached responses that were generated from that document. This is more granular than TTL-based invalidation: only the affected responses are invalidated, not the entire cache. For the retrieval step itself, I cache embedding vectors per document chunk and invalidate when the chunk content changes. This way, the retrieval results for a given query automatically reflect document updates because the similarity scores change when document embeddings change. The implementation cost is moderate (you need a cache layer that supports compound keys and a document update hook that propagates content hash changes), but it eliminates the entire class of stale-cache bugs.
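
A minimal sketch of such a compound key is shown below. The function names are hypothetical, and sorting by document ID is a simplifying assumption; if your prompt depends on retrieval order, hash the chunks in ranked order instead.

```python
import hashlib
from typing import Dict


def content_hash(text: str) -> str:
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def rag_cache_key(query: str, model: str, retrieved_chunks: Dict[str, str]) -> str:
    """Key over the query plus the content of every retrieved chunk.

    retrieved_chunks maps a document or chunk ID to its current text.
    """
    h = hashlib.sha256()
    h.update(model.encode("utf-8"))
    h.update(b"\x00")
    h.update(query.encode("utf-8"))
    # Sort by ID so the key does not depend on dictionary ordering.
    for doc_id in sorted(retrieved_chunks):
        h.update(b"\x00")
        h.update(doc_id.encode("utf-8"))
        h.update(b"\x00")
        h.update(content_hash(retrieved_chunks[doc_id]).encode("utf-8"))
    return h.hexdigest()
```

Editing any retrieved chunk changes its content hash and therefore the key, so the stale response is simply never looked up again and can be left to expire by TTL.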