Job Queue System: Low-Level Design
A job queue system schedules and executes background work: sending emails, resizing images, generating reports, syncing data with third-party APIs. Unlike a messaging queue (fire-and-forget delivery), a job queue tracks execution state — scheduled, running, completed, failed — and provides retry logic, job chaining, priority scheduling, and observability into worker throughput. This design covers the job lifecycle, worker pool management, job deduplication, and cron-style recurring jobs.
Core Data Model
-- One row per named queue: per-queue concurrency and retry policy.
CREATE TABLE JobQueue (
    queue_name      VARCHAR(100) PRIMARY KEY,
    -- NOTE(review): nothing in this design reads max_concurrency yet.
    max_concurrency INT NOT NULL DEFAULT 10 CHECK (max_concurrency > 0),
    -- Number of retries after the first attempt.
    max_retries     SMALLINT NOT NULL DEFAULT 3 CHECK (max_retries >= 0),
    -- Enforce the documented strategies so a typo fails at write time,
    -- not silently at retry time.
    retry_backoff   VARCHAR(20) NOT NULL DEFAULT 'exponential'
                    CHECK (retry_backoff IN ('exponential', 'linear', 'fixed')),
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- One row per job. Partitioned by status so the hot pending/running rows
-- live in small tables, separate from the completed/failed history.
CREATE TABLE Job (
    job_id          BIGSERIAL,
    queue_name      VARCHAR(100) NOT NULL REFERENCES JobQueue(queue_name),
    job_type        VARCHAR(100) NOT NULL,              -- 'send_email', 'resize_image', 'sync_crm'
    payload         JSONB NOT NULL,
    idempotency_key VARCHAR(200),                       -- deduplicate concurrent enqueues
    status          VARCHAR(20) NOT NULL DEFAULT 'pending',
                    -- pending, running, completed, failed, cancelled
    priority        SMALLINT NOT NULL DEFAULT 0,        -- higher values are claimed first
    attempt_count   SMALLINT NOT NULL DEFAULT 0,
    run_at          TIMESTAMPTZ NOT NULL DEFAULT NOW(), -- earliest execution time
    started_at      TIMESTAMPTZ,                        -- set on claim, refreshed by heartbeats
    completed_at    TIMESTAMPTZ,
    worker_id       VARCHAR(200),
    result          JSONB,
    last_error      TEXT,
    -- For job chains. Deliberately not a FOREIGN KEY: a FK must reference a
    -- unique constraint, and on a table partitioned by status, job_id alone
    -- cannot carry one (see the primary key below).
    parent_job_id   BIGINT,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    -- PostgreSQL requires a partitioned table's primary key to include every
    -- partition-key column. job_id is still unique in practice (BIGSERIAL).
    PRIMARY KEY (job_id, status)
) PARTITION BY LIST (status);
CREATE TABLE Job_pending   PARTITION OF Job FOR VALUES IN ('pending');
CREATE TABLE Job_running   PARTITION OF Job FOR VALUES IN ('running');
CREATE TABLE Job_completed PARTITION OF Job FOR VALUES IN ('completed');
CREATE TABLE Job_failed    PARTITION OF Job FOR VALUES IN ('failed');
-- 'cancelled' is a documented status. Without a partition for it, any
-- INSERT/UPDATE producing status='cancelled' fails with "no partition found".
CREATE TABLE Job_cancelled PARTITION OF Job FOR VALUES IN ('cancelled');
-- Cron-style schedules. The scheduler enqueues a Job whenever next_run_at
-- is due, then advances next_run_at.
CREATE TABLE RecurringJob (
    recurring_job_id SERIAL PRIMARY KEY,
    job_type         VARCHAR(100) NOT NULL,
    -- Same FK as Job.queue_name, so a schedule cannot point at a missing
    -- queue and fail on every tick.
    queue_name       VARCHAR(100) NOT NULL REFERENCES JobQueue(queue_name),
    payload          JSONB NOT NULL DEFAULT '{}',
    cron_expression  VARCHAR(100) NOT NULL,   -- '0 9 * * 1-5' = weekdays 9am
    is_active        BOOLEAN NOT NULL DEFAULT TRUE,
    last_run_at      TIMESTAMPTZ,             -- scheduled time of the most recent enqueue
    next_run_at      TIMESTAMPTZ NOT NULL,
    created_at       TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Dedup arbiter for enqueue()'s ON CONFLICT: at most one pending/running job
-- per (queue_name, idempotency_key). Completed/failed jobs are outside the
-- partial predicate, so their keys can be reused.
-- NOTE(review): PostgreSQL rejects UNIQUE indexes on a partitioned table
-- unless they include every partition-key column (here: status), and Job is
-- PARTITION BY LIST (status). Adding status to the key would instead allow
-- one pending AND one running job with the same key, which defeats the
-- dedup. Cross-status dedup needs another mechanism (e.g. an unpartitioned
-- active-jobs table or an advisory lock). TODO decide.
CREATE UNIQUE INDEX ON Job(queue_name, idempotency_key)
WHERE idempotency_key IS NOT NULL AND status IN ('pending','running');
-- Claim path: matches the worker's ORDER BY priority DESC, run_at ASC over pending jobs.
CREATE INDEX ON Job_pending(queue_name, priority DESC, run_at ASC);
-- Scheduler scan for due, active recurring jobs.
CREATE INDEX ON RecurringJob(next_run_at) WHERE is_active=TRUE;
Job Enqueue and Dedup
import json, datetime, socket
def enqueue(job_type: str, payload: dict, queue_name: str = 'default',
            priority: int = 0, run_at: datetime.datetime = None,
            idempotency_key: str = None, parent_job_id: int = None) -> int:
    """
    Enqueue a job and return its job_id.

    Idempotent: if idempotency_key is provided and a job with the same
    (queue_name, idempotency_key) is pending or running, no row is inserted
    and the existing job's id is returned.

    Args:
        job_type: name the worker uses to pick a handler.
        payload: JSON-serializable job arguments (stored as JSONB).
        queue_name: target queue.
        priority: higher values are claimed first.
        run_at: earliest execution time. Defaults to the current time as a
            timezone-aware UTC datetime. Caller-supplied values are passed to
            the driver as-is.
        idempotency_key: optional dedup key, scoped to pending/running jobs.
        parent_job_id: preceding job in a chain, if any.

    Returns:
        The new or existing job_id.

    Raises:
        JobEnqueueError: wraps any database error, or signals that a
            deduplicated job could not be found after several attempts.
    """
    # run_at is TIMESTAMPTZ. A naive utcnow() value would be interpreted in
    # the DB session's time zone, which is not necessarily UTC.
    run_at = run_at or datetime.datetime.now(datetime.timezone.utc)
    insert_params = (queue_name, job_type, json.dumps(payload), idempotency_key,
                     priority, run_at, parent_job_id)
    try:
        # Dedup race: the conflicting job can complete between the INSERT and
        # the SELECT, so neither returns a row. Retrying the INSERT (which then
        # succeeds) closes that window instead of returning None.
        for _ in range(3):
            row = db.fetchone("""
                INSERT INTO Job (queue_name, job_type, payload, idempotency_key,
                                 priority, run_at, parent_job_id)
                VALUES (%s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT (queue_name, idempotency_key)
                    WHERE idempotency_key IS NOT NULL AND status IN ('pending','running')
                DO NOTHING
                RETURNING job_id
            """, insert_params)
            if row:
                return row['job_id']
            # Deduplicated: return the existing job_id.
            existing = db.fetchone("""
                SELECT job_id FROM Job
                WHERE queue_name=%s AND idempotency_key=%s
                  AND status IN ('pending','running')
            """, (queue_name, idempotency_key))
            if existing:
                return existing['job_id']
    except Exception as e:
        raise JobEnqueueError(str(e)) from e
    raise JobEnqueueError(
        f"enqueue of {job_type!r} on {queue_name!r} was deduplicated but no "
        f"pending/running job with idempotency_key={idempotency_key!r} was found")
Worker: Claim and Execute
import time, traceback, threading
# Seconds between worker heartbeats. Each beat refreshes Job.started_at
# (not run_at) so a job that is still executing doesn't look timed out.
HEARTBEAT_INTERVAL = 10
def worker_loop(queue_name: str, handlers: dict, worker_id: str = None,
                batch_size: int = 5, poll_interval: float = 1.0,
                stop: threading.Event = None):
    """
    Claim and execute jobs from queue_name until stop is set.

    handlers: {'send_email': handle_send_email, 'resize_image': handle_resize_image}
    worker_id: recorded on claimed jobs; defaults to "<hostname>:<thread ident>".
    batch_size: maximum number of jobs claimed per round trip.
    poll_interval: seconds to wait when no job is ready.
    stop: optional event for graceful shutdown. When omitted, the loop runs
        forever (the original behaviour). A batch in progress is finished
        before the loop exits.
    """
    worker_id = worker_id or f"{socket.gethostname()}:{threading.get_ident()}"
    if stop is None:
        stop = threading.Event()
    while not stop.is_set():
        jobs = _claim_jobs(queue_name, worker_id, batch_size)
        if not jobs:
            # Same as time.sleep(poll_interval), but wakes early on shutdown.
            stop.wait(poll_interval)
            continue
        # The whole batch was marked running (with a fresh started_at) at
        # claim time, but the jobs execute one after another. Keep the waiting
        # ones alive so a timeout reaper doesn't treat them as abandoned while
        # an earlier job in the batch runs long.
        batch_done = threading.Event()
        keepalive = threading.Thread(
            target=_batch_heartbeat_loop,
            args=([job['job_id'] for job in jobs], worker_id, batch_done),
            daemon=True,
        )
        keepalive.start()
        try:
            for job in jobs:
                _execute_job(job, handlers, worker_id)
        finally:
            batch_done.set()
            keepalive.join(timeout=HEARTBEAT_INTERVAL)


def _batch_heartbeat_loop(job_ids: list, worker_id: str, stop: threading.Event):
    """Refresh started_at for this worker's still-running jobs of one batch every HEARTBEAT_INTERVAL seconds until stop is set."""
    while not stop.wait(HEARTBEAT_INTERVAL):
        # Assumes the db driver adapts a Python list to a SQL array
        # (psycopg2 does). TODO confirm for the wrapper in use.
        db.execute(
            "UPDATE Job SET started_at=NOW() "
            "WHERE job_id = ANY(%s) AND status='running' AND worker_id=%s",
            (job_ids, worker_id),
        )
def _claim_jobs(queue_name: str, worker_id: str, batch_size: int) -> list:
    """
    Atomically claim up to batch_size due, pending jobs for worker_id.

    Claimed rows get status='running', started_at=NOW(), worker_id, and an
    incremented attempt_count. FOR UPDATE SKIP LOCKED lets concurrent workers
    claim disjoint jobs without blocking each other.

    Returns:
        Rows with job_id, job_type, payload, attempt_count, parent_job_id,
        priority and run_at, ordered by priority DESC, run_at ASC.
    """
    # UPDATE ... RETURNING yields rows in no guaranteed order, so the ORDER BY
    # inside the sub-select does not reach the caller. Re-sort the claimed
    # rows in a data-modifying CTE so the batch runs in priority order.
    return db.fetchall("""
        WITH claimed AS (
            UPDATE Job SET
                status = 'running',
                started_at = NOW(),
                worker_id = %s,
                attempt_count = attempt_count + 1
            WHERE job_id IN (
                SELECT job_id FROM Job
                WHERE queue_name=%s AND status='pending' AND run_at <= NOW()
                ORDER BY priority DESC, run_at ASC
                LIMIT %s
                FOR UPDATE SKIP LOCKED
            )
            RETURNING job_id, job_type, payload, attempt_count, parent_job_id,
                      priority, run_at
        )
        SELECT * FROM claimed ORDER BY priority DESC, run_at ASC
    """, (worker_id, queue_name, batch_size))
def _execute_job(job: dict, handlers: dict, worker_id: str):
    """
    Run one claimed job and record its outcome.

    A missing handler, or an exception raised by the handler, is passed to
    _fail_job. On success the handler's return value is stored as result
    (JSON-encoded; None is stored as NULL), the job is marked completed, and
    _maybe_trigger_next is called for chains.

    Only the handler call counts as a job failure. Errors while recording
    success or triggering the chain propagate to the caller. Routing them to
    _fail_job would re-queue a job whose side effects already happened.

    worker_id is accepted for interface compatibility and is not used here.
    """
    handler = handlers.get(job['job_type'])
    if not handler:
        _fail_job(job['job_id'], f"No handler for job_type: {job['job_type']}", job['attempt_count'])
        return
    # Heartbeat thread: periodically refreshes started_at so a long-running
    # job doesn't look timed out.
    stop_heartbeat = threading.Event()
    heartbeat = threading.Thread(
        target=_heartbeat_loop,
        args=(job['job_id'], stop_heartbeat),
        daemon=True,
    )
    heartbeat.start()
    error = None
    try:
        result = handler(job['payload'])
    except Exception:
        error = traceback.format_exc()
    finally:
        # Also runs on BaseException (e.g. KeyboardInterrupt), so the
        # heartbeat never outlives the handler call.
        stop_heartbeat.set()
        heartbeat.join(timeout=HEARTBEAT_INTERVAL)
    if error is not None:
        _fail_job(job['job_id'], error, job['attempt_count'])
        return
    # Test against None explicitly: falsy results (0, [], {}, "") are valid
    # outputs and must not be stored as NULL.
    db.execute("""
        UPDATE Job SET status='completed', completed_at=NOW(), result=%s
        WHERE job_id=%s
    """, (None if result is None else json.dumps(result), job['job_id']))
    # Enqueue chained jobs if this job is part of a chain.
    _maybe_trigger_next(job['job_id'], job['parent_job_id'])
def _heartbeat_loop(job_id: int, stop: threading.Event):
    """
    Refresh started_at for job_id every HEARTBEAT_INTERVAL seconds until stop is set.

    The UPDATE only matches rows still in status='running', so beats after the
    job finished or was re-queued are no-ops. A failed beat (e.g. a transient
    DB error) is reported to stderr and retried at the next interval instead
    of killing the thread. A dead heartbeat would make a still-executing job
    look timed out.
    """
    while not stop.wait(HEARTBEAT_INTERVAL):
        try:
            db.execute(
                "UPDATE Job SET started_at=NOW() WHERE job_id=%s AND status='running'",
                (job_id,)
            )
        except Exception:
            traceback.print_exc()
def _fail_job(job_id: int, error: str, attempt_count: int):
queue = db.fetchone("""
SELECT q.max_retries, q.retry_backoff FROM Job j JOIN JobQueue q USING(queue_name)
WHERE j.job_id=%s
""", (job_id,))
max_retries = queue['max_retries'] if queue else 3
if attempt_count < max_retries:
# Retry with exponential backoff: 60s, 120s, 240s
delay = 60 * (2 ** (attempt_count - 1))
next_run = datetime.datetime.utcnow() + datetime.timedelta(seconds=delay)
db.execute("""
UPDATE Job SET status='pending', run_at=%s, last_error=%s
WHERE job_id=%s
""", (next_run, error[:1000], job_id))
else:
db.execute("""
UPDATE Job SET status='failed', completed_at=NOW(), last_error=%s
WHERE job_id=%s
""", (error[:1000], job_id))
Recurring Job Scheduler
from croniter import croniter
def tick_recurring_jobs():
    """
    Run every minute (cron: * * * * *).
    Enqueues due recurring jobs and advances next_run_at.

    Each due schedule enqueues one job for its current slot. next_run_at then
    moves to the first cron occurrence after the database's current time.
    Slots missed during scheduler downtime are coalesced into that single run
    rather than replayed one per tick. Replaying would leave a '* * * * *'
    schedule permanently behind, because it could only ever advance one slot
    per minute-long tick.

    NOTE(review): the FOR UPDATE SKIP LOCKED row locks protect against
    concurrent ticks only if the db wrapper keeps these statements in one
    transaction. TODO confirm.
    """
    due = db.fetchall("""
        SELECT *, NOW() AS db_now FROM RecurringJob
        WHERE is_active=TRUE AND next_run_at <= NOW()
        FOR UPDATE SKIP LOCKED
    """)
    for rj in due:
        # The key is derived from the slot being fired. A second tick that
        # reads the same next_run_at dedups against the first while that job
        # is still pending or running.
        idem_key = f"recurring:{rj['recurring_job_id']}:{rj['next_run_at'].isoformat()}"
        enqueue(
            job_type=rj['job_type'],
            payload=rj['payload'],
            queue_name=rj['queue_name'],
            idempotency_key=idem_key,
        )
        # Next occurrence strictly after "now" on the DB clock, the same
        # clock used for the due check above.
        next_run = croniter(rj['cron_expression'], rj['db_now']).get_next(datetime.datetime)
        db.execute("""
            UPDATE RecurringJob
            SET last_run_at=%s, next_run_at=%s
            WHERE recurring_job_id=%s
        """, (rj['next_run_at'], next_run, rj['recurring_job_id']))
Key Design Decisions
- Heartbeat makes ghost jobs detectable: if a worker crashes mid-job, the job stays in status=running until a cleanup job (not shown here) detects that it has timed out. A job has timed out when its started_at is older than 2× the heartbeat interval, i.e. it has missed at least one heartbeat. Without heartbeats, a job whose worker crashed stays in the running state forever. The heartbeat pattern also supports very long-running jobs (report generation, ML training) without false-positive timeouts.
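- Partitioning by status keeps the active tables small: Job_pending and Job_running are queried on the hot path, while Job_completed accumulates millions of rows. Partitioning keeps that history out of the live queue's indexes and lets you archive or truncate Job_completed without touching them. For monthly retention, sub-partition Job_completed by completed_at so individual old months can be detached and dropped. One PostgreSQL constraint to plan for: primary keys and unique indexes on a partitioned table must include the partition key (status). A uniqueness guarantee that spans statuses therefore cannot be expressed as a single index.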
- Idempotency key dedup scope is pending+running only: once a job completes (or fails permanently), its idempotency key is no longer in scope, so re-enqueuing with the same key triggers a fresh run. Combined with keys that embed a period (as the recurring scheduler's keys do), this models “run this job once per period” semantics without permanent dedup.
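- Recurring job idempotency key includes next_run_at: if the tick_recurring_jobs cron fires twice in the same minute (e.g., during a deploy restart), both ticks read the same next_run_at value and build the same idempotency key, so the second tick’s enqueue is a no-op. This holds only while the first job is still pending or running. If it has already finished, the second enqueue would run it again. That remaining gap is closed by the FOR UPDATE SKIP LOCKED lock on the RecurringJob row, provided the tick runs in a single transaction that also advances next_run_at.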
Job queue and background processing system design is discussed in Uber system design interview questions.
Job queue and async task execution design is covered in Airbnb system design interview preparation.
Job queue and distributed background job design is discussed in Amazon system design interview guide.
See also: Atlassian Interview Guide