Build a Complete AI Product

This is the module that makes the course worth it. You’ll build a production-ready AI application that you can deploy, show to employers, or even monetize. Think of this as the difference between a cooking class where you follow recipes and one where you run the kitchen for a night. Everything you have learned about embeddings, RAG, chunking, and cost optimization comes together here in a single, deployable product. The decisions you make — which model to call, how to chunk documents, when to cache — stop being theoretical and become things that cost you real money or delight real users.
What You’ll Build: A multi-tenant AI document assistant that lets users upload documents, ask questions, and get answers with citations. This is the architecture behind Notion AI, ChatPDF, and countless enterprise tools.

Project Overview

DocuMind AI

A SaaS document intelligence platform with:
  • 📄 Document upload and processing (PDF, DOCX, TXT)
  • 🔍 Semantic search across documents
  • 💬 AI chat with citations
  • 👥 Multi-tenant (users only see their docs)
  • 💰 Usage tracking and rate limiting
  • 🔐 Authentication and API keys

Tech Stack

| Layer | Technology |
| --- | --- |
| Frontend | Next.js 14, Tailwind, shadcn/ui |
| Backend | FastAPI, Python 3.11+ |
| Database | PostgreSQL + pgvector |
| LLM | OpenAI GPT-4o |
| Auth | Clerk or NextAuth |
| Deployment | Vercel + Railway |

Architecture

┌────────────────────────────────────────────────────────────────┐
│                         Frontend (Next.js)                      │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐       │
│  │  Upload  │  │   Chat   │  │  Search  │  │ Settings │       │
│  └────┬─────┘  └────┬─────┘  └────┬─────┘  └────┬─────┘       │
└───────┼─────────────┼─────────────┼─────────────┼──────────────┘
        │             │             │             │
        ▼             ▼             ▼             ▼
┌────────────────────────────────────────────────────────────────┐
│                       API Gateway (FastAPI)                     │
│  ┌──────────────────────────────────────────────────────────┐  │
│  │  Auth Middleware  │  Rate Limiter  │  Request Logger     │  │
│  └──────────────────────────────────────────────────────────┘  │
│                              │                                  │
│  ┌──────────┐  ┌──────────┐  │  ┌──────────┐  ┌──────────┐    │
│  │ Document │  │   Chat   │  │  │  Search  │  │   User   │    │
│  │  Router  │  │  Router  │  │  │  Router  │  │  Router  │    │
│  └────┬─────┘  └────┬─────┘  │  └────┬─────┘  └────┬─────┘    │
└───────┼─────────────┼────────┼───────┼─────────────┼───────────┘
        │             │        │       │             │
        ▼             ▼        ▼       ▼             ▼
┌──────────────────────────────────────────────────────────────┐
│                        Services Layer                         │
│  ┌────────────┐  ┌────────────┐  ┌────────────┐             │
│  │  Document  │  │    RAG     │  │   Usage    │             │
│  │ Processor  │  │   Engine   │  │  Tracker   │             │
│  └─────┬──────┘  └─────┬──────┘  └─────┬──────┘             │
└────────┼───────────────┼───────────────┼────────────────────┘
         │               │               │
         ▼               ▼               ▼
┌──────────────────────────────────────────────────────────────┐
│                      Data Layer                               │
│  ┌────────────────────┐  ┌────────────────────┐             │
│  │   PostgreSQL       │  │      Redis         │             │
│  │   + pgvector       │  │   (Cache/Queue)    │             │
│  └────────────────────┘  └────────────────────┘             │
└──────────────────────────────────────────────────────────────┘

Part 1: Project Setup

Database Schema

The schema below is designed around one core principle: every row of user data must be scoped to a tenant. In a multi-tenant SaaS, accidentally leaking one user’s documents into another user’s search results is a showstopper. That is why user_id appears on both documents and document_chunks — the duplication is intentional so that every vector search query can filter by user without an extra JOIN.

-- schema.sql

-- Users table (synced from auth provider)
-- We keep a local copy so we can enforce plan limits without
-- round-tripping to the auth provider on every request.
CREATE TABLE users (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    email VARCHAR(255) UNIQUE NOT NULL,
    name VARCHAR(255),
    plan VARCHAR(50) DEFAULT 'free',  -- free, pro, enterprise
    api_key VARCHAR(64) UNIQUE,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW()
);

-- Documents table
CREATE TABLE documents (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    filename VARCHAR(255) NOT NULL,
    file_type VARCHAR(50) NOT NULL,
    file_size INTEGER NOT NULL,
    status VARCHAR(50) DEFAULT 'processing',  -- processing, ready, error
    metadata JSONB DEFAULT '{}',
    created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_documents_user ON documents(user_id);
CREATE INDEX idx_documents_status ON documents(status);

-- Chunks table with embeddings
CREATE EXTENSION IF NOT EXISTS vector;

CREATE TABLE document_chunks (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    document_id UUID NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
    -- NOT NULL so that no chunk can exist outside a tenant scope
    user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    content TEXT NOT NULL,
    embedding vector(1536),
    chunk_index INTEGER NOT NULL,
    metadata JSONB DEFAULT '{}',
    created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_chunks_document ON document_chunks(document_id);
CREATE INDEX idx_chunks_user ON document_chunks(user_id);
-- HNSW index: m=16 gives a good recall/speed balance for datasets under ~1M rows.
-- ef_construction=64 controls build-time quality -- higher is slower to build but
-- more accurate at query time. For production with >1M chunks, benchmark with m=32.
CREATE INDEX idx_chunks_embedding ON document_chunks 
    USING hnsw (embedding vector_cosine_ops) WITH (m = 16, ef_construction = 64);

-- Conversations table
CREATE TABLE conversations (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id UUID REFERENCES users(id) ON DELETE CASCADE,
    title VARCHAR(255),
    document_ids UUID[] DEFAULT '{}',
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW()
);

-- Messages table
CREATE TABLE messages (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    conversation_id UUID REFERENCES conversations(id) ON DELETE CASCADE,
    role VARCHAR(50) NOT NULL,  -- user, assistant
    content TEXT NOT NULL,
    sources JSONB DEFAULT '[]',  -- citations
    token_count INTEGER DEFAULT 0,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_messages_conversation ON messages(conversation_id);

-- Usage tracking
CREATE TABLE usage_logs (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id UUID REFERENCES users(id) ON DELETE CASCADE,
    action VARCHAR(50) NOT NULL,  -- embed, query, chat
    tokens_used INTEGER DEFAULT 0,
    cost_cents INTEGER DEFAULT 0,
    metadata JSONB DEFAULT '{}',
    created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_usage_user_date ON usage_logs(user_id, created_at);

-- Plan limits
CREATE TABLE plan_limits (
    plan VARCHAR(50) PRIMARY KEY,
    documents_limit INTEGER,
    storage_mb INTEGER,
    monthly_queries INTEGER,
    monthly_tokens INTEGER
);

INSERT INTO plan_limits VALUES
    ('free', 10, 50, 100, 100000),
    ('pro', 100, 500, 1000, 1000000),
    ('enterprise', -1, -1, -1, -1);  -- -1 = unlimited
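
The `-1 = unlimited` convention in plan_limits needs special handling wherever limits are enforced. Here is a minimal sketch of that check, assuming usage counts have already been aggregated from usage_logs. The names `PlanLimits`, `Usage`, and `exceeded_limits` are hypothetical and not part of the course code.

```python
from dataclasses import dataclass

UNLIMITED = -1  # sentinel value used in the plan_limits table


@dataclass(frozen=True)
class PlanLimits:
    documents_limit: int
    storage_mb: int
    monthly_queries: int
    monthly_tokens: int


@dataclass(frozen=True)
class Usage:
    documents: int
    storage_mb: int
    monthly_queries: int
    monthly_tokens: int


def exceeded_limits(limits: PlanLimits, usage: Usage) -> list[str]:
    """Return the name of every limit the user has reached or passed."""
    pairs = [
        ("documents_limit", limits.documents_limit, usage.documents),
        ("storage_mb", limits.storage_mb, usage.storage_mb),
        ("monthly_queries", limits.monthly_queries, usage.monthly_queries),
        ("monthly_tokens", limits.monthly_tokens, usage.monthly_tokens),
    ]
    # A cap of -1 means "no cap", so it can never be exceeded.
    return [name for name, cap, used in pairs if cap != UNLIMITED and used >= cap]


# Values copied from the INSERT INTO plan_limits statement above
FREE = PlanLimits(10, 50, 100, 100_000)
ENTERPRISE = PlanLimits(-1, -1, -1, -1)
```

An empty list means the request may proceed. Returning the names rather than a boolean lets the API tell the user which limit they hit.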

FastAPI Backend Structure

# app/main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager

from app.routers import documents, chat, search, users
from app.core.database import init_db, close_db
from app.core.config import settings

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    await init_db()
    yield
    # Shutdown
    await close_db()

app = FastAPI(
    title="DocuMind AI",
    version="1.0.0",
    lifespan=lifespan
)

# CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.ALLOWED_ORIGINS,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Routers
app.include_router(documents.router, prefix="/api/documents", tags=["documents"])
app.include_router(chat.router, prefix="/api/chat", tags=["chat"])
app.include_router(search.router, prefix="/api/search", tags=["search"])
app.include_router(users.router, prefix="/api/users", tags=["users"])

@app.get("/health")
async def health():
    return {"status": "healthy"}

# app/core/config.py
from pydantic_settings import BaseSettings
from functools import lru_cache

class Settings(BaseSettings):
    # Database
    DATABASE_URL: str
    
    # OpenAI
    OPENAI_API_KEY: str
    
    # Auth
    CLERK_SECRET_KEY: str
    
    # Limits
    MAX_FILE_SIZE_MB: int = 10
    MAX_CHUNKS_PER_QUERY: int = 5
    
    # CORS
    ALLOWED_ORIGINS: list[str] = ["http://localhost:3000"]
    
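    # pydantic-settings v2 reads configuration from model_config;
    # the nested `class Config` form is deprecated.
    model_config = {"env_file": ".env"}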

@lru_cache()
def get_settings():
    return Settings()

settings = get_settings()

Part 2: Document Processing

# app/services/document_processor.py
from dataclasses import dataclass
from typing import List
import asyncio
from pathlib import Path
import hashlib

from openai import OpenAI
import pypdf
from docx import Document as DocxDocument

from app.core.database import get_db
from app.models.document import Document, DocumentChunk

@dataclass
class ProcessedChunk:
    content: str
    metadata: dict
    index: int

class DocumentProcessor:
    """Process and chunk documents for RAG"""
    
    def __init__(self):
        self.openai = OpenAI()
        # 1000 chars ~ 250 tokens. This is a sweet spot: large enough to
        # carry a complete idea, small enough for precise retrieval.
        self.chunk_size = 1000
        # 200-char overlap ensures sentences at chunk boundaries aren't
        # orphaned. Think of it like overlapping puzzle pieces.
        self.chunk_overlap = 200
    
    async def process_document(
        self,
        file_path: Path,
        document_id: str,
        user_id: str
    ) -> int:
        """Process document and store chunks with embeddings"""
        # Extract text
        text = await self._extract_text(file_path)
        
        # Chunk text
        chunks = self._chunk_text(text)
        
        # Generate embeddings in batches
        embeddings = await self._embed_chunks(chunks)
        
        # Store in database
        async with get_db() as db:
            for chunk, embedding in zip(chunks, embeddings):
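                # Pass the embedding as text and cast it to vector, matching
                # how the RAG engine sends its query embedding.
                await db.execute("""
                    INSERT INTO document_chunks 
                    (document_id, user_id, content, embedding, chunk_index, metadata)
                    VALUES ($1, $2, $3, $4::vector, $5, $6)
                """, document_id, user_id, chunk.content, 
                str(embedding), chunk.index, chunk.metadata)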
            
            # Update document status
            await db.execute("""
                UPDATE documents SET status = 'ready' WHERE id = $1
            """, document_id)
        
        return len(chunks)
    
    async def _extract_text(self, file_path: Path) -> str:
        """Extract text from various file types"""
        suffix = file_path.suffix.lower()
        
        if suffix == '.pdf':
            return await self._extract_pdf(file_path)
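        elif suffix == '.docx':
            # python-docx reads only .docx; legacy .doc files must be
            # converted before upload.
            return await self._extract_docx(file_path)
        elif suffix == '.txt':
            return file_path.read_text(encoding='utf-8', errors='replace')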
        else:
            raise ValueError(f"Unsupported file type: {suffix}")
    
    async def _extract_pdf(self, file_path: Path) -> str:
        """Extract text from PDF.
        
        Pitfall: pypdf works well for text-based PDFs but returns empty
        strings for scanned documents. If you need OCR, swap in PyMuPDF
        (fitz) or run Tesseract as a fallback when extracted text is
        suspiciously short.
        """
        def extract():
            text_parts = []
            with open(file_path, 'rb') as f:
                reader = pypdf.PdfReader(f)
                for page in reader.pages:
                    # Guard against pages that yield no text (e.g. images)
                    text_parts.append(page.extract_text() or "")
            return "\n\n".join(text_parts)
        
        # Run in a thread because PDF parsing is CPU-bound and would
        # block the async event loop otherwise.
        return await asyncio.to_thread(extract)
    
    async def _extract_docx(self, file_path: Path) -> str:
        """Extract text from DOCX"""
        def extract():
            doc = DocxDocument(file_path)
            return "\n\n".join([para.text for para in doc.paragraphs])
        
        return await asyncio.to_thread(extract)
    
    def _chunk_text(self, text: str) -> List[ProcessedChunk]:
        """Split text into overlapping chunks"""
        chunks = []
        start = 0
        index = 0
        
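        while start < len(text):
            end = min(start + self.chunk_size, len(text))
            chunk_text = text[start:end]
            
            # Try to break at sentence boundary
            if end < len(text):
                last_period = chunk_text.rfind('. ')
                if last_period > self.chunk_size * 0.5:
                    end = start + last_period + 1
                    chunk_text = text[start:end]
            
            # Skip whitespace-only chunks: they carry no content and the
            # embeddings API rejects empty input.
            if chunk_text.strip():
                chunks.append(ProcessedChunk(
                    content=chunk_text.strip(),
                    metadata={"char_start": start, "char_end": end},
                    index=index
                ))
                index += 1
            
            # Stop at the end of the text; otherwise the loop would emit
            # one more chunk that contains only the overlap.
            if end >= len(text):
                break
            start = end - self.chunk_overlap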
        
        return chunks
    
    async def _embed_chunks(
        self, 
        chunks: List[ProcessedChunk],
        batch_size: int = 100
    ) -> List[List[float]]:
        """Generate embeddings for chunks"""
        all_embeddings = []
        
        for i in range(0, len(chunks), batch_size):
            batch = chunks[i:i + batch_size]
            texts = [c.content for c in batch]
            
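            # The OpenAI client is synchronous, so run the request in a
            # worker thread to avoid blocking the event loop.
            response = await asyncio.to_thread(
                self.openai.embeddings.create,
                model="text-embedding-3-small",
                input=texts
            )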
            
            embeddings = [e.embedding for e in response.data]
            all_embeddings.extend(embeddings)
        
        return all_embeddings
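
To see what the 1000/200 settings do to a document, it can help to look at the character spans alone. The sketch below is a simplified stand-in for `_chunk_text`: it ignores the sentence-boundary adjustment, and `chunk_spans` is a hypothetical helper name.

```python
def chunk_spans(text_len: int, chunk_size: int, overlap: int) -> list[tuple[int, int]]:
    """Return (start, end) character spans for fixed-size overlapping chunks."""
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be non-negative and smaller than chunk_size")

    spans = []
    start = 0
    while start < text_len:
        end = min(start + chunk_size, text_len)
        spans.append((start, end))
        if end == text_len:
            break  # the last chunk reached the end of the text
        # Step back by `overlap` so neighbouring chunks share a margin
        start = end - overlap
    return spans


spans = chunk_spans(text_len=2500, chunk_size=1000, overlap=200)
print(spans)  # [(0, 1000), (800, 1800), (1600, 2500)]
```

Each span starts 200 characters before the previous one ends, so a sentence that straddles a boundary appears whole in at least one chunk.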

Part 3: RAG Engine

# app/services/rag_engine.py
from dataclasses import dataclass
from typing import List, Optional
import json

from openai import OpenAI

from app.core.database import get_db
from app.services.usage_tracker import UsageTracker

@dataclass
class Source:
    document_id: str
    document_name: str
    chunk_content: str
    similarity: float

@dataclass
class RAGResponse:
    answer: str
    sources: List[Source]
    tokens_used: int

class RAGEngine:
    """Production RAG engine with citations"""
    
    def __init__(self):
        self.openai = OpenAI()
        self.usage_tracker = UsageTracker()
    
    async def query(
        self,
        user_id: str,
        question: str,
        document_ids: Optional[List[str]] = None,
        top_k: int = 5,
        conversation_history: Optional[List[dict]] = None
    ) -> RAGResponse:
        """Answer question with RAG"""
        # Check usage limits
        await self.usage_tracker.check_limits(user_id, "query")
        
        # Retrieve relevant chunks
        sources = await self._retrieve(
            user_id=user_id,
            query=question,
            document_ids=document_ids,
            top_k=top_k
        )
        
        # Build context
        context = self._build_context(sources)
        
        # Generate answer
        answer, tokens = await self._generate(
            question=question,
            context=context,
            history=conversation_history
        )
        
        # Track usage
        await self.usage_tracker.log_usage(
            user_id=user_id,
            action="query",
            tokens_used=tokens
        )
        
        return RAGResponse(
            answer=answer,
            sources=sources,
            tokens_used=tokens
        )
    
    async def _retrieve(
        self,
        user_id: str,
        query: str,
        document_ids: Optional[List[str]],
        top_k: int
    ) -> List[Source]:
        """Retrieve relevant chunks"""
        # Get query embedding. Note: this client is synchronous, so the call
        # blocks the event loop while it waits; under real traffic, prefer
        # the async client.
        response = self.openai.embeddings.create(
            model="text-embedding-3-small",
            input=query
        )
        query_embedding = response.data[0].embedding
        
        # Search vector database
        async with get_db() as db:
            sql = """
                SELECT 
                    c.document_id,
                    d.filename,
                    c.content,
                    1 - (c.embedding <=> $1::vector) as similarity
                FROM document_chunks c
                JOIN documents d ON c.document_id = d.id
                WHERE c.user_id = $2
            """
            params = [str(query_embedding), user_id]
            
            # Number each placeholder from the current parameter count, so
            # LIMIT is $3 without a document filter and $4 with one.
            if document_ids:
                params.append(document_ids)
                sql += f" AND c.document_id = ANY(${len(params)})"
            
            params.append(top_k)
            sql += f"""
                ORDER BY c.embedding <=> $1::vector
                LIMIT ${len(params)}
            """
            
            rows = await db.fetch(sql, *params)
            
            return [
                Source(
                    document_id=row['document_id'],
                    document_name=row['filename'],
                    chunk_content=row['content'],
                    similarity=row['similarity']
                )
                for row in rows
            ]
    
    def _build_context(self, sources: List[Source]) -> str:
        """Build context string with source markers"""
        parts = []
        for i, source in enumerate(sources, 1):
            parts.append(f"[Source {i}: {source.document_name}]\n{source.chunk_content}")
        return "\n\n---\n\n".join(parts)
    
    async def _generate(
        self,
        question: str,
        context: str,
        history: Optional[List[dict]]
    ) -> tuple[str, int]:
        """Generate answer with GPT-4o"""
        # The system prompt is the guardrail that keeps the LLM grounded in
        # the user's documents. Without rule #3, the model will happily
        # hallucinate answers from its training data -- which destroys user
        # trust the moment they fact-check a citation that doesn't exist.
        system_prompt = """You are a helpful document assistant. Answer questions based on the provided sources.

RULES:
1. Only use information from the provided sources
2. Always cite sources using [Source N] format
3. If sources don't contain the answer, say "I couldn't find this information in your documents"
4. Be concise but thorough
5. If asked about something not in the documents, politely redirect to what you can help with"""
        
        messages = [{"role": "system", "content": system_prompt}]
        
        # Add conversation history -- keep only the last 3 exchanges (6 messages)
        # to stay well within token limits. For longer memory, swap in a
        # summarization strategy from the Context Management chapter.
        if history:
            messages.extend(history[-6:])
        
        # Add current question with context
        messages.append({
            "role": "user",
            "content": f"""Sources:
{context}

Question: {question}"""
        })
        
        response = self.openai.chat.completions.create(
            model="gpt-4o",
            messages=messages,
            temperature=0,
            max_tokens=1000
        )
        
        return (
            response.choices[0].message.content,
            response.usage.total_tokens
        )
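
The engine returns every retrieved chunk as a source, whether or not the answer cites it. One possible refinement is to parse the `[Source N]` markers out of the answer and keep only the sources that were actually cited. This is a sketch under that assumption; `cited_source_numbers` is a hypothetical helper, and the pattern also tolerates the `[Source N: filename]` form used in the context string.

```python
import re

# Matches "[Source 3]" and "[Source 3: report.pdf]"
CITATION_RE = re.compile(r"\[Source (\d+)(?::[^\]]*)?\]")


def cited_source_numbers(answer: str, num_sources: int) -> list[int]:
    """Return distinct, in-range source numbers in order of first appearance."""
    seen: list[int] = []
    for match in CITATION_RE.finditer(answer):
        n = int(match.group(1))
        # Drop numbers the model invented (outside 1..num_sources)
        if 1 <= n <= num_sources and n not in seen:
            seen.append(n)
    return seen


answer = (
    "Revenue grew 12% [Source 2]. Costs were flat [Source 1: report.pdf] "
    "and [Source 2]. See [Source 9]."
)
print(cited_source_numbers(answer, num_sources=3))  # [2, 1]
```

Because `_build_context` numbers sources from 1, source number `n` corresponds to `sources[n - 1]`. An answer that cites an out-of-range number is a useful signal to log, since it suggests the model fabricated a citation.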

Part 4: API Routes

# app/routers/chat.py
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import List, Optional

from app.core.auth import get_current_user
from app.services.rag_engine import RAGEngine
from app.core.database import get_db

router = APIRouter()
rag_engine = RAGEngine()

class ChatRequest(BaseModel):
    conversation_id: Optional[str] = None
    message: str
    document_ids: Optional[List[str]] = None

class ChatResponse(BaseModel):
    conversation_id: str
    answer: str
    sources: List[dict]
    tokens_used: int

@router.post("/", response_model=ChatResponse)
async def chat(
    request: ChatRequest,
    user = Depends(get_current_user)
):
    """Chat with your documents"""
    # Get or create conversation
    async with get_db() as db:
        if request.conversation_id:
            # Verify ownership
            conv = await db.fetchrow(
                "SELECT * FROM conversations WHERE id = $1 AND user_id = $2",
                request.conversation_id, user.id
            )
            if not conv:
                raise HTTPException(404, "Conversation not found")
            
            # Get history
            history = await db.fetch("""
                SELECT role, content FROM messages 
                WHERE conversation_id = $1 
                ORDER BY created_at
            """, request.conversation_id)
            history = [dict(row) for row in history]
        else:
            # Create new conversation
            conv = await db.fetchrow("""
                INSERT INTO conversations (user_id, document_ids)
                VALUES ($1, $2)
                RETURNING *
            """, user.id, request.document_ids or [])
            history = []
    
    # Get RAG response
    response = await rag_engine.query(
        user_id=user.id,
        question=request.message,
        document_ids=request.document_ids,
        conversation_history=history
    )
    
    # Save messages
    async with get_db() as db:
        await db.execute("""
            INSERT INTO messages (conversation_id, role, content)
            VALUES ($1, 'user', $2)
        """, conv['id'], request.message)
        
        await db.execute("""
            INSERT INTO messages (conversation_id, role, content, sources, token_count)
            VALUES ($1, 'assistant', $2, $3, $4)
        """, conv['id'], response.answer, 
            [{"doc": s.document_name, "similarity": s.similarity} for s in response.sources],
            response.tokens_used)
    
    return ChatResponse(
        conversation_id=str(conv['id']),
        answer=response.answer,
        sources=[{
            "document": s.document_name,
            "content": s.chunk_content[:200] + "...",
            "similarity": round(s.similarity, 3)
        } for s in response.sources],
        tokens_used=response.tokens_used
    )

@router.post("/stream")
async def chat_stream(
    request: ChatRequest,
    user = Depends(get_current_user)
):
    """Stream chat response"""
    async def generate():
        # ... similar to above but with streaming
        pass
    
    return StreamingResponse(
        generate(),
        media_type="text/event-stream"
    )
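
The streaming body is left as a placeholder above. As a rough illustration of the framing side only, the sketch below wraps a token stream in Server-Sent Events frames. `fake_token_stream` stands in for a streaming LLM response, and both the JSON payload shape and the `[DONE]` sentinel are assumptions rather than part of the course code.

```python
import asyncio
import json
from typing import AsyncIterator


async def fake_token_stream() -> AsyncIterator[str]:
    """Stand-in for a streaming LLM response (hypothetical)."""
    for token in ["Hello", ", ", "world"]:
        await asyncio.sleep(0)  # yield control, as a network read would
        yield token


async def sse_events(tokens: AsyncIterator[str]) -> AsyncIterator[str]:
    """Wrap each token in an SSE frame, then emit a completion marker."""
    async for token in tokens:
        # JSON-encode the token so embedded newlines cannot break framing
        payload = json.dumps({"token": token})
        yield f"data: {payload}\n\n"
    yield "data: [DONE]\n\n"


async def collect() -> list[str]:
    """Gather all frames into a list, for inspection in a test or REPL."""
    return [event async for event in sse_events(fake_token_stream())]
```

An async generator like `sse_events(...)` can be passed directly to `StreamingResponse` with `media_type="text/event-stream"`. Ownership checks and message persistence would still need to happen around it, as in the non-streaming route.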

Part 5: Frontend (Next.js)

// app/chat/page.tsx
'use client';

import { useState, useRef, useEffect } from 'react';
import { Send, FileText, Loader2 } from 'lucide-react';
import { Button } from '@/components/ui/button';
import { Textarea } from '@/components/ui/textarea';
import { Card } from '@/components/ui/card';

interface Message {
  role: 'user' | 'assistant';
  content: string;
  sources?: Array<{
    document: string;
    content: string;
    similarity: number;
  }>;
}

export default function ChatPage() {
  const [messages, setMessages] = useState<Message[]>([]);
  const [input, setInput] = useState('');
  const [loading, setLoading] = useState(false);
  const [conversationId, setConversationId] = useState<string | null>(null);
  const messagesEndRef = useRef<HTMLDivElement>(null);

  const scrollToBottom = () => {
    messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' });
  };

  useEffect(scrollToBottom, [messages]);

  const sendMessage = async () => {
    if (!input.trim() || loading) return;

    const userMessage: Message = { role: 'user', content: input };
    setMessages(prev => [...prev, userMessage]);
    setInput('');
    setLoading(true);

    try {
      const response = await fetch('/api/chat', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          message: input,
          conversation_id: conversationId,
        }),
      });

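      // Surface HTTP errors instead of rendering an empty assistant message
      if (!response.ok) {
        throw new Error(`Chat request failed with status ${response.status}`);
      }

      const data = await response.json();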
      
      setConversationId(data.conversation_id);
      setMessages(prev => [...prev, {
        role: 'assistant',
        content: data.answer,
        sources: data.sources,
      }]);
    } catch (error) {
      console.error('Chat error:', error);
    } finally {
      setLoading(false);
    }
  };

  return (
    <div className="flex flex-col h-screen max-w-4xl mx-auto p-4">
      {/* Messages */}
      <div className="flex-1 overflow-y-auto space-y-4 mb-4">
        {messages.map((message, i) => (
          <div
            key={i}
            className={`flex ${message.role === 'user' ? 'justify-end' : 'justify-start'}`}
          >
            <Card className={`p-4 max-w-[80%] ${
              message.role === 'user' 
                ? 'bg-primary text-primary-foreground' 
                : 'bg-muted'
            }`}>
              <p className="whitespace-pre-wrap">{message.content}</p>
              
              {/* Sources */}
              {message.sources && message.sources.length > 0 && (
                <div className="mt-3 pt-3 border-t border-border/50">
                  <p className="text-xs font-medium mb-2">Sources:</p>
                  <div className="space-y-2">
                    {message.sources.map((source, j) => (
                      <div key={j} className="flex items-start gap-2 text-xs">
                        <FileText className="h-3 w-3 mt-0.5 flex-shrink-0" />
                        <div>
                          <span className="font-medium">{source.document}</span>
                          <span className="text-muted-foreground ml-2">
                            ({Math.round(source.similarity * 100)}% match)
                          </span>
                        </div>
                      </div>
                    ))}
                  </div>
                </div>
              )}
            </Card>
          </div>
        ))}
        
        {loading && (
          <div className="flex justify-start">
            <Card className="p-4 bg-muted">
              <Loader2 className="h-5 w-5 animate-spin" />
            </Card>
          </div>
        )}
        
        <div ref={messagesEndRef} />
      </div>

      {/* Input */}
      <div className="flex gap-2">
        <Textarea
          value={input}
          onChange={(e) => setInput(e.target.value)}
          placeholder="Ask about your documents..."
          className="min-h-[60px] resize-none"
          onKeyDown={(e) => {
            if (e.key === 'Enter' && !e.shiftKey) {
              e.preventDefault();
              sendMessage();
            }
          }}
        />
        <Button 
          onClick={sendMessage} 
          disabled={loading || !input.trim()}
          size="icon"
          className="h-[60px] w-[60px]"
        >
          {loading ? (
            <Loader2 className="h-5 w-5 animate-spin" />
          ) : (
            <Send className="h-5 w-5" />
          )}
        </Button>
      </div>
    </div>
  );
}

Part 6: Deployment

Before you deploy, a practical tip: run through the entire upload-to-chat flow locally with Docker Compose first. The number-one cause of “it works on my machine” failures in AI apps is missing environment variables (especially OPENAI_API_KEY) and mismatched embedding dimensions between what you stored and what you query with.
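
Both failure modes can be caught by a small preflight check before the app serves traffic. This is a minimal sketch: the required variable names come from the Settings class, 1536 comes from the schema, and `missing_env` and `preflight` are hypothetical helper names.

```python
from typing import Mapping, Sequence

# Fields without defaults in app/core/config.py
REQUIRED_ENV = ("DATABASE_URL", "OPENAI_API_KEY", "CLERK_SECRET_KEY")
# Must match the vector(1536) column in schema.sql
EXPECTED_DIM = 1536


def missing_env(required: Sequence[str], environ: Mapping[str, str]) -> list[str]:
    """Return required variable names that are unset or empty."""
    return [name for name in required if not environ.get(name)]


def preflight(environ: Mapping[str, str], sample_embedding: Sequence[float]) -> list[str]:
    """Return a list of human-readable problems; empty means ready."""
    problems = [f"missing env var: {name}" for name in missing_env(REQUIRED_ENV, environ)]
    if len(sample_embedding) != EXPECTED_DIM:
        problems.append(
            f"embedding has {len(sample_embedding)} dimensions, expected {EXPECTED_DIM}"
        )
    return problems


# Example: Clerk key forgotten, and a 3072-dimension model configured by mistake
problems = preflight(
    {"DATABASE_URL": "postgresql://user:pass@db:5432/documind", "OPENAI_API_KEY": "sk-test"},
    [0.0] * 3072,
)
print(problems)
```

In a real deployment, `environ` would be `os.environ` and `sample_embedding` would come from one live embedding call at startup.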

Docker Setup

# backend/Dockerfile
FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]

# docker-compose.yml
version: '3.8'

services:
  backend:
    build: ./backend
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql://user:pass@db:5432/documind
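      - OPENAI_API_KEY=${OPENAI_API_KEY}
      # Settings declares CLERK_SECRET_KEY without a default, so the
      # backend fails at startup if it is missing.
      - CLERK_SECRET_KEY=${CLERK_SECRET_KEY}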
    depends_on:
      - db
      - redis

  db:
    image: pgvector/pgvector:pg16
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
      - POSTGRES_DB=documind
    volumes:
      - postgres_data:/var/lib/postgresql/data

  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data

volumes:
  postgres_data:
  redis_data:

Production Deployment

# Deploy backend to Railway
railway login
railway init
railway add postgres
railway add redis
railway up

# Deploy frontend to Vercel
vercel deploy --prod

What You’ve Learned

  • Full-Stack AI Development: Build complete AI products from database to frontend
  • Production RAG: Implement RAG with chunking, embeddings, and citations
  • Multi-Tenancy: Handle multiple users with isolated data
  • Deployment: Deploy and scale AI applications

Tech Stack Decision Framework

The stack above is opinionated. Here is why each choice was made and when you should deviate.
| Decision | Default Choice | When to Change | Alternative |
| --- | --- | --- | --- |
| Frontend framework | Next.js 14 | Need mobile-first or prefer Vue ecosystem | Nuxt 3, SvelteKit |
| Backend framework | FastAPI | Already have Node team, need WebSockets natively | Express + tRPC, Hono |
| Database | PostgreSQL + pgvector | Over 10M vectors, need sub-10ms p99 latency | Pinecone or Qdrant for vectors, keep Postgres for relational |
| LLM provider | OpenAI GPT-4o | Need longer context (200K+), cost-sensitive at scale | Claude 3.5 Sonnet, Gemini 1.5 Pro |
| Auth provider | Clerk | Self-hosted requirement, existing NextAuth setup | NextAuth, Supabase Auth, Auth0 |
| Deployment | Vercel + Railway | Need GPU inference, on-prem requirement | Fly.io, Render, AWS ECS |
| Embedding model | text-embedding-3-small | Domain-specific vocabulary, data sovereignty | BGE-large (self-hosted), Cohere embed-v3 |
The decision that matters most: PostgreSQL + pgvector vs. a dedicated vector database. At under 1M chunks, pgvector in a single Postgres instance is simpler, cheaper, and fast enough. Beyond that, measure your p99 query latency — if it exceeds your SLA, add a dedicated vector store while keeping Postgres for relational data.
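
Measuring p99 requires nothing more than a list of recorded query latencies. The sketch below uses the nearest-rank definition of a percentile; `percentile` and `exceeds_sla` are hypothetical helpers, and the sample data is made up for illustration.

```python
import math
from typing import Sequence


def percentile(samples: Sequence[float], pct: int) -> float:
    """Nearest-rank percentile: the smallest sample with at least pct% at or below it."""
    if not samples:
        raise ValueError("samples must not be empty")
    if not 0 < pct <= 100:
        raise ValueError("pct must be in (0, 100]")
    ordered = sorted(samples)
    rank = math.ceil(pct * len(ordered) / 100)  # 1-based rank
    return ordered[rank - 1]


def exceeds_sla(latencies_ms: Sequence[float], sla_ms: float) -> bool:
    """True when p99 latency is above the SLA threshold."""
    return percentile(latencies_ms, 99) > sla_ms


# Illustrative data: 98 fast queries, one slow, one very slow
latencies = [12.0] * 98 + [40.0, 250.0]
print(percentile(latencies, 50))         # 12.0
print(percentile(latencies, 99))         # 40.0
print(exceeds_sla(latencies, sla_ms=50)) # False
```

The median here looks excellent while the tail is more than three times slower, which is why the decision should rest on p99 rather than on the average.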

Edge Cases You Will Hit in Production

These are the issues that don’t show up in demos but break real deployments. Plan for them before launch, not after.

Scanned PDFs returning empty text: The pypdf extractor returns empty strings for image-only PDFs. Add a fallback: if extracted text is under 50 characters for a multi-page PDF, run OCR with PyMuPDF’s built-in OCR or Tesseract. Surface a clear status to the user (“Processing with OCR, this may take longer”).

Embedding dimension mismatches: If you change embedding models (e.g., from text-embedding-3-small at 1536 dimensions to text-embedding-3-large at 3072), existing vectors in the database become incompatible. You must re-embed all stored chunks. Add a model_version column to document_chunks and check it at query time.

Conversation history token overflow: The RAG engine keeps the last 6 messages, but a user pasting a 5000-word document as a message will blow through the token limit in a single turn. Add a max_tokens_per_message guard that truncates or summarizes oversized user inputs before they enter the history.

Concurrent document processing: Two uploads arriving simultaneously for the same user can cause race conditions on the usage tracker. Use database-level advisory locks or an idempotency key on the upload endpoint to prevent double-counting.

Multi-tenant data leakage in vector search: The WHERE c.user_id = $2 filter is your security boundary. If this filter is accidentally removed or bypassed by a new query path, User A sees User B’s documents. Add an integration test that explicitly verifies cross-tenant isolation on every search endpoint.
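
Two of these guards are small enough to sketch directly. The thresholds below are assumptions: 50 characters is the cutoff suggested above, and 4 characters per token follows the "1000 chars ~ 250 tokens" rule of thumb from the document processor. `needs_ocr` and `truncate_message` are hypothetical helper names.

```python
def needs_ocr(extracted_text: str, page_count: int, min_chars: int = 50) -> bool:
    """Flag multi-page PDFs whose extracted text is suspiciously short."""
    return page_count > 1 and len(extracted_text.strip()) < min_chars


def truncate_message(message: str, max_tokens: int, chars_per_token: int = 4) -> str:
    """Cap a user message at roughly max_tokens, using a character estimate."""
    max_chars = max_tokens * chars_per_token
    if len(message) <= max_chars:
        return message
    # Mark the cut so the model and the user can both see it happened
    return message[:max_chars].rstrip() + " [truncated]"


print(needs_ocr("", page_count=12))            # True
print(needs_ocr("x" * 500, page_count=12))     # False
print(len(truncate_message("a" * 100, 10)))    # 52
```

The character-based estimate is deliberately crude. For an exact count, a tokenizer for the target model would replace the `chars_per_token` heuristic.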

Extend Your Project

Once the core works, each of these extensions teaches you a new production skill. Pick the one closest to the job you want — a voice-input feature demonstrates real-time media handling, while team workspaces demonstrate authorization modeling. Ideas to make it even more impressive:
  1. Add Voice Input: Use Whisper API for voice-to-text
  2. Multi-Language Support: Translate queries and responses
  3. Analytics Dashboard: Show usage patterns and popular queries
  4. Export to Notion/Docs: Let users export conversations
  5. Team Workspaces: Add collaboration features
  6. Custom Embeddings: Fine-tune for specific domains

Production Readiness Checklist

Before you call this project “deployed,” walk through this checklist. Each item addresses a real failure mode that has taken down AI SaaS products.
| Category | Check | Why It Matters |
| --- | --- | --- |
| Security | API keys stored in env vars, never in code | One leaked key in a git commit costs you thousands |
| Security | CORS restricted to your domain(s) only | allow_origins=["*"] lets any site call your API |
| Security | Rate limiting enabled per user | One runaway script can exhaust your OpenAI quota |
| Data | Embedding model version tracked per chunk | Model changes silently break similarity search |
| Data | Document upload size limit enforced (both client and server) | A 500MB PDF will OOM your worker |
| Reliability | Background job for document processing with retry | Inline processing blocks the API response and fails silently |
| Reliability | Health check endpoint verifies DB and Redis connectivity | /health returning 200 with a dead database is worse than no health check |
| Cost | Usage tracking with daily budget alerts | You will forget about this until the invoice arrives |
| Cost | GPT-4o-mini used for classification/routing, GPT-4o for generation only | 15x cost difference for tasks where quality is equivalent |
| Observability | Request logging with user ID, model, token count, latency | You cannot debug what you cannot see |
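
The /health endpoint shown earlier always returns "healthy", which is the failure mode the checklist warns about. One way to address it is to run a probe per dependency and return 503 when any probe fails. In this sketch, `run_health_checks` is a hypothetical helper, and the two probe functions are stand-ins for a real `SELECT 1` and a Redis `PING`.

```python
import asyncio
from typing import Awaitable, Callable

Check = Callable[[], Awaitable[None]]


async def run_health_checks(
    checks: dict[str, Check], timeout_s: float = 2.0
) -> tuple[int, dict]:
    """Run each probe with a timeout; return (HTTP status code, response body)."""
    results: dict[str, str] = {}
    for name, check in checks.items():
        try:
            await asyncio.wait_for(check(), timeout=timeout_s)
            results[name] = "ok"
        except Exception as exc:  # a failed probe must not crash the endpoint
            results[name] = f"error: {type(exc).__name__}"

    healthy = all(value == "ok" for value in results.values())
    body = {"status": "healthy" if healthy else "unhealthy", "checks": results}
    return (200 if healthy else 503), body


async def db_ok() -> None:
    """Stand-in for a real database probe."""
    return None


async def redis_down() -> None:
    """Stand-in for a Redis probe that cannot connect."""
    raise ConnectionError("redis unreachable")


status_code, body = asyncio.run(run_health_checks({"db": db_ok, "redis": redis_down}))
print(status_code, body)
```

The per-probe timeout matters as much as the status code: a probe that hangs on a dead connection would otherwise make the health check itself time out.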

Portfolio Ready

This project demonstrates:
  • End-to-end AI product development
  • Production architecture patterns
  • Modern tech stack proficiency
  • Database design with vectors
  • API design and authentication
  • Frontend development
  • Deployment and DevOps
Pro Tip: Deploy this project, add it to your resume, and link your GitHub. This single project can be your ticket to AI engineering roles.