December 2025 Update: Production patterns for building resilient LLM applications with multi-provider fallback chains, intelligent routing, and cost-optimized model selection.

Why Multi-Provider Strategy?

Think of LLM providers like airlines. If you book every flight on one carrier and they cancel due to weather, you are stranded. Experienced travelers keep backup reservations on a different airline. Multi-provider LLM strategies work the same way — when OpenAI has an outage or Anthropic hits rate limits, your application gracefully re-routes to a healthy provider instead of showing users an error page.
Single Provider Risk              Multi-Provider Benefits
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Single point of failure           High availability
Rate limit bottlenecks            Load distribution
Price increases                   Cost optimization
Model deprecation                 Future-proofing
Quality issues                    Best model per task
Pitfall: Assuming providers fail independently. OpenAI and Azure OpenAI share infrastructure — an OpenAI outage often takes Azure OpenAI down with it. A real multi-provider strategy uses providers with genuinely separate infrastructure (e.g., OpenAI + Anthropic + Groq). Similarly, many “multi-provider” setups just wrap different models from the same vendor, which gives you zero additional resilience.

Unified LLM Interface

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional, List, Dict, Any, AsyncIterator
from enum import Enum

class Provider(str, Enum):
    OPENAI = "openai"
    ANTHROPIC = "anthropic"
    GOOGLE = "google"
    TOGETHER = "together"
    GROQ = "groq"
    LOCAL = "local"

@dataclass
class LLMMessage:
    role: str  # "system", "user", "assistant"
    content: str
    
    def to_openai(self) -> dict:
        return {"role": self.role, "content": self.content}
    
    def to_anthropic(self) -> dict:
        # Anthropic takes the system prompt as a separate `system` parameter,
        # so callers should filter out system messages before using this.
        return {"role": self.role, "content": self.content}

@dataclass
class LLMResponse:
    content: str
    model: str
    provider: Provider
    usage: dict
    latency_ms: float
    raw_response: Any = None

class BaseLLMClient(ABC):
    """Abstract base for LLM clients.
    
    Every provider (OpenAI, Anthropic, Groq, etc.) implements this
    interface so the rest of your code never knows or cares which
    provider is actually fulfilling a request -- just like how a
    wall outlet gives you power regardless of the power plant type.
    """
    
    provider: Provider
    
    @abstractmethod
    async def complete(
        self,
        messages: List[LLMMessage],
        model: str = None,
        temperature: float = 0.7,
        max_tokens: int = 1000,
        **kwargs
    ) -> LLMResponse:
        pass
    
    @abstractmethod
    async def stream(
        self,
        messages: List[LLMMessage],
        model: str = None,
        **kwargs
    ) -> AsyncIterator[str]:
        pass
    
    @abstractmethod
    async def health_check(self) -> bool:
        pass

Provider Implementations

from openai import AsyncOpenAI
from anthropic import AsyncAnthropic
import time

class OpenAIClient(BaseLLMClient):
    provider = Provider.OPENAI
    
    def __init__(self, api_key: str = None):
        self.client = AsyncOpenAI(api_key=api_key)
        self.default_model = "gpt-4o"
    
    async def complete(
        self,
        messages: List[LLMMessage],
        model: str = None,
        temperature: float = 0.7,
        max_tokens: int = 1000,
        **kwargs
    ) -> LLMResponse:
        start = time.time()
        
        response = await self.client.chat.completions.create(
            model=model or self.default_model,
            messages=[m.to_openai() for m in messages],
            temperature=temperature,
            max_tokens=max_tokens,
            **kwargs
        )
        
        return LLMResponse(
            content=response.choices[0].message.content,
            model=response.model,
            provider=self.provider,
            usage={
                "input_tokens": response.usage.prompt_tokens,
                "output_tokens": response.usage.completion_tokens
            },
            latency_ms=(time.time() - start) * 1000,
            raw_response=response
        )
    
    async def stream(
        self,
        messages: List[LLMMessage],
        model: str = None,
        **kwargs
    ) -> AsyncIterator[str]:
        stream = await self.client.chat.completions.create(
            model=model or self.default_model,
            messages=[m.to_openai() for m in messages],
            stream=True,
            **kwargs
        )
        
        async for chunk in stream:
            if chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content
    
    async def health_check(self) -> bool:
        try:
            await self.client.models.list()
            return True
        except Exception:
            return False

class AnthropicClient(BaseLLMClient):
    provider = Provider.ANTHROPIC
    
    def __init__(self, api_key: str = None):
        self.client = AsyncAnthropic(api_key=api_key)
        self.default_model = "claude-3-5-sonnet-20241022"
    
    async def complete(
        self,
        messages: List[LLMMessage],
        model: str = None,
        temperature: float = 0.7,
        max_tokens: int = 1000,
        **kwargs
    ) -> LLMResponse:
        start = time.time()
        
        # Extract system message
        system = None
        non_system = []
        for m in messages:
            if m.role == "system":
                system = m.content
            else:
                non_system.append({"role": m.role, "content": m.content})
        
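        # Only send `system` when a system message exists; the SDK expects
        # a string (or the argument omitted), not None.
        request = dict(
            model=model or self.default_model,
            messages=non_system,
            max_tokens=max_tokens,
            temperature=temperature,
            **kwargs
        )
        if system is not None:
            request["system"] = system
        
        response = await self.client.messages.create(**request)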
        
        return LLMResponse(
            content=response.content[0].text,
            model=response.model,
            provider=self.provider,
            usage={
                "input_tokens": response.usage.input_tokens,
                "output_tokens": response.usage.output_tokens
            },
            latency_ms=(time.time() - start) * 1000,
            raw_response=response
        )
    
    async def stream(
        self,
        messages: List[LLMMessage],
        model: str = None,
        **kwargs
    ) -> AsyncIterator[str]:
        system = None
        non_system = []
        for m in messages:
            if m.role == "system":
                system = m.content
            else:
                non_system.append({"role": m.role, "content": m.content})
        
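        # As in complete(): omit `system` entirely when there is no system message.
        request = dict(
            model=model or self.default_model,
            messages=non_system,
            max_tokens=kwargs.get("max_tokens", 1000)
        )
        if system is not None:
            request["system"] = system
        
        async with self.client.messages.stream(**request) as stream: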
            async for text in stream.text_stream:
                yield text
    
    async def health_check(self) -> bool:
        try:
            # Simple completion to check
            await self.complete(
                [LLMMessage(role="user", content="hi")],
                max_tokens=5
            )
            return True
        except Exception:
            return False

class GroqClient(BaseLLMClient):
    """Ultra-fast inference with Groq"""
    provider = Provider.GROQ
    
    def __init__(self, api_key: str = None):
        from groq import AsyncGroq
        self.client = AsyncGroq(api_key=api_key)
        self.default_model = "llama-3.3-70b-versatile"
    
    async def complete(
        self,
        messages: List[LLMMessage],
        model: str = None,
        temperature: float = 0.7,
        max_tokens: int = 1000,
        **kwargs
    ) -> LLMResponse:
        start = time.time()
        
        response = await self.client.chat.completions.create(
            model=model or self.default_model,
            messages=[m.to_openai() for m in messages],
            temperature=temperature,
            max_tokens=max_tokens,
            **kwargs  # Forward extra options, matching OpenAIClient.complete
        )
        
        return LLMResponse(
            content=response.choices[0].message.content,
            model=response.model,
            provider=self.provider,
            usage={
                "input_tokens": response.usage.prompt_tokens,
                "output_tokens": response.usage.completion_tokens
            },
            latency_ms=(time.time() - start) * 1000,
            raw_response=response
        )
    
    async def stream(self, messages, model=None, **kwargs):
        stream = await self.client.chat.completions.create(
            model=model or self.default_model,
            messages=[m.to_openai() for m in messages],
            stream=True,
            **kwargs
        )
        async for chunk in stream:
            if chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content
    
    async def health_check(self) -> bool:
        try:
            await self.client.models.list()
            return True
        except Exception:
            return False
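
The Anthropic client above repeats the same system-message extraction in both `complete()` and `stream()`. A minimal sketch of how that logic could be pulled into one helper follows. The names `split_system` and `anthropic_kwargs` are hypothetical, and the `LLMMessage` class here is only a stand-in for the dataclass defined earlier so the snippet runs on its own. Joining multiple system messages with blank lines is an assumption; the original code keeps only the last one.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass
class LLMMessage:
    """Stand-in for the LLMMessage dataclass defined earlier."""
    role: str
    content: str


def split_system(
    messages: List[LLMMessage],
) -> Tuple[Optional[str], List[Dict[str, str]]]:
    """Separate system prompts from the conversation turns.

    Returns (system_text_or_None, non_system_messages). Multiple system
    messages are joined with blank lines instead of being overwritten.
    """
    system_parts = [m.content for m in messages if m.role == "system"]
    turns = [
        {"role": m.role, "content": m.content}
        for m in messages
        if m.role != "system"
    ]
    system = "\n\n".join(system_parts) if system_parts else None
    return system, turns


def anthropic_kwargs(messages: List[LLMMessage]) -> dict:
    """Build request keyword arguments, adding `system` only when present."""
    system, turns = split_system(messages)
    kwargs: dict = {"messages": turns}
    if system is not None:
        kwargs["system"] = system
    return kwargs
```

Both Anthropic methods could then unpack the result of `anthropic_kwargs(messages)` into the SDK call, which keeps the two code paths from drifting apart.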

Fallback Chain

from dataclasses import dataclass, field
from typing import List, Optional, Callable
import logging

@dataclass
class FallbackConfig:
    providers: List[BaseLLMClient]
    retry_on: tuple = (Exception,)
    max_retries_per_provider: int = 1
    on_fallback: Optional[Callable] = None

class FallbackChain:
    """Execute with automatic fallback between providers.
    
    Works like a phone tree: try the first provider, and if it
    does not pick up (error, timeout, rate limit), move on to
    the next one. Each provider gets max_retries_per_provider
    attempts before the chain advances.
    """
    
    def __init__(self, config: FallbackConfig):
        self.config = config
        self.providers = config.providers
        self.logger = logging.getLogger(__name__)
    
    async def complete(
        self,
        messages: List[LLMMessage],
        **kwargs
    ) -> LLMResponse:
        """Complete with fallback"""
        
        errors = []
        
        for provider in self.providers:
            for attempt in range(self.config.max_retries_per_provider):
                try:
                    self.logger.info(
                        f"Trying {provider.provider.value} "
                        f"(attempt {attempt + 1})"
                    )
                    
                    response = await provider.complete(messages, **kwargs)
                    
                    # Success
                    if errors:
                        self.logger.info(
                            f"Succeeded with {provider.provider.value} "
                            f"after {len(errors)} failures"
                        )
                    
                    return response
                
                except self.config.retry_on as e:
                    errors.append({
                        "provider": provider.provider.value,
                        "attempt": attempt + 1,
                        "error": str(e)
                    })
                    
                    self.logger.warning(
                        f"{provider.provider.value} failed: {e}"
                    )
                    
                    # Notify on fallback
                    if self.config.on_fallback:
                        self.config.on_fallback(provider.provider, e)
        
        # All providers failed
        raise Exception(
            f"All providers failed. Errors: {errors}"
        )
    
    async def stream(
        self,
        messages: List[LLMMessage],
        **kwargs
    ) -> AsyncIterator[str]:
        """Stream with fallback"""
        
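        for provider in self.providers:
            emitted = False
            try:
                async for chunk in provider.stream(messages, **kwargs):
                    emitted = True
                    yield chunk
                return
            except Exception as e:
                self.logger.warning(f"{provider.provider.value} failed: {e}")
                if emitted:
                    # Part of the answer already reached the caller; switching
                    # providers now would restart and duplicate the output.
                    raise
                continue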
        
        raise Exception("All providers failed for streaming")

# Usage
chain = FallbackChain(
    FallbackConfig(
        providers=[
            OpenAIClient(),
            AnthropicClient(),
            GroqClient()
        ],
        on_fallback=lambda p, e: print(f"Falling back from {p}: {e}")
    )
)

response = await chain.complete([
    LLMMessage(role="user", content="Hello!")
])

Intelligent Model Router

Route requests to optimal models based on task. This is analogous to a hospital triage system — a broken finger goes to the ER nurse, a cardiac arrest goes to the trauma surgeon. Cheap tasks get cheap models; hard reasoning tasks get expensive frontier models. The router makes the call so every individual request handler does not have to.
A senior engineer would say: “Our routing layer saved us 60% on LLM costs by sending simple classification tasks to gpt-4o-mini instead of burning gpt-4o tokens on them. The quality difference on those tasks was negligible.”
import logging
from dataclasses import dataclass
from typing import Dict, Callable, List, Optional
from enum import Enum

class TaskType(str, Enum):
    CODING = "coding"
    CREATIVE = "creative"
    ANALYSIS = "analysis"
    CHAT = "chat"
    SUMMARIZATION = "summarization"
    TRANSLATION = "translation"
    MATH = "math"
    FAST = "fast"  # Latency-critical

@dataclass
class ModelConfig:
    provider: Provider
    model: str
    max_tokens: int = 4000
    cost_per_1k_input: float = 0.01
    cost_per_1k_output: float = 0.03
    avg_latency_ms: float = 1000

class ModelRouter:
    """Route requests to optimal models.
    
    Maps task types to ranked model lists. When a request arrives,
    the router classifies the task (coding, creative, math, etc.)
    and tries models in ranked order -- best quality first by
    default, or cheapest/fastest if you optimize for those axes.
    """
    
    # Default model recommendations per task
    TASK_MODELS: Dict[TaskType, list[ModelConfig]] = {
        TaskType.CODING: [
            ModelConfig(Provider.ANTHROPIC, "claude-3-5-sonnet-20241022", 
                       cost_per_1k_input=0.003, cost_per_1k_output=0.015),
            ModelConfig(Provider.OPENAI, "gpt-4o",
                       cost_per_1k_input=0.0025, cost_per_1k_output=0.010),
        ],
        TaskType.CREATIVE: [
            ModelConfig(Provider.ANTHROPIC, "claude-3-5-sonnet-20241022"),
            ModelConfig(Provider.OPENAI, "gpt-4o"),
        ],
        TaskType.ANALYSIS: [
            ModelConfig(Provider.OPENAI, "gpt-4o"),
            ModelConfig(Provider.ANTHROPIC, "claude-3-5-sonnet-20241022"),
        ],
        TaskType.CHAT: [
            ModelConfig(Provider.OPENAI, "gpt-4o-mini",
                       cost_per_1k_input=0.00015, cost_per_1k_output=0.0006),
            ModelConfig(Provider.GROQ, "llama-3.3-70b-versatile",
                       cost_per_1k_input=0.00059, cost_per_1k_output=0.00079),
        ],
        TaskType.FAST: [
            ModelConfig(Provider.GROQ, "llama-3.3-70b-versatile",
                       avg_latency_ms=200),
            ModelConfig(Provider.GROQ, "mixtral-8x7b-32768",
                       avg_latency_ms=150),
        ],
        TaskType.MATH: [
            ModelConfig(Provider.OPENAI, "gpt-4o"),
            ModelConfig(Provider.ANTHROPIC, "claude-3-5-sonnet-20241022"),
        ]
    }
    
    def __init__(
        self,
        clients: Dict[Provider, BaseLLMClient],
        task_classifier: Optional[Callable[[str], TaskType]] = None
    ):
        self.clients = clients
        self.classifier = task_classifier or self._default_classifier
    
    def _default_classifier(self, prompt: str) -> TaskType:
        """Simple keyword-based classification.
        
        Production pitfall: keyword matching is fragile. A prompt like
        "explain the code review process" triggers CODING even though
        it is a general knowledge question. Consider using a small
        classifier model (gpt-4o-mini with structured output) or
        an embedding-based classifier for real deployments.
        """
        prompt_lower = prompt.lower()
        
        if any(kw in prompt_lower for kw in ["code", "function", "debug", "program"]):
            return TaskType.CODING
        if any(kw in prompt_lower for kw in ["write", "story", "creative", "imagine"]):
            return TaskType.CREATIVE
        if any(kw in prompt_lower for kw in ["analyze", "data", "compare", "evaluate"]):
            return TaskType.ANALYSIS
        if any(kw in prompt_lower for kw in ["summarize", "summary", "tldr"]):
            return TaskType.SUMMARIZATION
        if any(kw in prompt_lower for kw in ["translate", "translation"]):
            return TaskType.TRANSLATION
        if any(kw in prompt_lower for kw in ["math", "calculate", "equation", "solve"]):
            return TaskType.MATH
        
        return TaskType.CHAT
    
    def get_models_for_task(
        self,
        task: TaskType,
        optimize_for: str = "quality"  # "quality", "cost", "speed"
    ) -> list[ModelConfig]:
        """Get ranked models for a task"""
        models = self.TASK_MODELS.get(task, self.TASK_MODELS[TaskType.CHAT])
        
        if optimize_for == "cost":
            return sorted(models, key=lambda m: m.cost_per_1k_input)
        elif optimize_for == "speed":
            return sorted(models, key=lambda m: m.avg_latency_ms)
        
        return models  # Default order is quality-optimized
    
    async def route(
        self,
        messages: List[LLMMessage],
        task: TaskType = None,
        optimize_for: str = "quality",
        **kwargs
    ) -> LLMResponse:
        """Route request to optimal model"""
        
        # Classify task if not provided
        if task is None:
            user_message = next(
                (m.content for m in messages if m.role == "user"),
                ""
            )
            task = self.classifier(user_message)
        
        # Get models for task
        models = self.get_models_for_task(task, optimize_for)
        
        # Try each model
        for model_config in models:
            client = self.clients.get(model_config.provider)
            if not client:
                continue
            
            try:
                return await client.complete(
                    messages,
                    model=model_config.model,
                    **kwargs
                )
            except Exception as e:
                logging.warning(f"Model {model_config.model} failed: {e}")
                continue
        
        raise Exception(f"No available model for task: {task}")

# Usage
router = ModelRouter(
    clients={
        Provider.OPENAI: OpenAIClient(),
        Provider.ANTHROPIC: AnthropicClient(),
        Provider.GROQ: GroqClient()
    }
)

# Automatic routing based on content
response = await router.route([
    LLMMessage(role="user", content="Write a Python function to sort a list")
])
# Routes to coding-optimized model

# Explicit task with optimization
response = await router.route(
    [LLMMessage(role="user", content="Quick question: what's 2+2?")],
    task=TaskType.FAST,
    optimize_for="speed"
)
# Routes to Groq for fastest response
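
To put a number on the savings claim, here is a rough worked example using the per-1K-token prices from the `TASK_MODELS` table above. It assumes every request has the same token counts regardless of model, which is a simplification; the function names and the 1,000-input/300-output request shape are hypothetical.

```python
def blended_cost(
    input_tokens: int,
    output_tokens: int,
    cost_per_1k_input: float,
    cost_per_1k_output: float,
) -> float:
    """Dollar cost of one request at the given per-1K-token prices."""
    return (
        input_tokens / 1000 * cost_per_1k_input
        + output_tokens / 1000 * cost_per_1k_output
    )


def routing_savings(
    fraction_rerouted: float, premium_cost: float, cheap_cost: float
) -> float:
    """Fraction of total spend saved when `fraction_rerouted` of otherwise
    identical requests move from the premium model to the cheap one."""
    return fraction_rerouted * (1 - cheap_cost / premium_cost)


# Assumed request shape: 1,000 input tokens and 300 output tokens.
gpt_4o = blended_cost(1000, 300, 0.0025, 0.010)
gpt_4o_mini = blended_cost(1000, 300, 0.00015, 0.0006)

# gpt-4o-mini costs about 6% of gpt-4o at these prices, so rerouting
# roughly 64% of traffic saves about 60% of spend.
savings = routing_savings(0.64, gpt_4o, gpt_4o_mini)
print(f"Estimated savings: {savings:.1%}")
```

Under these assumptions, a 60% saving implies that close to two thirds of traffic was simple enough to run on the cheaper model.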

Load Balancing

Load balancing distributes requests across providers the same way a load balancer distributes HTTP traffic across web servers. The goal is to avoid hammering a single provider to the point where you hit rate limits or experience degraded latency while other providers sit idle.
Pitfall: Ignoring provider rate limits in your balancer. Round-robin works when all providers have equal capacity, but OpenAI might allow 3,500 RPM while Groq allows 30 RPM. A capacity-unaware balancer will overwhelm the smaller provider immediately. Always factor in per-provider rate limits when distributing traffic.
import random
from collections import defaultdict
from dataclasses import dataclass

@dataclass
class ProviderStats:
    requests: int = 0
    errors: int = 0
    total_latency: float = 0
    
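    @property
    def avg_latency(self) -> float:
        # Latency is only recorded for successful calls, so average over
        # successes; dividing by all requests would make a failing
        # provider look faster than it is.
        successes = self.requests - self.errors
        return self.total_latency / successes if successes > 0 else 0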
    
    @property
    def error_rate(self) -> float:
        return self.errors / self.requests if self.requests else 0

class LoadBalancer:
    """Load balance across providers.
    
    Supports four strategies:
    - round_robin: Simple rotation, good for equal-capacity providers
    - random: Statistical distribution, avoids thundering herd
    - least_latency: Routes to the historically fastest provider
    - weighted: Manual control, useful when providers differ in capacity
    """
    
    def __init__(
        self,
        clients: Dict[Provider, BaseLLMClient],
        strategy: str = "round_robin"  # "round_robin", "random", "least_latency", "weighted"
    ):
        self.clients = clients
        self.strategy = strategy
        self.stats: Dict[Provider, ProviderStats] = defaultdict(ProviderStats)
        self.weights: Dict[Provider, float] = {p: 1.0 for p in clients.keys()}
        self._rr_index = 0
    
    def _select_round_robin(self) -> Provider:
        providers = list(self.clients.keys())
        provider = providers[self._rr_index % len(providers)]
        self._rr_index += 1
        return provider
    
    def _select_random(self) -> Provider:
        return random.choice(list(self.clients.keys()))
    
    def _select_least_latency(self) -> Provider:
        def score(p: Provider) -> float:
            s = self.stats[p]
            if s.requests == 0:
                return 0.0  # Untried providers get priority
            if s.errors >= s.requests:
                return float("inf")  # Only failures so far: try last
            return s.avg_latency
        
        # Prefer providers with lower average latency
        return min(self.clients.keys(), key=score)
    
    def _select_weighted(self) -> Provider:
        providers = list(self.clients.keys())
        weights = [self.weights[p] for p in providers]
        return random.choices(providers, weights=weights)[0]
    
    def select_provider(self) -> Provider:
        """Select next provider based on strategy"""
        strategies = {
            "round_robin": self._select_round_robin,
            "random": self._select_random,
            "least_latency": self._select_least_latency,
            "weighted": self._select_weighted
        }
        return strategies[self.strategy]()
    
    async def complete(
        self,
        messages: List[LLMMessage],
        **kwargs
    ) -> LLMResponse:
        """Complete with load balancing"""
        
        provider = self.select_provider()
        client = self.clients[provider]
        
        try:
            response = await client.complete(messages, **kwargs)
            
            # Update stats
            self.stats[provider].requests += 1
            self.stats[provider].total_latency += response.latency_ms
            
            return response
        
        except Exception as e:
            self.stats[provider].requests += 1
            self.stats[provider].errors += 1
            raise
    
    def set_weight(self, provider: Provider, weight: float):
        """Adjust provider weight"""
        self.weights[provider] = weight
    
    def get_stats(self) -> Dict[Provider, dict]:
        """Get provider statistics"""
        return {
            p: {
                "requests": s.requests,
                "errors": s.errors,
                "error_rate": f"{s.error_rate:.1%}",
                "avg_latency_ms": f"{s.avg_latency:.0f}"
            }
            for p, s in self.stats.items()
        }

# Usage
balancer = LoadBalancer(
    clients={
        Provider.OPENAI: OpenAIClient(),
        Provider.ANTHROPIC: AnthropicClient()
    },
    strategy="least_latency"
)

# Requests distributed across providers
for _ in range(100):
    response = await balancer.complete([
        LLMMessage(role="user", content="Hello!")
    ])

print(balancer.get_stats())
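
For the rate-limit pitfall described above, one simple option is to derive the `weighted` strategy's weights from each provider's requests-per-minute limit. This is a sketch under the assumption that you know those limits; the helper name `weights_from_rate_limits` is hypothetical, and the 3,500 and 30 RPM figures are the illustrative numbers from the pitfall, not guaranteed quotas.

```python
from typing import Dict


def weights_from_rate_limits(rpm_limits: Dict[str, int]) -> Dict[str, float]:
    """Return traffic shares proportional to each provider's RPM limit."""
    total = sum(rpm_limits.values())
    if total <= 0:
        raise ValueError("at least one provider needs a positive RPM limit")
    return {name: rpm / total for name, rpm in rpm_limits.items()}


# Illustrative limits taken from the pitfall above.
weights = weights_from_rate_limits({"openai": 3500, "groq": 30})
for name, share in weights.items():
    print(f"{name}: {share:.2%} of traffic")
```

With the `LoadBalancer` above, the same shares could be applied through `set_weight()` (keyed by `Provider` rather than by string) together with `strategy="weighted"`, so the smaller provider receives under 1% of requests instead of half of them.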

Cost-Based Routing

Think of this like a household budget. Early in the month you eat at restaurants (premium models); as the budget runs low you switch to home cooking (cheap models). The router tracks daily spend and automatically downgrades model quality to stay within budget — your application keeps working, just with a thriftier model behind the scenes.
from datetime import datetime

class CostOptimizedRouter:
    """Route based on cost constraints.
    
    Tracks daily spend and switches to cheaper models when the
    budget runs low. At less than 20% remaining budget, it forces
    the cheapest available model regardless of quality preference.
    """
    
    COSTS = {
        (Provider.OPENAI, "gpt-4o"): (0.0025, 0.010),
        (Provider.OPENAI, "gpt-4o-mini"): (0.00015, 0.0006),
        (Provider.ANTHROPIC, "claude-3-5-sonnet-20241022"): (0.003, 0.015),
        (Provider.GROQ, "llama-3.3-70b-versatile"): (0.00059, 0.00079)
    }
    
    def __init__(
        self,
        clients: Dict[Provider, BaseLLMClient],
        daily_budget: float = 100.0,
        prefer_quality: bool = True
    ):
        self.clients = clients
        self.daily_budget = daily_budget
        self.prefer_quality = prefer_quality
        self.daily_spend = 0.0
        self.last_reset = datetime.now().date()
    
    def _reset_if_new_day(self):
        today = datetime.now().date()
        if today != self.last_reset:
            self.daily_spend = 0.0
            self.last_reset = today
    
    def estimate_cost(
        self,
        provider: Provider,
        model: str,
        input_tokens: int,
        output_tokens: int
    ) -> float:
        costs = self.COSTS.get((provider, model), (0.01, 0.03))
        return (input_tokens / 1000 * costs[0]) + (output_tokens / 1000 * costs[1])
    
    def get_cheapest_option(
        self,
        estimated_input: int,
        estimated_output: int
    ) -> tuple[Provider, str]:
        """Get cheapest available option"""
        options = []
        
        for (provider, model), (in_cost, out_cost) in self.COSTS.items():
            if provider not in self.clients:
                continue
            
            cost = self.estimate_cost(
                provider, model, estimated_input, estimated_output
            )
            options.append((provider, model, cost))
        
        options.sort(key=lambda x: x[2])
        return options[0][0], options[0][1]
    
    async def complete(
        self,
        messages: List[LLMMessage],
        estimated_output: int = 500,
        **kwargs
    ) -> LLMResponse:
        """Route with cost awareness"""
        
        self._reset_if_new_day()
        
        # Estimate input tokens
        input_text = " ".join(m.content for m in messages)
        estimated_input = len(input_text) // 4
        
        remaining_budget = self.daily_budget - self.daily_spend
        
        # Select model based on budget
        if remaining_budget < self.daily_budget * 0.2:
            # Low budget - use cheapest
            provider, model = self.get_cheapest_option(
                estimated_input, estimated_output
            )
        elif self.prefer_quality:
            # Quality preference - use best
            provider, model = Provider.OPENAI, "gpt-4o"
        else:
            # Balanced
            provider, model = Provider.OPENAI, "gpt-4o-mini"
        
        client = self.clients[provider]
        response = await client.complete(messages, model=model, **kwargs)
        
        # Track spend
        actual_cost = self.estimate_cost(
            provider, model,
            response.usage["input_tokens"],
            response.usage["output_tokens"]
        )
        self.daily_spend += actual_cost
        
        return response
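
To make the budget logic concrete, here is a short worked example of the same arithmetic as `estimate_cost`, using prices from the `COSTS` table above. The request size (2,000 input tokens, 500 output tokens) and the helper `requests_before_downgrade` are assumptions for illustration only.

```python
# Prices per 1K tokens as (input, output), copied from the COSTS table above.
COSTS = {
    "gpt-4o": (0.0025, 0.010),
    "gpt-4o-mini": (0.00015, 0.0006),
}


def estimate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """Same formula as CostOptimizedRouter.estimate_cost, keyed by model name."""
    in_price, out_price = COSTS[model]
    return input_tokens / 1000 * in_price + output_tokens / 1000 * out_price


def requests_before_downgrade(
    daily_budget: float, cost_per_request: float, reserve: float = 0.2
) -> float:
    """Approximate number of requests that spend the budget down to the reserve."""
    return daily_budget * (1 - reserve) / cost_per_request


premium = estimate_cost("gpt-4o", input_tokens=2000, output_tokens=500)
cheap = estimate_cost("gpt-4o-mini", input_tokens=2000, output_tokens=500)

print(f"gpt-4o:      ${premium:.4f} per request")
print(f"gpt-4o-mini: ${cheap:.4f} per request")
print(f"Premium requests before the 20% reserve: "
      f"{requests_before_downgrade(100.0, premium):,.0f}")
```

At these prices a gpt-4o request of that size costs about one cent, so a $100 daily budget supports roughly 8,000 premium requests before the router starts forcing the cheapest model.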

Common Pitfalls

Different models have different strengths, system prompt formats, and token limits. A prompt optimized for GPT-4o may produce poor results on Claude or Llama. When falling back to a different provider, consider maintaining provider-specific prompt templates rather than sending identical messages everywhere.
If your fallback chain silently succeeds via the third provider 40% of the time, you are paying premium latency and possibly lower quality without anyone noticing. Always log and alert on fallback events. A healthy system should hit the primary provider 95%+ of the time.
When a provider refuses a request due to content policy (not a transient error), retrying on the next provider is wasteful — most providers have similar content policies. Distinguish between transient errors (rate limits, timeouts, 500s) and deterministic rejections (400s, content flags) and only fall back on the former.
Your primary model might be GPT-4o and your fallback might be Llama 3.3 70B. The fallback keeps you online, but the quality delta could break downstream parsing or user expectations. Consider adding a quality gate after fallback responses, or at minimum flagging to the user that a fallback model was used.
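
The distinction between transient errors and deterministic rejections can be sketched as a small predicate that the fallback chain consults before moving on. `ProviderError` below is a hypothetical exception type that carries an HTTP status code; real SDKs expose their own exception classes, so the attribute name `status_code` should be treated as an assumption to adapt.

```python
import asyncio


class ProviderError(Exception):
    """Hypothetical error carrying the HTTP status of a failed provider call."""

    def __init__(self, status_code: int, message: str = ""):
        super().__init__(message or f"HTTP {status_code}")
        self.status_code = status_code


# 408 Request Timeout and 429 Too Many Requests are worth retrying elsewhere.
TRANSIENT_STATUS = {408, 429}


def is_transient(error: Exception) -> bool:
    """Return True if falling back to another provider is likely to help."""
    if isinstance(error, (TimeoutError, asyncio.TimeoutError, ConnectionError)):
        return True
    status = getattr(error, "status_code", None)
    if status is None:
        return False  # Unknown failure: do not assume another provider fixes it
    return status in TRANSIENT_STATUS or 500 <= status <= 599


for err in (ProviderError(429), ProviderError(503), ProviderError(400)):
    action = "fall back" if is_transient(err) else "surface to caller"
    print(f"{err}: {action}")
```

A chain built this way would re-raise immediately when `is_transient` returns False, which avoids paying for two more requests that will be rejected for the same reason.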

Key Takeaways

Unified Interface

Abstract providers behind a common interface for flexibility

Automatic Fallback

Chain providers so failures automatically try alternatives

Smart Routing

Route to optimal models based on task type and requirements

Cost Control

Implement budget controls and cost-optimized routing

Interview Deep-Dive

Strong Answer:
  • The first principle is that a real multi-provider strategy requires providers with genuinely independent infrastructure. OpenAI and Azure OpenAI share underlying infrastructure, so they are not truly independent — an OpenAI outage frequently takes Azure OpenAI down with it. A real fallback chain looks like OpenAI as primary, Anthropic as secondary, Groq as tertiary. These run on completely separate infrastructure stacks.
  • The architecture starts with a unified interface: an abstract BaseLLMClient class that every provider implements with the same complete(), stream(), and health_check() methods. This abstraction is essential because each provider has different message formats (Anthropic separates system messages, OpenAI includes them in the messages array), different response structures, and different error types. The adapter layer normalizes these differences so the rest of my application is provider-agnostic.
  • The fallback chain wraps a list of these clients and tries them in priority order. When the primary provider raises an exception, it catches it, logs the failure, and tries the next provider. I configure retry behavior per provider: 1 retry with 2-second backoff for rate limit errors (which are usually transient), 0 retries for authentication errors (permanent), and 1 retry for timeout errors. Each provider also has a circuit breaker that trips after 3 consecutive failures within a 60-second window. Once tripped, the circuit breaker skips that provider entirely for 30 seconds before testing it again with a probe request.
  • The critical thing most implementations miss is that fallback is not free. Different providers produce different quality outputs, have different token limits, and support different features. If my primary is GPT-4o with function calling and my fallback is Llama-3 on Groq, the fallback might not support structured function calling. I design the fallback to gracefully degrade: the core text generation works on all providers, but advanced features like function calling or structured output might only be available on the primary. The application handles this by checking provider capabilities before making feature-dependent calls.
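
The circuit breaker described in the answer above can be sketched in a few dozen lines. This is an illustrative version, not code from the article: the class and method names are assumptions, the defaults mirror the numbers quoted in the answer (3 failures, 60-second window, 30-second cooldown), and the clock is injectable so the behavior can be tested without sleeping.

```python
import time
from typing import Callable, List, Optional


class CircuitBreaker:
    """Skip a provider after repeated failures.

    closed    -> requests flow normally
    open      -> requests are skipped until `cooldown_s` has elapsed
    half-open -> a probe is allowed; success closes the breaker,
                 failure re-opens it for another cooldown
    """

    def __init__(
        self,
        failure_threshold: int = 3,
        window_s: float = 60.0,
        cooldown_s: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.failure_threshold = failure_threshold
        self.window_s = window_s
        self.cooldown_s = cooldown_s
        self._clock = clock
        self._failures: List[float] = []  # Timestamps of failures since last success
        self._opened_at: Optional[float] = None

    def allow_request(self) -> bool:
        if self._opened_at is None:
            return True
        # Simplification: every caller arriving after the cooldown may probe.
        return self._clock() - self._opened_at >= self.cooldown_s

    def record_success(self) -> None:
        self._failures.clear()
        self._opened_at = None

    def record_failure(self) -> None:
        now = self._clock()
        if self._opened_at is not None:
            self._opened_at = now  # Failed probe: restart the cooldown
            return
        # Drop failures that fell out of the window, then add this one.
        self._failures = [t for t in self._failures if now - t <= self.window_s]
        self._failures.append(now)
        if len(self._failures) >= self.failure_threshold:
            self._opened_at = now
```

In a fallback chain, each provider would get its own breaker: check `allow_request()` before calling the provider, then call `record_success()` or `record_failure()` depending on the outcome.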
Red Flags: Candidate lists OpenAI and Azure OpenAI as independent fallbacks (they share infrastructure), does not mention circuit breaker patterns, or does not consider the capability differences between providers.
Follow-up: How do you test your fallback system without waiting for actual provider outages?
I test at three levels. First, unit tests with mock clients that simulate specific failure modes: timeout after 5 seconds, rate limit 429 response, authentication error, malformed response body, connection refused. I verify that each failure mode triggers the correct fallback behavior and that the error is logged with the right severity. Second, integration tests with a chaos proxy that sits between my application and the provider APIs. The proxy randomly injects failures at a configurable rate (10% of requests fail with 500, 5% timeout) so I can verify fallback behavior under realistic conditions. Third, a monthly “game day” where I literally block the primary provider at the network level (DNS blackhole or firewall rule) and observe the application’s behavior in a staging environment. The first time we ran this, we discovered our circuit breaker was not tripping fast enough: it took 45 seconds of failures before routing to the secondary, during which 200 users got errors. We tightened the threshold from 5 failures to 3 and the failover time dropped to under 10 seconds.
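The first testing level (mock clients that simulate failure modes) might look like the sketch below. Everything here is hypothetical and synchronous for brevity: `MockClient`, `RateLimitError`, `AuthError`, and `complete_with_fallback` are stand-ins invented for the example, not the guide's `BaseLLMClient` or any provider SDK's error types.

```python
from typing import List, Optional, Tuple


class RateLimitError(Exception):
    """Hypothetical stand-in for a provider's HTTP 429 error."""


class AuthError(Exception):
    """Hypothetical stand-in for a provider's authentication error."""


class MockClient:
    """Fake provider that either succeeds or raises a configured error."""

    def __init__(self, name: str, error: Optional[Exception] = None):
        self.name = name
        self.error = error
        self.calls = 0

    def complete(self, prompt: str) -> str:
        self.calls += 1
        if self.error is not None:
            raise self.error
        return f"{self.name}: ok"


def complete_with_fallback(
    clients: List[MockClient], prompt: str
) -> Tuple[str, List[Tuple[str, str]]]:
    """Try clients in priority order; return the text plus a failure log."""
    failures: List[Tuple[str, str]] = []
    for client in clients:
        try:
            return client.complete(prompt), failures
        except Exception as exc:  # a real chain would branch on error type
            failures.append((client.name, type(exc).__name__))
    raise RuntimeError(f"all providers failed: {failures}")


# Simulate a rate-limited primary and a timed-out secondary.
primary = MockClient("primary", RateLimitError("429"))
secondary = MockClient("secondary", TimeoutError("timed out"))
tertiary = MockClient("tertiary")
text, failures = complete_with_fallback([primary, secondary, tertiary], "hi")
```

The returned failure log makes it easy to assert both that the right provider answered and that each upstream failure was recorded in order.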
Strong Answer:
  • The model router is essentially a task classifier followed by a model selection strategy. The classifier determines the task type (coding, creative, analysis, chat, math, fast-lookup), and the selector picks the optimal model for that task based on configurable priorities: quality, cost, or speed.
  • For the classifier, I have used three approaches with increasing sophistication. Keyword matching works for prototypes: if the prompt contains “code,” “function,” “debug,” route to the coding model. It is fast (microseconds) but brittle — “write a story about a coder” gets misrouted. Embedding-based classification is my production default: I compute the query embedding and compare it against precomputed centroids for each task category. This handles paraphrases and costs a fraction of a cent per classification. LLM-based classification (asking a fast model “what type of task is this?”) is most accurate but adds 200-500ms of latency per request, so I only use it for high-stakes routing decisions.
  • The model selection maintains a configuration table mapping task types to ranked model lists. Coding goes to Claude Sonnet first (strong at code), then GPT-4o as fallback. Creative writing goes to Claude first (strong at creative), then GPT-4o. Fast lookups go to Groq’s Llama-3 (sub-200ms latency) first, then GPT-4o-mini. The ranking can be optimized for different dimensions: “quality” uses the default ranking, “cost” sorts by price per token, “speed” sorts by average latency.
  • The key production concern is that the router itself should not be a single point of failure. If the classifier errors or returns an unknown task type, the fallback is a default model (GPT-4o-mini for cost efficiency, or GPT-4o for quality). I also track routing decisions in production metrics so I can detect when the classifier starts misrouting — for example, if coding questions are being sent to the cheap chat model and users report quality degradation.
  • At one company, the router reduced our LLM costs by 40% because 60% of requests were simple chat queries that did not need GPT-4o. Routing those to GPT-4o-mini at 17x cheaper tokens made a huge dent in the bill without measurable quality loss on those specific query types.
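A rough sketch of the embedding-centroid classifier and the routing table with a default fallback is shown below. The centroid vectors, the similarity threshold, and the model labels are illustrative assumptions: real centroids would be computed from an embedding model, and the strings in `ROUTING_TABLE` are placeholders rather than exact API model identifiers.

```python
import math
from typing import Dict, List, Optional, Sequence


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


def classify(
    embedding: Sequence[float],
    centroids: Dict[str, Sequence[float]],
    min_similarity: float = 0.5,  # assumed threshold; tune on real data
) -> Optional[str]:
    """Return the closest task type, or None if nothing is similar enough."""
    best_task, best_score = None, min_similarity
    for task, centroid in centroids.items():
        score = cosine(embedding, centroid)
        if score >= best_score:
            best_task, best_score = task, score
    return best_task


# Ranked model lists per task type (labels are placeholders).
ROUTING_TABLE: Dict[str, List[str]] = {
    "coding": ["claude-sonnet", "gpt-4o"],
    "creative": ["claude-sonnet", "gpt-4o"],
    "fast-lookup": ["groq-llama-3", "gpt-4o-mini"],
}
DEFAULT_MODEL = "gpt-4o-mini"


def select_models(task: Optional[str]) -> List[str]:
    """Unknown or missing task types fall back to the default model."""
    return ROUTING_TABLE.get(task, [DEFAULT_MODEL])
```

Returning `None` from the classifier instead of raising keeps the router from becoming a single point of failure: the selector always has a model to return.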
Red Flags: Candidate routes everything through the same model, does not consider the latency cost of classification, or builds a router without a default fallback for unclassified queries.
Follow-up: How do you evaluate whether your routing decisions are actually optimal? Maybe Claude is better at coding than you think, or GPT-4o is better for creative tasks.
I run periodic routing evaluation experiments. I take a sample of 200 production queries per task category and run them through every available model, then use pairwise LLM-as-Judge comparison to rank model quality per task type. This produces an empirical “model quality matrix”: task type versus model, with quality scores in each cell. I update the routing table based on this matrix quarterly. The evaluation costs about $50-100 to run (200 queries times 4-5 models times judge call cost) and has changed my routing decisions multiple times. For example, we assumed GPT-4o was best for analysis tasks, but the evaluation showed Claude Sonnet actually scored higher on data analysis questions because it produced more structured, actionable outputs. We swapped the ranking and user satisfaction scores improved. The other thing I monitor is per-route user satisfaction: if a specific task-to-model route has consistently lower user ratings than others, it is a signal that the routing is suboptimal for that category.
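One simple way to turn pairwise judge verdicts into the quality matrix is a per-task win rate, sketched below. The input tuple shape `(task, model_a, model_b, winner)` and both function names are assumptions for illustration. Win rate is only one possible scoring rule; the answer above does not specify which one was used.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

Judgement = Tuple[str, str, str, str]  # (task, model_a, model_b, winner)


def quality_matrix(judgements: Iterable[Judgement]) -> Dict[str, Dict[str, float]]:
    """Compute win rate per (task, model) from pairwise comparisons."""
    wins: Dict[Tuple[str, str], int] = defaultdict(int)
    games: Dict[Tuple[str, str], int] = defaultdict(int)
    for task, model_a, model_b, winner in judgements:
        games[(task, model_a)] += 1
        games[(task, model_b)] += 1
        wins[(task, winner)] += 1
    matrix: Dict[str, Dict[str, float]] = defaultdict(dict)
    for (task, model), played in games.items():
        matrix[task][model] = wins[(task, model)] / played
    return dict(matrix)


def rank_models(matrix: Dict[str, Dict[str, float]], task: str) -> List[str]:
    """Models for a task, best win rate first; empty list for unknown tasks."""
    scores = matrix.get(task, {})
    return sorted(scores, key=lambda model: scores[model], reverse=True)


# Toy verdicts (invented for the example, not real evaluation data).
sample = [
    ("analysis", "claude-sonnet", "gpt-4o", "claude-sonnet"),
    ("analysis", "claude-sonnet", "gpt-4o", "claude-sonnet"),
    ("analysis", "claude-sonnet", "gpt-4o", "gpt-4o"),
]
matrix = quality_matrix(sample)
ranking = rank_models(matrix, "analysis")
```

The output of `rank_models` can feed the routing table directly, replacing the hand-picked ranking for that task type.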
Strong Answer:
  • Cost-based routing is a dynamic optimization problem: early in the day when budget is plentiful, I route to the best-quality model. As the budget depletes, I progressively downgrade to cheaper models to ensure the application stays within budget and does not go dark at 3pm.
  • The implementation tracks daily spend in a fast datastore (Redis counter with daily TTL). Each request estimates its cost before execution: the input token count is approximated as character count divided by 4, then multiplied by the model's price per token. The routing logic has three tiers based on the percentage of budget remaining. Above 80% remaining: route to the quality-optimal model for the task. Between 20% and 80% remaining: route to a balanced model (GPT-4o-mini instead of GPT-4o). Below 20% remaining: route to the cheapest available option (Groq Llama-3 or a self-hosted model). At 0% remaining, I either reject requests with a clear error or serve from cache only.
  • The nuance is that not all requests are equal in business value. A paying enterprise customer’s request should always get the best model, even when budget is tight. I implement a priority system: critical requests (enterprise tier, revenue-impacting features) always get quality routing, while low-priority requests (free tier, internal tooling) absorb the budget cuts first. This means the budget depletion curve affects different user segments at different thresholds.
  • After execution, I update the daily spend with the actual cost (from the API response’s usage field, not the estimate). The discrepancy between estimated and actual cost is usually under 20%, but I track it to catch cases where the model generates unexpectedly long outputs that blow the estimate.
  • One lesson from production: the budget needs a reserve. If I set a hard $100/day budget, I actually configure the system to start downgrading at $80 and halt at $95, keeping $5 in reserve for critical requests and retries. Without the reserve, a burst of expensive requests at 95% budget can overshoot before the system reacts.
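The tiering logic above can be sketched as a pure function. This is an illustration under stated assumptions: the function names, the tier labels, and the `reserve_fraction` parameter are invented for the example. The thresholds follow the three-tier description (80% and 20% remaining) and a 5% reserve held back for critical requests. The price passed to `estimate_cost` is a caller-supplied placeholder, not a real provider price.

```python
def estimate_cost(prompt: str, price_per_1k_input_tokens: float) -> float:
    """Pre-execution estimate using the ~4 characters per token heuristic."""
    estimated_tokens = len(prompt) / 4
    return estimated_tokens / 1000 * price_per_1k_input_tokens


def choose_tier(
    spent: float,
    daily_budget: float,
    critical: bool = False,
    reserve_fraction: float = 0.05,  # assumed: 5% held back for critical traffic
) -> str:
    """Map remaining budget to a routing tier.

    Returns one of "quality", "balanced", "cheapest", or "reject".
    """
    remaining = max(daily_budget - spent, 0.0) / daily_budget
    if critical:
        # Critical requests keep quality routing until the budget is gone.
        return "quality" if remaining > 0 else "reject"
    if remaining <= reserve_fraction:
        # The reserve is not available to non-critical traffic.
        return "reject"
    if remaining > 0.8:
        return "quality"
    if remaining >= 0.2:
        return "balanced"
    return "cheapest"
```

In a real deployment `spent` would come from the shared counter and be updated with the actual cost after each call. A "reject" result could also map to serving from cache only, as described above.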
Red Flags: Candidate implements a hard cutoff at budget exhaustion without graceful degradation, does not differentiate between high-value and low-value requests, or forgets to account for estimation errors in the cost tracking.
Follow-up: What happens when a fallback to a cheaper model produces noticeably worse quality? How do you handle user experience during budget-constrained periods?
Transparency and expectation-setting are key. When the system routes to a cheaper model, I add metadata to the API response indicating the service tier: "model_tier": "standard" versus "model_tier": "premium". The frontend can use this to show a subtle indicator like “Using fast mode” or “Responses may be shorter during peak usage.” This is better than silently degrading quality and having users think the product is broken. For chat applications, I also adjust the system prompt for cheaper models to compensate: simpler instructions, fewer examples, more explicit constraints on output format. A GPT-4o-mini with a well-tuned prompt often matches GPT-4o with a generic prompt for straightforward tasks. The other strategy is strategic caching: during high-budget periods, I aggressively cache responses for common queries. During low-budget periods, the cache hit rate is higher because I have been warming it all day, which reduces the number of live API calls needed and effectively stretches the remaining budget.