
Why Vector Databases Matter

Nearly every AI product with memory or search relies on a vector database. ChatGPT’s memory, Notion AI, Perplexity’s search, GitHub Copilot’s context: all of them run vector search under the hood.
Industry Reality: Semantic search is steadily replacing keyword search, and companies building AI features need engineers who understand vector databases. This is a must-have skill.

The Mental Model

Text → Embedding Model → [0.1, 0.3, -0.2, ...] → Vector Database

Query → Embedding Model → [0.12, 0.28, -0.18, ...] → Find Similar → Results
Vector databases solve: “Find me things similar to this” at scale.
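
Under the hood, “similar” means nearest neighbors under a distance metric, most often cosine similarity between embedding vectors. A minimal illustrative sketch with toy 3-dimensional vectors (real embeddings have hundreds or thousands of dimensions):

import numpy as np

def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Cosine similarity: 1.0 = same direction, 0.0 = unrelated, -1.0 = opposite."""
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

query = np.array([0.12, 0.28, -0.18])
docs = {
    "doc_a": np.array([0.1, 0.3, -0.2]),   # points the same way as the query
    "doc_b": np.array([-0.4, 0.1, 0.9]),   # points elsewhere
}
ranked = sorted(docs.items(), key=lambda kv: cosine_similarity(query, kv[1]), reverse=True)
print(ranked[0][0])  # doc_a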

Choosing Your Database

| Database | Best For | Scale | Cost | Setup |
|----------|----------|-------|------|-------|
| pgvector | Existing Postgres apps | Millions | Free (self-host) | Add extension |
| Pinecone | Production, scale | Billions | $70/month+ | Managed |
| Chroma | Development, prototyping | Thousands | Free | pip install |
| Weaviate | Multi-modal, GraphQL | Millions | Free / Managed | Docker |
| Qdrant | Self-hosted production | Millions | Free | Docker |
Recommendation: Start with Chroma for development, then migrate to pgvector or Pinecone for production.
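
To make the development path concrete, here is a minimal Chroma sketch; the collection name and documents are placeholders, and by default Chroma embeds the text for you with its built-in embedding function:

# pip install chromadb
import chromadb

client = chromadb.Client()  # in-memory; chromadb.PersistentClient(path="./chroma") persists to disk
collection = client.create_collection(name="docs")

# Chroma computes the embeddings itself on add and on query
collection.add(
    ids=["1", "2"],
    documents=["Python is great for AI", "PostgreSQL is a powerful database"],
    metadatas=[{"topic": "python"}, {"topic": "databases"}],
)

results = collection.query(query_texts=["languages for machine learning"], n_results=1)
print(results["documents"][0][0])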

pgvector: Production-Ready PostgreSQL

Complete Setup

-- 1. Enable the pgvector extension (install pgvector on the server first)
CREATE EXTENSION IF NOT EXISTS vector;

-- 2. Create optimized table structure
CREATE TABLE documents (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    content TEXT NOT NULL,
    embedding vector(1536),  -- Match your model's dimension
    metadata JSONB DEFAULT '{}',
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW()
);

-- 3. Create a vector index (in practice, choose one of the two types)
-- IVFFlat: good balance of speed and accuracy, faster to build
CREATE INDEX idx_documents_embedding ON documents 
USING ivfflat (embedding vector_cosine_ops) WITH (lists = 100);

-- HNSW: faster, more accurate queries, but more memory and slower builds (pgvector 0.5.0+)
CREATE INDEX idx_documents_hnsw ON documents 
USING hnsw (embedding vector_cosine_ops) WITH (m = 16, ef_construction = 64);

-- Metadata index for filtering
CREATE INDEX idx_documents_metadata ON documents USING gin (metadata);

-- 4. Helper function for similarity search
CREATE OR REPLACE FUNCTION search_documents(
    query_embedding vector(1536),
    match_threshold float DEFAULT 0.7,
    match_count int DEFAULT 10,
    filter_metadata jsonb DEFAULT NULL
)
RETURNS TABLE (
    id UUID,
    content TEXT,
    metadata JSONB,
    similarity float
) AS $$
BEGIN
    RETURN QUERY
    SELECT 
        d.id,
        d.content,
        d.metadata,
        1 - (d.embedding <=> query_embedding) AS similarity
    FROM documents d
    WHERE 
        1 - (d.embedding <=> query_embedding) > match_threshold
        AND (filter_metadata IS NULL OR d.metadata @> filter_metadata)
    ORDER BY d.embedding <=> query_embedding
    LIMIT match_count;
END;
$$ LANGUAGE plpgsql;
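
The helper above isn’t exercised again below (the Python client inlines its own query), so here is a small, hypothetical sketch of calling it directly with asyncpg; the connection string and the 1536-dimensional query embedding are placeholders:

import asyncpg

async def run_search(query_embedding: list[float]):
    conn = await asyncpg.connect("postgresql://user:pass@localhost/mydb")  # placeholder DSN
    rows = await conn.fetch(
        "SELECT * FROM search_documents($1::vector, $2, $3, $4)",
        str(query_embedding),  # pgvector accepts the '[x, y, ...]' text form
        0.7,                   # match_threshold
        10,                    # match_count
        None,                  # filter_metadata (pass a JSON string to filter)
    )
    for row in rows:
        print(f"{row['similarity']:.3f}  {row['content'][:60]}")
    await conn.close()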

Production Python Integration

import asyncio
from typing import Optional, List
from dataclasses import dataclass
import asyncpg
from openai import OpenAI
import json

@dataclass
class Document:
    id: str
    content: str
    metadata: dict
    similarity: float = 0.0

class VectorStore:
    """Production pgvector store with connection pooling"""
    
    def __init__(self, database_url: str):
        self.database_url = database_url
        self.pool: Optional[asyncpg.Pool] = None
        self.openai = OpenAI()
    
    async def connect(self):
        """Initialize connection pool"""
        self.pool = await asyncpg.create_pool(
            self.database_url,
            min_size=5,
            max_size=20,
            command_timeout=60
        )
    
    async def close(self):
        """Close connection pool"""
        if self.pool:
            await self.pool.close()
    
    def _get_embedding(self, text: str) -> List[float]:
        """Get embedding from OpenAI"""
        response = self.openai.embeddings.create(
            model="text-embedding-3-small",
            input=text
        )
        return response.data[0].embedding
    
    async def add_document(
        self,
        content: str,
        metadata: Optional[dict] = None
    ) -> str:
        """Add a single document"""
        embedding = self._get_embedding(content)
        
        async with self.pool.acquire() as conn:
            row = await conn.fetchrow("""
                INSERT INTO documents (content, embedding, metadata)
                VALUES ($1, $2, $3)
                RETURNING id
            """, content, str(embedding), json.dumps(metadata or {}))
            
            return str(row['id'])
    
    async def add_documents_batch(
        self,
        documents: List[dict],
        batch_size: int = 100
    ) -> List[str]:
        """Add documents in batches for efficiency"""
        ids = []
        
        for i in range(0, len(documents), batch_size):
            batch = documents[i:i + batch_size]
            
            # Get embeddings in batch (OpenAI supports up to 2048)
            texts = [doc['content'] for doc in batch]
            response = self.openai.embeddings.create(
                model="text-embedding-3-small",
                input=texts
            )
            embeddings = [e.embedding for e in response.data]
            
            # Insert batch
            async with self.pool.acquire() as conn:
                for doc, emb in zip(batch, embeddings):
                    row = await conn.fetchrow("""
                        INSERT INTO documents (content, embedding, metadata)
                        VALUES ($1, $2, $3)
                        RETURNING id
                    """, doc['content'], str(emb), json.dumps(doc.get('metadata', {})))
                    ids.append(str(row['id']))
        
        return ids
    
    async def search(
        self,
        query: str,
        limit: int = 10,
        threshold: float = 0.7,
        filter_metadata: Optional[dict] = None
    ) -> List[Document]:
        """Semantic search with optional metadata filter"""
        query_embedding = self._get_embedding(query)
        
        sql = """
            SELECT 
                id::text,
                content,
                metadata,
                1 - (embedding <=> $1::vector) as similarity
            FROM documents
            WHERE 1 - (embedding <=> $1::vector) > $2
        """
        params = [str(query_embedding), threshold]
        
        if filter_metadata:
            sql += " AND metadata @> $3"
            params.append(json.dumps(filter_metadata))
        
        sql += f" ORDER BY embedding <=> $1::vector LIMIT {limit}"
        
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(sql, *params)
            
            return [
                Document(
                    id=row['id'],
                    content=row['content'],
                    metadata=json.loads(row['metadata']),
                    similarity=row['similarity']
                )
                for row in rows
            ]
    
    async def delete(self, document_id: str):
        """Delete a document"""
        async with self.pool.acquire() as conn:
            await conn.execute(
                "DELETE FROM documents WHERE id = $1",
                document_id
            )


# Usage
async def main():
    store = VectorStore("postgresql://user:pass@localhost/mydb")
    await store.connect()
    
    # Add documents
    await store.add_documents_batch([
        {"content": "Python is great for AI", "metadata": {"topic": "python"}},
        {"content": "JavaScript powers the web", "metadata": {"topic": "javascript"}},
        {"content": "PostgreSQL is a powerful database", "metadata": {"topic": "databases"}},
    ])
    
    # Search
    results = await store.search(
        "programming languages for machine learning",
        limit=5,
        filter_metadata={"topic": "python"}
    )
    
    for doc in results:
        print(f"{doc.similarity:.3f}: {doc.content}")
    
    await store.close()

asyncio.run(main())

Pinecone: Managed Scale

Complete Setup

from pinecone import Pinecone, ServerlessSpec
from openai import OpenAI
from typing import List, Optional
from dataclasses import dataclass
import hashlib
import time

@dataclass
class SearchResult:
    id: str
    content: str
    metadata: dict
    score: float

class PineconeVectorStore:
    """Production Pinecone integration"""
    
    def __init__(
        self,
        api_key: str,
        index_name: str,
        dimension: int = 1536,
        metric: str = "cosine"
    ):
        self.pc = Pinecone(api_key=api_key)
        self.openai = OpenAI()
        self.index_name = index_name
        self.dimension = dimension
        self.metric = metric
        
        # Create index if not exists
        if index_name not in self.pc.list_indexes().names():
            self.pc.create_index(
                name=index_name,
                dimension=dimension,
                metric=metric,
                spec=ServerlessSpec(cloud="aws", region="us-east-1")
            )
            # Wait for index to be ready
            while not self.pc.describe_index(index_name).status['ready']:
                time.sleep(1)
        
        self.index = self.pc.Index(index_name)
    
    def _get_embedding(self, text: str) -> List[float]:
        response = self.openai.embeddings.create(
            model="text-embedding-3-small",
            input=text
        )
        return response.data[0].embedding
    
    def _get_embeddings_batch(self, texts: List[str]) -> List[List[float]]:
        """Batch embedding for efficiency"""
        response = self.openai.embeddings.create(
            model="text-embedding-3-small",
            input=texts
        )
        return [e.embedding for e in response.data]
    
    def _generate_id(self, content: str) -> str:
        """Generate deterministic ID from content"""
        return hashlib.md5(content.encode()).hexdigest()
    
    def upsert(
        self,
        documents: List[dict],
        namespace: str = "",
        batch_size: int = 100
    ):
        """Upsert documents with batching"""
        for i in range(0, len(documents), batch_size):
            batch = documents[i:i + batch_size]
            texts = [doc['content'] for doc in batch]
            embeddings = self._get_embeddings_batch(texts)
            
            vectors = []
            for doc, emb in zip(batch, embeddings):
                vectors.append({
                    "id": doc.get('id', self._generate_id(doc['content'])),
                    "values": emb,
                    "metadata": {
                        "content": doc['content'],
                        **doc.get('metadata', {})
                    }
                })
            
            self.index.upsert(vectors=vectors, namespace=namespace)
    
    def search(
        self,
        query: str,
        top_k: int = 10,
        namespace: str = "",
        filter: Optional[dict] = None,
        include_content: bool = True
    ) -> List[SearchResult]:
        """Semantic search with optional filters"""
        query_embedding = self._get_embedding(query)
        
        results = self.index.query(
            vector=query_embedding,
            top_k=top_k,
            namespace=namespace,
            filter=filter,
            include_metadata=True
        )
        
        return [
            SearchResult(
                id=match.id,
                content=match.metadata.get('content', '') if include_content else '',
                metadata={k: v for k, v in match.metadata.items() if k != 'content'},
                score=match.score
            )
            for match in results.matches
        ]
    
    def delete(self, ids: List[str], namespace: str = ""):
        """Delete vectors by ID"""
        self.index.delete(ids=ids, namespace=namespace)
    
    def delete_all(self, namespace: str = ""):
        """Delete all vectors in namespace"""
        self.index.delete(delete_all=True, namespace=namespace)
    
    def get_stats(self) -> dict:
        """Get index statistics"""
        return self.index.describe_index_stats()


# Usage
store = PineconeVectorStore(
    api_key="your-api-key",
    index_name="my-app-vectors"
)

# Multi-tenant with namespaces
store.upsert([
    {"content": "User's private document 1", "metadata": {"type": "note"}},
    {"content": "User's private document 2", "metadata": {"type": "article"}},
], namespace="user_123")

# Search within user's namespace
results = store.search(
    "my notes",
    top_k=5,
    namespace="user_123",
    filter={"type": {"$eq": "note"}}
)

Pinecone Filter Syntax

# Equality
filter={"category": {"$eq": "tech"}}

# Not equal
filter={"status": {"$ne": "archived"}}

# In list
filter={"tag": {"$in": ["python", "javascript"]}}

# Numeric comparison
filter={"price": {"$lt": 100}}
filter={"rating": {"$gte": 4.0}}

# Compound filters
filter={
    "$and": [
        {"category": {"$eq": "tech"}},
        {"price": {"$lt": 100}}
    ]
}

filter={
    "$or": [
        {"priority": {"$eq": "high"}},
        {"due_date": {"$lt": "2024-01-01"}}
    ]
}

Chunking: The Art of Splitting

Bad chunking = bad search results. This is where most RAG systems fail.

Smart Chunking System

from dataclasses import dataclass
from typing import List, Optional
import re

@dataclass
class Chunk:
    content: str
    metadata: dict
    index: int
    total_chunks: int

class SmartChunker:
    """Context-aware document chunking"""
    
    def __init__(
        self,
        chunk_size: int = 1000,
        chunk_overlap: int = 200,
        min_chunk_size: int = 100
    ):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.min_chunk_size = min_chunk_size
    
    def chunk_text(
        self,
        text: str,
        doc_id: str,
        base_metadata: dict = None
    ) -> List[Chunk]:
        """Split text into overlapping chunks"""
        # Clean text
        text = self._clean_text(text)
        
        # Split by semantic boundaries
        splits = self._semantic_split(text)
        
        # Merge small chunks, split large ones
        chunks = self._normalize_chunks(splits)
        
        # Add overlap
        chunks = self._add_overlap(chunks)
        
        # Create chunk objects with metadata
        return [
            Chunk(
                content=chunk,
                metadata={
                    **(base_metadata or {}),
                    "doc_id": doc_id,
                    "chunk_index": i,
                    "total_chunks": len(chunks),
                },
                index=i,
                total_chunks=len(chunks)
            )
            for i, chunk in enumerate(chunks)
        ]
    
    def _clean_text(self, text: str) -> str:
        """Normalize whitespace and clean text"""
        text = re.sub(r'\n{3,}', '\n\n', text)
        text = re.sub(r' {2,}', ' ', text)
        return text.strip()
    
    def _semantic_split(self, text: str) -> List[str]:
        """Split on semantic boundaries"""
        # Priority: headers > paragraphs > sentences > words
        
        # Try splitting by headers first (Markdown)
        header_pattern = r'\n(?=#{1,6} )'
        parts = re.split(header_pattern, text)
        if len(parts) > 1:
            return parts
        
        # Split by double newlines (paragraphs)
        parts = text.split('\n\n')
        if len(parts) > 1:
            return parts
        
        # Split by sentences
        sentence_pattern = r'(?<=[.!?])\s+'
        parts = re.split(sentence_pattern, text)
        return parts
    
    def _normalize_chunks(self, splits: List[str]) -> List[str]:
        """Merge small chunks, split large ones"""
        normalized = []
        current = ""
        
        for split in splits:
            # If adding this split exceeds chunk_size, save current and start fresh
            if len(current) + len(split) > self.chunk_size:
                if current:
                    normalized.append(current.strip())
                current = ""
                
                # If the split itself is too large, slice it with overlap
                if len(split) > self.chunk_size:
                    for i in range(0, len(split), self.chunk_size - self.chunk_overlap):
                        normalized.append(split[i:i + self.chunk_size].strip())
                else:
                    current = split
            else:
                current = current + "\n\n" + split if current else split
        
        # Keep the final chunk unless it is too small to be useful
        if current and len(current) >= self.min_chunk_size:
            normalized.append(current.strip())
        
        return normalized
    
    def _add_overlap(self, chunks: List[str]) -> List[str]:
        """Add overlap between chunks for context continuity"""
        if len(chunks) <= 1:
            return chunks
        
        overlapped = []
        for i, chunk in enumerate(chunks):
            if i > 0:
                # Add end of previous chunk
                prev_overlap = chunks[i-1][-self.chunk_overlap:]
                chunk = prev_overlap + "\n...\n" + chunk
            
            overlapped.append(chunk)
        
        return overlapped


# Usage
chunker = SmartChunker(chunk_size=1000, chunk_overlap=200)

document = """
# Introduction to Machine Learning

Machine learning is a subset of artificial intelligence...

## Supervised Learning

In supervised learning, the algorithm learns from labeled data...

## Unsupervised Learning

Unsupervised learning finds hidden patterns in unlabeled data...
"""

chunks = chunker.chunk_text(
    document,
    doc_id="ml_intro_001",
    base_metadata={"source": "textbook", "topic": "ml"}
)

for chunk in chunks:
    print(f"Chunk {chunk.index + 1}/{chunk.total_chunks}")
    print(f"Length: {len(chunk.content)}")
    print(chunk.content[:100] + "...")
    print()

Hybrid Search: Best of Both Worlds

Combine semantic search with keyword search for better results.
from rank_bm25 import BM25Okapi
import numpy as np
from typing import List, Optional, Tuple

class HybridSearch:
    """Combine vector and keyword search"""
    
    def __init__(self, vector_store: VectorStore):
        self.vector_store = vector_store
        self.documents: List[str] = []
        self.bm25: Optional[BM25Okapi] = None
    
    def add_documents(self, documents: List[dict]):
        """Add documents to both stores"""
        # Add to vector store
        asyncio.run(self.vector_store.add_documents_batch(documents))
        
        # Build BM25 index
        self.documents = [doc['content'] for doc in documents]
        tokenized = [doc.lower().split() for doc in self.documents]
        self.bm25 = BM25Okapi(tokenized)
    
    def search(
        self,
        query: str,
        top_k: int = 10,
        vector_weight: float = 0.7,
        keyword_weight: float = 0.3
    ) -> List[Tuple[str, float]]:
        """Hybrid search with weighted combination"""
        
        # Vector search
        vector_results = asyncio.run(
            self.vector_store.search(query, limit=top_k * 2)
        )
        vector_scores = {r.content: r.similarity for r in vector_results}
        
        # Keyword search (BM25)
        tokenized_query = query.lower().split()
        bm25_scores = self.bm25.get_scores(tokenized_query)
        
        # Normalize BM25 scores to 0-1
        if max(bm25_scores) > 0:
            bm25_scores = bm25_scores / max(bm25_scores)
        
        bm25_dict = {
            doc: score 
            for doc, score in zip(self.documents, bm25_scores)
        }
        
        # Combine scores with a weighted sum of the two signals
        combined_scores = {}
        all_docs = set(vector_scores.keys()) | set(bm25_dict.keys())
        
        for doc in all_docs:
            v_score = vector_scores.get(doc, 0) * vector_weight
            k_score = bm25_dict.get(doc, 0) * keyword_weight
            combined_scores[doc] = v_score + k_score
        
        # Sort and return top_k
        sorted_results = sorted(
            combined_scores.items(),
            key=lambda x: x[1],
            reverse=True
        )[:top_k]
        
        return sorted_results


# When to use hybrid search:
# - Technical documentation (exact function names + semantic meaning)
# - Legal documents (specific clause numbers + concepts)
# - Product catalogs (SKUs + descriptions)
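
The class above blends normalized scores with fixed weights. A common alternative is Reciprocal Rank Fusion (RRF), which merges result lists by rank rather than raw score, so the two score scales never need normalizing. A minimal sketch (k = 60 is the conventional constant):

from typing import Dict, List

def reciprocal_rank_fusion(result_lists: List[List[str]], k: int = 60) -> Dict[str, float]:
    """Each document scores sum(1 / (k + rank)) over every list it appears in."""
    scores: Dict[str, float] = {}
    for results in result_lists:
        for rank, doc_id in enumerate(results, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return dict(sorted(scores.items(), key=lambda kv: kv[1], reverse=True))

# Pass document IDs already ordered by each retriever
fused = reciprocal_rank_fusion([
    ["doc_a", "doc_b", "doc_c"],   # vector search order
    ["doc_b", "doc_d", "doc_a"],   # BM25 order
])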

Mini-Project: Document Q&A System

from openai import OpenAI
from typing import List
import asyncio

class DocumentQA:
    """Complete document Q&A system with citations"""
    
    def __init__(self, database_url: str):
        self.store = VectorStore(database_url)
        self.chunker = SmartChunker()
        self.openai = OpenAI()
    
    async def initialize(self):
        await self.store.connect()
    
    async def add_document(
        self,
        content: str,
        doc_id: str,
        title: str = "",
        source: str = ""
    ):
        """Process and store a document"""
        chunks = self.chunker.chunk_text(
            content,
            doc_id=doc_id,
            base_metadata={"title": title, "source": source}
        )
        
        documents = [
            {"content": chunk.content, "metadata": chunk.metadata}
            for chunk in chunks
        ]
        
        await self.store.add_documents_batch(documents)
        
        return len(chunks)
    
    async def ask(
        self,
        question: str,
        top_k: int = 5,
        include_citations: bool = True
    ) -> dict:
        """Answer question using relevant context"""
        
        # Retrieve relevant chunks
        results = await self.store.search(question, limit=top_k)
        
        if not results:
            return {
                "answer": "I couldn't find relevant information to answer your question.",
                "sources": []
            }
        
        # Build context
        context_parts = []
        sources = []
        
        for i, result in enumerate(results, 1):
            context_parts.append(f"[{i}] {result.content}")
            sources.append({
                "id": i,
                "doc_id": result.metadata.get("doc_id"),
                "title": result.metadata.get("title"),
                "chunk_index": result.metadata.get("chunk_index"),
                "similarity": result.similarity
            })
        
        context = "\n\n".join(context_parts)
        
        # Generate answer
        system_prompt = """You are a helpful assistant that answers questions based on the provided context.
        
Rules:
- Only use information from the provided context
- If the context doesn't contain the answer, say so
- Cite sources using [1], [2], etc. when referencing specific information
- Be concise but thorough"""
        
        response = self.openai.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {question}"}
            ],
            temperature=0
        )
        
        answer = response.choices[0].message.content
        
        return {
            "answer": answer,
            "sources": sources if include_citations else []
        }


# Usage
async def main():
    qa = DocumentQA("postgresql://user:pass@localhost/docs")
    await qa.initialize()
    
    # Add some documents
    await qa.add_document(
        content="Machine learning is a subset of AI...",
        doc_id="ml_guide",
        title="Machine Learning Guide",
        source="internal_docs"
    )
    
    # Ask questions
    result = await qa.ask("What is machine learning?")
    print(f"Answer: {result['answer']}")
    print(f"Sources: {result['sources']}")

asyncio.run(main())

Performance Optimization

Index Tuning for pgvector

-- Check index usage
SELECT indexname, idx_scan, idx_tup_read
FROM pg_stat_user_indexes
WHERE tablename = 'documents';

-- Tune IVFFlat lists based on data size
-- pgvector guidance: lists ≈ rows / 1000 up to ~1M rows, sqrt(rows) beyond that
-- 1M rows → lists = 1000
-- 10M rows → lists ≈ 3162

-- Increase probes for better recall (slower)
SET ivfflat.probes = 10;  -- Default is 1

-- For HNSW, tune ef_search
SET hnsw.ef_search = 100;  -- Default is 40; higher = better recall, slower queries

Embedding Caching

import hashlib
import json
from pathlib import Path
from typing import List

class EmbeddingCache:
    """Cache embeddings to reduce API costs"""
    
    def __init__(self, cache_dir: str = ".embedding_cache"):
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(exist_ok=True)
        self.memory_cache: dict = {}
    
    def _hash_text(self, text: str, model: str) -> str:
        key = f"{model}:{text}"
        return hashlib.md5(key.encode()).hexdigest()
    
    def get(self, text: str, model: str = "text-embedding-3-small") -> List[float] | None:
        cache_key = self._hash_text(text, model)
        
        # Check memory cache first
        if cache_key in self.memory_cache:
            return self.memory_cache[cache_key]
        
        # Check disk cache
        cache_file = self.cache_dir / f"{cache_key}.json"
        if cache_file.exists():
            with open(cache_file) as f:
                embedding = json.load(f)
                self.memory_cache[cache_key] = embedding
                return embedding
        
        return None
    
    def set(self, text: str, embedding: List[float], model: str = "text-embedding-3-small"):
        cache_key = self._hash_text(text, model)
        
        # Save to memory
        self.memory_cache[cache_key] = embedding
        
        # Save to disk
        cache_file = self.cache_dir / f"{cache_key}.json"
        with open(cache_file, 'w') as f:
            json.dump(embedding, f)
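
A short usage sketch of the check-then-populate flow; get_embedding_cached is a hypothetical helper, and the client and model name mirror the examples above:

from openai import OpenAI

cache = EmbeddingCache()
client = OpenAI()

def get_embedding_cached(text: str, model: str = "text-embedding-3-small") -> List[float]:
    """Return a cached embedding when available; otherwise embed via the API and cache it."""
    cached = cache.get(text, model)
    if cached is not None:
        return cached
    response = client.embeddings.create(model=model, input=text)
    embedding = response.data[0].embedding
    cache.set(text, embedding, model)
    return embedding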

Key Takeaways

pgvector for Most Apps

If you already use PostgreSQL, pgvector is the easiest path. Great for millions of vectors.

Chunking Is Critical

Bad chunks = bad search. Use semantic boundaries, overlap, and test your chunk sizes.

Hybrid Search Wins

Combine vector + keyword search for best results, especially for technical content.

Cache Everything

Embedding calls cost money and add latency. Cache aggressively; text you have already embedded should never hit the API twice.

What’s Next

RAG Systems

Build production Retrieval-Augmented Generation pipelines