Streaming ETL Pipeline Low-Level Design
A streaming ETL pipeline continuously consumes events from a message broker, transforms them in real time, and writes results to downstream sinks. Unlike batch ETL, streaming ETL processes events within seconds of their being produced, enabling real-time dashboards, fraud detection, and live ML feature updates. The central challenges are exactly-once processing, stateful computation, and failure recovery.
Consumer Groups and Partition Assignment
Kafka organizes topic data into partitions. A consumer group is a set of worker processes that collectively consume a topic: each partition is assigned to exactly one consumer in the group at any time. This provides horizontal scalability. Adding more consumers, up to the partition count, increases throughput roughly linearly as long as load is spread evenly across partitions; consumers beyond the partition count sit idle.
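The "one consumer per partition" rule can be made concrete with a minimal round-robin assignor in plain Python. This is a sketch of the idea only, not Kafka's implementation (Kafka's clients ship range, round-robin, sticky, and cooperative-sticky assignors); the function name assign_round_robin is hypothetical.

```python
from typing import Dict, List


def assign_round_robin(partitions: List[int], consumers: List[str]) -> Dict[str, List[int]]:
    """Give every partition to exactly one consumer, cycling through consumers in sorted order."""
    members = sorted(consumers)
    assignment: Dict[str, List[int]] = {c: [] for c in members}
    if not members:
        return assignment
    for i, p in enumerate(sorted(partitions)):
        assignment[members[i % len(members)]].append(p)
    return assignment


# Six partitions, three consumers: two partitions each.
print(assign_round_robin(list(range(6)), ["c1", "c2", "c3"]))
# {'c1': [0, 3], 'c2': [1, 4], 'c3': [2, 5]}

# Six partitions, eight consumers: c7 and c8 get nothing and sit idle.
idle = assign_round_robin(list(range(6)), [f"c{i}" for i in range(1, 9)])
print(idle["c7"], idle["c8"])  # [] []
```

The second call shows why scaling stops at the partition count: extra consumers only add standby capacity.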
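Partition assignment is coordinated by the group coordinator broker, which tracks group membership and triggers rebalances; in the classic protocol, the assignment itself is computed client-side by the group leader using the configured assignor. With the eager protocol, every consumer revokes all of its partitions at the start of a rebalance. The cooperative-sticky assignor instead keeps partitions with their current owners where possible and revokes only those that must move, so consumers keep processing their unaffected partitions and the processing pause during joins and leaves shrinks.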
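The "sticky" half of cooperative-sticky can be sketched as a rebalance that balances partition counts while leaving as many partitions as possible with their current owner. This sketch does not model the "cooperative" half (incremental revocation over successive rebalances); the function names sticky_rebalance and moved are hypothetical, and the quota rule is a simplification of the real CooperativeStickyAssignor.

```python
from typing import Dict, List

Assignment = Dict[str, List[int]]


def sticky_rebalance(previous: Assignment, consumers: List[str], partitions: List[int]) -> Assignment:
    """Rebalance to near-equal counts while leaving as many partitions as possible with their owner."""
    members = sorted(consumers)
    base, extra = divmod(len(partitions), len(members))
    # Target count per member: the first `extra` members take one extra partition.
    quota = {c: base + (1 if i < extra else 0) for i, c in enumerate(members)}
    owner_of = {p: c for c, ps in previous.items() for p in ps}
    assignment: Assignment = {c: [] for c in members}
    to_move: List[int] = []
    for p in sorted(partitions):
        owner = owner_of.get(p)
        if owner in assignment and len(assignment[owner]) < quota[owner]:
            assignment[owner].append(p)  # stays put: no revocation needed
        else:
            to_move.append(p)            # owner left, owner over quota, or never assigned
    for p in to_move:
        target = next(c for c in members if len(assignment[c]) < quota[c])
        assignment[target].append(p)
    return assignment


def moved(before: Assignment, after: Assignment) -> List[int]:
    """Partitions whose owner changed, i.e. the ones a rebalance must revoke and reassign."""
    owner_before = {p: c for c, ps in before.items() for p in ps}
    return sorted(p for c, ps in after.items() for p in ps if owner_before.get(p) != c)


before = {"c1": [0, 1, 2], "c2": [3, 4, 5]}
members = ["c1", "c2", "c3"]
sticky = sticky_rebalance(before, members, list(range(6)))
from_scratch = {c: list(range(6))[i::3] for i, c in enumerate(members)}
print(sticky)                       # {'c1': [0, 1], 'c2': [3, 4], 'c3': [2, 5]}
print(moved(before, sticky))        # [2, 5]
print(moved(before, from_scratch))  # [1, 2, 3, 5]
```

When c3 joins, the sticky plan moves two partitions while a from-scratch round-robin plan moves four.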
Stateful Transformations with RocksDB
Stateless transformations (filter, map, type cast) require no state between records. Stateful transformations (windowed aggregation, deduplication, joins) require storing state across records.
Kafka Streams uses RocksDB by default for persistent state stores, embedded in the application process. Each stream task (one per input partition) keeps its state locally, so state reads avoid the network and are typically sub-millisecond. Every state update is also written to a Kafka changelog topic: after a failure and restart, the task replays the changelog to restore its state before resuming event processing. This provides durability without a remote database round-trip for every state lookup.
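The write-locally, log-remotely, replay-on-restart cycle can be sketched with a Python dict standing in for RocksDB and a list standing in for the changelog topic. The class ChangelogBackedStore is hypothetical, and a None value plays the role of a tombstone (delete).

```python
from typing import Dict, List, Optional, Tuple


class ChangelogBackedStore:
    """In-memory stand-in for a local state store whose every write is mirrored to a changelog."""

    def __init__(self, changelog: List[Tuple[str, Optional[int]]]):
        self.local: Dict[str, int] = {}  # plays the role of the local RocksDB instance
        self.changelog = changelog       # plays the role of the changelog topic

    def put(self, key: str, value: Optional[int]) -> None:
        # Write locally for fast reads, then append to the changelog for durability.
        if value is None:
            self.local.pop(key, None)
        else:
            self.local[key] = value
        self.changelog.append((key, value))

    def get(self, key: str) -> Optional[int]:
        return self.local.get(key)

    @classmethod
    def restore(cls, changelog: List[Tuple[str, Optional[int]]]) -> "ChangelogBackedStore":
        # After a crash the local state is gone; replay the changelog in order to rebuild it.
        store = cls(changelog)
        for key, value in changelog:
            if value is None:
                store.local.pop(key, None)
            else:
                store.local[key] = value
        return store


log: List[Tuple[str, Optional[int]]] = []
store = ChangelogBackedStore(log)
store.put("user-1", 3)
store.put("user-2", 5)
store.put("user-1", 4)
store.put("user-2", None)
recovered = ChangelogBackedStore.restore(log)
print(recovered.get("user-1"), recovered.get("user-2"))  # 4 None
```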
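Windowed aggregation: events are grouped by key and time window (e.g., count of clicks per user per 5-minute tumbling window). RocksDB stores window state keyed by (key, window_start). By default, Kafka Streams emits an updated aggregate downstream as records arrive (subject to record caching); to emit a single final result once the window closes (window end plus grace period), apply suppress(Suppressed.untilWindowCloses(...)).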
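A tumbling-window count with state keyed by (key, window_start), plus the "final once closed" rule, can be sketched in plain Python. The function names, the 5-minute window, and the zero default grace period are illustrative assumptions; stream time here is simply a timestamp passed in by the caller.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple

WINDOW_MS = 5 * 60 * 1000    # 5-minute tumbling windows
WindowKey = Tuple[str, int]  # (key, window_start)


def window_start(ts_ms: int, size_ms: int = WINDOW_MS) -> int:
    # Tumbling windows are epoch-aligned and non-overlapping: each timestamp lands in exactly one.
    return ts_ms - (ts_ms % size_ms)


def count_clicks(events: Iterable[Tuple[str, int]], size_ms: int = WINDOW_MS) -> Dict[WindowKey, int]:
    """Count (user, timestamp_ms) events per user per tumbling window."""
    state: Dict[WindowKey, int] = defaultdict(int)
    for user, ts_ms in events:
        state[(user, window_start(ts_ms, size_ms))] += 1
    return dict(state)


def closed_windows(state: Dict[WindowKey, int], stream_time_ms: int,
                   size_ms: int = WINDOW_MS, grace_ms: int = 0) -> Dict[WindowKey, int]:
    """Windows whose end plus grace period has been reached by stream time, so their counts are final."""
    return {k: v for k, v in state.items() if k[1] + size_ms + grace_ms <= stream_time_ms}


clicks = [("u1", 1_000), ("u1", 299_999), ("u1", 300_000), ("u2", 10_000)]
state = count_clicks(clicks)
print(state)                           # {('u1', 0): 2, ('u1', 300000): 1, ('u2', 0): 1}
print(closed_windows(state, 300_000))  # {('u1', 0): 2, ('u2', 0): 1}
```

A timestamp of 299_999 ms still belongs to the first window, while 300_000 ms starts the second.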
Stream Joins
Stream-stream join (event-time windowed): two streams are joined on a shared key within a time window. Events from both streams are buffered in RocksDB for the join window duration (plus any grace period). When an event arrives and the other stream's buffer holds an event with the same key whose timestamp falls within the window, the join emits a result. In an inner join, an event with no match inside the window produces no output. A typical use is joining click events with impression events within a 1-hour attribution window.
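The buffering-and-matching logic of an inner windowed join can be sketched with two per-key buffers. The class WindowedStreamJoin and the key "ad-7" are hypothetical; the window is symmetric (a real attribution join might only accept clicks after the impression), and expire() ignores out-of-order arrivals and grace periods.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Event = Tuple[str, int, str]  # (key, timestamp_ms, payload)


class WindowedStreamJoin:
    """Inner join of two streams on key, matching events whose timestamps differ by at most window_ms."""

    def __init__(self, window_ms: int):
        self.window_ms = window_ms
        # One buffer per side, keyed by join key (window stores in Kafka Streams).
        self.buffers: Dict[str, Dict[str, List[Tuple[int, str]]]] = {
            "left": defaultdict(list), "right": defaultdict(list)}

    def process(self, side: str, event: Event) -> List[Tuple[str, str, str]]:
        if side not in self.buffers:
            raise ValueError("side must be 'left' or 'right'")
        key, ts, payload = event
        other = "right" if side == "left" else "left"
        self.buffers[side][key].append((ts, payload))
        out = []
        for other_ts, other_payload in self.buffers[other][key]:
            if abs(ts - other_ts) <= self.window_ms:
                pair = (payload, other_payload) if side == "left" else (other_payload, payload)
                out.append((key, *pair))
        return out

    def expire(self, stream_time: int) -> None:
        # Drop buffered events too old to match any event that can still arrive.
        for buf in self.buffers.values():
            for key in list(buf):
                buf[key] = [(t, p) for t, p in buf[key] if t >= stream_time - self.window_ms]


HOUR = 60 * 60 * 1000
join = WindowedStreamJoin(window_ms=HOUR)
join.process("left", ("ad-7", 0, "impression"))                  # [] (nothing to match yet)
print(join.process("right", ("ad-7", 30 * 60 * 1000, "click")))  # [('ad-7', 'impression', 'click')]
print(join.process("right", ("ad-7", 2 * HOUR, "click")))        # [] (outside the 1-hour window)
```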
Stream-table join (KTable enrichment): a KTable represents the latest value per key of a topic, typically a compacted one. Each incoming stream event is enriched by looking up its key in the KTable, for example to attach user profile data from a user-profiles topic to user events. Because the stream and the table are co-partitioned (same key, same number of partitions), each lookup hits the local RocksDB store, so it is fast and needs no database round-trip.
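The two halves of a stream-table join, materializing the latest value per key and doing a local lookup per event, can be sketched as below. The function names are hypothetical, None stands for a tombstone, and the sketch builds the whole table before processing events, whereas a real KTable is updated concurrently and each event sees the table as of its processing time. enrich() behaves like a left join (unmatched events keep a None profile); an inner join would drop them instead.

```python
from typing import Dict, Iterable, List, Optional, Tuple


def build_ktable(changelog: Iterable[Tuple[str, Optional[dict]]]) -> Dict[str, dict]:
    """Materialize the latest value per key; a None value is a tombstone that deletes the key."""
    table: Dict[str, dict] = {}
    for key, value in changelog:
        if value is None:
            table.pop(key, None)
        else:
            table[key] = value
    return table


def enrich(events: Iterable[Tuple[str, dict]], table: Dict[str, dict]) -> List[dict]:
    """Left join: attach the current profile (or None) to each event via a local dict lookup."""
    return [{**event, "profile": table.get(user_id)} for user_id, event in events]


profiles = [("u1", {"tier": "free"}), ("u2", {"tier": "pro"}),
            ("u1", {"tier": "pro"}), ("u2", None)]
table = build_ktable(profiles)
print(table)  # {'u1': {'tier': 'pro'}}
print(enrich([("u1", {"action": "click"}), ("u2", {"action": "view"})], table))
# [{'action': 'click', 'profile': {'tier': 'pro'}}, {'action': 'view', 'profile': None}]
```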
Exactly-Once Semantics
Without exactly-once guarantees, failures cause either duplicate processing (at-least-once) or data loss (at-most-once). For pipelines that read from Kafka and write back to Kafka, transactions provide exactly-once processing end-to-end; writes to external sinks such as databases need their own idempotent or transactional mechanism:
- The producer is configured with enable.idempotence=true, so the broker discards duplicate writes caused by producer retries (no duplicates per partition within a producer session).
- A transactional producer (one with a transactional.id) wraps the output writes and the consumer offset commit in a single atomic Kafka transaction. Either both are committed or both are aborted.
- On failure, the consumer restarts from the last committed offset. Output written by the failed transaction is aborted, so it stays invisible to downstream consumers (transactional isolation).
- Downstream consumers set isolation.level=read_committed so they only see output from committed transactions.
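The visibility rule behind read_committed can be sketched by modelling a partition as a list of data records and commit/abort markers. This is a simplification with a hypothetical read_partition function: real Kafka also stops read_committed consumers at the last stable offset, so even committed records sitting behind a still-open transaction are held back, which this sketch ignores.

```python
from typing import List, Tuple

# (kind, txn_id, value); kind is "data", "commit", or "abort". Markers carry an empty value.
LogEntry = Tuple[str, str, str]


def read_partition(log: List[LogEntry], isolation_level: str) -> List[str]:
    """Return the data values a consumer with the given isolation.level would see."""
    if isolation_level == "read_uncommitted":
        return [value for kind, _, value in log if kind == "data"]
    if isolation_level != "read_committed":
        raise ValueError("unknown isolation level")
    # read_committed: only data whose transaction ended with a commit marker.
    committed = {txn for kind, txn, _ in log if kind == "commit"}
    return [value for kind, txn, value in log if kind == "data" and txn in committed]


log = [
    ("data", "t1", "out-A"),
    ("data", "t2", "out-B"),  # written by a worker that failed mid-transaction
    ("commit", "t1", ""),
    ("abort", "t2", ""),
]
print(read_partition(log, "read_uncommitted"))  # ['out-A', 'out-B']
print(read_partition(log, "read_committed"))    # ['out-A']
```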
Dead Letter Queue
Records that cannot be processed (because of deserialization errors, schema violations, or unhandled processing exceptions) are routed to a dead letter topic (DLQ) instead of blocking the pipeline. Each DLQ record includes the original topic, partition, and offset, the error type and message, and the raw bytes. A separate DLQ consumer monitors the DLQ and alerts on error-rate spikes. After the bug is fixed, dead letters can be replayed by re-publishing their raw bytes to the source topic.
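A DLQ envelope and its replay step can be sketched with the standard library alone. The field names (including raw_b64) and the function names are hypothetical; base64 is used so that payloads that are not valid UTF-8 survive the JSON round trip intact, which a decode(errors="replace") would not guarantee. Actually re-publishing would hand the returned topic and bytes to a producer, which is left out here.

```python
import base64
import json
from typing import Tuple


def to_dlq_envelope(topic: str, partition: int, offset: int, error_type: str,
                    error_message: str, raw: bytes) -> bytes:
    """Wrap a failed record with its origin and error; base64 keeps arbitrary bytes intact."""
    return json.dumps({
        "original_topic": topic,
        "original_partition": partition,
        "original_offset": offset,
        "error_type": error_type,
        "error_message": error_message,
        "raw_b64": base64.b64encode(raw).decode("ascii"),
    }).encode("utf-8")


def replay_target(envelope: bytes) -> Tuple[str, bytes]:
    """Return (topic, original bytes) to re-publish once the bug is fixed."""
    record = json.loads(envelope)
    return record["original_topic"], base64.b64decode(record["raw_b64"])


raw = b"\xff\xfe not utf-8"
envelope = to_dlq_envelope("events", 3, 1042, "UnicodeDecodeError", "invalid start byte", raw)
topic, payload = replay_target(envelope)
print(topic, payload == raw)  # events True
```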
Backpressure
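When a downstream sink (database, HTTP API, S3) is slow or unavailable, the consumer reads events faster than the sink can absorb them, and unflushed records pile up in memory. The standard Kafka mechanism for backpressure is to pause fetching on the affected partitions with consumer.pause(partitions), giving the sink time to recover without the consumer running out of memory. The consumer must keep calling poll() while paused (paused partitions simply return no records) so that it stays within max.poll.interval.ms and is not evicted from the group. Once the sink is healthy again, for example when its write latency or pending-write buffer returns to normal, the consumer calls consumer.resume(partitions).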
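The pause/resume decision is usually driven by two thresholds so the consumer does not flap between states. The sketch below assumes a single count of records still awaiting the sink and applies the decision to all assigned partitions; the class name and the watermark values are hypothetical. With confluent_kafka, "pause" would translate to consumer.pause(consumer.assignment()) and "resume" to consumer.resume(consumer.assignment()), with the poll loop running throughout.

```python
class BackpressureController:
    """Decide when to pause or resume fetching, based on how many records still await the sink.

    Pausing at a high watermark and resuming only below a lower one (hysteresis)
    keeps the consumer from flapping between the two states.
    """

    def __init__(self, high_watermark: int, low_watermark: int):
        if not 0 <= low_watermark < high_watermark:
            raise ValueError("need 0 <= low_watermark < high_watermark")
        self.high = high_watermark
        self.low = low_watermark
        self.paused = False

    def update(self, pending: int) -> str:
        """Return 'pause', 'resume', or 'noop' for the current count of unflushed records."""
        if not self.paused and pending >= self.high:
            self.paused = True
            return "pause"
        if self.paused and pending <= self.low:
            self.paused = False
            return "resume"
        return "noop"


ctl = BackpressureController(high_watermark=10_000, low_watermark=2_000)
print([ctl.update(n) for n in (500, 12_000, 8_000, 1_500)])
# ['noop', 'pause', 'noop', 'resume']
```

The 8_000 reading stays paused because it is below the high watermark but still above the low one.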
Schema Evolution with Avro
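Producers serialize events using Avro schemas registered in the schema registry. Consumers deserialize with their own version of the schema, and Avro schema resolution reconciles the writer's and reader's versions. The schema registry enforces a compatibility mode per subject. With FORWARD (or FULL) compatibility, data written with a new producer schema remains readable by consumers still on the previous version, which enables rolling producer upgrades without consumer downtime. BACKWARD compatibility, the registry's default, guarantees the opposite direction: consumers on the new schema can read data written with the previous one, so consumers are upgraded first.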
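The direction of each compatibility mode follows from one Avro resolution rule: a reader can decode writer data if every reader field missing from the writer has a default (writer-only fields are ignored). The sketch below encodes only that rule, using a hypothetical schema model of field name to "has a default"; it ignores types, type promotion, aliases, and nested records, so it is not a substitute for the registry's own checks.

```python
from typing import Dict

# Simplified record schema: field name -> whether the field declares a default value.
Schema = Dict[str, bool]


def can_read(reader: Schema, writer: Schema) -> bool:
    """Reader fields absent from the writer must have defaults; extra writer fields are skipped."""
    return all(has_default for field, has_default in reader.items() if field not in writer)


def is_backward_compatible(new: Schema, old: Schema) -> bool:
    # BACKWARD: consumers on the new schema can read data written with the old one.
    return can_read(reader=new, writer=old)


def is_forward_compatible(new: Schema, old: Schema) -> bool:
    # FORWARD: consumers still on the old schema can read data written with the new one.
    return can_read(reader=old, writer=new)


v1 = {"user_id": False, "amount": False}
v2 = {**v1, "currency": True}  # adds a field with a default
v3 = {**v1, "region": False}   # adds a required field (no default)
print(is_backward_compatible(v2, v1), is_forward_compatible(v2, v1))  # True True
print(is_backward_compatible(v3, v1), is_forward_compatible(v3, v1))  # False True
```

Adding a field without a default is forward- but not backward-compatible: old consumers skip the new field, but new consumers have nothing to fill it with when reading old data.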
SQL Schema (Checkpoint Store)
```sql
CREATE TABLE ETLCheckpoint (
    id BIGSERIAL PRIMARY KEY,
    consumer_group TEXT NOT NULL,
    topic TEXT NOT NULL,
    partition INTEGER NOT NULL,
    -- Next offset to read. OFFSET is a reserved word in PostgreSQL, so the column is renamed.
    next_offset BIGINT NOT NULL,
    committed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    UNIQUE (consumer_group, topic, partition)
);

CREATE TABLE ETLDeadLetter (
    id BIGSERIAL PRIMARY KEY,
    topic TEXT NOT NULL,
    partition INTEGER NOT NULL,
    original_offset BIGINT NOT NULL,
    error_type TEXT NOT NULL,
    error_message TEXT,
    raw_message BYTEA NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX ON ETLDeadLetter (topic, created_at DESC);

CREATE TABLE ETLJobMetric (
    id BIGSERIAL PRIMARY KEY,
    job_name TEXT NOT NULL,
    processed_count BIGINT NOT NULL DEFAULT 0,
    error_count BIGINT NOT NULL DEFAULT 0,
    lag_seconds NUMERIC,
    sampled_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX ON ETLJobMetric (job_name, sampled_at DESC);
```
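One plausible use of a checkpoint table is exactly-once delivery into a database sink: the sink rows and the next offset are written in one database transaction, and on startup the consumer seeks to the stored offset. The sketch below runs on the standard library's sqlite3 (assuming SQLite 3.24+ for the ON CONFLICT upsert, whose syntax PostgreSQL shares); the sink_rows table and the write_batch and load_checkpoint helpers are hypothetical.

```python
import sqlite3
from typing import List, Optional

SCHEMA = """
CREATE TABLE IF NOT EXISTS etl_checkpoint (
    consumer_group TEXT NOT NULL,
    topic TEXT NOT NULL,
    "partition" INTEGER NOT NULL,
    next_offset INTEGER NOT NULL,
    UNIQUE (consumer_group, topic, "partition")
);
CREATE TABLE IF NOT EXISTS sink_rows (payload TEXT NOT NULL);
"""


def write_batch(conn: sqlite3.Connection, payloads: List[str], group: str,
                topic: str, partition: int, next_offset: int) -> None:
    # One database transaction: sink rows and checkpoint commit or roll back together.
    with conn:
        conn.executemany("INSERT INTO sink_rows (payload) VALUES (?)", [(p,) for p in payloads])
        conn.execute(
            """INSERT INTO etl_checkpoint (consumer_group, topic, "partition", next_offset)
               VALUES (?, ?, ?, ?)
               ON CONFLICT (consumer_group, topic, "partition")
               DO UPDATE SET next_offset = excluded.next_offset""",
            (group, topic, partition, next_offset))


def load_checkpoint(conn: sqlite3.Connection, group: str, topic: str,
                    partition: int) -> Optional[int]:
    # On startup, seek the consumer to this offset instead of relying on Kafka's committed offset.
    row = conn.execute(
        'SELECT next_offset FROM etl_checkpoint '
        'WHERE consumer_group = ? AND topic = ? AND "partition" = ?',
        (group, topic, partition)).fetchone()
    return row[0] if row else None


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
write_batch(conn, ["a", "b"], "etl", "events", 0, 2)
write_batch(conn, ["c"], "etl", "events", 0, 3)
print(load_checkpoint(conn, "etl", "events", 0))  # 3
```

If any insert in a batch fails, the whole transaction rolls back, so the checkpoint never advances past rows that were not written.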
Python Implementation
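```python
import base64
import json
import time
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

from confluent_kafka import Consumer, Producer, TopicPartition


@dataclass
class KafkaRecord:
    topic: str
    partition: int
    offset: int
    key: Optional[bytes]
    value: Optional[bytes]  # None for tombstone records


class KafkaETLWorker:
    def __init__(self, consumer_conf: dict, producer_conf: dict,
                 input_topic: str, output_topic: str, dlq_topic: str,
                 group_id: str, instance_id: str):
        # Offsets are committed inside the producer's transaction, never auto-committed;
        # read_committed hides records from aborted upstream transactions.
        self.consumer = Consumer({**consumer_conf,
                                  "group.id": group_id,
                                  "enable.auto.commit": False,
                                  "isolation.level": "read_committed"})
        # transactional.id must be unique per worker instance and stable across its
        # restarts: producers sharing an id fence each other, so a single group-wide
        # id would let only one worker in the group make progress.
        self.producer = Producer({**producer_conf,
                                  "enable.idempotence": True,
                                  "transactional.id": f"etl-{group_id}-{instance_id}"})
        self.input_topic = input_topic
        self.output_topic = output_topic
        self.dlq_topic = dlq_topic
        self.producer.init_transactions()
        self.consumer.subscribe([input_topic])

    def process_batch(self, records: List[KafkaRecord]) -> List[Dict]:
        results = []
        for rec in records:
            try:
                event = json.loads(rec.value)
                transformed = self._transform(event)
                results.append({"record": rec, "output": transformed, "error": None})
            except Exception as e:
                # Keep (error type, message) so the record goes to the DLQ
                # instead of blocking the batch.
                results.append({"record": rec, "output": None,
                                "error": (type(e).__name__, str(e))})
        return results

    def _transform(self, event: dict) -> dict:
        # Application-specific transformation logic
        return {**event, "processed_at": int(time.time())}

    def commit_offsets(self, offsets: List[TopicPartition]):
        # Adds the consumed offsets to the open transaction so they commit
        # atomically with the output records.
        self.producer.send_offsets_to_transaction(
            offsets,
            self.consumer.consumer_group_metadata()
        )

    def route_to_dlq(self, record: KafkaRecord, error: Tuple[str, str]):
        error_type, error_message = error
        dlq_value = json.dumps({
            "original_topic": record.topic,
            "original_partition": record.partition,
            "original_offset": record.offset,
            "error_type": error_type,
            "error_message": error_message,
            # base64 keeps the raw bytes intact even when they are not valid UTF-8
            "raw_b64": base64.b64encode(record.value or b"").decode("ascii"),
        }).encode()
        self.producer.produce(self.dlq_topic, key=record.key, value=dlq_value)

    def run_once(self):
        msgs = self.consumer.consume(num_messages=500, timeout=1.0)
        records = [
            KafkaRecord(m.topic(), m.partition(), m.offset(), m.key(), m.value())
            for m in msgs if not m.error()
        ]
        if not records:
            return
        results = self.process_batch(records)

        # Per partition: the next offset to commit, and the first offset of this
        # batch (where to rewind if the transaction aborts).
        next_offsets: Dict[Tuple[str, int], int] = {}
        first_offsets: Dict[Tuple[str, int], int] = {}
        for rec in records:
            tp = (rec.topic, rec.partition)
            next_offsets[tp] = max(next_offsets.get(tp, 0), rec.offset + 1)
            first_offsets[tp] = min(first_offsets.get(tp, rec.offset), rec.offset)

        self.producer.begin_transaction()
        try:
            for r in results:
                rec = r["record"]
                if r["error"]:
                    self.route_to_dlq(rec, r["error"])
                else:
                    # Keep the input key so the output is partitioned by the same key.
                    self.producer.produce(
                        self.output_topic,
                        key=rec.key,
                        value=json.dumps(r["output"]).encode()
                    )
            self.commit_offsets([TopicPartition(topic, partition, offset)
                                 for (topic, partition), offset in next_offsets.items()])
            self.producer.commit_transaction()
        except Exception as e:
            self.producer.abort_transaction()
            # The consumer position is already past this batch; seek back so the
            # aborted records are consumed again instead of being skipped.
            for (topic, partition), offset in first_offsets.items():
                self.consumer.seek(TopicPartition(topic, partition, offset))
            print(f"Transaction aborted: {e}")
```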
Frequently Asked Questions
How do Kafka transactions provide exactly-once semantics end-to-end?
The producer wraps the output record write and the consumer offset commit in a single Kafka transaction. The transaction coordinator ensures both are committed atomically or both are aborted. On failure and restart, the consumer resumes from the last committed offset. Any output records written in the aborted transaction are invisible to downstream consumers because they read with isolation.level=read_committed, which filters out records from uncommitted or aborted transactions.
How does RocksDB state management work in Kafka Streams?
Each stream task (one per partition) maintains a local RocksDB instance that stores state for its assigned partition. State is co-located with the consumer process, eliminating network round-trips for state lookups. Every state update is also written to a Kafka changelog topic. On failure, a new consumer instance replays the changelog to rebuild the local RocksDB state before resuming event processing, providing durable state without a centralized database.
What is a stream-table join and how is it different from a stream-stream join?
A stream-table join enriches each incoming stream record with the current value from a KTable (a changelog-backed key-value store representing the latest state per key). The lookup is local and synchronous — no time window is needed because the KTable always holds the latest value. A stream-stream join joins two event streams within a bounded time window, buffering events from both sides in RocksDB and emitting output when a matching event arrives from the other stream within the window.
How are dead letter queue records replayed after a bug fix?
The DLQ record stores the original topic, partition, offset, error reason, and raw message bytes. After the processing bug is fixed, a DLQ replay tool reads records from the DLQ topic, extracts the stored raw bytes, and republishes them unchanged to the original input topic. The ETL pipeline then processes them like any new record, this time successfully. Replaying through the input topic ensures the records go through the same consumer group and transformation logic as new records. It does not restore their original position, though: replayed records land after any newer events for the same key, so transformations that depend on strict per-key ordering must tolerate these late arrivals.