Saga Orchestration System Low-Level Design: Distributed Transactions, Compensation, and Idempotent Step Execution

Saga Orchestration System: Low-Level Design

A saga is a sequence of local transactions, each in a separate service, that together implement a distributed business transaction. When any step fails, previously committed steps must be compensated (undone) using compensating transactions. The orchestrator pattern places a central coordinator that calls each service in order, tracks state, and issues compensation calls on failure — unlike the choreography pattern where services emit events and react to each other. This design covers the orchestrator, step state machine, and idempotent compensation.

Core Data Model

CREATE TABLE SagaDefinition (
    saga_type      VARCHAR(100) PRIMARY KEY,
    description    TEXT,
    steps          JSONB NOT NULL,  -- ordered list of step definitions
    created_at     TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- steps example:
-- [{"name":"reserve_inventory","service":"inventory","compensate":"release_inventory"},
--  {"name":"charge_payment","service":"payments","compensate":"refund_payment"},
--  {"name":"create_shipment","service":"fulfillment","compensate":"cancel_shipment"}]

CREATE TABLE SagaInstance (
    saga_id        BIGSERIAL PRIMARY KEY,
    saga_type      VARCHAR(100) NOT NULL,
    correlation_id VARCHAR(200) UNIQUE NOT NULL,  -- e.g., order_id
    status         VARCHAR(20) NOT NULL DEFAULT 'running',
        -- running, completed, compensating, compensated, failed
    current_step   INT NOT NULL DEFAULT 0,
    payload        JSONB NOT NULL,   -- initial saga input
    step_results   JSONB NOT NULL DEFAULT '{}',  -- results from each completed step
    started_at     TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    completed_at   TIMESTAMPTZ,
    failed_step    INT,
    failure_reason TEXT
);

CREATE TABLE SagaStepRecord (
    step_record_id  BIGSERIAL PRIMARY KEY,
    saga_id         BIGINT NOT NULL REFERENCES SagaInstance(saga_id),
    step_index      INT NOT NULL,
    step_name       VARCHAR(100) NOT NULL,
    status          VARCHAR(20) NOT NULL DEFAULT 'pending',
        -- pending, running, completed, compensating, compensated, failed
    request_payload JSONB,
    response_payload JSONB,
    idempotency_key  VARCHAR(200) NOT NULL,
    started_at      TIMESTAMPTZ,
    completed_at    TIMESTAMPTZ,
    UNIQUE (saga_id, step_index)
);

CREATE INDEX ON SagaInstance(status) WHERE status IN ('running','compensating');
-- No separate index on correlation_id: its UNIQUE constraint already creates one.
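The status comments in the schema imply two small state machines. The tables below are a sketch read off the orchestrator code in this design; the schema itself does not enforce them, and the names SAGA_TRANSITIONS, STEP_TRANSITIONS, check_transition and IllegalTransition are hypothetical. A guard like this could run before each status UPDATE.

```python
# Allowed status transitions, derived from the orchestrator code in this design.
# Hypothetical helper: the orchestrator code does not call it as written.
SAGA_TRANSITIONS: dict[str, set[str]] = {
    "running": {"completed", "compensating"},
    "compensating": {"compensated", "failed"},
    "completed": set(),
    "compensated": set(),
    "failed": set(),  # terminal until an operator intervenes
}

STEP_TRANSITIONS: dict[str, set[str]] = {
    "pending": {"running"},
    # running -> running: a recovered orchestrator re-runs the step with the
    # same idempotency key
    "running": {"running", "completed", "failed"},
    "completed": {"compensating"},
    # compensating -> compensating: assumes a crashed compensation is retried
    "compensating": {"compensating", "compensated"},
    "compensated": set(),
    "failed": set(),
}


class IllegalTransition(Exception):
    """Raised when a status change is not listed in the transition table."""


def check_transition(table: dict[str, set[str]], current: str, new: str) -> None:
    """Raise IllegalTransition unless current -> new is an allowed edge."""
    if new not in table.get(current, set()):
        raise IllegalTransition(f"{current} -> {new}")
```

An equivalent database-side guard is a conditional update such as UPDATE ... SET status='compensating' WHERE step_record_id=%s AND status='completed', checking that exactly one row changed.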

Orchestrator: Execute Forward Steps

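import json

# db, _call_service, ServiceCallError and _alert_ops are assumed helpers:
# a DB wrapper with fetchone/execute, an RPC client that forwards the
# idempotency key to the service, its error type, and an ops paging hook.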

def start_saga(saga_type: str, correlation_id: str, payload: dict) -> int:
    definition = db.fetchone(
        "SELECT steps FROM SagaDefinition WHERE saga_type=%s", (saga_type,)
    )
    if not definition:
        raise ValueError(f"Unknown saga type: {saga_type}")

    saga_id = db.fetchone("""
        INSERT INTO SagaInstance (saga_type, correlation_id, payload)
        VALUES (%s,%s,%s) RETURNING saga_id
    """, (saga_type, correlation_id, json.dumps(payload)))['saga_id']

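    # Create one 'pending' record per step. Run this in the same transaction as
    # the SagaInstance insert so a crash cannot leave a saga without step records.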
    steps = definition['steps']
    for i, step in enumerate(steps):
        idem_key = f"{saga_id}:{i}:{step['name']}:forward"
        db.execute("""
            INSERT INTO SagaStepRecord (saga_id, step_index, step_name, idempotency_key)
            VALUES (%s,%s,%s,%s)
        """, (saga_id, i, step['name'], idem_key))

    # Advance to first step
    advance_saga(saga_id)
    return saga_id

def advance_saga(saga_id: int):
    """Execute the next pending step. Called after each step completes."""
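    # FOR UPDATE serializes concurrent advance_saga calls for the same saga.
    # If the enclosing transaction also spans the remote call below, the lock is
    # held across a network round trip; committing each state change first keeps it short.
    saga = db.fetchone(
        "SELECT * FROM SagaInstance WHERE saga_id=%s FOR UPDATE", (saga_id,)
    )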
    if saga['status'] != 'running':
        return

    definition = db.fetchone(
        "SELECT steps FROM SagaDefinition WHERE saga_type=%s", (saga['saga_type'],)
    )
    steps = definition['steps']
    step_index = saga['current_step']

    if step_index >= len(steps):
        # All steps completed
        db.execute("""
            UPDATE SagaInstance SET status='completed', completed_at=NOW()
            WHERE saga_id=%s
        """, (saga_id,))
        return

    step_def = steps[step_index]
    step_record = db.fetchone("""
        SELECT * FROM SagaStepRecord WHERE saga_id=%s AND step_index=%s
    """, (saga_id, step_index))

    if step_record['status'] == 'completed':
        # Already done (idempotency), advance
        db.execute("""
            UPDATE SagaInstance SET current_step=%s WHERE saga_id=%s
        """, (step_index + 1, saga_id))
        advance_saga(saga_id)
        return

    db.execute("""
        UPDATE SagaStepRecord SET status='running', started_at=NOW() WHERE step_record_id=%s
    """, (step_record['step_record_id'],))

    # Prepare step payload: merge saga payload with results from previous steps
    step_results = saga['step_results'] or {}
    request_payload = {**saga['payload'], **step_results}

    try:
        response = _call_service(
            service=step_def['service'],
            action=step_def['name'],
            payload=request_payload,
            idempotency_key=step_record['idempotency_key'],
        )
        new_results = {**step_results, step_def['name']: response}
        db.execute("""
            UPDATE SagaStepRecord
            SET status='completed', response_payload=%s, completed_at=NOW()
            WHERE step_record_id=%s
        """, (json.dumps(response), step_record['step_record_id']))
        db.execute("""
            UPDATE SagaInstance SET current_step=%s, step_results=%s WHERE saga_id=%s
        """, (step_index + 1, json.dumps(new_results), saga_id))
        advance_saga(saga_id)

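    except ServiceCallError as e:
        # The failed step itself is not compensated. If the error was a timeout,
        # the remote side may have committed anyway; retrying with the same
        # idempotency key returns its stored response and resolves the outcome.
        _begin_compensation(saga_id, step_index, str(e))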
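To make the forward pass and the step_results threading concrete without a database, here is a minimal in-memory sketch. It assumes each service action is a plain Python callable taking (request, idempotency_key); run_forward, Handler and the local ServiceCallError stand-in are hypothetical names, not part of the design above.

```python
from typing import Any, Callable, Optional

# Hypothetical: a service action takes (request payload, idempotency key).
Handler = Callable[[dict[str, Any], str], dict[str, Any]]


class ServiceCallError(Exception):
    """Stand-in for the error type the orchestrator catches."""


def run_forward(
    steps: list[dict[str, str]],
    handlers: dict[str, Handler],
    saga_id: int,
    payload: dict[str, Any],
) -> tuple[dict[str, Any], Optional[int]]:
    """Run steps in order; return (step_results, index of failed step or None)."""
    step_results: dict[str, Any] = {}
    for i, step in enumerate(steps):
        # Same key shape as start_saga: saga_id:index:name:forward
        idem_key = f"{saga_id}:{i}:{step['name']}:forward"
        # Each step sees the saga input plus every earlier step's response,
        # keyed by step name, like request_payload in advance_saga.
        request = {**payload, **step_results}
        try:
            step_results[step["name"]] = handlers[step["name"]](request, idem_key)
        except ServiceCallError:
            # Stop at the first failure; the caller starts compensation.
            return step_results, i
    return step_results, None
```

With the three-step order definition, if create_shipment raises, run_forward returns the two successful responses and failed index 2, which is exactly the input the compensation pass needs.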

Compensation: Roll Back Committed Steps

def _begin_compensation(saga_id: int, failed_step: int, reason: str):
    db.execute("""
        UPDATE SagaInstance
        SET status='compensating', failed_step=%s, failure_reason=%s
        WHERE saga_id=%s
    """, (failed_step, reason[:500], saga_id))
    db.execute("""
        UPDATE SagaStepRecord SET status='failed' WHERE saga_id=%s AND step_index=%s
    """, (saga_id, failed_step))

    # Compensate completed steps in reverse order
    compensate_saga(saga_id, failed_step - 1)

def compensate_saga(saga_id: int, from_step: int):
    """Compensate steps from from_step down to 0."""
    if from_step < 0:
        db.execute("""
            UPDATE SagaInstance SET status='compensated', completed_at=NOW()
            WHERE saga_id=%s
        """, (saga_id,))
        return

    saga = db.fetchone("SELECT * FROM SagaInstance WHERE saga_id=%s", (saga_id,))
    definition = db.fetchone(
        "SELECT steps FROM SagaDefinition WHERE saga_type=%s", (saga['saga_type'],)
    )
    steps = definition['steps']
    step_def = steps[from_step]

    step_record = db.fetchone("""
        SELECT * FROM SagaStepRecord WHERE saga_id=%s AND step_index=%s
    """, (saga_id, from_step))

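    if step_record['status'] not in ('completed', 'compensating'):
        # Step never succeeded (pending/failed) or is already compensated:
        # nothing to undo, move to the previous step. A step left in
        # 'compensating' by a crash is retried rather than skipped.
        compensate_saga(saga_id, from_step - 1)
        return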

    comp_idem_key = f"{saga_id}:{from_step}:{step_def['name']}:compensate"
    db.execute("""
        UPDATE SagaStepRecord SET status='compensating' WHERE step_record_id=%s
    """, (step_record['step_record_id'],))

    try:
        _call_service(
            service=step_def['service'],
            action=step_def['compensate'],
            payload=step_record['response_payload'],  # pass original response for compensation
            idempotency_key=comp_idem_key,
        )
        db.execute("""
            UPDATE SagaStepRecord SET status='compensated', completed_at=NOW()
            WHERE step_record_id=%s
        """, (step_record['step_record_id'],))
        compensate_saga(saga_id, from_step - 1)

    except ServiceCallError as e:
        # Compensation failed — human intervention required
        db.execute("""
            UPDATE SagaInstance SET status='failed', failure_reason=%s WHERE saga_id=%s
        """, (f"Compensation failed at step {from_step}: {e}", saga_id))
        _alert_ops(saga_id, from_step, str(e))
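The reverse walk in compensate_saga can be sketched in memory the same way. This is an illustrative sketch, not the orchestrator itself: compensate_in_reverse and Compensator are hypothetical names, and responses stands in for the stored response_payload of each completed step.

```python
from typing import Any, Callable

# Hypothetical: a compensating action takes (original step response, idempotency key).
Compensator = Callable[[dict[str, Any], str], None]


def compensate_in_reverse(
    steps: list[dict[str, str]],
    responses: dict[str, dict[str, Any]],
    compensators: dict[str, Compensator],
    saga_id: int,
    failed_step: int,
) -> list[str]:
    """Undo steps failed_step-1 down to 0; return compensation actions in call order."""
    called: list[str] = []
    for i in range(failed_step - 1, -1, -1):
        step = steps[i]
        if step["name"] not in responses:
            continue  # step never completed: nothing to undo
        key = f"{saga_id}:{i}:{step['name']}:compensate"
        # Each compensator receives the response its own forward step returned,
        # mirroring payload=step_record['response_payload'] in compensate_saga.
        # Any exception propagates; the orchestrator would mark the saga 'failed'.
        compensators[step["compensate"]](responses[step["name"]], key)
        called.append(step["compensate"])
    return called
```

For a failure at create_shipment (index 2), this calls refund_payment with the charge response and then release_inventory with the reservation response, each under its own :compensate key.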

Key Design Decisions

  • Idempotency keys for every step and compensation: a service call can be repeated, for example by a client-side retry after a timeout or by a recovered orchestrator re-running a step it left in 'running'. Without idempotency, retrying “charge_payment” charges the customer twice. Each forward step and each compensation step has a unique, deterministic idempotency key of the form saga_id:step_index:step_name:forward (or :compensate). The called service uses this key to deduplicate: if the key has already been processed, it returns the stored response instead of executing the action again.
  • Compensation runs in reverse order: if create_shipment (step 3) fails after reserve_inventory and charge_payment succeeded, the orchestrator calls refund_payment first and release_inventory second. Later steps can depend on artifacts created by earlier ones (here, the payment was charged for the reserved items), so undoing them newest-first removes each dependent artifact before the artifact it depends on.
  • step_results carries context forward: the response from step 1 (e.g., reservation_id) is merged, under the step's name, into the request payload for step 2. Each service can therefore reference artifacts created by earlier steps without a separate lookup. Compensation handlers receive their own step's stored response (response_payload): the inventory service gets back the reservation_id it originally returned, so it can locate and release the correct reservation.
  • Compensation failures escalate to ops: if a compensation call fails (for example, the payment service is down during rollback), the saga enters status='failed' and the operations team is alerted. This is the “stuck saga” scenario, and it requires manual intervention. Maintain an SLA for resolving stuck sagas (e.g., a P2 alert if a saga stays in 'failed' for more than 15 minutes).
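On the receiving side, deduplication by idempotency key can be sketched as a wrapper around the side-effecting action. This is an in-memory illustration under stated assumptions: IdempotentHandler is a hypothetical name, and a real service would persist key and response durably, in the same transaction as the side effect, rather than in a dict.

```python
import threading
from typing import Any, Callable


class IdempotentHandler:
    """Hypothetical wrapper: the first call with a key runs the action;
    repeats with the same key replay the stored response."""

    def __init__(self, action: Callable[[dict[str, Any]], dict[str, Any]]):
        self._action = action
        self._responses: dict[str, dict[str, Any]] = {}
        # One lock for simplicity: it serializes all calls, including the action.
        self._lock = threading.Lock()

    def __call__(self, payload: dict[str, Any], idempotency_key: str) -> dict[str, Any]:
        with self._lock:
            if idempotency_key in self._responses:
                return self._responses[idempotency_key]  # replay, no side effect
            # Exceptions are not stored, so a failed attempt can be retried.
            response = self._action(payload)
            self._responses[idempotency_key] = response
            return response
```

Calling the wrapped charge action twice with "42:1:charge_payment:forward" charges once and returns the same response both times, while a different saga's key triggers a new charge.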

