Skip to main content
December 2025 Update: Production patterns for processing thousands of LLM requests efficiently while respecting rate limits and managing costs.

Why Batch Processing?

Single Request Pattern              Batch Pattern
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Request → Wait → Response           [Request Pool]
Request → Wait → Response              ↓
Request → Wait → Response           Concurrent Workers
...                                    ↓
Total: N × (latency + overhead)     [Response Queue]
                                    Total: ceil(N/batch) × latency

OpenAI Batch API

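OpenAI’s Batch API offers a 50% cost saving for workloads that can wait for asynchronous completion (the example below requests a 24-hour completion window):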
import json
from openai import OpenAI
from pathlib import Path
import time

client = OpenAI()

def create_batch_request(
    requests: list[dict],
    output_path: str = "batch_requests.jsonl"
) -> str:
    """Write chat-completion requests to a JSONL batch input file.

    Each request dict must carry ``messages``; ``model`` and ``max_tokens``
    are optional and default to ``"gpt-4o-mini"`` and ``1000``. Every line is
    tagged with a positional ``custom_id`` of the form ``request-<index>``.

    Args:
        requests: Request dicts, one per line to emit.
        output_path: Destination file; overwritten if it exists.

    Returns:
        ``output_path``, unchanged.
    """

    def _as_line(position: int, spec: dict) -> str:
        # One JSON object per line, newline-terminated.
        envelope = {
            "custom_id": f"request-{position}",
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": spec.get("model", "gpt-4o-mini"),
                "messages": spec["messages"],
                "max_tokens": spec.get("max_tokens", 1000)
            }
        }
        return json.dumps(envelope) + "\n"

    with open(output_path, 'w') as handle:
        handle.writelines(
            _as_line(position, spec)
            for position, spec in enumerate(requests)
        )

    return output_path

def submit_batch(file_path: str) -> str:
    """Upload a JSONL request file and create a batch job for it.

    Args:
        file_path: Path to the JSONL file of batch requests.

    Returns:
        The ID of the newly created batch.
    """

    # Upload the file. The context manager closes the handle even when the
    # upload raises; previously the file object was never closed.
    with open(file_path, "rb") as request_file:
        batch_file = client.files.create(
            file=request_file,
            purpose="batch"
        )

    # Create the batch
    batch = client.batches.create(
        input_file_id=batch_file.id,
        endpoint="/v1/chat/completions",
        completion_window="24h",
        metadata={
            "description": "Batch processing job"
        }
    )

    return batch.id

def check_batch_status(batch_id: str) -> dict:
    """Fetch a batch and summarize its state and request counters.

    Args:
        batch_id: ID of the batch to look up.

    Returns:
        A dict with ``status`` plus the ``total``, ``completed`` and
        ``failed`` request counts reported for the batch.
    """
    job = client.batches.retrieve(batch_id)

    summary = {"status": job.status}
    summary["total"] = job.request_counts.total
    summary["completed"] = job.request_counts.completed
    summary["failed"] = job.request_counts.failed
    return summary

def get_batch_results(batch_id: str) -> list[dict]:
    """Download and parse the output file of a completed batch.

    Args:
        batch_id: ID of the batch to read.

    Returns:
        One dict per output line with keys ``custom_id``, ``response`` (the
        first choice's message content), ``usage`` and ``error``. For a line
        that carries no usable completion, ``response`` is None and ``error``
        holds whatever error payload the line contained.

    Raises:
        ValueError: If the batch is not ``completed``, or it completed
            without an output file.
    """
    batch = client.batches.retrieve(batch_id)

    if batch.status != "completed":
        raise ValueError(f"Batch not completed. Status: {batch.status}")

    if not batch.output_file_id:
        # NOTE(review): presumably this happens when every request failed and
        # the details live in the batch's error file — confirm against the API.
        raise ValueError(
            "Batch completed without an output file "
            f"(error_file_id={getattr(batch, 'error_file_id', None)})"
        )

    # Download results
    result_file = client.files.content(batch.output_file_id)
    results = []

    # Split on "\n" only: str.splitlines() would also break on characters
    # such as U+2028 that may appear unescaped inside a JSON string.
    for line in result_file.text.split("\n"):
        if not line.strip():
            # Blank lines (including an entirely empty file) are not records.
            continue

        record = json.loads(line)
        # A failed request may have a null response or a body without
        # choices; keep it in the output instead of aborting the whole read.
        body = (record.get("response") or {}).get("body") or {}
        choices = body.get("choices") or []

        results.append({
            "custom_id": record["custom_id"],
            "response": choices[0]["message"]["content"] if choices else None,
            "usage": body.get("usage"),
            "error": record.get("error") or body.get("error")
        })

    # NOTE(review): match results to inputs via custom_id rather than list
    # position — output order is presumably not guaranteed to follow input order.
    return results

# Complete batch workflow
def run_batch_workflow(prompts: list[str]) -> list[dict]:
    """Run prompts through the batch pipeline and block until results exist.

    Builds one single-message chat request per prompt, writes and submits the
    batch file, polls the batch once a minute, then downloads the results.

    Args:
        prompts: User prompts, one request each.

    Returns:
        The parsed batch results (see ``get_batch_results``); an empty list
        when ``prompts`` is empty.

    Raises:
        Exception: If the batch ends in the ``failed``, ``expired`` or
            ``cancelled`` state.
    """

    # Nothing to submit: avoid uploading an empty batch file.
    if not prompts:
        return []

    # Prepare requests
    requests = [
        {"messages": [{"role": "user", "content": p}]}
        for p in prompts
    ]

    # Create and submit batch
    file_path = create_batch_request(requests)
    batch_id = submit_batch(file_path)
    print(f"Submitted batch: {batch_id}")

    # Poll for completion
    while True:
        status = check_batch_status(batch_id)
        print(f"Status: {status['status']} - {status['completed']}/{status['total']}")

        state = status["status"]
        if state == "completed":
            break
        if state == "failed":
            raise Exception("Batch failed")
        if state in ("expired", "cancelled"):
            # These states are terminal as well; without this check the loop
            # would keep polling forever.
            raise Exception(f"Batch {state}")

        time.sleep(60)  # Check every minute

    # Get results
    return get_batch_results(batch_id)

Concurrent Processing with Rate Limiting

import asyncio
from asyncio import Semaphore
from dataclasses import dataclass
from typing import List, Callable, Any
from datetime import datetime, timedelta
import time

@dataclass
class RateLimitConfig:
    """Limits used by RateLimiter and ConcurrentProcessor."""
    # Request budget per minute; also the request bucket's capacity.
    requests_per_minute: int = 60
    # Token budget per minute; also the token bucket's capacity.
    tokens_per_minute: int = 90000
    # Size of the semaphore ConcurrentProcessor uses to bound in-flight calls.
    max_concurrent: int = 10

class RateLimiter:
    """Token bucket rate limiter.

    Two buckets are maintained, one counting requests and one counting model
    tokens. Both start full and refill continuously at their per-minute rate.
    ``acquire`` holds the lock while it waits, so callers are served one at a
    time.
    """

    def __init__(self, config: "RateLimitConfig"):
        """Create a limiter with full buckets.

        Args:
            config: Supplies ``requests_per_minute`` and ``tokens_per_minute``.

        Raises:
            ValueError: If either per-minute rate is not positive (a zero
                rate would divide by zero when a wait time is computed).
        """
        if config.requests_per_minute <= 0 or config.tokens_per_minute <= 0:
            raise ValueError(
                "requests_per_minute and tokens_per_minute must be positive"
            )

        self.config = config
        self.request_tokens = config.requests_per_minute
        self.token_tokens = config.tokens_per_minute
        # Monotonic clock: wall-clock adjustments must not produce negative
        # or inflated elapsed times.
        self.last_update = time.monotonic()
        self.lock = asyncio.Lock()

    def _refill(self, token_capacity: float) -> None:
        """Credit both buckets for the time elapsed since the last refill."""
        # Read the clock once so no elapsed time falls between two readings.
        now = time.monotonic()
        elapsed = now - self.last_update
        self.last_update = now

        self.request_tokens = min(
            # At least 1 so a single request can always be satisfied.
            max(self.config.requests_per_minute, 1),
            self.request_tokens + elapsed * self.config.requests_per_minute / 60
        )
        self.token_tokens = min(
            token_capacity,
            self.token_tokens + elapsed * self.config.tokens_per_minute / 60
        )

    async def acquire(self, estimated_tokens: int = 100):
        """Wait until one request and ``estimated_tokens`` tokens are available.

        Args:
            estimated_tokens: Tokens the upcoming request is expected to use.
        """
        async with self.lock:
            refill_rate_req = self.config.requests_per_minute / 60
            refill_rate_tok = self.config.tokens_per_minute / 60

            # A request larger than the per-minute budget still has to get
            # through eventually, so let the bucket grow to fit it.
            token_capacity = max(self.config.tokens_per_minute, estimated_tokens)

            # Refill tokens
            self._refill(token_capacity)

            # Wait if needed
            while self.request_tokens < 1 or self.token_tokens < estimated_tokens:
                wait_time = max(
                    (1 - self.request_tokens) / refill_rate_req,
                    (estimated_tokens - self.token_tokens) / refill_rate_tok
                )
                await asyncio.sleep(wait_time)
                self._refill(token_capacity)

            # Consume tokens
            self.request_tokens -= 1
            self.token_tokens -= estimated_tokens

class ConcurrentProcessor:
    """Run an async callable over many items under rate and concurrency limits."""

    def __init__(
        self,
        process_fn: Callable,
        config: RateLimitConfig = None
    ):
        """Store the callable and build the limiter and semaphore.

        Args:
            process_fn: Awaitable callable invoked once per item.
            config: Limits to apply; a default RateLimitConfig when falsy.
        """
        self.process_fn = process_fn
        self.config = config if config else RateLimitConfig()
        self.rate_limiter = RateLimiter(self.config)
        self.semaphore = Semaphore(self.config.max_concurrent)

    async def process_one(
        self,
        item: Any,
        index: int,
        estimated_tokens: int = 100
    ) -> tuple[int, Any, Any]:
        """Process one item once rate-limit budget and a slot are available.

        Returns:
            ``(index, item, outcome)`` where ``outcome`` is the callable's
            return value, or the exception it raised.
        """
        # Rate-limit budget is taken first, then a concurrency slot.
        await self.rate_limiter.acquire(estimated_tokens)

        async with self.semaphore:
            try:
                outcome = await self.process_fn(item)
            except Exception as failure:
                # Failures are handed back as values rather than raised.
                outcome = failure
            return index, item, outcome

    async def process_batch(
        self,
        items: List[Any],
        estimated_tokens_per_item: int = 100,
        on_progress: Callable[[int, int], None] = None
    ) -> List[tuple[Any, Any]]:
        """Process every item and return results in input order.

        Args:
            items: Inputs to process.
            estimated_tokens_per_item: Token estimate charged per item.
            on_progress: Optional callback receiving ``(done, total)`` after
                each item finishes.

        Returns:
            A list of ``(item, outcome)`` pairs aligned with ``items``.
        """
        pending = []
        for position, entry in enumerate(items):
            pending.append(
                self.process_one(entry, position, estimated_tokens_per_item)
            )

        ordered = [None] * len(items)
        finished = 0

        # Items finish in arbitrary order; the returned index restores it.
        for next_done in asyncio.as_completed(pending):
            position, entry, outcome = await next_done
            ordered[position] = (entry, outcome)
            finished += 1

            if not on_progress:
                continue
            on_progress(finished, len(items))

        return ordered

# Usage
async def process_prompt(prompt: str) -> str:
    """Send one user prompt to gpt-4o-mini and return the reply text.

    Args:
        prompt: Text sent as a single user message.

    Returns:
        The message content of the first choice.
    """
    # `client` is the synchronous OpenAI client: its create() call blocks and
    # returns a plain object that cannot be awaited. Running it in a worker
    # thread keeps the event loop free so other tasks can make progress.
    # NOTE(review): if `client` is ever switched to an async client, await its
    # create() call directly instead.
    response = await asyncio.to_thread(
        client.chat.completions.create,
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

# Build a processor around process_prompt with explicit per-minute budgets
# and at most 20 calls in flight.
processor = ConcurrentProcessor(
    process_fn=process_prompt,
    config=RateLimitConfig(
        requests_per_minute=500,
        tokens_per_minute=150000,
        max_concurrent=20
    )
)

prompts = ["Explain topic " + str(i) for i in range(100)]

# NOTE: a bare top-level `await` only runs where an event loop is already
# active (e.g. a notebook or `python -m asyncio`); in a plain script, place
# this call inside an async function and start it with asyncio.run().
results = await processor.process_batch(
    prompts,
    estimated_tokens_per_item=200,
    on_progress=lambda done, total: print(f"{done}/{total}")
)

Queue-Based Processing

For very large volumes, use a queue system:
import asyncio
from asyncio import Queue
from dataclasses import dataclass
from typing import Optional, Callable
import uuid

@dataclass
class Job:
    """A unit of work tracked by JobQueue."""
    id: str  # unique identifier; JobQueue.submit assigns a uuid4 string
    input: Any  # payload passed to the queue's process_fn
    result: Optional[Any] = None  # set by the worker when processing succeeds
    error: Optional[Exception] = None  # exception captured by the worker on failure
    status: str = "pending"  # "pending" -> "processing" -> "completed" or "failed"

class JobQueue:
    """In-process async job queue drained by a pool of worker tasks."""

    def __init__(
        self,
        process_fn: Callable,
        num_workers: int = 5,
        rate_limiter: RateLimiter = None
    ):
        """Configure the queue; no workers run until ``start`` is awaited.

        Args:
            process_fn: Awaitable callable applied to each job's input.
            num_workers: Number of worker tasks to spawn.
            rate_limiter: Optional limiter acquired before each job.
        """
        self.process_fn = process_fn
        self.num_workers = num_workers
        self.rate_limiter = rate_limiter
        self.queue: Queue[Job] = Queue()
        self.results: dict[str, Job] = {}
        self.workers: list[asyncio.Task] = []
        self.running = False

    async def worker(self, worker_id: int):
        """Pull jobs off the queue and run them until the pool is stopped."""
        while self.running:
            try:
                # The one-second timeout lets the loop re-check `running`
                # while the queue is idle.
                job = await asyncio.wait_for(self.queue.get(), timeout=1.0)
            except asyncio.TimeoutError:
                continue

            await self._execute(job)

    async def _execute(self, job):
        """Run one job, recording its result or error on the job itself."""
        try:
            job.status = "processing"

            if self.rate_limiter:
                await self.rate_limiter.acquire()

            job.result = await self.process_fn(job.input)
            job.status = "completed"
        except Exception as exc:
            job.error = exc
            job.status = "failed"
        finally:
            # Always balance the get() so queue.join() can return.
            self.queue.task_done()

    async def start(self):
        """Mark the pool as running and spawn the worker tasks."""
        self.running = True
        spawned = []
        for worker_id in range(self.num_workers):
            spawned.append(asyncio.create_task(self.worker(worker_id)))
        self.workers = spawned

    async def stop(self):
        """Signal workers to exit and wait for all of them to finish."""
        self.running = False
        await asyncio.gather(*self.workers, return_exceptions=True)

    async def submit(self, input: Any) -> str:
        """Register and enqueue a new job; return its generated ID."""
        new_id = str(uuid.uuid4())
        new_job = Job(id=new_id, input=input)
        self.results[new_id] = new_job
        await self.queue.put(new_job)
        return new_id

    async def submit_batch(self, inputs: list[Any]) -> list[str]:
        """Enqueue each input in order and return the job IDs in that order."""
        job_ids = []
        for payload in inputs:
            job_ids.append(await self.submit(payload))
        return job_ids

    def get_result(self, job_id: str) -> Optional[Job]:
        """Return the Job registered under ``job_id``, or None if unknown."""
        return self.results.get(job_id)

    async def wait_for_completion(self):
        """Block until every enqueued job has been marked done."""
        await self.queue.join()

# Usage with context manager
class BatchProcessor:
    """Async context manager that starts a JobQueue on entry and stops it on exit."""

    def __init__(self, process_fn: Callable, **kwargs):
        """Build the underlying JobQueue; extra keyword arguments pass through."""
        self.queue = JobQueue(process_fn, **kwargs)

    async def __aenter__(self):
        """Start the worker pool and return this processor."""
        await self.queue.start()
        return self

    async def __aexit__(self, *args):
        """Stop the worker pool."""
        await self.queue.stop()

    async def process_all(self, inputs: list[Any]) -> list[Any]:
        """Submit all inputs, wait for them, and collect outcomes in order.

        Returns:
            One entry per input: the job's error when one was recorded,
            otherwise its result.
        """
        submitted = await self.queue.submit_batch(inputs)
        await self.queue.wait_for_completion()

        finished_jobs = (self.queue.get_result(job_id) for job_id in submitted)
        return [
            job.error if job.error else job.result
            for job in finished_jobs
        ]

# Usage
# NOTE: `async with` / `await` at module level only run where an event loop
# is already active (e.g. a notebook); in a script, place this inside an async
# function. `prompts` is the list built in the earlier usage example.
async with BatchProcessor(
    process_prompt,
    num_workers=10,
    rate_limiter=RateLimiter(RateLimitConfig(requests_per_minute=100))
) as processor:
    # Entries are each job's result, or the exception it failed with.
    results = await processor.process_all(prompts)

Chunked Processing for Large Datasets

from typing import Iterator, List, Any
import math

def chunk_list(lst: List[Any], chunk_size: int) -> Iterator[List[Any]]:
    """Yield consecutive slices of ``lst`` with at most ``chunk_size`` items.

    The final chunk may be shorter; an empty list yields nothing.

    Args:
        lst: List to split.
        chunk_size: Maximum number of items per chunk; must be at least 1.

    Raises:
        ValueError: If ``chunk_size`` is less than 1. Because this is a
            generator, the error surfaces when iteration starts.
    """
    # A negative step would make range() empty and silently drop every item.
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be at least 1, got {chunk_size}")

    for start in range(0, len(lst), chunk_size):
        yield lst[start:start + chunk_size]

class ChunkedProcessor:
    """Feed a large item list to a ConcurrentProcessor one chunk at a time."""

    def __init__(
        self,
        processor: ConcurrentProcessor,
        chunk_size: int = 100,
        delay_between_chunks: float = 0
    ):
        """Store the processor and chunking parameters.

        Args:
            processor: Processor whose ``process_batch`` handles each chunk.
            chunk_size: Maximum items per chunk.
            delay_between_chunks: Seconds to sleep between chunks.
        """
        self.processor = processor
        self.chunk_size = chunk_size
        self.delay = delay_between_chunks

    async def process(
        self,
        items: List[Any],
        on_chunk_complete: Callable[[int, List], None] = None
    ) -> List[Any]:
        """Process every item chunk by chunk and concatenate the results.

        Args:
            items: Inputs to process.
            on_chunk_complete: Optional callback receiving the zero-based
                chunk index and that chunk's results.

        Returns:
            The results of all chunks, in order.
        """
        n_chunks = math.ceil(len(items) / self.chunk_size)
        collected = []

        chunks = chunk_list(items, self.chunk_size)
        for ordinal, chunk in enumerate(chunks, start=1):
            print(f"Processing chunk {ordinal}/{n_chunks}")

            chunk_results = await self.processor.process_batch(chunk)
            collected += chunk_results

            if on_chunk_complete:
                # The callback receives the zero-based chunk index.
                on_chunk_complete(ordinal - 1, chunk_results)

            # No pause after the final chunk.
            if ordinal < n_chunks and self.delay:
                await asyncio.sleep(self.delay)

        return collected

# Usage for very large datasets
# ConcurrentProcessor is built without a config here, so it runs with the
# RateLimitConfig defaults.
chunked = ChunkedProcessor(
    processor=ConcurrentProcessor(process_prompt),
    chunk_size=100,
    delay_between_chunks=5.0  # 5 second pause between chunks
)

# Process 10,000 items
large_dataset = [f"Process item {i}" for i in range(10000)]
# NOTE: top-level `await` requires an already-running event loop (e.g. a
# notebook); in a script, call this from inside an async function.
results = await chunked.process(large_dataset)

Progress Tracking and Checkpointing

import json
from pathlib import Path
from dataclasses import dataclass, asdict
from typing import Optional
from datetime import datetime

@dataclass
class ProcessingState:
    """Progress snapshot that CheckpointedProcessor saves to and loads from JSON."""
    total_items: int  # number of items in the run (len(items) when created)
    processed_items: int  # items handled so far, successes and failures alike
    failed_items: int  # items whose result was an exception
    last_processed_index: int  # index of the last handled item; -1 before any work
    start_time: str  # ISO-format timestamp taken when the state was created
    results: list  # per-index results; None where unprocessed or failed
    errors: list  # dicts of the form {"index": ..., "error": str(exception)}

class CheckpointedProcessor:
    """Process items in chunks, saving progress to a JSON checkpoint file.

    After every chunk the full state (including collected results) is
    written to ``checkpoint_path`` so an interrupted run can resume from the
    first unprocessed index. Results must therefore be JSON-serializable.
    """

    def __init__(
        self,
        processor: ConcurrentProcessor,
        checkpoint_path: str = "checkpoint.json",
        checkpoint_interval: int = 50
    ):
        """Store the processor and checkpoint settings.

        Args:
            processor: Processor whose ``process_batch`` handles each chunk.
            checkpoint_path: File the state is saved to and loaded from.
            checkpoint_interval: Number of items processed between saves.
        """
        self.processor = processor
        self.checkpoint_path = Path(checkpoint_path)
        self.checkpoint_interval = checkpoint_interval
        self.state: Optional[ProcessingState] = None

    def load_checkpoint(self) -> Optional[ProcessingState]:
        """Load the saved state, or return None when no checkpoint file exists."""
        if not self.checkpoint_path.exists():
            return None

        with open(self.checkpoint_path) as f:
            data = json.load(f)
        return ProcessingState(**data)

    def save_checkpoint(self):
        """Write the current state to the checkpoint file, if there is one."""
        if self.state is None:
            return

        # Write to a sibling temp file and rename it into place, so a crash
        # mid-write cannot leave a truncated, unreadable checkpoint behind.
        tmp_path = self.checkpoint_path.with_name(
            self.checkpoint_path.name + ".tmp"
        )
        with open(tmp_path, 'w') as f:
            json.dump(asdict(self.state), f)
        tmp_path.replace(self.checkpoint_path)

    def clear_checkpoint(self):
        """Remove checkpoint file"""
        if self.checkpoint_path.exists():
            self.checkpoint_path.unlink()

    async def process_with_checkpoints(
        self,
        items: list,
        resume: bool = True
    ) -> ProcessingState:
        """Process items chunk by chunk, checkpointing after each chunk.

        Args:
            items: Inputs to process.
            resume: When True, continue from an existing checkpoint file if
                one exists; when False, always start from scratch.

        Returns:
            The final state. Failed items are listed in ``errors`` and leave
            ``None`` in ``results``. The checkpoint file is deleted only when
            no item failed.

        Raises:
            ValueError: If the loaded checkpoint was written for a different
                number of items than ``items`` holds.
        """

        # Load or create state. Resetting to None when not resuming keeps
        # state left over from an earlier call on this instance from being
        # mistaken for progress on the current run.
        self.state = self.load_checkpoint() if resume else None

        if self.state is not None and self.state.total_items != len(items):
            # Indices in the checkpoint would not line up with this dataset.
            raise ValueError(
                f"Checkpoint covers {self.state.total_items} items but "
                f"{len(items)} were supplied; clear the checkpoint or pass "
                "resume=False"
            )

        if self.state is None:
            self.state = ProcessingState(
                total_items=len(items),
                processed_items=0,
                failed_items=0,
                last_processed_index=-1,
                start_time=datetime.now().isoformat(),
                results=[None] * len(items),
                errors=[]
            )

        # Resume from checkpoint
        start_index = self.state.last_processed_index + 1
        remaining = items[start_index:]

        if not remaining:
            print("All items already processed")
            return self.state

        print(f"Processing {len(remaining)} remaining items...")

        # Process in batches with checkpoints
        for i, chunk in enumerate(chunk_list(remaining, self.checkpoint_interval)):
            chunk_start = start_index + (i * self.checkpoint_interval)

            results = await self.processor.process_batch(chunk)

            # Update state
            for j, (item, result) in enumerate(results):
                idx = chunk_start + j

                if isinstance(result, Exception):
                    self.state.failed_items += 1
                    self.state.errors.append({
                        "index": idx,
                        "error": str(result)
                    })
                else:
                    self.state.results[idx] = result

                self.state.processed_items += 1
                self.state.last_processed_index = idx

            # Save checkpoint
            self.save_checkpoint()
            print(f"Checkpoint saved: {self.state.processed_items}/{self.state.total_items}")

        # Clear checkpoint on success
        if self.state.failed_items == 0:
            self.clear_checkpoint()

        return self.state

# Usage
# State is saved to my_batch_checkpoint.json after every 100 items.
checkpointed = CheckpointedProcessor(
    processor=ConcurrentProcessor(process_prompt),
    checkpoint_path="my_batch_checkpoint.json",
    checkpoint_interval=100
)

# First run
# NOTE: `resume` defaults to True, so this call also picks up an existing
# checkpoint file. Top-level `await` requires an already-running event loop.
state = await checkpointed.process_with_checkpoints(large_dataset)

# If interrupted, resume
state = await checkpointed.process_with_checkpoints(large_dataset, resume=True)

Cost Estimation

from dataclasses import dataclass

@dataclass
class TokenPricing:
    """Price of a model's tokens, expressed per 1,000 tokens."""
    # NOTE(review): currency is presumably USD (the usage example prints "$").
    input_per_1k: float  # cost per 1,000 input tokens
    output_per_1k: float  # cost per 1,000 output tokens

# Price table keyed by model name; CostEstimator looks models up here.
# NOTE(review): these figures are hard-coded — confirm them against the
# providers' current price lists before relying on an estimate.
PRICING = {
    "gpt-4o": TokenPricing(0.0025, 0.010),
    "gpt-4o-mini": TokenPricing(0.00015, 0.0006),
    "gpt-4o-batch": TokenPricing(0.00125, 0.005),  # 50% discount
    "claude-3-5-sonnet": TokenPricing(0.003, 0.015)
}

class CostEstimator:
    """Estimate batch processing costs from prompt lengths and a price table."""

    def __init__(self, model: str = "gpt-4o-mini"):
        """Select the pricing entry for ``model``.

        Args:
            model: Key into ``PRICING``. Unknown names fall back to the
                ``gpt-4o-mini`` entry.
        """
        self.pricing = PRICING.get(model, PRICING["gpt-4o-mini"])

    def estimate_tokens(
        self,
        text: str,
        chars_per_token: float = 4.0
    ) -> int:
        """Rough token estimation: character count divided by ``chars_per_token``."""
        return int(len(text) / chars_per_token)

    @staticmethod
    def _price(pricing, input_tokens: int, output_tokens: int) -> dict:
        """Build the cost breakdown for the given token counts and pricing."""
        input_cost = (input_tokens / 1000) * pricing.input_per_1k
        output_cost = (output_tokens / 1000) * pricing.output_per_1k

        return {
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "input_cost": round(input_cost, 4),
            "output_cost": round(output_cost, 4),
            "total_cost": round(input_cost + output_cost, 4)
        }

    def estimate_batch_cost(
        self,
        prompts: list[str],
        avg_output_tokens: int = 500
    ) -> dict:
        """Estimate the cost of running ``prompts`` on this estimator's model.

        Args:
            prompts: Prompt texts; input tokens are estimated from length.
            avg_output_tokens: Assumed output tokens per prompt.

        Returns:
            A dict with ``input_tokens``, ``output_tokens``, ``input_cost``,
            ``output_cost`` and ``total_cost`` (costs rounded to 4 places).
        """
        total_input_tokens = sum(
            self.estimate_tokens(p) for p in prompts
        )
        total_output_tokens = len(prompts) * avg_output_tokens

        return self._price(self.pricing, total_input_tokens, total_output_tokens)

    def compare_models(
        self,
        prompts: list[str],
        avg_output_tokens: int = 500
    ) -> dict:
        """Estimate the cost of ``prompts`` for every model in ``PRICING``.

        This estimator's own ``pricing`` is left untouched, so estimates made
        after a comparison still use the model chosen at construction.

        Returns:
            A dict mapping each model name to its cost breakdown.
        """
        # Token counts do not depend on the model, so compute them once.
        total_input_tokens = sum(
            self.estimate_tokens(p) for p in prompts
        )
        total_output_tokens = len(prompts) * avg_output_tokens

        return {
            model: self._price(pricing, total_input_tokens, total_output_tokens)
            for model, pricing in PRICING.items()
        }

# Usage
estimator = CostEstimator("gpt-4o-mini")

# NOTE: this rebinds the module-level `prompts` used by the earlier examples.
prompts = ["Analyze this data: " + str(i) for i in range(1000)]
estimate = estimator.estimate_batch_cost(prompts, avg_output_tokens=300)

print(f"Estimated cost: ${estimate['total_cost']}")
print(f"Input tokens: {estimate['input_tokens']:,}")
print(f"Output tokens: {estimate['output_tokens']:,}")

# Compare all models
# No avg_output_tokens is passed here, so compare_models uses its default of
# 500 rather than the 300 used for the estimate above.
comparison = estimator.compare_models(prompts)
for model, cost in comparison.items():
    print(f"{model}: ${cost['total_cost']:.4f}")

Key Takeaways

Use Batch API

OpenAI Batch API saves 50% on costs for async workloads

Rate Limit Properly

A token-bucket rate limiter keeps request and token throughput under the API's per-minute limits

Checkpoint Progress

Save progress for recovery from failures

Estimate Costs

Always estimate before running large batches

What’s Next

LLM Fallbacks

Build resilient systems with multi-provider fallback chains