LLM orchestration simplifies working with multiple providers through unified interfaces, automatic failover, and intelligent routing.

LiteLLM Overview

LiteLLM provides a unified interface for 100+ LLM providers:
from litellm import completion, acompletion
import litellm

# Works the same way for any provider
response = completion(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello"}]
)

# Switch providers by changing model string
response = completion(
    model="claude-3-5-sonnet-20241022",
    messages=[{"role": "user", "content": "Hello"}]
)

response = completion(
    model="groq/llama-3.3-70b-versatile",
    messages=[{"role": "user", "content": "Hello"}]
)

response = completion(
    model="together_ai/meta-llama/Llama-3.3-70B-Instruct-Turbo",
    messages=[{"role": "user", "content": "Hello"}]
)

Async Support

import asyncio
from litellm import acompletion

async def query_llm(prompt: str, model: str = "gpt-4o") -> str:
    response = await acompletion(
        model=model,
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

# Run concurrent queries
async def parallel_queries():
    prompts = ["Explain AI", "Explain ML", "Explain DL"]
    tasks = [query_llm(p) for p in prompts]
    results = await asyncio.gather(*tasks)
    return results
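To run the concurrent queries from a synchronous entry point, for example:

results = asyncio.run(parallel_queries())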

Provider Configuration

Environment Variables

# OpenAI
export OPENAI_API_KEY="sk-..."

# Anthropic
export ANTHROPIC_API_KEY="sk-ant-..."

# Google
export GEMINI_API_KEY="..."

# Azure OpenAI
export AZURE_API_KEY="..."
export AZURE_API_BASE="https://your-resource.openai.azure.com"
export AZURE_API_VERSION="2024-02-01"

# AWS Bedrock
export AWS_ACCESS_KEY_ID="..."
export AWS_SECRET_ACCESS_KEY="..."
export AWS_REGION_NAME="us-east-1"
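LiteLLM reads these keys from the environment at call time. If you keep them in a .env file, a minimal sketch using python-dotenv (an assumed choice of loader, not required by LiteLLM) looks like this:

from dotenv import load_dotenv
from litellm import completion

load_dotenv()  # populates os.environ from .env before the first call

response = completion(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello"}]
)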

Programmatic Configuration

import litellm
from litellm import completion

# Set API keys programmatically
litellm.api_key = "sk-..."
litellm.anthropic_key = "sk-ant-..."

# Configure Azure
response = completion(
    model="azure/gpt-4o",
    messages=[{"role": "user", "content": "Hello"}],
    api_base="https://your-resource.openai.azure.com",
    api_version="2024-02-01",
    api_key="your-azure-key"
)

# Configure Bedrock
response = completion(
    model="bedrock/anthropic.claude-3-sonnet-20240229-v1:0",
    messages=[{"role": "user", "content": "Hello"}],
    aws_access_key_id="...",
    aws_secret_access_key="...",
    aws_region_name="us-east-1"
)

Fallback Configuration

from litellm import completion
import litellm

# Enable verbose logging to see which model handles each request
litellm.set_verbose = True

# Define fallback models
fallback_models = [
    "gpt-4o",
    "claude-3-5-sonnet-20241022",
    "groq/llama-3.3-70b-versatile"
]

def completion_with_fallback(messages: list, **kwargs) -> str:
    """Try each model in sequence until one succeeds"""
    
    last_error = None
    
    for model in fallback_models:
        try:
            response = completion(
                model=model,
                messages=messages,
                **kwargs
            )
            return response.choices[0].message.content
        except Exception as e:
            last_error = e
            print(f"Model {model} failed: {e}")
            continue
    
    raise last_error
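For example:

answer = completion_with_fallback(
    [{"role": "user", "content": "Summarize LiteLLM in one sentence"}],
    max_tokens=100
)
print(answer)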

# Using LiteLLM Router for automatic fallback
from litellm import Router

router = Router(
    model_list=[
        {
            "model_name": "primary",
            "litellm_params": {
                "model": "gpt-4o",
                "api_key": "sk-..."
            }
        },
        {
            "model_name": "primary",  # Same name = fallback
            "litellm_params": {
                "model": "claude-3-5-sonnet-20241022",
                "api_key": "sk-ant-..."
            }
        },
        {
            "model_name": "primary",
            "litellm_params": {
                "model": "groq/llama-3.3-70b-versatile",
                "api_key": "gsk_..."
            }
        }
    ],
    fallbacks=[
        {"primary": ["primary"]}  # Fallback to next with same name
    ],
    num_retries=2
)

# Router automatically handles fallback
response = router.completion(
    model="primary",
    messages=[{"role": "user", "content": "Hello"}]
)

Load Balancing

from litellm import Router

# Configure load balancing across deployments
router = Router(
    model_list=[
        {
            "model_name": "gpt-4o",
            "litellm_params": {
                "model": "azure/gpt-4o-deployment-1",
                "api_base": "https://region1.openai.azure.com",
                "api_key": "key1"
            },
            "tpm": 100000,  # Tokens per minute
            "rpm": 1000     # Requests per minute
        },
        {
            "model_name": "gpt-4o",
            "litellm_params": {
                "model": "azure/gpt-4o-deployment-2",
                "api_base": "https://region2.openai.azure.com",
                "api_key": "key2"
            },
            "tpm": 100000,
            "rpm": 1000
        },
        {
            "model_name": "gpt-4o",
            "litellm_params": {
                "model": "openai/gpt-4o",
                "api_key": "sk-..."
            },
            "tpm": 150000,
            "rpm": 500
        }
    ],
    routing_strategy="least-busy",  # or "simple-shuffle", "latency-based-routing"
    enable_pre_call_checks=True
)

# Requests are distributed automatically across deployments
async def run_queries():
    for i in range(100):
        response = await router.acompletion(
            model="gpt-4o",
            messages=[{"role": "user", "content": f"Query {i}"}]
        )
# Run with e.g. asyncio.run(run_queries())

Rate Limiting

from litellm import Router

# Redis-based rate limiting: the Router connects to Redis itself
# to track usage across instances

router = Router(
    model_list=[
        {
            "model_name": "gpt-4o",
            "litellm_params": {"model": "gpt-4o"},
            "tpm": 90000,
            "rpm": 500
        }
    ],
    redis_host="localhost",
    redis_port=6379,
    routing_strategy="usage-based-routing"
)

# Custom rate limiting
from dataclasses import dataclass
from datetime import datetime, timedelta
import asyncio

@dataclass
class RateLimit:
    requests: int
    tokens: int
    window_seconds: int = 60

class RateLimitedRouter:
    """Router with custom rate limiting"""
    
    def __init__(
        self,
        router: Router,
        limits: dict[str, RateLimit]
    ):
        self.router = router
        self.limits = limits
        self.usage = {}
        self.lock = asyncio.Lock()
    
    async def _check_limit(self, model: str) -> bool:
        """Check if within rate limits"""
        limit = self.limits.get(model)
        if not limit:
            return True
        
        now = datetime.now()
        window_start = now - timedelta(seconds=limit.window_seconds)
        
        async with self.lock:
            # Clean old entries
            if model in self.usage:
                self.usage[model] = [
                    u for u in self.usage[model]
                    if u["time"] > window_start
                ]
            else:
                self.usage[model] = []
            
            # Check limits
            current_requests = len(self.usage[model])
            current_tokens = sum(u["tokens"] for u in self.usage[model])
            
            return (
                current_requests < limit.requests and
                current_tokens < limit.tokens
            )
    
    async def _record_usage(self, model: str, tokens: int):
        """Record usage"""
        async with self.lock:
            if model not in self.usage:
                self.usage[model] = []
            
            self.usage[model].append({
                "time": datetime.now(),
                "tokens": tokens
            })
    
    async def completion(self, model: str, **kwargs):
        """Rate-limited completion"""
        while not await self._check_limit(model):
            await asyncio.sleep(0.1)
        
        response = await self.router.acompletion(model=model, **kwargs)
        
        total_tokens = response.usage.total_tokens
        await self._record_usage(model, total_tokens)
        
        return response
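A minimal usage sketch, assuming the router defined in the load-balancing example above and illustrative limits:

limits = {"gpt-4o": RateLimit(requests=500, tokens=90000)}
limited_router = RateLimitedRouter(router=router, limits=limits)

async def main():
    response = await limited_router.completion(
        model="gpt-4o",
        messages=[{"role": "user", "content": "Hello"}]
    )
    print(response.choices[0].message.content)

asyncio.run(main())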

Caching Integration

from litellm import completion
import litellm
import redis

# Enable Redis caching
litellm.cache = litellm.Cache(
    type="redis",
    host="localhost",
    port=6379,
    ttl=3600  # 1 hour
)

# Semantic caching with embeddings (uses the redis-semantic cache type)
litellm.cache = litellm.Cache(
    type="redis-semantic",
    host="localhost",
    port=6379,
    similarity_threshold=0.8,  # Return cached if > 80% similar
    supported_call_types=["completion", "acompletion"]
)

# Request with caching
response = completion(
    model="gpt-4o",
    messages=[{"role": "user", "content": "What is AI?"}],
    caching=True
)

# Check if response was cached
if hasattr(response, "_hidden_params"):
    was_cached = response._hidden_params.get("cache_hit", False)
    print(f"Cache hit: {was_cached}")

# Disable caching for specific request
response = completion(
    model="gpt-4o",
    messages=[{"role": "user", "content": "What time is it?"}],
    caching=False  # Skip cache for time-sensitive queries
)
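For local development without Redis, LiteLLM also supports an in-process cache; a minimal sketch:

# In-memory cache, local to the process; fine for development, not shared across workers
litellm.cache = litellm.Cache(type="local")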

Custom Provider Wrapper

import asyncio
from typing import Optional, Dict, Any, List
from dataclasses import dataclass
from litellm import acompletion

@dataclass
class ModelConfig:
    model: str
    provider: str
    max_tokens: int = 4096
    temperature: float = 0.7
    cost_per_1k_input: float = 0.01
    cost_per_1k_output: float = 0.03

class UnifiedLLMClient:
    """Unified client for LLM operations"""
    
    MODELS = {
        "fast": ModelConfig(
            model="groq/llama-3.3-70b-versatile",
            provider="groq",
            cost_per_1k_input=0.00059,
            cost_per_1k_output=0.00079
        ),
        "smart": ModelConfig(
            model="gpt-4o",
            provider="openai",
            cost_per_1k_input=0.0025,
            cost_per_1k_output=0.010
        ),
        "cheap": ModelConfig(
            model="gpt-4o-mini",
            provider="openai",
            cost_per_1k_input=0.00015,
            cost_per_1k_output=0.0006
        ),
        "creative": ModelConfig(
            model="claude-3-5-sonnet-20241022",
            provider="anthropic",
            cost_per_1k_input=0.003,
            cost_per_1k_output=0.015
        )
    }
    
    def __init__(self, default_model: str = "smart"):
        self.default_model = default_model
        self.total_cost = 0.0
        self.request_count = 0
    
    def _get_config(self, model_key: str) -> ModelConfig:
        if model_key in self.MODELS:
            return self.MODELS[model_key]
        # Treat as raw model name
        return ModelConfig(model=model_key, provider="custom")
    
    def _calculate_cost(
        self,
        config: ModelConfig,
        input_tokens: int,
        output_tokens: int
    ) -> float:
        return (
            (input_tokens / 1000) * config.cost_per_1k_input +
            (output_tokens / 1000) * config.cost_per_1k_output
        )
    
    async def complete(
        self,
        messages: List[Dict[str, str]],
        model: Optional[str] = None,
        **kwargs
    ) -> Dict[str, Any]:
        """Unified completion with tracking"""
        
        model_key = model or self.default_model
        config = self._get_config(model_key)
        
        response = await acompletion(
            model=config.model,
            messages=messages,
            max_tokens=kwargs.get("max_tokens", config.max_tokens),
            temperature=kwargs.get("temperature", config.temperature),
            **{k: v for k, v in kwargs.items() 
               if k not in ["max_tokens", "temperature"]}
        )
        
        # Track usage
        usage = response.usage
        cost = self._calculate_cost(
            config,
            usage.prompt_tokens,
            usage.completion_tokens
        )
        self.total_cost += cost
        self.request_count += 1
        
        return {
            "content": response.choices[0].message.content,
            "model": config.model,
            "usage": {
                "input_tokens": usage.prompt_tokens,
                "output_tokens": usage.completion_tokens,
                "cost": cost
            }
        }
    
    def get_stats(self) -> Dict[str, Any]:
        return {
            "total_requests": self.request_count,
            "total_cost": round(self.total_cost, 4)
        }

# Usage
async def main():
    client = UnifiedLLMClient(default_model="smart")

    # Use semantic model names
    response = await client.complete(
        messages=[{"role": "user", "content": "Quick question"}],
        model="fast"  # Uses Groq for speed
    )

    response = await client.complete(
        messages=[{"role": "user", "content": "Write a story"}],
        model="creative"  # Uses Claude for creativity
    )

    print(client.get_stats())

asyncio.run(main())

Streaming with Router

from litellm import Router

router = Router(model_list=[...])

async def stream_completion(messages: list):
    """Stream responses through router"""
    
    response = await router.acompletion(
        model="gpt-4o",
        messages=messages,
        stream=True
    )
    
    full_content = ""
    async for chunk in response:
        if chunk.choices[0].delta.content:
            content = chunk.choices[0].delta.content
            full_content += content
            yield content
    
    return full_content

# FastAPI streaming endpoint
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.post("/chat/stream")
async def chat_stream(request: dict):
    async def generate():
        async for chunk in stream_completion(request["messages"]):
            yield f"data: {chunk}\n\n"
        yield "data: [DONE]\n\n"
    
    return StreamingResponse(
        generate(),
        media_type="text/event-stream"
    )
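A client can consume the endpoint with any HTTP client that supports streaming; a minimal sketch using httpx against a hypothetical local address (both are assumptions, not part of the original example):

import asyncio
import httpx

async def consume_stream():
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream(
            "POST",
            "http://localhost:8000/chat/stream",  # hypothetical local address
            json={"messages": [{"role": "user", "content": "Hello"}]},
        ) as resp:
            # Read server-sent events line by line and print the token stream
            async for line in resp.aiter_lines():
                if line.startswith("data: ") and line != "data: [DONE]":
                    print(line[len("data: "):], end="", flush=True)

asyncio.run(consume_stream())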

Observability Integration

import litellm
from litellm.integrations.custom_logger import CustomLogger

class LLMLogger(CustomLogger):
    """Custom logger for LLM calls"""
    
    def log_pre_api_call(self, model, messages, kwargs):
        print(f"Calling {model} with {len(messages)} messages")
    
    def log_success_event(self, kwargs, response_obj, start_time, end_time):
        duration = (end_time - start_time).total_seconds()
        tokens = response_obj.usage.total_tokens
        print(f"Success: {tokens} tokens in {duration:.2f}s")
    
    def log_failure_event(self, kwargs, response_obj, start_time, end_time):
        print(f"Failure: {response_obj}")
    
    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
        # Async logging (e.g., to database)
        pass

# Register logger
litellm.callbacks = [LLMLogger()]

# Or use built-in integrations
litellm.success_callback = ["langfuse"]  # Send to Langfuse
litellm.failure_callback = ["sentry"]     # Errors to Sentry
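Once registered, every completion call flows through these callbacks:

from litellm import completion

response = completion(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Test logging"}]
)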

Model Comparison

| Provider  | Model             | Speed   | Quality   | Cost   |
|-----------|-------------------|---------|-----------|--------|
| Groq      | llama-3.3-70b     | Fastest | Good      | Low    |
| OpenAI    | gpt-4o-mini       | Fast    | Good      | Low    |
| OpenAI    | gpt-4o            | Medium  | Excellent | Medium |
| Anthropic | claude-3-5-sonnet | Medium  | Excellent | Medium |
| Google    | gemini-1.5-pro    | Medium  | Excellent | Medium |

What's Next

Semantic Search

Learn hybrid search, reranking, and advanced retrieval techniques.