Media Processing Pipeline Low-Level Design: Transcoding, Thumbnail Generation, and CDN Delivery


A media processing pipeline handles the journey from raw video upload to CDN-delivered adaptive bitrate streams. The pipeline must be asynchronous, fault-tolerant, and horizontally scalable. This article designs the full pipeline:

- an S3 upload event routed through SNS and SQS to a worker
- FFmpeg transcoding to multiple renditions
- HLS packaging
- thumbnail sprite generation
- metadata extraction with MediaInfo
- CloudFront cache invalidation

It also covers the SQL schema and a Python worker implementation.

Pipeline Overview

  1. Client uploads raw video to S3 (pre-signed URL).
  2. S3 event notification → SNS topic → SQS queue.
  3. Transcoding worker polls SQS, downloads raw file, runs FFmpeg, uploads renditions to S3.
  4. Worker generates thumbnail sprite sheet, extracts MediaInfo metadata.
  5. Worker writes rendition records to the database and, if the asset is being reprocessed, invalidates the CloudFront cache.
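Step 2 delivers a different message shape from the one `submit_transcode_job` sends. When S3 notifies SNS and SNS forwards to SQS with raw message delivery turned off, the SQS body is an SNS envelope. Its `Message` field holds the S3 event JSON as a string.

The sketch below unwraps that envelope, assuming that configuration. The function name `parse_s3_upload_events` is hypothetical. Mapping each key back to a `MediaAsset` row is left to the caller.

```python
import json
from urllib.parse import unquote_plus


def parse_s3_upload_events(sqs_body: str) -> list[tuple[str, str]]:
    """Return (bucket, key) pairs from an SQS body carrying an SNS-wrapped S3 event.

    Assumes SNS raw message delivery is disabled, so the S3 event JSON is a
    string inside the SNS envelope's "Message" field.
    """
    envelope = json.loads(sqs_body)
    event = json.loads(envelope["Message"])
    uploads = []
    # s3:TestEvent messages have no "Records" key, so they yield nothing
    for record in event.get("Records", []):
        if not record.get("eventName", "").startswith("ObjectCreated:"):
            continue
        bucket = record["s3"]["bucket"]["name"]
        # S3 URL-encodes object keys in notifications (a space arrives as '+')
        key = unquote_plus(record["s3"]["object"]["key"])
        uploads.append((bucket, key))
    return uploads
```

A handler along these lines could look up the `MediaAsset` whose `original_key` matches and then call `submit_transcode_job(asset_id)`. That keeps the worker's `{"job_id", "asset_id"}` message format unchanged.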

SQL Schema


CREATE TABLE MediaAsset (
    id              BIGINT UNSIGNED   NOT NULL AUTO_INCREMENT,
    owner_id        BIGINT UNSIGNED   NOT NULL,
    title           VARCHAR(512)      NULL,
    original_key    VARCHAR(1024)     NOT NULL,  -- S3 key of raw upload
    status          ENUM('uploaded','processing','ready','failed') NOT NULL DEFAULT 'uploaded',
    duration_secs   DECIMAL(10,3)     NULL,
    width           SMALLINT UNSIGNED NULL,
    height          SMALLINT UNSIGNED NULL,
    framerate       DECIMAL(6,3)      NULL,
    file_size_bytes BIGINT UNSIGNED   NULL,
    mime_type       VARCHAR(128)      NULL,
    meta            JSON              NULL,      -- full MediaInfo JSON
    created_at      DATETIME          NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at      DATETIME          NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (id),
    INDEX idx_owner_status (owner_id, status),
    INDEX idx_status_time  (status, created_at DESC)
) ENGINE=InnoDB;

CREATE TABLE TranscodeJob (
    id              BIGINT UNSIGNED   NOT NULL AUTO_INCREMENT,
    asset_id        BIGINT UNSIGNED   NOT NULL,
    status          ENUM('queued','running','done','failed') NOT NULL DEFAULT 'queued',
    worker_id       VARCHAR(64)       NULL,
    attempt         TINYINT UNSIGNED  NOT NULL DEFAULT 0,
    max_attempts    TINYINT UNSIGNED  NOT NULL DEFAULT 3,
    sqs_receipt     VARCHAR(1024)     NULL,
    error_msg       TEXT              NULL,
    queued_at       DATETIME(3)       NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
    started_at      DATETIME(3)       NULL,
    finished_at     DATETIME(3)       NULL,
    PRIMARY KEY (id),
    INDEX idx_asset  (asset_id),
    INDEX idx_status (status, queued_at ASC)
) ENGINE=InnoDB;

CREATE TABLE MediaRendition (
    id              BIGINT UNSIGNED   NOT NULL AUTO_INCREMENT,
    asset_id        BIGINT UNSIGNED   NOT NULL,
    profile         VARCHAR(32)       NOT NULL,  -- '360p','720p','1080p','audio_only'
    s3_key          VARCHAR(1024)     NOT NULL,  -- HLS m3u8 manifest or mp4 key
    codec_video     VARCHAR(32)       NULL,      -- 'h264','h265','vp9'
    codec_audio     VARCHAR(32)       NULL,
    bitrate_kbps    INT UNSIGNED      NULL,
    width           SMALLINT UNSIGNED NULL,
    height          SMALLINT UNSIGNED NULL,
    file_size_bytes BIGINT UNSIGNED   NULL,
    created_at      DATETIME          NOT NULL DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (id),
    UNIQUE KEY uq_asset_profile (asset_id, profile)  -- leftmost column also serves asset_id lookups
) ENGINE=InnoDB;
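The `uq_asset_profile` key is what makes reprocessing idempotent. Writing a rendition row for a profile that already exists should update that row rather than add a duplicate.

The sketch below demonstrates this with Python's built-in `sqlite3` module on a cut-down table; the asset ID and sizes are hypothetical. SQLite spells the upsert `ON CONFLICT ... DO UPDATE`, while MySQL uses `INSERT ... ON DUPLICATE KEY UPDATE` against the same unique key.

```python
import sqlite3

# Cut-down MediaRendition: only the columns needed to show the upsert
conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE MediaRendition (
           id              INTEGER PRIMARY KEY,
           asset_id        INTEGER NOT NULL,
           profile         TEXT    NOT NULL,
           s3_key          TEXT    NOT NULL,
           file_size_bytes INTEGER,
           UNIQUE (asset_id, profile)
       )"""
)


def upsert_rendition(asset_id: int, profile: str, s3_key: str, size: int) -> None:
    # SQLite 3.24+ upsert; conflicts are detected on the (asset_id, profile) unique key
    conn.execute(
        """INSERT INTO MediaRendition (asset_id, profile, s3_key, file_size_bytes)
           VALUES (?, ?, ?, ?)
           ON CONFLICT (asset_id, profile)
           DO UPDATE SET s3_key = excluded.s3_key,
                         file_size_bytes = excluded.file_size_bytes""",
        (asset_id, profile, s3_key, size),
    )


upsert_rendition(42, "720p", "assets/42/720p/index.m3u8", 1000)  # first run
upsert_rendition(42, "720p", "assets/42/720p/index.m3u8", 1200)  # reprocess, same key
rows = conn.execute(
    "SELECT profile, file_size_bytes FROM MediaRendition WHERE asset_id = 42"
).fetchall()
print(rows)  # [('720p', 1200)]
```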

Python Worker Implementation


import boto3
import json
import os
import subprocess
import tempfile
import uuid
from pathlib import Path
import db  # application's own database helper module (not shown)

s3 = boto3.client("s3")
sqs = boto3.client("sqs")
sns = boto3.client("sns")
cloudfront = boto3.client("cloudfront")

S3_BUCKET = os.environ["MEDIA_BUCKET"]
CDN_DISTRIBUTION_ID = os.environ["CF_DISTRIBUTION_ID"]
SQS_QUEUE_URL = os.environ["TRANSCODE_QUEUE_URL"]

RENDITION_PROFILES = [
    {"name": "360p",  "width": 640,  "height": 360,  "vbitrate": "800k",  "abitrate": "96k"},
    {"name": "720p",  "width": 1280, "height": 720,  "vbitrate": "2500k", "abitrate": "128k"},
    {"name": "1080p", "width": 1920, "height": 1080, "vbitrate": "5000k", "abitrate": "192k"},
]


def submit_transcode_job(asset_id: int) -> int:
    """Create a TranscodeJob and publish to SQS."""
    job_id = db.execute(
        "INSERT INTO TranscodeJob (asset_id) VALUES (%s)", (asset_id,)
    )
    message = json.dumps({"job_id": job_id, "asset_id": asset_id})
    sqs.send_message(QueueUrl=SQS_QUEUE_URL, MessageBody=message)
    return job_id


def process_transcode(sqs_message: dict) -> None:
    """
    Main worker function — called for each SQS message.
    Downloads raw video, transcodes all profiles, packages HLS,
    generates thumbnails, and invalidates CDN.
    """
    body = json.loads(sqs_message["Body"])
    job_id = body["job_id"]
    asset_id = body["asset_id"]
    receipt = sqs_message["ReceiptHandle"]

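    # claim job: the conditional UPDATE lets exactly one delivery move it out of 'queued'
    worker_id = f"{os.uname().nodename}:{os.getpid()}"  # host:pid, an assumed format
    updated = db.execute(
        """UPDATE TranscodeJob SET status='running', worker_id=%s, started_at=NOW(3),
                                   attempt=attempt+1, sqs_receipt=%s
           WHERE id=%s AND status='queued' AND attempt < max_attempts""",
        (worker_id, receipt, job_id),
    )
    if not updated:  # assumes db.execute returns the affected-row count for UPDATE
        # duplicate delivery, or the job is no longer claimable: drop the message
        sqs.delete_message(QueueUrl=SQS_QUEUE_URL, ReceiptHandle=receipt)
        return

    # Sketch of the remaining steps; db.query_one (one row as a dict) is an assumed helper.
    asset = db.query_one("SELECT original_key FROM MediaAsset WHERE id=%s", (asset_id,))
    is_reprocess = db.query_one(
        "SELECT COUNT(*) AS n FROM MediaRendition WHERE asset_id=%s", (asset_id,)
    )["n"] > 0
    db.execute("UPDATE MediaAsset SET status='processing' WHERE id=%s", (asset_id,))
    prefix = f"assets/{asset_id}/"
    try:
        with tempfile.TemporaryDirectory() as tmp:
            work = Path(tmp)
            raw = work / "source"
            s3.download_file(S3_BUCKET, asset["original_key"], str(raw))
            meta = _extract_mediainfo(raw)
            rendition_keys = []
            for profile in RENDITION_PROFILES:
                name = profile["name"]
                out_dir = work / name
                out_dir.mkdir()
                _transcode_profile(raw, out_dir, profile)
                _upload_directory(out_dir, S3_BUCKET, f"{prefix}{name}/")
                rendition_keys.append((name, f"{name}/index.m3u8"))  # relative to master
                db.execute(
                    """INSERT INTO MediaRendition
                           (asset_id, profile, s3_key, codec_video, codec_audio, width, height)
                       VALUES (%s, %s, %s, 'h264', 'aac', %s, %s)
                       ON DUPLICATE KEY UPDATE s3_key=VALUES(s3_key)""",
                    (asset_id, name, f"{prefix}{name}/index.m3u8",
                     profile["width"], profile["height"]),
                )
            master = _build_master_playlist(rendition_keys, RENDITION_PROFILES)
            s3.put_object(Bucket=S3_BUCKET, Key=f"{prefix}master.m3u8",
                          Body=master.encode(), ContentType="application/vnd.apple.mpegurl")
            sprite = work / "sprite.jpg"
            _generate_sprite(raw, sprite, interval_secs=10)  # assumed interval
            s3.upload_file(str(sprite), S3_BUCKET, f"{prefix}sprite.jpg",
                           ExtraArgs={"ContentType": "image/jpeg"})
        db.execute(
            "UPDATE MediaAsset SET status='ready', duration_secs=%s, width=%s, height=%s, "
            "framerate=%s WHERE id=%s",
            (meta.get("duration"), meta.get("width"), meta.get("height"),
             meta.get("framerate"), asset_id),
        )
        db.execute("UPDATE TranscodeJob SET status='done', finished_at=NOW(3) WHERE id=%s",
                   (job_id,))
        if is_reprocess:  # invalidate only when replacing previously published renditions
            _invalidate_cdn(f"/assets/{asset_id}/*")
        sqs.delete_message(QueueUrl=SQS_QUEUE_URL, ReceiptHandle=receipt)
    except Exception as exc:
        # requeue while attempts remain; SQS redelivers after the visibility timeout
        requeued = db.execute(
            "UPDATE TranscodeJob SET status='queued', error_msg=%s "
            "WHERE id=%s AND attempt < max_attempts",
            (str(exc)[:9999], job_id),
        )
        if not requeued:
            _fail_job(job_id, str(exc))
            db.execute("UPDATE MediaAsset SET status='failed' WHERE id=%s", (asset_id,))
            sqs.delete_message(QueueUrl=SQS_QUEUE_URL, ReceiptHandle=receipt)
        raise


def _transcode_profile(input_path: Path, output_dir: Path, profile: dict) -> None: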
    """Run FFmpeg to produce HLS segments for a single profile."""
    cmd = [
        "ffmpeg", "-y", "-i", str(input_path),
        "-c:v", "libx264",
        "-preset", "fast",
        "-b:v", profile["vbitrate"],
        "-pix_fmt", "yuv420p",  # 8-bit 4:2:0 for broad player compatibility
        # keyframe every 6 s so segment cuts line up across all renditions
        "-force_key_frames", "expr:gte(t,n_forced*6)",
        "-vf", (
            f"scale={profile['width']}:{profile['height']}:force_original_aspect_ratio=decrease,"
            f"pad={profile['width']}:{profile['height']}:(ow-iw)/2:(oh-ih)/2"
        ),
        "-c:a", "aac",
        "-b:a", profile["abitrate"],
        "-f", "hls",
        "-hls_time", "6",
        "-hls_playlist_type", "vod",
        "-hls_segment_filename", str(output_dir / "seg%05d.ts"),
        str(output_dir / "index.m3u8"),
    ]
    result = subprocess.run(cmd, capture_output=True, timeout=3600)
    if result.returncode != 0:
        raise RuntimeError(f"FFmpeg failed: {result.stderr.decode()[-2000:]}")


def _extract_mediainfo(path: Path) -> dict:
    result = subprocess.run(
        ["mediainfo", "--Output=JSON", str(path)],
        capture_output=True, timeout=60, check=True,
    )
    data = json.loads(result.stdout)
    tracks = data.get("media", {}).get("track", [])
    meta = {}
    for t in tracks:
        if t.get("@type") == "Video":
            meta["width"] = int(t.get("Width", 0))
            meta["height"] = int(t.get("Height", 0))
            meta["framerate"] = float(t.get("FrameRate", 0))
            meta["duration"] = float(t.get("Duration", 0))
            meta["codec_video"] = t.get("Format", "")
        elif t.get("@type") == "Audio":
            meta["codec_audio"] = t.get("Format", "")
    return meta


def _generate_sprite(input_path: Path, output_path: Path, interval_secs: int) -> None:
    """Generate a thumbnail contact sheet using FFmpeg tile filter."""
    cmd = [
        "ffmpeg", "-y", "-i", str(input_path),
        "-vf", f"fps=1/{interval_secs},scale=160:90,tile=10x10",
        "-frames:v", "1",
        str(output_path),
    ]
    subprocess.run(cmd, capture_output=True, timeout=300, check=True)


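def _build_master_playlist(rendition_keys: list, profiles: list) -> str:
    lines = ["#EXTM3U", "#EXT-X-VERSION:3"]
    for (profile_name, manifest_key), profile in zip(rendition_keys, profiles):
        # BANDWIDTH covers the whole variant (video + audio); these are nominal rates,
        # and actual peaks can exceed them unless the encode is capped with -maxrate
        kbps = int(profile["vbitrate"].rstrip("k")) + int(profile["abitrate"].rstrip("k"))
        lines.append(f'#EXT-X-STREAM-INF:BANDWIDTH={kbps * 1000},RESOLUTION={profile["width"]}x{profile["height"]}')
        lines.append(manifest_key)
    return "\n".join(lines) + "\n"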


def _invalidate_cdn(pattern: str) -> None:
    cloudfront.create_invalidation(
        DistributionId=CDN_DISTRIBUTION_ID,
        InvalidationBatch={
            "Paths": {"Quantity": 1, "Items": [pattern]},
            "CallerReference": str(uuid.uuid4()),
        }
    )


def _upload_directory(local_dir: Path, bucket: str, prefix: str) -> None:
    for f in local_dir.iterdir():
        if f.is_file():
            content_type = "application/x-mpegURL" if f.suffix == ".m3u8" else "video/MP2T"
            s3.upload_file(str(f), bucket, f"{prefix}{f.name}",
                           ExtraArgs={"ContentType": content_type})


def _fail_job(job_id: int, error: str) -> None:
    db.execute(
        "UPDATE TranscodeJob SET status='failed', error_msg=%s, finished_at=NOW(3) WHERE id=%s",
        (error[:9999], job_id)
    )
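The claim step in `process_transcode` relies on a conditional UPDATE. Only the worker whose statement actually changes the row (affected-row count 1) owns the job, so a duplicate SQS delivery of the same message does no harm.

The sketch below demonstrates this with `sqlite3` on a cut-down `TranscodeJob` table. The worker IDs are hypothetical, and `cursor.rowcount` stands in for whatever affected-row count the real `db` helper returns.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE TranscodeJob (
           id           INTEGER PRIMARY KEY,
           status       TEXT    NOT NULL DEFAULT 'queued',
           worker_id    TEXT,
           attempt      INTEGER NOT NULL DEFAULT 0,
           max_attempts INTEGER NOT NULL DEFAULT 3
       )"""
)
conn.execute("INSERT INTO TranscodeJob (id) VALUES (1)")


def claim(job_id: int, worker_id: str) -> bool:
    """Return True only for the worker whose UPDATE moved the job out of 'queued'."""
    cur = conn.execute(
        """UPDATE TranscodeJob
              SET status = 'running', worker_id = ?, attempt = attempt + 1
            WHERE id = ? AND status = 'queued' AND attempt < max_attempts""",
        (worker_id, job_id),
    )
    return cur.rowcount == 1


first = claim(1, "worker-a")   # wins the job
second = claim(1, "worker-b")  # duplicate delivery: the WHERE clause no longer matches
print(first, second)  # True False
```

In MySQL with InnoDB, the row lock taken by the UPDATE serializes concurrent claims from separate connections the same way. The second statement sees `status='running'` and matches nothing.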

Adaptive Bitrate (HLS) Packaging

Each rendition produces a set of 6-second .ts segments and an index.m3u8 playlist. A master playlist (master.m3u8) lists every rendition with its BANDWIDTH and RESOLUTION attributes. The player selects a rendition based on measured bandwidth and switches between renditions at segment boundaries. Switching is clean only when every rendition cuts its segments at the same timestamps, which is why the encoder forces a keyframe at each 6-second boundary.
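To make the master playlist concrete, the sketch below builds it for the three profiles used by the worker. It assumes each rendition lives in a `{name}/index.m3u8` subdirectory next to master.m3u8 and that BANDWIDTH counts both the video and audio bitrates. RFC 8216 defines BANDWIDTH as the peak rate, so nominal encoder targets are an approximation.

```python
PROFILES = [
    {"name": "360p",  "width": 640,  "height": 360,  "vbitrate": "800k",  "abitrate": "96k"},
    {"name": "720p",  "width": 1280, "height": 720,  "vbitrate": "2500k", "abitrate": "128k"},
    {"name": "1080p", "width": 1920, "height": 1080, "vbitrate": "5000k", "abitrate": "192k"},
]


def kbps(rate: str) -> int:
    """'2500k' -> 2500 (only the 'k' suffix used by these profiles is handled)."""
    return int(rate.rstrip("k"))


def master_playlist(profiles: list[dict]) -> str:
    lines = ["#EXTM3U", "#EXT-X-VERSION:3"]
    for p in profiles:
        # bits per second for the whole variant: video plus audio
        bandwidth = (kbps(p["vbitrate"]) + kbps(p["abitrate"])) * 1000
        lines.append(f"#EXT-X-STREAM-INF:BANDWIDTH={bandwidth},RESOLUTION={p['width']}x{p['height']}")
        lines.append(f"{p['name']}/index.m3u8")  # URI relative to master.m3u8
    return "\n".join(lines) + "\n"


print(master_playlist(PROFILES), end="")
# #EXTM3U
# #EXT-X-VERSION:3
# #EXT-X-STREAM-INF:BANDWIDTH=896000,RESOLUTION=640x360
# 360p/index.m3u8
# #EXT-X-STREAM-INF:BANDWIDTH=2628000,RESOLUTION=1280x720
# 720p/index.m3u8
# #EXT-X-STREAM-INF:BANDWIDTH=5192000,RESOLUTION=1920x1080
# 1080p/index.m3u8
```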

Thumbnail Sprite Sheets

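FFmpeg's fps filter samples one frame every N seconds, and the tile filter composites those frames into a grid image. _generate_sprite uses a 10x10 grid of 160x90 thumbnails. Because it writes a single output frame, the sheet covers only the first 100 samples (the first 100 × N seconds of video). A JSON manifest records the timestamp, row, and column of each frame, so the player can render a preview thumbnail while the user scrubs the timeline without downloading any video segments.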
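The manifest entries follow directly from the grid arithmetic. The tile filter fills cells left to right, then top to bottom, so sample i lands at row i // 10 and column i % 10.

This sketch uses a hypothetical `sprite_manifest` function with the tile size and grid from `_generate_sprite`. It also adds pixel offsets (`x`, `y`) as a convenience for the player. The sample count `ceil(duration / N)` is an approximation of what the fps filter emits.

```python
import math


def sprite_manifest(duration_secs: float, interval_secs: int,
                    cols: int = 10, rows: int = 10,
                    tile_w: int = 160, tile_h: int = 90) -> list[dict]:
    """Map each sampled thumbnail to its time and cell in a single cols x rows sheet.

    Mirrors fps=1/N,scale=160:90,tile=10x10 with -frames:v 1: sample i is taken
    at roughly t = i * N, and only the first cols * rows samples fit in the sheet.
    """
    count = min(math.ceil(duration_secs / interval_secs), cols * rows)
    entries = []
    for i in range(count):
        row, col = divmod(i, cols)  # left-to-right, then top-to-bottom
        entries.append({
            "time": i * interval_secs,
            "row": row,
            "col": col,
            "x": col * tile_w,
            "y": row * tile_h,
        })
    return entries
```

For example, `sprite_manifest(25, 10)` yields entries at 0, 10, and 20 seconds. A one-hour video at a 10-second interval is capped at 100 entries, which matches the single-sheet limitation noted above.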

CDN Invalidation on Reprocess

When a video is reprocessed (e.g., after a transcoding bug fix), the new segments are uploaded to the same S3 keys. A CloudFront invalidation for /assets/{asset_id}/* purges cached copies from all edge locations. To avoid invalidation cost on initial processing, the worker skips invalidation if the rendition rows did not previously exist.
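The skip rule reduces to one decision taken before the new rendition rows are written: did any rows exist for this asset? The sketch below uses a hypothetical `invalidate_if_reprocess` helper that takes any object exposing boto3's CloudFront `create_invalidation` method, so it can be exercised with a fake client. One wildcard path covers every playlist and segment under the asset prefix.

```python
import uuid
from typing import Optional, Protocol


class InvalidationClient(Protocol):
    """The single boto3 CloudFront client method this sketch relies on."""
    def create_invalidation(self, *, DistributionId: str, InvalidationBatch: dict) -> dict: ...


def invalidate_if_reprocess(client: InvalidationClient, distribution_id: str,
                            asset_id: int, had_renditions: bool) -> Optional[str]:
    """Purge /assets/{asset_id}/* only when renditions existed before this run.

    Returns the submitted invalidation path, or None when the request is skipped.
    """
    if not had_renditions:
        return None  # first-time processing: no invalidation request
    path = f"/assets/{asset_id}/*"  # one wildcard path for all renditions and segments
    client.create_invalidation(
        DistributionId=distribution_id,
        InvalidationBatch={
            "Paths": {"Quantity": 1, "Items": [path]},
            "CallerReference": str(uuid.uuid4()),  # unique per invalidation request
        },
    )
    return path
```

`had_renditions` would come from a `SELECT COUNT(*) FROM MediaRendition WHERE asset_id = ...` run before the worker upserts the new rows. CloudFront bills a wildcard path as a single invalidation path, which keeps reprocessing cheap even for assets with thousands of segments.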
