Bulk Operations System Low-Level Design

What is a Bulk Operations System?

A bulk operations system allows users to create, update, or delete thousands of records in a single request: bulk-importing products into a catalog, mass-updating order statuses, batch-deleting spam accounts, or sending a campaign to 500K users. The core design challenge is that HTTP requests typically time out after 30-60 seconds, while processing 100K records takes minutes. The solution is asynchronous processing with job tracking, per-record partial-failure handling, and idempotent retries.

Requirements

  • Accept bulk create/update/delete via CSV upload or JSON array
  • Process up to 1 million records per job
  • Async: return job ID immediately; report progress and results when done
  • Partial success: process all records; report which succeeded and which failed
  • Idempotent: re-running a job with the same data should not create duplicates
  • Result report: download CSV with per-row status (success/error/skipped)
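A minimal sketch of the submit-then-poll contract behind the async requirement, assuming an in-memory dict and `queue.Queue` in place of the real database and message queue. `submit_bulk_job`, `get_job_status`, and the `/bulk-jobs/{job_id}` URL are hypothetical names chosen for illustration. The point is that the request handler only records and enqueues the job, so it returns well within any HTTP timeout regardless of file size.

```python
import queue
import uuid
from datetime import datetime, timezone

# Hypothetical in-memory stand-ins for the BulkJob table and the work queue
JOBS = {}
JOB_QUEUE = queue.Queue()


def submit_bulk_job(user_id, operation, input_file_key):
    """Record the job and enqueue it; no rows are processed during the request."""
    job_id = str(uuid.uuid4())
    JOBS[job_id] = {
        "job_id": job_id,
        "user_id": user_id,
        "operation": operation,            # e.g. 'product.create'
        "status": "QUEUED",
        "input_file_key": input_file_key,  # file already uploaded to object storage
        "processed": 0,
        "created_at": datetime.now(timezone.utc),
    }
    JOB_QUEUE.put(job_id)  # a worker dequeues this and runs the processing pipeline
    # 202 Accepted: the request is valid, but the work happens asynchronously
    return 202, {"job_id": job_id, "status": "QUEUED",
                 "status_url": f"/bulk-jobs/{job_id}"}


def get_job_status(job_id):
    """Polling endpoint: report the job's current status and progress."""
    job = JOBS.get(job_id)
    if job is None:
        return 404, {"error": "unknown job"}
    return 200, {key: job[key] for key in ("job_id", "status", "processed")}
```

A client calls `submit_bulk_job` once, then polls `get_job_status` (or waits for the completion notification) until the status is COMPLETED or FAILED.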

Data Model

BulkJob(
    job_id          UUID PRIMARY KEY,
    user_id         UUID NOT NULL,
    operation       VARCHAR,           -- 'product.create', 'order.update_status'
    status          ENUM(QUEUED, PROCESSING, COMPLETED, FAILED),
    input_file_key  VARCHAR,           -- S3 key of uploaded CSV/JSON
    result_file_key VARCHAR,           -- S3 key of result CSV (set on completion)
    total_records   INT,
    processed       INT DEFAULT 0,
    succeeded       INT DEFAULT 0,
    failed          INT DEFAULT 0,
    skipped         INT DEFAULT 0,
    created_at      TIMESTAMPTZ,
    completed_at    TIMESTAMPTZ
)
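One way to mirror this table in application code, sketched with plain dataclasses rather than any particular ORM. The `JobStatus` enum matches the `status` column; the `ALLOWED_TRANSITIONS` map, the `transition` method, and the `progress_pct` property are illustrative additions (not part of the schema) that stop a worker from, for example, moving a COMPLETED job back to PROCESSING.

```python
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Optional


class JobStatus(Enum):
    QUEUED = "QUEUED"
    PROCESSING = "PROCESSING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"


# Illustrative lifecycle (assumption): FAILED -> QUEUED models re-running a failed job
ALLOWED_TRANSITIONS = {
    JobStatus.QUEUED: {JobStatus.PROCESSING, JobStatus.FAILED},
    JobStatus.PROCESSING: {JobStatus.COMPLETED, JobStatus.FAILED},
    JobStatus.FAILED: {JobStatus.QUEUED},
    JobStatus.COMPLETED: set(),
}


@dataclass
class BulkJob:
    user_id: uuid.UUID
    operation: str                      # e.g. 'product.create'
    input_file_key: str                 # S3 key of uploaded CSV/JSON
    job_id: uuid.UUID = field(default_factory=uuid.uuid4)
    status: JobStatus = JobStatus.QUEUED
    result_file_key: Optional[str] = None
    total_records: Optional[int] = None
    processed: int = 0
    succeeded: int = 0
    failed: int = 0
    skipped: int = 0
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    completed_at: Optional[datetime] = None

    def transition(self, new_status: JobStatus) -> None:
        """Move to new_status, rejecting transitions the lifecycle does not allow."""
        if new_status not in ALLOWED_TRANSITIONS[self.status]:
            raise ValueError(f"{self.status.value} -> {new_status.value} not allowed")
        self.status = new_status
        if new_status in (JobStatus.COMPLETED, JobStatus.FAILED):
            self.completed_at = datetime.now(timezone.utc)
        elif new_status is JobStatus.QUEUED:
            self.completed_at = None  # re-queued, so the job is no longer finished

    @property
    def progress_pct(self) -> Optional[float]:
        """Percentage of rows processed, or None while total_records is unknown."""
        if not self.total_records:
            return None
        return 100.0 * self.processed / self.total_records
```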

Processing Pipeline

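import codecs
import csv
import io
import tempfile

RESULT_COLUMNS = ['status', 'id', 'error']

def process_bulk_job(job_id):
    job = db.get(BulkJob, job_id)
    db.update(job_id, status='PROCESSING')
    processed = succeeded = failed = skipped = 0

    try:
        # Stream input file from S3; boto3 returns a byte stream, so decode it for csv
        # ('utf-8-sig' also drops the byte-order mark some spreadsheet exports add)
        body = s3.get_object(Bucket=BUCKET, Key=job.input_file_key)['Body']
        reader = csv.DictReader(codecs.getreader('utf-8-sig')(body))
        result_key = f'bulk-results/{job_id}/results.csv'

        # Spool results to a local temp file instead of holding up to 1M rows in memory
        with tempfile.TemporaryFile() as spool:
            out = io.TextIOWrapper(spool, encoding='utf-8', newline='')
            writer = csv.DictWriter(out, fieldnames=(reader.fieldnames or []) + RESULT_COLUMNS,
                                    extrasaction='ignore')
            writer.writeheader()

            for row in reader:
                try:
                    result = process_row(job.operation, row)
                    writer.writerow({**row, 'status': 'success', 'id': result.id, 'error': ''})
                    succeeded += 1
                except SkipException as e:
                    writer.writerow({**row, 'status': 'skipped', 'id': '', 'error': str(e)})
                    skipped += 1
                except ValidationError as e:
                    writer.writerow({**row, 'status': 'error', 'id': '', 'error': str(e)})
                    failed += 1
                processed += 1

                # Update progress every 1000 rows
                if processed % 1000 == 0:
                    db.update(job_id, processed=processed, succeeded=succeeded,
                              failed=failed, skipped=skipped)

            out.detach()  # flush buffered text without closing the spool file
            spool.seek(0)
            s3.upload_fileobj(spool, BUCKET, result_key)
    except Exception:
        # Infrastructure failure (S3, DB, unexpected bug): mark the job FAILED
        # instead of leaving it stuck in PROCESSING, then re-raise for the worker
        db.update(job_id, status='FAILED', processed=processed, succeeded=succeeded,
                  failed=failed, skipped=skipped, completed_at=now())
        raise

    db.update(job_id, status='COMPLETED', result_file_key=result_key,
              processed=processed, succeeded=succeeded, failed=failed,
              skipped=skipped, completed_at=now())

    notify_user(job.user_id, job_id=job_id, stats={
        'total': processed, 'succeeded': succeeded,
        'failed': failed, 'skipped': skipped})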
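Because the pipeline relies on external `db`, `s3`, and `process_row` helpers, the following stripped-down sketch isolates just its per-row accounting so it can run locally. Everything external is an assumption here: input and output are in-memory CSV strings, and the hypothetical `create_row` handler skips already-seen `external_id` values and rejects non-numeric prices. What it demonstrates is that a bad row produces an `error` entry in the result report instead of aborting the job.

```python
import csv
import io


class SkipException(Exception):
    """Row intentionally not processed (e.g. already imported)."""


class ValidationError(Exception):
    """Row rejected because its data is invalid."""


def create_row(row, existing_ids):
    """Hypothetical create handler: returns a new id or raises Skip/ValidationError."""
    if row["external_id"] in existing_ids:
        raise SkipException(f"Already exists: {row['external_id']}")
    try:
        float(row["price"])
    except ValueError:
        raise ValidationError(f"invalid price: {row['price']!r}") from None
    existing_ids.add(row["external_id"])
    return f"prod_{len(existing_ids)}"


def run_job(input_csv, existing_ids):
    """Process every row without stopping on bad ones; return (counts, result CSV)."""
    reader = csv.DictReader(io.StringIO(input_csv))
    out = io.StringIO()
    fieldnames = list(reader.fieldnames or []) + ["status", "id", "error"]
    writer = csv.DictWriter(out, fieldnames=fieldnames)
    writer.writeheader()
    counts = {"processed": 0, "succeeded": 0, "failed": 0, "skipped": 0}
    for row in reader:
        try:
            new_id = create_row(row, existing_ids)
            writer.writerow({**row, "status": "success", "id": new_id, "error": ""})
            counts["succeeded"] += 1
        except SkipException as e:
            writer.writerow({**row, "status": "skipped", "id": "", "error": str(e)})
            counts["skipped"] += 1
        except ValidationError as e:
            writer.writerow({**row, "status": "error", "id": "", "error": str(e)})
            counts["failed"] += 1
        counts["processed"] += 1
    return counts, out.getvalue()
```

Feeding it a file with one good row, one row with a non-numeric price, and a repeat of the first row yields one success, one error, and one skip, and the result CSV carries a status for each of the three.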

Idempotency via External ID

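# For bulk create: include a client-generated external_id in each row. If a record
# with this external_id already exists, skip the row instead of creating a duplicate.
# Rows without an external_id cannot be deduplicated.
from decimal import Decimal, InvalidOperation

def process_create_row(row):
    external_id = row.get('external_id') or None
    if external_id:
        existing = db.get_by_external_id(external_id)
        if existing:
            raise SkipException(f'Already exists: {existing.id}')

    # CSV values are strings: validate them so bad data is a per-row ValidationError
    name = (row.get('name') or '').strip()
    if not name:
        raise ValidationError('name is required')
    try:
        price = Decimal(row.get('price') or '')
    except InvalidOperation:
        raise ValidationError(f"invalid price: {row.get('price')!r}")

    try:
        return db.create(Product(
            name=name,
            price=price,
            external_id=external_id,
            # ... other product fields
        ))
    except IntegrityError:
        # UNIQUE(external_id) closes the race where another worker inserted it after our check
        raise SkipException(f'Already exists: external_id={external_id}')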

Transaction Strategy: Per-Row vs Batch

Two approaches:

Per-row transactions: each row is committed in its own transaction, so a failure in row 500 doesn’t roll back rows 1-499. The result is partial success. This suits imports where each record is independent.

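Batch transactions: process N rows per transaction (e.g., 1000). If any row in the batch fails, the whole batch rolls back and is retried row by row. This is faster when most rows succeed (one commit per batch instead of one per row) and still degrades gracefully to per-row behavior when some fail.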

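# Batch transaction with per-row fallback
def process_batch(operation, rows):
    try:
        with db.transaction():
            results = [process_row(operation, row) for row in rows]
        return [{'status': 'success', 'id': r.id, 'error': ''} for r in results]
    except Exception:
        # Batch rolled back: redo each row in its own transaction so only bad rows fail.
        # Good rows run twice, so process_row must have no non-DB side effects.
        return [safe_process_row(operation, row) for row in rows]

def safe_process_row(operation, row):
    try:
        with db.transaction():  # per-row transaction
            result = process_row(operation, row)
        return {'status': 'success', 'id': result.id, 'error': ''}
    except SkipException as e:
        return {'status': 'skipped', 'id': '', 'error': str(e)}
    except ValidationError as e:
        return {'status': 'error', 'id': '', 'error': str(e)}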
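To see the fallback with real transactions, here is a self-contained sketch using `sqlite3` as a stand-in database. The `orders` table, the `CHECK` constraint used to make a row fail, and the function names are illustrative assumptions. The first attempt inserts the whole batch in one transaction; when a bad row aborts it, nothing from the batch is committed, and each row is then retried in its own transaction.

```python
import sqlite3


def make_db():
    """In-memory stand-in database; the CHECK constraint lets a row fail on bad data."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE orders (id INTEGER PRIMARY KEY, status TEXT NOT NULL "
        "CHECK (status IN ('PENDING', 'SHIPPED', 'CANCELLED')))")
    return conn


def insert_row(conn, row):
    conn.execute("INSERT INTO orders (id, status) VALUES (?, ?)",
                 (row["id"], row["status"]))


def process_one(conn, row):
    """Per-row transaction: a failure here affects only this row."""
    try:
        with conn:  # commit on success, roll back on exception
            insert_row(conn, row)
        return {"id": row["id"], "status": "success", "error": ""}
    except sqlite3.Error as e:
        return {"id": row["id"], "status": "error", "error": str(e)}


def process_batch(conn, rows):
    """Try all rows in one transaction; on any failure, retry them one by one."""
    try:
        with conn:  # the whole batch commits or rolls back together
            for row in rows:
                insert_row(conn, row)
        return [{"id": r["id"], "status": "success", "error": ""} for r in rows]
    except sqlite3.Error:
        # Nothing from the batch was committed, so retrying each row is safe
        return [process_one(conn, row) for row in rows]
```

Rows that precede the failing one are executed twice (once in the rolled-back batch, once individually). That extra work is the price of the fallback and is usually acceptable when most batches are clean.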

Key Design Decisions

  • S3 for input/output files: decouples upload from processing; the result CSV stays available for re-download
  • Async job + queue: processing 1M records takes minutes, while HTTP requests time out in 30-60s
  • Per-row error tracking: partial success is more useful than all-or-nothing failure
  • external_id for idempotency: the entire job is safe to retry because rows whose external_id already exists are skipped (rows without one are not deduplicated)
  • Progress updates every 1000 rows: a useful UX signal without excessive DB writes (1,000 progress writes for a 1M-row job instead of 1M)

