Distributed File System Low-Level Design: Metadata Server, Data Nodes, Replication, and Fault Tolerance

{
“@context”: “https://schema.org”,
“@type”: “FAQPage”,
“mainEntity”: [
{
“@type”: “Question”,
“name”: “How do you address the metadata server single point of failure?”,
“acceptedAnswer”: {
“@type”: “Answer”,
“text”: “The classic HDFS architecture uses a single NameNode which is a single point of failure — if the NameNode is unavailable, the entire filesystem is inaccessible even though data nodes are healthy. Mitigations include: (1) Active-Standby NameNode with shared edit log via a Quorum Journal Manager (QJM) — the standby replays the edit log continuously and can take over in seconds; (2) Secondary NameNode (checkpoint node) that periodically checkpoints the in-memory inode tree to disk, reducing recovery time after a crash; (3) Federation — multiple independent NameNodes each owning a namespace subtree, eliminating a single bottleneck. Modern Kubernetes-native DFS designs (like Ceph) use distributed metadata via RADOS for full metadata HA without a single coordinator.”
}
},
{
“@type”: “Question”,
“name”: “How does rack-aware replication improve fault tolerance?”,
“acceptedAnswer”: {
“@type”: “Answer”,
“text”: “Without rack awareness, all three replicas of a chunk might land on data nodes within the same rack. A single top-of-rack switch failure would make all three replicas unavailable simultaneously, causing data unavailability even though replication factor is 3. Rack-aware replication places replicas on nodes in different physical racks: the HDFS default policy places the first replica on the same node as the writer (or a local node), the second replica on a different rack, and the third replica on a different node in the same rack as the second. This layout tolerates both a rack-level failure (entire rack goes down) and a node-level failure within a rack simultaneously.”
}
},
{
“@type”: “Question”,
“name”: “How does the filesystem detect data node failures and trigger re-replication?”,
“acceptedAnswer”: {
“@type”: “Answer”,
“text”: “Data nodes send a heartbeat to the metadata server every 3 seconds. If the metadata server does not receive a heartbeat from a data node for a configurable timeout (typically 10 minutes in HDFS to avoid false positives from slow nodes), the data node is declared dead. The metadata server scans all chunks that had a replica on the dead node and identifies those that now have fewer than the target replication factor. For each under-replicated chunk, the metadata server selects a healthy source data node that has a replica and a healthy destination node, instructs the source to pipeline a copy to the destination, and marks the chunk as under-replication-in-progress. Once the new replica is confirmed (via block report), the chunk is marked fully replicated again.”
}
},
{
“@type”: “Question”,
“name”: “What is the small file problem in distributed file systems and how is it mitigated?”,
“acceptedAnswer”: {
“@type”: “Answer”,
“text”: “In a DFS with large chunk sizes (64MB or 128MB), storing millions of small files (KB to a few MB each) wastes storage because each file occupies at least one full chunk. More critically, each small file requires an inode entry in the metadata server's in-memory inode tree. With millions of files, the metadata server memory footprint becomes the bottleneck — HDFS recommends no more than 150 million files on a single NameNode with 64 GB RAM. Mitigations include: (1) HAR (Hadoop Archive) files — packing many small files into a large archive with its own index; (2) sequence files — combining many small records into a single large file with an embedded index; (3) using an object storage system (designed for arbitrary file sizes) for small file workloads instead of a block-oriented DFS.”
}
}
]
}

A distributed file system (DFS) provides a POSIX-like file interface over a cluster of machines, storing file data reliably across many nodes with automatic replication and fault tolerance. The canonical design — a centralised metadata server plus many data nodes — underpins HDFS (Hadoop Distributed File System) and influences modern cloud-native storage systems.

Architecture: Metadata Server and Data Nodes

Metadata server (NameNode): maintains the entire namespace in memory — an inode tree mapping paths to inode records, and an inode-to-chunk-list mapping. Does not store file data. All namespace operations (create, rename, delete, list) pass through the metadata server. The in-memory design is critical for performance: a stat() call on a multi-petabyte filesystem returns in microseconds because no disk I/O is needed for namespace resolution.

Data nodes: store chunk data on local disk. Handle data read and write I/O directly from/to clients. Report their chunk inventory to the metadata server periodically (block report) and send heartbeats every 3 seconds.

Client: communicates with the metadata server for namespace operations and chunk location lookups, then communicates directly with data nodes for data I/O. The metadata server is never in the data path.

File and Chunk Model

Files are split into fixed-size chunks (commonly 64 MB or 128 MB). Each chunk is assigned a globally unique chunk_id. Chunks are stored independently on data nodes — a file with 5 chunks may have its chunks spread across 15 different data nodes (3 replicas each).

Large chunk sizes are a deliberate trade-off: they reduce metadata overhead (fewer chunks per file) and amortise network round-trips across large sequential reads, at the cost of inefficiency for small files.

Write Path: Client → Primary → Secondaries

  1. Client requests chunk allocation from metadata server: “I want to write chunk #7 of file F”.
  2. Metadata server selects 3 data nodes (rack-aware) and returns (primary, secondary_1, secondary_2).
  3. Client establishes a write pipeline: client → primary → secondary_1 → secondary_2 (TCP streaming).
  4. Data flows in a pipeline: client sends to primary, primary forwards to secondary_1, secondary_1 forwards to secondary_2 — all concurrently.
  5. Once secondary_2 has written the data, ACKs flow back up the pipeline: secondary_2 → secondary_1 → primary → client.
  6. Client receives final ACK; metadata server records (chunk_id → [primary, secondary_1, secondary_2]).

Read Path: Client → Nearest Data Node

  1. Client asks metadata server for chunk locations for chunk #7 of file F.
  2. Metadata server returns the list of data nodes holding replicas, annotated with network topology distance.
  3. Client reads from the nearest data node (same rack preferred). If that node is slow or unavailable, client retries the next replica.
  4. Client verifies chunk checksum on receipt; corrupted reads trigger retry from a different replica.

Heartbeat and Failure Detection

Data nodes send a heartbeat to the metadata server every 3 seconds. The heartbeat carries node-level statistics (disk usage, I/O load). If no heartbeat is received for 10 minutes (configurable), the metadata server marks the node dead and begins re-replication of all chunks that were on that node.

Block reports: data nodes send a full inventory of all chunks they hold every hour. The metadata server reconciles reported chunks against its records, identifying over-replicated or corrupt chunks.

Rack-Aware Replication

The metadata server knows the rack topology of the cluster (typically via a rack-awareness script or static map). Default replica placement:

  • Replica 1: same node as the writer (or a local data node if client is off-cluster).
  • Replica 2: a node on a different rack from replica 1.
  • Replica 3: a different node on the same rack as replica 2.

This layout ensures that a single rack failure (switch failure) does not cause data unavailability — at least one replica (replica 1) is always on a different rack.

SQL DDL

-- File inode table
CREATE TABLE FileInode (
    id          BIGSERIAL PRIMARY KEY,
    path        TEXT          NOT NULL UNIQUE,
    size_bytes  BIGINT        NOT NULL DEFAULT 0,
    owner       VARCHAR(128)  NOT NULL,
    permissions SMALLINT      NOT NULL DEFAULT 644,
    created_at  TIMESTAMPTZ   NOT NULL DEFAULT now(),
    modified_at TIMESTAMPTZ   NOT NULL DEFAULT now()
);

CREATE INDEX idx_inode_path ON FileInode (path text_pattern_ops);

-- Chunk-to-inode mapping
CREATE TABLE FileChunk (
    inode_id    BIGINT       NOT NULL REFERENCES FileInode(id) ON DELETE CASCADE,
    chunk_index INTEGER      NOT NULL,
    chunk_id    BIGINT       NOT NULL UNIQUE,
    checksum    VARCHAR(64)  NOT NULL,
    PRIMARY KEY (inode_id, chunk_index)
);

-- Chunk replica locations
CREATE TABLE ChunkLocation (
    chunk_id      BIGINT       NOT NULL REFERENCES FileChunk(chunk_id),
    data_node_id  BIGINT       NOT NULL,
    rack_id       VARCHAR(64)  NOT NULL,
    PRIMARY KEY (chunk_id, data_node_id)
);

-- Data node registry
CREATE TABLE DataNode (
    id                BIGSERIAL PRIMARY KEY,
    hostname          VARCHAR(255)  NOT NULL UNIQUE,
    rack_id           VARCHAR(64)   NOT NULL,
    capacity_bytes    BIGINT        NOT NULL,
    used_bytes        BIGINT        NOT NULL DEFAULT 0,
    last_heartbeat_at TIMESTAMPTZ   NOT NULL DEFAULT now()
);

Python: Core Operations

import hashlib
import time
from typing import Optional

CHUNK_SIZE = 64 * 1024 * 1024  # 64 MB
HEARTBEAT_TIMEOUT = 600        # 10 minutes

# Simulated state
_inodes: dict[str, dict] = {}        # path -> inode
_chunks: dict[int, dict] = {}        # chunk_id -> {data, checksum}
_chunk_locations: dict[int, list] = {}  # chunk_id -> [data_node_id]
_data_nodes: dict[int, dict] = {}    # node_id -> {rack, last_heartbeat, used}

def create_file(path: str) -> int:
    """Create a new file inode and return its ID."""
    inode_id = len(_inodes) + 1
    _inodes[path] = {'id': inode_id, 'chunks': [], 'size': 0}
    print(f"Inode {inode_id} created for {path}")
    return inode_id

def write_chunk(file_id: int, chunk_index: int, data: bytes) -> int:
    """Write a chunk to 3 data nodes (rack-aware selection simplified)."""
    chunk_id = (file_id * 10000) + chunk_index
    checksum = hashlib.sha256(data).hexdigest()
    _chunks[chunk_id] = {'data': data, 'checksum': checksum}
    # Select 3 healthy nodes (rack-aware logic simplified to first 3 healthy nodes)
    healthy = [nid for nid, n in _data_nodes.items()
               if time.time() - n['last_heartbeat']  Optional[bytes]:
    """Read a chunk from the nearest available data node."""
    chunk_id = (file_id * 10000) + chunk_index
    if chunk_id not in _chunks:
        return None
    # In production: sort locations by network proximity to client
    locations = _chunk_locations.get(chunk_id, [])
    for node_id in locations:
        node = _data_nodes.get(node_id, {})
        if time.time() - node.get('last_heartbeat', 0)  list[int]:
    """Identify under-replicated chunks and trigger re-replication."""
    under_replicated = []
    for chunk_id, locations in _chunk_locations.items():
        if node_id in locations:
            remaining = [n for n in locations if n != node_id]
            _chunk_locations[chunk_id] = remaining
            if len(remaining) < 3:
                under_replicated.append(chunk_id)
                # In production: select a new target node and instruct source to replicate
                print(f"Chunk {chunk_id} under-replicated ({len(remaining)} replicas); re-replication queued")
    return under_replicated

Design Considerations Summary

  • Metadata server SPOF: use Active-Standby with shared edit log (QJM) or federated metadata for HA.
  • Rack awareness: essential for surviving rack-level switch failures without data loss or unavailability.
  • Re-replication: triggered by heartbeat timeout; tune timeout to avoid false positives from slow nodes.
  • Small file problem: DFS with large chunk sizes is inefficient for small files; use archiving (HAR, sequence files) or object storage for small file workloads.
  • Chunk size trade-off: larger chunks reduce metadata overhead and improve sequential throughput; smaller chunks improve parallelism for small files and reduce write amplification.

{
“@context”: “https://schema.org”,
“@type”: “FAQPage”,
“mainEntity”: [
{
“@type”: “Question”,
“name”: “How does the metadata server avoid becoming a single point of failure?”,
“acceptedAnswer”: {
“@type”: “Answer”,
“text”: “The metadata server can be made highly available with primary-standby replication and automatic failover; a distributed alternative uses consensus (Raft) across 3+ metadata nodes so any majority can serve requests.”
}
},
{
“@type”: “Question”,
“name”: “How does rack-aware replication protect against rack-level failures?”,
“acceptedAnswer”: {
“@type”: “Answer”,
“text”: “When placing 3 replicas of a chunk, the system ensures at most one replica per rack; this means a single rack power or network failure cannot cause data loss even if all machines in that rack go offline simultaneously.”
}
},
{
“@type”: “Question”,
“name”: “How is re-replication triggered after a data node failure?”,
“acceptedAnswer”: {
“@type”: “Answer”,
“text”: “The metadata server detects a data node as failed when its heartbeat has not been received for a configurable timeout; it then identifies all chunks whose replica count dropped below the target and schedules re-replication to healthy nodes.”
}
},
{
“@type”: “Question”,
“name”: “What is the small file problem and how is it mitigated?”,
“acceptedAnswer”: {
“@type”: “Answer”,
“text”: “Storing many small files (less than the chunk size) wastes storage and creates metadata overhead; mitigations include file bundling (packing many small files into one chunk with an index), or using a separate fast metadata store optimized for small objects.”
}
}
]
}

See also: Netflix Interview Guide 2026: Streaming Architecture, Recommendation Systems, and Engineering Excellence

See also: Scale AI Interview Guide 2026: Data Infrastructure, RLHF Pipelines, and ML Engineering

See also: Meta Interview Guide 2026: Facebook, Instagram, WhatsApp Engineering

See also: Databricks Interview Guide 2026: Spark Internals, Delta Lake, and Lakehouse Architecture

Scroll to Top