Content Recommendation System: Low-Level Design
A content recommendation system must serve personalized item lists at low latency while continuously incorporating new user signals, handling cold-start for new users and items, and supporting controlled A/B experiments. This article designs a two-stage retrieve-then-rank pipeline using collaborative filtering with ALS, approximate nearest neighbor retrieval with HNSW, content embeddings for cold-start, and an XGBoost re-ranker, with a full SQL schema and Python implementation.
Architecture: Two-Stage Retrieve-then-Rank
Stage 1 (retrieval): given a user embedding, find the top-K candidate items using approximate nearest neighbor search (ANN). Fast but imprecise — may retrieve 500 candidates from millions of items in <10 ms.
Stage 2 (ranking): score the K candidates with a feature-rich model (XGBoost or a small neural net) that incorporates context signals (time of day, device, freshness). Slower but precise — re-ranks 500 → 20 results in <20 ms.
SQL Schema
-- One row per user: latest collaborative-filtering (ALS) user vector.
-- Written by the offline embedding job; read on every recommendation request.
CREATE TABLE UserEmbedding (
    user_id BIGINT UNSIGNED NOT NULL,
    embedding BLOB NOT NULL, -- serialized float32 array (128-dim)
    model_version VARCHAR(32) NOT NULL,
    -- auto-touched on every upsert so staleness can be monitored
    updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (user_id),
    -- lets batch jobs find rows still on an older model version
    INDEX idx_model_version (model_version)
) ENGINE=InnoDB;
-- One row per item: collaborative (ALS) vector plus an optional
-- content-derived vector used before any interactions exist.
CREATE TABLE ItemEmbedding (
    item_id BIGINT UNSIGNED NOT NULL,
    embedding BLOB NOT NULL, -- serialized float32 array (128-dim)
    content_emb BLOB NULL, -- sentence-transformer embedding for cold-start
    model_version VARCHAR(32) NOT NULL,
    updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (item_id),
    INDEX idx_model_version (model_version)
) ENGINE=InnoDB;
-- Raw interaction events; the training signal for ALS.
-- recorded_at is part of the PK so the same (user, item, event) can repeat.
CREATE TABLE ImplicitFeedback (
    user_id BIGINT UNSIGNED NOT NULL,
    item_id BIGINT UNSIGNED NOT NULL,
    event_type ENUM('view','click','save','share','skip') NOT NULL,
    weight DECIMAL(5,4) NOT NULL, -- 0.1=view, 0.5=click, 1.0=save, -0.5=skip
    session_id VARCHAR(64) NULL,
    -- millisecond precision: events within the same second must not collide in the PK
    recorded_at DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
    PRIMARY KEY (user_id, item_id, event_type, recorded_at),
    INDEX idx_item_events (item_id, recorded_at DESC),
    -- serves the "recently seen items for this user" lookup
    INDEX idx_user_events (user_id, recorded_at DESC)
) ENGINE=InnoDB;
-- One row per served (user, item, position) impression.
-- Joined against ImplicitFeedback downstream for CTR / experiment analysis.
CREATE TABLE RecommendationLog (
    id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
    user_id BIGINT UNSIGNED NOT NULL,
    item_id BIGINT UNSIGNED NOT NULL,
    position TINYINT UNSIGNED NOT NULL, -- 0-based slot on the served page
    score FLOAT NOT NULL, -- ranker score at serve time
    experiment_id VARCHAR(64) NULL, -- NULL when the request was not in an experiment
    variant VARCHAR(32) NULL,
    served_at DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
    PRIMARY KEY (id),
    INDEX idx_user_served (user_id, served_at DESC),
    -- supports per-variant slicing for experiment readouts
    INDEX idx_experiment (experiment_id, variant, served_at DESC)
) ENGINE=InnoDB;
-- Experiment definitions; variant weights are fractions expected to sum to 1.0.
CREATE TABLE ABExperiment (
    id VARCHAR(64) NOT NULL,
    name VARCHAR(255) NOT NULL,
    variants JSON NOT NULL, -- [{"id":"control","weight":0.5},{"id":"als_v2","weight":0.5}]
    start_at DATETIME NOT NULL,
    end_at DATETIME NULL, -- NULL = open-ended
    enabled TINYINT(1) NOT NULL DEFAULT 1,
    PRIMARY KEY (id)
) ENGINE=InnoDB;
Python Implementation
import numpy as np
import struct
import hashlib
import json
import hnswlib
import xgboost as xgb
import db
from sentence_transformers import SentenceTransformer
EMBEDDING_DIM = 128  # float32 dims for both user and item vectors
RETRIEVAL_K = 500    # stage-1 ANN candidate count
FINAL_K = 20         # stage-2 final page size

# Process-wide singletons. ANN_INDEX and RANKER are None until loaded by
# startup code not shown in this file; CONTENT_MODEL downloads/loads the
# sentence-transformer at import time (heavy side effect).
ANN_INDEX: hnswlib.Index | None = None
CONTENT_MODEL = SentenceTransformer("all-MiniLM-L6-v2")
RANKER: xgb.Booster | None = None
def get_recommendations(
    user_id: int,
    context: dict,
    experiment_id: str | None = None,
) -> list[dict]:
    """
    Return top FINAL_K recommended item IDs for a user.
    context: {"device": "mobile", "hour": 14, "country": "US"}
    """
    variant = _assign_experiment_variant(user_id, experiment_id)

    # Stage 1: embed the user (cold-start fallback inside) and retrieve
    # a broad candidate pool from the ANN index.
    user_emb = _get_user_embedding(user_id)
    pool = _retrieve_candidates(user_emb, k=RETRIEVAL_K)

    # Filter out items the user has already interacted with recently.
    already_seen = _get_seen_item_ids(user_id, limit=1000)
    fresh_pool = [item for item in pool if item not in already_seen]

    # Stage 2: feature-based re-ranking, then truncate to the page size.
    scored = _rank_candidates(user_id, user_emb, fresh_pool, context)
    served = scored[:FINAL_K]

    _log_recommendations(user_id, served, experiment_id, variant)
    return [{"item_id": iid, "score": s} for iid, s in served]
def _get_user_embedding(user_id: int) -> np.ndarray:
    """Return the stored ALS vector for a user, or a cold-start proxy.

    Cold-start: the mean of up to 1000 item embeddings (surfaces broadly
    popular content); an all-zero vector only when no items exist at all.
    """
    row = db.fetchone(
        "SELECT embedding FROM UserEmbedding WHERE user_id = %s", (user_id,)
    )
    if row:
        return _deserialize_embedding(row["embedding"])
    sample = db.fetchall("SELECT embedding FROM ItemEmbedding LIMIT 1000")
    if not sample:
        return np.zeros(EMBEDDING_DIM, dtype=np.float32)
    stacked = np.stack([_deserialize_embedding(r["embedding"]) for r in sample])
    return stacked.mean(axis=0)
def _retrieve_candidates(user_emb: np.ndarray, k: int) -> list[int]:
    """Top-k approximate nearest-neighbor lookup against the HNSW index.

    Raises RuntimeError if the process-wide index has not been loaded.
    """
    index = ANN_INDEX
    if index is None:
        raise RuntimeError("ANN index not loaded")
    query = user_emb.reshape(1, -1)  # knn_query expects a 2-D batch
    labels, _distances = index.knn_query(query, k=k)
    return labels[0].tolist()
def _rank_candidates(
    user_id: int,
    user_emb: np.ndarray,
    candidate_ids: list[int],
    context: dict,
) -> list[tuple[int, float]]:
    """Score candidates with the XGBoost ranker and sort descending.

    Candidates with no stored embedding (e.g. deleted items) are dropped.
    Falls back to pure cosine-similarity ranking when RANKER is not loaded.
    Returns a list of (item_id, score), highest score first.
    """
    if not candidate_ids:
        return []
    # Parameterized IN-list: one "%s" per candidate id (no SQL injection risk).
    placeholders = ",".join(["%s"] * len(candidate_ids))
    item_rows = db.fetchall(
        f"SELECT item_id, embedding FROM ItemEmbedding WHERE item_id IN ({placeholders})",
        tuple(candidate_ids)
    )
    emb_map = {r["item_id"]: _deserialize_embedding(r["embedding"]) for r in item_rows}

    # Hoist loop invariants: the user norm and context features are the same
    # for every candidate, so compute them once instead of per item.
    user_norm = float(np.linalg.norm(user_emb))
    hour_feat = context.get("hour", 12) / 24.0
    mobile_feat = 1.0 if context.get("device") == "mobile" else 0.0

    features = []
    valid_ids = []
    for item_id in candidate_ids:
        item_emb = emb_map.get(item_id)
        if item_emb is None:
            continue  # candidate had no stored embedding
        # epsilon keeps the division safe for zero vectors
        cos_sim = float(
            np.dot(user_emb, item_emb) / (user_norm * np.linalg.norm(item_emb) + 1e-9)
        )
        features.append([cos_sim, hour_feat, mobile_feat])
        valid_ids.append(item_id)

    if not features:
        return []
    if RANKER is None:
        # Fallback: rank by cosine similarity (feature column 0).
        scores = [f[0] for f in features]
    else:
        dmat = xgb.DMatrix(np.array(features, dtype=np.float32))
        scores = RANKER.predict(dmat).tolist()
    return sorted(zip(valid_ids, scores), key=lambda x: -x[1])
def update_embeddings(model_version: str) -> None:
    """
    Offline job: recompute ALS embeddings and update DB.
    In production this runs in a Spark job; shown here for conceptual clarity.

    model_version: tag written to UserEmbedding/ItemEmbedding.model_version
    so serving code and cleanup jobs can distinguish training runs.
    """
    # Function-scope import: `implicit` is only needed by this offline job,
    # not on the serving path.
    from implicit import als
    # build user-item interaction matrix from ImplicitFeedback;
    # weights are summed so repeated events strengthen the signal
    rows = db.fetchall(
        "SELECT user_id, item_id, SUM(weight) as w FROM ImplicitFeedback GROUP BY user_id, item_id"
    )
    # ... build sparse matrix, fit ALS, store embeddings ...
    # For each user:
    #   emb_bytes = _serialize_embedding(user_factors[uid])
    #   db.execute("INSERT INTO UserEmbedding (...) VALUES (...) ON DUPLICATE KEY UPDATE ...", ...)
    pass  # omitted for brevity; full impl uses scipy.sparse + implicit.als.AlternatingLeastSquares
def embed_new_item(item_id: int, title: str, description: str) -> None:
    """Cold-start: generate content embedding for a new item with no interaction history."""
    combined_text = f"{title}. {description}"
    vector = CONTENT_MODEL.encode(combined_text, normalize_embeddings=True).astype(np.float32)
    payload = _serialize_embedding(vector)
    # On conflict only content_emb is refreshed, so a later ALS-trained
    # `embedding` value is never clobbered by a metadata re-embed.
    db.execute(
        """INSERT INTO ItemEmbedding (item_id, embedding, content_emb, model_version)
        VALUES (%s, %s, %s, 'content_v1')
        ON DUPLICATE KEY UPDATE content_emb = VALUES(content_emb)""",
        (item_id, payload, payload)
    )
def _assign_experiment_variant(user_id: int, experiment_id: str | None) -> str | None:
    """Deterministically assign a user to an experiment variant.

    Returns the variant id, or None when no experiment was requested, the
    experiment is disabled/expired, or the variant weights fail to cover the
    user's bucket (rounding gap).

    NOTE: the source line `if h set[int]:` was corrupted (a `<` and the
    following function header were lost, likely to HTML escaping); the
    comparison, the variant return, the fallback, and the
    `_get_seen_item_ids` signature are reconstructed here from the callers
    and the surrounding prose.
    """
    if not experiment_id:
        return None
    exp = db.fetchone(
        "SELECT variants FROM ABExperiment WHERE id = %s AND enabled = 1 AND (end_at IS NULL OR end_at > NOW())",
        (experiment_id,)
    )
    if not exp:
        return None
    # db driver may return JSON column as str or already-parsed list
    variants = json.loads(exp["variants"]) if isinstance(exp["variants"], str) else exp["variants"]
    # deterministic assignment: hash(user_id + experiment_id) mod 100 picks a
    # bucket; each variant claims a cumulative range of those 100 buckets
    h = int(hashlib.md5(f"{user_id}:{experiment_id}".encode()).hexdigest(), 16) % 100
    cumulative = 0
    for v in variants:
        cumulative += int(v["weight"] * 100)
        if h < cumulative:
            return v["id"]
    # weights did not sum to 1.0 (rounding); treat the user as unassigned
    return None


def _get_seen_item_ids(user_id: int, limit: int = 1000) -> set[int]:
    """Return up to `limit` item IDs the user most recently interacted with."""
    rows = db.fetchall(
        """SELECT DISTINCT item_id FROM ImplicitFeedback
        WHERE user_id = %s ORDER BY recorded_at DESC LIMIT %s""",
        (user_id, limit)
    )
    return {r["item_id"] for r in rows}
def _log_recommendations(user_id, results, experiment_id, variant):
    """Persist one RecommendationLog row per served (item_id, score) pair.

    Position is the 0-based slot in the served list; experiment_id/variant
    may be None when the request was not part of an experiment.
    """
    insert_sql = "INSERT INTO RecommendationLog (user_id, item_id, position, score, experiment_id, variant) VALUES (%s,%s,%s,%s,%s,%s)"
    position = 0
    for item_id, score in results:
        db.execute(insert_sql, (user_id, item_id, position, score, experiment_id, variant))
        position += 1
def _serialize_embedding(emb: np.ndarray) -> bytes:
return struct.pack(f"{len(emb)}f", *emb)
def _deserialize_embedding(data: bytes) -> np.ndarray:
n = len(data) // 4
return np.array(struct.unpack(f"{n}f", data), dtype=np.float32)
Cold-Start for New Users and Items
New users with no interaction history receive the mean of all item embeddings as a proxy user vector, which surfaces popular/diverse content. Once 5+ interactions are logged, the next embedding update cycle produces a personalized ALS vector. New items receive a content_emb from a sentence-transformer on their title and description; this vector is used for retrieval until ALS produces a collaborative embedding after the next training run.
A/B Experiment Assignment
Experiment variant assignment is deterministic: MD5(user_id + experiment_id) mod 100 maps each user to a bucket consistently across requests without a database lookup. Variants are defined with weight fractions summing to 1.0. The RecommendationLog captures variant per served result for downstream CTR analysis.
{
  "@context": "https://schema.org",
  "@type": "FAQPage",
  "mainEntity": [
    {
      "@type": "Question",
      "name": "What is the two-stage retrieve-then-rank pattern in recommendation systems?",
      "acceptedAnswer": {
        "@type": "Answer",
        "text": "In stage 1 (retrieval), an approximate nearest neighbor (ANN) index finds the top-K candidate items (e.g., 500) from millions using the user embedding, in under 10ms. In stage 2 (ranking), a more expensive model (e.g., XGBoost) scores each candidate using richer features like cosine similarity, time of day, and device type, reducing 500 candidates to 20 final results. This two-stage approach balances recall (retrieve broadly) with precision (rank carefully)."
      }
    },
    {
      "@type": "Question",
      "name": "How does collaborative filtering with ALS work for recommendations?",
      "acceptedAnswer": {
        "@type": "Answer",
        "text": "ALS (Alternating Least Squares) factorizes the user-item implicit feedback matrix into user and item embedding matrices. It alternates between fixing item embeddings and solving for user embeddings (a least squares problem), and vice versa, until convergence. The resulting embeddings capture latent affinity: users who interacted with similar items end up with similar embedding vectors, enabling nearest-neighbor retrieval."
      }
    },
    {
      "@type": "Question",
      "name": "How do content embeddings solve the cold-start problem?",
      "acceptedAnswer": {
        "@type": "Answer",
        "text": "New items have no interaction history for ALS to learn from. A sentence-transformer model encodes the item's title and description into a dense vector in the same embedding space used for retrieval. This content embedding is used immediately after item creation, enabling the item to appear in recommendations before any user has interacted with it. As interactions accumulate, the next ALS training run produces a collaborative embedding that replaces the content embedding."
      }
    },
    {
      "@type": "Question",
      "name": "How do you implement deterministic A/B experiment assignment without a database lookup?",
      "acceptedAnswer": {
        "@type": "Answer",
        "text": "Hash the combination of user_id and experiment_id using MD5 (or MurmurHash for speed), take the result modulo 100, and compare against cumulative variant weight buckets. Because the hash is deterministic, the same user always maps to the same variant for the same experiment without storing an assignment row. The RecommendationLog captures the variant per impression for downstream analysis."
      }
    }
  ]
}
{"@context":"https://schema.org","@type":"FAQPage","mainEntity":[{"@type":"Question","name":"How does collaborative filtering handle cold-start for new users?","acceptedAnswer":{"@type":"Answer","text":"New users without interaction history fall back to content-based recommendations using item embeddings (sentence-transformers on title/description) matched to their explicit profile preferences."}},{"@type":"Question","name":"How is the two-stage retrieve-then-rank pipeline structured?","acceptedAnswer":{"@type":"Answer","text":"Stage 1 uses ANN (HNSW index) to retrieve the top-K candidate items by embedding similarity; Stage 2 re-ranks using an XGBoost model with additional context features (recency, diversity, CTR)."}},{"@type":"Question","name":"How are embeddings kept up to date?","acceptedAnswer":{"@type":"Answer","text":"A nightly batch job recomputes ALS embeddings on the full interaction matrix; item embeddings from content models are updated on item publish or significant metadata change."}},{"@type":"Question","name":"How is A/B testing integrated into recommendations?","acceptedAnswer":{"@type":"Answer","text":"Users are assigned to experiment buckets at session start; the recommendation endpoint reads the bucket assignment and applies the corresponding ranking model or retrieval strategy."}}]}
See also: Anthropic Interview Guide 2026: Process, Questions, and AI Safety
See also: Meta Interview Guide 2026: Facebook, Instagram, WhatsApp Engineering