Why Vector Databases Matter
Nearly every AI product with memory or search leans on a vector database. ChatGPT's memory, Notion AI, Perplexity's search, GitHub Copilot's context retrieval: all of them use vector search under the hood.
Industry Reality: Semantic search is replacing keyword search across the industry. Companies building AI features need engineers who understand vector databases; it has become a must-have skill.
The Mental Model
Text → Embedding Model → [0.1, 0.3, -0.2, ...] → Vector Database
↓
Query → Embedding Model → [0.12, 0.28, -0.18, ...] → Find Similar → Results
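Under the hood, "find similar" is a nearest-neighbor lookup over those vectors. A minimal sketch of the core idea, assuming the openai Python client and numpy (the example strings are illustrative; any embedding model works the same way):

import numpy as np
from openai import OpenAI

client = OpenAI()

def embed(text: str) -> np.ndarray:
    # Turn text into a fixed-length vector
    response = client.embeddings.create(model="text-embedding-3-small", input=text)
    return np.array(response.data[0].embedding)

def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    # 1.0 = same direction (very similar), ~0.0 = unrelated
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

document = embed("PostgreSQL is a powerful relational database")
query = embed("open-source SQL databases")
print(cosine_similarity(query, document))  # higher = more semantically similar

A vector database runs this comparison against millions of stored vectors, using an index so it never has to scan every row.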
Choosing Your Database
| Database | Best For | Scale | Cost | Setup |
|---|---|---|---|---|
| pgvector | Existing Postgres apps | Millions | Free (self-host) | Add extension |
| Pinecone | Production, scale | Billions | $70/month+ | Managed |
| Chroma | Development, prototyping | Thousands | Free | pip install |
| Weaviate | Multi-modal, GraphQL | Millions | Free/Managed | Docker |
| Qdrant | Self-hosted production | Millions | Free | Docker |
Recommendation: Start with Chroma for development, migrate to pgvector or Pinecone for production.
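For prototyping, Chroma gets you a working store in a few lines with no infrastructure. A minimal sketch assuming pip install chromadb (collection name and texts are illustrative; Chroma embeds documents with its default embedding function unless you supply one):

import chromadb

client = chromadb.Client()  # in-memory; use chromadb.PersistentClient(path="./chroma_data") to persist
collection = client.get_or_create_collection("prototype_docs")

collection.add(
    ids=["doc1", "doc2"],
    documents=["Python is great for AI", "PostgreSQL is a powerful database"],
    metadatas=[{"topic": "python"}, {"topic": "databases"}],
)

results = collection.query(query_texts=["languages for machine learning"], n_results=1)
print(results["documents"][0])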
pgvector: Production-Ready PostgreSQL
Complete Setup
-- 1. Enable the pgvector extension (the extension must be installed on the server)
CREATE EXTENSION IF NOT EXISTS vector;
-- 2. Create optimized table structure
CREATE TABLE documents (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
content TEXT NOT NULL,
embedding vector(1536), -- Match your model's dimension
metadata JSONB DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- 3. Create an ANN index (pick one of the two below; you rarely need both)
-- IVFFlat: good balance of speed and accuracy
CREATE INDEX idx_documents_embedding ON documents
USING ivfflat (embedding vector_cosine_ops) WITH (lists = 100);
-- HNSW: faster queries, more memory (requires pgvector 0.5.0+)
CREATE INDEX idx_documents_hnsw ON documents
USING hnsw (embedding vector_cosine_ops) WITH (m = 16, ef_construction = 64);
-- Metadata index for filtering
CREATE INDEX idx_documents_metadata ON documents USING gin (metadata);
-- 4. Helper function for similarity search
CREATE OR REPLACE FUNCTION search_documents(
query_embedding vector(1536),
match_threshold float DEFAULT 0.7,
match_count int DEFAULT 10,
filter_metadata jsonb DEFAULT NULL
)
RETURNS TABLE (
id UUID,
content TEXT,
metadata JSONB,
similarity float
) AS $$
BEGIN
RETURN QUERY
SELECT
d.id,
d.content,
d.metadata,
1 - (d.embedding <=> query_embedding) AS similarity
FROM documents d
WHERE
1 - (d.embedding <=> query_embedding) > match_threshold
AND (filter_metadata IS NULL OR d.metadata @> filter_metadata)
ORDER BY d.embedding <=> query_embedding
LIMIT match_count;
END;
$$ LANGUAGE plpgsql;
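A usage sketch for the helper function (the vector literal is truncated here; in practice the query embedding comes from application code, as in the Python integration below):

SELECT id, content, similarity
FROM search_documents(
    '[0.12, 0.28, -0.18, ...]'::vector(1536),   -- query embedding (truncated placeholder)
    0.75,                                       -- match_threshold
    5,                                          -- match_count
    '{"topic": "python"}'::jsonb                -- filter_metadata
);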
Production Python Integration
import asyncio
from typing import Optional, List
from dataclasses import dataclass
import asyncpg
from openai import OpenAI
import json
@dataclass
class Document:
id: str
content: str
metadata: dict
similarity: float = 0.0
class VectorStore:
"""Production pgvector store with connection pooling"""
def __init__(self, database_url: str):
self.database_url = database_url
self.pool: asyncpg.Pool = None
self.openai = OpenAI()
async def connect(self):
"""Initialize connection pool"""
self.pool = await asyncpg.create_pool(
self.database_url,
min_size=5,
max_size=20,
command_timeout=60
)
async def close(self):
"""Close connection pool"""
if self.pool:
await self.pool.close()
def _get_embedding(self, text: str) -> List[float]:
"""Get embedding from OpenAI"""
response = self.openai.embeddings.create(
model="text-embedding-3-small",
input=text
)
return response.data[0].embedding
async def add_document(
self,
content: str,
metadata: dict = None
) -> str:
"""Add a single document"""
embedding = self._get_embedding(content)
async with self.pool.acquire() as conn:
row = await conn.fetchrow("""
INSERT INTO documents (content, embedding, metadata)
VALUES ($1, $2, $3)
RETURNING id
""", content, str(embedding), json.dumps(metadata or {}))
return str(row['id'])
async def add_documents_batch(
self,
documents: List[dict],
batch_size: int = 100
) -> List[str]:
"""Add documents in batches for efficiency"""
ids = []
for i in range(0, len(documents), batch_size):
batch = documents[i:i + batch_size]
# Get embeddings in batch (OpenAI supports up to 2048)
texts = [doc['content'] for doc in batch]
response = self.openai.embeddings.create(
model="text-embedding-3-small",
input=texts
)
embeddings = [e.embedding for e in response.data]
# Insert batch
async with self.pool.acquire() as conn:
for doc, emb in zip(batch, embeddings):
row = await conn.fetchrow("""
INSERT INTO documents (content, embedding, metadata)
VALUES ($1, $2, $3)
RETURNING id
""", doc['content'], str(emb), json.dumps(doc.get('metadata', {})))
ids.append(str(row['id']))
return ids
async def search(
self,
query: str,
limit: int = 10,
threshold: float = 0.7,
filter_metadata: dict = None
) -> List[Document]:
"""Semantic search with optional metadata filter"""
query_embedding = self._get_embedding(query)
sql = """
SELECT
id::text,
content,
metadata,
1 - (embedding <=> $1::vector) as similarity
FROM documents
WHERE 1 - (embedding <=> $1::vector) > $2
"""
params = [str(query_embedding), threshold]
if filter_metadata:
sql += " AND metadata @> $3"
params.append(json.dumps(filter_metadata))
sql += f" ORDER BY embedding <=> $1::vector LIMIT {limit}"
async with self.pool.acquire() as conn:
rows = await conn.fetch(sql, *params)
return [
Document(
id=row['id'],
content=row['content'],
metadata=json.loads(row['metadata']),
similarity=row['similarity']
)
for row in rows
]
async def delete(self, document_id: str):
"""Delete a document"""
async with self.pool.acquire() as conn:
await conn.execute(
"DELETE FROM documents WHERE id = $1",
document_id
)
# Usage
async def main():
store = VectorStore("postgresql://user:pass@localhost/mydb")
await store.connect()
# Add documents
await store.add_documents_batch([
{"content": "Python is great for AI", "metadata": {"topic": "python"}},
{"content": "JavaScript powers the web", "metadata": {"topic": "javascript"}},
{"content": "PostgreSQL is a powerful database", "metadata": {"topic": "databases"}},
])
# Search
results = await store.search(
"programming languages for machine learning",
limit=5,
filter_metadata={"topic": "python"}
)
for doc in results:
print(f"{doc.similarity:.3f}: {doc.content}")
await store.close()
asyncio.run(main())
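The class above passes embeddings to Postgres as strings. An alternative, assuming the pgvector Python package (pip install pgvector), is to register a native codec on each pooled connection so Python lists or numpy arrays can be passed directly; a sketch:

import asyncpg
from pgvector.asyncpg import register_vector

async def create_pool_with_vector(database_url: str) -> asyncpg.Pool:
    async def init_connection(conn: asyncpg.Connection):
        # Teach asyncpg how to encode/decode the pgvector "vector" type
        await register_vector(conn)

    # Every connection created by the pool gets the codec
    return await asyncpg.create_pool(database_url, init=init_connection)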
Pinecone: Managed Scale
Complete Setup
from pinecone import Pinecone, ServerlessSpec
from openai import OpenAI
from typing import List, Optional
from dataclasses import dataclass
import hashlib
import time
@dataclass
class SearchResult:
id: str
content: str
metadata: dict
score: float
class PineconeVectorStore:
"""Production Pinecone integration"""
def __init__(
self,
api_key: str,
index_name: str,
dimension: int = 1536,
metric: str = "cosine"
):
self.pc = Pinecone(api_key=api_key)
self.openai = OpenAI()
self.index_name = index_name
self.dimension = dimension
self.metric = metric
# Create index if not exists
if index_name not in self.pc.list_indexes().names():
self.pc.create_index(
name=index_name,
dimension=dimension,
metric=metric,
spec=ServerlessSpec(cloud="aws", region="us-east-1")
)
# Wait for index to be ready
while not self.pc.describe_index(index_name).status['ready']:
time.sleep(1)
self.index = self.pc.Index(index_name)
def _get_embedding(self, text: str) -> List[float]:
response = self.openai.embeddings.create(
model="text-embedding-3-small",
input=text
)
return response.data[0].embedding
def _get_embeddings_batch(self, texts: List[str]) -> List[List[float]]:
"""Batch embedding for efficiency"""
response = self.openai.embeddings.create(
model="text-embedding-3-small",
input=texts
)
return [e.embedding for e in response.data]
def _generate_id(self, content: str) -> str:
"""Generate deterministic ID from content"""
return hashlib.md5(content.encode()).hexdigest()
def upsert(
self,
documents: List[dict],
namespace: str = "",
batch_size: int = 100
):
"""Upsert documents with batching"""
for i in range(0, len(documents), batch_size):
batch = documents[i:i + batch_size]
texts = [doc['content'] for doc in batch]
embeddings = self._get_embeddings_batch(texts)
vectors = []
for doc, emb in zip(batch, embeddings):
vectors.append({
"id": doc.get('id', self._generate_id(doc['content'])),
"values": emb,
"metadata": {
"content": doc['content'],
**doc.get('metadata', {})
}
})
self.index.upsert(vectors=vectors, namespace=namespace)
def search(
self,
query: str,
top_k: int = 10,
namespace: str = "",
filter: dict = None,
include_content: bool = True
) -> List[SearchResult]:
"""Semantic search with optional filters"""
query_embedding = self._get_embedding(query)
results = self.index.query(
vector=query_embedding,
top_k=top_k,
namespace=namespace,
filter=filter,
include_metadata=True
)
return [
SearchResult(
id=match.id,
content=match.metadata.get('content', '') if include_content else '',
metadata={k: v for k, v in match.metadata.items() if k != 'content'},
score=match.score
)
for match in results.matches
]
def delete(self, ids: List[str], namespace: str = ""):
"""Delete vectors by ID"""
self.index.delete(ids=ids, namespace=namespace)
def delete_all(self, namespace: str = ""):
"""Delete all vectors in namespace"""
self.index.delete(delete_all=True, namespace=namespace)
def get_stats(self) -> dict:
"""Get index statistics"""
return self.index.describe_index_stats()
# Usage
store = PineconeVectorStore(
api_key="your-api-key",
index_name="my-app-vectors"
)
# Multi-tenant with namespaces
store.upsert([
{"content": "User's private document 1", "metadata": {"type": "note"}},
{"content": "User's private document 2", "metadata": {"type": "article"}},
], namespace="user_123")
# Search within user's namespace
results = store.search(
"my notes",
top_k=5,
namespace="user_123",
filter={"type": {"$eq": "note"}}
)
Pinecone Filter Syntax
# Equality
filter={"category": {"$eq": "tech"}}
# Not equal
filter={"status": {"$ne": "archived"}}
# In list
filter={"tag": {"$in": ["python", "javascript"]}}
# Numeric comparison
filter={"price": {"$lt": 100}}
filter={"rating": {"$gte": 4.0}}
# Compound filters
filter={
"$and": [
{"category": {"$eq": "tech"}},
{"price": {"$lt": 100}}
]
}
filter={
"$or": [
{"priority": {"$eq": "high"}},
{"due_date": {"$lt": "2024-01-01"}}
]
}
Chunking: The Art of Splitting
Bad chunking = bad search results. This is where most RAG systems fail.
Smart Chunking System
from dataclasses import dataclass
from typing import List, Optional
import re
@dataclass
class Chunk:
content: str
metadata: dict
index: int
total_chunks: int
class SmartChunker:
"""Context-aware document chunking"""
def __init__(
self,
chunk_size: int = 1000,
chunk_overlap: int = 200,
min_chunk_size: int = 100
):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
self.min_chunk_size = min_chunk_size
def chunk_text(
self,
text: str,
doc_id: str,
base_metadata: dict = None
) -> List[Chunk]:
"""Split text into overlapping chunks"""
# Clean text
text = self._clean_text(text)
# Split by semantic boundaries
splits = self._semantic_split(text)
# Merge small chunks, split large ones
chunks = self._normalize_chunks(splits)
# Add overlap
chunks = self._add_overlap(chunks)
# Create chunk objects with metadata
return [
Chunk(
content=chunk,
metadata={
**(base_metadata or {}),
"doc_id": doc_id,
"chunk_index": i,
"total_chunks": len(chunks),
},
index=i,
total_chunks=len(chunks)
)
for i, chunk in enumerate(chunks)
]
def _clean_text(self, text: str) -> str:
"""Normalize whitespace and clean text"""
text = re.sub(r'\n{3,}', '\n\n', text)
text = re.sub(r' {2,}', ' ', text)
return text.strip()
def _semantic_split(self, text: str) -> List[str]:
"""Split on semantic boundaries"""
# Priority: headers > paragraphs > sentences > words
# Try splitting by headers first (Markdown)
header_pattern = r'\n(?=#{1,6} )'
parts = re.split(header_pattern, text)
if len(parts) > 1:
return parts
# Split by double newlines (paragraphs)
parts = text.split('\n\n')
if len(parts) > 1:
return parts
# Split by sentences
sentence_pattern = r'(?<=[.!?])\s+'
parts = re.split(sentence_pattern, text)
return parts
def _normalize_chunks(self, splits: List[str]) -> List[str]:
"""Merge small chunks, split large ones"""
normalized = []
current = ""
for split in splits:
# If adding this split exceeds chunk_size, save current and start new
if len(current) + len(split) > self.chunk_size:
                if current:
                    normalized.append(current.strip())
                    current = ""  # reset so already-saved text is not duplicated
# If split itself is too large, recursively split
if len(split) > self.chunk_size:
for i in range(0, len(split), self.chunk_size - self.chunk_overlap):
normalized.append(split[i:i + self.chunk_size].strip())
else:
current = split
else:
current = current + "\n\n" + split if current else split
if current and len(current) >= self.min_chunk_size:
normalized.append(current.strip())
return normalized
def _add_overlap(self, chunks: List[str]) -> List[str]:
"""Add overlap between chunks for context continuity"""
if len(chunks) <= 1:
return chunks
overlapped = []
for i, chunk in enumerate(chunks):
if i > 0:
# Add end of previous chunk
prev_overlap = chunks[i-1][-self.chunk_overlap:]
chunk = prev_overlap + "\n...\n" + chunk
overlapped.append(chunk)
return overlapped
# Usage
chunker = SmartChunker(chunk_size=1000, chunk_overlap=200)
document = """
# Introduction to Machine Learning
Machine learning is a subset of artificial intelligence...
## Supervised Learning
In supervised learning, the algorithm learns from labeled data...
## Unsupervised Learning
Unsupervised learning finds hidden patterns in unlabeled data...
"""
chunks = chunker.chunk_text(
document,
doc_id="ml_intro_001",
base_metadata={"source": "textbook", "topic": "ml"}
)
for chunk in chunks:
print(f"Chunk {chunk.index + 1}/{chunk.total_chunks}")
print(f"Length: {len(chunk.content)}")
print(chunk.content[:100] + "...")
print()
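The chunker above measures size in characters, but embedding models enforce their limits in tokens. A quick sanity check with the tiktoken package (assuming OpenAI's cl100k_base encoding and the chunks produced above):

import tiktoken

encoding = tiktoken.get_encoding("cl100k_base")  # encoding used by OpenAI embedding models

for chunk in chunks:
    token_count = len(encoding.encode(chunk.content))
    if token_count > 8191:  # input limit for text-embedding-3-small
        print(f"Chunk {chunk.index} exceeds the embedding model's limit: {token_count} tokens")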
Hybrid Search: Best of Both Worlds
Combine semantic search with keyword search for better results.
import asyncio
from rank_bm25 import BM25Okapi
import numpy as np
from typing import List, Tuple
class HybridSearch:
"""Combine vector and keyword search"""
def __init__(self, vector_store: VectorStore):
self.vector_store = vector_store
self.documents: List[str] = []
self.bm25: BM25Okapi = None
def add_documents(self, documents: List[dict]):
"""Add documents to both stores"""
# Add to vector store
asyncio.run(self.vector_store.add_documents_batch(documents))
# Build BM25 index
self.documents = [doc['content'] for doc in documents]
tokenized = [doc.lower().split() for doc in self.documents]
self.bm25 = BM25Okapi(tokenized)
def search(
self,
query: str,
top_k: int = 10,
vector_weight: float = 0.7,
keyword_weight: float = 0.3
) -> List[Tuple[str, float]]:
"""Hybrid search with weighted combination"""
# Vector search
vector_results = asyncio.run(
self.vector_store.search(query, limit=top_k * 2)
)
vector_scores = {r.content: r.similarity for r in vector_results}
# Keyword search (BM25)
tokenized_query = query.lower().split()
bm25_scores = self.bm25.get_scores(tokenized_query)
# Normalize BM25 scores to 0-1
if max(bm25_scores) > 0:
bm25_scores = bm25_scores / max(bm25_scores)
bm25_dict = {
doc: score
for doc, score in zip(self.documents, bm25_scores)
}
        # Combine the normalized scores with a weighted sum
        # (a rank-based alternative, Reciprocal Rank Fusion, is sketched after this block)
        combined_scores = {}
all_docs = set(vector_scores.keys()) | set(bm25_dict.keys())
for doc in all_docs:
v_score = vector_scores.get(doc, 0) * vector_weight
k_score = bm25_dict.get(doc, 0) * keyword_weight
combined_scores[doc] = v_score + k_score
# Sort and return top_k
sorted_results = sorted(
combined_scores.items(),
key=lambda x: x[1],
reverse=True
)[:top_k]
return sorted_results
# When to use hybrid search:
# - Technical documentation (exact function names + semantic meaning)
# - Legal documents (specific clause numbers + concepts)
# - Product catalogs (SKUs + descriptions)
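The search method above blends normalized scores with fixed weights. Reciprocal Rank Fusion is a common alternative that ignores raw scores and combines only the ranks, which avoids putting BM25 and cosine scores onto the same scale; a minimal standalone sketch (k=60 is the conventional constant):

from typing import List, Tuple

def reciprocal_rank_fusion(
    vector_ranking: List[str],
    keyword_ranking: List[str],
    k: int = 60,
    top_k: int = 10
) -> List[Tuple[str, float]]:
    """Fuse two ranked lists of documents by rank position rather than raw score."""
    scores: dict = {}
    for ranking in (vector_ranking, keyword_ranking):
        for rank, doc in enumerate(ranking):
            # Documents near the top of either list accumulate the most credit
            scores[doc] = scores.get(doc, 0.0) + 1.0 / (k + rank + 1)
    return sorted(scores.items(), key=lambda x: x[1], reverse=True)[:top_k]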
Mini-Project: Document Q&A System
from openai import OpenAI
from typing import List
import asyncio
class DocumentQA:
"""Complete document Q&A system with citations"""
def __init__(self, database_url: str):
self.store = VectorStore(database_url)
self.chunker = SmartChunker()
self.openai = OpenAI()
async def initialize(self):
await self.store.connect()
async def add_document(
self,
content: str,
doc_id: str,
title: str = "",
source: str = ""
):
"""Process and store a document"""
chunks = self.chunker.chunk_text(
content,
doc_id=doc_id,
base_metadata={"title": title, "source": source}
)
documents = [
{"content": chunk.content, "metadata": chunk.metadata}
for chunk in chunks
]
await self.store.add_documents_batch(documents)
return len(chunks)
async def ask(
self,
question: str,
top_k: int = 5,
include_citations: bool = True
) -> dict:
"""Answer question using relevant context"""
# Retrieve relevant chunks
results = await self.store.search(question, limit=top_k)
if not results:
return {
"answer": "I couldn't find relevant information to answer your question.",
"sources": []
}
# Build context
context_parts = []
sources = []
for i, result in enumerate(results, 1):
context_parts.append(f"[{i}] {result.content}")
sources.append({
"id": i,
"doc_id": result.metadata.get("doc_id"),
"title": result.metadata.get("title"),
"chunk_index": result.metadata.get("chunk_index"),
"similarity": result.similarity
})
context = "\n\n".join(context_parts)
# Generate answer
system_prompt = """You are a helpful assistant that answers questions based on the provided context.
Rules:
- Only use information from the provided context
- If the context doesn't contain the answer, say so
- Cite sources using [1], [2], etc. when referencing specific information
- Be concise but thorough"""
response = self.openai.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": f"Context:\n{context}\n\nQuestion: {question}"}
],
temperature=0
)
answer = response.choices[0].message.content
return {
"answer": answer,
"sources": sources if include_citations else []
}
# Usage
async def main():
qa = DocumentQA("postgresql://user:pass@localhost/docs")
await qa.initialize()
# Add some documents
await qa.add_document(
content="Machine learning is a subset of AI...",
doc_id="ml_guide",
title="Machine Learning Guide",
source="internal_docs"
)
# Ask questions
result = await qa.ask("What is machine learning?")
print(f"Answer: {result['answer']}")
print(f"Sources: {result['sources']}")
asyncio.run(main())
Performance Optimization
Index Tuning for pgvector
-- Check index usage
SELECT indexrelname, idx_scan, idx_tup_read
FROM pg_stat_user_indexes
WHERE relname = 'documents';
-- Tune IVFFlat lists based on data size
-- Rule: lists ≈ sqrt(num_rows)
-- 1M rows → lists = 1000
-- 10M rows → lists = 3162
-- Increase probes for better recall (slower)
SET ivfflat.probes = 10; -- Default is 1
-- For HNSW, tune ef_search (default is 40; higher = better recall, slower queries)
SET hnsw.ef_search = 100;
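To confirm the planner is actually using the approximate index rather than a sequential scan, check the query plan (the vector literal is truncated here; look for an index scan on the ivfflat or hnsw index in the output):

EXPLAIN ANALYZE
SELECT id, content
FROM documents
ORDER BY embedding <=> '[0.12, 0.28, -0.18, ...]'::vector
LIMIT 10;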
Embedding Caching
import hashlib
import json
from pathlib import Path
from typing import List
class EmbeddingCache:
"""Cache embeddings to reduce API costs"""
def __init__(self, cache_dir: str = ".embedding_cache"):
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(exist_ok=True)
self.memory_cache: dict = {}
def _hash_text(self, text: str, model: str) -> str:
key = f"{model}:{text}"
return hashlib.md5(key.encode()).hexdigest()
def get(self, text: str, model: str = "text-embedding-3-small") -> List[float] | None:
cache_key = self._hash_text(text, model)
# Check memory cache first
if cache_key in self.memory_cache:
return self.memory_cache[cache_key]
# Check disk cache
cache_file = self.cache_dir / f"{cache_key}.json"
if cache_file.exists():
with open(cache_file) as f:
embedding = json.load(f)
self.memory_cache[cache_key] = embedding
return embedding
return None
def set(self, text: str, embedding: List[float], model: str = "text-embedding-3-small"):
cache_key = self._hash_text(text, model)
# Save to memory
self.memory_cache[cache_key] = embedding
# Save to disk
cache_file = self.cache_dir / f"{cache_key}.json"
with open(cache_file, 'w') as f:
json.dump(embedding, f)
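A sketch of wiring the cache in front of the embedding call (get_embedding_cached is an illustrative helper, not part of the classes above):

from typing import List
from openai import OpenAI

cache = EmbeddingCache()
client = OpenAI()

def get_embedding_cached(text: str, model: str = "text-embedding-3-small") -> List[float]:
    # Serve repeated texts from the cache instead of calling the API again
    cached = cache.get(text, model)
    if cached is not None:
        return cached
    response = client.embeddings.create(model=model, input=text)
    embedding = response.data[0].embedding
    cache.set(text, embedding, model)
    return embedding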
Key Takeaways
pgvector for Most Apps
If you already use PostgreSQL, pgvector is the easiest path. Great for millions of vectors.
Chunking Is Critical
Bad chunks = bad search. Use semantic boundaries, overlap, and test your chunk sizes.
Hybrid Search Wins
Combine vector + keyword search for best results, especially for technical content.
Cache Everything
Embedding calls cost real money at scale. Cache aggressively so repeated content is never re-embedded.
What’s Next
RAG Systems
Build production Retrieval-Augmented Generation pipelines