Cost Allocation System Low-Level Design: Cloud Billing Ingestion, Team Attribution, and Anomaly Detection

A cost allocation system attributes infrastructure spend to the teams, products, or customers that generated it. Without it, a single AWS bill lands on a shared cost center and engineers have no incentive to optimize. With it, teams see their cloud spend in dashboards, anomalies surface within hours, and chargeback to customers enables accurate SaaS unit economics. This design covers the data pipeline from cloud billing APIs to per-team attribution, anomaly detection, and budget alerting.

Core Data Model

CREATE TABLE CloudCostLineItem (
    line_item_id       BIGSERIAL,
    cloud_provider     VARCHAR(20) NOT NULL,       -- aws, gcp, azure
    billing_date       DATE NOT NULL,
    service            VARCHAR(100) NOT NULL,      -- EC2, S3, BigQuery
    resource_id        VARCHAR(255),               -- i-0abc123, arn:aws:s3:::my-bucket
    tags               JSONB NOT NULL DEFAULT '{}',-- {"team":"payments","env":"prod","customer":"acme"}
    usage_amount       NUMERIC(18,6) NOT NULL,
    usage_unit         VARCHAR(50),                -- Hrs, GB-Mo, Requests
    unblended_cost     NUMERIC(12,6) NOT NULL,     -- USD
    imported_at        TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    PRIMARY KEY (line_item_id, billing_date)       -- PK on a partitioned table must include the partition key
) PARTITION BY RANGE (billing_date);

CREATE TABLE CostAllocation (
    allocation_id      BIGSERIAL,
    billing_date       DATE NOT NULL,
    team               VARCHAR(100) NOT NULL,
    product            VARCHAR(100),
    customer_id        BIGINT,
    service            VARCHAR(100) NOT NULL,
    allocated_cost     NUMERIC(12,6) NOT NULL,
    allocation_method  VARCHAR(30) NOT NULL,  -- tag, proportion, manual
    source_line_items  BIGINT[],              -- IDs of contributing CloudCostLineItem rows
    created_at         TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    PRIMARY KEY (allocation_id, billing_date)  -- PK must include the partition key
) PARTITION BY RANGE (billing_date);

CREATE TABLE CostBudget (
    budget_id          SERIAL PRIMARY KEY,
    team               VARCHAR(100) NOT NULL,
    period_type        VARCHAR(10) NOT NULL DEFAULT 'monthly',  -- monthly, quarterly
    period_start       DATE NOT NULL,
    budget_usd         NUMERIC(12,2) NOT NULL,
    alert_pct          SMALLINT NOT NULL DEFAULT 80,  -- alert when this % consumed
    alerted_at         TIMESTAMPTZ,
    UNIQUE (team, period_start)
);

CREATE TABLE CostAnomaly (
    anomaly_id         BIGSERIAL PRIMARY KEY,
    billing_date       DATE NOT NULL,
    team               VARCHAR(100),
    service            VARCHAR(100) NOT NULL,
    expected_cost      NUMERIC(12,6),
    actual_cost        NUMERIC(12,6),
    z_score            NUMERIC(6,2),
    acknowledged       BOOLEAN NOT NULL DEFAULT FALSE,
    created_at         TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX ON CloudCostLineItem(billing_date);
CREATE INDEX ON CloudCostLineItem USING GIN (tags);  -- JSONB key/containment lookups
-- Natural key makes nightly re-ingestion idempotent (ON CONFLICT DO NOTHING below relies on it)
CREATE UNIQUE INDEX ON CloudCostLineItem(cloud_provider, billing_date, service, (COALESCE(resource_id, '')));
CREATE INDEX ON CostAllocation(billing_date, team);
CREATE INDEX ON CostAnomaly(billing_date, acknowledged);
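Partitions for CloudCostLineItem and CostAllocation must exist before rows for a new month arrive. A minimal sketch of a nightly partition-maintenance step, assuming the `db` helper used elsewhere in this design and an illustrative `_yYYYYmMM` naming convention:

```python
import datetime

def monthly_partition_ddl(table: str, year: int, month: int) -> str:
    """Build DDL for one calendar-month partition of a RANGE (billing_date) table."""
    start = datetime.date(year, month, 1)
    # first day of the following month is the exclusive upper bound (handles December)
    end = datetime.date(year + (month == 12), month % 12 + 1, 1)
    return (
        f"CREATE TABLE IF NOT EXISTS {table}_y{year}m{month:02d} "
        f"PARTITION OF {table} "
        f"FOR VALUES FROM ('{start}') TO ('{end}')"
    )

# A nightly job would create the current and next month's partitions, e.g.:
# db.execute(monthly_partition_ddl("CloudCostLineItem", 2024, 7))
```

`CREATE TABLE IF NOT EXISTS` makes the job safe to re-run; DETACHing a 12-month-old partition for archival is the mirror-image operation.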

Ingestion Pipeline

import datetime
import json
from decimal import Decimal

import boto3

# `db` (a thin Postgres client) and `slack_client` are module-level helpers (not shown)

def ingest_aws_cost_explorer(billing_date: datetime.date):
    """
    Pull daily costs from AWS Cost Explorer. Run nightly after billing data is finalized (~24h lag).
    """
    ce = boto3.client('ce', region_name='us-east-1')
    date_str = billing_date.isoformat()

    paginator_token = None
    while True:
        kwargs = {
            'TimePeriod': {'Start': date_str, 'End': (billing_date + datetime.timedelta(1)).isoformat()},
            'Granularity': 'DAILY',
            'GroupBy': [
                {'Type': 'DIMENSION', 'Key': 'SERVICE'},
                {'Type': 'DIMENSION', 'Key': 'RESOURCE_ID'},
            ],
            'Metrics': ['UnblendedCost', 'UsageQuantity'],
            'Filter': {'Not': {'Dimensions': {'Key': 'RECORD_TYPE', 'Values': ['Credit', 'Refund']}}},
        }
        if paginator_token:
            kwargs['NextPageToken'] = paginator_token

        resp = ce.get_cost_and_usage_with_resources(**kwargs)

        rows = []
        for result_by_time in resp['ResultsByTime']:
            for group in result_by_time['Groups']:
                service    = group['Keys'][0]
                resource   = group['Keys'][1] if len(group['Keys']) > 1 else None
                cost       = Decimal(group['Metrics']['UnblendedCost']['Amount'])
                usage      = Decimal(group['Metrics']['UsageQuantity']['Amount'])
                usage_unit = group['Metrics']['UsageQuantity']['Unit']
                if cost == 0:
                    continue
                tags = _fetch_resource_tags(resource) if resource else {}
                rows.append((
                    'aws', billing_date, service, resource,
                    json.dumps(tags), float(usage), usage_unit, float(cost)
                ))

        if rows:
            db.executemany("""
                INSERT INTO CloudCostLineItem
                    (cloud_provider, billing_date, service, resource_id, tags,
                     usage_amount, usage_unit, unblended_cost)
                VALUES (%s,%s,%s,%s,%s,%s,%s,%s)
                ON CONFLICT DO NOTHING
            """, rows)

        paginator_token = resp.get('NextPageToken')
        if not paginator_token:
            break

def _fetch_resource_tags(resource_id: str) -> dict:
    """
    Fetch resource tags from AWS Resource Groups Tagging API.
    Tags like {"team":"payments","env":"prod"} drive allocation.
    Cache results in Redis (1-hour TTL) — tag calls are rate-limited.
    """
    import redis
    r = redis.Redis(decode_responses=True)
    cache_key = f"tags:{resource_id}"
    cached = r.get(cache_key)
    if cached:
        return json.loads(cached)
    tagging = boto3.client('resourcegroupstaggingapi', region_name='us-east-1')
    try:
        resp = tagging.get_resources(ResourceARNList=[resource_id])
        tags = {t['Key']: t['Value'] for t in resp['ResourceTagMappingList'][0].get('Tags', [])} if resp['ResourceTagMappingList'] else {}
    except Exception:
        tags = {}
    r.setex(cache_key, 3600, json.dumps(tags))
    return tags

Allocation Engine

def allocate_costs(billing_date: datetime.date):
    """
    Three-pass allocation:
    1. Tagged resources → direct allocation to team in tag
    2. Shared resources (no team tag) → proportion by tagged usage
    3. Remaining unallocated → split equally across all teams
    """
    # Pass 1: direct tag-based allocation
    tagged_rows = db.fetchall("""
        SELECT line_item_id, service, tags->>'team' AS team,
               tags->>'product' AS product, unblended_cost
        FROM CloudCostLineItem
        WHERE billing_date=%s AND tags->>'team' IS NOT NULL
    """, (billing_date,))

    for row in tagged_rows:
        db.execute("""
            INSERT INTO CostAllocation
                (billing_date, team, product, service, allocated_cost, allocation_method, source_line_items)
            VALUES (%s, %s, %s, %s, %s, 'tag', %s)
        """, (billing_date, row['team'], row['product'], row['service'],
              row['unblended_cost'], [row['line_item_id']]))

    # Pass 2: untagged shared costs → split by each team's proportion of tagged spend
    total_tagged = db.fetchone(
        "SELECT COALESCE(SUM(allocated_cost),0) AS total FROM CostAllocation WHERE billing_date=%s",
        (billing_date,)
    )['total']

    untagged_rows = db.fetchall("""
        SELECT line_item_id, service, unblended_cost
        FROM CloudCostLineItem
        WHERE billing_date=%s AND tags->>'team' IS NULL
    """, (billing_date,))

    if untagged_rows and total_tagged > 0:
        team_shares = db.fetchall("""
            SELECT team, SUM(allocated_cost)/(%s) AS share
            FROM CostAllocation WHERE billing_date=%s GROUP BY team
        """, (total_tagged, billing_date))

        for row in untagged_rows:
            for ts in team_shares:
                db.execute("""
                    INSERT INTO CostAllocation
                        (billing_date, team, service, allocated_cost, allocation_method, source_line_items)
                    VALUES (%s, %s, %s, %s, 'proportion', %s)
                """, (billing_date, ts['team'], row['service'],
                      float(row['unblended_cost']) * float(ts['share']),
                      [row['line_item_id']]))

    elif untagged_rows:
        # Pass 3: no tagged spend exists to proportion against — split equally
        # across every team that has a budget defined
        teams = [r['team'] for r in db.fetchall("SELECT DISTINCT team FROM CostBudget")]
        for row in untagged_rows:
            for team in teams:
                db.execute("""
                    INSERT INTO CostAllocation
                        (billing_date, team, service, allocated_cost, allocation_method, source_line_items)
                    VALUES (%s, %s, %s, %s, 'proportion', %s)
                """, (billing_date, team, row['service'],
                      float(row['unblended_cost']) / len(teams),
                      [row['line_item_id']]))
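One practical wrinkle in the proportion pass: floating-point splits can drift by fractions of a cent, so the per-team rows no longer sum to the source line item. A largest-remainder sketch (pure Python; the function name and cents-based interface are illustrative) that guarantees an exact total:

```python
from decimal import Decimal

def split_proportionally(cost_cents: int, weights: dict) -> dict:
    """Split an integer cost (in cents) across keys proportionally to weights,
    using the largest-remainder method so the parts sum exactly to cost_cents."""
    total_w = sum(weights.values())
    exact = {k: Decimal(cost_cents) * Decimal(w) / Decimal(total_w)
             for k, w in weights.items()}
    floored = {k: int(v) for k, v in exact.items()}
    leftover = cost_cents - sum(floored.values())
    # hand the remaining cents to the largest fractional remainders
    by_remainder = sorted(weights, key=lambda k: exact[k] - floored[k], reverse=True)
    for k in by_remainder[:leftover]:
        floored[k] += 1
    return floored

# split_proportionally(1000, {"payments": 800, "identity": 200})
# -> {"payments": 800, "identity": 200}
```

Storing allocations in integer cents (or Decimal) and reconciling totals against the source line item is a standard guard in billing pipelines.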

Anomaly Detection

def detect_anomalies(billing_date: datetime.date, lookback_days: int = 28, z_threshold: float = 3.0):
    """
    For each (team, service) pair, compute z-score vs. trailing 28-day window.
    |z| >= 3.0 → anomaly. Fires an alert to Slack.
    """
    import numpy as np

    combos = db.fetchall("""
        SELECT DISTINCT team, service FROM CostAllocation
        WHERE billing_date >= %s AND billing_date <= %s
    """, (billing_date - datetime.timedelta(lookback_days), billing_date))

    for combo in combos:
        team, service = combo['team'], combo['service']

        history = db.fetchall("""
            SELECT billing_date, SUM(allocated_cost) AS daily_cost
            FROM CostAllocation
            WHERE team=%s AND service=%s
              AND billing_date >= %s AND billing_date < %s
            GROUP BY billing_date ORDER BY billing_date
        """, (team, service,
              billing_date - datetime.timedelta(lookback_days), billing_date))

        if len(history) < 7:
            continue  # not enough data for a stable baseline

        costs = [float(r['daily_cost']) for r in history]
        mean, std = np.mean(costs), np.std(costs)
        if std == 0:
            continue  # flat history — any change would yield an infinite z-score

        today_cost = float(db.fetchone("""
            SELECT COALESCE(SUM(allocated_cost),0) AS total FROM CostAllocation
            WHERE team=%s AND service=%s AND billing_date=%s
        """, (team, service, billing_date))['total'])

        z = (today_cost - mean) / std
        if abs(z) >= z_threshold:
            db.execute("""
                INSERT INTO CostAnomaly
                    (billing_date, team, service, expected_cost, actual_cost, z_score)
                VALUES (%s,%s,%s,%s,%s,%s)
            """, (billing_date, team, service, round(mean, 4), today_cost, round(z, 2)))
            _send_anomaly_alert(team, service, mean, today_cost, z)

def _send_anomaly_alert(team, service, expected, actual, z_score):
    direction = "spike" if z_score > 0 else "drop"
    pct_change = abs(actual - expected) / expected * 100 if expected else 0.0
    message = (
        f":warning: Cost anomaly detected — {team} / {service}\n"
        f"Expected: ${expected:.2f} | Actual: ${actual:.2f} | {pct_change:.0f}% {direction} (z={z_score:.1f})"
    )
    slack_client.post_message(channel=f"#cost-alerts-{team}", text=message)
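The acknowledged flag on CostAnomaly can also suppress repeat alerts for known recurring events such as a monthly batch job. A sketch of the suppression check, assuming acknowledged rows are pre-fetched as dicts; the 30-day window is an assumption:

```python
import datetime

def should_suppress(team, service, billing_date, recent_acknowledged, window_days=30):
    """Skip alerting when an acknowledged anomaly for the same (team, service)
    exists within the trailing window — e.g. a known monthly training job."""
    cutoff = billing_date - datetime.timedelta(days=window_days)
    return any(
        a["team"] == team and a["service"] == service and a["billing_date"] >= cutoff
        for a in recent_acknowledged
    )
```

The caller would run this just before `_send_anomaly_alert`, still inserting the CostAnomaly row so the weekly report remains complete.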

Budget Alert Check

def check_budget_alerts():
    """Run daily after allocation. Alert when a team's month-to-date spend crosses alert_pct."""
    import calendar
    today = datetime.date.today()
    month_start = today.replace(day=1)

    budgets = db.fetchall("""
        SELECT b.*, COALESCE(SUM(a.allocated_cost),0) AS mtd_spend
        FROM CostBudget b
        LEFT JOIN CostAllocation a
            ON a.team=b.team AND a.billing_date >= b.period_start AND a.billing_date <= %s
        WHERE b.period_start = %s AND b.alerted_at IS NULL
        GROUP BY b.budget_id
    """, (today, month_start))

    for budget in budgets:
        pct_used = float(budget['mtd_spend']) / float(budget['budget_usd']) * 100
        if pct_used >= budget['alert_pct']:
            days_left = calendar.monthrange(today.year, today.month)[1] - today.day
            slack_client.post_message(
                channel=f"#cost-alerts-{budget['team']}",
                text=(
                    f":money_with_wings: Budget alert — {budget['team']}\n"
                    f"${budget['mtd_spend']:.0f} of ${budget['budget_usd']:.0f} "
                    f"({pct_used:.0f}%) used with {days_left} days left in the month"
                )
            )
            db.execute(
                "UPDATE CostBudget SET alerted_at=NOW() WHERE budget_id=%s",
                (budget['budget_id'],)
            )
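The days-left arithmetic in the alert message is easy to get wrong at year boundaries (`date.replace(month=month + 1)` raises ValueError in December). A small helper using the standard library's `calendar.monthrange`, which handles December and leap years:

```python
import calendar
import datetime

def days_left_in_month(d: datetime.date) -> int:
    """Days remaining in d's month, not counting d itself."""
    return calendar.monthrange(d.year, d.month)[1] - d.day

# days_left_in_month(datetime.date(2024, 2, 28)) -> 1   (2024 is a leap year)
# days_left_in_month(datetime.date(2023, 12, 31)) -> 0
```

`monthrange` returns (weekday of the 1st, number of days in the month); only the second element matters here.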

Key Design Decisions

  • Tag-first allocation: teams that tag their resources properly get exact attribution. The proportion pass for untagged resources is deliberately visible — engineers can see “15% of your allocation comes from untagged shared resources” and are incentivized to add tags. Report untagged fraction in dashboards to drive tagging compliance.
  • Partition by billing_date: cost data grows ~30 rows per resource per day. Monthly partitions let old data be archived to cold storage (S3 via partition DETACH) without touching the live table. Anomaly detection uses the last 28 days — always within the current + prior partition.
  • Z-score over fixed thresholds: a fixed threshold ($50/day anomaly) misses a service that normally costs $5,000/day spiking to $7,500 (50% jump, z=1.5 — not anomalous) while alerting on a $0 → $50 jump in a new service. Z-score correctly normalizes by historical variance. Require at least 7 days of history to avoid false positives for new services.
  • 24-hour billing lag: AWS Cost Explorer finalizes daily costs ~24 hours after the billing day ends. Ingest billing_date T at the end of day T+1. Never ingest the current day’s partial costs — they skew anomaly baselines.
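The z-score argument can be made concrete with a worked example (the histories are invented for illustration): a noisy ~$5,000/day service jumping to $7,500 stays under the z=3 threshold, while a near-flat ~$10/day service jumping to $60 blows far past it:

```python
import statistics

def z_score(history, today):
    mean = statistics.mean(history)
    std = statistics.pstdev(history)  # population std, matching np.std's default
    return (today - mean) / std

big_service   = [3500, 6500, 4000, 6000, 5000, 4500, 5500]  # mean 5000, std 1000
small_service = [10, 10, 11, 9, 10, 10, 10]                 # mean 10, near-flat

z_big = z_score(big_service, 7500)    # 2.5 — a 50% jump, but within normal variance
z_small = z_score(small_service, 60)  # ~93 — tiny in dollars, wildly abnormal
```

A fixed dollar threshold would invert both verdicts, which is exactly the failure mode the bullet describes.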

Frequently Asked Questions

Why partition CloudCostLineItem by billing_date instead of a single table?

AWS billing data grows approximately 30 line items per resource per day. A company with 5,000 EC2 instances and hundreds of S3 buckets can accumulate 500,000+ rows per day — 180M rows per year. A single unpartitioned table with 2 years of history (360M rows) makes anomaly-detection queries (which need 28-day windows) extremely slow even with indexes. With monthly partitions, each partition covers one calendar month, so the 28-day anomaly lookback always spans the current and previous partition — at most 2 partitions touched per query. Old partitions (12+ months) are DETACHed and moved to S3 as Parquet for Athena queries. Partition DETACH is O(1) — it removes the partition from the parent table without touching the data. The active table stays lean; historical data remains queryable via Athena.

How does the proportional pass work for untagged shared costs like NAT gateways or load balancers?

Shared infrastructure (NAT gateways, load balancers, VPC endpoints, data transfer) serves all teams but belongs to no single team tag. Without allocation, it lands as "untagged" overhead that disappears from team dashboards. The proportional pass distributes untagged costs based on each team's share of total tagged spend: if the payments team spent $800 and the identity team spent $200 (total tagged = $1,000), payments absorbs 80% of untagged costs and identity 20%. This is a "you use more infrastructure, you pay more shared overhead" model. A fairer alternative for specific shared resources: tag the load balancer with team=shared and split it by request count per team (which requires application-level metrics). The simple proportional-by-spend model is accurate enough for most cost reviews and requires no additional instrumentation.

How do you handle cost spikes that are legitimate, not anomalies?

Z-score anomaly detection fires on any statistically unusual day — including legitimate spikes like "we ran a batch ML training job on the 15th of every month" or "Black Friday traffic tripled our database costs." Two approaches reduce the noise. (1) Acknowledged anomalies: engineers mark resolved anomalies as acknowledged=TRUE in CostAnomaly, and the weekly anomaly report shows only unacknowledged rows; after acknowledging "monthly ML training job," it won't alert again for that signature. (2) Expected events: maintain a CostExpectedEvent table (date, service, reason, expected_multiplier); before inserting an anomaly, check whether today appears in expected events for this service and skip the z-score check if so. Teams register planned batch jobs, migrations, or load tests in advance. This shifts anomaly detection from reactive (alert on anything unusual) to contextual (alert on unexplained unusual spend).

How do you allocate costs to individual customers for SaaS unit economics?

Per-customer cost attribution adds a customer dimension to allocation. Tag customer-specific infrastructure — EC2 instances or RDS clusters with customer=acme-corp — where dedicated resources exist. For shared infrastructure serving multiple customers, use usage-based splitting: API calls, storage bytes, or compute minutes per customer (from application metrics) as the allocation key. The schema already carries a customer_id column in CostAllocation. Example query:

SELECT c.customer_name, SUM(ca.allocated_cost) AS monthly_infra_cost
FROM CostAllocation ca
JOIN Customer c USING (customer_id)
WHERE ca.billing_date >= $month_start
GROUP BY c.customer_name
ORDER BY monthly_infra_cost DESC;

Compare with customer MRR: if ACME pays $10K/month but costs $8K to serve, gross margin is 20% — well below target. This per-customer P&L drives pricing decisions and identifies customers to upsell to dedicated tiers.

How do you forecast next month's cloud spend to set accurate budgets?

Simple linear forecast: compute average daily spend over the last 90 days per team, then multiply by the number of days in the next month.

SELECT team, AVG(daily_cost) * 30 AS forecast_30d
FROM (
    SELECT team, billing_date, SUM(allocated_cost) AS daily_cost
    FROM CostAllocation
    WHERE billing_date > NOW() - INTERVAL '90 days'
    GROUP BY team, billing_date
) sub
GROUP BY team;

Adjust for known growth: if active users are growing 10%/month and compute scales linearly with users, multiply the forecast by 1.1. Set the budget alert threshold at 90% of the 90-day forecast plus a 10% buffer, which automatically adjusts as spend grows. For capital planning, use the 90-day trend slope to project 12-month spend and alert finance when the slope implies exceeding the annual cloud budget by more than 15% — early enough to optimize or renegotiate contracts.
