Incident Management System: Low-Level Design
An incident management system ingests alerts from monitoring infrastructure, classifies them by severity, pages the right on-call engineer, tracks a timeline of actions, links runbooks, and generates a post-mortem template after resolution. This article designs the full pipeline from alert ingestion through escalation to post-mortem generation, with SQL schema and Python implementation.
Alert Ingestion and Deduplication
Alerts arrive from Prometheus Alertmanager, PagerDuty, Datadog, or custom webhook sources. Each alert carries a fingerprint (a hash of its labels) used for deduplication: if an alert with the same fingerprint already has an open incident, the incoming alert is appended to the timeline rather than creating a duplicate incident.
SQL Schema
-- One row per incident. Rows are inserted by create_incident() and updated
-- by acknowledge_incident() / resolve_incident() in the Python code below.
CREATE TABLE Incident (
id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
title VARCHAR(512) NOT NULL,
-- Priority derived from the alert's severity string via SEVERITY_MAP.
severity ENUM('P1','P2','P3','P4') NOT NULL,
-- The dedup query treats every status other than 'resolved'/'closed' as
-- still open. 'mitigated' and 'closed' are not written by the Python code
-- shown in this article.
status ENUM('open','acknowledged','mitigated','resolved','closed') NOT NULL DEFAULT 'open',
alert_source VARCHAR(128) NOT NULL,
-- Hash of the alert's labels (see _compute_fingerprint, which emits 16 hex
-- characters); used to attach repeat alerts to an existing open incident.
alert_fingerprint VARCHAR(64) NOT NULL,
-- On-call user assigned at creation; replaced by whoever acknowledges.
assignee_id BIGINT UNSIGNED NULL,
-- Looked up from RunbookIndex at creation time; NULL when no match.
runbook_url VARCHAR(1000) NULL,
opened_at DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
ack_at DATETIME(3) NULL, -- set by acknowledge_incident()
resolved_at DATETIME(3) NULL, -- set by resolve_incident()
closed_at DATETIME(3) NULL, -- not set by the Python code shown here
PRIMARY KEY (id),
-- Serves the dedup lookup by fingerprint in create_incident().
INDEX idx_fingerprint_open (alert_fingerprint, status),
INDEX idx_severity_status (severity, status, opened_at DESC),
INDEX idx_assignee (assignee_id, status)
) ENGINE=InnoDB;
-- Timeline of everything that happened to an incident. Rows are inserted by
-- _add_event() and read back in recorded_at order when the post-mortem
-- template is generated.
CREATE TABLE IncidentEvent (
id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
incident_id BIGINT UNSIGNED NOT NULL, -- Incident.id (no FOREIGN KEY is declared)
event_type ENUM('alert_received','assigned','acknowledged','escalated',
'mitigation_applied','comment','severity_changed','resolved','closed') NOT NULL,
actor_id BIGINT UNSIGNED NULL, -- NULL for system events
-- Event-specific details serialized as JSON, e.g. the raw alert for
-- 'alert_received' or step/target/paged_user for 'escalated'.
payload JSON NULL,
recorded_at DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
PRIMARY KEY (id),
-- Serves the per-incident timeline query ordered by recorded_at.
INDEX idx_incident_time (incident_id, recorded_at ASC)
) ENGINE=InnoDB;
-- Ordered escalation steps per severity. escalate_incident() selects a policy
-- with "WHERE severity=%s LIMIT 1".
-- NOTE(review): uniqueness is enforced on name only and severity is not
-- indexed, so several policies may share a severity and the LIMIT 1 pick is
-- then arbitrary -- confirm whether severity should be unique.
CREATE TABLE EscalationPolicy (
id INT UNSIGNED NOT NULL AUTO_INCREMENT,
name VARCHAR(255) NOT NULL,
severity ENUM('P1','P2','P3','P4') NOT NULL,
-- JSON array of step objects; escalate_incident() reads each step's "target".
steps JSON NOT NULL,
-- steps: [{"delay_secs":0,"target":"oncall_primary"},
-- {"delay_secs":300,"target":"oncall_secondary"},
-- {"delay_secs":900,"target":"manager"}]
PRIMARY KEY (id),
UNIQUE KEY uq_name (name)
) ENGINE=InnoDB;
-- On-call rotations per team. _get_oncall_user() reads the most recent
-- rotation for a team (ORDER BY rotation_start DESC LIMIT 1).
CREATE TABLE OnCallSchedule (
id INT UNSIGNED NOT NULL AUTO_INCREMENT,
team VARCHAR(128) NOT NULL,
rotation_start DATETIME NOT NULL,
rotation_end DATETIME NULL, -- presumably NULL means open-ended; confirm
primary_user_id BIGINT UNSIGNED NOT NULL,
secondary_user_id BIGINT UNSIGNED NULL, -- lookup falls back to primary when NULL
override_user_id BIGINT UNSIGNED NULL, -- manual override takes precedence
override_until DATETIME NULL, -- NULL with an override set means no expiry
PRIMARY KEY (id),
-- Serves the team lookup ordered by newest rotation first.
INDEX idx_team_time (team, rotation_start DESC)
) ENGINE=InnoDB;
Python Implementation
import hashlib
import json
from datetime import datetime, timezone, timedelta
import db
import pager # abstraction over PagerDuty/OpsGenie/SMS
# Translates the severity string carried by an incoming alert (compared in
# lower case) into the priority stored in Incident.severity.
SEVERITY_MAP = dict(
    critical="P1",
    high="P2",
    warning="P3",
    info="P4",
)

# Seconds an incident of each priority may remain unacknowledged before the
# escalation check is due.
ACK_TIMEOUT = {
    "P1": 300,    # 5 minutes
    "P2": 900,    # 15 minutes
    "P3": 3600,   # 1 hour
    "P4": 14400,  # 4 hours
}
def create_incident(alert: dict) -> dict:
    """
    Ingest an alert and create or update an incident.

    alert: {"source":"prometheus","severity":"critical","title":"...",
            "labels":{"alertname":"HighErrorRate","service":"payments"}, ...}

    "labels", "title" and "source" are required keys (KeyError otherwise);
    "severity" is optional and defaults to P3 when missing, null, not a
    string, or not present in SEVERITY_MAP.

    Returns {"incident_id", "action": "appended"} when an open incident with
    the same fingerprint already exists, otherwise
    {"incident_id", "action": "created", "severity"}.
    """
    # Read labels once so the fingerprint, runbook lookup and team lookup all
    # agree; a null labels value is treated as an empty label set.
    labels = alert["labels"] or {}
    fingerprint = _compute_fingerprint(labels)

    # A null or non-string severity must not crash ingestion.
    raw_severity = alert.get("severity", "warning")
    if isinstance(raw_severity, str):
        severity = SEVERITY_MAP.get(raw_severity.lower(), "P3")
    else:
        severity = "P3"

    # dedup: find open incident with same fingerprint
    existing = db.fetchone(
        "SELECT id, status FROM Incident WHERE alert_fingerprint=%s AND status NOT IN ('resolved','closed')",
        (fingerprint,)
    )
    if existing:
        # append to existing incident's timeline
        _add_event(existing["id"], "alert_received", payload={"alert": alert})
        return {"incident_id": existing["id"], "action": "appended"}

    # create new incident
    runbook_url = _lookup_runbook(labels)
    incident_id = db.execute(
        """INSERT INTO Incident (title, severity, alert_source, alert_fingerprint, runbook_url)
           VALUES (%s, %s, %s, %s, %s)""",
        (alert["title"], severity, alert["source"], fingerprint, runbook_url)
    )
    _add_event(incident_id, "alert_received", payload={"alert": alert})

    # assign on-call engineer
    team = labels.get("team", "platform")
    oncall = _get_oncall_user(team)
    if oncall:
        db.execute("UPDATE Incident SET assignee_id=%s WHERE id=%s", (oncall["user_id"], incident_id))
        _add_event(incident_id, "assigned", payload={"assignee_id": oncall["user_id"]})

    try:
        # page the on-call (no-op when nobody is on call)
        _page_oncall(incident_id, severity, oncall, alert["title"])
    finally:
        # Schedule the escalation check even if paging raised: a failed page
        # is exactly the case where escalation is needed most. The paging
        # error still propagates to the caller.
        _schedule_escalation_check(incident_id, severity)

    return {"incident_id": incident_id, "action": "created", "severity": severity}
def escalate_incident(incident_id: int) -> dict:
    """
    Called by a background job when ACK timeout expires.
    Looks up the escalation policy and pages the next target.

    Only incidents still in status 'open' are escalated; anything already
    acknowledged, mitigated, resolved or closed is skipped.

    Returns a dict whose "action" is one of "skipped" (with "reason"),
    "no_policy", "max_escalation_reached" or "escalated" (with "step",
    "target" and "paged_user", which is None when no user could be resolved
    for the step's target).
    """
    incident = db.fetchone(
        "SELECT id, severity, status, ack_at, opened_at, assignee_id FROM Incident WHERE id=%s",
        (incident_id,)
    )
    if not incident:
        return {"action": "skipped", "reason": "not_found"}
    # 'mitigated' must be skipped too: someone is already working the
    # incident, so only a still-'open' incident is unacknowledged.
    if incident["status"] != "open":
        return {"action": "skipped", "reason": incident["status"]}

    # how many escalation steps have already fired?
    escalation_events = db.fetchall(
        "SELECT id FROM IncidentEvent WHERE incident_id=%s AND event_type='escalated'",
        (incident_id,)
    )
    step_index = len(escalation_events)

    policy = db.fetchone(
        "SELECT steps FROM EscalationPolicy WHERE severity=%s LIMIT 1",
        (incident["severity"],)
    )
    if not policy:
        return {"action": "no_policy"}

    # The JSON column may come back as text or already decoded.
    steps = json.loads(policy["steps"]) if isinstance(policy["steps"], str) else policy["steps"]
    if step_index >= len(steps):
        return {"action": "max_escalation_reached"}

    step = steps[step_index]
    target_type = step["target"]

    # NOTE(review): the Incident row carries no team, so on-call lookups are
    # fixed to the "platform" team even if the incident was created for
    # another team -- confirm whether the team should be stored.
    if target_type == "manager":
        oncall = _get_manager(incident["assignee_id"])
    else:
        role = "secondary" if target_type == "oncall_secondary" else "primary"
        oncall = _get_oncall_user("platform", role=role)

    paged_user = oncall["user_id"] if oncall else None
    if oncall:
        pager.page(
            user_id=paged_user,
            subject=f"[ESCALATED] Incident #{incident_id} - {incident['severity']} unacknowledged",
            channel="sms" if incident["severity"] == "P1" else "push",
        )

    # Record the step even when nobody could be resolved for it; otherwise
    # the event count never advances and every later run retries this same
    # step, so the remaining steps of the policy are never reached.
    _add_event(incident_id, "escalated", payload={
        "step": step_index,
        "target": target_type,
        "paged_user": paged_user,
    })
    return {
        "action": "escalated",
        "step": step_index,
        "target": target_type,
        "paged_user": paged_user,
    }
def acknowledge_incident(incident_id: int, user_id: int) -> None:
    """Mark an open incident as acknowledged by *user_id*.

    Sets status to 'acknowledged', stamps ack_at with the current UTC time,
    makes the acknowledging user the assignee and records an 'acknowledged'
    timeline event.

    Does nothing when the incident does not exist or is no longer 'open':
    acknowledging again must not overwrite ack_at (which would distort
    time-to-acknowledge), and acknowledging a resolved/closed incident must
    not move it back to an active status.
    """
    current = db.fetchone("SELECT status FROM Incident WHERE id=%s", (incident_id,))
    if not current or current["status"] != "open":
        return

    now = datetime.now(timezone.utc)
    # The status predicate is repeated in the UPDATE so a concurrent status
    # change between the read above and this write cannot be overwritten.
    db.execute(
        "UPDATE Incident SET status='acknowledged', ack_at=%s, assignee_id=%s "
        "WHERE id=%s AND status='open'",
        (now, user_id, incident_id)
    )
    _add_event(incident_id, "acknowledged", actor_id=user_id)
def resolve_incident(incident_id: int, user_id: int, summary: str = "") -> str:
    """Resolve an incident and return its Markdown post-mortem template.

    For an incident that is not yet 'resolved' or 'closed', sets status to
    'resolved', stamps resolved_at with the current UTC time and records a
    'resolved' timeline event carrying *summary*.

    An incident that is already resolved or closed is left untouched, so
    resolved_at is not overwritten and a closed incident is not reopened; the
    template is still generated and returned. For an unknown incident id no
    event is written and template generation raises.
    """
    current = db.fetchone("SELECT status FROM Incident WHERE id=%s", (incident_id,))
    if current and current["status"] not in ("resolved", "closed"):
        now = datetime.now(timezone.utc)
        # The status predicate is repeated in the UPDATE to stay correct if
        # the status changes between the read above and this write.
        db.execute(
            "UPDATE Incident SET status='resolved', resolved_at=%s "
            "WHERE id=%s AND status NOT IN ('resolved','closed')",
            (now, incident_id)
        )
        _add_event(incident_id, "resolved", actor_id=user_id, payload={"summary": summary})
    return generate_postmortem_template(incident_id)
def generate_postmortem_template(incident_id: int) -> str:
    """Generate a Markdown post-mortem template from the incident timeline.

    Loads the incident row and its events (oldest first) and fills in the
    title, severity, total duration (resolved_at - opened_at), time to
    acknowledge (ack_at - opened_at), runbook link and a one-line-per-event
    timeline. Metrics whose timestamps are missing are left blank.

    Raises:
        ValueError: if no incident with *incident_id* exists.
    """
    incident = db.fetchone("SELECT * FROM Incident WHERE id=%s", (incident_id,))
    if not incident:
        raise ValueError(f"Incident {incident_id} not found")
    events = db.fetchall(
        "SELECT event_type, actor_id, payload, recorded_at FROM IncidentEvent WHERE incident_id=%s ORDER BY recorded_at ASC",
        (incident_id,)
    )

    opened_at = incident["opened_at"]
    duration = ""
    if incident["resolved_at"] and opened_at:
        duration = str(incident["resolved_at"] - opened_at)
    time_to_ack = ""
    if incident["ack_at"] and opened_at:
        time_to_ack = str(incident["ack_at"] - opened_at)

    timeline_lines = []
    for event in events:
        recorded_at = event["recorded_at"]
        # Some drivers may hand timestamps back as plain strings.
        if hasattr(recorded_at, "strftime"):
            ts = recorded_at.strftime("%H:%M:%S")
        else:
            ts = str(recorded_at)
        timeline_lines.append(f"- `{ts}` **{event['event_type']}**")
    # One timeline entry per line; joining on a bare "n" would glue all
    # entries into a single line.
    timeline_md = "\n".join(timeline_lines)

    runbook = incident["runbook_url"] or "N/A"

    template = f"""# Post-Mortem: {incident['title']}
**Severity:** {incident['severity']}
**Duration:** {duration}
**Time to Acknowledge:** {time_to_ack}
**Runbook:** {runbook}
## Summary
[1-2 sentence description of what happened and the user impact]
## Timeline
{timeline_md}
## Root Cause
[Describe the root cause]
## Contributing Factors
- [Factor 1]
- [Factor 2]
## Resolution
[What action resolved the incident]
## Action Items
| Action | Owner | Due Date |
|--------|-------|----------|
| [TODO] | @ | YYYY-MM-DD |
## Lessons Learned
[What went well, what could be improved]
"""
    return template
def _compute_fingerprint(labels: dict) -> str:
    """Return a 16-hex-character fingerprint for an alert's label set.

    The labels are serialized as JSON with sorted keys so that key order does
    not affect the result, hashed with SHA-256, and the hex digest is cut to
    its first 16 characters.
    """
    serialized = json.dumps(labels, sort_keys=True).encode()
    digest = hashlib.sha256(serialized).hexdigest()
    return digest[:16]
def _get_oncall_user(team: str, role: str = "primary") -> dict | None:
    """Return {"user_id": ...} for the person currently on call for *team*.

    Picks the most recently started rotation that has begun and has not yet
    ended (an open-ended rotation has rotation_end NULL). Within that row:

    * an active manual override wins regardless of *role* (an override with
      no override_until never expires);
    * role="secondary" returns the secondary user when one is set;
    * otherwise the primary user is returned.

    Returns None when the team has no current rotation.
    """
    now = datetime.now(timezone.utc)
    row = db.fetchone(
        """SELECT primary_user_id, secondary_user_id, override_user_id, override_until
           FROM OnCallSchedule
           WHERE team=%s
             AND rotation_start <= %s
             AND (rotation_end IS NULL OR rotation_end > %s)
           ORDER BY rotation_start DESC LIMIT 1""",
        (team, now, now)
    )
    if not row:
        return None

    if row["override_user_id"]:
        override_until = row["override_until"]
        # DATETIME columns usually come back as naive datetimes, and comparing
        # a naive value with the aware `now` raises TypeError.
        # NOTE(review): this assumes stored schedule times are UTC -- confirm.
        if override_until is not None and override_until.tzinfo is None:
            override_until = override_until.replace(tzinfo=timezone.utc)
        if not override_until or override_until > now:
            return {"user_id": row["override_user_id"]}

    if role == "secondary" and row["secondary_user_id"]:
        return {"user_id": row["secondary_user_id"]}
    return {"user_id": row["primary_user_id"]}
def _get_manager(user_id: int | None) -> dict | None:
if not user_id:
return None
row = db.fetchone("SELECT manager_id FROM UserProfile WHERE user_id=%s", (user_id,))
return {"user_id": row["manager_id"]} if row and row["manager_id"] else None
def _page_oncall(incident_id: int, severity: str, oncall: dict | None, title: str) -> None:
if not oncall:
return
channel = "sms" if severity == "P1" else "push"
pager.page(user_id=oncall["user_id"],
subject=f"[{severity}] Incident #{incident_id}: {title}",
channel=channel)
def _lookup_runbook(labels: dict) -> str | None:
    """Return the runbook URL registered for the alert's service, if any.

    The lookup key is the "service" label; when that is missing or empty the
    "alertname" label is used instead (empty string if absent too).
    """
    lookup_key = labels.get("service")
    if not lookup_key:
        lookup_key = labels.get("alertname", "")
    match = db.fetchone("SELECT url FROM RunbookIndex WHERE service=%s LIMIT 1", (lookup_key,))
    if match:
        return match["url"]
    return None
def _schedule_escalation_check(incident_id: int, severity: str) -> None:
    """Placeholder for scheduling the delayed escalation check.

    Computes the acknowledgement timeout for *severity* (3600 seconds when
    the severity is not in ACK_TIMEOUT) but does not enqueue anything yet.
    """
    countdown = ACK_TIMEOUT.get(severity, 3600)
    # In production: enqueue a delayed job (Celery beat, SQS delay, etc.)
    #   escalate_incident.apply_async((incident_id,), countdown=countdown)
    return None
def _add_event(incident_id: int, event_type: str, actor_id: int = None, payload: dict = None) -> None:
    """Insert one row into the incident's timeline (IncidentEvent).

    *actor_id* stays NULL for system-generated events. *payload* is stored
    as JSON text; a missing or empty payload is stored as NULL.
    """
    encoded_payload = None
    if payload:
        encoded_payload = json.dumps(payload)
    db.execute(
        "INSERT INTO IncidentEvent (incident_id, event_type, actor_id, payload) VALUES (%s,%s,%s,%s)",
        (incident_id, event_type, actor_id, encoded_payload)
    )
Escalation Policy
Each severity level has an EscalationPolicy with an ordered list of steps. Each step specifies a delay (seconds after the previous step) and a target (oncall_primary, oncall_secondary, manager). A background job (Celery beat, SQS delayed message) fires when each step's delay elapses. The job checks the incident's status at execution time, so if the incident has already been acknowledged by then, the escalation is skipped.
Post-Mortem Generation
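On resolution, the system assembles the full IncidentEvent timeline and computes key metrics: total duration, time-to-acknowledge (TTA), time-to-mitigate (TTM). The implementation above computes duration and TTA; TTM additionally requires recording when mitigation was applied. These are interpolated into a Markdown template with empty sections for root cause, contributing factors, and action items, which the incident commander fills in. The template is then persisted as a document and linked back to the incident record; in the code above it is returned to the caller, which is responsible for storing it.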
See also: Scale AI Interview Guide 2026: Data Infrastructure, RLHF Pipelines, and ML Engineering
See also: Anthropic Interview Guide 2026: Process, Questions, and AI Safety