Image Moderation Pipeline Low-Level Design
User-generated image content requires automated screening at scale. A purely manual review operation collapses under volume; a purely automated pipeline blocks legitimate content and upsets users. The production answer is a tiered pipeline: hash-based blocklist for known bad content (instant, zero compute), ML classifier for unknown images, automatic accept/reject at high-confidence thresholds, human review queue for ambiguous cases, and an appeals workflow for disputed decisions.
Stage 1: Perceptual Hash Blocklist
A cryptographic hash (SHA-256) of an image changes if even one pixel differs. A perceptual hash (pHash) captures visual similarity and is robust to resizing, JPEG re-compression, and minor edits. Check pHash against a blocklist before spending any compute on ML inference:
import imagehash
from PIL import Image
import io
from typing import Optional
# Maximum Hamming distance at which an image still counts as a match for a
# blocklist entry; the blocklist query keeps rows whose distance is <= this.
PHASH_THRESHOLD = 8 # Hamming distance; 0 = identical, higher = more different
def compute_phash(image_bytes: bytes) -> str:
    """Compute the perceptual hash of an encoded image.

    The bytes are decoded with Pillow, normalised to RGB, and hashed with
    ``imagehash.phash``; the hash is returned in its string (hex) form.
    """
    buffer = io.BytesIO(image_bytes)
    rgb_image = Image.open(buffer).convert('RGB')
    fingerprint = imagehash.phash(rgb_image)
    return str(fingerprint)
def check_phash_blocklist(db, phash: str) -> Optional[str]:
    """
    Look up a perceptual hash in the blocklist, tolerating near-duplicates.

    Args:
        db: Database handle exposing ``fetchall(sql, params)``; returned rows
            are indexed by column name.
        phash: Hex-encoded pHash of at most 64 bits (16 hex digits).

    Returns:
        The ``category`` of the closest blocklist entry whose Hamming distance
        to ``phash`` is at most ``PHASH_THRESHOLD``, or ``None`` if no entry
        is that close.

    Raises:
        ValueError: If ``phash`` is not valid hex or does not fit in 64 bits.
    """
    phash_value = int(phash, 16)
    if not 0 <= phash_value < (1 << 64):
        raise ValueError("phash must be an unsigned 64-bit hex value")

    # The blocklist column is BIT(64). Send the probe as a 64-character bit
    # string instead of an integer: an unsigned 64-bit value with its top bit
    # set does not fit in a signed BIGINT, so an integer parameter would be
    # typed as numeric, which has no cast to bit(64), and the query would fail
    # for roughly half of all hashes.
    phash_bits = format(phash_value, '064b')

    # `#` is bitwise XOR on bit strings; bit_count of the XOR is the Hamming
    # distance between the stored hash and the probe.
    rows = db.fetchall(
        """
        SELECT category, phash_hex,
               bit_count(phash_int # %s::bit(64)) AS hamming_dist
        FROM phash_blocklist
        WHERE bit_count(phash_int # %s::bit(64)) <= %s
        ORDER BY hamming_dist ASC
        LIMIT 1
        """,
        (phash_bits, phash_bits, PHASH_THRESHOLD),
    )
    return rows[0]['category'] if rows else None
SQL Schema
-- One row per uploaded image moving through the moderation pipeline.
CREATE TABLE moderation_job (
id BIGSERIAL PRIMARY KEY,
-- External identifier; the label, review-task and appeal tables reference it.
job_id UUID NOT NULL UNIQUE DEFAULT gen_random_uuid(),
uploader_id BIGINT NOT NULL,
-- Object key of the image; passed to the ML classifier as the S3 object name.
s3_key TEXT NOT NULL,
-- Perceptual hash in hex; the blocklist stage is skipped when this is NULL.
phash_hex TEXT,
-- Current pipeline stage or outcome.
status TEXT NOT NULL DEFAULT 'pending'
CHECK (status IN
('pending','phash_check','ml_pending','ml_done',
'auto_approved','auto_rejected','human_review',
'approved','rejected')),
-- NULL until a decision is reached (e.g. while the job awaits human review).
final_decision TEXT CHECK (final_decision IN ('approved','rejected')),
-- Why the decision was made, e.g. 'phash_blocklist', 'ml_clean', 'ml_confidence_0.97'.
decision_reason TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-- Set when an automatic decision is recorded; left NULL for jobs routed to human review.
decided_at TIMESTAMPTZ
);
-- Supports scanning jobs by status in creation order.
CREATE INDEX idx_mj_status ON moderation_job (status, created_at);
-- Individual violation signals attached to a job; a job may have many labels.
CREATE TABLE moderation_label (
id BIGSERIAL PRIMARY KEY,
job_id UUID NOT NULL REFERENCES moderation_job(job_id),
label TEXT NOT NULL, -- e.g. 'explicit_nudity', 'violence'
-- Stored as a fraction, not a percentage; blocklist hits are recorded as 1.0.
confidence NUMERIC(5,4) NOT NULL, -- 0.0000 - 1.0000
source TEXT NOT NULL DEFAULT 'ml' -- 'ml', 'phash', 'human'
);
-- Lookup of all labels for a given job.
CREATE INDEX idx_ml_job ON moderation_label (job_id);
-- Work item for a human moderator. The UNIQUE constraint on job_id allows at
-- most one task per job, which is what the ON CONFLICT (job_id) upserts rely on.
CREATE TABLE human_review_task (
id BIGSERIAL PRIMARY KEY,
job_id UUID NOT NULL REFERENCES moderation_job(job_id) UNIQUE,
assigned_to BIGINT, -- moderator user_id, NULL = unassigned
priority INT NOT NULL DEFAULT 5, -- 1=highest, 10=lowest
-- Named queue; the pipeline writes 'urgent', 'standard', 'low_signal' and 'appeals'.
queue TEXT NOT NULL DEFAULT 'standard',
notes TEXT,
status TEXT NOT NULL DEFAULT 'pending'
CHECK (status IN ('pending','in_review','completed','escalated')),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
completed_at TIMESTAMPTZ
);
-- Partial index covering only pending tasks, ordered by queue, then priority,
-- then age.
CREATE INDEX idx_hrt_queue ON human_review_task (queue, priority, created_at)
WHERE status = 'pending';
-- A user's dispute of a rejected moderation decision.
CREATE TABLE moderation_appeal (
    id             BIGSERIAL PRIMARY KEY,
    appeal_id      UUID NOT NULL UNIQUE DEFAULT gen_random_uuid(),
    job_id         UUID NOT NULL REFERENCES moderation_job(job_id),
    appellant_id   BIGINT NOT NULL,
    reason         TEXT NOT NULL,
    status         TEXT NOT NULL DEFAULT 'open'
                   CHECK (status IN ('open','under_review','upheld','dismissed')),
    reviewer_id    BIGINT,
    reviewer_notes TEXT,
    created_at     TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    resolved_at    TIMESTAMPTZ,
    -- Enforce "one appeal per appellant per job" in the database so that two
    -- concurrent submissions cannot both pass the application-level existence
    -- check. The backing index also serves lookups by (job_id, appellant_id).
    UNIQUE (job_id, appellant_id)
);
-- Known-bad image hashes checked before any ML inference is run.
CREATE TABLE phash_blocklist (
id BIGSERIAL PRIMARY KEY,
phash_hex TEXT NOT NULL UNIQUE,
-- Despite the name this is a 64-bit bit string, not an integer; the lookup
-- query XORs it with the probe hash and counts the set bits.
phash_int BIT(64) NOT NULL,
category TEXT NOT NULL, -- 'csam','explicit','violence'
added_by BIGINT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
ML Classification with Confidence Thresholds
import boto3
from typing import List, Dict
# Module-level Rekognition client, created once at import time and used for
# the DetectModerationLabels call.
REKOGNITION = boto3.client('rekognition')
# Decision thresholds
AUTO_REJECT_CONFIDENCE = 0.95   # >= 95%: auto-reject
AUTO_APPROVE_CONFIDENCE = 0.10  # <= 10%: auto-approve; anything in between goes to human review

# NOTE(review): the original definition of this set was lost when the document
# was extracted. The names below are an assumed starting point based on
# Rekognition's top-level moderation categories -- confirm against the real
# policy list before relying on it.
VIOLATION_LABELS = {
    'Explicit Nudity',
    'Violence',
    'Visually Disturbing',
    'Hate Symbols',
}


def run_ml_classification(s3_bucket: str, s3_key: str) -> List[Dict]:
    """
    Calls AWS Rekognition DetectModerationLabels on an image stored in S3.

    Args:
        s3_bucket: Bucket that holds the image.
        s3_key: Object key of the image within the bucket.

    Returns:
        A list of ``{'label': str, 'confidence': float}`` dicts, one per
        returned label whose own name or parent name is in
        ``VIOLATION_LABELS``. Confidence is converted from Rekognition's
        0-100 scale to a 0-1 fraction rounded to four decimals.
    """
    response = REKOGNITION.detect_moderation_labels(
        Image={'S3Object': {'Bucket': s3_bucket, 'Name': s3_key}},
        MinConfidence=5.0,  # get all signals; we apply our own thresholds
    )
    results = []
    for label in response.get('ModerationLabels', []):
        parent = label.get('ParentName', '')
        name = label['Name']
        # A label counts as a violation if either it or its parent category
        # is one we care about; the specific (child) name is what gets stored.
        if parent in VIOLATION_LABELS or name in VIOLATION_LABELS:
            results.append({
                'label': name,
                'confidence': round(label['Confidence'] / 100, 4),
            })
    return results
def moderate_image(db, job_id: str, s3_bucket: str) -> str:
    """
    Runs the full moderation pipeline for one job and records the outcome.

    Stage 1 checks the job's perceptual hash against the blocklist (skipped
    when the job has no ``phash_hex``). Stage 2 runs ML classification and
    auto-rejects, auto-approves, or routes to human review depending on the
    highest violation confidence.

    Args:
        db: Database handle exposing ``fetchone``, ``execute`` and ``commit``.
        job_id: ``moderation_job.job_id`` of the job to process.
        s3_bucket: Bucket that holds the job's image.

    Returns:
        The resulting status: ``'auto_rejected'``, ``'auto_approved'`` or
        ``'human_review'``.

    Raises:
        ValueError: If no job with ``job_id`` exists.
    """
    job = db.fetchone("SELECT * FROM moderation_job WHERE job_id=%s", (job_id,))
    if not job:
        # Fail with a clear message instead of a TypeError on job['phash_hex'].
        raise ValueError(f"Unknown moderation job: {job_id}")

    # Stage 1: pHash blocklist
    if job['phash_hex']:
        blocked_category = check_phash_blocklist(db, job['phash_hex'])
        if blocked_category:
            db.execute(
                """INSERT INTO moderation_label (job_id, label, confidence, source)
                VALUES (%s, %s, 1.0, 'phash')""",
                (job_id, blocked_category)
            )
            db.execute(
                """UPDATE moderation_job SET status='auto_rejected',
                final_decision='rejected', decision_reason='phash_blocklist',
                decided_at=NOW() WHERE job_id=%s""",
                (job_id,)
            )
            db.commit()
            return 'auto_rejected'

    # Stage 2: ML classification
    labels = run_ml_classification(s3_bucket, job['s3_key'])
    for lbl in labels:
        db.execute(
            "INSERT INTO moderation_label (job_id, label, confidence) VALUES (%s,%s,%s)",
            (job_id, lbl['label'], lbl['confidence'])
        )

    # No violation labels at all is treated as confidence 0.0 (auto-approve).
    max_confidence = max((l['confidence'] for l in labels), default=0.0)
    if max_confidence >= AUTO_REJECT_CONFIDENCE:
        status = 'auto_rejected'
        decision = 'rejected'
        reason = f"ml_confidence_{max_confidence:.2f}"
    elif max_confidence <= AUTO_APPROVE_CONFIDENCE:
        status = 'auto_approved'
        decision = 'approved'
        reason = 'ml_clean'
    else:
        status = 'human_review'
        decision = None
        reason = None

    # decided_at is stamped only for automatic decisions; jobs awaiting human
    # review keep it NULL.
    db.execute(
        """UPDATE moderation_job SET status=%s, final_decision=%s,
        decision_reason=%s, decided_at=CASE WHEN %s!='human_review' THEN NOW() END
        WHERE job_id=%s""",
        (status, decision, reason, status, job_id)
    )
    if status == 'human_review':
        # Enqueue the review task before committing so the status change and
        # the task are written together (assuming db is transactional);
        # committing first could leave a job marked 'human_review' with no
        # task if the enqueue then failed.
        route_to_human_review(db, job_id, max_confidence)
    db.commit()
    return status
Human Review Queue Routing
def route_to_human_review(db, job_id: str, max_confidence: float) -> None:
    """
    Create a human review task for a job, choosing its queue and priority
    from the highest violation confidence.

    Confidence >= 0.80 goes to the 'urgent' queue at priority 2, >= 0.50 to
    'standard' at priority 5, and anything lower to 'low_signal' at
    priority 8. If the job already has a task, the insert is skipped.
    """
    # (minimum confidence, priority, queue), checked from highest to lowest.
    tiers = (
        (0.80, 2, 'urgent'),
        (0.50, 5, 'standard'),
    )
    task_priority, task_queue = 8, 'low_signal'
    for floor, tier_priority, tier_queue in tiers:
        if max_confidence >= floor:
            task_priority, task_queue = tier_priority, tier_queue
            break

    insert_sql = (
        "INSERT INTO human_review_task (job_id, priority, queue)\n"
        "VALUES (%s, %s, %s)\n"
        "ON CONFLICT (job_id) DO NOTHING"
    )
    db.execute(insert_sql, (job_id, task_priority, task_queue))
    db.commit()
Appeals Workflow
def submit_appeal(db, job_id: str, appellant_id: int, reason: str) -> str:
    """
    Record a user's appeal against a rejected decision and queue it for review.

    Args:
        db: Database handle exposing ``fetchone``, ``execute`` and ``commit``.
        job_id: ``moderation_job.job_id`` of the disputed job.
        appellant_id: User id of the person appealing.
        reason: Free-text justification; its first 200 characters are copied
            into the review task's notes.

    Returns:
        The new appeal's ``appeal_id`` as a string.

    Raises:
        ValueError: If the job does not exist or was not rejected, or if this
            appellant has already appealed this job.
    """
    # Imported locally: nothing in the visible module imports uuid.
    import uuid

    # NOTE(review): nothing here checks that appellant_id is the job's
    # uploader -- confirm whether that is enforced by the caller.
    job = db.fetchone(
        "SELECT final_decision FROM moderation_job WHERE job_id=%s", (job_id,)
    )
    if not job or job['final_decision'] != 'rejected':
        raise ValueError("Only rejected moderation decisions can be appealed")

    existing = db.fetchone(
        "SELECT id FROM moderation_appeal WHERE job_id=%s AND appellant_id=%s",
        (job_id, appellant_id)
    )
    if existing:
        raise ValueError("Appeal already submitted for this decision")

    appeal_id = str(uuid.uuid4())
    db.execute(
        """INSERT INTO moderation_appeal (appeal_id, job_id, appellant_id, reason)
        VALUES (%s,%s,%s,%s)""",
        (appeal_id, job_id, appellant_id, reason)
    )

    # Route to human review with highest priority. A job that already went
    # through human review has a task row that is no longer 'pending', so on
    # conflict the task is reopened (status, assignee and completion time
    # reset); otherwise it would never reappear in the pending queue.
    db.execute(
        """INSERT INTO human_review_task (job_id, priority, queue, notes)
        VALUES (%s, 1, 'appeals', %s)
        ON CONFLICT (job_id) DO UPDATE SET
            priority = 1, queue = 'appeals', notes = EXCLUDED.notes,
            status = 'pending', assigned_to = NULL, completed_at = NULL""",
        (job_id, f"Appeal: {reason[:200]}")
    )
    db.commit()
    return appeal_id
See also: Meta Interview Guide 2026: Facebook, Instagram, WhatsApp Engineering
See also: Snap Interview Guide
See also: Twitter/X Interview Guide 2026: Timeline Algorithms, Real-Time Search, and Content at Scale