Production LLM applications require handling multiple concurrent requests efficiently. This chapter covers async patterns, rate limiting, and strategies for scaling LLM operations — the same infrastructure that separates toy demos from systems handling real traffic.

The core insight is this: LLM API calls are I/O-bound, not CPU-bound. While you’re waiting 2-15 seconds for a model to generate a response, your CPU is doing nothing. Async programming lets you fire off dozens of those calls simultaneously, turning sequential 2-minute workflows into 15-second parallel bursts. If you’re processing 1,000 prompts sequentially at 3 seconds each, that’s 50 minutes. With async concurrency of 20, it’s about 2.5 minutes — same cost, same results, 20x faster.

Think of it like ordering food for a large group: you don’t wait for person #1’s meal to arrive before ordering for person #2. You send all the orders to the kitchen at once and let them come back as they’re ready.
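You can check the I/O-bound argument without an API key. The sketch below replaces the LLM call with `asyncio.sleep`, which stands in for network latency. It then times a sequential loop against a `gather()` capped at 20 concurrent calls by a semaphore. The function names are illustrative, and the numbers are scaled down (40 calls at 20 ms) so the demo runs in about a second.

```python
import asyncio
import time


async def fake_llm_call(prompt: str, latency: float) -> str:
    """Stand-in for an LLM request: the coroutine just waits, like network I/O."""
    await asyncio.sleep(latency)
    return f"response to {prompt}"


async def run_sequential(prompts: list[str], latency: float) -> list[str]:
    # Each call starts only after the previous one has returned.
    return [await fake_llm_call(p, latency) for p in prompts]


async def run_concurrent(prompts: list[str], latency: float, concurrency: int) -> list[str]:
    # Up to `concurrency` calls are waiting at the same time.
    sem = asyncio.Semaphore(concurrency)

    async def bounded(p: str) -> str:
        async with sem:
            return await fake_llm_call(p, latency)

    return await asyncio.gather(*(bounded(p) for p in prompts))


def timed(coro):
    """Run a coroutine in a fresh event loop and return (result, seconds)."""
    start = time.perf_counter()
    result = asyncio.run(coro)
    return result, time.perf_counter() - start


prompts = [f"prompt {i}" for i in range(40)]
seq_results, seq_time = timed(run_sequential(prompts, latency=0.02))
con_results, con_time = timed(run_concurrent(prompts, latency=0.02, concurrency=20))
print(f"sequential: {seq_time:.2f}s  concurrent (20 at a time): {con_time:.2f}s")
```

The results are identical and arrive in the same order. Only the wall-clock time changes, because the waits overlap instead of adding up.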
```python
import asyncio
from openai import AsyncOpenAI


async def async_completion(prompt: str) -> str:
    """Make an async completion request.

    Note: We use AsyncOpenAI (not OpenAI) because the synchronous client
    blocks the entire event loop during the HTTP call, preventing any
    other coroutines from making progress.
    """
    # Creating the client inside the function works for demos,
    # but in production, share one client across calls to reuse
    # the underlying HTTP connection pool.
    client = AsyncOpenAI()
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content


async def main():
    result = await async_completion("Explain async programming.")
    print(result)

# asyncio.run() creates an event loop, runs main(), then cleans up.
# Never call this inside an already-running loop (e.g., Jupyter notebooks
# use 'await main()' instead).
asyncio.run(main())
```
asyncio.gather() is the workhorse for firing multiple LLM calls simultaneously. It takes any number of awaitables (unpack a list with `*`), runs them all concurrently, and returns results in the same order as the input. This is the difference between “ask 4 questions one at a time” and “ask all 4 at once.”

Critical production warning: gather() with no concurrency limit will fire ALL requests simultaneously. If you have 1,000 prompts, that’s 1,000 concurrent API calls — and you’ll hit rate limits instantly. Always pair gather() with a semaphore (covered below) for production workloads.
```python
import asyncio
from dataclasses import dataclass
from typing import Optional

from openai import AsyncOpenAI


@dataclass
class CompletionResult:
    """Result from an async completion.

    Wrapping results in a dataclass lets us track success/failure
    per-item rather than having one failure blow up the entire batch.
    """
    prompt: str
    response: str
    success: bool
    error: Optional[str] = None


async def process_prompt(
    client: AsyncOpenAI,
    prompt: str
) -> CompletionResult:
    """Process a single prompt.

    Notice: we catch exceptions and return them as data rather than
    letting them propagate. This is intentional -- with gather(), the
    first unhandled exception propagates out of the await, and you lose
    the results of every other task in the batch (they keep running,
    but their return values are discarded).
    """
    try:
        response = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}]
        )
        return CompletionResult(
            prompt=prompt,
            response=response.choices[0].message.content,
            success=True
        )
    except Exception as e:
        return CompletionResult(
            prompt=prompt,
            response="",
            success=False,
            error=str(e)
        )


async def process_batch(prompts: list[str]) -> list[CompletionResult]:
    """Process multiple prompts in parallel.

    All prompts fire concurrently. For 4 prompts at 3 seconds each,
    total wall time is ~3 seconds (not 12).
    """
    # Share one client instance across all tasks -- this reuses the
    # underlying HTTP connection pool (httpx), which is much more
    # efficient than creating a new TLS connection per request.
    client = AsyncOpenAI()
    tasks = [process_prompt(client, prompt) for prompt in prompts]
    results = await asyncio.gather(*tasks)
    return results


# Usage
prompts = [
    "Summarize machine learning in one sentence.",
    "What is deep learning?",
    "Explain neural networks briefly.",
    "Define natural language processing.",
]

results = asyncio.run(process_batch(prompts))
for result in results:
    if result.success:
        print(f"Q: {result.prompt[:30]}...")
        print(f"A: {result.response[:100]}...\n")
    else:
        print(f"Error: {result.error}")
```
Without rate limiting, your async code will happily fire thousands of concurrent requests in milliseconds — and the API provider will just as happily reject most of them with 429 (Too Many Requests) errors. Rate limiting is not optional; it’s the difference between “10x faster batch processing” and “10x faster at generating error responses.”
The token bucket is the gold standard algorithm for API rate limiting. The analogy: imagine a bucket that holds tokens. Tokens drip in at a steady rate (say, 1 per second). Each API call costs one token. If the bucket is empty, you wait. If it’s full, you can burst up to the bucket’s capacity.

This is better than a simple “N requests per second” limit because it allows natural bursting: a user who was idle for 10 seconds has accumulated tokens and can fire a quick burst — which matches how humans actually use APIs.
```python
import asyncio
import time
from dataclasses import dataclass, field

from openai import AsyncOpenAI


@dataclass
class TokenBucket:
    """Token bucket rate limiter.

    Think of this as a leaky bucket in reverse: tokens drip IN at a
    constant rate, and each request drains tokens OUT.
    """
    capacity: float      # Max tokens the bucket can hold (burst size)
    refill_rate: float   # Tokens added per second (sustained rate)
    tokens: float = field(init=False)
    last_refill: float = field(init=False)
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock, repr=False)

    def __post_init__(self):
        self.tokens = self.capacity
        self.last_refill = time.monotonic()

    async def acquire(self, tokens: float = 1) -> float:
        """Acquire tokens, waiting if necessary. Returns wait time."""
        async with self._lock:
            await self._refill()

            if self.tokens >= tokens:
                self.tokens -= tokens
                return 0

            # Calculate wait time
            tokens_needed = tokens - self.tokens
            wait_time = tokens_needed / self.refill_rate

            await asyncio.sleep(wait_time)
            await self._refill()
            self.tokens -= tokens

            return wait_time

    async def _refill(self):
        """Refill tokens based on elapsed time."""
        now = time.monotonic()
        elapsed = now - self.last_refill
        self.tokens = min(
            self.capacity,
            self.tokens + elapsed * self.refill_rate
        )
        self.last_refill = now


class RateLimitedClient:
    """OpenAI client with rate limiting.

    LLM APIs enforce limits on TWO dimensions: request count and token
    count. You can hit the token limit with few large requests or the
    request limit with many small ones. This client guards against both.
    """

    def __init__(
        self,
        requests_per_minute: int = 60,
        tokens_per_minute: int = 90000
    ):
        self.client = AsyncOpenAI()

        # Two separate rate limiters -- you must pass both gates
        # before a request is allowed through
        self.request_limiter = TokenBucket(
            capacity=requests_per_minute,
            refill_rate=requests_per_minute / 60  # Convert to per-second
        )
        self.token_limiter = TokenBucket(
            capacity=tokens_per_minute,
            refill_rate=tokens_per_minute / 60
        )

    async def complete(
        self,
        messages: list[dict],
        model: str = "gpt-4o-mini",
        estimated_tokens: int = 500
    ) -> str:
        """Make a rate-limited completion request."""
        # Acquire rate limit tokens
        await self.request_limiter.acquire(1)
        await self.token_limiter.acquire(estimated_tokens)

        response = await self.client.chat.completions.create(
            model=model,
            messages=messages
        )
        return response.choices[0].message.content


# Usage
async def main():
    client = RateLimitedClient(
        requests_per_minute=60,
        tokens_per_minute=90000
    )

    prompts = [f"Count to {i}" for i in range(1, 21)]

    async def process(prompt):
        return await client.complete([{"role": "user", "content": prompt}])

    results = await asyncio.gather(*[process(p) for p in prompts])
    print(f"Processed {len(results)} requests")

asyncio.run(main())
```
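The sliding window approach tracks actual request timestamps rather than maintaining abstract token counts. It’s conceptually simpler and gives you exact enforcement: “no more than N requests in any rolling T-second window.” The tradeoff versus token bucket is that it is stricter about bursts. A token bucket with capacity N that refills at N per T seconds can spend a full burst of N and then keep admitting freshly refilled tokens within the same T-second span, so a rolling window can briefly see nearly 2N requests. A sliding window never admits more than N. Once the window holds 10 requests, the 11th always waits until the oldest of those 10 ages out.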
```python
import asyncio
import time
from collections import deque
from dataclasses import dataclass, field


@dataclass
class SlidingWindowLimiter:
    """Sliding window rate limiter.

    Uses a deque of timestamps to track when requests were made.
    Old timestamps outside the window are pruned on each check.
    Memory usage is bounded by max_requests (one timestamp per request).
    """
    max_requests: int
    window_seconds: float
    timestamps: deque = field(default_factory=deque)
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock, repr=False)

    def _evict_expired(self, now: float) -> None:
        # Evict timestamps that have fallen outside the window.
        # popleft() is O(1) on deque, so this stays fast.
        cutoff = now - self.window_seconds
        while self.timestamps and self.timestamps[0] <= cutoff:
            self.timestamps.popleft()

    async def acquire(self) -> float:
        """Acquire permission to make a request.

        Returns the time spent waiting (0 if no wait was needed).
        Useful for monitoring how much throttling is occurring.
        """
        async with self._lock:
            waited = 0.0
            while True:
                now = time.monotonic()
                self._evict_expired(now)

                if len(self.timestamps) < self.max_requests:
                    self.timestamps.append(now)
                    return waited

                # Window is full -- wait until the oldest entry expires,
                # then loop and re-check. asyncio.sleep() can wake a hair
                # early, so we never assume the slot is already free.
                delay = self.timestamps[0] + self.window_seconds - now
                await asyncio.sleep(delay)
                waited += delay


# Usage
async def rate_limited_request(limiter: SlidingWindowLimiter, i: int):
    wait = await limiter.acquire()
    if wait > 0:
        print(f"Request {i} waited {wait:.2f}s")
    print(f"Request {i} executing")
    return i


async def main():
    # Create the limiter inside the running loop: on Python < 3.10 an
    # asyncio.Lock binds to the event loop that exists when it is created.
    limiter = SlidingWindowLimiter(max_requests=10, window_seconds=1.0)
    tasks = [rate_limited_request(limiter, i) for i in range(20)]
    await asyncio.gather(*tasks)

asyncio.run(main())
```
When to use which rate limiter: Token bucket is the default choice for most LLM workloads — it handles bursty traffic gracefully and composes well with dual RPM/TPM limits. Use sliding window only when you need strict guarantees (e.g., “never more than exactly 10 requests in any 1-second window”) or when auditing requires exact request-level timestamps.
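A small deterministic simulation makes the “strict guarantee” concrete. This sketch uses virtual timestamps instead of real sleeping. Both limiters are configured for 10 requests per second, and a greedy client attempts a request every 10 ms for 2 seconds. As a simplifying assumption, requests that are not admitted are dropped rather than queued. The helper names are hypothetical.

```python
from collections import deque


def token_bucket_admits(arrivals, capacity, refill_rate):
    """Return arrival times a token bucket admits (rejected ones are dropped)."""
    tokens, last, admitted = capacity, 0.0, []
    for t in arrivals:
        tokens = min(capacity, tokens + (t - last) * refill_rate)
        last = t
        if tokens >= 1:
            tokens -= 1
            admitted.append(t)
    return admitted


def sliding_window_admits(arrivals, max_requests, window):
    """Return arrival times a sliding window admits (rejected ones are dropped)."""
    stamps, admitted = deque(), []
    for t in arrivals:
        while stamps and stamps[0] <= t - window:
            stamps.popleft()
        if len(stamps) < max_requests:
            stamps.append(t)
            admitted.append(t)
    return admitted


def max_in_any_window(times, window):
    """Largest number of admitted requests inside any [start, start + window)."""
    return max(
        (sum(1 for t in times[i:] if t < start + window) for i, start in enumerate(times)),
        default=0,
    )


arrivals = [k / 100 for k in range(200)]  # one attempt every 10 ms for 2 s
bucket = token_bucket_admits(arrivals, capacity=10, refill_rate=10.0)
window = sliding_window_admits(arrivals, max_requests=10, window=1.0)
print("token bucket, busiest 1 s:", max_in_any_window(bucket, 1.0))
print("sliding window, busiest 1 s:", max_in_any_window(window, 1.0))
```

The token bucket lets its initial burst and its refills land in the same second. The sliding window's busiest second never exceeds 10. That is the property to reach for when an audit demands an exact cap.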
When an LLM API returns a rate limit error or a transient server error, the worst thing you can do is immediately retry. If 100 of your concurrent requests all get rate-limited at the same time and all retry after exactly 1 second, you’ve just created a “thundering herd” that hits the API with 100 simultaneous requests again — and gets rate-limited again. The cycle repeats.

Exponential backoff with jitter solves this by spreading retries over time. Each successive retry waits longer (exponential), and each wait is randomized within a range (jitter). The result: your 100 retries spread themselves naturally across a 30-second window instead of slamming the API all at once.
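The following is a minimal sketch of exponential backoff with full jitter. Each retry sleeps a uniformly random time between 0 and a ceiling that doubles per attempt and is capped at `max_delay`. `RetryableError` is a hypothetical stand-in for whatever your client raises on 429/5xx responses (with the OpenAI SDK you might map it to `openai.RateLimitError`). `retry_with_backoff` is an illustrative helper, not a library API.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


class RetryableError(Exception):
    """Hypothetical stand-in for a 429 or 5xx error raised by your client."""


async def retry_with_backoff(
    call: Callable[[], Awaitable[T]],
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
) -> T:
    """Await call(), retrying retryable failures with exponential backoff + full jitter."""
    attempt = 0
    while True:
        try:
            return await call()
        except RetryableError:
            if attempt >= max_retries:
                raise  # out of retries: surface the last error
            # Exponential ceiling: base, 2*base, 4*base, ... capped at max_delay.
            ceiling = min(max_delay, base_delay * 2 ** attempt)
            # Full jitter: sleep a uniformly random time in [0, ceiling], so
            # requests that failed together retry at different moments.
            await asyncio.sleep(random.uniform(0, ceiling))
            attempt += 1


# Usage sketch: a fake call that fails twice, then succeeds.
async def flaky_demo() -> tuple[str, int]:
    attempts = 0

    async def flaky() -> str:
        nonlocal attempts
        attempts += 1
        if attempts < 3:
            raise RetryableError("simulated 429")
        return "ok"

    result = await retry_with_backoff(flaky, base_delay=0.01)
    return result, attempts
```

Only retryable errors are caught. Anything else, such as a 400 for a malformed request, propagates immediately, because retrying it cannot help.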
Static rate limits are a guess. Adaptive rate limiting observes how the API actually responds and adjusts in real-time. When requests succeed, it cautiously increases the rate. When it gets rate-limited, it aggressively backs off. Think of it like a driver adjusting speed based on traffic conditions rather than always driving at exactly 60mph.

This pattern is especially valuable when you don’t know the exact rate limit (many providers don’t publish them), when limits change based on server load, or when you’re sharing a rate limit pool with other tenants in your organization.
```python
import asyncio
import time
from dataclasses import dataclass, field

from openai import AsyncOpenAI, RateLimitError


@dataclass
class AdaptiveRateLimiter:
    """Rate limiter that adapts based on API responses.

    The asymmetry between increase and decrease is intentional: we
    increase slowly (add increase_step req/s per success) but decrease
    fast (multiply by 0.5 on a rate-limit error). This is the AIMD
    pattern (Additive Increase, Multiplicative Decrease) borrowed from
    TCP congestion control -- it backs off sharply when it hits the
    limit, then probes upward again, so the rate settles into a
    sawtooth just below the actual limit.
    """
    initial_rate: float  # Requests per second
    min_rate: float = 0.1
    max_rate: float = 100.0
    increase_step: float = 0.1    # Added to the rate per success (req/s)
    decrease_factor: float = 0.5  # Multiplied into the rate on a 429
    current_rate: float = field(init=False)
    last_request: float = field(init=False)
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock, repr=False)

    def __post_init__(self):
        self.current_rate = self.initial_rate
        self.last_request = 0

    async def acquire(self):
        """Wait for rate limit slot."""
        async with self._lock:
            now = time.monotonic()
            min_interval = 1.0 / self.current_rate
            elapsed = now - self.last_request

            if elapsed < min_interval:
                await asyncio.sleep(min_interval - elapsed)

            self.last_request = time.monotonic()

    def on_success(self):
        """Additively increase rate on a successful request."""
        self.current_rate = min(
            self.current_rate + self.increase_step,
            self.max_rate
        )

    def on_rate_limit(self):
        """Multiplicatively decrease rate on a rate limit error."""
        self.current_rate = max(
            self.current_rate * self.decrease_factor,
            self.min_rate
        )
        print(f"Rate limited. Reducing to {self.current_rate:.2f} req/s")

    def on_error(self):
        """Slightly decrease rate on other errors."""
        self.current_rate = max(
            self.current_rate * 0.9,
            self.min_rate
        )


class AdaptiveClient:
    """OpenAI client with adaptive rate limiting."""

    def __init__(self, initial_rate: float = 10.0):
        # Note: the SDK retries 429s itself (max_retries, default 2), so a
        # RateLimitError only reaches us after its built-in retries.
        self.client = AsyncOpenAI()
        self.limiter = AdaptiveRateLimiter(initial_rate=initial_rate)

    async def complete(self, prompt: str) -> str:
        """Make a completion with adaptive rate limiting."""
        await self.limiter.acquire()

        try:
            response = await self.client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[{"role": "user", "content": prompt}]
            )
            self.limiter.on_success()
            return response.choices[0].message.content

        except RateLimitError:
            self.limiter.on_rate_limit()
            raise
        except Exception:
            self.limiter.on_error()
            raise
```
Adaptive rate limiters have a cold-start problem. When your application restarts, the limiter has no memory of the true rate limit and starts conservatively. If you know your API tier limits, set initial_rate close to the actual limit (converted to requests per second) divided by the number of client instances that share it. This minimizes ramp-up time. Also, log current_rate to your metrics dashboard — the rate-over-time graph is one of the best debugging tools for throughput issues.
A semaphore is the simplest and most reliable way to cap concurrency. If rate limiting controls “how fast” you make requests, semaphores control “how many at once.” Even with perfect rate limiting, having 500 concurrent HTTP connections can exhaust memory, file descriptors, or connection pool limits.

The rule of thumb for LLM APIs: start with max_concurrent=10 and increase until you start seeing rate limit errors or degraded latency. Most providers handle 10-50 concurrent connections per API key without issue.
```python
import asyncio
from dataclasses import dataclass
from typing import Callable, Optional

from openai import AsyncOpenAI


@dataclass
class ConcurrencyController:
    """Control concurrent request limits.

    Uses an asyncio.Semaphore as a gate: only max_concurrent coroutines
    can hold the semaphore at once. The rest queue up and wait their turn.
    """
    max_concurrent: int = 10

    def __post_init__(self):
        self.semaphore = asyncio.Semaphore(self.max_concurrent)
        self.active_count = 0  # For monitoring -- how many are in-flight?

    async def __aenter__(self):
        await self.semaphore.acquire()
        self.active_count += 1
        return self

    async def __aexit__(self, *args):
        self.active_count -= 1
        self.semaphore.release()


class ConcurrentBatchProcessor:
    """Process batches with controlled concurrency."""

    def __init__(self, max_concurrent: int = 10):
        self.client = AsyncOpenAI()
        self.controller = ConcurrencyController(max_concurrent)

    async def process_item(self, item: str) -> dict:
        """Process a single item with concurrency control."""
        async with self.controller:
            try:
                response = await self.client.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=[{"role": "user", "content": item}]
                )
                return {
                    "input": item,
                    "output": response.choices[0].message.content,
                    "success": True
                }
            except Exception as e:
                return {
                    "input": item,
                    "output": None,
                    "success": False,
                    "error": str(e)
                }

    async def process_batch(
        self,
        items: list[str],
        progress_callback: Optional[Callable[[int, int], None]] = None
    ) -> list[dict]:
        """Process a batch of items."""
        completed = 0

        async def process_with_progress(item):
            nonlocal completed
            result = await self.process_item(item)
            completed += 1
            if progress_callback:
                progress_callback(completed, len(items))
            return result

        tasks = [process_with_progress(item) for item in items]
        return await asyncio.gather(*tasks)


# Usage
async def main():
    processor = ConcurrentBatchProcessor(max_concurrent=5)

    items = [f"Summarize the number {i}" for i in range(20)]

    def on_progress(done, total):
        print(f"Progress: {done}/{total}")

    results = await processor.process_batch(items, on_progress)

    successful = sum(1 for r in results if r["success"])
    print(f"Completed: {successful}/{len(items)} successful")

asyncio.run(main())
```
Not all LLM requests are created equal. A real-time chat response should jump ahead of a background summarization job. A paying customer’s request should take priority over a free tier user’s batch job. Priority queues let you enforce these business rules at the infrastructure level.

The pattern is simple: instead of a FIFO queue, use a min-heap where lower priority numbers go first. Within the same priority level, requests are processed in FIFO order (using a monotonic sequence number as a tiebreaker).
```python
import asyncio
import heapq
from dataclasses import dataclass, field
from enum import IntEnum
from typing import Any, Awaitable, Callable


class Priority(IntEnum):
    """Lower number = higher priority (processed first).

    Map these to your business tiers:
    - HIGH: Real-time user-facing requests (chat, autocomplete)
    - NORMAL: Standard API requests
    - LOW: Background jobs (batch processing, analytics)
    """
    HIGH = 0
    NORMAL = 1
    LOW = 2


@dataclass(order=True)
class QueuedRequest:
    """A request in the priority queue."""
    priority: Priority
    sequence: int  # For FIFO ordering within same priority
    request: Any = field(compare=False)
    future: asyncio.Future = field(compare=False)


class PriorityRequestQueue:
    """Process LLM requests with priority ordering."""

    def __init__(
        self,
        processor: Callable[[Any], Awaitable[Any]],
        max_concurrent: int = 5
    ):
        self.processor = processor
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.queue: list[QueuedRequest] = []
        self.sequence = 0
        self._lock = asyncio.Lock()
        self._processing = False
        # Hold references to background tasks so they can't be garbage-collected
        self._tasks: set[asyncio.Task] = set()

    async def submit(
        self,
        request: Any,
        priority: Priority = Priority.NORMAL
    ) -> Any:
        """Submit a request and wait for result."""
        future = asyncio.get_running_loop().create_future()

        async with self._lock:
            queued = QueuedRequest(
                priority=priority,
                sequence=self.sequence,
                request=request,
                future=future
            )
            self.sequence += 1
            heapq.heappush(self.queue, queued)

            # Start the dispatcher if not already running. Setting the flag
            # here, under the lock, stops two submits from starting two.
            if not self._processing:
                self._processing = True
                self._spawn(self._process_queue())

        return await future

    def _spawn(self, coro) -> None:
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    async def _process_queue(self):
        """Dispatch queued requests, up to max_concurrent at a time."""
        while True:
            # Take a free slot BEFORE popping, so we always pop whatever is
            # highest-priority at the moment capacity becomes available.
            await self.semaphore.acquire()
            async with self._lock:
                if not self.queue:
                    self._processing = False
                    self.semaphore.release()
                    return
                queued = heapq.heappop(self.queue)
            self._spawn(self._run(queued))

    async def _run(self, queued: QueuedRequest):
        """Run one request and release its slot when done."""
        try:
            result = await self.processor(queued.request)
            if not queued.future.done():
                queued.future.set_result(result)
        except Exception as e:
            if not queued.future.done():
                queued.future.set_exception(e)
        finally:
            self.semaphore.release()


# Usage
from openai import AsyncOpenAI


async def main():
    client = AsyncOpenAI()

    async def process_request(prompt: str) -> str:
        response = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}]
        )
        return response.choices[0].message.content

    queue = PriorityRequestQueue(process_request, max_concurrent=3)

    # Submit requests with different priorities
    tasks = [
        queue.submit("Low priority task 1", Priority.LOW),
        queue.submit("High priority task", Priority.HIGH),
        queue.submit("Normal task 1", Priority.NORMAL),
        queue.submit("Low priority task 2", Priority.LOW),
        queue.submit("Normal task 2", Priority.NORMAL),
    ]

    results = await asyncio.gather(*tasks)
    for result in results:
        print(result[:50] + "...")

asyncio.run(main())
```
Priority queue anti-pattern: Do not use priorities to implement fairness across users. If user A submits 10,000 LOW-priority items and user B submits 1 HIGH-priority item, user B gets served first — but that is not multi-tenant fairness, it is priority scheduling. For multi-tenant fairness, use weighted fair queuing (round-robin across tenants with configurable weights) rather than absolute priority classes.
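Below is a minimal synchronous sketch of the weighted round-robin across tenants described above. It is a simple approximation of weighted fair queuing. The class and method names are hypothetical. In an async service, a dispatcher like the priority queue's would pull the next item from `drain()` whenever a concurrency slot frees up.

```python
from collections import deque
from typing import Any, Iterator


class WeightedFairQueue:
    """Weighted round-robin across tenants.

    Each round, every tenant with queued work may dequeue up to `weight`
    items, so one tenant's huge backlog cannot starve anyone else.
    """

    def __init__(self, weights: dict[str, int], default_weight: int = 1):
        self.weights = dict(weights)
        self.default_weight = default_weight
        self.queues: dict[str, deque] = {}  # one FIFO per tenant, insertion-ordered

    def submit(self, tenant: str, item: Any) -> None:
        self.queues.setdefault(tenant, deque()).append(item)

    def drain(self) -> Iterator[tuple[str, Any]]:
        """Yield (tenant, item) pairs in weighted round-robin order."""
        while any(self.queues.values()):
            for tenant, queue in list(self.queues.items()):
                for _ in range(self.weights.get(tenant, self.default_weight)):
                    if not queue:
                        break
                    yield tenant, queue.popleft()


# Usage: user_a floods the queue with 1,000 items; user_b submits one.
wfq = WeightedFairQueue(weights={"user_a": 1, "user_b": 1})
for i in range(1000):
    wfq.submit("user_a", f"a-{i}")
wfq.submit("user_b", "b-0")
order = [tenant for tenant, _ in wfq.drain()]
print(order[:4])  # ['user_a', 'user_b', 'user_a', 'user_a']
```

User_b is served in the first round instead of after 1,000 items. Raising a tenant's weight (for example `{"paid": 3}`) gives it more turns per round without letting it lock anyone else out.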
For production batch processing, you want session-level resource management and statistics. The context manager pattern (async with) guarantees cleanup happens even when exceptions occur — no leaked connections, no orphaned tasks.

This is the pattern to use when you need to answer questions like “how many tokens did this batch consume?” or “what was the p99 latency for this run?” — questions that matter when your monthly LLM bill has commas in it.
```python
import asyncio
import time
from dataclasses import dataclass
from typing import Optional, Union

from openai import AsyncOpenAI


@dataclass
class SessionStats:
    """Statistics for an async session.

    Track these metrics for every batch run. They tell you whether your
    rate limiting is too aggressive (low throughput), whether your prompts
    are too long (high token counts), and whether a provider is degraded
    (high latency, low success rate).
    """
    requests: int = 0
    successful: int = 0
    failed: int = 0
    total_tokens: int = 0
    total_latency: float = 0.0

    @property
    def avg_latency(self) -> float:
        return self.total_latency / self.requests if self.requests else 0

    @property
    def success_rate(self) -> float:
        return self.successful / self.requests if self.requests else 0


class AsyncLLMSession:
    """Managed async session for LLM operations."""

    def __init__(
        self,
        max_concurrent: int = 10,
        timeout: float = 30.0
    ):
        self.client: Optional[AsyncOpenAI] = None
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.timeout = timeout
        self.stats = SessionStats()

    async def __aenter__(self):
        self.client = AsyncOpenAI()
        self.stats = SessionStats()
        return self

    async def __aexit__(self, *args):
        # Close the underlying HTTP connection pool so no sockets leak,
        # even if the body of the `async with` raised.
        if self.client is not None:
            await self.client.close()
            self.client = None

    async def complete(
        self,
        prompt: str,
        model: str = "gpt-4o-mini"
    ) -> str:
        """Make a completion within the session."""
        if not self.client:
            raise RuntimeError("Session not initialized")

        async with self.semaphore:
            self.stats.requests += 1
            start = time.monotonic()

            try:
                response = await asyncio.wait_for(
                    self.client.chat.completions.create(
                        model=model,
                        messages=[{"role": "user", "content": prompt}]
                    ),
                    timeout=self.timeout
                )

                self.stats.successful += 1
                # usage is optional on the response object, so guard it
                if response.usage is not None:
                    self.stats.total_tokens += response.usage.total_tokens
                self.stats.total_latency += time.monotonic() - start

                return response.choices[0].message.content

            except Exception:
                self.stats.failed += 1
                self.stats.total_latency += time.monotonic() - start
                raise

    async def batch_complete(
        self,
        prompts: list[str],
        model: str = "gpt-4o-mini"
    ) -> list[Union[str, BaseException]]:
        """Complete multiple prompts.

        return_exceptions=True returns failures in place, as exception
        objects, instead of raising the first one.
        """
        tasks = [self.complete(p, model) for p in prompts]
        return await asyncio.gather(*tasks, return_exceptions=True)


# Usage
async def main():
    async with AsyncLLMSession(max_concurrent=5) as session:
        prompts = [f"What is {i} + {i}?" for i in range(10)]
        results = await session.batch_complete(prompts)

        for i, result in enumerate(results):
            if isinstance(result, Exception):
                print(f"Error {i}: {result}")
            else:
                print(f"Result {i}: {result[:30]}...")

        print("\nStats:")
        print(f"  Requests: {session.stats.requests}")
        print(f"  Success rate: {session.stats.success_rate:.1%}")
        print(f"  Avg latency: {session.stats.avg_latency:.2f}s")
        print(f"  Total tokens: {session.stats.total_tokens}")

asyncio.run(main())
```
Async Best Practices for LLMs
Always use connection pooling with async clients
Implement proper timeout handling for all requests
Use semaphores to control maximum concurrent requests
Add jitter to retry delays to prevent thundering herd
Monitor and adapt rate limits based on API responses
You have 50,000 prompts to process through an LLM API with a rate limit of 500 RPM and 200K TPM. Walk through your architecture.
What interviewers are testing: Whether you can design a real batch processing pipeline with concrete numbers, not just describe patterns abstractly.

Strong answer: Start with the math. At 500 RPM, the request limit alone puts a floor of 100 minutes (50,000 / 500) on the job, no matter how much concurrency you add. With an average of 400 tokens per request (prompt + completion), the token limit also works out to 500 requests per minute (200,000 / 400). Both limits therefore bind at the same point. If requests average more than 400 tokens, TPM becomes the binding constraint.

Architecture: Use a producer-consumer pattern with an async queue. The producer reads prompts from a file or database in chunks (not all 50K into memory). Workers (20-50 concurrent coroutines gated by a semaphore) pull from the queue, make rate-limited API calls, and write results to an output queue.

Rate limiting: Use a token bucket with capacity=500 and a refill rate of ~8.3/second for requests. Add a second bucket for tokens (capacity=200,000, refill ~3,333/second). Add a semaphore at 30-50 concurrent connections. Together these ensure you respect RPM, TPM, and connection limits.

Resilience: Implement checkpointing every 500 items by writing (index, result) pairs to a JSONL file, so you can resume from the last checkpoint after a crash. Add exponential backoff with jitter for 429 and 5xx errors. After 5 retries, move failed items to a dead letter list for manual inspection.

Cost control: Before starting, estimate total cost using tiktoken. For 50K prompts at ~400 tokens each, that’s 20M tokens. At gpt-4o-mini pricing ($0.15/1M input tokens, $0.60/1M output tokens), that’s roughly $3-12 depending on how those tokens split between input and output. Consider the Batch API for 50% savings if 24-hour turnaround is acceptable.

Monitoring: Log throughput (requests/sec), error rate, p50/p99 latency, and cumulative cost in real time. Set an alert if error rate exceeds 5%, which usually indicates a systemic issue rather than random failures.
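The architecture above can be sketched as a producer-consumer loop with JSONL checkpointing and resume. This is a minimal version under stated assumptions. `run_pipeline` and `fake_llm` are hypothetical names. The worker count stands in for the semaphore. Rate limiters and backoff retries are left as a comment at the call site. Failed items go to an in-memory dead-letter dict and are not checkpointed, so the next run retries them.

```python
import asyncio
import json
import os
import tempfile
from typing import Awaitable, Callable, Iterable


async def run_pipeline(
    prompts: Iterable[str],
    call_llm: Callable[[str], Awaitable[str]],
    checkpoint_path: str,
    num_workers: int = 20,
    checkpoint_every: int = 500,
) -> tuple[dict[int, str], dict[int, str]]:
    """Producer-consumer batch run with JSONL checkpoints; returns (done, failed)."""
    # Resume: reload (index, result) pairs written by an earlier run.
    done: dict[int, str] = {}
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as f:
            for line in f:
                record = json.loads(line)
                done[record["index"]] = record["result"]

    failed: dict[int, str] = {}  # dead-letter list: index -> error text
    unflushed: list[dict] = []   # finished results not yet on disk
    # Bounded queue: the producer can't read far ahead of the workers.
    queue: asyncio.Queue = asyncio.Queue(maxsize=num_workers * 2)

    def flush() -> None:
        with open(checkpoint_path, "a") as f:
            for record in unflushed:
                f.write(json.dumps(record) + "\n")
        unflushed.clear()

    async def producer() -> None:
        for index, prompt in enumerate(prompts):
            if index not in done:  # skip work finished before a crash
                await queue.put((index, prompt))
        for _ in range(num_workers):
            await queue.put(None)  # one stop sentinel per worker

    async def worker() -> None:
        while (job := await queue.get()) is not None:
            index, prompt = job
            try:
                # A real worker would pass through the RPM/TPM buckets and
                # retry 429/5xx errors with exponential backoff and jitter here.
                result = await call_llm(prompt)
            except Exception as exc:
                failed[index] = repr(exc)
                continue
            done[index] = result
            unflushed.append({"index": index, "result": result})
            if len(unflushed) >= checkpoint_every:
                flush()

    await asyncio.gather(producer(), *(worker() for _ in range(num_workers)))
    flush()  # persist the final partial chunk
    return done, failed


# Usage with a fake LLM call standing in for a rate-limited AsyncOpenAI call.
async def fake_llm(prompt: str) -> str:
    await asyncio.sleep(0.001)
    return prompt.upper()


if __name__ == "__main__":
    path = os.path.join(tempfile.mkdtemp(), "checkpoint.jsonl")
    done, failed = asyncio.run(
        run_pipeline([f"prompt {i}" for i in range(50)], fake_llm, path,
                     num_workers=5, checkpoint_every=10)
    )
    print(f"{len(done)} done, {len(failed)} failed")
```

Because `prompts` is any iterable, the producer can stream from a generator over a file without loading all 50K prompts into memory.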
What is the thundering herd problem in the context of LLM API retries, and how do you solve it?
What interviewers are testing: Understanding of distributed systems failure modes and the specific amplification risks with expensive API calls.

Strong answer: The thundering herd occurs when many clients experience a failure simultaneously and all retry at the same moment, creating a spike that’s worse than the original load. With LLM APIs, this is especially dangerous because each retry costs real money.

Scenario: Your application sends 100 concurrent requests. The provider returns 429 for all of them. If all 100 retry after exactly 1 second, you hit the API with 100 requests again — plus any new organic traffic. This creates a feedback loop where retries cause more rate limits, which cause more retries.

Solution 1 — Jitter: Add randomness to retry delays. Instead of “retry after 2^n seconds,” use “retry after 2^n * random(0.5, 1.5) seconds.” This spreads retries across the window. Full jitter (a uniformly random delay between 0 and the capped exponential delay) spreads them even further, because each retry can land anywhere in the window rather than within ±50% of the nominal delay.

Solution 2 — Circuit breaker: If error rate exceeds a threshold (e.g., 50% of requests failing), stop sending new requests entirely for a cooldown period. This prevents the retry storm from growing. After the cooldown, send a single probe request; if it succeeds, gradually ramp traffic back up.

Solution 3 — Adaptive rate limiting with AIMD: On success, increase rate by a small constant. On failure, halve the rate immediately. This does not settle on one fixed rate. It converges to a sawtooth that oscillates just below the available capacity, which is the same algorithm TCP uses for congestion control, and for the same reason.

The key insight is that jitter alone isn’t enough at scale. You need all three layers working together: jitter smooths individual retries, circuit breaking prevents cascade, and adaptive limiting finds the new sustainable rate.
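Here is a minimal sketch of the circuit breaker from Solution 2. The class names are hypothetical, and several policy choices are assumptions. It judges the failure rate over the last `window` results, and only once that window is full. It allows exactly one probe after the cooldown. It closes fully when the probe succeeds, leaving the gradual ramp-up to a rate limiter. The injectable `clock` exists so the state machine can be tested without sleeping.

```python
import time
from collections import deque
from typing import Callable, Optional


class CircuitOpenError(Exception):
    """Raised instead of sending a request while the circuit is open."""


class CircuitBreaker:
    """Failure-rate circuit breaker: closed -> open -> half-open -> closed.

    Per request: call before_call(), send the request, then call
    record_success() or record_failure().
    """

    def __init__(
        self,
        failure_threshold: float = 0.5,  # trip when >= 50% of recent calls fail
        window: int = 20,                # how many recent results to judge by
        cooldown: float = 30.0,          # seconds to stay open before probing
        clock: Callable[[], float] = time.monotonic,
    ):
        self.failure_threshold = failure_threshold
        self.window = window
        self.cooldown = cooldown
        self.clock = clock
        self.results = deque(maxlen=window)  # True = success, False = failure
        self.opened_at: Optional[float] = None
        self.probe_in_flight = False

    @property
    def state(self) -> str:
        if self.opened_at is None:
            return "closed"
        if self.clock() - self.opened_at >= self.cooldown:
            return "half-open"
        return "open"

    def before_call(self) -> None:
        state = self.state
        if state == "open" or (state == "half-open" and self.probe_in_flight):
            raise CircuitOpenError(f"circuit {state}; request not sent")
        if state == "half-open":
            self.probe_in_flight = True  # let exactly one probe through

    def record_success(self) -> None:
        if self.probe_in_flight:
            # Probe succeeded: close the circuit and start a clean window.
            self.opened_at, self.probe_in_flight = None, False
            self.results.clear()
        elif self.opened_at is None:
            self.results.append(True)
        # else: a stale request from before the trip finished; ignore it

    def record_failure(self) -> None:
        if self.probe_in_flight:
            # Probe failed: stay open for another full cooldown.
            self.opened_at, self.probe_in_flight = self.clock(), False
        elif self.opened_at is None:
            self.results.append(False)
            failures = self.results.count(False)
            # Only judge a full window, so one early failure can't trip it.
            if len(self.results) == self.window and failures / self.window >= self.failure_threshold:
                self.opened_at = self.clock()
```

Failing fast with `CircuitOpenError` is what stops the storm. Callers stop queuing retries against an API that is already refusing them.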
How would you implement graceful shutdown for an async LLM processing pipeline that has in-flight requests?
What interviewers are testing: Production engineering maturity around data loss prevention and clean shutdown semantics.

Strong answer: Graceful shutdown means: stop accepting new work, finish in-flight work, save state for incomplete work, and exit cleanly. The goal is zero data loss — every prompt should either be fully processed with its result saved, or clearly marked as unprocessed for the next run.

Implementation: Register a signal handler for SIGTERM/SIGINT that sets a shutdown flag. The producer stops feeding new items to the queue. Workers finish their current request, because cancelling an in-flight LLM call throws away its result along with the money already spent on it. Set a shutdown timeout (e.g., 60 seconds). If workers haven’t finished by then, log unfinished items and force exit.

For the checkpointing layer, flush the current checkpoint immediately on shutdown signal. Any items that were dequeued but not yet processed go back to “pending” in the checkpoint file. On the next run, the checkpoint loader picks them up automatically.

The tricky part is the race condition between “worker finishes request” and “checkpoint flushes.” Keep a counter of in-flight requests. In single-threaded asyncio a plain integer is enough; it only needs to be atomic if threads or processes share it. The shutdown sequence waits until the in-flight count reaches zero (with timeout), then flushes the final checkpoint, then exits.

In Kubernetes, configure terminationGracePeriodSeconds to be longer than your longest expected LLM call (60-120 seconds) and longer than your shutdown timeout. SIGTERM arrives first; SIGKILL follows after the grace period.
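A minimal sketch of that shutdown sequence follows, under stated assumptions. Function names are hypothetical. `loop.add_signal_handler` works only on Unix event loops. Instead of a separate in-flight counter, workers check the stop flag only between items, so a call that has started always finishes. A failed item is simply left pending. The returned pending indices are what you would flush to the checkpoint.

```python
import asyncio
import signal
from typing import Any, Awaitable, Callable, Sequence


def install_shutdown_handlers(stop: asyncio.Event) -> None:
    """Unix-only: turn SIGTERM/SIGINT into a stop request (call inside the loop)."""
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, stop.set)


async def run_until_stopped(
    items: Sequence[Any],
    process: Callable[[Any], Awaitable[Any]],
    stop: asyncio.Event,
    num_workers: int = 4,
    shutdown_timeout: float = 60.0,
) -> tuple[dict[int, Any], list[int]]:
    """Process items until done or `stop` is set; returns (results, pending indices)."""
    queue: asyncio.Queue = asyncio.Queue()
    for index, item in enumerate(items):
        queue.put_nowait((index, item))
    results: dict[int, Any] = {}

    async def worker() -> None:
        # The flag is checked only between items, so an in-flight call finishes.
        while not stop.is_set():
            try:
                index, item = queue.get_nowait()
            except asyncio.QueueEmpty:
                return
            try:
                results[index] = await process(item)
            except Exception:
                continue  # leave it pending so the next run retries it (log in real code)

    workers = [asyncio.create_task(worker()) for _ in range(num_workers)]
    all_done = asyncio.create_task(asyncio.wait(workers))
    stop_requested = asyncio.create_task(stop.wait())
    await asyncio.wait({all_done, stop_requested}, return_when=asyncio.FIRST_COMPLETED)
    all_done.cancel()  # cancels only this watcher, not the workers
    stop_requested.cancel()

    if stop.is_set():
        # Bounded grace period for in-flight calls, then force the stragglers.
        _, still_running = await asyncio.wait(workers, timeout=shutdown_timeout)
        for task in still_running:
            task.cancel()
        await asyncio.gather(*still_running, return_exceptions=True)

    pending = [i for i in range(len(items)) if i not in results]
    return results, pending  # flush both to the checkpoint file here


# Demo: a fake 10 ms "LLM call"; shutdown is requested after 50 ms.
async def demo_shutdown() -> tuple[dict[int, Any], list[int]]:
    stop = asyncio.Event()
    asyncio.get_running_loop().call_later(0.05, stop.set)

    async def fake_call(x: int) -> int:
        await asyncio.sleep(0.01)
        return x * 2

    return await run_until_stopped(list(range(100)), fake_call, stop, shutdown_timeout=1.0)
```

In a real entry point you would create the event, call `install_shutdown_handlers(stop)` from inside the running loop, and pass the same event to `run_until_stopped`.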
Compare asyncio.gather, asyncio.as_completed, and asyncio.TaskGroup for batch LLM processing. When would you choose each?
What interviewers are testing: Depth of understanding of Python’s async primitives beyond the basic tutorial level.

Strong answer: These three serve different use cases for concurrent LLM requests.

asyncio.gather(*tasks) runs all tasks concurrently and returns results in input order. Use it when you need results aligned with inputs (e.g., processing a CSV where row N’s result must go in output row N). Downside: you get no results until ALL tasks complete, so one slow response delays everything.

asyncio.as_completed(tasks) returns an iterator of awaitables that produce results in completion order, regardless of input order. Use it when you want to process results as they arrive — for example, streaming partial progress to a UI, or writing results to a database as soon as each is ready. This is better for user-facing progress bars and for reducing peak memory (you can free each result immediately).

asyncio.TaskGroup (Python 3.11+) is the modern replacement for gather() with better error handling. If any task raises an exception, it cancels all remaining tasks and raises an ExceptionGroup. This is the right choice when tasks are interdependent — if one fails, continuing the others is pointless (e.g., parallel retrieval calls in a RAG pipeline that must all succeed before the answer can be assembled).

For batch LLM processing specifically: use as_completed with a semaphore for large batches (thousands of items) because you can checkpoint results incrementally. Use gather for small parallel operations (3-10 items) where simplicity matters. Use TaskGroup for orchestration workflows where partial completion is meaningless.
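One detail makes `as_completed` practical for batches: results arrive in completion order, so each one needs to carry its input index. The sketch below tags each result that way. Each result can be handled (checkpointed, shown in a progress bar) the moment it lands, and input order can still be restored at the end. `fake_llm` is a hypothetical stand-in whose random latency scrambles completion order.

```python
import asyncio
import random


async def fake_llm(prompt: str) -> str:
    """Stand-in for an API call; random latency scrambles completion order."""
    await asyncio.sleep(random.uniform(0.001, 0.02))
    return prompt.upper()


async def process_as_completed(
    prompts: list[str], concurrency: int = 10
) -> tuple[list[str], list[int]]:
    """Handle results in completion order, then return them in input order."""
    sem = asyncio.Semaphore(concurrency)

    async def tagged(index: int, prompt: str) -> tuple[int, str]:
        # Carry the input index with the result so order can be restored.
        async with sem:
            return index, await fake_llm(prompt)

    outputs: list[str] = [""] * len(prompts)
    completion_order: list[int] = []
    tasks = [asyncio.create_task(tagged(i, p)) for i, p in enumerate(prompts)]
    for next_result in asyncio.as_completed(tasks):
        index, result = await next_result
        # A real pipeline would checkpoint or update a progress bar here,
        # one result at a time.
        outputs[index] = result
        completion_order.append(index)
    return outputs, completion_order
```

`completion_order` usually differs from input order, while `outputs` always lines up with `prompts`. You get the incremental handling of `as_completed` and the alignment of `gather` in one pass.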