Incident Management System Low-Level Design: Detection, Escalation, and Post-Mortem

Incident Management System: Low-Level Design

An incident management system ingests alerts from monitoring infrastructure, classifies them by severity, pages the right on-call engineer, tracks a timeline of actions, links runbooks, and generates a post-mortem template after resolution. This article designs the full pipeline from alert ingestion through escalation to post-mortem generation, with SQL schema and Python implementation.

Alert Ingestion and Deduplication

Alerts arrive from Prometheus Alertmanager, PagerDuty, Datadog, or custom webhook sources. Each alert carries a fingerprint (a hash of its labels) used for deduplication: if an alert with the same fingerprint already has an open incident, the incoming alert is appended to the timeline rather than creating a duplicate incident.

SQL Schema


CREATE TABLE Incident (
    id              BIGINT UNSIGNED   NOT NULL AUTO_INCREMENT,
    title           VARCHAR(512)      NOT NULL,
    severity        ENUM('P1','P2','P3','P4') NOT NULL,
    status          ENUM('open','acknowledged','mitigated','resolved','closed') NOT NULL DEFAULT 'open',
    alert_source    VARCHAR(128)      NOT NULL,
    alert_fingerprint VARCHAR(64)     NOT NULL,
    assignee_id     BIGINT UNSIGNED   NULL,
    runbook_url     VARCHAR(1000)     NULL,
    opened_at       DATETIME(3)       NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
    ack_at          DATETIME(3)       NULL,
    resolved_at     DATETIME(3)       NULL,
    closed_at       DATETIME(3)       NULL,
    PRIMARY KEY (id),
    INDEX idx_fingerprint_open (alert_fingerprint, status),
    INDEX idx_severity_status  (severity, status, opened_at DESC),
    INDEX idx_assignee         (assignee_id, status)
) ENGINE=InnoDB;

CREATE TABLE IncidentEvent (
    id              BIGINT UNSIGNED   NOT NULL AUTO_INCREMENT,
    incident_id     BIGINT UNSIGNED   NOT NULL,
    event_type      ENUM('alert_received','assigned','acknowledged','escalated',
                         'mitigation_applied','comment','severity_changed','resolved','closed') NOT NULL,
    actor_id        BIGINT UNSIGNED   NULL,   -- NULL for system events
    payload         JSON              NULL,
    recorded_at     DATETIME(3)       NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
    PRIMARY KEY (id),
    INDEX idx_incident_time (incident_id, recorded_at ASC)
) ENGINE=InnoDB;

CREATE TABLE EscalationPolicy (
    id              INT UNSIGNED      NOT NULL AUTO_INCREMENT,
    name            VARCHAR(255)      NOT NULL,
    severity        ENUM('P1','P2','P3','P4') NOT NULL,
    steps           JSON              NOT NULL,
    -- steps: [{"delay_secs":0,"target":"oncall_primary"},
    --          {"delay_secs":300,"target":"oncall_secondary"},
    --          {"delay_secs":900,"target":"manager"}]
    PRIMARY KEY (id),
    UNIQUE KEY uq_name (name)
) ENGINE=InnoDB;

CREATE TABLE OnCallSchedule (
    id              INT UNSIGNED      NOT NULL AUTO_INCREMENT,
    team            VARCHAR(128)      NOT NULL,
    rotation_start  DATETIME          NOT NULL,
    rotation_end    DATETIME          NULL,
    primary_user_id BIGINT UNSIGNED   NOT NULL,
    secondary_user_id BIGINT UNSIGNED NULL,
    override_user_id  BIGINT UNSIGNED NULL,   -- manual override takes precedence
    override_until    DATETIME        NULL,
    PRIMARY KEY (id),
    INDEX idx_team_time (team, rotation_start DESC)
) ENGINE=InnoDB;

Python Implementation


import hashlib
import json
from datetime import datetime, timezone, timedelta
import db
import pager  # abstraction over PagerDuty/OpsGenie/SMS

SEVERITY_MAP = {
    "critical": "P1",
    "high":     "P2",
    "warning":  "P3",
    "info":     "P4",
}

ACK_TIMEOUT = {
    "P1": 5 * 60,    # 5 minutes
    "P2": 15 * 60,
    "P3": 60 * 60,
    "P4": 4 * 3600,
}


def create_incident(alert: dict) -> dict:
    """
    Ingest an alert and create or update an incident.
    alert: {"source":"prometheus","severity":"critical","title":"...",
             "labels":{"alertname":"HighErrorRate","service":"payments"}, ...}
    """
    fingerprint = _compute_fingerprint(alert["labels"])
    severity = SEVERITY_MAP.get(alert.get("severity", "warning").lower(), "P3")

    # dedup: find open incident with same fingerprint
    existing = db.fetchone(
        "SELECT id, status FROM Incident WHERE alert_fingerprint=%s AND status NOT IN ('resolved','closed')",
        (fingerprint,)
    )

    if existing:
        # append to existing incident's timeline
        _add_event(existing["id"], "alert_received", payload={"alert": alert})
        return {"incident_id": existing["id"], "action": "appended"}

    # create new incident
    runbook_url = _lookup_runbook(alert.get("labels", {}))
    incident_id = db.execute(
        """INSERT INTO Incident (title, severity, alert_source, alert_fingerprint, runbook_url)
           VALUES (%s, %s, %s, %s, %s)""",
        (alert["title"], severity, alert["source"], fingerprint, runbook_url)
    )

    _add_event(incident_id, "alert_received", payload={"alert": alert})

    # assign on-call engineer
    team = alert.get("labels", {}).get("team", "platform")
    oncall = _get_oncall_user(team)
    if oncall:
        db.execute("UPDATE Incident SET assignee_id=%s WHERE id=%s", (oncall["user_id"], incident_id))
        _add_event(incident_id, "assigned", payload={"assignee_id": oncall["user_id"]})

    # page the on-call
    _page_oncall(incident_id, severity, oncall, alert["title"])

    # schedule escalation check
    _schedule_escalation_check(incident_id, severity)

    return {"incident_id": incident_id, "action": "created", "severity": severity}


def escalate_incident(incident_id: int) -> dict:
    """
    Called by a background job when ACK timeout expires.
    Looks up the escalation policy and pages the next target.
    """
    incident = db.fetchone(
        "SELECT id, severity, status, ack_at, opened_at, assignee_id FROM Incident WHERE id=%s",
        (incident_id,)
    )
    if not incident or incident["status"] in ("acknowledged", "resolved", "closed"):
        return {"action": "skipped", "reason": incident["status"] if incident else "not_found"}

    # how many escalation steps have already fired?
    escalation_events = db.fetchall(
        "SELECT id FROM IncidentEvent WHERE incident_id=%s AND event_type='escalated'",
        (incident_id,)
    )
    step_index = len(escalation_events)

    policy = db.fetchone(
        "SELECT steps FROM EscalationPolicy WHERE severity=%s LIMIT 1",
        (incident["severity"],)
    )
    if not policy:
        return {"action": "no_policy"}

    steps = json.loads(policy["steps"]) if isinstance(policy["steps"], str) else policy["steps"]

    if step_index >= len(steps):
        return {"action": "max_escalation_reached"}

    step = steps[step_index]
    target_type = step["target"]

    if target_type == "oncall_secondary":
        team = "platform"
        oncall = _get_oncall_user(team, role="secondary")
    elif target_type == "manager":
        oncall = _get_manager(incident["assignee_id"])
    else:
        oncall = _get_oncall_user("platform", role="primary")

    if oncall:
        pager.page(
            user_id=oncall["user_id"],
            subject=f"[ESCALATED] Incident #{incident_id} - {incident['severity']} unacknowledged",
            channel="sms" if incident["severity"] == "P1" else "push",
        )
        _add_event(incident_id, "escalated", payload={
            "step": step_index,
            "target": target_type,
            "paged_user": oncall["user_id"],
        })

    return {"action": "escalated", "step": step_index, "target": target_type}


def acknowledge_incident(incident_id: int, user_id: int) -> None:
    now = datetime.now(timezone.utc)
    db.execute(
        "UPDATE Incident SET status='acknowledged', ack_at=%s, assignee_id=%s WHERE id=%s",
        (now, user_id, incident_id)
    )
    _add_event(incident_id, "acknowledged", actor_id=user_id)


def resolve_incident(incident_id: int, user_id: int, summary: str = "") -> str:
    now = datetime.now(timezone.utc)
    db.execute(
        "UPDATE Incident SET status='resolved', resolved_at=%s WHERE id=%s",
        (now, incident_id)
    )
    _add_event(incident_id, "resolved", actor_id=user_id, payload={"summary": summary})
    return generate_postmortem_template(incident_id)


def generate_postmortem_template(incident_id: int) -> str:
    """Generate a Markdown post-mortem template from the incident timeline."""
    incident = db.fetchone("SELECT * FROM Incident WHERE id=%s", (incident_id,))
    events = db.fetchall(
        "SELECT event_type, actor_id, payload, recorded_at FROM IncidentEvent WHERE incident_id=%s ORDER BY recorded_at ASC",
        (incident_id,)
    )

    duration = ""
    if incident["resolved_at"] and incident["opened_at"]:
        delta = incident["resolved_at"] - incident["opened_at"]
        duration = str(delta)

    ttd = ""
    if incident["ack_at"] and incident["opened_at"]:
        ttd = str(incident["ack_at"] - incident["opened_at"])

    timeline_lines = []
    for e in events:
        ts = e["recorded_at"].strftime("%H:%M:%S") if hasattr(e["recorded_at"], "strftime") else str(e["recorded_at"])
        timeline_lines.append(f"- `{ts}` **{e['event_type']}**")

    timeline_md = "n".join(timeline_lines)

    template = f"""# Post-Mortem: {incident['title']}

**Severity:** {incident['severity']}
**Duration:** {duration}
**Time to Acknowledge:** {ttd}
**Runbook:** {incident.get('runbook_url') or 'N/A'}

## Summary
[1-2 sentence description of what happened and the user impact]

## Timeline
{timeline_md}

## Root Cause
[Describe the root cause]

## Contributing Factors
- [Factor 1]
- [Factor 2]

## Resolution
[What action resolved the incident]

## Action Items
| Action | Owner | Due Date |
|--------|-------|----------|
| [TODO] | @     | YYYY-MM-DD |

## Lessons Learned
[What went well, what could be improved]
"""
    return template


def _compute_fingerprint(labels: dict) -> str:
    canonical = json.dumps(labels, sort_keys=True)
    return hashlib.sha256(canonical.encode()).hexdigest()[:16]


def _get_oncall_user(team: str, role: str = "primary") -> dict | None:
    now = datetime.now(timezone.utc)
    row = db.fetchone(
        """SELECT primary_user_id, secondary_user_id, override_user_id, override_until
           FROM OnCallSchedule
           WHERE team=%s AND rotation_start  %s)
           ORDER BY rotation_start DESC LIMIT 1""",
        (team, now, now)
    )
    if not row:
        return None
    if row["override_user_id"] and (not row["override_until"] or row["override_until"] > now):
        return {"user_id": row["override_user_id"]}
    if role == "secondary" and row["secondary_user_id"]:
        return {"user_id": row["secondary_user_id"]}
    return {"user_id": row["primary_user_id"]}


def _get_manager(user_id: int | None) -> dict | None:
    if not user_id:
        return None
    row = db.fetchone("SELECT manager_id FROM UserProfile WHERE user_id=%s", (user_id,))
    return {"user_id": row["manager_id"]} if row and row["manager_id"] else None


def _page_oncall(incident_id: int, severity: str, oncall: dict | None, title: str) -> None:
    if not oncall:
        return
    channel = "sms" if severity == "P1" else "push"
    pager.page(user_id=oncall["user_id"],
               subject=f"[{severity}] Incident #{incident_id}: {title}",
               channel=channel)


def _lookup_runbook(labels: dict) -> str | None:
    service = labels.get("service") or labels.get("alertname", "")
    row = db.fetchone("SELECT url FROM RunbookIndex WHERE service=%s LIMIT 1", (service,))
    return row["url"] if row else None


def _schedule_escalation_check(incident_id: int, severity: str) -> None:
    delay = ACK_TIMEOUT.get(severity, 3600)
    # In production: enqueue a delayed job (Celery beat, SQS delay, etc.)
    # escalate_incident.apply_async((incident_id,), countdown=delay)
    pass


def _add_event(incident_id: int, event_type: str, actor_id: int = None, payload: dict = None) -> None:
    db.execute(
        "INSERT INTO IncidentEvent (incident_id, event_type, actor_id, payload) VALUES (%s,%s,%s,%s)",
        (incident_id, event_type, actor_id, json.dumps(payload) if payload else None)
    )

Escalation Policy

Each severity level has an EscalationPolicy with an ordered list of steps. Each step specifies a delay (seconds after the previous step) and a target (oncall_primary, oncall_secondary, manager). A background job (Celery beat, SQS delayed message) fires at the delay for each step. If the incident is acknowledged before the delay fires, the escalation job skips by checking status at execution time.

Post-Mortem Generation

On resolution, the system assembles the full IncidentEvent timeline and computes key metrics: total duration, time-to-acknowledge (TTA), time-to-mitigate (TTM). These are interpolated into a Markdown template with empty sections for root cause, contributing factors, and action items, which the incident commander fills in. The template is persisted as a document and linked back to the incident record.

{
“@context”: “https://schema.org”,
“@type”: “FAQPage”,
“mainEntity”: [
{
“@type”: “Question”,
“name”: “How does alert deduplication prevent duplicate incidents?”,
“acceptedAnswer”: {
“@type”: “Answer”,
“text”: “Each alert is fingerprinted by hashing its label set (e.g., alertname, service, severity) into a short hash. Before creating a new incident, the system queries for any open incident with the same fingerprint. If found, the incoming alert is appended to that incident’s event timeline instead of creating a new record. This prevents alert storms from spawning hundreds of parallel incidents for the same underlying cause.”
}
},
{
“@type”: “Question”,
“name”: “How does an on-call escalation policy work?”,
“acceptedAnswer”: {
“@type”: “Answer”,
“text”: “An escalation policy is a JSON array of steps, each with a delay_secs and a target (e.g., oncall_primary, oncall_secondary, manager). When an incident is created, a background job is scheduled for each step’s delay. At execution time, the job checks whether the incident has been acknowledged; if not, it pages the step’s target. P1 incidents use SMS; lower severities use push notifications. Overrides in the OnCallSchedule table allow manual rotation substitution.”
}
},
{
“@type”: “Question”,
“name”: “What is event sourcing in the context of an incident timeline?”,
“acceptedAnswer”: {
“@type”: “Answer”,
“text”: “Instead of storing only the current state of an incident, every state change (alert received, assigned, acknowledged, comment added, escalated, resolved) is appended as an immutable IncidentEvent row with a timestamp and actor. The current incident state can be derived by replaying these events. This provides a complete audit trail, enables time-to-acknowledge and time-to-mitigate metrics, and feeds the auto-generated post-mortem timeline.”
}
},
{
“@type”: “Question”,
“name”: “What key metrics should a post-mortem template capture automatically?”,
“acceptedAnswer”: {
“@type”: “Answer”,
“text”: “The system can auto-populate: total incident duration (resolved_at minus opened_at), time-to-acknowledge (ack_at minus opened_at), escalation count (number of escalated events), and a chronological timeline from IncidentEvent rows. The runbook URL is looked up from a RunbookIndex by service label. Human-authored sections (root cause, contributing factors, action items) are left as placeholders for the incident commander to complete.”
}
}
]
}

{“@context”:”https://schema.org”,”@type”:”FAQPage”,”mainEntity”:[{“@type”:”Question”,”name”:”How is incident severity auto-classified?”,”acceptedAnswer”:{“@type”:”Answer”,”text”:”Alert metadata (affected service, error rate, latency percentile) is scored against a classification matrix; P1 requires >5% error rate on a critical service or complete outage.”}},{“@type”:”Question”,”name”:”How does the on-call escalation policy work?”,”acceptedAnswer”:{“@type”:”Answer”,”text”:”If the primary on-call does not acknowledge within the ack_timeout, the policy escalates to the secondary; after a second timeout it pages the manager and creates a Slack/PagerDuty bridge.”}},{“@type”:”Question”,”name”:”How is incident timeline event sourcing implemented?”,”acceptedAnswer”:{“@type”:”Answer”,”text”:”IncidentEvent rows are append-only with (incident_id, event_type, actor_id, payload, created_at); the full incident history is reconstructed by replaying events in order.”}},{“@type”:”Question”,”name”:”What does the auto-generated post-mortem template include?”,”acceptedAnswer”:{“@type”:”Answer”,”text”:”It pre-fills incident duration, affected services (from IncidentEvent rows), contributing alerts, timeline of key events, and placeholder sections for root cause and action items.”}}]}

See also: Netflix Interview Guide 2026: Streaming Architecture, Recommendation Systems, and Engineering Excellence

See also: Scale AI Interview Guide 2026: Data Infrastructure, RLHF Pipelines, and ML Engineering

See also: Anthropic Interview Guide 2026: Process, Questions, and AI Safety

Scroll to Top