Project: Key-Value Store
Build a production-quality key-value store with persistence, memory management, and basic indexing. This project teaches file I/O, data structures, and systems design.
Features
1
In-Memory Storage
Hash table for fast O(1) lookups
2
Persistence
Write-ahead log and snapshots
3
Variable-Size Values
Store arbitrary binary data
4
Concurrent Access
Thread-safe operations
Data Structures
Copy
// kvstore.h
#ifndef KVSTORE_H
#define KVSTORE_H
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <stdbool.h>
#include <pthread.h>
// Upper bound on key length: kv_put() rejects any key whose strlen() is >= this value.
#define MAX_KEY_SIZE 256
// Number of hash buckets a freshly created store starts with.
#define INITIAL_CAPACITY 1024
// size/capacity ratio at or above which the bucket array is doubled on the next put.
#define LOAD_FACTOR 0.75
// One stored key/value pair. Entries that hash to the same bucket are chained
// through `next` (separate chaining).
typedef struct Entry {
// Heap-allocated, NUL-terminated copy of the key (owned by the entry).
char *key;
// Heap-allocated copy of the value bytes (owned by the entry).
void *value;
// Number of bytes stored at `value`.
size_t value_size;
// Next entry in the same bucket's chain, or NULL at the end of the chain.
struct Entry *next;
uint64_t version; // For optimistic locking
// Wall-clock time (time(NULL)) at which the entry was first created.
time_t created_at;
// Wall-clock time of the most recent overwrite; equals created_at until then.
time_t updated_at;
} Entry;
// The store itself: a chained hash table guarded by a reader/writer lock, plus
// the optional on-disk persistence state.
typedef struct {
// Array of `capacity` bucket heads; an empty bucket is NULL.
Entry **buckets;
// Number of buckets in `buckets`.
size_t capacity;
// Number of entries currently stored.
size_t size;
// Taken for writing by put/delete/destroy and for reading by get/exists/save.
pthread_rwlock_t lock;
// Persistence
FILE *wal; // Write-ahead log
// Heap-allocated copy of the data directory path, or NULL for a memory-only store.
char *data_dir;
uint64_t version; // Global version counter
} KVStore;
// Core operations
// Allocates an empty store. `data_dir` may be NULL, in which case no WAL is
// opened. Returns NULL if allocation fails.
KVStore *kv_create(const char *data_dir);
// Frees every entry, closes the WAL and frees the store. NULL is a no-op.
void kv_destroy(KVStore *store);
// Inserts or overwrites `key` with a private copy of `size` bytes from `value`.
// Returns 0 on success, -1 on a NULL argument, size == 0, a key of
// MAX_KEY_SIZE bytes or more, or allocation failure.
int kv_put(KVStore *store, const char *key, const void *value, size_t size);
// Looks up `key`. On success returns 0, stores a malloc'd copy of the value in
// *value (the caller must free it) and its length in *size. Returns -1 on a
// NULL argument, a missing key, or allocation failure.
int kv_get(KVStore *store, const char *key, void **value, size_t *size);
// Removes `key`. Returns 0 if it was removed, -1 if it was absent or an
// argument was NULL.
int kv_delete(KVStore *store, const char *key);
// Returns true when `key` is present; false otherwise (including NULL arguments).
bool kv_exists(KVStore *store, const char *key);
// Persistence
// Writes a snapshot to <data_dir>/data.snap and then truncates the WAL.
// Returns 0 on success, -1 on failure or when the store has no data_dir.
int kv_save(KVStore *store);
// Restores state from the snapshot (if any) and then replays the WAL.
// Returns -1 when the store has no data_dir or the snapshot header is invalid.
int kv_load(KVStore *store);
// Iteration
// Callback type for kv_foreach: receives one key, its value bytes and length,
// and the caller-supplied `ctx` pointer.
typedef void (*kv_callback)(const char *key, const void *value, size_t size, void *ctx);
// NOTE(review): kv_foreach is declared here but not defined in the visible
// sources; presumably it invokes `cb` once per stored entry -- confirm.
void kv_foreach(KVStore *store, kv_callback cb, void *ctx);
// Stats
// Summary returned by kv_stats().
// NOTE(review): kv_stats() is not defined in the visible sources, so the field
// meanings below are inferred from their names and from how the CLI prints
// them -- confirm against the implementation.
typedef struct {
// Printed by the CLI as "Keys"; presumably the number of stored entries.
size_t count;
// Presumably an estimate of bytes held by the store -- TODO confirm what is counted.
size_t memory_used;
// Printed by the CLI as "Buckets"; presumably the hash table capacity.
size_t bucket_count;
// Printed by the CLI as "Load factor"; presumably count / bucket_count.
double load_factor;
} KVStats;
KVStats kv_stats(KVStore *store);
#endif
Hash Table Implementation
Copy
// hashtable.c
#include "kvstore.h"
// FNV-1a hash
// 64-bit FNV-1a hash of a NUL-terminated string.
//
// Starts from the FNV offset basis and, for each byte, XORs the byte into the
// digest and then multiplies by the FNV prime. Bytes are read as unsigned so
// the result does not depend on the signedness of `char`.
static uint64_t hash_key(const char *key) {
    const uint64_t fnv_offset_basis = 0xcbf29ce484222325ULL;
    const uint64_t fnv_prime = 0x100000001b3ULL;

    uint64_t digest = fnv_offset_basis;
    for (const unsigned char *p = (const unsigned char *)key; *p != '\0'; p++) {
        digest = (digest ^ *p) * fnv_prime;
    }
    return digest;
}
static Entry *entry_create(const char *key, const void *value, size_t size) {
Entry *entry = calloc(1, sizeof(Entry));
if (!entry) return NULL;
entry->key = strdup(key);
entry->value = malloc(size);
if (!entry->key || !entry->value) {
free(entry->key);
free(entry->value);
free(entry);
return NULL;
}
memcpy(entry->value, value, size);
entry->value_size = size;
entry->created_at = time(NULL);
entry->updated_at = entry->created_at;
return entry;
}
// Releases an entry together with the key and value buffers it owns.
// Passing NULL is allowed and does nothing. The entry's `next` link is not
// followed; the caller is responsible for unlinking it first.
static void entry_destroy(Entry *entry) {
    if (entry == NULL) {
        return;
    }
    free(entry->value);
    free(entry->key);
    free(entry);
}
// Doubles the bucket array once size/capacity has reached LOAD_FACTOR.
//
// Existing Entry nodes are relinked into the new array (no entry is copied or
// freed). Returns 0 when no resize was needed or the resize succeeded, and -1
// if the new bucket array could not be allocated, in which case the table is
// left untouched. The caller must hold the store's write lock.
static int resize_if_needed(KVStore *store) {
    const double load = (double)store->size / store->capacity;
    if (load < LOAD_FACTOR) {
        return 0;
    }

    const size_t grown_capacity = store->capacity * 2;
    Entry **grown = calloc(grown_capacity, sizeof(Entry*));
    if (grown == NULL) {
        return -1;
    }

    // Move every node to the head of its new chain.
    for (size_t b = 0; b < store->capacity; b++) {
        Entry *node = store->buckets[b];
        while (node != NULL) {
            Entry *following = node->next;
            Entry **head = &grown[hash_key(node->key) % grown_capacity];
            node->next = *head;
            *head = node;
            node = following;
        }
    }

    free(store->buckets);
    store->capacity = grown_capacity;
    store->buckets = grown;
    return 0;
}
// Allocates and initializes an empty store.
//
// data_dir: directory used for persistence, or NULL for a memory-only store.
//           The path is copied; the directory itself is not created here.
//
// Returns the new store, or NULL if an allocation or the lock initialization
// fails. Opening the write-ahead log is best-effort: if <data_dir>/wal.log
// cannot be opened (or its path does not fit the local buffer) the store is
// still returned with `wal` left NULL, i.e. without logging.
KVStore *kv_create(const char *data_dir) {
    KVStore *store = calloc(1, sizeof *store);
    if (!store) return NULL;

    store->buckets = calloc(INITIAL_CAPACITY, sizeof *store->buckets);
    if (!store->buckets) goto fail_store;
    store->capacity = INITIAL_CAPACITY;

    if (data_dir) {
        store->data_dir = strdup(data_dir);
        // Without the path kv_save()/kv_load() would silently refuse to work.
        if (!store->data_dir) goto fail_buckets;
    }

    if (pthread_rwlock_init(&store->lock, NULL) != 0) goto fail_dir;

    // Open WAL if data directory specified
    if (data_dir) {
        char wal_path[512];
        int n = snprintf(wal_path, sizeof(wal_path), "%s/wal.log", data_dir);
        // Never open a truncated path: it would name a different file.
        if (n > 0 && (size_t)n < sizeof(wal_path)) {
            store->wal = fopen(wal_path, "a+b");
        }
    }
    return store;

fail_dir:
    free(store->data_dir);
fail_buckets:
    free(store->buckets);
fail_store:
    free(store);
    return NULL;
}
// Tears down a store: frees every entry, the bucket array and the data
// directory path, closes the WAL if one is open, then destroys the lock and
// frees the store itself. The write lock is held while the contents are
// released. Passing NULL is a no-op.
void kv_destroy(KVStore *store) {
    if (store == NULL) {
        return;
    }

    pthread_rwlock_wrlock(&store->lock);

    for (size_t b = 0; b < store->capacity; b++) {
        Entry *node = store->buckets[b];
        while (node != NULL) {
            Entry *doomed = node;
            node = node->next;  // read the link before the node is freed
            entry_destroy(doomed);
        }
    }
    free(store->buckets);
    free(store->data_dir);
    if (store->wal != NULL) {
        fclose(store->wal);
    }

    pthread_rwlock_unlock(&store->lock);
    pthread_rwlock_destroy(&store->lock);
    free(store);
}
Core Operations
Copy
// operations.c
#include "kvstore.h"
// Appends one Put record to the write-ahead log and flushes it.
// WAL entry format: op(1) | key_len(2) | key | value_len(4) | value
// Returns 0 on success, -1 if any write or the flush fails.
static int wal_append_put(FILE *wal, const char *key, uint16_t key_len,
                          const void *value, uint32_t value_len) {
    const uint8_t op = 'P'; // Put
    if (fwrite(&op, 1, 1, wal) != 1 ||
        fwrite(&key_len, sizeof(key_len), 1, wal) != 1 ||
        fwrite(key, 1, key_len, wal) != key_len ||
        fwrite(&value_len, sizeof(value_len), 1, wal) != 1 ||
        fwrite(value, 1, value_len, wal) != value_len) {
        return -1;
    }
    return fflush(wal) == 0 ? 0 : -1;
}

// Inserts `key` or overwrites its value with a private copy of `size` bytes
// from `value`.
//
// Returns 0 on success. Returns -1 when an argument is NULL, size is 0, the
// key is MAX_KEY_SIZE bytes or longer, the value does not fit the 32-bit
// length field of the log format, an allocation fails, or the WAL record
// cannot be written. On any failure the in-memory table is left unchanged.
//
// The record is logged before the table is modified, so a reported success
// always has a matching log record.
int kv_put(KVStore *store, const char *key, const void *value, size_t size) {
    if (!store || !key || !value || size == 0) return -1;

    const size_t key_len = strlen(key);
    // MAX_KEY_SIZE also keeps key_len within the 16-bit field of the log format.
    if (key_len >= MAX_KEY_SIZE) return -1;
    // The log stores the value length in 32 bits; refuse rather than truncate.
    if (size > UINT32_MAX) return -1;

    int rc = -1;
    void *new_value = NULL;
    Entry *new_entry = NULL;

    pthread_rwlock_wrlock(&store->lock);

    // Best effort: if growing fails the table still works, just with longer chains.
    (void)resize_if_needed(store);
    size_t index = hash_key(key) % store->capacity;

    // Look for existing entry
    Entry *existing = store->buckets[index];
    while (existing && strcmp(existing->key, key) != 0) {
        existing = existing->next;
    }

    // Do every allocation up front so nothing can fail after logging.
    if (existing) {
        new_value = malloc(size);
        if (!new_value) goto out;
        memcpy(new_value, value, size);
    } else {
        new_entry = entry_create(key, value, size);
        if (!new_entry) goto out;
    }

    // Write to WAL
    if (store->wal &&
        wal_append_put(store->wal, key, (uint16_t)key_len, value, (uint32_t)size) != 0) {
        free(new_value);
        entry_destroy(new_entry);
        goto out;
    }

    if (existing) {
        // Update existing
        free(existing->value);
        existing->value = new_value;
        existing->value_size = size;
        existing->updated_at = time(NULL);
        existing->version++;
        store->version++;
    } else {
        // Link the new entry at the head of its bucket
        new_entry->version = store->version++;
        new_entry->next = store->buckets[index];
        store->buckets[index] = new_entry;
        store->size++;
    }
    rc = 0;

out:
    pthread_rwlock_unlock(&store->lock);
    return rc;
}
// Looks up `key` under the read lock and hands back a copy of its value.
//
// On success returns 0, stores a malloc'd copy of the value in *value (the
// caller owns it and must free it) and its length in *size. Returns -1 when an
// argument is NULL, the key is absent, or the copy cannot be allocated; in the
// allocation-failure case *value is set to NULL.
int kv_get(KVStore *store, const char *key, void **value, size_t *size) {
    if (store == NULL || key == NULL || value == NULL || size == NULL) {
        return -1;
    }

    int status = -1;
    pthread_rwlock_rdlock(&store->lock);

    const Entry *node = store->buckets[hash_key(key) % store->capacity];
    while (node != NULL && strcmp(node->key, key) != 0) {
        node = node->next;
    }

    if (node != NULL) {
        *value = malloc(node->value_size);
        if (*value != NULL) {
            memcpy(*value, node->value, node->value_size);
            *size = node->value_size;
            status = 0;
        }
    }

    pthread_rwlock_unlock(&store->lock);
    return status;
}
// Removes `key` from the store under the write lock.
//
// The entry is unlinked from its bucket chain, a Delete record
// (op 'D' | key_len(2) | key) is appended to the WAL when one is open, and the
// entry is then freed. Returns 0 if the key was removed, -1 if it was not
// present or an argument was NULL.
int kv_delete(KVStore *store, const char *key) {
    if (store == NULL || key == NULL) {
        return -1;
    }

    int status = -1;
    pthread_rwlock_wrlock(&store->lock);

    // Walk the chain through the address of each link, so the head and interior
    // cases are unlinked the same way.
    Entry **link = &store->buckets[hash_key(key) % store->capacity];
    while (*link != NULL && strcmp((*link)->key, key) != 0) {
        link = &(*link)->next;
    }

    Entry *victim = *link;
    if (victim != NULL) {
        *link = victim->next;

        // Write to WAL
        if (store->wal != NULL) {
            uint8_t op = 'D'; // Delete
            uint16_t key_len = strlen(key);
            fwrite(&op, 1, 1, store->wal);
            fwrite(&key_len, sizeof(key_len), 1, store->wal);
            fwrite(key, 1, key_len, store->wal);
            fflush(store->wal);
        }

        entry_destroy(victim);
        store->size--;
        store->version++;
        status = 0;
    }

    pthread_rwlock_unlock(&store->lock);
    return status;
}
// Reports whether `key` is currently stored. The lookup runs under the read
// lock. Returns false when the key is absent or when either argument is NULL.
bool kv_exists(KVStore *store, const char *key) {
    if (store == NULL || key == NULL) {
        return false;
    }

    bool found = false;
    pthread_rwlock_rdlock(&store->lock);

    const Entry *node = store->buckets[hash_key(key) % store->capacity];
    for (; node != NULL && !found; node = node->next) {
        found = (strcmp(node->key, key) == 0);
    }

    pthread_rwlock_unlock(&store->lock);
    return found;
}
Persistence
Copy
// persistence.c
#include "kvstore.h"
#include <sys/stat.h>
// Snapshot file format:
// Header: magic(4) | version(4) | count(8)
// Entries: key_len(2) | key | value_len(4) | value | timestamp(8)
// Sizes are in bytes. Every integer field is written with fwrite() straight
// from a native integer, so the file uses the host's byte order and is not
// portable between machines of different endianness.
// `count` is the number of entries that follow; `timestamp` is the entry's
// updated_at time.
#define SNAPSHOT_MAGIC 0x4B565354 // "KVST"
// Format revision written into the snapshot header.
#define SNAPSHOT_VERSION 1
int kv_save(KVStore *store) {
if (!store || !store->data_dir) return -1;
char snapshot_path[512];
snprintf(snapshot_path, sizeof(snapshot_path), "%s/data.snap", store->data_dir);
char temp_path[512];
snprintf(temp_path, sizeof(temp_path), "%s/data.snap.tmp", store->data_dir);
pthread_rwlock_rdlock(&store->lock);
FILE *f = fopen(temp_path, "wb");
if (!f) {
pthread_rwlock_unlock(&store->lock);
return -1;
}
// Write header
uint32_t magic = SNAPSHOT_MAGIC;
uint32_t version = SNAPSHOT_VERSION;
uint64_t count = store->size;
fwrite(&magic, sizeof(magic), 1, f);
fwrite(&version, sizeof(version), 1, f);
fwrite(&count, sizeof(count), 1, f);
// Write entries
for (size_t i = 0; i < store->capacity; i++) {
Entry *entry = store->buckets[i];
while (entry) {
uint16_t key_len = strlen(entry->key);
uint32_t value_len = entry->value_size;
uint64_t timestamp = entry->updated_at;
fwrite(&key_len, sizeof(key_len), 1, f);
fwrite(entry->key, 1, key_len, f);
fwrite(&value_len, sizeof(value_len), 1, f);
fwrite(entry->value, 1, value_len, f);
fwrite(×tamp, sizeof(timestamp), 1, f);
entry = entry->next;
}
}
fclose(f);
pthread_rwlock_unlock(&store->lock);
// Atomic rename
if (rename(temp_path, snapshot_path) != 0) {
unlink(temp_path);
return -1;
}
// Clear WAL after successful snapshot
if (store->wal) {
char wal_path[512];
snprintf(wal_path, sizeof(wal_path), "%s/wal.log", store->data_dir);
fclose(store->wal);
store->wal = fopen(wal_path, "wb"); // Truncate
}
return 0;
}
// Defined below; declared first because kv_load() calls it.
static int kv_replay_wal(KVStore *store);

// Reads exactly `len` bytes into `buf`. Returns false on a short read.
static bool kv_read_fully(FILE *f, void *buf, size_t len) {
    return fread(buf, 1, len, f) == len;
}

// Reads a key_len(2) | key field. Returns a malloc'd NUL-terminated key, or
// NULL on a short read or allocation failure.
static char *kv_read_key(FILE *f) {
    uint16_t key_len;
    if (!kv_read_fully(f, &key_len, sizeof(key_len))) return NULL;

    char *key = malloc((size_t)key_len + 1);
    if (!key) return NULL;
    if (!kv_read_fully(f, key, key_len)) {
        free(key);
        return NULL;
    }
    key[key_len] = '\0';
    return key;
}

// Reads a value_len(4) | value field. Returns a malloc'd buffer and stores its
// length in *value_len, or NULL on a short read, a zero length (kv_put never
// stores empty values, so that indicates corruption) or allocation failure.
static void *kv_read_value(FILE *f, uint32_t *value_len) {
    if (!kv_read_fully(f, value_len, sizeof(*value_len)) || *value_len == 0) {
        return NULL;
    }
    void *value = malloc(*value_len);
    if (value && !kv_read_fully(f, value, *value_len)) {
        free(value);
        value = NULL;
    }
    return value;
}

// Restores the store from <data_dir>/data.snap and then replays the WAL on
// top of it. With no snapshot file, only the WAL is replayed.
//
// Returns 0 on success. Returns -1 if the store has no data_dir, the snapshot
// header is unreadable or has the wrong magic/version (nothing is loaded in
// that case), the snapshot body is truncated, or an entry could not be
// inserted; in the last two cases the entries read so far are kept and the WAL
// is still replayed.
//
// The timestamp stored with each entry is read but not restored: kv_put()
// stamps the entries with the current time.
//
// store->wal is detached without taking the lock while entries are loaded, so
// this is presumably meant to run before the store is shared with other
// threads -- confirm against callers.
int kv_load(KVStore *store) {
    if (!store || !store->data_dir) return -1;

    char snapshot_path[512];
    int n = snprintf(snapshot_path, sizeof(snapshot_path), "%s/data.snap", store->data_dir);
    if (n < 0 || (size_t)n >= sizeof(snapshot_path)) return -1;

    FILE *f = fopen(snapshot_path, "rb");
    if (!f) {
        // No snapshot, try to replay WAL
        return kv_replay_wal(store);
    }

    // Read header
    uint32_t magic, version;
    uint64_t count;
    if (!kv_read_fully(f, &magic, sizeof(magic)) || magic != SNAPSHOT_MAGIC ||
        !kv_read_fully(f, &version, sizeof(version)) || version != SNAPSHOT_VERSION ||
        !kv_read_fully(f, &count, sizeof(count))) {
        fclose(f);
        return -1;
    }

    // Detach the WAL so that loading does not log every snapshot entry again.
    FILE *wal = store->wal;
    store->wal = NULL;

    // Read entries
    int rc = 0;
    for (uint64_t i = 0; i < count; i++) {
        uint32_t value_len = 0;
        uint64_t timestamp;
        char *key = kv_read_key(f);
        void *value = key ? kv_read_value(f, &value_len) : NULL;

        if (!value || !kv_read_fully(f, &timestamp, sizeof(timestamp))) {
            // Truncated or corrupt record: keep what was loaded, report failure.
            free(value);
            free(key);
            rc = -1;
            break;
        }
        (void)timestamp;

        if (kv_put(store, key, value, value_len) != 0) rc = -1;
        free(value);
        free(key);
    }

    store->wal = wal;
    fclose(f);

    // Replay WAL for any operations after snapshot
    if (kv_replay_wal(store) != 0) rc = -1;
    return rc;
}

// Re-applies every record of the write-ahead log to the in-memory table.
//
// Record format: 'P' | key_len(2) | key | value_len(4) | value
//                'D' | key_len(2) | key
//
// Replay stops quietly at the first incomplete record (the expected state
// after a crash in mid-write) and at an unknown opcode, whose record length
// cannot be determined. Returns 0, or -1 if a value could not be allocated.
//
// The log is detached from the store while it is being read. Otherwise the
// kv_put()/kv_delete() calls made here would append to the very stream being
// read, which grows the log during replay and mixes reads and writes on one
// stream without repositioning.
static int kv_replay_wal(KVStore *store) {
    FILE *wal = store->wal;
    if (!wal) return 0;

    store->wal = NULL;
    rewind(wal);

    int rc = 0;
    for (;;) {
        uint8_t op;
        if (fread(&op, 1, 1, wal) != 1) break;

        char *key = kv_read_key(wal);
        if (!key) break;

        if (op == 'P') {
            uint32_t value_len = 0;
            void *value = kv_read_value(wal, &value_len);
            if (!value) {
                // A short read (EOF or stream error) is a truncated tail;
                // otherwise the allocation itself failed.
                if (!feof(wal) && !ferror(wal) && value_len != 0) rc = -1;
                free(key);
                break;
            }
            kv_put(store, key, value, value_len);
            free(value);
        } else if (op == 'D') {
            kv_delete(store, key);
        } else {
            free(key);
            break;
        }
        free(key);
    }

    // Clear EOF/error state and reposition before the stream is written again.
    clearerr(wal);
    fseek(wal, 0, SEEK_END);
    store->wal = wal;
    return rc;
}
CLI Interface
Copy
// cli.c
#include "kvstore.h"
#include <strings.h>
#include <sys/stat.h>
#include <readline/readline.h>
#include <readline/history.h>
// Prints the list of CLI commands to standard output, one line per command.
void print_help(void) {
    static const char *const help_lines[] = {
        "Commands:\n",
        " SET <key> <value> Store a key-value pair\n",
        " GET <key> Retrieve a value\n",
        " DEL <key> Delete a key\n",
        " EXISTS <key> Check if key exists\n",
        " KEYS List all keys\n",
        " STATS Show statistics\n",
        " SAVE Save to disk\n",
        " HELP Show this help\n",
        " QUIT Exit\n",
    };

    for (size_t i = 0; i < sizeof help_lines / sizeof help_lines[0]; i++) {
        fputs(help_lines[i], stdout);
    }
}
// kv_foreach callback used by the KEYS command: prints one key per line.
// Defined at file scope because nested functions are a GCC extension and are
// not part of standard C.
static void print_key(const char *k, const void *v, size_t s, void *ctx) {
    (void)v; (void)s; (void)ctx;
    printf("%s\n", k);
}

// Interactive shell for the store.
//
// argv[1], when given, is the data directory (default "./kvdata"). Existing
// data is loaded on start-up, commands are read with readline until QUIT or
// end of input, and a snapshot is saved before exit.
// Returns 1 if the store cannot be created, 0 otherwise.
int main(int argc, char *argv[]) {
    const char *data_dir = argc > 1 ? argv[1] : "./kvdata";

    // Create data directory. The result is deliberately ignored: the usual
    // failure is that the directory already exists.
    mkdir(data_dir, 0755);

    KVStore *store = kv_create(data_dir);
    if (!store) {
        fprintf(stderr, "Failed to create store\n");
        return 1;
    }

    // Load existing data
    if (kv_load(store) != 0) {
        fprintf(stderr, "Warning: could not load existing data\n");
    }
    printf("KVStore ready. Type HELP for commands.\n");

    bool running = true;
    char *line;
    while (running && (line = readline("kv> ")) != NULL) {
        char cmd[32], key[MAX_KEY_SIZE], value[4096];

        if (strlen(line) == 0 || sscanf(line, "%31s", cmd) != 1) {
            // Empty or whitespace-only input. As before, only a non-empty
            // line is recorded in the history.
            if (strlen(line) != 0) add_history(line);
            free(line);
            continue;
        }
        add_history(line);

        if (strcasecmp(cmd, "SET") == 0) {
            if (sscanf(line, "%*s %255s %4095[^\n]", key, value) == 2) {
                // The terminating NUL is stored with the value.
                if (kv_put(store, key, value, strlen(value) + 1) == 0) {
                    printf("OK\n");
                } else {
                    printf("ERROR\n");
                }
            } else {
                printf("Usage: SET <key> <value>\n");
            }
        }
        else if (strcasecmp(cmd, "GET") == 0) {
            if (sscanf(line, "%*s %255s", key) == 1) {
                void *stored;
                size_t size;
                if (kv_get(store, key, &stored, &size) == 0) {
                    // Print up to the first NUL but never past `size`: a value
                    // restored from disk is not guaranteed to be terminated.
                    const char *text = stored;
                    const char *nul = memchr(text, '\0', size);
                    size_t len = nul ? (size_t)(nul - text) : size;
                    fwrite(text, 1, len, stdout);
                    putchar('\n');
                    free(stored);
                } else {
                    printf("(nil)\n");
                }
            } else {
                printf("Usage: GET <key>\n");
            }
        }
        else if (strcasecmp(cmd, "DEL") == 0) {
            if (sscanf(line, "%*s %255s", key) == 1) {
                if (kv_delete(store, key) == 0) {
                    printf("OK\n");
                } else {
                    printf("NOT FOUND\n");
                }
            } else {
                printf("Usage: DEL <key>\n");
            }
        }
        else if (strcasecmp(cmd, "EXISTS") == 0) {
            if (sscanf(line, "%*s %255s", key) == 1) {
                printf("%s\n", kv_exists(store, key) ? "1" : "0");
            } else {
                printf("Usage: EXISTS <key>\n");
            }
        }
        else if (strcasecmp(cmd, "KEYS") == 0) {
            kv_foreach(store, print_key, NULL);
        }
        else if (strcasecmp(cmd, "STATS") == 0) {
            KVStats stats = kv_stats(store);
            printf("Keys: %zu\n", stats.count);
            printf("Buckets: %zu\n", stats.bucket_count);
            printf("Load factor: %.2f\n", stats.load_factor);
        }
        else if (strcasecmp(cmd, "SAVE") == 0) {
            if (kv_save(store) == 0) {
                printf("OK\n");
            } else {
                printf("ERROR\n");
            }
        }
        else if (strcasecmp(cmd, "HELP") == 0) {
            print_help();
        }
        else if (strcasecmp(cmd, "QUIT") == 0) {
            running = false;
        }
        else {
            printf("Unknown command: %s\n", cmd);
        }
        free(line);
    }

    // Save before exit
    if (kv_save(store) != 0) {
        fprintf(stderr, "Warning: final save failed\n");
    }
    kv_destroy(store);
    printf("Goodbye!\n");
    return 0;
}
Extensions
TTL/Expiration
Add automatic key expiration
Transactions
Implement MULTI/EXEC transactions
Replication
Add master-slave replication
Compression
Compress values with LZ4 or zstd
Binary Protocol
Add a Redis-like binary protocol
Cluster Mode
Shard data across nodes
Next Up
Build a Memory Allocator
Implement your own malloc