Notification Routing Engine Low-Level Design: Channel Selection, Priority, and Quiet Hours

Notification Routing Engine: Low-Level Design

A notification routing engine sits between the event source and the delivery channels (email, SMS, push, in-app, Slack). Its job is to translate an abstract notification event into the right channel for the right user at the right time — respecting quiet hours, priority levels, deduplication windows, and channel availability. This article designs the routing logic, data model, and Python implementation end to end.

Core Concepts

  • Notification type: a semantic label like order_shipped, security_alert, weekly_digest.
  • Channel: the delivery mechanism — email, SMS, push, in-app, webhook.
  • Priority: CRITICAL / HIGH / NORMAL / LOW. Priority determines whether quiet hours can be bypassed and which channels are tried first.
  • Deduplication window: suppress duplicate notifications of the same type to the same user within a time window.
  • Escalation: if the primary channel fails (bounce, unregistered token), try the next channel in the user’s preference order.

SQL Schema


CREATE TABLE NotificationPreference (
    id              BIGINT UNSIGNED   NOT NULL AUTO_INCREMENT,
    user_id         BIGINT UNSIGNED   NOT NULL,
    notif_type      VARCHAR(128)      NOT NULL,
    channel         ENUM('email','sms','push','in_app','webhook') NOT NULL,
    enabled         TINYINT(1)        NOT NULL DEFAULT 1,
    priority_order  TINYINT UNSIGNED  NOT NULL DEFAULT 1,   -- 1 = try first
    quiet_hours_override TINYINT(1)   NOT NULL DEFAULT 0,   -- ignore quiet hours
    PRIMARY KEY (id),
    UNIQUE KEY uq_user_type_channel (user_id, notif_type, channel),
    INDEX idx_user_type (user_id, notif_type)
) ENGINE=InnoDB;

CREATE TABLE RoutingRule (
    id              INT UNSIGNED      NOT NULL AUTO_INCREMENT,
    notif_type      VARCHAR(128)      NOT NULL,
    priority        ENUM('CRITICAL','HIGH','NORMAL','LOW') NOT NULL DEFAULT 'NORMAL',
    default_channels JSON             NOT NULL,  -- fallback if no user preference
    dedup_window_secs INT UNSIGNED    NOT NULL DEFAULT 0,   -- 0 = no dedup
    PRIMARY KEY (id),
    UNIQUE KEY uq_type (notif_type)
) ENGINE=InnoDB;

CREATE TABLE QuietHours (
    user_id         BIGINT UNSIGNED   NOT NULL,
    channel         ENUM('email','sms','push','in_app','webhook') NOT NULL,
    start_time      TIME              NOT NULL,  -- e.g. 22:00:00
    end_time        TIME              NOT NULL,  -- e.g. 08:00:00
    timezone        VARCHAR(64)       NOT NULL DEFAULT 'UTC',
    PRIMARY KEY (user_id, channel)
) ENGINE=InnoDB;

CREATE TABLE NotificationLog (
    id              BIGINT UNSIGNED   NOT NULL AUTO_INCREMENT,
    user_id         BIGINT UNSIGNED   NOT NULL,
    notif_type      VARCHAR(128)      NOT NULL,
    channel         ENUM('email','sms','push','in_app','webhook') NOT NULL,
    status          ENUM('pending','sent','failed','suppressed','deduplicated') NOT NULL DEFAULT 'pending',
    priority        ENUM('CRITICAL','HIGH','NORMAL','LOW') NOT NULL,
    dedup_key       VARCHAR(255)      NULL,
    payload         JSON              NULL,
    attempted_at    DATETIME(3)       NULL,
    delivered_at    DATETIME(3)       NULL,
    error_msg       VARCHAR(1000)     NULL,
    created_at      DATETIME(3)       NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
    PRIMARY KEY (id),
    INDEX idx_user_type_time  (user_id, notif_type, created_at DESC),
    INDEX idx_dedup_key       (dedup_key, created_at DESC),
    INDEX idx_status          (status, created_at DESC)
) ENGINE=InnoDB;

Python: Routing Logic


from datetime import datetime, time as dtime
import pytz
import json
import db
import channel_adapters  # email_adapter, sms_adapter, push_adapter, etc.

CHANNEL_ADAPTERS = {
    "email":  channel_adapters.EmailAdapter(),
    "sms":    channel_adapters.SmsAdapter(),
    "push":   channel_adapters.PushAdapter(),
    "in_app": channel_adapters.InAppAdapter(),
    "webhook": channel_adapters.WebhookAdapter(),
}


def route_notification(user_id: int, notif_type: str, payload: dict) -> dict:
    """
    Main entry point. Returns a dict with routing decision and outcome.
    """
    # 1. load routing rule (priority, dedup window, default channels)
    rule = db.fetchone(
        "SELECT * FROM RoutingRule WHERE notif_type = %s", (notif_type,)
    )
    if not rule:
        return {"status": "no_rule", "notif_type": notif_type}

    priority = rule["priority"]

    # 2. deduplication check
    dedup_key = None
    if rule["dedup_window_secs"] > 0:
        dedup_key = f"{user_id}:{notif_type}"
        recent = db.fetchone(
            """SELECT id FROM NotificationLog
               WHERE dedup_key = %s
                 AND status NOT IN ('failed','suppressed')
                 AND created_at >= NOW(3) - INTERVAL %s SECOND
               LIMIT 1""",
            (dedup_key, rule["dedup_window_secs"])
        )
        if recent:
            _log_notification(user_id, notif_type, "in_app", priority, "deduplicated", dedup_key, payload)
            return {"status": "deduplicated", "suppressed_by": recent["id"]}

    # 3. resolve channel list (user preference or rule default)
    channels = _resolve_channels(user_id, notif_type, rule)

    # 4. filter by quiet hours
    active_channels = [
        ch for ch in channels
        if priority == "CRITICAL" or not apply_quiet_hours(user_id, ch)
    ]

    if not active_channels:
        _log_notification(user_id, notif_type, channels[0] if channels else "email",
                          priority, "suppressed", dedup_key, payload,
                          error_msg="all channels in quiet hours")
        return {"status": "suppressed", "reason": "quiet_hours"}

    # 5. attempt delivery with escalation
    for channel in active_channels:
        adapter = CHANNEL_ADAPTERS.get(channel)
        if not adapter:
            continue
        log_id = _log_notification(user_id, notif_type, channel, priority, "pending", dedup_key, payload)
        try:
            adapter.send(user_id=user_id, payload=payload)
            db.execute(
                "UPDATE NotificationLog SET status='sent', delivered_at=NOW(3) WHERE id=%s",
                (log_id,)
            )
            return {"status": "sent", "channel": channel, "log_id": log_id}
        except Exception as e:
            db.execute(
                "UPDATE NotificationLog SET status='failed', error_msg=%s WHERE id=%s",
                (str(e)[:999], log_id)
            )
            # escalate to next channel

    return {"status": "failed", "reason": "all_channels_failed"}


def apply_quiet_hours(user_id: int, channel: str) -> bool:
    """Returns True if the current time is within the user's quiet hours for this channel."""
    row = db.fetchone(
        "SELECT start_time, end_time, timezone FROM QuietHours WHERE user_id=%s AND channel=%s",
        (user_id, channel)
    )
    if not row:
        return False

    tz = pytz.timezone(row["timezone"])
    now_local = datetime.now(tz).time()
    start = row["start_time"]
    end = row["end_time"]

    if isinstance(start, str):
        start = dtime.fromisoformat(start)
    if isinstance(end, str):
        end = dtime.fromisoformat(end)

    if start <= end:
        return start <= now_local = start or now_local  list[str]:
    rows = db.fetchall(
        """SELECT channel FROM NotificationPreference
           WHERE user_id=%s AND notif_type=%s AND enabled=1
           ORDER BY priority_order ASC""",
        (user_id, notif_type)
    )
    if rows:
        return [r["channel"] for r in rows]
    default = rule.get("default_channels")
    if isinstance(default, str):
        default = json.loads(default)
    return default or ["in_app"]


def _log_notification(user_id, notif_type, channel, priority, status, dedup_key, payload, error_msg=None) -> int:
    return db.execute(
        """INSERT INTO NotificationLog
               (user_id, notif_type, channel, priority, status, dedup_key, payload, attempted_at, error_msg)
           VALUES (%s,%s,%s,%s,%s,%s,%s,NOW(3),%s)""",
        (user_id, notif_type, channel, priority, status, dedup_key, json.dumps(payload), error_msg)
    )

Priority and Channel Matrix

CRITICAL notifications (e.g., account compromise, payment failure) bypass quiet hours on all channels and immediately escalate to SMS if push delivery fails. HIGH notifications respect quiet hours only for low-noise channels (in-app) but bypass them for SMS. NORMAL and LOW notifications are fully gated by quiet hours and deduplication windows, and LOW notifications are batched into digests rather than sent individually.

Escalation Policy

The channel list returned by _resolve_channels is ordered by priority_order. The router iterates channels in order, stops on the first successful delivery, and logs each failed attempt. An exponential-backoff retry queue handles transient failures (e.g., push service 503) separately from immediate escalation for hard failures (e.g., unregistered device token).

See also: Meta Interview Guide 2026: Facebook, Instagram, WhatsApp Engineering

See also: Uber Interview Guide 2026: Dispatch Systems, Geospatial Algorithms, and Marketplace Engineering

See also: Lyft Interview Guide 2026: Rideshare Engineering, Real-Time Dispatch, and Safety Systems

Scroll to Top