Database Sharding — Low-Level Design
Database sharding horizontally partitions data across multiple database instances, each holding a subset of the rows. It is used when a single database can no longer handle the required write throughput, storage volume, or connection count. It is a common design question at companies such as Facebook, Uber, and Pinterest, where data volumes are massive.
Sharding Strategies
Strategy 1: Hash Sharding (most common)
shard_id = hash(shard_key) % num_shards
shard_key: user_id, order_id, or any stable entity identifier
Pros: even distribution, simple to implement
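Cons: changing num_shards remaps most keys (half when doubling, about N/(N+1) when going from N to N+1 shards), so resharding means a large migration;
range queries on the shard key must hit every shard, because hashing scatters adjacent keys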
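Both the even distribution and the resharding cost can be checked by counting keys directly. This is a minimal sketch, assuming the same MD5-based hashing that the shard router later in this section uses and synthetic integer keys; `fraction_moved` is a hypothetical helper name.

```python
import hashlib

def shard_for_key(shard_key, num_shards):
    """hash(shard_key) % num_shards, with MD5 as a stable hash
    (Python's built-in hash() is randomized per process for strings)."""
    h = int(hashlib.md5(str(shard_key).encode()).hexdigest(), 16)
    return h % num_shards

def fraction_moved(keys, old_n, new_n):
    """Share of keys whose shard changes when num_shards goes from old_n to new_n."""
    moved = sum(1 for k in keys if shard_for_key(k, old_n) != shard_for_key(k, new_n))
    return moved / len(keys)

keys = range(20_000)  # synthetic integer shard keys

# Even distribution: count keys per shard with 4 shards
counts = [0] * 4
for k in keys:
    counts[shard_for_key(k, 4)] += 1
print(counts)  # four counts, each close to 5,000

print(f"4 -> 5 shards: {fraction_moved(keys, 4, 5):.2f} of keys move")  # close to 0.80
print(f"4 -> 8 shards: {fraction_moved(keys, 4, 8):.2f} of keys move")  # close to 0.50
```

Adding a single shard to a hash % N layout remaps roughly four fifths of the keys here, far more than the one-fifth share the new shard ends up owning.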
Strategy 2: Range Sharding
Shard 1: user_id 1 – 10,000,000
Shard 2: user_id 10,000,001 – 20,000,000
Pros: a range query on the shard key touches only one shard (or a few adjacent ones)
Cons: hot spots (with monotonically increasing user_ids, every new user lands on the highest shard)
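Routing for the range layout above can be sketched as a binary search over each shard's inclusive upper bound. The two bounds mirror the example shards; `UPPER_BOUNDS`, `shard_for_user`, and `shards_for_range` are hypothetical names for illustration.

```python
import bisect

# Inclusive upper bound of each range shard, ascending; index 0 is "Shard 1"
UPPER_BOUNDS = [10_000_000, 20_000_000]

def shard_for_user(user_id):
    """Return the 1-based shard number whose range contains user_id."""
    if user_id < 1:
        raise ValueError(f"user_id {user_id} is below the first range")
    i = bisect.bisect_left(UPPER_BOUNDS, user_id)
    if i == len(UPPER_BOUNDS):
        # No range covers this id yet: a new shard must be provisioned first
        raise ValueError(f"user_id {user_id} is beyond the last range")
    return i + 1

def shards_for_range(lo, hi):
    """Shards a query for lo <= user_id <= hi must touch: a contiguous run."""
    return list(range(shard_for_user(lo), shard_for_user(hi) + 1))

print(shard_for_user(10_000_000))               # 1 (upper bounds are inclusive)
print(shard_for_user(10_000_001))               # 2
print(shards_for_range(9_999_000, 10_001_000))  # [1, 2]
```

The "beyond the last range" error is the flip side of the hot-spot problem: with growing ids, every new write targets the last shard until another range is added.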
Strategy 3: Directory / Lookup Table
Maintain a mapping: entity_id → shard_id in a fast lookup store (e.g., Redis)
Pros: flexible; can rebalance by updating the mapping
Cons: the lookup store is a single point of failure unless replicated; extra hop per query
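A directory can be sketched as a key-to-shard map plus a per-shard load count. In this sketch a plain dict stands in for the lookup store, and the least-loaded placement policy, the class name, and the tenant names are all assumptions made for illustration.

```python
from collections import Counter

class ShardDirectory:
    """entity_id -> shard_id directory; a dict stands in for the lookup store."""

    def __init__(self, num_shards):
        self.num_shards = num_shards
        self.mapping = {}      # entity_id -> shard_id (a replicated store in production)
        self.load = Counter()  # shard_id -> number of entities mapped there

    def assign(self, entity_id):
        """Place a new entity on the least-loaded shard (ties go to the lowest id)."""
        shard_id = min(range(self.num_shards), key=lambda s: self.load[s])
        self.mapping[entity_id] = shard_id
        self.load[shard_id] += 1
        return shard_id

    def lookup(self, entity_id):
        """The extra hop: every query consults the directory before the shard."""
        return self.mapping[entity_id]

    def move(self, entity_id, new_shard):
        """Rebalance by rewriting one entry, after the entity's rows have been
        copied to new_shard (the copy itself is not shown)."""
        old_shard = self.mapping[entity_id]
        self.load[old_shard] -= 1
        self.load[new_shard] += 1
        self.mapping[entity_id] = new_shard

directory = ShardDirectory(num_shards=3)
for tenant in ["acme", "globex", "initech", "umbrella"]:
    directory.assign(tenant)
print(directory.lookup("umbrella"))  # 0: each shard held one tenant, so the tie goes to shard 0
directory.move("umbrella", 2)
print(directory.lookup("umbrella"))  # 2
```

Rebalancing is a single map update, which is the flexibility the pros line refers to; the cost is that every read depends on the directory being available.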
Many production systems use consistent hashing (a variant of hash sharding),
which minimizes data movement when adding or removing shards.
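A minimal consistent-hash ring can be built from a sorted list of hash positions and a binary search. This sketch assumes MD5 positions and 100 virtual nodes per shard (both choices are illustrative, not taken from any particular system); each key belongs to the first shard point clockwise from its own position.

```python
import bisect
import hashlib

def ring_hash(value):
    """Stable ring position (MD5); Python's hash() is randomized per process."""
    return int(hashlib.md5(str(value).encode()).hexdigest(), 16)

class ConsistentHashRing:
    def __init__(self, shards, vnodes=100):
        self.vnodes = vnodes  # virtual nodes per shard even out each shard's share
        self._points = []     # sorted ring positions
        self._owner = {}      # ring position -> shard name
        for shard in shards:
            self.add_shard(shard)

    def add_shard(self, shard):
        for i in range(self.vnodes):
            point = ring_hash(f"{shard}#{i}")
            self._owner[point] = shard
            bisect.insort(self._points, point)

    def shard_for_key(self, key):
        """Walk clockwise to the first shard point at or after the key's position."""
        i = bisect.bisect_left(self._points, ring_hash(key))
        if i == len(self._points):
            i = 0  # past the last point: wrap around to the start of the ring
        return self._owner[self._points[i]]

keys = range(20_000)
ring = ConsistentHashRing(["shard0", "shard1", "shard2", "shard3"])
before = {k: ring.shard_for_key(k) for k in keys}
ring.add_shard("shard4")
after = {k: ring.shard_for_key(k) for k in keys}

moved = [k for k in keys if before[k] != after[k]]
print(f"{len(moved) / len(keys):.2f} of keys moved")  # roughly 0.2 (about 1/5)
print(all(after[k] == "shard4" for k in moved))       # True
```

Every key that moves goes to the new shard; keys never shuffle between existing shards, which is why only about 1/N of the data has to be copied.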
Shard Router
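import hashlib
from sqlalchemy import create_engine, text
SHARD_CONFIGS = {
    0: 'postgresql://shard0:5432/mydb',
    1: 'postgresql://shard1:5432/mydb',
    2: 'postgresql://shard2:5432/mydb',
    3: 'postgresql://shard3:5432/mydb',
}
class ShardRouter:
    def __init__(self, num_shards):
        # The modulus must match the configured shards, or some keys would
        # map to a shard id that has no engine
        if num_shards != len(SHARD_CONFIGS):
            raise ValueError("num_shards must equal len(SHARD_CONFIGS)")
        self.num_shards = num_shards
        # One SQLAlchemy engine (with its own connection pool) per shard
        self.connections = {i: create_engine(url) for i, url in SHARD_CONFIGS.items()}
    def shard_for_key(self, shard_key):
        """Deterministic: same key always maps to same shard."""
        # MD5 gives a stable, well-spread hash (not used for security);
        # Python's built-in hash() is randomized per process for strings
        h = int(hashlib.md5(str(shard_key).encode()).hexdigest(), 16)
        return h % self.num_shards
    def connection(self, shard_key):
        """Return the engine of the shard that owns shard_key."""
        return self.connections[self.shard_for_key(shard_key)]
router = ShardRouter(num_shards=4)
def get_user(user_id):
    engine = router.connection(user_id)
    with engine.connect() as conn:
        # USER and ORDER are reserved words in PostgreSQL, hence the quoted table names
        return conn.execute(
            text('SELECT * FROM "User" WHERE id = :id'), {'id': user_id}
        ).mappings().first()
def create_order(user_id, items):
    # Co-locate order with user: same shard_key ensures order is on user's shard
    engine = router.connection(user_id)
    with engine.begin() as conn:  # commits on success, rolls back on error
        conn.execute(text("""
            INSERT INTO "Order" (user_id, ...) VALUES (:uid, ...)
        """), {'uid': user_id})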
Cross-Shard Queries
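# Problem: "find all orders placed in the last 24 hours" requires
# querying all shards and merging results
from concurrent.futures import ThreadPoolExecutor, as_completed
from sqlalchemy import text
def get_recent_orders(since, limit=1000):
    results = []
    # One worker per shard, so every shard is queried in parallel
    with ThreadPoolExecutor(max_workers=router.num_shards) as executor:
        futures = {
            executor.submit(query_shard, shard_id, since, limit): shard_id
            for shard_id in range(router.num_shards)
        }
        for future in as_completed(futures):
            results.extend(future.result())  # re-raises if that shard's query failed
    # Each shard sent its newest `limit` rows, so the global newest `limit`
    # rows are among them: sort the union and trim it
    return sorted(results, key=lambda o: o.created_at, reverse=True)[:limit]
def query_shard(shard_id, since, limit):
    engine = router.connections[shard_id]
    with engine.connect() as conn:
        return conn.execute(text("""
            SELECT * FROM "Order" WHERE created_at > :since
            ORDER BY created_at DESC LIMIT :limit
        """), {'since': since, 'limit': limit}).all()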
# For heavy cross-shard analytics: replicate all shards into a
# central data warehouse (e.g., BigQuery, Redshift) and run OLAP queries there.
# Avoid running large aggregation queries across production shards.
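Because each shard query orders by created_at DESC and applies the same LIMIT, the per-shard results are already sorted, and a k-way merge can replace the concatenate-and-sort step. This sketch swaps in `heapq.merge` from Python's standard library; plain dicts with made-up dates stand in for database rows, and `merge_recent` is a hypothetical helper name.

```python
import heapq
from datetime import datetime
from itertools import islice

def merge_recent(per_shard_rows, limit):
    """Merge per-shard lists that are already sorted newest-first.

    Each shard returned at most `limit` rows, so the newest `limit` rows
    overall are among them; heapq.merge walks the lists lazily and islice
    stops after `limit` rows, without sorting the whole union."""
    merged = heapq.merge(*per_shard_rows, key=lambda o: o["created_at"], reverse=True)
    return list(islice(merged, limit))

# Hypothetical rows from two shards, each sorted by created_at DESC
shard_a = [{"id": 3, "created_at": datetime(2024, 5, 3)},
           {"id": 1, "created_at": datetime(2024, 5, 1)}]
shard_b = [{"id": 4, "created_at": datetime(2024, 5, 4)},
           {"id": 2, "created_at": datetime(2024, 5, 2)}]

top = merge_recent([shard_a, shard_b], limit=3)
print([o["id"] for o in top])  # [4, 3, 2]
```

With S shards and a limit of k, the merge touches at most k rows from the heads of the lists instead of sorting all S·k rows.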
Resharding: Adding a New Shard
-- Doubling from 4 to 8 shards: each existing shard splits into two.
-- For hash % N: shard 0 (hash % 4 == 0) splits into:
-- new shard 0 (hash % 8 == 0) and new shard 4 (hash % 8 == 4)
-- Migration approach (online, zero-downtime):
-- Phase 1: Write to both old and new shards (dual-write)
-- Phase 2: Backfill: copy existing rows from old shard to new shard
-- Phase 3: Verify: compare row counts and checksums
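-- Phase 4: Switch reads to the new hash % 8 mapping
-- Phase 5: Stop dual-writes; delete the rows that moved off each original
--          shard (or decommission the old instances if all 8 shards are new)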
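-- With consistent hashing: only about K/N keys move when adding 1 shard
-- (K = total keys, N = new shard count). With hash % N, going from N-1 to N
-- shards remaps about (N-1)/N of all keys. Doubling moves half the keys
-- either way (the new shards must end up holding half the data), so the
-- gain from consistent hashing is that shards can be added one at a time cheaply.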
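The five-phase migration can be sketched in memory to show the ordering. This sketch assumes the split model from the 4 → 8 example (shards 0-3 are kept and 4-7 are added, so cleanup means deleting the rows that moved off shards 0-3); dicts stand in for shards, and `Resharder` and its methods are hypothetical names.

```python
import hashlib

OLD_N, NEW_N = 4, 8

def h(key):
    """Stable MD5-based hash of the shard key."""
    return int(hashlib.md5(str(key).encode()).hexdigest(), 16)

def checksum(rows):
    """Checksum of a shard's rows, independent of insertion order."""
    return hashlib.md5(repr(sorted(rows.items())).encode()).hexdigest()

class Resharder:
    def __init__(self):
        # shard_id -> {key: row}; shards 0-3 hold existing data, 4-7 start empty
        self.shards = {i: {} for i in range(NEW_N)}
        self.dual_write = True
        self.reads_use_new_layout = False

    def write(self, key, row):
        if self.dual_write:
            # Phase 1: also write to the key's old shard (the same shard for half the keys)
            self.shards[h(key) % OLD_N][key] = row
        self.shards[h(key) % NEW_N][key] = row

    def backfill(self):
        # Phase 2: a row on old shard s can only move to s + 4,
        # because h % 8 is always either h % 4 or h % 4 + 4
        for s in range(OLD_N):
            for key, row in self.shards[s].items():
                dest = h(key) % NEW_N
                if dest != s:
                    self.shards[dest].setdefault(key, row)  # never overwrite a dual-written row

    def verify(self):
        # Phase 3: each new shard must match the slice of its source shard that maps to it
        for t in range(OLD_N, NEW_N):
            expected = {k: r for k, r in self.shards[t - OLD_N].items() if h(k) % NEW_N == t}
            actual = self.shards[t]
            if len(expected) != len(actual) or checksum(expected) != checksum(actual):
                return False
        return True

    def read(self, key):
        # Phase 4: reads follow whichever layout the flag selects
        n = NEW_N if self.reads_use_new_layout else OLD_N
        return self.shards[h(key) % n].get(key)

    def cleanup(self):
        # Phase 5: stop dual-writing, then drop rows that moved off shards 0-3
        self.dual_write = False
        for s in range(OLD_N):
            for key in [k for k in self.shards[s] if h(k) % NEW_N != s]:
                del self.shards[s][key]

m = Resharder()
for key in range(1_000):  # pre-existing rows, placed with hash % 4
    m.shards[h(key) % OLD_N][key] = {"id": key, "v": 0}
m.write(5, {"id": 5, "v": 1})  # a live update arriving mid-migration
m.backfill()
print(m.verify())  # True
m.reads_use_new_layout = True
m.cleanup()
print(m.read(5))   # {'id': 5, 'v': 1}
```

Cleanup runs only after reads have switched, so until then the old copies remain available if the new layout has to be rolled back.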
Shard Key Selection
Good shard keys:
user_id → user data co-located; queries about one user stay on one shard
tenant_id → SaaS: all tenant data on one shard
order_id → if orders are accessed independently of users
Bad shard keys:
created_at → range sharding creates hot spots (today's shard gets all writes)
country_code → low cardinality; can't distribute evenly across many shards
status → low cardinality; 'active' orders swamp one shard
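The cardinality problem with the bad keys above can be checked before committing to a key: count how many rows each shard would receive under each candidate. This sketch uses synthetic orders (10,000 users, a status column that is 90% 'active') and hash % 8 routing; the data and helper names are hypothetical.

```python
import hashlib
from collections import Counter

def shard_of(value, num_shards):
    """hash % num_shards with MD5, as elsewhere in this section."""
    return int(hashlib.md5(str(value).encode()).hexdigest(), 16) % num_shards

def rows_per_shard(rows, key, num_shards=8):
    """How many rows each shard would hold if `key` were the shard key."""
    counts = Counter(shard_of(row[key], num_shards) for row in rows)
    return [counts[s] for s in range(num_shards)]

# Synthetic orders: 10,000 users with 5 orders each; 90% 'active', 10% 'cancelled'
orders = [
    {"user_id": i % 10_000, "status": "active" if i % 10 else "cancelled"}
    for i in range(50_000)
]

print(rows_per_shard(orders, "user_id"))  # eight counts, each near 6,250
print(rows_per_shard(orders, "status"))   # at most two non-empty shards
```

With only two distinct status values, at most two of the eight shards can hold data, and the 'active' shard alone receives at least 90% of the rows.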
Co-location: put related data on the same shard.
Users and their orders: shard both by user_id
→ a query for "user X's recent orders" hits one shard, not all shards
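Co-location can be demonstrated end to end without a database server. In this sketch one in-memory SQLite database per shard stands in for PostgreSQL, the routing mirrors `ShardRouter.shard_for_key`, and the `users`/`orders` tables and helper functions are hypothetical.

```python
import hashlib
import sqlite3

NUM_SHARDS = 4
# One in-memory SQLite database per shard stands in for the PostgreSQL shards
shards = [sqlite3.connect(":memory:") for _ in range(NUM_SHARDS)]
for db in shards:
    db.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
    db.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, user_id INTEGER, total REAL)")

def shard_index(shard_key):
    """MD5-mod routing, as in ShardRouter.shard_for_key."""
    return int(hashlib.md5(str(shard_key).encode()).hexdigest(), 16) % NUM_SHARDS

def insert_user(user_id, name):
    db = shards[shard_index(user_id)]
    with db:  # the connection context manager commits on success
        db.execute("INSERT INTO users (id, name) VALUES (?, ?)", (user_id, name))

def insert_order(order_id, user_id, total):
    # Routed by user_id, not order_id, so the order lands on its user's shard
    db = shards[shard_index(user_id)]
    with db:
        db.execute("INSERT INTO orders (id, user_id, total) VALUES (?, ?, ?)",
                   (order_id, user_id, total))

def user_order_total(user_id):
    # Because of co-location, a plain JOIN on one shard answers the query
    db = shards[shard_index(user_id)]
    return db.execute(
        "SELECT u.name, SUM(o.total) FROM users u JOIN orders o ON o.user_id = u.id "
        "WHERE u.id = ? GROUP BY u.name",
        (user_id,),
    ).fetchone()

insert_user(42, "alice")
insert_order(1, 42, 10.0)
insert_order(2, 42, 5.5)
print(user_order_total(42))  # ('alice', 15.5)
```

Had orders been routed by order_id instead, the same question would need a scatter-gather across all shards plus an application-side join.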
Key Interview Points
- Sharding is a last resort: Before sharding, exhaust the alternatives: read replicas (for read scale), vertical scaling (bigger instance), partitioning (one large table → many smaller tables in the same DB), and caching (to absorb repeated reads). Sharding adds substantial operational complexity.
- Shard key determines everything: Changing the shard key later means re-routing and migrating every row, so a wrong choice (low cardinality, hotspot-prone) is very expensive to fix. Choose based on access patterns, not intuition.
- Cross-shard operations are expensive: JOINs, aggregations, and transactions across shards require scatter-gather queries or distributed transactions. Design your data model so the common case is single-shard.
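- Consistent hashing minimizes resharding cost: Going from N to N+1 shards, hash % N remaps about N/(N+1) of the data (half when doubling). Consistent hashing moves only about 1/(N+1), the share the new shard takes over. For a production system that grows incrementally, consistent hashing is worth the added complexity.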