Real-Time Communication Overview

┌─────────────────────────────────────────────────────────────────┐
│                REAL-TIME COMMUNICATION METHODS                  │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  1. HTTP Polling                                                │
│     Client → Server (every N seconds)                          │
│     Simple but wasteful                                         │
│                                                                 │
│  2. Long Polling                                                │
│     Client → Server (waits for response)                       │
│     Better, still overhead per request                         │
│                                                                 │
│  3. Server-Sent Events (SSE)                                   │
│     Server → Client (one-way stream)                           │
│     Simple, HTTP-based, auto-reconnect                         │
│                                                                 │
│  4. WebSockets                                                  │
│     Client ↔ Server (bidirectional)                            │
│     Full duplex, low latency, complex                          │
│                                                                 │
│  5. WebRTC                                                      │
│     Client ↔ Client (peer-to-peer)                             │
│     Audio/video, lowest latency                                │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Comparison Matrix

Method       | Direction     | Latency  | Complexity | Use Case
Polling      | Client→Server | High     | Low        | Legacy, simple updates
Long Polling | Client→Server | Medium   | Medium     | Chat (fallback)
SSE          | Server→Client | Low      | Low        | Notifications, feeds
WebSocket    | Bidirectional | Very Low | High       | Chat, gaming, trading
WebRTC       | P2P           | Lowest   | Very High  | Video calls, streaming

WebSockets Deep Dive

WebSocket Lifecycle

┌─────────────────────────────────────────────────────────────────┐
│                    WEBSOCKET LIFECYCLE                          │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  1. HTTP Upgrade Request                                        │
│     GET /chat HTTP/1.1                                         │
│     Upgrade: websocket                                          │
│     Connection: Upgrade                                         │
│     Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==                │
│                                                                 │
│  2. Server Accepts                                              │
│     HTTP/1.1 101 Switching Protocols                           │
│     Upgrade: websocket                                          │
│     Connection: Upgrade                                         │
│     Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=         │
│                                                                 │
│  3. WebSocket Connection Established                           │
│     ┌────────────────────────────────────────────────────────┐ │
│     │ Client ◄══════════════════════════════════════► Server │ │
│     │        Full duplex binary/text frames                  │ │
│     └────────────────────────────────────────────────────────┘ │
│                                                                 │
│  4. Either side can close                                      │
│     Client sends Close frame → Server acknowledges → Done     │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
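
The Sec-WebSocket-Accept value in step 2 is not arbitrary: the server derives it from the client's Sec-WebSocket-Key in step 1. The sketch below shows that derivation as defined by RFC 6455 (SHA-1 over the key concatenated with a fixed GUID, then base64). The function name `compute_accept` is just an illustrative choice; in practice the WebSocket library does this for you.

```python
import base64
import hashlib

# Fixed GUID that RFC 6455 defines for the opening handshake
WEBSOCKET_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"


def compute_accept(sec_websocket_key: str) -> str:
    """Derive the Sec-WebSocket-Accept header value from the client's key."""
    digest = hashlib.sha1(
        (sec_websocket_key + WEBSOCKET_GUID).encode("ascii")
    ).digest()
    return base64.b64encode(digest).decode("ascii")


# Key and expected result are the sample values shown in the lifecycle diagram
print(compute_accept("dGhlIHNhbXBsZSBub25jZQ=="))  # s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
```

The accept value only proves that the server understood the WebSocket handshake; it is not an authentication mechanism.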

Scalable WebSocket Architecture

┌─────────────────────────────────────────────────────────────────┐
│              SCALABLE WEBSOCKET ARCHITECTURE                    │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│                         Clients                                 │
│                    ┌───┐ ┌───┐ ┌───┐                          │
│                    │ C │ │ C │ │ C │                          │
│                    └─┬─┘ └─┬─┘ └─┬─┘                          │
│                      │     │     │                              │
│                    ┌─┴─────┴─────┴─┐                          │
│                    │  Load Balancer │ (sticky sessions)        │
│                    └───────┬───────┘                          │
│           ┌────────────────┼────────────────┐                  │
│           │                │                │                   │
│      ┌────▼────┐      ┌────▼────┐      ┌────▼────┐            │
│      │  WS     │      │  WS     │      │  WS     │            │
│      │ Server 1│      │ Server 2│      │ Server 3│            │
│      └────┬────┘      └────┬────┘      └────┬────┘            │
│           │                │                │                   │
│           └────────────────┼────────────────┘                  │
│                            │                                    │
│                    ┌───────▼───────┐                           │
│                    │ Message Broker│ Redis Pub/Sub             │
│                    │   (Pub/Sub)   │ or Kafka                  │
│                    └───────────────┘                           │
│                                                                 │
│  Challenge: User A on Server 1 messages User B on Server 2    │
│  Solution: Pub/Sub to broadcast across servers                 │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
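
To make the challenge and solution above concrete, here is a minimal in-memory simulation of the fan-out pattern. `InMemoryBroker` and `FakeServer` are hypothetical stand-ins for Redis Pub/Sub and a WebSocket server process; no real sockets or Redis are involved. It only illustrates the routing idea: publish once, and whichever server holds the recipient's connection delivers the message.

```python
import asyncio


class InMemoryBroker:
    """Stand-in for Redis Pub/Sub: every subscriber sees every message."""

    def __init__(self):
        self.subscribers = []

    def subscribe(self, callback):
        self.subscribers.append(callback)

    async def publish(self, channel, message):
        for callback in self.subscribers:
            await callback(channel, message)


class FakeServer:
    """Stand-in for one WebSocket server holding some local connections."""

    def __init__(self, name, broker):
        self.name = name
        self.broker = broker
        self.local_users = {}  # user_id -> messages delivered on this server
        broker.subscribe(self.on_broker_message)

    def connect(self, user_id):
        self.local_users[user_id] = []

    async def send_to_user(self, user_id, message):
        # The sender does not know where the recipient is connected,
        # so it publishes and lets the owning server deliver.
        await self.broker.publish(f"ws:user:{user_id}", message)

    async def on_broker_message(self, channel, message):
        user_id = channel.split(":", 2)[2]
        if user_id in self.local_users:  # ignore users on other servers
            self.local_users[user_id].append(message)


async def demo():
    broker = InMemoryBroker()
    server1 = FakeServer("server-1", broker)
    server2 = FakeServer("server-2", broker)
    server1.connect("alice")
    server2.connect("bob")
    # Alice (on server 1) messages Bob (on server 2)
    await server1.send_to_user("bob", {"from": "alice", "content": "hi"})
    return server1.local_users, server2.local_users


server1_inbox, server2_inbox = asyncio.run(demo())
print(server2_inbox["bob"])
```

Every server receives every published message and discards those for users it does not hold, which is simple but means broker traffic grows with the number of servers.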

Production WebSocket Server

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from typing import Dict, Set, Optional
from dataclasses import dataclass, field
from datetime import datetime
import asyncio
import json
import redis.asyncio as redis
import logging

logger = logging.getLogger(__name__)

app = FastAPI()

@dataclass
class Connection:
    websocket: WebSocket
    user_id: str
    connected_at: datetime = field(default_factory=datetime.utcnow)
    rooms: Set[str] = field(default_factory=set)

class ConnectionManager:
    """
    Manages WebSocket connections with Redis Pub/Sub for scaling.
    """
    
    def __init__(self):
        self.connections: Dict[str, Connection] = {}  # user_id -> Connection
        self.rooms: Dict[str, Set[str]] = {}  # room_id -> {user_ids}
        self.redis: Optional[redis.Redis] = None
        self.pubsub = None
    
    async def init_redis(self, redis_url: str):
        """Initialize Redis for cross-server messaging"""
        self.redis = await redis.from_url(redis_url)
        self.pubsub = self.redis.pubsub()
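        # Keep a reference: the event loop holds only weak references to
        # tasks, so an unreferenced task can be garbage-collected mid-run
        self._listener_task = asyncio.create_task(self._listen_to_redis())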
    
    async def _listen_to_redis(self):
        """Listen for messages from other servers"""
        await self.pubsub.psubscribe("ws:*")
        
        async for message in self.pubsub.listen():
            if message["type"] != "pmessage":
                continue
            
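            # Channel is bytes unless the client uses decode_responses=True
            channel = message["channel"]
            if isinstance(channel, bytes):
                channel = channel.decode()
            data = json.loads(message["data"])
            
            # maxsplit=2 keeps IDs that themselves contain ":" intact
            if channel.startswith("ws:room:"):
                room_id = channel.split(":", 2)[2]
                await self._broadcast_to_local_room(room_id, data)
            elif channel.startswith("ws:user:"):
                user_id = channel.split(":", 2)[2]
                await self._send_to_local_user(user_id, data)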
    
    async def connect(self, websocket: WebSocket, user_id: str) -> Connection:
        """Accept WebSocket connection"""
        await websocket.accept()
        
        conn = Connection(websocket=websocket, user_id=user_id)
        self.connections[user_id] = conn
        
        # Track connection in Redis for presence
        if self.redis:
            await self.redis.sadd("online_users", user_id)
            await self.redis.hset(
                f"user:{user_id}:presence",
                mapping={
                    "server_id": "server-1",  # Your server ID
                    "connected_at": conn.connected_at.isoformat()
                }
            )
        
        logger.info(f"User {user_id} connected. Total: {len(self.connections)}")
        return conn
    
    async def disconnect(self, user_id: str):
        """Handle WebSocket disconnect"""
        if user_id not in self.connections:
            return
        
        conn = self.connections.pop(user_id)
        
        # Leave all rooms
        for room_id in conn.rooms.copy():
            await self.leave_room(user_id, room_id)
        
        # Update Redis presence
        if self.redis:
            await self.redis.srem("online_users", user_id)
            await self.redis.delete(f"user:{user_id}:presence")
        
        logger.info(f"User {user_id} disconnected. Total: {len(self.connections)}")
    
    async def join_room(self, user_id: str, room_id: str):
        """Add user to a room"""
        if user_id not in self.connections:
            return
        
        if room_id not in self.rooms:
            self.rooms[room_id] = set()
        
        self.rooms[room_id].add(user_id)
        self.connections[user_id].rooms.add(room_id)
        
        # Notify room
        await self.broadcast_to_room(room_id, {
            "type": "user_joined",
            "user_id": user_id,
            "room_id": room_id
        })
    
    async def leave_room(self, user_id: str, room_id: str):
        """Remove user from a room"""
        if room_id in self.rooms:
            self.rooms[room_id].discard(user_id)
            if not self.rooms[room_id]:
                del self.rooms[room_id]
        
        if user_id in self.connections:
            self.connections[user_id].rooms.discard(room_id)
        
        await self.broadcast_to_room(room_id, {
            "type": "user_left",
            "user_id": user_id,
            "room_id": room_id
        })
    
    async def send_to_user(self, user_id: str, message: dict):
        """Send message to a specific user (cross-server)"""
        if self.redis:
            # Publish only. Every server, including this one, subscribes to
            # "ws:*", so _listen_to_redis delivers to the local connection.
            # Sending locally as well would deliver the message twice.
            await self.redis.publish(
                f"ws:user:{user_id}",
                json.dumps(message)
            )
        else:
            # No Redis configured (single server): deliver directly
            await self._send_to_local_user(user_id, message)
    
    async def _send_to_local_user(self, user_id: str, message: dict):
        """Send to user on this server only"""
        if user_id in self.connections:
            try:
                await self.connections[user_id].websocket.send_json(message)
            except Exception as e:
                logger.error(f"Failed to send to {user_id}: {e}")
                await self.disconnect(user_id)
    
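    async def broadcast_to_room(self, room_id: str, message: dict):
        """Broadcast to all users in a room (cross-server)"""
        if self.redis:
            # Publish only. This server also receives the message through
            # its own "ws:*" subscription and broadcasts it locally there;
            # a direct local broadcast here would duplicate it.
            await self.redis.publish(
                f"ws:room:{room_id}",
                json.dumps(message)
            )
        else:
            # No Redis configured (single server): broadcast directly
            await self._broadcast_to_local_room(room_id, message)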
    
    async def _broadcast_to_local_room(self, room_id: str, message: dict):
        """Broadcast to room users on this server only"""
        if room_id not in self.rooms:
            return
        
        tasks = []
        for user_id in self.rooms[room_id]:
            if user_id in self.connections:
                tasks.append(
                    self.connections[user_id].websocket.send_json(message)
                )
        
        await asyncio.gather(*tasks, return_exceptions=True)
    
    async def get_online_users(self) -> Set[str]:
        """Get all online users across all servers"""
        if self.redis:
            return await self.redis.smembers("online_users")
        return set(self.connections.keys())


# Global connection manager
manager = ConnectionManager()

@app.on_event("startup")
async def startup():
    await manager.init_redis("redis://localhost:6379")

@app.websocket("/ws/{user_id}")
async def websocket_endpoint(websocket: WebSocket, user_id: str):
    """WebSocket endpoint with message handling"""
    conn = await manager.connect(websocket, user_id)
    
    try:
        while True:
            data = await websocket.receive_json()
            await handle_message(user_id, data)
    
    except WebSocketDisconnect:
        await manager.disconnect(user_id)
    except Exception as e:
        logger.error(f"WebSocket error for {user_id}: {e}")
        await manager.disconnect(user_id)

async def handle_message(user_id: str, data: dict):
    """Handle incoming WebSocket message"""
    message_type = data.get("type")
    
    if message_type == "join_room":
        await manager.join_room(user_id, data["room_id"])
    
    elif message_type == "leave_room":
        await manager.leave_room(user_id, data["room_id"])
    
    elif message_type == "room_message":
        await manager.broadcast_to_room(data["room_id"], {
            "type": "message",
            "from": user_id,
            "content": data["content"],
            "timestamp": datetime.utcnow().isoformat()
        })
    
    elif message_type == "direct_message":
        await manager.send_to_user(data["to_user"], {
            "type": "direct_message",
            "from": user_id,
            "content": data["content"],
            "timestamp": datetime.utcnow().isoformat()
        })
    
    elif message_type == "ping":
        await manager.send_to_user(user_id, {"type": "pong"})

Server-Sent Events (SSE)

Simpler than WebSockets for server-to-client streaming.
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from typing import AsyncGenerator, Dict
import asyncio
import json

app = FastAPI()

class EventBroker:
    """Simple in-memory event broker for SSE"""
    
    def __init__(self):
        self.subscribers: Dict[str, asyncio.Queue] = {}
    
    async def subscribe(self, user_id: str) -> asyncio.Queue:
        queue = asyncio.Queue()
        self.subscribers[user_id] = queue
        return queue
    
    def unsubscribe(self, user_id: str):
        self.subscribers.pop(user_id, None)
    
    async def publish(self, user_id: str, event: dict):
        if user_id in self.subscribers:
            await self.subscribers[user_id].put(event)
    
    async def broadcast(self, event: dict):
        for queue in self.subscribers.values():
            await queue.put(event)

broker = EventBroker()

async def event_stream(user_id: str) -> AsyncGenerator[str, None]:
    """Generate SSE stream for a user"""
    queue = await broker.subscribe(user_id)
    
    try:
        # Send initial connection event
        yield f"event: connected\ndata: {json.dumps({'user_id': user_id})}\n\n"
        
        while True:
            try:
                # Wait for event with timeout (for keepalive)
                event = await asyncio.wait_for(queue.get(), timeout=30)
                
                event_type = event.get("type", "message")
                data = json.dumps(event)
                
                yield f"event: {event_type}\ndata: {data}\n\n"
                
            except asyncio.TimeoutError:
                # Send keepalive comment
                yield ": keepalive\n\n"
    
    finally:
        broker.unsubscribe(user_id)

@app.get("/events/{user_id}")
async def sse_endpoint(user_id: str):
    """SSE endpoint for real-time notifications"""
    return StreamingResponse(
        event_stream(user_id),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"  # Disable nginx buffering
        }
    )

@app.post("/notify/{user_id}")
async def send_notification(user_id: str, message: dict):
    """Send notification to a user"""
    await broker.publish(user_id, {
        "type": "notification",
        **message
    })
    return {"status": "sent"}
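
The stream above builds its frames with f-strings, which works for single-line JSON payloads. As a small sketch of the wire format itself, the helper below (the name `format_sse` and its parameters are illustrative, not part of FastAPI) assembles one event from the standard `id`, `event`, and `data` fields, splitting multi-line payloads across several `data:` lines as the format requires. When an `id` is set, browsers' EventSource sends it back in the Last-Event-ID header on automatic reconnect, which lets a server resume a feed.

```python
from typing import Optional


def format_sse(
    data: str,
    event: Optional[str] = None,
    event_id: Optional[str] = None,
) -> str:
    """Serialize one Server-Sent Event frame."""
    lines = []
    if event_id is not None:
        lines.append(f"id: {event_id}")
    if event is not None:
        lines.append(f"event: {event}")
    # Each line of the payload needs its own "data:" field; the client
    # joins them back together with newlines.
    for line in data.split("\n"):
        lines.append(f"data: {line}")
    # A blank line terminates the event
    return "\n".join(lines) + "\n\n"


print(repr(format_sse("hello", event="notification", event_id="7")))
print(repr(format_sse("line one\nline two")))
```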

Long Polling

Fallback for when WebSockets/SSE aren’t available.
from fastapi import FastAPI
from typing import Dict
import asyncio

app = FastAPI()

class LongPollManager:
    def __init__(self):
        self.pending: Dict[str, asyncio.Future] = {}
        self.messages: Dict[str, list] = {}
    
    async def wait_for_messages(
        self, 
        user_id: str, 
        timeout: float = 30
    ) -> list:
        """Wait for new messages with timeout"""
        
        # Check for pending messages first
        if user_id in self.messages and self.messages[user_id]:
            messages = self.messages.pop(user_id)
            return messages
        
        # Create a future to wait on
        future = asyncio.get_running_loop().create_future()
        self.pending[user_id] = future
        
        try:
            messages = await asyncio.wait_for(future, timeout=timeout)
            return messages
        except asyncio.TimeoutError:
            return []
        finally:
            self.pending.pop(user_id, None)
    
    async def send_message(self, user_id: str, message: dict):
        """Send message to user, either immediately or queue"""
        if user_id in self.pending:
            # User is waiting, resolve immediately
            future = self.pending.pop(user_id)
            if not future.done():
                future.set_result([message])
        else:
            # Queue for next poll
            if user_id not in self.messages:
                self.messages[user_id] = []
            self.messages[user_id].append(message)

poll_manager = LongPollManager()

@app.get("/poll/{user_id}")
async def long_poll(user_id: str, timeout: float = 30):
    """Long polling endpoint"""
    messages = await poll_manager.wait_for_messages(user_id, timeout)
    return {"messages": messages}

@app.post("/send/{user_id}")
async def send_message(user_id: str, message: dict):
    """Send message to user"""
    await poll_manager.send_message(user_id, message)
    return {"status": "sent"}
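
The server is only half of long polling: the client has to reissue the request as soon as each one returns, whether it carried messages or timed out empty. A minimal sketch of that loop follows. The `fetch` callable is an assumption standing in for an HTTP GET against `/poll/{user_id}`, and `max_polls` exists only so the example terminates; a real client would loop until cancelled.

```python
import asyncio
from typing import Awaitable, Callable, List


async def poll_loop(
    fetch: Callable[[], Awaitable[List[dict]]],
    handle: Callable[[dict], None],
    max_polls: int,
    retry_delay: float = 1.0,
) -> None:
    """Repeatedly long-poll, handing each received message to `handle`."""
    for _ in range(max_polls):
        try:
            # Blocks until the server has messages or its timeout expires
            messages = await fetch()
        except Exception:
            # Network or server error: pause briefly so we do not spin
            await asyncio.sleep(retry_delay)
            continue
        for message in messages:
            handle(message)
        # No sleep on success: the next poll is issued immediately


async def demo() -> List[dict]:
    # Scripted responses: a message, an empty timeout, another message
    responses = [[{"text": "hello"}], [], [{"text": "world"}]]
    received: List[dict] = []

    async def fake_fetch() -> List[dict]:
        return responses.pop(0)

    await poll_loop(fake_fetch, received.append, max_polls=3)
    return received


received = asyncio.run(demo())
print(received)
```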

Presence System

Track who’s online in real-time.
from datetime import datetime, timedelta
from typing import Dict, Set, List
import asyncio
import redis.asyncio as redis

class PresenceSystem:
    """
    Track online/offline status across servers.
    Uses Redis with TTL for automatic cleanup.
    """
    
    HEARTBEAT_INTERVAL = 30  # seconds
    PRESENCE_TTL = 60  # seconds (2x heartbeat)
    
    def __init__(self, redis_client: redis.Redis, server_id: str):
        self.redis = redis_client
        self.server_id = server_id
    
    async def set_online(self, user_id: str):
        """Mark user as online"""
        key = f"presence:{user_id}"
        await self.redis.hset(key, mapping={
            "server_id": self.server_id,
            "status": "online",
            "last_seen": datetime.utcnow().isoformat()
        })
        await self.redis.expire(key, self.PRESENCE_TTL)
        
        # Add to online set
        await self.redis.sadd("online_users", user_id)
    
    async def heartbeat(self, user_id: str):
        """Refresh presence TTL"""
        key = f"presence:{user_id}"
        
        # Update last_seen and extend TTL
        await self.redis.hset(key, "last_seen", datetime.utcnow().isoformat())
        await self.redis.expire(key, self.PRESENCE_TTL)
    
    async def set_offline(self, user_id: str):
        """Mark user as offline"""
        await self.redis.delete(f"presence:{user_id}")
        await self.redis.srem("online_users", user_id)
    
    async def is_online(self, user_id: str) -> bool:
        """Check if user is online"""
        # EXISTS returns an integer count; convert to match the bool annotation
        return bool(await self.redis.exists(f"presence:{user_id}"))
    
    async def get_status(self, user_id: str) -> Dict:
        """Get user's presence status"""
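        # Assumes the client was created with decode_responses=True;
        # otherwise hgetall returns bytes keys and the lookups below miss
        data = await self.redis.hgetall(f"presence:{user_id}")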
        
        if not data:
            return {"status": "offline"}
        
        return {
            "status": data.get("status", "offline"),
            "server_id": data.get("server_id"),
            "last_seen": data.get("last_seen")
        }
    
    async def get_online_users(self) -> Set[str]:
        """Get all online users"""
        return await self.redis.smembers("online_users")
    
    async def get_online_count(self) -> int:
        """Get count of online users"""
        return await self.redis.scard("online_users")
    
    async def get_friends_online(self, user_id: str, friend_ids: List[str]) -> List[str]:
        """Get which friends are online"""
        online_friends = []
        
        # Use pipeline for efficiency
        async with self.redis.pipeline() as pipe:
            for friend_id in friend_ids:
                pipe.exists(f"presence:{friend_id}")
            results = await pipe.execute()
        
        for friend_id, is_online in zip(friend_ids, results):
            if is_online:
                online_friends.append(friend_id)
        
        return online_friends
    
    async def cleanup_stale(self):
        """Remove stale presence entries (run periodically)"""
        online_users = await self.redis.smembers("online_users")
        
        for user_id in online_users:
            if not await self.redis.exists(f"presence:{user_id}"):
                await self.redis.srem("online_users", user_id)

Typing Indicators

Show when someone is typing.
from typing import List
import redis.asyncio as redis

class TypingIndicator:
    """
    Track and broadcast typing indicators.
    Uses short TTLs to auto-expire.
    """
    
    TYPING_TTL = 3  # seconds
    
    def __init__(self, redis_client: redis.Redis, broadcast_fn):
        self.redis = redis_client
        self.broadcast = broadcast_fn
    
    async def set_typing(self, user_id: str, room_id: str):
        """Mark user as typing in a room"""
        key = f"typing:{room_id}:{user_id}"
        
        # Only broadcast if not already typing
        is_new = not await self.redis.exists(key)
        
        await self.redis.setex(key, self.TYPING_TTL, "1")
        
        if is_new:
            await self.broadcast(room_id, {
                "type": "typing_start",
                "user_id": user_id,
                "room_id": room_id
            })
    
    async def stop_typing(self, user_id: str, room_id: str):
        """Mark user as stopped typing"""
        key = f"typing:{room_id}:{user_id}"
        
        if await self.redis.delete(key):
            await self.broadcast(room_id, {
                "type": "typing_stop",
                "user_id": user_id,
                "room_id": room_id
            })
    
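    async def get_typing_users(self, room_id: str) -> List[str]:
        """Get all users currently typing in a room"""
        pattern = f"typing:{room_id}:*"
        users = []
        
        # SCAN iterates incrementally; KEYS blocks Redis on large keyspaces
        async for key in self.redis.scan_iter(match=pattern):
            # Keys are bytes unless the client uses decode_responses=True
            if isinstance(key, bytes):
                key = key.decode()
            users.append(key.split(":")[-1])
        
        return users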

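Because the typing key expires after `TYPING_TTL` seconds, the client has to keep refreshing it while the user types, but sending an event on every keystroke would flood the server. One possible client-side approach, sketched below, is a per-room throttle. `TypingThrottle` and its 2-second interval are assumptions chosen to sit just under the 3-second TTL.

```python
import time
from typing import Callable, Dict


class TypingThrottle:
    """Allow at most one typing event per room every `min_interval` seconds."""

    def __init__(self, min_interval: float = 2.0, clock: Callable[[], float] = time.monotonic):
        self.min_interval = min_interval
        self.clock = clock
        self.last_sent: Dict[str, float] = {}  # room_id -> time of last event

    def should_send(self, room_id: str) -> bool:
        now = self.clock()
        last = self.last_sent.get(room_id)
        if last is not None and now - last < self.min_interval:
            return False  # too soon, the server-side key is still fresh
        self.last_sent[room_id] = now
        return True


# Simulated keystrokes at t=0.0, 1.0 and 2.5 seconds
now = [0.0]
throttle = TypingThrottle(min_interval=2.0, clock=lambda: now[0])
decisions = []
for t in (0.0, 1.0, 2.5):
    now[0] = t
    decisions.append(throttle.should_send("room-1"))
print(decisions)
```
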
Interview Tips

What interviewers expect:
  1. Know when to use what: SSE for notifications, WebSockets for chat
  2. Scaling awareness: “Sticky sessions + Redis pub/sub for horizontal scaling”
  3. Failure handling: “Client reconnects with exponential backoff”
  4. Presence complexity: “TTL-based presence with heartbeats”
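
For the failure-handling point, it helps to be able to write down what "exponential backoff" means. A minimal sketch follows, using the common full-jitter variant (a random delay between zero and an exponentially growing, capped ceiling). The function name, the 1-second base, and the 30-second cap are illustrative assumptions; the `rng` parameter exists only to make the behaviour testable.

```python
import random
from typing import Callable


def backoff_delay(
    attempt: int,
    base: float = 1.0,
    cap: float = 30.0,
    rng: Callable[[], float] = random.random,
) -> float:
    """Delay before reconnect attempt number `attempt` (0-based)."""
    # The ceiling doubles with each attempt but never exceeds the cap
    ceiling = min(cap, base * (2 ** attempt))
    # Jitter spreads out clients so they do not all reconnect at once
    return rng() * ceiling


# Upper bounds of the delay for the first seven attempts
ceilings = [backoff_delay(a, rng=lambda: 1.0) for a in range(7)]
print(ceilings)
```

Without jitter, every client that lost its connection in the same outage would retry at the same instants and hit the recovering server in synchronized waves.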

Common Questions

Q: How do you scale WebSocket servers?
Use sticky sessions at the load balancer (same client → same server) so each connection stays on one server, and use Redis Pub/Sub to broadcast messages across servers.

Q: How do you handle disconnects?
Implement a heartbeat (ping/pong every 30s). The server detects dead connections when a ping times out; the client reconnects with exponential backoff.

Q: When would you use SSE over WebSockets?
Use SSE when data flows only from server to client (notifications, live updates, streaming): it is simpler, HTTP-based, and reconnects automatically. Use WebSockets when you need bidirectional communication or lower latency.