Saga Orchestration System: Low-Level Design
A saga is a sequence of local transactions, each in a separate service, that together implement a distributed business transaction. When any step fails, previously committed steps must be compensated (undone) using compensating transactions. The orchestrator pattern places a central coordinator that calls each service in order, tracks state, and issues compensation calls on failure — unlike the choreography pattern where services emit events and react to each other. This design covers the orchestrator, step state machine, and idempotent compensation.
Core Data Model
CREATE TABLE SagaDefinition (
    saga_type   VARCHAR(100) PRIMARY KEY,
    description TEXT,
    steps       JSONB NOT NULL,  -- ordered list of step definitions
    created_at  TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- steps example:
-- [{"name":"reserve_inventory","service":"inventory","compensate":"release_inventory"},
--  {"name":"charge_payment","service":"payments","compensate":"refund_payment"},
--  {"name":"create_shipment","service":"fulfillment","compensate":"cancel_shipment"}]

CREATE TABLE SagaInstance (
    saga_id        BIGSERIAL PRIMARY KEY,
    saga_type      VARCHAR(100) NOT NULL,
    correlation_id VARCHAR(200) UNIQUE NOT NULL,  -- e.g., order_id
    status         VARCHAR(20) NOT NULL DEFAULT 'running',
                   -- running, completed, compensating, compensated, failed
    current_step   INT NOT NULL DEFAULT 0,
    payload        JSONB NOT NULL,               -- initial saga input
    step_results   JSONB NOT NULL DEFAULT '{}',  -- results from each completed step
    started_at     TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    completed_at   TIMESTAMPTZ,
    failed_step    INT,
    failure_reason TEXT
);

CREATE TABLE SagaStepRecord (
    step_record_id   BIGSERIAL PRIMARY KEY,
    saga_id          BIGINT NOT NULL REFERENCES SagaInstance(saga_id),
    step_index       INT NOT NULL,
    step_name        VARCHAR(100) NOT NULL,
    status           VARCHAR(20) NOT NULL DEFAULT 'pending',
                     -- pending, running, completed, compensating, compensated, failed
    request_payload  JSONB,
    response_payload JSONB,
    idempotency_key  VARCHAR(200) NOT NULL,
    started_at       TIMESTAMPTZ,
    completed_at     TIMESTAMPTZ,
    UNIQUE (saga_id, step_index)
);

CREATE INDEX ON SagaInstance(status) WHERE status IN ('running','compensating');
CREATE INDEX ON SagaInstance(correlation_id);
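The step statuses listed in the SagaStepRecord comment imply a small state machine. The sketch below is one way to make it explicit. The transitions are inferred from how the orchestrator code in this design updates step status, and the names ALLOWED_TRANSITIONS, can_transition, and assert_transition are hypothetical, not part of the original design.

```python
# Hypothetical helper: legal SagaStepRecord status transitions, inferred
# from how the orchestrator code in this document updates step status.
ALLOWED_TRANSITIONS = {
    "pending": {"running"},
    "running": {"completed", "failed"},
    "completed": {"compensating"},
    "compensating": {"compensated"},
    "compensated": set(),  # terminal
    "failed": set(),       # terminal
}


def can_transition(current: str, new: str) -> bool:
    """Return True if a step may move from `current` to `new`."""
    return new in ALLOWED_TRANSITIONS.get(current, set())


def assert_transition(current: str, new: str) -> None:
    """Raise ValueError on an illegal transition instead of writing bad state."""
    if not can_transition(current, new):
        raise ValueError(f"Illegal step transition: {current} -> {new}")
```

Calling assert_transition before each UPDATE of SagaStepRecord.status would turn a logic error, such as compensating a step that never completed, into an immediate exception.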
Orchestrator: Execute Forward Steps
import json

# Assumed to exist elsewhere: `db` (a database helper whose fetchone() returns
# a dict-like row), `_call_service`, and the `ServiceCallError` exception.


def start_saga(saga_type: str, correlation_id: str, payload: dict) -> int:
    definition = db.fetchone(
        "SELECT steps FROM SagaDefinition WHERE saga_type=%s", (saga_type,)
    )
    if not definition:
        raise ValueError(f"Unknown saga type: {saga_type}")

    saga_id = db.fetchone("""
        INSERT INTO SagaInstance (saga_type, correlation_id, payload)
        VALUES (%s,%s,%s) RETURNING saga_id
    """, (saga_type, correlation_id, json.dumps(payload)))['saga_id']

    # Initialize one step record per step, each with a deterministic key
    steps = definition['steps']
    for i, step in enumerate(steps):
        idem_key = f"{saga_id}:{i}:{step['name']}:forward"
        db.execute("""
            INSERT INTO SagaStepRecord (saga_id, step_index, step_name, idempotency_key)
            VALUES (%s,%s,%s,%s)
        """, (saga_id, i, step['name'], idem_key))

    # Advance to first step
    advance_saga(saga_id)
    return saga_id


def advance_saga(saga_id: int):
    """Execute the next pending step. Called after each step completes."""
    saga = db.fetchone(
        "SELECT * FROM SagaInstance WHERE saga_id=%s FOR UPDATE", (saga_id,)
    )
    if saga['status'] != 'running':
        return

    definition = db.fetchone(
        "SELECT steps FROM SagaDefinition WHERE saga_type=%s", (saga['saga_type'],)
    )
    steps = definition['steps']
    step_index = saga['current_step']

    if step_index >= len(steps):
        # All steps completed
        db.execute("""
            UPDATE SagaInstance SET status='completed', completed_at=NOW()
            WHERE saga_id=%s
        """, (saga_id,))
        return

    step_def = steps[step_index]
    step_record = db.fetchone("""
        SELECT * FROM SagaStepRecord WHERE saga_id=%s AND step_index=%s
    """, (saga_id, step_index))

    if step_record['status'] == 'completed':
        # Already done (idempotency), advance
        db.execute("""
            UPDATE SagaInstance SET current_step=%s WHERE saga_id=%s
        """, (step_index + 1, saga_id))
        advance_saga(saga_id)
        return

    db.execute("""
        UPDATE SagaStepRecord SET status='running', started_at=NOW() WHERE step_record_id=%s
    """, (step_record['step_record_id'],))

    # Prepare step payload: merge saga payload with results from previous steps
    step_results = saga['step_results'] or {}
    request_payload = {**saga['payload'], **step_results}

    # Only the service call sits inside the try block, so a failure in a later
    # step is handled by that step's own call frame, not by this one.
    try:
        response = _call_service(
            service=step_def['service'],
            action=step_def['name'],
            payload=request_payload,
            idempotency_key=step_record['idempotency_key'],
        )
    except ServiceCallError as e:
        _begin_compensation(saga_id, step_index, str(e))
        return

    # Record the result under the step name and move to the next step
    new_results = {**step_results, step_def['name']: response}
    db.execute("""
        UPDATE SagaStepRecord
        SET status='completed', response_payload=%s, completed_at=NOW()
        WHERE step_record_id=%s
    """, (json.dumps(response), step_record['step_record_id']))
    db.execute("""
        UPDATE SagaInstance SET current_step=%s, step_results=%s WHERE saga_id=%s
    """, (step_index + 1, json.dumps(new_results), saga_id))
    advance_saga(saga_id)
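The design leaves _call_service undefined. One possible shape, sketched under assumptions: retry transient failures with exponential backoff, reuse the same idempotency key on every attempt, and raise ServiceCallError only once retries are exhausted. The send transport callable, TransientError, call_with_retry, max_attempts, and base_delay are hypothetical names introduced for illustration.

```python
import time
from typing import Callable


class ServiceCallError(Exception):
    """Raised when a service call fails after all retry attempts."""


class TransientError(Exception):
    """Hypothetical: raised by the transport for timeouts or similar failures."""


def call_with_retry(
    send: Callable[[str, str, dict, str], dict],
    service: str,
    action: str,
    payload: dict,
    idempotency_key: str,
    max_attempts: int = 3,
    base_delay: float = 0.1,
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """Call `send` until it succeeds, reusing one idempotency key throughout."""
    for attempt in range(1, max_attempts + 1):
        try:
            return send(service, action, payload, idempotency_key)
        except TransientError as exc:
            if attempt == max_attempts:
                raise ServiceCallError(
                    f"{service}.{action} failed after {attempt} attempts: {exc}"
                ) from exc
            # Exponential backoff: base_delay, 2*base_delay, 4*base_delay, ...
            sleep(base_delay * 2 ** (attempt - 1))
    raise ServiceCallError("max_attempts must be at least 1")
```

Because every attempt carries the same key, a service that deduplicates on it can safely receive the request more than once.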
Compensation: Roll Back Committed Steps
def _begin_compensation(saga_id: int, failed_step: int, reason: str):
    db.execute("""
        UPDATE SagaInstance
        SET status='compensating', failed_step=%s, failure_reason=%s
        WHERE saga_id=%s
    """, (failed_step, reason[:500], saga_id))
    db.execute("""
        UPDATE SagaStepRecord SET status='failed' WHERE saga_id=%s AND step_index=%s
    """, (saga_id, failed_step))

    # Compensate completed steps in reverse order
    compensate_saga(saga_id, failed_step - 1)


def compensate_saga(saga_id: int, from_step: int):
    """Compensate steps from from_step down to 0."""
    if from_step < 0:
        db.execute("""
            UPDATE SagaInstance SET status='compensated', completed_at=NOW()
            WHERE saga_id=%s
        """, (saga_id,))
        return

    saga = db.fetchone("SELECT * FROM SagaInstance WHERE saga_id=%s", (saga_id,))
    definition = db.fetchone(
        "SELECT steps FROM SagaDefinition WHERE saga_type=%s", (saga['saga_type'],)
    )
    steps = definition['steps']
    step_def = steps[from_step]
    step_record = db.fetchone("""
        SELECT * FROM SagaStepRecord WHERE saga_id=%s AND step_index=%s
    """, (saga_id, from_step))

    if step_record['status'] != 'completed':
        # Step never succeeded: no compensation needed, move to previous
        compensate_saga(saga_id, from_step - 1)
        return

    comp_idem_key = f"{saga_id}:{from_step}:{step_def['name']}:compensate"
    db.execute("""
        UPDATE SagaStepRecord SET status='compensating' WHERE step_record_id=%s
    """, (step_record['step_record_id'],))

    try:
        _call_service(
            service=step_def['service'],
            action=step_def['compensate'],
            payload=step_record['response_payload'],  # pass original response for compensation
            idempotency_key=comp_idem_key,
        )
        db.execute("""
            UPDATE SagaStepRecord SET status='compensated', completed_at=NOW()
            WHERE step_record_id=%s
        """, (step_record['step_record_id'],))
        compensate_saga(saga_id, from_step - 1)
    except ServiceCallError as e:
        # Compensation failed: human intervention required
        db.execute("""
            UPDATE SagaInstance SET status='failed', failure_reason=%s WHERE saga_id=%s
        """, (f"Compensation failed at step {from_step}: {e}", saga_id))
        _alert_ops(saga_id, from_step, str(e))
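Stripped of persistence, the control flow of the forward and compensation code above reduces to a loop that remembers completed steps and undoes them in reverse. The following in-memory sketch illustrates only that ordering. run_saga and the Step tuple are hypothetical, and it catches any exception, whereas the orchestrator above reacts only to ServiceCallError and persists state between steps.

```python
from typing import Callable, List, Tuple

# (name, forward action, compensating action)
Step = Tuple[str, Callable[[], None], Callable[[], None]]


def run_saga(steps: List[Step]) -> str:
    """Run steps in order; on failure, compensate completed steps in reverse."""
    completed: List[Step] = []
    for step in steps:
        _name, forward, _compensate = step
        try:
            forward()
        except Exception:
            # The failed step itself is not compensated, only earlier ones.
            for _, _, compensate in reversed(completed):
                compensate()
            return "compensated"
        completed.append(step)
    return "completed"
```

With the three example steps and a failing create_shipment, the observed order would be reserve, charge, refund, release.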
Key Design Decisions
- Idempotency keys for every step and compensation: a service call that times out or fails transiently may be retried. Without idempotency, retrying "charge_payment" charges the customer twice. Each forward step and each compensation has a unique, deterministic idempotency key of the form saga_id:step_index:step_name:direction, where direction is forward or compensate. The called service uses this key to deduplicate: if the key has already been processed, it returns the cached response instead of repeating the side effect.
- Compensation runs in reverse order: suppose step 3 fails after steps 1 and 2 succeeded. Compensating in reverse (2 → 1) preserves dependencies: step 2 charged the payment for the inventory reserved in step 1, so the payment is refunded before the reservation is released. In general, anything a later step built on an earlier step's result is undone first.
- step_results carries context forward: the response from step 1 (e.g., reservation_id) is stored under the step's name and merged into the payload for step 2. This allows each service to reference artifacts created by previous steps without a separate lookup. Each compensation handler receives the stored response_payload of its own forward step, so the inventory service gets back the reservation_id it originally returned and can locate and release the correct reservation.
- Compensation failures escalate to ops: if a compensation call fails (for example, the payment service is down during rollback), the saga enters status='failed' and alerts the operations team. This is the "stuck saga" scenario, and it requires manual intervention. Maintain an SLA for stuck saga resolution (e.g., P2 alert if a saga is in 'failed' for more than 15 minutes).
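The deduplication described in the first decision happens on the called service's side. A minimal sketch of that behaviour, assuming an in-memory cache for illustration (a real service would persist processed keys, ideally in the same transaction as the side effect). IdempotentHandler is a hypothetical name, not part of the design above.

```python
from typing import Callable, Dict


class IdempotentHandler:
    """Wraps a handler so repeated calls with the same key run it only once."""

    def __init__(self, handler: Callable[[dict], dict]):
        self._handler = handler
        # In-memory cache for illustration only; it is lost on restart.
        self._responses: Dict[str, dict] = {}

    def handle(self, idempotency_key: str, payload: dict) -> dict:
        if idempotency_key in self._responses:
            # Key already processed: replay the cached response, no side effect.
            return self._responses[idempotency_key]
        response = self._handler(payload)
        self._responses[idempotency_key] = response
        return response
```

A retried charge_payment carrying the same key then returns the original response and the customer is charged once.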
Frequently Asked Questions
What is the difference between saga orchestration and saga choreography?
Choreography: each service listens to events and reacts by performing its action and emitting the next event. There is no central coordinator. Example: Order service emits OrderPlaced → Inventory service listens, reserves items, emits InventoryReserved → Payment service listens, charges card, emits PaymentCharged → Fulfillment service listens and ships. Advantage: loose coupling, no single point of failure. Disadvantage: the saga flow is implicit, spread across multiple services' event handlers, which makes it hard to reason about the overall flow, test, or debug.
Orchestration: a central Saga Orchestrator calls each service in sequence and tracks state. The flow is explicit in one place. Advantage: easy to see the full saga state at any time, easier to add retry and compensation logic, simpler testing. Disadvantage: the orchestrator is an additional service that must be maintained.
Choose choreography for simple 2–3 step flows; choose orchestration for complex flows with many steps, conditional branching, or strict compensation requirements.
How do you design idempotent compensating transactions?
A compensating transaction must be safe to call multiple times, because the orchestrator retries it if the first call times out. Design:
(1) reserve_inventory (forward step) → release_inventory (compensation). release_inventory checks: if the reservation (identified by reservation_id from step_results) no longer exists, return success immediately, since it was already released. If it exists, release it and return success.
(2) charge_payment → refund_payment. refund_payment uses the processor's idempotency key to avoid double-refunding.
(3) create_shipment → cancel_shipment. cancel_shipment checks: if the shipment is already cancelled, return success. If it is shipped (too late to cancel), return a special already_shipped error, and the orchestrator escalates to manual intervention.
The key principle: a compensation that finds "nothing to undo" should succeed silently, not error. Design compensations to be naturally idempotent by always checking current state before acting.
How do you handle a saga step that partially succeeds (some records created, some failed)?
A step that creates 10 records and fails on record 7 leaves 6 orphaned records. The compensation must clean them up, but the step's response_payload may not include the IDs of the partially created records. Mitigation:
(1) make steps atomic where possible: wrap all database operations in a single transaction; if the transaction fails, nothing is committed;
(2) for non-atomic steps (calling an external service that creates records one by one): return partial results in the response_payload even on failure (e.g., {'created_ids': [1,2,3], 'failed_at': 4}) and design the compensation to delete the partial list;
(3) use a saga_correlation_id as a foreign key in the created records (step_record.idempotency_key as the correlation key). The compensation queries SELECT * FROM records WHERE saga_idem_key=X and deletes all of them, which works even if the response_payload was lost due to a crash.
How do you monitor and alert on stuck or long-running sagas?
A saga stuck in 'compensating' status for 30 minutes is a financial or data integrity emergency. Monitoring:
(1) alert on sagas where status IN ('running','compensating') AND started_at < NOW() - INTERVAL '15 minutes'. For most sagas this should never happen; a 15-minute running saga indicates a service is down or unresponsive.
(2) alert on status='failed' (compensation itself failed). These require immediate manual intervention.
(3) dashboard: show saga count by status, age distribution, and type. The SLA dashboard should show: running (healthy: <1 min), completed, compensated, failed. A failed saga means data is in a partially inconsistent state across multiple services; it is a P1 incident.
(4) runbook: for each saga type, document how to manually complete or compensate a stuck saga, including the exact SQL and service API calls needed. Sagas with no runbook are operational debt.
How does the saga pattern handle eventual consistency vs. strong consistency?
Sagas provide eventual consistency, not strong consistency. During a saga's execution, inventory is reserved but payment hasn't been charged yet, so a read at this point sees reserved inventory but no payment record. This intermediate state is visible to other operations. Consequences:
(1) two users can simultaneously start sagas to purchase the last item in stock. Both reserve it, but only one payment saga will succeed; the other must compensate. Use a reservation pattern (reserve inventory at step 1, finalize at step N) to minimize the inconsistency window.
(2) a business report run during a saga's execution may see partial data. For financial reporting, always query completed sagas (status='completed') and add manual reconciliation for in-flight sagas.
(3) the alternative (distributed 2PC) provides strong consistency but requires all participating services to support prepare/commit/rollback protocols, which is complex and creates long-held locks that kill throughput. For most business workflows, saga eventual consistency is acceptable. For money movements requiring zero intermediate inconsistency, use a single-database transaction instead.
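The FAQ guidance on idempotent compensations (check current state before acting, succeed silently when there is nothing to undo, escalate when it is too late) can be sketched as follows. The dictionaries stand in for each service's own storage, and AlreadyShippedError plus the exact return values are assumptions made for illustration.

```python
from typing import Dict


class AlreadyShippedError(Exception):
    """Hypothetical: the shipment has shipped and can no longer be cancelled."""


def release_inventory(reservations: Dict[str, int], reservation_id: str) -> dict:
    """Release a reservation; succeed silently if it is already gone."""
    reservations.pop(reservation_id, None)  # no-op when nothing to undo
    return {"released": True}


def cancel_shipment(shipments: Dict[str, str], shipment_id: str) -> dict:
    """Cancel a shipment unless it has already shipped."""
    status = shipments.get(shipment_id)
    if status is None or status == "cancelled":
        return {"cancelled": True}  # nothing to undo, report success
    if status == "shipped":
        # Too late to undo automatically; the caller escalates to a human.
        raise AlreadyShippedError(shipment_id)
    shipments[shipment_id] = "cancelled"
    return {"cancelled": True}
```

Calling either function twice with the same ID leaves the state unchanged after the first call, which is what makes an orchestrator retry safe.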