December 2025 Update: Production patterns for processing thousands of LLM requests efficiently while respecting rate limits and managing costs.

Why Batch Processing?

Every LLM application eventually reaches the point where you need to process not 10 requests but 10,000. Maybe you’re embedding an entire document corpus, classifying a backlog of support tickets, or generating product descriptions for a catalog. At this scale, sequential processing is not just slow; it is impractical. The math makes this concrete: if each LLM call takes 3 seconds and you have 10,000 prompts, sequential processing takes about 8.3 hours (10,000 x 3 s = 30,000 s). With concurrent processing across 20 workers, that drops to about 25 minutes. With the OpenAI Batch API, you submit the whole job and collect the results later, typically within a few hours, at half the cost.
Single Request Pattern              Batch Pattern
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Request → Wait → Response           [Request Pool]
Request → Wait → Response              ↓
Request → Wait → Response           Concurrent Workers
...                                    ↓
Total: N x (latency + overhead)     [Response Queue]
                                    Total: ceil(N/batch) x latency
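
The arithmetic above can be reproduced in a few lines. This is a back-of-the-envelope sketch: estimate_wall_clock is a hypothetical helper, and it assumes every call takes the same latency and that rate limits never stall a worker.

```python
import math

def estimate_wall_clock(num_requests: int, latency_s: float, workers: int = 1) -> float:
    """Idealized wall-clock seconds when requests run in waves of `workers` calls."""
    waves = math.ceil(num_requests / workers)
    return waves * latency_s

sequential_s = estimate_wall_clock(10_000, 3.0)              # one call at a time
concurrent_s = estimate_wall_clock(10_000, 3.0, workers=20)  # 20 calls in flight

print(f"Sequential: {sequential_s / 3600:.1f} hours")
print(f"20 workers: {concurrent_s / 60:.0f} minutes")
```

Real runs land somewhat above these numbers because latency varies per request and rate limits add waiting time.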

OpenAI Batch API

OpenAI’s Batch API is a fundamentally different approach: instead of making real-time API calls, you submit a file of requests and OpenAI processes them asynchronously within a 24-hour window. The tradeoff is latency for cost — you get 50% off both input and output tokens. When to use it: anytime you don’t need results immediately. Data pipelines, nightly report generation, bulk content creation, embedding large corpora, evaluation runs — all perfect candidates. When NOT to use it: anything user-facing or time-sensitive. Think of it like the difference between express and standard shipping: same package, same destination, but standard is half the price because the carrier can optimize their route.
import json
from openai import OpenAI
from pathlib import Path
import time

client = OpenAI()

def create_batch_request(
    requests: list[dict],
    output_path: str = "batch_requests.jsonl"
) -> str:
    """Create a JSONL file for batch processing.
    
    Each line is a self-contained request with a custom_id for
    correlating results. The custom_id is critical -- without it
    you can't match responses back to inputs because OpenAI doesn't
    guarantee response ordering.
    """
    
    with open(output_path, 'w') as f:
        for i, req in enumerate(requests):
            batch_item = {
                "custom_id": f"request-{i}",
                "method": "POST",
                "url": "/v1/chat/completions",
                "body": {
                    "model": req.get("model", "gpt-4o-mini"),
                    "messages": req["messages"],
                    "max_tokens": req.get("max_tokens", 1000)
                }
            }
            f.write(json.dumps(batch_item) + "\n")
    
    return output_path

def submit_batch(file_path: str) -> str:
    """Submit batch job to OpenAI.
    
    The two-step process (upload file, then create batch) lets the
    batch reference its input by file ID. A new batch starts in the
    "validating" status, so a malformed JSONL file shows up as a
    "failed" batch before any request is processed.
    
    Production tip: Always save the batch_id somewhere persistent
    (database, file). If your script crashes after submission but
    before polling, you need the batch_id to retrieve results.
    """
    
    # Step 1: Upload the JSONL file to OpenAI's file storage.
    # The with-block closes the local file handle once the upload finishes.
    with open(file_path, "rb") as f:
        batch_file = client.files.create(
            file=f,
            purpose="batch"  # Must be "batch" -- other purposes won't work
        )
    
    # Step 2: Create the batch job referencing the uploaded file
    batch = client.batches.create(
        input_file_id=batch_file.id,
        endpoint="/v1/chat/completions",  # Must match the "url" of every request line
        completion_window="24h",          # Currently the only option
        metadata={
            "description": "Batch processing job"
            # Add your own metadata here for tracking: run_id, user, etc.
        }
    )
    
    return batch.id

def check_batch_status(batch_id: str) -> dict:
    """Check status of a batch job"""
    batch = client.batches.retrieve(batch_id)
    
    return {
        "status": batch.status,
        "total": batch.request_counts.total,
        "completed": batch.request_counts.completed,
        "failed": batch.request_counts.failed
    }

def get_batch_results(batch_id: str) -> list[dict]:
    """Get results from completed batch"""
    batch = client.batches.retrieve(batch_id)
    
    if batch.status != "completed":
        raise ValueError(f"Batch not completed. Status: {batch.status}")
    
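    # Download results
    result_file = client.files.content(batch.output_file_id)
    results = []
    
    for line in result_file.text.strip().split("\n"):
        result = json.loads(line)
        response = result.get("response") or {}
        body = response.get("body") or {}
        
        # Individual requests can fail inside a completed batch, so
        # record the error instead of assuming "choices" is present
        if result.get("error") or response.get("status_code") != 200:
            results.append({
                "custom_id": result["custom_id"],
                "response": None,
                "usage": None,
                "error": result.get("error") or body.get("error")
            })
            continue
        
        results.append({
            "custom_id": result["custom_id"],
            "response": body["choices"][0]["message"]["content"],
            "usage": body["usage"],
            "error": None
        })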
    
    return results

# Complete batch workflow
def run_batch_workflow(prompts: list[str]) -> list[dict]:
    """Complete batch processing workflow.
    
    This is a synchronous polling loop -- fine for scripts and notebooks.
    For production systems, submit the batch and use a scheduled job
    (cron, Step Functions, Celery beat) to check status periodically
    rather than keeping a process alive just to poll.
    """
    
    # Prepare requests
    requests = [
        {"messages": [{"role": "user", "content": p}]}
        for p in prompts
    ]
    
    # Create and submit batch
    file_path = create_batch_request(requests)
    batch_id = submit_batch(file_path)
    print(f"Submitted batch: {batch_id}")
    
    # Poll for completion -- batches can take anywhere from a few minutes
    # to the full 24-hour window. Polling every 60 seconds is a reasonable cadence.
    while True:
        status = check_batch_status(batch_id)
        print(f"Status: {status['status']} - {status['completed']}/{status['total']}")
        
        if status["status"] == "completed":
            break
        elif status["status"] in ("failed", "expired", "cancelled"):
            raise Exception(f"Batch {status['status']}")
        
        time.sleep(60)
    
    return get_batch_results(batch_id)
Batch API gotcha: The 24-hour completion window is an upper bound, not a typical duration. Most batches complete much faster (minutes to hours), but don’t build workflows that depend on early completion. A batch that cannot finish within the window ends in the expired status rather than completed, so handle that case explicitly. Also, individual requests within a batch can fail even if the batch succeeds. Always check per-request error fields in the results.
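
One way to act on per-request failures is to build a follow-up batch containing only the requests that did not succeed. The sketch below is a minimal illustration: build_retry_lines is a hypothetical helper, and it assumes each output line carries custom_id, a response object with status_code, and an error field. Requests that are missing from the output entirely (for example because they were reported elsewhere or never ran) are also treated as needing a retry.

```python
import json

def build_retry_lines(request_jsonl: str, output_jsonl: str) -> list[str]:
    """Return the original request lines that have no successful result."""
    succeeded = set()
    for line in output_jsonl.splitlines():
        if not line.strip():
            continue
        result = json.loads(line)
        response = result.get("response") or {}
        # Assumed success criterion: no error object and HTTP 200
        if result.get("error") is None and response.get("status_code") == 200:
            succeeded.add(result["custom_id"])

    retry = []
    for line in request_jsonl.splitlines():
        if line.strip() and json.loads(line)["custom_id"] not in succeeded:
            retry.append(line)
    return retry

# Three submitted requests; one succeeded, one failed, one is absent from the output
requests_text = "\n".join(
    json.dumps({"custom_id": f"request-{i}", "method": "POST",
                "url": "/v1/chat/completions", "body": {}})
    for i in range(3)
)
output_text = "\n".join([
    json.dumps({"custom_id": "request-0",
                "response": {"status_code": 200, "body": {}}, "error": None}),
    json.dumps({"custom_id": "request-1",
                "response": {"status_code": 400, "body": {}}, "error": None}),
])

retry_ids = [json.loads(line)["custom_id"]
             for line in build_retry_lines(requests_text, output_text)]
print(retry_ids)
```

Because the retry file reuses the original custom_id values, results from the second batch can be merged with the first by key.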

Concurrent Processing with Rate Limiting

When you can’t wait for the Batch API (need results in minutes, not hours), the real-time concurrent pattern is your tool. This fires multiple requests simultaneously while respecting rate limits — think of it as a controlled firehose. The three knobs you’re tuning: requests per minute (RPM), tokens per minute (TPM), and max concurrent connections. Start conservative and increase. It’s far better to process a batch in 10 minutes at 80% rate utilization than to get rate-limited into a retry spiral that takes 30 minutes.
import asyncio
from asyncio import Semaphore
from dataclasses import dataclass
from typing import List, Callable, Any
from datetime import datetime, timedelta
import time

@dataclass
class RateLimitConfig:
    requests_per_minute: int = 60       # Check your API tier for actual limit
    tokens_per_minute: int = 90000      # Token limit is often the binding constraint
    max_concurrent: int = 10            # Start here, increase if no errors

class RateLimiter:
    """Token bucket rate limiter"""
    
    def __init__(self, config: RateLimitConfig):
        self.config = config
        self.request_tokens = config.requests_per_minute
        self.token_tokens = config.tokens_per_minute
        self.last_update = time.time()
        self.lock = asyncio.Lock()
    
    async def acquire(self, estimated_tokens: int = 100):
        """Acquire rate limit tokens"""
        async with self.lock:
            now = time.time()
            elapsed = now - self.last_update
            
            # Refill tokens
            refill_rate_req = self.config.requests_per_minute / 60
            refill_rate_tok = self.config.tokens_per_minute / 60
            
            self.request_tokens = min(
                self.config.requests_per_minute,
                self.request_tokens + elapsed * refill_rate_req
            )
            self.token_tokens = min(
                self.config.tokens_per_minute,
                self.token_tokens + elapsed * refill_rate_tok
            )
            
            self.last_update = now
            
            # Wait if needed
            while self.request_tokens < 1 or self.token_tokens < estimated_tokens:
                wait_time = max(
                    (1 - self.request_tokens) / refill_rate_req,
                    (estimated_tokens - self.token_tokens) / refill_rate_tok
                )
                await asyncio.sleep(wait_time)
                
                elapsed = time.time() - self.last_update
                self.request_tokens += elapsed * refill_rate_req
                self.token_tokens += elapsed * refill_rate_tok
                self.last_update = time.time()
            
            # Consume tokens
            self.request_tokens -= 1
            self.token_tokens -= estimated_tokens

class ConcurrentProcessor:
    """Process requests concurrently with rate limiting.
    
    Combines three mechanisms:
    1. Rate limiter: controls request speed (RPM/TPM compliance)
    2. Semaphore: limits concurrent connections (resource protection)
    3. as_completed: yields results as they finish (progress tracking)
    """
    
    def __init__(
        self,
        process_fn: Callable,
        config: RateLimitConfig = None
    ):
        self.process_fn = process_fn
        self.config = config or RateLimitConfig()
        self.rate_limiter = RateLimiter(self.config)
        self.semaphore = Semaphore(self.config.max_concurrent)
    
    async def process_one(
        self,
        item: Any,
        index: int,
        estimated_tokens: int = 100
    ) -> tuple[int, Any, Any]:
        """Process a single item with rate limiting.
        
        Returns (index, item, result_or_exception). The index lets
        us reassemble results in original order even though
        as_completed yields them out of order.
        """
        
        # Rate limit BEFORE acquiring semaphore so we don't
        # hold a concurrency slot while waiting for rate tokens
        await self.rate_limiter.acquire(estimated_tokens)
        
        async with self.semaphore:
            try:
                result = await self.process_fn(item)
                return index, item, result
            except Exception as e:
                # Return exception as data, not raised -- one failure
                # should not kill the entire batch
                return index, item, e
    
    async def process_batch(
        self,
        items: List[Any],
        estimated_tokens_per_item: int = 100,
        on_progress: Callable[[int, int], None] = None
    ) -> List[tuple[Any, Any]]:
        """Process a batch of items with real-time progress.
        
        Uses as_completed (not gather) so we can report progress
        as each item finishes rather than waiting for all items.
        """
        
        tasks = [
            self.process_one(item, i, estimated_tokens_per_item)
            for i, item in enumerate(items)
        ]
        
        # Pre-allocate results array -- index-based insertion
        # ensures correct ordering despite out-of-order completion
        results = [None] * len(items)
        completed = 0
        
        for coro in asyncio.as_completed(tasks):
            index, item, result = await coro
            results[index] = (item, result)
            completed += 1
            
            if on_progress:
                on_progress(completed, len(items))
        
        return results

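# Usage
# The async client is required here: awaiting the synchronous OpenAI()
# client's create() call would raise a TypeError.
from openai import AsyncOpenAI

async_client = AsyncOpenAI()

async def process_prompt(prompt: str) -> str:
    response = await async_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content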

processor = ConcurrentProcessor(
    process_fn=process_prompt,
    config=RateLimitConfig(
        requests_per_minute=500,
        tokens_per_minute=150000,
        max_concurrent=20
    )
)

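prompts = ["Explain topic " + str(i) for i in range(100)]

# Top-level await works in notebooks and the asyncio REPL; in a script,
# wrap this call in an async main() and run it with asyncio.run(main())
results = await processor.process_batch(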
    prompts,
    estimated_tokens_per_item=200,
    on_progress=lambda done, total: print(f"{done}/{total}")
)
Tuning max_concurrent: Start at 10-20 and monitor your 429 error rate. If it is 0%, you have headroom to increase. If it exceeds 1%, reduce concurrency. The optimal value depends on your API tier, average prompt size, and how loaded the provider is at the time, so re-check it periodically rather than tuning once.
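
The tuning rule can be turned into a small feedback function that runs after each measurement window. This is a sketch only: next_max_concurrent is a hypothetical helper, and the step sizes (halve on trouble, add 2 when clean) and the floor and ceiling are assumptions, not provider recommendations.

```python
def next_max_concurrent(
    current: int,
    total_requests: int,
    rate_limited: int,
    floor: int = 1,
    ceiling: int = 100,
) -> int:
    """Suggest a new max_concurrent from the 429 rate seen in the last window."""
    if total_requests == 0:
        return current  # nothing measured, keep the current setting
    error_rate = rate_limited / total_requests
    if error_rate > 0.01:
        # More than 1% of calls were rate-limited: back off sharply
        return max(floor, current // 2)
    if error_rate == 0:
        # No 429s at all: probe upward in small steps
        return min(ceiling, current + 2)
    return current  # between 0% and 1%: already near the limit

print(next_max_concurrent(20, total_requests=1000, rate_limited=0))
print(next_max_concurrent(20, total_requests=1000, rate_limited=25))
print(next_max_concurrent(20, total_requests=1000, rate_limited=5))
```

Decreasing quickly and increasing slowly keeps the job out of the retry spiral described earlier while still finding unused headroom.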

Queue-Based Processing

For very large volumes (100K+ items), creating one coroutine per item up front (as asyncio.gather and as_completed both do) won’t cut it: you can’t create 100K coroutines and hope for the best. The queue-based pattern decouples “submitting work” from “processing work” using a producer-consumer model with a fixed pool of workers. This is the same pattern that powers every serious data pipeline: RabbitMQ consumers, Celery workers, SQS processors. The async queue version is lighter-weight (no external broker needed) but follows the same principles.
import asyncio
from asyncio import Queue
from dataclasses import dataclass
from typing import Any, Optional, Callable
import uuid

@dataclass
class Job:
    """Represents a single unit of work.
    
    The result and error fields are populated after processing.
    Status tracks the job lifecycle for monitoring and debugging.
    """
    id: str
    input: Any
    result: Optional[Any] = None
    error: Optional[Exception] = None
    status: str = "pending"  # pending -> processing -> completed/failed

class JobQueue:
    """Async job queue for batch processing"""
    
    def __init__(
        self,
        process_fn: Callable,
        num_workers: int = 5,
        rate_limiter: RateLimiter = None
    ):
        self.process_fn = process_fn
        self.num_workers = num_workers
        self.rate_limiter = rate_limiter
        self.queue: Queue[Job] = Queue()
        self.results: dict[str, Job] = {}
        self.workers: list[asyncio.Task] = []
        self.running = False
    
    async def worker(self, worker_id: int):
        """Worker coroutine -- runs in a loop pulling jobs from the queue.
        
        The 1-second timeout on queue.get() ensures the worker checks
        self.running periodically and can shut down cleanly when stop()
        is called, rather than blocking forever on an empty queue.
        """
        while self.running:
            try:
                job = await asyncio.wait_for(
                    self.queue.get(),
                    timeout=1.0
                )
            except asyncio.TimeoutError:
                continue  # Check self.running flag, then try again
            
            try:
                job.status = "processing"
                
                # Rate limit if configured -- the worker waits here
                # until the rate limiter grants permission
                if self.rate_limiter:
                    await self.rate_limiter.acquire()
                
                job.result = await self.process_fn(job.input)
                job.status = "completed"
                
            except Exception as e:
                job.error = e
                job.status = "failed"
            
            finally:
                # task_done() signals the queue that this item is processed,
                # which unblocks queue.join() when all items are done
                self.queue.task_done()
    
    async def start(self):
        """Start worker pool -- creates num_workers background tasks."""
        self.running = True
        self.workers = [
            asyncio.create_task(self.worker(i))
            for i in range(self.num_workers)
        ]
    
    async def stop(self):
        """Stop worker pool gracefully.
        
        Sets running=False so workers exit their loop after finishing
        the current job. return_exceptions=True prevents one worker's
        error from masking others during shutdown.
        """
        self.running = False
        await asyncio.gather(*self.workers, return_exceptions=True)
    
    async def submit(self, input: Any) -> str:
        """Submit a job"""
        job_id = str(uuid.uuid4())
        job = Job(id=job_id, input=input)
        self.results[job_id] = job
        await self.queue.put(job)
        return job_id
    
    async def submit_batch(self, inputs: list[Any]) -> list[str]:
        """Submit multiple jobs"""
        return [await self.submit(inp) for inp in inputs]
    
    def get_result(self, job_id: str) -> Optional[Job]:
        """Get job result"""
        return self.results.get(job_id)
    
    async def wait_for_completion(self):
        """Wait for all jobs to complete"""
        await self.queue.join()

# Usage with context manager
class BatchProcessor:
    def __init__(self, process_fn: Callable, **kwargs):
        self.queue = JobQueue(process_fn, **kwargs)
    
    async def __aenter__(self):
        await self.queue.start()
        return self
    
    async def __aexit__(self, *args):
        await self.queue.stop()
    
    async def process_all(self, inputs: list[Any]) -> list[Any]:
        job_ids = await self.queue.submit_batch(inputs)
        await self.queue.wait_for_completion()
        
        results = []
        for job_id in job_ids:
            job = self.queue.get_result(job_id)
            if job.error:
                results.append(job.error)
            else:
                results.append(job.result)
        
        return results

# Usage
async with BatchProcessor(
    process_prompt,
    num_workers=10,
    rate_limiter=RateLimiter(RateLimitConfig(requests_per_minute=100))
) as processor:
    results = await processor.process_all(prompts)
Memory trap with large batches: The results dict in JobQueue stores every job object in memory. For 100K+ items with large responses, this can consume several GB. For truly large workloads, write results to disk or a database as they complete rather than accumulating them in memory. A good pattern is to pass a result_callback to the worker that persists each result immediately.
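
The result_callback idea can be sketched as follows. This is a standalone illustration rather than a change to JobQueue: run_with_callback, fake_llm, and the JSONL record layout are all hypothetical, and the queue is bounded so that pending inputs do not pile up in memory either.

```python
import asyncio
import json
import tempfile
from pathlib import Path
from typing import Any, Awaitable, Callable

async def run_with_callback(
    inputs: list,
    process_fn: Callable[[Any], Awaitable[Any]],
    result_callback: Callable[[int, Any, Any], None],
    num_workers: int = 5,
) -> None:
    """Run process_fn over inputs and hand each outcome to result_callback.

    Nothing is accumulated here. The callback is assumed not to raise.
    """
    # Bounded queue: the producer waits instead of enqueueing every item at once
    queue: asyncio.Queue = asyncio.Queue(maxsize=num_workers * 2)

    async def worker() -> None:
        while True:
            index, item = await queue.get()
            try:
                try:
                    result, error = await process_fn(item), None
                except Exception as exc:
                    result, error = None, str(exc)
                result_callback(index, result, error)
            finally:
                queue.task_done()

    workers = [asyncio.create_task(worker()) for _ in range(num_workers)]
    for index, item in enumerate(inputs):
        await queue.put((index, item))
    await queue.join()      # every queued item has been handled
    for task in workers:    # workers are idle now, so cancel their loops
        task.cancel()
    await asyncio.gather(*workers, return_exceptions=True)

async def fake_llm(prompt: str) -> str:
    """Stand-in for a real API call."""
    await asyncio.sleep(0)
    if prompt == "bad":
        raise ValueError("simulated failure")
    return prompt.upper()

async def demo(path: Path) -> None:
    with path.open("a", encoding="utf-8") as f:
        def persist(index: int, result: Any, error: Any) -> None:
            # One JSON object per line, written as soon as the job finishes
            f.write(json.dumps({"index": index, "result": result, "error": error}) + "\n")
        await run_with_callback(["a", "bad", "c"], fake_llm, persist, num_workers=2)

out_path = Path(tempfile.mkdtemp()) / "results.jsonl"
asyncio.run(demo(out_path))
records = [json.loads(line) for line in out_path.read_text(encoding="utf-8").splitlines()]
print(f"Persisted {len(records)} records")
```

Lines are written in completion order, so each record carries its index for reassembly later.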

Chunked Processing for Large Datasets

When datasets get really large (10K+ items), processing everything as one giant batch creates problems: no progress visibility, no recovery from mid-batch failures, and potential memory issues from holding all results in memory. Chunking solves this by breaking the dataset into manageable pieces and processing each chunk independently. The inter-chunk delay is a deliberate breather. It gives the provider’s rate-limit window time to drain between bursts, which helps when your local rate limiter and the provider’s accounting do not line up exactly at chunk boundaries.
import asyncio
import math
from typing import Any, Callable, Iterator, List

def chunk_list(lst: List[Any], chunk_size: int) -> Iterator[List[Any]]:
    """Split list into chunks.
    
    Uses a generator so we don't create N sub-lists in memory upfront.
    """
    for i in range(0, len(lst), chunk_size):
        yield lst[i:i + chunk_size]

class ChunkedProcessor:
    """Process large datasets in chunks.
    
    Each chunk is an independent batch that can succeed or fail
    without affecting other chunks. This gives you natural
    checkpointing boundaries.
    """
    
    def __init__(
        self,
        processor: ConcurrentProcessor,
        chunk_size: int = 100,
        delay_between_chunks: float = 0
    ):
        self.processor = processor
        self.chunk_size = chunk_size
        self.delay = delay_between_chunks
    
    async def process(
        self,
        items: List[Any],
        on_chunk_complete: Callable[[int, List], None] = None
    ) -> List[Any]:
        """Process all items in chunks.
        
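        Each chunk is fully processed before the next one starts.
        This gives you natural checkpointing boundaries and caps the
        number of in-flight coroutines. Results still accumulate in
        all_results, so persist them in on_chunk_complete if memory
        is a concern.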
        """
        
        all_results = []
        total_chunks = math.ceil(len(items) / self.chunk_size)
        
        for i, chunk in enumerate(chunk_list(items, self.chunk_size)):
            print(f"Processing chunk {i+1}/{total_chunks}")
            
            chunk_results = await self.processor.process_batch(chunk)
            all_results.extend(chunk_results)
            
            # Callback for per-chunk operations: save to disk,
            # update a progress database, send a Slack notification, etc.
            if on_chunk_complete:
                on_chunk_complete(i, chunk_results)
            
            # Deliberate breather between chunks -- lets the API provider
            # recover and prevents rate limiter drift at chunk boundaries
            if self.delay and i < total_chunks - 1:
                await asyncio.sleep(self.delay)
        
        return all_results

# Usage for very large datasets
chunked = ChunkedProcessor(
    processor=ConcurrentProcessor(process_prompt),
    chunk_size=100,
    delay_between_chunks=5.0  # 5 second pause between chunks
)

# Process 10,000 items
large_dataset = [f"Process item {i}" for i in range(10000)]
results = await chunked.process(large_dataset)
Choosing chunk_size: Smaller chunks (50-100) give finer progress granularity and faster recovery from crashes but add overhead from inter-chunk delays. Larger chunks (500-1000) are more efficient but mean more lost progress on failure. A good heuristic: set chunk_size so each chunk takes 1-5 minutes to process. This balances efficiency with acceptable data loss on crash.
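
The 1-5 minute heuristic can be converted into a number once you know your rough throughput. The sketch below is an approximation under stated assumptions: suggest_chunk_size is a hypothetical helper, it ignores the tokens-per-minute limit, and it treats latency as constant.

```python
def suggest_chunk_size(
    requests_per_minute: int,
    max_concurrent: int,
    avg_latency_s: float,
    target_chunk_seconds: float = 180.0,
) -> int:
    """Pick a chunk_size so that one chunk takes about target_chunk_seconds."""
    # Throughput is capped by whichever is lower: the RPM limit, or the
    # number of calls the concurrency limit can finish per second
    rate_limit_rps = requests_per_minute / 60
    concurrency_rps = max_concurrent / avg_latency_s
    items_per_second = min(rate_limit_rps, concurrency_rps)
    return max(1, round(items_per_second * target_chunk_seconds))

# 500 RPM, 20 concurrent calls, 3 s per call: concurrency is the bottleneck
print(suggest_chunk_size(500, 20, 3.0))
# 60 RPM, 10 concurrent calls, 3 s per call: the RPM limit is the bottleneck
print(suggest_chunk_size(60, 10, 3.0))
```

Measure avg_latency_s on a small sample first; if the token limit is your binding constraint, the real throughput will be lower than this estimate.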

Progress Tracking and Checkpointing

Checkpointing is the difference between “my script crashed at item 7,500 and I have to start over” and “my script crashed at item 7,500 and resumed from exactly where it left off.” For any batch job that takes more than 10 minutes, checkpointing is not optional — it’s a basic production requirement. The pattern is simple: periodically serialize your progress to disk. On restart, check if a checkpoint exists and resume from the last saved position. The tricky part is making this atomic — a crash during checkpoint writing shouldn’t corrupt your state.
import json
from pathlib import Path
from dataclasses import dataclass, asdict
from typing import Optional
from datetime import datetime

@dataclass
class ProcessingState:
    """Complete state of a batch processing run.
    
    Everything needed to resume from a crash: what's done,
    what failed, and where to pick up.
    """
    total_items: int
    processed_items: int
    failed_items: int
    last_processed_index: int
    start_time: str
    results: list
    errors: list

class CheckpointedProcessor:
    """Process with checkpointing for crash recovery.
    
    Saves state every N items so you never lose more than N items
    of progress on a crash. The checkpoint file is the source of
    truth for resumption.
    """
    
    def __init__(
        self,
        processor: ConcurrentProcessor,
        checkpoint_path: str = "checkpoint.json",
        checkpoint_interval: int = 50
    ):
        self.processor = processor
        self.checkpoint_path = Path(checkpoint_path)
        self.checkpoint_interval = checkpoint_interval
        self.state: Optional[ProcessingState] = None
    
    def load_checkpoint(self) -> Optional[ProcessingState]:
        """Load existing checkpoint"""
        if self.checkpoint_path.exists():
            with open(self.checkpoint_path) as f:
                data = json.load(f)
                return ProcessingState(**data)
        return None
    
    def save_checkpoint(self):
        """Save current state to disk atomically.
        
        Writes to a temp file first, then swaps it into place, so a
        crash during the write cannot corrupt the existing checkpoint.
        Path.replace() overwrites the target on both POSIX and Windows.
        """
        if self.state:
            tmp = self.checkpoint_path.with_suffix('.tmp')
            tmp.write_text(json.dumps(asdict(self.state)))
            tmp.replace(self.checkpoint_path)
    
    def clear_checkpoint(self):
        """Remove checkpoint file"""
        if self.checkpoint_path.exists():
            self.checkpoint_path.unlink()
    
    async def process_with_checkpoints(
        self,
        items: list,
        resume: bool = True
    ) -> ProcessingState:
        """Process items with checkpointing"""
        
        # Load or create state
        if resume:
            self.state = self.load_checkpoint()
        
        if self.state is None:
            self.state = ProcessingState(
                total_items=len(items),
                processed_items=0,
                failed_items=0,
                last_processed_index=-1,
                start_time=datetime.now().isoformat(),
                results=[None] * len(items),
                errors=[]
            )
        
        # Resume from checkpoint
        start_index = self.state.last_processed_index + 1
        remaining = items[start_index:]
        
        if not remaining:
            print("All items already processed")
            return self.state
        
        print(f"Processing {len(remaining)} remaining items...")
        
        # Process in batches with checkpoints
        for i, chunk in enumerate(chunk_list(remaining, self.checkpoint_interval)):
            chunk_start = start_index + (i * self.checkpoint_interval)
            
            results = await self.processor.process_batch(chunk)
            
            # Update state
            for j, (item, result) in enumerate(results):
                idx = chunk_start + j
                
                if isinstance(result, Exception):
                    self.state.failed_items += 1
                    self.state.errors.append({
                        "index": idx,
                        "error": str(result)
                    })
                else:
                    self.state.results[idx] = result
                
                self.state.processed_items += 1
                self.state.last_processed_index = idx
            
            # Save checkpoint
            self.save_checkpoint()
            print(f"Checkpoint saved: {self.state.processed_items}/{self.state.total_items}")
        
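        # Clear checkpoint on success. If any item failed, the file is kept
        # so the errors list survives; resuming does not retry those items.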
        if self.state.failed_items == 0:
            self.clear_checkpoint()
        
        return self.state

# Usage
checkpointed = CheckpointedProcessor(
    processor=ConcurrentProcessor(process_prompt),
    checkpoint_path="my_batch_checkpoint.json",
    checkpoint_interval=100
)

# First run
state = await checkpointed.process_with_checkpoints(large_dataset)

# If interrupted, resume
state = await checkpointed.process_with_checkpoints(large_dataset, resume=True)
Checkpoint file size: The results list in ProcessingState stores all results in the checkpoint file. For 100K items with 500-character responses each, that is a ~50MB JSON file. For larger workloads, store results in a separate file or database and only keep the index and metadata in the checkpoint. This also makes checkpoint writes faster, reducing the crash-during-write risk window.
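
A minimal sketch of that split, assuming a local directory as the durable store: results go to an append-only JSONL file, and the checkpoint holds only a cursor. SlimCheckpoint and its file names are hypothetical and not part of CheckpointedProcessor.

```python
import json
import os
import tempfile
from pathlib import Path

class SlimCheckpoint:
    """Append-only results file plus a tiny checkpoint holding the cursor."""

    def __init__(self, directory: Path):
        self.results_path = directory / "results.jsonl"
        self.checkpoint_path = directory / "checkpoint.json"

    def append_results(self, start_index: int, results: list) -> None:
        """Append one JSON line per result, then force it to disk."""
        with self.results_path.open("a", encoding="utf-8") as f:
            for offset, result in enumerate(results):
                f.write(json.dumps({"index": start_index + offset, "result": result}) + "\n")
            f.flush()
            os.fsync(f.fileno())

    def save(self, last_processed_index: int, total_items: int) -> None:
        """Write the cursor to a temp file, then swap it into place."""
        tmp = self.checkpoint_path.with_suffix(".tmp")
        tmp.write_text(json.dumps({
            "last_processed_index": last_processed_index,
            "total_items": total_items,
        }))
        os.replace(tmp, self.checkpoint_path)

    def load_last_index(self) -> int:
        """Return the saved cursor, or -1 when no checkpoint exists."""
        if not self.checkpoint_path.exists():
            return -1
        return json.loads(self.checkpoint_path.read_text())["last_processed_index"]

workdir = Path(tempfile.mkdtemp())
ckpt = SlimCheckpoint(workdir)
ckpt.append_results(0, ["first answer", "second answer"])  # results first...
ckpt.save(last_processed_index=1, total_items=4)           # ...then the cursor

resume_from = SlimCheckpoint(workdir).load_last_index() + 1
print(f"Resume from index {resume_from}")
```

Results are written before the cursor, so a crash between the two steps can leave rows past the checkpoint. On resume those items are processed again and appended a second time; a reader should keep the last row per index.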

Cost Estimation

Never run a large batch without estimating cost first. This sounds obvious, but the number of teams that have accidentally burned $5,000 on a batch job they expected to cost $50 is staggering. The most common mistake: forgetting that output tokens cost several times more than input tokens (4-5x for the models priced below), and underestimating average output length. Always run a small sample (50-100 items) first to measure actual token usage, then extrapolate. The estimate below uses character-count heuristics, which are accurate to within ~20% for English text.
from dataclasses import dataclass

@dataclass
class TokenPricing:
    input_per_1k: float   # Cost per 1,000 input tokens
    output_per_1k: float  # Cost per 1,000 output tokens (usually 2-5x more)

# Pricing as of early 2025 -- always verify current pricing before large runs
PRICING = {
    "gpt-4o": TokenPricing(0.0025, 0.010),
    "gpt-4o-mini": TokenPricing(0.00015, 0.0006),
    "gpt-4o-batch": TokenPricing(0.00125, 0.005),       # 50% off real-time
    "claude-3-5-sonnet": TokenPricing(0.003, 0.015)
}

class CostEstimator:
    """Estimate batch processing costs before committing.
    
    Run this BEFORE every large batch. Compare models to find the
    sweet spot between cost and quality for your use case.
    """
    
    def __init__(self, model: str = "gpt-4o-mini"):
        self.pricing = PRICING.get(model, PRICING["gpt-4o-mini"])
    
    def estimate_tokens(
        self,
        text: str,
        chars_per_token: float = 4.0
    ) -> int:
        """Rough token estimation.
        
        The 4 chars/token heuristic is ~80% accurate for English.
        For precise counts, use: tiktoken.encoding_for_model(model).encode(text)
        The tiktoken approach is 10-20x slower but exact.
        """
        return int(len(text) / chars_per_token)
    
    def estimate_batch_cost(
        self,
        prompts: list[str],
        avg_output_tokens: int = 500
    ) -> dict:
        """Estimate cost for batch"""
        
        total_input_tokens = sum(
            self.estimate_tokens(p) for p in prompts
        )
        total_output_tokens = len(prompts) * avg_output_tokens
        
        input_cost = (total_input_tokens / 1000) * self.pricing.input_per_1k
        output_cost = (total_output_tokens / 1000) * self.pricing.output_per_1k
        
        return {
            "input_tokens": total_input_tokens,
            "output_tokens": total_output_tokens,
            "input_cost": round(input_cost, 4),
            "output_cost": round(output_cost, 4),
            "total_cost": round(input_cost + output_cost, 4)
        }
    
    def compare_models(
        self,
        prompts: list[str],
        avg_output_tokens: int = 500
    ) -> dict:
        """Compare costs across models.
        
        Run this before every large batch. The difference between
        gpt-4o and gpt-4o-mini can be 15-20x for the same workload.
        If quality is acceptable with the cheaper model, the savings
        are enormous at scale.
        """
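        results = {}
        original_pricing = self.pricing
        
        try:
            for model, pricing in PRICING.items():
                self.pricing = pricing
                results[model] = self.estimate_batch_cost(prompts, avg_output_tokens)
        finally:
            # Restore this estimator's own pricing so later calls to
            # estimate_batch_cost are not priced as the last model compared
            self.pricing = original_pricing
        
        return results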

# Usage
estimator = CostEstimator("gpt-4o-mini")

prompts = ["Analyze this data: " + str(i) for i in range(1000)]
estimate = estimator.estimate_batch_cost(prompts, avg_output_tokens=300)

print(f"Estimated cost: ${estimate['total_cost']}")
print(f"Input tokens: {estimate['input_tokens']:,}")
print(f"Output tokens: {estimate['output_tokens']:,}")

# Compare all models
comparison = estimator.compare_models(prompts)
for model, cost in comparison.items():
    print(f"{model}: ${cost['total_cost']:.4f}")
The real cost trap is output tokens. Input pricing gets all the attention, but output tokens cost several times more (4-5x in the pricing table above). A batch job where you expect 100-token responses but the model averages 800 tokens (because your prompt does not constrain output length) can cost close to 8x your estimate, since output usually dominates the bill. Always run a 50-100 item sample first and measure actual output lengths before committing to a full run.
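
The sample-then-extrapolate step might look like the sketch below. extrapolate_cost is a hypothetical helper; it assumes each sample entry is the usage object of a chat completion response (with prompt_tokens and completion_tokens), and the prices passed in are the gpt-4o-mini figures from the table above, which should be re-checked before a real run.

```python
def extrapolate_cost(
    sample_usages: list[dict],
    total_items: int,
    input_per_1k: float,
    output_per_1k: float,
) -> dict:
    """Scale measured token usage from a small sample up to the full batch."""
    n = len(sample_usages)
    avg_input = sum(u["prompt_tokens"] for u in sample_usages) / n
    avg_output = sum(u["completion_tokens"] for u in sample_usages) / n

    input_cost = avg_input * total_items / 1000 * input_per_1k
    output_cost = avg_output * total_items / 1000 * output_per_1k
    total = input_cost + output_cost
    return {
        "avg_input_tokens": avg_input,
        "avg_output_tokens": avg_output,
        "total_cost": round(total, 4),
        "output_share": round(output_cost / total, 3),
    }

# Usage numbers as they might come back from a two-item sample run
sample = [
    {"prompt_tokens": 100, "completion_tokens": 800},
    {"prompt_tokens": 120, "completion_tokens": 760},
]
projection = extrapolate_cost(sample, total_items=10_000,
                              input_per_1k=0.00015, output_per_1k=0.0006)
print(projection)
```

The output_share field makes the point of this section visible: when it is close to 1.0, capping max_tokens or tightening the prompt moves the bill far more than trimming input.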

Key Takeaways

Use Batch API

OpenAI Batch API saves 50% on costs for async workloads

Rate Limit Properly

Token bucket algorithms prevent hitting API limits

Checkpoint Progress

Save progress for recovery from failures

Estimate Costs

Always estimate before running large batches

Interview Deep-Dive

What interviewers are testing: Resilience engineering and operational maturity for long-running data pipelines.

Strong answer: This is a checkpointing and idempotency problem. The system needs three things: progress persistence, safe resumption, and deduplication.

First, checkpoint every N items (100-500 is a good range). Each checkpoint writes the last successfully processed index and all results so far to a durable store (local file, S3, or database). The checkpoint write itself should be atomic (write to a temp file, then rename) so a crash during checkpointing doesn’t corrupt state.

Second, on restart, load the checkpoint and resume from last_processed_index + 1. The items before that index are already done, so skip them. The items after that index need processing. Items in the “gap” (between last checkpoint and the crash point) might or might not have been processed, so you need idempotency.

Third, for idempotency: use deterministic IDs for each request (e.g., hash of the prompt + index). Before processing an item, check if a result already exists for that ID. This makes retries safe: processing the same item twice produces the same result entry, not a duplicate.

The provider outage itself should trigger a circuit breaker: after 5-10 consecutive failures, pause processing and wait with exponential backoff before probing again. Don’t burn through your retry budget on 33,000 remaining items when the provider is down.

Cost protection: track cumulative spend during the batch. If it exceeds a configurable threshold (say, 150% of estimated cost), halt and alert. Provider outages can sometimes cause partial responses that still incur charges.
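
The checkpoint, resume, and deterministic-ID steps in this answer can be sketched as follows. This is a minimal illustration under stated assumptions, not a production implementation: `request_id`, `save_checkpoint`, `load_checkpoint`, and `run_batch` are hypothetical names, the durable store is a local JSON file, and `process` stands in for whatever function makes the real LLM call.

```python
import hashlib
import json
import os
import tempfile


def request_id(index: int, prompt: str) -> str:
    """Deterministic ID: the same (index, prompt) always yields the same key."""
    return hashlib.sha256(f"{index}:{prompt}".encode("utf-8")).hexdigest()[:16]


def save_checkpoint(path: str, last_index: int, results: dict) -> None:
    """Write to a temp file in the same directory, then rename over the target."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump({"last_index": last_index, "results": results}, f)
            f.flush()
            os.fsync(f.fileno())
        # The rename is atomic on POSIX, so readers never see a half-written file
        os.replace(tmp_path, path)
    except BaseException:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise


def load_checkpoint(path: str) -> tuple[int, dict]:
    """Return (last processed index, results); (-1, {}) when starting fresh."""
    if not os.path.exists(path):
        return -1, {}
    with open(path, encoding="utf-8") as f:
        state = json.load(f)
    return state["last_index"], state["results"]


def run_batch(prompts, process, path, checkpoint_every=100):
    """Process prompts in order, resuming from the last checkpoint if present."""
    last_index, results = load_checkpoint(path)
    for i in range(last_index + 1, len(prompts)):
        rid = request_id(i, prompts[i])
        # Keyed writes make retries safe: reprocessing overwrites the same
        # entry instead of appending a duplicate. In this sketch results are
        # persisted only at checkpoints, so items in the gap are reprocessed;
        # a per-item durable store would let this check skip them instead.
        if rid not in results:
            results[rid] = process(prompts[i])
        if (i + 1) % checkpoint_every == 0:
            save_checkpoint(path, i, results)
    save_checkpoint(path, len(prompts) - 1, results)
    return results
```

Calling `run_batch` again with the same checkpoint path after a crash picks up at the item following the last saved index. The circuit breaker and spend threshold from the answer would wrap the `process` call and are left out here to keep the sketch short.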

What interviewers are testing: Understanding of cost-latency tradeoffs and operational complexity in real production systems.

Strong answer: The decision comes down to four factors: latency requirements, cost sensitivity, operational complexity tolerance, and control needs.

OpenAI Batch API: 50% cost savings, up to 24-hour completion window, zero infrastructure to manage, no rate limit concerns (OpenAI manages scheduling internally). Choose this for: nightly data pipelines, evaluation runs, content generation backlogs, embedding large corpora, or anything else where “done by tomorrow morning” is fast enough.

Self-managed concurrent processing: Results in minutes instead of hours, full control over retry logic and error handling, ability to use multiple providers with fallback, real-time progress visibility. Choose this for: time-sensitive batch jobs (process uploaded files within 30 minutes), workflows that need partial results quickly, multi-provider strategies, or when you need custom logic like filtering mid-batch based on intermediate results.

The hybrid approach often wins: use self-managed concurrency for the urgent 20% of items (new uploads, high-priority customers) and route the remaining 80% through the Batch API for cost savings. This requires a routing layer that classifies items by urgency and dispatches them to the appropriate processing path.

One often-overlooked factor: the Batch API doesn’t support streaming, function calling, or vision inputs in all configurations. Verify that your specific use case is supported before committing to it for a production pipeline.
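
The routing layer in the hybrid approach can be as simple as a classification function. The sketch below is one possible shape under assumed names: `WorkItem`, `route`, and the 60-minute urgency cutoff are hypothetical choices for illustration, not something the Batch API or this guide prescribes.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class WorkItem:
    item_id: str
    prompt: str
    deadline_minutes: Optional[float] = None  # None means no deadline
    high_priority: bool = False


def route(
    items: list[WorkItem],
    urgent_within_minutes: float = 60.0,  # assumed cutoff, tune per workload
) -> tuple[list[WorkItem], list[WorkItem]]:
    """Split items into (realtime, batch) queues by urgency."""
    realtime: list[WorkItem] = []
    batch: list[WorkItem] = []
    for item in items:
        urgent = item.high_priority or (
            item.deadline_minutes is not None
            and item.deadline_minutes <= urgent_within_minutes
        )
        (realtime if urgent else batch).append(item)
    return realtime, batch


items = [
    WorkItem("upload-1", "Summarize ...", deadline_minutes=30),
    WorkItem("backlog-1", "Classify ..."),
    WorkItem("vip-1", "Classify ...", high_priority=True),
    WorkItem("nightly-1", "Embed ...", deadline_minutes=600),
]
realtime, batch = route(items)
print([i.item_id for i in realtime])  # sent to self-managed concurrent workers
print([i.item_id for i in batch])     # written to a Batch API input file
```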

What interviewers are testing: End-to-end thinking about large-scale ML data pipelines, cost optimization, and risk management.

Strong answer: At this scale, you’re operating more like a data engineering project than a simple script. The approach has four phases.

Phase 1 (validate and sample): Process a random sample of 500 documents first. Measure actual token usage (not estimates), quality of results, failure rate, and per-item cost. Extrapolate to confirm the $15K estimate. If quality is borderline, this is where you tune the prompt before burning $15K on bad results.

Phase 2 (optimize cost): Can you use a cheaper model? If gpt-4o-mini gives 95% of gpt-4o’s classification accuracy, you’ve just cut costs by 80% or more. Can you shorten prompts? Every token in the system message gets multiplied by 1M. Can you use the Batch API? That’s $7,500 instead of $15,000. Can you pre-filter obviously classifiable documents with a rules-based system and only send ambiguous ones to the LLM? That might eliminate 40% of calls.

Phase 3 (execute with guardrails): Split into daily batches of ~100K (spreading cost over 10 days reduces blast radius if something goes wrong). Set daily cost limits. Checkpoint every 1,000 items. Run quality spot-checks on each day’s output before proceeding to the next batch. Use the Batch API for the bulk and concurrent processing only for rush items.

Phase 4 (long-term): If this is recurring, train a fine-tuned model or a traditional ML classifier on the LLM-labeled data. A fine-tuned gpt-4o-mini is much cheaper per call, and a distilled BERT classifier running on your own hardware costs essentially nothing at inference time. The $15K LLM run becomes your labeling step, not your production system.

The meta-point: at $15K, this needs a project plan, not just a script. Stakeholder buy-in on cost, rollback plans, quality gates, and a path to cost reduction over time.
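
The Phase 2 arithmetic is easy to check by stacking the reductions. This is a rough sketch: `stacked_cost` is a hypothetical helper, and it assumes the savings are independent and multiply, which may not hold exactly in practice (for example, if pre-filtering removes mostly short documents).

```python
def stacked_cost(baseline: float, reductions: dict[str, float]) -> dict[str, float]:
    """Apply fractional cost reductions one after another.

    Each value is the fraction of remaining cost that the step removes,
    so 0.50 means "half off what is left".
    """
    cost = baseline
    steps = {"baseline": round(cost, 2)}
    for name, fraction in reductions.items():
        if not 0 <= fraction < 1:
            raise ValueError(f"{name}: fraction must be in [0, 1)")
        cost *= 1 - fraction
        steps[name] = round(cost, 2)
    return steps


# Figures from the answer above: $15K baseline, Batch API at 50% off,
# and a rules-based pre-filter that removes 40% of calls.
batch_only = stacked_cost(15_000, {"batch_api": 0.50})
combined = stacked_cost(15_000, {"pre_filter": 0.40, "batch_api": 0.50})

print(batch_only)  # baseline 15000, then 7500 after the Batch API discount
print(combined)    # baseline 15000, 9000 after pre-filtering, 4500 after Batch API
```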

What interviewers are testing: Practical cost governance for AI systems, which is an increasingly critical production concern.

Strong answer: Build a cost tracker as a first-class component, not an afterthought.

The CostTracker maintains a running total of actual spend (from API response usage fields, not estimates) and compares against configurable thresholds. It exposes three states: green (under 80% of budget), yellow (80-100%, switch to cheaper model), and red (over budget, pause processing).

Implementation: After each API call, extract the actual token counts from the response and calculate cost using the model’s pricing. Update the running total atomically (this tracker is shared across concurrent workers). At each threshold crossing, execute the configured action.

Actions at yellow threshold: Automatically downgrade remaining items from gpt-4o to gpt-4o-mini. Log the switchover for quality tracking. Alert the team via Slack/PagerDuty so humans can decide whether to increase the budget or accept the quality tradeoff.

Actions at red threshold: Stop submitting new items. Let in-flight requests complete (don’t waste already-spent money). Save checkpoint. Send an alert with a summary: items completed, items remaining, actual cost vs. budget, and estimated cost to complete.

The nuance most people miss: token estimation before the call is unreliable for output tokens (you don’t know how much the model will generate). So budget tracking must be based on actual usage, not pre-estimates. This means you might slightly overshoot the budget by the cost of in-flight requests when the red threshold triggers. Account for this buffer in your budget planning.
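
A minimal sketch of the tracker described in this answer follows. The class layout, the `BudgetState` enum, and the `record` signature are assumptions made for illustration; pricing is passed per call so that the tracker stays correct after a mid-batch model downgrade. Alerts, model switching, and checkpointing would hang off the returned state and are not shown.

```python
import threading
from enum import Enum


class BudgetState(Enum):
    GREEN = "green"    # under the yellow threshold
    YELLOW = "yellow"  # 80-100% of budget: switch to a cheaper model
    RED = "red"        # over budget: stop submitting new items


class CostTracker:
    """Running total of actual spend, safe to share across worker threads."""

    def __init__(self, budget_usd: float, yellow_fraction: float = 0.8):
        if budget_usd <= 0:
            raise ValueError("budget_usd must be positive")
        self.budget_usd = budget_usd
        self.yellow_fraction = yellow_fraction
        self._spent = 0.0
        self._lock = threading.Lock()

    def record(
        self,
        input_tokens: int,
        output_tokens: int,
        input_per_1k: float,
        output_per_1k: float,
    ) -> BudgetState:
        """Add one call's actual usage and return the resulting state."""
        cost = (input_tokens / 1000) * input_per_1k
        cost += (output_tokens / 1000) * output_per_1k
        # The lock makes the read-modify-write on the total atomic
        with self._lock:
            self._spent += cost
            return self._state(self._spent)

    @property
    def spent(self) -> float:
        with self._lock:
            return self._spent

    def _state(self, spent: float) -> BudgetState:
        if spent > self.budget_usd:
            return BudgetState.RED
        if spent >= self.yellow_fraction * self.budget_usd:
            return BudgetState.YELLOW
        return BudgetState.GREEN


# Illustrative prices and a tiny budget so the thresholds are easy to see
tracker = CostTracker(budget_usd=1.0)
first = tracker.record(1000, 0, input_per_1k=0.5, output_per_1k=1.0)
second = tracker.record(1000, 0, input_per_1k=0.5, output_per_1k=1.0)
third = tracker.record(1000, 0, input_per_1k=0.5, output_per_1k=1.0)
print(first, second, third, tracker.spent)
```

Workers would call `record` with the token counts from each response's usage data and check the returned state before submitting the next item.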

What’s Next

LLM Fallbacks

Build resilient systems with multi-provider fallback chains