Document Store System: Overview
A document store persists and retrieves arbitrary JSON documents. Unlike a relational database, it does not enforce a fixed schema per collection. This low-level design covers document identity, storage engine choices, index types and maintenance, query operators, the aggregation pipeline, and write durability.
Document Model
A document is an arbitrary nested JSON object. A collection groups documents of related type. Schema is optional: collections can enforce a JSON Schema for validation, but many deployments use schema-free mode for flexibility.
Document identity: each document has an _id field. If not provided by the client, the store generates a globally unique ObjectId composed of:
- 4-byte Unix timestamp (seconds)
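- 5-byte random value, generated once per process (distinguishing IDs minted by different machines and processes)
- 3-byte incrementing counter, initialized to a random value
This makes ObjectIds sortable by creation time (to one-second granularity) and, with very high probability, unique across a distributed cluster without coordination.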
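Because the timestamp occupies the first four bytes in big-endian order, the creation time can be read straight back out of an ID. The sketch below assumes IDs are exchanged as 24 lowercase hex characters; `object_id_timestamp` is a hypothetical helper, not part of any particular driver.

```python
from datetime import datetime, timezone


def object_id_timestamp(oid_hex: str) -> datetime:
    """Decode the creation time embedded in a 24-hex-character ObjectId."""
    raw = bytes.fromhex(oid_hex)
    if len(raw) != 12:
        raise ValueError("an ObjectId is exactly 12 bytes (24 hex characters)")
    seconds = int.from_bytes(raw[:4], 'big')  # bytes 0-3: Unix timestamp, big-endian
    return datetime.fromtimestamp(seconds, tz=timezone.utc)


# Lowercase hex strings of equal length sort in the same order as their bytes, so
# sorting IDs as strings orders them by creation second first. Ties within one
# second fall back to the random and counter bytes, which reflect creation order
# only for IDs minted by the same process (and only until the counter wraps).
older = "5f000000" + "00" * 8
newer = "65000000" + "00" * 8
print(sorted([newer, older]) == [older, newer])
print(object_id_timestamp(older).isoformat())
```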
Storage Engine
The primary storage engine uses a B-tree indexed on _id:
- Documents are stored as BSON (Binary JSON) or JSONB on disk for compact representation and fast field access.
- Writes are appended to a Write-Ahead Log (WAL) before the B-tree page is modified, ensuring crash recovery.
- The B-tree provides O(log N) lookup by _id and O(log N + K) range scans, where K is the number of documents returned.
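To make the log-before-page ordering and the O(log N + K) range scan concrete, here is a minimal single-file sketch. It is a toy under stated assumptions: the hypothetical `WalBackedStore` keeps everything in memory, logs JSON lines and calls `os.fsync` before applying each write, uses a sorted Python list of string `_id` keys in place of a real B-tree, and replays the log on startup as its crash-recovery step.

```python
import bisect
import json
import os
from typing import Dict, List, Optional


class WalBackedStore:
    """Toy store that logs every write to a WAL before touching its ordered index.
    A sorted list of _id keys stands in for the B-tree: lookups and range-scan
    starts are O(log N) binary searches, although inserting into a Python list
    is O(N), unlike a real B-tree."""

    def __init__(self, wal_path: str):
        self.wal_path = wal_path
        self.keys: List[str] = []        # sorted _id values (the "index")
        self.docs: Dict[str, dict] = {}  # _id -> document body (the "pages")
        self._replay()                   # crash recovery: rebuild state from the log
        self._wal = open(wal_path, 'a', encoding='utf-8')

    def _apply(self, doc: dict) -> None:
        key = doc['_id']
        if key not in self.docs:
            bisect.insort(self.keys, key)
        self.docs[key] = doc

    def _replay(self) -> None:
        if not os.path.exists(self.wal_path):
            return
        with open(self.wal_path, encoding='utf-8') as f:
            for line in f:
                try:
                    record = json.loads(line)
                except json.JSONDecodeError:
                    break  # torn final record from a crash mid-write: stop here
                self._apply(record)

    def put(self, doc: dict) -> None:
        # 1. Append the write to the WAL and force it to disk ...
        self._wal.write(json.dumps(doc) + '\n')
        self._wal.flush()
        os.fsync(self._wal.fileno())
        # 2. ... and only then modify the in-memory index.
        self._apply(doc)

    def get(self, key: str) -> Optional[dict]:
        i = bisect.bisect_left(self.keys, key)  # O(log N) search, like a B-tree descent
        if i < len(self.keys) and self.keys[i] == key:
            return self.docs[key]
        return None

    def range_scan(self, lo: str, hi: str) -> List[dict]:
        """Documents with lo <= _id < hi: O(log N) to find the start, then K results."""
        start = bisect.bisect_left(self.keys, lo)
        end = bisect.bisect_left(self.keys, hi)
        return [self.docs[k] for k in self.keys[start:end]]

    def close(self) -> None:
        self._wal.close()
```

Reopening a store on the same path rebuilds the index purely from the log, which is the property the WAL-first ordering is meant to guarantee.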
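LSM-tree (Log-Structured Merge-tree) is an alternative for write-heavy workloads: a memtable absorbs writes in memory, full memtables are flushed to immutable SSTables on disk, and background compaction merges SSTables. Tradeoff: higher write throughput at the cost of read amplification (a lookup may have to check the memtable and several SSTables until compaction merges them) and write amplification (compaction rewrites the same data more than once).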
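A minimal in-memory sketch of the LSM write and read paths follows. `TinyLSM` is a hypothetical name, sorted Python lists stand in for on-disk SSTables, and compaction naively merges every run into one; production engines use leveled or size-tiered strategies and per-SSTable Bloom filters to reduce read amplification.

```python
import bisect
from typing import Dict, List, Optional, Tuple

_TOMBSTONE = object()  # marks a deleted key until compaction drops it


class TinyLSM:
    """Toy LSM-tree: writes land in a memtable; a full memtable is frozen into an
    immutable sorted run (an in-memory stand-in for an SSTable)."""

    def __init__(self, memtable_limit: int = 4):
        self.memtable_limit = memtable_limit
        self.memtable: Dict[str, object] = {}
        self.sstables: List[List[Tuple[str, object]]] = []  # oldest first, newest last

    def put(self, key: str, value: object) -> None:
        self.memtable[key] = value
        if len(self.memtable) >= self.memtable_limit:
            self.flush()

    def delete(self, key: str) -> None:
        self.put(key, _TOMBSTONE)

    def flush(self) -> None:
        if self.memtable:
            self.sstables.append(sorted(self.memtable.items()))
            self.memtable = {}

    def get(self, key: str) -> Optional[object]:
        # Read amplification: memtable first, then every run from newest to oldest.
        if key in self.memtable:
            value = self.memtable[key]
            return None if value is _TOMBSTONE else value
        for run in reversed(self.sstables):
            i = bisect.bisect_left(run, (key,))  # (key,) sorts just before (key, v)
            if i < len(run) and run[i][0] == key:
                value = run[i][1]
                return None if value is _TOMBSTONE else value
        return None

    def compact(self) -> None:
        # Merge all runs, keeping only the newest version of each key. Tombstones
        # can be dropped because every run is included in this merge.
        merged: Dict[str, object] = {}
        for run in self.sstables:  # oldest first, so newer values overwrite older
            merged.update(run)
        self.sstables = [sorted((k, v) for k, v in merged.items() if v is not _TOMBSTONE)]
```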
Secondary Index Types
- Single-field index: index on one field (e.g., {"user_id": 1}). Supports equality and range queries on that field.
- Compound index: index on multiple fields in order (e.g., {"status": 1, "created_at": -1}). Supports queries that filter on status and sort by created_at.
- Multikey (array) index: when the indexed field contains an array, the index creates one entry per array element. Allows queries like {"tags": {$in: ["python", "java"]}} to use an index.
- Text index: inverted index on tokenized string fields for full-text $text queries.
- Sparse index: only indexes documents where the indexed field exists. Reduces index size for optional fields.
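The multikey and sparse behaviours can be sketched with a plain in-memory map from value to a set of `_id`s. This is an illustrative assumption rather than a storage format: `build_multikey_index` and `lookup_in` are hypothetical helpers, indexed values must be hashable, and a real index would live in a B-tree.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Set


def build_multikey_index(docs: Iterable[dict], field: str) -> Dict[object, Set[str]]:
    """Map each indexed value to the _ids of the documents containing it."""
    index: Dict[object, Set[str]] = defaultdict(set)
    for doc in docs:
        if field not in doc:
            continue  # sparse: documents without the field get no entry at all
        value = doc[field]
        # Multikey: an array contributes one entry per element
        for element in (value if isinstance(value, list) else [value]):
            index[element].add(doc['_id'])
    return index


def lookup_in(index: Dict[object, Set[str]], values: List[object]) -> Set[str]:
    """Answer {field: {$in: values}} by unioning the _id sets of each value."""
    result: Set[str] = set()
    for v in values:
        result |= index.get(v, set())
    return result
```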
Secondary Index Maintenance
On every write (insert, update, delete), all secondary indexes must be updated. Two approaches:
- Synchronous (foreground): index updates happen in the same transaction as the document write. Ensures consistency but adds write latency proportional to the number of indexes.
- Asynchronous (background): the document write commits first and the index update is queued. Index-based reads may return stale results for a brief window. Used for non-unique indexes where eventual consistency is acceptable.
For unique indexes (e.g., email field), synchronous maintenance is required to enforce the uniqueness constraint.
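The split between the two maintenance modes can be illustrated with a toy, single-threaded collection: the hypothetical `IndexedCollection` checks and updates its unique index inside `insert`, but only enqueues work for its non-unique index until a background step (`drain`) runs. A real store would additionally need locking or a transaction so the uniqueness check and the write are atomic under concurrency.

```python
from collections import defaultdict, deque
from typing import Deque, Dict, Set, Tuple


class IndexedCollection:
    """Toy collection: one unique index maintained synchronously and one
    non-unique index maintained asynchronously through a work queue."""

    def __init__(self, unique_field: str, secondary_field: str):
        self.unique_field = unique_field
        self.secondary_field = secondary_field
        self.docs: Dict[str, dict] = {}
        self.unique_index: Dict[object, str] = {}                  # value -> _id
        self.secondary_index: Dict[object, Set[str]] = defaultdict(set)
        self.pending: Deque[Tuple[str, dict]] = deque()             # async work queue

    def insert(self, doc: dict) -> None:
        key = doc.get(self.unique_field)
        # Synchronous: check and update the unique index before the write is
        # acknowledged, so two documents can never share a value.
        if key is not None and key in self.unique_index:
            raise ValueError(f"duplicate {self.unique_field}: {key!r}")
        self.docs[doc['_id']] = doc
        if key is not None:
            self.unique_index[key] = doc['_id']
        # Asynchronous: only enqueue; secondary_index misses this doc until drain().
        self.pending.append((doc['_id'], doc))

    def drain(self) -> int:
        """Background worker step: apply queued secondary-index updates."""
        applied = 0
        while self.pending:
            _id, doc = self.pending.popleft()
            if self.secondary_field in doc:
                self.secondary_index[doc[self.secondary_field]].add(_id)
            applied += 1
        return applied
```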
Query Operators
The query engine supports a filter DSL:
- $eq: equality match, e.g. {"status": {$eq: "active"}} or the shorthand {"status": "active"}
- $gt, $gte, $lt, $lte: range comparison
- $in: field value is one of the listed values, e.g. {"role": {$in: ["admin", "editor"]}}
- $elemMatch: match when at least one array element satisfies all of the given conditions, e.g. {"scores": {$elemMatch: {$gt: 80, $lt: 100}}}
- $text: full-text search on text-indexed fields
- $exists: check field presence
- $and, $or, $not: logical composition
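A filter built from these operators can be evaluated recursively against an in-memory document. The sketch below is an illustration under assumptions, not the store's query engine: `matches` and its helpers are hypothetical, only top-level field names are supported (no dotted paths), `$text` is omitted because it needs a text index, and an array field matches when any element matches, which is the behaviour multikey indexes rely on.

```python
import operator
from typing import Any, Dict

_MISSING = object()  # sentinel: the field is absent from the document
_RANGE = {'$gt': operator.gt, '$gte': operator.ge,
          '$lt': operator.lt, '$lte': operator.le}


def _is_operator_doc(cond: Any) -> bool:
    return isinstance(cond, dict) and bool(cond) and all(k.startswith('$') for k in cond)


def _scalar_matches(op: str, value: Any, arg: Any) -> bool:
    if op == '$eq':
        return value == arg
    if op == '$in':
        return value in arg
    if op in _RANGE:
        if value is _MISSING or value is None:
            return False
        try:
            return _RANGE[op](value, arg)
        except TypeError:
            return False  # incomparable types (e.g. str vs int) never match
    raise ValueError(f"unsupported operator: {op}")


def _op_matches(op: str, value: Any, arg: Any) -> bool:
    if op == '$exists':
        return (value is not _MISSING) == bool(arg)
    if isinstance(value, list):
        # Array field: match if any element matches (or, for $eq/$in, the whole array)
        return (any(_op_matches(op, e, arg) for e in value)
                or (op in ('$eq', '$in') and _scalar_matches(op, value, arg)))
    return _scalar_matches(op, value, arg)


def _field_matches(value: Any, cond: Any) -> bool:
    if not _is_operator_doc(cond):
        return _op_matches('$eq', value, cond)  # shorthand {"field": literal}
    for op, arg in cond.items():
        if op == '$not':
            if _field_matches(value, arg):
                return False
        elif op == '$elemMatch':
            def element_ok(e: Any) -> bool:
                if _is_operator_doc(arg):
                    return _field_matches(e, arg)      # conditions on scalar elements
                return isinstance(e, dict) and matches(e, arg)  # subdocument filter
            if not (isinstance(value, list) and any(element_ok(e) for e in value)):
                return False
        elif not _op_matches(op, value, arg):
            return False
    return True


def matches(doc: Dict[str, Any], filt: Dict[str, Any]) -> bool:
    """Return True if doc satisfies every clause of filt (clauses are ANDed)."""
    for key, cond in filt.items():
        if key == '$and':
            if not all(matches(doc, f) for f in cond):
                return False
        elif key == '$or':
            if not any(matches(doc, f) for f in cond):
                return False
        elif not _field_matches(doc.get(key, _MISSING), cond):
            return False
    return True
```

Without $elemMatch, {"scores": {$gt: 80, $lt: 100}} matches [75, 105], because each condition can be satisfied by a different element; $elemMatch requires a single element to satisfy both.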
Query Planner and Index Selection
The query planner evaluates candidate indexes for a query and estimates cost:
- Identify indexes that cover at least one filter field.
- Estimate key scans: how many index entries will be scanned to satisfy the query.
- Prefer compound indexes that cover multiple filter fields over multiple single-field indexes.
- For sorted queries, prefer indexes whose sort order matches the query's sort, avoiding an in-memory sort.
- If no suitable index exists, fall back to a full collection scan.
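The selection steps can be sketched as a rule-based scorer. This swaps the cost estimate for a simpler heuristic (how many leading index fields appear in the filter, plus a bonus when the next index field matches the sort) and assumes every filter field is an equality predicate; `IndexSpec` and `choose_index` are hypothetical names.

```python
from dataclasses import dataclass
from typing import List, Optional, Sequence, Set, Tuple


@dataclass
class IndexSpec:
    name: str
    fields: List[Tuple[str, int]]  # [(field, 1 or -1), ...] in index key order


def choose_index(indexes: Sequence[IndexSpec], filter_fields: Set[str],
                 sort_field: Optional[str] = None) -> Optional[IndexSpec]:
    """Return the best-scoring index, or None to signal a full collection scan.

    Heuristic score (assumes every filter field is an equality predicate):
      +1 for each leading index field that appears in the filter, stopping at
         the first field that does not (only a leftmost prefix is usable);
      +1 if the index field right after that prefix is the sort field, since
         the index then returns results already ordered (in either direction).
    """
    best: Optional[IndexSpec] = None
    best_score = 0
    for idx in indexes:
        prefix = 0
        for field, _direction in idx.fields:
            if field not in filter_fields:
                break
            prefix += 1
        score = prefix
        if (sort_field is not None and prefix < len(idx.fields)
                and idx.fields[prefix][0] == sort_field):
            score += 1
        # Strictly greater: on ties the first index listed wins
        if score > best_score:
            best, best_score = idx, score
    return best
```

With a single-field index on status and a compound index on (status, created_at), a filter on status sorted by created_at scores the compound index higher, while a filter on created_at alone scores zero everywhere and falls back to a collection scan.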
Aggregation Pipeline
The aggregation pipeline processes documents through a sequence of stages:
- $match: filter documents (can use indexes when it is the first stage)
- $group: group by key and compute accumulators ($sum, $avg, $max, $min, $count)
- $sort: sort documents by field
- $project: reshape documents by including, excluding, or computing fields
- $limit / $skip: pagination
- $lookup: left outer join with another collection (the JOIN equivalent)
- $unwind: deconstruct an array field into individual documents (one per element)
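Two of these stages are easy to misread, so here is a minimal in-memory sketch of $unwind and the equality form of $lookup (implemented as a hash join). The function names `unwind` and `lookup` are hypothetical, foreign key values must be hashable, and `unwind` drops documents whose field is missing, null, or an empty array, mirroring the default non-preserving mode.

```python
from typing import Dict, List


def unwind(docs: List[dict], field: str) -> List[dict]:
    """$unwind: emit one copy of each document per element of doc[field]."""
    out: List[dict] = []
    for doc in docs:
        value = doc.get(field)
        if isinstance(value, list):
            for element in value:  # an empty array yields nothing
                out.append({**doc, field: element})
        elif value is not None:
            out.append(doc)  # a non-array value passes through as-is
    return out


def lookup(docs: List[dict], foreign: List[dict], local_field: str,
           foreign_field: str, as_field: str) -> List[dict]:
    """$lookup (equality form): left outer join. Every input document is kept
    and gets an array of matching foreign documents, possibly empty."""
    by_key: Dict[object, List[dict]] = {}
    for f in foreign:  # build side of the hash join
        by_key.setdefault(f.get(foreign_field), []).append(f)
    return [{**d, as_field: by_key.get(d.get(local_field), [])} for d in docs]
```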
Write Concern and Durability
Write concern controls durability acknowledgment:
- w:0 (fire-and-forget): no acknowledgment. Fastest, no durability guarantee.
- w:1 (default): acknowledge after the primary node applies the write in memory (WAL not yet flushed).
- w:majority: acknowledge after a majority of replica set members have written. Survives a primary failure without data loss.
- j:true: journal acknowledgment; the write is persisted to the WAL before acknowledgment. Combine with w:majority for the strongest guarantee.
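The acknowledgment rules can be modelled as a predicate over per-member replication state. This is a simplified model, not a driver or server API: `MemberState` and `acknowledged` are hypothetical, member 0 is assumed to be the primary, every member is assumed to be a voting, data-bearing node, and j=True is modelled as requiring every member counted toward w to have journaled the write.

```python
from dataclasses import dataclass
from typing import List, Union


@dataclass
class MemberState:
    applied: bool    # write applied in memory on this member
    journaled: bool  # write flushed to this member's on-disk journal (WAL)


def acknowledged(members: List[MemberState], w: Union[int, str] = 1,
                 j: bool = False) -> bool:
    """Return True once the client may be told the write succeeded."""
    if w == 0:
        return True  # fire-and-forget: the client does not wait at all

    def counts(m: MemberState) -> bool:
        return m.applied and (m.journaled or not j)

    if not counts(members[0]):
        return False  # the primary must always be among the acknowledging members
    needed = len(members) // 2 + 1 if w == 'majority' else int(w)
    return sum(counts(m) for m in members) >= needed
```

In a three-member set where two members have applied the write but only the primary has journaled it, w:majority alone would acknowledge, while w:majority with j:true keeps waiting until a second member journals.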
SQL Schema (Internal Metadata)
-- Collection registry
CREATE TABLE Collection (
name VARCHAR(128) PRIMARY KEY,
schema_json JSONB, -- optional JSON Schema for validation
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Document storage (JSONB body)
CREATE TABLE Document (
id BIGSERIAL PRIMARY KEY,
collection_id VARCHAR(128) NOT NULL REFERENCES Collection(name),
_id TEXT NOT NULL, -- user-facing ObjectId or custom key
body JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE (collection_id, _id)
);
CREATE INDEX idx_document_collection ON Document(collection_id, created_at DESC);
CREATE INDEX idx_document_body ON Document USING GIN(body); -- accelerates JSONB containment (@>) and key-existence (?) queries
-- Index metadata
CREATE TABLE IndexMetadata (
id BIGSERIAL PRIMARY KEY,
collection_id VARCHAR(128) NOT NULL REFERENCES Collection(name),
fields JSONB NOT NULL, -- [{"field": "user_id", "order": 1}, ...]
index_type VARCHAR(32) NOT NULL DEFAULT 'btree', -- btree, text, multikey
is_unique BOOLEAN NOT NULL DEFAULT FALSE,
is_sparse BOOLEAN NOT NULL DEFAULT FALSE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
Python Implementation
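import itertools
import json
import os
import threading
import time
from typing import Any, Dict, List, Optional
# The functions below assume two objects defined elsewhere: `db`, a connection whose
# execute() returns a cursor (e.g., a psycopg 3 connection), and `index_update_queue`.
# Per-process ObjectId state: 5 random bytes chosen once per process, plus a
# 3-byte counter that starts at a random value and increments for each new ID.
_PROCESS_RANDOM = os.urandom(5)
_counter = itertools.count(int.from_bytes(os.urandom(3), 'big'))
_counter_lock = threading.Lock()
def generate_object_id() -> str:
    """Generate a 12-byte, time-sortable unique document ID as 24 hex characters."""
    timestamp = int(time.time()).to_bytes(4, 'big')
    with _counter_lock:
        count = next(_counter) % 0x1000000  # wrap around at 2**24
    return (timestamp + _PROCESS_RANDOM + count.to_bytes(3, 'big')).hex()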
def insert_document(collection: str, doc: dict) -> str:
    """Insert a document into the collection. Returns its _id."""
    if '_id' not in doc:
        doc['_id'] = generate_object_id()
    # Optional: validate against the collection's JSON Schema
    schema = get_collection_schema(collection)
    if schema:
        validate_against_schema(doc, schema)  # raises on violation
    db.execute(
        "INSERT INTO Document(collection_id, _id, body, created_at, updated_at)"
        " VALUES(%s, %s, %s, NOW(), NOW())",
        (collection, str(doc['_id']), json.dumps(doc))
    )
    # UNIQUE (collection_id, _id) is enforced synchronously by the INSERT above;
    # non-unique secondary indexes are updated asynchronously via a queue.
    index_update_queue.enqueue(collection, doc['_id'], doc, operation='insert')
    return doc['_id']
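def find(collection: str, filter_doc: dict,
         projection: Optional[dict] = None,
         sort: Optional[list] = None,
         limit: int = 100) -> List[dict]:
    """Query documents using filter operators. Uses JSONB containment for simple filters."""
    # Build the WHERE clause from the filter (simplified; production uses a full query planner).
    # Field names are bound as parameters (body->>%s) instead of being formatted into
    # the SQL string, so a crafted field name cannot inject SQL.
    where_clauses = ["collection_id = %s"]
    params: list = [collection]
    for field, condition in filter_doc.items():
        if isinstance(condition, dict):
            for op, val in condition.items():
                if op == '$eq':
                    # ->> returns text, so render non-strings as JSON text (5, true, ...)
                    where_clauses.append("body->>%s = %s")
                    params.extend([field, val if isinstance(val, str) else json.dumps(val)])
                elif op == '$gt':
                    where_clauses.append("(body->>%s)::numeric > %s")
                    params.extend([field, val])
                elif op == '$lt':
                    where_clauses.append("(body->>%s)::numeric < %s")
                    params.extend([field, val])
                elif op == '$in':
                    if not val:
                        where_clauses.append("FALSE")  # IN () is invalid SQL; nothing matches
                        continue
                    placeholders = ', '.join(['%s'] * len(val))
                    where_clauses.append(f"body->>%s IN ({placeholders})")
                    params.append(field)
                    params.extend([v if isinstance(v, str) else json.dumps(v) for v in val])
                else:
                    raise ValueError(f"unsupported operator: {op}")
        else:
            # Simple equality: JSONB containment, which the GIN index can serve
            where_clauses.append("body @> %s")
            params.append(json.dumps({field: condition}))
    sql = f"SELECT body FROM Document WHERE {' AND '.join(where_clauses)}"
    if sort:
        order_parts = []
        for field, direction in sort:
            dir_str = 'DESC' if direction == -1 else 'ASC'
            # ->> yields text, so numeric fields sort lexicographically here
            order_parts.append(f"(body->>%s) {dir_str}")
            params.append(field)
        sql += f" ORDER BY {', '.join(order_parts)}"
    sql += " LIMIT %s"
    params.append(limit)
    rows = db.execute(sql, params).fetchall()
    # psycopg decodes JSONB into Python objects; parse only if a driver returned text
    docs = [row[0] if isinstance(row[0], dict) else json.loads(row[0]) for row in rows]
    # Apply projection
    if projection:
        include = [k for k, v in projection.items() if v == 1]
        exclude = [k for k, v in projection.items() if v == 0]
        if include:
            docs = [{k: d[k] for k in include if k in d} for d in docs]
        elif exclude:
            docs = [{k: v for k, v in d.items() if k not in exclude} for d in docs]
    return docs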
def aggregate(collection: str, pipeline: List[dict]) -> List[dict]:
    """Execute an aggregation pipeline in memory. Simplified implementation."""
    # Load initial documents (a real engine would push a leading $match down to the indexes)
    docs = find(collection, {}, limit=100000)
    for stage in pipeline:
        if '$match' in stage:
            filt = stage['$match']
            docs = [d for d in docs if matches_filter(d, filt)]
        elif '$sort' in stage:
            # Stable sorts applied from the last key to the first give a multi-key sort
            for field, direction in reversed(list(stage['$sort'].items())):
                docs.sort(key=lambda d: d.get(field, 0), reverse=(direction == -1))
        elif '$skip' in stage:
            docs = docs[stage['$skip']:]
        elif '$limit' in stage:
            docs = docs[:stage['$limit']]
        elif '$project' in stage:
            proj = stage['$project']
            include = [k for k, v in proj.items() if v == 1]
            docs = [{k: d.get(k) for k in include} for d in docs]
        elif '$group' in stage:
            docs = group_documents(docs, stage['$group'])
        else:
            raise ValueError(f"unsupported stage: {next(iter(stage))}")
    return docs
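def matches_filter(doc: dict, filt: dict) -> bool:
    """Evaluate filter against a document. Simplified operator support."""
    for field, condition in filt.items():
        val = doc.get(field)
        if isinstance(condition, dict):
            for op, threshold in condition.items():
                if op == '$gt' and not (val is not None and val > threshold):
                    return False
                elif op == '$lt' and not (val is not None and val < threshold):
                    return False
                elif op == '$eq' and val != threshold:
                    return False
                elif op not in ('$gt', '$lt', '$eq'):
                    raise ValueError(f"unsupported operator: {op}")
        elif val != condition:
            return False
    return True
def group_documents(docs: List[dict], group_spec: dict) -> List[dict]:
    """Group documents by _id expression and compute accumulators."""
    from collections import defaultdict
    groups: dict = defaultdict(list)
    id_field = group_spec['_id']
    for doc in docs:
        # {'_id': '$field'} groups by that field; {'_id': None} yields a single group
        key = doc.get(id_field.lstrip('$')) if id_field else None
        groups[key].append(doc)
    result = []
    for key, group_docs in groups.items():
        row: dict = {'_id': key}
        for acc_name, acc_expr in group_spec.items():
            if acc_name == '_id':
                continue
            if '$sum' in acc_expr:
                arg = acc_expr['$sum']
                if isinstance(arg, (int, float)):
                    row[acc_name] = arg * len(group_docs)  # e.g. {'$sum': 1} counts documents
                else:
                    row[acc_name] = sum(d.get(arg.lstrip('$'), 0) for d in group_docs)
            elif '$avg' in acc_expr:
                field = acc_expr['$avg'].lstrip('$')
                # Ignore documents where the field is missing rather than counting them as 0
                vals = [d[field] for d in group_docs if d.get(field) is not None]
                row[acc_name] = sum(vals) / len(vals) if vals else None
            elif '$max' in acc_expr or '$min' in acc_expr:
                op = '$max' if '$max' in acc_expr else '$min'
                field = acc_expr[op].lstrip('$')
                vals = [d[field] for d in group_docs if d.get(field) is not None]
                row[acc_name] = (max if op == '$max' else min)(vals) if vals else None
            elif '$count' in acc_expr:
                row[acc_name] = len(group_docs)
        result.append(row)
    return result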
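def get_collection_schema(collection: str) -> Optional[dict]:
    """Return the collection's JSON Schema, or None if it has none."""
    row = db.execute(
        "SELECT schema_json FROM Collection WHERE name=%s", (collection,)
    ).fetchone()
    if not row or row[0] is None:
        return None
    # psycopg decodes JSONB into a dict already; parse only if a driver returned text
    return row[0] if isinstance(row[0], dict) else json.loads(row[0])
def validate_against_schema(doc: dict, schema: dict) -> None:
    """Raise jsonschema.ValidationError if doc violates schema."""
    import jsonschema
    jsonschema.validate(instance=doc, schema=schema)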
Key Design Decisions Summary
- Because an ObjectId encodes its creation timestamp in its leading bytes, sorting by _id approximates chronological order (to one-second granularity) without a separate created_at index.
- JSONB with GIN index in PostgreSQL provides flexible field-level querying without a dedicated document engine.
- Queries can use a compound index efficiently only when they filter on a leftmost prefix of its fields: an index on (status, created_at) serves filters on status, or on status and created_at, but not filters on created_at alone.
- Synchronous index maintenance on unique indexes is required for correctness; async is acceptable for non-unique indexes.
- Write concern majority + journal (j:true) provides the strongest durability guarantee at the cost of higher write latency.