Streaming ETL Pipeline Low-Level Design
A streaming ETL pipeline continuously consumes events from a message broker, transforms them in real time, and writes results to downstream sinks. Unlike batch ETL, streaming ETL processes events within seconds of production, enabling real-time dashboards, fraud detection, and live ML feature updates. The central challenges are exactly-once processing, stateful computation, and failure recovery.
Consumer Groups and Partition Assignment
Kafka organizes topic data into partitions. A consumer group is a set of worker processes that collectively consume a topic: each partition is assigned to exactly one consumer in the group at any time. This provides horizontal scalability: adding more consumers (up to the partition count) increases throughput linearly.
Partition assignment is managed by the Kafka group coordinator using a rebalance protocol (eager or cooperative-sticky). Cooperative-sticky rebalancing minimizes partition reassignment on consumer joins/leaves, reducing the processing pause during rebalances.
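As a sketch, cooperative-sticky assignment is selected through consumer configuration (shown here in the librdkafka/confluent-kafka key style the rest of this design uses; the broker address is a placeholder):

```python
# Consumer configuration enabling cooperative-sticky rebalancing.
# With this strategy, a rebalance revokes only the partitions that actually
# move, so unaffected consumers keep processing during the rebalance.
consumer_conf = {
    "bootstrap.servers": "broker:9092",              # placeholder address
    "group.id": "etl-workers",
    "partition.assignment.strategy": "cooperative-sticky",
    "enable.auto.commit": False,                     # offsets committed transactionally
}
```

Under the eager protocol, by contrast, every consumer revokes all partitions at the start of a rebalance, pausing the whole group.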
Stateful Transformations with RocksDB
Stateless transformations (filter, map, type cast) require no state between records. Stateful transformations (windowed aggregation, deduplication, joins) require storing state across records.
Kafka Streams uses RocksDB as the local state store embedded in the consumer process. Each partition's state is stored locally, making state access fast (sub-millisecond). State is backed up to a Kafka changelog topic: on failure and restart, the consumer replays the changelog to restore state before resuming event processing. This provides durability without requiring a remote database for every state lookup.
Windowed aggregation: events are grouped by key and time window (e.g., count of clicks per user per 5-minute tumbling window). RocksDB stores window state keyed by (key, window_start). When the window closes, the aggregate is emitted downstream.
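The window-store layout above can be sketched with a plain in-memory dict standing in for RocksDB (class and method names are hypothetical; a real store would also persist to disk and a changelog topic):

```python
from collections import defaultdict

class TumblingWindowCounter:
    """Counts events per (key, window_start), mirroring the RocksDB key layout."""

    def __init__(self, window_ms: int = 5 * 60 * 1000):  # 5-minute tumbling windows
        self.window_ms = window_ms
        self.state = defaultdict(int)  # (key, window_start) -> count

    def window_start(self, ts_ms: int) -> int:
        # Tumbling windows: align the timestamp down to the window boundary.
        return ts_ms - (ts_ms % self.window_ms)

    def add(self, key: str, ts_ms: int) -> None:
        self.state[(key, self.window_start(ts_ms))] += 1

    def emit_closed(self, watermark_ms: int) -> dict:
        """Emit and evict windows that ended at or before the watermark."""
        closed = {k: v for k, v in self.state.items()
                  if k[1] + self.window_ms <= watermark_ms}
        for k in closed:
            del self.state[k]
        return closed
```

Emission is driven by a watermark rather than wall-clock time, so late events within the window still update the open aggregate.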
Stream Joins
Stream-stream join (event-time windowed): Two streams are joined on a shared key within a time window. Events from both streams are buffered in RocksDB for the join window duration. When a matching event arrives from the other stream within the window, the join produces output. Events outside the window are dropped. This is used for joining click events with impression events within a 1-hour attribution window.
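A minimal sketch of the windowed buffering logic, with in-memory dicts standing in for the RocksDB buffers (names hypothetical; a production join also handles out-of-order arrival via grace periods):

```python
from collections import defaultdict

class WindowedStreamJoin:
    """Buffers events from both sides; emits joined pairs within the window."""

    def __init__(self, join_window_ms: int):
        self.window = join_window_ms
        self.buffers = {"left": defaultdict(list), "right": defaultdict(list)}

    def on_event(self, side: str, key, ts_ms: int, event):
        """Buffer the event; return (this_event, other_event) pairs that join."""
        other_side = "right" if side == "left" else "left"
        other = self.buffers[other_side]
        # Evict other-side events that have fallen out of the join window.
        if key in other:
            other[key] = [(t, e) for t, e in other[key]
                          if ts_ms - t <= self.window]
        self.buffers[side][key].append((ts_ms, event))
        return [(event, e) for t, e in other.get(key, [])
                if abs(ts_ms - t) <= self.window]
```

For the click/impression attribution case, `join_window_ms` would be one hour; an impression arriving after the window finds no buffered click and produces no output.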
Stream-table join (KTable enrichment): A KTable represents the current state of a compacted topic (latest value per key). The stream is enriched by looking up the KTable for each incoming event key. This is used for enriching user events with user profile data from a user profiles topic. KTable lookups are local (RocksDB), making them fast and not requiring a database round-trip.
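The KTable lookup can be sketched with a dict standing in for the RocksDB store (names hypothetical). `apply` consumes updates from the compacted topic, including tombstones:

```python
class KTable:
    """Latest-value-per-key view of a compacted topic (dict as RocksDB stand-in)."""

    def __init__(self):
        self.store = {}

    def apply(self, key, value):
        if value is None:
            self.store.pop(key, None)  # tombstone: key deleted upstream
        else:
            self.store[key] = value    # later record overwrites earlier one

    def lookup(self, key):
        return self.store.get(key)

def enrich(event: dict, profiles: KTable) -> dict:
    """Stream-table join: local lookup, no remote database round-trip."""
    return {**event, "profile": profiles.lookup(event["user_id"])}
```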
Exactly-Once Semantics
Without exactly-once guarantees, failures cause either duplicate processing (at-least-once) or data loss (at-most-once). Kafka transactions provide exactly-once end-to-end:
- The producer is configured with enable.idempotence=true, which deduplicates broker-side retries so each record is written at most once per partition per producer session.
- A transactional producer wraps the output writes and the consumer offset commit in a single atomic Kafka transaction. Either both succeed or both are aborted.
- On failure, the consumer restarts from the last committed offset. Output written in the aborted transaction is invisible to downstream consumers (transactional isolation).
- Downstream consumers set isolation.level=read_committed so they see only output from committed transactions.
Dead Letter Queue
Records that cannot be processed — due to deserialization errors, schema violations, or unhandled processing exceptions — are routed to a dead letter topic (DLQ) instead of blocking the pipeline. Each DLQ record includes the original partition and offset, the error type and message, and the raw bytes. A separate DLQ consumer monitors the DLQ and alerts on error rate spikes. Dead letters can be replayed after the bug is fixed by re-publishing them to the source topic.
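The replay step can be sketched as follows (the helper name is hypothetical; the `produce` callable stands in for a real producer such as confluent_kafka's `Producer.produce`, and the record shape matches the DLQ payload described above):

```python
import json

def replay_dead_letters(dlq_records, produce) -> int:
    """Re-publish DLQ payloads to their original source topic.

    dlq_records: iterable of JSON-encoded DLQ values containing
                 original_topic / original_offset / raw fields.
    produce:     callable (topic, value_bytes) -> None.
    """
    replayed = 0
    for rec in dlq_records:
        payload = json.loads(rec)
        # Republish the raw bytes so the record flows through the same
        # consumer group and transformation logic as fresh events.
        produce(payload["original_topic"], payload["raw"].encode())
        replayed += 1
    return replayed
```

Note that replayed records land at new offsets on the source topic, so downstream consumers must tolerate out-of-order redelivery relative to the original stream.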
Backpressure
When a downstream sink (database, HTTP API, S3) is slow or unavailable, the consumer processes events faster than they can be written. The standard Kafka mechanism for backpressure is to pause consumption on the affected partitions (consumer.pause(partitions)), allowing downstream to recover without the consumer running out of memory. The consumer monitors downstream lag and resumes partitions when the sink is healthy.
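The pause/resume decision can be isolated from Kafka as a small hysteresis controller (a sketch with hypothetical names; the caller wires `True` results to `consumer.pause(partitions)` and `consumer.resume(partitions)`):

```python
class BackpressureController:
    """Pause when pending sink writes exceed `high`; resume below `low`.

    The gap between the two thresholds prevents rapid pause/resume flapping.
    """

    def __init__(self, high: int, low: int):
        assert low < high
        self.high, self.low = high, low
        self.pending = 0
        self.paused = False

    def on_submit(self, n: int = 1) -> bool:
        """Record n writes queued to the sink; True means the consumer should pause."""
        self.pending += n
        if not self.paused and self.pending >= self.high:
            self.paused = True
        return self.paused

    def on_ack(self, n: int = 1) -> bool:
        """Record n writes acknowledged; True means the consumer should resume."""
        self.pending = max(0, self.pending - n)
        if self.paused and self.pending <= self.low:
            self.paused = False
            return True
        return False
```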
Schema Evolution with Avro
Producers serialize events using Avro schemas registered in the schema registry; consumers deserialize using the writer's schema plus their own reader schema. The registry enforces a compatibility policy on each new schema version. Forward compatibility guarantees that data written with a new producer schema remains readable by consumers still on the previous schema version, enabling rolling producer upgrades without consumer downtime (backward compatibility gives the reverse guarantee: consumers on a new schema can still read old data, so consumers are upgraded first).
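Adding a field with a default is the canonical safe change, compatible in both directions: old readers ignore the unknown field, and new readers fill in the default when it is absent. A simplified check for that rule (the schemas and helper are illustrative, not a real registry API):

```python
OLD = {"type": "record", "name": "UserEvent", "fields": [
    {"name": "user_id", "type": "string"},
    {"name": "action", "type": "string"},
]}

NEW = {"type": "record", "name": "UserEvent", "fields": [
    {"name": "user_id", "type": "string"},
    {"name": "action", "type": "string"},
    # New optional field with a default: safe for old and new readers alike.
    {"name": "session_id", "type": ["null", "string"], "default": None},
]}

def added_fields_have_defaults(old: dict, new: dict) -> bool:
    """Simplified compatibility check: every field added in the new schema
    must carry a default so readers on either version can resolve the data."""
    old_names = {f["name"] for f in old["fields"]}
    return all("default" in f
               for f in new["fields"] if f["name"] not in old_names)
```

A real registry performs full Avro schema-resolution checks (type promotions, removed fields, aliases); this captures only the most common case.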
SQL Schema (Checkpoint Store)
CREATE TABLE ETLCheckpoint (
id BIGSERIAL PRIMARY KEY,
consumer_group TEXT NOT NULL,
topic TEXT NOT NULL,
partition INTEGER NOT NULL,
committed_offset BIGINT NOT NULL,
committed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE (consumer_group, topic, partition)
);
CREATE TABLE ETLDeadLetter (
id BIGSERIAL PRIMARY KEY,
topic TEXT NOT NULL,
partition INTEGER NOT NULL,
original_offset BIGINT NOT NULL,
error_type TEXT NOT NULL,
raw_message BYTEA NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX ON ETLDeadLetter (topic, created_at DESC);
CREATE TABLE ETLJobMetric (
id BIGSERIAL PRIMARY KEY,
job_name TEXT NOT NULL,
processed_count BIGINT NOT NULL DEFAULT 0,
error_count BIGINT NOT NULL DEFAULT 0,
lag_seconds NUMERIC,
sampled_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX ON ETLJobMetric (job_name, sampled_at DESC);
Python Implementation
import json
import time
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

from confluent_kafka import Consumer, Producer, TopicPartition


@dataclass
class KafkaRecord:
    topic: str
    partition: int
    offset: int
    key: Optional[bytes]
    value: bytes


class KafkaETLWorker:
    def __init__(self, consumer_conf: dict, producer_conf: dict,
                 input_topic: str, output_topic: str, dlq_topic: str,
                 group_id: str):
        self.consumer = Consumer({**consumer_conf,
                                  "group.id": group_id,
                                  "enable.auto.commit": False,
                                  "isolation.level": "read_committed"})
        self.producer = Producer({**producer_conf,
                                  "enable.idempotence": True,
                                  "transactional.id": f"etl-{group_id}"})
        self.input_topic = input_topic
        self.output_topic = output_topic
        self.dlq_topic = dlq_topic
        self.producer.init_transactions()
        self.consumer.subscribe([input_topic])

    def process_batch(self, records: List[KafkaRecord]) -> List[Dict]:
        results = []
        for rec in records:
            try:
                event = json.loads(rec.value)
                transformed = self._transform(event)
                results.append({"record": rec, "output": transformed, "error": None})
            except Exception as e:
                results.append({"record": rec, "output": None, "error": str(e)})
        return results

    def _transform(self, event: dict) -> dict:
        # Application-specific transformation logic
        return {**event, "processed_at": int(time.time())}

    def commit_offsets(self, offsets: List[TopicPartition]):
        # Attach the offset commit to the open transaction so it is
        # atomic with the output writes.
        self.producer.send_offsets_to_transaction(
            offsets,
            self.consumer.consumer_group_metadata()
        )

    def route_to_dlq(self, record: KafkaRecord, error: str):
        dlq_value = json.dumps({
            "original_topic": record.topic,
            "original_partition": record.partition,
            "original_offset": record.offset,
            "error": error,
            "raw": record.value.decode(errors="replace")
        }).encode()
        self.producer.produce(self.dlq_topic, value=dlq_value)

    def run_once(self):
        msgs = self.consumer.consume(num_messages=500, timeout=1.0)
        if not msgs:
            return
        records = [
            KafkaRecord(m.topic(), m.partition(), m.offset(), m.key(), m.value())
            for m in msgs if not m.error()
        ]
        results = self.process_batch(records)
        self.producer.begin_transaction()
        try:
            # Track the highest next-offset per partition: the transactional
            # commit needs one entry per partition, not one per record.
            next_offsets: Dict[Tuple[str, int], int] = {}
            for r in results:
                rec = r["record"]
                if r["error"]:
                    self.route_to_dlq(rec, r["error"])
                else:
                    self.producer.produce(
                        self.output_topic,
                        value=json.dumps(r["output"]).encode()
                    )
                key = (rec.topic, rec.partition)
                next_offsets[key] = max(next_offsets.get(key, 0), rec.offset + 1)
            self.commit_offsets([TopicPartition(t, p, o)
                                 for (t, p), o in next_offsets.items()])
            self.producer.commit_transaction()
        except Exception as e:
            self.producer.abort_transaction()
            print(f"Transaction aborted: {e}")
Frequently Asked Questions
How do Kafka transactions provide exactly-once semantics end-to-end?
The producer wraps the output record write and the consumer offset commit in a single Kafka transaction. The transaction coordinator ensures both are committed atomically or both are aborted. On failure and restart, the consumer resumes from the last committed offset. Any output records written in the aborted transaction are invisible to downstream consumers because they read with isolation.level=read_committed, which filters out records from uncommitted or aborted transactions.
How does RocksDB state management work in Kafka Streams?
Each stream task (one per partition) maintains a local RocksDB instance that stores state for its assigned partition. State is co-located with the consumer process, eliminating network round-trips for state lookups. Every state update is also written to a Kafka changelog topic. On failure, a new consumer instance replays the changelog to rebuild the local RocksDB state before resuming event processing, providing durable state without a centralized database.
What is a stream-table join and how is it different from a stream-stream join?
A stream-table join enriches each incoming stream record with the current value from a KTable (a changelog-backed key-value store representing the latest state per key). The lookup is local and synchronous — no time window is needed because the KTable always holds the latest value. A stream-stream join joins two event streams within a bounded time window, buffering events from both sides in RocksDB and emitting output when a matching event arrives from the other stream within the window.
How are dead letter queue records replayed after a bug fix?
The DLQ record stores the original topic, partition, offset, error reason, and raw message bytes. After fixing the processing bug, the DLQ replay tool reads records from the DLQ topic, deserializes the raw_message field, and republishes those bytes to the original input topic. The ETL pipeline processes them on the next poll, this time successfully. Replaying via the input topic ensures the records go through the same consumer group and transformation logic as new records, maintaining ordering guarantees.