Media Processing Pipeline: Low-Level Design
A media processing pipeline handles the journey from raw video upload to CDN-delivered adaptive bitrate streams. The pipeline must be asynchronous, fault-tolerant, and horizontally scalable. This article designs the full pipeline: upload trigger through SNS/SQS to worker, FFmpeg transcoding to multiple renditions, HLS packaging, thumbnail sprite generation, MediaInfo metadata extraction, and CloudFront CDN invalidation, with SQL schema and Python worker implementation.
Pipeline Overview
- Client uploads raw video to S3 (pre-signed URL).
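- S3 event notification → SNS topic → SQS queue. (The worker below parses messages of the form {"job_id": …, "asset_id": …}, as published by submit_transcode_job. A raw S3 event, which arrives wrapped in an SNS envelope, must therefore first be mapped to a TranscodeJob row.)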
- Transcoding worker polls SQS, downloads raw file, runs FFmpeg, uploads renditions to S3.
- Worker generates thumbnail sprite sheet, extracts MediaInfo metadata.
- Worker writes rendition records to the database and invalidates CloudFront cache.
SQL Schema
-- One row per uploaded source video; status tracks its progress through the pipeline.
CREATE TABLE MediaAsset (
    id               BIGINT UNSIGNED    NOT NULL AUTO_INCREMENT,
    owner_id         BIGINT UNSIGNED    NOT NULL,
    title            VARCHAR(512)       NULL,
    original_key     VARCHAR(1024)      NOT NULL,   -- S3 key of the raw upload
    status           ENUM('uploaded','processing','ready','failed')
                                        NOT NULL DEFAULT 'uploaded',
    -- Source-file properties (all nullable).
    duration_secs    DECIMAL(10,3)      NULL,
    width            SMALLINT UNSIGNED  NULL,
    height           SMALLINT UNSIGNED  NULL,
    framerate        DECIMAL(6,3)       NULL,
    file_size_bytes  BIGINT UNSIGNED    NULL,
    mime_type        VARCHAR(128)       NULL,
    meta             JSON               NULL,       -- full MediaInfo JSON
    created_at       DATETIME           NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at       DATETIME           NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (id),
    KEY idx_owner_status (owner_id, status),        -- an owner's assets filtered by status
    KEY idx_status_time (status, created_at DESC)   -- assets in a given status, newest first
) ENGINE=InnoDB;
-- One row per transcode attempt series for an asset; claimed by a worker via a conditional UPDATE.
CREATE TABLE TranscodeJob (
    id            BIGINT UNSIGNED   NOT NULL AUTO_INCREMENT,
    asset_id      BIGINT UNSIGNED   NOT NULL,             -- refers to MediaAsset.id (no FK constraint)
    status        ENUM('queued','running','done','failed')
                                    NOT NULL DEFAULT 'queued',
    worker_id     VARCHAR(64)       NULL,                 -- worker that claimed the job
    attempt       TINYINT UNSIGNED  NOT NULL DEFAULT 0,   -- incremented on every claim
    max_attempts  TINYINT UNSIGNED  NOT NULL DEFAULT 3,
    sqs_receipt   VARCHAR(1024)     NULL,                 -- receipt handle of the claiming SQS delivery
    error_msg     TEXT              NULL,
    queued_at     DATETIME(3)       NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
    started_at    DATETIME(3)       NULL,
    finished_at   DATETIME(3)       NULL,
    PRIMARY KEY (id),
    KEY idx_asset (asset_id),
    KEY idx_status (status, queued_at ASC)                -- oldest jobs first within a status
) ENGINE=InnoDB;
-- One row per (asset, output profile) produced by the transcoder.
CREATE TABLE MediaRendition (
    id               BIGINT UNSIGNED    NOT NULL AUTO_INCREMENT,
    asset_id         BIGINT UNSIGNED    NOT NULL,
    profile          VARCHAR(32)        NOT NULL,   -- '360p','720p','1080p','audio_only'
    s3_key           VARCHAR(1024)      NOT NULL,   -- HLS m3u8 manifest or mp4 key
    codec_video      VARCHAR(32)        NULL,       -- 'h264','h265','vp9'
    codec_audio      VARCHAR(32)        NULL,
    bitrate_kbps     INT UNSIGNED       NULL,
    width            SMALLINT UNSIGNED  NULL,
    height           SMALLINT UNSIGNED  NULL,
    file_size_bytes  BIGINT UNSIGNED    NULL,
    created_at       DATETIME           NOT NULL DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (id),
    -- At most one rendition per profile per asset. asset_id is the leftmost
    -- column, so this index also serves lookups by asset_id alone; a separate
    -- single-column asset_id index would be redundant write overhead.
    UNIQUE KEY uq_asset_profile (asset_id, profile)
) ENGINE=InnoDB;
Python Worker Implementation
import json
import mimetypes
import os
import subprocess
import tempfile
import uuid
from pathlib import Path

import boto3

import db
# AWS service clients, created once at import time and shared by the
# functions in this module.
s3 = boto3.client("s3")
sqs = boto3.client("sqs")
sns = boto3.client("sns")
cloudfront = boto3.client("cloudfront")

# Deployment configuration. Indexing os.environ (rather than .get) makes a
# missing variable raise KeyError as soon as the module is imported.
S3_BUCKET = os.environ["MEDIA_BUCKET"]
CDN_DISTRIBUTION_ID = os.environ["CF_DISTRIBUTION_ID"]
SQS_QUEUE_URL = os.environ["TRANSCODE_QUEUE_URL"]

# HLS bitrate ladder, lowest quality first. Each tuple is
# (name, width, height, video bitrate, audio bitrate); bitrates use FFmpeg notation.
RENDITION_PROFILES = [
    {"name": label, "width": w, "height": h, "vbitrate": v_rate, "abitrate": a_rate}
    for label, w, h, v_rate, a_rate in (
        ("360p", 640, 360, "800k", "96k"),
        ("720p", 1280, 720, "2500k", "128k"),
        ("1080p", 1920, 1080, "5000k", "192k"),
    )
]
def submit_transcode_job(asset_id: int) -> int:
    """Register a transcode job for *asset_id* and enqueue it on SQS.

    A TranscodeJob row is inserted first; the value returned by
    ``db.execute`` for that INSERT is used as the job id (presumably the
    auto-increment id). A JSON message carrying the job id and asset id is
    then sent to the transcode queue.

    Returns:
        The job id.
    """
    new_job_id = db.execute(
        "INSERT INTO TranscodeJob (asset_id) VALUES (%s)", (asset_id,)
    )
    payload = {"job_id": new_job_id, "asset_id": asset_id}
    sqs.send_message(QueueUrl=SQS_QUEUE_URL, MessageBody=json.dumps(payload))
    return new_job_id
# NOTE(review): this span appears corrupted and does not parse as written.
# The text between "attempt" and "None:" on the claim query's WHERE line is
# missing -- most likely a "<...>" run removed as if it were an HTML tag
# (e.g. "attempt<max_attempts ... -> None:"). Lost with it: the end of the
# claim query and its parameter tuple, the rest of process_transcode, and
# the `def` line of the per-profile FFmpeg helper whose docstring and body
# follow. Restore this span from the original source; the lost logic is not
# recoverable from what remains.
#
# What remains of that FFmpeg helper: it encodes `input_path` with libx264
# (preset "fast", video bitrate profile["vbitrate"]) and AAC audio
# (profile["abitrate"]); scales to fit profile["width"] x profile["height"]
# without changing the aspect ratio and pads the remainder, centred; writes
# HLS VOD output (target segment length 6 s) as seg%05d.ts files plus
# index.m3u8 into `output_dir`; and raises RuntimeError carrying the last
# 2000 characters of FFmpeg's stderr on a non-zero exit (timeout 3600 s).
def process_transcode(sqs_message: dict) -> None:
"""
Main worker function — called for each SQS message.
Downloads raw video, transcodes all profiles, packages HLS,
generates thumbnails, and invalidates CDN.

The message body is JSON carrying "job_id" and "asset_id" (the shape
that submit_transcode_job publishes). The job is claimed with a
conditional UPDATE that sets status='running', records the worker id and
the SQS ReceiptHandle, and increments attempt.
"""
body = json.loads(sqs_message["Body"])
job_id = body["job_id"]
asset_id = body["asset_id"]
receipt = sqs_message["ReceiptHandle"]
# claim job -- the UPDATE only matches a row whose status is still 'queued',
# presumably so a redelivered SQS message cannot start the same job twice
updated = db.execute(
"""UPDATE TranscodeJob SET status='running', worker_id=%s, started_at=NOW(3),
attempt=attempt+1, sqs_receipt=%s
WHERE id=%s AND status='queued' AND attempt None:
"""Run FFmpeg to produce HLS segments for a single profile."""
cmd = [
"ffmpeg", "-y", "-i", str(input_path),
"-c:v", "libx264",
"-preset", "fast",
"-b:v", profile["vbitrate"],
"-vf", f"scale={profile['width']}:{profile['height']}:force_original_aspect_ratio=decrease,pad={profile['width']}:{profile['height']}:(ow-iw)/2:(oh-ih)/2",
"-c:a", "aac",
"-b:a", profile["abitrate"],
"-hls_time", "6",
"-hls_playlist_type", "vod",
"-hls_segment_filename", str(output_dir / "seg%05d.ts"),
str(output_dir / "index.m3u8"),
]
result = subprocess.run(cmd, capture_output=True, timeout=3600)
if result.returncode != 0:
raise RuntimeError(f"FFmpeg failed: {result.stderr.decode()[-2000:]}")
def _extract_mediainfo(path: Path) -> dict:
    """Summarize the main video/audio stream properties of a media file.

    Runs ``mediainfo --Output=JSON`` on *path* and reads the FIRST track of
    type "Video" and the FIRST track of type "Audio" from the report.

    Returns:
        A dict that may contain ``width``, ``height`` (int), ``framerate``,
        ``duration`` (float) and ``codec_video`` (str) taken from the first
        video track, and ``codec_audio`` (str) from the first audio track.
        Keys belonging to a track type that is absent are omitted; missing
        fields inside a present track default to 0 / "".

    Raises:
        RuntimeError: if mediainfo exits non-zero or its output is not JSON.
        subprocess.TimeoutExpired: if mediainfo runs longer than 60 seconds.
    """
    result = subprocess.run(
        ["mediainfo", "--Output=JSON", str(path)],
        capture_output=True,
        timeout=60,
    )
    if result.returncode != 0:
        stderr_tail = result.stderr.decode(errors="replace")[-2000:]
        raise RuntimeError(f"MediaInfo failed: {stderr_tail}")
    try:
        data = json.loads(result.stdout)
    except json.JSONDecodeError as exc:
        raise RuntimeError(f"MediaInfo produced invalid JSON for {path}") from exc

    # Tolerate a null/absent "media" object or "track" list instead of
    # crashing with AttributeError/TypeError.
    tracks = (data.get("media") or {}).get("track") or []

    # A file can carry more than one track of a type; only the first one is
    # treated as the main stream so later tracks cannot overwrite its values.
    video = next((t for t in tracks if t.get("@type") == "Video"), None)
    audio = next((t for t in tracks if t.get("@type") == "Audio"), None)

    meta: dict = {}
    if video is not None:
        meta["width"] = int(video.get("Width", 0))
        meta["height"] = int(video.get("Height", 0))
        meta["framerate"] = float(video.get("FrameRate", 0))
        meta["duration"] = float(video.get("Duration", 0))
        meta["codec_video"] = video.get("Format", "")
    if audio is not None:
        meta["codec_audio"] = audio.get("Format", "")
    return meta
def _generate_sprite(
    input_path: Path,
    output_path: Path,
    interval_secs: int,
    *,
    columns: int = 10,
    rows: int = 10,
    thumb_width: int = 160,
    thumb_height: int = 90,
) -> None:
    """Generate a thumbnail contact sheet using FFmpeg tile filter.

    One frame is sampled every *interval_secs* seconds. Each frame is scaled
    to fit a ``thumb_width`` x ``thumb_height`` cell without distorting its
    aspect ratio, and padded (centred) to exactly that size. The cells are
    then tiled left-to-right, top-to-bottom into one ``columns`` x ``rows``
    image written to *output_path*. Because only one output frame is kept,
    the sheet holds at most ``columns * rows`` samples, which covers the first
    ``columns * rows * interval_secs`` seconds of video.

    Raises:
        ValueError: if *interval_secs* or any grid/cell dimension is not positive.
        subprocess.CalledProcessError: if FFmpeg exits non-zero.
        subprocess.TimeoutExpired: if FFmpeg runs longer than 300 seconds.
    """
    # Reject values that would build an invalid filter graph (e.g. "fps=1/0").
    if interval_secs <= 0:
        raise ValueError(f"interval_secs must be positive, got {interval_secs!r}")
    if min(columns, rows, thumb_width, thumb_height) <= 0:
        raise ValueError("sprite grid and thumbnail dimensions must be positive")

    w, h = thumb_width, thumb_height
    # Fit-and-pad (same approach as the rendition scaler) so non-16:9
    # sources are letterboxed rather than stretched into the cell.
    video_filter = (
        f"fps=1/{interval_secs},"
        f"scale={w}:{h}:force_original_aspect_ratio=decrease,"
        f"pad={w}:{h}:(ow-iw)/2:(oh-ih)/2,"
        f"tile={columns}x{rows}"
    )
    cmd = [
        "ffmpeg", "-y", "-i", str(input_path),
        "-vf", video_filter,
        "-frames:v", "1",
        str(output_path),
    ]
    subprocess.run(cmd, capture_output=True, timeout=300, check=True)
def _bitrate_to_bps(rate: str) -> int:
    """Convert an FFmpeg-style bitrate ("800k", "2.5M", "1G", "128000") to bit/s.

    Only the decimal suffixes k/K (1e3), M (1e6) and G (1e9) are accepted;
    lowercase "m" means milli in FFmpeg and is deliberately not treated as mega.

    Raises:
        ValueError: if the string is not a number with an optional suffix above.
    """
    text = str(rate).strip()
    multiplier = {"k": 1_000, "K": 1_000, "M": 1_000_000, "G": 1_000_000_000}.get(text[-1:], 1)
    number = text[:-1] if multiplier != 1 else text
    return round(float(number) * multiplier)


def _build_master_playlist(rendition_keys: list, profiles: list) -> str:
    """Build the text of an HLS master playlist (master.m3u8).

    Args:
        rendition_keys: ``(profile_name, manifest_key)`` pairs in the same
            order as *profiles*. Each ``manifest_key`` is written verbatim as
            the variant URI.
        profiles: rendition profile dicts with ``width``, ``height``,
            ``vbitrate`` and (optionally) ``abitrate`` in FFmpeg notation.

    Returns:
        The playlist, one tag/URI per line, terminated by a newline.
    """
    lines = ["#EXTM3U", "#EXT-X-VERSION:3"]
    for (_profile_name, manifest_key), profile in zip(rendition_keys, profiles):
        # BANDWIDTH must account for every media stream in the variant, so the
        # audio bitrate is added to the video bitrate. These are nominal
        # encoder targets; real segment peaks can be somewhat higher.
        bandwidth = _bitrate_to_bps(profile["vbitrate"]) + _bitrate_to_bps(profile.get("abitrate", "0"))
        lines.append(
            f"#EXT-X-STREAM-INF:BANDWIDTH={bandwidth},"
            f"RESOLUTION={profile['width']}x{profile['height']}"
        )
        lines.append(manifest_key)
    return "\n".join(lines) + "\n"
def _invalidate_cdn(pattern: str) -> None:
    """Request a CloudFront invalidation of *pattern* on the configured distribution.

    A fresh random CallerReference is generated for every call, so each call
    submits a new invalidation request.
    """
    batch = {
        "Paths": {"Quantity": 1, "Items": [pattern]},
        "CallerReference": str(uuid.uuid4()),
    }
    cloudfront.create_invalidation(
        DistributionId=CDN_DISTRIBUTION_ID,
        InvalidationBatch=batch,
    )
def _upload_directory(local_dir: Path, bucket: str, prefix: str) -> None:
    """Upload every regular file directly inside *local_dir* to S3.

    Each file is stored under the key ``f"{prefix}{name}"``; no separator is
    inserted, so *prefix* should normally end with "/". Subdirectories are
    not descended into. Files are uploaded in sorted name order.

    Content-Type is ``application/x-mpegURL`` for ``.m3u8`` playlists and
    ``video/MP2T`` for ``.ts`` segments. Any other file (for example a sprite
    image or JSON manifest) gets the type guessed from its extension, or
    ``application/octet-stream`` if none is known.
    """
    for entry in sorted(local_dir.iterdir()):
        if not entry.is_file():
            continue
        if entry.suffix == ".m3u8":
            content_type = "application/x-mpegURL"
        elif entry.suffix == ".ts":
            # Mapped explicitly: some system MIME tables map ".ts" to a
            # non-video type (e.g. Qt translation files).
            content_type = "video/MP2T"
        else:
            content_type = mimetypes.guess_type(entry.name)[0] or "application/octet-stream"
        s3.upload_file(
            str(entry), bucket, f"{prefix}{entry.name}",
            ExtraArgs={"ContentType": content_type},
        )
def _fail_job(job_id: int, error: str) -> None:
    """Mark a TranscodeJob as failed.

    Sets status='failed', stamps finished_at with the database's NOW(3), and
    stores at most the first 9999 characters of *error* in error_msg.
    """
    sql = "UPDATE TranscodeJob SET status='failed', error_msg=%s, finished_at=NOW(3) WHERE id=%s"
    params = (error[:9999], job_id)
    db.execute(sql, params)
Adaptive Bitrate (HLS) Packaging
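Each rendition produces a set of .ts segments of roughly 6 seconds each, plus an index.m3u8 playlist. FFmpeg can only cut a segment at a keyframe, so exact 6-second segments require a fixed keyframe interval. A master playlist (master.m3u8) references all renditions with their bandwidth and resolution. The video player selects the appropriate rendition based on available bandwidth and switches between renditions at segment boundaries; the switch is seamless only when keyframes are aligned across renditions.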
Thumbnail Sprite Sheets
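FFmpeg samples one frame every N seconds (fps filter), and its tile filter composites those frames into a grid image. With a 10×10 grid, one sheet holds 100 frames and therefore covers only the first 100 × N seconds of video; longer videos need a larger interval or several sheets. A JSON manifest records the timestamp, row, and column for each frame. The player uses it to render a preview thumbnail while the user scrubs the timeline, without downloading the video segments.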
CDN Invalidation on Reprocess
When a video is reprocessed (e.g., after a transcoding bug fix), the new segments are uploaded to the same S3 keys. A CloudFront invalidation for /assets/{asset_id}/* purges cached copies from all edge locations. To avoid invalidation cost on initial processing, the worker skips invalidation if the rendition rows did not previously exist.
See also: Anthropic Interview Guide 2026: Process, Questions, and AI Safety
See also: Meta Interview Guide 2026: Facebook, Instagram, WhatsApp Engineering
See also: Scale AI Interview Guide 2026: Data Infrastructure, RLHF Pipelines, and ML Engineering