Structured Logging Setup
Using structlog
```python
import logging
import structlog
from typing import Any

# Configure stdlib logging so structlog's filter_by_level has a level to honor;
# without this, INFO events would be dropped by the default WARNING root level.
logging.basicConfig(format="%(message)s", level=logging.INFO)

# Configure structlog
structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
        structlog.processors.JSONRenderer()
    ],
    wrapper_class=structlog.stdlib.BoundLogger,
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
    cache_logger_on_first_use=True
)

logger = structlog.get_logger()

# Logging LLM requests
def log_llm_request(
    model: str,
    messages: list,
    response: Any,
    latency_ms: float,
    request_id: str
):
    """Log an LLM request with structured data"""
    logger.info(
        "llm_request",
        request_id=request_id,
        model=model,
        message_count=len(messages),
        input_tokens=response.usage.prompt_tokens,
        output_tokens=response.usage.completion_tokens,
        total_tokens=response.usage.total_tokens,
        latency_ms=latency_ms,
        finish_reason=response.choices[0].finish_reason
    )
```
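As a quick check, wiring `log_llm_request` into an OpenAI-style chat completion should produce one JSON line per request. The sketch below assumes an existing async `client` (openai v1 SDK style); the exact keys in the output depend on the processors configured above.

```python
import time
import uuid

# Hypothetical call site; `client` is assumed to be an AsyncOpenAI-style client
messages = [{"role": "user", "content": "Hello"}]
start = time.time()
response = await client.chat.completions.create(model="gpt-4o", messages=messages)

log_llm_request(
    model="gpt-4o",
    messages=messages,
    response=response,
    latency_ms=(time.time() - start) * 1000,
    request_id=str(uuid.uuid4()),
)
# Emits a single JSON line, roughly:
# {"event": "llm_request", "model": "gpt-4o", "message_count": 1, "input_tokens": 8,
#  "output_tokens": 9, "total_tokens": 17, "latency_ms": 412.3, "finish_reason": "stop",
#  "level": "info", "timestamp": "...", ...}
```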
Custom LLM Logger
```python
import json
import uuid
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import Optional, List, Dict, Any

import structlog


@dataclass
class LLMLogEntry:
    request_id: str
    timestamp: str
    model: str
    provider: str
    operation: str  # "completion", "embedding", "function_call"

    # Request details
    messages: Optional[List[Dict]] = None
    prompt_tokens: int = 0

    # Response details
    completion_tokens: int = 0
    finish_reason: Optional[str] = None
    function_calls: Optional[List[Dict]] = None

    # Performance
    latency_ms: float = 0

    # Metadata
    user_id: Optional[str] = None
    session_id: Optional[str] = None
    tags: Optional[Dict[str, str]] = None

    # Error
    error: Optional[str] = None
    error_type: Optional[str] = None


class LLMLogger:
    """Comprehensive LLM request logger"""

    def __init__(
        self,
        log_file: Optional[str] = None,
        include_messages: bool = False,  # Privacy consideration
        include_responses: bool = False
    ):
        self.log_file = log_file
        self.include_messages = include_messages
        self.include_responses = include_responses
        self.logger = structlog.get_logger()

    def log_request(
        self,
        model: str,
        provider: str,
        messages: list,
        response: Any,
        latency_ms: float,
        user_id: Optional[str] = None,
        session_id: Optional[str] = None,
        tags: Optional[dict] = None
    ) -> LLMLogEntry:
        """Log a successful LLM request"""
        request_id = str(uuid.uuid4())

        entry = LLMLogEntry(
            request_id=request_id,
            timestamp=datetime.utcnow().isoformat(),
            model=model,
            provider=provider,
            operation="completion",
            prompt_tokens=response.usage.prompt_tokens,
            completion_tokens=response.usage.completion_tokens,
            finish_reason=response.choices[0].finish_reason,
            latency_ms=latency_ms,
            user_id=user_id,
            session_id=session_id,
            tags=tags
        )

        if self.include_messages:
            entry.messages = messages

        # Check for function calls
        if response.choices[0].message.tool_calls:
            entry.function_calls = [
                {
                    "name": tc.function.name,
                    "arguments": tc.function.arguments
                }
                for tc in response.choices[0].message.tool_calls
            ]
            entry.operation = "function_call"

        self._write_log(entry)
        return entry

    def log_error(
        self,
        model: str,
        provider: str,
        messages: list,
        error: Exception,
        latency_ms: float,
        user_id: Optional[str] = None
    ) -> LLMLogEntry:
        """Log a failed LLM request"""
        entry = LLMLogEntry(
            request_id=str(uuid.uuid4()),
            timestamp=datetime.utcnow().isoformat(),
            model=model,
            provider=provider,
            operation="completion",
            latency_ms=latency_ms,
            user_id=user_id,
            error=str(error),
            error_type=type(error).__name__
        )

        if self.include_messages:
            entry.messages = messages

        self._write_log(entry, level="error")
        return entry

    def _write_log(self, entry: LLMLogEntry, level: str = "info"):
        """Write log entry"""
        log_data = asdict(entry)
        # Remove None values
        log_data = {k: v for k, v in log_data.items() if v is not None}

        if level == "error":
            self.logger.error("llm_request", **log_data)
        else:
            self.logger.info("llm_request", **log_data)

        if self.log_file:
            with open(self.log_file, "a") as f:
                f.write(json.dumps(log_data) + "\n")
```
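A minimal usage sketch, assuming an async OpenAI-style `client`: time the call yourself, route successes through `log_request` and failures through `log_error` so both land in the same structured stream and JSONL file.

```python
llm_logger = LLMLogger(log_file="llm_requests.jsonl", include_messages=False)

async def chat(messages: list, model: str = "gpt-4o"):
    import time
    start = time.time()
    try:
        response = await client.chat.completions.create(model=model, messages=messages)
    except Exception as e:
        llm_logger.log_error(
            model=model,
            provider="openai",
            messages=messages,
            error=e,
            latency_ms=(time.time() - start) * 1000,
        )
        raise
    llm_logger.log_request(
        model=model,
        provider="openai",
        messages=messages,
        response=response,
        latency_ms=(time.time() - start) * 1000,
        tags={"feature": "chat"},  # illustrative tag
    )
    return response
```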
Request Tracing
```python
import uuid
from contextvars import ContextVar
from typing import Optional

# Context variables for request tracing
request_id_var: ContextVar[str] = ContextVar("request_id", default="")
trace_id_var: ContextVar[str] = ContextVar("trace_id", default="")
span_id_var: ContextVar[str] = ContextVar("span_id", default="")


class TracingContext:
    """Manage distributed tracing context"""

    @staticmethod
    def new_trace() -> str:
        """Start a new trace"""
        trace_id = str(uuid.uuid4())
        trace_id_var.set(trace_id)
        return trace_id

    @staticmethod
    def new_span() -> str:
        """Create a new span within the current trace"""
        span_id = str(uuid.uuid4())[:8]
        span_id_var.set(span_id)
        return span_id

    @staticmethod
    def get_context() -> dict:
        """Get current tracing context"""
        return {
            "request_id": request_id_var.get(),
            "trace_id": trace_id_var.get(),
            "span_id": span_id_var.get()
        }


class TracedLLMClient:
    """LLM client with distributed tracing"""

    def __init__(self, client, logger: LLMLogger):
        self.client = client
        self.logger = logger

    async def complete(
        self,
        messages: list,
        model: str = "gpt-4o",
        **kwargs
    ):
        """Complete with tracing"""
        import time

        span_id = TracingContext.new_span()
        context = TracingContext.get_context()
        start = time.time()

        try:
            response = await self.client.chat.completions.create(
                model=model,
                messages=messages,
                **kwargs
            )
            latency = (time.time() - start) * 1000

            self.logger.log_request(
                model=model,
                provider="openai",
                messages=messages,
                response=response,
                latency_ms=latency,
                tags={
                    "trace_id": context["trace_id"],
                    "span_id": span_id
                }
            )
            return response

        except Exception as e:
            latency = (time.time() - start) * 1000
            self.logger.log_error(
                model=model,
                provider="openai",
                messages=messages,
                error=e,
                latency_ms=latency
            )
            raise


# FastAPI middleware for tracing
from fastapi import FastAPI, Request
from starlette.middleware.base import BaseHTTPMiddleware


class TracingMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        # Extract or create trace ID
        trace_id = request.headers.get("X-Trace-ID") or str(uuid.uuid4())
        request_id = str(uuid.uuid4())

        trace_id_var.set(trace_id)
        request_id_var.set(request_id)

        response = await call_next(request)
        response.headers["X-Trace-ID"] = trace_id
        response.headers["X-Request-ID"] = request_id
        return response


app = FastAPI()
app.add_middleware(TracingMiddleware)
```
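One way to wire this together (the `AsyncOpenAI` client and route shape below are assumptions, not part of the middleware itself): the middleware sets the trace context for each HTTP request, and `TracedLLMClient` tags every LLM log entry with the same trace ID the caller gets back in `X-Trace-ID`.

```python
from openai import AsyncOpenAI  # assumes the openai v1 SDK

traced_client = TracedLLMClient(
    client=AsyncOpenAI(),
    logger=LLMLogger(log_file="llm_requests.jsonl")
)

@app.post("/chat")
async def chat_endpoint(payload: dict):
    # TracingMiddleware has already set trace_id_var/request_id_var for this request,
    # so the LLM log entry produced here carries the same trace ID as the response header.
    response = await traced_client.complete(messages=payload["messages"])
    return {"reply": response.choices[0].message.content}
```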
Audit Logging
```python
import hashlib
import json
from dataclasses import dataclass, asdict
from datetime import datetime
from enum import Enum
from typing import Optional


class AuditAction(str, Enum):
    QUERY = "query"
    COMPLETION = "completion"
    MODERATION = "moderation"
    CONTENT_FILTER = "content_filter"
    DATA_ACCESS = "data_access"
    CONFIG_CHANGE = "config_change"


@dataclass
class AuditEntry:
    timestamp: str
    action: AuditAction
    user_id: str
    resource: str
    details: dict
    ip_address: Optional[str] = None
    user_agent: Optional[str] = None
    content_hash: Optional[str] = None  # Hash of content for verification


class AuditLogger:
    """Audit logging for compliance and security"""

    def __init__(self, storage_backend):
        self.storage = storage_backend

    def _hash_content(self, content: str) -> str:
        """Create SHA-256 hash of content"""
        return hashlib.sha256(content.encode()).hexdigest()

    def log_query(
        self,
        user_id: str,
        query: str,
        model: str,
        response_preview: Optional[str] = None,
        ip_address: Optional[str] = None,
        user_agent: Optional[str] = None
    ):
        """Log user query"""
        entry = AuditEntry(
            timestamp=datetime.utcnow().isoformat(),
            action=AuditAction.QUERY,
            user_id=user_id,
            resource=model,
            details={
                "query_length": len(query),
                "response_preview": response_preview[:100] if response_preview else None
            },
            ip_address=ip_address,
            user_agent=user_agent,
            content_hash=self._hash_content(query)
        )
        self.storage.write(entry)

    def log_content_filter(
        self,
        user_id: str,
        content: str,
        filter_result: dict,
        action_taken: str
    ):
        """Log content filter activation"""
        entry = AuditEntry(
            timestamp=datetime.utcnow().isoformat(),
            action=AuditAction.CONTENT_FILTER,
            user_id=user_id,
            resource="content_filter",
            details={
                "content_length": len(content),
                "filter_result": filter_result,
                "action_taken": action_taken
            },
            content_hash=self._hash_content(content)
        )
        self.storage.write(entry)

    def log_data_access(
        self,
        user_id: str,
        data_source: str,
        query: str,
        records_accessed: int
    ):
        """Log data access for RAG"""
        entry = AuditEntry(
            timestamp=datetime.utcnow().isoformat(),
            action=AuditAction.DATA_ACCESS,
            user_id=user_id,
            resource=data_source,
            details={
                "query": query,
                "records_accessed": records_accessed
            }
        )
        self.storage.write(entry)


# Storage backends
class FileAuditStorage:
    def __init__(self, path: str):
        self.path = path

    def write(self, entry: AuditEntry):
        with open(self.path, "a") as f:
            f.write(json.dumps(asdict(entry)) + "\n")


class DatabaseAuditStorage:
    def __init__(self, db_connection):
        self.db = db_connection

    def write(self, entry: AuditEntry):
        self.db.execute(
            "INSERT INTO audit_logs VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
            (entry.timestamp, entry.action.value, entry.user_id,
             entry.resource, json.dumps(entry.details),
             entry.ip_address, entry.user_agent, entry.content_hash)
        )
```
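A usage sketch with the file backend (all identifiers and values below are illustrative): one audit line per user-facing event, written as JSONL.

```python
audit = AuditLogger(FileAuditStorage("audit.jsonl"))

audit.log_query(
    user_id="user-123",
    query="Summarize our Q3 revenue numbers",
    model="gpt-4o",
    response_preview="Q3 revenue grew by...",
    ip_address="203.0.113.7",
    user_agent="Mozilla/5.0",
)

audit.log_data_access(
    user_id="user-123",
    data_source="finance_reports",
    query="Q3 revenue",
    records_accessed=12,
)
```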
Debug Mode
```python
import os
from functools import wraps
from typing import Callable


class DebugMode:
    """Debug mode for LLM applications"""

    def __init__(self):
        self.enabled = os.getenv("LLM_DEBUG", "false").lower() == "true"
        self.log_prompts = os.getenv("LLM_LOG_PROMPTS", "false").lower() == "true"
        self.log_responses = os.getenv("LLM_LOG_RESPONSES", "false").lower() == "true"
        self.save_conversations = os.getenv("LLM_SAVE_CONVERSATIONS", "false").lower() == "true"

    def debug_llm_call(self, func: Callable) -> Callable:
        """Decorator to add debug logging to LLM calls"""
        @wraps(func)
        async def wrapper(*args, **kwargs):
            if not self.enabled:
                return await func(*args, **kwargs)

            # Log inputs
            messages = kwargs.get("messages", [])
            model = kwargs.get("model", "unknown")

            print(f"\n{'='*60}")
            print(f"LLM Call: {func.__name__}")
            print(f"Model: {model}")

            if self.log_prompts:
                print("\nMessages:")
                for msg in messages:
                    print(f"  [{msg['role']}]: {msg['content'][:200]}...")

            # Make the call
            import time
            start = time.time()

            try:
                result = await func(*args, **kwargs)
                latency = (time.time() - start) * 1000

                print(f"\nLatency: {latency:.0f}ms")
                print(f"Tokens: {result.usage.total_tokens}")

                if self.log_responses:
                    content = result.choices[0].message.content
                    print(f"\nResponse: {content[:500]}...")

                if self.save_conversations:
                    self._save_conversation(messages, result)

                print(f"{'='*60}\n")
                return result

            except Exception as e:
                print(f"\nError: {type(e).__name__}: {e}")
                print(f"{'='*60}\n")
                raise

        return wrapper

    def _save_conversation(self, messages: list, response):
        """Save conversation to file for debugging"""
        import json
        from datetime import datetime

        filename = f"debug_conversation_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        data = {
            "messages": messages,
            "response": response.choices[0].message.content,
            "model": response.model,
            "usage": {
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens
            }
        }
        with open(filename, "w") as f:
            json.dump(data, f, indent=2)


debug = DebugMode()

# Usage
@debug.debug_llm_call
async def complete(messages: list, model: str = "gpt-4o"):
    return await client.chat.completions.create(
        model=model,
        messages=messages
    )
```
Error Diagnostics
```python
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional


@dataclass
class DiagnosticInfo:
    timestamp: str
    error_type: str
    error_message: str
    model: str
    provider: str
    request_context: dict
    suggested_fixes: List[str]
    documentation_links: List[str]


class ErrorDiagnostics:
    """Diagnose and suggest fixes for LLM errors"""

    ERROR_PATTERNS = {
        "rate_limit": {
            "patterns": ["rate limit", "429", "too many requests"],
            "suggestions": [
                "Implement exponential backoff",
                "Reduce request frequency",
                "Use batch API for bulk operations",
                "Consider upgrading API tier"
            ]
        },
        "context_length": {
            "patterns": ["context length", "maximum context", "token limit"],
            "suggestions": [
                "Truncate or summarize input",
                "Use a model with a larger context window",
                "Implement a chunking strategy",
                "Remove less relevant context"
            ]
        },
        "invalid_api_key": {
            "patterns": ["invalid api key", "authentication", "401"],
            "suggestions": [
                "Verify the API key is correct",
                "Check the environment variable is set",
                "Ensure the API key has the required permissions"
            ]
        },
        "content_filter": {
            "patterns": ["content filter", "policy", "refused"],
            "suggestions": [
                "Review content for policy violations",
                "Rephrase the request",
                "Add appropriate content warnings"
            ]
        }
    }

    def diagnose(
        self,
        error: Exception,
        model: str,
        provider: str,
        context: Optional[dict] = None
    ) -> DiagnosticInfo:
        """Diagnose error and provide suggestions"""
        error_str = str(error).lower()

        error_type = "unknown"
        suggestions = ["Check API status page", "Review request parameters"]

        for err_type, config in self.ERROR_PATTERNS.items():
            if any(p in error_str for p in config["patterns"]):
                error_type = err_type
                suggestions = config["suggestions"]
                break

        return DiagnosticInfo(
            timestamp=datetime.utcnow().isoformat(),
            error_type=error_type,
            error_message=str(error),
            model=model,
            provider=provider,
            request_context=context or {},
            suggested_fixes=suggestions,
            documentation_links=self._get_docs(provider, error_type)
        )

    def _get_docs(self, provider: str, error_type: str) -> List[str]:
        """Get relevant documentation links"""
        docs = {
            "openai": {
                "rate_limit": "https://platform.openai.com/docs/guides/rate-limits",
                "context_length": "https://platform.openai.com/docs/models",
            },
            "anthropic": {
                "rate_limit": "https://docs.anthropic.com/en/api/rate-limits",
                "context_length": "https://docs.anthropic.com/en/docs/models",
            }
        }
        link = docs.get(provider, {}).get(error_type)
        return [link] if link else []


diagnostics = ErrorDiagnostics()

# Usage in error handler
try:
    response = await client.chat.completions.create(...)
except Exception as e:
    info = diagnostics.diagnose(
        error=e,
        model="gpt-4o",
        provider="openai",
        context={"message_count": len(messages)}
    )
    logger.error(
        "llm_error_diagnosed",
        error_type=info.error_type,
        suggestions=info.suggested_fixes
    )
```
Log Analysis
```python
import json
from collections import Counter, defaultdict
from pathlib import Path
from typing import List


class LogAnalyzer:
    """Analyze LLM logs for insights"""

    def __init__(self, log_path: str):
        self.log_path = Path(log_path)
        self.entries = self._load_logs()

    def _load_logs(self) -> List[dict]:
        """Load log entries"""
        entries = []
        with open(self.log_path) as f:
            for line in f:
                try:
                    entries.append(json.loads(line))
                except json.JSONDecodeError:
                    continue
        return entries

    def get_usage_stats(self) -> dict:
        """Get token usage statistics"""
        total_input = sum(e.get("prompt_tokens", 0) for e in self.entries)
        total_output = sum(e.get("completion_tokens", 0) for e in self.entries)

        return {
            "total_requests": len(self.entries),
            "total_input_tokens": total_input,
            "total_output_tokens": total_output,
            "avg_input_tokens": total_input / len(self.entries) if self.entries else 0,
            "avg_output_tokens": total_output / len(self.entries) if self.entries else 0
        }

    def get_latency_stats(self) -> dict:
        """Get latency statistics"""
        latencies = [e.get("latency_ms", 0) for e in self.entries]
        latencies.sort()

        return {
            "mean_ms": sum(latencies) / len(latencies) if latencies else 0,
            "p50_ms": latencies[len(latencies) // 2] if latencies else 0,
            "p95_ms": latencies[int(len(latencies) * 0.95)] if latencies else 0,
            "p99_ms": latencies[int(len(latencies) * 0.99)] if latencies else 0
        }

    def get_error_summary(self) -> dict:
        """Summarize errors"""
        errors = [e for e in self.entries if e.get("error")]
        error_types = Counter(e.get("error_type") for e in errors)

        return {
            "total_errors": len(errors),
            "error_rate": len(errors) / len(self.entries) if self.entries else 0,
            "by_type": dict(error_types)
        }

    def get_model_breakdown(self) -> dict:
        """Break down usage by model"""
        by_model = defaultdict(lambda: {"count": 0, "tokens": 0})

        for e in self.entries:
            model = e.get("model", "unknown")
            by_model[model]["count"] += 1
            by_model[model]["tokens"] += e.get("prompt_tokens", 0) + e.get("completion_tokens", 0)

        return dict(by_model)
```
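Pointed at the JSONL file produced by `LLMLogger` (the filename below matches the earlier sketch), the analyzer gives quick aggregate views:

```python
analyzer = LogAnalyzer("llm_requests.jsonl")

print(analyzer.get_usage_stats())      # request count, total/avg token usage
print(analyzer.get_latency_stats())    # mean, p50, p95, p99 latency in ms
print(analyzer.get_error_summary())    # error count, error rate, breakdown by type
print(analyzer.get_model_breakdown())  # per-model request and token counts
```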
Logging Levels
| Level | When to Use | Examples |
|---|---|---|
| DEBUG | Development only | Full prompts, responses |
| INFO | Normal operations | Token counts, latency |
| WARNING | Potential issues | Retries, fallbacks |
| ERROR | Failures | API errors, timeouts |
| CRITICAL | System failures | All providers down |
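Because `filter_by_level` defers to the standard library's logger level, the table above maps onto a single level setting. A minimal sketch (the `LLM_LOG_LEVEL` variable name is an assumption):

```python
import logging
import os

# Read the desired level from the environment and apply it to the root logger,
# which structlog's filter_by_level consults before emitting an event.
level_name = os.getenv("LLM_LOG_LEVEL", "INFO").upper()
logging.getLogger().setLevel(getattr(logging, level_name, logging.INFO))
```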
What's Next
API Design for AI
Learn to design robust APIs for LLM applications