A metrics pipeline collects numeric time-series data from every service and infrastructure component, aggregates it efficiently, stores it compactly, and powers real-time alerting and dashboards. This guide walks through a production-grade design from ingestion to alert delivery.
Requirements
Functional Requirements
- Accept counters, gauges, histograms, and summaries from services
- Aggregate raw data points into rollups (1-minute, 1-hour, 1-day)
- Store raw data for 7 days; rollups for 30 days (1-min) and 1 year (1-hr)
- Evaluate alert rules every 60 seconds; deliver notifications via PagerDuty/Slack/email
- Protect against cardinality explosion
Non-Functional Requirements
- Ingest: 5M data points/sec
- Query latency: p99 < 500ms for last-1h dashboard queries
- Durability: no metric loss after acknowledgment
- Cardinality limit: 10,000 unique tag combinations per metric name
Data Model
MetricPoint Schema (TimescaleDB Hypertable)
CREATE TABLE metric_points (
    metric_name VARCHAR(255) NOT NULL,
    tags        JSONB NOT NULL DEFAULT '{}',
    value       DOUBLE PRECISION NOT NULL,
    timestamp   TIMESTAMPTZ NOT NULL
);
-- Convert to hypertable partitioned by 1-hour chunks
SELECT create_hypertable(
    'metric_points',
    'timestamp',
    chunk_time_interval => INTERVAL '1 hour'
);
-- Composite index for fast per-metric queries
CREATE INDEX idx_mp_metric_time ON metric_points (metric_name, timestamp DESC);
-- GIN index for tag filtering
CREATE INDEX idx_mp_tags ON metric_points USING GIN (tags);
Rollup Tables
CREATE TABLE metric_rollup_1m (
    metric_name VARCHAR(255) NOT NULL,
    tags        JSONB NOT NULL DEFAULT '{}',
    bucket      TIMESTAMPTZ NOT NULL,   -- truncated to 1 minute
    min_val     DOUBLE PRECISION NOT NULL,
    max_val     DOUBLE PRECISION NOT NULL,
    avg_val     DOUBLE PRECISION NOT NULL,
    sum_val     DOUBLE PRECISION NOT NULL,
    count_val   BIGINT NOT NULL,
    p50         DOUBLE PRECISION,
    p95         DOUBLE PRECISION,
    p99         DOUBLE PRECISION,
    PRIMARY KEY (metric_name, tags, bucket)
);
-- Mirror structure for 1-hour rollups
CREATE TABLE metric_rollup_1h (LIKE metric_rollup_1m INCLUDING ALL);
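The rollup columns above are straightforward to compute from a bucket's raw values. A minimal Python sketch of the math behind one rollup row (the `rollup` helper and its simple index-based percentile are illustrative assumptions, not pipeline code — production systems use sketch structures for percentiles):

```python
def rollup(values):
    """Compute the aggregate columns of one metric_rollup_1m row
    from a bucket's worth of raw values (illustrative helper)."""
    if not values:
        raise ValueError("cannot roll up an empty bucket")
    ordered = sorted(values)

    # Simple index-based percentile; real systems use t-digest/DDSketch.
    def pct(p):
        return ordered[min(len(ordered) - 1, int(p * len(ordered)))]

    return {
        "min_val": ordered[0],
        "max_val": ordered[-1],
        "avg_val": sum(values) / len(values),
        "sum_val": sum(values),
        "count_val": len(values),
        "p50": pct(0.50),
        "p95": pct(0.95),
        "p99": pct(0.99),
    }
```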
Alert Rules Schema
CREATE TABLE alert_rules (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name VARCHAR(255) NOT NULL,
    metric_name VARCHAR(255) NOT NULL,
    tag_filter JSONB,                            -- e.g. {"env":"production"}
    condition VARCHAR(10) NOT NULL,              -- gt | lt | gte | lte | eq
    threshold DOUBLE PRECISION NOT NULL,
    window_minutes INT NOT NULL DEFAULT 5,       -- evaluation window
    severity VARCHAR(20) NOT NULL,               -- critical | warning | info
    notification_channel VARCHAR(255) NOT NULL,  -- pagerduty | slack | email
    enabled BOOLEAN NOT NULL DEFAULT TRUE,
    last_fired_at TIMESTAMPTZ,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
Architecture
Pipeline Overview
┌──────────────────────────┐
│ Services / Hosts │
│ (Prometheus exporters, │
│ StatsD clients) │
└────────────┬─────────────┘
│ push (StatsD UDP) / pull (Prometheus scrape)
▼
┌────────────────────────┐
│ Ingestion Gateway │ validates, normalizes, checks cardinality
│ (stateless, N pods) │
└────────────┬───────────┘
│ produce to Kafka
▼
┌────────────────────────┐
│ Kafka Cluster │ topic-per-metric-type (counters / gauges / histograms)
│ 64 partitions each │
└───────┬────────────────┘
│ consume (consumer group)
▼
┌────────────────────────┐ ┌──────────────────────┐
│ Aggregation Workers │───────▶│ TimescaleDB │
│ (stateful, per-shard) │ │ (raw + rollups) │
└────────────────────────┘ └──────────────────────┘
│
┌───────────▼──────────────┐
│ Alert Evaluator │
│ (runs every 60s) │
└───────────┬──────────────┘
│
┌───────────▼──────────────┐
│ Notification Router │
│ PagerDuty / Slack / Email│
└──────────────────────────┘
Ingestion
StatsD/Prometheus Push Gateway
# StatsD wire format (UDP)
api.request.duration:142|ms|#env:production,service:checkout
api.requests.total:1|c|#env:production,service:checkout,status:200
memory.used_bytes:536870912|g|#host:ip-10-0-1-42
# Prometheus push gateway (used by batch jobs that can't be scraped)
POST http://pushgateway:9091/metrics/job/batch-importer
# TYPE batch_records_processed counter
batch_records_processed{env="production"} 84302
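A gateway must first decode these datagrams into structured points. The sketch below parses the DogStatsD-style lines shown above; it is illustrative only — a production parser must also handle sample rates (`|@0.5`) and multi-metric packets:

```python
def parse_statsd(line: str):
    """Parse one StatsD datagram in the DogStatsD dialect, e.g.
    'api.request.duration:142|ms|#env:production,service:checkout'."""
    name, rest = line.split(":", 1)
    parts = rest.split("|")
    value = float(parts[0])
    metric_type = parts[1]          # c, g, ms, h, s, ...
    tags = {}
    for part in parts[2:]:
        if part.startswith("#"):    # DogStatsD tag section
            for pair in part[1:].split(","):
                k, _, v = pair.partition(":")
                tags[k] = v
    return name, value, metric_type, tags
```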
Cardinality Protection
class CardinalityLimitExceeded(Exception):
    pass

class CardinalityGuard:
    # Exact per-metric sets are used rather than a probabilistic sketch:
    # admitting already-known series requires a membership test, which
    # HyperLogLog cannot answer, and memory stays bounded because each
    # set stops growing once it reaches the limit.
    def __init__(self, limit=10_000):
        self.limit = limit
        self.seen = {}  # metric_name -> set of tag fingerprints

    def check(self, metric_name: str, tags: dict) -> bool:
        # Stable fingerprint for the tag combination, independent of dict order
        tag_fingerprint = hash(tuple(sorted(tags.items())))
        fingerprints = self.seen.setdefault(metric_name, set())
        if tag_fingerprint not in fingerprints:
            if len(fingerprints) >= self.limit:
                raise CardinalityLimitExceeded(
                    f"{metric_name} exceeds {self.limit} unique tag combinations"
                )
            fingerprints.add(tag_fingerprint)
        return True
Aggregation Worker
Per-Minute Rollup via TimescaleDB Continuous Aggregates
-- Note: percentile_cont is an ordered-set aggregate; continuous
-- aggregates support it only on TimescaleDB 2.7+ (finalized form).
CREATE MATERIALIZED VIEW metric_rollup_1m_cagg
WITH (timescaledb.continuous) AS
SELECT
    metric_name,
    tags,
    time_bucket('1 minute', timestamp) AS bucket,
    MIN(value) AS min_val,
    MAX(value) AS max_val,
    AVG(value) AS avg_val,
    SUM(value) AS sum_val,
    COUNT(*) AS count_val,
    percentile_cont(0.50) WITHIN GROUP (ORDER BY value) AS p50,
    percentile_cont(0.95) WITHIN GROUP (ORDER BY value) AS p95,
    percentile_cont(0.99) WITHIN GROUP (ORDER BY value) AS p99
FROM metric_points
GROUP BY metric_name, tags, bucket;
-- Refresh policy: materialize new 1-min buckets as data arrives
SELECT add_continuous_aggregate_policy(
    'metric_rollup_1m_cagg',
    start_offset => INTERVAL '10 minutes',
    end_offset => INTERVAL '1 minute',
    schedule_interval => INTERVAL '1 minute'
);
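The time_bucket() call truncates each timestamp to the start of its interval. A minimal Python equivalent for fixed-width intervals (an illustrative helper, not the database function, which also handles offsets and calendar intervals):

```python
from datetime import datetime, timezone

def time_bucket(interval_seconds: int, ts: datetime) -> datetime:
    """Truncate a timestamp to its bucket start, mirroring what
    time_bucket('1 minute', timestamp) does for the 1-minute rollup."""
    epoch = ts.timestamp()
    return datetime.fromtimestamp(
        epoch - (epoch % interval_seconds), tz=timezone.utc
    )
```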
Hourly Rollup from 1-Minute Rollup
-- Note: continuous aggregates on top of other continuous aggregates
-- (hierarchical caggs) require TimescaleDB 2.9+.
CREATE MATERIALIZED VIEW metric_rollup_1h_cagg
WITH (timescaledb.continuous) AS
SELECT
    metric_name,
    tags,
    time_bucket('1 hour', bucket) AS bucket,
    MIN(min_val) AS min_val,
    MAX(max_val) AS max_val,
    SUM(avg_val * count_val) / SUM(count_val) AS avg_val,  -- count-weighted mean
    SUM(sum_val) AS sum_val,
    SUM(count_val) AS count_val
FROM metric_rollup_1m_cagg
GROUP BY metric_name, tags, time_bucket('1 hour', bucket);
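The hourly average must be count-weighted: naively averaging the minute averages would over-weight sparse minutes. The same merge math in Python (a sketch; `minute_rows` is assumed to be a list of dicts with the rollup columns):

```python
def merge_rollups(minute_rows):
    """Merge 1-minute rollup rows into one hourly row using the same
    math as the hourly continuous aggregate: min of mins, max of maxes,
    and a count-weighted mean, i.e. SUM(avg * count) / SUM(count)."""
    total_count = sum(r["count_val"] for r in minute_rows)
    return {
        "min_val": min(r["min_val"] for r in minute_rows),
        "max_val": max(r["max_val"] for r in minute_rows),
        "avg_val": sum(r["avg_val"] * r["count_val"] for r in minute_rows)
                   / total_count,
        "sum_val": sum(r["sum_val"] for r in minute_rows),
        "count_val": total_count,
    }
```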
Retention Policies
-- Raw data: keep 7 days
SELECT add_retention_policy('metric_points', INTERVAL '7 days');
-- 1-minute rollup: keep 30 days
SELECT add_retention_policy('metric_rollup_1m_cagg', INTERVAL '30 days');
-- 1-hour rollup: keep 1 year
SELECT add_retention_policy('metric_rollup_1h_cagg', INTERVAL '365 days');
Alert Evaluation
Evaluator Loop
class AlertEvaluator:
    def run_forever(self):
        while True:
            rules = db.fetch("SELECT * FROM alert_rules WHERE enabled = TRUE")
            for rule in rules:
                self.evaluate(rule)
            sleep(60)

    def evaluate(self, rule):
        # Most recent bucket inside the window; no bucket at all means silence
        val = db.fetchone("""
            SELECT avg_val FROM metric_rollup_1m_cagg
            WHERE metric_name = %s
              AND tags @> %s::jsonb
              AND bucket >= NOW() - (%s * INTERVAL '1 minute')
            ORDER BY bucket DESC LIMIT 1
        """, [rule.metric_name, json.dumps(rule.tag_filter), rule.window_minutes])
        if val is None:
            self.fire_dead_mans_switch(rule)
            return
        if self.condition_met(val, rule.condition, rule.threshold):
            self.fire_alert(rule, val)

    def condition_met(self, val, condition, threshold):
        return {
            'gt': val > threshold,
            'lt': val < threshold,
            'gte': val >= threshold,
            'lte': val <= threshold,
            'eq': val == threshold,
        }[condition]
Dead Man's Switch
A dead man's switch fires when a metric stops arriving — useful for detecting silent failures in batch jobs or exporters.
def fire_dead_mans_switch(self, rule):
    # No data in the window means the metric source is silent
    self.notify(rule, message=f"No data received for {rule.metric_name} "
                              f"in the last {rule.window_minutes} minutes")
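The same idea can run outside the database: track when each metric was last seen and report the ones that have gone quiet. A self-contained sketch (hypothetical `DeadMansSwitch` class; the evaluator above derives silence from empty query results instead):

```python
import time

class DeadMansSwitch:
    """Tracks last-seen timestamps per metric and reports which
    metrics have been silent longer than the allowed window."""

    def __init__(self, window_seconds: int):
        self.window_seconds = window_seconds
        self.last_seen = {}  # metric_name -> unix timestamp

    def record(self, metric_name: str, now: float = None):
        self.last_seen[metric_name] = time.time() if now is None else now

    def silent_metrics(self, now: float = None):
        now = time.time() if now is None else now
        return [m for m, ts in self.last_seen.items()
                if now - ts > self.window_seconds]
```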
Query API
Time-Series Query
GET /metrics?name=api.request.duration&tags=env:production,service:checkout&from=2026-04-17T09:00:00Z&to=2026-04-17T10:00:00Z&resolution=1m&stat=p99
HTTP/1.1 200 OK
{
"metric": "api.request.duration",
"resolution": "1m",
"stat": "p99",
"datapoints": [
{ "bucket": "2026-04-17T09:00:00Z", "value": 142.0 },
{ "bucket": "2026-04-17T09:01:00Z", "value": 158.3 },
...
]
}
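A client constructing this request just URL-encodes the query parameters. A sketch using the parameter names from the example above (the comma-separated `key:value` tag encoding is assumed from it; `build_metrics_url` is illustrative):

```python
from urllib.parse import urlencode

def build_metrics_url(base, name, tags, start, end,
                      resolution="1m", stat="avg"):
    """Build a GET /metrics query URL with the parameters shown above."""
    params = {
        "name": name,
        # Encode tags as comma-separated key:value pairs, sorted for stability
        "tags": ",".join(f"{k}:{v}" for k, v in sorted(tags.items())),
        "from": start,
        "to": end,
        "resolution": resolution,
        "stat": stat,
    }
    return f"{base}/metrics?{urlencode(params)}"
```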
Scaling Considerations
- Kafka partitioning: Partition by a hash of metric_name so all points for a metric land on the same aggregation worker, enabling in-memory pre-aggregation before database writes.
- TimescaleDB compression: Enable columnar compression on chunks older than 7 days for 10–20x space reduction.
- Write batching: Buffer 5,000 points or 1 second of data in the aggregation worker before bulk-inserting to reduce WAL pressure.
- Federation: Run regional TimescaleDB nodes; a global query tier fans out and merges results for cross-region dashboards.
- Prometheus remote write: Existing Prometheus deployments can forward data via remote_write to the ingestion gateway — no agent changes needed.
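The partitioning scheme above can be sketched in a few lines. This uses MD5 for a stable cross-process hash; Kafka's default partitioner uses murmur2, so a real producer would register a custom partitioner instead (the helper below is illustrative):

```python
import hashlib

NUM_PARTITIONS = 64  # matches the per-topic partition count above

def partition_for(metric_name: str) -> int:
    """Pick a Kafka partition by hashing the metric name, so every
    point for a given metric reaches the same aggregation worker."""
    digest = hashlib.md5(metric_name.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % NUM_PARTITIONS
```

Because the mapping is deterministic, each aggregation worker owns a fixed slice of metric names and can safely pre-aggregate in memory without coordinating with its peers.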
Trade-offs
- TimescaleDB vs. InfluxDB vs. VictoriaMetrics: TimescaleDB is SQL-native and supports joins with relational data. InfluxDB is a mature purpose-built TSDB, but its query languages (InfluxQL/Flux) are less familiar than SQL. VictoriaMetrics offers best-in-class compression and ingest speed but is purpose-built and less flexible for ad-hoc queries.
- Pre-aggregation vs. raw storage: Storing raw points and computing aggregates on read (Prometheus model) is simpler but expensive at scale. Pre-aggregating into rollups trades write amplification for fast reads.
- Histogram accuracy: Pre-computed percentiles cannot be merged across series or time windows — averaging p99s is statistically meaningless. For accurate percentiles at query time, store mergeable sketch summaries such as DDSketch instead of fixed percentile columns.
Frequently Asked Questions
What is cardinality in a metrics pipeline and why does it cause problems?
Cardinality is the number of unique time series created by a metric name and its tag combinations. A metric api.latency with tags for env, service, status_code, and user_id can produce millions of unique series if user_id has high cardinality. Each unique series requires its own storage, index entry, and aggregation state. Cardinality explosions exhaust memory in TSDB systems, cause OOM crashes, and make dashboards unusably slow. The solution is to enforce a hard limit (e.g., 10,000 unique tag combinations per metric) and reject new series that exceed it.
How do TimescaleDB continuous aggregates work for metric rollups?
TimescaleDB continuous aggregates are materialized views backed by a background worker that incrementally refreshes only the new or changed time buckets rather than recomputing the entire view. A refresh policy triggers every minute and materializes the latest 1-minute aggregates (min, max, avg, p50, p95, p99) from raw metric_points data. Because only new chunks are processed, refresh is O(new data) not O(total data), making it practical to maintain multiple rollup granularities without full table scans.
What is a dead man's switch in alerting and when should you use it?
A dead man's switch alert fires when a metric stops arriving rather than when it crosses a threshold. It detects silent failures: a batch job that stops emitting its heartbeat metric, an exporter that crashes, or a network partition that blocks metric delivery. You configure it by asserting that a metric must have been seen within the last N minutes; absence triggers the alert. Use it for any critical process whose failure would produce no error metric of its own.
What is the difference between a counter, gauge, histogram, and summary metric type?
A counter is a monotonically increasing integer (e.g., total requests served) that only resets on process restart; rates are computed by derivative. A gauge is a value that can go up or down arbitrarily (e.g., current memory usage, queue depth). A histogram buckets observations into pre-defined ranges and records a count per bucket plus a running sum, enabling server-side percentile approximation across replicas. A summary computes configurable quantiles (e.g., p50, p99) client-side over a sliding time window and is accurate but cannot be aggregated across instances.
See also: Scale AI Interview Guide 2026: Data Infrastructure, RLHF Pipelines, and ML Engineering
See also: Uber Interview Guide 2026: Dispatch Systems, Geospatial Algorithms, and Marketplace Engineering