An audit trail is an append-only log of every action taken on sensitive resources: who changed what, when, and from where. It is required for compliance (SOC 2, HIPAA, PCI-DSS), incident investigation, and regulatory reporting. Core requirements: immutability (no UPDATE or DELETE on audit records), completeness (capture every state change), queryability (filter by actor, resource, time range), and long-term retention without table bloat.
Core Data Model
-- Append-only audit event log (NEVER UPDATE or DELETE rows)
CREATE TABLE AuditEvent (
    event_id       BIGSERIAL,
    occurred_at    TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    -- Who
    actor_type     TEXT NOT NULL,   -- 'user', 'system', 'api_key'
    actor_id       UUID NOT NULL,
    actor_ip       INET,
    actor_ua       TEXT,            -- User-Agent string
    -- What
    action         TEXT NOT NULL,   -- 'create', 'update', 'delete', 'login', 'export'
    resource_type  TEXT NOT NULL,   -- 'user', 'order', 'payment', 'config'
    resource_id    TEXT NOT NULL,   -- the PK of the affected row
    -- Context
    old_value      JSONB,           -- snapshot before change (NULL for creates)
    new_value      JSONB,           -- snapshot after change (NULL for deletes)
    metadata       JSONB,           -- extra context: request_id, session_id, org_id
    -- The partition key must be part of the primary key on a partitioned table
    PRIMARY KEY (event_id, occurred_at)
) PARTITION BY RANGE (occurred_at);

-- Monthly partitions for efficient range queries and retention management
-- CREATE TABLE audit_events_2026_04 PARTITION OF AuditEvent
--     FOR VALUES FROM ('2026-04-01') TO ('2026-05-01');

-- Index patterns matching query access
CREATE INDEX idx_audit_actor    ON AuditEvent (actor_id, occurred_at DESC);
CREATE INDEX idx_audit_resource ON AuditEvent (resource_type, resource_id, occurred_at DESC);
CREATE INDEX idx_audit_action   ON AuditEvent (action, occurred_at DESC);
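The commented partition example above has to be repeated for every month, so it is usually generated. The following is a minimal sketch of a DDL generator, assuming the audit_events_YYYY_MM naming from that example; the helper names month_bounds and partition_ddl are illustrative, not part of the schema.

```python
from datetime import date


def month_bounds(year: int, month: int) -> tuple[date, date]:
    """Return the first day of the month and the first day of the next month."""
    start = date(year, month, 1)
    # Roll over to January of the following year after December
    end = date(year + 1, 1, 1) if month == 12 else date(year, month + 1, 1)
    return start, end


def partition_ddl(year: int, month: int, parent: str = "AuditEvent") -> str:
    """Build the CREATE TABLE statement for one monthly partition.

    The audit_events_YYYY_MM naming follows the commented example in the schema.
    """
    start, end = month_bounds(year, month)
    name = f"audit_events_{year:04d}_{month:02d}"
    return (
        f"CREATE TABLE IF NOT EXISTS {name} PARTITION OF {parent} "
        f"FOR VALUES FROM ('{start.isoformat()}') TO ('{end.isoformat()}');"
    )
```

The upper bound is exclusive in a range partition, which is why each partition ends on the first day of the following month.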
Audit Middleware: Auto-Capture via Decorator
import inspect
import json
from datetime import datetime
from functools import wraps

import psycopg2  # conn arguments below are psycopg2 connections


def audit(action: str, resource_type: str, resource_id_param: str = 'resource_id'):
    """
    Decorator that automatically logs an audit event around a function call.
    Captures the before/after state of the resource.

    Usage:
        @audit('update', 'user', resource_id_param='user_id')
        def update_user(conn, actor, user_id, **changes):
            ...
    """
    def decorator(fn):
        sig = inspect.signature(fn)

        @wraps(fn)
        def wrapper(*args, **kwargs):
            # Bind the call to parameter names so resource_id is found whether
            # it was passed positionally or by keyword
            bound = sig.bind(*args, **kwargs).arguments
            resource_id = bound.get(resource_id_param)
            actor = kwargs.get('actor') or (args[1] if len(args) > 1 else None)
            conn = kwargs.get('conn') or args[0]
            if actor is None:
                raise ValueError('audited functions require an actor')

            # Capture old state before mutation
            # (fetch_resource_snapshot returns the current row as a dict; not shown here)
            old_value = fetch_resource_snapshot(conn, resource_type, resource_id)

            # Execute the actual function
            result = fn(*args, **kwargs)

            # Capture new state after mutation
            new_value = fetch_resource_snapshot(conn, resource_type, resource_id)

            # Write audit event
            write_audit_event(
                conn=conn,
                actor_type=actor.get('type', 'user'),
                actor_id=actor['id'],
                actor_ip=actor.get('ip'),
                action=action,
                resource_type=resource_type,
                resource_id=str(resource_id),
                old_value=old_value,
                new_value=new_value,
                metadata={"request_id": actor.get('request_id')},
            )
            return result
        return wrapper
    return decorator


def write_audit_event(conn, actor_type, actor_id, actor_ip, action,
                      resource_type, resource_id, old_value, new_value, metadata):
    with conn.cursor() as cur:
        cur.execute("""
            INSERT INTO AuditEvent
                (actor_type, actor_id, actor_ip, action, resource_type, resource_id,
                 old_value, new_value, metadata)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
        """, (
            actor_type, actor_id, actor_ip, action,
            resource_type, resource_id,
            # default=str keeps datetime, UUID, and Decimal columns serializable
            json.dumps(old_value, default=str) if old_value is not None else None,
            json.dumps(new_value, default=str) if new_value is not None else None,
            json.dumps(metadata, default=str) if metadata is not None else None,
        ))

# Audit writes should be in a separate transaction from the main mutation
# so that a failed business transaction doesn't prevent audit logging.
# The decorator above reuses conn for brevity; pass a dedicated audit
# connection to write_audit_event to get that separation.
Query Patterns
def get_resource_history(conn, resource_type: str, resource_id: str,
                         limit: int = 50, before: datetime | None = None) -> list[dict]:
    """
    Fetch the full change history for a specific resource.
    Cursor-paginated for large histories.
    """
    with conn.cursor() as cur:
        params = [resource_type, resource_id]
        cursor_clause = ""
        if before:
            cursor_clause = "AND occurred_at < %s"
            params.append(before)
        # NOTE: the rest of this function and the next signature were truncated
        # in the source; the query and the name get_actor_activity are reconstructed
        params.append(limit)
        cur.execute(f"""
            SELECT event_id, occurred_at, actor_type, actor_id, action,
                   old_value, new_value
            FROM AuditEvent
            WHERE resource_type = %s AND resource_id = %s
            {cursor_clause}
            ORDER BY occurred_at DESC
            LIMIT %s
        """, params)
        cols = [d[0] for d in cur.description]
        return [dict(zip(cols, row)) for row in cur.fetchall()]


def get_actor_activity(conn, actor_id: str, from_ts: datetime,
                       to_ts: datetime) -> list[dict]:
    """All actions taken by a specific user in a time range, for incident investigation."""
    with conn.cursor() as cur:
        cur.execute("""
            SELECT occurred_at, action, resource_type, resource_id, actor_ip
            FROM AuditEvent
            WHERE actor_id = %s
              AND occurred_at BETWEEN %s AND %s
            ORDER BY occurred_at DESC
        """, (actor_id, from_ts, to_ts))
        cols = [d[0] for d in cur.description]
        return [dict(zip(cols, row)) for row in cur.fetchall()]


def get_bulk_deletions(conn, since: datetime) -> list[dict]:
    """Security query: find any actor who deleted > 100 records in an hour."""
    with conn.cursor() as cur:
        cur.execute("""
            SELECT actor_id, date_trunc('hour', occurred_at) AS hour_bucket,
                   COUNT(*) AS deletion_count
            FROM AuditEvent
            WHERE action = 'delete' AND occurred_at >= %s
            GROUP BY actor_id, hour_bucket
            HAVING COUNT(*) > 100
            ORDER BY deletion_count DESC
        """, (since,))
        cols = [d[0] for d in cur.description]
        return [dict(zip(cols, row)) for row in cur.fetchall()]
Retention and Immutability Enforcement
-- Revoke UPDATE and DELETE on AuditEvent at the DB level
-- (TRUNCATE is revoked too, because it bypasses row-level DELETE triggers)
-- Run once during schema setup:
REVOKE UPDATE, DELETE, TRUNCATE ON AuditEvent FROM application_role;

-- Postgres row-level trigger to block deletes (defense in depth)
CREATE OR REPLACE FUNCTION prevent_audit_delete()
RETURNS trigger LANGUAGE plpgsql AS $$
BEGIN
    RAISE EXCEPTION 'Audit events are immutable: DELETE is not permitted';
END;
$$;

CREATE TRIGGER no_audit_delete
    BEFORE DELETE ON AuditEvent
    FOR EACH ROW EXECUTE FUNCTION prevent_audit_delete();

-- Retention policy: DROP old partitions instead of deleting rows, so rows stay
-- immutable for as long as they are retained.
-- Keeps 7 years of history (set the period your compliance regime requires):
-- DROP TABLE IF EXISTS audit_events_2019_04; (run monthly via pg_cron)
Key Interview Points
- Separate transaction for audit writes: If you write the audit event inside the same transaction as the business update, a rollback erases both, so the audit event for a failed operation is lost. Use a separate connection so the audit write commits independently. A deferred trigger or a savepoint does not help here: both run inside the business transaction and are undone by its rollback.
- JSONB old/new value snapshots: Store the full row snapshot as JSONB, not a diff. A diff requires applying all diffs in sequence to reconstruct state at any point. Full snapshots allow direct point-in-time queries: “what did this record look like at 3pm?” The storage overhead is acceptable for most use cases — omit binary columns (file content) and large text fields from snapshots.
- Partitioning strategy: Monthly partitions allow dropping a month's worth of data with a single DROP TABLE on the partition, a catalog operation whose cost does not depend on the row count. This is the practical way to implement retention at scale: DELETE WHERE occurred_at < … on a 10-billion-row table takes hours and generates massive WAL. Range partitioning on occurred_at also accelerates time-bounded queries via partition pruning.
- Compliance export: SOC 2 auditors commonly request activity logs for specific users over a 12-month window. Run this as an export job that writes the result of SELECT * FROM AuditEvent WHERE actor_id = X AND occurred_at BETWEEN … to CSV, then sign and timestamp the file for non-repudiation. Store the exports in S3 with Object Lock (WORM) enabled.
- Performance impact: Every mutation now writes two rows (business + audit). Mitigate: write audit events asynchronously via a background queue (Kafka, Redis list). The downside: audit events may lag by seconds in failure scenarios. For compliance use cases, synchronous writes with a separate connection are safer.
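The first point above, committing the audit write independently of the business transaction, can be demonstrated end to end. The sketch below uses sqlite3 as a stand-in for Postgres so that it runs without a server. The table names, init_db, and run_with_audit are assumptions for illustration, and a production version would use two psycopg2 connections instead.

```python
import json
import sqlite3


def init_db(db_path: str) -> None:
    """Create a toy business table and a toy audit table."""
    conn = sqlite3.connect(db_path)
    with conn:
        conn.execute("CREATE TABLE account (id INTEGER PRIMARY KEY)")
        conn.execute(
            "CREATE TABLE audit_event (action TEXT, resource_id TEXT, metadata TEXT)"
        )
    conn.close()


def run_with_audit(db_path: str, action: str, resource_id: str, mutate) -> None:
    """Run mutate(conn) in one transaction and record the attempt in another."""
    business = sqlite3.connect(db_path)
    audit = sqlite3.connect(db_path)
    outcome = "committed"
    try:
        mutate(business)
        business.commit()
    except Exception:
        business.rollback()
        outcome = "rolled_back"
        raise
    finally:
        business.close()
        # The audit insert commits on its own connection, after the business
        # transaction has finished, so a rollback there cannot erase it
        with audit:
            audit.execute(
                "INSERT INTO audit_event (action, resource_id, metadata) "
                "VALUES (?, ?, ?)",
                (action, resource_id, json.dumps({"outcome": outcome})),
            )
        audit.close()
```

When mutate raises, the business rows are rolled back but the audit_event row with outcome "rolled_back" is still committed, which is the record of the failed attempt that a same-transaction write would lose.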
Frequently Asked Questions
Why should audit events never be updated or deleted?
An audit trail is only useful for compliance and forensics if it cannot be tampered with. If audit records can be deleted, an attacker who gains admin access can cover their tracks by deleting evidence of data exfiltration before the security team investigates. If they can be updated, malicious actors can retroactively alter "who did what." Immutability is enforced at multiple layers: (1) REVOKE UPDATE, DELETE ON AuditEvent at the Postgres role level, so the application user cannot issue UPDATE/DELETE; (2) a BEFORE DELETE trigger raises an exception as defense in depth; (3) no code path in the application constructs UPDATE/DELETE statements against AuditEvent. Retention (dropping old partitions) is the only legitimate way to remove audit data: dropping a partition is a schema operation, not a row-level delete, and requires DBA privileges.
How do you ensure audit events are written even when the main business transaction rolls back?
If the audit write is inside the same database transaction as the business operation, a rollback erases both, and you lose the audit record for the failed operation. This is often wrong: you want to know that an admin tried to delete a user record even if the delete failed due to a constraint. Solution: use a separate database connection for audit writes. After the business transaction commits or rolls back, write the audit event on a separate connection in its own transaction, so the audit write is independent of the outcome. A Postgres deferred trigger is not an alternative: it fires at commit time inside the same transaction and never runs when the transaction rolls back. For high-throughput systems, write audit events to Kafka and let a separate consumer batch-insert them, which decouples audit write latency from request latency.
How do you efficiently query audit events for compliance exports?
Compliance exports require all events for a specific actor or resource over a 12-month window. With monthly partitions and indexes on (actor_id, occurred_at) and (resource_type, resource_id, occurred_at), these queries are fast: SELECT * FROM AuditEvent WHERE actor_id = $1 AND occurred_at BETWEEN $2 AND $3. Postgres partition pruning limits the scan to the relevant monthly partitions, so a 12-month query scans about 12 partitions instead of the full table. To stream large exports without loading millions of rows into memory, use a server-side cursor (fetchmany(1000)) and stream to a gzip-compressed CSV file uploaded to S3. Sign the file with a timestamp and store it in S3 with Object Lock (WORM compliance mode) so even bucket admins cannot delete it before the retention period expires.
How do you capture old and new values without coupling the audit system to every model?
Centralized approach: write a generic audit decorator that calls fetch_resource_snapshot(resource_type, resource_id) before and after the mutation. The snapshot function dispatches by resource_type to the appropriate query (SELECT * FROM User WHERE user_id = X, SELECT * FROM Order WHERE order_id = X). This requires no changes to individual model classes: add @audit("update", "user") to the service function. Alternative: Postgres triggers. Create a trigger function that captures OLD and NEW row values as JSONB and inserts into AuditEvent automatically. This captures all changes including direct SQL, ORM updates, and background jobs. Downside: triggers add latency to every write and are hard to test. The decorator approach is more explicit and testable; use triggers for tables where you cannot instrument all write paths.
How does partitioning by month support a 7-year retention policy?
With 7-year retention, a single AuditEvent table accumulates billions of rows. Running DELETE WHERE occurred_at < 7_years_ago on a billion-row table takes hours, generates massive WAL (Write-Ahead Log), and leaves dead tuples that vacuum must then clean up. Partition strategy: create one partition per month (audit_events_2026_04 covers 2026-04-01 to 2026-05-01). To enforce retention: DROP TABLE audit_events_2019_04, which completes quickly regardless of row count, generates minimal WAL, and requires no row-level scan. A pg_cron job runs on the 1st of each month: DROP TABLE IF EXISTS audit_events_{7_years_ago}. Each DROP is a catalog operation, not a data operation. Partition creation: automate with a pg_cron job that runs CREATE TABLE audit_events_{next_month} PARTITION OF AuditEvent FOR VALUES FROM (…) TO (…) three months in advance.
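The streaming export described in the compliance-export answer can be sketched with the standard library alone. The helpers below are illustrative assumptions: stream_rows pulls batches from any DB-API cursor with fetchmany, and write_gzip_csv writes them to a binary file object as gzip-compressed CSV. Uploading to S3 and signing the file are outside the scope of this sketch.

```python
import csv
import gzip
from typing import BinaryIO, Iterable, Iterator, Sequence


def stream_rows(cur, batch_size: int = 1000) -> Iterator[Sequence]:
    """Yield rows from a DB-API cursor in batches instead of calling fetchall()."""
    while True:
        batch = cur.fetchmany(batch_size)
        if not batch:
            return
        yield from batch


def write_gzip_csv(columns: Sequence[str], rows: Iterable[Sequence],
                   out: BinaryIO) -> int:
    """Write a header plus rows as gzip-compressed CSV to `out`; return the row count."""
    count = 0
    # newline="" lets the csv module control line endings itself
    with gzip.open(out, "wt", encoding="utf-8", newline="") as text:
        writer = csv.writer(text)
        writer.writerow(columns)
        for row in rows:
            writer.writerow(row)
            count += 1
    return count
```

With psycopg2, a named cursor such as conn.cursor(name="audit_export") is server-side, so fetchmany pulls rows from the server in batches and memory use stays bounded by the batch size.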