Overview
A user segmentation system lets product and marketing teams define rule-based groups of users — “enterprise plan users who signed up in the last 30 days,” “users who made a purchase but haven’t logged in for 60 days” — and use those segments to target campaigns, feature flags, onboarding flows, and analytics. The system must evaluate rules against user attributes, maintain a live membership table that reflects the current segment composition, and update incrementally as user attributes change rather than running expensive full recomputes on a nightly cron. This LLD covers the data model, rule engine, membership computation strategies, segment size estimation, and usage in downstream campaigns.
Core Data Model
-- Segment definition with JSONB rule array
CREATE TABLE Segment (
segment_id BIGSERIAL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description TEXT,
rules JSONB NOT NULL, -- array of rule objects (see Rule Engine section)
rule_combinator VARCHAR(8) NOT NULL DEFAULT 'AND', -- 'AND' or 'OR'
status VARCHAR(16) NOT NULL DEFAULT 'active', -- active, archived, computing
estimated_size INT, -- rough member count, refreshed periodically
last_computed_at TIMESTAMPTZ,
created_by BIGINT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Example rules JSONB value:
-- [
-- {"field": "plan", "op": "eq", "value": "enterprise"},
-- {"field": "created_days_ago", "op": "lte", "value": 30},
-- {"field": "country", "op": "in", "value": ["US", "CA", "GB"]}
-- ]
-- Segment membership — one row per (user, segment) pair
CREATE TABLE SegmentMembership (
user_id BIGINT NOT NULL,
segment_id BIGINT NOT NULL REFERENCES Segment(segment_id) ON DELETE CASCADE,
added_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
removed_at TIMESTAMPTZ, -- NULL = currently a member
PRIMARY KEY (user_id, segment_id)
);
CREATE INDEX idx_segmembership_segment ON SegmentMembership(segment_id, removed_at)
WHERE removed_at IS NULL;
CREATE INDEX idx_segmembership_user ON SegmentMembership(user_id, removed_at)
WHERE removed_at IS NULL;
-- Job tracking for async compute operations
CREATE TABLE SegmentComputeJob (
job_id BIGSERIAL PRIMARY KEY,
segment_id BIGINT NOT NULL REFERENCES Segment(segment_id),
job_type VARCHAR(32) NOT NULL, -- 'full_recompute', 'incremental_update'
trigger VARCHAR(64), -- 'user_attribute_change', 'manual', 'scheduled'
status VARCHAR(16) NOT NULL DEFAULT 'queued', -- queued, running, done, failed
affected_users INT,
added_count INT,
removed_count INT,
error_message TEXT,
started_at TIMESTAMPTZ,
finished_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Snapshot of user attributes used for rule evaluation
-- Denormalized from users + subscriptions + activity tables
CREATE TABLE UserAttributeSnapshot (
user_id BIGINT PRIMARY KEY,
plan VARCHAR(64),
country CHAR(2),
created_days_ago INT,
last_login_days_ago INT,
total_purchases INT,
lifetime_value_cents BIGINT,
tags TEXT[],
custom_attrs JSONB NOT NULL DEFAULT '{}',
snapshot_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_uas_plan ON UserAttributeSnapshot(plan);
CREATE INDEX idx_uas_country ON UserAttributeSnapshot(country);
Rule Engine
The rule engine evaluates a segment’s JSONB rule array against a user’s attributes. Rules support scalar comparisons, set membership, and computed fields like created_days_ago.
from datetime import datetime, timezone
from typing import Any
SUPPORTED_OPS = {'eq', 'neq', 'gt', 'gte', 'lt', 'lte', 'in', 'not_in', 'contains', 'is_null', 'is_not_null'}
def evaluate_rule(rule: dict, user_attrs: dict) -> bool:
"""
Evaluate a single rule dict against a user attribute dict.
Rule format:
{
"field": "plan", # attribute name
"op": "eq", # operator
"value": "enterprise" # comparison value (type depends on op)
}
"""
field = rule['field']
op = rule['op']
value = rule.get('value')
if op not in SUPPORTED_OPS:
raise ValueError(f"Unsupported operator: {op}")
user_value = user_attrs.get(field)
if op == 'is_null':
return user_value is None
if op == 'is_not_null':
return user_value is not None
if user_value is None:
return False # missing attribute never satisfies a comparison rule
if op == 'eq': return user_value == value
if op == 'neq': return user_value != value
if op == 'gt': return user_value > value
if op == 'gte': return user_value >= value
if op == 'lt': return user_value < value
    if op == 'lte': return user_value <= value
    if op == 'in': return user_value in value
    if op == 'not_in': return user_value not in value
    if op == 'contains': return value in user_value
    raise ValueError(f"Unhandled operator: {op}")  # unreachable: SUPPORTED_OPS is checked above

def evaluate_segment_rules(rules: list, combinator: str, user_attrs: dict) -> bool:
"""
Evaluate all rules in a segment definition against a user's attributes.
combinator='AND': all rules must pass.
combinator='OR': at least one rule must pass.
"""
if not rules:
return False # empty rule set matches nobody
results = [evaluate_rule(r, user_attrs) for r in rules]
if combinator == 'AND':
return all(results)
elif combinator == 'OR':
return any(results)
else:
raise ValueError(f"Unknown combinator: {combinator}")
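To make the combinator semantics concrete, here is a minimal, self-contained sketch of the evaluator: a pared-down stand-in for the functions above, supporting only three operators, evaluated against two made-up users (alice and bob are illustrative, not part of the design):

```python
OPS = {
    'eq':  lambda a, b: a == b,
    'lte': lambda a, b: a <= b,
    'in':  lambda a, b: a in b,
}

def matches(rules, combinator, attrs):
    # A missing attribute never satisfies a comparison rule.
    results = [
        attrs.get(r['field']) is not None
        and OPS[r['op']](attrs[r['field']], r['value'])
        for r in rules
    ]
    return all(results) if combinator == 'AND' else any(results)

rules = [
    {"field": "plan", "op": "eq", "value": "enterprise"},
    {"field": "created_days_ago", "op": "lte", "value": 30},
    {"field": "country", "op": "in", "value": ["US", "CA", "GB"]},
]

alice = {"plan": "enterprise", "created_days_ago": 12, "country": "US"}
bob = {"plan": "free", "created_days_ago": 12, "country": "US"}

print(matches(rules, 'AND', alice))  # True: all three rules pass
print(matches(rules, 'AND', bob))    # False: the plan rule fails
print(matches(rules, 'OR', bob))     # True: two of three rules pass
```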
def build_user_attrs(user_id: int, db_conn) -> dict:
"""Fetch the denormalized attribute snapshot for a user."""
with db_conn.cursor() as cur:
cur.execute("""
SELECT plan, country, created_days_ago, last_login_days_ago,
total_purchases, lifetime_value_cents, tags, custom_attrs
FROM UserAttributeSnapshot
WHERE user_id = %s
""", (user_id,))
row = cur.fetchone()
if not row:
return {}
keys = ['plan', 'country', 'created_days_ago', 'last_login_days_ago',
'total_purchases', 'lifetime_value_cents', 'tags']
attrs = dict(zip(keys, row[:-1]))
attrs.update(row[-1] or {}) # merge custom_attrs JSONB
return attrs
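One subtlety in build_user_attrs: because custom_attrs is merged last, a custom key can shadow a core column of the same name. A tiny self-contained illustration of the merge, using a fabricated row tuple:

```python
keys = ['plan', 'country', 'created_days_ago']
row = ('enterprise', 'US', 12, {'beta_cohort': 'B', 'plan': 'enterprise_trial'})

attrs = dict(zip(keys, row[:-1]))  # core columns first
attrs.update(row[-1] or {})        # custom_attrs merged last, wins on key collisions

print(attrs['beta_cohort'])  # 'B'
print(attrs['plan'])         # 'enterprise_trial': the custom key shadows the core column
```

Whether shadowing is a feature or a bug depends on the product; if core columns should win, reverse the merge order.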
Membership Computation
Two computation modes handle different scenarios: full recompute for new or changed segments, and incremental update triggered by user attribute changes.
def full_recompute_segment(segment_id: int, db_conn):
"""
Re-evaluate all users against the segment rules.
Used when: a segment is newly created, its rules change, or a periodic audit is requested.
Expensive — runs as a background job, not in-request.
"""
with db_conn.cursor() as cur:
cur.execute("""
SELECT rules, rule_combinator FROM Segment WHERE segment_id=%s
""", (segment_id,))
rules_json, combinator = cur.fetchone()
rules = rules_json # already deserialized from JSONB
added = removed = 0
    # Use a named (server-side) cursor so the driver streams rows in batches
    # instead of materializing the entire user table client-side. withhold=True
    # keeps the cursor open across the per-batch commits below.
    with db_conn.cursor(name='segment_full_scan', withhold=True) as cur:
        cur.execute("SELECT user_id FROM UserAttributeSnapshot")
while True:
batch = cur.fetchmany(500)
if not batch:
break
user_ids = [r[0] for r in batch]
new_members = set()
for user_id in user_ids:
attrs = build_user_attrs(user_id, db_conn)
if evaluate_segment_rules(rules, combinator, attrs):
new_members.add(user_id)
# Sync membership table for this batch
a, r = sync_membership_batch(db_conn, segment_id, user_ids, new_members)
added += a
removed += r
with db_conn.cursor() as cur:
cur.execute("""
UPDATE Segment
SET last_computed_at=NOW(), status='active', estimated_size=(
SELECT COUNT(*) FROM SegmentMembership
WHERE segment_id=%s AND removed_at IS NULL
)
WHERE segment_id=%s
""", (segment_id, segment_id))
db_conn.commit()
return added, removed
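The fetchmany loop is a standard batching pattern; stripped of the database driver, it reduces to a small generator (a sketch, using the same batch size of 500 as above):

```python
from itertools import islice

def batched(iterable, size):
    """Yield lists of up to `size` items drawn from any iterable."""
    it = iter(iterable)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch

# 1250 hypothetical user IDs split into three batches
user_ids = range(1, 1251)
batches = list(batched(user_ids, 500))
print([len(b) for b in batches])  # [500, 500, 250]
```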
def sync_membership_batch(db_conn, segment_id, all_user_ids, qualifying_user_ids):
"""
Given a batch of user IDs and which ones qualify for the segment,
add new members and soft-remove users who no longer qualify.
"""
qualifying = set(qualifying_user_ids)
added = removed = 0
with db_conn.cursor() as cur:
# Get current membership for this batch
cur.execute("""
SELECT user_id FROM SegmentMembership
WHERE segment_id=%s AND user_id=ANY(%s) AND removed_at IS NULL
""", (segment_id, list(all_user_ids)))
current_members = {r[0] for r in cur.fetchall()}
to_add = qualifying - current_members
to_remove = current_members - qualifying
with db_conn.cursor() as cur:
if to_add:
cur.executemany("""
INSERT INTO SegmentMembership (user_id, segment_id, added_at)
VALUES (%s, %s, NOW())
ON CONFLICT (user_id, segment_id) DO UPDATE SET removed_at=NULL, added_at=NOW()
""", [(uid, segment_id) for uid in to_add])
added = len(to_add)
if to_remove:
cur.execute("""
UPDATE SegmentMembership
SET removed_at=NOW()
WHERE segment_id=%s AND user_id=ANY(%s) AND removed_at IS NULL
""", (segment_id, list(to_remove)))
removed = len(to_remove)
db_conn.commit()
return added, removed
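The heart of sync_membership_batch is a pair of set differences. Pulling that logic out makes it trivially unit-testable (a sketch with hypothetical user IDs):

```python
def diff_membership(current_members, qualifying):
    """Return (to_add, to_remove) given the current and newly-qualifying user ID sets."""
    current, qualifying = set(current_members), set(qualifying)
    return qualifying - current, current - qualifying

to_add, to_remove = diff_membership(current_members={1, 2, 3}, qualifying={2, 3, 4, 5})
print(sorted(to_add))     # [4, 5]: newly qualifying users to insert
print(sorted(to_remove))  # [1]: no longer qualifies, gets removed_at stamped
```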
def incremental_update_on_attribute_change(user_id: int, changed_fields: list, db_conn):
"""
Called when a user's attributes change (e.g. plan upgrade, new purchase).
Re-evaluates only segments whose rules reference the changed fields.
Much cheaper than full recompute — O(relevant_segments) not O(all_users * all_segments).
"""
attrs = build_user_attrs(user_id, db_conn)
with db_conn.cursor() as cur:
        # Find segments whose rules reference any of the changed fields.
        # jsonb_path_exists with a bound jsonpath variable keeps field names
        # out of the path string, avoiding SQL/jsonpath injection.
        conditions = ' OR '.join(
            ["jsonb_path_exists(rules, '$[*] ? (@.field == $f)', jsonb_build_object('f', %s::text))"]
            * len(changed_fields)
        )
        cur.execute(f"""
            SELECT segment_id, rules, rule_combinator
            FROM Segment
            WHERE status = 'active'
              AND ({conditions})
        """, changed_fields)
relevant_segments = cur.fetchall()
for segment_id, rules, combinator in relevant_segments:
qualifies = evaluate_segment_rules(rules, combinator, attrs)
with db_conn.cursor() as cur:
cur.execute("""
SELECT 1 FROM SegmentMembership
WHERE segment_id=%s AND user_id=%s AND removed_at IS NULL
""", (segment_id, user_id))
is_member = cur.fetchone() is not None
if qualifies and not is_member:
with db_conn.cursor() as cur:
cur.execute("""
INSERT INTO SegmentMembership (user_id, segment_id, added_at)
VALUES (%s, %s, NOW())
ON CONFLICT (user_id, segment_id) DO UPDATE SET removed_at=NULL, added_at=NOW()
""", (user_id, segment_id))
db_conn.commit()
elif not qualifies and is_member:
with db_conn.cursor() as cur:
cur.execute("""
UPDATE SegmentMembership SET removed_at=NOW()
WHERE segment_id=%s AND user_id=%s AND removed_at IS NULL
""", (segment_id, user_id))
db_conn.commit()
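The jsonpath filter in the query above has a straightforward pure-Python equivalent, useful for testing the relevance logic without a database (the segment rows here are hypothetical dicts standing in for fetched rows):

```python
def relevant_segments(segments, changed_fields):
    """Keep only segments whose rule list references at least one changed field."""
    changed = set(changed_fields)
    return [s for s in segments
            if any(rule['field'] in changed for rule in s['rules'])]

segments = [
    {'segment_id': 1, 'rules': [{'field': 'plan', 'op': 'eq', 'value': 'enterprise'}]},
    {'segment_id': 2, 'rules': [{'field': 'country', 'op': 'in', 'value': ['US']}]},
]

hits = relevant_segments(segments, changed_fields=['plan', 'total_purchases'])
print([s['segment_id'] for s in hits])  # [1]: only segment 1 mentions a changed field
```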
Segment Size Estimation
Before running a full recompute on a new segment (which may be expensive for large user bases), estimate the segment size by evaluating the rules against a random sample (TABLESAMPLE) and extrapolating; planner row estimates from a plain EXPLAIN are a cheaper but cruder alternative.
def estimate_segment_size(segment_id: int, db_conn, sample_fraction=0.05) -> int:
"""
Estimate how many users will be in a segment by evaluating rules against
a random sample of users and extrapolating.
"""
with db_conn.cursor() as cur:
cur.execute("""
SELECT rules, rule_combinator FROM Segment WHERE segment_id=%s
""", (segment_id,))
rules, combinator = cur.fetchone()
# Sample a fraction of users randomly
cur.execute("""
SELECT user_id FROM UserAttributeSnapshot
TABLESAMPLE BERNOULLI(%s)
""", (sample_fraction * 100,))
sample = cur.fetchall()
if not sample:
return 0
qualifying = sum(
1 for (uid,) in sample
if evaluate_segment_rules(rules, combinator, build_user_attrs(uid, db_conn))
)
# Extrapolate to full population
with db_conn.cursor() as cur:
cur.execute("SELECT COUNT(*) FROM UserAttributeSnapshot")
total_users = cur.fetchone()[0]
estimated = int((qualifying / len(sample)) * total_users)
with db_conn.cursor() as cur:
cur.execute("""
UPDATE Segment SET estimated_size=%s WHERE segment_id=%s
""", (estimated, segment_id))
db_conn.commit()
return estimated
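The extrapolation step is simple proportion scaling. A sketch that also attaches a rough standard error to the estimate (assuming Bernoulli sampling, so the qualifying count is approximately binomial; the inputs below are illustrative):

```python
import math

def extrapolate(qualifying, sample_size, total_users):
    """Scale a sample proportion to the full population, with a rough standard error."""
    p = qualifying / sample_size
    estimate = int(p * total_users)
    # Standard error of a sample proportion, scaled to population counts
    stderr = math.sqrt(p * (1 - p) / sample_size) * total_users
    return estimate, stderr

est, err = extrapolate(qualifying=120, sample_size=5000, total_users=1_000_000)
print(est)  # 24000
print(err)  # roughly two thousand users, so the estimate is coarse for rare segments
```

The error term is why estimated_size is labeled a rough count: for small sample fractions or rare segments, the extrapolation can be off by thousands of users.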
Usage in Campaigns
Once the SegmentMembership table is populated, downstream systems can retrieve the target audience with a single indexed lookup.
-- Get all current members of a segment for email campaign targeting
SELECT sm.user_id, u.email, u.display_name
FROM SegmentMembership sm
JOIN users u ON u.id = sm.user_id
WHERE sm.segment_id = $1
AND sm.removed_at IS NULL
ORDER BY sm.added_at DESC;
-- Count by segment for dashboard overview
SELECT s.name, COUNT(sm.user_id) AS member_count
FROM Segment s
LEFT JOIN SegmentMembership sm
ON sm.segment_id = s.segment_id AND sm.removed_at IS NULL
WHERE s.status = 'active'
GROUP BY s.segment_id, s.name
ORDER BY member_count DESC;
-- Find all segments a specific user currently belongs to
SELECT s.segment_id, s.name, sm.added_at
FROM SegmentMembership sm
JOIN Segment s ON s.segment_id = sm.segment_id
WHERE sm.user_id = $1
AND sm.removed_at IS NULL
ORDER BY sm.added_at DESC;
-- Segment-targeted feature flag: is user in segment?
SELECT EXISTS (
SELECT 1 FROM SegmentMembership
WHERE user_id = $1 AND segment_id = $2 AND removed_at IS NULL
) AS is_member;
Key Design Decisions
- JSONB rules enable no-code segment building: Storing the rule definition as a JSONB array means product managers can create and modify segments through a UI without schema changes or deployments. The rule engine is a small Python module that handles all needed operators. New operators (like regex_match or between) can be added to the engine without touching the data model.
- Incremental updates avoid a full daily recompute: Triggering incremental_update_on_attribute_change on user attribute events (plan change, purchase, login) means the membership table stays near-real-time without a nightly full-scan job. A user who upgrades to an enterprise plan at 10 AM is in enterprise segments by 10:01 AM, not at midnight. The JSONB field filter in the SQL query ensures only segments that reference the changed fields are re-evaluated.
- The SegmentMembership table enables O(1) membership lookup: Downstream systems (campaign emailers, feature flag checks, personalization engines) can check membership with a single primary-key lookup on (user_id, segment_id) rather than re-evaluating rules at request time. This separates the expensive compute phase from the cheap read phase.
- Soft deletes on membership preserve history: Setting removed_at rather than deleting rows means the system can answer "was this user in this segment during the campaign that ran last month?", which is useful for campaign attribution, A/B test analysis, and compliance audits. A periodic cleanup job can hard-delete rows older than the retention period.