Model optimization is crucial for production deployments. This chapter covers techniques for reducing latency, memory usage, and cost while maintaining quality.

Quantization

GGUF and llama.cpp

from llama_cpp import Llama
import time


def load_quantized_model(
    model_path: str,
    n_ctx: int = 2048,
    n_gpu_layers: int = -1
) -> Llama:
    """Load a quantized GGUF model."""
    return Llama(
        model_path=model_path,
        n_ctx=n_ctx,
        n_gpu_layers=n_gpu_layers,  # -1 = all layers on GPU
        verbose=False
    )


def benchmark_inference(
    model: Llama,
    prompt: str,
    max_tokens: int = 100,
    iterations: int = 5
) -> dict:
    """Benchmark model inference speed."""
    times = []
    tokens_generated = []
    
    for _ in range(iterations):
        start = time.time()
        
        output = model(
            prompt,
            max_tokens=max_tokens,
            temperature=0.7,
            echo=False
        )
        
        elapsed = time.time() - start
        times.append(elapsed)
        
        # Prefer llama.cpp's reported completion token count; fall back to a
        # rough whitespace split if usage is missing.
        usage = output.get("usage") or {}
        tokens = usage.get("completion_tokens") or len(output["choices"][0]["text"].split())
        tokens_generated.append(tokens)
    
    avg_time = sum(times) / len(times)
    avg_tokens = sum(tokens_generated) / len(tokens_generated)
    
    return {
        "avg_latency_ms": avg_time * 1000,
        "avg_tokens": avg_tokens,
        "tokens_per_second": avg_tokens / avg_time,
        "iterations": iterations
    }


# Usage
model = load_quantized_model(
    "models/mistral-7b-instruct-v0.1.Q4_K_M.gguf",
    n_ctx=4096,
    n_gpu_layers=35
)

# Simple inference
response = model(
    "Explain quantum computing in simple terms:",
    max_tokens=150,
    temperature=0.7
)

print(response["choices"][0]["text"])

# Benchmark
results = benchmark_inference(
    model,
    "Write a haiku about programming:",
    max_tokens=50
)

print(f"\nPerformance:")
print(f"  Latency: {results['avg_latency_ms']:.0f}ms")
print(f"  Throughput: {results['tokens_per_second']:.1f} tokens/sec")

Quantization Comparison

from dataclasses import dataclass


@dataclass
class QuantConfig:
    """Quantization configuration."""
    name: str
    bits: int
    memory_factor: float  # Relative to FP16
    quality_factor: float  # Relative to FP16


# Common GGUF quantization levels
QUANT_CONFIGS = {
    "Q2_K": QuantConfig("Q2_K", 2, 0.125, 0.85),
    "Q3_K_S": QuantConfig("Q3_K_S", 3, 0.19, 0.88),
    "Q3_K_M": QuantConfig("Q3_K_M", 3, 0.21, 0.90),
    "Q4_0": QuantConfig("Q4_0", 4, 0.25, 0.92),
    "Q4_K_M": QuantConfig("Q4_K_M", 4, 0.27, 0.94),
    "Q5_K_M": QuantConfig("Q5_K_M", 5, 0.34, 0.96),
    "Q6_K": QuantConfig("Q6_K", 6, 0.39, 0.98),
    "Q8_0": QuantConfig("Q8_0", 8, 0.50, 0.99),
    "F16": QuantConfig("F16", 16, 1.0, 1.0),
}


def estimate_memory(
    model_params_b: float,
    quant_type: str
) -> dict:
    """Estimate memory requirements for quantized model."""
    config = QUANT_CONFIGS.get(quant_type)
    if not config:
        raise ValueError(f"Unknown quantization type: {quant_type}")
    
    # FP16 baseline: ~2 bytes per parameter, so a model with N billion
    # parameters needs roughly 2N GB of weights.
    fp16_size_gb = model_params_b * 2
    quantized_size_gb = fp16_size_gb * config.memory_factor
    
    # Add context memory (rough estimate)
    context_memory_gb = 0.5  # Variable based on context length
    
    return {
        "model_size_gb": quantized_size_gb,
        "total_memory_gb": quantized_size_gb + context_memory_gb,
        "quality_factor": config.quality_factor,
        "bits": config.bits
    }


def select_quantization(
    model_params_b: float,
    available_memory_gb: float,
    min_quality: float = 0.90
) -> list[dict]:
    """Select viable quantization options for given constraints."""
    viable = []
    
    for name, config in QUANT_CONFIGS.items():
        estimate = estimate_memory(model_params_b, name)
        
        if (estimate["total_memory_gb"] <= available_memory_gb and 
            estimate["quality_factor"] >= min_quality):
            viable.append({
                "quant_type": name,
                "memory_gb": estimate["total_memory_gb"],
                "quality": estimate["quality_factor"]
            })
    
    return sorted(viable, key=lambda x: x["quality"], reverse=True)


# Usage
# For a 7B parameter model
options = select_quantization(
    model_params_b=7,
    available_memory_gb=8,  # 8GB GPU
    min_quality=0.90
)

print("Viable quantization options:")
for opt in options:
    print(f"  {opt['quant_type']}: {opt['memory_gb']:.1f}GB, quality: {opt['quality']:.0%}")

vLLM Serving

Basic vLLM Setup

from vllm import LLM, SamplingParams


def create_vllm_engine(
    model: str,
    tensor_parallel_size: int = 1,
    gpu_memory_utilization: float = 0.9,
    max_model_len: int = 4096
) -> LLM:
    """Create vLLM engine for high-throughput inference."""
    return LLM(
        model=model,
        tensor_parallel_size=tensor_parallel_size,
        gpu_memory_utilization=gpu_memory_utilization,
        max_model_len=max_model_len
    )


def batch_generate(
    engine: LLM,
    prompts: list[str],
    max_tokens: int = 256,
    temperature: float = 0.7,
    top_p: float = 0.9
) -> list[str]:
    """Generate completions for multiple prompts."""
    sampling_params = SamplingParams(
        max_tokens=max_tokens,
        temperature=temperature,
        top_p=top_p
    )
    
    outputs = engine.generate(prompts, sampling_params)
    
    return [output.outputs[0].text for output in outputs]


# Usage
engine = create_vllm_engine(
    model="mistralai/Mistral-7B-Instruct-v0.1",
    tensor_parallel_size=1,
    gpu_memory_utilization=0.85
)

prompts = [
    "Explain machine learning:",
    "What is Python?",
    "How does the internet work?",
    "Describe cloud computing:",
]

responses = batch_generate(engine, prompts, max_tokens=100)

for prompt, response in zip(prompts, responses):
    print(f"Q: {prompt}")
    print(f"A: {response[:100]}...")
    print()

vLLM API Server

# Launch vLLM server (run in terminal):
# python -m vllm.entrypoints.openai.api_server \
#     --model mistralai/Mistral-7B-Instruct-v0.1 \
#     --host 0.0.0.0 \
#     --port 8000


from openai import OpenAI


def create_vllm_client(base_url: str = "http://localhost:8000/v1"):
    """Create client for vLLM server (OpenAI-compatible)."""
    return OpenAI(
        base_url=base_url,
        api_key="not-needed"  # vLLM doesn't require API key
    )


def query_vllm(
    client: OpenAI,
    prompt: str,
    model: str = "mistralai/Mistral-7B-Instruct-v0.1",
    max_tokens: int = 256
) -> str:
    """Query vLLM server."""
    response = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        max_tokens=max_tokens,
        temperature=0.7
    )
    
    return response.choices[0].message.content


async def batch_query_vllm(
    client: OpenAI,
    prompts: list[str],
    model: str = "mistralai/Mistral-7B-Instruct-v0.1"
) -> list[str]:
    """Batch query vLLM server with async requests."""
    import asyncio
    from openai import AsyncOpenAI
    
    async_client = AsyncOpenAI(
        base_url=client.base_url,
        api_key="not-needed"
    )
    
    async def single_query(prompt: str) -> str:
        response = await async_client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=256
        )
        return response.choices[0].message.content
    
    tasks = [single_query(p) for p in prompts]
    return await asyncio.gather(*tasks)


# Usage
client = create_vllm_client()

# Single query
response = query_vllm(client, "What is machine learning?")
print(response)

# Batch query
import asyncio

prompts = ["Explain AI:", "What is Python?", "Define cloud computing:"]
responses = asyncio.run(batch_query_vllm(client, prompts))
for r in responses:
    print(r[:100], "...")
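
The OpenAI-compatible endpoint also supports streaming, which lowers perceived latency for interactive use. A minimal sketch using the same client as above:

def stream_vllm(
    client: OpenAI,
    prompt: str,
    model: str = "mistralai/Mistral-7B-Instruct-v0.1",
    max_tokens: int = 256
) -> str:
    """Stream a completion from the vLLM server, printing tokens as they arrive."""
    stream = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        max_tokens=max_tokens,
        stream=True
    )
    
    chunks = []
    for chunk in stream:
        # Each streamed chunk carries an incremental delta; content can be None
        delta = chunk.choices[0].delta.content or ""
        chunks.append(delta)
        print(delta, end="", flush=True)
    
    return "".join(chunks)


# full_text = stream_vllm(client, "Explain speculative decoding briefly:")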

Speculative Decoding

from dataclasses import dataclass
from typing import Optional


@dataclass
class SpeculativeConfig:
    """Configuration for speculative decoding."""
    draft_model: str
    target_model: str
    num_speculative_tokens: int = 4
    acceptance_threshold: float = 0.9


class SpeculativeDecoder:
    """Speculative decoding for faster inference."""
    
    def __init__(
        self,
        draft_model,  # Small, fast model
        target_model,  # Large, accurate model
        num_speculative_tokens: int = 4
    ):
        self.draft = draft_model
        self.target = target_model
        self.k = num_speculative_tokens
        
    def generate(
        self,
        prompt: str,
        max_tokens: int = 100
    ) -> dict:
        """Generate with speculative decoding."""
        generated = []
        total_draft_tokens = 0
        total_accepted = 0
        
        current_prompt = prompt
        
        while len(generated) < max_tokens:
            # Draft: generate k tokens with small model
            draft_tokens = self._draft_generate(current_prompt, self.k)
            total_draft_tokens += len(draft_tokens)
            
            # Target: verify draft tokens
            accepted, correction = self._target_verify(
                current_prompt,
                draft_tokens
            )
            
            total_accepted += accepted
            
            if correction:
                generated.extend(draft_tokens[:accepted])
                generated.append(correction)
                current_prompt = prompt + "".join(generated)
            else:
                generated.extend(draft_tokens)
                current_prompt = prompt + "".join(generated)
            
            # Check for end
            if len(generated) >= max_tokens:
                break
        
        return {
            "text": "".join(generated[:max_tokens]),
            "acceptance_rate": total_accepted / total_draft_tokens if total_draft_tokens else 0,
            "tokens_generated": len(generated[:max_tokens])
        }
    
    def _draft_generate(self, prompt: str, k: int) -> list[str]:
        """Generate k tokens with draft model."""
        # Simplified: a real implementation operates on token IDs from the
        # draft model's tokenizer rather than on characters.
        output = self.draft(prompt, max_tokens=k)
        text = output["choices"][0]["text"]  # llama.cpp-style completion dict
        return list(text)  # Simplified token representation
    
    def _target_verify(
        self,
        prompt: str,
        draft_tokens: list[str]
    ) -> tuple[int, Optional[str]]:
        """Verify draft tokens with target model."""
        # In a real implementation, compute log probs at each draft position and
        # accept token i while P_target(token_i) / P_draft(token_i) clears the
        # acceptance threshold, resampling from the target at the first rejection.
        
        full_text = prompt + "".join(draft_tokens)
        target_output = self.target(full_text, max_tokens=1)
        target_text = target_output["choices"][0]["text"]
        
        # Simplified acceptance logic: accept all but the last draft token and
        # take the target's next character as the correction.
        accepted = len(draft_tokens) - 1
        correction = target_text[:1] if target_text else None
        
        return accepted, correction


# Conceptual usage (requires actual model instances)
"""
from llama_cpp import Llama

draft = Llama("models/tiny-llama-1B.gguf", n_ctx=2048)
target = Llama("models/llama-7B.gguf", n_ctx=2048)

decoder = SpeculativeDecoder(draft, target, num_speculative_tokens=4)
result = decoder.generate("Write a story:", max_tokens=200)

print(f"Acceptance rate: {result['acceptance_rate']:.0%}")
print(result['text'])
"""

KV Cache Optimization

from dataclasses import dataclass
import hashlib
from typing import Optional


@dataclass
class CacheEntry:
    """A cached KV state."""
    key: str
    prefix: str
    kv_state: bytes  # Serialized KV cache
    timestamp: float
    size_bytes: int


class KVCacheManager:
    """Manage KV cache for prefix reuse."""
    
    def __init__(self, max_cache_size_gb: float = 4.0):
        self.cache: dict[str, CacheEntry] = {}
        self.max_size = max_cache_size_gb * 1024 * 1024 * 1024
        self.current_size = 0
    
    def _compute_key(self, prefix: str) -> str:
        """Compute cache key for prefix."""
        return hashlib.md5(prefix.encode()).hexdigest()
    
    def get(self, prefix: str) -> Optional[CacheEntry]:
        """Get cached KV state for prefix."""
        key = self._compute_key(prefix)
        return self.cache.get(key)
    
    def put(
        self,
        prefix: str,
        kv_state: bytes,
        timestamp: float
    ):
        """Cache KV state for prefix."""
        key = self._compute_key(prefix)
        size = len(kv_state)
        
        # Evict if needed
        while self.current_size + size > self.max_size and self.cache:
            self._evict_lru()
        
        entry = CacheEntry(
            key=key,
            prefix=prefix,
            kv_state=kv_state,
            timestamp=timestamp,
            size_bytes=size
        )
        
        self.cache[key] = entry
        self.current_size += size
    
    def _evict_lru(self):
        """Evict least recently used entry."""
        if not self.cache:
            return
        
        oldest_key = min(
            self.cache.keys(),
            key=lambda k: self.cache[k].timestamp
        )
        
        entry = self.cache.pop(oldest_key)
        self.current_size -= entry.size_bytes
    
    def find_longest_prefix_match(self, text: str) -> Optional[CacheEntry]:
        """Find cached entry with longest matching prefix."""
        best_match = None
        best_length = 0
        
        for entry in self.cache.values():
            if text.startswith(entry.prefix) and len(entry.prefix) > best_length:
                best_match = entry
                best_length = len(entry.prefix)
        
        return best_match


class OptimizedInference:
    """Inference with KV cache optimization."""
    
    def __init__(self, model, cache_manager: KVCacheManager):
        self.model = model
        self.cache = cache_manager
    
    def generate(
        self,
        prompt: str,
        max_tokens: int = 100,
        use_cache: bool = True
    ) -> dict:
        """Generate with KV cache reuse."""
        import time
        
        cache_hit = False
        cached_prefix_len = 0
        
        if use_cache:
            # Check for prefix match
            match = self.cache.find_longest_prefix_match(prompt)
            if match:
                cache_hit = True
                cached_prefix_len = len(match.prefix)
                # Load KV state from cache
                # self.model.load_kv_state(match.kv_state)
        
        start = time.time()
        
        # Generate (would start from cached position)
        output = self.model(prompt, max_tokens=max_tokens)
        
        elapsed = time.time() - start
        
        # Cache the new KV state
        if use_cache and not cache_hit:
            # kv_state = self.model.get_kv_state()
            kv_state = b""  # Placeholder
            self.cache.put(prompt, kv_state, time.time())
        
        return {
            "text": output.get("text", ""),
            "cache_hit": cache_hit,
            "cached_prefix_len": cached_prefix_len,
            "latency_ms": elapsed * 1000
        }


# Usage pattern
"""
cache_mgr = KVCacheManager(max_cache_size_gb=2.0)
inference = OptimizedInference(model, cache_mgr)

# First call - caches the system prompt
result = inference.generate("You are a helpful assistant. User: Hello")
print(f"Cache hit: {result['cache_hit']}")  # False

# Second call - reuses cached system prompt
result = inference.generate("You are a helpful assistant. User: How are you?")
print(f"Cache hit: {result['cache_hit']}")  # True
print(f"Cached prefix: {result['cached_prefix_len']} chars")
"""

Batch Processing

import asyncio
from dataclasses import dataclass
from typing import List
import time


@dataclass
class BatchRequest:
    """A request in a batch."""
    id: str
    prompt: str
    max_tokens: int = 100
    priority: int = 0


@dataclass
class BatchResponse:
    """Response for a batch request."""
    id: str
    text: str
    latency_ms: float


class DynamicBatcher:
    """Dynamic batching for inference requests."""
    
    def __init__(
        self,
        model,
        max_batch_size: int = 8,
        max_wait_ms: float = 50.0
    ):
        self.model = model
        self.max_batch_size = max_batch_size
        self.max_wait_ms = max_wait_ms
        self.pending: List[BatchRequest] = []
        self.results: dict[str, BatchResponse] = {}
        self._lock = asyncio.Lock()
    
    async def add_request(self, request: BatchRequest) -> BatchResponse:
        """Add request and wait for result."""
        async with self._lock:
            self.pending.append(request)
        
        # Wait for batch processing
        while request.id not in self.results:
            await asyncio.sleep(0.001)
        
        return self.results.pop(request.id)
    
    async def process_batches(self):
        """Background task to process batches."""
        while True:
            batch = []
            wait_start = time.time()
            
            # Collect requests for batch
            while len(batch) < self.max_batch_size:
                async with self._lock:
                    if self.pending:
                        # Sort by priority
                        self.pending.sort(key=lambda r: r.priority, reverse=True)
                        batch.append(self.pending.pop(0))
                
                # Check wait timeout
                elapsed = (time.time() - wait_start) * 1000
                if elapsed >= self.max_wait_ms and batch:
                    break
                
                # Yield to the event loop so callers can enqueue new requests
                await asyncio.sleep(0.001)
            
            if batch:
                await self._process_batch(batch)
    
    async def _process_batch(self, batch: List[BatchRequest]):
        """Process a batch of requests."""
        prompts = [r.prompt for r in batch]
        max_tokens = max(r.max_tokens for r in batch)
        
        start = time.time()
        
        # Batch inference (model-specific implementation)
        outputs = self._batch_generate(prompts, max_tokens)
        
        elapsed_ms = (time.time() - start) * 1000
        per_request_ms = elapsed_ms / len(batch)
        
        # Store results
        for i, request in enumerate(batch):
            self.results[request.id] = BatchResponse(
                id=request.id,
                text=outputs[i] if i < len(outputs) else "",
                latency_ms=per_request_ms
            )
    
    def _batch_generate(
        self,
        prompts: List[str],
        max_tokens: int
    ) -> List[str]:
        """Generate for batch of prompts."""
        # Actual implementation depends on model
        return [f"Response to: {p[:20]}" for p in prompts]


# Usage
"""
async def main():
    batcher = DynamicBatcher(model, max_batch_size=4, max_wait_ms=10)
    
    # Start batch processor
    asyncio.create_task(batcher.process_batches())
    
    # Submit requests
    requests = [
        BatchRequest(id=f"req_{i}", prompt=f"Question {i}:", priority=i % 3)
        for i in range(10)
    ]
    
    # Process concurrently
    tasks = [batcher.add_request(r) for r in requests]
    responses = await asyncio.gather(*tasks)
    
    for resp in responses:
        print(f"{resp.id}: {resp.latency_ms:.1f}ms")

asyncio.run(main())
"""

Memory Optimization

import gc
import torch
from typing import Optional


class MemoryOptimizer:
    """Optimize GPU memory for model inference."""
    
    def __init__(self, device: str = "cuda"):
        self.device = device
    
    def get_memory_stats(self) -> dict:
        """Get current GPU memory statistics."""
        if not torch.cuda.is_available():
            return {"error": "CUDA not available"}
        
        allocated = torch.cuda.memory_allocated() / 1024**3
        reserved = torch.cuda.memory_reserved() / 1024**3
        max_allocated = torch.cuda.max_memory_allocated() / 1024**3
        
        return {
            "allocated_gb": allocated,
            "reserved_gb": reserved,
            "max_allocated_gb": max_allocated,
            "free_gb": reserved - allocated
        }
    
    def clear_cache(self):
        """Clear GPU cache and run garbage collection."""
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
            torch.cuda.synchronize()
    
    def optimize_for_inference(self, model):
        """Apply inference optimizations to model."""
        model.eval()
        
        # Disable gradient computation
        for param in model.parameters():
            param.requires_grad = False
        
        # Use inference mode
        torch.set_grad_enabled(False)
        
        return model
    
    def enable_gradient_checkpointing(self, model) -> None:
        """Enable gradient checkpointing to save memory."""
        if hasattr(model, "gradient_checkpointing_enable"):
            model.gradient_checkpointing_enable()
    
    def profile_inference(
        self,
        model,
        sample_input,
        iterations: int = 10
    ) -> dict:
        """Profile memory usage during inference."""
        if torch.cuda.is_available():
            torch.cuda.reset_peak_memory_stats()
        
        self.clear_cache()
        initial_memory = self.get_memory_stats()
        
        times = []
        for _ in range(iterations):
            start = torch.cuda.Event(enable_timing=True)
            end = torch.cuda.Event(enable_timing=True)
            
            start.record()
            with torch.no_grad():
                _ = model(sample_input)
            end.record()
            
            torch.cuda.synchronize()
            times.append(start.elapsed_time(end))
        
        final_memory = self.get_memory_stats()
        
        return {
            "avg_latency_ms": sum(times) / len(times),
            "min_latency_ms": min(times),
            "max_latency_ms": max(times),
            "memory_used_gb": final_memory["max_allocated_gb"],
            "memory_increase_gb": (
                final_memory["max_allocated_gb"] - 
                initial_memory["allocated_gb"]
            )
        }


# Usage
optimizer = MemoryOptimizer()

# Check initial memory
print("Memory stats:", optimizer.get_memory_stats())

# Clear cache
optimizer.clear_cache()

# Profile model (with actual model)
"""
from transformers import AutoModelForCausalLM

model = AutoModelForCausalLM.from_pretrained(
    "mistralai/Mistral-7B-v0.1",
    torch_dtype=torch.float16,
    device_map="auto"
)

model = optimizer.optimize_for_inference(model)

profile = optimizer.profile_inference(
    model, 
    torch.randint(0, 1000, (1, 512)).cuda(),
    iterations=5
)
print(profile)
"""
Optimization Trade-offs
  • Quantization reduces memory use and improves speed at the cost of some quality
  • Batching increases throughput but adds per-request latency
  • KV caching speeds up generation over repeated prefixes
  • Speculative decoding works best when the draft model closely matches the target's output distribution
  • Always measure actual performance on your own workload (see the sketch below)
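
A minimal sketch of that last point, reusing load_quantized_model and benchmark_inference from the quantization section. The file paths are hypothetical placeholders for quantized variants of the same model, and a quality evaluation on your task should accompany the speed numbers:

# Compare two quantization levels of the same model on a representative prompt
candidate_files = {
    "Q4_K_M": "models/mistral-7b-instruct-v0.1.Q4_K_M.gguf",
    "Q6_K": "models/mistral-7b-instruct-v0.1.Q6_K.gguf",
}

for quant_name, path in candidate_files.items():
    candidate = load_quantized_model(path, n_ctx=2048)
    stats = benchmark_inference(
        candidate,
        "Summarize the benefits of unit testing:",
        max_tokens=100
    )
    print(
        f"{quant_name}: {stats['tokens_per_second']:.1f} tok/s, "
        f"{stats['avg_latency_ms']:.0f}ms avg latency"
    )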

Practice Exercise

Build an optimized inference service that:
  1. Supports multiple quantization levels
  2. Implements dynamic batching
  3. Uses KV cache for prefix reuse
  4. Monitors and optimizes memory usage
  5. Benchmarks throughput and latency
Focus on:
  • Balancing speed vs quality tradeoffs
  • Efficient memory utilization
  • Production-ready batching
  • Meaningful performance metrics