Priority Queue Service Low-Level Design: Multi-Tier Priorities, Starvation Prevention, and Worker Dispatch

A priority queue service ensures that high-urgency work is processed before lower-urgency work even when the system is under load. The challenge is implementing this without starving lower-priority work indefinitely, dispatching items efficiently from a persistent store, and supporting runtime priority changes.

Priority Levels

Five priority tiers cover most workloads:

  • CRITICAL (0) — user-facing, SLA-bound operations; examples: payment processing, auth token generation.
  • HIGH (1) — important background work; examples: transactional email, webhook delivery.
  • NORMAL (2) — standard async tasks; examples: report generation, data exports.
  • LOW (3) — deferred work; examples: thumbnails, search index updates.
  • BACKGROUND (4) — best-effort only; examples: analytics aggregation, cache warming.

Integer values allow direct ORDER BY in SQL and comparisons in application code.

Weighted Fair Queuing

Strict priority ordering (always drain CRITICAL before touching NORMAL) starves lower priorities indefinitely whenever higher-priority arrivals saturate worker capacity. Weighted fair queuing (WFQ) instead allocates worker capacity proportionally:

  • CRITICAL: 8 shares
  • HIGH: 4 shares
  • NORMAL: 2 shares
  • LOW: 1 share
  • BACKGROUND: 0.5 shares

In a round-robin implementation, each cycle of 15.5 total shares gives CRITICAL 8 dequeue attempts, HIGH 4, and so on; in practice the fractional share is scaled away by doubling everything to 16:8:4:2:1 over a 31-slot cycle. Workers use a weighted round-robin token bucket: each tier has a token counter seeded from its share; a worker picks the tier with the most remaining tokens, claims an item from that tier, decrements its counter, and refills all counters when every tier is exhausted. If the selected tier has no pending items, the worker falls through to the next tier that still has tokens, so capacity is never idle while work is waiting.
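The tier-selection logic can be sketched in pure Python (no persistence; the 16:8:4:2:1 integers are the shares above doubled so that BACKGROUND's 0.5 share becomes a whole token count):

```python
from collections import Counter
from enum import IntEnum

class Priority(IntEnum):
    CRITICAL   = 0
    HIGH       = 1
    NORMAL     = 2
    LOW        = 3
    BACKGROUND = 4

# Shares from the text, doubled so the fractional BACKGROUND share is an integer.
SHARES = {Priority.CRITICAL: 16, Priority.HIGH: 8,
          Priority.NORMAL: 4, Priority.LOW: 2, Priority.BACKGROUND: 1}

class WeightedRoundRobin:
    """Pick the tier with the most remaining tokens; refill on exhaustion."""
    def __init__(self, shares: dict = SHARES):
        self.shares = dict(shares)
        self.tokens = dict(shares)

    def next_tier(self) -> Priority:
        if all(t == 0 for t in self.tokens.values()):
            self.tokens = dict(self.shares)  # start a new 31-slot cycle
        # Most tokens wins; ties break toward the higher-urgency tier.
        tier = max(self.tokens, key=lambda p: (self.tokens[p], -p))
        self.tokens[tier] -= 1
        return tier

# One full 31-slot cycle: attempts per tier match the configured shares.
wrr = WeightedRoundRobin()
cycle = Counter(wrr.next_tier() for _ in range(31))
```

In a real worker, `next_tier` only chooses which tier to poll first; if that tier has no pending rows, the worker would try the next tier with tokens rather than idle.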

Starvation Prevention via Aging

Even with WFQ, a sustained flood of CRITICAL work can starve BACKGROUND items beyond acceptable wait times. Aging solves this: a background job scans items that have been waiting longer than a threshold and bumps their priority by one level.

Example thresholds:

  • LOW waiting more than 30 minutes → promote to NORMAL
  • BACKGROUND waiting more than 60 minutes → promote to LOW

Promotion is a simple UPDATE with optimistic locking to avoid race conditions with workers.
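One way to sketch the sweep, using the thresholds above and the integer priority values defined earlier (the priority predicate doubles as the optimistic lock: if a worker claims the row or an operator escalates it concurrently, the row no longer matches and is skipped):

```sql
-- Promote LOW (3) items waiting > 30 minutes to NORMAL (2)
UPDATE priority_queue
SET priority = 2
WHERE status = 'pending'
  AND priority = 3
  AND enqueued_at < NOW() - INTERVAL '30 minutes';

-- Promote BACKGROUND (4) items waiting > 60 minutes to LOW (3)
UPDATE priority_queue
SET priority = 3
WHERE status = 'pending'
  AND priority = 4
  AND enqueued_at < NOW() - INTERVAL '60 minutes';
```

Running this once a minute bounds how stale a promotion can be; the thresholds themselves live in priority_config in the schema below.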

PostgreSQL SKIP LOCKED Dispatch

SKIP LOCKED allows multiple workers to dequeue concurrently without lock contention or deadlocks. Each worker runs:

BEGIN;
SELECT id, queue_name, payload, priority
FROM priority_queue
WHERE queue_name = $1
  AND scheduled_at <= NOW()
  AND (status = 'pending'
       OR (status = 'processing' AND locked_until < NOW()))
ORDER BY priority ASC, enqueued_at ASC
LIMIT 1
FOR UPDATE SKIP LOCKED;
-- claim the row selected above
UPDATE priority_queue
SET status = 'processing',
    locked_until = NOW() + INTERVAL '5 minutes',
    attempts = attempts + 1
WHERE id = $claimed_id;
COMMIT;

SKIP LOCKED causes the SELECT to skip any rows already held by another transaction, so workers immediately get the next available item without waiting. The locked_until column provides a visibility timeout: if a worker crashes, another worker can reclaim the item after the timeout expires.

Priority Escalation API

Operators sometimes need to promote a specific job to CRITICAL at runtime (e.g., a customer escalation). The escalation API accepts a job ID and a new priority, validates the transition, and issues an UPDATE:

UPDATE priority_queue
SET priority = $new_priority, escalated_at = NOW(), escalated_by = $actor
WHERE id = $job_id AND status = 'pending';

The change takes effect on the next worker poll cycle. No message passing or queue restart is needed.

Dead Letter Queue

Items that exceed max_attempts are moved to the DLQ rather than deleted. The DLQ preserves the full payload, the failure reason from the last attempt, and the original enqueue timestamp for audit. Alerts fire when DLQ depth exceeds a threshold so engineering can investigate systemic failures.
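The move can be a single atomic statement against the schema below ($job_id and $reason are placeholders, following the document's convention):

```sql
WITH dead AS (
    DELETE FROM priority_queue
    WHERE id = $job_id
      AND attempts >= max_attempts
    RETURNING id, queue_name, payload, priority, attempts, enqueued_at
)
INSERT INTO priority_queue_dlq
    (original_id, queue_name, payload, priority, attempts,
     failure_reason, original_enqueued_at)
SELECT id, queue_name, payload, priority, attempts, $reason, enqueued_at
FROM dead;
```

Because the DELETE and INSERT share one statement, the item is never visible in both tables, and a crash between them cannot lose it.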

SQL Schema

CREATE TABLE priority_queue (
    id           BIGSERIAL PRIMARY KEY,
    queue_name   VARCHAR(100) NOT NULL,
    payload      JSONB NOT NULL,
    priority     SMALLINT NOT NULL DEFAULT 2
                 CHECK (priority BETWEEN 0 AND 4),
    status       VARCHAR(20) NOT NULL DEFAULT 'pending',
    -- pending | processing | completed | failed
    attempts     INT NOT NULL DEFAULT 0,
    max_attempts INT NOT NULL DEFAULT 3,
    scheduled_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    locked_until TIMESTAMPTZ,
    enqueued_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    escalated_at TIMESTAMPTZ,
    escalated_by VARCHAR(100)
);
CREATE INDEX ON priority_queue (queue_name, priority, enqueued_at)
    WHERE status IN ('pending', 'processing');
CREATE INDEX ON priority_queue (locked_until)
    WHERE status = 'processing';

CREATE TABLE priority_queue_dlq (
    id              BIGSERIAL PRIMARY KEY,
    original_id     BIGINT NOT NULL,
    queue_name      VARCHAR(100) NOT NULL,
    payload         JSONB NOT NULL,
    priority        SMALLINT NOT NULL,
    attempts        INT NOT NULL,
    failure_reason  TEXT,
    original_enqueued_at TIMESTAMPTZ NOT NULL,
    moved_at        TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE TABLE priority_config (
    queue_name          VARCHAR(100) PRIMARY KEY,
    aging_threshold_low_minutes        INT NOT NULL DEFAULT 30,
    aging_threshold_background_minutes INT NOT NULL DEFAULT 60,
    visibility_timeout_seconds         INT NOT NULL DEFAULT 300,
    updated_at          TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

Python Implementation

import json

import psycopg2
from enum import IntEnum

class Priority(IntEnum):
    CRITICAL   = 0
    HIGH       = 1
    NORMAL     = 2
    LOW        = 3
    BACKGROUND = 4

def enqueue(conn, queue_name: str, payload: dict,
            priority: Priority = Priority.NORMAL,
            scheduled_at=None) -> int:
    """Insert a new item into the priority queue; return its ID."""
    with conn.cursor() as cur:
        cur.execute(
            """INSERT INTO priority_queue
               (queue_name, payload, priority, scheduled_at)
               VALUES (%s, %s, %s, COALESCE(%s, NOW()))
               RETURNING id""",
            (queue_name, json.dumps(payload), int(priority), scheduled_at)
        )
        job_id = cur.fetchone()[0]
    conn.commit()
    return job_id

def dequeue(conn, queue_name: str) -> dict | None:
    """Claim the next available item using SKIP LOCKED; return the row or None.
    Items whose visibility timeout has expired (crashed worker) are
    reclaimable alongside pending items."""
    with conn.cursor() as cur:
        cur.execute(
            """WITH claimed AS (
               SELECT id FROM priority_queue
               WHERE queue_name = %s
                 AND scheduled_at <= NOW()
                 AND (status = 'pending'
                      OR (status = 'processing' AND locked_until < NOW()))
               ORDER BY priority ASC, enqueued_at ASC
               LIMIT 1
               FOR UPDATE SKIP LOCKED
            )
            UPDATE priority_queue pq
            SET status = 'processing',
                locked_until = NOW() + INTERVAL '5 minutes',
                attempts = attempts + 1
            FROM claimed
            WHERE pq.id = claimed.id
            RETURNING pq.id, pq.payload, pq.priority, pq.attempts""",
            (queue_name,)
        )
        row = cur.fetchone()
    conn.commit()
    if row is None:
        return None
    return {"id": row[0], "payload": row[1],
            "priority": row[2], "attempts": row[3]}

def escalate_priority(conn, job_id: int,
                      new_priority: Priority, actor: str) -> bool:
    """Promote a pending job to a higher priority; return True if updated."""
    with conn.cursor() as cur:
        cur.execute(
            """UPDATE priority_queue
               SET priority = %s, escalated_at = NOW(), escalated_by = %s
               WHERE id = %s AND status = 'pending'
               RETURNING id""",
            (int(new_priority), actor, job_id)
        )
        updated = cur.fetchone() is not None
    conn.commit()
    return updated
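The status lifecycle (pending | processing | completed | failed) implies completion and failure transitions that the snippets above do not show. A minimal sketch in the same connection-passing style (function names are illustrative, not part of the original API):

```python
def mark_completed(conn, job_id: int) -> None:
    """Mark a processing item as completed and release its lock."""
    with conn.cursor() as cur:
        cur.execute(
            """UPDATE priority_queue
               SET status = 'completed', locked_until = NULL
               WHERE id = %s AND status = 'processing'""",
            (job_id,)
        )
    conn.commit()

def mark_failed(conn, job_id: int, requeue: bool = True) -> None:
    """Record a failed attempt. With requeue=True the item returns to
    'pending' for another try; attempts was already incremented at claim
    time, so the DLQ sweep catches items that reach max_attempts."""
    new_status = 'pending' if requeue else 'failed'
    with conn.cursor() as cur:
        cur.execute(
            """UPDATE priority_queue
               SET status = %s, locked_until = NULL
               WHERE id = %s AND status = 'processing'""",
            (new_status, job_id)
        )
    conn.commit()
```

A worker loop then becomes: dequeue, run the handler, and call mark_completed on success or mark_failed on exception; an item whose worker dies without calling either is recovered by the visibility timeout.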

