Low-Level Design: Workflow Engine – DAG Execution, Step Retry, and State Persistence (2025)

A workflow engine executes directed acyclic graphs (DAGs) of steps, handling parallelism, retries, and failure propagation. This is a common LLD interview topic at Atlassian, Stripe, and Databricks. Think: Airflow, Temporal, GitHub Actions, or AWS Step Functions internals.

1. Core Entities

WorkflowDefinition

definition_id    UUID PRIMARY KEY
name             VARCHAR
version          INT
dag              JSONB
-- dag structure:
-- {
--   "nodes": [{"step_name": "fetch_data", "max_retries": 3, "timeout_s": 60}, ...],
--   "edges": [{"from": "fetch_data", "to": "process_data", "condition": null}, ...]
-- }
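
The dag blob should be validated as acyclic when a definition is registered, before any run is created. A minimal sketch using Kahn's algorithm (the helper name is illustrative, not part of the schema above):

```python
from collections import defaultdict, deque

def is_acyclic(dag):
    """Kahn's algorithm: the graph is a valid DAG iff every node can be popped."""
    indegree = {n['step_name']: 0 for n in dag['nodes']}
    children = defaultdict(list)
    for e in dag['edges']:
        indegree[e['to']] += 1
        children[e['from']].append(e['to'])
    queue = deque(step for step, d in indegree.items() if d == 0)
    visited = 0
    while queue:
        step = queue.popleft()
        visited += 1
        for child in children[step]:
            indegree[child] -= 1
            if indegree[child] == 0:
                queue.append(child)
    # If a cycle exists, its members never reach indegree 0 and stay unvisited.
    return visited == len(indegree)

dag = {"nodes": [{"step_name": "a"}, {"step_name": "b"}],
       "edges": [{"from": "a", "to": "b"}]}
print(is_acyclic(dag))  # True
```

Rejecting cyclic definitions at write time is much cheaper than detecting a wedged run at execution time.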

WorkflowRun

run_id           UUID PRIMARY KEY
definition_id    UUID REFERENCES workflow_definitions
status           ENUM('PENDING','RUNNING','COMPLETED','FAILED','CANCELLED')
input            JSONB
output           JSONB
started_at       TIMESTAMPTZ
completed_at     TIMESTAMPTZ
sla_seconds      INT

StepRun

step_run_id      UUID PRIMARY KEY
run_id           UUID REFERENCES workflow_runs
step_name        VARCHAR
status           ENUM('PENDING','RUNNING','COMPLETED','FAILED','SKIPPED')
input            JSONB
output           JSONB
attempt_count    INT DEFAULT 0
started_at       TIMESTAMPTZ
completed_at     TIMESTAMPTZ
error_message    TEXT
worker_id        VARCHAR
last_heartbeat   TIMESTAMPTZ

2. DAG Execution Engine

Readiness Check to Find Runnable Steps

Rather than computing a full topological sort up front, the engine repeatedly asks which PENDING steps have every dependency COMPLETED, an incremental topological traversal:

def get_runnable_steps(dag, step_statuses):
    """Return step names where all dependencies are COMPLETED."""
    runnable = []
    for node in dag['nodes']:
        step = node['step_name']
        if step_statuses.get(step) != 'PENDING':
            continue
        deps = [e['from'] for e in dag['edges'] if e['to'] == step]
        if all(step_statuses.get(d) == 'COMPLETED' for d in deps):
            runnable.append(step)
    return runnable
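
Walking the function through a diamond-shaped DAG makes the behavior concrete (the function body is repeated verbatim so the snippet runs standalone; the step names are illustrative):

```python
def get_runnable_steps(dag, step_statuses):
    """Return step names where all dependencies are COMPLETED."""
    runnable = []
    for node in dag['nodes']:
        step = node['step_name']
        if step_statuses.get(step) != 'PENDING':
            continue
        deps = [e['from'] for e in dag['edges'] if e['to'] == step]
        if all(step_statuses.get(d) == 'COMPLETED' for d in deps):
            runnable.append(step)
    return runnable

diamond = {
    "nodes": [{"step_name": s} for s in ("fetch", "clean", "score", "merge")],
    "edges": [{"from": "fetch", "to": "clean"},
              {"from": "fetch", "to": "score"},
              {"from": "clean", "to": "merge"},
              {"from": "score", "to": "merge"}],
}
statuses = {"fetch": "COMPLETED", "clean": "PENDING",
            "score": "PENDING", "merge": "PENDING"}

# Both middle steps unblock together; the join stays blocked.
print(get_runnable_steps(diamond, statuses))  # ['clean', 'score']
```

Once both branches complete, a subsequent call returns the join step `merge`.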

Dispatch Loop

def run_workflow(run_id, dag):
    while True:
        # Refresh from the DB each pass: workers update step statuses out-of-band.
        step_statuses = load_step_statuses(run_id)
        runnable = get_runnable_steps(dag, step_statuses)
        if not runnable:
            # Nothing runnable and nothing in flight: the run is over, either
            # fully terminal or blocked forever by an upstream failure.
            if 'RUNNING' not in step_statuses.values():
                break
            time.sleep(1)  # wait for in-flight steps
            continue
        for step in runnable:
            claim_and_dispatch(run_id, step)   # atomic DB claim + queue push

Steps with satisfied dependencies execute in parallel – multiple workers pick from the queue concurrently.
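
The fan-out/join shape is easy to see in miniature. This standalone sketch (toy step functions, not the real dispatcher) runs two independent branches of a diamond on a thread pool and joins on both before the final step:

```python
from concurrent.futures import ThreadPoolExecutor

def fetch():
    return "raw"

def clean(x):
    return x.upper()

def score(x):
    return len(x)

def merge(a, b):
    return (a, b)

with ThreadPoolExecutor(max_workers=2) as pool:
    raw = fetch()
    # clean and score share a single dependency (fetch),
    # so they are dispatched concurrently.
    f_clean = pool.submit(clean, raw)
    f_score = pool.submit(score, raw)
    # merge is the join step: it blocks until both branches finish.
    result = merge(f_clean.result(), f_score.result())

print(result)  # ('RAW', 3)
```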

3. Retry Logic with Exponential Backoff

import random

BASE_DELAY = 2  # seconds

def on_step_failure(step_run, max_retries=3):
    # attempt_count was already incremented when the worker claimed the step,
    # so it is 1 after the first failed attempt.
    if step_run['attempt_count'] <= max_retries:
        delay = BASE_DELAY ** step_run['attempt_count']   # 2s, 4s, 8s, ...
        jitter = random.uniform(0, delay * 0.1)           # up to +10% to avoid thundering herds
        schedule_retry(step_run['step_run_id'], delay + jitter)
    else:
        mark_step_failed(step_run['step_run_id'])
        propagate_failure_to_workflow(step_run['run_id'])
        send_alert(step_run)

Retry schedule for BASE_DELAY = 2, max_retries = 3:

  Failed attempt     Delay before retry
  1 (first retry)    ~2 s
  2                  ~4 s
  3                  ~8 s
  4 (exhausted)      none – marked FAILED
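
The deterministic part of the schedule (jitter excluded) is just the exponential from the handler above, extracted here as a standalone helper for illustration:

```python
BASE_DELAY = 2  # seconds, matching the retry handler above

def retry_delay(attempt_count, base=BASE_DELAY):
    """Backoff before the next retry, jitter excluded: base ** attempt_count."""
    return base ** attempt_count

print([retry_delay(a) for a in (1, 2, 3)])  # [2, 4, 8]
```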

4. Exactly-Once Semantics

Atomic Step Claiming

-- Worker atomically claims a step before executing
UPDATE step_runs
SET status = 'RUNNING',
    worker_id = :worker_id,
    started_at = NOW(),
    attempt_count = attempt_count + 1
WHERE step_run_id = :step_run_id
  AND status = 'PENDING'
RETURNING *;
-- If 0 rows returned, another worker claimed it - abort.
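
The same compare-and-set idea can be simulated in memory: several threads race to claim one step, and exactly one wins. This is a stand-in for the `WHERE status = 'PENDING'` guard above, not the real DB code:

```python
import threading

class StepRow:
    """In-memory stand-in for a step_runs row with an atomic claim."""

    def __init__(self):
        self._lock = threading.Lock()
        self.status = 'PENDING'
        self.worker_id = None

    def claim(self, worker_id):
        # Mirrors UPDATE ... WHERE status = 'PENDING': check and set atomically.
        with self._lock:
            if self.status != 'PENDING':
                return False
            self.status = 'RUNNING'
            self.worker_id = worker_id
            return True

row = StepRow()
results = {}
threads = [threading.Thread(target=lambda w=w: results.__setitem__(w, row.claim(w)))
           for w in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sum(results.values()))  # 1 -- exactly one worker won the claim
```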

Idempotent Completion

UPDATE step_runs
SET status = 'COMPLETED',
    output = :output,
    completed_at = NOW()
WHERE step_run_id = :step_run_id
  AND worker_id = :worker_id   -- only the claiming worker can complete it
  AND status = 'RUNNING';

Heartbeat and Timeout Detection

Workers emit a heartbeat every 30 seconds:

UPDATE step_runs SET last_heartbeat = NOW()
WHERE step_run_id = :step_run_id AND worker_id = :worker_id;

A separate timeout detector (runs every 60s) reclaims stalled steps:

UPDATE step_runs
SET status = 'PENDING', worker_id = NULL, last_heartbeat = NULL
WHERE status = 'RUNNING'
  AND last_heartbeat < NOW() - INTERVAL '90 seconds';
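
The 90-second threshold is three missed 30-second heartbeats, which tolerates one lost ping without a false reclaim. The staleness test itself is trivial; a minimal sketch (helper name is illustrative):

```python
from datetime import datetime, timedelta, timezone

HEARTBEAT_INTERVAL = timedelta(seconds=30)
STALL_THRESHOLD = 3 * HEARTBEAT_INTERVAL  # 90 s: three missed heartbeats

def is_stalled(last_heartbeat, now):
    """True when a RUNNING step has gone quiet long enough to reclaim."""
    return last_heartbeat < now - STALL_THRESHOLD

now = datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
assert not is_stalled(now - timedelta(seconds=60), now)   # one missed beat: OK
assert is_stalled(now - timedelta(seconds=120), now)      # reclaim
```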

5. Conditional Branching

A step’s output can include a branch key. The engine evaluates edge conditions to decide which downstream steps stay enabled and which are skipped. One subtlety: the readiness check requires dependencies to be COMPLETED, so the engine must either propagate SKIPPED transitively to downstream steps or treat SKIPPED dependencies as satisfied; otherwise a skipped branch deadlocks the run:

def resolve_branches(dag, step_name, step_output, step_statuses):
    """Skip downstream steps whose edge condition doesn't match the branch key."""
    branch = step_output.get('branch')
    for edge in dag['edges']:
        if edge['from'] != step_name:
            continue
        condition = edge.get('condition')
        # A null condition always matches; a matching condition keeps the
        # downstream step enabled (it simply stays PENDING).
        if condition is not None and condition != branch:
            step_statuses[edge['to']] = 'SKIPPED'

DAG edge definition example:

{
  "from": "validate_input",
  "to": "process_premium",
  "condition": "premium"
}
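
Putting the two together, here is the edge above plus a standard-tier sibling fed through the branch resolver (function repeated so the snippet runs standalone; step names are illustrative):

```python
def resolve_branches(dag, step_name, step_output, step_statuses):
    """Skip downstream steps whose edge condition doesn't match the branch key."""
    branch = step_output.get('branch')
    for edge in dag['edges']:
        if edge['from'] != step_name:
            continue
        condition = edge.get('condition')
        if condition is not None and condition != branch:
            step_statuses[edge['to']] = 'SKIPPED'

dag = {"nodes": [], "edges": [
    {"from": "validate_input", "to": "process_premium", "condition": "premium"},
    {"from": "validate_input", "to": "process_standard", "condition": "standard"},
]}
statuses = {"process_premium": "PENDING", "process_standard": "PENDING"}

resolve_branches(dag, "validate_input", {"branch": "premium"}, statuses)
print(statuses)
# {'process_premium': 'PENDING', 'process_standard': 'SKIPPED'}
```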

6. Observability

Gantt Chart Data

SELECT step_name, started_at, completed_at, status, attempt_count
FROM step_runs
WHERE run_id = :run_id
ORDER BY started_at;

The UI renders this as a Gantt chart, visualizing parallelism and bottlenecks in the workflow execution.

SLA Breach Alerting

SELECT run_id, definition_id, started_at
FROM workflow_runs
WHERE status NOT IN ('COMPLETED','FAILED','CANCELLED')
  AND started_at + (sla_seconds * INTERVAL '1 second') < NOW();

Alert fires when a running workflow has exceeded its SLA without completing. Page on-call; do not auto-cancel (the workflow may be close to finishing).

Design Tradeoffs

  • Polling vs event-driven dispatch: Polling is simpler but adds latency. Event-driven (workers emit a “step completed” event that triggers the scheduler) is faster but more complex. Most production systems use a hybrid: event-driven with a polling fallback.
  • DB-backed queue vs external queue: Storing step_runs in Postgres with SELECT FOR UPDATE SKIP LOCKED is simple and consistent. Kafka/SQS scales better but requires idempotency at the application layer.
  • Timeout granularity: Per-step timeouts (stored in WorkflowDefinition) vs workflow-level SLA. Both are needed – per-step for fast failure, SLA for business-level alerting.

Interview Tips

  • Draw the state machine for StepRun status transitions – interviewers will probe edge cases (e.g. worker dies mid-execution).
  • The heartbeat + timeout detector pattern is the standard answer here: it gives at-least-once execution, and the idempotent completion update upgrades it to effectively-once, all without a distributed lock service.
  • Conditional branching is a common follow-up – have the edge condition schema ready.
  • Mention Temporal or Airflow as prior art to show awareness, then explain what you would build differently.

FAQ

Q: How does a workflow engine determine which steps are ready to execute?
A: The engine performs a topological readiness check: a step is ready when all of its dependencies have status COMPLETED (or SKIPPED, if skipped dependencies are treated as satisfied). After each step completes, the engine re-evaluates readiness and enqueues newly unblocked steps immediately.

Q: How are parallel branches and joins handled?
A: A parallel step fans out by enqueuing all child steps simultaneously. A join step depends on every parallel branch and only becomes ready once each branch has finished. Outputs from all branches are merged into the join step’s input context.

Q: How does conditional routing work in a DAG-based workflow engine?
A: A decision step evaluates an expression (e.g. CEL or JSONPath) against the current run context. Based on the result, it leaves one branch’s entry step PENDING and marks all other branch entry steps SKIPPED. Downstream steps that depend on skipped steps are transitively skipped.

Q: How does a workflow engine handle long-running steps without false timeouts?
A: Long-running steps send periodic heartbeats to renew their timeout lease. If heartbeats stop arriving within the expected window, the watchdog treats the step as timed out and re-executes it on another worker, subject to the step’s retry policy.
