Low-Level Design: Pub/Sub Message Broker (Observer Pattern)

A Pub/Sub (Publish-Subscribe) message broker decouples producers from consumers through named topics. Producers publish events; subscribers receive them without knowing who published. This LLD question tests Observer pattern internals, thread safety, and delivery guarantees. Asked at Snap, Twitter, Stripe, and Uber.

Requirements

  • Publishers push messages to named topics.
  • Subscribers register callbacks for topics.
  • At-least-once delivery: messages are retried when a callback fails.
  • Async delivery: subscriber callbacks run in a thread pool, not the publisher’s thread.
  • Unsubscribe at any time.
  • Message retention: store last N messages per topic for late-joining subscribers.

Core Implementation

from __future__ import annotations
import threading
import uuid
import time
from collections import defaultdict, deque
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from typing import Callable

@dataclass
class Message:
    topic:      str
    payload:    object
    message_id: str   = field(default_factory=lambda: str(uuid.uuid4())[:8])  # short id for readability; use full uuid4 in production
    timestamp:  float = field(default_factory=time.time)

@dataclass
class Subscription:
    sub_id:   str
    topic:    str
    callback: Callable
    active:   bool = True

class PubSubBroker:
    MAX_RETAINED = 100   # last N messages per topic

    def __init__(self, workers: int = 4, max_retries: int = 3):
        self._subs:     dict[str, list[Subscription]] = defaultdict(list)
        self._retained: dict[str, deque[Message]]     = defaultdict(
            lambda: deque(maxlen=self.MAX_RETAINED)
        )
        self._lock      = threading.RLock()
        self._pool      = ThreadPoolExecutor(max_workers=workers)
        self._max_retry = max_retries

    # ── Public API ────────────────────────────────────────────────────────

    def subscribe(self, topic: str, callback: Callable,
                  replay_last: int = 0) -> str:
        sub = Subscription(str(uuid.uuid4())[:8], topic, callback)
        with self._lock:
            self._subs[topic].append(sub)
            retained = list(self._retained[topic])

        # Replay past messages outside the lock
        if replay_last > 0:
            for msg in retained[-replay_last:]:
                self._pool.submit(self._deliver, sub, msg)

        return sub.sub_id

    def unsubscribe(self, sub_id: str) -> None:
        with self._lock:
            for subs in self._subs.values():
                for sub in subs:
                    if sub.sub_id == sub_id:
                        sub.active = False

    def publish(self, topic: str, payload: object) -> None:
        msg = Message(topic, payload)
        with self._lock:
            self._retained[topic].append(msg)
            active_subs = [s for s in self._subs[topic] if s.active]

        for sub in active_subs:
            self._pool.submit(self._deliver, sub, msg)

    def close(self) -> None:
        self._pool.shutdown(wait=True)

    # ── Internal ──────────────────────────────────────────────────────────

    def _deliver(self, sub: Subscription, msg: Message,
                 attempt: int = 1) -> None:
        if not sub.active:
            return
        try:
            sub.callback(msg)
        except Exception as e:
            if attempt < self._max_retry:
                # Exponential backoff. Note the sleep blocks a pool worker for
                # its duration; a production broker would schedule the retry
                # instead of sleeping inline.
                time.sleep(2 ** attempt)
                self._deliver(sub, msg, attempt + 1)
            else:
                print(f"[Broker] Delivery failed after {self._max_retry} attempts: "
                      f"sub={sub.sub_id} msg={msg.message_id} err={e}")

Usage Example

broker = PubSubBroker(workers=4)

# Subscriber A: order notifications
def handle_order(msg):
    print(f"OrderService received: {msg.payload}")

# Subscriber B: analytics
def handle_analytics(msg):
    print(f"Analytics: {msg.payload}")

sub_a = broker.subscribe("orders", handle_order, replay_last=5)
sub_b = broker.subscribe("orders", handle_analytics)

broker.publish("orders", {"order_id": "o123", "total": 4999})
broker.publish("orders", {"order_id": "o124", "total": 1200})

broker.unsubscribe(sub_b)
broker.publish("orders", {"order_id": "o125", "total": 800})

broker.close()

Design Decisions

Decision             | Choice                                 | Alternative
---------------------|----------------------------------------|-------------------------------------------
Subscriber callbacks | ThreadPoolExecutor (async delivery)    | Synchronous (blocks publisher)
Cancellation         | Lazy (mark inactive, skip in _deliver) | Eager (remove from list; requires lock)
Message retention    | deque(maxlen=N) per topic              | Full log (unbounded); requires compaction
Retry backoff        | Exponential 2^attempt seconds          | Fixed interval; add jitter when distributed
Lock scope           | One RLock over subs list + retained    | Per-topic locks for finer granularity
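The per-topic-lock alternative in the last row can be sketched in a few lines. TopicLocks is an assumed helper name, not part of the broker above; the idea is that a short-lived meta-lock guards lazy creation of one lock per topic, so publishers on different topics no longer contend.

```python
import threading

# Sketch (assumed names) of per-topic locking: one lock per topic reduces
# contention when many topics are published concurrently. A meta-lock is held
# only briefly, to create or fetch the topic's lock.
class TopicLocks:
    def __init__(self):
        self._meta = threading.Lock()
        self._locks: dict[str, threading.Lock] = {}

    def for_topic(self, topic: str) -> threading.Lock:
        with self._meta:  # held only long enough to look up/create the lock
            return self._locks.setdefault(topic, threading.Lock())

locks = TopicLocks()
with locks.for_topic("orders"):
    pass  # critical section touches only "orders" state
print(locks.for_topic("orders") is locks.for_topic("orders"))  # True
```

The tradeoff is more bookkeeping and care around operations that span topics (such as unsubscribe-by-id), which would need to take multiple locks in a consistent order.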

Filtering and Routing Extensions

class FilteredBroker(PubSubBroker):
    def subscribe_filtered(self, topic: str, callback: Callable,
                           predicate: Callable[[Message], bool]) -> str:
        def filtered_callback(msg: Message):
            if predicate(msg):
                callback(msg)
        return self.subscribe(topic, filtered_callback)

# Usage: only orders over $100 (totals are in cents)
fbroker = FilteredBroker()
fbroker.subscribe_filtered(
    "orders",
    handle_order,
    predicate=lambda msg: msg.payload.get("total", 0) > 10000,
)

Dead Letter Queue

class DLQBroker(PubSubBroker):
    DLQ_TOPIC = "__dlq__"

    def _deliver(self, sub, msg, attempt=1):
        if not sub.active:
            return
        try:
            sub.callback(msg)
        except Exception as e:
            if attempt < self._max_retry:
                time.sleep(2 ** attempt)
                self._deliver(sub, msg, attempt + 1)
            else:
                # Publish to dead letter queue for inspection
                dlq_msg = Message(self.DLQ_TOPIC, {
                    "original_topic": msg.topic,
                    "payload":        msg.payload,
                    "error":          str(e),
                    "sub_id":         sub.sub_id,
                })
                with self._lock:
                    self._retained[self.DLQ_TOPIC].append(dlq_msg)
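Once messages land in the DLQ, operators typically fix the downstream fault and then replay them. A standalone sketch of that drain-and-republish step follows; replay_dlq and the entry shape are assumptions that mirror the dict built by DLQBroker above, not part of its API.

```python
from collections import deque

# Standalone sketch (assumed names) of DLQ replay: drain dead-letter entries
# and re-dispatch each to its original topic via a publish function.
dlq = deque([
    {"original_topic": "orders", "payload": {"order_id": "o125"}, "error": "timeout"},
])

def replay_dlq(publish):
    """Drain the DLQ, re-publishing each entry to its original topic."""
    replayed = 0
    while dlq:
        entry = dlq.popleft()
        publish(entry["original_topic"], entry["payload"])
        replayed += 1
    return replayed

redelivered = []
replay_dlq(lambda topic, payload: redelivered.append((topic, payload)))
print(redelivered)  # [('orders', {'order_id': 'o125'})]
```

Because replay re-enters the normal delivery path, the at-least-once caveat applies doubly here: a replayed message may have partially succeeded before, so subscriber callbacks must be idempotent.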

Distributed Extension

For a distributed Pub/Sub (cross-machine), replace the in-process deque and thread pool with:

  • Message storage: Kafka topics or Redis Streams (persistent, partitioned, replicated).
  • Routing: Consumer group per subscriber group; each group maintains an independent offset.
  • Fan-out: Multiple consumer group instances pull from the same topic in parallel.
  • Replay: Seek consumer offset to a past position; Kafka retains messages for configurable duration.
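The consumer-group mechanics above can be modeled in-memory in a few lines. PartitionLog, poll, and seek are assumed names imitating the shape of a log-based broker, not a real Kafka client API; the point is that each group owns its offset into a shared append-only log.

```python
from collections import defaultdict

# Toy in-memory model (assumed names) of consumer-group offsets over a shared
# log: each group tracks its own read position, so groups fan out independently
# and any group can replay by seeking its offset backwards.
class PartitionLog:
    def __init__(self):
        self._log: list = []
        self._offsets: dict[str, int] = defaultdict(int)  # group -> next offset

    def append(self, msg) -> int:
        self._log.append(msg)
        return len(self._log) - 1  # the message's offset

    def poll(self, group: str, max_records: int = 10) -> list:
        start = self._offsets[group]
        batch = self._log[start:start + max_records]
        self._offsets[group] = start + len(batch)  # commit after read
        return batch

    def seek(self, group: str, offset: int) -> None:
        self._offsets[group] = offset  # replay from a past position

log = PartitionLog()
for i in range(3):
    log.append({"order_id": f"o{i}"})

print(len(log.poll("billing")))    # 3 -- billing's own offset advances
print(len(log.poll("analytics"))) # 3 -- analytics reads independently from 0
log.seek("billing", 0)
print(len(log.poll("billing")))    # 3 -- replay after seeking back
```

A real broker adds partitioning (ordering holds only within a partition), durable offset commits, and rebalancing when group members join or leave.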

Interview Extensions

How do you ensure message ordering per subscriber?

Per subscriber, process messages sequentially: use a single-threaded executor per subscription instead of a shared pool. The shared pool above processes messages from different subscribers concurrently, but two messages to the same subscriber may be dispatched in parallel if the pool has spare threads. Fix: use a per-subscription queue (deque) drained by a single dedicated worker thread, or use a bounded channel with a single consumer.
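A minimal sketch of the per-subscription single-threaded-executor approach follows; OrderedDispatcher is an assumed name, not part of the broker above. Each subscription lazily gets its own one-worker pool, so its messages run strictly in submit order while different subscriptions still proceed in parallel.

```python
from concurrent.futures import ThreadPoolExecutor

# Sketch (assumed names) of per-subscription ordering: one single-threaded
# executor per subscription id guarantees FIFO processing within a subscription.
class OrderedDispatcher:
    def __init__(self):
        self._executors: dict[str, ThreadPoolExecutor] = {}

    def dispatch(self, sub_id: str, callback, msg):
        pool = self._executors.get(sub_id)
        if pool is None:  # lazily create the subscription's dedicated worker
            pool = self._executors[sub_id] = ThreadPoolExecutor(max_workers=1)
        return pool.submit(callback, msg)

    def close(self):
        for pool in self._executors.values():
            pool.shutdown(wait=True)

received = []
d = OrderedDispatcher()
for i in range(5):
    d.dispatch("sub-a", received.append, i)
d.close()
print(received)  # [0, 1, 2, 3, 4] -- in order for a single subscription
```

The tradeoff is one thread per active subscription; with many subscribers, a bounded pool of workers each draining a set of per-subscription queues scales better.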

How would you add priority to messages?

Priority affects delivery order, so the change belongs in dispatch, not retention: give each subscription a priority queue (a heapq keyed by (priority, sequence)) drained by its worker, instead of a FIFO queue. Higher-priority messages are dequeued first, and the sequence number breaks ties so arrival order is preserved within a priority level. The retained deque can stay FIFO, since replay order should match publish order.
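A sketch of such a per-subscription priority queue; PriorityMailbox is an assumed name, and lower numbers mean higher priority here.

```python
import heapq
import itertools

# Sketch (assumed names) of priority delivery: a heap ordered by
# (priority, sequence) drains urgent messages first, while the monotonically
# increasing sequence number preserves FIFO order within a priority level.
class PriorityMailbox:
    def __init__(self):
        self._heap: list = []
        self._seq = itertools.count()  # tie-breaker for equal priorities

    def put(self, msg, priority: int = 10):
        heapq.heappush(self._heap, (priority, next(self._seq), msg))  # lower = sooner

    def get(self):
        _, _, msg = heapq.heappop(self._heap)
        return msg

box = PriorityMailbox()
box.put("bulk-report", priority=10)
box.put("payment-failed", priority=0)  # urgent: jumps the queue
box.put("bulk-export", priority=10)
print([box.get() for _ in range(3)])
# ['payment-failed', 'bulk-report', 'bulk-export']
```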

FAQ

What is the Pub/Sub pattern and how does it differ from point-to-point messaging?

In Pub/Sub, publishers send messages to a named topic without knowing who will receive them, and multiple subscribers can listen to the same topic independently. In point-to-point messaging (a message queue), each message is consumed by exactly one receiver. Pub/Sub provides fan-out: one message reaches all subscribers. This decouples producers from consumers; adding a new subscriber requires no changes to the publisher.

How does async delivery work in a Pub/Sub broker?

When publish() is called, it dispatches a task to a ThreadPoolExecutor rather than calling subscriber callbacks directly, returning control to the publisher immediately. Worker threads in the pool invoke callbacks concurrently. Without async delivery, a slow subscriber callback would block the publisher and all other subscribers on the same thread.

How do you implement at-least-once delivery?

Retry failed callbacks with exponential backoff: on exception, sleep 2^attempt seconds and retry up to max_retries times. After exhausting retries, send the message to a Dead Letter Queue (DLQ) topic for manual inspection. This guarantees each message is delivered at least once but may produce duplicates if a callback succeeds and the acknowledgment is lost, so consumers must be idempotent.

What is a Dead Letter Queue (DLQ) and why is it important?

A DLQ is a special topic where messages go after all delivery attempts have failed. Without a DLQ, permanently failing messages would be silently dropped. With one, operations teams can inspect failed messages, fix the underlying issue, and replay them. DLQs are a standard pattern in AWS SQS, RabbitMQ, and Kafka (as a separate error topic).
