Job Queue System: Low-Level Design
A job queue system schedules and executes background work: sending emails, resizing images, generating reports, syncing data with third-party APIs. Unlike a messaging queue (fire-and-forget delivery), a job queue tracks execution state — scheduled, running, completed, failed — and provides retry logic, job chaining, priority scheduling, and observability into worker throughput. This design covers the job lifecycle, worker pool management, job deduplication, and cron-style recurring jobs.
Core Data Model
CREATE TABLE JobQueue (
queue_name VARCHAR(100) PRIMARY KEY,
max_concurrency INT NOT NULL DEFAULT 10,
max_retries SMALLINT NOT NULL DEFAULT 3,
retry_backoff VARCHAR(20) NOT NULL DEFAULT 'exponential', -- exponential, linear, fixed
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE Job (
job_id BIGSERIAL,
queue_name VARCHAR(100) NOT NULL REFERENCES JobQueue(queue_name),
job_type VARCHAR(100) NOT NULL, -- 'send_email', 'resize_image', 'sync_crm'
payload JSONB NOT NULL,
idempotency_key VARCHAR(200), -- deduplicates concurrent enqueues
status VARCHAR(20) NOT NULL DEFAULT 'pending',
-- pending, running, completed, failed, cancelled
priority SMALLINT NOT NULL DEFAULT 0,
attempt_count SMALLINT NOT NULL DEFAULT 0,
run_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), -- earliest execution time
started_at TIMESTAMPTZ,
completed_at TIMESTAMPTZ,
worker_id VARCHAR(200),
result JSONB,
last_error TEXT,
-- for job chains; no FK here: a foreign key into a partitioned table must
-- reference a unique key that includes the partition column (status)
parent_job_id BIGINT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (job_id, status) -- the partition key must be part of the PK
) PARTITION BY LIST (status);
CREATE TABLE Job_pending PARTITION OF Job FOR VALUES IN ('pending');
CREATE TABLE Job_running PARTITION OF Job FOR VALUES IN ('running');
CREATE TABLE Job_completed PARTITION OF Job FOR VALUES IN ('completed');
CREATE TABLE Job_failed PARTITION OF Job FOR VALUES IN ('failed');
CREATE TABLE Job_cancelled PARTITION OF Job FOR VALUES IN ('cancelled');
CREATE TABLE RecurringJob (
recurring_job_id SERIAL PRIMARY KEY,
job_type VARCHAR(100) NOT NULL,
queue_name VARCHAR(100) NOT NULL,
payload JSONB NOT NULL DEFAULT '{}',
cron_expression VARCHAR(100) NOT NULL, -- '0 9 * * 1-5' = weekdays 9am
is_active BOOLEAN NOT NULL DEFAULT TRUE,
last_run_at TIMESTAMPTZ,
next_run_at TIMESTAMPTZ NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Dedup: at most one pending/running job per (queue_name, idempotency_key).
-- PostgreSQL caveat: a unique index on a partitioned table must include the
-- partition column, so with the LIST partitioning above this index would need
-- status as an indexed column (or the dedup enforced in an unpartitioned side
-- table); it is shown here in its conceptual form.
CREATE UNIQUE INDEX ON Job(queue_name, idempotency_key)
WHERE idempotency_key IS NOT NULL AND status IN ('pending', 'running');
CREATE INDEX ON Job_pending(queue_name, priority DESC, run_at ASC);
CREATE INDEX ON RecurringJob(next_run_at) WHERE is_active = TRUE;
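The status values in the schema form a small state machine. A minimal sketch of the allowed transitions as a lookup table (illustrative only; the SQL above does not enforce this, and `VALID_TRANSITIONS`/`can_transition` are hypothetical names):

```python
# Allowed status transitions for a job; names match the status values above.
VALID_TRANSITIONS = {
    'pending':   {'running', 'cancelled'},
    'running':   {'completed', 'failed', 'pending'},  # back to pending on retry
    'completed': set(),   # terminal
    'failed':    set(),   # terminal
    'cancelled': set(),   # terminal
}

def can_transition(old: str, new: str) -> bool:
    """True if a job may legally move from status `old` to status `new`."""
    return new in VALID_TRANSITIONS.get(old, set())
```

A guard like this can live in the application layer or as a CHECK/trigger in the database.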
Job Enqueue and Dedup
import json
import datetime
from typing import Optional

class JobEnqueueError(Exception):
    pass

def enqueue(job_type: str, payload: dict, queue_name: str = 'default',
            priority: int = 0, run_at: Optional[datetime.datetime] = None,
            idempotency_key: Optional[str] = None,
            parent_job_id: Optional[int] = None) -> Optional[int]:
    """
    Enqueue a job. Returns the job_id.
    Idempotent: if idempotency_key is provided and a job with that key is
    pending or running, returns the existing job_id instead of inserting.
    """
    run_at = run_at or datetime.datetime.now(datetime.timezone.utc)
    try:
        row = db.fetchone("""
            INSERT INTO Job (queue_name, job_type, payload, idempotency_key,
                             priority, run_at, parent_job_id)
            VALUES (%s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT (queue_name, idempotency_key)
                WHERE idempotency_key IS NOT NULL AND status IN ('pending', 'running')
                DO NOTHING
            RETURNING job_id
        """, (queue_name, job_type, json.dumps(payload), idempotency_key,
              priority, run_at, parent_job_id))
        if not row:
            # Deduplicated: return the existing job_id
            existing = db.fetchone("""
                SELECT job_id FROM Job
                WHERE queue_name = %s AND idempotency_key = %s
                  AND status IN ('pending', 'running')
            """, (queue_name, idempotency_key))
            return existing['job_id'] if existing else None
        return row['job_id']
    except Exception as e:
        raise JobEnqueueError(str(e)) from e
Worker: Claim and Execute
import time
import socket
import threading
import traceback

HEARTBEAT_INTERVAL = 10  # seconds; refresh started_at so the job isn't presumed dead

def worker_loop(queue_name: str, handlers: dict, worker_id: str = None,
                batch_size: int = 5, poll_interval: float = 1.0):
    """
    handlers maps job_type to a callable, e.g.
    {'send_email': handle_send_email, 'resize_image': handle_resize_image}
    """
    worker_id = worker_id or f"{socket.gethostname()}:{threading.get_ident()}"
    while True:
        jobs = _claim_jobs(queue_name, worker_id, batch_size)
        if not jobs:
            time.sleep(poll_interval)
            continue
        for job in jobs:
            _execute_job(job, handlers, worker_id)
def _claim_jobs(queue_name: str, worker_id: str, batch_size: int) -> list:
    # FOR UPDATE SKIP LOCKED lets concurrent workers claim disjoint batches
    # without blocking on each other's row locks.
    return db.fetchall("""
        UPDATE Job SET
            status = 'running',
            started_at = NOW(),
            worker_id = %s,
            attempt_count = attempt_count + 1
        WHERE job_id IN (
            SELECT job_id FROM Job
            WHERE queue_name = %s AND status = 'pending' AND run_at <= NOW()
            ORDER BY priority DESC, run_at ASC
            LIMIT %s
            FOR UPDATE SKIP LOCKED
        )
        RETURNING job_id, job_type, payload, attempt_count, parent_job_id
    """, (worker_id, queue_name, batch_size))
def _execute_job(job: dict, handlers: dict, worker_id: str):
    handler = handlers.get(job['job_type'])
    if not handler:
        _fail_job(job['job_id'], f"No handler for job_type: {job['job_type']}",
                  job['attempt_count'])
        return
    # Heartbeat thread: keeps started_at fresh so the job doesn't look timed out
    stop_heartbeat = threading.Event()
    heartbeat = threading.Thread(
        target=_heartbeat_loop,
        args=(job['job_id'], stop_heartbeat),
        daemon=True
    )
    heartbeat.start()
    try:
        result = handler(job['payload'])
        stop_heartbeat.set()
        db.execute("""
            UPDATE Job SET status = 'completed', completed_at = NOW(), result = %s
            WHERE job_id = %s
        """, (json.dumps(result) if result is not None else None, job['job_id']))
        # Enqueue chained jobs if this job is part of a chain
        _maybe_trigger_next(job['job_id'], job['parent_job_id'])
    except Exception:
        stop_heartbeat.set()
        _fail_job(job['job_id'], traceback.format_exc(), job['attempt_count'])
def _heartbeat_loop(job_id: int, stop: threading.Event):
    while not stop.wait(HEARTBEAT_INTERVAL):
        db.execute(
            "UPDATE Job SET started_at = NOW() WHERE job_id = %s AND status = 'running'",
            (job_id,)
        )
def _fail_job(job_id: int, error: str, attempt_count: int):
    queue = db.fetchone("""
        SELECT q.max_retries, q.retry_backoff
        FROM Job j JOIN JobQueue q USING (queue_name)
        WHERE j.job_id = %s
    """, (job_id,))
    max_retries = queue['max_retries'] if queue else 3
    backoff = queue['retry_backoff'] if queue else 'exponential'
    if attempt_count < max_retries:
        # Delay per the queue's backoff policy:
        # exponential: 60s, 120s, 240s... | linear: 60s, 120s, 180s... | fixed: 60s
        if backoff == 'exponential':
            delay = 60 * (2 ** (attempt_count - 1))
        elif backoff == 'linear':
            delay = 60 * attempt_count
        else:
            delay = 60
        next_run = (datetime.datetime.now(datetime.timezone.utc)
                    + datetime.timedelta(seconds=delay))
        db.execute("""
            UPDATE Job SET status = 'pending', run_at = %s, last_error = %s
            WHERE job_id = %s
        """, (next_run, error[:1000], job_id))
    else:
        db.execute("""
            UPDATE Job SET status = 'failed', completed_at = NOW(), last_error = %s
            WHERE job_id = %s
        """, (error[:1000], job_id))
Recurring Job Scheduler
from croniter import croniter

def tick_recurring_jobs():
    """
    Run every minute (cron: * * * * *).
    Enqueues due recurring jobs and advances next_run_at.
    Must run inside a single transaction so the SKIP LOCKED row locks are
    held until the updates below commit.
    """
    due = db.fetchall("""
        SELECT * FROM RecurringJob
        WHERE is_active = TRUE AND next_run_at <= NOW()
        FOR UPDATE SKIP LOCKED
    """)
    for rj in due:
        # Idempotency key prevents double-enqueue on concurrent ticks
        idem_key = f"recurring:{rj['recurring_job_id']}:{rj['next_run_at'].isoformat()}"
        enqueue(
            job_type=rj['job_type'],
            payload=rj['payload'],
            queue_name=rj['queue_name'],
            idempotency_key=idem_key,
        )
        # Advance to the next scheduled time
        cron = croniter(rj['cron_expression'], rj['next_run_at'])
        next_run = cron.get_next(datetime.datetime)
        db.execute("""
            UPDATE RecurringJob
            SET last_run_at = %s, next_run_at = %s
            WHERE recurring_job_id = %s
        """, (rj['next_run_at'], next_run, rj['recurring_job_id']))
Key Design Decisions
- Heartbeat prevents ghost jobs: if a worker crashes mid-job, the job stays in status=running until a cleanup job detects it has timed out (started_at older than 2× heartbeat interval with no update). Without heartbeats, a crashed worker leaves a job in running state forever. The heartbeat pattern also supports very long-running jobs (report generation, ML training) without false-positive timeouts.
- Partition by status keeps active table small: Job_pending and Job_running are queried on the hot path. Job_completed accumulates millions of rows. Partitioning lets you drop or archive Job_completed monthly without affecting the live queue index.
- Idempotency key dedup scope is pending+running only: once a job completes, its idempotency key is no longer in scope — re-enqueuing the same idempotency key triggers a fresh run. This correctly models “run this job once per period” semantics without permanent dedup.
- Recurring job idempotency key includes next_run_at: if the tick_recurring_jobs cron fires twice in the same minute (e.g., during a deploy restart), the same next_run_at value produces the same idempotency key — the second tick’s enqueue is a no-op.
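The ghost-job detection described in the first bullet reduces to a time comparison against the last heartbeat. A minimal sketch of the detection predicate (a sweeper would reset matching jobs to pending; `is_ghost` is an illustrative name, not part of the code above):

```python
import datetime

HEARTBEAT_INTERVAL = 10  # seconds, matching the worker code above

def is_ghost(status: str, started_at: datetime.datetime,
             now: datetime.datetime) -> bool:
    """True if a running job's heartbeat is stale: the worker has not
    refreshed started_at for more than twice the heartbeat interval."""
    if status != 'running' or started_at is None:
        return False
    return (now - started_at).total_seconds() > 2 * HEARTBEAT_INTERVAL
```

In SQL the same predicate is a single WHERE clause on the Job_running partition, run by a periodic cleanup job.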
FAQ
Q: How does a job queue differ from a messaging queue?
A messaging queue (SQS, RabbitMQ) models delivery: a message is enqueued, delivered to a consumer, and the consumer acknowledges receipt. The queue tracks delivery state but not execution state — once a message is acknowledged, the queue doesn't know whether processing succeeded. A job queue models execution: a job is enqueued, claimed by a worker, executed, and the result (success or failure) is recorded; the queue tracks the full lifecycle — scheduled, running, completed, failed, retried. Job queues also provide deduplication (idempotency keys), result storage, job chaining (job B starts after job A completes), scheduling ("run this job at 3 AM"), and observability (how many jobs failed in the last hour, average execution time per job type). Use a messaging queue when you need maximum throughput and no execution tracking; use a job queue when you need retry logic, result storage, or scheduling.
Q: How does the heartbeat pattern prevent ghost jobs without a fixed execution timeout?
A fixed execution timeout (say, 30 minutes) breaks long-running jobs that legitimately take longer — a nightly report that takes 45 minutes would be killed and retried in an infinite loop. The heartbeat pattern avoids a fixed timeout: the worker sends a keep-alive every N seconds (UPDATE ... SET started_at = NOW()). The cleanup job detects ghost jobs as status = 'running' AND started_at older than twice the heartbeat interval. If the worker is alive, started_at is refreshed every heartbeat and never crosses the threshold; if the worker crashes, no heartbeat arrives, started_at falls behind, and the cleanup job resets the job to pending once the window elapses. A 10-second heartbeat plus a 20-second detection window means a crashed worker's job is re-queued within roughly 30 seconds — fast enough for most use cases without any fixed runtime cap.
Q: How do you implement job priorities without starving low-priority jobs?
A pure priority queue processes high-priority jobs first; if high-priority jobs arrive continuously, low-priority jobs starve indefinitely. Solutions: (1) Aging: a background job increments a waiting_boost column every 5 minutes for pending jobs, and the worker orders by (priority + waiting_boost) DESC — a low-priority job that has waited 4 hours accumulates enough boost to outrank newly arrived medium-priority jobs. (2) Dedicated queues: separate queues per priority level (queue_name 'critical', 'normal', 'batch'). Workers poll 'critical' first, then 'normal', then 'batch', and worker capacity is allocated proportionally (e.g., 50% critical, 40% normal, 10% batch), so low-priority jobs always get some capacity regardless of critical queue depth. (3) Weighted fair queuing: process K high-priority jobs, then N normal, then M low-priority jobs in a round-robin pattern, implemented in the worker's poll logic.
Q: How do you implement job chaining and fan-out patterns?
Chaining (job B starts after job A): enqueue job B with parent_job_id set to A's id and a run_at far in the future so it cannot run yet. When job A completes, select pending jobs with parent_job_id = A's id and advance their run_at to NOW(). Fan-out (one job spawns many children): job A runs, creates 100 child jobs (each processing one chunk), and creates a sentinel job that depends on all 100. The sentinel polls the count of children whose status is not yet completed; when all children complete, the sentinel runs and aggregates results. For complex fan-out, use a workflow-orchestrator pattern instead: a WorkflowJob table tracks the overall state, and the orchestrator manages each step transition rather than polling parent_job_id.
Q: What is the difference between run_at scheduling and recurring jobs?
run_at scheduling is a one-time execution at a specific future time: enqueue(..., run_at=tomorrow_9am) creates one job row, which executes once a worker finds run_at <= NOW(). Use it for "send a reminder email in 3 days" or "retry a failed payment tomorrow". Recurring jobs are defined by a cron expression: the RecurringJob table stores the template, and the scheduler calls enqueue() at each scheduled time, creating a new Job row per occurrence. Use them for nightly database cleanup, hourly data syncs, or daily digest emails. Key difference: run_at jobs are finite (one execution); recurring jobs run indefinitely until RecurringJob.is_active is set to FALSE. Anti-pattern: implementing recurrence by having a job re-enqueue itself on completion — this creates a chain of single-use jobs with no central control point. Use RecurringJob instead for cleaner observability and easier pause/resume control.
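The aging approach from the starvation answer reduces to a computed ordering key. A minimal sketch (`effective_priority` and the 5-minute boost cadence are illustrative names from that answer, not columns in the schema above):

```python
def effective_priority(priority: int, minutes_waiting: int,
                       boost_every: int = 5) -> int:
    # One boost point per boost_every minutes of waiting: a priority-0 job
    # waiting 4 hours (240 min) accumulates 48 points and outranks
    # freshly enqueued priority-10 work.
    return priority + minutes_waiting // boost_every
```

In SQL this becomes the ORDER BY expression in the claim query, e.g. ordering by (priority + waiting_boost) DESC instead of priority DESC.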