Overview
A digest email system aggregates user-relevant events over a configurable time window and delivers a single summarized email rather than individual per-event notifications. This pattern dramatically reduces notification fatigue while keeping users informed. The core challenge is correctly windowing events per user, personalizing content ranking, honoring individual timezone and schedule preferences, and ensuring reliable delivery with full unsubscribe compliance. This LLD walks through the data model, aggregation pipeline, scheduling logic, rendering, and delivery mechanics.
Core Data Model
-- Stores each user's digest subscription preferences
CREATE TABLE DigestSubscription (
    subscription_id BIGSERIAL PRIMARY KEY,
    user_id BIGINT NOT NULL,
    digest_type VARCHAR(64) NOT NULL, -- e.g. 'daily_activity', 'weekly_summary'
    frequency VARCHAR(16) NOT NULL, -- 'daily', 'weekly'
    preferred_hour SMALLINT NOT NULL DEFAULT 8, -- local hour to send
    timezone VARCHAR(64) NOT NULL DEFAULT 'UTC',
    next_digest_at TIMESTAMPTZ NOT NULL,
    is_active BOOLEAN NOT NULL DEFAULT TRUE,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    UNIQUE (user_id, digest_type)
);

-- One batch per user per digest window. Created before DigestEvent so the
-- included_in_batch foreign key below has a table to reference.
CREATE TABLE DigestBatch (
    batch_id BIGSERIAL PRIMARY KEY,
    user_id BIGINT NOT NULL,
    digest_type VARCHAR(64) NOT NULL,
    window_start TIMESTAMPTZ NOT NULL,
    window_end TIMESTAMPTZ NOT NULL,
    event_count INT NOT NULL DEFAULT 0,
    status VARCHAR(32) NOT NULL DEFAULT 'pending', -- pending, rendered, sent, failed
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Raw events to be included in future digests
CREATE TABLE DigestEvent (
    event_id BIGSERIAL PRIMARY KEY,
    user_id BIGINT NOT NULL,
    digest_type VARCHAR(64) NOT NULL,
    event_type VARCHAR(128) NOT NULL,
    entity_id BIGINT,
    entity_type VARCHAR(64),
    payload JSONB NOT NULL DEFAULT '{}',
    engagement_score FLOAT NOT NULL DEFAULT 1.0,
    occurred_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    included_in_batch BIGINT REFERENCES DigestBatch(batch_id) ON DELETE SET NULL
);
CREATE INDEX idx_digest_event_user_type ON DigestEvent(user_id, digest_type, occurred_at DESC);
CREATE INDEX idx_digest_event_unbatched ON DigestEvent(user_id, digest_type) WHERE included_in_batch IS NULL;

-- Delivery record per batch
CREATE TABLE DigestDelivery (
    delivery_id BIGSERIAL PRIMARY KEY,
    batch_id BIGINT NOT NULL REFERENCES DigestBatch(batch_id),
    user_id BIGINT NOT NULL,
    email_address VARCHAR(255) NOT NULL,
    subject TEXT NOT NULL,
    sent_at TIMESTAMPTZ,
    provider_message_id VARCHAR(255),
    status VARCHAR(32) NOT NULL DEFAULT 'queued', -- queued, sent, bounced, failed
    failure_reason TEXT,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
Aggregation Pipeline
The aggregation worker runs on a schedule (every few minutes) and collects unbatched events for users whose digest window has closed.
import psycopg2
from datetime import datetime, timezone

def aggregate_pending_digests(db_conn):
    """
    Find all DigestSubscriptions whose next_digest_at <= NOW()
    and aggregate unbatched events into a DigestBatch.
    """
    now = datetime.now(timezone.utc)
    with db_conn.cursor() as cur:
        # Lock subscriptions due for processing so concurrent workers skip them
        cur.execute("""
            SELECT subscription_id, user_id, digest_type, next_digest_at,
                   frequency, timezone, preferred_hour
            FROM DigestSubscription
            WHERE next_digest_at <= %s AND is_active = TRUE
            FOR UPDATE SKIP LOCKED
        """, (now,))
        due_subscriptions = cur.fetchall()

    # The stored next_digest_at is the end of the window being closed
    for sub_id, user_id, digest_type, window_end, frequency, tz, pref_hour in due_subscriptions:
        window_start = compute_window_start(window_end, frequency)
        with db_conn.cursor() as cur:
            cur.execute("""
                SELECT event_id, event_type, entity_id, entity_type,
                       payload, engagement_score, occurred_at
                FROM DigestEvent
                WHERE user_id = %s AND digest_type = %s
                  AND occurred_at >= %s AND occurred_at < %s
                  AND included_in_batch IS NULL
                ORDER BY occurred_at DESC
            """, (user_id, digest_type, window_start, window_end))
            raw_events = cur.fetchall()

        if not raw_events:
            # Advance next_digest_at even if no events; skip the empty digest
            advance_next_digest(db_conn, sub_id, window_end, frequency, tz, pref_hour)
            db_conn.commit()
            continue

        deduped = deduplicate_events(raw_events)
        ranked = rank_events(deduped)
        with db_conn.cursor() as cur:
            cur.execute("""
                INSERT INTO DigestBatch (user_id, digest_type, window_start, window_end, event_count, status)
                VALUES (%s, %s, %s, %s, %s, 'pending')
                RETURNING batch_id
            """, (user_id, digest_type, window_start, window_end, len(ranked)))
            batch_id = cur.fetchone()[0]
            # Mark events as included in this batch
            event_ids = [e[0] for e in ranked]
            cur.execute("""
                UPDATE DigestEvent SET included_in_batch = %s
                WHERE event_id = ANY(%s)
            """, (batch_id, event_ids))
        advance_next_digest(db_conn, sub_id, window_end, frequency, tz, pref_hour)
        db_conn.commit()
        # Enqueue render + send job
        enqueue_job('render_and_send_digest', {'batch_id': batch_id, 'user_id': user_id})
def deduplicate_events(events):
    """
    Remove duplicate events for the same entity in the same window.
    Keep the most recent occurrence per (event_type, entity_type, entity_id).
    """
    seen = {}
    for event in events:
        _event_id, event_type, entity_id, entity_type, _payload, _score, _occurred_at = event
        key = (event_type, entity_type, entity_id)
        if key not in seen:
            # Events arrive newest-first (ORDER BY occurred_at DESC), so the
            # first occurrence per key is already the most recent.
            seen[key] = event
    return list(seen.values())
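To make the deduplication step concrete, here is a standalone walk-through with in-memory sample rows (illustrative data, not pulled from the schema); rows are ordered newest-first, matching what the aggregation query returns:

```python
# Rows shaped like the aggregation SELECT:
# (event_id, event_type, entity_id, entity_type, payload, score, occurred_at)
rows = [
    (3, 'comment', 42, 'post', {}, 1.0, '2024-05-02T12:00Z'),
    (2, 'comment', 42, 'post', {}, 1.0, '2024-05-01T09:00Z'),  # older duplicate
    (1, 'like',    42, 'post', {}, 0.5, '2024-05-01T08:00Z'),
]

seen = {}
for row in rows:
    key = (row[1], row[3], row[2])   # (event_type, entity_type, entity_id)
    seen.setdefault(key, row)        # first hit wins, i.e. the most recent
deduped = list(seen.values())

print([r[0] for r in deduped])  # [3, 1]: the older duplicate comment is dropped
```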
import math

def rank_events(events, top_n=20):
    """
    Rank events by a composite score of recency and engagement:
    score = engagement_score * recency_decay.
    """
    now = datetime.now(timezone.utc)
    scored = []
    for event in events:
        event_id, event_type, entity_id, entity_type, payload, engagement_score, occurred_at = event
        age_hours = (now - occurred_at).total_seconds() / 3600
        recency_decay = math.exp(-0.1 * age_hours)  # exponential decay
        composite = engagement_score * recency_decay
        scored.append((composite, event))
    scored.sort(key=lambda x: x[0], reverse=True)
    return [e for _, e in scored[:top_n]]
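The decay constant 0.1 determines how quickly freshness overwhelms raw engagement. A quick standalone sketch of the same formula (illustrative numbers only):

```python
import math

def composite_score(engagement_score, age_hours, decay_rate=0.1):
    """Same shape as rank_events' scoring: engagement * exp(-rate * age)."""
    return engagement_score * math.exp(-decay_rate * age_hours)

# A fresh low-engagement event can outrank a day-old high-engagement one:
fresh = composite_score(1.0, age_hours=1)    # ~0.905
stale = composite_score(5.0, age_hours=24)   # 5 * exp(-2.4) ~ 0.454
print(fresh > stale)  # True
```

With decay_rate=0.1 an event loses roughly 90% of its weight per 24 hours, so only very high-engagement items survive a full daily window near the top.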
from dateutil.relativedelta import relativedelta

def compute_window_start(window_end, frequency):
    if frequency == 'daily':
        return window_end - relativedelta(days=1)
    elif frequency == 'weekly':
        return window_end - relativedelta(weeks=1)
    raise ValueError(f"Unknown frequency: {frequency}")
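A quick sanity check of the window arithmetic (requires python-dateutil; the dates are arbitrary examples):

```python
from datetime import datetime, timezone
from dateutil.relativedelta import relativedelta

window_end = datetime(2024, 6, 10, 8, 0, tzinfo=timezone.utc)
daily_start = window_end - relativedelta(days=1)    # 2024-06-09 08:00 UTC
weekly_start = window_end - relativedelta(weeks=1)  # 2024-06-03 08:00 UTC
print(daily_start.isoformat(), weekly_start.isoformat())
```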
Scheduling Logic
Each DigestSubscription has a next_digest_at stored in UTC. The scheduler finds all subscriptions past due and advances the pointer after processing.
from datetime import datetime, timezone, timedelta
import pytz

def compute_next_digest_at(frequency, preferred_hour, user_timezone, after_utc):
    """
    Given the user's preferred send hour (in their local timezone) and frequency,
    compute the next UTC time to send after `after_utc`.
    """
    tz = pytz.timezone(user_timezone)
    local_now = after_utc.astimezone(tz)
    candidate = local_now.replace(hour=preferred_hour, minute=0, second=0, microsecond=0)
    if frequency == 'daily':
        # Next day at preferred_hour local time
        if candidate <= local_now:
            candidate += timedelta(days=1)
    elif frequency == 'weekly':
        # One week out, same weekday, at preferred_hour local time
        if candidate <= local_now:
            candidate += timedelta(weeks=1)
    else:
        raise ValueError(f"Unknown frequency: {frequency}")
    # Re-localize the wall-clock time: pytz keeps the stale UTC offset after
    # datetime arithmetic, so crossing a DST boundary would otherwise shift
    # the send hour by one.
    candidate = tz.localize(candidate.replace(tzinfo=None))
    return candidate.astimezone(timezone.utc)

def advance_next_digest(db_conn, sub_id, window_end, frequency, user_timezone, preferred_hour):
    next_at = compute_next_digest_at(frequency, preferred_hour, user_timezone, window_end)
    with db_conn.cursor() as cur:
        cur.execute("""
            UPDATE DigestSubscription
            SET next_digest_at = %s
            WHERE subscription_id = %s
        """, (next_at, sub_id))
Rendering
Each digest type has a Jinja2 template. The render worker pulls the top-N events from the batch and builds a personalized HTML email.
from jinja2 import Environment, FileSystemLoader

# autoescape guards against HTML injection from user-generated event content
jinja_env = Environment(loader=FileSystemLoader('/app/templates/digests'), autoescape=True)

def render_digest(batch_id, user_id, db_conn):
    with db_conn.cursor() as cur:
        cur.execute("""
            SELECT b.digest_type, b.window_start, b.window_end,
                   u.email, u.display_name, u.timezone
            FROM DigestBatch b
            JOIN users u ON u.id = b.user_id
            WHERE b.batch_id = %s
        """, (batch_id,))
        row = cur.fetchone()
    digest_type, window_start, window_end, email, display_name, user_tz = row
    with db_conn.cursor() as cur:
        cur.execute("""
            SELECT e.event_type, e.entity_type, e.entity_id, e.payload, e.occurred_at
            FROM DigestEvent e
            WHERE e.included_in_batch = %s
            ORDER BY e.engagement_score DESC, e.occurred_at DESC
        """, (batch_id,))
        events = cur.fetchall()
    template = jinja_env.get_template(f"{digest_type}.html.j2")
    unsubscribe_token = generate_unsubscribe_token(user_id, digest_type)
    html_body = template.render(
        display_name=display_name,
        events=events,
        window_start=localize(window_start, user_tz),
        window_end=localize(window_end, user_tz),
        unsubscribe_url=f"https://example.com/unsubscribe/{unsubscribe_token}",
        digest_type=digest_type,
    )
    subject = build_subject(digest_type, events, window_start, user_tz)
    with db_conn.cursor() as cur:
        cur.execute("""
            INSERT INTO DigestDelivery (batch_id, user_id, email_address, subject, status)
            VALUES (%s, %s, %s, %s, 'queued')
            RETURNING delivery_id
        """, (batch_id, user_id, email, subject))
        delivery_id = cur.fetchone()[0]
        cur.execute("UPDATE DigestBatch SET status='rendered' WHERE batch_id=%s", (batch_id,))
    db_conn.commit()
    return delivery_id, html_body, email, subject
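The template files live on disk under /app/templates/digests. A minimal inline sketch of what such a template might contain (hypothetical markup, not the real daily_activity.html.j2):

```python
from jinja2 import Environment

env = Environment(autoescape=True)
template = env.from_string(
    "<h1>Hi {{ display_name }}</h1>"
    "<ul>{% for e in events %}<li>{{ e[0] }} on {{ e[1] }}</li>{% endfor %}</ul>"
    '<a href="{{ unsubscribe_url }}">Unsubscribe</a>'
)
html = template.render(
    display_name='Ada',
    events=[('comment', 'post'), ('like', 'photo')],  # (event_type, entity_type) pairs
    unsubscribe_url='https://example.com/unsubscribe/abc123',
)
print(html)
```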
Delivery
A separate delivery worker picks up queued DigestDelivery rows, sends via SES/SendGrid, and advances the record.
import boto3
from botocore.exceptions import ClientError

ses = boto3.client('ses', region_name='us-east-1')

def send_digest_batch(delivery_id, html_body, email, subject, db_conn):
    try:
        response = ses.send_email(
            Source='digest@example.com',
            Destination={'ToAddresses': [email]},
            Message={
                'Subject': {'Data': subject, 'Charset': 'UTF-8'},
                'Body': {'Html': {'Data': html_body, 'Charset': 'UTF-8'}},
            },
        )
        message_id = response['MessageId']
        with db_conn.cursor() as cur:
            cur.execute("""
                UPDATE DigestDelivery
                SET status='sent', sent_at=NOW(), provider_message_id=%s
                WHERE delivery_id=%s
            """, (message_id, delivery_id))
            cur.execute("""
                UPDATE DigestBatch SET status='sent'
                WHERE batch_id=(SELECT batch_id FROM DigestDelivery WHERE delivery_id=%s)
            """, (delivery_id,))
        db_conn.commit()
    except ClientError as e:
        reason = e.response['Error']['Message']
        with db_conn.cursor() as cur:
            cur.execute("""
                UPDATE DigestDelivery
                SET status='failed', failure_reason=%s
                WHERE delivery_id=%s
            """, (reason, delivery_id))
            cur.execute("""
                UPDATE DigestBatch SET status='failed'
                WHERE batch_id=(SELECT batch_id FROM DigestDelivery WHERE delivery_id=%s)
            """, (delivery_id,))
        db_conn.commit()
        raise
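send_digest_batch fails hard on any ClientError, but transient provider errors such as SES throttling are usually worth retrying before marking the delivery failed. One possible backoff wrapper (a hypothetical helper, not part of the original design; TransientSendError and the delays are assumptions):

```python
import time

class TransientSendError(Exception):
    """Raised for provider errors worth retrying (throttling, timeouts)."""

def send_with_retry(send_fn, max_attempts=3, base_delay=1.0, sleep=time.sleep):
    """Call send_fn, retrying transient failures with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return send_fn()
        except TransientSendError:
            if attempt == max_attempts:
                raise
            sleep(base_delay * 2 ** (attempt - 1))  # 1s, 2s, 4s, ...
```

The `sleep` parameter is injectable so tests can skip real waiting; the caller would classify the SES error code and raise TransientSendError only for retryable failures.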
def handle_unsubscribe(token, db_conn):
    """Validate token and deactivate the subscription."""
    user_id, digest_type = validate_unsubscribe_token(token)
    with db_conn.cursor() as cur:
        cur.execute("""
            UPDATE DigestSubscription
            SET is_active = FALSE
            WHERE user_id = %s AND digest_type = %s
        """, (user_id, digest_type))
    db_conn.commit()
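generate_unsubscribe_token and validate_unsubscribe_token are referenced above but not defined. One plausible HMAC-based sketch (the secret handling, separator, and encoding here are assumptions):

```python
import base64
import hashlib
import hmac

SECRET = b'replace-with-a-real-secret'  # hypothetical: load from config/secret store

def generate_unsubscribe_token(user_id, digest_type, secret=SECRET):
    payload = f"{user_id}:{digest_type}".encode()
    sig = hmac.new(secret, payload, hashlib.sha256).hexdigest()
    encoded = base64.urlsafe_b64encode(payload).decode().rstrip('=')
    return f"{encoded}.{sig}"

def validate_unsubscribe_token(token, secret=SECRET):
    encoded, sig = token.rsplit('.', 1)
    payload = base64.urlsafe_b64decode(encoded + '=' * (-len(encoded) % 4))
    expected = hmac.new(secret, payload, hashlib.sha256).hexdigest()
    if not hmac.compare_digest(sig, expected):
        raise ValueError('invalid unsubscribe token')
    user_id, digest_type = payload.decode().split(':', 1)
    return int(user_id), digest_type
```

Because the token is signed rather than stored, unsubscribe links keep working without a database lookup and cannot be forged for another user without the secret.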
Key Design Decisions
- Window-based aggregation prevents email floods: By batching all events in a configurable window (daily/weekly) into a single digest, users receive one email instead of tens of individual notifications. The `included_in_batch` foreign key ensures each event is counted exactly once even if the aggregation worker is retried.
- Timezone-aware scheduling: `next_digest_at` is always stored in UTC but computed from the user's preferred local hour and timezone using pytz. This correctly handles DST transitions: a user who wants 8 AM always gets 8 AM in their local time.
- Content deduplication within a window: The `deduplicate_events` step removes repeated (event_type, entity_type, entity_id) tuples within the window, preventing a user from seeing the same entity referenced five times in one digest because five minor updates fired events.
- Unsubscribe compliance: Every rendered email includes a signed unsubscribe token that maps back to the specific (user_id, digest_type) pair. The `handle_unsubscribe` function sets `is_active = FALSE`, which the scheduler's WHERE clause respects, guaranteeing no further digests are sent regardless of queued jobs.