Introduction

Time-series data is one of the fastest-growing data types, powering monitoring systems, IoT platforms, financial trading, and real-time analytics.
Interview Signal: Designing systems that handle time-series data shows you understand specialized storage patterns and real-time processing.

Time-Series Data Characteristics

What Makes Time-Series Special?

Regular Time-Series:
├── Metrics: CPU usage every 10 seconds
├── IoT: Temperature readings every minute
└── Financial: Stock prices every second

Irregular Time-Series:
├── Events: User clicks (whenever they happen)
├── Logs: Application logs (variable rate)
└── Transactions: Payments (unpredictable)

Key Properties

┌─────────────────────────────────────────────────────────┐
│           Time-Series Data Characteristics              │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  1. Append-Only: New data always inserted, rarely      │
│     updated or deleted                                  │
│                                                         │
│  2. Time-Ordered: Primary access pattern by time       │
│     range                                               │
│                                                         │
│  3. High Volume: Millions of data points per second    │
│                                                         │
│  4. Recent Bias: Recent data accessed more frequently  │
│                                                         │
│  5. Aggregations: Often need rollups (avg, max, min)   │
│                                                         │
└─────────────────────────────────────────────────────────┘
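
Concretely, a single point is a timestamped value plus a set of identifying tags. A minimal sketch of that shape (field names are illustrative, not tied to any particular database):

from dataclasses import dataclass
from typing import Dict

@dataclass
class DataPoint:
    """One time-series sample: measurement, tags, value, timestamp."""
    measurement: str        # e.g. "cpu_usage"
    tags: Dict[str, str]    # identifying dimensions, e.g. host, region
    value: float            # the sampled value
    timestamp_ms: int       # epoch milliseconds

point = DataPoint(
    measurement="cpu_usage",
    tags={"host": "server-1", "region": "us-east-1"},
    value=75.5,
    timestamp_ms=1699900000000,
)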

Time-Series Database Architecture

Storage Engine Design

Write Path:
┌─────────────────────────────────────────────────────────┐
│                                                         │
│  ┌──────────┐    ┌──────────┐    ┌──────────────────┐  │
│  │ Incoming │───►│  Write   │───►│    MemTable      │  │
│  │  Points  │    │  Buffer  │    │ (In-Memory Index)│  │
│  └──────────┘    └──────────┘    └────────┬─────────┘  │
│                                           │             │
│                                    Flush when full      │
│                                           │             │
│                                    ┌──────▼─────────┐   │
│                                    │  Segment/TSM   │   │
│                                    │    Files       │   │
│                                    └────────────────┘   │
│                                                         │
└─────────────────────────────────────────────────────────┘

Read Path:
┌─────────────────────────────────────────────────────────┐
│                                                         │
│  Query: SELECT mean(cpu) WHERE time > now() - 1h       │
│                    │                                    │
│         ┌──────────┼──────────┐                        │
│         │          │          │                        │
│    ┌────▼────┐ ┌───▼───┐ ┌───▼───┐                    │
│    │MemTable│ │Segment│ │Segment│  (merged result)    │
│    │ (hot)  │ │  1    │ │  2    │                     │
│    └────────┘ └───────┘ └───────┘                     │
│                                                         │
└─────────────────────────────────────────────────────────┘
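
This write/read split follows the LSM pattern: buffer incoming points in a memtable, flush an immutable segment when it fills, and merge the memtable with segments at query time. A toy in-memory sketch of that flow (real engines add a write-ahead log, compaction, and per-series indexes):

import bisect
from typing import List, Tuple

Point = Tuple[int, float]  # (timestamp, value)

class TinyTSEngine:
    """Toy write/read path: memtable + immutable segments."""

    def __init__(self, memtable_limit: int = 4):
        self.memtable: List[Point] = []
        self.segments: List[List[Point]] = []   # flushed, time-sorted
        self.memtable_limit = memtable_limit

    def write(self, ts: int, value: float) -> None:
        bisect.insort(self.memtable, (ts, value))
        if len(self.memtable) >= self.memtable_limit:
            # Flush: the memtable becomes an immutable segment
            # (written to disk in a real engine)
            self.segments.append(self.memtable)
            self.memtable = []

    def query_range(self, start: int, end: int) -> List[Point]:
        # Merge hot (memtable) and cold (segment) data at read time
        results = [p for p in self.memtable if start <= p[0] <= end]
        for seg in self.segments:
            results.extend(p for p in seg if start <= p[0] <= end)
        return sorted(results)

engine = TinyTSEngine()
for i in range(10):
    engine.write(1000 + i * 10, 50.0 + i)
print(engine.query_range(1020, 1090))   # merged from segments and the memtable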

Data Compression Techniques

import struct
from typing import List, Tuple

class TimeSeriesCompressor:
    """Delta-of-delta encoding for timestamps"""
    
    def compress_timestamps(
        self, 
        timestamps: List[int]
    ) -> bytes:
        """
        Compress timestamps using delta-of-delta encoding.
        
        Typical time-series have regular intervals,
        so delta-of-delta is often 0 or small.
        
        Original: [1000, 1010, 1020, 1030, 1040]
        Deltas:   [-, 10, 10, 10, 10]
        DoD:      [-, -, 0, 0, 0]  <- Very compressible!
        """
        if not timestamps:
            return b''
        
        result = []
        # Store first timestamp as-is
        result.append(struct.pack('>Q', timestamps[0]))
        
        if len(timestamps) == 1:
            return b''.join(result)
        
        # Store first delta
        prev_delta = timestamps[1] - timestamps[0]
        result.append(struct.pack('>q', prev_delta))
        
        # Store delta-of-deltas
        for i in range(2, len(timestamps)):
            delta = timestamps[i] - timestamps[i-1]
            dod = delta - prev_delta
            
            # Variable-length encoding for DoD
            result.append(self._encode_varint(dod))
            prev_delta = delta
        
        return b''.join(result)
    
    def _encode_varint(self, value: int) -> bytes:
        """Variable-length integer encoding"""
        # Simplified: these widths are not self-describing, so a
        # real decoder needs control bits; production systems use
        # more sophisticated bit-packing (e.g. Gorilla/TSM)
        if -64 <= value <= 63:
            return struct.pack('b', value)
        elif -8192 <= value <= 8191:
            return struct.pack('>h', value)
        else:
            return struct.pack('>q', value)


class XORCompressor:
    """XOR compression for floating-point values"""
    
    def compress_values(self, values: List[float]) -> bytes:
        """
        Gorilla-style XOR compression.
        
        Consecutive values are often similar,
        so XOR produces many leading/trailing zeros.
        
        Value 1: 1.5 = 0x3FF8000000000000
        Value 2: 1.6 = 0x3FF999999999999A
        XOR:         = 0x000199999999999A  <- Few bits different
        """
        if not values:
            return b''
        
        result = []
        # Store first value as-is
        result.append(struct.pack('>d', values[0]))
        
        prev_bits = self._float_to_bits(values[0])
        
        for value in values[1:]:
            curr_bits = self._float_to_bits(value)
            xor = prev_bits ^ curr_bits
            
            if xor == 0:
                # Identical value - just store 0 bit
                result.append(b'\x00')
            else:
                # Store leading/trailing zero counts (one byte each)
                # followed by the full XOR; real Gorilla bit-packs
                # these fields and stores only the meaningful bits
                leading_zeros = self._count_leading_zeros(xor)
                trailing_zeros = self._count_trailing_zeros(xor)
                result.append(struct.pack(
                    '>BBQ',
                    leading_zeros,
                    trailing_zeros,
                    xor
                ))
            
            prev_bits = curr_bits
        
        return b''.join(result)
    
    def _float_to_bits(self, f: float) -> int:
        return struct.unpack('>Q', struct.pack('>d', f))[0]
    
    def _count_leading_zeros(self, n: int) -> int:
        if n == 0:
            return 64
        count = 0
        for i in range(63, -1, -1):
            if n & (1 << i):
                break
            count += 1
        return count
    
    def _count_trailing_zeros(self, n: int) -> int:
        if n == 0:
            return 64
        count = 0
        for i in range(64):
            if n & (1 << i):
                break
            count += 1
        return count
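
A quick usage sketch putting the two compressors together. The compressed sizes depend entirely on how regular the data is, so treat any ratio as illustrative rather than guaranteed:

# Regular 10-second interval, slowly varying CPU percentage
timestamps = [1699900000 + i * 10 for i in range(1000)]
values = [75.0 + (i % 5) * 0.1 for i in range(1000)]

ts_bytes = TimeSeriesCompressor().compress_timestamps(timestamps)
val_bytes = XORCompressor().compress_values(values)

# Uncompressed: 1000 points x (8 bytes ts + 8 bytes value) = 16,000 bytes
print(len(ts_bytes), len(val_bytes))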

Data Modeling for Time-Series

Schema Design

Wide Table (one measurement per table):
┌────────────────────────────────────────────────────────┐
│                    cpu_usage                           │
├────────────┬──────────┬───────────┬──────────┬────────┤
│ timestamp  │   host   │   region  │  value   │  unit  │
├────────────┼──────────┼───────────┼──────────┼────────┤
│ 1699900000 │ server-1 │ us-east-1 │   75.5   │   %    │
│ 1699900010 │ server-1 │ us-east-1 │   78.2   │   %    │
│ 1699900000 │ server-2 │ us-west-2 │   62.1   │   %    │
└────────────┴──────────┴───────────┴──────────┴────────┘

Narrow Table (multiple measurements):
┌──────────────────────────────────────────────────────┐
│                    metrics                            │
├────────────┬──────────┬───────────┬────────┬─────────┤
│ timestamp  │   host   │   metric  │  value │  tags   │
├────────────┼──────────┼───────────┼────────┼─────────┤
│ 1699900000 │ server-1 │ cpu_usage │  75.5  │ {...}   │
│ 1699900000 │ server-1 │ mem_usage │  82.3  │ {...}   │
│ 1699900000 │ server-1 │ disk_io   │  1024  │ {...}   │
└────────────┴──────────┴───────────┴────────┴─────────┘

Partitioning Strategies

Time-Based Partitioning:
┌─────────────────────────────────────────────────┐
│ Partition Key: YEAR-MONTH or WEEK              │
├─────────────────────────────────────────────────┤
│ metrics_2024_01  │ metrics_2024_02  │ ...      │
├──────────────────┼──────────────────┼──────────┤
│ - Fast time range queries                       │
│ - Easy retention (drop old partitions)          │
│ - Balanced write distribution                   │
└─────────────────────────────────────────────────┘

Hybrid Partitioning:
┌─────────────────────────────────────────────────┐
│ Partition: (device_id, time_bucket)             │
├─────────────────────────────────────────────────┤
│ device_001_2024_01 │ device_001_2024_02        │
│ device_002_2024_01 │ device_002_2024_02        │
├─────────────────────────────────────────────────┤
│ - Good for device-specific queries              │
│ - Parallel ingestion per device                 │
└─────────────────────────────────────────────────┘
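
A partition key is just a deterministic function of the timestamp (and optionally a device or series ID). A small sketch of how a write router might compute both kinds of key (names are illustrative):

from datetime import datetime, timezone

def time_partition(ts: int) -> str:
    """Map an epoch-seconds timestamp to a YEAR_MONTH partition."""
    dt = datetime.fromtimestamp(ts, tz=timezone.utc)
    return f"metrics_{dt.year}_{dt.month:02d}"

def hybrid_partition(device_id: str, ts: int) -> str:
    """Hybrid key: (device_id, time bucket)."""
    dt = datetime.fromtimestamp(ts, tz=timezone.utc)
    return f"{device_id}_{dt.year}_{dt.month:02d}"

print(time_partition(1699900000))                  # metrics_2023_11
print(hybrid_partition("device_001", 1699900000))  # device_001_2023_11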

Downsampling and Retention

Continuous Aggregations

from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List
import asyncio

@dataclass
class AggregatedPoint:
    timestamp: datetime
    min_value: float
    max_value: float
    avg_value: float
    count: int
    sum_value: float

class ContinuousAggregator:
    """
    Automatically downsample data to different resolutions.
    
    Raw data:     1 second resolution  -> 7 days retention
    1-min rollup: 1 minute resolution  -> 30 days retention
    1-hour rollup: 1 hour resolution   -> 1 year retention
    1-day rollup: 1 day resolution     -> Forever
    """
    
    def __init__(self, storage):
        self.storage = storage
        self.rollup_configs = [
            {"source": "raw", "target": "1min", "interval": 60},
            {"source": "1min", "target": "1hour", "interval": 3600},
            {"source": "1hour", "target": "1day", "interval": 86400},
        ]
    
    async def aggregate_interval(
        self,
        metric: str,
        source_table: str,
        target_table: str,
        start_time: datetime,
        end_time: datetime,
        interval_seconds: int
    ) -> List[AggregatedPoint]:
        """
        Aggregate raw points into rollup points.
        """
        points = await self.storage.query(
            table=source_table,
            metric=metric,
            start=start_time,
            end=end_time
        )
        
        # Group by interval
        buckets: Dict[datetime, List[float]] = {}
        
        for point in points:
            # Floor timestamp to interval
            bucket_ts = datetime.fromtimestamp(
                (point.timestamp.timestamp() // interval_seconds) 
                * interval_seconds
            )
            
            if bucket_ts not in buckets:
                buckets[bucket_ts] = []
            buckets[bucket_ts].append(point.value)
        
        # Calculate aggregates
        aggregated = []
        for timestamp, values in sorted(buckets.items()):
            aggregated.append(AggregatedPoint(
                timestamp=timestamp,
                min_value=min(values),
                max_value=max(values),
                avg_value=sum(values) / len(values),
                count=len(values),
                sum_value=sum(values)
            ))
        
        # Store aggregated data
        await self.storage.insert_aggregated(
            table=target_table,
            metric=metric,
            points=aggregated
        )
        
        return aggregated
    
    async def run_continuous_aggregation(self):
        """
        Background job to continuously aggregate data.
        """
        while True:
            for config in self.rollup_configs:
                # Get last aggregation timestamp
                last_agg = await self.storage.get_last_aggregation(
                    config["target"]
                )
                
                # Aggregate new data, lagging 5 minutes behind now
                # so late-arriving points are still included
                end_time = datetime.utcnow() - timedelta(minutes=5)
                
                if last_agg < end_time:
                    await self.aggregate_interval(
                        metric="*",
                        source_table=config["source"],
                        target_table=config["target"],
                        start_time=last_agg,
                        end_time=end_time,
                        interval_seconds=config["interval"]
                    )
            
            # Run every minute
            await asyncio.sleep(60)


class RetentionManager:
    """
    Manage data retention policies.
    """
    
    def __init__(self, storage):
        self.storage = storage
        self.retention_policies = {
            "raw": timedelta(days=7),
            "1min": timedelta(days=30),
            "1hour": timedelta(days=365),
            "1day": None,  # Keep forever
        }
    
    async def enforce_retention(self):
        """
        Delete data older than retention policy.
        """
        now = datetime.utcnow()
        
        for table, retention in self.retention_policies.items():
            if retention is None:
                continue
            
            cutoff = now - retention
            deleted = await self.storage.delete_before(
                table=table,
                before=cutoff
            )
            
            print(f"Deleted {deleted} rows from {table} "
                  f"(before {cutoff})")

Query Patterns

Common Time-Series Queries

-- Last value for each series
SELECT DISTINCT ON (host)
    host, timestamp, value
FROM cpu_usage
ORDER BY host, timestamp DESC;

-- Time-weighted average (for irregular data)
SELECT 
    host,
    SUM(value * duration) / SUM(duration) as time_weighted_avg
FROM (
    SELECT 
        host,
        value,
        EXTRACT(EPOCH FROM (
            LEAD(timestamp) OVER (PARTITION BY host ORDER BY timestamp) 
            - timestamp
        )) as duration
    FROM cpu_usage
    WHERE timestamp BETWEEN '2024-01-01' AND '2024-01-02'
) t
GROUP BY host;

-- Moving average
SELECT 
    timestamp,
    value,
    AVG(value) OVER (
        ORDER BY timestamp 
        ROWS BETWEEN 4 PRECEDING AND CURRENT ROW
    ) as moving_avg_5
FROM cpu_usage
WHERE host = 'server-1';

-- Rate of change (derivative)
SELECT 
    timestamp,
    value,
    (value - LAG(value) OVER (ORDER BY timestamp)) /
    EXTRACT(EPOCH FROM (timestamp - LAG(timestamp) OVER (ORDER BY timestamp)))
    as rate_per_second
FROM network_bytes
WHERE host = 'server-1';

-- Bucket/histogram aggregation
SELECT 
    time_bucket('5 minutes', timestamp) as bucket,
    host,
    AVG(value) as avg_cpu,
    MAX(value) as max_cpu,
    percentile_cont(0.95) WITHIN GROUP (ORDER BY value) as p95_cpu
FROM cpu_usage
WHERE timestamp > NOW() - INTERVAL '1 hour'
GROUP BY bucket, host
ORDER BY bucket;

System Design: Metrics Platform

High-Level Architecture

┌─────────────────────────────────────────────────────────────┐
│                    Metrics Platform                          │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│   Collectors/Agents                                         │
│   ┌──────────┐ ┌──────────┐ ┌──────────┐                   │
│   │Prometheus│ │StatsD    │ │Telegraf  │                   │
│   │Agent     │ │          │ │          │                   │
│   └────┬─────┘ └────┬─────┘ └────┬─────┘                   │
│        │            │            │                          │
│        └────────────┼────────────┘                          │
│                     │                                        │
│              ┌──────▼──────┐                                │
│              │   Gateway   │  Rate limiting, auth           │
│              └──────┬──────┘                                │
│                     │                                        │
│              ┌──────▼──────┐                                │
│              │   Kafka     │  Buffer, durability            │
│              └──────┬──────┘                                │
│                     │                                        │
│         ┌───────────┼───────────┐                           │
│         │           │           │                            │
│    ┌────▼────┐ ┌────▼────┐ ┌────▼────┐                     │
│    │Ingester │ │Ingester │ │Ingester │  Parallel write     │
│    │    1    │ │    2    │ │    N    │                      │
│    └────┬────┘ └────┬────┘ └────┬────┘                     │
│         │           │           │                            │
│         └───────────┼───────────┘                           │
│                     │                                        │
│    ┌────────────────▼────────────────┐                      │
│    │       Time-Series Database       │                      │
│    │  ┌─────────────────────────────┐ │                      │
│    │  │    InfluxDB / TimescaleDB   │ │                      │
│    │  │    / ClickHouse / Victoria  │ │                      │
│    │  └─────────────────────────────┘ │                      │
│    └────────────────┬────────────────┘                      │
│                     │                                        │
│              ┌──────▼──────┐                                │
│              │Query Service│  Caching, federation           │
│              └──────┬──────┘                                │
│                     │                                        │
│         ┌───────────┼───────────┐                           │
│         │           │           │                            │
│    ┌────▼────┐ ┌────▼────┐ ┌────▼────┐                     │
│    │Grafana  │ │Alerting │ │ API     │                      │
│    │Dashboard│ │ Engine  │ │Consumers│                      │
│    └─────────┘ └─────────┘ └─────────┘                     │
│                                                              │
└─────────────────────────────────────────────────────────────┘
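
The ingesters in the diagram mostly do one thing: pull batches off Kafka and bulk-write them to the time-series database. A simplified sketch of that loop; consumer.poll(), consumer.commit(), and tsdb.write_batch() are placeholder interfaces, not a specific client library's API:

import time

def run_ingester(consumer, tsdb, batch_size=5000, flush_interval=1.0):
    """Consume metric points from a queue and bulk-write to the TSDB."""
    batch = []
    last_flush = time.monotonic()

    while True:
        point = consumer.poll(timeout=0.1)   # returns a point or None
        if point is not None:
            batch.append(point)

        # Flush on size or time, whichever comes first
        if len(batch) >= batch_size or \
                (batch and time.monotonic() - last_flush >= flush_interval):
            tsdb.write_batch(batch)          # one bulk write per batch
            consumer.commit()                # ack only after a durable write
            batch = []
            last_flush = time.monotonic()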

Capacity Planning

Scenario: IoT Platform with 1M devices

Data Volume:
├── Devices: 1,000,000
├── Metrics per device: 10
├── Reporting interval: 60 seconds
└── Points per second: 1M × 10 / 60 ≈ 166,667 points/sec

Storage:
├── Bytes per point: ~16 bytes (compressed)
├── Daily data: 166,667 × 86,400 × 16 bytes ≈ 230 GB/day
├── Monthly data: ~7 TB
└── With 30-day retention: 7 TB raw + rollups

Hardware (rough estimate):
├── Ingesters: 3-5 nodes (for redundancy)
├── Storage nodes: 10-15 nodes (1TB SSD each)
├── Query nodes: 3-5 nodes
└── Kafka: 3-5 node cluster
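
The same back-of-envelope arithmetic as a script, so the assumptions (bytes per point, retention window) are easy to tweak:

devices = 1_000_000
metrics_per_device = 10
interval_s = 60
bytes_per_point = 16          # rough compressed size (assumption)

points_per_sec = devices * metrics_per_device / interval_s
daily_bytes = points_per_sec * 86_400 * bytes_per_point

print(f"{points_per_sec:,.0f} points/sec")             # ~166,667
print(f"{daily_bytes / 1e9:,.0f} GB/day")              # ~230 GB/day
print(f"{daily_bytes * 30 / 1e12:.1f} TB / 30 days")   # ~6.9 TB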

Comparison: Time-Series Databases

┌─────────────────┬──────────────────┬─────────────┬────────────────┐
│ Database        │ Best For         │ Compression │ Query Language │
├─────────────────┼──────────────────┼─────────────┼────────────────┤
│ InfluxDB        │ Metrics, IoT     │ 10-20x      │ InfluxQL, Flux │
│ TimescaleDB     │ SQL-heavy        │ 10-20x      │ PostgreSQL     │
│ ClickHouse      │ Analytics        │ 10-50x      │ SQL            │
│ Prometheus      │ Monitoring       │ 5-10x       │ PromQL         │
│ VictoriaMetrics │ High cardinality │ 10-20x      │ PromQL         │
│ QuestDB         │ High throughput  │ 10-20x      │ SQL            │
└─────────────────┴──────────────────┴─────────────┴────────────────┘

Interview Tips

Common Interview Questions:
  1. “Design a monitoring system” → Focus on ingestion pipeline, storage tiers, and alerting
  2. “How would you handle high cardinality?” → Discuss bloom filters, cardinality limits, and query optimization
  3. “How to query across long time ranges efficiently?” → Explain rollups and downsampling
  4. “How to ensure durability of metrics?” → Discuss write-ahead logs, replication, and Kafka buffering

Key Points to Mention

✓ Append-only nature enables efficient compression
✓ Time-based partitioning for easy retention
✓ Downsampling for long-term storage efficiency
✓ Separate hot/warm/cold storage tiers
✓ Pre-aggregation for common queries
✓ Cardinality limits to prevent explosion

Practice Problem

Design a Stock Trading Analytics Platform

Requirements:
  • Ingest tick data for 10,000 symbols at 1000 ticks/second each
  • Support real-time OHLCV aggregation (1s, 1m, 5m, 1h, 1d)
  • Historical queries for backtesting (years of data)
  • Low latency for trading decisions (<10ms P99)
Consider:
  1. How would you partition the data?
  2. What’s your ingestion pipeline?
  3. How do you handle market hours vs off-hours?
  4. What pre-computations would you do?