Low Level Design: Batch Processing Framework

What Is a Batch Processing Framework?

A batch processing framework executes large-scale data transformation jobs over finite datasets on a scheduled or on-demand basis. Unlike stream processing, batch jobs run to completion and produce a discrete output. Canonical workloads include nightly ETL pipelines, report generation, ML feature engineering, log aggregation, and database export/import. The framework is responsible for partitioning input, coordinating workers, tracking progress, handling failures without re-processing completed work, and emitting results reliably.

Core Data Model

Job and Task Schema

CREATE TABLE batch_jobs (
    job_id          BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
    job_name        VARCHAR(256)    NOT NULL,
    job_class       VARCHAR(512)    NOT NULL,  -- fully qualified class/function name
    input_spec      JSON            NOT NULL,  -- source dataset descriptor
    output_spec     JSON            NOT NULL,  -- destination descriptor
    status          ENUM('pending','partitioning','running','paused','completed','failed') NOT NULL DEFAULT 'pending',
    priority        TINYINT UNSIGNED NOT NULL DEFAULT 5,
    max_retries     TINYINT UNSIGNED NOT NULL DEFAULT 3,
    timeout_secs    INT UNSIGNED    NOT NULL DEFAULT 3600,
    created_at      DATETIME        NOT NULL DEFAULT CURRENT_TIMESTAMP,
    started_at      DATETIME,
    completed_at    DATETIME,
    created_by      VARCHAR(128)    NOT NULL,
    PRIMARY KEY (job_id),
    INDEX idx_status_priority (status, priority, created_at)
);

CREATE TABLE batch_partitions (
    partition_id    BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
    job_id          BIGINT UNSIGNED NOT NULL,
    partition_index INT UNSIGNED    NOT NULL,
    input_range     JSON            NOT NULL,  -- e.g. {start_key, end_key} or {file_path, byte_offset, byte_length}
    status          ENUM('pending','assigned','running','completed','failed') NOT NULL DEFAULT 'pending',
    worker_id       VARCHAR(128),
    attempt_count   TINYINT UNSIGNED NOT NULL DEFAULT 0,
    records_in      BIGINT UNSIGNED NOT NULL DEFAULT 0,
    records_out     BIGINT UNSIGNED NOT NULL DEFAULT 0,
    error_msg       TEXT,
    assigned_at     DATETIME,
    completed_at    DATETIME,
    checkpoint_data JSON,  -- resumable state within a partition
    PRIMARY KEY (partition_id),
    UNIQUE INDEX idx_job_part (job_id, partition_index),
    INDEX idx_job_status (job_id, status),
    FOREIGN KEY (job_id) REFERENCES batch_jobs(job_id) ON DELETE CASCADE
);

CREATE TABLE batch_workers (
    worker_id       VARCHAR(128)    NOT NULL,  -- hostname:pid or UUID
    status          ENUM('idle','busy','draining','dead') NOT NULL DEFAULT 'idle',
    current_part_id BIGINT UNSIGNED,
    heartbeat_at    DATETIME        NOT NULL DEFAULT CURRENT_TIMESTAMP,
    started_at      DATETIME        NOT NULL DEFAULT CURRENT_TIMESTAMP,
    capacity        TINYINT UNSIGNED NOT NULL DEFAULT 1,
    PRIMARY KEY (worker_id)
);

CREATE TABLE batch_checkpoints (
    checkpoint_id   BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
    partition_id    BIGINT UNSIGNED NOT NULL,
    attempt         TINYINT UNSIGNED NOT NULL,
    `cursor`        JSON            NOT NULL,  -- last successfully processed record position (quoted: CURSOR is a reserved word in MySQL)
    records_done    BIGINT UNSIGNED NOT NULL DEFAULT 0,
    saved_at        DATETIME        NOT NULL DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (checkpoint_id),
    INDEX idx_part_attempt (partition_id, attempt),
    FOREIGN KEY (partition_id) REFERENCES batch_partitions(partition_id) ON DELETE CASCADE
);

Job Lifecycle and Workflow

A batch job moves through a well-defined lifecycle managed by a coordinator process:

  1. Submit — client inserts a row into batch_jobs with status 'pending' and an input_spec describing the dataset (e.g., S3 path, database table with date range, Kafka topic offsets).
  2. Partition — the coordinator picks up the job, sets status to 'partitioning', and splits the input into N roughly equal partitions. Each partition is a row in batch_partitions with status 'pending'.
  3. Dispatch — the coordinator polls for idle workers and assigns pending partitions via an UPDATE … WHERE status='pending' … LIMIT 1 with optimistic locking or SELECT FOR UPDATE to avoid double-assignment.
  4. Execute — each worker fetches its partition descriptor, reads the input range, processes records, and writes output. Progress is checkpointed periodically.
  5. Monitor — the coordinator checks for workers that have missed heartbeat_at by more than a threshold; their partitions are reassigned.
  6. Complete — when all partitions reach status 'completed', the coordinator marks the job completed and triggers any downstream notifications.
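
The lifecycle above implies a small state machine over the batch_jobs.status values. The following is a minimal sketch, assuming a transition table inferred from the six steps (the schema only lists the status values, so ALLOWED, advance, and job_is_complete are hypothetical names, and the exact set of legal transitions is an assumption):

```python
# Hypothetical transition table inferred from the lifecycle steps;
# the schema itself only enumerates the status values.
ALLOWED = {
    "pending": {"partitioning", "failed"},
    "partitioning": {"running", "failed"},
    "running": {"paused", "completed", "failed"},
    "paused": {"running", "failed"},
    "completed": set(),
    "failed": set(),
}


def advance(current: str, target: str) -> str:
    """Return the new status, or raise if the move is not allowed."""
    if target not in ALLOWED[current]:
        raise ValueError(f"illegal transition {current} -> {target}")
    return target


def job_is_complete(partition_statuses: list[str]) -> bool:
    """Step 6: a job completes only when every partition is 'completed'."""
    return bool(partition_statuses) and all(
        s == "completed" for s in partition_statuses
    )
```

Rejecting illegal transitions in one place keeps a delayed or duplicated coordinator action from moving a finished job back to 'running'.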

Partitioning Strategies

The partitioning strategy determines how evenly work is distributed and how much parallelism is achievable:

  • Range partitioning — split on a monotonic key (primary key, timestamp). Works well for ordered datasets. Risk: skew if key distribution is non-uniform (e.g., most records cluster in a recent time range).
  • Hash partitioning — hash(record_key) mod N. Uniform distribution. No ordering guarantees. Cannot be used when output must be sorted.
  • File-based partitioning — one partition per input file or per N bytes within a splittable file format (Parquet, ORC, text with newline boundaries). Simplest for file-system sources.
  • Dynamic partitioning — start with coarse partitions; if a partition takes too long or its size exceeds a threshold, split it at runtime. Adds complexity but handles skewed data naturally.
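
The first two strategies can be sketched in a few lines. This is an illustration under stated assumptions: keys are integers for the range case, and range_partitions and hash_partition are hypothetical helper names, not part of any framework API.

```python
import zlib


def range_partitions(start: int, end: int, n: int) -> list[tuple[int, int]]:
    """Split the half-open key range [start, end) into n contiguous ranges
    whose sizes differ by at most one key."""
    base, extra = divmod(end - start, n)
    ranges, lo = [], start
    for i in range(n):
        hi = lo + base + (1 if i < extra else 0)
        ranges.append((lo, hi))
        lo = hi
    return ranges


def hash_partition(record_key: str, n: int) -> int:
    """Map a key to a partition index with a stable hash.

    zlib.crc32 is used because Python's built-in hash() for str is
    randomized per process, so workers would disagree on the mapping.
    """
    return zlib.crc32(record_key.encode("utf-8")) % n
```

Note that the range split is even in key space only; it does nothing about skew in how many records fall inside each range.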

Target partition size: 100MB–1GB of input data or 60–300 seconds of processing time. Too small: scheduling overhead dominates. Too large: a single slow partition blocks job completion.
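
As a worked example of the sizing guidance, the partition count can be derived from the input size and a target partition size. The 256 MiB default below is an arbitrary value inside the suggested 100MB–1GB band, and partition_count is a hypothetical helper:

```python
MB = 1024 * 1024


def partition_count(input_bytes: int, target_bytes: int = 256 * MB) -> int:
    """Number of partitions so that each holds at most target_bytes."""
    if input_bytes <= 0:
        return 1
    # Integer ceiling division avoids float rounding on very large inputs.
    return (input_bytes + target_bytes - 1) // target_bytes


ten_gib = 10 * 1024 * MB
n = partition_count(ten_gib)  # 10 GiB at 256 MiB per partition
```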

Worker Coordination

Workers compete for partitions through the coordinator. Two common patterns:

  • Centralized coordinator — a single coordinator process owns the dispatch loop. Simple, consistent, single-threaded scheduling logic. SPOF mitigated by running with a hot standby using leader election (ZooKeeper, etcd, or database advisory lock).
  • Pull-based workers — workers self-schedule by polling the coordinator's API for available partitions. The coordinator is a stateless API server; state lives in the DB. Scales to many workers without coordinator bottleneck.

Partition assignment query (atomic, safe under concurrent workers):

START TRANSACTION;
SELECT partition_id FROM batch_partitions
WHERE job_id = ? AND status = 'pending'
ORDER BY partition_index
LIMIT 1
FOR UPDATE SKIP LOCKED;

UPDATE batch_partitions
SET status = 'assigned',
    worker_id = ?,
    assigned_at = NOW(),
    attempt_count = attempt_count + 1
WHERE partition_id = ?;
COMMIT;

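SKIP LOCKED (available in MySQL 8.0+ and PostgreSQL 9.5+) makes each concurrent poller skip rows that another transaction has already locked, so workers polling simultaneously each get a distinct partition without waiting on one another's row locks.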
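
Where SKIP LOCKED is not available, the optimistic-locking alternative mentioned in the dispatch step can be used instead: re-check the status in the UPDATE and look at the affected row count. The sketch below uses Python's sqlite3 module with a reduced, hypothetical version of the batch_partitions table purely so it runs standalone; claim_partition is an illustrative name.

```python
import sqlite3


def claim_partition(conn, job_id, worker_id):
    """Try to claim one pending partition; return its id or None."""
    while True:
        row = conn.execute(
            "SELECT partition_id FROM batch_partitions "
            "WHERE job_id = ? AND status = 'pending' "
            "ORDER BY partition_index LIMIT 1",
            (job_id,),
        ).fetchone()
        if row is None:
            return None  # nothing left to claim
        # The status check in WHERE is the optimistic guard: if another
        # worker claimed the row first, rowcount is 0 and we look again.
        cur = conn.execute(
            "UPDATE batch_partitions "
            "SET status = 'assigned', worker_id = ?, "
            "    attempt_count = attempt_count + 1 "
            "WHERE partition_id = ? AND status = 'pending'",
            (worker_id, row[0]),
        )
        conn.commit()
        if cur.rowcount == 1:
            return row[0]


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE batch_partitions ("
    " partition_id INTEGER PRIMARY KEY, job_id INTEGER,"
    " partition_index INTEGER, status TEXT DEFAULT 'pending',"
    " worker_id TEXT, attempt_count INTEGER DEFAULT 0)"
)
conn.executemany(
    "INSERT INTO batch_partitions (job_id, partition_index) VALUES (1, ?)",
    [(0,), (1,)],
)
first = claim_partition(conn, 1, "worker-a")
second = claim_partition(conn, 1, "worker-b")
third = claim_partition(conn, 1, "worker-c")  # no pending rows remain
```

The trade-off is that losers of a race retry instead of skipping ahead, so under heavy contention this variant does more round trips than SKIP LOCKED.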

Checkpoint-Based Fault Tolerance

A partition may contain millions of records. If a worker fails halfway through, re-processing from scratch wastes time and risks duplicate output. Checkpointing saves the last successfully committed cursor position:

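def process_partition(partition, worker_id):
    # Helper functions are placeholders for framework-provided calls.
    # Resume from the most recent checkpoint; None means a first attempt.
    checkpoint = load_latest_checkpoint(partition.id)
    cursor = checkpoint.cursor if checkpoint else None
    # Assumption: records_done is a cumulative count across checkpoints.
    records_done = checkpoint.records_done if checkpoint else 0

    reader = open_input(partition.input_range, start_at=cursor)
    writer = open_output(partition)
    batch = []

    def flush():
        # Flush output first, then save the cursor that covers it.
        nonlocal records_done, batch
        writer.write_batch(batch)
        records_done += len(batch)
        save_checkpoint(partition.id, reader.current_cursor(), records_done)
        batch = []

    for record in reader:
        batch.append(transform(record))
        if len(batch) >= CHECKPOINT_BATCH_SIZE:
            flush()
            update_heartbeat(worker_id)

    if batch:
        flush()

    mark_partition_complete(partition.id)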

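Checkpoint interval trade-off: checkpointing every record minimizes re-work but adds write overhead. Checkpointing every 10k–100k records is a typical balance. The checkpoint must be written atomically with the output flush, or after it: a checkpoint saved before the flush can advance the cursor past records that were never written, which loses data if the worker crashes in between. When the checkpoint follows the flush, a crash between the two leaves a "processed but not checkpointed" batch that is re-emitted on resume, so the output write must be idempotent.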
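
To make the resume behaviour concrete, here is a small in-memory simulation. It is a sketch only: plain lists stand in for the checkpoint store and the output sink, the cursor is a record index, and the fail_after parameter is a hypothetical hook for simulating a crash.

```python
CHECKPOINT_BATCH_SIZE = 3


def process_partition(records, checkpoints, output, fail_after=None):
    """Process records from the last checkpoint; optionally crash.

    checkpoints: list of saved cursors (record indexes), newest last.
    output: list standing in for the durable output sink.
    fail_after: raise once this many records have been read in this attempt.
    """
    cursor = checkpoints[-1] if checkpoints else 0
    batch = []
    for i in range(cursor, len(records)):
        if fail_after is not None and i - cursor >= fail_after:
            raise RuntimeError("simulated worker crash")
        batch.append(records[i].upper())  # stand-in transform
        if len(batch) >= CHECKPOINT_BATCH_SIZE:
            output.extend(batch)       # flush output first
            checkpoints.append(i + 1)  # then record the cursor
            batch = []
    if batch:
        output.extend(batch)
        checkpoints.append(len(records))


records = list("abcdefgh")
checkpoints, output = [], []
try:
    process_partition(records, checkpoints, output, fail_after=4)
except RuntimeError:
    pass  # first attempt dies with one unflushed record in memory
process_partition(records, checkpoints, output)  # retry resumes at cursor 3
```

The record that was transformed but not yet flushed when the crash happened is simply processed again on the retry; nothing already flushed is repeated.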

Output Commit Protocol

Partial output from a failed attempt must not contaminate the final result. Use a two-phase output strategy:

  1. Write to staging — each worker writes output to a partition-specific staging location (temp file, staging table, S3 prefix with attempt ID).
  2. Atomic rename/commit — only after the partition completes successfully does the coordinator (or worker) move/rename the output to the final location. S3 does not support atomic rename; instead, write a manifest file listing completed partition outputs that the downstream consumer reads.

This ensures exactly-once output semantics: a partition's output appears in the final location once and only once, even if the partition was attempted multiple times.
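
On a local or POSIX-style filesystem, the staging-then-rename step might look like the sketch below. The file naming scheme and commit_partition are assumptions made for illustration; on S3 the rename would be replaced by the manifest approach described above.

```python
import os
import tempfile


def commit_partition(final_dir: str, partition_index: int, attempt: int,
                     lines: list[str]) -> str:
    """Write output to an attempt-specific staging file, then publish it
    with os.replace, which is atomic when source and destination are on
    the same filesystem."""
    staging = os.path.join(
        final_dir, f".staging-part-{partition_index:05d}-attempt-{attempt}"
    )
    final = os.path.join(final_dir, f"part-{partition_index:05d}")
    with open(staging, "w", encoding="utf-8") as f:
        f.write("\n".join(lines) + "\n")
        f.flush()
        os.fsync(f.fileno())  # make the bytes durable before publishing
    os.replace(staging, final)  # readers see the old file or the new one
    return final


out_dir = tempfile.mkdtemp()
commit_partition(out_dir, 0, attempt=1, lines=["a", "b"])
# A retried attempt with identical output replaces the file in place.
path = commit_partition(out_dir, 0, attempt=2, lines=["a", "b"])
published = sorted(os.listdir(out_dir))
```

Keeping the staging file in the same directory as the final file is a deliberate choice here: it guarantees both paths are on one filesystem, which is what makes the rename atomic.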

Retry Policies

Failed partitions are retried up to max_retries times with exponential backoff:

retry_delay_secs = base_delay * (2 ** attempt_count) + jitter(0, base_delay)
-- e.g., base_delay = 30, before jitter: attempt 1: 60s, attempt 2: 120s, attempt 3: 240s
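
The formula translates directly into code. This is a minimal sketch: the 30-second default for base_delay and the injectable rng parameter (useful for deterministic tests) are assumptions, and should_retry is a hypothetical helper that applies the max_retries column.

```python
import random


def retry_delay_secs(attempt_count, base_delay=30.0, rng=None):
    """base_delay * 2**attempt_count plus uniform jitter in [0, base_delay]."""
    rng = rng or random.Random()
    return base_delay * (2 ** attempt_count) + rng.uniform(0, base_delay)


def should_retry(attempt_count, max_retries=3):
    """A partition is retried until it has used up max_retries attempts."""
    return attempt_count < max_retries
```

The jitter term spreads out retries from partitions that failed at the same moment, for example during a shared downstream outage.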

Failure categories and handling:

  • Transient (network timeout, throttling) — retry with backoff.
  • Data error (malformed record, schema mismatch) — write the record to a dead-letter partition, continue processing the rest. Do not fail the whole partition on one bad record.
  • Resource exhaustion (OOM, disk full) — fail the partition, alert on-call. Retry on a larger worker if available.
  • Poison pill (a record that always crashes the worker) — after N attempts on different workers, quarantine the partition range and alert.
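
The data-error case is the one most easily shown in code. In the sketch below, JSON lines with an integer id field stand in for real input, and a plain list stands in for the dead-letter partition; both are assumptions for illustration.

```python
import json


def process_records(raw_records):
    """Parse each record; route malformed ones to a dead-letter list
    instead of failing the whole partition."""
    good, dead_letter = [], []
    for offset, raw in enumerate(raw_records):
        try:
            record = json.loads(raw)
            good.append({"id": int(record["id"])})
        except (ValueError, KeyError, TypeError) as exc:
            # Keep enough context to inspect or replay the record later.
            dead_letter.append(
                {"offset": offset, "raw": raw, "error": type(exc).__name__}
            )
    return good, dead_letter


good, dead = process_records(
    ['{"id": 1}', "not json", '{"name": "x"}', '{"id": "7"}']
)
```

Only exception types that indicate bad data are caught; anything else (out of memory, I/O failure) still propagates and fails the partition, which matches the other categories in the list.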

Progress Tracking

Expose job progress as: (completed_partitions / total_partitions) * 100, and within a partition as (records_done / records_in) * 100. Update records_out on each checkpoint save. The coordinator aggregates partition-level metrics to produce job-level metrics exposed via an API and a dashboard. ETA estimation: (elapsed_time / completed_records) * remaining_records, smoothed with an exponential moving average to dampen noise from skewed partitions.
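
The progress and ETA formulas can be sketched as follows. The smoothing factor alpha and the EtaEstimator class are hypothetical; the text does not specify how the moving average is parameterized.

```python
def job_progress_pct(completed_partitions: int, total_partitions: int) -> float:
    """(completed_partitions / total_partitions) * 100, guarding against 0."""
    if total_partitions == 0:
        return 0.0
    return completed_partitions / total_partitions * 100


class EtaEstimator:
    """Smooths seconds-per-record with an exponential moving average."""

    def __init__(self, alpha: float = 0.3):
        self.alpha = alpha
        self.secs_per_record = None

    def update(self, elapsed_secs: float, completed_records: int) -> None:
        """Fold in one sample of elapsed_time / completed_records."""
        if completed_records <= 0:
            return
        sample = elapsed_secs / completed_records
        if self.secs_per_record is None:
            self.secs_per_record = sample
        else:
            self.secs_per_record = (
                self.alpha * sample + (1 - self.alpha) * self.secs_per_record
            )

    def eta_secs(self, remaining_records: int):
        """Smoothed rate times remaining records; None before any sample."""
        if self.secs_per_record is None:
            return None
        return self.secs_per_record * remaining_records
```

A smaller alpha reacts more slowly to a single slow partition, which is usually what a dashboard ETA wants.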

Failure Handling and Edge Cases

  • Worker crash without heartbeat — coordinator detects missed heartbeat (e.g., > 2 * heartbeat_interval seconds since last update). Marks partition as 'pending' for reassignment. Worker is marked 'dead'.
  • Coordinator failover — if the coordinator crashes, its standby takes over after winning leader election. Jobs in 'running' status are checked: alive workers continue; partitions with dead workers are reassigned.
  • Duplicate output on retry — if a worker completes a partition but crashes before updating the DB, the partition gets retried. The output commit protocol (staging + atomic rename) means the second attempt's output simply overwrites the staging location before commit, producing identical output. Idempotent transforms are required.
  • Input mutation during job — if the source dataset changes while a job is running, results may be inconsistent. Take a snapshot (read timestamp or S3 version ID) at job start and pin all reads to that snapshot.
  • Skewed partitions — one partition 10x larger than others becomes the job's tail latency. Detect via heartbeat-reported records_done rate; split the partition dynamically if it exceeds a size threshold after assignment.
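
The heartbeat check in the first bullet can be sketched with in-memory data. The 15-second heartbeat interval and the dict shapes are assumptions; in the real system these values would come from batch_workers and batch_partitions.

```python
from datetime import datetime, timedelta


def find_dead_workers(heartbeats, now, heartbeat_interval_secs=15):
    """Return worker ids whose last heartbeat is older than
    2 * heartbeat_interval_secs."""
    threshold = timedelta(seconds=2 * heartbeat_interval_secs)
    return sorted(w for w, seen in heartbeats.items() if now - seen > threshold)


def reassign(partitions, dead_workers):
    """Reset in-flight partitions held by dead workers to 'pending'."""
    for p in partitions:
        if p["worker_id"] in dead_workers and p["status"] in ("assigned", "running"):
            p["status"] = "pending"
            p["worker_id"] = None
    return partitions


now = datetime(2024, 1, 1, 12, 0, 0)
heartbeats = {
    "w1": now - timedelta(seconds=5),
    "w2": now - timedelta(seconds=31),  # past the 30 s threshold
    "w3": now - timedelta(seconds=30),  # exactly at the threshold: alive
}
dead = find_dead_workers(heartbeats, now)
partitions = reassign(
    [
        {"id": 1, "worker_id": "w1", "status": "running"},
        {"id": 2, "worker_id": "w2", "status": "running"},
        {"id": 3, "worker_id": "w2", "status": "completed"},
    ],
    dead,
)
```

Completed partitions are left alone even if their worker later dies, since their output has already been committed.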

Scalability Considerations

  • Partition count — scale partitions to match worker count * target_utilization. 1000 workers * 0.9 utilization = ~900 simultaneous partitions. More fine-grained partitions improve load balancing but increase coordinator overhead.
  • Coordinator throughput — a single coordinator can dispatch ~500 partitions/sec against a well-indexed DB. Beyond that, use a message queue (Kafka, SQS) as the dispatch channel: coordinator enqueues partition IDs, workers consume.
  • Database scalability — the batch_partitions table is the hot table. Index on (job_id, status) is critical. Archive completed partitions to a history table after job completion to keep the working set small.
  • Multi-cluster — for very large jobs (petabyte-scale), partition work across multiple compute clusters. Each cluster handles a subset of partitions; a global coordinator tracks cross-cluster progress.
  • Priority scheduling — jobs with higher priority get workers assigned first. Implement a weighted fair-share scheduler to prevent high-priority jobs from completely starving low-priority ones.
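
One simple way to approximate weighted fair sharing is to give the next free worker to the job with the lowest running-to-weight ratio. The sketch below assumes each job carries a numeric weight derived from its priority (how priority maps to weight is not specified here), and pick_next_job and allocate are hypothetical names.

```python
def pick_next_job(jobs):
    """Pick the job with pending work whose running/weight ratio is lowest.
    Ties are broken by job_id so the choice is deterministic."""
    candidates = [j for j in jobs if j["pending"] > 0]
    if not candidates:
        return None
    return min(candidates, key=lambda j: (j["running"] / j["weight"], j["job_id"]))


def allocate(jobs, free_workers):
    """Hand out free workers one at a time; returns {job_id: granted}."""
    granted = {j["job_id"]: 0 for j in jobs}
    for _ in range(free_workers):
        job = pick_next_job(jobs)
        if job is None:
            break
        job["running"] += 1
        job["pending"] -= 1
        granted[job["job_id"]] += 1
    return granted


jobs = [
    {"job_id": 1, "weight": 3, "running": 0, "pending": 100},
    {"job_id": 2, "weight": 1, "running": 0, "pending": 100},
]
shares = allocate(jobs, free_workers=8)
```

With weights 3 and 1, the eight workers end up split six to two, so the lower-weight job still makes progress instead of waiting for the other to drain.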

Summary

A batch processing framework reduces a large job to a set of independently executable partitions, assigns those partitions to a pool of workers, and ensures the job completes correctly even when individual workers fail. The three pillars are partitioning (dividing input evenly, adapting to skew), coordination (atomic partition assignment via SKIP LOCKED or a message queue, heartbeat-based failure detection), and fault tolerance (checkpoint-based resume within a partition, two-phase output commit for exactly-once semantics, retry policies with dead-lettering for malformed records and quarantine for poison pills). At scale, the coordinator becomes the bottleneck first; replacing the DB polling loop with a message queue decouples dispatch throughput from DB write capacity. Idempotent transforms and snapshot-isolated input reads are prerequisites for correct behavior in the face of retries and concurrent mutations.
