Search Index System Low-Level Design: Inverted Index, BM25 Ranking, and Incremental Indexing Pipeline

A search index system converts raw documents into a structure that supports fast full-text queries. Where a relational database answers “give me rows where column equals value,” a search index answers “give me documents that contain these words, ranked by relevance.” This design covers the inverted index data structure, the indexing pipeline from raw document to searchable token, query execution with BM25 ranking, and the incremental update pattern for near-real-time search.

Core Data Model

-- Document store: the source of truth for searchable content
CREATE TABLE SearchDocument (
    doc_id         BIGSERIAL PRIMARY KEY,
    entity_type    VARCHAR(50) NOT NULL,    -- 'job_post', 'user_profile', 'article'
    entity_id      BIGINT NOT NULL,
    title          TEXT,
    body           TEXT,
    tags           TEXT[],
    author_id      BIGINT,
    published_at   TIMESTAMPTZ,
    boost          NUMERIC(4,2) NOT NULL DEFAULT 1.0,  -- manual relevance boost
    indexed_at     TIMESTAMPTZ,
    updated_at     TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    UNIQUE (entity_type, entity_id)
);

-- Inverted index: token → list of (doc_id, positions, field_weight)
CREATE TABLE InvertedIndex (
    token          VARCHAR(200) NOT NULL,
    doc_id         BIGINT NOT NULL REFERENCES SearchDocument(doc_id) ON DELETE CASCADE,
    field          VARCHAR(20) NOT NULL,   -- 'title', 'body', 'tags'
    frequency      INT NOT NULL,           -- term frequency in this doc+field
    positions      INT[] NOT NULL,         -- token positions (word offsets within the field), used for highlighting
    field_weight   NUMERIC(3,2) NOT NULL,  -- title=2.0, tags=1.5, body=1.0
    PRIMARY KEY (token, doc_id, field)
);

-- Index stats for BM25 (updated incrementally)
CREATE TABLE IndexStats (
    stat_key       VARCHAR(100) PRIMARY KEY,
    stat_value     NUMERIC(18,4) NOT NULL,
    updated_at     TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- stat_key examples: 'total_docs', 'avg_body_length', 'df:python', 'df:engineer'

CREATE INDEX ON InvertedIndex(token, field_weight DESC);
CREATE INDEX ON SearchDocument(entity_type, published_at DESC);
CREATE INDEX ON SearchDocument(updated_at) WHERE indexed_at IS NULL OR indexed_at < updated_at;
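
For intuition, the InvertedIndex table can be pictured as a nested mapping from token to postings. The following is a minimal in-memory sketch rather than part of the design itself: the helper build_postings and the sample documents are hypothetical, and tokenization is reduced to lowercasing and whitespace splitting.

```python
from collections import defaultdict

def build_postings(docs):
    """Map token -> {(doc_id, field): [positions]} for small sample docs.

    `docs` is {doc_id: {field: text}}. Tokenization here is a plain
    lowercase + whitespace split (an assumption for illustration only).
    """
    postings = defaultdict(dict)
    for doc_id, fields in docs.items():
        for field, text in fields.items():
            for pos, token in enumerate(text.lower().split()):
                postings[token].setdefault((doc_id, field), []).append(pos)
    return postings

docs = {
    1: {'title': 'Python engineer', 'body': 'python backend python services'},
    2: {'title': 'Data engineer', 'body': 'spark pipelines'},
}
postings = build_postings(docs)

# Each (token, doc_id, field) key corresponds to one InvertedIndex row;
# frequency is simply the number of stored positions.
for (doc_id, field), positions in sorted(postings['python'].items()):
    print(doc_id, field, len(positions), positions)
```

Looking up a token is a single dictionary access here, which is the role the (token, doc_id, field) primary key plays in the SQL schema.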

Indexing Pipeline

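import re, math, time   # time is needed by indexing_worker()
from collections import Counter
from typing import List, Tuple

# NOTE: `db` is not defined in this article. It is assumed to be a thin wrapper
# around a PostgreSQL connection exposing execute / executemany / fetchall,
# with fetchall returning dict-like rows.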

STOP_WORDS = {
    'a','an','the','is','are','was','were','be','been','being',
    'have','has','had','do','does','did','will','would','could',
    'should','may','might','must','to','of','in','on','at','for',
    'with','by','from','as','into','through','and','or','but','not'
}

FIELD_WEIGHTS = {'title': 2.0, 'tags': 1.5, 'body': 1.0}

def tokenize(text: str) -> List[Tuple[str, int]]:
    """
    Returns [(token, position), ...].
    Lowercases, removes punctuation, strips stop words, applies simple stemming.
    """
    if not text:
        return []
    text = text.lower()
    text = re.sub(r'[^a-z0-9\s]', ' ', text)   # replace punctuation with spaces
    tokens = []
    for pos, word in enumerate(text.split()):
        if word and word not in STOP_WORDS and len(word) > 1:
            token = _stem(word)
            tokens.append((token, pos))
    return tokens

def _stem(word: str) -> str:
    """Minimal suffix stripping (Porter-lite). Replace with NLTK/Snowball in production."""
    for suffix in ('ing', 'tion', 'tions', 'ness', 'ment', 'er', 'est', 'ly', 'ed', 's'):
        if word.endswith(suffix) and len(word) - len(suffix) >= 3:
            return word[:-len(suffix)]
    return word

def index_document(doc_id: int, title: str, body: str, tags: list):
    """
    Build inverted index entries for one document.
    Runs in the indexing worker; idempotent (DELETE + INSERT).
    """
    fields = {
        'title': title or '',
        'body':  body or '',
        'tags':  ' '.join(tags or []),
    }

    # Remove stale index entries for this doc, remembering which tokens it had
    # so document frequencies are adjusted rather than double-counted on re-index
    old_rows = db.fetchall(
        "DELETE FROM InvertedIndex WHERE doc_id=%s RETURNING token", (doc_id,))
    old_tokens = {r['token'] for r in old_rows}

    rows = []
    for field_name, text in fields.items():
        token_positions = tokenize(text)
        # Group positions by token
        token_pos_map: dict[str, list] = {}
        for token, pos in token_positions:
            token_pos_map.setdefault(token, []).append(pos)

        for token, positions in token_pos_map.items():
            rows.append((
                token, doc_id, field_name,
                len(positions),         # term frequency
                positions,              # positions array
                FIELD_WEIGHTS[field_name]
            ))

    if rows:
        db.executemany("""
            INSERT INTO InvertedIndex (token, doc_id, field, frequency, positions, field_weight)
            VALUES (%s, %s, %s, %s, %s, %s)
        """, rows)

    db.execute("""
        UPDATE SearchDocument SET indexed_at=NOW() WHERE doc_id=%s
    """, (doc_id,))
    _update_document_frequency(old_tokens, {row[0] for row in rows})

def _update_document_frequency(old_tokens: set, new_tokens: set):
    """Adjust df:token by +1 for tokens the doc gained and -1 for tokens it lost."""
    for token in new_tokens - old_tokens:
        db.execute("""
            INSERT INTO IndexStats (stat_key, stat_value) VALUES (%s, 1)
            ON CONFLICT (stat_key) DO UPDATE
                SET stat_value = IndexStats.stat_value + 1, updated_at=NOW()
        """, (f'df:{token}',))
    for token in old_tokens - new_tokens:
        db.execute("""
            UPDATE IndexStats
               SET stat_value = GREATEST(stat_value - 1, 0), updated_at=NOW()
             WHERE stat_key=%s
        """, (f'df:{token}',))

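def indexing_worker():
    """Poll for documents that need indexing and process them.

    The row locks taken by FOR UPDATE SKIP LOCKED last only until the
    surrounding transaction ends, so each batch is assumed to run inside
    one transaction that commits after the batch is indexed.
    """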
    while True:
        docs = db.fetchall("""
            SELECT doc_id, title, body, tags FROM SearchDocument
            WHERE indexed_at IS NULL OR indexed_at < updated_at
            ORDER BY updated_at ASC
            LIMIT 50
            FOR UPDATE SKIP LOCKED
        """)
        if not docs:
            time.sleep(1)
            continue
        for doc in docs:
            index_document(doc['doc_id'], doc['title'], doc['body'], doc['tags'] or [])
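
Because the same document can pass through the worker many times, its contribution to the df:token counters should change only for tokens it gained or lost between versions. The sketch below checks that bookkeeping in isolation. The helpers df_deltas and apply_deltas are hypothetical, and a plain dict stands in for the IndexStats table.

```python
def df_deltas(old_tokens, new_tokens):
    """Return {token: +1 or -1} for tokens a document gained or lost."""
    old_tokens, new_tokens = set(old_tokens), set(new_tokens)
    deltas = {t: 1 for t in new_tokens - old_tokens}
    deltas.update({t: -1 for t in old_tokens - new_tokens})
    return deltas

def apply_deltas(df, deltas):
    """Apply deltas to an in-memory df table (stand-in for IndexStats)."""
    for token, delta in deltas.items():
        df[token] = max(df.get(token, 0) + delta, 0)
    return df

df = {}
# First indexing pass: every token is new.
apply_deltas(df, df_deltas([], ['python', 'engineer']))
# Re-index after an edit: 'engineer' was dropped, 'backend' was added.
apply_deltas(df, df_deltas(['python', 'engineer'], ['python', 'backend']))
# Re-index with no change: nothing moves.
apply_deltas(df, df_deltas(['python', 'backend'], ['python', 'backend']))
print(df)
```

Unchanged tokens produce no delta, so repeated re-indexing of the same content leaves the counters stable.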

BM25 Query Execution

def search(query: str, entity_type: str = None,
           limit: int = 20, offset: int = 0) -> list:
    """
    Full-text search using BM25 ranking.
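    BM25 score = sum over query terms of:
        IDF(t) * (TF(t,d) * (k1+1)) / (TF(t,d) + k1*(1-b+b*|d|/avgdl))
    where IDF(t) = ln(1 + (N - df(t) + 0.5) / (df(t) + 0.5)), TF is the
    field-weighted term frequency, and |d| is the body length in tokens.
    k1=1.5, b=0.75 (commonly used values)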
    """
    tokens = [t for t, _ in tokenize(query)]
    if not tokens:
        return []

    stats = _load_stats()
    N = stats.get('total_docs', 1)
    avgdl = stats.get('avg_body_length', 100)
    k1, b = 1.5, 0.75

    # Fetch postings for all query tokens in one query
    placeholders = ','.join(['%s'] * len(tokens))
    postings = db.fetchall(f"""
        SELECT i.token, i.doc_id, i.frequency, i.field_weight,
               s.title, s.entity_type, s.entity_id, s.boost,
               s.published_at,
               (SELECT COALESCE(SUM(frequency),0) FROM InvertedIndex
                WHERE doc_id=i.doc_id AND field='body') AS body_length
        FROM InvertedIndex i
        JOIN SearchDocument s USING (doc_id)
        WHERE i.token IN ({placeholders})
          {'AND s.entity_type=%s' if entity_type else ''}
    """, tokens + ([entity_type] if entity_type else []))

    # Compute BM25 scores per doc
    scores: dict[int, float] = {}
    doc_meta: dict[int, dict] = {}

    for p in postings:
        doc_id = p['doc_id']
        token = p['token']

        df = stats.get(f'df:{token}', 1)
        idf = math.log((N - df + 0.5) / (df + 0.5) + 1)

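        # NUMERIC columns typically arrive as Decimal (e.g. with psycopg2); cast to
        # float so they can be combined with the float constants k1 and b
        tf = p['frequency'] * float(p['field_weight'])  # field-weighted TF
        dl = max(p['body_length'], 1)
        bm25 = idf * (tf * (k1 + 1)) / (tf + k1 * (1 - b + b * dl / avgdl))

        scores[doc_id] = scores.get(doc_id, 0) + bm25 * float(p['boost'])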
        doc_meta[doc_id] = {
            'doc_id': doc_id, 'title': p['title'],
            'entity_type': p['entity_type'], 'entity_id': p['entity_id'],
            'published_at': p['published_at'],
        }

    ranked = sorted(scores.items(), key=lambda x: x[1], reverse=True)
    results = []
    for doc_id, score in ranked[offset:offset+limit]:
        result = doc_meta[doc_id]
        result['score'] = round(score, 4)
        results.append(result)

    return results

def _load_stats() -> dict:
    rows = db.fetchall("SELECT stat_key, stat_value FROM IndexStats")
    return {r['stat_key']: float(r['stat_value']) for r in rows}
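
To see how the scoring formula in search() behaves, it helps to evaluate it on a few hand-picked inputs. The following is a small standalone sketch: bm25_term is a hypothetical helper that mirrors the per-posting arithmetic above, and the corpus numbers (1,000 documents, average length 100 tokens) are made up for illustration.

```python
import math

def bm25_term(tf, df, n_docs, dl, avgdl, k1=1.5, b=0.75):
    """Score one query term for one document, as in the search() loop."""
    idf = math.log((n_docs - df + 0.5) / (df + 0.5) + 1)
    norm = k1 * (1 - b + b * dl / avgdl)
    return idf * (tf * (k1 + 1)) / (tf + norm)

# Hypothetical corpus: 1,000 docs; a rare term appears in 10 of them.
body_hit = bm25_term(tf=1 * 1.0, df=10, n_docs=1000, dl=100, avgdl=100)
title_hit = bm25_term(tf=1 * 2.0, df=10, n_docs=1000, dl=100, avgdl=100)
# Same body match in a document four times longer than average.
long_doc = bm25_term(tf=1 * 1.0, df=10, n_docs=1000, dl=400, avgdl=100)
# Same body match for a term that appears in half of all documents.
common = bm25_term(tf=1 * 1.0, df=500, n_docs=1000, dl=100, avgdl=100)

print(f"body={body_hit:.3f} title={title_hit:.3f} "
      f"long_doc={long_doc:.3f} common_term={common:.3f}")
```

With these inputs the title match, whose TF is doubled by the field weight, scores about 1.43 times the body match rather than 2 times, because BM25 saturates as TF grows. Longer documents and more common terms both score lower.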

Key Design Decisions

  • Field-weighted TF: multiplying term frequency by field_weight (title=2.0, tags=1.5, body=1.0) before applying BM25 boosts title matches over body matches without requiring a separate index per field. A title match counts as twice the term frequency of the same match in the body. Because BM25 saturates as TF grows, the resulting score increase is smaller than 2×.
  • Incremental indexing via updated_at: the index worker picks documents where indexed_at IS NULL OR indexed_at < updated_at — documents changed since last indexing. This is efficient for high-update scenarios: only changed documents are re-indexed. The SKIP LOCKED clause allows multiple parallel indexing workers without conflicts.
  • Document frequency in IndexStats: storing df:token in a stats table avoids a COUNT(DISTINCT doc_id) query per token during search — which would be prohibitively expensive at query time. The trade-off: df values are slightly stale (eventually consistent with a few seconds delay after indexing). For BM25, this is acceptable.
  • Position arrays for highlighting: storing token positions (word offsets within each field, as produced by tokenize) enables snippet generation: locate the words at the matched positions, extract the surrounding sentence, and bold the matched tokens. Without positions, highlighting requires re-tokenizing the document text at query time.
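
The highlighting idea in the last point can be sketched as follows, assuming positions are word indexes as produced by tokenize() and that the text is ASCII, so that every run of letters and digits is one word. The function highlight and its tag parameters are hypothetical names chosen for illustration.

```python
import re

# A "word" is a maximal run of ASCII letters/digits, which matches how
# tokenize() splits text after replacing punctuation with spaces.
WORD = re.compile(r'[A-Za-z0-9]+')

def highlight(text, positions, open_tag='<b>', close_tag='</b>'):
    """Wrap the words at the given word indexes in open_tag/close_tag."""
    wanted = set(positions)
    out, last = [], 0
    for idx, match in enumerate(WORD.finditer(text)):
        if idx in wanted:
            out.append(text[last:match.start()])   # untouched text before the hit
            out.append(open_tag + match.group() + close_tag)
            last = match.end()
    out.append(text[last:])                        # remainder after the last hit
    return ''.join(out)

snippet = highlight("Senior Python engineer, Python-first team", [1, 3])
print(snippet)
```

The original casing and punctuation are preserved because the positions are mapped back onto the raw text instead of the normalized tokens.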
