Realtime AI applications require minimal latency and instant feedback. The difference between 200ms and 2 seconds feels like the difference between talking to a person and talking to an answering machine. Users don’t just prefer fast responses — they fundamentally change how they interact when the system feels alive.
This chapter covers building responsive AI systems using streaming, WebSockets, and dedicated realtime APIs. The core mental model: HTTP is like sending letters back and forth, SSE is like a radio broadcast (one-way stream), and WebSockets are like a phone call (two-way, always on).
Transport Protocol Comparison
| Protocol | Direction | Connection | Proxy/CDN Support | Reconnection | Best For |
|---|---|---|---|---|---|
| HTTP/REST | Request-response | Short-lived | Excellent | N/A | Simple queries, non-streaming |
| SSE | Server-to-client only | Persistent (HTTP) | Good (standard HTTP) | Built-in auto-reconnect | AI chat streaming, notifications |
| WebSocket | Bidirectional | Persistent (upgrade) | Requires config | Manual implementation | Voice chat, collaborative editing |
| Realtime API | Bidirectional audio | Persistent (WebSocket) | Requires config | Manual implementation | Voice assistants, live audio |
Decision framework:
Default to SSE for AI chat applications — it covers 90% of use cases with the least infrastructure pain.
Use WebSocket only when the client needs to send frequent, low-latency messages (voice streaming, collaborative real-time editing, gaming).
Use the Realtime API when you need native audio-in/audio-out without separate STT/TTS round-trips.
Stick with HTTP for batch processing, non-interactive workloads, or environments where persistent connections are impractical (serverless functions with hard timeout limits).
WebSocket Chat
Basic WebSocket Server
```python
import json
from typing import List

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from openai import AsyncOpenAI

app = FastAPI()


class ConnectionManager:
    """Manage WebSocket connections.

    Why a manager? In production, you need to track who is connected,
    broadcast messages, and clean up dead connections. Without this,
    you end up with memory leaks and zombie connections.
    """

    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """Accept new connection."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """Remove connection (safe to call more than once)."""
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def send_message(self, message: str, websocket: WebSocket):
        """Send message to specific client."""
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        """Send message to all clients."""
        # Iterate over a copy: the list can change while we await each send
        for connection in list(self.active_connections):
            await connection.send_text(message)


manager = ConnectionManager()
# Async client: a sync client would block the event loop (and every other connection)
client = AsyncOpenAI()


@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
    """WebSocket endpoint for streaming chat."""
    await manager.connect(websocket)
    conversation = []
    try:
        while True:
            # Receive message
            data = await websocket.receive_text()
            message = json.loads(data)
            user_content = message.get("content", "")
            conversation.append({"role": "user", "content": user_content})

            # Send acknowledgment
            await manager.send_message(
                json.dumps({"type": "ack", "status": "processing"}), websocket
            )

            # Stream response
            stream = await client.chat.completions.create(
                model="gpt-4o-mini",
                messages=conversation,
                stream=True,
            )
            full_response = ""
            async for chunk in stream:
                # Some chunks (e.g. a final usage chunk) carry no choices
                if chunk.choices and chunk.choices[0].delta.content:
                    token = chunk.choices[0].delta.content
                    full_response += token
                    await manager.send_message(
                        json.dumps({"type": "token", "content": token}), websocket
                    )

            # Send completion
            conversation.append({"role": "assistant", "content": full_response})
            await manager.send_message(
                json.dumps({"type": "complete", "content": full_response}), websocket
            )
    except WebSocketDisconnect:
        pass
    finally:
        # Runs on disconnect and on unexpected errors, so no zombie entries remain
        manager.disconnect(websocket)

# Run with: uvicorn main:app --host 0.0.0.0 --port 8000
```
WebSocket Client
```python
import asyncio
import json

import websockets


async def chat_client(uri: str = "ws://localhost:8000/ws/chat"):
    """Interactive WebSocket chat client."""
    async with websockets.connect(uri) as websocket:
        print("Connected to chat server. Type 'quit' to exit.")
        while True:
            # input() blocks, so run it in a thread; otherwise the event loop
            # cannot answer keepalive pings while the user is typing
            user_input = await asyncio.to_thread(input, "\nYou: ")
            if user_input.lower() == "quit":
                break

            # Send message
            await websocket.send(json.dumps({"content": user_input}))

            # Receive streaming response
            print("\nAssistant: ", end="", flush=True)
            while True:
                data = json.loads(await websocket.recv())
                if data["type"] == "token":
                    print(data["content"], end="", flush=True)
                elif data["type"] == "complete":
                    print()  # Newline
                    break
                # "ack" and any other message types are ignored


# Run the client
if __name__ == "__main__":
    asyncio.run(chat_client())
```
Server-Sent Events (SSE)
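SSE is the simpler alternative to WebSockets when you only need server-to-client streaming, which covers most AI chat use cases. The client sends a regular HTTP request, and the server keeps the connection open to push data. There is no protocol upgrade, so SSE works through most proxies and load balancers, and the browser's built-in EventSource API reconnects automatically on failure. One caveat: EventSource can only issue GET requests, so a POST endpoint like the one below is consumed with fetch and a stream reader instead. Use SSE unless you specifically need bidirectional communication.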
```python
import json

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI

app = FastAPI()
client = AsyncOpenAI()


async def generate_sse_stream(messages: list):
    """Generate SSE stream for chat response."""
    stream = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages,
        stream=True,
    )
    try:
        # async for yields to the event loop between chunks, so other
        # requests keep being served while this one streams
        async for chunk in stream:
            if chunk.choices and chunk.choices[0].delta.content:
                data = json.dumps({"token": chunk.choices[0].delta.content})
                # Each SSE event is "data: <payload>" followed by a blank line
                yield f"data: {data}\n\n"
        yield f"data: {json.dumps({'done': True})}\n\n"
    finally:
        # Best-effort cleanup if the client disconnects mid-stream
        await stream.close()


@app.post("/api/chat/stream")
async def stream_chat(request: Request):
    """SSE endpoint for streaming chat responses."""
    body = await request.json()
    messages = body.get("messages", [])
    return StreamingResponse(
        generate_sse_stream(messages),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",  # Prevent caching partial responses
            "Connection": "keep-alive",  # Keep the TCP connection open
            "X-Accel-Buffering": "no",  # Tell Nginx not to buffer (critical!)
        },
    )

# Pitfall: If you're behind Nginx and tokens arrive in bursts instead of one-by-one,
# you're missing the X-Accel-Buffering header. Nginx buffers by default.
```
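Client-side JavaScript (EventSource cannot send a POST body, so the stream is read with fetch):

```javascript
async function streamChat(messages, onToken) {
  const response = await fetch('/api/chat/stream', {
    method: 'POST',
    headers: {'Content-Type': 'application/json'},
    body: JSON.stringify({messages}),
  });
  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';
  while (true) {
    const {value, done} = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, {stream: true});
    // SSE events are separated by a blank line
    const events = buffer.split('\n\n');
    buffer = events.pop(); // Keep any incomplete event for the next read
    for (const event of events) {
      if (!event.startsWith('data: ')) continue;
      const data = JSON.parse(event.slice(6));
      if (data.token) onToken(data.token);
      if (data.done) return;
    }
  }
}

streamChat([{role: 'user', content: 'Hello'}], (token) => console.log(token));
```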
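The data: lines yielded by generate_sse_stream are the SSE wire format: each event is one or more field lines (id:, event:, data:) terminated by a blank line, and a payload that contains newlines has to be split across several data: lines. The helper below is a minimal sketch of that framing; format_sse is a hypothetical name, not part of FastAPI or Starlette.

```python
from typing import Optional


def format_sse(data: str, event: Optional[str] = None, event_id: Optional[str] = None) -> str:
    """Frame one Server-Sent Event (hypothetical helper)."""
    lines = []
    if event_id is not None:
        # EventSource remembers this and sends it back as Last-Event-ID on reconnect
        lines.append(f"id: {event_id}")
    if event is not None:
        # Named events reach addEventListener(event, ...) instead of onmessage
        lines.append(f"event: {event}")
    # SSE treats CR, LF, and CRLF as line breaks; normalize, then give each line its own data: field
    normalized = data.replace("\r\n", "\n").replace("\r", "\n")
    for line in normalized.split("\n"):
        lines.append(f"data: {line}")
    return "\n".join(lines) + "\n\n"  # The blank line terminates the event
```

Inside a generator you would yield format_sse(json.dumps({"token": token}), event_id=str(seq)); the client rejoins multi-line data with "\n".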
OpenAI Realtime API
The Realtime API is fundamentally different from the standard chat API. Instead of request-response, it is a persistent WebSocket connection where audio flows in both directions simultaneously. The model listens while it speaks, detects when you start talking (voice activity detection), and responds with audio directly — no separate STT/TTS round-trips. This cuts latency from seconds to hundreds of milliseconds.
Audio Conversation
```python
import asyncio
import base64
import json
import os

import websockets


class RealtimeClient:
    """Client for the OpenAI Realtime API (beta interface)."""

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.url = "wss://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview-2024-10-01"
        self.websocket = None

    async def connect(self):
        """Establish WebSocket connection."""
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "OpenAI-Beta": "realtime=v1",
        }
        # websockets >= 14 calls this argument additional_headers;
        # older releases (legacy client) call it extra_headers
        self.websocket = await websockets.connect(self.url, additional_headers=headers)

        # Configure session -- this is sent once at connection start
        await self.send_event({
            "type": "session.update",
            "session": {
                "modalities": ["text", "audio"],
                "instructions": "You are a helpful assistant. Be concise.",
                "voice": "alloy",  # Voice persona for TTS output
                "input_audio_format": "pcm16",  # Raw 16-bit mono PCM at 24kHz
                "output_audio_format": "pcm16",
                "turn_detection": {
                    "type": "server_vad",  # Server-side voice activity detection
                    "threshold": 0.5,  # Sensitivity: lower = more sensitive
                    "prefix_padding_ms": 300,  # Keep 300ms of audio before speech starts
                    "silence_duration_ms": 500,  # 500ms silence = user stopped talking
                },
            },
        })
        # Tip: If users report the model cutting them off, increase silence_duration_ms.
        # If it feels sluggish, decrease it. There is no universal right answer.

    async def send_event(self, event: dict):
        """Send event to server."""
        await self.websocket.send(json.dumps(event))

    async def receive_events(self):
        """Yield server events until the connection closes."""
        async for message in self.websocket:
            yield json.loads(message)

    async def send_audio(self, audio_data: bytes):
        """Send audio data to server."""
        # Audio must be base64-encoded PCM16 (mono, 24kHz) for the pcm16 format
        encoded = base64.b64encode(audio_data).decode()
        await self.send_event({"type": "input_audio_buffer.append", "audio": encoded})

    async def send_text(self, text: str):
        """Send a text message and request a response."""
        await self.send_event({
            "type": "conversation.item.create",
            "item": {
                "type": "message",
                "role": "user",
                "content": [{"type": "input_text", "text": text}],
            },
        })
        # Request response
        await self.send_event({"type": "response.create"})

    async def close(self):
        """Close connection."""
        if self.websocket:
            await self.websocket.close()


async def realtime_conversation():
    """Run a realtime conversation."""
    client = RealtimeClient(os.environ["OPENAI_API_KEY"])
    await client.connect()
    print("Connected to Realtime API. Type messages or 'quit' to exit.")

    # Handle incoming events in background
    async def handle_events():
        async for event in client.receive_events():
            event_type = event.get("type", "")
            # With audio output enabled, the reply's text arrives as
            # response.audio_transcript.*; response.text.* is for text-only output
            if event_type in ("response.text.delta", "response.audio_transcript.delta"):
                print(event.get("delta", ""), end="", flush=True)
            elif event_type in ("response.text.done", "response.audio_transcript.done"):
                print()  # Newline
            elif event_type == "response.audio.delta":
                # PCM16 audio chunk: play it or append it to a file
                audio_data = base64.b64decode(event.get("delta", ""))
            elif event_type == "error":
                print(f"Error: {event.get('error', {}).get('message', 'Unknown')}")

    event_task = asyncio.create_task(handle_events())
    try:
        while True:
            # Read input in a thread so incoming events keep being processed
            user_input = await asyncio.to_thread(input, "You: ")
            if user_input.lower() == "quit":
                break
            await client.send_text(user_input)
    finally:
        event_task.cancel()
        await client.close()

# Run with: asyncio.run(realtime_conversation())
```
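send_audio above sends whatever bytes it is given as a single event. For longer recordings it is common to split the audio into fixed-duration frames first. A minimal sketch, assuming pcm16 mono at 24 kHz as configured in the session above (2 bytes per sample) and an arbitrary 100 ms frame size; audio_append_events is a hypothetical helper that only builds the event dicts, so each one can be passed to send_event.

```python
import base64
from typing import Iterator

SAMPLE_RATE = 24_000  # pcm16 input format from the session config
BYTES_PER_SAMPLE = 2  # 16-bit mono


def audio_append_events(pcm: bytes, frame_ms: int = 100) -> Iterator[dict]:
    """Split raw PCM16 audio into input_audio_buffer.append events (hypothetical helper)."""
    # 24000 samples/s * 2 bytes * 0.1 s = 4800 bytes per 100 ms frame
    frame_bytes = SAMPLE_RATE * BYTES_PER_SAMPLE * frame_ms // 1000
    for start in range(0, len(pcm), frame_bytes):
        frame = pcm[start:start + frame_bytes]  # The last frame may be shorter
        yield {
            "type": "input_audio_buffer.append",
            "audio": base64.b64encode(frame).decode("ascii"),
        }
```

With server_vad enabled the server decides when the user's turn ends; with turn detection disabled, you would follow the frames with an input_audio_buffer.commit event and then response.create.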
Voice Activity Detection
```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional

import numpy as np


@dataclass
class VADConfig:
    """Voice Activity Detection configuration.

    VAD answers one question: "Is someone speaking right now?"
    It works by measuring audio energy (RMS) and comparing to a threshold.
    The tricky part is handling transitions -- you don't want a brief pause
    between sentences to split one utterance into two, and you don't want
    a cough to trigger speech detection.
    """
    threshold: float = 0.02  # RMS energy threshold (tune per environment)
    min_speech_duration_ms: int = 200  # Ignore speech shorter than this (filters coughs)
    min_silence_duration_ms: int = 500  # Wait this long before declaring "speech ended"
    sample_rate: int = 16000  # 16kHz is standard for speech recognition


class VoiceActivityDetector:
    """Detect speech in audio stream."""

    def __init__(self, config: Optional[VADConfig] = None):
        self.config = config or VADConfig()
        self.is_speaking = False
        self.silence_start: Optional[float] = None
        self.speech_start: Optional[float] = None
        self.speech_buffer: list = []
        self.samples_seen = 0  # Audio clock, in samples

    def process_chunk(self, audio_chunk: bytes) -> Optional[dict]:
        """Process one PCM16 mono chunk and return a speech event, if any."""
        audio = np.frombuffer(audio_chunk, dtype=np.int16)
        if audio.size == 0:
            return None

        # The clock advances on every chunk, including silent ones;
        # otherwise silence never accumulates and speech never ends
        chunk_start_ms = self.samples_seen * 1000 / self.config.sample_rate
        self.samples_seen += audio.size
        now_ms = self.samples_seen * 1000 / self.config.sample_rate

        # RMS energy, normalized to the 0.0-1.0 range
        rms = float(np.sqrt(np.mean(audio.astype(np.float32) ** 2)) / 32768)

        if rms > self.config.threshold:
            # Speech detected
            if not self.is_speaking:
                self.speech_start = chunk_start_ms
                self.is_speaking = True
            self.silence_start = None  # A short pause inside an utterance is not the end
            self.speech_buffer.append(audio_chunk)
            return {"type": "speech_active", "rms": rms}

        if not self.is_speaking:
            return None  # Background silence

        # Silence during an utterance: keep the audio and time the pause
        self.speech_buffer.append(audio_chunk)
        if self.silence_start is None:
            self.silence_start = chunk_start_ms
        if now_ms - self.silence_start < self.config.min_silence_duration_ms:
            return None

        # End of speech. Measure speech length without the trailing silence,
        # otherwise min_speech_duration_ms could never reject a short cough
        speech_duration = self.silence_start - self.speech_start
        audio_data = b"".join(self.speech_buffer)
        self.speech_buffer = []
        self.is_speaking = False
        self.silence_start = None
        if speech_duration < self.config.min_speech_duration_ms:
            return None  # Too short: discard as noise
        return {"type": "speech_end", "audio": audio_data, "duration_ms": speech_duration}


class RealtimeAudioProcessor:
    """Process realtime audio with VAD."""

    def __init__(
        self,
        on_speech_segment: Callable[[bytes], Awaitable[None]],
        vad_config: Optional[VADConfig] = None,
    ):
        self.vad = VoiceActivityDetector(vad_config)
        self.on_speech_segment = on_speech_segment

    async def process_stream(self, audio_stream):
        """Process audio stream and emit speech segments."""
        async for chunk in audio_stream:
            result = self.vad.process_chunk(chunk)
            if result and result["type"] == "speech_end":
                await self.on_speech_segment(result["audio"])


# Usage with microphone input
async def process_microphone():
    """Process microphone input with VAD."""
    import pyaudio

    CHUNK_SIZE = 1600  # 100ms at 16kHz

    async def handle_speech(audio_data: bytes):
        print(f"Speech segment: {len(audio_data)} bytes")
        # Send to transcription or the Realtime API (its pcm16 format is 24kHz: resample first)

    processor = RealtimeAudioProcessor(
        on_speech_segment=handle_speech,
        vad_config=VADConfig(threshold=0.02),
    )

    # Create audio stream generator
    async def audio_generator():
        p = pyaudio.PyAudio()
        stream = p.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=16000,
            input=True,
            frames_per_buffer=CHUNK_SIZE,
        )
        try:
            while True:
                # stream.read blocks, so run it off the event loop
                yield await asyncio.to_thread(stream.read, CHUNK_SIZE)
        finally:
            stream.stop_stream()
            stream.close()
            p.terminate()

    await processor.process_stream(audio_generator())
```
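The subtle part of an energy-based VAD is the timing logic, not the RMS math. To see it in isolation, here is a minimal sketch that works on a precomputed list of per-frame RMS values instead of raw bytes; segment_speech and the 100 ms frame length are assumptions for illustration. The clock advances on every frame, silent or not, and trailing silence is excluded when checking the minimum speech length.

```python
from typing import List, Tuple


def segment_speech(
    frame_rms: List[float],
    frame_ms: int = 100,
    threshold: float = 0.02,
    min_speech_ms: int = 200,
    min_silence_ms: int = 500,
) -> List[Tuple[int, int]]:
    """Return (start_ms, end_ms) speech segments from per-frame RMS energy (hypothetical helper)."""
    segments = []
    speech_start = None  # ms where the current utterance began
    silence_start = None  # ms where the current pause began
    for i, rms in enumerate(frame_rms):
        t = i * frame_ms  # Clock advances on every frame, speech or silence
        if rms > threshold:
            if speech_start is None:
                speech_start = t
            silence_start = None  # Speech resumed: the pause was just a gap
        elif speech_start is not None:
            if silence_start is None:
                silence_start = t
            if t + frame_ms - silence_start >= min_silence_ms:
                # Pause is long enough: the utterance ended where the silence began
                if silence_start - speech_start >= min_speech_ms:
                    segments.append((speech_start, silence_start))
                # Segments shorter than min_speech_ms (coughs, clicks) are dropped
                speech_start = silence_start = None
    return segments
```

With 100 ms frames, a 200 ms pause inside an utterance keeps it whole, while a lone 100 ms spike followed by silence is discarded as noise.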
Latency Optimization
```python
import asyncio
import time
from dataclasses import dataclass
from typing import Optional

from openai import OpenAI


@dataclass
class LatencyMetrics:
    """Track latency metrics."""
    time_to_first_token_ms: float
    total_time_ms: float
    tokens_generated: int
    tokens_per_second: float


class LowLatencyClient:
    """Optimized client for low-latency inference."""

    def __init__(self, model: str = "gpt-4o-mini", timeout: float = 10.0):
        self.client = OpenAI(timeout=timeout)
        self.model = model

    def stream_with_metrics(
        self, messages: list, max_tokens: int = 256
    ) -> tuple[str, LatencyMetrics]:
        """Stream response and collect latency metrics."""
        start_time = time.perf_counter()  # Monotonic clock, meant for measuring intervals
        first_token_time: Optional[float] = None
        pieces = []
        completion_tokens: Optional[int] = None

        stream = self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            max_tokens=max_tokens,
            stream=True,
            stream_options={"include_usage": True},  # Final chunk carries token usage
        )
        for chunk in stream:
            if chunk.choices and chunk.choices[0].delta.content:
                if first_token_time is None:
                    first_token_time = time.perf_counter()
                pieces.append(chunk.choices[0].delta.content)
            if chunk.usage:  # Only set on the last chunk, which has no choices
                completion_tokens = chunk.usage.completion_tokens
        end_time = time.perf_counter()

        total_time_ms = (end_time - start_time) * 1000
        ttft_ms = (first_token_time - start_time) * 1000 if first_token_time else 0.0
        # Prefer the API's token count; fall back to counting content chunks
        tokens = completion_tokens if completion_tokens is not None else len(pieces)
        # Output speed over the generation window only (excludes the TTFT wait)
        gen_seconds = (end_time - first_token_time) if first_token_time else 0.0
        metrics = LatencyMetrics(
            time_to_first_token_ms=ttft_ms,
            total_time_ms=total_time_ms,
            tokens_generated=tokens,
            tokens_per_second=tokens / gen_seconds if gen_seconds > 0 else 0.0,
        )
        return "".join(pieces), metrics

    async def parallel_completions(
        self, prompts: list[str], max_concurrent: int = 5
    ) -> list[tuple[str, LatencyMetrics]]:
        """Run multiple completions in parallel."""
        semaphore = asyncio.Semaphore(max_concurrent)

        async def run_one(prompt: str):
            async with semaphore:
                # The sync client blocks, so run each call in a worker thread
                return await asyncio.to_thread(
                    self.stream_with_metrics, [{"role": "user", "content": prompt}]
                )

        return await asyncio.gather(*(run_one(p) for p in prompts))


class LatencyOptimizer:
    """Strategies for optimizing latency."""

    @staticmethod
    def optimize_prompt(prompt: str, max_chars: int = 4000) -> str:
        """Optimize prompt for lower latency.

        The model must process (prefill) every prompt token before it can emit
        the first output token, so TTFT grows with prompt length. Fixed costs
        (network, queueing) mean doubling the prompt rarely doubles TTFT, but
        trimming bloated prompts is still one of the easiest latency wins.
        """
        # Collapse whitespace: saves tokens, but also flattens newlines,
        # so skip this for code or other layout-sensitive content
        prompt = " ".join(prompt.split())
        # Hard truncation is a blunt instrument: it may cut off the actual question
        if len(prompt) > max_chars:
            prompt = prompt[:max_chars] + "..."
        return prompt

    @staticmethod
    def select_model_for_latency(complexity: str, max_latency_ms: float) -> str:
        """Pick the most capable model whose expected latency fits the budget."""
        # Illustrative estimates only: replace them with your own measurements.
        # Ordered from most capable to fastest, so the first fit is the best fit.
        model_latencies = {
            "gpt-4o": {"simple": 300, "moderate": 600, "complex": 1000},
            "gpt-4o-mini": {"simple": 100, "moderate": 200, "complex": 400},
        }
        for model, latencies in model_latencies.items():
            if latencies.get(complexity, 1000) <= max_latency_ms:
                return model
        return "gpt-4o-mini"  # Nothing fits: fall back to the fastest

    @staticmethod
    def chunk_for_streaming(text: str, chunk_size: int = 50) -> list[str]:
        """Split text into chunks of chunk_size words for streaming display."""
        words = text.split()
        return [" ".join(words[i:i + chunk_size]) for i in range(0, len(words), chunk_size)]


if __name__ == "__main__":
    client = LowLatencyClient(model="gpt-4o-mini")

    # Single completion with metrics
    response, metrics = client.stream_with_metrics([{"role": "user", "content": "Hello!"}])
    print(f"Response: {response}")
    print(f"TTFT: {metrics.time_to_first_token_ms:.0f}ms")
    print(f"Total: {metrics.total_time_ms:.0f}ms")
    print(f"Speed: {metrics.tokens_per_second:.1f} tok/s")

    # Parallel completions
    prompts = ["What is AI?", "Explain Python", "Define cloud computing"]
    results = asyncio.run(client.parallel_completions(prompts))
    for (response, metrics), prompt in zip(results, prompts):
        print(f"{prompt[:20]}: {metrics.time_to_first_token_ms:.0f}ms TTFT")
```
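TTFT and output speed are plain arithmetic on arrival timestamps, which makes them easy to unit-test without calling the API. A minimal sketch, assuming you record the request start, each content chunk's arrival time, and the end of the stream (all from time.perf_counter(), in seconds); summarize_latency is a hypothetical helper, and it counts each chunk as one token unless a token count is passed in.

```python
from typing import List, Optional


def summarize_latency(
    start: float,
    chunk_times: List[float],
    end: float,
    completion_tokens: Optional[int] = None,
) -> dict:
    """Compute TTFT, total time, and output speed from arrival timestamps (seconds)."""
    total_ms = (end - start) * 1000
    if not chunk_times:
        return {"ttft_ms": None, "total_ms": total_ms, "tokens_per_second": 0.0}
    tokens = completion_tokens if completion_tokens is not None else len(chunk_times)
    generation_s = end - chunk_times[0]  # Window after the first token arrives
    return {
        "ttft_ms": (chunk_times[0] - start) * 1000,
        "total_ms": total_ms,
        # Excluding the prefill wait keeps speed comparable across prompt sizes
        "tokens_per_second": tokens / generation_s if generation_s > 0 else 0.0,
    }
```

Keeping the two numbers separate matters because they respond to different levers: prompt length mainly affects TTFT, while the number of output tokens mainly drives total time.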
Typing Indicators and Progress
```python
import asyncio
import json
import time
from typing import Optional

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from openai import AsyncOpenAI

app = FastAPI()
client = AsyncOpenAI()


class ProgressiveResponse:
    """Handle progressive response display."""

    def __init__(self, websocket: WebSocket):
        self.websocket = websocket
        self.typing_task: Optional[asyncio.Task] = None

    async def start_typing(self):
        """Start typing indicator, re-sent every 2s until stopped."""
        async def send_typing():
            while True:
                await self.websocket.send_json({"type": "typing", "status": True})
                await asyncio.sleep(2)

        self.typing_task = asyncio.create_task(send_typing())

    async def stop_typing(self, notify: bool = True):
        """Stop typing indicator; optionally tell the client to hide it."""
        if self.typing_task is None:
            return  # Already stopped
        task, self.typing_task = self.typing_task, None
        task.cancel()
        try:
            await task
        except (asyncio.CancelledError, Exception):
            pass  # Cancelled as intended, or the heartbeat already failed on a closed socket
        if notify:
            await self.websocket.send_json({"type": "typing", "status": False})

    async def send_token(self, token: str, index: int):
        """Send token with metadata."""
        await self.websocket.send_json(
            {"type": "token", "content": token, "index": index, "timestamp": time.time()}
        )

    async def send_progress(self, stage: str, progress: float, message: str = ""):
        """Send progress update."""
        await self.websocket.send_json(
            {"type": "progress", "stage": stage, "progress": progress, "message": message}
        )


@app.websocket("/ws/chat/progressive")
async def progressive_chat(websocket: WebSocket):
    """WebSocket with progressive response features."""
    await websocket.accept()
    responder = ProgressiveResponse(websocket)
    try:
        while True:
            message = json.loads(await websocket.receive_text())

            # Show typing indicator until the first token arrives
            await responder.start_typing()
            # Illustrative stages: a plain chat completion reports no real progress
            await responder.send_progress("thinking", 0.2, "Understanding your question...")
            stream = await client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[{"role": "user", "content": message["content"]}],
                stream=True,
            )
            await responder.send_progress("generating", 0.4, "Formulating response...")

            token_index = 0  # Counts content chunks (roughly one token each)
            async for chunk in stream:
                if chunk.choices and chunk.choices[0].delta.content:
                    if token_index == 0:
                        await responder.stop_typing()  # First token: swap indicator for text
                    await responder.send_token(chunk.choices[0].delta.content, token_index)
                    token_index += 1
            await responder.stop_typing()  # No-op unless the reply was empty

            await responder.send_progress("complete", 1.0, "Done!")
            await websocket.send_json({"type": "complete", "total_tokens": token_index})
    except WebSocketDisconnect:
        await responder.stop_typing(notify=False)  # Socket is gone: just clean up
    except Exception as e:
        await responder.stop_typing()
        await websocket.send_json({"type": "error", "message": str(e)})
```
Client-side handling (showTypingIndicator, updateProgressBar, appendToken, hideProgress, and showError stand for your own UI functions):

```javascript
const ws = new WebSocket('ws://localhost:8000/ws/chat/progressive');
ws.onmessage = (event) => {
  const data = JSON.parse(event.data);
  switch (data.type) {
    case 'typing':
      showTypingIndicator(data.status);
      break;
    case 'progress':
      updateProgressBar(data.stage, data.progress, data.message);
      break;
    case 'token':
      appendToken(data.content);
      break;
    case 'complete':
      hideProgress();
      console.log(`Completed with ${data.total_tokens} tokens`);
      break;
    case 'error':
      showError(data.message);
      break;
  }
};
```
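A typing indicator is a small lifecycle problem: start a heartbeat, then stop it exactly once, either when the first token arrives or on error. A minimal, framework-free sketch of that pattern, assuming an async send callable in place of websocket.send_json; TypingIndicator is a hypothetical name, and the list-backed sender in demo exists only to make it runnable.

```python
import asyncio
from typing import Awaitable, Callable, Optional

Send = Callable[[dict], Awaitable[None]]


class TypingIndicator:
    """Re-send a typing event every `interval` seconds until stopped (hypothetical helper)."""

    def __init__(self, send: Send, interval: float = 2.0):
        self.send = send
        self.interval = interval
        self._task: Optional[asyncio.Task] = None

    async def __aenter__(self) -> "TypingIndicator":
        async def heartbeat():
            while True:
                await self.send({"type": "typing", "status": True})
                await asyncio.sleep(self.interval)

        self._task = asyncio.create_task(heartbeat())
        return self

    async def stop(self) -> None:
        """Stop the heartbeat and hide the indicator; safe to call more than once."""
        if self._task is None:
            return
        task, self._task = self._task, None
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
        await self.send({"type": "typing", "status": False})

    async def __aexit__(self, *exc) -> None:
        await self.stop()  # Clears the indicator even if the body raised


async def demo() -> list:
    sent = []

    async def send(msg: dict) -> None:
        sent.append(msg)

    async with TypingIndicator(send, interval=0.01) as typing:
        await asyncio.sleep(0.035)  # Simulated wait for the first token
        await typing.stop()  # First token arrived
        sent.append({"type": "token", "content": "Hi"})
    return sent
```

Using an async context manager means the "hide" message is sent on every exit path, which is exactly the bookkeeping that is easy to miss in an except branch.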
Realtime Collaboration
```python
import json
from dataclasses import dataclass, field
from typing import Dict, Optional, Set

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from openai import AsyncOpenAI


@dataclass
class Room:
    """A collaborative chat room."""
    id: str
    participants: Set[WebSocket] = field(default_factory=set)
    conversation: list = field(default_factory=list)
    ai_responding: bool = False


class CollaborativeChat:
    """Multi-user realtime AI chat."""

    def __init__(self):
        self.rooms: Dict[str, Room] = {}
        self.client = AsyncOpenAI()

    def get_or_create_room(self, room_id: str) -> Room:
        """Get or create a chat room."""
        if room_id not in self.rooms:
            self.rooms[room_id] = Room(id=room_id)
        return self.rooms[room_id]

    async def join_room(self, room_id: str, websocket: WebSocket):
        """Add participant to room."""
        room = self.get_or_create_room(room_id)
        room.participants.add(websocket)
        # Send room state to new participant
        await websocket.send_json({
            "type": "room_state",
            "conversation": room.conversation,
            "participants": len(room.participants),
        })
        # Notify others
        await self.broadcast(
            room, {"type": "participant_joined", "count": len(room.participants)}, exclude=websocket
        )

    async def leave_room(self, room_id: str, websocket: WebSocket):
        """Remove participant from room; delete the room once it is empty."""
        room = self.rooms.get(room_id)
        if room is None:
            return
        room.participants.discard(websocket)
        if not room.participants:
            del self.rooms[room_id]  # Don't leak empty rooms
            return
        await self.broadcast(room, {"type": "participant_left", "count": len(room.participants)})

    async def broadcast(self, room: Room, message: dict, exclude: Optional[WebSocket] = None):
        """Broadcast message to all room participants."""
        # Snapshot the set: joins and leaves can modify it while we await each send
        for ws in list(room.participants):
            if ws is exclude:
                continue
            try:
                await ws.send_json(message)
            except Exception:
                room.participants.discard(ws)  # Drop dead connections

    async def handle_message(self, room_id: str, user_id: str, content: str):
        """Handle incoming user message."""
        room = self.get_or_create_room(room_id)

        # Add and broadcast the user message
        room.conversation.append({"role": "user", "user_id": user_id, "content": content})
        await self.broadcast(room, {"type": "user_message", "user_id": user_id, "content": content})

        # One AI reply per room at a time; messages sent meanwhile are still
        # recorded and become part of the next reply's context
        if room.ai_responding:
            return
        room.ai_responding = True
        try:
            await self.broadcast(room, {"type": "ai_typing", "status": True})
            # Prefix user turns with user_id so the model can tell speakers apart
            messages = [
                {
                    "role": m["role"],
                    "content": f"{m['user_id']}: {m['content']}" if m["role"] == "user" else m["content"],
                }
                for m in room.conversation
            ]
            stream = await self.client.chat.completions.create(
                model="gpt-4o-mini", messages=messages, stream=True
            )
            ai_response = ""
            async for chunk in stream:
                if chunk.choices and chunk.choices[0].delta.content:
                    token = chunk.choices[0].delta.content
                    ai_response += token
                    await self.broadcast(room, {"type": "ai_token", "content": token})

            # Add AI response to conversation
            room.conversation.append({"role": "assistant", "content": ai_response})
            await self.broadcast(room, {"type": "ai_complete", "content": ai_response})
        finally:
            room.ai_responding = False
            await self.broadcast(room, {"type": "ai_typing", "status": False})


app = FastAPI()
collab = CollaborativeChat()


@app.websocket("/ws/room/{room_id}")
async def room_websocket(websocket: WebSocket, room_id: str):
    """WebSocket for collaborative room."""
    await websocket.accept()
    await collab.join_room(room_id, websocket)
    try:
        while True:
            message = json.loads(await websocket.receive_text())
            await collab.handle_message(
                room_id,
                message.get("user_id", "anonymous"),
                message.get("content", ""),
            )
    except WebSocketDisconnect:
        pass
    finally:
        await collab.leave_room(room_id, websocket)
```
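A sequential broadcast sends to participants one at a time, so a single slow client delays everyone else in the room. One alternative, sketched here under the assumption that each participant exposes an async send_json method (as Starlette's WebSocket does), is to send concurrently with asyncio.gather(..., return_exceptions=True) and prune the connections that failed. broadcast_concurrently is a hypothetical helper, and FakeSocket exists only to make the sketch runnable.

```python
import asyncio
from typing import Set


async def broadcast_concurrently(participants: Set, message: dict, exclude=None) -> int:
    """Send to all participants at once, drop failed ones, return the failure count."""
    targets = [ws for ws in list(participants) if ws is not exclude]  # Snapshot the set
    results = await asyncio.gather(
        *(ws.send_json(message) for ws in targets), return_exceptions=True
    )
    failed = [ws for ws, result in zip(targets, results) if isinstance(result, Exception)]
    for ws in failed:
        participants.discard(ws)
    return len(failed)


class FakeSocket:
    """Stand-in for a WebSocket: records messages, or fails if marked dead."""

    def __init__(self, dead: bool = False):
        self.dead = dead
        self.received = []

    async def send_json(self, message: dict) -> None:
        if self.dead:
            raise ConnectionError("client went away")
        self.received.append(message)
```

Sends to different clients now overlap, but each client still receives broadcasts in order as long as each broadcast call is awaited before the next one starts.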
Realtime Best Practices
Use WebSockets for bidirectional communication, SSE for server-to-client only (SSE is simpler and more proxy-friendly)
Implement typing indicators — without them, users have no idea if the system is working or frozen
Stream tokens as they are generated — users can start reading while the model is still generating
Handle disconnections gracefully — mobile users lose connection constantly; reconnect and resume without losing context
Monitor Time-to-First-Token (TTFT) as your primary latency metric — total time matters less than perceived responsiveness
Watch out for Nginx/Cloudflare buffering: it can silently destroy your streaming experience
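For the point about handling disconnections gracefully, the usual client-side building block is reconnecting with exponential backoff and jitter, so that many clients dropped at the same moment don't all reconnect at the same moment. A minimal sketch; connect stands for whatever coroutine opens your WebSocket or SSE stream, and the base, cap, and retry_on defaults are arbitrary assumptions (a rejected WebSocket handshake, for example, may raise a library-specific exception you need to add).

```python
import asyncio
import random
from typing import Awaitable, Callable, Optional, Tuple, Type, TypeVar

T = TypeVar("T")


def backoff_delay(attempt: int, base: float = 0.5, cap: float = 30.0,
                  rng: Optional[random.Random] = None) -> float:
    """'Full jitter' backoff: a random delay in [0, min(cap, base * 2**attempt)]."""
    rng = rng or random.Random()
    return rng.uniform(0, min(cap, base * 2 ** attempt))


async def connect_with_retry(
    connect: Callable[[], Awaitable[T]],
    max_attempts: int = 6,
    base: float = 0.5,
    cap: float = 30.0,
    retry_on: Tuple[Type[BaseException], ...] = (OSError, asyncio.TimeoutError),
) -> T:
    """Call connect() until it succeeds, sleeping with backoff between failures."""
    for attempt in range(max_attempts):
        try:
            return await connect()
        except retry_on:
            if attempt == max_attempts - 1:
                raise  # Out of attempts: surface the last error
            await asyncio.sleep(backoff_delay(attempt, base, cap))
    raise ValueError("max_attempts must be at least 1")
```

After a successful reconnect, the client should resume from the last message it received rather than restarting the response.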
Edge Cases and Production Gotchas
Client disconnects mid-stream
Problem: The user closes the tab or loses cellular signal while the LLM is still generating. Your server keeps generating tokens, paying for them, but nobody is listening.
Fix: Check the connection state before sending each chunk. In FastAPI WebSocket, wrap send_text in a try/except for WebSocketDisconnect. For SSE, the StreamingResponse generator should catch asyncio.CancelledError (which FastAPI raises when the client disconnects). Always clean up resources (cancel the OpenAI stream, decrement active connection counters) in a finally block.
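The cleanup advice comes down to putting resource release in a finally block of the streaming generator, because that block runs whether the stream finishes, the server closes or cancels the generator after a disconnect, or an error occurs. A minimal, self-contained sketch with a fake token source standing in for the OpenAI stream; active_streams is a hypothetical example of per-connection state that must be cleaned up.

```python
import asyncio
from typing import AsyncIterator

active_streams = 0  # Hypothetical metric: number of in-flight streams


async def fake_llm_tokens() -> AsyncIterator[str]:
    """Stand-in for the model stream: emits tokens slowly."""
    for token in ["Hello", " there", ",", " friend"]:
        await asyncio.sleep(0.01)
        yield token


async def sse_tokens() -> AsyncIterator[str]:
    global active_streams
    active_streams += 1
    source = fake_llm_tokens()
    try:
        async for token in source:
            yield f"data: {token}\n\n"
    finally:
        # Runs on normal completion, on aclose() after a disconnect, or on cancellation
        await source.aclose()  # With the OpenAI SDK, close the response stream here instead
        active_streams -= 1


async def client_that_leaves_early() -> list:
    received = []
    stream = sse_tokens()
    async for event in stream:
        received.append(event)
        if len(received) == 2:
            break  # Simulate the client disconnecting mid-stream
    await stream.aclose()  # What the server does when it stops consuming the generator
    return received
```

Breaking out of an async for does not close the generator by itself; the explicit aclose() (or the server's cancellation) is what triggers the finally block.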
Nginx/CloudFlare buffering destroys streaming
Problem: Tokens arrive in large bursts instead of one-by-one because the reverse proxy buffers the response before forwarding it.
Fix: For Nginx, add proxy_buffering off; and the X-Accel-Buffering: no response header. For Cloudflare, disable "Rocket Loader" and "Auto Minify." For AWS ALB, set the idle timeout higher than the longest gap between chunks (usually the wait before the first token), since the ALB closes connections that carry no data for that long. This is the most common "streaming isn't working" issue in production, and the cause usually lives in the proxy layer rather than in your application code.
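Idle timeouts count time with no bytes on the wire, and for LLM streams the longest silent stretch is usually the wait before the first token. SSE has a standard way to keep such a connection busy: comment lines, which start with a colon and are ignored by EventSource. A minimal sketch of a wrapper that injects a comment whenever the underlying stream has been quiet for interval seconds; with_heartbeat is a hypothetical helper, and the 15-second default is an arbitrary assumption (keep it below your proxy's idle timeout).

```python
import asyncio
from typing import AsyncIterator

_DONE = object()  # Sentinel marking the end of the source stream


async def _next_or_done(it: AsyncIterator[str]):
    try:
        return await it.__anext__()
    except StopAsyncIteration:
        return _DONE


async def with_heartbeat(source: AsyncIterator[str], interval: float = 15.0) -> AsyncIterator[str]:
    """Yield SSE events from source, plus ': keep-alive' comments during quiet periods."""
    it = source.__aiter__()
    pending = asyncio.create_task(_next_or_done(it))
    try:
        while True:
            # Wait for the next event, but no longer than the heartbeat interval
            done, _ = await asyncio.wait({pending}, timeout=interval)
            if not done:
                yield ": keep-alive\n\n"  # SSE comment: bytes on the wire, ignored by clients
                continue
            item = pending.result()  # Re-raises if the source failed
            if item is _DONE:
                return
            yield item
            pending = asyncio.create_task(_next_or_done(it))
    finally:
        pending.cancel()  # Stop waiting on the source if the consumer goes away
```

In the FastAPI example, this would wrap the existing generator: StreamingResponse(with_heartbeat(generate_sse_stream(messages)), ...).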
Mobile networks with high latency and packet loss
Problem: WebSocket connections drop frequently on 3G/4G. Users see partial responses and frozen UIs.
Fix: Implement message sequence numbers so the client can detect gaps. Add a reconnection protocol that resumes from the last received sequence number. SSE has this built in: if the server tags each event with an id: field, the browser's EventSource sends the last one back in a Last-Event-ID header when it reconnects, so the server only has to replay events after that point. That is another reason to prefer SSE over WebSocket for mobile clients.
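Resuming by sequence number needs a small amount of server-side state: remember recent events per response, and replay everything after the client's last received number when it reconnects. A minimal in-memory sketch; ResumableStream and the buffer size are assumptions (a multi-server deployment would keep this in shared storage), and the same numbers can double as SSE id: values so that Last-Event-ID carries them back.

```python
from collections import deque
from typing import Deque, List, Tuple


class ResumableStream:
    """Numbered event log for one response, with replay after reconnect (hypothetical)."""

    def __init__(self, max_events: int = 1000):
        self.next_seq = 1
        # Bounded: the oldest events fall off if a client stays away too long
        self.events: Deque[Tuple[int, str]] = deque(maxlen=max_events)

    def append(self, payload: str) -> int:
        """Record an event and return its sequence number."""
        seq = self.next_seq
        self.events.append((seq, payload))
        self.next_seq += 1
        return seq

    def replay_after(self, last_seq: int) -> List[Tuple[int, str]]:
        """Events the client has not seen, given the last seq it received."""
        if self.events and last_seq < self.events[0][0] - 1:
            # Some missed events were already evicted: resuming would silently lose data
            raise LookupError("events evicted; client must restart the response")
        return [(seq, payload) for seq, payload in self.events if seq > last_seq]
```

When replay is impossible, failing loudly lets the client fall back to restarting the response instead of showing text with a hole in it.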
Multiple concurrent streams from one user
Problem: A user opens three tabs, each initiating a stream. Your server is now running three concurrent LLM calls for one person, tripling cost and potentially hitting rate limits.
Fix: Track active streams per user, for example in a dictionary keyed by user ID. Either reject new streams while one is active, or cancel the previous stream when a new one starts. Include the stream ID in the WebSocket session so the client can resume rather than restart.
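The "cancel the previous stream" policy maps naturally onto asyncio tasks: keep one task per user and cancel the old one before starting the new one. A minimal sketch, assuming each stream runs as its own task; UserStreamRegistry is a hypothetical name, and the reject-instead policy would simply raise when a live entry already exists.

```python
import asyncio
from typing import Any, Coroutine, Dict


class UserStreamRegistry:
    """At most one active stream task per user; a new stream cancels the old one."""

    def __init__(self):
        self._tasks: Dict[str, asyncio.Task] = {}

    async def start(self, user_id: str, stream: Coroutine[Any, Any, None]) -> asyncio.Task:
        old = self._tasks.get(user_id)
        if old is not None and not old.done():
            old.cancel()  # Stop paying for the stream nobody is reading
            try:
                await old
            except asyncio.CancelledError:
                pass
        task = asyncio.create_task(stream)
        self._tasks[user_id] = task

        def forget(t: asyncio.Task) -> None:
            # Remove the entry when the task ends, unless a newer task replaced it
            if self._tasks.get(user_id) is t:
                del self._tasks[user_id]

        task.add_done_callback(forget)
        return task

    def active_count(self) -> int:
        return sum(1 for t in self._tasks.values() if not t.done())
```

Cancelling is friendlier than rejecting when the usual cause is a user refreshing the page, since the newest tab is the one they are looking at.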
Practice Exercise
Build a realtime AI application that:
Uses WebSockets for bidirectional communication
Implements voice activity detection
Streams responses with latency metrics
Shows typing indicators and progress
Supports multiple concurrent users
Focus on:
Minimizing time to first token
Smooth streaming experience
Graceful error handling
Scalable connection management