LiteLLM Overview
LiteLLM provides a unified interface for 100+ LLM providers:
from litellm import completion, acompletion
import litellm

# Works the same way for any provider
response = completion(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello"}]
)

# Switch providers by changing the model string
response = completion(
    model="claude-3-5-sonnet-20241022",
    messages=[{"role": "user", "content": "Hello"}]
)

response = completion(
    model="groq/llama-3.3-70b-versatile",
    messages=[{"role": "user", "content": "Hello"}]
)

response = completion(
    model="together_ai/meta-llama/Llama-3.3-70B-Instruct-Turbo",
    messages=[{"role": "user", "content": "Hello"}]
)
Async Support
import asyncio
from litellm import acompletion

async def query_llm(prompt: str, model: str = "gpt-4o") -> str:
    response = await acompletion(
        model=model,
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

# Run concurrent queries
async def parallel_queries():
    prompts = ["Explain AI", "Explain ML", "Explain DL"]
    tasks = [query_llm(p) for p in prompts]
    results = await asyncio.gather(*tasks)
    return results
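To drive the coroutines from a synchronous entry point, wrap the call in asyncio.run (this assumes the parallel_queries coroutine defined above is in scope):
# Run the concurrent queries and print the first part of each answer
results = asyncio.run(parallel_queries())
for result in results:
    print(result[:80])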
Provider Configuration
Environment Variables
# OpenAI
export OPENAI_API_KEY="sk-..."
# Anthropic
export ANTHROPIC_API_KEY="sk-ant-..."
# Google
export GEMINI_API_KEY="..."
# Azure OpenAI
export AZURE_API_KEY="..."
export AZURE_API_BASE="https://your-resource.openai.azure.com"
export AZURE_API_VERSION="2024-02-01"
# AWS Bedrock
export AWS_ACCESS_KEY_ID="..."
export AWS_SECRET_ACCESS_KEY="..."
export AWS_REGION_NAME="us-east-1"
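To confirm the right variables are set before making calls, litellm provides a validate_environment helper; a minimal sketch (the exact return shape may differ between versions):
import litellm

# Returns a dict indicating whether the required env vars for this model are set
check = litellm.validate_environment(model="gpt-4o")
print(check)  # e.g. {"keys_in_environment": True, "missing_keys": []}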
Programmatic Configuration
import litellm
from litellm import completion

# Set API keys programmatically
litellm.api_key = "sk-..."
litellm.anthropic_key = "sk-ant-..."

# Configure Azure
response = completion(
    model="azure/gpt-4o",
    messages=[{"role": "user", "content": "Hello"}],
    api_base="https://your-resource.openai.azure.com",
    api_version="2024-02-01",
    api_key="your-azure-key"
)

# Configure Bedrock
response = completion(
    model="bedrock/anthropic.claude-3-sonnet-20240229-v1:0",
    messages=[{"role": "user", "content": "Hello"}],
    aws_access_key_id="...",
    aws_secret_access_key="...",
    aws_region_name="us-east-1"
)
Fallback Configuration
from litellm import completion
import litellm

# Enable verbose logging while debugging fallback behavior
litellm.set_verbose = True

# Define fallback models
fallback_models = [
    "gpt-4o",
    "claude-3-5-sonnet-20241022",
    "groq/llama-3.3-70b-versatile"
]

def completion_with_fallback(messages: list, **kwargs) -> str:
    """Try each model in sequence until one succeeds"""
    last_error = None
    for model in fallback_models:
        try:
            response = completion(
                model=model,
                messages=messages,
                **kwargs
            )
            return response.choices[0].message.content
        except Exception as e:
            last_error = e
            print(f"Model {model} failed: {e}")
            continue
    raise last_error

# Using LiteLLM Router for automatic fallback
from litellm import Router

router = Router(
    model_list=[
        {
            "model_name": "primary",
            "litellm_params": {
                "model": "gpt-4o",
                "api_key": "sk-..."
            }
        },
        {
            "model_name": "primary",  # Same model_name = same group; the Router rotates across these deployments
            "litellm_params": {
                "model": "claude-3-5-sonnet-20241022",
                "api_key": "sk-ant-..."
            }
        },
        {
            "model_name": "primary",
            "litellm_params": {
                "model": "groq/llama-3.3-70b-versatile",
                "api_key": "gsk_..."
            }
        }
    ],
    fallbacks=[
        {"primary": ["primary"]}  # Retry the "primary" group if a deployment fails
    ],
    num_retries=2
)

# Router automatically handles fallback
response = router.completion(
    model="primary",
    messages=[{"role": "user", "content": "Hello"}]
)
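Fallbacks can also point one model group at a different group, which is the common pattern when you want a distinct backup model. A minimal sketch (model names and keys are placeholders):
from litellm import Router

router = Router(
    model_list=[
        {
            "model_name": "gpt-4o",
            "litellm_params": {"model": "gpt-4o", "api_key": "sk-..."}
        },
        {
            "model_name": "claude-backup",
            "litellm_params": {"model": "claude-3-5-sonnet-20241022", "api_key": "sk-ant-..."}
        }
    ],
    # If the "gpt-4o" group fails after retries, try the "claude-backup" group
    fallbacks=[{"gpt-4o": ["claude-backup"]}],
    num_retries=2
)

response = router.completion(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello"}]
)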
Load Balancing
from litellm import Router
import asyncio

# Configure load balancing across deployments
router = Router(
    model_list=[
        {
            "model_name": "gpt-4o",
            "litellm_params": {
                "model": "azure/gpt-4o-deployment-1",
                "api_base": "https://region1.openai.azure.com",
                "api_key": "key1"
            },
            "tpm": 100000,  # Tokens per minute
            "rpm": 1000     # Requests per minute
        },
        {
            "model_name": "gpt-4o",
            "litellm_params": {
                "model": "azure/gpt-4o-deployment-2",
                "api_base": "https://region2.openai.azure.com",
                "api_key": "key2"
            },
            "tpm": 100000,
            "rpm": 1000
        },
        {
            "model_name": "gpt-4o",
            "litellm_params": {
                "model": "openai/gpt-4o",
                "api_key": "sk-..."
            },
            "tpm": 150000,
            "rpm": 500
        }
    ],
    routing_strategy="least-busy",  # or "simple-shuffle", "latency-based-routing"
    enable_pre_call_checks=True
)

# Requests are automatically distributed across deployments
async def run_queries():
    for i in range(100):
        response = await router.acompletion(
            model="gpt-4o",
            messages=[{"role": "user", "content": f"Query {i}"}]
        )

asyncio.run(run_queries())
Rate Limiting
from litellm import Router
import redis

# Redis-based rate limiting
redis_client = redis.Redis(host="localhost", port=6379)

router = Router(
    model_list=[
        {
            "model_name": "gpt-4o",
            "litellm_params": {"model": "gpt-4o"},
            "tpm": 90000,
            "rpm": 500
        }
    ],
    redis_host="localhost",
    redis_port=6379,
    routing_strategy="usage-based-routing"
)

# Custom rate limiting
from dataclasses import dataclass
from datetime import datetime, timedelta
import asyncio

@dataclass
class RateLimit:
    requests: int
    tokens: int
    window_seconds: int = 60

class RateLimitedRouter:
    """Router with custom rate limiting"""

    def __init__(
        self,
        router: Router,
        limits: dict[str, RateLimit]
    ):
        self.router = router
        self.limits = limits
        self.usage = {}
        self.lock = asyncio.Lock()

    async def _check_limit(self, model: str) -> bool:
        """Check if within rate limits"""
        limit = self.limits.get(model)
        if not limit:
            return True

        now = datetime.now()
        window_start = now - timedelta(seconds=limit.window_seconds)

        async with self.lock:
            # Clean old entries
            if model in self.usage:
                self.usage[model] = [
                    u for u in self.usage[model]
                    if u["time"] > window_start
                ]
            else:
                self.usage[model] = []

            # Check limits
            current_requests = len(self.usage[model])
            current_tokens = sum(u["tokens"] for u in self.usage[model])

            return (
                current_requests < limit.requests and
                current_tokens < limit.tokens
            )

    async def _record_usage(self, model: str, tokens: int):
        """Record usage"""
        async with self.lock:
            if model not in self.usage:
                self.usage[model] = []
            self.usage[model].append({
                "time": datetime.now(),
                "tokens": tokens
            })

    async def completion(self, model: str, **kwargs):
        """Rate-limited completion"""
        while not await self._check_limit(model):
            await asyncio.sleep(0.1)

        response = await self.router.acompletion(model=model, **kwargs)

        total_tokens = response.usage.total_tokens
        await self._record_usage(model, total_tokens)

        return response
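A quick usage sketch of the RateLimitedRouter above; the limits are illustrative, and it assumes the Router configured earlier in this block is in scope:
async def main():
    limited = RateLimitedRouter(
        router=router,  # the Router configured above
        limits={"gpt-4o": RateLimit(requests=100, tokens=50000, window_seconds=60)}
    )
    response = await limited.completion(
        model="gpt-4o",
        messages=[{"role": "user", "content": "Hello"}]
    )
    print(response.choices[0].message.content)

asyncio.run(main())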
Caching Integration
from litellm import completion
import litellm

# Enable Redis caching
litellm.cache = litellm.Cache(
    type="redis",
    host="localhost",
    port=6379,
    ttl=3600  # 1 hour
)

# Semantic caching with embeddings
litellm.cache = litellm.Cache(
    type="redis-semantic",
    host="localhost",
    port=6379,
    similarity_threshold=0.8,  # Return cached response if > 80% similar
    supported_call_types=["completion", "acompletion"]
)

# Request with caching
response = completion(
    model="gpt-4o",
    messages=[{"role": "user", "content": "What is AI?"}],
    caching=True
)

# Check if the response was cached
if hasattr(response, "_hidden_params"):
    was_cached = response._hidden_params.get("cache_hit", False)
    print(f"Cache hit: {was_cached}")

# Disable caching for a specific request
response = completion(
    model="gpt-4o",
    messages=[{"role": "user", "content": "What time is it?"}],
    caching=False  # Skip cache for time-sensitive queries
)
Custom Provider Wrapper
from typing import Optional, Dict, Any, List
from dataclasses import dataclass
import asyncio
import litellm
from litellm import completion, acompletion

@dataclass
class ModelConfig:
    model: str
    provider: str
    max_tokens: int = 4096
    temperature: float = 0.7
    cost_per_1k_input: float = 0.01
    cost_per_1k_output: float = 0.03

class UnifiedLLMClient:
    """Unified client for LLM operations"""

    MODELS = {
        "fast": ModelConfig(
            model="groq/llama-3.3-70b-versatile",
            provider="groq",
            cost_per_1k_input=0.00059,
            cost_per_1k_output=0.00079
        ),
        "smart": ModelConfig(
            model="gpt-4o",
            provider="openai",
            cost_per_1k_input=0.0025,
            cost_per_1k_output=0.010
        ),
        "cheap": ModelConfig(
            model="gpt-4o-mini",
            provider="openai",
            cost_per_1k_input=0.00015,
            cost_per_1k_output=0.0006
        ),
        "creative": ModelConfig(
            model="claude-3-5-sonnet-20241022",
            provider="anthropic",
            cost_per_1k_input=0.003,
            cost_per_1k_output=0.015
        )
    }

    def __init__(self, default_model: str = "smart"):
        self.default_model = default_model
        self.total_cost = 0.0
        self.request_count = 0

    def _get_config(self, model_key: str) -> ModelConfig:
        if model_key in self.MODELS:
            return self.MODELS[model_key]
        # Treat as a raw model name
        return ModelConfig(model=model_key, provider="custom")

    def _calculate_cost(
        self,
        config: ModelConfig,
        input_tokens: int,
        output_tokens: int
    ) -> float:
        return (
            (input_tokens / 1000) * config.cost_per_1k_input +
            (output_tokens / 1000) * config.cost_per_1k_output
        )

    async def complete(
        self,
        messages: List[Dict[str, str]],
        model: Optional[str] = None,
        **kwargs
    ) -> Dict[str, Any]:
        """Unified completion with tracking"""
        model_key = model or self.default_model
        config = self._get_config(model_key)

        response = await acompletion(
            model=config.model,
            messages=messages,
            max_tokens=kwargs.get("max_tokens", config.max_tokens),
            temperature=kwargs.get("temperature", config.temperature),
            **{k: v for k, v in kwargs.items()
               if k not in ["max_tokens", "temperature"]}
        )

        # Track usage
        usage = response.usage
        cost = self._calculate_cost(
            config,
            usage.prompt_tokens,
            usage.completion_tokens
        )
        self.total_cost += cost
        self.request_count += 1

        return {
            "content": response.choices[0].message.content,
            "model": config.model,
            "usage": {
                "input_tokens": usage.prompt_tokens,
                "output_tokens": usage.completion_tokens,
                "cost": cost
            }
        }

    def get_stats(self) -> Dict[str, Any]:
        return {
            "total_requests": self.request_count,
            "total_cost": round(self.total_cost, 4)
        }

# Usage
async def main():
    client = UnifiedLLMClient(default_model="smart")

    # Use semantic model names
    response = await client.complete(
        messages=[{"role": "user", "content": "Quick question"}],
        model="fast"  # Uses Groq for speed
    )

    response = await client.complete(
        messages=[{"role": "user", "content": "Write a story"}],
        model="creative"  # Uses Claude for creativity
    )

    print(client.get_stats())

asyncio.run(main())
Streaming with Router
from litellm import Router

router = Router(model_list=[...])

async def stream_completion(messages: list):
    """Stream responses through the router"""
    response = await router.acompletion(
        model="gpt-4o",
        messages=messages,
        stream=True
    )
    # Yield content chunks as they arrive; an async generator cannot
    # return a value, so callers accumulate the full text if they need it
    async for chunk in response:
        if chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content

# FastAPI streaming endpoint
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.post("/chat/stream")
async def chat_stream(request: dict):
    async def generate():
        async for chunk in stream_completion(request["messages"]):
            yield f"data: {chunk}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream"
    )
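To consume the stream, a client reads the SSE lines until the [DONE] sentinel. A minimal sketch with httpx; the URL and payload shape follow the endpoint above and assume the app is served locally on port 8000:
import asyncio
import httpx

async def read_stream():
    payload = {"messages": [{"role": "user", "content": "Hello"}]}
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream("POST", "http://localhost:8000/chat/stream", json=payload) as resp:
            async for line in resp.aiter_lines():
                if not line.startswith("data: "):
                    continue
                data = line[len("data: "):]
                if data == "[DONE]":
                    break
                print(data, end="", flush=True)

asyncio.run(read_stream())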
Observability Integration
import litellm
from litellm.integrations.custom_logger import CustomLogger

class LLMLogger(CustomLogger):
    """Custom logger for LLM calls"""

    def log_pre_api_call(self, model, messages, kwargs):
        print(f"Calling {model} with {len(messages)} messages")

    def log_success_event(self, kwargs, response_obj, start_time, end_time):
        duration = (end_time - start_time).total_seconds()
        tokens = response_obj.usage.total_tokens
        print(f"Success: {tokens} tokens in {duration:.2f}s")

    def log_failure_event(self, kwargs, response_obj, start_time, end_time):
        print(f"Failure: {response_obj}")

    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
        # Async logging (e.g., to a database)
        pass

# Register the logger
litellm.callbacks = [LLMLogger()]

# Or use built-in integrations
litellm.success_callback = ["langfuse"]  # Send traces to Langfuse
litellm.failure_callback = ["sentry"]    # Send errors to Sentry
Model Comparison
| Provider | Model | Speed | Quality | Cost |
|---|---|---|---|---|
| Groq | llama-3.3-70b | Fastest | Good | Low |
| OpenAI | gpt-4o-mini | Fast | Good | Low |
| OpenAI | gpt-4o | Medium | Excellent | Medium |
| Anthropic | claude-3-5-sonnet | Medium | Excellent | Medium |
| Google | gemini-1.5-pro | Medium | Excellent | Medium |
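Costs can also be checked programmatically: litellm ships a completion_cost helper that prices a response from its token usage. A minimal sketch (per-token prices come from litellm's model cost map and may change over time):
from litellm import completion, completion_cost

response = completion(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Hello"}]
)

# Price the call from the returned usage
cost = completion_cost(completion_response=response)
print(f"Cost: ${cost:.6f}")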
What's Next
Semantic Search: Learn hybrid search, reranking, and advanced retrieval techniques.