December 2025 Update: Covers LangSmith, Langfuse, Phoenix, and custom observability solutions for production LLM systems.
Why Observability Matters for LLMs
LLMs are non-deterministic black boxes. Without observability, you can't:
- Debug why a response was wrong
- Identify cost spikes
- Detect quality degradation
- Optimize performance
Without Observability            With Observability
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
"The AI gave wrong answer"       ├── Prompt: "..."
                                 ├── Retrieved Docs: 5 chunks
"No idea why"                    ├── Latency: 2.3s
                                 ├── Tokens: 1,847 (input: 1,200)
"Can't reproduce"                ├── Cost: $0.012
                                 ├── User: user_123
                                 └── Root cause: missing context
Key Metrics to Track
Core Metrics
| Category | Metrics | Why It Matters |
|---|---|---|
| Latency | P50, P95, P99 response time | User experience |
| Cost | Tokens per request, $ per request | Budget control |
| Quality | User feedback, success rate | Model effectiveness |
| Errors | Rate, types, retry success | Reliability |
| Usage | Requests/min, active users | Capacity planning |
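A minimal sketch of computing a few of these aggregates offline, P50/P95/P99 latency and average cost per request, assuming you have collected per-request latency_ms and cost_usd values (pure standard library, illustrative only):

import statistics

def latency_percentiles(latencies_ms: list[float]) -> dict:
    """P50/P95/P99 from per-request latencies in milliseconds (needs >= 2 samples)."""
    cuts = statistics.quantiles(latencies_ms, n=100)  # 99 cut points
    return {"p50_ms": cuts[49], "p95_ms": cuts[94], "p99_ms": cuts[98]}

def cost_per_request(costs_usd: list[float]) -> float:
    """Average spend per request in USD."""
    return sum(costs_usd) / len(costs_usd) if costs_usd else 0.0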
LLM-Specific Metrics
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, List

@dataclass
class LLMTrace:
    """Complete trace for an LLM interaction"""
    trace_id: str
    timestamp: datetime

    # Request
    model: str
    prompt: str
    system_prompt: Optional[str]
    messages: List[dict]

    # Response
    response: str
    finish_reason: str

    # Token usage
    input_tokens: int
    output_tokens: int
    total_tokens: int

    # Timing
    latency_ms: float
    time_to_first_token_ms: Optional[float]

    # Cost
    cost_usd: float

    # Context (for RAG)
    retrieved_documents: Optional[List[dict]]
    retrieval_latency_ms: Optional[float]

    # Tool calls
    tool_calls: Optional[List[dict]]
    tool_results: Optional[List[dict]]

    # Metadata
    user_id: Optional[str]
    session_id: Optional[str]
    environment: str
    version: str

    # Quality signals
    user_feedback: Optional[str]  # thumbs_up, thumbs_down
    success: Optional[bool]
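Emitting each completed trace as a single structured log line keeps it queryable downstream. A minimal sketch using the LLMTrace dataclass above (the logger setup here is illustrative):

import json
import logging
from dataclasses import asdict

trace_logger = logging.getLogger("llm_traces")

def log_trace(trace: LLMTrace) -> None:
    """Serialize a completed trace as one JSON log line."""
    trace_logger.info(json.dumps(asdict(trace), default=str))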
Langfuse: Open-Source LLM Observability
Langfuse provides comprehensive tracing for LLM applications.
Setup
pip install langfuse
import os
from langfuse import Langfuse
from langfuse.decorators import observe, langfuse_context

# Initialize
langfuse = Langfuse(
    public_key=os.getenv("LANGFUSE_PUBLIC_KEY"),
    secret_key=os.getenv("LANGFUSE_SECRET_KEY"),
    host="https://cloud.langfuse.com"  # or self-hosted
)
Tracing LLM Calls
from langfuse.openai import openai  # Drop-in replacement for the openai package

# Automatic tracing with OpenAI
client = openai.OpenAI()

@observe()  # Decorator creates a trace
def chat(user_message: str, user_id: str) -> str:
    # Attach user and metadata to the current trace
    langfuse_context.update_current_trace(
        user_id=user_id,
        metadata={"feature": "chat"}
    )
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": user_message}]
    )
    return response.choices[0].message.content
@observe()
def retrieve_documents_traced(query: str) -> list:
    """Retrieval step, traced as its own nested span."""
    docs = retrieve_documents(query)
    langfuse_context.update_current_observation(
        input={"query": query},
        output={"num_docs": len(docs)},
        metadata={"retrieval_strategy": "hybrid"}
    )
    return docs

@observe()
def rag_pipeline(query: str, user_id: str) -> dict:
    """Full RAG pipeline with tracing"""
    langfuse_context.update_current_trace(name="rag-pipeline", user_id=user_id)

    # Retrieval (nested span via the decorated helper above)
    docs = retrieve_documents_traced(query)

    # Build context
    context = "\n".join([d["content"] for d in docs])

    # LLM generation (auto-traced by the langfuse.openai wrapper)
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "Answer based on context."},
            {"role": "user", "content": f"Context: {context}\n\nQuestion: {query}"}
        ]
    )
    answer = response.choices[0].message.content

    # Score the trace
    langfuse_context.score_current_trace(
        name="relevance",
        value=0.9,  # From your evaluation
        comment="High relevance based on source alignment"
    )

    return {"answer": answer, "sources": docs}
Custom Metrics and Evaluations
from langfuse import Langfuse

langfuse = Langfuse()

def evaluate_and_log(trace_id: str, answer: str, expected: str):
    """Log evaluation scores to Langfuse"""
    # Calculate metrics
    is_correct = check_answer(answer, expected)
    relevance = calculate_relevance(answer, expected)

    # Log scores
    langfuse.score(
        trace_id=trace_id,
        name="correctness",
        value=1.0 if is_correct else 0.0
    )
    langfuse.score(
        trace_id=trace_id,
        name="relevance",
        value=relevance
    )

def log_user_feedback(trace_id: str, feedback: str):
    """Log user feedback"""
    langfuse.score(
        trace_id=trace_id,
        name="user_feedback",
        value=1.0 if feedback == "thumbs_up" else 0.0,
        comment=feedback
    )
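The check_answer and calculate_relevance helpers above are left undefined; a minimal, purely illustrative sketch (normalized exact match plus token overlap, not a production-grade metric):

def check_answer(answer: str, expected: str) -> bool:
    """Naive correctness check: normalized exact match."""
    return answer.strip().lower() == expected.strip().lower()

def calculate_relevance(answer: str, expected: str) -> float:
    """Naive relevance score: token overlap between answer and expected."""
    answer_tokens = set(answer.lower().split())
    expected_tokens = set(expected.lower().split())
    if not expected_tokens:
        return 0.0
    return len(answer_tokens & expected_tokens) / len(expected_tokens)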
LangSmith: LangChain’s Platform
LangSmith provides deep integration with LangChain and LangGraph.
Setup
import os
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-api-key"
os.environ["LANGCHAIN_PROJECT"] = "my-ai-app"
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langsmith import traceable
llm = ChatOpenAI(model="gpt-4o")
Tracing Chains
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser

# Chains are automatically traced
prompt = ChatPromptTemplate.from_template(
    "Answer this question: {question}"
)

chain = (
    {"question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)

# Invoke with metadata
response = chain.invoke(
    "What is machine learning?",
    config={
        "metadata": {"user_id": "user_123"},
        "tags": ["production", "v2"]
    }
)
Custom Tracing
from langsmith import traceable

@traceable(name="custom-rag")
def my_rag_function(query: str) -> str:
    """Custom function with tracing"""
    docs = retrieve(query)
    answer = generate(query, docs)
    return answer

@traceable(run_type="retriever")
def retrieve(query: str) -> list:
    # Traced as retriever type
    return vector_db.search(query)

@traceable(run_type="llm")
def generate(query: str, docs: list) -> str:
    # Traced as LLM call
    return llm.invoke(format_prompt(query, docs))
Feedback and Evaluation
from langsmith import Client

client = Client()

# Log feedback
client.create_feedback(
    run_id="run-uuid",
    key="correctness",
    score=1.0,
    comment="Answer was accurate"
)

# Run evaluations
from langsmith.evaluation import evaluate

results = evaluate(
    lambda x: my_rag_function(x["question"]),
    data="my-dataset",
    evaluators=[
        "qa",  # Built-in QA evaluator
        "relevance",
        my_custom_evaluator
    ]
)
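The my_custom_evaluator referenced above is not defined; one common shape for a custom LangSmith evaluator is a function that receives the run and the reference example and returns a score dict. A hedged, minimal sketch (verify the output key against the langsmith version you use):

from langsmith.schemas import Run, Example

def my_custom_evaluator(run: Run, example: Example) -> dict:
    """Illustrative evaluator: score 1.0 when the run produced a non-empty answer."""
    # String outputs from the target are typically wrapped as {"output": ...}
    answer = (run.outputs or {}).get("output", "")
    return {"key": "non_empty_answer", "score": 1.0 if str(answer).strip() else 0.0}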
Arize Phoenix: Open-Source Tracing
Phoenix provides local-first observability with a beautiful UI.
Setup
pip install arize-phoenix openinference-instrumentation-openai
import phoenix as px
# Launch Phoenix UI
session = px.launch_app()
print(f"Phoenix UI: {session.url}")
# Instrument OpenAI
from openinference.instrumentation.openai import OpenAIInstrumentor
from phoenix.otel import register
tracer_provider = register()
OpenAIInstrumentor().instrument(tracer_provider=tracer_provider)
# Now all OpenAI calls are traced
from openai import OpenAI
client = OpenAI()
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello!"}]
)
Tracing RAG Pipelines
from openinference.instrumentation.llama_index import LlamaIndexInstrumentor
# Instrument LlamaIndex
LlamaIndexInstrumentor().instrument(tracer_provider=tracer_provider)
# Now all LlamaIndex operations are traced
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
documents = SimpleDirectoryReader("data").load_data()
index = VectorStoreIndex.from_documents(documents)
query_engine = index.as_query_engine()
# Query is fully traced
response = query_engine.query("What is the main topic?")
Custom Observability Stack
Build your own observability for complete control:
import time
import uuid
import json
from datetime import datetime
from dataclasses import dataclass, asdict
from typing import Optional, Any

import structlog
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

# Configure structured logging
structlog.configure(
    processors=[
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer()
    ]
)
logger = structlog.get_logger()

# Configure OpenTelemetry
provider = TracerProvider()
provider.add_span_processor(
    BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4317"))
)
trace.set_tracer_provider(provider)
tracer = trace.get_tracer(__name__)
class LLMObserver:
    """Custom LLM observability"""

    def __init__(self, service_name: str):
        self.service_name = service_name
        # MetricsCollector is assumed to be defined elsewhere
        # (e.g., a thin wrapper over the Prometheus metrics shown below)
        self.metrics_collector = MetricsCollector()

    def trace_llm_call(
        self,
        model: str,
        messages: list,
        user_id: Optional[str] = None,
        session_id: Optional[str] = None
    ):
        """Context manager for tracing LLM calls"""
        return LLMTraceContext(
            observer=self,
            model=model,
            messages=messages,
            user_id=user_id,
            session_id=session_id
        )
class LLMTraceContext:
    """Context manager for LLM call tracing"""

    def __init__(self, observer: LLMObserver, **kwargs):
        self.observer = observer
        self.trace_id = str(uuid.uuid4())
        self.start_time = None
        self.kwargs = kwargs

    def __enter__(self):
        self.start_time = time.perf_counter()

        # Start OTel span
        self.span = tracer.start_span(
            "llm.call",
            attributes={
                "llm.model": self.kwargs["model"],
                "llm.user_id": self.kwargs.get("user_id", "anonymous"),
                "trace.id": self.trace_id
            }
        )

        logger.info(
            "llm_call_started",
            trace_id=self.trace_id,
            model=self.kwargs["model"],
            user_id=self.kwargs.get("user_id")
        )
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        duration_ms = (time.perf_counter() - self.start_time) * 1000

        if exc_type:
            self.span.set_status(trace.Status(trace.StatusCode.ERROR, str(exc_val)))
            logger.error(
                "llm_call_failed",
                trace_id=self.trace_id,
                error=str(exc_val),
                duration_ms=duration_ms
            )
        else:
            self.span.set_status(trace.Status(trace.StatusCode.OK))

        self.span.set_attribute("llm.duration_ms", duration_ms)
        self.span.end()

        # Record metrics
        self.observer.metrics_collector.record_latency(
            self.kwargs["model"],
            duration_ms
        )

    def record_response(
        self,
        response: str,
        input_tokens: int,
        output_tokens: int,
        cost_usd: float
    ):
        """Record response details"""
        self.span.set_attributes({
            "llm.input_tokens": input_tokens,
            "llm.output_tokens": output_tokens,
            "llm.total_tokens": input_tokens + output_tokens,
            "llm.cost_usd": cost_usd
        })

        logger.info(
            "llm_call_completed",
            trace_id=self.trace_id,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            cost_usd=cost_usd
        )

        # Record cost metrics
        self.observer.metrics_collector.record_cost(
            self.kwargs["model"],
            cost_usd
        )

    def record_feedback(self, feedback: str, score: float):
        """Record user feedback"""
        logger.info(
            "user_feedback",
            trace_id=self.trace_id,
            feedback=feedback,
            score=score
        )
# Usage
observer = LLMObserver("my-ai-app")

def chat(user_message: str, user_id: str) -> str:
    # Named llm_trace to avoid shadowing the opentelemetry `trace` module
    with observer.trace_llm_call(
        model="gpt-4o",
        messages=[{"role": "user", "content": user_message}],
        user_id=user_id
    ) as llm_trace:
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": user_message}]
        )
        llm_trace.record_response(
            response=response.choices[0].message.content,
            input_tokens=response.usage.prompt_tokens,
            output_tokens=response.usage.completion_tokens,
            cost_usd=calculate_cost(response.usage)
        )
        return response.choices[0].message.content
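The calculate_cost helper is referenced above but not defined; a minimal sketch using a per-million-token price table (the rates below are placeholders, not current pricing; check your provider):

# Placeholder (input, output) USD prices per 1M tokens; update to your provider's rates
MODEL_PRICES_PER_M = {
    "gpt-4o": (2.50, 10.00),
}

def calculate_cost(usage, model: str = "gpt-4o") -> float:
    """Estimate request cost in USD from the usage object returned by the API."""
    input_price, output_price = MODEL_PRICES_PER_M.get(model, (0.0, 0.0))
    return (
        usage.prompt_tokens / 1_000_000 * input_price
        + usage.completion_tokens / 1_000_000 * output_price
    )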
Dashboards and Alerting
Key Dashboards
# Prometheus metrics for Grafana
from prometheus_client import Counter, Histogram, Gauge

# Counters
llm_requests_total = Counter(
    "llm_requests_total",
    "Total LLM requests",
    ["model", "status", "environment"]
)

# Histograms
llm_latency_seconds = Histogram(
    "llm_latency_seconds",
    "LLM request latency",
    ["model"],
    buckets=[0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]
)

llm_tokens_used = Histogram(
    "llm_tokens_used",
    "Tokens used per request",
    ["model", "token_type"],
    buckets=[100, 500, 1000, 2000, 4000, 8000, 16000]
)

# Gauges
llm_cost_usd = Gauge(
    "llm_cost_usd_total",
    "Cumulative LLM cost in USD",
    ["model"]
)
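To make these metrics scrapeable and populate them around each call, a minimal sketch (the record_request helper and the port number are illustrative, not part of any specific framework):

from prometheus_client import start_http_server

# Expose /metrics for Prometheus to scrape (port is an example)
start_http_server(8000)

def record_request(model: str, status: str, latency_s: float,
                   input_tokens: int, output_tokens: int, cost_usd: float,
                   environment: str = "production") -> None:
    """Record one LLM request into the metrics defined above."""
    llm_requests_total.labels(model=model, status=status, environment=environment).inc()
    llm_latency_seconds.labels(model=model).observe(latency_s)
    llm_tokens_used.labels(model=model, token_type="input").observe(input_tokens)
    llm_tokens_used.labels(model=model, token_type="output").observe(output_tokens)
    llm_cost_usd.labels(model=model).inc(cost_usd)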
Alert Rules
# prometheus_alerts.yml
groups:
  - name: llm_alerts
    rules:
      - alert: HighLLMLatency
        expr: histogram_quantile(0.95, sum by (le, model) (rate(llm_latency_seconds_bucket[5m]))) > 5
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High LLM latency detected"

      - alert: LLMErrorRateHigh
        expr: |
          sum(rate(llm_requests_total{status="error"}[5m]))
          / sum(rate(llm_requests_total[5m])) > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "LLM error rate above 5%"

      - alert: DailyCostExceeded
        expr: sum(llm_cost_usd_total) > 1000
        labels:
          severity: warning
        annotations:
          summary: "Daily LLM cost exceeded $1000"
Debugging LLM Issues
Common Issues and Diagnosis
class LLMDebugger:
    """Debug common LLM issues"""

    def analyze_trace(self, trace_id: str) -> dict:
        """Analyze a trace for issues"""
        trace = self.fetch_trace(trace_id)
        issues = []

        # Check for high latency
        if trace.latency_ms > 5000:
            issues.append({
                "type": "high_latency",
                "details": f"Latency {trace.latency_ms}ms exceeds threshold",
                "suggestions": [
                    "Use streaming for long responses",
                    "Consider smaller model for simple tasks",
                    "Check network latency to API provider"
                ]
            })

        # Check for token explosion
        if trace.input_tokens > 10000:
            issues.append({
                "type": "large_context",
                "details": f"Input tokens {trace.input_tokens} is high",
                "suggestions": [
                    "Reduce context size",
                    "Summarize long documents",
                    "Use better chunking strategy"
                ]
            })

        # Check for quality issues
        if trace.user_feedback == "thumbs_down":
            issues.append({
                "type": "quality_issue",
                "details": "User gave negative feedback",
                "suggestions": [
                    "Review retrieved documents for relevance",
                    "Check system prompt clarity",
                    "Analyze response for hallucinations"
                ]
            })

        return {
            "trace_id": trace_id,
            "issues": issues,
            "trace_details": trace
        }
Key Takeaways
Trace Everything
Log every LLM call with inputs, outputs, tokens, latency, and cost.
Structured Logging
Use structured logs (JSON) for easy querying and analysis.
Track Quality
Collect user feedback and run automated evaluations.
Set Alerts
Alert on latency spikes, error rates, and cost anomalies.
What’s Next
AI Security & Guardrails
Learn how to secure LLM applications and implement safety guardrails