Databases for AI Engineers

AI applications need databases for:
  • User data: Authentication, preferences, usage tracking
  • Vector storage: Embeddings with pgvector
  • Conversations: Chat history, messages
  • Documents: Uploaded files, chunks, metadata
This crash course covers PostgreSQL with SQLAlchemy (the most popular Python ORM).

Setup

# Install dependencies (pgvector is the Python client for the vector type below)
pip install sqlalchemy asyncpg psycopg2-binary alembic pgvector

# For async support
pip install "sqlalchemy[asyncio]"

# PostgreSQL with Docker
docker run -d \
  --name postgres \
  -e POSTGRES_USER=admin \
  -e POSTGRES_PASSWORD=password \
  -e POSTGRES_DB=aiapp \
  -p 5432:5432 \
  pgvector/pgvector:pg16
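
If the container is up, a quick Python check should print the server version. A minimal sketch, assuming the credentials from the docker run command above:

import asyncio
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine

async def check():
    engine = create_async_engine("postgresql+asyncpg://admin:password@localhost:5432/aiapp")
    async with engine.connect() as conn:
        # SELECT version() round-trips to the server and back
        print(await conn.scalar(text("SELECT version()")))
    await engine.dispose()

asyncio.run(check())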

SQLAlchemy Basics

Connection Setup

# database.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, declarative_base

# Sync connection
DATABASE_URL = "postgresql://admin:password@localhost:5432/aiapp"

engine = create_engine(DATABASE_URL, echo=True)  # echo=True for SQL logging
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()

# Dependency for FastAPI
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
The async version (used with FastAPI throughout this course):

# database.py
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from sqlalchemy.orm import declarative_base

DATABASE_URL = "postgresql+asyncpg://admin:password@localhost:5432/aiapp"

engine = create_async_engine(DATABASE_URL, echo=True)

AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)

Base = declarative_base()

# Async dependency for FastAPI
async def get_db():
    async with AsyncSessionLocal() as session:
        yield session  # the context manager closes the session

Defining Models

# models.py
from sqlalchemy import Column, String, Integer, Float, Boolean, DateTime, ForeignKey, Text, JSON
from sqlalchemy.orm import relationship
from sqlalchemy.dialects.postgresql import UUID, ARRAY
from datetime import datetime
import uuid

from database import Base

class User(Base):
    __tablename__ = "users"
    
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    email = Column(String(255), unique=True, nullable=False, index=True)
    name = Column(String(255))
    plan = Column(String(50), default="free")  # free, pro, enterprise
    api_key = Column(String(64), unique=True, index=True)
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    
    # Relationships
    documents = relationship("Document", back_populates="user", cascade="all, delete-orphan")
    conversations = relationship("Conversation", back_populates="user", cascade="all, delete-orphan")
    
    def __repr__(self):
        return f"<User {self.email}>"


class Document(Base):
    __tablename__ = "documents"
    
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    user_id = Column(UUID(as_uuid=True), ForeignKey("users.id", ondelete="CASCADE"), nullable=False)
    filename = Column(String(255), nullable=False)
    file_type = Column(String(50), nullable=False)
    file_size = Column(Integer, nullable=False)
    status = Column(String(50), default="processing")  # processing, ready, error
    # "metadata" is a reserved attribute on declarative models, so the
    # Python attribute gets a different name mapped to the "metadata" column
    doc_metadata = Column("metadata", JSON, default=dict)
    created_at = Column(DateTime, default=datetime.utcnow)
    
    # Relationships
    user = relationship("User", back_populates="documents")
    chunks = relationship("DocumentChunk", back_populates="document", cascade="all, delete-orphan")
    

class DocumentChunk(Base):
    __tablename__ = "document_chunks"
    
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    document_id = Column(UUID(as_uuid=True), ForeignKey("documents.id", ondelete="CASCADE"), nullable=False)
    content = Column(Text, nullable=False)
    chunk_index = Column(Integer, nullable=False)
    # Note: For vector column, see pgvector section below
    doc_metadata = Column("metadata", JSON, default=dict)  # "metadata" is reserved; see note above
    created_at = Column(DateTime, default=datetime.utcnow)
    
    # Relationships
    document = relationship("Document", back_populates="chunks")


class Conversation(Base):
    __tablename__ = "conversations"
    
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    user_id = Column(UUID(as_uuid=True), ForeignKey("users.id", ondelete="CASCADE"), nullable=False)
    title = Column(String(255))
    document_ids = Column(ARRAY(UUID(as_uuid=True)), default=list)
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    
    # Relationships
    user = relationship("User", back_populates="conversations")
    messages = relationship("Message", back_populates="conversation", cascade="all, delete-orphan")


class Message(Base):
    __tablename__ = "messages"
    
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    conversation_id = Column(UUID(as_uuid=True), ForeignKey("conversations.id", ondelete="CASCADE"), nullable=False)
    role = Column(String(50), nullable=False)  # user, assistant, system
    content = Column(Text, nullable=False)
    sources = Column(JSON, default=list)  # Citations
    token_count = Column(Integer, default=0)
    created_at = Column(DateTime, default=datetime.utcnow)
    
    # Relationships
    conversation = relationship("Conversation", back_populates="messages")
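
For quick local experiments you can create all of these tables straight from the metadata; Alembic (covered below) is the better path for real schema changes. A minimal sketch assuming the async database.py above:

import asyncio
from database import engine, Base
import models  # importing registers the models on Base

async def create_tables():
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

asyncio.run(create_tables())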

CRUD Operations

Create

# Sync
from sqlalchemy.orm import Session

def create_user(db: Session, email: str, name: str) -> User:
    user = User(email=email, name=name, api_key=generate_api_key())  # generate_api_key: your own helper
    db.add(user)
    db.commit()
    db.refresh(user)  # Reload with generated values (id, created_at)
    return user

# Async
async def create_user(db: AsyncSession, email: str, name: str) -> User:
    user = User(email=email, name=name, api_key=generate_api_key())
    db.add(user)
    await db.commit()
    await db.refresh(user)
    return user

# Bulk insert
async def create_chunks(db: AsyncSession, chunks: list[dict]) -> None:
    db.add_all([DocumentChunk(**chunk) for chunk in chunks])
    await db.commit()

Read

from sqlalchemy import select
from sqlalchemy.orm import selectinload

# Get by ID
async def get_user(db: AsyncSession, user_id: str) -> User | None:
    result = await db.execute(
        select(User).where(User.id == user_id)
    )
    return result.scalar_one_or_none()

# Get by field
async def get_user_by_email(db: AsyncSession, email: str) -> User | None:
    result = await db.execute(
        select(User).where(User.email == email)
    )
    return result.scalar_one_or_none()

# Get multiple
async def get_users(db: AsyncSession, skip: int = 0, limit: int = 100) -> list[User]:
    result = await db.execute(
        select(User).offset(skip).limit(limit)
    )
    return result.scalars().all()

# With eager loading (avoid N+1 queries)
async def get_user_with_documents(db: AsyncSession, user_id: str) -> User | None:
    result = await db.execute(
        select(User)
        .options(selectinload(User.documents))
        .where(User.id == user_id)
    )
    return result.scalar_one_or_none()

# Complex queries
async def get_recent_documents(
    db: AsyncSession, 
    user_id: str, 
    status: str = "ready",
    limit: int = 10
) -> list[Document]:
    result = await db.execute(
        select(Document)
        .where(Document.user_id == user_id)
        .where(Document.status == status)
        .order_by(Document.created_at.desc())
        .limit(limit)
    )
    return result.scalars().all()
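
Offset pagination usually pairs with a total count. A small sketch against the models above:

from sqlalchemy import func

async def count_documents(db: AsyncSession, user_id: str) -> int:
    result = await db.execute(
        select(func.count())
        .select_from(Document)
        .where(Document.user_id == user_id)
    )
    return result.scalar_one()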

Update

# Update single record
async def update_document_status(
    db: AsyncSession, 
    document_id: str, 
    status: str
) -> Document | None:
    result = await db.execute(
        select(Document).where(Document.id == document_id)
    )
    document = result.scalar_one_or_none()
    
    if document:
        document.status = status
        await db.commit()
        await db.refresh(document)
    
    return document

# Bulk update
from sqlalchemy import update

async def mark_all_processed(db: AsyncSession, user_id: str):
    await db.execute(
        update(Document)
        .where(Document.user_id == user_id)
        .where(Document.status == "processing")
        .values(status="ready")
    )
    await db.commit()
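
Since PostgreSQL supports RETURNING, the same bulk update can also report which rows it touched. A variant sketch (the function name is ours):

async def mark_all_processed_returning(db: AsyncSession, user_id: str) -> list:
    result = await db.execute(
        update(Document)
        .where(Document.user_id == user_id)
        .where(Document.status == "processing")
        .values(status="ready")
        .returning(Document.id)  # ids of updated rows
    )
    await db.commit()
    return list(result.scalars().all())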

Delete

# Delete single
async def delete_document(db: AsyncSession, document_id: str) -> bool:
    result = await db.execute(
        select(Document).where(Document.id == document_id)
    )
    document = result.scalar_one_or_none()
    
    if document:
        await db.delete(document)
        await db.commit()
        return True
    return False

# Bulk delete
from sqlalchemy import delete

async def delete_old_chunks(db: AsyncSession, document_id: str):
    await db.execute(
        delete(DocumentChunk).where(DocumentChunk.document_id == document_id)
    )
    await db.commit()

Repository Pattern

Organize database operations cleanly:
# repositories/user_repository.py
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from models import User

class UserRepository:
    def __init__(self, db: AsyncSession):
        self.db = db
    
    async def create(self, email: str, name: str) -> User:
        user = User(email=email, name=name)
        self.db.add(user)
        await self.db.commit()
        await self.db.refresh(user)
        return user
    
    async def get_by_id(self, user_id: str) -> User | None:
        result = await self.db.execute(
            select(User).where(User.id == user_id)
        )
        return result.scalar_one_or_none()
    
    async def get_by_api_key(self, api_key: str) -> User | None:
        result = await self.db.execute(
            select(User).where(User.api_key == api_key)
        )
        return result.scalar_one_or_none()
    
    async def get_with_documents(self, user_id: str) -> User | None:
        result = await self.db.execute(
            select(User)
            .options(selectinload(User.documents))
            .where(User.id == user_id)
        )
        return result.scalar_one_or_none()
    
    async def update_plan(self, user_id: str, plan: str) -> User | None:
        user = await self.get_by_id(user_id)
        if user:
            user.plan = plan
            await self.db.commit()
            await self.db.refresh(user)
        return user

# Usage in FastAPI
@app.get("/users/{user_id}")
async def get_user(user_id: str, db: AsyncSession = Depends(get_db)):
    repo = UserRepository(db)
    user = await repo.get_by_id(user_id)
    if not user:
        raise HTTPException(404, "User not found")
    return user

pgvector for Embeddings

Essential for AI applications with semantic search:
# Enable pgvector extension (run once)
# CREATE EXTENSION IF NOT EXISTS vector;

from sqlalchemy import Column, Index
from pgvector.sqlalchemy import Vector  # pip install pgvector
# (Text, Integer, JSON, ForeignKey, UUID, uuid, and Base as in models.py)

class DocumentChunk(Base):
    __tablename__ = "document_chunks"
    
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    document_id = Column(UUID(as_uuid=True), ForeignKey("documents.id"), nullable=False)
    content = Column(Text, nullable=False)
    embedding = Column(Vector(1536))  # OpenAI embedding dimension
    chunk_index = Column(Integer, nullable=False)
    doc_metadata = Column("metadata", JSON, default=dict)  # "metadata" is reserved; see note above
    
    # Create HNSW index for fast similarity search
    __table_args__ = (
        Index(
            'idx_chunks_embedding',
            embedding,
            postgresql_using='hnsw',
            postgresql_with={'m': 16, 'ef_construction': 64},
            postgresql_ops={'embedding': 'vector_cosine_ops'}
        ),
    )
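
The extension must exist before that index can be built. One way to ensure it from Python, reusing the async engine from database.py (a minimal sketch):

from sqlalchemy import text
from database import engine

async def init_pgvector():
    # Idempotent: safe to run at every startup
    async with engine.begin() as conn:
        await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector"))

With the extension and column in place, similarity search is an ordinary query: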
from sqlalchemy import select

async def search_similar_chunks(
    db: AsyncSession,
    query_embedding: list[float],
    user_id: str,
    limit: int = 5,
    threshold: float = 0.7
) -> list[tuple[DocumentChunk, float]]:
    """Search for similar chunks using cosine similarity"""
    
    # Calculate similarity (1 - cosine distance)
    similarity = 1 - DocumentChunk.embedding.cosine_distance(query_embedding)
    
    result = await db.execute(
        select(DocumentChunk, similarity.label('similarity'))
        .join(Document)
        .where(Document.user_id == user_id)
        .where(similarity >= threshold)
        .order_by(similarity.desc())
        .limit(limit)
    )
    
    return [(row.DocumentChunk, row.similarity) for row in result.all()]

# Alternative: L2 distance
async def search_by_l2_distance(
    db: AsyncSession,
    query_embedding: list[float],
    limit: int = 5
) -> list[DocumentChunk]:
    result = await db.execute(
        select(DocumentChunk)
        .order_by(DocumentChunk.embedding.l2_distance(query_embedding))
        .limit(limit)
    )
    return result.scalars().all()
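
HNSW recall can be tuned per transaction through pgvector's hnsw.ef_search setting: higher values give better recall at the cost of slower queries. A sketch (the value 100 is only illustrative):

from sqlalchemy import text

async def search_with_higher_recall(
    db: AsyncSession,
    query_embedding: list[float],
    limit: int = 5
) -> list[DocumentChunk]:
    # SET LOCAL applies only to the current transaction
    await db.execute(text("SET LOCAL hnsw.ef_search = 100"))
    result = await db.execute(
        select(DocumentChunk)
        .order_by(DocumentChunk.embedding.cosine_distance(query_embedding))
        .limit(limit)
    )
    return result.scalars().all()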

Complete RAG Repository

# repositories/rag_repository.py
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, delete
from openai import AsyncOpenAI
from models import Document, DocumentChunk

class RAGRepository:
    def __init__(self, db: AsyncSession):
        self.db = db
        self.openai = AsyncOpenAI()
    
    async def store_document(
        self,
        user_id: str,
        filename: str,
        chunks: list[str]
    ) -> Document:
        """Store document and its chunks with embeddings"""
        # Create document
        document = Document(
            user_id=user_id,
            filename=filename,
            file_type=filename.split('.')[-1],
            file_size=sum(len(c) for c in chunks),
            status="processing"
        )
        self.db.add(document)
        await self.db.flush()  # Get document.id without committing
        
        # Generate embeddings
        response = await self.openai.embeddings.create(
            model="text-embedding-3-small",
            input=chunks
        )
        
        # Store chunks with embeddings
        for i, (chunk, embedding_data) in enumerate(zip(chunks, response.data)):
            db_chunk = DocumentChunk(
                document_id=document.id,
                content=chunk,
                embedding=embedding_data.embedding,
                chunk_index=i
            )
            self.db.add(db_chunk)
        
        document.status = "ready"
        await self.db.commit()
        await self.db.refresh(document)
        
        return document
    
    async def search(
        self,
        user_id: str,
        query: str,
        document_ids: list[str] | None = None,
        limit: int = 5
    ) -> list[dict]:
        """Search for relevant chunks"""
        # Get query embedding
        response = await self.openai.embeddings.create(
            model="text-embedding-3-small",
            input=query
        )
        query_embedding = response.data[0].embedding
        
        # Build query
        similarity = 1 - DocumentChunk.embedding.cosine_distance(query_embedding)
        
        stmt = (
            select(
                DocumentChunk,
                Document.filename,
                similarity.label('similarity')
            )
            .join(Document)
            .where(Document.user_id == user_id)
            .where(Document.status == "ready")
        )
        
        if document_ids:
            stmt = stmt.where(Document.id.in_(document_ids))
        
        stmt = stmt.order_by(similarity.desc()).limit(limit)
        
        result = await self.db.execute(stmt)
        
        return [
            {
                "content": row.DocumentChunk.content,
                "document": row.filename,
                "similarity": float(row.similarity)
            }
            for row in result.all()
        ]
    
    async def delete_document(self, document_id: str) -> bool:
        """Delete document and all its chunks"""
        result = await self.db.execute(
            select(Document).where(Document.id == document_id)
        )
        document = result.scalar_one_or_none()
        
        if document:
            await self.db.delete(document)  # Cascades to chunks
            await self.db.commit()
            return True
        return False
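
A hypothetical end-to-end usage of the repository (the filename, chunks, and query are placeholders):

async def index_and_ask(db: AsyncSession, user_id: str):
    repo = RAGRepository(db)
    doc = await repo.store_document(user_id, "notes.txt", ["first chunk", "second chunk"])
    print(f"stored {doc.filename} ({doc.status})")
    for hit in await repo.search(user_id, "what do my notes say?"):
        print(f"{hit['document']} ({hit['similarity']:.2f}): {hit['content'][:60]}")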

Migrations with Alembic

# Initialize alembic
alembic init alembic

# Edit alembic.ini
# sqlalchemy.url = postgresql://admin:password@localhost:5432/aiapp

# Edit alembic/env.py:
#   from models import Base  # Import your models
#   target_metadata = Base.metadata

# Create migration
alembic revision --autogenerate -m "create users table"

# Run migrations
alembic upgrade head

# Rollback
alembic downgrade -1
Example migration:
# alembic/versions/001_create_users.py
"""create users table

Revision ID: 001
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import UUID

def upgrade():
    op.create_table(
        'users',
        sa.Column('id', UUID(as_uuid=True), primary_key=True),
        sa.Column('email', sa.String(255), nullable=False, unique=True),
        sa.Column('name', sa.String(255)),
        sa.Column('plan', sa.String(50), default='free'),
        sa.Column('api_key', sa.String(64), unique=True),
        sa.Column('created_at', sa.DateTime, server_default=sa.func.now()),
        sa.Column('updated_at', sa.DateTime, server_default=sa.func.now()),
    )
    op.create_index('idx_users_email', 'users', ['email'])
    op.create_index('idx_users_api_key', 'users', ['api_key'])

def downgrade():
    op.drop_table('users')
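
For the pgvector column, note that autogenerate does not always emit the Vector type's import into the migration file, so check the generated script. A hand-written sketch of such a migration (filename and revision details are hypothetical):

# alembic/versions/002_add_embeddings.py
from alembic import op
import sqlalchemy as sa
from pgvector.sqlalchemy import Vector

def upgrade():
    op.execute("CREATE EXTENSION IF NOT EXISTS vector")
    op.add_column('document_chunks', sa.Column('embedding', Vector(1536)))

def downgrade():
    op.drop_column('document_chunks', 'embedding')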

Transactions

# Automatic transaction (commit on success, rollback on error)
async def transfer_documents(
    db: AsyncSession,
    from_user_id: str,
    to_user_id: str,
    document_ids: list[str]
):
    async with db.begin():  # Transaction context
        # Update all documents
        await db.execute(
            update(Document)
            .where(Document.id.in_(document_ids))
            .where(Document.user_id == from_user_id)
            .values(user_id=to_user_id)
        )
        
        # Chunks reference documents only via document_id,
        # so they follow their documents automatically
        # Auto-commits if no exception, rolls back on error

# Manual transaction control
async def complex_operation(db: AsyncSession):
    try:
        # Multiple operations
        user = User(email="[email protected]")
        db.add(user)
        await db.flush()  # Get ID without commit
        
        document = Document(user_id=user.id, filename="test.pdf")
        db.add(document)
        
        await db.commit()  # Commit all
    except Exception as e:
        await db.rollback()  # Rollback on error
        raise
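
Savepoints (begin_nested) let one failed statement roll back without discarding the whole transaction, which suits best-effort batch imports. A sketch using the User model from above:

from sqlalchemy.exc import IntegrityError

async def import_users(db: AsyncSession, emails: list[str]) -> int:
    imported = 0
    for email in emails:
        try:
            async with db.begin_nested():  # SAVEPOINT
                db.add(User(email=email))
                await db.flush()  # unique violation surfaces here
            imported += 1
        except IntegrityError:
            pass  # duplicate email: only this savepoint is rolled back
    await db.commit()
    return imported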

Connection Pooling

from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.pool import AsyncAdaptedQueuePool

engine = create_async_engine(
    DATABASE_URL,
    poolclass=AsyncAdaptedQueuePool,
    pool_size=20,           # Connections to keep open
    max_overflow=10,        # Extra connections when pool is full
    pool_timeout=30,        # Seconds to wait for connection
    pool_recycle=1800,      # Recycle connections after 30 min
    pool_pre_ping=True,     # Verify connection before use
)

FastAPI Integration

Complete example:
# main.py
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from contextlib import asynccontextmanager

from database import engine, AsyncSessionLocal, Base
from repositories import UserRepository, RAGRepository

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Create tables on startup
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    yield
    # Cleanup on shutdown
    await engine.dispose()

app = FastAPI(lifespan=lifespan)

async def get_db():
    async with AsyncSessionLocal() as session:
        yield session

@app.post("/users")
async def create_user(email: str, name: str, db: AsyncSession = Depends(get_db)):
    repo = UserRepository(db)
    try:
        user = await repo.create(email, name)
        return {"id": str(user.id), "email": user.email}
    except IntegrityError:
        raise HTTPException(400, "Email already exists")

@app.post("/documents/upload")
async def upload_document(
    filename: str,
    content: str,
    user_id: str,
    db: AsyncSession = Depends(get_db)
):
    repo = RAGRepository(db)
    chunks = chunk_text(content)  # Your chunking logic
    document = await repo.store_document(user_id, filename, chunks)
    return {"document_id": str(document.id), "chunks": len(chunks)}

@app.post("/search")
async def search(
    query: str,
    user_id: str,
    db: AsyncSession = Depends(get_db)
):
    repo = RAGRepository(db)
    results = await repo.search(user_id, query)
    return {"results": results}

Quick Reference

Operation      Code
---------      ----
Create         db.add(obj); await db.commit()
Read one       select(Model).where(Model.id == id)
Read many      select(Model).limit(10)
Update         obj.field = value; await db.commit()
Delete         await db.delete(obj)
Eager load     .options(selectinload(Model.relation))
Transaction    async with db.begin():
You’re ready! With Python, FastAPI, and database skills, continue to the LLM Fundamentals module.