Chapter 2: TCP Server
Now that we have a RESP parser, let’s build the network layer. We’ll create a TCP server that accepts client connections and processes commands concurrently.
Prerequisites: Chapter 1: RESP Protocol
Further Reading: Networking
Time: 2-3 hours
Outcome: A TCP server that responds to PING with PONG
Further Reading: Networking
Time: 2-3 hours
Outcome: A TCP server that responds to PING with PONG
Server Architecture
Copy
┌─────────────────────────────────────────────────────────────────────────────┐
│ REDIS SERVER ARCHITECTURE │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ │
│ │ Client 1 │───┐ │
│ └──────────────┘ │ │
│ │ ┌─────────────────────────────────────────────┐ │
│ ┌──────────────┐ │ │ MAIN PROCESS │ │
│ │ Client 2 │───┼───►│ │ │
│ └──────────────┘ │ │ ┌─────────────┐ ┌────────────────┐ │ │
│ │ │ │ Accept │ │ Command │ │ │
│ ┌──────────────┐ │ │ │ Loop │───►│ Handler │ │ │
│ │ Client 3 │───┘ │ └─────────────┘ └───────┬────────┘ │ │
│ └──────────────┘ │ │ │ │
│ │ ▼ │ │
│ │ ┌────────────────┐ │ │
│ │ │ Data Store │ │ │
│ │ │ (in-memory) │ │ │
│ │ └────────────────┘ │ │
│ │ │ │
│ └─────────────────────────────────────────────┘ │
│ │
│ DESIGN CHOICE: One goroutine per connection (simple & scalable) │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Why Go for Redis?
Go is a great fit for building Redis because:
- Goroutines: Lightweight threads for handling thousands of connections
- Channels: Safe communication between goroutines
- net package: Excellent networking primitives
- sync package: Mutexes and atomic operations for thread-safety
Real Redis executes commands on a single thread for simplicity. Our Go version handles each connection in its own goroutine, but still serializes writes to the shared store behind a mutex to ensure consistency.
Implementation
Step 1: Server Structure
internal/server/server.go
Copy
package server
import (
"fmt"
"io"
"log"
"net"
"sync"
"sync/atomic"
"myredis/internal/protocol"
)
// Server is the TCP front end of the Redis clone. It owns the listening
// socket, the set of connected clients, the command dispatch table and the
// shared key-value store.
type Server struct {
	// addr is the address handed to net.Listen by Start.
	addr string
	// listener is assigned by Start and closed by Stop.
	listener net.Listener

	// clients maps a client ID to its connection state; guarded by clientsMu.
	clients   map[uint64]*Client
	clientsMu sync.RWMutex
	// clientID is the last ID handed out; it is advanced atomically.
	clientID uint64

	// handlers maps a command name to its implementation.
	handlers map[string]CommandHandler

	// store is the data store shared by every client.
	store *Store

	// shutdown is closed by Stop to tell the accept loop to exit.
	shutdown chan struct{}
	// wg counts the running connection goroutines so Stop can wait for them.
	wg sync.WaitGroup
}
// CommandHandler is the signature every command implementation must satisfy.
// It receives the client that issued the command and the arguments that
// followed the command name, and returns the value to send back as the reply.
type CommandHandler func(client *Client, args []protocol.Value) protocol.Value
// NewServer builds a server that will listen on addr once Start is called.
// The returned server has an empty client table, a fresh store and all
// built-in commands already registered.
func NewServer(addr string) *Server {
	srv := new(Server)
	srv.addr = addr
	srv.clients = make(map[uint64]*Client)
	srv.handlers = make(map[string]CommandHandler)
	srv.store = NewStore()
	srv.shutdown = make(chan struct{})

	// Populate the dispatch table before handing the server to the caller.
	srv.registerCommands()
	return srv
}
// RegisterCommand installs handler under name in the dispatch table,
// replacing any handler previously stored under that name. The name is
// stored exactly as given, and the table is written without locking.
func (s *Server) RegisterCommand(name string, handler CommandHandler) {
	table := s.handlers
	table[name] = handler
}
// Start binds the listening socket and runs the accept loop, spawning one
// goroutine per accepted connection. It blocks until the server is stopped,
// in which case it returns nil, or returns an error if the address cannot
// be bound.
func (s *Server) Start() error {
	ln, listenErr := net.Listen("tcp", s.addr)
	s.listener = ln
	if listenErr != nil {
		return fmt.Errorf("failed to listen on %s: %w", s.addr, listenErr)
	}
	log.Printf("Redis server listening on %s", s.addr)

	for {
		conn, acceptErr := s.listener.Accept()
		if acceptErr == nil {
			// Each connection is served by its own goroutine, tracked in wg.
			s.wg.Add(1)
			go s.handleConnection(conn)
			continue
		}

		// An Accept failure after shutdown was signalled is the normal exit
		// path; any other failure is logged and the loop keeps accepting.
		select {
		case <-s.shutdown:
			return nil
		default:
		}
		log.Printf("Accept error: %v", acceptErr)
	}
}
// Stop gracefully shuts the server down: it signals the accept loop, closes
// the listener, closes every registered client connection and then waits for
// all connection goroutines to finish.
//
// Calling Stop again after it has run is a no-op, and calling it before Start
// has bound the listener is safe. Stop is not designed to be called from
// several goroutines at the same time.
func (s *Server) Stop() {
	// Closing an already-closed channel panics, so detect a repeated call.
	select {
	case <-s.shutdown:
		return
	default:
		close(s.shutdown)
	}

	// listener stays nil until Start has successfully bound the socket.
	if s.listener != nil {
		if err := s.listener.Close(); err != nil {
			log.Printf("Listener close error: %v", err)
		}
	}

	// Closing the connections unblocks the per-client read loops.
	s.clientsMu.Lock()
	for _, client := range s.clients {
		client.Close()
	}
	s.clientsMu.Unlock()

	s.wg.Wait()
	log.Println("Server stopped")
}
// handleConnection serves a single accepted connection until the client
// disconnects or the server shuts down. It runs in its own goroutine and
// marks the server's WaitGroup done when it returns.
func (s *Server) handleConnection(conn net.Conn) {
	defer s.wg.Done()

	id := atomic.AddUint64(&s.clientID, 1)
	client := NewClient(id, conn, s)

	// Stop closes s.shutdown before it takes clientsMu to close the registered
	// clients. Checking the channel while holding the same mutex therefore
	// guarantees that this connection is either seen (and closed) by Stop or
	// rejected here; it can never be registered after Stop has swept the map,
	// which would leave Stop waiting on a connection nobody closes.
	s.clientsMu.Lock()
	select {
	case <-s.shutdown:
		s.clientsMu.Unlock()
		client.Close()
		return
	default:
	}
	s.clients[id] = client
	s.clientsMu.Unlock()

	// Deferred so the client is unregistered even if Handle panics.
	defer func() {
		s.clientsMu.Lock()
		delete(s.clients, id)
		s.clientsMu.Unlock()
		log.Printf("Client %d disconnected", id)
	}()

	log.Printf("Client %d connected from %s", id, conn.RemoteAddr())
	client.Handle()
}
Step 2: Client Handler
internal/server/client.go
Copy
package server
import (
"io"
"net"
"strings"
"time"
"myredis/internal/protocol"
)
// Client holds the per-connection state for one connected Redis client.
type Client struct {
	// id is the server-assigned identifier for this connection.
	id uint64
	// conn is the underlying network connection.
	conn net.Conn
	// server points back to the owning server, used for command lookup.
	server *Server
	// parser reads incoming values from conn.
	parser *protocol.Parser
	// writer sends reply values to conn.
	writer *protocol.Writer
}
// NewClient wraps conn in a Client with the given id, attaching a protocol
// parser and writer that both operate directly on the connection.
func NewClient(id uint64, conn net.Conn, server *Server) *Client {
	client := new(Client)
	client.id = id
	client.conn = conn
	client.server = server
	client.parser = protocol.NewParser(conn)
	client.writer = protocol.NewWriter(conn)
	return client
}
// Handle runs the read-dispatch-reply loop for this client and closes the
// connection when it returns. The loop ends when a command cannot be parsed
// (including end of stream) or when a reply cannot be written.
func (c *Client) Handle() {
	defer c.conn.Close()

	// A client that sends nothing for this long fails its next read.
	const idleTimeout = 5 * time.Minute

	for {
		c.conn.SetReadDeadline(time.Now().Add(idleTimeout))

		request, parseErr := c.parser.Parse()
		if parseErr != nil {
			// A clean EOF or an already-closed socket has nobody left to
			// notify; any other failure is reported to the client first.
			worthReporting := parseErr != io.EOF && !isConnectionClosed(parseErr)
			if worthReporting {
				c.respond(protocol.NewError("ERR " + parseErr.Error()))
			}
			return
		}

		if c.respond(c.processCommand(request)) != nil {
			return
		}
	}
}
// processCommand dispatches one parsed request. The request must be a
// non-empty array whose first element is the command name; the name is
// upper-cased before lookup, and the remaining elements are passed to the
// handler as arguments. Malformed requests and unknown commands yield an
// error value instead of invoking a handler.
func (c *Client) processCommand(cmd protocol.Value) protocol.Value {
	wellFormed := cmd.Type == protocol.Array && len(cmd.Array) > 0
	if !wellFormed {
		return protocol.NewError("ERR invalid command format")
	}

	name := strings.ToUpper(cmd.Array[0].AsString())
	if handle, ok := c.server.handlers[name]; ok {
		return handle(c, cmd.Array[1:])
	}

	return protocol.NewError(
		"ERR unknown command '" + name + "'",
	)
}
// respond writes val to the client through the protocol writer and reports
// any error the writer returns.
func (c *Client) respond(val protocol.Value) error {
	writeErr := c.writer.Write(val)
	return writeErr
}
// Close shuts the client's network connection and returns the error, if any,
// reported by the connection.
func (c *Client) Close() error {
	closeErr := c.conn.Close()
	return closeErr
}
// isConnectionClosed reports whether err looks like the result of using a
// closed or reset connection. Detection is by substring match on the error
// text; a nil error is never considered a closed connection.
func isConnectionClosed(err error) bool {
	if err == nil {
		return false
	}
	text := err.Error()
	markers := [...]string{
		"use of closed network connection",
		"connection reset",
	}
	for _, marker := range markers {
		if strings.Contains(text, marker) {
			return true
		}
	}
	return false
}
Step 3: In-Memory Store
internal/server/store.go
Copy
package server
import (
"sync"
"time"
)
// Entry is a single stored value together with its optional expiry time.
type Entry struct {
	// Value is the raw stored payload.
	Value []byte
	// ExpireAt is the instant after which the entry is expired; nil means
	// the entry never expires.
	ExpireAt *time.Time
}

// IsExpired reports whether the entry's expiry time lies strictly in the
// past. Entries without an expiry time are never expired.
func (e *Entry) IsExpired() bool {
	if deadline := e.ExpireAt; deadline != nil {
		return deadline.Before(time.Now())
	}
	return false
}
// Store is the in-memory key-value table shared by every client.
type Store struct {
	// data maps each key to its entry; all access is guarded by mu.
	data map[string]*Entry
	// mu allows concurrent readers while giving writers exclusive access.
	mu sync.RWMutex
}
// NewStore returns an empty store and starts its background expiration
// goroutine. Nothing in the store stops that goroutine once it is running.
func NewStore() *Store {
	store := new(Store)
	store.data = make(map[string]*Entry)

	// Sweep expired keys in the background, independent of reads.
	go store.expireLoop()
	return store
}
// Get returns the value stored under key and true, or nil and false when the
// key is absent or its entry has expired. An expired entry found here is
// removed from the store (lazy expiration).
func (s *Store) Get(key string) ([]byte, bool) {
	s.mu.RLock()
	entry, exists := s.data[key]
	s.mu.RUnlock()

	if !exists {
		return nil, false
	}
	if !entry.IsExpired() {
		return entry.Value, true
	}

	// The read lock was released above, so another goroutine may have stored
	// a fresh entry under the same key in the meantime. Only remove the key
	// if it still maps to the exact expired entry observed here; deleting by
	// key alone could destroy that newer value.
	s.mu.Lock()
	if current, ok := s.data[key]; ok && current == entry {
		delete(s.data, key)
	}
	s.mu.Unlock()

	return nil, false
}
// Set stores value under key, replacing any existing entry. When ttl is
// non-nil the entry expires that long after the call; a nil ttl stores the
// value without an expiry time.
func (s *Store) Set(key string, value []byte, ttl *time.Duration) {
	// Compute the deadline before taking the lock to keep the critical
	// section as short as possible.
	var deadline *time.Time
	if ttl != nil {
		at := time.Now().Add(*ttl)
		deadline = &at
	}

	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[key] = &Entry{Value: value, ExpireAt: deadline}
}
// Delete removes the given keys and returns how many of them were actually
// present. Keys that are absent, or repeated after already being removed,
// do not count.
func (s *Store) Delete(keys ...string) int {
	s.mu.Lock()
	defer s.mu.Unlock()

	// Deleting a missing key is a no-op, so the drop in map size under the
	// lock is exactly the number of keys removed.
	sizeBefore := len(s.data)
	for _, key := range keys {
		delete(s.data, key)
	}
	return sizeBefore - len(s.data)
}
// Exists returns how many of the given keys are present and not expired.
// A key listed more than once is counted once per occurrence.
func (s *Store) Exists(keys ...string) int {
	s.mu.RLock()
	defer s.mu.RUnlock()

	found := 0
	for i := range keys {
		entry, present := s.data[keys[i]]
		if !present || entry.IsExpired() {
			continue
		}
		found++
	}
	return found
}
// Keys returns every non-expired key accepted by matchPattern for the given
// pattern. The order follows map iteration and is therefore unspecified; the
// result is nil when nothing matches.
func (s *Store) Keys(pattern string) []string {
	s.mu.RLock()
	defer s.mu.RUnlock()

	var matched []string
	for name, entry := range s.data {
		if entry.IsExpired() || !matchPattern(pattern, name) {
			continue
		}
		matched = append(matched, name)
	}
	return matched
}
// expireLoop sweeps expired keys out of the store once per second. It loops
// forever and is meant to run in its own goroutine.
func (s *Store) expireLoop() {
	const sweepInterval = 1 * time.Second

	ticker := time.NewTicker(sweepInterval)
	defer ticker.Stop()

	for {
		<-ticker.C
		s.cleanupExpired()
	}
}
// cleanupExpired removes every entry whose expiry time is before the moment
// the sweep started. It holds the write lock for the whole pass.
func (s *Store) cleanupExpired() {
	s.mu.Lock()
	defer s.mu.Unlock()

	// Sample the clock once so every entry is judged against the same instant.
	sweepTime := time.Now()
	for key, entry := range s.data {
		if entry.ExpireAt == nil || !sweepTime.After(*entry.ExpireAt) {
			continue
		}
		delete(s.data, key)
	}
}
// matchPattern reports whether key matches pattern, where '*' in the pattern
// matches any run of bytes (including none) and every other byte must match
// literally. '*' may appear anywhere and any number of times, so "user:*",
// "*:name" and "a*b*c" all work. A pattern without '*' matches only the
// identical key.
func matchPattern(pattern, key string) bool {
	p, k := 0, 0
	// star is the index of the most recent '*' in pattern (-1 if none yet);
	// resume is the key position that '*' is currently assumed to end at.
	star, resume := -1, 0

	for k < len(key) {
		switch {
		case p < len(pattern) && pattern[p] == '*':
			// Start by letting '*' match nothing; widen later if needed.
			star, resume = p, k
			p++
		case p < len(pattern) && pattern[p] == key[k]:
			p++
			k++
		case star >= 0:
			// Mismatch after a '*': let that '*' absorb one more byte and
			// retry the remainder of the pattern from there.
			resume++
			k = resume
			p = star + 1
		default:
			return false
		}
	}

	// The key is exhausted; only trailing '*' may remain in the pattern.
	for p < len(pattern) && pattern[p] == '*' {
		p++
	}
	return p == len(pattern)
}
Step 4: Register Commands
internal/server/commands.go
Copy
package server
import (
"strconv"
"strings"
"time"
"myredis/internal/protocol"
)
// registerCommands installs the handlers for every built-in command. Names
// are registered in upper case, which is the form used for command lookup.
func (s *Server) registerCommands() {
	builtins := []struct {
		name    string
		handler CommandHandler
	}{
		// Connection commands
		{"PING", s.cmdPing},
		{"ECHO", s.cmdEcho},
		{"QUIT", s.cmdQuit},
		// String commands
		{"GET", s.cmdGet},
		{"SET", s.cmdSet},
		{"DEL", s.cmdDel},
		{"EXISTS", s.cmdExists},
		{"KEYS", s.cmdKeys},
		// Server commands
		{"INFO", s.cmdInfo},
		{"DBSIZE", s.cmdDbSize},
		{"FLUSHDB", s.cmdFlushDb},
	}
	for _, b := range builtins {
		s.RegisterCommand(b.name, b.handler)
	}
}
// cmdPing implements PING [message]. With no argument it replies PONG; with
// one argument it echoes that argument back as a bulk string. More than one
// argument is rejected as an arity error rather than silently ignored.
func (s *Server) cmdPing(c *Client, args []protocol.Value) protocol.Value {
	switch len(args) {
	case 0:
		return protocol.PONG
	case 1:
		return protocol.NewBulkString(args[0].AsString())
	default:
		return protocol.NewError("ERR wrong number of arguments for 'ping' command")
	}
}
// cmdEcho implements ECHO message. It replies with the single argument as a
// bulk string; any other argument count, including extra arguments, is an
// arity error.
func (s *Server) cmdEcho(c *Client, args []protocol.Value) protocol.Value {
	if len(args) != 1 {
		return protocol.NewError("ERR wrong number of arguments for 'echo' command")
	}
	return protocol.NewBulkString(args[0].AsString())
}
// cmdQuit implements QUIT: it acknowledges with OK and closes the connection.
//
// The reply is written here, before closing, because the dispatcher only
// writes a handler's return value after the handler has returned, by which
// point the socket would already be closed and the client would never see
// the acknowledgement. The dispatcher's own write of the returned value then
// fails on the closed connection, which ends the client loop.
func (s *Server) cmdQuit(c *Client, args []protocol.Value) protocol.Value {
	// Best effort: if the peer is already gone there is nobody to notify.
	_ = c.respond(protocol.OK)
	// The connection is being discarded, so a close error is not actionable.
	_ = c.Close()
	return protocol.OK
}
// cmdGet implements GET key. It replies with the stored value as a bulk
// string, or a null bulk reply when the key is absent or expired. Exactly
// one argument is required; extra arguments are an arity error.
func (s *Server) cmdGet(c *Client, args []protocol.Value) protocol.Value {
	if len(args) != 1 {
		return protocol.NewError("ERR wrong number of arguments for 'get' command")
	}
	value, exists := s.store.Get(args[0].AsString())
	if !exists {
		return protocol.NullBulk
	}
	return protocol.NewBulkBytes(value)
}
// cmdSet implements SET key value [EX seconds | PX milliseconds] [NX | XX].
//
// Options are case-insensitive. EX and PX set a time to live and are mutually
// exclusive (a repeated expiry option is rejected as well); NX stores the
// value only if the key does not exist, XX only if it does, and the two
// cannot be combined. Unknown options and conflicting options produce
// "ERR syntax error". When an NX or XX condition is not met, the reply is a
// null bulk and nothing is stored; otherwise the reply is OK.
//
// NOTE: the NX/XX existence check and the write are two separate store
// calls, so they are not atomic with respect to concurrent SETs on the same
// key.
func (s *Server) cmdSet(c *Client, args []protocol.Value) protocol.Value {
	if len(args) < 2 {
		return protocol.NewError("ERR wrong number of arguments for 'set' command")
	}
	key := args[0].AsString()
	value := args[1].AsBytes()

	var (
		ttl *time.Duration
		nx  bool // only set if the key does not exist
		xx  bool // only set if the key already exists
	)

	for i := 2; i < len(args); i++ {
		switch opt := strings.ToUpper(args[i].AsString()); opt {
		case "EX", "PX":
			// A second expiry option, or one without its value, is malformed.
			if ttl != nil || i+1 >= len(args) {
				return protocol.NewError("ERR syntax error")
			}
			unit := time.Second
			if opt == "PX" {
				unit = time.Millisecond
			}
			d, errMsg := parseSetExpiry(args[i+1].AsString(), unit)
			if errMsg != "" {
				return protocol.NewError(errMsg)
			}
			ttl = &d
			i++ // skip the value that was just consumed
		case "NX":
			nx = true
		case "XX":
			xx = true
		default:
			// Silently ignoring a misspelled option would store the value
			// with different semantics than the client asked for.
			return protocol.NewError("ERR syntax error")
		}
	}
	if nx && xx {
		return protocol.NewError("ERR syntax error")
	}

	if nx || xx {
		_, exists := s.store.Get(key)
		if (nx && exists) || (xx && !exists) {
			return protocol.NullBulk
		}
	}

	s.store.Set(key, value, ttl)
	return protocol.OK
}

// parseSetExpiry converts the numeric argument of EX/PX into a duration of
// the given unit. It returns a non-empty error message when raw is not an
// integer, is not positive, or is too large to be represented as a duration.
func parseSetExpiry(raw string, unit time.Duration) (time.Duration, string) {
	n, err := strconv.ParseInt(raw, 10, 64)
	if err != nil {
		return 0, "ERR value is not an integer"
	}
	// Guard the multiplication below: an overflow would wrap to a negative
	// duration and make the key expire immediately.
	const maxDuration = time.Duration(1<<63 - 1)
	if n <= 0 || n > int64(maxDuration/unit) {
		return 0, "ERR invalid expire time in 'set' command"
	}
	return time.Duration(n) * unit, ""
}
// cmdDel implements DEL key [key ...]. It removes the named keys and replies
// with the number of keys that were actually deleted.
func (s *Server) cmdDel(c *Client, args []protocol.Value) protocol.Value {
	if len(args) < 1 {
		return protocol.NewError("ERR wrong number of arguments for 'del' command")
	}

	names := make([]string, 0, len(args))
	for _, arg := range args {
		names = append(names, arg.AsString())
	}

	removed := int64(s.store.Delete(names...))
	return protocol.NewInteger(removed)
}
// cmdExists implements EXISTS key [key ...]. It replies with the count
// reported by the store for the named keys.
func (s *Server) cmdExists(c *Client, args []protocol.Value) protocol.Value {
	if len(args) < 1 {
		return protocol.NewError("ERR wrong number of arguments for 'exists' command")
	}

	names := make([]string, 0, len(args))
	for idx := range args {
		names = append(names, args[idx].AsString())
	}

	present := s.store.Exists(names...)
	return protocol.NewInteger(int64(present))
}
// cmdKeys implements KEYS pattern. It replies with an array of bulk strings
// holding every key the store reports as matching the pattern. Exactly one
// argument is required; extra arguments are an arity error.
func (s *Server) cmdKeys(c *Client, args []protocol.Value) protocol.Value {
	if len(args) != 1 {
		return protocol.NewError("ERR wrong number of arguments for 'keys' command")
	}

	keys := s.store.Keys(args[0].AsString())
	result := make([]protocol.Value, len(keys))
	for i, key := range keys {
		result[i] = protocol.NewBulkString(key)
	}
	return protocol.NewArray(result)
}
// cmdInfo implements INFO [section]. It replies with a fixed bulk string
// containing the server section; the arguments are not consulted.
func (s *Server) cmdInfo(c *Client, args []protocol.Value) protocol.Value {
	const serverSection = "# Server\r\n" +
		"redis_version:0.1.0\r\n" +
		"myredis_version:1.0\r\n"
	return protocol.NewBulkString(serverSection)
}
// cmdDbSize implements DBSIZE. It replies with the number of keys the store
// returns for the match-everything pattern.
func (s *Server) cmdDbSize(c *Client, args []protocol.Value) protocol.Value {
	allKeys := s.store.Keys("*")
	return protocol.NewInteger(int64(len(allKeys)))
}
// cmdFlushDb implements FLUSHDB. It lists the keys matching "*" and deletes
// them, then replies OK. Listing and deleting are two separate store calls.
func (s *Server) cmdFlushDb(c *Client, args []protocol.Value) protocol.Value {
	s.store.Delete(s.store.Keys("*")...)
	return protocol.OK
}
Step 5: Main Entry Point
cmd/server/main.go
Copy
package main
import (
"flag"
"log"
"os"
"os/signal"
"syscall"
"myredis/internal/server"
)
// main parses the -port flag, starts the server and shuts it down gracefully
// on SIGINT or SIGTERM.
func main() {
	port := flag.String("port", "6379", "Port to listen on")
	flag.Parse()

	srv := server.NewServer(":" + *port)

	// Buffered so a signal delivered before the goroutine is receiving is
	// not dropped.
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

	// stopped is closed once Stop has fully returned.
	stopped := make(chan struct{})
	go func() {
		defer close(stopped)
		<-sigChan
		log.Println("Shutting down...")
		srv.Stop()
	}()

	if err := srv.Start(); err != nil {
		log.Fatalf("Server error: %v", err)
	}

	// Start returns as soon as the listener is closed, which happens partway
	// through Stop. Wait for Stop to finish closing client connections and
	// draining their goroutines; returning earlier would end the process in
	// the middle of the graceful shutdown.
	<-stopped
}
Testing Your Server
Build and Run
Copy
go build -o myredis ./cmd/server
./myredis
# Redis server listening on :6379
Test with redis-cli
Copy
# In another terminal
redis-cli
> PING
PONG
> SET name "Alice"
OK
> GET name
"Alice"
> SET counter 100 EX 10
OK
> GET counter
"100"
# Wait 10 seconds...
> GET counter
(nil)
Exercises
Exercise 1: Add INCR/DECR commands
Exercise 1: Add INCR/DECR commands
Implement atomic increment/decrement:
Copy
// INCR key
// DECR key
// INCRBY key increment
// DECRBY key decrement
// These should:
// 1. Parse the value as an integer
// 2. Increment/decrement atomically
// 3. Return the new value
// 4. Create the key with value 0 if it doesn't exist
Exercise 2: Add TTL/EXPIRE commands
Exercise 2: Add TTL/EXPIRE commands
Copy
// TTL key - Return remaining TTL in seconds
// PTTL key - Return remaining TTL in milliseconds
// EXPIRE key seconds - Set expiration
// PEXPIRE key milliseconds - Set expiration in ms
// PERSIST key - Remove expiration
Exercise 3: Connection pooling
Exercise 3: Connection pooling
Create a client connection pool:
Copy
// Pool is the starting skeleton for the connection-pool exercise. The field
// meanings noted below are inferred from their names only; defining them is
// part of the exercise.
type Pool struct {
// presumably the cap on idle connections kept for reuse — confirm in your design
maxIdle int
// presumably the cap on connections handed out at once — confirm in your design
maxActive int
// presumably holds idle connections waiting to be reused — confirm in your design
conns chan net.Conn
}
// Get is left for the reader to implement; its signature yields a connection
// or an error.
func (p *Pool) Get() (net.Conn, error)
// Put is left for the reader to implement; its signature accepts a connection.
func (p *Pool) Put(conn net.Conn)
Key Takeaways
Goroutine per Connection
Go makes concurrent connection handling simple and efficient
Thread-Safe Store
Use sync.RWMutex for concurrent read access, exclusive writes
Lazy Expiration
Check expiration on access + background cleanup
Command Routing
Map command names to handler functions for clean design
Further Reading
Concurrency Patterns
Learn more about concurrent programming patterns
Linux Networking
Understand TCP/IP at the kernel level
What’s Next?
In Chapter 3: Data Structures, we’ll implement:
- Lists (LPUSH, RPUSH, LPOP, LRANGE)
- Sets (SADD, SMEMBERS, SINTER)
- Hashes (HSET, HGET, HGETALL)
- Sorted Sets with skip lists
Next: Data Structures
Implement Redis data structures beyond strings