Time-Series Aggregation System: Low-Level Design
Storing raw metrics at full resolution is expensive at scale. A time-series aggregation system ingests raw data points, computes pre-rolled summaries at coarser resolutions, and routes queries to the cheapest tier that still satisfies the requested granularity. This post covers the full pipeline from ingestion to query.
Raw Metric Ingestion
Metrics arrive as (metric_name, tags, value, timestamp) tuples. Tags are key-value pairs that identify the source: host, region, service. The producer sends to a Kafka topic partitioned by a hash of (metric_name + tags) so that all data points for the same series land on the same partition in order.
```python
from datetime import datetime, timezone
from dataclasses import dataclass
import json, hashlib

@dataclass
class MetricPoint:
    name: str
    tags: dict[str, str]
    value: float
    ts: datetime

def ingest_metric(name: str, tags: dict, value: float,
                  ts: datetime, producer) -> None:
    """Publish a metric point to Kafka."""
    partition_key = hashlib.md5(
        f"{name}:{json.dumps(tags, sort_keys=True)}".encode()
    ).hexdigest()
    payload = {
        "name": name, "tags": tags,
        "value": value, "ts": ts.isoformat()
    }
    producer.produce(
        topic="metrics.raw",
        key=partition_key,
        value=json.dumps(payload)
    )
```
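Sorting the tag keys before hashing is what makes the key stable: the same series produces the same key no matter what order the producer built its tag dict in. A small check of that property, using a hypothetical `series_key` helper that repeats the key derivation from `ingest_metric` (the tag values are made up):

```python
import hashlib
import json


def series_key(name: str, tags: dict[str, str]) -> str:
    """Hypothetical helper: same derivation as the partition key in ingest_metric."""
    canonical = f"{name}:{json.dumps(tags, sort_keys=True)}"
    return hashlib.md5(canonical.encode()).hexdigest()


# Same tags, inserted in different orders.
a = series_key("cpu.usage", {"host": "web-1", "region": "eu"})
b = series_key("cpu.usage", {"region": "eu", "host": "web-1"})
# A different tag value means a different series.
c = series_key("cpu.usage", {"host": "web-2", "region": "eu"})

print(a == b)  # True: tag order does not affect the key
print(a == c)  # False: different series, different key
```

The Kafka client then hashes this key to choose a partition (the exact hash function depends on the client library), so identical keys map to the same partition as long as the topic's partition count does not change.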
The consumer side writes batches to the metric_raw table partitioned by day. PostgreSQL range partitioning on ts means the planner prunes irrelevant partitions on range queries automatically.
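Range partitioning only prunes if the child partitions exist before data for that day arrives. A minimal sketch of generating the DDL ahead of time, assuming a hypothetical `metric_raw_YYYYMMDD` naming convention and UTC day boundaries:

```python
from datetime import date, timedelta


def daily_partition_ddl(day: date) -> str:
    """CREATE TABLE DDL for one daily partition of metric_raw.

    metric_raw_YYYYMMDD is a hypothetical naming convention. PostgreSQL range
    bounds are inclusive FROM and exclusive TO, so each partition holds [day, day+1).
    """
    next_day = day + timedelta(days=1)
    return (
        f"CREATE TABLE IF NOT EXISTS metric_raw_{day:%Y%m%d} "
        f"PARTITION OF metric_raw "
        f"FOR VALUES FROM ('{day.isoformat()} 00:00+00') "
        f"TO ('{next_day.isoformat()} 00:00+00');"
    )


def upcoming_partitions(today: date, days_ahead: int = 3) -> list[str]:
    """DDL for today plus the next `days_ahead` days."""
    return [daily_partition_ddl(today + timedelta(days=i))
            for i in range(days_ahead + 1)]


for stmt in upcoming_partitions(date(2025, 1, 1), days_ahead=1):
    print(stmt)
```

Month and year boundaries fall out of the date arithmetic, and `IF NOT EXISTS` lets the job run repeatedly without failing on partitions it already created.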
Pre-Computed Rollups
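Rather than aggregating on read, the system runs scheduled jobs that materialize rollups at three resolutions: 5-minute, 1-hour, and 1-day. Each job reads from the previous tier (raw for 5m, 5m for 1h, 1h for 1d), computes min/max/avg/sum/count, and upserts into the rollup table. When the source is itself a rollup, the job merges partial aggregates rather than raw values: the minimum of the v_min values, the maximum of the v_max values, the sums of v_sum and v_count, and an average recomputed as total sum divided by total count, because averaging the v_avg values would give a window with one point the same weight as a window with hundreds. The job is idempotent: re-running it for the same window overwrites the existing row.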
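```python
from datetime import datetime, timedelta
import json

RESOLUTION_MAP = {
    "5m": timedelta(minutes=5),
    "1h": timedelta(hours=1),
    "1d": timedelta(days=1),
}
SOURCE_TABLE = {
    "5m": "metric_raw",
    "1h": "metric_rollup_5m",
    "1d": "metric_rollup_1h",
}
DEST_TABLE = {
    "5m": "metric_rollup_5m",
    "1h": "metric_rollup_1h",
    "1d": "metric_rollup_1d",
}

# Raw rows carry one value per timestamp.
RAW_AGG_SQL = """SELECT name, tags,
       MIN(value) AS v_min, MAX(value) AS v_max,
       AVG(value) AS v_avg, SUM(value) AS v_sum,
       COUNT(*)   AS v_count
  FROM {src}
 WHERE ts >= %s AND ts < %s
 GROUP BY name, tags"""

# Rollup rows are partial aggregates with no value/ts columns:
# merge them, and recompute the average from sum and count.
ROLLUP_AGG_SQL = """SELECT name, tags,
       MIN(v_min) AS v_min, MAX(v_max) AS v_max,
       SUM(v_sum) / NULLIF(SUM(v_count), 0) AS v_avg,
       SUM(v_sum)   AS v_sum,
       SUM(v_count) AS v_count
  FROM {src}
 WHERE window_start >= %s AND window_start < %s
 GROUP BY name, tags"""

def compute_rollup(resolution: str, window_start: datetime, db) -> int:
    """
    Aggregates one time window and upserts into the destination rollup table.
    Returns the number of rows written.
    """
    delta = RESOLUTION_MAP[resolution]
    window_end = window_start + delta
    src = SOURCE_TABLE[resolution]
    dst = DEST_TABLE[resolution]
    # Table names come from the fixed maps above, never from user input,
    # so interpolating them into the SQL text is safe.
    agg_sql = RAW_AGG_SQL if src == "metric_raw" else ROLLUP_AGG_SQL
    rows = db.fetchall(agg_sql.format(src=src), (window_start, window_end))
    for row in rows:
        db.execute(
            f"""INSERT INTO {dst}
                  (name, tags, window_start, v_min, v_max, v_avg, v_sum, v_count)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT (name, tags, window_start)
                DO UPDATE SET
                  v_min=EXCLUDED.v_min, v_max=EXCLUDED.v_max,
                  v_avg=EXCLUDED.v_avg, v_sum=EXCLUDED.v_sum,
                  v_count=EXCLUDED.v_count""",
            (row.name, json.dumps(row.tags), window_start,
             row.v_min, row.v_max, row.v_avg, row.v_sum, row.v_count)
        )
    return len(rows)
```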
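Why coarser tiers cannot simply average the finer tier's averages: windows hold different numbers of points. A small pure-Python illustration with made-up numbers and a hypothetical `Agg` tuple mirroring the rollup columns:

```python
from typing import NamedTuple


class Agg(NamedTuple):
    """Hypothetical in-memory mirror of one rollup row's aggregate columns."""
    v_min: float
    v_max: float
    v_sum: float
    v_count: int

    @property
    def v_avg(self) -> float:
        return self.v_sum / self.v_count


def merge(parts: list[Agg]) -> Agg:
    """Combine finer windows into one coarser window."""
    return Agg(
        v_min=min(p.v_min for p in parts),
        v_max=max(p.v_max for p in parts),
        v_sum=sum(p.v_sum for p in parts),
        v_count=sum(p.v_count for p in parts),
    )


# Illustrative: one dense 5m window and one holding a single point.
dense = Agg(v_min=10.0, v_max=30.0, v_sum=200.0, v_count=10)  # avg 20.0
sparse = Agg(v_min=80.0, v_max=80.0, v_sum=80.0, v_count=1)   # avg 80.0

hourly = merge([dense, sparse])
print(hourly.v_avg)                      # 280 / 11, about 25.45
print((dense.v_avg + sparse.v_avg) / 2)  # 50.0, the avg-of-avgs error
```

Min, max, sum, and count merge exactly, which is why the rollup tables store sum and count alongside the average.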
Query Routing by Resolution
The query layer picks the cheapest table that still covers the requested time range at the desired granularity. If the range is within 7 days and the granularity is under 5 minutes, it hits metric_raw. For a granularity from 5 minutes up to (but not including) 1 hour it hits metric_rollup_5m, from 1 hour up to 1 day it hits metric_rollup_1h, and for 1 day or coarser it hits metric_rollup_1d.
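```python
def query_metrics(name: str, start: datetime, end: datetime,
                  resolution: str, db) -> list[dict]:
    """
    Routes the query to the appropriate rollup table.
    resolution: 'raw' | '5m' | '1h' | '1d'
    """
    table = {
        "raw": "metric_raw",
        "5m": "metric_rollup_5m",
        "1h": "metric_rollup_1h",
        "1d": "metric_rollup_1d",
    }[resolution]
    if resolution == "raw":
        # Raw points: one value per timestamp.
        return db.fetchall(
            f"SELECT ts, value FROM {table} "
            "WHERE name=%s AND ts>=%s AND ts<%s "
            "ORDER BY ts",
            (name, start, end)
        )
    # Rollup tiers: one pre-aggregated row per window.
    return db.fetchall(
        "SELECT window_start, v_min, v_max, v_avg, v_sum, v_count "
        f"FROM {table} "
        "WHERE name=%s AND window_start>=%s AND window_start<%s "
        "ORDER BY window_start",
        (name, start, end)
    )
```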
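`query_metrics` expects the caller to name a tier. The selection rule described above can be sketched as a pure function; this assumes the retention windows listed under Retention Policy, and `choose_resolution` and `TIERS` are hypothetical names:

```python
from datetime import datetime, timedelta, timezone

# (tier, bucket width, retention); None means kept forever. Raw has no bucket.
TIERS = [
    ("raw", timedelta(0), timedelta(days=7)),
    ("5m", timedelta(minutes=5), timedelta(days=30)),
    ("1h", timedelta(hours=1), timedelta(days=365)),
    ("1d", timedelta(days=1), None),
]


def choose_resolution(start: datetime, granularity: timedelta,
                      now: datetime) -> str:
    """Pick the coarsest tier whose bucket fits inside the requested
    granularity, then fall back to coarser tiers if retention has
    already discarded data older than `start`."""
    idx = max(i for i, (_, width, _) in enumerate(TIERS)
              if width <= granularity)
    for tier, _, retention in TIERS[idx:]:
        if retention is None or start >= now - retention:
            return tier
    return TIERS[-1][0]  # not reached: the last tier never expires


now = datetime(2025, 6, 1, tzinfo=timezone.utc)
print(choose_resolution(now - timedelta(days=1), timedelta(minutes=1), now))    # raw
print(choose_resolution(now - timedelta(days=1), timedelta(minutes=15), now))   # 5m
print(choose_resolution(now - timedelta(days=60), timedelta(minutes=1), now))   # 1h
print(choose_resolution(now - timedelta(days=400), timedelta(hours=1), now))    # 1d
```

The fallback trades resolution for availability: a fine-grained request that reaches past a tier's retention is served from the next coarser tier rather than returning a partial series.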
Gap Filling
Some metrics are sparse: if no data arrived during a window, there is no rollup row. Dashboards expect a continuous series. PostgreSQL's generate_series combined with a LEFT JOIN fills the gaps with NULLs, which the frontend renders as “no data” rather than skipping the time point entirely.
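```sql
SELECT
    g.window_start,
    r.v_avg AS avg_value  -- NULL where no rollup row exists
FROM generate_series(
    '2025-01-01 00:00'::TIMESTAMPTZ,
    '2025-01-01 23:55'::TIMESTAMPTZ,
    INTERVAL '5 minutes'
) AS g(window_start)
LEFT JOIN metric_rollup_5m r
    ON r.name = 'cpu.usage'
    -- example tag set: pin one series so each window yields at most one row
    AND r.tags = '{"host": "web-1"}'::JSONB
    -- keep these filters in ON, not WHERE, or the gap rows are dropped
    AND r.window_start = g.window_start
ORDER BY g.window_start;
```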
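When results come back through `query_metrics` instead of the SQL above, the same gap filling can be done in the application. A sketch with a hypothetical `fill_gaps` helper that takes (window_start, value) pairs and emits None for missing windows, mirroring the NULLs from the LEFT JOIN:

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def fill_gaps(rows: list[tuple[datetime, float]], start: datetime,
              end: datetime, step: timedelta
              ) -> list[tuple[datetime, Optional[float]]]:
    """One entry per window in [start, end); None where no row exists."""
    by_window = dict(rows)
    out = []
    t = start
    while t < end:
        out.append((t, by_window.get(t)))
        t += step
    return out


t0 = datetime(2025, 1, 1, tzinfo=timezone.utc)
step = timedelta(minutes=5)
rows = [(t0, 41.0), (t0 + 2 * step, 43.5)]  # the 00:05 window is missing
for ts, v in fill_gaps(rows, t0, t0 + 3 * step, step):
    print(ts.strftime("%H:%M"), v)
# 00:00 41.0
# 00:05 None
# 00:10 43.5
```

Note that `end` is exclusive here, whereas `generate_series` includes its stop value; that is why the SQL version stops at 23:55 rather than midnight.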
Database Schema
```sql
-- Raw table, partitioned by day
CREATE TABLE metric_raw (
    id    BIGSERIAL,
    name  VARCHAR(255)     NOT NULL,
    tags  JSONB            NOT NULL DEFAULT '{}',
    value DOUBLE PRECISION NOT NULL,
    ts    TIMESTAMPTZ      NOT NULL
) PARTITION BY RANGE (ts);

CREATE TABLE metric_rollup_5m (
    name         VARCHAR(255) NOT NULL,
    tags         JSONB        NOT NULL DEFAULT '{}',
    window_start TIMESTAMPTZ  NOT NULL,
    v_min        DOUBLE PRECISION,
    v_max        DOUBLE PRECISION,
    v_avg        DOUBLE PRECISION,
    v_sum        DOUBLE PRECISION,
    v_count      BIGINT,
    PRIMARY KEY (name, tags, window_start)
);

CREATE TABLE metric_rollup_1h (LIKE metric_rollup_5m INCLUDING ALL);
CREATE TABLE metric_rollup_1d (LIKE metric_rollup_5m INCLUDING ALL);

CREATE TABLE rollup_job (
    id            BIGSERIAL PRIMARY KEY,
    resolution    VARCHAR(8)  NOT NULL,
    window_start  TIMESTAMPTZ NOT NULL,
    status        VARCHAR(32) NOT NULL DEFAULT 'pending',
    started_at    TIMESTAMPTZ,
    finished_at   TIMESTAMPTZ,
    rows_written  INT,
    UNIQUE (resolution, window_start)
);
```
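The UNIQUE (resolution, window_start) constraint on rollup_job gives the scheduler one row per window to claim. Deciding which windows are due is the other half. A minimal sketch, assuming windows aligned to the Unix epoch (so 1d windows start at UTC midnight) and a hypothetical `grace` period that gives late-arriving points time to land before a window is rolled up:

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def align(ts: datetime, width: timedelta) -> datetime:
    """Floor ts to the start of its window (windows aligned to the epoch)."""
    return ts - ((ts - EPOCH) % width)


def due_windows(last_done: datetime, now: datetime, width: timedelta,
                grace: timedelta) -> list[datetime]:
    """Window starts after last_done whose end is at least `grace` old.

    `grace` is a hypothetical allowance for ingestion lag.
    """
    out = []
    t = align(last_done, width) + width
    while t + width + grace <= now:
        out.append(t)
        t += width
    return out


now = datetime(2025, 1, 1, 10, 17, tzinfo=timezone.utc)
last = datetime(2025, 1, 1, 9, 55, tzinfo=timezone.utc)
for w in due_windows(last, now, timedelta(minutes=5), timedelta(minutes=2)):
    print(w.strftime("%H:%M"))
# 10:00
# 10:05
# 10:10
```

Each returned start would become a rollup_job row and a `compute_rollup` call. A coarser tier should only be scheduled once every finer window it covers has finished, otherwise the 1h job merges an incomplete set of 5m rows.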
Retention Policy
Retention is enforced by a nightly DELETE job or, better, for partitioned tables such as metric_raw, by dropping whole partitions: a partition drop is near-instant and leaves no dead rows behind for VACUUM to reclaim. The tiers are: raw data kept for 7 days, 5-minute rollups for 30 days, 1-hour rollups for 1 year, and 1-day rollups forever. Because 1-hour rollups expire after a year, a 13-month chart can only be served from the 1-day table, the one tier that still holds its oldest month.
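A sketch of the nightly job's planning step, assuming daily raw partitions named metric_raw_YYYYMMDD (a hypothetical convention) and that the caller supplies today's date and the list of existing partition days. It emits DROP statements for raw partitions entirely outside the 7-day window and parameterized DELETEs for the unpartitioned rollup tiers:

```python
from datetime import date, datetime, timedelta, timezone

# Retention per rollup table from the tiers above; metric_rollup_1d is kept forever.
ROLLUP_RETENTION = {
    "metric_rollup_5m": timedelta(days=30),
    "metric_rollup_1h": timedelta(days=365),
}
RAW_RETENTION_DAYS = 7


def expired_raw_partitions(existing: list[date], today: date) -> list[str]:
    """DROP statements for daily raw partitions older than the raw retention.

    A partition for day D holds [D, D+1), so it is fully expired once D+1
    is at or before today - RAW_RETENTION_DAYS.
    """
    cutoff = today - timedelta(days=RAW_RETENTION_DAYS)
    return [f"DROP TABLE IF EXISTS metric_raw_{d:%Y%m%d};"
            for d in sorted(existing) if d + timedelta(days=1) <= cutoff]


def rollup_deletes(now: datetime) -> list[tuple[str, datetime]]:
    """(SQL, parameter) pairs for DELETE-based retention on rollup tables."""
    return [(f"DELETE FROM {table} WHERE window_start < %s", now - keep)
            for table, keep in ROLLUP_RETENTION.items()]


today = date(2025, 1, 10)
existing = [date(2025, 1, 1) + timedelta(days=i) for i in range(10)]
for stmt in expired_raw_partitions(existing, today):
    print(stmt)
# DROP TABLE IF EXISTS metric_raw_20250101;
# DROP TABLE IF EXISTS metric_raw_20250102;
```

Dropping a partition table also removes it from metric_raw. If the rollup tables grow large enough that DELETE becomes slow, the same partition-drop approach applies once they are partitioned by window_start.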