System Design Interview: Design a Distributed File System (HDFS/GFS)

Distributed file systems are foundational infrastructure, powering big data processing, cloud storage, and ML training pipelines. Understanding HDFS and Google File System (GFS) principles is essential for data engineering roles at Google, Meta, Amazon, Databricks, and Netflix. The same building blocks (chunking, replication, and separating metadata from data) also underlie consumer storage systems like Dropbox and Google Drive.

Requirements Clarification

Functional Requirements

  • Store large files (gigabytes to terabytes each)
  • Read and write files sequentially (streaming, not random access)
  • Replicate files for fault tolerance
  • Automatic recovery when nodes fail
  • Namespace operations: create, delete, rename, list directory

Non-Functional Requirements

  • Scale: petabytes of data across thousands of machines
  • Fault tolerance: automatic re-replication when nodes fail
  • High throughput for sequential reads/writes (not low latency)
  • Consistency: append-only writes; no random updates
  • Replication factor: 3x (default HDFS)
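
A quick back-of-envelope check makes the scale requirement concrete. The per-entry overhead below (~150 bytes of in-memory NameNode metadata per chunk) is an illustrative assumption, not a measured figure:

```python
# Back-of-envelope: NameNode metadata footprint at petabyte scale.
CHUNK_SIZE = 64 * 1024 * 1024   # 64MB chunks
TOTAL_DATA = 10 * 1024**5       # 10PB of raw data
BYTES_PER_CHUNK_ENTRY = 150     # assumed in-memory overhead per chunk record

num_chunks = TOTAL_DATA // CHUNK_SIZE
metadata_bytes = num_chunks * BYTES_PER_CHUNK_ENTRY

print(f"chunks: {num_chunks:,}")                       # ~168 million entries
print(f"metadata: {metadata_bytes / 1024**3:.1f} GB")  # tens of GB of RAM
```

Even with large 64MB chunks, 10PB produces hundreds of millions of chunk records, which is why chunk size and the small-files problem dominate NameNode capacity planning.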

Architecture Overview

Client
  ↓
NameNode (master)          DataNodes (workers)
- Namespace (directory tree) - Store actual file chunks
- File → chunks mapping      - Serve chunk reads/writes
- DataNode locations         - Send heartbeats to NameNode
- No file data stored        - Replicate chunks to peers

Key Design: Chunking

CHUNK_SIZE = 64 * 1024 * 1024  # 64MB (GFS default; modern HDFS defaults to 128MB)
# GFS used 64MB chunks; Colossus (GFS successor) supports smaller, variable chunks

class FileChunk:
    def __init__(self, chunk_id: str, file_id: str, chunk_index: int):
        self.chunk_id = chunk_id
        self.file_id = file_id
        self.chunk_index = chunk_index
        self.size = 0            # actual bytes in chunk
        self.checksum: str = ""  # MD5 or CRC32 for integrity
        self.version: int = 1    # incremented on mutation
        self.locations: list[str] = []  # DataNode IDs storing this chunk

class FileMetadata:
    def __init__(self, file_id: str, path: str, size: int):
        self.file_id = file_id
        self.path = path
        self.total_size = size
        self.chunk_ids: list[str] = []  # ordered list of chunk IDs
        self.replication_factor = 3
        self.created_at: float = 0
        self.modified_at: float = 0
        self.permissions: int = 0o644
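
To make the chunking model concrete, here is a small standalone sketch (`chunk_layout` is a hypothetical helper, not part of the classes above) that maps a file size onto ordered chunk indices and sizes, the same shape `FileMetadata.chunk_ids` tracks:

```python
import math

CHUNK_SIZE = 64 * 1024 * 1024  # 64MB

def chunk_layout(file_size: int) -> list[tuple[int, int]]:
    """Return (chunk_index, chunk_bytes) pairs covering a file of file_size bytes."""
    n = max(1, math.ceil(file_size / CHUNK_SIZE))
    layout = []
    for i in range(n):
        start = i * CHUNK_SIZE
        layout.append((i, min(CHUNK_SIZE, file_size - start)))
    return layout

# A 200MB file spans four chunks; only the last is partially filled (8MB)
print(chunk_layout(200 * 1024 * 1024))
```

Note that only the final chunk is short: HDFS does not pad partial chunks on disk, so the cost of small files is metadata overhead, not wasted storage.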

NameNode: Master Coordinator

import threading
from collections import defaultdict
import time

class NameNode:
    """
    Single master (HDFS) — SPOF addressed by:
    - Secondary NameNode: checkpoints namespace periodically
    - HDFS HA: Active/Standby NameNodes with shared edit log (ZooKeeper)
    - Google Colossus: replaced with distributed Bigtable-backed namespace
    """

    def __init__(self, replication_factor: int = 3):
        self.replication_factor = replication_factor

        # In-memory namespace (persisted to disk as edit log + fsimage)
        self.namespace: dict[str, FileMetadata] = {}  # path -> FileMetadata
        self.chunk_map: dict[str, FileChunk] = {}      # chunk_id -> FileChunk
        self.datanode_chunks: dict[str, set] = defaultdict(set)  # datanode_id -> chunk_ids
        self.datanode_info: dict[str, dict] = {}       # datanode_id -> {capacity, free, last_heartbeat}

        self.lock = threading.RLock()

    def create_file(self, path: str, replication: int | None = None) -> FileMetadata:
        """Create a new file entry (no data yet)"""
        with self.lock:
            if path in self.namespace:
                raise FileExistsError(f"{path} already exists")

            file_id = generate_id()
            metadata = FileMetadata(file_id, path, 0)
            metadata.replication_factor = replication or self.replication_factor
            metadata.created_at = time.time()
            self.namespace[path] = metadata
            return metadata

    def allocate_chunk(self, file_path: str) -> tuple[str, list[str]]:
        """
        Allocate a new chunk for file, return (chunk_id, [datanode_ids]).
        Client will write directly to DataNodes, not through NameNode.
        """
        with self.lock:
            metadata = self.namespace.get(file_path)
            if not metadata:
                raise FileNotFoundError(f"{file_path} not found")

            chunk_id = generate_chunk_id()
            chunk_index = len(metadata.chunk_ids)

            # Select DataNodes for this chunk (placement policy)
            datanodes = self._select_datanodes(metadata.replication_factor)
            chunk = FileChunk(chunk_id, metadata.file_id, chunk_index)
            chunk.locations = datanodes

            self.chunk_map[chunk_id] = chunk
            metadata.chunk_ids.append(chunk_id)

            return chunk_id, datanodes

    def _select_datanodes(self, count: int) -> list[str]:
        """
        Rack-aware placement policy (HDFS default):
        - 1st replica: on same rack as writer (minimize cross-rack traffic)
        - 2nd replica: different rack (fault isolation)
        - 3rd replica: same rack as 2nd but different node

        Simplified here: select live nodes with the most free space.
        """
        available = [
            (info['free'], node_id)
            for node_id, info in self.datanode_info.items()
            if time.time() - info['last_heartbeat'] < 30  # heard from in last 30s
        ]
        available.sort(reverse=True)  # most free space first
        return [node_id for _, node_id in available[:count]]

    def get_chunk_locations(self, file_path: str) -> list[tuple[str, list[str]]]:
        """Return chunk_id, [datanode_ids] for each chunk of file (for reading)"""
        with self.lock:
            metadata = self.namespace.get(file_path)
            if not metadata:
                raise FileNotFoundError(f"{file_path} not found")

            return [
                (chunk_id, self.chunk_map[chunk_id].locations)
                for chunk_id in metadata.chunk_ids
            ]

    def process_heartbeat(self, datanode_id: str, capacity: int,
                          free: int, chunk_reports: list[str]):
        """
        DataNodes send heartbeat every 3 seconds.
        NameNode tracks live nodes and chunk inventory.
        Returns commands to execute (e.g., delete orphan chunks, re-replicate).
        """
        with self.lock:
            self.datanode_info[datanode_id] = {
                'capacity': capacity,
                'free': free,
                'last_heartbeat': time.time(),
            }
            self.datanode_chunks[datanode_id] = set(chunk_reports)

        return self._check_replication()

    def _check_replication(self) -> list[dict]:
        """Find under-replicated chunks and issue re-replication commands"""
        commands = []
        with self.lock:
            for chunk_id, chunk in self.chunk_map.items():
                alive_locations = [
                    loc for loc in chunk.locations
                    if time.time() - self.datanode_info.get(loc, {}).get('last_heartbeat', 0) < 30
                ]
                if len(alive_locations) < self.replication_factor:
                    # Command a DataNode to copy this chunk to a new DataNode
                    target = self._select_datanodes(1)
                    if target and alive_locations:
                        commands.append({
                            'type': 'REPLICATE',
                            'chunk_id': chunk_id,
                            'source': alive_locations[0],
                            'target': target[0],
                        })
        return commands
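
The under-replication check can be exercised on its own. The sketch below mirrors the logic of `_check_replication` without the class or networking (names like `replication_commands` are hypothetical):

```python
import time

REPLICATION_FACTOR = 3
DEAD_AFTER = 30  # seconds without a heartbeat, matching the class above

def replication_commands(chunk_locations: dict[str, list[str]],
                         last_heartbeat: dict[str, float],
                         now: float) -> list[dict]:
    """Find chunks whose live replica count fell below the target
    and emit REPLICATE commands (copy from a live replica to a new node)."""
    live = {n for n, t in last_heartbeat.items() if now - t < DEAD_AFTER}
    commands = []
    for chunk_id, locations in chunk_locations.items():
        alive = [n for n in locations if n in live]
        if alive and len(alive) < REPLICATION_FACTOR:
            # Pick any live node not already holding the chunk
            target = next((n for n in sorted(live) if n not in alive), None)
            if target:
                commands.append({'type': 'REPLICATE', 'chunk_id': chunk_id,
                                 'source': alive[0], 'target': target})
    return commands

now = time.time()
heartbeats = {'dn1': now, 'dn2': now, 'dn3': now - 120, 'dn4': now}  # dn3 is dead
chunks = {'c1': ['dn1', 'dn2', 'dn3']}  # one replica was on the dead node
print(replication_commands(chunks, heartbeats, now))
```

With `dn3` silent for two minutes, chunk `c1` has only two live replicas, so a single REPLICATE command is emitted to copy it onto `dn4`.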

DataNode: Storage Worker

import os
import hashlib

class DataNode:
    """
    Stores chunks as files on local disk.
    Serves read/write requests from clients.
    Replicates to other DataNodes in the pipeline.
    """

    def __init__(self, node_id: str, storage_dir: str, namenode_address: str):
        self.node_id = node_id
        self.storage_dir = storage_dir
        self.namenode = NameNodeClient(namenode_address)
        os.makedirs(storage_dir, exist_ok=True)

    def write_chunk(self, chunk_id: str, data: bytes,
                    pipeline: list[str]) -> bool:
        """
        Write chunk data.
        Pipeline: [next_node, ...] — forward toward the pipeline tail.
        Ensures write is replicated before acknowledging to client.
        """
        # Write data plus a checksum sidecar to local disk
        chunk_path = os.path.join(self.storage_dir, chunk_id)
        checksum = hashlib.md5(data).hexdigest()

        with open(chunk_path, 'wb') as f:
            f.write(data)
        with open(chunk_path + '.md5', 'w') as f:
            f.write(checksum)

        # Forward to next DataNode in pipeline (chain replication)
        if pipeline:
            next_node = pipeline[0]
            remaining_pipeline = pipeline[1:]
            datanode_client = DataNodeClient(next_node)
            if not datanode_client.write_chunk(chunk_id, data, remaining_pipeline):
                os.remove(chunk_path)  # Rollback on pipeline failure
                os.remove(chunk_path + '.md5')
                return False

        # Notify NameNode chunk was written
        self.namenode.report_chunk(self.node_id, chunk_id, len(data), checksum)
        return True

    def read_chunk(self, chunk_id: str, offset: int = 0,
                   length: int | None = None) -> bytes:
        """Read chunk data, verifying the stored checksum on full reads"""
        chunk_path = os.path.join(self.storage_dir, chunk_id)
        if not os.path.exists(chunk_path):
            raise ChunkNotFoundError(f"Chunk {chunk_id} not found")

        with open(chunk_path, 'rb') as f:
            if offset:
                f.seek(offset)
            data = f.read(length) if length is not None else f.read()

        # Integrity check on full-chunk reads: on mismatch, raise so the
        # client falls back to another replica (real HDFS checksums ranges)
        if offset == 0 and length is None:
            with open(chunk_path + '.md5') as f:
                expected = f.read().strip()
            if hashlib.md5(data).hexdigest() != expected:
                raise IOError(f"Checksum mismatch for chunk {chunk_id}")

        return data

    def send_heartbeat(self):
        """Send heartbeat to NameNode every 3 seconds"""
        stat = os.statvfs(self.storage_dir)
        capacity = stat.f_blocks * stat.f_frsize
        free = stat.f_bavail * stat.f_frsize
        chunk_ids = [f for f in os.listdir(self.storage_dir)
                     if not f.endswith('.md5')]  # exclude checksum sidecars

        commands = self.namenode.heartbeat(
            self.node_id, capacity, free, chunk_ids
        )
        self._execute_commands(commands)

    def _execute_commands(self, commands: list[dict]):
        """Execute NameNode commands: replicate, delete orphan chunks"""
        for cmd in commands:
            if cmd['type'] == 'REPLICATE':
                data = self.read_chunk(cmd['chunk_id'])
                target_client = DataNodeClient(cmd['target'])
                target_client.write_chunk(cmd['chunk_id'], data, [])
            elif cmd['type'] == 'DELETE':
                chunk_path = os.path.join(self.storage_dir, cmd['chunk_id'])
                if os.path.exists(chunk_path):
                    os.remove(chunk_path)
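
Checksum-based corruption detection can be demonstrated standalone. This is a sketch (helper names are hypothetical; real HDFS DataNodes keep CRC32 checksums in separate metadata files and verify ranges on read):

```python
import hashlib
import os
import tempfile

def store_chunk(directory: str, chunk_id: str, data: bytes) -> None:
    """Write chunk bytes plus an MD5 sidecar file."""
    with open(os.path.join(directory, chunk_id), 'wb') as f:
        f.write(data)
    with open(os.path.join(directory, chunk_id + '.md5'), 'w') as f:
        f.write(hashlib.md5(data).hexdigest())

def load_chunk(directory: str, chunk_id: str) -> bytes:
    """Read chunk bytes and verify them against the stored checksum."""
    with open(os.path.join(directory, chunk_id), 'rb') as f:
        data = f.read()
    with open(os.path.join(directory, chunk_id + '.md5')) as f:
        expected = f.read().strip()
    if hashlib.md5(data).hexdigest() != expected:
        raise IOError(f"Checksum mismatch for chunk {chunk_id}")
    return data

with tempfile.TemporaryDirectory() as d:
    store_chunk(d, 'chunk-001', b'hello dfs')
    assert load_chunk(d, 'chunk-001') == b'hello dfs'

    # Simulate silent bit rot: corrupt the data file but not the checksum
    with open(os.path.join(d, 'chunk-001'), 'wb') as f:
        f.write(b'hellX dfs')
    try:
        load_chunk(d, 'chunk-001')
        print("corruption missed")
    except IOError:
        print("corruption detected")
```

The key property: corruption is caught at read time, so a client that gets a checksum error simply retries against another replica, and the bad copy can be re-replicated from a good one.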

Client API

class DFSClient:
    """
    Client library for reading and writing files.
    Communicates with NameNode for metadata, DataNodes for data.
    """

    def __init__(self, namenode_address: str):
        self.namenode = NameNodeClient(namenode_address)
        self.chunk_size = 64 * 1024 * 1024  # 64MB

    def write(self, remote_path: str, data: bytes):
        """Write file to DFS: split into chunks, pipeline each to DataNodes"""
        self.namenode.create_file(remote_path)
        offset = 0

        while offset < len(data):
            chunk_data = data[offset:offset + self.chunk_size]
            chunk_id, datanodes = self.namenode.allocate_chunk(remote_path)

            # Write to the first DataNode; it forwards down the pipeline
            client = DataNodeClient(datanodes[0])
            if not client.write_chunk(chunk_id, chunk_data, datanodes[1:]):
                raise IOError(f"Failed to write chunk {chunk_id}")

            offset += self.chunk_size

    def read(self, remote_path: str) -> bytes:
        """Read entire file from DFS"""
        chunk_locations = self.namenode.get_chunk_locations(remote_path)
        result = []

        for chunk_id, datanodes in chunk_locations:
            # Try each DataNode; use first available (closest)
            for datanode_addr in datanodes:
                try:
                    client = DataNodeClient(datanode_addr)
                    chunk_data = client.read_chunk(chunk_id)
                    result.append(chunk_data)
                    break
                except Exception:
                    continue  # Try next replica
            else:
                raise Exception(f"All replicas unavailable for chunk {chunk_id}")

        return b''.join(result)
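
The split-and-reassemble round trip can be sanity-checked locally. This sketch extracts just the chunking arithmetic, with a tiny chunk size standing in for 64MB:

```python
CHUNK_SIZE = 8  # tiny stand-in for 64MB, to keep the demo readable

def split_into_chunks(data: bytes, chunk_size: int = CHUNK_SIZE) -> list[bytes]:
    """The write path's chunking loop, extracted: fixed-size slices in order."""
    return [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]

def reassemble(chunks: list[bytes]) -> bytes:
    """The read path: concatenate chunks in index order."""
    return b''.join(chunks)

payload = b'the quick brown fox jumps over the lazy dog'
chunks = split_into_chunks(payload)
print(len(chunks), [len(c) for c in chunks])  # 6 chunks, only the last is short
assert reassemble(chunks) == payload
```

Because chunk order is recorded in `FileMetadata.chunk_ids`, reassembly is just ordered concatenation; no per-chunk framing or offsets need to be stored in the data itself.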

Key Design Decisions

  • Large chunk size (64MB in GFS, 128MB default in modern HDFS): Large chunks reduce NameNode memory (fewer entries) and reduce network round-trips for sequential reads, at the cost of metadata overhead for small files. HDFS small files problem: 1M small files = 1M NameNode entries (memory bottleneck).
  • Single NameNode: Simplifies consistency — all metadata operations serialized. SPOF addressed with HDFS HA (Active/Standby + shared edit log via JournalNodes/ZooKeeper). Google Colossus distributed this across multiple masters.
  • Chain replication: Client writes to DataNode 1, which forwards to DN2, which forwards to DN3. Ack flows back. vs Fan-out (client writes to all): chain is slower but uses less client bandwidth.
  • Checksums: Each chunk stored with checksum. On read, DataNode verifies — if mismatch, returns error, client tries next replica. Silent data corruption detected automatically.
  • Heartbeat-based failure detection: DataNodes heartbeat every 3 seconds; a node silent for ~10 minutes (HDFS default; the sketch above uses 30 seconds for brevity) is marked dead, and the NameNode triggers re-replication of every chunk that lived on it.
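
The rack-aware placement described above can be sketched as a standalone helper (names are hypothetical; real HDFS resolves node-to-rack mapping via a pluggable topology script):

```python
import random

def place_replicas(nodes_by_rack: dict[str, list[str]],
                   writer_rack: str) -> list[str]:
    """HDFS-style placement for replication factor 3:
    1st replica on the writer's rack, 2nd on a different rack,
    3rd on the same rack as the 2nd but on a different node."""
    first = random.choice(nodes_by_rack[writer_rack])

    other_racks = [r for r in nodes_by_rack if r != writer_rack]
    remote_rack = random.choice(other_racks)
    second = random.choice(nodes_by_rack[remote_rack])

    remaining = [n for n in nodes_by_rack[remote_rack] if n != second]
    third = random.choice(remaining)
    return [first, second, third]

cluster = {'rack1': ['dn1', 'dn2'], 'rack2': ['dn3', 'dn4'], 'rack3': ['dn5', 'dn6']}
replicas = place_replicas(cluster, writer_rack='rack1')
print(replicas)  # e.g. ['dn2', 'dn4', 'dn3']: one local, two sharing a remote rack
```

Only one cross-rack transfer happens during the write (from the writer's rack to the remote rack), yet a full rack failure can never destroy all three copies.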

HDFS vs GFS vs S3

Property         HDFS                      GFS/Colossus                    S3
Chunk size       128MB (default)           64MB → variable                 N/A (object store)
Consistency      Strong (single master)    Relaxed (concurrent appends)    Eventually consistent → strong (2020)
Mutation model   Append-only               Random write + record append    Put/replace whole object
Namespace        Hierarchical              Hierarchical                    Flat (bucket/key)
Use case         MapReduce/Spark           Google internal workloads       General object storage


