Database Sharding — Low-Level Design
Database sharding horizontally partitions data across multiple database instances, each holding a subset of the rows. It is applied when a single database cannot handle the required write throughput, storage volume, or connection count. This topic comes up in system design interviews at companies like Facebook, Uber, and Pinterest, where data volumes are massive.
Sharding Strategies
Strategy 1: Hash Sharding (most common)
shard_id = hash(shard_key) % num_shards
shard_key: user_id, order_id, or any stable entity identifier
Pros: even distribution, simple to implement
Cons: resharding is expensive — changing the shard count remaps most keys (half of them when doubling);
range queries require querying all shards
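The resharding cost of modulo hashing is easy to demonstrate. The sketch below uses raw integers as stand-ins for hashed keys (a uniform hash behaves the same in aggregate) and counts how many keys land on a different shard after the shard count changes:

```python
def moved_fraction(old_n, new_n, keys=100_000):
    """Fraction of keys whose shard assignment changes under hash % N."""
    moved = sum(1 for k in range(keys) if k % old_n != k % new_n)
    return moved / keys

print(moved_fraction(4, 8))  # doubling: 0.5 — half the keys move
print(moved_fraction(4, 5))  # adding one shard: 0.8 — most keys move
```

Note that adding a single shard is even worse than doubling: going from N to N+1 shards remaps roughly N/(N+1) of the keys.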
Strategy 2: Range Sharding
Shard 1: user_id 1 – 10,000,000
Shard 2: user_id 10,000,001 – 20,000,000
Pros: range queries efficient on a single shard
Cons: hot spots (new user_ids always go to the highest shard)
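Range-shard routing is typically a binary search over shard boundaries. A minimal sketch, assuming the hypothetical boundaries above (upper bound per shard, inclusive):

```python
import bisect

# Hypothetical upper bounds (inclusive) of user_id per shard, matching the
# example ranges above: shard 0 holds ids 1-10M, shard 1 holds 10M+1-20M, ...
SHARD_UPPER_BOUNDS = [10_000_000, 20_000_000, 30_000_000]

def range_shard_for(user_id):
    # bisect_left returns the first shard whose upper bound covers this id
    return bisect.bisect_left(SHARD_UPPER_BOUNDS, user_id)

print(range_shard_for(5_000_000))   # shard 0
print(range_shard_for(10_000_001))  # shard 1
```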
Strategy 3: Directory / Lookup Table
Maintain a mapping: entity_id → shard_id in a fast lookup store (Redis)
Pros: flexible; can rebalance by updating the mapping
Cons: lookup store is a single point of failure; extra hop per query
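The directory approach reduces to a key-value lookup in front of the router. A minimal sketch — in production the mapping would live in Redis or a replicated config store; a plain dict stands in here, and all names are illustrative:

```python
class DirectoryRouter:
    """Directory-based sharding: entity_id -> shard_id via an explicit map."""

    def __init__(self, mapping, default_shard=0):
        self.mapping = mapping            # entity_id -> shard_id
        self.default_shard = default_shard

    def shard_for(self, entity_id):
        return self.mapping.get(entity_id, self.default_shard)

    def move(self, entity_id, new_shard):
        # Rebalancing is just a mapping update (after the row is copied over)
        self.mapping[entity_id] = new_shard

router = DirectoryRouter({"user:1": 2})
print(router.shard_for("user:1"))  # 2
router.move("user:1", 3)
print(router.shard_for("user:1"))  # 3
```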
Most production systems use consistent hashing (a variant of hash sharding)
which minimizes data movement when adding or removing shards.
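A minimal consistent-hash ring sketch (not production code): each shard gets `vnodes` virtual points on the ring, and a key maps to the first point clockwise from its hash. The shard names are illustrative:

```python
import bisect
import hashlib

class ConsistentHashRing:
    def __init__(self, shards, vnodes=100):
        ring = []
        for shard in shards:
            for v in range(vnodes):
                # Each shard owns many scattered points on the ring
                ring.append((self._hash(f"{shard}#{v}"), shard))
        ring.sort()
        self._points = [p for p, _ in ring]
        self._shards = [s for _, s in ring]

    @staticmethod
    def _hash(key):
        return int(hashlib.md5(str(key).encode()).hexdigest(), 16)

    def shard_for(self, key):
        # First ring point clockwise from the key's hash (wrapping around)
        i = bisect.bisect(self._points, self._hash(key)) % len(self._points)
        return self._shards[i]

# Adding a fifth shard moves only ~1/5 of the keys:
before = ConsistentHashRing(["shard0", "shard1", "shard2", "shard3"])
after = ConsistentHashRing(["shard0", "shard1", "shard2", "shard3", "shard4"])
moved = sum(1 for k in range(10_000)
            if before.shard_for(k) != after.shard_for(k))
```

Contrast this with `hash % N`, where going from 4 to 5 shards would remap roughly 80% of keys.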
Shard Router
import hashlib
from sqlalchemy import create_engine

SHARD_CONFIGS = {
    0: 'postgresql://shard0:5432/mydb',
    1: 'postgresql://shard1:5432/mydb',
    2: 'postgresql://shard2:5432/mydb',
    3: 'postgresql://shard3:5432/mydb',
}

class ShardRouter:
    def __init__(self, num_shards):
        self.num_shards = num_shards
        self.connections = {i: create_engine(url) for i, url in SHARD_CONFIGS.items()}

    def shard_for_key(self, shard_key):
        """Deterministic: the same key always maps to the same shard."""
        h = int(hashlib.md5(str(shard_key).encode()).hexdigest(), 16)
        return h % self.num_shards

    def connection(self, shard_key):
        return self.connections[self.shard_for_key(shard_key)]

router = ShardRouter(num_shards=4)

def get_user(user_id):
    conn = router.connection(user_id)
    return conn.execute("SELECT * FROM User WHERE id=%(id)s", {'id': user_id})

def create_order(user_id, items):
    # Co-locate order with user: same shard_key ensures the order lands on the user's shard
    conn = router.connection(user_id)
    return conn.execute("""
        INSERT INTO Order (user_id, ...) VALUES (%(uid)s, ...)
    """, {'uid': user_id})
Cross-Shard Queries
-- Problem: "find all orders placed in the last 24 hours" requires
-- querying all shards and merging results
from concurrent.futures import ThreadPoolExecutor, as_completed

def get_recent_orders(since):
    results = []
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = {
            executor.submit(query_shard, shard_id, since): shard_id
            for shard_id in range(router.num_shards)
        }
        for future in as_completed(futures):
            results.extend(future.result())
    return sorted(results, key=lambda o: o.created_at, reverse=True)

def query_shard(shard_id, since):
    conn = router.connections[shard_id]
    return conn.execute("""
        SELECT * FROM Order WHERE created_at > %(since)s
        ORDER BY created_at DESC LIMIT 1000
    """, {'since': since})
-- For heavy cross-shard analytics: replicate all shards to a
-- central data warehouse (BigQuery, Redshift) for OLAP queries.
-- Never run aggregation queries across production shards.
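When a cross-shard aggregate is unavoidable, have each shard compute a partial aggregate and combine at the application layer. A sketch with hypothetical per-shard (count, sum) results:

```python
def combine_partials(partials):
    """Combine per-shard (count, sum) pairs into a global count and average."""
    total_count = sum(c for c, _ in partials)
    total_sum = sum(s for _, s in partials)
    avg = total_sum / total_count if total_count else 0.0
    return total_count, avg

# Hypothetical results from three shards: (row_count, sum_of_amounts)
shard_results = [(10, 250.0), (7, 140.0), (3, 90.0)]
count, avg = combine_partials(shard_results)
print(count, avg)  # 20 24.0
```

Note this only works for decomposable aggregates (COUNT, SUM, MIN, MAX); medians and percentiles need sketches or the warehouse.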
Resharding: Adding a New Shard
-- Doubling from 4 to 8 shards: each existing shard splits into two.
-- For hash % N: shard 0 (hash % 4 == 0) splits into:
-- new shard 0 (hash % 8 == 0) and new shard 4 (hash % 8 == 4)
-- Migration approach (online, zero-downtime):
-- Phase 1: Write to both old and new shards (dual-write)
-- Phase 2: Backfill: copy existing rows from old shard to new shard
-- Phase 3: Verify: compare row counts and checksums
-- Phase 4: Switch reads to new shards
-- Phase 5: Remove old shards
-- With consistent hashing: only K/N keys move when adding 1 shard
-- (K=total keys, N=new shard count). Much less data movement than
-- hash % N which moves half the data when doubling.
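The dual-write phase (Phase 1 above) can be sketched as a thin wrapper around the two stores. Plain dicts stand in for shard connections here; the class name and error handling are illustrative:

```python
class DualWriter:
    """Dual-write during online resharding: old shard stays the source of
    truth; failures writing to the new shard are left for the backfill and
    checksum-verification phases to repair."""

    def __init__(self, old_store, new_store):
        self.old = old_store
        self.new = new_store

    def write(self, key, value):
        self.old[key] = value          # must succeed: source of truth
        try:
            self.new[key] = value      # best-effort during migration
        except Exception:
            pass                       # reconciled later by backfill + verify

old, new = {}, {}
dw = DualWriter(old, new)
dw.write("user:42", {"name": "Ada"})
assert old["user:42"] == new["user:42"]
```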
Shard Key Selection
Good shard keys:
user_id → user data co-located; queries about one user stay on one shard
tenant_id → SaaS: all tenant data on one shard
order_id → if orders are accessed independently of users
Bad shard keys:
created_at → range sharding creates hot spots (today's shard gets all writes)
country_code → low cardinality; can't distribute evenly across many shards
status → low cardinality; 'active' orders swamp one shard
Co-location: put related data on the same shard.
Users and their orders: shard both by user_id
→ a query for "user X's recent orders" hits one shard, not all shards
Key Interview Points
- Sharding is a last resort: Before sharding, exhaust: read replicas (for read scale), vertical scaling (bigger instance), partitioning (one large table → many smaller tables in the same DB), caching (eliminate DB reads). Sharding adds enormous operational complexity.
- Shard key determines everything: The shard key cannot be changed after the fact. A wrong choice (low cardinality, hotspot-prone) is very expensive to fix. Choose based on access patterns, not intuition.
- Cross-shard operations are expensive: JOINs, aggregations, and transactions across shards require scatter-gather queries or distributed transactions. Design your data model so the common case is single-shard.
- Consistent hashing minimizes resharding cost: Hash % N remaps most keys whenever the shard count changes — 50% when doubling, and roughly N/(N+1) when adding a single shard. Consistent hashing moves only ~1/N of the data. For a production system, consistent hashing is worth the added complexity.