CQRS Pattern: Low-Level Design
CQRS (Command Query Responsibility Segregation) separates the write path — commands that mutate state — from the read path — queries that return projections. This allows each side to be optimized independently: the write side can be normalized and transactionally safe; the read side can be denormalized for fast retrieval. This post covers the full implementation including the outbox pattern, event relay, and read model projection.
Command Side
A command is a request to change state. The command handler validates the command, loads the aggregate, applies business logic, persists the updated aggregate to the write database, and writes domain events to an outbox table — all in a single transaction.
from dataclasses import dataclass
from datetime import datetime, timezone
import json
import uuid

@dataclass
class PlaceOrderCommand:
    user_id: int
    items: list[dict]
    shipping_address: str

def handle_command(cmd: PlaceOrderCommand, db) -> int:
    """
    Validates, persists the order, and writes an OrderPlaced event to the outbox.
    Returns the new order ID.
    """
    total = sum(i["price"] * i["qty"] for i in cmd.items)
    with db.transaction():
        order_id = db.fetchone(
            """INSERT INTO orders (user_id, total, status, created_at)
               VALUES (%s, %s, 'pending', %s)
               RETURNING id""",
            (cmd.user_id, total, datetime.now(timezone.utc))
        )[0]
        event_id = str(uuid.uuid4())
        event = {
            "event_id": event_id,  # carried in the payload so the projector can de-duplicate
            "event_type": "OrderPlaced",
            "order_id": order_id,
            "user_id": cmd.user_id,
            "items": cmd.items,
            "total": str(total),
        }
        db.execute(
            """INSERT INTO order_outbox (event_id, aggregate_id, event_type, payload, created_at)
               VALUES (%s, %s, %s, %s, %s)""",
            (event_id, order_id, "OrderPlaced",
             json.dumps(event), datetime.now(timezone.utc))
        )
    return order_id
Writing the event inside the same database transaction as the aggregate update is the critical property of the outbox pattern. There is no window in which the state changes but the event is lost.
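The rollback guarantee can be demonstrated with a toy in-memory store. This is an illustration only: `InMemoryDB` and `place_order` are simplified stand-ins (staging lists instead of real tables), not the handler above.

```python
import contextlib

class InMemoryDB:
    """Minimal transactional store: writes made inside transaction() are
    discarded if the block raises, mirroring the orders/outbox atomicity."""
    def __init__(self):
        self.orders = []
        self.outbox = []

    @contextlib.contextmanager
    def transaction(self):
        # Snapshot state; restore it on failure (rollback), keep it on success.
        saved_orders, saved_outbox = list(self.orders), list(self.outbox)
        try:
            yield
        except Exception:
            self.orders, self.outbox = saved_orders, saved_outbox
            raise

def place_order(db: InMemoryDB, user_id: int, fail_outbox: bool = False) -> None:
    with db.transaction():
        db.orders.append({"user_id": user_id})
        if fail_outbox:
            raise RuntimeError("outbox write failed")
        db.outbox.append({"event_type": "OrderPlaced", "user_id": user_id})
```

A failure between the two writes rolls back both: no order without an event, and no event without an order.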
Outbox Relay
A background worker polls the outbox for unpublished events, publishes them to Kafka, and marks them published. The worker runs in a tight loop with a short sleep when the outbox is empty.
import time

def publish_outbox_events(db, producer, batch_size: int = 100) -> int:
    """
    Fetches up to batch_size unpublished events, publishes to Kafka, marks published.
    Returns the count of events published.
    """
    # FOR UPDATE only holds row locks for the duration of a transaction, so the
    # select-publish-mark sequence must run inside one.
    with db.transaction():
        rows = db.fetchall(
            """SELECT id, event_id, aggregate_id, event_type, payload
               FROM order_outbox
               WHERE published_at IS NULL
               ORDER BY id ASC
               LIMIT %s
               FOR UPDATE SKIP LOCKED""",
            (batch_size,)
        )
        if not rows:
            return 0
        for row in rows:
            producer.produce(
                topic=f"domain.{row.event_type.lower()}",
                key=str(row.aggregate_id),
                value=row.payload
            )
        producer.flush()
        ids = [r.id for r in rows]
        db.execute(
            "UPDATE order_outbox SET published_at = NOW() WHERE id = ANY(%s)",
            (ids,)
        )
        return len(rows)
FOR UPDATE SKIP LOCKED allows multiple relay workers to run in parallel without conflicts: each worker locks its own batch and skips rows already locked by another worker.
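The tight polling loop described above can be sketched as follows. `publish_batch` is a stand-in for a bound `publish_outbox_events(db, producer)` call, and the `stop` hook is an assumption added here to make clean shutdown (and testing) possible — it is not part of the original worker.

```python
import time

def run_relay(publish_batch, poll_interval: float = 0.2, stop=lambda: False) -> int:
    """Drain the outbox in a loop, sleeping only when a poll finds nothing.
    publish_batch() should behave like publish_outbox_events(db, producer).
    Returns the total number of events published (useful for tests)."""
    total = 0
    while not stop():
        published = publish_batch()
        total += published
        if published == 0:
            time.sleep(poll_interval)  # back off only when the outbox is empty
    return total
```

Busy batches are drained back-to-back with no sleep; the interval only bounds how stale an idle outbox can get.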
Read Model Projector
A Kafka consumer subscribes to domain event topics and applies projections to the read database. Each event handler is idempotent: replaying the same event twice produces the same read model state.
def project_event(event: dict, db) -> None:
    """
    Applies a domain event to the read model.
    Idempotent: re-processing the same event_id is a no-op.
    """
    event_id = event["event_id"]
    event_type = event["event_type"]
    # Run the check, the projection, and the checkpoint in one transaction so a
    # crash cannot record an event as processed without applying it, or vice versa.
    with db.transaction():
        already_processed = db.fetchone(
            "SELECT 1 FROM projector_checkpoint WHERE event_id = %s",
            (event_id,)
        )
        if already_processed:
            return
        if event_type == "OrderPlaced":
            db.execute(
                """INSERT INTO order_read_model
                       (order_id, user_id, total, status, items_json, created_at)
                   VALUES (%s, %s, %s, 'pending', %s, %s)
                   ON CONFLICT (order_id) DO UPDATE
                   SET total=EXCLUDED.total, items_json=EXCLUDED.items_json""",
                (event["order_id"], event["user_id"], event["total"],
                 json.dumps(event["items"]), datetime.now(timezone.utc))
            )
        elif event_type == "OrderShipped":
            db.execute(
                "UPDATE order_read_model SET status='shipped', "
                "shipped_at=%s WHERE order_id=%s",
                (datetime.now(timezone.utc), event["order_id"])
            )
        db.execute(
            "INSERT INTO projector_checkpoint (event_id, processed_at) VALUES (%s, NOW())",
            (event_id,)
        )
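Upstream of `project_event`, the consumer loop can be sketched like this. The `poll`/`commit` interface is deliberately simplified (a real confluent-kafka consumer polls with a timeout and returns message objects); the key design point is committing the offset only after the projection succeeds, which yields at-least-once delivery that the idempotency check absorbs.

```python
import json

def consume_and_project(consumer, db, project) -> int:
    """Drain available messages and apply each to the read model.
    In this sketch, poll() returning None ends the drain; a real loop
    would block and keep polling."""
    processed = 0
    while True:
        msg = consumer.poll()
        if msg is None:
            break
        event = json.loads(msg.value)
        project(event, db)    # idempotent, so redelivery after a crash is safe
        consumer.commit(msg)  # commit the offset only after a successful projection
        processed += 1
    return processed
```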
Query Side
The read model is a denormalized table designed for the queries the UI actually makes. No joins required; the projector pre-computes everything at write time.
def query_orders(user_id: int, status: str | None, db, page: int = 1, per_page: int = 20) -> list[dict]:
    """Queries the read model; never touches the write DB."""
    base = "SELECT * FROM order_read_model WHERE user_id = %s"
    args = [user_id]
    if status:
        base += " AND status = %s"
        args.append(status)
    base += " ORDER BY created_at DESC LIMIT %s OFFSET %s"
    args += [per_page, (page - 1) * per_page]
    return db.fetchall(base, args)
Database Schema
-- Write model
CREATE TABLE orders (
id BIGSERIAL PRIMARY KEY,
user_id BIGINT NOT NULL,
total NUMERIC(12,2) NOT NULL,
status VARCHAR(32) NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE order_outbox (
id BIGSERIAL PRIMARY KEY,
event_id UUID NOT NULL UNIQUE,
aggregate_id BIGINT NOT NULL,
event_type VARCHAR(128) NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
published_at TIMESTAMPTZ
);
CREATE INDEX idx_outbox_unpublished ON order_outbox(id)
WHERE published_at IS NULL;
-- Read model
CREATE TABLE order_read_model (
order_id BIGINT PRIMARY KEY,
user_id BIGINT NOT NULL,
total NUMERIC(12,2) NOT NULL,
status VARCHAR(32) NOT NULL,
items_json JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL,
shipped_at TIMESTAMPTZ
);
CREATE INDEX idx_orm_user_status ON order_read_model(user_id, status);
CREATE TABLE projector_checkpoint (
event_id UUID PRIMARY KEY,
processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
Eventual Consistency
The read model lags behind the write model by the time it takes the outbox relay to publish and the projector to consume — typically under 100 ms in a healthy system. For most query use cases (order history, dashboards) this lag is acceptable. If a command response needs to reflect the updated read model immediately (e.g., redirect after checkout), the application layer can poll the read model with a short timeout before rendering, or return the write model's data for that single request.
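The poll-with-timeout approach can be sketched as below. `wait_for_projection` is a hypothetical helper (not part of the handlers above) that a checkout endpoint could call before redirecting.

```python
import time

def wait_for_projection(order_id: int, db, timeout_s: float = 1.0,
                        interval_s: float = 0.05):
    """Poll the read model until the order appears or the timeout expires.
    Returns the read-model row, or None if the projector is still behind
    (the caller then falls back to the write model's data)."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        row = db.fetchone(
            "SELECT * FROM order_read_model WHERE order_id = %s",
            (order_id,)
        )
        if row:
            return row
        time.sleep(interval_s)  # give the relay + projector a moment to catch up
    return None
```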
{
  "@context": "https://schema.org",
  "@type": "FAQPage",
  "mainEntity": [
    {
      "@type": "Question",
      "name": "Why use an outbox table instead of publishing directly to Kafka?",
      "acceptedAnswer": {
        "@type": "Answer",
        "text": "Direct publish to Kafka from within a database transaction is not atomic: the transaction could commit but the Kafka publish could fail, or vice versa. The outbox pattern writes the event to a table in the same transaction as the aggregate update, guaranteeing the event is durable if the transaction commits. The relay then publishes with at-least-once semantics."
      }
    },
    {
      "@type": "Question",
      "name": "How long does the read model lag behind the write model?",
      "acceptedAnswer": {
        "@type": "Answer",
        "text": "In a healthy system the lag is typically under 100 milliseconds: the outbox relay polls every few hundred milliseconds, Kafka delivery adds single-digit milliseconds, and the projector consumer processes events in near real time. Under heavy load or during relay downtime the lag can grow to seconds or minutes."
      }
    },
    {
      "@type": "Question",
      "name": "How are read model projectors made idempotent?",
      "acceptedAnswer": {
        "@type": "Answer",
        "text": "The projector records each processed event_id in a checkpoint table before returning. At the start of processing, it checks whether the event_id has already been recorded and returns early if so. Using upsert (ON CONFLICT DO UPDATE) for read model writes provides an additional safety net even if the checkpoint check is skipped."
      }
    },
    {
      "@type": "Question",
      "name": "What happens if a command is rolled back?",
      "acceptedAnswer": {
        "@type": "Answer",
        "text": "Because the outbox row and the aggregate update are in the same transaction, a rollback removes both atomically. No outbox event is left behind for a failed command, so the projector never sees an event for an aggregate that does not exist."
      }
    }
  ]
}