What Is Shard Rebalancing?
Shard rebalancing is the process of redistributing data across shards to address imbalances: hot shards that handle disproportionate load, oversized shards that exceed storage capacity, or uneven distribution after cluster topology changes. Rebalancing involves splitting large shards, merging small ones, and migrating data between nodes — all ideally with minimal downtime.
Rebalancing Triggers
Automatic rebalancing is triggered by threshold violations:
- Size trigger: shard size_bytes exceeds a maximum (e.g., 100 GB). Split into two halves.
- QPS trigger: shard QPS exceeds a hot-shard threshold (e.g., 10,000 req/s). Split to distribute load.
- Distribution trigger: a statistical imbalance metric across shards (for example, the coefficient of variation or the max-to-mean ratio of shard sizes) exceeds a tolerance. Redistribute via merge + split.
Triggers are evaluated by a background rebalancing controller that queries shard metrics at regular intervals (e.g., every minute).
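The following is a minimal sketch of one controller pass. It assumes the per-shard size and QPS readings have already been collected. ShardMetrics and evaluate_triggers are hypothetical names. The size and QPS limits are the example values above. The imbalance metric (coefficient of variation of shard sizes, with a tolerance of 0.5) is an assumed choice rather than one the text prescribes.

```python
from dataclasses import dataclass
from statistics import mean, pstdev
from typing import List, Tuple

GB = 1024 ** 3
SIZE_MAX = 100 * GB           # size trigger (example value from the text)
QPS_MAX = 10_000              # hot-shard QPS trigger (example value from the text)
IMBALANCE_TOLERANCE = 0.5     # assumed tolerance on the coefficient of variation


@dataclass
class ShardMetrics:
    shard_id: int
    size_bytes: int
    qps: float


def evaluate_triggers(shards: List[ShardMetrics]) -> List[Tuple[str, int]]:
    """One controller pass: return (action, shard_id); -1 marks a cluster-wide action."""
    actions: List[Tuple[str, int]] = []
    for s in shards:
        if s.size_bytes > SIZE_MAX:
            actions.append(("split:size", s.shard_id))
        elif s.qps > QPS_MAX:
            actions.append(("split:qps", s.shard_id))
    sizes = [s.size_bytes for s in shards]
    if len(sizes) > 1 and mean(sizes) > 0:
        # Coefficient of variation: population std-dev of sizes relative to the mean.
        if pstdev(sizes) / mean(sizes) > IMBALANCE_TOLERANCE:
            actions.append(("redistribute", -1))
    return actions
```

The size check runs before the QPS check. A shard that violates both triggers is therefore queued for a single split.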
Split: Dividing a Hot Shard
To split shard S into S1 (lower half) and S2 (upper half):
- Choose a split boundary key — typically the median key in the shard's key range (computed by sampling or exact count).
- Create a new destination shard S2 and begin dual-write: every write still lands on S, and writes whose key falls in the upper half are also mirrored to S2.
- Bulk-copy the upper-half data from S to S2.
- Apply ongoing delta writes during the copy (captured via replication log or CDC).
- Once S2 is caught up (delta lag near zero), perform atomic cutover: update the shard metadata to route upper-half traffic to S2.
- Keep S's copy of the upper-half rows (no longer written or routed to) for a brief rollback window, then delete them from S. S itself stays active for the lower half.
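The copy and catch-up steps can be illustrated in memory. This sketch makes two assumptions:

- Keys are strings compared lexicographically.
- A per-shard change log stands in for the replication log or CDC stream of step 4. Application-level dual-write is not modeled.

Shard, median_key, bulk_copy_upper, and catch_up are hypothetical names.

```python
from typing import Dict, List, Optional, Tuple

# Change-log entry: (sequence number, key, value); value None records a delete.
LogEntry = Tuple[int, str, Optional[str]]


class Shard:
    """In-memory shard whose writes are also appended to a CDC-style log."""

    def __init__(self) -> None:
        self.rows: Dict[str, str] = {}
        self.log: List[LogEntry] = []

    def write(self, key: str, value: Optional[str]) -> None:
        if value is None:
            self.rows.pop(key, None)
        else:
            self.rows[key] = value
        self.log.append((len(self.log) + 1, key, value))


def median_key(keys: List[str]) -> str:
    # Exact median; a large shard would sample keys instead.
    ordered = sorted(keys)
    return ordered[len(ordered) // 2]


def bulk_copy_upper(source: Shard, boundary: str) -> Tuple[Dict[str, str], int]:
    # Snapshot the upper half and remember the log position the snapshot reflects.
    position = len(source.log)
    return {k: v for k, v in source.rows.items() if k >= boundary}, position


def catch_up(dest: Dict[str, str], source: Shard, boundary: str, position: int) -> int:
    # Replay writes made after the snapshot whose keys fall in the upper half.
    for _seq, key, value in source.log[position:]:
        if key >= boundary:
            if value is None:
                dest.pop(key, None)
            else:
                dest[key] = value
    return len(source.log)  # new position; lag is len(source.log) minus this value
```

A controller would call catch_up repeatedly until the lag is near zero, then perform the cutover. The snapshot here is taken atomically in-process. A real long-running copy instead needs either a consistent snapshot or idempotent replay starting from a log position no later than the copy's start.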
Merge: Combining Small Shards
Merge combines two adjacent (by key range) sibling shards into one:
- Identify candidate shards: size_bytes < merge_threshold AND qps < merge_qps_threshold.
- Pick two siblings with adjacent key ranges.
- Bulk-copy all data from the smaller shard into the larger one.
- Update metadata to cover the combined key range.
- Decommission the now-empty source shard.
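Below is a sketch of candidate selection for the merge steps. It assumes each shard is described by its key range, size, and QPS. The 5 GB size threshold matches merge_size_threshold in the implementation sketch. The 100 req/s QPS threshold and the names ShardInfo, find_merge_pair, and merged_metadata are assumptions.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple

GB = 1024 ** 3


@dataclass
class ShardInfo:
    shard_id: int
    range_start: str
    range_end: str      # exclusive upper bound
    size_bytes: int
    qps: float


def find_merge_pair(
    shards: List[ShardInfo],
    merge_size_threshold: int = 5 * GB,
    merge_qps_threshold: float = 100.0,   # assumed value
) -> Optional[Tuple[ShardInfo, ShardInfo]]:
    def small(s: ShardInfo) -> bool:
        return s.size_bytes < merge_size_threshold and s.qps < merge_qps_threshold

    ordered = sorted(shards, key=lambda s: s.range_start)
    for left, right in zip(ordered, ordered[1:]):
        # Siblings must be adjacent: left's end is exactly right's start.
        if left.range_end == right.range_start and small(left) and small(right):
            return left, right
    return None


def merged_metadata(left: ShardInfo, right: ShardInfo) -> ShardInfo:
    # The larger shard survives and absorbs the smaller one's data and key range.
    survivor = left if left.size_bytes >= right.size_bytes else right
    return ShardInfo(
        survivor.shard_id,
        left.range_start,
        right.range_end,
        left.size_bytes + right.size_bytes,
        left.qps + right.qps,
    )
```

A production controller would likely also require the combined size to stay well below the split threshold. Otherwise a merge could immediately re-trigger a split.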
Data Migration with Dual-Write
During migration, the system must continue to serve reads and writes without data loss:
- Writes: dual-written to both source and destination based on key range. This ensures destination is current even while bulk copy is running.
- Reads: served from source until cutover. During cutover, a brief read-from-both window handles in-flight requests that were routed before metadata update propagated.
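One way to picture these routing rules is shown below. The sketch assumes a single migrating key range, with plain dicts standing in for the two shards. MigrationRouter is a hypothetical name. The read-from-both window is modeled as a fallback to the source when the destination misses a key.

```python
from typing import Dict, Optional


class MigrationRouter:
    """Routes reads and writes for one key range [lo, hi) moving from source to dest."""

    def __init__(self, source: Dict[str, str], dest: Dict[str, str], lo: str, hi: str) -> None:
        self.source, self.dest = source, dest
        self.lo, self.hi = lo, hi
        self.cut_over = False
        self.grace_window = False

    def _migrating(self, key: str) -> bool:
        return self.lo <= key < self.hi

    def write(self, key: str, value: str) -> None:
        if self.cut_over and self._migrating(key):
            self.dest[key] = value          # after cutover, dest owns the range
            return
        self.source[key] = value            # source stays authoritative until cutover
        if self._migrating(key):
            self.dest[key] = value          # mirror so dest stays current during the copy

    def read(self, key: str) -> Optional[str]:
        if self.cut_over and self._migrating(key):
            value = self.dest.get(key)
            if value is None and self.grace_window:
                value = self.source.get(key)  # read-from-both window
            return value
        return self.source.get(key)

    def cutover(self) -> None:
        self.cut_over = True
        self.grace_window = True

    def end_grace_window(self) -> None:
        self.grace_window = False
```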
Cutover and Atomicity
Cutover must be atomic from the routing layer's perspective. Typically:
- Increment the shard map version.
- Write the new routing entry together with the new version to the metadata store (etcd, ZooKeeper, or a central DB) in a single transaction, so that no client can observe one without the other.
- All routing clients cache the shard map and refresh on version mismatch or on receipt of an invalidation signal.
Stale-map errors (routing to old shard) are handled by returning a redirect response that includes the correct shard location, prompting the client to refresh its map.
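Below is a sketch of versioned cutover and redirect handling. It assumes string keys and an in-process MetadataStore. A compare-and-set on the whole map plays the role of the single transaction; etcd transactions and ZooKeeper versioned writes offer comparable primitives. ShardMap, cutover_split, ShardServer, RoutingClient, and Redirect are hypothetical names. Each server is assumed to check ownership against the authoritative map.

```python
import bisect
from dataclasses import dataclass
from typing import Dict, Tuple


@dataclass(frozen=True)
class ShardMap:
    version: int
    starts: Tuple[str, ...]     # sorted range starts; starts[0] == "" covers all keys
    shard_ids: Tuple[int, ...]  # shard_ids[i] owns [starts[i], starts[i + 1])

    def lookup(self, key: str) -> int:
        return self.shard_ids[bisect.bisect_right(self.starts, key) - 1]


class MetadataStore:
    """Stand-in for etcd/ZooKeeper: the whole map is replaced by one compare-and-set."""

    def __init__(self, initial: ShardMap) -> None:
        self._map = initial

    def get(self) -> ShardMap:
        return self._map

    def compare_and_set(self, expected_version: int, new_map: ShardMap) -> bool:
        if self._map.version != expected_version:
            return False                    # someone else changed the map first
        self._map = new_map
        return True


def cutover_split(store: MetadataStore, source_id: int, dest_id: int, boundary: str) -> bool:
    cur = store.get()
    # dest takes the upper part of source's range, so it slots in right after source.
    i = cur.shard_ids.index(source_id) + 1
    new = ShardMap(cur.version + 1,
                   cur.starts[:i] + (boundary,) + cur.starts[i:],
                   cur.shard_ids[:i] + (dest_id,) + cur.shard_ids[i:])
    return store.compare_and_set(cur.version, new)


class Redirect(Exception):
    def __init__(self, shard_id: int) -> None:
        super().__init__(shard_id)
        self.shard_id = shard_id


class ShardServer:
    def __init__(self, shard_id: int, store: MetadataStore) -> None:
        self.shard_id, self.store = shard_id, store

    def handle(self, key: str) -> str:
        owner = self.store.get().lookup(key)
        if owner != self.shard_id:
            raise Redirect(owner)           # stale client map: point it at the owner
        return f"shard {self.shard_id} served {key}"


class RoutingClient:
    def __init__(self, store: MetadataStore, servers: Dict[int, ShardServer]) -> None:
        self.store, self.servers = store, servers
        self.cached = store.get()

    def request(self, key: str) -> str:
        try:
            return self.servers[self.cached.lookup(key)].handle(key)
        except Redirect as r:
            self.cached = self.store.get()  # refresh the cached map, then retry once
            return self.servers[r.shard_id].handle(key)
```

The client keeps using its cached map until a server rejects a request. A single refresh and retry then brings the client up to date.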
Consistent Hashing and Virtual Nodes
With consistent hashing, each physical shard owns a set of virtual nodes (vnodes) on the hash ring. Rebalancing redistributes vnodes between shards — only the data owned by migrated vnodes needs to move, minimizing the migration scope. Adding a new physical node takes vnodes from existing nodes; only the affected key ranges migrate.
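A minimal vnode ring makes the migration-scope claim concrete. The MD5-derived 64-bit positions and the 64 vnodes per node are arbitrary choices for this sketch. HashRing is a hypothetical name.

```python
import bisect
import hashlib
from typing import Dict, List


def ring_hash(value: str) -> int:
    # First 8 bytes of MD5 as a 64-bit ring position (not for security).
    return int.from_bytes(hashlib.md5(value.encode()).digest()[:8], "big")


class HashRing:
    def __init__(self, vnodes_per_node: int = 64) -> None:
        self.vnodes_per_node = vnodes_per_node
        self._points: List[int] = []        # sorted vnode positions
        self._owner: Dict[int, str] = {}    # vnode position -> physical node

    def add_node(self, node: str) -> None:
        for i in range(self.vnodes_per_node):
            point = ring_hash(f"{node}#vnode{i}")
            bisect.insort(self._points, point)
            self._owner[point] = node

    def owner(self, key: str) -> str:
        # A key belongs to the first vnode clockwise from its hash, wrapping around.
        idx = bisect.bisect_right(self._points, ring_hash(key)) % len(self._points)
        return self._owner[self._points[idx]]
```

Take three nodes and add a fourth with the same vnode count. On average about a quarter of the keys move. Every moved key lands on the new node, and that property is what limits the migration scope.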
SQL Schema
```sql
CREATE TABLE Shard (
    id SERIAL PRIMARY KEY,
    shard_key_range_start TEXT NOT NULL,
    shard_key_range_end TEXT NOT NULL,
    node_id INT NOT NULL,
    status TEXT NOT NULL DEFAULT 'active' CHECK (status IN ('active','migrating','readonly','decommissioned')),
    row_count BIGINT NOT NULL DEFAULT 0,
    size_bytes BIGINT NOT NULL DEFAULT 0,
    qps DOUBLE PRECISION NOT NULL DEFAULT 0  -- recent request rate, read by the QPS trigger
);

CREATE TABLE ShardMigration (
    id SERIAL PRIMARY KEY,
    source_shard_id INT REFERENCES Shard(id),
    dest_shard_id INT REFERENCES Shard(id),
    phase TEXT NOT NULL DEFAULT 'copying' CHECK (phase IN ('copying','dual_write','cutover','complete')),
    rows_migrated BIGINT NOT NULL DEFAULT 0,
    started_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    completed_at TIMESTAMPTZ
);

CREATE INDEX idx_migration_source ON ShardMigration (source_shard_id, phase);
```
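The CHECK constraint limits phase to four values but does not enforce their order. Below is a sketch of an application-side guard. It assumes psycopg2-style %s placeholders. PHASES, next_phase, and advance_phase_sql are hypothetical names.

```python
from typing import Tuple

# Forward-only order of the ShardMigration.phase values allowed by the CHECK constraint.
PHASES = ("copying", "dual_write", "cutover", "complete")


def next_phase(current: str) -> str:
    i = PHASES.index(current)           # raises ValueError for an unknown phase
    if i == len(PHASES) - 1:
        raise ValueError("migration is already complete")
    return PHASES[i + 1]


def advance_phase_sql(migration_id: int, current: str) -> Tuple[str, Tuple[object, ...]]:
    # The "AND phase = %s" guard turns the UPDATE into a compare-and-set: if another
    # worker already advanced this migration, zero rows are updated.
    target = next_phase(current)
    extra = ", completed_at = now()" if target == "complete" else ""
    sql = f"UPDATE ShardMigration SET phase = %s{extra} WHERE id = %s AND phase = %s"
    return sql, (target, migration_id, current)
```

After executing the statement, a rowcount of 0 means another worker already moved the migration past the current phase.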
Python Implementation Sketch
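```python
from typing import Any, Dict, List


class ShardRebalancer:
    # Assumes `db` is a thin wrapper exposing fetchone/fetchall (dict-like rows)
    # and execute, and that `metadata_store.atomic_update_shard_map` performs the
    # versioned routing update described under "Cutover and Atomicity".
    def __init__(self, db, metadata_store):
        self.db = db
        self.metadata_store = metadata_store
        self.size_threshold = 100 * 1024 ** 3  # 100 GB: split trigger
        self.qps_threshold = 10000  # hot-shard QPS: split trigger
        self.merge_size_threshold = 5 * 1024 ** 3  # 5 GB: merge candidate

    def detect_hot_shards(self) -> List[Dict[str, Any]]:
        # Active shards that violate the size trigger or the QPS trigger.
        return self.db.fetchall(
            "SELECT * FROM Shard WHERE (size_bytes > %s OR qps > %s) AND status = 'active'",
            (self.size_threshold, self.qps_threshold),
        )

    def trigger_split(self, shard_id: int) -> int:
        shard = self.db.fetchone("SELECT * FROM Shard WHERE id = %s", (shard_id,))
        if shard is None:
            raise ValueError(f"shard {shard_id} does not exist")
        boundary = self._find_median_key(shard_id)
        # The new shard owns the upper half [boundary, end) on the least-loaded node.
        dest_shard_id = self.db.fetchone(
            "INSERT INTO Shard (shard_key_range_start, shard_key_range_end, node_id, status) "
            "VALUES (%s, %s, %s, 'migrating') RETURNING id",
            (boundary, shard['shard_key_range_end'], self._select_target_node()),
        )['id']
        migration_id = self.db.fetchone(
            "INSERT INTO ShardMigration (source_shard_id, dest_shard_id, phase) "
            "VALUES (%s, %s, 'copying') RETURNING id",
            (shard_id, dest_shard_id),
        )['id']
        return migration_id

    def migrate_shard(self, migration_id: int) -> None:
        migration = self._get_migration(migration_id)
        self._bulk_copy(migration['source_shard_id'], migration['dest_shard_id'])
        self._set_phase(migration_id, 'dual_write')
        self._apply_delta_sync(migration_id)  # replay writes made during the copy
        self._set_phase(migration_id, 'cutover')
        self.cutover(migration_id)

    def cutover(self, migration_id: int) -> None:
        migration = self._get_migration(migration_id)
        source_id = migration['source_shard_id']
        dest_id = migration['dest_shard_id']
        dest = self.db.fetchone("SELECT * FROM Shard WHERE id = %s", (dest_id,))
        # Flip routing first: one atomic, versioned write in the metadata store.
        self.metadata_store.atomic_update_shard_map(source_id, dest_id)
        # The source still serves the lower half, so shrink its key range rather
        # than marking it readonly; its stale upper-half rows are deleted after
        # the rollback window. Ideally these updates share one DB transaction.
        self.db.execute(
            "UPDATE Shard SET shard_key_range_end = %s WHERE id = %s",
            (dest['shard_key_range_start'], source_id),
        )
        self.db.execute("UPDATE Shard SET status = 'active' WHERE id = %s", (dest_id,))
        self.db.execute(
            "UPDATE ShardMigration SET phase = 'complete', completed_at = now() WHERE id = %s",
            (migration_id,),
        )

    def _get_migration(self, migration_id: int) -> Dict[str, Any]:
        return self.db.fetchone("SELECT * FROM ShardMigration WHERE id = %s", (migration_id,))

    def _set_phase(self, migration_id: int, phase: str) -> None:
        self.db.execute("UPDATE ShardMigration SET phase = %s WHERE id = %s", (phase, migration_id))

    def _find_median_key(self, shard_id: int) -> str:
        # Exact median over a row table assumed to be shard_data(shard_id, shard_key);
        # sampling a subset of keys is cheaper on very large shards.
        result = self.db.fetchone(
            "SELECT percentile_disc(0.5) WITHIN GROUP (ORDER BY shard_key) AS median "
            "FROM shard_data WHERE shard_id = %s",
            (shard_id,),
        )
        return result['median']

    def _select_target_node(self) -> int:
        # Least-loaded node, from a nodes(id, load_score) table assumed to exist.
        row = self.db.fetchone("SELECT id FROM nodes ORDER BY load_score ASC LIMIT 1")
        return row['id']

    def _bulk_copy(self, source_id: int, dest_id: int) -> None:
        # To implement: batched SELECT of the rows in dest's key range from the
        # source, INSERT into dest, updating ShardMigration.rows_migrated.
        raise NotImplementedError

    def _apply_delta_sync(self, migration_id: int) -> None:
        # To implement: replay CDC events from source to dest until lag ~ 0.
        raise NotImplementedError
```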
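The _bulk_copy stub above can be filled in with keyset pagination. Each batch resumes strictly after the last key copied, instead of using OFFSET. This sketch assumes a row table shard_data(shard_id, shard_key, payload). The names bulk_copy and in_memory_fetch, and the fetch callback signature, are hypothetical. The in-memory fetcher stands in for the SQL query shown in the comment.

```python
from typing import Callable, List, Optional, Tuple

Row = Tuple[str, str]  # (shard_key, payload)

# fetch(after_key, limit) returns up to `limit` rows in the range being copied,
# ordered by key and strictly after after_key (None = start of range). Against
# SQL this might be (omitting the last condition on the first batch):
#   SELECT shard_key, payload FROM shard_data
#   WHERE shard_id = %s AND shard_key >= %s AND shard_key < %s AND shard_key > %s
#   ORDER BY shard_key LIMIT %s
Fetch = Callable[[Optional[str], int], List[Row]]


def bulk_copy(fetch: Fetch, insert: Callable[[List[Row]], None], batch_size: int = 1000) -> int:
    copied, last_key = 0, None
    while True:
        batch = fetch(last_key, batch_size)
        if not batch:
            return copied
        insert(batch)
        copied += len(batch)          # could be persisted to ShardMigration.rows_migrated
        last_key = batch[-1][0]       # keyset cursor: next batch starts after this key


def in_memory_fetch(rows: List[Row], lo: str, hi: str) -> Fetch:
    # Test double for the SQL query above, restricted to [lo, hi).
    ordered = sorted(r for r in rows if lo <= r[0] < hi)

    def fetch(after: Optional[str], limit: int) -> List[Row]:
        return [r for r in ordered if after is None or r[0] > after][:limit]

    return fetch
```

Because the cursor is a key rather than an offset, a copy interrupted midway can resume from the last committed key.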