Saga Orchestrator: Low-Level Design
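The Saga pattern manages distributed transactions across microservices. In an orchestrator-based saga, a central coordinator drives each step in sequence and, when a step fails, triggers compensating transactions for the steps that already completed. This avoids two-phase commit and the locks it holds across services. The trade-off is isolation: intermediate states are visible to other transactions, and the system converges to a consistent state (eventual consistency) rather than providing ACID guarantees across services.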
Architecture Overview
The SagaOrchestrator is a central coordinator: it executes steps one at a time, persists state to survive restarts, and compensates in reverse order on failure.
Core Data Model
SagaInstance Table
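```sql
-- gen_random_uuid() is built into PostgreSQL 13+; earlier versions need pgcrypto
CREATE TABLE saga_instances (
    id           UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    saga_type    VARCHAR(100) NOT NULL,
    state        JSONB NOT NULL DEFAULT '{}',
    current_step INT NOT NULL DEFAULT 0,
    status       VARCHAR(20) NOT NULL DEFAULT 'running'
        CHECK (status IN ('running', 'completed', 'compensating', 'failed')),
    created_at   TIMESTAMPTZ NOT NULL DEFAULT now(),
    -- DEFAULT applies only on INSERT; every UPDATE must also set updated_at = now()
    updated_at   TIMESTAMPTZ NOT NULL DEFAULT now()
);
-- (status, updated_at) serves the timeout worker's "running and stale" scan
CREATE INDEX idx_saga_status ON saga_instances(status, updated_at);
CREATE INDEX idx_saga_type ON saga_instances(saga_type);
```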
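The status values form a small lifecycle. A minimal sketch of a guard a repository could run before writing a new status, assuming these are the only legal transitions; ALLOWED_TRANSITIONS, IllegalTransition, and check_transition are hypothetical names:

```python
from typing import Dict, Set

# Assumed lifecycle: running -> completed on success;
# running -> compensating -> failed after a step failure or timeout.
ALLOWED_TRANSITIONS: Dict[str, Set[str]] = {
    "running": {"completed", "compensating"},
    "compensating": {"failed"},
    "completed": set(),  # terminal
    "failed": set(),     # terminal
}


class IllegalTransition(ValueError):
    """Raised when a status change skips or reverses the lifecycle."""


def check_transition(current: str, new: str) -> str:
    """Return `new` if moving from `current` to `new` is allowed, else raise."""
    if new not in ALLOWED_TRANSITIONS.get(current, set()):
        raise IllegalTransition(f"{current} -> {new} is not allowed")
    return new
```

Note that running never goes straight to failed: even when the first step fails, the saga passes through compensating (with nothing to undo) before it is marked failed.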
SagaEvent Log
```sql
CREATE TABLE saga_events (
    id          UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    saga_id     UUID NOT NULL REFERENCES saga_instances(id),
    step        INT NOT NULL,
    event_type  VARCHAR(100) NOT NULL,
        -- StepStarted | StepCompleted | StepFailed
        -- CompensationStarted | CompensationCompleted
    payload     JSONB NOT NULL DEFAULT '{}',
    occurred_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX idx_saga_events_saga_id ON saga_events(saga_id);
```
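Because every step transition is logged, a saga's progress can be reconstructed from saga_events alone, which is what makes the log useful for debugging and replay. A minimal sketch, assuming rows are read oldest first and reduced to (step, event_type) tuples; SagaProgress and replay are hypothetical names:

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


@dataclass
class SagaProgress:
    completed: List[int] = field(default_factory=list)    # forward calls that succeeded
    failed_step: Optional[int] = None                     # step that logged StepFailed
    compensated: List[int] = field(default_factory=list)  # steps undone, in order
    in_flight: Optional[int] = None                       # started but never finished


def replay(events: List[Tuple[int, str]]) -> SagaProgress:
    """Fold (step, event_type) rows, oldest first, into a progress summary."""
    progress = SagaProgress()
    for step, event_type in events:
        if event_type in ("StepStarted", "CompensationStarted"):
            progress.in_flight = step
        elif event_type == "StepCompleted":
            progress.completed.append(step)
            progress.in_flight = None
        elif event_type == "StepFailed":
            progress.failed_step = step
            progress.in_flight = None
        elif event_type == "CompensationCompleted":
            progress.compensated.append(step)
            progress.in_flight = None
    return progress
```

A non-empty in_flight at the end of the log points at the step that was running when the orchestrator stopped, which is exactly the case the timeout worker has to handle.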
Saga Definition
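Sagas are defined as JSON config. Each step specifies the service base URL, the forward endpoint, and the compensating endpoint, each written as an HTTP method followed by a path. Placeholders such as {reservation_id} and {order_id} refer to values in the saga state, which accumulates the results of earlier steps (for example, the reservation ID returned by ReserveInventory).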
```json
{
  "saga_type": "CreateOrderSaga",
  "steps": [
    {
      "name": "ReserveInventory",
      "service_url": "http://inventory-svc",
      "forward_endpoint": "POST /reservations",
      "compensating_endpoint": "DELETE /reservations/{reservation_id}"
    },
    {
      "name": "ChargePayment",
      "service_url": "http://payment-svc",
      "forward_endpoint": "POST /charges",
      "compensating_endpoint": "POST /refunds"
    },
    {
      "name": "ConfirmOrder",
      "service_url": "http://order-svc",
      "forward_endpoint": "POST /orders/{order_id}/confirm",
      "compensating_endpoint": "POST /orders/{order_id}/cancel"
    }
  ]
}
```
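Turning a step definition into a concrete request means splitting the method from the path and filling the placeholders. A minimal sketch, assuming every placeholder names a key in the saga state; resolve_endpoint is a hypothetical helper, not part of the original design:

```python
from typing import Tuple
from urllib.parse import quote


def resolve_endpoint(step_def: dict, state: dict,
                     compensating: bool = False) -> Tuple[str, str]:
    """Return (method, url) for a step's forward or compensating endpoint."""
    key = "compensating_endpoint" if compensating else "forward_endpoint"
    method, path_template = step_def[key].split(" ", 1)
    # Percent-encode state values so they are safe inside a path segment
    params = {k: quote(str(v), safe="") for k, v in state.items()}
    # format_map raises KeyError if a placeholder has no matching state key,
    # surfacing a broken definition before any request is sent
    path = path_template.format_map(params)
    return method, step_def["service_url"].rstrip("/") + path
```

For ConfirmOrder with order_id "o-7" in the state, the forward call resolves to POST http://order-svc/orders/o-7/confirm.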
Orchestrator Implementation
```python
from __future__ import annotations

from uuid import UUID


class ServiceCallError(Exception):
    """Raised when a downstream call still fails after all retries."""


class SagaOrchestrator:
    # saga_repo, load_definition, call_service, call_compensating, log_event
    # and complete_saga are collaborators whose implementations are not shown.

    def start(self, saga_type: str, initial_state: dict) -> UUID:
        definition = self.load_definition(saga_type)
        saga = self.saga_repo.create(
            saga_type=saga_type,
            state=initial_state,
            status='running',
            current_step=0,
        )
        self.execute_step(saga, definition)
        return saga.id

    def execute_step(self, saga: SagaInstance, definition: dict):
        steps = definition['steps']
        # A loop rather than recursion, so long sagas cannot hit the recursion limit
        while True:
            step_def = steps[saga.current_step]
            self.log_event(saga.id, saga.current_step, 'StepStarted', {})
            try:
                result = self.call_service(
                    step_def=step_def,
                    payload=saga.state,
                    # Idempotency: saga_id + step index identify this request downstream
                    headers={
                        'X-Saga-Id': str(saga.id),
                        'X-Saga-Step': str(saga.current_step),
                    },
                )
            except ServiceCallError as e:
                self.log_event(saga.id, saga.current_step, 'StepFailed',
                               {'error': str(e)})
                self.start_compensation(saga, definition)
                return
            # Merge the step result (e.g. reservation_id) into saga state
            saga.state.update(result)
            self.log_event(saga.id, saga.current_step, 'StepCompleted', result)
            if saga.current_step + 1 >= len(steps):
                self.complete_saga(saga)  # sets status 'completed' and persists (not shown)
                return
            saga.current_step += 1
            # Persist progress before calling the next step
            self.saga_repo.update(saga)

    def start_compensation(self, saga: SagaInstance, definition: dict):
        saga.status = 'compensating'
        self.saga_repo.update(saga)
        # The step at current_step is the one that failed or timed out; compensate
        # the completed steps, current_step - 1 down to 0, in reverse order
        for step_idx in range(saga.current_step - 1, -1, -1):
            step_def = definition['steps'][step_idx]
            self.log_event(saga.id, step_idx, 'CompensationStarted', {})
            self.call_compensating(step_def, saga.state, saga.id, step_idx)
            self.log_event(saga.id, step_idx, 'CompensationCompleted', {})
        saga.status = 'failed'
        self.saga_repo.update(saga)
```
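The forward/compensate control flow can be exercised without a database or HTTP. A self-contained, in-memory sketch: steps are plain Python callables standing in for the service calls, persistence and event logging are left out, and the names Step, StepFailed, and run_saga are hypothetical:

```python
from dataclasses import dataclass
from typing import Callable, List


class StepFailed(Exception):
    """Raised by a forward action that cannot complete."""


@dataclass
class Step:
    name: str
    forward: Callable[[dict], dict]      # returns values to merge into state
    compensate: Callable[[dict], None]   # undoes the forward action


def run_saga(steps: List[Step], state: dict) -> str:
    """Run steps in order; on failure, compensate completed steps in reverse."""
    current = 0
    try:
        while current < len(steps):
            state.update(steps[current].forward(state))
            current += 1
        return "completed"
    except StepFailed:
        # steps[current] failed; steps current-1 .. 0 completed and are undone
        for idx in range(current - 1, -1, -1):
            steps[idx].compensate(state)
        return "failed"


if __name__ == "__main__":
    log: List[str] = []

    def reserve(state: dict) -> dict:
        log.append("reserve")
        return {"reservation_id": "r-1"}

    def release(state: dict) -> None:
        log.append(f"release {state['reservation_id']}")

    def charge(state: dict) -> dict:
        raise StepFailed("card declined")

    def refund(state: dict) -> None:
        log.append("refund")

    steps = [Step("ReserveInventory", reserve, release),
             Step("ChargePayment", charge, refund)]
    print(run_saga(steps, {}), log)  # failed ['reserve', 'release r-1']
```

The failing ChargePayment step is not itself refunded, matching start_compensation, which begins at current_step - 1.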
Timeout Detection
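The SagaTimeoutWorker periodically scans for sagas that are still running but have not been updated within the configured threshold, and triggers compensation for them. The threshold must exceed the worst-case duration of a single step including retries; otherwise the worker can compensate a saga whose step is still in flight. With the ServiceCaller settings below (three attempts with a 10 s timeout each, plus 0.5 s and 1 s of backoff), that worst case is at least 31.5 s.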
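```python
import logging
import time
from datetime import datetime, timedelta, timezone

logger = logging.getLogger(__name__)


class SagaTimeoutWorker:
    # Must exceed the worst-case step duration; ServiceCaller's 3 attempts x 10 s
    # timeout plus 0.5 s + 1.0 s of backoff already add up to at least 31.5 s
    TIMEOUT_SECONDS = 60
    POLL_INTERVAL_SECONDS = 5

    def run(self):
        while True:
            cutoff = datetime.now(timezone.utc) - timedelta(seconds=self.TIMEOUT_SECONDS)
            for saga in self.saga_repo.find_stuck(status='running', older_than=cutoff):
                try:
                    definition = self.load_definition(saga.saga_type)
                    self.orchestrator.start_compensation(saga, definition)
                except Exception:
                    # Log and keep going so one bad saga does not stop the scan
                    logger.exception("compensation failed for saga %s", saga.id)
            time.sleep(self.POLL_INTERVAL_SECONDS)
```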
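The worker and the orchestrator can both try to move the same saga out of running (for example when a slow step finishes just as the worker fires), and with several worker instances two of them can pick up the same saga. One common guard is a conditional UPDATE that claims the saga only if it is still running and stale, then checks the affected row count. A minimal sketch using the standard-library sqlite3 module as a stand-in for PostgreSQL; claim_for_compensation is a hypothetical helper, and timestamps are stored as epoch seconds for simplicity:

```python
import sqlite3
import time
from typing import Optional


def claim_for_compensation(conn: sqlite3.Connection, saga_id: str,
                           timeout_seconds: float,
                           now: Optional[float] = None) -> bool:
    """Atomically move a stale 'running' saga to 'compensating'.

    Only the caller whose UPDATE actually changed the row gets True; any
    concurrent or late caller sees rowcount == 0 and must leave the saga alone.
    """
    now = time.time() if now is None else now
    cur = conn.execute(
        "UPDATE saga_instances SET status = 'compensating', updated_at = ? "
        "WHERE id = ? AND status = 'running' AND updated_at < ?",
        (now, saga_id, now - timeout_seconds),
    )
    conn.commit()
    return cur.rowcount == 1


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE saga_instances "
                 "(id TEXT PRIMARY KEY, status TEXT, updated_at REAL)")
    conn.execute("INSERT INTO saga_instances VALUES ('s1', 'running', 0)")
    print(claim_for_compensation(conn, "s1", 30, now=100))  # True
    print(claim_for_compensation(conn, "s1", 30, now=100))  # False: already claimed
```

In PostgreSQL the same statement would compare updated_at against now() - interval '60 seconds'. The orchestrator's own transitions (such as marking a saga completed) benefit from the same WHERE status = 'running' guard, so a step that finishes late cannot overwrite a compensation already under way.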
Retry with Exponential Backoff
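```python
import time

import requests
from requests.exceptions import RequestException


class ServiceCaller:
    MAX_RETRIES = 3
    BASE_DELAY = 0.5  # seconds; sleeps are 0.5 s, then 1.0 s

    def call(self, method: str, url: str, payload: dict, headers: dict) -> dict:
        # method comes from the step's endpoint string (POST, DELETE, ...)
        for attempt in range(self.MAX_RETRIES):
            try:
                resp = requests.request(method, url, json=payload,
                                        headers=headers, timeout=10)
                resp.raise_for_status()
                return resp.json() if resp.content else {}  # DELETE may return no body
            except RequestException as e:  # also covers HTTPError from raise_for_status
                status = e.response.status_code if e.response is not None else None
                # Client errors (4xx other than 408/429) will not succeed on retry
                retryable = status is None or status >= 500 or status in (408, 429)
                if not retryable or attempt == self.MAX_RETRIES - 1:
                    # ServiceCallError is the exception SagaOrchestrator catches
                    raise ServiceCallError(str(e)) from e
                time.sleep(self.BASE_DELAY * (2 ** attempt))
```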
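The backoff schedule is easiest to verify with the call and the sleep function injected. A minimal sketch of the same loop (three attempts, 0.5 s base delay) that accepts any callable; retry_with_backoff and TransientError are hypothetical names, with TransientError standing in for a retryable failure such as a timeout or 5xx. The jitter option (a random delay between 0 and the exponential value, often called "full jitter") is a common variation for spreading out retries from many concurrent sagas, not part of the original design:

```python
import random
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


class TransientError(Exception):
    """Stands in for a retryable failure (timeout, 5xx)."""


def retry_with_backoff(fn: Callable[[], T], max_retries: int = 3,
                       base_delay: float = 0.5, jitter: bool = False,
                       sleep: Callable[[float], None] = time.sleep) -> T:
    """Call fn up to max_retries times, sleeping base_delay * 2**attempt between tries."""
    for attempt in range(max_retries):
        try:
            return fn()
        except TransientError:
            if attempt == max_retries - 1:
                raise  # out of attempts: propagate the last error
            delay = base_delay * (2 ** attempt)
            if jitter:
                delay = random.uniform(0, delay)  # "full jitter" variant
            sleep(delay)
    raise ValueError("max_retries must be at least 1")


if __name__ == "__main__":
    delays: List[float] = []
    outcomes = iter([TransientError(), TransientError(), "ok"])

    def flaky() -> str:
        outcome = next(outcomes)
        if isinstance(outcome, Exception):
            raise outcome
        return outcome

    print(retry_with_backoff(flaky, sleep=delays.append), delays)  # ok [0.5, 1.0]
```

With three attempts the sleeps total 1.5 s, which is why a single step can block for at least 31.5 s when every attempt runs into the 10 s timeout.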
Idempotency
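Every forward and compensating request carries the saga_id and step index in headers (X-Saga-Id, X-Saga-Step). Downstream services use that pair, together with whether the request is a forward or a compensating action, as an idempotency key: a refund for step 1 must not be deduplicated against the charge for step 1. Repeated requests are therefore safe, whether they come from ServiceCaller retries, a compensation repeated after a crash, or at-least-once delivery when replies flow through Kafka.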
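```sql
-- Downstream service idempotency table
CREATE TABLE saga_step_requests (
    saga_id  UUID NOT NULL,
    step_idx INT NOT NULL,
    action   VARCHAR(20) NOT NULL,  -- 'forward' | 'compensate'
    result   JSONB,                 -- stored response, returned again on duplicates
    PRIMARY KEY (saga_id, step_idx, action)
);
```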
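On the downstream side, the idempotency table lets a handler return the stored result for a repeated request instead of repeating the side effect. A minimal sketch with sqlite3 standing in for the service's database, assuming the key also records the action ('forward' or 'compensate'); handle_once and do_work are hypothetical names. The key row, the side effect, and the result commit in one transaction, which only holds when the side effect is a write to the same database; an external effect (such as a call to a card processor) needs its own idempotency key:

```python
import json
import sqlite3
from typing import Callable


def handle_once(conn: sqlite3.Connection, saga_id: str, step_idx: int,
                action: str, do_work: Callable[[sqlite3.Connection], dict]) -> dict:
    """Apply do_work at most once per (saga_id, step_idx, action)."""
    key = (saga_id, step_idx, action)
    with conn:  # commits on success, rolls back if anything raises
        try:
            conn.execute("INSERT INTO saga_step_requests (saga_id, step_idx, action) "
                         "VALUES (?, ?, ?)", key)
        except sqlite3.IntegrityError:
            # Duplicate key: already applied, so replay the stored result
            row = conn.execute("SELECT result FROM saga_step_requests "
                               "WHERE saga_id = ? AND step_idx = ? AND action = ?",
                               key).fetchone()
            return json.loads(row[0])
        result = do_work(conn)  # side effect written through the same connection
        conn.execute("UPDATE saga_step_requests SET result = ? "
                     "WHERE saga_id = ? AND step_idx = ? AND action = ?",
                     (json.dumps(result), *key))
    return result
```

In PostgreSQL a concurrent duplicate INSERT on the same primary key waits until the first transaction commits and then fails with a unique violation, after which the handler can read the stored result the same way.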
Key Design Decisions
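- Orchestrator is stateless; all saga state lives in the DB, so instances can be restarted or scaled out (with multiple instances, status changes must be conditional updates so two instances never drive the same saga)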
- JSON-defined steps make saga topology configurable without code changes
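- Compensation runs in strict reverse order, so each step is undone while the steps it built on are still in place (an order is cancelled before its payment is refunded, and the refund precedes releasing the inventory reservation)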
- SagaEvent log provides full audit trail for debugging and replay
- Timeout worker handles stuck sagas caused by network partitions or downstream crashes