Image Moderation Pipeline Low-Level Design
User-generated image content requires automated screening at scale. A purely manual review operation collapses under volume; a purely automated pipeline blocks legitimate content and upsets users. The production answer is a tiered pipeline: hash-based blocklist for known bad content (instant, zero compute), ML classifier for unknown images, automatic accept/reject at high-confidence thresholds, human review queue for ambiguous cases, and an appeals workflow for disputed decisions.
Stage 1: Perceptual Hash Blocklist
A cryptographic hash (SHA-256) of an image changes if even one pixel differs. A perceptual hash (pHash) captures visual similarity and is robust to resizing, JPEG re-compression, and minor edits. Check pHash against a blocklist before spending any compute on ML inference:
import imagehash
from PIL import Image
import io
from typing import Optional
PHASH_THRESHOLD = 8 # max Hamming distance (out of 64 bits) still treated as a blocklist match; 0 = identical image
def compute_phash(image_bytes: bytes) -> str:
    """Decode an image from raw bytes and return its 64-bit pHash as a hex string."""
    decoded = Image.open(io.BytesIO(image_bytes))
    # Normalize to RGB so hashes are comparable across palette/grayscale/alpha modes.
    return str(imagehash.phash(decoded.convert('RGB')))
def check_phash_blocklist(db, phash: str, max_distance: Optional[int] = None) -> Optional[str]:
    """
    Return the violation category of the closest blocklist entry within
    *max_distance* Hamming bits of *phash*, or None if the image is clean.

    Args:
        db: DB handle exposing fetchall(sql, params).
        phash: 64-bit perceptual hash as a 16-char hex string.
        max_distance: Hamming-distance cutoff; defaults to PHASH_THRESHOLD.

    Returns:
        Matched category string, or None when no entry is close enough.
    """
    if max_distance is None:
        max_distance = PHASH_THRESHOLD
    # Send the hash as a 64-char '0'/'1' string: the text -> bit(64) cast is
    # well-defined, whereas the previous integer parameter overflowed BIGINT
    # for hashes with the top bit set (values >= 2**63), so the ::bit(64)
    # cast failed for roughly half of all possible pHashes.
    phash_bits = format(int(phash, 16), '064b')
    rows = db.fetchall(
        """
        SELECT category, phash_hex,
        bit_count(phash_int # %s::bit(64)) AS hamming_dist
        FROM phash_blocklist
        WHERE bit_count(phash_int # %s::bit(64)) <= %s
        ORDER BY hamming_dist ASC
        LIMIT 1
        """,
        (phash_bits, phash_bits, max_distance)
    )
    return rows[0]['category'] if rows else None
SQL Schema
-- One row per uploaded image; status tracks the pipeline stage,
-- final_decision/decision_reason record the terminal outcome.
CREATE TABLE moderation_job (
id BIGSERIAL PRIMARY KEY,
job_id UUID NOT NULL UNIQUE DEFAULT gen_random_uuid(),  -- public identifier used by workers and the API
uploader_id BIGINT NOT NULL,
s3_key TEXT NOT NULL,  -- location of the uploaded image
phash_hex TEXT,  -- perceptual hash; NULL until computed
status TEXT NOT NULL DEFAULT 'pending'
CHECK (status IN
('pending','phash_check','ml_pending','ml_done',
'auto_approved','auto_rejected','human_review',
'approved','rejected')),
final_decision TEXT CHECK (final_decision IN ('approved','rejected')),  -- NULL until decided
decision_reason TEXT,  -- e.g. 'phash_blocklist', 'ml_clean'
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
decided_at TIMESTAMPTZ  -- NULL while pending / in human review
);
-- Worker polling pattern: oldest jobs first within a given status.
CREATE INDEX idx_mj_status ON moderation_job (status, created_at);
-- One row per (job, label) signal; a job accumulates labels from ML,
-- the pHash blocklist, and human reviewers for audit purposes.
CREATE TABLE moderation_label (
id BIGSERIAL PRIMARY KEY,
job_id UUID NOT NULL REFERENCES moderation_job(job_id),
label TEXT NOT NULL, -- e.g. 'explicit_nudity', 'violence'
confidence NUMERIC(5,4) NOT NULL, -- 0.0000 - 1.0000
source TEXT NOT NULL DEFAULT 'ml' -- 'ml', 'phash', 'human'
);
-- Fetch all signals for one job (audit / appeals review view).
CREATE INDEX idx_ml_job ON moderation_label (job_id);
-- Work items for the human moderation queue; at most one per job
-- (job_id is UNIQUE, so re-routing upserts rather than duplicates).
CREATE TABLE human_review_task (
id BIGSERIAL PRIMARY KEY,
job_id UUID NOT NULL REFERENCES moderation_job(job_id) UNIQUE,
assigned_to BIGINT, -- moderator user_id, NULL = unassigned
priority INT NOT NULL DEFAULT 5, -- 1=highest, 10=lowest
queue TEXT NOT NULL DEFAULT 'standard', -- 'urgent', 'standard', 'low_signal', 'appeals'
notes TEXT,
status TEXT NOT NULL DEFAULT 'pending'
CHECK (status IN ('pending','in_review','completed','escalated')),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
completed_at TIMESTAMPTZ
);
-- Partial index: moderators pull the next pending task per queue,
-- highest priority first, FIFO within a priority.
CREATE INDEX idx_hrt_queue ON human_review_task (queue, priority, created_at)
WHERE status = 'pending';
-- User-filed appeals against rejections; resolution is recorded by a
-- (senior) reviewer via status + reviewer_notes.
CREATE TABLE moderation_appeal (
id BIGSERIAL PRIMARY KEY,
appeal_id UUID NOT NULL UNIQUE DEFAULT gen_random_uuid(),
job_id UUID NOT NULL REFERENCES moderation_job(job_id),
appellant_id BIGINT NOT NULL,  -- user who filed the appeal
reason TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'open'
CHECK (status IN ('open','under_review','upheld','dismissed')),
reviewer_id BIGINT,  -- NULL until a reviewer picks it up
reviewer_notes TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
resolved_at TIMESTAMPTZ
);
-- Known-bad perceptual hashes. phash_int stores the same 64-bit hash as a
-- BIT(64) so the XOR (#) + bit_count Hamming-distance query runs in SQL.
CREATE TABLE phash_blocklist (
id BIGSERIAL PRIMARY KEY,
phash_hex TEXT NOT NULL UNIQUE,
phash_int BIT(64) NOT NULL,
category TEXT NOT NULL, -- 'csam','explicit','violence'
added_by BIGINT,  -- admin/moderator who added the entry; NULL = system import
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
ML Classification with Confidence Thresholds
import boto3
from typing import List, Dict
# Shared module-level Rekognition client, created once at import time.
REKOGNITION = boto3.client('rekognition')
# Decision thresholds (fractions 0-1, compared against the max label confidence).
AUTO_REJECT_CONFIDENCE = 0.95 # >= 95%: auto-reject
AUTO_APPROVE_CONFIDENCE = 0.10  # <= 10% max violation confidence: auto-approve

# Rekognition top-level (parent) moderation categories we act on.
# NOTE(review): this set was reconstructed — the original definition was lost
# to markup stripping (everything between '<' and '>' on this line vanished).
# Confirm the exact category list against moderation policy before shipping.
VIOLATION_LABELS = {
    'Explicit Nudity',
    'Violence',
    'Visually Disturbing',
    'Hate Symbols',
    'Drugs',
}

def run_ml_classification(s3_bucket: str, s3_key: str) -> List[Dict]:
    """
    Call AWS Rekognition DetectModerationLabels on an S3-hosted image.

    Args:
        s3_bucket: bucket containing the uploaded image.
        s3_key: object key of the image within the bucket.

    Returns:
        List of {'label': str, 'confidence': float} dicts with confidence
        rescaled to 0-1, restricted to labels whose name or parent category
        is in VIOLATION_LABELS.
    """
    response = REKOGNITION.detect_moderation_labels(
        Image={'S3Object': {'Bucket': s3_bucket, 'Name': s3_key}},
        MinConfidence=5.0  # get all signals; we apply our own thresholds
    )
    results = []
    for label in response.get('ModerationLabels', []):
        parent = label.get('ParentName', '')
        name = label['Name']
        # Rekognition labels are hierarchical: match on the parent category
        # when it is one we track, otherwise on the label's own name.
        check_name = parent if parent in VIOLATION_LABELS else name
        if check_name in VIOLATION_LABELS:
            results.append({
                'label': name,
                'confidence': round(label['Confidence'] / 100, 4)
            })
    return results
def moderate_image(db, job_id: str, s3_bucket: str) -> str:
    """
    Run the full moderation pipeline for one job.

    Stages: (1) pHash blocklist short-circuit, (2) ML classification with
    auto-approve / auto-reject thresholds, (3) routing ambiguous scores to
    the human review queue.

    Args:
        db: DB handle with fetchone/execute/commit.
        job_id: UUID of the moderation_job row.
        s3_bucket: bucket holding the uploaded image.

    Returns:
        Final status string: 'auto_rejected', 'auto_approved', or
        'human_review'.

    Raises:
        ValueError: if no moderation_job row exists for job_id.
    """
    job = db.fetchone("SELECT * FROM moderation_job WHERE job_id=%s", (job_id,))
    if job is None:
        # Previously a missing job crashed below with an opaque TypeError
        # on the subscript; fail loudly with a clear message instead.
        raise ValueError(f"Unknown moderation job: {job_id}")
    # Stage 1: pHash blocklist — known-bad content skips ML entirely.
    if job['phash_hex']:
        blocked_category = check_phash_blocklist(db, job['phash_hex'])
        if blocked_category:
            db.execute(
                """INSERT INTO moderation_label (job_id, label, confidence, source)
                VALUES (%s, %s, 1.0, 'phash')""",
                (job_id, blocked_category)
            )
            db.execute(
                """UPDATE moderation_job SET status='auto_rejected',
                final_decision='rejected', decision_reason='phash_blocklist',
                decided_at=NOW() WHERE job_id=%s""",
                (job_id,)
            )
            db.commit()
            return 'auto_rejected'
    # Stage 2: ML classification; persist every label for auditability
    # (source defaults to 'ml' in the schema).
    labels = run_ml_classification(s3_bucket, job['s3_key'])
    for lbl in labels:
        db.execute(
            "INSERT INTO moderation_label (job_id, label, confidence) VALUES (%s,%s,%s)",
            (job_id, lbl['label'], lbl['confidence'])
        )
    # No labels at all counts as confidence 0.0, i.e. clean.
    max_confidence = max((l['confidence'] for l in labels), default=0.0)
    if max_confidence >= AUTO_REJECT_CONFIDENCE:
        status = 'auto_rejected'
        decision = 'rejected'
        reason = f"ml_confidence_{max_confidence:.2f}"
    elif max_confidence <= AUTO_APPROVE_CONFIDENCE:
        status = 'auto_approved'
        decision = 'approved'
        reason = 'ml_clean'
    else:
        # Ambiguous band: defer to a human; decision columns stay NULL.
        status = 'human_review'
        decision = None
        reason = None
    # decided_at stays NULL for human_review via the CASE expression.
    db.execute(
        """UPDATE moderation_job SET status=%s, final_decision=%s,
        decision_reason=%s, decided_at=CASE WHEN %s!='human_review' THEN NOW() END
        WHERE job_id=%s""",
        (status, decision, reason, status, job_id)
    )
    db.commit()
    if status == 'human_review':
        route_to_human_review(db, job_id, max_confidence)
    return status
Human Review Queue Routing
def route_to_human_review(db, job_id: str, max_confidence: float) -> None:
    """
    Enqueue a job for human review, choosing queue and priority from the
    ML confidence: the closer to the auto-reject threshold, the sooner a
    moderator should see it.
    """
    if max_confidence < 0.50:
        task_priority, task_queue = 8, 'low_signal'
    elif max_confidence < 0.80:
        task_priority, task_queue = 5, 'standard'
    else:
        task_priority, task_queue = 2, 'urgent'
    # job_id is UNIQUE on human_review_task, so a second routing attempt
    # is a harmless no-op rather than a duplicate task.
    db.execute(
        """INSERT INTO human_review_task (job_id, priority, queue)
        VALUES (%s, %s, %s)
        ON CONFLICT (job_id) DO NOTHING""",
        (job_id, task_priority, task_queue)
    )
    db.commit()
Appeals Workflow
def submit_appeal(db, job_id: str, appellant_id: int, reason: str) -> str:
    """
    File a user appeal against a rejected moderation decision and route the
    job back to human review at top priority.

    Args:
        db: DB handle with fetchone/execute/commit.
        job_id: UUID of the rejected moderation_job.
        appellant_id: user filing the appeal.
        reason: free-text justification from the user.

    Returns:
        The new appeal's UUID as a string.

    Raises:
        ValueError: if the job is missing, was not rejected, or this user
            has already appealed it.
    """
    # Function-scope import: this module never imported uuid, so uuid.uuid4()
    # below raised NameError at runtime before this fix.
    import uuid

    job = db.fetchone(
        "SELECT final_decision FROM moderation_job WHERE job_id=%s", (job_id,)
    )
    if not job or job['final_decision'] != 'rejected':
        raise ValueError("Only rejected moderation decisions can be appealed")
    # NOTE(review): check-then-insert races under concurrent submissions; a
    # UNIQUE (job_id, appellant_id) constraint would make this airtight.
    existing = db.fetchone(
        "SELECT id FROM moderation_appeal WHERE job_id=%s AND appellant_id=%s",
        (job_id, appellant_id)
    )
    if existing:
        raise ValueError("Appeal already submitted for this decision")
    appeal_id = str(uuid.uuid4())
    db.execute(
        """INSERT INTO moderation_appeal (appeal_id, job_id, appellant_id, reason)
        VALUES (%s,%s,%s,%s)""",
        (appeal_id, job_id, appellant_id, reason)
    )
    # Route to human review with highest priority; upsert so an existing
    # task is promoted to the appeals queue instead of duplicated.
    db.execute(
        """INSERT INTO human_review_task (job_id, priority, queue, notes)
        VALUES (%s, 1, 'appeals', %s)
        ON CONFLICT (job_id) DO UPDATE SET
        priority = 1, queue = 'appeals', notes = EXCLUDED.notes""",
        (job_id, f"Appeal: {reason[:200]}")
    )
    db.commit()
    return appeal_id
{
  "@context": "https://schema.org",
  "@type": "FAQPage",
  "mainEntity": [
    {
      "@type": "Question",
      "name": "Why check perceptual hash before running ML on uploaded images?",
      "acceptedAnswer": {
        "@type": "Answer",
        "text": "A pHash blocklist check is microseconds of CPU and a single indexed database query. ML inference via Rekognition or a GPU model costs 50-200ms and money per call. Known-bad images (CSAM, viral explicit content) are caught instantly by the hash check without paying the ML cost, and near-duplicates edited to evade the hash are still caught within the Hamming distance threshold."
      }
    },
    {
      "@type": "Question",
      "name": "How do you set auto-approve and auto-reject confidence thresholds?",
      "acceptedAnswer": {
        "@type": "Answer",
        "text": "Set thresholds empirically using a labeled validation set. Measure precision and recall at each threshold. The auto-reject threshold should be set for very high precision (few false positives) even at the cost of recall. The auto-approve threshold should be set for very high recall on clean content. Everything in between goes to human review — calibrate thresholds so human review volume stays within your moderation team's capacity."
      }
    },
    {
      "@type": "Question",
      "name": "What is the difference between a perceptual hash and a cryptographic hash for image deduplication?",
      "acceptedAnswer": {
        "@type": "Answer",
        "text": "A cryptographic hash (SHA-256) changes completely if even one bit of the file differs, making it useless for finding visually similar but technically different images. A perceptual hash (pHash, dHash) encodes visual structure as a 64-bit integer. The Hamming distance between two pHashes measures visual similarity: distance 0 is identical, distance under 10 means nearly identical. This catches re-encoded, resized, or slightly cropped versions of the same image."
      }
    },
    {
      "@type": "Question",
      "name": "How do you prevent a moderation pipeline from becoming a bottleneck during upload spikes?",
      "acceptedAnswer": {
        "@type": "Answer",
        "text": "Decouple upload from moderation: accept the upload immediately, store to S3, create a moderation_job record with status='pending', and return the upload ID to the user. A separate worker pool picks up jobs from the queue. Show the content as 'under review' to other users until a decision is made. Use priority queues so urgent cases (high-confidence violations) pre-empt lower-priority work."
      }
    }
  ]
}
{“@context”:”https://schema.org”,”@type”:”FAQPage”,”mainEntity”:[{“@type”:”Question”,”name”:”What is a perceptual hash and how is it used for moderation?”,”acceptedAnswer”:{“@type”:”Answer”,”text”:”pHash generates a fingerprint based on image visual features; known-bad images are stored in a blocklist and new uploads are matched by Hamming distance threshold.”}},{“@type”:”Question”,”name”:”How are ML confidence scores used to route images?”,”acceptedAnswer”:{“@type”:”Answer”,”text”:”Scores above an auto-reject threshold immediately reject the image; scores below an auto-approve threshold pass it; scores in between are sent to human review.”}},{“@type”:”Question”,”name”:”How does the human review queue prioritize items?”,”acceptedAnswer”:{“@type”:”Answer”,”text”:”Items are prioritized by confidence score (borderline cases first), content category severity, and time in queue to prevent starvation.”}},{“@type”:”Question”,”name”:”How do moderation appeals work?”,”acceptedAnswer”:{“@type”:”Answer”,”text”:”A user submits an appeal that creates a ModerationAppeal record; a senior reviewer re-evaluates the original decision, and if overturned, the content is reinstated with the original decision updated.”}}]}
See also: Meta Interview Guide 2026: Facebook, Instagram, WhatsApp Engineering
See also: Snap Interview Guide
See also: Twitter/X Interview Guide 2026: Timeline Algorithms, Real-Time Search, and Content at Scale