Benchmark Framework
Building an Evaluation Harness
Create a structured framework for comparing models:
from dataclasses import dataclass, field
from typing import Callable
from abc import ABC, abstractmethod
import time
import json
from openai import OpenAI
import anthropic
@dataclass
class EvaluationResult:
"""Result from a single evaluation."""
model: str
prompt: str
response: str
latency_ms: float
input_tokens: int
output_tokens: int
cost: float
scores: dict[str, float] = field(default_factory=dict)
@dataclass
class BenchmarkResult:
"""Aggregated benchmark results."""
model: str
total_evaluations: int
avg_latency_ms: float
avg_input_tokens: float
avg_output_tokens: float
total_cost: float
avg_scores: dict[str, float]
individual_results: list[EvaluationResult]
class LLMProvider(ABC):
"""Abstract base class for LLM providers."""
@abstractmethod
def complete(self, prompt: str, **kwargs) -> tuple[str, dict]:
"""Generate completion, return (response, metadata)."""
pass
@property
@abstractmethod
def name(self) -> str:
"""Provider name."""
pass
class OpenAIProvider(LLMProvider):
"""OpenAI API provider."""
PRICING = {
"gpt-4o": {"input": 2.50, "output": 10.00},
"gpt-4o-mini": {"input": 0.15, "output": 0.60},
"gpt-4-turbo": {"input": 10.00, "output": 30.00},
"gpt-3.5-turbo": {"input": 0.50, "output": 1.50},
}
def __init__(self, model: str = "gpt-4o"):
self.client = OpenAI()
self.model = model
@property
def name(self) -> str:
return f"openai/{self.model}"
def complete(self, prompt: str, **kwargs) -> tuple[str, dict]:
response = self.client.chat.completions.create(
model=self.model,
messages=[{"role": "user", "content": prompt}],
**kwargs
)
usage = response.usage
pricing = self.PRICING.get(self.model, {"input": 0, "output": 0})
cost = (
(usage.prompt_tokens / 1_000_000) * pricing["input"] +
(usage.completion_tokens / 1_000_000) * pricing["output"]
)
return response.choices[0].message.content, {
"input_tokens": usage.prompt_tokens,
"output_tokens": usage.completion_tokens,
"cost": cost
}
class AnthropicProvider(LLMProvider):
"""Anthropic API provider."""
PRICING = {
"claude-sonnet-4-20250514": {"input": 3.00, "output": 15.00},
"claude-3-5-sonnet-20241022": {"input": 3.00, "output": 15.00},
"claude-3-haiku-20240307": {"input": 0.25, "output": 1.25},
}
def __init__(self, model: str = "claude-sonnet-4-20250514"):
self.client = anthropic.Anthropic()
self.model = model
@property
def name(self) -> str:
return f"anthropic/{self.model}"
def complete(self, prompt: str, **kwargs) -> tuple[str, dict]:
response = self.client.messages.create(
model=self.model,
max_tokens=kwargs.get("max_tokens", 1024),
messages=[{"role": "user", "content": prompt}]
)
usage = response.usage
pricing = self.PRICING.get(self.model, {"input": 0, "output": 0})
cost = (
(usage.input_tokens / 1_000_000) * pricing["input"] +
(usage.output_tokens / 1_000_000) * pricing["output"]
)
return response.content[0].text, {
"input_tokens": usage.input_tokens,
"output_tokens": usage.output_tokens,
"cost": cost
}
class BenchmarkHarness:
"""Framework for running model benchmarks."""
def __init__(self, providers: list[LLMProvider]):
self.providers = providers
self.scorers: dict[str, Callable] = {}
def add_scorer(
self,
name: str,
scorer: Callable[[str, str, str], float]
):
"""Add a scoring function (prompt, response, reference) -> score."""
self.scorers[name] = scorer
def evaluate_single(
self,
provider: LLMProvider,
prompt: str,
reference: str = None,
**kwargs
) -> EvaluationResult:
"""Evaluate a single prompt on a provider."""
start_time = time.perf_counter()
response, metadata = provider.complete(prompt, **kwargs)
latency_ms = (time.perf_counter() - start_time) * 1000
# Run scorers
scores = {}
for name, scorer in self.scorers.items():
try:
scores[name] = scorer(prompt, response, reference)
except Exception as e:
scores[name] = 0.0
print(f"Scorer {name} failed: {e}")
return EvaluationResult(
model=provider.name,
prompt=prompt,
response=response,
latency_ms=latency_ms,
input_tokens=metadata.get("input_tokens", 0),
output_tokens=metadata.get("output_tokens", 0),
cost=metadata.get("cost", 0.0),
scores=scores
)
def run_benchmark(
self,
test_cases: list[dict],
**kwargs
) -> list[BenchmarkResult]:
"""Run benchmark across all providers."""
results = {}
for provider in self.providers:
results[provider.name] = []
for test_case in test_cases:
prompt = test_case["prompt"]
reference = test_case.get("reference")
result = self.evaluate_single(
provider, prompt, reference, **kwargs
)
results[provider.name].append(result)
# Aggregate results
benchmark_results = []
for provider_name, eval_results in results.items():
avg_latency = sum(r.latency_ms for r in eval_results) / len(eval_results)
avg_input = sum(r.input_tokens for r in eval_results) / len(eval_results)
avg_output = sum(r.output_tokens for r in eval_results) / len(eval_results)
total_cost = sum(r.cost for r in eval_results)
# Aggregate scores
avg_scores = {}
for score_name in self.scorers.keys():
scores = [r.scores.get(score_name, 0) for r in eval_results]
avg_scores[score_name] = sum(scores) / len(scores) if scores else 0
benchmark_results.append(BenchmarkResult(
model=provider_name,
total_evaluations=len(eval_results),
avg_latency_ms=avg_latency,
avg_input_tokens=avg_input,
avg_output_tokens=avg_output,
total_cost=total_cost,
avg_scores=avg_scores,
individual_results=eval_results
))
return benchmark_results
# Usage
providers = [
OpenAIProvider("gpt-4o"),
OpenAIProvider("gpt-4o-mini"),
AnthropicProvider("claude-sonnet-4-20250514"),
]
harness = BenchmarkHarness(providers)
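Before wiring up any scorers, a quick smoke test confirms each provider is reachable and that latency, token, and cost metadata come back as expected. A minimal sketch, assuming API keys are already configured in the environment (the prompt is arbitrary):

# Hypothetical smoke test: one prompt against the first provider
result = harness.evaluate_single(
    providers[0],
    "What is the capital of France?",
)
print(f"{result.model}: {result.latency_ms:.0f} ms, "
      f"{result.output_tokens} output tokens, ${result.cost:.6f}")
print(result.response[:200])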
Evaluation Metrics
Response Quality Scorers
Implement various quality metrics:
import json
import re
from difflib import SequenceMatcher
from typing import Callable
from openai import OpenAI
def exact_match_scorer(prompt: str, response: str, reference: str) -> float:
"""Check if response exactly matches reference."""
if not reference:
return 0.0
return 1.0 if response.strip() == reference.strip() else 0.0
def contains_answer_scorer(prompt: str, response: str, reference: str) -> float:
"""Check if response contains the reference answer."""
if not reference:
return 0.0
return 1.0 if reference.lower() in response.lower() else 0.0
def length_ratio_scorer(prompt: str, response: str, reference: str) -> float:
"""Score based on length similarity to reference."""
if not reference:
return 1.0
ref_len = len(reference)
resp_len = len(response)
if resp_len == 0:
return 0.0
ratio = min(ref_len, resp_len) / max(ref_len, resp_len)
return ratio
def similarity_scorer(prompt: str, response: str, reference: str) -> float:
"""Calculate text similarity using SequenceMatcher."""
if not reference:
return 0.0
return SequenceMatcher(None, response.lower(), reference.lower()).ratio()
def format_compliance_scorer(prompt: str, response: str, reference: str) -> float:
"""Check if response follows expected format."""
score = 0.0
checks = 0
# Check for JSON format if expected
if "json" in prompt.lower():
checks += 1
try:
json.loads(response)
score += 1.0
except json.JSONDecodeError:
# Try to extract JSON from response
json_match = re.search(r'\{.*\}', response, re.DOTALL)
if json_match:
try:
json.loads(json_match.group())
score += 0.5
except json.JSONDecodeError:
pass
# Check for bullet points if expected
if "list" in prompt.lower() or "bullet" in prompt.lower():
checks += 1
if re.search(r'[-*]\s', response) or re.search(r'\d+\.', response):
score += 1.0
return score / checks if checks > 0 else 1.0
class LLMJudge:
"""Use an LLM to evaluate response quality."""
def __init__(self, client: OpenAI, model: str = "gpt-4o"):
self.client = client
self.model = model
def score(
self,
prompt: str,
response: str,
reference: str = None,
criteria: list[str] = None
) -> dict[str, float]:
"""Score response using LLM judge."""
criteria = criteria or [
"accuracy",
"relevance",
"completeness",
"clarity"
]
judge_prompt = f"""Evaluate the following response to a prompt.
Prompt: {prompt}
Response: {response}
{"Reference Answer: " + reference if reference else ""}
Rate the response on these criteria (0-10 scale):
{chr(10).join(f"- {c}" for c in criteria)}
Return your scores as JSON:
{{"scores": {{{", ".join(f'"{c}": <score>' for c in criteria)}}}, "reasoning": "<brief explanation>"}}
"""
result = self.client.chat.completions.create(
model=self.model,
messages=[{"role": "user", "content": judge_prompt}],
response_format={"type": "json_object"}
)
data = json.loads(result.choices[0].message.content)
# Normalize to 0-1 scale
scores = {k: v / 10.0 for k, v in data["scores"].items()}
return scores
def create_scorer(self, criterion: str) -> Callable:
"""Create a scorer function for a specific criterion."""
def scorer(prompt: str, response: str, reference: str) -> float:
scores = self.score(prompt, response, reference, [criterion])
return scores.get(criterion, 0.0)
return scorer
# Add scorers to harness
harness.add_scorer("exact_match", exact_match_scorer)
harness.add_scorer("contains_answer", contains_answer_scorer)
harness.add_scorer("similarity", similarity_scorer)
harness.add_scorer("format_compliance", format_compliance_scorer)
# Add LLM judge scorer
client = OpenAI()
judge = LLMJudge(client)
harness.add_scorer("llm_accuracy", judge.create_scorer("accuracy"))
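It can be worth sanity-checking the heuristic scorers on a hand-written example before running a full benchmark; the prompt, response, and reference below are fabricated purely for illustration:

# Fabricated example showing how the heuristic scorers differ
prompt = "Which planet is known as the Red Planet? Answer in one word."
response = "Mars."
reference = "Mars"

print("exact_match:", exact_match_scorer(prompt, response, reference))          # 0.0 -- trailing period
print("contains_answer:", contains_answer_scorer(prompt, response, reference))  # 1.0
print("similarity:", round(similarity_scorer(prompt, response, reference), 2))

Note how the exact-match scorer penalizes the trailing period while the containment and similarity scorers tolerate it.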
Task-Specific Metrics
Different tasks require specialized metrics:
import re
from typing import Callable
class TaskMetrics:
"""Collection of task-specific evaluation metrics."""
@staticmethod
def code_execution_score(
prompt: str,
response: str,
test_cases: list[dict]
) -> float:
"""Evaluate generated code by running test cases."""
# Extract code from response
code_match = re.search(r'```python\n(.*?)```', response, re.DOTALL)
if not code_match:
code_match = re.search(r'```\n(.*?)```', response, re.DOTALL)
if not code_match:
return 0.0
code = code_match.group(1)
passed = 0
for test in test_cases:
try:
                # Execute in a fresh namespace (note: exec is not a security sandbox)
namespace = {}
exec(code, namespace)
# Run test
result = eval(test["call"], namespace)
if result == test["expected"]:
passed += 1
except Exception:
continue
return passed / len(test_cases) if test_cases else 0.0
@staticmethod
def classification_score(
response: str,
expected_class: str,
valid_classes: list[str]
) -> float:
"""Score classification accuracy."""
response_lower = response.lower().strip()
# Check for exact match
if expected_class.lower() in response_lower:
return 1.0
# Check if any valid class is mentioned
for cls in valid_classes:
if cls.lower() in response_lower:
return 0.5 if cls.lower() != expected_class.lower() else 1.0
return 0.0
@staticmethod
def extraction_score(
response: str,
expected_entities: list[str]
) -> float:
"""Score entity extraction accuracy."""
if not expected_entities:
return 1.0
found = 0
for entity in expected_entities:
if entity.lower() in response.lower():
found += 1
return found / len(expected_entities)
@staticmethod
def summarization_score(
response: str,
source_text: str,
target_ratio: float = 0.2
) -> float:
"""Score summarization quality."""
if not source_text:
return 0.0
# Check compression ratio
actual_ratio = len(response) / len(source_text)
ratio_score = 1.0 - abs(actual_ratio - target_ratio) / target_ratio
ratio_score = max(0, min(1, ratio_score))
# Check for key content preservation (simple keyword overlap)
source_words = set(source_text.lower().split())
response_words = set(response.lower().split())
overlap = len(source_words & response_words)
coverage_score = min(1.0, overlap / (len(source_words) * 0.3))
return (ratio_score + coverage_score) / 2
# Usage in benchmark
def create_code_scorer(test_cases: list[dict]) -> Callable:
def scorer(prompt: str, response: str, reference: str) -> float:
return TaskMetrics.code_execution_score(prompt, response, test_cases)
return scorer
# Add to harness
harness.add_scorer("code_execution", create_code_scorer([
{"call": "fibonacci(10)", "expected": 55},
{"call": "fibonacci(0)", "expected": 0},
]))
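The classification and extraction metrics can be wrapped for the harness in the same way; the sentiment labels and entity names below are placeholders, not anything the harness requires:

def create_classification_scorer(
    expected_class: str,
    valid_classes: list[str]
) -> Callable:
    def scorer(prompt: str, response: str, reference: str) -> float:
        return TaskMetrics.classification_score(response, expected_class, valid_classes)
    return scorer

def create_extraction_scorer(expected_entities: list[str]) -> Callable:
    def scorer(prompt: str, response: str, reference: str) -> float:
        return TaskMetrics.extraction_score(response, expected_entities)
    return scorer

# Placeholder labels and entities -- substitute your own task data
harness.add_scorer(
    "sentiment_classification",
    create_classification_scorer("positive", ["positive", "negative", "neutral"])
)
harness.add_scorer(
    "entity_extraction",
    create_extraction_scorer(["Paris", "Eiffel Tower"])
)

As with the code scorer, each wrapper is bound to one test case's expected output, so in practice you would construct these per test case.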
Model Selection Framework
Decision Matrix
Systematically compare models across dimensions:
from dataclasses import dataclass
from enum import Enum
class Priority(Enum):
LOW = 1
MEDIUM = 2
HIGH = 3
CRITICAL = 4
@dataclass
class SelectionCriteria:
"""Criteria for model selection with weights."""
latency: Priority = Priority.MEDIUM
cost: Priority = Priority.MEDIUM
quality: Priority = Priority.HIGH
context_length: Priority = Priority.MEDIUM
reasoning: Priority = Priority.MEDIUM
instruction_following: Priority = Priority.HIGH
class ModelSelector:
"""Select the best model based on requirements and benchmark results."""
MODEL_PROFILES = {
"openai/gpt-4o": {
"context_length": 128000,
"reasoning": 0.95,
"instruction_following": 0.95,
"speed_tier": "medium",
"cost_tier": "high",
},
"openai/gpt-4o-mini": {
"context_length": 128000,
"reasoning": 0.85,
"instruction_following": 0.90,
"speed_tier": "fast",
"cost_tier": "low",
},
"anthropic/claude-sonnet-4-20250514": {
"context_length": 200000,
"reasoning": 0.95,
"instruction_following": 0.95,
"speed_tier": "medium",
"cost_tier": "high",
},
"anthropic/claude-3-haiku-20240307": {
"context_length": 200000,
"reasoning": 0.80,
"instruction_following": 0.85,
"speed_tier": "fast",
"cost_tier": "low",
},
}
def __init__(self, criteria: SelectionCriteria):
self.criteria = criteria
def score_model(
self,
model_name: str,
benchmark_result: BenchmarkResult
) -> float:
"""Calculate weighted score for a model."""
profile = self.MODEL_PROFILES.get(model_name, {})
scores = {}
weights = {}
# Latency score (lower is better)
if benchmark_result.avg_latency_ms > 0:
latency_score = 1000 / benchmark_result.avg_latency_ms
scores["latency"] = min(1.0, latency_score)
else:
scores["latency"] = 0.5
weights["latency"] = self.criteria.latency.value
# Cost score (lower is better)
if benchmark_result.total_cost > 0:
cost_score = 0.01 / (benchmark_result.total_cost / benchmark_result.total_evaluations)
scores["cost"] = min(1.0, cost_score)
else:
scores["cost"] = 1.0
weights["cost"] = self.criteria.cost.value
# Quality score from benchmark
if "llm_accuracy" in benchmark_result.avg_scores:
scores["quality"] = benchmark_result.avg_scores["llm_accuracy"]
else:
scores["quality"] = benchmark_result.avg_scores.get("similarity", 0.5)
weights["quality"] = self.criteria.quality.value
# Context length score
max_context = max(p.get("context_length", 0) for p in self.MODEL_PROFILES.values())
scores["context_length"] = profile.get("context_length", 0) / max_context
weights["context_length"] = self.criteria.context_length.value
# Reasoning score from profile
scores["reasoning"] = profile.get("reasoning", 0.5)
weights["reasoning"] = self.criteria.reasoning.value
# Instruction following score
if "format_compliance" in benchmark_result.avg_scores:
scores["instruction_following"] = benchmark_result.avg_scores["format_compliance"]
else:
scores["instruction_following"] = profile.get("instruction_following", 0.5)
weights["instruction_following"] = self.criteria.instruction_following.value
# Calculate weighted average
total_weight = sum(weights.values())
weighted_score = sum(
scores[k] * weights[k] for k in scores
) / total_weight
return weighted_score
def select_best(
self,
benchmark_results: list[BenchmarkResult]
) -> tuple[str, dict]:
"""Select the best model from benchmark results."""
model_scores = {}
for result in benchmark_results:
score = self.score_model(result.model, result)
model_scores[result.model] = {
"weighted_score": score,
"latency_ms": result.avg_latency_ms,
"cost_per_eval": result.total_cost / result.total_evaluations,
"quality_scores": result.avg_scores,
}
best_model = max(model_scores.items(), key=lambda x: x[1]["weighted_score"])
return best_model[0], model_scores
def generate_report(
self,
benchmark_results: list[BenchmarkResult]
) -> str:
"""Generate a comparison report."""
best_model, all_scores = self.select_best(benchmark_results)
lines = [
"Model Comparison Report",
"=" * 50,
"",
]
for model, scores in sorted(
all_scores.items(),
key=lambda x: x[1]["weighted_score"],
reverse=True
):
marker = " [RECOMMENDED]" if model == best_model else ""
lines.append(f"{model}{marker}")
lines.append(f" Weighted Score: {scores['weighted_score']:.3f}")
lines.append(f" Avg Latency: {scores['latency_ms']:.1f}ms")
lines.append(f" Cost per Eval: ${scores['cost_per_eval']:.6f}")
lines.append(f" Quality Scores: {scores['quality_scores']}")
lines.append("")
return "\n".join(lines)
# Usage
criteria = SelectionCriteria(
latency=Priority.HIGH, # Fast responses needed
cost=Priority.MEDIUM, # Budget conscious
quality=Priority.CRITICAL, # Quality is paramount
)
selector = ModelSelector(criteria)
# Run benchmark
test_cases = [
{"prompt": "Explain quantum computing in one paragraph.", "reference": None},
{"prompt": "Write a Python function to reverse a string.", "reference": None},
{"prompt": "Summarize the key points of machine learning.", "reference": None},
]
results = harness.run_benchmark(test_cases)
report = selector.generate_report(results)
print(report)
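Because the weighting is driven entirely by SelectionCriteria, re-running the selection with different priorities is a cheap way to check how sensitive the recommendation is. The re-weighted criteria below are just one illustrative configuration:

# Hypothetical re-weighting: prioritize cost and see whether the recommendation changes
budget_criteria = SelectionCriteria(
    latency=Priority.MEDIUM,
    cost=Priority.CRITICAL,
    quality=Priority.MEDIUM,
)
budget_selector = ModelSelector(budget_criteria)
budget_best, budget_scores = budget_selector.select_best(results)
print(f"Budget-weighted recommendation: {budget_best}")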
A/B Testing Framework
Test models in production:
import random
import hashlib
from datetime import datetime
from dataclasses import dataclass
from collections import defaultdict
@dataclass
class ABTestConfig:
"""Configuration for A/B test."""
name: str
    models: dict[str, float]  # model -> traffic share; values must sum to 1.0
start_date: datetime
end_date: datetime = None
sticky_sessions: bool = True # Same user gets same model
@dataclass
class ABTestResult:
"""Aggregated A/B test results."""
model: str
total_requests: int
avg_latency_ms: float
error_rate: float
user_satisfaction: float # From feedback
conversion_rate: float # If applicable
class ABTestManager:
"""Manage A/B tests for model comparison."""
def __init__(self, providers: dict[str, LLMProvider]):
self.providers = providers
self.active_tests: dict[str, ABTestConfig] = {}
self.results: dict[str, list] = defaultdict(list)
def create_test(self, config: ABTestConfig) -> str:
"""Create a new A/B test."""
        # Validate that traffic shares sum to 1.0
        total = sum(config.models.values())
        if abs(total - 1.0) > 0.001:
            raise ValueError(f"Traffic shares must sum to 1.0, got {total}")
# Validate all models exist
for model in config.models:
if model not in self.providers:
raise ValueError(f"Unknown model: {model}")
self.active_tests[config.name] = config
return config.name
def get_model_for_request(
self,
test_name: str,
user_id: str = None
) -> str:
"""Select model for a request based on test configuration."""
config = self.active_tests.get(test_name)
if not config:
raise ValueError(f"Test not found: {test_name}")
# Check if test is active
now = datetime.now()
if now < config.start_date:
raise ValueError(f"Test {test_name} has not started")
if config.end_date and now > config.end_date:
raise ValueError(f"Test {test_name} has ended")
# Sticky sessions: hash user_id to get consistent assignment
if config.sticky_sessions and user_id:
hash_value = int(hashlib.md5(
f"{test_name}:{user_id}".encode()
).hexdigest(), 16)
random_value = (hash_value % 10000) / 10000
else:
random_value = random.random()
# Select model based on traffic split
cumulative = 0.0
for model, percentage in config.models.items():
cumulative += percentage
if random_value < cumulative:
return model
# Fallback to last model
return list(config.models.keys())[-1]
def record_result(
self,
test_name: str,
model: str,
latency_ms: float,
success: bool,
user_feedback: float = None,
converted: bool = None
):
"""Record a single test result."""
self.results[test_name].append({
"model": model,
"latency_ms": latency_ms,
"success": success,
"user_feedback": user_feedback,
"converted": converted,
"timestamp": datetime.now().isoformat(),
})
def get_test_results(self, test_name: str) -> dict[str, ABTestResult]:
"""Get aggregated results for a test."""
if test_name not in self.results:
return {}
# Group by model
model_data = defaultdict(list)
for result in self.results[test_name]:
model_data[result["model"]].append(result)
test_results = {}
for model, data in model_data.items():
total = len(data)
avg_latency = sum(d["latency_ms"] for d in data) / total
error_rate = sum(1 for d in data if not d["success"]) / total
# Calculate satisfaction from feedback
feedback_data = [d["user_feedback"] for d in data if d["user_feedback"] is not None]
satisfaction = sum(feedback_data) / len(feedback_data) if feedback_data else 0.0
# Calculate conversion rate
conversion_data = [d["converted"] for d in data if d["converted"] is not None]
conversion = sum(1 for c in conversion_data if c) / len(conversion_data) if conversion_data else 0.0
test_results[model] = ABTestResult(
model=model,
total_requests=total,
avg_latency_ms=avg_latency,
error_rate=error_rate,
user_satisfaction=satisfaction,
conversion_rate=conversion,
)
return test_results
def determine_winner(
self,
test_name: str,
primary_metric: str = "user_satisfaction",
min_samples: int = 100
) -> tuple[str, float]:
"""Determine the winning model based on primary metric."""
results = self.get_test_results(test_name)
# Check sample size
for model, result in results.items():
if result.total_requests < min_samples:
raise ValueError(
f"Insufficient samples for {model}: "
f"{result.total_requests} < {min_samples}"
)
# Find best model
best_model = None
best_score = float("-inf")
for model, result in results.items():
score = getattr(result, primary_metric, 0)
if score > best_score:
best_score = score
best_model = model
return best_model, best_score
# Usage
providers = {
"gpt-4o": OpenAIProvider("gpt-4o"),
"gpt-4o-mini": OpenAIProvider("gpt-4o-mini"),
}
ab_manager = ABTestManager(providers)
# Create test with 50/50 split
config = ABTestConfig(
name="model_comparison_v1",
models={"gpt-4o": 0.5, "gpt-4o-mini": 0.5},
start_date=datetime.now(),
sticky_sessions=True
)
ab_manager.create_test(config)
# Simulate requests
for user_id in ["user_1", "user_2", "user_3"]:
model = ab_manager.get_model_for_request("model_comparison_v1", user_id)
print(f"User {user_id} -> {model}")
Model Selection Best Practices
- Test on representative data from your actual use case
- Consider total cost of ownership, not just per-token pricing
- Measure latency at the 95th percentile, not just the average
- Run A/B tests long enough to reach statistical significance (a sketch of both checks follows this list)
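The per-request data kept in BenchmarkResult.individual_results makes a percentile check straightforward, and a standard two-proportion z-test is one simple way to gauge significance between two A/B variants. A minimal sketch, assuming the results returned by run_benchmark above; the conversion counts in the final call are made up for illustration:

import math

def p95_latency(result: BenchmarkResult) -> float:
    """Nearest-rank 95th-percentile latency over the individual evaluations."""
    latencies = sorted(r.latency_ms for r in result.individual_results)
    index = min(len(latencies) - 1, int(0.95 * len(latencies)))
    return latencies[index]

def two_proportion_z(successes_a: int, n_a: int, successes_b: int, n_b: int) -> float:
    """z statistic for comparing two success/conversion rates."""
    p_a, p_b = successes_a / n_a, successes_b / n_b
    pooled = (successes_a + successes_b) / (n_a + n_b)
    se = math.sqrt(pooled * (1 - pooled) * (1 / n_a + 1 / n_b))
    return (p_a - p_b) / se if se > 0 else 0.0

for result in results:
    print(f"{result.model}: p95 latency {p95_latency(result):.0f} ms "
          f"(avg {result.avg_latency_ms:.0f} ms)")

# Made-up conversion counts; |z| > 1.96 is roughly p < 0.05 for a two-sided test
z = two_proportion_z(successes_a=120, n_a=1000, successes_b=95, n_b=1000)
print(f"z = {z:.2f} -> {'significant' if abs(z) > 1.96 else 'not significant'} at the 5% level")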
Practice Exercise
Build a comprehensive model evaluation pipeline:
- Create benchmark suites for different task types
- Implement at least 5 scoring metrics
- Build an automated selection recommendation system
- Set up A/B testing infrastructure
- Generate comparison reports with visualizations
Along the way, account for:
- Statistical significance in comparisons
- Cost-quality tradeoff analysis
- Latency requirements for your use case
- Reproducible evaluation methodology