Approval Workflow System: Low-Level Design
An approval workflow routes a request — a purchase order, a content post, a contract, a refund — through one or more human approvers before it takes effect. The system must track who approved or rejected at each step, enforce ordering (step 2 cannot start until step 1 is approved), handle delegation (approver is on vacation), send notifications, and provide an audit trail. This design covers the workflow definition model, the state machine for approval routing, and the delegation/escalation logic.
Core Data Model
-- Workflow template: defines steps, approvers, and ordering
CREATE TABLE WorkflowTemplate (
    template_id SERIAL PRIMARY KEY,
    template_key VARCHAR(100) UNIQUE NOT NULL, -- 'purchase_order', 'content_publish'
    description TEXT,
    is_active BOOLEAN NOT NULL DEFAULT TRUE,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE WorkflowStep (
    step_id SERIAL PRIMARY KEY,
    template_id INT NOT NULL REFERENCES WorkflowTemplate(template_id),
    step_order SMALLINT NOT NULL,
    step_name VARCHAR(200) NOT NULL, -- 'Manager Approval', 'Finance Review'
    approver_type VARCHAR(30) NOT NULL, -- role, user, dynamic, any_of
    approver_value VARCHAR(200), -- role name, user_id, or expression
    required_count SMALLINT NOT NULL DEFAULT 1, -- # approvals needed (for any_of)
    sla_hours INT NOT NULL DEFAULT 48, -- escalate after this many hours
    UNIQUE (template_id, step_order)
);
-- A specific request moving through a workflow
CREATE TABLE WorkflowInstance (
    instance_id BIGSERIAL PRIMARY KEY,
    template_id INT NOT NULL REFERENCES WorkflowTemplate(template_id),
    entity_type VARCHAR(100) NOT NULL, -- 'purchase_order', 'blog_post'
    entity_id BIGINT NOT NULL,
    requested_by BIGINT NOT NULL,
    status VARCHAR(20) NOT NULL DEFAULT 'pending',
    -- pending, in_progress, approved, rejected, cancelled
    current_step_id INT REFERENCES WorkflowStep(step_id),
    metadata JSONB NOT NULL DEFAULT '{}', -- context for dynamic approver resolution
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    completed_at TIMESTAMPTZ
);
-- One active workflow per entity. A table-level UNIQUE (entity_type, entity_id)
-- would also block resubmission after a rejection or cancellation, so the rule
-- is enforced only for instances that are still open.
CREATE UNIQUE INDEX ON WorkflowInstance(entity_type, entity_id)
    WHERE status IN ('pending', 'in_progress');
CREATE TABLE ApprovalDecision (
    decision_id BIGSERIAL PRIMARY KEY,
    instance_id BIGINT NOT NULL REFERENCES WorkflowInstance(instance_id),
    step_id INT NOT NULL REFERENCES WorkflowStep(step_id),
    approver_id BIGINT NOT NULL,
    decision VARCHAR(20) NOT NULL, -- approved, rejected, delegated
    comment TEXT,
    delegated_to BIGINT, -- if decision=delegated
    decided_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE WorkflowNotification (
    notification_id BIGSERIAL PRIMARY KEY,
    instance_id BIGINT NOT NULL,
    step_id INT NOT NULL,
    recipient_id BIGINT NOT NULL,
    notification_type VARCHAR(30) NOT NULL, -- assigned, reminder, escalated, decided
    sent_at TIMESTAMPTZ,
    due_at TIMESTAMPTZ NOT NULL -- for SLA tracking
);
CREATE INDEX ON WorkflowInstance(status, current_step_id) WHERE status='in_progress';
CREATE INDEX ON WorkflowNotification(due_at) WHERE sent_at IS NULL;
CREATE INDEX ON ApprovalDecision(instance_id, step_id);
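The "one active workflow per entity" rule is a statement about open instances only: a rejected request should be able to start a fresh workflow. The following is a minimal sketch of how a partial unique index behaves in that case. It uses SQLite's in-memory database as a stand-in for PostgreSQL and a cut-down WorkflowInstance table; the index name and the helper functions build_demo_db and try_start are hypothetical, introduced only for illustration.

```python
import sqlite3


def build_demo_db() -> sqlite3.Connection:
    """Create a cut-down WorkflowInstance table in an in-memory SQLite database."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE WorkflowInstance (
            instance_id INTEGER PRIMARY KEY,
            entity_type TEXT NOT NULL,
            entity_id   INTEGER NOT NULL,
            status      TEXT NOT NULL DEFAULT 'pending'
        );
        -- Uniqueness applies only to rows that are still open.
        CREATE UNIQUE INDEX one_active_workflow_per_entity
            ON WorkflowInstance (entity_type, entity_id)
            WHERE status IN ('pending', 'in_progress');
    """)
    return conn


def try_start(conn: sqlite3.Connection, entity_type: str, entity_id: int) -> bool:
    """Insert an in_progress instance; return False if an open one already exists."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO WorkflowInstance (entity_type, entity_id, status) "
                "VALUES (?, ?, 'in_progress')",
                (entity_type, entity_id),
            )
        return True
    except sqlite3.IntegrityError:
        return False


conn = build_demo_db()
first = try_start(conn, "purchase_order", 42)        # no open workflow yet
duplicate = try_start(conn, "purchase_order", 42)    # an open workflow exists
with conn:
    conn.execute(
        "UPDATE WorkflowInstance SET status='rejected' WHERE entity_id=42"
    )
resubmitted = try_start(conn, "purchase_order", 42)  # previous one is closed
```

In this sketch the second insert is refused while the first instance is open, and a new instance is accepted once the earlier one has been rejected.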
Workflow State Machine
import datetime
import json
from typing import List, Optional

# `db` is assumed to be the application's database helper: it exposes
# fetchone/fetchall/execute and returns rows as dicts.

def start_workflow(template_key: str, entity_type: str, entity_id: int,
                   requested_by: int, metadata: Optional[dict] = None) -> int:
    """
    Create a new workflow instance and activate the first step.
    """
    template = db.fetchone(
        "SELECT template_id FROM WorkflowTemplate WHERE template_key=%s AND is_active=TRUE",
        (template_key,)
    )
    if not template:
        raise ValueError(f"No active template: {template_key}")
    first_step = db.fetchone("""
        SELECT step_id FROM WorkflowStep
        WHERE template_id=%s ORDER BY step_order ASC LIMIT 1
    """, (template['template_id'],))
    if not first_step:
        # Without a first step the instance would sit in 'in_progress' forever
        raise ValueError(f"Template has no steps: {template_key}")
    instance_id = db.fetchone("""
        INSERT INTO WorkflowInstance
            (template_id, entity_type, entity_id, requested_by, status,
             current_step_id, metadata)
        VALUES (%s, %s, %s, %s, 'in_progress', %s, %s)
        RETURNING instance_id
    """, (
        template['template_id'], entity_type, entity_id, requested_by,
        first_step['step_id'],
        json.dumps(metadata or {})
    ))['instance_id']
    _activate_step(instance_id, first_step['step_id'])
    return instance_id
def _activate_step(instance_id: int, step_id: int):
    """Assign approvers, send notifications, set SLA deadline."""
    step = db.fetchone("""
        SELECT * FROM WorkflowStep WHERE step_id=%s
    """, (step_id,))
    instance = db.fetchone(
        "SELECT * FROM WorkflowInstance WHERE instance_id=%s", (instance_id,)
    )
    approvers = _resolve_approvers(step, instance)
    # due_at is TIMESTAMPTZ, so use a timezone-aware UTC timestamp
    now = datetime.datetime.now(datetime.timezone.utc)
    due_at = now + datetime.timedelta(hours=step['sla_hours'])
    for approver_id in approvers:
        db.execute("""
            INSERT INTO WorkflowNotification
                (instance_id, step_id, recipient_id, notification_type, due_at)
            VALUES (%s, %s, %s, 'assigned', %s)
        """, (instance_id, step_id, approver_id, due_at))
        _send_notification(approver_id, instance_id, step_id, 'assigned')
def _resolve_approvers(step: dict, instance: dict) -> List[int]:
    """
    Resolve approver_type to a list of user IDs.
    dynamic: look up a key in the instance metadata.
    """
    t = step['approver_type']
    val = step['approver_value']
    if t == 'user':
        return [int(val)]
    elif t == 'role':
        rows = db.fetchall("SELECT user_id FROM UserRole WHERE role=%s", (val,))
        return [r['user_id'] for r in rows]
    elif t == 'dynamic':
        # val is a top-level key in instance.metadata, e.g. "manager_id"
        meta = instance['metadata'] or {}
        resolved = meta.get(val)
        return [int(resolved)] if resolved is not None else []
    elif t == 'any_of':
        # val is a comma-separated list of user_ids; any required_count can approve
        return [int(uid) for uid in val.split(',')]
    return []
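The dynamic branch above reads a single top-level key from the metadata. If the approver lives deeper in the request context, a dotted path can be walked instead. The following is a minimal sketch under that assumption; lookup_path and resolve_dynamic are hypothetical helpers, not part of the design above, and the "requester.manager_id" layout is only an example.

```python
from typing import Any, List, Optional


def lookup_path(metadata: dict, path: str) -> Optional[Any]:
    """Walk a dotted path such as 'requester.manager_id' through nested dicts.

    Returns None as soon as any segment is missing or a non-dict is reached.
    """
    current: Any = metadata
    for key in path.split("."):
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current


def resolve_dynamic(metadata: dict, path: str) -> List[int]:
    """Return the approver id found at `path`, or an empty list if absent."""
    value = lookup_path(metadata, path)
    return [int(value)] if value is not None else []


nested = {"requester": {"manager_id": "17"}}
flat = {"manager_id": 5}
```

A single-segment path behaves like the plain key lookup, so existing templates whose approver_value is just "manager_id" would keep resolving the same way.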
Approve / Reject
class WorkflowStateError(Exception):
    """Raised when a decision targets an instance that is not at the given step."""

VALID_DECISIONS = ('approved', 'rejected', 'delegated')

def decide(instance_id: int, step_id: int, approver_id: int,
           decision: str, comment: Optional[str] = None,
           delegate_to: Optional[int] = None):
    """
    Record an approval decision and advance the workflow.
    decision: 'approved', 'rejected', or 'delegated'
    """
    if decision not in VALID_DECISIONS:
        raise ValueError(f"Unknown decision: {decision}")
    if decision == 'delegated' and delegate_to is None:
        raise ValueError("delegate_to is required when delegating")
    # Lock the instance row so concurrent decisions are serialized
    # (assumes the caller runs this function inside a single transaction)
    instance = db.fetchone(
        "SELECT * FROM WorkflowInstance WHERE instance_id=%s AND status='in_progress' FOR UPDATE",
        (instance_id,)
    )
    if not instance or instance['current_step_id'] != step_id:
        raise WorkflowStateError("Instance not at this step")
    # Verify this approver is valid for the step
    step = db.fetchone("SELECT * FROM WorkflowStep WHERE step_id=%s", (step_id,))
    valid_approvers = _resolve_approvers(step, instance)
    if approver_id not in valid_approvers:
        # Check delegations: was this approver delegated to?
        delegated = db.fetchone("""
            SELECT 1 FROM ApprovalDecision
            WHERE instance_id=%s AND step_id=%s AND decision='delegated'
              AND delegated_to=%s
        """, (instance_id, step_id, approver_id))
        if not delegated:
            raise PermissionError(f"User {approver_id} is not an approver for this step")
    db.execute("""
        INSERT INTO ApprovalDecision
            (instance_id, step_id, approver_id, decision, comment, delegated_to)
        VALUES (%s, %s, %s, %s, %s, %s)
    """, (instance_id, step_id, approver_id, decision, comment, delegate_to))
    if decision == 'rejected':
        db.execute("""
            UPDATE WorkflowInstance SET status='rejected', completed_at=NOW()
            WHERE instance_id=%s
        """, (instance_id,))
        _notify_requester(instance_id, 'rejected', comment)
    elif decision == 'delegated':
        # The decision row just inserted makes delegate_to a valid approver
        _send_notification(delegate_to, instance_id, step_id, 'assigned')
    elif decision == 'approved':
        # Check if required_count approvals are met; count each approver once
        # so a repeated approval from the same user cannot satisfy a quorum
        approval_count = db.fetchone("""
            SELECT COUNT(DISTINCT approver_id) AS cnt FROM ApprovalDecision
            WHERE instance_id=%s AND step_id=%s AND decision='approved'
        """, (instance_id, step_id))['cnt']
        if approval_count >= step['required_count']:
            _advance_workflow(instance_id, step_id)
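The quorum check for a step can be reasoned about separately from the database. The following is a minimal sketch of that rule as a pure function, assuming each approver should count at most once toward required_count; quorum_met is a hypothetical helper introduced for illustration, and the (approver_id, decision) pairs stand in for ApprovalDecision rows of one step.

```python
from typing import Iterable, Tuple


def quorum_met(decisions: Iterable[Tuple[int, str]], required_count: int) -> bool:
    """Return True once enough distinct approvers have approved.

    decisions: (approver_id, decision) pairs recorded for a single step.
    Delegations and repeated approvals by the same user do not add to the count.
    """
    distinct_approvers = {
        approver_id for approver_id, decision in decisions if decision == "approved"
    }
    return len(distinct_approvers) >= required_count


# Same user approving twice: still only one distinct approver.
double_click = [(1, "approved"), (1, "approved")]
# Two different approvers plus one delegation.
committee = [(1, "approved"), (2, "delegated"), (3, "approved")]
```

Counting distinct approvers matters for dual-control steps: with required_count=2, a duplicate submission from one person should not advance the workflow.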
def _advance_workflow(instance_id: int, completed_step_id: int):
    """Move to the next step or mark the workflow complete."""
    current_step = db.fetchone("SELECT * FROM WorkflowStep WHERE step_id=%s", (completed_step_id,))
    next_step = db.fetchone("""
        SELECT step_id FROM WorkflowStep
        WHERE template_id=%s AND step_order > %s
        ORDER BY step_order ASC LIMIT 1
    """, (current_step['template_id'], current_step['step_order']))
    if next_step:
        db.execute("""
            UPDATE WorkflowInstance SET current_step_id=%s WHERE instance_id=%s
        """, (next_step['step_id'], instance_id))
        _activate_step(instance_id, next_step['step_id'])
    else:
        # All steps approved: workflow complete
        db.execute("""
            UPDATE WorkflowInstance SET status='approved', completed_at=NOW()
            WHERE instance_id=%s
        """, (instance_id,))
        _notify_requester(instance_id, 'approved', None)
        _execute_workflow_action(instance_id)
def _execute_workflow_action(instance_id: int):
    """Trigger the downstream action after full approval."""
    instance = db.fetchone("SELECT * FROM WorkflowInstance WHERE instance_id=%s", (instance_id,))
    # Enqueue a job to actually perform the approved action
    enqueue('execute_approved_workflow', {
        'entity_type': instance['entity_type'],
        'entity_id': instance['entity_id'],
    })
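The functions above move an instance between the status values listed on WorkflowInstance, but the allowed moves are implicit in the code. The following is a minimal sketch that makes them explicit. The transition table is an assumption inferred from the code paths shown (in particular, the 'cancelled' moves are not implemented anywhere above), and ALLOWED_TRANSITIONS and can_transition are hypothetical names.

```python
from typing import Dict, FrozenSet

# Assumed transition table; terminal states have no outgoing edges.
ALLOWED_TRANSITIONS: Dict[str, FrozenSet[str]] = {
    "pending": frozenset({"in_progress", "cancelled"}),
    "in_progress": frozenset({"approved", "rejected", "cancelled"}),
    "approved": frozenset(),
    "rejected": frozenset(),
    "cancelled": frozenset(),
}


def can_transition(current: str, new: str) -> bool:
    """Return True if an instance may move from `current` to `new`."""
    return new in ALLOWED_TRANSITIONS.get(current, frozenset())


def is_terminal(status: str) -> bool:
    """A status is terminal when it is known and nothing can follow it."""
    return status in ALLOWED_TRANSITIONS and not ALLOWED_TRANSITIONS[status]
```

A guard like this could sit in front of every status UPDATE so that, for example, a late decision cannot reopen an instance that has already been rejected.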
Key Design Decisions
- Template-driven steps: approval logic lives in WorkflowTemplate/WorkflowStep rows, not in code. Adding a new approval level (e.g., adding a CFO review above $50K) is a database insert, not a code deploy. The dynamic approver_type lets the approver be resolved at runtime from the entity’s context (e.g., the submitting user’s manager, fetched from instance.metadata.manager_id).
- required_count for consensus approvals: an any_of step with required_count=2 and 5 listed approvers requires any 2 of the 5 to approve. This models committee approvals, quorum decisions, and dual-control requirements (two-person integrity for financial transactions).
- Delegation as a decision type: rather than modifying the approver list, delegation is recorded as a decision record. The delegated user is checked against ApprovalDecision.delegated_to in the permission check. This preserves the audit trail: the original approver delegated, to whom, and when.
- SLA notification queue: WorkflowNotification.due_at lets a cron job query WHERE sent_at IS NULL AND due_at <= NOW() to find steps whose deadline has passed and send reminder or escalation notifications. On escalation, the step is reassigned to the approver’s manager (resolved from the HR system) and an escalation alert is sent.
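The SLA sweep described in the last point can be sketched without a database. The example below is a minimal illustration, assuming notification rows are dicts with the WorkflowNotification columns and that the HR lookup is available as a plain mapping from user to manager; plan_escalations and manager_of are hypothetical names, not part of the design above.

```python
import datetime
from typing import Dict, List, Optional, Tuple

UTC = datetime.timezone.utc


def plan_escalations(
    rows: List[dict],
    manager_of: Dict[int, Optional[int]],
    now: datetime.datetime,
) -> List[Tuple[int, int, int]]:
    """Return (instance_id, step_id, manager_id) for every overdue row.

    Mirrors the filter WHERE sent_at IS NULL AND due_at <= NOW().
    Rows whose recipient has no known manager are skipped.
    """
    plans = []
    for row in rows:
        if row["sent_at"] is not None or row["due_at"] > now:
            continue  # already handled, or still within the SLA
        manager_id = manager_of.get(row["recipient_id"])
        if manager_id is not None:
            plans.append((row["instance_id"], row["step_id"], manager_id))
    return plans


now = datetime.datetime(2024, 1, 3, 12, 0, tzinfo=UTC)
rows = [
    # Overdue by a day: should escalate to user 100's manager.
    {"instance_id": 1, "step_id": 10, "recipient_id": 100,
     "sent_at": None, "due_at": datetime.datetime(2024, 1, 2, 12, 0, tzinfo=UTC)},
    # Due tomorrow: still within the SLA.
    {"instance_id": 2, "step_id": 10, "recipient_id": 101,
     "sent_at": None, "due_at": datetime.datetime(2024, 1, 4, 12, 0, tzinfo=UTC)},
    # Overdue, but the recipient has no manager on record.
    {"instance_id": 3, "step_id": 11, "recipient_id": 102,
     "sent_at": None, "due_at": datetime.datetime(2024, 1, 1, 12, 0, tzinfo=UTC)},
]
manager_of = {100: 900, 101: 900, 102: None}
plans = plan_escalations(rows, manager_of, now)
```

For the manager to be accepted by the permission check, the reassignment would also need to be recorded somewhere that check reads, for example as a delegation row; how to record it is left open here.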