Databases for AI Engineers
AI applications need databases for:
- User data: Authentication, preferences, usage tracking
- Vector storage: Embeddings with pgvector
- Conversations: Chat history, messages
- Documents: Uploaded files, chunks, metadata
Setup
# Install dependencies
pip install sqlalchemy asyncpg psycopg2-binary alembic
# For async support
pip install sqlalchemy[asyncio]
# PostgreSQL with Docker
docker run -d \
  --name postgres \
  -e POSTGRES_USER=admin \
  -e POSTGRES_PASSWORD=password \
  -e POSTGRES_DB=aiapp \
  -p 5432:5432 \
  pgvector/pgvector:pg16
SQLAlchemy Basics
Connection Setup
# database.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, declarative_base
# Sync connection
DATABASE_URL = "postgresql://admin:password@localhost:5432/aiapp"
engine = create_engine(DATABASE_URL, echo=True) # echo=True for SQL logging
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
# Dependency for FastAPI
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
Async Connection (Recommended)
# database.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, declarative_base
DATABASE_URL = "postgresql+asyncpg://admin:password@localhost:5432/aiapp"
engine = create_async_engine(DATABASE_URL, echo=True)
AsyncSessionLocal = sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False
)
Base = declarative_base()
# Async dependency for FastAPI
async def get_db():
    async with AsyncSessionLocal() as session:
        try:
            yield session
        finally:
            await session.close()
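Before wiring the engine into FastAPI, it can help to confirm it actually reaches PostgreSQL. A minimal sketch, assuming the async database.py above:
# check_db.py -- quick connectivity check (assumes the async database.py above)
import asyncio
from sqlalchemy import text
from database import engine

async def main():
    async with engine.connect() as conn:
        result = await conn.execute(text("SELECT 1"))
        print(result.scalar())  # prints 1 if the connection works

asyncio.run(main())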
Defining Models
# models.py
from sqlalchemy import Column, String, Integer, Float, Boolean, DateTime, ForeignKey, Text, JSON
from sqlalchemy.orm import relationship
from sqlalchemy.dialects.postgresql import UUID, ARRAY
from datetime import datetime
import uuid
from database import Base
class User(Base):
    __tablename__ = "users"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    email = Column(String(255), unique=True, nullable=False, index=True)
    name = Column(String(255))
    plan = Column(String(50), default="free")  # free, pro, enterprise
    api_key = Column(String(64), unique=True, index=True)
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    # Relationships
    documents = relationship("Document", back_populates="user", cascade="all, delete-orphan")
    conversations = relationship("Conversation", back_populates="user", cascade="all, delete-orphan")

    def __repr__(self):
        return f"<User {self.email}>"
class Document(Base):
    __tablename__ = "documents"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    user_id = Column(UUID(as_uuid=True), ForeignKey("users.id", ondelete="CASCADE"), nullable=False)
    filename = Column(String(255), nullable=False)
    file_type = Column(String(50), nullable=False)
    file_size = Column(Integer, nullable=False)
    status = Column(String(50), default="processing")  # processing, ready, error
    # "metadata" is a reserved attribute name on declarative models, so map it
    # as doc_metadata while keeping the database column named "metadata"
    doc_metadata = Column("metadata", JSON, default=dict)
    created_at = Column(DateTime, default=datetime.utcnow)

    # Relationships
    user = relationship("User", back_populates="documents")
    chunks = relationship("DocumentChunk", back_populates="document", cascade="all, delete-orphan")

    __table_args__ = {"schema": None}  # Use default schema
class DocumentChunk(Base):
    __tablename__ = "document_chunks"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    document_id = Column(UUID(as_uuid=True), ForeignKey("documents.id", ondelete="CASCADE"), nullable=False)
    content = Column(Text, nullable=False)
    chunk_index = Column(Integer, nullable=False)
    # Note: For the vector column, see the pgvector section below
    doc_metadata = Column("metadata", JSON, default=dict)  # "metadata" attribute name is reserved
    created_at = Column(DateTime, default=datetime.utcnow)

    # Relationships
    document = relationship("Document", back_populates="chunks")
class Conversation(Base):
    __tablename__ = "conversations"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    user_id = Column(UUID(as_uuid=True), ForeignKey("users.id", ondelete="CASCADE"), nullable=False)
    title = Column(String(255))
    document_ids = Column(ARRAY(UUID(as_uuid=True)), default=list)
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    # Relationships
    user = relationship("User", back_populates="conversations")
    messages = relationship("Message", back_populates="conversation", cascade="all, delete-orphan")
class Message(Base):
    __tablename__ = "messages"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    conversation_id = Column(UUID(as_uuid=True), ForeignKey("conversations.id", ondelete="CASCADE"), nullable=False)
    role = Column(String(50), nullable=False)  # user, assistant, system
    content = Column(Text, nullable=False)
    sources = Column(JSON, default=list)  # Citations
    token_count = Column(Integer, default=0)
    created_at = Column(DateTime, default=datetime.utcnow)

    # Relationships
    conversation = relationship("Conversation", back_populates="messages")
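With the models defined, the tables can be created straight from Base.metadata. A minimal sketch (the FastAPI lifespan example later on this page does the same thing at startup):
# create_tables.py -- one-off table creation from the models above (sketch)
import asyncio
from database import engine, Base
import models  # noqa: F401  (importing registers the models on Base.metadata)

async def create_all():
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

asyncio.run(create_all())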
CRUD Operations
Create
from sqlalchemy.orm import Session
from sqlalchemy.ext.asyncio import AsyncSession
from models import User, Document, DocumentChunk

# Sync
def create_user(db: Session, email: str, name: str) -> User:
    user = User(email=email, name=name, api_key=generate_api_key())  # generate_api_key(): your own API-key helper
    db.add(user)
    db.commit()
    db.refresh(user)  # Reload with generated values (id, created_at)
    return user

# Async
async def create_user(db: AsyncSession, email: str, name: str) -> User:
    user = User(email=email, name=name, api_key=generate_api_key())
    db.add(user)
    await db.commit()
    await db.refresh(user)
    return user

# Bulk insert
async def create_chunks(db: AsyncSession, chunks: list[dict]) -> None:
    db.add_all([DocumentChunk(**chunk) for chunk in chunks])
    await db.commit()
Read
from sqlalchemy import select
from sqlalchemy.orm import selectinload
# Get by ID
async def get_user(db: AsyncSession, user_id: str) -> User | None:
    result = await db.execute(
        select(User).where(User.id == user_id)
    )
    return result.scalar_one_or_none()

# Get by field
async def get_user_by_email(db: AsyncSession, email: str) -> User | None:
    result = await db.execute(
        select(User).where(User.email == email)
    )
    return result.scalar_one_or_none()

# Get multiple
async def get_users(db: AsyncSession, skip: int = 0, limit: int = 100) -> list[User]:
    result = await db.execute(
        select(User).offset(skip).limit(limit)
    )
    return result.scalars().all()

# With eager loading (avoid N+1 queries)
async def get_user_with_documents(db: AsyncSession, user_id: str) -> User | None:
    result = await db.execute(
        select(User)
        .options(selectinload(User.documents))
        .where(User.id == user_id)
    )
    return result.scalar_one_or_none()

# Complex queries
async def get_recent_documents(
    db: AsyncSession,
    user_id: str,
    status: str = "ready",
    limit: int = 10
) -> list[Document]:
    result = await db.execute(
        select(Document)
        .where(Document.user_id == user_id)
        .where(Document.status == status)
        .order_by(Document.created_at.desc())
        .limit(limit)
    )
    return result.scalars().all()
Update
# Update single record
async def update_document_status(
    db: AsyncSession,
    document_id: str,
    status: str
) -> Document | None:
    result = await db.execute(
        select(Document).where(Document.id == document_id)
    )
    document = result.scalar_one_or_none()
    if document:
        document.status = status
        await db.commit()
        await db.refresh(document)
    return document

# Bulk update
from sqlalchemy import update

async def mark_all_processed(db: AsyncSession, user_id: str):
    await db.execute(
        update(Document)
        .where(Document.user_id == user_id)
        .where(Document.status == "processing")
        .values(status="ready")
    )
    await db.commit()
Delete
# Delete single
async def delete_document(db: AsyncSession, document_id: str) -> bool:
    result = await db.execute(
        select(Document).where(Document.id == document_id)
    )
    document = result.scalar_one_or_none()
    if document:
        await db.delete(document)
        await db.commit()
        return True
    return False

# Bulk delete
from sqlalchemy import delete

async def delete_old_chunks(db: AsyncSession, document_id: str):
    await db.execute(
        delete(DocumentChunk).where(DocumentChunk.document_id == document_id)
    )
    await db.commit()
Repository Pattern
Organize database operations cleanly:
# repositories/user_repository.py
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from models import User
class UserRepository:
    def __init__(self, db: AsyncSession):
        self.db = db

    async def create(self, email: str, name: str) -> User:
        user = User(email=email, name=name)
        self.db.add(user)
        await self.db.commit()
        await self.db.refresh(user)
        return user

    async def get_by_id(self, user_id: str) -> User | None:
        result = await self.db.execute(
            select(User).where(User.id == user_id)
        )
        return result.scalar_one_or_none()

    async def get_by_api_key(self, api_key: str) -> User | None:
        result = await self.db.execute(
            select(User).where(User.api_key == api_key)
        )
        return result.scalar_one_or_none()

    async def get_with_documents(self, user_id: str) -> User | None:
        result = await self.db.execute(
            select(User)
            .options(selectinload(User.documents))
            .where(User.id == user_id)
        )
        return result.scalar_one_or_none()

    async def update_plan(self, user_id: str, plan: str) -> User | None:
        user = await self.get_by_id(user_id)
        if user:
            user.plan = plan
            await self.db.commit()
            await self.db.refresh(user)
        return user
# Usage in FastAPI (assumes the app, Depends, HTTPException, and get_db from earlier)
@app.get("/users/{user_id}")
async def get_user(user_id: str, db: AsyncSession = Depends(get_db)):
    repo = UserRepository(db)
    user = await repo.get_by_id(user_id)
    if not user:
        raise HTTPException(404, "User not found")
    return user
pgvector for Embeddings
Essential for AI applications with semantic search:
# Enable pgvector extension (run once)
# CREATE EXTENSION IF NOT EXISTS vector;
from sqlalchemy import Column, Index
from pgvector.sqlalchemy import Vector

# Extends the DocumentChunk model from models.py with an embedding column
class DocumentChunk(Base):
    __tablename__ = "document_chunks"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    document_id = Column(UUID(as_uuid=True), ForeignKey("documents.id"), nullable=False)
    content = Column(Text, nullable=False)
    embedding = Column(Vector(1536))  # OpenAI text-embedding-3-small dimension
    chunk_index = Column(Integer, nullable=False)
    doc_metadata = Column("metadata", JSON, default=dict)  # "metadata" attribute name is reserved

    # Create HNSW index for fast similarity search
    __table_args__ = (
        Index(
            'idx_chunks_embedding',
            embedding,
            postgresql_using='hnsw',
            postgresql_with={'m': 16, 'ef_construction': 64},
            postgresql_ops={'embedding': 'vector_cosine_ops'}
        ),
    )
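The Vector type only works once the extension exists in the database. If you create tables from the app rather than through migrations, one approach is a sketch like the following, which runs CREATE EXTENSION right before create_all (init_db is just an illustrative name):
# Sketch: enable pgvector before creating tables (assumes database.py from above)
from sqlalchemy import text
from database import engine, Base

async def init_db():
    async with engine.begin() as conn:
        await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector"))
        await conn.run_sync(Base.metadata.create_all)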
Vector Similarity Search
from sqlalchemy import select, func
from pgvector.sqlalchemy import Vector

async def search_similar_chunks(
    db: AsyncSession,
    query_embedding: list[float],
    user_id: str,
    limit: int = 5,
    threshold: float = 0.7
) -> list[tuple[DocumentChunk, float]]:
    """Search for similar chunks using cosine similarity"""
    # Calculate similarity (1 - cosine distance)
    similarity = 1 - DocumentChunk.embedding.cosine_distance(query_embedding)

    result = await db.execute(
        select(DocumentChunk, similarity.label('similarity'))
        .join(Document)
        .where(Document.user_id == user_id)
        .where(similarity >= threshold)
        .order_by(similarity.desc())
        .limit(limit)
    )
    return [(row.DocumentChunk, row.similarity) for row in result.all()]

# Alternative: L2 distance
async def search_by_l2_distance(
    db: AsyncSession,
    query_embedding: list[float],
    limit: int = 5
) -> list[DocumentChunk]:
    result = await db.execute(
        select(DocumentChunk)
        .order_by(DocumentChunk.embedding.l2_distance(query_embedding))
        .limit(limit)
    )
    return result.scalars().all()
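To call the search you first embed the query text. A minimal sketch using the OpenAI embeddings API, the same model the RAG repository below uses (answer_sources is just an illustrative name):
# Sketch: embed the query, then run the similarity search defined above
from openai import AsyncOpenAI

openai_client = AsyncOpenAI()

async def answer_sources(db: AsyncSession, user_id: str, question: str):
    response = await openai_client.embeddings.create(
        model="text-embedding-3-small",
        input=question
    )
    query_embedding = response.data[0].embedding  # 1536 floats
    return await search_similar_chunks(db, query_embedding, user_id, limit=5)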
Complete RAG Repository
# repositories/rag_repository.py
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, delete
from openai import AsyncOpenAI
from models import Document, DocumentChunk
class RAGRepository:
    def __init__(self, db: AsyncSession):
        self.db = db
        self.openai = AsyncOpenAI()

    async def store_document(
        self,
        user_id: str,
        filename: str,
        chunks: list[str]
    ) -> Document:
        """Store document and its chunks with embeddings"""
        # Create document
        document = Document(
            user_id=user_id,
            filename=filename,
            file_type=filename.split('.')[-1],
            file_size=sum(len(c) for c in chunks),
            status="processing"
        )
        self.db.add(document)
        await self.db.flush()  # Get document.id without committing

        # Generate embeddings
        response = await self.openai.embeddings.create(
            model="text-embedding-3-small",
            input=chunks
        )

        # Store chunks with embeddings
        for i, (chunk, embedding_data) in enumerate(zip(chunks, response.data)):
            db_chunk = DocumentChunk(
                document_id=document.id,
                content=chunk,
                embedding=embedding_data.embedding,
                chunk_index=i
            )
            self.db.add(db_chunk)

        document.status = "ready"
        await self.db.commit()
        await self.db.refresh(document)
        return document

    async def search(
        self,
        user_id: str,
        query: str,
        document_ids: list[str] | None = None,
        limit: int = 5
    ) -> list[dict]:
        """Search for relevant chunks"""
        # Get query embedding
        response = await self.openai.embeddings.create(
            model="text-embedding-3-small",
            input=query
        )
        query_embedding = response.data[0].embedding

        # Build query
        similarity = 1 - DocumentChunk.embedding.cosine_distance(query_embedding)
        stmt = (
            select(
                DocumentChunk,
                Document.filename,
                similarity.label('similarity')
            )
            .join(Document)
            .where(Document.user_id == user_id)
            .where(Document.status == "ready")
        )
        if document_ids:
            stmt = stmt.where(Document.id.in_(document_ids))
        stmt = stmt.order_by(similarity.desc()).limit(limit)

        result = await self.db.execute(stmt)
        return [
            {
                "content": row.DocumentChunk.content,
                "document": row.filename,
                "similarity": float(row.similarity)
            }
            for row in result.all()
        ]

    async def delete_document(self, document_id: str) -> bool:
        """Delete document and all its chunks"""
        result = await self.db.execute(
            select(Document).where(Document.id == document_id)
        )
        document = result.scalar_one_or_none()
        if document:
            await self.db.delete(document)  # Cascades to chunks
            await self.db.commit()
            return True
        return False
Migrations with Alembic
# Initialize alembic
alembic init alembic
# Edit alembic.ini
# sqlalchemy.url = postgresql://admin:password@localhost:5432/aiapp
# Edit alembic/env.py
# alembic/env.py
from models import Base # Import your models
target_metadata = Base.metadata
# Create migration
alembic revision --autogenerate -m "create users table"
# Run migrations
alembic upgrade head
# Rollback
alembic downgrade -1
# alembic/versions/001_create_users.py
"""create users table

Revision ID: 001
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import UUID

# Alembic requires these module-level identifiers
revision = "001"
down_revision = None

def upgrade():
    op.create_table(
        'users',
        sa.Column('id', UUID(as_uuid=True), primary_key=True),
        sa.Column('email', sa.String(255), nullable=False, unique=True),
        sa.Column('name', sa.String(255)),
        sa.Column('plan', sa.String(50), default='free'),
        sa.Column('api_key', sa.String(64), unique=True),
        sa.Column('created_at', sa.DateTime, server_default=sa.func.now()),
        sa.Column('updated_at', sa.DateTime, server_default=sa.func.now()),
    )
    op.create_index('idx_users_email', 'users', ['email'])
    op.create_index('idx_users_api_key', 'users', ['api_key'])

def downgrade():
    op.drop_table('users')
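If the pgvector tables are managed through Alembic as well, the migration needs to enable the extension and import the Vector type explicitly. A sketch under those assumptions (the column and index names mirror the model above; adjust to whatever autogenerate produces):
# Sketch: migration for the embeddings column (assumes pgvector is installed)
from alembic import op
import sqlalchemy as sa
from pgvector.sqlalchemy import Vector

def upgrade():
    op.execute("CREATE EXTENSION IF NOT EXISTS vector")
    op.add_column('document_chunks', sa.Column('embedding', Vector(1536)))
    op.create_index(
        'idx_chunks_embedding',
        'document_chunks',
        ['embedding'],
        postgresql_using='hnsw',
        postgresql_ops={'embedding': 'vector_cosine_ops'}
    )

def downgrade():
    op.drop_index('idx_chunks_embedding', table_name='document_chunks')
    op.drop_column('document_chunks', 'embedding')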
Transactions
# Automatic transaction (commit on success, rollback on error)
async def transfer_documents(
    db: AsyncSession,
    from_user_id: str,
    to_user_id: str,
    document_ids: list[str]
):
    async with db.begin():  # Transaction context
        # Reassign the documents; chunks follow automatically because they
        # reference documents via document_id, not user_id
        await db.execute(
            update(Document)
            .where(Document.id.in_(document_ids))
            .where(Document.user_id == from_user_id)
            .values(user_id=to_user_id)
        )
    # Auto-commits if no exception

# Manual transaction control
async def complex_operation(db: AsyncSession):
    try:
        # Multiple operations
        user = User(email="[email protected]")
        db.add(user)
        await db.flush()  # Get ID without commit

        document = Document(user_id=user.id, filename="test.pdf", file_type="pdf", file_size=0)
        db.add(document)

        await db.commit()  # Commit all
    except Exception:
        await db.rollback()  # Rollback on error
        raise
Connection Pooling
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.pool import AsyncAdaptedQueuePool
engine = create_async_engine(
    DATABASE_URL,
    poolclass=AsyncAdaptedQueuePool,
    pool_size=20,         # Connections to keep open
    max_overflow=10,      # Extra connections when pool is full
    pool_timeout=30,      # Seconds to wait for a connection
    pool_recycle=1800,    # Recycle connections after 30 min
    pool_pre_ping=True,   # Verify connection before use
)
FastAPI Integration
Complete example:
# main.py
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.exc import IntegrityError
from contextlib import asynccontextmanager
from database import engine, AsyncSessionLocal, Base
from repositories import UserRepository, RAGRepository

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Create tables on startup
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    yield
    # Cleanup on shutdown
    await engine.dispose()

app = FastAPI(lifespan=lifespan)

async def get_db():
    async with AsyncSessionLocal() as session:
        yield session

@app.post("/users")
async def create_user(email: str, name: str, db: AsyncSession = Depends(get_db)):
    repo = UserRepository(db)
    try:
        user = await repo.create(email, name)
        return {"id": str(user.id), "email": user.email}
    except IntegrityError:
        raise HTTPException(400, "Email already exists")

@app.post("/documents/upload")
async def upload_document(
    filename: str,
    content: str,
    user_id: str,
    db: AsyncSession = Depends(get_db)
):
    repo = RAGRepository(db)
    chunks = chunk_text(content)  # Your chunking logic
    document = await repo.store_document(user_id, filename, chunks)
    return {"document_id": str(document.id), "chunks": len(chunks)}

@app.post("/search")
async def search(
    query: str,
    user_id: str,
    db: AsyncSession = Depends(get_db)
):
    repo = RAGRepository(db)
    results = await repo.search(user_id, query)
    return {"results": results}
Quick Reference
| Operation | Code |
|---|---|
| Create | db.add(obj); await db.commit() |
| Read one | select(Model).where(Model.id == id) |
| Read many | select(Model).limit(10) |
| Update | obj.field = value; await db.commit() |
| Delete | await db.delete(obj) |
| Eager load | .options(selectinload(Model.relation)) |
| Transaction | async with db.begin(): |
You’re ready! With Python, FastAPI, and database skills in place, continue to the LLM Fundamentals module.