CQRS Pattern Low-Level Design: Command and Query Separation, Event Publishing, and Read Model Sync

CQRS Pattern: Low-Level Design

CQRS (Command Query Responsibility Segregation) separates the write path — commands that mutate state — from the read path — queries that return projections. This allows each side to be optimized independently: the write side can be normalized and transactionally safe; the read side can be denormalized for fast retrieval. This post covers the full implementation including the outbox pattern, event relay, and read model projection.

Command Side

A command is a request to change state. The command handler validates the command, loads the aggregate, applies business logic, persists the updated aggregate to the write database, and writes domain events to an outbox table — all in a single transaction.


from dataclasses import dataclass
from datetime import datetime, timezone
import json, uuid

@dataclass
class PlaceOrderCommand:
    user_id: int
    items: list[dict]
    shipping_address: str

def handle_command(cmd: PlaceOrderCommand, db) -> int:
    """
    Validates, persists the order, and writes an OrderPlaced event to the outbox.
    Returns the new order ID.
    """
    total = sum(i["price"] * i["qty"] for i in cmd.items)

    with db.transaction():
        order_id = db.fetchone(
            """INSERT INTO orders (user_id, total, status, created_at)
               VALUES (%s, %s, 'pending', %s)
               RETURNING id""",
            (cmd.user_id, total, datetime.now(timezone.utc))
        )[0]

        event = {
            "event_type": "OrderPlaced",
            "order_id": order_id,
            "user_id": cmd.user_id,
            "items": cmd.items,
            "total": str(total),
        }
        db.execute(
            """INSERT INTO order_outbox (event_id, aggregate_id, event_type, payload, created_at)
               VALUES (%s, %s, %s, %s, %s)""",
            (str(uuid.uuid4()), order_id, "OrderPlaced",
             json.dumps(event), datetime.now(timezone.utc))
        )

    return order_id

Writing the event inside the same database transaction as the aggregate update is the critical property of the outbox pattern. There is no window in which the state changes but the event is lost.

Outbox Relay

A background worker polls the outbox for unpublished events, publishes them to Kafka, and marks them published. The worker runs in a tight loop with a short sleep when the outbox is empty.


import time

def publish_outbox_events(db, producer, batch_size: int = 100) -> int:
    """
    Fetches up to batch_size unpublished events, publishes to Kafka, marks published.
    Returns the count of events published.
    """
    rows = db.fetchall(
        """SELECT id, event_id, aggregate_id, event_type, payload
           FROM order_outbox
           WHERE published_at IS NULL
           ORDER BY id ASC
           LIMIT %s
           FOR UPDATE SKIP LOCKED""",
        (batch_size,)
    )
    if not rows:
        return 0

    for row in rows:
        producer.produce(
            topic=f"domain.{row.event_type.lower()}",
            key=str(row.aggregate_id),
            value=row.payload
        )
    producer.flush()

    ids = [r.id for r in rows]
    db.execute(
        f"UPDATE order_outbox SET published_at = NOW() "
        f"WHERE id = ANY(%s)",
        (ids,)
    )
    return len(rows)

FOR UPDATE SKIP LOCKED allows multiple relay workers to run in parallel without conflicts: each worker locks its own batch and skips rows already locked by another worker.

Read Model Projector

A Kafka consumer subscribes to domain event topics and applies projections to the read database. Each event handler is idempotent: replaying the same event twice produces the same read model state.


def project_event(event: dict, db) -> None:
    """
    Applies a domain event to the read model.
    Idempotent: re-processing the same event_id is a no-op.
    """
    event_id = event["event_id"]
    event_type = event["event_type"]

    already_processed = db.fetchone(
        "SELECT 1 FROM projector_checkpoint WHERE event_id = %s",
        (event_id,)
    )
    if already_processed:
        return

    if event_type == "OrderPlaced":
        db.execute(
            """INSERT INTO order_read_model
               (order_id, user_id, total, status, items_json, created_at)
               VALUES (%s, %s, %s, 'pending', %s, %s)
               ON CONFLICT (order_id) DO UPDATE
               SET total=EXCLUDED.total, items_json=EXCLUDED.items_json""",
            (event["order_id"], event["user_id"], event["total"],
             json.dumps(event["items"]), datetime.now(timezone.utc))
        )
    elif event_type == "OrderShipped":
        db.execute(
            "UPDATE order_read_model SET status='shipped', "
            "shipped_at=%s WHERE order_id=%s",
            (datetime.now(timezone.utc), event["order_id"])
        )

    db.execute(
        "INSERT INTO projector_checkpoint (event_id, processed_at) VALUES (%s, NOW())",
        (event_id,)
    )

Query Side

The read model is a denormalized table designed for the queries the UI actually makes. No joins required; the projector pre-computes everything at write time.


def query_orders(user_id: int, status: str | None, db, page: int = 1, per_page: int = 20) -> list[dict]:
    """Queries the read model; never touches the write DB."""
    base  = "SELECT * FROM order_read_model WHERE user_id = %s"
    args  = [user_id]
    if status:
        base += " AND status = %s"
        args.append(status)
    base += " ORDER BY created_at DESC LIMIT %s OFFSET %s"
    args += [per_page, (page - 1) * per_page]
    return db.fetchall(base, args)

Database Schema


-- Write model
CREATE TABLE orders (
    id          BIGSERIAL PRIMARY KEY,
    user_id     BIGINT       NOT NULL,
    total       NUMERIC(12,2) NOT NULL,
    status      VARCHAR(32)  NOT NULL,
    created_at  TIMESTAMPTZ  NOT NULL DEFAULT NOW()
);

CREATE TABLE order_outbox (
    id           BIGSERIAL PRIMARY KEY,
    event_id     UUID         NOT NULL UNIQUE,
    aggregate_id BIGINT       NOT NULL,
    event_type   VARCHAR(128) NOT NULL,
    payload      JSONB        NOT NULL,
    created_at   TIMESTAMPTZ  NOT NULL DEFAULT NOW(),
    published_at TIMESTAMPTZ
);
CREATE INDEX idx_outbox_unpublished ON order_outbox(id)
    WHERE published_at IS NULL;

-- Read model
CREATE TABLE order_read_model (
    order_id    BIGINT PRIMARY KEY,
    user_id     BIGINT        NOT NULL,
    total       NUMERIC(12,2) NOT NULL,
    status      VARCHAR(32)   NOT NULL,
    items_json  JSONB         NOT NULL,
    created_at  TIMESTAMPTZ   NOT NULL,
    shipped_at  TIMESTAMPTZ
);
CREATE INDEX idx_orm_user_status ON order_read_model(user_id, status);

CREATE TABLE projector_checkpoint (
    event_id     UUID PRIMARY KEY,
    processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

Eventual Consistency

The read model lags behind the write model by the time it takes the outbox relay to publish and the projector to consume — typically under 100 ms in a healthy system. For most query use cases (order history, dashboards) this lag is acceptable. If a command response needs to reflect the updated read model immediately (e.g., redirect after checkout), the application layer can poll the read model with a short timeout before rendering, or return the write model's data for that single request.

See also: Meta Interview Guide 2026: Facebook, Instagram, WhatsApp Engineering

See also: Scale AI Interview Guide 2026: Data Infrastructure, RLHF Pipelines, and ML Engineering

See also: Atlassian Interview Guide

See also: Netflix Interview Guide 2026: Streaming Architecture, Recommendation Systems, and Engineering Excellence

See also: Anthropic Interview Guide 2026: Process, Questions, and AI Safety

Scroll to Top