Why RAG is the Killer App
RAG (Retrieval-Augmented Generation) is how you build AI products that work with real data. ChatGPT’s Browsing, Perplexity’s search, enterprise knowledge bots: all of them are RAG.

Industry reality: 90% of enterprise AI projects are RAG systems. Mastering RAG means you can build products that work with any company’s data without expensive fine-tuning.
The RAG Mental Model
┌──────────────────────────┐
│      User Question       │
└────────────┬─────────────┘
             │
┌────────────▼─────────────┐
│   1. Query Processing    │ ← Rewrite, expand, decompose
└────────────┬─────────────┘
             │
┌────────────▼─────────────┐
│   2. Retrieval Engine    │ ← Vector + keyword search
└────────────┬─────────────┘
             │
┌────────────▼─────────────┐
│      3. Re-ranking       │ ← Cross-encoder scoring
└────────────┬─────────────┘
             │
┌────────────▼─────────────┐
│   4. Context Building    │ ← Compress, format, citations
└────────────┬─────────────┘
             │
┌────────────▼─────────────┐
│    5. LLM Generation     │ ← Answer with sources
└──────────────────────────┘
Production RAG System
Complete Implementation
from openai import OpenAI
from dataclasses import dataclass, field
from typing import List, Optional, Dict, Any
from enum import Enum
import asyncio
import asyncpg
import json
from datetime import datetime
class RetrievalStrategy(Enum):
VECTOR = "vector"
KEYWORD = "keyword"
HYBRID = "hybrid"
@dataclass
class Document:
id: str
content: str
metadata: Dict[str, Any]
score: float = 0.0
@dataclass
class RAGResponse:
answer: str
sources: List[Dict[str, Any]]
confidence: float
retrieval_time_ms: float
generation_time_ms: float
total_tokens: int
model: str
@dataclass
class RAGConfig:
"""Configuration for RAG pipeline"""
# Retrieval
retrieval_strategy: RetrievalStrategy = RetrievalStrategy.HYBRID
top_k: int = 10
similarity_threshold: float = 0.7
# Re-ranking
enable_reranking: bool = True
rerank_top_k: int = 5
# Generation
model: str = "gpt-4o"
temperature: float = 0
max_tokens: int = 1000
# Query processing
enable_query_expansion: bool = True
enable_hyde: bool = False # Hypothetical Document Embeddings
class ProductionRAG:
"""Enterprise-grade RAG system"""
def __init__(self, database_url: str, config: Optional[RAGConfig] = None):
self.database_url = database_url
self.config = config or RAGConfig()
self.openai = OpenAI()
self.pool: Optional[asyncpg.Pool] = None
async def initialize(self):
"""Initialize database pool"""
self.pool = await asyncpg.create_pool(
self.database_url,
min_size=5,
max_size=20
)
async def close(self):
if self.pool:
await self.pool.close()
# ============= EMBEDDING =============
def _get_embedding(self, text: str) -> List[float]:
response = self.openai.embeddings.create(
model="text-embedding-3-small",
input=text
)
return response.data[0].embedding
def _get_embeddings_batch(self, texts: List[str]) -> List[List[float]]:
response = self.openai.embeddings.create(
model="text-embedding-3-small",
input=texts
)
return [e.embedding for e in response.data]
# ============= QUERY PROCESSING =============
async def _process_query(self, query: str) -> List[str]:
"""Transform query for better retrieval"""
queries = [query]
if self.config.enable_query_expansion:
expanded = await self._expand_query(query)
queries.extend(expanded)
if self.config.enable_hyde:
hyde_doc = await self._generate_hyde(query)
queries.append(hyde_doc)
return queries
async def _expand_query(self, query: str) -> List[str]:
"""Generate query variations"""
response = self.openai.chat.completions.create(
model="gpt-4o-mini",
messages=[
{
"role": "system",
"content": """Generate 3 alternative phrasings of this search query.
Return as JSON: {"queries": ["...", "...", "..."]}"""
},
{"role": "user", "content": query}
],
response_format={"type": "json_object"},
temperature=0.7
)
result = json.loads(response.choices[0].message.content)
return result.get("queries", [])
async def _generate_hyde(self, query: str) -> str:
"""Hypothetical Document Embeddings - generate ideal answer"""
response = self.openai.chat.completions.create(
model="gpt-4o-mini",
messages=[
{
"role": "system",
"content": """You are a helpful assistant. Write a short, factual paragraph
that would be the perfect answer to the user's question."""
},
{"role": "user", "content": query}
],
max_tokens=200
)
return response.choices[0].message.content
# ============= RETRIEVAL =============
async def _retrieve_vector(
self,
query_embedding: List[float],
top_k: int
) -> List[Document]:
"""Vector similarity search"""
async with self.pool.acquire() as conn:
rows = await conn.fetch("""
SELECT
id::text,
content,
metadata,
1 - (embedding <=> $1::vector) as score
FROM documents
WHERE 1 - (embedding <=> $1::vector) > $2
ORDER BY embedding <=> $1::vector
LIMIT $3
""", str(query_embedding), self.config.similarity_threshold, top_k)
return [
Document(
id=row['id'],
content=row['content'],
metadata=json.loads(row['metadata']),
score=row['score']
)
for row in rows
]
async def _retrieve_keyword(
self,
query: str,
top_k: int
) -> List[Document]:
"""Full-text keyword search"""
async with self.pool.acquire() as conn:
rows = await conn.fetch("""
SELECT
id::text,
content,
metadata,
ts_rank(to_tsvector('english', content),
plainto_tsquery('english', $1)) as score
FROM documents
WHERE to_tsvector('english', content) @@ plainto_tsquery('english', $1)
ORDER BY score DESC
LIMIT $2
""", query, top_k)
return [
Document(
id=row['id'],
content=row['content'],
metadata=json.loads(row['metadata']),
score=row['score']
)
for row in rows
]
async def _retrieve_hybrid(
self,
query: str,
query_embedding: List[float],
top_k: int
) -> List[Document]:
"""Hybrid search with Reciprocal Rank Fusion"""
# Get both result sets
vector_results = await self._retrieve_vector(query_embedding, top_k * 2)
keyword_results = await self._retrieve_keyword(query, top_k * 2)
# Build document map
doc_map: Dict[str, Document] = {}
vector_ranks: Dict[str, int] = {}
keyword_ranks: Dict[str, int] = {}
for i, doc in enumerate(vector_results):
doc_map[doc.id] = doc
vector_ranks[doc.id] = i + 1
for i, doc in enumerate(keyword_results):
if doc.id not in doc_map:
doc_map[doc.id] = doc
keyword_ranks[doc.id] = i + 1
# Reciprocal Rank Fusion
k = 60 # RRF constant
rrf_scores: Dict[str, float] = {}
for doc_id in doc_map:
v_rank = vector_ranks.get(doc_id, len(vector_results) + 1)
k_rank = keyword_ranks.get(doc_id, len(keyword_results) + 1)
rrf_scores[doc_id] = 1/(k + v_rank) + 1/(k + k_rank)
# Sort by RRF score
sorted_ids = sorted(rrf_scores.keys(), key=lambda x: rrf_scores[x], reverse=True)
return [doc_map[doc_id] for doc_id in sorted_ids[:top_k]]
async def _retrieve(
self,
queries: List[str]
) -> List[Document]:
"""Main retrieval method"""
import time
start = time.time()
all_docs: Dict[str, Document] = {}
for query in queries:
query_embedding = self._get_embedding(query)
if self.config.retrieval_strategy == RetrievalStrategy.VECTOR:
docs = await self._retrieve_vector(query_embedding, self.config.top_k)
elif self.config.retrieval_strategy == RetrievalStrategy.KEYWORD:
docs = await self._retrieve_keyword(query, self.config.top_k)
else: # HYBRID
docs = await self._retrieve_hybrid(query, query_embedding, self.config.top_k)
for doc in docs:
if doc.id not in all_docs or doc.score > all_docs[doc.id].score:
all_docs[doc.id] = doc
# Sort by score and limit
sorted_docs = sorted(all_docs.values(), key=lambda x: x.score, reverse=True)
return sorted_docs[:self.config.top_k]
# ============= RE-RANKING =============
async def _rerank(
self,
query: str,
documents: List[Document]
) -> List[Document]:
"""Re-rank documents using LLM as judge"""
if not self.config.enable_reranking or len(documents) <= self.config.rerank_top_k:
return documents[:self.config.rerank_top_k]
# Use LLM to score relevance
doc_texts = "\n\n".join([
f"[Doc {i+1}] {doc.content[:500]}"
for i, doc in enumerate(documents)
])
response = self.openai.chat.completions.create(
model="gpt-4o-mini",
messages=[
{
"role": "system",
"content": """Rate the relevance of each document to the query.
Return JSON: {"rankings": [{"doc": 1, "score": 0.9}, ...]}
Score from 0 (irrelevant) to 1 (highly relevant)."""
},
{
"role": "user",
"content": f"Query: {query}\n\nDocuments:\n{doc_texts}"
}
],
response_format={"type": "json_object"}
)
rankings = json.loads(response.choices[0].message.content)
# Apply new scores
for ranking in rankings.get("rankings", []):
idx = ranking["doc"] - 1
if 0 <= idx < len(documents):
documents[idx].score = ranking["score"]
# Re-sort and return top_k
return sorted(documents, key=lambda x: x.score, reverse=True)[:self.config.rerank_top_k]
# ============= GENERATION =============
def _build_context(self, documents: List[Document]) -> tuple[str, List[Dict]]:
"""Build context string and sources list"""
context_parts = []
sources = []
for i, doc in enumerate(documents, 1):
context_parts.append(f"[Source {i}]\n{doc.content}")
sources.append({
"id": i,
"doc_id": doc.id,
"content": doc.content,  # keep the chunk text so faithfulness evaluation can read it
"title": doc.metadata.get("title", "Untitled"),
"source": doc.metadata.get("source", "Unknown"),
"chunk_index": doc.metadata.get("chunk_index"),
"score": round(doc.score, 3)
})
return "\n\n---\n\n".join(context_parts), sources
async def _generate(
self,
query: str,
context: str,
sources: List[Dict]
) -> RAGResponse:
"""Generate answer using LLM"""
import time
start = time.time()
system_prompt = """You are a helpful assistant that answers questions based on provided sources.
RULES:
1. Only use information from the provided sources
2. Always cite sources using [Source N] format
3. If sources don't contain the answer, say "I don't have information about that in my sources"
4. Be concise but thorough
5. If multiple sources agree, cite all of them"""
response = self.openai.chat.completions.create(
model=self.config.model,
messages=[
{"role": "system", "content": system_prompt},
{
"role": "user",
"content": f"""Sources:
{context}
Question: {query}
Please answer based on the sources above:"""
}
],
temperature=self.config.temperature,
max_tokens=self.config.max_tokens
)
gen_time = (time.time() - start) * 1000
return RAGResponse(
answer=response.choices[0].message.content,
sources=sources,
confidence=self._estimate_confidence(response.choices[0].message.content, sources),
retrieval_time_ms=0, # Set by caller
generation_time_ms=gen_time,
total_tokens=response.usage.total_tokens,
model=self.config.model
)
def _estimate_confidence(self, answer: str, sources: List[Dict]) -> float:
"""Estimate answer confidence based on source citations"""
# Count how many sources are cited
cited = sum(1 for s in sources if f"[Source {s['id']}]" in answer)
citation_ratio = cited / len(sources) if sources else 0
# Check for uncertainty markers
uncertainty_phrases = [
"i don't have information",
"not mentioned",
"cannot find",
"unclear",
"may not be"
]
has_uncertainty = any(p in answer.lower() for p in uncertainty_phrases)
if has_uncertainty:
return 0.3
elif citation_ratio >= 0.5:
return 0.9
elif citation_ratio > 0:
return 0.7
else:
return 0.5
# ============= MAIN QUERY METHOD =============
async def query(self, question: str) -> RAGResponse:
"""Main RAG query method"""
import time
# 1. Process query
queries = await self._process_query(question)
# 2. Retrieve
retrieval_start = time.time()
documents = await self._retrieve(queries)
retrieval_time = (time.time() - retrieval_start) * 1000
if not documents:
return RAGResponse(
answer="I couldn't find any relevant information to answer your question.",
sources=[],
confidence=0.0,
retrieval_time_ms=retrieval_time,
generation_time_ms=0,
total_tokens=0,
model=self.config.model
)
# 3. Re-rank
if self.config.enable_reranking:
documents = await self._rerank(question, documents)
# 4. Build context
context, sources = self._build_context(documents)
# 5. Generate
response = await self._generate(question, context, sources)
response.retrieval_time_ms = retrieval_time
return response
# Usage
async def main():
rag = ProductionRAG(
database_url="postgresql://user:pass@localhost/docs",
config=RAGConfig(
retrieval_strategy=RetrievalStrategy.HYBRID,
enable_reranking=True,
enable_query_expansion=True
)
)
await rag.initialize()
response = await rag.query("What is our company's vacation policy?")
print(f"Answer: {response.answer}")
print(f"\nSources:")
for source in response.sources:
print(f" [{source['id']}] {source['title']} (score: {source['score']})")
print(f"\nConfidence: {response.confidence}")
print(f"Retrieval: {response.retrieval_time_ms:.1f}ms")
print(f"Generation: {response.generation_time_ms:.1f}ms")
await rag.close()
asyncio.run(main())
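The pipeline above assumes a documents table that already holds chunked text, JSONB metadata, and pgvector embeddings. Below is a minimal ingestion sketch under those assumptions: the table and column names mirror the SQL used in the retrieval methods, while the fixed-size chunker and the connection string are placeholders you would replace.

import json
import uuid

import asyncpg
from openai import OpenAI

SCHEMA = """
CREATE EXTENSION IF NOT EXISTS vector;
CREATE TABLE IF NOT EXISTS documents (
    id UUID PRIMARY KEY,
    content TEXT NOT NULL,
    metadata JSONB NOT NULL DEFAULT '{}',
    embedding vector(1536)  -- dimension of text-embedding-3-small
);
"""

def chunk_text(text: str, chunk_size: int = 800, overlap: int = 100) -> list[str]:
    """Naive fixed-size chunking with overlap; swap in a semantic chunker for production."""
    chunks, start = [], 0
    while start < len(text):
        chunks.append(text[start:start + chunk_size])
        start += chunk_size - overlap
    return chunks

async def ingest(database_url: str, title: str, source: str, text: str) -> None:
    openai = OpenAI()
    pool = await asyncpg.create_pool(database_url)
    try:
        async with pool.acquire() as conn:
            await conn.execute(SCHEMA)
            for idx, chunk in enumerate(chunk_text(text)):
                embedding = openai.embeddings.create(
                    model="text-embedding-3-small", input=chunk
                ).data[0].embedding
                await conn.execute(
                    """INSERT INTO documents (id, content, metadata, embedding)
                       VALUES ($1, $2, $3, $4::vector)""",
                    uuid.uuid4(),
                    chunk,
                    json.dumps({"title": title, "source": source, "chunk_index": idx}),
                    "[" + ",".join(map(str, embedding)) + "]",  # pgvector text format
                )
    finally:
        await pool.close()

# Example: asyncio.run(ingest("postgresql://user:pass@localhost/docs", "HR Handbook", "handbook.pdf", raw_text))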
Advanced Techniques
1. Parent-Child Retrieval
Store small chunks for retrieval, then return their larger parent chunks as context:
class ParentChildRAG:
"""Retrieve on child chunks, return parent context"""
async def add_document(self, content: str, doc_id: str):
"""Store with parent-child relationships"""
# Create parent chunks (large, ~2000 chars)
parent_chunks = self.chunker.chunk(content, chunk_size=2000)
for p_idx, parent in enumerate(parent_chunks):
parent_id = f"{doc_id}_p{p_idx}"
# Create child chunks (small, ~300 chars)
child_chunks = self.chunker.chunk(parent, chunk_size=300)
# Store parent (no embedding needed)
await self._store_parent(parent_id, parent)
# Store children with embeddings
for c_idx, child in enumerate(child_chunks):
child_id = f"{parent_id}_c{c_idx}"
await self._store_child(
child_id,
child,
embedding=self._get_embedding(child),
parent_id=parent_id
)
async def retrieve(self, query: str) -> List[str]:
"""Search children, return parents"""
# Search on child embeddings
child_results = await self._vector_search(query)
# Get unique parent IDs
parent_ids = list(set(r['parent_id'] for r in child_results))
# Return parent content
return await self._get_parents(parent_ids)
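The class above leaves self.chunker, _store_parent, _store_child, and _get_parents undefined. Here is a minimal sketch of the storage side, assuming an asyncpg pool on self.pool and two hypothetical tables, parent_chunks(id, content) and child_chunks(id, content, embedding, parent_id). These are methods of ParentChildRAG, shown standalone for brevity.

async def _store_parent(self, parent_id: str, content: str) -> None:
    async with self.pool.acquire() as conn:
        await conn.execute(
            "INSERT INTO parent_chunks (id, content) VALUES ($1, $2)",
            parent_id, content
        )

async def _store_child(
    self, child_id: str, content: str,
    embedding: list[float], parent_id: str
) -> None:
    async with self.pool.acquire() as conn:
        await conn.execute(
            """INSERT INTO child_chunks (id, content, embedding, parent_id)
               VALUES ($1, $2, $3::vector, $4)""",
            child_id, content,
            "[" + ",".join(map(str, embedding)) + "]",  # pgvector text format
            parent_id
        )

async def _get_parents(self, parent_ids: list[str]) -> list[str]:
    async with self.pool.acquire() as conn:
        rows = await conn.fetch(
            "SELECT content FROM parent_chunks WHERE id = ANY($1::text[])",
            parent_ids
        )
        return [row["content"] for row in rows]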
2. Agentic RAG with Query Decomposition
class AgenticRAG:
"""RAG with multi-step reasoning"""
async def query(self, question: str) -> str:
# Step 1: Decompose complex question
sub_questions = await self._decompose(question)
# Step 2: Answer each sub-question
sub_answers = []
for sq in sub_questions:
docs = await self._retrieve(sq)
answer = await self._generate_partial(sq, docs)
sub_answers.append({"question": sq, "answer": answer})
# Step 3: Synthesize final answer
return await self._synthesize(question, sub_answers)
async def _decompose(self, question: str) -> List[str]:
"""Break complex question into sub-questions"""
response = self.openai.chat.completions.create(
model="gpt-4o",
messages=[
{
"role": "system",
"content": """Break this question into simpler sub-questions that can be
answered independently. Return as JSON: {"sub_questions": [...]}"""
},
{"role": "user", "content": question}
],
response_format={"type": "json_object"}
)
result = json.loads(response.choices[0].message.content)
return result.get("sub_questions", [question])
async def _synthesize(
self,
original_question: str,
sub_answers: List[Dict]
) -> str:
"""Combine sub-answers into final response"""
context = "\n\n".join([
f"Q: {sa['question']}\nA: {sa['answer']}"
for sa in sub_answers
])
response = self.openai.chat.completions.create(
model="gpt-4o",
messages=[
{
"role": "system",
"content": """Synthesize these partial answers into a complete,
coherent response to the original question."""
},
{
"role": "user",
"content": f"""Original question: {original_question}
Partial answers:
{context}
Provide a complete answer:"""
}
]
)
return response.choices[0].message.content
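AgenticRAG relies on _retrieve and _generate_partial, which are not shown; _retrieve can reuse the hybrid retrieval from ProductionRAG. A minimal sketch of _generate_partial, assuming the retrieved documents arrive as plain strings:

async def _generate_partial(self, sub_question: str, docs: list[str]) -> str:
    """Answer one sub-question from its own retrieved context."""
    context = "\n\n---\n\n".join(docs[:5])  # keep each sub-prompt small
    response = self.openai.chat.completions.create(
        model="gpt-4o-mini",  # a cheaper model is usually enough for partial answers
        messages=[
            {"role": "system",
             "content": "Answer the question using only the provided context. "
                        "If the context is insufficient, say so briefly."},
            {"role": "user",
             "content": f"Context:\n{context}\n\nQuestion: {sub_question}"}
        ],
        temperature=0,
        max_tokens=300
    )
    return response.choices[0].message.content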
Evaluation Framework
from dataclasses import dataclass
from typing import List
import json
@dataclass
class EvalCase:
question: str
expected_docs: List[str] # Expected source doc IDs
expected_answer: str # Ground truth answer
@dataclass
class EvalResults:
retrieval_precision: float
retrieval_recall: float
answer_relevance: float
answer_faithfulness: float
latency_p50_ms: float
latency_p95_ms: float
class RAGEvaluator:
"""Evaluate RAG system quality"""
def __init__(self, rag: ProductionRAG):
self.rag = rag
self.openai = OpenAI()
async def evaluate(self, test_cases: List[EvalCase]) -> EvalResults:
"""Run evaluation on test cases"""
results = {
"retrieval_precision": [],
"retrieval_recall": [],
"answer_relevance": [],
"answer_faithfulness": [],
"latencies": []
}
for case in test_cases:
import time
start = time.time()
response = await self.rag.query(case.question)
latency = (time.time() - start) * 1000
results["latencies"].append(latency)
# Retrieval metrics
retrieved_ids = [s["doc_id"] for s in response.sources]
precision = len(set(retrieved_ids) & set(case.expected_docs)) / len(retrieved_ids) if retrieved_ids else 0
recall = len(set(retrieved_ids) & set(case.expected_docs)) / len(case.expected_docs) if case.expected_docs else 0
results["retrieval_precision"].append(precision)
results["retrieval_recall"].append(recall)
# Answer quality (LLM as judge)
relevance = await self._judge_relevance(
case.question,
response.answer,
case.expected_answer
)
faithfulness = await self._judge_faithfulness(
response.answer,
[s.get("content", "") for s in response.sources]
)
results["answer_relevance"].append(relevance)
results["answer_faithfulness"].append(faithfulness)
# Compute aggregates
latencies = sorted(results["latencies"])
return EvalResults(
retrieval_precision=sum(results["retrieval_precision"]) / len(test_cases),
retrieval_recall=sum(results["retrieval_recall"]) / len(test_cases),
answer_relevance=sum(results["answer_relevance"]) / len(test_cases),
answer_faithfulness=sum(results["answer_faithfulness"]) / len(test_cases),
latency_p50_ms=latencies[len(latencies) // 2],
latency_p95_ms=latencies[int(len(latencies) * 0.95)]
)
async def _judge_relevance(
self,
question: str,
answer: str,
expected: str
) -> float:
"""Score answer relevance 0-1"""
response = self.openai.chat.completions.create(
model="gpt-4o-mini",
messages=[
{
"role": "system",
"content": """Rate how well the answer addresses the question compared to the expected answer.
Return JSON: {"score": 0.0 to 1.0, "reason": "..."}"""
},
{
"role": "user",
"content": f"""Question: {question}
Expected Answer: {expected}
Actual Answer: {answer}"""
}
],
response_format={"type": "json_object"}
)
result = json.loads(response.choices[0].message.content)
return result.get("score", 0.5)
async def _judge_faithfulness(
self,
answer: str,
source_contents: List[str]
) -> float:
"""Score if answer is grounded in sources 0-1"""
sources = "\n---\n".join(source_contents)
response = self.openai.chat.completions.create(
model="gpt-4o-mini",
messages=[
{
"role": "system",
"content": """Rate if the answer is fully supported by the sources (no hallucinations).
Return JSON: {"score": 0.0 to 1.0, "unsupported_claims": [...]}"""
},
{
"role": "user",
"content": f"""Sources:
{sources}
Answer: {answer}"""
}
],
response_format={"type": "json_object"}
)
result = json.loads(response.choices[0].message.content)
return result.get("score", 0.5)
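Wiring the evaluator together might look like the sketch below. The document IDs and ground-truth answer are made-up examples, and the faithfulness judge relies on each source carrying a content field (added in _build_context above).

async def run_eval():
    rag = ProductionRAG(database_url="postgresql://user:pass@localhost/docs")
    await rag.initialize()

    test_cases = [
        EvalCase(
            question="What is our company's vacation policy?",
            expected_docs=["hr_handbook_chunk_12"],  # hypothetical chunk IDs
            expected_answer="Employees accrue 20 days of paid vacation per year."
        ),
        # ... more cases
    ]

    evaluator = RAGEvaluator(rag)
    results = await evaluator.evaluate(test_cases)
    print(f"Precision:    {results.retrieval_precision:.2f}")
    print(f"Recall:       {results.retrieval_recall:.2f}")
    print(f"Relevance:    {results.answer_relevance:.2f}")
    print(f"Faithfulness: {results.answer_faithfulness:.2f}")
    print(f"Latency p50:  {results.latency_p50_ms:.0f}ms, p95: {results.latency_p95_ms:.0f}ms")

    await rag.close()

asyncio.run(run_eval())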
Common Failures and Fixes
Retrieval returns irrelevant documents
Symptoms: Low precision, poor answer quality.
Fixes:
- Improve chunking (smaller chunks, semantic boundaries; see the sketch after this list)
- Add query expansion
- Use hybrid search (vector + keyword)
- Add re-ranking step
- Tune similarity threshold
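A minimal sketch of the chunking fix: split on sentence boundaries instead of fixed character windows. The regex is a rough approximation; sentence splitters in nltk or spaCy do this more reliably.

import re

def semantic_chunks(text: str, max_chars: int = 800) -> list[str]:
    """Greedy chunking that never splits mid-sentence."""
    sentences = re.split(r"(?<=[.!?])\s+", text)
    chunks, current = [], ""
    for sentence in sentences:
        if current and len(current) + len(sentence) + 1 > max_chars:
            chunks.append(current)
            current = sentence
        else:
            current = f"{current} {sentence}".strip()
    if current:
        chunks.append(current)
    return chunks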
LLM ignores provided context
Symptoms: The answer doesn’t use the sources and makes up facts.
Fixes:
- Set temperature=0
- Put context closer to question
- Add explicit instructions: “Only use provided sources”
- Use structured output to force citations (see the sketch after this list)
- Try a stronger model (GPT-4o rather than GPT-4o-mini) for this step
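A sketch of the structured-output fix, using JSON mode so every claim must carry source IDs; the field names here are illustrative, not an OpenAI requirement.

import json
from openai import OpenAI

def answer_with_citations(client: OpenAI, query: str, context: str) -> dict:
    """Return an answer plus explicit per-claim citations."""
    response = client.chat.completions.create(
        model="gpt-4o",
        temperature=0,
        response_format={"type": "json_object"},
        messages=[
            {"role": "system",
             "content": 'Answer only from the sources. Return JSON: '
                        '{"answer": "...", "claims": [{"text": "...", "sources": [1, 2]}]}'},
            {"role": "user", "content": f"Sources:\n{context}\n\nQuestion: {query}"}
        ]
    )
    return json.loads(response.choices[0].message.content)

Claims that come back with an empty sources list can be dropped or flagged before the answer is shown.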
Missing relevant documents
Symptoms: Low recall; the answer says it has no information.
Fixes:
- Query expansion (multiple query variations)
- HyDE (hypothetical document embeddings)
- Lower similarity threshold
- Increase top_k before re-ranking
- Improve document coverage
Slow response times
Symptoms: Total latency over 3 seconds.
Fixes:
- Cache embeddings for the most common queries (see the sketch after this list)
- Use async database connections with pooling
- Stream LLM responses
- Use faster embedding model
- Pre-compute common query answers
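A sketch of the embedding-cache fix. An in-process LRU cache is enough for a single instance; a shared cache such as Redis (not shown) is the usual choice across instances.

from functools import lru_cache
from openai import OpenAI

_openai = OpenAI()

@lru_cache(maxsize=10_000)
def cached_embedding(text: str) -> tuple[float, ...]:
    """Embed each unique query string once; lru_cache needs a hashable return value."""
    response = _openai.embeddings.create(
        model="text-embedding-3-small",
        input=text
    )
    return tuple(response.data[0].embedding)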
Key Takeaways
Hybrid Search Wins
Vector + keyword search with RRF scoring outperforms either alone for most use cases.
Re-ranking Is Worth It
Cross-encoder or LLM re-ranking significantly improves precision at modest latency cost.
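The pipeline above uses an LLM judge for re-ranking; a dedicated cross-encoder is typically faster and cheaper per query. A sketch using sentence-transformers with the Document dataclass from the pipeline (the model name is one common choice, not a requirement):

from sentence_transformers import CrossEncoder

reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")

def rerank(query: str, documents: list, top_k: int = 5) -> list:
    """Score (query, passage) pairs directly and keep the best top_k."""
    pairs = [(query, doc.content) for doc in documents]
    scores = reranker.predict(pairs)
    for doc, score in zip(documents, scores):
        doc.score = float(score)
    return sorted(documents, key=lambda d: d.score, reverse=True)[:top_k]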
Query Processing Matters
Query expansion and HyDE can dramatically improve recall for ambiguous queries.
Evaluate Continuously
Track retrieval precision, answer faithfulness, and latency. What you don’t measure, you can’t improve.
What’s Next
AI Agents
Build autonomous agents that use tools, make decisions, and complete multi-step tasks