# Low-Level Design: Pub/Sub Message Broker
A Pub/Sub (Publish-Subscribe) message broker decouples producers from consumers through named topics. Producers publish events; subscribers receive them without knowing who published. This LLD question tests Observer pattern internals, thread safety, and delivery guarantees. Asked at Snap, Twitter, Stripe, and Uber.
## Requirements

- Publishers push messages to named topics.
- Subscribers register callbacks for topics.
- At-least-once delivery: failed callbacks are retried.
- Async delivery: subscriber callbacks run in a thread pool, not the publisher's thread.
- Unsubscribe at any time.
- Message retention: store the last N messages per topic for late-joining subscribers.
## Core Implementation

```python
from __future__ import annotations

import threading
import time
import uuid
from collections import defaultdict, deque
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class Message:
    topic: str
    payload: object
    message_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
    timestamp: float = field(default_factory=time.time)


@dataclass
class Subscription:
    sub_id: str
    topic: str
    callback: Callable
    active: bool = True


class PubSubBroker:
    MAX_RETAINED = 100  # last N messages per topic

    def __init__(self, workers: int = 4, max_retries: int = 3):
        self._subs: dict[str, list[Subscription]] = defaultdict(list)
        self._retained: dict[str, deque[Message]] = defaultdict(
            lambda: deque(maxlen=self.MAX_RETAINED)
        )
        self._lock = threading.RLock()
        self._pool = ThreadPoolExecutor(max_workers=workers)
        self._max_retry = max_retries

    # ── Public API ────────────────────────────────────────────────────────

    def subscribe(self, topic: str, callback: Callable,
                  replay_last: int = 0) -> str:
        sub = Subscription(str(uuid.uuid4())[:8], topic, callback)
        with self._lock:
            self._subs[topic].append(sub)
            retained = list(self._retained[topic])
        # Replay past messages outside the lock
        if replay_last > 0:
            for msg in retained[-replay_last:]:
                self._pool.submit(self._deliver, sub, msg)
        return sub.sub_id

    def unsubscribe(self, sub_id: str) -> None:
        # Lazy cancellation: mark inactive; _deliver skips inactive subs
        with self._lock:
            for subs in self._subs.values():
                for sub in subs:
                    if sub.sub_id == sub_id:
                        sub.active = False

    def publish(self, topic: str, payload: object) -> None:
        msg = Message(topic, payload)
        with self._lock:
            self._retained[topic].append(msg)
            active_subs = [s for s in self._subs[topic] if s.active]
        # Dispatch outside the lock so delivery never blocks other publishers
        for sub in active_subs:
            self._pool.submit(self._deliver, sub, msg)

    def close(self) -> None:
        self._pool.shutdown(wait=True)

    # ── Internal ──────────────────────────────────────────────────────────

    def _deliver(self, sub: Subscription, msg: Message,
                 attempt: int = 1) -> None:
        if not sub.active:
            return
        try:
            sub.callback(msg)
        except Exception as e:
            if attempt < self._max_retry:
                backoff = 2 ** attempt  # exponential backoff; note this blocks a worker thread
                time.sleep(backoff)
                self._deliver(sub, msg, attempt + 1)
            else:
                print(f"[Broker] Delivery failed after {self._max_retry} attempts: "
                      f"sub={sub.sub_id} msg={msg.message_id} err={e}")
```
## Usage Example

```python
broker = PubSubBroker(workers=4)

# Subscriber A: order notifications
def handle_order(msg):
    print(f"OrderService received: {msg.payload}")

# Subscriber B: analytics
def handle_analytics(msg):
    print(f"Analytics: {msg.payload}")

sub_a = broker.subscribe("orders", handle_order, replay_last=5)
sub_b = broker.subscribe("orders", handle_analytics)

broker.publish("orders", {"order_id": "o123", "total": 4999})
broker.publish("orders", {"order_id": "o124", "total": 1200})

broker.unsubscribe(sub_b)
broker.publish("orders", {"order_id": "o125", "total": 800})  # analytics no longer receives

broker.close()
```
## Design Decisions
| Decision | Choice | Alternative |
|---|---|---|
| Subscriber callbacks | ThreadPoolExecutor (async delivery) | Synchronous (blocks publisher) |
| Cancellation | Lazy (mark inactive, skip in _deliver) | Eager (remove from list, requires lock) |
| Message retention | deque(maxlen=N) per topic | Full log (unbounded); requires compaction |
| Retry backoff | Exponential (2^attempt seconds) | Fixed interval; add jitter in distributed settings |
| Lock scope | RLock around subs list + retained | Per-topic locks for finer granularity |
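The last row of the table mentions per-topic locks as the finer-grained alternative. A minimal sketch of how that could look, assuming a hypothetical `PerTopicLocks` helper (the name and API are illustrative, not part of the broker above): publishers to `"orders"` and `"payments"` then no longer contend on one broker-wide lock.

```python
import threading


class PerTopicLocks:
    """Finer-grained locking: one lock per topic instead of a broker-wide RLock."""

    def __init__(self):
        self._guard = threading.Lock()   # protects the dict of locks itself
        self._locks: dict[str, threading.Lock] = {}

    def for_topic(self, topic: str) -> threading.Lock:
        # Create-on-first-use; the dict is only mutated under _guard
        with self._guard:
            if topic not in self._locks:
                self._locks[topic] = threading.Lock()
            return self._locks[topic]
```

The broker would call `self._locks.for_topic(topic)` inside `publish` and `subscribe` instead of taking `self._lock`; the tradeoff is that cross-topic operations like `unsubscribe` now need to acquire multiple locks.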
## Filtering and Routing Extensions

```python
class FilteredBroker(PubSubBroker):
    def subscribe_filtered(self, topic: str, callback: Callable,
                           predicate: Callable[[Message], bool]) -> str:
        # Wrap the callback so only matching messages reach the subscriber
        def filtered_callback(msg: Message):
            if predicate(msg):
                callback(msg)
        return self.subscribe(topic, filtered_callback)

# Usage: only orders over $100 (totals are in cents)
broker = FilteredBroker(workers=4)
broker.subscribe_filtered(
    "orders",
    handle_order,
    predicate=lambda msg: msg.payload.get("total", 0) > 10000,
)
```
## Dead Letter Queue

```python
class DLQBroker(PubSubBroker):
    DLQ_TOPIC = "__dlq__"

    def _deliver(self, sub, msg, attempt=1):
        if not sub.active:
            return
        try:
            sub.callback(msg)
        except Exception as e:
            if attempt < self._max_retry:
                time.sleep(2 ** attempt)
                self._deliver(sub, msg, attempt + 1)
            else:
                # Publish to dead letter queue for inspection
                dlq_msg = Message(self.DLQ_TOPIC, {
                    "original_topic": msg.topic,
                    "payload": msg.payload,
                    "error": str(e),
                    "sub_id": sub.sub_id,
                })
                with self._lock:
                    self._retained[self.DLQ_TOPIC].append(dlq_msg)
```
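Once messages land in the DLQ, operators need a path to replay them after fixing the underlying issue. A self-contained sketch of a replay helper, using a hypothetical `DLQEntry` record and a `publish` callable standing in for the broker's publish method (both are illustrative, not part of `DLQBroker`):

```python
from collections import deque
from dataclasses import dataclass
from typing import Callable


@dataclass
class DLQEntry:
    """Mirror of the dict stored in the DLQ: origin topic, payload, failure context."""
    original_topic: str
    payload: object
    error: str
    sub_id: str


def replay(dlq: "deque[DLQEntry]", publish: Callable[[str, object], None]) -> int:
    """Drain DLQ entries and republish each to its original topic."""
    count = 0
    while dlq:
        entry = dlq.popleft()
        publish(entry.original_topic, entry.payload)
        count += 1
    return count
```

Replayed messages go through the normal delivery (and retry) path again, so a still-broken subscriber simply sends them back to the DLQ rather than losing them.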
## Distributed Extension
For a distributed Pub/Sub (cross-machine), replace the in-process deque and thread pool with:
- Message storage: Kafka topics or Redis Streams (persistent, partitioned, replicated).
- Routing: Consumer group per subscriber group; each group maintains an independent offset.
- Fan-out: every consumer group independently receives each message; within a group, instances split the topic's partitions and consume in parallel.
- Replay: Seek consumer offset to a past position; Kafka retains messages for configurable duration.
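The offset semantics described above can be modeled in a few lines of plain Python. This is a toy illustration of the Kafka/Redis Streams idea, not a client for either system: an append-only log plus per-group offsets gives both fan-out (each group sees everything) and replay (rewind an offset).

```python
class Log:
    """Append-only, in-memory stand-in for a partitioned topic log."""

    def __init__(self):
        self._entries: list[object] = []

    def append(self, entry: object) -> int:
        self._entries.append(entry)
        return len(self._entries) - 1     # offset of the new entry

    def read_from(self, offset: int) -> list[object]:
        return self._entries[offset:]


class ConsumerGroup:
    """Each group tracks its own offset, so groups fan out independently."""

    def __init__(self, log: Log):
        self._log = log
        self._offset = 0

    def poll(self) -> list[object]:
        batch = self._log.read_from(self._offset)
        self._offset += len(batch)        # "commit": advance past what we read
        return batch

    def seek(self, offset: int) -> None:
        self._offset = offset             # replay: rewind to a past position
```

Because offsets live with the group, not the log, a new subscriber group can join late and replay from offset 0 without any change to publishers.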
## Interview Extensions
### How do you ensure message ordering per subscriber?
Per subscriber, process messages sequentially: use a single-threaded executor per subscription instead of a shared pool. The shared pool above processes messages from different subscribers concurrently, but two messages to the same subscriber may be dispatched in parallel if the pool has spare threads. Fix: use a per-subscription queue (deque) drained by a single dedicated worker thread, or use a bounded channel with a single consumer.
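The per-subscription fix described above can be sketched as follows. This is a minimal illustration with a hypothetical `OrderedSubscription` wrapper (not part of the broker above): one FIFO queue drained by one dedicated worker thread, so messages for a given subscriber are processed strictly in arrival order.

```python
import queue
import threading
from typing import Callable


class OrderedSubscription:
    """One queue + one worker per subscription: sequential, in-order delivery."""

    def __init__(self, callback: Callable):
        self._callback = callback
        self._queue: queue.Queue = queue.Queue()
        self._worker = threading.Thread(target=self._drain, daemon=True)
        self._worker.start()

    def enqueue(self, msg) -> None:
        self._queue.put(msg)              # called by the broker's publish path

    def _drain(self) -> None:
        while True:
            msg = self._queue.get()
            if msg is None:               # sentinel: stop the worker
                break
            self._callback(msg)

    def close(self) -> None:
        self._queue.put(None)
        self._worker.join()
```

The broker would call `enqueue` instead of `self._pool.submit(self._deliver, ...)`; the tradeoff is one thread per subscription and no parallelism within a single subscriber.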
### How would you add priority to messages?
Give each subscription a priority queue (a heapq keyed by message priority) in place of its FIFO queue, drained by that subscription's worker: higher-priority messages are dequeued first. This requires per-subscription queues owned by dedicated workers rather than a shared generic executor, since a shared ThreadPoolExecutor dispatches in submission order. The retained buffer can stay a plain deque, because replay order is a separate concern from delivery order.
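A sketch of such a per-subscription priority queue, using a hypothetical `PriorityMailbox` (illustrative name). A monotonically increasing counter breaks ties so that equal-priority messages keep their arrival order, and so the heap never compares arbitrary payloads.

```python
import heapq
import itertools


class PriorityMailbox:
    """Per-subscription priority queue: lower number = higher priority."""

    def __init__(self):
        self._heap: list[tuple[int, int, object]] = []
        self._counter = itertools.count()   # tie-breaker preserving arrival order

    def put(self, msg, priority: int = 10) -> None:
        heapq.heappush(self._heap, (priority, next(self._counter), msg))

    def get(self):
        priority, _, msg = heapq.heappop(self._heap)
        return msg

    def __len__(self) -> int:
        return len(self._heap)
```

The subscription's worker thread would loop on `get()` exactly as with a FIFO queue; only the dequeue order changes.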
## FAQ

### What is the Pub/Sub pattern and how does it differ from point-to-point messaging?

In Pub/Sub, publishers send messages to a named topic without knowing who will receive them. Multiple subscribers can listen to the same topic independently. In point-to-point (message queue), each message is consumed by exactly one receiver. Pub/Sub provides fan-out: one message reaches all subscribers. This decouples producers from consumers: adding a new subscriber requires no changes to the publisher.

### How does async delivery work in a Pub/Sub broker?

When publish() is called, it dispatches a task to a ThreadPoolExecutor rather than calling subscriber callbacks directly. This returns control to the publisher immediately. Worker threads in the pool invoke callbacks concurrently. Without async delivery, a slow subscriber callback would block the publisher and all other subscribers on the same thread.

### How do you implement at-least-once delivery in a Pub/Sub broker?

Retry failed callbacks with exponential backoff: on exception, sleep 2^attempt seconds and retry up to max_retries times. After exhausting retries, send to a Dead Letter Queue (DLQ) topic for manual inspection. This guarantees messages are delivered at least once but may result in duplicate delivery if the callback succeeds but the ACK is lost, so consumers must be idempotent.

### What is a Dead Letter Queue (DLQ) and why is it important?

A DLQ is a special topic where messages go after all delivery attempts have failed. Without a DLQ, permanently failing messages would be silently dropped. With a DLQ, operations teams can inspect failed messages, fix the underlying issue, and replay them. DLQs are a standard pattern in AWS SQS, RabbitMQ, and Kafka (as a separate error topic).