Every LLM application eventually hits the moment where you need to process not 10 requests, but 10,000. Maybe you’re embedding an entire document corpus, classifying a backlog of support tickets, or generating product descriptions for a catalog. At this scale, sequential processing isn’t merely slow; it’s comically impractical.

The math makes this concrete. If each LLM call takes 3 seconds and you have 10,000 prompts, sequential processing takes 30,000 seconds, or about 8.3 hours. With 20 concurrent workers, that drops to about 25 minutes (30,000 / 20 = 1,500 seconds). With the OpenAI Batch API, you submit the whole thing and come back later (typically a few hours) at half the cost.
```
Single Request Pattern              Batch Pattern
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Request → Wait → Response           [Request Pool]
Request → Wait → Response                 ↓
Request → Wait → Response           Concurrent Workers
...                                       ↓
Total: N x (latency + overhead)     [Response Queue]
                                    Total: ceil(N/batch) x latency
```
OpenAI’s Batch API takes a fundamentally different approach. Instead of making real-time API calls, you submit a file of requests and OpenAI processes them asynchronously within a 24-hour window. You trade latency for cost: both input and output tokens are billed at 50% off.

When to use it: any time you don’t need results immediately. Data pipelines, nightly report generation, bulk content creation, embedding large corpora, and evaluation runs are all perfect candidates. When NOT to use it: anything user-facing or time-sensitive.

Think of it like the difference between express and standard shipping. It’s the same package going to the same destination, but standard costs half as much because the carrier can optimize its route.
```python
import json
import time

from openai import OpenAI

client = OpenAI()


def create_batch_request(
    requests: list[dict],
    output_path: str = "batch_requests.jsonl"
) -> str:
    """Create a JSONL file for batch processing.

    Each line is a self-contained request with a custom_id for
    correlating results. The custom_id is critical -- without it
    you can't match responses back to inputs because OpenAI
    doesn't guarantee response ordering.
    """
    with open(output_path, "w", encoding="utf-8") as f:
        for i, req in enumerate(requests):
            batch_item = {
                "custom_id": f"request-{i}",
                "method": "POST",
                "url": "/v1/chat/completions",
                "body": {
                    "model": req.get("model", "gpt-4o-mini"),
                    "messages": req["messages"],
                    "max_tokens": req.get("max_tokens", 1000),
                },
            }
            f.write(json.dumps(batch_item) + "\n")
    return output_path


def submit_batch(file_path: str) -> str:
    """Submit batch job to OpenAI.

    Two steps: upload the JSONL file, then create a batch that
    references it. OpenAI validates the input before any request
    runs (the batch starts in the "validating" status), so a
    malformed JSONL fails early rather than mid-processing.

    Production tip: Always save the batch_id somewhere persistent
    (database, file). If your script crashes after submission but
    before polling, you need the batch_id to retrieve results.
    """
    # Step 1: Upload the JSONL file to OpenAI's file storage.
    # The context manager closes the file handle after the upload.
    with open(file_path, "rb") as f:
        batch_file = client.files.create(
            file=f,
            purpose="batch",  # Must be "batch" for Batch API input files
        )

    # Step 2: Create the batch job referencing the uploaded file
    batch = client.batches.create(
        input_file_id=batch_file.id,
        endpoint="/v1/chat/completions",  # Must match the "url" in every JSONL line
        completion_window="24h",          # Currently the only option
        metadata={
            # Add your own metadata here for tracking: run_id, user, etc.
            "description": "Batch processing job",
        },
    )
    return batch.id


def check_batch_status(batch_id: str) -> dict:
    """Check status of a batch job"""
    batch = client.batches.retrieve(batch_id)
    counts = batch.request_counts  # May be None while the batch is still validating
    return {
        "status": batch.status,
        "total": counts.total if counts else 0,
        "completed": counts.completed if counts else 0,
        "failed": counts.failed if counts else 0,
    }


def get_batch_results(batch_id: str) -> list[dict]:
    """Get results from a completed batch.

    Successful requests are in output_file_id, failed requests in
    error_file_id; either can be None.
    """
    batch = client.batches.retrieve(batch_id)
    if batch.status != "completed":
        raise ValueError(f"Batch not completed. Status: {batch.status}")

    results = []
    for file_id in (batch.output_file_id, batch.error_file_id):
        if not file_id:
            continue
        content = client.files.content(file_id)
        for line in content.text.splitlines():
            if not line.strip():
                continue
            result = json.loads(line)
            response = result.get("response") or {}
            body = response.get("body") or {}
            if result.get("error") or response.get("status_code") != 200:
                # Per-request failure: record it instead of crashing on body["choices"]
                results.append({
                    "custom_id": result["custom_id"],
                    "response": None,
                    "error": result.get("error") or body.get("error"),
                    "usage": None,
                })
            else:
                results.append({
                    "custom_id": result["custom_id"],
                    "response": body["choices"][0]["message"]["content"],
                    "error": None,
                    "usage": body.get("usage"),
                })

    # Output order isn't guaranteed; restore input order from "request-<i>"
    results.sort(key=lambda r: int(r["custom_id"].split("-")[1]))
    return results


# Complete batch workflow
def run_batch_workflow(prompts: list[str]) -> list[dict]:
    """Complete batch processing workflow.

    This is a synchronous polling loop -- fine for scripts and notebooks.
    For production systems, submit the batch and use a scheduled job
    (cron, Step Functions, Celery beat) to check status periodically
    rather than keeping a process alive just to poll.
    """
    # Prepare requests
    requests = [
        {"messages": [{"role": "user", "content": p}]}
        for p in prompts
    ]

    # Create and submit batch
    file_path = create_batch_request(requests)
    batch_id = submit_batch(file_path)
    print(f"Submitted batch: {batch_id}")

    # Poll for completion -- in practice, batches can take 1-24 hours.
    # Polling every 60 seconds is a reasonable cadence.
    while True:
        status = check_batch_status(batch_id)
        print(f"Status: {status['status']} - {status['completed']}/{status['total']}")

        if status["status"] == "completed":
            break
        elif status["status"] in ("failed", "expired", "cancelled"):
            raise RuntimeError(f"Batch {batch_id} ended with status: {status['status']}")

        time.sleep(60)

    return get_batch_results(batch_id)
```
Batch API gotcha: The 24-hour completion window is a deadline, not a promise of speed. Most batches complete much faster (minutes to hours), but some use the full window, so don’t build workflows that assume completion in under 24 hours. Also, individual requests within a batch can fail even when the batch as a whole completes. Failed requests are reported in a separate error file (batch.error_file_id), so always check per-request results rather than trusting the batch status alone.
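Acting on that gotcha usually means splitting successes from failures and resubmitting only what failed. The sketch below assumes each result line has the shape the Batch API documents (`custom_id`, a `response` object with `status_code` and `body`, and an `error` field). The helper names `split_batch_results` and `build_retry_jsonl` are hypothetical; they are not part of the OpenAI SDK.

```python
import json
from typing import Any, Dict, Iterable, Set, Tuple


def split_batch_results(
    output_lines: Iterable[str],
    error_lines: Iterable[str] = (),
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Split Batch API result lines into successes and failures, keyed by custom_id."""
    succeeded: Dict[str, Any] = {}
    failed: Dict[str, Any] = {}
    for line in [*output_lines, *error_lines]:
        if not line.strip():
            continue
        record = json.loads(line)
        response = record.get("response") or {}
        if record.get("error") is None and response.get("status_code") == 200:
            succeeded[record["custom_id"]] = response["body"]
        else:
            # Keep whichever error detail is present for later inspection
            failed[record["custom_id"]] = record.get("error") or response.get("body")
    return succeeded, failed


def build_retry_jsonl(request_lines: Iterable[str], failed_ids: Set[str]) -> str:
    """Return a new JSONL payload containing only the requests that failed."""
    retry = [
        line.strip()
        for line in request_lines
        if line.strip() and json.loads(line)["custom_id"] in failed_ids
    ]
    return "\n".join(retry) + "\n" if retry else ""
```

Because the retry file reuses the original `custom_id` values, results from the second batch slot straight back into the same positions as the first.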
When you can’t wait for the Batch API (you need results in minutes, not hours), the real-time concurrent pattern is your tool. It fires multiple requests simultaneously while respecting rate limits; think of it as a controlled firehose.

You are tuning three knobs: requests per minute (RPM), tokens per minute (TPM), and max concurrent connections. Start conservative and increase. It’s far better to process a batch in 10 minutes at 80% rate utilization than to get rate-limited into a retry spiral that takes 30 minutes.
```python
import asyncio
import time
from asyncio import Semaphore
from dataclasses import dataclass
from typing import Any, Callable, List, Optional

from openai import AsyncOpenAI


@dataclass
class RateLimitConfig:
    requests_per_minute: int = 60    # Check your API tier for actual limit
    tokens_per_minute: int = 90000   # Token limit is often the binding constraint
    max_concurrent: int = 10         # Start here, increase if no errors


class RateLimiter:
    """Token bucket rate limiter covering both requests and tokens."""

    def __init__(self, config: RateLimitConfig):
        self.config = config
        self.request_tokens = float(config.requests_per_minute)
        self.token_tokens = float(config.tokens_per_minute)
        self.refill_rate_req = config.requests_per_minute / 60  # per second
        self.refill_rate_tok = config.tokens_per_minute / 60    # per second
        self.last_update = time.monotonic()
        self.lock = asyncio.Lock()

    def _refill(self):
        """Add tokens for the elapsed time, capped at bucket capacity."""
        now = time.monotonic()
        elapsed = now - self.last_update
        self.request_tokens = min(
            self.config.requests_per_minute,
            self.request_tokens + elapsed * self.refill_rate_req
        )
        self.token_tokens = min(
            self.config.tokens_per_minute,
            self.token_tokens + elapsed * self.refill_rate_tok
        )
        self.last_update = now

    async def acquire(self, estimated_tokens: int = 100):
        """Wait until one request slot and estimated_tokens are available."""
        # A request bigger than the whole bucket could never be satisfied
        estimated_tokens = min(estimated_tokens, self.config.tokens_per_minute)

        # Holding the lock while sleeping serves waiters in FIFO order
        async with self.lock:
            self._refill()
            while self.request_tokens < 1 or self.token_tokens < estimated_tokens:
                wait_time = max(
                    (1 - self.request_tokens) / self.refill_rate_req,
                    (estimated_tokens - self.token_tokens) / self.refill_rate_tok
                )
                await asyncio.sleep(wait_time)
                self._refill()

            # Consume tokens
            self.request_tokens -= 1
            self.token_tokens -= estimated_tokens


class ConcurrentProcessor:
    """Process requests concurrently with rate limiting.

    Combines three mechanisms:
    1. Rate limiter: controls request speed (RPM/TPM compliance)
    2. Semaphore: limits concurrent connections (resource protection)
    3. as_completed: yields results as they finish (progress tracking)
    """

    def __init__(
        self,
        process_fn: Callable,
        config: Optional[RateLimitConfig] = None
    ):
        self.process_fn = process_fn
        self.config = config or RateLimitConfig()
        self.rate_limiter = RateLimiter(self.config)
        self.semaphore = Semaphore(self.config.max_concurrent)

    async def process_one(
        self,
        item: Any,
        index: int,
        estimated_tokens: int = 100
    ) -> tuple[int, Any, Any]:
        """Process a single item with rate limiting.

        Returns (index, item, result_or_exception). The index lets us
        reassemble results in original order even though as_completed
        yields them out of order.
        """
        # Rate limit BEFORE acquiring semaphore so we don't
        # hold a concurrency slot while waiting for rate tokens
        await self.rate_limiter.acquire(estimated_tokens)

        async with self.semaphore:
            try:
                result = await self.process_fn(item)
                return index, item, result
            except Exception as e:
                # Return exception as data, not raised -- one failure
                # should not kill the entire batch
                return index, item, e

    async def process_batch(
        self,
        items: List[Any],
        estimated_tokens_per_item: int = 100,
        on_progress: Optional[Callable[[int, int], None]] = None
    ) -> List[tuple[Any, Any]]:
        """Process a batch of items with real-time progress.

        Uses as_completed (not gather) so we can report progress
        as each item finishes rather than waiting for all items.
        """
        tasks = [
            self.process_one(item, i, estimated_tokens_per_item)
            for i, item in enumerate(items)
        ]

        # Pre-allocate results array -- index-based insertion
        # ensures correct ordering despite out-of-order completion
        results = [None] * len(items)
        completed = 0

        for coro in asyncio.as_completed(tasks):
            index, item, result = await coro
            results[index] = (item, result)
            completed += 1

            if on_progress:
                on_progress(completed, len(items))

        return results


# Usage -- process_prompt awaits the API call, so it needs the async
# client; the synchronous OpenAI() client's create() can't be awaited
async_client = AsyncOpenAI()

async def process_prompt(prompt: str) -> str:
    response = await async_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

processor = ConcurrentProcessor(
    process_fn=process_prompt,
    config=RateLimitConfig(
        requests_per_minute=500,
        tokens_per_minute=150000,
        max_concurrent=20
    )
)

prompts = [f"Explain topic {i}" for i in range(100)]

# Top-level await works in Jupyter/IPython; in a script, wrap this call
# in an async def main() and run it with asyncio.run(main())
results = await processor.process_batch(
    prompts,
    estimated_tokens_per_item=200,
    on_progress=lambda done, total: print(f"{done}/{total}")
)
```
Tuning max_concurrent: Start at 10-20 and monitor your 429 error rate. If it stays at 0%, you have headroom to increase. If it exceeds 1%, reduce concurrency. The optimal value depends on your API tier, average prompt and response size, and response latency, which can vary with provider load and time of day.
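For a starting value rather than pure trial and error, Little’s law gives a back-of-the-envelope estimate: requests in flight ≈ request rate (per second) × average latency (seconds). The sketch below applies it to whichever limit binds first, RPM or TPM divided by average tokens per request. The function name and the 80% utilization default are illustrative assumptions, not provider guidance.

```python
import math


def suggested_max_concurrent(
    rpm_limit: float,
    tpm_limit: float,
    avg_tokens_per_request: float,
    avg_latency_s: float,
    utilization: float = 0.8,
) -> int:
    """Estimate how many requests must be in flight to reach a target rate."""
    # The binding limit: RPM, or how many average-sized requests fit in TPM
    effective_rpm = min(rpm_limit, tpm_limit / avg_tokens_per_request)
    target_rpm = effective_rpm * utilization  # leave headroom below the limit
    # Little's law: in-flight = arrival rate (req/s) x time each request takes (s)
    return max(1, math.ceil(target_rpm * avg_latency_s / 60))
```

For example, 500 RPM with 3-second average latency needs about 25 requests in flight at full utilization. Anything much above that just queues on the rate limiter, and anything much below leaves throughput unused. Treat the result as a first guess to refine against the 429 rate.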
For very large volumes (100K+ items), creating one task per item up front gets wasteful. This is what process_batch does when it hands every coroutine to as_completed: all 100K tasks exist at once, and most of them sit parked on the rate limiter. The queue-based pattern decouples “submitting work” from “processing work” using a producer-consumer model with a fixed pool of workers.

This is the same pattern behind most serious data pipelines: RabbitMQ consumers, Celery workers, SQS processors. The async queue version is lighter-weight (no external broker needed) but follows the same principles.
```python
import asyncio
import uuid
from asyncio import Queue
from dataclasses import dataclass
from typing import Any, Callable, Optional

# RateLimiter and RateLimitConfig come from the previous section.


@dataclass
class Job:
    """Represents a single unit of work.

    The result and error fields are populated after processing.
    Status tracks the job lifecycle for monitoring and debugging.
    """
    id: str
    input: Any
    result: Optional[Any] = None
    error: Optional[Exception] = None
    status: str = "pending"  # pending -> processing -> completed/failed


class JobQueue:
    """Async job queue for batch processing"""

    def __init__(
        self,
        process_fn: Callable,
        num_workers: int = 5,
        rate_limiter: Optional["RateLimiter"] = None
    ):
        self.process_fn = process_fn
        self.num_workers = num_workers
        self.rate_limiter = rate_limiter
        self.queue: Queue[Job] = Queue()
        self.results: dict[str, Job] = {}
        self.workers: list[asyncio.Task] = []
        self.running = False

    async def worker(self, worker_id: int):
        """Worker coroutine -- runs in a loop pulling jobs from the queue.

        The 1-second timeout on queue.get() ensures the worker checks
        self.running periodically and can shut down cleanly when stop()
        is called, rather than blocking forever on an empty queue.
        """
        while self.running:
            try:
                job = await asyncio.wait_for(
                    self.queue.get(),
                    timeout=1.0
                )
            except asyncio.TimeoutError:
                continue  # Check self.running flag, then try again

            try:
                job.status = "processing"

                # Rate limit if configured -- the worker waits here
                # until the rate limiter grants permission
                if self.rate_limiter:
                    await self.rate_limiter.acquire()

                job.result = await self.process_fn(job.input)
                job.status = "completed"
            except Exception as e:
                job.error = e
                job.status = "failed"
            finally:
                # task_done() signals the queue that this item is processed,
                # which unblocks queue.join() when all items are done
                self.queue.task_done()

    async def start(self):
        """Start worker pool -- creates num_workers background tasks."""
        self.running = True
        self.workers = [
            asyncio.create_task(self.worker(i))
            for i in range(self.num_workers)
        ]

    async def stop(self):
        """Stop worker pool gracefully.

        Sets running=False so workers exit their loop after finishing
        the current job. return_exceptions=True prevents one worker's
        error from masking others during shutdown.
        """
        self.running = False
        await asyncio.gather(*self.workers, return_exceptions=True)

    async def submit(self, input: Any) -> str:
        """Submit a job"""
        job_id = str(uuid.uuid4())
        job = Job(id=job_id, input=input)
        self.results[job_id] = job
        await self.queue.put(job)
        return job_id

    async def submit_batch(self, inputs: list[Any]) -> list[str]:
        """Submit multiple jobs"""
        return [await self.submit(inp) for inp in inputs]

    def get_result(self, job_id: str) -> Optional[Job]:
        """Get job result"""
        return self.results.get(job_id)

    async def wait_for_completion(self):
        """Wait for all jobs to complete"""
        await self.queue.join()


# Usage with context manager
class BatchProcessor:
    def __init__(self, process_fn: Callable, **kwargs):
        self.queue = JobQueue(process_fn, **kwargs)

    async def __aenter__(self):
        await self.queue.start()
        return self

    async def __aexit__(self, *args):
        await self.queue.stop()

    async def process_all(self, inputs: list[Any]) -> list[Any]:
        job_ids = await self.queue.submit_batch(inputs)
        await self.queue.wait_for_completion()

        results = []
        for job_id in job_ids:
            job = self.queue.get_result(job_id)
            if job.error:
                results.append(job.error)
            else:
                results.append(job.result)
        return results


# Usage (top-level `async with` works in Jupyter/IPython; in a script,
# put this inside an async def and run it with asyncio.run())
async with BatchProcessor(
    process_prompt,
    num_workers=10,
    rate_limiter=RateLimiter(RateLimitConfig(requests_per_minute=100))
) as processor:
    results = await processor.process_all(prompts)
```
Memory trap with large batches: The results dict in JobQueue keeps every Job object in memory, and submit_batch puts every job on an unbounded Queue before the workers catch up. For 100K+ items with large responses, this can consume several GB. For truly large workloads, bound the queue (asyncio.Queue(maxsize=...)) and write results to disk or a database as they complete rather than accumulating them in memory. A good pattern is to pass a result_callback to the worker that persists each result immediately (JobQueue above doesn’t have one; you would add it).
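Here is a minimal sketch of both fixes together, assuming inputs and results are JSON-serializable. It uses a bounded queue for backpressure, and a synchronous persist step that appends each result to a JSONL file the moment it finishes. `stream_process_to_jsonl` is a hypothetical standalone function, not a method of the JobQueue above. It also swaps the polling timeout for task cancellation at shutdown.

```python
import asyncio
import json
from typing import Any, Awaitable, Callable, Iterable, Optional


async def stream_process_to_jsonl(
    inputs: Iterable[Any],
    process_fn: Callable[[Any], Awaitable[Any]],
    out_path: str,
    num_workers: int = 10,
    max_queued: int = 1000,
) -> int:
    """Process inputs with a fixed worker pool, appending each result to a
    JSONL file as soon as it finishes. Returns the number of records written."""
    # Bounded queue = backpressure: the producer waits instead of
    # loading every input into memory at once
    queue: asyncio.Queue = asyncio.Queue(maxsize=max_queued)
    written = 0

    with open(out_path, "a", encoding="utf-8") as out:

        def persist(index: int, item: Any, result: Any, error: Optional[Exception]) -> None:
            # Synchronous (no await), so two writes can never interleave
            nonlocal written
            record = {"index": index, "input": item}
            if error is None:
                record["result"] = result
            else:
                record["error"] = repr(error)
            out.write(json.dumps(record) + "\n")
            written += 1

        async def worker() -> None:
            while True:
                index, item = await queue.get()
                try:
                    persist(index, item, await process_fn(item), None)
                except Exception as e:
                    persist(index, item, None, e)
                finally:
                    queue.task_done()

        workers = [asyncio.create_task(worker()) for _ in range(num_workers)]
        for index, item in enumerate(inputs):
            await queue.put((index, item))  # blocks while the queue is full
        await queue.join()                  # every item has been persisted
        for w in workers:
            w.cancel()                      # workers loop forever; stop them
        await asyncio.gather(*workers, return_exceptions=True)
    return written
```

Records land in completion order, so the `index` field is what lets you restore input order later. Memory stays bounded by `max_queued` plus the items in flight, no matter how large the input iterable is.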
When datasets get really large (10K+ items), processing everything as one giant batch creates problems: no progress visibility, no recovery from mid-batch failures, and memory pressure from holding every pending task and result at once. Chunking solves this by breaking the dataset into manageable pieces and processing each chunk independently.

The inter-chunk delay is a deliberate breather. It gives the provider’s rate-limit window time to refill between bursts. That matters when your client-side estimates (such as estimated_tokens) undercount real usage and the provider starts returning 429s even though your own limiter thinks you are within budget.
```python
import asyncio
import math
from typing import Any, Callable, Iterator, List, Optional

# ConcurrentProcessor and process_prompt come from the earlier sections.


def chunk_list(lst: List[Any], chunk_size: int) -> Iterator[List[Any]]:
    """Split list into chunks.

    Uses a generator so each sub-list is created only when it's
    needed, not all upfront.
    """
    for i in range(0, len(lst), chunk_size):
        yield lst[i:i + chunk_size]


class ChunkedProcessor:
    """Process large datasets in chunks.

    Each chunk is an independent batch that can succeed or fail
    without affecting other chunks. This gives you natural
    checkpointing boundaries.
    """

    def __init__(
        self,
        processor: "ConcurrentProcessor",
        chunk_size: int = 100,
        delay_between_chunks: float = 0
    ):
        self.processor = processor
        self.chunk_size = chunk_size
        self.delay = delay_between_chunks

    async def process(
        self,
        items: List[Any],
        on_chunk_complete: Optional[Callable[[int, List], None]] = None
    ) -> List[Any]:
        """Process all items in chunks.

        Each chunk is fully processed before the next one starts, which
        gives natural checkpointing boundaries and caps in-flight tasks
        at one chunk. all_results still grows with the dataset, so for
        very large runs, persist each chunk in on_chunk_complete instead
        of relying on the returned list.
        """
        all_results = []
        total_chunks = math.ceil(len(items) / self.chunk_size)

        for i, chunk in enumerate(chunk_list(items, self.chunk_size)):
            print(f"Processing chunk {i+1}/{total_chunks}")

            chunk_results = await self.processor.process_batch(chunk)
            all_results.extend(chunk_results)

            # Callback for per-chunk operations: save to disk,
            # update a progress database, send a Slack notification, etc.
            if on_chunk_complete:
                on_chunk_complete(i, chunk_results)

            # Deliberate breather between chunks -- gives the provider's
            # rate-limit window time to refill between bursts
            if self.delay and i < total_chunks - 1:
                await asyncio.sleep(self.delay)

        return all_results


# Usage for very large datasets
chunked = ChunkedProcessor(
    processor=ConcurrentProcessor(process_prompt),
    chunk_size=100,
    delay_between_chunks=5.0  # 5 second pause between chunks
)

# Process 10,000 items (top-level await: Jupyter/IPython)
large_dataset = [f"Process item {i}" for i in range(10000)]
results = await chunked.process(large_dataset)
```
Choosing chunk_size: Smaller chunks (50-100) give finer progress granularity and faster recovery from crashes but add overhead from inter-chunk delays. Larger chunks (500-1000) are more efficient but mean more lost progress on failure. A good heuristic: set chunk_size so each chunk takes 1-5 minutes to process. This balances efficiency with acceptable data loss on crash.
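To apply the 1-5 minute heuristic, time a pilot run and convert its throughput into a chunk size. The sketch below is hypothetical. It uses the 3-minute midpoint and clamps to the 50-1000 range mentioned above; adjust both to taste.

```python
import math


def chunk_size_for_target(
    sample_items: int,
    sample_seconds: float,
    target_minutes: float = 3.0,
    min_size: int = 50,
    max_size: int = 1000,
) -> int:
    """Pick a chunk size so each chunk takes roughly target_minutes."""
    # Measured throughput from a pilot run, scaled to the target duration
    size = math.floor(sample_items * target_minutes * 60 / sample_seconds)
    # Clamp to the practical range discussed above
    return max(min_size, min(max_size, size))
```

A pilot that finishes 100 items in 60 seconds suggests chunks of about 300 for a 3-minute target. Rerun the calculation if you change max_concurrent, because throughput changes with it.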
Checkpointing is the difference between “my script crashed at item 7,500 and I have to start over” and “my script crashed at item 7,500 and resumed from exactly where it left off.” For any batch job that takes more than 10 minutes, checkpointing is not optional; it’s a basic production requirement.

The pattern is simple: periodically serialize your progress to disk. On restart, check whether a checkpoint exists and resume from the last saved position. The tricky part is making the write atomic, so that a crash during checkpoint writing can’t corrupt your state.
```python
import json
from dataclasses import asdict, dataclass
from datetime import datetime
from pathlib import Path
from typing import Optional

# ConcurrentProcessor, chunk_list, process_prompt and large_dataset
# come from the earlier sections.


@dataclass
class ProcessingState:
    """Complete state of a batch processing run.

    Everything needed to resume from a crash: what's done,
    what failed, and where to pick up.
    """
    total_items: int
    processed_items: int
    failed_items: int
    last_processed_index: int
    start_time: str
    results: list
    errors: list


class CheckpointedProcessor:
    """Process with checkpointing for crash recovery.

    Saves state every N items so you never lose more than N items
    of progress on a crash. The checkpoint file is the source of
    truth for resumption.
    """

    def __init__(
        self,
        processor: "ConcurrentProcessor",
        checkpoint_path: str = "checkpoint.json",
        checkpoint_interval: int = 50
    ):
        self.processor = processor
        self.checkpoint_path = Path(checkpoint_path)
        self.checkpoint_interval = checkpoint_interval
        self.state: Optional[ProcessingState] = None

    def load_checkpoint(self) -> Optional[ProcessingState]:
        """Load existing checkpoint"""
        if self.checkpoint_path.exists():
            with open(self.checkpoint_path) as f:
                data = json.load(f)
            return ProcessingState(**data)
        return None

    def save_checkpoint(self):
        """Save current state to disk atomically.

        Writes to a temp file first, then swaps it into place with
        Path.replace(), which overwrites the target in one step
        (Path.rename() fails on Windows when the target exists).
        A crash mid-write leaves the previous checkpoint intact.
        """
        if self.state:
            tmp = self.checkpoint_path.with_suffix(".tmp")
            tmp.write_text(json.dumps(asdict(self.state)))
            tmp.replace(self.checkpoint_path)

    def clear_checkpoint(self):
        """Remove checkpoint file"""
        if self.checkpoint_path.exists():
            self.checkpoint_path.unlink()

    async def process_with_checkpoints(
        self,
        items: list,
        resume: bool = True
    ) -> ProcessingState:
        """Process items with checkpointing"""
        # Load or create state (resume=False always starts fresh)
        self.state = self.load_checkpoint() if resume else None
        if self.state is not None and self.state.total_items != len(items):
            raise ValueError(
                "Checkpoint belongs to a different dataset; "
                "delete it or pass resume=False"
            )

        if self.state is None:
            self.state = ProcessingState(
                total_items=len(items),
                processed_items=0,
                failed_items=0,
                last_processed_index=-1,
                start_time=datetime.now().isoformat(),
                results=[None] * len(items),
                errors=[]
            )

        # Resume from checkpoint
        start_index = self.state.last_processed_index + 1
        remaining = items[start_index:]

        if not remaining:
            print("All items already processed")
            return self.state

        print(f"Processing {len(remaining)} remaining items...")

        # Process in batches with checkpoints
        for i, chunk in enumerate(chunk_list(remaining, self.checkpoint_interval)):
            chunk_start = start_index + (i * self.checkpoint_interval)

            results = await self.processor.process_batch(chunk)

            # Update state
            for j, (item, result) in enumerate(results):
                idx = chunk_start + j
                if isinstance(result, Exception):
                    # Failed items are recorded here but not retried on
                    # resume, because last_processed_index moves past them
                    self.state.failed_items += 1
                    self.state.errors.append({
                        "index": idx,
                        "error": str(result)
                    })
                else:
                    self.state.results[idx] = result
                    self.state.processed_items += 1

                self.state.last_processed_index = idx

            # Save checkpoint
            self.save_checkpoint()
            print(f"Checkpoint saved: {self.state.processed_items}/{self.state.total_items}")

        # Clear checkpoint on success
        if self.state.failed_items == 0:
            self.clear_checkpoint()

        return self.state


# Usage
checkpointed = CheckpointedProcessor(
    processor=ConcurrentProcessor(process_prompt),
    checkpoint_path="my_batch_checkpoint.json",
    checkpoint_interval=100
)

# First run (top-level await: Jupyter/IPython)
state = await checkpointed.process_with_checkpoints(large_dataset)

# If interrupted, rerunning the same call resumes from the checkpoint
state = await checkpointed.process_with_checkpoints(large_dataset, resume=True)
```
Checkpoint file size: The results list in ProcessingState stores all results in the checkpoint file. For 100K items with 500-character responses each, that is a ~50MB JSON file. Because every checkpoint rewrites the whole file, the total data written over a run also grows quadratically with the item count. For larger workloads, store results in a separate append-only file or database and keep only the index and metadata in the checkpoint. This also makes checkpoint writes faster, shrinking the window in which a crash can land mid-write.
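One way to follow that advice is a minimal sketch in which results go to an append-only JSONL file keyed by a deterministic ID (a hash of index and prompt), and resuming simply means skipping IDs already in the file. All function names here are hypothetical, and the 16-hex-character ID length is an arbitrary choice.

```python
import hashlib
import json
from pathlib import Path
from typing import Any, List, Set, Tuple


def request_id(index: int, prompt: str) -> str:
    """Deterministic ID: the same (index, prompt) always maps to the same ID."""
    return hashlib.sha256(f"{index}:{prompt}".encode("utf-8")).hexdigest()[:16]


def append_result(results_path: Path, rid: str, result: Any) -> None:
    """Append one result; earlier lines are never rewritten."""
    with results_path.open("a", encoding="utf-8") as f:
        f.write(json.dumps({"id": rid, "result": result}) + "\n")


def load_completed_ids(results_path: Path) -> Set[str]:
    done: Set[str] = set()
    if results_path.exists():
        with results_path.open(encoding="utf-8") as f:
            for line in f:
                try:
                    done.add(json.loads(line)["id"])
                except (json.JSONDecodeError, KeyError):
                    # A crash mid-write can leave a partial line; skip it.
                    # Any record on that line is treated as not done and
                    # reprocessed, which is safe because IDs are deterministic.
                    continue
    return done


def pending_items(prompts: List[str], results_path: Path) -> List[Tuple[int, str]]:
    """Items with no stored result yet: this is all a resume needs."""
    done = load_completed_ids(results_path)
    return [(i, p) for i, p in enumerate(prompts) if request_id(i, p) not in done]
```

Each write appends one line, so checkpoint cost stays constant per item instead of growing with the run. The small checkpoint file then only needs run metadata such as start time and error counts.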
Never run a large batch without estimating cost first. This sounds obvious, but the number of teams that have accidentally burned $5,000 on a batch job they expected to cost $50 is staggering. The most common mistakes are forgetting that output tokens cost 2-5x more than input tokens, and underestimating average output length.

Always run a small sample (50-100 items) first to measure actual token usage, then extrapolate. The estimate below uses a character-count heuristic, which is typically within ~20% for English text.
```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class TokenPricing:
    input_per_1k: float   # Cost per 1,000 input tokens (USD)
    output_per_1k: float  # Cost per 1,000 output tokens (usually 2-5x more)


# Pricing as of early 2025 -- always verify current pricing before large runs
PRICING = {
    "gpt-4o": TokenPricing(0.0025, 0.010),
    "gpt-4o-mini": TokenPricing(0.00015, 0.0006),
    "gpt-4o-batch": TokenPricing(0.00125, 0.005),  # 50% off real-time
    "claude-3-5-sonnet": TokenPricing(0.003, 0.015),
}


class CostEstimator:
    """Estimate batch processing costs before committing.

    Run this BEFORE every large batch. Compare models to find the
    sweet spot between cost and quality for your use case.
    """

    def __init__(self, model: str = "gpt-4o-mini"):
        if model not in PRICING:
            # Fail loudly: silently using another model's prices
            # would produce a wrong estimate
            raise KeyError(f"No pricing configured for {model!r}")
        self.pricing = PRICING[model]

    def estimate_tokens(
        self,
        text: str,
        chars_per_token: float = 4.0
    ) -> int:
        """Rough token estimation.

        The 4 chars/token heuristic is ~80% accurate for English.
        For exact counts with OpenAI models, use:
            tiktoken.encoding_for_model(model).encode(text)
        That is slower, and it still excludes the few extra tokens
        that chat message formatting adds per message.
        """
        return int(len(text) / chars_per_token)

    def estimate_batch_cost(
        self,
        prompts: list[str],
        avg_output_tokens: int = 500,
        pricing: Optional[TokenPricing] = None
    ) -> dict:
        """Estimate cost for batch (defaults to this estimator's model)."""
        pricing = pricing or self.pricing
        total_input_tokens = sum(
            self.estimate_tokens(p) for p in prompts
        )
        total_output_tokens = len(prompts) * avg_output_tokens

        input_cost = (total_input_tokens / 1000) * pricing.input_per_1k
        output_cost = (total_output_tokens / 1000) * pricing.output_per_1k

        return {
            "input_tokens": total_input_tokens,
            "output_tokens": total_output_tokens,
            "input_cost": round(input_cost, 4),
            "output_cost": round(output_cost, 4),
            "total_cost": round(input_cost + output_cost, 4)
        }

    def compare_models(
        self,
        prompts: list[str],
        avg_output_tokens: int = 500
    ) -> dict:
        """Compare costs across models.

        Run this before every large batch. The difference between
        gpt-4o and gpt-4o-mini can be 15-20x for the same workload.
        If quality is acceptable with the cheaper model, the savings
        are enormous at scale.

        Each model's pricing is passed explicitly rather than assigned
        to self.pricing, so the estimator's own model is unchanged.
        """
        return {
            model: self.estimate_batch_cost(prompts, avg_output_tokens, pricing)
            for model, pricing in PRICING.items()
        }


# Usage
estimator = CostEstimator("gpt-4o-mini")
prompts = ["Analyze this data: " + str(i) for i in range(1000)]

estimate = estimator.estimate_batch_cost(prompts, avg_output_tokens=300)
print(f"Estimated cost: ${estimate['total_cost']}")
print(f"Input tokens: {estimate['input_tokens']:,}")
print(f"Output tokens: {estimate['output_tokens']:,}")

# Compare all models
comparison = estimator.compare_models(prompts)
for model, cost in comparison.items():
    print(f"{model}: ${cost['total_cost']:.4f}")
```
The real cost trap is output tokens. Input pricing gets all the attention, but output tokens cost several times more (4-5x for the models in the pricing table above). Suppose you expect 100-token responses but the model averages 800 tokens because your prompt doesn’t constrain output length. That batch job multiplies the output portion of your bill by 8x. Always run a 50-100 item sample first and measure actual output lengths before committing to a full run.
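Extrapolating from that sample is simple arithmetic once you have the real `usage` numbers. The sketch below assumes you have collected `(prompt_tokens, completion_tokens)` pairs from the responses’ usage fields. It reports both an expected cost and a pessimistic p90-output cost, because output lengths tend to be long-tailed. The function name and return keys are hypothetical.

```python
import math
from statistics import mean
from typing import List, Tuple


def extrapolate_cost(
    sample_usage: List[Tuple[int, int]],
    total_items: int,
    input_per_1k: float,
    output_per_1k: float,
) -> dict:
    """Project full-run cost from (prompt_tokens, completion_tokens) pairs
    measured on a pilot sample."""
    avg_in = mean(u[0] for u in sample_usage)
    outputs = sorted(u[1] for u in sample_usage)
    avg_out = mean(outputs)
    # p90 output length (nearest-rank): a pessimistic bound for long tails
    p90_out = outputs[math.ceil(0.9 * len(outputs)) - 1]

    def per_item(out_tokens: float) -> float:
        return avg_in / 1000 * input_per_1k + out_tokens / 1000 * output_per_1k

    return {
        "avg_output_tokens": avg_out,
        "expected_cost": per_item(avg_out) * total_items,
        "p90_cost": per_item(p90_out) * total_items,
    }
```

If `p90_cost` is far above `expected_cost`, constrain output length (for example with `max_tokens` or explicit length instructions) before scaling up.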
Your batch processing job of 100K LLM requests fails at item 67,000 due to a provider outage. How do you design for this?
What interviewers are testing: Resilience engineering and operational maturity for long-running data pipelines.

Strong answer: This is a checkpointing and idempotency problem. The system needs three things: progress persistence, safe resumption, and deduplication.

First, checkpoint every N items (100-500 is a good range). Each checkpoint writes the last successfully processed index and the results so far to a durable store (local file, S3, or database). The checkpoint write itself should be atomic (write to a temp file, then rename it over the old one), so that a crash during checkpointing doesn’t corrupt state.

Second, on restart, load the checkpoint and resume from last_processed_index + 1. The items before that index are already done, so skip them; the items after it need processing. Items in the “gap” between the last checkpoint and the crash point may or may not have been processed, which is why you also need idempotency.

Third, for idempotency, give each request a deterministic ID (e.g., a hash of the prompt plus its index). Before processing an item, check whether a result already exists for that ID. This makes retries safe: reprocessing an item overwrites the same result slot instead of creating a duplicate entry.

The provider outage itself should trip a circuit breaker. After 5-10 consecutive failures, pause processing and wait with exponential backoff before probing again. Don’t burn through your retry budget on the 33,000 remaining items while the provider is down.

Cost protection: track cumulative spend during the batch. If it exceeds a configurable threshold (say, 150% of the estimated cost), halt and alert. Provider outages can sometimes cause partial responses that still incur charges.
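The circuit breaker described in that answer can be sketched in a few dozen lines. This version is an illustration under stated assumptions: the threshold and cooldown defaults are arbitrary, the clock is injectable for testing, and the class name is hypothetical. Workers call `allow_request()` before each call and report the outcome with `record_success()` or `record_failure()`.

```python
import time
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class CircuitBreaker:
    """Open after N consecutive failures; probe again after an
    exponentially growing cooldown."""
    failure_threshold: int = 5
    base_cooldown_s: float = 30.0
    max_cooldown_s: float = 600.0
    clock: Callable[[], float] = time.monotonic
    consecutive_failures: int = 0
    open_count: int = 0                # consecutive times the circuit has opened
    opened_at: Optional[float] = None  # None means closed

    def cooldown(self) -> float:
        # With the defaults: 30s, 60s, 120s, ... capped at max_cooldown_s
        return min(self.max_cooldown_s,
                   self.base_cooldown_s * 2 ** max(self.open_count - 1, 0))

    def allow_request(self) -> bool:
        if self.opened_at is None:
            return True
        # Half-open: let a probe through once the cooldown has elapsed
        return self.clock() - self.opened_at >= self.cooldown()

    def record_success(self) -> None:
        self.consecutive_failures = 0
        self.open_count = 0
        self.opened_at = None

    def record_failure(self) -> None:
        self.consecutive_failures += 1
        if self.opened_at is not None:
            # A failed probe re-opens the circuit with a doubled cooldown
            self.open_count += 1
            self.opened_at = self.clock()
        elif self.consecutive_failures >= self.failure_threshold:
            self.open_count = 1
            self.opened_at = self.clock()
```

In this simple form, several workers can pass `allow_request()` at the same moment once the cooldown expires, so more than one probe may go out. A production version would typically admit a single probe at a time.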
Compare the OpenAI Batch API vs. self-managed concurrent processing. When would you choose each?
What interviewers are testing: Understanding of cost-latency tradeoffs and operational complexity in real production systems.

Strong answer: The decision comes down to four factors: latency requirements, cost sensitivity, tolerance for operational complexity, and control needs.

OpenAI Batch API: 50% cost savings, a completion window of up to 24 hours, and zero infrastructure to manage. Batch jobs also don’t compete with your real-time RPM/TPM limits, because they have their own separate quota and OpenAI schedules the work internally. Choose this for nightly data pipelines, evaluation runs, content generation backlogs, and embedding large corpora: anything where “done by tomorrow morning” is fast enough.

Self-managed concurrent processing: results in minutes instead of hours, full control over retry logic and error handling, the ability to use multiple providers with fallback, and real-time progress visibility. Choose this for time-sensitive batch jobs (process uploaded files within 30 minutes), workflows that need partial results quickly, multi-provider strategies, or when you need custom logic such as filtering mid-batch based on intermediate results.

The hybrid approach often wins. Use self-managed concurrency for the urgent 20% of items (new uploads, high-priority customers) and route the remaining 80% through the Batch API for cost savings. This requires a routing layer that classifies items by urgency and dispatches them to the appropriate processing path.

One often-overlooked factor: the Batch API doesn’t support streaming, and support for other features (such as function calling or vision inputs) can vary by model and endpoint. Verify that your specific use case is supported before committing to it for a production pipeline.
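The routing layer in that hybrid approach can start as a simple deadline check. This minimal sketch assumes each item carries an optional deadline. Anything due before a full batch window (plus a safety margin) could elapse goes to real-time processing, and everything else goes to the Batch API. `WorkItem`, `route`, and the 2-hour margin are illustrative assumptions.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Iterable, List, Optional, Tuple

# Batch API completion window (the only option at the time of writing)
BATCH_WINDOW = timedelta(hours=24)


@dataclass
class WorkItem:
    id: str
    deadline: Optional[datetime] = None  # None means "no deadline"


def route(
    items: Iterable[WorkItem],
    now: datetime,
    safety_margin: timedelta = timedelta(hours=2),
) -> Tuple[List[WorkItem], List[WorkItem]]:
    """Send items that can't tolerate a full batch window to real-time processing."""
    realtime, batch = [], []
    for item in items:
        batch_could_miss = (
            item.deadline is not None
            and item.deadline < now + BATCH_WINDOW + safety_margin
        )
        (realtime if batch_could_miss else batch).append(item)
    return realtime, batch
```

Routing on the worst-case window, not the typical completion time, keeps the rule safe even on days when batches run long. A priority flag (such as high-priority customers) can be folded into the same check.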
You need to process 1 million documents through an LLM for classification. The total estimated cost is $15,000. How do you approach this?
What interviewers are testing: End-to-end thinking about large-scale ML data pipelines, cost optimization, and risk management.

Strong answer: At this scale, you’re running a data engineering project, not a simple script. The approach has four phases.

Phase 1 (validate and sample): Process a random sample of 500 documents first. Measure actual token usage (not estimates), result quality, failure rate, and per-item cost. Extrapolate to confirm the $15K estimate. If quality is borderline, this is where you tune the prompt, before burning $15K on bad results.

Phase 2 (optimize cost): Can you use a cheaper model? If gpt-4o-mini gives 95% of gpt-4o’s classification accuracy, you’ve cut costs by more than 90%, since its list prices are roughly 1/16 of gpt-4o’s. Can you shorten prompts? Every token in the system message gets multiplied by 1M. Can you use the Batch API? That’s $7,500 instead of $15,000. Can you pre-filter obviously classifiable documents with a rules-based system and send only ambiguous ones to the LLM? That might eliminate 40% of calls.

Phase 3 (execute with guardrails): Split into daily batches of ~100K; spreading cost over 10 days reduces the blast radius if something goes wrong. Set daily cost limits. Checkpoint every 1,000 items. Run quality spot-checks on each day’s output before proceeding to the next batch. Use the Batch API for the bulk, and concurrent processing only for rush items.

Phase 4 (long-term): If this is recurring, train a fine-tuned model or a traditional ML classifier on the LLM-labeled data. A fine-tuned gpt-4o-mini is cheaper per call than gpt-4o and needs shorter prompts, and a distilled BERT classifier running on your own hardware costs essentially nothing at inference time. The $15K LLM run becomes your labeling step, not your production system.

The meta-point: at $15K, this needs a project plan, not just a script. That means stakeholder buy-in on cost, rollback plans, quality gates, and a path to cost reduction over time.
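The Phase 2 levers compound multiplicatively, which is easy to lose track of in an interview. The sketch below makes the arithmetic explicit. It assumes every document costs about the same, so pre-filtering 40% of documents removes 40% of the cost, and the function name is hypothetical.

```python
def projected_cost(
    base_cost: float,
    prefilter_fraction: float = 0.0,
    model_price_ratio: float = 1.0,
    batch_api: bool = False,
) -> float:
    """Apply the Phase 2 cost levers to a baseline LLM cost estimate."""
    # Documents the rules engine classifies never reach the LLM
    cost = base_cost * (1 - prefilter_fraction)
    # e.g. cheaper model's price / current model's price
    cost *= model_price_ratio
    if batch_api:
        cost *= 0.5  # Batch API: 50% off input and output tokens
    return cost
```

With the numbers above, the Batch API alone takes $15,000 to $7,500, and adding the 40% pre-filter brings it to $4,500. The gpt-4o-mini to gpt-4o list-price ratio in the pricing table is 0.06 for both input and output, which turns $15,000 into $900 before any other lever.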
How would you implement a cost-aware batch processor that automatically switches models or pauses when budget is exceeded?
What interviewers are testing: Practical cost governance for AI systems, which is an increasingly critical production concern.

Strong answer: Build a cost tracker as a first-class component, not an afterthought.

The CostTracker maintains a running total of actual spend (from API response usage fields, not estimates) and compares it against configurable thresholds. It exposes three states: green (under 80% of budget), yellow (80-100%, switch to a cheaper model), and red (over budget, pause processing).

Implementation: After each API call, extract the actual token counts from the response and calculate cost using the model’s pricing. Update the running total atomically, because the tracker is shared across concurrent workers. At each threshold crossing, execute the configured action.

Actions at the yellow threshold: Automatically downgrade remaining items from gpt-4o to gpt-4o-mini. Log the switchover for quality tracking. Alert the team via Slack/PagerDuty so humans can decide whether to increase the budget or accept the quality tradeoff.

Actions at the red threshold: Stop submitting new items. Let in-flight requests complete (don’t waste already-spent money). Save a checkpoint. Send an alert with a summary: items completed, items remaining, actual cost vs. budget, and estimated cost to complete.

The nuance most people miss: token estimation before the call is unreliable for output tokens, because you don’t know how much the model will generate. So budget tracking must be based on actual usage, not pre-estimates. This means you might slightly overshoot the budget by the cost of in-flight requests when the red threshold triggers. Account for this buffer in your budget planning.
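A minimal sketch of that CostTracker follows. It takes actual token counts from each response’s usage fields and keeps a lock-protected running total. It maps spend to the green/yellow/red states and picks the model for the next item, returning `None` when new submissions should pause. The class names, the `Price` container, and the 80% default are assumptions for illustration. Alerting and checkpointing are left to the caller.

```python
import threading
from dataclasses import dataclass
from enum import Enum
from typing import Dict, Optional


class BudgetState(Enum):
    GREEN = "green"    # below the yellow threshold: use the preferred model
    YELLOW = "yellow"  # between threshold and budget: downgrade
    RED = "red"        # at or over budget: stop submitting


@dataclass(frozen=True)
class Price:
    input_per_1k: float
    output_per_1k: float


class CostTracker:
    """Tracks actual spend from response usage fields (not pre-estimates)."""

    def __init__(self, budget: float, prices: Dict[str, Price], yellow_at: float = 0.8):
        self.budget = budget
        self.prices = prices
        self.yellow_at = yellow_at
        self.spent = 0.0
        self._lock = threading.Lock()  # shared by all workers, thread-safe too

    def record(self, model: str, prompt_tokens: int, completion_tokens: int) -> BudgetState:
        price = self.prices[model]
        cost = (prompt_tokens / 1000 * price.input_per_1k
                + completion_tokens / 1000 * price.output_per_1k)
        with self._lock:
            self.spent += cost
            return self._state_locked()

    def state(self) -> BudgetState:
        with self._lock:
            return self._state_locked()

    def _state_locked(self) -> BudgetState:
        if self.spent >= self.budget:
            return BudgetState.RED
        if self.spent >= self.yellow_at * self.budget:
            return BudgetState.YELLOW
        return BudgetState.GREEN

    def choose_model(self, preferred: str, fallback: str) -> Optional[str]:
        """Model for the next item, or None if new submissions should pause."""
        current = self.state()
        if current is BudgetState.RED:
            return None
        return fallback if current is BudgetState.YELLOW else preferred
```

Because `record` runs only after a response arrives, requests already in flight when the tracker turns red still add to `spent`. This is the overshoot buffer the answer above tells you to plan for.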