Skip to main content

Documentation Index

Fetch the complete documentation index at: https://resources.devweekends.com/llms.txt

Use this file to discover all available pages before exploring further.

Databases for AI Engineers

If the LLM is the brain of your AI application, the database is the filing cabinet that makes it useful. Without a database, every conversation starts from scratch, every document is re-processed on each request, and you have no way to tell User A’s data from User B’s. AI applications need databases for:
  • User data: Authentication, preferences, usage tracking
  • Vector storage: Embeddings with pgvector — this is the one that separates AI apps from traditional ones
  • Conversations: Chat history, messages (because users expect “remember what I said earlier”)
  • Documents: Uploaded files, chunks, metadata
This crash course covers PostgreSQL with SQLAlchemy (the most popular Python ORM). We use PostgreSQL specifically because pgvector lets you store embeddings alongside your relational data in one database, instead of running a separate vector database like Pinecone or Weaviate. One database means one connection to manage, one backup strategy, and one thing that can fail at 3 AM.

Setup

# Install dependencies
pip install sqlalchemy asyncpg psycopg2-binary alembic

# For async support
pip install sqlalchemy[asyncio]

# PostgreSQL with Docker
docker run -d \
  --name postgres \
  -e POSTGRES_USER=admin \
  -e POSTGRES_PASSWORD=password \
  -e POSTGRES_DB=aiapp \
  -p 5432:5432 \
  pgvector/pgvector:pg16

SQLAlchemy Basics

Connection Setup

# database.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, declarative_base

# Sync connection
DATABASE_URL = "postgresql://admin:password@localhost:5432/aiapp"

engine = create_engine(DATABASE_URL, echo=True)  # echo=True for SQL logging
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()

# Dependency for FastAPI
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
Async is recommended because AI applications spend most of their time waiting — waiting for the LLM API, waiting for embedding generation, waiting for vector search. Async connections let your server handle other requests during those waits instead of blocking a thread. Think of it as the difference between a restaurant with one waiter who stands at your table until your food arrives vs. one who takes your order and serves other tables while the kitchen works.
# database.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, declarative_base

# Note the "asyncpg" driver instead of "psycopg2" -- this is what enables async
DATABASE_URL = "postgresql+asyncpg://admin:password@localhost:5432/aiapp"

engine = create_async_engine(DATABASE_URL, echo=True)

AsyncSessionLocal = sessionmaker(
    engine, 
    class_=AsyncSession, 
    expire_on_commit=False
)

Base = declarative_base()

# Async dependency for FastAPI
async def get_db():
    async with AsyncSessionLocal() as session:
        try:
            yield session
        finally:
            await session.close()

Defining Models

# models.py
from sqlalchemy import Column, String, Integer, Float, Boolean, DateTime, ForeignKey, Text, JSON
from sqlalchemy.orm import relationship
from sqlalchemy.dialects.postgresql import UUID, ARRAY
from datetime import datetime
import uuid

from database import Base

class User(Base):
    __tablename__ = "users"
    
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    email = Column(String(255), unique=True, nullable=False, index=True)
    name = Column(String(255))
    plan = Column(String(50), default="free")  # free, pro, enterprise
    api_key = Column(String(64), unique=True, index=True)
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    
    # Relationships
    documents = relationship("Document", back_populates="user", cascade="all, delete-orphan")
    conversations = relationship("Conversation", back_populates="user", cascade="all, delete-orphan")
    
    def __repr__(self):
        return f"<User {self.email}>"


class Document(Base):
    __tablename__ = "documents"
    
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    user_id = Column(UUID(as_uuid=True), ForeignKey("users.id", ondelete="CASCADE"), nullable=False)
    filename = Column(String(255), nullable=False)
    file_type = Column(String(50), nullable=False)
    file_size = Column(Integer, nullable=False)
    status = Column(String(50), default="processing")  # processing, ready, error
    metadata = Column(JSON, default={})
    created_at = Column(DateTime, default=datetime.utcnow)
    
    # Relationships
    user = relationship("User", back_populates="documents")
    chunks = relationship("DocumentChunk", back_populates="document", cascade="all, delete-orphan")
    
    __table_args__ = (
        {"schema": None}  # Use default schema
    )


class DocumentChunk(Base):
    __tablename__ = "document_chunks"
    
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    document_id = Column(UUID(as_uuid=True), ForeignKey("documents.id", ondelete="CASCADE"), nullable=False)
    content = Column(Text, nullable=False)
    chunk_index = Column(Integer, nullable=False)
    # Note: For vector column, see pgvector section below
    metadata = Column(JSON, default={})
    created_at = Column(DateTime, default=datetime.utcnow)
    
    # Relationships
    document = relationship("Document", back_populates="chunks")


class Conversation(Base):
    __tablename__ = "conversations"
    
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    user_id = Column(UUID(as_uuid=True), ForeignKey("users.id", ondelete="CASCADE"), nullable=False)
    title = Column(String(255))
    document_ids = Column(ARRAY(UUID(as_uuid=True)), default=[])
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    
    # Relationships
    user = relationship("User", back_populates="conversations")
    messages = relationship("Message", back_populates="conversation", cascade="all, delete-orphan")


class Message(Base):
    __tablename__ = "messages"
    
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    conversation_id = Column(UUID(as_uuid=True), ForeignKey("conversations.id", ondelete="CASCADE"), nullable=False)
    role = Column(String(50), nullable=False)  # user, assistant, system
    content = Column(Text, nullable=False)
    sources = Column(JSON, default=[])  # Citations
    token_count = Column(Integer, default=0)
    created_at = Column(DateTime, default=datetime.utcnow)
    
    # Relationships
    conversation = relationship("Conversation", back_populates="messages")

CRUD Operations

CRUD (Create, Read, Update, Delete) is the bread and butter of database work. If you have used any web framework before, this will feel familiar. The key difference in AI applications is that you will often need to create records in bulk (hundreds of embedding chunks at once) and read with complex filters (vector similarity search with user scoping). The patterns below cover both simple and batch operations.

Create

# Sync
def create_user(db: Session, email: str, name: str) -> User:
    user = User(email=email, name=name, api_key=generate_api_key())
    db.add(user)
    db.commit()
    db.refresh(user)  # Reload with generated values (id, created_at)
    return user

# Async
async def create_user(db: AsyncSession, email: str, name: str) -> User:
    user = User(email=email, name=name, api_key=generate_api_key())
    db.add(user)
    await db.commit()
    await db.refresh(user)
    return user

# Bulk insert
async def create_chunks(db: AsyncSession, chunks: list[dict]) -> None:
    db.add_all([DocumentChunk(**chunk) for chunk in chunks])
    await db.commit()

Read

from sqlalchemy import select
from sqlalchemy.orm import selectinload

# Get by ID
async def get_user(db: AsyncSession, user_id: str) -> User | None:
    result = await db.execute(
        select(User).where(User.id == user_id)
    )
    return result.scalar_one_or_none()

# Get by field
async def get_user_by_email(db: AsyncSession, email: str) -> User | None:
    result = await db.execute(
        select(User).where(User.email == email)
    )
    return result.scalar_one_or_none()

# Get multiple
async def get_users(db: AsyncSession, skip: int = 0, limit: int = 100) -> list[User]:
    result = await db.execute(
        select(User).offset(skip).limit(limit)
    )
    return result.scalars().all()

# With eager loading (avoid N+1 queries)
# N+1 is the most common database performance bug: loading a user, then
# issuing a separate query for each of their documents (1 + N queries).
# selectinload fetches all documents in a single second query (2 total).
async def get_user_with_documents(db: AsyncSession, user_id: str) -> User | None:
    result = await db.execute(
        select(User)
        .options(selectinload(User.documents))
        .where(User.id == user_id)
    )
    return result.scalar_one_or_none()

# Complex queries
async def get_recent_documents(
    db: AsyncSession, 
    user_id: str, 
    status: str = "ready",
    limit: int = 10
) -> list[Document]:
    result = await db.execute(
        select(Document)
        .where(Document.user_id == user_id)
        .where(Document.status == status)
        .order_by(Document.created_at.desc())
        .limit(limit)
    )
    return result.scalars().all()

Update

# Update single record
async def update_document_status(
    db: AsyncSession, 
    document_id: str, 
    status: str
) -> Document | None:
    result = await db.execute(
        select(Document).where(Document.id == document_id)
    )
    document = result.scalar_one_or_none()
    
    if document:
        document.status = status
        await db.commit()
        await db.refresh(document)
    
    return document

# Bulk update
from sqlalchemy import update

async def mark_all_processed(db: AsyncSession, user_id: str):
    await db.execute(
        update(Document)
        .where(Document.user_id == user_id)
        .where(Document.status == "processing")
        .values(status="ready")
    )
    await db.commit()

Delete

# Delete single
async def delete_document(db: AsyncSession, document_id: str) -> bool:
    result = await db.execute(
        select(Document).where(Document.id == document_id)
    )
    document = result.scalar_one_or_none()
    
    if document:
        await db.delete(document)
        await db.commit()
        return True
    return False

# Bulk delete
from sqlalchemy import delete

async def delete_old_chunks(db: AsyncSession, document_id: str):
    await db.execute(
        delete(DocumentChunk).where(DocumentChunk.document_id == document_id)
    )
    await db.commit()

Repository Pattern

The repository pattern puts all database logic for a model in one place, keeping your API routes clean. Without it, you end up with SQL scattered across your route handlers, making it impossible to test database logic independently or swap storage backends. Think of it as the “one class per table” rule for database access. Organize database operations cleanly:
# repositories/user_repository.py
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from models import User

class UserRepository:
    def __init__(self, db: AsyncSession):
        self.db = db
    
    async def create(self, email: str, name: str) -> User:
        user = User(email=email, name=name)
        self.db.add(user)
        await self.db.commit()
        await self.db.refresh(user)
        return user
    
    async def get_by_id(self, user_id: str) -> User | None:
        result = await self.db.execute(
            select(User).where(User.id == user_id)
        )
        return result.scalar_one_or_none()
    
    async def get_by_api_key(self, api_key: str) -> User | None:
        result = await self.db.execute(
            select(User).where(User.api_key == api_key)
        )
        return result.scalar_one_or_none()
    
    async def get_with_documents(self, user_id: str) -> User | None:
        result = await self.db.execute(
            select(User)
            .options(selectinload(User.documents))
            .where(User.id == user_id)
        )
        return result.scalar_one_or_none()
    
    async def update_plan(self, user_id: str, plan: str) -> User | None:
        user = await self.get_by_id(user_id)
        if user:
            user.plan = plan
            await self.db.commit()
            await self.db.refresh(user)
        return user

# Usage in FastAPI
@app.get("/users/{user_id}")
async def get_user(user_id: str, db: AsyncSession = Depends(get_db)):
    repo = UserRepository(db)
    user = await repo.get_by_id(user_id)
    if not user:
        raise HTTPException(404, "User not found")
    return user

pgvector for Embeddings

This is the section that makes PostgreSQL an AI database. pgvector adds a vector column type and similarity search operators directly to Postgres, so you can store embeddings right next to your relational data. The practical benefit: a single query can find “the 5 most similar document chunks belonging to user X that were uploaded after January 1st” — try doing that with a standalone vector database that knows nothing about your user model. Essential for AI applications with semantic search:
# Enable pgvector extension (run once)
# CREATE EXTENSION IF NOT EXISTS vector;

from sqlalchemy import Column, Index
from pgvector.sqlalchemy import Vector

class DocumentChunk(Base):
    __tablename__ = "document_chunks"
    
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    document_id = Column(UUID(as_uuid=True), ForeignKey("documents.id"), nullable=False)
    content = Column(Text, nullable=False)
    embedding = Column(Vector(1536))  # OpenAI embedding dimension
    chunk_index = Column(Integer, nullable=False)
    metadata = Column(JSON, default={})
    
    # Create HNSW index for fast similarity search
    __table_args__ = (
        Index(
            'idx_chunks_embedding',
            embedding,
            postgresql_using='hnsw',
            postgresql_with={'m': 16, 'ef_construction': 64},
            postgresql_ops={'embedding': 'vector_cosine_ops'}
        ),
    )
from sqlalchemy import select, func
from pgvector.sqlalchemy import Vector

async def search_similar_chunks(
    db: AsyncSession,
    query_embedding: list[float],
    user_id: str,
    limit: int = 5,
    threshold: float = 0.7
) -> list[tuple[DocumentChunk, float]]:
    """Search for similar chunks using cosine similarity"""
    
    # Calculate similarity (1 - cosine distance)
    similarity = 1 - DocumentChunk.embedding.cosine_distance(query_embedding)
    
    result = await db.execute(
        select(DocumentChunk, similarity.label('similarity'))
        .join(Document)
        .where(Document.user_id == user_id)
        .where(similarity >= threshold)
        .order_by(similarity.desc())
        .limit(limit)
    )
    
    return [(row.DocumentChunk, row.similarity) for row in result.all()]

# Alternative: L2 distance
async def search_by_l2_distance(
    db: AsyncSession,
    query_embedding: list[float],
    limit: int = 5
) -> list[DocumentChunk]:
    result = await db.execute(
        select(DocumentChunk)
        .order_by(DocumentChunk.embedding.l2_distance(query_embedding))
        .limit(limit)
    )
    return result.scalars().all()

Complete RAG Repository

# repositories/rag_repository.py
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, delete
from openai import AsyncOpenAI
from models import Document, DocumentChunk

class RAGRepository:
    def __init__(self, db: AsyncSession):
        self.db = db
        self.openai = AsyncOpenAI()
    
    async def store_document(
        self,
        user_id: str,
        filename: str,
        chunks: list[str]
    ) -> Document:
        """Store document and its chunks with embeddings"""
        # Create document
        document = Document(
            user_id=user_id,
            filename=filename,
            file_type=filename.split('.')[-1],
            file_size=sum(len(c) for c in chunks),
            status="processing"
        )
        self.db.add(document)
        await self.db.flush()  # Get document.id without committing
        
        # Generate embeddings
        response = await self.openai.embeddings.create(
            model="text-embedding-3-small",
            input=chunks
        )
        
        # Store chunks with embeddings
        for i, (chunk, embedding_data) in enumerate(zip(chunks, response.data)):
            db_chunk = DocumentChunk(
                document_id=document.id,
                content=chunk,
                embedding=embedding_data.embedding,
                chunk_index=i
            )
            self.db.add(db_chunk)
        
        document.status = "ready"
        await self.db.commit()
        await self.db.refresh(document)
        
        return document
    
    async def search(
        self,
        user_id: str,
        query: str,
        document_ids: list[str] | None = None,
        limit: int = 5
    ) -> list[dict]:
        """Search for relevant chunks"""
        # Get query embedding
        response = await self.openai.embeddings.create(
            model="text-embedding-3-small",
            input=query
        )
        query_embedding = response.data[0].embedding
        
        # Build query
        similarity = 1 - DocumentChunk.embedding.cosine_distance(query_embedding)
        
        stmt = (
            select(
                DocumentChunk,
                Document.filename,
                similarity.label('similarity')
            )
            .join(Document)
            .where(Document.user_id == user_id)
            .where(Document.status == "ready")
        )
        
        if document_ids:
            stmt = stmt.where(Document.id.in_(document_ids))
        
        stmt = stmt.order_by(similarity.desc()).limit(limit)
        
        result = await self.db.execute(stmt)
        
        return [
            {
                "content": row.DocumentChunk.content,
                "document": row.filename,
                "similarity": float(row.similarity)
            }
            for row in result.all()
        ]
    
    async def delete_document(self, document_id: str) -> bool:
        """Delete document and all its chunks"""
        result = await self.db.execute(
            select(Document).where(Document.id == document_id)
        )
        document = result.scalar_one_or_none()
        
        if document:
            await self.db.delete(document)  # Cascades to chunks
            await self.db.commit()
            return True
        return False

Migrations with Alembic

# Initialize alembic
alembic init alembic

# Edit alembic.ini
# sqlalchemy.url = postgresql://admin:password@localhost:5432/aiapp

# Edit alembic/env.py
# alembic/env.py
from models import Base  # Import your models
target_metadata = Base.metadata
# Create migration
alembic revision --autogenerate -m "create users table"

# Run migrations
alembic upgrade head

# Rollback
alembic downgrade -1
Example migration:
# alembic/versions/001_create_users.py
"""create users table

Revision ID: 001
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import UUID

def upgrade():
    op.create_table(
        'users',
        sa.Column('id', UUID(as_uuid=True), primary_key=True),
        sa.Column('email', sa.String(255), nullable=False, unique=True),
        sa.Column('name', sa.String(255)),
        sa.Column('plan', sa.String(50), default='free'),
        sa.Column('api_key', sa.String(64), unique=True),
        sa.Column('created_at', sa.DateTime, server_default=sa.func.now()),
        sa.Column('updated_at', sa.DateTime, server_default=sa.func.now()),
    )
    op.create_index('idx_users_email', 'users', ['email'])
    op.create_index('idx_users_api_key', 'users', ['api_key'])

def downgrade():
    op.drop_table('users')

Transactions

Transactions ensure that a group of database operations either all succeed or all fail — there is no in-between state. This is critical in AI applications when you store a document and its chunks: if the chunk insertion fails halfway through, you don’t want a document record pointing to half its chunks. The db.begin() context manager handles this automatically: commit on success, rollback on any exception.
# Automatic transaction (commit on success, rollback on error)
async def transfer_documents(
    db: AsyncSession,
    from_user_id: str,
    to_user_id: str,
    document_ids: list[str]
):
    async with db.begin():  # Transaction context
        # Update all documents
        await db.execute(
            update(Document)
            .where(Document.id.in_(document_ids))
            .where(Document.user_id == from_user_id)
            .values(user_id=to_user_id)
        )
        
        # Update chunks
        await db.execute(
            update(DocumentChunk)
            .where(DocumentChunk.document_id.in_(document_ids))
            .values(user_id=to_user_id)
        )
        # Auto-commits if no exception

# Manual transaction control
async def complex_operation(db: AsyncSession):
    try:
        # Multiple operations
        user = User(email="test@example.com")
        db.add(user)
        await db.flush()  # Get ID without commit
        
        document = Document(user_id=user.id, filename="test.pdf")
        db.add(document)
        
        await db.commit()  # Commit all
    except Exception as e:
        await db.rollback()  # Rollback on error
        raise

Connection Pooling

Database connections are expensive to create (TCP handshake, authentication, SSL negotiation). A connection pool keeps a set of connections open and reuses them, like a carpool lane for database traffic. Without pooling, every API request opens and closes a connection — under load, this causes connection exhaustion and your database starts rejecting new connections entirely.
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.pool import AsyncAdaptedQueuePool

engine = create_async_engine(
    DATABASE_URL,
    poolclass=AsyncAdaptedQueuePool,
    pool_size=20,           # Baseline connections to keep warm
    max_overflow=10,        # Burst capacity (20+10=30 max connections)
    pool_timeout=30,        # How long to wait before giving up
    pool_recycle=1800,      # Replace stale connections every 30 min
    pool_pre_ping=True,     # Catch dead connections before they cause errors
    # Tip: Set pool_size to roughly 2x your expected concurrent requests.
    # For serverless, use pool_size=5 and max_overflow=15 (fewer idle connections).
)

FastAPI Integration

Complete example:
# main.py
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from contextlib import asynccontextmanager

from database import engine, AsyncSessionLocal, Base
from repositories import UserRepository, RAGRepository

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Create tables on startup
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    yield
    # Cleanup on shutdown
    await engine.dispose()

app = FastAPI(lifespan=lifespan)

async def get_db():
    async with AsyncSessionLocal() as session:
        yield session

@app.post("/users")
async def create_user(email: str, name: str, db: AsyncSession = Depends(get_db)):
    repo = UserRepository(db)
    try:
        user = await repo.create(email, name)
        return {"id": str(user.id), "email": user.email}
    except IntegrityError:
        raise HTTPException(400, "Email already exists")

@app.post("/documents/upload")
async def upload_document(
    filename: str,
    content: str,
    user_id: str,
    db: AsyncSession = Depends(get_db)
):
    repo = RAGRepository(db)
    chunks = chunk_text(content)  # Your chunking logic
    document = await repo.store_document(user_id, filename, chunks)
    return {"document_id": str(document.id), "chunks": len(chunks)}

@app.post("/search")
async def search(
    query: str,
    user_id: str,
    db: AsyncSession = Depends(get_db)
):
    repo = RAGRepository(db)
    results = await repo.search(user_id, query)
    return {"results": results}

pgvector vs. Dedicated Vector Databases

One of the most common architecture questions in AI engineering: should you use pgvector or a dedicated vector database? Here is an honest comparison.
Factorpgvector (PostgreSQL)Dedicated Vector DB (Pinecone, Qdrant, Weaviate)
Setup complexityAdd an extension to your existing PostgresNew service to deploy, monitor, and pay for
Query speed (under 1M vectors)Good (10-50ms with HNSW index)Excellent (1-10ms)
Query speed (over 10M vectors)Degrades without careful tuningDesigned for this scale
FilteringFull SQL — JOIN, WHERE, GROUP BY on relational dataLimited metadata filtering, varies by vendor
TransactionsACID transactions with your relational dataEventual consistency in most cases
CostFree (part of Postgres)$70-700+/month depending on scale
Backup and recoveryOne backup strategy for everythingSeparate backup system to manage
Hybrid queriesNatural — “find similar chunks for user X uploaded after Jan 1” in one queryRequires pre-filtering or post-filtering, often slower for complex filters
Decision framework:
  • Under 1M vectors, simple metadata filters, already using Postgres? Use pgvector. The operational simplicity wins.
  • Over 5M vectors, need sub-5ms p99 latency, minimal relational data? Use a dedicated vector DB.
  • Between 1M-5M vectors? Benchmark both with your actual queries. The answer depends on your filter complexity and latency requirements.

Common Database Pitfalls in AI Applications

Forgetting to index the user_id filter on vector search. Your similarity search query includes WHERE user_id = $1, but if user_id is not indexed, Postgres does a sequential scan before the vector search. At 100K+ rows, this turns a 20ms query into a 2-second query. Always create a composite index or ensure your HNSW index is partitioned by user. Using datetime.utcnow in SQLAlchemy defaults. The default=datetime.utcnow in the model definitions above calls utcnow() at class definition time, not at row creation time. Use default=datetime.utcnow (without parentheses, passing the function reference) or better, use server_default=sa.func.now() to let PostgreSQL handle timestamps consistently. Connection pool exhaustion under load. AI applications make slow external calls (LLM APIs, embedding APIs) while holding database connections. If your pool has 20 connections and 20 requests are all waiting on OpenAI for 5 seconds each, the 21st request gets a pool_timeout error. Either increase the pool size, release connections before making external calls, or use a task queue for long-running operations. Storing embeddings as JSON instead of vector type. Without the vector column type, you cannot use HNSW indexes or the <=> distance operator. Storing embeddings as JSONB arrays forces you to load every row into Python and compute similarity manually — which is orders of magnitude slower.

Quick Reference

OperationCode
Createdb.add(obj); await db.commit()
Read oneselect(Model).where(Model.id == id)
Read manyselect(Model).limit(10)
Updateobj.field = value; await db.commit()
Deleteawait db.delete(obj)
Eager load.options(selectinload(Model.relation))
Transactionasync with db.begin():
You’re ready! With Python, FastAPI, and database skills, continue to the LLM Fundamentals module.

Interview Deep-Dive

Strong Answer:
  • pgvector wins on operational simplicity. You have one database, one connection string, one backup strategy, one thing that pages you at 3 AM. Your vector search query can join directly against relational data: “find the 5 most similar chunks belonging to user X that were uploaded after January 1st” is a single SQL query. With a dedicated vector DB, you search vectors in one system, then join against Postgres for user filtering, which means two network round trips and application-level data stitching.
  • Dedicated vector databases (Pinecone, Weaviate, Qdrant) win on scale and specialized performance. At 10M+ vectors, they offer sub-5ms p99 latency with purpose-built indexing algorithms, horizontal sharding, and managed infrastructure. pgvector with HNSW can handle millions of vectors, but tuning m and ef_construction parameters for that scale requires expertise, and you are limited to vertical scaling on a single Postgres instance.
  • The decision framework is straightforward: under 1M vectors with existing Postgres and moderate latency requirements, use pgvector. The 20ms query time is fine for most applications, and you avoid the operational overhead of a second database. Over 5M vectors with sub-5ms latency requirements and minimal relational join needs, use a dedicated vector DB. Between 1M-5M, benchmark with your actual data and queries.
  • The hidden cost of dedicated vector DBs is data synchronization. When a user deletes a document, you need to delete its chunks from both Postgres (metadata) and Pinecone (vectors). If the Pinecone delete fails, you have orphaned vectors that return results for a deleted document. With pgvector, a single CASCADE DELETE handles both. This operational burden is real and often underestimated.
Follow-up: You choose pgvector and your similarity search query takes 800ms with 500K vectors. How do you diagnose and fix this?First, check if you have an HNSW index on the embedding column. Without it, pgvector does a sequential scan — computing cosine distance against every row. Run EXPLAIN ANALYZE on your query. If you see “Seq Scan,” create the HNSW index: CREATE INDEX ON document_chunks USING hnsw (embedding vector_cosine_ops) WITH (m=16, ef_construction=64). Second, check your WHERE filters. If you filter by user_id without a separate B-tree index on that column, Postgres might skip the HNSW index entirely. Create a composite approach: index user_id with B-tree, embeddings with HNSW, and let the query planner combine them. Third, check your ef_search parameter at query time (SET hnsw.ef_search = 40) — higher values improve recall but increase latency. At 500K vectors with a proper HNSW index, queries should be 10-30ms.
Strong Answer:
  • The N+1 problem is the most common database performance bug. You load 50 conversations with select(Conversation).limit(50) — that is 1 query. Then for each conversation, you access conversation.messages, which triggers a lazy load — that is 50 more queries. Total: 51 queries when you needed 2. At 5ms per query, that is 255ms instead of 10ms. In an AI application serving real-time chat, this latency is unacceptable.
  • Detection: enable SQLAlchemy’s echo mode (echo=True on the engine) during development and watch the SQL logs. If you see repeated SELECT statements with different IDs, you have an N+1. In production, use SQLAlchemy’s joinedload warnings or a query counter middleware that flags any request making more than 10 database queries.
  • Fix with eager loading. SQLAlchemy offers three strategies: selectinload (issues a second query with IN (id1, id2, ...) to fetch all related records at once — this is the safest default), joinedload (uses a JOIN to fetch in a single query — efficient for one-to-one relationships but can explode result sets for one-to-many), and subqueryload (uses a subquery). For conversations with messages, selectinload is the right choice: select(Conversation).options(selectinload(Conversation.messages)).limit(50). This executes 2 queries total regardless of how many conversations you load.
  • In AI applications specifically, the N+1 trap often hides in the document-chunk relationship. Loading a user’s documents, then accessing chunks for each document to run similarity search, can trigger hundreds of queries. Always eager-load the relationships you know you will access.
Follow-up: You have fixed the N+1 for conversations, but now the query returns 50 conversations with an average of 200 messages each — 10,000 message objects loaded into memory. How do you handle this?You do not need all 200 messages per conversation — you probably need the last 5-10 for display. Use a lateral join or a subquery to fetch only the N most recent messages per conversation. In SQLAlchemy, this is harder to express declaratively, so I would write it as a raw SQL query with a window function: ROW_NUMBER() OVER (PARTITION BY conversation_id ORDER BY created_at DESC) <= 10. Alternatively, load conversations first, then batch-load recent messages with a separate query filtered by conversation IDs and ordered with a LIMIT. This avoids loading 10,000 objects and instead loads 500 (50 conversations times 10 messages each).
Strong Answer:
  • Without explicit transaction management, what happens depends on your session configuration. If autocommit is off (the SQLAlchemy default), chunks 1-346 are in the session’s uncommitted buffer. The error on chunk 347 means the entire session needs to be rolled back — chunks 1-346 are discarded. The document record might have been committed earlier in a separate transaction, leaving an orphaned document with no chunks. This is the worst outcome: inconsistent state.
  • The correct design wraps the entire operation in a single transaction: create the document record, generate all embeddings, insert all chunks, and commit once. If any step fails, everything rolls back to a clean state. In SQLAlchemy: async with db.begin(): as a context manager that auto-commits on success and auto-rolls-back on exception.
  • The subtlety is that embedding generation is an external API call that should NOT be inside the transaction. You do not want to hold a database transaction open while waiting 10 seconds for OpenAI to return embeddings. The pattern is: (1) create the document with status=“processing” and commit, (2) generate embeddings outside any transaction, (3) open a new transaction, insert all chunks with embeddings, update document status to “ready”, and commit. If step 2 or 3 fails, the document stays in “processing” status and can be retried.
  • For extra robustness, use db.flush() instead of db.commit() to get the document ID without committing, then commit everything at the end. But be aware that flush still executes SQL — it just does not commit the transaction. If your embedding call fails after flush, you still need to rollback.
Follow-up: A power user uploads 50 PDFs simultaneously. Each triggers 500 chunk insertions. Your database connection pool has 20 connections and each upload holds a connection for 10+ seconds during embedding generation. What happens?Connection pool exhaustion. The 21st upload blocks on pool_timeout (default 30 seconds) and eventually throws an error. The fix is to never hold a database connection during external API calls. The pattern is: acquire connection, create document, release connection, call OpenAI for embeddings (no DB connection held), acquire connection again, insert chunks, release. In FastAPI with SQLAlchemy async, this means structuring your endpoint as multiple async with session: blocks rather than one long session. Alternatively, offload the entire upload pipeline to a background task queue (Celery, Dramatiq, or even FastAPI’s BackgroundTasks) so the API endpoint returns immediately with a “processing” status and the worker handles the slow parts without holding up the connection pool.