Quantization
GGUF and llama.cpp
from llama_cpp import Llama
import time
def load_quantized_model(
model_path: str,
n_ctx: int = 2048,
n_gpu_layers: int = -1
) -> Llama:
"""Load a quantized GGUF model."""
return Llama(
model_path=model_path,
n_ctx=n_ctx,
n_gpu_layers=n_gpu_layers, # -1 = all layers on GPU
verbose=False
)
def benchmark_inference(
model: Llama,
prompt: str,
max_tokens: int = 100,
iterations: int = 5
) -> dict:
"""Benchmark model inference speed."""
times = []
tokens_generated = []
for _ in range(iterations):
start = time.time()
output = model(
prompt,
max_tokens=max_tokens,
temperature=0.7,
echo=False
)
elapsed = time.time() - start
times.append(elapsed)
# Prefer the exact count reported by llama-cpp-python; fall back to a rough word count
tokens = output.get("usage", {}).get("completion_tokens", len(output["choices"][0]["text"].split()))
tokens_generated.append(tokens)
avg_time = sum(times) / len(times)
avg_tokens = sum(tokens_generated) / len(tokens_generated)
return {
"avg_latency_ms": avg_time * 1000,
"avg_tokens": avg_tokens,
"tokens_per_second": avg_tokens / avg_time,
"iterations": iterations
}
# Usage
model = load_quantized_model(
"models/mistral-7b-instruct-v0.1.Q4_K_M.gguf",
n_ctx=4096,
n_gpu_layers=35
)
# Simple inference
response = model(
"Explain quantum computing in simple terms:",
max_tokens=150,
temperature=0.7
)
print(response["choices"][0]["text"])
# Benchmark
results = benchmark_inference(
model,
"Write a haiku about programming:",
max_tokens=50
)
print(f"\nPerformance:")
print(f" Latency: {results['avg_latency_ms']:.0f}ms")
print(f" Throughput: {results['tokens_per_second']:.1f} tokens/sec")
Quantization Comparison
from dataclasses import dataclass
from typing import Optional
import os
@dataclass
class QuantConfig:
"""Quantization configuration."""
name: str
bits: int
memory_factor: float # Relative to FP16
quality_factor: float # Relative to FP16
# Common GGUF quantization levels
QUANT_CONFIGS = {
"Q2_K": QuantConfig("Q2_K", 2, 0.125, 0.85),
"Q3_K_S": QuantConfig("Q3_K_S", 3, 0.19, 0.88),
"Q3_K_M": QuantConfig("Q3_K_M", 3, 0.21, 0.90),
"Q4_0": QuantConfig("Q4_0", 4, 0.25, 0.92),
"Q4_K_M": QuantConfig("Q4_K_M", 4, 0.27, 0.94),
"Q5_K_M": QuantConfig("Q5_K_M", 5, 0.34, 0.96),
"Q6_K": QuantConfig("Q6_K", 6, 0.39, 0.98),
"Q8_0": QuantConfig("Q8_0", 8, 0.50, 0.99),
"F16": QuantConfig("F16", 16, 1.0, 1.0),
}
def estimate_memory(
model_params_b: float,
quant_type: str
) -> dict:
"""Estimate memory requirements for quantized model."""
config = QUANT_CONFIGS.get(quant_type)
if not config:
raise ValueError(f"Unknown quantization type: {quant_type}")
# FP16 baseline: ~2 bytes per parameter, so N billion parameters is roughly 2N GB
fp16_size_gb = model_params_b * 2
quantized_size_gb = fp16_size_gb * config.memory_factor
# Rough allowance for the KV cache; grows with context length and batch size
context_memory_gb = 0.5
return {
"model_size_gb": quantized_size_gb,
"total_memory_gb": quantized_size_gb + context_memory_gb,
"quality_factor": config.quality_factor,
"bits": config.bits
}
def select_quantization(
model_params_b: float,
available_memory_gb: float,
min_quality: float = 0.90
) -> list[dict]:
"""Select viable quantization options for given constraints."""
viable = []
for name, config in QUANT_CONFIGS.items():
estimate = estimate_memory(model_params_b, name)
if (estimate["total_memory_gb"] <= available_memory_gb and
estimate["quality_factor"] >= min_quality):
viable.append({
"quant_type": name,
"memory_gb": estimate["total_memory_gb"],
"quality": estimate["quality_factor"]
})
return sorted(viable, key=lambda x: x["quality"], reverse=True)
# Usage
# For a 7B parameter model
options = select_quantization(
model_params_b=7,
available_memory_gb=8, # 8GB GPU
min_quality=0.90
)
print("Viable quantization options:")
for opt in options:
print(f" {opt['quant_type']}: {opt['memory_gb']:.1f}GB, quality: {opt['quality']:.0%}")
vLLM Serving
Basic vLLM Setup
from vllm import LLM, SamplingParams
def create_vllm_engine(
model: str,
tensor_parallel_size: int = 1,
gpu_memory_utilization: float = 0.9,
max_model_len: int = 4096
) -> LLM:
"""Create vLLM engine for high-throughput inference."""
return LLM(
model=model,
tensor_parallel_size=tensor_parallel_size,
gpu_memory_utilization=gpu_memory_utilization,
max_model_len=max_model_len
)
def batch_generate(
engine: LLM,
prompts: list[str],
max_tokens: int = 256,
temperature: float = 0.7,
top_p: float = 0.9
) -> list[str]:
"""Generate completions for multiple prompts."""
sampling_params = SamplingParams(
max_tokens=max_tokens,
temperature=temperature,
top_p=top_p
)
outputs = engine.generate(prompts, sampling_params)
return [output.outputs[0].text for output in outputs]
# Usage
engine = create_vllm_engine(
model="mistralai/Mistral-7B-Instruct-v0.1",
tensor_parallel_size=1,
gpu_memory_utilization=0.85
)
prompts = [
"Explain machine learning:",
"What is Python?",
"How does the internet work?",
"Describe cloud computing:",
]
responses = batch_generate(engine, prompts, max_tokens=100)
for prompt, response in zip(prompts, responses):
print(f"Q: {prompt}")
print(f"A: {response[:100]}...")
print()
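SamplingParams also controls stop sequences and how many candidates to sample per prompt; a short sketch (the prompt and parameter values are illustrative):

# Two candidate completions per prompt, stopping at the first blank line
params = SamplingParams(
    n=2,
    max_tokens=64,
    temperature=0.9,
    stop=["\n\n"]
)
outputs = engine.generate(["Suggest a name for a Python linter:"], params)
for candidate in outputs[0].outputs:
    print(candidate.text.strip())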
vLLM API Server
# Launch vLLM server (run in terminal):
# python -m vllm.entrypoints.openai.api_server \
# --model mistralai/Mistral-7B-Instruct-v0.1 \
# --host 0.0.0.0 \
# --port 8000
from openai import OpenAI
def create_vllm_client(base_url: str = "http://localhost:8000/v1"):
"""Create client for vLLM server (OpenAI-compatible)."""
return OpenAI(
base_url=base_url,
api_key="not-needed" # vLLM doesn't require API key
)
def query_vllm(
client: OpenAI,
prompt: str,
model: str = "mistralai/Mistral-7B-Instruct-v0.1",
max_tokens: int = 256
) -> str:
"""Query vLLM server."""
response = client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": prompt}],
max_tokens=max_tokens,
temperature=0.7
)
return response.choices[0].message.content
async def batch_query_vllm(
client: OpenAI,
prompts: list[str],
model: str = "mistralai/Mistral-7B-Instruct-v0.1"
) -> list[str]:
"""Batch query vLLM server with async requests."""
import asyncio
from openai import AsyncOpenAI
async_client = AsyncOpenAI(
base_url=client.base_url,
api_key="not-needed"
)
async def single_query(prompt: str) -> str:
response = await async_client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": prompt}],
max_tokens=256
)
return response.choices[0].message.content
tasks = [single_query(p) for p in prompts]
return await asyncio.gather(*tasks)
# Usage
client = create_vllm_client()
# Single query
response = query_vllm(client, "What is machine learning?")
print(response)
# Batch query
import asyncio
prompts = ["Explain AI:", "What is Python?", "Define cloud computing:"]
responses = asyncio.run(batch_query_vllm(client, prompts))
for r in responses:
print(r[:100], "...")
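The OpenAI-compatible endpoint also supports streaming, which reduces time to first token for interactive use; a minimal sketch:

def stream_vllm(
    client: OpenAI,
    prompt: str,
    model: str = "mistralai/Mistral-7B-Instruct-v0.1"
):
    """Stream tokens from the vLLM server as they are generated."""
    stream = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        max_tokens=256,
        stream=True
    )
    for chunk in stream:
        delta = chunk.choices[0].delta.content
        if delta:
            print(delta, end="", flush=True)
    print()

stream_vllm(client, "Explain speculative decoding in one paragraph.")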
Speculative Decoding
from dataclasses import dataclass
from typing import Optional
import time
@dataclass
class SpeculativeConfig:
"""Configuration for speculative decoding."""
draft_model: str
target_model: str
num_speculative_tokens: int = 4
acceptance_threshold: float = 0.9
class SpeculativeDecoder:
"""Speculative decoding for faster inference."""
def __init__(
self,
draft_model, # Small, fast model
target_model, # Large, accurate model
num_speculative_tokens: int = 4
):
self.draft = draft_model
self.target = target_model
self.k = num_speculative_tokens
def generate(
self,
prompt: str,
max_tokens: int = 100
) -> dict:
"""Generate with speculative decoding."""
generated = []
total_draft_tokens = 0
total_accepted = 0
current_prompt = prompt
while len(generated) < max_tokens:
# Draft: generate k tokens with small model
draft_tokens = self._draft_generate(current_prompt, self.k)
total_draft_tokens += len(draft_tokens)
# Target: verify draft tokens
accepted, correction = self._target_verify(
current_prompt,
draft_tokens
)
total_accepted += accepted
if correction:
generated.extend(draft_tokens[:accepted])
generated.append(correction)
current_prompt = prompt + "".join(generated)
else:
generated.extend(draft_tokens)
current_prompt = prompt + "".join(generated)
# Check for end
if len(generated) >= max_tokens:
break
return {
"text": "".join(generated[:max_tokens]),
"acceptance_rate": total_accepted / total_draft_tokens if total_draft_tokens else 0,
"tokens_generated": len(generated[:max_tokens])
}
def _draft_generate(self, prompt: str, k: int) -> list[str]:
"""Generate k tokens with draft model."""
# Simplified - actual implementation uses proper tokenization
output = self.draft(prompt, max_tokens=k)
text = output["choices"][0]["text"]  # llama.cpp-style completion dict, matching the usage below
return list(text)  # character-level stand-in for real tokens
def _target_verify(
self,
prompt: str,
draft_tokens: list[str]
) -> tuple[int, Optional[str]]:
"""Verify draft tokens with target model."""
# In real implementation, compute log probs for each position
# Accept tokens where P_target / P_draft > threshold
full_text = prompt + "".join(draft_tokens)
target_output = self.target(full_text, max_tokens=1)
# Simplified acceptance logic
accepted = len(draft_tokens) - 1 # Accept most
target_text = target_output["choices"][0]["text"]  # llama.cpp-style completion dict
correction = target_text[:1] if target_text else None
return accepted, correction
# Conceptual usage (requires actual model instances)
"""
from llama_cpp import Llama
draft = Llama("models/tiny-llama-1B.gguf", n_ctx=2048)
target = Llama("models/llama-7B.gguf", n_ctx=2048)
decoder = SpeculativeDecoder(draft, target, num_speculative_tokens=4)
result = decoder.generate("Write a story:", max_tokens=200)
print(f"Acceptance rate: {result['acceptance_rate']:.0%}")
print(result['text'])
"""
KV Cache Optimization
from dataclasses import dataclass
import hashlib
import time
from typing import Optional
@dataclass
class CacheEntry:
"""A cached KV state."""
key: str
prefix: str
kv_state: bytes # Serialized KV cache
timestamp: float
size_bytes: int
class KVCacheManager:
"""Manage KV cache for prefix reuse."""
def __init__(self, max_cache_size_gb: float = 4.0):
self.cache: dict[str, CacheEntry] = {}
self.max_size = max_cache_size_gb * 1024 * 1024 * 1024
self.current_size = 0
def _compute_key(self, prefix: str) -> str:
"""Compute cache key for prefix."""
return hashlib.md5(prefix.encode()).hexdigest()
def get(self, prefix: str) -> Optional[CacheEntry]:
"""Get cached KV state for prefix."""
key = self._compute_key(prefix)
return self.cache.get(key)
def put(
self,
prefix: str,
kv_state: bytes,
timestamp: float
):
"""Cache KV state for prefix."""
key = self._compute_key(prefix)
size = len(kv_state)
# Evict if needed
while self.current_size + size > self.max_size and self.cache:
self._evict_lru()
entry = CacheEntry(
key=key,
prefix=prefix,
kv_state=kv_state,
timestamp=timestamp,
size_bytes=size
)
self.cache[key] = entry
self.current_size += size
def _evict_lru(self):
"""Evict least recently used entry."""
if not self.cache:
return
oldest_key = min(
self.cache.keys(),
key=lambda k: self.cache[k].timestamp
)
entry = self.cache.pop(oldest_key)
self.current_size -= entry.size_bytes
def find_longest_prefix_match(self, text: str) -> Optional[CacheEntry]:
"""Find cached entry with longest matching prefix."""
best_match = None
best_length = 0
for entry in self.cache.values():
if text.startswith(entry.prefix) and len(entry.prefix) > best_length:
best_match = entry
best_length = len(entry.prefix)
if best_match:
    best_match.timestamp = time.time()  # refresh timestamp so LRU eviction keeps hot prefixes
return best_match
class OptimizedInference:
"""Inference with KV cache optimization."""
def __init__(self, model, cache_manager: KVCacheManager):
self.model = model
self.cache = cache_manager
def generate(
self,
prompt: str,
max_tokens: int = 100,
use_cache: bool = True
) -> dict:
"""Generate with KV cache reuse."""
import time
cache_hit = False
cached_prefix_len = 0
if use_cache:
# Check for prefix match
match = self.cache.find_longest_prefix_match(prompt)
if match:
cache_hit = True
cached_prefix_len = len(match.prefix)
# Load KV state from cache
# self.model.load_kv_state(match.kv_state)
start = time.time()
# Generate (would start from cached position)
output = self.model(prompt, max_tokens=max_tokens)
elapsed = time.time() - start
# Cache the new KV state
if use_cache and not cache_hit:
# kv_state = self.model.get_kv_state()
kv_state = b"" # Placeholder
self.cache.put(prompt, kv_state, time.time())
return {
"text": output.get("text", ""),
"cache_hit": cache_hit,
"cached_prefix_len": cached_prefix_len,
"latency_ms": elapsed * 1000
}
# Usage pattern
"""
cache_mgr = KVCacheManager(max_cache_size_gb=2.0)
inference = OptimizedInference(model, cache_mgr)
# First call - caches the system prompt
result = inference.generate("You are a helpful assistant. User: Hello")
print(f"Cache hit: {result['cache_hit']}") # False
# Second call - reuses cached system prompt
result = inference.generate("You are a helpful assistant. User: How are you?")
print(f"Cache hit: {result['cache_hit']}") # True
print(f"Cached prefix: {result['cached_prefix_len']} chars")
"""
Batch Processing
import asyncio
from dataclasses import dataclass
from typing import List, Optional
import time
@dataclass
class BatchRequest:
"""A request in a batch."""
id: str
prompt: str
max_tokens: int = 100
priority: int = 0
@dataclass
class BatchResponse:
"""Response for a batch request."""
id: str
text: str
latency_ms: float
class DynamicBatcher:
"""Dynamic batching for inference requests."""
def __init__(
self,
model,
max_batch_size: int = 8,
max_wait_ms: float = 50.0
):
self.model = model
self.max_batch_size = max_batch_size
self.max_wait_ms = max_wait_ms
self.pending: List[BatchRequest] = []
self.results: dict[str, BatchResponse] = {}
self._lock = asyncio.Lock()
async def add_request(self, request: BatchRequest) -> BatchResponse:
"""Add request and wait for result."""
async with self._lock:
self.pending.append(request)
# Wait for batch processing
while request.id not in self.results:
await asyncio.sleep(0.001)
return self.results.pop(request.id)
async def process_batches(self):
"""Background task to process batches."""
while True:
batch = []
wait_start = time.time()
# Collect requests for batch
while len(batch) < self.max_batch_size:
    got_request = False
    async with self._lock:
        if self.pending:
            # Sort by priority (highest first)
            self.pending.sort(key=lambda r: r.priority, reverse=True)
            batch.append(self.pending.pop(0))
            got_request = True
    if not got_request:
        # Yield to the event loop so add_request() can enqueue work (avoids a busy spin)
        await asyncio.sleep(0.001)
    # Flush the batch once the wait window expires
    elapsed = (time.time() - wait_start) * 1000
    if elapsed >= self.max_wait_ms and batch:
        break
if batch:
await self._process_batch(batch)
async def _process_batch(self, batch: List[BatchRequest]):
"""Process a batch of requests."""
prompts = [r.prompt for r in batch]
max_tokens = max(r.max_tokens for r in batch)
start = time.time()
# Batch inference (model-specific implementation)
outputs = self._batch_generate(prompts, max_tokens)
elapsed_ms = (time.time() - start) * 1000
per_request_ms = elapsed_ms / len(batch)
# Store results
for i, request in enumerate(batch):
self.results[request.id] = BatchResponse(
id=request.id,
text=outputs[i] if i < len(outputs) else "",
latency_ms=per_request_ms
)
def _batch_generate(
self,
prompts: List[str],
max_tokens: int
) -> List[str]:
"""Generate for batch of prompts."""
# Actual implementation depends on model
return [f"Response to: {p[:20]}" for p in prompts]
# Usage
"""
async def main():
batcher = DynamicBatcher(model, max_batch_size=4, max_wait_ms=10)
# Start batch processor
asyncio.create_task(batcher.process_batches())
# Submit requests
requests = [
BatchRequest(id=f"req_{i}", prompt=f"Question {i}:", priority=i % 3)
for i in range(10)
]
# Process concurrently
tasks = [batcher.add_request(r) for r in requests]
responses = await asyncio.gather(*tasks)
for resp in responses:
print(f"{resp.id}: {resp.latency_ms:.1f}ms")
asyncio.run(main())
"""
Memory Optimization
import gc
import torch
from typing import Optional
class MemoryOptimizer:
"""Optimize GPU memory for model inference."""
def __init__(self, device: str = "cuda"):
self.device = device
def get_memory_stats(self) -> dict:
"""Get current GPU memory statistics."""
if not torch.cuda.is_available():
return {"error": "CUDA not available"}
allocated = torch.cuda.memory_allocated() / 1024**3
reserved = torch.cuda.memory_reserved() / 1024**3
max_allocated = torch.cuda.max_memory_allocated() / 1024**3
return {
"allocated_gb": allocated,
"reserved_gb": reserved,
"max_allocated_gb": max_allocated,
"free_gb": reserved - allocated
}
def clear_cache(self):
"""Clear GPU cache and run garbage collection."""
gc.collect()
if torch.cuda.is_available():
torch.cuda.empty_cache()
torch.cuda.synchronize()
def optimize_for_inference(self, model):
"""Apply inference optimizations to model."""
model.eval()
# Disable gradient computation
for param in model.parameters():
param.requires_grad = False
# Use inference mode
torch.set_grad_enabled(False)
return model
def enable_gradient_checkpointing(self, model) -> None:
"""Enable gradient checkpointing to save memory."""
if hasattr(model, "gradient_checkpointing_enable"):
model.gradient_checkpointing_enable()
def profile_inference(
self,
model,
sample_input,
iterations: int = 10
) -> dict:
"""Profile memory usage during inference."""
if torch.cuda.is_available():
torch.cuda.reset_peak_memory_stats()
self.clear_cache()
initial_memory = self.get_memory_stats()
times = []
for _ in range(iterations):
start = torch.cuda.Event(enable_timing=True)
end = torch.cuda.Event(enable_timing=True)
start.record()
with torch.no_grad():
_ = model(sample_input)
end.record()
torch.cuda.synchronize()
times.append(start.elapsed_time(end))
final_memory = self.get_memory_stats()
return {
"avg_latency_ms": sum(times) / len(times),
"min_latency_ms": min(times),
"max_latency_ms": max(times),
"memory_used_gb": final_memory["max_allocated_gb"],
"memory_increase_gb": (
final_memory["max_allocated_gb"] -
initial_memory["allocated_gb"]
)
}
# Usage
optimizer = MemoryOptimizer()
# Check initial memory
print("Memory stats:", optimizer.get_memory_stats())
# Clear cache
optimizer.clear_cache()
# Profile model (with actual model)
"""
from transformers import AutoModelForCausalLM
model = AutoModelForCausalLM.from_pretrained(
"mistralai/Mistral-7B-v0.1",
torch_dtype=torch.float16,
device_map="auto"
)
model = optimizer.optimize_for_inference(model)
profile = optimizer.profile_inference(
model,
torch.randint(0, 1000, (1, 512)).cuda(),
iterations=5
)
print(profile)
"""
Optimization Trade-offs
- Quantization reduces memory use and improves speed at the cost of some output quality
- Batching increases throughput but adds per-request latency
- KV caching speeds up requests that share a prefix (for example, a common system prompt)
- Speculative decoding works best when the draft model closely matches the target model's distribution
- Always measure actual performance on your workload; a batch-size sweep is sketched below
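Because the right trade-off depends on the workload, sweep one knob at a time and record both latency and throughput. A minimal sketch of a batch-size sweep, assuming a generate_batch(prompts) callable for whichever backend you use (the name is a placeholder):

import time

def sweep_batch_sizes(generate_batch, prompt: str, batch_sizes=(1, 2, 4, 8)) -> list[dict]:
    """Measure whole-batch latency and request throughput as batch size grows."""
    results = []
    for bs in batch_sizes:
        prompts = [prompt] * bs
        start = time.time()
        generate_batch(prompts)  # backend-specific; placeholder callable
        elapsed = time.time() - start
        results.append({
            "batch_size": bs,
            "batch_latency_s": elapsed,
            "requests_per_second": bs / elapsed
        })
    return results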
Practice Exercise
Build an optimized inference service that:
- Supports multiple quantization levels
- Implements dynamic batching
- Uses KV cache for prefix reuse
- Monitors and optimizes memory usage
- Benchmarks throughput and latency
Focus on:
- Balancing speed vs. quality trade-offs
- Efficient memory utilization
- Production-ready batching
- Meaningful performance metrics
A starter scaffold is sketched below.
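A minimal scaffold to start from, wiring together pieces defined earlier in this section (method bodies are left as TODOs and the class name is a placeholder):

class OptimizedInferenceService:
    """Skeleton combining quantization selection, batching, caching, and memory monitoring."""
    def __init__(self, model_params_b: float, available_memory_gb: float):
        # Pick quantization levels that fit the hardware budget
        self.quant_options = select_quantization(model_params_b, available_memory_gb)
        self.cache = KVCacheManager(max_cache_size_gb=2.0)
        self.memory = MemoryOptimizer()
        self.batcher = None  # attach a DynamicBatcher once the model is loaded

    async def handle_request(self, request: BatchRequest) -> BatchResponse:
        # TODO: check the KV cache, enqueue into the batcher, record latency metrics
        raise NotImplementedError

    def report_metrics(self) -> dict:
        # TODO: merge benchmark_inference results with memory statistics
        return self.memory.get_memory_stats()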