Job Queue System Low-Level Design: Worker Pool, Retry Backoff, Heartbeat, and Recurring Jobs

Job Queue System: Low-Level Design

A job queue system schedules and executes background work: sending emails, resizing images, generating reports, syncing data with third-party APIs. Unlike a message queue, which only delivers messages (fire-and-forget), a job queue tracks execution state (pending, running, completed, failed, cancelled) and provides retry logic, job chaining, priority scheduling, and observability into worker throughput. This design covers the job lifecycle, worker pool management, job deduplication, and cron-style recurring jobs.

Core Data Model

CREATE TABLE JobQueue (
    queue_name     VARCHAR(100) PRIMARY KEY,
    max_concurrency INT NOT NULL DEFAULT 10,
    max_retries    SMALLINT NOT NULL DEFAULT 3,
    retry_backoff  VARCHAR(20) NOT NULL DEFAULT 'exponential',  -- exponential, linear, fixed
    created_at     TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE TABLE Job (
    job_id         BIGSERIAL,
    queue_name     VARCHAR(100) NOT NULL REFERENCES JobQueue(queue_name),
    job_type       VARCHAR(100) NOT NULL,    -- 'send_email', 'resize_image', 'sync_crm'
    payload        JSONB NOT NULL,
    idempotency_key VARCHAR(200),            -- deduplicate concurrent enqueues
    status         VARCHAR(20) NOT NULL DEFAULT 'pending',
        -- pending, running, completed, failed, cancelled
    priority       SMALLINT NOT NULL DEFAULT 0,
    attempt_count  SMALLINT NOT NULL DEFAULT 0,
    run_at         TIMESTAMPTZ NOT NULL DEFAULT NOW(),  -- earliest execution time
    started_at     TIMESTAMPTZ,              -- also refreshed by the worker heartbeat
    completed_at   TIMESTAMPTZ,
    worker_id      VARCHAR(200),
    result         JSONB,
    last_error     TEXT,
    parent_job_id  BIGINT,  -- for job chains; no FK, since job_id alone carries no unique constraint here
    created_at     TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    -- PostgreSQL requires the partition key (status) in a partitioned table's primary key
    PRIMARY KEY (job_id, status)
) PARTITION BY LIST (status);

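CREATE TABLE Job_pending   PARTITION OF Job FOR VALUES IN ('pending');
CREATE TABLE Job_running   PARTITION OF Job FOR VALUES IN ('running');
CREATE TABLE Job_completed PARTITION OF Job FOR VALUES IN ('completed');
CREATE TABLE Job_failed    PARTITION OF Job FOR VALUES IN ('failed');
-- Every allowed status needs a partition, otherwise writing that status fails
CREATE TABLE Job_cancelled PARTITION OF Job FOR VALUES IN ('cancelled');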

CREATE TABLE RecurringJob (
    recurring_job_id SERIAL PRIMARY KEY,
    job_type         VARCHAR(100) NOT NULL,
    queue_name       VARCHAR(100) NOT NULL,
    payload          JSONB NOT NULL DEFAULT '{}',
    cron_expression  VARCHAR(100) NOT NULL,  -- '0 9 * * 1-5' = weekdays 9am
    is_active        BOOLEAN NOT NULL DEFAULT TRUE,
    last_run_at      TIMESTAMPTZ,
    next_run_at      TIMESTAMPTZ NOT NULL,
    created_at       TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE UNIQUE INDEX ON Job(queue_name, idempotency_key)
    WHERE idempotency_key IS NOT NULL AND status IN ('pending','running');
CREATE INDEX ON Job_pending(queue_name, priority DESC, run_at ASC);
CREATE INDEX ON RecurringJob(next_run_at) WHERE is_active=TRUE;

Job Enqueue and Dedup

import datetime
import json
from typing import Optional

# `db` is assumed to be a thin database helper (fetchone / fetchall / execute)
# provided elsewhere in the application.

class JobEnqueueError(Exception):
    """Raised when a job cannot be written to the queue."""

def enqueue(job_type: str, payload: dict, queue_name: str = 'default',
            priority: int = 0, run_at: Optional[datetime.datetime] = None,
            idempotency_key: Optional[str] = None,
            parent_job_id: Optional[int] = None) -> Optional[int]:
    """
    Enqueue a job. Returns job_id.
    Idempotent: if idempotency_key is provided and a job with that key is
    pending or running, returns the existing job_id.
    Returns None in the rare case where the conflicting job finished between
    the INSERT and the follow-up lookup.
    """
    # Timezone-aware UTC, so the value maps cleanly onto TIMESTAMPTZ
    run_at = run_at or datetime.datetime.now(datetime.timezone.utc)

    try:
        row = db.fetchone("""
            INSERT INTO Job (queue_name, job_type, payload, idempotency_key,
                             priority, run_at, parent_job_id)
            VALUES (%s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT (queue_name, idempotency_key)
                WHERE idempotency_key IS NOT NULL AND status IN ('pending','running')
            DO NOTHING
            RETURNING job_id
        """, (queue_name, job_type, json.dumps(payload), idempotency_key,
              priority, run_at, parent_job_id))

        if not row:
            # Deduplicated: return the existing job_id
            existing = db.fetchone("""
                SELECT job_id FROM Job
                WHERE queue_name=%s AND idempotency_key=%s
                  AND status IN ('pending','running')
            """, (queue_name, idempotency_key))
            return existing['job_id'] if existing else None

        return row['job_id']
    except Exception as e:
        raise JobEnqueueError(str(e)) from e
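
Dedup only works if callers pass the same idempotency_key for the same logical job. One possible approach, sketched here as an assumption rather than as part of the design above, is to hash the job type together with a canonical JSON encoding of the payload. The helper name make_idempotency_key is hypothetical.

```python
import hashlib
import json

def make_idempotency_key(job_type: str, payload: dict) -> str:
    """Hypothetical helper: derive a deterministic key from job type and payload."""
    # sort_keys and fixed separators produce the same text regardless of dict order
    canonical = json.dumps(payload, sort_keys=True, separators=(',', ':'))
    digest = hashlib.sha256(canonical.encode('utf-8')).hexdigest()
    # job_type (at most 100 chars) + ':' + 64 hex chars fits in VARCHAR(200)
    return f"{job_type}:{digest}"
```

The result can be passed as idempotency_key to enqueue. Payloads that contain timestamps or random IDs would defeat this scheme, so in that case an explicit business key (an order ID, for example) is likely the better choice.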

Worker: Claim and Execute

import datetime
import json
import socket
import threading
import time
import traceback

HEARTBEAT_INTERVAL = 10  # seconds between started_at refreshes while a job runs

def worker_loop(queue_name: str, handlers: dict, worker_id: str = None,
                batch_size: int = 5, poll_interval: float = 1.0):
    """
    handlers: {'send_email': handle_send_email, 'resize_image': handle_resize_image}
    """
    worker_id = worker_id or f"{socket.gethostname()}:{threading.get_ident()}"

    while True:
        jobs = _claim_jobs(queue_name, worker_id, batch_size)
        if not jobs:
            time.sleep(poll_interval)
            continue

        # Claimed jobs run one at a time, and only the job currently executing
        # sends heartbeats, so keep batch_size small relative to the timeout.
        for job in jobs:
            _execute_job(job, handlers, worker_id)

def _claim_jobs(queue_name: str, worker_id: str, batch_size: int) -> list:
    return db.fetchall("""
        UPDATE Job SET
            status     = 'running',
            started_at = NOW(),
            worker_id  = %s,
            attempt_count = attempt_count + 1
        WHERE job_id IN (
            SELECT job_id FROM Job
            WHERE queue_name=%s AND status='pending' AND run_at <= NOW()
            ORDER BY priority DESC, run_at ASC
            LIMIT %s
            FOR UPDATE SKIP LOCKED
        )
        RETURNING job_id, job_type, payload, attempt_count, parent_job_id
    """, (worker_id, queue_name, batch_size))

def _execute_job(job: dict, handlers: dict, worker_id: str):
    handler = handlers.get(job['job_type'])
    if not handler:
        _fail_job(job['job_id'], f"No handler for job_type: {job['job_type']}", job['attempt_count'])
        return

    # Heartbeat thread: refreshes started_at so the job doesn't look timed out
    stop_heartbeat = threading.Event()
    heartbeat = threading.Thread(
        target=_heartbeat_loop,
        args=(job['job_id'], stop_heartbeat),
        daemon=True
    )
    heartbeat.start()

    try:
        result = handler(job['payload'])
        stop_heartbeat.set()  # stop before writing the final status
        # `is not None` keeps falsy results such as 0, [] or {}
        db.execute("""
            UPDATE Job SET status='completed', completed_at=NOW(), result=%s
            WHERE job_id=%s
        """, (json.dumps(result) if result is not None else None, job['job_id']))

        # Enqueue chained jobs if this job is part of a chain (helper not shown)
        _maybe_trigger_next(job['job_id'], job['parent_job_id'])

    except Exception:
        _fail_job(job['job_id'], traceback.format_exc(), job['attempt_count'])
    finally:
        # Always stop the heartbeat, including on non-Exception exits
        stop_heartbeat.set()

def _heartbeat_loop(job_id: int, stop: threading.Event):
    while not stop.wait(HEARTBEAT_INTERVAL):
        db.execute(
            "UPDATE Job SET started_at=NOW() WHERE job_id=%s AND status='running'",
            (job_id,)
        )

def _fail_job(job_id: int, error: str, attempt_count: int):
    queue = db.fetchone("""
        SELECT q.max_retries, q.retry_backoff FROM Job j JOIN JobQueue q USING(queue_name)
        WHERE j.job_id=%s
    """, (job_id,))
    max_retries = queue['max_retries'] if queue else 3

    # attempt_count was incremented when the job was claimed, so max_retries
    # caps total attempts: with the default of 3, a job runs at most 3 times.
    if attempt_count < max_retries:
        # Exponential backoff: 60s after the 1st failure, 120s after the 2nd, 240s after the 3rd, ...
        # (retry_backoff is fetched above, but only the exponential strategy is applied here)
        delay = 60 * (2 ** (attempt_count - 1))
        next_run = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(seconds=delay)
        db.execute("""
            UPDATE Job SET status='pending', run_at=%s, last_error=%s
            WHERE job_id=%s
        """, (next_run, error[:1000], job_id))
    else:
        db.execute("""
            UPDATE Job SET status='failed', completed_at=NOW(), last_error=%s
            WHERE job_id=%s
        """, (error[:1000], job_id))

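The JobQueue table allows retry_backoff values of exponential, linear, and fixed, while _fail_job computes an exponential delay. A minimal sketch of how the three strategies could be selected is shown below. The function name backoff_delay and the linear and fixed formulas are assumptions; only the exponential formula comes from the code above.

```python
BASE_DELAY_SECONDS = 60  # same base as the exponential formula in _fail_job

def backoff_delay(strategy: str, attempt_count: int,
                  base: int = BASE_DELAY_SECONDS) -> int:
    """Seconds to wait before the next attempt; attempt_count starts at 1."""
    if attempt_count < 1:
        raise ValueError("attempt_count must be >= 1")
    if strategy == 'exponential':
        return base * (2 ** (attempt_count - 1))   # 60, 120, 240, ...
    if strategy == 'linear':
        return base * attempt_count                # 60, 120, 180, ... (assumed)
    if strategy == 'fixed':
        return base                                # 60, 60, 60, ... (assumed)
    raise ValueError(f"unknown retry_backoff: {strategy!r}")
```

Inside _fail_job, the delay line could then become backoff_delay(queue['retry_backoff'], attempt_count). Production systems often add random jitter as well, so that jobs which failed together do not all retry at the same instant.
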
Recurring Job Scheduler

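import datetime

from croniter import croniter

def tick_recurring_jobs():
    """
    Run every minute (cron: * * * * *).
    Enqueues due recurring jobs and advances next_run_at.
    Must run inside a single transaction: FOR UPDATE SKIP LOCKED only keeps
    concurrent ticks away from a row while the locking transaction is open.
    """
    due = db.fetchall("""
        SELECT * FROM RecurringJob
        WHERE is_active=TRUE AND next_run_at <= NOW()
        FOR UPDATE SKIP LOCKED
    """)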

    for rj in due:
        # Enqueue with idempotency_key to prevent double-enqueue on concurrent ticks
        idem_key = f"recurring:{rj['recurring_job_id']}:{rj['next_run_at'].isoformat()}"
        enqueue(
            job_type=rj['job_type'],
            payload=rj['payload'],
            queue_name=rj['queue_name'],
            idempotency_key=idem_key,
        )
        # Advance to next scheduled time
        cron = croniter(rj['cron_expression'], rj['next_run_at'])
        next_run = cron.get_next(datetime.datetime)
        db.execute("""
            UPDATE RecurringJob
            SET last_run_at=%s, next_run_at=%s
            WHERE recurring_job_id=%s
        """, (rj['next_run_at'], next_run, rj['recurring_job_id']))

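The dedup key built in the scheduler loop depends only on the schedule row and its next_run_at slot, which is what makes a duplicate tick harmless. The sketch below isolates that key format to show the behaviour. The helper name recurring_idempotency_key, the id 7, and the dates are illustrative assumptions.

```python
import datetime

def recurring_idempotency_key(recurring_job_id: int,
                              next_run_at: datetime.datetime) -> str:
    """Same format as the key built in tick_recurring_jobs."""
    return f"recurring:{recurring_job_id}:{next_run_at.isoformat()}"

slot = datetime.datetime(2024, 1, 1, 9, 0, tzinfo=datetime.timezone.utc)

first_tick = recurring_idempotency_key(7, slot)
second_tick = recurring_idempotency_key(7, slot)  # duplicate tick, same slot
next_slot = recurring_idempotency_key(7, slot + datetime.timedelta(days=1))

print(first_tick)                 # recurring:7:2024-01-01T09:00:00+00:00
print(first_tick == second_tick)  # True: the second enqueue is deduplicated
print(first_tick == next_slot)    # False: the next slot gets its own job
```

Because dedup only covers pending and running jobs, the key guards against overlapping ticks. It does not guard against a tick that re-reads an old next_run_at after the first job has already completed.
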
Key Design Decisions

  • Heartbeat prevents ghost jobs: while a job runs, the worker refreshes started_at every HEARTBEAT_INTERVAL seconds. If the worker crashes mid-job, the updates stop, and a cleanup job can treat the job as timed out once started_at is older than 2× the heartbeat interval. Without heartbeats, a crashed worker would leave the job in running state forever. The heartbeat pattern also supports very long-running jobs (report generation, ML training) without false-positive timeouts. The trade-off in this schema is that started_at no longer records the original start time.
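  • Partition by status keeps the active tables small: Job_pending and Job_running are queried on the hot path, while Job_completed accumulates millions of rows. Partitioning lets you archive or truncate Job_completed periodically without touching the live queue index. Two costs to keep in mind: every status change moves the row to another partition, and PostgreSQL requires primary keys and unique indexes on a partitioned table to include the partition key, so uniqueness on job_id or on the idempotency key must include status or be enforced another way.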
  • Idempotency key dedup scope is pending+running only: once a job completes, its idempotency key is no longer in scope — re-enqueuing the same idempotency key triggers a fresh run. This correctly models “run this job once per period” semantics without permanent dedup.
  • Recurring job idempotency key includes next_run_at: if the tick_recurring_jobs cron fires twice in the same minute (e.g., during a deploy restart), the same next_run_at value produces the same idempotency key — the second tick’s enqueue is a no-op.
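
The cleanup job mentioned in the heartbeat bullet is not shown in this design. A minimal sketch of the timeout check follows, assuming stale jobs are simply put back into the queue. The names is_stale, STALE_AFTER, and REQUEUE_STALE_SQL are hypothetical, and requeueing (rather than failing) is an assumption.

```python
import datetime

HEARTBEAT_INTERVAL = 10               # seconds, as in the worker code
STALE_AFTER = 2 * HEARTBEAT_INTERVAL  # the 2x threshold described above

def is_stale(last_heartbeat: datetime.datetime,
             now: datetime.datetime) -> bool:
    """True if a running job has gone longer than STALE_AFTER without a heartbeat."""
    return (now - last_heartbeat).total_seconds() > STALE_AFTER

# Assumed cleanup statement: return stale running jobs to the pending state.
# It takes STALE_AFTER as its single parameter. This sketch does not check
# max_retries, so a job that always kills its worker would be requeued
# indefinitely unless that check is added.
REQUEUE_STALE_SQL = """
    UPDATE Job
    SET status = 'pending', worker_id = NULL,
        last_error = 'worker heartbeat timed out'
    WHERE status = 'running'
      AND started_at < NOW() - %s * INTERVAL '1 second'
"""
```

A periodic task could run REQUEUE_STALE_SQL with STALE_AFTER as the parameter. A larger multiple of the heartbeat interval would tolerate brief database or network stalls at the cost of slower recovery.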
