December 2025 Update: Production patterns for processing thousands of LLM requests efficiently while respecting rate limits and managing costs.
Why Batch Processing?
Single Request Pattern                 Batch Pattern
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Request → Wait → Response              [Request Pool]
Request → Wait → Response                     ↓
Request → Wait → Response              Concurrent Workers
...                                           ↓
                                       [Response Queue]
Total: N × (latency + overhead)        Total: ceil(N/batch) × latency
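To put rough numbers on the two formulas above, here is a back-of-the-envelope comparison. The latency, overhead, and worker-count figures are illustrative assumptions, not benchmarks.
import math

n_requests = 1000
latency_s = 2.0     # assumed average latency per request
overhead_s = 0.1    # assumed per-request client/network overhead
workers = 20        # assumed concurrent worker pool size

sequential_s = n_requests * (latency_s + overhead_s)        # N × (latency + overhead)
concurrent_s = math.ceil(n_requests / workers) * latency_s  # ceil(N/batch) × latency

print(f"Sequential: ~{sequential_s / 60:.0f} minutes")   # ~35 minutes
print(f"Concurrent: ~{concurrent_s / 60:.1f} minutes")   # ~1.7 minutes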
OpenAI Batch API
OpenAI’s Batch API offers a 50% cost saving for asynchronous jobs that can tolerate a 24-hour completion window:
import json
from openai import OpenAI
from pathlib import Path
import time

client = OpenAI()

def create_batch_request(
    requests: list[dict],
    output_path: str = "batch_requests.jsonl"
) -> str:
    """Create a JSONL file for batch processing"""
    with open(output_path, 'w') as f:
        for i, req in enumerate(requests):
            batch_item = {
                "custom_id": f"request-{i}",
                "method": "POST",
                "url": "/v1/chat/completions",
                "body": {
                    "model": req.get("model", "gpt-4o-mini"),
                    "messages": req["messages"],
                    "max_tokens": req.get("max_tokens", 1000)
                }
            }
            f.write(json.dumps(batch_item) + "\n")
    return output_path

def submit_batch(file_path: str) -> str:
    """Submit batch job to OpenAI"""
    # Upload the file
    batch_file = client.files.create(
        file=open(file_path, "rb"),
        purpose="batch"
    )
    # Create the batch
    batch = client.batches.create(
        input_file_id=batch_file.id,
        endpoint="/v1/chat/completions",
        completion_window="24h",
        metadata={
            "description": "Batch processing job"
        }
    )
    return batch.id

def check_batch_status(batch_id: str) -> dict:
    """Check status of a batch job"""
    batch = client.batches.retrieve(batch_id)
    return {
        "status": batch.status,
        "total": batch.request_counts.total,
        "completed": batch.request_counts.completed,
        "failed": batch.request_counts.failed
    }

def get_batch_results(batch_id: str) -> list[dict]:
    """Get results from completed batch"""
    batch = client.batches.retrieve(batch_id)
    if batch.status != "completed":
        raise ValueError(f"Batch not completed. Status: {batch.status}")
    # Download results
    result_file = client.files.content(batch.output_file_id)
    results = []
    for line in result_file.text.strip().split("\n"):
        result = json.loads(line)
        results.append({
            "custom_id": result["custom_id"],
            "response": result["response"]["body"]["choices"][0]["message"]["content"],
            "usage": result["response"]["body"]["usage"]
        })
    return results

# Complete batch workflow
def run_batch_workflow(prompts: list[str]) -> list[dict]:
    """Complete batch processing workflow"""
    # Prepare requests
    requests = [
        {"messages": [{"role": "user", "content": p}]}
        for p in prompts
    ]
    # Create and submit batch
    file_path = create_batch_request(requests)
    batch_id = submit_batch(file_path)
    print(f"Submitted batch: {batch_id}")
    # Poll for completion
    while True:
        status = check_batch_status(batch_id)
        print(f"Status: {status['status']} - {status['completed']}/{status['total']}")
        if status["status"] == "completed":
            break
        elif status["status"] in ("failed", "expired", "cancelled"):
            raise Exception(f"Batch ended with status: {status['status']}")
        time.sleep(60)  # Check every minute
    # Get results
    return get_batch_results(batch_id)
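A minimal invocation of the workflow above might look like the following; the prompts are placeholders, and a real run can take anywhere from minutes up to the 24-hour completion window.
# Example invocation (placeholder prompts)
prompts = [f"Summarize customer feedback entry {i}" for i in range(500)]
results = run_batch_workflow(prompts)

for item in results[:3]:
    print(item["custom_id"], item["response"][:80])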
Concurrent Processing with Rate Limiting
import asyncio
from asyncio import Semaphore
from dataclasses import dataclass
from typing import List, Callable, Any
from datetime import datetime, timedelta
import time

@dataclass
class RateLimitConfig:
    requests_per_minute: int = 60
    tokens_per_minute: int = 90000
    max_concurrent: int = 10

class RateLimiter:
    """Token bucket rate limiter"""
    def __init__(self, config: RateLimitConfig):
        self.config = config
        self.request_tokens = config.requests_per_minute
        self.token_tokens = config.tokens_per_minute
        self.last_update = time.time()
        self.lock = asyncio.Lock()

    async def acquire(self, estimated_tokens: int = 100):
        """Acquire rate limit tokens"""
        async with self.lock:
            now = time.time()
            elapsed = now - self.last_update
            # Refill tokens
            refill_rate_req = self.config.requests_per_minute / 60
            refill_rate_tok = self.config.tokens_per_minute / 60
            self.request_tokens = min(
                self.config.requests_per_minute,
                self.request_tokens + elapsed * refill_rate_req
            )
            self.token_tokens = min(
                self.config.tokens_per_minute,
                self.token_tokens + elapsed * refill_rate_tok
            )
            self.last_update = now
            # Wait if needed
            while self.request_tokens < 1 or self.token_tokens < estimated_tokens:
                wait_time = max(
                    (1 - self.request_tokens) / refill_rate_req,
                    (estimated_tokens - self.token_tokens) / refill_rate_tok
                )
                await asyncio.sleep(wait_time)
                elapsed = time.time() - self.last_update
                self.request_tokens += elapsed * refill_rate_req
                self.token_tokens += elapsed * refill_rate_tok
                self.last_update = time.time()
            # Consume tokens
            self.request_tokens -= 1
            self.token_tokens -= estimated_tokens

class ConcurrentProcessor:
    """Process requests concurrently with rate limiting"""
    def __init__(
        self,
        process_fn: Callable,
        config: RateLimitConfig = None
    ):
        self.process_fn = process_fn
        self.config = config or RateLimitConfig()
        self.rate_limiter = RateLimiter(self.config)
        self.semaphore = Semaphore(self.config.max_concurrent)

    async def process_one(
        self,
        item: Any,
        index: int,
        estimated_tokens: int = 100
    ) -> tuple[int, Any, Any]:
        """Process a single item with rate limiting"""
        await self.rate_limiter.acquire(estimated_tokens)
        async with self.semaphore:
            try:
                result = await self.process_fn(item)
                return index, item, result
            except Exception as e:
                return index, item, e

    async def process_batch(
        self,
        items: List[Any],
        estimated_tokens_per_item: int = 100,
        on_progress: Callable[[int, int], None] = None
    ) -> List[tuple[Any, Any]]:
        """Process a batch of items"""
        tasks = [
            self.process_one(item, i, estimated_tokens_per_item)
            for i, item in enumerate(items)
        ]
        results = [None] * len(items)
        completed = 0
        for coro in asyncio.as_completed(tasks):
            index, item, result = await coro
            results[index] = (item, result)
            completed += 1
            if on_progress:
                on_progress(completed, len(items))
        return results
# Usage (the async client is required here, since process_prompt awaits the call)
from openai import AsyncOpenAI

async_client = AsyncOpenAI()

async def process_prompt(prompt: str) -> str:
    response = await async_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

processor = ConcurrentProcessor(
    process_fn=process_prompt,
    config=RateLimitConfig(
        requests_per_minute=500,
        tokens_per_minute=150000,
        max_concurrent=20
    )
)

prompts = ["Explain topic " + str(i) for i in range(100)]

# Run inside an async function (or an async-aware notebook)
results = await processor.process_batch(
    prompts,
    estimated_tokens_per_item=200,
    on_progress=lambda done, total: print(f"{done}/{total}")
)
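Because process_one returns exceptions instead of raising them, results contains (item, result) pairs where result may be an exception. One possible follow-up pass, sketched here, splits the two and retries the failures once:
# Separate successes from failures (result may be an Exception)
successes = [(item, res) for item, res in results if not isinstance(res, Exception)]
failures = [item for item, res in results if isinstance(res, Exception)]
print(f"{len(successes)} succeeded, {len(failures)} failed")

# Optional single retry pass for the failed items
if failures:
    retry_results = await processor.process_batch(failures)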
Queue-Based Processing
For very large volumes, use a queue system with a worker pool:
import asyncio
from asyncio import Queue
from dataclasses import dataclass
from typing import Any, Optional, Callable
import uuid

@dataclass
class Job:
    id: str
    input: Any
    result: Optional[Any] = None
    error: Optional[Exception] = None
    status: str = "pending"

class JobQueue:
    """Async job queue for batch processing"""
    def __init__(
        self,
        process_fn: Callable,
        num_workers: int = 5,
        rate_limiter: RateLimiter = None
    ):
        self.process_fn = process_fn
        self.num_workers = num_workers
        self.rate_limiter = rate_limiter
        self.queue: Queue[Job] = Queue()
        self.results: dict[str, Job] = {}
        self.workers: list[asyncio.Task] = []
        self.running = False

    async def worker(self, worker_id: int):
        """Worker coroutine"""
        while self.running:
            try:
                job = await asyncio.wait_for(
                    self.queue.get(),
                    timeout=1.0
                )
            except asyncio.TimeoutError:
                continue
            try:
                job.status = "processing"
                if self.rate_limiter:
                    await self.rate_limiter.acquire()
                job.result = await self.process_fn(job.input)
                job.status = "completed"
            except Exception as e:
                job.error = e
                job.status = "failed"
            finally:
                self.queue.task_done()

    async def start(self):
        """Start worker pool"""
        self.running = True
        self.workers = [
            asyncio.create_task(self.worker(i))
            for i in range(self.num_workers)
        ]

    async def stop(self):
        """Stop worker pool"""
        self.running = False
        await asyncio.gather(*self.workers, return_exceptions=True)

    async def submit(self, input: Any) -> str:
        """Submit a job"""
        job_id = str(uuid.uuid4())
        job = Job(id=job_id, input=input)
        self.results[job_id] = job
        await self.queue.put(job)
        return job_id

    async def submit_batch(self, inputs: list[Any]) -> list[str]:
        """Submit multiple jobs"""
        return [await self.submit(inp) for inp in inputs]

    def get_result(self, job_id: str) -> Optional[Job]:
        """Get job result"""
        return self.results.get(job_id)

    async def wait_for_completion(self):
        """Wait for all jobs to complete"""
        await self.queue.join()

# Usage with context manager
class BatchProcessor:
    def __init__(self, process_fn: Callable, **kwargs):
        self.queue = JobQueue(process_fn, **kwargs)

    async def __aenter__(self):
        await self.queue.start()
        return self

    async def __aexit__(self, *args):
        await self.queue.stop()

    async def process_all(self, inputs: list[Any]) -> list[Any]:
        job_ids = await self.queue.submit_batch(inputs)
        await self.queue.wait_for_completion()
        results = []
        for job_id in job_ids:
            job = self.queue.get_result(job_id)
            if job.error:
                results.append(job.error)
            else:
                results.append(job.result)
        return results

# Usage (inside an async function)
async with BatchProcessor(
    process_prompt,
    num_workers=10,
    rate_limiter=RateLimiter(RateLimitConfig(requests_per_minute=100))
) as processor:
    results = await processor.process_all(prompts)
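The queue is not limited to one big batch: if the body of the async with block above is extended, jobs can also be submitted and inspected individually through submit and get_result. A small sketch:
# Run inside the async with block above, where `processor` is in scope
job_id = await processor.queue.submit("Explain token bucket rate limiting")
await processor.queue.wait_for_completion()
job = processor.queue.get_result(job_id)
print(job.status)  # "pending", "processing", "completed", or "failed"
print(job.result if job.error is None else job.error)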
Chunked Processing for Large Datasets
from typing import Iterator, List, Any, Callable
import math

def chunk_list(lst: List[Any], chunk_size: int) -> Iterator[List[Any]]:
    """Split list into chunks"""
    for i in range(0, len(lst), chunk_size):
        yield lst[i:i + chunk_size]

class ChunkedProcessor:
    """Process large datasets in chunks"""
    def __init__(
        self,
        processor: ConcurrentProcessor,
        chunk_size: int = 100,
        delay_between_chunks: float = 0
    ):
        self.processor = processor
        self.chunk_size = chunk_size
        self.delay = delay_between_chunks

    async def process(
        self,
        items: List[Any],
        on_chunk_complete: Callable[[int, List], None] = None
    ) -> List[Any]:
        """Process all items in chunks"""
        all_results = []
        total_chunks = math.ceil(len(items) / self.chunk_size)
        for i, chunk in enumerate(chunk_list(items, self.chunk_size)):
            print(f"Processing chunk {i+1}/{total_chunks}")
            chunk_results = await self.processor.process_batch(chunk)
            all_results.extend(chunk_results)
            if on_chunk_complete:
                on_chunk_complete(i, chunk_results)
            # Delay between chunks
            if self.delay and i < total_chunks - 1:
                await asyncio.sleep(self.delay)
        return all_results

# Usage for very large datasets
chunked = ChunkedProcessor(
    processor=ConcurrentProcessor(process_prompt),
    chunk_size=100,
    delay_between_chunks=5.0  # 5 second pause between chunks
)

# Process 10,000 items
large_dataset = [f"Process item {i}" for i in range(10000)]
results = await chunked.process(large_dataset)
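The on_chunk_complete hook is a convenient place to persist partial results as each chunk finishes, so an interrupted run loses at most one chunk. A sketch, with an arbitrary file naming scheme:
import json

def save_chunk(chunk_index: int, chunk_results: list) -> None:
    # chunk_results is a list of (item, result) pairs from process_batch
    with open(f"results_chunk_{chunk_index:04d}.json", "w") as f:
        json.dump([str(result) for _, result in chunk_results], f)

results = await chunked.process(large_dataset, on_chunk_complete=save_chunk)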
Progress Tracking and Checkpointing
import json
from pathlib import Path
from dataclasses import dataclass, asdict
from typing import Optional
from datetime import datetime

@dataclass
class ProcessingState:
    total_items: int
    processed_items: int
    failed_items: int
    last_processed_index: int
    start_time: str
    results: list
    errors: list

class CheckpointedProcessor:
    """Process with checkpointing for recovery"""
    def __init__(
        self,
        processor: ConcurrentProcessor,
        checkpoint_path: str = "checkpoint.json",
        checkpoint_interval: int = 50
    ):
        self.processor = processor
        self.checkpoint_path = Path(checkpoint_path)
        self.checkpoint_interval = checkpoint_interval
        self.state: Optional[ProcessingState] = None

    def load_checkpoint(self) -> Optional[ProcessingState]:
        """Load existing checkpoint"""
        if self.checkpoint_path.exists():
            with open(self.checkpoint_path) as f:
                data = json.load(f)
            return ProcessingState(**data)
        return None

    def save_checkpoint(self):
        """Save current state"""
        if self.state:
            with open(self.checkpoint_path, 'w') as f:
                json.dump(asdict(self.state), f)

    def clear_checkpoint(self):
        """Remove checkpoint file"""
        if self.checkpoint_path.exists():
            self.checkpoint_path.unlink()

    async def process_with_checkpoints(
        self,
        items: list,
        resume: bool = True
    ) -> ProcessingState:
        """Process items with checkpointing"""
        # Load or create state
        if resume:
            self.state = self.load_checkpoint()
        if self.state is None:
            self.state = ProcessingState(
                total_items=len(items),
                processed_items=0,
                failed_items=0,
                last_processed_index=-1,
                start_time=datetime.now().isoformat(),
                results=[None] * len(items),
                errors=[]
            )
        # Resume from checkpoint
        start_index = self.state.last_processed_index + 1
        remaining = items[start_index:]
        if not remaining:
            print("All items already processed")
            return self.state
        print(f"Processing {len(remaining)} remaining items...")
        # Process in batches with checkpoints
        for i, chunk in enumerate(chunk_list(remaining, self.checkpoint_interval)):
            chunk_start = start_index + (i * self.checkpoint_interval)
            results = await self.processor.process_batch(chunk)
            # Update state
            for j, (item, result) in enumerate(results):
                idx = chunk_start + j
                if isinstance(result, Exception):
                    self.state.failed_items += 1
                    self.state.errors.append({
                        "index": idx,
                        "error": str(result)
                    })
                else:
                    self.state.results[idx] = result
                    self.state.processed_items += 1
                self.state.last_processed_index = idx
            # Save checkpoint
            self.save_checkpoint()
            print(f"Checkpoint saved: {self.state.processed_items}/{self.state.total_items}")
        # Clear checkpoint on success
        if self.state.failed_items == 0:
            self.clear_checkpoint()
        return self.state

# Usage
checkpointed = CheckpointedProcessor(
    processor=ConcurrentProcessor(process_prompt),
    checkpoint_path="my_batch_checkpoint.json",
    checkpoint_interval=100
)

# First run
state = await checkpointed.process_with_checkpoints(large_dataset)

# If interrupted, resume
state = await checkpointed.process_with_checkpoints(large_dataset, resume=True)
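Failures are recorded in state.errors along with their original index, so they can be reviewed after the run and, if desired, re-submitted selectively. A sketch:
# Inspect failures and collect the original inputs for a retry
if state.failed_items:
    for err in state.errors[:5]:
        print(f"index {err['index']}: {err['error']}")
    retry_items = [large_dataset[err["index"]] for err in state.errors]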
Cost Estimation
from dataclasses import dataclass

@dataclass
class TokenPricing:
    input_per_1k: float
    output_per_1k: float

PRICING = {
    "gpt-4o": TokenPricing(0.0025, 0.010),
    "gpt-4o-mini": TokenPricing(0.00015, 0.0006),
    "gpt-4o-batch": TokenPricing(0.00125, 0.005),  # 50% discount
    "claude-3-5-sonnet": TokenPricing(0.003, 0.015)
}

class CostEstimator:
    """Estimate batch processing costs"""
    def __init__(self, model: str = "gpt-4o-mini"):
        self.pricing = PRICING.get(model, PRICING["gpt-4o-mini"])

    def estimate_tokens(
        self,
        text: str,
        chars_per_token: float = 4.0
    ) -> int:
        """Rough token estimation"""
        return int(len(text) / chars_per_token)

    def estimate_batch_cost(
        self,
        prompts: list[str],
        avg_output_tokens: int = 500
    ) -> dict:
        """Estimate cost for batch"""
        total_input_tokens = sum(
            self.estimate_tokens(p) for p in prompts
        )
        total_output_tokens = len(prompts) * avg_output_tokens
        input_cost = (total_input_tokens / 1000) * self.pricing.input_per_1k
        output_cost = (total_output_tokens / 1000) * self.pricing.output_per_1k
        return {
            "input_tokens": total_input_tokens,
            "output_tokens": total_output_tokens,
            "input_cost": round(input_cost, 4),
            "output_cost": round(output_cost, 4),
            "total_cost": round(input_cost + output_cost, 4)
        }

    def compare_models(
        self,
        prompts: list[str],
        avg_output_tokens: int = 500
    ) -> dict:
        """Compare costs across models"""
        original_pricing = self.pricing
        results = {}
        for model, pricing in PRICING.items():
            self.pricing = pricing
            results[model] = self.estimate_batch_cost(prompts, avg_output_tokens)
        # Restore the estimator's own pricing after the comparison
        self.pricing = original_pricing
        return results

# Usage
estimator = CostEstimator("gpt-4o-mini")
prompts = ["Analyze this data: " + str(i) for i in range(1000)]

estimate = estimator.estimate_batch_cost(prompts, avg_output_tokens=300)
print(f"Estimated cost: ${estimate['total_cost']}")
print(f"Input tokens: {estimate['input_tokens']:,}")
print(f"Output tokens: {estimate['output_tokens']:,}")

# Compare all models
comparison = estimator.compare_models(prompts)
for model, cost in comparison.items():
    print(f"{model}: ${cost['total_cost']:.4f}")
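Since the PRICING table includes a gpt-4o-batch entry at half the gpt-4o rate, the estimator can also quantify what the Batch API discount is worth for a given workload; a quick sanity check:
# Estimate the Batch API saving for the same workload
sync_cost = CostEstimator("gpt-4o").estimate_batch_cost(prompts, avg_output_tokens=300)
batch_cost = CostEstimator("gpt-4o-batch").estimate_batch_cost(prompts, avg_output_tokens=300)
savings = sync_cost["total_cost"] - batch_cost["total_cost"]
print(f"Batch API saves ${savings:.2f} ({savings / sync_cost['total_cost']:.0%})")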
Key Takeaways
Use Batch API
OpenAI Batch API saves 50% on costs for async workloads
Rate Limit Properly
Token bucket algorithms prevent hitting API limits
Checkpoint Progress
Save progress for recovery from failures
Estimate Costs
Always estimate before running large batches
What’s Next
LLM Fallbacks
Build resilient systems with multi-provider fallback chains