Content Recommendation System: Low-Level Design
A content recommendation system must serve personalized item lists at low latency while continuously incorporating new user signals, handling cold-start for new users and items, and supporting controlled A/B experiments. This article designs a two-stage retrieve-then-rank pipeline using collaborative filtering with ALS, approximate nearest neighbor retrieval with HNSW, content embeddings for cold-start, and an XGBoost re-ranker, with a full SQL schema and Python implementation.
Architecture: Two-Stage Retrieve-then-Rank
Stage 1 (retrieval): given a user embedding, find the top-K candidate items using approximate nearest neighbor search (ANN). Fast but imprecise — may retrieve 500 candidates from millions of items in <10 ms.
Stage 2 (ranking): score the K candidates with a feature-rich model (XGBoost or a small neural net) that incorporates context signals (time of day, device, freshness). Slower but precise — re-ranks 500 → 20 results in <20 ms.
SQL Schema
-- One row per user holding that user's current embedding vector.
-- Read by _get_user_embedding (lookup by user_id); the update_embeddings job
-- is described as upserting rows here after each training run.
CREATE TABLE UserEmbedding (
user_id BIGINT UNSIGNED NOT NULL,
embedding BLOB NOT NULL, -- serialized float32 array (128-dim)
model_version VARCHAR(32) NOT NULL, -- label of the model run that produced this vector
updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, -- refreshed automatically on every row update
PRIMARY KEY (user_id),
INDEX idx_model_version (model_version) -- supports filtering rows by model_version
) ENGINE=InnoDB;
-- One row per item. `embedding` is the vector read by _rank_candidates and by
-- the new-user cold-start path in _get_user_embedding; `content_emb` is the
-- text-derived vector written by embed_new_item.
-- NOTE(review): embed_new_item writes the same bytes to both `embedding` and
-- `content_emb` on first insert (model_version 'content_v1') -- confirm the
-- two vectors are expected to share a dimensionality.
CREATE TABLE ItemEmbedding (
item_id BIGINT UNSIGNED NOT NULL,
embedding BLOB NOT NULL, -- serialized float32 array (128-dim)
content_emb BLOB NULL, -- sentence-transformer embedding for cold-start
model_version VARCHAR(32) NOT NULL, -- label of the model run that produced `embedding`
updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, -- refreshed automatically on every row update
PRIMARY KEY (item_id),
INDEX idx_model_version (model_version) -- supports filtering rows by model_version
) ENGINE=InnoDB;
-- Append-only log of user/item interaction events.
-- Aggregated by update_embeddings (SUM(weight) per user/item) and read by
-- _get_seen_item_ids to exclude items a user has already interacted with.
CREATE TABLE ImplicitFeedback (
user_id BIGINT UNSIGNED NOT NULL,
item_id BIGINT UNSIGNED NOT NULL,
event_type ENUM('view','click','save','share','skip') NOT NULL,
-- NOTE(review): no weight is listed below for 'share' -- confirm the intended value.
weight DECIMAL(5,4) NOT NULL, -- 0.1=view, 0.5=click, 1.0=save, -0.5=skip
session_id VARCHAR(64) NULL,
recorded_at DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3), -- millisecond precision
-- recorded_at is part of the key, so the same user/item/event_type can be
-- stored repeatedly as long as the timestamps differ.
PRIMARY KEY (user_id, item_id, event_type, recorded_at),
INDEX idx_item_events (item_id, recorded_at DESC), -- newest-first events per item
INDEX idx_user_events (user_id, recorded_at DESC) -- newest-first events per user
) ENGINE=InnoDB;
-- One row per item served to a user; written by _log_recommendations.
CREATE TABLE RecommendationLog (
id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
user_id BIGINT UNSIGNED NOT NULL,
item_id BIGINT UNSIGNED NOT NULL,
position TINYINT UNSIGNED NOT NULL, -- rank in the served list; _log_recommendations writes it 0-based
score FLOAT NOT NULL, -- ranking score the item was served with
experiment_id VARCHAR(64) NULL, -- NULL when the request carried no experiment
variant VARCHAR(32) NULL, -- NULL when no variant was assigned
served_at DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3), -- millisecond precision
PRIMARY KEY (id),
INDEX idx_user_served (user_id, served_at DESC), -- newest-first history per user
INDEX idx_experiment (experiment_id, variant, served_at DESC) -- per-variant slices of an experiment
) ENGINE=InnoDB;
-- Experiment definitions; read by _assign_experiment_variant, which selects
-- `variants` for a given id and filters on `enabled` and `end_at`.
CREATE TABLE ABExperiment (
id VARCHAR(64) NOT NULL,
name VARCHAR(255) NOT NULL,
variants JSON NOT NULL, -- [{"id":"control","weight":0.5},{"id":"als_v2","weight":0.5}]
start_at DATETIME NOT NULL,
end_at DATETIME NULL, -- NULL is treated as open-ended by _assign_experiment_variant
enabled TINYINT(1) NOT NULL DEFAULT 1, -- rows are enabled unless set to 0
PRIMARY KEY (id)
) ENGINE=InnoDB;
Python Implementation
import numpy as np
import struct
import hashlib
import json
import hnswlib
import xgboost as xgb
import db
from sentence_transformers import SentenceTransformer
# Length of the zero vector that _get_user_embedding returns when no user or
# item embeddings are available.
EMBEDDING_DIM = 128
# Number of nearest neighbours requested from the ANN index per request.
RETRIEVAL_K = 500
# Number of ranked items returned (and logged) per request.
FINAL_K = 20
# HNSW index queried by _retrieve_candidates. It is None here and nothing in
# this section assigns it; _retrieve_candidates raises RuntimeError while it
# is still None.
ANN_INDEX: hnswlib.Index | None = None
# Sentence-transformer used by embed_new_item. It is constructed by this
# module-level statement, i.e. when the module is imported.
CONTENT_MODEL = SentenceTransformer("all-MiniLM-L6-v2")
# XGBoost model used by _rank_candidates. While it is None, ranking falls back
# to sorting by cosine similarity.
RANKER: xgb.Booster | None = None
def get_recommendations(
    user_id: int,
    context: dict,
    experiment_id: str | None = None,
) -> list[dict]:
    """
    Build the recommendation list for one user.

    Steps, in order: resolve the A/B variant, load (or synthesize) the user
    vector, fetch RETRIEVAL_K neighbours from the ANN index, drop items the
    user has already interacted with, re-rank what is left, then log and
    return the first FINAL_K results.

    context: request context handed to the ranker, e.g.
        {"device": "mobile", "hour": 14, "country": "US"}

    Returns a list of {"item_id": ..., "score": ...} dicts in ranked order.
    """
    variant = _assign_experiment_variant(user_id, experiment_id)

    # Stage 1: user vector -> nearest-neighbour candidates.
    user_vector = _get_user_embedding(user_id)
    retrieved = _retrieve_candidates(user_vector, k=RETRIEVAL_K)

    # Filter out anything among the user's recorded interactions.
    already_seen = _get_seen_item_ids(user_id, limit=1000)
    unseen = []
    for candidate in retrieved:
        if candidate not in already_seen:
            unseen.append(candidate)

    # Stage 2: feature-based re-ranking, truncated to the page size.
    top = _rank_candidates(user_id, user_vector, unseen, context)[:FINAL_K]

    # Record exactly what is about to be served.
    _log_recommendations(user_id, top, experiment_id, variant)

    payload = []
    for item_id, score in top:
        payload.append({"item_id": item_id, "score": score})
    return payload
def _get_user_embedding(user_id: int) -> np.ndarray:
    """
    Return the embedding vector to use for ``user_id``.

    If the user has a row in UserEmbedding, that vector is returned as stored.
    Otherwise (cold start) the mean of a sample of item embeddings is used as
    a proxy; the sample is whatever ``ItemEmbedding LIMIT 1000`` returns, not
    the full table. If no usable item vectors exist either, a zero vector of
    length EMBEDDING_DIM is returned.

    Item vectors whose length differs from EMBEDDING_DIM are skipped when
    averaging. embed_new_item stores sentence-transformer vectors in the same
    column, and a mix of lengths cannot be stacked into one array.
    """
    row = db.fetchone(
        "SELECT embedding FROM UserEmbedding WHERE user_id = %s", (user_id,)
    )
    if row:
        return _deserialize_embedding(row["embedding"])

    # cold-start: average a sample of item embeddings as a proxy user vector
    rows = db.fetchall("SELECT embedding FROM ItemEmbedding LIMIT 1000")
    sample = []
    for r in rows or []:
        vec = _deserialize_embedding(r["embedding"])
        # Only same-length vectors can be averaged element-wise.
        if vec.shape == (EMBEDDING_DIM,):
            sample.append(vec)
    if sample:
        return np.stack(sample).mean(axis=0)
    return np.zeros(EMBEDDING_DIM, dtype=np.float32)
def _retrieve_candidates(user_emb: np.ndarray, k: int) -> list[int]:
    """
    Return up to ``k`` item labels nearest to ``user_emb`` from the ANN index.

    ``k`` is capped at the number of elements currently in the index because
    hnswlib's ``knn_query`` raises when asked for more neighbours than it can
    return. An empty index (or non-positive ``k``) yields an empty list.
    NOTE(review): elements flagged with ``mark_deleted`` may still count
    toward ``get_current_count()`` -- confirm if deletions are used.

    Raises:
        RuntimeError: if ANN_INDEX has not been loaded.
    """
    if ANN_INDEX is None:
        raise RuntimeError("ANN index not loaded")
    k = min(k, ANN_INDEX.get_current_count())
    if k <= 0:
        return []
    # knn_query expects a 2-D batch; we submit a batch of one and unwrap it.
    labels, _ = ANN_INDEX.knn_query(user_emb.reshape(1, -1), k=k)
    return labels[0].tolist()
def _rank_candidates(
    user_id: int,
    user_emb: np.ndarray,
    candidate_ids: list[int],
    context: dict,
) -> list[tuple[int, float]]:
    """
    Score candidate items and return ``(item_id, score)`` pairs, best first.

    For each candidate that has a row in ItemEmbedding, three features are
    built: cosine similarity to ``user_emb``, ``context["hour"] / 24``
    (default hour 12), and a 1.0/0.0 flag for ``context["device"] ==
    "mobile"``. If RANKER is loaded the features are scored by it; otherwise
    the cosine similarity itself is the score.

    Candidates with no stored embedding, or whose embedding shape differs
    from ``user_emb``, are dropped rather than failing the whole request.
    ``user_id`` is accepted for interface symmetry and is not used here.
    """
    if not candidate_ids:
        return []

    # Only the "%s" placeholders are interpolated; ids are bound as parameters.
    placeholders = ",".join(["%s"] * len(candidate_ids))
    item_rows = db.fetchall(
        f"SELECT item_id, embedding FROM ItemEmbedding WHERE item_id IN ({placeholders})",
        tuple(candidate_ids),
    )
    emb_map = {r["item_id"]: _deserialize_embedding(r["embedding"]) for r in item_rows}

    # Loop-invariant: the user vector's norm is the same for every candidate.
    user_norm = np.linalg.norm(user_emb)

    features = []
    valid_ids = []
    for item_id in candidate_ids:
        item_emb = emb_map.get(item_id)
        if item_emb is None or item_emb.shape != user_emb.shape:
            continue
        # 1e-9 keeps the division defined when either vector is all zeros.
        cos_sim = float(
            np.dot(user_emb, item_emb)
            / (user_norm * np.linalg.norm(item_emb) + 1e-9)
        )
        features.append([
            cos_sim,
            context.get("hour", 12) / 24.0,
            1.0 if context.get("device") == "mobile" else 0.0,
        ])
        valid_ids.append(item_id)

    if not features or RANKER is None:
        # fallback: sort by cosine similarity
        scores = [f[0] for f in features]
        return sorted(zip(valid_ids, scores), key=lambda x: -x[1])

    dmat = xgb.DMatrix(np.array(features, dtype=np.float32))
    scores = RANKER.predict(dmat).tolist()
    return sorted(zip(valid_ids, scores), key=lambda x: -x[1])
def update_embeddings(model_version: str) -> None:
    """
    Offline job skeleton: recompute ALS embeddings and write them back.

    Only the first step is implemented here: loading per-(user, item) summed
    feedback weights. Matrix construction, ALS fitting and the upserts are
    left as an outline. The article notes that production runs this as a
    Spark job. ``model_version`` is not used by the code shown.
    """
    from implicit import als

    # Summed interaction weight per (user, item) pair from ImplicitFeedback.
    interactions = db.fetchall(
        "SELECT user_id, item_id, SUM(weight) as w FROM ImplicitFeedback GROUP BY user_id, item_id"
    )

    # Remaining steps (omitted for brevity):
    #   1. build a sparse user-item matrix from `interactions`
    #      (scipy.sparse + implicit.als.AlternatingLeastSquares)
    #   2. fit ALS
    #   3. for each user:
    #        emb_bytes = _serialize_embedding(user_factors[uid])
    #        db.execute("INSERT INTO UserEmbedding (...) VALUES (...) ON DUPLICATE KEY UPDATE ...", ...)
    return None
def embed_new_item(item_id: int, title: str, description: str) -> None:
    """
    Cold-start path for an item that has no interaction history yet.

    Encodes "<title>. <description>" with CONTENT_MODEL (normalized, cast to
    float32) and upserts it into ItemEmbedding. On first insert the vector is
    written to both ``embedding`` and ``content_emb`` with model_version
    'content_v1'; if the item already exists only ``content_emb`` is
    overwritten, so an existing ``embedding`` is left untouched.

    NOTE(review): the schema comment describes ``embedding`` as 128-dim, while
    all-MiniLM-L6-v2 presumably emits a different size (384 per its model
    card) -- confirm how the two dimensionalities are reconciled downstream.
    """
    vector = CONTENT_MODEL.encode(
        f"{title}. {description}", normalize_embeddings=True
    ).astype(np.float32)
    blob = _serialize_embedding(vector)

    upsert_sql = """INSERT INTO ItemEmbedding (item_id, embedding, content_emb, model_version)
VALUES (%s, %s, %s, 'content_v1')
ON DUPLICATE KEY UPDATE content_emb = VALUES(content_emb)"""
    # The same bytes fill both the `embedding` and `content_emb` placeholders.
    db.execute(upsert_sql, (item_id, blob, blob))
def _assign_experiment_variant(user_id: int, experiment_id: str | None) -> str | None:
if not experiment_id:
return None
exp = db.fetchone(
"SELECT variants FROM ABExperiment WHERE id = %s AND enabled = 1 AND (end_at IS NULL OR end_at > NOW())",
(experiment_id,)
)
if not exp:
return None
variants = json.loads(exp["variants"]) if isinstance(exp["variants"], str) else exp["variants"]
# deterministic assignment: hash(user_id + experiment_id) mod 100
h = int(hashlib.md5(f"{user_id}:{experiment_id}".encode()).hexdigest(), 16) % 100
cumulative = 0
for v in variants:
cumulative += int(v["weight"] * 100)
if h set[int]:
rows = db.fetchall(
"""SELECT DISTINCT item_id FROM ImplicitFeedback
WHERE user_id = %s ORDER BY recorded_at DESC LIMIT %s""",
(user_id, limit)
)
return {r["item_id"] for r in rows}
def _log_recommendations(user_id, results, experiment_id, variant):
    """
    Insert one RecommendationLog row per served result.

    ``results`` is an iterable of ``(item_id, score)`` pairs in served order;
    the stored position starts at 0. ``experiment_id`` and ``variant`` are
    written as given and may be None.
    """
    insert_sql = "INSERT INTO RecommendationLog (user_id, item_id, position, score, experiment_id, variant) VALUES (%s,%s,%s,%s,%s,%s)"
    position = 0
    for item_id, score in results:
        row = (user_id, item_id, position, score, experiment_id, variant)
        db.execute(insert_sql, row)
        position += 1
def _serialize_embedding(emb: np.ndarray) -> bytes:
    """
    Pack an embedding into raw bytes: consecutive little-endian float32 values.

    The input is converted to float32 if it is not already, so float64 arrays
    or plain sequences are accepted. The byte order is fixed rather than
    native so stored blobs read the same on any host; on little-endian
    machines the output is identical to ``struct.pack(f"{n}f", *emb)``.
    """
    return np.asarray(emb, dtype="<f4").tobytes()
def _deserialize_embedding(data: bytes) -> np.ndarray:
    """
    Decode bytes produced by _serialize_embedding into a 1-D float32 array.

    ``data`` is read as consecutive little-endian float32 values. Any
    bytes-like object is accepted. The returned array is a fresh, writable
    copy that does not alias ``data``.

    Raises:
        ValueError: if the length of ``data`` is not a multiple of 4 bytes.
    """
    size = len(data)
    if size % 4:
        raise ValueError(
            f"embedding blob length {size} is not a multiple of 4 bytes"
        )
    if size == 0:
        return np.empty(0, dtype=np.float32)
    # frombuffer gives a read-only view over `data`; astype() copies it into
    # an independent native-order float32 array.
    return np.frombuffer(data, dtype="<f4").astype(np.float32)
Cold-Start for New Users and Items
New users with no interaction history receive the mean of a sample of item embeddings (the implementation above averages up to 1,000 rows, not the whole table) as a proxy user vector, which surfaces popular/diverse content. Once 5+ interactions are logged, the next embedding update cycle produces a personalized ALS vector. New items receive a content_emb from a sentence-transformer on their title and description; this vector is used for retrieval until ALS produces a collaborative embedding after the next training run.
A/B Experiment Assignment
Experiment variant assignment is deterministic: the MD5 hash of the string "user_id:experiment_id", taken mod 100, maps each user to a bucket consistently across requests, so no per-user assignment has to be stored or looked up (only the experiment definition itself is read from ABExperiment). Variants are defined with weight fractions summing to 1.0. The RecommendationLog captures the variant for each served result for downstream CTR analysis.
See also: Anthropic Interview Guide 2026: Process, Questions, and AI Safety
See also: Meta Interview Guide 2026: Facebook, Instagram, WhatsApp Engineering
See also: Scale AI Interview Guide 2026: Data Infrastructure, RLHF Pipelines, and ML Engineering