Overview
A file upload system must handle large binary transfers reliably — resuming interrupted uploads, scanning for malware, validating content type, and landing files in durable object storage — without routing all bytes through the application server. The standard architecture uses presigned S3 URLs for direct client-to-storage uploads, a chunk tracking table for resumable transfers, and a post-upload processing pipeline that runs virus scanning and type validation before moving files from a staging prefix to their permanent location. This LLD covers the data model, presigned URL flow, resumable chunk protocol, post-upload pipeline, and the key design decisions.
Core Data Model
-- Master upload record — created before the client sends any bytes
CREATE TABLE Upload (
    upload_id           UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id             BIGINT NOT NULL,
    filename            VARCHAR(512) NOT NULL,
    size_bytes          BIGINT NOT NULL,
    mime_type           VARCHAR(128) NOT NULL,
    storage_key         VARCHAR(1024),  -- final storage path (set after processing)
    staging_key         VARCHAR(1024),  -- temp S3 key during upload
    checksum_sha256     CHAR(64),       -- client-provided, verified after upload
    status              VARCHAR(32) NOT NULL DEFAULT 'pending',
    -- pending -> uploading -> uploaded -> scanning -> clean | infected -> processing -> complete | failed
    upload_type         VARCHAR(64) NOT NULL DEFAULT 'single',  -- 'single', 'multipart'
    multipart_upload_id VARCHAR(512),   -- AWS S3 multipart upload ID (for resumable)
    total_chunks        INT,            -- for resumable uploads
    error_message       TEXT,
    created_at          TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at          TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    completed_at        TIMESTAMPTZ
);
CREATE INDEX idx_upload_user ON Upload(user_id, created_at DESC);
CREATE INDEX idx_upload_status ON Upload(status) WHERE status NOT IN ('complete', 'failed');
-- One row per chunk for resumable multipart uploads.
-- The natural key (upload_id, chunk_number) is the primary key; a table
-- cannot declare two PRIMARY KEYs, so there is no separate surrogate id.
CREATE TABLE UploadChunk (
    upload_id    UUID NOT NULL REFERENCES Upload(upload_id) ON DELETE CASCADE,
    chunk_number INT NOT NULL,          -- 1-based, matches S3 part number
    byte_offset  BIGINT NOT NULL,
    size_bytes   INT NOT NULL,
    etag         VARCHAR(128),          -- S3 ETag returned after part upload
    checksum_md5 CHAR(32),              -- client-provided per-chunk checksum
    status       VARCHAR(16) NOT NULL DEFAULT 'pending',  -- pending, uploaded, verified
    uploaded_at  TIMESTAMPTZ,
    PRIMARY KEY (upload_id, chunk_number)
);
-- Post-upload processing log
CREATE TABLE UploadProcessingEvent (
    event_id   BIGSERIAL PRIMARY KEY,
    upload_id  UUID NOT NULL REFERENCES Upload(upload_id),
    stage      VARCHAR(64) NOT NULL,  -- 'virus_scan', 'mime_validation', 'thumbnail', 'move'
    status     VARCHAR(16) NOT NULL,  -- 'started', 'passed', 'failed'
    details    JSONB NOT NULL DEFAULT '{}',
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
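The status comment on Upload.status describes a state machine that application code should enforce before any UPDATE. A minimal sketch of that enforcement (the transition table and helper are illustrative, not part of the schema):

```python
# Allowed status transitions, mirroring the comment on Upload.status.
# 'infected', 'complete', and 'failed' are terminal states.
ALLOWED_TRANSITIONS = {
    'pending':    {'uploading', 'uploaded', 'failed'},
    'uploading':  {'uploaded', 'failed'},
    'uploaded':   {'scanning', 'failed'},
    'scanning':   {'clean', 'infected', 'processing', 'failed'},
    'clean':      {'processing'},
    'processing': {'complete', 'failed'},
}

def can_transition(current: str, target: str) -> bool:
    """Return True if the state machine permits current -> target."""
    return target in ALLOWED_TRANSITIONS.get(current, set())
```

Checking transitions in code (or with a CHECK in a trigger) prevents, for example, an infected upload from ever being marked complete.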
Presigned URL Flow (Single File Upload)
For files under a configured size threshold (e.g. 100 MB), a single presigned PUT URL is sufficient. The client uploads directly to S3 — the application server never handles the bytes.
import uuid
import boto3
from datetime import datetime, timezone
s3 = boto3.client('s3', region_name='us-east-1')
STAGING_BUCKET = 'uploads-staging'
MAX_FILE_SIZE = 100 * 1024 * 1024 # 100 MB enforced via S3 condition
PRESIGN_TTL_SEC = 900 # 15 minutes to complete the PUT
ALLOWED_MIME_TYPES = {
    'image/jpeg', 'image/png', 'image/webp', 'image/gif',
    'application/pdf', 'video/mp4', 'text/csv',
    'application/zip',
}
def create_upload(db_conn, user_id, filename, size_bytes, mime_type, checksum_sha256=None):
    """
    Validate the request, create an Upload record, and return a presigned PUT URL.
    Client uses the URL to upload directly to S3; app server never touches the bytes.
    """
    if mime_type not in ALLOWED_MIME_TYPES:
        raise ValueError(f"MIME type not allowed: {mime_type}")
    if size_bytes > MAX_FILE_SIZE:
        raise ValueError(f"File too large: {size_bytes} bytes (max {MAX_FILE_SIZE})")

    upload_id = str(uuid.uuid4())
    staging_key = f"staging/{user_id}/{upload_id}/{filename}"

    with db_conn.cursor() as cur:
        cur.execute("""
            INSERT INTO Upload
                (upload_id, user_id, filename, size_bytes, mime_type,
                 staging_key, checksum_sha256, status, upload_type)
            VALUES (%s, %s, %s, %s, %s, %s, %s, 'pending', 'single')
        """, (upload_id, user_id, filename, size_bytes, mime_type,
              staging_key, checksum_sha256))
    db_conn.commit()

    # Generate presigned PUT URL with content-length restriction
    presigned_url = s3.generate_presigned_url(
        'put_object',
        Params={
            'Bucket': STAGING_BUCKET,
            'Key': staging_key,
            'ContentType': mime_type,
            'ContentLength': size_bytes,
            # Optional: enforce checksum at S3 level
            **({'ChecksumSHA256': checksum_sha256} if checksum_sha256 else {}),
        },
        ExpiresIn=PRESIGN_TTL_SEC,
    )
    return {
        'upload_id': upload_id,
        'presigned_url': presigned_url,
        'method': 'PUT',
        'expires_in': PRESIGN_TTL_SEC,
    }
def handle_upload_complete_webhook(upload_id, db_conn):
    """
    Called by S3 Event Notification (via SQS/SNS) when the PUT completes.
    Transitions status and enqueues the post-upload pipeline.
    """
    with db_conn.cursor() as cur:
        cur.execute("""
            UPDATE Upload SET status='uploaded', updated_at=NOW()
            WHERE upload_id=%s AND status='pending'
        """, (upload_id,))
    db_conn.commit()
    enqueue_job('process_upload', {'upload_id': upload_id})
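On the client side, the returned URL is consumed with an ordinary HTTP PUT. A sketch, assuming the requests library is available and that the x-amz-checksum-sha256 header is how S3's additional-checksums feature expects the base64-encoded digest when ChecksumSHA256 was signed into the URL (both assumptions, not from the source):

```python
import base64
import hashlib

def build_put_headers(data: bytes, mime_type: str, with_checksum: bool = True) -> dict:
    """Build the headers the client sends with the presigned PUT.
    x-amz-checksum-sha256 carries the base64-encoded SHA-256 digest,
    matching the ChecksumSHA256 parameter the server signed into the URL."""
    headers = {
        'Content-Type': mime_type,
        'Content-Length': str(len(data)),
    }
    if with_checksum:
        digest = hashlib.sha256(data).digest()
        headers['x-amz-checksum-sha256'] = base64.b64encode(digest).decode('ascii')
    return headers

def upload_file(presigned_url: str, data: bytes, mime_type: str) -> None:
    """PUT the bytes straight to S3; no app server in the data path."""
    import requests  # assumed available on the client
    resp = requests.put(presigned_url, data=data,
                        headers=build_put_headers(data, mime_type))
    resp.raise_for_status()
```

If the checksum or Content-Length does not match what was signed, S3 rejects the PUT, which is exactly the enforcement the design decisions below rely on.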
Resumable Multipart Upload
For files over 100 MB, or where the client needs resume capability (mobile, slow connections), the system uses S3 Multipart Upload with per-chunk presigned URLs.
import math

CHUNK_SIZE = 5 * 1024 * 1024  # 5 MB — S3 minimum part size

def create_multipart_upload(db_conn, user_id, filename, size_bytes, mime_type, checksum_sha256=None):
    """Initialize an S3 multipart upload and record chunk metadata."""
    upload_id = str(uuid.uuid4())
    staging_key = f"staging/{user_id}/{upload_id}/{filename}"
    total_chunks = math.ceil(size_bytes / CHUNK_SIZE)

    # Initiate S3 multipart upload
    mpu_response = s3.create_multipart_upload(
        Bucket=STAGING_BUCKET,
        Key=staging_key,
        ContentType=mime_type,
    )
    s3_multipart_id = mpu_response['UploadId']

    with db_conn.cursor() as cur:
        cur.execute("""
            INSERT INTO Upload
                (upload_id, user_id, filename, size_bytes, mime_type,
                 staging_key, checksum_sha256, status, upload_type,
                 multipart_upload_id, total_chunks)
            VALUES (%s, %s, %s, %s, %s, %s, %s, 'uploading', 'multipart', %s, %s)
        """, (upload_id, user_id, filename, size_bytes, mime_type,
              staging_key, checksum_sha256, s3_multipart_id, total_chunks))
        # Pre-create chunk rows so client can query upload progress
        for chunk_num in range(1, total_chunks + 1):
            byte_offset = (chunk_num - 1) * CHUNK_SIZE
            chunk_size = min(CHUNK_SIZE, size_bytes - byte_offset)
            cur.execute("""
                INSERT INTO UploadChunk
                    (upload_id, chunk_number, byte_offset, size_bytes, status)
                VALUES (%s, %s, %s, %s, 'pending')
            """, (upload_id, chunk_num, byte_offset, chunk_size))
    db_conn.commit()
    return {'upload_id': upload_id, 'total_chunks': total_chunks}
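As a sanity check on the loop above, the chunk layout can be recomputed standalone (the chunk_layout helper is illustrative and mirrors the per-chunk arithmetic in create_multipart_upload):

```python
import math

CHUNK = 5 * 1024 * 1024  # mirrors CHUNK_SIZE above

def chunk_layout(size_bytes: int) -> list:
    """Recompute the (chunk_number, byte_offset, size) rows the loop inserts.
    Every chunk is CHUNK bytes except possibly the last."""
    total = math.ceil(size_bytes / CHUNK)
    return [(n, (n - 1) * CHUNK, min(CHUNK, size_bytes - (n - 1) * CHUNK))
            for n in range(1, total + 1)]
```

A 12 MB file, for instance, yields three parts of 5 MB, 5 MB, and 2 MB, which satisfies S3's rule that only the final part may be smaller than the 5 MB minimum.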
def get_chunk_presigned_url(db_conn, upload_id, chunk_number):
    """Return a presigned URL for uploading a specific chunk."""
    with db_conn.cursor() as cur:
        cur.execute("""
            SELECT u.staging_key, u.multipart_upload_id, c.status
            FROM Upload u
            JOIN UploadChunk c ON c.upload_id = u.upload_id
            WHERE u.upload_id=%s AND c.chunk_number=%s
        """, (upload_id, chunk_number))
        row = cur.fetchone()
    if not row:
        raise ValueError("Unknown upload or chunk number")
    staging_key, s3_multipart_id, chunk_status = row
    if chunk_status == 'uploaded':
        return {'already_uploaded': True}  # client can skip re-upload
    presigned_url = s3.generate_presigned_url(
        'upload_part',
        Params={
            'Bucket': STAGING_BUCKET,
            'Key': staging_key,
            'UploadId': s3_multipart_id,
            'PartNumber': chunk_number,
        },
        ExpiresIn=3600,
    )
    return {'presigned_url': presigned_url, 'method': 'PUT', 'part_number': chunk_number}
def confirm_chunk_uploaded(db_conn, upload_id, chunk_number, etag):
    """Called by client after successfully uploading a chunk."""
    with db_conn.cursor() as cur:
        cur.execute("""
            UPDATE UploadChunk
            SET status='uploaded', etag=%s, uploaded_at=NOW()
            WHERE upload_id=%s AND chunk_number=%s
        """, (etag, upload_id, chunk_number))
        # Check if all chunks are now uploaded. Under concurrent confirmations
        # two callers could both observe zero remaining; in production, guard
        # this check with a row lock (e.g. SELECT ... FOR UPDATE on Upload).
        cur.execute("""
            SELECT COUNT(*) FROM UploadChunk
            WHERE upload_id=%s AND status != 'uploaded'
        """, (upload_id,))
        remaining = cur.fetchone()[0]
    db_conn.commit()
    if remaining == 0:
        complete_multipart_upload(db_conn, upload_id)
def complete_multipart_upload(db_conn, upload_id):
    """Tell S3 to assemble the parts and kick off the processing pipeline."""
    with db_conn.cursor() as cur:
        cur.execute("""
            SELECT u.staging_key, u.multipart_upload_id,
                   ARRAY_AGG(c.chunk_number ORDER BY c.chunk_number) AS nums,
                   ARRAY_AGG(c.etag ORDER BY c.chunk_number) AS etags
            FROM Upload u
            JOIN UploadChunk c ON c.upload_id = u.upload_id
            WHERE u.upload_id=%s
            GROUP BY u.staging_key, u.multipart_upload_id
        """, (upload_id,))
        row = cur.fetchone()
    staging_key, s3_multipart_id, nums, etags = row
    parts = [{'PartNumber': n, 'ETag': e} for n, e in zip(nums, etags)]
    s3.complete_multipart_upload(
        Bucket=STAGING_BUCKET,
        Key=staging_key,
        UploadId=s3_multipart_id,
        MultipartUpload={'Parts': parts},
    )
    with db_conn.cursor() as cur:
        cur.execute("""
            UPDATE Upload SET status='uploaded', updated_at=NOW() WHERE upload_id=%s
        """, (upload_id,))
    db_conn.commit()
    enqueue_job('process_upload', {'upload_id': upload_id})
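From the client's perspective, resuming is just asking the server which chunks are still pending and re-driving only those. A transport-agnostic sketch (read_chunk and upload_chunk are hypothetical injected helpers wrapping the endpoints above, not functions from the source):

```python
def chunks_to_resume(chunk_statuses: dict) -> list:
    """Given {chunk_number: status} as reported by the server,
    return the chunk numbers still needing upload, in order."""
    return sorted(n for n, s in chunk_statuses.items() if s != 'uploaded')

def resume_upload(upload_id, chunk_statuses, read_chunk, upload_chunk) -> dict:
    """Drive the resume loop. read_chunk(n) -> bytes reads the part from
    local storage; upload_chunk(upload_id, n, data) -> etag PUTs it via a
    chunk presigned URL. Both are injected (hypothetical) helpers.
    Returns {chunk_number: etag} for the parts uploaded in this session;
    the client reports each etag back via the confirm endpoint."""
    etags = {}
    for n in chunks_to_resume(chunk_statuses):
        etags[n] = upload_chunk(upload_id, n, read_chunk(n))
    return etags
```

Because already-uploaded parts are skipped entirely, an interrupted 10 GB transfer that had completed 90% of its parts only re-sends the remaining 10%.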
Post-Upload Processing Pipeline
After upload is confirmed, a processing job runs the following stages in sequence. Each stage logs a row to UploadProcessingEvent.
import magic # python-magic for MIME detection
import hashlib
import subprocess
PERMANENT_BUCKET = 'uploads-permanent'
def process_upload(upload_id, db_conn):
    """Sequential post-upload pipeline: scan -> validate -> thumbnail -> move."""
    upload = fetch_upload(db_conn, upload_id)
    update_status(db_conn, upload_id, 'scanning')

    # Stage 1: Virus scan
    scan_result = virus_scan(upload['staging_key'])
    log_event(db_conn, upload_id, 'virus_scan', scan_result['status'],
              {'engine': scan_result['engine'], 'signature': scan_result.get('signature')})
    if scan_result['status'] == 'infected':
        # Delete from staging immediately; do not move to permanent storage
        s3.delete_object(Bucket=STAGING_BUCKET, Key=upload['staging_key'])
        update_status(db_conn, upload_id, 'infected',
                      error=f"Virus detected: {scan_result['signature']}")
        notify_user_infection(upload['user_id'], upload['filename'])
        return

    # Stage 2: MIME type validation (server-side magic bytes check).
    # A Range request fetches only the leading bytes instead of the whole object.
    update_status(db_conn, upload_id, 'processing')
    s3_obj = s3.get_object(Bucket=STAGING_BUCKET, Key=upload['staging_key'],
                           Range='bytes=0-511')
    header_bytes = s3_obj['Body'].read()  # first 512 bytes for magic detection
    detected_mime = magic.from_buffer(header_bytes, mime=True)
    log_event(db_conn, upload_id, 'mime_validation',
              'passed' if detected_mime == upload['mime_type'] else 'failed',
              {'declared': upload['mime_type'], 'detected': detected_mime})
    if detected_mime != upload['mime_type']:
        s3.delete_object(Bucket=STAGING_BUCKET, Key=upload['staging_key'])
        update_status(db_conn, upload_id, 'failed',
                      error=f"MIME mismatch: declared {upload['mime_type']}, got {detected_mime}")
        return
    # Stage 3: Optional thumbnail generation for images
    if upload['mime_type'].startswith('image/'):
        generate_thumbnail(upload_id, upload['staging_key'], upload['user_id'], db_conn)

    # Stage 4: Move from staging to permanent storage
    permanent_key = f"uploads/{upload['user_id']}/{upload_id}/{upload['filename']}"
    s3.copy_object(
        CopySource={'Bucket': STAGING_BUCKET, 'Key': upload['staging_key']},
        Bucket=PERMANENT_BUCKET,
        Key=permanent_key,
        MetadataDirective='COPY',
    )
    s3.delete_object(Bucket=STAGING_BUCKET, Key=upload['staging_key'])
    log_event(db_conn, upload_id, 'move', 'passed', {'permanent_key': permanent_key})

    with db_conn.cursor() as cur:
        cur.execute("""
            UPDATE Upload
            SET status='complete', storage_key=%s, completed_at=NOW(), updated_at=NOW()
            WHERE upload_id=%s
        """, (permanent_key, upload_id))
    db_conn.commit()
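The magic-bytes check is what defeats Content-Type spoofing: the file's leading bytes, not the client-declared header, determine the format. A hand-rolled illustration of the same idea for a few formats (python-magic consults a far larger signature database; this table is illustrative only):

```python
# Illustrative magic-byte signatures; real detectors know many more.
MAGIC_SIGNATURES = [
    (b'\x89PNG\r\n\x1a\n', 'image/png'),
    (b'\xff\xd8\xff',      'image/jpeg'),
    (b'%PDF-',             'application/pdf'),
    (b'PK\x03\x04',        'application/zip'),
]

def sniff_mime(header_bytes: bytes):
    """Return the MIME type implied by the leading bytes, or None if unknown."""
    for signature, mime in MAGIC_SIGNATURES:
        if header_bytes.startswith(signature):
            return mime
    return None
```

A file renamed from evil.exe to photo.png still starts with the PE executable's bytes, so the detected type will not match the declared image/png and the pipeline deletes it.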
import os

def virus_scan(s3_key):
    """
    Download file to /tmp and run a ClamAV scan.
    In production, delegate to a VirusTotal API or a dedicated scanning sidecar.
    """
    local_path = f"/tmp/scan_{s3_key.replace('/', '_')}"
    s3.download_file(STAGING_BUCKET, s3_key, local_path)
    try:
        result = subprocess.run(
            ['clamscan', '--no-summary', local_path],
            capture_output=True, text=True
        )
    finally:
        os.unlink(local_path)  # always clean up the temp file, even on error
    if result.returncode == 0:
        return {'status': 'clean', 'engine': 'clamav'}
    elif result.returncode == 1:
        # Virus found — extract signature name from output
        signature = result.stdout.split(':')[-1].strip() if ':' in result.stdout else 'unknown'
        return {'status': 'infected', 'engine': 'clamav', 'signature': signature}
    else:
        raise RuntimeError(f"ClamAV error: {result.stderr}")
Key Design Decisions
- Presigned URLs offload bandwidth from app servers: By generating a presigned S3 PUT URL and returning it to the client, the application server handles only the metadata request (a few hundred bytes) rather than proxying gigabytes of file data. This means a 4-vCPU app server can handle hundreds of concurrent large file uploads without bandwidth saturation. S3 handles durability, multipart assembly, and transfer acceleration automatically.
- Client-side checksum prevents silent corruption: The client computes a SHA-256 checksum before upload and sends it with the metadata request. S3 can be configured to reject the PUT if the content hash does not match (via the ChecksumSHA256 condition). The server also stores the declared checksum in Upload.checksum_sha256 for later audit. This catches corruption during transfer without a separate verification download.
- Virus scan runs before serving, not in-band: Scanning happens as an async pipeline stage after the upload completes but before the file is moved to permanent storage or made accessible to other users. Infected files are deleted immediately and never reach permanent storage. Running the scan asynchronously means the upload API response is fast and the client doesn’t wait for ClamAV.
- Max file size enforced at presign time: The ContentLength condition on the presigned URL means S3 will reject PUTs that exceed the declared size. This prevents a client from uploading a 10 GB file after requesting a presigned URL for 10 MB. Combined with the server-side size check during metadata creation, enforcement is belt-and-suspenders.
FAQ
Q: How does resumable multipart upload work?
A: Each part gets a presigned PUT URL; the client tracks uploaded parts and on resume fetches the list of completed parts, uploading only the missing ones.
Q: How is virus scanning integrated into the upload pipeline?
A: After S3 upload, a worker downloads the file, runs a ClamAV scan, and either moves the file to permanent storage or marks it quarantined.
Q: How are MIME type spoofing attacks prevented?
A: python-magic reads the file's magic bytes (not the Content-Type header) to verify the actual file format matches the declared type.
Q: What triggers thumbnail generation?
A: An S3 event notification publishes to SQS; a worker consumes the event, generates thumbnails using Pillow, and stores them in a /thumbnails/ prefix.