December 2025 Update: Production patterns for building resilient LLM applications with multi-provider fallback chains, intelligent routing, and cost-optimized model selection.
Why a Multi-Provider Strategy?
Single Provider Risk          Multi-Provider Benefits
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Single point of failure       High availability
Rate limit bottlenecks        Load distribution
Price increases               Cost optimization
Model deprecation             Future-proofing
Quality issues                Best model per task
Unified LLM Interface
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional, List, Dict, Any, AsyncIterator
from enum import Enum


class Provider(str, Enum):
    OPENAI = "openai"
    ANTHROPIC = "anthropic"
    GOOGLE = "google"
    TOGETHER = "together"
    GROQ = "groq"
    LOCAL = "local"


@dataclass
class LLMMessage:
    role: str  # "system", "user", "assistant"
    content: str

    def to_openai(self) -> dict:
        return {"role": self.role, "content": self.content}

    def to_anthropic(self) -> dict:
        # Anthropic uses a separate system parameter
        return {"role": self.role, "content": self.content}


@dataclass
class LLMResponse:
    content: str
    model: str
    provider: Provider
    usage: dict
    latency_ms: float
    raw_response: Any = None


class BaseLLMClient(ABC):
    """Abstract base for LLM clients"""

    provider: Provider

    @abstractmethod
    async def complete(
        self,
        messages: List[LLMMessage],
        model: str = None,
        temperature: float = 0.7,
        max_tokens: int = 1000,
        **kwargs
    ) -> LLMResponse:
        pass

    @abstractmethod
    async def stream(
        self,
        messages: List[LLMMessage],
        model: str = None,
        **kwargs
    ) -> AsyncIterator[str]:
        pass

    @abstractmethod
    async def health_check(self) -> bool:
        pass
Provider Implementations
from openai import AsyncOpenAI
from anthropic import AsyncAnthropic
import google.generativeai as genai
import time


class OpenAIClient(BaseLLMClient):
    provider = Provider.OPENAI

    def __init__(self, api_key: str = None):
        self.client = AsyncOpenAI(api_key=api_key)
        self.default_model = "gpt-4o"

    async def complete(
        self,
        messages: List[LLMMessage],
        model: str = None,
        temperature: float = 0.7,
        max_tokens: int = 1000,
        **kwargs
    ) -> LLMResponse:
        start = time.time()
        response = await self.client.chat.completions.create(
            model=model or self.default_model,
            messages=[m.to_openai() for m in messages],
            temperature=temperature,
            max_tokens=max_tokens,
            **kwargs
        )
        return LLMResponse(
            content=response.choices[0].message.content,
            model=response.model,
            provider=self.provider,
            usage={
                "input_tokens": response.usage.prompt_tokens,
                "output_tokens": response.usage.completion_tokens
            },
            latency_ms=(time.time() - start) * 1000,
            raw_response=response
        )

    async def stream(
        self,
        messages: List[LLMMessage],
        model: str = None,
        **kwargs
    ) -> AsyncIterator[str]:
        stream = await self.client.chat.completions.create(
            model=model or self.default_model,
            messages=[m.to_openai() for m in messages],
            stream=True,
            **kwargs
        )
        async for chunk in stream:
            if chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content

    async def health_check(self) -> bool:
        try:
            await self.client.models.list()
            return True
        except Exception:
            return False


class AnthropicClient(BaseLLMClient):
    provider = Provider.ANTHROPIC

    def __init__(self, api_key: str = None):
        self.client = AsyncAnthropic(api_key=api_key)
        self.default_model = "claude-3-5-sonnet-20241022"

    async def complete(
        self,
        messages: List[LLMMessage],
        model: str = None,
        temperature: float = 0.7,
        max_tokens: int = 1000,
        **kwargs
    ) -> LLMResponse:
        start = time.time()
        # Extract the system message (Anthropic takes it as a separate parameter)
        system = None
        non_system = []
        for m in messages:
            if m.role == "system":
                system = m.content
            else:
                non_system.append({"role": m.role, "content": m.content})
        response = await self.client.messages.create(
            model=model or self.default_model,
            messages=non_system,
            system=system,
            max_tokens=max_tokens,
            temperature=temperature
        )
        return LLMResponse(
            content=response.content[0].text,
            model=response.model,
            provider=self.provider,
            usage={
                "input_tokens": response.usage.input_tokens,
                "output_tokens": response.usage.output_tokens
            },
            latency_ms=(time.time() - start) * 1000,
            raw_response=response
        )

    async def stream(
        self,
        messages: List[LLMMessage],
        model: str = None,
        **kwargs
    ) -> AsyncIterator[str]:
        system = None
        non_system = []
        for m in messages:
            if m.role == "system":
                system = m.content
            else:
                non_system.append({"role": m.role, "content": m.content})
        async with self.client.messages.stream(
            model=model or self.default_model,
            messages=non_system,
            system=system,
            max_tokens=kwargs.get("max_tokens", 1000)
        ) as stream:
            async for text in stream.text_stream:
                yield text

    async def health_check(self) -> bool:
        try:
            # Cheap completion to verify the API is reachable
            await self.complete(
                [LLMMessage(role="user", content="hi")],
                max_tokens=5
            )
            return True
        except Exception:
            return False


class GroqClient(BaseLLMClient):
    """Ultra-fast inference with Groq"""

    provider = Provider.GROQ

    def __init__(self, api_key: str = None):
        from groq import AsyncGroq
        self.client = AsyncGroq(api_key=api_key)
        self.default_model = "llama-3.3-70b-versatile"

    async def complete(
        self,
        messages: List[LLMMessage],
        model: str = None,
        temperature: float = 0.7,
        max_tokens: int = 1000,
        **kwargs
    ) -> LLMResponse:
        start = time.time()
        response = await self.client.chat.completions.create(
            model=model or self.default_model,
            messages=[m.to_openai() for m in messages],
            temperature=temperature,
            max_tokens=max_tokens
        )
        return LLMResponse(
            content=response.choices[0].message.content,
            model=response.model,
            provider=self.provider,
            usage={
                "input_tokens": response.usage.prompt_tokens,
                "output_tokens": response.usage.completion_tokens
            },
            latency_ms=(time.time() - start) * 1000,
            raw_response=response
        )

    async def stream(self, messages, model=None, **kwargs):
        stream = await self.client.chat.completions.create(
            model=model or self.default_model,
            messages=[m.to_openai() for m in messages],
            stream=True,
            **kwargs
        )
        async for chunk in stream:
            if chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content

    async def health_check(self) -> bool:
        try:
            await self.client.models.list()
            return True
        except Exception:
            return False
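The Provider enum above also lists LOCAL, which never gets an implementation. As a minimal sketch (not part of the original code, and assuming a local OpenAI-compatible server such as Ollama or vLLM; the base URL and model name below are placeholders), the OpenAI client can be reused by overriding its base URL:

class LocalClient(OpenAIClient):
    """Sketch: local OpenAI-compatible endpoint reusing the OpenAI client."""

    provider = Provider.LOCAL

    def __init__(self, base_url: str = "http://localhost:11434/v1",
                 model: str = "llama3"):
        # Most local servers ignore the API key, but the SDK requires a value
        self.client = AsyncOpenAI(base_url=base_url, api_key="not-needed")
        self.default_model = model

Because it subclasses OpenAIClient, it inherits complete, stream, and health_check unchanged.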
Fallback Chain
from dataclasses import dataclass, field
from typing import List, Optional, Callable
import logging


@dataclass
class FallbackConfig:
    providers: List[BaseLLMClient]
    retry_on: tuple = (Exception,)
    max_retries_per_provider: int = 1
    on_fallback: Optional[Callable] = None


class FallbackChain:
    """Execute with automatic fallback between providers"""

    def __init__(self, config: FallbackConfig):
        self.config = config
        self.providers = config.providers
        self.logger = logging.getLogger(__name__)

    async def complete(
        self,
        messages: List[LLMMessage],
        **kwargs
    ) -> LLMResponse:
        """Complete with fallback"""
        errors = []
        for provider in self.providers:
            for attempt in range(self.config.max_retries_per_provider):
                try:
                    self.logger.info(
                        f"Trying {provider.provider.value} "
                        f"(attempt {attempt + 1})"
                    )
                    response = await provider.complete(messages, **kwargs)
                    # Success
                    if errors:
                        self.logger.info(
                            f"Succeeded with {provider.provider.value} "
                            f"after {len(errors)} failures"
                        )
                    return response
                except self.config.retry_on as e:
                    errors.append({
                        "provider": provider.provider.value,
                        "attempt": attempt + 1,
                        "error": str(e)
                    })
                    self.logger.warning(
                        f"{provider.provider.value} failed: {e}"
                    )
                    # Notify on fallback
                    if self.config.on_fallback:
                        self.config.on_fallback(provider.provider, e)
        # All providers failed
        raise Exception(
            f"All providers failed. Errors: {errors}"
        )

    async def stream(
        self,
        messages: List[LLMMessage],
        **kwargs
    ) -> AsyncIterator[str]:
        """Stream with fallback"""
        for provider in self.providers:
            try:
                async for chunk in provider.stream(messages, **kwargs):
                    yield chunk
                return
            except Exception as e:
                self.logger.warning(f"{provider.provider.value} failed: {e}")
                continue
        raise Exception("All providers failed for streaming")


# Usage
chain = FallbackChain(
    FallbackConfig(
        providers=[
            OpenAIClient(),
            AnthropicClient(),
            GroqClient()
        ],
        on_fallback=lambda p, e: print(f"Falling back from {p}: {e}")
    )
)

response = await chain.complete([
    LLMMessage(role="user", content="Hello!")
])
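The health_check method on BaseLLMClient is defined but never wired into the chain. One way to use it, shown as a sketch under the assumption that unhealthy providers should simply be skipped at construction time, is to probe all candidates concurrently and build the chain only from the ones that respond:

import asyncio

async def build_healthy_chain(candidates: List[BaseLLMClient]) -> FallbackChain:
    """Probe providers concurrently and keep only the healthy ones."""
    results = await asyncio.gather(*(c.health_check() for c in candidates))
    healthy = [c for c, ok in zip(candidates, results) if ok]
    if not healthy:
        raise RuntimeError("No healthy providers available")
    return FallbackChain(FallbackConfig(providers=healthy))

# chain = await build_healthy_chain([OpenAIClient(), AnthropicClient(), GroqClient()])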
Intelligent Model Router
Route requests to optimal models based on task type:
from dataclasses import dataclass
from typing import Dict, Callable, Optional
from enum import Enum


class TaskType(str, Enum):
    CODING = "coding"
    CREATIVE = "creative"
    ANALYSIS = "analysis"
    CHAT = "chat"
    SUMMARIZATION = "summarization"
    TRANSLATION = "translation"
    MATH = "math"
    FAST = "fast"  # Latency-critical


@dataclass
class ModelConfig:
    provider: Provider
    model: str
    max_tokens: int = 4000
    cost_per_1k_input: float = 0.01
    cost_per_1k_output: float = 0.03
    avg_latency_ms: float = 1000


class ModelRouter:
    """Route requests to optimal models"""

    # Default model recommendations per task
    TASK_MODELS: Dict[TaskType, list[ModelConfig]] = {
        TaskType.CODING: [
            ModelConfig(Provider.ANTHROPIC, "claude-3-5-sonnet-20241022",
                        cost_per_1k_input=0.003, cost_per_1k_output=0.015),
            ModelConfig(Provider.OPENAI, "gpt-4o",
                        cost_per_1k_input=0.0025, cost_per_1k_output=0.010),
        ],
        TaskType.CREATIVE: [
            ModelConfig(Provider.ANTHROPIC, "claude-3-5-sonnet-20241022"),
            ModelConfig(Provider.OPENAI, "gpt-4o"),
        ],
        TaskType.ANALYSIS: [
            ModelConfig(Provider.OPENAI, "gpt-4o"),
            ModelConfig(Provider.ANTHROPIC, "claude-3-5-sonnet-20241022"),
        ],
        TaskType.CHAT: [
            ModelConfig(Provider.OPENAI, "gpt-4o-mini",
                        cost_per_1k_input=0.00015, cost_per_1k_output=0.0006),
            ModelConfig(Provider.GROQ, "llama-3.3-70b-versatile",
                        cost_per_1k_input=0.00059, cost_per_1k_output=0.00079),
        ],
        TaskType.FAST: [
            ModelConfig(Provider.GROQ, "llama-3.3-70b-versatile",
                        avg_latency_ms=200),
            ModelConfig(Provider.GROQ, "mixtral-8x7b-32768",
                        avg_latency_ms=150),
        ],
        TaskType.MATH: [
            ModelConfig(Provider.OPENAI, "gpt-4o"),
            ModelConfig(Provider.ANTHROPIC, "claude-3-5-sonnet-20241022"),
        ]
    }

    def __init__(
        self,
        clients: Dict[Provider, BaseLLMClient],
        task_classifier: Optional[Callable[[str], TaskType]] = None
    ):
        self.clients = clients
        self.classifier = task_classifier or self._default_classifier

    def _default_classifier(self, prompt: str) -> TaskType:
        """Simple keyword-based classification"""
        prompt_lower = prompt.lower()
        if any(kw in prompt_lower for kw in ["code", "function", "debug", "program"]):
            return TaskType.CODING
        if any(kw in prompt_lower for kw in ["write", "story", "creative", "imagine"]):
            return TaskType.CREATIVE
        if any(kw in prompt_lower for kw in ["analyze", "data", "compare", "evaluate"]):
            return TaskType.ANALYSIS
        if any(kw in prompt_lower for kw in ["summarize", "summary", "tldr"]):
            return TaskType.SUMMARIZATION
        if any(kw in prompt_lower for kw in ["translate", "translation"]):
            return TaskType.TRANSLATION
        if any(kw in prompt_lower for kw in ["math", "calculate", "equation", "solve"]):
            return TaskType.MATH
        return TaskType.CHAT

    def get_models_for_task(
        self,
        task: TaskType,
        optimize_for: str = "quality"  # "quality", "cost", "speed"
    ) -> list[ModelConfig]:
        """Get ranked models for a task"""
        models = self.TASK_MODELS.get(task, self.TASK_MODELS[TaskType.CHAT])
        if optimize_for == "cost":
            return sorted(models, key=lambda m: m.cost_per_1k_input)
        elif optimize_for == "speed":
            return sorted(models, key=lambda m: m.avg_latency_ms)
        return models  # Default order is quality-optimized

    async def route(
        self,
        messages: List[LLMMessage],
        task: TaskType = None,
        optimize_for: str = "quality",
        **kwargs
    ) -> LLMResponse:
        """Route request to optimal model"""
        # Classify task if not provided
        if task is None:
            user_message = next(
                (m.content for m in messages if m.role == "user"),
                ""
            )
            task = self.classifier(user_message)
        # Get candidate models for the task
        models = self.get_models_for_task(task, optimize_for)
        # Try each model in order
        for model_config in models:
            client = self.clients.get(model_config.provider)
            if not client:
                continue
            try:
                return await client.complete(
                    messages,
                    model=model_config.model,
                    **kwargs
                )
            except Exception as e:
                logging.warning(f"Model {model_config.model} failed: {e}")
                continue
        raise Exception(f"No available model for task: {task}")


# Usage
router = ModelRouter(
    clients={
        Provider.OPENAI: OpenAIClient(),
        Provider.ANTHROPIC: AnthropicClient(),
        Provider.GROQ: GroqClient()
    }
)

# Automatic routing based on content
response = await router.route([
    LLMMessage(role="user", content="Write a Python function to sort a list")
])
# Routes to a coding-optimized model

# Explicit task with optimization
response = await router.route(
    [LLMMessage(role="user", content="Quick question: what's 2+2?")],
    task=TaskType.FAST,
    optimize_for="speed"
)
# Routes to Groq for the fastest response
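ModelRouter also accepts a custom task_classifier callable in place of the keyword heuristic. A brief sketch of plugging one in; classify_with_regex is a hypothetical helper, not part of the code above:

import re

def classify_with_regex(prompt: str) -> TaskType:
    """Hypothetical classifier: catch obvious code and translation requests."""
    if re.search(r"\b(def|class|function|bug|traceback)\b", prompt, re.IGNORECASE):
        return TaskType.CODING
    if re.search(r"\btranslat(e|ion)\b", prompt, re.IGNORECASE):
        return TaskType.TRANSLATION
    return TaskType.CHAT

custom_router = ModelRouter(
    clients={Provider.OPENAI: OpenAIClient(), Provider.GROQ: GroqClient()},
    task_classifier=classify_with_regex
)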
Load Balancing
import random
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class ProviderStats:
    requests: int = 0
    errors: int = 0
    total_latency: float = 0

    @property
    def avg_latency(self) -> float:
        return self.total_latency / self.requests if self.requests else 0

    @property
    def error_rate(self) -> float:
        return self.errors / self.requests if self.requests else 0


class LoadBalancer:
    """Load balance across providers"""

    def __init__(
        self,
        clients: Dict[Provider, BaseLLMClient],
        strategy: str = "round_robin"  # "round_robin", "random", "least_latency", "weighted"
    ):
        self.clients = clients
        self.strategy = strategy
        self.stats: Dict[Provider, ProviderStats] = defaultdict(ProviderStats)
        self.weights: Dict[Provider, float] = {p: 1.0 for p in clients.keys()}
        self._rr_index = 0

    def _select_round_robin(self) -> Provider:
        providers = list(self.clients.keys())
        provider = providers[self._rr_index % len(providers)]
        self._rr_index += 1
        return provider

    def _select_random(self) -> Provider:
        return random.choice(list(self.clients.keys()))

    def _select_least_latency(self) -> Provider:
        # Prefer providers with lower average latency
        available = [
            (p, self.stats[p].avg_latency)
            for p in self.clients.keys()
        ]
        # New providers get priority (latency=0)
        available.sort(key=lambda x: x[1])
        return available[0][0]

    def _select_weighted(self) -> Provider:
        providers = list(self.clients.keys())
        weights = [self.weights[p] for p in providers]
        return random.choices(providers, weights=weights)[0]

    def select_provider(self) -> Provider:
        """Select next provider based on strategy"""
        strategies = {
            "round_robin": self._select_round_robin,
            "random": self._select_random,
            "least_latency": self._select_least_latency,
            "weighted": self._select_weighted
        }
        return strategies[self.strategy]()

    async def complete(
        self,
        messages: List[LLMMessage],
        **kwargs
    ) -> LLMResponse:
        """Complete with load balancing"""
        provider = self.select_provider()
        client = self.clients[provider]
        try:
            response = await client.complete(messages, **kwargs)
            # Update stats
            self.stats[provider].requests += 1
            self.stats[provider].total_latency += response.latency_ms
            return response
        except Exception as e:
            self.stats[provider].requests += 1
            self.stats[provider].errors += 1
            raise

    def set_weight(self, provider: Provider, weight: float):
        """Adjust provider weight"""
        self.weights[provider] = weight

    def get_stats(self) -> Dict[Provider, dict]:
        """Get provider statistics"""
        return {
            p: {
                "requests": s.requests,
                "errors": s.errors,
                "error_rate": f"{s.error_rate:.1%}",
                "avg_latency_ms": f"{s.avg_latency:.0f}"
            }
            for p, s in self.stats.items()
        }


# Usage
balancer = LoadBalancer(
    clients={
        Provider.OPENAI: OpenAIClient(),
        Provider.ANTHROPIC: AnthropicClient()
    },
    strategy="least_latency"
)

# Requests are distributed across providers
for _ in range(100):
    response = await balancer.complete([
        LLMMessage(role="user", content="Hello!")
    ])

print(balancer.get_stats())
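The "weighted" strategy uses set_weight to skew traffic toward a preferred provider. A short example using only the API above (the 80/20 split is arbitrary):

weighted = LoadBalancer(
    clients={
        Provider.OPENAI: OpenAIClient(),
        Provider.GROQ: GroqClient()
    },
    strategy="weighted"
)
# Send roughly 80% of traffic to Groq and 20% to OpenAI
weighted.set_weight(Provider.GROQ, 0.8)
weighted.set_weight(Provider.OPENAI, 0.2)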
Cost-Based Routing
from datetime import datetime


class CostOptimizedRouter:
    """Route based on cost constraints"""

    # (input, output) cost per 1K tokens in USD
    COSTS = {
        (Provider.OPENAI, "gpt-4o"): (0.0025, 0.010),
        (Provider.OPENAI, "gpt-4o-mini"): (0.00015, 0.0006),
        (Provider.ANTHROPIC, "claude-3-5-sonnet-20241022"): (0.003, 0.015),
        (Provider.GROQ, "llama-3.3-70b-versatile"): (0.00059, 0.00079)
    }

    def __init__(
        self,
        clients: Dict[Provider, BaseLLMClient],
        daily_budget: float = 100.0,
        prefer_quality: bool = True
    ):
        self.clients = clients
        self.daily_budget = daily_budget
        self.prefer_quality = prefer_quality
        self.daily_spend = 0.0
        self.last_reset = datetime.now().date()

    def _reset_if_new_day(self):
        today = datetime.now().date()
        if today != self.last_reset:
            self.daily_spend = 0.0
            self.last_reset = today

    def estimate_cost(
        self,
        provider: Provider,
        model: str,
        input_tokens: int,
        output_tokens: int
    ) -> float:
        costs = self.COSTS.get((provider, model), (0.01, 0.03))
        return (input_tokens / 1000 * costs[0]) + (output_tokens / 1000 * costs[1])

    def get_cheapest_option(
        self,
        estimated_input: int,
        estimated_output: int
    ) -> tuple[Provider, str]:
        """Get cheapest available option"""
        options = []
        for (provider, model), (in_cost, out_cost) in self.COSTS.items():
            if provider not in self.clients:
                continue
            cost = self.estimate_cost(
                provider, model, estimated_input, estimated_output
            )
            options.append((provider, model, cost))
        options.sort(key=lambda x: x[2])
        return options[0][0], options[0][1]

    async def complete(
        self,
        messages: List[LLMMessage],
        estimated_output: int = 500,
        **kwargs
    ) -> LLMResponse:
        """Route with cost awareness"""
        self._reset_if_new_day()
        # Rough input-token estimate (~4 characters per token)
        input_text = " ".join(m.content for m in messages)
        estimated_input = len(input_text) // 4
        remaining_budget = self.daily_budget - self.daily_spend
        # Select model based on remaining budget
        if remaining_budget < self.daily_budget * 0.2:
            # Low budget - use the cheapest available option
            provider, model = self.get_cheapest_option(
                estimated_input, estimated_output
            )
        elif self.prefer_quality:
            # Quality preference - use the best model
            provider, model = Provider.OPENAI, "gpt-4o"
        else:
            # Balanced
            provider, model = Provider.OPENAI, "gpt-4o-mini"
        client = self.clients[provider]
        response = await client.complete(messages, model=model, **kwargs)
        # Track spend from actual token usage
        actual_cost = self.estimate_cost(
            provider, model,
            response.usage["input_tokens"],
            response.usage["output_tokens"]
        )
        self.daily_spend += actual_cost
        return response
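Unlike the earlier sections, no usage example follows the cost-based router. A minimal sketch, assuming the clients defined earlier and an arbitrary $50 daily budget:

cost_router = CostOptimizedRouter(
    clients={
        Provider.OPENAI: OpenAIClient(),
        Provider.ANTHROPIC: AnthropicClient(),
        Provider.GROQ: GroqClient()
    },
    daily_budget=50.0,
    prefer_quality=True
)

response = await cost_router.complete([
    LLMMessage(role="user", content="Summarize the risks of relying on a single provider.")
])
print(f"Spent so far today: ${cost_router.daily_spend:.4f}")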
Key Takeaways
Unified Interface: abstract providers behind a common interface for flexibility.
Automatic Fallback: chain providers so failures automatically try alternatives.
Smart Routing: route to optimal models based on task type and requirements.
Cost Control: implement budget controls and cost-optimized routing.
What’s Next
Evaluation & Testing: learn to evaluate and test your LLM applications.