Shard Rebalancing Low-Level Design: Split/Merge Triggers, Data Migration, and Minimal Downtime

What Is Shard Rebalancing?

Shard rebalancing is the process of redistributing data across shards to correct imbalances: hot shards that handle disproportionate load, oversized shards that exceed a size limit, or uneven distribution after cluster topology changes. Rebalancing involves splitting large shards, merging small ones, and migrating data between nodes, ideally with little or no downtime.

Rebalancing Triggers

Automatic rebalancing is triggered by threshold violations:

  • Size trigger: shard size_bytes exceeds a maximum (e.g., 100 GB). Split into two halves.
  • QPS trigger: shard QPS exceeds a hot-shard threshold (e.g., 10,000 req/s). Split to distribute load.
  • Distribution trigger: a statistical imbalance metric across shards (e.g., the coefficient of variation of shard sizes or QPS) exceeds a tolerance. Redistribute by merging and splitting shards.

Triggers are evaluated by a background rebalancing controller that queries shard metrics at regular intervals (e.g., every minute).
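
As a rough illustration of one controller pass, the sketch below checks the three triggers against a list of shard metrics. The names `ShardMetrics`, `evaluate_triggers`, and `size_cv`, the choice of coefficient of variation as the imbalance metric, and the 0.5 tolerance are all assumptions made for the example; the size and QPS limits are the example values from the list above.

```python
from dataclasses import dataclass
from statistics import mean, pstdev

@dataclass
class ShardMetrics:
    shard_id: int
    size_bytes: int
    qps: float

MAX_SIZE_BYTES = 100 * 1024 ** 3   # size trigger: 100 GB
HOT_QPS = 10_000                   # QPS trigger: 10,000 req/s
MAX_SIZE_CV = 0.5                  # distribution trigger tolerance (assumed value)

def size_cv(shards):
    """Coefficient of variation of shard sizes: population stddev / mean."""
    sizes = [s.size_bytes for s in shards]
    avg = mean(sizes)
    return pstdev(sizes) / avg if avg > 0 else 0.0

def evaluate_triggers(shards):
    """Return (split reasons per shard id, whether the distribution trigger fired)."""
    splits = {}
    for s in shards:
        reasons = []
        if s.size_bytes > MAX_SIZE_BYTES:
            reasons.append("size")
        if s.qps > HOT_QPS:
            reasons.append("qps")
        if reasons:
            splits[s.shard_id] = reasons
    # The imbalance metric only makes sense with at least two shards.
    imbalanced = len(shards) > 1 and size_cv(shards) > MAX_SIZE_CV
    return splits, imbalanced
```

A real controller would call something like this on each interval and enqueue split or merge jobs rather than act inline.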

Split: Dividing a Hot Shard

To split shard S into S1 (lower half) and S2 (upper half):

  1. Choose a split boundary key — typically the median key in the shard's key range (computed by sampling or exact count).
  2. Create a new destination shard S2 and begin dual-write: every write whose key falls in the upper half is applied to both S and S2; lower-half writes go to S only.
  3. Bulk-copy the existing upper-half data from S to S2, without overwriting rows that dual-write has already updated.
  4. Apply any writes the copy missed (captured via the replication log or CDC) until S2 matches S for the upper half.
  5. Once S2 is caught up (delta lag near zero), perform an atomic cutover: update the shard metadata to route upper-half traffic to S2.
  6. Keep the upper-half data in S, frozen (no new writes), for a brief rollback window; then delete it from S. S continues to serve the lower half as S1.
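
Step 1 mentions computing the median by sampling. A minimal sketch of that idea follows, assuming the shard's keys can be enumerated and compared as strings; `choose_split_key` and `partition` are hypothetical helper names, and ranges are treated as half-open, so the boundary key belongs to the upper half.

```python
import random

def choose_split_key(keys, sample_size=1000, seed=None):
    """Estimate the median key from a uniform random sample of the shard's keys."""
    keys = list(keys)
    if len(keys) < 2:
        raise ValueError("need at least two keys to split")
    rng = random.Random(seed)
    sample = keys if len(keys) <= sample_size else rng.sample(keys, sample_size)
    ordered = sorted(sample)
    # The boundary is the first key of the upper half:
    # S1 covers [start, boundary), S2 covers [boundary, end).
    return ordered[len(ordered) // 2]

def partition(keys, boundary):
    """Split keys into (lower half, upper half) around the boundary."""
    lower = [k for k in keys if k < boundary]
    upper = [k for k in keys if k >= boundary]
    return lower, upper
```

Sampling trades exactness for speed: the halves are only approximately equal, which is usually acceptable because the next trigger evaluation will catch a badly skewed result.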

Merge: Combining Small Shards

Merge combines two adjacent (by key range) sibling shards into one:

  1. Identify candidate shards: size_bytes < merge_threshold AND qps < merge_qps_threshold.
  2. Pick two siblings with adjacent key ranges.
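  3. Bulk-copy all data from the smaller shard into the larger one, dual-writing or replaying deltas as in a split so that writes arriving during the copy are not lost.
  4. Atomically update metadata so the surviving shard covers the combined key range.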
  5. Decommission the now-empty source shard.
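
Steps 1 to 3 can be sketched as a candidate search over shards sorted by key range. This is an illustrative sketch, not a definitive implementation: `ShardInfo` and `find_merge_pairs` are hypothetical names, and range ends are assumed to be exclusive so that two shards are adjacent when one's end equals the other's start.

```python
from dataclasses import dataclass

@dataclass
class ShardInfo:
    shard_id: int
    range_start: str
    range_end: str      # exclusive (assumption)
    size_bytes: int
    qps: float

def find_merge_pairs(shards, merge_size_threshold, merge_qps_threshold):
    """Return (source_id, target_id) pairs; the smaller shard is copied into the larger."""
    def is_small(s):
        return s.size_bytes < merge_size_threshold and s.qps < merge_qps_threshold

    ordered = sorted(shards, key=lambda s: s.range_start)
    pairs = []
    i = 0
    while i < len(ordered) - 1:
        a, b = ordered[i], ordered[i + 1]
        if a.range_end == b.range_start and is_small(a) and is_small(b):
            source, target = (a, b) if a.size_bytes <= b.size_bytes else (b, a)
            pairs.append((source.shard_id, target.shard_id))
            i += 2   # each shard takes part in at most one merge per pass
        else:
            i += 1
    return pairs
```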

Data Migration with Dual-Write

During migration, the system must continue to serve reads and writes without data loss:

  • Writes: dual-written to both source and destination for keys in the migrating range. This keeps the destination current even while the bulk copy is running.
  • Reads: served from the source until cutover. During cutover, a brief read-from-both window handles in-flight requests that were routed before the metadata update propagated.
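
The sketch below shows one way the write and read paths could behave during a migration, with plain dicts standing in for the two shards. `DualWriteRouter` and its methods are hypothetical names. The backfill uses insert-if-absent so the bulk copy never overwrites a newer dual-written value; deletes would need tombstones, which this sketch leaves out.

```python
class DualWriteRouter:
    """Routes one shard's traffic while part of its key range is migrating."""

    def __init__(self, source, dest, migrating_start, migrating_end=None):
        self.source = source            # dict standing in for the source shard
        self.dest = dest                # dict standing in for the destination shard
        self.start = migrating_start
        self.end = migrating_end        # exclusive; None means unbounded
        self.cut_over = False

    def _migrating(self, key):
        return key >= self.start and (self.end is None or key < self.end)

    def write(self, key, value):
        if not self._migrating(key):
            self.source[key] = value
            return
        if not self.cut_over:
            self.source[key] = value    # source stays authoritative until cutover
        self.dest[key] = value

    def backfill(self, key, value):
        """Bulk-copy path: insert only if dual-write has not already written the key."""
        self.dest.setdefault(key, value)

    def read(self, key):
        if self.cut_over and self._migrating(key):
            return self.dest.get(key)
        return self.source.get(key)
```

Setting `cut_over = True` here plays the role of the metadata update; in a real system that flag would come from the shard map rather than local state.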

Cutover and Atomicity

Cutover must be atomic from the routing layer's perspective. Typically:

  1. Compute the next shard map version and the new routing entry.
  2. Write the new routing entry and the new version to the metadata store (etcd, ZooKeeper, or a central DB) in a single transaction, so no client can observe one without the other.
  3. All routing clients cache the shard map and refresh on version mismatch or on receipt of an invalidation signal.

Stale-map errors (routing to old shard) are handled by returning a redirect response that includes the correct shard location, prompting the client to refresh its map.
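
To make the redirect flow concrete, here is a small in-memory sketch. `MetadataStore`, `Shard`, `RoutingClient`, and `lookup` are hypothetical stand-ins, not the API of etcd or ZooKeeper; the store simply changes its entries and version together to imitate a single transaction.

```python
import bisect

class MetadataStore:
    """In-memory stand-in for the metadata store holding the versioned shard map."""
    def __init__(self):
        self.version = 1
        self.entries = [("", 1)]        # sorted (range_start, shard_id) pairs

    def snapshot(self):
        return self.version, list(self.entries)

    def commit_split(self, boundary, new_shard_id):
        # Stand-in for one transaction: entries and version change together.
        self.entries = sorted(self.entries + [(boundary, new_shard_id)])
        self.version += 1

def lookup(entries, key):
    """Return the shard whose range start is the greatest one <= key."""
    starts = [start for start, _ in entries]
    return entries[bisect.bisect_right(starts, key) - 1][1]

class Shard:
    """Shard-side check: redirect requests for keys this shard no longer owns."""
    def __init__(self, shard_id, store):
        self.shard_id, self.store = shard_id, store

    def handle(self, key):
        owner = lookup(self.store.entries, key)
        if owner != self.shard_id:
            return {"status": "redirect", "shard": owner, "version": self.store.version}
        return {"status": "ok", "shard": self.shard_id}

class RoutingClient:
    def __init__(self, store, shards):
        self.store, self.shards = store, shards
        self.version, self.entries = store.snapshot()

    def send(self, key):
        reply = self.shards[lookup(self.entries, key)].handle(key)
        if reply["status"] == "redirect":
            # Cached map is stale: refresh it, then retry once at the indicated shard.
            self.version, self.entries = self.store.snapshot()
            reply = self.shards[reply["shard"]].handle(key)
        return reply
```

The single retry keeps the sketch short; a production client would typically bound retries and back off if it keeps receiving redirects.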

Consistent Hashing and Virtual Nodes

With consistent hashing, each physical shard owns a set of virtual nodes (vnodes) on the hash ring. Rebalancing redistributes vnodes between shards, so only the data owned by the migrated vnodes has to move, which keeps the migration scope small. Adding a new physical node takes vnodes from existing nodes; only the affected key ranges migrate.
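
A minimal hash ring illustrates the property. This sketch assumes SHA-256 truncated to 64 bits as the ring hash and 64 vnodes per node; both are arbitrary choices for the example, and `HashRing` is a hypothetical class name.

```python
import bisect
import hashlib

def ring_hash(value: str) -> int:
    """Map a string to a 64-bit position on the ring."""
    return int.from_bytes(hashlib.sha256(value.encode()).digest()[:8], "big")

class HashRing:
    def __init__(self, vnodes_per_node=64):
        self.vnodes_per_node = vnodes_per_node
        self._ring = []         # sorted (position, node) pairs, one per vnode
        self._positions = []    # positions only, kept in step with _ring for bisect

    def add_node(self, node):
        for i in range(self.vnodes_per_node):
            self._ring.append((ring_hash(f"{node}#{i}"), node))
        self._ring.sort()
        self._positions = [pos for pos, _ in self._ring]

    def owner(self, key):
        """The owner is the node of the first vnode clockwise from the key's position."""
        idx = bisect.bisect_right(self._positions, ring_hash(key)) % len(self._ring)
        return self._ring[idx][1]

ring = HashRing()
for name in ("node-a", "node-b", "node-c"):
    ring.add_node(name)
keys = [f"key-{i}" for i in range(10_000)]
before = {k: ring.owner(k) for k in keys}
ring.add_node("node-d")
moved = [k for k in keys if ring.owner(k) != before[k]]
# Every key that changed owner now belongs to the new node; nothing
# moves between the existing nodes.
```

With four equally weighted nodes, roughly a quarter of the keys would be expected to move, though the exact share depends on where the vnodes happen to land.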

SQL Schema

CREATE TABLE Shard (
    id                      SERIAL PRIMARY KEY,
    shard_key_range_start   TEXT NOT NULL,
    shard_key_range_end     TEXT NOT NULL,
    node_id                 INT NOT NULL,
    status                  TEXT NOT NULL DEFAULT 'active' CHECK (status IN ('active','migrating','readonly','decommissioned')),
    row_count               BIGINT NOT NULL DEFAULT 0,
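    size_bytes              BIGINT NOT NULL DEFAULT 0,
    qps                     INT NOT NULL DEFAULT 0      -- recent requests/sec, written by the metrics collector; read by the trigger queries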
);

CREATE TABLE ShardMigration (
    id              SERIAL PRIMARY KEY,
    source_shard_id INT REFERENCES Shard(id),
    dest_shard_id   INT REFERENCES Shard(id),
    phase           TEXT NOT NULL DEFAULT 'copying' CHECK (phase IN ('copying','dual_write','cutover','complete')),
    rows_migrated   BIGINT NOT NULL DEFAULT 0,
    started_at      TIMESTAMPTZ NOT NULL DEFAULT now(),
    completed_at    TIMESTAMPTZ
);

CREATE INDEX idx_migration_source ON ShardMigration (source_shard_id, phase);

Python Implementation Sketch

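# Sketch only. `db` is assumed to expose fetchone/fetchall (returning dict-like rows)
# and execute; `metadata_store` is assumed to expose atomic_update_shard_map.
# Neither refers to a specific library.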

class ShardRebalancer:
    def __init__(self, db, metadata_store):
        self.db = db
        self.metadata_store = metadata_store
        self.size_threshold = 100 * 1024 ** 3      # 100 GB
        self.qps_threshold = 10000
        self.merge_size_threshold = 5 * 1024 ** 3   # 5 GB

    def detect_hot_shards(self) -> list:
        return self.db.fetchall(
            "SELECT * FROM Shard WHERE (size_bytes > %s OR qps > %s) AND status = 'active'",
            (self.size_threshold, self.qps_threshold)
        )

    def trigger_split(self, shard_id: int) -> int:
        shard = self.db.fetchone("SELECT * FROM Shard WHERE id = %s", (shard_id,))
        boundary = self._find_median_key(shard_id)
        dest_shard_id = self.db.fetchone(
            "INSERT INTO Shard (shard_key_range_start, shard_key_range_end, node_id, status) VALUES (%s, %s, %s, 'migrating') RETURNING id",
            (boundary, shard['shard_key_range_end'], self._select_target_node())
        )['id']
        migration_id = self.db.fetchone(
            "INSERT INTO ShardMigration (source_shard_id, dest_shard_id, phase) VALUES (%s, %s, 'copying') RETURNING id",
            (shard_id, dest_shard_id)
        )['id']
        return migration_id

    def migrate_shard(self, migration_id: int):
        migration = self.db.fetchone("SELECT * FROM ShardMigration WHERE id = %s", (migration_id,))
        self._bulk_copy(migration['source_shard_id'], migration['dest_shard_id'])
        self.db.execute(
            "UPDATE ShardMigration SET phase = 'dual_write' WHERE id = %s", (migration_id,)
        )
        self._apply_delta_sync(migration_id)
        self.db.execute(
            "UPDATE ShardMigration SET phase = 'cutover' WHERE id = %s", (migration_id,)
        )
        self.cutover(migration_id)

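    def cutover(self, migration_id: int):
        migration = self.db.fetchone("SELECT * FROM ShardMigration WHERE id = %s", (migration_id,))
        dest = self.db.fetchone("SELECT * FROM Shard WHERE id = %s", (migration['dest_shard_id'],))
        # Flip routing first: routers read the shard map from the metadata store.
        self.metadata_store.atomic_update_shard_map(
            migration['source_shard_id'],
            migration['dest_shard_id']
        )
        # The bookkeeping below should run in one DB transaction in a real system.
        self.db.execute(
            "UPDATE ShardMigration SET phase = 'complete', completed_at = now() WHERE id = %s",
            (migration_id,)
        )
        # After a split the source keeps serving the lower half, so it stays active and
        # its range shrinks to end at the boundary. The copied upper-half rows remain
        # on the source, unwritten, until the rollback window expires.
        self.db.execute(
            "UPDATE Shard SET shard_key_range_end = %s WHERE id = %s",
            (dest['shard_key_range_start'], migration['source_shard_id'])
        )
        self.db.execute(
            "UPDATE Shard SET status = 'active' WHERE id = %s",
            (migration['dest_shard_id'],)
        )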

    def _find_median_key(self, shard_id: int) -> str:
        result = self.db.fetchone(
            "SELECT percentile_disc(0.5) WITHIN GROUP (ORDER BY shard_key) AS median FROM shard_data WHERE shard_id = %s",
            (shard_id,)
        )
        return result['median']

    def _select_target_node(self) -> int:
        row = self.db.fetchone("SELECT id FROM nodes ORDER BY load_score ASC LIMIT 1")
        return row['id']

    def _bulk_copy(self, source_id: int, dest_id: int):
        pass  # Implementation: batch SELECT from source, INSERT into dest

    def _apply_delta_sync(self, migration_id: int):
        pass  # Implementation: replay CDC events from source to dest until lag ~ 0
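
The `_bulk_copy` stub above could be filled in along the following lines. This is a standalone sketch under stated assumptions: it uses the standard-library `sqlite3` module only so that it runs as-is, the tables `source_data` and `dest_data` and the function `bulk_copy` are hypothetical, and `INSERT OR IGNORE` is SQLite syntax (in PostgreSQL the equivalent would be `ON CONFLICT DO NOTHING`). It pages through the key range by last-seen key and skips rows that already exist in the destination, so values written by dual-write are not overwritten.

```python
import sqlite3

def bulk_copy(conn, lower, upper, batch_size=1000):
    """Copy rows with lower <= shard_key < upper in batches; return rows scanned."""
    scanned = 0
    cursor_key, op = lower, ">="        # the first batch includes the lower bound
    while True:
        rows = conn.execute(
            f"SELECT shard_key, payload FROM source_data "
            f"WHERE shard_key {op} ? AND shard_key < ? "
            f"ORDER BY shard_key LIMIT ?",
            (cursor_key, upper, batch_size),
        ).fetchall()
        if not rows:
            return scanned
        # Rows already present in the destination (from dual-write) are left untouched.
        conn.executemany(
            "INSERT OR IGNORE INTO dest_data (shard_key, payload) VALUES (?, ?)", rows
        )
        conn.commit()                   # one transaction per batch keeps locks short
        scanned += len(rows)
        cursor_key, op = rows[-1][0], ">"   # resume strictly after the last key seen
```

Paging by key rather than by OFFSET keeps each batch query cheap on large shards, and the per-batch commit means an interrupted copy can resume from the last key it wrote.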
