Approval Workflow System Low-Level Design: Multi-Step Routing, Delegation, Escalation, and Audit Trail

Approval Workflow System: Low-Level Design

An approval workflow routes a request — a purchase order, a content post, a contract, a refund — through one or more human approvers before it takes effect. The system must track who approved or rejected at each step, enforce ordering (step 2 cannot start until step 1 is approved), handle delegation (approver is on vacation), send notifications, and provide an audit trail. This design covers the workflow definition model, the state machine for approval routing, and the delegation/escalation logic.

Core Data Model

-- Workflow template: defines steps, approvers, and ordering
CREATE TABLE WorkflowTemplate (
    template_id    SERIAL PRIMARY KEY,
    template_key   VARCHAR(100) UNIQUE NOT NULL,  -- 'purchase_order', 'content_publish'
    description    TEXT,
    is_active      BOOLEAN NOT NULL DEFAULT TRUE,
    created_at     TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE TABLE WorkflowStep (
    step_id        SERIAL PRIMARY KEY,
    template_id    INT NOT NULL REFERENCES WorkflowTemplate(template_id),
    step_order     SMALLINT NOT NULL,
    step_name      VARCHAR(200) NOT NULL,          -- 'Manager Approval', 'Finance Review'
    approver_type  VARCHAR(30) NOT NULL,           -- role, user, dynamic, any_of
    approver_value VARCHAR(200),                   -- role name, user_id, or expression
    required_count SMALLINT NOT NULL DEFAULT 1,    -- # approvals needed (for any_of)
    sla_hours      INT NOT NULL DEFAULT 48,        -- escalate after this many hours
    UNIQUE (template_id, step_order)
);

-- A specific request moving through a workflow
CREATE TABLE WorkflowInstance (
    instance_id    BIGSERIAL PRIMARY KEY,
    template_id    INT NOT NULL REFERENCES WorkflowTemplate(template_id),
    entity_type    VARCHAR(100) NOT NULL,   -- 'purchase_order', 'blog_post'
    entity_id      BIGINT NOT NULL,
    requested_by   BIGINT NOT NULL,
    status         VARCHAR(20) NOT NULL DEFAULT 'pending',
        -- pending, in_progress, approved, rejected, cancelled
    current_step_id INT REFERENCES WorkflowStep(step_id),
    metadata       JSONB NOT NULL DEFAULT '{}',  -- context for dynamic approver resolution
    created_at     TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    completed_at   TIMESTAMPTZ,
    UNIQUE (entity_type, entity_id)  -- one active workflow per entity
);

CREATE TABLE ApprovalDecision (
    decision_id    BIGSERIAL PRIMARY KEY,
    instance_id    BIGINT NOT NULL REFERENCES WorkflowInstance(instance_id),
    step_id        INT NOT NULL REFERENCES WorkflowStep(step_id),
    approver_id    BIGINT NOT NULL,
    decision       VARCHAR(20) NOT NULL,   -- approved, rejected, delegated
    comment        TEXT,
    delegated_to   BIGINT,                 -- if decision=delegated
    decided_at     TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE TABLE WorkflowNotification (
    notification_id BIGSERIAL PRIMARY KEY,
    instance_id    BIGINT NOT NULL,
    step_id        INT NOT NULL,
    recipient_id   BIGINT NOT NULL,
    notification_type VARCHAR(30) NOT NULL,  -- assigned, reminder, escalated, decided
    sent_at        TIMESTAMPTZ,
    due_at         TIMESTAMPTZ NOT NULL       -- for SLA tracking
);

CREATE INDEX ON WorkflowInstance(status, current_step_id) WHERE status='in_progress';
CREATE INDEX ON WorkflowNotification(due_at) WHERE sent_at IS NULL;
CREATE INDEX ON ApprovalDecision(instance_id, step_id);

Workflow State Machine

import datetime
from typing import Optional, List

def start_workflow(template_key: str, entity_type: str, entity_id: int,
                   requested_by: int, metadata: dict = None) -> int:
    """
    Create a new workflow instance and activate the first step.
    """
    template = db.fetchone(
        "SELECT template_id FROM WorkflowTemplate WHERE template_key=%s AND is_active=TRUE",
        (template_key,)
    )
    if not template:
        raise ValueError(f"No active template: {template_key}")

    first_step = db.fetchone("""
        SELECT step_id FROM WorkflowStep
        WHERE template_id=%s ORDER BY step_order ASC LIMIT 1
    """, (template['template_id'],))

    instance_id = db.fetchone("""
        INSERT INTO WorkflowInstance
            (template_id, entity_type, entity_id, requested_by, status,
             current_step_id, metadata)
        VALUES (%s, %s, %s, %s, 'in_progress', %s, %s)
        RETURNING instance_id
    """, (
        template['template_id'], entity_type, entity_id, requested_by,
        first_step['step_id'] if first_step else None,
        json.dumps(metadata or {})
    ))['instance_id']

    if first_step:
        _activate_step(instance_id, first_step['step_id'])

    return instance_id

def _activate_step(instance_id: int, step_id: int):
    """Assign approvers, send notifications, set SLA deadline."""
    step = db.fetchone("""
        SELECT * FROM WorkflowStep WHERE step_id=%s
    """, (step_id,))
    instance = db.fetchone(
        "SELECT * FROM WorkflowInstance WHERE instance_id=%s", (instance_id,)
    )

    approvers = _resolve_approvers(step, instance)
    due_at = datetime.datetime.utcnow() + datetime.timedelta(hours=step['sla_hours'])

    for approver_id in approvers:
        db.execute("""
            INSERT INTO WorkflowNotification
                (instance_id, step_id, recipient_id, notification_type, due_at)
            VALUES (%s, %s, %s, 'assigned', %s)
        """, (instance_id, step_id, approver_id, due_at))
        _send_notification(approver_id, instance_id, step_id, 'assigned')

def _resolve_approvers(step: dict, instance: dict) -> List[int]:
    """
    Resolve approver_type to a list of user IDs.
    dynamic: evaluate an expression against instance metadata.
    """
    t = step['approver_type']
    val = step['approver_value']

    if t == 'user':
        return [int(val)]
    elif t == 'role':
        rows = db.fetchall("SELECT user_id FROM UserRole WHERE role=%s", (val,))
        return [r['user_id'] for r in rows]
    elif t == 'dynamic':
        # val is a dotted path into instance.metadata, e.g. "manager_id"
        meta = instance['metadata'] or {}
        resolved = meta.get(val)
        return [int(resolved)] if resolved else []
    elif t == 'any_of':
        # val is a comma-separated list of user_ids; any required_count can approve
        return [int(uid) for uid in val.split(',')]
    return []

Approve / Reject

def decide(instance_id: int, step_id: int, approver_id: int,
           decision: str, comment: str = None, delegate_to: int = None):
    """
    Record an approval decision and advance the workflow.
    decision: 'approved', 'rejected', or 'delegated'
    """
    instance = db.fetchone(
        "SELECT * FROM WorkflowInstance WHERE instance_id=%s AND status='in_progress'",
        (instance_id,)
    )
    if not instance or instance['current_step_id'] != step_id:
        raise WorkflowStateError("Instance not at this step")

    # Verify this approver is valid for the step
    step = db.fetchone("SELECT * FROM WorkflowStep WHERE step_id=%s", (step_id,))
    valid_approvers = _resolve_approvers(step, instance)
    if approver_id not in valid_approvers:
        # Check delegations: was this approver delegated to?
        delegated = db.fetchone("""
            SELECT 1 FROM ApprovalDecision
            WHERE instance_id=%s AND step_id=%s AND decision='delegated'
              AND delegated_to=%s
        """, (instance_id, step_id, approver_id))
        if not delegated:
            raise PermissionError(f"User {approver_id} is not an approver for this step")

    db.execute("""
        INSERT INTO ApprovalDecision
            (instance_id, step_id, approver_id, decision, comment, delegated_to)
        VALUES (%s, %s, %s, %s, %s, %s)
    """, (instance_id, step_id, approver_id, decision, comment, delegate_to))

    if decision == 'rejected':
        db.execute("""
            UPDATE WorkflowInstance SET status='rejected', completed_at=NOW()
            WHERE instance_id=%s
        """, (instance_id,))
        _notify_requester(instance_id, 'rejected', comment)

    elif decision == 'delegated':
        # Add delegated user as a valid approver for this step
        _send_notification(delegate_to, instance_id, step_id, 'assigned')

    elif decision == 'approved':
        # Check if required_count approvals are met
        approval_count = db.fetchone("""
            SELECT COUNT(*) AS cnt FROM ApprovalDecision
            WHERE instance_id=%s AND step_id=%s AND decision='approved'
        """, (instance_id, step_id))['cnt']

        if approval_count >= step['required_count']:
            _advance_workflow(instance_id, step_id)

def _advance_workflow(instance_id: int, completed_step_id: int):
    """Move to the next step or mark the workflow complete."""
    current_step = db.fetchone("SELECT * FROM WorkflowStep WHERE step_id=%s", (completed_step_id,))

    next_step = db.fetchone("""
        SELECT step_id FROM WorkflowStep
        WHERE template_id=%s AND step_order > %s
        ORDER BY step_order ASC LIMIT 1
    """, (current_step['template_id'], current_step['step_order']))

    if next_step:
        db.execute("""
            UPDATE WorkflowInstance SET current_step_id=%s WHERE instance_id=%s
        """, (next_step['step_id'], instance_id))
        _activate_step(instance_id, next_step['step_id'])
    else:
        # All steps approved — workflow complete
        db.execute("""
            UPDATE WorkflowInstance SET status='approved', completed_at=NOW()
            WHERE instance_id=%s
        """, (instance_id,))
        _notify_requester(instance_id, 'approved', None)
        _execute_workflow_action(instance_id)

def _execute_workflow_action(instance_id: int):
    """Trigger the downstream action after full approval."""
    instance = db.fetchone("SELECT * FROM WorkflowInstance WHERE instance_id=%s", (instance_id,))
    # Enqueue a job to actually perform the approved action
    enqueue('execute_approved_workflow', {
        'entity_type': instance['entity_type'],
        'entity_id': instance['entity_id'],
    })

Key Design Decisions

  • Template-driven steps: approval logic lives in WorkflowTemplate/WorkflowStep rows, not in code. Adding a new approval level (e.g., adding a CFO review above $50K) is a database insert, not a code deploy. The dynamic approver_type lets the approver be resolved at runtime from the entity’s context (e.g., the submitting user’s manager, fetched from instance.metadata.manager_id).
  • required_count for consensus approvals: an any_of step with required_count=2 and 5 listed approvers requires any 2 of the 5 to approve. This models committee approvals, quorum decisions, and dual-control requirements (two-person integrity for financial transactions).
  • Delegation as a decision type: rather than modifying the approver list, delegation is recorded as a decision record. The delegated user is checked against ApprovalDecision.delegated_to in the permission check. This preserves the audit trail: the original approver delegated, to whom, and when.
  • SLA notification queue: WorkflowNotification.due_at enables a cron job to query WHERE sent_at IS NULL AND due_at <= NOW() and send reminder or escalation notifications. Escalation: after SLA expires, re-assign the step to the approver’s manager (resolve from HR system) and send an escalation alert.

Approval workflow and content review system design is discussed in Atlassian system design interview questions.

Approval workflow and merchant review system design is covered in Shopify system design interview preparation.

Approval workflow and compliance review design is discussed in Coinbase system design interview guide.

See also: Scale AI Interview Guide 2026: Data Infrastructure, RLHF Pipelines, and ML Engineering

See also: Stripe Interview Guide 2026: Process, Bug Bash Round, and Payment Systems

Scroll to Top