Production LLM applications require handling multiple concurrent requests efficiently. This chapter covers async patterns, rate limiting, and strategies for scaling LLM operations: the same infrastructure that separates toy demos from systems handling real traffic.

The core insight is this: LLM API calls are I/O-bound, not CPU-bound. While you’re waiting 2-15 seconds for a model to generate a response, your CPU is doing nothing. Async programming lets you fire off dozens of those calls simultaneously, turning sequential 2-minute workflows into 15-second parallel bursts. If you’re processing 1,000 prompts sequentially at 3 seconds each, that’s 50 minutes. With async concurrency of 20, it’s about 2.5 minutes: same cost, same results, 20x faster.

Think of it like ordering food for a large group: you don’t wait for person #1’s meal to arrive before ordering for person #2. You send all the orders to the kitchen at once and let them come back as they’re ready.
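The arithmetic above can be checked with a short calculation. This is a minimal sketch that assumes every call takes the same fixed latency and that requests run in full waves of `concurrency`; the function name `estimated_wall_time` is hypothetical, and real latencies vary from call to call.

```python
import math


def estimated_wall_time(num_prompts: int, latency_s: float, concurrency: int) -> float:
    """Rough wall-clock estimate, assuming uniform latency per call.

    Requests are modeled as running in waves of `concurrency`, so the
    total is (number of waves) * (latency of one call).
    """
    waves = math.ceil(num_prompts / concurrency)
    return waves * latency_s


sequential = estimated_wall_time(1000, 3.0, concurrency=1)
parallel = estimated_wall_time(1000, 3.0, concurrency=20)

print(f"Sequential: {sequential / 60:.1f} min")    # 3000 s = 50.0 min
print(f"Concurrency 20: {parallel / 60:.1f} min")  # 150 s = 2.5 min
```

The estimate ignores rate limits, which usually become the real ceiling long before concurrency does.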

Async Fundamentals

Basic Async LLM Calls

import asyncio
from openai import AsyncOpenAI


async def async_completion(prompt: str) -> str:
    """Make an async completion request.
    
    Note: We use AsyncOpenAI (not OpenAI) because the synchronous
    client blocks the entire event loop during the HTTP call,
    preventing any other coroutines from making progress.
    """
    # Creating the client inside the function works for demos,
    # but in production, share one client across calls to reuse
    # the underlying HTTP connection pool.
    client = AsyncOpenAI()
    
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}]
    )
    
    return response.choices[0].message.content


async def main():
    result = await async_completion("Explain async programming.")
    print(result)


# asyncio.run() creates an event loop, runs main(), then cleans up.
# Never call this inside an already-running loop (e.g., Jupyter notebooks
# use 'await main()' instead).
asyncio.run(main())

Parallel Requests with gather

asyncio.gather() is the workhorse for firing multiple LLM calls simultaneously. It takes any number of awaitables (unpack a list with *), runs them all concurrently, and returns results in the same order as the input. This is the difference between “ask 4 questions one at a time” and “ask all 4 at once.”

Critical production warning: gather() with no concurrency limit will fire ALL requests simultaneously. If you have 1,000 prompts, that’s 1,000 concurrent API calls, and you’ll hit rate limits almost immediately. Always pair gather() with a semaphore (covered below) for production workloads.
import asyncio
from openai import AsyncOpenAI
from dataclasses import dataclass
from typing import Optional


@dataclass
class CompletionResult:
    """Result from an async completion.
    
    Wrapping results in a dataclass lets us track success/failure
    per-item rather than having one failure blow up the entire batch.
    """
    prompt: str
    response: str
    success: bool
    error: Optional[str] = None  # Set only when success is False


async def process_prompt(
    client: AsyncOpenAI,
    prompt: str
) -> CompletionResult:
    """Process a single prompt.
    
    Notice: we catch exceptions and return them as data rather than
    letting them propagate. This is intentional -- with gather(), the
    first unhandled exception is raised to the caller, and the results
    of every other task are lost even though those tasks keep running.
    """
    try:
        response = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}]
        )
        
        return CompletionResult(
            prompt=prompt,
            response=response.choices[0].message.content,
            success=True
        )
    except Exception as e:
        return CompletionResult(
            prompt=prompt,
            response="",
            success=False,
            error=str(e)
        )


async def process_batch(prompts: list[str]) -> list[CompletionResult]:
    """Process multiple prompts in parallel.
    
    All prompts fire concurrently. For 4 prompts at 3 seconds each,
    total wall time is ~3 seconds (not 12).
    """
    # Share one client instance across all tasks -- this reuses the
    # underlying HTTP connection pool (httpx), which is much more
    # efficient than creating a new TLS connection per request.
    client = AsyncOpenAI()
    
    tasks = [process_prompt(client, prompt) for prompt in prompts]
    results = await asyncio.gather(*tasks)
    
    return results


# Usage
prompts = [
    "Summarize machine learning in one sentence.",
    "What is deep learning?",
    "Explain neural networks briefly.",
    "Define natural language processing.",
]

results = asyncio.run(process_batch(prompts))

for result in results:
    if result.success:
        print(f"Q: {result.prompt[:30]}...")
        print(f"A: {result.response[:100]}...\n")
    else:
        print(f"Error: {result.error}")
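The ordering guarantee and the error-handling choice described above can be observed without calling any API. The sketch below uses a hypothetical `fake_call` coroutine that sleeps in place of a real request, and it shows `return_exceptions=True` as an alternative to catching exceptions inside each task.

```python
import asyncio


async def fake_call(name: str, delay: float, fail: bool = False) -> str:
    """Stand-in for an LLM call: sleeps, then returns or raises."""
    await asyncio.sleep(delay)
    if fail:
        raise RuntimeError(f"{name} failed")
    return f"{name} done"


async def demo() -> list:
    # The slowest task is listed first; gather() still returns results
    # in input order, not completion order.
    return await asyncio.gather(
        fake_call("a", 0.03),
        fake_call("b", 0.01, fail=True),
        fake_call("c", 0.02),
        return_exceptions=True,  # exceptions come back as values
    )


results = asyncio.run(demo())
for item in results:
    print(type(item).__name__, item)
```

With `return_exceptions=True` the caller has to check each entry with `isinstance(item, Exception)`, whereas the dataclass approach keeps the prompt and the error together in one record.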

Rate Limiting

Without rate limiting, your async code will happily fire thousands of concurrent requests in milliseconds — and the API provider will just as happily reject most of them with 429 (Too Many Requests) errors. Rate limiting is not optional; it’s the difference between “10x faster batch processing” and “10x faster at generating error responses.”

Token Bucket Rate Limiter

The token bucket is the gold standard algorithm for API rate limiting. The analogy: imagine a bucket that holds tokens. Tokens drip in at a steady rate (say, 1 per second). Each API call costs one token. If the bucket is empty, you wait. If it’s full, you can burst up to the bucket’s capacity. This is better than a simple “N requests per second” limit because it allows natural bursting: a user who was idle for 10 seconds has accumulated tokens and can fire a quick burst — which matches how humans actually use APIs.
import asyncio
import time
from dataclasses import dataclass, field


@dataclass
class TokenBucket:
    """Token bucket rate limiter.
    
    Think of this as a leaky bucket in reverse: tokens drip IN
    at a constant rate, and each request drains tokens OUT.
    """
    
    capacity: float       # Max tokens the bucket can hold (burst size)
    refill_rate: float    # Tokens added per second (sustained rate)
    tokens: float = field(init=False)
    last_refill: float = field(init=False)
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock, repr=False)
    
    def __post_init__(self):
        self.tokens = self.capacity
        self.last_refill = time.monotonic()
    
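    async def acquire(self, tokens: float = 1) -> float:
        """Acquire tokens, waiting if necessary. Returns wait time."""
        if tokens > self.capacity:
            # The bucket never holds this many tokens, so waiting cannot help
            raise ValueError("requested tokens exceed bucket capacity")
        
        async with self._lock:
            await self._refill()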
            
            if self.tokens >= tokens:
                self.tokens -= tokens
                return 0
            
            # Calculate wait time
            tokens_needed = tokens - self.tokens
            wait_time = tokens_needed / self.refill_rate
            
            await asyncio.sleep(wait_time)
            await self._refill()
            
            self.tokens -= tokens
            return wait_time
    
    async def _refill(self):
        """Refill tokens based on elapsed time."""
        now = time.monotonic()
        elapsed = now - self.last_refill
        
        self.tokens = min(
            self.capacity,
            self.tokens + elapsed * self.refill_rate
        )
        self.last_refill = now


class RateLimitedClient:
    """OpenAI client with rate limiting.
    
    LLM APIs enforce limits on TWO dimensions: request count and
    token count. You can hit the token limit with few large requests
    or the request limit with many small ones. This client guards
    against both.
    """
    
    def __init__(
        self,
        requests_per_minute: int = 60,
        tokens_per_minute: int = 90000
    ):
        from openai import AsyncOpenAI
        
        self.client = AsyncOpenAI()
        
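        # Two separate rate limiters -- you must pass both gates
        # before a request is allowed through.
        # Caveat: each bucket starts full, so the first minute can admit
        # up to ~2x the per-minute limit (initial burst + steady refill).
        # Use a smaller capacity if the provider enforces strict windows.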
        self.request_limiter = TokenBucket(
            capacity=requests_per_minute,
            refill_rate=requests_per_minute / 60  # Convert to per-second
        )
        self.token_limiter = TokenBucket(
            capacity=tokens_per_minute,
            refill_rate=tokens_per_minute / 60
        )
    
    async def complete(
        self,
        messages: list[dict],
        model: str = "gpt-4o-mini",
        estimated_tokens: int = 500
    ) -> str:
        """Make a rate-limited completion request."""
        # Acquire rate limit tokens
        await self.request_limiter.acquire(1)
        await self.token_limiter.acquire(estimated_tokens)
        
        response = await self.client.chat.completions.create(
            model=model,
            messages=messages
        )
        
        return response.choices[0].message.content


# Usage
async def main():
    client = RateLimitedClient(
        requests_per_minute=60,
        tokens_per_minute=90000
    )
    
    prompts = [f"Count to {i}" for i in range(1, 21)]
    
    async def process(prompt):
        return await client.complete([{"role": "user", "content": prompt}])
    
    results = await asyncio.gather(*[process(p) for p in prompts])
    print(f"Processed {len(results)} requests")


asyncio.run(main())
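To see the burst-then-refill behavior without waiting on real time, the bucket logic can be driven by an explicit clock value. This is an illustrative sketch only: `SimulatedBucket` and `try_acquire` are hypothetical names, and unlike `TokenBucket` above it rejects a request instead of sleeping.

```python
from dataclasses import dataclass


@dataclass
class SimulatedBucket:
    """Token bucket driven by an explicit clock value (no real sleeping)."""
    capacity: float
    refill_rate: float  # tokens per second
    tokens: float = 0.0
    last_refill: float = 0.0

    def __post_init__(self) -> None:
        self.tokens = self.capacity  # start full, like TokenBucket above

    def try_acquire(self, now: float, cost: float = 1.0) -> bool:
        """Refill for the elapsed time, then spend `cost` tokens if available."""
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
        self.last_refill = now
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False


bucket = SimulatedBucket(capacity=5, refill_rate=1.0)

# Ten back-to-back requests at t=0: only the burst capacity gets through.
burst = [bucket.try_acquire(now=0.0) for _ in range(10)]
print(burst.count(True))   # 5 admitted, 5 rejected

# Two seconds later, two tokens have dripped back in.
later = [bucket.try_acquire(now=2.0) for _ in range(3)]
print(later)               # [True, True, False]
```

Passing the clock in as an argument also makes the limiter straightforward to unit test, since no test has to sleep.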

Sliding Window Rate Limiter

The sliding window approach tracks actual request timestamps rather than maintaining abstract token counts. It’s conceptually simpler and gives you exact enforcement: “no more than N requests in any rolling T-second window.” The tradeoff versus token bucket is that idle time earns no extra credit: once 10 requests fall inside a 10-request window, the 11th always waits until the oldest timestamp expires, whereas a token bucket that starts full can admit its burst capacity plus the refill in the same window.
import asyncio
import time
from collections import deque
from dataclasses import dataclass, field


@dataclass
class SlidingWindowLimiter:
    """Sliding window rate limiter.
    
    Uses a deque of timestamps to track when requests were made.
    Old timestamps outside the window are pruned on each check.
    Memory usage is bounded by max_requests (one timestamp per request).
    """
    
    max_requests: int
    window_seconds: float
    timestamps: deque = field(default_factory=deque)
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock, repr=False)
    
    async def acquire(self) -> float:
        """Acquire permission to make a request.
        
        Returns the time spent waiting (0 if no wait was needed).
        Useful for monitoring how much throttling is occurring.
        """
        async with self._lock:
            now = time.monotonic()
            
            # Evict timestamps that have fallen outside the window.
            # popleft() is O(1) on deque, so this stays fast.
            cutoff = now - self.window_seconds
            while self.timestamps and self.timestamps[0] < cutoff:
                self.timestamps.popleft()
            
            if len(self.timestamps) < self.max_requests:
                self.timestamps.append(now)
                return 0
            
            # Window is full -- wait until the oldest entry expires
            oldest = self.timestamps[0]
            wait_time = oldest + self.window_seconds - now
            
            if wait_time > 0:
                await asyncio.sleep(wait_time)
            
            # After waiting, evict expired entries again. The lock was held
            # during the sleep, so no other caller changed the deque meanwhile.
            now = time.monotonic()
            cutoff = now - self.window_seconds
            while self.timestamps and self.timestamps[0] < cutoff:
                self.timestamps.popleft()
            
            self.timestamps.append(now)
            return wait_time


# Usage
limiter = SlidingWindowLimiter(max_requests=10, window_seconds=1.0)

async def rate_limited_request(i: int):
    wait = await limiter.acquire()
    if wait > 0:
        print(f"Request {i} waited {wait:.2f}s")
    print(f"Request {i} executing")
    return i

async def main():
    tasks = [rate_limited_request(i) for i in range(20)]
    await asyncio.gather(*tasks)

asyncio.run(main())
When to use which rate limiter: Token bucket is the default choice for most LLM workloads because it handles bursty traffic gracefully and composes well with dual RPM/TPM limits. Use sliding window when you need strict guarantees (e.g., “never more than 10 requests in any 1-second window”) or when auditing requires exact request-level timestamps.
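The strict guarantee of the sliding window is easiest to see with explicit timestamps. The sketch below mirrors the eviction logic of `SlidingWindowLimiter` but takes the current time as an argument; `wait_needed` is a hypothetical helper written for illustration, not part of the limiter above.

```python
from collections import deque


def wait_needed(timestamps: deque, now: float, max_requests: int, window: float) -> float:
    """Return how long a request arriving at `now` must wait (0 if admitted).

    Admitted requests are recorded in `timestamps`; a request that has
    to wait is not recorded by this helper.
    """
    cutoff = now - window
    while timestamps and timestamps[0] < cutoff:
        timestamps.popleft()
    if len(timestamps) < max_requests:
        timestamps.append(now)
        return 0.0
    return timestamps[0] + window - now


log: deque = deque()

# Ten requests spread across the first 0.45 s fill the window.
for i in range(10):
    wait_needed(log, now=i * 0.05, max_requests=10, window=1.0)

# The 11th request at t=0.5 waits until the oldest entry (t=0.0) expires.
blocked_wait = wait_needed(log, now=0.5, max_requests=10, window=1.0)
print(blocked_wait)   # 0.5

# After a long idle gap every old entry is evicted and no wait is needed.
idle_wait = wait_needed(log, now=5.0, max_requests=10, window=1.0)
print(idle_wait)      # 0.0
```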

Backoff Strategies

When an LLM API returns a rate limit error or a transient server error, the worst thing you can do is immediately retry. If 100 of your concurrent requests all get rate-limited at the same time and all retry after exactly 1 second, you’ve just created a “thundering herd” that hits the API with 100 simultaneous requests again, and gets rate-limited again. The cycle repeats.

Exponential backoff with jitter solves this by spreading retries over time. Each successive retry waits longer (exponential), and each wait is randomized within a range (jitter). The result: your 100 retries spread out over a window that widens with every attempt instead of slamming the API all at once.

Exponential Backoff with Jitter

import asyncio
import random
from typing import TypeVar, Callable, Awaitable
from dataclasses import dataclass
from openai import RateLimitError, APIError, APIConnectionError

T = TypeVar("T")


@dataclass
class BackoffConfig:
    """Configuration for exponential backoff.
    
    The defaults here are reasonable for most LLM APIs:
    - Start with 1s delay (not too aggressive)
    - Cap at 60s (beyond this, the request is probably stale)
    - 5 attempts means up to 4 waits: 1 + 2 + 4 + 8 = 15s nominal
      (roughly 7.5s to 22.5s once jitter is applied)
    - Jitter prevents thundering herd on shared rate limits
    """
    
    initial_delay: float = 1.0
    max_delay: float = 60.0
    max_retries: int = 5
    exponential_base: float = 2.0
    jitter: bool = True  # Always use jitter in production


async def with_exponential_backoff(
    func: Callable[[], Awaitable[T]],
    config: BackoffConfig = None,
    retryable_exceptions: tuple = (RateLimitError, APIConnectionError)
) -> T:
    """Execute function with exponential backoff on failures."""
    config = config or BackoffConfig()
    
    last_exception = None
    
    for attempt in range(config.max_retries):
        try:
            return await func()
            
        except retryable_exceptions as e:
            last_exception = e
            
            if attempt == config.max_retries - 1:
                raise
            
            # Calculate delay
            delay = min(
                config.initial_delay * (config.exponential_base ** attempt),
                config.max_delay
            )
            
            # Add jitter
            if config.jitter:
                delay = delay * (0.5 + random.random())
            
            print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.2f}s")
            await asyncio.sleep(delay)
        
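        except APIError as e:
            # Don't retry client errors (4xx). Not every APIError carries
            # a status code, so read it defensively.
            status_code = getattr(e, "status_code", None)
            if status_code and 400 <= status_code < 500:
                raise
            
            last_exception = e
            
            if attempt == config.max_retries - 1:
                raise
            
            # Same capped, jittered delay as the branch above
            delay = min(
                config.initial_delay * (config.exponential_base ** attempt),
                config.max_delay
            )
            if config.jitter:
                delay = delay * (0.5 + random.random())
            await asyncio.sleep(delay)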
    
    raise last_exception


# Usage
from openai import AsyncOpenAI

async def main():
    client = AsyncOpenAI()
    
    async def make_request():
        response = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": "Hello!"}]
        )
        return response.choices[0].message.content
    
    result = await with_exponential_backoff(
        make_request,
        BackoffConfig(initial_delay=1.0, max_retries=3)
    )
    print(result)

asyncio.run(main())
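It can help to print the delay schedule a given configuration produces before relying on it. The following sketch assumes the same jitter rule as the code above, `delay * (0.5 + random.random())`, so each wait falls between 0.5x and 1.5x of its nominal value; `backoff_bounds` is a hypothetical helper name.

```python
def backoff_bounds(initial_delay, exponential_base, max_delay, max_attempts):
    """Return (nominal, low, high) for each wait between attempts.

    Assumes jitter multiplies the capped delay by a factor in [0.5, 1.5).
    The last attempt has no wait after it, so there are max_attempts - 1 waits.
    """
    schedule = []
    for attempt in range(max_attempts - 1):
        nominal = min(initial_delay * exponential_base ** attempt, max_delay)
        schedule.append((nominal, 0.5 * nominal, 1.5 * nominal))
    return schedule


schedule = backoff_bounds(1.0, 2.0, 60.0, max_attempts=5)
for nominal, low, high in schedule:
    print(f"nominal {nominal:>4.1f}s  jittered {low:.1f}s to {high:.1f}s")

total_nominal = sum(nominal for nominal, _, _ in schedule)
print(f"total nominal wait: {total_nominal:.0f}s")  # 1 + 2 + 4 + 8 = 15s
```

Because the jitter is applied after the cap, a single wait can reach up to 1.5 times `max_delay`.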

Adaptive Rate Limiting

Static rate limits are a guess. Adaptive rate limiting observes how the API actually responds and adjusts in real-time. When requests succeed, it cautiously increases the rate. When it gets rate-limited, it aggressively backs off. Think of it like a driver adjusting speed based on traffic conditions rather than always driving at exactly 60mph. This pattern is especially valuable when you don’t know the exact rate limit (many providers don’t publish them), when limits change based on server load, or when you’re sharing a rate limit pool with other tenants in your organization.
import asyncio
import time
from dataclasses import dataclass, field
from openai import RateLimitError  # Caught in AdaptiveClient.complete()


@dataclass
class AdaptiveRateLimiter:
    """Rate limiter that adapts based on API responses.
    
    The asymmetry between increase and decrease is intentional:
    we increase slowly (1.1x) but decrease fast (0.5x). This mirrors
    the AIMD pattern (Additive Increase, Multiplicative Decrease)
    from TCP congestion control, except that the increase here is
    multiplicative (1.1x) rather than additive. The cautious ramp-up
    and sharp back-off tend to keep the rate just below the real limit.
    """
    
    initial_rate: float  # Requests per second
    min_rate: float = 0.1
    max_rate: float = 100.0
    increase_factor: float = 1.1
    decrease_factor: float = 0.5
    
    current_rate: float = field(init=False)
    last_request: float = field(init=False)
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock, repr=False)
    
    def __post_init__(self):
        self.current_rate = self.initial_rate
        self.last_request = 0
    
    async def acquire(self):
        """Wait for rate limit slot."""
        async with self._lock:
            now = time.monotonic()
            min_interval = 1.0 / self.current_rate
            elapsed = now - self.last_request
            
            if elapsed < min_interval:
                await asyncio.sleep(min_interval - elapsed)
            
            self.last_request = time.monotonic()
    
    def on_success(self):
        """Increase rate on successful request."""
        self.current_rate = min(
            self.current_rate * self.increase_factor,
            self.max_rate
        )
    
    def on_rate_limit(self):
        """Decrease rate on rate limit error."""
        self.current_rate = max(
            self.current_rate * self.decrease_factor,
            self.min_rate
        )
        print(f"Rate limited. Reducing to {self.current_rate:.2f} req/s")
    
    def on_error(self):
        """Slightly decrease rate on other errors."""
        self.current_rate = max(
            self.current_rate * 0.9,
            self.min_rate
        )


class AdaptiveClient:
    """OpenAI client with adaptive rate limiting."""
    
    def __init__(self, initial_rate: float = 10.0):
        from openai import AsyncOpenAI
        
        self.client = AsyncOpenAI()
        self.limiter = AdaptiveRateLimiter(initial_rate=initial_rate)
    
    async def complete(self, prompt: str) -> str:
        """Make a completion with adaptive rate limiting."""
        await self.limiter.acquire()
        
        try:
            response = await self.client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[{"role": "user", "content": prompt}]
            )
            
            self.limiter.on_success()
            return response.choices[0].message.content
            
        except RateLimitError:
            self.limiter.on_rate_limit()
            raise
            
        except Exception:
            self.limiter.on_error()
            raise
Adaptive rate limiters have a cold-start problem. When your application restarts, the limiter has no memory of the true rate limit and has to ramp up again from initial_rate. If you know your API tier limits, set initial_rate close to the actual limit (converted to requests per second and split across any processes that share the same key) to minimize ramp-up time. Also, log current_rate to your metrics dashboard; the rate-over-time graph is one of the best debugging tools for throughput issues.
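How quickly the limiter recovers after a rate-limit response follows directly from the two factors. The sketch below replays a made-up sequence of outcomes through the same update rules as `on_success()` and `on_rate_limit()`; the function `simulate_rate` and the event list are hypothetical and exist only to illustrate the ramp.

```python
def simulate_rate(initial_rate, events, increase=1.1, decrease=0.5,
                  min_rate=0.1, max_rate=100.0):
    """Replay a sequence of 'ok' / 'limited' outcomes and track the rate."""
    rate = initial_rate
    history = [rate]
    for event in events:
        if event == "ok":
            rate = min(rate * increase, max_rate)
        elif event == "limited":
            rate = max(rate * decrease, min_rate)
        history.append(rate)
    return history


# Hypothetical outcome sequence: steady successes, one 429, then recovery.
events = ["ok"] * 5 + ["limited"] + ["ok"] * 7
history = simulate_rate(10.0, events)

for step, rate in enumerate(history):
    print(f"step {step:2d}: {rate:6.2f} req/s")
```

With these defaults a single halving takes about eight successes to undo, since 1.1 raised to the 7th power is still below 2.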

Semaphore-Based Concurrency Control

A semaphore is the simplest and most reliable way to cap concurrency. If rate limiting controls “how fast” you make requests, semaphores control “how many at once.” Even with perfect rate limiting, having 500 concurrent HTTP connections can exhaust memory, file descriptors, or connection pool limits. The rule of thumb for LLM APIs: start with max_concurrent=10 and increase until you start seeing rate limit errors or degraded latency. Most providers handle 10-50 concurrent connections per API key without issue.
import asyncio
from openai import AsyncOpenAI
from dataclasses import dataclass


@dataclass
class ConcurrencyController:
    """Control concurrent request limits.
    
    Uses an asyncio.Semaphore as a gate: only max_concurrent
    coroutines can hold the semaphore at once. The rest queue up
    and wait their turn.
    """
    
    max_concurrent: int = 10
    
    def __post_init__(self):
        self.semaphore = asyncio.Semaphore(self.max_concurrent)
        self.active_count = 0  # For monitoring -- how many are in-flight?
    
    async def __aenter__(self):
        await self.semaphore.acquire()
        self.active_count += 1
        return self
    
    async def __aexit__(self, *args):
        self.active_count -= 1
        self.semaphore.release()


class ConcurrentBatchProcessor:
    """Process batches with controlled concurrency."""
    
    def __init__(self, max_concurrent: int = 10):
        self.client = AsyncOpenAI()
        self.controller = ConcurrencyController(max_concurrent)
    
    async def process_item(self, item: str) -> dict:
        """Process a single item with concurrency control."""
        async with self.controller:
            try:
                response = await self.client.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=[{"role": "user", "content": item}]
                )
                
                return {
                    "input": item,
                    "output": response.choices[0].message.content,
                    "success": True
                }
            except Exception as e:
                return {
                    "input": item,
                    "output": None,
                    "success": False,
                    "error": str(e)
                }
    
    async def process_batch(
        self,
        items: list[str],
        progress_callback: callable = None
    ) -> list[dict]:
        """Process a batch of items."""
        results = []
        completed = 0
        
        async def process_with_progress(item):
            nonlocal completed
            result = await self.process_item(item)
            completed += 1
            
            if progress_callback:
                progress_callback(completed, len(items))
            
            return result
        
        tasks = [process_with_progress(item) for item in items]
        results = await asyncio.gather(*tasks)
        
        return results


# Usage
async def main():
    processor = ConcurrentBatchProcessor(max_concurrent=5)
    
    items = [f"Summarize the number {i}" for i in range(20)]
    
    def on_progress(done, total):
        print(f"Progress: {done}/{total}")
    
    results = await processor.process_batch(items, on_progress)
    
    successful = sum(1 for r in results if r["success"])
    print(f"Completed: {successful}/{len(items)} successful")

asyncio.run(main())
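A quick way to confirm that a semaphore really caps concurrency is to count in-flight calls against a simulated workload. This sketch replaces the API call with `asyncio.sleep`; the names `run_with_limit` and `fake_call` are hypothetical.

```python
import asyncio


async def run_with_limit(num_tasks: int, max_concurrent: int) -> int:
    """Run simulated calls under a semaphore and return the peak in-flight count."""
    semaphore = asyncio.Semaphore(max_concurrent)
    in_flight = 0
    peak = 0

    async def fake_call() -> None:
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stand-in for the API round trip
            in_flight -= 1

    await asyncio.gather(*(fake_call() for _ in range(num_tasks)))
    return peak


peak = asyncio.run(run_with_limit(num_tasks=50, max_concurrent=5))
print(f"Peak concurrent calls: {peak}")  # never exceeds max_concurrent
```

All 50 coroutines are created up front, but only five hold the semaphore at any moment; the rest wait at the `async with` line.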

Request Queue with Priority

Not all LLM requests are created equal. A real-time chat response should jump ahead of a background summarization job. A paying customer’s request should take priority over a free tier user’s batch job. Priority queues let you enforce these business rules at the infrastructure level. The pattern is simple: instead of a FIFO queue, use a min-heap where lower priority numbers go first. Within the same priority level, requests are processed in FIFO order (using a monotonic sequence number as a tiebreaker).
import asyncio
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable
from enum import IntEnum
import heapq


class Priority(IntEnum):
    """Lower number = higher priority (processed first).
    
    Map these to your business tiers:
    - HIGH: Real-time user-facing requests (chat, autocomplete)
    - NORMAL: Standard API requests
    - LOW: Background jobs (batch processing, analytics)
    """
    HIGH = 0
    NORMAL = 1
    LOW = 2


@dataclass(order=True)
class QueuedRequest:
    """A request in the priority queue."""
    priority: Priority
    sequence: int  # For FIFO ordering within same priority
    request: Any = field(compare=False)
    future: asyncio.Future = field(compare=False)


class PriorityRequestQueue:
    """Process LLM requests with priority ordering."""
    
    def __init__(
        self,
        processor: Callable[[Any], Awaitable[Any]],
        max_concurrent: int = 5
    ):
        self.processor = processor
        self.max_concurrent = max_concurrent
        self.queue: list[QueuedRequest] = []
        self.sequence = 0
        # Strong references to worker tasks so they are not garbage-collected
        self._workers: set[asyncio.Task] = set()
    
    async def submit(
        self,
        request: Any,
        priority: Priority = Priority.NORMAL
    ) -> Any:
        """Submit a request and wait for result."""
        future = asyncio.get_running_loop().create_future()
        
        # No lock needed: there is no await between these statements,
        # so no other coroutine can interleave on the event loop.
        heapq.heappush(self.queue, QueuedRequest(
            priority=priority,
            sequence=self.sequence,
            request=request,
            future=future
        ))
        self.sequence += 1
        
        # Start another worker unless max_concurrent are already running
        if len(self._workers) < self.max_concurrent:
            self._workers.add(asyncio.create_task(self._worker()))
        
        return await future
    
    async def _worker(self):
        """Pull requests in priority order until the queue is empty."""
        try:
            while self.queue:
                queued = heapq.heappop(self.queue)
                try:
                    result = await self.processor(queued.request)
                    if not queued.future.done():
                        queued.future.set_result(result)
                except Exception as e:
                    if not queued.future.done():
                        queued.future.set_exception(e)
        finally:
            # Deregister before exiting so submit() can start a replacement
            self._workers.discard(asyncio.current_task())


# Usage
from openai import AsyncOpenAI

async def main():
    client = AsyncOpenAI()
    
    async def process_request(prompt: str) -> str:
        response = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}]
        )
        return response.choices[0].message.content
    
    queue = PriorityRequestQueue(process_request, max_concurrent=3)
    
    # Submit requests with different priorities
    tasks = [
        queue.submit("Low priority task 1", Priority.LOW),
        queue.submit("High priority task", Priority.HIGH),
        queue.submit("Normal task 1", Priority.NORMAL),
        queue.submit("Low priority task 2", Priority.LOW),
        queue.submit("Normal task 2", Priority.NORMAL),
    ]
    
    results = await asyncio.gather(*tasks)
    for result in results:
        print(result[:50] + "...")

asyncio.run(main())
Priority queue anti-pattern: Do not use priorities to implement fairness across users. If user A submits 10,000 LOW-priority items and user B submits 1 HIGH-priority item, user B gets served first — but that is not multi-tenant fairness, it is priority scheduling. For multi-tenant fairness, use weighted fair queuing (round-robin across tenants with configurable weights) rather than absolute priority classes.
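The round-robin idea mentioned above can be sketched in a few lines. Note that this is plain weighted round-robin, a simpler relative of weighted fair queuing, and that the tenant names, weights, and the `weighted_round_robin` function are all illustrative assumptions.

```python
from collections import deque


def weighted_round_robin(queues: dict[str, deque], weights: dict[str, int]) -> list:
    """Drain per-tenant queues, taking up to `weight` items per tenant per round.

    A tenant with an empty queue is skipped, so idle tenants do not
    hold up the others.
    """
    order = []
    while any(queues.values()):
        for tenant, queue in queues.items():
            for _ in range(weights.get(tenant, 1)):
                if not queue:
                    break
                order.append((tenant, queue.popleft()))
    return order


queues = {
    "tenant_a": deque(f"a{i}" for i in range(6)),  # large backlog
    "tenant_b": deque(["b0"]),                      # single request
    "tenant_c": deque(["c0", "c1"]),
}
weights = {"tenant_a": 2, "tenant_b": 1, "tenant_c": 1}

order = weighted_round_robin(queues, weights)
print([item for _, item in order])
```

Here tenant_b's single request is served in the first round even though tenant_a has six items queued ahead of it.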

Async Context Manager for Sessions

For production batch processing, you want session-level resource management and statistics. The context manager pattern (async with) guarantees cleanup happens even when exceptions occur — no leaked connections, no orphaned tasks. This is the pattern to use when you need to answer questions like “how many tokens did this batch consume?” or “what was the p99 latency for this run?” — questions that matter when your monthly LLM bill has commas in it.
import asyncio
from openai import AsyncOpenAI
from dataclasses import dataclass, field
from typing import Optional
import time


@dataclass
class SessionStats:
    """Statistics for an async session.
    
    Track these metrics for every batch run. They tell you
    whether your rate limiting is too aggressive (low throughput),
    whether your prompts are too long (high token counts), and
    whether a provider is degraded (high latency, low success rate).
    """
    requests: int = 0
    successful: int = 0
    failed: int = 0
    total_tokens: int = 0
    total_latency: float = 0.0
    
    @property
    def avg_latency(self) -> float:
        return self.total_latency / self.requests if self.requests else 0
    
    @property
    def success_rate(self) -> float:
        return self.successful / self.requests if self.requests else 0


class AsyncLLMSession:
    """Managed async session for LLM operations."""
    
    def __init__(
        self,
        max_concurrent: int = 10,
        timeout: float = 30.0
    ):
        self.client: Optional[AsyncOpenAI] = None
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.timeout = timeout
        self.stats = SessionStats()
    
    async def __aenter__(self):
        self.client = AsyncOpenAI()
        self.stats = SessionStats()
        return self
    
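    async def __aexit__(self, *args):
        # Close the underlying HTTP connection pool before dropping the client
        if self.client is not None:
            await self.client.close()
        self.client = None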
    
    async def complete(
        self,
        prompt: str,
        model: str = "gpt-4o-mini"
    ) -> str:
        """Make a completion within the session."""
        if not self.client:
            raise RuntimeError("Session not initialized")
        
        async with self.semaphore:
            self.stats.requests += 1
            start = time.monotonic()
            
            try:
                response = await asyncio.wait_for(
                    self.client.chat.completions.create(
                        model=model,
                        messages=[{"role": "user", "content": prompt}]
                    ),
                    timeout=self.timeout
                )
                
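                self.stats.successful += 1
                # usage can be missing on some responses, so guard the lookup
                if response.usage is not None:
                    self.stats.total_tokens += response.usage.total_tokens
                self.stats.total_latency += time.monotonic() - start
                
                return response.choices[0].message.content
                
            except Exception:
                # Includes asyncio.TimeoutError raised by wait_for()
                self.stats.failed += 1
                self.stats.total_latency += time.monotonic() - start
                raise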
    
    async def batch_complete(
        self,
        prompts: list[str],
        model: str = "gpt-4o-mini"
    ) -> list:
        """Complete multiple prompts.
        
        Because of return_exceptions=True, each entry is either the
        response text or the exception raised for that prompt.
        """
        tasks = [self.complete(p, model) for p in prompts]
        return await asyncio.gather(*tasks, return_exceptions=True)


# Usage
async def main():
    async with AsyncLLMSession(max_concurrent=5) as session:
        prompts = [f"What is {i} + {i}?" for i in range(10)]
        
        results = await session.batch_complete(prompts)
        
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                print(f"Error {i}: {result}")
            else:
                print(f"Result {i}: {result[:30]}...")
        
        print(f"\nStats:")
        print(f"  Requests: {session.stats.requests}")
        print(f"  Success rate: {session.stats.success_rate:.1%}")
        print(f"  Avg latency: {session.stats.avg_latency:.2f}s")
        print(f"  Total tokens: {session.stats.total_tokens}")

asyncio.run(main())
Async Best Practices for LLMs
  • Always use connection pooling with async clients
  • Implement proper timeout handling for all requests
  • Use semaphores to control maximum concurrent requests
  • Add jitter to retry delays to prevent thundering herd
  • Monitor and adapt rate limits based on API responses

Practice Exercise

Build an async batch processor with:
  1. Priority queue for request ordering
  2. Adaptive rate limiting based on API responses
  3. Exponential backoff with jitter for retries
  4. Progress tracking and cancellation support
  5. Comprehensive statistics collection
Focus on:
  • Proper resource cleanup on failures
  • Graceful shutdown handling
  • Memory-efficient batch processing
  • Real-time progress reporting

Interview Deep-Dive

What interviewers are testing: Whether you can design a real batch processing pipeline with concrete numbers, not just describe patterns abstractly.

Strong answer: Start with the math. At 500 RPM, 50K prompts take at least 100 minutes no matter how much concurrency you add. With an average of 400 tokens per request (prompt + completion), the token limit allows 500 requests/minute too, so requests-per-minute is the binding constraint.

Architecture: Use a producer-consumer pattern with an async queue. The producer reads prompts from a file or database in chunks (not all 50K into memory). Workers (20-50 concurrent coroutines gated by a semaphore) pull from the queue, make rate-limited API calls, and write results to an output queue.

Rate limiting: Use a token bucket with capacity=500 and refill rate of ~8.3/second. Add a semaphore at 30-50 concurrent connections. The dual limiter ensures you respect both RPM and connection limits.

Resilience: Implement checkpointing every 500 items: write (index, result) pairs to a JSONL file so you can resume from the last checkpoint after a crash. Add exponential backoff with jitter for 429 and 5xx errors. After 5 retries, move failed items to a dead letter list for manual inspection.

Cost control: Before starting, estimate total cost using tiktoken. For 50K prompts at ~400 tokens each, that’s 20M tokens. At gpt-4o-mini pricing ($0.15/1M input, $0.60/1M output), that’s roughly $3-12 depending on output length. Consider the Batch API for 50% savings if 24-hour turnaround is acceptable.

Monitoring: Log throughput (requests/sec), error rate, p50/p99 latency, and cumulative cost in real time. Set an alert if error rate exceeds 5%, since that usually indicates a systemic issue, not random failures.
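
The dual limiter and the cost estimate from this answer can be sketched as follows. This is an illustrative sketch under stated assumptions, not a production limiter: `TokenBucket`, `rate_limited_call`, and `estimate_cost` are hypothetical names, `call` stands in for whatever coroutine actually talks to the API, and the prices are the per-1M-token figures quoted in the answer.

```python
import asyncio
import time


class TokenBucket:
    """Allow bursts up to `capacity`, refilled continuously at `rate` tokens/sec."""

    def __init__(self, capacity: float, rate: float):
        self.capacity = capacity
        self.rate = rate
        self.tokens = capacity
        self.updated = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self) -> None:
        # Holding the lock while sleeping makes waiters queue up in order.
        async with self._lock:
            while True:
                now = time.monotonic()
                elapsed = now - self.updated
                self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
                self.updated = now
                if self.tokens >= 1:
                    self.tokens -= 1
                    return
                await asyncio.sleep((1 - self.tokens) / self.rate)


async def rate_limited_call(prompt, bucket, semaphore, call):
    """Apply both limits: requests per minute first, then open connections."""
    await bucket.acquire()
    async with semaphore:
        return await call(prompt)


def estimate_cost(n_prompts: int, input_tokens: int, output_tokens: int,
                  input_price: float = 0.15, output_price: float = 0.60) -> float:
    """Estimated USD cost; prices are per 1M tokens (figures quoted above)."""
    per_request = input_tokens * input_price + output_tokens * output_price
    return n_prompts * per_request / 1_000_000
```

For the numbers in the answer, the bucket would be built inside the running event loop as `TokenBucket(capacity=500, rate=500 / 60)` alongside `asyncio.Semaphore(30)`. The two ends of the quoted cost range correspond to `estimate_cost(50_000, 400, 0)` (all tokens billed as input) and `estimate_cost(50_000, 0, 400)` (all billed as output).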
What interviewers are testing: Understanding of distributed systems failure modes and the specific amplification risks with expensive API calls.

Strong answer: The thundering herd occurs when many clients experience a failure simultaneously and all retry at the same moment, creating a spike that’s worse than the original load. With LLM APIs, this is especially dangerous because each retry costs real money.

Scenario: Your application sends 100 concurrent requests. The provider returns 429 for all of them. If all 100 retry after exactly 1 second, you hit the API with 100 requests again, plus any new organic traffic. This creates a feedback loop where retries cause more rate limits, which cause more retries.

Solution 1 (jitter): Add randomness to retry delays. Instead of “retry after 2^n seconds,” use “retry after 2^n * random(0.5, 1.5) seconds.” This spreads retries across the window. Full jitter (a random delay between 0 and the maximum delay) spreads retries even more widely.

Solution 2 (circuit breaker): If error rate exceeds a threshold (e.g., 50% of requests failing), stop sending new requests entirely for a cooldown period. This prevents the retry storm from growing. After the cooldown, send a single probe request; if it succeeds, gradually ramp traffic back up.

Solution 3 (adaptive rate limiting with AIMD): On success, increase rate by a small constant. On failure, halve the rate immediately. This keeps probing for the available capacity and backs off quickly when it overshoots; it is the same algorithm TCP uses for congestion control, and for the same reason.

The key insight is that jitter alone isn’t enough at scale. You need all three layers working together: jitter smooths individual retries, circuit breaking prevents cascade, and adaptive limiting finds the new sustainable rate.
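
The AIMD rule from Solution 3 is small enough to show directly. The sketch below is illustrative only: `AIMDRate` and `simulate` are hypothetical names, the default step and bounds are assumptions, and `simulate` is a toy model in which any rate above a fixed `capacity` counts as a failure.

```python
class AIMDRate:
    """Additive increase, multiplicative decrease for a requests/sec target."""

    def __init__(self, initial: float = 5.0, minimum: float = 1.0,
                 maximum: float = 50.0, step: float = 0.5, factor: float = 0.5):
        self.rate = initial
        self.minimum = minimum
        self.maximum = maximum
        self.step = step      # added to the rate after each success
        self.factor = factor  # multiplied into the rate after each failure

    def on_success(self) -> None:
        self.rate = min(self.maximum, self.rate + self.step)

    def on_failure(self) -> None:
        self.rate = max(self.minimum, self.rate * self.factor)


def simulate(controller: AIMDRate, capacity: float, steps: int) -> list[float]:
    """Toy model: a step succeeds only while the rate is within `capacity`."""
    history = []
    for _ in range(steps):
        if controller.rate <= capacity:
            controller.on_success()
        else:
            controller.on_failure()
        history.append(controller.rate)
    return history
```

In a real client, `on_success` and `on_failure` would be called from the response handler (for example on a 2xx versus a 429), and `rate` would feed the refill rate of whatever limiter is in use.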
What interviewers are testing: Production engineering maturity around data loss prevention and clean shutdown semantics.

Strong answer: Graceful shutdown means: stop accepting new work, finish in-flight work, save state for incomplete work, and exit cleanly. The goal is zero data loss: every prompt should either be fully processed with its result saved, or clearly marked as unprocessed for the next run.

Implementation: Register a signal handler for SIGTERM/SIGINT that sets a shutdown flag. The producer stops feeding new items to the queue. Workers finish their current request (you cannot cancel an in-flight LLM call without losing the result and the cost). Set a shutdown timeout (e.g., 60 seconds); if workers haven’t finished by then, log unfinished items and force exit.

For the checkpointing layer, flush the current checkpoint immediately on shutdown signal. Any items that were dequeued but not yet processed go back to “pending” in the checkpoint file. On the next run, the checkpoint loader picks them up automatically.

The tricky part is the race condition between “worker finishes request” and “checkpoint flushes.” Use an atomic counter for in-flight requests. The shutdown sequence waits until in-flight count reaches zero (with timeout), then flushes the final checkpoint, then exits.

In Kubernetes, configure terminationGracePeriodSeconds to be longer than your longest expected LLM call (60-120 seconds). SIGTERM arrives first; SIGKILL follows after the grace period.
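
A minimal sketch of that shutdown sequence, under several assumptions: `worker`, `run_batch`, and `install_signal_handlers` are hypothetical names, `process` stands in for the real LLM call, and checkpoint writing is left out. Instead of an explicit in-flight counter, this version waits on the worker tasks themselves, which gives the same "wait until in-flight work is done, with a timeout" behavior in a single event loop.

```python
import asyncio
import signal


def install_signal_handlers(shutdown: asyncio.Event) -> None:
    """Set the shutdown flag on SIGINT/SIGTERM where the platform allows it."""
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(sig, shutdown.set)
        except (NotImplementedError, RuntimeError, ValueError):
            pass  # unsupported here (e.g. Windows, or not the main thread)


async def worker(queue, results, shutdown, process):
    # The flag is checked only between items, so an in-flight call is
    # never abandoned by the worker itself.
    while not shutdown.is_set():
        try:
            index, prompt = queue.get_nowait()
        except asyncio.QueueEmpty:
            return
        try:
            results[index] = await process(prompt)
        except Exception:
            pass  # left out of `results`, so it is reported as pending


async def run_batch(prompts, process, shutdown, workers=5, grace=60.0):
    """Return (results, pending_indices); pending items are safe to rerun."""
    queue = asyncio.Queue()
    for item in enumerate(prompts):
        queue.put_nowait(item)
    results = {}
    tasks = [asyncio.create_task(worker(queue, results, shutdown, process))
             for _ in range(workers)]
    finished = asyncio.gather(*tasks, return_exceptions=True)
    stop = asyncio.create_task(shutdown.wait())

    await asyncio.wait({finished, stop}, return_when=asyncio.FIRST_COMPLETED)
    if shutdown.is_set():
        # Give in-flight calls up to `grace` seconds to finish.
        await asyncio.wait({finished}, timeout=grace)

    stop.cancel()
    for task in tasks:
        task.cancel()  # no-op for workers that already returned
    await asyncio.gather(*tasks, return_exceptions=True)

    pending = [i for i in range(len(prompts)) if i not in results]
    return results, pending
```

A caller would create the `asyncio.Event` inside the running loop, pass it to `install_signal_handlers`, and write `results` and `pending` to the checkpoint file after `run_batch` returns.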
What interviewers are testing: Depth of understanding of Python’s async primitives beyond the basic tutorial level.

Strong answer: These three serve different use cases for concurrent LLM requests.

asyncio.gather(*tasks) runs all tasks concurrently and returns results in input order. Use it when you need results aligned with inputs (e.g., processing a CSV where row N’s result must go in output row N). Downside: you get no results until ALL tasks complete, so one slow response blocks everything.

asyncio.as_completed(tasks) yields futures as they finish, regardless of input order. Use it when you want to process results as they arrive, for example streaming partial progress to a UI, or writing results to a database as soon as each is ready. This is better for user-facing progress bars and for reducing peak memory (you can free each result immediately).

asyncio.TaskGroup (Python 3.11+) is the modern replacement for gather() with better error handling. If any task raises an exception, it cancels all remaining tasks and raises an ExceptionGroup. This is the right choice when tasks are interdependent: if one fails, continuing the others is pointless (e.g., a multi-step RAG pipeline where step 2 depends on step 1).

For batch LLM processing specifically: use as_completed with a semaphore for large batches (thousands of items) because you can checkpoint results incrementally. Use gather for small parallel operations (3-10 items) where simplicity matters. Use TaskGroup for orchestration workflows where partial completion is meaningless.
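
The ordering difference between the three primitives is easiest to see side by side. In this sketch `fake_llm` is a hypothetical stand-in that sleeps instead of calling an API, and each job is an assumed `(prompt, delay)` pair so that completion order is controllable.

```python
import asyncio


async def fake_llm(prompt: str, delay: float) -> str:
    """Stand-in for an LLM call (hypothetical): sleeps, then echoes the prompt."""
    await asyncio.sleep(delay)
    return f"answer:{prompt}"


async def with_gather(jobs):
    # Results come back in input order, only after every call has finished.
    return await asyncio.gather(*(fake_llm(p, d) for p, d in jobs))


async def with_as_completed(jobs):
    # Results are handled in completion order, so fast calls are not held back.
    finished = []
    for next_done in asyncio.as_completed([fake_llm(p, d) for p, d in jobs]):
        finished.append(await next_done)
    return finished


async def with_task_group(jobs):
    # Python 3.11+: if one task fails, the group cancels its siblings.
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(fake_llm(p, d)) for p, d in jobs]
    return [t.result() for t in tasks]
```

With `jobs = [("slow", 0.05), ("fast", 0.01)]`, `with_gather` and `with_task_group` return the slow answer first because they preserve input order, while `with_as_completed` returns the fast answer first.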