Distributed Counter System: Low-Level Design
A distributed counter tracks a numeric value that many servers increment or decrement concurrently — page views, likes, inventory quantities, API rate limits, and active user counts all depend on counters that must be correct under high write contention. This design covers the trade-off spectrum from exact Postgres counters to approximate probabilistic structures, and shows how to choose the right approach based on required accuracy, write throughput, and read latency.
Core Data Model
-- Exact counter for critical values (inventory, payment limits)
CREATE TABLE Counter (
    counter_id VARCHAR(200) PRIMARY KEY,  -- "inventory:SKU-123", "likes:post:456"
    value      BIGINT NOT NULL DEFAULT 0,
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Sharded counter for high-throughput approximate values (view counts, likes)
CREATE TABLE CounterShard (
    counter_id VARCHAR(200) NOT NULL,
    shard_id   SMALLINT NOT NULL,  -- 0..N_SHARDS-1
    value      BIGINT NOT NULL DEFAULT 0,
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    PRIMARY KEY (counter_id, shard_id)
);

-- Delta buffer table for batched increments (very high write rates)
CREATE TABLE CounterDelta (
    delta_id   BIGSERIAL,
    counter_id VARCHAR(200) NOT NULL,
    delta      BIGINT NOT NULL,  -- +1, -1, or bulk adjustment
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    -- Postgres requires the partition key in the primary key
    PRIMARY KEY (delta_id, created_at)
) PARTITION BY RANGE (created_at);

CREATE INDEX ON CounterDelta (counter_id, created_at);
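The CounterDelta buffer is drained by a periodic job that groups pending rows per counter and applies one summed update each. The grouping step can be sketched in isolation; the function name and the in-memory row list are illustrative, standing in for what a SELECT ... GROUP BY over the table would return:

```python
from collections import defaultdict

def aggregate_deltas(rows):
    """Collapse buffered (counter_id, delta) rows into one net delta per
    counter, mirroring what the flush job's
    SELECT counter_id, SUM(delta) FROM CounterDelta GROUP BY counter_id
    would produce before the net values are applied to Counter."""
    totals = defaultdict(int)
    for counter_id, delta in rows:
        totals[counter_id] += delta
    return dict(totals)
```

The flush job would then run one atomic UPDATE per counter with its net delta and delete (or drop the partition containing) the consumed rows.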
Strategy 1: Exact Postgres Counter (Low Write Rate)
def increment(counter_id: str, delta: int = 1) -> int:
    """
    Atomic increment using INSERT ... ON CONFLICT DO UPDATE.
    Correct under concurrent writes; suitable for up to ~5K writes/second.
    Returns the new value.
    """
    row = db.fetchone("""
        INSERT INTO Counter (counter_id, value, updated_at)
        VALUES (%s, %s, NOW())
        ON CONFLICT (counter_id) DO UPDATE
        SET value = Counter.value + EXCLUDED.value,
            updated_at = NOW()
        RETURNING value
    """, (counter_id, delta))
    return row['value']
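The same upsert pattern can be exercised locally against SQLite as a stand-in for Postgres, which is a convenient way to sanity-check the increment logic without a server. Note the SQLite dialect differences: ? placeholders and a lowercase excluded alias (this helper setup is illustrative, not part of the design above):

```python
import sqlite3

def make_counter_db():
    # In-memory stand-in for the Postgres Counter table
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE Counter (counter_id TEXT PRIMARY KEY, value INTEGER NOT NULL DEFAULT 0)")
    return conn

def increment(conn, counter_id, delta=1):
    # Upsert: insert the row if new, otherwise add delta to the existing value
    conn.execute("""
        INSERT INTO Counter (counter_id, value) VALUES (?, ?)
        ON CONFLICT (counter_id) DO UPDATE SET value = value + excluded.value
    """, (counter_id, delta))
    return conn.execute(
        "SELECT value FROM Counter WHERE counter_id = ?", (counter_id,)).fetchone()[0]
```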
def decrement_with_floor(counter_id: str, delta: int = 1, floor: int = 0) -> int:
    """
    Decrement but never go below floor. Used for inventory: can't sell what you don't have.
    The guard WHERE value >= floor + delta makes the check-and-decrement a single
    atomic statement, so the value can never drop below floor.
    """
    row = db.fetchone("""
        UPDATE Counter
        SET value = value - %s, updated_at = NOW()
        WHERE counter_id = %s AND value >= %s
        RETURNING value
    """, (delta, counter_id, floor + delta))
    if not row:
        raise InsufficientInventoryError(
            f"Counter {counter_id} cannot be decremented by {delta} without dropping below {floor}")
    return row['value']
def get_count(counter_id: str) -> int:
    row = db.fetchone("SELECT value FROM Counter WHERE counter_id = %s", (counter_id,))
    return row['value'] if row else 0
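The guarded-decrement pattern can likewise be checked against SQLite. Because older SQLite builds lack UPDATE ... RETURNING, this sketch detects a refused decrement via rowcount instead; the setup helper and exception class are illustrative:

```python
import sqlite3

class InsufficientInventoryError(Exception):
    pass

def make_inventory_db(counter_id, initial):
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE Counter (counter_id TEXT PRIMARY KEY, value INTEGER NOT NULL)")
    conn.execute("INSERT INTO Counter VALUES (?, ?)", (counter_id, initial))
    return conn

def decrement_with_floor(conn, counter_id, delta=1, floor=0):
    # The WHERE guard makes check-and-decrement one atomic statement:
    # the row is updated only when the result would stay at or above floor.
    cur = conn.execute(
        "UPDATE Counter SET value = value - ? WHERE counter_id = ? AND value >= ?",
        (delta, counter_id, floor + delta))
    if cur.rowcount == 0:
        raise InsufficientInventoryError(counter_id)
    return conn.execute(
        "SELECT value FROM Counter WHERE counter_id = ?", (counter_id,)).fetchone()[0]
```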
Strategy 2: Redis Counter (High Write Rate, Eventual Persistence)
import redis

r = redis.Redis(decode_responses=True)
PERSIST_INTERVAL = 60  # seconds

def redis_increment(counter_id: str, delta: int = 1) -> int:
    """
    Redis INCRBY is atomic and handles >1M increments/second.
    Returns the new value. Persistence to Postgres happens asynchronously.
    """
    return r.incrby(f"counter:{counter_id}", delta)

def redis_get(counter_id: str) -> int:
    val = r.get(f"counter:{counter_id}")
    return int(val) if val is not None else get_count(counter_id)  # fall back to DB
def persist_redis_counters():
    """
    Run every PERSIST_INTERVAL seconds via cron. Flush Redis counter values to Postgres.
    Pattern: read the Redis value, write it to the DB, do NOT delete the Redis key.
    Redis is the source of truth for the current value; the DB is the durable backup.
    """
    cursor = 0
    while True:
        cursor, keys = r.scan(cursor, match='counter:*', count=200)
        for key in keys:
            counter_id = key.removeprefix('counter:')
            value = r.get(key)
            if value is None:
                continue
            db.execute("""
                INSERT INTO Counter (counter_id, value, updated_at)
                VALUES (%s, %s, NOW())
                ON CONFLICT (counter_id) DO UPDATE
                SET value = EXCLUDED.value, updated_at = NOW()
            """, (counter_id, int(value)))
        if cursor == 0:
            break
Strategy 3: Sharded Counter (Extremely High Write Throughput)
import hashlib

N_SHARDS = 16

def sharded_increment(counter_id: str, shard_key: str, delta: int = 1):
    """
    Distribute writes across N_SHARDS rows to eliminate single-row contention.
    shard_key is any per-request identifier (user_id, request_id); hashing it
    routes each request to a consistent shard.
    Reads must SUM all shards; each write touches only one row.
    """
    shard_id = int(hashlib.md5(shard_key.encode()).hexdigest()[:4], 16) % N_SHARDS
    db.execute("""
        INSERT INTO CounterShard (counter_id, shard_id, value)
        VALUES (%s, %s, %s)
        ON CONFLICT (counter_id, shard_id) DO UPDATE
        SET value = CounterShard.value + EXCLUDED.value,
            updated_at = NOW()
    """, (counter_id, shard_id, delta))

def sharded_get(counter_id: str) -> int:
    """Sum all shards. Slightly stale results are fine for view counts / likes."""
    row = db.fetchone("""
        SELECT COALESCE(SUM(value), 0) AS total
        FROM CounterShard WHERE counter_id = %s
    """, (counter_id,))
    return row['total']

# With N_SHARDS=16, write contention on any single row is reduced by 16x.
# Postgres sustains roughly 5K updates/second per hot row, so sharding raises
# the per-counter ceiling to roughly 80K/second.
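Because sharded_get runs a SUM over all shard rows, hot counters benefit from a short-lived read cache in front of it. This sketch uses a process-local dict and an injectable clock so the logic is testable; the 10-second TTL is an illustrative choice, and a multi-server deployment would keep the cache in Redis (SETEX) instead:

```python
import time

CACHE_TTL = 10.0   # seconds a summed value may be served stale
_cache = {}        # counter_id -> (total, fetched_at)

def cached_sharded_get(counter_id, fetch_sum, now=time.monotonic):
    """Serve reads from the cache while fresh; otherwise run the SUM query
    (fetch_sum) once and cache the result for CACHE_TTL seconds."""
    entry = _cache.get(counter_id)
    t = now()
    if entry is not None and t - entry[1] < CACHE_TTL:
        return entry[0]
    total = fetch_sum(counter_id)
    _cache[counter_id] = (total, t)
    return total
```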
Strategy 4: HyperLogLog for Unique Counts (Cardinality Estimation)
# Counting distinct users who viewed a post with an exact COUNT(DISTINCT user_id)
# needs O(N) memory. Redis HyperLogLog gives ~0.81% standard error with at most
# 12 KB of memory per counter.

def hll_add(counter_id: str, element: str):
    """Add an element to the HyperLogLog. Redis PFADD is O(1)."""
    r.pfadd(f"hll:{counter_id}", element)

def hll_count(counter_id: str) -> int:
    """Return the estimated cardinality. Standard error ~0.81%."""
    return r.pfcount(f"hll:{counter_id}")
# Usage: count unique viewers per post
# hll_add(f"post_views:{post_id}", str(user_id))
# unique_viewers = hll_count(f"post_views:{post_id}")
# Merge multiple HLLs: count unique viewers across all posts by author
# r.pfmerge("hll:author_views:42", *[f"hll:post_views:{pid}" for pid in post_ids])
# total_unique = r.pfcount("hll:author_views:42")
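The leading-zeros idea behind PFADD/PFCOUNT can be reproduced in a few lines. This toy estimator uses 256 registers rather than Redis's 16,384, so its standard error is roughly 6.5% instead of 0.81%; it is a sketch for intuition, not a replacement for the Redis implementation:

```python
import hashlib
import math

P = 8                              # register-index bits
M = 1 << P                         # 256 registers -> std error ~1.04/sqrt(M) ~ 6.5%
ALPHA = 0.7213 / (1 + 1.079 / M)   # bias-correction constant for m >= 128

def hll_estimate(elements):
    """Toy HyperLogLog: the top P hash bits pick a register; the register
    stores the longest run of leading zeros (+1) seen in the remaining bits."""
    regs = [0] * M
    for e in elements:
        h = int.from_bytes(hashlib.sha256(str(e).encode()).digest()[:8], "big")
        idx = h >> (64 - P)                       # which register
        rest = h & ((1 << (64 - P)) - 1)          # remaining 56 bits
        rank = (64 - P) - rest.bit_length() + 1   # leading zeros + 1
        regs[idx] = max(regs[idx], rank)
    z = sum(2.0 ** -r for r in regs)
    est = ALPHA * M * M / z                       # harmonic-mean raw estimate
    zeros = regs.count(0)
    if est <= 2.5 * M and zeros:                  # small-range (linear counting) fix
        est = M * math.log(M / zeros)
    return int(est)
```

Each register keeps only the maximum rank seen, which is why memory stays fixed and why re-adding a duplicate element never changes the estimate.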
Key Design Decisions
- Choose the right strategy by write rate and accuracy requirement:
- <5K writes/sec + must be exact (inventory, billing) → Postgres atomic UPDATE
- 5K–500K writes/sec + can be slightly stale (likes, views) → Redis INCRBY + periodic persist
- >500K writes/sec → Redis Cluster; sharded Postgres (16 shards × ~5K/shard ≈ 80K/sec per counter) covers the middle ground but not this tier
- Unique counts (distinct visitors) → HyperLogLog (~0.81% error, fixed ~12 KB memory)
- Inventory decrement floor: the single UPDATE … WHERE value >= floor + delta is atomic, so no explicit transaction is needed. If rowcount=0, raise a domain error. Never use SELECT then UPDATE for inventory (TOCTOU race).
- Redis persistence gap: if the Redis server crashes between persist cycles, up to 60 seconds of increments are lost. Acceptable for view counts; not for rate-limit counters or inventory. For rate limits, enable AOF persistence (appendonly yes in redis.conf): with appendfsync always every write is fsynced before acknowledgment, while the default appendfsync everysec bounds the loss window to about one second.
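The fixed-window rate-limit counter mentioned above maps naturally onto a keyed counter. This sketch keeps the counters in a plain dict with an injectable clock so the windowing logic is visible and testable; a production version would use Redis INCR plus EXPIREAT on a key like user:ratelimit:{user_id}:{window_start}, and the 100-per-60s limit is an illustrative choice:

```python
import time

WINDOW = 60    # window length, seconds
LIMIT = 100    # max requests per user per window

def allow_request(store, user_id, now=None):
    """Fixed-window limiter: one counter per (user, window-start) pair.
    Returns True if the request is within the limit for the current window."""
    now = time.time() if now is None else now
    window_start = int(now // WINDOW) * WINDOW
    key = (user_id, window_start)
    store[key] = store.get(key, 0) + 1   # Redis equivalent: INCR + EXPIREAT
    return store[key] <= LIMIT
```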
Frequently Asked Questions

When should you use a sharded counter instead of a Redis counter?
Redis handles over 1 million INCRBY operations per second on a single node, so it is the right choice for most high-throughput counters (view counts, likes, session activity). Use sharded Postgres counters when: (1) the data must be durable without a separate Redis persistence layer (AOF/RDB); (2) the counter must participate in a Postgres transaction alongside other writes (e.g., an inventory decrement that also updates an order row); (3) you need audit history per increment (a ledger of who changed the count, when, and by how much). Sharded Postgres counters with N=16 shards achieve approximately 80K updates/second per counter, sufficient for most single-counter hotspots. For counters updated by millions of users simultaneously (a global like count on a viral post), Redis is the only practical option.

How does HyperLogLog achieve constant memory regardless of cardinality?
HyperLogLog (HLL) estimates set cardinality using a probabilistic algorithm. Instead of storing every distinct element, it hashes each element and tracks the position of the leading zero bits in the hash. Intuitively, the probability of seeing k leading zeros is 1/2^k, so if the maximum number of leading zeros observed is k, the cardinality is approximately 2^k. HLL aggregates many such estimates (16,384 registers in Redis's implementation) and applies a harmonic-mean correction, achieving ~0.81% standard error. Memory: at most 12 KB per HLL, whether you have 100 or 100 billion distinct elements. Trade-off: you cannot enumerate the distinct elements (no membership query), only estimate their count. For sets where you need both the count and the ability to check membership, use a Bloom filter (approximate membership) alongside a separate counter.

How do you handle counter resets or adjustments (e.g., correcting a fraudulent like count)?
A Redis INCRBY counter has no built-in audit trail: you can SET the counter to any value but cannot explain why. For counters that need correction capability: (1) maintain a separate adjustment ledger (the CounterDelta table) with an adjustment_reason column, and apply deltas via the INSERT-based batching pattern rather than direct INCR; (2) for Redis counters, SET the corrected value directly and log the change, e.g. r.set("counter:post:42:likes", corrected_value) with a corresponding audit record. Never silently adjust counters; always write an audit record with the before value, after value, changed_by user, and reason. For fraud-related adjustments (removing fake likes), batch the removal in an offline job: compute the fraudulent delta, apply a negative adjustment, log the fraud event. Do not remove individual increments retroactively; the ledger pattern makes corrections additive (a negative entry), not destructive.

How do you implement a global request counter for rate limiting without a single hotspot?
Rate limiting requires a per-user, per-time-window counter: "user 42 has made 47 requests in the last 60 seconds." Redis is the natural fit: INCRBY user:ratelimit:42:1735689600 1 with EXPIREAT set to the window end. This is O(1) and Redis handles millions of such counters. The hotspot risk is not per-user (each user has their own key) but per-Redis-shard if one shard holds too many active keys. Solution: partition rate-limit keys across a Redis cluster by user_id hash, so the key space is automatically distributed. For sliding-window rate limits, use Redis sorted sets (ZADD with the timestamp as score, ZREMRANGEBYSCORE to evict old entries, ZCARD to count). The sorted-set approach is O(log N) per request but provides exact sliding-window semantics versus the approximate fixed-window INCR approach.

What consistency guarantees does a sharded counter provide for reads?
A sharded counter read (SELECT SUM(value) FROM CounterShard WHERE counter_id = X) is a snapshot read: it sees all committed increments up to the moment of the SELECT. It does NOT see in-flight transactions (increments that started but have not yet committed). This is correct for most use cases; inventory counts should not include uncommitted reservations. Stale-read window: if shards are spread across Postgres nodes with replication lag, the SUM may combine some shards' committed values with other shards' older values. Avoid cross-node sharding for counters that need strong consistency; keep all shards on the primary. For approximate counters (view counts, like counts), cache the last SUM result in Redis for 10 seconds: the read is at most 10 seconds stale, which is imperceptible to users and removes the SUM query from the hot path entirely.