Search Methods Comparison
| Method | Strengths | Weaknesses |
|---|---|---|
| Keyword (BM25) | Exact matches, fast | Misses synonyms |
| Semantic | Understands meaning | Misses keywords |
| Hybrid | Best of both | More complex |
BM25 Implementation
BM25 (Best Match 25) is a probabilistic ranking function:
import math
from collections import Counter
from typing import List, Dict, Tuple
class BM25:
    """BM25 ranking algorithm implementation.

    Documents are tokenized once at construction time (lower-cased and split
    on whitespace) and the per-document term counts and document frequencies
    are precomputed so that each query only has to walk its own terms.

    Args:
        documents: Corpus to index. May be empty.
        k1: Term-frequency saturation parameter of the BM25 formula.
        b: Weight of the document-length normalization (0 disables it).
    """

    def __init__(
        self,
        documents: List[str],
        k1: float = 1.5,
        b: float = 0.75
    ):
        self.k1 = k1
        self.b = b
        self.documents = documents
        self.doc_count = len(documents)

        # Tokenize documents
        self.tokenized_docs = [self._tokenize(doc) for doc in documents]

        # Calculate document lengths (in tokens)
        self.doc_lengths = [len(doc) for doc in self.tokenized_docs]
        # An empty corpus has no average length; use 0.0 instead of dividing
        # by zero so that an empty index can still be built and searched.
        self.avg_doc_length = (
            sum(self.doc_lengths) / self.doc_count if self.doc_count else 0.0
        )

        # Build inverted index
        self.doc_freqs = {}   # term -> number of docs containing term
        self.term_freqs = []  # per-document term frequencies
        for doc_tokens in self.tokenized_docs:
            term_freq = Counter(doc_tokens)
            self.term_freqs.append(term_freq)
            # The Counter's keys are exactly the distinct terms of the doc.
            for term in term_freq:
                self.doc_freqs[term] = self.doc_freqs.get(term, 0) + 1

    def _tokenize(self, text: str) -> List[str]:
        """Lower-case *text* and split it on whitespace."""
        return text.lower().split()

    def _idf(self, term: str) -> float:
        """Return log((N - df + 0.5) / (df + 0.5) + 1) for *term*.

        The ``+ 1`` inside the logarithm keeps the value positive even for
        terms that occur in most documents.
        """
        doc_freq = self.doc_freqs.get(term, 0)
        return math.log(
            (self.doc_count - doc_freq + 0.5) / (doc_freq + 0.5) + 1
        )

    def _score_document(
        self,
        query_terms: List[str],
        doc_idx: int
    ) -> float:
        """Return the BM25 score of document *doc_idx* for *query_terms*.

        Query terms that do not occur in the document contribute nothing;
        a term repeated in the query is counted once per repetition.
        """
        term_freqs = self.term_freqs[doc_idx]
        if not term_freqs:
            # Empty document: nothing can match. Returning early also keeps
            # the length ratio below from dividing by a zero average length
            # when every document in the corpus is empty.
            return 0.0

        doc_len = self.doc_lengths[doc_idx]
        # Length normalization is the same for every term of this document.
        length_norm = self.k1 * (
            1 - self.b + self.b * (doc_len / self.avg_doc_length)
        )

        score = 0.0
        for term in query_terms:
            if term not in term_freqs:
                continue
            tf = term_freqs[term]
            # BM25 formula
            score += self._idf(term) * (tf * (self.k1 + 1) / (tf + length_norm))
        return score

    def search(
        self,
        query: str,
        top_k: int = 10
    ) -> List[Tuple[int, float]]:
        """Return up to *top_k* ``(doc_index, score)`` pairs, best first.

        Only documents with a strictly positive score are returned, so the
        result may be shorter than *top_k* (and is empty for an empty corpus
        or a non-positive *top_k*).
        """
        if top_k <= 0:
            return []

        query_terms = self._tokenize(query)
        scores = []
        for idx in range(self.doc_count):
            score = self._score_document(query_terms, idx)
            if score > 0:
                scores.append((idx, score))

        # Sort by score descending; the sort is stable, so ties keep
        # corpus order.
        scores.sort(key=lambda x: x[1], reverse=True)
        return scores[:top_k]
# Usage
# Small sample corpus; the later usage examples index the same list.
documents = [
    "Machine learning is a subset of artificial intelligence",
    "Deep learning uses neural networks with many layers",
    "Natural language processing enables computers to understand text",
    "Computer vision allows machines to interpret images",
]

# Build the keyword index once, then query it.
bm25 = BM25(documents)
results = bm25.search("neural network deep learning")

# Each hit is (document index, BM25 score), best match first.
for idx, score in results:
    print(f"Score: {score:.3f} - {documents[idx][:50]}...")
Semantic Search with Embeddings
from openai import OpenAI
import numpy as np
from typing import List, Tuple
# Shared module-level client; SemanticSearch._embed sends its embedding requests through it.
client = OpenAI()
class SemanticSearch:
    """Semantic search using embeddings.

    Documents are embedded through the module-level ``client`` and kept in
    memory; queries are ranked by cosine similarity against every stored
    embedding.

    Args:
        model: Name of the embedding model passed to the embeddings API.
    """

    def __init__(self, model: str = "text-embedding-3-small"):
        self.model = model
        self.documents = []
        # Becomes a 2-D numpy array (one row per document) after the first
        # successful add_documents() call.
        self.embeddings = []

    def _embed(self, texts: List[str]) -> np.ndarray:
        """Return one embedding row per entry of *texts*."""
        response = client.embeddings.create(
            model=self.model,
            input=texts
        )
        return np.array([e.embedding for e in response.data])

    def add_documents(self, documents: List[str]):
        """Embed *documents* and append them to the index.

        An empty list is a no-op (no API request is made).
        """
        if not documents:
            return

        # Embed first: if the request raises, neither list has been touched,
        # so self.documents and self.embeddings stay aligned row-for-row.
        embeddings = self._embed(documents)
        if len(self.embeddings) == 0:
            self.embeddings = embeddings
        else:
            self.embeddings = np.vstack([self.embeddings, embeddings])
        self.documents.extend(documents)

    def search(
        self,
        query: str,
        top_k: int = 10
    ) -> List[Tuple[int, float, str]]:
        """Return up to *top_k* ``(index, similarity, document)`` tuples.

        Results are ordered by cosine similarity, highest first. An empty
        index or a non-positive *top_k* yields an empty list without
        embedding the query.
        """
        if len(self.embeddings) == 0 or top_k <= 0:
            return []

        query_embedding = self._embed([query])[0]

        # Cosine similarity of the query against every stored row
        similarities = np.dot(self.embeddings, query_embedding) / (
            np.linalg.norm(self.embeddings, axis=1) *
            np.linalg.norm(query_embedding)
        )

        # Get top-k indices (argsort is ascending, so reverse it)
        top_indices = np.argsort(similarities)[::-1][:top_k]
        return [
            (int(idx), float(similarities[idx]), self.documents[idx])
            for idx in top_indices
        ]
# Usage
# Index the sample `documents` list from the BM25 example (this embeds every
# document through the API), then run a natural-language query. Each result
# is an (index, similarity, document) tuple.
semantic = SemanticSearch()
semantic.add_documents(documents)
results = semantic.search("How do machines learn?")
Hybrid Search
Combine BM25 and semantic search for best results:
from dataclasses import dataclass
from typing import List, Dict, Optional
@dataclass
class SearchResult:
    """One document returned by HybridSearch together with its scores."""

    # Index of the document in the list HybridSearch was built from.
    doc_id: int
    # The document text itself.
    document: str
    # BM25 score after HybridSearch's normalization (0.0 when BM25 had no hit).
    bm25_score: float = 0.0
    # Cosine similarity from the semantic search (0.0 when not retrieved).
    semantic_score: float = 0.0
    # alpha * semantic_score + (1 - alpha) * bm25_score.
    hybrid_score: float = 0.0
class HybridSearch:
    """Combine BM25 and semantic search.

    Both indexes are built over the same document list at construction
    time; building the semantic index embeds every document.

    Args:
        documents: Corpus to index.
        alpha: Weight of the semantic score; BM25 gets ``1 - alpha``.
        embedding_model: Embedding model name handed to SemanticSearch.
    """

    def __init__(
        self,
        documents: List[str],
        alpha: float = 0.5,  # Weight for semantic (1-alpha for BM25)
        embedding_model: str = "text-embedding-3-small"
    ):
        self.documents = documents
        self.alpha = alpha

        # Initialize both search methods
        self.bm25 = BM25(documents)
        self.semantic = SemanticSearch(embedding_model)
        self.semantic.add_documents(documents)

    def _normalize_scores(
        self,
        scores: List[Tuple[int, float]]
    ) -> Dict[int, float]:
        """Min-max normalize ``(doc_id, score)`` pairs to the 0-1 range.

        When every score is identical (including the single-result case)
        there is no range to scale by. Those documents did match, so they
        all map to 1.0 rather than to 0.0, which would make a match
        indistinguishable from a document that was not retrieved at all.
        """
        if not scores:
            return {}

        values = [score for _, score in scores]
        min_val = min(values)
        max_val = max(values)
        if max_val == min_val:
            return {idx: 1.0 for idx, _ in scores}

        range_val = max_val - min_val
        return {
            idx: (score - min_val) / range_val
            for idx, score in scores
        }

    def search(
        self,
        query: str,
        top_k: int = 10
    ) -> "List[SearchResult]":
        """Perform hybrid search and return up to *top_k* results, best first.

        Each retriever is asked for ``top_k * 2`` candidates. BM25 scores
        are min-max normalized; semantic scores are used as returned (cosine
        similarity). A document missing from one retriever's candidates
        scores 0.0 for that retriever.
        """
        # Get BM25 results
        bm25_results = self.bm25.search(query, top_k=top_k * 2)
        bm25_scores = self._normalize_scores(bm25_results)

        # Get semantic results
        semantic_results = self.semantic.search(query, top_k=top_k * 2)
        semantic_scores = {idx: score for idx, score, _ in semantic_results}

        # Combine all unique document IDs
        all_docs = set(bm25_scores) | set(semantic_scores)

        # Calculate hybrid scores
        results = []
        for doc_id in all_docs:
            keyword_score = bm25_scores.get(doc_id, 0.0)
            embedding_score = semantic_scores.get(doc_id, 0.0)
            hybrid = (
                self.alpha * embedding_score
                + (1 - self.alpha) * keyword_score
            )
            results.append(SearchResult(
                doc_id=doc_id,
                document=self.documents[doc_id],
                bm25_score=keyword_score,
                semantic_score=embedding_score,
                hybrid_score=hybrid
            ))

        # Sort by hybrid score
        results.sort(key=lambda x: x.hybrid_score, reverse=True)
        return results[:top_k]
# Usage
# alpha weights the semantic score; BM25 receives the remainder.
hybrid = HybridSearch(documents, alpha=0.7)  # 70% semantic, 30% BM25
results = hybrid.search("machine learning algorithms")

# Show the combined score next to its two components for every hit.
for r in results:
    print(f"Hybrid: {r.hybrid_score:.3f} (BM25: {r.bm25_score:.3f}, "
          f"Semantic: {r.semantic_score:.3f})")
    print(f" {r.document[:60]}...")
Reciprocal Rank Fusion (RRF)
from collections import defaultdict
from typing import List, Dict
def reciprocal_rank_fusion(
    rankings: List[List[int]],
    k: int = 60
) -> List[Tuple[int, float]]:
    """
    Fuse several ranked lists of document IDs with Reciprocal Rank Fusion.

    Every appearance of a document at 1-based position ``rank`` in a
    ranking adds ``1 / (k + rank)`` to that document's total.

    Args:
        rankings: List of ranked document ID lists (best first)
        k: Constant to prevent high ranks from dominating
    Returns:
        List of (doc_id, rrf_score) sorted by score, highest first; ties
        keep the order in which the documents were first seen
    """
    totals = {}
    for ranked_ids in rankings:
        position = 0
        for doc_id in ranked_ids:
            position += 1
            totals[doc_id] = totals.get(doc_id, 0.0) + 1 / (k + position)

    # Stable descending sort by accumulated score
    fused = list(totals.items())
    fused.sort(key=lambda pair: pair[1], reverse=True)
    return fused
class RRFHybridSearch:
    """Hybrid search that merges BM25 and semantic rankings with RRF.

    Only the rank positions from each retriever are used; their raw scores
    never meet, so no score normalization is involved.
    """

    def __init__(self, documents: List[str]):
        self.documents = documents
        self.bm25 = BM25(documents)
        self.semantic = SemanticSearch()
        self.semantic.add_documents(documents)

    def search(
        self,
        query: str,
        top_k: int = 10,
        rrf_k: int = 60
    ) -> List[Tuple[int, float, str]]:
        """Return up to *top_k* ``(doc_id, rrf_score, document)`` tuples.

        Each retriever contributes its ``top_k * 2`` best document IDs;
        *rrf_k* is forwarded to reciprocal_rank_fusion as ``k``.
        """
        candidate_count = top_k * 2

        # Reduce each retriever's hits to a plain list of document IDs
        keyword_ranking = [
            hit[0] for hit in self.bm25.search(query, top_k=candidate_count)
        ]
        embedding_ranking = [
            hit[0] for hit in self.semantic.search(query, top_k=candidate_count)
        ]

        # Apply RRF
        fused = reciprocal_rank_fusion(
            [keyword_ranking, embedding_ranking],
            k=rrf_k
        )
        return [
            (doc_id, score, self.documents[doc_id])
            for doc_id, score in fused[:top_k]
        ]
Reranking
Rerank initial results with a more powerful model:
from dataclasses import dataclass
from typing import List
@dataclass
class RerankResult:
    """A single document together with the score a reranker gave it."""

    # Index assigned by the reranker that produced this result (the rerankers
    # in this file use the position in the list they were given).
    index: int
    # The document text that was scored.
    document: str
    # Relevance of the document to the query; higher is more relevant.
    relevance_score: float
class CrossEncoderReranker:
    """Rerank using cross-encoder models.

    Args:
        model_name: Name of the cross-encoder model to load.
    """

    def __init__(self, model_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"):
        # Imported lazily so the dependency is only needed when this
        # reranker is actually constructed.
        from sentence_transformers import CrossEncoder
        self.model = CrossEncoder(model_name)

    def rerank(
        self,
        query: str,
        documents: List[str],
        top_k: Optional[int] = None
    ) -> "List[RerankResult]":
        """Rerank *documents* by relevance to *query*.

        Args:
            query: The search query.
            documents: Candidate documents to score.
            top_k: Maximum number of results to return; ``None`` returns
                all of them. ``0`` returns an empty list.

        Returns:
            Results sorted by relevance score, highest first. Each
            ``index`` is the document's position in *documents*.
        """
        if not documents:
            # Nothing to score; skip the model call entirely.
            return []

        # Create query-document pairs
        pairs = [[query, doc] for doc in documents]

        # Get relevance scores
        scores = self.model.predict(pairs)

        # Create results with scores
        results = [
            RerankResult(
                index=i,
                document=doc,
                relevance_score=float(score)
            )
            for i, (doc, score) in enumerate(zip(documents, scores))
        ]

        # Sort by relevance
        results.sort(key=lambda x: x.relevance_score, reverse=True)

        # Compare against None explicitly: a bare truthiness test would
        # treat top_k=0 as "no limit" and return every result.
        if top_k is not None:
            results = results[:top_k]
        return results
# Cohere Rerank API
class CohereReranker:
    """Reranker backed by the Cohere rerank endpoint."""

    def __init__(self, api_key: str = None):
        # Imported here so the cohere package is only required when this
        # reranker is constructed.
        import cohere
        self.client = cohere.Client(api_key)

    def rerank(
        self,
        query: str,
        documents: List[str],
        top_k: int = 10
    ) -> List[RerankResult]:
        """Score *documents* against *query* via Cohere.

        *top_k* is sent to the API as ``top_n``. The results are returned in
        the order the API reports them, with ``index`` pointing back into
        *documents*.
        """
        response = self.client.rerank(
            model="rerank-english-v3.0",
            query=query,
            documents=documents,
            top_n=top_k
        )

        reranked = []
        for hit in response.results:
            position = hit.index
            reranked.append(RerankResult(
                index=position,
                document=documents[position],
                relevance_score=hit.relevance_score
            ))
        return reranked
# Two-stage retrieval: retrieve then rerank
class TwoStageRetriever:
    """Retrieve candidates with hybrid search, then rerank them.

    Args:
        documents: Corpus to index.
        retrieval_k: Number of candidates requested from the hybrid search.
        final_k: Number of results kept after reranking.
    """

    def __init__(
        self,
        documents: List[str],
        retrieval_k: int = 100,
        final_k: int = 10
    ):
        self.documents = documents
        self.retrieval_k = retrieval_k
        self.final_k = final_k

        # Fast retrieval
        self.hybrid = HybridSearch(documents)
        # Accurate reranking
        self.reranker = CrossEncoderReranker()

    def search(self, query: str) -> List[RerankResult]:
        """Two-stage search.

        Returns:
            Up to ``final_k`` reranked results. Each result's ``index`` is
            the document's position in the corpus this retriever was built
            from (i.e. an index into ``self.documents``).
        """
        # Stage 1: Fast retrieval of candidates
        candidates = self.hybrid.search(query, top_k=self.retrieval_k)
        if not candidates:
            # Nothing retrieved, so there is nothing to rerank.
            return []
        candidate_docs = [c.document for c in candidates]

        # Stage 2: Accurate reranking
        reranked = self.reranker.rerank(
            query,
            candidate_docs,
            top_k=self.final_k
        )

        # The reranker numbers documents by their position in
        # candidate_docs, a list callers never see. Translate that back to
        # the corpus index so the result can be mapped to self.documents.
        for result in reranked:
            result.index = candidates[result.index].doc_id
        return reranked
Query Expansion
Improve recall by expanding queries:
from openai import OpenAI
# Module-level client. NOTE(review): QueryExpander and ContextualChunker below each create their own OpenAI() in __init__, so this instance looks unused by this snippet — confirm before removing.
client = OpenAI()
class QueryExpander:
    """Expand queries for better recall using a chat model."""

    def __init__(self):
        self.client = OpenAI()

    def expand_with_synonyms(self, query: str) -> List[str]:
        """Generate query variations with synonyms.

        Returns:
            The original *query* followed by each non-blank line of the
            model's reply. If the reply carries no text, only the original
            query is returned.
        """
        response = self.client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {
                    "role": "system",
                    "content": "Generate 3 alternative phrasings of the search query. Return one per line, no numbering."
                },
                {"role": "user", "content": query}
            ],
            temperature=0.7
        )
        # The message content can be None; treat that like an empty reply
        # instead of failing on None.strip().
        content = response.choices[0].message.content or ""
        variations = [line.strip() for line in content.splitlines()]
        return [query] + [v for v in variations if v]

    def expand_with_hypothetical_answer(self, query: str) -> str:
        """HyDE: Generate hypothetical answer for better semantic match.

        Returns:
            The model's answer text, or the original *query* when the reply
            carries no text, so callers always receive a usable string.
        """
        response = self.client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {
                    "role": "system",
                    "content": "Write a short paragraph that would answer the following question. Be factual and specific."
                },
                {"role": "user", "content": query}
            ],
            temperature=0.3
        )
        answer = response.choices[0].message.content
        if not answer or not answer.strip():
            return query
        return answer
class ExpandedSearch:
    """Semantic search that first rewrites the query with a QueryExpander."""

    def __init__(self, documents: List[str]):
        self.documents = documents
        self.semantic = SemanticSearch()
        self.semantic.add_documents(documents)
        self.expander = QueryExpander()

    def search_with_expansion(
        self,
        query: str,
        top_k: int = 10,
        expansion_method: str = "synonyms"  # or "hyde"
    ) -> List[Tuple[int, float, str]]:
        """Run the expanded queries and merge their hits.

        ``"synonyms"`` searches the original query plus its rephrasings,
        ``"hyde"`` searches only the generated hypothetical answer, and any
        other value searches the query unchanged. A document found by
        several queries keeps its highest score.

        Returns:
            Up to *top_k* ``(index, score, document)`` tuples, best first.
        """
        if expansion_method == "hyde":
            queries = [self.expander.expand_with_hypothetical_answer(query)]
        elif expansion_method == "synonyms":
            queries = self.expander.expand_with_synonyms(query)
        else:
            queries = [query]

        # idx -> (best score so far, document text from the first sighting)
        best = {}
        for variant in queries:
            for idx, score, doc in self.semantic.search(variant, top_k=top_k):
                known = best.get(idx)
                if known is None:
                    best[idx] = (score, doc)
                elif score > known[0]:
                    best[idx] = (score, known[1])

        ranked = sorted(
            ((idx, score, doc) for idx, (score, doc) in best.items()),
            key=lambda hit: hit[1],
            reverse=True
        )
        return ranked[:top_k]
Contextual Retrieval
Add context to chunks before embedding:
class ContextualChunker:
    """Add document context to chunks for better retrieval."""

    def __init__(self):
        self.client = OpenAI()

    def add_context(
        self,
        document: str,
        chunk: str
    ) -> str:
        """Generate a contextual prefix for *chunk* and prepend it.

        Only the first 2000 characters of *document* are sent to the model.

        Returns:
            ``"<context>\\n\\n<chunk>"``, or *chunk* unchanged when the
            model's reply carries no text.
        """
        response = self.client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {
                    "role": "system",
                    "content": """Provide a brief context (1-2 sentences) that situates this chunk within the larger document. The context should help understand what the chunk is about without reading the full document."""
                },
                {
                    "role": "user",
                    "content": f"Document:\n{document[:2000]}...\n\nChunk:\n{chunk}"
                }
            ],
            max_tokens=100,
            temperature=0.3
        )
        context = response.choices[0].message.content
        if not context:
            # Avoid prefixing the chunk with the literal text "None".
            return chunk
        return f"{context}\n\n{chunk}"

    @staticmethod
    def _split_into_chunks(
        document: str,
        chunk_size: int,
        overlap: int
    ) -> List[str]:
        """Cut *document* into character windows of *chunk_size* that overlap by *overlap*."""
        if chunk_size <= 0 or not 0 <= overlap < chunk_size:
            # With overlap >= chunk_size the window would never advance.
            raise ValueError(
                "chunk_size must be positive and overlap must satisfy "
                "0 <= overlap < chunk_size"
            )

        chunks = []
        start = 0
        while start < len(document):
            end = start + chunk_size
            chunks.append(document[start:end])
            if end >= len(document):
                # This window already reached the end; stepping back by
                # `overlap` would only emit text the last chunk contains.
                break
            start = end - overlap
        return chunks

    def process_document(
        self,
        document: str,
        chunk_size: int = 500,
        overlap: int = 50
    ) -> List[str]:
        """Chunk document and add context to each chunk.

        Args:
            document: Full text to split.
            chunk_size: Chunk length in characters.
            overlap: Characters shared between consecutive chunks.

        Returns:
            One contextualized string per chunk, in document order (empty
            for an empty document).

        Raises:
            ValueError: If *chunk_size* is not positive or *overlap* is not
                in ``[0, chunk_size)``.
        """
        chunks = self._split_into_chunks(document, chunk_size, overlap)
        # Add context to each chunk
        return [self.add_context(document, chunk) for chunk in chunks]
Search Pipeline
from dataclasses import dataclass
from typing import List, Optional
@dataclass
class SearchConfig:
    """Settings read by SearchPipeline."""

    # Candidates requested from the hybrid search per query variation.
    retrieval_k: int = 100
    # Number of results the pipeline finally returns.
    rerank_k: int = 10
    # Whether the pipeline creates a QueryExpander.
    use_expansion: bool = True
    # Whether the pipeline creates a CrossEncoderReranker.
    use_reranking: bool = True
    # Passed to HybridSearch as `alpha` (weight of the semantic score).
    hybrid_alpha: float = 0.7
class SearchPipeline:
    """Complete search pipeline: expand, retrieve, then optionally rerank.

    Args:
        documents: Corpus to index.
        config: Pipeline settings; defaults to ``SearchConfig()``.
    """

    def __init__(
        self,
        documents: List[str],
        config: "Optional[SearchConfig]" = None
    ):
        self.documents = documents
        self.config = config or SearchConfig()

        # Initialize components
        self.hybrid = HybridSearch(documents, alpha=self.config.hybrid_alpha)
        self.expander = QueryExpander() if self.config.use_expansion else None
        self.reranker = CrossEncoderReranker() if self.config.use_reranking else None

    def search(self, query: str) -> List[dict]:
        """Full search pipeline.

        Returns:
            Up to ``config.rerank_k`` dicts with the keys ``"rank"``
            (1-based), ``"document"`` and ``"score"``. The score is the
            reranker's relevance score when reranking is enabled, otherwise
            the hybrid score.
        """
        # Query expansion
        if self.expander:
            queries = self.expander.expand_with_synonyms(query)
        else:
            queries = [query]

        # Retrieve candidates. A document can be returned for several query
        # variations; keep the hit with the highest hybrid score so the
        # outcome does not depend on which variation happened to run first.
        best_by_doc = {}
        for q in queries:
            for hit in self.hybrid.search(q, top_k=self.config.retrieval_k):
                known = best_by_doc.get(hit.doc_id)
                if known is None or hit.hybrid_score > known.hybrid_score:
                    best_by_doc[hit.doc_id] = hit

        # Order the merged pool best-first; without this the fallback below
        # would slice candidates in arrival order rather than by score.
        candidates = sorted(
            best_by_doc.values(),
            key=lambda c: c.hybrid_score,
            reverse=True
        )
        candidate_docs = [c.document for c in candidates]

        # Reranking (always against the original, unexpanded query)
        if self.reranker and candidate_docs:
            reranked = self.reranker.rerank(
                query,
                candidate_docs,
                top_k=self.config.rerank_k
            )
            return [
                {
                    "rank": i + 1,
                    "document": r.document,
                    "score": r.relevance_score
                }
                for i, r in enumerate(reranked)
            ]

        # Return hybrid results if no reranking
        return [
            {
                "rank": i + 1,
                "document": c.document,
                "score": c.hybrid_score
            }
            for i, c in enumerate(candidates[:self.config.rerank_k])
        ]
Performance Comparison
| Method | Recall@10 | Latency | Cost |
|---|---|---|---|
| BM25 | 0.65 | <10ms | None |
| Semantic | 0.75 | 50ms | Embeddings |
| Hybrid | 0.82 | 60ms | Embeddings |
| Hybrid + Rerank | 0.90 | 150ms | Embeddings + Rerank |
What is Next
Context Window Management
Learn to manage context windows effectively with compression and optimization