Data Import Service Low-Level Design: File Validation, Streaming Parse, and Idempotent Upsert

A data import service ingests user-supplied files into the system's database. The main challenges are: handling large files without memory exhaustion, validating data quality before committing, deduplicating records on retry, and giving users actionable error feedback without hiding problems.

Import Job Schema

  • job_id — UUID
  • user_id — owner
  • file_s3_key — location of the uploaded file
  • entity_type — target table or entity (e.g., products, contacts)
  • mapping_config — JSON: maps file columns to database fields
  • status — UPLOADED | VALIDATING | PROCESSING | COMPLETED | FAILED
  • total_rows, processed_rows, error_rows, created_at
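
As an illustration, the schema above could be modeled as a Python dataclass. This is a minimal sketch: the field types, the defaults, and the `JobStatus` and `ImportJob` names are assumptions, not part of the original design.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from uuid import UUID, uuid4


class JobStatus(str, Enum):
    """Lifecycle states listed in the schema."""
    UPLOADED = "UPLOADED"
    VALIDATING = "VALIDATING"
    PROCESSING = "PROCESSING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"


@dataclass
class ImportJob:
    user_id: str                    # owner
    file_s3_key: str                # location of the uploaded file
    entity_type: str                # e.g. "products", "contacts"
    mapping_config: dict[str, str]  # assumed shape: file column -> database field
    job_id: UUID = field(default_factory=uuid4)
    status: JobStatus = JobStatus.UPLOADED
    total_rows: int = 0
    processed_rows: int = 0
    error_rows: int = 0
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
```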

Upload Flow

Never upload files through the application server — it adds latency and consumes server memory. Instead:

  1. Client requests a presigned S3 PUT URL from the API
  2. Client uploads the file directly to S3 (bypasses the application server entirely)
  3. Client notifies the API of upload completion, providing the S3 key
  4. API creates the import job record and enqueues processing
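
The API side of this flow (steps 1, 3, and 4) might look like the sketch below. Everything here is hypothetical: `presign` is an injected callable that in production would probably wrap something like boto3's `generate_presigned_url("put_object", ...)`, and the `jobs` dict and `queue` list stand in for the job table and the work queue.

```python
import uuid
from typing import Callable


def request_upload_url(user_id: str, presign: Callable[[str], str]) -> dict:
    """Step 1: hand the client an object key and a presigned PUT URL for it."""
    # Namespacing the key by user lets complete_upload check ownership cheaply.
    s3_key = f"imports/{user_id}/{uuid.uuid4().hex}"
    return {"s3_key": s3_key, "upload_url": presign(s3_key)}


def complete_upload(user_id: str, s3_key: str, entity_type: str,
                    mapping_config: dict, jobs: dict, queue: list) -> str:
    """Steps 3-4: create the job record and enqueue it for processing."""
    if not s3_key.startswith(f"imports/{user_id}/"):
        raise PermissionError("s3_key does not belong to this user")
    job_id = str(uuid.uuid4())
    jobs[job_id] = {
        "user_id": user_id,
        "file_s3_key": s3_key,
        "entity_type": entity_type,
        "mapping_config": mapping_config,
        "status": "UPLOADED",
    }
    queue.append(job_id)  # stand-in for publishing to a real queue
    return job_id
```

The file bytes never pass through these functions; the application server only issues the URL and records the resulting key.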

Validation Phase

Before processing any data rows, stream-parse the file to validate structure:

  • Check that required column headers are present and correctly named
  • Verify the file is parseable (encoding, delimiter, valid JSON if applicable)
  • Validate the mapping_config references columns that exist in the file

Structural errors are reported immediately with status FAILED before any rows are written. This saves time when the user uploaded the wrong file.
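
A minimal sketch of the header and mapping checks for a CSV file follows. It assumes `mapping_config` maps file columns to database fields, and `validate_structure` and `required_fields` are illustrative names. Only the header row is read here; a fuller check would also scan the body for encoding and delimiter problems.

```python
import csv
import io


def validate_structure(stream, mapping_config, required_fields):
    """Return a list of structural errors; an empty list means the file passes."""
    try:
        header = next(csv.reader(stream), None)
    except (csv.Error, UnicodeDecodeError) as exc:
        return [f"file is not parseable: {exc}"]
    if not header:
        return ["file is empty or has no header row"]

    errors = []
    columns = {name.strip() for name in header}
    # Every column referenced by the mapping must exist in the file.
    for column in mapping_config:
        if column not in columns:
            errors.append(f"mapped column '{column}' not found in file header")
    # Every required database field must be fed by some column.
    mapped_fields = set(mapping_config.values())
    for field_name in required_fields:
        if field_name not in mapped_fields:
            errors.append(f"required field '{field_name}' is not mapped to any column")
    return errors
```

A non-empty result would set the job to FAILED before any row is written.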

Row Processing

Stream rows from S3 in chunks of 1,000. For each chunk:

  1. Parse — apply column mapping, type coercion (string to int, date parsing)
  2. Validate — check required fields, value ranges, format constraints (email regex, positive integer)
  3. Dedup lookup — query by external_id
  4. Upsert — INSERT or UPDATE based on lookup result
  5. Collect errors — accumulate row-level errors without stopping
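
The parse, validate, and collect-errors steps can be sketched as below; the dedup lookup and upsert are omitted here. The field names (`external_id`, `email`, `quantity`), the loose email pattern, and the helper names are assumptions chosen for illustration.

```python
import csv
import io
import re
from itertools import islice

EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")  # deliberately loose pattern


def iter_chunks(rows, size=1000):
    """Yield lists of up to `size` rows without materializing the whole file."""
    iterator = iter(rows)
    while chunk := list(islice(iterator, size)):
        yield chunk


def parse_and_validate(raw, mapping_config):
    """Map one raw row to a record and return (record, errors)."""
    record = {field: (raw.get(column) or "").strip()
              for column, field in mapping_config.items()}
    errors = []
    if not record.get("external_id"):
        errors.append(("external_id", "required field is missing", ""))
    email = record.get("email", "")
    if email and not EMAIL_RE.match(email):
        errors.append(("email", "invalid email format", email))
    raw_quantity = record.get("quantity", "")
    try:
        quantity = int(raw_quantity)  # type coercion: string to int
    except ValueError:
        quantity = 0
    if quantity > 0:
        record["quantity"] = quantity
    else:
        errors.append(("quantity", "must be a positive integer", raw_quantity))
    return record, errors


def process_chunk(chunk, mapping_config, first_row_number):
    """Return (valid_records, errors); a bad row never stops the chunk."""
    valid, errors = [], []
    for offset, raw in enumerate(chunk):
        record, row_errors = parse_and_validate(raw, mapping_config)
        if row_errors:
            errors.extend((first_row_number + offset, *err) for err in row_errors)
        else:
            valid.append(record)
    return valid, errors
```

Feeding `csv.DictReader` over a streamed file body into `iter_chunks` keeps at most one chunk in memory at a time.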

Deduplication

Each importable entity has an external_id field — a stable identifier from the user's source system. On every row, look up an existing record by external_id:

  • Found: UPDATE the existing record with new field values
  • Not found: INSERT a new record

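This makes imports idempotent. Re-running the same file after a worker crash produces no duplicates: rows written before the crash are found by external_id and updated with the same values, which is safe to repeat. The guarantee assumes a unique constraint on external_id; without one, two workers processing the same rows concurrently could both miss the lookup and insert twice.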
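
One way to get the same effect is to let the database perform the lookup and the write in a single statement. The sketch below swaps the separate lookup for an `INSERT ... ON CONFLICT DO UPDATE` upsert, shown against SQLite (which supports this syntax from version 3.24) with a hypothetical `products` table; the table, columns, and `upsert_chunk` name are assumptions.

```python
import sqlite3

# Hypothetical target table; the primary key on external_id is what makes
# the ON CONFLICT clause (and therefore idempotency) work.
SCHEMA = """
CREATE TABLE IF NOT EXISTS products (
    external_id TEXT PRIMARY KEY,
    name        TEXT NOT NULL,
    price       REAL NOT NULL
)
"""

UPSERT = """
INSERT INTO products (external_id, name, price)
VALUES (:external_id, :name, :price)
ON CONFLICT(external_id) DO UPDATE SET
    name  = excluded.name,
    price = excluded.price
"""


def upsert_chunk(conn: sqlite3.Connection, records: list) -> None:
    """Insert new rows and update existing ones, one transaction per chunk."""
    with conn:  # commits on success, rolls back if any row raises
        conn.executemany(UPSERT, records)
```

Calling `upsert_chunk` twice with the same records leaves the table in the same state as calling it once, which is the property a retry after a crash relies on.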

Row-Level Error Reporting

A row validation failure does not stop the import. The worker records the error with:

  • Row number in the original file
  • Field name that failed
  • Error message and the invalid value

Processing continues for subsequent rows. At completion, the job summary includes total errors and a downloadable error report CSV so users can fix only the failed rows and re-import.
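
A sketch of the error record and the CSV report writer follows. The `RowError` class, the column order, and `write_error_report` are illustrative choices, not a prescribed format.

```python
import csv
import io
from dataclasses import asdict, dataclass, fields


@dataclass
class RowError:
    row_number: int  # row number in the original file
    field: str       # field name that failed
    message: str     # human-readable error message
    value: str       # the invalid value as it appeared in the file


def write_error_report(errors, out) -> None:
    """Write the accumulated errors as CSV to any text file-like object."""
    writer = csv.DictWriter(out, fieldnames=[f.name for f in fields(RowError)])
    writer.writeheader()
    for error in errors:
        writer.writerow(asdict(error))
```

In practice `out` would likely be a temporary file that is then uploaded next to the source file, so the job summary can link to it.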

Error Threshold and Abort

If the error rate exceeds 20% of processed rows, abort the import and mark it FAILED. A 20% error rate indicates a systemic problem — wrong file, wrong mapping, or corrupted data — and proceeding would import mostly bad data. Report the error rate and the first 50 errors so the user can diagnose the root cause.
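
The abort check can be sketched as below. The `min_rows` guard is an added assumption that the text does not specify: without some minimum sample, a single bad row at the top of the file would register as a 100% error rate and abort the import immediately.

```python
def should_abort(processed_rows: int, error_rows: int,
                 threshold: float = 0.20, min_rows: int = 100) -> bool:
    """Return True once the error rate exceeds the threshold.

    min_rows is a hypothetical guard: the rate is not evaluated until
    enough rows have been processed for it to be meaningful.
    """
    if processed_rows < min_rows:
        return False
    return error_rows / processed_rows > threshold


def abort_summary(processed_rows: int, errors: list, limit: int = 50) -> dict:
    """Build the failure report: the error rate plus the first `limit` errors."""
    rate = len(errors) / processed_rows if processed_rows else 0.0
    return {"error_rate": rate, "first_errors": errors[:limit]}
```

Evaluating `should_abort` once per chunk, after its errors are collected, is usually frequent enough.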

Rollback Strategy

Two approaches with different tradeoffs:

  • Transaction rollback: wrap the entire import in a DB transaction; on critical failure, roll back all rows. Guarantees consistency but holds a long transaction, causing lock contention and replication lag for large imports.
  • Per-chunk commit with error accumulation: commit each chunk independently; on failure, report errors and leave successful rows in place. Better for large imports; the user may need to re-import only the failed rows.

For imports under 100K rows, use transaction rollback. For larger imports, use per-chunk commit with the error threshold abort as a safeguard.
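
The difference between the two strategies can be sketched against SQLite as follows. The function names and the `write_chunk` callback are hypothetical, and a production system would use its own database driver and failure criteria.

```python
import sqlite3


def import_all_or_nothing(conn, chunks, write_chunk) -> bool:
    """Wrap the whole import in one transaction; any failure undoes every row."""
    try:
        with conn:  # single transaction spanning all chunks
            for chunk in chunks:
                write_chunk(conn, chunk)
    except sqlite3.Error:
        return False
    return True


def import_per_chunk(conn, chunks, write_chunk) -> int:
    """Commit each chunk independently; return how many chunks were committed."""
    committed = 0
    for chunk in chunks:
        try:
            with conn:  # one short transaction per chunk
                write_chunk(conn, chunk)
        except sqlite3.Error:
            break  # earlier chunks stay in place
        committed += 1
    return committed
```

With per-chunk commits, a failing chunk is rolled back as a unit while earlier chunks remain, which is why the idempotent upsert matters when the job is retried.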

Progress Tracking and Notification

Update job.processed_rows every 1,000 rows. Expose progress via the polling status endpoint. On completion, send an email summary: rows imported successfully, rows failed, link to the error report if applicable.

Frequently Asked Questions

What validation steps should a data import service perform before persisting any records?

Validation occurs in two phases. Structural validation checks file format, encoding (UTF-8), delimiter consistency, header presence, and row count limits without parsing data values. Semantic validation checks each row: required fields present, data types match schema, string lengths within bounds, enum values in allowed set, foreign key references exist, and uniqueness constraints not violated within the batch. Reject the entire file on structural failure; for semantic errors, either reject the whole import or collect per-row errors and return a detailed error report, depending on the product contract.

How do you stream-parse a multi-gigabyte CSV import file without exhausting server memory?

Read the file from object storage using a streaming HTTP GET (range requests or chunked download) and pipe bytes through a streaming CSV parser (e.g., Papa Parse in streaming mode, Python's csv.reader over a buffered reader, or Apache Commons CSV). Process rows one at a time or in small batches (e.g., 500 rows), accumulating them into a write buffer. Flush the buffer to the database via a bulk INSERT when full. This keeps memory usage to O(batch_size) regardless of file size. Track the last successfully committed row offset so a retry can resume after a crash.

How do you implement idempotent upserts in a data import pipeline to handle duplicate submissions safely?

Assign each import job a client-provided or server-generated idempotency key stored in an imports table. On re-submission with the same key, return the original job result without reprocessing. For row-level idempotency, use a natural business key (e.g., external_id) and execute an INSERT … ON CONFLICT (external_id) DO UPDATE SET … with a condition that only updates when the new row differs (comparing a hash of column values). This ensures re-importing the same file produces identical state and retrying a partial failure is safe.

How would you design the import job state machine to support pause, resume, and partial rollback?

Model the job with states: PENDING, VALIDATING, PROCESSING, PAUSED, COMPLETED, FAILED. Store the last committed batch offset in the jobs table. On PAUSED or FAILED, workers stop after finishing the current batch and persist the offset. On resume, a new worker picks up from the stored offset. For rollback, import into a staging table first; only copy to the live table after the entire file is validated. This makes rollback a simple DELETE from staging rather than issuing compensating deletes against production data. Use a database transaction per batch with explicit savepoints when supported.
