Approval Workflow System: Low-Level Design
An approval workflow routes a request — a purchase order, a content post, a contract, a refund — through one or more human approvers before it takes effect. The system must track who approved or rejected at each step, enforce ordering (step 2 cannot start until step 1 is approved), handle delegation (approver is on vacation), send notifications, and provide an audit trail. This design covers the workflow definition model, the state machine for approval routing, and the delegation/escalation logic.
Core Data Model
-- Workflow template: defines steps, approvers, and ordering
-- One row per kind of approvable request. WorkflowStep rows attach to a
-- template through template_id, and start_workflow() looks a template up by
-- template_key.
CREATE TABLE WorkflowTemplate (
template_id SERIAL PRIMARY KEY,
template_key VARCHAR(100) UNIQUE NOT NULL, -- 'purchase_order', 'content_publish'
description TEXT,
-- start_workflow() only selects templates where is_active = TRUE.
is_active BOOLEAN NOT NULL DEFAULT TRUE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- One approval step of a template. Steps are visited in ascending step_order;
-- the (template_id, step_order) pair is unique within a template.
CREATE TABLE WorkflowStep (
step_id SERIAL PRIMARY KEY,
template_id INT NOT NULL REFERENCES WorkflowTemplate(template_id),
-- _advance_workflow() picks the next step as the lowest step_order greater
-- than the completed step's step_order.
step_order SMALLINT NOT NULL,
step_name VARCHAR(200) NOT NULL, -- 'Manager Approval', 'Finance Review'
-- approver_type selects how _resolve_approvers() interprets approver_value:
--   user    -> a single user_id
--   role    -> a role name looked up in UserRole
--   dynamic -> a key into WorkflowInstance.metadata whose value is a user_id
--   any_of  -> a comma-separated list of user_ids
approver_type VARCHAR(30) NOT NULL, -- role, user, dynamic, any_of
approver_value VARCHAR(200), -- role name, user_id, or expression
-- decide() advances the workflow once the step has at least this many
-- 'approved' decisions.
required_count SMALLINT NOT NULL DEFAULT 1, -- # approvals needed (for any_of)
-- _activate_step() adds this many hours to the activation time to compute
-- WorkflowNotification.due_at.
sla_hours INT NOT NULL DEFAULT 48, -- escalate after this many hours
UNIQUE (template_id, step_order)
);
-- A specific request moving through a workflow
CREATE TABLE WorkflowInstance (
    instance_id BIGSERIAL PRIMARY KEY,
    template_id INT NOT NULL REFERENCES WorkflowTemplate(template_id),
    entity_type VARCHAR(100) NOT NULL, -- 'purchase_order', 'blog_post'
    entity_id BIGINT NOT NULL,
    requested_by BIGINT NOT NULL,
    status VARCHAR(20) NOT NULL DEFAULT 'pending',
    -- pending, in_progress, approved, rejected, cancelled
    current_step_id INT REFERENCES WorkflowStep(step_id),
    metadata JSONB NOT NULL DEFAULT '{}', -- context for dynamic approver resolution
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    completed_at TIMESTAMPTZ
);
-- One *active* workflow per entity. A table-level UNIQUE (entity_type, entity_id)
-- would also reject a new instance for an entity whose earlier workflow already
-- finished (approved, rejected, cancelled), so uniqueness is restricted to the
-- non-terminal statuses with a partial unique index.
CREATE UNIQUE INDEX workflowinstance_one_active_per_entity
    ON WorkflowInstance (entity_type, entity_id)
    WHERE status IN ('pending', 'in_progress');
-- One row per decision recorded by decide() against a step of an instance.
CREATE TABLE ApprovalDecision (
decision_id BIGSERIAL PRIMARY KEY,
instance_id BIGINT NOT NULL REFERENCES WorkflowInstance(instance_id),
step_id INT NOT NULL REFERENCES WorkflowStep(step_id),
approver_id BIGINT NOT NULL,
decision VARCHAR(20) NOT NULL, -- approved, rejected, delegated
comment TEXT,
-- decide() treats a user as authorised for a step when a 'delegated' row for
-- that instance and step names them here.
delegated_to BIGINT, -- if decision=delegated
decided_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Per-recipient notification record for a workflow step. _activate_step()
-- inserts one 'assigned' row per resolved approver with due_at set to the
-- step's SLA deadline.
CREATE TABLE WorkflowNotification (
    notification_id BIGSERIAL PRIMARY KEY,
    -- Foreign keys mirror ApprovalDecision so a notification cannot point at
    -- an instance or step that does not exist.
    instance_id BIGINT NOT NULL REFERENCES WorkflowInstance(instance_id),
    step_id INT NOT NULL REFERENCES WorkflowStep(step_id),
    recipient_id BIGINT NOT NULL,
    notification_type VARCHAR(30) NOT NULL, -- assigned, reminder, escalated, decided
    sent_at TIMESTAMPTZ,
    due_at TIMESTAMPTZ NOT NULL -- for SLA tracking
);
-- Partial index restricted to in-flight instances, keyed by status and current step.
CREATE INDEX ON WorkflowInstance(status, current_step_id) WHERE status='in_progress';
-- Partial index over rows with no sent_at, keyed by due_at (the SLA deadline).
CREATE INDEX ON WorkflowNotification(due_at) WHERE sent_at IS NULL;
-- Matches the (instance_id, step_id) filters decide() uses for its delegation
-- check and its approval count.
CREATE INDEX ON ApprovalDecision(instance_id, step_id);
Workflow State Machine
import datetime
import json
from typing import Optional, List
def start_workflow(template_key: str, entity_type: str, entity_id: int,
                   requested_by: int, metadata: Optional[dict] = None) -> int:
    """
    Create a new workflow instance and activate the first step.

    Args:
        template_key: WorkflowTemplate.template_key of the template to run;
            only templates with is_active=TRUE are considered.
        entity_type: kind of entity being approved (stored on the instance).
        entity_id: identifier of the entity being approved.
        requested_by: user id of the requester.
        metadata: optional context stored as JSON on the instance and read
            later when resolving 'dynamic' approvers. Defaults to {}.

    Returns:
        The instance_id of the newly inserted WorkflowInstance.

    Raises:
        ValueError: if no active template matches template_key, or the
            template has no steps.
    """
    template = db.fetchone(
        "SELECT template_id FROM WorkflowTemplate WHERE template_key=%s AND is_active=TRUE",
        (template_key,)
    )
    if not template:
        raise ValueError(f"No active template: {template_key}")

    first_step = db.fetchone("""
        SELECT step_id FROM WorkflowStep
        WHERE template_id=%s ORDER BY step_order ASC LIMIT 1
    """, (template['template_id'],))
    if not first_step:
        # Without a first step the instance would be inserted as 'in_progress'
        # with a NULL current_step_id; decide() rejects any step_id that does
        # not equal current_step_id, so such an instance could never finish.
        # Fail before writing anything.
        raise ValueError(f"Template has no steps: {template_key}")

    instance_id = db.fetchone("""
        INSERT INTO WorkflowInstance
            (template_id, entity_type, entity_id, requested_by, status,
             current_step_id, metadata)
        VALUES (%s, %s, %s, %s, 'in_progress', %s, %s)
        RETURNING instance_id
    """, (
        template['template_id'], entity_type, entity_id, requested_by,
        first_step['step_id'],
        json.dumps(metadata or {}),
    ))['instance_id']

    _activate_step(instance_id, first_step['step_id'])
    return instance_id
def _activate_step(instance_id: int, step_id: int) -> None:
    """
    Assign approvers, send notifications, set SLA deadline.

    Loads the step and the instance, resolves the step's approvers, and for
    each approver inserts an 'assigned' WorkflowNotification row (due_at =
    now + step.sla_hours) and sends the notification.

    Args:
        instance_id: the WorkflowInstance whose step is being activated.
        step_id: the WorkflowStep to activate.
    """
    step = db.fetchone("""
        SELECT * FROM WorkflowStep WHERE step_id=%s
    """, (step_id,))
    instance = db.fetchone(
        "SELECT * FROM WorkflowInstance WHERE instance_id=%s", (instance_id,)
    )
    approvers = _resolve_approvers(step, instance)
    # NOTE(review): if no approvers resolve, nothing is inserted or sent and
    # the step has nobody assigned — confirm whether that should be an error.

    # due_at is a TIMESTAMPTZ column. A naive utcnow() value would be read in
    # the database session's time zone, so build a timezone-aware UTC value.
    now_utc = datetime.datetime.now(datetime.timezone.utc)
    due_at = now_utc + datetime.timedelta(hours=step['sla_hours'])

    for approver_id in approvers:
        db.execute("""
            INSERT INTO WorkflowNotification
                (instance_id, step_id, recipient_id, notification_type, due_at)
            VALUES (%s, %s, %s, 'assigned', %s)
        """, (instance_id, step_id, approver_id, due_at))
        _send_notification(approver_id, instance_id, step_id, 'assigned')
def _resolve_approvers(step: dict, instance: dict) -> List[int]:
"""
Resolve approver_type to a list of user IDs.
dynamic: evaluate an expression against instance metadata.
"""
t = step['approver_type']
val = step['approver_value']
if t == 'user':
return [int(val)]
elif t == 'role':
rows = db.fetchall("SELECT user_id FROM UserRole WHERE role=%s", (val,))
return [r['user_id'] for r in rows]
elif t == 'dynamic':
# val is a dotted path into instance.metadata, e.g. "manager_id"
meta = instance['metadata'] or {}
resolved = meta.get(val)
return [int(resolved)] if resolved else []
elif t == 'any_of':
# val is a comma-separated list of user_ids; any required_count can approve
return [int(uid) for uid in val.split(',')]
return []
Approve / Reject
def decide(instance_id: int, step_id: int, approver_id: int,
           decision: str, comment: Optional[str] = None,
           delegate_to: Optional[int] = None):
    """
    Record an approval decision and advance the workflow.

    Args:
        instance_id: the WorkflowInstance being decided on; must be
            'in_progress'.
        step_id: the step the decision applies to; must be the instance's
            current step.
        approver_id: the user deciding. Must be one of the step's resolved
            approvers, or a user the step was delegated to.
        decision: 'approved', 'rejected', or 'delegated'.
        comment: optional free text stored with the decision and passed to
            the requester notification on rejection.
        delegate_to: user receiving the delegation. Required when decision is
            'delegated'; ignored (stored as NULL) for other decisions.

    Raises:
        ValueError: decision is not a known value, or 'delegated' was given
            without delegate_to.
        WorkflowStateError: the instance is not in progress at step_id.
        PermissionError: approver_id may not decide on this step.
    """
    # Validate arguments before touching the database so a typo such as
    # 'approve' is not stored as a decision that advances nothing.
    if decision not in ('approved', 'rejected', 'delegated'):
        raise ValueError(f"Unknown decision: {decision!r}")
    if decision == 'delegated':
        if delegate_to is None:
            raise ValueError("delegate_to is required when decision is 'delegated'")
    else:
        # delegated_to is only meaningful on 'delegated' rows.
        delegate_to = None

    instance = db.fetchone(
        "SELECT * FROM WorkflowInstance WHERE instance_id=%s AND status='in_progress'",
        (instance_id,)
    )
    if not instance or instance['current_step_id'] != step_id:
        raise WorkflowStateError("Instance not at this step")

    # Verify this approver is valid for the step
    step = db.fetchone("SELECT * FROM WorkflowStep WHERE step_id=%s", (step_id,))
    valid_approvers = _resolve_approvers(step, instance)
    if approver_id not in valid_approvers:
        # Check delegations: was this approver delegated to?
        delegated = db.fetchone("""
            SELECT 1 FROM ApprovalDecision
            WHERE instance_id=%s AND step_id=%s AND decision='delegated'
              AND delegated_to=%s
        """, (instance_id, step_id, approver_id))
        if not delegated:
            raise PermissionError(f"User {approver_id} is not an approver for this step")

    db.execute("""
        INSERT INTO ApprovalDecision
            (instance_id, step_id, approver_id, decision, comment, delegated_to)
        VALUES (%s, %s, %s, %s, %s, %s)
    """, (instance_id, step_id, approver_id, decision, comment, delegate_to))

    if decision == 'rejected':
        db.execute("""
            UPDATE WorkflowInstance SET status='rejected', completed_at=NOW()
            WHERE instance_id=%s
        """, (instance_id,))
        _notify_requester(instance_id, 'rejected', comment)
    elif decision == 'delegated':
        # The row inserted above is what authorises the delegate; just tell them.
        _send_notification(delegate_to, instance_id, step_id, 'assigned')
    else:  # 'approved'
        # Count distinct approvers: with COUNT(*) one user approving twice
        # would satisfy required_count=2 on their own.
        approval_count = db.fetchone("""
            SELECT COUNT(DISTINCT approver_id) AS cnt FROM ApprovalDecision
            WHERE instance_id=%s AND step_id=%s AND decision='approved'
        """, (instance_id, step_id))['cnt']
        if approval_count >= step['required_count']:
            _advance_workflow(instance_id, step_id)
def _advance_workflow(instance_id: int, completed_step_id: int):
    """
    Move to the next step or mark the workflow complete.

    The next step is the one with the lowest step_order greater than the
    completed step's, within the same template. If there is none, the
    instance is marked 'approved', the requester is notified and the
    downstream action is enqueued.

    Both transitions are conditional on the instance still being
    'in_progress' at completed_step_id. If it is not (for example a second
    call for the same completed step), nothing is updated and no
    notifications or jobs are emitted.

    Args:
        instance_id: the WorkflowInstance to advance.
        completed_step_id: the step that has just been fully approved.
    """
    current_step = db.fetchone(
        "SELECT * FROM WorkflowStep WHERE step_id=%s", (completed_step_id,)
    )
    next_step = db.fetchone("""
        SELECT step_id FROM WorkflowStep
        WHERE template_id=%s AND step_order > %s
        ORDER BY step_order ASC LIMIT 1
    """, (current_step['template_id'], current_step['step_order']))

    if next_step:
        # RETURNING reports whether the guarded UPDATE matched a row.
        moved = db.fetchone("""
            UPDATE WorkflowInstance SET current_step_id=%s
            WHERE instance_id=%s AND current_step_id=%s AND status='in_progress'
            RETURNING instance_id
        """, (next_step['step_id'], instance_id, completed_step_id))
        if not moved:
            return
        _activate_step(instance_id, next_step['step_id'])
    else:
        # All steps approved — workflow complete
        finished = db.fetchone("""
            UPDATE WorkflowInstance SET status='approved', completed_at=NOW()
            WHERE instance_id=%s AND current_step_id=%s AND status='in_progress'
            RETURNING instance_id
        """, (instance_id, completed_step_id))
        if not finished:
            # Already completed or otherwise moved on: do not notify or
            # enqueue the downstream action a second time.
            return
        _notify_requester(instance_id, 'approved', None)
        _execute_workflow_action(instance_id)
def _execute_workflow_action(instance_id: int):
    """
    Enqueue the downstream job for a fully approved workflow.

    Reads the instance row and enqueues an 'execute_approved_workflow' job
    carrying the instance's entity_type and entity_id.
    """
    row = db.fetchone("SELECT * FROM WorkflowInstance WHERE instance_id=%s", (instance_id,))
    # The job only needs to know which entity was approved.
    payload = {field: row[field] for field in ('entity_type', 'entity_id')}
    enqueue('execute_approved_workflow', payload)
Key Design Decisions
- Template-driven steps: approval logic lives in WorkflowTemplate/WorkflowStep rows, not in code. Adding a new approval level (e.g., adding a CFO review above $50K) is a database insert, not a code deploy. The dynamic approver_type lets the approver be resolved at runtime from the entity’s context (e.g., the submitting user’s manager, fetched from instance.metadata.manager_id).
- required_count for consensus approvals: an any_of step with required_count=2 and 5 listed approvers requires any 2 of the 5 to approve. This models committee approvals, quorum decisions, and dual-control requirements (two-person integrity for financial transactions).
- Delegation as a decision type: rather than modifying the approver list, delegation is recorded as a decision record. The delegated user is checked against ApprovalDecision.delegated_to in the permission check. This preserves the audit trail: the original approver delegated, to whom, and when.
- SLA notification queue: WorkflowNotification.due_at enables a cron job to query WHERE sent_at IS NULL AND due_at <= NOW() and send reminder or escalation notifications. Escalation: after SLA expires, re-assign the step to the approver’s manager (resolve from HR system) and send an escalation alert.
{"@context":"https://schema.org","@type":"FAQPage","mainEntity":[
{"@type":"Question","name":"How do you enforce that an approver cannot approve their own request?","acceptedAnswer":{"@type":"Answer","text":"Self-approval prevention: when resolving approvers for a step, filter out the requesting user. In _resolve_approvers(), after building the approvers list, check: if instance['requested_by'] in approvers and len(approvers) > 1: approvers.remove(instance['requested_by']). If the requester is the only available approver (e.g., the only user in a role), escalate to their manager rather than allowing self-approval. Record this as an auto-escalation in WorkflowNotification. For the decide() function: add an explicit check at the top — if approver_id == instance['requested_by']: raise SelfApprovalError(). Log the attempt in the audit trail. This enforcement is mandatory for financial controls (purchase orders, expense reimbursements) where self-approval would constitute a control bypass. For compliance reporting: query SELECT COUNT(*) FROM ApprovalDecision d WHERE d.approver_id = (SELECT wi.requested_by FROM WorkflowInstance wi WHERE wi.instance_id = d.instance_id) — should always return zero."}},
{"@type":"Question","name":"How do you handle an approver who is on vacation or unavailable?","acceptedAnswer":{"@type":"Answer","text":"Three mechanisms: (1) Out-of-office delegation: a user sets a delegation in their profile (delegate_to_user_id, active_from, active_until). When resolving approvers, check if the resolved approver has an active delegation: if yes, add the delegate to the approver list. Both the original approver and the delegate can approve. (2) Manual delegation via the decide() decision='delegated' path: the approver explicitly reassigns the step to another user mid-workflow. (3) SLA-based escalation: if the WorkflowNotification.due_at passes without a decision, the escalation job auto-assigns the step to the approver’s manager (fetched from HR system or user.manager_id). Log escalation in WorkflowNotification with notification_type='escalated'. Design principle: always have a fallback escalation path so that no workflow can be permanently blocked by a single unavailable approver."}},
{"@type":"Question","name":"How do you implement conditional branching in a workflow (approve if amount < $1000, different path if above)?","acceptedAnswer":{"@type":"Answer","text":"Conditional routing adds a condition column to WorkflowStep: a JSON expression evaluated at runtime against the instance metadata. Example: step with approver_type='user', approver_value='123' and condition={'amount_lt': 1000} is only activated if instance.metadata.amount < 1000. Implementation in _advance_workflow(): after the current step completes, query all steps with step_order > current where condition is NULL (unconditional) or condition evaluates TRUE against instance.metadata. If two steps have the same step_order (parallel branches), activate both. A simpler approach: use separate workflow templates per path — WorkflowTemplate for 'purchase_small' (<$1K, 1-step) and 'purchase_large' (≥$1K, 2-step). The calling code selects the template based on the amount at start_workflow() time. Template selection at the entry point is easier to reason about than runtime branching within a single template."}},
{"@type":"Question","name":"How do you build an audit trail that satisfies SOX or ISO 27001 compliance requirements?","acceptedAnswer":{"@type":"Answer","text":"Compliance audit trails require: (1) immutability — records cannot be modified after creation; (2) completeness — every state transition, decision, and notification must be logged; (3) non-repudiation — records must prove who did what and when. Implementation: ApprovalDecision rows are INSERT-only — no UPDATE, no DELETE (enforce via table-level privileges: GRANT SELECT, INSERT ON ApprovalDecision TO app_user; no UPDATE or DELETE grants). Add a signature column: HMAC-SHA256(decision_id + approver_id + decision + decided_at, signing_key) — the signature proves the record was not altered after the fact. For export: audit reports are queried as SELECT di.*, wi.entity_type, wi.entity_id, wi.requested_by, ws.step_name FROM ApprovalDecision di JOIN WorkflowInstance wi USING(instance_id) JOIN WorkflowStep ws USING(step_id) WHERE wi.entity_type=%s AND wi.entity_id=%s ORDER BY di.decided_at. This provides the full chain of custody for any entity that passed through a workflow."}},
{"@type":"Question","name":"How do you handle a workflow that needs to be restarted after a rejection?","acceptedAnswer":{"@type":"Answer","text":"After a rejection, the entity (purchase order, content post) may be revised and resubmitted for approval. Two patterns: (1) New instance: always create a new WorkflowInstance for a resubmission. The old instance’s status remains 'rejected' in the audit trail; the new instance starts fresh at step 1. Link them via a predecessor_instance_id foreign key for lineage tracking. (2) Reset and restart: UPDATE WorkflowInstance SET status='in_progress', current_step_id=$first_step_id, completed_at=NULL; delete WorkflowNotification rows for the prior run; call _activate_step again. Simpler but loses rejection history in the instance record. Pattern 1 is preferred for compliance — the rejection event is permanently recorded and the resubmission is a distinct, traceable event. The entity (e.g., PurchaseOrder) has a workflow_instance_id foreign key pointing to the most recent active instance; past instances are queryable by entity_id."}}
]}
Approval workflow and content review system design is discussed in Atlassian system design interview questions.
Approval workflow and merchant review system design is covered in Shopify system design interview preparation.
Approval workflow and compliance review design is discussed in Coinbase system design interview guide.
See also: Scale AI Interview Guide 2026: Data Infrastructure, RLHF Pipelines, and ML Engineering
See also: Stripe Interview Guide 2026: Process, Bug Bash Round, and Payment Systems