Skip to main content

Documentation Index

Fetch the complete documentation index at: https://resources.devweekends.com/llms.txt

Use this file to discover all available pages before exploring further.

Why Vector Databases Matter

Every AI product with memory or search uses vector databases. ChatGPT’s memory, Notion AI, Perplexity’s search, GitHub Copilot’s context — all vector databases under the hood. A traditional database answers “find the row where name = ‘John’.” A vector database answers “find me things similar to this” — and that’s the fundamental operation behind semantic search, RAG, recommendations, and deduplication.
Industry Reality: Semantic search is replacing keyword search everywhere. Companies building AI features need engineers who understand vector databases. This is a must-have skill.

The Mental Model

The idea: convert text (or images, or audio) into a list of numbers (a vector) that captures meaning. Similar meanings produce similar vectors. Then use the database to find the closest vectors to your query.
Text → Embedding Model → [0.1, 0.3, -0.2, ...] → Vector Database

Query → Embedding Model → [0.12, 0.28, -0.18, ...] → Find Similar → Results
Think of it like a map of meaning. “vacation policy” and “PTO guidelines” end up near each other on this map even though they share no words. Vector databases are GPS for navigating that map efficiently — finding the nearest neighbors in milliseconds across millions of points.

Choosing Your Database

DatabaseBest ForScaleCostSetup
pgvectorExisting Postgres appsMillionsFree (self-host)Add extension
PineconeProduction, scaleBillions$70/month+Managed
ChromaDevelopment, prototypingThousandsFreepip install
WeaviateMulti-modal, GraphQLMillionsFree/ManagedDocker
QdrantSelf-hosted productionMillionsFreeDocker
Recommendation: Start with Chroma for development, migrate to pgvector or Pinecone for production.

Index Type Comparison

The index type determines how vectors are organized for fast search. This is the most impactful performance decision after choosing your database.
Index TypeBuild TimeQuery SpeedMemoryRecall@10Best For
Flat (brute force)NoneSlowestLow100% (exact)Under 50K vectors, benchmarks
IVFFlatFastGoodLow95-99%100K-5M vectors, balanced workloads
HNSWSlowFastestHigh (2-3x)97-99.5%Latency-critical production apps
DiskANN (managed DBs)MediumFastVery Low95-99%Billions of vectors, cost-sensitive
Decision framework:
  • Under 100K vectors: Flat or HNSW. Everything is fast at this scale.
  • 100K-10M vectors: HNSW for query-heavy workloads, IVFFlat if you need faster index rebuilds.
  • Over 10M vectors: Managed solution (Pinecone, Qdrant) that handles sharding automatically. Self-hosted pgvector becomes hard to tune.
  • Frequent inserts: IVFFlat handles inserts better than HNSW. HNSW graph rebuilds are expensive.

pgvector: Production-Ready PostgreSQL

Complete Setup

-- 1. Enable extension (requires PostgreSQL 11+)
CREATE EXTENSION vector;

-- 2. Create optimized table structure
CREATE TABLE documents (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    content TEXT NOT NULL,
    embedding vector(1536),  -- Match your model's dimension
    metadata JSONB DEFAULT '{}',
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW()
);

-- 3. Create indexes -- this is where performance lives or dies
-- IVFFlat: Partitions vectors into "lists" (clusters). Searches only the 
-- nearest clusters instead of all vectors. Good balance of speed and accuracy.
-- Rule of thumb: lists = sqrt(num_rows). 1M rows -> lists = 1000.
CREATE INDEX idx_documents_embedding ON documents 
USING ivfflat (embedding vector_cosine_ops) WITH (lists = 100);

-- HNSW: Builds a multi-layer graph for faster traversal. Better query speed 
-- than IVFFlat but uses more memory and takes longer to build.
-- Use this if query latency matters more than insert speed (most production apps).
CREATE INDEX idx_documents_hnsw ON documents 
USING hnsw (embedding vector_cosine_ops) WITH (m = 16, ef_construction = 64);

-- Metadata index for filtering -- critical for multi-tenant apps
CREATE INDEX idx_documents_metadata ON documents USING gin (metadata);

-- 4. Helper function for similarity search
CREATE OR REPLACE FUNCTION search_documents(
    query_embedding vector(1536),
    match_threshold float DEFAULT 0.7,
    match_count int DEFAULT 10,
    filter_metadata jsonb DEFAULT NULL
)
RETURNS TABLE (
    id UUID,
    content TEXT,
    metadata JSONB,
    similarity float
) AS $$
BEGIN
    RETURN QUERY
    SELECT 
        d.id,
        d.content,
        d.metadata,
        1 - (d.embedding <=> query_embedding) AS similarity
    FROM documents d
    WHERE 
        1 - (d.embedding <=> query_embedding) > match_threshold
        AND (filter_metadata IS NULL OR d.metadata @> filter_metadata)
    ORDER BY d.embedding <=> query_embedding
    LIMIT match_count;
END;
$$ LANGUAGE plpgsql;

Production Python Integration

import asyncio
from typing import Optional, List
from dataclasses import dataclass
import asyncpg
from openai import OpenAI
import json

@dataclass
class Document:
    id: str
    content: str
    metadata: dict
    similarity: float = 0.0

class VectorStore:
    """Production pgvector store with connection pooling.
    
    Key design decisions:
    - Connection pooling (asyncpg pool) -- creating a new connection per query
      adds 50-100ms. Pooling amortizes this to near zero.
    - Batch embedding -- OpenAI charges per API call, not per embedding. 
      Sending 100 texts in one call is 100x cheaper than 100 separate calls.
    - Metadata filtering in SQL -- filter before the vector search, not after,
      so the index only scans relevant partitions.
    """
    
    def __init__(self, database_url: str):
        self.database_url = database_url
        self.pool: asyncpg.Pool = None
        self.openai = OpenAI()
    
    async def connect(self):
        """Initialize connection pool"""
        self.pool = await asyncpg.create_pool(
            self.database_url,
            min_size=5,
            max_size=20,
            command_timeout=60
        )
    
    async def close(self):
        """Close connection pool"""
        if self.pool:
            await self.pool.close()
    
    def _get_embedding(self, text: str) -> List[float]:
        """Get embedding from OpenAI"""
        response = self.openai.embeddings.create(
            model="text-embedding-3-small",
            input=text
        )
        return response.data[0].embedding
    
    async def add_document(
        self,
        content: str,
        metadata: dict = None
    ) -> str:
        """Add a single document"""
        embedding = self._get_embedding(content)
        
        async with self.pool.acquire() as conn:
            row = await conn.fetchrow("""
                INSERT INTO documents (content, embedding, metadata)
                VALUES ($1, $2, $3)
                RETURNING id
            """, content, str(embedding), json.dumps(metadata or {}))
            
            return str(row['id'])
    
    async def add_documents_batch(
        self,
        documents: List[dict],
        batch_size: int = 100
    ) -> List[str]:
        """Add documents in batches for efficiency"""
        ids = []
        
        for i in range(0, len(documents), batch_size):
            batch = documents[i:i + batch_size]
            
            # Get embeddings in batch (OpenAI supports up to 2048)
            texts = [doc['content'] for doc in batch]
            response = self.openai.embeddings.create(
                model="text-embedding-3-small",
                input=texts
            )
            embeddings = [e.embedding for e in response.data]
            
            # Insert batch
            async with self.pool.acquire() as conn:
                for doc, emb in zip(batch, embeddings):
                    row = await conn.fetchrow("""
                        INSERT INTO documents (content, embedding, metadata)
                        VALUES ($1, $2, $3)
                        RETURNING id
                    """, doc['content'], str(emb), json.dumps(doc.get('metadata', {})))
                    ids.append(str(row['id']))
        
        return ids
    
    async def search(
        self,
        query: str,
        limit: int = 10,
        threshold: float = 0.7,
        filter_metadata: dict = None
    ) -> List[Document]:
        """Semantic search with optional metadata filter"""
        query_embedding = self._get_embedding(query)
        
        sql = """
            SELECT 
                id::text,
                content,
                metadata,
                1 - (embedding <=> $1::vector) as similarity
            FROM documents
            WHERE 1 - (embedding <=> $1::vector) > $2
        """
        params = [str(query_embedding), threshold]
        
        if filter_metadata:
            sql += " AND metadata @> $3"
            params.append(json.dumps(filter_metadata))
        
        sql += f" ORDER BY embedding <=> $1::vector LIMIT {limit}"
        
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(sql, *params)
            
            return [
                Document(
                    id=row['id'],
                    content=row['content'],
                    metadata=json.loads(row['metadata']),
                    similarity=row['similarity']
                )
                for row in rows
            ]
    
    async def delete(self, document_id: str):
        """Delete a document"""
        async with self.pool.acquire() as conn:
            await conn.execute(
                "DELETE FROM documents WHERE id = $1",
                document_id
            )


# Usage
async def main():
    store = VectorStore("postgresql://user:pass@localhost/mydb")
    await store.connect()
    
    # Add documents
    await store.add_documents_batch([
        {"content": "Python is great for AI", "metadata": {"topic": "python"}},
        {"content": "JavaScript powers the web", "metadata": {"topic": "javascript"}},
        {"content": "PostgreSQL is a powerful database", "metadata": {"topic": "databases"}},
    ])
    
    # Search
    results = await store.search(
        "programming languages for machine learning",
        limit=5,
        filter_metadata={"topic": "python"}
    )
    
    for doc in results:
        print(f"{doc.similarity:.3f}: {doc.content}")
    
    await store.close()

asyncio.run(main())

Pinecone: Managed Scale

Complete Setup

from pinecone import Pinecone, ServerlessSpec
from openai import OpenAI
from typing import List, Optional
from dataclasses import dataclass
import hashlib
import time

@dataclass
class SearchResult:
    id: str
    content: str
    metadata: dict
    score: float

class PineconeVectorStore:
    """Production Pinecone integration"""
    
    def __init__(
        self,
        api_key: str,
        index_name: str,
        dimension: int = 1536,
        metric: str = "cosine"
    ):
        self.pc = Pinecone(api_key=api_key)
        self.openai = OpenAI()
        self.index_name = index_name
        self.dimension = dimension
        self.metric = metric
        
        # Create index if not exists
        if index_name not in self.pc.list_indexes().names():
            self.pc.create_index(
                name=index_name,
                dimension=dimension,
                metric=metric,
                spec=ServerlessSpec(cloud="aws", region="us-east-1")
            )
            # Wait for index to be ready
            while not self.pc.describe_index(index_name).status['ready']:
                time.sleep(1)
        
        self.index = self.pc.Index(index_name)
    
    def _get_embedding(self, text: str) -> List[float]:
        response = self.openai.embeddings.create(
            model="text-embedding-3-small",
            input=text
        )
        return response.data[0].embedding
    
    def _get_embeddings_batch(self, texts: List[str]) -> List[List[float]]:
        """Batch embedding for efficiency"""
        response = self.openai.embeddings.create(
            model="text-embedding-3-small",
            input=texts
        )
        return [e.embedding for e in response.data]
    
    def _generate_id(self, content: str) -> str:
        """Generate deterministic ID from content"""
        return hashlib.md5(content.encode()).hexdigest()
    
    def upsert(
        self,
        documents: List[dict],
        namespace: str = "",
        batch_size: int = 100
    ):
        """Upsert documents with batching"""
        for i in range(0, len(documents), batch_size):
            batch = documents[i:i + batch_size]
            texts = [doc['content'] for doc in batch]
            embeddings = self._get_embeddings_batch(texts)
            
            vectors = []
            for doc, emb in zip(batch, embeddings):
                vectors.append({
                    "id": doc.get('id', self._generate_id(doc['content'])),
                    "values": emb,
                    "metadata": {
                        "content": doc['content'],
                        **doc.get('metadata', {})
                    }
                })
            
            self.index.upsert(vectors=vectors, namespace=namespace)
    
    def search(
        self,
        query: str,
        top_k: int = 10,
        namespace: str = "",
        filter: dict = None,
        include_content: bool = True
    ) -> List[SearchResult]:
        """Semantic search with optional filters"""
        query_embedding = self._get_embedding(query)
        
        results = self.index.query(
            vector=query_embedding,
            top_k=top_k,
            namespace=namespace,
            filter=filter,
            include_metadata=True
        )
        
        return [
            SearchResult(
                id=match.id,
                content=match.metadata.get('content', '') if include_content else '',
                metadata={k: v for k, v in match.metadata.items() if k != 'content'},
                score=match.score
            )
            for match in results.matches
        ]
    
    def delete(self, ids: List[str], namespace: str = ""):
        """Delete vectors by ID"""
        self.index.delete(ids=ids, namespace=namespace)
    
    def delete_all(self, namespace: str = ""):
        """Delete all vectors in namespace"""
        self.index.delete(delete_all=True, namespace=namespace)
    
    def get_stats(self) -> dict:
        """Get index statistics"""
        return self.index.describe_index_stats()


# Usage
store = PineconeVectorStore(
    api_key="your-api-key",
    index_name="my-app-vectors"
)

# Multi-tenant with namespaces
store.upsert([
    {"content": "User's private document 1", "metadata": {"type": "note"}},
    {"content": "User's private document 2", "metadata": {"type": "article"}},
], namespace="user_123")

# Search within user's namespace
results = store.search(
    "my notes",
    top_k=5,
    namespace="user_123",
    filter={"type": {"$eq": "note"}}
)

Pinecone Filter Syntax

# Equality
filter={"category": {"$eq": "tech"}}

# Not equal
filter={"status": {"$ne": "archived"}}

# In list
filter={"tag": {"$in": ["python", "javascript"]}}

# Numeric comparison
filter={"price": {"$lt": 100}}
filter={"rating": {"$gte": 4.0}}

# Compound filters
filter={
    "$and": [
        {"category": {"$eq": "tech"}},
        {"price": {"$lt": 100}}
    ]
}

filter={
    "$or": [
        {"priority": {"$eq": "high"}},
        {"due_date": {"$lt": "2024-01-01"}}
    ]
}

Chunking: The Art of Splitting

Bad chunking = bad search results. This is where most RAG systems fail, and it is the single highest-leverage improvement you can make. The analogy: imagine ripping pages out of a book at random page boundaries. Some pages start mid-sentence, others combine the end of one chapter with the start of another. Your search results will be garbage because the chunks don’t represent coherent ideas. Smart chunking splits at semantic boundaries (paragraphs, sections, headers) so each chunk is a self-contained unit of meaning.

Smart Chunking System

from dataclasses import dataclass
from typing import List, Optional
import re

@dataclass
class Chunk:
    content: str
    metadata: dict
    index: int
    total_chunks: int

class SmartChunker:
    """Context-aware document chunking"""
    
    def __init__(
        self,
        chunk_size: int = 1000,      # Characters per chunk -- start here and tune
        chunk_overlap: int = 200,     # Overlap prevents info loss at boundaries
        min_chunk_size: int = 100     # Drop tiny fragments that add noise
    ):
        # Tip: chunk_size depends on your content type:
        #   Code: 500-800 chars (functions/methods are natural units)
        #   Docs: 800-1200 chars (paragraphs/sections)
        #   Legal: 1000-1500 chars (clauses are dense with information)
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.min_chunk_size = min_chunk_size
    
    def chunk_text(
        self,
        text: str,
        doc_id: str,
        base_metadata: dict = None
    ) -> List[Chunk]:
        """Split text into overlapping chunks"""
        # Clean text
        text = self._clean_text(text)
        
        # Split by semantic boundaries
        splits = self._semantic_split(text)
        
        # Merge small chunks, split large ones
        chunks = self._normalize_chunks(splits)
        
        # Add overlap
        chunks = self._add_overlap(chunks)
        
        # Create chunk objects with metadata
        return [
            Chunk(
                content=chunk,
                metadata={
                    **(base_metadata or {}),
                    "doc_id": doc_id,
                    "chunk_index": i,
                    "total_chunks": len(chunks),
                },
                index=i,
                total_chunks=len(chunks)
            )
            for i, chunk in enumerate(chunks)
        ]
    
    def _clean_text(self, text: str) -> str:
        """Normalize whitespace and clean text"""
        text = re.sub(r'\n{3,}', '\n\n', text)
        text = re.sub(r' {2,}', ' ', text)
        return text.strip()
    
    def _semantic_split(self, text: str) -> List[str]:
        """Split on semantic boundaries"""
        # Priority: headers > paragraphs > sentences > words
        
        # Try splitting by headers first (Markdown)
        header_pattern = r'\n(?=#{1,6} )'
        parts = re.split(header_pattern, text)
        if len(parts) > 1:
            return parts
        
        # Split by double newlines (paragraphs)
        parts = text.split('\n\n')
        if len(parts) > 1:
            return parts
        
        # Split by sentences
        sentence_pattern = r'(?<=[.!?])\s+'
        parts = re.split(sentence_pattern, text)
        return parts
    
    def _normalize_chunks(self, splits: List[str]) -> List[str]:
        """Merge small chunks, split large ones"""
        normalized = []
        current = ""
        
        for split in splits:
            # If adding this split exceeds chunk_size, save current and start new
            if len(current) + len(split) > self.chunk_size:
                if current:
                    normalized.append(current.strip())
                
                # If split itself is too large, recursively split
                if len(split) > self.chunk_size:
                    for i in range(0, len(split), self.chunk_size - self.chunk_overlap):
                        normalized.append(split[i:i + self.chunk_size].strip())
                else:
                    current = split
            else:
                current = current + "\n\n" + split if current else split
        
        if current and len(current) >= self.min_chunk_size:
            normalized.append(current.strip())
        
        return normalized
    
    def _add_overlap(self, chunks: List[str]) -> List[str]:
        """Add overlap between chunks for context continuity"""
        if len(chunks) <= 1:
            return chunks
        
        overlapped = []
        for i, chunk in enumerate(chunks):
            if i > 0:
                # Add end of previous chunk
                prev_overlap = chunks[i-1][-self.chunk_overlap:]
                chunk = prev_overlap + "\n...\n" + chunk
            
            overlapped.append(chunk)
        
        return overlapped


# Usage
chunker = SmartChunker(chunk_size=1000, chunk_overlap=200)

document = """
# Introduction to Machine Learning

Machine learning is a subset of artificial intelligence...

## Supervised Learning

In supervised learning, the algorithm learns from labeled data...

## Unsupervised Learning

Unsupervised learning finds hidden patterns in unlabeled data...
"""

chunks = chunker.chunk_text(
    document,
    doc_id="ml_intro_001",
    base_metadata={"source": "textbook", "topic": "ml"}
)

for chunk in chunks:
    print(f"Chunk {chunk.index + 1}/{chunk.total_chunks}")
    print(f"Length: {len(chunk.content)}")
    print(chunk.content[:100] + "...")
    print()

Hybrid Search: Best of Both Worlds

Semantic search finds “things that mean the same thing.” Keyword search finds “things that use the same words.” Neither alone is sufficient: semantic search misses exact terms like error codes (“ERR_4012”) and function names (asyncpg.create_pool), while keyword search misses synonyms and paraphrases. Combining both is the single biggest retrieval quality improvement you can make — expect 10-25% better recall. Combine semantic search with keyword search for better results.
from rank_bm25 import BM25Okapi
import numpy as np
from typing import List, Tuple

class HybridSearch:
    """Combine vector and keyword search"""
    
    def __init__(self, vector_store: VectorStore):
        self.vector_store = vector_store
        self.documents: List[str] = []
        self.bm25: BM25Okapi = None
    
    def add_documents(self, documents: List[dict]):
        """Add documents to both stores"""
        # Add to vector store
        asyncio.run(self.vector_store.add_documents_batch(documents))
        
        # Build BM25 index
        self.documents = [doc['content'] for doc in documents]
        tokenized = [doc.lower().split() for doc in self.documents]
        self.bm25 = BM25Okapi(tokenized)
    
    def search(
        self,
        query: str,
        top_k: int = 10,
        vector_weight: float = 0.7,
        keyword_weight: float = 0.3
    ) -> List[Tuple[str, float]]:
        """Hybrid search with weighted combination"""
        
        # Vector search
        vector_results = asyncio.run(
            self.vector_store.search(query, limit=top_k * 2)
        )
        vector_scores = {r.content: r.similarity for r in vector_results}
        
        # Keyword search (BM25)
        tokenized_query = query.lower().split()
        bm25_scores = self.bm25.get_scores(tokenized_query)
        
        # Normalize BM25 scores to 0-1
        if max(bm25_scores) > 0:
            bm25_scores = bm25_scores / max(bm25_scores)
        
        bm25_dict = {
            doc: score 
            for doc, score in zip(self.documents, bm25_scores)
        }
        
        # Combine scores using Reciprocal Rank Fusion
        combined_scores = {}
        all_docs = set(vector_scores.keys()) | set(bm25_dict.keys())
        
        for doc in all_docs:
            v_score = vector_scores.get(doc, 0) * vector_weight
            k_score = bm25_dict.get(doc, 0) * keyword_weight
            combined_scores[doc] = v_score + k_score
        
        # Sort and return top_k
        sorted_results = sorted(
            combined_scores.items(),
            key=lambda x: x[1],
            reverse=True
        )[:top_k]
        
        return sorted_results


# When to use hybrid search:
# - Technical documentation (exact function names + semantic meaning)
# - Legal documents (specific clause numbers + concepts)
# - Product catalogs (SKUs + descriptions)

Mini-Project: Document Q&A System

from openai import OpenAI
from typing import List
import asyncio

class DocumentQA:
    """Complete document Q&A system with citations"""
    
    def __init__(self, database_url: str):
        self.store = VectorStore(database_url)
        self.chunker = SmartChunker()
        self.openai = OpenAI()
    
    async def initialize(self):
        await self.store.connect()
    
    async def add_document(
        self,
        content: str,
        doc_id: str,
        title: str = "",
        source: str = ""
    ):
        """Process and store a document"""
        chunks = self.chunker.chunk_text(
            content,
            doc_id=doc_id,
            base_metadata={"title": title, "source": source}
        )
        
        documents = [
            {"content": chunk.content, "metadata": chunk.metadata}
            for chunk in chunks
        ]
        
        await self.store.add_documents_batch(documents)
        
        return len(chunks)
    
    async def ask(
        self,
        question: str,
        top_k: int = 5,
        include_citations: bool = True
    ) -> dict:
        """Answer question using relevant context"""
        
        # Retrieve relevant chunks
        results = await self.store.search(question, limit=top_k)
        
        if not results:
            return {
                "answer": "I couldn't find relevant information to answer your question.",
                "sources": []
            }
        
        # Build context
        context_parts = []
        sources = []
        
        for i, result in enumerate(results, 1):
            context_parts.append(f"[{i}] {result.content}")
            sources.append({
                "id": i,
                "doc_id": result.metadata.get("doc_id"),
                "title": result.metadata.get("title"),
                "chunk_index": result.metadata.get("chunk_index"),
                "similarity": result.similarity
            })
        
        context = "\n\n".join(context_parts)
        
        # Generate answer
        system_prompt = """You are a helpful assistant that answers questions based on the provided context.
        
Rules:
- Only use information from the provided context
- If the context doesn't contain the answer, say so
- Cite sources using [1], [2], etc. when referencing specific information
- Be concise but thorough"""
        
        response = self.openai.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {question}"}
            ],
            temperature=0
        )
        
        answer = response.choices[0].message.content
        
        return {
            "answer": answer,
            "sources": sources if include_citations else []
        }


# Usage
async def main():
    qa = DocumentQA("postgresql://user:pass@localhost/docs")
    await qa.initialize()
    
    # Add some documents
    await qa.add_document(
        content="Machine learning is a subset of AI...",
        doc_id="ml_guide",
        title="Machine Learning Guide",
        source="internal_docs"
    )
    
    # Ask questions
    result = await qa.ask("What is machine learning?")
    print(f"Answer: {result['answer']}")
    print(f"Sources: {result['sources']}")

asyncio.run(main())

Performance Optimization

Index Tuning for pgvector

Index tuning is the difference between 10ms and 500ms queries at scale. The two main knobs: how the index is built (affects accuracy) and how it’s searched (affects speed vs. recall trade-off).
-- Check if your index is actually being used (if idx_scan is 0, something is wrong)
SELECT indexname, idx_scan, idx_tup_read
FROM pg_stat_user_indexes
WHERE tablename = 'documents';

-- Tune IVFFlat lists based on data size
-- Rule: lists = sqrt(num_rows). Too few = slow (scanning large clusters). 
-- Too many = inaccurate (clusters too small to be representative).
-- 1M rows -> lists = 1000
-- 10M rows -> lists = 3162

-- Increase probes for better recall (at the cost of latency)
-- Default is 1 (only search the nearest cluster). For production, start with 10.
-- Higher probes = better recall but slower. Benchmark to find your sweet spot.
SET ivfflat.probes = 10;

-- For HNSW, tune ef_search (controls graph traversal depth)
-- Higher = more accurate but slower. 40 is default; 100 is common for production.
SET hnsw.ef_search = 40;

Embedding Caching

Embeddings are deterministic: the same text always produces the same vector. This means you can cache them aggressively. In practice, 30-50% of embedding requests in a RAG system are duplicates (common queries, re-indexed documents). Caching can cut your OpenAI embedding costs by 50-90%.
import hashlib
import json
from pathlib import Path

class EmbeddingCache:
    """Cache embeddings to reduce API costs.
    
    Two-tier cache: memory (fast, limited) + disk (slower, unlimited).
    In production, consider Redis as the memory layer for multi-process sharing.
    """
    
    def __init__(self, cache_dir: str = ".embedding_cache"):
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(exist_ok=True)
        self.memory_cache: dict = {}
    
    def _hash_text(self, text: str, model: str) -> str:
        key = f"{model}:{text}"
        return hashlib.md5(key.encode()).hexdigest()
    
    def get(self, text: str, model: str = "text-embedding-3-small") -> List[float] | None:
        cache_key = self._hash_text(text, model)
        
        # Check memory cache first
        if cache_key in self.memory_cache:
            return self.memory_cache[cache_key]
        
        # Check disk cache
        cache_file = self.cache_dir / f"{cache_key}.json"
        if cache_file.exists():
            with open(cache_file) as f:
                embedding = json.load(f)
                self.memory_cache[cache_key] = embedding
                return embedding
        
        return None
    
    def set(self, text: str, embedding: List[float], model: str = "text-embedding-3-small"):
        cache_key = self._hash_text(text, model)
        
        # Save to memory
        self.memory_cache[cache_key] = embedding
        
        # Save to disk
        cache_file = self.cache_dir / f"{cache_key}.json"
        with open(cache_file, 'w') as f:
            json.dump(embedding, f)

Key Takeaways

pgvector for Most Apps

If you already use PostgreSQL, pgvector is the easiest path. Great for millions of vectors.

Chunking Is Critical

Bad chunks = bad search. Use semantic boundaries, overlap, and test your chunk sizes.

Hybrid Search Wins

Combine vector + keyword search for best results, especially for technical content.

Cache Everything

Embeddings are expensive. Cache aggressively to reduce costs by 90%+.

What’s Next

RAG Systems

Build production Retrieval-Augmented Generation pipelines

Interview Deep-Dive

Strong Answer:
  • Given those parameters, pgvector is the strong default choice, and let me explain why before considering alternatives. You already run PostgreSQL, which means your team has operational expertise, monitoring, backup procedures, connection pooling, and deployment pipelines for Postgres. Adding the pgvector extension is one SQL command (CREATE EXTENSION vector). Introducing Pinecone or Qdrant means adding an entirely new service to your infrastructure: new deployment, new monitoring, new on-call runbook, new failure modes. The operational overhead of a new database is almost always underestimated.
  • At 5 million documents with 1536-dimension embeddings, you are looking at roughly 30GB of vector data. pgvector handles this comfortably on a single machine with 64GB RAM. With an HNSW index (m=16, ef_construction=64), you will get sub-50ms query latency at 95% recall for single queries. At 100 QPS, you need connection pooling (pgbouncer or asyncpg pool), and you may need to increase shared_buffers and effective_cache_size in PostgreSQL configuration to keep the HNSW index in memory. But this is standard Postgres tuning, not new knowledge.
  • Where pgvector falls short and I would switch: if the document count grows beyond 50 million, pgvector’s single-node architecture becomes a bottleneck. Pinecone or Qdrant can distribute across multiple nodes, giving you horizontal scalability. If you need multi-tenancy with thousands of separate namespaces (each customer has their own isolated vector space), Pinecone’s namespace feature handles this natively, while in pgvector you would use metadata filtering or separate tables, which adds query complexity. If your team has zero PostgreSQL experience and is already running on a serverless stack, managed Pinecone eliminates all operational burden.
  • The decision framework: operational fit first (does the team know the technology?), scale requirements second (how big will the data grow?), feature requirements third (namespaces, metadata filtering, hybrid search support), and cost fourth. Most teams that start with Pinecone because it sounds modern end up paying $500-2000/month for a managed service they could have run on their existing Postgres instance for free.
Follow-up: You deploy pgvector and it works well for 6 months. Then the dataset grows to 20 million documents and p95 query latency spikes from 40ms to 300ms. What happened and how do you fix it?The most likely cause is that the HNSW index no longer fits in memory. At 20 million documents with 1536 dimensions, the HNSW index is roughly 120GB, which exceeds most machines’ RAM. When the index pages are evicted from the OS page cache, queries trigger disk I/O instead of memory reads, and latency spikes. Three fixes in order of effort. First, increase the machine’s RAM to 256GB so the index fits in memory again — this is the cheapest and fastest fix if your cloud provider supports it. Second, switch to IVFFlat indexing instead of HNSW. IVFFlat partitions vectors into clusters and only loads the clusters relevant to each query, so it uses much less memory at the cost of slightly lower recall. Set lists = sqrt(20_000_000) = ~4500 and probes = 20-40 to balance recall and latency. Third, shard the data: split by a natural partition key (document date, customer ID, content category) across multiple Postgres instances, each holding a subset of the vectors. Query the relevant shard based on the query context. This is more complex but gives you horizontal scalability. If none of these are sufficient, it is time to migrate to a purpose-built distributed vector database like Qdrant or Weaviate that handles sharding natively.
Strong Answer:
  • IVFFlat (Inverted File with Flat quantization) partitions all vectors into a configurable number of clusters (called “lists”) using k-means. At query time, it identifies the nearest clusters to the query vector and performs an exhaustive search only within those clusters. Think of it as dividing a city into neighborhoods and only searching the nearby neighborhoods rather than every house in the city.
  • HNSW (Hierarchical Navigable Small World) builds a multi-layer graph where each vector is a node connected to its nearest neighbors. Higher layers have fewer nodes and longer-range connections (like highway exits), lower layers have more nodes and local connections (like city streets). A query traverses from the top layer down, getting progressively more precise. Think of it as GPS navigation: first find the right region, then the right city, then the right street.
  • Performance differences: HNSW has faster query times (2-10x faster than IVFFlat at the same recall level) because graph traversal is algorithmically more efficient than cluster scanning. But HNSW uses 2-3x more memory than IVFFlat because it stores the graph structure alongside the vectors. HNSW also has significantly slower index build times — building the graph for 5 million vectors can take 30 minutes versus 5 minutes for IVFFlat.
  • Choose IVFFlat when: memory is constrained, you do frequent bulk inserts (IVFFlat is faster to rebuild), or your data changes frequently enough that you need to reindex regularly. IVFFlat also works well when recall requirements are moderate (90% is acceptable). Choose HNSW when: query latency is critical (real-time search, user-facing features), you need high recall (95%+), and your data is relatively stable (not reindexing every hour).
  • Tuning parameters that matter. For IVFFlat: lists controls the number of clusters. Rule of thumb: sqrt(num_rows). Too few lists means each cluster is large and queries are slow. Too many lists means clusters are too small and recall drops. probes controls how many clusters to search at query time. Default is 1 (terrible recall). Set it to 10-20 for production. Higher probes = better recall but slower queries. Benchmark to find your sweet spot.
  • For HNSW: m controls how many neighbors each node connects to. Higher m = better recall but more memory and slower build. Default 16 works well up to 10 million vectors. ef_construction controls build-time quality. Higher values produce better graphs but slower builds. 64-128 is typical. ef_search (set at query time via SET hnsw.ef_search) controls query-time graph exploration depth. Higher = better recall, slower queries. Default 40; production systems often use 100-200.
Follow-up: You set up HNSW with m=16, ef_construction=64, and ef_search=100. Recall is 97% but latency is 80ms, and the product team wants sub-20ms. What levers do you pull?First, verify the bottleneck is actually the index search and not something else (network, connection pooling, query parsing). Use EXPLAIN ANALYZE on the vector search query. If the index scan itself is 80ms, the issue is graph traversal depth. Lower ef_search from 100 to 40 — this will drop latency to roughly 30-40ms, but recall might drop to 93-94%. If that recall is acceptable, you are done. If not, the next lever is reducing vector dimensions. If you are using 1536-dimension embeddings, consider text-embedding-3-small with dimensions=512 or dimensions=256 using OpenAI’s dimension reduction feature. Halving dimensions roughly halves search time and memory usage, with a modest recall reduction (typically 1-3%). The final lever is hardware: move to a machine with faster NVMe storage and more RAM to ensure the entire index is in memory. A common mistake is running vector search on shared database instances with memory pressure from other workloads — give the vector index its own machine or at least its own memory allocation.
Strong Answer:
  • Cosine similarity scores between embeddings are meaningful for relative ranking within a single query but dangerous for absolute thresholds across queries. Within your query, 0.82 being higher than 0.79 reliably means the first result is more relevant. But comparing 0.82 from query A with 0.82 from query B tells you almost nothing — the absolute score depends heavily on the query length, specificity, and the embedding model’s behavior.
  • Here is the concrete problem with fixed thresholds: a short, specific query like “asyncpg connection pool configuration” against a document that is exactly about that topic might score 0.92. A broad query like “how to build software” against a relevant but general document might score 0.78. A threshold of 0.80 would correctly include the first result but incorrectly exclude the second, even though both are relevant to their respective queries. Conversely, an irrelevant document might score 0.81 for a query where many documents are topically similar, passing the threshold despite being useless.
  • For production filtering, I recommend three approaches instead of a fixed threshold. First, use a relative threshold: take the top result’s score and accept results within 0.1 of it. If the top result is 0.82, accept everything above 0.72. This adapts to the score distribution of each query. Second, use the score gap: if there is a sudden drop in scores (top 5 are 0.82-0.79 but the 6th is 0.65), use the gap as a natural cutoff. Third, do not filter by embedding score at all — retrieve a fixed top-k (say 20) and use a reranker to determine true relevance. The reranker’s cross-encoder scores are better calibrated for absolute thresholds because the model sees the query and document together.
  • The deeper issue is that cosine similarity in high-dimensional spaces has a known problem: all pairs tend to cluster in a narrow range (typically 0.6-0.95 for related content). The discriminative power lives in small differences within that range, which is why relative ranking works but absolute thresholds do not. This is called the “hubness problem” in high-dimensional spaces.
Follow-up: A stakeholder asks you to build a feature that shows “confidence: 82%” to users next to each search result, based on the cosine similarity. Is this a good idea?It is a bad idea for three reasons. First, 0.82 cosine similarity does not mean 82% confidence in relevance — it is a geometric measure of vector angle proximity, not a probability. Displaying it as a percentage gives users a false sense of calibrated confidence. Second, as I mentioned, the absolute scores are not comparable across queries, so users will be confused when a “75% confident” result is actually more helpful than an “88% confident” result from a different query. Third, users do not need confidence scores to assess relevance — they need good results in good order. Google does not show relevance scores, and neither should you. If the stakeholder insists on a visual signal, I would show a relative indicator: “Best Match” for the top result, “Related” for others above a relative threshold, and hide results below the threshold entirely. This conveys the ranking information without the false precision of a number. If you absolutely must show a number, calibrate it: train a logistic regression on (cosine_similarity, query_length, result_rank) -> binary_relevance using human-labeled data, and display the calibrated probability. But this is significant additional work for marginal UX benefit.
Strong Answer:
  • Embedding caching exploits a fundamental property: embeddings are deterministic. The same text with the same model always produces the same vector. Once you have embedded a text, you never need to embed it again. In a RAG system, the same popular queries (“how do I reset my password”) are asked hundreds of times, and you pay for a new embedding each time. Caching eliminates this waste entirely.
  • Architecture: two-tier cache. Layer one is an in-memory dictionary (or Redis in multi-process deployments) keyed by md5(model_name + text). Layer two is disk-based (SQLite or a simple file cache) for persistence across restarts. On an embedding request: check memory cache first (sub-millisecond), then disk cache (1-5ms), then fall back to the API call (50-100ms). Store the result in both cache layers.
  • For query embeddings (the user’s search query), the cache hit rate in a typical product is 30-50% because users ask similar questions. For document embeddings (the corpus you are indexing), the cache hit rate should be near 100% — you only need to embed each document chunk once, and re-embedding only happens when the document content changes. The highest-impact optimization is caching document embeddings aggressively and invalidating only when the source content changes.
  • Measuring effectiveness requires four metrics: cache hit rate (target: 40%+ for queries, 95%+ for documents), cost savings (compare monthly embedding API spend before and after caching), latency improvement (cached responses are 10-100x faster than API calls), and cache freshness (are you serving stale embeddings for content that has changed?). The most important is cost savings, because that is the business justification. Track embedding_api_calls_without_cache - embedding_api_calls_with_cache monthly.
  • Cache invalidation strategy: for document embeddings, invalidate when the document content changes. Hash the document content and compare against the stored hash. For query embeddings, use a TTL of 24 hours to 7 days — queries do not change meaning over time, so long TTLs are safe. The only reason to expire query embeddings is to control cache size.
  • Common mistake: caching embeddings but not caching the API response metadata (model version, dimensions). If you upgrade from text-embedding-3-small to a new model, all cached embeddings are invalid because different models produce incompatible vector spaces. Include the model name in the cache key to prevent cross-model contamination.
Follow-up: You implement caching and hit rate is only 15% for queries, much lower than expected. What is going on?Low cache hit rate on queries means users are not asking the same questions verbatim. They are asking similar questions with different phrasings: “password reset,” “how to reset password,” “I forgot my password,” “can’t log in.” These are semantically identical but produce different cache keys because the exact text differs. Two fixes. First, normalize queries before hashing: lowercase, strip punctuation, collapse whitespace, and optionally stem words. “How to Reset Password?” and “how to reset password” should hit the same cache entry. Second, implement semantic caching: embed the query, check if any cached query embedding is within cosine similarity 0.95 of the new query. If so, return the cached embedding rather than making a new API call. This catches paraphrases that normalize differently. The trade-off is that the cache lookup itself requires a vector similarity search, which adds complexity. In practice, keep a small in-memory index (FAISS or Annoy) of the last 10,000 query embeddings and search it before hitting the API. The 1-2ms overhead of the similarity search is negligible compared to the 50-100ms API call it replaces.