Messaging Queue System Low-Level Design: At-Least-Once Delivery, Visibility Timeout, and Dead-Letter Handling

A messaging queue decouples producers from consumers: producers publish messages without knowing who will process them or when. This design covers a durable, at-least-once delivery queue built on Postgres, consumer group semantics with competing consumers, dead-letter handling, and the patterns needed to make message processing idempotent. The design choices apply equally to building a lightweight in-process queue or understanding how Kafka, SQS, and RabbitMQ work under the hood.

Core Data Model

CREATE TABLE Queue (
    queue_id       SERIAL PRIMARY KEY,
    queue_name     VARCHAR(100) UNIQUE NOT NULL,
    max_retries    SMALLINT NOT NULL DEFAULT 5,
    visibility_timeout_sec INT NOT NULL DEFAULT 30,  -- how long a consumer holds a message
    dlq_name       VARCHAR(100),                      -- dead-letter queue name
    created_at     TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE TABLE Message (
    message_id     BIGSERIAL,
    queue_id       INT NOT NULL REFERENCES Queue(queue_id),
    payload        JSONB NOT NULL,
    deduplication_id VARCHAR(200),           -- optional: prevent duplicate enqueues
    priority       SMALLINT NOT NULL DEFAULT 0,  -- higher = processed first
    attempt_count  SMALLINT NOT NULL DEFAULT 0,
    max_retries    SMALLINT NOT NULL DEFAULT 5,
    status         VARCHAR(20) NOT NULL DEFAULT 'pending',
        -- pending, processing, completed, dead (each value needs a matching partition)
    visible_after  TIMESTAMPTZ NOT NULL DEFAULT NOW(),  -- when next available for pickup
    created_at     TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    completed_at   TIMESTAMPTZ,
    last_error     TEXT,
    -- Postgres requires the partition key (status) in the primary key of a partitioned table
    PRIMARY KEY (message_id, status)
) PARTITION BY LIST (status);

-- Active partitions (small, fast)
CREATE TABLE Message_pending   PARTITION OF Message FOR VALUES IN ('pending');
CREATE TABLE Message_processing PARTITION OF Message FOR VALUES IN ('processing');
-- Archive partitions (can be moved to cold storage)
CREATE TABLE Message_completed PARTITION OF Message FOR VALUES IN ('completed');
CREATE TABLE Message_dead      PARTITION OF Message FOR VALUES IN ('dead');

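-- Dedup index. Caveat: Postgres accepts a unique index on a partitioned table only if
-- it includes the partition key (status), so this definition assumes an unpartitioned
-- Message table. With partitioning, uniqueness can be enforced per partition only.
CREATE UNIQUE INDEX ON Message(queue_id, deduplication_id)
    WHERE deduplication_id IS NOT NULL AND status IN ('pending','processing');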

CREATE INDEX ON Message_pending(queue_id, priority DESC, visible_after ASC);

Producer: Enqueue

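import json, datetime
from typing import Optional

class EnqueueError(Exception):
    """Raised when a message cannot be written to the queue."""

def enqueue(queue_name: str, payload: dict,
            dedup_id: Optional[str] = None, priority: int = 0, delay_seconds: int = 0) -> int: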
    """
    Enqueue a message. Returns message_id.
    dedup_id prevents duplicate messages (idempotent enqueue within dedup window).
    delay_seconds: message won't be visible until now + delay.
    """
    queue = db.fetchone("SELECT queue_id, max_retries FROM Queue WHERE queue_name=%s", (queue_name,))
    if not queue:
        raise ValueError(f"Queue not found: {queue_name}")

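    # Timezone-aware UTC, so the value maps onto TIMESTAMPTZ regardless of the session time zone
    visible_after = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(seconds=delay_seconds)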

    try:
        row = db.fetchone("""
            INSERT INTO Message (queue_id, payload, deduplication_id, priority,
                                 max_retries, visible_after)
            VALUES (%s, %s, %s, %s, %s, %s)
            ON CONFLICT (queue_id, deduplication_id)
                WHERE deduplication_id IS NOT NULL AND status IN ('pending','processing')
            DO NOTHING
            RETURNING message_id
        """, (queue['queue_id'], json.dumps(payload), dedup_id,
              priority, queue['max_retries'], visible_after))

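        if not row:
            # Deduplicated: return the message_id of the existing active message
            row = db.fetchone("""
                SELECT message_id FROM Message
                WHERE queue_id=%s AND deduplication_id=%s AND status IN ('pending','processing')
            """, (queue['queue_id'], dedup_id))
        if not row:
            # The duplicate left pending/processing between the INSERT and this SELECT
            raise LookupError(f"Active duplicate for dedup_id={dedup_id} disappeared; retry enqueue")

        return row['message_id']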
    except Exception as e:
        raise EnqueueError(f"Failed to enqueue to {queue_name}: {e}") from e
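
The dedup behaviour of enqueue() can be illustrated without a database. The following is a minimal in-memory sketch, not the article's implementation: the class name DedupWindow and its methods are hypothetical, and the dictionary stands in for the partial unique index over pending and processing messages.

```python
import itertools
from typing import Dict, Optional, Tuple


class DedupWindow:
    """In-memory stand-in for the unique index on (queue_id, deduplication_id)."""

    def __init__(self) -> None:
        self._ids = itertools.count(1)
        # (queue_id, dedup_id) -> message_id, kept only while the message is active
        self._active: Dict[Tuple[int, str], int] = {}

    def enqueue(self, queue_id: int, dedup_id: Optional[str] = None) -> int:
        if dedup_id is not None:
            existing = self._active.get((queue_id, dedup_id))
            if existing is not None:
                return existing  # duplicate: hand back the live message
        message_id = next(self._ids)
        if dedup_id is not None:
            self._active[(queue_id, dedup_id)] = message_id
        return message_id

    def complete(self, queue_id: int, dedup_id: str) -> None:
        # Completion removes the key, which closes the dedup window
        self._active.pop((queue_id, dedup_id), None)


window = DedupWindow()
first = window.enqueue(1, "order-42")
second = window.enqueue(1, "order-42")  # same id while the first is still active
window.complete(1, "order-42")
third = window.enqueue(1, "order-42")   # a new message once the first completed
```

Messages without a dedup_id are never deduplicated, which mirrors the `deduplication_id IS NOT NULL` condition on the index.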

Consumer: Dequeue with Visibility Timeout

def dequeue(queue_name: str, batch_size: int = 10) -> list:
    """
    Atomically claim messages: set status=processing and advance visible_after.
    SKIP LOCKED prevents multiple workers from claiming the same message.
    """
    queue = db.fetchone(
        "SELECT queue_id, visibility_timeout_sec FROM Queue WHERE queue_name=%s", (queue_name,)
    )
    if not queue:
        return []

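    # Timezone-aware UTC, so the value maps onto TIMESTAMPTZ regardless of the session time zone
    new_visible = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(
        seconds=queue['visibility_timeout_sec']
    )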

    messages = db.fetchall("""
        UPDATE Message SET
            status = 'processing',
            attempt_count = attempt_count + 1,
            visible_after = %s
        WHERE message_id IN (
            SELECT message_id FROM Message
            WHERE queue_id = %s
              AND status = 'pending'
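              AND visible_after <= NOW()
            -- The source text is cut off after this condition. The rest of the query,
            -- ack(), and the head of nack() are reconstructed from the docstring, the
            -- index on Message_pending, and the calls in worker_loop; treat them as assumptions.
            ORDER BY priority DESC, visible_after ASC
            LIMIT %s
            FOR UPDATE SKIP LOCKED
        )
        RETURNING message_id, payload, attempt_count, max_retries
    """, (new_visible, queue['queue_id'], batch_size))
    return messages

def ack(message_id: int):
    """Mark a claimed message as successfully processed."""
    db.execute("""
        UPDATE Message SET status='completed', completed_at=NOW()
        WHERE message_id=%s AND status='processing'
    """, (message_id,))

def nack(message_id: int, error: str, retry_delay_seconds: int = 60):
    """Record a failure: retry with backoff, or dead-letter once retries are exhausted."""
    msg = db.fetchone(
        "SELECT attempt_count, max_retries FROM Message WHERE message_id=%s", (message_id,)
    )
    if not msg:
        return
    if msg['attempt_count'] >= msg['max_retries']: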
        _move_to_dlq(message_id, error)
    else:
        # Exponential backoff: 60s, 120s, 240s, 480s...
        backoff = retry_delay_seconds * (2 ** (msg['attempt_count'] - 1))
        next_visible = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(seconds=backoff)
        db.execute("""
            UPDATE Message SET status='pending', visible_after=%s, last_error=%s
            WHERE message_id=%s
        """, (next_visible, error[:500], message_id))

def _move_to_dlq(message_id: int, error: str):
    queue = db.fetchone("""
        SELECT q.dlq_name FROM Message m JOIN Queue q USING(queue_id)
        WHERE m.message_id=%s
    """, (message_id,))

    if queue and queue['dlq_name']:
        dlq = db.fetchone("SELECT queue_id FROM Queue WHERE queue_name=%s", (queue['dlq_name'],))
        if dlq:
            # Copy message to DLQ then mark original dead
            db.execute("""
                INSERT INTO Message (queue_id, payload, status, last_error)
                SELECT %s, payload, 'pending', %s FROM Message WHERE message_id=%s
            """, (dlq['queue_id'], error[:500], message_id))

    db.execute("""
        UPDATE Message SET status='dead', last_error=%s, completed_at=NOW()
        WHERE message_id=%s
    """, (error[:500], message_id))

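The retry decision in nack() comes down to two small pure functions, which makes the schedule easy to check in isolation. This is a sketch under assumptions: the helper names backoff_seconds and next_action are hypothetical, and the 60-second base delay is inferred from the "60s, 120s, 240s, 480s" comment rather than stated as a default in the text.

```python
def backoff_seconds(attempt_count: int, retry_delay_seconds: int = 60) -> int:
    """Delay before the next attempt; doubles after each failed attempt."""
    if attempt_count < 1:
        raise ValueError("attempt_count is at least 1 once a message has been dequeued")
    return retry_delay_seconds * (2 ** (attempt_count - 1))


def next_action(attempt_count: int, max_retries: int = 5) -> str:
    """Mirror the branch in nack(): dead-letter once attempts reach max_retries."""
    return "dead-letter" if attempt_count >= max_retries else "retry"


# Delays after the 1st, 2nd, 3rd and 4th failed attempts
schedule = [backoff_seconds(n) for n in range(1, 5)]
# Outcome of each failure for a message with max_retries = 5
actions = [next_action(n) for n in range(1, 6)]
```

With the default max_retries of 5, a message is retried after each of its first four failures and moved to the dead-letter queue on the fifth.
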
Worker Loop Pattern

import time, logging

def worker_loop(queue_name: str, handler, poll_interval: float = 1.0):
    """
    Long-running worker process. Polls for messages and processes them.
    handler(payload: dict) -> None: raises on failure.
    """
    logging.info(f"Worker started for queue: {queue_name}")
    while True:
        messages = dequeue(queue_name, batch_size=10)

        if not messages:
            time.sleep(poll_interval)
            continue

        for msg in messages:
            try:
                handler(msg['payload'])
                ack(msg['message_id'])
            except Exception as e:
                logging.error(f"Message {msg['message_id']} failed: {e}")
                nack(msg['message_id'], str(e))

        # No sleep when there were messages — drain the queue fast
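
Because delivery is at-least-once, a handler passed to worker_loop may see the same message twice, for example when a worker crashes after the side effect but before ack(). One common way to make the handler idempotent is to record a business key for each processed message and skip repeats. The wrapper below is a minimal sketch: make_idempotent, key_of, and the in-memory set are hypothetical, and a real system would keep the processed keys in durable storage, ideally updated in the same transaction as the side effect.

```python
from typing import Callable, List, Set


def make_idempotent(handler: Callable[[dict], None],
                    key_of: Callable[[dict], str],
                    seen: Set[str]) -> Callable[[dict], None]:
    """Wrap a handler so that a redelivered payload is applied at most once."""
    def wrapped(payload: dict) -> None:
        key = key_of(payload)
        if key in seen:
            return  # redelivery: the side effect was already applied
        handler(payload)
        seen.add(key)  # record only after the handler succeeds
    return wrapped


charges: List[str] = []


def charge(payload: dict) -> None:
    charges.append(payload["order_id"])


processed: Set[str] = set()
safe_charge = make_idempotent(charge, lambda p: p["order_id"], processed)
safe_charge({"order_id": "A1"})
safe_charge({"order_id": "A1"})  # duplicate delivery, ignored
safe_charge({"order_id": "B2"})
```

The wrapped function has the same signature as the original handler, so it can be passed to worker_loop unchanged.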

Key Design Decisions

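  • Visibility timeout instead of DELETE on dequeue: when a consumer claims a message, it is not deleted; it is made invisible for visibility_timeout_sec seconds. If the consumer crashes before ack(), the message becomes eligible again once visible_after has passed and can be retried by another worker. Because dequeue() selects only status='pending' rows, this relies on expired 'processing' rows being returned to 'pending' (for example by a periodic sweep) or on the dequeue query also matching them. This is what provides at-least-once delivery under consumer failure.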
  • SKIP LOCKED is essential for competing consumers: without SKIP LOCKED, all workers block on the same row lock and serialize. With SKIP LOCKED, each worker instantly skips rows locked by other workers and claims the next available one — full parallelism across the worker pool.
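  • Partitioning by status keeps the active table small: completed messages accumulate quickly, so splitting the table into Message_pending, Message_processing, Message_completed, and Message_dead lets the index on pending messages stay small and fast. Each status change moves the row into a different partition. For archival, Message_completed can be sub-partitioned by time (for example by month) so that old partitions are detached on a monthly schedule.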
  • Deduplication_id scope is pending+processing only: once a message is completed, the dedup constraint is lifted. Re-enqueuing the same dedup_id after completion succeeds — this models “at-least-once processing with dedup window” rather than “exactly-once forever.”
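
The visibility-timeout behaviour described in the first bullet can be demonstrated with a small in-process queue. This is a sketch under assumptions, not the Postgres design above: InMemoryQueue and Msg are hypothetical names, the clock is passed in explicitly so the example is deterministic, and visibility is decided from visible_after alone rather than from a status column.

```python
from dataclasses import dataclass
from typing import Any, List, Optional


@dataclass
class Msg:
    message_id: int
    payload: Any
    visible_after: float = 0.0
    attempt_count: int = 0
    done: bool = False


class InMemoryQueue:
    def __init__(self, visibility_timeout_sec: float = 30.0) -> None:
        self.visibility_timeout_sec = visibility_timeout_sec
        self._messages: List[Msg] = []

    def enqueue(self, payload: Any) -> int:
        msg = Msg(message_id=len(self._messages) + 1, payload=payload)
        self._messages.append(msg)
        return msg.message_id

    def dequeue(self, now: float) -> Optional[Msg]:
        """Claim the first visible message and hide it for the timeout period."""
        for msg in self._messages:
            if not msg.done and msg.visible_after <= now:
                msg.visible_after = now + self.visibility_timeout_sec
                msg.attempt_count += 1
                return msg
        return None

    def ack(self, message_id: int) -> None:
        self._messages[message_id - 1].done = True


q = InMemoryQueue(visibility_timeout_sec=30)
q.enqueue({"job": "resize"})
first = q.dequeue(now=0)        # worker A claims the message, then crashes without ack
hidden = q.dequeue(now=10)      # still inside the timeout: nothing to claim
again = q.dequeue(now=31)       # timeout expired: the same message is redelivered
q.ack(again.message_id)
after_ack = q.dequeue(now=100)  # acknowledged messages never come back
```

The second delivery is what "at-least-once" means in practice: the queue cannot tell a crashed worker from a slow one, so handlers have to tolerate seeing a message again.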
