Low-Level Design: Workflow Engine
A workflow engine executes directed acyclic graphs (DAGs) of steps, handling parallelism, retries, and failure propagation. This is a common LLD interview topic at Atlassian, Stripe, and Databricks. Think: Airflow, Temporal, GitHub Actions, or AWS Step Functions internals.
1. Core Entities
WorkflowDefinition
definition_id UUID PRIMARY KEY
name VARCHAR
version INT
dag JSONB
-- dag structure:
-- {
-- "nodes": [{"step_name": "fetch_data", "max_retries": 3, "timeout_s": 60}, ...],
-- "edges": [{"from": "fetch_data", "to": "process_data", "condition": null}, ...]
-- }
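The engine should reject a definition whose dag contains a cycle before any run is created. A minimal validation sketch using Kahn's algorithm, operating on the dag shape above (the `is_acyclic` name is an assumption, not part of the schema):

```python
from collections import defaultdict, deque

def is_acyclic(dag):
    """Kahn's algorithm: the graph is a valid DAG iff every node can be
    peeled off in topological order (indegree reaches zero for all)."""
    indegree = {n['step_name']: 0 for n in dag['nodes']}
    children = defaultdict(list)
    for e in dag['edges']:
        indegree[e['to']] += 1
        children[e['from']].append(e['to'])
    queue = deque(step for step, d in indegree.items() if d == 0)
    visited = 0
    while queue:
        step = queue.popleft()
        visited += 1
        for child in children[step]:
            indegree[child] -= 1
            if indegree[child] == 0:
                queue.append(child)
    return visited == len(indegree)  # unvisited nodes imply a cycle
```

Run this once at definition-registration time; validating per run is wasted work since the dag is immutable per version.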
WorkflowRun
run_id UUID PRIMARY KEY
definition_id UUID REFERENCES workflow_definitions
status ENUM('PENDING','RUNNING','COMPLETED','FAILED','CANCELLED')
input JSONB
output JSONB
started_at TIMESTAMPTZ
completed_at TIMESTAMPTZ
sla_seconds INT
StepRun
step_run_id UUID PRIMARY KEY
run_id UUID REFERENCES workflow_runs
step_name VARCHAR
status ENUM('PENDING','RUNNING','COMPLETED','FAILED','SKIPPED')
input JSONB
output JSONB
attempt_count INT DEFAULT 0
started_at TIMESTAMPTZ
completed_at TIMESTAMPTZ
error_message TEXT
worker_id VARCHAR
last_heartbeat TIMESTAMPTZ
2. DAG Execution Engine
Topological Sort to Find Runnable Steps
def get_runnable_steps(dag, step_statuses):
    """Return step names where all dependencies are COMPLETED."""
    runnable = []
    for node in dag['nodes']:
        step = node['step_name']
        if step_statuses.get(step) != 'PENDING':
            continue
        deps = [e['from'] for e in dag['edges'] if e['to'] == step]
        if all(step_statuses.get(d) == 'COMPLETED' for d in deps):
            runnable.append(step)
    return runnable
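On a diamond-shaped DAG the readiness check behaves as follows (a self-contained demo, so `get_runnable_steps` is repeated here verbatim):

```python
def get_runnable_steps(dag, step_statuses):
    # Same readiness check as above.
    runnable = []
    for node in dag['nodes']:
        step = node['step_name']
        if step_statuses.get(step) != 'PENDING':
            continue
        deps = [e['from'] for e in dag['edges'] if e['to'] == step]
        if all(step_statuses.get(d) == 'COMPLETED' for d in deps):
            runnable.append(step)
    return runnable

# Diamond: a -> b, a -> c, b -> d, c -> d
dag = {
    "nodes": [{"step_name": s} for s in "abcd"],
    "edges": [
        {"from": "a", "to": "b"}, {"from": "a", "to": "c"},
        {"from": "b", "to": "d"}, {"from": "c", "to": "d"},
    ],
}
statuses = {s: "PENDING" for s in "abcd"}
print(get_runnable_steps(dag, statuses))  # ['a']       only the root
statuses["a"] = "COMPLETED"
print(get_runnable_steps(dag, statuses))  # ['b', 'c']  fan-out runs in parallel
statuses["b"] = "COMPLETED"
statuses["c"] = "RUNNING"
print(get_runnable_steps(dag, statuses))  # []          join 'd' waits for 'c'
statuses["c"] = "COMPLETED"
print(get_runnable_steps(dag, statuses))  # ['d']       join unblocked
```

Note the join semantics fall out for free: `d` lists both `b` and `c` as incoming edges, so it only becomes runnable once every branch completes.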
Dispatch Loop
import time

def run_workflow(run_id, dag):
    while True:
        # Re-read statuses from the DB each iteration so completions reported
        # by workers become visible (e.g. SELECT step_name, status FROM step_runs).
        step_statuses = fetch_step_statuses(run_id)
        runnable = get_runnable_steps(dag, step_statuses)
        if not runnable:
            if all(s in ('COMPLETED', 'SKIPPED', 'FAILED') for s in step_statuses.values()):
                break  # every step is terminal: workflow done
            time.sleep(1)  # wait for in-flight steps
            continue
        for step in runnable:
            # Atomically flips the step out of PENDING in the DB and pushes it
            # onto the work queue, so the next refresh won't re-dispatch it.
            claim_and_dispatch(run_id, step)
Steps with satisfied dependencies execute in parallel – multiple workers pick from the queue concurrently.
3. Retry Logic with Exponential Backoff
import random

BASE_DELAY = 2  # seconds

def on_step_failure(step_run, max_retries=3):
    if step_run['attempt_count'] < max_retries:
        delay = BASE_DELAY ** step_run['attempt_count']  # 2s, 4s, 8s, ...
        jitter = random.uniform(0, delay * 0.1)
        schedule_retry(step_run['step_run_id'], delay + jitter)
    else:
        mark_step_failed(step_run['step_run_id'])
        propagate_failure_to_workflow(step_run['run_id'])
        send_alert(step_run)
Retry schedule for base_delay=2, max_retries=3:
| Attempt | Delay |
|---|---|
| 1 (first retry) | ~2s |
| 2 | ~4s |
| 3 | ~8s |
| 4 (exhausted) | FAILED |
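The schedule in the table can be computed directly. A small deterministic helper (jitter omitted; the function name is illustrative):

```python
def backoff_delays(base=2, max_retries=3):
    """Delay before each retry: base ** attempt_count, attempts 1..max_retries."""
    return [base ** attempt for attempt in range(1, max_retries + 1)]

print(backoff_delays())  # [2, 4, 8]
```

In production, always add jitter on top of these values; synchronized retries from many failed steps otherwise stampede the downstream dependency that caused the failures.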
4. Exactly-Once Semantics
Strictly speaking, the scheme below provides at-least-once execution with exactly-once recording of results ("effectively-once"): a step may re-run after a missed heartbeat, so step handlers should be idempotent.
Atomic Step Claiming
-- Worker atomically claims a step before executing
UPDATE step_runs
SET status = 'RUNNING',
worker_id = :worker_id,
started_at = NOW(),
attempt_count = attempt_count + 1
WHERE step_run_id = :step_run_id
AND status = 'PENDING'
RETURNING *;
-- If 0 rows returned, another worker claimed it - abort.
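The claim is a compare-and-set on the status column. A runnable sketch using sqlite3 as a stand-in for Postgres (table trimmed to the relevant columns; `RETURNING` replaced by a rowcount check since the outcome is the same):

```python
import sqlite3

# In-memory stand-in for step_runs, seeded with one pending step.
db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE step_runs (step_run_id TEXT PRIMARY KEY, "
           "status TEXT, worker_id TEXT, attempt_count INT DEFAULT 0)")
db.execute("INSERT INTO step_runs VALUES ('s1', 'PENDING', NULL, 0)")

def claim(worker_id):
    """Compare-and-set: succeeds only if the row is still PENDING."""
    cur = db.execute(
        "UPDATE step_runs SET status = 'RUNNING', worker_id = ?, "
        "attempt_count = attempt_count + 1 "
        "WHERE step_run_id = 's1' AND status = 'PENDING'",
        (worker_id,))
    return cur.rowcount == 1  # 0 rows updated => another worker won the race

print(claim("worker-a"))  # True  - first claim wins
print(claim("worker-b"))  # False - row is no longer PENDING
```

The race loser simply drops the message; the winner proceeds to execute. No lock service is needed because the database row itself is the lock.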
Idempotent Completion
UPDATE step_runs
SET status = 'COMPLETED',
output = :output,
completed_at = NOW()
WHERE step_run_id = :step_run_id
AND worker_id = :worker_id -- only the claiming worker can complete it
AND status = 'RUNNING';
Heartbeat and Timeout Detection
Workers emit a heartbeat every 30 seconds:
UPDATE step_runs SET last_heartbeat = NOW()
WHERE step_run_id = :step_run_id AND worker_id = :worker_id;
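On the worker side the heartbeat is typically a small background loop. A sketch using a threading.Event as an interruptible sleep (`send_heartbeat` stands in for the UPDATE above; the demo interval is shortened only so it finishes quickly):

```python
import threading
import time

def heartbeat_loop(send_heartbeat, stop, interval_s=30):
    """Ping until `stop` is set; Event.wait doubles as an interruptible sleep,
    so shutdown does not have to wait out a full interval."""
    while not stop.wait(timeout=interval_s):
        send_heartbeat()  # e.g. UPDATE step_runs SET last_heartbeat = NOW() ...

# Demo: a counter in place of the DB call, with a tiny interval.
beats = []
stop = threading.Event()
t = threading.Thread(target=heartbeat_loop,
                     args=(lambda: beats.append(1), stop, 0.01))
t.start()
time.sleep(0.06)   # let a few beats fire
stop.set()         # worker finishing: stop heartbeating
t.join()
```

The loop runs in a daemon-style side thread while the main thread executes the step body, and is stopped (and the row completed) in a finally block.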
A separate timeout detector (runs every 60s) reclaims stalled steps. With a 30s heartbeat and a 90s threshold, a dead worker is detected at most ~150s after its last heartbeat (90s threshold plus up to one 60s detector period):
UPDATE step_runs
SET status = 'PENDING', worker_id = NULL, last_heartbeat = NULL
WHERE status = 'RUNNING'
AND last_heartbeat < NOW() - INTERVAL '90 seconds';
5. Conditional Branching
A step’s output can include a branch key. The engine evaluates edge conditions to determine which downstream steps to enable and which to skip:
def resolve_branches(dag, step_name, step_output, step_statuses):
    branch = step_output.get('branch')
    for edge in dag['edges']:
        if edge['from'] != step_name:
            continue
        condition = edge.get('condition')
        if condition is None or condition == branch:
            pass  # enable downstream step (leave as PENDING)
        else:
            step_statuses[edge['to']] = 'SKIPPED'
DAG edge definition example:
{
  "from": "validate_input",
  "to": "process_premium",
  "condition": "premium"
}
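One gap the branching sketch leaves open: a step whose dependency was SKIPPED never satisfies the all-dependencies-COMPLETED check, so its subtree would stall the dispatch loop forever. A skip-propagation pass closes it. This helper is a hypothetical addition, and it assumes a step is skipped when any of its dependencies is skipped; engines with one-of join semantics need a different rule:

```python
from collections import deque

def propagate_skips(dag, step_statuses):
    """Transitively mark PENDING descendants of SKIPPED steps as SKIPPED,
    so the dispatch loop's terminal-state check can still converge."""
    children = {}
    for e in dag['edges']:
        children.setdefault(e['from'], []).append(e['to'])
    queue = deque(s for s, st in step_statuses.items() if st == 'SKIPPED')
    while queue:
        step = queue.popleft()
        for child in children.get(step, []):
            if step_statuses.get(child) == 'PENDING':
                step_statuses[child] = 'SKIPPED'
                queue.append(child)

# a -> b -> c; skipping b should transitively skip c
dag = {"edges": [{"from": "a", "to": "b"}, {"from": "b", "to": "c"}]}
statuses = {"a": "COMPLETED", "b": "SKIPPED", "c": "PENDING"}
propagate_skips(dag, statuses)
print(statuses["c"])  # SKIPPED
```

Call it immediately after resolve_branches marks the direct downstream steps, before the next scheduling pass.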
6. Observability
Gantt Chart Data
SELECT step_name, started_at, completed_at, status, attempt_count
FROM step_runs
WHERE run_id = :run_id
ORDER BY started_at;
The UI renders this as a Gantt chart, visualizing parallelism and bottlenecks in the workflow execution.
SLA Breach Alerting
SELECT run_id, definition_id, started_at
FROM workflow_runs
WHERE status NOT IN ('COMPLETED','FAILED','CANCELLED')
AND started_at + (sla_seconds * INTERVAL '1 second') < NOW();
Alert fires when a running workflow has exceeded its SLA without completing. Page on-call; do not auto-cancel (the workflow may be close to finishing).
Design Tradeoffs
- Polling vs event-driven dispatch: Polling is simpler but adds latency. Event-driven (workers emit a “step completed” event that triggers the scheduler) is faster but more complex. Most production systems use a hybrid: event-driven with a polling fallback.
- DB-backed queue vs external queue: Storing step_runs in Postgres with SELECT FOR UPDATE SKIP LOCKED is simple and consistent. Kafka/SQS scales better but requires idempotency at the application layer.
- Timeout granularity: Per-step timeouts (stored in WorkflowDefinition) vs workflow-level SLA. Both are needed – per-step for fast failure, SLA for business-level alerting.
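The hybrid dispatch from the first tradeoff reduces to one primitive: block on a completion-event queue with a timeout, so events wake the scheduler immediately while the timeout provides the polling fallback. A minimal sketch (names are illustrative; in production the queue would be Redis, SQS, or LISTEN/NOTIFY rather than in-process):

```python
import queue

completion_events = queue.Queue()  # workers put completed step names here

def scheduler_tick_source(poll_interval_s=1.0):
    """Yield a wake-up reason: a completed step name, or 'poll' on timeout."""
    while True:
        try:
            yield completion_events.get(timeout=poll_interval_s)  # event path
        except queue.Empty:
            yield "poll"  # fallback catches lost or missed events

# Demo: one completion event, then a fallback poll once the queue is drained.
ticks = scheduler_tick_source(poll_interval_s=0.01)
completion_events.put("fetch_data")
print(next(ticks))  # fetch_data
print(next(ticks))  # poll
```

Each tick triggers one scheduling pass (refresh statuses, compute runnable steps, dispatch); whether the wake-up came from an event or a poll is irrelevant to the pass itself, which keeps the scheduler logic identical on both paths.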
Interview Tips
- Draw the state machine for StepRun status transitions – interviewers will probe edge cases (e.g. worker dies mid-execution).
- The heartbeat + timeout detector pattern is the standard answer for effectively-once processing in distributed systems without a distributed lock service: execution is at-least-once, and idempotent completion makes the recorded outcome exactly-once.
- Conditional branching is a common follow-up – have the edge condition schema ready.
- Mention Temporal or Airflow as prior art to show awareness, then explain what you would build differently.
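The StepRun state machine from the first tip can be written down explicitly. A transition table plus a guard, with the transition set inferred from the lifecycle described in this article:

```python
ALLOWED = {
    "PENDING":   {"RUNNING", "SKIPPED"},              # claimed by worker, or branch skipped
    "RUNNING":   {"COMPLETED", "FAILED", "PENDING"},  # success, retries exhausted,
                                                      # or timeout/retry requeue
    "COMPLETED": set(),                               # terminal states
    "FAILED":    set(),
    "SKIPPED":   set(),
}

def transition(current, target):
    """Guard against illegal transitions (the interview edge cases)."""
    if target not in ALLOWED[current]:
        raise ValueError(f"illegal transition {current} -> {target}")
    return target

# The worker-dies-mid-execution path:
state = "PENDING"
state = transition(state, "RUNNING")    # worker claims the step
state = transition(state, "PENDING")    # heartbeat lost, detector requeues
state = transition(state, "RUNNING")    # second worker claims
state = transition(state, "COMPLETED")  # idempotent completion
print(state)  # COMPLETED
```

Enforcing this table in the UPDATE statements' WHERE clauses (as the claim and completion queries above already do) is what keeps the states consistent under concurrency.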