Messaging Queue System: Low-Level Design
A messaging queue decouples producers from consumers: producers publish messages without knowing who will process them or when. This design covers a durable, at-least-once delivery queue built on Postgres, consumer group semantics with competing consumers, dead-letter handling, and the patterns needed to make message processing idempotent. The design choices apply equally to building a lightweight in-process queue or understanding how Kafka, SQS, and RabbitMQ work under the hood.
Core Data Model
CREATE TABLE Queue (
    queue_id               SERIAL PRIMARY KEY,
    queue_name             VARCHAR(100) UNIQUE NOT NULL,
    max_retries            SMALLINT NOT NULL DEFAULT 5,
    visibility_timeout_sec INT NOT NULL DEFAULT 30,   -- how long a consumer holds a message
    dlq_name               VARCHAR(100),              -- dead-letter queue name
    created_at             TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE Message (
    message_id       BIGSERIAL NOT NULL,
    queue_id         INT NOT NULL REFERENCES Queue(queue_id),
    payload          JSONB NOT NULL,
    deduplication_id VARCHAR(200),                       -- optional: prevent duplicate enqueues
    priority         SMALLINT NOT NULL DEFAULT 0,        -- higher = processed first
    attempt_count    SMALLINT NOT NULL DEFAULT 0,
    max_retries      SMALLINT NOT NULL DEFAULT 5,
    status           VARCHAR(20) NOT NULL DEFAULT 'pending',
                     -- pending, processing, completed, failed, dead
                     -- ('failed' has no partition below; add one before using it)
    visible_after    TIMESTAMPTZ NOT NULL DEFAULT NOW(), -- when next available for pickup
    created_at       TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    completed_at     TIMESTAMPTZ,
    last_error       TEXT,
    -- A primary key on a partitioned table must include the partition key
    PRIMARY KEY (message_id, status)
) PARTITION BY LIST (status);

-- Active partitions (small, fast)
CREATE TABLE Message_pending    PARTITION OF Message FOR VALUES IN ('pending');
CREATE TABLE Message_processing PARTITION OF Message FOR VALUES IN ('processing');

-- Archive partitions (can be moved to cold storage)
CREATE TABLE Message_completed  PARTITION OF Message FOR VALUES IN ('completed');
CREATE TABLE Message_dead       PARTITION OF Message FOR VALUES IN ('dead');

-- Intended dedup constraint. Caveat: Postgres rejects a unique index on a partitioned
-- table unless it includes the partition key (status), so enforcing this exactly needs
-- either status in the key or a separate non-partitioned dedup table.
CREATE UNIQUE INDEX ON Message(queue_id, deduplication_id)
    WHERE deduplication_id IS NOT NULL AND status IN ('pending','processing');

CREATE INDEX ON Message_pending(queue_id, priority DESC, visible_after ASC);
Producer: Enqueue
import datetime
import json
from typing import Optional

# `db` is assumed to be a thin wrapper around a Postgres connection that exposes
# fetchone / fetchall / execute and returns rows as dicts.

class EnqueueError(Exception):
    """Raised when a message cannot be written to the queue."""

def enqueue(queue_name: str, payload: dict,
            dedup_id: Optional[str] = None, priority: int = 0, delay_seconds: int = 0) -> int:
    """
    Enqueue a message. Returns message_id.
    dedup_id prevents duplicate messages (idempotent enqueue within dedup window).
    delay_seconds: message won't be visible until now + delay.
    """
    queue = db.fetchone("SELECT queue_id, max_retries FROM Queue WHERE queue_name=%s", (queue_name,))
    if not queue:
        raise ValueError(f"Queue not found: {queue_name}")
    # Timezone-aware UTC: a naive datetime sent to a TIMESTAMPTZ column is
    # interpreted in the session time zone, which may not be UTC.
    visible_after = (datetime.datetime.now(datetime.timezone.utc)
                     + datetime.timedelta(seconds=delay_seconds))
    try:
        row = db.fetchone("""
            INSERT INTO Message (queue_id, payload, deduplication_id, priority,
                                 max_retries, visible_after)
            VALUES (%s, %s, %s, %s, %s, %s)
            ON CONFLICT (queue_id, deduplication_id)
                WHERE deduplication_id IS NOT NULL AND status IN ('pending','processing')
            DO NOTHING
            RETURNING message_id
        """, (queue['queue_id'], json.dumps(payload), dedup_id,
              priority, queue['max_retries'], visible_after))
        if not row:
            # Deduplicated: return the message_id of the in-flight message
            row = db.fetchone("""
                SELECT message_id FROM Message
                WHERE queue_id=%s AND deduplication_id=%s AND status IN ('pending','processing')
            """, (queue['queue_id'], dedup_id))
        return row['message_id']
    except Exception as e:
        raise EnqueueError(f"Failed to enqueue to {queue_name}: {e}") from e
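The dedup window that enqueue relies on can be tried without a Postgres server. The following is a minimal sketch that uses SQLite as a stand-in, assuming its partial unique index and INSERT OR IGNORE behave closely enough to the Postgres partial index and ON CONFLICT DO NOTHING for illustration. The lowercase message table, the message_dedup index name, and the enqueue_once helper are hypothetical and are not part of the design above.

```python
import sqlite3
from typing import Optional

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE message (
        message_id       INTEGER PRIMARY KEY AUTOINCREMENT,
        queue_id         INTEGER NOT NULL,
        deduplication_id TEXT,
        status           TEXT NOT NULL DEFAULT 'pending'
    )
""")
# Uniqueness is enforced only while a message is in flight (pending or processing).
conn.execute("""
    CREATE UNIQUE INDEX message_dedup ON message (queue_id, deduplication_id)
    WHERE deduplication_id IS NOT NULL AND status IN ('pending', 'processing')
""")

def enqueue_once(queue_id: int, dedup_id: Optional[str]) -> int:
    """Insert unless an in-flight message already holds dedup_id; return the surviving id."""
    with conn:  # commits on success, rolls back on exception
        cur = conn.execute(
            "INSERT OR IGNORE INTO message (queue_id, deduplication_id) VALUES (?, ?)",
            (queue_id, dedup_id),
        )
        if cur.rowcount == 1:
            return cur.lastrowid
        row = conn.execute(
            "SELECT message_id FROM message "
            "WHERE queue_id = ? AND deduplication_id = ? "
            "AND status IN ('pending', 'processing')",
            (queue_id, dedup_id),
        ).fetchone()
        return row[0]

a = enqueue_once(1, "order-42")
b = enqueue_once(1, "order-42")   # duplicate while pending: the existing id comes back
with conn:
    conn.execute("UPDATE message SET status = 'completed' WHERE message_id = ?", (a,))
c = enqueue_once(1, "order-42")   # window closed: a new message is created
```

Here a and b are the same id, while c is a new row, which is the "dedup window" behaviour rather than permanent exactly-once enqueueing.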
Consumer: Dequeue with Visibility Timeout
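def dequeue(queue_name: str, batch_size: int = 10) -> list:
    """
    Atomically claim messages: set status=processing and advance visible_after.
    SKIP LOCKED prevents multiple workers from claiming the same message.
    """
    queue = db.fetchone(
        "SELECT queue_id, visibility_timeout_sec FROM Queue WHERE queue_name=%s", (queue_name,)
    )
    if not queue:
        return []
    new_visible = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(
        seconds=queue['visibility_timeout_sec']
    )
    messages = db.fetchall("""
        UPDATE Message SET
            status = 'processing',
            attempt_count = attempt_count + 1,
            visible_after = %s
        WHERE message_id IN (
            SELECT message_id FROM Message
            WHERE queue_id = %s
              AND status = 'pending'
              AND visible_after <= NOW()
            ORDER BY priority DESC, visible_after ASC
            LIMIT %s
            FOR UPDATE SKIP LOCKED
        )
        RETURNING message_id, payload, attempt_count, max_retries
    """, (new_visible, queue['queue_id'], batch_size))
    return messages

def ack(message_id: int):
    """Mark a message as successfully processed."""
    db.execute("""
        UPDATE Message SET status='completed', completed_at=NOW()
        WHERE message_id=%s
    """, (message_id,))

def nack(message_id: int, error: str, retry_delay_seconds: int = 60):
    """
    Record a failure: retry with exponential backoff, or dead-letter once retries run out.
    The 60s default is assumed from the backoff schedule in the comment below.
    """
    msg = db.fetchone(
        "SELECT attempt_count, max_retries FROM Message WHERE message_id=%s", (message_id,)
    )
    if not msg:
        return
    if msg['attempt_count'] >= msg['max_retries']:
        _move_to_dlq(message_id, error)
    else:
        # Exponential backoff: 60s, 120s, 240s, 480s...
        backoff = retry_delay_seconds * (2 ** (msg['attempt_count'] - 1))
        next_visible = (datetime.datetime.now(datetime.timezone.utc)
                        + datetime.timedelta(seconds=backoff))
        db.execute("""
            UPDATE Message SET status='pending', visible_after=%s, last_error=%s
            WHERE message_id=%s
        """, (next_visible, error[:500], message_id))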
def _move_to_dlq(message_id: int, error: str):
    queue = db.fetchone("""
        SELECT q.dlq_name FROM Message m JOIN Queue q USING(queue_id)
        WHERE m.message_id=%s
    """, (message_id,))
    if queue and queue['dlq_name']:
        dlq = db.fetchone("SELECT queue_id FROM Queue WHERE queue_name=%s", (queue['dlq_name'],))
        if dlq:
            # Copy message to DLQ then mark original dead
            db.execute("""
                INSERT INTO Message (queue_id, payload, status, last_error)
                SELECT %s, payload, 'pending', %s FROM Message WHERE message_id=%s
            """, (dlq['queue_id'], error[:500], message_id))
    db.execute("""
        UPDATE Message SET status='dead', last_error=%s, completed_at=NOW()
        WHERE message_id=%s
    """, (error[:500], message_id))
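The retry delays produced by the exponential backoff can be checked with a small pure function. This is a sketch under stated assumptions: the 60-second base comes from the backoff comment in the retry code, while the backoff_delay and backoff_schedule helpers and the optional cap_seconds parameter are hypothetical additions that the original design does not define.

```python
from typing import List, Optional

def backoff_delay(attempt_count: int, base_seconds: int = 60,
                  cap_seconds: Optional[int] = None) -> int:
    """Delay before the next retry, given how many attempts have already run."""
    if attempt_count < 1:
        raise ValueError("attempt_count is at least 1 after the first delivery")
    delay = base_seconds * (2 ** (attempt_count - 1))
    return delay if cap_seconds is None else min(delay, cap_seconds)

def backoff_schedule(max_retries: int, base_seconds: int = 60,
                     cap_seconds: Optional[int] = None) -> List[int]:
    """Delays after failed attempts 1..max_retries-1; the last failure is dead-lettered."""
    return [backoff_delay(n, base_seconds, cap_seconds) for n in range(1, max_retries)]

print(backoff_schedule(5))          # [60, 120, 240, 480]
print(sum(backoff_schedule(5)))     # 900 seconds of waiting before the final attempt
```

With the default max_retries of 5, a message that keeps failing spends 15 minutes in backoff before its fifth attempt, after which it is moved to the dead-letter queue. A cap is worth considering if max_retries is raised, because the delay doubles on every attempt.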
Worker Loop Pattern
import time, logging

def worker_loop(queue_name: str, handler, poll_interval: float = 1.0):
    """
    Long-running worker process. Polls for messages and processes them.
    handler(payload: dict) -> None: raises on failure.
    """
    logging.info(f"Worker started for queue: {queue_name}")
    while True:
        messages = dequeue(queue_name, batch_size=10)
        if not messages:
            time.sleep(poll_interval)
            continue
        for msg in messages:
            try:
                handler(msg['payload'])
                ack(msg['message_id'])
            except Exception as e:
                logging.error(f"Message {msg['message_id']} failed: {e}")
                nack(msg['message_id'], str(e))
        # No sleep when there were messages: drain the queue fast
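The claim, ack, and redelivery cycle that the worker loop depends on can be observed without a database. Below is a minimal single-process sketch, assuming an injectable clock so that time can be advanced by hand. InMemoryQueue and Msg are hypothetical names, there is no locking, and (unlike the Postgres version) there is no status column, so an expired claim becomes visible again without any sweep.

```python
import itertools
from dataclasses import dataclass
from typing import Callable, Dict, List

@dataclass
class Msg:
    message_id: int
    payload: dict
    visible_after: float
    attempt_count: int = 0
    done: bool = False

class InMemoryQueue:
    """Single-process stand-in: a claim hides a message until its timeout passes."""

    def __init__(self, visibility_timeout: float, clock: Callable[[], float]):
        self.visibility_timeout = visibility_timeout
        self.clock = clock
        self._ids = itertools.count(1)
        self._messages: Dict[int, Msg] = {}

    def enqueue(self, payload: dict) -> int:
        mid = next(self._ids)
        self._messages[mid] = Msg(mid, payload, visible_after=self.clock())
        return mid

    def dequeue(self, batch_size: int = 10) -> List[Msg]:
        now = self.clock()
        ready = [m for m in self._messages.values()
                 if not m.done and m.visible_after <= now]
        ready.sort(key=lambda m: m.visible_after)
        claimed = ready[:batch_size]
        for m in claimed:
            m.attempt_count += 1
            m.visible_after = now + self.visibility_timeout  # hide while being processed
        return claimed

    def ack(self, message_id: int) -> None:
        self._messages[message_id].done = True

now = [0.0]                                   # fake clock, advanced manually
q = InMemoryQueue(visibility_timeout=30, clock=lambda: now[0])
mid = q.enqueue({"order_id": 1})
first = q.dequeue()    # worker A claims the message, then "crashes" without ack
hidden = q.dequeue()   # worker B sees nothing: the message is invisible
now[0] = 31.0
second = q.dequeue()   # timeout expired: the same message is redelivered
q.ack(mid)             # after ack it is never delivered again
```

The second delivery carries attempt_count 2, which is the duplicate that an at-least-once consumer has to tolerate.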
Key Design Decisions
- Visibility timeout instead of DELETE on dequeue: when a consumer claims a message, the row is not deleted; it is hidden for visibility_timeout_sec seconds. If the consumer crashes before ack(), the message becomes eligible for another worker once the timeout expires, which gives at-least-once delivery even under consumer failure. Because dequeue() only selects rows with status='pending', this depends on a periodic sweep that resets 'processing' rows whose visible_after has passed back to 'pending'.
- SKIP LOCKED is essential for competing consumers: without SKIP LOCKED, all workers block on the same row lock and serialize. With SKIP LOCKED, each worker instantly skips rows locked by other workers and claims the next available one — full parallelism across the worker pool.
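- Partitioning by status keeps the active table small: completed messages accumulate quickly. Splitting rows into Message_pending, Message_processing, Message_completed, and Message_dead keeps the index on pending messages small and fast. The trade-off is that every status change moves the row to a different partition. List partitioning by status yields a single Message_completed partition, so monthly archival requires sub-partitioning it by time (for example on completed_at) and detaching the old ranges.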
- Deduplication_id scope is pending+processing only: once a message is completed, the dedup constraint is lifted. Re-enqueuing the same dedup_id after completion succeeds — this models “at-least-once processing with dedup window” rather than “exactly-once forever.”
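At-least-once delivery means a handler can see the same message twice, so the consumer side needs its own guard. One common pattern is to record the message_id in the same transaction as the side effect. The sketch below illustrates that pattern and is not part of the design above: it uses SQLite to stay self-contained, the processed_message and account tables are hypothetical, and in Postgres the equivalent insert would be INSERT ... ON CONFLICT DO NOTHING.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE processed_message (message_id INTEGER PRIMARY KEY)")
conn.execute(
    "CREATE TABLE account (account_id INTEGER PRIMARY KEY, balance INTEGER NOT NULL)"
)
conn.execute("INSERT INTO account VALUES (1, 0)")
conn.commit()

def handle_once(conn: sqlite3.Connection, message_id: int, payload: dict) -> bool:
    """Apply the payload's credit at most once per message_id. Returns True if applied."""
    with conn:  # one transaction: commits on success, rolls back on exception
        cur = conn.execute(
            "INSERT OR IGNORE INTO processed_message (message_id) VALUES (?)",
            (message_id,),
        )
        if cur.rowcount == 0:
            return False  # already processed: the redelivery is a no-op
        conn.execute(
            "UPDATE account SET balance = balance + ? WHERE account_id = ?",
            (payload["amount"], payload["account_id"]),
        )
        return True

payload = {"account_id": 1, "amount": 50}
applied_first = handle_once(conn, 42, payload)
applied_again = handle_once(conn, 42, payload)   # simulated redelivery
balance = conn.execute(
    "SELECT balance FROM account WHERE account_id = 1"
).fetchone()[0]
```

Because the dedup record and the balance update commit together, a crash between them cannot leave the message marked as processed without its effect, or the effect applied without the marker.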
Frequently Asked Questions

What is the difference between at-least-once and exactly-once message delivery?
At-least-once delivery: the queue guarantees every message will be delivered to a consumer at least once, but may deliver it more than once. If the consumer crashes after processing but before acking, the message becomes visible again and is redelivered. At-least-once is achievable with a visibility timeout and requires consumers to be idempotent. Exactly-once delivery: each message is processed exactly one time, even under consumer failure. True exactly-once requires either (1) distributed transactions (2PC) spanning the queue and the consumer’s database, which are expensive and complex; or (2) idempotency keys, where the consumer checks a deduplication table before processing and records processed message IDs, so a redelivery hits the dedup check and is a no-op. Most production systems accept at-least-once plus idempotent consumers as the practical equivalent of exactly-once, because true exactly-once is costly and often unnecessary.

How does a dead-letter queue help with debugging message processing failures?
When a message exhausts all retries (attempt_count >= max_retries), it moves to the DLQ rather than being deleted. The DLQ serves as a staging area for failed messages:
(1) Inspection: engineers can read DLQ messages to understand what payloads caused failures.
(2) Replay: after fixing the bug, messages can be re-enqueued to the original queue for reprocessing (SELECT from DLQ, INSERT into main queue).
(3) Alerting: a CloudWatch alarm or Datadog monitor fires when DLQ depth exceeds 0, alerting on-call that messages are failing.
Without a DLQ, failed messages are silently deleted after max retries, and the failure is invisible until a customer complains. DLQ design: use a separate queue (not a status flag) so DLQ consumers can be independently rate-limited during replay, and DLQ depth metrics are isolated from the main queue depth metric.

How do you implement message priority in a Postgres-backed queue?
The Message table has a priority column. The dequeue query orders by priority DESC, visible_after ASC, so higher-priority messages are always claimed before lower-priority ones. Practical priority levels: 0=normal, 10=high (paid users), 100=critical (payment webhooks). Starvation prevention: if high-priority messages arrive continuously, low-priority messages may never be processed. Mitigate with aging: add an age_boost column; a background job increments age_boost every 5 minutes for pending messages, effectively raising their priority. The dequeue ORDER BY becomes: (priority + age_boost) DESC, visible_after ASC. Alternatively, use separate physical queues per priority level and have workers poll high-priority queues first, falling back to normal queues when the high-priority queue is empty. Separate queues keep the schema simpler but push the polling logic into the workers.

How do you handle message ordering requirements in a distributed queue?
A Postgres SKIP LOCKED queue doesn’t guarantee ordered delivery: worker A claims message 100, worker B claims message 101, but B finishes first. If processing must be strictly ordered (e.g., events for the same user must be processed in sequence), there are two options:
(1) Single-consumer ordering: partition messages by a group key (user_id) and route all messages for the same group key to the same worker. Workers claim messages WHERE queue_id=X AND group_key=%s ORDER BY message_id LIMIT 1 FOR UPDATE SKIP LOCKED. Different group keys are processed in parallel; within a group, order is preserved.
(2) Sequence number check: the consumer checks that the incoming sequence_number = last_processed + 1 before processing; if not, nack and wait. This is complex and creates head-of-line blocking.
Most systems relax ordering requirements and instead design processing to be order-independent (idempotent and commutative).

How do you scale a Postgres-backed queue to millions of messages per day?
A Postgres queue with SKIP LOCKED handles approximately 10K–50K message claims per second on modern hardware before the index and lock overhead becomes a bottleneck. For millions of messages per day (roughly 12–120 messages/second), Postgres is entirely adequate. For millions per hour (300–30K messages/second): (1) partition Message_pending by queue_id, so each queue has its own partition and cross-queue index contention is eliminated; (2) increase worker parallelism, with more workers each claiming smaller batches; (3) shard across multiple Postgres instances (one per high-volume queue). Beyond ~100K messages/second: migrate to Kafka (ordered within a partition, replay-capable) or SQS (managed, with nearly unlimited throughput on standard queues). The key scaling question is not just throughput but whether you need message replay (Kafka yes, SQS/Postgres no), strict ordering (Kafka within a partition, SQS only with FIFO queues and within a message group), or cross-account fanout (SNS+SQS yes).
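The per-second figures quoted in the scaling answer follow from simple division, which is worth redoing for your own volumes before choosing a backend. A small sketch, where per_second is a hypothetical helper and the example volumes are the endpoints implied by "millions per day" and "millions per hour":

```python
DAY_SECONDS = 86_400
HOUR_SECONDS = 3_600

def per_second(count: float, period_seconds: float) -> float:
    """Average message rate for `count` messages spread evenly over the period."""
    return count / period_seconds

examples = [
    ("1M/day", 1e6, DAY_SECONDS),
    ("10M/day", 1e7, DAY_SECONDS),
    ("1M/hour", 1e6, HOUR_SECONDS),
    ("100M/hour", 1e8, HOUR_SECONDS),
]
for label, count, period in examples:
    print(f"{label:>10}: {per_second(count, period):,.0f} msg/s")
```

These are averages. Peak rates during bursts can be several times higher, so the comparison against the claim throughput of a single Postgres instance should use the expected peak rather than the daily mean.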