Materialized View Low-Level Design: Incremental Refresh, Change Data Capture, and Query Routing

What Is a Materialized View?

A materialized view is a precomputed query result stored as a physical table. Unlike a regular view, which re-executes its query on every read, a materialized view keeps the result set on disk, so a read is a plain table scan or index lookup with no joins or aggregation at query time.
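To make the difference concrete, here is a small sketch that uses Python's built-in sqlite3 module as a stand-in database. SQLite has no CREATE MATERIALIZED VIEW, so the "materialized" side is emulated with CREATE TABLE ... AS SELECT. The orders, v_sales, and mv_sales names are illustrative.

```python
import sqlite3

# In-memory SQLite stands in for the real database; table names are illustrative.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE orders (id INTEGER PRIMARY KEY, region TEXT, amount INTEGER);
    INSERT INTO orders (region, amount) VALUES ('eu', 10), ('eu', 5), ('us', 7);

    -- Regular view: the aggregation re-runs on every read.
    CREATE VIEW v_sales AS
        SELECT region, SUM(amount) AS total FROM orders GROUP BY region;

    -- Emulated materialized view: the same result stored once as a physical table.
    CREATE TABLE mv_sales AS
        SELECT region, SUM(amount) AS total FROM orders GROUP BY region;
""")

# A new source row arrives after the materialized copy was built.
conn.execute("INSERT INTO orders (region, amount) VALUES ('us', 3)")

view_rows = conn.execute("SELECT region, total FROM v_sales ORDER BY region").fetchall()
mv_rows = conn.execute("SELECT region, total FROM mv_sales ORDER BY region").fetchall()
print(view_rows)  # [('eu', 15), ('us', 10)]  recomputed, sees the new order
print(mv_rows)    # [('eu', 15), ('us', 7)]   stored result, stale until refreshed
```

The materialized copy is cheap to read but only as current as its last refresh, which is what the rest of this design manages.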

Materialized views are essential for analytics queries that are expensive to recompute (e.g., multi-table joins with aggregations) but are read far more frequently than the underlying data changes.

Full Refresh

Full refresh is the simplest strategy: truncate the materialized view table and reinsert from the source query. Steps:

  1. Acquire a refresh lock on the view (prevent concurrent refresh).
  2. Execute the source query and write results to a staging table.
  3. Within a transaction: TRUNCATE the view table, INSERT from staging.
  4. Update last_refreshed_at in the metadata table.

Full refresh is straightforward but expensive for large datasets, because it re-scans all source data no matter how little has changed. It suits views whose source data is small or that are refreshed infrequently.
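The four full-refresh steps can be sketched as follows, again with sqlite3 standing in for the production database. SQLite lacks TRUNCATE, so DELETE FROM plays that role. The materialized_view metadata table, the mv_<name> naming scheme, and the full_refresh function are assumptions for illustration, and the step 1 lock is left to the caller.

```python
import sqlite3
import time

def full_refresh(conn: sqlite3.Connection, view_name: str, source_query: str) -> int:
    """Rebuild mv_<view_name> from source_query; returns the number of rows loaded."""
    target = f"mv_{view_name}"
    staging = f"{target}_staging"
    # Step 2: run the expensive source query into a staging table first,
    # outside the transaction that touches the live view.
    conn.execute(f"DROP TABLE IF EXISTS {staging}")
    conn.execute(f"CREATE TABLE {staging} AS {source_query}")
    # Steps 3 and 4: replace the contents and record the refresh time in one
    # transaction, so other connections see either the old rows or the new ones.
    with conn:
        conn.execute(f"DELETE FROM {target}")  # SQLite has no TRUNCATE
        rows = conn.execute(f"INSERT INTO {target} SELECT * FROM {staging}").rowcount
        conn.execute(
            "UPDATE materialized_view SET last_refreshed_at = ? WHERE view_name = ?",
            (time.time(), view_name),
        )
    conn.execute(f"DROP TABLE {staging}")
    return rows

# Usage: a hypothetical sales view summing orders by region.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE orders (region TEXT, amount INTEGER);
    INSERT INTO orders VALUES ('eu', 10), ('eu', 5), ('us', 7);
    CREATE TABLE mv_sales (region TEXT, total INTEGER);
    CREATE TABLE materialized_view (view_name TEXT PRIMARY KEY, last_refreshed_at REAL);
    INSERT INTO materialized_view VALUES ('sales', NULL);
""")
SALES_QUERY = "SELECT region, SUM(amount) AS total FROM orders GROUP BY region"
loaded = full_refresh(conn, "sales", SALES_QUERY)
```

Building the staging table first keeps the slow part of the refresh out of the transaction that modifies the live view, so the window in which the view is locked is only as long as the copy.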

Incremental Refresh

Incremental refresh applies only the changes since the last refresh. This requires a change tracking mechanism:

  • updated_at column: query source table WHERE updated_at > last_refreshed_at; apply upserts to the view.
  • Change Data Capture (CDC): consume a stream of insert/update/delete events from the source table and apply deltas to the view.

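Incremental refresh is much faster for large datasets but needs careful handling of deletes and schema changes. A hard-deleted row simply disappears from the source table, so a query on updated_at never sees it; capturing deletes requires soft deletes (for example a deleted_at column) or CDC.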
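Here is a sketch of updated_at-based incremental refresh on sqlite3. It uses a hypothetical integer updated_at clock instead of real timestamps so the run is deterministic, and SQLite's INSERT ... ON CONFLICT DO UPDATE upsert (SQLite 3.24 or later). It also reproduces the delete problem: after a hard delete in the source, the view keeps the row.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE orders (id INTEGER PRIMARY KEY, region TEXT, amount INTEGER, updated_at INTEGER);
    INSERT INTO orders VALUES (1, 'eu', 10, 1), (2, 'eu', 5, 2), (3, 'us', 7, 3);
    CREATE TABLE mv_orders (id INTEGER PRIMARY KEY, region TEXT, amount INTEGER);
""")

def incremental_refresh(conn: sqlite3.Connection, since: int) -> int:
    """Upsert source rows changed after `since` into mv_orders; return the new watermark."""
    # Fix the upper bound first so the returned watermark matches what was copied.
    upper = conn.execute(
        "SELECT COALESCE(MAX(updated_at), ?) FROM orders", (since,)
    ).fetchone()[0]
    with conn:
        conn.execute(
            """INSERT INTO mv_orders (id, region, amount)
               SELECT id, region, amount FROM orders
               WHERE updated_at > ? AND updated_at <= ?
               ON CONFLICT(id) DO UPDATE SET region = excluded.region,
                                             amount = excluded.amount""",
            (since, upper),
        )
    return upper

watermark = incremental_refresh(conn, since=0)  # initial load copies all 3 rows
with conn:
    conn.execute("UPDATE orders SET amount = 50, updated_at = 4 WHERE id = 2")
    conn.execute("DELETE FROM orders WHERE id = 3")  # hard delete leaves no updated_at behind
watermark = incremental_refresh(conn, since=watermark)

mv_rows = conn.execute("SELECT * FROM mv_orders ORDER BY id").fetchall()
print(mv_rows)  # [(1, 'eu', 10), (2, 'eu', 50), (3, 'us', 7)]  row 3 survives in the view
```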

CDC-Based Real-Time Updates

With CDC, a change event is emitted for every row modification in the source table. The materialized view service consumes these events (e.g., from Kafka) and applies deltas in near-real-time:

  • INSERT: insert new row into view (if it satisfies view filter).
  • UPDATE: update the corresponding row in the view; may also affect aggregations (e.g., recompute group totals).
  • DELETE: remove row from view or adjust aggregation.

CDC-based views can have sub-second staleness, making them suitable for near-real-time dashboards.
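For an aggregating view, each event has to become a change to group totals. The sketch below assumes every event carries before and after row images (the ChangeCapture schema later in this article stores only changed_data, so this is an assumption) and keeps a hypothetical SUM(amount) ... GROUP BY region view in memory. An UPDATE is treated as retracting the old row and inserting the new one, which also covers rows that move between groups.

```python
from collections import defaultdict

def apply_delta(totals: defaultdict, counts: defaultdict, event: dict) -> None:
    """Apply one change event to an in-memory SUM(amount) GROUP BY region view."""
    op = event["op"]
    # DELETE, and the old side of UPDATE, retract the old row's contribution.
    if op in ("UPDATE", "DELETE"):
        old = event["before"]
        totals[old["region"]] -= old["amount"]
        counts[old["region"]] -= 1
        if counts[old["region"]] == 0:
            # Drop empty groups, since GROUP BY would not return them.
            del totals[old["region"]], counts[old["region"]]
    # INSERT, and the new side of UPDATE, add the new row's contribution.
    if op in ("INSERT", "UPDATE"):
        new = event["after"]
        totals[new["region"]] += new["amount"]
        counts[new["region"]] += 1

totals, counts = defaultdict(int), defaultdict(int)
events = [
    {"op": "INSERT", "after": {"region": "eu", "amount": 10}},
    {"op": "INSERT", "after": {"region": "us", "amount": 7}},
    # The row moves from us to eu and changes its amount.
    {"op": "UPDATE", "before": {"region": "us", "amount": 7},
                     "after": {"region": "eu", "amount": 8}},
]
for e in events:
    apply_delta(totals, counts, e)
print(dict(totals))  # {'eu': 18}
```

The per-group row count is kept alongside the sum because a sum of zero does not mean the group is empty; only the count can tell when a group should disappear from the view.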

Staleness Tracking

Each materialized view records its last_refreshed_at timestamp and a staleness_threshold_seconds. A monitoring job computes estimated staleness:

staleness = (now() - last_refreshed_at).total_seconds()
if staleness > staleness_threshold_seconds:
    alert(f"View {view_name} is stale by {staleness:.0f}s")

For CDC-based views, staleness is tracked via consumer lag: the difference between the latest event timestamp in the source stream and the latest applied event timestamp.
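Both staleness measures fit in a few lines of Python. The function names and the message format are illustrative, and wiring the message into an actual alerting system is left out.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

def batch_staleness(last_refreshed_at: Optional[datetime], now: datetime) -> float:
    """Seconds since the last successful refresh; infinite if the view was never refreshed."""
    if last_refreshed_at is None:
        return float("inf")
    return (now - last_refreshed_at).total_seconds()

def cdc_lag(latest_source_event_at: datetime, latest_applied_event_at: datetime) -> float:
    """Consumer lag: how far the newest applied event trails the newest source event."""
    return max(0.0, (latest_source_event_at - latest_applied_event_at).total_seconds())

def stale_message(view_name: str, staleness: float, threshold_seconds: float) -> Optional[str]:
    """Alert text if the view breaches its threshold, else None."""
    if staleness > threshold_seconds:
        return f"View {view_name} is stale by {staleness:.0f}s"
    return None

t0 = datetime(2026, 1, 1, 12, 0, tzinfo=timezone.utc)
print(stale_message("sales", batch_staleness(t0, t0 + timedelta(seconds=400)), 300))
# View sales is stale by 400s
print(cdc_lag(t0 + timedelta(seconds=2), t0))  # 2.0
```

Because consumer lag compares event timestamps rather than wall-clock time, it stays at zero while the source is idle, so a quiet source table does not trigger a false staleness alert the way a last_refreshed_at check would.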

Concurrent Refresh Prevention

Two concurrent refresh jobs can interleave their truncates and inserts, leaving the view with duplicated or missing rows. Prevent this with:

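  • Refresh flag: set a boolean is_refreshing in the metadata table before starting. Checking and setting it must be one atomic step: lock the metadata row with SELECT ... FOR UPDATE NOWAIT (which fails immediately if another job holds the row) before flipping the flag, or use a single conditional UPDATE ... WHERE is_refreshing = FALSE. A job that crashes leaves the flag set, so pair it with a timeout.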
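  • Advisory lock: use a database-level advisory lock keyed to the view name (PostgreSQL's pg_try_advisory_lock takes a bigint key, so hash the name to one); other refresh attempts skip if the lock is held. A session-level advisory lock is released automatically when its session ends, so a crashed job cannot leave it stuck.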
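A sketch of the refresh-flag approach as a single conditional UPDATE, using sqlite3 in place of the real database. Because a job that crashes mid-refresh would leave the flag set forever, it adds a hypothetical refresh_started_at column and treats a flag older than LEASE_SECONDS as abandoned. The lease must exceed the longest legitimate refresh, or a slow job could be overtaken by a second one.

```python
import sqlite3

LEASE_SECONDS = 600  # hypothetical: how long a refresh may hold the flag

def try_acquire(conn: sqlite3.Connection, view_name: str, now: float) -> bool:
    """Atomically flip is_refreshing 0 -> 1, or take over a lease that has expired."""
    with conn:
        cur = conn.execute(
            """UPDATE materialized_view
                  SET is_refreshing = 1, refresh_started_at = ?
                WHERE view_name = ?
                  AND (is_refreshing = 0 OR refresh_started_at < ?)""",
            (now, view_name, now - LEASE_SECONDS),
        )
    # Exactly one row changed means this caller won the compare-and-set.
    return cur.rowcount == 1

def release(conn: sqlite3.Connection, view_name: str) -> None:
    with conn:
        conn.execute(
            "UPDATE materialized_view SET is_refreshing = 0 WHERE view_name = ?",
            (view_name,),
        )

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE materialized_view (
        view_name          TEXT PRIMARY KEY,
        is_refreshing      INTEGER NOT NULL DEFAULT 0,
        refresh_started_at REAL
    );
    INSERT INTO materialized_view (view_name) VALUES ('sales');
""")
first = try_acquire(conn, "sales", now=1000.0)   # True: flag was clear
second = try_acquire(conn, "sales", now=1001.0)  # False: held and the lease is still valid
stolen = try_acquire(conn, "sales", now=1700.0)  # True: the 600 s lease has expired
```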

Query Routing

The application should query the materialized view for analytics and fall back to the base tables if the view is too stale:

if check_staleness(view_name) < acceptable_threshold:
    run query against materialized view
else:
    run query against base tables (slower but fresh)

This staleness-aware routing ensures the application degrades gracefully when the refresh pipeline is delayed.
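The routing decision can also take its tolerance from the caller rather than one global threshold, so queries with different freshness needs can share one view. A minimal sketch; choose_source and the tolerance values are hypothetical.

```python
from typing import Optional

def choose_source(staleness_seconds: Optional[float], max_staleness_seconds: float) -> str:
    """Return 'view' if the materialized view is fresh enough for this query, else 'base'."""
    if staleness_seconds is None:  # never refreshed: the view has no usable data yet
        return "base"
    return "view" if staleness_seconds < max_staleness_seconds else "base"

# Hypothetical per-query tolerances: a dashboard accepts 5 minutes, a billing check 10 seconds.
current_staleness = 120.0
print(choose_source(current_staleness, max_staleness_seconds=300))  # view
print(choose_source(current_staleness, max_staleness_seconds=10))   # base
```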

SQL Schema

CREATE TABLE MaterializedView (
    id                          SERIAL PRIMARY KEY,
    view_name                   TEXT UNIQUE NOT NULL,
    source_query                TEXT NOT NULL,
    refresh_strategy            TEXT NOT NULL CHECK (refresh_strategy IN ('full','incremental','cdc')),
    last_refreshed_at           TIMESTAMPTZ,
    staleness_threshold_seconds INT NOT NULL DEFAULT 300,
    is_refreshing               BOOLEAN NOT NULL DEFAULT FALSE
);

CREATE TABLE MaterializedViewLog (
    id            SERIAL PRIMARY KEY,
    view_name     TEXT         NOT NULL,
    refresh_type  TEXT         NOT NULL,
    rows_affected INT,
    duration_ms   INT,
    refreshed_at  TIMESTAMPTZ  NOT NULL DEFAULT now()
);

CREATE TABLE ChangeCapture (
    id            BIGSERIAL PRIMARY KEY,
    table_name    TEXT         NOT NULL,
    operation     TEXT         NOT NULL CHECK (operation IN ('INSERT','UPDATE','DELETE')),
    row_id        BIGINT       NOT NULL,
    changed_data  JSONB,
    captured_at   TIMESTAMPTZ  NOT NULL DEFAULT now()
);

CREATE INDEX idx_cdc_table_time ON ChangeCapture (table_name, captured_at);

Python Implementation Sketch

import time
from typing import List

class MaterializedViewManager:
    def __init__(self, db):
        self.db = db

    def refresh_view(self, view_name: str, strategy: str):
        if not self._acquire_refresh_lock(view_name):
            print(f"Refresh already in progress for {view_name}")
            return
        try:
            # Take the watermark from the database clock before reading any source data,
            # so rows changed while this refresh runs are picked up by the next one.
            refresh_started_at = self.db.fetchone("SELECT now() AS ts")['ts']
            start = time.time()
            if strategy == 'full':
                rows = self._full_refresh(view_name)
            elif strategy == 'incremental':
                rows = self._incremental_refresh(view_name)
            elif strategy == 'cdc':
                rows = self._cdc_refresh(view_name)
            else:
                raise ValueError(f"Unknown strategy: {strategy}")
            duration_ms = int((time.time() - start) * 1000)
            self._log_refresh(view_name, strategy, rows, duration_ms)
            self.db.execute(
                "UPDATE MaterializedView SET last_refreshed_at = %s, is_refreshing = FALSE WHERE view_name = %s",
                (refresh_started_at, view_name)
            )
        finally:
            self._release_refresh_lock(view_name)

    def apply_cdc_delta(self, view_name: str, changes: List[dict]):
        # view_name is interpolated into SQL, so it must come from trusted metadata, never user input.
        for change in changes:
            op = change['operation']
            if op == 'INSERT':
                self.db.execute(f"INSERT INTO mv_{view_name} ...", change['changed_data'])
            elif op == 'UPDATE':
                self.db.execute(f"UPDATE mv_{view_name} SET ... WHERE row_id = %s", (change['row_id'],))
            elif op == 'DELETE':
                self.db.execute(f"DELETE FROM mv_{view_name} WHERE row_id = %s", (change['row_id'],))

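    def check_staleness(self, view_name: str) -> int:
        row = self.db.fetchone(
            "SELECT EXTRACT(EPOCH FROM (now() - last_refreshed_at)) AS staleness FROM MaterializedView WHERE view_name = %s",
            (view_name,)
        )
        # An unknown or never-refreshed view (NULL last_refreshed_at) counts as maximally stale.
        if row is None or row['staleness'] is None:
            return 999999
        return int(row['staleness'])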

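    def route_query(self, query: str, staleness_threshold: int, view_name: str) -> str:
        # Fresh enough: serve the read from the precomputed table.
        if self.check_staleness(view_name) < staleness_threshold:
            return f"SELECT * FROM mv_{view_name}"
        # Too stale: fall back to the caller's query against the base tables.
        return query

    def _acquire_refresh_lock(self, view_name: str) -> bool:
        # The conditional UPDATE is an atomic compare-and-set: only one caller flips FALSE -> TRUE.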
        result = self.db.execute(
            "UPDATE MaterializedView SET is_refreshing = TRUE WHERE view_name = %s AND is_refreshing = FALSE",
            (view_name,)
        )
        return result.rowcount > 0

    def _release_refresh_lock(self, view_name: str):
        self.db.execute(
            "UPDATE MaterializedView SET is_refreshing = FALSE WHERE view_name = %s",
            (view_name,)
        )

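    def _full_refresh(self, view_name: str) -> int:
        # TRUNCATE and INSERT must commit together; this assumes self.db runs the refresh in
        # one transaction. PostgreSQL's TRUNCATE is transactional but takes an ACCESS EXCLUSIVE
        # lock, so readers block until the commit rather than seeing an empty view.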
        self.db.execute(f"TRUNCATE mv_{view_name}")
        result = self.db.execute(f"INSERT INTO mv_{view_name} SELECT * FROM source_{view_name}")
        return result.rowcount

    def _incremental_refresh(self, view_name: str) -> int:
        row = self.db.fetchone("SELECT last_refreshed_at FROM MaterializedView WHERE view_name = %s", (view_name,))
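        since = row['last_refreshed_at']
        if since is None:
            # Never refreshed: there is no watermark yet, so rebuild from scratch.
            return self._full_refresh(view_name)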
        result = self.db.execute(
            f"INSERT INTO mv_{view_name} SELECT * FROM source_{view_name} WHERE updated_at > %s ON CONFLICT (row_id) DO UPDATE SET ...",
            (since,)
        )
        return result.rowcount

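    def _cdc_refresh(self, view_name: str) -> int:
        # Pull pending events for the view's source table (source_<view_name>, as in
        # _full_refresh), oldest first. COALESCE lets a never-refreshed view consume the
        # whole log instead of comparing captured_at against NULL and matching nothing.
        changes = self.db.fetchall(
            "SELECT * FROM ChangeCapture WHERE table_name = %s AND captured_at > "
            "COALESCE((SELECT last_refreshed_at FROM MaterializedView WHERE view_name = %s), '-infinity') "
            "ORDER BY captured_at, id",
            (f"source_{view_name}", view_name)
        )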
        self.apply_cdc_delta(view_name, changes)
        return len(changes)

    def _log_refresh(self, view_name, refresh_type, rows_affected, duration_ms):
        self.db.execute(
            "INSERT INTO MaterializedViewLog (view_name, refresh_type, rows_affected, duration_ms) VALUES (%s, %s, %s, %s)",
            (view_name, refresh_type, rows_affected, duration_ms)
        )

