Price History System Low-Level Design: Time-Series Storage, Change Detection, and Alert Notifications

Overview

A price history system records product price changes over time, enables historical chart queries, and fires alerts when a product’s price crosses a user-defined threshold. The system must handle high-throughput ingestion from many price sources, efficiently store time-series data without recording redundant values, and evaluate alert conditions at ingestion time rather than on a polling loop. This LLD covers the data model, ingestion pipeline, alert matching, historical chart queries, and the key design tradeoffs that make the system both storage-efficient and low-latency for alert delivery.

Core Data Model


-- Time-series price records — insert-only, partitioned by month
CREATE TABLE PriceRecord (
    record_id     BIGSERIAL,
    product_id    BIGINT        NOT NULL,
    price         NUMERIC(12,4) NOT NULL,
    currency      CHAR(3)       NOT NULL DEFAULT 'USD',
    source        VARCHAR(64)   NOT NULL,  -- 'amazon', 'bestbuy', 'manual'
    recorded_at   TIMESTAMPTZ   NOT NULL DEFAULT NOW(),
    PRIMARY KEY (record_id, recorded_at)
) PARTITION BY RANGE (recorded_at);

-- Monthly partitions created ahead of time (or via pg_partman)
CREATE TABLE PriceRecord_2025_01 PARTITION OF PriceRecord
    FOR VALUES FROM ('2025-01-01') TO ('2025-02-01');
CREATE TABLE PriceRecord_2025_02 PARTITION OF PriceRecord
    FOR VALUES FROM ('2025-02-01') TO ('2025-03-01');
-- ... etc.

CREATE INDEX idx_pricerecord_product_time
    ON PriceRecord (product_id, recorded_at DESC);

-- Materialized current price per product+source for fast change detection
CREATE TABLE ProductCurrentPrice (
    product_id    BIGINT        NOT NULL,
    source        VARCHAR(64)   NOT NULL,
    current_price NUMERIC(12,4) NOT NULL,
    currency      CHAR(3)       NOT NULL,
    updated_at    TIMESTAMPTZ   NOT NULL DEFAULT NOW(),
    PRIMARY KEY (product_id, source)
);

-- User-defined price alerts
CREATE TABLE PriceAlert (
    alert_id      BIGSERIAL PRIMARY KEY,
    user_id       BIGINT        NOT NULL,
    product_id    BIGINT        NOT NULL,
    target_price  NUMERIC(12,4) NOT NULL,
    direction     VARCHAR(8)    NOT NULL CHECK (direction IN ('drop', 'rise')),
    currency      CHAR(3)       NOT NULL DEFAULT 'USD',
    status        VARCHAR(16)   NOT NULL DEFAULT 'active',  -- active, triggered, cancelled
    created_at    TIMESTAMPTZ   NOT NULL DEFAULT NOW(),
    triggered_at  TIMESTAMPTZ
);

CREATE INDEX idx_pricealert_product_active
    ON PriceAlert (product_id, status, direction, target_price)
    WHERE status = 'active';

-- Notification log for triggered alerts
CREATE TABLE PriceAlertNotification (
    notification_id BIGSERIAL PRIMARY KEY,
    alert_id        BIGINT        NOT NULL REFERENCES PriceAlert(alert_id),
    user_id         BIGINT        NOT NULL,
    product_id      BIGINT        NOT NULL,
    old_price       NUMERIC(12,4) NOT NULL,
    new_price       NUMERIC(12,4) NOT NULL,
    channel         VARCHAR(32)   NOT NULL DEFAULT 'email',  -- email, push, sms
    status          VARCHAR(16)   NOT NULL DEFAULT 'pending',
    sent_at         TIMESTAMPTZ,
    created_at      TIMESTAMPTZ   NOT NULL DEFAULT NOW()
);

Ingestion Pipeline

The ingestion worker receives price updates from scrapers or partner feeds and decides whether a meaningful change has occurred before persisting a new record.


from decimal import Decimal
import psycopg2

CHANGE_THRESHOLD = Decimal('0.01')  # ignore sub-cent floating noise

def ingest_price(db_conn, product_id, new_price, currency, source):
    """
    Core ingestion function. Only records a new PriceRecord if the price
    changed materially vs the last known value. Triggers alert evaluation
    on any confirmed change.
    """
    new_price = Decimal(str(new_price))

    with db_conn.cursor() as cur:
        # Read current price atomically with a row lock
        cur.execute("""
            SELECT current_price FROM ProductCurrentPrice
            WHERE product_id = %s AND source = %s
            FOR UPDATE
        """, (product_id, source))
        row = cur.fetchone()

    old_price = Decimal(str(row[0])) if row else None

    if old_price is not None and abs(new_price - old_price) < CHANGE_THRESHOLD:
        # Sub-threshold noise: skip the insert (saves ~95% of rows vs naive
        # ingestion). Roll back to release the FOR UPDATE row lock taken above.
        db_conn.rollback()
        return None

    with db_conn.cursor() as cur:
        # Insert the new price record
        cur.execute("""
            INSERT INTO PriceRecord (product_id, price, currency, source, recorded_at)
            VALUES (%s, %s, %s, %s, NOW())
            RETURNING record_id, recorded_at
        """, (product_id, new_price, currency, source))
        record_id, recorded_at = cur.fetchone()

        # Upsert current price
        cur.execute("""
            INSERT INTO ProductCurrentPrice (product_id, source, current_price, currency, updated_at)
            VALUES (%s, %s, %s, %s, NOW())
            ON CONFLICT (product_id, source) DO UPDATE
            SET current_price = EXCLUDED.current_price,
                currency      = EXCLUDED.currency,
                updated_at    = NOW()
        """, (product_id, new_price, currency))

    db_conn.commit()

    # Evaluate alerts outside the transaction — alert eval is idempotent
    if old_price is not None:
        evaluate_price_alerts(db_conn, product_id, old_price, new_price)

    return record_id
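The material-change rule inside ingest_price can be factored into a pure helper, which makes the threshold semantics unit-testable without a database. The helper below is a hypothetical sketch: it treats any move of at least one cent as material, and the name is_material_change is illustrative, not part of the pipeline above.

```python
from decimal import Decimal

CHANGE_THRESHOLD = Decimal('0.01')  # same sub-cent noise threshold as ingest_price

def is_material_change(old_price, new_price):
    """Return True when the new price differs enough to warrant a new record."""
    if old_price is None:
        return True  # first observation for this product+source
    old = Decimal(str(old_price))
    new = Decimal(str(new_price))
    # Moves smaller than one cent are treated as scraper noise and skipped.
    return abs(new - old) >= CHANGE_THRESHOLD
```

Keeping this predicate separate also makes it easy to tune the threshold per category (e.g. a wider band for high-priced items) without touching the transactional code.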

Alert Evaluation

Alert evaluation runs synchronously after each confirmed price change. Because alerts are indexed by (product_id, status, direction, target_price), matching is a single indexed range scan.


def evaluate_price_alerts(db_conn, product_id, old_price, new_price):
    """
    Fire alerts where the new price crosses the user's threshold in the
    direction they specified.
    """
    triggered = []

    with db_conn.cursor() as cur:
        if new_price < old_price:
            # Price dropped: fire 'drop' alerts where target_price >= new_price
            cur.execute("""
                SELECT alert_id, user_id, target_price
                FROM PriceAlert
                WHERE product_id = %s
                  AND status = 'active'
                  AND direction = 'drop'
                  AND target_price >= %s
            """, (product_id, new_price))
        else:
            # Price rose — fire 'rise' alerts where target_price <= new_price
            cur.execute("""
                SELECT alert_id, user_id, target_price
                FROM PriceAlert
                WHERE product_id = %s
                  AND status = 'active'
                  AND direction = 'rise'
                  AND target_price <= %s
            """, (product_id, new_price))

        triggered = cur.fetchall()

    if not triggered:
        return

    alert_ids = [row[0] for row in triggered]

    with db_conn.cursor() as cur:
        # Mark alerts triggered
        cur.execute("""
            UPDATE PriceAlert
            SET status = 'triggered', triggered_at = NOW()
            WHERE alert_id = ANY(%s)
        """, (alert_ids,))

        # Create notification records
        for alert_id, user_id, target_price in triggered:
            cur.execute("""
                INSERT INTO PriceAlertNotification
                    (alert_id, user_id, product_id, old_price, new_price, channel, status)
                VALUES (%s, %s, %s, %s, %s, 'email', 'pending')
            """, (alert_id, user_id, product_id, old_price, new_price))

    db_conn.commit()

    # Enqueue notification delivery jobs
    for alert_id, user_id, _ in triggered:
        enqueue_job('send_price_alert_notification', {
            'alert_id': alert_id,
            'user_id': user_id,
            'product_id': product_id,
            'old_price': str(old_price),
            'new_price': str(new_price),
        })
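The threshold predicate the two SQL branches encode can also be expressed as a pure function, which is handy for testing alert semantics in isolation. This is a hypothetical mirror of the matching logic, not code from the pipeline; the function name is illustrative.

```python
from decimal import Decimal

def alert_matches(direction, target_price, new_price):
    """Mirror of the SQL matching conditions: a 'drop' alert fires when the
    new price is at or below the target, a 'rise' alert at or above it."""
    target = Decimal(str(target_price))
    price = Decimal(str(new_price))
    if direction == 'drop':
        return price <= target   # SQL: target_price >= new_price
    if direction == 'rise':
        return price >= target   # SQL: target_price <= new_price
    raise ValueError(f"unknown direction: {direction!r}")
```

Note that, as in the SQL, this checks only the new price against the target; the direction of movement is decided by the caller, and re-firing is prevented by flipping the alert's status to 'triggered'.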

Historical Chart Queries

Historical price charts need fast time-range queries for a single product. The partition-by-month scheme means queries touching a 90-day window hit at most 4 partitions.


-- 90-day price history for a product, one data point per day (last price of day)
SELECT
    DATE_TRUNC('day', recorded_at AT TIME ZONE 'UTC') AS price_date,
    (ARRAY_AGG(price ORDER BY recorded_at DESC))[1]   AS closing_price,
    MIN(price)                                         AS daily_low,
    MAX(price)                                         AS daily_high,
    source
FROM PriceRecord
WHERE product_id = $1
  AND source     = $2
  AND recorded_at > NOW() - INTERVAL '90 days'
GROUP BY price_date, source
ORDER BY price_date ASC;

-- All-time lowest price for a product, per source, with when it was recorded
SELECT DISTINCT ON (source)
       source,
       price       AS lowest_price,
       recorded_at AS lowest_price_at
FROM PriceRecord
WHERE product_id = $1
ORDER BY source, price ASC, recorded_at ASC;

-- Current prices across all sources for a product (fast, from materialized table)
SELECT source, current_price, currency, updated_at
FROM ProductCurrentPrice
WHERE product_id = $1
ORDER BY current_price ASC;

For even faster chart rendering, a background job can pre-aggregate daily OHLC (open/high/low/close) rows into a PriceRecordDaily materialized table and serve charts from there instead of scanning raw partitions.
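The daily OHLC rollup that background job would compute can be sketched over in-memory (timestamp, price) tuples. Since the PriceRecordDaily schema is not specified above, the output shape here is an assumption, and daily_ohlc is an illustrative name.

```python
from collections import defaultdict
from decimal import Decimal

def daily_ohlc(records):
    """records: iterable of (recorded_at: datetime, price) pairs.
    Returns {date: (open, high, low, close)} where open/close come from the
    earliest/latest record of each day."""
    by_day = defaultdict(list)
    for recorded_at, price in records:
        by_day[recorded_at.date()].append((recorded_at, Decimal(str(price))))
    out = {}
    for day, points in by_day.items():
        points.sort()  # chronological within the day
        prices = [p for _, p in points]
        out[day] = (prices[0], max(prices), min(prices), prices[-1])
    return out
```

Because PriceRecord is insert-only, the rollup job only needs to reprocess the current day's partition slice; closed days never change and their OHLC rows are final.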

Key Design Decisions

  • Insert-only on change saves ~95% of storage: Price scrapers often poll every few minutes even when prices are stable. By comparing against ProductCurrentPrice and skipping inserts when the price hasn’t materially changed (sub-cent threshold), the PriceRecord table stays lean. For a product checked every 5 minutes with a daily price change, this reduces writes from 288/day to ~1-2/day per source.
  • Alert evaluation at ingestion time, not polling: Evaluating alerts synchronously on each confirmed price change means latency from price drop to notification is seconds rather than the minutes-to-hours delay of a polling cron. The alert index on (product_id, status, direction, target_price) makes the matching query an O(log n + k) index range scan, where k is the number of matching alerts.
  • Partition by month for retention management: Monthly range partitions let you drop old data by simply running DROP TABLE PriceRecord_2023_01 — no expensive DELETE scans. Partition pruning also keeps 90-day chart queries fast by eliminating irrelevant partitions at the planner level.
  • ProductCurrentPrice as a fast lookup layer: Storing the current price separately avoids a SELECT MAX(recorded_at) subquery on the append-only PriceRecord table for every ingestion change-detection check. The upsert is a single primary-key write.
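The write-reduction figures in the first bullet are easy to sanity-check. The numbers below use that bullet's own assumptions (5-minute polling, one or two recorded changes per day); for that specific case the reduction is even larger than the ~95% aggregate claim.

```python
# Sanity-check the write-reduction arithmetic from the design-decision bullet.
POLL_INTERVAL_MIN = 5
polls_per_day = 24 * 60 // POLL_INTERVAL_MIN   # naive inserts/day/source
recorded_changes = 2                           # upper end of the ~1-2/day estimate
reduction = 1 - recorded_changes / polls_per_day
```

The ~95% figure is an aggregate across a catalog where some products reprice many times a day; stable products see well over 99% of polls skipped.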

