System Design Interview: Design a Distributed Messaging System (Apache Kafka)
Understanding distributed messaging systems is essential for senior engineering roles. Kafka’s design comes up frequently in interviews at LinkedIn (where Kafka was created), Uber, Airbnb, Databricks, and any company running large-scale event streaming. This post covers the architecture that lets Kafka handle millions of messages per second while preserving durability guarantees.
Requirements
Functional Requirements
- Publish messages to named topics
- Multiple consumers read messages independently (consumer groups)
- Durable persistence: messages retained for configurable period
- Ordered delivery within a partition
- At-least-once delivery guarantee
- Horizontal scalability for both producers and consumers
Non-Functional Requirements
- Throughput: millions of messages per second
- Latency: single-digit milliseconds end-to-end
- Durability: messages replicated across multiple brokers
- Fault tolerance: survive broker failures without data loss
- Consumer groups: multiple independent consumers reading same topic
Core Architecture
Producers
↓ (write to leader)
Kafka Broker Cluster
[Topic A, Partition 0 — Leader on Broker 1]
[Topic A, Partition 1 — Leader on Broker 2]
[Topic A, Partition 2 — Leader on Broker 3]
↑ (replicate to followers)
Broker 1 ←→ Broker 2 ←→ Broker 3
↓ (consumers pull)
Consumer Groups
[Group X: Consumer A (partition 0), Consumer B (partition 1,2)]
[Group Y: Consumer C (all partitions)]
ZooKeeper / KRaft (metadata, leader election)
Topic, Partition, and Offset
class KafkaMessage:
def __init__(self, key: bytes, value: bytes,
timestamp: int, headers: dict = None):
self.key = key # determines partition (hash(key) % num_partitions)
self.value = value # payload (bytes — schema-agnostic)
self.timestamp = timestamp
self.headers = headers or {}
self.offset: int = -1 # assigned by broker when written
class Partition:
"""
Ordered, immutable log of messages.
New messages appended to end (offset increments).
Consumers read by specifying offset.
Retention: messages deleted after retention.ms or retention.bytes exceeded.
"""
def __init__(self, topic: str, partition_id: int):
self.topic = topic
self.partition_id = partition_id
self.log: list[KafkaMessage] = [] # append-only
self.leader_broker_id: int = -1
self.follower_broker_ids: list[int] = []
        self.hw: int = 0  # High Watermark: first offset not yet replicated to the full ISR; consumers read only below it
def append(self, message: KafkaMessage) -> int:
"""Returns offset of appended message"""
message.offset = len(self.log)
self.log.append(message)
return message.offset
def read(self, offset: int, max_count: int = 100) -> list:
"""Read messages starting from offset"""
return self.log[offset:offset + max_count]
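To make the offset mechanics concrete, here is a minimal, self-contained version of the log above (simplified stand-ins, not the real client) exercised end to end:

```python
from dataclasses import dataclass

@dataclass
class Message:
    key: bytes
    value: bytes
    offset: int = -1  # assigned on append, like the broker does

class Log:
    """Append-only log: an offset is just a position in the sequence."""
    def __init__(self):
        self.entries: list[Message] = []

    def append(self, msg: Message) -> int:
        msg.offset = len(self.entries)   # next offset = current length
        self.entries.append(msg)
        return msg.offset

    def read(self, offset: int, max_count: int = 100) -> list[Message]:
        return self.entries[offset:offset + max_count]

log = Log()
for i in range(5):
    log.append(Message(key=b"k", value=str(i).encode()))

batch = log.read(offset=2, max_count=2)  # offsets 2 and 3
```

Note that reads are non-destructive: a second consumer calling `read(2)` sees the same records, which is exactly what makes replay possible.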
Partition Assignment
class DefaultPartitioner:
"""
Kafka's default partitioning strategy.
    With key: deterministic hash; same key always goes to the same partition.
Without key: round-robin across partitions.
"""
def partition(self, key: bytes, num_partitions: int) -> int:
if key is None:
# Round-robin (sticky partitioner in modern Kafka)
return self._round_robin(num_partitions)
        # Deterministic hash: same key always maps to the same partition
# Ensures ordering for messages with same key
return murmur2_hash(key) % num_partitions
# Key insight: all messages with same key go to same partition
# → ordering guaranteed for same key within a topic
# Example: order events for the same order_id
producer.send('order_events',
key=str(order_id).encode(), # same order → same partition → ordered
value=serialize(event)
)
Producer: Durability Guarantees
class KafkaProducerConfig:
"""
acks controls durability vs throughput tradeoff:
acks=0: fire and forget (fastest, may lose messages)
acks=1: leader acknowledges (lost if leader fails before replication)
acks=all: all in-sync replicas acknowledge (safest, slower)
"""
# High throughput (some data loss acceptable: metrics, logs)
fast_config = {
'acks': 1,
'batch.size': 16384, # 16KB batch
'linger.ms': 5, # wait 5ms to accumulate batch
'compression.type': 'snappy',
'buffer.memory': 33554432, # 32MB producer buffer
}
# High durability (financial transactions, audit logs)
safe_config = {
'acks': 'all',
'min.insync.replicas': 2,   # topic/broker-level setting (shown here for context): at least 2 in-sync replicas must ack
'retries': 3,
'enable.idempotence': True, # dedupes producer retries (exactly-once within a partition)
'compression.type': 'lz4',
}
class Producer:
def send(self, topic: str, key: bytes, value: bytes,
callback=None) -> None:
"""
Asynchronous send: messages batched in memory buffer.
Batch sent when: batch.size reached OR linger.ms elapsed.
On failure: automatic retry with exponential backoff.
"""
message = KafkaMessage(key=key, value=value,
timestamp=current_time_ms())
partition = self.partitioner.partition(key, self.num_partitions(topic))
self.buffer[topic][partition].append(message)
if self._should_flush(topic, partition):
self._flush(topic, partition, callback)
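The two flush triggers (batch.size reached or linger.ms elapsed) reduce to a small predicate; a sketch with illustrative parameter names mirroring the configs above:

```python
def should_flush(batch_bytes: int, elapsed_ms: float,
                 batch_size: int = 16_384, linger_ms: float = 5.0) -> bool:
    """Flush when the batch is full OR it has lingered long enough.
    Whichever comes first wins: a full batch never waits out the linger."""
    return batch_bytes >= batch_size or elapsed_ms >= linger_ms

full_early    = should_flush(batch_bytes=16_384, elapsed_ms=1.0)  # size triggers
timed_out     = should_flush(batch_bytes=200,    elapsed_ms=5.0)  # linger triggers
still_waiting = should_flush(batch_bytes=200,    elapsed_ms=1.0)  # neither yet
```

This is the throughput/latency dial: a larger `linger_ms` trades a few milliseconds of latency for bigger, cheaper batches.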
Consumer Group: Parallel Processing
class ConsumerGroup:
"""
Multiple consumers sharing work by splitting partitions.
Rule: each partition assigned to exactly ONE consumer in group.
If consumers > partitions: some consumers are idle.
Rebalancing: triggered when consumer joins/leaves.
Kafka's Group Coordinator (broker) manages assignments.
"""
def assign_partitions(self, consumers: list[str],
partitions: list[int]) -> dict:
"""
Range assignor (default): assign contiguous partitions.
Round-robin assignor: distribute evenly across consumers.
Sticky assignor: minimize partition movements on rebalance.
"""
assignment = {}
# Round-robin for balanced distribution
for i, partition in enumerate(partitions):
consumer = consumers[i % len(consumers)]
if consumer not in assignment:
assignment[consumer] = []
assignment[consumer].append(partition)
return assignment
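Running the round-robin assignor above on 4 partitions and 2 consumers (a self-contained copy of the same logic):

```python
def assign_partitions(consumers: list[str], partitions: list[int]) -> dict[str, list[int]]:
    """Round-robin: partition i goes to consumer i mod len(consumers)."""
    assignment: dict[str, list[int]] = {c: [] for c in consumers}
    for i, partition in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(partition)
    return assignment

result = assign_partitions(["consumer-a", "consumer-b"], [0, 1, 2, 3])
# consumer-a gets [0, 2], consumer-b gets [1, 3]

idle = assign_partitions(["a", "b", "c"], [0, 1])
# "c" gets nothing: more consumers than partitions means idle consumers
```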
class Consumer:
def __init__(self, group_id: str, bootstrap_servers: str):
self.group_id = group_id
self.offsets: dict[tuple, int] = {} # (topic, partition) → committed offset
def poll(self, timeout_ms: int = 1000) -> list:
"""
Fetch messages from assigned partitions.
Returns records from current offset position.
Consumer tracks its own offset — broker doesn't push.
"""
records = []
for (topic, partition), offset in self.offsets.items():
new_records = self.fetch_from_broker(topic, partition, offset)
records.extend(new_records)
if new_records:
self.offsets[(topic, partition)] = new_records[-1].offset + 1
return records
def commit_offset(self, topic: str, partition: int, offset: int):
"""
Commit offset = "I've processed up to this offset."
Auto-commit: enabled by default (enable.auto.commit=true, auto.commit.interval.ms=5000)
Manual commit: commit AFTER processing for at-least-once guarantee.
"""
self.offsets[(topic, partition)] = offset
# Persist to __consumer_offsets topic (internal Kafka topic)
self.broker.commit(self.group_id, topic, partition, offset)
# At-least-once processing pattern:
def process_messages(consumer):
    while True:
        records = consumer.poll(timeout_ms=1000)
        for record in records:
            try:
                process(record)  # your business logic
                consumer.commit_offset(record.topic, record.partition, record.offset + 1)
            except Exception as e:
                log_error(e)
                # Don't commit the offset; after a seek/restart the message
                # is redelivered (at-least-once). Route repeated failures
                # to a Dead Letter Queue (DLQ).
                break  # stop so offsets past the failure aren't committed
Replication and Leader Election
class ReplicationManager:
"""
Each partition has 1 leader + N-1 followers (replication factor N).
Leader handles all reads and writes.
Followers replicate from leader continuously.
In-Sync Replicas (ISR): followers caught up with leader.
    Leader failure: the controller (ZooKeeper-based or KRaft) detects it and elects a new leader from the ISR.
"""
# Replication flow:
# 1. Producer → Leader (write to log)
# 2. Followers pull from leader (FetchRequest)
# 3. Leader tracks ISR: followers that are within replica.lag.time.max.ms
# 4. High Watermark advances when all ISR replicas have the message
# 5. Consumers only read up to High Watermark (committed messages)
def handle_follower_fetch(self, partition: str, fetch_offset: int,
follower_id: int) -> list:
"""Returns messages from fetch_offset to end of leader log"""
leader_partition = self.partitions[partition]
messages = leader_partition.read(fetch_offset)
# Update ISR tracking
self.update_follower_offset(partition, follower_id, fetch_offset)
self.advance_high_watermark(partition)
return messages
def advance_high_watermark(self, partition_id: str):
"""HW = min(offset) across all ISR followers"""
partition = self.partitions[partition_id]
isr_offsets = [
self.follower_offsets[partition_id][follower]
for follower in partition.follower_broker_ids
if self.is_in_sync(partition_id, follower)
]
if isr_offsets:
partition.hw = min(isr_offsets)
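The high-watermark rule can be checked in isolation: with followers fetching at offsets 5 and 3 (i.e., having replicated through 4 and 2 respectively), the HW is the minimum, and consumers read only below it. A minimal sketch:

```python
def high_watermark(isr_fetch_offsets: list[int]) -> int:
    """HW = min next-fetch offset across the ISR: everything below it
    is replicated everywhere, hence 'committed' and safe to expose."""
    return min(isr_fetch_offsets)

def visible_to_consumers(log_len: int, hw: int) -> range:
    """Consumers never read at or past the HW."""
    return range(0, min(log_len, hw))

hw = high_watermark([5, 3])  # the slowest in-sync follower gates the HW
```

This is also why a lagging follower gets evicted from the ISR after `replica.lag.time.max.ms`: otherwise one slow replica would hold back the HW for every consumer.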
Key Kafka Design Decisions
- Log-structured storage: Messages are written sequentially to append-only log segments on disk. Sequential I/O is orders of magnitude faster than random I/O on spinning disks and still wins on SSDs; this is critical for Kafka’s throughput. Old segments are deleted based on the retention policy.
- Zero-copy transfer: Kafka uses OS-level sendfile() to transfer data from disk directly to network socket without copying to user space. Eliminates CPU overhead for data transfer — massive throughput improvement.
- Consumer pulls, not push: Consumers control their own pace. No back-pressure needed. Broker doesn’t need per-consumer state (just committed offsets in __consumer_offsets topic). Simple, scalable.
- Batching everywhere: Producers batch messages before sending. Brokers batch messages before writing to disk. Network transfers batched. Batching amortizes per-message overhead — critical for million-message-per-second throughput.
- Message ordering guarantee: Ordering only guaranteed within a partition, not across partitions. Design: use a consistent partition key (order_id, user_id) to ensure related messages go to the same partition.
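The first design decision above, an append-only segment file with an offset-to-position index, can be sketched with ordinary file I/O (a toy single-segment store, nothing like the real segment format):

```python
import os
import struct
import tempfile

class Segment:
    """Toy append-only segment: sequential writes, offset -> byte-position index."""
    def __init__(self, path: str):
        self.path = path
        self.index: list[int] = []  # offset -> starting byte position in file
        self.f = open(path, "ab+")

    def append(self, payload: bytes) -> int:
        offset = len(self.index)
        self.index.append(self.f.tell())
        # Length-prefixed record, always written at the end: pure sequential I/O.
        self.f.write(struct.pack(">I", len(payload)) + payload)
        self.f.flush()
        return offset

    def read(self, offset: int) -> bytes:
        with open(self.path, "rb") as r:
            r.seek(self.index[offset])
            (length,) = struct.unpack(">I", r.read(4))
            return r.read(length)

path = os.path.join(tempfile.mkdtemp(), "00000000.log")
seg = Segment(path)
for msg in [b"created", b"paid", b"shipped"]:
    seg.append(msg)
```

Retention then becomes trivial: deleting old data means unlinking whole segment files, never rewriting them.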
Kafka vs Traditional Message Queues (RabbitMQ)
| Feature | Kafka | RabbitMQ |
|---|---|---|
| Message model | Persistent log (replay) | Queue (delete on consume) |
| Consumer model | Pull, offset-based | Push, acknowledgment-based |
| Ordering | Within partition | Within queue |
| Replayability | Yes (retention period) | No (deleted after ACK) |
| Multiple consumers | Consumer groups (each group reads all messages) | Competing consumers (each message goes to one) |
| Throughput | Millions/sec | Tens of thousands/sec |
| Use case | Event streaming, analytics, CDC | Task queues, RPC, routing |