PDF Text Extraction
Basic PDF Extraction with PyMuPDF
PyMuPDF (fitz) provides fast and accurate PDF text extraction:
import re
from dataclasses import dataclass
from pathlib import Path

import fitz  # PyMuPDF


@dataclass
class ExtractedPage:
    """Represents an extracted PDF page."""
    page_number: int
    text: str
    metadata: dict


class PDFExtractor:
    """Extract text and metadata from PDF documents."""

    def extract(self, pdf_path: str | Path) -> list[ExtractedPage]:
        """Extract all pages from a PDF."""
        pdf_path = Path(pdf_path)
        if not pdf_path.exists():
            raise FileNotFoundError(f"PDF not found: {pdf_path}")

        pages = []
        with fitz.open(pdf_path) as doc:
            metadata = {
                "title": doc.metadata.get("title", ""),
                "author": doc.metadata.get("author", ""),
                "page_count": len(doc),
                "file_name": pdf_path.name,
            }
            for page_num, page in enumerate(doc, start=1):
                text = page.get_text("text")
                # Clean up whitespace
                text = self._clean_text(text)
                pages.append(ExtractedPage(
                    page_number=page_num,
                    text=text,
                    metadata={**metadata, "page": page_num}
                ))
        return pages

    def _clean_text(self, text: str) -> str:
        """Clean extracted text."""
        # Normalize whitespace
        text = re.sub(r'\s+', ' ', text)
        # Remove page artifacts (null bytes)
        text = re.sub(r'\x00', '', text)
        return text.strip()


# Usage
extractor = PDFExtractor()
pages = extractor.extract("document.pdf")
for page in pages:
    print(f"Page {page.page_number}: {len(page.text)} chars")
Table Extraction with pdfplumber
For documents with tables, pdfplumber excels:
import pdfplumber
from dataclasses import dataclass


@dataclass
class ExtractedTable:
    """Represents an extracted table."""
    page_number: int
    table_number: int
    headers: list[str]
    rows: list[list[str]]


class TableExtractor:
    """Extract tables from PDF documents."""

    def __init__(self, min_rows: int = 2):
        self.min_rows = min_rows

    def extract_tables(self, pdf_path: str) -> list[ExtractedTable]:
        """Extract all tables from a PDF."""
        tables = []
        with pdfplumber.open(pdf_path) as pdf:
            for page_num, page in enumerate(pdf.pages, start=1):
                page_tables = page.extract_tables()
                for table_num, table in enumerate(page_tables, start=1):
                    if len(table) >= self.min_rows:
                        # First row as headers
                        headers = [str(cell or "") for cell in table[0]]
                        rows = [
                            [str(cell or "") for cell in row]
                            for row in table[1:]
                        ]
                        tables.append(ExtractedTable(
                            page_number=page_num,
                            table_number=table_num,
                            headers=headers,
                            rows=rows
                        ))
        return tables

    def table_to_markdown(self, table: ExtractedTable) -> str:
        """Convert a table to Markdown format."""
        lines = []
        # Headers
        lines.append("| " + " | ".join(table.headers) + " |")
        lines.append("| " + " | ".join(["---"] * len(table.headers)) + " |")
        # Rows
        for row in table.rows:
            # Pad or truncate the row to match the header count
            padded_row = row + [""] * (len(table.headers) - len(row))
            lines.append("| " + " | ".join(padded_row[:len(table.headers)]) + " |")
        return "\n".join(lines)


# Usage
table_extractor = TableExtractor()
tables = table_extractor.extract_tables("report.pdf")
for table in tables:
    print(f"Table on page {table.page_number}:")
    print(table_extractor.table_to_markdown(table))
Text Chunking Strategies
Chunking determines how documents are split for embedding and retrieval.
Fixed-Size Chunking
Simple but effective for uniform documents:
from dataclasses import dataclass


@dataclass
class Chunk:
    """Represents a text chunk."""
    text: str
    metadata: dict
    chunk_index: int


class FixedSizeChunker:
    """Split text into fixed-size chunks with overlap."""

    def __init__(
        self,
        chunk_size: int = 1000,
        chunk_overlap: int = 200
    ):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

    def chunk(self, text: str, metadata: dict | None = None) -> list[Chunk]:
        """Split text into overlapping chunks."""
        metadata = metadata or {}
        chunks = []

        if len(text) <= self.chunk_size:
            return [Chunk(text=text, metadata=metadata, chunk_index=0)]

        start = 0
        chunk_index = 0
        while start < len(text):
            end = start + self.chunk_size
            # Find a good break point
            if end < len(text):
                end = self._find_break_point(text, end)

            chunk_text = text[start:end].strip()
            if chunk_text:
                chunks.append(Chunk(
                    text=chunk_text,
                    metadata={
                        **metadata,
                        "chunk_start": start,
                        "chunk_end": end,
                    },
                    chunk_index=chunk_index
                ))
                chunk_index += 1

            # Move start back by the overlap
            start = end - self.chunk_overlap
            # Prevent an infinite loop if the window failed to advance
            if chunks and start <= chunks[-1].metadata.get("chunk_start", 0):
                start = end

        return chunks

    def _find_break_point(self, text: str, position: int) -> int:
        """Find a natural break point near position."""
        window_start = max(0, position - 100)
        # Look for a paragraph break
        para_break = text.rfind('\n\n', window_start, position)
        if para_break != -1:
            return para_break + 2
        # Look for a sentence break
        for punct in ['. ', '! ', '? ']:
            sent_break = text.rfind(punct, window_start, position)
            if sent_break != -1:
                return sent_break + 2
        # Look for a word break
        space = text.rfind(' ', max(0, position - 50), position)
        if space != -1:
            return space + 1
        return position


# Usage
chunker = FixedSizeChunker(chunk_size=500, chunk_overlap=100)
text = "Your long document text here..."
chunks = chunker.chunk(text, {"source": "document.pdf"})
Semantic Chunking
Split based on semantic similarity for better coherence:
import re

import numpy as np
from openai import OpenAI


class SemanticChunker:
    """Split text based on semantic similarity."""

    def __init__(
        self,
        client: OpenAI,
        similarity_threshold: float = 0.8,
        min_chunk_size: int = 100,
        max_chunk_size: int = 2000
    ):
        self.client = client
        self.similarity_threshold = similarity_threshold
        self.min_chunk_size = min_chunk_size
        self.max_chunk_size = max_chunk_size

    def chunk(self, text: str, metadata: dict | None = None) -> list[Chunk]:
        """Split text at semantic boundaries."""
        metadata = metadata or {}

        # Split into sentences
        sentences = self._split_sentences(text)
        if len(sentences) <= 1:
            return [Chunk(text=text, metadata=metadata, chunk_index=0)]

        # Get embeddings for sentences
        embeddings = self._embed_sentences(sentences)

        # Find semantic breaks
        break_points = self._find_semantic_breaks(embeddings)

        # Create chunks from break points
        chunks = []
        start_idx = 0
        for chunk_index, end_idx in enumerate(break_points):
            chunk_text = " ".join(sentences[start_idx:end_idx + 1])
            # Ensure the chunk meets the minimum size
            if len(chunk_text) >= self.min_chunk_size:
                chunks.append(Chunk(
                    text=chunk_text,
                    metadata=metadata,
                    chunk_index=chunk_index
                ))
            elif chunks:
                # Merge a too-small chunk into the previous one
                chunks[-1] = Chunk(
                    text=chunks[-1].text + " " + chunk_text,
                    metadata=chunks[-1].metadata,
                    chunk_index=chunks[-1].chunk_index
                )
            start_idx = end_idx + 1

        return chunks

    def _split_sentences(self, text: str) -> list[str]:
        """Split text into sentences."""
        # Simple sentence splitting on terminal punctuation
        sentences = re.split(r'(?<=[.!?])\s+', text)
        return [s.strip() for s in sentences if s.strip()]

    def _embed_sentences(self, sentences: list[str]) -> np.ndarray:
        """Get embeddings for sentences."""
        response = self.client.embeddings.create(
            model="text-embedding-3-small",
            input=sentences
        )
        return np.array([e.embedding for e in response.data])

    def _find_semantic_breaks(self, embeddings: np.ndarray) -> list[int]:
        """Find sentence indices where semantic breaks occur."""
        breaks = []
        for i in range(len(embeddings) - 1):
            # Calculate similarity with the next sentence
            similarity = self._cosine_similarity(
                embeddings[i], embeddings[i + 1]
            )
            # Break where similarity drops below the threshold
            if similarity < self.similarity_threshold:
                breaks.append(i)
        # Always end the final chunk at the last sentence
        breaks.append(len(embeddings) - 1)
        return breaks

    def _cosine_similarity(self, a: np.ndarray, b: np.ndarray) -> float:
        """Calculate cosine similarity between two vectors."""
        return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
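A minimal usage sketch, assuming an OpenAI API key is configured in the environment; the sample text, threshold, and metadata are illustrative:

from openai import OpenAI

client = OpenAI()
semantic_chunker = SemanticChunker(client, similarity_threshold=0.75)
chunks = semantic_chunker.chunk(
    "First topic sentence. More on the first topic. "
    "Now a different topic entirely. Further detail on that topic.",
    metadata={"source": "example.txt"},
)
for chunk in chunks:
    print(f"Chunk {chunk.chunk_index}: {chunk.text[:60]}...")

Note that all sentences of a document are embedded in a single request here, so semantic chunking adds one embedding call per document on top of the downstream chunk embeddings.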
Recursive Chunking
Hierarchical chunking for structured documents:
class RecursiveChunker:
    """Recursively split text using multiple separators."""

    def __init__(
        self,
        chunk_size: int = 1000,
        chunk_overlap: int = 200,
        separators: list[str] | None = None
    ):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.separators = separators or [
            "\n\n\n",  # Multiple newlines
            "\n\n",    # Paragraphs
            "\n",      # Lines
            ". ",      # Sentences
            " ",       # Words
            ""         # Characters
        ]

    def chunk(self, text: str, metadata: dict | None = None) -> list[Chunk]:
        """Recursively split text."""
        metadata = metadata or {}
        chunks = self._split_recursive(text, self.separators)
        return [
            Chunk(text=chunk, metadata=metadata, chunk_index=i)
            for i, chunk in enumerate(chunks)
        ]

    def _split_recursive(
        self,
        text: str,
        separators: list[str]
    ) -> list[str]:
        """Recursively split using separators."""
        if not text:
            return []
        if len(text) <= self.chunk_size:
            return [text]
        if not separators:
            # No more separators, force a split
            return self._force_split(text)

        separator = separators[0]
        remaining_separators = separators[1:]

        if separator == "":
            # Character-level split
            return self._force_split(text)

        splits = text.split(separator)
        chunks = []
        current_chunk = ""

        for split in splits:
            test_chunk = (
                current_chunk + separator + split
                if current_chunk else split
            )
            if len(test_chunk) <= self.chunk_size:
                current_chunk = test_chunk
            else:
                if current_chunk:
                    chunks.append(current_chunk)
                if len(split) > self.chunk_size:
                    # Recursively split with the next separator
                    sub_chunks = self._split_recursive(
                        split, remaining_separators
                    )
                    chunks.extend(sub_chunks)
                    current_chunk = ""
                else:
                    current_chunk = split

        if current_chunk:
            chunks.append(current_chunk)
        return chunks

    def _force_split(self, text: str) -> list[str]:
        """Force-split text into overlapping windows."""
        chunks = []
        for i in range(0, len(text), self.chunk_size - self.chunk_overlap):
            chunk = text[i:i + self.chunk_size]
            if chunk:
                chunks.append(chunk)
        return chunks
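A short usage sketch; the sample text, sizes, and metadata below are placeholders:

recursive_chunker = RecursiveChunker(chunk_size=800, chunk_overlap=100)
chunks = recursive_chunker.chunk(
    "# Heading\n\nFirst paragraph of the document...\n\nSecond paragraph...",
    metadata={"source": "notes.md"},
)
print(f"Produced {len(chunks)} chunks")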
Document Loaders
Multi-Format Document Loader
Handle various document formats:
from dataclasses import dataclass
from pathlib import Path
from typing import Protocol


@dataclass
class Document:
    """Represents a loaded document."""
    content: str
    metadata: dict
    source: str


class DocumentLoader(Protocol):
    """Protocol for document loaders."""

    def load(self, path: Path) -> Document:
        """Load a document from path."""
        ...

    def supports(self, path: Path) -> bool:
        """Check if the loader supports the file type."""
        ...


class PDFLoader:
    """Load PDF documents."""

    def supports(self, path: Path) -> bool:
        return path.suffix.lower() == ".pdf"

    def load(self, path: Path) -> Document:
        import fitz

        text_parts = []
        with fitz.open(path) as doc:
            for page in doc:
                text_parts.append(page.get_text())
        return Document(
            content="\n\n".join(text_parts),
            metadata={
                "file_type": "pdf",
                "page_count": len(text_parts),
            },
            source=str(path)
        )


class MarkdownLoader:
    """Load Markdown documents."""

    def supports(self, path: Path) -> bool:
        return path.suffix.lower() in [".md", ".markdown"]

    def load(self, path: Path) -> Document:
        content = path.read_text(encoding="utf-8")
        # Extract the title from the first heading
        title = ""
        for line in content.split("\n"):
            if line.startswith("# "):
                title = line[2:].strip()
                break
        return Document(
            content=content,
            metadata={
                "file_type": "markdown",
                "title": title,
            },
            source=str(path)
        )


class TextLoader:
    """Load plain text documents."""

    def supports(self, path: Path) -> bool:
        return path.suffix.lower() in [".txt", ".text"]

    def load(self, path: Path) -> Document:
        content = path.read_text(encoding="utf-8")
        return Document(
            content=content,
            metadata={"file_type": "text"},
            source=str(path)
        )


class HTMLLoader:
    """Load HTML documents."""

    def supports(self, path: Path) -> bool:
        return path.suffix.lower() in [".html", ".htm"]

    def load(self, path: Path) -> Document:
        from bs4 import BeautifulSoup

        html = path.read_text(encoding="utf-8")
        soup = BeautifulSoup(html, "html.parser")
        # Remove script, style, and navigation elements
        for element in soup(["script", "style", "nav", "footer"]):
            element.decompose()
        # Extract text
        text = soup.get_text(separator="\n")
        # Get the title, if present
        title = soup.title.string if soup.title and soup.title.string else ""
        return Document(
            content=text,
            metadata={
                "file_type": "html",
                "title": title,
            },
            source=str(path)
        )


class UniversalDocumentLoader:
    """Load documents of various formats."""

    def __init__(self):
        self.loaders: list[DocumentLoader] = [
            PDFLoader(),
            MarkdownLoader(),
            TextLoader(),
            HTMLLoader(),
        ]

    def load(self, path: str | Path) -> Document:
        """Load a document using the appropriate loader."""
        path = Path(path)
        if not path.exists():
            raise FileNotFoundError(f"File not found: {path}")
        for loader in self.loaders:
            if loader.supports(path):
                return loader.load(path)
        raise ValueError(f"Unsupported file type: {path.suffix}")

    def load_directory(
        self,
        directory: str | Path,
        recursive: bool = True
    ) -> list[Document]:
        """Load all supported documents from a directory."""
        directory = Path(directory)
        documents = []
        pattern = "**/*" if recursive else "*"
        for file_path in directory.glob(pattern):
            if file_path.is_file():
                try:
                    doc = self.load(file_path)
                    documents.append(doc)
                except ValueError:
                    # Skip unsupported files
                    continue
                except Exception as e:
                    print(f"Error loading {file_path}: {e}")
        return documents


# Usage
loader = UniversalDocumentLoader()

# Load a single document
doc = loader.load("report.pdf")

# Load an entire directory
docs = loader.load_directory("documents/", recursive=True)
print(f"Loaded {len(docs)} documents")
Complete Document Processing Pipeline
Combine extraction, chunking, and embedding:
import hashlib
import json
from dataclasses import dataclass
from pathlib import Path

from openai import OpenAI


@dataclass
class ProcessedDocument:
    """A fully processed document with chunks and embeddings."""
    source: str
    chunks: list[Chunk]
    embeddings: list[list[float]]
    metadata: dict


class DocumentPipeline:
    """Complete document processing pipeline."""

    def __init__(
        self,
        openai_client: OpenAI,
        chunk_size: int = 1000,
        chunk_overlap: int = 200,
        embedding_model: str = "text-embedding-3-small"
    ):
        self.client = openai_client
        self.loader = UniversalDocumentLoader()
        self.chunker = RecursiveChunker(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap
        )
        self.embedding_model = embedding_model

    def process(self, path: str | Path) -> ProcessedDocument:
        """Process a single document."""
        path = Path(path)

        # Load the document
        document = self.loader.load(path)

        # Chunk the document
        chunks = self.chunker.chunk(
            document.content,
            metadata=document.metadata
        )

        # Generate embeddings
        embeddings = self._embed_chunks(chunks)

        return ProcessedDocument(
            source=str(path),
            chunks=chunks,
            embeddings=embeddings,
            metadata={
                **document.metadata,
                "document_hash": self._hash_content(document.content),
                "chunk_count": len(chunks),
            }
        )

    def process_batch(
        self,
        paths: list[str | Path]
    ) -> list[ProcessedDocument]:
        """Process multiple documents."""
        return [self.process(path) for path in paths]

    def _embed_chunks(self, chunks: list[Chunk]) -> list[list[float]]:
        """Generate embeddings for chunks."""
        if not chunks:
            return []

        texts = [chunk.text for chunk in chunks]

        # Batch embedding requests (max 2048 inputs per request)
        all_embeddings = []
        batch_size = 2048
        for i in range(0, len(texts), batch_size):
            batch = texts[i:i + batch_size]
            response = self.client.embeddings.create(
                model=self.embedding_model,
                input=batch
            )
            batch_embeddings = [e.embedding for e in response.data]
            all_embeddings.extend(batch_embeddings)
        return all_embeddings

    def _hash_content(self, content: str) -> str:
        """Generate a content hash for deduplication."""
        return hashlib.sha256(content.encode()).hexdigest()[:16]

    def save(self, processed: ProcessedDocument, output_path: str | Path):
        """Save a processed document to JSON."""
        output_path = Path(output_path)
        data = {
            "source": processed.source,
            "metadata": processed.metadata,
            "chunks": [
                {
                    "text": chunk.text,
                    "metadata": chunk.metadata,
                    "chunk_index": chunk.chunk_index,
                    "embedding": processed.embeddings[i]
                }
                for i, chunk in enumerate(processed.chunks)
            ]
        }
        with open(output_path, "w") as f:
            json.dump(data, f)

    def load(self, input_path: str | Path) -> ProcessedDocument:
        """Load a processed document from JSON."""
        input_path = Path(input_path)
        with open(input_path) as f:
            data = json.load(f)
        chunks = [
            Chunk(
                text=c["text"],
                metadata=c["metadata"],
                chunk_index=c["chunk_index"]
            )
            for c in data["chunks"]
        ]
        embeddings = [c["embedding"] for c in data["chunks"]]
        return ProcessedDocument(
            source=data["source"],
            chunks=chunks,
            embeddings=embeddings,
            metadata=data["metadata"]
        )


# Usage
client = OpenAI()
pipeline = DocumentPipeline(client)

# Process a single document
processed = pipeline.process("research_paper.pdf")
print(f"Created {len(processed.chunks)} chunks")

# Save for later use
pipeline.save(processed, "processed_paper.json")

# Load the processed document back
loaded = pipeline.load("processed_paper.json")
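To sanity-check the pipeline output, a brute-force similarity search over the stored embeddings is enough; this is a sketch only, with an illustrative query, and a vector database would replace it in production:

import numpy as np

def search(processed: ProcessedDocument, query: str, client: OpenAI, top_k: int = 3) -> list[Chunk]:
    """Return the chunks most similar to the query by cosine similarity."""
    response = client.embeddings.create(
        model="text-embedding-3-small",
        input=[query]
    )
    query_vec = np.array(response.data[0].embedding)
    matrix = np.array(processed.embeddings)
    # Cosine similarity between the query and every chunk embedding
    scores = matrix @ query_vec / (
        np.linalg.norm(matrix, axis=1) * np.linalg.norm(query_vec)
    )
    top_indices = np.argsort(scores)[::-1][:top_k]
    return [processed.chunks[i] for i in top_indices]

for chunk in search(loaded, "What are the main findings?", client):
    print(chunk.text[:80])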
Chunking Best Practices
- Start with 500-1000 character chunks for most use cases
- Use 10-20% overlap to maintain context across boundaries
- Semantic chunking works best for diverse content
- Test retrieval quality with different chunk sizes (see the sketch below)
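A starting point for such a comparison, using the FixedSizeChunker defined earlier; the file name and size/overlap pairs are placeholders, and in practice you would run your retrieval evaluation queries against each configuration:

from pathlib import Path

sample_text = Path("document.txt").read_text(encoding="utf-8")

for size, overlap in [(500, 50), (1000, 150), (1500, 300)]:
    chunker = FixedSizeChunker(chunk_size=size, chunk_overlap=overlap)
    chunks = chunker.chunk(sample_text, {"source": "document.txt"})
    if chunks:
        avg_len = sum(len(c.text) for c in chunks) / len(chunks)
        print(f"size={size}, overlap={overlap}: {len(chunks)} chunks, avg {avg_len:.0f} chars")
    # Next step: embed each configuration and measure recall on a held-out query set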
Practice Exercise
Build a document ingestion service:
- Accept PDF, Markdown, and HTML uploads
- Extract text with proper formatting
- Implement configurable chunking strategies
- Generate embeddings in batches
- Store chunks in a vector database
- Handle large documents efficiently
- Deduplicate documents using content hashes
- Track progress for batch processing
- Recover gracefully from partial failures