System Design Interview: Distributed Task Scheduler (Cron at Scale)

Designing a distributed task scheduler — like AWS Step Functions, Airflow, Celery, or cron at scale — tests your ability to handle reliable job execution, failure recovery, deduplication, and priority queuing. This problem appears at Airbnb, Uber, LinkedIn, and infrastructure-focused companies.

Requirements Clarification

  • Scale: 10M tasks/day, 100k concurrent workers, tasks range from seconds to hours
  • Features: One-time tasks, cron-style recurring tasks, task dependencies (DAGs), retries, priority
  • Reliability: At-least-once delivery with deduplication for idempotent tasks; exactly-once for financial tasks
  • Visibility: Real-time job status, logs, alerting on failures
  • Latency: Tasks start within 1 second of scheduled time for high-priority; 30s for normal
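A quick back-of-the-envelope check on these numbers (the 10x peak factor is an assumption, not part of the stated requirements):

```python
tasks_per_day = 10_000_000
seconds_per_day = 86_400

avg_qps = tasks_per_day / seconds_per_day   # ~116 tasks/sec on average
peak_qps = avg_qps * 10                     # assumed 10x diurnal peak

# Even with long tasks there is plenty of worker headroom: if the
# average task ran 60s, steady state keeps roughly avg_qps * 60
# (~7k) workers busy, well under the 100k available.
busy_workers = avg_qps * 60
```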

Core Architecture

"""
Components:
  Scheduler:  converts cron expressions to next-run timestamps; enqueues tasks
  Queue:      holds ready-to-run tasks with priority and visibility timeout
  Workers:    poll queue, execute tasks, report results
  State DB:   persists task definitions, run history, status
  Heartbeat:  workers report health; scheduler detects dead workers and re-enqueues

Data flow:
  Task definition (cron: "0 * * * *") → Scheduler polls DB every tick (1s)
  → computes next_run, checks if past due → enqueues to priority queue
  → Worker picks up → sets task status=RUNNING + heartbeat TTL
  → Executes → success: status=DONE | failure: retry logic → status=FAILED/RETRIED
"""

Task Model and State Machine

from dataclasses import dataclass, field
from enum import Enum
from typing import Optional, List, Dict
from datetime import datetime, timedelta

class TaskStatus(Enum):
    PENDING   = "pending"    # Defined but not yet queued
    QUEUED    = "queued"     # In queue, waiting for worker
    RUNNING   = "running"    # Worker picked up, executing
    COMPLETED = "completed"  # Finished successfully
    FAILED    = "failed"     # Exhausted retries
    RETRYING  = "retrying"   # Failed once, will retry
    CANCELLED = "cancelled"  # Explicitly cancelled

@dataclass
class TaskDefinition:
    """Persisted in database — describes what to run and when."""
    id:             str
    name:           str
    handler:        str          # "module.function" to invoke
    cron:           Optional[str] = None  # "0 9 * * 1-5" = weekdays 9am
    payload:        dict = field(default_factory=dict)
    max_retries:    int = 3
    retry_delay_s:  int = 60     # Base delay; actual = delay * 2^attempt
    timeout_s:      int = 3600   # Task killed after this many seconds
    priority:       int = 5      # 1=highest, 10=lowest
    depends_on:     List[str] = field(default_factory=list)  # Task IDs for DAG
    idempotency_key: Optional[str] = None  # Dedup key
    next_run_at:    Optional[datetime] = None  # Next fire time, maintained by scheduler

@dataclass
class TaskRun:
    """One execution of a TaskDefinition."""
    run_id:      str
    task_id:     str
    attempt:     int
    status:      TaskStatus
    scheduled_at: datetime
    started_at:  Optional[datetime] = None
    completed_at: Optional[datetime] = None
    worker_id:   Optional[str] = None
    error:       Optional[str] = None
    next_run_at: Optional[datetime] = None  # For cron tasks

class TaskStateMachine:
    VALID_TRANSITIONS = {
        TaskStatus.PENDING:   {TaskStatus.QUEUED, TaskStatus.CANCELLED},
        TaskStatus.QUEUED:    {TaskStatus.RUNNING, TaskStatus.CANCELLED},
        TaskStatus.RUNNING:   {TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.RETRYING},
        TaskStatus.RETRYING:  {TaskStatus.QUEUED},
        TaskStatus.COMPLETED: set(),
        TaskStatus.FAILED:    set(),
        TaskStatus.CANCELLED: set(),
    }

    @classmethod
    def transition(cls, current: TaskStatus, next_status: TaskStatus) -> bool:
        if next_status not in cls.VALID_TRANSITIONS.get(current, set()):
            raise ValueError(f"Invalid transition: {current} -> {next_status}")
        return True

Priority Queue with Visibility Timeout

import redis
import json
import time
import uuid
from typing import Optional

class TaskQueue:
    """
    Redis-based priority queue with visibility timeout (like SQS).
    Visibility timeout prevents duplicate processing:
      Worker picks task → task hidden from others for N seconds.
      If worker dies, task reappears after timeout for another worker.
    """
    QUEUE_KEY  = "tasks:queue"     # Sorted set: score = priority*1e12 + run_at
    RUNNING_KEY = "tasks:running"  # Hash: run_id -> {task, expires_at, worker_id}
    VISIBILITY_TIMEOUT = 300       # 5 minutes

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    def enqueue(self, task_run: TaskRun, run_at: Optional[float] = None,
                priority: int = 5) -> str:
        """Callers should pass the TaskDefinition's priority; defaults to normal (5)."""
        run_at = run_at or time.time()
        # Score: lower priority number sorts first; within a band, earlier run_at wins
        score = priority * 1e12 + run_at

        self.redis.zadd(
            self.QUEUE_KEY,
            {json.dumps({"run_id": task_run.run_id, "task_id": task_run.task_id,
                         "priority": priority}): score}
        )
        return task_run.run_id

    def dequeue(self, worker_id: str) -> Optional[dict]:
        """
        Atomically move the highest-priority ready task from queue to running set.
        (A Lua script would make the claim fully atomic; here the ZREM return
        value detects when another worker won the race.)
        """
        now = time.time()

        # Scan priority bands in order. Within band p, a task is ready when
        # its score <= p*1e12 + now, i.e. its run_at component has passed.
        for priority in range(1, 11):
            band_min = priority * 1e12
            results = self.redis.zrangebyscore(
                self.QUEUE_KEY, band_min, band_min + now, start=0, num=1
            )
            if not results:
                continue

            raw = results[0]
            # Claim the task; if another worker removed it first, keep scanning
            if self.redis.zrem(self.QUEUE_KEY, raw) == 0:
                continue

            item = json.loads(raw)
            item["worker_id"] = worker_id
            item["expires_at"] = now + self.VISIBILITY_TIMEOUT
            self.redis.hset(self.RUNNING_KEY, item["run_id"], json.dumps(item))
            return item

        return None

    def complete(self, run_id: str):
        """Remove from running set on successful completion."""
        self.redis.hdel(self.RUNNING_KEY, run_id)

    def recover_stalled(self):
        """
        Re-enqueue tasks whose visibility timeout expired (worker died).
        Call periodically from the scheduler process.
        """
        now = time.time()
        all_running = self.redis.hgetall(self.RUNNING_KEY)
        recovered = 0

        for field, raw in all_running.items():
            item = json.loads(raw)
            if item["expires_at"] < now:
                # Worker died — re-enqueue in its original priority band,
                # with a slight delay
                self.redis.hdel(self.RUNNING_KEY, field)
                priority = item.get("priority", 5)
                payload = {"run_id": item["run_id"], "task_id": item["task_id"],
                           "priority": priority}
                self.redis.zadd(
                    self.QUEUE_KEY,
                    {json.dumps(payload): priority * 1e12 + now + 10}
                )
                recovered += 1

        return recovered
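
One subtlety of the `priority * 1e12 + run_at` score encoding: the priority band dominates the ordering absolutely, so a high-priority task scheduled far in the future still sorts ahead of a low-priority task that is ready now. This is why dequeue must bound its range query per priority band instead of popping the global minimum. A quick sanity check in plain Python:

```python
import time

def score(priority: int, run_at: float) -> float:
    # Same encoding as TaskQueue: priority band dominates, run_at breaks ties
    return priority * 1e12 + run_at

now = time.time()
# Within a band, earlier run_at sorts first
assert score(5, now) < score(5, now + 60)
# Across bands, priority always wins...
assert score(1, now) < score(5, now)
# ...even when the high-priority task is not ready yet, so a naive
# "pop global minimum" would starve ready low-priority tasks
assert score(1, now + 86_400) < score(5, now)
```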

Cron Expression Parsing and Scheduling

from croniter import croniter  # pip install croniter
from datetime import datetime, timedelta
import uuid
import pytz

class CronScheduler:
    def __init__(self, db, queue: TaskQueue, timezone: str = "UTC"):
        self.db = db
        self.queue = queue
        self.tz = pytz.timezone(timezone)

    def compute_next_run(self, cron_expr: str, after: datetime = None) -> datetime:
        """Compute next run time for a cron expression."""
        after = after or datetime.now(self.tz)
        cron = croniter(cron_expr, after)
        return cron.get_next(datetime)

    def tick(self):
        """
        Called every second by scheduler loop.
        Find all tasks due in the next window, enqueue them.
        """
        now = datetime.now(self.tz)
        window_end = now + timedelta(seconds=5)  # Look ahead 5 seconds

        due_tasks = self.db.get_tasks_due_before(window_end)

        for task_def in due_tasks:
            # Idempotency check: skip if already queued for this scheduled time
            if self.db.run_exists_for_time(task_def.id, task_def.next_run_at):
                continue

            run = TaskRun(
                run_id=str(uuid.uuid4()),
                task_id=task_def.id,
                attempt=0,
                status=TaskStatus.QUEUED,
                scheduled_at=task_def.next_run_at,
            )
            self.db.create_run(run)
            self.queue.enqueue(run, run_at=task_def.next_run_at.timestamp())

            # Update next_run_at in DB
            if task_def.cron:
                next_run = self.compute_next_run(task_def.cron, after=task_def.next_run_at)
                self.db.update_next_run(task_def.id, next_run)

Worker Implementation

import importlib
import logging
import signal
import time
import uuid
from datetime import datetime

class Worker:
    def __init__(self, worker_id: str, queue: TaskQueue, db, heartbeat_interval: int = 30):
        self.worker_id = worker_id
        self.queue = queue
        self.db = db
        self.heartbeat_interval = heartbeat_interval
        self.running = True
        self.current_run_id = None
        signal.signal(signal.SIGTERM, self._handle_shutdown)

    def run(self):
        """Main worker loop: poll queue, execute tasks."""
        logging.info(f"Worker {self.worker_id} started")
        while self.running:
            item = self.queue.dequeue(self.worker_id)
            if not item:
                time.sleep(1)
                continue

            self.current_run_id = item["run_id"]
            self._execute_task(item)
            self.current_run_id = None

    def _execute_task(self, item: dict):
        run_id  = item["run_id"]
        task_id = item["task_id"]
        task_def = self.db.get_task_definition(task_id)
        run      = self.db.get_run(run_id)

        self.db.update_run(run_id, status=TaskStatus.RUNNING,
                           started_at=datetime.utcnow(), worker_id=self.worker_id)

        try:
            # Resolve handler: "myapp.tasks.send_email"
            module_name, fn_name = task_def.handler.rsplit(".", 1)
            module = importlib.import_module(module_name)
            handler = getattr(module, fn_name)

            # Execute with timeout
            result = self._run_with_timeout(handler, task_def.payload, task_def.timeout_s)
            self.queue.complete(run_id)
            self.db.update_run(run_id, status=TaskStatus.COMPLETED,
                               completed_at=datetime.utcnow())
            logging.info(f"Task {task_id} run {run_id} completed")

        except TimeoutError:
            self._handle_failure(run, task_def, "Task timed out")
        except Exception as e:
            self._handle_failure(run, task_def, str(e))

    def _handle_failure(self, run: TaskRun, task_def: TaskDefinition, error: str):
        if run.attempt < task_def.max_retries:
            delay = task_def.retry_delay_s * (2 ** run.attempt)  # Exponential backoff
            retry_at = time.time() + delay
            self.db.update_run(run.run_id, status=TaskStatus.RETRYING, error=error)
            self.queue.complete(run.run_id)  # Clear the old attempt from the running set
            # Create new run for retry
            retry_run = TaskRun(
                run_id=str(uuid.uuid4()),
                task_id=task_def.id,
                attempt=run.attempt + 1,
                status=TaskStatus.QUEUED,
                scheduled_at=datetime.fromtimestamp(retry_at),
            )
            self.db.create_run(retry_run)
            self.queue.enqueue(retry_run, run_at=retry_at)
        else:
            self.db.update_run(run.run_id, status=TaskStatus.FAILED, error=error)
            self.queue.complete(run.run_id)  # Remove from running set
            self._alert_failure(run.run_id, error)

    def _handle_shutdown(self, sig, frame):
        logging.info(f"Worker {self.worker_id} shutting down gracefully")
        self.running = False

    def _run_with_timeout(self, fn, payload: dict, timeout_s: int):
        # Production: use multiprocessing with join(timeout) for true isolation
        import threading
        result = [None]
        exception = [None]

        def target():
            try:
                result[0] = fn(**payload)
            except Exception as e:
                exception[0] = e

        # Threads cannot be forcibly killed; daemon=True at least lets the
        # process exit even if a timed-out task is still running
        t = threading.Thread(target=target, daemon=True)
        t.start()
        t.join(timeout=timeout_s)
        if t.is_alive():
            raise TimeoutError(f"Task exceeded {timeout_s}s timeout")
        if exception[0]:
            raise exception[0]
        return result[0]
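
The worker above uses plain exponential backoff (`retry_delay_s * 2^attempt`). Production schedulers usually cap the delay and add jitter so that a burst of simultaneous failures does not retry in lockstep; a sketch of "full jitter" backoff (the 3600s cap is illustrative):

```python
import random

def backoff_delay(base_s: int, attempt: int, cap_s: int = 3600) -> float:
    """Full jitter: draw uniformly from [0, min(cap, base * 2^attempt)]."""
    return random.uniform(0, min(cap_s, base_s * 2 ** attempt))

# With base_s=60: attempt 0 draws from [0, 60], attempt 3 from [0, 480],
# and from attempt 6 onward the 3600s cap applies.
```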

Key Design Decisions

| Decision | Choice | Rationale |
|---|---|---|
| Queue backend | Redis sorted set | O(log n) enqueue/dequeue; TTL-based visibility; sub-ms latency |
| Exactly-once | Idempotency key + DB unique constraint | Retry storms create duplicates; idempotency key deduplicates |
| Dead task recovery | Visibility timeout + scheduler scan | Worker crash: task reappears after timeout for another worker |
| Cron accuracy | 1s scheduler tick + next_run_at DB column | Scheduler is a single leader (Paxos/Raft lock); idempotent enqueue |
| Persistence | PostgreSQL for task defs + run history | ACID for state transitions; audit log |
| Alerting | PagerDuty/OpsGenie on repeated failure | Alert after max_retries exhausted; include last error and logs link |

Frequently Asked Questions

How do you ensure exactly-once execution in a distributed task scheduler?

True exactly-once is hard. The practical approach is at-least-once delivery (with deduplication): use an idempotency key per task execution, check a database unique constraint before processing, and design task handlers to be idempotent. Visibility timeouts prevent the same task from being picked up by two workers simultaneously.
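
A minimal sketch of that unique-constraint check, using in-memory SQLite for illustration (the table and column names are hypothetical):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE task_runs (
        task_id      TEXT NOT NULL,
        scheduled_at TEXT NOT NULL,
        UNIQUE (task_id, scheduled_at)   -- the dedup key
    )
""")

def try_claim(task_id: str, scheduled_at: str) -> bool:
    """Return True only for the first claim of a (task, time) pair."""
    try:
        with conn:
            conn.execute(
                "INSERT INTO task_runs VALUES (?, ?)",
                (task_id, scheduled_at),
            )
        return True
    except sqlite3.IntegrityError:
        return False        # duplicate: another scheduler/worker got here first

assert try_claim("report", "2024-01-01T09:00") is True    # first claim wins
assert try_claim("report", "2024-01-01T09:00") is False   # duplicate skipped
```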

What is a visibility timeout in task queues?

A visibility timeout hides a task from other workers after it is dequeued. If the worker completes the task, it deletes the message. If the worker crashes, the timeout expires and the task reappears for another worker. AWS SQS uses this pattern with a default 30-second visibility timeout.
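
That lifecycle can be simulated in a few lines with plain dicts (timeout shortened to 50ms for the demo):

```python
import time

queue = {"run-1": 0.0}    # run_id -> ready_at (epoch seconds)
running = {}              # run_id -> expires_at

def dequeue(timeout_s: float = 0.05):
    now = time.time()
    for run_id, ready_at in list(queue.items()):
        if ready_at <= now:
            del queue[run_id]
            running[run_id] = now + timeout_s   # hidden from other workers
            return run_id
    return None

def recover():
    now = time.time()
    for run_id, expires_at in list(running.items()):
        if expires_at < now:            # worker presumed dead
            del running[run_id]
            queue[run_id] = now         # visible again

rid = dequeue()            # one worker takes the task
assert dequeue() is None   # meanwhile it is hidden from everyone else
time.sleep(0.06)
recover()                  # timeout expired: task reappears
assert dequeue() == rid    # another worker can now pick it up
```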

How does Celery handle task retries?

Celery supports automatic retries with configurable max_retries and countdown (delay). On failure, the task is re-queued with an exponential backoff delay. Failed tasks beyond max_retries go to a dead letter queue for manual inspection.

