December 2025 Update: Complete guide to streaming LLM responses with SSE, WebSockets, and production best practices.

Why Streaming Matters

LLM responses can take 5-30 seconds to fully generate. Streaming changes how that wait feels: instead of staring at a loading spinner for 10 seconds, users see the first word in roughly 200ms and can start reading immediately. A 10-second response feels far shorter because the user is busy processing content as it arrives rather than counting seconds. Side by side:
Without Streaming               With Streaming
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
User waits 10s...               First token in 200ms!
...still waiting...             Words appear as generated
...loading spinner...           User reads in real-time
Full response appears           Complete response built up
| Metric | Non-Streaming | Streaming |
|---|---|---|
| Time to First Token | 5-30s | 100-500ms |
| Perceived Latency | Full wait | Near instant |
| User Experience | Frustrating | Responsive |
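To make the "Time to First Token" row concrete, here is a minimal sketch of how it could be measured for any iterable of text fragments. The names `measure_stream` and `fake_stream` are illustrative and not part of any library; the fake stream stands in for a real LLM response.

```python
import time
from typing import Iterable, Iterator


def measure_stream(chunks: Iterable[str]) -> dict:
    """Consume a stream of text fragments and record latency metrics."""
    start = time.perf_counter()
    first_token_at = None
    parts = []
    for piece in chunks:
        # Record the moment the first non-empty fragment arrives
        if first_token_at is None and piece:
            first_token_at = time.perf_counter()
        parts.append(piece)
    end = time.perf_counter()
    return {
        "text": "".join(parts),
        "time_to_first_token": (
            None if first_token_at is None else first_token_at - start
        ),
        "total_time": end - start,
    }


def fake_stream() -> Iterator[str]:
    """Hypothetical stand-in for an LLM stream."""
    for word in ["Hello", ", ", "world"]:
        time.sleep(0.01)
        yield word


metrics = measure_stream(fake_stream())
```

In a real application the same function could wrap a generator that yields `delta.content` values, which gives a number to track alongside total generation time.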

OpenAI Streaming

Basic Streaming

from openai import OpenAI

client = OpenAI()

def stream_chat(message: str):
    """Stream a chat response"""
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": message}],
        stream=True
    )
    
    for chunk in stream:
        if chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="", flush=True)
    
    print()  # Newline at end

stream_chat("Explain quantum computing in simple terms")

Collecting Streamed Response

def stream_and_collect(message: str) -> str:
    """Stream response and return complete text"""
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": message}],
        stream=True
    )
    
    collected_content = []
    
    for chunk in stream:
        content = chunk.choices[0].delta.content
        if content:
            collected_content.append(content)
            print(content, end="", flush=True)
    
    print()
    return "".join(collected_content)

full_response = stream_and_collect("Tell me a short story")

Async Streaming

from openai import AsyncOpenAI
import asyncio

async_client = AsyncOpenAI()

async def async_stream_chat(message: str) -> str:
    """Async streaming for concurrent requests"""
    stream = await async_client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": message}],
        stream=True
    )
    
    collected = []
    async for chunk in stream:
        content = chunk.choices[0].delta.content
        if content:
            collected.append(content)
            print(content, end="", flush=True)
    
    print()
    return "".join(collected)

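# Run multiple streams concurrently. Each task prints tokens as they arrive,
# so console output from the two streams interleaves; the returned strings are complete.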
async def main():
    tasks = [
        async_stream_chat("Tell me about Python"),
        async_stream_chat("Tell me about JavaScript"),
    ]
    results = await asyncio.gather(*tasks)
    return results

asyncio.run(main())

Streaming with Tool Calls

This is where streaming gets tricky. When the model decides to call a tool, the tool call arguments arrive as fragments across multiple chunks — you need to accumulate them before you can parse the JSON and execute the function. Think of it like receiving a fax of a phone number digit by digit: you can’t dial until you have all the digits.
import json

def stream_with_tools(message: str, tools: list):
    """Stream response with tool call handling"""
    
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": message}],
        tools=tools,
        stream=True
    )
    
    collected_content = []
    tool_calls = {}
    
    for chunk in stream:
        delta = chunk.choices[0].delta
        
        # Handle text content -- display tokens as they arrive
        if delta.content:
            collected_content.append(delta.content)
            print(delta.content, end="", flush=True)
        
        # Handle tool calls -- arguments arrive in fragments across chunks.
        # We must accumulate them before parsing.
        if delta.tool_calls:
            for tool_call in delta.tool_calls:
                idx = tool_call.index  # Identifies which tool call (for parallel calls)
                
                if idx not in tool_calls:
                    tool_calls[idx] = {
                        "id": "",
                        "name": "",
                        "arguments": ""  # Will be built up chunk by chunk
                    }
                
                # Each field arrives independently -- only set when non-None
                if tool_call.id:
                    tool_calls[idx]["id"] = tool_call.id
                if tool_call.function.name:
                    tool_calls[idx]["name"] = tool_call.function.name
                if tool_call.function.arguments:
                    # This is the key: arguments come as string fragments
                    # e.g., '{"lo', 'cation":', ' "NYC"}'
                    tool_calls[idx]["arguments"] += tool_call.function.arguments
    
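    # Process tool calls once the stream has finished
    for idx, tool_call in tool_calls.items():
        print(f"\n🔧 Tool: {tool_call['name']}")
        try:
            args = json.loads(tool_call["arguments"])
        except json.JSONDecodeError as exc:
            # The model can emit malformed JSON; report it instead of crashing
            print(f"   Invalid arguments: {exc}")
            continue
        print(f"   Args: {args}")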
    
    return "".join(collected_content), list(tool_calls.values())

FastAPI Streaming Endpoints

SSE (Server-Sent Events) is the most common pattern for AI chat applications. Unlike WebSockets, SSE is unidirectional (server to client), works over regular HTTP, survives proxy/load balancer configurations, and auto-reconnects on disconnect. Use SSE unless you need bidirectional streaming (like collaborative editing or voice chat).

Server-Sent Events (SSE)

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import OpenAI
import json

app = FastAPI()
client = OpenAI()

@app.post("/chat/stream")
async def stream_chat(request: dict):
    """Stream chat response as SSE"""
    
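    # Plain (sync) generator: Starlette iterates it in a threadpool, so the
    # blocking OpenAI client does not stall the event loop.
    def generate():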
        stream = client.chat.completions.create(
            model="gpt-4o",
            messages=request["messages"],
            stream=True
        )
        
        for chunk in stream:
            content = chunk.choices[0].delta.content
            if content:
                # SSE format
                data = json.dumps({"content": content})
                yield f"data: {data}\n\n"
        
        # Signal end of stream
        yield "data: [DONE]\n\n"
    
    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",     # Don't cache partial responses
            "Connection": "keep-alive",       # Keep TCP connection open
            # If behind Nginx, also add "X-Accel-Buffering": "no"
            # Without this, Nginx buffers all tokens and sends them in one burst
        }
    )

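# Client-side JavaScript:
# Note: EventSource only issues GET requests. To use it as shown, expose the
# handler with @app.get (passing input via query parameters); for a POST
# endpoint, read the response body with fetch() and a stream reader instead.
# const eventSource = new EventSource('/chat/stream');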
# eventSource.onmessage = (event) => {
#     if (event.data === '[DONE]') {
#         eventSource.close();
#     } else {
#         const data = JSON.parse(event.data);
#         document.getElementById('response').textContent += data.content;
#     }
# };
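For a non-browser consumer, the same wire format can be decoded in a few lines. The sketch below assumes each event carries a single `data:` line with a JSON object containing `content`, as the endpoint above emits; `iter_sse_content` is an illustrative name, and the input can be any iterable of text lines (for example, lines read from an HTTP client's streaming response).

```python
import json
from typing import Iterable, Iterator


def iter_sse_content(lines: Iterable[str]) -> Iterator[str]:
    """Yield the "content" field of each `data:` event until [DONE]."""
    for raw in lines:
        line = raw.rstrip("\r\n")
        if not line.startswith("data:"):
            continue  # blank separators, comments (": ..."), other fields
        payload = line[len("data:"):].strip()
        if payload == "[DONE]":
            return
        yield json.loads(payload)["content"]


sample = [
    'data: {"content": "Hel"}\n',
    "\n",
    'data: {"content": "lo"}\n',
    "\n",
    "data: [DONE]\n",
    "\n",
]
text = "".join(iter_sse_content(sample))
```

This deliberately ignores multi-line `data:` fields and the `event:`/`id:` fields, which a general-purpose SSE parser would need to handle.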

Streaming with Token Counting

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import OpenAI
import tiktoken
import json

app = FastAPI()
client = OpenAI()

@app.post("/chat/stream-with-metrics")
async def stream_with_metrics(request: dict):
    """Stream with real-time token counting"""
    
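    # Plain (sync) generator: Starlette iterates it in a threadpool, so the
    # blocking OpenAI client does not stall the event loop.
    def generate():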
        encoder = tiktoken.encoding_for_model("gpt-4o")
        total_tokens = 0
        
        stream = client.chat.completions.create(
            model="gpt-4o",
            messages=request["messages"],
            stream=True,
            stream_options={"include_usage": True}
        )
        
        for chunk in stream:
            # Content chunks
            if chunk.choices and chunk.choices[0].delta.content:
                content = chunk.choices[0].delta.content
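                # Approximate: fragments are encoded independently, so the running
                # total can differ slightly from the final usage figures
                tokens = len(encoder.encode(content))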
                total_tokens += tokens
                
                data = {
                    "type": "content",
                    "content": content,
                    "tokens_so_far": total_tokens
                }
                yield f"data: {json.dumps(data)}\n\n"
            
            # Final usage info
            if chunk.usage:
                data = {
                    "type": "usage",
                    "prompt_tokens": chunk.usage.prompt_tokens,
                    "completion_tokens": chunk.usage.completion_tokens,
                    "total_tokens": chunk.usage.total_tokens
                }
                yield f"data: {json.dumps(data)}\n\n"
        
        yield "data: [DONE]\n\n"
    
    return StreamingResponse(
        generate(),
        media_type="text/event-stream"
    )

WebSocket Streaming

For bidirectional real-time communication:
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from openai import AsyncOpenAI
import json
import asyncio

app = FastAPI()
# Async client: awaiting each chunk keeps the event loop free for other connections
client = AsyncOpenAI()

class ConnectionManager:
    def __init__(self):
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def send_message(self, message: dict, websocket: WebSocket):
        await websocket.send_json(message)

manager = ConnectionManager()

@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
    await manager.connect(websocket)

    try:
        while True:
            # Receive message from client
            data = await websocket.receive_json()

            # Start streaming response
            stream = await client.chat.completions.create(
                model="gpt-4o",
                messages=data.get("messages", []),
                stream=True
            )

            # Send status
            await manager.send_message(
                {"type": "start", "message_id": data.get("id")},
                websocket
            )

            # Stream chunks
            full_response = []
            async for chunk in stream:
                content = chunk.choices[0].delta.content
                if content:
                    full_response.append(content)
                    await manager.send_message(
                        {"type": "chunk", "content": content},
                        websocket
                    )
            
            # Send completion
            await manager.send_message(
                {
                    "type": "complete",
                    "full_response": "".join(full_response)
                },
                websocket
            )
    
    except WebSocketDisconnect:
        manager.disconnect(websocket)

Client-Side WebSocket

// React/JavaScript WebSocket client
class ChatClient {
    constructor(url) {
        this.ws = new WebSocket(url);
        this.onChunk = null;
        this.onComplete = null;
        
        this.ws.onmessage = (event) => {
            const data = JSON.parse(event.data);
            
            switch (data.type) {
                case 'chunk':
                    if (this.onChunk) this.onChunk(data.content);
                    break;
                case 'complete':
                    if (this.onComplete) this.onComplete(data.full_response);
                    break;
            }
        };
    }
    
    send(messages) {
        this.ws.send(JSON.stringify({ messages }));
    }
}

// Usage
const client = new ChatClient('ws://localhost:8000/ws/chat');
client.onChunk = (content) => {
    document.getElementById('response').textContent += content;
};
// send() throws while the socket is still connecting, so wait for "open"
client.ws.onopen = () => client.send([{ role: 'user', content: 'Hello!' }]);

Streaming with LangChain

from langchain_openai import ChatOpenAI
from langchain_core.callbacks import StreamingStdOutCallbackHandler
from langchain_core.messages import HumanMessage

# Simple streaming
llm = ChatOpenAI(
    model="gpt-4o",
    streaming=True,
    callbacks=[StreamingStdOutCallbackHandler()]
)

response = llm.invoke([HumanMessage(content="Tell me a joke")])

# Custom streaming handler
from langchain_core.callbacks import BaseCallbackHandler

class CustomStreamHandler(BaseCallbackHandler):
    def __init__(self, on_token):
        self.on_token = on_token
        self.tokens = []
    
    def on_llm_new_token(self, token: str, **kwargs):
        self.tokens.append(token)
        self.on_token(token)
    
    def get_full_response(self) -> str:
        return "".join(self.tokens)

# Usage
tokens_received = []
handler = CustomStreamHandler(lambda t: tokens_received.append(t))

llm = ChatOpenAI(model="gpt-4o", streaming=True, callbacks=[handler])
response = llm.invoke([HumanMessage(content="Explain streaming")])
print(f"Received {len(tokens_received)} tokens")

Async Streaming with LangChain

from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage

async def stream_langchain():
    llm = ChatOpenAI(model="gpt-4o")
    
    chunks = []
    async for chunk in llm.astream([HumanMessage(content="Hello!")]):
        print(chunk.content, end="", flush=True)
        chunks.append(chunk.content)
    
    return "".join(chunks)

import asyncio
asyncio.run(stream_langchain())

Production Streaming Patterns

In production, streaming introduces failure modes you don’t see with regular request-response. The connection can drop mid-stream, the LLM can time out after generating half an answer, rate limits can hit between chunks, and clients can disconnect while you’re still paying for generation. These patterns address the real-world problems.

Graceful Error Handling

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import OpenAI
import json

app = FastAPI()
client = OpenAI()

@app.post("/chat/stream")
async def robust_stream(request: dict):
    """Streaming with error handling"""
    
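    # Plain (sync) generator: Starlette iterates it in a threadpool, so the
    # blocking OpenAI client does not stall the event loop.
    def generate():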
        try:
            stream = client.chat.completions.create(
                model="gpt-4o",
                messages=request["messages"],
                stream=True
            )
            
            for chunk in stream:
                content = chunk.choices[0].delta.content
                if content:
                    yield f"data: {json.dumps({'content': content})}\n\n"
            
            yield f"data: {json.dumps({'done': True})}\n\n"
            
        except Exception as e:
            error_data = {
                "error": True,
                "message": str(e),
                "type": type(e).__name__
            }
            yield f"data: {json.dumps(error_data)}\n\n"
    
    return StreamingResponse(
        generate(),
        media_type="text/event-stream"
    )
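Because the HTTP status line has already been sent by the time a mid-stream failure occurs, the consumer has to look for errors inside the payloads. The following is a rough sketch of that consumer-side logic, assuming the payload shapes used by the endpoint above (`content`, `done`, `error`); `StreamError` and `collect_stream` are illustrative names.

```python
from typing import Iterable


class StreamError(Exception):
    """Raised when the server reports an error inside the stream."""

    def __init__(self, message: str, partial_text: str):
        super().__init__(message)
        # Keep whatever text arrived before the failure so the UI can show it
        self.partial_text = partial_text


def collect_stream(payloads: Iterable[dict]) -> str:
    """Assemble text from decoded SSE payloads, surfacing in-band errors."""
    parts = []
    for payload in payloads:
        if payload.get("error"):
            raise StreamError(
                payload.get("message", "unknown error"), "".join(parts)
            )
        if payload.get("done"):
            break
        parts.append(payload.get("content", ""))
    return "".join(parts)


ok_text = collect_stream([{"content": "a"}, {"content": "b"}, {"done": True}])
```

Keeping the partial text on the exception lets the caller decide whether to display the incomplete answer, retry, or both.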

Timeout and Cancellation

import asyncio
from openai import AsyncOpenAI

async_client = AsyncOpenAI()

class StreamManager:
    """Manage streaming with timeouts and cancellation"""

    def __init__(self, timeout_seconds: int = 60):
        self.timeout = timeout_seconds
        self.active_streams = {}

    async def create_stream(
        self,
        stream_id: str,
        messages: list
    ):
        """Create a managed stream"""

        async def generator():
            stream = None
            try:
                stream = await asyncio.wait_for(
                    async_client.chat.completions.create(
                        model="gpt-4o",
                        messages=messages,
                        stream=True
                    ),
                    timeout=self.timeout
                )

                # Loop ends as soon as cancel_stream() removes the id
                while stream_id in self.active_streams:
                    try:
                        # The timeout applies to each chunk, so a stalled
                        # stream is detected without capping total length
                        chunk = await asyncio.wait_for(
                            stream.__anext__(),
                            timeout=self.timeout
                        )
                    except StopAsyncIteration:
                        break  # Stream finished normally

                    if chunk.choices and chunk.choices[0].delta.content:
                        yield chunk.choices[0].delta.content

            except asyncio.TimeoutError:
                yield "[TIMEOUT]"
            finally:
                self.active_streams.pop(stream_id, None)
                if stream is not None:
                    # Close the upstream connection so generation stops
                    await stream.close()

        self.active_streams[stream_id] = True
        return generator()

    def cancel_stream(self, stream_id: str):
        """Cancel an active stream"""
        self.active_streams.pop(stream_id, None)

Streaming with Rate Limiting

from collections import defaultdict
import time

class RateLimitedStreamer:
    """Rate-limit streaming per user.
    
    Why per-user limits? Without them, a single user can open 50 concurrent
    streams and exhaust your API quota for everyone. The two dimensions that 
    matter: concurrent streams (resource exhaustion) and requests per minute 
    (cost control).
    """
    
    def __init__(
        self,
        max_concurrent: int = 3,    # Prevent resource exhaustion
        max_per_minute: int = 10    # Prevent cost runaway
    ):
        self.max_concurrent = max_concurrent
        self.max_per_minute = max_per_minute
        self.active_streams = defaultdict(int)
        self.request_times = defaultdict(list)
    
    def can_stream(self, user_id: str) -> tuple[bool, str]:
        """Check if user can start a new stream"""
        # Check concurrent limit
        if self.active_streams[user_id] >= self.max_concurrent:
            return False, "Too many concurrent streams"
        
        # Check rate limit
        now = time.time()
        minute_ago = now - 60
        recent = [t for t in self.request_times[user_id] if t > minute_ago]
        self.request_times[user_id] = recent  # Drop expired timestamps so the list stays bounded
        
        if len(recent) >= self.max_per_minute:
            return False, "Rate limit exceeded"
        
        return True, ""
    
    def start_stream(self, user_id: str):
        """Mark stream as started"""
        self.active_streams[user_id] += 1
        self.request_times[user_id].append(time.time())
    
    def end_stream(self, user_id: str):
        """Mark stream as ended"""
        self.active_streams[user_id] = max(0, self.active_streams[user_id] - 1)

SSE vs WebSocket Decision Table

| Criterion | SSE | WebSocket |
|---|---|---|
| Direction | Server-to-client only | Bidirectional |
| Protocol | Standard HTTP | Upgrade from HTTP |
| Auto-reconnect | Built-in (EventSource API) | Manual implementation required |
| Proxy/CDN support | Works through most proxies | Requires proxy WebSocket support |
| Binary data | Text only (base64 for binary) | Native binary frames |
| Serverless friendly | Yes (standard HTTP response) | No (requires persistent connection) |
| Use for AI chat | Preferred | Only if you need client-to-server streaming too |

Rule of thumb: If the client sends a single request and receives a streamed response (the standard AI chat pattern), use SSE. If the client needs to send data continuously during the response (voice, collaboration), use WebSocket.

Streaming Edge Cases

When streaming structured outputs (JSON), the response is not valid JSON until the stream completes. You cannot json.loads() a partial stream. Solutions: (1) Use Instructor’s create_partial for Pydantic models that update as fields arrive. (2) For raw JSON, accumulate the full string and parse only on [DONE]. (3) For real-time UI updates during structured streaming, use a streaming JSON parser like ijson that emits events as key-value pairs become complete.
When the model decides to call a tool, the stream switches from content chunks to tool_call chunks. The arguments arrive as string fragments across multiple chunks. You must detect this transition, accumulate the fragments, and parse the JSON only once the tool call is complete. The common mistake: trying to parse tool_call.function.arguments on each chunk — it is a fragment, not valid JSON.
If the server generates tokens faster than the client can process them (slow mobile device, heavy UI rendering), the TCP send buffer fills up. In Python async, this manifests as send_text() blocking. Fix: implement a bounded queue per client. If the queue is full, drop the oldest unacknowledged tokens and send a “gap” marker so the client requests the full response on completion.
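The bounded-queue idea for slow clients can be sketched as follows. This is one possible shape, not a definitive implementation: `ClientBuffer` and the `{"type": "gap"}` marker are hypothetical names, and the sketch drops the oldest queued item rather than tracking acknowledgements.

```python
import asyncio

GAP = {"type": "gap"}  # hypothetical marker telling the client it missed tokens


class ClientBuffer:
    """Per-client buffer that never blocks the producer."""

    def __init__(self, max_items: int):
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=max_items)
        self.dropped = 0

    def push(self, token: str) -> None:
        # When full, evict the oldest queued item instead of waiting
        if self._queue.full():
            self._queue.get_nowait()
            self.dropped += 1
        self._queue.put_nowait({"type": "chunk", "content": token})

    async def pop(self) -> dict:
        # Report the gap once, before delivering the surviving items
        if self.dropped:
            self.dropped = 0
            return GAP
        return await self._queue.get()


async def demo() -> list:
    buf = ClientBuffer(max_items=2)
    for tok in ["a", "b", "c", "d"]:
        buf.push(tok)
    return [await buf.pop() for _ in range(3)]


delivered = asyncio.run(demo())
```

On receiving the gap marker, the client would stop appending tokens and fetch the full response once generation completes.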

Key Takeaways

Always Stream

Streaming dramatically improves perceived performance

Use SSE for Simplicity

SSE is simpler than WebSockets for one-way streaming

Handle Errors Gracefully

Errors should be streamed to client, not cause crashes

Manage Resources

Implement timeouts, cancellation, and rate limiting

What’s Next

Prompt Versioning & Management

Learn to version, test, and manage prompts in production

Interview Deep-Dive

Strong Answer:
  • SSE (Server-Sent Events) is unidirectional: the server pushes data to the client over a standard HTTP connection. WebSockets are bidirectional: both client and server can send messages at any time over a persistent TCP connection. For LLM streaming, the data flow is inherently unidirectional during generation — the server streams tokens to the client — so SSE is the natural fit.
  • SSE has several operational advantages. It works over standard HTTP, which means it survives proxies, load balancers, CDNs, and corporate firewalls that often block or mishandle WebSocket upgrade requests. The browser's EventSource API reconnects automatically and sends a Last-Event-ID header, which gives you the hook for resumability as long as the server tags events with IDs and honors that header. It is simpler to implement, debug, and monitor: you can curl an SSE endpoint and see the stream in your terminal. You cannot do that with WebSockets.
  • WebSockets become necessary when you need bidirectional streaming. Real-time voice chat (audio streaming in both directions simultaneously), collaborative editing where multiple users push changes, or any scenario where the client needs to send data mid-stream (like canceling a generation while tokens are still flowing). For a standard chat UI where the user sends a message, waits for the response, then sends the next message, WebSockets add complexity without benefit.
  • The concrete architectural difference: with SSE, each message from the user creates a new HTTP request that returns a streaming response. The connection closes when generation completes. With WebSockets, you maintain a persistent connection across multiple messages, which means you need connection lifecycle management, heartbeats, reconnection logic, and state management on the server. For a chat app with 10K concurrent users, that is 10K persistent WebSocket connections your server must hold open, versus SSE where connections open and close per message.
  • My recommendation for most AI chat products: SSE for the generation stream, with a regular REST endpoint for sending messages. If you later need features like real-time typing indicators or multi-user presence, add WebSockets for those specific features alongside SSE for the main generation stream. Do not force everything through WebSockets just because you might need bidirectional communication someday.
Follow-up: A user reports that tokens are arriving in bursts instead of one at a time. They see nothing for 2 seconds, then a chunk of 20 tokens at once. What is causing this and how do you fix it?
This is almost always a buffering problem in the infrastructure between your server and the client. The three most common culprits, in order of likelihood:
  • Nginx or your reverse proxy is buffering the response. Nginx buffers upstream responses by default before forwarding them. Fix: add proxy_buffering off; to the Nginx location block, or set the X-Accel-Buffering: no response header.
  • Your application framework is buffering. Some WSGI servers and middleware buffer response bodies. Ensure you are using an ASGI server (like Uvicorn) with a proper StreamingResponse that flushes after each chunk.
  • The CDN or cloud load balancer is buffering. AWS ALB, Cloudflare, and similar services may buffer SSE streams. Each has specific configuration to disable buffering for streaming endpoints.
I would diagnose by testing at each layer: curl directly to the application server (bypasses all proxies), then through Nginx, then through the full stack, and identify where the bursting begins.
Strong Answer:
  • With regular text streaming, each chunk contains a fragment of the assistant’s message — a few tokens of text that you can display immediately. The client appends each fragment to the UI as it arrives. Simple and progressive.
  • Tool calls break this pattern entirely. When the model decides to call a tool, the tool call arguments arrive as fragmented JSON strings spread across multiple chunks. You might receive: chunk 1 contains {"loc, chunk 2 contains ation":, chunk 3 contains "NYC"}. You cannot parse or act on the tool call until all fragments are assembled into valid JSON. Meanwhile, you cannot display the fragments to the user because they are machine-readable function arguments, not human-readable text.
  • The implementation challenge is that a single response can interleave text content and tool calls, or contain multiple parallel tool calls. Each tool call has an index that identifies which call it belongs to. You need an accumulator that: tracks each tool call by index, concatenates argument fragments per index, detects when the stream ends or transitions to a new tool call, and only then parses the complete JSON and executes the function.
  • The UX challenge is what to show the user during tool call accumulation. The user sees nothing while arguments stream in, creating a dead period. Best practice is to show a status indicator: “Searching for weather data…” as soon as you detect the tool call name (which arrives early in the stream, before the arguments). This gives the user feedback that work is happening. Once the tool executes and you feed results back to the model, the second-pass text response streams normally.
  • An additional production concern: the model might stream partial arguments that are syntactically invalid JSON even when complete. You need a try/catch around json.loads after accumulation, and a recovery strategy — either retry the API call or return an error to the user. I have seen this happen with complex nested schemas where the model generates a trailing comma or mismatched bracket.
Follow-up: The model calls three tools in parallel during a streaming response. How do you handle the execution and the subsequent response efficiently?
When the stream completes with multiple tool calls, you should execute all three tools concurrently using asyncio.gather or equivalent, not sequentially. Sequential execution means the total wait time is the sum of all three tool latencies. Concurrent execution means it is the maximum of the three. For three API calls averaging 200ms each, that is 200ms versus 600ms, a 3x improvement. After all three results come back, you append them all to the message history (each with its corresponding tool_call_id) and make one more API call to get the final response. The model sees all three results simultaneously and synthesizes a coherent answer. The key implementation detail: you must include every tool result for every tool call. If the model requested three tools and you only return two results, the API returns an error. If one tool fails, return an error object as its result rather than omitting it; the model can reason about the failure and explain it to the user.
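The concurrent-execution approach described above might look like the sketch below. It assumes tool calls have already been accumulated into dicts with `id`, `name`, and `arguments` keys, and that tools are async functions held in a `registry` dict; `run_tool_calls` and `get_weather` are hypothetical names. Any failure, including malformed argument JSON, becomes an error object so that every call still gets a result.

```python
import asyncio
import json


async def run_tool_calls(tool_calls: list[dict], registry: dict) -> list[dict]:
    """Execute accumulated tool calls concurrently; never drop a result."""

    async def run_one(call: dict) -> dict:
        try:
            args = json.loads(call["arguments"] or "{}")
            result = await registry[call["name"]](**args)
        except Exception as exc:  # bad JSON, unknown tool, or tool failure
            result = {"error": f"{type(exc).__name__}: {exc}"}
        # One tool message per call, matched by tool_call_id
        return {
            "role": "tool",
            "tool_call_id": call["id"],
            "content": json.dumps(result),
        }

    return await asyncio.gather(*(run_one(c) for c in tool_calls))


async def get_weather(location: str) -> dict:  # hypothetical tool
    await asyncio.sleep(0.01)
    return {"location": location, "temp_c": 21}


calls = [
    {"id": "call_1", "name": "get_weather", "arguments": '{"location": "NYC"}'},
    {"id": "call_2", "name": "get_weather", "arguments": '{"location": '},  # truncated JSON
]
tool_messages = asyncio.run(run_tool_calls(calls, {"get_weather": get_weather}))
```

The returned messages can be appended to the conversation after the assistant message that requested the tools, followed by one more completion call.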
Strong Answer:
  • Streaming rate limiting has two dimensions that traditional request rate limiting does not: concurrent connections and total generation tokens. A single streaming request can hold a connection open for 30 seconds and consume 2000 tokens, while a non-streaming request is a single brief round-trip. You need to limit both the request rate (messages per minute) and concurrent streams (simultaneous open connections per user).
  • For concurrent stream limiting, I would implement a server-side counter per user. When a stream starts, increment the counter. When it ends (complete, error, or client disconnect), decrement it. If the counter exceeds the limit (I would start with 3 concurrent streams per user), reject the new request with a 429 status code and a clear message: “Maximum concurrent streams reached. Please wait for an existing response to complete.” This prevents the 50-tab scenario directly.
  • For request rate limiting, use a sliding window: maximum 10-20 requests per minute per user for a chat application. Use Redis or an in-memory counter with TTL. The sliding window is better than a fixed window because it prevents burst-at-boundary attacks where a user sends 20 requests at second 59 and another 20 at second 61 of the next window.
  • The OpenAI rate limit protection is a separate concern. You should have a global token bucket or semaphore that limits total concurrent requests to the OpenAI API across all users, sized to stay within your rate limit. If the global limit is hit, queue incoming requests rather than rejecting them, with a timeout. This way, a single abusive user fills the queue but other users’ requests are still eventually served rather than immediately rejected.
  • One subtlety specific to streaming: detecting client disconnects. If the user closes the browser tab mid-stream, the server may keep generating tokens and paying for them until the stream finishes. Implement disconnect detection by catching ConnectionResetError or using middleware that checks if the client is still connected before yielding each chunk. When a disconnect is detected, cancel the upstream OpenAI stream immediately.
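The global limit with queueing described above can be approximated with a semaphore plus a bounded wait. This is a simplified sketch: `UpstreamGate` is an illustrative name, waiters are served in the semaphore's own order rather than by a weighted policy, and the gate is created inside the running event loop.

```python
import asyncio


class UpstreamGate:
    """Cap concurrent upstream calls; waiters queue for up to `max_wait` seconds."""

    def __init__(self, max_concurrent: int, max_wait: float):
        self._sem = asyncio.Semaphore(max_concurrent)
        self._max_wait = max_wait

    async def run(self, make_call):
        try:
            # Queue for a slot instead of rejecting immediately
            await asyncio.wait_for(self._sem.acquire(), timeout=self._max_wait)
        except asyncio.TimeoutError:
            raise TimeoutError("upstream queue wait exceeded") from None
        try:
            return await make_call()
        finally:
            self._sem.release()


async def demo() -> list:
    gate = UpstreamGate(max_concurrent=1, max_wait=0.05)

    async def slow() -> str:
        await asyncio.sleep(0.2)
        return "slow"

    async def fast() -> str:
        return "fast"

    # The second call cannot get a slot within 0.05s and times out
    return await asyncio.gather(
        gate.run(slow), gate.run(fast), return_exceptions=True
    )


results = asyncio.run(demo())
```

In a streaming handler, `make_call` would cover the whole lifetime of the upstream stream, since the slot stays occupied until the last token arrives.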
Follow-up: How do you handle the billing and fairness implications? Should heavy users pay more, or should you throttle them differently?
This is a product decision with engineering implications. The technical implementation I would recommend: track token consumption per user per billing period (day or month). Implement tiered rate limits: free users get 10K tokens/day with 2 concurrent streams, paid users get 100K tokens/day with 5 concurrent streams, enterprise gets custom limits. Enforce these limits server-side, not client-side; never trust the client. For fairness across users sharing the same API key limits, implement a weighted fair queue: when the system is at capacity, distribute available request slots proportionally across active users rather than first-come-first-served, which lets power users starve everyone else. The key metric to monitor is “request queue wait time by user tier”: if free users routinely wait 10+ seconds because paid users saturate capacity, your capacity planning or tier limits need adjustment.
Strong Answer:
  • Cancellation has three layers: client-side, server-side, and upstream API. The client needs to signal cancellation (user clicks a stop button), the server needs to stop the generator and clean up, and ideally you stop the upstream API call to stop paying for tokens you will not use.
  • On the client side with SSE, the client calls eventSource.close() which drops the HTTP connection. With WebSockets, the client sends a cancel message like {"type": "cancel", "stream_id": "abc123"}. The server needs to detect the disconnection or receive the cancel message.
  • On the server side, the key is making the stream generator cancellation-aware. In Python with FastAPI, the StreamingResponse generator can check a cancellation flag between each chunk yield. When using WebSockets, the cancel message sets a flag that the generator checks on its next iteration. The generator should break out of its loop immediately, perform cleanup (close the OpenAI stream, update metrics), and return.
  • For the upstream OpenAI API call, the Python client supports stream.close() which terminates the HTTP connection to OpenAI. Crucially, you are still billed for tokens already generated before cancellation. If the model has generated 500 tokens when the user cancels at token 501, you pay for 500 output tokens. You do not get those back. This means cancellation saves future token costs but not past ones.
  • The cost implication creates an interesting optimization: if a user consistently cancels after the first paragraph, consider reducing max_tokens for that user, or implementing a “pause” feature where the model generates a paragraph at a time and the user can choose to continue or stop. This aligns token generation with actual consumption.
  • Implementation detail that trips people up: when the client disconnects during SSE, the server does not get an immediate notification in many frameworks. The server discovers the disconnect only when the next yield fails with a write error. This means there can be a lag between the user clicking “stop” and the server actually stopping generation. To minimize this lag, yield frequently (every token) rather than batching tokens before yielding.
Follow-up: The client disconnects but the server does not notice for 5 seconds. During that window, you have generated 200 tokens that nobody will see. At scale, how significant is this waste?
At scale, it compounds fast. If you have 10,000 daily users and 5% cancel mid-stream, that is 500 cancellations per day. At 200 wasted tokens per cancellation, that is 100K tokens per day wasted, roughly $1/day with GPT-4o-mini or $10/day with GPT-4o. Over a year, the GPT-4o waste alone is $3,600. Not catastrophic, but not nothing. The fix is to implement proper disconnect detection rather than relying on write failures. In an ASGI framework, you can use request.is_disconnected() which checks the connection state without writing. Poll this every 500ms in a background task and set a cancellation event when it returns true. Alternatively, use a heartbeat mechanism: send a lightweight SSE comment (: heartbeat\n\n) every second. If the write fails, you know immediately. This reduces the detection window from 5 seconds to 1 second, cutting waste by 80%.
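The polling approach can be sketched independently of any framework. Here `is_disconnected` is any async callable returning a bool (in Starlette or FastAPI, `request.is_disconnected` fits that shape), and `stream_until_disconnect` is an illustrative name. The check happens before each yield, so detection is only as fast as the slower of the poll interval and the token rate.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable


async def stream_until_disconnect(
    source: AsyncIterator[str],
    is_disconnected: Callable[[], Awaitable[bool]],
    poll_interval: float = 0.5,
) -> AsyncIterator[str]:
    gone = asyncio.Event()

    async def watch() -> None:
        # Background poll: flips the event once the client is gone
        while not await is_disconnected():
            await asyncio.sleep(poll_interval)
        gone.set()

    watcher = asyncio.create_task(watch())
    try:
        async for token in source:
            if gone.is_set():
                break  # stop before sending to a client that left
            yield token
    finally:
        watcher.cancel()
        # Close the upstream source so it stops producing tokens
        aclose = getattr(source, "aclose", None)
        if aclose is not None:
            await aclose()


async def demo() -> list:
    sent = []

    async def tokens() -> AsyncIterator[str]:
        for i in range(100):
            await asyncio.sleep(0.01)
            yield f"t{i}"

    async def is_disconnected() -> bool:
        return len(sent) >= 3  # simulate the client leaving after 3 tokens

    async for tok in stream_until_disconnect(tokens(), is_disconnected, 0.005):
        sent.append(tok)
    return sent


received = asyncio.run(demo())
```

With a real upstream LLM stream, the `finally` block is where the provider's stream object would be closed so that token generation stops.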