Skip to main content

Project: Key-Value Store

Build a production-quality key-value store with persistence, memory management, and basic indexing. This project teaches file I/O, data structures, and systems design.

Features

1

In-Memory Storage

Hash table with separate chaining for average-case O(1) lookups
2

Persistence

Write-ahead log and snapshots
3

Variable-Size Values

Store arbitrary binary data
4

Concurrent Access

Thread-safe operations guarded by a reader–writer lock

Data Structures

// kvstore.h
#ifndef KVSTORE_H
#define KVSTORE_H

#include <pthread.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>   // strcasecmp (CLI command matching)
#include <time.h>      // time_t, time()

#include <sys/stat.h>  // mkdir

// Upper bound on key length: kv_put rejects keys whose strlen is >= this value.
#define MAX_KEY_SIZE 256
// Number of hash buckets a freshly created store starts with.
#define INITIAL_CAPACITY 1024
// When size / capacity reaches this ratio, the bucket array is doubled.
#define LOAD_FACTOR 0.75

// One key/value pair, stored as a node in a bucket's singly linked chain.
// The entry owns heap copies of both the key and the value.
typedef struct Entry {
    char *key;             // NUL-terminated, heap-allocated copy of the key
    void *value;           // Heap-allocated copy of the value bytes
    size_t value_size;     // Number of bytes in value
    struct Entry *next;    // Next entry in the same bucket, or NULL
    uint64_t version;      // Taken from the store counter on insert, bumped on every update (for optimistic locking)
    time_t created_at;     // time(NULL) when the entry was first created
    time_t updated_at;     // time(NULL) of the most recent put for this key
} Entry;

// The store itself: a chained hash table plus optional on-disk persistence.
typedef struct {
    Entry **buckets;       // Array of `capacity` chain heads
    size_t capacity;       // Number of buckets
    size_t size;           // Number of entries currently stored
    pthread_rwlock_t lock; // Read lock for lookups, write lock for mutations
    
    // Persistence
    FILE *wal;           // Write-ahead log; NULL when no data_dir was given or the log could not be opened
    char *data_dir;      // Heap copy of the data directory path, or NULL for an in-memory store
    uint64_t version;    // Global version counter, incremented by every put and delete
} KVStore;

// Core operations

// Allocates a store. data_dir may be NULL for a purely in-memory store;
// otherwise "<data_dir>/wal.log" is opened for appending. The directory itself
// is not created here. Returns NULL on allocation failure.
KVStore *kv_create(const char *data_dir);
// Frees every entry, closes the WAL and releases the store. NULL is ignored.
void kv_destroy(KVStore *store);

// Inserts or replaces `key` with a private copy of `size` bytes from `value`.
// Returns 0 on success, -1 on invalid arguments (NULL pointers, size == 0,
// key length >= MAX_KEY_SIZE) or failure.
int kv_put(KVStore *store, const char *key, const void *value, size_t size);
// Looks up `key`. On success returns 0 and stores a malloc'd copy of the value
// in *value (the caller must free it) and its length in *size.
// Returns -1 on invalid arguments, allocation failure, or if the key is absent.
int kv_get(KVStore *store, const char *key, void **value, size_t *size);
// Removes `key`. Returns 0 if it was removed, -1 otherwise (including not found).
int kv_delete(KVStore *store, const char *key);
// Returns true if `key` is present; false if absent or on NULL arguments.
bool kv_exists(KVStore *store, const char *key);

// Persistence

// Writes a snapshot to "<data_dir>/data.snap" and then truncates the WAL.
// Returns 0 on success, -1 on failure or if the store has no data_dir.
int kv_save(KVStore *store);
// Loads "<data_dir>/data.snap" if it exists, then replays the WAL.
// Returns 0 on success, -1 on failure or if the store has no data_dir.
int kv_load(KVStore *store);

// Iteration

// Callback invoked with a key, its value bytes and size, and the caller's ctx.
typedef void (*kv_callback)(const char *key, const void *value, size_t size, void *ctx);
// NOTE(review): no definition is visible in this file; presumably calls `cb`
// once per stored entry, passing `ctx` through unchanged. Confirm against the
// implementation.
void kv_foreach(KVStore *store, kv_callback cb, void *ctx);

// Stats
// Summary returned by kv_stats. NOTE(review): kv_stats is not defined in this
// file, so the field meanings below are inferred from names and from how the
// CLI prints them. Confirm against the implementation.
typedef struct {
    size_t count;         // Printed by the CLI as "Keys"
    size_t memory_used;   // Presumably bytes held by the store (TODO confirm what is counted)
    size_t bucket_count;  // Printed by the CLI as "Buckets"
    double load_factor;   // Printed by the CLI as "Load factor"; presumably count / bucket_count
} KVStats;

KVStats kv_stats(KVStore *store);

#endif

Hash Table Implementation

// hashtable.c
#include "kvstore.h"

// 64-bit FNV-1a over the bytes of a NUL-terminated key (terminator excluded):
// for each byte, XOR it into the running hash, then multiply by the FNV prime.
static uint64_t hash_key(const char *key) {
    const uint64_t fnv_offset_basis = 0xcbf29ce484222325ULL;
    const uint64_t fnv_prime = 0x100000001b3ULL;

    uint64_t digest = fnv_offset_basis;
    for (const unsigned char *p = (const unsigned char *)key; *p != 0; p++) {
        digest = (digest ^ *p) * fnv_prime;
    }
    return digest;
}

static Entry *entry_create(const char *key, const void *value, size_t size) {
    Entry *entry = calloc(1, sizeof(Entry));
    if (!entry) return NULL;
    
    entry->key = strdup(key);
    entry->value = malloc(size);
    
    if (!entry->key || !entry->value) {
        free(entry->key);
        free(entry->value);
        free(entry);
        return NULL;
    }
    
    memcpy(entry->value, value, size);
    entry->value_size = size;
    entry->created_at = time(NULL);
    entry->updated_at = entry->created_at;
    
    return entry;
}

// Releases an entry together with its key and value buffers. NULL is ignored.
// Does not touch the entry's chain neighbours.
static void entry_destroy(Entry *entry) {
    if (entry == NULL) {
        return;
    }
    free(entry->value);
    free(entry->key);
    free(entry);
}

// Doubles the bucket array once size / capacity has reached LOAD_FACTOR and
// relinks every existing entry into the new array (entries are moved, not
// copied). Returns 0 when no resize was needed or the resize succeeded, -1 if
// the new array could not be allocated (the store is left unchanged).
static int resize_if_needed(KVStore *store) {
    if ((double)store->size / store->capacity < LOAD_FACTOR) {
        return 0;
    }

    size_t grown = store->capacity * 2;
    Entry **table = calloc(grown, sizeof(Entry*));
    if (table == NULL) {
        return -1;
    }

    // Walk each old chain and push every node onto the head of its new chain.
    for (size_t b = 0; b < store->capacity; b++) {
        Entry *node = store->buckets[b];
        while (node != NULL) {
            Entry *rest = node->next;
            Entry **slot = &table[hash_key(node->key) % grown];
            node->next = *slot;
            *slot = node;
            node = rest;
        }
    }

    free(store->buckets);
    store->capacity = grown;
    store->buckets = table;
    return 0;
}

// Allocates and initialises an empty store.
//
// data_dir: directory used for persistence, or NULL for an in-memory store.
//           The path is copied; the directory is not created here.
// Returns the new store, or NULL if an allocation or the lock initialisation
// fails. Failing to open the write-ahead log is NOT fatal: the store is still
// returned with store->wal == NULL, so mutations are simply not logged.
KVStore *kv_create(const char *data_dir) {
    KVStore *store = calloc(1, sizeof *store);
    if (!store) return NULL;

    store->buckets = calloc(INITIAL_CAPACITY, sizeof *store->buckets);
    if (!store->buckets) goto fail_store;
    store->capacity = INITIAL_CAPACITY;

    if (data_dir) {
        store->data_dir = strdup(data_dir);
        if (!store->data_dir) goto fail_buckets;
    }

    if (pthread_rwlock_init(&store->lock, NULL) != 0) goto fail_dir;

    // Open WAL if data directory specified
    if (data_dir) {
        char wal_path[512];
        int n = snprintf(wal_path, sizeof wal_path, "%s/wal.log", data_dir);
        // Skip the log rather than open a silently truncated path.
        if (n > 0 && (size_t)n < sizeof wal_path) {
            // "a+b": appends always go to the end; the stream can still be
            // rewound and read when the log is replayed.
            store->wal = fopen(wal_path, "a+b");
        }
    }

    return store;

fail_dir:
    free(store->data_dir);
fail_buckets:
    free(store->buckets);
fail_store:
    free(store);
    return NULL;
}

// Tears down a store: frees every entry, the bucket array and the data_dir
// copy, closes the WAL if one is open, then destroys the lock and the store
// itself. Passing NULL is a no-op. The write lock is held while the contents
// are released.
void kv_destroy(KVStore *store) {
    if (store == NULL) {
        return;
    }

    pthread_rwlock_wrlock(&store->lock);

    for (size_t b = 0; b < store->capacity; b++) {
        for (Entry *node = store->buckets[b]; node != NULL; ) {
            Entry *following = node->next;  // grab before the node is freed
            entry_destroy(node);
            node = following;
        }
    }

    free(store->buckets);
    free(store->data_dir);
    if (store->wal != NULL) {
        fclose(store->wal);
    }

    pthread_rwlock_unlock(&store->lock);
    pthread_rwlock_destroy(&store->lock);

    free(store);
}

Core Operations

// operations.c
#include "kvstore.h"

// Appends one "put" record to the write-ahead log and flushes it.
// Record layout: op(1) | key_len(2) | key | value_len(4) | value, with the
// integers written in host byte order.
// Returns 0 on success, -1 on a short write or flush failure.
static int wal_log_put(FILE *wal, const char *key, const void *value, size_t size) {
    uint8_t op = 'P';  // Put
    uint16_t key_len = (uint16_t)strlen(key);
    uint32_t val_len = (uint32_t)size;

    if (fwrite(&op, 1, 1, wal) != 1 ||
        fwrite(&key_len, sizeof key_len, 1, wal) != 1 ||
        fwrite(key, 1, key_len, wal) != key_len ||
        fwrite(&val_len, sizeof val_len, 1, wal) != 1 ||
        fwrite(value, 1, size, wal) != size ||
        fflush(wal) != 0) {
        return -1;
    }
    return 0;
}

// Inserts `key` or replaces its value with a private copy of `size` bytes
// from `value`.
//
// The record is written to the WAL (when one is open) BEFORE the in-memory
// table is changed, so a failed log write leaves the table untouched.
//
// Returns 0 on success. Returns -1 when:
//   - store, key or value is NULL, or size is 0;
//   - the key length is >= MAX_KEY_SIZE;
//   - size does not fit the 32-bit length field used on disk;
//   - memory allocation fails;
//   - the WAL write fails.
int kv_put(KVStore *store, const char *key, const void *value, size_t size) {
    if (!store || !key || !value || size == 0) return -1;
    if (strlen(key) >= MAX_KEY_SIZE) return -1;
    if (size > UINT32_MAX) return -1;

    int rc = -1;
    pthread_rwlock_wrlock(&store->lock);

    // Growing the table is best-effort: if it fails we keep the current
    // capacity and chains just get longer.
    (void)resize_if_needed(store);

    size_t index = hash_key(key) % store->capacity;

    // Look for existing entry
    Entry *existing = store->buckets[index];
    while (existing && strcmp(existing->key, key) != 0) {
        existing = existing->next;
    }

    if (existing) {
        // Prepare the replacement first so nothing is lost on failure.
        void *new_value = malloc(size);
        if (!new_value) goto out;
        memcpy(new_value, value, size);

        if (store->wal && wal_log_put(store->wal, key, value, size) != 0) {
            free(new_value);
            goto out;
        }

        free(existing->value);
        existing->value = new_value;
        existing->value_size = size;
        existing->updated_at = time(NULL);
        existing->version++;
    } else {
        Entry *new_entry = entry_create(key, value, size);
        if (!new_entry) goto out;

        if (store->wal && wal_log_put(store->wal, key, value, size) != 0) {
            entry_destroy(new_entry);
            goto out;
        }

        new_entry->version = store->version;
        new_entry->next = store->buckets[index];
        store->buckets[index] = new_entry;
        store->size++;
    }

    store->version++;
    rc = 0;

out:
    pthread_rwlock_unlock(&store->lock);
    return rc;
}

// Fetches a copy of the value stored under `key`.
//
// On success returns 0, stores a malloc'd copy of the value in *value (the
// caller frees it) and its byte length in *size. Returns -1 if any argument is
// NULL, if the copy cannot be allocated (*value is then NULL), or if the key
// is not present (*value and *size are left untouched).
int kv_get(KVStore *store, const char *key, void **value, size_t *size) {
    if (store == NULL || key == NULL || value == NULL || size == NULL) {
        return -1;
    }

    int status = -1;  // stays -1 unless a match is copied out
    pthread_rwlock_rdlock(&store->lock);

    Entry *node = store->buckets[hash_key(key) % store->capacity];
    for (; node != NULL; node = node->next) {
        if (strcmp(node->key, key) != 0) {
            continue;
        }

        *value = malloc(node->value_size);
        if (*value != NULL) {
            memcpy(*value, node->value, node->value_size);
            *size = node->value_size;
            status = 0;
        }
        break;
    }

    pthread_rwlock_unlock(&store->lock);
    return status;
}

// Appends one "delete" record to the write-ahead log and flushes it.
// Record layout: op(1) | key_len(2) | key, integers in host byte order.
// Returns 0 on success, -1 on a short write or flush failure.
static int wal_log_delete(FILE *wal, const char *key) {
    uint8_t op = 'D';  // Delete
    uint16_t key_len = (uint16_t)strlen(key);

    if (fwrite(&op, 1, 1, wal) != 1 ||
        fwrite(&key_len, sizeof key_len, 1, wal) != 1 ||
        fwrite(key, 1, key_len, wal) != key_len ||
        fflush(wal) != 0) {
        return -1;
    }
    return 0;
}

// Removes `key` from the store.
//
// The deletion is logged to the WAL (when one is open) BEFORE the entry is
// unlinked, so a failed log write leaves the entry in place.
//
// Returns 0 if the entry was removed. Returns -1 if store or key is NULL, if
// the key is not present, or if the WAL write fails.
int kv_delete(KVStore *store, const char *key) {
    if (!store || !key) return -1;

    int rc = -1;
    pthread_rwlock_wrlock(&store->lock);

    size_t index = hash_key(key) % store->capacity;

    // `link` points at whichever pointer currently refers to the candidate
    // (the bucket head or a predecessor's next), so unlinking is one store.
    Entry **link = &store->buckets[index];
    while (*link && strcmp((*link)->key, key) != 0) {
        link = &(*link)->next;
    }

    Entry *victim = *link;
    if (victim && (!store->wal || wal_log_delete(store->wal, key) == 0)) {
        *link = victim->next;
        entry_destroy(victim);
        store->size--;
        store->version++;
        rc = 0;
    }

    pthread_rwlock_unlock(&store->lock);
    return rc;
}

// Reports whether `key` is currently stored. Returns false when the key is
// absent or when store or key is NULL. Takes the read lock for the lookup.
bool kv_exists(KVStore *store, const char *key) {
    if (store == NULL || key == NULL) {
        return false;
    }

    bool found = false;
    pthread_rwlock_rdlock(&store->lock);

    Entry *node = store->buckets[hash_key(key) % store->capacity];
    while (node != NULL && !found) {
        if (strcmp(node->key, key) == 0) {
            found = true;
        } else {
            node = node->next;
        }
    }

    pthread_rwlock_unlock(&store->lock);
    return found;
}

Persistence

// persistence.c
#include "kvstore.h"
#include <sys/stat.h>

// Snapshot file format (sizes in bytes; integers are written with fwrite in
// host byte order, so snapshots are not portable across endianness):
// Header: magic(4) | version(4) | count(8)
// Entries: key_len(2) | key | value_len(4) | value | timestamp(8)
// `count` is the number of entries; `key` is stored without a NUL terminator;
// `timestamp` is the entry's updated_at.

#define SNAPSHOT_MAGIC 0x4B565354  // "KVST"
#define SNAPSHOT_VERSION 1         // Written to the header's version field

// Writes a full snapshot of the store to "<data_dir>/data.snap" and, once the
// snapshot is in place, truncates the write-ahead log.
//
// The snapshot is first written to "data.snap.tmp" and renamed over the real
// file only if every write and the close succeeded, so a failed save never
// replaces a good snapshot with a partial one.
//
// The WRITE lock is held for the whole operation. Truncating the log replaces
// store->wal, which writers use under that lock, and no mutation may slip in
// between "snapshot taken" and "log cleared" or its log record would be lost.
//
// Returns 0 on success. Returns -1 if store is NULL, the store has no
// data_dir, a path does not fit its buffer, any file operation fails, or the
// log cannot be reopened after truncation (the snapshot itself is then already
// saved, but store->wal is NULL and later mutations are not logged).
int kv_save(KVStore *store) {
    if (!store || !store->data_dir) return -1;

    char snapshot_path[512];
    char temp_path[512];
    char wal_path[512];
    int n_snap = snprintf(snapshot_path, sizeof snapshot_path, "%s/data.snap", store->data_dir);
    int n_temp = snprintf(temp_path, sizeof temp_path, "%s/data.snap.tmp", store->data_dir);
    int n_wal = snprintf(wal_path, sizeof wal_path, "%s/wal.log", store->data_dir);
    if (n_snap < 0 || (size_t)n_snap >= sizeof snapshot_path ||
        n_temp < 0 || (size_t)n_temp >= sizeof temp_path ||
        n_wal < 0 || (size_t)n_wal >= sizeof wal_path) {
        return -1;
    }

    int rc = -1;
    pthread_rwlock_wrlock(&store->lock);

    FILE *f = fopen(temp_path, "wb");
    if (!f) goto out;

    // Write header
    uint32_t magic = SNAPSHOT_MAGIC;
    uint32_t version = SNAPSHOT_VERSION;
    uint64_t count = store->size;

    bool ok = fwrite(&magic, sizeof magic, 1, f) == 1 &&
              fwrite(&version, sizeof version, 1, f) == 1 &&
              fwrite(&count, sizeof count, 1, f) == 1;

    // Write entries
    for (size_t i = 0; ok && i < store->capacity; i++) {
        for (Entry *entry = store->buckets[i]; ok && entry; entry = entry->next) {
            size_t raw_key_len = strlen(entry->key);
            if (raw_key_len > UINT16_MAX || entry->value_size > UINT32_MAX) {
                // Would not fit the on-disk length fields.
                ok = false;
                break;
            }

            uint16_t key_len = (uint16_t)raw_key_len;
            uint32_t value_len = (uint32_t)entry->value_size;
            uint64_t timestamp = (uint64_t)entry->updated_at;

            ok = fwrite(&key_len, sizeof key_len, 1, f) == 1 &&
                 fwrite(entry->key, 1, key_len, f) == key_len &&
                 fwrite(&value_len, sizeof value_len, 1, f) == 1 &&
                 fwrite(entry->value, 1, value_len, f) == value_len &&
                 fwrite(&timestamp, sizeof timestamp, 1, f) == 1;
        }
    }

    // fclose flushes buffered data, so its result matters too.
    if (fclose(f) != 0) ok = false;

    // Atomic rename
    if (!ok || rename(temp_path, snapshot_path) != 0) {
        remove(temp_path);
        goto out;
    }

    // Clear WAL after successful snapshot
    if (store->wal) {
        fclose(store->wal);
        // "w+b" truncates and keeps the stream readable for a later replay.
        store->wal = fopen(wal_path, "w+b");
        if (!store->wal) goto out;
    }

    rc = 0;

out:
    pthread_rwlock_unlock(&store->lock);
    return rc;
}

// Defined further down in this file; declared here so kv_load can call it.
static int kv_replay_wal(KVStore *store);

// Reads exactly `len` bytes from `f` into `buf`. Returns false on a short read.
static bool snap_read_exact(FILE *f, void *buf, size_t len) {
    return len == 0 || fread(buf, 1, len, f) == len;
}

// Populates the store from disk: loads "<data_dir>/data.snap" if it exists,
// then replays the write-ahead log on top of it. With no snapshot file, only
// the log is replayed.
//
// Entries are inserted through kv_put, so their timestamps and versions are
// those of the load, not the stored ones (the snapshot timestamp is read and
// discarded). While snapshot entries are being inserted the WAL is detached
// from the store so they are not logged a second time. Because store->wal is
// swapped without holding the lock, call this before the store is shared with
// other threads.
//
// Returns 0 on success. Returns -1 if store is NULL, the store has no
// data_dir, the header is unreadable or has the wrong magic/version, the
// snapshot is truncated or malformed, or an entry cannot be inserted. After a
// bad header nothing is loaded; after a bad entry the entries read so far stay
// loaded and the WAL is still replayed.
int kv_load(KVStore *store) {
    if (!store || !store->data_dir) return -1;

    char snapshot_path[512];
    int n = snprintf(snapshot_path, sizeof snapshot_path, "%s/data.snap", store->data_dir);
    if (n < 0 || (size_t)n >= sizeof snapshot_path) return -1;

    FILE *f = fopen(snapshot_path, "rb");
    if (!f) {
        // No snapshot, try to replay WAL
        return kv_replay_wal(store);
    }

    // Read header
    uint32_t magic = 0;
    uint32_t version = 0;
    uint64_t count = 0;

    if (!snap_read_exact(f, &magic, sizeof magic) || magic != SNAPSHOT_MAGIC ||
        !snap_read_exact(f, &version, sizeof version) || version != SNAPSHOT_VERSION ||
        !snap_read_exact(f, &count, sizeof count)) {
        fclose(f);
        return -1;
    }

    // Detach the log so the kv_put calls below do not re-log snapshot data.
    FILE *wal = store->wal;
    store->wal = NULL;

    int rc = 0;

    // Read entries
    for (uint64_t i = 0; i < count; i++) {
        uint16_t key_len;
        uint32_t value_len;
        uint64_t timestamp;
        char key[MAX_KEY_SIZE];

        // kv_put never stores keys this long or empty values, so either one
        // means the file is corrupt.
        if (!snap_read_exact(f, &key_len, sizeof key_len) ||
            key_len >= MAX_KEY_SIZE ||
            !snap_read_exact(f, key, key_len) ||
            !snap_read_exact(f, &value_len, sizeof value_len) ||
            value_len == 0) {
            rc = -1;
            break;
        }
        key[key_len] = '\0';

        void *value = malloc(value_len);
        if (!value) {
            rc = -1;
            break;
        }

        if (!snap_read_exact(f, value, value_len) ||
            !snap_read_exact(f, &timestamp, sizeof timestamp) ||
            kv_put(store, key, value, value_len) != 0) {
            rc = -1;
        }

        free(value);
        if (rc != 0) break;
    }

    fclose(f);
    store->wal = wal;

    // Replay WAL for any operations after snapshot
    if (kv_replay_wal(store) != 0) rc = -1;

    return rc;
}

// Reads exactly `len` bytes from `f` into `buf`. Returns false on a short read.
static bool wal_read_exact(FILE *f, void *buf, size_t len) {
    return len == 0 || fread(buf, 1, len, f) == len;
}

// Re-applies every record in the write-ahead log to the in-memory table.
//
// Records are applied through kv_put / kv_delete, which themselves append to
// store->wal. The log is therefore detached from the store for the duration of
// the replay; otherwise each replayed record would be appended again to the
// stream being read and the loop would never reach the end. Because store->wal
// is swapped without holding the lock, call this before the store is shared
// with other threads.
//
// Replay stops quietly at the first incomplete or unrecognisable record: a
// torn final record is what a crash in the middle of a write leaves behind.
//
// Returns 0 when there is no log or the replay finished, -1 if a value buffer
// could not be allocated.
static int kv_replay_wal(KVStore *store) {
    FILE *wal = store->wal;
    if (!wal) return 0;

    int rc = 0;
    store->wal = NULL;
    rewind(wal);

    for (;;) {
        uint8_t op;
        uint16_t key_len;
        char key[MAX_KEY_SIZE];

        if (!wal_read_exact(wal, &op, 1)) break;  // clean end of log

        // Record boundaries cannot be recovered after an unknown op or an
        // impossible key length, so stop instead of guessing.
        if ((op != 'P' && op != 'D') ||
            !wal_read_exact(wal, &key_len, sizeof key_len) ||
            key_len >= MAX_KEY_SIZE ||
            !wal_read_exact(wal, key, key_len)) {
            break;
        }
        key[key_len] = '\0';

        if (op == 'D') {
            kv_delete(store, key);
            continue;
        }

        uint32_t value_len;
        if (!wal_read_exact(wal, &value_len, sizeof value_len) || value_len == 0) break;

        void *value = malloc(value_len);
        if (!value) {
            rc = -1;
            break;
        }

        bool complete = wal_read_exact(wal, value, value_len);
        if (complete) {
            kv_put(store, key, value, value_len);
        }
        free(value);
        if (!complete) break;
    }

    // A read on an update stream must be followed by a positioning call before
    // the next write; also drop any EOF/error flag left by the final read.
    clearerr(wal);
    (void)fseek(wal, 0, SEEK_END);

    store->wal = wal;
    return rc;
}

CLI Interface

// cli.c
#include "kvstore.h"
#include <readline/readline.h>
#include <readline/history.h>

// Prints the list of CLI commands to standard output.
void print_help(void) {
    // Adjacent string literals are concatenated by the compiler, so the whole
    // table is emitted with a single call.
    fputs("Commands:\n"
          "  SET <key> <value>   Store a key-value pair\n"
          "  GET <key>           Retrieve a value\n"
          "  DEL <key>           Delete a key\n"
          "  EXISTS <key>        Check if key exists\n"
          "  KEYS                List all keys\n"
          "  STATS               Show statistics\n"
          "  SAVE                Save to disk\n"
          "  HELP                Show this help\n"
          "  QUIT                Exit\n",
          stdout);
}

// kv_foreach callback used by the KEYS command: prints one key per line.
// Defined at file scope because nested functions are a compiler extension,
// not ISO C.
static void print_key(const char *k, const void *v, size_t s, void *ctx) {
    (void)v; (void)s; (void)ctx;
    printf("%s\n", k);
}

// Interactive shell for the key-value store.
//
// Usage: <program> [data_dir]   (data_dir defaults to "./kvdata")
//
// Creates the data directory if needed, loads any existing data, then reads
// commands with readline until QUIT or end of input. The store is saved before
// exit. Returns 0 on normal exit, 1 if the store could not be created.
int main(int argc, char *argv[]) {
    const char *data_dir = argc > 1 ? argv[1] : "./kvdata";
    
    // Create data directory; the result is ignored because the directory
    // usually exists already.
    mkdir(data_dir, 0755);
    
    KVStore *store = kv_create(data_dir);
    if (!store) {
        fprintf(stderr, "Failed to create store\n");
        return 1;
    }
    
    // Load existing data
    if (kv_load(store) != 0) {
        fprintf(stderr, "Warning: failed to load existing data\n");
    }
    
    printf("KVStore ready. Type HELP for commands.\n");
    
    char *line;
    while ((line = readline("kv> ")) != NULL) {
        if (strlen(line) == 0) {
            free(line);
            continue;
        }
        
        add_history(line);
        
        char cmd[32], key[MAX_KEY_SIZE], value[4096];
        
        if (sscanf(line, "%31s", cmd) != 1) {
            free(line);
            continue;
        }
        
        if (strcasecmp(cmd, "SET") == 0) {
            if (sscanf(line, "%*s %255s %4095[^\n]", key, value) == 2) {
                // Store the terminating NUL too so GET can print the value as text.
                if (kv_put(store, key, value, strlen(value) + 1) == 0) {
                    printf("OK\n");
                } else {
                    printf("ERROR\n");
                }
            } else {
                printf("Usage: SET <key> <value>\n");
            }
        }
        else if (strcasecmp(cmd, "GET") == 0) {
            if (sscanf(line, "%*s %255s", key) == 1) {
                void *stored;
                size_t size;
                if (kv_get(store, key, &stored, &size) == 0) {
                    // Print up to the first NUL but never past `size` bytes:
                    // the stored value is not guaranteed to be NUL-terminated.
                    const char *text = stored;
                    const char *nul = memchr(text, '\0', size);
                    size_t len = nul ? (size_t)(nul - text) : size;
                    fwrite(text, 1, len, stdout);
                    putchar('\n');
                    free(stored);
                } else {
                    printf("(nil)\n");
                }
            } else {
                printf("Usage: GET <key>\n");
            }
        }
        else if (strcasecmp(cmd, "DEL") == 0) {
            if (sscanf(line, "%*s %255s", key) == 1) {
                if (kv_delete(store, key) == 0) {
                    printf("OK\n");
                } else {
                    printf("NOT FOUND\n");
                }
            } else {
                printf("Usage: DEL <key>\n");
            }
        }
        else if (strcasecmp(cmd, "EXISTS") == 0) {
            if (sscanf(line, "%*s %255s", key) == 1) {
                printf("%s\n", kv_exists(store, key) ? "1" : "0");
            } else {
                printf("Usage: EXISTS <key>\n");
            }
        }
        else if (strcasecmp(cmd, "KEYS") == 0) {
            kv_foreach(store, print_key, NULL);
        }
        else if (strcasecmp(cmd, "STATS") == 0) {
            KVStats stats = kv_stats(store);
            printf("Keys: %zu\n", stats.count);
            printf("Buckets: %zu\n", stats.bucket_count);
            printf("Load factor: %.2f\n", stats.load_factor);
        }
        else if (strcasecmp(cmd, "SAVE") == 0) {
            if (kv_save(store) == 0) {
                printf("OK\n");
            } else {
                printf("ERROR\n");
            }
        }
        else if (strcasecmp(cmd, "HELP") == 0) {
            print_help();
        }
        else if (strcasecmp(cmd, "QUIT") == 0) {
            free(line);
            break;
        }
        else {
            printf("Unknown command: %s\n", cmd);
        }
        
        free(line);
    }
    
    // Save before exit
    if (kv_save(store) != 0) {
        fprintf(stderr, "Warning: failed to save data\n");
    }
    kv_destroy(store);
    
    printf("Goodbye!\n");
    return 0;
}

Extensions

TTL/Expiration

Add automatic key expiration

Transactions

Implement MULTI/EXEC transactions

Replication

Add master-slave replication

Compression

Compress values with LZ4 or zstd

Binary Protocol

Add a Redis-like binary protocol

Cluster Mode

Shard data across nodes

Next Up

Build a Memory Allocator

Implement your own malloc