Async Fundamentals
Basic Async LLM Calls
import asyncio
from openai import AsyncOpenAI

async def async_completion(prompt: str) -> str:
    """Make an async completion request."""
    client = AsyncOpenAI()
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

async def main():
    result = await async_completion("Explain async programming.")
    print(result)

asyncio.run(main())
Parallel Requests with gather
Process multiple prompts concurrently:
import asyncio
from openai import AsyncOpenAI
from dataclasses import dataclass

@dataclass
class CompletionResult:
    """Result from an async completion."""
    prompt: str
    response: str
    success: bool
    error: str | None = None

async def process_prompt(
    client: AsyncOpenAI,
    prompt: str
) -> CompletionResult:
    """Process a single prompt."""
    try:
        response = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}]
        )
        return CompletionResult(
            prompt=prompt,
            response=response.choices[0].message.content,
            success=True
        )
    except Exception as e:
        return CompletionResult(
            prompt=prompt,
            response="",
            success=False,
            error=str(e)
        )

async def process_batch(prompts: list[str]) -> list[CompletionResult]:
    """Process multiple prompts in parallel."""
    client = AsyncOpenAI()
    tasks = [process_prompt(client, prompt) for prompt in prompts]
    results = await asyncio.gather(*tasks)
    return results

# Usage
prompts = [
    "Summarize machine learning in one sentence.",
    "What is deep learning?",
    "Explain neural networks briefly.",
    "Define natural language processing.",
]

results = asyncio.run(process_batch(prompts))
for result in results:
    if result.success:
        print(f"Q: {result.prompt[:30]}...")
        print(f"A: {result.response[:100]}...\n")
    else:
        print(f"Error: {result.error}")
Rate Limiting
Token Bucket Rate Limiter
Implement rate limiting to respect API limits:
import asyncio
import time
from dataclasses import dataclass, field

@dataclass
class TokenBucket:
    """Token bucket rate limiter."""
    capacity: float     # Max tokens
    refill_rate: float  # Tokens per second
    tokens: float = field(init=False)
    last_refill: float = field(init=False)
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock, repr=False)

    def __post_init__(self):
        self.tokens = self.capacity
        self.last_refill = time.monotonic()

    async def acquire(self, tokens: float = 1) -> float:
        """Acquire tokens, waiting if necessary. Returns wait time."""
        async with self._lock:
            await self._refill()
            if self.tokens >= tokens:
                self.tokens -= tokens
                return 0

            # Calculate wait time
            tokens_needed = tokens - self.tokens
            wait_time = tokens_needed / self.refill_rate
            await asyncio.sleep(wait_time)
            await self._refill()
            self.tokens -= tokens
            return wait_time

    async def _refill(self):
        """Refill tokens based on elapsed time."""
        now = time.monotonic()
        elapsed = now - self.last_refill
        self.tokens = min(
            self.capacity,
            self.tokens + elapsed * self.refill_rate
        )
        self.last_refill = now

class RateLimitedClient:
    """OpenAI client with rate limiting."""

    def __init__(
        self,
        requests_per_minute: int = 60,
        tokens_per_minute: int = 90000
    ):
        from openai import AsyncOpenAI
        self.client = AsyncOpenAI()
        # Rate limiters
        self.request_limiter = TokenBucket(
            capacity=requests_per_minute,
            refill_rate=requests_per_minute / 60
        )
        self.token_limiter = TokenBucket(
            capacity=tokens_per_minute,
            refill_rate=tokens_per_minute / 60
        )

    async def complete(
        self,
        messages: list[dict],
        model: str = "gpt-4o-mini",
        estimated_tokens: int = 500
    ) -> str:
        """Make a rate-limited completion request."""
        # Acquire rate limit tokens
        await self.request_limiter.acquire(1)
        await self.token_limiter.acquire(estimated_tokens)
        response = await self.client.chat.completions.create(
            model=model,
            messages=messages
        )
        return response.choices[0].message.content

# Usage
async def main():
    client = RateLimitedClient(
        requests_per_minute=60,
        tokens_per_minute=90000
    )
    prompts = [f"Count to {i}" for i in range(1, 21)]

    async def process(prompt):
        return await client.complete([{"role": "user", "content": prompt}])

    results = await asyncio.gather(*[process(p) for p in prompts])
    print(f"Processed {len(results)} requests")

asyncio.run(main())
Sliding Window Rate Limiter
import asyncio
import time
from collections import deque
from dataclasses import dataclass, field

@dataclass
class SlidingWindowLimiter:
    """Sliding window rate limiter."""
    max_requests: int
    window_seconds: float
    timestamps: deque = field(default_factory=deque)
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock, repr=False)

    async def acquire(self) -> float:
        """Acquire permission to make a request."""
        async with self._lock:
            now = time.monotonic()
            # Remove old timestamps
            cutoff = now - self.window_seconds
            while self.timestamps and self.timestamps[0] < cutoff:
                self.timestamps.popleft()

            if len(self.timestamps) < self.max_requests:
                self.timestamps.append(now)
                return 0

            # Calculate wait time
            oldest = self.timestamps[0]
            wait_time = oldest + self.window_seconds - now
            if wait_time > 0:
                await asyncio.sleep(wait_time)

            # Retry after waiting
            now = time.monotonic()
            cutoff = now - self.window_seconds
            while self.timestamps and self.timestamps[0] < cutoff:
                self.timestamps.popleft()
            self.timestamps.append(now)
            return wait_time

# Usage
limiter = SlidingWindowLimiter(max_requests=10, window_seconds=1.0)

async def rate_limited_request(i: int):
    wait = await limiter.acquire()
    if wait > 0:
        print(f"Request {i} waited {wait:.2f}s")
    print(f"Request {i} executing")
    return i

async def main():
    tasks = [rate_limited_request(i) for i in range(20)]
    await asyncio.gather(*tasks)

asyncio.run(main())
Backoff Strategies
Exponential Backoff with Jitter
import asyncio
import random
from typing import TypeVar, Callable, Awaitable
from dataclasses import dataclass
from openai import RateLimitError, APIError, APIConnectionError

T = TypeVar("T")

@dataclass
class BackoffConfig:
    """Configuration for exponential backoff."""
    initial_delay: float = 1.0
    max_delay: float = 60.0
    max_retries: int = 5
    exponential_base: float = 2.0
    jitter: bool = True

async def with_exponential_backoff(
    func: Callable[[], Awaitable[T]],
    config: BackoffConfig | None = None,
    retryable_exceptions: tuple = (RateLimitError, APIConnectionError)
) -> T:
    """Execute function with exponential backoff on failures."""
    config = config or BackoffConfig()
    last_exception = None

    for attempt in range(config.max_retries):
        try:
            return await func()
        except retryable_exceptions as e:
            last_exception = e
            if attempt == config.max_retries - 1:
                raise
            # Calculate delay
            delay = min(
                config.initial_delay * (config.exponential_base ** attempt),
                config.max_delay
            )
            # Add jitter
            if config.jitter:
                delay = delay * (0.5 + random.random())
            print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.2f}s")
            await asyncio.sleep(delay)
        except APIError as e:
            # Don't retry client errors (4xx other than rate limits)
            status = getattr(e, "status_code", None)
            if status and 400 <= status < 500:
                raise
            last_exception = e
            if attempt == config.max_retries - 1:
                raise
            delay = min(
                config.initial_delay * (config.exponential_base ** attempt),
                config.max_delay
            )
            await asyncio.sleep(delay)

    raise last_exception

# Usage
from openai import AsyncOpenAI

async def main():
    client = AsyncOpenAI()

    async def make_request():
        response = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": "Hello!"}]
        )
        return response.choices[0].message.content

    result = await with_exponential_backoff(
        make_request,
        BackoffConfig(initial_delay=1.0, max_retries=3)
    )
    print(result)

asyncio.run(main())
Adaptive Rate Limiting
import asyncio
import time
from dataclasses import dataclass, field
from openai import AsyncOpenAI, RateLimitError

@dataclass
class AdaptiveRateLimiter:
    """Rate limiter that adapts based on API responses."""
    initial_rate: float  # Requests per second
    min_rate: float = 0.1
    max_rate: float = 100.0
    increase_factor: float = 1.1
    decrease_factor: float = 0.5
    current_rate: float = field(init=False)
    last_request: float = field(init=False)
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock, repr=False)

    def __post_init__(self):
        self.current_rate = self.initial_rate
        self.last_request = 0

    async def acquire(self):
        """Wait for rate limit slot."""
        async with self._lock:
            now = time.monotonic()
            min_interval = 1.0 / self.current_rate
            elapsed = now - self.last_request
            if elapsed < min_interval:
                await asyncio.sleep(min_interval - elapsed)
            self.last_request = time.monotonic()

    def on_success(self):
        """Increase rate on successful request."""
        self.current_rate = min(
            self.current_rate * self.increase_factor,
            self.max_rate
        )

    def on_rate_limit(self):
        """Decrease rate on rate limit error."""
        self.current_rate = max(
            self.current_rate * self.decrease_factor,
            self.min_rate
        )
        print(f"Rate limited. Reducing to {self.current_rate:.2f} req/s")

    def on_error(self):
        """Slightly decrease rate on other errors."""
        self.current_rate = max(
            self.current_rate * 0.9,
            self.min_rate
        )

class AdaptiveClient:
    """OpenAI client with adaptive rate limiting."""

    def __init__(self, initial_rate: float = 10.0):
        self.client = AsyncOpenAI()
        self.limiter = AdaptiveRateLimiter(initial_rate=initial_rate)

    async def complete(self, prompt: str) -> str:
        """Make a completion with adaptive rate limiting."""
        await self.limiter.acquire()
        try:
            response = await self.client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[{"role": "user", "content": prompt}]
            )
            self.limiter.on_success()
            return response.choices[0].message.content
        except RateLimitError:
            self.limiter.on_rate_limit()
            raise
        except Exception:
            self.limiter.on_error()
            raise
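A brief usage sketch for the adaptive client above; the prompt list, the initial rate, and the use of return_exceptions are illustrative choices, not part of a fixed API:

async def main():
    client = AdaptiveClient(initial_rate=5.0)
    prompts = [f"Give one fact about the number {i}" for i in range(10)]

    # Gather with return_exceptions so one rate-limit error does not cancel the batch
    results = await asyncio.gather(
        *(client.complete(p) for p in prompts),
        return_exceptions=True
    )
    succeeded = sum(1 for r in results if isinstance(r, str))
    print(f"Succeeded: {succeeded}/{len(prompts)}, "
          f"final rate: {client.limiter.current_rate:.2f} req/s")

asyncio.run(main())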
Semaphore-Based Concurrency Control
import asyncio
from typing import Callable
from openai import AsyncOpenAI
from dataclasses import dataclass

@dataclass
class ConcurrencyController:
    """Control concurrent request limits."""
    max_concurrent: int = 10

    def __post_init__(self):
        self.semaphore = asyncio.Semaphore(self.max_concurrent)
        self.active_count = 0

    async def __aenter__(self):
        await self.semaphore.acquire()
        self.active_count += 1
        return self

    async def __aexit__(self, *args):
        self.active_count -= 1
        self.semaphore.release()

class ConcurrentBatchProcessor:
    """Process batches with controlled concurrency."""

    def __init__(self, max_concurrent: int = 10):
        self.client = AsyncOpenAI()
        self.controller = ConcurrencyController(max_concurrent)

    async def process_item(self, item: str) -> dict:
        """Process a single item with concurrency control."""
        async with self.controller:
            try:
                response = await self.client.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=[{"role": "user", "content": item}]
                )
                return {
                    "input": item,
                    "output": response.choices[0].message.content,
                    "success": True
                }
            except Exception as e:
                return {
                    "input": item,
                    "output": None,
                    "success": False,
                    "error": str(e)
                }

    async def process_batch(
        self,
        items: list[str],
        progress_callback: Callable[[int, int], None] | None = None
    ) -> list[dict]:
        """Process a batch of items."""
        completed = 0

        async def process_with_progress(item):
            nonlocal completed
            result = await self.process_item(item)
            completed += 1
            if progress_callback:
                progress_callback(completed, len(items))
            return result

        tasks = [process_with_progress(item) for item in items]
        results = await asyncio.gather(*tasks)
        return results

# Usage
async def main():
    processor = ConcurrentBatchProcessor(max_concurrent=5)
    items = [f"Summarize the number {i}" for i in range(20)]

    def on_progress(done, total):
        print(f"Progress: {done}/{total}")

    results = await processor.process_batch(items, on_progress)
    successful = sum(1 for r in results if r["success"])
    print(f"Completed: {successful}/{len(items)} successful")

asyncio.run(main())
Request Queue with Priority
import asyncio
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable
from enum import IntEnum
import heapq

class Priority(IntEnum):
    HIGH = 0
    NORMAL = 1
    LOW = 2

@dataclass(order=True)
class QueuedRequest:
    """A request in the priority queue."""
    priority: Priority
    sequence: int  # For FIFO ordering within same priority
    request: Any = field(compare=False)
    future: asyncio.Future = field(compare=False)

class PriorityRequestQueue:
    """Process LLM requests with priority ordering."""

    def __init__(
        self,
        processor: Callable[[Any], Awaitable[Any]],
        max_concurrent: int = 5
    ):
        self.processor = processor
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.queue: list[QueuedRequest] = []
        self.sequence = 0
        self._lock = asyncio.Lock()
        self._processing = False

    async def submit(
        self,
        request: Any,
        priority: Priority = Priority.NORMAL
    ) -> Any:
        """Submit a request and wait for result."""
        future = asyncio.get_running_loop().create_future()
        async with self._lock:
            queued = QueuedRequest(
                priority=priority,
                sequence=self.sequence,
                request=request,
                future=future
            )
            self.sequence += 1
            heapq.heappush(self.queue, queued)
            # Start the dispatcher if it is not already running
            if not self._processing:
                self._processing = True
                asyncio.create_task(self._process_queue())
        return await future

    async def _process_queue(self):
        """Dispatch queued requests, highest priority first."""
        while True:
            # Wait for a free slot before picking the next request so that
            # at most max_concurrent requests run at once.
            await self.semaphore.acquire()
            async with self._lock:
                if not self.queue:
                    self.semaphore.release()
                    self._processing = False
                    return
                queued = heapq.heappop(self.queue)
            asyncio.create_task(self._run(queued))

    async def _run(self, queued: QueuedRequest):
        """Execute a single request and resolve its future."""
        try:
            result = await self.processor(queued.request)
            queued.future.set_result(result)
        except Exception as e:
            queued.future.set_exception(e)
        finally:
            self.semaphore.release()

# Usage
from openai import AsyncOpenAI

async def main():
    client = AsyncOpenAI()

    async def process_request(prompt: str) -> str:
        response = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}]
        )
        return response.choices[0].message.content

    queue = PriorityRequestQueue(process_request, max_concurrent=3)

    # Submit requests with different priorities
    tasks = [
        queue.submit("Low priority task 1", Priority.LOW),
        queue.submit("High priority task", Priority.HIGH),
        queue.submit("Normal task 1", Priority.NORMAL),
        queue.submit("Low priority task 2", Priority.LOW),
        queue.submit("Normal task 2", Priority.NORMAL),
    ]
    results = await asyncio.gather(*tasks)
    for result in results:
        print(result[:50] + "...")

asyncio.run(main())
Async Context Manager for Sessions
import asyncio
from openai import AsyncOpenAI
from dataclasses import dataclass
from typing import Optional
import time

@dataclass
class SessionStats:
    """Statistics for an async session."""
    requests: int = 0
    successful: int = 0
    failed: int = 0
    total_tokens: int = 0
    total_latency: float = 0.0

    @property
    def avg_latency(self) -> float:
        return self.total_latency / self.requests if self.requests else 0

    @property
    def success_rate(self) -> float:
        return self.successful / self.requests if self.requests else 0

class AsyncLLMSession:
    """Managed async session for LLM operations."""

    def __init__(
        self,
        max_concurrent: int = 10,
        timeout: float = 30.0
    ):
        self.client: Optional[AsyncOpenAI] = None
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.timeout = timeout
        self.stats = SessionStats()

    async def __aenter__(self):
        self.client = AsyncOpenAI()
        self.stats = SessionStats()
        return self

    async def __aexit__(self, *args):
        self.client = None

    async def complete(
        self,
        prompt: str,
        model: str = "gpt-4o-mini"
    ) -> str:
        """Make a completion within the session."""
        if not self.client:
            raise RuntimeError("Session not initialized")

        async with self.semaphore:
            self.stats.requests += 1
            start = time.monotonic()
            try:
                response = await asyncio.wait_for(
                    self.client.chat.completions.create(
                        model=model,
                        messages=[{"role": "user", "content": prompt}]
                    ),
                    timeout=self.timeout
                )
                self.stats.successful += 1
                self.stats.total_tokens += response.usage.total_tokens
                self.stats.total_latency += time.monotonic() - start
                return response.choices[0].message.content
            except Exception:
                self.stats.failed += 1
                self.stats.total_latency += time.monotonic() - start
                raise

    async def batch_complete(
        self,
        prompts: list[str],
        model: str = "gpt-4o-mini"
    ) -> list[str | Exception]:
        """Complete multiple prompts; failures are returned as exceptions."""
        tasks = [self.complete(p, model) for p in prompts]
        return await asyncio.gather(*tasks, return_exceptions=True)

# Usage
async def main():
    async with AsyncLLMSession(max_concurrent=5) as session:
        prompts = [f"What is {i} + {i}?" for i in range(10)]
        results = await session.batch_complete(prompts)

        for i, result in enumerate(results):
            if isinstance(result, Exception):
                print(f"Error {i}: {result}")
            else:
                print(f"Result {i}: {result[:30]}...")

        print("\nStats:")
        print(f"  Requests: {session.stats.requests}")
        print(f"  Success rate: {session.stats.success_rate:.1%}")
        print(f"  Avg latency: {session.stats.avg_latency:.2f}s")
        print(f"  Total tokens: {session.stats.total_tokens}")

asyncio.run(main())
Async Best Practices for LLMs
- Always use connection pooling with async clients
- Implement proper timeout handling for all requests
- Use semaphores to control maximum concurrent requests
- Add jitter to retry delays to prevent thundering herd
- Monitor and adapt rate limits based on API responses (a sketch combining these practices follows below)
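A minimal sketch tying several of these practices together, assuming the openai v1 Python SDK (where a single AsyncOpenAI instance reuses one connection pool and accepts timeout and max_retries arguments); the model name, concurrency limit, and retry count are illustrative:

import asyncio
import random
from openai import AsyncOpenAI, RateLimitError

# One shared client = one connection pool; disable SDK retries so our own
# backoff logic is the single source of retry behavior.
client = AsyncOpenAI(timeout=30.0, max_retries=0)
semaphore = asyncio.Semaphore(8)  # cap concurrent requests

async def ask(prompt: str, retries: int = 3) -> str:
    async with semaphore:
        for attempt in range(retries):
            try:
                response = await client.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=[{"role": "user", "content": prompt}]
                )
                return response.choices[0].message.content
            except RateLimitError:
                if attempt == retries - 1:
                    raise
                # Exponential backoff with jitter to avoid a thundering herd
                await asyncio.sleep((2 ** attempt) * (0.5 + random.random()))

Keeping the retry loop inside the semaphore means backoff sleeps also count against the concurrency budget, which is usually what you want when the API is already pushing back.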
Practice Exercise
Build an async batch processor with the following features (a possible starting skeleton is sketched after the list):
- Priority queue for request ordering
- Adaptive rate limiting based on API responses
- Exponential backoff with jitter for retries
- Progress tracking and cancellation support
- Comprehensive statistics collection
- Proper resource cleanup on failures
- Graceful shutdown handling
- Memory-efficient batch processing
- Real-time progress reporting
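One possible starting skeleton for the exercise; all names and signatures are suggestions only, and the method bodies are left for you to fill in using the building blocks from this page:

import asyncio
from dataclasses import dataclass

@dataclass
class BatchStats:
    """Running statistics for a batch."""
    submitted: int = 0
    completed: int = 0
    failed: int = 0
    retried: int = 0

class AsyncBatchProcessor:
    """Skeleton combining a priority queue, adaptive rate limiting,
    retries with jitter, progress tracking, and graceful shutdown."""

    def __init__(self, max_concurrent: int = 5):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.stats = BatchStats()
        self._shutdown = asyncio.Event()

    async def submit(self, prompt: str, priority: int = 1) -> str:
        """Queue a prompt and await its completion (cf. PriorityRequestQueue)."""
        raise NotImplementedError

    async def _execute_with_retry(self, prompt: str) -> str:
        """Call the LLM with exponential backoff and jitter (cf. with_exponential_backoff)."""
        raise NotImplementedError

    def progress(self) -> tuple[int, int]:
        """Return (completed, submitted) for real-time reporting."""
        return self.stats.completed, self.stats.submitted

    async def shutdown(self) -> None:
        """Stop accepting work and wait for in-flight requests to finish."""
        self._shutdown.set()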