Audit Log Export System Low-Level Design
Compliance teams and security analysts regularly need bulk exports of audit log data for external SIEM ingestion, legal discovery, or incident investigation. The challenge is that audit_log tables grow to hundreds of millions of rows, making synchronous query-and-download approaches impractical. A well-designed audit export system runs queries in the background, streams output to S3, provides presigned download URLs with configurable TTLs, and enforces size limits to prevent accidental export of 50 GB of data.
Audit Log Table Design (Partitioned)
Partition the audit_log table by month using PostgreSQL declarative partitioning. This is the single most important performance decision: queries filtered by date range only touch relevant partitions, and old partitions can be dropped or moved to cheaper storage independently.
-- Parent table (no data stored here directly).
-- Range-partitioned on created_at; in PostgreSQL the PRIMARY KEY of a
-- partitioned table must include the partition key, hence (id, created_at).
CREATE TABLE audit_log (
id BIGSERIAL,
tenant_id BIGINT NOT NULL,
actor_id BIGINT,
actor_type TEXT NOT NULL CHECK (actor_type IN ('user','service','admin')),
action TEXT NOT NULL, -- e.g. 'user.login', 'record.delete'
resource TEXT NOT NULL, -- e.g. 'invoice:1234'
ip_address INET,
user_agent TEXT,
metadata JSONB,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (id, created_at)
) PARTITION BY RANGE (created_at);
-- Monthly partitions (created by a maintenance job); date ranges are
-- half-open [from, to), so month boundaries do not overlap.
CREATE TABLE audit_log_2026_01 PARTITION OF audit_log
FOR VALUES FROM ('2026-01-01') TO ('2026-02-01');
CREATE TABLE audit_log_2026_02 PARTITION OF audit_log
FOR VALUES FROM ('2026-02-01') TO ('2026-03-01');
-- Indexes created on the parent are auto-inherited by each partition.
CREATE INDEX ON audit_log (tenant_id, created_at DESC); -- tenant-scoped export scans
CREATE INDEX ON audit_log (actor_id, created_at DESC); -- per-actor lookups
CREATE INDEX ON audit_log USING gin (metadata); -- ad-hoc JSONB queries; adds write cost per partition
-- Export job tracking: one row per requested export.
CREATE TABLE export_job (
id BIGSERIAL PRIMARY KEY,
export_id UUID NOT NULL UNIQUE DEFAULT gen_random_uuid(), -- public identifier handed to clients
tenant_id BIGINT NOT NULL,
requested_by BIGINT NOT NULL, -- downloads are restricted to this user (see get_download_url)
status TEXT NOT NULL DEFAULT 'pending'
CHECK (status IN ('pending','running','completed','failed','expired')),
filters JSONB NOT NULL, -- serialized filter params (ExportFilters as JSON)
format TEXT NOT NULL DEFAULT 'csv' CHECK (format IN ('csv','jsonl')),
row_count BIGINT NOT NULL DEFAULT 0,
file_size_bytes BIGINT NOT NULL DEFAULT 0, -- compressed bytes written to S3
s3_key TEXT, -- set once generation completes
download_expires_at TIMESTAMPTZ, -- end of the download window
error_message TEXT, -- populated when status='failed'
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
completed_at TIMESTAMPTZ
);
Export Job Creation
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional, List
import uuid
# Guardrails against runaway exports, enforced during file generation.
EXPORT_MAX_ROWS = 5_000_000
EXPORT_MAX_BYTES = 500 * 1024 * 1024  # 500 MB cap on compressed output
DOWNLOAD_URL_TTL = 3600  # presigned URL expires in 1 hour
@dataclass
class ExportFilters:
    """Filter parameters for an audit-log export request.

    Validated at construction so bad requests fail fast instead of
    failing later at INSERT time or during file generation.
    """
    tenant_id: int
    actor_id: Optional[int] = None
    actor_type: Optional[str] = None
    action: Optional[str] = None
    resource: Optional[str] = None
    date_from: Optional[datetime] = None
    date_to: Optional[datetime] = None
    format: str = 'csv'

    def __post_init__(self) -> None:
        # Mirror the export_job.format CHECK constraint.
        if self.format not in ('csv', 'jsonl'):
            raise ValueError(f"Unsupported export format: {self.format!r}")
        # An empty [date_from, date_to) window can only produce zero rows.
        if self.date_from and self.date_to and self.date_from >= self.date_to:
            raise ValueError("date_from must be earlier than date_to")
def create_export_job(db, requested_by: int, filters: ExportFilters) -> str:
    """Persist an export_job row, enqueue it for background generation,
    and return the export_id UUID (as a string) callers poll for status.

    If enqueueing fails after the row is committed, the job is marked
    'failed' rather than left stranded in 'pending' forever, and the
    enqueue error is re-raised.
    """
    import json
    from dataclasses import asdict

    export_id = str(uuid.uuid4())
    db.execute(
        """INSERT INTO export_job
        (export_id, tenant_id, requested_by, filters, format)
        VALUES (%s, %s, %s, %s, %s)""",
        (export_id, filters.tenant_id, requested_by,
         # default=str stringifies datetimes in date_from/date_to
         json.dumps(asdict(filters), default=str), filters.format)
    )
    db.commit()
    try:
        # Hand off to the background worker (Celery, SQS, etc.).
        enqueue_export(export_id)
    except Exception:
        # Don't leave the committed row stuck in 'pending' if the queue
        # is unreachable; surface the failure to the caller.
        db.execute(
            "UPDATE export_job SET status='failed', error_message=%s WHERE export_id=%s",
            ("failed to enqueue export job", export_id)
        )
        db.commit()
        raise
    return export_id
Background File Generation with Cursor Pagination
import boto3
import gzip
import csv
import json
import io
from typing import Iterator
# Single module-level S3 client reused by all export jobs.
S3 = boto3.client('s3')
EXPORT_BUCKET = 'my-audit-exports'
CURSOR_BATCH = 10_000  # rows fetched per keyset-pagination page
def generate_export_file(db, export_id: str) -> None:
    """Stream matching audit rows to S3 as a gzipped CSV/JSONL file.

    Marks the job 'running', pages through audit_log with an id cursor,
    writes compressed output via S3 multipart upload in ~10 MB parts,
    then marks the job 'completed' (or 'failed' on any error, aborting
    the multipart upload so S3 does not retain orphaned parts).

    Raises: re-raises any limit violation or AWS/database error after
    recording it on the job row.
    """
    from datetime import timedelta  # BUG FIX: timedelta was never imported

    job = db.fetchone("SELECT * FROM export_job WHERE export_id=%s", (export_id,))
    if not job:
        return
    db.execute("UPDATE export_job SET status='running' WHERE id=%s", (job['id'],))
    db.commit()
    filters = job['filters']  # already a dict via psycopg2 JSONB
    s3_key = f"exports/{job['tenant_id']}/{export_id}.{job['format']}.gz"
    row_count = 0
    total_bytes = 0
    upload_id = None  # set only after create_multipart_upload succeeds
    # BUG FIX: header and rows must use the same column list, otherwise the
    # CSV header (9 fixed fields) mismatches rows written with every
    # SELECT * column (which also includes user_agent).
    csv_fields = ['id', 'tenant_id', 'actor_id', 'actor_type', 'action',
                  'resource', 'ip_address', 'created_at', 'metadata']
    try:
        upload = S3.create_multipart_upload(
            Bucket=EXPORT_BUCKET, Key=s3_key, ContentEncoding='gzip'
        )
        upload_id = upload['UploadId']
        parts = []
        part_number = 1
        buf = io.BytesIO()
        gz_writer = gzip.GzipFile(fileobj=buf, mode='wb')
        jsonl_mode = job['format'] == 'jsonl'
        if not jsonl_mode:
            text_buf = io.StringIO()
            header_writer = csv.DictWriter(text_buf, fieldnames=csv_fields)
            header_writer.writeheader()
            gz_writer.write(text_buf.getvalue().encode())
        last_id = 0
        while True:
            rows = fetch_audit_rows(db, filters, last_id, CURSOR_BATCH)
            if not rows:
                break
            for row in rows:
                if row_count >= EXPORT_MAX_ROWS:
                    raise ValueError(f"Export exceeded {EXPORT_MAX_ROWS} row limit")
                if jsonl_mode:
                    # BUG FIX: the record separator was the literal
                    # character 'n' instead of a newline.
                    line = json.dumps(dict(row), default=str) + '\n'
                    gz_writer.write(line.encode())
                else:
                    text_buf = io.StringIO()
                    # extrasaction='ignore' drops columns (e.g. user_agent)
                    # that are not part of the exported field list.
                    cw = csv.DictWriter(text_buf, fieldnames=csv_fields,
                                        extrasaction='ignore')
                    cw.writerow(dict(row))
                    gz_writer.write(text_buf.getvalue().encode())
                row_count += 1
            last_id = rows[-1]['id']
            # Flush a multipart part at ~10 MB of compressed data; S3
            # requires every part except the last to be at least 5 MB.
            gz_writer.flush()
            chunk = buf.getvalue()
            if len(chunk) >= 10 * 1024 * 1024:
                resp = S3.upload_part(Bucket=EXPORT_BUCKET, Key=s3_key,
                    UploadId=upload_id, PartNumber=part_number, Body=chunk)
                parts.append({'PartNumber': part_number, 'ETag': resp['ETag']})
                part_number += 1
                total_bytes += len(chunk)
                buf.seek(0); buf.truncate(0)
                if total_bytes >= EXPORT_MAX_BYTES:
                    raise ValueError(f"Export exceeded {EXPORT_MAX_BYTES // 1024 // 1024} MB limit")
        # Close the gzip stream (writes the trailer), then upload the rest.
        gz_writer.close()
        final_chunk = buf.getvalue()
        if final_chunk:
            resp = S3.upload_part(Bucket=EXPORT_BUCKET, Key=s3_key,
                UploadId=upload_id, PartNumber=part_number, Body=final_chunk)
            parts.append({'PartNumber': part_number, 'ETag': resp['ETag']})
            total_bytes += len(final_chunk)
        S3.complete_multipart_upload(
            Bucket=EXPORT_BUCKET, Key=s3_key, UploadId=upload_id,
            MultipartUpload={'Parts': parts}
        )
        expires_at = datetime.now(timezone.utc) + timedelta(seconds=DOWNLOAD_URL_TTL)
        db.execute(
            """UPDATE export_job SET status='completed', s3_key=%s,
            row_count=%s, file_size_bytes=%s, download_expires_at=%s, completed_at=NOW()
            WHERE id=%s""",
            (s3_key, row_count, total_bytes, expires_at, job['id'])
        )
        db.commit()
    except Exception as exc:
        # BUG FIX: upload_id is undefined if create_multipart_upload itself
        # failed; only abort when an upload was actually started, and never
        # let an abort failure mask the original error.
        if upload_id is not None:
            try:
                S3.abort_multipart_upload(
                    Bucket=EXPORT_BUCKET, Key=s3_key, UploadId=upload_id)
            except Exception:
                pass  # best-effort cleanup
        db.execute(
            "UPDATE export_job SET status='failed', error_message=%s WHERE id=%s",
            (str(exc), job['id'])
        )
        db.commit()
        raise
def fetch_audit_rows(db, filters: dict, last_id: int, limit: int) -> list:
    """Fetch the next page of audit rows matching `filters`.

    Uses keyset pagination (id > last_id ORDER BY id) so each page is an
    index seek regardless of position in the table. Column names in the
    WHERE clause are fixed identifiers from a hard-coded list; all values
    are passed as bound parameters, so no SQL can be injected.
    """
    clauses = ["tenant_id = %(tenant_id)s", "id > %(last_id)s"]
    params = {'tenant_id': filters['tenant_id'], 'last_id': last_id, 'limit': limit}
    # BUG FIX: 'resource' is accepted in ExportFilters and serialized into
    # the job's filters, but was silently ignored here.
    for col in ('actor_id', 'actor_type', 'action', 'resource'):
        if filters.get(col):
            clauses.append(f"{col} = %({col})s")
            params[col] = filters[col]
    if filters.get('date_from'):
        clauses.append("created_at >= %(date_from)s")
        params['date_from'] = filters['date_from']
    if filters.get('date_to'):
        # Half-open upper bound matches the partition boundary convention.
        clauses.append("created_at < %(date_to)s")
        params['date_to'] = filters['date_to']
    where = ' AND '.join(clauses)
    return db.fetchall(
        f"SELECT * FROM audit_log WHERE {where} ORDER BY id LIMIT %(limit)s",
        params
    )
Presigned Download URL
def get_download_url(db, export_id: str, requesting_user: int) -> str:
    """Return a presigned S3 GET URL for a completed export.

    Only the user who requested the export may download it (enforced by
    the requested_by predicate). Raises ValueError when the job is
    missing, not completed, or its download window has lapsed.
    """
    job = db.fetchone(
        "SELECT * FROM export_job WHERE export_id=%s AND requested_by=%s",
        (export_id, requesting_user)
    )
    if not job or job['status'] != 'completed':
        raise ValueError("Export not ready")
    if job['download_expires_at'] < datetime.now(timezone.utc):
        raise ValueError("Download URL expired; request a new export")
    # BUG FIX: the filename was hard-coded to ".csv.gz" even for JSONL
    # exports; derive the extension from the job's format instead.
    filename = f"audit-export-{export_id}.{job['format']}.gz"
    return S3.generate_presigned_url(
        'get_object',
        Params={'Bucket': EXPORT_BUCKET, 'Key': job['s3_key'],
                'ResponseContentDisposition': f'attachment; filename="{filename}"'},
        ExpiresIn=DOWNLOAD_URL_TTL
    )
{
“@context”: “https://schema.org”,
“@type”: “FAQPage”,
“mainEntity”: [
{
“@type”: “Question”,
“name”: “Why use cursor-based pagination instead of OFFSET for audit log exports?”,
“acceptedAnswer”: {
“@type”: “Answer”,
“text”: “OFFSET N on a 500-million-row table requires the database to scan and discard N rows before returning results, making it progressively slower as the offset grows. Cursor-based pagination using WHERE id > last_seen_id ORDER BY id is O(log N) per page because it uses the primary key index directly regardless of position in the table.”
}
},
{
“@type”: “Question”,
“name”: “How do you prevent users from accidentally exporting hundreds of gigabytes?”,
“acceptedAnswer”: {
“@type”: “Answer”,
“text”: “Enforce two limits: a maximum row count (e.g., 5 million rows) checked during generation, and a maximum compressed file size (e.g., 500 MB) tracked as bytes are written to the multipart upload buffer. If either limit is hit, the multipart upload is aborted and the job is marked failed with a descriptive error message telling the user to narrow their date range.”
}
},
{
“@type”: “Question”,
“name”: “How should the audit log table be partitioned for export performance?”,
“acceptedAnswer”: {
“@type”: “Answer”,
“text”: “Partition by month using PostgreSQL declarative range partitioning on the created_at column. Export queries filtered by date range will only scan relevant monthly partitions instead of the full table. Old partitions can be archived to cheaper tablespaces or detached and dropped without affecting active data.”
}
},
{
“@type”: “Question”,
“name”: “How long should presigned S3 download URLs be valid?”,
“acceptedAnswer”: {
“@type”: “Answer”,
“text”: “One hour is a reasonable default for compliance exports. Store the expiry timestamp in export_job.download_expires_at. If a user requests the download URL after expiry, return a clear error. For very large exports that take more than an hour to generate, set the URL TTL relative to completion time rather than creation time.”
}
}
]
}
{
“@context”: “https://schema.org”,
“@type”: “FAQPage”,
“mainEntity”: [
{
“@type”: “Question”,
“name”: “How is cursor-based pagination used for large audit log exports?”,
“acceptedAnswer”: {
“@type”: “Answer”,
“text”: “The export job tracks the last processed id as its cursor; each batch fetches the next page with WHERE id > last_id ORDER BY id LIMIT batch_size, which is a primary-key index seek and stays fast at any depth in the table.”
}
},
{
“@type”: “Question”,
“name”: “How are export files written to S3?”,
“acceptedAnswer”: {
“@type”: “Answer”,
“text”: “A background worker streams rows from PostgreSQL in keyset-paginated batches and writes them incrementally to S3 using multipart upload, so the full file is never buffered in memory.”
}
},
{
“@type”: “Question”,
“name”: “How are secure download URLs generated?”,
“acceptedAnswer”: {
“@type”: “Answer”,
“text”: “Once the export job completes, the end of the download window is stored in export_job.download_expires_at; a presigned S3 GET URL with a one-hour TTL is then generated on demand for the user who requested the export.”
}
},
{
“@type”: “Question”,
“name”: “How are export size limits enforced?”,
“acceptedAnswer”: {
“@type”: “Answer”,
“text”: “Row-count and compressed-size limits are enforced while the file is generated; if either limit is exceeded, the multipart upload is aborted and the job is marked failed with an error asking the user to narrow the filter criteria.”
}
}
]
}
See also: Atlassian Interview Guide
See also: Stripe Interview Guide 2026: Process, Bug Bash Round, and Payment Systems
See also: Coinbase Interview Guide