Overview
A price history system records product price changes over time, enables historical chart queries, and fires alerts when a product’s price crosses a user-defined threshold. The system must handle high-throughput ingestion from many price sources, efficiently store time-series data without recording redundant values, and evaluate alert conditions at ingestion time rather than on a polling loop. This LLD covers the data model, ingestion pipeline, alert matching, historical chart queries, and the key design tradeoffs that make the system both storage-efficient and low-latency for alert delivery.
Core Data Model
-- Time-series price records — insert-only, partitioned by month
CREATE TABLE PriceRecord (
    record_id   BIGSERIAL,
    product_id  BIGINT NOT NULL,
    price       NUMERIC(12,4) NOT NULL,
    currency    CHAR(3) NOT NULL DEFAULT 'USD',
    source      VARCHAR(64) NOT NULL,              -- 'amazon', 'bestbuy', 'manual'
    recorded_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    PRIMARY KEY (record_id, recorded_at)
) PARTITION BY RANGE (recorded_at);

-- Monthly partitions created ahead of time (or via pg_partman)
CREATE TABLE PriceRecord_2025_01 PARTITION OF PriceRecord
    FOR VALUES FROM ('2025-01-01') TO ('2025-02-01');
CREATE TABLE PriceRecord_2025_02 PARTITION OF PriceRecord
    FOR VALUES FROM ('2025-02-01') TO ('2025-03-01');
-- ... etc.

CREATE INDEX idx_pricerecord_product_time
    ON PriceRecord (product_id, recorded_at DESC);

-- Materialized current price per product+source for fast change detection
CREATE TABLE ProductCurrentPrice (
    product_id    BIGINT NOT NULL,
    source        VARCHAR(64) NOT NULL,
    current_price NUMERIC(12,4) NOT NULL,
    currency      CHAR(3) NOT NULL,
    updated_at    TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    PRIMARY KEY (product_id, source)
);

-- User-defined price alerts
CREATE TABLE PriceAlert (
    alert_id     BIGSERIAL PRIMARY KEY,
    user_id      BIGINT NOT NULL,
    product_id   BIGINT NOT NULL,
    target_price NUMERIC(12,4) NOT NULL,
    direction    VARCHAR(8) NOT NULL CHECK (direction IN ('drop', 'rise')),
    currency     CHAR(3) NOT NULL DEFAULT 'USD',
    status       VARCHAR(16) NOT NULL DEFAULT 'active',  -- active, triggered, cancelled
    created_at   TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    triggered_at TIMESTAMPTZ
);

CREATE INDEX idx_pricealert_product_active
    ON PriceAlert (product_id, status, direction, target_price)
    WHERE status = 'active';

-- Notification log for triggered alerts
CREATE TABLE PriceAlertNotification (
    notification_id BIGSERIAL PRIMARY KEY,
    alert_id        BIGINT NOT NULL REFERENCES PriceAlert(alert_id),
    user_id         BIGINT NOT NULL,
    product_id      BIGINT NOT NULL,
    old_price       NUMERIC(12,4) NOT NULL,
    new_price       NUMERIC(12,4) NOT NULL,
    channel         VARCHAR(32) NOT NULL DEFAULT 'email',  -- email, push, sms
    status          VARCHAR(16) NOT NULL DEFAULT 'pending',
    sent_at         TIMESTAMPTZ,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
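Partition maintenance can be delegated to pg_partman, or handled by a small scheduled job that creates next month's partition ahead of time. A minimal sketch of such a generator (the function name is hypothetical; only the table and partition naming scheme come from the schema above):

```python
from datetime import date

def monthly_partition_ddl(month_start: date) -> str:
    """Emit DDL for the PriceRecord partition covering one calendar month.
    A cron job would run this shortly before each month begins."""
    if month_start.month == 12:
        next_month = date(month_start.year + 1, 1, 1)
    else:
        next_month = date(month_start.year, month_start.month + 1, 1)
    name = f"PriceRecord_{month_start:%Y_%m}"
    return (
        f"CREATE TABLE IF NOT EXISTS {name} PARTITION OF PriceRecord\n"
        f"    FOR VALUES FROM ('{month_start}') TO ('{next_month}');"
    )

print(monthly_partition_ddl(date(2025, 3, 1)))
```

`IF NOT EXISTS` makes the job safe to re-run; pg_partman's `premake` setting achieves the same effect without custom code.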
Ingestion Pipeline
The ingestion worker receives price updates from scrapers or partner feeds and decides whether a meaningful change has occurred before persisting a new record.
from decimal import Decimal

import psycopg2

CHANGE_THRESHOLD = Decimal('0.01')  # ignore sub-cent floating noise

def ingest_price(db_conn, product_id, new_price, currency, source):
    """
    Core ingestion function. Only records a new PriceRecord if the price
    changed materially vs the last known value. Triggers alert evaluation
    on any confirmed change.
    """
    new_price = Decimal(str(new_price))
    with db_conn.cursor() as cur:
        # Read the current price atomically with a row lock
        cur.execute("""
            SELECT current_price FROM ProductCurrentPrice
            WHERE product_id = %s AND source = %s
            FOR UPDATE
        """, (product_id, source))
        row = cur.fetchone()
        old_price = Decimal(str(row[0])) if row else None

    if old_price is not None and abs(new_price - old_price) < CHANGE_THRESHOLD:
        # Price unchanged: release the row lock and skip the insert.
        # Skipping no-op updates saves ~95% of rows vs naive ingestion.
        db_conn.rollback()
        return None

    with db_conn.cursor() as cur:
        # Insert the new price record
        cur.execute("""
            INSERT INTO PriceRecord (product_id, price, currency, source, recorded_at)
            VALUES (%s, %s, %s, %s, NOW())
            RETURNING record_id, recorded_at
        """, (product_id, new_price, currency, source))
        record_id, recorded_at = cur.fetchone()

        # Upsert the current price
        cur.execute("""
            INSERT INTO ProductCurrentPrice (product_id, source, current_price, currency, updated_at)
            VALUES (%s, %s, %s, %s, NOW())
            ON CONFLICT (product_id, source) DO UPDATE
            SET current_price = EXCLUDED.current_price,
                currency = EXCLUDED.currency,
                updated_at = NOW()
        """, (product_id, new_price, currency))
    db_conn.commit()

    # Evaluate alerts outside the transaction; alert evaluation is idempotent
    if old_price is not None:
        evaluate_price_alerts(db_conn, product_id, old_price, new_price)
    return record_id
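Because the skip decision sits on the ingestion hot path, it is worth stating as a pure predicate that can be unit-tested without a database. A sketch mirroring the threshold comparison above (the helper name is illustrative, not part of the pipeline):

```python
from decimal import Decimal

CHANGE_THRESHOLD = Decimal('0.01')  # same sub-cent threshold as ingestion

def is_material_change(old_price, new_price) -> bool:
    """Decide whether a new observation warrants a PriceRecord row.
    A first-ever observation always records; otherwise the absolute
    move must reach the sub-cent noise threshold."""
    if old_price is None:
        return True
    delta = abs(Decimal(str(new_price)) - Decimal(str(old_price)))
    return delta >= CHANGE_THRESHOLD
```

Using Decimal rather than float keeps the comparison exact; a float subtraction near the threshold could flip the decision either way.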
Alert Evaluation
Alert evaluation runs synchronously after each confirmed price change. Because alerts are indexed by (product_id, status, direction, target_price), matching is a single indexed range scan.
def evaluate_price_alerts(db_conn, product_id, old_price, new_price):
    """
    Fire alerts where the new price crosses the user's threshold in the
    direction they specified.
    """
    with db_conn.cursor() as cur:
        if new_price < old_price:
            # Price dropped: fire 'drop' alerts where target_price >= new_price
            cur.execute("""
                SELECT alert_id, user_id, target_price
                FROM PriceAlert
                WHERE product_id = %s
                  AND status = 'active'
                  AND direction = 'drop'
                  AND target_price >= %s
            """, (product_id, new_price))
        else:
            # Price rose: fire 'rise' alerts where target_price <= new_price
            cur.execute("""
                SELECT alert_id, user_id, target_price
                FROM PriceAlert
                WHERE product_id = %s
                  AND status = 'active'
                  AND direction = 'rise'
                  AND target_price <= %s
            """, (product_id, new_price))
        triggered = cur.fetchall()

    if not triggered:
        return

    alert_ids = [row[0] for row in triggered]
    with db_conn.cursor() as cur:
        # Mark alerts triggered
        cur.execute("""
            UPDATE PriceAlert
            SET status = 'triggered', triggered_at = NOW()
            WHERE alert_id = ANY(%s)
        """, (alert_ids,))
        # Create notification records
        for alert_id, user_id, target_price in triggered:
            cur.execute("""
                INSERT INTO PriceAlertNotification
                    (alert_id, user_id, product_id, old_price, new_price, channel, status)
                VALUES (%s, %s, %s, %s, %s, 'email', 'pending')
            """, (alert_id, user_id, product_id, old_price, new_price))
    db_conn.commit()

    # Enqueue notification delivery jobs
    for alert_id, user_id, _ in triggered:
        enqueue_job('send_price_alert_notification', {
            'alert_id': alert_id,
            'user_id': user_id,
            'product_id': product_id,
            'old_price': str(old_price),
            'new_price': str(new_price),
        })
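The crossing logic in the two SQL branches can be restated as a pure function, which makes the drop/rise semantics easy to unit-test without a database. A sketch under that framing (the helper name is hypothetical):

```python
from decimal import Decimal

def alert_matches(direction: str, target_price, new_price) -> bool:
    """Pure-Python equivalent of the SQL matching predicates:
    a 'drop' alert fires once the price is at or below the target,
    a 'rise' alert once the price is at or above it."""
    target = Decimal(str(target_price))
    price = Decimal(str(new_price))
    if direction == 'drop':
        return price <= target   # SQL: target_price >= new_price
    if direction == 'rise':
        return price >= target   # SQL: target_price <= new_price
    raise ValueError(f"unknown direction: {direction}")
```

Because triggered alerts are immediately moved out of 'active', each alert fires at most once even if the price oscillates around the target.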
Historical Chart Queries
Historical price charts need fast time-range queries for a single product. The partition-by-month scheme means queries touching a 90-day window hit at most 4 partitions.
-- 90-day price history for a product, one data point per day (last price of day)
SELECT
    DATE_TRUNC('day', recorded_at AT TIME ZONE 'UTC') AS price_date,
    (ARRAY_AGG(price ORDER BY recorded_at DESC))[1] AS closing_price,
    MIN(price) AS daily_low,
    MAX(price) AS daily_high,
    source
FROM PriceRecord
WHERE product_id = $1
  AND source = $2
  AND recorded_at > NOW() - INTERVAL '90 days'
GROUP BY price_date, source
ORDER BY price_date ASC;
-- All-time lowest price per source, and when that low was first recorded
SELECT DISTINCT ON (source)
    source,
    price AS lowest_price,
    recorded_at AS lowest_price_at
FROM PriceRecord
WHERE product_id = $1
ORDER BY source, price ASC, recorded_at ASC;
-- Current prices across all sources for a product (fast, from materialized table)
SELECT source, current_price, currency, updated_at
FROM ProductCurrentPrice
WHERE product_id = $1
ORDER BY current_price ASC;
For even faster chart rendering, a background job can pre-aggregate daily OHLC (open/high/low/close) rows into a PriceRecordDaily materialized table and serve charts from there instead of scanning raw partitions.
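What that pre-aggregation job computes can be sketched as a pure fold over (day, price) observations in ascending recorded_at order. The helper name and the simple tuple input are illustrative, not part of the pipeline:

```python
from collections import defaultdict
from decimal import Decimal

def daily_ohlc(records):
    """Fold ordered (day, price) observations into per-day OHLC rows,
    the shape a PriceRecordDaily rollup table would hold."""
    days = defaultdict(list)
    for day, price in records:
        days[day].append(Decimal(str(price)))
    return {
        day: {
            'open': prices[0],      # first observation of the day
            'high': max(prices),
            'low': min(prices),
            'close': prices[-1],    # last observation of the day
        }
        for day, prices in days.items()
    }
```

In production this would more likely be a nightly INSERT ... SELECT over the previous day's partition, but the aggregation semantics are the same.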
Key Design Decisions
- Insert-only on change saves ~95% of storage: Price scrapers often poll every few minutes even when prices are stable. By comparing against ProductCurrentPrice and skipping inserts when the price hasn't materially changed (sub-cent threshold), the PriceRecord table stays lean. For a product checked every 5 minutes whose price changes once or twice a day, this reduces writes from 288/day to ~1-2/day per source.
- Alert evaluation at ingestion time, not polling: Evaluating alerts synchronously on each confirmed price change keeps the latency from price drop to notification at seconds, rather than the minutes-to-hours delay of a polling cron. The alert index on (product_id, status, direction, target_price) makes matching an indexed range scan: O(log n) to find the first qualifying alert, then a short walk over the matches.
- Partition by month for retention management: Monthly range partitions let you drop old data by simply running DROP TABLE PriceRecord_2023_01, with no expensive DELETE scans. Partition pruning also keeps 90-day chart queries fast by eliminating irrelevant partitions at the planner level.
- ProductCurrentPrice as a fast lookup layer: Storing the current price separately avoids a latest-row lookup (ORDER BY recorded_at DESC LIMIT 1) on the append-only PriceRecord table for every change-detection check. The upsert is a single primary-key write.
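The write-reduction figure in the first bullet can be sanity-checked with quick arithmetic (the 2 writes/day value is an assumption for illustration):

```python
# A scraper polling every 5 minutes produces 288 observations per day;
# writing only on change shrinks that to roughly the number of real
# price moves (assumed ~2/day here).
polls_per_day = 24 * 60 // 5              # 288 observations/day/source
writes_per_day = 2                        # assumed real price changes/day
reduction = 1 - writes_per_day / polls_per_day
print(polls_per_day, f"{reduction:.1%}")
```

For this example the reduction is above 99%, so the ~95% figure quoted above is conservative; products whose prices move many times a day pull the aggregate savings down toward it.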