December 2025 Update: Complete guide to streaming LLM responses with SSE, WebSockets, and production best practices.

Why Streaming Matters

A full LLM response can take 5-30 seconds to generate. Streaming changes how that wait feels:
Without Streaming               With Streaming
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
User waits 10s...               First token in 200ms!
...still waiting...             Words appear as generated
...loading spinner...           User reads in real-time
Full response appears           Complete response built up
Metric                 Non-Streaming    Streaming
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Time to First Token    5-30s            100-500ms
Perceived Latency      Full wait        Near instant
User Experience        Frustrating      Responsive
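
To see the gap for yourself, you can measure time to first token directly. A minimal sketch, assuming the OpenAI Python SDK with an OPENAI_API_KEY in the environment:

import time
from openai import OpenAI

client = OpenAI()

def measure_ttft(message: str) -> float:
    """Return seconds from request start until the first streamed token"""
    start = time.perf_counter()
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": message}],
        stream=True
    )
    for chunk in stream:
        if chunk.choices and chunk.choices[0].delta.content:
            return time.perf_counter() - start  # first token arrived
    return time.perf_counter() - start  # stream ended without content

print(f"Time to first token: {measure_ttft('Say hello'):.2f}s")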

OpenAI Streaming

Basic Streaming

from openai import OpenAI

client = OpenAI()

def stream_chat(message: str):
    """Stream a chat response"""
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": message}],
        stream=True
    )
    
    for chunk in stream:
        if chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="", flush=True)
    
    print()  # Newline at end

stream_chat("Explain quantum computing in simple terms")

Collecting Streamed Response

def stream_and_collect(message: str) -> str:
    """Stream response and return complete text"""
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": message}],
        stream=True
    )
    
    collected_content = []
    
    for chunk in stream:
        content = chunk.choices[0].delta.content
        if content:
            collected_content.append(content)
            print(content, end="", flush=True)
    
    print()
    return "".join(collected_content)

full_response = stream_and_collect("Tell me a short story")

Async Streaming

from openai import AsyncOpenAI
import asyncio

async_client = AsyncOpenAI()

async def async_stream_chat(message: str) -> str:
    """Async streaming for concurrent requests"""
    stream = await async_client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": message}],
        stream=True
    )
    
    collected = []
    async for chunk in stream:
        content = chunk.choices[0].delta.content
        if content:
            collected.append(content)
            print(content, end="", flush=True)
    
    print()
    return "".join(collected)

# Run multiple streams concurrently
async def main():
    tasks = [
        async_stream_chat("Tell me about Python"),
        async_stream_chat("Tell me about JavaScript"),
    ]
    results = await asyncio.gather(*tasks)
    return results

asyncio.run(main())
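
Note that the two concurrent streams above share stdout, so their printed tokens interleave; in a real application, route each stream's chunks to its own buffer or UI element.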

Streaming with Tool Calls

Handle streaming when tools are involved:
import json

def stream_with_tools(message: str, tools: list):
    """Stream response with tool call handling"""
    
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": message}],
        tools=tools,
        stream=True
    )
    
    collected_content = []
    tool_calls = {}
    
    for chunk in stream:
        delta = chunk.choices[0].delta
        
        # Handle text content
        if delta.content:
            collected_content.append(delta.content)
            print(delta.content, end="", flush=True)
        
        # Handle tool calls
        if delta.tool_calls:
            for tool_call in delta.tool_calls:
                idx = tool_call.index
                
                if idx not in tool_calls:
                    tool_calls[idx] = {
                        "id": "",
                        "name": "",
                        "arguments": ""
                    }
                
                if tool_call.id:
                    tool_calls[idx]["id"] = tool_call.id
                if tool_call.function.name:
                    tool_calls[idx]["name"] = tool_call.function.name
                if tool_call.function.arguments:
                    tool_calls[idx]["arguments"] += tool_call.function.arguments
    
    # Process tool calls
    for idx, tool_call in tool_calls.items():
        print(f"\n🔧 Tool: {tool_call['name']}")
        args = json.loads(tool_call["arguments"])
        print(f"   Args: {args}")
    
    return "".join(collected_content), list(tool_calls.values())

FastAPI Streaming Endpoints

Server-Sent Events (SSE)

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import OpenAI
import json

app = FastAPI()
client = OpenAI()

@app.post("/chat/stream")
async def stream_chat(request: dict):
    """Stream chat response as SSE"""
    
    async def generate():
        stream = client.chat.completions.create(
            model="gpt-4o",
            messages=request["messages"],
            stream=True
        )
        
        for chunk in stream:
            content = chunk.choices[0].delta.content
            if content:
                # SSE format
                data = json.dumps({"content": content})
                yield f"data: {data}\n\n"
        
        # Signal end of stream
        yield "data: [DONE]\n\n"
    
    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        }
    )

# Client-side JavaScript (EventSource only supports GET, so read this POST stream with fetch):
# const response = await fetch('/chat/stream', {
#     method: 'POST',
#     headers: { 'Content-Type': 'application/json' },
#     body: JSON.stringify({ messages })
# });
# const reader = response.body.getReader();
# const decoder = new TextDecoder();
# while (true) {
#     const { done, value } = await reader.read();
#     if (done) break;
#     for (const line of decoder.decode(value).split('\n')) {
#         if (!line.startsWith('data: ') || line === 'data: [DONE]') continue;
#         const data = JSON.parse(line.slice(6));
#         document.getElementById('response').textContent += data.content;
#     }
# }
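
Two production notes on this endpoint: iterating the synchronous OpenAI stream inside generate() blocks the event loop between chunks, so under heavy concurrency you may prefer AsyncOpenAI with async for; and behind a reverse proxy such as Nginx you typically need to disable response buffering (for example with an X-Accel-Buffering: no header) so chunks reach the client immediately.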

Streaming with Token Counting

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import OpenAI
import tiktoken
import json

app = FastAPI()
client = OpenAI()

@app.post("/chat/stream-with-metrics")
async def stream_with_metrics(request: dict):
    """Stream with real-time token counting"""
    
    async def generate():
        encoder = tiktoken.encoding_for_model("gpt-4o")
        total_tokens = 0
        
        stream = client.chat.completions.create(
            model="gpt-4o",
            messages=request["messages"],
            stream=True,
            stream_options={"include_usage": True}
        )
        
        for chunk in stream:
            # Content chunks
            if chunk.choices and chunk.choices[0].delta.content:
                content = chunk.choices[0].delta.content
                tokens = len(encoder.encode(content))
                total_tokens += tokens
                
                data = {
                    "type": "content",
                    "content": content,
                    "tokens_so_far": total_tokens
                }
                yield f"data: {json.dumps(data)}\n\n"
            
            # Final usage info
            if chunk.usage:
                data = {
                    "type": "usage",
                    "prompt_tokens": chunk.usage.prompt_tokens,
                    "completion_tokens": chunk.usage.completion_tokens,
                    "total_tokens": chunk.usage.total_tokens
                }
                yield f"data: {json.dumps(data)}\n\n"
        
        yield "data: [DONE]\n\n"
    
    return StreamingResponse(
        generate(),
        media_type="text/event-stream"
    )
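
With stream_options={"include_usage": True}, the final chunk carries the usage block and an empty choices list, which is why the content check above guards on chunk.choices first.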

WebSocket Streaming

For bidirectional real-time communication:
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from openai import OpenAI

app = FastAPI()
client = OpenAI()

class ConnectionManager:
    def __init__(self):
        self.active_connections: list[WebSocket] = []
    
    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)
    
    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)
    
    async def send_message(self, message: dict, websocket: WebSocket):
        await websocket.send_json(message)

manager = ConnectionManager()

@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
    await manager.connect(websocket)
    
    try:
        while True:
            # Receive message from client
            data = await websocket.receive_json()
            
            # Start streaming response
            stream = client.chat.completions.create(
                model="gpt-4o",
                messages=data.get("messages", []),
                stream=True
            )
            
            # Send status
            await manager.send_message(
                {"type": "start", "message_id": data.get("id")},
                websocket
            )
            
            # Stream chunks
            full_response = []
            for chunk in stream:
                content = chunk.choices[0].delta.content
                if content:
                    full_response.append(content)
                    await manager.send_message(
                        {"type": "chunk", "content": content},
                        websocket
                    )
            
            # Send completion
            await manager.send_message(
                {
                    "type": "complete",
                    "full_response": "".join(full_response)
                },
                websocket
            )
    
    except WebSocketDisconnect:
        manager.disconnect(websocket)
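
One caveat: the synchronous OpenAI client blocks the event loop while it waits for each chunk, so a single slow generation can stall other connections; with many concurrent WebSocket clients, prefer AsyncOpenAI and async for, as in the earlier async example.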

Client-Side WebSocket

// React/JavaScript WebSocket client
class ChatClient {
    constructor(url) {
        this.ws = new WebSocket(url);
        this.onChunk = null;
        this.onComplete = null;
        
        this.ws.onmessage = (event) => {
            const data = JSON.parse(event.data);
            
            switch (data.type) {
                case 'chunk':
                    if (this.onChunk) this.onChunk(data.content);
                    break;
                case 'complete':
                    if (this.onComplete) this.onComplete(data.full_response);
                    break;
            }
        };
    }
    
    send(messages) {
        this.ws.send(JSON.stringify({ messages }));
    }
}

// Usage
const client = new ChatClient('ws://localhost:8000/ws/chat');
client.onChunk = (content) => {
    document.getElementById('response').textContent += content;
};
// Wait for the connection to open before sending the first message
client.ws.onopen = () => {
    client.send([{ role: 'user', content: 'Hello!' }]);
};

Streaming with LangChain

from langchain_openai import ChatOpenAI
from langchain_core.callbacks import StreamingStdOutCallbackHandler
from langchain_core.messages import HumanMessage

# Simple streaming
llm = ChatOpenAI(
    model="gpt-4o",
    streaming=True,
    callbacks=[StreamingStdOutCallbackHandler()]
)

response = llm.invoke([HumanMessage(content="Tell me a joke")])

# Custom streaming handler
from langchain_core.callbacks import BaseCallbackHandler

class CustomStreamHandler(BaseCallbackHandler):
    def __init__(self, on_token):
        self.on_token = on_token
        self.tokens = []
    
    def on_llm_new_token(self, token: str, **kwargs):
        self.tokens.append(token)
        self.on_token(token)
    
    def get_full_response(self) -> str:
        return "".join(self.tokens)

# Usage
tokens_received = []
handler = CustomStreamHandler(lambda t: tokens_received.append(t))

llm = ChatOpenAI(model="gpt-4o", streaming=True, callbacks=[handler])
response = llm.invoke([HumanMessage(content="Explain streaming")])
print(f"Received {len(tokens_received)} tokens")

Async Streaming with LangChain

from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage

async def stream_langchain():
    llm = ChatOpenAI(model="gpt-4o")
    
    chunks = []
    async for chunk in llm.astream([HumanMessage(content="Hello!")]):
        print(chunk.content, end="", flush=True)
        chunks.append(chunk.content)
    
    return "".join(chunks)

import asyncio
asyncio.run(stream_langchain())
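
Streaming also composes with LCEL chains: calling .stream() on a chain yields message chunks as they arrive. A minimal sketch, assuming langchain-core and langchain-openai are installed:

from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

prompt = ChatPromptTemplate.from_template("Write one sentence about {topic}")
chain = prompt | ChatOpenAI(model="gpt-4o")

# .stream() yields AIMessageChunk objects as they are generated
for chunk in chain.stream({"topic": "streaming"}):
    print(chunk.content, end="", flush=True)
print()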

Production Streaming Patterns

Graceful Error Handling

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import OpenAI
import json

app = FastAPI()
client = OpenAI()

@app.post("/chat/stream")
async def robust_stream(request: dict):
    """Streaming with error handling"""
    
    async def generate():
        try:
            stream = client.chat.completions.create(
                model="gpt-4o",
                messages=request["messages"],
                stream=True
            )
            
            for chunk in stream:
                content = chunk.choices[0].delta.content
                if content:
                    yield f"data: {json.dumps({'content': content})}\n\n"
            
            yield f"data: {json.dumps({'done': True})}\n\n"
            
        except Exception as e:
            error_data = {
                "error": True,
                "message": str(e),
                "type": type(e).__name__
            }
            yield f"data: {json.dumps(error_data)}\n\n"
    
    return StreamingResponse(
        generate(),
        media_type="text/event-stream"
    )
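
Once streaming has begun, the 200 status and headers are already on the wire, so a mid-stream failure cannot be reported as an HTTP error response; sending a structured error event in-band, as above, is what lets the client detect and surface the problem.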

Timeout and Cancellation

import asyncio

from openai import OpenAI

client = OpenAI()

class StreamManager:
    """Manage streaming with timeouts and cancellation"""
    
    def __init__(self, timeout_seconds: int = 60):
        self.timeout = timeout_seconds
        self.active_streams = {}
    
    async def create_stream(
        self,
        stream_id: str,
        messages: list
    ):
        """Create a managed stream"""
        
        async def generator():
            try:
                stream = client.chat.completions.create(
                    model="gpt-4o",
                    messages=messages,
                    stream=True
                )
                
                # Pull chunks from the synchronous stream in a worker thread so the
                # per-chunk timeout can fire without blocking the event loop.
                iterator = iter(stream)
                while True:
                    chunk = await asyncio.wait_for(
                        asyncio.to_thread(next, iterator, None),
                        timeout=self.timeout
                    )
                    if chunk is None:
                        break  # Stream exhausted
                    if stream_id not in self.active_streams:
                        break  # Cancelled
                    
                    content = chunk.choices[0].delta.content
                    if content:
                        yield content
                    
            except asyncio.TimeoutError:
                yield "[TIMEOUT]"
            finally:
                self.active_streams.pop(stream_id, None)
        
        self.active_streams[stream_id] = True
        return generator()
    
    def cancel_stream(self, stream_id: str):
        """Cancel an active stream"""
        self.active_streams.pop(stream_id, None)
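
Wiring this into FastAPI could look like the following sketch; the endpoint paths and the stream_id plumbing are illustrative assumptions rather than part of the class above:

import json
import uuid

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()
stream_manager = StreamManager(timeout_seconds=60)

@app.post("/chat/stream-managed")
async def managed_stream(request: dict):
    stream_id = str(uuid.uuid4())
    generator = await stream_manager.create_stream(stream_id, request["messages"])
    
    async def sse():
        # Relay managed chunks as SSE events, tagging them with the stream id
        async for content in generator:
            yield f"data: {json.dumps({'content': content, 'stream_id': stream_id})}\n\n"
        yield "data: [DONE]\n\n"
    
    return StreamingResponse(sse(), media_type="text/event-stream")

@app.post("/chat/cancel/{stream_id}")
async def cancel(stream_id: str):
    # Cancellation takes effect before the next chunk is yielded
    stream_manager.cancel_stream(stream_id)
    return {"cancelled": stream_id}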

Streaming with Rate Limiting

from collections import defaultdict
import time

class RateLimitedStreamer:
    """Rate-limit streaming per user"""
    
    def __init__(
        self,
        max_concurrent: int = 3,
        max_per_minute: int = 10
    ):
        self.max_concurrent = max_concurrent
        self.max_per_minute = max_per_minute
        self.active_streams = defaultdict(int)
        self.request_times = defaultdict(list)
    
    def can_stream(self, user_id: str) -> tuple[bool, str]:
        """Check if user can start a new stream"""
        # Check concurrent limit
        if self.active_streams[user_id] >= self.max_concurrent:
            return False, "Too many concurrent streams"
        
        # Check rate limit, pruning timestamps older than one minute
        now = time.time()
        minute_ago = now - 60
        recent = [t for t in self.request_times[user_id] if t > minute_ago]
        self.request_times[user_id] = recent
        
        if len(recent) >= self.max_per_minute:
            return False, "Rate limit exceeded"
        
        return True, ""
    
    def start_stream(self, user_id: str):
        """Mark stream as started"""
        self.active_streams[user_id] += 1
        self.request_times[user_id].append(time.time())
    
    def end_stream(self, user_id: str):
        """Mark stream as ended"""
        self.active_streams[user_id] = max(0, self.active_streams[user_id] - 1)
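
In an endpoint, the limiter check runs before the stream starts and the slot is released in a finally block. A sketch, where the user_id extraction and the endpoint path are assumptions (user_id would normally come from your auth layer):

from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from openai import OpenAI
import json

app = FastAPI()
client = OpenAI()
limiter = RateLimitedStreamer(max_concurrent=3, max_per_minute=10)

@app.post("/chat/stream-limited")
async def limited_stream(request: dict):
    user_id = request.get("user_id", "anonymous")  # assumption: replace with real auth
    
    allowed, reason = limiter.can_stream(user_id)
    if not allowed:
        raise HTTPException(status_code=429, detail=reason)
    
    limiter.start_stream(user_id)
    
    async def generate():
        try:
            stream = client.chat.completions.create(
                model="gpt-4o",
                messages=request["messages"],
                stream=True
            )
            for chunk in stream:
                content = chunk.choices[0].delta.content
                if content:
                    yield f"data: {json.dumps({'content': content})}\n\n"
            yield "data: [DONE]\n\n"
        finally:
            limiter.end_stream(user_id)
    
    return StreamingResponse(generate(), media_type="text/event-stream")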

Key Takeaways

Always Stream

Streaming dramatically improves perceived performance

Use SSE for Simplicity

SSE is simpler than WebSockets for one-way streaming

Handle Errors Gracefully

Errors should be streamed to client, not cause crashes

Manage Resources

Implement timeouts, cancellation, and rate limiting

What’s Next

Prompt Versioning & Management

Learn to version, test, and manage prompts in production