Chapter 4: Persistence
A database that loses all data on restart isn’t very useful! In this chapter, we’ll implement both persistence strategies Redis uses: RDB snapshots and AOF (Append-Only File) logging.

Prerequisites: Chapter 3: Data Structures
Further Reading: Database Engineering: Storage Engines
Time: 3-4 hours
Outcome: Data survives server restarts
Persistence Strategies Overview
┌────────────────────────────────────────────────────────────────────┐
│                       PERSISTENCE COMPARISON                       │
├────────────────────────────────────────────────────────────────────┤
│                                                                    │
│   RDB SNAPSHOTS                      AOF (Append-Only File)        │
│   ─────────────                      ──────────────────────        │
│                                                                    │
│   ┌───────────────┐                  ┌───────────────────┐         │
│   │ Full dump     │                  │ SET foo bar       │         │
│   │ at time T     │                  │ SET baz qux       │         │
│   │               │                  │ INCR counter      │         │
│   │ Binary        │                  │ DEL foo           │         │
│   │ Compact       │                  │ ...               │         │
│   └───────────────┘                  └───────────────────┘         │
│                                                                    │
│   PROS:                              PROS:                         │
│   ✓ Compact file size                ✓ Every write is logged       │
│   ✓ Fast recovery                    ✓ Minimal data loss           │
│   ✓ Perfect for backups              ✓ Human-readable              │
│                                                                    │
│   CONS:                              CONS:                         │
│   ✗ Data loss between saves          ✗ Larger file size            │
│   ✗ Fork can be slow on large data   ✗ Slower recovery             │
│                                                                    │
│   BEST FOR:                          BEST FOR:                     │
│   → Periodic backups                 → Maximum durability          │
│   → Disaster recovery                → Audit logs                  │
│                                                                    │
└────────────────────────────────────────────────────────────────────┘
Part 1: RDB Snapshots
RDB creates a point-in-time snapshot of all data in a binary format.

RDB File Format
┌────────────────────────────────────────────────────────────────────┐
│                          RDB FILE FORMAT                           │
├────────────────────────────────────────────────────────────────────┤
│                                                                    │
│   ┌──────────┬───────────┬───────────┬───────────────┬──────────┐  │
│   │ REDIS    │ Version   │ Database  │ Key-Value     │ EOF +    │  │
│   │ Magic    │ (4 bytes) │ Selector  │ Pairs         │ Checksum │  │
│   │ (5 bytes)│           │           │               │ (8 bytes)│  │
│   └──────────┴───────────┴───────────┴───────────────┴──────────┘  │
│                                                                    │
│   Key-Value Entry:                                                 │
│   ┌──────────┬────────────┬─────────────┬──────────────┐           │
│   │ Value    │ Key Length │ Key         │ Value        │           │
│   │ Type     │ (1-5 B)    │ (N bytes)   │ (varies)     │           │
│   └──────────┴────────────┴─────────────┴──────────────┘           │
│                                                                    │
│   Value Types:                                                     │
│     0 = String                                                     │
│     1 = List                                                       │
│     2 = Set                                                        │
│     3 = Sorted Set (ZSet)                                          │
│     4 = Hash                                                       │
│                                                                    │
└────────────────────────────────────────────────────────────────────┘
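To make the format concrete, here is the byte-for-byte layout our simplified writer (below) produces for a database containing the single key name = Ada, 30 bytes in total. (This is our teaching format; real Redis RDB uses a richer encoding.)

52 45 44 49 53             "REDIS"          magic
30 30 30 39                "0009"           version
FE 00                      SELECTDB 0       opcode + length-encoded db number
00                         type = String
04 6E 61 6D 65             len=4, "name"    key
03 41 64 61                len=3, "Ada"     value
FF                         EOF              opcode
xx xx xx xx xx xx xx xx    CRC64            checksum, little-endian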
RDB Encoder
internal/persistence/rdb.go
package persistence
import (
	"encoding/binary"
	"fmt"
	"hash/crc64"
	"os"
)
const (
RDBVersion = 9
RDBMagic = "REDIS"
// Type constants
TypeString = 0
TypeList = 1
TypeSet = 2
TypeZSet = 3
TypeHash = 4
// Special values
OpCodeEOF = 255
OpCodeDB = 254
OpCodeExpire = 253
)
// RDBWriter handles writing RDB files
type RDBWriter struct {
file *os.File
crc *crc64.Table
checksum uint64
}
// NewRDBWriter creates a new RDB writer
func NewRDBWriter(path string) (*RDBWriter, error) {
file, err := os.Create(path + ".tmp")
if err != nil {
return nil, err
}
return &RDBWriter{
file: file,
crc: crc64.MakeTable(crc64.ECMA),
}, nil
}
// WriteHeader writes the RDB header
func (w *RDBWriter) WriteHeader() error {
// Magic string
if _, err := w.write([]byte(RDBMagic)); err != nil {
return err
}
	// Version (4 bytes, zero-padded), derived from RDBVersion
	version := []byte(fmt.Sprintf("%04d", RDBVersion))
if _, err := w.write(version); err != nil {
return err
}
return nil
}
// SelectDB writes database selector
func (w *RDBWriter) SelectDB(dbNum int) error {
if err := w.writeByte(OpCodeDB); err != nil {
return err
}
return w.writeLength(dbNum)
}
// WriteString writes a string key-value pair
func (w *RDBWriter) WriteString(key string, value []byte) error {
// Type
if err := w.writeByte(TypeString); err != nil {
return err
}
// Key
if err := w.writeString(key); err != nil {
return err
}
// Value (as length-prefixed string)
return w.writeLengthPrefixedBytes(value)
}
// WriteList writes a list
func (w *RDBWriter) WriteList(key string, values [][]byte) error {
if err := w.writeByte(TypeList); err != nil {
return err
}
if err := w.writeString(key); err != nil {
return err
}
// List length
if err := w.writeLength(len(values)); err != nil {
return err
}
// Elements
for _, v := range values {
if err := w.writeLengthPrefixedBytes(v); err != nil {
return err
}
}
return nil
}
// WriteSet writes a set
func (w *RDBWriter) WriteSet(key string, members []string) error {
if err := w.writeByte(TypeSet); err != nil {
return err
}
if err := w.writeString(key); err != nil {
return err
}
if err := w.writeLength(len(members)); err != nil {
return err
}
for _, m := range members {
if err := w.writeString(m); err != nil {
return err
}
}
return nil
}
// WriteHash writes a hash
func (w *RDBWriter) WriteHash(key string, fields map[string][]byte) error {
if err := w.writeByte(TypeHash); err != nil {
return err
}
if err := w.writeString(key); err != nil {
return err
}
if err := w.writeLength(len(fields)); err != nil {
return err
}
for field, value := range fields {
if err := w.writeString(field); err != nil {
return err
}
if err := w.writeLengthPrefixedBytes(value); err != nil {
return err
}
}
return nil
}
// WriteZSet writes a sorted set
func (w *RDBWriter) WriteZSet(key string, members []struct{ Member string; Score float64 }) error {
if err := w.writeByte(TypeZSet); err != nil {
return err
}
if err := w.writeString(key); err != nil {
return err
}
if err := w.writeLength(len(members)); err != nil {
return err
}
for _, m := range members {
if err := w.writeString(m.Member); err != nil {
return err
}
// Write score as string for simplicity
scoreBytes := []byte(fmt.Sprintf("%.17g", m.Score))
if err := w.writeLengthPrefixedBytes(scoreBytes); err != nil {
return err
}
}
return nil
}
// WriteFooter writes EOF marker and checksum
func (w *RDBWriter) WriteFooter() error {
if err := w.writeByte(OpCodeEOF); err != nil {
return err
}
// Write checksum
checksumBytes := make([]byte, 8)
binary.LittleEndian.PutUint64(checksumBytes, w.checksum)
if _, err := w.file.Write(checksumBytes); err != nil {
return err
}
return nil
}
// Close finalizes the file
func (w *RDBWriter) Close(path string) error {
if err := w.file.Sync(); err != nil {
return err
}
if err := w.file.Close(); err != nil {
return err
}
// Atomic rename
return os.Rename(path+".tmp", path)
}
// Helper methods
func (w *RDBWriter) write(data []byte) (int, error) {
w.checksum = crc64.Update(w.checksum, w.crc, data)
return w.file.Write(data)
}
func (w *RDBWriter) writeByte(b byte) error {
_, err := w.write([]byte{b})
return err
}
func (w *RDBWriter) writeLength(length int) error {
	// Simple length encoding (can be optimized)
	if length < 64 {
		return w.writeByte(byte(length))
	} else if length < 16384 {
		if err := w.writeByte(byte(0x40 | (length >> 8))); err != nil {
			return err
		}
		return w.writeByte(byte(length & 0xFF))
	}
	if err := w.writeByte(0x80); err != nil {
		return err
	}
	buf := make([]byte, 4)
	binary.BigEndian.PutUint32(buf, uint32(length))
	_, err := w.write(buf)
	return err
}
func (w *RDBWriter) writeString(s string) error {
if err := w.writeLength(len(s)); err != nil {
return err
}
_, err := w.write([]byte(s))
return err
}
func (w *RDBWriter) writeLengthPrefixedBytes(data []byte) error {
if err := w.writeLength(len(data)); err != nil {
return err
}
_, err := w.write(data)
return err
}
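To see the writer end to end, here is a minimal sketch; the import path is a placeholder for whatever your module is named:

package main

import (
	"log"

	"yourmodule/internal/persistence" // hypothetical path: adjust to your module
)

func main() {
	// Write a one-key snapshot using the RDBWriter above.
	w, err := persistence.NewRDBWriter("dump.rdb")
	if err != nil {
		log.Fatal(err)
	}
	if err := w.WriteHeader(); err != nil {
		log.Fatal(err)
	}
	if err := w.SelectDB(0); err != nil {
		log.Fatal(err)
	}
	if err := w.WriteString("name", []byte("Ada")); err != nil {
		log.Fatal(err)
	}
	if err := w.WriteFooter(); err != nil {
		log.Fatal(err)
	}
	// Close syncs and atomically renames dump.rdb.tmp to dump.rdb.
	if err := w.Close("dump.rdb"); err != nil {
		log.Fatal(err)
	}
}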
RDB Reader
internal/persistence/rdb_reader.go
package persistence
import (
	"encoding/binary"
	"fmt"
	"hash/crc64"
	"io"
	"os"
	"strconv" // used by readZSet below
)
// RDBReader handles reading RDB files
type RDBReader struct {
file *os.File
crc *crc64.Table
checksum uint64
}
// NewRDBReader opens an RDB file for reading
func NewRDBReader(path string) (*RDBReader, error) {
file, err := os.Open(path)
if err != nil {
return nil, err
}
return &RDBReader{
file: file,
crc: crc64.MakeTable(crc64.ECMA),
}, nil
}
// VerifyHeader checks the RDB magic string and version
func (r *RDBReader) VerifyHeader() error {
magic := make([]byte, 5)
if _, err := io.ReadFull(r.file, magic); err != nil {
return err
}
if string(magic) != RDBMagic {
return fmt.Errorf("invalid RDB magic: %s", magic)
}
version := make([]byte, 4)
if _, err := io.ReadFull(r.file, version); err != nil {
return err
}
// Update checksum
r.checksum = crc64.Update(r.checksum, r.crc, magic)
r.checksum = crc64.Update(r.checksum, r.crc, version)
return nil
}
// ReadEntry reads the next entry from the RDB file
func (r *RDBReader) ReadEntry() (string, interface{}, error) {
typeByte, err := r.readByte()
if err != nil {
return "", nil, err
}
switch typeByte {
case OpCodeEOF:
return "", nil, io.EOF
	case OpCodeDB:
		// Skip the database number, but don't swallow read errors
		if _, err := r.readLength(); err != nil {
			return "", nil, err
		}
		return r.ReadEntry()
case TypeString:
key, err := r.readString()
if err != nil {
return "", nil, err
}
value, err := r.readBytes()
if err != nil {
return "", nil, err
}
return key, value, nil
case TypeList:
return r.readList()
case TypeSet:
return r.readSet()
case TypeHash:
return r.readHash()
case TypeZSet:
return r.readZSet()
default:
return "", nil, fmt.Errorf("unknown type: %d", typeByte)
}
}
func (r *RDBReader) readList() (string, interface{}, error) {
key, err := r.readString()
if err != nil {
return "", nil, err
}
length, err := r.readLength()
if err != nil {
return "", nil, err
}
values := make([][]byte, length)
for i := 0; i < length; i++ {
values[i], err = r.readBytes()
if err != nil {
return "", nil, err
}
}
return key, values, nil
}
func (r *RDBReader) readSet() (string, interface{}, error) {
key, err := r.readString()
if err != nil {
return "", nil, err
}
length, err := r.readLength()
if err != nil {
return "", nil, err
}
members := make([]string, length)
for i := 0; i < length; i++ {
members[i], err = r.readString()
if err != nil {
return "", nil, err
}
}
return key, members, nil
}
func (r *RDBReader) readHash() (string, interface{}, error) {
key, err := r.readString()
if err != nil {
return "", nil, err
}
length, err := r.readLength()
if err != nil {
return "", nil, err
}
fields := make(map[string][]byte, length)
for i := 0; i < length; i++ {
field, err := r.readString()
if err != nil {
return "", nil, err
}
value, err := r.readBytes()
if err != nil {
return "", nil, err
}
fields[field] = value
}
return key, fields, nil
}
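// readZSet mirrors WriteZSet's on-disk layout: each member is a
// length-prefixed string followed by its score, also stored as a string.
// ReadEntry above references this method, so it must exist for the
// reader to compile.
func (r *RDBReader) readZSet() (string, interface{}, error) {
	key, err := r.readString()
	if err != nil {
		return "", nil, err
	}
	length, err := r.readLength()
	if err != nil {
		return "", nil, err
	}
	members := make([]struct {
		Member string
		Score  float64
	}, length)
	for i := 0; i < length; i++ {
		member, err := r.readString()
		if err != nil {
			return "", nil, err
		}
		scoreStr, err := r.readString()
		if err != nil {
			return "", nil, err
		}
		score, err := strconv.ParseFloat(scoreStr, 64)
		if err != nil {
			return "", nil, err
		}
		members[i] = struct {
			Member string
			Score  float64
		}{member, score}
	}
	return key, members, nil
}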
// Helper methods
func (r *RDBReader) readByte() (byte, error) {
buf := make([]byte, 1)
if _, err := io.ReadFull(r.file, buf); err != nil {
return 0, err
}
r.checksum = crc64.Update(r.checksum, r.crc, buf)
return buf[0], nil
}
func (r *RDBReader) readLength() (int, error) {
first, err := r.readByte()
if err != nil {
return 0, err
}
switch first >> 6 {
case 0:
return int(first & 0x3F), nil
case 1:
second, err := r.readByte()
if err != nil {
return 0, err
}
return int(first&0x3F)<<8 | int(second), nil
case 2:
buf := make([]byte, 4)
if _, err := io.ReadFull(r.file, buf); err != nil {
return 0, err
}
r.checksum = crc64.Update(r.checksum, r.crc, buf)
return int(binary.BigEndian.Uint32(buf)), nil
default:
return 0, fmt.Errorf("invalid length encoding")
}
}
func (r *RDBReader) readString() (string, error) {
length, err := r.readLength()
if err != nil {
return "", err
}
buf := make([]byte, length)
if _, err := io.ReadFull(r.file, buf); err != nil {
return "", err
}
r.checksum = crc64.Update(r.checksum, r.crc, buf)
return string(buf), nil
}
func (r *RDBReader) readBytes() ([]byte, error) {
length, err := r.readLength()
if err != nil {
return nil, err
}
buf := make([]byte, length)
if _, err := io.ReadFull(r.file, buf); err != nil {
return nil, err
}
r.checksum = crc64.Update(r.checksum, r.crc, buf)
return buf, nil
}
func (r *RDBReader) Close() error {
return r.file.Close()
}
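Part 5's server calls a loadRDB helper to tie the reader together. Here is a minimal sketch; it lives in the server package, and store.Restore is a hypothetical method standing in for however your Chapter 3 store installs a decoded value:

package server

import (
	"io"
	"os"

	"yourmodule/internal/persistence" // adjust these paths to your module
	"yourmodule/internal/store"
)

// loadRDB rebuilds the store from a snapshot on disk.
func loadRDB(path string, s *store.Store) error {
	reader, err := persistence.NewRDBReader(path)
	if err != nil {
		if os.IsNotExist(err) {
			return nil // no snapshot yet: start with an empty store
		}
		return err
	}
	defer reader.Close()

	if err := reader.VerifyHeader(); err != nil {
		return err
	}
	for {
		key, value, err := reader.ReadEntry()
		if err == io.EOF {
			return nil // reached the EOF opcode: load complete
		}
		if err != nil {
			return err
		}
		s.Restore(key, value) // hypothetical: store installs value by type
	}
}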
Part 2: Background Saves (BGSAVE)
Redis forks to create snapshots without blocking the main process; since Go can’t fork safely, our version uses a goroutine instead (see the comment in Save below).

internal/persistence/bgsave.go
package persistence
import (
	"fmt"
	"log"
	"sync"
	"time"

	"yourmodule/internal/store" // adjust to your module path
)
// BackgroundSaver handles background RDB saves
type BackgroundSaver struct {
store *store.Store
path string
lastSave time.Time
saving bool
mu sync.Mutex
// Auto-save conditions
saveAfter map[int]int // changes -> seconds
}
// NewBackgroundSaver creates a new background saver
func NewBackgroundSaver(store *store.Store, path string) *BackgroundSaver {
return &BackgroundSaver{
store: store,
path: path,
lastSave: time.Now(),
saveAfter: map[int]int{
1: 900, // After 900 sec if at least 1 change
10: 300, // After 300 sec if at least 10 changes
10000: 60, // After 60 sec if at least 10000 changes
},
}
}
// Save performs a background save
func (bs *BackgroundSaver) Save() error {
bs.mu.Lock()
if bs.saving {
bs.mu.Unlock()
return fmt.Errorf("save already in progress")
}
bs.saving = true
bs.mu.Unlock()
defer func() {
bs.mu.Lock()
bs.saving = false
bs.lastSave = time.Now()
bs.mu.Unlock()
}()
// In Go, we can't fork like C, so we use a goroutine
// For true copy-on-write, you'd need to use OS-specific calls
return bs.saveSnapshot()
}
func (bs *BackgroundSaver) saveSnapshot() error {
writer, err := NewRDBWriter(bs.path)
if err != nil {
return err
}
if err := writer.WriteHeader(); err != nil {
return err
}
if err := writer.SelectDB(0); err != nil {
return err
}
// Iterate over all keys
bs.store.ForEach(func(key string, value interface{}) {
switch v := value.(type) {
case []byte:
writer.WriteString(key, v)
case *store.List:
writer.WriteList(key, v.LRange(0, -1))
case *store.Set:
writer.WriteSet(key, v.Members())
case *store.Hash:
writer.WriteHash(key, v.GetAll())
case *store.ZSet:
writer.WriteZSet(key, v.Range(0, -1, true))
}
})
if err := writer.WriteFooter(); err != nil {
return err
}
return writer.Close(bs.path)
}
// StartAutoSave starts the automatic save checker
func (bs *BackgroundSaver) StartAutoSave() {
go func() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for range ticker.C {
bs.checkAutoSave()
}
}()
}
func (bs *BackgroundSaver) checkAutoSave() {
bs.mu.Lock()
if bs.saving {
bs.mu.Unlock()
return
}
elapsed := int(time.Since(bs.lastSave).Seconds())
changes := bs.store.ChangesSinceLastSave()
bs.mu.Unlock()
	for requiredChanges, requiredSeconds := range bs.saveAfter {
		if changes >= requiredChanges && elapsed >= requiredSeconds {
			log.Printf("Auto-save triggered: %d changes in %d seconds", changes, elapsed)
			if err := bs.Save(); err != nil {
				log.Printf("Auto-save failed: %v", err)
			}
			return
		}
	}
}
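Hooking this into the command table is mostly plumbing. A minimal sketch, assuming the dispatcher passes the upper-cased command name; HandleSaveCommand is a hypothetical helper that could live in bgsave.go, where fmt and log are already imported:

// HandleSaveCommand wires the SAVE and BGSAVE commands to the saver.
func HandleSaveCommand(bs *BackgroundSaver, name string) (string, error) {
	switch name {
	case "SAVE":
		// SAVE blocks the caller until the snapshot is on disk.
		if err := bs.Save(); err != nil {
			return "", err
		}
		return "OK", nil
	case "BGSAVE":
		// BGSAVE returns immediately; Save already rejects
		// concurrent saves via the saving flag.
		go func() {
			if err := bs.Save(); err != nil {
				log.Printf("BGSAVE failed: %v", err)
			}
		}()
		return "Background saving started", nil
	default:
		return "", fmt.Errorf("unknown save command: %s", name)
	}
}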
Part 3: Append-Only File (AOF)
AOF logs every write command for maximum durability.

AOF Writer
internal/persistence/aof.go
package persistence
import (
"bufio"
"fmt"
"os"
"sync"
"time"
)
// AOFSyncPolicy defines when to sync AOF to disk
type AOFSyncPolicy int
const (
SyncAlways AOFSyncPolicy = iota // Sync after every write
SyncEverySec // Sync every second
SyncNo // Let OS decide
)
// AOFWriter handles append-only file operations
type AOFWriter struct {
file *os.File
writer *bufio.Writer
policy AOFSyncPolicy
mu sync.Mutex
// For SyncEverySec
lastSync time.Time
}
// NewAOFWriter opens or creates an AOF file
func NewAOFWriter(path string, policy AOFSyncPolicy) (*AOFWriter, error) {
file, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
if err != nil {
return nil, err
}
aof := &AOFWriter{
file: file,
writer: bufio.NewWriter(file),
policy: policy,
lastSync: time.Now(),
}
if policy == SyncEverySec {
go aof.syncLoop()
}
return aof, nil
}
// AppendCommand logs a command to AOF in RESP format
func (a *AOFWriter) AppendCommand(args []string) error {
a.mu.Lock()
defer a.mu.Unlock()
// Write as RESP array
fmt.Fprintf(a.writer, "*%d\r\n", len(args))
for _, arg := range args {
fmt.Fprintf(a.writer, "$%d\r\n%s\r\n", len(arg), arg)
}
if a.policy == SyncAlways {
if err := a.writer.Flush(); err != nil {
return err
}
return a.file.Sync()
}
return nil
}
func (a *AOFWriter) syncLoop() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for range ticker.C {
a.mu.Lock()
if time.Since(a.lastSync) >= time.Second {
a.writer.Flush()
a.file.Sync()
a.lastSync = time.Now()
}
a.mu.Unlock()
}
}
// Close closes the AOF file
func (a *AOFWriter) Close() error {
a.mu.Lock()
defer a.mu.Unlock()
a.writer.Flush()
a.file.Sync()
return a.file.Close()
}
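Because commands are logged as plain RESP, you can inspect the AOF with any text editor. After SET name Ada followed by INCR counter, the file contains (every line terminated by \r\n):

*3
$3
SET
$4
name
$3
Ada
*2
$4
INCR
$7
counter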
AOF Loader
internal/persistence/aof_loader.go
package persistence
import (
"bufio"
"fmt"
"io"
"os"
"strconv"
"strings"
)
// AOFLoader replays AOF commands to rebuild state
type AOFLoader struct {
file *os.File
reader *bufio.Reader
}
// NewAOFLoader opens an AOF file for reading
func NewAOFLoader(path string) (*AOFLoader, error) {
file, err := os.Open(path)
if err != nil {
if os.IsNotExist(err) {
return nil, nil // No AOF file, that's okay
}
return nil, err
}
return &AOFLoader{
file: file,
reader: bufio.NewReader(file),
}, nil
}
// ReadCommand reads the next command from AOF
func (l *AOFLoader) ReadCommand() ([]string, error) {
// Read array marker
line, err := l.reader.ReadString('\n')
if err != nil {
if err == io.EOF {
return nil, io.EOF
}
return nil, err
}
line = strings.TrimSuffix(line, "\r\n")
if !strings.HasPrefix(line, "*") {
return nil, fmt.Errorf("expected array, got: %s", line)
}
count, err := strconv.Atoi(line[1:])
if err != nil {
return nil, err
}
args := make([]string, count)
for i := 0; i < count; i++ {
// Read bulk string marker
line, err := l.reader.ReadString('\n')
if err != nil {
return nil, err
}
line = strings.TrimSuffix(line, "\r\n")
if !strings.HasPrefix(line, "$") {
return nil, fmt.Errorf("expected bulk string, got: %s", line)
}
length, err := strconv.Atoi(line[1:])
if err != nil {
return nil, err
}
// Read the actual string
buf := make([]byte, length+2) // +2 for \r\n
if _, err := io.ReadFull(l.reader, buf); err != nil {
return nil, err
}
args[i] = string(buf[:length])
}
return args, nil
}
// Close closes the AOF file
func (l *AOFLoader) Close() error {
return l.file.Close()
}
// ReplayAOF replays all commands from AOF to rebuild state
func ReplayAOF(path string, handler func([]string) error) (int, error) {
loader, err := NewAOFLoader(path)
if err != nil {
return 0, err
}
if loader == nil {
return 0, nil // No AOF file
}
defer loader.Close()
count := 0
for {
cmd, err := loader.ReadCommand()
if err == io.EOF {
break
}
if err != nil {
return count, err
}
if err := handler(cmd); err != nil {
return count, err
}
count++
}
return count, nil
}
Part 4: AOF Rewriting
Over time, AOF files grow. Rewriting compacts them:
┌────────────────────────────────────────────────────────────────────┐
│                           AOF REWRITING                            │
├────────────────────────────────────────────────────────────────────┤
│                                                                    │
│   BEFORE (AOF file):                 AFTER (Rewritten):            │
│   ──────────────────                 ──────────────────            │
│   SET counter 1                      SET counter 100               │
│   INCR counter                       SET name "final"              │
│   INCR counter                       RPUSH list "a" "b" "c"        │
│   SET name "foo"                                                   │
│   INCR counter                       Much smaller!                 │
│   ...                                Only final state.             │
│   SET name "final"                                                 │
│   INCR counter   (x97 more)                                        │
│   RPUSH list "a"                                                   │
│   RPUSH list "b"                                                   │
│   RPUSH list "c"                                                   │
│                                                                    │
│   Rewriting creates a new AOF from current database state,         │
│   not by reading the old AOF.                                      │
│                                                                    │
└────────────────────────────────────────────────────────────────────┘
internal/persistence/aof_rewrite.go
package persistence
import (
	"fmt"
	"os"

	"yourmodule/internal/store" // adjust to your module path
)
// AOFRewriter handles AOF compaction
type AOFRewriter struct {
store *store.Store
path string
}
// NewAOFRewriter creates a new AOF rewriter
func NewAOFRewriter(store *store.Store, path string) *AOFRewriter {
return &AOFRewriter{store: store, path: path}
}
// Rewrite creates a new compacted AOF file
func (r *AOFRewriter) Rewrite() error {
tmpPath := r.path + ".rewrite"
// Create new AOF with current state
writer, err := NewAOFWriter(tmpPath, SyncNo)
if err != nil {
return err
}
// Write current state as commands
r.store.ForEach(func(key string, value interface{}) {
switch v := value.(type) {
case []byte:
writer.AppendCommand([]string{"SET", key, string(v)})
case *store.List:
elements := v.LRange(0, -1)
if len(elements) > 0 {
args := []string{"RPUSH", key}
for _, e := range elements {
args = append(args, string(e))
}
writer.AppendCommand(args)
}
case *store.Set:
members := v.Members()
if len(members) > 0 {
args := []string{"SADD", key}
args = append(args, members...)
writer.AppendCommand(args)
}
case *store.Hash:
fields := v.GetAll()
if len(fields) > 0 {
args := []string{"HSET", key}
for field, val := range fields {
args = append(args, field, string(val))
}
writer.AppendCommand(args)
}
case *store.ZSet:
members := v.Range(0, -1, true)
if len(members) > 0 {
args := []string{"ZADD", key}
for _, m := range members {
args = append(args, fmt.Sprintf("%g", m.Score), m.Member)
}
writer.AppendCommand(args)
}
}
})
	if err := writer.Close(); err != nil {
		return err
	}
	// Atomic rename
	return os.Rename(tmpPath, r.path)
}
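Real Redis also triggers rewrites automatically once the file grows past a configured percentage of its size after the last rewrite (auto-aof-rewrite-percentage). A rough sketch of that check; maybeRewrite is hypothetical and could sit alongside Rewrite:

// maybeRewrite rewrites the AOF once it has doubled since the last
// rewrite and exceeds a minimum size (64 MB is Redis's default floor).
func (r *AOFRewriter) maybeRewrite(sizeAtLastRewrite int64) error {
	info, err := os.Stat(r.path)
	if err != nil {
		return err
	}
	const minSize = 64 * 1024 * 1024
	if info.Size() >= minSize && info.Size() >= 2*sizeAtLastRewrite {
		return r.Rewrite()
	}
	return nil
}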
Part 5: Integrating Persistence
internal/server/server.go
package server
import (
	"log"
	"strings"

	"yourmodule/internal/persistence" // adjust these paths to your module
	"yourmodule/internal/store"
)
type Server struct {
store *store.Store
aof *persistence.AOFWriter
rdb *persistence.BackgroundSaver
}
func NewServer(config Config) (*Server, error) {
db := store.NewStore() // named db so it doesn't shadow the store package
// Load existing data
if config.AOFEnabled {
count, err := persistence.ReplayAOF(config.AOFPath, func(cmd []string) error {
// Execute command on store
return db.ExecuteCommand(cmd)
})
if err != nil {
return nil, err
}
log.Printf("Loaded %d commands from AOF", count)
} else if config.RDBEnabled {
// Load from RDB
if err := loadRDB(config.RDBPath, db); err != nil {
log.Printf("Warning: could not load RDB: %v", err)
}
}
s := &Server{store: db}
// Initialize AOF
if config.AOFEnabled {
aof, err := persistence.NewAOFWriter(config.AOFPath, config.AOFSyncPolicy)
if err != nil {
return nil, err
}
s.aof = aof
}
// Initialize RDB background saver
if config.RDBEnabled {
s.rdb = persistence.NewBackgroundSaver(db, config.RDBPath)
s.rdb.StartAutoSave()
}
return s, nil
}
// HandleCommand processes a command and logs to AOF if needed
func (s *Server) HandleCommand(cmd []string) (interface{}, error) {
result, err := s.store.ExecuteCommand(cmd)
if err != nil {
return nil, err
}
	// Log write commands to AOF; surface sync errors instead of dropping them
	if s.aof != nil && isWriteCommand(cmd[0]) {
		if err := s.aof.AppendCommand(cmd); err != nil {
			log.Printf("AOF append failed: %v", err)
		}
	}
return result, nil
}
func isWriteCommand(cmd string) bool {
switch strings.ToUpper(cmd) {
case "SET", "DEL", "INCR", "DECR",
"LPUSH", "RPUSH", "LPOP", "RPOP",
"SADD", "SREM",
"HSET", "HDEL",
"ZADD", "ZREM":
return true
}
return false
}
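The Config type consumed by NewServer isn’t defined in this chapter; given the fields it reads above, a minimal definition would be:

type Config struct {
	AOFEnabled    bool
	AOFPath       string
	AOFSyncPolicy persistence.AOFSyncPolicy
	RDBEnabled    bool
	RDBPath       string
}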
Exercises
Exercise 1: Implement Expiration Persistence
Persist key expiration times:
// In RDB: Store expire time before key-value
// OpCodeExpire + timestamp + key + value
// In AOF: Log EXPIRE commands
// Or use SETEX format
Exercise 2: Implement BGREWRITEAOF
Add the BGREWRITEAOF command:
// 1. Create new AOF in background
// 2. Buffer new commands during rewrite
// 3. Append buffer to new AOF
// 4. Atomic swap
Exercise 3: Implement RDB Compression
Add LZF compression to RDB:
// Compress string values > 64 bytes
// Use special type marker for compressed strings
// TypeCompressedString = 0xC0
Key Takeaways
RDB Snapshots: point-in-time backups; compact, but may lose recent data
AOF Logging: every write logged; durable, but larger files
Checksums: CRC64 ensures data integrity
AOF Rewriting: compact the AOF by rebuilding it from current state
What’s Next?
In Chapter 5: Pub/Sub, we’ll implement:
- Publish/Subscribe messaging
- Pattern subscriptions
- Real-time event streaming
Next: Pub/Sub
Implement real-time messaging