Essential Knowledge: Every major system needs search. Understanding how search works is critical for designing social platforms, e-commerce, and content systems.
Search System Architecture
┌─────────────────────────────────────────────────────────────────┐
│ SEARCH SYSTEM OVERVIEW │
├─────────────────────────────────────────────────────────────────┤
│ │
│ User Query │
│ │ │
│ ▼ │
│ ┌──────────────┐ │
│ │Query Parser │ Tokenize, normalize │
│ └──────┬───────┘ │
│ │ │
│ ┌──────▼───────┐ │
│ │Query Expander│ Synonyms, spell check │
│ └──────┬───────┘ │
│ │ │
│ ┌────────────────┼────────────────┐ │
│ ▼ ▼ ▼ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Shard 1 │ │ Shard 2 │ │ Shard 3 │ │
│ │ (A-F) │ │ (G-N) │ │ (O-Z) │ │
│ └────┬────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │
│ └───────────────┼───────────────┘ │
│ ▼ │
│ ┌──────────────┐ │
│ │ Result Merger│ Score, sort, dedupe │
│ └──────┬───────┘ │
│ │ │
│ ┌──────▼───────┐ │
│ │ Ranker │ ML ranking, personalization│
│ └──────┬───────┘ │
│ │ │
│ ▼ │
│ Search Results │
│ │
└─────────────────────────────────────────────────────────────────┘
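The fan-out/merge path in the diagram is worth being able to sketch. Below is a minimal scatter-gather merge in Python, assuming each shard has already returned its local top-k as (doc_id, score) pairs; merge_shard_results and its inputs are illustrative names, not part of any library.

# Illustrative scatter-gather merge, mirroring the "Result Merger" stage above:
# each shard returns its own top-k hits, and the coordinator merges them into
# a global top-k, deduplicating documents along the way.
import heapq
from typing import Dict, List, Tuple

def merge_shard_results(shard_results: List[List[Tuple[str, float]]],
                        top_k: int = 10) -> List[Tuple[str, float]]:
    """Merge per-shard (doc_id, score) lists into a global, deduplicated top-k."""
    best: Dict[str, float] = {}
    for hits in shard_results:
        for doc_id, score in hits:
            # Dedupe: keep the highest score seen for each document
            if doc_id not in best or score > best[doc_id]:
                best[doc_id] = score
    # nlargest keeps only the top_k entries by score
    return heapq.nlargest(top_k, best.items(), key=lambda kv: kv[1])

# Example: two shards each return their local top hits
print(merge_shard_results([[("d1", 3.2), ("d2", 1.1)], [("d3", 2.7), ("d1", 3.2)]], top_k=2))
# [('d1', 3.2), ('d3', 2.7)]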
Inverted Index
The core data structure powering search.
┌─────────────────────────────────────────────────────────────────┐
│ INVERTED INDEX │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Forward Index (Document → Terms): │
│ ┌────────┬────────────────────────────────────────────────┐ │
│ │ Doc 1 │ "The quick brown fox jumps over the lazy dog" │ │
│ │ Doc 2 │ "Quick brown foxes are rare" │ │
│ │ Doc 3 │ "The dog is lazy but loyal" │ │
│ └────────┴────────────────────────────────────────────────┘ │
│ │
│ Inverted Index (Term → Documents): │
│ ┌──────────┬─────────────────────────────────────────────┐ │
│ │ Term │ Postings List │ │
│ ├──────────┼─────────────────────────────────────────────┤ │
│ │ quick │ [1: pos=2, 2: pos=1] │ │
│ │ brown │ [1: pos=3, 2: pos=2] │ │
│ │ fox │ [1: pos=4] │ │
│ │ foxes │ [2: pos=3] │ │
│ │ lazy │ [1: pos=8, 3: pos=4] │ │
│ │ dog │ [1: pos=9, 3: pos=2] │ │
│ │ rare │ [2: pos=5] │ │
│ │ loyal │ [3: pos=6] │ │
│ └──────────┴─────────────────────────────────────────────┘ │
│ │
│ Query "lazy dog": │
│ 1. Look up "lazy" → [1, 3] │
│ 2. Look up "dog" → [1, 3] │
│ 3. Intersect → [1, 3] (both have "lazy" AND "dog") │
│ │
└─────────────────────────────────────────────────────────────────┘
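The intersection step above is usually implemented as a linear merge rather than a set operation: postings lists are kept sorted by document ID, so two lists can be walked with two pointers. A minimal sketch under that assumption:

# Two-pointer intersection of sorted doc-ID lists: O(len(a) + len(b))
from typing import List

def intersect_postings(a: List[int], b: List[int]) -> List[int]:
    """Return doc IDs present in both sorted postings lists."""
    i, j, out = 0, 0, []
    while i < len(a) and j < len(b):
        if a[i] == b[j]:
            out.append(a[i])
            i += 1
            j += 1
        elif a[i] < b[j]:
            i += 1
        else:
            j += 1
    return out

print(intersect_postings([1, 3], [1, 3]))  # [1, 3] -> docs matching "lazy" AND "dog"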
Building an Inverted Index
from collections import defaultdict
from dataclasses import dataclass, field
from typing import List, Dict, Set, Tuple, Optional
import re
import math

@dataclass
class Posting:
    doc_id: str
    term_frequency: int
    positions: List[int]

@dataclass
class Document:
    doc_id: str
    content: str
    metadata: Dict = field(default_factory=dict)

class Tokenizer:
    """Text tokenization and normalization"""

    def __init__(self, stop_words: Set[str] = None):
        self.stop_words = stop_words or {
            'the', 'a', 'an', 'is', 'are', 'was', 'were',
            'in', 'on', 'at', 'to', 'for', 'of', 'and', 'or'
        }

    def tokenize(self, text: str) -> List[Tuple[str, int]]:
        """Return list of (token, position) tuples"""
        # Lowercase and extract words
        text = text.lower()
        words = re.findall(r'\b\w+\b', text)
        tokens = []
        for i, word in enumerate(words):
            # Skip stop words
            if word in self.stop_words:
                continue
            # Apply stemming (simplified)
            word = self._stem(word)
            tokens.append((word, i))
        return tokens

    def _stem(self, word: str) -> str:
        """Simple suffix stripping (use Porter/Snowball in production)"""
        suffixes = ['ing', 'ed', 'es', 's', 'ly']
        for suffix in suffixes:
            if word.endswith(suffix) and len(word) > len(suffix) + 2:
                return word[:-len(suffix)]
        return word

class InvertedIndex:
    """
    In-memory inverted index with BM25 scoring.
    """

    def __init__(self, tokenizer: Tokenizer = None):
        self.tokenizer = tokenizer or Tokenizer()
        # term -> {doc_id -> Posting}
        self.index: Dict[str, Dict[str, Posting]] = defaultdict(dict)
        # Document metadata
        self.documents: Dict[str, Document] = {}
        self.doc_lengths: Dict[str, int] = {}
        self.avg_doc_length: float = 0
        # Index statistics
        self.total_docs: int = 0

    def add_document(self, doc: Document):
        """Add a document to the index"""
        self.documents[doc.doc_id] = doc
        self.total_docs += 1
        # Tokenize content
        tokens = self.tokenizer.tokenize(doc.content)
        self.doc_lengths[doc.doc_id] = len(tokens)
        # Update average document length
        self._update_avg_length()
        # Build term → positions mapping
        term_positions: Dict[str, List[int]] = defaultdict(list)
        for token, position in tokens:
            term_positions[token].append(position)
        # Add to inverted index
        for term, positions in term_positions.items():
            self.index[term][doc.doc_id] = Posting(
                doc_id=doc.doc_id,
                term_frequency=len(positions),
                positions=positions
            )

    def _update_avg_length(self):
        if self.doc_lengths:
            self.avg_doc_length = sum(self.doc_lengths.values()) / len(self.doc_lengths)

    def search(self, query: str, top_k: int = 10) -> List[Tuple[str, float]]:
        """Search documents using BM25 scoring"""
        query_tokens = self.tokenizer.tokenize(query)
        query_terms = [token for token, _ in query_tokens]
        if not query_terms:
            return []
        # Score all matching documents
        doc_scores: Dict[str, float] = defaultdict(float)
        for term in query_terms:
            if term not in self.index:
                continue
            # IDF calculation
            df = len(self.index[term])  # Document frequency
            idf = math.log((self.total_docs - df + 0.5) / (df + 0.5) + 1)
            for doc_id, posting in self.index[term].items():
                # BM25 score
                tf = posting.term_frequency
                doc_length = self.doc_lengths[doc_id]
                # BM25 parameters
                k1 = 1.5
                b = 0.75
                numerator = tf * (k1 + 1)
                denominator = tf + k1 * (1 - b + b * (doc_length / self.avg_doc_length))
                doc_scores[doc_id] += idf * (numerator / denominator)
        # Sort by score and return top K
        ranked = sorted(doc_scores.items(), key=lambda x: -x[1])
        return ranked[:top_k]

    def phrase_search(self, phrase: str, top_k: int = 10) -> List[Tuple[str, float]]:
        """Search for an exact phrase using position information"""
        tokens = self.tokenizer.tokenize(phrase)
        terms = [token for token, _ in tokens]
        if len(terms) < 2:
            return self.search(phrase, top_k)
        # Get documents containing all terms
        candidate_docs = None
        for term in terms:
            if term not in self.index:
                return []  # Term not found
            term_docs = set(self.index[term].keys())
            if candidate_docs is None:
                candidate_docs = term_docs
            else:
                candidate_docs &= term_docs
        if not candidate_docs:
            return []
        # Check for phrase matches using positions
        results = []
        for doc_id in candidate_docs:
            if self._check_phrase_match(doc_id, terms):
                results.append((doc_id, 1.0))  # Simplified scoring
        return results[:top_k]

    def _check_phrase_match(self, doc_id: str, terms: List[str]) -> bool:
        """Check if terms appear consecutively in the document"""
        first_positions = self.index[terms[0]][doc_id].positions
        for start_pos in first_positions:
            match = True
            for i, term in enumerate(terms[1:], 1):
                expected_pos = start_pos + i
                term_positions = self.index[term][doc_id].positions
                if expected_pos not in term_positions:
                    match = False
                    break
            if match:
                return True
        return False

    def suggest(self, prefix: str, limit: int = 10) -> List[str]:
        """Autocomplete suggestions based on prefix"""
        prefix = prefix.lower()
        matches = [
            term for term in self.index.keys()
            if term.startswith(prefix)
        ]
        # Sort by document frequency (popularity)
        matches.sort(key=lambda t: -len(self.index[t]))
        return matches[:limit]

# Usage example
index = InvertedIndex()

# Add documents
docs = [
    Document("1", "The quick brown fox jumps over the lazy dog"),
    Document("2", "Quick brown foxes are rare in the wild"),
    Document("3", "The dog is lazy but very loyal"),
    Document("4", "A quick guide to brown bread making"),
]
for doc in docs:
    index.add_document(doc)

# Search
results = index.search("quick brown fox")
print(results)  # [(doc_id, score), ...]

# Phrase search
results = index.phrase_search("lazy dog")
print(results)

# Autocomplete
suggestions = index.suggest("qui")
print(suggestions)  # ['quick']
Elasticsearch Architecture
┌─────────────────────────────────────────────────────────────────┐
│ ELASTICSEARCH CLUSTER │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Cluster │ │
│ │ │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ Node 1 │ │ Node 2 │ │ Node 3 │ │ │
│ │ │ (Master) │ │ (Data) │ │ (Data) │ │ │
│ │ │ │ │ │ │ │ │ │
│ │ │ ┌──────────┐ │ │ ┌──────────┐ │ │ ┌──────────┐ │ │ │
│ │ │ │ Shard P1 │ │ │ │ Shard P2 │ │ │ │ Shard P3 │ │ │ │
│ │ │ │ (primary)│ │ │ │ (primary)│ │ │ │ (primary)│ │ │ │
│ │ │ └──────────┘ │ │ └──────────┘ │ │ └──────────┘ │ │ │
│ │ │ ┌──────────┐ │ │ ┌──────────┐ │ │ ┌──────────┐ │ │ │
│ │ │ │ Shard R2 │ │ │ │ Shard R3 │ │ │ │ Shard R1 │ │ │ │
│ │ │ │ (replica)│ │ │ │ (replica)│ │ │ │ (replica)│ │ │ │
│ │ │ └──────────┘ │ │ └──────────┘ │ │ └──────────┘ │ │ │
│ │ └──────────────┘ └──────────────┘ └──────────────┘ │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ Index "products" → 3 shards, 1 replica each │
│ Shard = Independent Lucene index │
│ Each shard can have replicas on other nodes │
│ │
└─────────────────────────────────────────────────────────────────┘
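Documents land on a primary shard by routing. Roughly, Elasticsearch computes shard = hash(_routing) % number_of_primary_shards, where _routing defaults to the document ID and the real hash is murmur3; the sketch below uses hashlib.md5 purely as a deterministic stand-in. This is also why the primary shard count is fixed at index creation: changing it would reroute existing documents (hence reindexing, or the split/shrink APIs).

# Illustrative document-to-shard routing (not Elasticsearch's actual hash)
import hashlib

def route_to_shard(doc_id: str, num_primary_shards: int = 3) -> int:
    digest = hashlib.md5(doc_id.encode("utf-8")).hexdigest()
    return int(digest, 16) % num_primary_shards

for doc_id in ["prod-1", "prod-2", "prod-3"]:
    print(doc_id, "->", route_to_shard(doc_id))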
Elasticsearch Integration
from elasticsearch import Elasticsearch, helpers
from typing import List, Dict, Optional
import json

class SearchService:
    """
    Production-ready Elasticsearch wrapper.
    """

    def __init__(self, hosts: List[str], index_name: str):
        self.es = Elasticsearch(hosts)
        self.index_name = index_name

    def create_index(self, settings: Dict = None, mappings: Dict = None):
        """Create index with custom settings and mappings"""
        default_settings = {
            "number_of_shards": 3,
            "number_of_replicas": 1,
            "analysis": {
                "analyzer": {
                    "custom_analyzer": {
                        "type": "custom",
                        "tokenizer": "standard",
                        "filter": [
                            "lowercase",
                            "stop",
                            "snowball"
                        ]
                    }
                }
            }
        }
        default_mappings = {
            "properties": {
                "title": {
                    "type": "text",
                    "analyzer": "custom_analyzer",
                    "fields": {
                        "keyword": {"type": "keyword"}  # For exact match
                    }
                },
                "description": {
                    "type": "text",
                    "analyzer": "custom_analyzer"
                },
                "category": {
                    "type": "keyword"
                },
                "price": {
                    "type": "float"
                },
                "created_at": {
                    "type": "date"
                },
                "tags": {
                    "type": "keyword"
                },
                "suggest": {
                    "type": "completion"  # For autocomplete
                }
            }
        }
        body = {
            "settings": settings or default_settings,
            "mappings": mappings or default_mappings
        }
        if self.es.indices.exists(index=self.index_name):
            self.es.indices.delete(index=self.index_name)
        self.es.indices.create(index=self.index_name, body=body)

    def index_document(self, doc_id: str, document: Dict):
        """Index a single document"""
        # Add suggest field for autocomplete
        if 'title' in document:
            document['suggest'] = {
                "input": document['title'].split(),
                "weight": document.get('popularity', 1)
            }
        self.es.index(
            index=self.index_name,
            id=doc_id,
            body=document
        )

    def bulk_index(self, documents: List[Dict]):
        """Bulk index documents for better performance"""
        actions = [
            {
                "_index": self.index_name,
                "_id": doc["id"],
                "_source": doc
            }
            for doc in documents
        ]
        helpers.bulk(self.es, actions)

    def search(
        self,
        query: str,
        filters: Dict = None,
        sort: List[Dict] = None,
        page: int = 1,
        size: int = 10
    ) -> Dict:
        """
        Multi-field search with filtering, sorting, and pagination.
        """
        # Build the query
        must_clauses = []
        filter_clauses = []
        # Full-text search
        if query:
            must_clauses.append({
                "multi_match": {
                    "query": query,
                    "fields": [
                        "title^3",  # Boost title matches
                        "description",
                        "tags^2"
                    ],
                    "type": "best_fields",
                    "fuzziness": "AUTO"  # Handle typos
                }
            })
        # Apply filters
        if filters:
            if 'category' in filters:
                filter_clauses.append({
                    "term": {"category": filters['category']}
                })
            if 'price_min' in filters or 'price_max' in filters:
                price_range = {}
                if 'price_min' in filters:
                    price_range['gte'] = filters['price_min']
                if 'price_max' in filters:
                    price_range['lte'] = filters['price_max']
                filter_clauses.append({
                    "range": {"price": price_range}
                })
            if 'tags' in filters:
                filter_clauses.append({
                    "terms": {"tags": filters['tags']}
                })
        # Build final query
        body = {
            "query": {
                "bool": {
                    "must": must_clauses or [{"match_all": {}}],
                    "filter": filter_clauses
                }
            },
            "from": (page - 1) * size,
            "size": size,
            "highlight": {
                "fields": {
                    "title": {},
                    "description": {"fragment_size": 150}
                }
            },
            "aggs": {
                "categories": {
                    "terms": {"field": "category", "size": 20}
                },
                "price_ranges": {
                    "range": {
                        "field": "price",
                        "ranges": [
                            {"to": 50},
                            {"from": 50, "to": 100},
                            {"from": 100, "to": 500},
                            {"from": 500}
                        ]
                    }
                },
                "avg_price": {
                    "avg": {"field": "price"}
                }
            }
        }
        # Apply sorting
        if sort:
            body["sort"] = sort
        else:
            # Default: relevance score + recency
            body["sort"] = [
                {"_score": {"order": "desc"}},
                {"created_at": {"order": "desc"}}
            ]
        result = self.es.search(index=self.index_name, body=body)
        return {
            "total": result['hits']['total']['value'],
            "hits": [
                {
                    "id": hit['_id'],
                    "score": hit['_score'],
                    "source": hit['_source'],
                    "highlights": hit.get('highlight', {})
                }
                for hit in result['hits']['hits']
            ],
            "aggregations": result.get('aggregations', {})
        }

    def autocomplete(self, prefix: str, size: int = 10) -> List[str]:
        """Autocomplete suggestions"""
        body = {
            "suggest": {
                "product-suggest": {
                    "prefix": prefix,
                    "completion": {
                        "field": "suggest",
                        "size": size,
                        "skip_duplicates": True,
                        "fuzzy": {
                            "fuzziness": 1
                        }
                    }
                }
            }
        }
        result = self.es.search(index=self.index_name, body=body)
        suggestions = []
        for option in result['suggest']['product-suggest'][0]['options']:
            suggestions.append(option['text'])
        return suggestions

    def search_as_you_type(self, query: str, size: int = 10) -> List[Dict]:
        """Real-time search as the user types.

        Note: this requires "title" to be mapped as the search_as_you_type
        field type, which generates the ._2gram and ._3gram subfields below.
        """
        body = {
            "query": {
                "multi_match": {
                    "query": query,
                    "type": "bool_prefix",
                    "fields": [
                        "title",
                        "title._2gram",
                        "title._3gram"
                    ]
                }
            },
            "size": size
        }
        result = self.es.search(index=self.index_name, body=body)
        return [hit['_source'] for hit in result['hits']['hits']]
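Note that search_as_you_type queries title._2gram and title._3gram, which only exist if the title field is mapped with Elasticsearch's search_as_you_type field type rather than the plain text mapping used in create_index. A minimal sketch of that alternative mapping:

# Alternative mapping for as-you-type search: the search_as_you_type field
# type auto-generates shingle subfields (title._2gram, title._3gram) that the
# bool_prefix multi_match query can target.
as_you_type_mappings = {
    "properties": {
        "title": {"type": "search_as_you_type"}
    }
}
# e.g. service.create_index(mappings=as_you_type_mappings)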
Ranking Algorithms
TF-IDF
TF-IDF = Term Frequency × Inverse Document Frequency
TF = (occurrences of term in document) / (total terms in document)
IDF = log(total documents / documents containing term)
Example:
Document has 100 words, "machine" appears 3 times
1000 documents total, "machine" in 50 documents
TF = 3/100 = 0.03
IDF = log(1000/50) = log(20) ≈ 1.30 (base-10 log here; natural log is also common)
TF-IDF = 0.03 × 1.30 ≈ 0.039
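The same arithmetic in a few lines of Python (base-10 log to match the numbers above):

import math

tf = 3 / 100                  # term frequency: 3 occurrences in a 100-word document
idf = math.log10(1000 / 50)   # inverse document frequency: term appears in 50 of 1000 docs
print(round(tf, 3), round(idf, 2), round(tf * idf, 3))  # 0.03 1.3 0.039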
BM25
BM25 is an improved version of TF-IDF and the default relevance function in Elasticsearch/Lucene.
BM25(D, Q) = Σ IDF(qi) × (f(qi, D) × (k1 + 1)) / (f(qi, D) + k1 × (1 - b + b × |D|/avgdl))
Where:
- f(qi, D) = term frequency in document
- |D| = document length
- avgdl = average document length
- k1 = term saturation parameter (1.2-2.0)
- b = length normalization (0.75)
Key insight: Term frequency has diminishing returns (saturation)
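A small scorer makes the saturation point concrete. This is a minimal single-term BM25 function written from the formula above (not a drop-in for any library); note how going from 1 to 2 occurrences adds far more than going from 10 to 20:

import math

def bm25_term_score(tf: int, df: int, total_docs: int,
                    doc_len: int, avg_doc_len: float,
                    k1: float = 1.2, b: float = 0.75) -> float:
    """Score contribution of one query term for one document."""
    idf = math.log((total_docs - df + 0.5) / (df + 0.5) + 1)
    norm = k1 * (1 - b + b * doc_len / avg_doc_len)
    return idf * tf * (k1 + 1) / (tf + norm)

# Term frequency saturates: each extra occurrence adds less
for tf in [1, 2, 5, 10, 20]:
    print(tf, round(bm25_term_score(tf, df=50, total_docs=1000, doc_len=100, avg_doc_len=100), 3))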
Learning to Rank (L2R)
┌─────────────────────────────────────────────────────────────────┐
│ LEARNING TO RANK │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 1. Extract Features: │
│ • BM25 score │
│ • Document popularity │
│ • Click-through rate │
│ • Recency │
│ • User preferences │
│ │
│ 2. Train ML Model: │
│ • Pointwise: Predict relevance score │
│ • Pairwise: Predict which doc is better (RankNet) │
│ • Listwise: Optimize ranking metric (LambdaMART) │
│ │
│ 3. Apply at Query Time: │
│ Query → Retrieve candidates → Re-rank with ML → Results │
│ │
└─────────────────────────────────────────────────────────────────┘
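A minimal sketch of step 3, assuming candidates are already retrieved and feature-extracted; the feature names and weights here are invented for illustration, whereas in practice the weights (or trees) come from a trained model such as LambdaMART:

from typing import Dict, List, Tuple

# Hypothetical learned weights for a simple linear ranking model
WEIGHTS = {"bm25": 0.6, "popularity": 0.2, "ctr": 0.15, "recency": 0.05}

def rerank(candidates: List[Tuple[str, Dict[str, float]]], top_k: int = 10) -> List[str]:
    """Re-rank retrieved candidates by a weighted sum of their features."""
    scored = [
        (doc_id, sum(WEIGHTS[name] * value for name, value in features.items()))
        for doc_id, features in candidates
    ]
    scored.sort(key=lambda x: -x[1])
    return [doc_id for doc_id, _ in scored[:top_k]]

candidates = [
    ("doc-1", {"bm25": 0.9, "popularity": 0.2, "ctr": 0.1, "recency": 0.5}),
    ("doc-2", {"bm25": 0.7, "popularity": 0.9, "ctr": 0.8, "recency": 0.9}),
]
print(rerank(candidates))  # ['doc-2', 'doc-1']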
Search Query Processing
import re
from typing import Dict

class QueryProcessor:
    """
    Process and enhance search queries.
    """

    def __init__(self):
        self.synonyms = {
            "phone": ["mobile", "smartphone", "cell"],
            "laptop": ["notebook", "computer"],
            "cheap": ["affordable", "budget", "inexpensive"]
        }
        self.spell_corrections = {}  # Load from a dictionary of known misspellings

    def process(self, query: str) -> Dict:
        """Full query processing pipeline"""
        # 1. Clean and normalize
        query = query.lower().strip()
        # 2. Tokenize
        tokens = query.split()
        # 3. Spell correction
        corrected_tokens = [self._spell_correct(t) for t in tokens]
        # 4. Synonym expansion
        expanded_tokens = []
        for token in corrected_tokens:
            expanded_tokens.append(token)
            if token in self.synonyms:
                expanded_tokens.extend(self.synonyms[token])
        # 5. Detect intent
        intent = self._detect_intent(query)
        # 6. Extract filters
        filters = self._extract_filters(query)
        return {
            "original": query,
            "processed": " ".join(corrected_tokens),
            "expanded": expanded_tokens,
            "intent": intent,
            "filters": filters
        }

    def _spell_correct(self, word: str) -> str:
        """Simple spell correction using a lookup table"""
        # In production, use libraries like pyspellchecker
        return self.spell_corrections.get(word, word)

    def _detect_intent(self, query: str) -> str:
        """Detect search intent"""
        navigational_patterns = ['go to', 'open', 'find page']
        transactional_patterns = ['buy', 'purchase', 'order', 'price']
        for pattern in navigational_patterns:
            if pattern in query:
                return "navigational"
        for pattern in transactional_patterns:
            if pattern in query:
                return "transactional"
        return "informational"

    def _extract_filters(self, query: str) -> Dict:
        """Extract structured filters from the query"""
        filters = {}
        # Price filter: "under $100", "less than 50"
        price_match = re.search(r'under \$?(\d+)|less than \$?(\d+)', query)
        if price_match:
            price = price_match.group(1) or price_match.group(2)
            filters['price_max'] = float(price)
        # Color filter
        colors = ['red', 'blue', 'green', 'black', 'white']
        for color in colors:
            if color in query:
                filters['color'] = color
                break
        # Size filter
        sizes = ['small', 'medium', 'large', 'xl', 'xxl']
        for size in sizes:
            if size in query:
                filters['size'] = size
                break
        return filters
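The lookup-table _spell_correct above only fixes misspellings listed in advance. As a dependency-free sketch of the edit-distance idea, the standard library's difflib can snap a word to the closest vocabulary term (the vocabulary list here is a made-up example):

import difflib

vocabulary = ["laptop", "smartphone", "headphones", "charger"]

def spell_correct(word: str, vocab: list) -> str:
    """Return the closest vocabulary term, or the word itself if nothing is close."""
    matches = difflib.get_close_matches(word, vocab, n=1, cutoff=0.8)
    return matches[0] if matches else word

print(spell_correct("labtop", vocabulary))   # laptop
print(spell_correct("charjer", vocabulary))  # charger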
Scaling Search
┌─────────────────────────────────────────────────────────────────┐
│ SEARCH SCALING STRATEGIES │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 1. SHARDING │
│ • Document-based: Hash doc ID to shard │
│ • Term-based: Hash term to shard (rare) │
│ • Time-based: Recent data in hot shards │
│ │
│ 2. REPLICATION │
│ • Read replicas for query load │
│ • Async replication acceptable │
│ • Cross-region replicas for latency │
│ │
│ 3. CACHING │
│ • Query result cache (common queries) │
│ • Filter cache (frequently used filters) │
│ • Segment cache (hot Lucene segments) │
│ │
│ 4. TIERED STORAGE │
│ • Hot tier: SSD, recent/popular data │
│ • Warm tier: HDD, older data │
│ • Cold tier: Object storage, archive │
│ │
│ 5. QUERY OPTIMIZATION │
│ • Two-stage retrieval: Fast candidate → Re-rank │
│ • Query routing: Route to relevant shards only │
│ • Timeout handling: Return partial results │
│ │
└─────────────────────────────────────────────────────────────────┘
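Result caching (strategy 3 above) is straightforward to sketch: a TTL'd cache keyed on the normalized query plus filters, sitting in front of whatever backend actually runs the search. The search_fn callable below is a stand-in for that backend, and in practice the cache is usually Redis rather than an in-process dict.

import json
import time
from typing import Callable, Dict, List, Optional, Tuple

class QueryResultCache:
    """In-process TTL cache for search results, keyed on query + filters."""

    def __init__(self, search_fn: Callable[[str, Dict], List[dict]], ttl_seconds: int = 60):
        self.search_fn = search_fn
        self.ttl = ttl_seconds
        self._cache: Dict[str, Tuple[float, List[dict]]] = {}

    def _key(self, query: str, filters: Optional[Dict]) -> str:
        # Normalize so "IPhone " and "iphone" hit the same entry
        return json.dumps([query.strip().lower(), filters or {}], sort_keys=True)

    def search(self, query: str, filters: Optional[Dict] = None) -> List[dict]:
        key = self._key(query, filters)
        entry = self._cache.get(key)
        if entry and time.time() - entry[0] < self.ttl:
            return entry[1]  # Cache hit
        results = self.search_fn(query, filters or {})
        self._cache[key] = (time.time(), results)
        return results

# Usage: wrap any backend search function
cached = QueryResultCache(lambda q, f: [{"id": "doc-1", "query": q}], ttl_seconds=30)
print(cached.search("iphone"))   # Miss: calls the backend
print(cached.search("iPhone "))  # Hit: same normalized key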
Interview Tips
What interviewers expect:
- Know inverted index: “It maps terms to documents for O(1) lookups”
- BM25 basics: “It’s TF-IDF with term saturation and length normalization”
- Scaling awareness: “Shard by document ID, replicate for read scaling”
- Query processing: “Tokenize, normalize, expand synonyms, spell correct”
Common Questions
Q: How would you implement autocomplete?
Use a completion suggester or prefix trie. Store popular queries/titles. Rank by popularity/recency. Cache heavily, as prefixes are repeated.

Q: How does Elasticsearch scale?
Sharding for write scaling (each shard is an independent Lucene index). Replication for read scaling. Queries scatter to shards, then gather and merge results.

Q: How would you handle typos in search?
Fuzzy matching with edit distance (Levenshtein). n-gram tokenization for partial matches. Spell correction using dictionary lookups.
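For the prefix-trie half of the autocomplete answer, a minimal sketch; the phrases and popularity weights are made up, and a real system would refresh them from query logs:

from collections import defaultdict
from typing import Dict, List

class TrieNode:
    def __init__(self):
        self.children: Dict[str, "TrieNode"] = defaultdict(TrieNode)
        self.weight = 0   # > 0 marks the end of a stored phrase
        self.phrase = ""

class AutocompleteTrie:
    def __init__(self):
        self.root = TrieNode()

    def insert(self, phrase: str, weight: int = 1):
        node = self.root
        for ch in phrase.lower():
            node = node.children[ch]
        node.weight = weight
        node.phrase = phrase

    def suggest(self, prefix: str, limit: int = 5) -> List[str]:
        # Walk down to the node for the prefix
        node = self.root
        for ch in prefix.lower():
            if ch not in node.children:
                return []
            node = node.children[ch]
        # Collect all completions under the prefix, ranked by weight
        found: List[TrieNode] = []
        stack = [node]
        while stack:
            cur = stack.pop()
            if cur.weight > 0:
                found.append(cur)
            stack.extend(cur.children.values())
        found.sort(key=lambda n: -n.weight)
        return [n.phrase for n in found[:limit]]

trie = AutocompleteTrie()
for phrase, weight in [("iphone 15", 90), ("iphone case", 60), ("ipad", 40)]:
    trie.insert(phrase, weight)
print(trie.suggest("iph"))  # ['iphone 15', 'iphone case']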