Job Queue System Low-Level Design: Worker Pool, Retry Backoff, Heartbeat, and Recurring Jobs

Job Queue System: Low-Level Design

A job queue system schedules and executes background work: sending emails, resizing images, generating reports, syncing data with third-party APIs. Unlike a messaging queue (fire-and-forget delivery), a job queue tracks execution state — scheduled, running, completed, failed — and provides retry logic, job chaining, priority scheduling, and observability into worker throughput. This design covers the job lifecycle, worker pool management, job deduplication, and cron-style recurring jobs.

Core Data Model

CREATE TABLE JobQueue (
    queue_name     VARCHAR(100) PRIMARY KEY,
    max_concurrency INT NOT NULL DEFAULT 10,
    max_retries    SMALLINT NOT NULL DEFAULT 3,
    retry_backoff  VARCHAR(20) NOT NULL DEFAULT 'exponential',  -- exponential, linear, fixed
    created_at     TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE TABLE Job (
    job_id         BIGSERIAL PRIMARY KEY,
    queue_name     VARCHAR(100) NOT NULL REFERENCES JobQueue(queue_name),
    job_type       VARCHAR(100) NOT NULL,    -- 'send_email', 'resize_image', 'sync_crm'
    payload        JSONB NOT NULL,
    idempotency_key VARCHAR(200),            -- deduplicate concurrent enqueues
    status         VARCHAR(20) NOT NULL DEFAULT 'pending',
        -- pending, running, completed, failed, cancelled
    priority       SMALLINT NOT NULL DEFAULT 0,
    attempt_count  SMALLINT NOT NULL DEFAULT 0,
    run_at         TIMESTAMPTZ NOT NULL DEFAULT NOW(),  -- earliest execution time
    started_at     TIMESTAMPTZ,
    completed_at   TIMESTAMPTZ,
    worker_id      VARCHAR(200),
    result         JSONB,
    last_error     TEXT,
    parent_job_id  BIGINT REFERENCES Job(job_id),  -- for job chains
    created_at     TIMESTAMPTZ NOT NULL DEFAULT NOW()
) PARTITION BY LIST (status);

CREATE TABLE Job_pending   PARTITION OF Job FOR VALUES IN ('pending');
CREATE TABLE Job_running   PARTITION OF Job FOR VALUES IN ('running');
CREATE TABLE Job_completed PARTITION OF Job FOR VALUES IN ('completed');
CREATE TABLE Job_failed    PARTITION OF Job FOR VALUES IN ('failed');

CREATE TABLE RecurringJob (
    recurring_job_id SERIAL PRIMARY KEY,
    job_type         VARCHAR(100) NOT NULL,
    queue_name       VARCHAR(100) NOT NULL,
    payload          JSONB NOT NULL DEFAULT '{}',
    cron_expression  VARCHAR(100) NOT NULL,  -- '0 9 * * 1-5' = weekdays 9am
    is_active        BOOLEAN NOT NULL DEFAULT TRUE,
    last_run_at      TIMESTAMPTZ,
    next_run_at      TIMESTAMPTZ NOT NULL,
    created_at       TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE UNIQUE INDEX ON Job(queue_name, idempotency_key)
    WHERE idempotency_key IS NOT NULL AND status IN ('pending','running');
CREATE INDEX ON Job_pending(queue_name, priority DESC, run_at ASC);
CREATE INDEX ON RecurringJob(next_run_at) WHERE is_active=TRUE;

Job Enqueue and Dedup

import json, datetime, socket

def enqueue(job_type: str, payload: dict, queue_name: str = 'default',
            priority: int = 0, run_at: datetime.datetime = None,
            idempotency_key: str = None, parent_job_id: int = None) -> int:
    """
    Enqueue a job. Returns job_id.
    Idempotent: if idempotency_key is provided and a job with that key is
    pending or running, returns the existing job_id.
    """
    run_at = run_at or datetime.datetime.utcnow()

    try:
        row = db.fetchone("""
            INSERT INTO Job (queue_name, job_type, payload, idempotency_key,
                             priority, run_at, parent_job_id)
            VALUES (%s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT (queue_name, idempotency_key)
                WHERE idempotency_key IS NOT NULL AND status IN ('pending','running')
            DO NOTHING
            RETURNING job_id
        """, (queue_name, job_type, json.dumps(payload), idempotency_key,
              priority, run_at, parent_job_id))

        if not row:
            # Deduplicated — return the existing job_id
            existing = db.fetchone("""
                SELECT job_id FROM Job
                WHERE queue_name=%s AND idempotency_key=%s
                  AND status IN ('pending','running')
            """, (queue_name, idempotency_key))
            return existing['job_id'] if existing else None

        return row['job_id']
    except Exception as e:
        raise JobEnqueueError(str(e)) from e

Worker: Claim and Execute

import time, traceback, threading

HEARTBEAT_INTERVAL = 10  # seconds; update run_at to prevent timeout

def worker_loop(queue_name: str, handlers: dict, worker_id: str = None,
                batch_size: int = 5, poll_interval: float = 1.0):
    """
    handlers: {'send_email': handle_send_email, 'resize_image': handle_resize_image}
    """
    worker_id = worker_id or f"{socket.gethostname()}:{threading.get_ident()}"

    while True:
        jobs = _claim_jobs(queue_name, worker_id, batch_size)
        if not jobs:
            time.sleep(poll_interval)
            continue

        for job in jobs:
            _execute_job(job, handlers, worker_id)

def _claim_jobs(queue_name: str, worker_id: str, batch_size: int) -> list:
    return db.fetchall("""
        UPDATE Job SET
            status     = 'running',
            started_at = NOW(),
            worker_id  = %s,
            attempt_count = attempt_count + 1
        WHERE job_id IN (
            SELECT job_id FROM Job
            WHERE queue_name=%s AND status='pending' AND run_at <= NOW()
            ORDER BY priority DESC, run_at ASC
            LIMIT %s
            FOR UPDATE SKIP LOCKED
        )
        RETURNING job_id, job_type, payload, attempt_count, parent_job_id
    """, (worker_id, queue_name, batch_size))

def _execute_job(job: dict, handlers: dict, worker_id: str):
    handler = handlers.get(job['job_type'])
    if not handler:
        _fail_job(job['job_id'], f"No handler for job_type: {job['job_type']}", job['attempt_count'])
        return

    # Heartbeat thread: keeps run_at fresh so job doesn't look timed out
    stop_heartbeat = threading.Event()
    heartbeat = threading.Thread(
        target=_heartbeat_loop,
        args=(job['job_id'], stop_heartbeat),
        daemon=True
    )
    heartbeat.start()

    try:
        result = handler(job['payload'])
        stop_heartbeat.set()
        db.execute("""
            UPDATE Job SET status='completed', completed_at=NOW(), result=%s
            WHERE job_id=%s
        """, (json.dumps(result) if result else None, job['job_id']))

        # Enqueue chained jobs if this job is part of a chain
        _maybe_trigger_next(job['job_id'], job['parent_job_id'])

    except Exception as e:
        stop_heartbeat.set()
        _fail_job(job['job_id'], traceback.format_exc(), job['attempt_count'])

def _heartbeat_loop(job_id: int, stop: threading.Event):
    while not stop.wait(HEARTBEAT_INTERVAL):
        db.execute(
            "UPDATE Job SET started_at=NOW() WHERE job_id=%s AND status='running'",
            (job_id,)
        )

def _fail_job(job_id: int, error: str, attempt_count: int):
    queue = db.fetchone("""
        SELECT q.max_retries, q.retry_backoff FROM Job j JOIN JobQueue q USING(queue_name)
        WHERE j.job_id=%s
    """, (job_id,))
    max_retries = queue['max_retries'] if queue else 3

    if attempt_count < max_retries:
        # Retry with exponential backoff: 60s, 120s, 240s
        delay = 60 * (2 ** (attempt_count - 1))
        next_run = datetime.datetime.utcnow() + datetime.timedelta(seconds=delay)
        db.execute("""
            UPDATE Job SET status='pending', run_at=%s, last_error=%s
            WHERE job_id=%s
        """, (next_run, error[:1000], job_id))
    else:
        db.execute("""
            UPDATE Job SET status='failed', completed_at=NOW(), last_error=%s
            WHERE job_id=%s
        """, (error[:1000], job_id))

Recurring Job Scheduler

from croniter import croniter

def tick_recurring_jobs():
    """
    Run every minute (cron: * * * * *).
    Enqueues due recurring jobs and advances next_run_at.
    """
    due = db.fetchall("""
        SELECT * FROM RecurringJob
        WHERE is_active=TRUE AND next_run_at <= NOW()
        FOR UPDATE SKIP LOCKED
    """)

    for rj in due:
        # Enqueue with idempotency_key to prevent double-enqueue on concurrent ticks
        idem_key = f"recurring:{rj['recurring_job_id']}:{rj['next_run_at'].isoformat()}"
        enqueue(
            job_type=rj['job_type'],
            payload=rj['payload'],
            queue_name=rj['queue_name'],
            idempotency_key=idem_key,
        )
        # Advance to next scheduled time
        cron = croniter(rj['cron_expression'], rj['next_run_at'])
        next_run = cron.get_next(datetime.datetime)
        db.execute("""
            UPDATE RecurringJob
            SET last_run_at=%s, next_run_at=%s
            WHERE recurring_job_id=%s
        """, (rj['next_run_at'], next_run, rj['recurring_job_id']))

Key Design Decisions

  • Heartbeat prevents ghost jobs: if a worker crashes mid-job, the job stays in status=running until a cleanup job detects it has timed out (started_at older than 2× heartbeat interval with no update). Without heartbeats, a crashed worker leaves a job in running state forever. The heartbeat pattern also supports very long-running jobs (report generation, ML training) without false-positive timeouts.
  • Partition by status keeps active table small: Job_pending and Job_running are queried on the hot path. Job_completed accumulates millions of rows. Partitioning lets you drop or archive Job_completed monthly without affecting the live queue index.
  • Idempotency key dedup scope is pending+running only: once a job completes, its idempotency key is no longer in scope — re-enqueuing the same idempotency key triggers a fresh run. This correctly models “run this job once per period” semantics without permanent dedup.
  • Recurring job idempotency key includes next_run_at: if the tick_recurring_jobs cron fires twice in the same minute (e.g., during a deploy restart), the same next_run_at value produces the same idempotency key — the second tick’s enqueue is a no-op.

{“@context”:”https://schema.org”,”@type”:”FAQPage”,”mainEntity”:[{“@type”:”Question”,”name”:”How does a job queue differ from a messaging queue?”,”acceptedAnswer”:{“@type”:”Answer”,”text”:”A messaging queue (SQS, RabbitMQ) models delivery: a message is enqueued, delivered to a consumer, and the consumer acknowledges receipt. The queue tracks delivery state but not execution state — once a message is acknowledged, the queue doesn’t know whether processing succeeded. A job queue models execution: a job is enqueued, claimed by a worker, executed, and the result (success or failure) is recorded. The queue tracks the full lifecycle — scheduled, running, completed, failed, retried. Job queues also provide: job deduplication (idempotency keys), result storage, job chaining (job B starts after job A completes), scheduling (run this job at 3 AM), and observability (how many jobs failed in the last hour, average execution time per job type). Use a messaging queue when you need maximum throughput and don’t need execution tracking; use a job queue when you need retry logic, result storage, or scheduling.”}},{“@type”:”Question”,”name”:”How does the heartbeat pattern prevent ghost jobs without a fixed execution timeout?”,”acceptedAnswer”:{“@type”:”Answer”,”text”:”A fixed execution timeout (30 minutes) breaks long-running jobs that legitimately take longer — a nightly report that takes 45 minutes would be killed and retried in an infinite loop. The heartbeat pattern avoids a fixed timeout: the worker sends a keep-alive signal every N seconds (update started_at=NOW()). The cleanup job detects ghost jobs as: status=running AND started_at < NOW() – INTERVAL ‘2 * heartbeat_interval’. If the worker is alive, started_at is refreshed every heartbeat and never crosses this threshold. If the worker crashes, no heartbeat arrives, started_at falls behind, and the cleanup job resets the job to pending after 20 seconds. A 10-second heartbeat + 20-second ghost detection window means a crashed worker’s job is re-queued within 30 seconds — fast enough for most use cases without a fixed runtime cap.”}},{“@type”:”Question”,”name”:”How do you implement job priorities without starvation of low-priority jobs?”,”acceptedAnswer”:{“@type”:”Answer”,”text”:”A pure priority queue processes high-priority jobs first. If high-priority jobs arrive continuously, low-priority jobs starve indefinitely — never executed. Solutions: (1) Aging: a background job increments a waiting_boost column every 5 minutes for pending jobs. The worker orders by (priority + waiting_boost) DESC — a low-priority job that has waited 4 hours accumulates enough boost to outrank newly arrived medium-priority jobs. (2) Dedicated queues: separate queues per priority level (queue_name: "critical", "normal", "batch"). Workers poll "critical" first; if empty, poll "normal"; if empty, poll "batch." Allocate worker capacity proportionally (50% critical, 40% normal, 10% batch). Low-priority jobs always get 10% of worker capacity regardless of critical queue depth. (3) Weighted fair queuing: process K high-priority jobs, then N normal-priority jobs, then M low-priority jobs in a round-robin pattern — implemented in the worker’s poll logic.”}},{“@type”:”Question”,”name”:”How do you implement job chaining and fan-out patterns?”,”acceptedAnswer”:{“@type”:”Answer”,”text”:”Job chaining (job B starts after job A): store parent_job_id on job B. When job A completes, query SELECT * FROM Job WHERE parent_job_id=A_id AND status=pending and advance their run_at to NOW(). The enqueue call for job B includes parent_job_id=A_id and run_at far in the future — it won’t run until job A explicitly triggers it. Fan-out (one job spawns many child jobs): job A runs, creates 100 child jobs (each processing one chunk), and creates a "sentinel" job with a dependency on all 100. The sentinel polls SELECT COUNT(*) FROM Job WHERE parent_job_id IN (child_ids) AND status != completed; when all children complete, the sentinel runs and aggregates results. For complex fan-out, use a workflow orchestrator pattern instead: a WorkflowJob table tracks the overall state, and each step transition is managed by the orchestrator rather than by polling parent_job_id.”}},{“@type”:”Question”,”name”:”What is the difference between run_at scheduling and recurring jobs?”,”acceptedAnswer”:{“@type”:”Answer”,”text”:”run_at scheduling: a one-time execution at a specific future time. enqueue(…, run_at=tomorrow_9am) creates one job row; it executes once when the worker polls and finds run_at <= NOW(). Use for: send reminder email in 3 days, retry a failed payment tomorrow, generate a monthly report on the 1st. Recurring jobs: defined by a cron expression; a scheduler mints new job instances on each tick. The RecurringJob table stores the template; the scheduler calls enqueue() at each scheduled time, creating a new Job row each time. Use for: nightly database cleanup, hourly data sync, daily digest email. Key difference: run_at jobs are finite (one execution), recurring jobs are indefinite (run until the RecurringJob.is_active is set to FALSE). Anti-pattern: implementing recurring behavior by having a job re-enqueue itself on completion — this creates a chain of single-use jobs with no central control point. Use RecurringJob instead for cleaner observability and easier pause/resume control.”}}]}

Job queue and background processing system design is discussed in Uber system design interview questions.

Job queue and async task execution design is covered in Airbnb system design interview preparation.

Job queue and distributed background job design is discussed in Amazon system design interview guide.

See also: Netflix Interview Guide 2026: Streaming Architecture, Recommendation Systems, and Engineering Excellence

See also: Atlassian Interview Guide

Scroll to Top