Cost Allocation System Low-Level Design: Cloud Billing Ingestion, Team Attribution, and Anomaly Detection

A cost allocation system attributes infrastructure spend to the teams, products, or customers that generated it. Without it, a single AWS bill lands on a shared cost center and engineers have no incentive to optimize. With it, teams see their cloud spend in dashboards, anomalies surface within hours, and chargeback to customers enables accurate SaaS unit economics. This design covers the data pipeline from cloud billing APIs to per-team attribution, anomaly detection, and budget alerting.

Core Data Model

CREATE TABLE CloudCostLineItem (
    line_item_id       BIGSERIAL,
    cloud_provider     VARCHAR(20) NOT NULL,       -- aws, gcp, azure
    billing_date       DATE NOT NULL,
    service            VARCHAR(100) NOT NULL,      -- EC2, S3, BigQuery
    resource_id        VARCHAR(255),               -- i-0abc123, arn:aws:s3:::my-bucket
    tags               JSONB NOT NULL DEFAULT '{}',-- {"team":"payments","env":"prod","customer":"acme"}
    usage_amount       NUMERIC(18,6) NOT NULL,
    usage_unit         VARCHAR(50),                -- Hrs, GB-Mo, Requests
    unblended_cost     NUMERIC(12,6) NOT NULL,     -- USD
    imported_at        TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    -- A partitioned table's primary key must include the partition key.
    PRIMARY KEY (line_item_id, billing_date),
    -- Natural key so that re-running ingestion hits ON CONFLICT DO NOTHING.
    UNIQUE (cloud_provider, billing_date, service, resource_id)
) PARTITION BY RANGE (billing_date);

CREATE TABLE CostAllocation (
    allocation_id      BIGSERIAL,
    billing_date       DATE NOT NULL,
    team               VARCHAR(100) NOT NULL,
    product            VARCHAR(100),
    customer_id        BIGINT,
    service            VARCHAR(100) NOT NULL,
    allocated_cost     NUMERIC(12,6) NOT NULL,
    allocation_method  VARCHAR(30) NOT NULL,  -- tag, proportion, manual
    source_line_items  BIGINT[],              -- IDs of contributing CloudCostLineItem rows
    created_at         TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    PRIMARY KEY (allocation_id, billing_date)  -- must include the partition key
) PARTITION BY RANGE (billing_date);

CREATE TABLE CostBudget (
    budget_id          SERIAL PRIMARY KEY,
    team               VARCHAR(100) NOT NULL,
    period_type        VARCHAR(10) NOT NULL DEFAULT 'monthly',  -- monthly, quarterly
    period_start       DATE NOT NULL,
    budget_usd         NUMERIC(12,2) NOT NULL,
    alert_pct          SMALLINT NOT NULL DEFAULT 80,  -- alert when this % consumed
    alerted_at         TIMESTAMPTZ,
    UNIQUE (team, period_start)
);

CREATE TABLE CostAnomaly (
    anomaly_id         BIGSERIAL PRIMARY KEY,
    billing_date       DATE NOT NULL,
    team               VARCHAR(100),
    service            VARCHAR(100) NOT NULL,
    expected_cost      NUMERIC(12,6),
    actual_cost        NUMERIC(12,6),
    z_score            NUMERIC(6,2),
    acknowledged       BOOLEAN NOT NULL DEFAULT FALSE,
    created_at         TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Expression index on the team tag, which is what the allocation queries filter on.
CREATE INDEX ON CloudCostLineItem(billing_date, (tags->>'team'));
CREATE INDEX ON CostAllocation(billing_date, team);
CREATE INDEX ON CostAnomaly(billing_date, acknowledged);
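
Because CloudCostLineItem and CostAllocation are declared with PARTITION BY RANGE (billing_date), a child partition has to exist for each date range before rows can be inserted. The following is a minimal sketch of generating that DDL, assuming monthly partitions; the helper name `monthly_partition_ddl` and the child-table naming scheme are illustrative assumptions, not part of the design above.

```python
import datetime


def monthly_partition_ddl(table: str, month: datetime.date) -> str:
    """Build the CREATE TABLE ... PARTITION OF statement for the month containing `month`.

    The child-table name (<table>_yYYYYmMM) is an assumed convention.
    """
    start = month.replace(day=1)
    # First day of the following month; the range's upper bound is exclusive.
    if start.month == 12:
        end = datetime.date(start.year + 1, 1, 1)
    else:
        end = datetime.date(start.year, start.month + 1, 1)
    child = f"{table}_y{start.year}m{start.month:02d}"
    return (
        f"CREATE TABLE IF NOT EXISTS {child} PARTITION OF {table} "
        f"FOR VALUES FROM ('{start.isoformat()}') TO ('{end.isoformat()}');"
    )


print(monthly_partition_ddl("CloudCostLineItem", datetime.date(2024, 12, 15)))
```

A scheduled job could run this for the upcoming month on both partitioned tables so that nightly ingestion never targets a missing partition.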

Ingestion Pipeline

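import datetime
import json
from decimal import Decimal

import boto3

# `db` (a thin SQL helper exposing execute/executemany/fetchone/fetchall) and
# `slack_client` are assumed to be provided by the surrounding application.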

def ingest_aws_cost_explorer(billing_date: datetime.date):
    """
    Pull daily costs from AWS Cost Explorer. Run nightly after billing data is finalized (~24h lag).
    """
    ce = boto3.client('ce', region_name='us-east-1')
    date_str = billing_date.isoformat()

    paginator_token = None
    while True:
        kwargs = {
            'TimePeriod': {'Start': date_str, 'End': (billing_date + datetime.timedelta(1)).isoformat()},
            'Granularity': 'DAILY',
            'GroupBy': [
                {'Type': 'DIMENSION', 'Key': 'SERVICE'},
                {'Type': 'DIMENSION', 'Key': 'RESOURCE_ID'},
            ],
            'Metrics': ['UnblendedCost', 'UsageQuantity'],
            'Filter': {'Not': {'Dimensions': {'Key': 'RECORD_TYPE', 'Values': ['Credit', 'Refund']}}},
        }
        if paginator_token:
            kwargs['NextPageToken'] = paginator_token

        resp = ce.get_cost_and_usage_with_resources(**kwargs)

        rows = []
        for result_by_time in resp['ResultsByTime']:
            for group in result_by_time['Groups']:
                service    = group['Keys'][0]
                resource   = group['Keys'][1] if len(group['Keys']) > 1 else None
                cost       = Decimal(group['Metrics']['UnblendedCost']['Amount'])
                usage      = Decimal(group['Metrics']['UsageQuantity']['Amount'])
                usage_unit = group['Metrics']['UsageQuantity']['Unit']
                if cost == 0:
                    continue
                tags = _fetch_resource_tags(resource) if resource else {}
                # Pass Decimals through unchanged so the NUMERIC columns keep full precision.
                rows.append((
                    'aws', billing_date, service, resource,
                    json.dumps(tags), usage, usage_unit, cost
                ))

        if rows:
            db.executemany("""
                INSERT INTO CloudCostLineItem
                    (cloud_provider, billing_date, service, resource_id, tags,
                     usage_amount, usage_unit, unblended_cost)
                VALUES (%s,%s,%s,%s,%s,%s,%s,%s)
                ON CONFLICT DO NOTHING
            """, rows)

        paginator_token = resp.get('NextPageToken')
        if not paginator_token:
            break

def _fetch_resource_tags(resource_id: str) -> dict:
    """
    Fetch resource tags from AWS Resource Groups Tagging API.
    Tags like {"team":"payments","env":"prod"} drive allocation.
    Cache results in Redis (1-hour TTL) — tag calls are rate-limited.
    """
    import redis
    r = redis.Redis(decode_responses=True)
    cache_key = f"tags:{resource_id}"
    cached = r.get(cache_key)
    if cached:
        return json.loads(cached)
    tagging = boto3.client('resourcegroupstaggingapi', region_name='us-east-1')
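    try:
        # get_resources expects full ARNs; a bare ID such as i-0abc123 will not
        # resolve, so convert IDs to ARNs upstream where possible.
        resp = tagging.get_resources(ResourceARNList=[resource_id])
        mappings = resp['ResourceTagMappingList']
        tags = {t['Key']: t['Value'] for t in mappings[0].get('Tags', [])} if mappings else {}
    except Exception:
        tags = {}  # treat lookup failures as untagged rather than failing ingestion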
    r.setex(cache_key, 3600, json.dumps(tags))
    return tags
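
The ingestion job should only be pointed at a billing date whose costs are already final. As a sketch of how a nightly scheduler might pick that date, assuming timestamps are in UTC and the roughly 24-hour lag mentioned in the docstring above (the helper name `latest_finalized_billing_date` is hypothetical):

```python
import datetime


def latest_finalized_billing_date(
    now_utc: datetime.datetime,
    lag: datetime.timedelta = datetime.timedelta(hours=24),
) -> datetime.date:
    """Most recent billing date whose costs are assumed final at `now_utc`.

    Billing day T ends at 00:00 on T+1 and is treated as final `lag` later.
    """
    return (now_utc - lag).date() - datetime.timedelta(days=1)


# A job that runs shortly after midnight on 2024-03-03 ingests 2024-03-01.
run_time = datetime.datetime(2024, 3, 3, 0, 30)
print(latest_finalized_billing_date(run_time))
```

The result can be passed directly to `ingest_aws_cost_explorer`; keeping the lag as a parameter makes it easy to be more conservative if late adjustments turn out to be common.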

Allocation Engine

def allocate_costs(billing_date: datetime.date):
    """
    Three-pass allocation:
    1. Tagged resources → direct allocation to team in tag
    2. Shared resources (no team tag) → proportion by tagged usage
    3. Remaining unallocated → split equally across all teams
       (needed when there is no tagged spend to proportion by; this pass is not shown here)
    """
    # Pass 1: direct tag-based allocation
    tagged_rows = db.fetchall("""
        SELECT line_item_id, service, tags->>'team' AS team,
               tags->>'product' AS product, unblended_cost
        FROM CloudCostLineItem
        WHERE billing_date=%s AND tags->>'team' IS NOT NULL
    """, (billing_date,))

    for row in tagged_rows:
        db.execute("""
            INSERT INTO CostAllocation
                (billing_date, team, product, service, allocated_cost, allocation_method, source_line_items)
            VALUES (%s, %s, %s, %s, %s, 'tag', %s)
        """, (billing_date, row['team'], row['product'], row['service'],
              row['unblended_cost'], [row['line_item_id']]))

    # Pass 2: untagged shared costs → split by each team's proportion of tagged spend
    total_tagged = db.fetchone(
        "SELECT COALESCE(SUM(allocated_cost),0) AS total FROM CostAllocation WHERE billing_date=%s",
        (billing_date,)
    )['total']

    untagged_rows = db.fetchall("""
        SELECT line_item_id, service, unblended_cost
        FROM CloudCostLineItem
        WHERE billing_date=%s AND tags->>'team' IS NULL
    """, (billing_date,))

    if untagged_rows and total_tagged > 0:
        team_shares = db.fetchall("""
            SELECT team, SUM(allocated_cost)/(%s) AS share
            FROM CostAllocation WHERE billing_date=%s GROUP BY team
        """, (total_tagged, billing_date))

        for row in untagged_rows:
            for ts in team_shares:
                db.execute("""
                    INSERT INTO CostAllocation
                        (billing_date, team, service, allocated_cost, allocation_method, source_line_items)
                    VALUES (%s, %s, %s, %s, 'proportion', %s)
                """, (billing_date, ts['team'], row['service'],
                      row['unblended_cost'] * ts['share'],  # both NUMERIC/Decimal; avoids float rounding
                      [row['line_item_id']]))
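
The proportional split in pass 2, and the equal-split fallback that the docstring lists as pass 3, can be illustrated without a database. This is a sketch under assumptions: `split_shared_cost` is a hypothetical helper, shares are rounded to six decimal places to match NUMERIC(12,6), and the last team absorbs the rounding remainder so the shares always sum to the original cost.

```python
from decimal import Decimal, ROUND_HALF_UP
from typing import Dict

QUANTUM = Decimal("0.000001")  # six decimal places, matching NUMERIC(12,6)


def split_shared_cost(cost: Decimal, tagged_by_team: Dict[str, Decimal]) -> Dict[str, Decimal]:
    """Split one untagged line item across teams.

    Proportional to each team's tagged spend; falls back to an equal
    split when no team has any tagged spend.
    """
    teams = sorted(tagged_by_team)
    if not teams:
        return {}
    total = sum(tagged_by_team.values(), Decimal(0))
    shares: Dict[str, Decimal] = {}
    allocated = Decimal(0)
    for team in teams[:-1]:
        if total > 0:
            weight = tagged_by_team[team] / total
        else:
            weight = Decimal(1) / len(teams)
        shares[team] = (cost * weight).quantize(QUANTUM, rounding=ROUND_HALF_UP)
        allocated += shares[team]
    # The last team takes whatever is left, so the shares reconcile exactly.
    shares[teams[-1]] = cost - allocated
    return shares


print(split_shared_cost(Decimal("100"), {"payments": Decimal("300"), "search": Decimal("100")}))
```

Reconciling to the exact line-item cost matters for chargeback: the sum of all CostAllocation rows for a day should equal the sum of CloudCostLineItem rows for that day.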

Anomaly Detection

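def detect_anomalies(billing_date: datetime.date, lookback_days: int = 28, z_threshold: float = 3.0):
    """
    For each (team, service) pair, compute z-score vs. trailing 28-day window.
    |z| >= z_threshold (default 3.0) → anomaly. Fires an alert to Slack.
    """
    import numpy as np

    window_start = billing_date - datetime.timedelta(lookback_days)
    combos = db.fetchall("""
        SELECT DISTINCT team, service FROM CostAllocation
        WHERE billing_date >= %s AND billing_date <= %s
    """, (window_start, billing_date))

    for combo in combos:
        team, service = combo['team'], combo['service']

        today_cost = db.fetchone("""
            SELECT COALESCE(SUM(allocated_cost),0) AS total FROM CostAllocation
            WHERE team=%s AND service=%s AND billing_date=%s
        """, (team, service, billing_date))['total']

        # Daily totals for the trailing window, excluding billing_date itself.
        history = db.fetchall("""
            SELECT billing_date, SUM(allocated_cost) AS daily_cost
            FROM CostAllocation
            WHERE team=%s AND service=%s
              AND billing_date >= %s AND billing_date < %s
            GROUP BY billing_date ORDER BY billing_date
        """, (team, service, window_start, billing_date))

        if len(history) < 7:
            continue  # not enough data

        costs = [float(r['daily_cost']) for r in history]
        mean, std = float(np.mean(costs)), float(np.std(costs))
        if std < 0.01:
            continue  # flat history; z-score is undefined or unstable

        z = (float(today_cost) - mean) / std
        if abs(z) >= z_threshold:
            db.execute("""
                INSERT INTO CostAnomaly
                    (billing_date, team, service, expected_cost, actual_cost, z_score)
                VALUES (%s,%s,%s,%s,%s,%s)
            """, (billing_date, team, service, round(mean, 4), today_cost, round(z, 2)))
            _send_anomaly_alert(team, service, mean, float(today_cost), z)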

def _send_anomaly_alert(team, service, expected, actual, z_score):
    direction = "spike" if z_score > 0 else "drop"
    # Guard against a zero baseline to avoid ZeroDivisionError.
    pct_change = abs(actual - expected) / expected * 100 if expected else 0.0
    message = (
        f":warning: Cost anomaly detected: {team} / {service}\n"
        f"Expected: ${expected:.2f} | Actual: ${actual:.2f} | {pct_change:.0f}% {direction} (z={z_score:.1f})"
    )
    slack_client.post_message(channel=f"#cost-alerts-{team}", text=message)
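
The statistical core of the detector can be separated from the database and Slack calls, which makes it easy to unit-test. A minimal sketch, assuming the same 7-day minimum history and the population standard deviation (the default behaviour of `np.std`); the function name `cost_z_score` is illustrative:

```python
import statistics
from typing import List, Optional


def cost_z_score(history: List[float], today: float, min_days: int = 7) -> Optional[float]:
    """Z-score of today's cost against the trailing daily costs.

    Returns None when there is too little history or the history is flat.
    """
    if len(history) < min_days:
        return None
    mean = statistics.fmean(history)
    std = statistics.pstdev(history)  # population std, like np.std's default
    if std == 0:
        return None
    return (today - mean) / std


# Eight days alternating between $90 and $110: mean 100, std 10.
history = [90.0, 110.0] * 4
print(cost_z_score(history, 135.0))  # 3.5, at or above a threshold of 3.0
print(cost_z_score(history, 120.0))  # 2.0, within normal variation
```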

Budget Alert Check

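def check_budget_alerts():
    """Run daily after allocation. Alert when a team's month-to-date spend crosses alert_pct."""
    import calendar
    today = datetime.date.today()
    month_start = today.replace(day=1)

    budgets = db.fetchall("""
        SELECT b.*, COALESCE(SUM(a.allocated_cost),0) AS mtd_spend
        FROM CostBudget b
        LEFT JOIN CostAllocation a
            ON a.team=b.team AND a.billing_date >= b.period_start AND a.billing_date <= %s
        WHERE b.period_start = %s AND b.alerted_at IS NULL
        GROUP BY b.budget_id
    """, (today, month_start))

    # Days remaining in the month, counting today. monthrange also handles December,
    # where replace(month=today.month % 12 + 1) would stay in the same year.
    days_left = calendar.monthrange(today.year, today.month)[1] - today.day + 1

    for budget in budgets:
        if not budget['budget_usd']:
            continue  # a zero budget has no meaningful percentage
        pct_used = float(budget['mtd_spend']) / float(budget['budget_usd']) * 100
        if pct_used >= budget['alert_pct']:
            slack_client.post_message(
                channel=f"#cost-alerts-{budget['team']}",
                text=(
                    f":money_with_wings: Budget alert: {budget['team']}\n"
                    f"${budget['mtd_spend']:.0f} of ${budget['budget_usd']:.0f} "
                    f"({pct_used:.0f}%) used with {days_left} days left"
                )
            )
            db.execute(
                "UPDATE CostBudget SET alerted_at=NOW() WHERE budget_id=%s",
                (budget['budget_id'],)
            )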
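
The CostBudget table allows both monthly and quarterly periods, while the check above only reasons about the current month. One way to generalise the "days left" calculation is sketched below; it assumes `period_start` is always the first day of a month, and the helper names `period_end` and `days_left_in_period` are hypothetical.

```python
import datetime

PERIOD_MONTHS = {"monthly": 1, "quarterly": 3}


def period_end(period_start: datetime.date, period_type: str = "monthly") -> datetime.date:
    """Last day of the budget period that begins at `period_start`."""
    months = PERIOD_MONTHS[period_type]
    # Count months from year 0 so that year rollover is plain integer arithmetic.
    idx = period_start.year * 12 + (period_start.month - 1) + months
    first_of_next = datetime.date(idx // 12, idx % 12 + 1, 1)
    return first_of_next - datetime.timedelta(days=1)


def days_left_in_period(today: datetime.date, period_start: datetime.date,
                        period_type: str = "monthly") -> int:
    """Days remaining in the period, counting today."""
    return (period_end(period_start, period_type) - today).days + 1


print(period_end(datetime.date(2024, 12, 1)))               # 2024-12-31
print(period_end(datetime.date(2024, 10, 1), "quarterly"))  # 2024-12-31
```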

Key Design Decisions

  • Tag-first allocation: teams that tag their resources properly get exact attribution. The proportion pass for untagged resources is deliberately visible — engineers can see “15% of your allocation comes from untagged shared resources” and are incentivized to add tags. Report untagged fraction in dashboards to drive tagging compliance.
  • Partition by billing_date: cost data grows ~30 rows per resource per day. Monthly partitions let old data be archived to cold storage (S3 via partition DETACH) without touching the live table. Anomaly detection uses the last 28 days — always within the current + prior partition.
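  • Z-score over fixed thresholds: a fixed dollar threshold (for example, alert on any $50/day change) does not scale across services. For a service that normally costs $5,000/day with a standard deviation near $1,700, a jump to $7,500 is a 50% increase but only z=1.5, which is within normal variation. A threshold loose enough for that service would never catch a $0 → $50 jump in a small service, and a threshold tight enough for the small service would fire constantly on the large one. Z-score normalizes by each (team, service) pair's historical variance. Require at least 7 days of history to avoid false positives for new services.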
  • 24-hour billing lag: AWS Cost Explorer finalizes daily costs ~24 hours after the billing day ends. Ingest billing_date T at the end of day T+1. Never ingest the current day’s partial costs — they skew anomaly baselines.
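
The untagged fraction mentioned under tag-first allocation can be derived directly from CostAllocation rows. A minimal sketch, assuming rows arrive as (team, allocation_method, allocated_cost) tuples and that only the 'proportion' method counts as untagged shared cost; `untagged_fraction` is an illustrative name:

```python
from collections import defaultdict
from decimal import Decimal
from typing import Dict, Iterable, Tuple


def untagged_fraction(allocations: Iterable[Tuple[str, str, Decimal]]) -> Dict[str, Decimal]:
    """Per-team share of allocated cost that came from untagged shared resources."""
    total: Dict[str, Decimal] = defaultdict(Decimal)
    shared: Dict[str, Decimal] = defaultdict(Decimal)
    for team, method, cost in allocations:
        total[team] += cost
        if method == "proportion":
            shared[team] += cost
    return {
        team: (shared[team] / total[team] if total[team] else Decimal(0))
        for team in total
    }


rows = [
    ("payments", "tag", Decimal("85")),
    ("payments", "proportion", Decimal("15")),
    ("search", "tag", Decimal("40")),
]
print(untagged_fraction(rows))  # payments: 0.15, search: 0
```

Surfacing this number next to each team's total makes the cost of missing tags concrete, which is the incentive the tag-first design relies on.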
