Audit Log Export System Low-Level Design: Filtered Queries, Async Generation, and Secure Download

Audit Log Export System Low-Level Design

Compliance teams and security analysts regularly need bulk exports of audit log data for external SIEM ingestion, legal discovery, or incident investigation. The challenge is that audit_log tables grow to hundreds of millions of rows, making synchronous query-and-download approaches impractical. A well-designed audit export system runs queries in the background, streams output to S3, provides presigned download URLs with configurable TTLs, and enforces size limits to prevent accidental export of 50 GB of data.

Audit Log Table Design (Partitioned)

Partition the audit_log table by month using PostgreSQL declarative partitioning. This is the single most important performance decision: queries filtered by date range only touch relevant partitions, and old partitions can be dropped or moved to cheaper storage independently.

-- Parent table (no data stored here directly).
-- NOTE: the primary key must include the partition key (created_at) because
-- PostgreSQL requires unique constraints on partitioned tables to cover
-- every partition column.
CREATE TABLE audit_log (
    id          BIGSERIAL,
    tenant_id   BIGINT      NOT NULL,
    actor_id    BIGINT,                          -- NULL when no actor recorded
    actor_type  TEXT        NOT NULL CHECK (actor_type IN ('user','service','admin')),
    action      TEXT        NOT NULL,          -- e.g. 'user.login', 'record.delete'
    resource    TEXT        NOT NULL,          -- e.g. 'invoice:1234'
    ip_address  INET,
    user_agent  TEXT,
    metadata    JSONB,                           -- free-form event context
    created_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    PRIMARY KEY (id, created_at)
) PARTITION BY RANGE (created_at);

-- Monthly partitions (created by a maintenance job).
-- Ranges are half-open: FROM is inclusive, TO is exclusive.
CREATE TABLE audit_log_2026_01 PARTITION OF audit_log
    FOR VALUES FROM ('2026-01-01') TO ('2026-02-01');

CREATE TABLE audit_log_2026_02 PARTITION OF audit_log
    FOR VALUES FROM ('2026-02-01') TO ('2026-03-01');

-- Indexes created on the parent are automatically created on every
-- existing and future partition (PostgreSQL 11+ partitioned indexes).
CREATE INDEX ON audit_log (tenant_id, created_at DESC);
CREATE INDEX ON audit_log (actor_id, created_at DESC);
CREATE INDEX ON audit_log USING gin (metadata);

-- Export job tracking
-- Export job tracking: one row per requested export, driven through
-- pending -> running -> completed|failed (expired set by a cleanup job).
CREATE TABLE export_job (
    id              BIGSERIAL PRIMARY KEY,
    export_id       UUID        NOT NULL UNIQUE DEFAULT gen_random_uuid(),  -- public handle given to clients
    tenant_id       BIGINT      NOT NULL,
    requested_by    BIGINT      NOT NULL,       -- user who may download the result
    status          TEXT        NOT NULL DEFAULT 'pending'
                                CHECK (status IN ('pending','running','completed','failed','expired')),
    filters         JSONB       NOT NULL,       -- serialized filter params
    format          TEXT        NOT NULL DEFAULT 'csv' CHECK (format IN ('csv','jsonl')),
    row_count       BIGINT      NOT NULL DEFAULT 0,   -- rows written (set on completion)
    file_size_bytes BIGINT      NOT NULL DEFAULT 0,   -- compressed size uploaded to S3
    s3_key          TEXT,                       -- set once the upload completes
    download_expires_at TIMESTAMPTZ,            -- end of the download window
    error_message   TEXT,                       -- populated when status='failed'
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    completed_at    TIMESTAMPTZ
);

Export Job Creation

from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional, List
import uuid

EXPORT_MAX_ROWS     = 5_000_000           # hard cap on rows per export job
EXPORT_MAX_BYTES    = 500 * 1024 * 1024   # 500 MB compressed output cap
DOWNLOAD_URL_TTL    = 3600                # presigned URL expires in 1 hour

@dataclass
class ExportFilters:
    """User-supplied filter criteria for one audit-log export request.

    Serialized (via ``__dict__``) into ``export_job.filters`` JSONB; all
    optional fields default to None, meaning "no filter on this column".
    """
    tenant_id:    int                       # required: exports are tenant-scoped
    actor_id:     Optional[int]    = None
    actor_type:   Optional[str]    = None   # one of 'user', 'service', 'admin'
    action:       Optional[str]    = None   # e.g. 'user.login'
    resource:     Optional[str]    = None   # e.g. 'invoice:1234'
    date_from:    Optional[datetime] = None  # inclusive lower bound on created_at
    date_to:      Optional[datetime] = None  # exclusive upper bound on created_at
    format:       str              = 'csv'  # 'csv' or 'jsonl'

def create_export_job(db, requested_by: int, filters: ExportFilters) -> str:
    """Persist a new export_job row and hand it to the background queue.

    Args:
        db:           DB handle exposing ``execute``/``commit``.
        requested_by: id of the user requesting the export.
        filters:      ExportFilters describing what to export.

    Returns:
        The job's export_id (UUID string) so callers can poll its status.
    """
    import json

    new_export_id = str(uuid.uuid4())
    serialized_filters = json.dumps(filters.__dict__, default=str)
    insert_params = (new_export_id, filters.tenant_id, requested_by,
                     serialized_filters, filters.format)
    db.execute(
        """INSERT INTO export_job
           (export_id, tenant_id, requested_by, filters, format)
           VALUES (%s, %s, %s, %s, %s)""",
        insert_params,
    )
    db.commit()
    # Hand off to the background worker (Celery, SQS, etc.)
    enqueue_export(new_export_id)
    return new_export_id

Background File Generation with Cursor Pagination

import boto3
import gzip
import csv
import json
import io
from typing import Iterator

S3 = boto3.client('s3')            # module-level client, reused across jobs
EXPORT_BUCKET = 'my-audit-exports' # destination bucket for generated exports
CURSOR_BATCH  = 10_000             # rows fetched per keyset-pagination page

def generate_export_file(db, export_id: str) -> None:
    """Worker entry point: stream an export job's audit rows to S3.

    Pages through audit_log with keyset pagination, writes gzip-compressed
    CSV or JSONL incrementally via S3 multipart upload, and enforces both
    EXPORT_MAX_ROWS and EXPORT_MAX_BYTES. On any failure the multipart
    upload is aborted (if it was started) and the job is marked 'failed'
    with the error message; the exception is then re-raised for the worker.
    """
    job = db.fetchone("SELECT * FROM export_job WHERE export_id=%s", (export_id,))
    if not job:
        return

    db.execute("UPDATE export_job SET status='running' WHERE id=%s", (job['id'],))
    db.commit()

    filters = job['filters']  # already a dict via psycopg2 JSONB
    s3_key = f"exports/{job['tenant_id']}/{export_id}.{job['format']}.gz"

    row_count   = 0
    total_bytes = 0
    upload_id   = None  # stays None if create_multipart_upload itself fails

    try:
        upload = S3.create_multipart_upload(
            Bucket=EXPORT_BUCKET, Key=s3_key, ContentEncoding='gzip'
        )
        upload_id = upload['UploadId']
        parts = []
        part_number = 1
        buf = io.BytesIO()
        gz_writer = gzip.GzipFile(fileobj=buf, mode='wb')
        jsonl_mode = job['format'] == 'jsonl'

        # Single authoritative column list: header AND data rows use the same
        # fieldnames, and extrasaction='ignore' drops any SELECT * columns
        # (e.g. user_agent) not in the list. Previously rows were written
        # with row.keys(), which did not match the header.
        csv_fields = ['id','tenant_id','actor_id','actor_type','action',
                      'resource','ip_address','created_at','metadata']
        if not jsonl_mode:
            header_buf = io.StringIO()
            csv.DictWriter(header_buf, fieldnames=csv_fields).writeheader()
            gz_writer.write(header_buf.getvalue().encode())

        last_id = 0
        while True:
            rows = fetch_audit_rows(db, filters, last_id, CURSOR_BATCH)
            if not rows:
                break

            for row in rows:
                if row_count >= EXPORT_MAX_ROWS:
                    raise ValueError(f"Export exceeded {EXPORT_MAX_ROWS} row limit")

                if jsonl_mode:
                    # BUG FIX: was + 'n' (literal letter n), producing one
                    # run-on line instead of newline-delimited JSON.
                    line = json.dumps(dict(row), default=str) + '\n'
                    gz_writer.write(line.encode())
                else:
                    text_buf = io.StringIO()
                    cw = csv.DictWriter(text_buf, fieldnames=csv_fields,
                                        extrasaction='ignore')
                    cw.writerow(dict(row))
                    gz_writer.write(text_buf.getvalue().encode())

                row_count += 1

            last_id = rows[-1]['id']  # keyset cursor for the next page

            # Flush compressed bytes and ship a part at ~10 MB; S3 requires
            # every part except the last to be at least 5 MB.
            gz_writer.flush()
            chunk = buf.getvalue()
            if len(chunk) >= 10 * 1024 * 1024:
                resp = S3.upload_part(Bucket=EXPORT_BUCKET, Key=s3_key,
                                      UploadId=upload_id, PartNumber=part_number, Body=chunk)
                parts.append({'PartNumber': part_number, 'ETag': resp['ETag']})
                part_number += 1
                total_bytes += len(chunk)
                buf.seek(0); buf.truncate(0)

                if total_bytes >= EXPORT_MAX_BYTES:
                    raise ValueError(f"Export exceeded {EXPORT_MAX_BYTES // 1024 // 1024} MB limit")

        # Close the gzip stream (writes the trailer) and upload whatever
        # remains in the buffer as the final part.
        gz_writer.close()
        final_chunk = buf.getvalue()
        if final_chunk:
            resp = S3.upload_part(Bucket=EXPORT_BUCKET, Key=s3_key,
                                  UploadId=upload_id, PartNumber=part_number, Body=final_chunk)
            parts.append({'PartNumber': part_number, 'ETag': resp['ETag']})
            total_bytes += len(final_chunk)

        S3.complete_multipart_upload(
            Bucket=EXPORT_BUCKET, Key=s3_key, UploadId=upload_id,
            MultipartUpload={'Parts': parts}
        )

        # Download window starts at completion time (timedelta comes from
        # the module's datetime imports).
        expires_at = datetime.now(timezone.utc) + timedelta(seconds=DOWNLOAD_URL_TTL)
        db.execute(
            """UPDATE export_job SET status='completed', s3_key=%s,
               row_count=%s, file_size_bytes=%s, download_expires_at=%s, completed_at=NOW()
               WHERE id=%s""",
            (s3_key, row_count, total_bytes, expires_at, job['id'])
        )
        db.commit()

    except Exception as exc:
        # Only abort when the multipart upload was actually created; calling
        # abort with upload_id=None would raise and mask the real error.
        if upload_id is not None:
            S3.abort_multipart_upload(Bucket=EXPORT_BUCKET, Key=s3_key, UploadId=upload_id)
        db.execute(
            "UPDATE export_job SET status='failed', error_message=%s WHERE id=%s",
            (str(exc), job['id'])
        )
        db.commit()
        raise

def fetch_audit_rows(db, filters: dict, last_id: int, limit: int) -> list:
    """Fetch the next keyset-paginated page of audit rows matching *filters*.

    The cursor is ``id > last_id ORDER BY id`` so each page is an index range
    scan regardless of how deep into the table the export has progressed.

    Args:
        db:      DB handle exposing ``fetchall(sql, params)``.
        filters: dict of serialized ExportFilters values.
        last_id: id of the last row from the previous page (0 for the first).
        limit:   page size.

    Returns:
        Up to *limit* rows ordered by id; an empty list ends pagination.
    """
    clauses = ["tenant_id = %(tenant_id)s", "id > %(last_id)s"]
    params  = {'tenant_id': filters['tenant_id'], 'last_id': last_id, 'limit': limit}

    # Equality filters. Compare against None (not truthiness) so legitimate
    # falsy values such as actor_id=0 are not silently dropped. 'resource'
    # is included here — it is collected by ExportFilters but was previously
    # never applied to the query.
    for col in ('actor_id', 'actor_type', 'action', 'resource'):
        if filters.get(col) is not None:
            clauses.append(f"{col} = %({col})s")
            params[col] = filters[col]

    # Half-open date range [date_from, date_to) lets the planner prune
    # monthly partitions on created_at.
    if filters.get('date_from') is not None:
        clauses.append("created_at >= %(date_from)s")
        params['date_from'] = filters['date_from']
    if filters.get('date_to') is not None:
        clauses.append("created_at < %(date_to)s")
        params['date_to'] = filters['date_to']

    where = ' AND '.join(clauses)
    # Column names above are hard-coded, never user input, so the f-string
    # WHERE assembly is not an injection vector; all values bind via %(...)s.
    return db.fetchall(
        f"SELECT * FROM audit_log WHERE {where} ORDER BY id LIMIT %(limit)s",
        params
    )

Presigned Download URL

def get_download_url(db, export_id: str, requesting_user: int) -> str:
    """Return a presigned S3 GET URL for a completed export.

    Only the user who requested the export may download it (enforced by the
    requested_by predicate in the lookup).

    Raises:
        ValueError: if the job is missing, not yet completed, or its
            download window has lapsed.
    """
    job = db.fetchone(
        "SELECT * FROM export_job WHERE export_id=%s AND requested_by=%s",
        (export_id, requesting_user)
    )
    if not job or job['status'] != 'completed':
        raise ValueError("Export not ready")
    now = datetime.now(timezone.utc)
    if job['download_expires_at'] < now:
        raise ValueError("Download URL expired; request a new export")

    # Never sign past the recorded expiry: clamp the URL TTL to whatever
    # remains of the download window (floor of 1 second for validity).
    remaining = int((job['download_expires_at'] - now).total_seconds())
    ttl = max(1, min(DOWNLOAD_URL_TTL, remaining))

    # Use the job's actual format in the suggested filename; the previous
    # version hard-coded .csv.gz even for JSONL exports.
    filename = f"audit-export-{export_id}.{job['format']}.gz"
    return S3.generate_presigned_url(
        'get_object',
        Params={'Bucket': EXPORT_BUCKET, 'Key': job['s3_key'],
                'ResponseContentDisposition': f'attachment; filename="{filename}"'},
        ExpiresIn=ttl
    )

{
"@context": "https://schema.org",
"@type": "FAQPage",
"mainEntity": [
{
"@type": "Question",
"name": "Why use cursor-based pagination instead of OFFSET for audit log exports?",
"acceptedAnswer": {
"@type": "Answer",
"text": "OFFSET N on a 500-million-row table requires the database to scan and discard N rows before returning results, making it progressively slower as the offset grows. Cursor-based pagination using WHERE id > last_seen_id ORDER BY id is O(log N) per page because it uses the primary key index directly regardless of position in the table."
}
},
{
"@type": "Question",
"name": "How do you prevent users from accidentally exporting hundreds of gigabytes?",
"acceptedAnswer": {
"@type": "Answer",
"text": "Enforce two limits: a maximum row count (e.g., 5 million rows) checked during generation, and a maximum compressed file size (e.g., 500 MB) tracked as bytes are written to the multipart upload buffer. If either limit is hit, the multipart upload is aborted and the job is marked failed with a descriptive error message telling the user to narrow their date range."
}
},
{
"@type": "Question",
"name": "How should the audit log table be partitioned for export performance?",
"acceptedAnswer": {
"@type": "Answer",
"text": "Partition by month using PostgreSQL declarative range partitioning on the created_at column. Export queries filtered by date range will only scan relevant monthly partitions instead of the full table. Old partitions can be archived to cheaper tablespaces or detached and dropped without affecting active data."
}
},
{
"@type": "Question",
"name": "How long should presigned S3 download URLs be valid?",
"acceptedAnswer": {
"@type": "Answer",
"text": "One hour is a reasonable default for compliance exports. Store the expiry timestamp in export_job.download_expires_at. If a user requests the download URL after expiry, return a clear error. For very large exports that take more than an hour to generate, set the URL TTL relative to completion time rather than creation time."
}
}
]
}

{
"@context": "https://schema.org",
"@type": "FAQPage",
"mainEntity": [
{
"@type": "Question",
"name": "How is cursor-based pagination used for large audit log exports?",
"acceptedAnswer": {
"@type": "Answer",
"text": "The export worker tracks the id of the last processed row as its cursor; each batch fetches the next page with WHERE id > last_id ORDER BY id LIMIT N, so every page is an efficient index range scan regardless of how deep into the table the export is."
}
},
{
"@type": "Question",
"name": "How are export files written to S3?",
"acceptedAnswer": {
"@type": "Answer",
"text": "A background worker fetches rows from PostgreSQL in fixed-size batches, gzip-compresses them, and writes the output incrementally to S3 using multipart upload, so the full file is never buffered in memory or on local disk."
}
},
{
"@type": "Question",
"name": "How are secure download URLs generated?",
"acceptedAnswer": {
"@type": "Answer",
"text": "The job row stores the S3 key and a download_expires_at timestamp set at completion. When the requesting user asks for the file, a presigned S3 GET URL with a one-hour TTL is generated on demand, and requests after the stored expiry are rejected with a clear error."
}
},
{
"@type": "Question",
"name": "How are export size limits enforced?",
"acceptedAnswer": {
"@type": "Answer",
"text": "Limits are enforced during generation: the worker checks a maximum row count as rows are written and tracks compressed bytes as multipart parts are uploaded. If either limit is exceeded, the multipart upload is aborted and the job is marked failed with a message asking the user to narrow the filter criteria."
}
}
]
}

See also: Atlassian Interview Guide

See also: Stripe Interview Guide 2026: Process, Bug Bash Round, and Payment Systems

See also: Coinbase Interview Guide

Scroll to Top