Chapter 5: Pub/Sub

Redis Pub/Sub enables real-time messaging between clients. Publishers send messages to channels, and all subscribers to those channels receive them instantly. Let’s build it!
Prerequisites: Chapter 2: TCP Server
Further Reading: System Design: Message Queues
Time: 2-3 hours
Outcome: Working publish/subscribe system

Pub/Sub Architecture

┌─────────────────────────────────────────────────────────────────────────────┐
│                            PUB/SUB MODEL                                     │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│                           ┌──────────────┐                                  │
│                           │   Channel    │                                  │
│                           │   "news"     │                                  │
│                           └──────────────┘                                  │
│                                  │                                          │
│         ┌────────────────────────┼────────────────────────┐                │
│         │                        │                        │                │
│         ▼                        ▼                        ▼                │
│   ┌──────────┐            ┌──────────┐            ┌──────────┐            │
│   │ Client 1 │            │ Client 2 │            │ Client 3 │            │
│   │ (sub)    │            │ (sub)    │            │ (sub)    │            │
│   └──────────┘            └──────────┘            └──────────┘            │
│                                                                              │
│         ▲                                                                   │
│         │                                                                   │
│   ┌──────────┐                                                              │
│   │Publisher │  PUBLISH news "Hello!"                                       │
│   └──────────┘                                                              │
│                                                                              │
│   FLOW:                                                                     │
│   1. Publisher sends: PUBLISH news "Hello!"                                 │
│   2. Server finds all subscribers to "news" channel                         │
│   3. Server pushes message to each subscriber                               │
│   4. All 3 clients receive: ["message", "news", "Hello!"]                  │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

Part 1: Pub/Sub Hub

The hub manages channel subscriptions and message routing.
internal/pubsub/hub.go
package pubsub

import (
    "sync"
)

// Subscriber represents a client subscribed to channels
type Subscriber struct {
    ID       string
    Channels map[string]bool
    Patterns map[string]bool
    Messages chan Message
}

// Message represents a pub/sub message
type Message struct {
    Type    string // "message", "pmessage", "subscribe", "unsubscribe", "psubscribe", "punsubscribe"
    Channel string
    Pattern string // For pattern matches
    Payload string
    Count   int    // Subscription count (for sub/unsub)
}

// Hub manages all pub/sub subscriptions
type Hub struct {
    // Channel name -> (subscriber ID -> Subscriber)
    channels map[string]map[string]*Subscriber
    
    // Pattern -> (subscriber ID -> Subscriber)
    patterns map[string]map[string]*Subscriber
    
    // Subscriber ID -> Subscriber
    subscribers map[string]*Subscriber
    
    mu sync.RWMutex
}

// NewHub creates a new pub/sub hub
func NewHub() *Hub {
    return &Hub{
        channels:    make(map[string]map[string]*Subscriber),
        patterns:    make(map[string]map[string]*Subscriber),
        subscribers: make(map[string]*Subscriber),
    }
}

// Subscribe adds a subscriber to channels
func (h *Hub) Subscribe(subscriberID string, channels ...string) *Subscriber {
    h.mu.Lock()
    defer h.mu.Unlock()
    
    sub := h.getOrCreateSubscriber(subscriberID)
    
    for _, channel := range channels {
        if sub.Channels[channel] {
            continue // Already subscribed
        }
        
        sub.Channels[channel] = true
        
        if h.channels[channel] == nil {
            h.channels[channel] = make(map[string]*Subscriber)
        }
        h.channels[channel][subscriberID] = sub
        
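        // Send subscription confirmation. This is a blocking send made while
        // holding h.mu, so the connection must keep draining sub.Messages.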
        sub.Messages <- Message{
            Type:    "subscribe",
            Channel: channel,
            Count:   len(sub.Channels) + len(sub.Patterns),
        }
    }
    
    return sub
}

// Unsubscribe removes a subscriber from channels
func (h *Hub) Unsubscribe(subscriberID string, channels ...string) {
    h.mu.Lock()
    defer h.mu.Unlock()
    
    sub, ok := h.subscribers[subscriberID]
    if !ok {
        return
    }
    
    // If no channels specified, unsubscribe from all
    if len(channels) == 0 {
        channels = make([]string, 0, len(sub.Channels))
        for ch := range sub.Channels {
            channels = append(channels, ch)
        }
    }
    
    for _, channel := range channels {
        if !sub.Channels[channel] {
            continue
        }
        
        delete(sub.Channels, channel)
        delete(h.channels[channel], subscriberID)
        
        // Clean up empty channel
        if len(h.channels[channel]) == 0 {
            delete(h.channels, channel)
        }
        
        sub.Messages <- Message{
            Type:    "unsubscribe",
            Channel: channel,
            Count:   len(sub.Channels) + len(sub.Patterns),
        }
    }
}

// PSubscribe subscribes to channel patterns
func (h *Hub) PSubscribe(subscriberID string, patterns ...string) *Subscriber {
    h.mu.Lock()
    defer h.mu.Unlock()
    
    sub := h.getOrCreateSubscriber(subscriberID)
    
    for _, pattern := range patterns {
        if sub.Patterns[pattern] {
            continue
        }
        
        sub.Patterns[pattern] = true
        
        if h.patterns[pattern] == nil {
            h.patterns[pattern] = make(map[string]*Subscriber)
        }
        h.patterns[pattern][subscriberID] = sub
        
        sub.Messages <- Message{
            Type:    "psubscribe",
            Pattern: pattern,
            Count:   len(sub.Channels) + len(sub.Patterns),
        }
    }
    
    return sub
}

// PUnsubscribe removes pattern subscriptions
func (h *Hub) PUnsubscribe(subscriberID string, patterns ...string) {
    h.mu.Lock()
    defer h.mu.Unlock()
    
    sub, ok := h.subscribers[subscriberID]
    if !ok {
        return
    }
    
    if len(patterns) == 0 {
        patterns = make([]string, 0, len(sub.Patterns))
        for p := range sub.Patterns {
            patterns = append(patterns, p)
        }
    }
    
    for _, pattern := range patterns {
        if !sub.Patterns[pattern] {
            continue
        }
        
        delete(sub.Patterns, pattern)
        delete(h.patterns[pattern], subscriberID)
        
        if len(h.patterns[pattern]) == 0 {
            delete(h.patterns, pattern)
        }
        
        sub.Messages <- Message{
            Type:    "punsubscribe",
            Pattern: pattern,
            Count:   len(sub.Channels) + len(sub.Patterns),
        }
    }
}

// Publish sends a message to all subscribers of a channel
func (h *Hub) Publish(channel, message string) int {
    h.mu.RLock()
    defer h.mu.RUnlock()
    
    count := 0
    
    // Direct channel subscribers
    if subs, ok := h.channels[channel]; ok {
        for _, sub := range subs {
            select {
            case sub.Messages <- Message{
                Type:    "message",
                Channel: channel,
                Payload: message,
            }:
                count++
            default:
                // Subscriber's buffer is full: drop the message for this
                // subscriber instead of blocking the publisher.
            }
        }
    }
    
    // Pattern subscribers
    for pattern, subs := range h.patterns {
        if matchPattern(pattern, channel) {
            for _, sub := range subs {
                select {
                case sub.Messages <- Message{
                    Type:    "pmessage",
                    Pattern: pattern,
                    Channel: channel,
                    Payload: message,
                }:
                    count++
                default:
                    // Buffer full: drop the message for this slow subscriber.
                }
            }
        }
    }
    
    return count
}

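// RemoveSubscriber removes a subscriber completely. It detaches the subscriber
// directly rather than calling Unsubscribe/PUnsubscribe: those send blocking
// confirmations that a closing connection will never read.
func (h *Hub) RemoveSubscriber(subscriberID string) {
    h.mu.Lock()
    defer h.mu.Unlock()
    
    sub, ok := h.subscribers[subscriberID]
    if !ok {
        return
    }
    for ch := range sub.Channels {
        delete(h.channels[ch], subscriberID)
        if len(h.channels[ch]) == 0 {
            delete(h.channels, ch)
        }
    }
    for p := range sub.Patterns {
        delete(h.patterns[p], subscriberID)
        if len(h.patterns[p]) == 0 {
            delete(h.patterns, p)
        }
    }
    close(sub.Messages)
    delete(h.subscribers, subscriberID)
}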

func (h *Hub) getOrCreateSubscriber(id string) *Subscriber {
    if sub, ok := h.subscribers[id]; ok {
        return sub
    }
    
    sub := &Subscriber{
        ID:       id,
        Channels: make(map[string]bool),
        Patterns: make(map[string]bool),
        Messages: make(chan Message, 100), // Buffer for messages
    }
    h.subscribers[id] = sub
    return sub
}

// NumSub returns subscriber counts for channels
func (h *Hub) NumSub(channels ...string) map[string]int {
    h.mu.RLock()
    defer h.mu.RUnlock()
    
    result := make(map[string]int)
    for _, ch := range channels {
        result[ch] = len(h.channels[ch])
    }
    return result
}

// NumPat returns the number of active patterns
func (h *Hub) NumPat() int {
    h.mu.RLock()
    defer h.mu.RUnlock()
    return len(h.patterns)
}

// Channels returns all active channels matching a pattern
func (h *Hub) Channels(pattern string) []string {
    h.mu.RLock()
    defer h.mu.RUnlock()
    
    var result []string
    for ch := range h.channels {
        if pattern == "" || matchPattern(pattern, ch) {
            result = append(result, ch)
        }
    }
    return result
}
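
The `select` with a `default` branch in Publish is what keeps one slow subscriber from stalling every publisher. The following is a minimal, standalone sketch of that idiom, not part of the hub itself; the names `trySend` and `publishAll` are hypothetical and exist only for this illustration.

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether the message
// was queued. If the buffer is full, the message is dropped.
func trySend(ch chan string, msg string) bool {
	select {
	case ch <- msg:
		return true
	default:
		return false
	}
}

// publishAll offers every message to the mailbox and returns how many fit.
func publishAll(mailbox chan string, msgs []string) int {
	delivered := 0
	for _, m := range msgs {
		if trySend(mailbox, m) {
			delivered++
		}
	}
	return delivered
}

func main() {
	// A deliberately tiny buffer so the drop is easy to observe.
	mailbox := make(chan string, 2)
	n := publishAll(mailbox, []string{"a", "b", "c"})
	fmt.Println("delivered:", n, "queued:", len(mailbox))
}
```

The trade-off is that a dropped message is gone for that subscriber, which matches the at-most-once semantics discussed later in this chapter.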

Part 2: Pattern Matching

Redis uses glob-style pattern matching for PSUBSCRIBE:
internal/pubsub/pattern.go
package pubsub

// matchPattern matches a Redis-style glob pattern against a string.
// Supports: * (any chars), ? (single char), [abc] (char class),
// [^abc] (negated class), [a-z] (range), and \ to escape the next character.
func matchPattern(pattern, str string) bool {
    return matchPatternHelper(pattern, str, 0, 0)
}

func matchPatternHelper(pattern, str string, pi, si int) bool {
    for pi < len(pattern) {
        if si >= len(str) {
            // Check if remaining pattern is all *
            for pi < len(pattern) {
                if pattern[pi] != '*' {
                    return false
                }
                pi++
            }
            return true
        }
        
        switch pattern[pi] {
        case '*':
            // Match zero or more characters
            // Try matching * with nothing, then with each possible length
            for i := si; i <= len(str); i++ {
                if matchPatternHelper(pattern, str, pi+1, i) {
                    return true
                }
            }
            return false
            
        case '?':
            // Match exactly one character
            pi++
            si++
            
        case '[':
            // Character class
            pi++
            not := false
            if pi < len(pattern) && pattern[pi] == '^' {
                not = true
                pi++
            }
            
            matched := false
            for pi < len(pattern) && pattern[pi] != ']' {
                if pi+2 < len(pattern) && pattern[pi+1] == '-' {
                    // Range like [a-z]
                    if str[si] >= pattern[pi] && str[si] <= pattern[pi+2] {
                        matched = true
                    }
                    pi += 3
                } else {
                    if pattern[pi] == str[si] {
                        matched = true
                    }
                    pi++
                }
            }
            
            if pi < len(pattern) && pattern[pi] == ']' {
                pi++
            }
            
            if matched == not {
                return false
            }
            si++
            
        case '\\':
            // Escape next character
            pi++
            if pi >= len(pattern) {
                return false
            }
            if pattern[pi] != str[si] {
                return false
            }
            pi++
            si++
            
        default:
            if pattern[pi] != str[si] {
                return false
            }
            pi++
            si++
        }
    }
    
    return si == len(str)
}
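
To get a feel for glob semantics before testing the matcher above, one option is to experiment with Go's standard `path.Match`. This is only a stand-in for illustration: it is not Redis's matcher, and it differs in at least one way (its `*` does not match `/`). The wrapper name `globMatch` is hypothetical.

```go
package main

import (
	"fmt"
	"path"
)

// globMatch wraps path.Match and treats a malformed pattern as "no match".
// Note: path.Match is a stdlib stand-in here, not the Redis glob matcher.
func globMatch(pattern, name string) bool {
	ok, err := path.Match(pattern, name)
	return err == nil && ok
}

func main() {
	cases := []struct{ pattern, channel string }{
		{"news:*", "news:sports"}, // * matches the suffix
		{"news:*", "news"},        // the literal ':' is missing
		{"h?llo", "hello"},        // ? matches exactly one character
		{"h[ae]llo", "hillo"},     // 'i' is not in the class
		{"h[^e]llo", "hallo"},     // negated class
		{"news:*", "news:a/b"},    // path.Match's * stops at '/'
	}
	for _, c := range cases {
		fmt.Printf("%-10s %-12s %v\n", c.pattern, c.channel, globMatch(c.pattern, c.channel))
	}
}
```

The same table of cases can be reused against `matchPattern`; the last case is where the two are expected to disagree, since the matcher above lets `*` match any character.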

Part 3: Client Integration

Integrate Pub/Sub with the connection handler:
internal/server/connection.go
package server

import (
    "bufio"
    "fmt"
    "net"
    "strings"
    "time" // used by Handle for read deadlines
    
    "github.com/yourname/redis/internal/pubsub"
    "github.com/yourname/redis/internal/protocol"
)

type Connection struct {
    conn       net.Conn
    reader     *bufio.Reader
    writer     *bufio.Writer
    server     *Server
    id         string
    
    // Pub/Sub state
    subscriber *pubsub.Subscriber
    inPubSub   bool
}

func (c *Connection) Handle() {
    defer c.cleanup()
    
    for {
        // Check for pub/sub messages first
        if c.inPubSub {
            select {
            case msg, ok := <-c.subscriber.Messages:
                if !ok {
                    return
                }
                c.sendPubSubMessage(msg)
                continue
            default:
                // No message, proceed to read command
            }
        }
        
        // Read command with timeout
        c.conn.SetReadDeadline(time.Now().Add(100 * time.Millisecond))
        cmd, err := c.readCommand()
        if err != nil {
            if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
                continue // Just a timeout, keep going
            }
            return
        }
        c.conn.SetReadDeadline(time.Time{})
        
        c.handleCommand(cmd)
    }
}

func (c *Connection) handleCommand(args []string) {
    if len(args) == 0 {
        return
    }
    
    cmd := strings.ToUpper(args[0])
    
    // In pub/sub mode, only allow specific commands
    if c.inPubSub {
        switch cmd {
        case "SUBSCRIBE", "UNSUBSCRIBE", "PSUBSCRIBE", "PUNSUBSCRIBE", "PING", "QUIT":
            // Allowed
        default:
            c.writeError("ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context")
            return
        }
    }
    
    switch cmd {
    case "SUBSCRIBE":
        c.handleSubscribe(args[1:])
    case "UNSUBSCRIBE":
        c.handleUnsubscribe(args[1:])
    case "PSUBSCRIBE":
        c.handlePSubscribe(args[1:])
    case "PUNSUBSCRIBE":
        c.handlePUnsubscribe(args[1:])
    case "PUBLISH":
        c.handlePublish(args[1:])
    case "PUBSUB":
        c.handlePubSubCommand(args[1:])
    default:
        // Handle other commands...
        c.server.HandleCommand(c, args)
    }
}

func (c *Connection) handleSubscribe(channels []string) {
    if len(channels) == 0 {
        c.writeError("ERR wrong number of arguments for 'subscribe' command")
        return
    }
    
    c.subscriber = c.server.Hub.Subscribe(c.id, channels...)
    c.inPubSub = true
    
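    // Handle() already drains c.subscriber.Messages on every loop iteration.
    // Starting a goroutine here as well would create a second consumer that
    // writes to c.writer concurrently, so none is started.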
}

func (c *Connection) handleUnsubscribe(channels []string) {
    if !c.inPubSub {
        return
    }
    
    c.server.Hub.Unsubscribe(c.id, channels...)
    
    // Exit pub/sub mode if no more subscriptions
    if len(c.subscriber.Channels) == 0 && len(c.subscriber.Patterns) == 0 {
        c.inPubSub = false
    }
}

func (c *Connection) handlePSubscribe(patterns []string) {
    if len(patterns) == 0 {
        c.writeError("ERR wrong number of arguments for 'psubscribe' command")
        return
    }
    
    c.subscriber = c.server.Hub.PSubscribe(c.id, patterns...)
    c.inPubSub = true
    
    // No goroutine is started: Handle() drains c.subscriber.Messages itself.
}

func (c *Connection) handlePUnsubscribe(patterns []string) {
    if !c.inPubSub {
        return
    }
    
    c.server.Hub.PUnsubscribe(c.id, patterns...)
    
    if len(c.subscriber.Channels) == 0 && len(c.subscriber.Patterns) == 0 {
        c.inPubSub = false
    }
}

func (c *Connection) handlePublish(args []string) {
    if len(args) != 2 {
        c.writeError("ERR wrong number of arguments for 'publish' command")
        return
    }
    
    channel := args[0]
    message := args[1]
    
    count := c.server.Hub.Publish(channel, message)
    c.writeInteger(count)
}

func (c *Connection) handlePubSubCommand(args []string) {
    if len(args) == 0 {
        c.writeError("ERR wrong number of arguments for 'pubsub' command")
        return
    }
    
    subCmd := strings.ToUpper(args[0])
    
    switch subCmd {
    case "CHANNELS":
        pattern := ""
        if len(args) > 1 {
            pattern = args[1]
        }
        channels := c.server.Hub.Channels(pattern)
        c.writeArray(channels)
        
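    case "NUMSUB":
        // Reply in the order the channels were requested; ranging over the
        // counts map directly would yield a random order.
        requested := args[1:]
        counts := c.server.Hub.NumSub(requested...)
        result := make([]interface{}, 0, len(requested)*2)
        for _, ch := range requested {
            result = append(result, ch, counts[ch])
        }
        c.writeArrayMixed(result)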
        
    case "NUMPAT":
        c.writeInteger(c.server.Hub.NumPat())
        
    default:
        c.writeError(fmt.Sprintf("ERR Unknown PUBSUB subcommand '%s'", subCmd))
    }
}

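// listenForMessages is an alternative to the polling done in Handle.
// Use one or the other, never both: two consumers would write to
// c.writer concurrently.
func (c *Connection) listenForMessages() {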
    for msg := range c.subscriber.Messages {
        c.sendPubSubMessage(msg)
    }
}

func (c *Connection) sendPubSubMessage(msg pubsub.Message) {
    switch msg.Type {
    case "message":
        c.writeArrayStrings([]string{"message", msg.Channel, msg.Payload})
    case "pmessage":
        c.writeArrayStrings([]string{"pmessage", msg.Pattern, msg.Channel, msg.Payload})
    case "subscribe", "psubscribe":
        channel := msg.Channel
        if msg.Pattern != "" {
            channel = msg.Pattern
        }
        c.writeArrayMixed([]interface{}{msg.Type, channel, msg.Count})
    case "unsubscribe", "punsubscribe":
        channel := msg.Channel
        if msg.Pattern != "" {
            channel = msg.Pattern
        }
        c.writeArrayMixed([]interface{}{msg.Type, channel, msg.Count})
    }
}

func (c *Connection) cleanup() {
    if c.subscriber != nil {
        c.server.Hub.RemoveSubscriber(c.id)
    }
    c.conn.Close()
}

Part 4: Message Serialization

internal/server/writer.go
package server

import (
    "fmt"
)

func (c *Connection) writeArrayStrings(items []string) {
    c.writer.WriteString(fmt.Sprintf("*%d\r\n", len(items)))
    for _, item := range items {
        c.writer.WriteString(fmt.Sprintf("$%d\r\n%s\r\n", len(item), item))
    }
    c.writer.Flush()
}

func (c *Connection) writeArrayMixed(items []interface{}) {
    c.writer.WriteString(fmt.Sprintf("*%d\r\n", len(items)))
    for _, item := range items {
        switch v := item.(type) {
        case string:
            c.writer.WriteString(fmt.Sprintf("$%d\r\n%s\r\n", len(v), v))
        case int:
            c.writer.WriteString(fmt.Sprintf(":%d\r\n", v))
        case nil:
            c.writer.WriteString("$-1\r\n")
        }
    }
    c.writer.Flush()
}

func (c *Connection) writeInteger(n int) {
    c.writer.WriteString(fmt.Sprintf(":%d\r\n", n))
    c.writer.Flush()
}

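// writeError sends a RESP error. Callers pass the full message including its
// "ERR" prefix, so no extra prefix is added here.
func (c *Connection) writeError(msg string) {
    c.writer.WriteString(fmt.Sprintf("-%s\r\n", msg))
    c.writer.Flush()
}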

func (c *Connection) writeArray(items []string) {
    c.writeArrayStrings(items)
}
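
To see exactly what bytes these writers put on the wire, here is a small standalone sketch that builds the same RESP framing into a string instead of writing to a connection. The function name `encodePush` is hypothetical and not part of the server code above.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// encodePush renders items as a RESP array: strings become bulk strings,
// ints become RESP integers, and anything else becomes a null bulk string.
func encodePush(items ...interface{}) string {
	var b strings.Builder
	b.WriteString("*" + strconv.Itoa(len(items)) + "\r\n")
	for _, item := range items {
		switch v := item.(type) {
		case string:
			b.WriteString("$" + strconv.Itoa(len(v)) + "\r\n" + v + "\r\n")
		case int:
			b.WriteString(":" + strconv.Itoa(v) + "\r\n")
		default:
			b.WriteString("$-1\r\n")
		}
	}
	return b.String()
}

func main() {
	// %q makes the \r\n separators visible.
	fmt.Printf("%q\n", encodePush("message", "news", "Hello!"))
	fmt.Printf("%q\n", encodePush("subscribe", "news", 1))
}
```

Building the reply as a string first also makes the encoding easy to unit test without a network connection.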

Usage Example

┌─────────────────────────────────────────────────────────────────────────────┐
│                          PUB/SUB IN ACTION                                   │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   SUBSCRIBER 1                      SUBSCRIBER 2                            │
│   ────────────                      ────────────                            │
│   > SUBSCRIBE news                  > PSUBSCRIBE news:*                     │
│   1) "subscribe"                    1) "psubscribe"                         │
│   2) "news"                         2) "news:*"                             │
│   3) (integer) 1                    3) (integer) 1                          │
│                                                                              │
│   PUBLISHER                                                                 │
│   ─────────                                                                 │
│   > PUBLISH news "Hello!"                                                   │
│   (integer) 1     ← Only sub1 (exact match)                                │
│                                                                              │
│   > PUBLISH news:sports "Goal!"                                             │
│   (integer) 1     ← Only sub2 (pattern match)                              │
│                                                                              │
│   SUBSCRIBER 1 receives:            SUBSCRIBER 2 receives:                  │
│   1) "message"                      1) "pmessage"                           │
│   2) "news"                         2) "news:*"                             │
│   3) "Hello!"                       3) "news:sports"                        │
│                                     4) "Goal!"                              │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

Real-World Use Cases

Real-time Chat

Each chat room is a channel. Users subscribe to rooms they’re in.

Live Notifications

Push updates to connected clients (price changes, status updates).

Cache Invalidation

Publish when cache entries change; subscribers update local caches.

Event Broadcasting

Decouple services - publisher doesn’t know who’s listening.

Exercises

In Redis Cluster, implement shard-aware channels:
// PUBSUB SHARDCHANNELS [pattern]
// Return channels that hash to the current shard

Store recent messages so new subscribers can catch up:
// Option 1: Store last N messages per channel
// Option 2: Use Redis Streams (more advanced)

Track which clients are subscribed to what:
// CLIENT GETNAME
// CLIENT SETNAME connection-name
// CLIENT LIST
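
As a possible starting point for the "store last N messages per channel" option, here is a minimal sketch of a bounded per-channel history. The `History` type and its methods are hypothetical names, and the sketch is not safe for concurrent use; in the hub it would need to be guarded by the existing mutex.

```go
package main

import "fmt"

// History keeps the most recent messages for each channel.
// Hypothetical helper for the exercise; not goroutine-safe on its own.
type History struct {
	limit int
	byCh  map[string][]string
}

// NewHistory creates a history that retains up to limit messages per channel.
func NewHistory(limit int) *History {
	return &History{limit: limit, byCh: make(map[string][]string)}
}

// Add appends a message and discards the oldest ones beyond the limit.
func (h *History) Add(channel, msg string) {
	msgs := append(h.byCh[channel], msg)
	if len(msgs) > h.limit {
		msgs = msgs[len(msgs)-h.limit:]
	}
	h.byCh[channel] = msgs
}

// Recent returns a copy of the retained messages, oldest first.
func (h *History) Recent(channel string) []string {
	return append([]string(nil), h.byCh[channel]...)
}

func main() {
	h := NewHistory(2)
	for _, m := range []string{"a", "b", "c"} {
		h.Add("news", m)
	}
	fmt.Println(h.Recent("news"))
}
```

One way to wire this in would be to call `Add` from Publish and replay `Recent` to a subscriber right after its subscription confirmation, though how replayed messages are framed is left open by the exercise.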

Key Takeaways

Fire and Forget

Messages aren’t stored - if no one is listening, they’re lost

Pattern Matching

PSUBSCRIBE enables flexible channel matching with globs

Push Model

Server pushes to clients - no polling needed

Decoupling

Publishers and subscribers are completely independent

Limitations to Consider

┌─────────────────────────────────────────────────────────────────────────────┐
│                        PUB/SUB LIMITATIONS                                   │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   1. NO PERSISTENCE                                                         │
│      - Messages aren't saved                                                │
│      - If subscriber disconnects, messages are lost                         │
│      → Consider Redis Streams for durability                                │
│                                                                              │
│   2. NO ACKNOWLEDGMENTS                                                     │
│      - No way to confirm delivery                                           │
│      - At-most-once delivery semantics                                      │
│      → Use message queues (RabbitMQ, Kafka) for guaranteed delivery        │
│                                                                              │
│   3. BLOCKING IN SUB MODE                                                   │
│      - Can't run other commands while subscribed                            │
│      - Need separate connection for normal operations                       │
│                                                                              │
│   4. MEMORY PRESSURE                                                        │
│      - Slow subscribers can cause message buffer to grow                    │
│      - No backpressure mechanism                                            │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

Congratulations! 🎉

You’ve built a working Redis clone with:
  • ✅ RESP protocol parser
  • ✅ TCP server with concurrent connections
  • ✅ Multiple data structures (Strings, Lists, Sets, Hashes, Sorted Sets)
  • ✅ Persistence (RDB snapshots, AOF logging)
  • ✅ Pub/Sub messaging

Redis Project Complete!

You now understand how Redis works internally!

What’s Next?

Continue with other Build Your Own X projects: