Production LLM applications require handling multiple concurrent requests efficiently. This chapter covers async patterns, rate limiting, and strategies for scaling LLM operations.

Async Fundamentals

Basic Async LLM Calls

import asyncio
from openai import AsyncOpenAI


async def async_completion(prompt: str) -> str:
    """Make an async completion request."""
    client = AsyncOpenAI()
    
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}]
    )
    
    return response.choices[0].message.content


async def main():
    result = await async_completion("Explain async programming.")
    print(result)


asyncio.run(main())

Parallel Requests with gather

Process multiple prompts concurrently:
import asyncio
from openai import AsyncOpenAI
from dataclasses import dataclass


@dataclass
class CompletionResult:
    """Result from an async completion."""
    prompt: str
    response: str
    success: bool
    error: str | None = None


async def process_prompt(
    client: AsyncOpenAI,
    prompt: str
) -> CompletionResult:
    """Process a single prompt."""
    try:
        response = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}]
        )
        
        return CompletionResult(
            prompt=prompt,
            response=response.choices[0].message.content,
            success=True
        )
    except Exception as e:
        return CompletionResult(
            prompt=prompt,
            response="",
            success=False,
            error=str(e)
        )


async def process_batch(prompts: list[str]) -> list[CompletionResult]:
    """Process multiple prompts in parallel."""
    client = AsyncOpenAI()
    
    tasks = [process_prompt(client, prompt) for prompt in prompts]
    results = await asyncio.gather(*tasks)
    
    return results


# Usage
prompts = [
    "Summarize machine learning in one sentence.",
    "What is deep learning?",
    "Explain neural networks briefly.",
    "Define natural language processing.",
]

results = asyncio.run(process_batch(prompts))

for result in results:
    if result.success:
        print(f"Q: {result.prompt[:30]}...")
        print(f"A: {result.response[:100]}...\n")
    else:
        print(f"Error: {result.error}")

Rate Limiting

Token Bucket Rate Limiter

Implement rate limiting to respect API limits:
import asyncio
import time
from dataclasses import dataclass, field


@dataclass
class TokenBucket:
    """Token bucket rate limiter."""
    
    capacity: float  # Max tokens
    refill_rate: float  # Tokens per second
    tokens: float = field(init=False)
    last_refill: float = field(init=False)
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock, repr=False)
    
    def __post_init__(self):
        self.tokens = self.capacity
        self.last_refill = time.monotonic()
    
    async def acquire(self, tokens: float = 1) -> float:
        """Acquire tokens, waiting if necessary. Returns wait time."""
        async with self._lock:
            await self._refill()
            
            if self.tokens >= tokens:
                self.tokens -= tokens
                return 0
            
            # Calculate wait time
            tokens_needed = tokens - self.tokens
            wait_time = tokens_needed / self.refill_rate
            
            await asyncio.sleep(wait_time)
            await self._refill()
            
            self.tokens -= tokens
            return wait_time
    
    async def _refill(self):
        """Refill tokens based on elapsed time."""
        now = time.monotonic()
        elapsed = now - self.last_refill
        
        self.tokens = min(
            self.capacity,
            self.tokens + elapsed * self.refill_rate
        )
        self.last_refill = now


class RateLimitedClient:
    """OpenAI client with rate limiting."""
    
    def __init__(
        self,
        requests_per_minute: int = 60,
        tokens_per_minute: int = 90000
    ):
        from openai import AsyncOpenAI
        
        self.client = AsyncOpenAI()
        
        # Rate limiters
        self.request_limiter = TokenBucket(
            capacity=requests_per_minute,
            refill_rate=requests_per_minute / 60
        )
        self.token_limiter = TokenBucket(
            capacity=tokens_per_minute,
            refill_rate=tokens_per_minute / 60
        )
    
    async def complete(
        self,
        messages: list[dict],
        model: str = "gpt-4o-mini",
        estimated_tokens: int = 500
    ) -> str:
        """Make a rate-limited completion request."""
        # Acquire rate limit tokens
        await self.request_limiter.acquire(1)
        await self.token_limiter.acquire(estimated_tokens)
        
        response = await self.client.chat.completions.create(
            model=model,
            messages=messages
        )
        
        return response.choices[0].message.content


# Usage
async def main():
    client = RateLimitedClient(
        requests_per_minute=60,
        tokens_per_minute=90000
    )
    
    prompts = [f"Count to {i}" for i in range(1, 21)]
    
    async def process(prompt):
        return await client.complete([{"role": "user", "content": prompt}])
    
    results = await asyncio.gather(*[process(p) for p in prompts])
    print(f"Processed {len(results)} requests")


asyncio.run(main())
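
The fixed estimated_tokens value above is only a rough guess. If you want the token limiter to track real prompt sizes, you can count tokens with tiktoken before acquiring. A sketch, assuming a recent tiktoken release that ships the o200k_base encoding (used here as an approximation for gpt-4o-family models) and leaving headroom for the completion:
import tiktoken

# Load the encoding once and reuse it; the choice of o200k_base is an assumption
_ENCODING = tiktoken.get_encoding("o200k_base")


def estimate_tokens(messages: list[dict], completion_headroom: int = 300) -> int:
    """Estimate prompt tokens and add headroom for the expected completion."""
    prompt_tokens = sum(len(_ENCODING.encode(m["content"])) for m in messages)
    return prompt_tokens + completion_headroom


# With the RateLimitedClient above:
# messages = [{"role": "user", "content": prompt}]
# await client.complete(messages, estimated_tokens=estimate_tokens(messages))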

Sliding Window Rate Limiter

import asyncio
import time
from collections import deque
from dataclasses import dataclass, field


@dataclass
class SlidingWindowLimiter:
    """Sliding window rate limiter."""
    
    max_requests: int
    window_seconds: float
    timestamps: deque = field(default_factory=deque)
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock, repr=False)
    
    async def acquire(self) -> float:
        """Acquire permission to make a request."""
        async with self._lock:
            now = time.monotonic()
            
            # Remove old timestamps
            cutoff = now - self.window_seconds
            while self.timestamps and self.timestamps[0] < cutoff:
                self.timestamps.popleft()
            
            if len(self.timestamps) < self.max_requests:
                self.timestamps.append(now)
                return 0
            
            # Calculate wait time
            oldest = self.timestamps[0]
            wait_time = oldest + self.window_seconds - now
            
            if wait_time > 0:
                await asyncio.sleep(wait_time)
            
            # Retry after waiting
            now = time.monotonic()
            cutoff = now - self.window_seconds
            while self.timestamps and self.timestamps[0] < cutoff:
                self.timestamps.popleft()
            
            self.timestamps.append(now)
            return wait_time


# Usage
limiter = SlidingWindowLimiter(max_requests=10, window_seconds=1.0)

async def rate_limited_request(i: int):
    wait = await limiter.acquire()
    if wait > 0:
        print(f"Request {i} waited {wait:.2f}s")
    print(f"Request {i} executing")
    return i

async def main():
    tasks = [rate_limited_request(i) for i in range(20)]
    await asyncio.gather(*tasks)

asyncio.run(main())

Backoff Strategies

Exponential Backoff with Jitter

import asyncio
import random
from typing import TypeVar, Callable, Awaitable
from dataclasses import dataclass
from openai import RateLimitError, APIError, APIConnectionError

T = TypeVar("T")


@dataclass
class BackoffConfig:
    """Configuration for exponential backoff."""
    
    initial_delay: float = 1.0
    max_delay: float = 60.0
    max_retries: int = 5
    exponential_base: float = 2.0
    jitter: bool = True


async def with_exponential_backoff(
    func: Callable[[], Awaitable[T]],
    config: BackoffConfig | None = None,
    retryable_exceptions: tuple[type[Exception], ...] = (RateLimitError, APIConnectionError)
) -> T:
    """Execute function with exponential backoff on failures."""
    config = config or BackoffConfig()
    
    last_exception = None
    
    for attempt in range(config.max_retries):
        try:
            return await func()
            
        except retryable_exceptions as e:
            last_exception = e
            
            if attempt == config.max_retries - 1:
                raise
            
            # Calculate delay
            delay = min(
                config.initial_delay * (config.exponential_base ** attempt),
                config.max_delay
            )
            
            # Add jitter
            if config.jitter:
                delay = delay * (0.5 + random.random())
            
            print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.2f}s")
            await asyncio.sleep(delay)
        
        except APIError as e:
            # Don't retry 4xx client errors; status_code may be absent on some APIError subclasses
            status = getattr(e, "status_code", None)
            if status and 400 <= status < 500:
                raise
            
            last_exception = e
            
            if attempt == config.max_retries - 1:
                raise
            
            delay = config.initial_delay * (config.exponential_base ** attempt)
            await asyncio.sleep(delay)
    
    raise last_exception


# Usage
from openai import AsyncOpenAI

async def main():
    client = AsyncOpenAI()
    
    async def make_request():
        response = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": "Hello!"}]
        )
        return response.choices[0].message.content
    
    result = await with_exponential_backoff(
        make_request,
        BackoffConfig(initial_delay=1.0, max_retries=3)
    )
    print(result)

asyncio.run(main())
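
Backoff and rate limiting complement each other: the limiter keeps requests under the published limits, while backoff absorbs the occasional 429 or transient network error that slips through. A short sketch combining the two, assuming RateLimitedClient, with_exponential_backoff, and BackoffConfig from the sections above live in the same module:
async def limited_complete_with_retry(
    client: RateLimitedClient,
    prompt: str
) -> str:
    """Rate-limit first, then retry any remaining failures with backoff."""
    async def attempt() -> str:
        return await client.complete([{"role": "user", "content": prompt}])
    
    return await with_exponential_backoff(
        attempt,
        BackoffConfig(initial_delay=1.0, max_retries=3)
    )


# result = await limited_complete_with_retry(client, "Explain rate limiting.")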

Adaptive Rate Limiting

import asyncio
import time
from dataclasses import dataclass, field
from openai import RateLimitError


@dataclass
class AdaptiveRateLimiter:
    """Rate limiter that adapts based on API responses."""
    
    initial_rate: float  # Requests per second
    min_rate: float = 0.1
    max_rate: float = 100.0
    increase_factor: float = 1.1
    decrease_factor: float = 0.5
    
    current_rate: float = field(init=False)
    last_request: float = field(init=False)
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock, repr=False)
    
    def __post_init__(self):
        self.current_rate = self.initial_rate
        self.last_request = 0
    
    async def acquire(self):
        """Wait for rate limit slot."""
        async with self._lock:
            now = time.monotonic()
            min_interval = 1.0 / self.current_rate
            elapsed = now - self.last_request
            
            if elapsed < min_interval:
                await asyncio.sleep(min_interval - elapsed)
            
            self.last_request = time.monotonic()
    
    def on_success(self):
        """Increase rate on successful request."""
        self.current_rate = min(
            self.current_rate * self.increase_factor,
            self.max_rate
        )
    
    def on_rate_limit(self):
        """Decrease rate on rate limit error."""
        self.current_rate = max(
            self.current_rate * self.decrease_factor,
            self.min_rate
        )
        print(f"Rate limited. Reducing to {self.current_rate:.2f} req/s")
    
    def on_error(self):
        """Slightly decrease rate on other errors."""
        self.current_rate = max(
            self.current_rate * 0.9,
            self.min_rate
        )


class AdaptiveClient:
    """OpenAI client with adaptive rate limiting."""
    
    def __init__(self, initial_rate: float = 10.0):
        from openai import AsyncOpenAI
        
        self.client = AsyncOpenAI()
        self.limiter = AdaptiveRateLimiter(initial_rate=initial_rate)
    
    async def complete(self, prompt: str) -> str:
        """Make a completion with adaptive rate limiting."""
        await self.limiter.acquire()
        
        try:
            response = await self.client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[{"role": "user", "content": prompt}]
            )
            
            self.limiter.on_success()
            return response.choices[0].message.content
            
        except RateLimitError:
            self.limiter.on_rate_limit()
            raise
            
        except Exception:
            self.limiter.on_error()
            raise
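
AdaptiveClient lowers its rate and re-raises on a 429, leaving the retry decision to the caller. A usage sketch that retries with the backoff helper from the previous section (assuming both helpers are available in the same module):
async def main():
    client = AdaptiveClient(initial_rate=5.0)
    
    async def attempt() -> str:
        return await client.complete("Summarize adaptive rate limiting.")
    
    # The limiter slows down on each 429; backoff spaces out the retries
    result = await with_exponential_backoff(attempt, BackoffConfig(max_retries=4))
    print(result)


asyncio.run(main())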

Semaphore-Based Concurrency Control

import asyncio
from typing import Callable
from openai import AsyncOpenAI
from dataclasses import dataclass


@dataclass
class ConcurrencyController:
    """Control concurrent request limits."""
    
    max_concurrent: int = 10
    
    def __post_init__(self):
        self.semaphore = asyncio.Semaphore(self.max_concurrent)
        self.active_count = 0
    
    async def __aenter__(self):
        await self.semaphore.acquire()
        self.active_count += 1
        return self
    
    async def __aexit__(self, *args):
        self.active_count -= 1
        self.semaphore.release()


class ConcurrentBatchProcessor:
    """Process batches with controlled concurrency."""
    
    def __init__(self, max_concurrent: int = 10):
        self.client = AsyncOpenAI()
        self.controller = ConcurrencyController(max_concurrent)
    
    async def process_item(self, item: str) -> dict:
        """Process a single item with concurrency control."""
        async with self.controller:
            try:
                response = await self.client.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=[{"role": "user", "content": item}]
                )
                
                return {
                    "input": item,
                    "output": response.choices[0].message.content,
                    "success": True
                }
            except Exception as e:
                return {
                    "input": item,
                    "output": None,
                    "success": False,
                    "error": str(e)
                }
    
    async def process_batch(
        self,
        items: list[str],
        progress_callback: Callable[[int, int], None] | None = None
    ) -> list[dict]:
        """Process a batch of items."""
        completed = 0
        
        async def process_with_progress(item):
            nonlocal completed
            result = await self.process_item(item)
            completed += 1
            
            if progress_callback:
                progress_callback(completed, len(items))
            
            return result
        
        tasks = [process_with_progress(item) for item in items]
        results = await asyncio.gather(*tasks)
        
        return results


# Usage
async def main():
    processor = ConcurrentBatchProcessor(max_concurrent=5)
    
    items = [f"Summarize the number {i}" for i in range(20)]
    
    def on_progress(done, total):
        print(f"Progress: {done}/{total}")
    
    results = await processor.process_batch(items, on_progress)
    
    successful = sum(1 for r in results if r["success"])
    print(f"Completed: {successful}/{len(items)} successful")

asyncio.run(main())
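
If you want results as they finish rather than all at once, asyncio.as_completed pairs naturally with the semaphore pattern. A sketch built on the ConcurrentBatchProcessor above (the generator name is illustrative):
async def process_batch_streaming(
    processor: ConcurrentBatchProcessor,
    items: list[str]
):
    """Yield results in completion order instead of waiting for the whole batch."""
    tasks = [asyncio.create_task(processor.process_item(item)) for item in items]
    
    for completed, next_result in enumerate(asyncio.as_completed(tasks), start=1):
        result = await next_result
        print(f"Progress: {completed}/{len(tasks)}")
        yield result


# Usage:
# async for result in process_batch_streaming(processor, items):
#     handle(result)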

Request Queue with Priority

import asyncio
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable
from enum import IntEnum
import heapq


class Priority(IntEnum):
    HIGH = 0
    NORMAL = 1
    LOW = 2


@dataclass(order=True)
class QueuedRequest:
    """A request in the priority queue."""
    priority: Priority
    sequence: int  # For FIFO ordering within same priority
    request: Any = field(compare=False)
    future: asyncio.Future = field(compare=False)


class PriorityRequestQueue:
    """Process LLM requests with priority ordering."""
    
    def __init__(
        self,
        processor: Callable[[Any], Awaitable[Any]],
        max_concurrent: int = 5
    ):
        self.processor = processor
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.queue: list[QueuedRequest] = []
        self.sequence = 0
        self._lock = asyncio.Lock()
        self._processing = False
        self._tasks: set[asyncio.Task] = set()  # strong refs keep dispatched tasks alive
    
    async def submit(
        self,
        request: Any,
        priority: Priority = Priority.NORMAL
    ) -> Any:
        """Submit a request and wait for result."""
        future = asyncio.Future()
        
        async with self._lock:
            queued = QueuedRequest(
                priority=priority,
                sequence=self.sequence,
                request=request,
                future=future
            )
            self.sequence += 1
            heapq.heappush(self.queue, queued)
        
        # Start the dispatcher if it is not already running. Setting the flag
        # here (with no await before the assignment) avoids spawning duplicate
        # dispatchers from concurrent submits.
        if not self._processing:
            self._processing = True
            asyncio.create_task(self._process_queue())
        
        return await future
    
    async def _process_queue(self):
        """Dispatch queued requests while honoring the concurrency limit."""
        while True:
            async with self._lock:
                if not self.queue:
                    self._processing = False
                    return
                
                queued = heapq.heappop(self.queue)
            
            # Hold a semaphore slot for the whole request, but run it as its
            # own task so up to max_concurrent requests proceed in parallel.
            await self.semaphore.acquire()
            task = asyncio.create_task(self._run_one(queued))
            self._tasks.add(task)
            task.add_done_callback(self._tasks.discard)
    
    async def _run_one(self, queued: QueuedRequest):
        """Execute a single request and resolve its future."""
        try:
            result = await self.processor(queued.request)
            queued.future.set_result(result)
        except Exception as e:
            queued.future.set_exception(e)
        finally:
            self.semaphore.release()


# Usage
from openai import AsyncOpenAI

async def main():
    client = AsyncOpenAI()
    
    async def process_request(prompt: str) -> str:
        response = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}]
        )
        return response.choices[0].message.content
    
    queue = PriorityRequestQueue(process_request, max_concurrent=3)
    
    # Submit requests with different priorities
    tasks = [
        queue.submit("Low priority task 1", Priority.LOW),
        queue.submit("High priority task", Priority.HIGH),
        queue.submit("Normal task 1", Priority.NORMAL),
        queue.submit("Low priority task 2", Priority.LOW),
        queue.submit("Normal task 2", Priority.NORMAL),
    ]
    
    results = await asyncio.gather(*tasks)
    for result in results:
        print(result[:50] + "...")

asyncio.run(main())

Async Context Manager for Sessions

import asyncio
from openai import AsyncOpenAI
from dataclasses import dataclass, field
from typing import Optional
import time


@dataclass
class SessionStats:
    """Statistics for an async session."""
    requests: int = 0
    successful: int = 0
    failed: int = 0
    total_tokens: int = 0
    total_latency: float = 0.0
    
    @property
    def avg_latency(self) -> float:
        return self.total_latency / self.requests if self.requests else 0
    
    @property
    def success_rate(self) -> float:
        return self.successful / self.requests if self.requests else 0


class AsyncLLMSession:
    """Managed async session for LLM operations."""
    
    def __init__(
        self,
        max_concurrent: int = 10,
        timeout: float = 30.0
    ):
        self.client: Optional[AsyncOpenAI] = None
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.timeout = timeout
        self.stats = SessionStats()
    
    async def __aenter__(self):
        self.client = AsyncOpenAI()
        self.stats = SessionStats()
        return self
    
    async def __aexit__(self, *args):
        if self.client:
            await self.client.close()  # release pooled HTTP connections
        self.client = None
    
    async def complete(
        self,
        prompt: str,
        model: str = "gpt-4o-mini"
    ) -> str:
        """Make a completion within the session."""
        if not self.client:
            raise RuntimeError("Session not initialized")
        
        async with self.semaphore:
            self.stats.requests += 1
            start = time.monotonic()
            
            try:
                response = await asyncio.wait_for(
                    self.client.chat.completions.create(
                        model=model,
                        messages=[{"role": "user", "content": prompt}]
                    ),
                    timeout=self.timeout
                )
                
                self.stats.successful += 1
                self.stats.total_tokens += response.usage.total_tokens
                self.stats.total_latency += time.monotonic() - start
                
                return response.choices[0].message.content
                
            except Exception:
                self.stats.failed += 1
                self.stats.total_latency += time.monotonic() - start
                raise
    
    async def batch_complete(
        self,
        prompts: list[str],
        model: str = "gpt-4o-mini"
    ) -> list[str]:
        """Complete multiple prompts."""
        tasks = [self.complete(p, model) for p in prompts]
        return await asyncio.gather(*tasks, return_exceptions=True)


# Usage
async def main():
    async with AsyncLLMSession(max_concurrent=5) as session:
        prompts = [f"What is {i} + {i}?" for i in range(10)]
        
        results = await session.batch_complete(prompts)
        
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                print(f"Error {i}: {result}")
            else:
                print(f"Result {i}: {result[:30]}...")
        
        print(f"\nStats:")
        print(f"  Requests: {session.stats.requests}")
        print(f"  Success rate: {session.stats.success_rate:.1%}")
        print(f"  Avg latency: {session.stats.avg_latency:.2f}s")
        print(f"  Total tokens: {session.stats.total_tokens}")

asyncio.run(main())
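
The session applies a per-request timeout; you can also bound an entire batch and cancel whatever is still in flight. A sketch using asyncio.timeout, which requires Python 3.11+ (asyncio.wait_for works similarly on older versions):
async def bounded_batch(prompts: list[str], deadline: float = 20.0) -> list:
    """Run a batch with an overall deadline, cancelling in-flight requests on expiry."""
    async with AsyncLLMSession(max_concurrent=5) as session:
        try:
            async with asyncio.timeout(deadline):
                return await session.batch_complete(prompts)
        except TimeoutError:
            print(f"Batch exceeded {deadline}s after {session.stats.requests} requests")
            return []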

Async Best Practices for LLMs

  • Reuse one async client per application so HTTP connections are pooled (see the sketch after this list)
  • Implement proper timeout handling for all requests
  • Use semaphores to control maximum concurrent requests
  • Add jitter to retry delays to prevent thundering herd
  • Monitor and adapt rate limits based on API responses
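
The points above about client reuse and timeouts combine naturally: construct one AsyncOpenAI client for the whole application and give it default timeout and retry settings. A minimal sketch (the specific values are illustrative):
import httpx
from openai import AsyncOpenAI

# One client per application: the underlying HTTP connection pool is reused
# across all requests instead of being rebuilt for every call.
shared_client = AsyncOpenAI(
    timeout=httpx.Timeout(30.0, connect=5.0),  # overall and connect timeouts
    max_retries=2,  # SDK-level retries on transient connection errors
)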

Practice Exercise

Build an async batch processor with:
  1. Priority queue for request ordering
  2. Adaptive rate limiting based on API responses
  3. Exponential backoff with jitter for retries
  4. Progress tracking and cancellation support
  5. Comprehensive statistics collection

Focus on:
  • Proper resource cleanup on failures
  • Graceful shutdown handling
  • Memory-efficient batch processing
  • Real-time progress reporting