Scheduled Task Manager Low-Level Design: Cron Parsing, Distributed Locking, and Missed Run Recovery

A scheduled task manager triggers recurring jobs on cron schedules reliably across distributed infrastructure. The core problems are: parsing cron expressions correctly across timezones, ensuring exactly one scheduler instance fires each task (distributed locking), and recovering gracefully when the scheduler was down and missed scheduled runs.

Cron Expression Parsing

The croniter Python library parses standard 5-field and 6-field cron expressions and computes future run times relative to any reference datetime. Timezone-awareness is critical: a job scheduled for “0 9 * * *” should fire at 9:00 AM in the task's configured timezone, not UTC.

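from croniter import croniter
from datetime import datetime
from typing import Optional
import pytz

def compute_next_run(cron_expr: str, tz_name: str,
                     after_dt: Optional[datetime] = None) -> datetime:
    """Return the next run time as a UTC datetime, evaluating cron_expr in tz_name."""
    tz = pytz.timezone(tz_name)
    if after_dt is None:
        now_local = datetime.now(tz)
    else:
        # Assumption: a naive after_dt is UTC; astimezone() would otherwise
        # interpret it in the server's local timezone.
        if after_dt.tzinfo is None:
            after_dt = pytz.utc.localize(after_dt)
        now_local = after_dt.astimezone(tz)
    # croniter evaluates the expression against local wall-clock time.
    cron = croniter(cron_expr, now_local)
    next_local = cron.get_next(datetime)
    return next_local.astimezone(pytz.utc)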

The next_run_at column always stores UTC. Display to users converts back to the task's timezone.
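To make concrete what a 5-field expression encodes, the following is a simplified, standard-library-only field expander. It is an illustration, not croniter's implementation: the names expand_field, parse_cron, and FIELD_RANGES are hypothetical, and it supports only `*`, single values, lists, ranges, and steps (no month or weekday names, no `7` for Sunday, no seconds field).

```python
from typing import Dict, List

# (name, lowest value, highest value) for each of the five standard fields.
FIELD_RANGES = [
    ("minute", 0, 59),
    ("hour", 0, 23),
    ("day_of_month", 1, 31),
    ("month", 1, 12),
    ("day_of_week", 0, 6),
]

def expand_field(field: str, lo: int, hi: int) -> List[int]:
    """Expand one cron field into the sorted list of values it matches."""
    values = set()
    for part in field.split(","):
        rng, _, step_text = part.partition("/")
        step = int(step_text) if step_text else 1
        if step <= 0:
            raise ValueError(f"step must be positive in {field!r}")
        if rng == "*":
            start, end = lo, hi
        elif "-" in rng:
            a, b = rng.split("-", 1)
            start, end = int(a), int(b)
        else:
            start = int(rng)
            # "5/15" is read as "from 5 up to the maximum, every 15".
            end = hi if step_text else start
        if not (lo <= start <= end <= hi):
            raise ValueError(f"{field!r} is outside the range {lo}-{hi}")
        values.update(range(start, end + 1, step))
    return sorted(values)

def parse_cron(expr: str) -> Dict[str, List[int]]:
    """Parse a 5-field cron expression into the allowed values per field."""
    fields = expr.split()
    if len(fields) != len(FIELD_RANGES):
        raise ValueError(f"expected 5 fields, got {len(fields)}")
    return {
        name: expand_field(field, lo, hi)
        for field, (name, lo, hi) in zip(fields, FIELD_RANGES)
    }
```

For example, parse_cron("0 9 * * 1-5") yields minute [0], hour [9], and day_of_week [1, 2, 3, 4, 5]. In production a maintained library such as croniter remains the better choice because it also computes next and previous run times.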

Distributed Leader Election

Running multiple scheduler instances is necessary for high availability, but only one should fire tasks at any given time (otherwise the same task dispatches twice). Redis SETNX-based leader election solves this:

  • Each scheduler instance attempts to SET a lock key with NX (only set if not exists) and a TTL (e.g., 30 seconds).
  • The instance that succeeds is the leader and runs the dispatch loop.
  • The leader refreshes (resets TTL) the lock every 10 seconds as a heartbeat.
  • If the leader crashes, the TTL expires and another instance takes over within 30 seconds.

Contention on the lock key is low: a follower's SET NX is a single command that simply fails while the leader's key exists, so non-leaders never block waiting for the lock.
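The takeover behaviour described above can be sketched without a Redis server. The block below uses a hypothetical in-memory store (InMemoryLockStore) that mimics two operations: set-if-absent with a TTL, and extend-only-if-owner. The heartbeat function and the injectable clock are also assumptions made for illustration; a real deployment would issue the equivalent Redis commands instead.

```python
import time
from typing import Callable, Dict, Optional, Tuple

LOCK_KEY = "scheduler:leader"

class InMemoryLockStore:
    """Hypothetical stand-in for Redis: set-if-absent with TTL, owner-checked extend."""

    def __init__(self, clock: Callable[[], float] = time.monotonic):
        self._clock = clock
        self._entries: Dict[str, Tuple[str, float]] = {}

    def _live_owner(self, key: str) -> Optional[str]:
        """Return the current owner, dropping the entry if its TTL has passed."""
        entry = self._entries.get(key)
        if entry is None:
            return None
        owner, expires_at = entry
        if self._clock() >= expires_at:
            del self._entries[key]
            return None
        return owner

    def set_nx(self, key: str, owner: str, ttl: float) -> bool:
        """Claim the key only if nobody currently holds it."""
        if self._live_owner(key) is not None:
            return False
        self._entries[key] = (owner, self._clock() + ttl)
        return True

    def extend_if_owner(self, key: str, owner: str, ttl: float) -> bool:
        """Reset the TTL only if this owner still holds the key."""
        if self._live_owner(key) != owner:
            return False
        self._entries[key] = (owner, self._clock() + ttl)
        return True

def heartbeat(store: InMemoryLockStore, instance_id: str,
              is_leader: bool, ttl: float = 30.0) -> bool:
    """Run one heartbeat; return True if this instance is leader afterwards."""
    if is_leader:
        return store.extend_if_owner(LOCK_KEY, instance_id, ttl)
    return store.set_nx(LOCK_KEY, instance_id, ttl)
```

With a 30-second TTL, a follower calling heartbeat keeps getting False while the leader refreshes, and gets True on its first attempt after the leader's last refresh is more than 30 seconds old. A former leader that resumes after that point gets False from the owner-checked extend and must stop dispatching.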

Task Dispatch Loop

The leader runs the dispatch loop every 10 seconds:

  1. Query tasks WHERE next_run_at <= NOW() AND status = 'active'.
  2. For each due task: INSERT into task_execution with scheduled_for = next_run_at (unique constraint prevents duplicates), then enqueue the task payload to Celery or SQS.
  3. UPDATE scheduled_task SET last_run_at = NOW(), next_run_at = compute_next_run(…).

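The unique constraint on (task_id, scheduled_for) is the idempotency guard: even if two scheduler instances briefly overlap during leader handoff, the second INSERT for the same scheduled time hits the constraint (raising an error, or becoming a no-op under ON CONFLICT DO NOTHING). Because the enqueue happens only after a successful INSERT, no duplicate dispatch occurs.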
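The guard can be demonstrated in isolation. The sketch below uses the standard-library sqlite3 module as a stand-in for PostgreSQL, with INSERT OR IGNORE playing the role of ON CONFLICT DO NOTHING; make_demo_db and record_dispatch are hypothetical helper names, and the table is reduced to the columns the guard needs.

```python
import sqlite3

def make_demo_db() -> sqlite3.Connection:
    """Create an in-memory table with the (task_id, scheduled_for) unique constraint."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        """CREATE TABLE task_execution (
               id            INTEGER PRIMARY KEY,
               task_id       INTEGER NOT NULL,
               scheduled_for TEXT    NOT NULL,
               status        TEXT    NOT NULL,
               UNIQUE (task_id, scheduled_for)
           )"""
    )
    return conn

def record_dispatch(conn: sqlite3.Connection, task_id: int,
                    scheduled_for: str) -> bool:
    """Insert an execution row; return True only if this call created it."""
    cur = conn.execute(
        "INSERT OR IGNORE INTO task_execution (task_id, scheduled_for, status) "
        "VALUES (?, ?, 'dispatched')",
        (task_id, scheduled_for),
    )
    conn.commit()
    # rowcount is 1 for a new row and 0 when the unique constraint
    # caused the insert to be ignored.
    return cur.rowcount == 1
```

A caller would enqueue the task only when record_dispatch returns True, so a second scheduler racing on the same scheduled time sees False and does nothing.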

Missed Run Detection and Catch-Up

When the scheduler has been offline (deploy, crash, maintenance), tasks accumulate missed runs. On lock acquisition, the new leader checks for tasks where last_run_at is earlier than the most recently expected run time:

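expected_run = most_recent_scheduled_time_before_now(cron_expr, tz)
missed = last_run_at is None or last_run_at < expected_run
if missed and now - expected_run <= max_delay:
    # missed run is still within max_delay -- trigger catch-up
    pass
# a missed run older than max_delay is skipped, not replayed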

Catch-up fires the missed run immediately. However, to avoid a thundering herd when many tasks were missed (e.g., 4-hour outage with hourly tasks), runs older than the configured max_delay_minutes are skipped rather than replayed. This prevents a sudden burst of stale work flooding downstream systems.
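The catch-up rule can be expressed as a small pure function, which makes it easy to unit test. This is a sketch under the assumption that all datetimes are timezone-aware UTC values and that only the most recent expected run is considered; the function name catch_up_decision and its return labels are hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

def catch_up_decision(last_run_at: Optional[datetime],
                      expected_run: datetime,
                      now: datetime,
                      max_delay: timedelta) -> str:
    """Classify a task as 'up_to_date', 'catch_up', or 'skip_stale'."""
    # The most recent scheduled run already fired: nothing to do.
    if last_run_at is not None and last_run_at >= expected_run:
        return "up_to_date"
    # The run was missed, but too long ago to be worth replaying.
    if now - expected_run > max_delay:
        return "skip_stale"
    # The run was missed recently enough to fire immediately.
    return "catch_up"
```

For an hourly task with a 60-minute max delay, a leader elected 20 minutes after a missed 09:00 run would get "catch_up", while one elected two hours later would get "skip_stale" for that run.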

Concurrent Execution Guard

Some tasks must not overlap: if run N is still executing when run N+1 is due, skip N+1. The task_execution table tracks running executions. Before dispatching, query for any row with status = 'running' for the same task_id. If one is found and the task has the no_overlap flag set, skip the dispatch and log a SKIP_OVERLAP event.
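One possible shape for the overlap check is shown below. It is a sketch that uses the standard-library sqlite3 module in place of PostgreSQL so that it runs without a database server. The helper should_dispatch is a hypothetical name, and the query assumes a task_execution table with task_id and status columns as in the schema in this article.

```python
import sqlite3

def has_running_execution(conn: sqlite3.Connection, task_id: int) -> bool:
    """Return True if any execution of this task is still marked 'running'."""
    row = conn.execute(
        "SELECT 1 FROM task_execution "
        "WHERE task_id = ? AND status = 'running' LIMIT 1",
        (task_id,),
    ).fetchone()
    return row is not None

def should_dispatch(conn: sqlite3.Connection, task_id: int,
                    no_overlap: bool) -> bool:
    """Apply the no_overlap rule: block only flagged tasks with a live run."""
    if no_overlap and has_running_execution(conn, task_id):
        return False
    return True
```

One caveat with this approach: a worker that crashes without updating its row leaves the status at 'running', which would block a no_overlap task indefinitely unless stale rows are expired by some separate timeout.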

SQL Schema

CREATE TABLE scheduled_task (
    id                SERIAL PRIMARY KEY,
    name              VARCHAR(200) NOT NULL UNIQUE,
    cron_expr         VARCHAR(100) NOT NULL,
    timezone          VARCHAR(60)  NOT NULL DEFAULT 'UTC',
    status            VARCHAR(20)  NOT NULL DEFAULT 'active',
    -- active | paused | disabled
    last_run_at       TIMESTAMPTZ,
    next_run_at       TIMESTAMPTZ NOT NULL,
    max_delay_minutes INT NOT NULL DEFAULT 60,
    no_overlap        BOOLEAN NOT NULL DEFAULT FALSE,
    payload           JSONB,
    created_at        TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX ON scheduled_task (next_run_at) WHERE status = 'active';

CREATE TABLE task_execution (
    id           BIGSERIAL PRIMARY KEY,
    task_id      INT NOT NULL REFERENCES scheduled_task(id),
    scheduled_for TIMESTAMPTZ NOT NULL,
    started_at   TIMESTAMPTZ,
    completed_at TIMESTAMPTZ,
    status       VARCHAR(20) NOT NULL DEFAULT 'dispatched',
    -- dispatched | running | completed | failed | skipped
    error        TEXT,
    worker_id    VARCHAR(100),
    UNIQUE (task_id, scheduled_for)
);
CREATE INDEX ON task_execution (task_id, status);

CREATE TABLE scheduler_lock (
    lock_name    VARCHAR(100) PRIMARY KEY,
    instance_id  VARCHAR(200) NOT NULL,
    acquired_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    heartbeat_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

Python Implementation

import time, uuid, redis, psycopg2
from croniter import croniter
from datetime import datetime, timedelta
import pytz

r = redis.Redis()
LOCK_KEY = "scheduler:leader"
LOCK_TTL = 30
HEARTBEAT_INTERVAL = 10

def acquire_leader_lock(instance_id: str, ttl: int = LOCK_TTL) -> bool:
    """Try to acquire the leader lock via SET NX PX. Return True if acquired."""
    result = r.set(LOCK_KEY, instance_id, nx=True, px=ttl * 1000)
    return result is True

def refresh_leader_lock(instance_id: str, ttl: int = LOCK_TTL) -> bool:
    """Extend TTL only if this instance still holds the lock (Lua script for atomicity)."""
    lua_script = """
    if redis.call("GET", KEYS[1]) == ARGV[1] then
        return redis.call("PEXPIRE", KEYS[1], ARGV[2])
    else
        return 0
    end
    """
    run_script = r.register_script(lua_script)
    result = run_script(keys=[LOCK_KEY], args=[instance_id, ttl * 1000])
    return result == 1

def dispatch_due_tasks(conn):
    """Query and dispatch all tasks whose next_run_at has passed."""
    with conn.cursor() as cur:
        cur.execute(
            """SELECT id, name, cron_expr, timezone, payload, no_overlap
               FROM scheduled_task
               WHERE status = 'active' AND next_run_at <= NOW()
               FOR UPDATE SKIP LOCKED"""
        )
        tasks = cur.fetchall()
    for task in tasks:
        task_id, name, cron_expr, tz, payload, no_overlap = task
        if no_overlap and has_running_execution(conn, task_id):
            log_skip(conn, task_id, "SKIP_OVERLAP")
            continue
        try:
            with conn.cursor() as cur:
                cur.execute(
                    """INSERT INTO task_execution (task_id, scheduled_for, status)
                       VALUES (%s, (SELECT next_run_at FROM scheduled_task WHERE id = %s),
                               'dispatched')
                       ON CONFLICT (task_id, scheduled_for) DO NOTHING
                       RETURNING id""",
                    (task_id, task_id)
                )
                if cur.fetchone():
                    enqueue_to_worker(name, payload)
                    next_run = compute_next_run(cron_expr, tz)
                    cur.execute(
                        """UPDATE scheduled_task
                           SET last_run_at = NOW(), next_run_at = %s
                           WHERE id = %s""",
                        (next_run, task_id)
                    )
            conn.commit()
        except Exception as exc:
            conn.rollback()
            print(f"Dispatch error for task {name}: {exc}")

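def detect_missed_runs(conn):
    """On leader acquisition, fire catch-up for missed runs that are still fresh."""
    now = datetime.now(pytz.utc)
    with conn.cursor() as cur:
        cur.execute(
            "SELECT id, name, cron_expr, timezone, max_delay_minutes, payload "
            "FROM scheduled_task WHERE status = 'active'"
        )
        tasks = cur.fetchall()
    for task in tasks:
        task_id, name, cron_expr, tz, max_delay, payload = task
        # Assumed to return a timezone-aware UTC datetime.
        expected = most_recent_expected_run(cron_expr, tz)
        last_run = get_last_run_at(conn, task_id)
        if last_run is not None and last_run >= expected:
            continue  # the most recent scheduled run already fired
        if now - expected > timedelta(minutes=max_delay):
            # Too stale to replay: skip to avoid a thundering herd.
            print(f"Missed run for {name} is older than max_delay; skipping")
            continue
        # NOTE: no task_execution row is written here, so the unique-constraint
        # guard does not cover catch-up dispatches.
        print(f"Missed run detected for {name}; dispatching catch-up")
        enqueue_to_worker(name, payload)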
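The functions above still need a loop that ties them together. The following is a minimal sketch of such a loop, written against injected callables so it can be exercised without Redis or PostgreSQL. The name run_scheduler and its parameters are hypothetical and not part of the implementation above.

```python
from typing import Callable, Optional

def run_scheduler(try_acquire: Callable[[], bool],
                  refresh: Callable[[], bool],
                  on_elected: Callable[[], None],
                  dispatch: Callable[[], None],
                  sleep: Callable[[float], None],
                  interval: float = 10.0,
                  max_iterations: Optional[int] = None) -> None:
    """Heartbeat loop: hold or seek leadership, and dispatch only while leader."""
    is_leader = False
    iterations = 0
    while max_iterations is None or iterations < max_iterations:
        iterations += 1
        if is_leader:
            # Losing the lock (refresh returns False) demotes this instance.
            is_leader = refresh()
        elif try_acquire():
            is_leader = True
            # Runs once per election, e.g. missed-run detection.
            on_elected()
        if is_leader:
            dispatch()
        sleep(interval)
```

Wired to the functions in this section, try_acquire would wrap acquire_leader_lock(instance_id), refresh would wrap refresh_leader_lock(instance_id), on_elected would call detect_missed_runs(conn), and dispatch would call dispatch_due_tasks(conn), with time.sleep as the sleep function. Using the same 10-second interval for heartbeat and dispatch matches the figures given earlier; separate intervals would need a slightly richer loop.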

Timezone-Aware Scheduling Pitfalls

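DST transitions cause two edge cases: a cron expression may match twice (clocks fall back and an hour repeats) or not at all (clocks spring forward and an hour is skipped). croniter takes DST into account when given a timezone-aware datetime as the start reference, but it is still worth testing any schedule that falls inside a transition window, because whether a job in the repeated or skipped hour should run once, twice, or not at all is a policy decision. Always store and compare next_run_at in UTC; apply timezone conversion only for display and for computing the next run from the cron expression.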
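The two edge cases can be detected directly, which is useful when writing tests for schedules near a transition. The sketch below uses the standard-library zoneinfo module (Python 3.9+) rather than pytz, and assumes the system has IANA timezone data installed; classify_wall_time is a hypothetical helper name.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

def classify_wall_time(naive: datetime, tz: ZoneInfo) -> str:
    """Classify a local wall-clock time as 'normal', 'ambiguous', or 'nonexistent'."""
    first = naive.replace(tzinfo=tz, fold=0)
    second = naive.replace(tzinfo=tz, fold=1)
    # Outside a transition, both fold values resolve to the same UTC offset.
    if first.utcoffset() == second.utcoffset():
        return "normal"
    # A skipped wall time does not survive a round trip through UTC.
    round_trip = first.astimezone(timezone.utc).astimezone(tz)
    if round_trip.replace(tzinfo=None) != naive:
        return "nonexistent"
    # Otherwise the wall time occurs twice (clocks fell back).
    return "ambiguous"
```

In America/New_York, 02:30 on 2024-03-10 is classified as nonexistent and 01:30 on 2024-11-03 as ambiguous. A scheduler test suite could assert its chosen policy for cron expressions that land on such times.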
