Overview
An event analytics pipeline is the backbone of any product analytics system. It ingests raw behavioral events from client SDKs, buffers them reliably, processes and enriches them in a stream, stores them in a query-optimized OLAP store, and exposes a query API for dashboards and ad-hoc analysis. This low-level design walks through each component with concrete data structures, API contracts, and scaling decisions.
Requirements
Functional
- Accept events from web, iOS, and Android SDKs over HTTP.
- Validate, enrich (geo-IP, user-agent parsing), and deduplicate events.
- Support arbitrary event schemas defined by tenants (schema-on-read with optional schema enforcement).
- Store events in an OLAP engine queryable by SQL.
- Expose a query API that translates structured queries into SQL and returns results with pagination.
- Retain raw events for 12 months; aggregated data indefinitely.
Non-functional
- Ingest: 1 M events/sec sustained, 3 M/sec burst.
- End-to-end latency (event visible in query results): < 30 s p99.
- Query API p99 for last-7-day queries on a single tenant: < 2 s.
- Durability: zero data loss after acknowledgment.
Event Schema Design
Canonical Event Envelope
All events, regardless of tenant-defined properties, share a canonical envelope:
{
  // System fields (set by collection layer, immutable)
  event_id: "018f-...", // UUIDv7 (time-ordered)
  tenant_id: "acme",
  received_at: "2024-04-13T10:00:00.123Z",
  sdk_version: "js-2.4.1",
  ip: "203.0.113.5", // dropped after enrichment

  // Client fields (set by SDK)
  event_type: "button_click",
  sent_at: "2024-04-13T09:59:59.900Z",
  user_id: "u_abc",
  anonymous_id: "anon_xyz",
  session_id: "sess_qrs",

  // Enriched fields (set by enrichment layer)
  country: "US",
  region: "CA",
  city: "San Francisco",
  device_type: "desktop",
  browser: "Chrome",
  os: "macOS",

  // Tenant-defined properties (arbitrary key-value)
  properties: {
    button_label: "Upgrade",
    page: "/pricing",
    plan: "pro"
  }
}
UUIDv7 for event_id
UUIDv7 encodes a millisecond timestamp in the high bits, making IDs time-ordered. This gives two benefits: deduplication state can be purged by time without a full scan, and OLAP engines can use the ID as a clustering key for efficient range scans.
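The bit layout can be made concrete with a minimal sketch. The helper names `uuid7` and `uuid7_timestamp_ms` are illustrative, not part of any SDK described here; the layout follows the UUIDv7 definition (48-bit Unix millisecond timestamp, 4-bit version, 12 random bits, 2-bit variant, 62 random bits).

```python
import os
import time
import uuid
from typing import Optional


def uuid7(unix_ms: Optional[int] = None) -> uuid.UUID:
    """Build a UUIDv7 from a Unix millisecond timestamp plus random bits."""
    if unix_ms is None:
        unix_ms = time.time_ns() // 1_000_000
    rand = int.from_bytes(os.urandom(10), "big")   # 80 random bits
    rand_a = rand >> 68                            # top 12 bits
    rand_b = rand & ((1 << 62) - 1)                # low 62 bits
    value = (unix_ms & ((1 << 48) - 1)) << 80      # timestamp in the high 48 bits
    value |= 0x7 << 76                             # version 7
    value |= rand_a << 64
    value |= 0b10 << 62                            # RFC variant
    value |= rand_b
    return uuid.UUID(int=value)


def uuid7_timestamp_ms(event_id: uuid.UUID) -> int:
    """Recover the millisecond timestamp from the high 48 bits."""
    return event_id.int >> 80


earlier = uuid7(1_713_002_400_000)
later = uuid7(1_713_002_400_001)
# IDs minted in different milliseconds sort in time order, both as integers
# and as their canonical hex strings.
print(earlier < later, str(earlier) < str(later))
```

Because the timestamp is recoverable from the ID alone, a dedup store can drop every key older than a cutoff without consulting the event body.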
Collection SDK
JavaScript SDK
The SDK is loaded as a snippet that bootstraps an async queue. Key behaviors:
- Buffering: events are buffered in memory and flushed every 2 s or at 25 events.
- Persistence: unflushed events are serialized to localStorage on visibilitychange (tab hide) so they survive page navigations.
- Transport: primary transport is fetch with keepalive. Fallback is navigator.sendBeacon for unload events. Both use gzip-compressed JSON batches.
- Retry: exponential backoff (1 s, 2 s, 4 s) on 5xx or network error. 429 responses respect the Retry-After header.
- Identity: anonymous_id is a UUIDv4 persisted in a first-party cookie (SameSite=Lax, 2-year expiry). user_id is set via an explicit identify() call and joined server-side.
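The buffering and retry rules above can be sketched in a few lines. This is written in Python for consistency with the other examples here rather than in the SDK's own JavaScript, and it makes two assumptions: the 2 s timer is measured from the oldest buffered event, and Retry-After is only honored in its delta-seconds form. `EventBuffer` and `retry_delay_s` are hypothetical names.

```python
from typing import Callable, Dict, List, Optional

MAX_BATCH = 25          # flush once 25 events are buffered
FLUSH_INTERVAL_S = 2.0  # or once the oldest buffered event is 2 s old (assumption)
MAX_RETRIES = 3         # backoff schedule from the text: 1 s, 2 s, 4 s


class EventBuffer:
    def __init__(self, send: Callable[[List[Dict]], None], clock: Callable[[], float]):
        self._send = send
        self._clock = clock
        self._events: List[Dict] = []
        self._first_at: Optional[float] = None

    def track(self, event: Dict) -> None:
        if not self._events:
            self._first_at = self._clock()
        self._events.append(event)
        if len(self._events) >= MAX_BATCH:
            self.flush()

    def tick(self) -> None:
        """Called by a periodic timer; flushes a batch that has waited long enough."""
        if self._events and self._clock() - self._first_at >= FLUSH_INTERVAL_S:
            self.flush()

    def flush(self) -> None:
        if not self._events:
            return
        batch, self._events = self._events, []
        self._first_at = None
        self._send(batch)


def retry_delay_s(attempt: int, status: Optional[int],
                  retry_after: Optional[str] = None) -> Optional[float]:
    """Seconds to wait before retry number `attempt` (0-based), or None to give up.

    `status` is the HTTP status code, or None for a network error.
    """
    if attempt >= MAX_RETRIES:
        return None
    if status == 429:
        if retry_after is not None and retry_after.isdigit():
            return float(retry_after)      # server-provided delay wins
        return float(2 ** attempt)
    if status is None or status >= 500:
        return float(2 ** attempt)         # 1 s, 2 s, 4 s
    return None                            # other 4xx responses are not retried
```

Injecting the clock keeps the flush policy testable without real timers; a browser implementation would drive `tick` from `setInterval` or a one-shot timeout.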
Mobile SDKs (iOS / Android)
Mobile SDKs use SQLite as a durable event queue. Events are written synchronously to SQLite before the enqueue call returns, guaranteeing no data loss even on app crash. A background worker dequeues and POSTs batches, deleting rows only after HTTP 200/202. Batches are capped at 500 KB to avoid request timeouts on slow mobile networks.
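A minimal sketch of the write-then-acknowledge queue, using Python's built-in sqlite3 module in place of the platform SQLite bindings. The table name `pending` and the class `DurableQueue` are assumptions for illustration; only the commit-before-return rule, the delete-after-acknowledgment rule, and the 500 KB cap come from the text.

```python
import json
import sqlite3
from typing import Dict, List, Tuple

MAX_BATCH_BYTES = 500 * 1024  # batch cap from the text


class DurableQueue:
    def __init__(self, path: str = ":memory:"):
        self._db = sqlite3.connect(path)
        with self._db:
            self._db.execute(
                "CREATE TABLE IF NOT EXISTS pending ("
                "id INTEGER PRIMARY KEY AUTOINCREMENT, payload TEXT NOT NULL)"
            )

    def enqueue(self, event: Dict) -> None:
        # The transaction commits before this method returns, so the event
        # is on disk even if the app crashes immediately afterwards.
        with self._db:
            self._db.execute(
                "INSERT INTO pending (payload) VALUES (?)", (json.dumps(event),)
            )

    def next_batch(self, max_bytes: int = MAX_BATCH_BYTES) -> Tuple[List[int], List[Dict]]:
        """Return the oldest events, in order, up to roughly max_bytes of payload."""
        ids: List[int] = []
        events: List[Dict] = []
        size = 0
        cur = self._db.execute("SELECT id, payload FROM pending ORDER BY id")
        for row_id, payload in cur:
            n = len(payload.encode("utf-8"))
            if events and size + n > max_bytes:
                break  # always send at least one event, then respect the cap
            ids.append(row_id)
            events.append(json.loads(payload))
            size += n
        cur.close()
        return ids, events

    def ack(self, ids: List[int]) -> None:
        # Called only after the server answered 200/202 for the batch.
        with self._db:
            self._db.executemany(
                "DELETE FROM pending WHERE id = ?", [(i,) for i in ids]
            )
```

If the POST fails, the worker simply does not call `ack`; the same rows are returned by the next `next_batch` call, which is why the server side needs deduplication on event_id.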
Ingestion Buffer
Collection Endpoint
Stateless Go services accept POST /v1/batch with a JSON array of events. Per-request processing:
- Authenticate: validate API key against an in-memory LRU cache (1 000 tenants, 60 s TTL).
- Size check: reject request bodies > 1 MB (HTTP 413) before parsing, so oversized payloads are never unmarshaled.
- Parse: unmarshal JSON, reject malformed payloads with HTTP 400.
- Stamp: set received_at = now(), derive server_id for tracing.
- Produce to Kafka topic events.raw, partition key = tenant_id.
- Return HTTP 202 (async; no processing guarantee beyond Kafka durability).
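The per-request steps can be sketched as a single pure function. It is in Python rather than the service's Go to stay consistent with the other examples, and several details are assumptions: `lookup_tenant` stands in for the API-key cache, `produce` stands in for a Kafka producer call that blocks until the write is acknowledged, an unknown key returns 401, received_at is stamped as epoch milliseconds, and the size limit is checked on the raw body before parsing.

```python
import json
import time
from typing import Callable, Optional

MAX_BODY_BYTES = 1 * 1024 * 1024  # 1 MB limit from the text


def _now_ms() -> int:
    return time.time_ns() // 1_000_000


def handle_batch(
    body: bytes,
    api_key: str,
    lookup_tenant: Callable[[str], Optional[str]],
    produce: Callable[[str, bytes, bytes], None],
    now_ms: Callable[[], int] = _now_ms,
) -> int:
    """Process one POST /v1/batch request and return the HTTP status code."""
    tenant_id = lookup_tenant(api_key)
    if tenant_id is None:
        return 401                      # assumption: status for a bad API key
    if len(body) > MAX_BODY_BYTES:
        return 413
    try:
        events = json.loads(body)
    except ValueError:                  # malformed JSON or invalid UTF-8
        return 400
    if not isinstance(events, list) or not all(isinstance(e, dict) for e in events):
        return 400
    received_at = now_ms()
    for event in events:
        # System fields are set server-side and overwrite anything the client sent.
        event["tenant_id"] = tenant_id
        event["received_at"] = received_at
        produce("events.raw", tenant_id.encode(), json.dumps(event).encode())
    return 202
```

Returning 202 only after `produce` has completed is what ties the acknowledgment to Kafka durability: the client is never told "accepted" for an event the brokers have not persisted.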
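The Kafka producer is configured with acks=all, enable.idempotence=true, compression.type=snappy, linger.ms=10, and batch.size=262144 (256 KB; the setting is specified in bytes). Snappy trades some compression ratio for low CPU cost, and repetitive JSON event payloads generally compress well with it. The linger and batch settings amortize network I/O across many events.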
Kafka Topic Configuration
Topic: events.raw
Partitions: 512
Replication factor: 3
Min ISR: 2
Retention: 48 h (safety buffer; Flink processes within seconds)
Segment size: 256 MB
Partition key: tenant_id (ensures per-tenant ordering)
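As a quick sanity check on the partition count, the stated ingest targets can be divided across the 512 partitions. The calculation assumes traffic is spread evenly, which keying on tenant_id does not guarantee: all of one tenant's events hash to a single partition, so a very large tenant can exceed the average by a wide margin.

```python
PARTITIONS = 512
SUSTAINED_EPS = 1_000_000   # sustained ingest target from the requirements
BURST_EPS = 3_000_000       # burst ingest target from the requirements


def per_partition_rate(events_per_sec: int, partitions: int = PARTITIONS) -> float:
    """Average events/sec per partition under a perfectly even key distribution."""
    return events_per_sec / partitions


sustained = per_partition_rate(SUSTAINED_EPS)   # 1953.125
burst = per_partition_rate(BURST_EPS)           # 5859.375
print(f"sustained: {sustained:,.0f} events/s per partition")  # sustained: 1,953 events/s per partition
print(f"burst:     {burst:,.0f} events/s per partition")      # burst:     5,859 events/s per partition
```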
Stream Processing
Enrichment Job (Flink)
The enrichment Flink job reads from events.raw and writes to events.enriched. Per-event operations (in-memory lookups run inline; remote lookups use asynchronous I/O so they do not block the operator thread):
- Geo-IP lookup: in-process lookup against a MaxMind GeoLite2-City database (memory-mapped, ~60 MB). Resolves country, region, city from IP. IP is then dropped.
- User-agent parsing: ua-parser2 in-process library. Resolves device_type, browser, browser_version, os, os_version.
- Identity resolution: async lookup of the anonymous_id → user_id mapping in a Redis hash (populated by identify() calls). If a mapping exists and the event carries no user_id, the mapped user_id is attached.
- Schema validation: optional per-tenant JSON Schema validation. Violations are routed to a dead-letter topic events.dlq rather than dropped.
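The route-to-DLQ behavior for schema violations can be sketched as follows. To stay dependency-free, the example uses a simplified required-property and type check as a stand-in for real JSON Schema validation; `TENANT_SCHEMAS`, `validate`, `route`, and the `dlq_errors` field are all hypothetical.

```python
from typing import Any, Dict, List, Tuple

# Hypothetical registry: tenant -> event_type -> {property name: required type}.
# A real deployment would hold JSON Schema documents here instead.
TENANT_SCHEMAS: Dict[str, Dict[str, Dict[str, type]]] = {
    "acme": {"button_click": {"button_label": str, "page": str}},
}


def validate(event: Dict[str, Any]) -> List[str]:
    """Return a list of violations; empty means the event is acceptable."""
    rules = TENANT_SCHEMAS.get(event.get("tenant_id", ""), {}).get(
        event.get("event_type", "")
    )
    if rules is None:
        return []  # no schema registered: schema-on-read, accept as-is
    props = event.get("properties") or {}
    errors = []
    for name, expected in rules.items():
        if name not in props:
            errors.append(f"missing property: {name}")
        elif not isinstance(props[name], expected):
            errors.append(f"wrong type for {name}: expected {expected.__name__}")
    return errors


def route(event: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]:
    """Pick the output topic: violations go to the DLQ with the reasons attached."""
    errors = validate(event)
    if errors:
        return "events.dlq", {**event, "dlq_errors": errors}
    return "events.enriched", event
```

Attaching the violation list to the dead-lettered copy keeps the original payload intact for replay while making the failure reason visible to whoever inspects the topic.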
Deduplication Job (Flink)
A separate Flink job reads from events.enriched and performs deduplication before writing to events.deduped.
State: RocksDB-backed keyed state on event_id, TTL = 1 hour (covers SDK retry windows). On each event:
- If event_id exists in state: drop the event, increment a dedup_count metric.
- If not: add to state, forward the event.
The 1-hour TTL means a duplicate arriving more than 1 hour after the first copy passes through this job. The OLAP storage layer is the backstop for those: ReplacingMergeTree eventually collapses rows that share a sort key, and the sort key includes event_id.
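The keyed-state logic can be illustrated with an in-memory stand-in. This is a sketch of the semantics only: it uses a plain dict instead of RocksDB-backed Flink state, takes the current time as an argument, and assumes timestamps never go backwards. `Deduplicator` is a hypothetical name.

```python
from typing import Dict

DEDUP_TTL_S = 3600  # 1-hour TTL from the text


class Deduplicator:
    def __init__(self, ttl_s: float = DEDUP_TTL_S):
        self._ttl = ttl_s
        # event_id -> expiry time. Python dicts preserve insertion order, so
        # with non-decreasing timestamps the oldest entry is always first.
        self._seen: Dict[str, float] = {}
        self.dedup_count = 0

    def is_new(self, event_id: str, now: float) -> bool:
        """Return True if the event should be forwarded, False if it is a duplicate."""
        self._expire(now)
        if event_id in self._seen:
            self.dedup_count += 1
            return False
        self._seen[event_id] = now + self._ttl
        return True

    def _expire(self, now: float) -> None:
        # Drop entries from the front until one is still live.
        while self._seen:
            oldest = next(iter(self._seen))
            if self._seen[oldest] > now:
                break
            del self._seen[oldest]


dedup = Deduplicator()
print(dedup.is_new("evt-1", now=0))      # True: first sighting
print(dedup.is_new("evt-1", now=10))     # False: retry inside the TTL window
print(dedup.is_new("evt-1", now=3600))   # True: state expired, duplicate passes through
```

The last call shows the gap the text describes: once the state for an event_id has expired, a late copy is indistinguishable from a new event at this stage.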
OLAP Storage
ClickHouse Schema
Raw events are stored in a ClickHouse distributed table:
CREATE TABLE events (
    tenant_id     LowCardinality(String),
    event_id      String,
    event_type    LowCardinality(String),
    received_at   DateTime64(3),
    sent_at       DateTime64(3),
    user_id       String,
    anonymous_id  String,
    session_id    String,
    country       LowCardinality(String),
    region        LowCardinality(String),
    device_type   LowCardinality(String),
    browser       LowCardinality(String),
    os            LowCardinality(String),
    properties    String -- JSON blob
) ENGINE = ReplicatedReplacingMergeTree(received_at)
PARTITION BY (tenant_id, toYYYYMM(received_at))
ORDER BY (tenant_id, event_type, toStartOfHour(received_at), event_id)
SETTINGS index_granularity = 8192;
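ReplacingMergeTree with received_at as the version column handles duplicate inserts: during background merges, ClickHouse keeps only the row with the highest received_at among rows that share a sort key within a partition. Because the sort key includes event_id, this gives eventual deduplication at the storage layer. One caveat: the sort key also includes toStartOfHour(received_at), and received_at is stamped at collection time, so a duplicate that is re-sent in a later hour gets a different sort key and is not collapsed. Deriving the time component of the key from a value that is stable across retries, such as the timestamp embedded in the UUIDv7 event_id, avoids this.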
The properties JSON blob is queried using ClickHouse's JSON extraction functions (JSONExtractString, JSONExtractFloat). For properties that are filtered or grouped on frequently, tenants can define materialized columns so the value is extracted once at insert time instead of on every query:
ALTER TABLE events ADD COLUMN plan LowCardinality(String)
MATERIALIZED JSONExtractString(properties, 'plan');
ClickHouse Sink (Flink)
The Flink ClickHouse sink uses the official JDBC connector with batching: flush every 5 s or 10 000 rows, whichever comes first. Inserts go over ClickHouse's HTTP interface. Flink checkpointing lets the job replay from the last checkpoint after a failure, so a batch can be written more than once (at-least-once delivery into ClickHouse); ReplacingMergeTree provides the storage-level idempotency that makes those replays harmless.
Query API
Query Model
The query API accepts structured JSON queries, avoiding the need to expose raw SQL to tenants:
POST /v1/query
{
  "tenant_id": "acme",
  "event_type": "button_click",
  "time_range": {
    "start": "2024-04-06T00:00:00Z",
    "end": "2024-04-13T00:00:00Z"
  },
  "filters": [
    {"field": "country", "op": "eq", "value": "US"},
    {"field": "properties.plan", "op": "in", "value": ["pro", "enterprise"]}
  ],
  "aggregations": [
    {"function": "count", "alias": "total"},
    {"function": "count_distinct", "field": "user_id", "alias": "unique_users"}
  ],
  "group_by": ["country", "device_type"],
  "order_by": [{"field": "total", "direction": "desc"}],
  "limit": 100,
  "cursor": null
}
SQL Translation
The query service translates the above into parameterized ClickHouse SQL (parameter values are shown inlined here for readability):
SELECT
    country,
    device_type,
    count() AS total,
    uniq(user_id) AS unique_users -- approximate; uniqExact() gives exact counts
FROM events
WHERE tenant_id = 'acme'
  AND event_type = 'button_click'
  AND received_at >= '2024-04-06 00:00:00'
  AND received_at < '2024-04-13 00:00:00'
  AND country = 'US'
  AND JSONExtractString(properties, 'plan') IN ('pro','enterprise')
GROUP BY country, device_type
ORDER BY total DESC
LIMIT 101; -- fetch 101 to detect if there is a next page
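A minimal sketch of the translation step follows, under stated assumptions: values are bound through pyformat-style placeholders (`%(name)s`) that whichever client library runs the query is assumed to support, identifiers are checked against an allow-list because they cannot be bound as parameters, and only the `eq` and `in` operators and the `count` and `count_distinct` functions from the example are handled. `COLUMNS`, `translate`, and the helper names are hypothetical.

```python
import re
from typing import Any, Dict, Tuple

# Columns a tenant may reference directly (a subset of the events table).
COLUMNS = {"country", "region", "device_type", "browser", "os", "user_id"}
_ALIAS = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def _bind(params: Dict[str, Any], value: Any) -> str:
    """Store value under a fresh parameter name and return its placeholder."""
    key = f"p{len(params)}"
    params[key] = value
    return f"%({key})s"


def _field(name: str, params: Dict[str, Any]) -> str:
    if name in COLUMNS:
        return name
    if name.startswith("properties.") and len(name) > len("properties."):
        return f"JSONExtractString(properties, {_bind(params, name.split('.', 1)[1])})"
    raise ValueError(f"unknown field: {name}")


def translate(q: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]:
    params: Dict[str, Any] = {
        "tenant_id": q["tenant_id"], "event_type": q["event_type"],
        "start": q["time_range"]["start"], "end": q["time_range"]["end"],
    }
    where = ["tenant_id = %(tenant_id)s", "event_type = %(event_type)s",
             "received_at >= %(start)s", "received_at < %(end)s"]
    for f in q.get("filters", []):
        expr = _field(f["field"], params)
        if f["op"] == "eq":
            where.append(f"{expr} = {_bind(params, f['value'])}")
        elif f["op"] == "in":
            where.append(f"{expr} IN {_bind(params, tuple(f['value']))}")
        else:
            raise ValueError(f"unsupported op: {f['op']}")
    group_by = q.get("group_by", [])
    if any(g not in COLUMNS for g in group_by):
        raise ValueError("group_by must use known columns")
    select, aliases = list(group_by), set()
    for a in q["aggregations"]:
        if not _ALIAS.fullmatch(a["alias"]):
            raise ValueError(f"bad alias: {a['alias']}")
        if a["function"] == "count":
            expr = "count()"
        elif a["function"] == "count_distinct":
            expr = f"uniq({_field(a['field'], params)})"
        else:
            raise ValueError(f"unsupported function: {a['function']}")
        select.append(f"{expr} AS {a['alias']}")
        aliases.add(a["alias"])
    order = []
    for o in q.get("order_by", []):
        direction = o["direction"].upper()
        if (o["field"] not in aliases and o["field"] not in group_by) or direction not in ("ASC", "DESC"):
            raise ValueError("bad order_by")
        order.append(f"{o['field']} {direction}")
    sql = f"SELECT {', '.join(select)} FROM events WHERE {' AND '.join(where)}"
    if group_by:
        sql += f" GROUP BY {', '.join(group_by)}"
    if order:
        sql += f" ORDER BY {', '.join(order)}"
    params["limit"] = int(q.get("limit", 100)) + 1  # one extra row signals a next page
    return sql + " LIMIT %(limit)s", params
```

The design point is the split between identifiers and values: column names, aliases, and sort directions are validated and interpolated, while every tenant-supplied value travels separately in `params` and never appears in the SQL string.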
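Cursor-based pagination encodes the last row's sort key values. The next-page query adds a predicate on those values (a HAVING clause when the sort key is an aggregate such as total, with the group-by columns as a tie-breaker), avoiding OFFSET, which still has to read and discard every skipped row.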
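One way to build such a cursor is to serialize the last returned row's sort key values as URL-safe base64 JSON. This is a sketch under assumptions: the function names are hypothetical, the cursor is unsigned (a production API would likely sign or encrypt it so tenants cannot tamper with it), and the query is assumed to have fetched limit + 1 rows as in the LIMIT 101 example.

```python
import base64
import json
from typing import Any, Dict, List, Optional, Tuple


def encode_cursor(last_row: Dict[str, Any], sort_fields: List[str]) -> str:
    """Pack the sort key values of the last row on the page into an opaque token."""
    payload = {f: last_row[f] for f in sort_fields}
    raw = json.dumps(payload, sort_keys=True, separators=(",", ":")).encode()
    return base64.urlsafe_b64encode(raw).decode()


def decode_cursor(cursor: str) -> Dict[str, Any]:
    return json.loads(base64.urlsafe_b64decode(cursor))


def paginate(
    rows: List[Dict[str, Any]], limit: int, sort_fields: List[str]
) -> Tuple[List[Dict[str, Any]], Optional[str]]:
    """rows holds up to limit + 1 rows; the extra row only signals a next page."""
    page = rows[:limit]
    next_cursor = encode_cursor(page[-1], sort_fields) if len(rows) > limit else None
    return page, next_cursor
```

On the next request the service decodes the cursor and turns its values into the "strictly after this row" predicate for the same sort order.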
Query Timeout and Circuit Breaker
All ClickHouse queries run with max_execution_time=10 (seconds). The query service imposes a 15 s application-level timeout. A per-tenant circuit breaker (Hystrix-style) opens after 5 consecutive timeouts, returning HTTP 503 with a Retry-After header, preventing a runaway query from consuming all ClickHouse threads.
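The per-tenant breaker can be sketched as a small state holder. The threshold of 5 consecutive timeouts comes from the text; the 30 s cooldown, the class name, and the method names are assumptions, and the sketch omits locking that a concurrent service would need.

```python
import math
from collections import defaultdict
from typing import Dict


class TenantCircuitBreaker:
    def __init__(self, threshold: int = 5, cooldown_s: float = 30.0):
        self.threshold = threshold
        self.cooldown_s = cooldown_s          # assumption: not specified in the text
        self._timeouts: Dict[str, int] = defaultdict(int)
        self._open_until: Dict[str, float] = {}

    def allow(self, tenant: str, now: float) -> bool:
        """False while the tenant's circuit is open (caller should return HTTP 503)."""
        return now >= self._open_until.get(tenant, 0.0)

    def retry_after(self, tenant: str, now: float) -> int:
        """Whole seconds to advertise in the Retry-After header."""
        return max(0, math.ceil(self._open_until.get(tenant, 0.0) - now))

    def record_timeout(self, tenant: str, now: float) -> None:
        self._timeouts[tenant] += 1
        if self._timeouts[tenant] >= self.threshold:
            self._open_until[tenant] = now + self.cooldown_s

    def record_success(self, tenant: str) -> None:
        # Any successful query resets the consecutive-timeout count.
        self._timeouts[tenant] = 0
        self._open_until.pop(tenant, None)
```

After the cooldown the breaker lets a request through again; because the timeout count is still at the threshold, a single further timeout reopens the circuit, while a success resets it.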
Identity Resolution
When a user identifies (logs in), the SDK calls analytics.identify(userId). This produces an identify event, and the resulting mapping is stored in a Redis hash: HSET identity:{tenant} {anonymous_id} {user_id}. The enrichment Flink job looks up this mapping for each incoming event and fills in user_id on events that carry only anonymous_id. Retroactively assigning user_id to anonymous events that were already stored is handled by a nightly batch Spark job that rewrites the affected ClickHouse partitions.
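The streaming half of this flow can be sketched with an in-memory stand-in for the Redis hash. `IdentityStore` and `resolve` are hypothetical names; the key layout mirrors the HSET command above, and the sketch covers only new events, not the nightly backfill.

```python
from typing import Any, Dict, Optional


class IdentityStore:
    """In-memory stand-in for the Redis hashes keyed identity:{tenant}."""

    def __init__(self) -> None:
        self._hashes: Dict[str, Dict[str, str]] = {}

    def identify(self, tenant: str, anonymous_id: str, user_id: str) -> None:
        # Equivalent of: HSET identity:{tenant} {anonymous_id} {user_id}
        self._hashes.setdefault(f"identity:{tenant}", {})[anonymous_id] = user_id

    def lookup(self, tenant: str, anonymous_id: Optional[str]) -> Optional[str]:
        # Equivalent of: HGET identity:{tenant} {anonymous_id}
        if anonymous_id is None:
            return None
        return self._hashes.get(f"identity:{tenant}", {}).get(anonymous_id)


def resolve(event: Dict[str, Any], store: IdentityStore) -> Dict[str, Any]:
    """Fill in user_id on an event that carries only anonymous_id."""
    if event.get("user_id"):
        return event  # already identified by the SDK
    user_id = store.lookup(event["tenant_id"], event.get("anonymous_id"))
    return {**event, "user_id": user_id} if user_id else event
```

Events processed before the identify() call keep an empty user_id in storage, which is the gap the batch rewrite exists to close.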
Data Retention and TTL
-- Raw events: 12-month TTL
ALTER TABLE events MODIFY TTL received_at + INTERVAL 12 MONTH DELETE;
-- Rollup tables: no TTL (retain indefinitely)
-- Kafka events.raw: 48 h
-- Kafka events.enriched, events.deduped: 6 h
Monitoring and Observability
- Collection lag: Kafka consumer group lag for Flink enrichment and dedup jobs. Alert if lag > 100 K events.
- End-to-end latency: a synthetic event injected every 30 s with a known event_id. Query API polls until it appears. Alert if latency > 30 s.
- DLQ depth: events.dlq message count. Alert on any new messages (indicates schema violations or enrichment failures).
- ClickHouse merge depth: number of parts per partition. Alert if parts > 300 (indicates insert rate exceeding merge rate).
Interview Tips
- Be ready to discuss why deduplication happens in Flink (low latency, exact dedup within retry window) AND in ClickHouse (eventual correctness for late arrivals).
- Explain the trade-off between schema-on-read (flexible, slower queries on properties JSON) vs materialized columns (fast, requires schema decisions upfront).
- Know that ClickHouse ReplacingMergeTree deduplication is eventual — reads may see duplicates until a merge happens. Use FINAL modifier or deduplicate at query time for correctness-critical queries.
- Discuss identity stitching complexity: the retroactive backfill problem and why it requires a batch rewrite rather than a streaming update.