Bulk Data Import System Low-Level Design
Bulk import is one of those features that looks straightforward until a user uploads a 500,000-row CSV with inconsistent date formats and your database connection pool exhausts itself. A well-designed bulk import system streams the file rather than loading it entirely into memory, validates rows individually with structured error accumulation, processes work asynchronously with real-time progress, and delivers partial success semantics: valid rows are committed, invalid rows are reported back to the user with line-level detail.
Streaming CSV Parse
Never load the entire file into memory. Python's csv.DictReader is a lazy iterator, and reading from a streaming file handle means memory usage stays bounded regardless of file size:
import csv
import hashlib
import io
from typing import Iterator, Tuple, Dict, Any, List
def parse_csv_stream(
    file_obj,  # binary file-like object (S3 streaming body, uploaded file, etc.)
    expected_headers: List[str]
) -> Iterator[Tuple[int, Dict[str, str], str]]:
    """
    Lazily yield (row_number, row_dict, row_hash) for each data row.

    row_hash is the SHA-256 of the normalized cell values joined with
    commas in header order, used for idempotent re-processing. Rows are
    streamed one at a time so memory stays bounded for any file size.

    Raises:
        ValueError: if any expected header is missing from the file.
    """
    reader = csv.DictReader(io.TextIOWrapper(file_obj, encoding='utf-8-sig'))
    # Normalize header names: strip whitespace, lowercase
    reader.fieldnames = [h.strip().lower() for h in (reader.fieldnames or [])]
    missing = set(expected_headers) - set(reader.fieldnames)
    if missing:
        raise ValueError(f"CSV missing required columns: {missing}")
    for row_num, raw_row in enumerate(reader, start=2):  # row 1 is header
        # DictReader fills missing cells with None (short rows) and parks
        # extra cells in a list under the key None; blank the former and
        # drop the latter so downstream code only ever sees plain strings.
        row = {
            k.strip().lower(): (v or '').strip()
            for k, v in raw_row.items()
            if k is not None
        }
        # Hash the normalized values in header order. Joining
        # raw_row.values() directly would raise TypeError on None cells.
        raw_line = ','.join(row.get(h, '') for h in reader.fieldnames).encode('utf-8')
        row_hash = hashlib.sha256(raw_line).hexdigest()
        yield row_num, row, row_hash
Per-Row Validation with Error Accumulation
Fail fast per-row, not globally. Collect all errors so the user can fix everything in one round-trip:
import re
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class RowError:
    """A single validation failure, addressed to one row and column."""
    row_number: int
    column: str
    message: str


@dataclass
class ValidationResult:
    """Outcome of validating one row; errors is empty iff valid."""
    valid: bool
    errors: List[RowError] = field(default_factory=list)


# BUG FIX: the original pattern used a literal 's' where the escape '\s'
# was intended (rejecting every address containing the letter 's') and an
# unescaped '.' that matched any character. [^@\s] = "no '@', no whitespace".
EMAIL_RE = re.compile(r'^[^@\s]+@[^@\s]+\.[^@\s]+$')


def validate_user_row(row_number: int, row: Dict[str, str]) -> ValidationResult:
    """
    Validate one normalized CSV row for the user import.

    Accumulates every error rather than stopping at the first, so the
    user can fix the whole row in a single round-trip.

    Args:
        row_number: 1-based line number in the source file (for reporting).
        row: normalized row dict as produced by parse_csv_stream.
    """
    errors: List[RowError] = []
    email = row.get('email', '')
    if not email:
        errors.append(RowError(row_number, 'email', 'Email is required'))
    elif not EMAIL_RE.match(email):
        errors.append(RowError(row_number, 'email', f'Invalid email: {email!r}'))
    name = row.get('name', '')
    if not name:
        errors.append(RowError(row_number, 'name', 'Name is required'))
    elif len(name) > 255:
        errors.append(RowError(row_number, 'name', 'Name exceeds 255 characters'))
    dob = row.get('date_of_birth', '')
    if dob:
        from datetime import datetime  # local import keeps the snippet self-contained
        try:
            datetime.strptime(dob, '%Y-%m-%d')
        except ValueError:
            errors.append(RowError(row_number, 'date_of_birth',
                                   f'Invalid date format (expected YYYY-MM-DD): {dob!r}'))
    return ValidationResult(valid=len(errors) == 0, errors=errors)
SQL Schema
-- One record per uploaded file. Aggregate counters (total/valid/failed/
-- processed) are maintained during processing so clients can poll this
-- single row for progress.
CREATE TABLE import_job (
id BIGSERIAL PRIMARY KEY,
import_id UUID NOT NULL UNIQUE DEFAULT gen_random_uuid(), -- public handle; gen_random_uuid() needs PG13+ or pgcrypto
user_id BIGINT NOT NULL, -- owner of the upload
filename TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'pending' -- job lifecycle state
CHECK (status IN ('pending','parsing','processing','completed','failed')),
total_rows INT NOT NULL DEFAULT 0,
valid_rows INT NOT NULL DEFAULT 0,
failed_rows INT NOT NULL DEFAULT 0,
processed_rows INT NOT NULL DEFAULT 0, -- valid + failed; drives the progress percentage
s3_key TEXT, -- original file location
error_report_key TEXT, -- S3 key for error CSV download
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
completed_at TIMESTAMPTZ -- set when the job reaches a terminal status
);
-- One record per CSV data row, staged before domain insertion so
-- processing can resume after a crash and duplicate rows can be skipped.
CREATE TABLE import_row (
id BIGSERIAL PRIMARY KEY,
job_id BIGINT NOT NULL REFERENCES import_job(id) ON DELETE CASCADE,
row_number INT NOT NULL, -- line number in the file; data starts at 2 (line 1 is the header)
row_hash TEXT NOT NULL, -- SHA-256 of raw CSV line
status TEXT NOT NULL DEFAULT 'pending'
CHECK (status IN ('pending','valid','failed','skipped')),
payload JSONB NOT NULL, -- normalized row dict
target_id BIGINT, -- PK of the created/updated record
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE (job_id, row_hash) -- idempotency: skip duplicate rows
);
-- Supports the batch fetch: WHERE job_id=? AND status='pending'
CREATE INDEX idx_ir_job_status ON import_row (job_id, status);
-- Line-level error detail for a job; feeds the downloadable error report.
CREATE TABLE import_error (
id BIGSERIAL PRIMARY KEY,
job_id BIGINT NOT NULL REFERENCES import_job(id) ON DELETE CASCADE,
row_id BIGINT REFERENCES import_row(id), -- nullable: an error may not map to a staged row
row_number INT NOT NULL,
column_name TEXT, -- NULL for row-level (non-column) errors
message TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Supports the error-report query: WHERE job_id=?
CREATE INDEX idx_ie_job ON import_error (job_id);
Async Job Processing with Progress Updates
import time
from typing import Callable
BATCH_SIZE = 200  # rows per DB transaction; bounds lock time and crash redo


def process_import_job(
    db,
    job_id: int,
    insert_fn: Callable[[Dict], int],  # domain-specific insert; returns new record PK
    progress_fn: Optional[Callable[[int, int], None]] = None  # optional websocket/SSE callback
) -> None:
    """
    Process pending import_row records for job_id.

    Each batch is committed independently, so a crash mid-job preserves
    every previously committed batch (partial success semantics). Rows
    whose insert_fn raises are marked 'failed' and logged to
    import_error; processing continues with the remaining rows.

    Args:
        db: DB handle exposing execute / fetchall / fetchone / commit.
        job_id: import_job.id to process.
        insert_fn: called with the row payload; returns the new record PK.
        progress_fn: optional callback invoked after each committed batch
            with (processed_rows, total_rows).
    """
    db.execute(
        "UPDATE import_job SET status='processing' WHERE id=%s", (job_id,)
    )
    db.commit()
    while True:
        # No OFFSET needed: every processed row leaves the 'pending'
        # state, so re-querying from the start always returns fresh work.
        # (The original kept an unused `offset` counter left over from
        # offset pagination — removed.)
        rows = db.fetchall(
            """SELECT * FROM import_row
               WHERE job_id=%s AND status='pending'
               ORDER BY row_number
               LIMIT %s""",
            (job_id, BATCH_SIZE)
        )
        if not rows:
            break
        for row in rows:
            try:
                target_id = insert_fn(row['payload'])
                db.execute(
                    "UPDATE import_row SET status='valid', target_id=%s WHERE id=%s",
                    (target_id, row['id'])
                )
                db.execute(
                    "UPDATE import_job SET valid_rows=valid_rows+1, processed_rows=processed_rows+1 WHERE id=%s",
                    (job_id,)
                )
            except Exception as exc:
                # A row-level failure must not abort the batch: record the
                # error and keep going so the other rows still land.
                db.execute(
                    "UPDATE import_row SET status='failed' WHERE id=%s", (row['id'],)
                )
                db.execute(
                    """INSERT INTO import_error (job_id, row_id, row_number, message)
                       VALUES (%s, %s, %s, %s)""",
                    (job_id, row['id'], row['row_number'], str(exc))
                )
                db.execute(
                    "UPDATE import_job SET failed_rows=failed_rows+1, processed_rows=processed_rows+1 WHERE id=%s",
                    (job_id,)
                )
        db.commit()  # commit each batch; partial progress is preserved on crash
        if progress_fn:
            counters = db.fetchone("SELECT total_rows, processed_rows FROM import_job WHERE id=%s", (job_id,))
            progress_fn(counters['processed_rows'], counters['total_rows'])
    # Mark completed
    db.execute(
        "UPDATE import_job SET status='completed', completed_at=NOW() WHERE id=%s",
        (job_id,)
    )
    db.commit()
Error Report Generation
After processing, generate a downloadable error CSV and upload to S3 so the user can fix and re-upload:
import boto3, gzip, io, csv
def generate_error_report(db, s3, job_id: int, bucket: str) -> str:
    """
    Build a downloadable CSV of all errors for a job, upload it to S3,
    and record its key on the import_job row.

    Args:
        db: DB handle exposing fetchall / execute / commit.
        s3: boto3-style S3 client (put_object).
        job_id: import_job.id whose errors are exported.
        bucket: destination S3 bucket name.

    Returns:
        The S3 key of the uploaded error report.
    """
    import json  # BUG FIX: json was used below but never imported

    # LEFT JOIN: import_error.row_id is nullable, so errors that never
    # reached a staged row must still appear in the report.
    errors = db.fetchall(
        """SELECT ie.row_number, ie.column_name, ie.message, ir.payload
           FROM import_error ie
           LEFT JOIN import_row ir ON ir.id = ie.row_id
           WHERE ie.job_id = %s
           ORDER BY ie.row_number""",
        (job_id,)
    )
    buf = io.StringIO()
    writer = csv.writer(buf)
    writer.writerow(['row_number', 'column', 'error', 'original_data'])
    for e in errors:
        payload = json.dumps(e['payload']) if e['payload'] is not None else ''
        writer.writerow([e['row_number'], e['column_name'] or '', e['message'], payload])
    key = f"import-errors/{job_id}/errors.csv"
    s3.put_object(Bucket=bucket, Key=key, Body=buf.getvalue().encode('utf-8'),
                  ContentType='text/csv')
    db.execute("UPDATE import_job SET error_report_key=%s WHERE id=%s", (key, job_id))
    db.commit()
    return key
{
"@context": "https://schema.org",
"@type": "FAQPage",
"mainEntity": [
{
"@type": "Question",
"name": "How do you handle duplicate rows in a bulk import without re-importing the entire file?",
"acceptedAnswer": {
"@type": "Answer",
"text": "Hash each raw CSV line with SHA-256 and store the hash in import_row with a UNIQUE constraint on (job_id, row_hash). If a row with that hash already exists, the INSERT is skipped via ON CONFLICT DO NOTHING. This makes re-processing idempotent: re-uploading the same file after a partial failure only processes previously unprocessed rows."
}
},
{
"@type": "Question",
"name": "What does partial success mean in a bulk import and how do you implement it?",
"acceptedAnswer": {
"@type": "Answer",
"text": "Partial success means valid rows are committed to the database even when other rows in the same file fail validation or insertion. Implement it by committing each processing batch independently in separate transactions. Mark failed rows with status='failed' and record detailed errors in import_error without rolling back previously committed valid rows."
}
},
{
"@type": "Question",
"name": "How do you report import progress to the user in real time?",
"acceptedAnswer": {
"@type": "Answer",
"text": "Store processed_rows and total_rows on the import_job record. After each committed batch, update these counters. The client polls a GET /import/{job_id}/status endpoint or receives updates via Server-Sent Events (SSE). The processed_rows / total_rows ratio gives an accurate progress percentage without the overhead of a separate progress table."
}
},
{
"@type": "Question",
"name": "How should large CSV files be uploaded before async processing begins?",
"acceptedAnswer": {
"@type": "Answer",
"text": "Use a two-step flow: first create an import_job record and get a presigned S3 PUT URL; the client uploads directly to S3. Once the upload completes, the client calls a confirm endpoint which enqueues the processing job. This keeps large file bytes off your application servers and lets S3 handle the upload reliability."
}
}
]
}
{
"@context": "https://schema.org",
"@type": "FAQPage",
"mainEntity": [
{
"@type": "Question",
"name": "How is streaming CSV parsing implemented?",
"acceptedAnswer": {
"@type": "Answer",
"text": "csv.DictReader wraps a file-like object from S3 or a direct upload stream; rows are processed one at a time without loading the entire file into memory."
}
},
{
"@type": "Question",
"name": "How does partial success work in bulk import?",
"acceptedAnswer": {
"@type": "Answer",
"text": "Valid rows are committed in batches; invalid rows are accumulated in import_error with row number and error message, allowing the job to complete with a partial result."
}
},
{
"@type": "Question",
"name": "How is import idempotency guaranteed?",
"acceptedAnswer": {
"@type": "Answer",
"text": "Each row is hashed (import_id + row content); rows with an existing hash are skipped, allowing safe re-submission of partially failed imports."
}
},
{
"@type": "Question",
"name": "How is import progress reported?",
"acceptedAnswer": {
"@type": "Answer",
"text": "The import_job row stores processed_rows and total_rows; the API polls this row to return percentage progress to the client."
}
}
]
}
See also: Shopify Interview Guide
See also: Scale AI Interview Guide 2026: Data Infrastructure, RLHF Pipelines, and ML Engineering
See also: Databricks Interview Guide 2026: Spark Internals, Delta Lake, and Lakehouse Architecture