Media Processing Pipeline: Low-Level Design
A media processing pipeline handles the journey from raw video upload to CDN-delivered adaptive bitrate streams. The pipeline must be asynchronous, fault-tolerant, and horizontally scalable. This article designs the full pipeline: upload trigger through SNS/SQS to worker, FFmpeg transcoding to multiple renditions, HLS packaging, thumbnail sprite generation, MediaInfo metadata extraction, and CloudFront CDN invalidation, with SQL schema and Python worker implementation.
Pipeline Overview
- Client uploads raw video to S3 (pre-signed URL).
- S3 event notification → SNS topic → SQS queue.
- Transcoding worker polls SQS, downloads raw file, runs FFmpeg, uploads renditions to S3.
- Worker generates thumbnail sprite sheet, extracts MediaInfo metadata.
- Worker writes rendition records to the database and invalidates CloudFront cache.
SQL Schema
-- One row per uploaded video. `status` tracks pipeline state; probe results
-- (dimensions, duration, full MediaInfo JSON) are filled in after processing.
CREATE TABLE MediaAsset (
id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
owner_id BIGINT UNSIGNED NOT NULL, -- uploading user
title VARCHAR(512) NULL,
original_key VARCHAR(1024) NOT NULL, -- S3 key of raw upload
status ENUM('uploaded','processing','ready','failed') NOT NULL DEFAULT 'uploaded',
duration_secs DECIMAL(10,3) NULL, -- from MediaInfo, millisecond precision
width SMALLINT UNSIGNED NULL,
height SMALLINT UNSIGNED NULL,
framerate DECIMAL(6,3) NULL,
file_size_bytes BIGINT UNSIGNED NULL,
mime_type VARCHAR(128) NULL,
meta JSON NULL, -- full MediaInfo JSON
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (id),
INDEX idx_owner_status (owner_id, status), -- per-owner listings filtered by state
INDEX idx_status_time (status, created_at DESC) -- ops queries: newest assets in a given state
) ENGINE=InnoDB;
-- One row per transcode job, mirroring the SQS message lifecycle:
-- queued -> running (claimed by a worker) -> done | failed.
CREATE TABLE TranscodeJob (
id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
asset_id BIGINT UNSIGNED NOT NULL, -- logical FK to MediaAsset.id
status ENUM('queued','running','done','failed') NOT NULL DEFAULT 'queued',
worker_id VARCHAR(64) NULL, -- identity of the worker that claimed the job
attempt TINYINT UNSIGNED NOT NULL DEFAULT 0, -- incremented on each claim
max_attempts TINYINT UNSIGNED NOT NULL DEFAULT 3, -- claim is refused past this count
sqs_receipt VARCHAR(1024) NULL, -- last SQS receipt handle (for delete/visibility ops)
error_msg TEXT NULL, -- truncated failure message from the worker
queued_at DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3), -- DATETIME(3): ms precision
started_at DATETIME(3) NULL,
finished_at DATETIME(3) NULL,
PRIMARY KEY (id),
INDEX idx_asset (asset_id),
INDEX idx_status (status, queued_at ASC) -- oldest-first scan of queued jobs
) ENGINE=InnoDB;
-- One row per transcoded output (rendition) of an asset.
CREATE TABLE MediaRendition (
id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
asset_id BIGINT UNSIGNED NOT NULL, -- logical FK to MediaAsset.id
profile VARCHAR(32) NOT NULL, -- '360p','720p','1080p','audio_only'
s3_key VARCHAR(1024) NOT NULL, -- HLS m3u8 manifest or mp4 key
codec_video VARCHAR(32) NULL, -- 'h264','h265','vp9'
codec_audio VARCHAR(32) NULL,
bitrate_kbps INT UNSIGNED NULL,
width SMALLINT UNSIGNED NULL,
height SMALLINT UNSIGNED NULL,
file_size_bytes BIGINT UNSIGNED NULL,
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id),
-- Enforces one rendition per (asset, profile). Its leftmost column also
-- serves lookups by asset_id, so a separate INDEX idx_asset (asset_id)
-- would be redundant and has been dropped.
UNIQUE KEY uq_asset_profile (asset_id, profile)
) ENGINE=InnoDB;
Python Worker Implementation
import boto3
import json
import os
import subprocess
import tempfile
import uuid
from pathlib import Path
import db
# AWS service clients, created once at module import and shared by all
# functions below.
s3 = boto3.client("s3")
sqs = boto3.client("sqs")
sns = boto3.client("sns")
cloudfront = boto3.client("cloudfront")
# Required configuration — direct os.environ[...] access raises KeyError at
# import time if a variable is unset, so misconfiguration fails fast.
S3_BUCKET = os.environ["MEDIA_BUCKET"]
CDN_DISTRIBUTION_ID = os.environ["CF_DISTRIBUTION_ID"]
SQS_QUEUE_URL = os.environ["TRANSCODE_QUEUE_URL"]
# Ladder of output renditions; bitrate strings are in FFmpeg's "<n>k" form.
RENDITION_PROFILES = [
{"name": "360p", "width": 640, "height": 360, "vbitrate": "800k", "abitrate": "96k"},
{"name": "720p", "width": 1280, "height": 720, "vbitrate": "2500k", "abitrate": "128k"},
{"name": "1080p", "width": 1920, "height": 1080, "vbitrate": "5000k", "abitrate": "192k"},
]
def submit_transcode_job(asset_id: int) -> int:
    """Insert a queued TranscodeJob row and publish it to SQS.

    Args:
        asset_id: primary key of the MediaAsset to transcode.

    Returns:
        The new TranscodeJob primary key.
    """
    new_job_id = db.execute(
        "INSERT INTO TranscodeJob (asset_id) VALUES (%s)", (asset_id,)
    )
    payload = {"job_id": new_job_id, "asset_id": asset_id}
    sqs.send_message(QueueUrl=SQS_QUEUE_URL, MessageBody=json.dumps(payload))
    return new_job_id
def process_transcode(sqs_message: dict) -> None:
    """
    Main worker function — called for each SQS message.

    Downloads the raw video, transcodes all profiles, packages HLS,
    generates thumbnails, and invalidates the CDN on reprocess.

    NOTE(review): the middle of this function was garbled in the source —
    an unescaped '<' swallowed everything between the claim UPDATE's
    `attempt <` and the `-> None:` of _transcode_profile's signature. The
    body below is a reconstruction following the article's documented
    pipeline and the helpers defined in this module; confirm against the
    original implementation.
    """
    body = json.loads(sqs_message["Body"])
    job_id = body["job_id"]
    asset_id = body["asset_id"]
    receipt = sqs_message["ReceiptHandle"]

    # Claim the job: the conditional UPDATE is an optimistic lock, so two
    # workers holding duplicate deliveries cannot both run it.
    worker_id = os.environ.get("HOSTNAME", str(uuid.uuid4()))
    updated = db.execute(
        """UPDATE TranscodeJob SET status='running', worker_id=%s, started_at=NOW(3),
        attempt=attempt+1, sqs_receipt=%s
        WHERE id=%s AND status='queued' AND attempt < max_attempts""",
        (worker_id, receipt, job_id),
    )
    if not updated:
        # Already claimed by another worker, or attempts exhausted: drop
        # the (duplicate) message and bail out.
        sqs.delete_message(QueueUrl=SQS_QUEUE_URL, ReceiptHandle=receipt)
        return

    try:
        # NOTE(review): assumes the db module exposes query_one for reads
        # (only db.execute is visible here) — confirm against db module.
        asset = db.query_one(
            "SELECT original_key FROM MediaAsset WHERE id=%s", (asset_id,)
        )
        db.execute("UPDATE MediaAsset SET status='processing' WHERE id=%s", (asset_id,))

        # Pre-existing rendition rows mean this is a reprocess, so CDN
        # edges may hold stale copies (see "CDN Invalidation on Reprocess").
        is_reprocess = bool(db.query_one(
            "SELECT id FROM MediaRendition WHERE asset_id=%s LIMIT 1", (asset_id,)
        ))

        with tempfile.TemporaryDirectory() as tmp:
            tmp_dir = Path(tmp)
            raw_path = tmp_dir / "raw"
            s3.download_file(S3_BUCKET, asset["original_key"], str(raw_path))

            meta = _extract_mediainfo(raw_path)

            rendition_keys = []
            for profile in RENDITION_PROFILES:
                out_dir = tmp_dir / profile["name"]
                out_dir.mkdir()
                _transcode_profile(raw_path, out_dir, profile)
                _upload_directory(
                    out_dir, S3_BUCKET, f"assets/{asset_id}/{profile['name']}/"
                )
                # Manifest path relative to master.m3u8's location.
                manifest_key = f"{profile['name']}/index.m3u8"
                rendition_keys.append((profile["name"], manifest_key))
                db.execute(
                    """INSERT INTO MediaRendition
                    (asset_id, profile, s3_key, codec_video, codec_audio, width, height)
                    VALUES (%s,%s,%s,%s,%s,%s,%s)
                    ON DUPLICATE KEY UPDATE s3_key=VALUES(s3_key)""",
                    (asset_id, profile["name"],
                     f"assets/{asset_id}/{manifest_key}",
                     "h264", "aac", profile["width"], profile["height"]),
                )

            master = _build_master_playlist(rendition_keys, RENDITION_PROFILES)
            master_path = tmp_dir / "master.m3u8"
            master_path.write_text(master)
            s3.upload_file(
                str(master_path), S3_BUCKET, f"assets/{asset_id}/master.m3u8",
                ExtraArgs={"ContentType": "application/x-mpegURL"},
            )

            sprite_path = tmp_dir / "sprite.jpg"
            _generate_sprite(raw_path, sprite_path, 10)
            s3.upload_file(
                str(sprite_path), S3_BUCKET, f"assets/{asset_id}/sprite.jpg",
                ExtraArgs={"ContentType": "image/jpeg"},
            )

        db.execute(
            """UPDATE MediaAsset SET status='ready', duration_secs=%s, width=%s,
            height=%s, framerate=%s, meta=%s WHERE id=%s""",
            (meta.get("duration"), meta.get("width"), meta.get("height"),
             meta.get("framerate"), json.dumps(meta), asset_id),
        )
        db.execute(
            "UPDATE TranscodeJob SET status='done', finished_at=NOW(3) WHERE id=%s",
            (job_id,),
        )
        sqs.delete_message(QueueUrl=SQS_QUEUE_URL, ReceiptHandle=receipt)

        # Skip invalidation on first processing — nothing is cached yet.
        if is_reprocess:
            _invalidate_cdn(f"/assets/{asset_id}/*")
    except Exception as exc:
        # Boundary handler: record the failure, then re-raise so SQS
        # redelivers after the visibility timeout (or DLQs the message).
        _fail_job(job_id, str(exc))
        db.execute("UPDATE MediaAsset SET status='failed' WHERE id=%s", (asset_id,))
        raise


def _transcode_profile(input_path: Path, output_dir: Path, profile: dict) -> None:
    """Run FFmpeg to produce HLS segments for a single profile.

    Scales (letterboxing as needed) to the profile's resolution, encodes
    H.264/AAC at the profile's bitrates, and writes 6-second .ts segments
    plus an index.m3u8 VOD playlist into *output_dir*.

    Raises:
        RuntimeError: if FFmpeg exits non-zero (tail of stderr included).
        subprocess.TimeoutExpired: if encoding exceeds one hour.
    """
    cmd = [
        "ffmpeg", "-y", "-i", str(input_path),
        "-c:v", "libx264",
        "-preset", "fast",
        "-b:v", profile["vbitrate"],
        # Fit within WxH preserving aspect ratio, then pad to exactly WxH.
        "-vf", f"scale={profile['width']}:{profile['height']}:force_original_aspect_ratio=decrease,pad={profile['width']}:{profile['height']}:(ow-iw)/2:(oh-ih)/2",
        "-c:a", "aac",
        "-b:a", profile["abitrate"],
        "-hls_time", "6",
        "-hls_playlist_type", "vod",
        "-hls_segment_filename", str(output_dir / "seg%05d.ts"),
        str(output_dir / "index.m3u8"),
    ]
    result = subprocess.run(cmd, capture_output=True, timeout=3600)
    if result.returncode != 0:
        # Surface only the tail of FFmpeg's (verbose) stderr.
        raise RuntimeError(f"FFmpeg failed: {result.stderr.decode()[-2000:]}")
def _extract_mediainfo(path: Path) -> dict:
    """Probe a media file with the `mediainfo` CLI and flatten key fields.

    Args:
        path: local media file to inspect.

    Returns:
        Dict with width/height/framerate/duration/codec_video from the
        video track and codec_audio from the audio track (keys absent for
        missing tracks). If multiple tracks of a type exist, the last one
        wins, matching the original behavior.

    Raises:
        RuntimeError: if mediainfo exits non-zero. (Previously a failed run
        fell through to json.loads(b"") and raised a cryptic
        JSONDecodeError.)
    """
    result = subprocess.run(
        ["mediainfo", "--Output=JSON", str(path)],
        capture_output=True, timeout=60
    )
    if result.returncode != 0:
        raise RuntimeError(f"mediainfo failed: {result.stderr.decode()[-500:]}")
    data = json.loads(result.stdout)
    tracks = data.get("media", {}).get("track", [])
    if isinstance(tracks, dict):
        # Some mediainfo versions emit a single track object, not a list.
        tracks = [tracks]
    meta = {}
    for t in tracks:
        if t.get("@type") == "Video":
            # MediaInfo JSON values are strings; coerce to numbers.
            meta["width"] = int(t.get("Width", 0))
            meta["height"] = int(t.get("Height", 0))
            meta["framerate"] = float(t.get("FrameRate", 0))
            meta["duration"] = float(t.get("Duration", 0))
            meta["codec_video"] = t.get("Format", "")
        elif t.get("@type") == "Audio":
            meta["codec_audio"] = t.get("Format", "")
    return meta
def _generate_sprite(input_path: Path, output_path: Path, interval_secs: int) -> None:
    """Generate a thumbnail contact sheet using FFmpeg tile filter.

    Samples one frame every *interval_secs* seconds, scales each to
    160x90, and tiles them into a single 10x10 grid image at
    *output_path*. Raises CalledProcessError if FFmpeg fails.
    """
    filter_chain = f"fps=1/{interval_secs},scale=160:90,tile=10x10"
    args = [
        "ffmpeg", "-y",
        "-i", str(input_path),
        "-vf", filter_chain,
        "-frames:v", "1",
        str(output_path),
    ]
    subprocess.run(args, capture_output=True, timeout=300, check=True)
def _build_master_playlist(rendition_keys: list, profiles: list) -> str:
lines = ["#EXTM3U", "#EXT-X-VERSION:3"]
for (profile_name, manifest_key), profile in zip(rendition_keys, profiles):
bw = int(profile["vbitrate"].rstrip("k")) * 1000
lines.append(f'#EXT-X-STREAM-INF:BANDWIDTH={bw},RESOLUTION={profile["width"]}x{profile["height"]}')
lines.append(manifest_key)
return "n".join(lines)
def _invalidate_cdn(pattern: str) -> None:
    """Purge CloudFront edge caches for all objects matching *pattern*."""
    batch = {
        "Paths": {"Quantity": 1, "Items": [pattern]},
        # A fresh UUID per call so CloudFront treats each request as new
        # rather than deduplicating on CallerReference.
        "CallerReference": str(uuid.uuid4()),
    }
    cloudfront.create_invalidation(
        DistributionId=CDN_DISTRIBUTION_ID,
        InvalidationBatch=batch,
    )
def _upload_directory(local_dir: Path, bucket: str, prefix: str) -> None:
    """Upload every regular file in *local_dir* to S3 under *prefix*.

    Playlists (.m3u8) and segments (anything else) get the appropriate
    HLS Content-Type so CDN/clients interpret them correctly.
    """
    for entry in local_dir.iterdir():
        if not entry.is_file():
            continue
        if entry.suffix == ".m3u8":
            content_type = "application/x-mpegURL"
        else:
            content_type = "video/MP2T"
        s3.upload_file(
            str(entry), bucket, f"{prefix}{entry.name}",
            ExtraArgs={"ContentType": content_type},
        )
def _fail_job(job_id: int, error: str) -> None:
    """Mark a TranscodeJob failed, recording a truncated error message."""
    truncated = error[:9999]  # bound the message size for the TEXT column
    db.execute(
        "UPDATE TranscodeJob SET status='failed', error_msg=%s, finished_at=NOW(3) WHERE id=%s",
        (truncated, job_id)
    )
Adaptive Bitrate (HLS) Packaging
Each rendition produces a set of 6-second .ts segments and an index.m3u8 playlist. A master playlist (master.m3u8) references all renditions with their bandwidth and resolution. The video player selects the appropriate rendition based on available bandwidth and switches between renditions at segment boundaries.
Thumbnail Sprite Sheets
FFmpeg’s tile filter composites one frame every N seconds into a grid image. A JSON manifest records the timestamp, row, and column for each frame so the player can render a preview thumbnail on timeline scrub without downloading the video segments.
CDN Invalidation on Reprocess
When a video is reprocessed (e.g., after a transcoding bug fix), the new segments are uploaded to the same S3 keys. A CloudFront invalidation for /assets/{asset_id}/* purges cached copies from all edge locations. To avoid invalidation cost on initial processing, the worker skips invalidation if the rendition rows did not previously exist.
{
  "@context": "https://schema.org",
  "@type": "FAQPage",
  "mainEntity": [
    {
      "@type": "Question",
      "name": "Why use SQS instead of directly triggering the transcoding worker from the upload?",
      "acceptedAnswer": {
        "@type": "Answer",
        "text": "SQS decouples the upload event from the processing worker and provides durability, retry logic, and back-pressure. If all workers are busy, messages queue up without dropping uploads. SQS visibility timeout acts as a distributed lock: a worker claims a message by making it invisible; if the worker crashes, the message reappears after the timeout for another worker. Dead-letter queues capture jobs that exceed max_attempts without manual intervention."
      }
    },
    {
      "@type": "Question",
      "name": "What is HLS adaptive bitrate streaming and how is it packaged?",
      "acceptedAnswer": {
        "@type": "Answer",
        "text": "HLS (HTTP Live Streaming) splits video into short segments (e.g., 6 seconds) encoded at multiple bitrates. A master playlist (m3u8) lists each rendition with its bandwidth and resolution. The player downloads the master playlist, measures network bandwidth, selects an appropriate rendition, and fetches segments. It can switch renditions at any segment boundary. FFmpeg produces segments and per-rendition playlists; the pipeline assembles the master playlist from the rendition keys."
      }
    },
    {
      "@type": "Question",
      "name": "How do thumbnail sprite sheets work for video timeline previews?",
      "acceptedAnswer": {
        "@type": "Answer",
        "text": "FFmpeg extracts one frame per N seconds and tiles them into a single JPEG contact sheet using the tile filter. A JSON manifest records each frame's timestamp, row index, and column index in the grid. When the user scrubs the video timeline, the player CSS-clips the sprite image to the frame corresponding to the hovered timestamp. This delivers timeline previews as a single HTTP request instead of fetching individual frame images."
      }
    },
    {
      "@type": "Question",
      "name": "When and how do you invalidate a CDN cache after reprocessing a video?",
      "acceptedAnswer": {
        "@type": "Answer",
        "text": "After uploading new rendition files to the same S3 keys (e.g., during a reprocess after a transcoding fix), issue a CloudFront invalidation for the path pattern /assets/{asset_id}/*. This purges all cached copies at every edge location. Invalidation costs apply per path per distribution, so skip invalidation on initial processing (nothing cached yet) and batch multiple asset invalidations in a single API call when reprocessing in bulk."
      }
    }
  ]
}
{"@context":"https://schema.org","@type":"FAQPage","mainEntity":[{"@type":"Question","name":"How is the transcoding job queue implemented?","acceptedAnswer":{"@type":"Answer","text":"An S3 upload event triggers an SNS notification to SQS; workers poll SQS with long-polling, using visibility timeout as a distributed lock to prevent double-processing."}},{"@type":"Question","name":"How does FFmpeg produce adaptive bitrate renditions?","acceptedAnswer":{"@type":"Answer","text":"FFmpeg runs with multiple -vf scale outputs and -c:v libx264 with CRF values tuned per resolution; HLS segmenter (-hls_time 6) produces .m3u8 playlists and .ts segment files."}},{"@type":"Question","name":"How is thumbnail sprite sheet generated?","acceptedAnswer":{"@type":"Answer","text":"FFmpeg's tile filter (fps=1/10,scale=160:-1,tile=10x10) generates a single sprite image; the sprite layout metadata (frame interval, dimensions) is stored in MediaRendition for player use."}},{"@type":"Question","name":"How is CDN cache invalidated after reprocessing?","acceptedAnswer":{"@type":"Answer","text":"CloudFront CreateInvalidation is called with the path pattern for the media asset's directory; a worker polls the invalidation status before marking the MediaAsset as republished."}}]}
See also: Anthropic Interview Guide 2026: Process, Questions, and AI Safety
See also: Meta Interview Guide 2026: Facebook, Instagram, WhatsApp Engineering
See also: Scale AI Interview Guide 2026: Data Infrastructure, RLHF Pipelines, and ML Engineering