File Upload System Low-Level Design: Multipart Upload, Resumable Transfers, Virus Scanning, and Storage

Overview

A file upload system must handle large binary transfers reliably — resuming interrupted uploads, scanning for malware, validating content type, and landing files in durable object storage — without routing all bytes through the application server. The standard architecture uses presigned S3 URLs for direct client-to-storage uploads, a chunk tracking table for resumable transfers, and a post-upload processing pipeline that runs virus scanning and type validation before moving files from a staging prefix to their permanent location. This LLD covers the data model, presigned URL flow, resumable chunk protocol, post-upload pipeline, and the key design decisions.

Core Data Model


-- Master upload record — created before the client sends any bytes
CREATE TABLE Upload (
    upload_id       UUID          PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id         BIGINT        NOT NULL,
    filename        VARCHAR(512)  NOT NULL,
    size_bytes      BIGINT        NOT NULL,
    mime_type       VARCHAR(128)  NOT NULL,
    storage_key     VARCHAR(1024),           -- final storage path (set after processing)
    staging_key     VARCHAR(1024),           -- temp S3 key during upload
    checksum_sha256 CHAR(64),                -- client-provided, verified after upload
    status          VARCHAR(32)   NOT NULL DEFAULT 'pending',
    -- pending -> uploading -> uploaded -> scanning -> clean | infected -> processing -> complete | failed
    upload_type     VARCHAR(64)   NOT NULL DEFAULT 'single',  -- 'single', 'multipart'
    multipart_upload_id VARCHAR(512),        -- AWS S3 multipart upload ID (for resumable)
    total_chunks    INT,                     -- for resumable uploads
    error_message   TEXT,
    created_at      TIMESTAMPTZ   NOT NULL DEFAULT NOW(),
    updated_at      TIMESTAMPTZ   NOT NULL DEFAULT NOW(),
    completed_at    TIMESTAMPTZ
);

CREATE INDEX idx_upload_user      ON Upload(user_id, created_at DESC);
CREATE INDEX idx_upload_status    ON Upload(status) WHERE status NOT IN ('complete', 'failed');

-- One row per chunk for resumable multipart uploads
CREATE TABLE UploadChunk (
    chunk_id        BIGSERIAL     PRIMARY KEY,
    upload_id       UUID          NOT NULL REFERENCES Upload(upload_id) ON DELETE CASCADE,
    chunk_number    INT           NOT NULL,   -- 1-based, matches S3 part number
    byte_offset     BIGINT        NOT NULL,
    size_bytes      INT           NOT NULL,
    etag            VARCHAR(128),             -- S3 ETag returned after part upload
    checksum_md5    CHAR(32),                 -- client-provided per-chunk checksum
    status          VARCHAR(16)   NOT NULL DEFAULT 'pending',  -- pending, uploaded, verified
    uploaded_at     TIMESTAMPTZ,
    PRIMARY KEY (upload_id, chunk_number)     -- override to ensure uniqueness
);

-- Post-upload processing log
CREATE TABLE UploadProcessingEvent (
    event_id        BIGSERIAL     PRIMARY KEY,
    upload_id       UUID          NOT NULL REFERENCES Upload(upload_id),
    stage           VARCHAR(64)   NOT NULL,   -- 'virus_scan', 'mime_validation', 'thumbnail', 'move'
    status          VARCHAR(16)   NOT NULL,   -- 'started', 'passed', 'failed'
    details         JSONB         NOT NULL DEFAULT '{}',
    created_at      TIMESTAMPTZ   NOT NULL DEFAULT NOW()
);

Presigned URL Flow (Single File Upload)

For files under a configured size threshold (e.g. 100 MB), a single presigned PUT URL is sufficient. The client uploads directly to S3 — the application server never handles the bytes.


import uuid
import boto3
from datetime import datetime, timezone

s3 = boto3.client('s3', region_name='us-east-1')
STAGING_BUCKET = 'uploads-staging'
MAX_FILE_SIZE   = 100 * 1024 * 1024   # 100 MB enforced via S3 condition
PRESIGN_TTL_SEC = 900                  # 15 minutes to complete the PUT

ALLOWED_MIME_TYPES = {
    'image/jpeg', 'image/png', 'image/webp', 'image/gif',
    'application/pdf', 'video/mp4', 'text/csv',
    'application/zip',
}

def create_upload(db_conn, user_id, filename, size_bytes, mime_type, checksum_sha256=None):
    """
    Validate the request, create an Upload record, and return a presigned PUT URL.
    Client uses the URL to upload directly to S3; app server never touches the bytes.
    """
    if mime_type not in ALLOWED_MIME_TYPES:
        raise ValueError(f"MIME type not allowed: {mime_type}")

    if size_bytes > MAX_FILE_SIZE:
        raise ValueError(f"File too large: {size_bytes} bytes (max {MAX_FILE_SIZE})")

    upload_id   = str(uuid.uuid4())
    staging_key = f"staging/{user_id}/{upload_id}/{filename}"

    with db_conn.cursor() as cur:
        cur.execute("""
            INSERT INTO Upload
                (upload_id, user_id, filename, size_bytes, mime_type,
                 staging_key, checksum_sha256, status, upload_type)
            VALUES (%s, %s, %s, %s, %s, %s, %s, 'pending', 'single')
        """, (upload_id, user_id, filename, size_bytes, mime_type,
               staging_key, checksum_sha256))
    db_conn.commit()

    # Generate presigned PUT URL with content-length restriction
    presigned_url = s3.generate_presigned_url(
        'put_object',
        Params={
            'Bucket':        STAGING_BUCKET,
            'Key':           staging_key,
            'ContentType':   mime_type,
            'ContentLength': size_bytes,
            # Optional: enforce checksum at S3 level
            **(({'ChecksumSHA256': checksum_sha256}) if checksum_sha256 else {}),
        },
        ExpiresIn=PRESIGN_TTL_SEC,
    )

    return {
        'upload_id':    upload_id,
        'presigned_url': presigned_url,
        'method':       'PUT',
        'expires_in':   PRESIGN_TTL_SEC,
    }


def handle_upload_complete_webhook(upload_id, db_conn):
    """
    Called by S3 Event Notification (via SQS/SNS) when the PUT completes.
    Transitions status and enqueues the post-upload pipeline.
    """
    with db_conn.cursor() as cur:
        cur.execute("""
            UPDATE Upload SET status='uploaded', updated_at=NOW()
            WHERE upload_id=%s AND status='pending'
        """, (upload_id,))
    db_conn.commit()
    enqueue_job('process_upload', {'upload_id': upload_id})

Resumable Multipart Upload

For files over 100 MB, or where the client needs resume capability (mobile, slow connections), the system uses S3 Multipart Upload with per-chunk presigned URLs.


CHUNK_SIZE = 5 * 1024 * 1024  # 5 MB — S3 minimum part size

def create_multipart_upload(db_conn, user_id, filename, size_bytes, mime_type, checksum_sha256=None):
    """Initialize an S3 multipart upload and record chunk metadata."""
    import math

    upload_id   = str(uuid.uuid4())
    staging_key = f"staging/{user_id}/{upload_id}/{filename}"
    total_chunks = math.ceil(size_bytes / CHUNK_SIZE)

    # Initiate S3 multipart upload
    mpu_response = s3.create_multipart_upload(
        Bucket=STAGING_BUCKET,
        Key=staging_key,
        ContentType=mime_type,
    )
    s3_multipart_id = mpu_response['UploadId']

    with db_conn.cursor() as cur:
        cur.execute("""
            INSERT INTO Upload
                (upload_id, user_id, filename, size_bytes, mime_type,
                 staging_key, checksum_sha256, status, upload_type,
                 multipart_upload_id, total_chunks)
            VALUES (%s, %s, %s, %s, %s, %s, %s, 'uploading', 'multipart', %s, %s)
        """, (upload_id, user_id, filename, size_bytes, mime_type,
               staging_key, checksum_sha256, s3_multipart_id, total_chunks))

        # Pre-create chunk rows so client can query upload progress
        for chunk_num in range(1, total_chunks + 1):
            byte_offset = (chunk_num - 1) * CHUNK_SIZE
            chunk_size  = min(CHUNK_SIZE, size_bytes - byte_offset)
            cur.execute("""
                INSERT INTO UploadChunk
                    (upload_id, chunk_number, byte_offset, size_bytes, status)
                VALUES (%s, %s, %s, %s, 'pending')
            """, (upload_id, chunk_num, byte_offset, chunk_size))
    db_conn.commit()

    return {'upload_id': upload_id, 'total_chunks': total_chunks}


def get_chunk_presigned_url(db_conn, upload_id, chunk_number):
    """Return a presigned URL for uploading a specific chunk."""
    with db_conn.cursor() as cur:
        cur.execute("""
            SELECT u.staging_key, u.multipart_upload_id, c.status
            FROM Upload u
            JOIN UploadChunk c ON c.upload_id = u.upload_id
            WHERE u.upload_id=%s AND c.chunk_number=%s
        """, (upload_id, chunk_number))
        row = cur.fetchone()

    if not row:
        raise ValueError("Unknown upload or chunk number")

    staging_key, s3_multipart_id, chunk_status = row

    if chunk_status == 'uploaded':
        return {'already_uploaded': True}  # client can skip re-upload

    presigned_url = s3.generate_presigned_url(
        'upload_part',
        Params={
            'Bucket':     STAGING_BUCKET,
            'Key':        staging_key,
            'UploadId':   s3_multipart_id,
            'PartNumber': chunk_number,
        },
        ExpiresIn=3600,
    )
    return {'presigned_url': presigned_url, 'method': 'PUT', 'part_number': chunk_number}


def confirm_chunk_uploaded(db_conn, upload_id, chunk_number, etag):
    """Called by client after successfully uploading a chunk."""
    with db_conn.cursor() as cur:
        cur.execute("""
            UPDATE UploadChunk
            SET status='uploaded', etag=%s, uploaded_at=NOW()
            WHERE upload_id=%s AND chunk_number=%s
        """, (etag, upload_id, chunk_number))

        # Check if all chunks are now uploaded
        cur.execute("""
            SELECT COUNT(*) FROM UploadChunk
            WHERE upload_id=%s AND status != 'uploaded'
        """, (upload_id,))
        remaining = cur.fetchone()[0]
    db_conn.commit()

    if remaining == 0:
        complete_multipart_upload(db_conn, upload_id)


def complete_multipart_upload(db_conn, upload_id):
    """Tell S3 to assemble the parts and kick off the processing pipeline."""
    with db_conn.cursor() as cur:
        cur.execute("""
            SELECT u.staging_key, u.multipart_upload_id,
                   ARRAY_AGG(c.chunk_number ORDER BY c.chunk_number) AS nums,
                   ARRAY_AGG(c.etag        ORDER BY c.chunk_number) AS etags
            FROM Upload u
            JOIN UploadChunk c ON c.upload_id = u.upload_id
            WHERE u.upload_id=%s
            GROUP BY u.staging_key, u.multipart_upload_id
        """, (upload_id,))
        row = cur.fetchone()

    staging_key, s3_multipart_id, nums, etags = row
    parts = [{'PartNumber': n, 'ETag': e} for n, e in zip(nums, etags)]

    s3.complete_multipart_upload(
        Bucket=STAGING_BUCKET,
        Key=staging_key,
        UploadId=s3_multipart_id,
        MultipartUpload={'Parts': parts},
    )

    with db_conn.cursor() as cur:
        cur.execute("""
            UPDATE Upload SET status='uploaded', updated_at=NOW() WHERE upload_id=%s
        """, (upload_id,))
    db_conn.commit()
    enqueue_job('process_upload', {'upload_id': upload_id})

Post-Upload Processing Pipeline

After upload is confirmed, a processing job runs the following stages in sequence. Each stage logs a row to UploadProcessingEvent.


import magic       # python-magic for MIME detection
import hashlib
import subprocess

PERMANENT_BUCKET = 'uploads-permanent'

def process_upload(upload_id, db_conn):
    """Sequential post-upload pipeline: scan -> validate -> thumbnail -> move."""
    upload = fetch_upload(db_conn, upload_id)

    update_status(db_conn, upload_id, 'scanning')

    # Stage 1: Virus scan
    scan_result = virus_scan(upload['staging_key'])
    log_event(db_conn, upload_id, 'virus_scan', scan_result['status'],
              {'engine': scan_result['engine'], 'signature': scan_result.get('signature')})

    if scan_result['status'] == 'infected':
        # Delete from staging immediately; do not move to permanent storage
        s3.delete_object(Bucket=STAGING_BUCKET, Key=upload['staging_key'])
        update_status(db_conn, upload_id, 'infected',
                      error=f"Virus detected: {scan_result['signature']}")
        notify_user_infection(upload['user_id'], upload['filename'])
        return

    # Stage 2: MIME type validation (server-side magic bytes check)
    update_status(db_conn, upload_id, 'processing')
    s3_obj = s3.get_object(Bucket=STAGING_BUCKET, Key=upload['staging_key'])
    header_bytes = s3_obj['Body'].read(512)   # read first 512 bytes for magic

    detected_mime = magic.from_buffer(header_bytes, mime=True)
    log_event(db_conn, upload_id, 'mime_validation',
              'passed' if detected_mime == upload['mime_type'] else 'failed',
              {'declared': upload['mime_type'], 'detected': detected_mime})

    if detected_mime != upload['mime_type']:
        s3.delete_object(Bucket=STAGING_BUCKET, Key=upload['staging_key'])
        update_status(db_conn, upload_id, 'failed',
                      error=f"MIME mismatch: declared {upload['mime_type']}, got {detected_mime}")
        return

    # Stage 3: Optional thumbnail generation for images
    if upload['mime_type'].startswith('image/'):
        generate_thumbnail(upload_id, upload['staging_key'], upload['user_id'], db_conn)

    # Stage 4: Move from staging to permanent storage
    permanent_key = f"uploads/{upload['user_id']}/{upload_id}/{upload['filename']}"
    s3.copy_object(
        CopySource={'Bucket': STAGING_BUCKET, 'Key': upload['staging_key']},
        Bucket=PERMANENT_BUCKET,
        Key=permanent_key,
        MetadataDirective='COPY',
    )
    s3.delete_object(Bucket=STAGING_BUCKET, Key=upload['staging_key'])

    log_event(db_conn, upload_id, 'move', 'passed', {'permanent_key': permanent_key})

    with db_conn.cursor() as cur:
        cur.execute("""
            UPDATE Upload
            SET status='complete', storage_key=%s, completed_at=NOW(), updated_at=NOW()
            WHERE upload_id=%s
        """, (permanent_key, upload_id))
    db_conn.commit()


def virus_scan(s3_key):
    """
    Download file to /tmp and run ClamAV scan.
    In production, delegate to a VirusTotal API or a dedicated scanning sidecar.
    """
    local_path = f"/tmp/scan_{s3_key.replace('/', '_')}"
    s3.download_file(STAGING_BUCKET, s3_key, local_path)

    result = subprocess.run(
        ['clamscan', '--no-summary', local_path],
        capture_output=True, text=True
    )
    import os; os.unlink(local_path)

    if result.returncode == 0:
        return {'status': 'clean', 'engine': 'clamav'}
    elif result.returncode == 1:
        # Virus found — extract signature name from output
        signature = result.stdout.split(':')[-1].strip() if ':' in result.stdout else 'unknown'
        return {'status': 'infected', 'engine': 'clamav', 'signature': signature}
    else:
        raise RuntimeError(f"ClamAV error: {result.stderr}")

Key Design Decisions

  • Presigned URLs offload bandwidth from app servers: By generating a presigned S3 PUT URL and returning it to the client, the application server handles only the metadata request (a few hundred bytes) rather than proxying gigabytes of file data. This means a 4-vCPU app server can handle hundreds of concurrent large file uploads without bandwidth saturation. S3 handles durability, multipart assembly, and transfer acceleration automatically.
  • Client-side checksum prevents silent corruption: The client computes a SHA-256 checksum before upload and sends it with the metadata request. S3 can be configured to reject the PUT if the content hash does not match (via ChecksumSHA256 condition). The server also stores the declared checksum in Upload.checksum_sha256 for later audit. This catches corruption during transfer without a separate verification download.
  • Virus scan runs before serving, not in-band: Scanning happens as an async pipeline stage after the upload completes but before the file is moved to permanent storage or made accessible to other users. Infected files are deleted immediately and never reach permanent storage. Running the scan asynchronously means the upload API response is fast and the client doesn’t wait for ClamAV.
  • Max file size enforced at presign time: The ContentLength condition on the presigned URL means S3 will reject PUTs that exceed the declared size. This prevents a client from uploading a 10 GB file after requesting a presigned URL for 10 MB. Combined with the server-side size check during metadata creation, enforcement is belt-and-suspenders.

{
“@context”: “https://schema.org”,
“@type”: “FAQPage”,
“mainEntity”: [
{
“@type”: “Question”,
“name”: “How does resumable multipart upload work?”,
“acceptedAnswer”: {
“@type”: “Answer”,
“text”: “Each part gets a presigned PUT URL; the client tracks uploaded parts and on resume fetches the list of completed parts, uploading only missing ones.”
}
},
{
“@type”: “Question”,
“name”: “How is virus scanning integrated into the upload pipeline?”,
“acceptedAnswer”: {
“@type”: “Answer”,
“text”: “After S3 upload, a worker downloads the file, runs ClamAV scan, and either moves the file to permanent storage or marks it quarantined.”
}
},
{
“@type”: “Question”,
“name”: “How are MIME type spoofing attacks prevented?”,
“acceptedAnswer”: {
“@type”: “Answer”,
“text”: “python-magic reads the file's magic bytes (not the Content-Type header) to verify the actual file format matches the declared type.”
}
},
{
“@type”: “Question”,
“name”: “What triggers thumbnail generation?”,
“acceptedAnswer”: {
“@type”: “Answer”,
“text”: “An S3 event notification publishes to SQS; a worker consumes the event, generates thumbnails using Pillow, and stores them in a /thumbnails/ prefix.”
}
}
]
}

See also: Netflix Interview Guide 2026: Streaming Architecture, Recommendation Systems, and Engineering Excellence

See also: Scale AI Interview Guide 2026: Data Infrastructure, RLHF Pipelines, and ML Engineering

See also: Meta Interview Guide 2026: Facebook, Instagram, WhatsApp Engineering

Scroll to Top