Why RAG is the Killer App

RAG (Retrieval-Augmented Generation) is how you build AI products that actually work with real data: ChatGPT's browsing mode, Perplexity's search, and enterprise knowledge bots are all RAG systems.
Industry Reality: the vast majority of enterprise AI projects are RAG systems. Mastering RAG means you can build products that work with any company's data without expensive fine-tuning.

The RAG Mental Model

                    ┌──────────────────────────┐
                    │      User Question       │
                    └────────────┬─────────────┘

                    ┌────────────▼─────────────┐
                    │   1. Query Processing    │  ← Rewrite, expand, decompose
                    └────────────┬─────────────┘

                    ┌────────────▼─────────────┐
                    │   2. Retrieval Engine    │  ← Vector + keyword search
                    └────────────┬─────────────┘

                    ┌────────────▼─────────────┐
                    │   3. Re-ranking          │  ← Cross-encoder scoring
                    └────────────┬─────────────┘

                    ┌────────────▼─────────────┐
                    │   4. Context Building    │  ← Compress, format, citations
                    └────────────┬─────────────┘

                    ┌────────────▼─────────────┐
                    │   5. LLM Generation      │  ← Answer with sources
                    └──────────────────────────┘

Production RAG System

Complete Implementation

from openai import OpenAI
from dataclasses import dataclass
from typing import List, Optional, Dict, Any
from enum import Enum
import asyncio
import asyncpg
import json
import time

class RetrievalStrategy(Enum):
    VECTOR = "vector"
    KEYWORD = "keyword"
    HYBRID = "hybrid"

@dataclass
class Document:
    id: str
    content: str
    metadata: Dict[str, Any]
    score: float = 0.0
    
@dataclass
class RAGResponse:
    answer: str
    sources: List[Dict[str, Any]]
    confidence: float
    retrieval_time_ms: float
    generation_time_ms: float
    total_tokens: int
    model: str

@dataclass
class RAGConfig:
    """Configuration for RAG pipeline"""
    # Retrieval
    retrieval_strategy: RetrievalStrategy = RetrievalStrategy.HYBRID
    top_k: int = 10
    similarity_threshold: float = 0.7
    
    # Re-ranking
    enable_reranking: bool = True
    rerank_top_k: int = 5
    
    # Generation
    model: str = "gpt-4o"
    temperature: float = 0
    max_tokens: int = 1000
    
    # Query processing
    enable_query_expansion: bool = True
    enable_hyde: bool = False  # Hypothetical Document Embeddings


class ProductionRAG:
    """Enterprise-grade RAG system"""
    
    def __init__(self, database_url: str, config: Optional[RAGConfig] = None):
        self.database_url = database_url
        self.config = config or RAGConfig()
        # Note: the sync OpenAI client blocks the event loop; use AsyncOpenAI
        # if you need real concurrency inside this async pipeline.
        self.openai = OpenAI()
        self.pool: Optional[asyncpg.Pool] = None
    
    async def initialize(self):
        """Initialize database pool"""
        self.pool = await asyncpg.create_pool(
            self.database_url,
            min_size=5,
            max_size=20
        )
    
    async def close(self):
        if self.pool:
            await self.pool.close()
    
    # ============= EMBEDDING =============
    
    def _get_embedding(self, text: str) -> List[float]:
        response = self.openai.embeddings.create(
            model="text-embedding-3-small",
            input=text
        )
        return response.data[0].embedding
    
    def _get_embeddings_batch(self, texts: List[str]) -> List[List[float]]:
        response = self.openai.embeddings.create(
            model="text-embedding-3-small",
            input=texts
        )
        return [e.embedding for e in response.data]
    
    # ============= QUERY PROCESSING =============
    
    async def _process_query(self, query: str) -> List[str]:
        """Transform query for better retrieval"""
        queries = [query]
        
        if self.config.enable_query_expansion:
            expanded = await self._expand_query(query)
            queries.extend(expanded)
        
        if self.config.enable_hyde:
            hyde_doc = await self._generate_hyde(query)
            queries.append(hyde_doc)
        
        return queries
    
    async def _expand_query(self, query: str) -> List[str]:
        """Generate query variations"""
        response = self.openai.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {
                    "role": "system",
                    "content": """Generate 3 alternative phrasings of this search query.
                    Return as JSON: {"queries": ["...", "...", "..."]}"""
                },
                {"role": "user", "content": query}
            ],
            response_format={"type": "json_object"},
            temperature=0.7
        )
        
        result = json.loads(response.choices[0].message.content)
        return result.get("queries", [])
    
    async def _generate_hyde(self, query: str) -> str:
        """Hypothetical Document Embeddings - generate ideal answer"""
        response = self.openai.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {
                    "role": "system",
                    "content": """You are a helpful assistant. Write a short, factual paragraph 
                    that would be the perfect answer to the user's question."""
                },
                {"role": "user", "content": query}
            ],
            max_tokens=200
        )
        return response.choices[0].message.content
    
    # ============= RETRIEVAL =============
    
    async def _retrieve_vector(
        self,
        query_embedding: List[float],
        top_k: int
    ) -> List[Document]:
        """Vector similarity search"""
        async with self.pool.acquire() as conn:
            rows = await conn.fetch("""
                SELECT 
                    id::text,
                    content,
                    metadata,
                    1 - (embedding <=> $1::vector) as score
                FROM documents
                WHERE 1 - (embedding <=> $1::vector) > $2
                ORDER BY embedding <=> $1::vector
                LIMIT $3
            """, str(query_embedding), self.config.similarity_threshold, top_k)
            
            return [
                Document(
                    id=row['id'],
                    content=row['content'],
                    metadata=json.loads(row['metadata']),
                    score=row['score']
                )
                for row in rows
            ]
    
    async def _retrieve_keyword(
        self,
        query: str,
        top_k: int
    ) -> List[Document]:
        """Full-text keyword search"""
        async with self.pool.acquire() as conn:
            rows = await conn.fetch("""
                SELECT 
                    id::text,
                    content,
                    metadata,
                    ts_rank(to_tsvector('english', content), 
                            plainto_tsquery('english', $1)) as score
                FROM documents
                WHERE to_tsvector('english', content) @@ plainto_tsquery('english', $1)
                ORDER BY score DESC
                LIMIT $2
            """, query, top_k)
            
            return [
                Document(
                    id=row['id'],
                    content=row['content'],
                    metadata=json.loads(row['metadata']),
                    score=row['score']
                )
                for row in rows
            ]
    
    async def _retrieve_hybrid(
        self,
        query: str,
        query_embedding: List[float],
        top_k: int
    ) -> List[Document]:
        """Hybrid search with Reciprocal Rank Fusion"""
        # Get both result sets
        vector_results = await self._retrieve_vector(query_embedding, top_k * 2)
        keyword_results = await self._retrieve_keyword(query, top_k * 2)
        
        # Build document map
        doc_map: Dict[str, Document] = {}
        vector_ranks: Dict[str, int] = {}
        keyword_ranks: Dict[str, int] = {}
        
        for i, doc in enumerate(vector_results):
            doc_map[doc.id] = doc
            vector_ranks[doc.id] = i + 1
        
        for i, doc in enumerate(keyword_results):
            if doc.id not in doc_map:
                doc_map[doc.id] = doc
            keyword_ranks[doc.id] = i + 1
        
        # Reciprocal Rank Fusion
        k = 60  # RRF constant
        rrf_scores: Dict[str, float] = {}
        
        for doc_id in doc_map:
            v_rank = vector_ranks.get(doc_id, len(vector_results) + 1)
            k_rank = keyword_ranks.get(doc_id, len(keyword_results) + 1)
            rrf_scores[doc_id] = 1/(k + v_rank) + 1/(k + k_rank)
        
        # Sort by RRF score
        sorted_ids = sorted(rrf_scores.keys(), key=lambda x: rrf_scores[x], reverse=True)
        
        return [doc_map[doc_id] for doc_id in sorted_ids[:top_k]]
    
    async def _retrieve(
        self,
        queries: List[str]
    ) -> List[Document]:
        """Main retrieval method"""
        import time
        start = time.time()
        
        all_docs: Dict[str, Document] = {}
        
        for query in queries:
            query_embedding = self._get_embedding(query)
            
            if self.config.retrieval_strategy == RetrievalStrategy.VECTOR:
                docs = await self._retrieve_vector(query_embedding, self.config.top_k)
            elif self.config.retrieval_strategy == RetrievalStrategy.KEYWORD:
                docs = await self._retrieve_keyword(query, self.config.top_k)
            else:  # HYBRID
                docs = await self._retrieve_hybrid(query, query_embedding, self.config.top_k)
            
            for doc in docs:
                if doc.id not in all_docs or doc.score > all_docs[doc.id].score:
                    all_docs[doc.id] = doc
        
        # Sort by score and limit
        sorted_docs = sorted(all_docs.values(), key=lambda x: x.score, reverse=True)
        return sorted_docs[:self.config.top_k]
    
    # ============= RE-RANKING =============
    
    async def _rerank(
        self,
        query: str,
        documents: List[Document]
    ) -> List[Document]:
        """Re-rank documents using LLM as judge"""
        if not self.config.enable_reranking or len(documents) <= self.config.rerank_top_k:
            return documents[:self.config.rerank_top_k]
        
        # Use LLM to score relevance
        doc_texts = "\n\n".join([
            f"[Doc {i+1}] {doc.content[:500]}"
            for i, doc in enumerate(documents)
        ])
        
        response = self.openai.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {
                    "role": "system",
                    "content": """Rate the relevance of each document to the query.
                    Return JSON: {"rankings": [{"doc": 1, "score": 0.9}, ...]}
                    Score from 0 (irrelevant) to 1 (highly relevant)."""
                },
                {
                    "role": "user",
                    "content": f"Query: {query}\n\nDocuments:\n{doc_texts}"
                }
            ],
            response_format={"type": "json_object"}
        )
        
        rankings = json.loads(response.choices[0].message.content)
        
        # Apply new scores
        for ranking in rankings.get("rankings", []):
            idx = ranking["doc"] - 1
            if 0 <= idx < len(documents):
                documents[idx].score = ranking["score"]
        
        # Re-sort and return top_k
        return sorted(documents, key=lambda x: x.score, reverse=True)[:self.config.rerank_top_k]
    
    # ============= GENERATION =============
    
    def _build_context(self, documents: List[Document]) -> tuple[str, List[Dict]]:
        """Build context string and sources list"""
        context_parts = []
        sources = []
        
        for i, doc in enumerate(documents, 1):
            context_parts.append(f"[Source {i}]\n{doc.content}")
            sources.append({
                "id": i,
                "doc_id": doc.id,
                "title": doc.metadata.get("title", "Untitled"),
                "source": doc.metadata.get("source", "Unknown"),
                "chunk_index": doc.metadata.get("chunk_index"),
                "content": doc.content,  # kept so downstream evaluation can check faithfulness
                "score": round(doc.score, 3)
            })
        
        return "\n\n---\n\n".join(context_parts), sources
    
    async def _generate(
        self,
        query: str,
        context: str,
        sources: List[Dict]
    ) -> RAGResponse:
        """Generate answer using LLM"""
        import time
        start = time.time()
        
        system_prompt = """You are a helpful assistant that answers questions based on provided sources.

RULES:
1. Only use information from the provided sources
2. Always cite sources using [Source N] format
3. If sources don't contain the answer, say "I don't have information about that in my sources"
4. Be concise but thorough
5. If multiple sources agree, cite all of them"""
        
        response = self.openai.chat.completions.create(
            model=self.config.model,
            messages=[
                {"role": "system", "content": system_prompt},
                {
                    "role": "user",
                    "content": f"""Sources:
{context}

Question: {query}

Please answer based on the sources above:"""
                }
            ],
            temperature=self.config.temperature,
            max_tokens=self.config.max_tokens
        )
        
        gen_time = (time.time() - start) * 1000
        
        return RAGResponse(
            answer=response.choices[0].message.content,
            sources=sources,
            confidence=self._estimate_confidence(response.choices[0].message.content, sources),
            retrieval_time_ms=0,  # Set by caller
            generation_time_ms=gen_time,
            total_tokens=response.usage.total_tokens,
            model=self.config.model
        )
    
    def _estimate_confidence(self, answer: str, sources: List[Dict]) -> float:
        """Estimate answer confidence based on source citations"""
        # Count how many sources are cited
        cited = sum(1 for s in sources if f"[Source {s['id']}]" in answer)
        citation_ratio = cited / len(sources) if sources else 0
        
        # Check for uncertainty markers
        uncertainty_phrases = [
            "i don't have information",
            "not mentioned",
            "cannot find",
            "unclear",
            "may not be"
        ]
        has_uncertainty = any(p in answer.lower() for p in uncertainty_phrases)
        
        if has_uncertainty:
            return 0.3
        elif citation_ratio >= 0.5:
            return 0.9
        elif citation_ratio > 0:
            return 0.7
        else:
            return 0.5
    
    # ============= MAIN QUERY METHOD =============
    
    async def query(self, question: str) -> RAGResponse:
        """Main RAG query method"""
        import time
        
        # 1. Process query
        queries = await self._process_query(question)
        
        # 2. Retrieve
        retrieval_start = time.time()
        documents = await self._retrieve(queries)
        retrieval_time = (time.time() - retrieval_start) * 1000
        
        if not documents:
            return RAGResponse(
                answer="I couldn't find any relevant information to answer your question.",
                sources=[],
                confidence=0.0,
                retrieval_time_ms=retrieval_time,
                generation_time_ms=0,
                total_tokens=0,
                model=self.config.model
            )
        
        # 3. Re-rank
        if self.config.enable_reranking:
            documents = await self._rerank(question, documents)
        
        # 4. Build context
        context, sources = self._build_context(documents)
        
        # 5. Generate
        response = await self._generate(question, context, sources)
        response.retrieval_time_ms = retrieval_time
        
        return response


# Usage
async def main():
    rag = ProductionRAG(
        database_url="postgresql://user:pass@localhost/docs",
        config=RAGConfig(
            retrieval_strategy=RetrievalStrategy.HYBRID,
            enable_reranking=True,
            enable_query_expansion=True
        )
    )
    
    await rag.initialize()
    
    response = await rag.query("What is our company's vacation policy?")
    
    print(f"Answer: {response.answer}")
    print(f"\nSources:")
    for source in response.sources:
        print(f"  [{source['id']}] {source['title']} (score: {source['score']})")
    print(f"\nConfidence: {response.confidence}")
    print(f"Retrieval: {response.retrieval_time_ms:.1f}ms")
    print(f"Generation: {response.generation_time_ms:.1f}ms")
    
    await rag.close()

if __name__ == "__main__":
    asyncio.run(main())

Advanced Techniques

1. Parent-Child Retrieval

Store small chunks for retrieval, return larger parent chunks for context:
class ParentChildRAG:
    """Retrieve on child chunks, return parent context"""
    
    async def add_document(self, content: str, doc_id: str):
        """Store with parent-child relationships"""
        # Create parent chunks (large, ~2000 chars)
        parent_chunks = self.chunker.chunk(content, chunk_size=2000)
        
        for p_idx, parent in enumerate(parent_chunks):
            parent_id = f"{doc_id}_p{p_idx}"
            
            # Create child chunks (small, ~300 chars)
            child_chunks = self.chunker.chunk(parent, chunk_size=300)
            
            # Store parent (no embedding needed)
            await self._store_parent(parent_id, parent)
            
            # Store children with embeddings
            for c_idx, child in enumerate(child_chunks):
                child_id = f"{parent_id}_c{c_idx}"
                await self._store_child(
                    child_id,
                    child,
                    embedding=self._get_embedding(child),
                    parent_id=parent_id
                )
    
    async def retrieve(self, query: str) -> List[str]:
        """Search children, return parents"""
        # Search on child embeddings
        child_results = await self._vector_search(query)
        
        # Get unique parent IDs
        parent_ids = list(set(r['parent_id'] for r in child_results))
        
        # Return parent content
        return await self._get_parents(parent_ids)

2. Agentic RAG with Query Decomposition

class AgenticRAG:
    """RAG with multi-step reasoning.

    Assumes the retrieval and generation plumbing from ProductionRAG above:
    self.openai, self._retrieve, and a _generate_partial helper that answers
    one sub-question from its retrieved documents.
    """
    
    async def query(self, question: str) -> str:
        # Step 1: Decompose complex question
        sub_questions = await self._decompose(question)
        
        # Step 2: Answer each sub-question
        sub_answers = []
        for sq in sub_questions:
            docs = await self._retrieve(sq)
            answer = await self._generate_partial(sq, docs)
            sub_answers.append({"question": sq, "answer": answer})
        
        # Step 3: Synthesize final answer
        return await self._synthesize(question, sub_answers)
    
    async def _decompose(self, question: str) -> List[str]:
        """Break complex question into sub-questions"""
        response = self.openai.chat.completions.create(
            model="gpt-4o",
            messages=[
                {
                    "role": "system",
                    "content": """Break this question into simpler sub-questions that can be 
                    answered independently. Return as JSON: {"sub_questions": [...]}"""
                },
                {"role": "user", "content": question}
            ],
            response_format={"type": "json_object"}
        )
        
        result = json.loads(response.choices[0].message.content)
        return result.get("sub_questions", [question])
    
    async def _synthesize(
        self,
        original_question: str,
        sub_answers: List[Dict]
    ) -> str:
        """Combine sub-answers into final response"""
        context = "\n\n".join([
            f"Q: {sa['question']}\nA: {sa['answer']}"
            for sa in sub_answers
        ])
        
        response = self.openai.chat.completions.create(
            model="gpt-4o",
            messages=[
                {
                    "role": "system",
                    "content": """Synthesize these partial answers into a complete, 
                    coherent response to the original question."""
                },
                {
                    "role": "user",
                    "content": f"""Original question: {original_question}

Partial answers:
{context}

Provide a complete answer:"""
                }
            ]
        )
        
        return response.choices[0].message.content

Evaluation Framework

from dataclasses import dataclass
from typing import List
from openai import OpenAI
import json
import time

@dataclass
class EvalCase:
    question: str
    expected_docs: List[str]  # Expected source doc IDs
    expected_answer: str       # Ground truth answer
    
@dataclass
class EvalResults:
    retrieval_precision: float
    retrieval_recall: float
    answer_relevance: float
    answer_faithfulness: float
    latency_p50_ms: float
    latency_p95_ms: float

class RAGEvaluator:
    """Evaluate RAG system quality"""
    
    def __init__(self, rag: ProductionRAG):
        self.rag = rag
        self.openai = OpenAI()
    
    async def evaluate(self, test_cases: List[EvalCase]) -> EvalResults:
        """Run evaluation on test cases"""
        results = {
            "retrieval_precision": [],
            "retrieval_recall": [],
            "answer_relevance": [],
            "answer_faithfulness": [],
            "latencies": []
        }
        
        for case in test_cases:
            start = time.time()
            
            response = await self.rag.query(case.question)
            
            latency = (time.time() - start) * 1000
            results["latencies"].append(latency)
            
            # Retrieval metrics
            retrieved_ids = [s["doc_id"] for s in response.sources]
            
            precision = len(set(retrieved_ids) & set(case.expected_docs)) / len(retrieved_ids) if retrieved_ids else 0
            recall = len(set(retrieved_ids) & set(case.expected_docs)) / len(case.expected_docs) if case.expected_docs else 0
            
            results["retrieval_precision"].append(precision)
            results["retrieval_recall"].append(recall)
            
            # Answer quality (LLM as judge)
            relevance = await self._judge_relevance(
                case.question, 
                response.answer, 
                case.expected_answer
            )
            faithfulness = await self._judge_faithfulness(
                response.answer,
                [s.get("content", "") for s in response.sources]
            )
            
            results["answer_relevance"].append(relevance)
            results["answer_faithfulness"].append(faithfulness)
        
        # Compute aggregates
        latencies = sorted(results["latencies"])
        
        return EvalResults(
            retrieval_precision=sum(results["retrieval_precision"]) / len(test_cases),
            retrieval_recall=sum(results["retrieval_recall"]) / len(test_cases),
            answer_relevance=sum(results["answer_relevance"]) / len(test_cases),
            answer_faithfulness=sum(results["answer_faithfulness"]) / len(test_cases),
            latency_p50_ms=latencies[len(latencies) // 2],
            latency_p95_ms=latencies[int(len(latencies) * 0.95)]
        )
    
    async def _judge_relevance(
        self,
        question: str,
        answer: str,
        expected: str
    ) -> float:
        """Score answer relevance 0-1"""
        response = self.openai.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {
                    "role": "system",
                    "content": """Rate how well the answer addresses the question compared to the expected answer.
                    Return JSON: {"score": 0.0 to 1.0, "reason": "..."}"""
                },
                {
                    "role": "user",
                    "content": f"""Question: {question}
                    
Expected Answer: {expected}

Actual Answer: {answer}"""
                }
            ],
            response_format={"type": "json_object"}
        )
        
        result = json.loads(response.choices[0].message.content)
        return result.get("score", 0.5)
    
    async def _judge_faithfulness(
        self,
        answer: str,
        source_contents: List[str]
    ) -> float:
        """Score if answer is grounded in sources 0-1"""
        sources = "\n---\n".join(source_contents)
        
        response = self.openai.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {
                    "role": "system",
                    "content": """Rate if the answer is fully supported by the sources (no hallucinations).
                    Return JSON: {"score": 0.0 to 1.0, "unsupported_claims": [...]}"""
                },
                {
                    "role": "user",
                    "content": f"""Sources:
{sources}

Answer: {answer}"""
                }
            ],
            response_format={"type": "json_object"}
        )
        
        result = json.loads(response.choices[0].message.content)
        return result.get("score", 0.5)

Common Failures and Fixes

Symptoms: Low precision, poor answer quality.
Fixes:
  1. Improve chunking (smaller chunks, semantic boundaries; see the sketch after this list)
  2. Add query expansion
  3. Use hybrid search (vector + keyword)
  4. Add a re-ranking step
  5. Tune the similarity threshold
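
Chunking on semantic boundaries is often the cheapest of these fixes. A minimal sketch (the chunk_by_paragraph name and max_chars default are assumptions; production chunkers usually add sentence splitting and overlap):

def chunk_by_paragraph(text: str, max_chars: int = 800) -> list[str]:
    # Split on blank lines, then pack whole paragraphs into chunks up to max_chars
    paragraphs = [p.strip() for p in text.split("\n\n") if p.strip()]
    chunks, current = [], ""
    for para in paragraphs:
        if current and len(current) + len(para) + 2 > max_chars:
            chunks.append(current)
            current = para
        else:
            current = f"{current}\n\n{para}" if current else para
    if current:
        chunks.append(current)
    return chunks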
Symptoms: Answer doesn't use sources, makes up facts.
Fixes:
  1. Set temperature=0
  2. Put the context closer to the question
  3. Add explicit instructions: "Only use provided sources"
  4. Use structured output to force citations (see the sketch after this list)
  5. Try a stronger model (GPT-4 over GPT-4o-mini for this)
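
One way to force citations is to require a JSON answer with an explicit citations array and reject answers that cite nothing. A sketch, assuming hypothetical function and field names (not part of the pipeline above):

import json
from openai import OpenAI

client = OpenAI()

def answer_with_forced_citations(query: str, context: str) -> dict:
    # Ask for JSON so the model must enumerate which sources it used
    response = client.chat.completions.create(
        model="gpt-4o",
        temperature=0,
        response_format={"type": "json_object"},
        messages=[
            {"role": "system", "content": (
                "Answer only from the provided sources. Return JSON: "
                '{"answer": "...", "citations": [source numbers you used]}'
            )},
            {"role": "user", "content": f"Sources:\n{context}\n\nQuestion: {query}"},
        ],
    )
    result = json.loads(response.choices[0].message.content)
    if not result.get("citations"):
        # No citations at all: treat the answer as ungrounded
        result["answer"] = "I don't have information about that in my sources."
    return result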
Symptoms: Low recall, answer says "no information".
Fixes:
  1. Query expansion (multiple query variations)
  2. HyDE (hypothetical document embeddings)
  3. Lower the similarity threshold
  4. Increase top_k before re-ranking
  5. Improve document coverage
Symptoms: >3s total latency.
Fixes:
  1. Cache embeddings for the most common queries (see the sketch after this list)
  2. Use async database connections with pooling
  3. Stream LLM responses
  4. Use a faster embedding model
  5. Pre-compute answers for common queries
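
An in-process embedding cache is usually the first latency win. A minimal sketch (the get_embedding_cached helper is an assumption; a multi-instance deployment would typically back this with Redis instead):

import hashlib
from openai import OpenAI

client = OpenAI()
_embedding_cache: dict[str, list[float]] = {}

def get_embedding_cached(text: str, model: str = "text-embedding-3-small") -> list[float]:
    # Key on model + text so a model upgrade never reuses stale vectors
    key = hashlib.sha256(f"{model}:{text}".encode()).hexdigest()
    if key not in _embedding_cache:
        response = client.embeddings.create(model=model, input=text)
        _embedding_cache[key] = response.data[0].embedding
    return _embedding_cache[key]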

Key Takeaways

Hybrid Search Wins

Vector + keyword search with RRF scoring outperforms either alone for most use cases.

Re-ranking Is Worth It

Cross-encoder or LLM re-ranking significantly improves precision at modest latency cost.
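
If you want the precision gain without another LLM call, a cross-encoder re-ranker is a common swap for the LLM judge used above. A minimal sketch with sentence-transformers (the checkpoint name is just one popular public model, not part of the system above):

from sentence_transformers import CrossEncoder

reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")

def rerank(query: str, documents: list[str], top_k: int = 5) -> list[str]:
    # Score each (query, document) pair; higher means more relevant
    scores = reranker.predict([(query, doc) for doc in documents])
    ranked = sorted(zip(documents, scores), key=lambda pair: pair[1], reverse=True)
    return [doc for doc, _ in ranked[:top_k]]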

Query Processing Matters

Query expansion and HyDE can dramatically improve recall for ambiguous queries.

Evaluate Continuously

Track retrieval precision, answer faithfulness, and latency. What you don’t measure, you can’t improve.

What’s Next

AI Agents

Build autonomous agents that use tools, make decisions, and complete multi-step tasks