Chapter 4: Persistence

A database that loses all data on restart isn’t very useful! In this chapter, we’ll implement both persistence strategies Redis uses: RDB snapshots and AOF (Append-Only File) logging.
Prerequisites: Chapter 3: Data Structures
Further Reading: Database Engineering: Storage Engines
Time: 3-4 hours
Outcome: Data survives server restarts

Persistence Strategies Overview

┌─────────────────────────────────────────────────────────────────────────────┐
│                         PERSISTENCE COMPARISON                               │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   RDB SNAPSHOTS                        AOF (Append-Only File)               │
│   ─────────────                        ────────────────────────              │
│                                                                              │
│   ┌───────────────┐                    ┌───────────────────┐                │
│   │  Full dump    │                    │ SET foo bar       │                │
│   │  at time T    │                    │ SET baz qux       │                │
│   │               │                    │ INCR counter      │                │
│   │  Binary       │                    │ DEL foo           │                │
│   │  Compact      │                    │ ...               │                │
│   └───────────────┘                    └───────────────────┘                │
│                                                                              │
│   PROS:                                PROS:                                 │
│   ✓ Compact file size                  ✓ Every write is logged             │
│   ✓ Fast recovery                      ✓ Minimal data loss                  │
│   ✓ Perfect for backups                ✓ Human-readable                     │
│                                                                              │
│   CONS:                                CONS:                                 │
│   ✗ Data loss between saves            ✗ Larger file size                   │
│   ✗ Fork can be slow on large data     ✗ Slower recovery                    │
│                                                                              │
│   BEST FOR:                            BEST FOR:                            │
│   → Periodic backups                   → Maximum durability                  │
│   → Disaster recovery                  → Audit logs                          │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

Part 1: RDB Snapshots

RDB creates a point-in-time snapshot of all data in a binary format.

RDB File Format

┌─────────────────────────────────────────────────────────────────────────────┐
│                           RDB FILE FORMAT                                    │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   ┌──────────┬────────────┬────────────┬─────────────────────┬─────────────┤
│   │  REDIS   │  Version   │  Database  │     Key-Value       │    EOF      │
│   │  Magic   │  (4 bytes) │  Selector  │       Pairs         │  Checksum   │
│   │ (5 bytes)│            │            │                     │  (8 bytes)  │
│   └──────────┴────────────┴────────────┴─────────────────────┴─────────────┤
│                                                                              │
│   Key-Value Entry:                                                          │
│   ┌──────────┬────────────┬──────────────┬─────────────────────┐           │
│   │ Value    │ Key Length │    Key       │       Value         │           │
│   │ Type     │  (1-5 B)   │   (N bytes)  │    (varies)         │           │
│   └──────────┴────────────┴──────────────┴─────────────────────┘           │
│                                                                              │
│   Value Types:                                                               │
│   0 = String                                                                 │
│   1 = List                                                                   │
│   2 = Set                                                                    │
│   3 = Sorted Set (ZSet)                                                     │
│   4 = Hash                                                                   │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘

RDB Encoder

internal/persistence/rdb.go
package persistence

import (
    "encoding/binary"
    "fmt"
    "hash/crc64"
    "os"
)

const (
    RDBVersion   = 9
    RDBMagic     = "REDIS"
    
    // Type constants
    TypeString   = 0
    TypeList     = 1
    TypeSet      = 2
    TypeZSet     = 3
    TypeHash     = 4
    
    // Special values
    OpCodeEOF    = 255
    OpCodeDB     = 254
    OpCodeExpire = 253
)

// RDBWriter handles writing RDB files
type RDBWriter struct {
    file    *os.File
    crc     *crc64.Table
    checksum uint64
}

// NewRDBWriter creates a new RDB writer
func NewRDBWriter(path string) (*RDBWriter, error) {
    file, err := os.Create(path + ".tmp")
    if err != nil {
        return nil, err
    }
    
    return &RDBWriter{
        file: file,
        crc:  crc64.MakeTable(crc64.ECMA),
    }, nil
}

// WriteHeader writes the RDB header
func (w *RDBWriter) WriteHeader() error {
    // Magic string
    if _, err := w.write([]byte(RDBMagic)); err != nil {
        return err
    }
    
    // Version (4 bytes, zero-padded)
    version := []byte("0009")
    if _, err := w.write(version); err != nil {
        return err
    }
    
    return nil
}

// SelectDB writes database selector
func (w *RDBWriter) SelectDB(dbNum int) error {
    if err := w.writeByte(OpCodeDB); err != nil {
        return err
    }
    return w.writeLength(dbNum)
}

// WriteString writes a string key-value pair
func (w *RDBWriter) WriteString(key string, value []byte) error {
    // Type
    if err := w.writeByte(TypeString); err != nil {
        return err
    }
    
    // Key
    if err := w.writeString(key); err != nil {
        return err
    }
    
    // Value (as length-prefixed string)
    return w.writeLengthPrefixedBytes(value)
}

// WriteList writes a list
func (w *RDBWriter) WriteList(key string, values [][]byte) error {
    if err := w.writeByte(TypeList); err != nil {
        return err
    }
    
    if err := w.writeString(key); err != nil {
        return err
    }
    
    // List length
    if err := w.writeLength(len(values)); err != nil {
        return err
    }
    
    // Elements
    for _, v := range values {
        if err := w.writeLengthPrefixedBytes(v); err != nil {
            return err
        }
    }
    
    return nil
}

// WriteSet writes a set
func (w *RDBWriter) WriteSet(key string, members []string) error {
    if err := w.writeByte(TypeSet); err != nil {
        return err
    }
    
    if err := w.writeString(key); err != nil {
        return err
    }
    
    if err := w.writeLength(len(members)); err != nil {
        return err
    }
    
    for _, m := range members {
        if err := w.writeString(m); err != nil {
            return err
        }
    }
    
    return nil
}

// WriteHash writes a hash
func (w *RDBWriter) WriteHash(key string, fields map[string][]byte) error {
    if err := w.writeByte(TypeHash); err != nil {
        return err
    }
    
    if err := w.writeString(key); err != nil {
        return err
    }
    
    if err := w.writeLength(len(fields)); err != nil {
        return err
    }
    
    for field, value := range fields {
        if err := w.writeString(field); err != nil {
            return err
        }
        if err := w.writeLengthPrefixedBytes(value); err != nil {
            return err
        }
    }
    
    return nil
}

// WriteZSet writes a sorted set
func (w *RDBWriter) WriteZSet(key string, members []struct{ Member string; Score float64 }) error {
    if err := w.writeByte(TypeZSet); err != nil {
        return err
    }
    
    if err := w.writeString(key); err != nil {
        return err
    }
    
    if err := w.writeLength(len(members)); err != nil {
        return err
    }
    
    for _, m := range members {
        if err := w.writeString(m.Member); err != nil {
            return err
        }
        // Write score as string for simplicity
        scoreBytes := []byte(fmt.Sprintf("%.17g", m.Score))
        if err := w.writeLengthPrefixedBytes(scoreBytes); err != nil {
            return err
        }
    }
    
    return nil
}

// WriteFooter writes EOF marker and checksum
func (w *RDBWriter) WriteFooter() error {
    if err := w.writeByte(OpCodeEOF); err != nil {
        return err
    }
    
    // Write checksum
    checksumBytes := make([]byte, 8)
    binary.LittleEndian.PutUint64(checksumBytes, w.checksum)
    if _, err := w.file.Write(checksumBytes); err != nil {
        return err
    }
    
    return nil
}

// Close finalizes the file
func (w *RDBWriter) Close(path string) error {
    if err := w.file.Sync(); err != nil {
        return err
    }
    if err := w.file.Close(); err != nil {
        return err
    }
    
    // Atomic rename
    return os.Rename(path+".tmp", path)
}

// Helper methods
func (w *RDBWriter) write(data []byte) (int, error) {
    w.checksum = crc64.Update(w.checksum, w.crc, data)
    return w.file.Write(data)
}

func (w *RDBWriter) writeByte(b byte) error {
    _, err := w.write([]byte{b})
    return err
}

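func (w *RDBWriter) writeLength(length int) error {
    // The top two bits of the first byte select the encoding
    if length < 64 {
        // 00xxxxxx: 6-bit length in one byte
        return w.writeByte(byte(length))
    } else if length < 16384 {
        // 01xxxxxx xxxxxxxx: 14-bit length in two bytes
        if err := w.writeByte(byte(0x40 | (length >> 8))); err != nil {
            return err
        }
        return w.writeByte(byte(length & 0xFF))
    }

    // 10000000 followed by a 32-bit big-endian length
    if err := w.writeByte(0x80); err != nil {
        return err
    }
    buf := make([]byte, 4)
    binary.BigEndian.PutUint32(buf, uint32(length))
    _, err := w.write(buf)
    return err
}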

func (w *RDBWriter) writeString(s string) error {
    if err := w.writeLength(len(s)); err != nil {
        return err
    }
    _, err := w.write([]byte(s))
    return err
}

func (w *RDBWriter) writeLengthPrefixedBytes(data []byte) error {
    if err := w.writeLength(len(data)); err != nil {
        return err
    }
    _, err := w.write(data)
    return err
}

RDB Reader

internal/persistence/rdb_reader.go
package persistence

import (
    "encoding/binary"
    "fmt"
    "hash/crc64"
    "io"
    "os"
)

// RDBReader handles reading RDB files
type RDBReader struct {
    file     *os.File
    crc      *crc64.Table
    checksum uint64
}

// NewRDBReader opens an RDB file for reading
func NewRDBReader(path string) (*RDBReader, error) {
    file, err := os.Open(path)
    if err != nil {
        return nil, err
    }
    
    return &RDBReader{
        file: file,
        crc:  crc64.MakeTable(crc64.ECMA),
    }, nil
}

// VerifyHeader checks the RDB magic string and version
func (r *RDBReader) VerifyHeader() error {
    magic := make([]byte, 5)
    if _, err := io.ReadFull(r.file, magic); err != nil {
        return err
    }
    
    if string(magic) != RDBMagic {
        return fmt.Errorf("invalid RDB magic: %s", magic)
    }
    
    version := make([]byte, 4)
    if _, err := io.ReadFull(r.file, version); err != nil {
        return err
    }
    
    // Update checksum
    r.checksum = crc64.Update(r.checksum, r.crc, magic)
    r.checksum = crc64.Update(r.checksum, r.crc, version)
    
    return nil
}

// ReadEntry reads the next entry from the RDB file
func (r *RDBReader) ReadEntry() (string, interface{}, error) {
    typeByte, err := r.readByte()
    if err != nil {
        return "", nil, err
    }
    
    switch typeByte {
    case OpCodeEOF:
        return "", nil, io.EOF
    case OpCodeDB:
        // Skip database number
        if _, err := r.readLength(); err != nil {
            return "", nil, err
        }
        return r.ReadEntry()
    case TypeString:
        key, err := r.readString()
        if err != nil {
            return "", nil, err
        }
        value, err := r.readBytes()
        if err != nil {
            return "", nil, err
        }
        return key, value, nil
    case TypeList:
        return r.readList()
    case TypeSet:
        return r.readSet()
    case TypeHash:
        return r.readHash()
    case TypeZSet:
        return r.readZSet()
    default:
        return "", nil, fmt.Errorf("unknown type: %d", typeByte)
    }
}

func (r *RDBReader) readList() (string, interface{}, error) {
    key, err := r.readString()
    if err != nil {
        return "", nil, err
    }
    
    length, err := r.readLength()
    if err != nil {
        return "", nil, err
    }
    
    values := make([][]byte, length)
    for i := 0; i < length; i++ {
        values[i], err = r.readBytes()
        if err != nil {
            return "", nil, err
        }
    }
    
    return key, values, nil
}

func (r *RDBReader) readSet() (string, interface{}, error) {
    key, err := r.readString()
    if err != nil {
        return "", nil, err
    }
    
    length, err := r.readLength()
    if err != nil {
        return "", nil, err
    }
    
    members := make([]string, length)
    for i := 0; i < length; i++ {
        members[i], err = r.readString()
        if err != nil {
            return "", nil, err
        }
    }
    
    return key, members, nil
}

func (r *RDBReader) readHash() (string, interface{}, error) {
    key, err := r.readString()
    if err != nil {
        return "", nil, err
    }
    
    length, err := r.readLength()
    if err != nil {
        return "", nil, err
    }
    
    fields := make(map[string][]byte, length)
    for i := 0; i < length; i++ {
        field, err := r.readString()
        if err != nil {
            return "", nil, err
        }
        value, err := r.readBytes()
        if err != nil {
            return "", nil, err
        }
        fields[field] = value
    }
    
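    return key, fields, nil
}

// readZSet reads a sorted set written by WriteZSet. Scores were stored as
// length-prefixed decimal strings, so they are parsed back into float64.
func (r *RDBReader) readZSet() (string, interface{}, error) {
    key, err := r.readString()
    if err != nil {
        return "", nil, err
    }
    
    length, err := r.readLength()
    if err != nil {
        return "", nil, err
    }
    
    members := make([]struct {
        Member string
        Score  float64
    }, length)
    for i := 0; i < length; i++ {
        if members[i].Member, err = r.readString(); err != nil {
            return "", nil, err
        }
        score, err := r.readString()
        if err != nil {
            return "", nil, err
        }
        if _, err := fmt.Sscanf(score, "%g", &members[i].Score); err != nil {
            return "", nil, fmt.Errorf("invalid zset score %q: %w", score, err)
        }
    }
    
    return key, members, nil
}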

// Helper methods
func (r *RDBReader) readByte() (byte, error) {
    buf := make([]byte, 1)
    if _, err := io.ReadFull(r.file, buf); err != nil {
        return 0, err
    }
    r.checksum = crc64.Update(r.checksum, r.crc, buf)
    return buf[0], nil
}

func (r *RDBReader) readLength() (int, error) {
    first, err := r.readByte()
    if err != nil {
        return 0, err
    }
    
    switch first >> 6 {
    case 0:
        return int(first & 0x3F), nil
    case 1:
        second, err := r.readByte()
        if err != nil {
            return 0, err
        }
        return int(first&0x3F)<<8 | int(second), nil
    case 2:
        buf := make([]byte, 4)
        if _, err := io.ReadFull(r.file, buf); err != nil {
            return 0, err
        }
        r.checksum = crc64.Update(r.checksum, r.crc, buf)
        return int(binary.BigEndian.Uint32(buf)), nil
    default:
        return 0, fmt.Errorf("invalid length encoding")
    }
}

func (r *RDBReader) readString() (string, error) {
    length, err := r.readLength()
    if err != nil {
        return "", err
    }
    
    buf := make([]byte, length)
    if _, err := io.ReadFull(r.file, buf); err != nil {
        return "", err
    }
    r.checksum = crc64.Update(r.checksum, r.crc, buf)
    
    return string(buf), nil
}

func (r *RDBReader) readBytes() ([]byte, error) {
    length, err := r.readLength()
    if err != nil {
        return nil, err
    }
    
    buf := make([]byte, length)
    if _, err := io.ReadFull(r.file, buf); err != nil {
        return nil, err
    }
    r.checksum = crc64.Update(r.checksum, r.crc, buf)
    
    return buf, nil
}

func (r *RDBReader) Close() error {
    return r.file.Close()
}
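
The reader above keeps a running checksum, but corruption is only detected once that value is compared with the 8-byte trailer. Here is a minimal sketch of that comparison on an in-memory file. `seal` and `verify` are hypothetical helpers, and the sketch uses the same `crc64.ECMA` table as the chapter's writer. As far as I know, real Redis uses a different CRC-64 variant, so this validates files produced by this chapter's code rather than by `redis-server`.

```go
package main

import (
    "encoding/binary"
    "errors"
    "fmt"
    "hash/crc64"
)

var table = crc64.MakeTable(crc64.ECMA)

// seal appends the little-endian CRC64 of body, as WriteFooter does.
func seal(body []byte) []byte {
    var tail [8]byte
    binary.LittleEndian.PutUint64(tail[:], crc64.Checksum(body, table))
    out := append([]byte{}, body...)
    return append(out, tail[:]...)
}

// verify recomputes the checksum over everything before the trailer
// and compares it with the stored value.
func verify(file []byte) error {
    if len(file) < 8 {
        return errors.New("file too short to hold a checksum")
    }
    body, tail := file[:len(file)-8], file[len(file)-8:]
    want := binary.LittleEndian.Uint64(tail)
    if got := crc64.Checksum(body, table); got != want {
        return fmt.Errorf("checksum mismatch: stored %016x, computed %016x", want, got)
    }
    return nil
}

func main() {
    file := seal([]byte("REDIS0009\xfe\x00\xff"))
    fmt.Println("intact:   ", verify(file))

    file[3] ^= 0x01 // flip a single bit in the body
    fmt.Println("corrupted:", verify(file))
}
```

In the streaming reader, the equivalent step would be to read the 8 trailer bytes after `OpCodeEOF` without feeding them into the running checksum, then compare.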

Part 2: Background Saves (BGSAVE)

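Redis forks a child process to write the snapshot, so the parent keeps serving clients while the child works from a copy-on-write view of memory. Go cannot safely fork a running program, so our version writes the snapshot in-process from a background goroutine.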
internal/persistence/bgsave.go
package persistence

import (
    "fmt"
    "log"
    "sync"
    "time"

    // Also import this project's store package
    // (the path depends on your module name).
)

// BackgroundSaver handles background RDB saves
type BackgroundSaver struct {
    store       *store.Store
    path        string
    lastSave    time.Time
    saving      bool
    mu          sync.Mutex
    
    // Auto-save conditions
    saveAfter   map[int]int // changes -> seconds
}

// NewBackgroundSaver creates a new background saver
func NewBackgroundSaver(store *store.Store, path string) *BackgroundSaver {
    return &BackgroundSaver{
        store:    store,
        path:     path,
        lastSave: time.Now(),
        saveAfter: map[int]int{
            1:     900,  // After 900 sec if at least 1 change
            10:    300,  // After 300 sec if at least 10 changes
            10000: 60,   // After 60 sec if at least 10000 changes
        },
    }
}

// Save performs a background save
func (bs *BackgroundSaver) Save() error {
    bs.mu.Lock()
    if bs.saving {
        bs.mu.Unlock()
        return fmt.Errorf("save already in progress")
    }
    bs.saving = true
    bs.mu.Unlock()
    
    defer func() {
        bs.mu.Lock()
        bs.saving = false
        bs.lastSave = time.Now()
        bs.mu.Unlock()
    }()
    
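    // Go cannot safely fork a running process the way C Redis does, so the
    // snapshot is written in-process. Save itself is synchronous; call it from
    // a goroutine (as StartAutoSave does) to keep it off the request path.
    // Without copy-on-write, the snapshot is only as consistent as the
    // locking inside store.ForEach.
    return bs.saveSnapshot()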
}

func (bs *BackgroundSaver) saveSnapshot() error {
    writer, err := NewRDBWriter(bs.path)
    if err != nil {
        return err
    }
    
    if err := writer.WriteHeader(); err != nil {
        return err
    }
    
    if err := writer.SelectDB(0); err != nil {
        return err
    }
    
    // Iterate over all keys, remembering the first write error
    var writeErr error
    bs.store.ForEach(func(key string, value interface{}) {
        if writeErr != nil {
            return
        }
        switch v := value.(type) {
        case []byte:
            writeErr = writer.WriteString(key, v)
        case *store.List:
            writeErr = writer.WriteList(key, v.LRange(0, -1))
        case *store.Set:
            writeErr = writer.WriteSet(key, v.Members())
        case *store.Hash:
            writeErr = writer.WriteHash(key, v.GetAll())
        case *store.ZSet:
            writeErr = writer.WriteZSet(key, v.Range(0, -1, true))
        }
    })
    if writeErr != nil {
        return writeErr
    }
    
    if err := writer.WriteFooter(); err != nil {
        return err
    }
    
    return writer.Close(bs.path)
}

// StartAutoSave starts the automatic save checker
func (bs *BackgroundSaver) StartAutoSave() {
    go func() {
        ticker := time.NewTicker(1 * time.Second)
        defer ticker.Stop()
        
        for range ticker.C {
            bs.checkAutoSave()
        }
    }()
}

func (bs *BackgroundSaver) checkAutoSave() {
    bs.mu.Lock()
    if bs.saving {
        bs.mu.Unlock()
        return
    }
    
    elapsed := int(time.Since(bs.lastSave).Seconds())
    changes := bs.store.ChangesSinceLastSave()
    bs.mu.Unlock()
    
    for requiredChanges, requiredSeconds := range bs.saveAfter {
        if changes >= requiredChanges && elapsed >= requiredSeconds {
            log.Printf("Auto-save triggered: %d changes in %d seconds", changes, elapsed)
            if err := bs.Save(); err != nil {
                log.Printf("Auto-save failed: %v", err)
            }
            return
        }
    }
}

Part 3: Append-Only File (AOF)

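AOF logs every write command. How much data a crash can cost depends on the sync policy: nothing that was acknowledged with SyncAlways, up to about a second of writes with SyncEverySec, and whatever the OS had not yet flushed with SyncNo.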

AOF Writer

internal/persistence/aof.go
package persistence

import (
    "bufio"
    "fmt"
    "os"
    "sync"
    "time"
)

// AOFSyncPolicy defines when to sync AOF to disk
type AOFSyncPolicy int

const (
    SyncAlways   AOFSyncPolicy = iota // Sync after every write
    SyncEverySec                       // Sync every second
    SyncNo                             // Let OS decide
)

// AOFWriter handles append-only file operations
type AOFWriter struct {
    file    *os.File
    writer  *bufio.Writer
    policy  AOFSyncPolicy
    mu      sync.Mutex
    
    // For SyncEverySec
    lastSync time.Time
}

// NewAOFWriter opens or creates an AOF file
func NewAOFWriter(path string, policy AOFSyncPolicy) (*AOFWriter, error) {
    file, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
    if err != nil {
        return nil, err
    }
    
    aof := &AOFWriter{
        file:     file,
        writer:   bufio.NewWriter(file),
        policy:   policy,
        lastSync: time.Now(),
    }
    
    if policy == SyncEverySec {
        go aof.syncLoop()
    }
    
    return aof, nil
}

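// AppendCommand logs a command to AOF in RESP format
func (a *AOFWriter) AppendCommand(args []string) error {
    a.mu.Lock()
    defer a.mu.Unlock()
    
    // Write as RESP array. bufio.Writer remembers the first write error
    // and reports it from Flush.
    fmt.Fprintf(a.writer, "*%d\r\n", len(args))
    for _, arg := range args {
        fmt.Fprintf(a.writer, "$%d\r\n%s\r\n", len(arg), arg)
    }
    
    switch a.policy {
    case SyncAlways:
        if err := a.writer.Flush(); err != nil {
            return err
        }
        return a.file.Sync()
    case SyncNo:
        // Hand the bytes to the OS now; it decides when they reach disk.
        // Without this flush they could sit in the user-space buffer indefinitely.
        return a.writer.Flush()
    }
    
    // SyncEverySec: syncLoop flushes and syncs once per second
    return nil
}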

func (a *AOFWriter) syncLoop() {
    ticker := time.NewTicker(1 * time.Second)
    defer ticker.Stop()
    
    for range ticker.C {
        a.mu.Lock()
        if time.Since(a.lastSync) >= time.Second {
            a.writer.Flush()
            a.file.Sync()
            a.lastSync = time.Now()
        }
        a.mu.Unlock()
    }
}

// Close closes the AOF file
func (a *AOFWriter) Close() error {
    a.mu.Lock()
    defer a.mu.Unlock()
    
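    if err := a.writer.Flush(); err != nil {
        a.file.Close()
        return err
    }
    if err := a.file.Sync(); err != nil {
        a.file.Close()
        return err
    }
    return a.file.Close()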
}

AOF Loader

internal/persistence/aof_loader.go
package persistence

import (
    "bufio"
    "fmt"
    "io"
    "os"
    "strconv"
    "strings"
)

// AOFLoader replays AOF commands to rebuild state
type AOFLoader struct {
    file   *os.File
    reader *bufio.Reader
}

// NewAOFLoader opens an AOF file for reading
func NewAOFLoader(path string) (*AOFLoader, error) {
    file, err := os.Open(path)
    if err != nil {
        if os.IsNotExist(err) {
            return nil, nil // No AOF file, that's okay
        }
        return nil, err
    }
    
    return &AOFLoader{
        file:   file,
        reader: bufio.NewReader(file),
    }, nil
}

// ReadCommand reads the next command from AOF.
// It returns io.EOF only at a command boundary; a file that ends
// mid-command yields io.ErrUnexpectedEOF.
func (l *AOFLoader) ReadCommand() ([]string, error) {
    // Read array marker
    line, err := l.reader.ReadString('\n')
    if err != nil {
        if err == io.EOF && line != "" {
            return nil, io.ErrUnexpectedEOF // truncated header
        }
        return nil, err
    }
    
    line = strings.TrimSuffix(line, "\r\n")
    
    if !strings.HasPrefix(line, "*") {
        return nil, fmt.Errorf("expected array, got: %s", line)
    }
    
    count, err := strconv.Atoi(line[1:])
    if err != nil {
        return nil, err
    }
    if count < 0 {
        return nil, fmt.Errorf("invalid array length: %d", count)
    }
    
    args := make([]string, count)
    for i := 0; i < count; i++ {
        // Read bulk string marker
        line, err := l.reader.ReadString('\n')
        if err != nil {
            if err == io.EOF {
                return nil, io.ErrUnexpectedEOF
            }
            return nil, err
        }
        line = strings.TrimSuffix(line, "\r\n")
        
        if !strings.HasPrefix(line, "$") {
            return nil, fmt.Errorf("expected bulk string, got: %s", line)
        }
        
        length, err := strconv.Atoi(line[1:])
        if err != nil {
            return nil, err
        }
        if length < 0 {
            return nil, fmt.Errorf("invalid bulk string length: %d", length)
        }
        
        // Read the actual string
        buf := make([]byte, length+2) // +2 for \r\n
        if _, err := io.ReadFull(l.reader, buf); err != nil {
            if err == io.EOF {
                return nil, io.ErrUnexpectedEOF
            }
            return nil, err
        }
        
        args[i] = string(buf[:length])
    }
    
    return args, nil
}

// Close closes the AOF file
func (l *AOFLoader) Close() error {
    return l.file.Close()
}

// ReplayAOF replays all commands from AOF to rebuild state
func ReplayAOF(path string, handler func([]string) error) (int, error) {
    loader, err := NewAOFLoader(path)
    if err != nil {
        return 0, err
    }
    if loader == nil {
        return 0, nil // No AOF file
    }
    defer loader.Close()
    
    count := 0
    for {
        cmd, err := loader.ReadCommand()
        if err == io.EOF {
            break
        }
        if err != nil {
            return count, err
        }
        
        if err := handler(cmd); err != nil {
            return count, err
        }
        count++
    }
    
    return count, nil
}

Part 4: AOF Rewriting

Over time, AOF files grow. Rewriting compacts them:
┌─────────────────────────────────────────────────────────────────────────────┐
│                            AOF REWRITING                                     │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│   BEFORE (AOF file):                    AFTER (Rewritten):                  │
│   ─────────────────                     ─────────────────                   │
│   SET counter 1                         SET counter 100                     │
│   INCR counter                          SET name "final"                    │
│   INCR counter                          RPUSH list "a" "b" "c"              │
│   SET name "foo"                                                            │
│   INCR counter                          Much smaller!                       │
│   ...                                   Only final state.                   │
│   SET name "final"                                                          │
│   INCR counter (x97 more)                                                   │
│   RPUSH list "a"                                                            │
│   RPUSH list "b"                                                            │
│   RPUSH list "c"                                                            │
│                                                                              │
│   Rewriting creates a new AOF from current database state,                  │
│   not by reading the old AOF.                                               │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
internal/persistence/aof_rewrite.go
package persistence

import (
    "fmt"
    "os"

    // Also import this project's store package
    // (the path depends on your module name).
)

// AOFRewriter handles AOF compaction
type AOFRewriter struct {
    store *store.Store
    path  string
}

// NewAOFRewriter creates a new AOF rewriter
func NewAOFRewriter(store *store.Store, path string) *AOFRewriter {
    return &AOFRewriter{store: store, path: path}
}

// Rewrite creates a new compacted AOF file
func (r *AOFRewriter) Rewrite() error {
    tmpPath := r.path + ".rewrite"
    
    // Remove leftovers from an interrupted rewrite: NewAOFWriter opens the
    // file in append mode and would otherwise add to stale content
    if err := os.Remove(tmpPath); err != nil && !os.IsNotExist(err) {
        return err
    }
    
    // Create new AOF with current state
    writer, err := NewAOFWriter(tmpPath, SyncNo)
    if err != nil {
        return err
    }
    
    // Write current state as commands, remembering the first error
    var writeErr error
    emit := func(args []string) {
        if writeErr == nil {
            writeErr = writer.AppendCommand(args)
        }
    }
    
    r.store.ForEach(func(key string, value interface{}) {
        switch v := value.(type) {
        case []byte:
            emit([]string{"SET", key, string(v)})
            
        case *store.List:
            elements := v.LRange(0, -1)
            if len(elements) > 0 {
                args := []string{"RPUSH", key}
                for _, e := range elements {
                    args = append(args, string(e))
                }
                emit(args)
            }
            
        case *store.Set:
            members := v.Members()
            if len(members) > 0 {
                args := []string{"SADD", key}
                args = append(args, members...)
                emit(args)
            }
            
        case *store.Hash:
            fields := v.GetAll()
            if len(fields) > 0 {
                args := []string{"HSET", key}
                for field, val := range fields {
                    args = append(args, field, string(val))
                }
                emit(args)
            }
            
        case *store.ZSet:
            members := v.Range(0, -1, true)
            if len(members) > 0 {
                args := []string{"ZADD", key}
                for _, m := range members {
                    args = append(args, fmt.Sprintf("%g", m.Score), m.Member)
                }
                emit(args)
            }
        }
    })
    
    // Only replace the old AOF if the new one was written completely
    closeErr := writer.Close()
    if writeErr != nil || closeErr != nil {
        os.Remove(tmpPath)
        if writeErr != nil {
            return writeErr
        }
        return closeErr
    }
    
    // Atomic rename. A live AOFWriter still holds the old file open,
    // so the server must reopen it after a rewrite.
    return os.Rename(tmpPath, r.path)
}

Part 5: Integrating Persistence

internal/server/server.go
package server

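import (
    "log"
    "strings"

    // Also import this project's store and persistence packages
    // (the paths depend on your module name).
)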

type Server struct {
    store      *store.Store
    aof        *persistence.AOFWriter
    rdb        *persistence.BackgroundSaver
}

func NewServer(config Config) (*Server, error) {
    store := store.NewStore()
    
    // Load existing data
    if config.AOFEnabled {
        count, err := persistence.ReplayAOF(config.AOFPath, func(cmd []string) error {
            // Execute command on store, discarding the reply
            _, err := store.ExecuteCommand(cmd)
            return err
        })
        if err != nil {
            return nil, err
        }
        log.Printf("Loaded %d commands from AOF", count)
    } else if config.RDBEnabled {
        // Load from RDB
        if err := loadRDB(config.RDBPath, store); err != nil {
            log.Printf("Warning: could not load RDB: %v", err)
        }
    }
    
    s := &Server{store: store}
    
    // Initialize AOF
    if config.AOFEnabled {
        aof, err := persistence.NewAOFWriter(config.AOFPath, config.AOFSyncPolicy)
        if err != nil {
            return nil, err
        }
        s.aof = aof
    }
    
    // Initialize RDB background saver
    if config.RDBEnabled {
        s.rdb = persistence.NewBackgroundSaver(store, config.RDBPath)
        s.rdb.StartAutoSave()
    }
    
    return s, nil
}

// HandleCommand processes a command and logs to AOF if needed
func (s *Server) HandleCommand(cmd []string) (interface{}, error) {
    result, err := s.store.ExecuteCommand(cmd)
    if err != nil {
        return nil, err
    }
    
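    // Log write commands to AOF. A failed append is reported because the
    // write is already applied in memory but would be lost on restart.
    if s.aof != nil && len(cmd) > 0 && isWriteCommand(cmd[0]) {
        if err := s.aof.AppendCommand(cmd); err != nil {
            return nil, err
        }
    }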
    
    return result, nil
}

func isWriteCommand(cmd string) bool {
    switch strings.ToUpper(cmd) {
    case "SET", "DEL", "INCR", "DECR",
         "LPUSH", "RPUSH", "LPOP", "RPOP",
         "SADD", "SREM",
         "HSET", "HDEL",
         "ZADD", "ZREM":
        return true
    }
    return false
}

Exercises

Persist key expiration times:
// In RDB: Store expire time before key-value
// OpCodeExpire + timestamp + key + value

// In AOF: Log EXPIRE commands
// Or use SETEX format

Add the BGREWRITEAOF command:
// 1. Create new AOF in background
// 2. Buffer new commands during rewrite
// 3. Append buffer to new AOF
// 4. Atomic swap

Add LZF compression to RDB:
// Compress string values > 64 bytes
// Use special type marker for compressed strings
// TypeCompressedString = 0xC0

Key Takeaways

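  • RDB Snapshots: point-in-time backups; compact, but writes made after the last save are lost on a crash
  • AOF Logging: every write is logged; more durable, at the cost of larger files and slower recovery
  • Checksums: a CRC64 trailer lets a loader detect a corrupted RDB file
  • AOF Rewriting: compacts the AOF by rebuilding it from the current state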

What’s Next?

In Chapter 5: Pub/Sub, we’ll implement:
  • Publish/Subscribe messaging
  • Pattern subscriptions
  • Real-time event streaming
