Data Retention Policy System Low-Level Design: TTL Enforcement, Archival, and Compliance Deletion

Every production database accumulates data that must eventually be purged, archived, or anonymized, whether for storage cost, regulatory compliance (GDPR, CCPA, HIPAA), or contractual obligations. A data retention system codifies these rules as policy objects, runs scheduled evaluation jobs, and produces an append-only audit trail showing which records were actioned and when. This article covers the low-level design: schema, policy engine, archival to cold storage, legal hold overrides, right-to-erasure, and scheduling.

Core Concepts

  • RetentionPolicy: A rule binding a data class (e.g., user_messages) to a retention duration and an action (archive, anonymize, delete).
  • DataClassification: A label applied to a table or column indicating sensitivity and regulatory scope.
  • LegalHold: An override that suspends deletion for specific records pending litigation or audit.
  • RetentionJob: A scheduled execution of policy evaluation against a target table.
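
As a rough illustration of how these concepts fit together, a policy can be modeled in application code as a small immutable object that computes its own cutoff. This is a minimal sketch: the class, its field names, and the example policy are illustrative assumptions that mirror the concepts above, not part of any library.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass(frozen=True)
class RetentionPolicy:
    """Illustrative in-memory form of a retention rule (names are assumptions)."""
    name: str
    data_class: str      # e.g. 'pii'
    target_table: str    # e.g. 'user_messages'
    retain_days: int
    action: str          # 'archive', 'anonymize', or 'delete'

    def cutoff(self, now: datetime) -> datetime:
        """Records dated before this instant are eligible for the action."""
        return now - timedelta(days=self.retain_days)

    def is_expired(self, record_date: datetime, now: datetime) -> bool:
        """True when the record is older than the retention window."""
        return record_date < self.cutoff(now)


# Hypothetical policy: delete user messages after 90 days.
policy = RetentionPolicy("messages-90d", "pii", "user_messages", 90, "delete")
now = datetime(2024, 6, 1, tzinfo=timezone.utc)
print(policy.cutoff(now).date())  # 2024-03-03
```

Passing `now` in explicitly, rather than reading the clock inside the method, keeps the cutoff logic deterministic and easy to unit test.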

SQL Schema

CREATE TABLE retention_policy (
    id              BIGSERIAL PRIMARY KEY,
    name            TEXT        NOT NULL UNIQUE,
    data_class      TEXT        NOT NULL,          -- maps to data_classification.class_name
    target_table    TEXT        NOT NULL,
    date_column     TEXT        NOT NULL,           -- column to measure age against
    retain_days     INT         NOT NULL,           -- records older than this are actioned
    action          TEXT        NOT NULL CHECK (action IN ('archive','anonymize','delete')),
    archive_bucket  TEXT,                           -- S3 bucket for archive action
    is_active       BOOLEAN     NOT NULL DEFAULT TRUE,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE TABLE data_classification (
    id          BIGSERIAL PRIMARY KEY,
    class_name  TEXT NOT NULL UNIQUE,   -- 'pii','financial','operational','system_log'
    description TEXT,
    regulatory  TEXT[]                  -- e.g. ARRAY['gdpr','ccpa']
);

CREATE TABLE retention_job (
    id              BIGSERIAL PRIMARY KEY,
    policy_id       BIGINT      NOT NULL REFERENCES retention_policy(id),
    status          TEXT        NOT NULL DEFAULT 'pending'
                                CHECK (status IN ('pending','running','completed','failed')),
    records_scanned BIGINT      NOT NULL DEFAULT 0,
    records_actioned BIGINT     NOT NULL DEFAULT 0,
    error_message   TEXT,
    started_at      TIMESTAMPTZ,
    completed_at    TIMESTAMPTZ,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE TABLE retention_audit (
    id          BIGSERIAL PRIMARY KEY,
    job_id      BIGINT      NOT NULL REFERENCES retention_job(id),
    policy_id   BIGINT      NOT NULL REFERENCES retention_policy(id),
    target_table TEXT       NOT NULL,
    record_id   BIGINT      NOT NULL,           -- PK of the actioned record
    action      TEXT        NOT NULL,
    archive_key TEXT,                           -- S3 object key if archived
    actioned_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_ra_job     ON retention_audit (job_id);
CREATE INDEX idx_ra_record  ON retention_audit (target_table, record_id);

CREATE TABLE legal_hold (
    id          BIGSERIAL PRIMARY KEY,
    target_table TEXT      NOT NULL,
    record_id   BIGINT     NOT NULL,
    reason      TEXT       NOT NULL,
    held_by     BIGINT     NOT NULL,            -- user_id of compliance officer
    held_until  TIMESTAMPTZ,                    -- NULL = indefinite
    released_at TIMESTAMPTZ,
    created_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    UNIQUE (target_table, record_id)
);
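
The legal_hold columns imply a simple rule for when a hold is in force: it has not been released, and it either has no expiry or its expiry is still in the future. The sketch below expresses that predicate in Python for use in unit tests or application-side checks. The function name and argument shape are assumptions, not part of the schema.

```python
from datetime import datetime, timezone
from typing import Optional


def hold_is_active(held_until: Optional[datetime],
                   released_at: Optional[datetime],
                   now: datetime) -> bool:
    """Return True while the hold should block retention actions."""
    if released_at is not None:
        return False   # explicitly released by a compliance officer
    if held_until is None:
        return True    # NULL held_until means the hold is indefinite
    return held_until > now


now = datetime(2024, 6, 1, tzinfo=timezone.utc)
expired = datetime(2024, 5, 1, tzinfo=timezone.utc)
future = datetime(2024, 7, 1, tzinfo=timezone.utc)
```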

Policy Evaluation Engine

import boto3
import json
import csv
import io
from datetime import datetime, timezone, timedelta
from typing import Iterator, List, Dict, Any
import psycopg2
import psycopg2.extras

S3_CLIENT = boto3.client('s3')

def evaluate_retention(db, policy_id: int) -> int:
    """
    Runs a single retention policy.
    Returns number of records actioned.
    """
    policy = db.fetchone(
        "SELECT * FROM retention_policy WHERE id=%s AND is_active=TRUE",
        (policy_id,)
    )
    if not policy:
        return 0

    cutoff = datetime.now(timezone.utc) - timedelta(days=policy['retain_days'])

    # Create the job record; RETURNING id avoids a separate lastval() lookup.
    job_id = db.fetchone(
        """INSERT INTO retention_job (policy_id, status, started_at)
           VALUES (%s,'running',NOW()) RETURNING id""",
        (policy_id,)
    )['id']
    db.commit()  # persist the job row so a rollback on failure cannot erase it

    total = 0
    try:
        for batch in fetch_eligible_records(db, policy, cutoff):
            if policy['action'] == 'archive':
                archive_records(db, policy, job_id, batch)
            elif policy['action'] == 'anonymize':
                anonymize_records(db, policy, job_id, batch)
            elif policy['action'] == 'delete':
                delete_records(db, policy, job_id, batch)
            total += len(batch)

        db.execute(
            """UPDATE retention_job SET status='completed', completed_at=NOW(),
               records_actioned=%s WHERE id=%s""",
            (total, job_id)
        )
        db.commit()
    except Exception as exc:
        # Discard the failed batch first: PostgreSQL rejects further statements
        # in an aborted transaction. Assumes the db wrapper exposes rollback().
        db.rollback()
        db.execute(
            """UPDATE retention_job SET status='failed', completed_at=NOW(),
               error_message=%s WHERE id=%s""",
            (str(exc), job_id)
        )
        db.commit()
        raise

    return total


def fetch_eligible_records(
    db, policy: Dict, cutoff: datetime, batch_size: int = 500
) -> Iterator[List[Dict]]:
    """
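    Keyset (primary-key) scan in batches; skips records under an active legal hold.
    Assumes the target table has a BIGINT primary key named id. target_table and
    date_column are interpolated into the SQL text, so they must come from
    trusted, validated policy rows and never from user input.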
    """
    last_id = 0
    while True:
        rows = db.fetchall(
            f"""
            SELECT t.*
            FROM {policy['target_table']} t
            LEFT JOIN legal_hold lh
                ON lh.target_table = %s AND lh.record_id = t.id
                AND (lh.held_until IS NULL OR lh.held_until > NOW())
                AND lh.released_at IS NULL
            WHERE t.id > %s
              AND t.{policy['date_column']} < %s
              AND lh.id IS NULL
            ORDER BY t.id
            LIMIT %s
            """,
            (policy['target_table'], last_id, cutoff, batch_size)
        )
        if not rows:
            break
        yield rows
        last_id = rows[-1]['id']
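
Because target_table and date_column are interpolated into the SQL text rather than bound as parameters, they should be validated before use. One possible guard, sketched below, accepts only plain lowercase snake_case names and optionally checks them against an allowlist. The helper name safe_ident and the ALLOWED_TABLES set are hypothetical. If the db wrapper accepts composed queries, psycopg2's sql.Identifier is an alternative way to quote identifiers.

```python
import re
from typing import Optional, Set

# Lowercase snake_case, at most 63 characters (PostgreSQL's default identifier limit).
_IDENT_RE = re.compile(r"[a-z_][a-z0-9_]{0,62}")

# Hypothetical allowlist of tables that retention jobs may touch.
ALLOWED_TABLES = {"user_messages", "audit_events"}


def safe_ident(name: str, allowed: Optional[Set[str]] = None) -> str:
    """Return name unchanged if it is a safe SQL identifier, else raise ValueError."""
    if not _IDENT_RE.fullmatch(name):
        raise ValueError(f"unsafe SQL identifier: {name!r}")
    if allowed is not None and name not in allowed:
        raise ValueError(f"identifier not in allowlist: {name!r}")
    return name


table = safe_ident("user_messages", ALLOWED_TABLES)
query = f"SELECT t.* FROM {table} t WHERE t.id > %s ORDER BY t.id LIMIT %s"
```

Validating once when a policy is loaded, rather than on every batch, keeps the check out of the hot path.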

Archival Pipeline to S3 Glacier

def archive_records(db, policy: Dict, job_id: int, batch: List[Dict]) -> None:
    """
    Serialize batch to JSON, upload to S3 Glacier Instant Retrieval,
    then delete from the live table.
    """
    # datetime.utcnow() is deprecated; use an explicit timezone-aware UTC timestamp
    date_prefix = datetime.now(timezone.utc).strftime('%Y/%m/%d')
    s3_key = f"retention/{policy['target_table']}/{date_prefix}/{job_id}-{batch[0]['id']}.json.gz"

    # Serialize
    import gzip
    payload = json.dumps(batch, default=str).encode()
    compressed = gzip.compress(payload)

    S3_CLIENT.put_object(
        Bucket=policy['archive_bucket'],
        Key=s3_key,
        Body=compressed,
        StorageClass='GLACIER_IR',
        ContentEncoding='gzip',
        ContentType='application/json'
    )

    ids = [r['id'] for r in batch]
    # Write audit rows
    audit_rows = [(job_id, policy['id'], policy['target_table'], rid, 'archive', s3_key)
                  for rid in ids]
    psycopg2.extras.execute_values(
        db.cursor(),
        """INSERT INTO retention_audit
           (job_id, policy_id, target_table, record_id, action, archive_key)
           VALUES %s""",
        audit_rows
    )

    # Hard delete from live table
    db.execute(
        f"DELETE FROM {policy['target_table']} WHERE id = ANY(%s)",
        (ids,)
    )
    db.commit()
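
An archive is only useful if it can be read back. The sketch below round-trips a batch through the same gzip and JSON encoding used in the archive step, without touching S3. It shows one consequence of default=str: datetime values are stored as strings and must be parsed again on restore. The helper names serialize_batch and deserialize_batch and the sample row are assumptions for illustration.

```python
import gzip
import json
from datetime import datetime, timezone
from typing import Any, Dict, List


def serialize_batch(batch: List[Dict[str, Any]]) -> bytes:
    """Encode rows as gzipped JSON; non-JSON types fall back to str()."""
    return gzip.compress(json.dumps(batch, default=str).encode("utf-8"))


def deserialize_batch(blob: bytes) -> List[Dict[str, Any]]:
    """Inverse of serialize_batch; datetimes come back as strings."""
    return json.loads(gzip.decompress(blob).decode("utf-8"))


rows = [{"id": 1, "body": "hi",
         "created_at": datetime(2023, 1, 5, tzinfo=timezone.utc)}]
restored = deserialize_batch(serialize_batch(rows))

# The timestamp survives as text and can be parsed back explicitly.
created = datetime.fromisoformat(restored[0]["created_at"])
```

If archived rows may need to be reloaded into the database, storing the column types alongside the data (or using a typed format) avoids guessing which strings were originally timestamps.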

GDPR Right-to-Erasure (Hard Delete)

A right-to-erasure request must delete all PII for a given user across every table covered by an active policy in the pii data class, subject to legal hold checks:

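def erase_user_data(db, user_id: int, request_id: str) -> Dict[str, int]:
    """
    Hard-deletes all PII for user_id from tables in the 'pii' data class.
    Returns {table_name: rows_deleted}; -1 means the table was skipped
    because at least one of the user's rows is under an active legal hold.
    """
    policies = db.fetchall(
        "SELECT * FROM retention_policy WHERE data_class='pii' AND is_active=TRUE"
    )
    results = {}
    for p in policies:
        table = p['target_table']
        # Check for an active legal hold on any of the user's rows
        # (assumes the target table has id and user_id columns)
        hold = db.fetchone(
            f"""SELECT 1 FROM legal_hold
                WHERE target_table=%s
                  AND record_id IN (SELECT id FROM {table} WHERE user_id=%s)
                  AND released_at IS NULL
                  AND (held_until IS NULL OR held_until > NOW())
                LIMIT 1""",
            (table, user_id)
        )
        if hold:
            results[table] = -1   # -1 signals held
            continue

        # retention_audit.job_id and policy_id are NOT NULL, so record a job per table
        job_id = db.fetchone(
            """INSERT INTO retention_job (policy_id, status, started_at)
               VALUES (%s,'running',NOW()) RETURNING id""",
            (p['id'],)
        )['id']

        deleted = db.fetchall(
            f"DELETE FROM {table} WHERE user_id=%s RETURNING id",
            (user_id,)
        )

        # One audit row per deleted record. retention_audit has no request_id
        # column, so the request ID is carried in the action label.
        audit_rows = [(job_id, p['id'], table, r['id'], f"erasure:{request_id}")
                      for r in deleted]
        if audit_rows:
            psycopg2.extras.execute_values(
                db.cursor(),
                """INSERT INTO retention_audit
                   (job_id, policy_id, target_table, record_id, action)
                   VALUES %s""",
                audit_rows
            )
        db.execute(
            """UPDATE retention_job SET status='completed', completed_at=NOW(),
               records_actioned=%s WHERE id=%s""",
            (len(deleted), job_id)
        )
        results[table] = len(deleted)

    db.commit()
    return results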

Scheduling

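Run retention jobs via a cron-driven dispatcher. To limit lock contention during business hours, schedule archival jobs for off-peak windows and, on high-traffic tables, delete in batches of about 500 rows with brief sleeps between batches. PostgreSQL has no DELETE … LIMIT, so bound each batch with a subquery (DELETE … WHERE id IN (SELECT id … LIMIT 500)) or a primary-key range. Monitor job duration and alert if a job runs longer than its scheduled interval, which indicates that the policy scope or batch size needs tuning.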
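
The batch-and-sleep loop can be sketched independently of the database. In the example below, delete_batch is a caller-supplied function (an assumption, not an API from this article) that deletes up to the given number of rows and returns how many it removed. The loop stops as soon as a batch comes back short.

```python
import time
from typing import Callable


def delete_in_batches(delete_batch: Callable[[int], int],
                      batch_size: int = 500,
                      pause_seconds: float = 0.1,
                      sleep: Callable[[float], None] = time.sleep) -> int:
    """
    Repeatedly call delete_batch(batch_size), pausing between full batches.
    Returns the total number of rows deleted.

    On PostgreSQL, delete_batch could run something like (hypothetical table t):
        DELETE FROM t WHERE id IN (
            SELECT id FROM t WHERE created_at < %s ORDER BY id LIMIT %s
        )
    and return the cursor's rowcount.
    """
    total = 0
    while True:
        deleted = delete_batch(batch_size)
        total += deleted
        if deleted < batch_size:
            return total       # short batch: nothing left to delete
        sleep(pause_seconds)   # give other transactions room between batches


# Stand-in for a real table holding 1200 expired rows.
remaining = [1200]

def fake_delete(limit: int) -> int:
    n = min(limit, remaining[0])
    remaining[0] -= n
    return n

pauses = []
deleted_total = delete_in_batches(fake_delete, sleep=pauses.append)
```

Injecting the sleep function keeps the loop testable without real delays. Each batch should be committed inside delete_batch so that locks are released before the pause.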

Frequently Asked Questions

What is the difference between archival and deletion in a retention policy?

Archival moves records to cold storage (such as S3 Glacier) before removing them from the live database, preserving them for compliance retrieval. Deletion removes records permanently. Which action to use depends on regulatory requirements: GDPR right-to-erasure demands deletion of PII, while financial regulations often require 7-year archival of transaction records.

How are retention policies defined per data class?

Each retention_policy row maps a data_class (e.g., pii, financial, system_log) to a retain_days value and an action (archive, anonymize, or delete), along with the target table and the date column used to measure age.

How does archival to cold storage work?

The retention job selects expired records in batches, serializes each batch to gzipped JSON, uploads it to S3 with the GLACIER_IR storage class via boto3, writes the archive key to retention_audit, and then deletes the originals from the live table.

How does a legal hold prevent accidental deletion?

A legal_hold row links a specific record (table + primary key) to a hold reason and an optional expiry. The retention job's fetch query joins against legal_hold and excludes any record with an active hold, so the job never selects held records and cannot delete them until the hold is released or expires.

How is GDPR right-to-erasure handled?

An erasure request hard-deletes the user's rows from every table covered by an active pii policy, regardless of retain_days. Tables containing rows under an active legal hold are skipped and reported back to the caller.

How do you prove to a regulator that data was actually deleted?

Every actioned record gets a row in retention_audit with the table name, record primary key, action type, timestamp, and job ID. The audit table should be append-only (no UPDATE or DELETE privileges for the application role) and can be exported as evidence. For erasure requests, record the request ID as well so audit rows can be cross-referenced with the original request.

How do you avoid locking production tables during large retention jobs?

Process deletions in small batches (500 rows at a time) with a short sleep between batches. Paginate by primary key rather than OFFSET. Run jobs during off-peak hours. On PostgreSQL, set statement_timeout and lock_timeout on the retention role so a statement aborts automatically if a lock cannot be acquired quickly.
