Shard Rebalancing Low-Level Design: Split/Merge Triggers, Data Migration, and Minimal Downtime

What Is Shard Rebalancing?

Shard rebalancing is the process of redistributing data across shards to address imbalances: hot shards that handle disproportionate load, oversized shards that exceed storage capacity, or uneven distribution after cluster topology changes. Rebalancing involves splitting large shards, merging small ones, and migrating data between nodes — all ideally with minimal downtime.

Rebalancing Triggers

Automatic rebalancing is triggered by threshold violations:

  • Size trigger: shard size_bytes exceeds a maximum (e.g., 100 GB). Split into two halves.
  • QPS trigger: shard QPS exceeds a hot-shard threshold (e.g., 10,000 req/s). Split to distribute load.
  • Distribution trigger: a statistical imbalance metric across shards (for example, the coefficient of variation of shard sizes or QPS) exceeds tolerance. Redistribute via merge + split.

Triggers are evaluated by a background rebalancing controller that queries shard metrics at regular intervals (e.g., every minute).
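
As a rough illustration of one controller pass, the sketch below checks the three triggers against in-memory metrics. The `ShardStats` type, the constant names, and the 0.5 imbalance tolerance are assumptions made for this example; the coefficient of variation is only one possible imbalance metric.

```python
from dataclasses import dataclass
from statistics import mean, pstdev
from typing import List

@dataclass
class ShardStats:
    shard_id: int
    size_bytes: int
    qps: float

# Hypothetical thresholds, mirroring the examples in the text.
MAX_SIZE_BYTES = 100 * 10**9   # size trigger: 100 GB
HOT_QPS = 10_000               # QPS trigger: 10,000 req/s
MAX_SIZE_CV = 0.5              # assumed tolerance for the imbalance metric

def evaluate_triggers(shards: List[ShardStats]) -> dict:
    """Return the shards to split and whether the cluster looks imbalanced."""
    to_split = [
        s.shard_id for s in shards
        if s.size_bytes > MAX_SIZE_BYTES or s.qps > HOT_QPS
    ]
    sizes = [s.size_bytes for s in shards]
    avg = mean(sizes) if sizes else 0
    # Coefficient of variation: population standard deviation over the mean.
    cv = pstdev(sizes) / avg if avg > 0 else 0.0
    return {"split": to_split, "imbalanced": cv > MAX_SIZE_CV, "size_cv": cv}
```

A controller loop would call `evaluate_triggers` once per interval and enqueue a split for each returned shard id.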

Split: Dividing a Hot Shard

To split shard S into S1 (lower half) and S2 (upper half):

  1. Choose a split boundary key — typically the median key in the shard's key range (computed by sampling or exact count).
  2. Create a new destination shard S2 and begin dual-write: S keeps receiving every write, and writes to keys at or above the boundary are also mirrored to S2.
  3. Bulk-copy the upper-half data from S to S2.
  4. Apply ongoing delta writes during the copy (captured via replication log or CDC).
  5. Once S2 is caught up (delta lag near zero), perform atomic cutover: update the shard metadata to route upper-half traffic to S2.
  6. Keep the upper-half data on S, frozen against new writes, for a brief rollback window; then delete it from S, which continues to serve the lower half.
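
Step 1 can be approximated without a full scan. The following is a minimal sketch, assuming the shard's keys can be sampled into a Python list; `sample_median_key` and `split_by_boundary` are hypothetical helper names, and the boundary key is assigned to the upper half.

```python
import random

def sample_median_key(keys, sample_size=1000, seed=None):
    """Estimate the median key from a random sample instead of a full sort."""
    if not keys:
        raise ValueError("cannot split an empty shard")
    rng = random.Random(seed)
    sample = keys if len(keys) <= sample_size else rng.sample(keys, sample_size)
    ordered = sorted(sample)
    return ordered[len(ordered) // 2]

def split_by_boundary(keys, boundary):
    """Partition keys into the lower half (< boundary) and upper half (>= boundary)."""
    lower = [k for k in keys if k < boundary]
    upper = [k for k in keys if k >= boundary]
    return lower, upper
```

A larger sample narrows the gap between the two halves at the cost of reading more keys.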

Merge: Combining Small Shards

Merge combines two adjacent (by key range) sibling shards into one:

  1. Identify candidate shards: size_bytes < merge_threshold AND qps < merge_qps_threshold.
  2. Pick two siblings with adjacent key ranges.
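  3. Bulk-copy all data from the smaller shard into the larger one, mirroring writes that arrive during the copy (dual-write or delta sync, as in a split).
  4. Update metadata so the surviving shard covers the combined key range.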
  5. Decommission the now-empty source shard.
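
Steps 1 and 2 amount to a scan over shards sorted by key range. The sketch below is one way to do it, assuming half-open ranges where a shard's `range_end` equals its neighbor's `range_start`; `ShardInfo` and `find_merge_pairs` are names invented for this example.

```python
from dataclasses import dataclass

@dataclass
class ShardInfo:
    shard_id: int
    range_start: str
    range_end: str
    size_bytes: int
    qps: float

def find_merge_pairs(shards, merge_size_threshold, merge_qps_threshold):
    """Return (source_id, dest_id) pairs of adjacent shards that are both small and cold."""
    ordered = sorted(shards, key=lambda s: s.range_start)
    pairs = []
    i = 0
    while i < len(ordered) - 1:
        left, right = ordered[i], ordered[i + 1]
        adjacent = left.range_end == right.range_start
        both_small = all(
            s.size_bytes < merge_size_threshold and s.qps < merge_qps_threshold
            for s in (left, right)
        )
        if adjacent and both_small:
            # Copy the smaller shard into the larger one to move less data.
            source, dest = sorted((left, right), key=lambda s: s.size_bytes)
            pairs.append((source.shard_id, dest.shard_id))
            i += 2  # each shard takes part in at most one merge per pass
        else:
            i += 1
    return pairs
```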

Data Migration with Dual-Write

During migration, the system must continue to serve reads and writes without data loss:

  • Writes: dual-written to both source and destination based on key range. This keeps the destination current even while the bulk copy is running.
  • Reads: served from the source until cutover. During cutover, a brief read-from-both window handles in-flight requests that were routed before the metadata update propagated.
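
The read and write rules above can be sketched with two dictionaries standing in for the source and destination shards. `DualWriteRouter` and its `cut_over` flag are assumptions for illustration, not part of any real routing library.

```python
class DualWriteRouter:
    """Toy router for a split: keys >= boundary are migrating to the destination."""

    def __init__(self, source: dict, dest: dict, boundary: str):
        self.source = source
        self.dest = dest
        self.boundary = boundary
        self.cut_over = False

    def write(self, key, value):
        if key < self.boundary:
            self.source[key] = value      # lower half never leaves the source
            return
        if not self.cut_over:
            self.source[key] = value      # source stays authoritative until cutover
        self.dest[key] = value            # mirror so the destination stays current

    def read(self, key):
        if key >= self.boundary and self.cut_over:
            return self.dest.get(key)
        return self.source.get(key)

    def bulk_copy(self):
        for key, value in list(self.source.items()):
            if key >= self.boundary:
                # setdefault leaves alone any key a dual-write already mirrored.
                self.dest.setdefault(key, value)
```

After `cut_over` is set, the source's upper-half data stops changing, which is what makes it usable as a rollback copy.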

Cutover and Atomicity

Cutover must be atomic from the routing layer's perspective. Typically:

  1. Increment the shard map version.
  2. Write the new routing entry to the metadata store (etcd, ZooKeeper, or a central DB) in the same transaction as the version bump.
  3. Routing clients cache the shard map and refresh it on a version mismatch or on receipt of an invalidation signal.

Stale-map errors (routing to old shard) are handled by returning a redirect response that includes the correct shard location, prompting the client to refresh its map.
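
The redirect-and-refresh flow might look like the following in-memory sketch. All class and function names here (`ShardMap`, `MetadataStore`, `StaleShardMap`, `RoutingClient`) are hypothetical; a real deployment would back `MetadataStore` with a transactional store such as etcd or ZooKeeper.

```python
import bisect

class StaleShardMap(Exception):
    """Redirect response: carries the current map version and the correct owner."""
    def __init__(self, current_version, owner):
        super().__init__(f"stale shard map; current version is {current_version}")
        self.current_version = current_version
        self.owner = owner

class ShardMap:
    def __init__(self, version, boundaries, owners):
        # boundaries[i] is the inclusive start key of range i; owners[i] serves it.
        self.version = version
        self.boundaries = boundaries
        self.owners = owners

    def owner_of(self, key):
        idx = bisect.bisect_right(self.boundaries, key) - 1
        return self.owners[max(idx, 0)]

class MetadataStore:
    def __init__(self, shard_map):
        self.current = shard_map

    def cutover(self, boundaries, owners):
        # New entries and the version bump are published together as one object.
        self.current = ShardMap(self.current.version + 1, boundaries, owners)

def handle_request(store, target_shard, key):
    """Server side: reject requests that were routed to a shard that no longer owns the key."""
    owner = store.current.owner_of(key)
    if owner != target_shard:
        raise StaleShardMap(store.current.version, owner)
    return owner

class RoutingClient:
    def __init__(self, store):
        self.store = store
        self.cached = store.current

    def route(self, key):
        try:
            return handle_request(self.store, self.cached.owner_of(key), key)
        except StaleShardMap:
            self.cached = self.store.current   # refresh the cached map, then retry once
            return handle_request(self.store, self.cached.owner_of(key), key)
```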

Consistent Hashing and Virtual Nodes

With consistent hashing, each physical shard owns a set of virtual nodes (vnodes) on the hash ring. Rebalancing redistributes vnodes between shards — only the data owned by migrated vnodes needs to move, minimizing the migration scope. Adding a new physical node takes vnodes from existing nodes; only the affected key ranges migrate.
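
To make the "only affected key ranges migrate" property concrete, here is a small hash ring sketch. The `HashRing` class, the 64 vnodes per node, and the use of SHA-256 for ring positions are choices made for this example only.

```python
import bisect
import hashlib

def ring_hash(value: str) -> int:
    """Map a string to a 64-bit position on the ring."""
    return int.from_bytes(hashlib.sha256(value.encode()).digest()[:8], "big")

class HashRing:
    def __init__(self, vnodes_per_node: int = 64):
        self.vnodes_per_node = vnodes_per_node
        self._positions = []   # sorted vnode positions
        self._owner_at = {}    # vnode position -> physical node

    def add_node(self, node: str):
        for i in range(self.vnodes_per_node):
            pos = ring_hash(f"{node}#vnode{i}")
            bisect.insort(self._positions, pos)
            self._owner_at[pos] = node

    def owner(self, key: str) -> str:
        if not self._positions:
            raise ValueError("ring is empty")
        # First vnode clockwise from the key's position, wrapping around the ring.
        idx = bisect.bisect_left(self._positions, ring_hash(key)) % len(self._positions)
        return self._owner_at[self._positions[idx]]
```

Comparing `owner(key)` for a fixed key set before and after `add_node("D")` shows that every key whose owner changed now belongs to D; no data moves between the pre-existing nodes.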

SQL Schema

CREATE TABLE Shard (
    id                      SERIAL PRIMARY KEY,
    shard_key_range_start   TEXT NOT NULL,
    shard_key_range_end     TEXT NOT NULL,
    node_id                 INT NOT NULL,
    status                  TEXT NOT NULL DEFAULT 'active' CHECK (status IN ('active','migrating','readonly','decommissioned')),
    row_count               BIGINT NOT NULL DEFAULT 0,
    size_bytes              BIGINT NOT NULL DEFAULT 0,
    qps                     INT NOT NULL DEFAULT 0     -- recent request rate; read by the QPS trigger
);

CREATE TABLE ShardMigration (
    id              SERIAL PRIMARY KEY,
    source_shard_id INT REFERENCES Shard(id),
    dest_shard_id   INT REFERENCES Shard(id),
    phase           TEXT NOT NULL DEFAULT 'copying' CHECK (phase IN ('copying','dual_write','cutover','complete','failed')),
    rows_migrated   BIGINT NOT NULL DEFAULT 0,
    started_at      TIMESTAMPTZ NOT NULL DEFAULT now(),
    completed_at    TIMESTAMPTZ
);

CREATE INDEX idx_migration_source ON ShardMigration (source_shard_id, phase);

Python Implementation Sketch

import time
from typing import Optional

class ShardRebalancer:
    def __init__(self, db, metadata_store):
        self.db = db
        self.metadata_store = metadata_store
        self.size_threshold = 100 * 1024 ** 3      # 100 GB
        self.qps_threshold = 10000
        self.merge_size_threshold = 5 * 1024 ** 3   # 5 GB

    def detect_hot_shards(self) -> list:
        return self.db.fetchall(
            "SELECT * FROM Shard WHERE (size_bytes > %s OR qps > %s) AND status = 'active'",
            (self.size_threshold, self.qps_threshold)
        )

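    def trigger_split(self, shard_id: int) -> int:
        """Create the destination shard for the upper half [median, range_end) and open a migration record."""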
        shard = self.db.fetchone("SELECT * FROM Shard WHERE id = %s", (shard_id,))
        boundary = self._find_median_key(shard_id)
        dest_shard_id = self.db.fetchone(
            "INSERT INTO Shard (shard_key_range_start, shard_key_range_end, node_id, status) VALUES (%s, %s, %s, 'migrating') RETURNING id",
            (boundary, shard['shard_key_range_end'], self._select_target_node())
        )['id']
        migration_id = self.db.fetchone(
            "INSERT INTO ShardMigration (source_shard_id, dest_shard_id, phase) VALUES (%s, %s, 'copying') RETURNING id",
            (shard_id, dest_shard_id)
        )['id']
        return migration_id

    def migrate_shard(self, migration_id: int):
        """Drive one migration through copying -> dual_write -> cutover."""
        migration = self.db.fetchone("SELECT * FROM ShardMigration WHERE id = %s", (migration_id,))
        # Phase 'copying' (the default): bulk-copy the migrating key range.
        self._bulk_copy(migration['source_shard_id'], migration['dest_shard_id'])
        self.db.execute(
            "UPDATE ShardMigration SET phase = 'dual_write' WHERE id = %s", (migration_id,)
        )
        # Replay writes captured during the copy until the destination has caught up.
        self._apply_delta_sync(migration_id)
        self.db.execute(
            "UPDATE ShardMigration SET phase = 'cutover' WHERE id = %s", (migration_id,)
        )
        self.cutover(migration_id)

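    def cutover(self, migration_id: int):
        """Flip routing to the destination, then record the new shard states."""
        migration = self.db.fetchone("SELECT * FROM ShardMigration WHERE id = %s", (migration_id,))
        # One atomic metadata update: routers see either the old map or the new one.
        self.metadata_store.atomic_update_shard_map(
            migration['source_shard_id'],
            migration['dest_shard_id']
        )
        self.db.execute(
            "UPDATE ShardMigration SET phase = 'complete', completed_at = now() WHERE id = %s",
            (migration_id,)
        )
        # Freeze the source for the rollback window instead of deleting it.
        # Note: this sketch tracks status per shard. In a split the source still
        # owns the lower half, so a fuller version would shrink the source's key
        # range here rather than freeze the whole shard.
        self.db.execute(
            "UPDATE Shard SET status = 'readonly' WHERE id = %s",
            (migration['source_shard_id'],)
        )
        self.db.execute(
            "UPDATE Shard SET status = 'active' WHERE id = %s",
            (migration['dest_shard_id'],)
        )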

    def _find_median_key(self, shard_id: int) -> str:
        result = self.db.fetchone(
            "SELECT percentile_disc(0.5) WITHIN GROUP (ORDER BY shard_key) AS median FROM shard_data WHERE shard_id = %s",
            (shard_id,)
        )
        return result['median']

    def _select_target_node(self) -> int:
        row = self.db.fetchone("SELECT id FROM nodes ORDER BY load_score ASC LIMIT 1")
        return row['id']

    def _bulk_copy(self, source_id: int, dest_id: int):
        pass  # Implementation: batch SELECT from source, INSERT into dest

    def _apply_delta_sync(self, migration_id: int):
        pass  # Implementation: replay CDC events from source to dest until lag ~ 0
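
The two placeholder methods above could take roughly the following shape. This is a sketch over in-memory dictionaries, not the database-backed implementation the class would need: `bulk_copy_batches`, `apply_delta`, and the `(op, key, value)` change-log format are all assumptions for illustration.

```python
def bulk_copy_batches(source: dict, dest: dict, lower_bound: str, batch_size: int = 2) -> int:
    """Copy keys >= lower_bound from source to dest in key-ordered batches."""
    copied = 0
    last_key = None
    while True:
        # Keyset pagination: resume strictly after the last key already copied.
        batch = sorted(
            k for k in source
            if k >= lower_bound and (last_key is None or k > last_key)
        )[:batch_size]
        if not batch:
            return copied
        for key in batch:
            dest[key] = source[key]
        copied += len(batch)
        last_key = batch[-1]

def apply_delta(dest: dict, change_log: list, from_offset: int, lower_bound: str) -> int:
    """Replay (op, key, value) events for the migrating range; return the new offset."""
    for op, key, value in change_log[from_offset:]:
        if key < lower_bound:
            continue                      # event belongs to the half that stays put
        if op == "put":
            dest[key] = value
        elif op == "delete":
            dest.pop(key, None)
    return len(change_log)
```

Calling `apply_delta` repeatedly with the returned offset until no new events remain corresponds to the "lag ~ 0" condition before cutover.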

Frequently Asked Questions

How is the split boundary key selected during shard splitting?

The split boundary is chosen at the median key of the shard's key range, computed by sampling or exact count (e.g., SQL PERCENTILE_DISC). The goal is to produce two equally-sized shards after the split. For hash-based sharding, the boundary is chosen in the hash space. For range-based sharding, the boundary is a value in the actual key domain. The median minimizes the worst-case imbalance after the split.

How is consistency maintained during dual-write migration?

During dual-write, all writes are routed to both source and destination shards based on the key's position relative to the split boundary. Reads continue to be served from the source. The dual-write phase ensures the destination stays current even as the bulk copy runs. Once the bulk copy completes and the delta sync lag reaches near zero, cutover updates the routing metadata atomically. Stale-map errors after cutover cause clients to refresh their shard map.

How is cutover made atomic in shard rebalancing?

Cutover updates the shard routing map in the metadata store (etcd, ZooKeeper, or a versioned DB table) in a single transaction. The shard map version is incremented. All routing clients cache the current shard map and refresh on version mismatch. The brief window between the metadata write and cache refresh propagation is handled by returning redirect errors that prompt clients to re-fetch the shard map and retry.

How do you roll back a failed shard migration?

Rollback is possible because the source shard is kept in read-only status (not deleted) for a grace period after cutover. To roll back: update the routing metadata to point back to the source shard, set the source to active status, stop dual-writes to the destination, and mark the migration as failed. The destination shard is then decommissioned. The rollback window is typically 30-60 minutes, after which the source shard data is cleaned up.

What triggers shard split vs merge?

A shard split is triggered when a shard's size, key cardinality, or request rate exceeds an upper threshold, dividing it into two child shards at a chosen split key determined by the midpoint or hotspot analysis. A merge is triggered when two adjacent shards both fall below a lower threshold (e.g., combined size is less than half the split threshold), consolidating them to reduce metadata overhead and improve scan locality.

How is data migrated during rebalancing with minimal downtime?

Rebalancing uses a double-write or copy-then-switch protocol: the source shard streams its key range to the destination node while continuing to serve reads and writes, then a brief consistent-state window (often a few milliseconds with write quiesce) allows the router's ownership metadata to atomically flip to the destination. During the copy phase, writes are mirrored to both source and destination to keep them in sync.

How does the router handle requests during migration?

The router holds a shard map that is versioned; during migration it may forward requests to the source until the ownership transfer is committed, then atomically update its local shard map and start forwarding to the destination. Requests that arrive during the brief ownership-flip window are either retried transparently by the router or returned a redirect error that the client retries, keeping the transition invisible to callers.

How is rebalancing throttled to avoid overloading the cluster?

A rebalancing controller enforces a maximum bytes-per-second migration rate per node pair using token-bucket rate limiting, and pauses or slows migration when source or destination node CPU/disk I/O exceeds a configured ceiling. Rebalancing is also scheduled during off-peak windows and the number of concurrent migrations across the cluster is capped to prevent network saturation.
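
The token-bucket throttling mentioned in the last answer can be sketched as follows. `TokenBucket` is a hypothetical class written for this example; the clock is passed in explicitly so the behavior is deterministic, whereas a real controller would typically read `time.monotonic()`.

```python
class TokenBucket:
    """Byte-rate limiter for one source/destination node pair."""

    def __init__(self, rate_bytes_per_s: float, capacity_bytes: float, now: float = 0.0):
        self.rate = rate_bytes_per_s
        self.capacity = capacity_bytes
        self.tokens = capacity_bytes   # start with a full bucket
        self.last = now

    def try_consume(self, nbytes: float, now: float) -> bool:
        """Return True if a batch of nbytes may be sent at time `now`."""
        elapsed = max(0.0, now - self.last)
        # Refill in proportion to elapsed time, never beyond capacity.
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last = now
        if nbytes <= self.tokens:
            self.tokens -= nbytes
            return True
        return False
```

The bulk-copy loop would call `try_consume` before sending each batch and sleep briefly whenever it returns False.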
