Document Store Low-Level Design: Schema Flexibility, Indexing, and Query Engine

Document Store System: Overview

A document store persists and retrieves arbitrary JSON documents. Unlike a relational database, it does not enforce a fixed schema per collection. This low-level design covers document identity, storage engine choices, index types and their maintenance, query operators, the aggregation pipeline, and write durability.

Document Model

A document is an arbitrary nested JSON object. A collection groups documents of related type. Schema is optional: collections can enforce a JSON Schema for validation, but many deployments use schema-free mode for flexibility.

Document identity: each document has an _id field. If not provided by the client, the store generates a globally unique ObjectId composed of:

  • 4-byte Unix timestamp (seconds)
  • 5-byte random value (machine + process identifier)
  • 3-byte incrementing counter

This makes ObjectIds sortable by creation time and unique across a distributed cluster without coordination.
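The timestamp prefix is what makes IDs sortable: fixed-width hex compares lexicographically in the same order as numerically. A minimal sketch (the helper name `object_id_timestamp` is illustrative, assuming the hex layout above):

```python
def object_id_timestamp(object_id: str) -> int:
    """Recover creation time from the 4-byte timestamp prefix (first 8 hex chars)."""
    return int(object_id[:8], 16)

# Fixed-width hex means lexicographic order equals chronological order:
earlier = format(1700000000, '08x') + '00' * 8  # zeroed random/counter tail for clarity
later   = format(1700000001, '08x') + '00' * 8
assert earlier < later
assert object_id_timestamp(later) - object_id_timestamp(earlier) == 1
```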

Storage Engine

The primary storage engine uses a B-tree indexed on _id:

  • Documents are stored as BSON (Binary JSON) or JSONB on disk for compact representation and fast field access.
  • Writes are appended to a Write-Ahead Log (WAL) before the B-tree page is modified, ensuring crash recovery.
  • The B-tree provides O(log N) lookup by _id and O(log N + K) range scans.

LSM-tree (Log-Structured Merge-tree) is an alternative for write-heavy workloads: memtable absorbs writes in memory, flushes to SSTables on disk, and background compaction merges SSTables. Tradeoff: higher write throughput at the cost of read amplification during compaction.
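The memtable/SSTable flow above can be sketched in a few lines. This is a toy model (the class `TinyLSM` and its flush threshold are invented for illustration; a real engine adds a WAL, bloom filters, and background compaction that merges the sorted runs):

```python
import bisect

class TinyLSM:
    """Toy LSM sketch: dict memtable absorbs writes, flushes to immutable
    sorted runs ("SSTables"); reads check the memtable, then runs newest-first."""

    def __init__(self, memtable_limit: int = 4):
        self.memtable: dict = {}
        self.sstables: list = []   # each entry is a sorted list of (key, value)
        self.memtable_limit = memtable_limit

    def put(self, key, value):
        self.memtable[key] = value
        if len(self.memtable) >= self.memtable_limit:
            self.flush()

    def flush(self):
        """Write the memtable out as an immutable sorted run."""
        self.sstables.append(sorted(self.memtable.items()))
        self.memtable = {}

    def get(self, key):
        if key in self.memtable:
            return self.memtable[key]
        # Read amplification: every run may need a lookup, newest first.
        for run in reversed(self.sstables):
            i = bisect.bisect_left(run, (key,))
            if i < len(run) and run[i][0] == key:
                return run[i][1]
        return None

db = TinyLSM(memtable_limit=2)
db.put('a', 1)
db.put('b', 2)   # second put hits the limit and triggers a flush
db.put('a', 9)   # newer value shadows the flushed one
assert db.get('a') == 9 and db.get('b') == 2 and db.get('z') is None
```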

Secondary Index Types

  • Single-field index: index on one field (e.g., {"user_id": 1}). Supports equality and range queries on that field.
  • Compound index: index on multiple fields in order (e.g., {"status": 1, "created_at": -1}). Supports queries that filter on status and sort by created_at.
  • Multikey (array) index: when the indexed field contains an array, the index creates one entry per array element. Allows queries like {"tags": {$in: ["python", "java"]}} to use an index.
  • Text index: inverted index on tokenized string fields for full-text $text queries.
  • Sparse index: only indexes documents where the indexed field exists. Reduces index size for optional fields.
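The multikey case is the least obvious of these, so a minimal sketch of "one entry per array element" (the helper name and the in-memory dict representation are illustrative):

```python
from collections import defaultdict

def build_multikey_index(docs: list, field: str) -> dict:
    """One index entry per array element, each pointing back to the document _id."""
    index = defaultdict(set)
    for doc in docs:
        value = doc.get(field)
        elements = value if isinstance(value, list) else [value]
        for element in elements:
            index[element].add(doc['_id'])
    return index

docs = [
    {'_id': 'd1', 'tags': ['python', 'java']},
    {'_id': 'd2', 'tags': ['python', 'go']},
]
idx = build_multikey_index(docs, 'tags')
# {"tags": {$in: ["python", "java"]}} becomes a union of index lookups:
assert idx['python'] | idx['java'] == {'d1', 'd2'}
```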

Secondary Index Maintenance

On every write (insert, update, delete), all secondary indexes must be updated. Two approaches:

  • Synchronous (foreground): index updates happen in the same transaction as the document write. Ensures consistency but adds write latency proportional to the number of indexes.
  • Asynchronous (background): document write commits first, index update queued. Allows reads on stale indexes for a brief window. Used for non-unique indexes where eventual consistency is acceptable.

For unique indexes (e.g., email field), synchronous maintenance is required to enforce the uniqueness constraint.
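A minimal in-memory sketch of why the unique check must be synchronous (names are illustrative; in a real engine the check and both writes run inside one transaction):

```python
class UniqueIndexViolation(Exception):
    pass

def insert_with_unique_index(store: dict, index: dict, doc: dict, field: str) -> None:
    """Synchronous maintenance: the uniqueness check and both writes complete
    before the insert is acknowledged, so no duplicate can slip in between."""
    key = doc[field]
    if key in index:                 # check must observe all committed writes
        raise UniqueIndexViolation(f"duplicate value for {field!r}: {key!r}")
    store[doc['_id']] = doc          # document write
    index[key] = doc['_id']          # index write, same "transaction"

store, email_idx = {}, {}
insert_with_unique_index(store, email_idx, {'_id': '1', 'email': 'a@x.com'}, 'email')
try:
    insert_with_unique_index(store, email_idx, {'_id': '2', 'email': 'a@x.com'}, 'email')
except UniqueIndexViolation:
    pass
assert len(store) == 1  # the duplicate was rejected before the document write
```

With asynchronous maintenance the second insert would commit before the index update revealed the conflict, leaving two documents with the same email.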

Query Operators

The query engine supports a filter DSL:

  • $eq: equality match — {"status": {$eq: "active"}} or shorthand {"status": "active"}
  • $gt, $gte, $lt, $lte: range comparison
  • $in: field value in array — {"role": {$in: ["admin", "editor"]}}
  • $elemMatch: match array elements satisfying multiple conditions — {"scores": {$elemMatch: {$gt: 80, $lt: 100}}}
  • $text: full-text search on text-indexed fields
  • $exists: check field presence
  • $and, $or, $not: logical composition
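$elemMatch is the operator most often misread: all of its conditions must hold on a single array element, not across different elements. A small evaluator sketch (the operator table covers only three operators and is illustrative, not the full DSL):

```python
def elem_match(array: list, conditions: dict) -> bool:
    """True if any single element satisfies ALL conditions at once."""
    ops = {'$gt': lambda v, t: v > t,
           '$lt': lambda v, t: v < t,
           '$eq': lambda v, t: v == t}
    return any(all(ops[op](elem, t) for op, t in conditions.items())
               for elem in array)

# {"scores": {$elemMatch: {$gt: 80, $lt: 100}}}
assert elem_match([50, 90], {'$gt': 80, '$lt': 100})        # 90 satisfies both
assert not elem_match([70, 120], {'$gt': 80, '$lt': 100})   # no single element does
```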

Query Planner and Index Selection

The query planner evaluates candidate indexes for a query and estimates cost:

  1. Identify indexes that cover at least one filter field.
  2. Estimate key scans: how many index entries will be scanned to satisfy the query.
  3. Prefer compound indexes that cover multiple filter fields over multiple single-field indexes.
  4. For sorted queries, prefer indexes whose sort order matches the query's sort, avoiding an in-memory sort.
  5. If no suitable index exists, fall back to a full collection scan.
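Steps 1-4 can be sketched as a scoring function. The scoring weights here are an assumption for illustration; production planners estimate key scans from cardinality statistics rather than counting covered fields:

```python
def pick_index(indexes: list, filter_fields: set, sort_field: str = None):
    """Score candidates: more covered prefix fields is better; matching the
    sort field breaks ties (avoids an in-memory sort). None means full scan."""
    best, best_score = None, (0, 0)
    for idx in indexes:
        fields = [f['field'] for f in idx['fields']]
        covered = 0
        for f in fields:             # a compound prefix must match in order
            if f in filter_fields:
                covered += 1
            else:
                break
        sort_bonus = 1 if sort_field and sort_field in fields else 0
        score = (covered, sort_bonus)
        if score > best_score:
            best, best_score = idx, score
    return best

indexes = [
    {'name': 'status_1',       'fields': [{'field': 'status', 'order': 1}]},
    {'name': 'status_created', 'fields': [{'field': 'status', 'order': 1},
                                          {'field': 'created_at', 'order': -1}]},
]
# Both cover the filter; the compound index also serves the sort, so it wins.
best = pick_index(indexes, {'status'}, sort_field='created_at')
assert best['name'] == 'status_created'
```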

Aggregation Pipeline

The aggregation pipeline processes documents through a sequence of stages:

  • $match: filter documents (uses indexes when first stage)
  • $group: group by key, compute accumulators ($sum, $avg, $max, $min, $count)
  • $sort: sort documents by field
  • $project: reshape documents — include, exclude, or compute fields
  • $limit / $skip: pagination
  • $lookup: left outer join with another collection (JOIN equivalent)
  • $unwind: deconstruct array field into individual documents (one per element)
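$unwind is the one stage the simplified Python implementation later in this document does not cover, so a standalone sketch (assumes the field, when present, holds an array; documents with a missing or empty array are dropped, matching the stage's default behavior):

```python
def unwind(docs: list, field: str) -> list:
    """$unwind: emit one output document per array element."""
    out = []
    for doc in docs:
        for element in doc.get(field) or []:
            out.append({**doc, field: element})   # copy doc, replace array with element
    return out

docs = [{'_id': 1, 'tags': ['a', 'b']}, {'_id': 2, 'tags': []}]
assert unwind(docs, 'tags') == [
    {'_id': 1, 'tags': 'a'},
    {'_id': 1, 'tags': 'b'},
]
```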

Write Concern and Durability

Write concern controls durability acknowledgment:

  • w:0 (fire-and-forget): no acknowledgment. Fastest, no durability guarantee.
  • w:1 (default): acknowledge after the primary node has written to memory (the WAL may not yet be flushed, so a crash on the primary can lose the write).
  • w:majority: acknowledge after a majority of replica set members have written. Survives primary failure without data loss.
  • j:true: journal acknowledged — write is persisted to WAL before acknowledgment. Combines with w:majority for strongest guarantee.
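The acknowledgment decision reduces to a pure function of replica and journal state (the signature and parameter names below are illustrative, not a real driver API):

```python
def acknowledged(ack_count: int, replica_count: int, w, j: bool,
                 journaled: bool) -> bool:
    """Decide whether a write may be acknowledged under a given write concern.
    ack_count: replicas that applied the write; journaled: primary flushed its WAL."""
    if j and not journaled:
        return False                           # j:true gates on the WAL flush
    if w == 'majority':
        return ack_count >= replica_count // 2 + 1
    return ack_count >= w                      # numeric w; w=0 always acknowledges

assert acknowledged(0, 3, 0, False, False)                  # fire-and-forget
assert acknowledged(1, 3, 1, False, False)                  # primary only
assert not acknowledged(1, 3, 'majority', False, False)     # quorum not yet reached
assert acknowledged(2, 3, 'majority', True, True)           # strongest: quorum + journal
```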

SQL Schema (Internal Metadata)

-- Collection registry
CREATE TABLE Collection (
    name        VARCHAR(128) PRIMARY KEY,
    schema_json JSONB,           -- optional JSON Schema for validation
    created_at  TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Document storage (JSONB body)
CREATE TABLE Document (
    id            BIGSERIAL PRIMARY KEY,
    collection_id VARCHAR(128) NOT NULL REFERENCES Collection(name),
    _id           TEXT NOT NULL,             -- user-facing ObjectId or custom key
    body          JSONB NOT NULL,
    created_at    TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at    TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    UNIQUE (collection_id, _id)
);
CREATE INDEX idx_document_collection ON Document(collection_id, created_at DESC);
CREATE INDEX idx_document_body ON Document USING GIN(body);  -- JSONB index for field queries

-- Index metadata
CREATE TABLE Index (
    id            BIGSERIAL PRIMARY KEY,
    collection_id VARCHAR(128) NOT NULL REFERENCES Collection(name),
    fields        JSONB NOT NULL,   -- [{"field": "user_id", "order": 1}, ...]
    index_type    VARCHAR(32) NOT NULL DEFAULT 'btree',  -- btree, text, multikey
    is_unique     BOOLEAN NOT NULL DEFAULT FALSE,  -- "unique" is a reserved word
    is_sparse     BOOLEAN NOT NULL DEFAULT FALSE,
    created_at    TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

Python Implementation

import itertools
import json
import os
import time
import uuid
from typing import Any, Dict, List, Optional

# Rolling 3-byte counter, randomly seeded per process (matches the ObjectId layout above).
_id_counter = itertools.count(int.from_bytes(os.urandom(3), 'big'))

def generate_object_id() -> str:
    """Generate a sortable, globally unique document ID (timestamp + random + counter)."""
    timestamp = int(time.time()).to_bytes(4, 'big').hex()
    random_bytes = uuid.uuid4().bytes[:5].hex()                    # machine/process entropy
    counter = (next(_id_counter) % 0x1000000).to_bytes(3, 'big').hex()  # wraps at 3 bytes
    return timestamp + random_bytes + counter

def insert_document(collection: str, doc: dict) -> str:
    """Insert a document into the collection. Returns _id."""
    if '_id' not in doc:
        doc['_id'] = generate_object_id()

    # Optional: validate against collection JSON Schema
    schema = get_collection_schema(collection)
    if schema:
        validate_against_schema(doc, schema)  # raises on violation

    db.execute(
        "INSERT INTO Document(collection_id, _id, body, created_at, updated_at)"
        " VALUES(%s, %s, %s, NOW(), NOW())",
        (collection, doc['_id'], json.dumps(doc))
    )

    # Update secondary indexes asynchronously via queue
    index_update_queue.enqueue(collection, doc['_id'], doc, operation='insert')
    return doc['_id']

def find(collection: str, filter_doc: dict,
         projection: Optional[dict] = None,
         sort: Optional[list] = None,
         limit: int = 100) -> List[dict]:
    """Query documents using filter operators. Uses JSONB containment for simple filters."""
    # Build WHERE clause from filter (simplified — production uses full query planner)
    where_clauses = ["collection_id = %s"]
    params: list = [collection]

    for field, condition in filter_doc.items():
        if isinstance(condition, dict):
            for op, val in condition.items():
                if op == '$eq':
                    where_clauses.append(f"body->>'{field}' = %s")
                    params.append(str(val))
                elif op == '$gt':
                    where_clauses.append(f"(body->>'{field}')::numeric > %s")
                    params.append(val)
                elif op == '$lt':
                    where_clauses.append(f"(body->>'{field}')::numeric < %s")
                    params.append(val)
                elif op == '$in':
                    placeholders = ', '.join(['%s'] * len(val))
                    where_clauses.append(f"body->>'{field}' IN ({placeholders})")
                    params.extend([str(v) for v in val])
        else:
            # Simple equality via JSONB containment (can use the GIN index)
            where_clauses.append("body @> %s")
            params.append(json.dumps({field: condition}))

    sql = f"SELECT body FROM Document WHERE {' AND '.join(where_clauses)}"

    if sort:
        order_parts = []
        for field, direction in sort:
            dir_str = 'DESC' if direction == -1 else 'ASC'
            order_parts.append(f"(body->>'{field}') {dir_str}")
        sql += f" ORDER BY {', '.join(order_parts)}"

    sql += f" LIMIT %s"
    params.append(limit)

    rows = db.execute(sql, params).fetchall()
    docs = [json.loads(row[0]) for row in rows]

    # Apply projection
    if projection:
        include = [k for k, v in projection.items() if v == 1]
        exclude = [k for k, v in projection.items() if v == 0]
        if include:
            docs = [{k: d[k] for k in include if k in d} for d in docs]
        elif exclude:
            docs = [{k: v for k, v in d.items() if k not in exclude} for d in docs]

    return docs

def aggregate(collection: str, pipeline: List[dict]) -> List[dict]:
    """Execute aggregation pipeline. Simplified implementation."""
    # Load initial documents
    docs = find(collection, {}, limit=100000)

    for stage in pipeline:
        if '$match' in stage:
            filt = stage['$match']
            docs = [d for d in docs if matches_filter(d, filt)]
        elif '$sort' in stage:
            for field, direction in reversed(list(stage['$sort'].items())):
                docs.sort(key=lambda d: d.get(field, 0), reverse=(direction == -1))
        elif '$limit' in stage:
            docs = docs[:stage['$limit']]
        elif '$project' in stage:
            proj = stage['$project']
            include = [k for k, v in proj.items() if v == 1]
            docs = [{k: d.get(k) for k in include} for d in docs]
        elif '$group' in stage:
            docs = group_documents(docs, stage['$group'])

    return docs

def matches_filter(doc: dict, filt: dict) -> bool:
    """Evaluate filter against a document. Simplified operator support."""
    for field, condition in filt.items():
        val = doc.get(field)
        if isinstance(condition, dict):
            for op, threshold in condition.items():
                if op == '$gt' and not (val is not None and val > threshold):
                    return False
                elif op == '$lt' and not (val is not None and val < threshold):
                    return False
        else:
            # Simple equality
            if val != condition:
                return False
    return True

def group_documents(docs: List[dict], group_spec: dict) -> list:
    """Group documents by _id expression and compute accumulators."""
    from collections import defaultdict
    groups: dict = defaultdict(list)
    id_field = group_spec['_id']

    for doc in docs:
        key = doc.get(id_field.lstrip('$')) if id_field else None
        groups[key].append(doc)

    result = []
    for key, group_docs in groups.items():
        row: dict = {'_id': key}
        for acc_name, acc_expr in group_spec.items():
            if acc_name == '_id':
                continue
            if '$sum' in acc_expr:
                field = acc_expr['$sum'].lstrip('$')
                row[acc_name] = sum(d.get(field, 0) for d in group_docs)
            elif '$avg' in acc_expr:
                field = acc_expr['$avg'].lstrip('$')
                vals = [d.get(field, 0) for d in group_docs]
                row[acc_name] = sum(vals) / len(vals) if vals else 0
            elif '$count' in acc_expr:
                row[acc_name] = len(group_docs)
        result.append(row)
    return result

def get_collection_schema(collection: str) -> Optional[dict]:
    row = db.execute(
        "SELECT schema_json FROM Collection WHERE name=%s", (collection,)
    ).fetchone()
    return json.loads(row[0]) if row and row[0] else None

def validate_against_schema(doc: dict, schema: dict) -> None:
    import jsonschema
    jsonschema.validate(instance=doc, schema=schema)

Key Design Decisions Summary

  • ObjectId generation encodes timestamp, enabling chronological sorting without a separate created_at query.
  • JSONB with GIN index in PostgreSQL provides flexible field-level querying without a dedicated document engine.
  • Compound indexes must match query field order — a compound index on (status, created_at) does not efficiently serve queries filtering only on created_at.
  • Synchronous index maintenance on unique indexes is required for correctness; async is acceptable for non-unique indexes.
  • Write concern majority + journal (j:true) provides the strongest durability guarantee at the cost of higher write latency.

{
  "@context": "https://schema.org",
  "@type": "FAQPage",
  "mainEntity": [
    {
      "@type": "Question",
      "name": "What are the tradeoffs of schema flexibility in a document store?",
      "acceptedAnswer": {
        "@type": "Answer",
        "text": "Schema flexibility allows documents in the same collection to have different fields, making it easy to evolve data models without migrations. The tradeoff is that application code must handle missing or differently-typed fields defensively. Querying across inconsistently structured documents is harder to optimize since indexes assume consistent field presence. Optional JSON Schema validation at the collection level provides a middle ground: enforce only the critical fields while allowing extensions."
      }
    },
    {
      "@type": "Question",
      "name": "How do multikey (array) indexes work in a document store?",
      "acceptedAnswer": {
        "@type": "Answer",
        "text": "A multikey index on an array field creates one index entry per array element. For a document with tags: [python, java, go], the index contains three entries, one for each tag, all pointing to the same document. This allows equality and range queries on array elements to use the index. The tradeoff is index size growth — each array element adds an entry, so arrays with many elements significantly increase index storage."
      }
    },
    {
      "@type": "Question",
      "name": "What are the stages of an aggregation pipeline?",
      "acceptedAnswer": {
        "@type": "Answer",
        "text": "An aggregation pipeline processes documents through ordered stages: $match filters documents (and uses indexes when first), $group aggregates by key with accumulators like $sum and $avg, $sort orders documents, $project reshapes documents by including or computing fields, $limit and $skip paginate results, $lookup performs a left outer join with another collection, and $unwind deconstructs array fields into individual documents. Stages that reduce document count ($match, $limit) should appear as early as possible to minimize processing in later stages."
      }
    },
    {
      "@type": "Question",
      "name": "What do different write concern levels mean for document durability?",
      "acceptedAnswer": {
        "@type": "Answer",
        "text": "Write concern w:0 means fire-and-forget with no acknowledgment and no durability. w:1 acknowledges after the primary writes to memory, but a crash before WAL flush can lose the write. w:majority acknowledges after a quorum of replicas have written, surviving a primary failure without data loss. Adding j:true requires the write to be flushed to the journal (WAL) before acknowledgment. For financial or critical data, use w:majority combined with j:true. For high-throughput logging where some loss is tolerable, w:1 or w:0 reduces latency."
      }
    }
  ]
}

