Skip to main content
December 2025 Update: Production patterns for building resilient LLM applications with multi-provider fallback chains, intelligent routing, and cost-optimized model selection.

Why Multi-Provider Strategy?

Single Provider Risk              Multi-Provider Benefits
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Single point of failure           High availability
Rate limit bottlenecks            Load distribution
Price increases                   Cost optimization
Model deprecation                 Future-proofing
Quality issues                    Best model per task

Unified LLM Interface

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional, List, Dict, Any, AsyncIterator
from enum import Enum

class Provider(str, Enum):
    """Identifiers for the LLM backends this module can address.

    The ``str`` mixin makes every member compare equal to its plain string
    value (for example ``Provider.OPENAI == "openai"``).
    """

    # NOTE(review): only OPENAI, ANTHROPIC and GROQ have client classes in the
    # visible code; the remaining members appear to be placeholders.
    OPENAI = "openai"
    ANTHROPIC = "anthropic"
    GOOGLE = "google"
    TOGETHER = "together"
    GROQ = "groq"
    LOCAL = "local"

@dataclass
class LLMMessage:
    """One chat turn, independent of any provider's wire format."""

    role: str  # one of "system", "user", "assistant"
    content: str

    def to_openai(self) -> dict:
        """Return the message as an OpenAI-style ``role``/``content`` dict."""
        return dict(role=self.role, content=self.content)

    def to_anthropic(self) -> dict:
        """Return the message as an Anthropic-style ``role``/``content`` dict.

        The role is passed through untouched; because Anthropic takes the
        system prompt as a separate parameter, callers are expected to pull
        system messages out before sending.
        """
        payload = {"role": self.role}
        payload["content"] = self.content
        return payload

@dataclass
class LLMResponse:
    """Provider-independent result of a single completion call."""

    # Generated text returned by the provider.
    content: str
    # Model name as reported back by the provider's response.
    model: str
    # Backend that produced this response.
    provider: Provider
    # Token accounting; the clients in this file fill in the keys
    # "input_tokens" and "output_tokens".
    usage: dict
    # Elapsed time of the provider call in milliseconds, measured by the client.
    latency_ms: float
    # Untouched SDK response object, kept for callers that need extra fields.
    raw_response: Any = None

class BaseLLMClient(ABC):
    """Abstract base for LLM clients.

    Concrete clients set ``provider`` and implement ``complete``, ``stream``
    and ``health_check`` so callers can treat every backend the same way.
    """

    # Backend this client talks to; assigned as a class attribute by subclasses.
    provider: Provider

    @abstractmethod
    async def complete(
        self,
        messages: List[LLMMessage],
        model: Optional[str] = None,
        temperature: float = 0.7,
        max_tokens: int = 1000,
        **kwargs: Any
    ) -> LLMResponse:
        """Run one non-streaming completion and return the normalized result.

        Args:
            messages: Conversation to send, in order.
            model: Provider model name; ``None`` lets the client pick its default.
            temperature: Sampling temperature passed to the provider.
            max_tokens: Upper bound on generated tokens.
            **kwargs: Extra provider-specific options.
        """
        pass

    @abstractmethod
    async def stream(
        self,
        messages: List[LLMMessage],
        model: Optional[str] = None,
        **kwargs: Any
    ) -> AsyncIterator[str]:
        """Yield the completion as text fragments.

        The concrete clients in this file implement this as an async
        generator, so it is consumed with ``async for``.
        """
        pass

    @abstractmethod
    async def health_check(self) -> bool:
        """Return ``True`` when the provider is reachable, ``False`` otherwise."""
        pass

Provider Implementations

from openai import AsyncOpenAI
from anthropic import AsyncAnthropic
import google.generativeai as genai
import time

class OpenAIClient(BaseLLMClient):
    """LLM client backed by the OpenAI chat-completions API."""

    provider = Provider.OPENAI

    def __init__(self, api_key: Optional[str] = None):
        """Create the async SDK client.

        Args:
            api_key: Forwarded to ``AsyncOpenAI``; ``None`` leaves key
                resolution to the SDK.
        """
        self.client = AsyncOpenAI(api_key=api_key)
        self.default_model = "gpt-4o"

    async def complete(
        self,
        messages: List[LLMMessage],
        model: Optional[str] = None,
        temperature: float = 0.7,
        max_tokens: int = 1000,
        **kwargs
    ) -> LLMResponse:
        """Run one chat completion and normalize it into an ``LLMResponse``.

        Args:
            messages: Conversation to send.
            model: Model name; defaults to ``self.default_model``.
            temperature: Sampling temperature.
            max_tokens: Upper bound on generated tokens.
            **kwargs: Forwarded unchanged to ``chat.completions.create``.

        Returns:
            The first choice's text (empty string when the API returns no
            text, e.g. a tool-call-only reply), token usage and latency.
        """
        # perf_counter is monotonic, so the latency cannot go negative or jump
        # when the wall clock is adjusted.
        start = time.perf_counter()

        response = await self.client.chat.completions.create(
            model=model or self.default_model,
            messages=[m.to_openai() for m in messages],
            temperature=temperature,
            max_tokens=max_tokens,
            **kwargs
        )

        usage = response.usage  # typed as optional in the SDK
        return LLMResponse(
            # ``content`` may be None; LLMResponse.content is declared as str.
            content=response.choices[0].message.content or "",
            model=response.model,
            provider=self.provider,
            usage={
                "input_tokens": usage.prompt_tokens if usage else 0,
                "output_tokens": usage.completion_tokens if usage else 0
            },
            latency_ms=(time.perf_counter() - start) * 1000,
            raw_response=response
        )

    async def stream(
        self,
        messages: List[LLMMessage],
        model: Optional[str] = None,
        **kwargs
    ) -> AsyncIterator[str]:
        """Yield the completion text fragment by fragment.

        Args:
            messages: Conversation to send.
            model: Model name; defaults to ``self.default_model``.
            **kwargs: Forwarded unchanged to ``chat.completions.create``.
        """
        stream = await self.client.chat.completions.create(
            model=model or self.default_model,
            messages=[m.to_openai() for m in messages],
            stream=True,
            **kwargs
        )

        async for chunk in stream:
            # Some chunks carry no choices (for instance a trailing usage-only
            # chunk); indexing [0] on those would raise IndexError.
            if not chunk.choices:
                continue
            text = chunk.choices[0].delta.content
            if text:
                yield text

    async def health_check(self) -> bool:
        """Return ``True`` if listing models succeeds, ``False`` on any error."""
        try:
            await self.client.models.list()
        except Exception:
            # Deliberately broad: any failure means "not healthy".
            return False
        return True

class AnthropicClient(BaseLLMClient):
    """LLM client backed by the Anthropic Messages API."""

    provider = Provider.ANTHROPIC

    def __init__(self, api_key: Optional[str] = None):
        """Create the async SDK client.

        Args:
            api_key: Forwarded to ``AsyncAnthropic``; ``None`` leaves key
                resolution to the SDK.
        """
        self.client = AsyncAnthropic(api_key=api_key)
        self.default_model = "claude-3-5-sonnet-20241022"

    @staticmethod
    def _split_system(messages: List[LLMMessage]):
        """Separate system prompts from the conversational turns.

        Returns:
            ``(system, turns)`` where ``system`` is every system message joined
            by blank lines (``None`` if there is none) and ``turns`` is the
            remaining messages as Anthropic-style dicts.
        """
        system_parts = []
        turns = []
        for message in messages:
            if message.role == "system":
                system_parts.append(message.content)
            else:
                turns.append({"role": message.role, "content": message.content})
        # Join instead of keeping only the last one so no instruction is lost.
        return ("\n\n".join(system_parts) or None), turns

    def _request_params(
        self,
        messages: List[LLMMessage],
        model: Optional[str],
        max_tokens: int,
        extra: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Build the keyword arguments shared by ``complete`` and ``stream``."""
        system, turns = self._split_system(messages)
        params: Dict[str, Any] = dict(extra)
        params.update(
            model=model or self.default_model,
            messages=turns,
            max_tokens=max_tokens,
        )
        # Only send ``system`` when there is one: an explicit None is
        # serialized into the request instead of being treated as "not given".
        if system is not None:
            params["system"] = system
        return params

    async def complete(
        self,
        messages: List[LLMMessage],
        model: Optional[str] = None,
        temperature: float = 0.7,
        max_tokens: int = 1000,
        **kwargs
    ) -> LLMResponse:
        """Run one completion and normalize it into an ``LLMResponse``.

        Args:
            messages: Conversation to send; system messages are moved to the
                separate ``system`` parameter.
            model: Model name; defaults to ``self.default_model``.
            temperature: Sampling temperature.
            max_tokens: Upper bound on generated tokens.
            **kwargs: Forwarded to ``messages.create`` (as the OpenAI client
                does with its own extras).

        Returns:
            Concatenated text of all text blocks in the reply (empty string if
            there are none), token usage and latency.
        """
        start = time.perf_counter()  # monotonic, unlike time.time()

        params = self._request_params(messages, model, max_tokens, kwargs)
        params["temperature"] = temperature
        response = await self.client.messages.create(**params)

        # The reply is a list of content blocks; not every block carries text
        # and the list may be empty, so do not index [0] blindly.
        pieces = (getattr(block, "text", None) for block in response.content)
        text = "".join(piece for piece in pieces if piece)

        return LLMResponse(
            content=text,
            model=response.model,
            provider=self.provider,
            usage={
                "input_tokens": response.usage.input_tokens,
                "output_tokens": response.usage.output_tokens
            },
            latency_ms=(time.perf_counter() - start) * 1000,
            raw_response=response
        )

    async def stream(
        self,
        messages: List[LLMMessage],
        model: Optional[str] = None,
        **kwargs
    ) -> AsyncIterator[str]:
        """Yield the completion text fragment by fragment.

        Args:
            messages: Conversation to send.
            model: Model name; defaults to ``self.default_model``.
            **kwargs: ``max_tokens`` (default 1000) plus any other option,
                such as ``temperature``, forwarded to ``messages.stream``.
        """
        extra = dict(kwargs)
        max_tokens = extra.pop("max_tokens", 1000)
        params = self._request_params(messages, model, max_tokens, extra)

        async with self.client.messages.stream(**params) as stream:
            async for text in stream.text_stream:
                yield text

    async def health_check(self) -> bool:
        """Return ``True`` if a minimal completion succeeds, else ``False``.

        Note that this sends a real (tiny) completion request rather than a
        metadata call.
        """
        try:
            await self.complete(
                [LLMMessage(role="user", content="hi")],
                max_tokens=5
            )
        except Exception:
            # Deliberately broad: any failure means "not healthy".
            return False
        return True

class GroqClient(BaseLLMClient):
    """Ultra-fast inference with Groq (OpenAI-compatible chat API)."""

    provider = Provider.GROQ

    def __init__(self, api_key: Optional[str] = None):
        """Create the async SDK client.

        Args:
            api_key: Forwarded to ``AsyncGroq``; ``None`` leaves key
                resolution to the SDK.
        """
        # Imported lazily so the module loads without the groq package.
        from groq import AsyncGroq
        self.client = AsyncGroq(api_key=api_key)
        self.default_model = "llama-3.3-70b-versatile"

    async def complete(
        self,
        messages: List[LLMMessage],
        model: Optional[str] = None,
        temperature: float = 0.7,
        max_tokens: int = 1000,
        **kwargs
    ) -> LLMResponse:
        """Run one chat completion and normalize it into an ``LLMResponse``.

        Args:
            messages: Conversation to send.
            model: Model name; defaults to ``self.default_model``.
            temperature: Sampling temperature.
            max_tokens: Upper bound on generated tokens.
            **kwargs: Forwarded to ``chat.completions.create`` (previously
                dropped here although ``stream`` forwarded them).
        """
        start = time.perf_counter()  # monotonic, unlike time.time()

        response = await self.client.chat.completions.create(
            model=model or self.default_model,
            messages=[m.to_openai() for m in messages],
            temperature=temperature,
            max_tokens=max_tokens,
            **kwargs
        )

        usage = response.usage
        return LLMResponse(
            # ``content`` may be None; LLMResponse.content is declared as str.
            content=response.choices[0].message.content or "",
            model=response.model,
            provider=self.provider,
            usage={
                "input_tokens": usage.prompt_tokens if usage else 0,
                "output_tokens": usage.completion_tokens if usage else 0
            },
            latency_ms=(time.perf_counter() - start) * 1000,
            raw_response=response
        )

    async def stream(
        self,
        messages: List[LLMMessage],
        model: Optional[str] = None,
        **kwargs
    ) -> AsyncIterator[str]:
        """Yield the completion text fragment by fragment.

        Args:
            messages: Conversation to send.
            model: Model name; defaults to ``self.default_model``.
            **kwargs: Forwarded unchanged to ``chat.completions.create``.
        """
        stream = await self.client.chat.completions.create(
            model=model or self.default_model,
            messages=[m.to_openai() for m in messages],
            stream=True,
            **kwargs
        )
        async for chunk in stream:
            # Skip chunks without choices instead of raising IndexError.
            if not chunk.choices:
                continue
            text = chunk.choices[0].delta.content
            if text:
                yield text

    async def health_check(self) -> bool:
        """Return ``True`` if listing models succeeds, ``False`` on any error."""
        try:
            await self.client.models.list()
        except Exception:
            # Deliberately broad: any failure means "not healthy".
            return False
        return True

Fallback Chain

from dataclasses import dataclass, field
from typing import List, Optional, Callable
import logging

@dataclass
class FallbackConfig:
    """Settings for a ``FallbackChain``."""

    # Clients to try, in priority order.
    providers: List[BaseLLMClient]
    # Exception types that count as a provider failure and trigger the next
    # attempt; anything else propagates to the caller.
    retry_on: tuple = (Exception,)
    # Used as the number of attempts made against each provider before the
    # chain moves on to the next one.
    max_retries_per_provider: int = 1
    # Optional hook invoked on each failure as on_fallback(provider, exception),
    # where ``provider`` is the failing client's ``Provider`` value.
    on_fallback: Optional[Callable] = None

class FallbackChain:
    """Execute with automatic fallback between providers.

    Providers are tried in the order given by the config; a provider that
    raises one of ``config.retry_on`` is retried and then skipped in favour of
    the next one.
    """

    def __init__(self, config: FallbackConfig):
        self.config = config
        self.providers = config.providers
        self.logger = logging.getLogger(__name__)

    def _record_failure(self, provider: BaseLLMClient, error: Exception) -> None:
        """Log a provider failure and fire the optional ``on_fallback`` hook."""
        self.logger.warning("%s failed: %s", provider.provider.value, error)
        if self.config.on_fallback:
            self.config.on_fallback(provider.provider, error)

    async def complete(
        self,
        messages: List[LLMMessage],
        **kwargs
    ) -> LLMResponse:
        """Complete with fallback.

        Args:
            messages: Conversation to send.
            **kwargs: Forwarded to each provider's ``complete``.

        Returns:
            The first successful provider response.

        Raises:
            RuntimeError: Every provider failed (or none is configured). The
                last provider error is attached as ``__cause__``.
        """
        errors = []
        last_error = None
        # Always make at least one attempt per provider: a configured value
        # of 0 would otherwise skip every provider without trying it.
        attempts = max(1, self.config.max_retries_per_provider)

        for provider in self.providers:
            name = provider.provider.value
            for attempt in range(1, attempts + 1):
                self.logger.info("Trying %s (attempt %d)", name, attempt)
                try:
                    response = await provider.complete(messages, **kwargs)
                except self.config.retry_on as error:
                    last_error = error
                    errors.append({
                        "provider": name,
                        "attempt": attempt,
                        "error": str(error)
                    })
                    self._record_failure(provider, error)
                else:
                    if errors:
                        self.logger.info(
                            "Succeeded with %s after %d failures",
                            name, len(errors)
                        )
                    return response

        raise RuntimeError(
            f"All providers failed. Errors: {errors}"
        ) from last_error

    async def stream(
        self,
        messages: List[LLMMessage],
        **kwargs
    ) -> AsyncIterator[str]:
        """Stream with fallback.

        Falls back to the next provider only while nothing has been yielded
        yet. Once the consumer has received partial output, restarting on
        another provider would duplicate or mix text, so a mid-stream failure
        is re-raised instead.

        Raises:
            RuntimeError: Every provider failed before producing output.
        """
        last_error = None

        for provider in self.providers:
            emitted = False
            try:
                async for chunk in provider.stream(messages, **kwargs):
                    emitted = True
                    yield chunk
                return
            except self.config.retry_on as error:
                if emitted:
                    # Partial output is already with the caller.
                    self.logger.warning(
                        "%s failed mid-stream: %s",
                        provider.provider.value, error
                    )
                    raise
                last_error = error
                self._record_failure(provider, error)

        raise RuntimeError("All providers failed for streaming") from last_error

# Usage
import asyncio

chain = FallbackChain(
    FallbackConfig(
        providers=[
            OpenAIClient(),
            AnthropicClient(),
            GroqClient()
        ],
        on_fallback=lambda p, e: print(f"Falling back from {p}: {e}")
    )
)


async def _demo_fallback() -> LLMResponse:
    """Send one message through the fallback chain."""
    return await chain.complete([
        LLMMessage(role="user", content="Hello!")
    ])


# ``await`` is only valid inside a coroutine in a regular module, so the call
# is driven by asyncio.run(). Inside a notebook or another coroutine, use
# ``await chain.complete(...)`` directly instead.
response = asyncio.run(_demo_fallback())

Intelligent Model Router

Route requests to optimal models based on task:
from dataclasses import dataclass
from typing import Dict, Callable, Optional
from enum import Enum

class TaskType(str, Enum):
    """Task categories used to pick a model.

    The ``str`` mixin makes every member compare equal to its string value.
    """

    # NOTE(review): SUMMARIZATION and TRANSLATION have no entry in
    # ModelRouter.TASK_MODELS in the visible code, so the router's lookup
    # falls back to the CHAT model list for them.
    CODING = "coding"
    CREATIVE = "creative"
    ANALYSIS = "analysis"
    CHAT = "chat"
    SUMMARIZATION = "summarization"
    TRANSLATION = "translation"
    MATH = "math"
    FAST = "fast"  # Latency-critical

@dataclass
class ModelConfig:
    """Static description of one model offered by one provider."""

    # Backend that serves the model.
    provider: Provider
    # Model name as passed to the provider's client.
    model: str
    # NOTE(review): not read by ModelRouter in the visible code.
    max_tokens: int = 4000
    # Price per 1,000 input tokens; the router sorts on this when optimizing
    # for cost.
    cost_per_1k_input: float = 0.01
    # Price per 1,000 output tokens.
    cost_per_1k_output: float = 0.03
    # Expected latency in milliseconds; the router sorts on this when
    # optimizing for speed.
    avg_latency_ms: float = 1000

class ModelRouter:
    """Route requests to optimal models.

    A request is classified into a ``TaskType`` (explicitly or by a
    classifier), mapped to a ranked list of ``ModelConfig`` entries, and sent
    to the first model whose provider is configured and whose call succeeds.
    """

    # Default model recommendations per task, best first.
    TASK_MODELS: Dict[TaskType, list[ModelConfig]] = {
        TaskType.CODING: [
            ModelConfig(Provider.ANTHROPIC, "claude-3-5-sonnet-20241022",
                       cost_per_1k_input=0.003, cost_per_1k_output=0.015),
            ModelConfig(Provider.OPENAI, "gpt-4o",
                       cost_per_1k_input=0.0025, cost_per_1k_output=0.010),
        ],
        TaskType.CREATIVE: [
            ModelConfig(Provider.ANTHROPIC, "claude-3-5-sonnet-20241022"),
            ModelConfig(Provider.OPENAI, "gpt-4o"),
        ],
        TaskType.ANALYSIS: [
            ModelConfig(Provider.OPENAI, "gpt-4o"),
            ModelConfig(Provider.ANTHROPIC, "claude-3-5-sonnet-20241022"),
        ],
        TaskType.CHAT: [
            ModelConfig(Provider.OPENAI, "gpt-4o-mini",
                       cost_per_1k_input=0.00015, cost_per_1k_output=0.0006),
            ModelConfig(Provider.GROQ, "llama-3.3-70b-versatile",
                       cost_per_1k_input=0.00059, cost_per_1k_output=0.00079),
        ],
        TaskType.FAST: [
            ModelConfig(Provider.GROQ, "llama-3.3-70b-versatile",
                       avg_latency_ms=200),
            ModelConfig(Provider.GROQ, "mixtral-8x7b-32768",
                       avg_latency_ms=150),
        ],
        TaskType.MATH: [
            ModelConfig(Provider.OPENAI, "gpt-4o"),
            ModelConfig(Provider.ANTHROPIC, "claude-3-5-sonnet-20241022"),
        ]
    }

    # Keyword groups checked in order; the first group with a hit wins.
    _KEYWORD_RULES = (
        (("code", "function", "debug", "program"), TaskType.CODING),
        (("write", "story", "creative", "imagine"), TaskType.CREATIVE),
        (("analyze", "data", "compare", "evaluate"), TaskType.ANALYSIS),
        (("summarize", "summary", "tldr"), TaskType.SUMMARIZATION),
        (("translate", "translation"), TaskType.TRANSLATION),
        (("math", "calculate", "equation", "solve"), TaskType.MATH),
    )

    def __init__(
        self,
        clients: Dict[Provider, BaseLLMClient],
        task_classifier: Optional[Callable[[str], TaskType]] = None
    ):
        """Store the available clients and the classifier.

        Args:
            clients: Configured client per provider; models whose provider is
                missing here are skipped when routing.
            task_classifier: Maps a prompt to a ``TaskType``; defaults to the
                built-in keyword classifier.
        """
        self.clients = clients
        self.classifier = task_classifier or self._default_classifier

    def _default_classifier(self, prompt: str) -> TaskType:
        """Simple keyword-based classification.

        Matching is a case-insensitive substring test, so a keyword also
        matches inside longer words. Falls back to ``TaskType.CHAT``.
        """
        prompt_lower = prompt.lower()
        for keywords, task in self._KEYWORD_RULES:
            if any(kw in prompt_lower for kw in keywords):
                return task
        return TaskType.CHAT

    def get_models_for_task(
        self,
        task: TaskType,
        optimize_for: str = "quality"  # "quality", "cost", "speed"
    ) -> list[ModelConfig]:
        """Get ranked models for a task.

        Args:
            task: Task to look up; tasks without an entry use the CHAT list.
            optimize_for: ``"cost"`` sorts by input-token price, ``"speed"``
                by average latency; any other value keeps the default
                quality order.

        Returns:
            A new list, so callers cannot mutate the shared class-level table.
        """
        models = self.TASK_MODELS.get(task, self.TASK_MODELS[TaskType.CHAT])

        if optimize_for == "cost":
            return sorted(models, key=lambda m: m.cost_per_1k_input)
        if optimize_for == "speed":
            return sorted(models, key=lambda m: m.avg_latency_ms)

        return list(models)  # Default order is quality-optimized

    async def route(
        self,
        messages: List[LLMMessage],
        task: TaskType = None,
        optimize_for: str = "quality",
        **kwargs
    ) -> LLMResponse:
        """Route request to optimal model.

        Args:
            messages: Conversation to send.
            task: Explicit task type; when ``None`` the first user message is
                classified.
            optimize_for: Ranking preference, see ``get_models_for_task``.
            **kwargs: Forwarded to the chosen client's ``complete``.

        Raises:
            RuntimeError: No candidate model had a configured client, or all
                of them failed. In the latter case the last provider error is
                attached as ``__cause__``.
        """
        # Classify task if not provided
        if task is None:
            user_message = next(
                (m.content for m in messages if m.role == "user"),
                ""
            )
            task = self.classifier(user_message)

        logger = logging.getLogger(__name__)
        last_error = None

        # Try each candidate in ranked order
        for model_config in self.get_models_for_task(task, optimize_for):
            client = self.clients.get(model_config.provider)
            if not client:
                continue

            try:
                return await client.complete(
                    messages,
                    model=model_config.model,
                    **kwargs
                )
            except Exception as e:
                # Keep the cause so the final error is diagnosable.
                last_error = e
                logger.warning("Model %s failed: %s", model_config.model, e)

        raise RuntimeError(
            f"No available model for task: {task}"
        ) from last_error

# Usage
import asyncio

router = ModelRouter(
    clients={
        Provider.OPENAI: OpenAIClient(),
        Provider.ANTHROPIC: AnthropicClient(),
        Provider.GROQ: GroqClient()
    }
)


async def _demo_routing() -> LLMResponse:
    """Run both routing examples and return the last response."""
    # Automatic routing based on content
    response = await router.route([
        LLMMessage(role="user", content="Write a Python function to sort a list")
    ])
    # Routes to coding-optimized model

    # Explicit task with optimization
    response = await router.route(
        [LLMMessage(role="user", content="Quick question: what's 2+2?")],
        task=TaskType.FAST,
        optimize_for="speed"
    )
    # Routes to Groq for fastest response
    return response


# ``await`` is only valid inside a coroutine in a regular module, so both
# calls run inside one coroutine driven by asyncio.run().
response = asyncio.run(_demo_routing())

Load Balancing

import random
from collections import defaultdict
from dataclasses import dataclass

@dataclass
class ProviderStats:
    """Running request, error and latency counters for one provider."""

    requests: int = 0
    errors: int = 0
    total_latency: float = 0

    @property
    def avg_latency(self) -> float:
        """Total latency divided by the request count; 0 with no requests."""
        if not self.requests:
            return 0
        return self.total_latency / self.requests

    @property
    def error_rate(self) -> float:
        """Fraction of requests that errored; 0 with no requests."""
        if not self.requests:
            return 0
        return self.errors / self.requests

class LoadBalancer:
    """Load balance across providers.

    One provider is selected per request according to ``strategy``;
    request, error and latency counters are kept per provider in
    ``self.stats``.
    """

    def __init__(
        self,
        clients: Dict[Provider, BaseLLMClient],
        strategy: str = "round_robin"  # "round_robin", "random", "least_latency", "weighted"
    ):
        """Set up the balancer.

        Args:
            clients: Client per provider to distribute requests over.
            strategy: One of ``"round_robin"``, ``"random"``,
                ``"least_latency"`` or ``"weighted"``.

        Raises:
            ValueError: ``strategy`` is not one of the supported names.
        """
        self.clients = clients
        self.strategy = strategy
        self.stats: Dict[Provider, ProviderStats] = defaultdict(ProviderStats)
        self.weights: Dict[Provider, float] = {p: 1.0 for p in clients}
        self._rr_index = 0
        # Fail at construction time rather than on the first request.
        if strategy not in self._strategy_table():
            raise ValueError(f"Unknown load-balancing strategy: {strategy!r}")

    def _strategy_table(self) -> dict:
        """Map each strategy name to the method implementing it."""
        return {
            "round_robin": self._select_round_robin,
            "random": self._select_random,
            "least_latency": self._select_least_latency,
            "weighted": self._select_weighted
        }

    @staticmethod
    def _success_latency(stats) -> float:
        """Average latency over successful requests; 0 when there are none.

        Latency is only recorded for successful calls, so failed requests are
        excluded from the divisor to avoid understating the average.
        """
        successes = stats.requests - stats.errors
        return stats.total_latency / successes if successes > 0 else 0

    def _select_round_robin(self) -> Provider:
        """Cycle through the providers in insertion order."""
        providers = list(self.clients)
        provider = providers[self._rr_index % len(providers)]
        self._rr_index += 1
        return provider

    def _select_random(self) -> Provider:
        """Pick a provider uniformly at random."""
        return random.choice(list(self.clients))

    def _select_least_latency(self) -> Provider:
        """Pick the provider with the lowest average successful latency.

        Untried providers are probed first. A provider that has only ever
        failed ranks last: it has no recorded latency, and treating that as
        "0 ms" would make the balancer pick the broken provider every time.
        """
        def score(provider) -> float:
            stats = self.stats[provider]
            if stats.requests == 0:
                return 0.0
            if stats.requests <= stats.errors:
                return float("inf")
            return self._success_latency(stats)

        # min() keeps the first provider on ties, like a stable sort would.
        return min(self.clients, key=score)

    def _select_weighted(self) -> Provider:
        """Pick a provider at random, proportionally to its weight.

        Providers without an explicit weight count as 1.0.
        """
        providers = list(self.clients)
        weights = [self.weights.get(p, 1.0) for p in providers]
        return random.choices(providers, weights=weights)[0]

    def select_provider(self) -> Provider:
        """Select next provider based on strategy.

        Raises:
            RuntimeError: No clients are configured.
        """
        if not self.clients:
            raise RuntimeError("No providers configured for load balancing")
        return self._strategy_table()[self.strategy]()

    async def complete(
        self,
        messages: List[LLMMessage],
        **kwargs
    ) -> LLMResponse:
        """Complete with load balancing.

        Sends the request to the selected provider and updates its counters.
        Provider errors are counted and then re-raised unchanged; there is no
        retry on another provider.
        """
        provider = self.select_provider()
        client = self.clients[provider]
        stats = self.stats[provider]

        try:
            response = await client.complete(messages, **kwargs)
        except Exception:
            stats.requests += 1
            stats.errors += 1
            raise

        stats.requests += 1
        stats.total_latency += response.latency_ms
        return response

    def set_weight(self, provider: Provider, weight: float):
        """Adjust provider weight.

        Raises:
            ValueError: ``weight`` is negative.
        """
        if weight < 0:
            raise ValueError(f"Weight must be non-negative, got {weight!r}")
        self.weights[provider] = weight

    def get_stats(self) -> Dict[Provider, dict]:
        """Get provider statistics.

        Returns:
            Per provider: request and error counts, the error rate as a
            percentage string, and the average latency of successful requests
            as a millisecond string.
        """
        return {
            p: {
                "requests": s.requests,
                "errors": s.errors,
                "error_rate": f"{s.error_rate:.1%}",
                "avg_latency_ms": f"{self._success_latency(s):.0f}"
            }
            for p, s in self.stats.items()
        }

# Usage
import asyncio

balancer = LoadBalancer(
    clients={
        Provider.OPENAI: OpenAIClient(),
        Provider.ANTHROPIC: AnthropicClient()
    },
    strategy="least_latency"
)


async def _demo_load_balancing() -> LLMResponse:
    """Send 100 requests through the balancer and return the last response."""
    # Requests distributed across providers
    for _ in range(100):
        response = await balancer.complete([
            LLMMessage(role="user", content="Hello!")
        ])
    return response


# ``await`` is only valid inside a coroutine in a regular module; a single
# asyncio.run() keeps all requests on one event loop.
response = asyncio.run(_demo_load_balancing())

print(balancer.get_stats())

Cost-Based Routing

class CostOptimizedRouter:
    """Route based on cost constraints.

    Tracks an estimated daily spend and switches to the cheapest configured
    model once less than 20% of the daily budget remains. The budget is a
    routing signal only: requests are still sent after it is used up.
    """

    # (input, output) price per 1,000 tokens for each known model.
    COSTS = {
        (Provider.OPENAI, "gpt-4o"): (0.0025, 0.010),
        (Provider.OPENAI, "gpt-4o-mini"): (0.00015, 0.0006),
        (Provider.ANTHROPIC, "claude-3-5-sonnet-20241022"): (0.003, 0.015),
        (Provider.GROQ, "llama-3.3-70b-versatile"): (0.00059, 0.00079)
    }

    def __init__(
        self,
        clients: Dict[Provider, BaseLLMClient],
        daily_budget: float = 100.0,
        prefer_quality: bool = True
    ):
        """Set up the router.

        Args:
            clients: Configured client per provider.
            daily_budget: Spend allowed per calendar day, in the same unit as
                ``COSTS``.
            prefer_quality: While budget is plentiful, use "gpt-4o" when
                ``True`` and "gpt-4o-mini" otherwise.
        """
        self.clients = clients
        self.daily_budget = daily_budget
        self.prefer_quality = prefer_quality
        self.daily_spend = 0.0
        self.last_reset = self._today()

    @staticmethod
    def _today():
        """Return the current local date."""
        # Imported here because nothing else in this file imports datetime;
        # the original code referenced it without importing it.
        from datetime import date
        return date.today()

    def _reset_if_new_day(self):
        """Zero the spend counter when the local date has changed."""
        today = self._today()
        if today != self.last_reset:
            self.daily_spend = 0.0
            self.last_reset = today

    def estimate_cost(
        self,
        provider: Provider,
        model: str,
        input_tokens: int,
        output_tokens: int
    ) -> float:
        """Price a request from its token counts.

        Models missing from ``COSTS`` are priced at (0.01, 0.03) per 1,000
        input/output tokens.
        """
        in_price, out_price = self.COSTS.get((provider, model), (0.01, 0.03))
        return (input_tokens / 1000 * in_price) + (output_tokens / 1000 * out_price)

    def get_cheapest_option(
        self,
        estimated_input: int,
        estimated_output: int
    ) -> tuple[Provider, str]:
        """Get cheapest available option.

        Only models listed in ``COSTS`` whose provider has a configured
        client are considered.

        Raises:
            RuntimeError: No priced model has a configured client.
        """
        candidates = [
            (provider, model)
            for provider, model in self.COSTS
            if provider in self.clients
        ]
        if not candidates:
            raise RuntimeError(
                "No configured client matches any model in COSTS"
            )
        # min() keeps the first entry on ties, like the stable sort it replaces.
        return min(
            candidates,
            key=lambda option: self.estimate_cost(
                option[0], option[1], estimated_input, estimated_output
            )
        )

    async def complete(
        self,
        messages: List[LLMMessage],
        estimated_output: int = 500,
        **kwargs
    ) -> LLMResponse:
        """Route with cost awareness.

        Args:
            messages: Conversation to send.
            estimated_output: Expected output tokens, used for price estimates.
            **kwargs: Forwarded to the chosen client's ``complete``.

        Returns:
            The provider response; its reported token usage is added to
            ``daily_spend``.
        """
        self._reset_if_new_day()

        # Rough input estimate: about four characters per token.
        input_text = " ".join(m.content for m in messages)
        estimated_input = len(input_text) // 4

        remaining_budget = self.daily_budget - self.daily_spend

        if self.prefer_quality:
            # Quality preference - use best
            preferred = (Provider.OPENAI, "gpt-4o")
        else:
            # Balanced
            preferred = (Provider.OPENAI, "gpt-4o-mini")

        low_budget = remaining_budget < self.daily_budget * 0.2
        if low_budget or preferred[0] not in self.clients:
            # Low budget, or the preferred provider is not configured (which
            # used to raise KeyError): use the cheapest available model.
            provider, model = self.get_cheapest_option(
                estimated_input, estimated_output
            )
        else:
            provider, model = preferred

        client = self.clients[provider]
        response = await client.complete(messages, model=model, **kwargs)

        # Track spend from the token counts the provider reported
        self.daily_spend += self.estimate_cost(
            provider, model,
            response.usage["input_tokens"],
            response.usage["output_tokens"]
        )

        return response

Key Takeaways

Unified Interface

Abstract providers behind a common interface for flexibility

Automatic Fallback

Chain providers so failures automatically try alternatives

Smart Routing

Route to optimal models based on task type and requirements

Cost Control

Implement budget controls and cost-optimized routing

What’s Next

Evaluation & Testing

Learn to evaluate and test your LLM applications