Data Retention Policy System Low-Level Design
Every production database accumulates data that must eventually be purged, archived, or anonymized — whether for storage cost, regulatory compliance (GDPR, CCPA, HIPAA), or contractual obligations. A data retention system codifies these rules as policy objects, runs scheduled evaluation jobs, and produces an immutable audit trail proving that deletion actually occurred. This article covers the complete low-level design: policy engine, archival to cold storage, soft-delete to hard-delete lifecycle, legal hold overrides, and right-to-erasure.
Core Concepts
- RetentionPolicy: A rule binding a target table (e.g., user_messages) and its data class to a retention duration and an action (archive, anonymize, delete).
- DataClassification: A label applied to a table or column indicating sensitivity and regulatory scope.
- LegalHold: An override that suspends deletion for specific records pending litigation or audit.
- RetentionJob: A scheduled execution of policy evaluation against a target table.
SQL Schema
-- One row per retention rule: which table, how old is "too old", and what to do.
-- target_table and date_column are interpolated directly into dynamic SQL by
-- the retention jobs, so they are restricted to plain identifiers here.
CREATE TABLE retention_policy (
    id BIGSERIAL PRIMARY KEY,
    name TEXT NOT NULL UNIQUE,
    -- maps to data_classification.class_name (not a FK: that table is created later)
    data_class TEXT NOT NULL,
    target_table TEXT NOT NULL
        CHECK (target_table ~ '^[A-Za-z_][A-Za-z0-9_.]*$'),
    -- column to measure age against
    date_column TEXT NOT NULL
        CHECK (date_column ~ '^[A-Za-z_][A-Za-z0-9_]*$'),
    -- records older than this are actioned; a negative value would put the
    -- cutoff in the future and action every row, so it is rejected
    retain_days INT NOT NULL CHECK (retain_days >= 0),
    action TEXT NOT NULL CHECK (action IN ('archive','anonymize','delete')),
    archive_bucket TEXT, -- S3 bucket for archive action
    is_active BOOLEAN NOT NULL DEFAULT TRUE,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    -- the archive pipeline uploads to this bucket before deleting rows
    CHECK (action <> 'archive' OR archive_bucket IS NOT NULL)
);
-- Named data classes; retention_policy.data_class stores one of these
-- class_name values (by convention, not enforced by a foreign key).
CREATE TABLE data_classification (
id BIGSERIAL PRIMARY KEY,
class_name TEXT NOT NULL UNIQUE, -- 'pii','financial','operational','system_log'
description TEXT,
regulatory TEXT[] -- e.g. ARRAY['gdpr','ccpa']; regulations the class falls under
);
-- One row per execution of a retention policy.
-- The evaluation engine inserts rows directly with status 'running' and later
-- sets 'completed' (with records_actioned) or 'failed' (with error_message).
-- NOTE(review): records_scanned is not written by the code shown here.
CREATE TABLE retention_job (
id BIGSERIAL PRIMARY KEY,
policy_id BIGINT NOT NULL REFERENCES retention_policy(id),
status TEXT NOT NULL DEFAULT 'pending'
CHECK (status IN ('pending','running','completed','failed')),
records_scanned BIGINT NOT NULL DEFAULT 0,
records_actioned BIGINT NOT NULL DEFAULT 0,
error_message TEXT, -- str() of the exception for failed runs
started_at TIMESTAMPTZ,
completed_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Per-record audit trail: one row for each record a retention run actioned.
-- Intended to be the immutable proof of deletion (immutability is not
-- enforced by this DDL). job_id and policy_id are NOT NULL, so every audit
-- row must be written under an existing retention_job and retention_policy.
CREATE TABLE retention_audit (
id BIGSERIAL PRIMARY KEY,
job_id BIGINT NOT NULL REFERENCES retention_job(id),
policy_id BIGINT NOT NULL REFERENCES retention_policy(id),
target_table TEXT NOT NULL,
record_id BIGINT NOT NULL, -- PK of the actioned record
action TEXT NOT NULL, -- free text; not restricted by a CHECK constraint
archive_key TEXT, -- S3 object key if archived
actioned_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- All audit rows produced by one job.
CREATE INDEX idx_ra_job ON retention_audit (job_id);
-- Answer "was this record actioned, and when?" for a given table/record.
CREATE INDEX idx_ra_record ON retention_audit (target_table, record_id);
-- Holds that suspend retention actions for individual records.
-- A hold is active while released_at IS NULL and held_until is NULL or in the
-- future. Released rows are kept as history.
CREATE TABLE legal_hold (
    id BIGSERIAL PRIMARY KEY,
    target_table TEXT NOT NULL,
    record_id BIGINT NOT NULL,
    reason TEXT NOT NULL,
    held_by BIGINT NOT NULL, -- user_id of compliance officer
    held_until TIMESTAMPTZ, -- NULL = indefinite
    released_at TIMESTAMPTZ,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- At most one unreleased hold per record. A plain UNIQUE (target_table,
-- record_id) would keep the released history row and make it impossible to
-- place a new hold on the same record later. This partial index also serves
-- the "active hold" lookups, which filter on released_at IS NULL.
CREATE UNIQUE INDEX uq_legal_hold_unreleased
    ON legal_hold (target_table, record_id)
    WHERE released_at IS NULL;
Policy Evaluation Engine
import boto3
import json
import csv
import io
from datetime import datetime, timezone, timedelta
from typing import Iterator, List, Dict, Any
import psycopg2
import psycopg2.extras
# Shared S3 client used by the archival pipeline; built once when the module
# is imported.
S3_CLIENT = boto3.client(service_name='s3')
def evaluate_retention(db, policy_id: int) -> int:
    """Run a single retention policy and record the run in ``retention_job``.

    Loads the active policy, computes the age cutoff from ``retain_days``,
    then streams eligible (not legally held) records in batches to the
    handler matching the policy's action.

    Args:
        db: Database wrapper exposing ``fetchone``/``fetchall``/``execute``/
            ``commit``. NOTE(review): also assumes ``rollback()`` is exposed,
            like the underlying psycopg2 connection — confirm.
        policy_id: Primary key of the ``retention_policy`` row to run.

    Returns:
        Number of records handed to the action handler (0 if the policy is
        missing or inactive).

    Raises:
        ValueError: If the policy's action is not archive/anonymize/delete.
        Exception: Any error raised while processing batches is re-raised
            after the job row has been marked ``failed``.
    """
    policy = db.fetchone(
        "SELECT * FROM retention_policy WHERE id=%s AND is_active=TRUE",
        (policy_id,)
    )
    if not policy:
        return 0

    action = policy['action']
    if action not in ('archive', 'anonymize', 'delete'):
        # Fail loudly instead of counting records that were never actioned.
        raise ValueError(f"unsupported retention action: {action!r}")

    cutoff = datetime.now(timezone.utc) - timedelta(days=policy['retain_days'])

    # RETURNING id is robust against triggers that would skew lastval().
    # Commit immediately so the 'running' job is visible to monitoring and
    # is not lost if a later batch fails and the transaction is rolled back.
    job_id = db.fetchone(
        "INSERT INTO retention_job (policy_id, status, started_at)"
        " VALUES (%s,'running',NOW()) RETURNING id",
        (policy_id,)
    )['id']
    db.commit()

    total = 0
    try:
        for batch in fetch_eligible_records(db, policy, cutoff):
            if action == 'archive':
                archive_records(db, policy, job_id, batch)
            elif action == 'anonymize':
                anonymize_records(db, policy, job_id, batch)
            else:  # 'delete' (validated above)
                delete_records(db, policy, job_id, batch)
            total += len(batch)
        db.execute(
            """UPDATE retention_job SET status='completed', completed_at=NOW(),
            records_actioned=%s WHERE id=%s""",
            (total, job_id)
        )
        db.commit()
    except Exception as exc:
        # A failed statement leaves the transaction aborted; without a
        # rollback the UPDATE below would itself fail and mask `exc`.
        db.rollback()
        db.execute(
            "UPDATE retention_job SET status='failed', error_message=%s WHERE id=%s",
            (str(exc), job_id)
        )
        db.commit()
        raise
    return total
def fetch_eligible_records(
db, policy: Dict, cutoff: datetime, batch_size: int = 500
) -> Iterator[List[Dict]]:
"""
Cursor-based scan; skips records under legal hold.
"""
last_id = 0
while True:
rows = db.fetchall(
f"""
SELECT t.*
FROM {policy['target_table']} t
LEFT JOIN legal_hold lh
ON lh.target_table = %s AND lh.record_id = t.id
AND (lh.held_until IS NULL OR lh.held_until > NOW())
AND lh.released_at IS NULL
WHERE t.id > %s
AND t.{policy['date_column']} < %s
AND lh.id IS NULL
ORDER BY t.id
LIMIT %s
""",
(policy['target_table'], last_id, cutoff, batch_size)
)
if not rows:
break
yield rows
last_id = rows[-1]['id']
Archival Pipeline to S3 Glacier
def archive_records(db, policy: Dict, job_id: int, batch: List[Dict]) -> None:
    """Archive one batch to S3 Glacier Instant Retrieval, then hard-delete it.

    Steps, all for the same batch:
      1. gzip the rows as JSON and upload them to ``policy['archive_bucket']``
         under ``retention/<table>/<YYYY/MM/DD>/<job_id>-<first id>.json.gz``
         (UTC date);
      2. insert one ``retention_audit`` row per record pointing at that key;
      3. delete the rows from the live table and commit.

    The upload runs first, so rows are deleted only after ``put_object`` has
    returned without raising.

    Args:
        db: Database wrapper exposing ``cursor``/``execute``/``commit``.
        policy: Retention policy row (``id``, ``target_table``,
            ``archive_bucket``).
        job_id: ``retention_job`` id this batch belongs to.
        batch: Row dicts to archive; each must have an ``id``.

    Raises:
        ValueError: If the policy has no ``archive_bucket``.
    """
    import gzip

    if not batch:
        return  # nothing to archive (and batch[0] below would fail)

    bucket = policy['archive_bucket']
    if not bucket:
        raise ValueError(
            f"policy {policy['id']} has action 'archive' but no archive_bucket"
        )

    date_prefix = datetime.now(timezone.utc).strftime('%Y/%m/%d')
    s3_key = (
        f"retention/{policy['target_table']}/{date_prefix}/"
        f"{job_id}-{batch[0]['id']}.json.gz"
    )

    # default=str stringifies values json cannot encode natively (e.g. datetimes).
    compressed = gzip.compress(json.dumps(batch, default=str).encode())
    S3_CLIENT.put_object(
        Bucket=bucket,
        Key=s3_key,
        Body=compressed,
        StorageClass='GLACIER_IR',
        ContentEncoding='gzip',
        ContentType='application/json'
    )

    ids = [r['id'] for r in batch]
    audit_rows = [
        (job_id, policy['id'], policy['target_table'], rid, 'archive', s3_key)
        for rid in ids
    ]
    # Close the cursor deterministically instead of leaving it to GC.
    with db.cursor() as cur:
        psycopg2.extras.execute_values(
            cur,
            """INSERT INTO retention_audit
            (job_id, policy_id, target_table, record_id, action, archive_key)
            VALUES %s""",
            audit_rows
        )

    # Hard delete from live table
    db.execute(
        f"DELETE FROM {policy['target_table']} WHERE id = ANY(%s)",
        (ids,)
    )
    db.commit()
GDPR Right-to-Erasure (Hard Delete)
A right-to-erasure request must delete all PII for a given user from every table covered by an active retention policy in the 'pii' data class, subject to legal hold checks. The example below assumes each such table has a user_id column:
def erase_user_data(db, user_id: int, request_id: str) -> Dict[str, int]:
    """Hard-delete all of a user's rows from tables in the 'pii' data class.

    For each active 'pii' policy, the table is skipped entirely if any of the
    user's rows there has an active legal hold (unreleased, and with no
    ``held_until`` or one in the future). Otherwise the user's rows are
    deleted under a new ``retention_job`` for that policy, and each deleted
    id gets an ``'erasure'`` row in ``retention_audit``. Everything is
    committed in a single transaction at the end.

    Args:
        db: Database wrapper exposing ``fetchone``/``fetchall``/``execute``/
            ``commit``; rows are addressable by column name.
        user_id: Value matched against each target table's ``user_id``.
        request_id: Caller's erasure-request identifier. NOTE(review): not
            persisted — the schema has no column for it.

    Returns:
        ``{table_name: rows_deleted}``; ``-1`` marks a table skipped because
        of a legal hold.
    """
    policies = db.fetchall(
        "SELECT * FROM retention_policy WHERE data_class='pii' AND is_active=TRUE"
    )
    results: Dict[str, int] = {}
    for p in policies:
        table = p['target_table']
        # Same notion of "active hold" as the batch evaluator: expired holds
        # (held_until in the past) no longer block deletion.
        hold = db.fetchone(
            """SELECT 1 FROM legal_hold
            WHERE target_table=%s AND record_id IN (
                SELECT id FROM {} WHERE user_id=%s
            )
            AND (held_until IS NULL OR held_until > NOW())
            AND released_at IS NULL LIMIT 1""".format(table),
            (table, user_id)
        )
        if hold:
            results[table] = -1  # -1 signals held
            continue

        # retention_audit requires non-NULL job_id/policy_id, so every
        # erasure is recorded under a job for the policy that covers the table.
        job_id = db.fetchone(
            "INSERT INTO retention_job (policy_id, status, started_at)"
            " VALUES (%s,'running',NOW()) RETURNING id",
            (p['id'],)
        )['id']

        deleted_ids = [
            row['id']
            for row in db.fetchall(
                f"DELETE FROM {table} WHERE user_id=%s RETURNING id",
                (user_id,)
            )
        ]
        if deleted_ids:
            db.execute(
                "INSERT INTO retention_audit"
                " (job_id, policy_id, target_table, record_id, action)"
                " SELECT %s, %s, %s, unnest(%s::bigint[]), 'erasure'",
                (job_id, p['id'], table, deleted_ids)
            )
        db.execute(
            "UPDATE retention_job SET status='completed', completed_at=NOW(),"
            " records_actioned=%s WHERE id=%s",
            (len(deleted_ids), job_id)
        )
        # Accumulate: several pii policies may target the same table.
        results[table] = results.get(table, 0) + len(deleted_ids)
    db.commit()
    return results
Scheduling
Run retention jobs via a cron-driven dispatcher. To avoid long-held row locks during business hours, schedule archival jobs for off-peak windows. On high-traffic tables, delete in small batches with brief sleeps between them. For example, delete 500 ids at a time with DELETE … WHERE id = ANY(…), as the archival pipeline does; PostgreSQL's DELETE has no LIMIT clause. Monitor job duration and alert if a job runs longer than its scheduled interval, which indicates the policy's scope needs tuning.
See also: Scale AI Interview Guide 2026: Data Infrastructure, RLHF Pipelines, and ML Engineering
See also: Atlassian Interview Guide