Distributed File System Low-Level Design: Metadata Server, Data Nodes, Replication, and Fault Tolerance

A distributed file system (DFS) provides a POSIX-like file interface over a cluster of machines, storing file data reliably across many nodes with automatic replication and fault tolerance. The canonical design — a centralised metadata server plus many data nodes — underpins HDFS (Hadoop Distributed File System) and influences modern cloud-native storage systems.

Architecture: Metadata Server and Data Nodes

Metadata server (NameNode): maintains the entire namespace in memory, namely an inode tree mapping paths to inode records and an inode-to-chunk-list mapping. Does not store file data. All namespace operations (create, rename, delete, list) pass through the metadata server. The in-memory design is critical for performance: resolving the path for a stat() call needs no disk I/O, so the metadata lookup itself completes in microseconds even on a multi-petabyte filesystem.

Data nodes: store chunk data on local disk. Handle data read and write I/O directly from/to clients. Report their chunk inventory to the metadata server periodically (block report) and send heartbeats every 3 seconds.

Client: communicates with the metadata server for namespace operations and chunk location lookups, then communicates directly with data nodes for data I/O. The metadata server is never in the data path.
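The metadata server's in-memory namespace can be sketched as a tree of dictionaries. This is a minimal sketch, assuming a simple dict-of-children inode tree; `Inode`, `Namespace`, `resolve`, and `create` are hypothetical names, not HDFS's actual NameNode classes. It shows why path resolution needs no disk I/O: each path component costs one in-memory dictionary lookup.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Inode:
    """One namespace entry: a directory (children) or a file (chunk IDs)."""
    inode_id: int
    is_dir: bool
    children: dict[str, "Inode"] = field(default_factory=dict)
    chunk_ids: list[int] = field(default_factory=list)


class Namespace:
    """Hypothetical in-memory namespace; nothing here touches disk."""

    def __init__(self) -> None:
        self._next_id = 1
        self.root = Inode(self._alloc(), is_dir=True)

    def _alloc(self) -> int:
        inode_id = self._next_id
        self._next_id += 1
        return inode_id

    def resolve(self, path: str) -> Optional[Inode]:
        """Walk the tree one path component at a time (pure dict lookups)."""
        node = self.root
        for part in filter(None, path.split("/")):
            if not node.is_dir or part not in node.children:
                return None
            node = node.children[part]
        return node

    def create(self, path: str, is_dir: bool = False) -> Inode:
        """Create a file or directory; the parent directory must already exist."""
        parent_path, _, name = path.rstrip("/").rpartition("/")
        parent = self.resolve(parent_path or "/")
        if parent is None or not parent.is_dir:
            raise FileNotFoundError(parent_path or "/")
        if name in parent.children:
            raise FileExistsError(path)
        inode = Inode(self._alloc(), is_dir=is_dir)
        parent.children[name] = inode
        return inode
```

For example, after creating /user and /user/alice as directories, `resolve("/user/alice/data.txt")` is three dictionary lookups. A real metadata server would also persist every mutation to an edit log so the tree can be rebuilt after a restart.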

File and Chunk Model

Files are split into fixed-size chunks (commonly 64 MB or 128 MB). Each chunk is assigned a globally unique chunk_id. Chunks are stored independently on data nodes — a file with 5 chunks may have its chunks spread across 15 different data nodes (3 replicas each).
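Because chunks are fixed-size, a client can turn a byte offset into a chunk index with integer arithmetic before it asks the metadata server for locations. A minimal sketch, assuming 64 MB (64 MiB) chunks; `chunk_count`, `locate`, and `chunks_for_range` are hypothetical helper names.

```python
CHUNK_SIZE = 64 * 1024 * 1024  # assumed 64 MB chunk size


def chunk_count(file_size: int) -> int:
    """Number of chunks a file occupies; the last chunk may be partially filled."""
    return -(-file_size // CHUNK_SIZE)  # ceiling division


def locate(offset: int) -> tuple[int, int]:
    """Map a byte offset within a file to (chunk_index, offset_within_chunk)."""
    return divmod(offset, CHUNK_SIZE)


def chunks_for_range(offset: int, length: int) -> range:
    """Chunk indices touched by a read of `length` bytes starting at `offset`."""
    if length <= 0:
        return range(0)
    first, _ = locate(offset)
    last, _ = locate(offset + length - 1)
    return range(first, last + 1)
```

A read that straddles a chunk boundary touches two chunks, so the client needs replica locations for both before it can start.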

Large chunk sizes are a deliberate trade-off: they reduce metadata overhead (fewer chunks per file) and amortise network round-trips across large sequential reads, at the cost of inefficiency for small files.
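To make the trade-off concrete, the sketch below counts metadata records (one per file plus one per chunk) needed to hold the same 1 TiB. It assumes every record costs the metadata server roughly the same memory and ignores replica-location entries; `metadata_records` is a hypothetical helper.

```python
MiB = 1024 ** 2
TiB = 1024 ** 4


def metadata_records(total_bytes: int, file_size: int, chunk_size: int) -> int:
    """Inode records plus chunk records for total_bytes split into equal-size files."""
    n_files = total_bytes // file_size
    chunks_per_file = -(-file_size // chunk_size)  # a partial chunk still needs a record
    return n_files * (1 + chunks_per_file)


print(metadata_records(TiB, TiB, 64 * MiB))      # one 1 TiB file, 64 MiB chunks: 16385
print(metadata_records(TiB, TiB, 128 * MiB))     # one 1 TiB file, 128 MiB chunks: 8193
print(metadata_records(TiB, 1 * MiB, 128 * MiB)) # 1,048,576 files of 1 MiB: 2097152
```

Doubling the chunk size halves the chunk records for large files, but the small-file layout needs over two million records for the same bytes, because each file costs at least one inode and one chunk no matter how large chunks are.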

Write Path: Client → Primary → Secondaries

  1. Client requests chunk allocation from metadata server: “I want to write chunk #7 of file F”.
  2. Metadata server selects 3 data nodes (rack-aware) and returns (primary, secondary_1, secondary_2).
  3. Client establishes a write pipeline: client → primary → secondary_1 → secondary_2 (TCP streaming).
  4. Data flows in a pipeline: client sends to primary, primary forwards to secondary_1, secondary_1 forwards to secondary_2 — all concurrently.
  5. Once secondary_2 has written the data, ACKs flow back up the pipeline: secondary_2 → secondary_1 → primary → client.
  6. Client receives final ACK; metadata server records (chunk_id → [primary, secondary_1, secondary_2]).
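The pipeline in steps 3 to 5 can be simulated in a few lines. This is a minimal sketch under stated assumptions: `PipelineNode` and `write_block` are hypothetical, data is streamed in packets whose size is an assumed parameter, and the simulation runs sequentially, so it shows the forwarding order and the tail-first ACK order but not the overlap of packets in flight that a real pipeline achieves.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class PipelineNode:
    """A data node in the write pipeline (hypothetical model, not a real protocol)."""
    name: str
    downstream: Optional["PipelineNode"] = None
    stored: bytearray = field(default_factory=bytearray)
    fail: bool = False  # simulate a node that cannot write

    def receive(self, packet: bytes) -> list[str]:
        """Persist the packet, forward it downstream, and return the ACK chain.

        The list is built on the way back up, so a node acknowledges only
        after every node below it has acknowledged.
        """
        if self.fail:
            raise OSError(f"{self.name} failed")
        self.stored += packet
        acks = self.downstream.receive(packet) if self.downstream is not None else []
        return acks + [self.name]


def write_block(pipeline: list[PipelineNode], data: bytes,
                packet_size: int = 64 * 1024) -> list[str]:
    """Client side: chain the nodes, stream the data, return the last ACK order."""
    for upstream, downstream in zip(pipeline, pipeline[1:]):
        upstream.downstream = downstream
    pipeline[-1].downstream = None
    acks: list[str] = []
    for start in range(0, len(data), packet_size):
        acks = pipeline[0].receive(data[start:start + packet_size])
    return acks
```

If any node raises, the write of that packet fails; a production client would typically rebuild the pipeline without the failed node and resume rather than abort the whole chunk.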

Read Path: Client → Nearest Data Node

  1. Client asks metadata server for chunk locations for chunk #7 of file F.
  2. Metadata server returns the list of data nodes holding replicas, annotated with network topology distance.
  3. Client reads from the nearest data node (same rack preferred). If that node is slow or unavailable, the client falls back to the next-nearest replica.
  4. Client verifies chunk checksum on receipt; corrupted reads trigger retry from a different replica.
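The client-side read loop above can be sketched as follows. It is a minimal sketch, assuming SHA-256 chunk checksums (matching the Python code later in this article) and distance values in the style of HDFS's network topology (0 same node, 2 same rack, 4 other rack); `read_nearest_replica` and the injected `fetch` callable are hypothetical.

```python
import hashlib
from typing import Callable, Optional

Replica = tuple[str, int]  # (data_node_name, topology distance from the client)


def read_nearest_replica(replicas: list[Replica], expected_sha256: str,
                         fetch: Callable[[str], Optional[bytes]]) -> bytes:
    """Try replicas nearest-first; skip unreachable nodes and checksum mismatches."""
    for node, _distance in sorted(replicas, key=lambda r: r[1]):
        data = fetch(node)  # None models a slow or unavailable node
        if data is None:
            continue
        if hashlib.sha256(data).hexdigest() == expected_sha256:
            return data
        # Corrupt replica: a real client would also report it to the
        # metadata server so the chunk can be re-replicated.
    raise OSError("no replica returned data matching the checksum")
```

Sorting by distance before trying replicas is what makes the read rack-local in the common case; the checksum check is what turns a silently corrupted replica into an ordinary retry.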

Heartbeat and Failure Detection

Data nodes send a heartbeat to the metadata server every 3 seconds. The heartbeat carries node-level statistics (disk usage, I/O load). If no heartbeat arrives for about 10 minutes (configurable; in HDFS the default works out to 10.5 minutes), the metadata server marks the node dead and begins re-replicating every chunk that had a replica on that node.
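Failure detection reduces to tracking the time of each node's last heartbeat. A minimal sketch, assuming the 600-second timeout used elsewhere in this article; `HeartbeatMonitor` is hypothetical, and the clock is injectable so the timeout can be tested without waiting.

```python
import time
from typing import Callable

HEARTBEAT_TIMEOUT = 600  # seconds of silence before a node is declared dead


class HeartbeatMonitor:
    """Tracks last-heartbeat times and reports nodes that cross the timeout."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self.clock = clock
        self.last_seen: dict[str, float] = {}
        self.stats: dict[str, dict] = {}
        self.dead: set[str] = set()

    def heartbeat(self, node: str, stats: dict) -> None:
        """Record a heartbeat and its node-level statistics (disk usage, I/O load)."""
        self.last_seen[node] = self.clock()
        self.stats[node] = stats
        self.dead.discard(node)  # a node that reports again is live again

    def check(self) -> list[str]:
        """Return nodes that newly crossed the timeout; the caller starts re-replication."""
        now = self.clock()
        newly_dead = [n for n, t in self.last_seen.items()
                      if n not in self.dead and now - t > HEARTBEAT_TIMEOUT]
        self.dead.update(newly_dead)
        return newly_dead
```

A monotonic clock is the default here so that wall-clock adjustments on the metadata server cannot make healthy nodes look silent.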

Block reports: data nodes send a full inventory of all chunks they hold every hour. The metadata server reconciles the reported chunks against its records, identifying missing, under-replicated, over-replicated, or corrupt replicas.
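Reconciliation is essentially a set difference between what the metadata server believes a node holds and what the node reports. A minimal sketch, assuming a replication factor of 3 and a `chunk_id -> set of node names` location map; `reconcile` is hypothetical, and corruption detection (which needs per-replica checksum or length information) is left out.

```python
REPLICATION_FACTOR = 3


def reconcile(node: str, reported: set[int],
              locations: dict[int, set[str]]) -> dict[str, list[int]]:
    """Reconcile one node's block report against the metadata server's records.

    `locations` maps chunk_id -> nodes believed to hold a replica; it is
    updated in place to match what the node actually reported.
    """
    expected = {c for c, nodes in locations.items() if node in nodes}
    missing = sorted(expected - reported)                         # recorded, not reported
    orphaned = sorted(c for c in reported if c not in locations)  # no file owns it
    for c in missing:
        locations[c].discard(node)
    for c in reported & set(locations):
        locations[c].add(node)
    return {
        "missing": missing,
        "orphaned": orphaned,
        "under_replicated": sorted(c for c, n in locations.items() if len(n) < REPLICATION_FACTOR),
        "over_replicated": sorted(c for c, n in locations.items() if len(n) > REPLICATION_FACTOR),
    }
```

Under-replicated chunks feed the re-replication queue, over-replicated ones get a replica deleted, and orphaned chunks can be garbage-collected from the reporting node.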

Rack-Aware Replication

The metadata server knows the rack topology of the cluster (typically via a rack-awareness script or static map). Default replica placement:

  • Replica 1: same node as the writer (or a local data node if client is off-cluster).
  • Replica 2: a node on a different rack from replica 1.
  • Replica 3: a different node on the same rack as replica 2.

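This layout ensures that a single rack failure (for example, a top-of-rack switch failure) does not make data unavailable: the three replicas span two racks, so whichever rack fails, at least one replica survives on the other (replica 1 if the second rack fails, replicas 2 and 3 if the first rack fails). Keeping replicas 2 and 3 on the same rack also means only one copy crosses racks during the pipelined write.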
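The default placement rules can be sketched as a small selection function. This is a minimal sketch, assuming the topology is a `node -> rack` map and ignoring the load and free-space checks a real placement policy applies; `place_replicas` and `survives_any_single_rack_failure` are hypothetical names.

```python
import random
from typing import Optional


def place_replicas(nodes: dict[str, str], writer: Optional[str],
                   rng: random.Random) -> list[str]:
    """Pick 3 targets: writer's node, a remote-rack node, then its rack-mate.

    `nodes` maps node name -> rack ID; `writer` is None for an off-cluster client.
    """
    first = writer if writer in nodes else rng.choice(sorted(nodes))
    remote = [n for n in sorted(nodes) if nodes[n] != nodes[first]]
    if not remote:
        raise ValueError("placement needs at least two racks")
    second = rng.choice(remote)
    rack_mates = [n for n in sorted(nodes) if nodes[n] == nodes[second] and n != second]
    if not rack_mates:
        raise ValueError("the second rack needs at least two nodes")
    third = rng.choice(rack_mates)
    return [first, second, third]


def survives_any_single_rack_failure(placement: list[str], nodes: dict[str, str]) -> bool:
    """True if, for every rack holding a replica, some replica lives elsewhere."""
    racks = {nodes[n] for n in placement}
    return all(any(nodes[n] != failed for n in placement) for failed in racks)
```

Passing an explicit `random.Random` keeps placements reproducible in tests while still spreading load across nodes in production.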

SQL DDL

-- File inode table
CREATE TABLE FileInode (
    id          BIGSERIAL PRIMARY KEY,
    path        TEXT          NOT NULL UNIQUE,
    size_bytes  BIGINT        NOT NULL DEFAULT 0,
    owner       VARCHAR(128)  NOT NULL,
    permissions SMALLINT      NOT NULL DEFAULT 420,  -- octal 0644 (rw-r--r--)
    created_at  TIMESTAMPTZ   NOT NULL DEFAULT now(),
    modified_at TIMESTAMPTZ   NOT NULL DEFAULT now()
);

CREATE INDEX idx_inode_path ON FileInode (path text_pattern_ops);

-- Chunk-to-inode mapping
CREATE TABLE FileChunk (
    inode_id    BIGINT       NOT NULL REFERENCES FileInode(id) ON DELETE CASCADE,
    chunk_index INTEGER      NOT NULL,
    chunk_id    BIGINT       NOT NULL UNIQUE,
    checksum    VARCHAR(64)  NOT NULL,
    PRIMARY KEY (inode_id, chunk_index)
);

-- Chunk replica locations
CREATE TABLE ChunkLocation (
    chunk_id      BIGINT       NOT NULL REFERENCES FileChunk(chunk_id) ON DELETE CASCADE,
    data_node_id  BIGINT       NOT NULL,
    rack_id       VARCHAR(64)  NOT NULL,
    PRIMARY KEY (chunk_id, data_node_id)
);

-- Data node registry
CREATE TABLE DataNode (
    id                BIGSERIAL PRIMARY KEY,
    hostname          VARCHAR(255)  NOT NULL UNIQUE,
    rack_id           VARCHAR(64)   NOT NULL,
    capacity_bytes    BIGINT        NOT NULL,
    used_bytes        BIGINT        NOT NULL DEFAULT 0,
    last_heartbeat_at TIMESTAMPTZ   NOT NULL DEFAULT now()
);

Python: Core Operations

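import hashlib
import time
from typing import Optional

CHUNK_SIZE = 64 * 1024 * 1024  # 64 MB
HEARTBEAT_TIMEOUT = 600        # 10 minutes, in seconds
REPLICATION_FACTOR = 3

# Simulated state
_inodes: dict[str, dict] = {}           # path -> inode
_chunks: dict[int, dict] = {}           # chunk_id -> {data, checksum}
_chunk_locations: dict[int, list] = {}  # chunk_id -> [data_node_id]
_data_nodes: dict[int, dict] = {}       # node_id -> {rack, last_heartbeat, used}

def _is_alive(node: dict) -> bool:
    """A node is alive if its last heartbeat falls within HEARTBEAT_TIMEOUT."""
    return time.time() - node.get('last_heartbeat', 0) < HEARTBEAT_TIMEOUT

def create_file(path: str) -> int:
    """Create a new file inode and return its ID."""
    if path in _inodes:
        raise FileExistsError(path)
    inode_id = len(_inodes) + 1
    _inodes[path] = {'id': inode_id, 'chunks': [], 'size': 0}
    print(f"Inode {inode_id} created for {path}")
    return inode_id

def write_chunk(file_id: int, chunk_index: int, data: bytes) -> int:
    """Write a chunk to 3 data nodes (rack-aware selection simplified)."""
    if len(data) > CHUNK_SIZE:
        raise ValueError(f"chunk of {len(data)} bytes exceeds CHUNK_SIZE")
    # Toy ID scheme: unique only while chunk_index < 10000
    chunk_id = (file_id * 10000) + chunk_index
    # Select 3 healthy nodes (rack-aware logic simplified to first 3 healthy nodes)
    healthy = [nid for nid, n in _data_nodes.items() if _is_alive(n)]
    if len(healthy) < REPLICATION_FACTOR:
        raise RuntimeError(f"only {len(healthy)} healthy data nodes; need {REPLICATION_FACTOR}")
    targets = healthy[:REPLICATION_FACTOR]
    _chunks[chunk_id] = {'data': data, 'checksum': hashlib.sha256(data).hexdigest()}
    _chunk_locations[chunk_id] = targets
    print(f"Chunk {chunk_id} written to nodes {targets}")
    return chunk_id

def read_chunk(file_id: int, chunk_index: int) -> Optional[bytes]:
    """Read a chunk from the nearest available data node."""
    chunk_id = (file_id * 10000) + chunk_index
    if chunk_id not in _chunks:
        return None
    # In production: sort locations by network proximity to client
    locations = _chunk_locations.get(chunk_id, [])
    for node_id in locations:
        node = _data_nodes.get(node_id, {})
        if _is_alive(node):
            data = _chunks[chunk_id]['data']
            # Verify the checksum; on mismatch, fall through to the next replica
            if hashlib.sha256(data).hexdigest() == _chunks[chunk_id]['checksum']:
                return data
    return None

def handle_node_failure(node_id: int) -> list[int]:
    """Identify under-replicated chunks and trigger re-replication."""
    under_replicated = []
    for chunk_id, locations in _chunk_locations.items():
        if node_id in locations:
            remaining = [n for n in locations if n != node_id]
            # Reassigning an existing key while iterating is safe (dict size is unchanged)
            _chunk_locations[chunk_id] = remaining
            if len(remaining) < REPLICATION_FACTOR:
                under_replicated.append(chunk_id)
                # In production: select a new target node and instruct source to replicate
                print(f"Chunk {chunk_id} under-replicated ({len(remaining)} replicas); re-replication queued")
    return under_replicated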

Design Considerations Summary

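  • Metadata server SPOF: use Active-Standby with a shared edit log (QJM, the Quorum Journal Manager) for HA. Federated metadata (several metadata servers, each owning part of the namespace) addresses namespace scale rather than availability.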
  • Rack awareness: essential for surviving rack-level switch failures without data loss or unavailability.
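  • Re-replication: triggered by heartbeat timeout; tune the timeout with care, since too short a value re-replicates chunks from nodes that are merely slow or briefly partitioned, while too long a value leaves chunks under-replicated for longer.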
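  • Small file problem: every file costs at least one inode and one chunk record in metadata-server memory regardless of its size, so millions of small files exhaust the namespace long before disk capacity; use archiving (HAR, sequence files) or object storage for small-file workloads.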
  • Chunk size trade-off: larger chunks reduce metadata overhead and improve sequential throughput; smaller chunks split a file into more pieces that can be read in parallel and reduce write amplification.
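The Active-Standby bullet rests on one rule: a namespace edit counts as durable only once a majority of journal nodes has stored it, and the standby stays warm by replaying those edits. The sketch below is a simplified model under that assumption; `JournalNode`, `ActiveMetadataServer`, and `StandbyMetadataServer` are hypothetical classes, and the real QJM protocol additionally fences old writers with epochs and recovers partially written edits, both omitted here.

```python
from dataclasses import dataclass, field


@dataclass
class JournalNode:
    """Stores (txid, operation) pairs; `up` simulates reachability."""
    up: bool = True
    edits: list[tuple[int, str]] = field(default_factory=list)

    def append(self, txid: int, op: str) -> bool:
        if not self.up:
            return False
        self.edits.append((txid, op))
        return True


class ActiveMetadataServer:
    def __init__(self, journals: list[JournalNode]) -> None:
        self.journals = journals
        self.txid = 0

    def log_edit(self, op: str) -> int:
        """Log one namespace edit; it succeeds only if a strict majority acks."""
        self.txid += 1
        acks = sum(j.append(self.txid, op) for j in self.journals)
        if acks <= len(self.journals) // 2:
            # A real active server would stop serving rather than continue.
            raise RuntimeError(f"txid {self.txid}: {acks}/{len(self.journals)} acks, no quorum")
        return self.txid


class StandbyMetadataServer:
    def __init__(self) -> None:
        self.applied_txid = 0
        self.namespace: list[str] = []  # stand-in for the in-memory inode tree

    def catch_up(self, journals: list[JournalNode]) -> None:
        """Replay edits newer than applied_txid from reachable journals, in txid order."""
        merged = {txid: op for j in journals if j.up for txid, op in j.edits}
        for txid in sorted(t for t in merged if t > self.applied_txid):
            self.namespace.append(merged[txid])
            self.applied_txid = txid
```

With three journal nodes the active tolerates one journal failure; on failover, the standby replays up to the last edit it can read and then takes over.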
