Data Retention Policy System Low-Level Design
Every production database accumulates data that must eventually be purged, archived, or anonymized — whether for storage cost, regulatory compliance (GDPR, CCPA, HIPAA), or contractual obligations. A data retention system codifies these rules as policy objects, runs scheduled evaluation jobs, and produces an immutable audit trail proving that deletion actually occurred. This article covers the complete low-level design: policy engine, archival to cold storage, soft-delete to hard-delete lifecycle, legal hold overrides, and right-to-erasure.
Core Concepts
- RetentionPolicy: A rule binding a data class (e.g., user_messages) to a retention duration and an action (archive, anonymize, delete).
- DataClassification: A label applied to a table or column indicating its sensitivity and regulatory scope.
- LegalHold: An override that suspends deletion for specific records pending litigation or audit.
- RetentionJob: A scheduled execution of policy evaluation against a target table.
SQL Schema
-- One row per retention rule: which table/column to scan, how old a
-- record must be, and which action to take on expired rows.
CREATE TABLE retention_policy (
id BIGSERIAL PRIMARY KEY,
name TEXT NOT NULL UNIQUE,
data_class TEXT NOT NULL, -- maps to data_classification.class_name
target_table TEXT NOT NULL, -- table the policy engine scans
date_column TEXT NOT NULL, -- column to measure age against
retain_days INT NOT NULL, -- records older than this are actioned
action TEXT NOT NULL CHECK (action IN ('archive','anonymize','delete')),
archive_bucket TEXT, -- S3 bucket for archive action; unused otherwise
is_active BOOLEAN NOT NULL DEFAULT TRUE, -- inactive policies are skipped by the engine
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Catalog of sensitivity labels; referenced (by name, not FK) from
-- retention_policy.data_class.
CREATE TABLE data_classification (
id BIGSERIAL PRIMARY KEY,
class_name TEXT NOT NULL UNIQUE, -- 'pii','financial','operational','system_log'
description TEXT,
regulatory TEXT[] -- e.g. ARRAY['gdpr','ccpa']
);
-- One row per execution of a policy; doubles as a run log with
-- progress counters and failure details.
CREATE TABLE retention_job (
id BIGSERIAL PRIMARY KEY,
policy_id BIGINT NOT NULL REFERENCES retention_policy(id),
status TEXT NOT NULL DEFAULT 'pending'
CHECK (status IN ('pending','running','completed','failed')),
records_scanned BIGINT NOT NULL DEFAULT 0, -- rows examined by the scan
records_actioned BIGINT NOT NULL DEFAULT 0, -- rows archived/anonymized/deleted
error_message TEXT, -- populated only when status='failed'
started_at TIMESTAMPTZ, -- NULL until the job starts running
completed_at TIMESTAMPTZ, -- NULL until the job reaches a terminal state
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Append-only evidence that a record was actioned. job_id and policy_id
-- are nullable because right-to-erasure requests run outside any
-- scheduled job: erase_user_data inserts NULL for both, which the
-- previous NOT NULL constraints would have rejected.
CREATE TABLE retention_audit (
id BIGSERIAL PRIMARY KEY,
job_id BIGINT REFERENCES retention_job(id), -- NULL for ad-hoc erasure rows
policy_id BIGINT REFERENCES retention_policy(id), -- NULL for ad-hoc erasure rows
target_table TEXT NOT NULL,
record_id BIGINT NOT NULL, -- PK of the actioned record
action TEXT NOT NULL, -- 'archive','anonymize','delete','erasure'
archive_key TEXT, -- S3 object key if archived
actioned_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_ra_job ON retention_audit (job_id);
CREATE INDEX idx_ra_record ON retention_audit (target_table, record_id);
-- Suspends retention for a specific record pending litigation or audit.
-- A hold is active when released_at IS NULL AND
-- (held_until IS NULL OR held_until > NOW()) — see the fetch query.
CREATE TABLE legal_hold (
id BIGSERIAL PRIMARY KEY,
target_table TEXT NOT NULL,
record_id BIGINT NOT NULL,
reason TEXT NOT NULL,
held_by BIGINT NOT NULL, -- user_id of compliance officer
held_until TIMESTAMPTZ, -- NULL = indefinite
released_at TIMESTAMPTZ, -- set when the hold is lifted
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE (target_table, record_id) -- at most one hold row per record
);
Policy Evaluation Engine
import boto3
import json
import csv
import io
from datetime import datetime, timezone, timedelta
from typing import Iterator, List, Dict, Any
import psycopg2
import psycopg2.extras
S3_CLIENT = boto3.client('s3')
def evaluate_retention(db, policy_id: int) -> int:
    """
    Run a single active retention policy end-to-end.

    Creates a retention_job row, streams eligible records in batches,
    applies the policy's action to each batch, and finalizes the job as
    'completed' (or marks it 'failed' and re-raises).

    Args:
        db: database handle exposing fetchone/fetchall/execute/commit
            (psycopg2-style wrapper; exact type defined elsewhere).
        policy_id: PK of the retention_policy row to run.

    Returns:
        Number of records actioned; 0 if the policy is missing/inactive.
    """
    policy = db.fetchone(
        "SELECT * FROM retention_policy WHERE id=%s AND is_active=TRUE",
        (policy_id,)
    )
    if not policy:
        return 0
    cutoff = datetime.now(timezone.utc) - timedelta(days=policy['retain_days'])
    # Create the job record. RETURNING id replaces the previous
    # lastval() round-trip: lastval() reads whichever sequence the
    # session touched last, which a trigger on retention_job (or any
    # other insert) could silently change.
    job_row = db.fetchone(
        "INSERT INTO retention_job (policy_id, status, started_at) "
        "VALUES (%s,'running',NOW()) RETURNING id",
        (policy_id,)
    )
    job_id = job_row['id']
    total = 0
    try:
        for batch in fetch_eligible_records(db, policy, cutoff):
            if policy['action'] == 'archive':
                archive_records(db, policy, job_id, batch)
            elif policy['action'] == 'anonymize':
                anonymize_records(db, policy, job_id, batch)
            elif policy['action'] == 'delete':
                delete_records(db, policy, job_id, batch)
            else:
                # Defensive: the CHECK constraint on retention_policy
                # should make this unreachable; never silently count a
                # batch that was not actioned.
                raise ValueError(f"unknown retention action: {policy['action']}")
            total += len(batch)
        # The fetch query only returns actionable rows, so scanned ==
        # actioned here; previously records_scanned was never written.
        db.execute(
            """UPDATE retention_job SET status='completed', completed_at=NOW(),
               records_scanned=%s, records_actioned=%s WHERE id=%s""",
            (total, total, job_id)
        )
    except Exception as exc:
        db.execute(
            "UPDATE retention_job SET status='failed', error_message=%s WHERE id=%s",
            (str(exc), job_id)
        )
        raise
    return total
def fetch_eligible_records(
    db, policy: Dict, cutoff: datetime, batch_size: int = 500
) -> Iterator[List[Dict]]:
    """
    Yield batches of records eligible for the policy's action.

    Keyset-paginates by primary key (no OFFSET) and anti-joins against
    legal_hold so held records are never even selected.

    Args:
        db: database handle exposing fetchall().
        policy: retention_policy row. Its 'target_table' and
            'date_column' values are interpolated into SQL and are
            therefore validated as bare identifiers first.
        cutoff: rows whose date_column is older than this are eligible.
        batch_size: maximum rows per yielded batch.

    Yields:
        Lists of row dicts, at most batch_size each.

    Raises:
        ValueError: if target_table/date_column are not plain
            identifiers (defense against SQL injection through a
            tampered policy row — identifiers cannot be bound as
            query parameters).
    """
    table = policy['target_table']
    date_col = policy['date_column']
    for ident in (table, date_col):
        if not ident.isidentifier():
            raise ValueError(f"unsafe SQL identifier in policy: {ident!r}")
    last_id = 0
    while True:
        rows = db.fetchall(
            f"""
            SELECT t.*
            FROM {table} t
            LEFT JOIN legal_hold lh
              ON lh.target_table = %s AND lh.record_id = t.id
              AND (lh.held_until IS NULL OR lh.held_until > NOW())
              AND lh.released_at IS NULL
            WHERE t.id > %s
              AND t.{date_col} < %s
              AND lh.id IS NULL
            ORDER BY t.id
            LIMIT %s
            """,
            (table, last_id, cutoff, batch_size)
        )
        if not rows:
            break
        yield rows
        last_id = rows[-1]['id']
Archival Pipeline to S3 Glacier
def archive_records(db, policy: Dict, job_id: int, batch: List[Dict]) -> None:
    """
    Archive one batch: upload gzip-compressed JSON to S3 Glacier
    Instant Retrieval, write audit rows, then hard-delete the rows
    from the live table.

    Args:
        db: database handle exposing cursor()/execute()/commit().
        policy: retention_policy row providing target_table, id, and
            archive_bucket.
        job_id: retention_job PK recorded on each audit row.
        batch: list of row dicts, each carrying an 'id' key. An empty
            batch is a no-op (previously it raised IndexError on
            batch[0]).

    Raises:
        ValueError: if target_table is not a plain identifier.
    """
    import gzip  # stdlib; kept function-local as in the original

    if not batch:
        return
    table = policy['target_table']
    # target_table is interpolated into the DELETE below; refuse
    # anything that is not a bare identifier.
    if not table.isidentifier():
        raise ValueError(f"unsafe SQL identifier in policy: {table!r}")
    # Timezone-aware UTC date prefix; datetime.utcnow() is naive and
    # deprecated in modern Python.
    date_prefix = datetime.now(timezone.utc).strftime('%Y/%m/%d')
    s3_key = f"retention/{table}/{date_prefix}/{job_id}-{batch[0]['id']}.json.gz"
    # Serialize the whole batch as one compressed JSON document.
    # NOTE(review): default=str stringifies anything non-JSON (dates,
    # Decimals) — acceptable for archival, but lossy; confirm.
    payload = json.dumps(batch, default=str).encode()
    compressed = gzip.compress(payload)
    S3_CLIENT.put_object(
        Bucket=policy['archive_bucket'],
        Key=s3_key,
        Body=compressed,
        StorageClass='GLACIER_IR',
        ContentEncoding='gzip',
        ContentType='application/json'
    )
    ids = [r['id'] for r in batch]
    # Audit rows are written before the delete so both share one
    # transaction; a failure before commit leaves live rows untouched.
    audit_rows = [(job_id, policy['id'], table, rid, 'archive', s3_key)
                  for rid in ids]
    psycopg2.extras.execute_values(
        db.cursor(),
        """INSERT INTO retention_audit
           (job_id, policy_id, target_table, record_id, action, archive_key)
           VALUES %s""",
        audit_rows
    )
    # Hard delete from the live table (upload already succeeded above).
    db.execute(
        f"DELETE FROM {table} WHERE id = ANY(%s)",
        (ids,)
    )
    db.commit()
GDPR Right-to-Erasure (Hard Delete)
A right-to-erasure request must delete all PII for a given user across every table tagged with the user’s data class, subject to legal hold checks:
def erase_user_data(db, user_id: int, request_id: str) -> Dict[str, int]:
    """
    GDPR right-to-erasure: hard-delete all PII for user_id from every
    table covered by an active 'pii' retention policy.

    Args:
        db: database handle exposing fetchall/fetchone/execute/commit.
        user_id: subject whose data is being erased.
        request_id: external erasure-request identifier.
            TODO(review): not persisted anywhere — retention_audit has
            no column for it; confirm where it should be recorded.

    Returns:
        {table_name: rows_deleted}; -1 marks a table skipped because an
        active legal hold covers at least one of the user's rows.

    Raises:
        ValueError: if a policy's target_table is not a plain
            identifier (SQL-injection defense; previously the table
            name was str.format()-ed into the query unchecked).
    """
    policies = db.fetchall(
        "SELECT * FROM retention_policy WHERE data_class='pii' AND is_active=TRUE"
    )
    results = {}
    for p in policies:
        table = p['target_table']
        if not table.isidentifier():
            raise ValueError(f"unsafe SQL identifier in policy: {table!r}")
        # Skip the whole table if any of the user's rows carries an
        # unreleased hold (assumes a user_id FK on the target table).
        hold = db.fetchone(
            f"""SELECT 1 FROM legal_hold
                WHERE target_table=%s AND record_id IN (
                    SELECT id FROM {table} WHERE user_id=%s
                ) AND released_at IS NULL LIMIT 1""",
            (table, user_id)
        )
        if hold:
            results[table] = -1  # -1 signals held
            continue
        cur = db.execute(
            f"DELETE FROM {table} WHERE user_id=%s",
            (user_id,)
        )
        results[table] = cur.rowcount
    # One summary audit row for the whole request.
    # NOTE(review): retention_audit declares job_id/policy_id NOT NULL;
    # this insert writes NULLs and will be rejected unless the schema
    # allows NULLs for ad-hoc erasure rows — confirm.
    db.execute(
        "INSERT INTO retention_audit (job_id, policy_id, target_table, record_id, action)"
        " VALUES (NULL, NULL, 'ALL_PII', %s, 'erasure')",
        (user_id,)
    )
    db.commit()
    return results
Scheduling
Run retention jobs via a cron-driven dispatcher. To avoid table locks during business hours, schedule archival jobs for off-peak windows and, on high-traffic tables, delete in 500-row batches with brief sleeps between batches — e.g. a loop of DELETE ... WHERE id IN (SELECT id ... ORDER BY id LIMIT 500), since PostgreSQL does not support DELETE ... LIMIT directly. Monitor job duration and alert if a job runs longer than its scheduled interval, which indicates the policy scope needs tuning.
{
  "@context": "https://schema.org",
  "@type": "FAQPage",
  "mainEntity": [
    {
      "@type": "Question",
      "name": "What is the difference between archival and deletion in a retention policy?",
      "acceptedAnswer": {
        "@type": "Answer",
        "text": "Archival moves records to cold storage (such as S3 Glacier) before removing them from the live database, preserving them for compliance retrieval. Deletion removes records permanently. Which action to use depends on regulatory requirements: GDPR right-to-erasure demands deletion of PII, while financial regulations often require 7-year archival of transaction records."
      }
    },
    {
      "@type": "Question",
      "name": "How does a legal hold prevent accidental deletion?",
      "acceptedAnswer": {
        "@type": "Answer",
        "text": "A legal_hold row links a specific record (table + primary key) to a hold reason and optional expiry. The retention job's fetch query joins against legal_hold and excludes any record with an active hold. This means the policy engine never even selects held records, so deletion is impossible without first releasing the hold."
      }
    },
    {
      "@type": "Question",
      "name": "How do you prove to a regulator that data was actually deleted?",
      "acceptedAnswer": {
        "@type": "Answer",
        "text": "Every actioned record gets a row in retention_audit with the table name, record primary key, action type, timestamp, and job ID. The audit table is append-only (no updates or deletes permitted via application role) and can be exported as evidence. For erasure requests, the audit row includes the request ID for cross-reference."
      }
    },
    {
      "@type": "Question",
      "name": "How do you avoid locking production tables during large retention jobs?",
      "acceptedAnswer": {
        "@type": "Answer",
        "text": "Process deletions in small batches (500 rows at a time) with a short sleep between batches. Use cursor-based pagination by primary key rather than OFFSET. Run jobs during off-peak hours. On PostgreSQL, use statement_timeout and lock_timeout settings on the retention role to auto-abort if a lock cannot be acquired quickly."
      }
    }
  ]
}
{
  "@context": "https://schema.org",
  "@type": "FAQPage",
  "mainEntity": [
    {
      "@type": "Question",
      "name": "How are retention policies defined per data class?",
      "acceptedAnswer": {
        "@type": "Answer",
        "text": "RetentionPolicy rows map a data_class (e.g., PII, financial, logs) to a retain_days value and an action (archive, anonymize, or delete); the policy engine matches records by their data classification."
      }
    },
    {
      "@type": "Question",
      "name": "How does archival to cold storage work?",
      "acceptedAnswer": {
        "@type": "Answer",
        "text": "A retention job queries expired records, serializes them to compressed JSON, uploads the batch to S3 Glacier via boto3, records the archive key in the audit trail, then deletes the originals from the live table."
      }
    },
    {
      "@type": "Question",
      "name": "How is GDPR right-to-erasure handled?",
      "acceptedAnswer": {
        "@type": "Answer",
        "text": "A deletion request hard-deletes the user's data across every table covered by a PII retention policy, bypassing normal retention TTLs while still honoring active legal holds."
      }
    },
    {
      "@type": "Question",
      "name": "What prevents accidental deletion of legally held data?",
      "acceptedAnswer": {
        "@type": "Answer",
        "text": "A legal_hold row on a record overrides retention policies; the job's fetch query excludes rows with active legal holds, so held records are never selected for deletion."
      }
    }
  ]
}
See also: Scale AI Interview Guide 2026: Data Infrastructure, RLHF Pipelines, and ML Engineering
See also: Atlassian Interview Guide