User Segmentation System Low-Level Design: Rule Engine, Dynamic Segments, and Real-Time Membership

Overview

A user segmentation system lets product and marketing teams define rule-based groups of users — “enterprise plan users who signed up in the last 30 days,” “users who made a purchase but haven’t logged in for 60 days” — and use those segments to target campaigns, feature flags, onboarding flows, and analytics. The system must evaluate rules against user attributes, maintain a live membership table that reflects the current segment composition, and update incrementally as user attributes change rather than running expensive full recomputes on a nightly cron. This LLD covers the data model, rule engine, membership computation strategies, segment size estimation, and usage in downstream campaigns.

Core Data Model


-- Segment definition with JSONB rule array
CREATE TABLE Segment (
    segment_id      BIGSERIAL     PRIMARY KEY,
    name            VARCHAR(255)  NOT NULL,
    description     TEXT,
    rules           JSONB         NOT NULL,  -- array of rule objects (see Rule Engine section)
    rule_combinator VARCHAR(8)    NOT NULL DEFAULT 'AND',  -- 'AND' or 'OR'
    status          VARCHAR(16)   NOT NULL DEFAULT 'active',  -- active, archived, computing
    estimated_size  INT,                     -- rough member count, refreshed periodically
    last_computed_at TIMESTAMPTZ,
    created_by      BIGINT        NOT NULL,
    created_at      TIMESTAMPTZ   NOT NULL DEFAULT NOW(),
    updated_at      TIMESTAMPTZ   NOT NULL DEFAULT NOW()
);

-- Example rules JSONB value:
-- [
--   {"field": "plan", "op": "eq", "value": "enterprise"},
--   {"field": "created_days_ago", "op": "lte", "value": 30},
--   {"field": "country", "op": "in", "value": ["US", "CA", "GB"]}
-- ]

-- Segment membership — one row per (user, segment) pair
CREATE TABLE SegmentMembership (
    user_id         BIGINT        NOT NULL,
    segment_id      BIGINT        NOT NULL REFERENCES Segment(segment_id) ON DELETE CASCADE,
    added_at        TIMESTAMPTZ   NOT NULL DEFAULT NOW(),
    removed_at      TIMESTAMPTZ,             -- NULL = currently a member
    PRIMARY KEY (user_id, segment_id)
);

CREATE INDEX idx_segmembership_segment ON SegmentMembership(segment_id, removed_at)
    WHERE removed_at IS NULL;
CREATE INDEX idx_segmembership_user    ON SegmentMembership(user_id, removed_at)
    WHERE removed_at IS NULL;

-- Job tracking for async compute operations
CREATE TABLE SegmentComputeJob (
    job_id          BIGSERIAL     PRIMARY KEY,
    segment_id      BIGINT        NOT NULL REFERENCES Segment(segment_id),
    job_type        VARCHAR(32)   NOT NULL,  -- 'full_recompute', 'incremental_update'
    trigger         VARCHAR(64),             -- 'user_attribute_change', 'manual', 'scheduled'
    status          VARCHAR(16)   NOT NULL DEFAULT 'queued',  -- queued, running, done, failed
    affected_users  INT,
    added_count     INT,
    removed_count   INT,
    error_message   TEXT,
    started_at      TIMESTAMPTZ,
    finished_at     TIMESTAMPTZ,
    created_at      TIMESTAMPTZ   NOT NULL DEFAULT NOW()
);

-- Snapshot of user attributes used for rule evaluation
-- Denormalized from users + subscriptions + activity tables
CREATE TABLE UserAttributeSnapshot (
    user_id         BIGINT        PRIMARY KEY,
    plan            VARCHAR(64),
    country         CHAR(2),
    created_days_ago INT,
    last_login_days_ago INT,
    total_purchases  INT,
    lifetime_value_cents BIGINT,
    tags            TEXT[],
    custom_attrs    JSONB         NOT NULL DEFAULT '{}',
    snapshot_at     TIMESTAMPTZ   NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_uas_plan    ON UserAttributeSnapshot(plan);
CREATE INDEX idx_uas_country ON UserAttributeSnapshot(country);

Rule Engine

The rule engine evaluates a segment’s JSONB rule array against a user’s attributes. Rules support scalar comparisons, set membership, and computed fields like created_days_ago.


from datetime import datetime, timezone
from typing import Any

SUPPORTED_OPS = {'eq', 'neq', 'gt', 'gte', 'lt', 'lte', 'in', 'not_in', 'contains', 'is_null', 'is_not_null'}

def evaluate_rule(rule: dict, user_attrs: dict) -> bool:
    """
    Evaluate a single rule dict against a user attribute dict.

    Rule format:
    {
      "field": "plan",           # attribute name
      "op":    "eq",             # operator
      "value": "enterprise"      # comparison value (type depends on op)
    }
    """
    field = rule['field']
    op    = rule['op']
    value = rule.get('value')

    if op not in SUPPORTED_OPS:
        raise ValueError(f"Unsupported operator: {op}")

    user_value = user_attrs.get(field)

    if op == 'is_null':
        return user_value is None
    if op == 'is_not_null':
        return user_value is not None
    if user_value is None:
        return False  # missing attribute never satisfies a comparison rule

    if op == 'eq':        return user_value == value
    if op == 'neq':       return user_value != value
    if op == 'gt':        return user_value > value
    if op == 'gte':       return user_value >= value
    if op == 'lt':        return user_value < value
    if op == 'lte':       return user_value  bool:
    """
    Evaluate all rules in a segment definition against a user's attributes.
    combinator='AND': all rules must pass.
    combinator='OR':  at least one rule must pass.
    """
    if not rules:
        return False  # empty rule set matches nobody

    results = [evaluate_rule(r, user_attrs) for r in rules]

    if combinator == 'AND':
        return all(results)
    elif combinator == 'OR':
        return any(results)
    else:
        raise ValueError(f"Unknown combinator: {combinator}")


def build_user_attrs(user_id: int, db_conn) -> dict:
    """Fetch the denormalized attribute snapshot for a user."""
    with db_conn.cursor() as cur:
        cur.execute("""
            SELECT plan, country, created_days_ago, last_login_days_ago,
                   total_purchases, lifetime_value_cents, tags, custom_attrs
            FROM UserAttributeSnapshot
            WHERE user_id = %s
        """, (user_id,))
        row = cur.fetchone()
    if not row:
        return {}
    keys = ['plan', 'country', 'created_days_ago', 'last_login_days_ago',
            'total_purchases', 'lifetime_value_cents', 'tags']
    attrs = dict(zip(keys, row[:-1]))
    attrs.update(row[-1] or {})  # merge custom_attrs JSONB
    return attrs

Membership Computation

Two computation modes handle different scenarios: full recompute for new or changed segments, and incremental update triggered by user attribute changes.


def full_recompute_segment(segment_id: int, db_conn):
    """
    Re-evaluate all users against the segment rules.
    Used when: a segment is newly created, its rules change, or a periodic audit is requested.
    Expensive — runs as a background job, not in-request.
    """
    with db_conn.cursor() as cur:
        cur.execute("""
            SELECT rules, rule_combinator FROM Segment WHERE segment_id=%s
        """, (segment_id,))
        rules_json, combinator = cur.fetchone()

    rules = rules_json  # already deserialized from JSONB

    added = removed = 0

    with db_conn.cursor() as cur:
        # Stream all users from attribute snapshot in batches
        cur.execute("SELECT user_id FROM UserAttributeSnapshot")
        while True:
            batch = cur.fetchmany(500)
            if not batch:
                break

            user_ids = [r[0] for r in batch]
            new_members = set()

            for user_id in user_ids:
                attrs = build_user_attrs(user_id, db_conn)
                if evaluate_segment_rules(rules, combinator, attrs):
                    new_members.add(user_id)

            # Sync membership table for this batch
            a, r = sync_membership_batch(db_conn, segment_id, user_ids, new_members)
            added += a
            removed += r

    with db_conn.cursor() as cur:
        cur.execute("""
            UPDATE Segment
            SET last_computed_at=NOW(), status='active', estimated_size=(
                SELECT COUNT(*) FROM SegmentMembership
                WHERE segment_id=%s AND removed_at IS NULL
            )
            WHERE segment_id=%s
        """, (segment_id, segment_id))
    db_conn.commit()

    return added, removed


def sync_membership_batch(db_conn, segment_id, all_user_ids, qualifying_user_ids):
    """
    Given a batch of user IDs and which ones qualify for the segment,
    add new members and soft-remove users who no longer qualify.
    """
    qualifying = set(qualifying_user_ids)
    added = removed = 0

    with db_conn.cursor() as cur:
        # Get current membership for this batch
        cur.execute("""
            SELECT user_id FROM SegmentMembership
            WHERE segment_id=%s AND user_id=ANY(%s) AND removed_at IS NULL
        """, (segment_id, list(all_user_ids)))
        current_members = {r[0] for r in cur.fetchall()}

    to_add    = qualifying - current_members
    to_remove = current_members - qualifying

    with db_conn.cursor() as cur:
        if to_add:
            cur.executemany("""
                INSERT INTO SegmentMembership (user_id, segment_id, added_at)
                VALUES (%s, %s, NOW())
                ON CONFLICT (user_id, segment_id) DO UPDATE SET removed_at=NULL, added_at=NOW()
            """, [(uid, segment_id) for uid in to_add])
            added = len(to_add)

        if to_remove:
            cur.execute("""
                UPDATE SegmentMembership
                SET removed_at=NOW()
                WHERE segment_id=%s AND user_id=ANY(%s) AND removed_at IS NULL
            """, (segment_id, list(to_remove)))
            removed = len(to_remove)

    db_conn.commit()
    return added, removed


def incremental_update_on_attribute_change(user_id: int, changed_fields: list, db_conn):
    """
    Called when a user's attributes change (e.g. plan upgrade, new purchase).
    Re-evaluates only segments whose rules reference the changed fields.
    Much cheaper than full recompute — O(relevant_segments) not O(all_users * all_segments).
    """
    attrs = build_user_attrs(user_id, db_conn)

    with db_conn.cursor() as cur:
        # Find segments that reference any of the changed fields in their rules
        # JSONB @> and jsonb_path_exists allow field-level rule filtering
        placeholders = ','.join([f"'$.*.field ? (@ == "{f}")'::jsonpath" for f in changed_fields])
        cur.execute(f"""
            SELECT segment_id, rules, rule_combinator
            FROM Segment
            WHERE status = 'active'
              AND (
                  {' OR '.join([
                      f"rules @? '$.*.field ? (@ == "{f}")'::jsonpath"
                      for f in changed_fields
                  ])}
              )
        """)
        relevant_segments = cur.fetchall()

    for segment_id, rules, combinator in relevant_segments:
        qualifies = evaluate_segment_rules(rules, combinator, attrs)

        with db_conn.cursor() as cur:
            cur.execute("""
                SELECT 1 FROM SegmentMembership
                WHERE segment_id=%s AND user_id=%s AND removed_at IS NULL
            """, (segment_id, user_id))
            is_member = cur.fetchone() is not None

        if qualifies and not is_member:
            with db_conn.cursor() as cur:
                cur.execute("""
                    INSERT INTO SegmentMembership (user_id, segment_id, added_at)
                    VALUES (%s, %s, NOW())
                    ON CONFLICT (user_id, segment_id) DO UPDATE SET removed_at=NULL, added_at=NOW()
                """, (user_id, segment_id))
            db_conn.commit()

        elif not qualifies and is_member:
            with db_conn.cursor() as cur:
                cur.execute("""
                    UPDATE SegmentMembership SET removed_at=NOW()
                    WHERE segment_id=%s AND user_id=%s AND removed_at IS NULL
                """, (segment_id, user_id))
            db_conn.commit()

Segment Size Estimation

Before running a full recompute on a new segment (which may be expensive for large user bases), estimate the segment size using sampling or EXPLAIN ANALYZE approximate counts.


def estimate_segment_size(segment_id: int, db_conn, sample_fraction=0.05) -> int:
    """
    Estimate how many users will be in a segment by evaluating rules against
    a random sample of users and extrapolating.
    """
    with db_conn.cursor() as cur:
        cur.execute("""
            SELECT rules, rule_combinator FROM Segment WHERE segment_id=%s
        """, (segment_id,))
        rules, combinator = cur.fetchone()

        # Sample a fraction of users randomly
        cur.execute("""
            SELECT user_id FROM UserAttributeSnapshot
            TABLESAMPLE BERNOULLI(%s)
        """, (sample_fraction * 100,))
        sample = cur.fetchall()

    if not sample:
        return 0

    qualifying = sum(
        1 for (uid,) in sample
        if evaluate_segment_rules(rules, combinator, build_user_attrs(uid, db_conn))
    )

    # Extrapolate to full population
    with db_conn.cursor() as cur:
        cur.execute("SELECT COUNT(*) FROM UserAttributeSnapshot")
        total_users = cur.fetchone()[0]

    estimated = int((qualifying / len(sample)) * total_users)

    with db_conn.cursor() as cur:
        cur.execute("""
            UPDATE Segment SET estimated_size=%s WHERE segment_id=%s
        """, (estimated, segment_id))
    db_conn.commit()

    return estimated

Usage in Campaigns

Once the SegmentMembership table is populated, downstream systems can retrieve the target audience with a single indexed lookup.


-- Get all current members of a segment for email campaign targeting
SELECT sm.user_id, u.email, u.display_name
FROM SegmentMembership sm
JOIN users u ON u.id = sm.user_id
WHERE sm.segment_id = $1
  AND sm.removed_at IS NULL
ORDER BY sm.added_at DESC;

-- Count by segment for dashboard overview
SELECT s.name, COUNT(sm.user_id) AS member_count
FROM Segment s
LEFT JOIN SegmentMembership sm
    ON sm.segment_id = s.segment_id AND sm.removed_at IS NULL
WHERE s.status = 'active'
GROUP BY s.segment_id, s.name
ORDER BY member_count DESC;

-- Find all segments a specific user currently belongs to
SELECT s.segment_id, s.name, sm.added_at
FROM SegmentMembership sm
JOIN Segment s ON s.segment_id = sm.segment_id
WHERE sm.user_id = $1
  AND sm.removed_at IS NULL
ORDER BY sm.added_at DESC;

-- Segment-targeted feature flag: is user in segment?
SELECT EXISTS (
    SELECT 1 FROM SegmentMembership
    WHERE user_id = $1 AND segment_id = $2 AND removed_at IS NULL
) AS is_member;

Key Design Decisions

  • JSONB rules enable no-code segment building: Storing the rule definition as a JSONB array means product managers can create and modify segments through a UI without schema changes or deployments. The rule engine is a ~50-line Python function that handles all needed operators. New operators (like regex_match or between) can be added to the engine without touching the data model.
  • Incremental updates avoid full daily recompute: Triggering incremental_update_on_attribute_change on user attribute events (plan change, purchase, login) means the membership table stays near-real-time without a nightly full-scan job. A user who upgrades to an enterprise plan at 10 AM is in enterprise segments by 10:01 AM, not at midnight. The JSONB field filter in the SQL query ensures only segments that care about changed fields are re-evaluated.
  • SegmentMembership table enables O(1) membership lookup: Downstream systems (campaign emailers, feature flag checks, personalization engines) can check membership with a single primary-key lookup on (user_id, segment_id) rather than re-evaluating rules at request time. This separates the expensive compute phase from the cheap read phase.
  • Soft deletes on membership preserve history: Setting removed_at rather than deleting rows means the system can answer “was this user in this segment during the campaign that ran last month?” — useful for campaign attribution, A/B test analysis, and compliance audits. A periodic cleanup job can hard-delete rows older than the retention period.

{
“@context”: “https://schema.org”,
“@type”: “FAQPage”,
“mainEntity”: [
{
“@type”: “Question”,
“name”: “How are segment rules stored and evaluated?”,
“acceptedAnswer”: {
“@type”: “Answer”,
“text”: “Rules are stored as JSONB arrays of condition objects; a Python rule engine iterates conditions and applies operator functions (eq, gt, in, contains, is_null) against user attribute snapshots.”
}
},
{
“@type”: “Question”,
“name”: “How are large segments computed without full table scans?”,
“acceptedAnswer”: {
“@type”: “Answer”,
“text”: “Incremental updates process only users whose attributes changed since the last compute job, using a dirty-flag or changelog table.”
}
},
{
“@type”: “Question”,
“name”: “How is segment membership estimated before full computation?”,
“acceptedAnswer”: {
“@type”: “Answer”,
“text”: “TABLESAMPLE BERNOULLI on the user attribute snapshot table provides a statistical estimate of membership count without a full scan.”
}
},
{
“@type”: “Question”,
“name”: “How are segments used for targeting?”,
“acceptedAnswer”: {
“@type”: “Answer”,
“text”: “A SegmentMembership table is pre-computed; targeting queries join against it rather than re-evaluating rules at query time.”
}
}
]
}

See also: Meta Interview Guide 2026: Facebook, Instagram, WhatsApp Engineering

See also: Netflix Interview Guide 2026: Streaming Architecture, Recommendation Systems, and Engineering Excellence

See also: Databricks Interview Guide 2026: Spark Internals, Delta Lake, and Lakehouse Architecture

See also: Scale AI Interview Guide 2026: Data Infrastructure, RLHF Pipelines, and ML Engineering

Scroll to Top