Bulk Data Import System Low-Level Design
Bulk import is one of those features that looks straightforward until a user uploads a 500,000-row CSV with inconsistent date formats and your database connection pool exhausts itself. A well-designed bulk import system streams the file rather than loading it entirely into memory, validates rows individually with structured error accumulation, processes work asynchronously with real-time progress, and delivers partial success semantics: valid rows are committed, invalid rows are reported back to the user with line-level detail.
Streaming CSV Parse
Never load the entire file into memory. Python’s csv.reader is a lazy iterator, and reading from a streaming file handle means memory usage stays bounded regardless of file size:
import csv
import hashlib
import io
from typing import Iterator, Tuple, Dict, Any, List
def parse_csv_stream(
    file_obj,  # binary file-like object (S3 streaming body, uploaded file, etc.)
    expected_headers: List[str]
) -> Iterator[Tuple[int, Dict[str, str], str]]:
    """
    Stream a CSV file and yield one normalized record per data row.

    Yields (row_number, row_dict, row_hash) for each non-blank data row:

    * row_number -- record counter starting at 2 (row 1 is the header);
      blank lines are skipped and not counted.
    * row_dict -- header name (stripped, lower-cased) -> stripped cell value.
      Cells beyond the header width are dropped; missing trailing cells
      become ''.
    * row_hash -- SHA-256 hex digest of the row's cells re-serialized as a
      single CSV line, used for idempotency. Re-serializing with csv.writer
      (instead of joining on ',') keeps cells that themselves contain commas
      from colliding, e.g. ['a,b', 'c'] vs ['a', 'b,c'].

    Raises ValueError if any expected header (compared after the same
    strip/lower-case normalization) is missing from the header row.
    """
    # newline='' is required by the csv module so quoted fields containing
    # line breaks are parsed intact; utf-8-sig drops a leading Excel BOM.
    text = io.TextIOWrapper(file_obj, encoding='utf-8-sig', newline='')
    reader = csv.reader(text)

    # Normalize header names: strip whitespace, lowercase
    headers = [h.strip().lower() for h in next(reader, [])]
    required = {h.strip().lower() for h in expected_headers}
    missing = required - set(headers)
    if missing:
        raise ValueError(f"CSV missing required columns: {missing}")

    width = len(headers)
    # One reusable buffer/writer for hashing avoids per-row allocations.
    line_buf = io.StringIO()
    line_writer = csv.writer(line_buf, lineterminator='\n')
    row_num = 1  # row 1 is the header
    for cells in reader:
        if not cells:  # csv.reader yields [] for a blank line; skip it
            continue
        row_num += 1
        # Pad short rows with ''; zip() drops surplus cells beyond the header.
        padded = cells + [''] * (width - len(cells))
        row = {h: v.strip() for h, v in zip(headers, padded)}

        line_buf.seek(0)
        line_buf.truncate()
        line_writer.writerow(cells)  # hash covers every cell, including extras
        row_hash = hashlib.sha256(line_buf.getvalue().encode('utf-8')).hexdigest()
        yield row_num, row, row_hash
Per-Row Validation with Error Accumulation
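Validate each row independently: a bad row is rejected on its own instead of aborting the whole import. Within a row, collect every error rather than stopping at the first one, so the user can fix everything in one round-trip: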
import re
from dataclasses import dataclass, field
from typing import Optional
@dataclass()
class RowError:
    """One validation problem, pinned to a specific row and column."""

    row_number: int  # row number as supplied by the caller
    column: str      # name of the offending column
    message: str     # human-readable explanation of the problem


@dataclass()
class ValidationResult:
    """Outcome of validating a single row: a pass/fail flag plus its errors."""

    valid: bool
    # default_factory gives each instance its own, independent list
    errors: List[RowError] = field(default_factory=list)
# Pragmatic email shape check: local@domain.tld with no whitespace or extra '@'.
# \s excludes whitespace and \. requires a literal dot before the final label.
EMAIL_RE = re.compile(r'^[^@\s]+@[^@\s]+\.[^@\s]+$')
def validate_user_row(row_number: int, row: Dict[str, str]) -> ValidationResult:
    """
    Check one parsed user row and report every problem found in it.

    Rules applied:
      * email -- required, and must match EMAIL_RE.
      * name -- required, at most 255 characters.
      * date_of_birth -- optional; when present must parse as YYYY-MM-DD.

    Returns a ValidationResult whose ``valid`` flag is True only when no
    errors were recorded; each error carries ``row_number``.
    """
    from datetime import datetime

    problems: List[RowError] = []

    def flag(column: str, message: str) -> None:
        problems.append(RowError(row_number, column, message))

    email = row.get('email', '')
    if not email:
        flag('email', 'Email is required')
    elif EMAIL_RE.match(email) is None:
        flag('email', f'Invalid email: {email!r}')

    name = row.get('name', '')
    if not name:
        flag('name', 'Name is required')
    elif len(name) > 255:
        flag('name', 'Name exceeds 255 characters')

    dob = row.get('date_of_birth', '')
    if dob:
        try:
            datetime.strptime(dob, '%Y-%m-%d')
        except ValueError:
            flag('date_of_birth',
                 f'Invalid date format (expected YYYY-MM-DD): {dob!r}')

    return ValidationResult(valid=not problems, errors=problems)
SQL Schema
CREATE TABLE import_job (
    id               BIGSERIAL PRIMARY KEY,
    -- gen_random_uuid() is built in from PostgreSQL 13; older versions need pgcrypto
    import_id        UUID NOT NULL UNIQUE DEFAULT gen_random_uuid(),
    user_id          BIGINT NOT NULL,
    filename         TEXT NOT NULL,
    status           TEXT NOT NULL DEFAULT 'pending'
                     CHECK (status IN ('pending','parsing','processing','completed','failed')),
    total_rows       INT NOT NULL DEFAULT 0,
    valid_rows       INT NOT NULL DEFAULT 0,
    failed_rows      INT NOT NULL DEFAULT 0,
    processed_rows   INT NOT NULL DEFAULT 0,
    s3_key           TEXT,  -- original file location
    error_report_key TEXT,  -- S3 key for error CSV download
    created_at       TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    completed_at     TIMESTAMPTZ
);

CREATE TABLE import_row (
    id          BIGSERIAL PRIMARY KEY,
    job_id      BIGINT NOT NULL REFERENCES import_job(id) ON DELETE CASCADE,
    row_number  INT NOT NULL,
    row_hash    TEXT NOT NULL,   -- SHA-256 fingerprint of the CSV row
    status      TEXT NOT NULL DEFAULT 'pending'
                CHECK (status IN ('pending','valid','failed','skipped')),
    payload     JSONB NOT NULL,  -- normalized row dict
    target_id   BIGINT,          -- PK of the created/updated record
    created_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    -- idempotency: a given row hash is stored at most once per job, so a
    -- re-inserted row (or an exact duplicate row in the same file) is rejected
    UNIQUE (job_id, row_hash)
);
-- Trailing row_number lets the worker's
-- "WHERE job_id=... AND status='pending' ORDER BY row_number" scan read rows in index order.
CREATE INDEX idx_ir_job_status ON import_row (job_id, status, row_number);

CREATE TABLE import_error (
    id          BIGSERIAL PRIMARY KEY,
    job_id      BIGINT NOT NULL REFERENCES import_job(id) ON DELETE CASCADE,
    -- CASCADE so deleting an import_row removes its errors instead of being blocked
    row_id      BIGINT REFERENCES import_row(id) ON DELETE CASCADE,
    row_number  INT NOT NULL,
    column_name TEXT,
    message     TEXT NOT NULL,
    created_at  TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- (job_id, row_number) serves per-job error listings ordered by row number.
CREATE INDEX idx_ie_job ON import_error (job_id, row_number);
-- PostgreSQL does not index FK columns automatically; this backs the
-- import_row join and cascading deletes from import_row.
CREATE INDEX idx_ie_row ON import_error (row_id);
Async Job Processing with Progress Updates
import time
from typing import Callable
BATCH_SIZE = 200  # rows per DB transaction


def _process_import_batch(db, job_id: int, rows, insert_fn) -> Tuple[int, int]:
    """Run insert_fn on each row under its own savepoint; return (valid, failed)."""
    valid = failed = 0
    for row in rows:
        # A savepoint per row means a failing insert (which aborts the whole
        # transaction in PostgreSQL if insert_fn shares this connection) only
        # discards this row's work, not the rest of the batch.
        db.execute("SAVEPOINT import_row_sp", ())
        try:
            target_id = insert_fn(row['payload'])
            db.execute(
                "UPDATE import_row SET status='valid', target_id=%s WHERE id=%s",
                (target_id, row['id'])
            )
        except Exception as exc:  # per-row boundary: record the failure and continue
            db.execute("ROLLBACK TO SAVEPOINT import_row_sp", ())
            db.execute(
                "UPDATE import_row SET status='failed' WHERE id=%s", (row['id'],)
            )
            db.execute(
                """INSERT INTO import_error (job_id, row_id, row_number, message)
                   VALUES (%s, %s, %s, %s)""",
                # fall back to the exception type so the message is never blank
                (job_id, row['id'], row['row_number'], str(exc) or type(exc).__name__)
            )
            failed += 1
        else:
            valid += 1
        db.execute("RELEASE SAVEPOINT import_row_sp", ())
    return valid, failed


def process_import_job(
    db,
    job_id: int,
    insert_fn: Callable[[Dict], int],  # domain-specific insert; returns new record PK
    progress_fn: Optional[Callable[[int, int], None]] = None,  # optional websocket/SSE callback
    batch_size: int = BATCH_SIZE,
) -> None:
    """
    Process every pending import_row of ``job_id`` through ``insert_fn``.

    Pending rows are fetched in batches of ``batch_size`` ordered by
    row_number, and each batch is committed independently. Work done before a
    crash is kept, and a rerun picks up the rows still marked 'pending'.
    Inside a batch every row runs under its own SAVEPOINT. If ``insert_fn``
    (or the status update) raises, only that row's changes are rolled back:
    the row is marked 'failed', an import_error record is written, and the
    batch continues.

    Args:
        db: database handle with execute/fetchall/fetchone/commit; rollback()
            is also assumed on the error path (NOTE(review): confirm the
            wrapper exposes it).
        job_id: import_job.id to process.
        insert_fn: called with a row's payload; returns the target record PK.
        progress_fn: optional callback invoked after each committed batch
            with (processed_rows, total_rows).
        batch_size: rows per transaction; defaults to BATCH_SIZE.

    Raises:
        Any exception that escapes per-row handling (e.g. a database error).
        The in-flight batch is rolled back, the job is marked 'failed', and
        the exception is re-raised.
    """
    db.execute(
        "UPDATE import_job SET status='processing' WHERE id=%s", (job_id,)
    )
    db.commit()
    try:
        while True:
            # No OFFSET needed: processed rows leave the 'pending' set.
            rows = db.fetchall(
                """SELECT * FROM import_row
                   WHERE job_id=%s AND status='pending'
                   ORDER BY row_number
                   LIMIT %s""",
                (job_id, batch_size)
            )
            if not rows:
                break
            valid, failed = _process_import_batch(db, job_id, rows, insert_fn)
            # One counter update per batch keeps the import_job row lock short.
            db.execute(
                """UPDATE import_job
                   SET valid_rows=valid_rows+%s,
                       failed_rows=failed_rows+%s,
                       processed_rows=processed_rows+%s
                   WHERE id=%s""",
                (valid, failed, valid + failed, job_id)
            )
            db.commit()  # commit each batch; partial progress is preserved on crash
            if progress_fn:
                counts = db.fetchone(
                    "SELECT total_rows, processed_rows FROM import_job WHERE id=%s",
                    (job_id,)
                )
                progress_fn(counts['processed_rows'], counts['total_rows'])
        db.execute(
            "UPDATE import_job SET status='completed', completed_at=NOW() WHERE id=%s",
            (job_id,)
        )
        db.commit()
    except Exception:
        # Discard the in-flight batch and record the failure so the job does
        # not stay stuck in 'processing'.
        db.rollback()
        db.execute(
            "UPDATE import_job SET status='failed' WHERE id=%s", (job_id,)
        )
        db.commit()
        raise
Error Report Generation
After processing, generate a downloadable error CSV and upload to S3 so the user can fix and re-upload:
import boto3, gzip, io, csv
def generate_error_report(db, s3, job_id: int, bucket: str) -> str:
    """
    Build a CSV of every recorded error for ``job_id``, upload it to S3, and
    store the object key on the job.

    Columns: row_number, column, error, original_data. ``original_data`` is
    the linked import_row payload serialized as JSON (non-ASCII kept
    readable), or '' when the error has no linked import_row.

    Args:
        db: database handle with fetchall/execute/commit.
        s3: client exposing ``put_object`` (e.g. a boto3 S3 client).
        job_id: import_job.id whose errors are reported.
        bucket: destination S3 bucket.

    Returns:
        The S3 key the report was written to.
    """
    import json  # local import keeps this snippet self-contained

    # LEFT JOIN: errors with a NULL row_id must still appear in the report.
    errors = db.fetchall(
        """SELECT ie.row_number, ie.column_name, ie.message, ir.payload
           FROM import_error ie
           LEFT JOIN import_row ir ON ir.id = ie.row_id
           WHERE ie.job_id = %s
           ORDER BY ie.row_number, ie.id""",
        (job_id,)
    )
    buf = io.StringIO()
    writer = csv.writer(buf)
    writer.writerow(['row_number', 'column', 'error', 'original_data'])
    for err in errors:
        payload = err['payload']
        writer.writerow([
            err['row_number'],
            err['column_name'] or '',
            err['message'],
            '' if payload is None else json.dumps(payload, ensure_ascii=False),
        ])
    key = f"import-errors/{job_id}/errors.csv"
    s3.put_object(Bucket=bucket, Key=key, Body=buf.getvalue().encode('utf-8'),
                  ContentType='text/csv; charset=utf-8')
    db.execute("UPDATE import_job SET error_report_key=%s WHERE id=%s", (key, job_id))
    db.commit()
    return key
See also: Shopify Interview Guide
See also: Scale AI Interview Guide 2026: Data Infrastructure, RLHF Pipelines, and ML Engineering
See also: Databricks Interview Guide 2026: Spark Internals, Delta Lake, and Lakehouse Architecture