Skip to main content

Chapter 2: TCP Server

Now that we have a RESP parser, let’s build the network layer. We’ll create a TCP server that accepts client connections and processes commands concurrently.
Prerequisites: Chapter 1: RESP Protocol
Further Reading: Networking
Time: 2-3 hours
Outcome: A TCP server that responds to PING with PONG

Server Architecture

┌─────────────────────────────────────────────────────────────────────────────┐
│                         REDIS SERVER ARCHITECTURE                            │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   ┌──────────────┐                                                          │
│   │   Client 1   │───┐                                                      │
│   └──────────────┘   │                                                      │
│                      │    ┌─────────────────────────────────────────────┐  │
│   ┌──────────────┐   │    │              MAIN PROCESS                   │  │
│   │   Client 2   │───┼───►│                                             │  │
│   └──────────────┘   │    │   ┌─────────────┐    ┌────────────────┐    │  │
│                      │    │   │   Accept    │    │   Command      │    │  │
│   ┌──────────────┐   │    │   │   Loop      │───►│   Handler      │    │  │
│   │   Client 3   │───┘    │   └─────────────┘    └───────┬────────┘    │  │
│   └──────────────┘        │                              │             │  │
│                           │                              ▼             │  │
│                           │                      ┌────────────────┐    │  │
│                           │                      │   Data Store   │    │  │
│                           │                      │   (in-memory)  │    │  │
│                           │                      └────────────────┘    │  │
│                           │                                             │  │
│                           └─────────────────────────────────────────────┘  │
│                                                                              │
│   DESIGN CHOICE: One goroutine per connection (simple & scalable)           │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

Why Go for Redis?

Go is perfect for building Redis because:
  1. Goroutines: Lightweight threads for handling thousands of connections
  2. Channels: Safe communication between goroutines
  3. net package: Excellent networking primitives
  4. Sync package: Mutexes, atomic operations for thread-safety
Real Redis is single-threaded for simplicity. Our Go version uses goroutines but will still serialize writes to ensure consistency.

Implementation

Step 1: Server Structure

internal/server/server.go
package server

import (
    "fmt"
    "io"
    "log"
    "net"
    "sync"
    "sync/atomic"
    
    "myredis/internal/protocol"
)

// Server represents a Redis server
type Server struct {
    addr     string
    listener net.Listener
    
    // Client management
    clients   map[uint64]*Client
    clientsMu sync.RWMutex
    clientID  uint64
    
    // Command handling
    handlers map[string]CommandHandler
    
    // Shared data store
    store *Store
    
    // Shutdown
    shutdown chan struct{}
    wg       sync.WaitGroup
}

// CommandHandler is the function signature for command handlers
type CommandHandler func(client *Client, args []protocol.Value) protocol.Value

// NewServer creates a new Redis server
func NewServer(addr string) *Server {
    s := &Server{
        addr:     addr,
        clients:  make(map[uint64]*Client),
        handlers: make(map[string]CommandHandler),
        store:    NewStore(),
        shutdown: make(chan struct{}),
    }
    
    // Register built-in commands
    s.registerCommands()
    
    return s
}

// RegisterCommand registers a command handler
func (s *Server) RegisterCommand(name string, handler CommandHandler) {
    s.handlers[name] = handler
}

// Start begins listening for connections
func (s *Server) Start() error {
    var err error
    s.listener, err = net.Listen("tcp", s.addr)
    if err != nil {
        return fmt.Errorf("failed to listen on %s: %w", s.addr, err)
    }
    
    log.Printf("Redis server listening on %s", s.addr)
    
    // Accept loop
    for {
        conn, err := s.listener.Accept()
        if err != nil {
            select {
            case <-s.shutdown:
                return nil
            default:
                log.Printf("Accept error: %v", err)
                continue
            }
        }
        
        // Handle each connection in a goroutine
        s.wg.Add(1)
        go s.handleConnection(conn)
    }
}

// Stop gracefully shuts down the server
func (s *Server) Stop() {
    close(s.shutdown)
    s.listener.Close()
    
    // Close all client connections
    s.clientsMu.Lock()
    for _, client := range s.clients {
        client.Close()
    }
    s.clientsMu.Unlock()
    
    s.wg.Wait()
    log.Println("Server stopped")
}

func (s *Server) handleConnection(conn net.Conn) {
    defer s.wg.Done()
    
    // Create client
    id := atomic.AddUint64(&s.clientID, 1)
    client := NewClient(id, conn, s)
    
    // Register client
    s.clientsMu.Lock()
    s.clients[id] = client
    s.clientsMu.Unlock()
    
    log.Printf("Client %d connected from %s", id, conn.RemoteAddr())
    
    // Handle client
    client.Handle()
    
    // Unregister client
    s.clientsMu.Lock()
    delete(s.clients, id)
    s.clientsMu.Unlock()
    
    log.Printf("Client %d disconnected", id)
}

Step 2: Client Handler

internal/server/client.go
package server

import (
    "io"
    "net"
    "strings"
    "time"
    
    "myredis/internal/protocol"
)

// Client represents a connected Redis client
type Client struct {
    id     uint64
    conn   net.Conn
    server *Server
    parser *protocol.Parser
    writer *protocol.Writer
}

// NewClient creates a new client handler
func NewClient(id uint64, conn net.Conn, server *Server) *Client {
    return &Client{
        id:     id,
        conn:   conn,
        server: server,
        parser: protocol.NewParser(conn),
        writer: protocol.NewWriter(conn),
    }
}

// Handle processes client commands in a loop
func (c *Client) Handle() {
    defer c.conn.Close()
    
    for {
        // Set read deadline for idle timeout (optional)
        c.conn.SetReadDeadline(time.Now().Add(5 * time.Minute))
        
        // Parse next command
        cmd, err := c.parser.Parse()
        if err != nil {
            if err != io.EOF {
                // Only log if it's not a clean disconnect
                if !isConnectionClosed(err) {
                    c.respond(protocol.NewError("ERR " + err.Error()))
                }
            }
            return
        }
        
        // Process and respond
        response := c.processCommand(cmd)
        if err := c.respond(response); err != nil {
            return
        }
    }
}

// processCommand routes the command to the appropriate handler
func (c *Client) processCommand(cmd protocol.Value) protocol.Value {
    // Commands must be arrays
    if cmd.Type != protocol.Array || len(cmd.Array) == 0 {
        return protocol.NewError("ERR invalid command format")
    }
    
    // First element is the command name
    cmdName := strings.ToUpper(cmd.Array[0].AsString())
    args := cmd.Array[1:]
    
    // Look up handler
    handler, exists := c.server.handlers[cmdName]
    if !exists {
        return protocol.NewError(
            "ERR unknown command '" + cmdName + "'",
        )
    }
    
    // Execute handler
    return handler(c, args)
}

// respond sends a response to the client
func (c *Client) respond(val protocol.Value) error {
    return c.writer.Write(val)
}

// Close closes the client connection
func (c *Client) Close() error {
    return c.conn.Close()
}

// isConnectionClosed checks if the error indicates a closed connection
func isConnectionClosed(err error) bool {
    if err == nil {
        return false
    }
    return strings.Contains(err.Error(), "use of closed network connection") ||
           strings.Contains(err.Error(), "connection reset")
}

Step 3: In-Memory Store

internal/server/store.go
package server

import (
    "sync"
    "time"
)

// Entry represents a stored value with optional expiration
type Entry struct {
    Value    []byte
    ExpireAt *time.Time
}

// IsExpired checks if the entry has expired
func (e *Entry) IsExpired() bool {
    if e.ExpireAt == nil {
        return false
    }
    return time.Now().After(*e.ExpireAt)
}

// Store is the in-memory key-value store
type Store struct {
    data map[string]*Entry
    mu   sync.RWMutex
}

// NewStore creates a new store
func NewStore() *Store {
    s := &Store{
        data: make(map[string]*Entry),
    }
    
    // Start background expiration cleanup
    go s.expireLoop()
    
    return s
}

// Get retrieves a value by key
func (s *Store) Get(key string) ([]byte, bool) {
    s.mu.RLock()
    entry, exists := s.data[key]
    s.mu.RUnlock()
    
    if !exists {
        return nil, false
    }
    
    // Check expiration (lazy expiration)
    if entry.IsExpired() {
        s.Delete(key)
        return nil, false
    }
    
    return entry.Value, true
}

// Set stores a value with optional TTL
func (s *Store) Set(key string, value []byte, ttl *time.Duration) {
    entry := &Entry{Value: value}
    
    if ttl != nil {
        expireAt := time.Now().Add(*ttl)
        entry.ExpireAt = &expireAt
    }
    
    s.mu.Lock()
    s.data[key] = entry
    s.mu.Unlock()
}

// Delete removes a key
func (s *Store) Delete(keys ...string) int {
    s.mu.Lock()
    defer s.mu.Unlock()
    
    deleted := 0
    for _, key := range keys {
        if _, exists := s.data[key]; exists {
            delete(s.data, key)
            deleted++
        }
    }
    return deleted
}

// Exists checks if keys exist
func (s *Store) Exists(keys ...string) int {
    s.mu.RLock()
    defer s.mu.RUnlock()
    
    count := 0
    for _, key := range keys {
        if entry, exists := s.data[key]; exists && !entry.IsExpired() {
            count++
        }
    }
    return count
}

// Keys returns all keys matching a pattern (simple implementation)
func (s *Store) Keys(pattern string) []string {
    s.mu.RLock()
    defer s.mu.RUnlock()
    
    var keys []string
    for key, entry := range s.data {
        if !entry.IsExpired() && matchPattern(pattern, key) {
            keys = append(keys, key)
        }
    }
    return keys
}

// expireLoop periodically cleans up expired keys
func (s *Store) expireLoop() {
    ticker := time.NewTicker(1 * time.Second)
    defer ticker.Stop()
    
    for range ticker.C {
        s.cleanupExpired()
    }
}

func (s *Store) cleanupExpired() {
    s.mu.Lock()
    defer s.mu.Unlock()
    
    now := time.Now()
    for key, entry := range s.data {
        if entry.ExpireAt != nil && now.After(*entry.ExpireAt) {
            delete(s.data, key)
        }
    }
}

// matchPattern implements simple glob matching (* only)
func matchPattern(pattern, key string) bool {
    if pattern == "*" {
        return true
    }
    // Simple prefix match for pattern like "user:*"
    if len(pattern) > 0 && pattern[len(pattern)-1] == '*' {
        prefix := pattern[:len(pattern)-1]
        return len(key) >= len(prefix) && key[:len(prefix)] == prefix
    }
    return pattern == key
}

Step 4: Register Commands

internal/server/commands.go
package server

import (
    "strconv"
    "strings"
    "time"
    
    "myredis/internal/protocol"
)

// registerCommands sets up all built-in command handlers
func (s *Server) registerCommands() {
    // Connection commands
    s.RegisterCommand("PING", s.cmdPing)
    s.RegisterCommand("ECHO", s.cmdEcho)
    s.RegisterCommand("QUIT", s.cmdQuit)
    
    // String commands
    s.RegisterCommand("GET", s.cmdGet)
    s.RegisterCommand("SET", s.cmdSet)
    s.RegisterCommand("DEL", s.cmdDel)
    s.RegisterCommand("EXISTS", s.cmdExists)
    s.RegisterCommand("KEYS", s.cmdKeys)
    
    // Server commands
    s.RegisterCommand("INFO", s.cmdInfo)
    s.RegisterCommand("DBSIZE", s.cmdDbSize)
    s.RegisterCommand("FLUSHDB", s.cmdFlushDb)
}

// PING [message]
func (s *Server) cmdPing(c *Client, args []protocol.Value) protocol.Value {
    if len(args) == 0 {
        return protocol.PONG
    }
    return protocol.NewBulkString(args[0].AsString())
}

// ECHO message
func (s *Server) cmdEcho(c *Client, args []protocol.Value) protocol.Value {
    if len(args) < 1 {
        return protocol.NewError("ERR wrong number of arguments for 'echo' command")
    }
    return protocol.NewBulkString(args[0].AsString())
}

// QUIT
func (s *Server) cmdQuit(c *Client, args []protocol.Value) protocol.Value {
    c.Close()
    return protocol.OK
}

// GET key
func (s *Server) cmdGet(c *Client, args []protocol.Value) protocol.Value {
    if len(args) < 1 {
        return protocol.NewError("ERR wrong number of arguments for 'get' command")
    }
    
    key := args[0].AsString()
    value, exists := s.store.Get(key)
    
    if !exists {
        return protocol.NullBulk
    }
    
    return protocol.NewBulkBytes(value)
}

// SET key value [EX seconds] [PX milliseconds] [NX|XX]
func (s *Server) cmdSet(c *Client, args []protocol.Value) protocol.Value {
    if len(args) < 2 {
        return protocol.NewError("ERR wrong number of arguments for 'set' command")
    }
    
    key := args[0].AsString()
    value := args[1].AsBytes()
    
    var ttl *time.Duration
    nx := false // Only set if not exists
    xx := false // Only set if exists
    
    // Parse optional arguments
    for i := 2; i < len(args); i++ {
        opt := strings.ToUpper(args[i].AsString())
        
        switch opt {
        case "EX":
            if i+1 >= len(args) {
                return protocol.NewError("ERR syntax error")
            }
            seconds, err := strconv.Atoi(args[i+1].AsString())
            if err != nil {
                return protocol.NewError("ERR value is not an integer")
            }
            d := time.Duration(seconds) * time.Second
            ttl = &d
            i++
            
        case "PX":
            if i+1 >= len(args) {
                return protocol.NewError("ERR syntax error")
            }
            ms, err := strconv.Atoi(args[i+1].AsString())
            if err != nil {
                return protocol.NewError("ERR value is not an integer")
            }
            d := time.Duration(ms) * time.Millisecond
            ttl = &d
            i++
            
        case "NX":
            nx = true
        case "XX":
            xx = true
        }
    }
    
    // Check NX/XX conditions
    _, exists := s.store.Get(key)
    if nx && exists {
        return protocol.NullBulk
    }
    if xx && !exists {
        return protocol.NullBulk
    }
    
    s.store.Set(key, value, ttl)
    return protocol.OK
}

// DEL key [key ...]
func (s *Server) cmdDel(c *Client, args []protocol.Value) protocol.Value {
    if len(args) < 1 {
        return protocol.NewError("ERR wrong number of arguments for 'del' command")
    }
    
    keys := make([]string, len(args))
    for i, arg := range args {
        keys[i] = arg.AsString()
    }
    
    deleted := s.store.Delete(keys...)
    return protocol.NewInteger(int64(deleted))
}

// EXISTS key [key ...]
func (s *Server) cmdExists(c *Client, args []protocol.Value) protocol.Value {
    if len(args) < 1 {
        return protocol.NewError("ERR wrong number of arguments for 'exists' command")
    }
    
    keys := make([]string, len(args))
    for i, arg := range args {
        keys[i] = arg.AsString()
    }
    
    count := s.store.Exists(keys...)
    return protocol.NewInteger(int64(count))
}

// KEYS pattern
func (s *Server) cmdKeys(c *Client, args []protocol.Value) protocol.Value {
    if len(args) < 1 {
        return protocol.NewError("ERR wrong number of arguments for 'keys' command")
    }
    
    pattern := args[0].AsString()
    keys := s.store.Keys(pattern)
    
    result := make([]protocol.Value, len(keys))
    for i, key := range keys {
        result[i] = protocol.NewBulkString(key)
    }
    
    return protocol.NewArray(result)
}

// INFO [section]
func (s *Server) cmdInfo(c *Client, args []protocol.Value) protocol.Value {
    info := "# Server\r\nredis_version:0.1.0\r\nmyredis_version:1.0\r\n"
    return protocol.NewBulkString(info)
}

// DBSIZE
func (s *Server) cmdDbSize(c *Client, args []protocol.Value) protocol.Value {
    count := len(s.store.Keys("*"))
    return protocol.NewInteger(int64(count))
}

// FLUSHDB
func (s *Server) cmdFlushDb(c *Client, args []protocol.Value) protocol.Value {
    keys := s.store.Keys("*")
    s.store.Delete(keys...)
    return protocol.OK
}

Step 5: Main Entry Point

cmd/server/main.go
package main

import (
    "flag"
    "log"
    "os"
    "os/signal"
    "syscall"
    
    "myredis/internal/server"
)

func main() {
    port := flag.String("port", "6379", "Port to listen on")
    flag.Parse()
    
    addr := ":" + *port
    srv := server.NewServer(addr)
    
    // Handle graceful shutdown
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
    
    go func() {
        <-sigChan
        log.Println("Shutting down...")
        srv.Stop()
    }()
    
    // Start server
    if err := srv.Start(); err != nil {
        log.Fatalf("Server error: %v", err)
    }
}

Testing Your Server

Build and Run

go build -o myredis ./cmd/server
./myredis
# Redis server listening on :6379

Test with redis-cli

# In another terminal
redis-cli

> PING
PONG

> SET name "Alice"
OK

> GET name
"Alice"

> SET counter 100 EX 10
OK

> GET counter
"100"

# Wait 10 seconds...
> GET counter
(nil)

Exercises

Implement atomic increment/decrement:
// INCR key
// DECR key
// INCRBY key increment
// DECRBY key decrement

// These should:
// 1. Parse the value as an integer
// 2. Increment/decrement atomically
// 3. Return the new value
// 4. Create the key with value 0 if it doesn't exist
// TTL key - Return remaining TTL in seconds
// PTTL key - Return remaining TTL in milliseconds
// EXPIRE key seconds - Set expiration
// PEXPIRE key milliseconds - Set expiration in ms
// PERSIST key - Remove expiration
Create a client connection pool:
type Pool struct {
    maxIdle    int
    maxActive  int
    conns      chan net.Conn
}

func (p *Pool) Get() (net.Conn, error)
func (p *Pool) Put(conn net.Conn)

Key Takeaways

Goroutine per Connection

Go makes concurrent connection handling simple and efficient

Thread-Safe Store

Use sync.RWMutex for concurrent read access, exclusive writes

Lazy Expiration

Check expiration on access + background cleanup

Command Routing

Map command names to handler functions for clean design

Further Reading


What’s Next?

In Chapter 3: Data Structures, we’ll implement:
  • Lists (LPUSH, RPUSH, LPOP, LRANGE)
  • Sets (SADD, SMEMBERS, SINTER)
  • Hashes (HSET, HGET, HGETALL)
  • Sorted Sets with skip lists

Next: Data Structures

Implement Redis data structures beyond strings