Bulk Data Import System Low-Level Design: CSV Parsing, Validation, and Async Processing

Bulk Data Import System Low-Level Design

Bulk import is one of those features that looks straightforward until a user uploads a 500,000-row CSV with inconsistent date formats and your database connection pool exhausts itself. A well-designed bulk import system streams the file rather than loading it entirely into memory, validates rows individually with structured error accumulation, processes work asynchronously with real-time progress, and delivers partial success semantics: valid rows are committed, invalid rows are reported back to the user with line-level detail.

Streaming CSV Parse

Never load the entire file into memory. Python’s csv.DictReader is a lazy iterator, and reading from a streaming file handle means memory usage stays bounded regardless of file size:

import csv
import hashlib
import io
from typing import Iterator, Tuple, Dict, Any, List

def parse_csv_stream(
    file_obj,               # file-like object (S3 streaming body, uploaded file, etc.)
    expected_headers: List[str]
) -> Iterator[Tuple[int, Dict[str, str], str]]:
    """
    Stream a CSV file, yielding one row at a time so memory stays bounded.

    Yields (row_number, row_dict, row_hash) for each data row, where:
      * row_number starts at 2 (row 1 is the header line),
      * row_dict has normalized (stripped, lowercased) keys and stripped values,
      * row_hash is the SHA-256 hex digest of the row's comma-joined values,
        used downstream for idempotency (UNIQUE (job_id, row_hash)).

    Raises:
        ValueError: if any of expected_headers is missing from the file header.
    """
    # utf-8-sig transparently strips a leading BOM, which Excel-exported
    # CSVs commonly carry.
    reader = csv.DictReader(io.TextIOWrapper(file_obj, encoding='utf-8-sig'))

    # Accessing .fieldnames forces the header line to be read; normalize it
    # (strip whitespace, lowercase) so header comparison and row keys are
    # case/whitespace-insensitive.
    reader.fieldnames = [h.strip().lower() for h in (reader.fieldnames or [])]

    missing = set(expected_headers) - set(reader.fieldnames)
    if missing:
        raise ValueError(f"CSV missing required columns: {missing}")

    for row_num, raw_row in enumerate(reader, start=2):  # row 1 is header
        # Rows with MORE columns than the header land under the None key
        # (DictReader's restkey default) as a list; pop them out so the
        # comprehension below never sees a None key or a list value.
        extras = raw_row.pop(None, None) or []

        # Rows with FEWER columns than the header get None values
        # (DictReader's restval default); coerce those to '' instead of
        # crashing on (None).strip() / ','.join(None).
        row = {k.strip().lower(): (v or '').strip() for k, v in raw_row.items()}

        values = [v if v is not None else '' for v in raw_row.values()]
        values.extend(extras)
        raw_line = ','.join(values).encode('utf-8')
        row_hash = hashlib.sha256(raw_line).hexdigest()
        yield row_num, row, row_hash

Per-Row Validation with Error Accumulation

Fail per-row, not globally: a bad row never aborts the whole file. Within each row, collect all errors so the user can fix everything in one round-trip:

import re
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class RowError:
    """A single validation failure, pinned to a row number and column."""
    row_number: int
    column: str
    message: str


@dataclass
class ValidationResult:
    """Outcome of validating one row: a valid flag plus accumulated errors."""
    valid: bool
    errors: List[RowError] = field(default_factory=list)


# Lightweight sanity check, not full RFC 5322: one '@', no whitespace on
# either side, and at least one dot in the domain part.
# (The previous pattern was missing backslashes: [^@s] excluded the literal
# letter 's' — rejecting any address containing it — and the bare '.'
# matched ANY character instead of a dot.)
EMAIL_RE = re.compile(r'^[^@\s]+@[^@\s]+\.[^@\s]+$')


def validate_user_row(row_number: int, row: Dict[str, str]) -> ValidationResult:
    """
    Validate one normalized CSV row for the user import.

    Collects ALL errors for the row rather than stopping at the first,
    so the user can fix everything in a single round-trip.

    Args:
        row_number: 1-based CSV line number (header is line 1), for reporting.
        row:        normalized row dict (lowercased keys, stripped values).

    Returns:
        ValidationResult with valid=True and no errors, or valid=False and
        one RowError per failed check.
    """
    errors: List[RowError] = []

    email = row.get('email', '')
    if not email:
        errors.append(RowError(row_number, 'email', 'Email is required'))
    elif not EMAIL_RE.match(email):
        errors.append(RowError(row_number, 'email', f'Invalid email: {email!r}'))

    name = row.get('name', '')
    if not name:
        errors.append(RowError(row_number, 'name', 'Name is required'))
    elif len(name) > 255:
        errors.append(RowError(row_number, 'name', 'Name exceeds 255 characters'))

    # date_of_birth is optional; validate format only when present.
    dob = row.get('date_of_birth', '')
    if dob:
        from datetime import datetime
        try:
            datetime.strptime(dob, '%Y-%m-%d')
        except ValueError:
            errors.append(RowError(row_number, 'date_of_birth',
                                   f'Invalid date format (expected YYYY-MM-DD): {dob!r}'))

    return ValidationResult(valid=len(errors) == 0, errors=errors)

SQL Schema

-- One row per upload. Tracks the job lifecycle and running counters so
-- clients can derive progress as processed_rows / total_rows.
CREATE TABLE import_job (
    id              BIGSERIAL PRIMARY KEY,
    import_id       UUID        NOT NULL UNIQUE DEFAULT gen_random_uuid(),
    user_id         BIGINT      NOT NULL,
    filename        TEXT        NOT NULL,
    status          TEXT        NOT NULL DEFAULT 'pending'
                                CHECK (status IN ('pending','parsing','processing','completed','failed')),
    total_rows      INT         NOT NULL DEFAULT 0,
    valid_rows      INT         NOT NULL DEFAULT 0,
    failed_rows     INT         NOT NULL DEFAULT 0,
    processed_rows  INT         NOT NULL DEFAULT 0,
    s3_key          TEXT,                          -- original file location
    error_report_key TEXT,                         -- S3 key for error CSV download
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    completed_at    TIMESTAMPTZ
);

-- One row per CSV data row, staged here before domain insertion.
-- The worker pulls rows with status='pending' and flips them to
-- 'valid'/'failed' as it goes.
CREATE TABLE import_row (
    id          BIGSERIAL PRIMARY KEY,
    job_id      BIGINT      NOT NULL REFERENCES import_job(id) ON DELETE CASCADE,
    row_number  INT         NOT NULL,
    row_hash    TEXT        NOT NULL,             -- SHA-256 of raw CSV line
    status      TEXT        NOT NULL DEFAULT 'pending'
                            CHECK (status IN ('pending','valid','failed','skipped')),
    payload     JSONB       NOT NULL,             -- normalized row dict
    target_id   BIGINT,                           -- PK of the created/updated record
    created_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    UNIQUE (job_id, row_hash)                     -- idempotency: skip duplicate rows
);

-- Supports the worker's "next pending batch for this job" query.
CREATE INDEX idx_ir_job_status ON import_row (job_id, status);

-- One row per validation/insertion error; row_id is nullable so file-level
-- errors (no staged row) can still be recorded.
CREATE TABLE import_error (
    id          BIGSERIAL PRIMARY KEY,
    job_id      BIGINT      NOT NULL REFERENCES import_job(id) ON DELETE CASCADE,
    row_id      BIGINT      REFERENCES import_row(id),
    row_number  INT         NOT NULL,
    column_name TEXT,
    message     TEXT        NOT NULL,
    created_at  TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Supports the error-report query (all errors for one job).
CREATE INDEX idx_ie_job ON import_error (job_id);

Async Job Processing with Progress Updates

import time
from typing import Callable

BATCH_SIZE = 200   # rows per DB transaction

def process_import_job(
    db,
    job_id: int,
    insert_fn: Callable[[Dict], int],   # domain-specific insert; returns new record PK
    progress_fn: Callable[[int, int], None] = None   # optional websocket/SSE callback
) -> None:
    """
    Process all pending import_row records for job_id in batches.

    Each batch is committed in its own transaction, so a crash mid-job
    preserves all previously committed work (partial success semantics).
    Failed rows are marked 'failed' and recorded in import_error without
    rolling back valid rows in the same batch.

    Args:
        db:          DB handle exposing execute/fetchall/fetchone/commit.
        job_id:      import_job.id to process.
        insert_fn:   domain-specific insert; takes the row payload dict and
                     returns the PK of the created record. May raise.
        progress_fn: optional (processed_rows, total_rows) callback invoked
                     after each committed batch.
    """
    db.execute(
        "UPDATE import_job SET status='processing' WHERE id=%s", (job_id,)
    )
    db.commit()

    while True:
        # No OFFSET needed: processed rows leave the 'pending' state, so
        # re-running this query naturally pages through the remaining work.
        # (The original kept an `offset` counter that was never used.)
        batch = db.fetchall(
            """SELECT * FROM import_row
               WHERE job_id=%s AND status='pending'
               ORDER BY row_number
               LIMIT %s""",
            (job_id, BATCH_SIZE)
        )
        if not batch:
            break

        for row in batch:
            try:
                target_id = insert_fn(row['payload'])
                db.execute(
                    "UPDATE import_row SET status='valid', target_id=%s WHERE id=%s",
                    (target_id, row['id'])
                )
                db.execute(
                    "UPDATE import_job SET valid_rows=valid_rows+1, processed_rows=processed_rows+1 WHERE id=%s",
                    (job_id,)
                )
            except Exception as exc:
                # NOTE(review): on PostgreSQL a failed SQL statement aborts
                # the whole transaction; if insert_fn can fail at the SQL
                # level (not just raise in Python before touching the DB),
                # each row needs a SAVEPOINT for these bookkeeping writes to
                # succeed — confirm against the db wrapper's behavior.
                db.execute(
                    "UPDATE import_row SET status='failed' WHERE id=%s", (row['id'],)
                )
                db.execute(
                    """INSERT INTO import_error (job_id, row_id, row_number, message)
                       VALUES (%s, %s, %s, %s)""",
                    (job_id, row['id'], row['row_number'], str(exc))
                )
                db.execute(
                    "UPDATE import_job SET failed_rows=failed_rows+1, processed_rows=processed_rows+1 WHERE id=%s",
                    (job_id,)
                )

        db.commit()   # commit each batch; partial progress is preserved on crash

        if progress_fn:
            counts = db.fetchone("SELECT total_rows, processed_rows FROM import_job WHERE id=%s", (job_id,))
            progress_fn(counts['processed_rows'], counts['total_rows'])

    # All pending rows consumed: mark the job completed.
    db.execute(
        "UPDATE import_job SET status='completed', completed_at=NOW() WHERE id=%s",
        (job_id,)
    )
    db.commit()

Error Report Generation

After processing, generate a downloadable error CSV and upload to S3 so the user can fix and re-upload:

import csv
import gzip
import io
import json

import boto3

def generate_error_report(db, s3, job_id: int, bucket: str) -> str:
    """
    Build a downloadable error CSV for a finished import job.

    Writes one line per recorded error (row number, column, message, and the
    original row payload serialized as JSON) to
    s3://{bucket}/import-errors/{job_id}/errors.csv, stores that key on the
    import_job record, and returns it.

    Args:
        db:     DB handle exposing fetchall/execute/commit.
        s3:     boto3-style S3 client.
        job_id: import_job.id whose errors to export.
        bucket: destination S3 bucket name.

    Returns:
        The S3 object key of the uploaded error report.
    """
    # LEFT JOIN: import_error.row_id is nullable (file-level / parse-time
    # errors have no staged row), and those errors must still be reported.
    # (The original inner JOIN silently dropped them. `json` was also used
    # here without being imported.)
    errors = db.fetchall(
        """SELECT ie.row_number, ie.column_name, ie.message, ir.payload
           FROM import_error ie
           LEFT JOIN import_row ir ON ir.id = ie.row_id
           WHERE ie.job_id = %s
           ORDER BY ie.row_number""",
        (job_id,)
    )

    buf = io.StringIO()
    writer = csv.writer(buf)
    writer.writerow(['row_number', 'column', 'error', 'original_data'])
    for e in errors:
        writer.writerow([e['row_number'], e['column_name'] or '', e['message'],
                         json.dumps(e['payload'])])

    key = f"import-errors/{job_id}/errors.csv"
    s3.put_object(Bucket=bucket, Key=key, Body=buf.getvalue().encode('utf-8'),
                  ContentType='text/csv')
    db.execute("UPDATE import_job SET error_report_key=%s WHERE id=%s", (key, job_id))
    db.commit()
    return key

{
“@context”: “https://schema.org”,
“@type”: “FAQPage”,
“mainEntity”: [
{
“@type”: “Question”,
“name”: “How do you handle duplicate rows in a bulk import without re-importing the entire file?”,
“acceptedAnswer”: {
“@type”: “Answer”,
“text”: “Hash each raw CSV line with SHA-256 and store the hash in import_row with a UNIQUE constraint on (job_id, row_hash). If a row with that hash already exists, the INSERT is skipped via ON CONFLICT DO NOTHING. This makes re-processing idempotent: re-uploading the same file after a partial failure only processes previously unprocessed rows.”
}
},
{
“@type”: “Question”,
“name”: “What does partial success mean in a bulk import and how do you implement it?”,
“acceptedAnswer”: {
“@type”: “Answer”,
“text”: “Partial success means valid rows are committed to the database even when other rows in the same file fail validation or insertion. Implement it by committing each processing batch independently in separate transactions. Mark failed rows with status=’failed’ and record detailed errors in import_error without rolling back previously committed valid rows.”
}
},
{
“@type”: “Question”,
“name”: “How do you report import progress to the user in real time?”,
“acceptedAnswer”: {
“@type”: “Answer”,
“text”: “Store processed_rows and total_rows on the import_job record. After each committed batch, update these counters. The client polls a GET /import/{job_id}/status endpoint or receives updates via Server-Sent Events (SSE). The processed_rows / total_rows ratio gives an accurate progress percentage without the overhead of a separate progress table.”
}
},
{
“@type”: “Question”,
“name”: “How should large CSV files be uploaded before async processing begins?”,
“acceptedAnswer”: {
“@type”: “Answer”,
“text”: “Use a two-step flow: first create an import_job record and get a presigned S3 PUT URL; the client uploads directly to S3. Once the upload completes, the client calls a confirm endpoint which enqueues the processing job. This keeps large file bytes off your application servers and lets S3 handle the upload reliability.”
}
}
]
}

{
“@context”: “https://schema.org”,
“@type”: “FAQPage”,
“mainEntity”: [
{
“@type”: “Question”,
“name”: “How is streaming CSV parsing implemented?”,
“acceptedAnswer”: {
“@type”: “Answer”,
“text”: “csv.DictReader wraps a file-like object from S3 or a direct upload stream; rows are processed one at a time without loading the entire file into memory.”
}
},
{
“@type”: “Question”,
“name”: “How does partial success work in bulk import?”,
“acceptedAnswer”: {
“@type”: “Answer”,
“text”: “Valid rows are committed in batches; invalid rows are accumulated in ImportError with row number and error message, allowing the job to complete with a partial result.”
}
},
{
“@type”: “Question”,
“name”: “How is import idempotency guaranteed?”,
“acceptedAnswer”: {
“@type”: “Answer”,
“text”: “Each row’s raw content is hashed with SHA-256 and stored under a UNIQUE (job_id, row_hash) constraint; rows with an existing hash are skipped, allowing safe re-submission of partially failed imports.”
}
},
{
“@type”: “Question”,
“name”: “How is import progress reported?”,
“acceptedAnswer”: {
“@type”: “Answer”,
“text”: “The ImportJob row stores processed_rows and total_rows; the API polls this row to return percentage progress to the client.”
}
}
]
}

See also: Shopify Interview Guide

See also: Scale AI Interview Guide 2026: Data Infrastructure, RLHF Pipelines, and ML Engineering

See also: Databricks Interview Guide 2026: Spark Internals, Delta Lake, and Lakehouse Architecture

Scroll to Top