Content Recommendation System Low-Level Design: Collaborative Filtering, Embeddings, and Serving

Content Recommendation System: Low-Level Design

A content recommendation system must serve personalized item lists at low latency while continuously incorporating new user signals, handling cold-start for new users and items, and supporting controlled A/B experiments. This article designs a two-stage retrieve-then-rank pipeline using collaborative filtering with ALS, approximate nearest neighbor retrieval with HNSW, content embeddings for cold-start, and an XGBoost re-ranker, with a full SQL schema and Python implementation.

Architecture: Two-Stage Retrieve-then-Rank

Stage 1 (retrieval): given a user embedding, find the top-K candidate items using approximate nearest neighbor search (ANN). Fast but imprecise — may retrieve 500 candidates from millions of items in <10 ms.

Stage 2 (ranking): score the K candidates with a feature-rich model (XGBoost or a small neural net) that incorporates context signals (time of day, device, freshness). Slower but precise — re-ranks 500 → 20 results in <20 ms.

SQL Schema


-- Per-user embedding store: exactly one row per user (user_id is the primary key).
-- The embedding column is the blob read back by the serving code's user-embedding lookup.
CREATE TABLE UserEmbedding (
    user_id       BIGINT UNSIGNED   NOT NULL,
    embedding     BLOB              NOT NULL,   -- serialized float32 array (128-dim)
    model_version VARCHAR(32)       NOT NULL,   -- identifies the model run that produced this vector
    updated_at    DATETIME          NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,   -- auto-refreshed on every row update
    PRIMARY KEY (user_id),
    INDEX idx_model_version (model_version)     -- supports filtering rows by model version
) ENGINE=InnoDB;

-- Per-item embedding store: exactly one row per item (item_id is the primary key).
-- embedding is mandatory; content_emb is optional and only populated for items
-- that have been run through the content (sentence-transformer) model.
CREATE TABLE ItemEmbedding (
    item_id       BIGINT UNSIGNED   NOT NULL,
    embedding     BLOB              NOT NULL,   -- serialized float32 array (128-dim)
    content_emb   BLOB              NULL,       -- sentence-transformer embedding for cold-start
    model_version VARCHAR(32)       NOT NULL,   -- identifies the model run that produced the embedding
    updated_at    DATETIME          NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,   -- auto-refreshed on every row update
    PRIMARY KEY (item_id),
    INDEX idx_model_version (model_version)     -- supports filtering rows by model version
) ENGINE=InnoDB;

-- Append-only log of implicit user/item interaction events.
-- recorded_at (millisecond precision) is part of the primary key, so the same
-- user/item/event_type combination can be stored repeatedly at different times.
CREATE TABLE ImplicitFeedback (
    user_id       BIGINT UNSIGNED   NOT NULL,
    item_id       BIGINT UNSIGNED   NOT NULL,
    event_type    ENUM('view','click','save','share','skip') NOT NULL,
    weight        DECIMAL(5,4)      NOT NULL,   -- 0.1=view, 0.5=click, 1.0=save, -0.5=skip
    session_id    VARCHAR(64)       NULL,       -- optional; events may be recorded without a session
    recorded_at   DATETIME(3)       NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
    PRIMARY KEY (user_id, item_id, event_type, recorded_at),
    INDEX idx_item_events (item_id, recorded_at DESC),   -- most recent events for an item
    INDEX idx_user_events (user_id, recorded_at DESC)    -- most recent events for a user (seen-item lookup)
) ENGINE=InnoDB;

-- Serving log: one row per item returned to a user in a recommendation response.
-- experiment_id and variant are nullable because requests may be served outside any experiment.
CREATE TABLE RecommendationLog (
    id            BIGINT UNSIGNED   NOT NULL AUTO_INCREMENT,
    user_id       BIGINT UNSIGNED   NOT NULL,
    item_id       BIGINT UNSIGNED   NOT NULL,
    position      TINYINT UNSIGNED  NOT NULL,   -- rank of the item within the served list (the serving code writes a zero-based index)
    score         FLOAT             NOT NULL,   -- ranking score the item was served with
    experiment_id VARCHAR(64)       NULL,
    variant       VARCHAR(32)       NULL,
    served_at     DATETIME(3)       NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
    PRIMARY KEY (id),
    INDEX idx_user_served  (user_id, served_at DESC),               -- recent recommendations per user
    INDEX idx_experiment   (experiment_id, variant, served_at DESC) -- per-variant analysis of an experiment
) ENGINE=InnoDB;

-- Experiment definitions used for variant assignment.
-- NOTE(review): the variant lookup in the serving code filters on enabled and end_at only;
-- start_at is stored but not checked there — confirm whether that is intended.
CREATE TABLE ABExperiment (
    id            VARCHAR(64)       NOT NULL,
    name          VARCHAR(255)      NOT NULL,
    variants      JSON              NOT NULL,   -- [{"id":"control","weight":0.5},{"id":"als_v2","weight":0.5}]
    start_at      DATETIME          NOT NULL,
    end_at        DATETIME          NULL,       -- NULL means the experiment has no scheduled end
    enabled       TINYINT(1)        NOT NULL DEFAULT 1,   -- experiments are enabled by default
    PRIMARY KEY (id)
) ENGINE=InnoDB;

Python Implementation


import numpy as np
import struct
import hashlib
import json
import hnswlib
import xgboost as xgb
import db
from sentence_transformers import SentenceTransformer

# Size of the zero vector returned when no user or item embeddings exist;
# matches the 128-dim embeddings described in the schema.
EMBEDDING_DIM = 128
# Number of candidates requested from the ANN index in the retrieval stage.
RETRIEVAL_K = 500
# Maximum number of ranked items returned (and logged) per request.
FINAL_K = 20
# HNSW index used for retrieval. It is None here and nothing in this module
# assigns it; retrieval raises RuntimeError while it is still None.
ANN_INDEX: hnswlib.Index | None = None
# Sentence-transformer used to embed new items from their text.
# NOTE: the model is instantiated at import time, as a module-level side effect.
CONTENT_MODEL = SentenceTransformer("all-MiniLM-L6-v2")
# XGBoost re-ranker. While it is None, ranking falls back to ordering
# candidates by cosine similarity.
RANKER: xgb.Booster | None = None


def get_recommendations(
    user_id: int,
    context: dict,
    experiment_id: str | None = None,
) -> list[dict]:
    """
    Produce up to FINAL_K recommendations for a user.

    The pipeline is: assign an experiment variant, load the user vector,
    retrieve RETRIEVAL_K candidates from the ANN index, drop items the user
    has already interacted with, re-rank the remainder, and log what is served.

    context: request-time signals, e.g.
        {"device": "mobile", "hour": 14, "country": "US"}
    experiment_id: optional experiment to bucket the user into; the resulting
        variant is only recorded in the log, it does not alter the pipeline.

    Returns a list of {"item_id": ..., "score": ...} dicts, best first.
    """
    variant = _assign_experiment_variant(user_id, experiment_id)

    # Stage 1: user vector (personalised or cold-start proxy) -> ANN candidates.
    user_emb = _get_user_embedding(user_id)
    retrieved = _retrieve_candidates(user_emb, k=RETRIEVAL_K)

    # Filter out items among the user's 1000 most recent interactions.
    already_seen = _get_seen_item_ids(user_id, limit=1000)
    unseen = [item_id for item_id in retrieved if item_id not in already_seen]

    # Stage 2: feature-based re-ranking, then truncate to the page size.
    top_results = _rank_candidates(user_id, user_emb, unseen, context)[:FINAL_K]

    # Record exactly what is being served, for later experiment analysis.
    _log_recommendations(user_id, top_results, experiment_id, variant)

    response = []
    for item_id, score in top_results:
        response.append({"item_id": item_id, "score": score})
    return response


def _get_user_embedding(user_id: int) -> np.ndarray:
    """
    Load the stored embedding for a user, or build a cold-start proxy.

    Resolution order:
      1. the user's row in UserEmbedding, if one exists;
      2. otherwise the element-wise mean of up to 1000 item embeddings
         (the query has no ORDER BY, so which rows are sampled is up to the DB);
      3. otherwise a float32 zero vector of length EMBEDDING_DIM.
    """
    stored = db.fetchone(
        "SELECT embedding FROM UserEmbedding WHERE user_id = %s", (user_id,)
    )
    if stored:
        return _deserialize_embedding(stored["embedding"])

    # No personalised vector yet: approximate with an average item vector.
    sample = db.fetchall("SELECT embedding FROM ItemEmbedding LIMIT 1000")
    if not sample:
        return np.zeros(EMBEDDING_DIM, dtype=np.float32)

    item_vectors = np.array(
        [_deserialize_embedding(item_row["embedding"]) for item_row in sample]
    )
    return item_vectors.mean(axis=0)


def _retrieve_candidates(user_emb: np.ndarray, k: int) -> list[int]:
    """
    Return the labels of up to ``k`` nearest items to ``user_emb`` from the ANN index.

    Args:
        user_emb: 1-D query vector; it is reshaped to a single-row batch.
        k: maximum number of neighbours to retrieve.

    Returns:
        Item labels as a plain list, nearest first as returned by the index.
        Empty when the index holds no elements or ``k`` is not positive.

    Raises:
        RuntimeError: if the module-level ANN index has not been loaded.
    """
    if ANN_INDEX is None:
        raise RuntimeError("ANN index not loaded")

    # hnswlib's knn_query raises when it cannot fill k results, which always
    # happens if k exceeds the number of indexed elements (e.g. a small or
    # freshly built index). Clamp k so a small catalogue degrades gracefully.
    k = min(k, ANN_INDEX.get_current_count())
    if k <= 0:
        return []

    labels, _ = ANN_INDEX.knn_query(user_emb.reshape(1, -1), k=k)
    return labels[0].tolist()


def _rank_candidates(
    user_id: int,
    user_emb: np.ndarray,
    candidate_ids: list[int],
    context: dict,
) -> list[tuple[int, float]]:
    """
    Score candidate items and return them sorted by descending score.

    Each candidate gets the feature row
    ``[cosine(user, item), hour / 24, is_mobile]``. When ``RANKER`` is loaded
    the rows are scored by it; otherwise the cosine similarity is the score.

    Args:
        user_id: accepted for interface symmetry; not used in scoring.
        user_emb: the user's embedding vector.
        candidate_ids: item ids produced by the retrieval stage.
        context: request signals; reads ``"hour"`` (default 12) and ``"device"``.

    Returns:
        ``(item_id, score)`` tuples, best first. Candidates with no stored
        embedding, or whose embedding shape differs from ``user_emb``, are
        omitted.
    """
    if not candidate_ids:
        return []

    placeholders = ",".join(["%s"] * len(candidate_ids))
    item_rows = db.fetchall(
        f"SELECT item_id, embedding FROM ItemEmbedding WHERE item_id IN ({placeholders})",
        tuple(candidate_ids)
    )
    emb_map = {r["item_id"]: _deserialize_embedding(r["embedding"]) for r in item_rows}

    # These values are identical for every candidate; compute them once.
    user_norm = np.linalg.norm(user_emb)
    hour_feature = context.get("hour", 12) / 24.0
    mobile_feature = 1.0 if context.get("device") == "mobile" else 0.0

    features = []
    valid_ids = []
    for item_id in candidate_ids:
        item_emb = emb_map.get(item_id)
        if item_emb is None:
            continue
        # A stored vector of a different dimensionality (e.g. one written by a
        # different embedding model) would make np.dot raise and fail the
        # whole request; skip that item instead.
        if item_emb.shape != user_emb.shape:
            continue
        # The epsilon keeps the division defined for all-zero vectors.
        cos_sim = float(np.dot(user_emb, item_emb) / (
            user_norm * np.linalg.norm(item_emb) + 1e-9
        ))
        features.append([cos_sim, hour_feature, mobile_feature])
        valid_ids.append(item_id)

    if not features or RANKER is None:
        # fallback: sort by cosine similarity
        scores = [row[0] for row in features]
    else:
        dmat = xgb.DMatrix(np.array(features, dtype=np.float32))
        scores = RANKER.predict(dmat).tolist()

    return sorted(zip(valid_ids, scores), key=lambda pair: -pair[1])


def update_embeddings(model_version: str) -> None:
    """
    Offline job skeleton: recompute ALS embeddings and write them to the DB.

    Only the first step (loading aggregated interactions) is present here;
    the training and persistence steps are described in comments but not
    implemented. In production this runs in a Spark job; it is shown here
    for conceptual clarity. ``model_version`` is not used by the visible code.
    """
    from implicit import als

    # One row per (user, item) pair with the summed event weights; this is the
    # input for the user-item interaction matrix.
    interactions = db.fetchall(
        "SELECT user_id, item_id, SUM(weight) as w FROM ImplicitFeedback GROUP BY user_id, item_id"
    )
    # Remaining steps, omitted for brevity (the full implementation uses
    # scipy.sparse + implicit.als.AlternatingLeastSquares):
    #   - build the sparse matrix and fit ALS
    #   - for each user:
    #       emb_bytes = _serialize_embedding(user_factors[uid])
    #       db.execute("INSERT INTO UserEmbedding (...) VALUES (...) ON DUPLICATE KEY UPDATE ...", ...)


def embed_new_item(item_id: int, title: str, description: str) -> None:
    """
    Cold-start path for an item that has no interaction history yet.

    Encodes "<title>. <description>" with the content model (normalized,
    cast to float32) and upserts it into ItemEmbedding. On first insert the
    same bytes are written to both ``embedding`` and ``content_emb``; if the
    item already exists only ``content_emb`` is overwritten, so an existing
    ``embedding`` value is left untouched.

    NOTE(review): the content model's output dimensionality is presumably not
    EMBEDDING_DIM, while ``embedding`` is documented as 128-dim — confirm that
    downstream consumers of ``embedding`` can handle these rows.
    """
    combined_text = f"{title}. {description}"
    vector = CONTENT_MODEL.encode(combined_text, normalize_embeddings=True)
    blob = _serialize_embedding(vector.astype(np.float32))
    params = (item_id, blob, blob)
    db.execute(
        """INSERT INTO ItemEmbedding (item_id, embedding, content_emb, model_version)
           VALUES (%s, %s, %s, 'content_v1')
           ON DUPLICATE KEY UPDATE content_emb = VALUES(content_emb)""",
        params
    )


def _assign_experiment_variant(user_id: int, experiment_id: str | None) -> str | None:
    if not experiment_id:
        return None
    exp = db.fetchone(
        "SELECT variants FROM ABExperiment WHERE id = %s AND enabled = 1 AND (end_at IS NULL OR end_at > NOW())",
        (experiment_id,)
    )
    if not exp:
        return None
    variants = json.loads(exp["variants"]) if isinstance(exp["variants"], str) else exp["variants"]
    # deterministic assignment: hash(user_id + experiment_id) mod 100
    h = int(hashlib.md5(f"{user_id}:{experiment_id}".encode()).hexdigest(), 16) % 100
    cumulative = 0
    for v in variants:
        cumulative += int(v["weight"] * 100)
        if h  set[int]:
    rows = db.fetchall(
        """SELECT DISTINCT item_id FROM ImplicitFeedback
           WHERE user_id = %s ORDER BY recorded_at DESC LIMIT %s""",
        (user_id, limit)
    )
    return {r["item_id"] for r in rows}


def _log_recommendations(user_id, results, experiment_id, variant):
    """
    Write one RecommendationLog row per served item.

    ``results`` is an ordered iterable of (item_id, score) pairs; each row
    records the item's zero-based position in that order together with the
    experiment id and variant (either may be None). Issues one INSERT per item
    and returns None.
    """
    insert_sql = "INSERT INTO RecommendationLog (user_id, item_id, position, score, experiment_id, variant) VALUES (%s,%s,%s,%s,%s,%s)"
    for rank, served in enumerate(results):
        item_id, score = served
        row = (user_id, item_id, rank, score, experiment_id, variant)
        db.execute(insert_sql, row)


def _serialize_embedding(emb: np.ndarray) -> bytes:
    """
    Pack a 1-D sequence of numbers into raw 4-byte floats.

    Uses struct's native byte order, so the bytes are only portable between
    machines with the same endianness. An empty input yields b"".
    """
    packer = struct.Struct(f"{len(emb)}f")
    return packer.pack(*emb)


def _deserialize_embedding(data: bytes) -> np.ndarray:
    """
    Decode raw 4-byte native-order floats into a float32 array.

    The element count is ``len(data) // 4``; struct raises if the buffer
    length is not an exact multiple of 4. Empty input yields an empty array.
    """
    float_count = len(data) // 4
    unpacked = struct.unpack(f"{float_count}f", data)
    return np.asarray(unpacked, dtype=np.float32)

Cold-Start for New Users and Items

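New users with no interaction history receive a proxy user vector: the mean of a sample of up to 1,000 item embeddings (the implementation reads at most 1,000 rows rather than every item), which surfaces popular/diverse content. Once 5+ interactions are logged, the next embedding update cycle produces a personalized ALS vector. New items receive a content_emb from a sentence-transformer on their title and description; this vector is used for retrieval until ALS produces a collaborative embedding after the next training run. Note that all-MiniLM-L6-v2 produces 384-dimensional vectors while the ALS embeddings are 128-dimensional, so content embeddings must be projected into the ALS space (or served from a separate index) before they can be compared with user vectors.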

A/B Experiment Assignment

Experiment variant assignment is deterministic: the MD5 hash of the string "user_id:experiment_id", taken mod 100, maps each user to a bucket consistently across requests without storing any per-user assignment; only the experiment definition is read from ABExperiment. Variants are defined with weight fractions summing to 1.0. The RecommendationLog captures variant per served result for downstream CTR analysis.

See also: Anthropic Interview Guide 2026: Process, Questions, and AI Safety

See also: Meta Interview Guide 2026: Facebook, Instagram, WhatsApp Engineering

See also: Netflix Interview Guide 2026: Streaming Architecture, Recommendation Systems, and Engineering Excellence

See also: Scale AI Interview Guide 2026: Data Infrastructure, RLHF Pipelines, and ML Engineering

Scroll to Top