Low Level Design: Saga Orchestrator

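The Saga pattern replaces a single distributed transaction with a sequence of local transactions, one per service. An orchestrator-based saga runs the steps in order from a central coordinator and, when a step fails, triggers compensating transactions that undo the steps already committed. This avoids two-phase commit at the cost of weaker guarantees: data becomes eventually consistent, and intermediate states are visible to other transactions while the saga runs.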

Architecture Overview

The SagaOrchestrator is a central coordinator: it executes steps one at a time, persists state to survive restarts, and compensates in reverse order on failure.
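
To make that control flow concrete, here is a minimal in-memory sketch of the same loop. Plain Python callables stand in for the downstream services, and there is no persistence, retrying, or timeout handling. `run_saga` and the step tuples are illustrative names, not part of the design.

```python
from typing import Callable

# A step pairs a forward action with its compensation. The forward action
# returns a dict that is merged into the saga state, as the orchestrator does.
Step = tuple[str, Callable[[dict], dict], Callable[[dict], None]]


def run_saga(steps: list[Step], state: dict, log: list[str]) -> str:
    completed: list[Step] = []
    for name, forward, compensate in steps:
        try:
            state.update(forward(state))  # merge step result into state
            completed.append((name, forward, compensate))
            log.append(f"done:{name}")
        except Exception:
            log.append(f"failed:{name}")
            # Undo only the steps that committed, newest first
            for done_name, _, undo in reversed(completed):
                undo(state)
                log.append(f"undone:{done_name}")
            return "failed"
    return "completed"


def reserve(state: dict) -> dict:
    return {"reservation_id": "r-1"}


def charge(state: dict) -> dict:
    raise RuntimeError("card declined")  # simulate a failing step


log: list[str] = []
status = run_saga(
    [("ReserveInventory", reserve, lambda s: None),
     ("ChargePayment", charge, lambda s: None),
     ("ConfirmOrder", lambda s: {}, lambda s: None)],
    {"order_id": "o-1"}, log)
print(status, log)
# failed ['done:ReserveInventory', 'failed:ChargePayment', 'undone:ReserveInventory']
```

The failed step itself is not compensated here, matching the orchestrator's assumption that a step which raised had no effect.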

Core Data Model

SagaInstance Table

CREATE TABLE saga_instances (
    id           UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    saga_type    VARCHAR(100) NOT NULL,
    state        JSONB NOT NULL DEFAULT '{}',
    current_step INT NOT NULL DEFAULT 0,
    status       VARCHAR(20) NOT NULL DEFAULT 'running',
                 -- running | completed | compensating | failed
    created_at   TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at   TIMESTAMPTZ NOT NULL DEFAULT now()
);

-- (status, updated_at) also serves the timeout worker's scan for stale running sagas
CREATE INDEX idx_saga_status ON saga_instances(status, updated_at);
CREATE INDEX idx_saga_type   ON saga_instances(saga_type);
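
The status column implies a small state machine. The transitions below are inferred from how the orchestrator code uses the four statuses (running to completed on success; running to compensating to failed on error) and are an assumption, not part of the schema. A repository could run a check like this before each update; `check_transition` and `InvalidTransition` are hypothetical names.

```python
# Allowed status transitions, inferred from the orchestrator's behaviour.
# Terminal states (completed, failed) have no outgoing transitions.
ALLOWED_TRANSITIONS: dict[str, set[str]] = {
    "running": {"completed", "compensating"},
    "compensating": {"failed"},
    "completed": set(),
    "failed": set(),
}


class InvalidTransition(Exception):
    pass


def check_transition(current: str, new: str) -> None:
    """Raise InvalidTransition unless current -> new is permitted."""
    if new not in ALLOWED_TRANSITIONS.get(current, set()):
        raise InvalidTransition(f"{current} -> {new}")


check_transition("running", "compensating")  # allowed, returns None
try:
    check_transition("completed", "compensating")
except InvalidTransition as e:
    print("rejected:", e)  # rejected: completed -> compensating
```

In SQL the same guard can be expressed as a conditional `UPDATE ... WHERE status = <expected>`, which also protects against concurrent writers.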

SagaEvent Log

CREATE TABLE saga_events (
    id          UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    saga_id     UUID NOT NULL REFERENCES saga_instances(id),
    step        INT NOT NULL,
    event_type  VARCHAR(100) NOT NULL,
                -- StepStarted | StepCompleted | StepFailed
                -- CompensationStarted | CompensationCompleted
    payload     JSONB NOT NULL DEFAULT '{}',
    occurred_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE INDEX idx_saga_events_saga_id ON saga_events(saga_id);
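
Because the event log is append-only, a saga's progress can be rebuilt by replaying its rows in `occurred_at` order, which is what makes it useful for debugging. A minimal sketch, assuming the rows have already been fetched and sorted as `(step, event_type)` pairs; `SagaProgress` and `replay` are hypothetical names.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class SagaProgress:
    completed_steps: list[int] = field(default_factory=list)
    compensated_steps: list[int] = field(default_factory=list)
    failed_step: Optional[int] = None
    in_flight: Optional[int] = None  # step started but not yet finished


def replay(events: list[tuple[int, str]]) -> SagaProgress:
    """Fold (step, event_type) rows, sorted by occurred_at, into progress."""
    progress = SagaProgress()
    for step, event_type in events:
        if event_type == "StepStarted":
            progress.in_flight = step
        elif event_type == "StepCompleted":
            progress.completed_steps.append(step)
            progress.in_flight = None
        elif event_type == "StepFailed":
            progress.failed_step = step
            progress.in_flight = None
        elif event_type == "CompensationCompleted":
            progress.compensated_steps.append(step)
    return progress


events = [
    (0, "StepStarted"), (0, "StepCompleted"),
    (1, "StepStarted"), (1, "StepFailed"),
    (0, "CompensationStarted"), (0, "CompensationCompleted"),
]
print(replay(events))
# SagaProgress(completed_steps=[0], compensated_steps=[0], failed_step=1, in_flight=None)
```

A non-empty `in_flight` at the end of the log marks a step whose outcome is unknown, which is exactly the case the timeout worker has to deal with.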

Saga Definition

Sagas are defined as JSON config. Each step specifies the service URL, the forward endpoint, and the compensating endpoint; endpoints are written as an HTTP method followed by a path template.

{
  "saga_type": "CreateOrderSaga",
  "steps": [
    {
      "name": "ReserveInventory",
      "service_url": "http://inventory-svc",
      "forward_endpoint": "POST /reservations",
      "compensating_endpoint": "DELETE /reservations/{reservation_id}"
    },
    {
      "name": "ChargePayment",
      "service_url": "http://payment-svc",
      "forward_endpoint": "POST /charges",
      "compensating_endpoint": "POST /refunds"
    },
    {
      "name": "ConfirmOrder",
      "service_url": "http://order-svc",
      "forward_endpoint": "POST /orders/{order_id}/confirm",
      "compensating_endpoint": "POST /orders/{order_id}/cancel"
    }
  ]
}
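
Each endpoint string packs the method and a path with `{placeholder}` segments. A sketch of how an orchestrator might resolve them, assuming placeholders are filled from the accumulated saga state (for example, a `reservation_id` returned by ReserveInventory). `resolve_endpoint` is a hypothetical helper, not part of the design above.

```python
from urllib.parse import quote


def resolve_endpoint(service_url: str, endpoint: str, state: dict) -> tuple[str, str]:
    """Split 'METHOD /path/{key}' and fill {key} from saga state."""
    method, path_template = endpoint.split(" ", 1)
    # Percent-encode each value so it is safe inside a path segment.
    # format_map raises KeyError if the state lacks a placeholder value.
    safe = {k: quote(str(v), safe="") for k, v in state.items()}
    return method, service_url.rstrip("/") + path_template.format_map(safe)


state = {"order_id": "o-42", "reservation_id": "r-7"}
print(resolve_endpoint("http://inventory-svc",
                       "DELETE /reservations/{reservation_id}", state))
# ('DELETE', 'http://inventory-svc/reservations/r-7')
```

Failing loudly on a missing placeholder is deliberate in this sketch: calling a compensating endpoint with a half-filled path would be worse than not calling it.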

Orchestrator Implementation

from __future__ import annotations

from uuid import UUID


class SagaOrchestrator:
    # saga_repo, load_definition, call_service, call_compensating, log_event
    # and complete_saga are collaborators wired in elsewhere; SagaInstance and
    # ServiceCallError come from the repository and ServiceCaller modules.

    def start(self, saga_type: str, initial_state: dict) -> UUID:
        definition = self.load_definition(saga_type)
        saga = self.saga_repo.create(
            saga_type=saga_type,
            state=initial_state,
            status='running',
            current_step=0
        )
        self.execute_steps(saga, definition)
        return saga.id

    def execute_steps(self, saga: SagaInstance, definition: dict):
        # A loop instead of recursion: no stack growth, and a failure in a
        # later step is handled exactly once. It starts from the persisted
        # current_step, so it can also resume a saga after a restart.
        steps = definition['steps']
        while True:
            step_def = steps[saga.current_step]
            self.log_event(saga.id, saga.current_step, 'StepStarted', {})
            try:
                result = self.call_service(
                    step_def=step_def,
                    payload=saga.state,
                    # Idempotency: include saga_id + step_index
                    headers={
                        'X-Saga-Id': str(saga.id),
                        'X-Saga-Step': str(saga.current_step)
                    }
                )
            except ServiceCallError as e:
                self.log_event(saga.id, saga.current_step, 'StepFailed',
                               {'error': str(e)})
                self.start_compensation(saga, definition)
                return

            # Merge step result into saga state
            saga.state.update(result)
            self.log_event(saga.id, saga.current_step, 'StepCompleted', result)

            if saga.current_step + 1 >= len(steps):
                self.complete_saga(saga)
                return
            saga.current_step += 1
            # Persist state and progress (bumping updated_at) before moving on
            self.saga_repo.update(saga)

    def start_compensation(self, saga: SagaInstance, definition: dict):
        saga.status = 'compensating'
        self.saga_repo.update(saga)
        # Compensate in reverse from current_step - 1 down to 0. This assumes
        # the current step failed cleanly; a step whose outcome is unknown
        # (e.g. it timed out) may have been applied and need compensating too.
        for step_idx in range(saga.current_step - 1, -1, -1):
            step_def = definition['steps'][step_idx]
            self.log_event(saga.id, step_idx, 'CompensationStarted', {})
            # Compensations must be idempotent; if one raises, the saga stays
            # 'compensating' and has to be resumed or escalated.
            self.call_compensating(step_def, saga.state, saga.id, step_idx)
            self.log_event(saga.id, step_idx, 'CompensationCompleted', {})

        saga.status = 'failed'
        self.saga_repo.update(saga)

Timeout Detection

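The SagaTimeoutWorker periodically scans for sagas still in 'running' whose last update is older than a configured threshold, and triggers compensation for them. The threshold must exceed the slowest legitimate step, retries included; otherwise the worker can compensate a saga whose step is still in flight.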

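import logging
import time
from datetime import datetime, timedelta, timezone

logger = logging.getLogger(__name__)


class SagaTimeoutWorker:
    # Must exceed the worst-case duration of one step: with ServiceCaller's
    # settings that is roughly 3 x 10s timeouts + 0.5s + 1.0s of backoff.
    TIMEOUT_SECONDS = 60

    def run(self):
        while True:
            # find_stuck compares updated_at to the cutoff; it should claim
            # rows atomically (e.g. FOR UPDATE SKIP LOCKED) to avoid races.
            cutoff = datetime.now(timezone.utc) - timedelta(seconds=self.TIMEOUT_SECONDS)
            stuck_sagas = self.saga_repo.find_stuck(status='running', older_than=cutoff)
            for saga in stuck_sagas:
                try:
                    definition = self.load_definition(saga.saga_type)
                    self.orchestrator.start_compensation(saga, definition)
                except Exception:
                    logger.exception('compensation failed for saga %s', saga.id)
            time.sleep(5)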
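
Because the worker and a live orchestrator can both act on the same saga, moving it out of 'running' should be a compare-and-set. A sketch using `sqlite3` as a stand-in for the real database: the UPDATE only matches while the row is still 'running' and stale, so exactly one caller wins. The schema is trimmed, `updated_at` is stored as epoch seconds for simplicity, and `try_claim` is a hypothetical helper.

```python
import sqlite3


def try_claim(conn: sqlite3.Connection, saga_id: str, cutoff: float) -> bool:
    """Flip a stale 'running' saga to 'compensating'; True if we won."""
    cur = conn.execute(
        "UPDATE saga_instances SET status = 'compensating' "
        "WHERE id = ? AND status = 'running' AND updated_at < ?",
        (saga_id, cutoff),
    )
    conn.commit()
    return cur.rowcount == 1  # 0 rows: not stale, or someone else claimed it


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE saga_instances "
             "(id TEXT PRIMARY KEY, status TEXT, updated_at REAL)")
conn.execute("INSERT INTO saga_instances VALUES ('s1', 'running', 100.0)")
print(try_claim(conn, "s1", cutoff=130.0))  # True: stale, claimed
print(try_claim(conn, "s1", cutoff=130.0))  # False: already claimed
```

The orchestrator's per-step update would need the same `WHERE status = 'running'` guard, so that it stops advancing a saga the worker has already claimed.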

Retry with Exponential Backoff

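import time

import requests
from requests.exceptions import HTTPError, RequestException


class ServiceCallError(Exception):
    """Raised when a downstream call fails permanently or exhausts retries."""


class ServiceCaller:
    MAX_RETRIES = 3
    BASE_DELAY  = 0.5  # seconds; sleeps of 0.5s then 1.0s between attempts

    def call(self, method: str, url: str, payload: dict, headers: dict) -> dict:
        for attempt in range(self.MAX_RETRIES):
            try:
                resp = requests.request(method, url, json=payload,
                                        headers=headers, timeout=10)
                resp.raise_for_status()
                return resp.json() if resp.content else {}  # 204: no body
            except HTTPError as e:
                status = e.response.status_code
                # 4xx (except 429 Too Many Requests) is permanent: don't retry
                if status < 500 and status != 429:
                    raise ServiceCallError(str(e)) from e
                last_error = e
            except RequestException as e:  # connection errors, timeouts
                last_error = e
            if attempt < self.MAX_RETRIES - 1:
                time.sleep(self.BASE_DELAY * (2 ** attempt))
        raise ServiceCallError(str(last_error))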
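
The retry loop sleeps BASE_DELAY * 2**attempt between attempts and never after the last one. A small sketch of that arithmetic (the helper names are illustrative): with the values above the sleeps are 0.5 s and 1.0 s, and a call that times out on every attempt blocks for about 3 x 10 + 1.5 = 31.5 s. That is only a rough bound, since the requests timeout limits the connect and per-read waits rather than total wall time, but it shows why the timeout worker's threshold has to sit well above it.

```python
def backoff_delays(max_retries: int, base_delay: float) -> list[float]:
    """Sleeps between attempts: base * 2**attempt, none after the last try."""
    return [base_delay * (2 ** attempt) for attempt in range(max_retries - 1)]


def worst_case_call_seconds(max_retries: int, base_delay: float,
                            request_timeout: float) -> float:
    """Rough bound on one call: every attempt times out, plus all sleeps."""
    return max_retries * request_timeout + sum(backoff_delays(max_retries, base_delay))


print(backoff_delays(3, 0.5))               # [0.5, 1.0]
print(worst_case_call_seconds(3, 0.5, 10))  # 31.5
```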

Idempotency

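Every forward and compensating request carries saga_id and step_index in the X-Saga-Id and X-Saga-Step headers. Downstream services use them as an idempotency key, so a retried or redelivered request (an HTTP retry, or an at-least-once reply event if replies travel over a broker such as Kafka) is applied only once. Because forward and compensating calls for the same step share these two values, the key must also record which kind of request it was; otherwise a compensation would be mistaken for a replay of the forward call.

-- Downstream service idempotency table
CREATE TABLE saga_step_requests (
    saga_id   UUID NOT NULL,
    step_idx  INT NOT NULL,
    action    VARCHAR(20) NOT NULL,  -- forward | compensate
    result    JSONB,
    PRIMARY KEY (saga_id, step_idx, action)
);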
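
On the downstream side, the table is consulted before doing any work, and a duplicate gets the stored result back instead of a second execution. A minimal sketch with `sqlite3` standing in for the service's database; `handle_once`, the `charge` handler, and the `action` column (forward vs compensate) are illustrative assumptions.

```python
import json
import sqlite3
from typing import Callable

conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE saga_step_requests (
    saga_id TEXT NOT NULL, step_idx INTEGER NOT NULL, action TEXT NOT NULL,
    result TEXT, PRIMARY KEY (saga_id, step_idx, action))""")


def handle_once(saga_id: str, step_idx: int, action: str,
                handler: Callable[[], dict]) -> dict:
    """Run handler at most once per key; replays return the stored result."""
    row = conn.execute(
        "SELECT result FROM saga_step_requests "
        "WHERE saga_id = ? AND step_idx = ? AND action = ?",
        (saga_id, step_idx, action)).fetchone()
    if row is not None:
        return json.loads(row[0])  # duplicate delivery: replay stored result
    result = handler()
    # In a real service, do the business write and this insert in one
    # transaction; a concurrent duplicate then fails on the primary key.
    with conn:
        conn.execute("INSERT INTO saga_step_requests VALUES (?, ?, ?, ?)",
                     (saga_id, step_idx, action, json.dumps(result)))
    return result


calls = []


def charge() -> dict:
    calls.append(1)
    return {"charge_id": f"ch-{len(calls)}"}


print(handle_once("s1", 1, "forward", charge))  # {'charge_id': 'ch-1'}
print(handle_once("s1", 1, "forward", charge))  # {'charge_id': 'ch-1'} (replayed)
print(len(calls))                               # 1
```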

Key Design Decisions

  • Orchestrator is stateless; all saga state lives in the DB, so instances can be restarted freely. Running several instances additionally requires that each saga is claimed by only one of them at a time
  • JSON-defined steps make saga topology configurable without code changes
  • Compensation runs in strict reverse order, so later steps (which may depend on earlier ones) are undone first
  • The SagaEvent log provides a full audit trail for debugging and replay
  • The timeout worker handles sagas stuck because of network partitions or downstream crashes
