Notification Routing Engine Low-Level Design: Channel Selection, Priority, and Quiet Hours

A notification routing engine sits between the event source and the delivery channels (email, SMS, push, in-app, and webhooks such as Slack). Its job is to translate an abstract notification event into the right channel for the right user at the right time, while respecting quiet hours, priority levels, deduplication windows, and channel availability. This article walks through the routing logic, the data model, and a Python implementation.

Core Concepts

  • Notification type: a semantic label like order_shipped, security_alert, weekly_digest.
  • Channel: the delivery mechanism — email, SMS, push, in-app, webhook.
  • Priority: CRITICAL / HIGH / NORMAL / LOW. Priority determines whether quiet hours can be bypassed and which channels are tried first.
  • Deduplication window: suppress duplicate notifications of the same type to the same user within a time window.
  • Escalation: if the primary channel fails (bounce, unregistered token), try the next channel in the user’s preference order.
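
To make the deduplication window concrete, here is a minimal in-memory sketch. It is illustrative only: the class name `DedupWindow` and the injectable `clock` parameter are assumptions made for this example, and a real deployment would keep this state in a shared store rather than in process memory. The key format `user_id:notif_type` mirrors the one used by the routing code later in this article.

```python
import time


class DedupWindow:
    """In-memory stand-in for a per-user, per-type deduplication window."""

    def __init__(self, window_secs: float, clock=time.monotonic):
        self.window_secs = window_secs
        self._clock = clock  # injectable so tests can control time
        self._last_accepted: dict[str, float] = {}

    def should_send(self, user_id: int, notif_type: str) -> bool:
        key = f"{user_id}:{notif_type}"
        now = self._clock()
        last = self._last_accepted.get(key)
        if last is not None and now - last < self.window_secs:
            return False  # duplicate inside the window: suppress it
        # Only accepted notifications move the window forward.
        self._last_accepted[key] = now
        return True
```

Because only accepted notifications update the timestamp, a steady stream of duplicates cannot extend the window indefinitely. A window of 0 seconds never suppresses anything, which matches the "0 = no dedup" convention used in the schema.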

SQL Schema


CREATE TABLE NotificationPreference (
    id              BIGINT UNSIGNED   NOT NULL AUTO_INCREMENT,
    user_id         BIGINT UNSIGNED   NOT NULL,
    notif_type      VARCHAR(128)      NOT NULL,
    channel         ENUM('email','sms','push','in_app','webhook') NOT NULL,
    enabled         TINYINT(1)        NOT NULL DEFAULT 1,
    priority_order  TINYINT UNSIGNED  NOT NULL DEFAULT 1,   -- 1 = try first
    quiet_hours_override TINYINT(1)   NOT NULL DEFAULT 0,   -- ignore quiet hours
    PRIMARY KEY (id),
    UNIQUE KEY uq_user_type_channel (user_id, notif_type, channel)  -- leftmost prefix also covers (user_id, notif_type) lookups
) ENGINE=InnoDB;

CREATE TABLE RoutingRule (
    id              INT UNSIGNED      NOT NULL AUTO_INCREMENT,
    notif_type      VARCHAR(128)      NOT NULL,
    priority        ENUM('CRITICAL','HIGH','NORMAL','LOW') NOT NULL DEFAULT 'NORMAL',
    default_channels JSON             NOT NULL,  -- fallback if no user preference
    dedup_window_secs INT UNSIGNED    NOT NULL DEFAULT 0,   -- 0 = no dedup
    PRIMARY KEY (id),
    UNIQUE KEY uq_type (notif_type)
) ENGINE=InnoDB;

CREATE TABLE QuietHours (
    user_id         BIGINT UNSIGNED   NOT NULL,
    channel         ENUM('email','sms','push','in_app','webhook') NOT NULL,
    start_time      TIME              NOT NULL,  -- e.g. 22:00:00
    end_time        TIME              NOT NULL,  -- e.g. 08:00:00
    timezone        VARCHAR(64)       NOT NULL DEFAULT 'UTC',
    PRIMARY KEY (user_id, channel)
) ENGINE=InnoDB;

CREATE TABLE NotificationLog (
    id              BIGINT UNSIGNED   NOT NULL AUTO_INCREMENT,
    user_id         BIGINT UNSIGNED   NOT NULL,
    notif_type      VARCHAR(128)      NOT NULL,
    channel         ENUM('email','sms','push','in_app','webhook') NOT NULL,
    status          ENUM('pending','sent','failed','suppressed','deduplicated') NOT NULL DEFAULT 'pending',
    priority        ENUM('CRITICAL','HIGH','NORMAL','LOW') NOT NULL,
    dedup_key       VARCHAR(255)      NULL,
    payload         JSON              NULL,
    attempted_at    DATETIME(3)       NULL,
    delivered_at    DATETIME(3)       NULL,
    error_msg       VARCHAR(1000)     NULL,
    created_at      DATETIME(3)       NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
    PRIMARY KEY (id),
    INDEX idx_user_type_time  (user_id, notif_type, created_at DESC),
    INDEX idx_dedup_key       (dedup_key, created_at DESC),
    INDEX idx_status          (status, created_at DESC)
) ENGINE=InnoDB;

Python: Routing Logic


from datetime import datetime, time as dtime
import pytz
import json
import db
import channel_adapters  # email_adapter, sms_adapter, push_adapter, etc.

CHANNEL_ADAPTERS = {
    "email":  channel_adapters.EmailAdapter(),
    "sms":    channel_adapters.SmsAdapter(),
    "push":   channel_adapters.PushAdapter(),
    "in_app": channel_adapters.InAppAdapter(),
    "webhook": channel_adapters.WebhookAdapter(),
}


def route_notification(user_id: int, notif_type: str, payload: dict) -> dict:
    """
    Main entry point. Returns a dict with routing decision and outcome.
    """
    # 1. load routing rule (priority, dedup window, default channels)
    rule = db.fetchone(
        "SELECT * FROM RoutingRule WHERE notif_type = %s", (notif_type,)
    )
    if not rule:
        return {"status": "no_rule", "notif_type": notif_type}

    priority = rule["priority"]

    # 2. deduplication check
    dedup_key = None
    if rule["dedup_window_secs"] > 0:
        dedup_key = f"{user_id}:{notif_type}"
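        # Only rows that were sent (or are still in flight) count as prior
        # deliveries. Counting 'deduplicated' rows would let every suppressed
        # duplicate extend the window indefinitely.
        recent = db.fetchone(
            """SELECT id FROM NotificationLog
               WHERE dedup_key = %s
                 AND status IN ('pending','sent')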
                 AND created_at >= NOW(3) - INTERVAL %s SECOND
               LIMIT 1""",
            (dedup_key, rule["dedup_window_secs"])
        )
        if recent:
            _log_notification(user_id, notif_type, "in_app", priority, "deduplicated", dedup_key, payload)
            return {"status": "deduplicated", "suppressed_by": recent["id"]}

    # 3. resolve channel list (user preference or rule default)
    channels = _resolve_channels(user_id, notif_type, rule)

    # 4. filter by quiet hours
    active_channels = [
        ch for ch in channels
        if priority == "CRITICAL" or not apply_quiet_hours(user_id, ch)
    ]

    if not active_channels:
        _log_notification(user_id, notif_type, channels[0] if channels else "email",
                          priority, "suppressed", dedup_key, payload,
                          error_msg="all channels in quiet hours")
        return {"status": "suppressed", "reason": "quiet_hours"}

    # 5. attempt delivery with escalation
    for channel in active_channels:
        adapter = CHANNEL_ADAPTERS.get(channel)
        if not adapter:
            continue
        log_id = _log_notification(user_id, notif_type, channel, priority, "pending", dedup_key, payload)
        try:
            adapter.send(user_id=user_id, payload=payload)
            db.execute(
                "UPDATE NotificationLog SET status='sent', delivered_at=NOW(3) WHERE id=%s",
                (log_id,)
            )
            return {"status": "sent", "channel": channel, "log_id": log_id}
        except Exception as e:
            db.execute(
                "UPDATE NotificationLog SET status='failed', error_msg=%s WHERE id=%s",
                (str(e)[:999], log_id)
            )
            # escalate to next channel

    return {"status": "failed", "reason": "all_channels_failed"}


def apply_quiet_hours(user_id: int, channel: str) -> bool:
    """Returns True if the current time is within the user's quiet hours for this channel."""
    row = db.fetchone(
        "SELECT start_time, end_time, timezone FROM QuietHours WHERE user_id=%s AND channel=%s",
        (user_id, channel)
    )
    if not row:
        return False

    tz = pytz.timezone(row["timezone"])
    now_local = datetime.now(tz).time()
    start = row["start_time"]
    end = row["end_time"]

    if isinstance(start, str):
        start = dtime.fromisoformat(start)
    if isinstance(end, str):
        end = dtime.fromisoformat(end)

    if start <= end:
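        return start <= now_local <= end
    # Window crosses midnight (e.g. 22:00 to 08:00): quiet if after start OR before end.
    return now_local >= start or now_local <= end


def _resolve_channels(user_id: int, notif_type: str, rule: dict) -> list[str]:
    """Return the user's enabled channels in priority order, else the rule's defaults."""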
    rows = db.fetchall(
        """SELECT channel FROM NotificationPreference
           WHERE user_id=%s AND notif_type=%s AND enabled=1
           ORDER BY priority_order ASC""",
        (user_id, notif_type)
    )
    if rows:
        return [r["channel"] for r in rows]
    default = rule.get("default_channels")
    if isinstance(default, str):
        default = json.loads(default)
    return default or ["in_app"]


def _log_notification(user_id, notif_type, channel, priority, status, dedup_key, payload, error_msg=None) -> int:
    return db.execute(
        """INSERT INTO NotificationLog
               (user_id, notif_type, channel, priority, status, dedup_key, payload, attempted_at, error_msg)
           VALUES (%s,%s,%s,%s,%s,%s,%s,NOW(3),%s)""",
        (user_id, notif_type, channel, priority, status, dedup_key, json.dumps(payload), error_msg)
    )
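
The quiet-hours check in apply_quiet_hours combines a database lookup with the time comparison, which makes the midnight wrap-around hard to unit test. The sketch below isolates the comparison as a pure function. The names `to_time` and `in_quiet_window` are hypothetical helpers, not part of the code above. The `timedelta` branch is an assumption about the driver: several Python MySQL drivers return `datetime.timedelta` for TIME columns, which the string-only conversion above would not handle.

```python
from datetime import datetime, time, timedelta, timezone, tzinfo


def to_time(value) -> time:
    """Normalise a TIME column value to datetime.time (drivers differ)."""
    if isinstance(value, time):
        return value
    if isinstance(value, timedelta):  # assumption: driver maps TIME to timedelta
        return (datetime.min + value).time()
    return time.fromisoformat(value)  # e.g. "22:00:00"


def in_quiet_window(now: datetime, start, end, tz: tzinfo) -> bool:
    """True if the timezone-aware `now` falls inside [start, end] in zone `tz`."""
    local = now.astimezone(tz).time()
    start, end = to_time(start), to_time(end)
    if start <= end:
        return start <= local <= end
    # Window crosses midnight: quiet if after start OR before end.
    return local >= start or local <= end


# 03:30 UTC is 22:30 at UTC-5, which is inside a 22:00 to 08:00 window.
utc_minus_5 = timezone(timedelta(hours=-5))
late_evening = datetime(2024, 1, 1, 3, 30, tzinfo=timezone.utc)
print(in_quiet_window(late_evening, "22:00:00", "08:00:00", utc_minus_5))  # True
```

In the router, `tz` would be built from the QuietHours.timezone column, for example with `pytz.timezone(row["timezone"])` as in the code above. Passing `now` in explicitly lets tests pin the clock instead of relying on the wall time.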

Priority and Channel Matrix

CRITICAL notifications (e.g., account compromise, payment failure) bypass quiet hours on all channels and escalate to SMS immediately if push delivery fails. HIGH notifications bypass quiet hours for SMS but respect them on low-noise channels such as in-app. NORMAL and LOW notifications are fully gated by quiet hours and deduplication windows, and LOW notifications are batched into digests rather than sent individually. Note that route_notification as written implements only the CRITICAL bypass; the HIGH-priority rule and LOW-priority digest batching need additional logic.
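
One way to express this matrix is a small lookup table plus a decision function. This is a sketch under stated assumptions, not the article's implementation. The names `QUIET_HOURS_BYPASS` and `decide` are hypothetical. Treating HIGH-priority email, push, and webhook as respecting quiet hours is an assumption (the text only specifies SMS and in-app). The `user_override` flag stands in for the quiet_hours_override column from NotificationPreference.

```python
from enum import Enum


class Priority(str, Enum):
    CRITICAL = "CRITICAL"
    HIGH = "HIGH"
    NORMAL = "NORMAL"
    LOW = "LOW"


ALL_CHANNELS = frozenset({"email", "sms", "push", "in_app", "webhook"})

# Channels on which each priority may ignore quiet hours.
QUIET_HOURS_BYPASS = {
    Priority.CRITICAL: ALL_CHANNELS,
    Priority.HIGH: frozenset({"sms"}),  # assumption: other channels stay gated
    Priority.NORMAL: frozenset(),
    Priority.LOW: frozenset(),
}


def decide(priority: Priority, channel: str, in_quiet_hours: bool,
           user_override: bool = False) -> str:
    """Return 'digest', 'send', or 'suppress' for one channel."""
    if priority is Priority.LOW:
        return "digest"  # LOW is batched rather than sent individually
    if not in_quiet_hours or user_override:
        return "send"
    if channel in QUIET_HOURS_BYPASS[priority]:
        return "send"
    return "suppress"
```

Keeping the policy in a table makes it easy to review and to change without touching the routing loop; the router would call `decide` once per candidate channel in place of the single CRITICAL check.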

Escalation Policy

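The channel list returned by _resolve_channels is ordered by priority_order. The router iterates the channels in order, stops on the first successful delivery, and logs each failed attempt. In the full design, transient failures (e.g., a 503 from the push service) go to a retry queue with exponential backoff, while hard failures (e.g., an unregistered device token) escalate to the next channel immediately. The routing code shown here does not make that distinction: it treats every exception as a reason to escalate, and the retry queue is a separate component.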
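
The split between transient and hard failures can be sketched as follows. Everything here is hypothetical scaffolding: the exception classes `TransientError` and `HardFailure`, the `send` and `schedule_retry` callables, and the choice to stop escalating once a retry has been scheduled are all assumptions, since the article does not specify how the retry queue interacts with escalation.

```python
class TransientError(Exception):
    """Temporary provider problem, e.g. an HTTP 503."""


class HardFailure(Exception):
    """Permanent problem, e.g. an unregistered device token."""


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 300.0) -> float:
    """Exponential backoff: base * 2^attempt seconds, capped at `cap`.

    Production systems usually add random jitter; it is omitted here so the
    schedule stays deterministic and easy to test.
    """
    return min(cap, base * 2 ** attempt)


def deliver(channels, send, schedule_retry, attempt: int = 0) -> dict:
    """Try channels in order; retry transient errors, escalate hard ones."""
    for channel in channels:
        try:
            send(channel)
            return {"status": "sent", "channel": channel}
        except TransientError:
            # Assumption: keep the same channel and retry later via the queue.
            delay = backoff_delay(attempt)
            schedule_retry(channel, delay)
            return {"status": "retry_scheduled", "channel": channel, "delay": delay}
        except HardFailure:
            continue  # escalate to the next channel immediately
    return {"status": "failed", "reason": "all_channels_failed"}
```

With the defaults, the retry delays run 1, 2, 4, 8 seconds and so on, up to the 300-second cap. For CRITICAL notifications it may be preferable to schedule the retry and also escalate right away; that is a policy choice this sketch leaves open.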

Frequently Asked Questions

How does a notification routing engine select the right delivery channel?

The engine first looks up the user's per-notification-type channel preferences ordered by priority_order. If no user preference exists, it falls back to the default channel list defined on the RoutingRule. It then filters out channels currently in quiet hours (unless priority is CRITICAL), and attempts delivery in order, escalating to the next channel on failure.

How does notification deduplication work?

A dedup_key is constructed as '{user_id}:{notif_type}'. Before routing, the engine queries NotificationLog for any non-failed entry with the same dedup_key within the RoutingRule's dedup_window_secs. If found, the new notification is logged as 'deduplicated' and not delivered. This prevents flooding users with repeated alerts for the same event (e.g., the same order update triggering 10 retries).

How are quiet hours implemented across timezones?

Quiet hours are stored per user per channel with a timezone column. At routing time, datetime.now() is localized to the user's timezone and compared against the start/end time pair. When the quiet hour window crosses midnight (e.g., 22:00 to 08:00), the comparison uses an OR condition: current time >= start OR current time <= end.

What is the escalation policy when a notification channel fails?

The router iterates the ordered channel list and stops on the first success. Each failure is logged with the error message in NotificationLog. Transient failures (HTTP 5xx from the push provider) are retried with exponential backoff via a job queue. Hard failures (unregistered device token, invalid email) immediately escalate to the next channel without retry. If all channels fail, the notification is logged with status='failed' for alerting and audit.
