Low Level Design: Event Analytics Pipeline

Overview

An event analytics pipeline is the backbone of any product analytics platform. It ingests raw behavioral events from client SDKs, buffers them reliably, processes and enriches them in a stream, stores them in a query-optimized OLAP store, and exposes a query API for dashboards and ad-hoc analysis. This low level design walks through every component with concrete data structures, API contracts, and scaling decisions.

Requirements

Functional

  • Accept events from web, iOS, and Android SDKs over HTTP.
  • Validate, enrich (geo-IP, user-agent parsing), and deduplicate events.
  • Support arbitrary event schemas defined by tenants (schema-on-read with optional schema enforcement).
  • Store events in an OLAP engine queryable by SQL.
  • Expose a query API that translates structured queries into SQL and returns results with pagination.
  • Retain raw events for 12 months; aggregated data indefinitely.

Non-functional

  • Ingest: 1 M events/sec sustained, 3 M/sec burst.
  • End-to-end latency (event visible in query results): < 30 s p99.
  • Query API p99 for last-7-day queries on a single tenant: < 2 s.
  • Durability: zero data loss after acknowledgment.

Event Schema Design

Canonical Event Envelope

All events, regardless of tenant-defined properties, share a canonical envelope:

{
  // System fields (set by collection layer, immutable)
  tenant_id:     "acme",
  received_at:   "2024-04-13T10:00:00.123Z",
  sdk_version:   "js-2.4.1",
  ip:            "203.0.113.5",       // dropped after enrichment
  // Client fields (set by SDK)
  event_id:      "018f-...",          // UUIDv7 (time-ordered); generated by the SDK so retries reuse the same ID
  event_type:    "button_click",
  sent_at:       "2024-04-13T09:59:59.900Z",
  user_id:       "u_abc",
  anonymous_id:  "anon_xyz",
  session_id:    "sess_qrs",
  // Enriched fields (set by enrichment layer)
  country:       "US",
  region:        "CA",
  city:          "San Francisco",
  device_type:   "desktop",
  browser:       "Chrome",
  os:            "macOS",
  // Tenant-defined properties (arbitrary key-value)
  properties: {
    button_label: "Upgrade",
    page:         "/pricing",
    plan:         "pro"
  }
}

UUIDv7 for event_id

UUIDv7 encodes a millisecond timestamp in the high bits, making IDs time-ordered. This gives two benefits: deduplication state can be purged by time without a full scan, and OLAP engines can use the ID as a clustering key for efficient range scans.

Collection SDK

JavaScript SDK

The SDK is loaded as a snippet that bootstraps an async queue. Key behaviors:

  • Buffering: events are buffered in memory and flushed every 2 s or at 25 events.
  • Persistence: unflushed events are serialized to localStorage on visibilitychange (tab hide) so they survive page navigations.
  • Transport: primary transport is fetch with keepalive. Fallback is navigator.sendBeacon for unload events. Both use gzip-compressed JSON batches.
  • Retry: exponential backoff (1 s, 2 s, 4 s) on 5xx or network error. 429 responses respect the Retry-After header.
  • Identity: anonymous_id is a UUIDv4 persisted in a first-party cookie (SameSite=Lax, 2-year expiry). user_id is set via an explicit identify() call and joined server-side.

Mobile SDKs (iOS / Android)

Mobile SDKs use SQLite as a durable event queue. Events are written synchronously to SQLite before the enqueue call returns, guaranteeing no data loss even on app crash. A background worker dequeues and POSTs batches, deleting rows only after HTTP 200/202. Batches are capped at 500 KB to avoid request timeouts on slow mobile networks.
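The 500 KB cap implies a packing step when the background worker reads from the queue. A hedged sketch: it assumes events are already serialized, counts bytes of the uncompressed JSON array, and interprets KB as KiB; the function and constant names are hypothetical. It returns row indices so that only the rows of an acknowledged batch get deleted.

```go
package main

import (
	"errors"
	"fmt"
)

const maxBatchBytes = 500 * 1024 // assumption: "500 KB" read as KiB

var errTooLarge = errors.New("single event exceeds batch cap")

// packBatches groups queued, already-serialized events (oldest first, as read
// from the SQLite queue) into batches whose JSON array encoding "[e1,e2,...]"
// fits in maxBytes. Each batch is a list of queue indices, so the worker can
// delete exactly those rows after an HTTP 200/202.
func packBatches(events [][]byte, maxBytes int) ([][]int, error) {
	var batches [][]int
	var cur []int
	size := 2 // "[" and "]"
	for i, e := range events {
		if len(e)+2 > maxBytes {
			return nil, errTooLarge
		}
		extra := len(e)
		if len(cur) > 0 {
			extra++ // comma separator
		}
		if size+extra > maxBytes { // close the current batch, start a new one
			batches = append(batches, cur)
			cur, size, extra = nil, 2, len(e)
		}
		cur = append(cur, i)
		size += extra
	}
	if len(cur) > 0 {
		batches = append(batches, cur)
	}
	return batches, nil
}

func main() {
	queued := [][]byte{[]byte(`{"event_type":"a"}`), []byte(`{"event_type":"b"}`)}
	batches, err := packBatches(queued, maxBatchBytes)
	fmt.Println(batches, err)
}
```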

Ingestion Buffer

Collection Endpoint

Stateless Go services accept POST /v1/batch with a JSON array of events. Per-request processing:

  1. Authenticate: validate the API key against an in-memory LRU cache (1 000 tenants, 60 s TTL).
  2. Size check: reject batches > 1 MB with HTTP 413, enforced while reading the body so oversized requests are never fully buffered.
  3. Parse: unmarshal JSON; reject malformed payloads with HTTP 400.
  4. Stamp: set received_at = now() and derive server_id for tracing.
  5. Produce to Kafka topic events.raw with partition key = tenant_id.
  6. Return HTTP 202 only after Kafka acknowledges the write. Processing is asynchronous, so nothing beyond Kafka durability is guaranteed at this point.

The Kafka producer is configured with acks=all, enable.idempotence=true, compression.type=snappy, linger.ms=10, and batch.size=262144 (256 KB). Repetitive JSON event payloads compress well under Snappy, and linger.ms lets the producer batch network I/O.

Kafka Topic Configuration

Topic: events.raw
  Partitions:         512
  Replication factor: 3
  Min ISR:            2
  Retention:          48 h (safety buffer; Flink processes within seconds)
  Segment size:       256 MB
  Partition key:      tenant_id (ensures per-tenant ordering)

Stream Processing

Enrichment Job (Flink)

The enrichment Flink job reads from events.raw and writes to events.enriched. Per-event operations (network-bound lookups use Flink's async I/O so they do not block the operator thread; in-process lookups run synchronously):

  • Geo-IP lookup: in-process MaxMind GeoLite2-City database (memory-mapped, ~60 MB). Resolves country, region, city from IP. IP is then dropped.
  • User-agent parsing: ua-parser2 in-process library. Resolves device_type, browser, browser_version, os, os_version.
  • Identity resolution: async lookup of the anonymous_id → user_id mapping in the per-tenant Redis hash identity:{tenant} (populated by identify() calls). If a mapping exists, user_id is filled in on events that carry only anonymous_id.
  • Schema validation: optional per-tenant JSON Schema validation. Violations are routed to a dead-letter topic events.dlq rather than dropped.
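The per-event logic can be sketched as a plain function. In the real job it would run inside a Flink operator with the Redis lookup behind async I/O; here the GeoIP reader, the identity:{tenant} hash, and the schema check are hypothetical function hooks, and only the country field is resolved to keep the example short.

```go
package main

import (
	"errors"
	"fmt"
)

// Event carries only the envelope fields this step touches.
type Event struct {
	TenantID, AnonymousID, UserID, IP, Country string
	Properties                                 map[string]any
}

// Deps bundles hypothetical stand-ins for the GeoIP reader, the Redis hash
// identity:{tenant}, and per-tenant JSON Schema validation.
type Deps struct {
	GeoCountry func(ip string) string
	Identity   func(tenantID, anonymousID string) (userID string, ok bool)
	Validate   func(tenantID string, props map[string]any) error
}

// enrich applies the per-event steps in order and returns the output topic.
func enrich(e *Event, d Deps) string {
	e.Country = d.GeoCountry(e.IP)
	e.IP = "" // drop the raw IP once geo fields are resolved
	if e.UserID == "" {
		if uid, ok := d.Identity(e.TenantID, e.AnonymousID); ok {
			e.UserID = uid
		}
	}
	if err := d.Validate(e.TenantID, e.Properties); err != nil {
		return "events.dlq" // schema violations are routed, not dropped
	}
	return "events.enriched"
}

func demoDeps() Deps {
	identities := map[string]string{"acme/anon_xyz": "u_abc"} // stands in for Redis
	return Deps{
		GeoCountry: func(ip string) string { return "US" },
		Identity: func(t, a string) (string, bool) {
			uid, ok := identities[t+"/"+a]
			return uid, ok
		},
		Validate: func(t string, props map[string]any) error {
			if _, ok := props["plan"].(string); !ok {
				return errors.New("plan must be a string")
			}
			return nil
		},
	}
}

func main() {
	e := &Event{TenantID: "acme", AnonymousID: "anon_xyz", IP: "203.0.113.5",
		Properties: map[string]any{"plan": "pro"}}
	fmt.Println(enrich(e, demoDeps()), e.UserID, e.Country, e.IP == "")
}
```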

Deduplication Job (Flink)

A separate Flink job reads from events.enriched and performs deduplication before writing to events.deduped.

State: RocksDB-backed keyed state on event_id, TTL = 1 hour (covers SDK retry windows). On each event:

  • If event_id exists in state: drop the event, increment a dedup_count metric.
  • If not: add to state, forward the event.

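The 1-hour TTL means a duplicate arriving more than 1 hour after the first copy passes through this job. Duplicates that reach storage are handled by ClickHouse ReplacingMergeTree, which deduplicates on a sorting key that includes event_id. This happens eventually, during background merges, rather than as an immediate upsert.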

OLAP Storage

ClickHouse Schema

Raw events are stored in a replicated ClickHouse table on each shard (read across shards through a distributed table):

CREATE TABLE events (
  tenant_id      LowCardinality(String),
  event_id       String,
  event_type     LowCardinality(String),
  received_at    DateTime64(3),
  sent_at        DateTime64(3),
  user_id        String,
  anonymous_id   String,
  session_id     String,
  country        LowCardinality(String),
  region         LowCardinality(String),
  device_type    LowCardinality(String),
  browser        LowCardinality(String),
  os             LowCardinality(String),
  properties     String              -- JSON blob
) ENGINE = ReplicatedReplacingMergeTree(received_at)
  PARTITION BY (tenant_id, toYYYYMM(received_at))
  ORDER BY (tenant_id, event_type, toStartOfHour(received_at), event_id)
  SETTINGS index_granularity = 8192;

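ReplacingMergeTree (version column = received_at) handles duplicate inserts: background merges keep only the row with the highest received_at among rows that share a partition and the full sorting key, which for duplicate event_ids means the later (re-inserted) copy. Because ORDER BY includes event_id, this gives eventual deduplication at the storage layer. It has a limit: the sorting key also contains toStartOfHour(received_at), and merges never cross partitions, so a copy re-received in a different hour (or month) is not collapsed. Replays from the Flink sink carry the original received_at and do collapse.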

The properties JSON blob is queried using ClickHouse's JSON path functions (JSONExtractString, JSONExtractFloat). For properties that are filtered or grouped on frequently, tenants can define materialized columns:

ALTER TABLE events ADD COLUMN plan LowCardinality(String)
  MATERIALIZED JSONExtractString(properties, 'plan');

ClickHouse Sink (Flink)

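The Flink ClickHouse sink uses the official JDBC connector with batching: flush every 5 s or 10 000 rows, whichever comes first. Inserts go over ClickHouse's HTTP interface for better batching semantics. Checkpointing gives exactly-once state inside Flink, but the sink itself is at-least-once: after a failure, rows flushed since the last completed checkpoint are inserted again. These replays carry the same event_id and received_at, so ClickHouse ReplacingMergeTree collapses them during merges.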

Query API

Query Model

The query API accepts structured JSON queries, avoiding the need to expose raw SQL to tenants:

POST /v1/query
{
  "tenant_id":  "acme",
  "event_type": "button_click",
  "time_range": {
    "start": "2024-04-06T00:00:00Z",
    "end":   "2024-04-13T00:00:00Z"
  },
  "filters": [
    {"field": "country",         "op": "eq", "value": "US"},
    {"field": "properties.plan", "op": "in", "value": ["pro", "enterprise"]}
  ],
  "aggregations": [
    {"function": "count",          "alias": "total"},
    {"function": "count_distinct", "field": "user_id", "alias": "unique_users"}
  ],
  "group_by":   ["country", "device_type"],
  "order_by":   [{field: "total", direction: "desc"}],
  "limit":      100,
  "cursor":     null
}

SQL Translation

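The query service translates the above into parameterized ClickHouse SQL. Values are shown inline below for readability; in the issued query they are bound as parameters, and tenant_id should come from the authenticated API key rather than the request body: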

SELECT
  country,
  device_type,
  count()                    AS total,
  uniq(user_id)              AS unique_users   -- approximate; uniqExact(user_id) gives exact counts
FROM events
WHERE tenant_id = 'acme'
  AND event_type = 'button_click'
  AND received_at >= '2024-04-06 00:00:00'
  AND received_at <  '2024-04-13 00:00:00'
  AND country = 'US'
  AND JSONExtractString(properties, 'plan') IN ('pro','enterprise')
GROUP BY country, device_type
ORDER BY total DESC, country, device_type  -- group keys break ties so cursors are stable
LIMIT 101;  -- fetch 101 to detect if there is a next page
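The filter translation can be sketched as follows. It covers only the filters array (tenant, event type, and time range would be bound the same way), uses ? placeholders whose exact syntax depends on the ClickHouse client in use, and the whitelist of filterable columns is an assumption. Column names never come from the request verbatim: a field must be whitelisted or carry the properties. prefix, whose key is bound as a parameter.

```go
package main

import (
	"fmt"
	"strings"
)

// Filter mirrors one entry of the request's "filters" array.
type Filter struct {
	Field string
	Op    string // "eq" or "in"
	Value any
}

// Columns that may be filtered directly (assumed list); properties.* fields
// are read from the JSON blob instead.
var columns = map[string]bool{
	"event_type": true, "country": true, "region": true,
	"device_type": true, "browser": true, "os": true,
}

// buildWhere returns a WHERE fragment with ? placeholders plus its arguments
// in order, so user input is bound rather than spliced into the SQL text.
func buildWhere(tenantID string, filters []Filter) (string, []any, error) {
	clauses := []string{"tenant_id = ?"}
	args := []any{tenantID}
	for _, f := range filters {
		var expr string
		switch {
		case columns[f.Field]:
			expr = f.Field // safe: only whitelisted identifiers reach the SQL
		case strings.HasPrefix(f.Field, "properties."):
			expr = "JSONExtractString(properties, ?)"
			args = append(args, strings.TrimPrefix(f.Field, "properties."))
		default:
			return "", nil, fmt.Errorf("unknown field %q", f.Field)
		}
		switch f.Op {
		case "eq":
			clauses = append(clauses, expr+" = ?")
			args = append(args, f.Value)
		case "in":
			vals, ok := f.Value.([]string)
			if !ok || len(vals) == 0 {
				return "", nil, fmt.Errorf("op %q needs a non-empty string list", f.Op)
			}
			placeholders := strings.TrimSuffix(strings.Repeat("?,", len(vals)), ",")
			clauses = append(clauses, expr+" IN ("+placeholders+")")
			for _, v := range vals {
				args = append(args, v)
			}
		default:
			return "", nil, fmt.Errorf("unsupported op %q", f.Op)
		}
	}
	return strings.Join(clauses, " AND "), args, nil
}

func main() {
	where, args, err := buildWhere("acme", []Filter{
		{Field: "country", Op: "eq", Value: "US"},
		{Field: "properties.plan", Op: "in", Value: []string{"pro", "enterprise"}},
	})
	fmt.Println(where, args, err)
}
```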

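Cursor-based pagination encodes the last row's sort key values. The ORDER BY needs a unique tie-breaker (here the group-by columns after total) so that pages neither skip nor repeat rows. Because the leading sort key is an aggregate, the next-page query adds the continuation condition as a HAVING clause rather than a WHERE clause, for example HAVING total < :last_total OR (total = :last_total AND (country, device_type) > (:last_country, :last_device_type)). This avoids OFFSET, whose cost grows with page depth, although each page still recomputes the GROUP BY.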
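A sketch of the cursor mechanics, assuming the cursor is an opaque base64url-encoded JSON object holding the last row's sort-key values; the type, function, and constant names are hypothetical. paginate consumes the LIMIT + 1 rows the query fetches and emits a cursor only when the extra row exists.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

// Row is one result row of the grouped query.
type Row struct {
	Country    string `json:"country"`
	DeviceType string `json:"device_type"`
	Total      uint64 `json:"total"`
}

// encodeCursor serializes the last row's sort-key values opaquely.
func encodeCursor(last Row) string {
	b, _ := json.Marshal(last) // marshaling this struct cannot fail
	return base64.RawURLEncoding.EncodeToString(b)
}

func decodeCursor(s string) (Row, error) {
	var r Row
	b, err := base64.RawURLEncoding.DecodeString(s)
	if err == nil {
		err = json.Unmarshal(b, &r)
	}
	return r, err
}

// paginate takes the limit+1 rows fetched and returns the page plus the next
// cursor ("" when there is no next page).
func paginate(rows []Row, limit int) ([]Row, string) {
	if len(rows) <= limit {
		return rows, ""
	}
	return rows[:limit], encodeCursor(rows[limit-1])
}

// Continuation condition for ORDER BY total DESC, country, device_type; the
// placeholders take last.Total, last.Total, last.Country, last.DeviceType.
const afterCursor = "HAVING total < ? OR (total = ? AND (country, device_type) > (?, ?))"

func main() {
	rows := []Row{{"US", "desktop", 900}, {"DE", "desktop", 700}, {"US", "mobile", 700}}
	page, next := paginate(rows, 2)
	last, _ := decodeCursor(next)
	fmt.Println(len(page), last.Country, last.DeviceType, last.Total)
}
```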

Query Timeout and Circuit Breaker

All ClickHouse queries run with max_execution_time=10 (seconds), and the query service adds a 15 s application-level timeout as a backstop. A per-tenant circuit breaker (Hystrix-style) opens after 5 consecutive timeouts and, while open, returns HTTP 503 with a Retry-After header. This stops one tenant that keeps issuing expensive queries from tying up ClickHouse threads that other tenants need.

Identity Resolution

When a user identifies (logs in), the SDK calls analytics.identify(userId). This emits an identify event, and the resulting mapping is stored in a separate Redis hash: HSET identity:{tenant} {anonymous_id} {user_id}. The enrichment Flink job looks up this mapping for each incoming event and fills in user_id on events that carry only anonymous_id. Historical backfill (retroactively assigning user_id to past anonymous events already in ClickHouse) is handled by a nightly batch Spark job that rewrites the affected ClickHouse partitions.

Data Retention and TTL

-- Raw events: 12-month TTL
ALTER TABLE events MODIFY TTL received_at + INTERVAL 12 MONTH DELETE;
-- Rollup tables: no TTL (retain indefinitely)
-- Kafka events.raw: 48 h
-- Kafka events.enriched, events.deduped: 6 h

Monitoring and Observability

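  • Consumer lag: Kafka consumer group lag for the Flink enrichment and dedup jobs. Alert if lag > 100 K events (at 1 M events/sec this is only about 0.1 s of traffic, so a time-based lag threshold may be easier to tune).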
  • End-to-end latency: a synthetic event injected every 30 s with a known event_id. Query API polls until it appears. Alert if latency > 30 s.
  • DLQ depth: events.dlq message count. Alert on any new messages (indicates schema violations or enrichment failures).
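  • ClickHouse part count: number of active parts per partition. Alert if parts > 300, a sign that inserts are outpacing background merges (ClickHouse delays, and eventually rejects, inserts into partitions that exceed its parts_to_delay_insert and parts_to_throw_insert settings).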

Interview Tips

  • Be ready to discuss why deduplication happens in Flink (low latency, exact dedup within retry window) AND in ClickHouse (eventual correctness for late arrivals).
  • Explain the trade-off between schema-on-read (flexible, slower queries on properties JSON) vs materialized columns (fast, requires schema decisions upfront).
  • Know that ClickHouse ReplacingMergeTree deduplication is eventual — reads may see duplicates until a merge happens. Use FINAL modifier or deduplicate at query time for correctness-critical queries.
  • Discuss identity stitching complexity: the retroactive backfill problem and why it requires a batch rewrite rather than a streaming update.

