Time Series Metrics System Low-Level Design

A time series metrics system stores and queries measurements that change over time: server CPU, API latency, business KPIs, and IoT sensor readings. It must sustain high write throughput, answer range queries efficiently, and downsample old data for long-term storage. This design question comes up in interviews at Datadog, Netflix, and other companies that build observability infrastructure.

Core Data Model Options

Option 1: TimescaleDB (PostgreSQL extension for time series)

CREATE TABLE Metric (
    time        TIMESTAMPTZ NOT NULL,
    metric_name TEXT NOT NULL,
    tags        JSONB,              -- {"host": "web-01", "region": "us-east"}
    value       DOUBLE PRECISION NOT NULL
);

-- Partition by time automatically (hypertable)
SELECT create_hypertable('Metric', 'time', chunk_time_interval => INTERVAL '1 day');

-- Index for fast name + time range queries
CREATE INDEX ON Metric(metric_name, time DESC);
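
-- Compression must be enabled on the hypertable before a policy can be added.
-- Compress chunks older than 1 day: a 7-day threshold would never take effect,
-- because the retention policy in the downsampling section drops raw chunks after 7 days.
ALTER TABLE Metric SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'metric_name'
);
SELECT add_compression_policy('Metric', INTERVAL '1 day');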

Option 2: Purpose-built TSDB (InfluxDB, Prometheus, VictoriaMetrics)
  Better compression, faster queries, but less SQL flexibility.
  Use for pure metrics workloads at scale.

Option 3: Wide-column (Cassandra/Bigtable)
  Row key: (metric_name, time_bucket). Columns: one per timestamp within the bucket.
  Excellent write throughput, and a good fit for sequential reads over a time range.
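
As a rough illustration of the wide-column layout in Option 3, the sketch below derives a row key from a metric name and a time bucket. The one-hour bucket width and the `row_key` helper are assumptions made for this example; they are not part of the Cassandra or Bigtable APIs.

```python
from datetime import datetime, timezone

BUCKET_SECONDS = 3600  # assumed bucket width: one row per metric per hour


def row_key(metric_name: str, ts: datetime) -> tuple[str, int]:
    """Return (metric_name, time_bucket), where the bucket is the
    epoch second at which the containing hour starts."""
    epoch = int(ts.timestamp())
    return metric_name, epoch - epoch % BUCKET_SECONDS


# Two points in the same hour share a row; the next hour starts a new row.
a = row_key("cpu.usage", datetime(2024, 1, 1, 10, 5, tzinfo=timezone.utc))
b = row_key("cpu.usage", datetime(2024, 1, 1, 10, 59, tzinfo=timezone.utc))
c = row_key("cpu.usage", datetime(2024, 1, 1, 11, 0, tzinfo=timezone.utc))
```

Bucketing the key this way keeps any single row from growing without bound while still letting a range scan read one metric's points in time order.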

Write Path: Batching and Buffering

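import json
import threading
import time
from datetime import datetime, timezone

class MetricsBuffer:
    """Buffer metrics in memory; flush every flush_interval seconds or when full."""
    def __init__(self, flush_interval=10, max_buffer=10000):
        self._buffer = []
        self._lock = threading.Lock()
        self._flush_interval = flush_interval
        self._max_buffer = max_buffer
        self._start_flush_thread()

    def _start_flush_thread(self):
        def loop():
            while True:
                time.sleep(self._flush_interval)
                self._flush()
        # Daemon thread: it does not block interpreter shutdown
        threading.Thread(target=loop, daemon=True).start()

    def record(self, metric_name, value, tags=None, timestamp=None):
        with self._lock:
            self._buffer.append({
                'time': timestamp or datetime.now(timezone.utc),
                'metric_name': metric_name,
                'tags': tags or {},
                'value': value,
            })
            full = len(self._buffer) >= self._max_buffer
        # Flush after releasing the lock: threading.Lock is not reentrant,
        # so flushing while still holding it would deadlock.
        if full:
            self._flush()

    def _flush(self):
        # Swap the buffer under the lock so writers are blocked only briefly
        with self._lock:
            if not self._buffer:
                return
            batch = self._buffer
            self._buffer = []

        # Bulk insert to TimescaleDB (db is the application's database handle).
        # If the insert fails the batch is lost; a production agent would retry or re-queue it.
        db.execute("""
            INSERT INTO Metric (time, metric_name, tags, value)
            SELECT * FROM UNNEST(%(times)s::timestamptz[], %(names)s::text[],
                                 %(tags)s::jsonb[], %(values)s::float8[])
        """, {
            'times': [r['time'] for r in batch],
            'names': [r['metric_name'] for r in batch],
            'tags': [json.dumps(r['tags']) for r in batch],
            'values': [r['value'] for r in batch],
        })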

Query: Aggregated Time Range

def query_metrics(metric_name, start, end, interval='5m',
                  aggregation='avg', tag_filters=None):
    """
    Return aggregated metric values bucketed by time interval.
    interval: '1m', '5m', '1h', '1d'
    aggregation: 'avg', 'max', 'min', 'sum', 'count', 'p99'
    """
    # The dict lookup doubles as a whitelist: an unknown aggregation raises
    # KeyError instead of reaching the SQL string.
    agg_func = {
        'avg': 'AVG(value)',
        'max': 'MAX(value)',
        'min': 'MIN(value)',
        'sum': 'SUM(value)',
        'count': 'COUNT(*)',
        'p99': 'percentile_cont(0.99) WITHIN GROUP (ORDER BY value)',
    }[aggregation]

    params = {'name': metric_name, 'start': start, 'end': end,
              'interval': interval}

    # Bind tag keys and values as parameters; interpolating them into the
    # SQL text would allow SQL injection.
    tag_clause = ''
    for i, (k, v) in enumerate((tag_filters or {}).items()):
        tag_clause += f" AND tags->>%(tag_key_{i})s = %(tag_val_{i})s"
        params[f'tag_key_{i}'] = k
        params[f'tag_val_{i}'] = str(v)

    # Half-open range [start, end): adjacent queries never count a point twice
    return db.execute(f"""
        SELECT
            time_bucket(%(interval)s::interval, time) AS bucket,
            {agg_func} AS value
        FROM Metric
        WHERE metric_name = %(name)s
          AND time >= %(start)s AND time < %(end)s
          {tag_clause}
        GROUP BY bucket
        ORDER BY bucket ASC
    """, params)
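
To make the bucketing concrete, here is a pure-Python sketch of roughly what `time_bucket` combined with `AVG` computes. The `bucket_avg` helper is hypothetical, and it aligns buckets to the Unix epoch, which is an assumption of this sketch; TimescaleDB's default bucket origin may differ for some widths.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def bucket_avg(points, width):
    """Group (timestamp, value) pairs into fixed-width buckets and
    return (bucket_start, average) pairs sorted by bucket start."""
    sums = defaultdict(lambda: [0.0, 0])  # bucket_start -> [total, count]
    for ts, value in points:
        # Floor the timestamp to the start of its bucket, measured from the epoch
        start = ts - (ts - EPOCH) % width
        sums[start][0] += value
        sums[start][1] += 1
    return [(start, total / n) for start, (total, n) in sorted(sums.items())]


t0 = datetime(2024, 1, 1, 10, 0, tzinfo=timezone.utc)
points = [
    (t0 + timedelta(minutes=1), 10.0),
    (t0 + timedelta(minutes=4), 20.0),
    (t0 + timedelta(minutes=5), 40.0),
]
result = bucket_avg(points, timedelta(minutes=5))
```

The first two points fall in the 10:00 bucket and the third starts the 10:05 bucket, which mirrors the GROUP BY bucket step in the SQL query.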

Downsampling for Long-Term Storage

-- Continuous aggregates: pre-compute hourly rollups
-- (TimescaleDB feature; equivalent: a scheduled materialized view job)

CREATE MATERIALIZED VIEW metric_hourly
WITH (timescaledb.continuous) AS
    SELECT
        time_bucket('1 hour', time) AS hour,
        metric_name,
        tags,
        AVG(value) AS avg_value,
        MAX(value) AS max_value,
        MIN(value) AS min_value,
        COUNT(*) AS sample_count
    FROM Metric
    GROUP BY hour, metric_name, tags;

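-- Refresh the hourly rollup on a schedule so it keeps up with new data
SELECT add_continuous_aggregate_policy('metric_hourly',
    start_offset => INTERVAL '3 hours',
    end_offset => INTERVAL '1 hour',
    schedule_interval => INTERVAL '1 hour');

-- Retention policy: keep raw data 7 days and hourly rollups 90 days.
-- A daily rollup kept for 2 years would be defined the same way as metric_hourly.
SELECT add_retention_policy('Metric', INTERVAL '7 days');
SELECT add_retention_policy('metric_hourly', INTERVAL '90 days');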

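from datetime import datetime, timedelta, timezone

# Query routing: pick the rollup by range length and by whether raw data still exists.
# start and end are timezone-aware datetimes.
def smart_query(metric_name, start, end):
    range_hours = (end - start).total_seconds() / 3600
    # Raw data is dropped after 7 days, so older ranges must use a rollup
    raw_available = start >= datetime.now(timezone.utc) - timedelta(days=7)
    if range_hours <= 24 and raw_available:
        return query_raw(metric_name, start, end, '1m')
    elif range_hours <= 168:
        return query_hourly(metric_name, start, end, '1h')
    return query_daily(metric_name, start, end, '1d')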
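
One detail worth a sketch: when hourly rollups are merged into a coarser bucket, averaging the hourly averages gives the wrong answer whenever the hours hold different numbers of samples. That is why the view keeps sample_count. The `merge_rollups` helper below is hypothetical and operates on plain dictionaries rather than database rows.

```python
def merge_rollups(rows):
    """Combine rollup rows (avg_value, max_value, min_value, sample_count)
    into a single row covering all of them. Returns None for no samples."""
    total = sum(r["sample_count"] for r in rows)
    if total == 0:
        return None
    # Weight each average by its sample count to recover the true mean
    weighted_sum = sum(r["avg_value"] * r["sample_count"] for r in rows)
    return {
        "avg_value": weighted_sum / total,
        "max_value": max(r["max_value"] for r in rows),
        "min_value": min(r["min_value"] for r in rows),
        "sample_count": total,
    }


hours = [
    {"avg_value": 10.0, "max_value": 12.0, "min_value": 8.0, "sample_count": 300},
    {"avg_value": 50.0, "max_value": 90.0, "min_value": 20.0, "sample_count": 100},
]
merged = merge_rollups(hours)
naive_avg = sum(r["avg_value"] for r in hours) / len(hours)
```

Here the weighted average is 20.0, while the naive average of averages is 30.0. Percentiles such as p99 cannot be merged this way at all, so they need either raw data or a mergeable sketch structure.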

Alerting on Threshold Breach

def check_alerts():
    """Run every 60 seconds via cron."""
    active_alerts = db.query("SELECT * FROM AlertRule WHERE enabled=true")

    for alert in active_alerts:
        # Average the metric over the rule's lookback window.
        # make_interval takes the window as a bound parameter; a placeholder
        # inside a quoted INTERVAL literal is not substituted reliably.
        recent = db.execute("""
            SELECT AVG(value) AS val
            FROM Metric
            WHERE metric_name = %(name)s
              AND time > NOW() - make_interval(secs => %(window)s)
        """, {'name': alert.metric_name, 'window': alert.window_seconds}).first()

        # No data in the window: leave the alert state unchanged
        if recent.val is None:
            continue

        breached = (
            (alert.condition == 'gt' and recent.val > alert.threshold) or
            (alert.condition == 'lt' and recent.val < alert.threshold)
        )

        # Look up the current state once, then act only on transitions
        firing = is_alert_firing(alert.id)
        if breached and not firing:
            fire_alert(alert, recent.val)
        elif not breached and firing:
            resolve_alert(alert.id)
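
The fire and resolve decision can be pulled out into a pure function, which makes it easy to unit-test without a database. This is a minimal sketch; the `next_action` name and its string return values are assumptions made for illustration.

```python
def next_action(condition, threshold, value, firing):
    """Return 'fire', 'resolve', or None for one evaluation of an alert rule.

    condition: 'gt' or 'lt'; value: windowed average, or None when the
    window held no data; firing: whether the alert is currently active.
    """
    if value is None:
        return None  # no data: leave the alert state unchanged
    if condition == "gt":
        breached = value > threshold
    elif condition == "lt":
        breached = value < threshold
    else:
        raise ValueError(f"unknown condition: {condition!r}")
    if breached and not firing:
        return "fire"
    if not breached and firing:
        return "resolve"
    return None  # state already matches the measurement


first = next_action("gt", 90.0, 95.0, firing=False)
repeat = next_action("gt", 90.0, 95.0, firing=True)
recovered = next_action("gt", 90.0, 50.0, firing=True)
```

Acting only on state transitions is what keeps a sustained breach from sending a new notification on every 60-second run.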

Key Interview Points

  • Write batching is non-negotiable: A metrics agent on every server may emit hundreds of data points per second. One INSERT per point pays a round trip and transaction overhead each time and quickly saturates the DB. Buffer in memory and flush every 5-10 seconds in batches of thousands.
  • time_bucket is the core query primitive: TimescaleDB’s time_bucket truncates each timestamp to the start of a fixed-width interval so that rows can be grouped with GROUP BY. Vanilla Postgres offers date_trunc for calendar units, and date_bin for arbitrary widths from PostgreSQL 14. Master this function for interval aggregation queries.
  • Store raw data with short retention: Raw data at 1-second resolution is expensive to keep, so retain it for only 7 days. Downsample to hourly aggregates for 90-day history and daily aggregates for 2 years. Design rollup jobs and retention policies upfront.
  • Tag cardinality matters: High-cardinality tags (e.g., user_id as a tag) create millions of unique time series, exploding storage. Tags should be low-cardinality dimensions (host, region, service), not unique identifiers.
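
The cardinality point can be checked with quick arithmetic: the number of distinct series for one metric is bounded by the product of the distinct values of each tag. The tag counts below are made-up figures for illustration, and `series_count` is a hypothetical helper.

```python
from math import prod


def series_count(distinct_values_per_tag):
    """Upper bound on distinct time series for one metric: the product of
    the number of distinct values each tag can take. Real systems usually
    see fewer, since not every combination occurs."""
    return prod(distinct_values_per_tag.values())


low = series_count({"host": 200, "region": 4, "service": 30})
high = series_count({"host": 200, "region": 4, "service": 30,
                     "user_id": 1_000_000})
```

With these figures the low-cardinality tags allow at most 24,000 series, while adding user_id raises the bound to 24 billion.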
