Chapter 5: Pub/Sub
Redis Pub/Sub enables real-time messaging between clients. Publishers send messages to channels, and all subscribers to those channels receive them instantly. Let’s build it!
Prerequisites: Chapter 2: TCP Server
Further Reading: System Design: Message Queues
Time: 2-3 hours
Outcome: Working publish/subscribe system
Further Reading: System Design: Message Queues
Time: 2-3 hours
Outcome: Working publish/subscribe system
Pub/Sub Architecture
Copy
┌─────────────────────────────────────────────────────────────────────────────┐
│ PUB/SUB MODEL │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ │
│ │ Channel │ │
│ │ "news" │ │
│ └──────────────┘ │
│ │ │
│ ┌────────────────────────┼────────────────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Client 1 │ │ Client 2 │ │ Client 3 │ │
│ │ (sub) │ │ (sub) │ │ (sub) │ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ │
│ ▲ │
│ │ │
│ ┌──────────┐ │
│ │Publisher │ PUBLISH news "Hello!" │
│ └──────────┘ │
│ │
│ FLOW: │
│ 1. Publisher sends: PUBLISH news "Hello!" │
│ 2. Server finds all subscribers to "news" channel │
│ 3. Server pushes message to each subscriber │
│ 4. All 3 clients receive: ["message", "news", "Hello!"] │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Part 1: Pub/Sub Hub
The hub manages channel subscriptions and message routing.
File: internal/pubsub/hub.go
Copy
package pubsub
import (
"sync"
)
// Subscriber represents a client subscribed to channels.
// It holds the client's subscription sets and the queue of messages waiting
// to be delivered to that client.
type Subscriber struct {
// ID is the key under which the hub stores this subscriber.
ID string
// Channels is the set of exact channel names this subscriber has joined.
Channels map[string]bool
// Patterns is the set of glob patterns this subscriber has joined.
Patterns map[string]bool
// Messages carries subscription confirmations and published messages to the
// client. The hub creates it with a buffer of 100 and closes it in
// RemoveSubscriber.
Messages chan Message
}
// Message represents a pub/sub message queued for one subscriber: either a
// published payload or a confirmation of a subscription change.
type Message struct {
Type string // "message", "pmessage", "subscribe", "unsubscribe", "psubscribe", "punsubscribe"
Channel string // Channel name; left empty on "psubscribe"/"punsubscribe" confirmations
Pattern string // For pattern matches
Payload string // Published text; only set for "message" and "pmessage"
Count int // Subscription count (for sub/unsub)
}
// Hub manages all pub/sub subscriptions.
// All three maps are guarded by mu.
type Hub struct {
// Channel name -> (subscriber ID -> Subscriber) for exact-channel subscriptions
channels map[string]map[string]*Subscriber
// Pattern -> (subscriber ID -> Subscriber) for glob-pattern subscriptions
patterns map[string]map[string]*Subscriber
// Subscriber ID -> Subscriber
subscribers map[string]*Subscriber
// mu is taken for writing by the (un)subscribe methods and for reading by
// Publish and the query methods.
mu sync.RWMutex
}
// NewHub returns a hub with no subscribers and with every lookup table
// allocated, so it can be used immediately.
func NewHub() *Hub {
	hub := new(Hub)
	hub.channels = map[string]map[string]*Subscriber{}
	hub.patterns = map[string]map[string]*Subscriber{}
	hub.subscribers = map[string]*Subscriber{}
	return hub
}
// Subscribe registers subscriberID on each of the given channels, creating the
// subscriber on first use, and returns it. Channels the subscriber already
// holds are skipped.
//
// For every newly added channel a "subscribe" confirmation is queued on the
// subscriber's Messages channel; its Count is the subscriber's total number of
// channel and pattern subscriptions after the addition.
//
// The confirmation is sent without blocking. The hub's write lock is held for
// the whole call, so a blocking send on a full buffer would stall every other
// hub operation (and never complete if no reader has been started yet). When
// the buffer is full the confirmation is dropped, the same policy Publish
// applies to published messages; the subscription itself is still recorded.
func (h *Hub) Subscribe(subscriberID string, channels ...string) *Subscriber {
	h.mu.Lock()
	defer h.mu.Unlock()

	sub := h.getOrCreateSubscriber(subscriberID)
	for _, channel := range channels {
		if sub.Channels[channel] {
			continue // Already subscribed
		}
		sub.Channels[channel] = true
		if h.channels[channel] == nil {
			h.channels[channel] = make(map[string]*Subscriber)
		}
		h.channels[channel][subscriberID] = sub

		confirmation := Message{
			Type:    "subscribe",
			Channel: channel,
			Count:   len(sub.Channels) + len(sub.Patterns),
		}
		select {
		case sub.Messages <- confirmation:
		default:
			// Buffer full: drop rather than block while holding h.mu.
		}
	}
	return sub
}
// Unsubscribe removes subscriberID from the given channels. With no channels
// given it removes the subscriber from every channel it currently holds.
// Unknown subscribers and channels the subscriber does not hold are ignored.
//
// Hub entries for channels that end up with no subscribers are deleted. For
// every channel actually removed an "unsubscribe" confirmation is queued; its
// Count is the number of channel and pattern subscriptions that remain.
//
// The confirmation is sent without blocking because the hub's write lock is
// held for the whole call: waiting on a slow or absent reader here would stall
// every publisher and subscriber. When the buffer is full the confirmation is
// dropped (the same policy Publish uses); the removal itself still happens.
func (h *Hub) Unsubscribe(subscriberID string, channels ...string) {
	h.mu.Lock()
	defer h.mu.Unlock()

	sub, ok := h.subscribers[subscriberID]
	if !ok {
		return
	}

	// If no channels specified, unsubscribe from all
	if len(channels) == 0 {
		channels = make([]string, 0, len(sub.Channels))
		for ch := range sub.Channels {
			channels = append(channels, ch)
		}
	}

	for _, channel := range channels {
		if !sub.Channels[channel] {
			continue
		}
		delete(sub.Channels, channel)
		delete(h.channels[channel], subscriberID)
		// Clean up empty channel
		if len(h.channels[channel]) == 0 {
			delete(h.channels, channel)
		}

		confirmation := Message{
			Type:    "unsubscribe",
			Channel: channel,
			Count:   len(sub.Channels) + len(sub.Patterns),
		}
		select {
		case sub.Messages <- confirmation:
		default:
			// Buffer full: drop rather than block while holding h.mu.
		}
	}
}
// PSubscribe registers subscriberID on each of the given glob patterns,
// creating the subscriber on first use, and returns it. Patterns the
// subscriber already holds are skipped.
//
// For every newly added pattern a "psubscribe" confirmation is queued; its
// Count is the subscriber's total number of channel and pattern subscriptions
// after the addition.
//
// The confirmation is sent without blocking because the hub's write lock is
// held for the whole call; a blocking send on a full buffer would stall every
// other hub operation. When the buffer is full the confirmation is dropped
// (the same policy Publish uses); the subscription itself is still recorded.
func (h *Hub) PSubscribe(subscriberID string, patterns ...string) *Subscriber {
	h.mu.Lock()
	defer h.mu.Unlock()

	sub := h.getOrCreateSubscriber(subscriberID)
	for _, pattern := range patterns {
		if sub.Patterns[pattern] {
			continue
		}
		sub.Patterns[pattern] = true
		if h.patterns[pattern] == nil {
			h.patterns[pattern] = make(map[string]*Subscriber)
		}
		h.patterns[pattern][subscriberID] = sub

		confirmation := Message{
			Type:    "psubscribe",
			Pattern: pattern,
			Count:   len(sub.Channels) + len(sub.Patterns),
		}
		select {
		case sub.Messages <- confirmation:
		default:
			// Buffer full: drop rather than block while holding h.mu.
		}
	}
	return sub
}
// PUnsubscribe removes subscriberID from the given glob patterns. With no
// patterns given it removes every pattern the subscriber currently holds.
// Unknown subscribers and patterns the subscriber does not hold are ignored.
//
// Hub entries for patterns that end up with no subscribers are deleted. For
// every pattern actually removed a "punsubscribe" confirmation is queued; its
// Count is the number of channel and pattern subscriptions that remain.
//
// The confirmation is sent without blocking because the hub's write lock is
// held for the whole call; waiting on a slow or absent reader here would stall
// every other hub operation. When the buffer is full the confirmation is
// dropped (the same policy Publish uses); the removal itself still happens.
func (h *Hub) PUnsubscribe(subscriberID string, patterns ...string) {
	h.mu.Lock()
	defer h.mu.Unlock()

	sub, ok := h.subscribers[subscriberID]
	if !ok {
		return
	}

	// If no patterns specified, unsubscribe from all of them.
	if len(patterns) == 0 {
		patterns = make([]string, 0, len(sub.Patterns))
		for p := range sub.Patterns {
			patterns = append(patterns, p)
		}
	}

	for _, pattern := range patterns {
		if !sub.Patterns[pattern] {
			continue
		}
		delete(sub.Patterns, pattern)
		delete(h.patterns[pattern], subscriberID)
		if len(h.patterns[pattern]) == 0 {
			delete(h.patterns, pattern)
		}

		confirmation := Message{
			Type:    "punsubscribe",
			Pattern: pattern,
			Count:   len(sub.Channels) + len(sub.Patterns),
		}
		select {
		case sub.Messages <- confirmation:
		default:
			// Buffer full: drop rather than block while holding h.mu.
		}
	}
}
// Publish offers message to every subscriber of channel and to every
// subscriber of a pattern that matches channel, and returns how many
// deliveries were accepted.
//
// Exact-channel subscribers get a "message"; pattern subscribers get a
// "pmessage" that also names the matching pattern. Each send is non-blocking:
// a subscriber whose buffer is full is skipped and not counted.
func (h *Hub) Publish(channel, message string) int {
	h.mu.RLock()
	defer h.mu.RUnlock()

	delivered := 0
	// offer attempts one non-blocking send and counts it when accepted.
	offer := func(target *Subscriber, m Message) {
		select {
		case target.Messages <- m:
			delivered++
		default:
			// Buffer full: this subscriber misses the message.
		}
	}

	// Exact-channel subscribers (ranging over a missing entry is a no-op).
	for _, target := range h.channels[channel] {
		offer(target, Message{Type: "message", Channel: channel, Payload: message})
	}

	// Pattern subscribers.
	for glob, members := range h.patterns {
		if !matchPattern(glob, channel) {
			continue
		}
		for _, target := range members {
			offer(target, Message{
				Type:    "pmessage",
				Pattern: glob,
				Channel: channel,
				Payload: message,
			})
		}
	}
	return delivered
}
// RemoveSubscriber removes a subscriber completely: it drops the subscriber
// from every channel and pattern it holds, closes its Messages channel and
// forgets it. Unknown IDs are ignored, so calling it twice is safe.
//
// The whole teardown runs under a single acquisition of the hub's write lock.
// Releasing the lock between the unregister steps and the close would let a
// concurrent Subscribe for the same ID re-register the subscriber just before
// its channel is closed, after which Publish would send on a closed channel.
//
// No "unsubscribe"/"punsubscribe" confirmations are queued: the subscriber is
// going away, and a blocking send to a client that has stopped reading would
// hang with the lock held. Readers observe the removal as the close of
// Messages.
func (h *Hub) RemoveSubscriber(subscriberID string) {
	h.mu.Lock()
	defer h.mu.Unlock()

	sub, ok := h.subscribers[subscriberID]
	if !ok {
		return
	}

	for channel := range sub.Channels {
		delete(h.channels[channel], subscriberID)
		if len(h.channels[channel]) == 0 {
			delete(h.channels, channel)
		}
		delete(sub.Channels, channel) // deleting during range is safe in Go
	}
	for pattern := range sub.Patterns {
		delete(h.patterns[pattern], subscriberID)
		if len(h.patterns[pattern]) == 0 {
			delete(h.patterns, pattern)
		}
		delete(sub.Patterns, pattern)
	}

	// Closing under the write lock is safe with respect to Publish, which
	// sends only while holding the read lock and only to registered subscribers.
	close(sub.Messages)
	delete(h.subscribers, subscriberID)
}
// getOrCreateSubscriber returns the subscriber stored under id, registering a
// fresh one (empty subscription sets, 100-message buffer) when none exists.
// It does no locking of its own; the visible callers hold h.mu for writing.
func (h *Hub) getOrCreateSubscriber(id string) *Subscriber {
	existing, found := h.subscribers[id]
	if found {
		return existing
	}

	created := &Subscriber{ID: id}
	created.Channels = map[string]bool{}
	created.Patterns = map[string]bool{}
	created.Messages = make(chan Message, 100) // Buffer for messages
	h.subscribers[id] = created
	return created
}
// NumSub reports, for each requested channel, how many subscribers are
// registered on exactly that channel (pattern subscriptions are not counted).
// Channels with no subscribers map to 0; repeated names collapse to one key.
func (h *Hub) NumSub(channels ...string) map[string]int {
	h.mu.RLock()
	defer h.mu.RUnlock()

	counts := map[string]int{}
	for i := range channels {
		name := channels[i]
		counts[name] = len(h.channels[name])
	}
	return counts
}
// NumPat reports how many distinct patterns currently have at least one
// subscriber registered in the hub.
func (h *Hub) NumPat() int {
	h.mu.RLock()
	active := len(h.patterns)
	h.mu.RUnlock()
	return active
}
// Channels lists the channels that currently have at least one exact
// subscriber and whose name matches pattern. An empty pattern selects every
// channel. The order is unspecified (map iteration) and the result is nil
// when nothing matches.
func (h *Hub) Channels(pattern string) []string {
	h.mu.RLock()
	defer h.mu.RUnlock()

	var names []string
	selectAll := pattern == ""
	for name := range h.channels {
		if !selectAll && !matchPattern(pattern, name) {
			continue
		}
		names = append(names, name)
	}
	return names
}
Part 2: Pattern Matching
Redis uses glob-style pattern matching for PSUBSCRIBE.
File: internal/pubsub/pattern.go
Copy
package pubsub
// matchPattern reports whether str matches a Redis-style glob pattern.
//
// Supported syntax:
//
//	*      any run of bytes, including none
//	?      exactly one byte
//	[abc]  one byte from the set; [a-z] is a range, a leading ^ negates
//	\x     the byte x taken literally
//
// Matching is byte-wise. A pattern that ends in a lone backslash never
// matches, and an unterminated '[' class extends to the end of the pattern.
func matchPattern(pattern, str string) bool {
	return matchPatternHelper(pattern, str, 0, 0)
}

// matchPatternHelper matches pattern[pi:] against str[si:].
//
// Every pattern element other than '*' consumes exactly one byte of str, so it
// is enough to remember the most recent '*' and, on a mismatch, let that star
// absorb one more byte and retry. This bounds the work by roughly
// len(pattern)*len(str). Retrying every star recursively is exponential for
// patterns such as "a*a*a*...b", which matters because both the pattern
// (PSUBSCRIBE) and the string (PUBLISH) come from clients.
func matchPatternHelper(pattern, str string, pi, si int) bool {
	// starPi is the pattern index just past the most recent '*', or -1 when no
	// '*' has been seen. starSi is the first string index that star does NOT
	// cover under the current guess.
	starPi, starSi := -1, 0

	for si < len(str) {
		if pi < len(pattern) {
			if pattern[pi] == '*' {
				// Start by letting the star match nothing.
				pi++
				starPi, starSi = pi, si
				continue
			}
			if next, ok := matchOne(pattern, pi, str[si]); ok {
				pi = next
				si++
				continue
			}
		}
		// Mismatch, or pattern exhausted with input left over.
		if starPi < 0 {
			return false
		}
		// Let the last star swallow one more byte and retry from there.
		starSi++
		pi, si = starPi, starSi
	}

	// Input exhausted: whatever pattern remains must be all '*'.
	for pi < len(pattern) && pattern[pi] == '*' {
		pi++
	}
	return pi == len(pattern)
}

// matchOne matches the single non-'*' pattern element that starts at
// pattern[pi] against the byte c. It returns the index just past that element
// and whether c satisfied it.
func matchOne(pattern string, pi int, c byte) (int, bool) {
	switch pattern[pi] {
	case '?':
		// Matches any one byte.
		return pi + 1, true

	case '[':
		// Character class.
		pi++
		negate := false
		if pi < len(pattern) && pattern[pi] == '^' {
			negate = true
			pi++
		}
		matched := false
		for pi < len(pattern) && pattern[pi] != ']' {
			if pi+2 < len(pattern) && pattern[pi+1] == '-' {
				// Range like [a-z]
				if c >= pattern[pi] && c <= pattern[pi+2] {
					matched = true
				}
				pi += 3
			} else {
				if pattern[pi] == c {
					matched = true
				}
				pi++
			}
		}
		if pi < len(pattern) && pattern[pi] == ']' {
			pi++
		}
		return pi, matched != negate

	case '\\':
		// Escape: the next pattern byte is literal. A trailing backslash has
		// nothing to escape and matches nothing.
		pi++
		if pi >= len(pattern) {
			return pi, false
		}
		return pi + 1, pattern[pi] == c

	default:
		return pi + 1, pattern[pi] == c
	}
}
Part 3: Client Integration
Integrate Pub/Sub with the connection handler.
File: internal/server/connection.go
Copy
package server
import (
"bufio"
"fmt"
"net"
"strings"
"github.com/yourname/redis/internal/pubsub"
"github.com/yourname/redis/internal/protocol"
)
// Connection is the per-client state used by the command loop: the socket,
// its buffered reader and writer, and the client's pub/sub state.
type Connection struct {
// conn is the client socket; cleanup closes it.
conn net.Conn
reader *bufio.Reader
// writer buffers replies; the write helpers flush it after every reply.
writer *bufio.Writer
// server gives access to the shared Hub and to the handler for commands that
// are not pub/sub commands.
server *Server
// id is passed to the Hub as this connection's subscriber ID.
id string
// Pub/Sub state
// subscriber is nil until the first SUBSCRIBE or PSUBSCRIBE on this connection.
subscriber *pubsub.Subscriber
// inPubSub is true while the connection holds at least one subscription;
// handleCommand then rejects everything except the pub/sub commands, PING and QUIT.
inPubSub bool
}
// Handle runs the connection's command loop: it reads one command at a time
// and dispatches it until reading fails, then releases the connection's
// resources through cleanup.
//
// Pub/sub messages are not drained here. handleSubscribe and handlePSubscribe
// start listenForMessages, which is the consumer of subscriber.Messages and
// pushes each message to the client as it arrives. Draining the same channel
// from this loop as well made two goroutines compete for every message (so
// deliveries could be written out of order) and required polling the socket
// with a short read deadline, which can expire in the middle of a command.
//
// NOTE(review): listenForMessages writes to c.writer from its own goroutine
// while this loop writes command replies to the same writer. bufio.Writer is
// not safe for concurrent use, so the write helpers should be serialised
// (for example with a mutex on Connection) - confirm and address separately.
func (c *Connection) Handle() {
	defer c.cleanup()

	for {
		// readCommand is defined outside this section; any error it returns
		// (including the peer closing the socket) ends the session.
		cmd, err := c.readCommand()
		if err != nil {
			return
		}
		c.handleCommand(cmd)
	}
}
// handleCommand dispatches one parsed command. args[0] is the command name,
// matched case-insensitively; an empty args slice is ignored.
//
// While the connection is in pub/sub mode only SUBSCRIBE, UNSUBSCRIBE,
// PSUBSCRIBE, PUNSUBSCRIBE, PING and QUIT are accepted; anything else gets an
// error reply. Pub/sub commands are handled here, everything else is passed
// to the server's general command handler with the full argument list.
func (c *Connection) handleCommand(args []string) {
	if len(args) == 0 {
		return
	}

	name := strings.ToUpper(args[0])
	operands := args[1:]

	if c.inPubSub {
		permitted := name == "SUBSCRIBE" || name == "UNSUBSCRIBE" ||
			name == "PSUBSCRIBE" || name == "PUNSUBSCRIBE" ||
			name == "PING" || name == "QUIT"
		if !permitted {
			c.writeError("ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context")
			return
		}
	}

	switch name {
	case "SUBSCRIBE":
		c.handleSubscribe(operands)
	case "UNSUBSCRIBE":
		c.handleUnsubscribe(operands)
	case "PSUBSCRIBE":
		c.handlePSubscribe(operands)
	case "PUNSUBSCRIBE":
		c.handlePUnsubscribe(operands)
	case "PUBLISH":
		c.handlePublish(operands)
	case "PUBSUB":
		c.handlePubSubCommand(operands)
	default:
		// Not a pub/sub command: let the server handle it.
		c.server.HandleCommand(c, args)
	}
}
// handleSubscribe implements SUBSCRIBE: it registers the connection on the
// given channels and puts the connection into pub/sub mode. With no channels
// it replies with an arity error and changes nothing.
//
// The listener goroutine that pushes queued messages to the client is started
// only when the connection had no subscriber yet. The hub returns the same
// subscriber for every later SUBSCRIBE/PSUBSCRIBE on this connection and the
// listener runs until that subscriber's channel is closed, so starting another
// one per command would only add goroutines competing for the same messages
// and the same writer.
func (c *Connection) handleSubscribe(channels []string) {
	if len(channels) == 0 {
		c.writeError("ERR wrong number of arguments for 'subscribe' command")
		return
	}

	needListener := c.subscriber == nil
	c.subscriber = c.server.Hub.Subscribe(c.id, channels...)
	c.inPubSub = true

	if needListener {
		// Start listening for messages in background
		go c.listenForMessages()
	}
}
// handleUnsubscribe implements UNSUBSCRIBE. It is ignored unless the
// connection is in pub/sub mode. An empty channel list is passed through to
// the hub unchanged. Once the connection holds neither channel nor pattern
// subscriptions it leaves pub/sub mode.
func (c *Connection) handleUnsubscribe(channels []string) {
	if c.inPubSub {
		c.server.Hub.Unsubscribe(c.id, channels...)

		// Exit pub/sub mode if no more subscriptions
		remaining := len(c.subscriber.Channels) + len(c.subscriber.Patterns)
		if remaining == 0 {
			c.inPubSub = false
		}
	}
}
// handlePSubscribe implements PSUBSCRIBE: it registers the connection on the
// given glob patterns and puts the connection into pub/sub mode. With no
// patterns it replies with an arity error and changes nothing.
//
// The listener goroutine that pushes queued messages to the client is started
// only when the connection had no subscriber yet. The hub returns the same
// subscriber for every later SUBSCRIBE/PSUBSCRIBE on this connection and the
// listener runs until that subscriber's channel is closed, so starting another
// one per command would only add goroutines competing for the same messages
// and the same writer.
func (c *Connection) handlePSubscribe(patterns []string) {
	if len(patterns) == 0 {
		c.writeError("ERR wrong number of arguments for 'psubscribe' command")
		return
	}

	needListener := c.subscriber == nil
	c.subscriber = c.server.Hub.PSubscribe(c.id, patterns...)
	c.inPubSub = true

	if needListener {
		go c.listenForMessages()
	}
}
// handlePUnsubscribe implements PUNSUBSCRIBE. It is ignored unless the
// connection is in pub/sub mode. An empty pattern list is passed through to
// the hub unchanged. Once the connection holds neither channel nor pattern
// subscriptions it leaves pub/sub mode.
func (c *Connection) handlePUnsubscribe(patterns []string) {
	if c.inPubSub {
		c.server.Hub.PUnsubscribe(c.id, patterns...)

		remaining := len(c.subscriber.Channels) + len(c.subscriber.Patterns)
		if remaining == 0 {
			c.inPubSub = false
		}
	}
}
// handlePublish implements PUBLISH channel message. It replies with the
// number of deliveries the hub reports, or with an arity error unless exactly
// two arguments were given.
func (c *Connection) handlePublish(args []string) {
	if len(args) == 2 {
		receivers := c.server.Hub.Publish(args[0], args[1])
		c.writeInteger(receivers)
		return
	}
	c.writeError("ERR wrong number of arguments for 'publish' command")
}
// handlePubSubCommand implements the PUBSUB introspection command. args[0] is
// the subcommand, matched case-insensitively:
//
//	CHANNELS [pattern]    active channels, optionally filtered by a glob
//	NUMSUB [channel ...]  flat array of channel, count pairs
//	NUMPAT                number of active patterns
//
// A missing or unknown subcommand gets an error reply.
func (c *Connection) handlePubSubCommand(args []string) {
	if len(args) == 0 {
		c.writeError("ERR wrong number of arguments for 'pubsub' command")
		return
	}

	subCmd := strings.ToUpper(args[0])
	switch subCmd {
	case "CHANNELS":
		pattern := ""
		if len(args) > 1 {
			pattern = args[1]
		}
		channels := c.server.Hub.Channels(pattern)
		c.writeArray(channels)

	case "NUMSUB":
		requested := args[1:]
		counts := c.server.Hub.NumSub(requested...)
		// Emit the pairs in the order the channels were requested. Ranging
		// over the counts map would give a different order on every call and
		// would collapse a channel that was named twice into a single pair.
		result := make([]interface{}, 0, len(requested)*2)
		for _, ch := range requested {
			result = append(result, ch, counts[ch])
		}
		c.writeArrayMixed(result)

	case "NUMPAT":
		c.writeInteger(c.server.Hub.NumPat())

	default:
		c.writeError(fmt.Sprintf("ERR Unknown PUBSUB subcommand '%s'", subCmd))
	}
}
// listenForMessages forwards every message queued for this connection's
// subscriber to the client, in arrival order, and returns once the
// subscriber's Messages channel has been closed and drained.
func (c *Connection) listenForMessages() {
	inbox := c.subscriber.Messages
	for {
		next, open := <-inbox
		if !open {
			return
		}
		c.sendPubSubMessage(next)
	}
}
// sendPubSubMessage writes one hub message to the client as a RESP array:
//
//	message:       ["message", channel, payload]
//	pmessage:      ["pmessage", pattern, channel, payload]
//	confirmations: [type, channel-or-pattern, count]
//
// For confirmations the pattern is used when it is set, otherwise the
// channel. Messages of any other type are ignored.
func (c *Connection) sendPubSubMessage(msg pubsub.Message) {
	switch msg.Type {
	case "message":
		c.writeArrayStrings([]string{"message", msg.Channel, msg.Payload})

	case "pmessage":
		c.writeArrayStrings([]string{"pmessage", msg.Pattern, msg.Channel, msg.Payload})

	case "subscribe", "psubscribe", "unsubscribe", "punsubscribe":
		target := msg.Pattern
		if target == "" {
			target = msg.Channel
		}
		c.writeArrayMixed([]interface{}{msg.Type, target, msg.Count})
	}
}
// cleanup releases everything the connection holds: if the connection ever
// subscribed, its subscriber is removed from the hub, and then the socket is
// closed. The error from closing the socket is ignored.
func (c *Connection) cleanup() {
	if subscribed := c.subscriber != nil; subscribed {
		c.server.Hub.RemoveSubscriber(c.id)
	}
	c.conn.Close()
}
Part 4: Message Serialization
internal/server/writer.go
Copy
package server
import (
"fmt"
)
// writeArrayStrings sends items as a RESP array of bulk strings and flushes
// the writer. Each element's length prefix is its length in bytes. Write and
// flush errors are not reported.
func (c *Connection) writeArrayStrings(items []string) {
	header := fmt.Sprintf("*%d\r\n", len(items))
	c.writer.WriteString(header)

	for i := 0; i < len(items); i++ {
		element := items[i]
		bulk := fmt.Sprintf("$%d\r\n%s\r\n", len(element), element)
		c.writer.WriteString(bulk)
	}

	c.writer.Flush()
}
// writeArrayMixed sends items as a RESP array and flushes the writer. Write
// and flush errors are not reported. Elements are encoded by dynamic type:
//
//	nil            null bulk string ($-1)
//	string         bulk string
//	int, int64     integer
//	anything else  bulk string of its default fmt formatting
//
// Every element produces exactly one RESP value. The array header announces
// len(items) elements, so silently skipping a value of an unexpected type
// would leave the client waiting for elements that never arrive.
func (c *Connection) writeArrayMixed(items []interface{}) {
	c.writer.WriteString(fmt.Sprintf("*%d\r\n", len(items)))

	for _, item := range items {
		switch v := item.(type) {
		case nil:
			c.writer.WriteString("$-1\r\n")
		case string:
			c.writer.WriteString(fmt.Sprintf("$%d\r\n%s\r\n", len(v), v))
		case int, int64:
			c.writer.WriteString(fmt.Sprintf(":%d\r\n", v))
		default:
			// Keep the frame well-formed for types without a dedicated encoding.
			text := fmt.Sprint(v)
			c.writer.WriteString(fmt.Sprintf("$%d\r\n%s\r\n", len(text), text))
		}
	}

	c.writer.Flush()
}
// writeInteger sends n as a RESP integer reply (":<n>\r\n") and flushes the
// writer. Write and flush errors are not reported.
func (c *Connection) writeInteger(n int) {
	reply := fmt.Sprintf(":%d\r\n", n)
	c.writer.WriteString(reply)
	c.writer.Flush()
}
// writeError sends msg as a RESP error reply ("-ERR <msg>\r\n") and flushes
// the writer. Write and flush errors are not reported.
//
// The "ERR " code is added only when msg does not already start with it. The
// callers in this file pass messages that already carry the prefix, and
// adding it unconditionally produced "-ERR ERR ...".
//
// CR and LF inside msg are replaced with spaces. An error reply is a single
// line, and some messages echo client input (for example an unknown
// subcommand name), which must not be able to end the reply early and smuggle
// extra protocol data to the client.
func (c *Connection) writeError(msg string) {
	line := []byte(msg)
	for i, b := range line {
		if b == '\r' || b == '\n' {
			line[i] = ' '
		}
	}

	const code = "ERR "
	if len(line) < len(code) || string(line[:len(code)]) != code {
		line = append([]byte(code), line...)
	}

	c.writer.WriteString(fmt.Sprintf("-%s\r\n", line))
	c.writer.Flush()
}
// writeArray sends items as a RESP array of bulk strings. It is a thin alias
// that delegates to writeArrayStrings.
func (c *Connection) writeArray(items []string) {
c.writeArrayStrings(items)
}
Usage Example
Copy
┌─────────────────────────────────────────────────────────────────────────────┐
│ PUB/SUB IN ACTION │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ SUBSCRIBER 1 SUBSCRIBER 2 │
│ ──────────── ──────────── │
│ > SUBSCRIBE news > PSUBSCRIBE news:* │
│ 1) "subscribe" 1) "psubscribe" │
│ 2) "news" 2) "news:*" │
│ 3) (integer) 1 3) (integer) 1 │
│ │
│ PUBLISHER │
│ ───────── │
│ > PUBLISH news "Hello!" │
│ (integer) 1 ← Only sub1 (exact match) │
│ │
│ > PUBLISH news:sports "Goal!" │
│ (integer) 1 ← Only sub2 (pattern match) │
│ │
│ SUBSCRIBER 1 receives: SUBSCRIBER 2 receives: │
│ 1) "message" 1) "pmessage" │
│ 2) "news" 2) "news:*" │
│ 3) "Hello!" 3) "news:sports" │
│ 4) "Goal!" │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Real-World Use Cases
Real-time Chat
Each chat room is a channel. Users subscribe to rooms they’re in.
Live Notifications
Push updates to connected clients (price changes, status updates).
Cache Invalidation
Publish when cache entries change; subscribers update local caches.
Event Broadcasting
Decouple services - publisher doesn’t know who’s listening.
Exercises
Exercise 1: Implement PUBSUB SHARDCHANNELS
Exercise 1: Implement PUBSUB SHARDCHANNELS
In Redis Cluster, implement shard-aware channels:
Copy
// PUBSUB SHARDCHANNELS [pattern]
// Return channels that hash to the current shard
Exercise 2: Add Message History
Exercise 2: Add Message History
Store recent messages so new subscribers can catch up:
Copy
// Option 1: Store last N messages per channel
// Option 2: Use Redis Streams (more advanced)
Exercise 3: Implement Client Tracking
Exercise 3: Implement Client Tracking
Track which clients are subscribed to what:
Copy
// CLIENT GETNAME
// CLIENT SETNAME connection-name
// CLIENT LIST
Key Takeaways
Fire and Forget
Messages aren’t stored - if no one is listening, they’re lost
Pattern Matching
PSUBSCRIBE enables flexible channel matching with globs
Push Model
Server pushes to clients - no polling needed
Decoupling
Publishers and subscribers are completely independent
Limitations to Consider
Copy
┌─────────────────────────────────────────────────────────────────────────────┐
│ PUB/SUB LIMITATIONS │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ 1. NO PERSISTENCE │
│ - Messages aren't saved │
│ - If subscriber disconnects, messages are lost │
│ → Consider Redis Streams for durability │
│ │
│ 2. NO ACKNOWLEDGMENTS │
│ - No way to confirm delivery │
│ - At-most-once delivery semantics │
│ → Use message queues (RabbitMQ, Kafka) for guaranteed delivery │
│ │
│ 3. BLOCKING IN SUB MODE │
│ - Can't run other commands while subscribed │
│ - Need separate connection for normal operations │
│ │
│ 4. MEMORY PRESSURE │
│ - Slow subscribers can cause message buffer to grow │
│ - No backpressure mechanism │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Congratulations! 🎉
You’ve built a working Redis clone with:
- ✅ RESP protocol parser
- ✅ TCP server with concurrent connections
- ✅ Multiple data structures (Strings, Lists, Sets, Hashes, Sorted Sets)
- ✅ Persistence (RDB snapshots, AOF logging)
- ✅ Pub/Sub messaging
Redis Project Complete!
You now understand how Redis works internally!