Approval Workflow System Low-Level Design: Multi-Step Routing, Delegation, Escalation, and Audit Trail

An approval workflow routes a request — a purchase order, a content post, a contract, a refund — through one or more human approvers before it takes effect. The system must track who approved or rejected at each step, enforce ordering (step 2 cannot start until step 1 is approved), handle delegation (approver is on vacation), send notifications, and provide an audit trail. This design covers the workflow definition model, the state machine for approval routing, and the delegation/escalation logic.

Core Data Model

-- Workflow template: defines steps, approvers, and ordering
CREATE TABLE WorkflowTemplate (
    template_id    SERIAL PRIMARY KEY,
    template_key   VARCHAR(100) UNIQUE NOT NULL,  -- 'purchase_order', 'content_publish'
    description    TEXT,
    is_active      BOOLEAN NOT NULL DEFAULT TRUE,
    created_at     TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE TABLE WorkflowStep (
    step_id        SERIAL PRIMARY KEY,
    template_id    INT NOT NULL REFERENCES WorkflowTemplate(template_id),
    step_order     SMALLINT NOT NULL,
    step_name      VARCHAR(200) NOT NULL,          -- 'Manager Approval', 'Finance Review'
    approver_type  VARCHAR(30) NOT NULL,           -- role, user, dynamic, any_of
    approver_value VARCHAR(200),                   -- role name, user_id, or expression
    required_count SMALLINT NOT NULL DEFAULT 1,    -- # approvals needed (for any_of)
    sla_hours      INT NOT NULL DEFAULT 48,        -- escalate after this many hours
    UNIQUE (template_id, step_order)
);

-- A specific request moving through a workflow
CREATE TABLE WorkflowInstance (
    instance_id    BIGSERIAL PRIMARY KEY,
    template_id    INT NOT NULL REFERENCES WorkflowTemplate(template_id),
    entity_type    VARCHAR(100) NOT NULL,   -- 'purchase_order', 'blog_post'
    entity_id      BIGINT NOT NULL,
    requested_by   BIGINT NOT NULL,
    status         VARCHAR(20) NOT NULL DEFAULT 'pending',
        -- pending, in_progress, approved, rejected, cancelled
    current_step_id INT REFERENCES WorkflowStep(step_id),
    metadata       JSONB NOT NULL DEFAULT '{}',  -- context for dynamic approver resolution
    created_at     TIMESTAMPTZ NOT NULL DEFAULT NOW(),
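    completed_at   TIMESTAMPTZ
);

-- One active workflow per entity. Finished (approved/rejected/cancelled) instances
-- stay in the table for the audit trail, so the constraint is a partial unique index.
CREATE UNIQUE INDEX ON WorkflowInstance(entity_type, entity_id)
    WHERE status IN ('pending', 'in_progress');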

CREATE TABLE ApprovalDecision (
    decision_id    BIGSERIAL PRIMARY KEY,
    instance_id    BIGINT NOT NULL REFERENCES WorkflowInstance(instance_id),
    step_id        INT NOT NULL REFERENCES WorkflowStep(step_id),
    approver_id    BIGINT NOT NULL,
    decision       VARCHAR(20) NOT NULL,   -- approved, rejected, delegated
    comment        TEXT,
    delegated_to   BIGINT,                 -- if decision=delegated
    decided_at     TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE TABLE WorkflowNotification (
    notification_id BIGSERIAL PRIMARY KEY,
    instance_id    BIGINT NOT NULL,
    step_id        INT NOT NULL,
    recipient_id   BIGINT NOT NULL,
    notification_type VARCHAR(30) NOT NULL,  -- assigned, reminder, escalated, decided
    sent_at        TIMESTAMPTZ,
    due_at         TIMESTAMPTZ NOT NULL       -- for SLA tracking
);

CREATE INDEX ON WorkflowInstance(status, current_step_id) WHERE status='in_progress';
CREATE INDEX ON WorkflowNotification(due_at) WHERE sent_at IS NULL;
CREATE INDEX ON ApprovalDecision(instance_id, step_id);
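
The schema leaves a few invariants to the application, for example that an any_of step cannot require more approvals than it lists approvers. A minimal sketch of a check that could run before inserting WorkflowStep rows follows. StepDef and validate_steps are hypothetical names introduced here for illustration; they are not part of the schema above.

```python
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class StepDef:
    """In-memory mirror of one WorkflowStep row (hypothetical helper type)."""
    step_order: int
    step_name: str
    approver_type: str             # 'role', 'user', 'dynamic', or 'any_of'
    approver_value: Optional[str]
    required_count: int = 1
    sla_hours: int = 48

VALID_TYPES = {"role", "user", "dynamic", "any_of"}

def validate_steps(steps: List[StepDef]) -> List[str]:
    """Return a list of problems; an empty list means no issue was found."""
    problems = []
    if not steps:
        problems.append("template has no steps")
    orders = [s.step_order for s in steps]
    if len(orders) != len(set(orders)):
        problems.append("duplicate step_order")
    for s in steps:
        if s.approver_type not in VALID_TYPES:
            problems.append(f"{s.step_name}: unknown approver_type {s.approver_type!r}")
        if s.required_count < 1 or s.sla_hours < 1:
            problems.append(f"{s.step_name}: required_count and sla_hours must be positive")
        if s.approver_type == "any_of":
            # approver_value is a comma-separated list of user IDs for any_of steps
            listed = [v for v in (s.approver_value or "").split(",") if v.strip()]
            if s.required_count > len(listed):
                problems.append(f"{s.step_name}: required_count exceeds listed approvers")
    return problems
```

Running a check like this when a template is created or edited catches a step that could never complete before any request is routed through it.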

Workflow State Machine

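import datetime
import json
from typing import Optional, List

# db, enqueue, _send_notification, and _notify_requester are assumed to be
# supplied by the application (a DB helper returning dict rows, a job queue,
# and notification senders).

class WorkflowStateError(Exception):
    """Raised when a decision targets a step the instance is not currently on."""

def start_workflow(template_key: str, entity_type: str, entity_id: int,
                   requested_by: int, metadata: Optional[dict] = None) -> int: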
    """
    Create a new workflow instance and activate the first step.
    """
    template = db.fetchone(
        "SELECT template_id FROM WorkflowTemplate WHERE template_key=%s AND is_active=TRUE",
        (template_key,)
    )
    if not template:
        raise ValueError(f"No active template: {template_key}")

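    first_step = db.fetchone("""
        SELECT step_id FROM WorkflowStep
        WHERE template_id=%s ORDER BY step_order ASC LIMIT 1
    """, (template['template_id'],))
    if not first_step:
        # A template without steps would leave the instance stuck in 'in_progress'.
        raise ValueError(f"Template has no steps: {template_key}")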

    instance_id = db.fetchone("""
        INSERT INTO WorkflowInstance
            (template_id, entity_type, entity_id, requested_by, status,
             current_step_id, metadata)
        VALUES (%s, %s, %s, %s, 'in_progress', %s, %s)
        RETURNING instance_id
    """, (
        template['template_id'], entity_type, entity_id, requested_by,
        first_step['step_id'] if first_step else None,
        json.dumps(metadata or {})
    ))['instance_id']

    if first_step:
        _activate_step(instance_id, first_step['step_id'])

    return instance_id

def _activate_step(instance_id: int, step_id: int):
    """Assign approvers, send notifications, set SLA deadline."""
    step = db.fetchone("""
        SELECT * FROM WorkflowStep WHERE step_id=%s
    """, (step_id,))
    instance = db.fetchone(
        "SELECT * FROM WorkflowInstance WHERE instance_id=%s", (instance_id,)
    )

    approvers = _resolve_approvers(step, instance)
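    # Use a timezone-aware UTC timestamp so it maps cleanly onto the TIMESTAMPTZ column
    due_at = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(hours=step['sla_hours'])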

    for approver_id in approvers:
        db.execute("""
            INSERT INTO WorkflowNotification
                (instance_id, step_id, recipient_id, notification_type, due_at)
            VALUES (%s, %s, %s, 'assigned', %s)
        """, (instance_id, step_id, approver_id, due_at))
        _send_notification(approver_id, instance_id, step_id, 'assigned')

def _resolve_approvers(step: dict, instance: dict) -> List[int]:
    """
    Resolve approver_type to a list of user IDs.
    dynamic: evaluate an expression against instance metadata.
    """
    t = step['approver_type']
    val = step['approver_value']

    if t == 'user':
        return [int(val)]
    elif t == 'role':
        rows = db.fetchall("SELECT user_id FROM UserRole WHERE role=%s", (val,))
        return [r['user_id'] for r in rows]
    elif t == 'dynamic':
        # val is a top-level key in instance.metadata, e.g. "manager_id"
        meta = instance['metadata'] or {}
        resolved = meta.get(val)
        return [int(resolved)] if resolved is not None else []
    elif t == 'any_of':
        # val is a comma-separated list of user_ids; any required_count can approve
        return [int(uid) for uid in val.split(',')]
    return []
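
As written, the resolver can return the requester as one of their own approvers. One way to close that gap is a small post-filter applied to the resolver's result, sketched here as a pure function so it can be tested without a database. The names exclude_requester and manager_of are hypothetical, and the manager lookup is an assumption standing in for an HR system or a user.manager_id column.

```python
from typing import Callable, List, Optional

def exclude_requester(approvers: List[int], requested_by: int,
                      manager_of: Callable[[int], Optional[int]]) -> List[int]:
    """Drop the requester from the approver list.

    If that leaves nobody, fall back to the requester's manager.
    manager_of is a hypothetical lookup (e.g. backed by an HR system).
    """
    remaining = [a for a in approvers if a != requested_by]
    if remaining:
        return remaining
    manager = manager_of(requested_by)
    # An empty list signals that the step needs manual intervention.
    if manager is not None and manager != requested_by:
        return [manager]
    return []
```

A caller would wrap the resolver, for example exclude_requester(_resolve_approvers(step, instance), instance['requested_by'], lookup), where lookup is whatever manager lookup the application provides.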

Approve / Reject

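def decide(instance_id: int, step_id: int, approver_id: int,
           decision: str, comment: Optional[str] = None,
           delegate_to: Optional[int] = None):
    """
    Record an approval decision and advance the workflow.
    decision: 'approved', 'rejected', or 'delegated'
    """
    if decision not in ('approved', 'rejected', 'delegated'):
        raise ValueError(f"Unknown decision: {decision}")
    if decision == 'delegated' and delegate_to is None:
        raise ValueError("delegate_to is required when decision='delegated'")

    # Lock the instance row so concurrent decisions on the same step are serialized.
    # Assumes decide() runs inside a single database transaction.
    instance = db.fetchone(
        "SELECT * FROM WorkflowInstance WHERE instance_id=%s AND status='in_progress' FOR UPDATE",
        (instance_id,)
    )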
    if not instance or instance['current_step_id'] != step_id:
        raise WorkflowStateError("Instance not at this step")

    # Verify this approver is valid for the step
    step = db.fetchone("SELECT * FROM WorkflowStep WHERE step_id=%s", (step_id,))
    valid_approvers = _resolve_approvers(step, instance)
    if approver_id not in valid_approvers:
        # Check delegations: was this approver delegated to?
        delegated = db.fetchone("""
            SELECT 1 FROM ApprovalDecision
            WHERE instance_id=%s AND step_id=%s AND decision='delegated'
              AND delegated_to=%s
        """, (instance_id, step_id, approver_id))
        if not delegated:
            raise PermissionError(f"User {approver_id} is not an approver for this step")

    db.execute("""
        INSERT INTO ApprovalDecision
            (instance_id, step_id, approver_id, decision, comment, delegated_to)
        VALUES (%s, %s, %s, %s, %s, %s)
    """, (instance_id, step_id, approver_id, decision, comment, delegate_to))

    if decision == 'rejected':
        db.execute("""
            UPDATE WorkflowInstance SET status='rejected', completed_at=NOW()
            WHERE instance_id=%s
        """, (instance_id,))
        _notify_requester(instance_id, 'rejected', comment)

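    elif decision == 'delegated':
        # The ApprovalDecision row inserted above is what authorizes the delegate
        # (see the delegated_to check earlier in this function); here we only notify them.
        _send_notification(delegate_to, instance_id, step_id, 'assigned')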

    elif decision == 'approved':
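        # Check if required_count approvals are met. Count each approver once so
        # a repeated approval from the same user cannot satisfy a quorum.
        approval_count = db.fetchone("""
            SELECT COUNT(DISTINCT approver_id) AS cnt FROM ApprovalDecision
            WHERE instance_id=%s AND step_id=%s AND decision='approved'
        """, (instance_id, step_id))['cnt']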

        if approval_count >= step['required_count']:
            _advance_workflow(instance_id, step_id)

def _advance_workflow(instance_id: int, completed_step_id: int):
    """Move to the next step or mark the workflow complete."""
    current_step = db.fetchone("SELECT * FROM WorkflowStep WHERE step_id=%s", (completed_step_id,))

    next_step = db.fetchone("""
        SELECT step_id FROM WorkflowStep
        WHERE template_id=%s AND step_order > %s
        ORDER BY step_order ASC LIMIT 1
    """, (current_step['template_id'], current_step['step_order']))

    if next_step:
        db.execute("""
            UPDATE WorkflowInstance SET current_step_id=%s WHERE instance_id=%s
        """, (next_step['step_id'], instance_id))
        _activate_step(instance_id, next_step['step_id'])
    else:
        # All steps approved — workflow complete
        db.execute("""
            UPDATE WorkflowInstance SET status='approved', completed_at=NOW()
            WHERE instance_id=%s
        """, (instance_id,))
        _notify_requester(instance_id, 'approved', None)
        _execute_workflow_action(instance_id)

def _execute_workflow_action(instance_id: int):
    """Trigger the downstream action after full approval."""
    instance = db.fetchone("SELECT * FROM WorkflowInstance WHERE instance_id=%s", (instance_id,))
    # Enqueue a job to actually perform the approved action
    enqueue('execute_approved_workflow', {
        'entity_type': instance['entity_type'],
        'entity_id': instance['entity_id'],
    })
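
The step-completion rule inside decide() can also be isolated as a pure function, which makes the quorum logic easy to test. The sketch below assumes that a rejection by any approver ends the step and that each approver counts at most once. step_outcome is a hypothetical helper, not part of the code above.

```python
from typing import Iterable, Tuple

def step_outcome(decisions: Iterable[Tuple[int, str]], required_count: int) -> str:
    """Summarize the decisions recorded for one instance and step.

    decisions: (approver_id, decision) pairs.
    Returns 'rejected', 'approved', or 'pending'.
    """
    approvers = set()
    for approver_id, decision in decisions:
        if decision == "rejected":
            return "rejected"           # a single rejection ends the workflow
        if decision == "approved":
            approvers.add(approver_id)  # repeat approvals by one user count once
        # 'delegated' rows grant permission to someone else but are not votes
    return "approved" if len(approvers) >= required_count else "pending"
```

For a step with required_count=2, a single user approving twice still yields 'pending'; a second, distinct approver is needed before the workflow advances.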

Key Design Decisions

  • Template-driven steps: approval logic lives in WorkflowTemplate/WorkflowStep rows, not in code. Adding a new approval level (e.g., adding a CFO review above $50K) is a database insert, not a code deploy. The dynamic approver_type lets the approver be resolved at runtime from the entity’s context (e.g., the submitting user’s manager, fetched from instance.metadata.manager_id).
  • required_count for consensus approvals: an any_of step with required_count=2 and 5 listed approvers requires any 2 of the 5 to approve. This models committee approvals, quorum decisions, and dual-control requirements (two-person integrity for financial transactions).
  • Delegation as a decision type: rather than modifying the approver list, delegation is recorded as an ApprovalDecision row. The permission check in decide() accepts any user who appears in delegated_to for that instance and step. This preserves the audit trail, which records who delegated, to whom, and when.
  • SLA notification queue: WorkflowNotification.due_at lets a cron job query WHERE sent_at IS NULL AND due_at <= NOW() and send reminder or escalation notifications. On escalation (the SLA has expired with no decision), the job reassigns the step to the approver’s manager, resolved from the HR system, and sends an escalation alert.
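
The SLA sweep described in the last bullet could look roughly like the following. This is a sketch under stated assumptions: rows are plain dicts shaped like WorkflowNotification, manager_of is a hypothetical in-memory stand-in for the HR lookup, and the function only plans actions without sending anything or touching the database.

```python
import datetime
from typing import Dict, List, Optional

def plan_escalations(pending: List[dict], now: datetime.datetime,
                     manager_of: Dict[int, Optional[int]]) -> List[dict]:
    """Turn overdue, unsent notification rows into escalation actions.

    pending: dicts with instance_id, step_id, recipient_id, due_at, sent_at.
    manager_of: hypothetical mapping from user ID to manager ID.
    """
    actions = []
    for row in pending:
        if row["sent_at"] is not None or row["due_at"] > now:
            continue  # already handled, or still within the SLA
        manager = manager_of.get(row["recipient_id"])
        actions.append({
            "instance_id": row["instance_id"],
            "step_id": row["step_id"],
            # With no manager on record, remind the original approver instead.
            "type": "escalated" if manager is not None else "reminder",
            "recipient_id": manager if manager is not None else row["recipient_id"],
        })
    return actions
```

The cron job would then insert one WorkflowNotification row per action and set sent_at on the processed rows, so that the same overdue row is not picked up on the next run.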

Frequently Asked Questions

How do you enforce that an approver cannot approve their own request?

When resolving approvers for a step, filter out the requesting user. In _resolve_approvers(), after building the approvers list, check: if instance['requested_by'] in approvers and len(approvers) > 1: approvers.remove(instance['requested_by']). If the requester is the only available approver (e.g., the only user in a role), escalate to their manager rather than allowing self-approval, and record this as an auto-escalation in WorkflowNotification. In decide(), add an explicit check right after loading the instance: if approver_id == instance['requested_by']: raise SelfApprovalError(). Log the attempt in the audit trail. This enforcement is mandatory for financial controls (purchase orders, expense reimbursements), where self-approval would constitute a control bypass. For compliance reporting, the query SELECT COUNT(*) FROM ApprovalDecision d JOIN WorkflowInstance wi USING (instance_id) WHERE d.approver_id = wi.requested_by should always return zero.

How do you handle an approver who is on vacation or unavailable?

There are three mechanisms. (1) Out-of-office delegation: a user sets a delegation in their profile (delegate_to_user_id, active_from, active_until). When resolving approvers, check whether the resolved approver has an active delegation; if so, add the delegate to the approver list. Both the original approver and the delegate can then approve. (2) Manual delegation via the decision='delegated' path in decide(): the approver explicitly reassigns the step to another user mid-workflow. (3) SLA-based escalation: if WorkflowNotification.due_at passes without a decision, the escalation job assigns the step to the approver’s manager (fetched from the HR system or user.manager_id) and logs the escalation in WorkflowNotification with notification_type='escalated'. The design principle is to always have a fallback escalation path, so that no workflow can be permanently blocked by a single unavailable approver.

How do you implement conditional branching in a workflow (approve if amount < $1000, different path if above)?

Conditional routing adds a condition column to WorkflowStep: a JSON expression evaluated at runtime against the instance metadata. For example, a step with approver_type='user', approver_value='123', and condition={"amount_lt": 1000} is activated only if instance.metadata.amount < 1000. In _advance_workflow(), after the current step completes, query all steps with step_order greater than the current one where condition is NULL (unconditional) or evaluates to true against instance.metadata. If two steps share the same step_order (parallel branches), activate both; note that this requires relaxing the UNIQUE (template_id, step_order) constraint. A simpler approach is to use separate workflow templates per path: 'purchase_small' (under $1K, one step) and 'purchase_large' ($1K and above, two steps). The calling code selects the template based on the amount when it calls start_workflow(). Template selection at the entry point is easier to reason about than runtime branching within a single template.

How do you build an audit trail that satisfies SOX or ISO 27001 compliance requirements?

Compliance audit trails require (1) immutability: records cannot be modified after creation; (2) completeness: every state transition, decision, and notification is logged; and (3) non-repudiation: records prove who did what and when. ApprovalDecision rows are INSERT-only, with no UPDATE and no DELETE. Enforce this with table privileges: GRANT SELECT, INSERT ON ApprovalDecision TO app_user, and grant neither UPDATE nor DELETE. Add a signature column holding HMAC-SHA256(decision_id + approver_id + decision + decided_at, signing_key); a valid signature shows the record was not altered after the fact. For export, audit reports are queried as SELECT di.*, wi.entity_type, wi.entity_id, wi.requested_by, ws.step_name FROM ApprovalDecision di JOIN WorkflowInstance wi USING (instance_id) JOIN WorkflowStep ws USING (step_id) WHERE wi.entity_type=%s AND wi.entity_id=%s ORDER BY di.decided_at. This provides the full chain of custody for any entity that passed through a workflow.

How do you handle a workflow that needs to be restarted after a rejection?

After a rejection, the entity (purchase order, content post) may be revised and resubmitted for approval. There are two patterns. (1) New instance: always create a new WorkflowInstance for a resubmission. The old instance keeps status 'rejected' in the audit trail, and the new instance starts fresh at step 1. Link them via a predecessor_instance_id foreign key for lineage tracking. (2) Reset and restart: UPDATE WorkflowInstance SET status='in_progress', current_step_id=<first step>, completed_at=NULL for that instance, delete the WorkflowNotification rows from the prior run, and call _activate_step again. This is simpler but loses the rejection history in the instance record. Pattern 1 is preferred for compliance, because the rejection is permanently recorded and the resubmission is a distinct, traceable event. The entity (e.g., PurchaseOrder) has a workflow_instance_id foreign key pointing to the most recent active instance; past instances are queryable by entity_id.
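
The signature column described in the audit-trail answer could be computed along these lines. This is a minimal sketch using Python's standard hmac and hashlib modules; the function names, the '|' separator, the field order, and passing decided_at as an ISO-8601 string are all assumptions of this example, and key storage and rotation are out of scope.

```python
import hashlib
import hmac

def sign_decision(decision_id: int, approver_id: int, decision: str,
                  decided_at: str, signing_key: bytes) -> str:
    """HMAC-SHA256 over the decision's fields, returned as a hex string.

    decided_at is an ISO-8601 string so the signed bytes are stable.
    Fields are joined with '|' so adjacent values cannot run together.
    """
    message = "|".join([str(decision_id), str(approver_id), decision, decided_at])
    return hmac.new(signing_key, message.encode("utf-8"), hashlib.sha256).hexdigest()

def verify_decision(row: dict, signing_key: bytes) -> bool:
    """Recompute the signature for a stored row and compare in constant time."""
    expected = sign_decision(row["decision_id"], row["approver_id"],
                             row["decision"], row["decided_at"], signing_key)
    return hmac.compare_digest(expected, row["signature"])
```

An auditor holding the signing key can run verify_decision over exported rows; any row whose decision, approver, or timestamp was changed after insertion fails the check.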
