Saga Orchestration System: Low-Level Design
A saga is a sequence of local transactions, each in a separate service, that together implement a distributed business transaction. When any step fails, previously committed steps must be compensated (undone) using compensating transactions. The orchestrator pattern places a central coordinator that calls each service in order, tracks state, and issues compensation calls on failure — unlike the choreography pattern where services emit events and react to each other. This design covers the orchestrator, step state machine, and idempotent compensation.
Core Data Model
-- Catalog of saga types: one row per type, holding the ordered forward steps
-- and the compensating action for each step.
CREATE TABLE SagaDefinition (
    saga_type   VARCHAR(100) NOT NULL,
    description TEXT,
    steps       JSONB        NOT NULL,              -- ordered array of step objects
    created_at  TIMESTAMPTZ  NOT NULL DEFAULT NOW(),
    PRIMARY KEY (saga_type)
);
-- Each element of `steps` names the forward action, the owning service, and
-- the action that undoes it, for example:
-- [{"name":"reserve_inventory","service":"inventory","compensate":"release_inventory"},
--  {"name":"charge_payment","service":"payments","compensate":"refund_payment"},
--  {"name":"create_shipment","service":"fulfillment","compensate":"cancel_shipment"}]
-- One row per saga execution. saga_type must name an existing definition, and
-- status is restricted to the values the orchestrator actually writes.
CREATE TABLE SagaInstance (
    saga_id        BIGSERIAL PRIMARY KEY,
    saga_type      VARCHAR(100) NOT NULL REFERENCES SagaDefinition(saga_type),
    correlation_id VARCHAR(200) UNIQUE NOT NULL,     -- e.g., order_id; at most one saga per value
    status         VARCHAR(20)  NOT NULL DEFAULT 'running'
                   CHECK (status IN ('running','completed','compensating','compensated','failed')),
    -- 'failed' = a compensation call itself failed; needs manual intervention
    current_step   INT NOT NULL DEFAULT 0,           -- index of the next forward step to run
    payload        JSONB NOT NULL,                   -- initial saga input
    step_results   JSONB NOT NULL DEFAULT '{}',      -- step name -> response of each completed step
    started_at     TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    completed_at   TIMESTAMPTZ,
    failed_step    INT,                              -- index of the forward step that failed
    failure_reason TEXT
);
-- One row per (saga, step). Created up front in 'pending' state and then moved
-- through the forward/compensation lifecycle by the orchestrator.
CREATE TABLE SagaStepRecord (
    step_record_id   BIGSERIAL PRIMARY KEY,
    saga_id          BIGINT NOT NULL REFERENCES SagaInstance(saga_id),
    step_index       INT NOT NULL,
    step_name        VARCHAR(100) NOT NULL,
    status           VARCHAR(20) NOT NULL DEFAULT 'pending'
                     CHECK (status IN ('pending','running','completed','compensating','compensated','failed')),
    request_payload  JSONB,                          -- request sent for the forward action
    response_payload JSONB,                          -- response of the forward action
    -- Forward-action key sent to the service for deduplication. It must be
    -- unique: two records sharing a key would make the service treat the
    -- second request as a duplicate of the first.
    idempotency_key  VARCHAR(200) NOT NULL UNIQUE,
    started_at       TIMESTAMPTZ,
    completed_at     TIMESTAMPTZ,
    UNIQUE (saga_id, step_index)
);
-- Partial index covering only in-flight sagas, so it stays small as finished
-- sagas accumulate.
CREATE INDEX ON SagaInstance(status) WHERE status IN ('running','compensating');
-- No separate index on correlation_id: its UNIQUE constraint already creates one,
-- and a second index would only add write cost.
Orchestrator: Execute Forward Steps
import json, uuid
def start_saga(saga_type: str, correlation_id: str, payload: dict) -> int:
    """Create a saga instance for ``correlation_id`` and start executing it.

    Looks up the ordered step list for ``saga_type`` and inserts a SagaInstance
    (status defaults to 'running') with the JSON-encoded ``payload``. It then
    pre-creates one SagaStepRecord per step (status defaults to 'pending'),
    each with the deterministic forward idempotency key
    ``"{saga_id}:{index}:{step_name}:forward"``. Finally it hands off to
    ``advance_saga`` to run the first step.

    Args:
        saga_type: Key of the SagaDefinition to instantiate.
        correlation_id: Business identifier (e.g. an order id); unique per saga.
        payload: Initial saga input, passed to every step.

    Returns:
        The new saga_id.

    Raises:
        ValueError: If no SagaDefinition exists for ``saga_type``.
    """
    definition = db.fetchone(
        "SELECT steps FROM SagaDefinition WHERE saga_type=%s", (saga_type,)
    )
    if not definition:
        raise ValueError(f"Unknown saga type: {saga_type}")

    inserted = db.fetchone("""
        INSERT INTO SagaInstance (saga_type, correlation_id, payload)
        VALUES (%s,%s,%s) RETURNING saga_id
    """, (saga_type, correlation_id, json.dumps(payload)))
    saga_id = inserted['saga_id']

    # One pending record per step, keyed for deduplication by the called service.
    step_rows = [
        (saga_id, index, step['name'], f"{saga_id}:{index}:{step['name']}:forward")
        for index, step in enumerate(definition['steps'])
    ]
    for row in step_rows:
        db.execute("""
            INSERT INTO SagaStepRecord (saga_id, step_index, step_name, idempotency_key)
            VALUES (%s,%s,%s,%s)
        """, row)

    advance_saga(saga_id)
    return saga_id
def advance_saga(saga_id: int):
    """Execute the next pending step of a running saga, then continue with the rest.

    Locks the SagaInstance row (``SELECT ... FOR UPDATE``), reads the saga's
    step list from SagaDefinition, and handles the step at ``current_step``:

    * past the last step -> mark the saga ``completed``;
    * step record already ``completed`` (e.g. resuming after the process died
      between recording the step and advancing the saga) -> merge its stored
      response into ``step_results`` so later steps still receive it, then
      advance;
    * otherwise -> record the request, call the step's service with the
      step's forward idempotency key, store the response, and advance.

    After each step it calls itself for the next one. A ``ServiceCallError``
    from the service call hands the saga to ``_begin_compensation``. Sagas
    not in ``running`` status are left untouched.

    Args:
        saga_id: Primary key of the SagaInstance to advance.

    Raises:
        ValueError: If no SagaInstance exists for ``saga_id``.
    """
    saga = db.fetchone(
        "SELECT * FROM SagaInstance WHERE saga_id=%s FOR UPDATE", (saga_id,)
    )
    if saga is None:
        raise ValueError(f"Unknown saga: {saga_id}")
    if saga['status'] != 'running':
        return
    definition = db.fetchone(
        "SELECT steps FROM SagaDefinition WHERE saga_type=%s", (saga['saga_type'],)
    )
    steps = definition['steps']
    step_index = saga['current_step']
    if step_index >= len(steps):
        # All steps completed
        db.execute("""
            UPDATE SagaInstance SET status='completed', completed_at=NOW()
            WHERE saga_id=%s
        """, (saga_id,))
        return
    step_def = steps[step_index]
    step_record = db.fetchone("""
        SELECT * FROM SagaStepRecord WHERE saga_id=%s AND step_index=%s
    """, (saga_id, step_index))
    step_results = saga['step_results'] or {}
    if step_record['status'] == 'completed':
        # Already done (idempotency), but current_step was never advanced, so the
        # step's response may be missing from step_results. Re-merge the stored
        # response_payload before advancing so later steps still see it.
        recovered_results = {
            **step_results,
            step_def['name']: step_record['response_payload'],
        }
        db.execute("""
            UPDATE SagaInstance SET current_step=%s, step_results=%s WHERE saga_id=%s
        """, (step_index + 1, json.dumps(recovered_results), saga_id))
        advance_saga(saga_id)
        return
    # Prepare step payload: merge saga payload with results from previous steps
    request_payload = {**saga['payload'], **step_results}
    # Store the request together with the status change so the step record shows
    # exactly what was sent.
    db.execute("""
        UPDATE SagaStepRecord
        SET status='running', started_at=NOW(), request_payload=%s
        WHERE step_record_id=%s
    """, (json.dumps(request_payload), step_record['step_record_id']))
    try:
        response = _call_service(
            service=step_def['service'],
            action=step_def['name'],
            payload=request_payload,
            idempotency_key=step_record['idempotency_key'],
        )
    except ServiceCallError as e:
        # Guard only the service call: an error surfacing from a later
        # (recursive) step must not mark this step as the failed one.
        _begin_compensation(saga_id, step_index, str(e))
        return
    new_results = {**step_results, step_def['name']: response}
    db.execute("""
        UPDATE SagaStepRecord
        SET status='completed', response_payload=%s, completed_at=NOW()
        WHERE step_record_id=%s
    """, (json.dumps(response), step_record['step_record_id']))
    db.execute("""
        UPDATE SagaInstance SET current_step=%s, step_results=%s WHERE saga_id=%s
    """, (step_index + 1, json.dumps(new_results), saga_id))
    advance_saga(saga_id)
Compensation: Roll Back Committed Steps
def _begin_compensation(saga_id: int, failed_step: int, reason: str):
    """Switch a saga into compensation after forward step ``failed_step`` failed.

    Records the failing step index and the reason (truncated to 500
    characters) on the SagaInstance and marks that step's record 'failed'.
    It then starts rolling back from the step just before it, since the
    failed step itself never completed.

    Args:
        saga_id: Primary key of the SagaInstance.
        failed_step: Index of the forward step whose service call failed.
        reason: Error text from the failed call.
    """
    short_reason = reason[:500]
    db.execute("""
        UPDATE SagaInstance
        SET status='compensating', failed_step=%s, failure_reason=%s
        WHERE saga_id=%s
    """, (failed_step, short_reason, saga_id))
    db.execute("""
        UPDATE SagaStepRecord SET status='failed' WHERE saga_id=%s AND step_index=%s
    """, (saga_id, failed_step))
    last_completed_step = failed_step - 1
    # Undo previously completed steps, newest first.
    compensate_saga(saga_id, last_completed_step)
def compensate_saga(saga_id: int, from_step: int):
    """Compensate steps from ``from_step`` down to 0, newest first.

    For each step index (recursing downward), the step's ``compensate`` action
    is issued only if its forward action took effect and has not yet been
    undone. That is the case when its SagaStepRecord is either:

    * ``completed``, or
    * ``compensating``, because an earlier compensation attempt failed or was
      interrupted before recording a result. Without retrying these, a re-run
      would skip the step and wrongly mark the saga ``compensated``.

    Steps in any other state (pending, running, failed, compensated) are
    skipped. The compensation call carries a deterministic idempotency key, so
    re-issuing it is safe provided the service deduplicates on that key.

    Once index 0 has been handled, the saga is marked ``compensated``. If a
    compensation call raises ``ServiceCallError``, the saga is marked
    ``failed``, ops are alerted, and earlier steps are left for manual handling.

    Args:
        saga_id: Primary key of the SagaInstance being rolled back.
        from_step: Highest step index still to consider; negative means done.
    """
    if from_step < 0:
        db.execute("""
            UPDATE SagaInstance SET status='compensated', completed_at=NOW()
            WHERE saga_id=%s
        """, (saga_id,))
        return
    saga = db.fetchone("SELECT * FROM SagaInstance WHERE saga_id=%s", (saga_id,))
    definition = db.fetchone(
        "SELECT steps FROM SagaDefinition WHERE saga_type=%s", (saga['saga_type'],)
    )
    steps = definition['steps']
    step_def = steps[from_step]
    step_record = db.fetchone("""
        SELECT * FROM SagaStepRecord WHERE saga_id=%s AND step_index=%s
    """, (saga_id, from_step))
    if step_record['status'] not in ('completed', 'compensating'):
        # Step never succeeded, or was already compensated — nothing to undo,
        # move to the previous step.
        compensate_saga(saga_id, from_step - 1)
        return
    comp_idem_key = f"{saga_id}:{from_step}:{step_def['name']}:compensate"
    db.execute("""
        UPDATE SagaStepRecord SET status='compensating' WHERE step_record_id=%s
    """, (step_record['step_record_id'],))
    try:
        _call_service(
            service=step_def['service'],
            action=step_def['compensate'],
            payload=step_record['response_payload'],  # pass original response for compensation
            idempotency_key=comp_idem_key,
        )
    except ServiceCallError as e:
        # Compensation failed — human intervention required
        db.execute("""
            UPDATE SagaInstance SET status='failed', failure_reason=%s WHERE saga_id=%s
        """, (f"Compensation failed at step {from_step}: {e}", saga_id))
        _alert_ops(saga_id, from_step, str(e))
        return
    db.execute("""
        UPDATE SagaStepRecord SET status='compensated', completed_at=NOW()
        WHERE step_record_id=%s
    """, (step_record['step_record_id'],))
    compensate_saga(saga_id, from_step - 1)
Key Design Decisions
- Idempotency keys for every step and compensation: service calls may be retried (for example by the service client, or when an interrupted saga is resumed). Without idempotency, retrying “charge_payment” would charge the customer twice. Each forward step and each compensation step has a unique, deterministic idempotency key of the form saga_id:step_index:step_name:forward (or …:compensate). The called service uses this key to deduplicate: if the same key has already been processed, it returns the cached response instead of performing the action again.
- Compensation runs in reverse order: if step 3 (create_shipment) fails after steps 1 and 2 succeeded, the orchestrator compensates step 2 (refund_payment) and then step 1 (release_inventory). Reverse order preserves dependencies. A later step may reference artifacts created by an earlier one, such as a payment charged against an inventory reservation. The dependent artifact must therefore be undone before the artifact it depends on is released.
- step_results carries context forward: each completed step's response is stored in step_results under the step's name. It is merged into the request payload of every later step, so step 2 can read, e.g., the reservation_id returned by step 1 (under the "reserve_inventory" key) without a separate lookup. Compensation handlers receive the step's own recorded response_payload. For example, the inventory service gets back the reservation_id it originally returned, so it can locate and release the correct reservation.
- Compensation failures escalate to ops: if a compensation call fails (e.g., the payment service is down during rollback), the saga enters status='failed' and the operations team is alerted. This is the “stuck saga” scenario, and it requires manual intervention. Maintain an SLA for stuck-saga resolution (e.g., a P2 alert if a saga remains in 'failed' for more than 15 minutes).
-- Catalog of saga types: one row per type, holding the ordered forward steps
-- and the compensating action for each step.
CREATE TABLE SagaDefinition (
    saga_type   VARCHAR(100) NOT NULL,
    description TEXT,
    steps       JSONB        NOT NULL,              -- ordered array of step objects
    created_at  TIMESTAMPTZ  NOT NULL DEFAULT NOW(),
    PRIMARY KEY (saga_type)
);
-- Each element of `steps` names the forward action, the owning service, and
-- the action that undoes it, for example:
-- [{"name":"reserve_inventory","service":"inventory","compensate":"release_inventory"},
--  {"name":"charge_payment","service":"payments","compensate":"refund_payment"},
--  {"name":"create_shipment","service":"fulfillment","compensate":"cancel_shipment"}]
-- One row per saga execution. saga_type must name an existing definition, and
-- status is restricted to the values the orchestrator actually writes.
CREATE TABLE SagaInstance (
    saga_id        BIGSERIAL PRIMARY KEY,
    saga_type      VARCHAR(100) NOT NULL REFERENCES SagaDefinition(saga_type),
    correlation_id VARCHAR(200) UNIQUE NOT NULL,     -- e.g., order_id; at most one saga per value
    status         VARCHAR(20)  NOT NULL DEFAULT 'running'
                   CHECK (status IN ('running','completed','compensating','compensated','failed')),
    -- 'failed' = a compensation call itself failed; needs manual intervention
    current_step   INT NOT NULL DEFAULT 0,           -- index of the next forward step to run
    payload        JSONB NOT NULL,                   -- initial saga input
    step_results   JSONB NOT NULL DEFAULT '{}',      -- step name -> response of each completed step
    started_at     TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    completed_at   TIMESTAMPTZ,
    failed_step    INT,                              -- index of the forward step that failed
    failure_reason TEXT
);
-- One row per (saga, step). Created up front in 'pending' state and then moved
-- through the forward/compensation lifecycle by the orchestrator.
CREATE TABLE SagaStepRecord (
    step_record_id   BIGSERIAL PRIMARY KEY,
    saga_id          BIGINT NOT NULL REFERENCES SagaInstance(saga_id),
    step_index       INT NOT NULL,
    step_name        VARCHAR(100) NOT NULL,
    status           VARCHAR(20) NOT NULL DEFAULT 'pending'
                     CHECK (status IN ('pending','running','completed','compensating','compensated','failed')),
    request_payload  JSONB,                          -- request sent for the forward action
    response_payload JSONB,                          -- response of the forward action
    -- Forward-action key sent to the service for deduplication. It must be
    -- unique: two records sharing a key would make the service treat the
    -- second request as a duplicate of the first.
    idempotency_key  VARCHAR(200) NOT NULL UNIQUE,
    started_at       TIMESTAMPTZ,
    completed_at     TIMESTAMPTZ,
    UNIQUE (saga_id, step_index)
);
-- Partial index covering only in-flight sagas, so it stays small as finished
-- sagas accumulate.
CREATE INDEX ON SagaInstance(status) WHERE status IN ('running','compensating');
-- No separate index on correlation_id: its UNIQUE constraint already creates one,
-- and a second index would only add write cost.
import json, uuid
def start_saga(saga_type: str, correlation_id: str, payload: dict) -> int:
    """Create a saga instance for ``correlation_id`` and start executing it.

    Looks up the ordered step list for ``saga_type`` and inserts a SagaInstance
    (status defaults to 'running') with the JSON-encoded ``payload``. It then
    pre-creates one SagaStepRecord per step (status defaults to 'pending'),
    each with the deterministic forward idempotency key
    ``"{saga_id}:{index}:{step_name}:forward"``. Finally it hands off to
    ``advance_saga`` to run the first step.

    Args:
        saga_type: Key of the SagaDefinition to instantiate.
        correlation_id: Business identifier (e.g. an order id); unique per saga.
        payload: Initial saga input, passed to every step.

    Returns:
        The new saga_id.

    Raises:
        ValueError: If no SagaDefinition exists for ``saga_type``.
    """
    definition = db.fetchone(
        "SELECT steps FROM SagaDefinition WHERE saga_type=%s", (saga_type,)
    )
    if not definition:
        raise ValueError(f"Unknown saga type: {saga_type}")

    inserted = db.fetchone("""
        INSERT INTO SagaInstance (saga_type, correlation_id, payload)
        VALUES (%s,%s,%s) RETURNING saga_id
    """, (saga_type, correlation_id, json.dumps(payload)))
    saga_id = inserted['saga_id']

    # One pending record per step, keyed for deduplication by the called service.
    step_rows = [
        (saga_id, index, step['name'], f"{saga_id}:{index}:{step['name']}:forward")
        for index, step in enumerate(definition['steps'])
    ]
    for row in step_rows:
        db.execute("""
            INSERT INTO SagaStepRecord (saga_id, step_index, step_name, idempotency_key)
            VALUES (%s,%s,%s,%s)
        """, row)

    advance_saga(saga_id)
    return saga_id
def advance_saga(saga_id: int):
    """Execute the next pending step of a running saga, then continue with the rest.

    Locks the SagaInstance row (``SELECT ... FOR UPDATE``), reads the saga's
    step list from SagaDefinition, and handles the step at ``current_step``:

    * past the last step -> mark the saga ``completed``;
    * step record already ``completed`` (e.g. resuming after the process died
      between recording the step and advancing the saga) -> merge its stored
      response into ``step_results`` so later steps still receive it, then
      advance;
    * otherwise -> record the request, call the step's service with the
      step's forward idempotency key, store the response, and advance.

    After each step it calls itself for the next one. A ``ServiceCallError``
    from the service call hands the saga to ``_begin_compensation``. Sagas
    not in ``running`` status are left untouched.

    Args:
        saga_id: Primary key of the SagaInstance to advance.

    Raises:
        ValueError: If no SagaInstance exists for ``saga_id``.
    """
    saga = db.fetchone(
        "SELECT * FROM SagaInstance WHERE saga_id=%s FOR UPDATE", (saga_id,)
    )
    if saga is None:
        raise ValueError(f"Unknown saga: {saga_id}")
    if saga['status'] != 'running':
        return
    definition = db.fetchone(
        "SELECT steps FROM SagaDefinition WHERE saga_type=%s", (saga['saga_type'],)
    )
    steps = definition['steps']
    step_index = saga['current_step']
    if step_index >= len(steps):
        # All steps completed
        db.execute("""
            UPDATE SagaInstance SET status='completed', completed_at=NOW()
            WHERE saga_id=%s
        """, (saga_id,))
        return
    step_def = steps[step_index]
    step_record = db.fetchone("""
        SELECT * FROM SagaStepRecord WHERE saga_id=%s AND step_index=%s
    """, (saga_id, step_index))
    step_results = saga['step_results'] or {}
    if step_record['status'] == 'completed':
        # Already done (idempotency), but current_step was never advanced, so the
        # step's response may be missing from step_results. Re-merge the stored
        # response_payload before advancing so later steps still see it.
        recovered_results = {
            **step_results,
            step_def['name']: step_record['response_payload'],
        }
        db.execute("""
            UPDATE SagaInstance SET current_step=%s, step_results=%s WHERE saga_id=%s
        """, (step_index + 1, json.dumps(recovered_results), saga_id))
        advance_saga(saga_id)
        return
    # Prepare step payload: merge saga payload with results from previous steps
    request_payload = {**saga['payload'], **step_results}
    # Store the request together with the status change so the step record shows
    # exactly what was sent.
    db.execute("""
        UPDATE SagaStepRecord
        SET status='running', started_at=NOW(), request_payload=%s
        WHERE step_record_id=%s
    """, (json.dumps(request_payload), step_record['step_record_id']))
    try:
        response = _call_service(
            service=step_def['service'],
            action=step_def['name'],
            payload=request_payload,
            idempotency_key=step_record['idempotency_key'],
        )
    except ServiceCallError as e:
        # Guard only the service call: an error surfacing from a later
        # (recursive) step must not mark this step as the failed one.
        _begin_compensation(saga_id, step_index, str(e))
        return
    new_results = {**step_results, step_def['name']: response}
    db.execute("""
        UPDATE SagaStepRecord
        SET status='completed', response_payload=%s, completed_at=NOW()
        WHERE step_record_id=%s
    """, (json.dumps(response), step_record['step_record_id']))
    db.execute("""
        UPDATE SagaInstance SET current_step=%s, step_results=%s WHERE saga_id=%s
    """, (step_index + 1, json.dumps(new_results), saga_id))
    advance_saga(saga_id)
def _begin_compensation(saga_id: int, failed_step: int, reason: str):
    """Switch a saga into compensation after forward step ``failed_step`` failed.

    Records the failing step index and the reason (truncated to 500
    characters) on the SagaInstance and marks that step's record 'failed'.
    It then starts rolling back from the step just before it, since the
    failed step itself never completed.

    Args:
        saga_id: Primary key of the SagaInstance.
        failed_step: Index of the forward step whose service call failed.
        reason: Error text from the failed call.
    """
    short_reason = reason[:500]
    db.execute("""
        UPDATE SagaInstance
        SET status='compensating', failed_step=%s, failure_reason=%s
        WHERE saga_id=%s
    """, (failed_step, short_reason, saga_id))
    db.execute("""
        UPDATE SagaStepRecord SET status='failed' WHERE saga_id=%s AND step_index=%s
    """, (saga_id, failed_step))
    last_completed_step = failed_step - 1
    # Undo previously completed steps, newest first.
    compensate_saga(saga_id, last_completed_step)
def compensate_saga(saga_id: int, from_step: int):
    """Compensate steps from ``from_step`` down to 0, newest first.

    For each step index (recursing downward), the step's ``compensate`` action
    is issued only if its forward action took effect and has not yet been
    undone. That is the case when its SagaStepRecord is either:

    * ``completed``, or
    * ``compensating``, because an earlier compensation attempt failed or was
      interrupted before recording a result. Without retrying these, a re-run
      would skip the step and wrongly mark the saga ``compensated``.

    Steps in any other state (pending, running, failed, compensated) are
    skipped. The compensation call carries a deterministic idempotency key, so
    re-issuing it is safe provided the service deduplicates on that key.

    Once index 0 has been handled, the saga is marked ``compensated``. If a
    compensation call raises ``ServiceCallError``, the saga is marked
    ``failed``, ops are alerted, and earlier steps are left for manual handling.

    Args:
        saga_id: Primary key of the SagaInstance being rolled back.
        from_step: Highest step index still to consider; negative means done.
    """
    if from_step < 0:
        db.execute("""
            UPDATE SagaInstance SET status='compensated', completed_at=NOW()
            WHERE saga_id=%s
        """, (saga_id,))
        return
    saga = db.fetchone("SELECT * FROM SagaInstance WHERE saga_id=%s", (saga_id,))
    definition = db.fetchone(
        "SELECT steps FROM SagaDefinition WHERE saga_type=%s", (saga['saga_type'],)
    )
    steps = definition['steps']
    step_def = steps[from_step]
    step_record = db.fetchone("""
        SELECT * FROM SagaStepRecord WHERE saga_id=%s AND step_index=%s
    """, (saga_id, from_step))
    if step_record['status'] not in ('completed', 'compensating'):
        # Step never succeeded, or was already compensated — nothing to undo,
        # move to the previous step.
        compensate_saga(saga_id, from_step - 1)
        return
    comp_idem_key = f"{saga_id}:{from_step}:{step_def['name']}:compensate"
    db.execute("""
        UPDATE SagaStepRecord SET status='compensating' WHERE step_record_id=%s
    """, (step_record['step_record_id'],))
    try:
        _call_service(
            service=step_def['service'],
            action=step_def['compensate'],
            payload=step_record['response_payload'],  # pass original response for compensation
            idempotency_key=comp_idem_key,
        )
    except ServiceCallError as e:
        # Compensation failed — human intervention required
        db.execute("""
            UPDATE SagaInstance SET status='failed', failure_reason=%s WHERE saga_id=%s
        """, (f"Compensation failed at step {from_step}: {e}", saga_id))
        _alert_ops(saga_id, from_step, str(e))
        return
    db.execute("""
        UPDATE SagaStepRecord SET status='compensated', completed_at=NOW()
        WHERE step_record_id=%s
    """, (step_record['step_record_id'],))
    compensate_saga(saga_id, from_step - 1)
Saga orchestration and distributed transaction design is discussed in Uber system design interview questions.
Saga orchestration and booking workflow design is covered in Airbnb system design interview preparation.
Saga orchestration and payment workflow coordination design is discussed in Stripe system design interview guide.