Media Processing Pipeline Low-Level Design: Transcoding, Thumbnail Generation, and CDN Delivery

A media processing pipeline handles the journey from raw video upload to CDN-delivered adaptive bitrate streams. The pipeline must be asynchronous, fault-tolerant, and horizontally scalable. This article designs the full pipeline: upload trigger through SNS/SQS to worker, FFmpeg transcoding to multiple renditions, HLS packaging, thumbnail sprite generation, MediaInfo metadata extraction, and CloudFront CDN invalidation, with SQL schema and Python worker implementation.

Pipeline Overview

  1. Client uploads raw video to S3 (pre-signed URL).
  2. S3 event notification → SNS topic → SQS queue.
  3. Transcoding worker polls SQS, downloads raw file, runs FFmpeg, uploads renditions to S3.
  4. Worker generates thumbnail sprite sheet, extracts MediaInfo metadata.
  5. Worker writes rendition records to the database and invalidates CloudFront cache.
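
The consumer side of steps 2–3 can be sketched as a minimal long-polling loop. This is not the article's worker code: the queue URL, the SNS-wrapped S3 event shape (standard S3 → SNS → SQS wiring with raw message delivery disabled), and the `print` hand-off are all illustrative assumptions.

```python
import json


def parse_upload_event(sqs_body: str) -> str:
    """Extract the S3 object key from an SNS-wrapped S3 event notification.

    With standard S3 -> SNS -> SQS wiring, SNS delivers the S3 event as a
    JSON string inside the envelope's 'Message' field, so we decode twice.
    """
    envelope = json.loads(sqs_body)
    s3_event = json.loads(envelope["Message"])
    return s3_event["Records"][0]["s3"]["object"]["key"]


def poll_forever(queue_url: str) -> None:
    """Long-poll SQS and hand each raw upload to the transcode worker."""
    import boto3  # deferred so parse_upload_event is testable without AWS

    sqs = boto3.client("sqs")
    while True:
        resp = sqs.receive_message(
            QueueUrl=queue_url,
            MaxNumberOfMessages=1,
            WaitTimeSeconds=20,      # long polling: avoid busy-spinning an empty queue
            VisibilityTimeout=3600,  # must exceed the worst-case transcode time
        )
        for msg in resp.get("Messages", []):
            key = parse_upload_event(msg["Body"])
            print("raw upload:", key)  # placeholder for the transcode hand-off
            sqs.delete_message(QueueUrl=queue_url,
                               ReceiptHandle=msg["ReceiptHandle"])
```

The visibility timeout matters: while a worker holds the message, other pollers cannot see it, which is the distributed-lock behavior the pipeline relies on.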

SQL Schema


CREATE TABLE MediaAsset (
    id              BIGINT UNSIGNED   NOT NULL AUTO_INCREMENT,
    owner_id        BIGINT UNSIGNED   NOT NULL,
    title           VARCHAR(512)      NULL,
    original_key    VARCHAR(1024)     NOT NULL,  -- S3 key of raw upload
    status          ENUM('uploaded','processing','ready','failed') NOT NULL DEFAULT 'uploaded',
    duration_secs   DECIMAL(10,3)     NULL,
    width           SMALLINT UNSIGNED NULL,
    height          SMALLINT UNSIGNED NULL,
    framerate       DECIMAL(6,3)      NULL,
    file_size_bytes BIGINT UNSIGNED   NULL,
    mime_type       VARCHAR(128)      NULL,
    meta            JSON              NULL,      -- full MediaInfo JSON
    created_at      DATETIME          NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at      DATETIME          NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (id),
    INDEX idx_owner_status (owner_id, status),
    INDEX idx_status_time  (status, created_at DESC)
) ENGINE=InnoDB;

CREATE TABLE TranscodeJob (
    id              BIGINT UNSIGNED   NOT NULL AUTO_INCREMENT,
    asset_id        BIGINT UNSIGNED   NOT NULL,
    status          ENUM('queued','running','done','failed') NOT NULL DEFAULT 'queued',
    worker_id       VARCHAR(64)       NULL,
    attempt         TINYINT UNSIGNED  NOT NULL DEFAULT 0,
    max_attempts    TINYINT UNSIGNED  NOT NULL DEFAULT 3,
    sqs_receipt     VARCHAR(1024)     NULL,
    error_msg       TEXT              NULL,
    queued_at       DATETIME(3)       NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
    started_at      DATETIME(3)       NULL,
    finished_at     DATETIME(3)       NULL,
    PRIMARY KEY (id),
    INDEX idx_asset  (asset_id),
    INDEX idx_status (status, queued_at ASC)
) ENGINE=InnoDB;

CREATE TABLE MediaRendition (
    id              BIGINT UNSIGNED   NOT NULL AUTO_INCREMENT,
    asset_id        BIGINT UNSIGNED   NOT NULL,
    profile         VARCHAR(32)       NOT NULL,  -- '360p','720p','1080p','audio_only'
    s3_key          VARCHAR(1024)     NOT NULL,  -- HLS m3u8 manifest or mp4 key
    codec_video     VARCHAR(32)       NULL,      -- 'h264','h265','vp9'
    codec_audio     VARCHAR(32)       NULL,
    bitrate_kbps    INT UNSIGNED      NULL,
    width           SMALLINT UNSIGNED NULL,
    height          SMALLINT UNSIGNED NULL,
    file_size_bytes BIGINT UNSIGNED   NULL,
    created_at      DATETIME          NOT NULL DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (id),
    UNIQUE KEY uq_asset_profile (asset_id, profile),
    INDEX idx_asset (asset_id)
) ENGINE=InnoDB;
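
The `status`/`attempt` columns on TranscodeJob exist to support claim-by-conditional-UPDATE: a worker wins a job only if its UPDATE matched a row. A self-contained sketch of that semantics, using an in-memory SQLite stand-in (the article's schema is MySQL/InnoDB; column set reduced for brevity):

```python
import sqlite3

# In-memory stand-in for TranscodeJob, just enough to show the claim pattern.
conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE TranscodeJob (
    id           INTEGER PRIMARY KEY,
    status       TEXT    NOT NULL DEFAULT 'queued',
    worker_id    TEXT,
    attempt      INTEGER NOT NULL DEFAULT 0,
    max_attempts INTEGER NOT NULL DEFAULT 3)""")
conn.execute("INSERT INTO TranscodeJob (id) VALUES (1)")


def claim(worker: str, job_id: int) -> bool:
    """Atomically move a job from 'queued' to 'running'.

    rowcount == 0 means another worker won the race, the job already
    finished, or the job is out of attempts.
    """
    cur = conn.execute(
        """UPDATE TranscodeJob
              SET status='running', worker_id=?, attempt=attempt+1
            WHERE id=? AND status='queued' AND attempt < max_attempts""",
        (worker, job_id))
    return cur.rowcount == 1


print(claim("worker-a", 1))  # True  — first claim succeeds
print(claim("worker-b", 1))  # False — job is no longer 'queued'
```

Because the WHERE clause and SET run as one statement, no two workers can both observe `status='queued'` and claim the same job.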

Python Worker Implementation


import boto3
import json
import os
import subprocess
import tempfile
import uuid
from pathlib import Path
import db

s3 = boto3.client("s3")
sqs = boto3.client("sqs")
sns = boto3.client("sns")
cloudfront = boto3.client("cloudfront")

S3_BUCKET = os.environ["MEDIA_BUCKET"]
CDN_DISTRIBUTION_ID = os.environ["CF_DISTRIBUTION_ID"]
SQS_QUEUE_URL = os.environ["TRANSCODE_QUEUE_URL"]

RENDITION_PROFILES = [
    {"name": "360p",  "width": 640,  "height": 360,  "vbitrate": "800k",  "abitrate": "96k"},
    {"name": "720p",  "width": 1280, "height": 720,  "vbitrate": "2500k", "abitrate": "128k"},
    {"name": "1080p", "width": 1920, "height": 1080, "vbitrate": "5000k", "abitrate": "192k"},
]


def submit_transcode_job(asset_id: int) -> int:
    """Create a TranscodeJob and publish to SQS."""
    job_id = db.execute(
        "INSERT INTO TranscodeJob (asset_id) VALUES (%s)", (asset_id,)
    )
    message = json.dumps({"job_id": job_id, "asset_id": asset_id})
    sqs.send_message(QueueUrl=SQS_QUEUE_URL, MessageBody=message)
    return job_id


def process_transcode(sqs_message: dict) -> None:
    """
    Main worker function — called for each SQS message.
    Downloads raw video, transcodes all profiles, packages HLS,
    generates thumbnails, and invalidates CDN.
    """
    body = json.loads(sqs_message["Body"])
    job_id = body["job_id"]
    asset_id = body["asset_id"]
    receipt = sqs_message["ReceiptHandle"]

    # claim job (the conditional UPDATE doubles as a distributed lock)
    updated = db.execute(
        """UPDATE TranscodeJob SET status='running', worker_id=%s, started_at=NOW(3),
                                   attempt=attempt+1, sqs_receipt=%s
           WHERE id=%s AND status='queued' AND attempt < max_attempts""",
        (f"{os.getpid()}@{os.uname().nodename}", receipt, job_id),
    )
    if not updated:
        return  # claimed by another worker, already finished, or out of attempts

    db.execute("UPDATE MediaAsset SET status='processing' WHERE id=%s", (asset_id,))

    # Reprocess detection: if rendition rows already exist, the CDN may hold
    # stale copies that must be invalidated after the new upload.
    # (db.query_one: assumed thin wrapper returning one row as a dict, or None.)
    is_reprocess = bool(db.query_one(
        "SELECT 1 FROM MediaRendition WHERE asset_id=%s LIMIT 1", (asset_id,)
    ))
    asset = db.query_one(
        "SELECT original_key FROM MediaAsset WHERE id=%s", (asset_id,)
    )

    try:
        with tempfile.TemporaryDirectory() as tmp:
            tmp_dir = Path(tmp)
            raw_path = tmp_dir / "raw_input"
            s3.download_file(S3_BUCKET, asset["original_key"], str(raw_path))

            meta = _extract_mediainfo(raw_path)

            rendition_keys = []
            for profile in RENDITION_PROFILES:
                out_dir = tmp_dir / profile["name"]
                out_dir.mkdir()
                _transcode_profile(raw_path, profile, out_dir)
                _upload_directory(out_dir, S3_BUCKET,
                                  f"assets/{asset_id}/{profile['name']}/")
                manifest_key = f"{profile['name']}/index.m3u8"
                rendition_keys.append((profile["name"], manifest_key))
                db.execute(
                    """INSERT INTO MediaRendition
                           (asset_id, profile, s3_key, codec_video, codec_audio, width, height)
                       VALUES (%s, %s, %s, 'h264', 'aac', %s, %s)
                       ON DUPLICATE KEY UPDATE s3_key=VALUES(s3_key)""",
                    (asset_id, profile["name"], f"assets/{asset_id}/{manifest_key}",
                     profile["width"], profile["height"]),
                )

            master = _build_master_playlist(rendition_keys, RENDITION_PROFILES)
            s3.put_object(Bucket=S3_BUCKET, Key=f"assets/{asset_id}/master.m3u8",
                          Body=master.encode(), ContentType="application/x-mpegURL")

            sprite_path = tmp_dir / "sprite.jpg"
            _generate_sprite(raw_path, sprite_path, interval_secs=10)
            s3.upload_file(str(sprite_path), S3_BUCKET,
                           f"assets/{asset_id}/sprite.jpg",
                           ExtraArgs={"ContentType": "image/jpeg"})

            db.execute(
                """UPDATE MediaAsset SET status='ready', duration_secs=%s, width=%s,
                       height=%s, framerate=%s, meta=%s WHERE id=%s""",
                (meta.get("duration"), meta.get("width"), meta.get("height"),
                 meta.get("framerate"), json.dumps(meta), asset_id),
            )

        if is_reprocess:  # nothing is cached at the edge on first processing
            _invalidate_cdn(f"/assets/{asset_id}/*")

        db.execute(
            "UPDATE TranscodeJob SET status='done', finished_at=NOW(3) WHERE id=%s",
            (job_id,),
        )
        sqs.delete_message(QueueUrl=SQS_QUEUE_URL, ReceiptHandle=receipt)
    except Exception as exc:
        job = db.query_one(
            "SELECT attempt, max_attempts FROM TranscodeJob WHERE id=%s", (job_id,)
        )
        if job["attempt"] >= job["max_attempts"]:
            _fail_job(job_id, str(exc))
            db.execute("UPDATE MediaAsset SET status='failed' WHERE id=%s", (asset_id,))
            sqs.delete_message(QueueUrl=SQS_QUEUE_URL, ReceiptHandle=receipt)
        else:
            # Requeue; the SQS message reappears after the visibility timeout.
            db.execute("UPDATE TranscodeJob SET status='queued' WHERE id=%s", (job_id,))
        raise


def _transcode_profile(input_path: Path, profile: dict, output_dir: Path) -> None:
    """Run FFmpeg to produce HLS segments for a single profile."""
    cmd = [
        "ffmpeg", "-y", "-i", str(input_path),
        "-c:v", "libx264",
        "-preset", "fast",
        "-b:v", profile["vbitrate"],
        "-vf", f"scale={profile['width']}:{profile['height']}:force_original_aspect_ratio=decrease,pad={profile['width']}:{profile['height']}:(ow-iw)/2:(oh-ih)/2",
        "-c:a", "aac",
        "-b:a", profile["abitrate"],
        "-hls_time", "6",
        "-hls_playlist_type", "vod",
        "-hls_segment_filename", str(output_dir / "seg%05d.ts"),
        str(output_dir / "index.m3u8"),
    ]
    result = subprocess.run(cmd, capture_output=True, timeout=3600)
    if result.returncode != 0:
        raise RuntimeError(f"FFmpeg failed: {result.stderr.decode()[-2000:]}")


def _extract_mediainfo(path: Path) -> dict:
    result = subprocess.run(
        ["mediainfo", "--Output=JSON", str(path)],
        capture_output=True, timeout=60
    )
    if result.returncode != 0:
        raise RuntimeError(f"mediainfo failed: {result.stderr.decode()[-500:]}")
    data = json.loads(result.stdout)
    tracks = data.get("media", {}).get("track", [])
    meta = {}
    for t in tracks:
        if t.get("@type") == "Video":
            meta["width"] = int(t.get("Width", 0))
            meta["height"] = int(t.get("Height", 0))
            meta["framerate"] = float(t.get("FrameRate", 0))
            meta["duration"] = float(t.get("Duration", 0))
            meta["codec_video"] = t.get("Format", "")
        elif t.get("@type") == "Audio":
            meta["codec_audio"] = t.get("Format", "")
    return meta


def _generate_sprite(input_path: Path, output_path: Path, interval_secs: int) -> None:
    """Generate a thumbnail contact sheet using FFmpeg tile filter."""
    cmd = [
        "ffmpeg", "-y", "-i", str(input_path),
        "-vf", f"fps=1/{interval_secs},scale=160:90,tile=10x10",
        "-frames:v", "1",
        str(output_path),
    ]
    subprocess.run(cmd, capture_output=True, timeout=300, check=True)


def _build_master_playlist(rendition_keys: list, profiles: list) -> str:
    lines = ["#EXTM3U", "#EXT-X-VERSION:3"]
    for (profile_name, manifest_key), profile in zip(rendition_keys, profiles):
        bw = int(profile["vbitrate"].rstrip("k")) * 1000
        lines.append(f'#EXT-X-STREAM-INF:BANDWIDTH={bw},RESOLUTION={profile["width"]}x{profile["height"]}')
        lines.append(manifest_key)
    return "n".join(lines)


def _invalidate_cdn(pattern: str) -> None:
    cloudfront.create_invalidation(
        DistributionId=CDN_DISTRIBUTION_ID,
        InvalidationBatch={
            "Paths": {"Quantity": 1, "Items": [pattern]},
            "CallerReference": str(uuid.uuid4()),
        }
    )


def _upload_directory(local_dir: Path, bucket: str, prefix: str) -> None:
    for f in local_dir.iterdir():
        if f.is_file():
            content_type = "application/x-mpegURL" if f.suffix == ".m3u8" else "video/MP2T"
            s3.upload_file(str(f), bucket, f"{prefix}{f.name}",
                           ExtraArgs={"ContentType": content_type})


def _fail_job(job_id: int, error: str) -> None:
    db.execute(
        "UPDATE TranscodeJob SET status='failed', error_msg=%s, finished_at=NOW(3) WHERE id=%s",
        (error[:9999], job_id)
    )

Adaptive Bitrate (HLS) Packaging

Each rendition produces a set of 6-second .ts segments and an index.m3u8 playlist. A master playlist (master.m3u8) references all renditions with their bandwidth and resolution. The video player selects the appropriate rendition based on available bandwidth and switches between renditions at segment boundaries.
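
A self-contained sketch of the master playlist assembly (mirroring the worker's `_build_master_playlist`): one `#EXT-X-STREAM-INF` entry per rendition, `BANDWIDTH` in bits per second, each pointing at that rendition's playlist relative to `master.m3u8`. The profile tuples below restate the bitrates from `RENDITION_PROFILES`.

```python
# (name, width, height, video bandwidth in bits/sec) — from the worker's profiles.
PROFILES = [
    ("360p",  640,  360,  800_000),
    ("720p",  1280, 720,  2_500_000),
    ("1080p", 1920, 1080, 5_000_000),
]


def build_master(profiles) -> str:
    """Assemble an HLS master playlist listing every rendition."""
    lines = ["#EXTM3U", "#EXT-X-VERSION:3"]
    for name, w, h, bandwidth in profiles:
        lines.append(f"#EXT-X-STREAM-INF:BANDWIDTH={bandwidth},RESOLUTION={w}x{h}")
        lines.append(f"{name}/index.m3u8")  # relative to master.m3u8's S3 prefix
    return "\n".join(lines)


print(build_master(PROFILES))
```

The player fetches only this small text file up front; segment downloads begin after it picks a rendition.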

Thumbnail Sprite Sheets

FFmpeg’s tile filter composites one frame every N seconds into a grid image. A JSON manifest records the timestamp, row, and column for each frame so the player can render a preview thumbnail on timeline scrub without downloading the video segments.
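
The player-side lookup is simple grid arithmetic. A sketch assuming the layout produced by `_generate_sprite` above (160x90 tiles, 10 columns); the frame interval is a parameter because the worker chooses it per asset.

```python
def sprite_tile(t_secs: float, interval_secs: int = 10, cols: int = 10,
                tile_w: int = 160, tile_h: int = 90) -> dict:
    """Map a hovered timeline timestamp to the sprite-sheet crop to display.

    Frame i covers [i*interval, (i+1)*interval); frames fill the grid
    left-to-right, top-to-bottom.
    """
    index = int(t_secs // interval_secs)   # which frame in the grid
    row, col = divmod(index, cols)
    return {
        "x": col * tile_w,   # CSS background-position offsets (negated by the player)
        "y": row * tile_h,
        "w": tile_w,
        "h": tile_h,
    }


print(sprite_tile(125.0))  # frame 12 -> row 1, col 2 -> {'x': 320, 'y': 90, 'w': 160, 'h': 90}
```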

CDN Invalidation on Reprocess

When a video is reprocessed (e.g., after a transcoding bug fix), the new segments are uploaded to the same S3 keys. A CloudFront invalidation for /assets/{asset_id}/* purges cached copies from all edge locations. To avoid invalidation cost on initial processing, the worker skips invalidation if the rendition rows did not previously exist.
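
When many assets are reprocessed in bulk, the invalidations can be batched into a single CreateInvalidation call. A hedged sketch; `build_invalidation_batch` and `invalidate_assets` are hypothetical helpers, and CloudFront still prices the operation per path, so batching saves API calls rather than per-path cost.

```python
import uuid


def build_invalidation_batch(asset_ids) -> dict:
    """Build one InvalidationBatch covering every reprocessed asset."""
    paths = [f"/assets/{aid}/*" for aid in asset_ids]
    return {
        "Paths": {"Quantity": len(paths), "Items": paths},
        "CallerReference": str(uuid.uuid4()),  # idempotency token per request
    }


def invalidate_assets(distribution_id: str, asset_ids) -> None:
    import boto3  # deferred so the batch-building logic is testable offline

    boto3.client("cloudfront").create_invalidation(
        DistributionId=distribution_id,
        InvalidationBatch=build_invalidation_batch(asset_ids),
    )
```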

Frequently Asked Questions

Why use SQS instead of directly triggering the transcoding worker from the upload?

SQS decouples the upload event from the processing worker and provides durability, retry logic, and back-pressure. If all workers are busy, messages queue up without dropping uploads. SQS visibility timeout acts as a distributed lock: a worker claims a message by making it invisible; if the worker crashes, the message reappears after the timeout for another worker. Dead-letter queues capture jobs that exceed max_attempts without manual intervention.

What is HLS adaptive bitrate streaming and how is it packaged?

HLS (HTTP Live Streaming) splits video into short segments (e.g., 6 seconds) encoded at multiple bitrates. A master playlist (m3u8) lists each rendition with its bandwidth and resolution. The player downloads the master playlist, measures network bandwidth, selects an appropriate rendition, and fetches segments. It can switch renditions at any segment boundary. FFmpeg produces segments and per-rendition playlists; the pipeline assembles the master playlist from the rendition keys.

How do thumbnail sprite sheets work for video timeline previews?

FFmpeg extracts one frame per N seconds and tiles them into a single JPEG contact sheet using the tile filter. A JSON manifest records each frame's timestamp, row index, and column index in the grid. When the user scrubs the video timeline, the player CSS-clips the sprite image to the frame corresponding to the hovered timestamp. This delivers timeline previews as a single HTTP request instead of fetching individual frame images.

When and how do you invalidate a CDN cache after reprocessing a video?

After uploading new rendition files to the same S3 keys (e.g., during a reprocess after a transcoding fix), issue a CloudFront invalidation for the path pattern /assets/{asset_id}/*. This purges all cached copies at every edge location. Invalidation costs apply per path per distribution, so skip invalidation on initial processing (nothing cached yet) and batch multiple asset invalidations in a single API call when reprocessing in bulk.


