Digest Email System Low-Level Design: Aggregation, Scheduling, Personalization, and Delivery

Overview

A digest email system aggregates user-relevant events over a configurable time window and delivers a single summarized email rather than individual per-event notifications. This pattern dramatically reduces notification fatigue while keeping users informed. The core challenge is correctly windowing events per user, personalizing content ranking, honoring individual timezone and schedule preferences, and ensuring reliable delivery with full unsubscribe compliance. This LLD walks through the data model, aggregation pipeline, scheduling logic, rendering, and delivery mechanics.

Core Data Model


-- Stores each user's digest subscription preferences
CREATE TABLE DigestSubscription (
    subscription_id   BIGSERIAL PRIMARY KEY,
    user_id           BIGINT        NOT NULL,
    digest_type       VARCHAR(64)   NOT NULL,  -- e.g. 'daily_activity', 'weekly_summary'
    frequency         VARCHAR(16)   NOT NULL,  -- 'daily', 'weekly'
    preferred_hour    SMALLINT      NOT NULL DEFAULT 8,  -- local hour to send
    timezone          VARCHAR(64)   NOT NULL DEFAULT 'UTC',
    next_digest_at    TIMESTAMPTZ   NOT NULL,
    is_active         BOOLEAN       NOT NULL DEFAULT TRUE,
    created_at        TIMESTAMPTZ   NOT NULL DEFAULT NOW(),
    UNIQUE (user_id, digest_type)
);

-- One batch per user per digest window
CREATE TABLE DigestBatch (
    batch_id          BIGSERIAL PRIMARY KEY,
    user_id           BIGINT        NOT NULL,
    digest_type       VARCHAR(64)   NOT NULL,
    window_start      TIMESTAMPTZ   NOT NULL,
    window_end        TIMESTAMPTZ   NOT NULL,
    event_count       INT           NOT NULL DEFAULT 0,
    status            VARCHAR(32)   NOT NULL DEFAULT 'pending',  -- pending, rendered, sent, failed
    created_at        TIMESTAMPTZ   NOT NULL DEFAULT NOW(),
    UNIQUE (user_id, digest_type, window_end)  -- idempotency: at most one batch per window
);

-- Raw events to be included in future digests.
-- Defined after DigestBatch so the included_in_batch foreign key resolves.
CREATE TABLE DigestEvent (
    event_id          BIGSERIAL PRIMARY KEY,
    user_id           BIGINT        NOT NULL,
    digest_type       VARCHAR(64)   NOT NULL,
    event_type        VARCHAR(128)  NOT NULL,
    entity_id         BIGINT,
    entity_type       VARCHAR(64),
    payload           JSONB         NOT NULL DEFAULT '{}',
    engagement_score  FLOAT         NOT NULL DEFAULT 1.0,
    occurred_at       TIMESTAMPTZ   NOT NULL DEFAULT NOW(),
    included_in_batch BIGINT        REFERENCES DigestBatch(batch_id) ON DELETE SET NULL
);

CREATE INDEX idx_digest_event_user_type ON DigestEvent(user_id, digest_type, occurred_at DESC);
CREATE INDEX idx_digest_event_unbatched ON DigestEvent(user_id, digest_type) WHERE included_in_batch IS NULL;

-- Delivery record per batch
CREATE TABLE DigestDelivery (
    delivery_id       BIGSERIAL PRIMARY KEY,
    batch_id          BIGINT        NOT NULL REFERENCES DigestBatch(batch_id),
    user_id           BIGINT        NOT NULL,
    email_address     VARCHAR(255)  NOT NULL,
    subject           TEXT          NOT NULL,
    sent_at           TIMESTAMPTZ,
    provider_message_id VARCHAR(255),
    status            VARCHAR(32)   NOT NULL DEFAULT 'queued',  -- queued, sent, bounced, failed
    failure_reason    TEXT,
    created_at        TIMESTAMPTZ   NOT NULL DEFAULT NOW()
);
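
The write path into these tables is not shown above. As a sketch, a producer (for example, a notification service that decides to defer rather than send immediately) might record an event like this; all values are illustrative:

```sql
-- Hypothetical producer-side write: a comment on a user's post becomes
-- a digest event instead of an immediate notification.
INSERT INTO DigestEvent
    (user_id, digest_type, event_type, entity_id, entity_type, payload, engagement_score)
VALUES
    (42, 'daily_activity', 'comment_added', 1001, 'post',
     '{"commenter": "ada", "snippet": "Great point!"}', 2.5);
```

The aggregation worker later picks this row up via the partial index on included_in_batch IS NULL.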

Aggregation Pipeline

The aggregation worker runs on a schedule (every few minutes) and collects unbatched events for users whose digest window has closed.


import psycopg2
from datetime import datetime, timezone
from collections import defaultdict

def aggregate_pending_digests(db_conn):
    """
    Find all DigestSubscriptions whose next_digest_at <= NOW()
    and aggregate unbatched events into a DigestBatch.
    """
    now = datetime.now(timezone.utc)

    with db_conn.cursor() as cur:
        # Lock subscriptions due for processing; SKIP LOCKED lets concurrent
        # workers pass over rows another worker is already handling.
        cur.execute("""
            SELECT subscription_id, user_id, digest_type, next_digest_at,
                   frequency, timezone, preferred_hour
            FROM DigestSubscription
            WHERE next_digest_at <= %s AND is_active = TRUE
            FOR UPDATE SKIP LOCKED
        """, (now,))
        due_subscriptions = cur.fetchall()

    for sub_id, user_id, digest_type, next_at, frequency, tz, pref_hour in due_subscriptions:
        # The window ends at the scheduled digest time (not NOW()) so a
        # retried run reproduces the same window boundaries.
        window_end = next_at
        window_start = compute_window_start(window_end, frequency)

        with db_conn.cursor() as cur:
            cur.execute("""
                SELECT event_id, event_type, entity_id, entity_type,
                       payload, engagement_score, occurred_at
                FROM DigestEvent
                WHERE user_id = %s AND digest_type = %s
                      AND occurred_at >= %s AND occurred_at < %s
                      AND included_in_batch IS NULL
                ORDER BY occurred_at DESC
            """, (user_id, digest_type, window_start, window_end))
            raw_events = cur.fetchall()

        if not raw_events:
            # Advance next_digest_at even when there are no events; no empty
            # digest is sent, but the pointer update must still be committed.
            advance_next_digest(db_conn, sub_id, window_end, frequency, tz, pref_hour)
            db_conn.commit()
            continue

        deduped = deduplicate_events(raw_events)
        ranked  = rank_events(deduped)

        with db_conn.cursor() as cur:
            cur.execute("""
                INSERT INTO DigestBatch (user_id, digest_type, window_start, window_end, event_count, status)
                VALUES (%s, %s, %s, %s, %s, 'pending')
                RETURNING batch_id
            """, (user_id, digest_type, window_start, window_end, len(ranked)))
            batch_id = cur.fetchone()[0]

            # Mark events as included in this batch
            event_ids = [e[0] for e in ranked]
            cur.execute("""
                UPDATE DigestEvent SET included_in_batch = %s
                WHERE event_id = ANY(%s)
            """, (batch_id, event_ids))

        advance_next_digest(db_conn, sub_id, window_end, frequency, tz, pref_hour)
        db_conn.commit()

        # Enqueue render + send job
        enqueue_job('render_and_send_digest', {'batch_id': batch_id, 'user_id': user_id})


def deduplicate_events(events):
    """
    Remove duplicate events for the same entity in the same window.
    Keep the most recent occurrence per (entity_type, entity_id, event_type).
    """
    seen = {}
    for event in events:
        event_id, event_type, entity_id, entity_type, payload, score, occurred_at = event
        key = (event_type, entity_type, entity_id)
        if key not in seen:
            seen[key] = event
        # already keeping most recent due to ORDER BY occurred_at DESC
    return list(seen.values())


def rank_events(events, top_n=20):
    """
    Rank events by a composite score of recency and engagement.
    Score = engagement_score * recency_decay
    """
    import math
    now = datetime.now(timezone.utc)
    scored = []
    for event in events:
        event_id, event_type, entity_id, entity_type, payload, engagement_score, occurred_at = event
        age_hours = (now - occurred_at).total_seconds() / 3600
        recency_decay = math.exp(-0.1 * age_hours)  # exponential decay
        composite = engagement_score * recency_decay
        scored.append((composite, event))
    scored.sort(key=lambda x: x[0], reverse=True)
    return [e for _, e in scored[:top_n]]
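
To make the decay rate concrete, the formula in rank_events can be evaluated at a few event ages (stdlib only; the 0.1-per-hour constant matches the code above):

```python
import math

# Recency multiplier used in rank_events: exp(-0.1 * age_hours)
for age_hours in (1, 24, 168):
    print(round(math.exp(-0.1 * age_hours), 3))
# 1 hour  -> 0.905 (barely discounted)
# 1 day   -> 0.091 (about a tenth of its score)
# 1 week  -> 0.0   (effectively excluded)
```

With this constant, a daily digest still meaningfully reorders same-day items, while week-old stragglers cannot outrank fresh ones unless their engagement_score is larger by many orders of magnitude.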


def compute_window_start(window_end, frequency):
    from dateutil.relativedelta import relativedelta
    if frequency == 'daily':
        return window_end - relativedelta(days=1)
    elif frequency == 'weekly':
        return window_end - relativedelta(weeks=1)
    raise ValueError(f"Unknown frequency: {frequency}")

Scheduling Logic

Each DigestSubscription has a next_digest_at stored in UTC. The scheduler finds all subscriptions past due and advances the pointer after processing.


from datetime import datetime, timezone, timedelta
import pytz
from dateutil.relativedelta import relativedelta

def compute_next_digest_at(frequency, preferred_hour, user_timezone, after_utc):
    """
    Given the user's preferred send hour (in their local timezone) and frequency,
    compute the next UTC time to send after `after_utc`.
    """
    if frequency == 'daily':
        step = timedelta(days=1)
    elif frequency == 'weekly':
        # Advancing by exactly one week preserves the original send weekday.
        step = timedelta(weeks=1)
    else:
        raise ValueError(f"Unknown frequency: {frequency}")

    tz = pytz.timezone(user_timezone)
    local_after = after_utc.astimezone(tz)

    # Build the candidate as a *naive* local time and re-localize it with pytz.
    # Calling .replace() or doing arithmetic on a pytz-aware datetime keeps the
    # old UTC offset, which is wrong when the window crosses a DST transition.
    naive = local_after.replace(tzinfo=None, hour=preferred_hour, minute=0,
                                second=0, microsecond=0)
    if tz.localize(naive) <= local_after:
        naive += step
    return tz.localize(naive).astimezone(timezone.utc)


def advance_next_digest(db_conn, sub_id, window_end, frequency, user_timezone, preferred_hour):
    next_at = compute_next_digest_at(frequency, preferred_hour, user_timezone, window_end)
    with db_conn.cursor() as cur:
        cur.execute("""
            UPDATE DigestSubscription
            SET next_digest_at = %s
            WHERE subscription_id = %s
        """, (next_at, sub_id))

Rendering

Each digest type has a Jinja2 template. The render worker pulls the top-N events from the batch and builds a personalized HTML email.


from jinja2 import Environment, FileSystemLoader
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

jinja_env = Environment(loader=FileSystemLoader('/app/templates/digests'))

def render_digest(batch_id, user_id, db_conn):
    with db_conn.cursor() as cur:
        cur.execute("""
            SELECT b.digest_type, b.window_start, b.window_end,
                   u.email, u.display_name, u.timezone
            FROM DigestBatch b
            JOIN users u ON u.id = b.user_id
            WHERE b.batch_id = %s
        """, (batch_id,))
        row = cur.fetchone()

    digest_type, window_start, window_end, email, display_name, user_tz = row

    with db_conn.cursor() as cur:
        cur.execute("""
            SELECT e.event_type, e.entity_type, e.entity_id, e.payload, e.occurred_at
            FROM DigestEvent e
            WHERE e.included_in_batch = %s
            ORDER BY e.engagement_score DESC, e.occurred_at DESC
        """, (batch_id,))
        events = cur.fetchall()

    template = jinja_env.get_template(f"{digest_type}.html.j2")
    unsubscribe_token = generate_unsubscribe_token(user_id, digest_type)

    html_body = template.render(
        display_name=display_name,
        events=events,
        window_start=localize(window_start, user_tz),
        window_end=localize(window_end, user_tz),
        unsubscribe_url=f"https://example.com/unsubscribe/{unsubscribe_token}",
        digest_type=digest_type,
    )

    subject = build_subject(digest_type, events, window_start, user_tz)

    with db_conn.cursor() as cur:
        cur.execute("""
            INSERT INTO DigestDelivery (batch_id, user_id, email_address, subject, status)
            VALUES (%s, %s, %s, %s, 'queued')
            RETURNING delivery_id
        """, (batch_id, user_id, email, subject))
        delivery_id = cur.fetchone()[0]

        cur.execute("UPDATE DigestBatch SET status='rendered' WHERE batch_id=%s", (batch_id,))
    db_conn.commit()

    return delivery_id, html_body, email, subject
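
The template files themselves are outside this document; a minimal, hypothetical daily_activity.html.j2 consuming exactly the variables render_digest passes (each events row is the 5-tuple from the SELECT above) might look like:

```jinja
{# Hypothetical daily_activity.html.j2; variable names match render_digest #}
<html>
  <body>
    <h2>Hi {{ display_name }}, your activity from {{ window_start }} to {{ window_end }}</h2>
    <ul>
      {% for event_type, entity_type, entity_id, payload, occurred_at in events %}
        <li>{{ event_type }} on {{ entity_type }} #{{ entity_id }}</li>
      {% endfor %}
    </ul>
    <p><a href="{{ unsubscribe_url }}">Unsubscribe from this digest</a></p>
  </body>
</html>
```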

Delivery

A separate delivery worker picks up queued DigestDelivery rows, sends via SES/SendGrid, and advances the record.


import boto3
from botocore.exceptions import ClientError

ses = boto3.client('ses', region_name='us-east-1')

def send_digest_batch(delivery_id, html_body, email, subject, db_conn):
    try:
        response = ses.send_email(
            Source='digest@example.com',
            Destination={'ToAddresses': [email]},
            Message={
                'Subject': {'Data': subject, 'Charset': 'UTF-8'},
                'Body': {'Html': {'Data': html_body, 'Charset': 'UTF-8'}},
            },
        )
        message_id = response['MessageId']
        with db_conn.cursor() as cur:
            cur.execute("""
                UPDATE DigestDelivery
                SET status='sent', sent_at=NOW(), provider_message_id=%s
                WHERE delivery_id=%s
            """, (message_id, delivery_id))
            cur.execute("""
                UPDATE DigestBatch SET status='sent'
                WHERE batch_id=(SELECT batch_id FROM DigestDelivery WHERE delivery_id=%s)
            """, (delivery_id,))
        db_conn.commit()

    except ClientError as e:
        reason = e.response['Error']['Message']
        with db_conn.cursor() as cur:
            cur.execute("""
                UPDATE DigestDelivery
                SET status='failed', failure_reason=%s
                WHERE delivery_id=%s
            """, (reason, delivery_id))
            cur.execute("""
                UPDATE DigestBatch SET status='failed'
                WHERE batch_id=(SELECT batch_id FROM DigestDelivery WHERE delivery_id=%s)
            """, (delivery_id,))
        db_conn.commit()
        raise


def handle_unsubscribe(token, db_conn):
    """Validate token and deactivate the subscription."""
    user_id, digest_type = validate_unsubscribe_token(token)
    with db_conn.cursor() as cur:
        cur.execute("""
            UPDATE DigestSubscription
            SET is_active = FALSE
            WHERE user_id = %s AND digest_type = %s
        """, (user_id, digest_type))
    db_conn.commit()
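
generate_unsubscribe_token and validate_unsubscribe_token are referenced above but never defined. One plausible shape, sketched here as an assumption, is an HMAC-SHA256 signature over the (user_id, digest_type) pair; SECRET is a placeholder for a real key loaded from configuration:

```python
import base64
import hashlib
import hmac

SECRET = b"replace-with-a-real-key-from-config"  # placeholder assumption

def generate_unsubscribe_token(user_id, digest_type):
    """Encode the payload and sign it so the link works without login."""
    msg = f"{user_id}:{digest_type}".encode()
    sig = hmac.new(SECRET, msg, hashlib.sha256).hexdigest()
    # urlsafe base64 and hex never contain '.', so it is a safe separator
    return base64.urlsafe_b64encode(msg).decode() + "." + sig

def validate_unsubscribe_token(token):
    """Return (user_id, digest_type) if the signature checks out."""
    b64_msg, sig = token.rsplit(".", 1)
    msg = base64.urlsafe_b64decode(b64_msg.encode())
    expected = hmac.new(SECRET, msg, hashlib.sha256).hexdigest()
    if not hmac.compare_digest(sig, expected):
        raise ValueError("invalid unsubscribe token")
    user_id, digest_type = msg.decode().split(":", 1)
    return int(user_id), digest_type

token = generate_unsubscribe_token(42, "daily_activity")
print(validate_unsubscribe_token(token))  # (42, 'daily_activity')
```

Because the token is self-validating, the unsubscribe endpoint needs no session; tampering with either half makes compare_digest fail.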

Key Design Decisions

  • Window-based aggregation prevents email floods: By batching all events in a configurable window (daily/weekly) into a single digest, users receive one email instead of tens of individual notifications. The included_in_batch foreign key ensures each event is counted exactly once even if the aggregation worker is retried.
  • Timezone-aware scheduling: next_digest_at is always stored in UTC but computed from the user’s preferred local hour and timezone using pytz. This correctly handles DST transitions — a user who wants 8 AM always gets 8 AM in their local time.
  • Content deduplication within a window: The deduplicate_events step removes repeated (event_type, entity_type, entity_id) tuples within the window, preventing a user from seeing the same entity referenced five times in one digest because five minor updates fired events.
  • Unsubscribe compliance: Every rendered email includes a signed unsubscribe token that maps back to the specific (user_id, digest_type) pair. The handle_unsubscribe function sets is_active=FALSE, which the scheduler WHERE clause respects, guaranteeing no further digests are sent regardless of queued jobs.

