December 2025 Update: Complete guide to streaming LLM responses with SSE, WebSockets, and production best practices.
Why Streaming Matters
LLM responses can take 5-30 seconds. Without streaming:
Without Streaming              With Streaming
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
User waits 10s...              First token in 200ms!
...still waiting...            Words appear as generated
...loading spinner...          User reads in real-time
Full response appears          Complete response built up
| Metric | Non-Streaming | Streaming |
|---|---|---|
| Time to First Token | 5-30s | 100-500ms |
| Perceived Latency | Full wait | Near instant |
| User Experience | Frustrating | Responsive |
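To make the "time to first token" numbers concrete, here is a small sketch (assuming the OpenAI Python SDK and an API key configured in the environment) that times a full non-streaming call against the arrival of the first streamed chunk:

import time
from openai import OpenAI

client = OpenAI()
prompt = [{"role": "user", "content": "Explain quantum computing in simple terms"}]

# Non-streaming: nothing is visible until the full response arrives
start = time.perf_counter()
client.chat.completions.create(model="gpt-4o", messages=prompt)
print(f"Full response after {time.perf_counter() - start:.2f}s")

# Streaming: measure time to first token
start = time.perf_counter()
stream = client.chat.completions.create(model="gpt-4o", messages=prompt, stream=True)
for chunk in stream:
    if chunk.choices and chunk.choices[0].delta.content:
        print(f"First token after {time.perf_counter() - start:.2f}s")
        break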
OpenAI Streaming
Basic Streaming
from openai import OpenAI

client = OpenAI()

def stream_chat(message: str):
    """Stream a chat response"""
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": message}],
        stream=True
    )
    for chunk in stream:
        if chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="", flush=True)
    print()  # Newline at end

stream_chat("Explain quantum computing in simple terms")
Collecting Streamed Response
def stream_and_collect(message: str) -> str:
    """Stream response and return complete text"""
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": message}],
        stream=True
    )
    collected_content = []
    for chunk in stream:
        content = chunk.choices[0].delta.content
        if content:
            collected_content.append(content)
            print(content, end="", flush=True)
    print()
    return "".join(collected_content)

full_response = stream_and_collect("Tell me a short story")
Async Streaming
from openai import AsyncOpenAI
import asyncio

async_client = AsyncOpenAI()

async def async_stream_chat(message: str) -> str:
    """Async streaming for concurrent requests"""
    stream = await async_client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": message}],
        stream=True
    )
    collected = []
    async for chunk in stream:
        content = chunk.choices[0].delta.content
        if content:
            collected.append(content)
            print(content, end="", flush=True)
    print()
    return "".join(collected)

# Run multiple streams concurrently
async def main():
    tasks = [
        async_stream_chat("Tell me about Python"),
        async_stream_chat("Tell me about JavaScript"),
    ]
    results = await asyncio.gather(*tasks)
    return results

asyncio.run(main())
Streaming with Tool Calls
Handle streaming when tools are involved:
import json

def stream_with_tools(message: str, tools: list):
    """Stream response with tool call handling"""
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": message}],
        tools=tools,
        stream=True
    )
    collected_content = []
    tool_calls = {}
    for chunk in stream:
        delta = chunk.choices[0].delta
        # Handle text content
        if delta.content:
            collected_content.append(delta.content)
            print(delta.content, end="", flush=True)
        # Handle tool calls (ids, names, and argument fragments arrive across chunks)
        if delta.tool_calls:
            for tool_call in delta.tool_calls:
                idx = tool_call.index
                if idx not in tool_calls:
                    tool_calls[idx] = {"id": "", "name": "", "arguments": ""}
                if tool_call.id:
                    tool_calls[idx]["id"] = tool_call.id
                if tool_call.function.name:
                    tool_calls[idx]["name"] = tool_call.function.name
                if tool_call.function.arguments:
                    tool_calls[idx]["arguments"] += tool_call.function.arguments
    # Process tool calls
    for idx, tool_call in tool_calls.items():
        print(f"\n🔧 Tool: {tool_call['name']}")
        args = json.loads(tool_call["arguments"])
        print(f"   Args: {args}")
    return "".join(collected_content), list(tool_calls.values())
FastAPI Streaming Endpoints
Server-Sent Events (SSE)
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import OpenAI
import json

app = FastAPI()
client = OpenAI()

@app.post("/chat/stream")
async def stream_chat(request: dict):
    """Stream chat response as SSE"""
    async def generate():
        stream = client.chat.completions.create(
            model="gpt-4o",
            messages=request["messages"],
            stream=True
        )
        for chunk in stream:
            content = chunk.choices[0].delta.content
            if content:
                # SSE format: each event is "data: <json>\n\n"
                data = json.dumps({"content": content})
                yield f"data: {data}\n\n"
        # Signal end of stream
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        }
    )
# Client-side JavaScript (EventSource only supports GET, so use fetch for this POST endpoint):
# const response = await fetch('/chat/stream', {
#   method: 'POST',
#   headers: { 'Content-Type': 'application/json' },
#   body: JSON.stringify({ messages: [{ role: 'user', content: 'Hello!' }] }),
# });
# const reader = response.body.getReader();
# const decoder = new TextDecoder();
# while (true) {
#   const { done, value } = await reader.read();
#   if (done) break;
#   for (const line of decoder.decode(value).split('\n')) {
#     if (!line.startsWith('data: ')) continue;
#     const payload = line.slice('data: '.length);
#     if (payload === '[DONE]') continue;
#     document.getElementById('response').textContent += JSON.parse(payload).content;
#   }
# }
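The same stream can also be consumed from Python for testing. The sketch below assumes the httpx package (not used elsewhere in this guide) and a server on localhost:8000; it reads SSE lines and stops at the [DONE] sentinel.

import json
import httpx

def consume_sse(messages: list, url: str = "http://localhost:8000/chat/stream"):
    """Read the SSE stream line by line and print content as it arrives."""
    with httpx.stream("POST", url, json={"messages": messages}, timeout=None) as response:
        for line in response.iter_lines():
            if not line.startswith("data: "):
                continue
            payload = line[len("data: "):]
            if payload == "[DONE]":
                break
            print(json.loads(payload)["content"], end="", flush=True)
    print()

consume_sse([{"role": "user", "content": "Hello!"}])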
Streaming with Token Counting
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import OpenAI
import tiktoken
import json

app = FastAPI()
client = OpenAI()

@app.post("/chat/stream-with-metrics")
async def stream_with_metrics(request: dict):
    """Stream with real-time token counting"""
    async def generate():
        encoder = tiktoken.encoding_for_model("gpt-4o")
        total_tokens = 0
        stream = client.chat.completions.create(
            model="gpt-4o",
            messages=request["messages"],
            stream=True,
            stream_options={"include_usage": True}
        )
        for chunk in stream:
            # Content chunks
            if chunk.choices and chunk.choices[0].delta.content:
                content = chunk.choices[0].delta.content
                tokens = len(encoder.encode(content))
                total_tokens += tokens
                data = {
                    "type": "content",
                    "content": content,
                    "tokens_so_far": total_tokens
                }
                yield f"data: {json.dumps(data)}\n\n"
            # Final usage info (arrives in the last chunk when include_usage is set)
            if chunk.usage:
                data = {
                    "type": "usage",
                    "prompt_tokens": chunk.usage.prompt_tokens,
                    "completion_tokens": chunk.usage.completion_tokens,
                    "total_tokens": chunk.usage.total_tokens
                }
                yield f"data: {json.dumps(data)}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream"
    )
WebSocket Streaming
For bidirectional real-time communication:
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from openai import OpenAI
import json

app = FastAPI()
client = OpenAI()

class ConnectionManager:
    def __init__(self):
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def send_message(self, message: dict, websocket: WebSocket):
        await websocket.send_json(message)

manager = ConnectionManager()

@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
    await manager.connect(websocket)
    try:
        while True:
            # Receive message from client
            data = await websocket.receive_json()
            # Start streaming response
            stream = client.chat.completions.create(
                model="gpt-4o",
                messages=data.get("messages", []),
                stream=True
            )
            # Send status
            await manager.send_message(
                {"type": "start", "message_id": data.get("id")},
                websocket
            )
            # Stream chunks
            full_response = []
            for chunk in stream:
                content = chunk.choices[0].delta.content
                if content:
                    full_response.append(content)
                    await manager.send_message(
                        {"type": "chunk", "content": content},
                        websocket
                    )
            # Send completion
            await manager.send_message(
                {
                    "type": "complete",
                    "full_response": "".join(full_response)
                },
                websocket
            )
    except WebSocketDisconnect:
        manager.disconnect(websocket)
Client-Side WebSocket
// React/JavaScript WebSocket client
class ChatClient {
  constructor(url) {
    this.ws = new WebSocket(url);
    this.onChunk = null;
    this.onComplete = null;
    this.ws.onmessage = (event) => {
      const data = JSON.parse(event.data);
      switch (data.type) {
        case 'chunk':
          if (this.onChunk) this.onChunk(data.content);
          break;
        case 'complete':
          if (this.onComplete) this.onComplete(data.full_response);
          break;
      }
    };
  }

  send(messages) {
    this.ws.send(JSON.stringify({ messages }));
  }
}

// Usage
const client = new ChatClient('ws://localhost:8000/ws/chat');
client.onChunk = (content) => {
  document.getElementById('response').textContent += content;
};
client.send([{ role: 'user', content: 'Hello!' }]);
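To exercise the WebSocket endpoint outside the browser, a small Python client can mirror the JavaScript above. This sketch assumes the third-party websockets package, which is not used elsewhere in this guide:

import asyncio
import json
import websockets

async def test_ws_chat():
    async with websockets.connect("ws://localhost:8000/ws/chat") as ws:
        # Send one chat request, then print chunks until the server reports completion
        await ws.send(json.dumps({"id": "1", "messages": [{"role": "user", "content": "Hello!"}]}))
        while True:
            data = json.loads(await ws.recv())
            if data["type"] == "chunk":
                print(data["content"], end="", flush=True)
            elif data["type"] == "complete":
                print()
                break

asyncio.run(test_ws_chat())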
Streaming with LangChain
from langchain_openai import ChatOpenAI
from langchain_core.callbacks import StreamingStdOutCallbackHandler
from langchain_core.messages import HumanMessage

# Simple streaming
llm = ChatOpenAI(
    model="gpt-4o",
    streaming=True,
    callbacks=[StreamingStdOutCallbackHandler()]
)
response = llm.invoke([HumanMessage(content="Tell me a joke")])

# Custom streaming handler
from langchain_core.callbacks import BaseCallbackHandler

class CustomStreamHandler(BaseCallbackHandler):
    def __init__(self, on_token):
        self.on_token = on_token
        self.tokens = []

    def on_llm_new_token(self, token: str, **kwargs):
        self.tokens.append(token)
        self.on_token(token)

    def get_full_response(self) -> str:
        return "".join(self.tokens)

# Usage
tokens_received = []
handler = CustomStreamHandler(lambda t: tokens_received.append(t))
llm = ChatOpenAI(model="gpt-4o", streaming=True, callbacks=[handler])
response = llm.invoke([HumanMessage(content="Explain streaming")])
print(f"Received {len(tokens_received)} tokens")
Async Streaming with LangChain
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
import asyncio

async def stream_langchain():
    llm = ChatOpenAI(model="gpt-4o")
    chunks = []
    async for chunk in llm.astream([HumanMessage(content="Hello!")]):
        print(chunk.content, end="", flush=True)
        chunks.append(chunk.content)
    return "".join(chunks)

asyncio.run(stream_langchain())
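The astream interface plugs straight into the SSE pattern from the FastAPI section. Below is a sketch combining the two; the endpoint path and request shape are assumptions for illustration.

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain_openai import ChatOpenAI
import json

app = FastAPI()
llm = ChatOpenAI(model="gpt-4o")

@app.post("/chat/stream-langchain")
async def stream_langchain_endpoint(request: dict):
    async def generate():
        # Forward each LangChain chunk as an SSE event
        async for chunk in llm.astream(request["messages"]):
            if chunk.content:
                yield f"data: {json.dumps({'content': chunk.content})}\n\n"
        yield "data: [DONE]\n\n"
    return StreamingResponse(generate(), media_type="text/event-stream")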
Production Streaming Patterns
Graceful Error Handling
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import OpenAI
import json

app = FastAPI()
client = OpenAI()

@app.post("/chat/stream")
async def robust_stream(request: dict):
    """Streaming with error handling"""
    async def generate():
        try:
            stream = client.chat.completions.create(
                model="gpt-4o",
                messages=request["messages"],
                stream=True
            )
            for chunk in stream:
                content = chunk.choices[0].delta.content
                if content:
                    yield f"data: {json.dumps({'content': content})}\n\n"
            yield f"data: {json.dumps({'done': True})}\n\n"
        except Exception as e:
            # Send the error as a final SSE event instead of crashing the stream
            error_data = {
                "error": True,
                "message": str(e),
                "type": type(e).__name__
            }
            yield f"data: {json.dumps(error_data)}\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream"
    )
Timeout and Cancellation
import asyncio
from openai import OpenAI

client = OpenAI()

class StreamManager:
    """Manage streaming with timeouts and cancellation"""

    def __init__(self, timeout_seconds: int = 60):
        self.timeout = timeout_seconds
        self.active_streams = {}

    async def create_stream(
        self,
        stream_id: str,
        messages: list
    ):
        """Create a managed stream"""
        async def generator():
            try:
                stream = client.chat.completions.create(
                    model="gpt-4o",
                    messages=messages,
                    stream=True
                )

                async def read_stream():
                    for chunk in stream:
                        if stream_id not in self.active_streams:
                            break  # Cancelled
                        content = chunk.choices[0].delta.content
                        if content:
                            yield content

                chunks = read_stream()
                while True:
                    try:
                        # Apply the timeout to each chunk individually
                        content = await asyncio.wait_for(
                            chunks.__anext__(), timeout=self.timeout
                        )
                    except StopAsyncIteration:
                        break
                    yield content
            except asyncio.TimeoutError:
                yield "[TIMEOUT]"
            finally:
                self.active_streams.pop(stream_id, None)

        self.active_streams[stream_id] = True
        return generator()

    def cancel_stream(self, stream_id: str):
        """Cancel an active stream"""
        self.active_streams.pop(stream_id, None)
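Here is one way the manager might be wired into endpoints; the route paths, the UUID stream ID, and the X-Stream-Id header are illustrative choices, not part of the original.

import uuid
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()
stream_manager = StreamManager(timeout_seconds=60)

@app.post("/chat/stream-managed")
async def stream_managed(request: dict):
    # Register a new managed stream and return its ID so the client can cancel it
    stream_id = str(uuid.uuid4())
    generator = await stream_manager.create_stream(stream_id, request["messages"])
    return StreamingResponse(
        generator,
        media_type="text/plain",
        headers={"X-Stream-Id": stream_id}
    )

@app.post("/chat/cancel/{stream_id}")
async def cancel(stream_id: str):
    stream_manager.cancel_stream(stream_id)
    return {"cancelled": stream_id}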
Streaming with Rate Limiting
from collections import defaultdict
import time

class RateLimitedStreamer:
    """Rate-limit streaming per user"""

    def __init__(
        self,
        max_concurrent: int = 3,
        max_per_minute: int = 10
    ):
        self.max_concurrent = max_concurrent
        self.max_per_minute = max_per_minute
        self.active_streams = defaultdict(int)
        self.request_times = defaultdict(list)

    def can_stream(self, user_id: str) -> tuple[bool, str]:
        """Check if user can start a new stream"""
        # Check concurrent limit
        if self.active_streams[user_id] >= self.max_concurrent:
            return False, "Too many concurrent streams"
        # Check rate limit (and drop timestamps older than a minute)
        now = time.time()
        minute_ago = now - 60
        recent = [t for t in self.request_times[user_id] if t > minute_ago]
        self.request_times[user_id] = recent
        if len(recent) >= self.max_per_minute:
            return False, "Rate limit exceeded"
        return True, ""

    def start_stream(self, user_id: str):
        """Mark stream as started"""
        self.active_streams[user_id] += 1
        self.request_times[user_id].append(time.time())

    def end_stream(self, user_id: str):
        """Mark stream as ended"""
        self.active_streams[user_id] = max(0, self.active_streams[user_id] - 1)
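A sketch of how the limiter could wrap an SSE endpoint, with end_stream guaranteed in a finally block; the X-User-Id header and route path are assumptions.

from fastapi import FastAPI, Header, HTTPException
from fastapi.responses import StreamingResponse
from openai import OpenAI
import json

app = FastAPI()
client = OpenAI()
limiter = RateLimitedStreamer(max_concurrent=3, max_per_minute=10)

@app.post("/chat/stream-limited")
async def stream_limited(request: dict, x_user_id: str = Header(default="anonymous")):
    # Reject the request up front if the user is over their limits
    allowed, reason = limiter.can_stream(x_user_id)
    if not allowed:
        raise HTTPException(status_code=429, detail=reason)
    limiter.start_stream(x_user_id)

    async def generate():
        try:
            stream = client.chat.completions.create(
                model="gpt-4o", messages=request["messages"], stream=True
            )
            for chunk in stream:
                content = chunk.choices[0].delta.content
                if content:
                    yield f"data: {json.dumps({'content': content})}\n\n"
            yield "data: [DONE]\n\n"
        finally:
            # Always release the concurrency slot, even on errors or disconnects
            limiter.end_stream(x_user_id)

    return StreamingResponse(generate(), media_type="text/event-stream")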
Key Takeaways
Always Stream: streaming dramatically improves perceived performance.
Use SSE for Simplicity: SSE is simpler than WebSockets for one-way streaming.
Handle Errors Gracefully: errors should be streamed to the client, not crash the connection.
Manage Resources: implement timeouts, cancellation, and rate limiting.
What’s Next
Prompt Versioning & Management
Learn to version, test, and manage prompts in production