Data Retention Policy System Low-Level Design: TTL Enforcement, Archival, and Compliance Deletion

Data Retention Policy System Low-Level Design

Every production database accumulates data that must eventually be purged, archived, or anonymized — whether for storage cost, regulatory compliance (GDPR, CCPA, HIPAA), or contractual obligations. A data retention system codifies these rules as policy objects, runs scheduled evaluation jobs, and produces an immutable audit trail proving that deletion actually occurred. This article covers the complete low-level design: policy engine, archival to cold storage, soft-delete to hard-delete lifecycle, legal hold overrides, and right-to-erasure.

Core Concepts

  • RetentionPolicy: A rule binding a data class (e.g., user_messages) to a retention duration and an action (archive, anonymize, delete).
  • DataClassification: A label applied to a table or column indicating sensitivity and regulatory scope.
  • LegalHold: An override that suspends deletion for specific records pending litigation or audit.
  • RetentionJob: A scheduled execution of policy evaluation against a target table.

SQL Schema

-- One row per retention rule: which table/column to age, how long to keep
-- rows, and what to do with them once they are older than retain_days.
CREATE TABLE retention_policy (
    id              BIGSERIAL PRIMARY KEY,
    name            TEXT        NOT NULL UNIQUE,
    data_class      TEXT        NOT NULL,           -- maps to data_classification.class_name (not FK-enforced)
    target_table    TEXT        NOT NULL,           -- interpolated into SQL as an identifier; must be admin-controlled
    date_column     TEXT        NOT NULL,           -- column to measure age against (also interpolated as an identifier)
    retain_days     INT         NOT NULL CHECK (retain_days >= 0),  -- records older than this are actioned
    action          TEXT        NOT NULL CHECK (action IN ('archive','anonymize','delete')),
    archive_bucket  TEXT,                           -- S3 bucket; required when action = 'archive'
    is_active       BOOLEAN     NOT NULL DEFAULT TRUE,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    -- Reject archive policies with nowhere to archive to, instead of failing
    -- at upload time in the middle of a job.
    CONSTRAINT retention_policy_archive_bucket_chk
        CHECK (action <> 'archive' OR archive_bucket IS NOT NULL)
);

-- Catalogue of data-class labels; retention_policy.data_class refers to
-- class_name by value (there is no foreign key).
CREATE TABLE data_classification (
    id          BIGSERIAL PRIMARY KEY,
    class_name  TEXT NOT NULL UNIQUE,   -- e.g. 'pii','financial','operational','system_log'
    description TEXT,
    regulatory  TEXT[]                  -- regimes that apply, e.g. ARRAY['gdpr','ccpa']
);

-- One row per execution of a retention policy. evaluate_retention() inserts
-- it with status 'running' and later sets 'completed' or 'failed'.
CREATE TABLE retention_job (
    id              BIGSERIAL PRIMARY KEY,
    policy_id       BIGINT      NOT NULL REFERENCES retention_policy(id),
    status          TEXT        NOT NULL DEFAULT 'pending'
                                CHECK (status IN ('pending','running','completed','failed')),
    records_scanned BIGINT      NOT NULL DEFAULT 0,
    records_actioned BIGINT     NOT NULL DEFAULT 0,
    error_message   TEXT,                           -- text of the exception for failed runs
    started_at      TIMESTAMPTZ,
    completed_at    TIMESTAMPTZ,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Per-record audit trail: one row for each record a retention job actioned.
-- job_id and policy_id are NOT NULL, so every entry must be tied to an
-- existing retention_job row.
CREATE TABLE retention_audit (
    id          BIGSERIAL PRIMARY KEY,
    job_id      BIGINT      NOT NULL REFERENCES retention_job(id),
    policy_id   BIGINT      NOT NULL REFERENCES retention_policy(id),
    target_table TEXT       NOT NULL,
    record_id   BIGINT      NOT NULL,           -- PK of the actioned record
    action      TEXT        NOT NULL,           -- free text; no CHECK constraint here
    archive_key TEXT,                           -- S3 object key if archived
    actioned_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Everything a given job touched.
CREATE INDEX idx_ra_job     ON retention_audit (job_id);
-- History of a single record ("was row X of table T actioned, and how?").
CREATE INDEX idx_ra_record  ON retention_audit (target_table, record_id);

-- Per-record overrides that suspend retention actions while active.
-- A hold is active while released_at IS NULL and held_until is NULL or in
-- the future.
CREATE TABLE legal_hold (
    id          BIGSERIAL PRIMARY KEY,
    target_table TEXT      NOT NULL,
    record_id   BIGINT     NOT NULL,
    reason      TEXT       NOT NULL,
    held_by     BIGINT     NOT NULL,            -- user_id of compliance officer
    held_until  TIMESTAMPTZ,                    -- NULL = indefinite
    released_at TIMESTAMPTZ,                    -- set when the hold is lifted
    created_at  TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- At most one unreleased hold per record. A plain UNIQUE (target_table,
-- record_id) would make it impossible to place a new hold on a record whose
-- earlier hold was released. The retention scan's LEFT JOIN (which filters
-- on released_at IS NULL) still matches at most one hold row per record.
CREATE UNIQUE INDEX uq_legal_hold_active
    ON legal_hold (target_table, record_id)
    WHERE released_at IS NULL;

Policy Evaluation Engine

import boto3
import json
import csv
import io
from datetime import datetime, timezone, timedelta
from typing import Iterator, List, Dict, Any
import psycopg2
import psycopg2.extras

# Module-level S3 client used by the archival pipeline; built once at import.
S3_CLIENT = boto3.client(service_name='s3')

def evaluate_retention(db, policy_id: int) -> int:
    """
    Run one retention policy and record the run in ``retention_job``.

    Loads the policy (only if ``is_active``), computes the age cutoff from
    ``retain_days``, inserts a ``retention_job`` row in status 'running', then
    streams eligible records batch by batch to the handler for the policy's
    action. On success the job row is marked 'completed'. On any error the
    in-flight transaction is rolled back, the job row is marked 'failed' with
    the error text, and the exception is re-raised.

    Args:
        db: Database wrapper exposing ``fetchone``, ``fetchall``, ``execute``,
            ``commit`` and ``rollback``.
        policy_id: Primary key of the ``retention_policy`` row to run.

    Returns:
        Number of records actioned; 0 if the policy does not exist or is
        inactive (no job row is created in that case).

    Raises:
        ValueError: if the policy's action has no handler.
        Exception: whatever a batch handler raised, after the job row has been
            marked 'failed'.
    """
    policy = db.fetchone(
        "SELECT * FROM retention_policy WHERE id=%s AND is_active=TRUE",
        (policy_id,)
    )
    if not policy:
        return 0

    cutoff = datetime.now(timezone.utc) - timedelta(days=policy['retain_days'])

    # RETURNING gives this row's id directly; lastval() can be skewed by any
    # other sequence use (e.g. triggers) in the same session.
    job_id = db.fetchone(
        """INSERT INTO retention_job (policy_id, status, started_at)
           VALUES (%s,'running',NOW()) RETURNING id""",
        (policy_id,)
    )['id']
    # Commit so the 'running' row is durable even if the first batch fails
    # and its transaction is rolled back.
    db.commit()

    scanned = 0   # rows fetched as eligible
    total = 0     # rows successfully handed to an action handler
    try:
        for batch in fetch_eligible_records(db, policy, cutoff):
            scanned += len(batch)
            action = policy['action']
            if action == 'archive':
                archive_records(db, policy, job_id, batch)
            elif action == 'anonymize':
                anonymize_records(db, policy, job_id, batch)
            elif action == 'delete':
                delete_records(db, policy, job_id, batch)
            else:
                # Never report rows as actioned when nothing was done to them.
                raise ValueError(f"unsupported retention action: {action!r}")
            total += len(batch)

        db.execute(
            """UPDATE retention_job SET status='completed', completed_at=NOW(),
               records_scanned=%s, records_actioned=%s WHERE id=%s""",
            (scanned, total, job_id)
        )
        db.commit()
    except Exception as exc:
        # After a failed statement PostgreSQL rejects every further command in
        # the same transaction, so discard the in-flight batch before recording
        # the failure. Batches a handler already committed (archive_records
        # commits per batch) are unaffected.
        db.rollback()
        db.execute(
            """UPDATE retention_job SET status='failed', completed_at=NOW(),
               records_scanned=%s, records_actioned=%s, error_message=%s
               WHERE id=%s""",
            (scanned, total, str(exc), job_id)
        )
        db.commit()
        raise

    return total


def fetch_eligible_records(
    db, policy: Dict, cutoff: datetime, batch_size: int = 500
) -> Iterator[List[Dict]]:
    """
    Yield batches of rows older than ``cutoff`` that no active legal hold covers.

    Pages through the policy's target table in ascending ``id`` order using
    keyset pagination: each query asks for ids greater than the last one
    yielded. Rows the caller deletes or modifies between batches therefore do
    not shift later pages. A hold counts as active when it has not been
    released and is either indefinite (``held_until`` NULL) or not yet expired.

    Args:
        db: Database wrapper whose ``fetchall(sql, params)`` returns a list of
            dict-like rows.
        policy: ``retention_policy`` row. ``target_table`` and ``date_column``
            are interpolated into the SQL as identifiers.
        cutoff: Rows whose ``date_column`` is strictly earlier are eligible.
        batch_size: Maximum number of rows per yielded batch.

    Yields:
        Non-empty lists of rows (each with an ``id`` key) in ascending id order.
    """
    table = policy['target_table']
    age_column = policy['date_column']
    query = f"""
            SELECT t.*
            FROM {table} t
            LEFT JOIN legal_hold lh
                ON lh.target_table = %s AND lh.record_id = t.id
                AND (lh.held_until IS NULL OR lh.held_until > NOW())
                AND lh.released_at IS NULL
            WHERE t.id > %s
              AND t.{age_column} < %s
              AND lh.id IS NULL
            ORDER BY t.id
            LIMIT %s
            """

    high_water = 0
    page = db.fetchall(query, (table, high_water, cutoff, batch_size))
    while page:
        yield page
        high_water = page[-1]['id']
        page = db.fetchall(query, (table, high_water, cutoff, batch_size))

Archival Pipeline to S3 Glacier

def archive_records(db, policy: Dict, job_id: int, batch: List[Dict]) -> None:
    """
    Archive one batch of records to S3, audit it, then delete it from the
    live table.

    The archive object is uploaded before anything is deleted. The audit rows
    and the DELETE are committed together. As a result, a record is never
    removed without both an archive copy and an audit entry.

    Args:
        db: Database wrapper exposing ``cursor``, ``execute`` and ``commit``.
            ``cursor()`` must return a psycopg2 cursor, as required by
            ``execute_values``.
        policy: ``retention_policy`` row; uses ``id``, ``target_table`` and
            ``archive_bucket``.
        job_id: ``retention_job`` id, recorded on every audit row and embedded
            in the S3 key.
        batch: Rows to archive; each must have an ``id`` key. An empty batch
            is a no-op.

    Raises:
        ValueError: if the policy has no ``archive_bucket``.
    """
    import gzip

    if not batch:
        return
    bucket = policy['archive_bucket']
    if not bucket:
        raise ValueError(
            f"retention policy {policy['id']} uses action 'archive' "
            "but has no archive_bucket"
        )

    # Timezone-aware UTC, consistent with the cutoff in evaluate_retention;
    # datetime.utcnow() is naive and deprecated.
    date_prefix = datetime.now(timezone.utc).strftime('%Y/%m/%d')
    # job_id + first record id keeps keys unique across batches and runs.
    s3_key = f"retention/{policy['target_table']}/{date_prefix}/{job_id}-{batch[0]['id']}.json.gz"

    # default=str stringifies values json cannot encode natively (e.g. datetime).
    payload = json.dumps(batch, default=str).encode()
    S3_CLIENT.put_object(
        Bucket=bucket,
        Key=s3_key,
        Body=gzip.compress(payload),
        StorageClass='GLACIER_IR',
        ContentEncoding='gzip',
        ContentType='application/json'
    )

    ids = [r['id'] for r in batch]
    audit_rows = [(job_id, policy['id'], policy['target_table'], rid, 'archive', s3_key)
                  for rid in ids]
    # The context manager closes the cursor. page_size=len(audit_rows) sends the
    # whole batch in one INSERT instead of execute_values' default 100-row pages.
    with db.cursor() as cur:
        psycopg2.extras.execute_values(
            cur,
            """INSERT INTO retention_audit
               (job_id, policy_id, target_table, record_id, action, archive_key)
               VALUES %s""",
            audit_rows,
            page_size=len(audit_rows)
        )

    # Hard delete from live table; committed together with the audit rows.
    db.execute(
        f"DELETE FROM {policy['target_table']} WHERE id = ANY(%s)",
        (ids,)
    )
    db.commit()

GDPR Right-to-Erasure (Hard Delete)

A right-to-erasure request must delete all of a user’s PII from every table covered by an active policy in the pii data class, except where a legal hold applies. The implementation below assumes each such table has a user_id column:

def erase_user_data(db, user_id: int, request_id: str) -> Dict[str, int]:
    """
    Hard-delete every row belonging to ``user_id`` from each table covered by
    an active policy in the 'pii' data class.

    For each distinct target table:
      * If any of the user's rows in that table is under an active legal hold
        (not released, and indefinite or ``held_until`` in the future), the
        table is skipped entirely and reported as ``-1``.
      * Otherwise the user's rows are deleted with ``DELETE ... RETURNING id``.
        A completed ``retention_job`` row is written for the covering policy,
        with one ``retention_audit`` row (action ``'erasure'``) per deleted id.

    All deletes and audit rows are committed together at the end.

    Assumes every pii target table has a ``user_id`` column and an integer
    ``id`` primary key (TODO confirm per table).

    Args:
        db: Database wrapper exposing ``fetchall``, ``fetchone``, ``execute``
            and ``commit``.
        user_id: Subject of the erasure request.
        request_id: Identifier of the erasure request. NOTE(review): not
            persisted; neither retention_job nor retention_audit has a
            column for it.

    Returns:
        ``{table_name: rows_deleted}``, with ``-1`` for tables skipped because
        of a legal hold.
    """
    policies = db.fetchall(
        "SELECT * FROM retention_policy WHERE data_class='pii' AND is_active=TRUE"
    )
    results: Dict[str, int] = {}
    for p in policies:
        table = p['target_table']
        if table in results:
            # Several pii policies may target the same table. Erase it once, so
            # that a second (empty) DELETE does not overwrite the real count.
            continue

        # Same notion of an active hold as fetch_eligible_records: not released
        # and either indefinite or not yet expired.
        hold = db.fetchone(
            f"""SELECT 1 FROM legal_hold
               WHERE target_table=%s AND record_id IN (
                   SELECT id FROM {table} WHERE user_id=%s
               ) AND released_at IS NULL
                 AND (held_until IS NULL OR held_until > NOW())
               LIMIT 1""",
            (table, user_id)
        )
        if hold:
            results[table] = -1   # -1 signals held
            continue

        deleted_ids = [
            row['id'] for row in db.fetchall(
                f"DELETE FROM {table} WHERE user_id=%s RETURNING id",
                (user_id,)
            )
        ]
        results[table] = len(deleted_ids)

        # retention_audit.job_id / policy_id are NOT NULL. Record the erasure
        # as a completed job for the covering policy and attach one audit row
        # per deleted record to it.
        job = db.fetchone(
            """INSERT INTO retention_job
                   (policy_id, status, records_scanned, records_actioned,
                    started_at, completed_at)
               VALUES (%s, 'completed', %s, %s, NOW(), NOW())
               RETURNING id""",
            (p['id'], len(deleted_ids), len(deleted_ids))
        )
        db.execute(
            """INSERT INTO retention_audit
                   (job_id, policy_id, target_table, record_id, action)
               SELECT %s, %s, %s, rid, 'erasure'
               FROM unnest(%s::bigint[]) AS rid""",
            (job['id'], p['id'], table, deleted_ids)
        )

    db.commit()
    return results

Scheduling

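Run retention jobs via a cron-driven dispatcher. To limit lock contention, replication lag, and long-running transactions on high-traffic tables, schedule archival jobs for off-peak windows. Delete in small batches (for example, 500 rows per statement) with brief sleeps between batches. PostgreSQL has no DELETE … LIMIT, so batch by primary key instead: select the next page of IDs, run DELETE … WHERE id = ANY(<id array>), and commit after each batch. Monitor job duration and alert if a job runs longer than its scheduled interval, which indicates the policy's scope needs tuning.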

See also: Netflix Interview Guide 2026: Streaming Architecture, Recommendation Systems, and Engineering Excellence

See also: Scale AI Interview Guide 2026: Data Infrastructure, RLHF Pipelines, and ML Engineering

See also: Atlassian Interview Guide

Scroll to Top