Audit Trail Low-Level Design: Immutable Event Log, Compliance Queries, and Retention

An audit trail is an append-only log of every action taken on sensitive resources — who changed what, when, and from where. Required for compliance (SOC 2, HIPAA, PCI-DSS), incident investigation, and regulatory reporting. Core requirements: immutability (no UPDATE or DELETE on audit records), completeness (capture every state change), queryability (filter by actor, resource, time range), and long-term retention without table bloat.

Core Data Model

-- Append-only audit event log (NEVER UPDATE or DELETE rows)
CREATE TABLE AuditEvent (
    event_id      BIGSERIAL,
    occurred_at   TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    -- Who
    actor_type    TEXT NOT NULL,   -- 'user', 'system', 'api_key'
    actor_id      UUID NOT NULL,
    actor_ip      INET,
    actor_ua      TEXT,            -- User-Agent string
    -- What
    action        TEXT NOT NULL,   -- 'create', 'update', 'delete', 'login', 'export'
    resource_type TEXT NOT NULL,   -- 'user', 'order', 'payment', 'config'
    resource_id   TEXT NOT NULL,   -- the PK of the affected row
    -- Context
    old_value     JSONB,           -- snapshot before change (NULL for creates)
    new_value     JSONB,           -- snapshot after change (NULL for deletes)
    metadata      JSONB,           -- extra context: request_id, session_id, org_id
    PRIMARY KEY (event_id, occurred_at)
) PARTITION BY RANGE (occurred_at);

-- Monthly partitions for efficient range queries and retention management
-- CREATE TABLE audit_events_2026_04 PARTITION OF AuditEvent
--     FOR VALUES FROM ('2026-04-01') TO ('2026-05-01');

-- Index patterns matching query access
CREATE INDEX idx_audit_actor ON AuditEvent (actor_id, occurred_at DESC);
CREATE INDEX idx_audit_resource ON AuditEvent (resource_type, resource_id, occurred_at DESC);
CREATE INDEX idx_audit_action ON AuditEvent (action, occurred_at DESC);

Audit Middleware: Auto-Capture via Decorator

from functools import wraps
from datetime import datetime, timezone
import json, psycopg2
from typing import Any

def audit(action: str, resource_type: str, resource_id_param: str = 'resource_id'):
    """
    Decorator that automatically logs an audit event around a function call.
    Captures the before/after state of the resource.

    Usage:
        @audit('update', 'user', resource_id_param='user_id')
        def update_user(conn, actor, user_id, **changes):
            ...
    """
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            # Extract resource_id from kwargs or positional args
            resource_id = kwargs.get(resource_id_param)
            actor = kwargs.get('actor') or (args[1] if len(args) > 1 else None)
            conn = kwargs.get('conn') or args[0]

            # Capture old state before mutation
            old_value = fetch_resource_snapshot(conn, resource_type, resource_id)

            # Execute the actual function
            result = fn(*args, **kwargs)

            # Capture new state after mutation
            new_value = fetch_resource_snapshot(conn, resource_type, resource_id)

            # Write audit event
            write_audit_event(
                conn=conn,
                actor_type=actor.get('type', 'user'),
                actor_id=actor['id'],
                actor_ip=actor.get('ip'),
                action=action,
                resource_type=resource_type,
                resource_id=str(resource_id),
                old_value=old_value,
                new_value=new_value,
                metadata={"request_id": actor.get('request_id')}
            )
            return result
        return wrapper
    return decorator

def write_audit_event(conn, actor_type, actor_id, actor_ip, action,
                       resource_type, resource_id, old_value, new_value, metadata):
    with conn.cursor() as cur:
        cur.execute("""
            INSERT INTO AuditEvent
            (actor_type, actor_id, actor_ip, action, resource_type, resource_id,
             old_value, new_value, metadata)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
        """, (
            actor_type, actor_id, actor_ip, action,
            resource_type, resource_id,
            json.dumps(old_value) if old_value else None,
            json.dumps(new_value) if new_value else None,
            json.dumps(metadata) if metadata else None
        ))
    # Audit writes should be in a separate transaction from the main mutation
    # so that a failed business transaction doesn't prevent audit logging

Query Patterns

def get_resource_history(conn, resource_type: str, resource_id: str,
                          limit: int = 50, before: datetime | None = None) -> list[dict]:
    """
    Fetch the full change history for a specific resource.
    Cursor-paginated for large histories.
    """
    with conn.cursor() as cur:
        params = [resource_type, resource_id]
        cursor_clause = ""
        if before:
            cursor_clause = "AND occurred_at  list[dict]:
    """All actions taken by a specific user in a time range — for incident investigation."""
    with conn.cursor() as cur:
        cur.execute("""
            SELECT occurred_at, action, resource_type, resource_id, actor_ip
            FROM AuditEvent
            WHERE actor_id = %s
              AND occurred_at BETWEEN %s AND %s
            ORDER BY occurred_at DESC
        """, (actor_id, from_ts, to_ts))
        cols = [d[0] for d in cur.description]
        return [dict(zip(cols, row)) for row in cur.fetchall()]

def get_bulk_deletions(conn, since: datetime) -> list[dict]:
    """Security query: find any actor who deleted > 100 records in an hour."""
    with conn.cursor() as cur:
        cur.execute("""
            SELECT actor_id, date_trunc('hour', occurred_at) as hour_bucket,
                   COUNT(*) as deletion_count
            FROM AuditEvent
            WHERE action = 'delete' AND occurred_at >= %s
            GROUP BY actor_id, hour_bucket
            HAVING COUNT(*) > 100
            ORDER BY deletion_count DESC
        """, (since,))
        cols = [d[0] for d in cur.description]
        return [dict(zip(cols, row)) for row in cur.fetchall()]

Retention and Immutability Enforcement

-- Revoke UPDATE and DELETE on AuditEvent at the DB level
-- Run once during schema setup:
REVOKE UPDATE, DELETE ON AuditEvent FROM application_role;

-- Postgres row-level trigger to block deletes (defense in depth)
CREATE OR REPLACE FUNCTION prevent_audit_delete()
RETURNS trigger LANGUAGE plpgsql AS $$
BEGIN
    RAISE EXCEPTION 'Audit events are immutable — DELETE is not permitted';
END;
$$;

CREATE TRIGGER no_audit_delete
BEFORE DELETE ON AuditEvent
FOR EACH ROW EXECUTE FUNCTION prevent_audit_delete();

-- Retention policy: DROP old partitions (not DELETE rows — respects immutability within retention)
-- Keeps 7 years for SOC 2 compliance:
-- DROP TABLE IF EXISTS audit_events_2019_04;  (run monthly via pg_cron)

Key Interview Points

  • Separate transaction for audit writes: If you write the audit event inside the same transaction as the business update, a rollback erases both — the audit event is lost for a failed operation. Use a separate connection or a Postgres deferred trigger with a separate savepoint so the audit write commits independently.
  • JSONB old/new value snapshots: Store the full row snapshot as JSONB, not a diff. A diff requires applying all diffs in sequence to reconstruct state at any point. Full snapshots allow direct point-in-time queries: “what did this record look like at 3pm?” The storage overhead is acceptable for most use cases — omit binary columns (file content) and large text fields from snapshots.
  • Partitioning strategy: Monthly partitions allow dropping a month’s worth of data in O(1) by dropping the partition table. This is the only correct way to implement retention at scale — DELETE WHERE occurred_at < … on a 10 billion-row table takes hours and generates massive WAL. Range partitioning on occurred_at also accelerates time-bounded queries via partition pruning.
  • Compliance export: SOC 2 auditors request activity logs for specific users over a 12-month window. Pre-aggregate into a CSV export job: SELECT * FROM AuditEvent WHERE actor_id = X AND occurred_at BETWEEN … signed with a timestamp for non-repudiation. S3-store the exports with Object Lock (WORM) enabled.
  • Performance impact: Every mutation now writes two rows (business + audit). Mitigate: write audit events asynchronously via a background queue (Kafka, Redis list). The downside: audit events may lag by seconds in failure scenarios. For compliance use cases, synchronous writes with a separate connection are safer.

Audit trail and financial compliance logging design is discussed in Stripe system design interview questions.

Audit trail and immutable activity log design is covered in Amazon system design interview preparation.

Audit trail and compliance event log design is discussed in Atlassian system design interview guide.

Scroll to Top