Read Replica Routing System Low-Level Design: Lag-Aware Routing, Read-Your-Writes, and Failover

Read Replica Routing System: Low-Level Design

A read replica routing system transparently routes read queries to replica databases and sends writes to the primary, which scales read capacity without changing application code. The design challenges are replica lag awareness (avoiding reads from a replica that has not yet seen a just-committed write), connection pool management across multiple replicas, failover when a replica falls behind or crashes, and the “read-your-writes” consistency requirement after mutations.

Core Data Model

CREATE TABLE DbNode (
    node_id        SERIAL PRIMARY KEY,
    node_name      VARCHAR(100) UNIQUE NOT NULL,  -- "primary", "replica-1", "replica-2"
    host           VARCHAR(255) NOT NULL,
    port           INT NOT NULL DEFAULT 5432,
    role           VARCHAR(10) NOT NULL,           -- primary, replica
    is_active      BOOLEAN NOT NULL DEFAULT TRUE,
    max_lag_seconds INT NOT NULL DEFAULT 10,       -- max acceptable replication lag
    weight         SMALLINT NOT NULL DEFAULT 100,  -- for weighted round-robin (higher = more traffic)
    last_health_check TIMESTAMPTZ,
    last_lag_seconds  NUMERIC(6,2),
    created_at     TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

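-- Lag is measured on each replica as wall-clock time since the last replayed transaction:
--   SELECT EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp()))
-- Caveat: while the primary is idle this value keeps growing even though the replica is caught up.
-- Comparing the primary's WAL position with the replica's replayed position avoids that.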

INSERT INTO DbNode (node_name, host, role) VALUES
    ('primary',   'db-primary.internal',   'primary'),
    ('replica-1', 'db-replica-1.internal', 'replica'),
    ('replica-2', 'db-replica-2.internal', 'replica');
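
The comment above mentions comparing WAL positions. A minimal sketch of that comparison follows, assuming PostgreSQL 10 or later, where pg_current_wal_lsn() (on the primary) and pg_last_wal_replay_lsn() (on a replica) return LSNs as text such as 0/3000060. The helper names parse_lsn, replica_has_replayed, and lag_bytes are hypothetical and not part of the router shown later.

```python
def parse_lsn(lsn: str) -> int:
    """Convert a PostgreSQL LSN string such as '16/B374D848' to an integer byte position.

    An LSN is printed as two hexadecimal numbers: the high 32 bits, a slash,
    then the low 32 bits.
    """
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def replica_has_replayed(replica_lsn: str, write_lsn: str) -> bool:
    """True if the replica has replayed at least up to the LSN captured after a write."""
    return parse_lsn(replica_lsn) >= parse_lsn(write_lsn)


def lag_bytes(primary_lsn: str, replica_lsn: str) -> int:
    """Byte distance between the primary's current LSN and the replica's replay LSN."""
    return max(0, parse_lsn(primary_lsn) - parse_lsn(replica_lsn))
```

Unlike the timestamp-based query, a byte distance stays at zero while the primary is idle, which is why it may be a useful second signal alongside last_lag_seconds.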

Router Implementation

import threading, time, random
from dataclasses import dataclass, field
from typing import Optional
import psycopg2
from psycopg2 import pool

@dataclass
class NodeState:
    node_id: int
    node_name: str
    role: str
    host: str
    port: int
    weight: int
    is_healthy: bool = True
    lag_seconds: float = 0.0
    pool: object = field(default=None, repr=False)

class ReadReplicaRouter:
    def __init__(self):
        self._lock = threading.RLock()
        self._nodes: dict[str, NodeState] = {}
        self._primary: Optional[NodeState] = None
        self._replicas: list[NodeState] = []
        self._rr_index = 0
        self._load_nodes()
        self._start_health_checker()

    def _load_nodes(self):
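        # db_admin is an admin-database helper assumed to be defined elsewhere;
        # it is expected to return dict-like rows.
        rows = db_admin.fetchall("SELECT * FROM DbNode WHERE is_active=TRUE")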
        for row in rows:
            conn_pool = psycopg2.pool.ThreadedConnectionPool(
                minconn=2, maxconn=20,
                host=row['host'], port=row['port'],
                dbname='appdb', user='app', password='secret'
            )
            state = NodeState(
                node_id=row['node_id'], node_name=row['node_name'],
                role=row['role'], host=row['host'], port=row['port'],
                weight=row['weight'], pool=conn_pool
            )
            self._nodes[row['node_name']] = state
            if row['role'] == 'primary':
                self._primary = state
            else:
                self._replicas.append(state)

    def get_connection(self, read_only: bool = False,
                       require_freshness_after: Optional[float] = None) -> psycopg2.extensions.connection:
        """
        Get a database connection.
        read_only=True: routes to a healthy replica (or primary if no replicas available).
        require_freshness_after: Unix timestamp — only use replicas that have replicated past this point.
          Used for read-your-writes: pass the timestamp of the last write.
        """
        if not read_only:
            return self._primary.pool.getconn()

        replica = self._pick_replica(require_freshness_after)
        if replica:
            conn = replica.pool.getconn()
            conn.set_session(readonly=True)
            return conn
        # Fallback to primary for reads if no healthy replica
        return self._primary.pool.getconn()

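    def _pick_replica(self, min_timestamp: Optional[float] = None,
                      max_lag_seconds: float = 10.0) -> Optional[NodeState]:
        with self._lock:
            # Drop unhealthy replicas and those past the lag cutoff
            # (10.0 mirrors the DbNode.max_lag_seconds default).
            candidates = [
                r for r in self._replicas
                if r.is_healthy and r.lag_seconds <= max_lag_seconds
            ]
            if min_timestamp is not None:
                # Read-your-writes: keep replicas that have replayed past the write
                now = time.time()
                candidates = [
                    r for r in candidates
                    if now - r.lag_seconds >= min_timestamp
                ]
            if not candidates:
                return None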
            # Weighted random selection
            total_weight = sum(r.weight for r in candidates)
            pick = random.randint(0, total_weight - 1)
            cumulative = 0
            for r in candidates:
                cumulative += r.weight
                if pick < cumulative:
                    return r
            return candidates[-1]

    def _start_health_checker(self):
        thread = threading.Thread(target=self._health_check_loop, daemon=True)
        thread.start()

    def _health_check_loop(self):
        while True:
            for name, node in list(self._nodes.items()):
                self._check_node(node)
            time.sleep(5)

    def _check_node(self, node: NodeState):
        conn = None
        try:
            conn = node.pool.getconn()
            lag = None
            with conn.cursor() as cur:
                if node.role == 'replica':
                    cur.execute("""
                        SELECT EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp()))
                        AS lag_seconds
                    """)
                    row = cur.fetchone()
                    # NULL means nothing has been replayed yet; treated as zero lag here
                    lag = float(row[0]) if row and row[0] is not None else 0.0
                else:
                    cur.execute("SELECT 1")
            node.pool.putconn(conn)
            conn = None
            with self._lock:
                if lag is not None:
                    node.lag_seconds = lag
                node.is_healthy = True
            db_admin.execute("""
                UPDATE DbNode SET last_health_check=NOW(), last_lag_seconds=%s
                WHERE node_name=%s
            """, (lag, node.node_name))
        except Exception:
            with self._lock:
                node.is_healthy = False
            if conn is not None:
                # Return the connection even on failure so the pool does not leak;
                # close=True discards it because its state is unknown.
                try:
                    node.pool.putconn(conn, close=True)
                except Exception:
                    pass

router = ReadReplicaRouter()
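
The router above hands out connections with getconn(), but nothing shown returns them to the pool. One possible way to close that gap is a small context manager. This is a sketch rather than part of the original design: the name lease is hypothetical, and it only assumes the pool object exposes getconn() and putconn(conn, close=...), as psycopg2's pool classes do.

```python
from contextlib import contextmanager


@contextmanager
def lease(conn_pool):
    """Borrow a connection from a pool and always hand it back.

    Assumes conn_pool has getconn() and putconn(conn, close=False).
    """
    conn = conn_pool.getconn()
    broken = False
    try:
        yield conn
    except Exception:
        # Conservative choice: after any error, ask the pool to close the
        # connection instead of reusing it in an unknown state.
        broken = True
        raise
    finally:
        conn_pool.putconn(conn, close=broken)
```

A caller would wrap each unit of work in `with lease(node.pool) as conn:`, where node is whichever NodeState the router selected. Exposing the selected node (rather than only the raw connection) would be a small change to get_connection.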

Read-Your-Writes Pattern

import threading
import time

# Thread-local storage: after a write, store the write timestamp.
# Subsequent reads in the same request require a fresh-enough replica.
_local = threading.local()

def after_write():
    """Call immediately after committing a write. Stores write timestamp in thread-local."""
    _local.last_write_at = time.time()

def get_read_conn():
    """Route reads: use replica, but require freshness if a recent write occurred."""
    min_ts = getattr(_local, 'last_write_at', None)
    # Require freshness for 5 seconds after a write, then relax
    if min_ts and time.time() - min_ts > 5:
        min_ts = None
    return router.get_connection(read_only=True, require_freshness_after=min_ts)

# Usage in application (psycopg2 runs statements through a cursor):
# conn = router.get_connection(read_only=False)
# with conn.cursor() as cur:
#     cur.execute("UPDATE orders SET status='shipped' WHERE order_id=%s", (order_id,))
# conn.commit()
# after_write()  # mark write time
#
# Later in the same request:
# read_conn = get_read_conn()
# with read_conn.cursor() as cur:
#     cur.execute("SELECT status FROM orders WHERE order_id=%s", (order_id,))
#     # If within 5s of the write, this routes to a replica with lag <= (now - last_write_at),
#     # or to the primary if no replica qualifies
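
The freshness rule is easier to check with concrete numbers. The sketch below is a stricter variant of the comparison, written as a pure function. It rests on one assumption that is not in the router above: that the time of each lag sample (checked_at) is stored next to lag_seconds. The function name replica_covers_write is hypothetical.

```python
from typing import Optional


def replica_covers_write(lag_seconds: float, checked_at: float,
                         last_write_at: Optional[float]) -> bool:
    """Conservative read-your-writes check.

    lag_seconds was sampled at checked_at, so the replica is only known to
    have replayed up to (checked_at - lag_seconds). Using the sample time
    rather than the current time avoids crediting the replica with progress
    it may not have made since the last health check.
    """
    if last_write_at is None:
        return True  # no recent write in this request: any healthy replica will do
    return checked_at - lag_seconds >= last_write_at


# Write committed at t=100.0.
# Sample taken at t=103.0 with 2.0s lag: replayed up to 101.0, so the write is visible.
print(replica_covers_write(2.0, 103.0, 100.0))
# Sample taken at t=101.0 with 2.0s lag: replayed up to 99.0, so the write may be missing.
print(replica_covers_write(2.0, 101.0, 100.0))
```

With this variant, reads issued right after a write go to the primary until the next health check confirms a replica has caught up. Note that last_write_at comes from the application server's clock while the lag comes from database clocks, so clock skew between them still matters.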

Replica Promotion on Primary Failure

def promote_replica(replica_name: str):
    """
    Promote a replica to primary when the primary fails.
    This is usually handled by an external HA manager (Patroni, pg_auto_failover),
    but the application layer must re-configure to use the new primary.
    """
    with router._lock:
        new_primary = router._nodes.get(replica_name)
        if not new_primary or new_primary.role != 'replica':
            raise ValueError(f"Node {replica_name} is not a replica")

        # Update database record
        db_admin.execute("""
            UPDATE DbNode SET role='primary' WHERE node_name=%s;
            UPDATE DbNode SET role='replica', is_active=FALSE WHERE role='primary' AND node_name!=%s;
        """, (replica_name, replica_name))

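        # Update in-memory state. The old primary is taken out of rotation entirely
        # (matching is_active=FALSE above): if it came back up, the health check would
        # see a NULL replay timestamp, record zero lag, and mark it as a healthy replica.
        old_primary = router._primary
        if old_primary and old_primary is not new_primary:
            old_primary.role = 'replica'
            old_primary.is_healthy = False
            router._nodes.pop(old_primary.node_name, None)

        new_primary.role = 'primary'
        router._primary = new_primary
        router._replicas = [r for r in router._replicas if r.node_name != replica_name]

    # Connections already checked out from the old primary will fail on next use;
    # application retries handle the reconnect. Pooled connections on the new primary
    # that were handed out with readonly=True need set_session(readonly=False) before writes.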

Key Design Decisions

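  • pg_last_xact_replay_timestamp() for lag measurement: this Postgres built-in returns the commit timestamp of the last transaction replayed on the replica. Subtracting it from NOW() gives wall-clock replication lag, so a lag of 2 seconds means the replica's data is about 2 seconds behind the primary. A replica whose lag exceeds max_lag_seconds (10 by default) is excluded from the pool, so reads can see data that is up to 10 seconds old. Two caveats: the value keeps growing while the primary is idle, and it depends on the primary and replica clocks being in sync.
  • Read-your-writes via timestamp comparison: after writing at T, route subsequent reads to replicas where (now - lag) ≥ T, i.e., replicas that have replayed past the write timestamp. If no replica qualifies, fall back to the primary. The freshness requirement is dropped 5 seconds after the write, which covers typical replica lag without pinning that caller's reads to the primary indefinitely. A replica lagging more than 5 seconds but under the 10-second cutoff can still return pre-write data once the window expires.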
  • Weighted random over round-robin: weighted random distributes traffic proportional to replica capacity (a larger replica gets more weight), handles the case where one replica falls behind (its weight could be dynamically reduced), and avoids the “synchronized round-robin” problem where all application servers hit the same replica simultaneously.
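  • Health check in a background thread, not per-request: checking replica lag on each request adds latency and puts unnecessary load on replicas. A 5-second background check cycle is fast enough for lag detection and reduces per-request routing to an in-memory scan of the replica list. The trade-off is that routing decisions use lag values that may be up to 5 seconds old.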
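
The weighted-random bullet notes that a lagging replica's weight could be reduced dynamically. One way this might look is sketched below, under stated assumptions: the linear decay toward the lag cutoff is an arbitrary illustrative choice, and the names effective_weight and pick_replica are hypothetical. Replicas are modeled as plain tuples to keep the example self-contained.

```python
import random
from typing import Optional, Sequence, Tuple

Replica = Tuple[str, int, float]  # (name, configured weight, lag in seconds)


def effective_weight(weight: int, lag_seconds: float,
                     max_lag_seconds: float = 10.0) -> float:
    """Scale the configured weight down linearly as lag approaches the cutoff."""
    if lag_seconds >= max_lag_seconds:
        return 0.0
    return weight * (1.0 - max(0.0, lag_seconds) / max_lag_seconds)


def pick_replica(replicas: Sequence[Replica],
                 rng: Optional[random.Random] = None) -> Optional[str]:
    """Weighted random choice over replicas with non-zero effective weight."""
    rng = rng or random
    names = []
    weights = []
    for name, weight, lag in replicas:
        w = effective_weight(weight, lag)
        if w > 0:
            names.append(name)
            weights.append(w)
    if not names:
        return None  # caller falls back to the primary
    return rng.choices(names, weights=weights, k=1)[0]
```

With these rules, a replica at 5 seconds of lag receives half its configured share of traffic, and one at or beyond the 10-second cutoff receives none.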
