Low-Level Design: Notification System
A notification system delivers messages to users via multiple channels (push, email, SMS, in-app). It must handle high volume (millions of notifications per day), support user preferences, throttle noisy senders, and track delivery status. This is a common OOP interview at Meta, Uber, LinkedIn, and Airbnb.
Core Classes
Enums
from enum import Enum
class NotificationChannel(Enum):
PUSH = "PUSH"
EMAIL = "EMAIL"
SMS = "SMS"
IN_APP = "IN_APP"
class NotificationStatus(Enum):
PENDING = "PENDING"
SENT = "SENT"
DELIVERED = "DELIVERED"
FAILED = "FAILED"
SKIPPED = "SKIPPED" # user preference or throttle
Notification and Template
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
import uuid
@dataclass
class NotificationTemplate:
template_id: str
channel: NotificationChannel
subject_template: str # "{actor} liked your post"
body_template: str # "{actor} liked your post: '{post_title}'"
def render(self, context: dict) -> tuple[str, str]:
"""Render subject and body with context variables."""
subject = self.subject_template.format(**context)
body = self.body_template.format(**context)
return subject, body
@dataclass
class Notification:
notification_id: str = field(default_factory=lambda: str(uuid.uuid4()))
user_id: str = ""
channel: NotificationChannel = NotificationChannel.IN_APP
subject: str = ""
body: str = ""
status: NotificationStatus = NotificationStatus.PENDING
created_at: datetime = field(default_factory=datetime.now)
sent_at: datetime = None
metadata: dict = field(default_factory=dict)
User Preferences
@dataclass
class UserNotificationPreferences:
user_id: str
enabled_channels: set = field(default_factory=lambda: {
NotificationChannel.PUSH,
NotificationChannel.EMAIL,
NotificationChannel.IN_APP,
})
quiet_hours_start: int = 22 # 10 PM
quiet_hours_end: int = 8 # 8 AM
max_per_hour: dict = field(default_factory=lambda: {
NotificationChannel.PUSH: 5,
NotificationChannel.EMAIL: 10,
NotificationChannel.SMS: 2,
NotificationChannel.IN_APP: 50,
})
def is_channel_enabled(self, channel: NotificationChannel) -> bool:
return channel in self.enabled_channels
def is_quiet_hour(self, hour: int) -> bool:
if self.quiet_hours_start > self.quiet_hours_end:
# Wraps midnight: start=22, end=8 → quiet 22,23,0,1,...,7
return hour >= self.quiet_hours_start or hour < self.quiet_hours_end
return self.quiet_hours_start <= hour < self.quiet_hours_end
Channel Handlers (Strategy Pattern)
from abc import ABC, abstractmethod
class ChannelHandler(ABC):
@abstractmethod
def send(self, notification: Notification) -> bool:
"""Send notification. Returns True on success."""
pass
class PushNotificationHandler(ChannelHandler):
def send(self, notification: Notification) -> bool:
device_token = notification.metadata.get("device_token")
if not device_token:
print(f"[Push] No device token for user {notification.user_id}")
return False
# In real system: call FCM/APNs API
print(f"[Push] Sent to {notification.user_id}: {notification.subject}")
return True
class EmailHandler(ChannelHandler):
def send(self, notification: Notification) -> bool:
email = notification.metadata.get("email")
if not email:
return False
# In real system: call SendGrid/SES API
print(f"[Email] Sent to {email}: {notification.subject}")
return True
class SMSHandler(ChannelHandler):
def send(self, notification: Notification) -> bool:
phone = notification.metadata.get("phone")
if not phone:
return False
# In real system: call Twilio API
print(f"[SMS] Sent to {phone}: {notification.body[:160]}")
return True
class InAppHandler(ChannelHandler):
def __init__(self):
self._inbox: dict[str, list[Notification]] = {}
def send(self, notification: Notification) -> bool:
self._inbox.setdefault(notification.user_id, []).append(notification)
print(f"[InApp] Stored for {notification.user_id}: {notification.subject}")
return True
def get_inbox(self, user_id: str) -> list[Notification]:
return self._inbox.get(user_id, [])
NotificationService (Orchestrator)
from collections import defaultdict
from datetime import datetime
class NotificationService:
def __init__(self):
self._handlers: dict[NotificationChannel, ChannelHandler] = {
NotificationChannel.PUSH: PushNotificationHandler(),
NotificationChannel.EMAIL: EmailHandler(),
NotificationChannel.SMS: SMSHandler(),
NotificationChannel.IN_APP: InAppHandler(),
}
self._preferences: dict[str, UserNotificationPreferences] = {}
self._templates: dict[str, NotificationTemplate] = {}
# Rate limiting: user_id -> channel -> list of timestamps
self._sent_times: dict[str, dict] = defaultdict(lambda: defaultdict(list))
def register_preferences(self, prefs: UserNotificationPreferences) -> None:
self._preferences[prefs.user_id] = prefs
def register_template(self, template: NotificationTemplate) -> None:
self._templates[template.template_id] = template
def _is_rate_limited(self, user_id: str, channel: NotificationChannel) -> bool:
prefs = self._preferences.get(user_id)
if not prefs:
return False
max_per_hour = prefs.max_per_hour.get(channel, float('inf'))
now = datetime.now()
# Keep only timestamps from the last hour
recent = [
t for t in self._sent_times[user_id][channel]
if (now - t).total_seconds() = max_per_hour
def send(
self,
user_id: str,
template_id: str,
context: dict,
user_metadata: dict = None,
channels: list = None,
) -> list[Notification]:
prefs = self._preferences.get(user_id)
template = self._templates.get(template_id)
if not template:
raise ValueError(f"Template {template_id} not found")
target_channels = channels or [template.channel]
now = datetime.now()
results = []
for channel in target_channels:
# Check user preferences
if prefs and not prefs.is_channel_enabled(channel):
print(f"Skipped {channel.value}: disabled by user {user_id}")
continue
# Check quiet hours (skip push/SMS during quiet hours)
if prefs and channel in (NotificationChannel.PUSH, NotificationChannel.SMS):
if prefs.is_quiet_hour(now.hour):
print(f"Skipped {channel.value}: quiet hours for {user_id}")
continue
# Check rate limit
if self._is_rate_limited(user_id, channel):
print(f"Skipped {channel.value}: rate limit for {user_id}")
continue
subject, body = template.render(context)
notification = Notification(
user_id=user_id,
channel=channel,
subject=subject,
body=body,
metadata=user_metadata or {},
)
handler = self._handlers[channel]
success = handler.send(notification)
if success:
notification.status = NotificationStatus.SENT
notification.sent_at = datetime.now()
self._sent_times[user_id][channel].append(now)
else:
notification.status = NotificationStatus.FAILED
results.append(notification)
return results
Usage Example
service = NotificationService()
# Setup preferences
prefs = UserNotificationPreferences(
user_id="U001",
enabled_channels={NotificationChannel.PUSH, NotificationChannel.EMAIL},
quiet_hours_start=22,
quiet_hours_end=8,
)
service.register_preferences(prefs)
# Register template
template = NotificationTemplate(
template_id="like_notification",
channel=NotificationChannel.PUSH,
subject_template="{actor} liked your post",
body_template="{actor} liked your post: '{post_title}'",
)
service.register_template(template)
# Send notification
notifications = service.send(
user_id="U001",
template_id="like_notification",
context={"actor": "Bob", "post_title": "System Design Tips"},
user_metadata={"device_token": "abc123", "email": "alice@example.com"},
channels=[NotificationChannel.PUSH, NotificationChannel.EMAIL],
)
Interview Follow-ups
- Digest mode: Instead of sending each like individually, batch likes into a “Bob and 5 others liked your post” digest every 30 minutes. Store pending notifications in a digest queue; flush on timer.
- Priority: Add priority levels (URGENT overrides quiet hours and rate limits). Used for security alerts, password resets.
- Delivery tracking: Webhooks from email providers (SendGrid) and push services (FCM) call back with DELIVERED or BOUNCED status — update notification records accordingly.
- Async dispatch: In production, service.send() enqueues to Kafka/SQS and returns immediately. Worker pools consume and dispatch to channel handlers.
{“@context”:”https://schema.org”,”@type”:”FAQPage”,”mainEntity”:[{“@type”:”Question”,”name”:”How do you design a multi-channel notification system?”,”acceptedAnswer”:{“@type”:”Answer”,”text”:”Use the Strategy pattern for channel handlers: define a ChannelHandler abstract base class with a send(notification) method. Concrete implementations: PushNotificationHandler (calls FCM/APNs), EmailHandler (calls SendGrid/SES), SMSHandler (calls Twilio), InAppHandler (stores in DB/Redis). The NotificationService orchestrates: (1) Check user preferences — is this channel enabled? Is this a quiet hour? (2) Check rate limits — has this user received too many notifications on this channel in the last hour? (3) Render the notification from a template with context variables. (4) Call the appropriate channel handler. (5) Record the result and update delivery status. User preferences are stored per user and per channel. Templates separate content from delivery logic — the same “order confirmed” notification can be rendered for push, email, and SMS with appropriate formatting per channel.”}},{“@type”:”Question”,”name”:”How do you implement rate limiting and quiet hours in a notification system?”,”acceptedAnswer”:{“@type”:”Answer”,”text”:”Rate limiting: track a sliding window of sent timestamps per user per channel. When a notification is about to be sent: filter the timestamp list to keep only entries within the last hour, check if the count exceeds the user’s max_per_hour limit for this channel. If over limit, skip (status=SKIPPED). At scale: use Redis INCR with TTL for the counter (key: “notif_rate:{user_id}:{channel}:{hour}”, TTL=3600). Redis atomic INCR prevents race conditions with concurrent notification sends. Quiet hours: store quiet_hours_start and quiet_hours_end on user preferences. Before sending push or SMS notifications (not email — email can wait in inbox): check if the current hour falls within quiet hours. Handle midnight wraparound: if start > end (e.g., 22 to 8), the quiet period crosses midnight. Formula: is_quiet = hour >= start OR hour < end. Store notifications that would be sent during quiet hours for delivery at quiet_hours_end (8 AM)."}},{"@type":"Question","name":"How do you handle notification templates with dynamic content?","acceptedAnswer":{"@type":"Answer","text":"Store templates in a database with placeholder syntax: subject_template = "{actor} liked your post", body_template = "{actor} liked your post: '{post_title}'". At send time, call template.render(context) where context = {"actor": "Bob", "post_title": "System Design Tips"}. This produces the final subject and body. Benefits: templates can be updated without code deploys; non-engineers can edit notification copy via a UI; A/B testing different copy is straightforward (create template variants, route users to variants by cohort). Per-channel formatting: each channel has its own template — email has HTML, SMS is truncated to 160 characters, push has a short subject. Template localization: store templates per locale (template_id + locale → template record), look up by user's preferred language. For high-volume notifications (millions of "X liked your post"), batch identical template renders: one render call produces the template with actor substituted, then fan out to all affected users."}}]}
🏢 Asked at: Meta Interview Guide 2026: Facebook, Instagram, WhatsApp Engineering
🏢 Asked at: Uber Interview Guide 2026: Dispatch Systems, Geospatial Algorithms, and Marketplace Engineering
🏢 Asked at: LinkedIn Interview Guide 2026: Social Graph Engineering, Feed Ranking, and Professional Network Scale
🏢 Asked at: Airbnb Interview Guide 2026: Search Systems, Trust and Safety, and Full-Stack Engineering
🏢 Asked at: Snap Interview Guide