A distributed file system (DFS) provides a POSIX-like file interface over a cluster of machines, storing file data reliably across many nodes with automatic replication and fault tolerance. The canonical design — a centralised metadata server plus many data nodes — underpins HDFS (Hadoop Distributed File System) and influences modern cloud-native storage systems.
Architecture: Metadata Server and Data Nodes
Metadata server (NameNode): maintains the entire namespace in memory, as an inode tree mapping paths to inode records plus an inode-to-chunk-list mapping. It does not store file data. All namespace operations (create, rename, delete, list) pass through the metadata server. The in-memory design is critical for performance: resolving a stat() call on a multi-petabyte filesystem takes microseconds on the server because no disk I/O is needed, so end-to-end latency is dominated by the network round trip.
Data nodes: store chunk data on local disk. Handle data read and write I/O directly from/to clients. Report their chunk inventory to the metadata server periodically (block report) and send heartbeats every 3 seconds.
Client: communicates with the metadata server for namespace operations and chunk location lookups, then communicates directly with data nodes for data I/O. The metadata server is never in the data path.
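To make the in-memory namespace concrete, here is a minimal sketch of an inode tree that maps paths to inode records, with each file inode carrying its chunk list. The class names `Inode` and `Namespace` and their methods are hypothetical, chosen for illustration; a real metadata server adds permissions, locking, and an edit log.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Inode:
    """One namespace entry: directories have children, files have chunk IDs."""
    name: str
    is_dir: bool
    children: dict[str, "Inode"] = field(default_factory=dict)
    chunk_ids: list[int] = field(default_factory=list)


class Namespace:
    """Hypothetical in-memory inode tree held by the metadata server."""

    def __init__(self) -> None:
        self.root = Inode(name="", is_dir=True)

    def stat(self, path: str) -> Optional[Inode]:
        """Resolve a path by walking the tree; pure memory access, no disk I/O."""
        node = self.root
        for part in (p for p in path.split("/") if p):
            if not node.is_dir or part not in node.children:
                return None
            node = node.children[part]
        return node

    def create(self, path: str) -> Inode:
        """Create a file inode, making parent directories as needed."""
        parts = [p for p in path.split("/") if p]
        if not parts:
            raise ValueError("cannot create the root")
        node = self.root
        for part in parts[:-1]:
            node = node.children.setdefault(part, Inode(part, is_dir=True))
            if not node.is_dir:
                raise NotADirectoryError(part)
        if parts[-1] in node.children:
            raise FileExistsError(path)
        inode = Inode(parts[-1], is_dir=False)
        node.children[parts[-1]] = inode
        return inode
```

A client-facing lookup then amounts to `ns.stat("/data/logs/a.log").chunk_ids`, after which the client contacts data nodes directly for the bytes.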
File and Chunk Model
Files are split into fixed-size chunks (commonly 64 MB or 128 MB). Each chunk is assigned a globally unique chunk_id. Chunks are stored independently on data nodes, so a file with 5 chunks and 3 replicas per chunk may be spread across up to 15 different data nodes.
Large chunk sizes are a deliberate trade-off: they reduce metadata overhead (fewer chunks per file) and amortise network round-trips across large sequential reads, at the cost of inefficiency for small files.
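The chunk arithmetic is simple enough to sketch. The helper names `chunk_count` and `locate` below are hypothetical; the 64 MB size comes from the text.

```python
CHUNK_SIZE = 64 * 1024 * 1024  # 64 MB, the smaller of the two sizes mentioned above


def chunk_count(file_size: int, chunk_size: int = CHUNK_SIZE) -> int:
    """Number of chunks needed to hold file_size bytes (ceiling division)."""
    return (file_size + chunk_size - 1) // chunk_size


def locate(offset: int, chunk_size: int = CHUNK_SIZE) -> tuple[int, int]:
    """Map a byte offset in a file to (chunk_index, offset_within_chunk)."""
    return divmod(offset, chunk_size)
```

For a 1 GiB file, `chunk_count` returns 16 with 64 MB chunks but 262144 with 4 KB chunks, which is the metadata saving the trade-off refers to. A read at byte offset 200 MiB lands in chunk index 3, 8 MiB into that chunk.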
Write Path: Client → Primary → Secondaries
- Client requests chunk allocation from metadata server: “I want to write chunk #7 of file F”.
- Metadata server selects 3 data nodes (rack-aware) and returns (primary, secondary_1, secondary_2).
- Client establishes a write pipeline: client → primary → secondary_1 → secondary_2 (TCP streaming).
- Data flows in a pipeline: client sends to primary, primary forwards to secondary_1, secondary_1 forwards to secondary_2 — all concurrently.
- Once secondary_2 has written the data, ACKs flow back up the pipeline: secondary_2 → secondary_1 → primary → client.
- Client receives final ACK; metadata server records (chunk_id → [primary, secondary_1, secondary_2]).
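The pipeline and its reverse ACK flow can be sketched in-process. This is a simplification under stated assumptions: `PipelineNode` and `pipeline_write` are hypothetical names, and the forwarding here is a sequential function call, whereas a real pipeline streams packets over TCP so that all hops transfer concurrently.

```python
class PipelineNode:
    """Hypothetical data node that stores a chunk and forwards it downstream."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.chunks: dict[int, bytes] = {}

    def write(self, chunk_id: int, data: bytes,
              downstream: list["PipelineNode"]) -> list[str]:
        """Store locally, forward to the next node, and return the ACK chain.

        The returned list is in ACK order: the last node in the pipeline
        acknowledges first, and this node's name is appended last.
        """
        self.chunks[chunk_id] = data
        acks: list[str] = []
        if downstream:
            acks = downstream[0].write(chunk_id, data, downstream[1:])
        return acks + [self.name]


def pipeline_write(chunk_id: int, data: bytes,
                   pipeline: list[PipelineNode]) -> bool:
    """Client side: push data to the primary and check every node ACKed."""
    acks = pipeline[0].write(chunk_id, data, pipeline[1:])
    return acks == [node.name for node in reversed(pipeline)]
```

With a pipeline of `primary`, `secondary_1`, `secondary_2`, the ACK chain comes back as `secondary_2`, `secondary_1`, `primary`, matching the order described in the steps above.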
Read Path: Client → Nearest Data Node
- Client asks metadata server for chunk locations for chunk #7 of file F.
- Metadata server returns the list of data nodes holding replicas, annotated with network topology distance.
- Client reads from the nearest data node (same rack preferred). If that node is slow or unavailable, client retries the next replica.
- Client verifies chunk checksum on receipt; corrupted reads trigger retry from a different replica.
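A sketch of the client-side read loop follows: try replicas nearest-first, skip nodes that do not answer, and reject data whose checksum does not match. The function name `read_with_fallback` and the `fetch` callback are hypothetical stand-ins for a real transport, and SHA-256 is assumed as the checksum.

```python
import hashlib
from typing import Callable, Optional


def read_with_fallback(
    replicas: list[tuple[int, str]],
    fetch: Callable[[str], Optional[bytes]],
    expected_sha256: str,
) -> Optional[bytes]:
    """Return the chunk from the nearest replica that passes verification.

    replicas: (distance, node_name) pairs as returned by the metadata server.
    fetch:    hypothetical transport callback; returns None if the node is down.
    """
    for _distance, node in sorted(replicas):
        data = fetch(node)
        if data is None:
            continue  # node slow or unavailable: try the next replica
        if hashlib.sha256(data).hexdigest() == expected_sha256:
            return data
        # Checksum mismatch: treat as corrupt and fall through to the next replica.
    return None
```

Returning `None` only after every replica has been tried keeps a single bad disk or dead node from failing the read.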
Heartbeat and Failure Detection
Data nodes send a heartbeat to the metadata server every 3 seconds. The heartbeat carries node-level statistics (disk usage, I/O load). If no heartbeat is received for 10 minutes (configurable), the metadata server marks the node dead and begins re-replication of all chunks that were on that node.
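The timeout logic can be sketched as a small monitor on the metadata server. `HeartbeatMonitor` is a hypothetical name; the 3-second interval and 10-minute timeout come from the text, and the current time is passed in explicitly so the behaviour is easy to test.

```python
HEARTBEAT_INTERVAL = 3   # seconds between heartbeats, from the text
DEAD_AFTER = 10 * 60     # seconds of silence before a node is declared dead

# With these values a node must miss 200 consecutive heartbeats to be marked dead.
MISSED_BEFORE_DEAD = DEAD_AFTER // HEARTBEAT_INTERVAL


class HeartbeatMonitor:
    """Hypothetical failure detector run by the metadata server."""

    def __init__(self, timeout: float = DEAD_AFTER) -> None:
        self.timeout = timeout
        self.last_seen: dict[int, float] = {}

    def heartbeat(self, node_id: int, now: float) -> None:
        """Record that node_id was alive at time now."""
        self.last_seen[node_id] = now

    def dead_nodes(self, now: float) -> list[int]:
        """Nodes whose last heartbeat is older than the timeout."""
        return sorted(n for n, t in self.last_seen.items()
                      if now - t > self.timeout)
```

The long timeout relative to the heartbeat interval is what keeps a briefly overloaded node from triggering an expensive re-replication.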
Block reports: data nodes send a full inventory of all chunks they hold every hour. The metadata server reconciles reported chunks against its records, identifying over-replicated or corrupt chunks.
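Reconciliation is essentially a set comparison between what a node reports and what the metadata server believes. The sketch below is one possible shape, with a hypothetical function name `reconcile`; it detects missing, orphaned, and over-replicated chunks, and leaves corruption detection (which relies on checksums) out of scope.

```python
def reconcile(
    node_id: int,
    reported: set[int],
    locations: dict[int, set[int]],
    target_replicas: int = 3,
) -> dict[str, set[int]]:
    """Compare one node's block report against the metadata server's records.

    locations maps chunk_id -> set of node IDs believed to hold a replica,
    and is updated in place to reflect the report.
    """
    expected = {c for c, nodes in locations.items() if node_id in nodes}
    missing = expected - reported                     # recorded, but the node lost it
    orphaned = {c for c in reported if c not in locations}  # unknown to the namespace
    known = reported - orphaned
    for c in missing:
        locations[c].discard(node_id)
    for c in known:
        locations[c].add(node_id)
    over = {c for c in known if len(locations[c]) > target_replicas}
    return {"missing": missing, "orphaned": orphaned, "over_replicated": over}
```

Orphaned chunks are typically candidates for deletion on the data node, while missing ones may leave a chunk under-replicated and feed the re-replication queue.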
Rack-Aware Replication
The metadata server knows the rack topology of the cluster (typically via a rack-awareness script or static map). Default replica placement:
- Replica 1: same node as the writer (or a randomly chosen data node if the client runs outside the cluster).
- Replica 2: a node on a different rack from replica 1.
- Replica 3: a different node on the same rack as replica 2.
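This layout ensures that a single rack failure (for example, a top-of-rack switch failure) does not cause data unavailability: replica 1 sits on one rack and replicas 2 and 3 sit on another, so whichever rack fails, at least one replica remains reachable. It also limits cross-rack write traffic to a single hop.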
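The placement policy can be sketched as a small function. `place_replicas` is a hypothetical name, and picking a random node for replica 1 when the writer is not a data node is an assumption of this sketch.

```python
import random
from typing import Optional


def place_replicas(
    nodes: dict[str, str],
    writer: Optional[str] = None,
    rng: Optional[random.Random] = None,
) -> list[str]:
    """Pick three nodes following the default policy described above.

    nodes maps node name -> rack ID. writer is the client's own node if it
    runs on a data node; otherwise a random node stands in for replica 1.
    """
    rng = rng or random.Random()
    first = writer if writer in nodes else rng.choice(sorted(nodes))
    # Replica 2: any node on a different rack from replica 1.
    remote = [n for n in sorted(nodes) if nodes[n] != nodes[first]]
    if not remote:
        raise ValueError("need at least two racks")
    second = rng.choice(remote)
    # Replica 3: a different node on the same rack as replica 2.
    same_rack = [n for n in remote if nodes[n] == nodes[second] and n != second]
    if not same_rack:
        raise ValueError("remote rack needs at least two nodes")
    third = rng.choice(same_rack)
    return [first, second, third]
```

The result always spans exactly two racks, so losing either rack leaves at least one replica reachable.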
SQL DDL
-- File inode table
CREATE TABLE FileInode (
id BIGSERIAL PRIMARY KEY,
path TEXT NOT NULL UNIQUE,
size_bytes BIGINT NOT NULL DEFAULT 0,
owner VARCHAR(128) NOT NULL,
permissions SMALLINT NOT NULL DEFAULT 420, -- 0644 octal (rw-r--r--); a literal 644 would be decimal
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
modified_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX idx_inode_path ON FileInode (path text_pattern_ops);
-- Chunk-to-inode mapping
CREATE TABLE FileChunk (
inode_id BIGINT NOT NULL REFERENCES FileInode(id) ON DELETE CASCADE,
chunk_index INTEGER NOT NULL,
chunk_id BIGINT NOT NULL UNIQUE,
checksum VARCHAR(64) NOT NULL,
PRIMARY KEY (inode_id, chunk_index)
);
-- Chunk replica locations
CREATE TABLE ChunkLocation (
chunk_id BIGINT NOT NULL REFERENCES FileChunk(chunk_id),
data_node_id BIGINT NOT NULL,
rack_id VARCHAR(64) NOT NULL,
PRIMARY KEY (chunk_id, data_node_id)
);
-- Data node registry
CREATE TABLE DataNode (
id BIGSERIAL PRIMARY KEY,
hostname VARCHAR(255) NOT NULL UNIQUE,
rack_id VARCHAR(64) NOT NULL,
capacity_bytes BIGINT NOT NULL,
used_bytes BIGINT NOT NULL DEFAULT 0,
last_heartbeat_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
Python: Core Operations
import hashlib
import time
from typing import Optional

CHUNK_SIZE = 64 * 1024 * 1024  # 64 MB
HEARTBEAT_TIMEOUT = 600        # 10 minutes, in seconds

# Simulated state
_inodes: dict[str, dict] = {}           # path -> inode
_chunks: dict[int, dict] = {}           # chunk_id -> {data, checksum}
_chunk_locations: dict[int, list] = {}  # chunk_id -> [data_node_id]
_data_nodes: dict[int, dict] = {}       # node_id -> {rack, last_heartbeat, used}


def create_file(path: str) -> int:
    """Create a new file inode and return its ID."""
    inode_id = len(_inodes) + 1
    _inodes[path] = {'id': inode_id, 'chunks': [], 'size': 0}
    print(f"Inode {inode_id} created for {path}")
    return inode_id


def write_chunk(file_id: int, chunk_index: int, data: bytes) -> int:
    """Write a chunk to 3 data nodes (rack-aware selection simplified)."""
    # Toy ID scheme: assumes fewer than 10,000 chunks per file.
    chunk_id = (file_id * 10000) + chunk_index
    checksum = hashlib.sha256(data).hexdigest()
    # Select 3 healthy nodes (rack-aware logic simplified to first 3 healthy nodes)
    healthy = [nid for nid, n in _data_nodes.items()
               if time.time() - n['last_heartbeat'] < HEARTBEAT_TIMEOUT]
    if len(healthy) < 3:
        raise RuntimeError(f"Need 3 healthy data nodes, found {len(healthy)}")
    _chunks[chunk_id] = {'data': data, 'checksum': checksum}
    _chunk_locations[chunk_id] = healthy[:3]
    print(f"Chunk {chunk_id} written to nodes {healthy[:3]}")
    return chunk_id


def read_chunk(file_id: int, chunk_index: int) -> Optional[bytes]:
    """Read a chunk from the nearest available data node."""
    chunk_id = (file_id * 10000) + chunk_index
    if chunk_id not in _chunks:
        return None
    # In production: sort locations by network proximity to client
    locations = _chunk_locations.get(chunk_id, [])
    for node_id in locations:
        node = _data_nodes.get(node_id, {})
        if time.time() - node.get('last_heartbeat', 0) < HEARTBEAT_TIMEOUT:
            chunk = _chunks[chunk_id]
            # Verify the checksum before handing data back to the caller
            if hashlib.sha256(chunk['data']).hexdigest() == chunk['checksum']:
                return chunk['data']
    return None  # no healthy replica returned valid data


def handle_node_failure(node_id: int) -> list[int]:
    """Identify under-replicated chunks and trigger re-replication."""
    under_replicated = []
    for chunk_id, locations in _chunk_locations.items():
        if node_id in locations:
            remaining = [n for n in locations if n != node_id]
            _chunk_locations[chunk_id] = remaining
            if len(remaining) < 3:
                under_replicated.append(chunk_id)
                # In production: select a new target node and instruct source to replicate
                print(f"Chunk {chunk_id} under-replicated ({len(remaining)} replicas); re-replication queued")
    return under_replicated
Design Considerations Summary
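- Metadata server SPOF: use Active-Standby with a shared edit log (QJM) for HA. Federating the namespace across several metadata servers adds scale and limits the blast radius of a failure, but does not by itself provide failover.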
- Rack awareness: essential for surviving rack-level switch failures without data loss or unavailability.
- Re-replication: triggered by heartbeat timeout; tune timeout to avoid false positives from slow nodes.
- Small file problem: DFS with large chunk sizes is inefficient for small files; use archiving (HAR, sequence files) or object storage for small file workloads.
- Chunk size trade-off: larger chunks reduce metadata overhead and improve sequential throughput; smaller chunks improve parallelism for small files and reduce write amplification.
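The small file problem can be quantified with a back-of-the-envelope estimate of metadata-server memory. The figure of roughly 150 bytes per namespace object is a commonly quoted rule of thumb for HDFS and is treated here as an assumption, not a measured value; `metadata_bytes` is a hypothetical helper.

```python
BYTES_PER_OBJECT = 150  # assumed rule-of-thumb cost per inode or chunk record


def metadata_bytes(num_files: int, file_size: int, chunk_size: int) -> int:
    """Rough metadata-server heap for num_files files of file_size bytes each."""
    # Ceiling division; even a tiny file occupies one chunk record.
    chunks_per_file = max(1, -(-file_size // chunk_size))
    objects = num_files * (1 + chunks_per_file)  # one inode plus its chunk records
    return objects * BYTES_PER_OBJECT
```

Under these assumptions, 1 TiB stored as 8192 files of 128 MiB needs about 2.3 MiB of metadata, while the same 1 TiB stored as 8,388,608 files of 128 KiB needs about 2.3 GiB: a 1024-fold increase for identical data volume.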