A priority queue service ensures that high-urgency work is processed before lower-urgency work even when the system is under load. The challenge is doing so while avoiding indefinite starvation of lower-priority work, dispatching items efficiently from a persistent store, and supporting runtime priority changes.
Priority Levels
Five priority tiers cover most workloads:
- CRITICAL (0) — user-facing, SLA-bound operations; examples: payment processing, auth token generation.
- HIGH (1) — important background work; examples: transactional email, webhook delivery.
- NORMAL (2) — standard async tasks; examples: report generation, data exports.
- LOW (3) — deferred work; examples: thumbnails, search index updates.
- BACKGROUND (4) — best-effort only; examples: analytics aggregation, cache warming.
Lower integers mean higher urgency, so ORDER BY priority ASC in SQL returns the most urgent items first, and application code can compare levels directly.
Weighted Fair Queuing
Strict priority ordering (always drain CRITICAL before touching NORMAL) can starve lower priorities indefinitely under sustained load. Weighted fair queuing (WFQ) instead allocates worker capacity proportionally:
- CRITICAL: 8 shares
- HIGH: 4 shares
- NORMAL: 2 shares
- LOW: 1 share
- BACKGROUND: 0.5 shares
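In a round-robin implementation, every 15.5 shares of dispatch capacity give CRITICAL 8 attempts, HIGH 4, NORMAL 2, LOW 1, and BACKGROUND 0.5; doubling the weights (16, 8, 4, 2, 1, for 31 in total) keeps the counts whole. Workers run weighted round-robin over per-tier token counters: each tier starts with its weight in tokens; a worker picks the tier with the most remaining tokens, claims an item from that tier, and decrements that tier's count. When every tier reaches zero, all counters are refilled.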
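The token scheme described above can be sketched in a few lines. This is a minimal illustration, not the article's implementation: the class name `TierScheduler`, the doubled weights, and the tie-breaking rule (the more urgent tier wins a tie) are all assumptions made for the example.

```python
# Shares from the list above, doubled so BACKGROUND's 0.5 becomes one whole token.
# Keys are the integer priority levels (0 = CRITICAL ... 4 = BACKGROUND).
DEFAULT_TOKENS = {0: 16, 1: 8, 2: 4, 3: 2, 4: 1}


class TierScheduler:
    """Hypothetical helper: weighted round-robin over priority tiers."""

    def __init__(self, weights=None):
        self.weights = dict(weights or DEFAULT_TOKENS)
        self.tokens = dict(self.weights)

    def next_tier(self) -> int:
        """Return the tier to poll next and spend one of its tokens."""
        if all(count == 0 for count in self.tokens.values()):
            # Every tier is exhausted: refill all counters at once.
            self.tokens = dict(self.weights)
        # Most remaining tokens wins; ties go to the more urgent (lower) tier.
        tier = max(self.tokens, key=lambda t: (self.tokens[t], -t))
        self.tokens[tier] -= 1
        return tier
```

Over one full cycle of 31 calls, each tier is returned exactly as many times as its weight. A worker would then restrict its dequeue query to the returned tier (for example with an extra `AND priority = ...` condition) and fall through to another tier when that one has no pending items; how to handle an empty tier is a design choice the sketch leaves open.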
Starvation Prevention via Aging
Even with WFQ, a sustained flood of CRITICAL work can delay BACKGROUND items beyond acceptable wait times. Aging addresses this: a periodic job scans for items that have waited longer than a threshold and promotes each one by a single priority level.
Example thresholds:
- LOW waiting more than 30 minutes → promote to NORMAL
- BACKGROUND waiting more than 60 minutes → promote to LOW
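Promotion is a single UPDATE whose WHERE clause re-checks the item's status and current priority (a form of optimistic locking), so a row that a worker has already claimed or that has already been promoted is left untouched.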
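An aging pass along these lines might look as follows. It is a sketch under stated assumptions: `run_aging_pass`, `AGING_SQL`, and `AGING_RULES` are hypothetical names, `conn` is any DB-API connection (such as one from psycopg2), waiting time is measured from `enqueued_at` because the schema has no separate promotion timestamp, and the thresholds are hard-coded rather than read from `priority_config`.

```python
# The WHERE clause re-checks status and priority, so rows claimed by a worker
# or already promoted since the scan are simply not matched.
AGING_SQL = """
UPDATE priority_queue
SET priority = priority - 1
WHERE queue_name = %s
  AND status = 'pending'
  AND priority = %s
  AND enqueued_at < NOW() - %s * INTERVAL '1 minute'
"""

# (priority level, threshold in minutes). LOW is handled before BACKGROUND so
# an item promoted from BACKGROUND to LOW is not promoted twice in one pass.
AGING_RULES = [(3, 30), (4, 60)]


def run_aging_pass(conn, queue_name: str) -> int:
    """Promote long-waiting LOW and BACKGROUND items; return rows promoted."""
    promoted = 0
    with conn.cursor() as cur:
        for level, minutes in AGING_RULES:
            cur.execute(AGING_SQL, (queue_name, level, minutes))
            promoted += cur.rowcount
    conn.commit()
    return promoted
```

Because the age is taken from `enqueued_at`, a BACKGROUND item promoted to LOW already exceeds the 30-minute LOW threshold and will move to NORMAL on the following pass. If that is too aggressive, a dedicated timestamp column for the last promotion would be needed, which the schema here does not define.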
PostgreSQL SKIP LOCKED Dispatch
SKIP LOCKED allows multiple workers to dequeue concurrently without lock contention or deadlocks. Each worker runs:
BEGIN;
SELECT id, queue_name, payload, priority
FROM priority_queue
WHERE queue_name = $1
  AND status = 'pending'
  AND scheduled_at <= NOW()
  AND (locked_until IS NULL OR locked_until < NOW())
ORDER BY priority ASC, enqueued_at ASC
LIMIT 1
FOR UPDATE SKIP LOCKED;
-- Mark the selected row as claimed; $2 is the id returned by the SELECT above.
UPDATE priority_queue
SET status = 'processing',
    locked_until = NOW() + INTERVAL '5 minutes',
    attempts = attempts + 1
WHERE id = $2;
COMMIT;
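SKIP LOCKED causes the SELECT to skip any rows already locked by another transaction, so workers immediately get the next available item without waiting. The locked_until column provides a visibility timeout: if a worker crashes, its row stays in 'processing' with an expired locked_until. Because the dequeue query only selects 'pending' rows, a periodic sweep must reset such rows to 'pending' before another worker can claim them; the partial index on locked_until in the schema supports that scan.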
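One way to make the visibility timeout effective is a small sweep that returns expired claims to the pending pool. The following is a sketch, not part of the original design: `requeue_expired` and `REQUEUE_EXPIRED_SQL` are hypothetical names, `conn` is any DB-API connection, and the choice to leave rows that have used all their attempts for dead-letter handling is an assumption.

```python
REQUEUE_EXPIRED_SQL = """
UPDATE priority_queue
SET status = 'pending', locked_until = NULL
WHERE status = 'processing'
  AND locked_until < NOW()
  AND attempts < max_attempts
RETURNING id
"""


def requeue_expired(conn) -> list:
    """Reset timed-out 'processing' rows to 'pending'; return their ids."""
    with conn.cursor() as cur:
        cur.execute(REQUEUE_EXPIRED_SQL)
        ids = [row[0] for row in cur.fetchall()]
    conn.commit()
    return ids
```

Running this on a schedule shorter than the visibility timeout keeps the extra delay for a crashed worker's item small. A worker that is merely slow, rather than dead, can still finish after its row has been requeued, so job handlers should be idempotent.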
Priority Escalation API
Operators sometimes need to promote a specific job to CRITICAL at runtime (e.g., a customer escalation). The escalation API accepts a job ID and a new priority, validates the transition, and issues an UPDATE:
UPDATE priority_queue
SET priority = $new_priority, escalated_at = NOW(), escalated_by = $actor
WHERE id = $job_id AND status = 'pending';
The change takes effect on the next worker poll cycle. No message passing or queue restart is needed.
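The text does not spell out what "validates the transition" means. One plausible rule, shown here purely as an assumption, is that the new level must be a defined tier and strictly more urgent than the current one. The function name `is_valid_escalation` is hypothetical.

```python
MOST_URGENT, LEAST_URGENT = 0, 4  # CRITICAL .. BACKGROUND


def is_valid_escalation(current: int, new: int) -> bool:
    """Assumed rule: `new` must be a defined tier and strictly more urgent."""
    return MOST_URGENT <= new <= LEAST_URGENT and new < current
```

An API handler could call this before issuing the UPDATE and reject the request otherwise. The `status = 'pending'` condition in the UPDATE still guards against escalating a job that a worker has claimed in the meantime.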
Dead Letter Queue
Items that exceed max_attempts are moved to the DLQ rather than deleted. The DLQ preserves the full payload, the failure reason from the last attempt, and the original enqueue timestamp for audit. Alerts fire when DLQ depth exceeds a threshold so engineering can investigate systemic failures.
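The move can be done atomically with a data-modifying CTE that deletes the row from the main table and inserts it into the DLQ in one statement. This is a sketch under assumptions: `move_to_dlq` and `MOVE_TO_DLQ_SQL` are hypothetical names, `conn` is any DB-API connection, and an item counts as exhausted once `attempts >= max_attempts`, since the dequeue step increments `attempts` when it claims a row.

```python
MOVE_TO_DLQ_SQL = """
WITH dead AS (
    DELETE FROM priority_queue
    WHERE id = %s AND attempts >= max_attempts
    RETURNING id, queue_name, payload, priority, attempts, enqueued_at
)
INSERT INTO priority_queue_dlq
    (original_id, queue_name, payload, priority, attempts,
     failure_reason, original_enqueued_at)
SELECT id, queue_name, payload, priority, attempts, %s::text, enqueued_at
FROM dead
RETURNING id
"""


def move_to_dlq(conn, job_id: int, failure_reason: str):
    """Move an exhausted job to the DLQ; return the DLQ row id or None."""
    with conn.cursor() as cur:
        cur.execute(MOVE_TO_DLQ_SQL, (job_id, failure_reason))
        row = cur.fetchone()
    conn.commit()
    return row[0] if row else None
```

The function returns None when the job does not exist or still has attempts left, so a caller can fall back to rescheduling the item instead.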
SQL Schema
CREATE TABLE priority_queue (
    id            BIGSERIAL PRIMARY KEY,
    queue_name    VARCHAR(100) NOT NULL,
    payload       JSONB NOT NULL,
    priority      SMALLINT NOT NULL DEFAULT 2
                  CHECK (priority BETWEEN 0 AND 4),
    status        VARCHAR(20) NOT NULL DEFAULT 'pending',
                  -- pending | processing | completed | failed
    attempts      INT NOT NULL DEFAULT 0,
    max_attempts  INT NOT NULL DEFAULT 3,
    scheduled_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    locked_until  TIMESTAMPTZ,
    enqueued_at   TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    escalated_at  TIMESTAMPTZ,
    escalated_by  VARCHAR(100)
);
CREATE INDEX ON priority_queue (queue_name, priority, enqueued_at)
    WHERE status = 'pending';
CREATE INDEX ON priority_queue (locked_until)
    WHERE status = 'processing';
CREATE TABLE priority_queue_dlq (
    id                    BIGSERIAL PRIMARY KEY,
    original_id           BIGINT NOT NULL,
    queue_name            VARCHAR(100) NOT NULL,
    payload               JSONB NOT NULL,
    priority              SMALLINT NOT NULL,
    attempts              INT NOT NULL,
    failure_reason        TEXT,
    original_enqueued_at  TIMESTAMPTZ NOT NULL,
    moved_at              TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE priority_config (
    queue_name                          VARCHAR(100) PRIMARY KEY,
    aging_threshold_low_minutes         INT NOT NULL DEFAULT 30,
    aging_threshold_background_minutes  INT NOT NULL DEFAULT 60,
    visibility_timeout_seconds          INT NOT NULL DEFAULT 300,
    updated_at                          TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
Python Implementation
import json
from enum import IntEnum

import psycopg2  # the conn arguments below are psycopg2 connections


class Priority(IntEnum):
    CRITICAL = 0
    HIGH = 1
    NORMAL = 2
    LOW = 3
    BACKGROUND = 4


def enqueue(conn, queue_name: str, payload: dict,
            priority: Priority = Priority.NORMAL,
            scheduled_at=None) -> int:
    """Insert a new item into the priority queue; return its ID."""
    with conn.cursor() as cur:
        cur.execute(
            """INSERT INTO priority_queue
                   (queue_name, payload, priority, scheduled_at)
               VALUES (%s, %s, %s, COALESCE(%s, NOW()))
               RETURNING id""",
            (queue_name, json.dumps(payload), int(priority), scheduled_at)
        )
        job_id = cur.fetchone()[0]
    conn.commit()
    return job_id


def dequeue(conn, queue_name: str, worker_id: str) -> dict | None:
    """Claim the next available item using SKIP LOCKED; return the row or None."""
    # worker_id is currently unused: the schema has no column recording the claimer.
    with conn.cursor() as cur:
        # Select and claim in one statement so the row lock and the status
        # change happen atomically.
        cur.execute(
            """WITH claimed AS (
                   SELECT id FROM priority_queue
                   WHERE queue_name = %s
                     AND status = 'pending'
                     AND scheduled_at <= NOW()
                     AND (locked_until IS NULL OR locked_until < NOW())
                   ORDER BY priority ASC, enqueued_at ASC
                   LIMIT 1
                   FOR UPDATE SKIP LOCKED
               )
               UPDATE priority_queue pq
               SET status = 'processing',
                   locked_until = NOW() + INTERVAL '5 minutes',
                   attempts = attempts + 1
               FROM claimed
               WHERE pq.id = claimed.id
               RETURNING pq.id, pq.payload, pq.priority, pq.attempts""",
            (queue_name,)
        )
        row = cur.fetchone()
    conn.commit()
    if row is None:
        return None
    return {"id": row[0], "payload": row[1],
            "priority": row[2], "attempts": row[3]}


def escalate_priority(conn, job_id: int,
                      new_priority: Priority, actor: str) -> bool:
    """Promote a pending job to a higher priority; return True if updated."""
    with conn.cursor() as cur:
        cur.execute(
            """UPDATE priority_queue
               SET priority = %s, escalated_at = NOW(), escalated_by = %s
               WHERE id = %s AND status = 'pending'
               RETURNING id""",
            (int(new_priority), actor, job_id)
        )
        updated = cur.fetchone() is not None
    conn.commit()
    return updated