Low-Level Design: Workflow Engine – DAG Execution, Step Retry, and State Persistence (2025)

A workflow engine executes directed acyclic graphs (DAGs) of steps, handling parallelism, retries, and failure propagation. This is a common LLD interview topic at Atlassian, Stripe, and Databricks. Think: Airflow, Temporal, GitHub Actions, or AWS Step Functions internals.

1. Core Entities

WorkflowDefinition

definition_id    UUID PRIMARY KEY
name             VARCHAR
version          INT
dag              JSONB
-- dag structure:
-- {
--   "nodes": [{"step_name": "fetch_data", "max_retries": 3, "timeout_s": 60}, ...],
--   "edges": [{"from": "fetch_data", "to": "process_data", "condition": null}, ...]
-- }

WorkflowRun

run_id           UUID PRIMARY KEY
definition_id    UUID REFERENCES workflow_definitions
status           ENUM('PENDING','RUNNING','COMPLETED','FAILED','CANCELLED')
input            JSONB
output           JSONB
started_at       TIMESTAMPTZ
completed_at     TIMESTAMPTZ
sla_seconds      INT

StepRun

step_run_id      UUID PRIMARY KEY
run_id           UUID REFERENCES workflow_runs
step_name        VARCHAR
status           ENUM('PENDING','RUNNING','COMPLETED','FAILED','SKIPPED')
input            JSONB
output           JSONB
attempt_count    INT DEFAULT 0
started_at       TIMESTAMPTZ
completed_at     TIMESTAMPTZ
error_message    TEXT
worker_id        VARCHAR
last_heartbeat   TIMESTAMPTZ

2. DAG Execution Engine

Finding Runnable Steps (Dependency Readiness Check)

def get_runnable_steps(dag, step_statuses):
    """Return step names where all dependencies are COMPLETED."""
    runnable = []
    for node in dag['nodes']:
        step = node['step_name']
        if step_statuses.get(step) != 'PENDING':
            continue
        deps = [e['from'] for e in dag['edges'] if e['to'] == step]
        if all(step_statuses.get(d) == 'COMPLETED' for d in deps):
            runnable.append(step)
    return runnable
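
The function above checks readiness step by step and never computes a full ordering, so it would not notice a cycle: a cyclic definition would just leave steps PENDING forever. One way to guard against that is to validate the definition once, when it is registered. A minimal sketch using Kahn's algorithm, assuming the nodes/edges layout from the WorkflowDefinition section (the name topological_order is illustrative, not part of the original design):

```python
from collections import deque

def topological_order(dag):
    """Return step names in dependency order; raise ValueError on a cycle."""
    indegree = {n['step_name']: 0 for n in dag['nodes']}
    children = {name: [] for name in indegree}
    for edge in dag['edges']:
        children[edge['from']].append(edge['to'])
        indegree[edge['to']] += 1

    # Start with the steps that have no dependencies.
    ready = deque(name for name, deg in indegree.items() if deg == 0)
    order = []
    while ready:
        step = ready.popleft()
        order.append(step)
        for child in children[step]:
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)

    # Any step left unvisited sits on a cycle.
    if len(order) != len(indegree):
        raise ValueError("workflow definition contains a cycle")
    return order
```

Rejecting the definition up front keeps the runtime loop simple, since it can then assume every run will eventually drain.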

Dispatch Loop

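import time

def run_workflow(run_id, dag):
    step_statuses = {n['step_name']: 'PENDING' for n in dag['nodes']}
    terminal = ('COMPLETED', 'SKIPPED', 'FAILED')
    while True:
        # Pick up results written by workers; without this, RUNNING never advances.
        # load_step_statuses is a hypothetical helper that reads step_runs for this run.
        for step, status in load_step_statuses(run_id).items():
            if status in terminal:
                step_statuses[step] = status
        runnable = get_runnable_steps(dag, step_statuses)
        if not runnable:
            if 'RUNNING' not in step_statuses.values():
                break  # nothing in flight: the run finished or is blocked by a failed dependency
            time.sleep(1)  # wait for in-flight steps
            continue
        for step in runnable:
            claim_and_dispatch(run_id, step)   # atomic DB claim + queue push
            step_statuses[step] = 'RUNNING'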

Steps with satisfied dependencies execute in parallel – multiple workers pick from the queue concurrently.
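
To see which steps end up running side by side, it helps to replay the readiness check in memory. The sketch below assumes every step succeeds instantly and groups steps into "waves" that could be dispatched together. The helper execution_waves and the step names clean, enrich, and report are made up for illustration; get_runnable_steps is repeated so the snippet runs on its own.

```python
def get_runnable_steps(dag, step_statuses):
    """Return PENDING steps whose dependencies are all COMPLETED."""
    runnable = []
    for node in dag['nodes']:
        step = node['step_name']
        if step_statuses.get(step) != 'PENDING':
            continue
        deps = [e['from'] for e in dag['edges'] if e['to'] == step]
        if all(step_statuses.get(d) == 'COMPLETED' for d in deps):
            runnable.append(step)
    return runnable

def execution_waves(dag):
    """Group steps into batches that become runnable at the same time."""
    statuses = {n['step_name']: 'PENDING' for n in dag['nodes']}
    waves = []
    while True:
        runnable = get_runnable_steps(dag, statuses)
        if not runnable:
            break
        waves.append(runnable)
        for step in runnable:
            statuses[step] = 'COMPLETED'  # assumption: every step succeeds
    return waves

# Diamond-shaped DAG: one source, two parallel branches, one join.
dag = {
    "nodes": [{"step_name": "fetch_data"}, {"step_name": "clean"},
              {"step_name": "enrich"}, {"step_name": "report"}],
    "edges": [{"from": "fetch_data", "to": "clean"},
              {"from": "fetch_data", "to": "enrich"},
              {"from": "clean", "to": "report"},
              {"from": "enrich", "to": "report"}],
}
waves = execution_waves(dag)
```

Here clean and enrich land in the same wave because both depend only on fetch_data, while report waits for both of them.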

3. Retry Logic with Exponential Backoff

import random, time

BASE_DELAY = 2  # seconds

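def on_step_failure(step_run, max_retries=3):
    # attempt_count is incremented when a worker claims the step, so it is 1 after
    # the first failure. Using <= allows exactly max_retries retries.
    if step_run['attempt_count'] <= max_retries:
        delay = BASE_DELAY ** step_run['attempt_count']
        jitter = random.uniform(0, delay * 0.1)  # up to 10% jitter spreads out retries
        schedule_retry(step_run['step_run_id'], delay + jitter)
    else:
        # Retries exhausted: fail the step, fail the workflow, and alert.
        mark_step_failed(step_run['step_run_id'])
        propagate_failure_to_workflow(step_run['run_id'])
        send_alert(step_run)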

Retry schedule for BASE_DELAY=2, max_retries=3 (each delay gets up to 10% jitter added):

Failed attempt    Outcome
1                 retry after ~2s
2                 retry after ~4s
3                 retry after ~8s
4                 retries exhausted, step marked FAILED
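
The delays in the schedule follow directly from BASE_DELAY ** attempt_count. The sketch below computes them and also caps the delay, since an uncapped exponential grows quickly for steps configured with many retries. The cap MAX_DELAY and the helper name backoff_delay are assumptions added for illustration and do not appear in the design above.

```python
import random

BASE_DELAY = 2    # seconds, same value as in the retry logic above
MAX_DELAY = 300   # seconds; hypothetical cap, not part of the original design

def backoff_delay(attempt_count, rng=random):
    """Delay before the next retry: capped exponential plus up to 10% jitter."""
    delay = min(BASE_DELAY ** attempt_count, MAX_DELAY)
    return delay + rng.uniform(0, delay * 0.1)

# Delays after the 1st, 2nd and 3rd failed attempt.
schedule = [backoff_delay(n) for n in (1, 2, 3)]
```

Each value in schedule falls between the nominal delay and 110% of it, which is what the "~" in the table stands for.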

4. Exactly-Once Semantics

Atomic Step Claiming

-- Worker atomically claims a step before executing
UPDATE step_runs
SET status = 'RUNNING',
    worker_id = :worker_id,
    started_at = NOW(),
    attempt_count = attempt_count + 1
WHERE step_run_id = :step_run_id
  AND status = 'PENDING'
RETURNING *;
-- If 0 rows returned, another worker claimed it - abort.

Idempotent Completion

UPDATE step_runs
SET status = 'COMPLETED',
    output = :output,
    completed_at = NOW()
WHERE step_run_id = :step_run_id
  AND worker_id = :worker_id   -- only the claiming worker can complete it
  AND status = 'RUNNING';
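
The two guarded updates above can be exercised end to end. The sketch below uses Python's built-in sqlite3 as a stand-in for Postgres, which is an assumption made for portability. The table is reduced to the columns involved, and cursor.rowcount takes the place of RETURNING *. The function names claim_step and complete_step are illustrative.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE step_runs ("
    " step_run_id TEXT PRIMARY KEY, status TEXT, worker_id TEXT,"
    " attempt_count INTEGER DEFAULT 0, output TEXT)"
)
conn.execute("INSERT INTO step_runs (step_run_id, status) VALUES ('s1', 'PENDING')")

def claim_step(conn, step_run_id, worker_id):
    """Return True only for the worker whose UPDATE matched the PENDING row."""
    cur = conn.execute(
        "UPDATE step_runs SET status = 'RUNNING', worker_id = ?,"
        " attempt_count = attempt_count + 1"
        " WHERE step_run_id = ? AND status = 'PENDING'",
        (worker_id, step_run_id),
    )
    conn.commit()
    return cur.rowcount == 1

def complete_step(conn, step_run_id, worker_id, output):
    """Return True only if this worker still owns the RUNNING row."""
    cur = conn.execute(
        "UPDATE step_runs SET status = 'COMPLETED', output = ?"
        " WHERE step_run_id = ? AND worker_id = ? AND status = 'RUNNING'",
        (output, step_run_id, worker_id),
    )
    conn.commit()
    return cur.rowcount == 1

first = claim_step(conn, "s1", "worker-a")             # wins the claim
second = claim_step(conn, "s1", "worker-b")            # row is no longer PENDING
stolen = complete_step(conn, "s1", "worker-b", "{}")   # not the owner
done = complete_step(conn, "s1", "worker-a", "{}")     # owner completes
repeat = complete_step(conn, "s1", "worker-a", "{}")   # already COMPLETED, no-op
```

Only first and done are true. The losing worker and the repeated completion both match zero rows, which is what makes the claim exclusive and the completion safe to retry.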

Heartbeat and Timeout Detection

Workers emit a heartbeat every 30 seconds:

UPDATE step_runs SET last_heartbeat = NOW()
WHERE step_run_id = :step_run_id AND worker_id = :worker_id;

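A separate timeout detector (runs every 60s) reclaims steps that have missed three consecutive heartbeats. A reclaimed step may already have run partially, so step handlers should be idempotent:

UPDATE step_runs
SET status = 'PENDING', worker_id = NULL, last_heartbeat = NULL
WHERE status = 'RUNNING'
  -- Fall back to started_at so a worker that died before its first heartbeat is still reclaimed.
  AND COALESCE(last_heartbeat, started_at) < NOW() - INTERVAL '90 seconds';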
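
The detector's rule is easy to get subtly wrong, so here is a minimal in-memory sketch of it. With a 30-second heartbeat, the 90-second threshold corresponds to three missed beats. The sketch treats a missing heartbeat as "last seen at started_at", an assumption added here so that a worker that dies before its first heartbeat is still detected. The function name find_stalled and the sample rows are illustrative.

```python
from datetime import datetime, timedelta, timezone

STALL_THRESHOLD = timedelta(seconds=90)  # three missed 30-second heartbeats

def find_stalled(step_runs, now):
    """Return ids of RUNNING steps whose last sign of life is older than the threshold."""
    stalled = []
    for run in step_runs:
        if run["status"] != "RUNNING":
            continue
        # Assumption: fall back to started_at when no heartbeat was ever recorded.
        last_seen = run["last_heartbeat"] or run["started_at"]
        if now - last_seen > STALL_THRESHOLD:
            stalled.append(run["step_run_id"])
    return stalled

now = datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
rows = [
    {"step_run_id": "healthy", "status": "RUNNING",
     "started_at": now - timedelta(minutes=10),
     "last_heartbeat": now - timedelta(seconds=20)},
    {"step_run_id": "silent", "status": "RUNNING",
     "started_at": now - timedelta(minutes=10),
     "last_heartbeat": now - timedelta(seconds=120)},
    {"step_run_id": "never_beat", "status": "RUNNING",
     "started_at": now - timedelta(minutes=5),
     "last_heartbeat": None},
    {"step_run_id": "finished", "status": "COMPLETED",
     "started_at": now - timedelta(hours=1),
     "last_heartbeat": now - timedelta(minutes=50)},
]
stalled = find_stalled(rows, now)
```

Only silent and never_beat are reported. The completed step is ignored even though its heartbeat is old, because only RUNNING steps can be reclaimed.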

5. Conditional Branching

A step’s output can include a branch key. The engine evaluates edge conditions to determine which downstream steps to enable and which to skip:

def resolve_branches(dag, step_name, step_output, step_statuses):
    branch = step_output.get('branch')
    for edge in dag['edges']:
        if edge['from'] != step_name:
            continue
        condition = edge.get('condition')
        if condition is None or condition == branch:
            # enable downstream step (leave as PENDING)
            pass
        else:
            # skip downstream step
            step_statuses[edge['to']] = 'SKIPPED'

DAG edge definition example:

{
  "from": "validate_input",
  "to": "process_premium",
  "condition": "premium"
}
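
A short worked example shows how one branch value fans out across several edges. The function is repeated in compact form so the snippet runs on its own. The steps process_standard and audit_log and their edges are hypothetical additions used only to show a non-matching and an unconditional edge.

```python
def resolve_branches(dag, step_name, step_output, step_statuses):
    """Mark downstream steps SKIPPED when their edge condition does not match the branch."""
    branch = step_output.get('branch')
    for edge in dag['edges']:
        if edge['from'] != step_name:
            continue
        condition = edge.get('condition')
        # Unconditional edges (None) and matching edges leave the step PENDING.
        if condition is not None and condition != branch:
            step_statuses[edge['to']] = 'SKIPPED'

dag = {
    "nodes": [{"step_name": "validate_input"}, {"step_name": "process_premium"},
              {"step_name": "process_standard"}, {"step_name": "audit_log"}],
    "edges": [
        {"from": "validate_input", "to": "process_premium", "condition": "premium"},
        {"from": "validate_input", "to": "process_standard", "condition": "standard"},
        {"from": "validate_input", "to": "audit_log", "condition": None},
    ],
}
statuses = {n["step_name"]: "PENDING" for n in dag["nodes"]}
statuses["validate_input"] = "COMPLETED"
resolve_branches(dag, "validate_input", {"branch": "standard"}, statuses)
```

After the call, process_premium is SKIPPED while process_standard and audit_log stay PENDING and become runnable on the next scheduling pass.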

6. Observability

Gantt Chart Data

SELECT step_name, started_at, completed_at, status, attempt_count
FROM step_runs
WHERE run_id = :run_id
ORDER BY started_at;

The UI renders this as a Gantt chart, visualizing parallelism and bottlenecks in the workflow execution.

SLA Breach Alerting

SELECT run_id, definition_id, started_at
FROM workflow_runs
WHERE status NOT IN ('COMPLETED','FAILED','CANCELLED')
  AND started_at + (sla_seconds * INTERVAL '1 second') < NOW();

Alert fires when a running workflow has exceeded its SLA without completing. Page on-call; do not auto-cancel (the workflow may be close to finishing).

Design Tradeoffs

  • Polling vs event-driven dispatch: Polling is simpler but adds latency. Event-driven (workers emit a “step completed” event that triggers the scheduler) is faster but more complex. Most production systems use a hybrid: event-driven with a polling fallback.
  • DB-backed queue vs external queue: Storing step_runs in Postgres with SELECT FOR UPDATE SKIP LOCKED is simple and keeps the queue and the state in one transaction. Kafka/SQS scales better, but messages can be delivered more than once, so step handlers must be idempotent at the application layer.
  • Timeout granularity: Per-step timeouts (stored in WorkflowDefinition) vs workflow-level SLA. Both are needed – per-step for fast failure, SLA for business-level alerting.
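
The hybrid dispatch mentioned in the first tradeoff can be sketched with a single primitive: the scheduler blocks on an event, and the wait timeout doubles as the polling interval. This is a single-process illustration under stated assumptions; the class name SchedulerWakeup and its methods are made up, and a real deployment would receive completion events over a queue or database notification instead of an in-process call.

```python
import threading

class SchedulerWakeup:
    """Wakes the scheduler on a step-completed event or after a polling timeout."""

    def __init__(self, poll_interval_s=5.0):
        self._event = threading.Event()
        self._poll_interval_s = poll_interval_s

    def notify_step_completed(self):
        # Called by whatever delivers completion events to the scheduler.
        self._event.set()

    def wait_for_work(self):
        """Block until notified or the poll interval elapses; report what woke us."""
        notified = self._event.wait(timeout=self._poll_interval_s)
        # An event that arrives between wait() and clear() is lost here;
        # the next polling wakeup picks up the missed work.
        self._event.clear()
        return "event" if notified else "poll"
```

A scheduler loop would call wait_for_work() and then recompute runnable steps either way, so a lost event costs at most one polling interval of latency.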

Interview Tips

  • Draw the state machine for StepRun status transitions – interviewers will probe edge cases (e.g. worker dies mid-execution).
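  • The heartbeat + timeout detector pattern is a standard way to recover from dead workers without a distributed lock service. Be precise about the guarantee: a reclaimed step can run again, so execution is at-least-once, and the effect is exactly-once only when steps are idempotent.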
  • Conditional branching is a common follow-up – have the edge condition schema ready.
  • Mention Temporal or Airflow as prior art to show awareness, then explain what you would build differently.
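
For the state machine mentioned in the first tip, a transition table is a compact way to draw it in code. The sketch below is one reading of the design above, not a definitive specification. RUNNING to PENDING covers the timeout detector reclaiming a stalled step, and it is also assumed here to be how a scheduled retry re-enters the queue. The names ALLOWED_TRANSITIONS and can_transition are illustrative.

```python
# Allowed StepRun status transitions (one interpretation of the design above).
ALLOWED_TRANSITIONS = {
    "PENDING":   {"RUNNING", "SKIPPED"},               # claimed by a worker, or branch not taken
    "RUNNING":   {"COMPLETED", "FAILED", "PENDING"},   # PENDING = reclaimed or retried (assumption)
    "COMPLETED": set(),                                # terminal
    "FAILED":    set(),                                # terminal
    "SKIPPED":   set(),                                # terminal
}

def can_transition(current, target):
    """Return True if a StepRun may move from current to target."""
    return target in ALLOWED_TRANSITIONS.get(current, set())
```

The "worker dies mid-execution" edge case is the RUNNING to PENDING arrow: the dead worker never writes a terminal status, so the detector has to move the row back.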
