Real-Time Communication Overview
┌─────────────────────────────────────────────────────────────────┐
│ REAL-TIME COMMUNICATION METHODS │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 1. HTTP Polling │
│ Client → Server (every N seconds) │
│ Simple but wasteful │
│ │
│ 2. Long Polling │
│ Client → Server (waits for response) │
│ Better, still overhead per request │
│ │
│ 3. Server-Sent Events (SSE) │
│ Server → Client (one-way stream) │
│ Simple, HTTP-based, auto-reconnect │
│ │
│ 4. WebSockets │
│ Client ↔ Server (bidirectional) │
│ Full duplex, low latency, complex │
│ │
│ 5. WebRTC │
│ Client ↔ Client (peer-to-peer) │
│ Audio/video, lowest latency │
│ │
└─────────────────────────────────────────────────────────────────┘
Comparison Matrix
| Method | Direction | Latency | Complexity | Use Case |
|---|---|---|---|---|
| Polling | Client→Server | High | Low | Legacy, simple updates |
| Long Polling | Client→Server | Medium | Medium | Chat (fallback) |
| SSE | Server→Client | Low | Low | Notifications, feeds |
| WebSocket | Bidirectional | Very Low | High | Chat, gaming, trading |
| WebRTC | P2P | Lowest | Very High | Video calls, streaming |
WebSockets Deep Dive
WebSocket Lifecycle
┌─────────────────────────────────────────────────────────────────┐
│ WEBSOCKET LIFECYCLE │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 1. HTTP Upgrade Request │
│ GET /chat HTTP/1.1 │
│ Upgrade: websocket │
│ Connection: Upgrade │
│ Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ== │
│ Sec-WebSocket-Version: 13 │
│ │
│ 2. Server Accepts │
│ HTTP/1.1 101 Switching Protocols │
│ Upgrade: websocket │
│ Connection: Upgrade │
│ Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo= │
│ │
│ 3. WebSocket Connection Established │
│ ┌────────────────────────────────────────────────────────┐ │
│ │ Client ◄══════════════════════════════════════► Server │ │
│ │ Full duplex binary/text frames │ │
│ └────────────────────────────────────────────────────────┘ │
│ │
│ 4. Either side can close │
│ Client sends Close frame → Server acknowledges → Done │
│ │
└─────────────────────────────────────────────────────────────────┘
Scalable WebSocket Architecture
┌─────────────────────────────────────────────────────────────────┐
│ SCALABLE WEBSOCKET ARCHITECTURE │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Clients │
│ ┌───┐ ┌───┐ ┌───┐ │
│ │ C │ │ C │ │ C │ │
│ └─┬─┘ └─┬─┘ └─┬─┘ │
│ │ │ │ │
│ ┌─┴─────┴─────┴─┐ │
│ │ Load Balancer │ (sticky sessions) │
│ └───────┬───────┘ │
│ ┌────────────────┼────────────────┐ │
│ │ │ │ │
│ ┌────▼────┐ ┌────▼────┐ ┌────▼────┐ │
│ │ WS │ │ WS │ │ WS │ │
│ │ Server 1│ │ Server 2│ │ Server 3│ │
│ └────┬────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │
│ └────────────────┼────────────────┘ │
│ │ │
│ ┌───────▼───────┐ │
│ │ Message Broker│ Redis Pub/Sub │
│ │ (Pub/Sub) │ or Kafka │
│ └───────────────┘ │
│ │
│ Challenge: User A on Server 1 messages User B on Server 2 │
│ Solution: Pub/Sub to broadcast across servers │
│ │
└─────────────────────────────────────────────────────────────────┘
Production WebSocket Server
The same server is shown in two implementations below: first Python (FastAPI), then JavaScript (Node.js).
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from typing import Dict, Set, Optional
from dataclasses import dataclass, field
from datetime import datetime
import asyncio
import json
import redis.asyncio as redis
import logging
# Module-wide logger and the ASGI application that the routes below attach to.
logger = logging.getLogger(__name__)
app = FastAPI()


@dataclass
class Connection:
    """State for one WebSocket accepted by this server process.

    ``ConnectionManager.connect`` creates it from the accepted socket and the
    user id; ``join_room``/``leave_room`` keep ``rooms`` up to date.
    """

    websocket: WebSocket  # the accepted FastAPI WebSocket
    user_id: str  # id of the user owning this socket
    # Naive UTC timestamp taken when the instance is created.
    connected_at: datetime = field(default_factory=datetime.utcnow)
    # Ids of the rooms this connection has joined.
    rooms: Set[str] = field(default_factory=set)
class ConnectionManager:
    """
    Manages WebSocket connections with Redis Pub/Sub for scaling.

    Local state (``connections`` and ``rooms``) only covers sockets accepted by
    this process. Messages for users and rooms are delivered locally and also
    published on the Redis channels ``ws:user:<user_id>`` / ``ws:room:<room_id>``
    so other servers can deliver them to their own sockets.

    Every publication is wrapped in an envelope ``{"origin": server_id,
    "payload": message}``. This server is subscribed to ``ws:*`` as well, so the
    listener drops envelopes whose origin is its own id. Those messages were
    already delivered locally, and delivering the echo would duplicate them.
    """

    _ROOM_PREFIX = "ws:room:"
    _USER_PREFIX = "ws:user:"

    def __init__(self, server_id: Optional[str] = None):
        """
        Args:
            server_id: Identifier of this process. It is stored in presence
                records and used to ignore our own Pub/Sub echoes. Defaults to
                a random UUID hex string.
        """
        import uuid

        self.connections: Dict[str, Connection] = {}  # user_id -> Connection
        self.rooms: Dict[str, Set[str]] = {}  # room_id -> {user_ids}
        self.redis: Optional[redis.Redis] = None
        self.pubsub = None
        self.server_id: str = server_id or uuid.uuid4().hex
        # Strong reference to the listener task: the event loop only keeps a
        # weak reference, so an unreferenced task may be garbage-collected.
        self._listener_task: Optional[asyncio.Task] = None

    async def init_redis(self, redis_url: str):
        """Connect to Redis, subscribe to ``ws:*`` and start the listener task."""
        self.redis = await redis.from_url(redis_url)
        self.pubsub = self.redis.pubsub()
        # Subscribe before returning, so nothing published right after startup
        # is missed while the listener task is still being scheduled.
        await self.pubsub.psubscribe("ws:*")
        self._listener_task = asyncio.create_task(self._listen_to_redis())

    async def _listen_to_redis(self):
        """Relay messages published by other servers to local sockets.

        An error while handling one message is logged and skipped. Letting it
        propagate would end this loop and silently stop all cross-server
        delivery.
        """
        async for message in self.pubsub.listen():
            if message["type"] != "pmessage":
                continue
            try:
                await self._handle_redis_message(message["channel"], message["data"])
            except Exception:
                logger.exception("Failed to handle Redis message on %r", message["channel"])

    async def _handle_redis_message(self, channel, raw):
        """Decode one Pub/Sub message and deliver it to matching local sockets."""
        if isinstance(channel, bytes):
            channel = channel.decode()
        envelope = json.loads(raw)
        if isinstance(envelope, dict) and "origin" in envelope and "payload" in envelope:
            if envelope["origin"] == self.server_id:
                return  # our own publication; already delivered locally
            data = envelope["payload"]
        else:
            # Bare payload (e.g. from a server running the un-enveloped format).
            data = envelope
        # Slice off the prefix rather than split(":"), so ids containing ':' survive.
        if channel.startswith(self._ROOM_PREFIX):
            await self._broadcast_to_local_room(channel[len(self._ROOM_PREFIX):], data)
        elif channel.startswith(self._USER_PREFIX):
            await self._send_to_local_user(channel[len(self._USER_PREFIX):], data)

    async def _publish(self, channel: str, message: dict):
        """Publish *message* for other servers, tagged with this server's id."""
        if self.redis:
            await self.redis.publish(
                channel,
                json.dumps({"origin": self.server_id, "payload": message}),
            )

    async def connect(self, websocket: WebSocket, user_id: str) -> Connection:
        """Accept the WebSocket, register it locally and record presence in Redis."""
        await websocket.accept()
        conn = Connection(websocket=websocket, user_id=user_id)
        if user_id in self.connections:
            logger.warning("User %s reconnected; replacing previous local connection", user_id)
        self.connections[user_id] = conn
        # Track connection in Redis for presence
        if self.redis:
            await self.redis.sadd("online_users", user_id)
            await self.redis.hset(
                f"user:{user_id}:presence",
                mapping={
                    "server_id": self.server_id,
                    "connected_at": conn.connected_at.isoformat(),
                },
            )
        logger.info("User %s connected. Total: %d", user_id, len(self.connections))
        return conn

    async def disconnect(self, user_id: str):
        """Forget *user_id*'s local connection, leave its rooms and clear presence."""
        if user_id not in self.connections:
            return
        conn = self.connections.pop(user_id)
        # Iterate over a copy: leave_room mutates the membership sets.
        for room_id in conn.rooms.copy():
            await self.leave_room(user_id, room_id)
        if self.redis:
            await self.redis.srem("online_users", user_id)
            await self.redis.delete(f"user:{user_id}:presence")
        logger.info("User %s disconnected. Total: %d", user_id, len(self.connections))

    async def join_room(self, user_id: str, room_id: str):
        """Add a locally connected user to a room and announce it to the room."""
        conn = self.connections.get(user_id)
        if conn is None or room_id in conn.rooms:
            return  # unknown user, or already a member: nothing to announce
        self.rooms.setdefault(room_id, set()).add(user_id)
        conn.rooms.add(room_id)
        await self.broadcast_to_room(room_id, {
            "type": "user_joined",
            "user_id": user_id,
            "room_id": room_id,
        })

    async def leave_room(self, user_id: str, room_id: str):
        """Remove a user from a room and announce it; no-op for non-members."""
        members = self.rooms.get(room_id)
        if members is None or user_id not in members:
            return  # avoid broadcasting a "user_left" that never happened
        members.discard(user_id)
        if not members:
            del self.rooms[room_id]
        if user_id in self.connections:
            self.connections[user_id].rooms.discard(room_id)
        await self.broadcast_to_room(room_id, {
            "type": "user_left",
            "user_id": user_id,
            "room_id": room_id,
        })

    async def send_to_user(self, user_id: str, message: dict):
        """Send to a user on this server and, via Redis, on any other server."""
        if user_id in self.connections:
            await self._send_to_local_user(user_id, message)
        await self._publish(f"{self._USER_PREFIX}{user_id}", message)

    async def _send_to_local_user(self, user_id: str, message: dict):
        """Send to user on this server only; a failed send drops the connection."""
        if user_id in self.connections:
            try:
                await self.connections[user_id].websocket.send_json(message)
            except Exception:
                logger.exception("Failed to send to %s", user_id)
                await self.disconnect(user_id)

    async def broadcast_to_room(self, room_id: str, message: dict):
        """Broadcast to room members on this server and, via Redis, on others."""
        await self._broadcast_to_local_room(room_id, message)
        await self._publish(f"{self._ROOM_PREFIX}{room_id}", message)

    async def _broadcast_to_local_room(self, room_id: str, message: dict):
        """Send concurrently to this server's members of *room_id*, logging failures."""
        user_ids = [uid for uid in self.rooms.get(room_id, ()) if uid in self.connections]
        if not user_ids:
            return
        results = await asyncio.gather(
            *(self.connections[uid].websocket.send_json(message) for uid in user_ids),
            return_exceptions=True,
        )
        for uid, result in zip(user_ids, results):
            if isinstance(result, Exception):
                logger.warning("Failed to send to %s in room %s: %s", uid, room_id, result)

    async def get_online_users(self) -> Set[str]:
        """Get all online users across all servers (local users if Redis is off)."""
        if self.redis:
            members = await self.redis.smembers("online_users")
            # Redis returns bytes unless the client was created with decode_responses.
            return {m.decode() if isinstance(m, bytes) else m for m in members}
        return set(self.connections.keys())
# Global connection manager
manager = ConnectionManager()


@app.on_event("startup")
async def startup():
    """Connect the global manager to Redis when the application starts.

    The URL is read from the ``REDIS_URL`` environment variable. It falls back
    to the previous hard-coded local default, so existing setups keep working.
    """
    import os

    await manager.init_redis(os.environ.get("REDIS_URL", "redis://localhost:6379"))
@app.websocket("/ws/{user_id}")
async def websocket_endpoint(websocket: WebSocket, user_id: str):
    """WebSocket endpoint: register the socket, then dispatch each JSON message.

    ``manager.disconnect`` runs in ``finally``. This releases presence and room
    membership on a client close, on an unexpected error, and on task
    cancellation (e.g. server shutdown). ``CancelledError`` is not an
    ``Exception`` subclass, so an ``except Exception`` handler would miss it.
    """
    await manager.connect(websocket, user_id)
    try:
        while True:
            data = await websocket.receive_json()
            await handle_message(user_id, data)
    except WebSocketDisconnect:
        pass  # normal close by the client
    except Exception:
        # logger.exception keeps the traceback for debugging.
        logger.exception("WebSocket error for %s", user_id)
    finally:
        await manager.disconnect(user_id)
async def handle_message(user_id: str, data: dict):
    """Handle one incoming WebSocket message from *user_id*.

    Supported ``type`` values:

    - ``join_room`` / ``leave_room``: need ``room_id``.
    - ``room_message``: needs ``room_id`` and ``content``; the sender must have
      joined the room.
    - ``direct_message``: needs ``to_user`` and ``content``.
    - ``ping``: no fields.

    Other types are ignored.

    Invalid input gets an ``{"type": "error", "error": ...}`` reply instead of
    raising. An exception here propagates to ``websocket_endpoint`` and would
    drop the whole connection.
    """

    async def reply(message: dict):
        # Deliver straight to the sender's socket on this server. send_to_user
        # would also publish the reply to Redis for other servers.
        await manager._send_to_local_user(user_id, message)

    if not isinstance(data, dict):
        await reply({"type": "error", "error": "message must be a JSON object"})
        return

    message_type = data.get("type")
    # Required fields per type. Ids must be strings because they become
    # dict/set keys and Redis channel names; content may be any JSON value.
    required_by_type = {
        "join_room": ("room_id",),
        "leave_room": ("room_id",),
        "room_message": ("room_id", "content"),
        "direct_message": ("to_user", "content"),
    }
    required = required_by_type.get(message_type, ()) if isinstance(message_type, str) else ()
    for key in required:
        if key not in data:
            await reply({"type": "error", "error": f"missing field: {key}"})
            return
        if key != "content" and not isinstance(data[key], str):
            await reply({"type": "error", "error": f"{key} must be a string"})
            return

    if message_type == "join_room":
        await manager.join_room(user_id, data["room_id"])
    elif message_type == "leave_room":
        await manager.leave_room(user_id, data["room_id"])
    elif message_type == "room_message":
        conn = manager.connections.get(user_id)
        if conn is None or data["room_id"] not in conn.rooms:
            await reply({"type": "error", "error": "join the room before sending to it"})
            return
        await manager.broadcast_to_room(data["room_id"], {
            "type": "message",
            "from": user_id,
            "content": data["content"],
            "timestamp": datetime.utcnow().isoformat()
        })
    elif message_type == "direct_message":
        await manager.send_to_user(data["to_user"], {
            "type": "direct_message",
            "from": user_id,
            "content": data["content"],
            "timestamp": datetime.utcnow().isoformat()
        })
    elif message_type == "ping":
        await reply({"type": "pong"})
JavaScript (Node.js) implementation:
const WebSocket = require('ws');
const Redis = require('ioredis');
const http = require('http');
class WebSocketServer {
  /**
   * Horizontally scalable WebSocket server backed by Redis Pub/Sub.
   *
   * Local state (`connections`, `rooms`) only covers sockets attached to this
   * process. Room broadcasts, and messages for users that are not connected
   * here, are published to `ws:room:<id>` / `ws:user:<id>`. Each publication
   * is wrapped in an envelope tagged with `serverId`. Every instance is
   * subscribed to `ws:*` itself, so it drops envelopes it published. Those
   * were already delivered locally and would otherwise arrive twice.
   *
   * @param {object} [options]
   * @param {number} [options.port=8080] port for the HTTP/WebSocket server
   * @param {string} [options.redisUrl='redis://localhost:6379'] Redis URL
   * @param {string} [options.serverId] instance id; defaults to
   *   process.env.SERVER_ID, then a random UUID
   */
  constructor(options = {}) {
    this.port = options.port || 8080;
    this.redisUrl = options.redisUrl || 'redis://localhost:6379';
    this.serverId =
      options.serverId || process.env.SERVER_ID || require('crypto').randomUUID();

    // Connection tracking (this process only)
    this.connections = new Map(); // userId -> ws
    this.rooms = new Map(); // roomId -> Set<userId>

    // Separate Redis clients: a connection in subscriber mode cannot run
    // regular commands.
    this.redisPub = new Redis(this.redisUrl);
    this.redisSub = new Redis(this.redisUrl);
    this.redisClient = new Redis(this.redisUrl);

    this.heartbeatTimer = null;
  }

  /** Create the HTTP/WebSocket servers, subscribe to Redis and start listening. */
  async start() {
    this.server = http.createServer();
    this.wss = new WebSocket.Server({ server: this.server });

    // handleConnection is async; report its failures instead of leaving an
    // unhandled promise rejection.
    this.wss.on('connection', (ws, req) => {
      this.handleConnection(ws, req).catch((error) => {
        console.error('Connection setup failed:', error);
      });
    });

    await this.setupRedisSubscription();

    this.server.listen(this.port, () => {
      console.log(`WebSocket server running on port ${this.port}`);
    });
  }

  /** Register a new socket identified by the `userId` query parameter. */
  async handleConnection(ws, req) {
    const url = new URL(req.url, `http://localhost`);
    const userId = url.searchParams.get('userId');
    if (!userId) {
      ws.close(4001, 'User ID required');
      return;
    }

    this.connections.set(userId, ws);

    // Attach every listener before the first await. Events emitted while
    // presence is being written (a message, or an early close) are not
    // buffered and would otherwise be lost.
    ws.isAlive = true;
    ws.on('pong', () => { ws.isAlive = true; });

    ws.on('message', async (data) => {
      let message;
      try {
        message = JSON.parse(data);
      } catch (error) {
        console.error('Invalid message:', error);
        return;
      }
      try {
        await this.handleMessage(userId, message);
      } catch (error) {
        console.error(`Failed to handle message from ${userId}:`, error);
      }
    });

    ws.on('close', async () => {
      // A newer socket for the same user may have replaced this one; only the
      // socket that is still registered tears down the user's state.
      if (this.connections.get(userId) !== ws) return;
      this.connections.delete(userId);
      try {
        await this.updatePresence(userId, 'offline');
      } catch (error) {
        console.error(`Failed to clear presence for ${userId}:`, error);
      }
      try {
        await this.leaveAllRooms(userId);
      } catch (error) {
        console.error(`Failed to leave rooms for ${userId}:`, error);
      }
      console.log(`User ${userId} disconnected. Total: ${this.connections.size}`);
    });

    await this.updatePresence(userId, 'online');
    console.log(`User ${userId} connected. Total: ${this.connections.size}`);
  }

  /** Dispatch one parsed client message; invalid input gets an error reply. */
  async handleMessage(userId, message) {
    const replyError = (error) => this.localSendToUser(userId, { type: 'error', error });

    if (message === null || typeof message !== 'object') {
      replyError('message must be a JSON object');
      return;
    }
    const { type, ...payload } = message;

    const needsRoom = type === 'join_room' || type === 'leave_room' || type === 'room_message';
    if (needsRoom && typeof payload.roomId !== 'string') {
      replyError('roomId must be a string');
      return;
    }

    switch (type) {
      case 'join_room':
        await this.joinRoom(userId, payload.roomId);
        break;
      case 'leave_room':
        await this.leaveRoom(userId, payload.roomId);
        break;
      case 'room_message': {
        // Only members may post into a room.
        const members = this.rooms.get(payload.roomId);
        if (!members || !members.has(userId)) {
          replyError('join the room before sending to it');
          break;
        }
        await this.broadcastToRoom(payload.roomId, {
          type: 'message',
          from: userId,
          content: payload.content,
          timestamp: new Date().toISOString()
        });
        break;
      }
      case 'direct_message':
        if (typeof payload.toUser !== 'string') {
          replyError('toUser must be a string');
          break;
        }
        await this.sendToUser(payload.toUser, {
          type: 'direct_message',
          from: userId,
          content: payload.content,
          timestamp: new Date().toISOString()
        });
        break;
      case 'ping':
        // The sender is connected here; no need to involve Redis.
        this.localSendToUser(userId, { type: 'pong' });
        break;
      default:
        // Unknown types are ignored.
        break;
    }
  }

  /** Add a user to a room (locally and in Redis) and announce it. */
  async joinRoom(userId, roomId) {
    if (!this.rooms.has(roomId)) {
      this.rooms.set(roomId, new Set());
    }
    this.rooms.get(roomId).add(userId);
    await this.redisClient.sadd(`room:${roomId}:users`, userId);
    await this.broadcastToRoom(roomId, {
      type: 'user_joined',
      userId,
      roomId
    });
  }

  /** Remove a user from a room (locally and in Redis) and announce it. */
  async leaveRoom(userId, roomId) {
    if (this.rooms.has(roomId)) {
      this.rooms.get(roomId).delete(userId);
      if (this.rooms.get(roomId).size === 0) {
        this.rooms.delete(roomId);
      }
    }
    await this.redisClient.srem(`room:${roomId}:users`, userId);
    await this.broadcastToRoom(roomId, {
      type: 'user_left',
      userId,
      roomId
    });
  }

  /** Leave every local room the user belongs to. */
  async leaveAllRooms(userId) {
    // Snapshot first: leaveRoom mutates this.rooms across awaits.
    const memberOf = [...this.rooms]
      .filter(([, users]) => users.has(userId))
      .map(([roomId]) => roomId);
    for (const roomId of memberOf) {
      await this.leaveRoom(userId, roomId);
    }
  }

  /** Send to a local user directly, otherwise publish for other servers. */
  async sendToUser(userId, message) {
    const ws = this.connections.get(userId);
    if (ws && ws.readyState === WebSocket.OPEN) {
      ws.send(JSON.stringify(message));
      return;
    }
    await this.publish(`ws:user:${userId}`, message);
  }

  /** Deliver to local room members, then publish for other servers. */
  async broadcastToRoom(roomId, message) {
    this.localBroadcastToRoom(roomId, message);
    await this.publish(`ws:room:${roomId}`, message);
  }

  /** Publish `message` on `channel`, tagged with this instance's id. */
  async publish(channel, message) {
    await this.redisPub.publish(
      channel,
      JSON.stringify({ origin: this.serverId, payload: message })
    );
  }

  /** Subscribe to `ws:*` and relay other servers' publications to local sockets. */
  async setupRedisSubscription() {
    const roomPrefix = 'ws:room:';
    const userPrefix = 'ws:user:';

    this.redisSub.on('pmessage', (pattern, channel, data) => {
      let envelope;
      try {
        envelope = JSON.parse(data);
      } catch (error) {
        console.error(`Dropping malformed Redis message on ${channel}:`, error);
        return;
      }
      const isEnvelope =
        envelope !== null && typeof envelope === 'object' &&
        'origin' in envelope && 'payload' in envelope;
      // Our own publication: it was already delivered locally.
      if (isEnvelope && envelope.origin === this.serverId) return;
      const message = isEnvelope ? envelope.payload : envelope;

      // Slice off the prefix instead of split(':') so ids containing ':' survive.
      if (channel.startsWith(roomPrefix)) {
        this.localBroadcastToRoom(channel.slice(roomPrefix.length), message);
      } else if (channel.startsWith(userPrefix)) {
        this.localSendToUser(channel.slice(userPrefix.length), message);
      }
    });

    await this.redisSub.psubscribe('ws:*');
  }

  /** Send to a user only if their socket is open on this server. */
  localSendToUser(userId, message) {
    const ws = this.connections.get(userId);
    if (ws && ws.readyState === WebSocket.OPEN) {
      ws.send(JSON.stringify(message));
    }
  }

  /** Send to every member of `roomId` connected to this server. */
  localBroadcastToRoom(roomId, message) {
    const users = this.rooms.get(roomId) || new Set();
    for (const userId of users) {
      this.localSendToUser(userId, message);
    }
  }

  /** Record (`'online'`) or clear (any other status) a user's presence in Redis. */
  async updatePresence(userId, status) {
    if (status === 'online') {
      await this.redisClient.sadd('online_users', userId);
      await this.redisClient.hset(`user:${userId}:presence`, {
        status: 'online',
        server: this.serverId,
        timestamp: Date.now()
      });
    } else {
      await this.redisClient.srem('online_users', userId);
      await this.redisClient.del(`user:${userId}:presence`);
    }
  }

  /**
   * Ping every client each `intervalMs`; terminate sockets that did not answer
   * the previous ping. Call after start(). The timer is cleared when the
   * WebSocket server closes.
   */
  startHeartbeat(intervalMs = 30000) {
    if (this.heartbeatTimer) return; // already running
    this.heartbeatTimer = setInterval(() => {
      this.wss.clients.forEach((ws) => {
        if (ws.isAlive === false) {
          ws.terminate();
          return;
        }
        ws.isAlive = false;
        ws.ping();
      });
    }, intervalMs);
    this.wss.on('close', () => {
      clearInterval(this.heartbeatTimer);
      this.heartbeatTimer = null;
    });
  }
}
// Usage: start the heartbeat only once the server is up, and report startup
// failures instead of leaving an unhandled promise rejection.
const server = new WebSocketServer({ port: 8080 });
server
  .start()
  .then(() => server.startHeartbeat())
  .catch((error) => {
    console.error('Failed to start WebSocket server:', error);
    process.exit(1);
  });
Server-Sent Events (SSE)
Simpler than WebSockets for server-to-client streaming.
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from typing import AsyncGenerator
import asyncio
import json
# ASGI application serving the SSE example routes.
app = FastAPI()
class EventBroker:
    """Simple in-memory event broker for SSE.

    Keeps one ``asyncio.Queue`` per subscribed user id. A new subscription for
    the same user replaces the previous queue. The state lives in this process
    only.
    """

    def __init__(self):
        # user_id -> queue of pending events (builtin generic: the snippet does
        # not import typing.Dict).
        self.subscribers: dict[str, asyncio.Queue] = {}

    async def subscribe(self, user_id: str) -> asyncio.Queue:
        """Register and return a fresh queue for *user_id*."""
        queue = asyncio.Queue()
        self.subscribers[user_id] = queue
        return queue

    def unsubscribe(self, user_id: str, queue: "asyncio.Queue | None" = None):
        """Remove *user_id*'s queue.

        If *queue* is given, the entry is removed only while it is still that
        queue. A stream superseded by a newer subscription for the same user
        then cannot unregister the newer one.
        """
        if queue is not None and self.subscribers.get(user_id) is not queue:
            return
        self.subscribers.pop(user_id, None)

    async def publish(self, user_id: str, event: dict):
        """Queue *event* for *user_id*; dropped if the user is not subscribed."""
        queue = self.subscribers.get(user_id)
        if queue is not None:
            await queue.put(event)

    async def broadcast(self, event: dict):
        """Queue *event* for every current subscriber."""
        # Snapshot: subscriptions may change while we await.
        for queue in list(self.subscribers.values()):
            await queue.put(event)


broker = EventBroker()
def format_sse_event(event: dict) -> str:
    """Serialize *event* as one SSE frame, using its ``type`` as the event name.

    SSE fields are line-based, so a CR or LF inside the event name would end
    the ``event:`` field and let the rest be parsed as extra fields. The name
    can come from a client request body (``send_notification`` spreads it into
    the event). A name containing either character falls back to ``message``.
    ``json.dumps`` already escapes newlines inside ``data``.
    """
    event_type = str(event.get("type", "message"))
    if "\n" in event_type or "\r" in event_type:
        event_type = "message"
    return f"event: {event_type}\ndata: {json.dumps(event)}\n\n"


async def event_stream(user_id: str) -> AsyncGenerator[str, None]:
    """Generate SSE stream for a user.

    Emits a ``connected`` event first, then one frame per event published to
    the user's queue. After 30 s without events it sends a ``: keepalive``
    comment so intermediaries keep the connection open.
    """
    queue = await broker.subscribe(user_id)
    try:
        yield f"event: connected\ndata: {json.dumps({'user_id': user_id})}\n\n"
        while True:
            try:
                event = await asyncio.wait_for(queue.get(), timeout=30)
            except asyncio.TimeoutError:
                yield ": keepalive\n\n"
                continue
            yield format_sse_event(event)
    finally:
        # Only unregister while our queue is still the registered one. A newer
        # stream for the same user may have replaced it, and must keep working.
        if broker.subscribers.get(user_id) is queue:
            broker.unsubscribe(user_id)
@app.get("/events/{user_id}")
async def sse_endpoint(user_id: str):
    """Open a ``text/event-stream`` response relaying *user_id*'s events."""
    streaming_headers = {
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "X-Accel-Buffering": "no",  # ask nginx not to buffer the stream
    }
    stream = event_stream(user_id)
    return StreamingResponse(
        stream,
        media_type="text/event-stream",
        headers=streaming_headers,
    )


@app.post("/notify/{user_id}")
async def send_notification(user_id: str, message: dict):
    """Queue a notification event for *user_id*.

    The event starts as ``{"type": "notification"}`` and is then updated with
    the request body, so a ``type`` key in the body overrides the default.
    """
    event = {"type": "notification"}
    event.update(message)
    await broker.publish(user_id, event)
    return {"status": "sent"}
Long Polling
Fallback for when WebSockets or SSE aren’t available.
from fastapi import FastAPI
from typing import Optional
import asyncio
# ASGI application serving the long-polling example routes.
app = FastAPI()
class LongPollManager:
    """In-memory long-poll mailbox.

    Each user has at most one registered waiting poll (``pending``) and a
    backlog of messages that arrived while nobody was waiting (``messages``).
    """

    def __init__(self):
        # user_id -> future resolved with the messages for the waiting poll
        self.pending: dict[str, asyncio.Future] = {}
        # user_id -> messages queued while no poll was waiting
        self.messages: dict[str, list] = {}

    async def wait_for_messages(
        self,
        user_id: str,
        timeout: float = 30
    ) -> list:
        """Wait for new messages with timeout.

        Returns the queued backlog immediately if there is one. Otherwise it
        waits up to *timeout* seconds for ``send_message`` and returns ``[]``
        on timeout.
        """
        backlog = self.messages.pop(user_id, None)
        if backlog:
            return backlog

        future = asyncio.get_running_loop().create_future()
        self.pending[user_id] = future
        try:
            return await asyncio.wait_for(future, timeout=timeout)
        except asyncio.TimeoutError:
            # The result may have been set just as the timeout fired; return it
            # rather than losing the message.
            if future.done() and not future.cancelled():
                return future.result()
            return []
        finally:
            # A newer poll for the same user may have replaced our future;
            # only remove the entry if it is still ours.
            if self.pending.get(user_id) is future:
                del self.pending[user_id]

    async def send_message(self, user_id: str, message: dict):
        """Send message to user, either immediately or queue"""
        future = self.pending.pop(user_id, None)
        if future is not None and not future.done():
            # User is waiting, resolve immediately
            future.set_result([message])
            return
        # Nobody is waiting, or the waiter already timed out or was cancelled.
        # Keep the message for the next poll instead of dropping it.
        self.messages.setdefault(user_id, []).append(message)


poll_manager = LongPollManager()
@app.get("/poll/{user_id}")
async def long_poll(user_id: str, timeout: float = 30):
    """Long polling endpoint.

    Holds the request until a message for *user_id* arrives or *timeout*
    seconds pass, then returns ``{"messages": [...]}`` (empty on timeout).
    The timeout comes from the client, so it is clamped to ``[0, 60]``
    seconds. Otherwise a caller could hold a server connection open
    indefinitely.
    """
    max_timeout = 60.0
    if not 0.0 <= timeout <= max_timeout:  # also catches NaN
        timeout = max_timeout if timeout > max_timeout else 0.0
    messages = await poll_manager.wait_for_messages(user_id, timeout)
    return {"messages": messages}


@app.post("/send/{user_id}")
async def send_message(user_id: str, message: dict):
    """Deliver *message* to *user_id*'s waiting poll, or queue it for the next one."""
    await poll_manager.send_message(user_id, message)
    return {"status": "sent"}
Presence System
Track who’s online in real time.
from datetime import datetime, timedelta
from typing import Dict, Set, List
import asyncio
import redis.asyncio as redis
class PresenceSystem:
    """
    Track online/offline status across servers.
    Uses Redis with TTL for automatic cleanup.

    Each online user has a hash ``presence:<user_id>`` with the fields
    ``server_id``, ``status`` and ``last_seen``. The hash expires after
    ``PRESENCE_TTL`` seconds unless refreshed by ``heartbeat``. The user is
    also a member of the set ``online_users``. That set has no TTL, so
    ``cleanup_stale`` must run periodically to drop members whose hash has
    expired.

    Replies are decoded when Redis returns ``bytes``, so the class works whether
    or not the client was created with ``decode_responses=True``.
    """

    HEARTBEAT_INTERVAL = 30  # seconds
    PRESENCE_TTL = 60  # seconds (2x heartbeat)

    def __init__(self, redis_client: redis.Redis, server_id: str):
        self.redis = redis_client
        self.server_id = server_id

    @staticmethod
    def _text(value):
        """Return *value* as ``str`` when Redis handed back ``bytes``."""
        return value.decode() if isinstance(value, bytes) else value

    async def set_online(self, user_id: str):
        """Mark user as online: write the presence hash, its TTL and set membership."""
        key = f"presence:{user_id}"
        # One MULTI/EXEC transaction. A crash between HSET and EXPIRE can no
        # longer leave a hash without a TTL, which would show the user online forever.
        async with self.redis.pipeline(transaction=True) as pipe:
            pipe.hset(key, mapping={
                "server_id": self.server_id,
                "status": "online",
                "last_seen": datetime.utcnow().isoformat()
            })
            pipe.expire(key, self.PRESENCE_TTL)
            pipe.sadd("online_users", user_id)
            await pipe.execute()

    async def heartbeat(self, user_id: str):
        """Refresh presence TTL and ``last_seen``.

        This rewrites the whole record. If the hash had already expired,
        updating only ``last_seen`` would recreate it without ``status`` or
        ``server_id``: ``is_online`` would say True while ``get_status`` says
        "offline". It would also not re-add the user to ``online_users``.
        """
        await self.set_online(user_id)

    async def set_offline(self, user_id: str):
        """Mark user as offline (delete the hash and set membership atomically)."""
        async with self.redis.pipeline(transaction=True) as pipe:
            pipe.delete(f"presence:{user_id}")
            pipe.srem("online_users", user_id)
            await pipe.execute()

    async def is_online(self, user_id: str) -> bool:
        """Check if user is online"""
        # EXISTS returns a count of existing keys; normalize it to bool.
        return bool(await self.redis.exists(f"presence:{user_id}"))

    async def get_status(self, user_id: str) -> Dict:
        """Get user's presence status (``{"status": "offline"}`` if no record)."""
        raw = await self.redis.hgetall(f"presence:{user_id}")
        if not raw:
            return {"status": "offline"}
        data = {self._text(k): self._text(v) for k, v in raw.items()}
        return {
            "status": data.get("status", "offline"),
            "server_id": data.get("server_id"),
            "last_seen": data.get("last_seen")
        }

    async def get_online_users(self) -> Set[str]:
        """Get all online users"""
        return {self._text(member) for member in await self.redis.smembers("online_users")}

    async def get_online_count(self) -> int:
        """Get count of online users"""
        return await self.redis.scard("online_users")

    async def get_friends_online(self, user_id: str, friend_ids: List[str]) -> List[str]:
        """Return the subset of *friend_ids* that currently have a presence record.

        *user_id* is not used by this implementation.
        """
        if not friend_ids:
            return []
        # One round trip for all EXISTS checks.
        async with self.redis.pipeline() as pipe:
            for friend_id in friend_ids:
                pipe.exists(f"presence:{friend_id}")
            results = await pipe.execute()
        return [friend_id for friend_id, hit in zip(friend_ids, results) if hit]

    async def cleanup_stale(self):
        """Remove stale presence entries (run periodically).

        A user whose record is written between the check and the SREM may be
        dropped from ``online_users`` until their next heartbeat re-adds them.
        """
        members = list(await self.redis.smembers("online_users"))
        if not members:
            return
        async with self.redis.pipeline() as pipe:
            for member in members:
                # Decode first: f"presence:{b'alice'}" would be "presence:b'alice'".
                pipe.exists(f"presence:{self._text(member)}")
            alive = await pipe.execute()
        stale = [member for member, ok in zip(members, alive) if not ok]
        if stale:
            await self.redis.srem("online_users", *stale)
Typing Indicators
Show when someone is typing.
class TypingIndicator:
    """
    Track and broadcast typing indicators.
    Uses short TTLs to auto-expire.

    The key ``typing:<room_id>:<user_id>`` exists while the user is typing.
    Only ``typing_start`` (on the first keystroke) and an explicit
    ``typing_stop`` are broadcast. A key that just expires after
    ``TYPING_TTL`` seconds broadcasts nothing, so clients presumably time out
    indicators themselves (TODO confirm).
    """

    TYPING_TTL = 3  # seconds

    def __init__(self, redis_client: redis.Redis, broadcast_fn):
        self.redis = redis_client
        # Awaited as broadcast_fn(room_id, message_dict).
        self.broadcast = broadcast_fn

    async def set_typing(self, user_id: str, room_id: str):
        """Mark user as typing in a room; broadcast only when newly typing."""
        key = f"typing:{room_id}:{user_id}"
        # SET NX EX creates the key and reports "new" in one atomic step, so two
        # concurrent keystrokes cannot both broadcast typing_start.
        if await self.redis.set(key, "1", ex=self.TYPING_TTL, nx=True):
            await self.broadcast(room_id, {
                "type": "typing_start",
                "user_id": user_id,
                "room_id": room_id
            })
        else:
            # Already typing: just push the expiry out again.
            await self.redis.setex(key, self.TYPING_TTL, "1")

    async def stop_typing(self, user_id: str, room_id: str):
        """Mark user as stopped typing; broadcast only if they were typing."""
        key = f"typing:{room_id}:{user_id}"
        if await self.redis.delete(key):
            await self.broadcast(room_id, {
                "type": "typing_stop",
                "user_id": user_id,
                "room_id": room_id
            })

    async def get_typing_users(self, room_id: str) -> List[str]:
        """Get all users currently typing in a room"""
        prefix = f"typing:{room_id}:"
        # Escape glob metacharacters so a room id such as "a*" matches only itself.
        pattern = "".join("\\" + ch if ch in "*?[]\\" else ch for ch in prefix) + "*"
        users = []
        # SCAN instead of KEYS: KEYS walks the entire keyspace in one blocking call.
        async for key in self.redis.scan_iter(match=pattern):
            if isinstance(key, bytes):
                key = key.decode()
            # Strip the known prefix; split(":") would break user ids containing ':'.
            users.append(key[len(prefix):])
        # SCAN may return a key more than once; dedupe, keeping first-seen order.
        return list(dict.fromkeys(users))
Interview Tips
What interviewers expect:
- Know when to use what: SSE for notifications, WebSockets for chat
- Scaling awareness: “Sticky sessions + Redis pub/sub for horizontal scaling”
- Failure handling: “Client reconnects with exponential backoff”
- Presence complexity: “TTL-based presence with heartbeats”
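Common Questions
Q: How do you scale WebSocket servers?
A: Use sticky sessions at the load-balancer level (same client → same server), and use Redis Pub/Sub to broadcast messages across servers. An established WebSocket is already a single connection to one server. Stickiness matters mostly for the HTTP upgrade handshake, polling fallbacks and reconnects. Pub/Sub is what lets a user on Server 1 reach a user on Server 2.
Q: How do you handle disconnects?
A: Implement a heartbeat (ping/pong every 30s) so the server detects dead connections via ping timeout, and reconnect on the client side with exponential backoff.
Q: When would you use SSE over WebSockets?
A: Use SSE for server-to-client only traffic (notifications, live updates, streaming): it is simpler, HTTP-based and reconnects automatically. Use WebSockets when you need bidirectional communication or lower latency.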