Effective logging is essential for debugging, monitoring, and auditing LLM applications in production.

Structured Logging Setup

Using structlog

import structlog
from datetime import datetime
from typing import Any
import json

# Configure structlog
structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
        structlog.processors.JSONRenderer()
    ],
    wrapper_class=structlog.stdlib.BoundLogger,
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
    cache_logger_on_first_use=True
)

logger = structlog.get_logger()

# Logging LLM requests
def log_llm_request(
    model: str,
    messages: list,
    response: Any,
    latency_ms: float,
    request_id: str
):
    """Log LLM request with structured data"""
    
    logger.info(
        "llm_request",
        request_id=request_id,
        model=model,
        message_count=len(messages),
        input_tokens=response.usage.prompt_tokens,
        output_tokens=response.usage.completion_tokens,
        total_tokens=response.usage.total_tokens,
        latency_ms=latency_ms,
        finish_reason=response.choices[0].finish_reason
    )
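
For reference, a minimal usage sketch of the helper above. It assumes the official openai Python SDK with OPENAI_API_KEY set in the environment; the emitted entry is a single JSON line.

import time
from openai import OpenAI

client = OpenAI()
messages = [{"role": "user", "content": "Summarize structured logging in one sentence."}]

start = time.time()
response = client.chat.completions.create(model="gpt-4o", messages=messages)

log_llm_request(
    model="gpt-4o",
    messages=messages,
    response=response,
    latency_ms=(time.time() - start) * 1000,
    request_id="req-001",  # illustrative ID; generate one per request in practice
)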

Custom LLM Logger

import structlog
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import Optional, List, Dict, Any
import uuid
import json

@dataclass
class LLMLogEntry:
    request_id: str
    timestamp: str
    model: str
    provider: str
    operation: str  # "completion", "embedding", "function_call"
    
    # Request details
    messages: Optional[List[Dict]] = None
    prompt_tokens: int = 0
    
    # Response details
    completion_tokens: int = 0
    finish_reason: Optional[str] = None
    function_calls: Optional[List[Dict]] = None
    
    # Performance
    latency_ms: float = 0
    
    # Metadata
    user_id: Optional[str] = None
    session_id: Optional[str] = None
    tags: Optional[Dict[str, str]] = None
    
    # Error
    error: Optional[str] = None
    error_type: Optional[str] = None

class LLMLogger:
    """Comprehensive LLM request logger"""
    
    def __init__(
        self,
        log_file: str = None,
        include_messages: bool = False,  # Privacy consideration
        include_responses: bool = False
    ):
        self.log_file = log_file
        self.include_messages = include_messages
        self.include_responses = include_responses
        self.logger = structlog.get_logger()
    
    def log_request(
        self,
        model: str,
        provider: str,
        messages: list,
        response: Any,
        latency_ms: float,
        user_id: str = None,
        session_id: str = None,
        tags: dict = None
    ) -> LLMLogEntry:
        """Log a successful LLM request"""
        
        request_id = str(uuid.uuid4())
        
        entry = LLMLogEntry(
            request_id=request_id,
            timestamp=datetime.utcnow().isoformat(),
            model=model,
            provider=provider,
            operation="completion",
            prompt_tokens=response.usage.prompt_tokens,
            completion_tokens=response.usage.completion_tokens,
            finish_reason=response.choices[0].finish_reason,
            latency_ms=latency_ms,
            user_id=user_id,
            session_id=session_id,
            tags=tags
        )
        
        if self.include_messages:
            entry.messages = messages
        
        # Check for function calls
        if response.choices[0].message.tool_calls:
            entry.function_calls = [
                {
                    "name": tc.function.name,
                    "arguments": tc.function.arguments
                }
                for tc in response.choices[0].message.tool_calls
            ]
            entry.operation = "function_call"
        
        self._write_log(entry)
        
        return entry
    
    def log_error(
        self,
        model: str,
        provider: str,
        messages: list,
        error: Exception,
        latency_ms: float,
        user_id: str = None
    ) -> LLMLogEntry:
        """Log a failed LLM request"""
        
        entry = LLMLogEntry(
            request_id=str(uuid.uuid4()),
            timestamp=datetime.utcnow().isoformat(),
            model=model,
            provider=provider,
            operation="completion",
            latency_ms=latency_ms,
            user_id=user_id,
            error=str(error),
            error_type=type(error).__name__
        )
        
        if self.include_messages:
            entry.messages = messages
        
        self._write_log(entry, level="error")
        
        return entry
    
    def _write_log(self, entry: LLMLogEntry, level: str = "info"):
        """Write log entry"""
        log_data = asdict(entry)
        
        # Remove None values
        log_data = {k: v for k, v in log_data.items() if v is not None}
        
        if level == "error":
            self.logger.error("llm_request", **log_data)
        else:
            self.logger.info("llm_request", **log_data)
        
        if self.log_file:
            with open(self.log_file, "a") as f:
                f.write(json.dumps(log_data) + "\n")
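
A usage sketch for the logger; the file name and tag values are illustrative. The error path is shown because it runs without a real provider response:

llm_logger = LLMLogger(log_file="llm_requests.jsonl", include_messages=False)

try:
    raise TimeoutError("upstream request timed out")  # stand-in for a failed API call
except Exception as exc:
    llm_logger.log_error(
        model="gpt-4o",
        provider="openai",
        messages=[{"role": "user", "content": "hello"}],
        error=exc,
        latency_ms=1234.5,
        user_id="user-42",
    )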

Request Tracing

from contextvars import ContextVar
from typing import Optional
import uuid

# Context variables for request tracing
request_id_var: ContextVar[str] = ContextVar("request_id", default="")
trace_id_var: ContextVar[str] = ContextVar("trace_id", default="")
span_id_var: ContextVar[str] = ContextVar("span_id", default="")

class TracingContext:
    """Manage distributed tracing context"""
    
    @staticmethod
    def new_trace() -> str:
        """Start a new trace"""
        trace_id = str(uuid.uuid4())
        trace_id_var.set(trace_id)
        return trace_id
    
    @staticmethod
    def new_span() -> str:
        """Create a new span within current trace"""
        span_id = str(uuid.uuid4())[:8]
        span_id_var.set(span_id)
        return span_id
    
    @staticmethod
    def get_context() -> dict:
        """Get current tracing context"""
        return {
            "request_id": request_id_var.get(),
            "trace_id": trace_id_var.get(),
            "span_id": span_id_var.get()
        }

class TracedLLMClient:
    """LLM client with distributed tracing"""
    
    def __init__(self, client, logger: LLMLogger):
        self.client = client
        self.logger = logger
    
    async def complete(
        self,
        messages: list,
        model: str = "gpt-4o",
        **kwargs
    ):
        """Complete with tracing"""
        import time
        
        span_id = TracingContext.new_span()
        context = TracingContext.get_context()
        
        start = time.time()
        
        try:
            response = await self.client.chat.completions.create(
                model=model,
                messages=messages,
                **kwargs
            )
            
            latency = (time.time() - start) * 1000
            
            self.logger.log_request(
                model=model,
                provider="openai",
                messages=messages,
                response=response,
                latency_ms=latency,
                tags={
                    "trace_id": context["trace_id"],
                    "span_id": span_id
                }
            )
            
            return response
            
        except Exception as e:
            latency = (time.time() - start) * 1000
            
            self.logger.log_error(
                model=model,
                provider="openai",
                messages=messages,
                error=e,
                latency_ms=latency
            )
            raise

# FastAPI middleware for tracing
from fastapi import FastAPI, Request
from starlette.middleware.base import BaseHTTPMiddleware

class TracingMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        # Extract or create trace ID
        trace_id = request.headers.get("X-Trace-ID") or str(uuid.uuid4())
        request_id = str(uuid.uuid4())
        
        trace_id_var.set(trace_id)
        request_id_var.set(request_id)
        
        response = await call_next(request)
        
        response.headers["X-Trace-ID"] = trace_id
        response.headers["X-Request-ID"] = request_id
        
        return response

app = FastAPI()
app.add_middleware(TracingMiddleware)
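
To show how the pieces fit together, here is a sketch of an endpoint that calls the traced client; the /chat route, payload shape, and client names are illustrative, not part of the middleware above.

from openai import AsyncOpenAI

llm = TracedLLMClient(AsyncOpenAI(), LLMLogger())

@app.post("/chat")
async def chat(payload: dict):
    response = await llm.complete(messages=payload["messages"])
    return {
        "reply": response.choices[0].message.content,
        **TracingContext.get_context(),  # echo trace/request IDs back to the caller
    }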

Audit Logging

from dataclasses import dataclass, asdict
from datetime import datetime
from enum import Enum
from typing import Optional, Any
import hashlib
import json

class AuditAction(str, Enum):
    QUERY = "query"
    COMPLETION = "completion"
    MODERATION = "moderation"
    CONTENT_FILTER = "content_filter"
    DATA_ACCESS = "data_access"
    CONFIG_CHANGE = "config_change"

@dataclass
class AuditEntry:
    timestamp: str
    action: AuditAction
    user_id: str
    resource: str
    details: dict
    ip_address: Optional[str] = None
    user_agent: Optional[str] = None
    content_hash: Optional[str] = None  # Hash of content for verification

class AuditLogger:
    """Audit logging for compliance and security"""
    
    def __init__(self, storage_backend):
        self.storage = storage_backend
    
    def _hash_content(self, content: str) -> str:
        """Create SHA-256 hash of content"""
        return hashlib.sha256(content.encode()).hexdigest()
    
    def log_query(
        self,
        user_id: str,
        query: str,
        model: str,
        response_preview: str = None,
        ip_address: str = None,
        user_agent: str = None
    ):
        """Log user query"""
        entry = AuditEntry(
            timestamp=datetime.utcnow().isoformat(),
            action=AuditAction.QUERY,
            user_id=user_id,
            resource=model,
            details={
                "query_length": len(query),
                "response_preview": response_preview[:100] if response_preview else None
            },
            ip_address=ip_address,
            user_agent=user_agent,
            content_hash=self._hash_content(query)
        )
        
        self.storage.write(entry)
    
    def log_content_filter(
        self,
        user_id: str,
        content: str,
        filter_result: dict,
        action_taken: str
    ):
        """Log content filter activation"""
        entry = AuditEntry(
            timestamp=datetime.utcnow().isoformat(),
            action=AuditAction.CONTENT_FILTER,
            user_id=user_id,
            resource="content_filter",
            details={
                "content_length": len(content),
                "filter_result": filter_result,
                "action_taken": action_taken
            },
            content_hash=self._hash_content(content)
        )
        
        self.storage.write(entry)
    
    def log_data_access(
        self,
        user_id: str,
        data_source: str,
        query: str,
        records_accessed: int
    ):
        """Log data access for RAG"""
        entry = AuditEntry(
            timestamp=datetime.utcnow().isoformat(),
            action=AuditAction.DATA_ACCESS,
            user_id=user_id,
            resource=data_source,
            details={
                "query": query,
                "records_accessed": records_accessed
            }
        )
        
        self.storage.write(entry)

# Storage backends
class FileAuditStorage:
    def __init__(self, path: str):
        self.path = path
    
    def write(self, entry: AuditEntry):
        with open(self.path, "a") as f:
            f.write(json.dumps(asdict(entry)) + "\n")

class DatabaseAuditStorage:
    def __init__(self, db_connection):
        self.db = db_connection
    
    def write(self, entry: AuditEntry):
        self.db.execute(
            "INSERT INTO audit_logs VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
            (entry.timestamp, entry.action.value, entry.user_id,
             entry.resource, json.dumps(entry.details),
             entry.ip_address, entry.user_agent, entry.content_hash)
        )
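
A usage sketch wiring the audit logger to the file backend; the path and field values are examples only.

audit = AuditLogger(FileAuditStorage("audit.jsonl"))

audit.log_query(
    user_id="user-42",
    query="What is our refund policy?",
    model="gpt-4o",
    response_preview="Refunds are accepted within 30 days of purchase...",
    ip_address="203.0.113.7",
)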

Debug Mode

import os
from typing import Callable
from functools import wraps

class DebugMode:
    """Debug mode for LLM applications"""
    
    def __init__(self):
        self.enabled = os.getenv("LLM_DEBUG", "false").lower() == "true"
        self.log_prompts = os.getenv("LLM_LOG_PROMPTS", "false").lower() == "true"
        self.log_responses = os.getenv("LLM_LOG_RESPONSES", "false").lower() == "true"
        self.save_conversations = os.getenv("LLM_SAVE_CONVERSATIONS", "false").lower() == "true"
    
    def debug_llm_call(self, func: Callable) -> Callable:
        """Decorator to add debug logging to LLM calls"""
        
        @wraps(func)
        async def wrapper(*args, **kwargs):
            if not self.enabled:
                return await func(*args, **kwargs)
            
            # Log inputs
            messages = kwargs.get("messages", [])
            model = kwargs.get("model", "unknown")
            
            print(f"\n{'='*60}")
            print(f"LLM Call: {func.__name__}")
            print(f"Model: {model}")
            
            if self.log_prompts:
                print(f"\nMessages:")
                for msg in messages:
                    print(f"  [{msg['role']}]: {msg['content'][:200]}...")
            
            # Make the call
            import time
            start = time.time()
            
            try:
                result = await func(*args, **kwargs)
                latency = (time.time() - start) * 1000
                
                print(f"\nLatency: {latency:.0f}ms")
                print(f"Tokens: {result.usage.total_tokens}")
                
                if self.log_responses:
                    content = result.choices[0].message.content
                    print(f"\nResponse: {content[:500]}...")
                
                if self.save_conversations:
                    self._save_conversation(messages, result)
                
                print(f"{'='*60}\n")
                
                return result
                
            except Exception as e:
                print(f"\nError: {type(e).__name__}: {e}")
                print(f"{'='*60}\n")
                raise
        
        return wrapper
    
    def _save_conversation(self, messages: list, response):
        """Save conversation to file for debugging"""
        import json
        from datetime import datetime
        
        filename = f"debug_conversation_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        
        data = {
            "messages": messages,
            "response": response.choices[0].message.content,
            "model": response.model,
            "usage": {
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens
            }
        }
        
        with open(filename, "w") as f:
            json.dump(data, f, indent=2)

debug = DebugMode()

# Usage
@debug.debug_llm_call
async def complete(messages: list, model: str = "gpt-4o"):
    return await client.chat.completions.create(
        model=model,
        messages=messages
    )

Error Diagnostics

from dataclasses import dataclass
from typing import List, Optional
from datetime import datetime

@dataclass
class DiagnosticInfo:
    timestamp: str
    error_type: str
    error_message: str
    model: str
    provider: str
    request_context: dict
    suggested_fixes: List[str]
    documentation_links: List[str]

class ErrorDiagnostics:
    """Diagnose and suggest fixes for LLM errors"""
    
    ERROR_PATTERNS = {
        "rate_limit": {
            "patterns": ["rate limit", "429", "too many requests"],
            "suggestions": [
                "Implement exponential backoff",
                "Reduce request frequency",
                "Use batch API for bulk operations",
                "Consider upgrading API tier"
            ]
        },
        "context_length": {
            "patterns": ["context length", "maximum context", "token limit"],
            "suggestions": [
                "Truncate or summarize input",
                "Use a model with larger context window",
                "Implement chunking strategy",
                "Remove less relevant context"
            ]
        },
        "invalid_api_key": {
            "patterns": ["invalid api key", "authentication", "401"],
            "suggestions": [
                "Verify API key is correct",
                "Check environment variable is set",
                "Ensure API key has required permissions"
            ]
        },
        "content_filter": {
            "patterns": ["content filter", "policy", "refused"],
            "suggestions": [
                "Review content for policy violations",
                "Rephrase the request",
                "Add appropriate content warnings"
            ]
        }
    }
    
    def diagnose(
        self,
        error: Exception,
        model: str,
        provider: str,
        context: dict = None
    ) -> DiagnosticInfo:
        """Diagnose error and provide suggestions"""
        
        error_str = str(error).lower()
        error_type = "unknown"
        suggestions = ["Check API status page", "Review request parameters"]
        
        for err_type, config in self.ERROR_PATTERNS.items():
            if any(p in error_str for p in config["patterns"]):
                error_type = err_type
                suggestions = config["suggestions"]
                break
        
        return DiagnosticInfo(
            timestamp=datetime.utcnow().isoformat(),
            error_type=error_type,
            error_message=str(error),
            model=model,
            provider=provider,
            request_context=context or {},
            suggested_fixes=suggestions,
            documentation_links=self._get_docs(provider, error_type)
        )
    
    def _get_docs(self, provider: str, error_type: str) -> List[str]:
        """Get relevant documentation links"""
        docs = {
            "openai": {
                "rate_limit": "https://platform.openai.com/docs/guides/rate-limits",
                "context_length": "https://platform.openai.com/docs/models",
            },
            "anthropic": {
                "rate_limit": "https://docs.anthropic.com/en/api/rate-limits",
                "context_length": "https://docs.anthropic.com/en/docs/models",
            }
        }
        
        provider_docs = docs.get(provider, {})
        return [provider_docs.get(error_type, "")]

diagnostics = ErrorDiagnostics()

# Usage in error handler
try:
    response = await client.chat.completions.create(...)
except Exception as e:
    info = diagnostics.diagnose(
        error=e,
        model="gpt-4o",
        provider="openai",
        context={"message_count": len(messages)}
    )
    
    logger.error(
        "llm_error_diagnosed",
        error_type=info.error_type,
        suggestions=info.suggested_fixes
    )

Log Analysis

import json
from collections import Counter, defaultdict
from datetime import datetime, timedelta
from typing import List, Dict
from pathlib import Path

class LogAnalyzer:
    """Analyze LLM logs for insights"""
    
    def __init__(self, log_path: str):
        self.log_path = Path(log_path)
        self.entries = self._load_logs()
    
    def _load_logs(self) -> List[dict]:
        """Load log entries"""
        entries = []
        with open(self.log_path) as f:
            for line in f:
                try:
                    entries.append(json.loads(line))
                except json.JSONDecodeError:
                    continue
        return entries
    
    def get_usage_stats(self) -> dict:
        """Get token usage statistics"""
        total_input = sum(e.get("prompt_tokens", 0) for e in self.entries)
        total_output = sum(e.get("completion_tokens", 0) for e in self.entries)
        
        return {
            "total_requests": len(self.entries),
            "total_input_tokens": total_input,
            "total_output_tokens": total_output,
            "avg_input_tokens": total_input / len(self.entries) if self.entries else 0,
            "avg_output_tokens": total_output / len(self.entries) if self.entries else 0
        }
    
    def get_latency_stats(self) -> dict:
        """Get latency statistics"""
        latencies = [e.get("latency_ms", 0) for e in self.entries]
        latencies.sort()
        
        return {
            "mean_ms": sum(latencies) / len(latencies) if latencies else 0,
            "p50_ms": latencies[len(latencies) // 2] if latencies else 0,
            "p95_ms": latencies[int(len(latencies) * 0.95)] if latencies else 0,
            "p99_ms": latencies[int(len(latencies) * 0.99)] if latencies else 0
        }
    
    def get_error_summary(self) -> dict:
        """Summarize errors"""
        errors = [e for e in self.entries if e.get("error")]
        error_types = Counter(e.get("error_type") for e in errors)
        
        return {
            "total_errors": len(errors),
            "error_rate": len(errors) / len(self.entries) if self.entries else 0,
            "by_type": dict(error_types)
        }
    
    def get_model_breakdown(self) -> dict:
        """Break down usage by model"""
        by_model = defaultdict(lambda: {"count": 0, "tokens": 0})
        
        for e in self.entries:
            model = e.get("model", "unknown")
            by_model[model]["count"] += 1
            by_model[model]["tokens"] += e.get("prompt_tokens", 0) + e.get("completion_tokens", 0)
        
        return dict(by_model)
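
A quick usage sketch, assuming the JSONL file written by LLMLogger above exists at the given path:

analyzer = LogAnalyzer("llm_requests.jsonl")

print(analyzer.get_usage_stats())
print(analyzer.get_latency_stats())
print(analyzer.get_error_summary())
print(analyzer.get_model_breakdown())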

Logging Levels

Level    | When to Use       | Examples
DEBUG    | Development only  | Full prompts, responses
INFO     | Normal operations | Token counts, latency
WARNING  | Potential issues  | Retries, fallbacks
ERROR    | Failures          | API errors, timeouts
CRITICAL | System failures   | All providers down
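
Because the structlog configuration above uses structlog.stdlib.filter_by_level with the standard-library logger factory, the active threshold is the standard logging level. One way to set it (environment-driven; LOG_LEVEL is an assumed variable name):

import logging
import os

logging.basicConfig(level=os.getenv("LOG_LEVEL", "INFO").upper())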

What's Next

API Design for AI

Learn to design robust APIs for LLM applications.