Realtime AI applications require minimal latency and instant feedback. The difference between 200ms and 2 seconds feels like the difference between talking to a person and talking to an answering machine. Users don’t just prefer fast responses — they fundamentally change how they interact when the system feels alive. This chapter covers building responsive AI systems using streaming, WebSockets, and dedicated realtime APIs. The core mental model: HTTP is like sending letters back and forth, SSE is like a radio broadcast (one-way stream), and WebSockets are like a phone call (two-way, always on).

Transport Protocol Comparison

| Protocol | Direction | Connection | Proxy/CDN Support | Reconnection | Best For |
| --- | --- | --- | --- | --- | --- |
| HTTP/REST | Request-response | Short-lived | Excellent | N/A | Simple queries, non-streaming |
| SSE | Server-to-client only | Persistent (HTTP) | Good (standard HTTP) | Built-in auto-reconnect | AI chat streaming, notifications |
| WebSocket | Bidirectional | Persistent (upgrade) | Requires config | Manual implementation | Voice chat, collaborative editing |
| Realtime API | Bidirectional audio | Persistent (WebSocket) | Requires config | Manual implementation | Voice assistants, live audio |

Decision framework:
  • Default to SSE for AI chat applications — it covers 90% of use cases with the least infrastructure pain.
  • Use WebSocket only when the client needs to send frequent, low-latency messages (voice streaming, collaborative real-time editing, gaming).
  • Use the Realtime API when you need native audio-in/audio-out without separate STT/TTS round-trips.
  • Stick with HTTP for batch processing, non-interactive workloads, or environments where persistent connections are impractical (serverless functions with hard timeout limits).
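
The decision framework above can be written down as a small function. This is only a sketch: the `Requirements` fields and the `choose_transport` name are hypothetical, introduced here for illustration, and the ordering of the checks is one reasonable reading of the list rather than a fixed rule.

```python
from dataclasses import dataclass


@dataclass
class Requirements:
    """Hypothetical summary of what an application needs from its transport."""
    interactive: bool = True                # False for batch or offline workloads
    client_streams_data: bool = False       # frequent, low-latency client-to-server messages
    native_audio: bool = False              # audio in and out without separate STT/TTS
    persistent_connections_ok: bool = True  # False on short-timeout serverless platforms


def choose_transport(req: Requirements) -> str:
    """Return the transport suggested by the decision framework."""
    if not req.interactive or not req.persistent_connections_ok:
        return "HTTP"
    if req.native_audio:
        return "Realtime API"
    if req.client_streams_data:
        return "WebSocket"
    return "SSE"  # the default for AI chat


print(choose_transport(Requirements()))                          # SSE
print(choose_transport(Requirements(client_streams_data=True)))  # WebSocket
```

Checking the HTTP case first reflects the last bullet: if persistent connections are impractical, none of the streaming options apply regardless of the other requirements.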

WebSocket Chat

Basic WebSocket Server

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from openai import AsyncOpenAI
import json
from typing import List
import asyncio


app = FastAPI()


class ConnectionManager:
    """Manage WebSocket connections.
    
    Why a manager? In production, you need to track who is connected,
    broadcast messages, and clean up dead connections. Without this,
    you end up with memory leaks and zombie connections.
    """
    
    def __init__(self):
        self.active_connections: List[WebSocket] = []
    
    async def connect(self, websocket: WebSocket):
        """Accept new connection."""
        await websocket.accept()
        self.active_connections.append(websocket)
    
    def disconnect(self, websocket: WebSocket):
        """Remove connection (safe to call more than once)."""
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)
    
    async def send_message(self, message: str, websocket: WebSocket):
        """Send message to specific client."""
        await websocket.send_text(message)
    
    async def broadcast(self, message: str):
        """Send message to all clients."""
        # Iterate over a copy: the list can change while a send is awaited
        for connection in list(self.active_connections):
            await connection.send_text(message)


manager = ConnectionManager()
# The async client keeps the event loop free while waiting on the model.
# The synchronous client would stall every other connection during a stream.
client = AsyncOpenAI()


@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
    """WebSocket endpoint for streaming chat."""
    await manager.connect(websocket)
    
    conversation = []
    
    try:
        while True:
            # Receive message
            data = await websocket.receive_text()
            message = json.loads(data)
            
            user_content = message.get("content", "")
            conversation.append({"role": "user", "content": user_content})
            
            # Send acknowledgment
            await manager.send_message(
                json.dumps({"type": "ack", "status": "processing"}),
                websocket
            )
            
            # Stream response
            stream = await client.chat.completions.create(
                model="gpt-4o-mini",
                messages=conversation,
                stream=True
            )
            
            full_response = ""
            
            async for chunk in stream:
                # Some chunks carry no choices, so guard before indexing
                if chunk.choices and chunk.choices[0].delta.content:
                    token = chunk.choices[0].delta.content
                    full_response += token
                    
                    await manager.send_message(
                        json.dumps({
                            "type": "token",
                            "content": token
                        }),
                        websocket
                    )
            
            # Send completion
            conversation.append({"role": "assistant", "content": full_response})
            
            await manager.send_message(
                json.dumps({
                    "type": "complete",
                    "content": full_response
                }),
                websocket
            )
            
    except WebSocketDisconnect:
        manager.disconnect(websocket)


# Run with: uvicorn main:app --host 0.0.0.0 --port 8000

WebSocket Client

import asyncio
import websockets
import json


async def chat_client(uri: str = "ws://localhost:8000/ws/chat"):
    """Interactive WebSocket chat client."""
    async with websockets.connect(uri) as websocket:
        print("Connected to chat server. Type 'quit' to exit.")
        
        while True:
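            # Get user input in a worker thread: a bare input() call would block
            # the event loop and stall the connection's keepalive handling
            user_input = await asyncio.to_thread(input, "\nYou: ")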
            
            if user_input.lower() == "quit":
                break
            
            # Send message
            await websocket.send(json.dumps({"content": user_input}))
            
            # Receive streaming response
            print("\nAssistant: ", end="", flush=True)
            
            while True:
                response = await websocket.recv()
                data = json.loads(response)
                
                if data["type"] == "token":
                    print(data["content"], end="", flush=True)
                elif data["type"] == "complete":
                    print()  # Newline
                    break
                elif data["type"] == "ack":
                    continue


# Run the client
if __name__ == "__main__":
    asyncio.run(chat_client())
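
The comparison table lists reconnection for WebSockets as a manual implementation. One common way to pace reconnection attempts is exponential backoff with optional jitter. The sketch below only computes the wait times; `backoff_delays` and its parameters are hypothetical names chosen for illustration, and the defaults are assumptions rather than recommended values.

```python
import random


def backoff_delays(
    max_attempts: int = 5,
    base: float = 0.5,
    cap: float = 30.0,
    jitter: float = 0.0,
) -> list[float]:
    """Return the wait in seconds before each reconnection attempt.

    The delay doubles on every attempt up to `cap`. `jitter` adds a random
    fraction (0.0 to `jitter`) of the delay so that many clients do not
    reconnect at the same instant.
    """
    delays = []
    for attempt in range(max_attempts):
        delay = min(cap, base * (2 ** attempt))
        delay += random.uniform(0, jitter * delay)
        delays.append(delay)
    return delays


print(backoff_delays())  # [0.5, 1.0, 2.0, 4.0, 8.0]
```

A client would sleep for each delay between connection attempts. Because the server above keeps the conversation per connection, a reconnecting client would also need to resend whatever history it wants the model to see.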

Server-Sent Events (SSE)

SSE is the simpler alternative to WebSockets when you only need server-to-client streaming (which covers most AI chat use cases). The client sends a regular HTTP request, and the server keeps the connection open to push data. There is no special protocol, it works through proxies and load balancers, and the browser's EventSource client reconnects automatically on failure. Note that EventSource only issues GET requests; a POST endpoint like the one below has to be read with fetch() instead. Use SSE unless you specifically need bidirectional communication.

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
import json


app = FastAPI()
# Async client: waiting on the model does not block other requests
client = AsyncOpenAI()


async def generate_sse_stream(messages: list):
    """Generate SSE stream for chat response."""
    stream = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages,
        stream=True
    )
    
    async for chunk in stream:
        # Some chunks carry no choices, so guard before indexing
        if chunk.choices and chunk.choices[0].delta.content:
            token = chunk.choices[0].delta.content
            data = json.dumps({"token": token})
            # Each SSE event is a "data:" line followed by a blank line
            yield f"data: {data}\n\n"
    
    yield f"data: {json.dumps({'done': True})}\n\n"


@app.post("/api/chat/stream")
async def stream_chat(request: Request):
    """SSE endpoint for streaming chat responses."""
    body = await request.json()
    messages = body.get("messages", [])
    
    return StreamingResponse(
        generate_sse_stream(messages),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",       # Prevent caching partial responses
            "Connection": "keep-alive",         # Keep the TCP connection open
            "X-Accel-Buffering": "no"          # Tell Nginx not to buffer (critical!)
        }
    )
    # Pitfall: If you're behind Nginx and tokens arrive in bursts instead of one-by-one,
    # you're missing the X-Accel-Buffering header. Nginx buffers by default.
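
The endpoint above frames every message as a `data:` line followed by a blank line. As a sketch of that framing, the helpers below encode one event and parse a buffer that may end in the middle of an event, which is what a client reading raw network chunks has to cope with. Both function names are hypothetical, and the parser only handles the single-line `data: ` shape this endpoint emits, not the full SSE specification.

```python
import json


def format_sse_event(payload: dict) -> str:
    """Encode one JSON payload as a single SSE event."""
    return f"data: {json.dumps(payload)}\n\n"


def parse_sse_events(buffer: str) -> tuple[list[dict], str]:
    """Split a text buffer into complete events plus the unparsed remainder.

    Events end with a blank line. Whatever follows the last blank line is
    incomplete and is returned so the caller can prepend it to the next read.
    """
    *complete, remainder = buffer.split("\n\n")
    events = []
    for block in complete:
        for line in block.split("\n"):
            if line.startswith("data: "):
                events.append(json.loads(line[len("data: "):]))
    return events, remainder


wire = format_sse_event({"token": "Hi"}) + format_sse_event({"done": True})

# Simulate the stream arriving in two arbitrary pieces
first, rest = parse_sse_events(wire[:10])
second, rest = parse_sse_events(rest + wire[10:])
print(first, second)  # [] [{'token': 'Hi'}, {'done': True}]
```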


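# Client-side JavaScript:
# EventSource only supports GET and cannot send headers or a body, so this
# POST endpoint is read with fetch() and a stream reader instead. Unlike
# EventSource, fetch() does not reconnect automatically.
"""
const response = await fetch('/api/chat/stream', {
    method: 'POST',
    headers: {'Content-Type': 'application/json'},
    body: JSON.stringify({messages: [{role: 'user', content: 'Hello'}]})
});

const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';

while (true) {
    const {value, done} = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, {stream: true});

    // SSE events are separated by a blank line
    const events = buffer.split('\n\n');
    buffer = events.pop();  // keep any incomplete event for the next read

    for (const evt of events) {
        if (!evt.startsWith('data: ')) continue;
        const data = JSON.parse(evt.slice(6));
        if (data.token) {
            console.log(data.token);
        }
        if (data.done) {
            await reader.cancel();
        }
    }
}
"""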

OpenAI Realtime API

The Realtime API is fundamentally different from the standard chat API. Instead of request-response, it is a persistent WebSocket connection where audio flows in both directions simultaneously. The model listens while it speaks, detects when you start talking (voice activity detection), and responds with audio directly — no separate STT/TTS round-trips. This cuts latency from seconds to hundreds of milliseconds.

Audio Conversation

import asyncio
import websockets
import json
import base64
import wave
import io


class RealtimeClient:
    """Client for OpenAI Realtime API."""
    
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.url = "wss://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview-2024-10-01"
        self.websocket = None
    
    async def connect(self):
        """Establish WebSocket connection."""
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "OpenAI-Beta": "realtime=v1"
        }
        
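        # Note: newer releases of the websockets library (14 and later) name this
        # parameter additional_headers; extra_headers applies to older releases.
        self.websocket = await websockets.connect(
            self.url,
            extra_headers=headers
        )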
        
        # Configure session -- this is sent once at connection start
        await self.send_event({
            "type": "session.update",
            "session": {
                "modalities": ["text", "audio"],
                "instructions": "You are a helpful assistant. Be concise.",
                "voice": "alloy",            # Voice persona for TTS output
                "input_audio_format": "pcm16",   # Raw 16-bit PCM at 24kHz
                "output_audio_format": "pcm16",
                "turn_detection": {
                    "type": "server_vad",        # Server-side voice activity detection
                    "threshold": 0.5,            # Sensitivity: lower = more sensitive
                    "prefix_padding_ms": 300,    # Keep 300ms of audio before speech starts
                    "silence_duration_ms": 500   # 500ms silence = user stopped talking
                }
            }
        })
        # Tip: If users report the model cutting them off, increase silence_duration_ms.
        # If it feels sluggish, decrease it. There is no universal right answer.
    
    async def send_event(self, event: dict):
        """Send event to server."""
        await self.websocket.send(json.dumps(event))
    
    async def receive_events(self):
        """Receive and handle events from server."""
        while True:
            message = await self.websocket.recv()
            event = json.loads(message)
            yield event
    
    async def send_audio(self, audio_data: bytes):
        """Send audio data to server."""
        # Audio should be base64-encoded PCM16 at 24kHz
        encoded = base64.b64encode(audio_data).decode()
        
        await self.send_event({
            "type": "input_audio_buffer.append",
            "audio": encoded
        })
    
    async def send_text(self, text: str):
        """Send text message."""
        await self.send_event({
            "type": "conversation.item.create",
            "item": {
                "type": "message",
                "role": "user",
                "content": [{"type": "input_text", "text": text}]
            }
        })
        
        # Request response
        await self.send_event({"type": "response.create"})
    
    async def close(self):
        """Close connection."""
        if self.websocket:
            await self.websocket.close()


async def realtime_conversation():
    """Run a realtime conversation."""
    import os
    
    client = RealtimeClient(os.environ["OPENAI_API_KEY"])
    await client.connect()
    
    print("Connected to Realtime API. Type messages or 'quit' to exit.")
    
    # Handle incoming events in background
    async def handle_events():
        async for event in client.receive_events():
            event_type = event.get("type", "")
            
            if event_type == "response.text.delta":
                print(event.get("delta", ""), end="", flush=True)
            
            elif event_type == "response.text.done":
                print()  # Newline
            
            elif event_type == "response.audio.delta":
                # Handle audio output
                audio_data = base64.b64decode(event.get("delta", ""))
                # Play audio or save to file
            
            elif event_type == "error":
                print(f"Error: {event.get('error', {}).get('message', 'Unknown')}")
    
    event_task = asyncio.create_task(handle_events())
    
    try:
        while True:
            # Read input in a worker thread so the event task keeps running
            user_input = await asyncio.to_thread(input, "You: ")
            
            if user_input.lower() == "quit":
                break
            
            await client.send_text(user_input)
    
    finally:
        event_task.cancel()
        await client.close()


# Run with: asyncio.run(realtime_conversation())
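
The `send_audio` method above sends one buffer per event. When streaming from a file or a long recording, the audio is usually split into short frames first. The sketch below builds the JSON events for that, assuming 24 kHz mono PCM16 as stated in the session configuration comments. The function name and the 100 ms frame length are assumptions made for illustration, not values required by the API.

```python
import base64
import json

SAMPLE_RATE = 24_000   # Hz, per the pcm16 comment in the session config
BYTES_PER_SAMPLE = 2   # 16-bit mono


def audio_append_events(pcm: bytes, chunk_ms: int = 100) -> list[str]:
    """Split raw PCM16 audio into JSON-encoded input_audio_buffer.append events."""
    chunk_bytes = SAMPLE_RATE * BYTES_PER_SAMPLE * chunk_ms // 1000
    events = []
    for start in range(0, len(pcm), chunk_bytes):
        chunk = pcm[start:start + chunk_bytes]
        events.append(json.dumps({
            "type": "input_audio_buffer.append",
            "audio": base64.b64encode(chunk).decode("ascii"),
        }))
    return events


one_second = bytes(SAMPLE_RATE * BYTES_PER_SAMPLE)  # one second of silence
events = audio_append_events(one_second)
print(len(events))  # 10
```

Each string can be passed straight to `websocket.send`. Audio captured at another rate, such as the 16 kHz used in the VAD example, would need resampling before it matches this format.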

Voice Activity Detection

import numpy as np
from dataclasses import dataclass
from typing import Optional, Callable
import asyncio


@dataclass
class VADConfig:
    """Voice Activity Detection configuration.
    
    VAD answers one question: "Is someone speaking right now?"
    It works by measuring audio energy (RMS) and comparing to a threshold.
    The tricky part is handling transitions -- you don't want a brief pause 
    between sentences to split one utterance into two, and you don't want 
    a cough to trigger speech detection.
    """
    threshold: float = 0.02             # RMS energy threshold (tune per environment)
    min_speech_duration_ms: int = 200   # Ignore speech shorter than this (filters coughs)
    min_silence_duration_ms: int = 500  # Wait this long before declaring "speech ended"
    sample_rate: int = 16000            # 16kHz is standard for speech recognition


class VoiceActivityDetector:
    """Detect speech in audio stream."""
    
    def __init__(self, config: Optional[VADConfig] = None):
        self.config = config or VADConfig()
        self.is_speaking = False
        self.silence_start = None
        self.speech_start = None
        self.speech_buffer = []
        self.elapsed_ms = 0.0  # Audio time processed so far, speech or silence
    
    def process_chunk(self, audio_chunk: bytes) -> Optional[dict]:
        """Process audio chunk and return speech events."""
        # Convert bytes to numpy array
        audio = np.frombuffer(audio_chunk, dtype=np.int16)
        
        # Calculate RMS energy
        rms = np.sqrt(np.mean(audio.astype(np.float32) ** 2)) / 32768
        
        # Advance the clock on every chunk. Deriving time from the speech buffer
        # would freeze it during silence, so the end of speech would never fire.
        current_time_ms = self.elapsed_ms
        self.elapsed_ms += len(audio) * 1000 / self.config.sample_rate
        
        if rms > self.config.threshold:
            # Speech detected
            if not self.is_speaking:
                self.speech_start = current_time_ms
            
            self.is_speaking = True
            self.silence_start = None
            self.speech_buffer.append(audio_chunk)
            
            return {"type": "speech_active", "rms": rms}
        
        else:
            # Silence detected
            if self.is_speaking:
                if self.silence_start is None:
                    self.silence_start = current_time_ms
                
                silence_duration = current_time_ms - self.silence_start
                
                if silence_duration >= self.config.min_silence_duration_ms:
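                    # End of speech. Measure up to where the silence began so the
                    # trailing pause does not count toward the speech duration.
                    speech_duration = self.silence_start - self.speech_start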
                    
                    if speech_duration >= self.config.min_speech_duration_ms:
                        # Valid speech segment
                        audio_data = b"".join(self.speech_buffer)
                        self.speech_buffer = []
                        self.is_speaking = False
                        
                        return {
                            "type": "speech_end",
                            "audio": audio_data,
                            "duration_ms": speech_duration
                        }
                    
                    self.speech_buffer = []
                    self.is_speaking = False
        
        return None


class RealtimeAudioProcessor:
    """Process realtime audio with VAD."""
    
    def __init__(
        self,
        on_speech_segment: Callable[[bytes], None],
        vad_config: VADConfig = None
    ):
        self.vad = VoiceActivityDetector(vad_config)
        self.on_speech_segment = on_speech_segment
    
    async def process_stream(self, audio_stream):
        """Process audio stream and emit speech segments."""
        async for chunk in audio_stream:
            result = self.vad.process_chunk(chunk)
            
            if result and result["type"] == "speech_end":
                await self.on_speech_segment(result["audio"])


# Usage with microphone input
async def process_microphone():
    """Process microphone input with VAD."""
    import pyaudio
    
    CHUNK_SIZE = 1600  # 100ms at 16kHz
    
    async def handle_speech(audio_data: bytes):
        print(f"Speech segment: {len(audio_data)} bytes")
        # Send to transcription or realtime API
    
    processor = RealtimeAudioProcessor(
        on_speech_segment=handle_speech,
        vad_config=VADConfig(threshold=0.02)
    )
    
    # Create audio stream generator
    async def audio_generator():
        p = pyaudio.PyAudio()
        stream = p.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=16000,
            input=True,
            frames_per_buffer=CHUNK_SIZE
        )
        
        try:
            while True:
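                # stream.read blocks until a full chunk is captured, so run it
                # in a worker thread to keep the event loop responsive
                chunk = await asyncio.to_thread(stream.read, CHUNK_SIZE)
                yield chunk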
        finally:
            stream.stop_stream()
            stream.close()
            p.terminate()
    
    await processor.process_stream(audio_generator())
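
To get a feel for the 0.02 threshold used above, it helps to compute the normalized RMS of a few known signals. The sketch below repeats the same calculation with only the standard library. `rms_level` is a hypothetical helper, and the square-wave test signals are synthetic stand-ins for real audio.

```python
import array
import math


def rms_level(pcm16: bytes) -> float:
    """Normalized RMS energy (0.0 to 1.0) of 16-bit PCM in native byte order."""
    samples = array.array("h", pcm16)
    if not samples:
        return 0.0
    mean_square = sum(s * s for s in samples) / len(samples)
    return math.sqrt(mean_square) / 32768


THRESHOLD = 0.02  # same default as VADConfig.threshold

# 100 ms of audio at 16 kHz: silence, a quiet signal, and a louder one
silence = array.array("h", [0] * 1600).tobytes()
quiet = array.array("h", [300, -300] * 800).tobytes()
louder = array.array("h", [3000, -3000] * 800).tobytes()

for name, chunk in [("silence", silence), ("quiet", quiet), ("louder", louder)]:
    level = rms_level(chunk)
    print(f"{name}: rms={level:.4f} speech={level > THRESHOLD}")
```

A square wave of amplitude A has an RMS of exactly A, so the quiet signal sits at 300/32768 (about 0.009, below the threshold) and the louder one at 3000/32768 (about 0.092, above it). Real microphone noise floors vary, which is why the threshold is tuned per environment.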

Latency Optimization

from openai import OpenAI
from dataclasses import dataclass
import time
import asyncio
from typing import Optional


@dataclass
class LatencyMetrics:
    """Track latency metrics."""
    time_to_first_token_ms: float
    total_time_ms: float
    tokens_generated: int
    tokens_per_second: float


class LowLatencyClient:
    """Optimized client for low-latency inference."""
    
    def __init__(
        self,
        model: str = "gpt-4o-mini",
        timeout: float = 10.0
    ):
        self.client = OpenAI(timeout=timeout)
        self.model = model
    
    def stream_with_metrics(
        self,
        messages: list,
        max_tokens: int = 256
    ) -> tuple[str, LatencyMetrics]:
        """Stream response and collect latency metrics."""
        start_time = time.time()
        first_token_time = None
        tokens = []
        
        stream = self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            max_tokens=max_tokens,
            stream=True,
            stream_options={"include_usage": True}
        )
        
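        completion_tokens = None
        
        for chunk in stream:
            if chunk.choices and chunk.choices[0].delta.content:
                if first_token_time is None:
                    first_token_time = time.time()
                
                tokens.append(chunk.choices[0].delta.content)
            
            # With include_usage, the final chunk has no choices and carries the
            # token counts; a streamed chunk is not guaranteed to be one token
            usage = getattr(chunk, "usage", None)
            if usage is not None:
                completion_tokens = usage.completion_tokens
        
        end_time = time.time()
        
        total_time_ms = (end_time - start_time) * 1000
        ttft_ms = (first_token_time - start_time) * 1000 if first_token_time else 0
        
        # Fall back to the chunk count if the API returned no usage data
        token_count = completion_tokens if completion_tokens is not None else len(tokens)
        
        metrics = LatencyMetrics(
            time_to_first_token_ms=ttft_ms,
            total_time_ms=total_time_ms,
            tokens_generated=token_count,
            tokens_per_second=token_count / (total_time_ms / 1000) if total_time_ms > 0 else 0
        )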
        
        return "".join(tokens), metrics
    
    async def parallel_completions(
        self,
        prompts: list[str],
        max_concurrent: int = 5
    ) -> list[tuple[str, LatencyMetrics]]:
        """Run multiple completions in parallel."""
        semaphore = asyncio.Semaphore(max_concurrent)
        
        async def run_one(prompt: str):
            async with semaphore:
                # Run the blocking sync client in a worker thread
                return await asyncio.to_thread(
                    self.stream_with_metrics,
                    [{"role": "user", "content": prompt}]
                )
        
        tasks = [run_one(p) for p in prompts]
        return await asyncio.gather(*tasks)


class LatencyOptimizer:
    """Strategies for optimizing latency."""
    
    @staticmethod
    def optimize_prompt(prompt: str) -> str:
        """Optimize prompt for lower latency.
        
        The whole prompt must be processed before the first output token, so
        longer prompts tend to raise Time-to-First-Token (TTFT). The growth is
        not strictly proportional, because network and queueing overhead are
        fixed costs, but trimming unnecessary whitespace and bloated prompts
        is usually the easiest latency win.
        """
        # Remove unnecessary whitespace -- saves tokens without losing meaning
        prompt = " ".join(prompt.split())
        
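        # Truncate very long prompts. Caution: this cuts blindly at 4,000
        # characters and can drop instructions or context, so in real use
        # prefer trimming at a meaningful boundary.
        if len(prompt) > 4000:
            prompt = prompt[:4000] + "..."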
        
        return prompt
    
    @staticmethod
    def select_model_for_latency(
        complexity: str,
        max_latency_ms: float
    ) -> str:
        """Select appropriate model based on latency requirements."""
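        # Illustrative TTFT estimates in ms, not measurements: replace them
        # with numbers from your own traffic. Models are listed fastest first,
        # so the first model that fits the budget is returned.
        model_latencies = {
            "gpt-4o-mini": {"simple": 100, "moderate": 200, "complex": 400},
            "gpt-4o": {"simple": 300, "moderate": 600, "complex": 1000},
        }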
        
        for model, latencies in model_latencies.items():
            if latencies.get(complexity, 1000) <= max_latency_ms:
                return model
        
        return "gpt-4o-mini"  # Fallback to fastest
    
    @staticmethod
    def chunk_for_streaming(
        text: str,
        chunk_size: int = 50
    ) -> list[str]:
        """Chunk text for streaming display."""
        words = text.split()
        chunks = []
        
        for i in range(0, len(words), chunk_size):
            chunks.append(" ".join(words[i:i + chunk_size]))
        
        return chunks


# Usage
client = LowLatencyClient(model="gpt-4o-mini")

# Single completion with metrics
response, metrics = client.stream_with_metrics(
    [{"role": "user", "content": "Hello!"}]
)

print(f"Response: {response}")
print(f"TTFT: {metrics.time_to_first_token_ms:.0f}ms")
print(f"Total: {metrics.total_time_ms:.0f}ms")
print(f"Speed: {metrics.tokens_per_second:.1f} tok/s")

# Parallel completions
prompts = ["What is AI?", "Explain Python", "Define cloud computing"]
results = asyncio.run(client.parallel_completions(prompts))

for (response, metrics), prompt in zip(results, prompts):
    print(f"{prompt[:20]}: {metrics.time_to_first_token_ms:.0f}ms TTFT")

Typing Indicators and Progress

from fastapi import FastAPI, WebSocket
from openai import OpenAI
import json
import asyncio
import time


app = FastAPI()
client = OpenAI()


class ProgressiveResponse:
    """Handle progressive response display."""
    
    def __init__(self, websocket: WebSocket):
        self.websocket = websocket
        self.is_typing = False
        self.typing_task = None
    
    async def start_typing(self):
        """Start typing indicator."""
        self.is_typing = True
        
        async def send_typing():
            while self.is_typing:
                await self.websocket.send_json({
                    "type": "typing",
                    "status": True
                })
                await asyncio.sleep(2)
        
        self.typing_task = asyncio.create_task(send_typing())
    
    async def stop_typing(self):
        """Stop typing indicator."""
        self.is_typing = False
        if self.typing_task:
            self.typing_task.cancel()
            try:
                await self.typing_task
            except asyncio.CancelledError:
                pass
    
    async def send_token(self, token: str, index: int):
        """Send token with metadata."""
        await self.websocket.send_json({
            "type": "token",
            "content": token,
            "index": index,
            "timestamp": time.time()
        })
    
    async def send_progress(
        self,
        stage: str,
        progress: float,
        message: str = ""
    ):
        """Send progress update."""
        await self.websocket.send_json({
            "type": "progress",
            "stage": stage,
            "progress": progress,
            "message": message
        })


@app.websocket("/ws/chat/progressive")
async def progressive_chat(websocket: WebSocket):
    """WebSocket with progressive response features."""
    await websocket.accept()
    
    responder = ProgressiveResponse(websocket)
    
    try:
        while True:
            data = await websocket.receive_text()
            message = json.loads(data)
            
            # Show typing indicator
            await responder.start_typing()
            
            # Send processing stages
            await responder.send_progress("thinking", 0.2, "Understanding your question...")
            await asyncio.sleep(0.1)
            
            await responder.send_progress("generating", 0.4, "Formulating response...")
            
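            # Keep the typing indicator on until the first token arrives: the
            # wait for that token is the gap the indicator is meant to cover.
            # Note: this synchronous client blocks the event loop while it
            # waits, so the periodic typing messages pause too. Using
            # openai.AsyncOpenAI with `async for` avoids that.
            stream = client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[{"role": "user", "content": message["content"]}],
                stream=True
            )
            
            token_index = 0
            for chunk in stream:
                if chunk.choices and chunk.choices[0].delta.content:
                    if token_index == 0:
                        await responder.stop_typing()
                    await responder.send_token(
                        chunk.choices[0].delta.content,
                        token_index
                    )
                    token_index += 1
            
            # Covers the case where the stream produced no tokens at all
            await responder.stop_typing()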
            
            await responder.send_progress("complete", 1.0, "Done!")
            
            await websocket.send_json({
                "type": "complete",
                "total_tokens": token_index
            })
            
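    except Exception as e:
        await responder.stop_typing()
        # A client disconnect also lands here, and the socket is then closed,
        # so reporting the error must not raise a second exception
        try:
            await websocket.send_json({
                "type": "error",
                "message": str(e)
            })
        except Exception:
            pass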


# Client-side handling:
"""
const ws = new WebSocket('ws://localhost:8000/ws/chat/progressive');

ws.onmessage = (event) => {
    const data = JSON.parse(event.data);
    
    switch (data.type) {
        case 'typing':
            showTypingIndicator(data.status);
            break;
        case 'progress':
            updateProgressBar(data.stage, data.progress, data.message);
            break;
        case 'token':
            appendToken(data.content);
            break;
        case 'complete':
            hideProgress();
            console.log(`Completed with ${data.total_tokens} tokens`);
            break;
        case 'error':
            showError(data.message);
            break;
    }
};
"""

Realtime Collaboration

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Dict, Set
import json
import asyncio
from dataclasses import dataclass, field


@dataclass
class Room:
    """A collaborative chat room."""
    id: str
    participants: Set[WebSocket] = field(default_factory=set)
    conversation: list = field(default_factory=list)
    ai_responding: bool = False


class CollaborativeChat:
    """Multi-user realtime AI chat."""
    
    def __init__(self):
        self.rooms: Dict[str, Room] = {}
        from openai import OpenAI
        self.client = OpenAI()
    
    def get_or_create_room(self, room_id: str) -> Room:
        """Get or create a chat room."""
        if room_id not in self.rooms:
            self.rooms[room_id] = Room(id=room_id)
        return self.rooms[room_id]
    
    async def join_room(self, room_id: str, websocket: WebSocket):
        """Add participant to room."""
        room = self.get_or_create_room(room_id)
        room.participants.add(websocket)
        
        # Send room state to new participant
        await websocket.send_json({
            "type": "room_state",
            "conversation": room.conversation,
            "participants": len(room.participants)
        })
        
        # Notify others
        await self.broadcast(room, {
            "type": "participant_joined",
            "count": len(room.participants)
        }, exclude=websocket)
    
    async def leave_room(self, room_id: str, websocket: WebSocket):
        """Remove participant from room."""
        if room_id in self.rooms:
            room = self.rooms[room_id]
            room.participants.discard(websocket)
            
            await self.broadcast(room, {
                "type": "participant_left",
                "count": len(room.participants)
            })
    
    async def broadcast(
        self,
        room: Room,
        message: dict,
        exclude: WebSocket = None
    ):
        """Broadcast message to all room participants."""
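        # Iterate over a snapshot: participants can join or leave while a send
        # is awaited, and changing a set during iteration raises RuntimeError
        for ws in list(room.participants):
            if ws != exclude:
                try:
                    await ws.send_json(message)
                except Exception:
                    # Drop connections that can no longer receive
                    room.participants.discard(ws)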
    
    async def handle_message(
        self,
        room_id: str,
        user_id: str,
        content: str
    ):
        """Handle incoming user message."""
        room = self.rooms[room_id]
        
        # Add user message
        user_msg = {
            "role": "user",
            "user_id": user_id,
            "content": content
        }
        room.conversation.append(user_msg)
        
        # Broadcast user message
        await self.broadcast(room, {
            "type": "user_message",
            "user_id": user_id,
            "content": content
        })
        
        # Generate AI response if not already responding
        if not room.ai_responding:
            room.ai_responding = True
            
            try:
                await self.broadcast(room, {"type": "ai_typing", "status": True})
                
                # Format conversation for API
                messages = [
                    {"role": m["role"], "content": m["content"]}
                    for m in room.conversation
                ]
                
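                # Note: the synchronous client blocks the event loop while it
                # waits for chunks, which delays every room on this server.
                # openai.AsyncOpenAI with `async for` avoids that.
                stream = self.client.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=messages,
                    stream=True
                )
                
                ai_response = ""
                for chunk in stream:
                    if chunk.choices and chunk.choices[0].delta.content: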
                        token = chunk.choices[0].delta.content
                        ai_response += token
                        
                        await self.broadcast(room, {
                            "type": "ai_token",
                            "content": token
                        })
                
                # Add AI response to conversation
                room.conversation.append({
                    "role": "assistant",
                    "content": ai_response
                })
                
                await self.broadcast(room, {
                    "type": "ai_complete",
                    "content": ai_response
                })
                
            finally:
                room.ai_responding = False
                await self.broadcast(room, {"type": "ai_typing", "status": False})


app = FastAPI()
collab = CollaborativeChat()


@app.websocket("/ws/room/{room_id}")
async def room_websocket(websocket: WebSocket, room_id: str):
    """WebSocket for collaborative room."""
    await websocket.accept()
    await collab.join_room(room_id, websocket)
    
    try:
        while True:
            data = await websocket.receive_text()
            message = json.loads(data)
            
            await collab.handle_message(
                room_id,
                message.get("user_id", "anonymous"),
                message.get("content", "")
            )
    
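    except WebSocketDisconnect:
        pass
    finally:
        # Always remove the socket from the room, including on malformed JSON
        # or other unexpected errors, so broadcasts do not target a dead socket
        await collab.leave_room(room_id, websocket)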
Realtime Best Practices
  • Use WebSockets for bidirectional communication, SSE for server-to-client only (SSE is simpler and more proxy-friendly)
  • Implement typing indicators — without them, users have no idea if the system is working or frozen
  • Stream tokens as they are generated — users can start reading while the model is still generating
  • Handle disconnections gracefully — mobile users lose connection constantly; reconnect and resume without losing context
  • Monitor Time-to-First-Token (TTFT) as your primary latency metric — total time matters less than perceived responsiveness
  • Watch out for Nginx/Cloudflare buffering, which can silently destroy your streaming experience
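
To make the TTFT advice concrete, here is a minimal sketch of a pass-through wrapper that records time to first token for any async token stream. The names `measure_ttft`, `fake_model_stream`, and the keys of the `metrics` dict are illustrative assumptions, not part of any library; the fake stream stands in for a real model call.

```python
import asyncio
import time
from typing import AsyncIterator


async def measure_ttft(stream: AsyncIterator[str], metrics: dict) -> AsyncIterator[str]:
    """Yield tokens unchanged while recording latency metrics into `metrics`."""
    # The clock starts when the consumer first pulls from this wrapper.
    start = time.perf_counter()
    first_token_seen = False
    count = 0
    try:
        async for token in stream:
            if not first_token_seen:
                first_token_seen = True
                metrics["ttft_s"] = time.perf_counter() - start
            count += 1
            yield token
    finally:
        # Recorded even if the consumer stops early or the stream fails.
        metrics["total_s"] = time.perf_counter() - start
        metrics["tokens"] = count


async def fake_model_stream() -> AsyncIterator[str]:
    """Hypothetical stand-in for a streaming model response."""
    await asyncio.sleep(0.05)  # simulated queueing and prefill delay
    for token in ["Hello", ",", " world"]:
        yield token
        await asyncio.sleep(0.01)


async def demo() -> dict:
    metrics: dict = {}
    async for _ in measure_ttft(fake_model_stream(), metrics):
        pass
    return metrics


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the wrapper only observes the stream, it can sit between the model call and the broadcast or SSE layer without changing what the client receives.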

Edge Cases and Production Gotchas

Problem: The user closes the tab or loses cellular signal while the LLM is still generating. Your server keeps generating tokens and paying for them, but nobody is listening.
Fix: Check the connection state before sending each chunk. In a FastAPI WebSocket handler, wrap send_text in a try/except for WebSocketDisconnect. For SSE, the StreamingResponse generator should handle asyncio.CancelledError, which is raised inside the generator when the server cancels it after a client disconnect. Always clean up resources (cancel the OpenAI stream, decrement active connection counters) in a finally block, so cleanup runs however the disconnect is reported.
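
The cleanup pattern described above can be sketched with the standard library alone. `FakeUpstream` is a hypothetical stand-in for an LLM stream that must be closed explicitly, and `active` is an assumed set of open stream IDs. In a real app the generator would be handed to StreamingResponse, and how a disconnect surfaces may vary with the server and framework version, which is why the cleanup lives in `finally`.

```python
import asyncio
from typing import AsyncIterator, Set


class FakeUpstream:
    """Hypothetical stand-in for an LLM stream that costs money until closed."""

    def __init__(self) -> None:
        self.closed = False
        self.produced = 0

    async def tokens(self) -> AsyncIterator[str]:
        while not self.closed:
            await asyncio.sleep(0.01)  # simulated generation latency
            self.produced += 1
            yield f"tok{self.produced}"

    async def close(self) -> None:
        self.closed = True


async def sse_events(upstream: FakeUpstream, active: Set[str], stream_id: str) -> AsyncIterator[str]:
    """Format tokens as SSE events and always release resources on exit."""
    active.add(stream_id)
    try:
        async for token in upstream.tokens():
            yield f"data: {token}\n\n"
    except asyncio.CancelledError:
        # The consumer was cancelled (for example, the client went away).
        # Re-raise so cancellation keeps propagating.
        raise
    finally:
        # Runs on normal completion, cancellation, or generator close.
        await upstream.close()
        active.discard(stream_id)


async def demo():
    upstream, active, received = FakeUpstream(), set(), []

    async def consumer() -> None:
        async for event in sse_events(upstream, active, "s1"):
            received.append(event)

    task = asyncio.create_task(consumer())
    await asyncio.sleep(0.05)
    task.cancel()  # simulate the client disconnecting mid-stream
    await asyncio.gather(task, return_exceptions=True)
    return upstream.closed, active, received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```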
Problem: Tokens arrive in large bursts instead of one-by-one. The reverse proxy buffers the response before forwarding it.
Fix: For Nginx, add proxy_buffering off; to the proxy configuration and send the X-Accel-Buffering: no response header from the application. For Cloudflare, disable “Rocket Loader” and “Auto Minify.” For AWS ALB, set the idle timeout higher than your longest expected stream. This is the most common “streaming isn’t working” issue in production, and it is usually an infrastructure problem rather than an application-layer bug.
Problem: WebSocket connections drop frequently on 3G/4G. Users see partial responses and frozen UIs.
Fix: Implement message sequence numbers so the client can detect gaps. Add a reconnection protocol that resumes from the last received sequence number. For SSE, the browser's EventSource reconnects on its own and sends the Last-Event-ID header, provided the server tags events with id: fields and replays from that ID on reconnect. This is another reason to prefer SSE over WebSocket for mobile clients.
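
A minimal sketch of the sequence-number idea, assuming the server keeps a bounded in-memory buffer per stream. `ReplayBuffer` and `to_sse` are hypothetical names, and payloads are assumed to be single-line strings. The same sequence number can serve as the SSE `id:` field or as a field in a WebSocket JSON message.

```python
from collections import deque
from typing import Deque, List, Optional, Tuple


class ReplayBuffer:
    """Keeps the most recent messages so a reconnecting client can catch up."""

    def __init__(self, max_messages: int = 1000) -> None:
        self._next_seq = 1
        self._messages: Deque[Tuple[int, str]] = deque(maxlen=max_messages)

    def append(self, payload: str) -> int:
        """Store a message and return the sequence number assigned to it."""
        seq = self._next_seq
        self._next_seq += 1
        self._messages.append((seq, payload))
        return seq

    def replay_after(self, last_seq: int) -> Optional[List[Tuple[int, str]]]:
        """Return messages newer than last_seq, or None if some were evicted."""
        oldest = self._messages[0][0] if self._messages else self._next_seq
        if last_seq + 1 < oldest:
            return None  # the gap cannot be filled; the client must restart
        return [(seq, payload) for seq, payload in self._messages if seq > last_seq]


def to_sse(seq: int, payload: str) -> str:
    """Format one message as an SSE event whose id is the sequence number."""
    return f"id: {seq}\ndata: {payload}\n\n"


if __name__ == "__main__":
    buf = ReplayBuffer(max_messages=3)
    for token in "abcde":
        buf.append(token)
    # Client reconnects having last seen sequence number 3
    for seq, payload in buf.replay_after(3) or []:
        print(to_sse(seq, payload), end="")
```

Returning `None` when the requested range has been evicted lets the handler tell the client to restart the response instead of silently skipping tokens.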
Problem: A user opens three tabs, each initiating a stream. Your server is now running three concurrent LLM calls for one person, tripling cost and potentially hitting rate limits.
Fix: Track active streams per user (using the RateLimitedStreamer pattern above). Either reject new streams while one is active, or cancel the previous stream when a new one starts. Include the stream ID in the WebSocket session so the client can resume rather than restart.
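
The "cancel the previous stream" option could look roughly like the following. This is a separate, hypothetical `UserStreamRegistry`, not the RateLimitedStreamer referenced above; it assumes each stream runs as its own asyncio task and that `start` is called from inside a running event loop.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List


class UserStreamRegistry:
    """Allows one active stream per user; a new stream cancels the previous one."""

    def __init__(self) -> None:
        self._tasks: Dict[str, asyncio.Task] = {}

    def start(self, user_id: str, stream_factory: Callable[[], Awaitable[None]]) -> asyncio.Task:
        previous = self._tasks.get(user_id)
        if previous is not None and not previous.done():
            previous.cancel()  # the old stream sees CancelledError and cleans up
        task = asyncio.ensure_future(stream_factory())
        self._tasks[user_id] = task
        task.add_done_callback(lambda finished: self._forget(user_id, finished))
        return task

    def _forget(self, user_id: str, task: asyncio.Task) -> None:
        # Only drop the entry if it still points at the task that just finished.
        if self._tasks.get(user_id) is task:
            del self._tasks[user_id]

    def active_count(self) -> int:
        return sum(1 for task in self._tasks.values() if not task.done())


async def demo():
    registry = UserStreamRegistry()
    log: List[str] = []

    async def fake_stream(name: str) -> None:
        """Hypothetical stand-in for one streaming LLM call."""
        try:
            await asyncio.sleep(0.05)
            log.append(f"{name} finished")
        except asyncio.CancelledError:
            log.append(f"{name} cancelled")
            raise

    first = registry.start("alice", lambda: fake_stream("tab-1"))
    await asyncio.sleep(0)  # let tab-1 begin before tab-2 arrives
    second = registry.start("alice", lambda: fake_stream("tab-2"))
    await asyncio.gather(first, second, return_exceptions=True)
    return log, first.cancelled(), registry.active_count()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Rejecting the new stream instead is a one-line change in `start` (raise or return early when `previous` is still running); which policy fits depends on whether the newest tab or the oldest one should win.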

Practice Exercise

Build a realtime AI application that:
  1. Uses WebSockets for bidirectional communication
  2. Implements voice activity detection
  3. Streams responses with latency metrics
  4. Shows typing indicators and progress
  5. Supports multiple concurrent users
Focus on:
  • Minimizing time to first token
  • Smooth streaming experience
  • Graceful error handling
  • Scalable connection management