Bulk Data Import System Low-Level Design: CSV Parsing, Validation, and Async Processing

Bulk Data Import System Low-Level Design

Bulk import is one of those features that looks straightforward until a user uploads a 500,000-row CSV with inconsistent date formats and your database connection pool exhausts itself. A well-designed bulk import system streams the file rather than loading it entirely into memory, validates rows individually with structured error accumulation, processes work asynchronously with real-time progress, and delivers partial success semantics: valid rows are committed, invalid rows are reported back to the user with line-level detail.

Streaming CSV Parse

Never load the entire file into memory. Python’s csv.reader and csv.DictReader are lazy iterators, and reading from a streaming file handle means memory usage stays bounded regardless of file size:

import csv
import hashlib
import io
from typing import Iterator, Tuple, Dict, Any, List

def parse_csv_stream(
    file_obj,               # binary file-like object (S3 streaming body, uploaded file, etc.)
    expected_headers: List[str]
) -> Iterator[Tuple[int, Dict[str, str], str]]:
    """
    Lazily parse a UTF-8 CSV byte stream, yielding one tuple per data row.

    Yields (row_number, row_dict, row_hash):
      * row_number counts parsed records starting at 2 (row 1 is the header).
        csv.DictReader skips completely blank lines, so they are not counted.
      * row_dict maps each normalized (stripped, lowercased) header to the
        stripped cell value. Cells missing from a short row become ''.
        Surplus cells beyond the header are left out of row_dict.
      * row_hash is the SHA-256 hex digest of the row's unstripped cell values
        joined with commas (including any surplus cells), for idempotency.

    Raises ValueError if any of expected_headers is absent from the header row.
    """
    # newline='' hands newline handling to the csv module, so quoted fields
    # that contain line breaks are parsed intact.
    text_stream = io.TextIOWrapper(file_obj, encoding='utf-8-sig', newline='')
    reader = csv.DictReader(text_stream)

    # Normalize header names: strip whitespace, lowercase
    reader.fieldnames = [h.strip().lower() for h in (reader.fieldnames or [])]

    missing = set(expected_headers) - set(reader.fieldnames)
    if missing:
        raise ValueError(f"CSV missing required columns: {missing}")

    for row_num, raw_row in enumerate(reader, start=2):  # row 1 is header
        row: Dict[str, str] = {}
        cells: List[str] = []
        for key, value in raw_row.items():
            if key is None:
                # DictReader collects cells beyond the header as a list under
                # the key None; hash them but keep them out of row_dict.
                cells.extend(value)
                continue
            # DictReader fills cells missing from a short row with None.
            cell = value or ''
            cells.append(cell)
            row[key] = cell.strip()
        row_hash = hashlib.sha256(','.join(cells).encode('utf-8')).hexdigest()
        yield row_num, row, row_hash

Per-Row Validation with Error Accumulation

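Validate each row independently instead of aborting the whole import at the first bad row. Accumulate every error, including multiple errors on the same row, so the user can fix everything in one round-trip: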

import re
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class RowError:
    """One problem found in a single CSV row."""

    # Row number the problem belongs to, as supplied by whoever creates the error.
    row_number: int
    # Name of the offending column.
    column: str
    # Human-readable description of what is wrong.
    message: str

@dataclass
class ValidationResult:
    """Outcome of validating one row: a pass/fail flag plus the errors found."""

    # True when the row passed validation.
    valid: bool
    # Problems found in the row; each instance gets its own fresh list.
    errors: List[RowError] = field(default_factory=list)

# Deliberately loose shape check, not full RFC validation: exactly one "@",
# no whitespace anywhere, and a literal dot somewhere in the domain part.
EMAIL_RE = re.compile(r'^[^@\s]+@[^@\s]+\.[^@\s]+$')

def validate_user_row(row_number: int, row: Dict[str, str]) -> ValidationResult:
    """Validate one user row and report every problem found, not just the first.

    Checks performed:
      * email: required, and must match EMAIL_RE.
      * name: required, at most 255 characters.
      * date_of_birth: optional; when present it must parse as %Y-%m-%d.

    Returns a ValidationResult whose ``valid`` flag is True only when no
    errors were collected.
    """
    problems = []

    def report(column, message):
        # Tag every problem with the row it came from.
        problems.append(RowError(row_number, column, message))

    email = row.get('email', '')
    if not email:
        report('email', 'Email is required')
    elif not EMAIL_RE.match(email):
        report('email', f'Invalid email: {email!r}')

    name = row.get('name', '')
    if not name:
        report('name', 'Name is required')
    elif len(name) > 255:
        report('name', 'Name exceeds 255 characters')

    dob = row.get('date_of_birth', '')
    if dob:
        from datetime import datetime
        try:
            datetime.strptime(dob, '%Y-%m-%d')
        except ValueError:
            report('date_of_birth',
                   f'Invalid date format (expected YYYY-MM-DD): {dob!r}')

    return ValidationResult(valid=not problems, errors=problems)

SQL Schema

-- One row per import job: where the uploaded file lives, the job's lifecycle
-- status, and running row counters used for progress reporting.
CREATE TABLE import_job (
    id              BIGSERIAL PRIMARY KEY,
    -- NOTE(review): presumably the externally visible job identifier; confirm with the API layer.
    import_id       UUID        NOT NULL UNIQUE DEFAULT gen_random_uuid(),
    -- No foreign key is declared here; presumably the uploading user (confirm).
    user_id         BIGINT      NOT NULL,
    filename        TEXT        NOT NULL,
    -- Allowed values are enforced by the CHECK below. process_import_job sets
    -- 'processing' when it starts and 'completed' when it finishes.
    status          TEXT        NOT NULL DEFAULT 'pending'
                                CHECK (status IN ('pending','parsing','processing','completed','failed')),
    -- total_rows is read for progress reporting but is not written by
    -- process_import_job; presumably set by the parse stage (confirm).
    total_rows      INT         NOT NULL DEFAULT 0,
    -- valid_rows / failed_rows / processed_rows are incremented by
    -- process_import_job as rows succeed or fail.
    valid_rows      INT         NOT NULL DEFAULT 0,
    failed_rows     INT         NOT NULL DEFAULT 0,
    processed_rows  INT         NOT NULL DEFAULT 0,
    s3_key          TEXT,                          -- original file location
    error_report_key TEXT,                         -- S3 key for error CSV download; set by generate_error_report
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    -- Set to NOW() when process_import_job marks the job 'completed'.
    completed_at    TIMESTAMPTZ
);

-- One row per parsed CSV record belonging to a job. Rows are deleted with
-- their parent job (ON DELETE CASCADE).
CREATE TABLE import_row (
    id          BIGSERIAL PRIMARY KEY,
    job_id      BIGINT      NOT NULL REFERENCES import_job(id) ON DELETE CASCADE,
    row_number  INT         NOT NULL,
    row_hash    TEXT        NOT NULL,             -- SHA-256 of the row's comma-joined cell values (see parse_csv_stream)
    -- process_import_job picks up 'pending' rows, then sets 'valid' when
    -- insert_fn succeeds or 'failed' when it raises. 'skipped' is not set by
    -- any code shown alongside this schema.
    status      TEXT        NOT NULL DEFAULT 'pending'
                            CHECK (status IN ('pending','valid','failed','skipped')),
    payload     JSONB       NOT NULL,             -- normalized row dict
    target_id   BIGINT,                           -- PK of the created/updated record
    created_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    -- At most one row per (job, hash). A second insert with the same hash
    -- violates this constraint; the code that populates this table is not
    -- shown, so presumably it uses ON CONFLICT DO NOTHING (confirm).
    UNIQUE (job_id, row_hash)                     -- idempotency: skip duplicate rows
);

-- Supports the batch fetch in process_import_job (WHERE job_id = ? AND status = 'pending').
CREATE INDEX idx_ir_job_status ON import_row (job_id, status);

-- One row per error recorded against a job. Errors are deleted with their
-- parent job (ON DELETE CASCADE).
CREATE TABLE import_error (
    id          BIGSERIAL PRIMARY KEY,
    job_id      BIGINT      NOT NULL REFERENCES import_job(id) ON DELETE CASCADE,
    -- Nullable, and declared without an ON DELETE action.
    row_id      BIGINT      REFERENCES import_row(id),
    -- Stored directly on the error row in addition to the row_id link.
    row_number  INT         NOT NULL,
    -- Nullable: process_import_job records processing failures without a column name.
    column_name TEXT,
    message     TEXT        NOT NULL,
    created_at  TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Supports fetching all errors for a job, as generate_error_report does.
CREATE INDEX idx_ie_job ON import_error (job_id);

Async Job Processing with Progress Updates

import time
from typing import Callable

BATCH_SIZE = 200   # rows per DB transaction

def process_import_job(
    db,
    job_id: int,
    insert_fn: Callable[[Dict], int],   # domain-specific insert; returns new record PK
    progress_fn: Optional[Callable[[int, int], None]] = None   # optional websocket/SSE callback
) -> None:
    """
    Process the pending import_row records of one job, batch by batch.

    Each batch of up to BATCH_SIZE rows is committed on its own, so progress
    made before a crash is kept (partial success semantics). A row whose
    insert_fn call raises is marked 'failed' and gets an import_error record;
    the remaining rows are still processed.

    Args:
        db: database wrapper exposing execute(sql, params),
            fetchall(sql, params), fetchone(sql, params) and commit().
        job_id: primary key of the import_job to process.
        insert_fn: called with each row's payload; returns the PK of the
            record it created.
        progress_fn: if given, called after every committed batch with
            (processed_rows, total_rows) read back from import_job.
    """
    db.execute(
        "UPDATE import_job SET status='processing' WHERE id=%s", (job_id,)
    )
    db.commit()

    while True:
        # No OFFSET needed: processed rows leave the 'pending' state, so the
        # same query always returns the next unprocessed batch.
        rows = db.fetchall(
            """SELECT * FROM import_row
               WHERE job_id=%s AND status='pending'
               ORDER BY row_number
               LIMIT %s""",
            (job_id, BATCH_SIZE)
        )
        if not rows:
            break

        valid_count = 0
        failed_count = 0
        for row in rows:
            # Isolate each row in a savepoint. If insert_fn runs on this same
            # connection and fails, PostgreSQL aborts the transaction; rolling
            # back to the savepoint keeps the batch usable so the failure can
            # still be recorded.
            db.execute("SAVEPOINT import_row_sp", ())
            try:
                target_id = insert_fn(row['payload'])
                db.execute(
                    "UPDATE import_row SET status='valid', target_id=%s WHERE id=%s",
                    (target_id, row['id'])
                )
            except Exception as exc:
                db.execute("ROLLBACK TO SAVEPOINT import_row_sp", ())
                db.execute(
                    "UPDATE import_row SET status='failed' WHERE id=%s", (row['id'],)
                )
                db.execute(
                    """INSERT INTO import_error (job_id, row_id, row_number, message)
                       VALUES (%s, %s, %s, %s)""",
                    (job_id, row['id'], row['row_number'], str(exc))
                )
                failed_count += 1
            else:
                valid_count += 1
            db.execute("RELEASE SAVEPOINT import_row_sp", ())

        # One counter update per batch instead of one per row; the totals
        # visible after each commit are the same.
        db.execute(
            """UPDATE import_job
               SET valid_rows=valid_rows+%s,
                   failed_rows=failed_rows+%s,
                   processed_rows=processed_rows+%s
               WHERE id=%s""",
            (valid_count, failed_count, valid_count + failed_count, job_id)
        )
        db.commit()   # commit each batch; partial progress is preserved on crash

        if progress_fn is not None:
            total = db.fetchone("SELECT total_rows, processed_rows FROM import_job WHERE id=%s", (job_id,))
            progress_fn(total['processed_rows'], total['total_rows'])

    # Mark completed
    db.execute(
        "UPDATE import_job SET status='completed', completed_at=NOW() WHERE id=%s",
        (job_id,)
    )
    db.commit()

Error Report Generation

After processing, generate a downloadable error CSV and upload to S3 so the user can fix and re-upload:

import boto3, gzip, io, csv

def generate_error_report(db, s3, job_id: int, bucket: str) -> str:
    """Build a CSV of all errors recorded for a job and upload it to S3.

    The report has the columns row_number, column, error and original_data;
    original_data is the JSON-encoded payload of the offending row. The
    resulting S3 key is stored in import_job.error_report_key and committed.

    Args:
        db: database wrapper exposing fetchall(sql, params),
            execute(sql, params) and commit().
        s3: client exposing put_object(Bucket=..., Key=..., Body=...,
            ContentType=...).
        job_id: primary key of the import_job to report on.
        bucket: destination S3 bucket name.

    Returns:
        The S3 key the report was uploaded under.
    """
    # Function-scope import: json is not imported alongside csv/io in this file.
    import json

    # LEFT JOIN because import_error.row_id is nullable; an inner join would
    # silently drop errors that are not linked to an import_row.
    errors = db.fetchall(
        """SELECT ie.row_number, ie.column_name, ie.message, ir.payload
           FROM import_error ie
           LEFT JOIN import_row ir ON ir.id = ie.row_id
           WHERE ie.job_id = %s
           ORDER BY ie.row_number""",
        (job_id,)
    )
    buf = io.StringIO()
    writer = csv.writer(buf)
    writer.writerow(['row_number', 'column', 'error', 'original_data'])
    for e in errors:
        payload = e['payload']
        writer.writerow([
            e['row_number'],
            e['column_name'] or '',
            e['message'],
            # Errors with no linked row have no payload; leave the cell empty.
            json.dumps(payload) if payload is not None else '',
        ])
    key = f"import-errors/{job_id}/errors.csv"
    s3.put_object(Bucket=bucket, Key=key, Body=buf.getvalue().encode('utf-8'),
                  ContentType='text/csv')
    db.execute("UPDATE import_job SET error_report_key=%s WHERE id=%s", (key, job_id))
    db.commit()
    return key

See also: Shopify Interview Guide

See also: Scale AI Interview Guide 2026: Data Infrastructure, RLHF Pipelines, and ML Engineering

See also: Databricks Interview Guide 2026: Spark Internals, Delta Lake, and Lakehouse Architecture

Scroll to Top