Low-Level Design: Pub/Sub Event System
The Publish-Subscribe pattern is one of the most important OOP design patterns. It decouples event producers from event consumers — publishers don’t know who’s listening, and subscribers don’t know who’s sending. This pattern underlies event-driven architectures, UI frameworks, and distributed systems. Designing a clean in-process pub/sub is a common OOP interview at companies like Atlassian, Uber, and Meta.
Core Classes
Event
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
@dataclass
class Event:
event_type: str
payload: Any
source: str
timestamp: datetime = field(default_factory=datetime.now)
event_id: str = field(default_factory=lambda: __import__('uuid').uuid4().hex)
def __str__(self) -> str:
return f"Event({self.event_type}, source={self.source}, id={self.event_id[:8]})"
Subscriber (Abstract)
from abc import ABC, abstractmethod
class Subscriber(ABC):
@abstractmethod
def on_event(self, event: Event) -> None:
"""Handle an incoming event."""
pass
@property
def subscriber_id(self) -> str:
return f"{self.__class__.__name__}_{id(self)}"
EventBus
from collections import defaultdict
import threading
class EventBus:
"""
Central pub/sub coordinator.
Thread-safe: multiple threads can publish and subscribe concurrently.
"""
def __init__(self):
self._subscribers: dict[str, list[Subscriber]] = defaultdict(list)
self._lock = threading.RLock() # RLock allows same thread to re-acquire
def subscribe(self, event_type: str, subscriber: Subscriber) -> None:
with self._lock:
# Prevent duplicate subscriptions
existing_ids = {s.subscriber_id for s in self._subscribers[event_type]}
if subscriber.subscriber_id not in existing_ids:
self._subscribers[event_type].append(subscriber)
print(f"Subscribed: {subscriber.subscriber_id} -> {event_type}")
def unsubscribe(self, event_type: str, subscriber: Subscriber) -> None:
with self._lock:
self._subscribers[event_type] = [
s for s in self._subscribers[event_type]
if s.subscriber_id != subscriber.subscriber_id
]
def publish(self, event: Event) -> int:
"""
Publish event to all subscribers.
Returns number of subscribers notified.
"""
with self._lock:
subscribers = list(self._subscribers.get(event.event_type, []))
notified = 0
for subscriber in subscribers:
try:
subscriber.on_event(event)
notified += 1
except Exception as e:
print(f"Error in {subscriber.subscriber_id}: {e}")
# Don't let one failing subscriber block others
return notified
def publish_all(self, event: Event) -> int:
"""Also publish to wildcard (*) subscribers."""
count = self.publish(event)
if event.event_type != "*":
# Shallow copy to avoid lock issues
with self._lock:
wildcard_subs = list(self._subscribers.get("*", []))
for subscriber in wildcard_subs:
try:
subscriber.on_event(event)
count += 1
except Exception as e:
print(f"Error in wildcard subscriber {subscriber.subscriber_id}: {e}")
return count
def subscriber_count(self, event_type: str) -> int:
with self._lock:
return len(self._subscribers[event_type])
Concrete Subscribers
class EmailNotificationService(Subscriber):
def on_event(self, event: Event) -> None:
if event.event_type == "user.registered":
user = event.payload
print(f"[Email] Sending welcome email to {user['email']}")
elif event.event_type == "order.placed":
order = event.payload
print(f"[Email] Order confirmation sent for order {order['order_id']}")
class AuditLogger(Subscriber):
def __init__(self):
self.log: list[Event] = []
def on_event(self, event: Event) -> None:
self.log.append(event)
print(f"[Audit] {event.timestamp.strftime('%H:%M:%S')} | {event}")
class AnalyticsService(Subscriber):
def on_event(self, event: Event) -> None:
print(f"[Analytics] Tracking: {event.event_type} from {event.source}")
# In a real system: send to data pipeline (Kafka, Segment, etc.)
Publisher Helper
class Publisher:
"""Convenience class for publishing events with a consistent source name."""
def __init__(self, source: str, event_bus: EventBus):
self.source = source
self.event_bus = event_bus
def publish(self, event_type: str, payload: Any) -> None:
event = Event(event_type=event_type, payload=payload, source=self.source)
count = self.event_bus.publish_all(event)
print(f"Published {event_type}: notified {count} subscriber(s)")
Usage Example
bus = EventBus()
# Subscribers
email_svc = EmailNotificationService()
audit_log = AuditLogger()
analytics = AnalyticsService()
# Subscriptions
bus.subscribe("user.registered", email_svc)
bus.subscribe("user.registered", analytics)
bus.subscribe("order.placed", email_svc)
bus.subscribe("*", audit_log) # audit ALL events
# Publishers
user_svc = Publisher("UserService", bus)
order_svc = Publisher("OrderService", bus)
# Simulate events
user_svc.publish("user.registered", {"user_id": "U001", "email": "alice@example.com"})
order_svc.publish("order.placed", {"order_id": "O42", "user_id": "U001", "total": 99.99})
print(f"Audit log entries: {len(audit_log.log)}")
Interview Follow-ups
- Async delivery: Modify publish() to dispatch events on a thread pool executor rather than blocking the publisher. Use concurrent.futures.ThreadPoolExecutor. Subscribers process in parallel.
- Event filtering: Allow subscribers to register with a filter predicate:
subscribe("order.*", subscriber, filter=lambda e: e.payload["total"] > 100). The EventBus checks the predicate before dispatching. - Dead letter queue: When a subscriber raises an exception, enqueue the event in a dead_letter list for later replay rather than discarding it silently.
- Topic wildcards: Support glob patterns like “order.*” matching “order.placed”, “order.shipped”, etc. Use fnmatch.fnmatch() for matching.
- Why use RLock? If publish() calls subscriber.on_event(), which itself calls bus.subscribe() (a subscriber that subscribes others in response to events), a regular Lock would deadlock. RLock allows the same thread to re-enter.
{“@context”:”https://schema.org”,”@type”:”FAQPage”,”mainEntity”:[{“@type”:”Question”,”name”:”What is the Observer pattern and how does it differ from Pub/Sub?”,”acceptedAnswer”:{“@type”:”Answer”,”text”:”The Observer pattern (Gang of Four) has direct coupling: the Subject (observable) maintains a list of Observer objects and calls their update() method directly. Observers know about the Subject and register with it explicitly. Pub/Sub adds an Event Bus (message broker) between publishers and subscribers — neither knows about the other. Publishers emit events to the bus; subscribers listen to the bus for event types they care about. Key differences: (1) Coupling — Observer is tightly coupled (Subject knows Observer interface); Pub/Sub is decoupled (neither side knows the other). (2) Filtering — Pub/Sub supports topic-based filtering; basic Observer delivers all events. (3) Distribution — Pub/Sub naturally extends to distributed systems (Kafka, Redis pub/sub); Observer is in-process. Use Observer for UI component updates, Model-View connections, and simple event callbacks. Use Pub/Sub for complex event routing, multiple independent consumers, and anything that may need to go distributed.”}},{“@type”:”Question”,”name”:”How do you make a pub/sub event bus thread-safe?”,”acceptedAnswer”:{“@type”:”Answer”,”text”:”The EventBus has two critical sections: (1) subscribe/unsubscribe — modifying the subscriber list. (2) publish — reading the subscriber list and calling handlers. Use threading.RLock (reentrant lock) rather than threading.Lock. Why RLock? If a subscriber’s on_event() handler calls bus.subscribe() (e.g., a subscriber that dynamically adds other subscribers based on events), a regular Lock would deadlock — the thread already holds the lock from publish() and tries to acquire it again in subscribe(). RLock allows the same thread to re-enter. In publish(): acquire the lock to get a shallow copy of the subscriber list, release the lock, then iterate the copy and call handlers WITHOUT the lock. This prevents holding the lock during potentially long-running handler code and allows concurrent subscriptions during event dispatch. If a handler throws an exception, catch it individually so one failing subscriber doesn’t block others from receiving the event.”}},{“@type”:”Question”,”name”:”How do you extend a pub/sub system to support asynchronous event delivery?”,”acceptedAnswer”:{“@type”:”Answer”,”text”:”Replace synchronous subscriber.on_event() calls with thread pool submission. Use concurrent.futures.ThreadPoolExecutor: pool = ThreadPoolExecutor(max_workers=N). In publish(): instead of calling subscriber.on_event(event) directly, call pool.submit(subscriber.on_event, event). This returns a Future immediately — the publisher is not blocked by subscriber processing. Tradeoffs: (1) Ordering — async dispatch means subscribers may process events out of order if previous events take longer. Fix: use a single-threaded executor per subscriber (each subscriber gets its own thread). (2) Error handling — exceptions in submitted tasks are captured in the Future; use future.add_done_callback() to log failures. (3) Backpressure — if subscribers are slow, the thread pool queue grows unbounded. Add a bounded queue and reject-or-block policy. (4) Shutdown — call pool.shutdown(wait=True) on application exit to wait for in-flight events to complete before terminating.”}}]}
🏢 Asked at: Uber Interview Guide 2026: Dispatch Systems, Geospatial Algorithms, and Marketplace Engineering
🏢 Asked at: Atlassian Interview Guide
🏢 Asked at: Meta Interview Guide 2026: Facebook, Instagram, WhatsApp Engineering
🏢 Asked at: LinkedIn Interview Guide 2026: Social Graph Engineering, Feed Ranking, and Professional Network Scale
🏢 Asked at: Snap Interview Guide