Time-Series Aggregation System: Low-Level Design
Storing raw metrics at full resolution is expensive at scale. A time-series aggregation system ingests raw data points, computes pre-rolled summaries at coarser resolutions, and routes queries to the cheapest tier that still satisfies the requested granularity. This post covers the full pipeline from ingestion to query.
Raw Metric Ingestion
Metrics arrive as (metric_name, tags, value, timestamp) tuples. Tags are key-value pairs that identify the source: host, region, service. The producer sends to a Kafka topic partitioned by a hash of (metric_name + tags) so that all data points for the same series land on the same partition in order.
from datetime import datetime, timezone
from dataclasses import dataclass
import json, hashlib

@dataclass
class MetricPoint:
    name: str
    tags: dict[str, str]
    value: float
    ts: datetime

def ingest_metric(name: str, tags: dict, value: float,
                  ts: datetime, producer) -> None:
    """Publish a metric point to Kafka, keyed so that all points
    for the same series land on the same partition."""
    partition_key = hashlib.md5(
        f"{name}:{json.dumps(tags, sort_keys=True)}".encode()
    ).hexdigest()
    payload = {
        "name": name, "tags": tags,
        "value": value, "ts": ts.isoformat()
    }
    producer.produce(
        topic="metrics.raw",
        key=partition_key,
        value=json.dumps(payload)
    )
The consumer side writes batches to the metric_raw table partitioned by day. PostgreSQL range partitioning on ts means the planner prunes irrelevant partitions on range queries automatically.
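The consumer loop can be sketched as follows. This assumes a DB-API style handle with `executemany` and a consumer that yields raw JSON message values; all names here are illustrative, not a specific client library's API:

```python
import json

def flush_batch(batch: list[dict], db) -> int:
    """Write one batch of raw points to metric_raw; returns rows written."""
    if not batch:
        return 0
    db.executemany(
        "INSERT INTO metric_raw (name, tags, value, ts) "
        "VALUES (%s, %s, %s, %s)",
        [(p["name"], json.dumps(p["tags"]), p["value"], p["ts"])
         for p in batch],
    )
    return len(batch)

def consume_raw(consumer, db, batch_size: int = 500) -> None:
    """Drain the metrics.raw topic, flushing in fixed-size batches."""
    batch = []
    for msg in consumer:
        batch.append(json.loads(msg))
        if len(batch) >= batch_size:
            flush_batch(batch, db)
            batch = []
    flush_batch(batch, db)  # final partial batch
```

Batching amortizes per-statement overhead; the exact batch size is a tuning knob, not a fixed requirement.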
Pre-Computed Rollups
Rather than aggregating on read, the system runs scheduled jobs that materialize rollups at three resolutions: 5-minute, 1-hour, and 1-day. Each job reads from the previous tier (raw for 5m, 5m for 1h, 1h for 1d), computes min/max/avg/sum/count, and upserts into the rollup table. The job is idempotent: re-running it for the same window overwrites the existing row.
from datetime import timedelta

RESOLUTION_MAP = {
    "5m": timedelta(minutes=5),
    "1h": timedelta(hours=1),
    "1d": timedelta(days=1),
}
SOURCE_TABLE = {
    "5m": "metric_raw",
    "1h": "metric_rollup_5m",
    "1d": "metric_rollup_1h",
}
DEST_TABLE = {
    "5m": "metric_rollup_5m",
    "1h": "metric_rollup_1h",
    "1d": "metric_rollup_1d",
}

def compute_rollup(resolution: str, window_start: datetime, db) -> int:
    """
    Aggregates one time window and upserts into the destination rollup table.
    Returns the number of rows written.
    """
    delta = RESOLUTION_MAP[resolution]
    window_end = window_start + delta
    src = SOURCE_TABLE[resolution]
    dst = DEST_TABLE[resolution]
    rows = db.fetchall(
        f"""SELECT name, tags,
                   MIN(value) AS v_min,
                   MAX(value) AS v_max,
                   AVG(value) AS v_avg,
                   SUM(value) AS v_sum,
                   COUNT(*) AS v_count
            FROM {src}
            WHERE ts >= %s AND ts < %s
            GROUP BY name, tags""",
        (window_start, window_end)
    )
    for row in rows:
        db.execute(
            f"""INSERT INTO {dst}
                (name, tags, window_start, v_min, v_max, v_avg, v_sum, v_count)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT (name, tags, window_start)
                DO UPDATE SET
                    v_min=EXCLUDED.v_min, v_max=EXCLUDED.v_max,
                    v_avg=EXCLUDED.v_avg, v_sum=EXCLUDED.v_sum,
                    v_count=EXCLUDED.v_count""",
            (row.name, json.dumps(row.tags), window_start,
             row.v_min, row.v_max, row.v_avg, row.v_sum, row.v_count)
        )
    return len(rows)
Query Routing by Resolution
The query layer picks the cheapest table that covers the requested time range at the desired granularity. If the range is within 7 days and the granularity is under 5 minutes, it hits raw. For granularity of 5 minutes to 1 hour, it hits metric_rollup_5m, and so on.
def query_metrics(name: str, start: datetime, end: datetime,
                  resolution: str, db) -> list[dict]:
    """
    Routes the query to the appropriate rollup table.
    resolution: 'raw' | '5m' | '1h' | '1d'
    """
    table = {
        "raw": "metric_raw",
        "5m": "metric_rollup_5m",
        "1h": "metric_rollup_1h",
        "1d": "metric_rollup_1d",
    }[resolution]
    if resolution == "raw":
        return db.fetchall(
            f"SELECT ts, value FROM {table} "
            "WHERE name=%s AND ts>=%s AND ts<%s "
            "ORDER BY ts",
            (name, start, end)
        )
    return db.fetchall(
        f"SELECT window_start, v_min, v_max, v_avg, v_sum, v_count "
        f"FROM {table} "
        "WHERE name=%s AND window_start>=%s AND window_start<%s "
        "ORDER BY window_start",
        (name, start, end)
    )
Gap Filling
Some metrics are sparse: if no data arrived during a window, there is no rollup row. Dashboards expect a continuous series. PostgreSQL's generate_series combined with a LEFT JOIN fills the gaps with NULLs, which the frontend renders as “no data” rather than skipping the time point entirely.
SELECT
g.window_start,
r.v_avg AS avg_value
FROM generate_series(
'2025-01-01 00:00'::TIMESTAMPTZ,
'2025-01-01 23:55'::TIMESTAMPTZ,
INTERVAL '5 minutes'
) AS g(window_start)
LEFT JOIN metric_rollup_5m r
ON r.name = 'cpu.usage'
AND r.window_start = g.window_start
ORDER BY g.window_start;
Database Schema
-- Raw table, partitioned by day
CREATE TABLE metric_raw (
id BIGSERIAL,
name VARCHAR(255) NOT NULL,
tags JSONB NOT NULL DEFAULT '{}',
value DOUBLE PRECISION NOT NULL,
ts TIMESTAMPTZ NOT NULL
) PARTITION BY RANGE (ts);
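Daily partitions must exist before inserts arrive, so partition creation is typically part of the nightly maintenance job. A sketch for one day (the naming convention and index choice are illustrative):

```sql
-- One day's partition of the raw table.
CREATE TABLE metric_raw_2025_01_01 PARTITION OF metric_raw
    FOR VALUES FROM ('2025-01-01') TO ('2025-01-02');

-- Index each partition to serve the raw-query path (name + time range).
CREATE INDEX ON metric_raw_2025_01_01 (name, ts);
```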
CREATE TABLE metric_rollup_5m (
name VARCHAR(255) NOT NULL,
tags JSONB NOT NULL DEFAULT '{}',
window_start TIMESTAMPTZ NOT NULL,
v_min DOUBLE PRECISION,
v_max DOUBLE PRECISION,
v_avg DOUBLE PRECISION,
v_sum DOUBLE PRECISION,
v_count BIGINT,
PRIMARY KEY (name, tags, window_start)
);
CREATE TABLE metric_rollup_1h (LIKE metric_rollup_5m INCLUDING ALL);
CREATE TABLE metric_rollup_1d (LIKE metric_rollup_5m INCLUDING ALL);
CREATE TABLE rollup_job (
id BIGSERIAL PRIMARY KEY,
resolution VARCHAR(8) NOT NULL,
window_start TIMESTAMPTZ NOT NULL,
status VARCHAR(32) NOT NULL DEFAULT 'pending',
started_at TIMESTAMPTZ,
finished_at TIMESTAMPTZ,
rows_written INT,
UNIQUE (resolution, window_start)
);
Retention Policy
Retention is enforced by a nightly DELETE job or, better, by dropping old partitions directly (partition drop is near-instant). The tiers are: raw data kept for 7 days, 5-minute rollups for 30 days, 1-hour rollups for 1 year, and 1-day rollups forever. With these tiers a 13-month chart always resolves by hitting only the 1-day table regardless of how far back it reaches.
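Dropping an expired raw partition is a metadata-only operation, which is why it beats row-level DELETE. A sketch (partition name illustrative, matching the daily convention above):

```sql
-- Detach first so the subsequent drop holds only a brief lock,
-- then drop the detached table outright.
ALTER TABLE metric_raw DETACH PARTITION metric_raw_2025_01_01;
DROP TABLE metric_raw_2025_01_01;
```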
{
  "@context": "https://schema.org",
  "@type": "FAQPage",
  "mainEntity": [
    {
      "@type": "Question",
      "name": "How are rollups computed without double-counting?",
      "acceptedAnswer": {
        "@type": "Answer",
        "text": "Each rollup job reads from the previous tier for exactly one non-overlapping window and upserts the result using ON CONFLICT DO UPDATE. Running the job twice for the same window overwrites rather than appends, so there is no double-counting regardless of retries."
      }
    },
    {
      "@type": "Question",
      "name": "How is gap filling handled for sparse metrics?",
      "acceptedAnswer": {
        "@type": "Answer",
        "text": "A generate_series call produces every expected window timestamp, and a LEFT JOIN against the rollup table fills unmatched slots with NULL. The frontend receives a complete time series and renders NULL slots as gaps or zeros depending on the display preference."
      }
    },
    {
      "@type": "Question",
      "name": "Why are there separate retention tiers instead of one table?",
      "acceptedAnswer": {
        "@type": "Answer",
        "text": "Separate tables allow independent retention policies and partition-level drops, which are nearly instantaneous. A single table with a timestamp column would require expensive DELETE scans. Separate tables also let the query router pick the smallest dataset that satisfies the query."
      }
    },
    {
      "@type": "Question",
      "name": "How does the system decide which rollup table to query?",
      "acceptedAnswer": {
        "@type": "Answer",
        "text": "The query layer uses the requested granularity and time range to pick a table. Requests with granularity under 5 minutes and a range within 7 days go to metric_raw. Requests for 5-minute granularity within 30 days go to metric_rollup_5m, and so on. The caller specifies the resolution; the router translates it to a table name."
      }
    }
  ]
}