Notification Routing Engine: Low-Level Design
A notification routing engine sits between the event source and the delivery channels (email, SMS, push, in-app, Slack). Its job is to translate an abstract notification event into the right channel for the right user at the right time — respecting quiet hours, priority levels, deduplication windows, and channel availability. This article designs the routing logic, data model, and Python implementation end to end.
Core Concepts
- Notification type: a semantic label like
order_shipped,security_alert,weekly_digest. - Channel: the delivery mechanism — email, SMS, push, in-app, webhook.
- Priority: CRITICAL / HIGH / NORMAL / LOW. Priority determines whether quiet hours can be bypassed and which channels are tried first.
- Deduplication window: suppress duplicate notifications of the same type to the same user within a time window.
- Escalation: if the primary channel fails (bounce, unregistered token), try the next channel in the user’s preference order.
SQL Schema
CREATE TABLE NotificationPreference (
id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
user_id BIGINT UNSIGNED NOT NULL,
notif_type VARCHAR(128) NOT NULL,
channel ENUM('email','sms','push','in_app','webhook') NOT NULL,
enabled TINYINT(1) NOT NULL DEFAULT 1,
priority_order TINYINT UNSIGNED NOT NULL DEFAULT 1, -- 1 = try first
quiet_hours_override TINYINT(1) NOT NULL DEFAULT 0, -- ignore quiet hours
PRIMARY KEY (id),
UNIQUE KEY uq_user_type_channel (user_id, notif_type, channel),
INDEX idx_user_type (user_id, notif_type)
) ENGINE=InnoDB;
CREATE TABLE RoutingRule (
id INT UNSIGNED NOT NULL AUTO_INCREMENT,
notif_type VARCHAR(128) NOT NULL,
priority ENUM('CRITICAL','HIGH','NORMAL','LOW') NOT NULL DEFAULT 'NORMAL',
default_channels JSON NOT NULL, -- fallback if no user preference
dedup_window_secs INT UNSIGNED NOT NULL DEFAULT 0, -- 0 = no dedup
PRIMARY KEY (id),
UNIQUE KEY uq_type (notif_type)
) ENGINE=InnoDB;
CREATE TABLE QuietHours (
user_id BIGINT UNSIGNED NOT NULL,
channel ENUM('email','sms','push','in_app','webhook') NOT NULL,
start_time TIME NOT NULL, -- e.g. 22:00:00
end_time TIME NOT NULL, -- e.g. 08:00:00
timezone VARCHAR(64) NOT NULL DEFAULT 'UTC',
PRIMARY KEY (user_id, channel)
) ENGINE=InnoDB;
CREATE TABLE NotificationLog (
id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
user_id BIGINT UNSIGNED NOT NULL,
notif_type VARCHAR(128) NOT NULL,
channel ENUM('email','sms','push','in_app','webhook') NOT NULL,
status ENUM('pending','sent','failed','suppressed','deduplicated') NOT NULL DEFAULT 'pending',
priority ENUM('CRITICAL','HIGH','NORMAL','LOW') NOT NULL,
dedup_key VARCHAR(255) NULL,
payload JSON NULL,
attempted_at DATETIME(3) NULL,
delivered_at DATETIME(3) NULL,
error_msg VARCHAR(1000) NULL,
created_at DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
PRIMARY KEY (id),
INDEX idx_user_type_time (user_id, notif_type, created_at DESC),
INDEX idx_dedup_key (dedup_key, created_at DESC),
INDEX idx_status (status, created_at DESC)
) ENGINE=InnoDB;
Python: Routing Logic
from datetime import datetime, time as dtime
import pytz
import json
import db
import channel_adapters # email_adapter, sms_adapter, push_adapter, etc.
CHANNEL_ADAPTERS = {
"email": channel_adapters.EmailAdapter(),
"sms": channel_adapters.SmsAdapter(),
"push": channel_adapters.PushAdapter(),
"in_app": channel_adapters.InAppAdapter(),
"webhook": channel_adapters.WebhookAdapter(),
}
def route_notification(user_id: int, notif_type: str, payload: dict) -> dict:
"""
Main entry point. Returns a dict with routing decision and outcome.
"""
# 1. load routing rule (priority, dedup window, default channels)
rule = db.fetchone(
"SELECT * FROM RoutingRule WHERE notif_type = %s", (notif_type,)
)
if not rule:
return {"status": "no_rule", "notif_type": notif_type}
priority = rule["priority"]
# 2. deduplication check
dedup_key = None
if rule["dedup_window_secs"] > 0:
dedup_key = f"{user_id}:{notif_type}"
recent = db.fetchone(
"""SELECT id FROM NotificationLog
WHERE dedup_key = %s
AND status NOT IN ('failed','suppressed')
AND created_at >= NOW(3) - INTERVAL %s SECOND
LIMIT 1""",
(dedup_key, rule["dedup_window_secs"])
)
if recent:
_log_notification(user_id, notif_type, "in_app", priority, "deduplicated", dedup_key, payload)
return {"status": "deduplicated", "suppressed_by": recent["id"]}
# 3. resolve channel list (user preference or rule default)
channels = _resolve_channels(user_id, notif_type, rule)
# 4. filter by quiet hours
active_channels = [
ch for ch in channels
if priority == "CRITICAL" or not apply_quiet_hours(user_id, ch)
]
if not active_channels:
_log_notification(user_id, notif_type, channels[0] if channels else "email",
priority, "suppressed", dedup_key, payload,
error_msg="all channels in quiet hours")
return {"status": "suppressed", "reason": "quiet_hours"}
# 5. attempt delivery with escalation
for channel in active_channels:
adapter = CHANNEL_ADAPTERS.get(channel)
if not adapter:
continue
log_id = _log_notification(user_id, notif_type, channel, priority, "pending", dedup_key, payload)
try:
adapter.send(user_id=user_id, payload=payload)
db.execute(
"UPDATE NotificationLog SET status='sent', delivered_at=NOW(3) WHERE id=%s",
(log_id,)
)
return {"status": "sent", "channel": channel, "log_id": log_id}
except Exception as e:
db.execute(
"UPDATE NotificationLog SET status='failed', error_msg=%s WHERE id=%s",
(str(e)[:999], log_id)
)
# escalate to next channel
return {"status": "failed", "reason": "all_channels_failed"}
def apply_quiet_hours(user_id: int, channel: str) -> bool:
"""Returns True if the current time is within the user's quiet hours for this channel."""
row = db.fetchone(
"SELECT start_time, end_time, timezone FROM QuietHours WHERE user_id=%s AND channel=%s",
(user_id, channel)
)
if not row:
return False
tz = pytz.timezone(row["timezone"])
now_local = datetime.now(tz).time()
start = row["start_time"]
end = row["end_time"]
if isinstance(start, str):
start = dtime.fromisoformat(start)
if isinstance(end, str):
end = dtime.fromisoformat(end)
if start <= end:
return start <= now_local = start or now_local list[str]:
rows = db.fetchall(
"""SELECT channel FROM NotificationPreference
WHERE user_id=%s AND notif_type=%s AND enabled=1
ORDER BY priority_order ASC""",
(user_id, notif_type)
)
if rows:
return [r["channel"] for r in rows]
default = rule.get("default_channels")
if isinstance(default, str):
default = json.loads(default)
return default or ["in_app"]
def _log_notification(user_id, notif_type, channel, priority, status, dedup_key, payload, error_msg=None) -> int:
return db.execute(
"""INSERT INTO NotificationLog
(user_id, notif_type, channel, priority, status, dedup_key, payload, attempted_at, error_msg)
VALUES (%s,%s,%s,%s,%s,%s,%s,NOW(3),%s)""",
(user_id, notif_type, channel, priority, status, dedup_key, json.dumps(payload), error_msg)
)
Priority and Channel Matrix
CRITICAL notifications (e.g., account compromise, payment failure) bypass quiet hours on all channels and immediately escalate to SMS if push delivery fails. HIGH notifications respect quiet hours only for low-noise channels (in-app) but bypass them for SMS. NORMAL and LOW notifications are fully gated by quiet hours and deduplication windows, and LOW notifications are batched into digests rather than sent individually.
Escalation Policy
The channel list returned by _resolve_channels is ordered by priority_order. The router iterates channels in order, stops on the first successful delivery, and logs each failed attempt. An exponential-backoff retry queue handles transient failures (e.g., push service 503) separately from immediate escalation for hard failures (e.g., unregistered device token).
See also: Meta Interview Guide 2026: Facebook, Instagram, WhatsApp Engineering
See also: Uber Interview Guide 2026: Dispatch Systems, Geospatial Algorithms, and Marketplace Engineering
See also: Lyft Interview Guide 2026: Rideshare Engineering, Real-Time Dispatch, and Safety Systems