Document processing is fundamental to RAG applications. This chapter covers extracting text from various formats, implementing chunking strategies, and building robust document pipelines.

PDF Text Extraction

Basic PDF Extraction with PyMuPDF

PyMuPDF (fitz) provides fast and accurate PDF text extraction:
import fitz  # PyMuPDF
from dataclasses import dataclass
from pathlib import Path


@dataclass
class ExtractedPage:
    """Represents an extracted PDF page."""
    page_number: int
    text: str
    metadata: dict


class PDFExtractor:
    """Extract text and metadata from PDF documents."""
    
    def extract(self, pdf_path: str | Path) -> list[ExtractedPage]:
        """Extract all pages from a PDF."""
        pdf_path = Path(pdf_path)
        
        if not pdf_path.exists():
            raise FileNotFoundError(f"PDF not found: {pdf_path}")
        
        pages = []
        
        with fitz.open(pdf_path) as doc:
            metadata = {
                "title": doc.metadata.get("title", ""),
                "author": doc.metadata.get("author", ""),
                "page_count": len(doc),
                "file_name": pdf_path.name,
            }
            
            for page_num, page in enumerate(doc, start=1):
                text = page.get_text("text")
                
                # Clean up whitespace
                text = self._clean_text(text)
                
                pages.append(ExtractedPage(
                    page_number=page_num,
                    text=text,
                    metadata={**metadata, "page": page_num}
                ))
        
        return pages
    
    def _clean_text(self, text: str) -> str:
        """Clean extracted text."""
        import re
        
        # Normalize whitespace
        text = re.sub(r'\s+', ' ', text)
        
        # Remove page artifacts
        text = re.sub(r'\x00', '', text)
        
        return text.strip()


# Usage
extractor = PDFExtractor()
pages = extractor.extract("document.pdf")

for page in pages:
    print(f"Page {page.page_number}: {len(page.text)} chars")

Table Extraction with pdfplumber

For documents with tables, pdfplumber excels:
import pdfplumber
from dataclasses import dataclass


@dataclass
class ExtractedTable:
    """Represents an extracted table."""
    page_number: int
    table_number: int
    headers: list[str]
    rows: list[list[str]]


class TableExtractor:
    """Extract tables from PDF documents."""
    
    def __init__(self, min_rows: int = 2):
        self.min_rows = min_rows
    
    def extract_tables(self, pdf_path: str) -> list[ExtractedTable]:
        """Extract all tables from a PDF."""
        tables = []
        
        with pdfplumber.open(pdf_path) as pdf:
            for page_num, page in enumerate(pdf.pages, start=1):
                page_tables = page.extract_tables()
                
                for table_num, table in enumerate(page_tables, start=1):
                    if len(table) >= self.min_rows:
                        # First row as headers
                        headers = [str(cell or "") for cell in table[0]]
                        rows = [
                            [str(cell or "") for cell in row]
                            for row in table[1:]
                        ]
                        
                        tables.append(ExtractedTable(
                            page_number=page_num,
                            table_number=table_num,
                            headers=headers,
                            rows=rows
                        ))
        
        return tables
    
    def table_to_markdown(self, table: ExtractedTable) -> str:
        """Convert table to markdown format."""
        lines = []
        
        # Headers
        lines.append("| " + " | ".join(table.headers) + " |")
        lines.append("| " + " | ".join(["---"] * len(table.headers)) + " |")
        
        # Rows
        for row in table.rows:
            # Pad row if needed
            padded_row = row + [""] * (len(table.headers) - len(row))
            lines.append("| " + " | ".join(padded_row[:len(table.headers)]) + " |")
        
        return "\n".join(lines)


# Usage
table_extractor = TableExtractor()
tables = table_extractor.extract_tables("report.pdf")

for table in tables:
    print(f"Table on page {table.page_number}:")
    print(table_extractor.table_to_markdown(table))

Text Chunking Strategies

Chunking determines how documents are split for embedding and retrieval.

Fixed-Size Chunking

Simple but effective for uniform documents:
from dataclasses import dataclass


@dataclass
class Chunk:
    """Represents a text chunk."""
    text: str
    metadata: dict
    chunk_index: int


class FixedSizeChunker:
    """Split text into fixed-size chunks with overlap."""
    
    def __init__(
        self,
        chunk_size: int = 1000,
        chunk_overlap: int = 200
    ):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
    
    def chunk(self, text: str, metadata: dict | None = None) -> list[Chunk]:
        """Split text into overlapping chunks."""
        metadata = metadata or {}
        chunks = []
        
        if len(text) <= self.chunk_size:
            return [Chunk(text=text, metadata=metadata, chunk_index=0)]
        
        start = 0
        chunk_index = 0
        
        while start < len(text):
            end = start + self.chunk_size
            
            # Find a good break point
            if end < len(text):
                end = self._find_break_point(text, end)
            
            chunk_text = text[start:end].strip()
            
            if chunk_text:
                chunks.append(Chunk(
                    text=chunk_text,
                    metadata={
                        **metadata,
                        "chunk_start": start,
                        "chunk_end": end,
                    },
                    chunk_index=chunk_index
                ))
                chunk_index += 1
            
            # Stop once the end of the text has been reached
            if end >= len(text):
                break
            
            # Move start forward with overlap, always making progress
            start = max(end - self.chunk_overlap, start + 1)
        
        return chunks
    
    def _find_break_point(self, text: str, position: int) -> int:
        """Find a natural break point at or just before position."""
        search_start = max(position - 100, 0)
        
        # Prefer a paragraph break
        para_break = text.rfind('\n\n', search_start, position)
        if para_break != -1:
            return para_break + 2
        
        # Then a sentence break
        for punct in ['. ', '! ', '? ']:
            sent_break = text.rfind(punct, search_start, position)
            if sent_break != -1:
                return sent_break + 2
        
        # Finally a word break
        space = text.rfind(' ', max(position - 50, 0), position)
        if space != -1:
            return space + 1
        
        return position


# Usage
chunker = FixedSizeChunker(chunk_size=500, chunk_overlap=100)
text = "Your long document text here..."
chunks = chunker.chunk(text, {"source": "document.pdf"})

Semantic Chunking

Split based on semantic similarity for better coherence:
import numpy as np
from openai import OpenAI


class SemanticChunker:
    """Split text based on semantic similarity."""
    
    def __init__(
        self,
        client: OpenAI,
        similarity_threshold: float = 0.8,
        min_chunk_size: int = 100,
        max_chunk_size: int = 2000
    ):
        self.client = client
        self.similarity_threshold = similarity_threshold
        self.min_chunk_size = min_chunk_size
        self.max_chunk_size = max_chunk_size
    
    def chunk(self, text: str, metadata: dict | None = None) -> list[Chunk]:
        """Split text at semantic boundaries."""
        metadata = metadata or {}
        
        # Split into sentences
        sentences = self._split_sentences(text)
        
        if len(sentences) <= 1:
            return [Chunk(text=text, metadata=metadata, chunk_index=0)]
        
        # Get embeddings for sentences
        embeddings = self._embed_sentences(sentences)
        
        # Find semantic break points between sentences
        break_points = self._find_semantic_breaks(sentences, embeddings)
        
        # Create chunks from break points
        chunks = []
        start_idx = 0
        
        for chunk_index, end_idx in enumerate(break_points):
            chunk_text = " ".join(sentences[start_idx:end_idx + 1])
            
            # Keep the chunk if it is large enough, or if there is nothing to merge it into
            if len(chunk_text) >= self.min_chunk_size or not chunks:
                chunks.append(Chunk(
                    text=chunk_text,
                    metadata=metadata,
                    chunk_index=chunk_index
                ))
            elif chunks:
                # Merge with previous chunk
                chunks[-1] = Chunk(
                    text=chunks[-1].text + " " + chunk_text,
                    metadata=chunks[-1].metadata,
                    chunk_index=chunks[-1].chunk_index
                )
            
            start_idx = end_idx + 1
        
        return chunks
    
    def _split_sentences(self, text: str) -> list[str]:
        """Split text into sentences."""
        import re
        
        # Simple sentence splitting
        sentences = re.split(r'(?<=[.!?])\s+', text)
        return [s.strip() for s in sentences if s.strip()]
    
    def _embed_sentences(self, sentences: list[str]) -> np.ndarray:
        """Get embeddings for sentences."""
        response = self.client.embeddings.create(
            model="text-embedding-3-small",
            input=sentences
        )
        
        return np.array([e.embedding for e in response.data])
    
    def _find_semantic_breaks(
        self,
        sentences: list[str],
        embeddings: np.ndarray
    ) -> list[int]:
        """Find sentence indices where semantic breaks occur."""
        breaks = []
        current_chunk_size = 0
        
        for i in range(len(embeddings) - 1):
            # Similarity between this sentence and the next
            similarity = self._cosine_similarity(
                embeddings[i], embeddings[i + 1]
            )
            
            current_chunk_size += len(sentences[i])
            
            # Break on a topic shift or when the chunk grows too large
            if (
                similarity < self.similarity_threshold
                or current_chunk_size >= self.max_chunk_size
            ):
                breaks.append(i)
                current_chunk_size = 0
        
        # The final sentence always closes the last chunk
        breaks.append(len(embeddings) - 1)
        
        return breaks
    
    def _cosine_similarity(self, a: np.ndarray, b: np.ndarray) -> float:
        """Calculate cosine similarity between vectors."""
        return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
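
Usage mirrors the other chunkers; the threshold and source name below are illustrative, and a valid OpenAI API key is assumed to be configured in the environment:
# Usage
client = OpenAI()
semantic_chunker = SemanticChunker(client, similarity_threshold=0.75)
text = "Your long document text here..."
chunks = semantic_chunker.chunk(text, {"source": "essay.md"})
print(f"Created {len(chunks)} semantic chunks")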

Recursive Chunking

Hierarchical chunking for structured documents:
class RecursiveChunker:
    """Recursively split text using multiple separators."""
    
    def __init__(
        self,
        chunk_size: int = 1000,
        chunk_overlap: int = 200,
        separators: list[str] = None
    ):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.separators = separators or [
            "\n\n\n",  # Multiple newlines
            "\n\n",    # Paragraphs
            "\n",      # Lines
            ". ",      # Sentences
            " ",       # Words
            ""         # Characters
        ]
    
    def chunk(self, text: str, metadata: dict | None = None) -> list[Chunk]:
        """Recursively split text."""
        metadata = metadata or {}
        
        chunks = self._split_recursive(text, self.separators)
        
        return [
            Chunk(text=chunk, metadata=metadata, chunk_index=i)
            for i, chunk in enumerate(chunks)
        ]
    
    def _split_recursive(
        self,
        text: str,
        separators: list[str]
    ) -> list[str]:
        """Recursively split using separators."""
        if not text:
            return []
        
        if len(text) <= self.chunk_size:
            return [text]
        
        if not separators:
            # No more separators, force split
            return self._force_split(text)
        
        separator = separators[0]
        remaining_separators = separators[1:]
        
        if separator == "":
            # Character-level split
            return self._force_split(text)
        
        splits = text.split(separator)
        
        chunks = []
        current_chunk = ""
        
        for split in splits:
            test_chunk = (
                current_chunk + separator + split
                if current_chunk else split
            )
            
            if len(test_chunk) <= self.chunk_size:
                current_chunk = test_chunk
            else:
                if current_chunk:
                    chunks.append(current_chunk)
                
                if len(split) > self.chunk_size:
                    # Recursively split with next separator
                    sub_chunks = self._split_recursive(
                        split, remaining_separators
                    )
                    chunks.extend(sub_chunks)
                    current_chunk = ""
                else:
                    current_chunk = split
        
        if current_chunk:
            chunks.append(current_chunk)
        
        return chunks
    
    def _force_split(self, text: str) -> list[str]:
        """Force split text into chunks."""
        chunks = []
        
        for i in range(0, len(text), self.chunk_size - self.chunk_overlap):
            chunk = text[i:i + self.chunk_size]
            if chunk:
                chunks.append(chunk)
        
        return chunks
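
A usage sketch with illustrative sizes; the sample text and source name are placeholders:
# Usage
recursive_chunker = RecursiveChunker(chunk_size=800, chunk_overlap=100)
text = "Your long document text here..."
chunks = recursive_chunker.chunk(text, {"source": "notes.md"})
print(f"Created {len(chunks)} chunks")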

Document Loaders

Multi-Format Document Loader

Handle various document formats:
from pathlib import Path
from dataclasses import dataclass
from typing import Protocol


@dataclass
class Document:
    """Represents a loaded document."""
    content: str
    metadata: dict
    source: str


class DocumentLoader(Protocol):
    """Protocol for document loaders."""
    
    def load(self, path: Path) -> Document:
        """Load a document from path."""
        ...
    
    def supports(self, path: Path) -> bool:
        """Check if loader supports the file type."""
        ...


class PDFLoader:
    """Load PDF documents."""
    
    def supports(self, path: Path) -> bool:
        return path.suffix.lower() == ".pdf"
    
    def load(self, path: Path) -> Document:
        import fitz
        
        text_parts = []
        
        with fitz.open(path) as doc:
            for page in doc:
                text_parts.append(page.get_text())
        
        return Document(
            content="\n\n".join(text_parts),
            metadata={
                "file_type": "pdf",
                "page_count": len(text_parts),
            },
            source=str(path)
        )


class MarkdownLoader:
    """Load Markdown documents."""
    
    def supports(self, path: Path) -> bool:
        return path.suffix.lower() in [".md", ".markdown"]
    
    def load(self, path: Path) -> Document:
        content = path.read_text(encoding="utf-8")
        
        # Extract title from first heading
        title = ""
        for line in content.split("\n"):
            if line.startswith("# "):
                title = line[2:].strip()
                break
        
        return Document(
            content=content,
            metadata={
                "file_type": "markdown",
                "title": title,
            },
            source=str(path)
        )


class TextLoader:
    """Load plain text documents."""
    
    def supports(self, path: Path) -> bool:
        return path.suffix.lower() in [".txt", ".text"]
    
    def load(self, path: Path) -> Document:
        content = path.read_text(encoding="utf-8")
        
        return Document(
            content=content,
            metadata={"file_type": "text"},
            source=str(path)
        )


class HTMLLoader:
    """Load HTML documents."""
    
    def supports(self, path: Path) -> bool:
        return path.suffix.lower() in [".html", ".htm"]
    
    def load(self, path: Path) -> Document:
        from bs4 import BeautifulSoup
        
        html = path.read_text(encoding="utf-8")
        soup = BeautifulSoup(html, "html.parser")
        
        # Remove script and style elements
        for element in soup(["script", "style", "nav", "footer"]):
            element.decompose()
        
        # Extract text
        text = soup.get_text(separator="\n")
        
        # Get title
        title = soup.title.get_text(strip=True) if soup.title else ""
        
        return Document(
            content=text,
            metadata={
                "file_type": "html",
                "title": title,
            },
            source=str(path)
        )


class UniversalDocumentLoader:
    """Load documents of various formats."""
    
    def __init__(self):
        self.loaders: list[DocumentLoader] = [
            PDFLoader(),
            MarkdownLoader(),
            TextLoader(),
            HTMLLoader(),
        ]
    
    def load(self, path: str | Path) -> Document:
        """Load a document using the appropriate loader."""
        path = Path(path)
        
        if not path.exists():
            raise FileNotFoundError(f"File not found: {path}")
        
        for loader in self.loaders:
            if loader.supports(path):
                return loader.load(path)
        
        raise ValueError(f"Unsupported file type: {path.suffix}")
    
    def load_directory(
        self,
        directory: str | Path,
        recursive: bool = True
    ) -> list[Document]:
        """Load all documents from a directory."""
        directory = Path(directory)
        documents = []
        
        pattern = "**/*" if recursive else "*"
        
        for file_path in directory.glob(pattern):
            if file_path.is_file():
                try:
                    doc = self.load(file_path)
                    documents.append(doc)
                except ValueError:
                    # Skip unsupported files
                    continue
                except Exception as e:
                    print(f"Error loading {file_path}: {e}")
        
        return documents


# Usage
loader = UniversalDocumentLoader()

# Load single document
doc = loader.load("report.pdf")

# Load entire directory
docs = loader.load_directory("documents/", recursive=True)
print(f"Loaded {len(docs)} documents")

Complete Document Processing Pipeline

Combine extraction, chunking, and embedding:
from dataclasses import dataclass
from pathlib import Path
import hashlib
import json
from openai import OpenAI


@dataclass
class ProcessedDocument:
    """A fully processed document with chunks and embeddings."""
    source: str
    chunks: list[Chunk]
    embeddings: list[list[float]]
    metadata: dict


class DocumentPipeline:
    """Complete document processing pipeline."""
    
    def __init__(
        self,
        openai_client: OpenAI,
        chunk_size: int = 1000,
        chunk_overlap: int = 200,
        embedding_model: str = "text-embedding-3-small"
    ):
        self.client = openai_client
        self.loader = UniversalDocumentLoader()
        self.chunker = RecursiveChunker(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap
        )
        self.embedding_model = embedding_model
    
    def process(self, path: str | Path) -> ProcessedDocument:
        """Process a single document."""
        path = Path(path)
        
        # Load document
        document = self.loader.load(path)
        
        # Chunk document
        chunks = self.chunker.chunk(
            document.content,
            metadata=document.metadata
        )
        
        # Generate embeddings
        embeddings = self._embed_chunks(chunks)
        
        return ProcessedDocument(
            source=str(path),
            chunks=chunks,
            embeddings=embeddings,
            metadata={
                **document.metadata,
                "document_hash": self._hash_content(document.content),
                "chunk_count": len(chunks),
            }
        )
    
    def process_batch(
        self,
        paths: list[str | Path]
    ) -> list[ProcessedDocument]:
        """Process multiple documents."""
        return [self.process(path) for path in paths]
    
    def _embed_chunks(self, chunks: list[Chunk]) -> list[list[float]]:
        """Generate embeddings for chunks."""
        if not chunks:
            return []
        
        texts = [chunk.text for chunk in chunks]
        
        # Batch embeddings (max 2048 per request)
        all_embeddings = []
        batch_size = 2048
        
        for i in range(0, len(texts), batch_size):
            batch = texts[i:i + batch_size]
            
            response = self.client.embeddings.create(
                model=self.embedding_model,
                input=batch
            )
            
            batch_embeddings = [e.embedding for e in response.data]
            all_embeddings.extend(batch_embeddings)
        
        return all_embeddings
    
    def _hash_content(self, content: str) -> str:
        """Generate hash of content for deduplication."""
        return hashlib.sha256(content.encode()).hexdigest()[:16]
    
    def save(self, processed: ProcessedDocument, output_path: str | Path):
        """Save processed document to JSON."""
        output_path = Path(output_path)
        
        data = {
            "source": processed.source,
            "metadata": processed.metadata,
            "chunks": [
                {
                    "text": chunk.text,
                    "metadata": chunk.metadata,
                    "chunk_index": chunk.chunk_index,
                    "embedding": processed.embeddings[i]
                }
                for i, chunk in enumerate(processed.chunks)
            ]
        }
        
        with open(output_path, "w") as f:
            json.dump(data, f)
    
    def load(self, input_path: str | Path) -> ProcessedDocument:
        """Load processed document from JSON."""
        input_path = Path(input_path)
        
        with open(input_path) as f:
            data = json.load(f)
        
        chunks = [
            Chunk(
                text=c["text"],
                metadata=c["metadata"],
                chunk_index=c["chunk_index"]
            )
            for c in data["chunks"]
        ]
        
        embeddings = [c["embedding"] for c in data["chunks"]]
        
        return ProcessedDocument(
            source=data["source"],
            chunks=chunks,
            embeddings=embeddings,
            metadata=data["metadata"]
        )


# Usage
client = OpenAI()
pipeline = DocumentPipeline(client)

# Process single document
processed = pipeline.process("research_paper.pdf")
print(f"Created {len(processed.chunks)} chunks")

# Save for later use
pipeline.save(processed, "processed_paper.json")

# Load processed document
loaded = pipeline.load("processed_paper.json")

Chunking Best Practices
  • Start with 500-1000 character chunks for most use cases
  • Use 10-20% overlap to maintain context across boundaries
  • Semantic chunking works best for diverse content
  • Test retrieval quality with different chunk sizes (a quick starting sketch follows this list)
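
One quick way to compare configurations, as mentioned in the last point above. This sketch only reports chunk counts and average lengths; judging retrieval quality still means running your evaluation queries against each setting. The variable loaded_document_text, the 15% overlap, and the size values are all placeholders:
text = loaded_document_text  # assume this holds your document's content

for size in [500, 800, 1000, 1500]:
    overlap = int(size * 0.15)  # roughly 15% overlap
    chunker = FixedSizeChunker(chunk_size=size, chunk_overlap=overlap)
    chunks = chunker.chunk(text, {"source": "document.pdf"})
    avg_len = sum(len(c.text) for c in chunks) / max(len(chunks), 1)
    print(f"size={size}: {len(chunks)} chunks, avg {avg_len:.0f} chars")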

Practice Exercise

Build a document ingestion service (a starting skeleton is sketched after the lists below):
  1. Accept PDF, Markdown, and HTML uploads
  2. Extract text with proper formatting
  3. Implement configurable chunking strategies
  4. Generate embeddings in batches
  5. Store chunks in a vector database
Focus on:
  • Handling large documents efficiently
  • Deduplication using content hashes
  • Progress tracking for batch processing
  • Error recovery for partial failures
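
A possible starting skeleton for the exercise, not a complete solution. The IngestionService name, the vector_store object, and its add() call are placeholders for whichever vector database client you choose:
class IngestionService:
    """Starting point for the exercise; wire in your own vector store."""
    
    def __init__(self, pipeline: DocumentPipeline, vector_store):
        self.pipeline = pipeline
        self.vector_store = vector_store  # placeholder: any client with an add/upsert API
        self.seen_hashes: set[str] = set()
    
    def ingest(self, path: str) -> int:
        """Process one uploaded file and store its chunks; returns chunks stored."""
        processed = self.pipeline.process(path)
        
        # Deduplicate whole documents by content hash
        doc_hash = processed.metadata["document_hash"]
        if doc_hash in self.seen_hashes:
            return 0
        self.seen_hashes.add(doc_hash)
        
        # Store texts, embeddings, and metadata together (placeholder API)
        self.vector_store.add(
            texts=[chunk.text for chunk in processed.chunks],
            embeddings=processed.embeddings,
            metadatas=[chunk.metadata for chunk in processed.chunks],
        )
        return len(processed.chunks)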