Designing a distributed task scheduler — like AWS Step Functions, Airflow, Celery, or cron at scale — tests your ability to handle reliable job execution, failure recovery, deduplication, and priority queuing. This problem appears at Airbnb, Uber, LinkedIn, and infrastructure-focused companies.
Requirements Clarification
- Scale: 10M tasks/day, 100k concurrent workers, tasks range from seconds to hours
- Features: One-time tasks, cron-style recurring tasks, task dependencies (DAGs), retries, priority
- Reliability: At-least-once delivery with deduplication for idempotent tasks; exactly-once for financial tasks
- Visibility: Real-time job status, logs, alerting on failures
- Latency: Tasks start within 1 second of scheduled time for high-priority; 30s for normal
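A quick back-of-envelope check on these numbers helps size the queue (the 10x peak-to-average ratio below is an assumption, not part of the requirements):

```python
# Back-of-envelope throughput for the stated scale.
tasks_per_day = 10_000_000
avg_per_sec = tasks_per_day / 86_400   # 86,400 seconds per day
peak_per_sec = avg_per_sec * 10        # assumed 10x peak-to-average ratio

print(round(avg_per_sec), round(peak_per_sec))  # 116 1157
```

Even at the assumed peak, ~1.2k enqueues/sec is well within a single Redis instance's capacity; the 100k concurrent workers are sized for long-running tasks, not for raw enqueue throughput.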
Core Architecture
"""
Components:
Scheduler: converts cron expressions to next-run timestamps; enqueues tasks
Queue: holds ready-to-run tasks with priority and visibility timeout
Workers: poll queue, execute tasks, report results
State DB: persists task definitions, run history, status
Heartbeat: workers report health; scheduler detects dead workers and re-enqueues
Data flow:
Task definition (cron: "0 * * * *") → Scheduler polls DB every minute
→ computes next_run, checks if past due → enqueues to priority queue
→ Worker picks up → sets task status=RUNNING + heartbeat TTL
→ Executes → success: status=DONE | failure: retry logic → status=FAILED/RETRIED
"""
Task Model and State Machine
```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional, List
from datetime import datetime

class TaskStatus(Enum):
    PENDING = "pending"      # Defined but not yet queued
    QUEUED = "queued"        # In queue, waiting for worker
    RUNNING = "running"      # Worker picked up, executing
    COMPLETED = "completed"  # Finished successfully
    FAILED = "failed"        # Exhausted retries
    RETRYING = "retrying"    # Failed this attempt, will retry
    CANCELLED = "cancelled"  # Explicitly cancelled

@dataclass
class TaskDefinition:
    """Persisted in database — describes what to run and when."""
    id: str
    name: str
    handler: str                  # "module.function" to invoke
    cron: Optional[str] = None    # "0 9 * * 1-5" = weekdays 9am
    payload: dict = field(default_factory=dict)
    max_retries: int = 3
    retry_delay_s: int = 60       # Base delay; actual = delay * 2^attempt
    timeout_s: int = 3600         # Task killed after this many seconds
    priority: int = 5             # 1=highest, 10=lowest
    depends_on: List[str] = field(default_factory=list)  # Task IDs for DAG
    idempotency_key: Optional[str] = None                # Dedup key

@dataclass
class TaskRun:
    """One execution of a TaskDefinition."""
    run_id: str
    task_id: str
    attempt: int
    status: TaskStatus
    scheduled_at: datetime
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    worker_id: Optional[str] = None
    error: Optional[str] = None
    next_run_at: Optional[datetime] = None  # For cron tasks

class TaskStateMachine:
    VALID_TRANSITIONS = {
        TaskStatus.PENDING: {TaskStatus.QUEUED, TaskStatus.CANCELLED},
        TaskStatus.QUEUED: {TaskStatus.RUNNING, TaskStatus.CANCELLED},
        TaskStatus.RUNNING: {TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.RETRYING},
        TaskStatus.RETRYING: {TaskStatus.QUEUED},
        TaskStatus.COMPLETED: set(),
        TaskStatus.FAILED: set(),
        TaskStatus.CANCELLED: set(),
    }

    @classmethod
    def transition(cls, current: TaskStatus, next_status: TaskStatus) -> bool:
        if next_status not in cls.VALID_TRANSITIONS.get(current, set()):
            raise ValueError(f"Invalid transition: {current} -> {next_status}")
        return True
```
Priority Queue with Visibility Timeout
```python
import json
import time
from typing import Optional

import redis

class TaskQueue:
    """
    Redis-based priority queue with visibility timeout (like SQS).

    Visibility timeout prevents duplicate processing:
    Worker picks task → task hidden from others for N seconds.
    If the worker dies, the task reappears after the timeout for another worker.
    """
    QUEUE_KEY = "tasks:queue"      # Sorted set: score = priority * 1e12 + run_at
    RUNNING_KEY = "tasks:running"  # Hash: run_id -> {task, priority, expires_at, worker_id}
    VISIBILITY_TIMEOUT = 300       # 5 minutes

    def __init__(self, redis_client: redis.Redis):
        # Assumes the client was created with decode_responses=True
        self.redis = redis_client

    def enqueue(self, task_run: TaskRun, priority: int = 5, run_at: float = None) -> str:
        run_at = run_at or time.time()
        # Score: lower = higher priority; later run_at = higher score
        member = {"run_id": task_run.run_id, "task_id": task_run.task_id,
                  "priority": priority}
        self.redis.zadd(self.QUEUE_KEY, {json.dumps(member): priority * 1e12 + run_at})
        return task_run.run_id

    def dequeue(self, worker_id: str) -> Optional[dict]:
        """
        Move a ready task from the queue to the running set.
        Production: wrap read-check-remove in a Lua script for atomicity;
        here we detect races by checking the ZREM result instead.
        """
        now = time.time()
        # Candidates: any score a ready task could have (max priority = 10).
        # The time component of a score is score % 1e12, so a far-future
        # high-priority task can appear before a ready low-priority one;
        # we skip those and scan a small batch.
        candidates = self.redis.zrangebyscore(
            self.QUEUE_KEY, "-inf", 10 * 1e12 + now, start=0, num=10, withscores=True
        )
        for raw, score in candidates:
            if score % 1e12 > now:
                continue  # scheduled in the future, not ready yet
            item = json.loads(raw)
            # Claim it: if ZREM returns 0, another worker won the race
            if not self.redis.zrem(self.QUEUE_KEY, raw):
                continue
            self.redis.hset(self.RUNNING_KEY, item["run_id"], json.dumps({
                **item, "worker_id": worker_id,
                "expires_at": now + self.VISIBILITY_TIMEOUT,
            }))
            return item
        return None

    def complete(self, run_id: str):
        """Remove from running set on completion."""
        self.redis.hdel(self.RUNNING_KEY, run_id)

    def recover_stalled(self) -> int:
        """
        Re-enqueue tasks whose visibility timeout expired (worker died).
        Call periodically from the scheduler process.
        """
        now = time.time()
        recovered = 0
        for run_id, raw in self.redis.hgetall(self.RUNNING_KEY).items():
            item = json.loads(raw)
            if item["expires_at"] < now:
                # Worker died — re-enqueue with a slight delay, keeping priority
                priority = item.get("priority", 5)
                self.redis.hdel(self.RUNNING_KEY, run_id)
                member = {"run_id": run_id, "task_id": item["task_id"],
                          "priority": priority}
                self.redis.zadd(self.QUEUE_KEY,
                                {json.dumps(member): priority * 1e12 + now + 10})
                recovered += 1
        return recovered
```
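The priority-plus-timestamp score packing can be sanity-checked in isolation. A sketch (1e12 works because epoch-second timestamps stay far below 10^12):

```python
def encode_score(priority: int, run_at: float) -> float:
    # Priority occupies the high digits, the due timestamp the low digits,
    # so a range scan over scores sees lower-priority-number tasks first.
    return priority * 1e12 + run_at

def decode_score(score: float) -> tuple:
    priority, run_at = divmod(score, 1e12)
    return int(priority), run_at

now = 1_700_000_000.0
urgent = encode_score(1, now)        # priority 1, due now
overdue = encode_score(5, now - 60)  # priority 5, due a minute ago
assert urgent < overdue              # priority dominates recency
assert decode_score(overdue) == (5, now - 60)
```

One caveat this exposes: a high-priority task due far in the future still sorts ahead of a ready low-priority one, which is why the dequeue path must check the time component (`score % 1e12`) before claiming a task.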
Cron Expression Parsing and Scheduling
```python
import uuid
from datetime import datetime, timedelta

import pytz
from croniter import croniter  # pip install croniter

class CronScheduler:
    def __init__(self, db, queue: TaskQueue, timezone: str = "UTC"):
        self.db = db
        self.queue = queue
        self.tz = pytz.timezone(timezone)

    def compute_next_run(self, cron_expr: str, after: datetime = None) -> datetime:
        """Compute the next run time for a cron expression."""
        after = after or datetime.now(self.tz)
        return croniter(cron_expr, after).get_next(datetime)

    def tick(self):
        """
        Called every second by the scheduler loop.
        Find all tasks due in the next window and enqueue them.
        """
        now = datetime.now(self.tz)
        window_end = now + timedelta(seconds=5)  # Look ahead 5 seconds
        for task_def in self.db.get_tasks_due_before(window_end):
            # Idempotency check: skip if a run already exists for this scheduled time
            if self.db.run_exists_for_time(task_def.id, task_def.next_run_at):
                continue
            run = TaskRun(
                run_id=str(uuid.uuid4()),
                task_id=task_def.id,
                attempt=0,
                status=TaskStatus.QUEUED,
                scheduled_at=task_def.next_run_at,
            )
            self.db.create_run(run)
            self.queue.enqueue(run, priority=task_def.priority,
                               run_at=task_def.next_run_at.timestamp())
            # Advance next_run_at in the DB so the task isn't enqueued twice
            if task_def.cron:
                next_run = self.compute_next_run(task_def.cron, after=task_def.next_run_at)
                self.db.update_next_run(task_def.id, next_run)
```
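croniter does the heavy lifting above; for intuition, here is what `get_next` effectively computes for the single expression "0 * * * *" (hand-rolled sketch for that one case only, naive datetimes assumed):

```python
from datetime import datetime, timedelta

def next_top_of_hour(after: datetime) -> datetime:
    # For "0 * * * *": the next strictly-later minute-0 boundary,
    # mirroring croniter(expr, after).get_next(datetime).
    truncated = after.replace(minute=0, second=0, microsecond=0)
    return truncated + timedelta(hours=1)

assert next_top_of_hour(datetime(2024, 1, 1, 9, 30)) == datetime(2024, 1, 1, 10, 0)
# "Strictly later": a task due exactly at 9:00 gets 10:00 as its next run
assert next_top_of_hour(datetime(2024, 1, 1, 9, 0)) == datetime(2024, 1, 1, 10, 0)
```

The strictly-later semantics matter for the idempotency check in `tick`: after a run is created for 9:00, `next_run_at` must advance to 10:00, or the same run would be enqueued on every tick.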
Worker Implementation
```python
import importlib
import logging
import signal
import time
import uuid
from datetime import datetime

class Worker:
    def __init__(self, worker_id: str, queue: TaskQueue, db, heartbeat_interval: int = 30):
        self.worker_id = worker_id
        self.queue = queue
        self.db = db
        self.heartbeat_interval = heartbeat_interval  # heartbeat loop omitted for brevity
        self.running = True
        self.current_run_id = None
        signal.signal(signal.SIGTERM, self._handle_shutdown)

    def run(self):
        """Main worker loop: poll queue, execute tasks."""
        logging.info(f"Worker {self.worker_id} started")
        while self.running:
            item = self.queue.dequeue(self.worker_id)
            if not item:
                time.sleep(1)
                continue
            self.current_run_id = item["run_id"]
            self._execute_task(item)
            self.current_run_id = None

    def _execute_task(self, item: dict):
        run_id = item["run_id"]
        task_def = self.db.get_task_definition(item["task_id"])
        run = self.db.get_run(run_id)
        self.db.update_run(run_id, status=TaskStatus.RUNNING,
                           started_at=datetime.utcnow(), worker_id=self.worker_id)
        try:
            # Resolve handler: "myapp.tasks.send_email"
            module_name, fn_name = task_def.handler.rsplit(".", 1)
            handler = getattr(importlib.import_module(module_name), fn_name)
            # Execute with timeout
            self._run_with_timeout(handler, task_def.payload, task_def.timeout_s)
            self.queue.complete(run_id)
            self.db.update_run(run_id, status=TaskStatus.COMPLETED,
                               completed_at=datetime.utcnow())
            logging.info(f"Task {task_def.id} run {run_id} completed")
        except TimeoutError:
            self._handle_failure(run, task_def, "Task timed out")
        except Exception as e:
            self._handle_failure(run, task_def, str(e))

    def _handle_failure(self, run: TaskRun, task_def: TaskDefinition, error: str):
        # Remove the old run from the running set in both branches; otherwise
        # the recovery scan re-enqueues it after the visibility timeout
        self.queue.complete(run.run_id)
        if run.attempt < task_def.max_retries:
            delay = task_def.retry_delay_s * (2 ** run.attempt)  # Exponential backoff
            retry_at = time.time() + delay
            self.db.update_run(run.run_id, status=TaskStatus.RETRYING, error=error)
            # Create a new run for the retry
            retry_run = TaskRun(
                run_id=str(uuid.uuid4()),
                task_id=task_def.id,
                attempt=run.attempt + 1,
                status=TaskStatus.QUEUED,
                scheduled_at=datetime.fromtimestamp(retry_at),
            )
            self.db.create_run(retry_run)
            self.queue.enqueue(retry_run, priority=task_def.priority, run_at=retry_at)
        else:
            self.db.update_run(run.run_id, status=TaskStatus.FAILED, error=error)
            self._alert_failure(run.run_id, error)

    def _alert_failure(self, run_id: str, error: str):
        # Hook for PagerDuty/OpsGenie integration; stubbed here
        logging.error(f"Task run {run_id} failed permanently: {error}")

    def _handle_shutdown(self, sig, frame):
        logging.info(f"Worker {self.worker_id} shutting down gracefully")
        self.running = False

    def _run_with_timeout(self, fn, payload: dict, timeout_s: int):
        # Production: use multiprocessing with join(timeout) for true isolation;
        # a thread cannot be killed, so a timed-out task keeps running here.
        import threading
        result = [None]
        exception = [None]

        def target():
            try:
                result[0] = fn(**payload)
            except Exception as e:
                exception[0] = e

        t = threading.Thread(target=target, daemon=True)
        t.start()
        t.join(timeout=timeout_s)
        if t.is_alive():
            raise TimeoutError(f"Task exceeded {timeout_s}s timeout")
        if exception[0] is not None:
            raise exception[0]
        return result[0]
```
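With the defaults above (retry_delay_s=60, max_retries=3), the exponential backoff in `_handle_failure` produces the following retry schedule:

```python
def retry_delays(base_s: int, max_retries: int) -> list:
    # Delay before retry attempt a (0-indexed) = base * 2**a,
    # matching retry_delay_s * (2 ** run.attempt) in the worker.
    return [base_s * 2 ** a for a in range(max_retries)]

print(retry_delays(60, 3))  # [60, 120, 240]
```

Production systems usually add jitter (e.g., randomize each delay by ±20%) so that a burst of simultaneous failures does not retry in lockstep and hammer a recovering dependency.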
Key Design Decisions
| Decision | Choice | Rationale |
|---|---|---|
| Queue backend | Redis sorted set | O(log n) enqueue/dequeue; visibility tracked via a running-set scan; sub-ms latency |
| Exactly-once | Idempotency key + DB unique constraint | Retry storms create duplicates; idempotency key deduplicates |
| Dead task recovery | Visibility timeout + scheduler scan | Worker crash: task reappears after timeout for another worker |
| Cron accuracy | 1s scheduler tick + next_run_at DB column | Single elected scheduler leader (e.g., a lease in etcd/ZooKeeper); idempotent enqueue |
| Persistence | PostgreSQL for task defs + run history | ACID for state transitions; audit log |
| Alerting | PagerDuty/OpsGenie on repeated failure | Alert after max_retries exhausted; include last error and logs link |
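The "idempotency key + DB unique constraint" row can be sketched with SQLite standing in for PostgreSQL (table and key names here are illustrative):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE task_runs (
        idempotency_key TEXT PRIMARY KEY,
        status TEXT NOT NULL
    )
""")

def claim(key: str) -> bool:
    """Return True only for the first delivery of a given idempotency key."""
    try:
        conn.execute(
            "INSERT INTO task_runs (idempotency_key, status) VALUES (?, 'running')",
            (key,),
        )
        conn.commit()
        return True
    except sqlite3.IntegrityError:
        return False  # duplicate delivery: another worker already claimed it

assert claim("order-123:2024-01-01T09:00") is True
assert claim("order-123:2024-01-01T09:00") is False  # retry storm deduplicated
```

The INSERT either succeeds or violates the primary key, so the database arbitrates races between workers; no separate lock is needed.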
Frequently Asked Questions
How do you ensure exactly-once execution in a distributed task scheduler?
True exactly-once is hard. The practical approach is at-least-once delivery (with deduplication): use an idempotency key per task execution, check a database unique constraint before processing, and design task handlers to be idempotent. Visibility timeouts prevent the same task from being picked up by two workers simultaneously.
What is a visibility timeout in task queues?
A visibility timeout hides a task from other workers after it is dequeued. If the worker completes the task, it deletes the message. If the worker crashes, the timeout expires and the task reappears for another worker. AWS SQS uses this pattern with a default 30-second visibility timeout.
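To make the mechanics concrete, here is a toy in-memory queue with a visibility timeout (an illustration of the concept only, separate from the Redis design above):

```python
import time

class TinyQueue:
    """In-memory illustration of a visibility timeout; not production code."""

    def __init__(self, visibility_timeout: float):
        self.ready = []       # messages any worker may take
        self.invisible = {}   # message -> deadline for redelivery
        self.timeout = visibility_timeout

    def put(self, msg):
        self.ready.append(msg)

    def get(self):
        # Redeliver anything whose worker presumably crashed
        now = time.monotonic()
        for msg, deadline in list(self.invisible.items()):
            if deadline <= now:
                del self.invisible[msg]
                self.ready.append(msg)
        if not self.ready:
            return None
        msg = self.ready.pop(0)
        self.invisible[msg] = now + self.timeout  # hide from other workers
        return msg

    def delete(self, msg):
        # Worker finished successfully; drop the message for good
        self.invisible.pop(msg, None)

q = TinyQueue(visibility_timeout=0.05)
q.put("job-1")
assert q.get() == "job-1"   # one worker takes it
assert q.get() is None      # hidden from everyone else
time.sleep(0.06)            # the worker "crashes"; timeout elapses
assert q.get() == "job-1"   # redelivered to another worker
```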
How does Celery handle task retries?
Celery supports automatic retries with configurable max_retries and countdown (delay), and retry_backoff enables exponential backoff between attempts. Tasks that exhaust max_retries raise MaxRetriesExceededError and can be routed to a dead-letter queue for manual inspection.