Image Moderation Pipeline Low-Level Design: ML Classification, Human Review Queue, and Appeals

Image Moderation Pipeline Low-Level Design

User-generated image content requires automated screening at scale. A purely manual review operation collapses under volume; a purely automated pipeline blocks legitimate content and upsets users. The production answer is a tiered pipeline: hash-based blocklist for known bad content (instant, zero compute), ML classifier for unknown images, automatic accept/reject at high-confidence thresholds, human review queue for ambiguous cases, and an appeals workflow for disputed decisions.

Stage 1: Perceptual Hash Blocklist

A cryptographic hash (SHA-256) of an image changes if even one pixel differs. A perceptual hash (pHash) captures visual similarity and is robust to resizing, JPEG re-compression, and minor edits. Check pHash against a blocklist before spending any compute on ML inference:

import imagehash
from PIL import Image
import io
from typing import Optional

PHASH_THRESHOLD = 8   # Hamming distance; 0 = identical, higher = more different

def compute_phash(image_bytes: bytes) -> str:
    """Returns 64-bit pHash as hex string."""
    img = Image.open(io.BytesIO(image_bytes)).convert('RGB')
    return str(imagehash.phash(img))

def check_phash_blocklist(db, phash: str) -> Optional[str]:
    """
    Returns violation category if the image matches a blocklist entry,
    None if clean. Uses Hamming distance to catch near-duplicates.
    """
    # Store blocklist hashes as 64-bit integers for fast XOR comparison
    phash_int = int(phash, 16)
    rows = db.fetchall(
        """
        SELECT category, phash_hex,
               bit_count(phash_int # %s::bit(64)) AS hamming_dist
        FROM phash_blocklist
        WHERE bit_count(phash_int # %s::bit(64)) <= %s
        ORDER BY hamming_dist ASC
        LIMIT 1
        """,
        (phash_int, phash_int, PHASH_THRESHOLD)
    )
    return rows[0]['category'] if rows else None

SQL Schema

CREATE TABLE moderation_job (
    id              BIGSERIAL PRIMARY KEY,
    job_id          UUID        NOT NULL UNIQUE DEFAULT gen_random_uuid(),
    uploader_id     BIGINT      NOT NULL,
    s3_key          TEXT        NOT NULL,
    phash_hex       TEXT,
    status          TEXT        NOT NULL DEFAULT 'pending'
                                CHECK (status IN
                                  ('pending','phash_check','ml_pending','ml_done',
                                   'auto_approved','auto_rejected','human_review',
                                   'approved','rejected')),
    final_decision  TEXT        CHECK (final_decision IN ('approved','rejected')),
    decision_reason TEXT,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    decided_at      TIMESTAMPTZ
);

CREATE INDEX idx_mj_status ON moderation_job (status, created_at);

CREATE TABLE moderation_label (
    id          BIGSERIAL PRIMARY KEY,
    job_id      UUID        NOT NULL REFERENCES moderation_job(job_id),
    label       TEXT        NOT NULL,              -- e.g. 'explicit_nudity', 'violence'
    confidence  NUMERIC(5,4) NOT NULL,             -- 0.0000 - 1.0000
    source      TEXT        NOT NULL DEFAULT 'ml'  -- 'ml', 'phash', 'human'
);

CREATE INDEX idx_ml_job ON moderation_label (job_id);

CREATE TABLE human_review_task (
    id              BIGSERIAL PRIMARY KEY,
    job_id          UUID        NOT NULL REFERENCES moderation_job(job_id) UNIQUE,
    assigned_to     BIGINT,                        -- moderator user_id, NULL = unassigned
    priority        INT         NOT NULL DEFAULT 5, -- 1=highest, 10=lowest
    queue           TEXT        NOT NULL DEFAULT 'standard',
    notes           TEXT,
    status          TEXT        NOT NULL DEFAULT 'pending'
                                CHECK (status IN ('pending','in_review','completed','escalated')),
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    completed_at    TIMESTAMPTZ
);

CREATE INDEX idx_hrt_queue ON human_review_task (queue, priority, created_at)
    WHERE status = 'pending';

CREATE TABLE moderation_appeal (
    id              BIGSERIAL PRIMARY KEY,
    appeal_id       UUID        NOT NULL UNIQUE DEFAULT gen_random_uuid(),
    job_id          UUID        NOT NULL REFERENCES moderation_job(job_id),
    appellant_id    BIGINT      NOT NULL,
    reason          TEXT        NOT NULL,
    status          TEXT        NOT NULL DEFAULT 'open'
                                CHECK (status IN ('open','under_review','upheld','dismissed')),
    reviewer_id     BIGINT,
    reviewer_notes  TEXT,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    resolved_at     TIMESTAMPTZ
);

CREATE TABLE phash_blocklist (
    id          BIGSERIAL PRIMARY KEY,
    phash_hex   TEXT        NOT NULL UNIQUE,
    phash_int   BIT(64)     NOT NULL,
    category    TEXT        NOT NULL,   -- 'csam','explicit','violence'
    added_by    BIGINT,
    created_at  TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

ML Classification with Confidence Thresholds

import boto3
from typing import List, Dict

REKOGNITION = boto3.client('rekognition')

# Decision thresholds
AUTO_REJECT_CONFIDENCE  = 0.95   # >= 95%: auto-reject
AUTO_APPROVE_CONFIDENCE = 0.10   #  List[Dict]:
    """
    Calls AWS Rekognition DetectModerationLabels.
    Returns list of {label, confidence} dicts for violation categories only.
    """
    response = REKOGNITION.detect_moderation_labels(
        Image={'S3Object': {'Bucket': s3_bucket, 'Name': s3_key}},
        MinConfidence=5.0   # get all signals; we apply our own thresholds
    )
    results = []
    for label in response.get('ModerationLabels', []):
        parent = label.get('ParentName', '')
        name   = label['Name']
        check_name = parent if parent in VIOLATION_LABELS else name
        if check_name in VIOLATION_LABELS:
            results.append({
                'label':      name,
                'confidence': round(label['Confidence'] / 100, 4)
            })
    return results


def moderate_image(db, job_id: str, s3_bucket: str) -> str:
    """
    Runs full moderation pipeline. Returns final status string.
    """
    job = db.fetchone("SELECT * FROM moderation_job WHERE job_id=%s", (job_id,))

    # Stage 1: pHash blocklist
    if job['phash_hex']:
        blocked_category = check_phash_blocklist(db, job['phash_hex'])
        if blocked_category:
            db.execute(
                """INSERT INTO moderation_label (job_id, label, confidence, source)
                   VALUES (%s, %s, 1.0, 'phash')""",
                (job_id, blocked_category)
            )
            db.execute(
                """UPDATE moderation_job SET status='auto_rejected',
                   final_decision='rejected', decision_reason='phash_blocklist',
                   decided_at=NOW() WHERE job_id=%s""",
                (job_id,)
            )
            db.commit()
            return 'auto_rejected'

    # Stage 2: ML classification
    labels = run_ml_classification(s3_bucket, job['s3_key'])
    for lbl in labels:
        db.execute(
            "INSERT INTO moderation_label (job_id, label, confidence) VALUES (%s,%s,%s)",
            (job_id, lbl['label'], lbl['confidence'])
        )

    max_confidence = max((l['confidence'] for l in labels), default=0.0)

    if max_confidence >= AUTO_REJECT_CONFIDENCE:
        status = 'auto_rejected'
        decision = 'rejected'
        reason = f"ml_confidence_{max_confidence:.2f}"
    elif max_confidence <= AUTO_APPROVE_CONFIDENCE:
        status = 'auto_approved'
        decision = 'approved'
        reason = 'ml_clean'
    else:
        status = 'human_review'
        decision = None
        reason = None

    db.execute(
        """UPDATE moderation_job SET status=%s, final_decision=%s,
           decision_reason=%s, decided_at=CASE WHEN %s!='human_review' THEN NOW() END
           WHERE job_id=%s""",
        (status, decision, reason, status, job_id)
    )
    db.commit()

    if status == 'human_review':
        route_to_human_review(db, job_id, max_confidence)

    return status

Human Review Queue Routing

def route_to_human_review(db, job_id: str, max_confidence: float) -> None:
    """
    Assigns priority and queue based on violation confidence level.
    High confidence near the threshold gets priority treatment.
    """
    if max_confidence >= 0.80:
        priority = 2
        queue    = 'urgent'
    elif max_confidence >= 0.50:
        priority = 5
        queue    = 'standard'
    else:
        priority = 8
        queue    = 'low_signal'

    db.execute(
        """INSERT INTO human_review_task (job_id, priority, queue)
           VALUES (%s, %s, %s)
           ON CONFLICT (job_id) DO NOTHING""",
        (job_id, priority, queue)
    )
    db.commit()

Appeals Workflow

def submit_appeal(db, job_id: str, appellant_id: int, reason: str) -> str:
    """User appeals an auto-rejection. Returns appeal_id."""
    job = db.fetchone(
        "SELECT final_decision FROM moderation_job WHERE job_id=%s", (job_id,)
    )
    if not job or job['final_decision'] != 'rejected':
        raise ValueError("Only rejected moderation decisions can be appealed")

    existing = db.fetchone(
        "SELECT id FROM moderation_appeal WHERE job_id=%s AND appellant_id=%s",
        (job_id, appellant_id)
    )
    if existing:
        raise ValueError("Appeal already submitted for this decision")

    appeal_id = str(uuid.uuid4())
    db.execute(
        """INSERT INTO moderation_appeal (appeal_id, job_id, appellant_id, reason)
           VALUES (%s,%s,%s,%s)""",
        (appeal_id, job_id, appellant_id, reason)
    )
    # Route to human review with highest priority
    db.execute(
        """INSERT INTO human_review_task (job_id, priority, queue, notes)
           VALUES (%s, 1, 'appeals', %s)
           ON CONFLICT (job_id) DO UPDATE SET
               priority = 1, queue = 'appeals', notes = EXCLUDED.notes""",
        (job_id, f"Appeal: {reason[:200]}")
    )
    db.commit()
    return appeal_id

{
“@context”: “https://schema.org”,
“@type”: “FAQPage”,
“mainEntity”: [
{
“@type”: “Question”,
“name”: “Why check perceptual hash before running ML on uploaded images?”,
“acceptedAnswer”: {
“@type”: “Answer”,
“text”: “A pHash blocklist check is microseconds of CPU and a single indexed database query. ML inference via Rekognition or a GPU model costs 50-200ms and money per call. Known-bad images (CSAM, viral explicit content) are caught instantly by the hash check without paying the ML cost, and near-duplicates edited to evade the hash are still caught within the Hamming distance threshold.”
}
},
{
“@type”: “Question”,
“name”: “How do you set auto-approve and auto-reject confidence thresholds?”,
“acceptedAnswer”: {
“@type”: “Answer”,
“text”: “Set thresholds empirically using a labeled validation set. Measure precision and recall at each threshold. The auto-reject threshold should be set for very high precision (few false positives) even at the cost of recall. The auto-approve threshold should be set for very high recall on clean content. Everything in between goes to human review — calibrate thresholds so human review volume stays within your moderation team’s capacity.”
}
},
{
“@type”: “Question”,
“name”: “What is the difference between a perceptual hash and a cryptographic hash for image deduplication?”,
“acceptedAnswer”: {
“@type”: “Answer”,
“text”: “A cryptographic hash (SHA-256) changes completely if even one bit of the file differs, making it useless for finding visually similar but technically different images. A perceptual hash (pHash, dHash) encodes visual structure as a 64-bit integer. The Hamming distance between two pHashes measures visual similarity: distance 0 is identical, distance under 10 means nearly identical. This catches re-encoded, resized, or slightly cropped versions of the same image.”
}
},
{
“@type”: “Question”,
“name”: “How do you prevent a moderation pipeline from becoming a bottleneck during upload spikes?”,
“acceptedAnswer”: {
“@type”: “Answer”,
“text”: “Decouple upload from moderation: accept the upload immediately, store to S3, create a moderation_job record with status=’pending’, and return the upload ID to the user. A separate worker pool picks up jobs from the queue. Show the content as ‘under review’ to other users until a decision is made. Use priority queues so urgent cases (high-confidence violations) pre-empt lower-priority work.”
}
}
]
}

{“@context”:”https://schema.org”,”@type”:”FAQPage”,”mainEntity”:[{“@type”:”Question”,”name”:”What is a perceptual hash and how is it used for moderation?”,”acceptedAnswer”:{“@type”:”Answer”,”text”:”pHash generates a fingerprint based on image visual features; known-bad images are stored in a blocklist and new uploads are matched by Hamming distance threshold.”}},{“@type”:”Question”,”name”:”How are ML confidence scores used to route images?”,”acceptedAnswer”:{“@type”:”Answer”,”text”:”Scores above an auto-reject threshold immediately reject the image; scores below an auto-approve threshold pass it; scores in between are sent to human review.”}},{“@type”:”Question”,”name”:”How does the human review queue prioritize items?”,”acceptedAnswer”:{“@type”:”Answer”,”text”:”Items are prioritized by confidence score (borderline cases first), content category severity, and time in queue to prevent starvation.”}},{“@type”:”Question”,”name”:”How do moderation appeals work?”,”acceptedAnswer”:{“@type”:”Answer”,”text”:”A user submits an appeal that creates a ModerationAppeal record; a senior reviewer re-evaluates the original decision, and if overturned, the content is reinstated with the original decision updated.”}}]}

See also: Meta Interview Guide 2026: Facebook, Instagram, WhatsApp Engineering

See also: Snap Interview Guide

See also: Twitter/X Interview Guide 2026: Timeline Algorithms, Real-Time Search, and Content at Scale

Scroll to Top