Build a Complete AI Product
This is the module that makes the course worth it. You'll build a production-ready AI application that you can deploy, show to employers, or even monetize.
What You'll Build: A multi-tenant AI document assistant that lets users upload documents, ask questions, and get answers with citations. This is the architecture behind Notion AI, ChatPDF, and countless enterprise tools.
Project Overview
DocuMind AI
A SaaS document intelligence platform with:
- 📄 Document upload and processing (PDF, DOCX, TXT)
- 🔍 Semantic search across documents
- 💬 AI chat with citations
- 👥 Multi-tenant (users only see their docs)
- 💰 Usage tracking and rate limiting
- 🔐 Authentication and API keys
Tech Stack
| Layer | Technology |
|---|---|
| Frontend | Next.js 14, Tailwind, shadcn/ui |
| Backend | FastAPI, Python 3.11+ |
| Database | PostgreSQL + pgvector |
| LLM | OpenAI GPT-4o |
| Auth | Clerk or NextAuth |
| Deployment | Vercel + Railway |
Architecture
```
┌────────────────────────────────────────────────────────────────┐
│ Frontend (Next.js) │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Upload │ │ Chat │ │ Search │ │ Settings │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
└───────┼─────────────┼─────────────┼─────────────┼──────────────┘
│ │ │ │
▼ ▼ ▼ ▼
┌────────────────────────────────────────────────────────────────┐
│ API Gateway (FastAPI) │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Auth Middleware │ Rate Limiter │ Request Logger │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌──────────┐ ┌──────────┐ │ ┌──────────┐ ┌──────────┐ │
│ │ Document │ │ Chat │ │ │ Search │ │ User │ │
│ │ Router │ │ Router │ │ │ Router │ │ Router │ │
│ └────┬─────┘ └────┬─────┘ │ └────┬─────┘ └────┬─────┘ │
└───────┼─────────────┼────────┼───────┼─────────────┼───────────┘
│ │ │ │ │
▼ ▼ ▼ ▼ ▼
┌──────────────────────────────────────────────────────────────┐
│ Services Layer │
│ ┌────────────┐ ┌────────────┐ ┌────────────┐ │
│ │ Document │ │ RAG │ │ Usage │ │
│ │ Processor │ │ Engine │ │ Tracker │ │
│ └─────┬──────┘ └─────┬──────┘ └─────┬──────┘ │
└────────┼───────────────┼───────────────┼────────────────────┘
│ │ │
▼ ▼ ▼
┌──────────────────────────────────────────────────────────────┐
│ Data Layer │
│ ┌────────────────────┐ ┌────────────────────┐ │
│ │ PostgreSQL │ │ Redis │ │
│ │ + pgvector │ │ (Cache/Queue) │ │
│ └────────────────────┘ └────────────────────┘ │
└──────────────────────────────────────────────────────────────┘
```
Part 1: Project Setup
Database Schema
```sql
-- schema.sql
-- Users table (synced from auth provider)
CREATE TABLE users (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
email VARCHAR(255) UNIQUE NOT NULL,
name VARCHAR(255),
plan VARCHAR(50) DEFAULT 'free', -- free, pro, enterprise
api_key VARCHAR(64) UNIQUE,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- Documents table
CREATE TABLE documents (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID REFERENCES users(id) ON DELETE CASCADE,
filename VARCHAR(255) NOT NULL,
file_type VARCHAR(50) NOT NULL,
file_size INTEGER NOT NULL,
status VARCHAR(50) DEFAULT 'processing', -- processing, ready, error
metadata JSONB DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_documents_user ON documents(user_id);
CREATE INDEX idx_documents_status ON documents(status);
-- Chunks table with embeddings
CREATE EXTENSION IF NOT EXISTS vector;
CREATE TABLE document_chunks (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
document_id UUID REFERENCES documents(id) ON DELETE CASCADE,
user_id UUID REFERENCES users(id) ON DELETE CASCADE,
content TEXT NOT NULL,
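    -- 1536 dimensions to match text-embedding-3-small, used by the processor below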
embedding vector(1536),
chunk_index INTEGER NOT NULL,
metadata JSONB DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_chunks_document ON document_chunks(document_id);
CREATE INDEX idx_chunks_user ON document_chunks(user_id);
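-- HNSW index for fast approximate nearest-neighbor search (requires pgvector >= 0.5.0)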
CREATE INDEX idx_chunks_embedding ON document_chunks
USING hnsw (embedding vector_cosine_ops) WITH (m = 16, ef_construction = 64);
-- Conversations table
CREATE TABLE conversations (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID REFERENCES users(id) ON DELETE CASCADE,
title VARCHAR(255),
document_ids UUID[] DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- Messages table
CREATE TABLE messages (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
conversation_id UUID REFERENCES conversations(id) ON DELETE CASCADE,
role VARCHAR(50) NOT NULL, -- user, assistant
content TEXT NOT NULL,
sources JSONB DEFAULT '[]', -- citations
token_count INTEGER DEFAULT 0,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_messages_conversation ON messages(conversation_id);
-- Usage tracking
CREATE TABLE usage_logs (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID REFERENCES users(id) ON DELETE CASCADE,
action VARCHAR(50) NOT NULL, -- embed, query, chat
tokens_used INTEGER DEFAULT 0,
cost_cents INTEGER DEFAULT 0,
metadata JSONB DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_usage_user_date ON usage_logs(user_id, created_at);
-- Plan limits
CREATE TABLE plan_limits (
plan VARCHAR(50) PRIMARY KEY,
documents_limit INTEGER,
storage_mb INTEGER,
monthly_queries INTEGER,
monthly_tokens INTEGER
);
INSERT INTO plan_limits VALUES
('free', 10, 50, 100, 100000),
('pro', 100, 500, 1000, 1000000),
('enterprise', -1, -1, -1, -1); -- -1 = unlimited
```
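If you want to apply this schema without a migration tool, here is a minimal sketch using asyncpg; the connection string assumes the docker-compose database from Part 6.

```python
# apply_schema.py -- a sketch, not part of the project skeleton above
import asyncio
from pathlib import Path

import asyncpg

async def main():
    # Connection string matches the local docker-compose setup in Part 6
    conn = await asyncpg.connect("postgresql://user:pass@localhost:5432/documind")
    # execute() without arguments runs multi-statement scripts
    await conn.execute(Path("schema.sql").read_text())
    await conn.close()

asyncio.run(main())
```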
FastAPI Backend Structure
```python
# app/main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
from app.routers import documents, chat, search, users
from app.core.database import init_db, close_db
from app.core.config import settings
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
await init_db()
yield
# Shutdown
await close_db()
app = FastAPI(
title="DocuMind AI",
version="1.0.0",
lifespan=lifespan
)
# CORS
app.add_middleware(
CORSMiddleware,
allow_origins=settings.ALLOWED_ORIGINS,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Routers
app.include_router(documents.router, prefix="/api/documents", tags=["documents"])
app.include_router(chat.router, prefix="/api/chat", tags=["chat"])
app.include_router(search.router, prefix="/api/search", tags=["search"])
app.include_router(users.router, prefix="/api/users", tags=["users"])
@app.get("/health")
async def health():
return {"status": "healthy"}
```python
# app/core/config.py
from functools import lru_cache

from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
# Database
DATABASE_URL: str
# OpenAI
OPENAI_API_KEY: str
# Auth
CLERK_SECRET_KEY: str
# Limits
MAX_FILE_SIZE_MB: int = 10
MAX_CHUNKS_PER_QUERY: int = 5
# CORS
ALLOWED_ORIGINS: list[str] = ["http://localhost:3000"]
    model_config = SettingsConfigDict(env_file=".env")
@lru_cache()
def get_settings():
return Settings()
settings = get_settings()
```
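The routers below also depend on a get_current_user dependency from app.core.auth, which this section never shows. Here is a minimal sketch that verifies a bearer JWT with PyJWT; the HS256 shared-secret scheme is an assumption for illustration, not Clerk's actual SDK (Clerk verifies RS256 tokens against its JWKS endpoint in practice).

```python
# app/core/auth.py -- sketch; follow your auth provider's docs in production.
from dataclasses import dataclass

import jwt  # PyJWT
from fastapi import Depends, HTTPException
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer

from app.core.config import settings

bearer = HTTPBearer()

@dataclass
class CurrentUser:
    id: str
    email: str

async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(bearer),
) -> CurrentUser:
    """Resolve the authenticated user from the Authorization header."""
    try:
        payload = jwt.decode(
            credentials.credentials,
            settings.CLERK_SECRET_KEY,
            algorithms=["HS256"],  # assumption; Clerk actually uses RS256 + JWKS
        )
    except jwt.InvalidTokenError:
        raise HTTPException(401, "Invalid or expired token")
    return CurrentUser(id=payload["sub"], email=payload.get("email", ""))
```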
Part 2: Document Processing
```python
# app/services/document_processor.py
import asyncio
import json
from dataclasses import dataclass
from pathlib import Path
from typing import List

import pypdf
from docx import Document as DocxDocument
from openai import AsyncOpenAI

from app.core.database import get_db
@dataclass
class ProcessedChunk:
content: str
metadata: dict
index: int
class DocumentProcessor:
"""Process and chunk documents for RAG"""
    def __init__(self):
        # Async client so embedding calls don't block the event loop
        self.openai = AsyncOpenAI()
        self.chunk_size = 1000
        self.chunk_overlap = 200
async def process_document(
self,
file_path: Path,
document_id: str,
user_id: str
) -> int:
"""Process document and store chunks with embeddings"""
# Extract text
text = await self._extract_text(file_path)
# Chunk text
chunks = self._chunk_text(text)
# Generate embeddings in batches
embeddings = await self._embed_chunks(chunks)
        # Store in database (embeddings are passed as text and cast to vector;
        # asyncpg's default codec expects JSONB values as JSON strings)
        async with get_db() as db:
            for chunk, embedding in zip(chunks, embeddings):
                await db.execute("""
                    INSERT INTO document_chunks
                        (document_id, user_id, content, embedding, chunk_index, metadata)
                    VALUES ($1, $2, $3, $4::vector, $5, $6)
                """, document_id, user_id, chunk.content,
                    str(embedding), chunk.index, json.dumps(chunk.metadata))

            # Mark the document as ready for querying
            await db.execute(
                "UPDATE documents SET status = 'ready' WHERE id = $1",
                document_id
            )

        return len(chunks)
async def _extract_text(self, file_path: Path) -> str:
"""Extract text from various file types"""
suffix = file_path.suffix.lower()
if suffix == '.pdf':
return await self._extract_pdf(file_path)
        elif suffix == '.docx':  # legacy .doc is not supported by python-docx
            return await self._extract_docx(file_path)
elif suffix == '.txt':
return file_path.read_text()
else:
raise ValueError(f"Unsupported file type: {suffix}")
async def _extract_pdf(self, file_path: Path) -> str:
"""Extract text from PDF"""
def extract():
text_parts = []
with open(file_path, 'rb') as f:
reader = pypdf.PdfReader(f)
for page in reader.pages:
text_parts.append(page.extract_text())
return "\n\n".join(text_parts)
return await asyncio.to_thread(extract)
async def _extract_docx(self, file_path: Path) -> str:
"""Extract text from DOCX"""
def extract():
doc = DocxDocument(file_path)
return "\n\n".join([para.text for para in doc.paragraphs])
return await asyncio.to_thread(extract)
def _chunk_text(self, text: str) -> List[ProcessedChunk]:
"""Split text into overlapping chunks"""
chunks = []
start = 0
index = 0
while start < len(text):
end = start + self.chunk_size
chunk_text = text[start:end]
# Try to break at sentence boundary
if end < len(text):
last_period = chunk_text.rfind('. ')
if last_period > self.chunk_size * 0.5:
end = start + last_period + 1
chunk_text = text[start:end]
chunks.append(ProcessedChunk(
content=chunk_text.strip(),
metadata={"char_start": start, "char_end": end},
index=index
))
start = end - self.chunk_overlap
index += 1
return chunks
async def _embed_chunks(
self,
chunks: List[ProcessedChunk],
batch_size: int = 100
) -> List[List[float]]:
"""Generate embeddings for chunks"""
all_embeddings = []
for i in range(0, len(chunks), batch_size):
batch = chunks[i:i + batch_size]
texts = [c.content for c in batch]
            response = await self.openai.embeddings.create(
                model="text-embedding-3-small",
                input=texts
            )
embeddings = [e.embedding for e in response.data]
all_embeddings.extend(embeddings)
        return all_embeddings
```
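main.py also mounts a documents router that isn't listed in this module. Here is a minimal sketch of the upload endpoint showing how DocumentProcessor would be invoked; the route shape, temp-file staging, and size check are assumptions (cleanup and per-plan document limits are omitted).

```python
# app/routers/documents.py -- sketch of the unlisted documents router
from pathlib import Path

from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, UploadFile

from app.core.auth import get_current_user
from app.core.config import settings
from app.core.database import get_db
from app.services.document_processor import DocumentProcessor

router = APIRouter()
processor = DocumentProcessor()

@router.post("/upload")
async def upload_document(
    file: UploadFile,
    background_tasks: BackgroundTasks,
    user = Depends(get_current_user),
):
    """Accept a file, record it, and process it outside the request cycle."""
    contents = await file.read()
    if len(contents) > settings.MAX_FILE_SIZE_MB * 1024 * 1024:
        raise HTTPException(413, "File too large")

    suffix = Path(file.filename).suffix.lower()
    async with get_db() as db:
        doc = await db.fetchrow("""
            INSERT INTO documents (user_id, filename, file_type, file_size)
            VALUES ($1, $2, $3, $4) RETURNING id
        """, user.id, file.filename, suffix, len(contents))

    # Stage the upload on disk so the background task can read it
    tmp_path = Path(f"/tmp/{doc['id']}{suffix}")
    tmp_path.write_bytes(contents)
    background_tasks.add_task(
        processor.process_document, tmp_path, str(doc['id']), str(user.id)
    )
    return {"document_id": str(doc['id']), "status": "processing"}
```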
Part 3: RAG Engine
```python
# app/services/rag_engine.py
from dataclasses import dataclass
from typing import List, Optional

from openai import AsyncOpenAI

from app.core.database import get_db
from app.services.usage_tracker import UsageTracker
@dataclass
class Source:
document_id: str
document_name: str
chunk_content: str
similarity: float
@dataclass
class RAGResponse:
answer: str
sources: List[Source]
tokens_used: int
class RAGEngine:
"""Production RAG engine with citations"""
    def __init__(self):
        self.openai = AsyncOpenAI()
        self.usage_tracker = UsageTracker()
async def query(
self,
user_id: str,
question: str,
document_ids: Optional[List[str]] = None,
top_k: int = 5,
        conversation_history: Optional[List[dict]] = None
) -> RAGResponse:
"""Answer question with RAG"""
# Check usage limits
await self.usage_tracker.check_limits(user_id, "query")
# Retrieve relevant chunks
sources = await self._retrieve(
user_id=user_id,
query=question,
document_ids=document_ids,
top_k=top_k
)
# Build context
context = self._build_context(sources)
# Generate answer
answer, tokens = await self._generate(
question=question,
context=context,
history=conversation_history
)
# Track usage
await self.usage_tracker.log_usage(
user_id=user_id,
action="query",
tokens_used=tokens
)
return RAGResponse(
answer=answer,
sources=sources,
tokens_used=tokens
)
async def _retrieve(
self,
user_id: str,
query: str,
document_ids: Optional[List[str]],
top_k: int
) -> List[Source]:
"""Retrieve relevant chunks"""
# Get query embedding
        response = await self.openai.embeddings.create(
            model="text-embedding-3-small",
            input=query
        )
        query_embedding = response.data[0].embedding
# Search vector database
async with get_db() as db:
sql = """
SELECT
c.document_id,
d.filename,
c.content,
1 - (c.embedding <=> $1::vector) as similarity
FROM document_chunks c
JOIN documents d ON c.document_id = d.id
WHERE c.user_id = $2
"""
params = [str(query_embedding), user_id]
if document_ids:
sql += " AND c.document_id = ANY($3)"
params.append(document_ids)
sql += """
ORDER BY c.embedding <=> $1::vector
LIMIT $4
"""
params.append(top_k)
rows = await db.fetch(sql, *params)
return [
Source(
document_id=row['document_id'],
document_name=row['filename'],
chunk_content=row['content'],
similarity=row['similarity']
)
for row in rows
]
def _build_context(self, sources: List[Source]) -> str:
"""Build context string with source markers"""
parts = []
for i, source in enumerate(sources, 1):
parts.append(f"[Source {i}: {source.document_name}]\n{source.chunk_content}")
return "\n\n---\n\n".join(parts)
async def _generate(
self,
question: str,
context: str,
history: Optional[List[dict]]
) -> tuple[str, int]:
"""Generate answer with GPT-4"""
system_prompt = """You are a helpful document assistant. Answer questions based on the provided sources.
RULES:
1. Only use information from the provided sources
2. Always cite sources using [Source N] format
3. If sources don't contain the answer, say "I couldn't find this information in your documents"
4. Be concise but thorough
5. If asked about something not in the documents, politely redirect to what you can help with"""
messages = [{"role": "system", "content": system_prompt}]
# Add conversation history
if history:
messages.extend(history[-6:]) # Last 3 exchanges
# Add current question with context
messages.append({
"role": "user",
"content": f"""Sources:
{context}
Question: {question}"""
})
        response = await self.openai.chat.completions.create(
            model="gpt-4o",
            messages=messages,
            temperature=0,
            max_tokens=1000
        )
return (
response.choices[0].message.content,
response.usage.total_tokens
        )
```
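RAGEngine imports a UsageTracker that this module never defines. Here is a minimal sketch built on the plan_limits and usage_logs tables from Part 1; the monthly-window query and the 429 response are assumptions about how limits should behave.

```python
# app/services/usage_tracker.py -- sketch; not shown in the original module
from fastapi import HTTPException

from app.core.database import get_db

class UsageTracker:
    async def check_limits(self, user_id: str, action: str):
        """Raise 429 if the user's plan quota for this action is exhausted."""
        async with get_db() as db:
            row = await db.fetchrow("""
                SELECT p.monthly_queries, COUNT(u.id) AS used
                FROM users usr
                JOIN plan_limits p ON p.plan = usr.plan
                LEFT JOIN usage_logs u
                    ON u.user_id = usr.id
                    AND u.action = $2
                    AND u.created_at >= date_trunc('month', NOW())
                WHERE usr.id = $1
                GROUP BY p.monthly_queries
            """, user_id, action)
            if row and row['monthly_queries'] != -1 and row['used'] >= row['monthly_queries']:
                raise HTTPException(429, "Monthly query limit reached")

    async def log_usage(self, user_id: str, action: str, tokens_used: int):
        """Record one usage event for billing and analytics."""
        async with get_db() as db:
            await db.execute("""
                INSERT INTO usage_logs (user_id, action, tokens_used)
                VALUES ($1, $2, $3)
            """, user_id, action, tokens_used)
```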
Part 4: API Routes
```python
# app/routers/chat.py
import json
from typing import List, Optional

from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

from app.core.auth import get_current_user
from app.core.database import get_db
from app.services.rag_engine import RAGEngine
router = APIRouter()
rag_engine = RAGEngine()
class ChatRequest(BaseModel):
conversation_id: Optional[str] = None
message: str
document_ids: Optional[List[str]] = None
class ChatResponse(BaseModel):
conversation_id: str
answer: str
sources: List[dict]
tokens_used: int
@router.post("/", response_model=ChatResponse)
async def chat(
request: ChatRequest,
user = Depends(get_current_user)
):
"""Chat with your documents"""
# Get or create conversation
async with get_db() as db:
if request.conversation_id:
# Verify ownership
conv = await db.fetchrow(
"SELECT * FROM conversations WHERE id = $1 AND user_id = $2",
request.conversation_id, user.id
)
if not conv:
raise HTTPException(404, "Conversation not found")
# Get history
history = await db.fetch("""
SELECT role, content FROM messages
WHERE conversation_id = $1
ORDER BY created_at
""", request.conversation_id)
history = [dict(row) for row in history]
else:
# Create new conversation
conv = await db.fetchrow("""
INSERT INTO conversations (user_id, document_ids)
VALUES ($1, $2)
RETURNING *
""", user.id, request.document_ids or [])
history = []
# Get RAG response
response = await rag_engine.query(
user_id=user.id,
question=request.message,
document_ids=request.document_ids,
conversation_history=history
)
# Save messages
async with get_db() as db:
await db.execute("""
INSERT INTO messages (conversation_id, role, content)
VALUES ($1, 'user', $2)
""", conv['id'], request.message)
await db.execute("""
INSERT INTO messages (conversation_id, role, content, sources, token_count)
VALUES ($1, 'assistant', $2, $3, $4)
""", conv['id'], response.answer,
[{"doc": s.document_name, "similarity": s.similarity} for s in response.sources],
response.tokens_used)
return ChatResponse(
conversation_id=str(conv['id']),
answer=response.answer,
sources=[{
"document": s.document_name,
"content": s.chunk_content[:200] + "...",
"similarity": round(s.similarity, 3)
} for s in response.sources],
tokens_used=response.tokens_used
)
@router.post("/stream")
async def chat_stream(
request: ChatRequest,
user = Depends(get_current_user)
):
"""Stream chat response"""
async def generate():
        # ... similar to the endpoint above, but streamed token by token
        # (see the sketch after this code block)
pass
return StreamingResponse(
generate(),
media_type="text/event-stream"
    )
```
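The streaming endpoint above is left as a stub. Here is a minimal sketch of the generator in server-sent-events format; it assumes retrieval has already run and only token generation streams, and it omits error handling and message persistence. The generate_stream helper name is hypothetical.

```python
# Sketch of the /stream generator -- the original leaves this as an exercise
import json

from openai import AsyncOpenAI

openai_client = AsyncOpenAI()

async def generate_stream(messages: list[dict]):
    """Yield SSE events as GPT-4o produces tokens."""
    stream = await openai_client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        temperature=0,
        stream=True,
    )
    async for chunk in stream:
        delta = chunk.choices[0].delta.content
        if delta:
            yield f"data: {json.dumps({'token': delta})}\n\n"
    yield "data: [DONE]\n\n"
```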
Part 5: Frontend (Next.js)
```tsx
// app/chat/page.tsx
'use client';
import { useState, useRef, useEffect } from 'react';
import { Send, FileText, Loader2 } from 'lucide-react';
import { Button } from '@/components/ui/button';
import { Textarea } from '@/components/ui/textarea';
import { Card } from '@/components/ui/card';
interface Message {
role: 'user' | 'assistant';
content: string;
sources?: Array<{
document: string;
content: string;
similarity: number;
}>;
}
export default function ChatPage() {
const [messages, setMessages] = useState<Message[]>([]);
const [input, setInput] = useState('');
const [loading, setLoading] = useState(false);
const [conversationId, setConversationId] = useState<string | null>(null);
const messagesEndRef = useRef<HTMLDivElement>(null);
const scrollToBottom = () => {
messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' });
};
useEffect(scrollToBottom, [messages]);
const sendMessage = async () => {
if (!input.trim() || loading) return;
const userMessage: Message = { role: 'user', content: input };
setMessages(prev => [...prev, userMessage]);
setInput('');
setLoading(true);
try {
const response = await fetch('/api/chat', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
message: input,
conversation_id: conversationId,
}),
});
const data = await response.json();
setConversationId(data.conversation_id);
setMessages(prev => [...prev, {
role: 'assistant',
content: data.answer,
sources: data.sources,
}]);
} catch (error) {
console.error('Chat error:', error);
} finally {
setLoading(false);
}
};
return (
<div className="flex flex-col h-screen max-w-4xl mx-auto p-4">
{/* Messages */}
<div className="flex-1 overflow-y-auto space-y-4 mb-4">
{messages.map((message, i) => (
<div
key={i}
className={`flex ${message.role === 'user' ? 'justify-end' : 'justify-start'}`}
>
<Card className={`p-4 max-w-[80%] ${
message.role === 'user'
? 'bg-primary text-primary-foreground'
: 'bg-muted'
}`}>
<p className="whitespace-pre-wrap">{message.content}</p>
{/* Sources */}
{message.sources && message.sources.length > 0 && (
<div className="mt-3 pt-3 border-t border-border/50">
<p className="text-xs font-medium mb-2">Sources:</p>
<div className="space-y-2">
{message.sources.map((source, j) => (
<div key={j} className="flex items-start gap-2 text-xs">
<FileText className="h-3 w-3 mt-0.5 flex-shrink-0" />
<div>
<span className="font-medium">{source.document}</span>
<span className="text-muted-foreground ml-2">
({Math.round(source.similarity * 100)}% match)
</span>
</div>
</div>
))}
</div>
</div>
)}
</Card>
</div>
))}
{loading && (
<div className="flex justify-start">
<Card className="p-4 bg-muted">
<Loader2 className="h-5 w-5 animate-spin" />
</Card>
</div>
)}
<div ref={messagesEndRef} />
</div>
{/* Input */}
<div className="flex gap-2">
<Textarea
value={input}
onChange={(e) => setInput(e.target.value)}
placeholder="Ask about your documents..."
className="min-h-[60px] resize-none"
onKeyDown={(e) => {
if (e.key === 'Enter' && !e.shiftKey) {
e.preventDefault();
sendMessage();
}
}}
/>
<Button
onClick={sendMessage}
disabled={loading || !input.trim()}
size="icon"
className="h-[60px] w-[60px]"
>
{loading ? (
<Loader2 className="h-5 w-5 animate-spin" />
) : (
<Send className="h-5 w-5" />
)}
</Button>
</div>
</div>
);
}
```
Part 6: Deployment
Docker Setup
```dockerfile
# backend/Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
```yaml
# docker-compose.yml
version: '3.8'
services:
backend:
build: ./backend
ports:
- "8000:8000"
    environment:
      - DATABASE_URL=postgresql://user:pass@db:5432/documind
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - CLERK_SECRET_KEY=${CLERK_SECRET_KEY}
depends_on:
- db
- redis
db:
image: pgvector/pgvector:pg16
environment:
- POSTGRES_USER=user
- POSTGRES_PASSWORD=pass
- POSTGRES_DB=documind
volumes:
- postgres_data:/var/lib/postgresql/data
redis:
image: redis:7-alpine
volumes:
- redis_data:/data
volumes:
postgres_data:
  redis_data:
```
Production Deployment
```bash
# Deploy backend to Railway
railway login
railway init
railway add postgres
railway add redis
railway up
# Deploy frontend to Vercel
vercel deploy --prod
```
What You’ve Learned
- Full-Stack AI Development: Build complete AI products from database to frontend
- Production RAG: Implement RAG with chunking, embeddings, and citations
- Multi-Tenancy: Handle multiple users with isolated data
- Deployment: Deploy and scale AI applications
Extend Your Project
Ideas to make it even more impressive:
- Add Voice Input: Use the Whisper API for voice-to-text
- Multi-Language Support: Translate queries and responses
- Analytics Dashboard: Show usage patterns and popular queries
- Export to Notion/Docs: Let users export conversations
- Team Workspaces: Add collaboration features
- Custom Embeddings: Fine-tune for specific domains
Portfolio Ready
This project demonstrates:
- ✅ End-to-end AI product development
- ✅ Production architecture patterns
- ✅ Modern tech stack proficiency
- ✅ Database design with vectors
- ✅ API design and authentication
- ✅ Frontend development
- ✅ Deployment and DevOps
Pro Tip: Deploy this project, add it to your resume, and link your GitHub. This single project can be your ticket to AI engineering roles.