Cost Allocation System: Low-Level Design
A cost allocation system attributes infrastructure spend to the teams, products, or customers that generated it. Without it, a single AWS bill lands on a shared cost center and engineers have no incentive to optimize. With it, teams see their cloud spend in dashboards, anomalies surface within hours, and chargeback to customers enables accurate SaaS unit economics. This design covers the data pipeline from cloud billing APIs to per-team attribution, anomaly detection, and budget alerting.
Core Data Model
CREATE TABLE CloudCostLineItem (
line_item_id BIGSERIAL PRIMARY KEY,
cloud_provider VARCHAR(20) NOT NULL, -- aws, gcp, azure
billing_date DATE NOT NULL,
service VARCHAR(100) NOT NULL, -- EC2, S3, BigQuery
resource_id VARCHAR(255), -- i-0abc123, arn:aws:s3:::my-bucket
tags JSONB NOT NULL DEFAULT '{}',-- {"team":"payments","env":"prod","customer":"acme"}
usage_amount NUMERIC(18,6) NOT NULL,
usage_unit VARCHAR(50), -- Hrs, GB-Mo, Requests
unblended_cost NUMERIC(12,6) NOT NULL, -- USD
imported_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
) PARTITION BY RANGE (billing_date);
CREATE TABLE CostAllocation (
allocation_id BIGSERIAL PRIMARY KEY,
billing_date DATE NOT NULL,
team VARCHAR(100) NOT NULL,
product VARCHAR(100),
customer_id BIGINT,
service VARCHAR(100) NOT NULL,
allocated_cost NUMERIC(12,6) NOT NULL,
allocation_method VARCHAR(30) NOT NULL, -- tag, proportion, manual
source_line_items BIGINT[], -- IDs of contributing CloudCostLineItem rows
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
) PARTITION BY RANGE (billing_date);
CREATE TABLE CostBudget (
budget_id SERIAL PRIMARY KEY,
team VARCHAR(100) NOT NULL,
period_type VARCHAR(10) NOT NULL DEFAULT 'monthly', -- monthly, quarterly
period_start DATE NOT NULL,
budget_usd NUMERIC(12,2) NOT NULL,
alert_pct SMALLINT NOT NULL DEFAULT 80, -- alert when this % consumed
alerted_at TIMESTAMPTZ,
UNIQUE (team, period_start)
);
CREATE TABLE CostAnomaly (
anomaly_id BIGSERIAL PRIMARY KEY,
billing_date DATE NOT NULL,
team VARCHAR(100),
service VARCHAR(100) NOT NULL,
expected_cost NUMERIC(12,6),
actual_cost NUMERIC(12,6),
z_score NUMERIC(6,2),
acknowledged BOOLEAN NOT NULL DEFAULT FALSE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX ON CloudCostLineItem(billing_date, tags);
CREATE INDEX ON CostAllocation(billing_date, team);
CREATE INDEX ON CostAnomaly(billing_date, acknowledged);
Ingestion Pipeline
import boto3, json, datetime
from decimal import Decimal
def ingest_aws_cost_explorer(billing_date: datetime.date):
"""
Pull daily costs from AWS Cost Explorer. Run nightly after billing data is finalized (~24h lag).
"""
ce = boto3.client('ce', region_name='us-east-1')
date_str = billing_date.isoformat()
paginator_token = None
while True:
kwargs = {
'TimePeriod': {'Start': date_str, 'End': (billing_date + datetime.timedelta(1)).isoformat()},
'Granularity': 'DAILY',
'GroupBy': [
{'Type': 'DIMENSION', 'Key': 'SERVICE'},
{'Type': 'DIMENSION', 'Key': 'RESOURCE_ID'},
],
'Metrics': ['UnblendedCost', 'UsageQuantity'],
'Filter': {'Not': {'Dimensions': {'Key': 'RECORD_TYPE', 'Values': ['Credit', 'Refund']}}},
}
if paginator_token:
kwargs['NextPageToken'] = paginator_token
resp = ce.get_cost_and_usage_with_resources(**kwargs)
rows = []
for result_by_time in resp['ResultsByTime']:
for group in result_by_time['Groups']:
service = group['Keys'][0]
resource = group['Keys'][1] if len(group['Keys']) > 1 else None
cost = Decimal(group['Metrics']['UnblendedCost']['Amount'])
usage = Decimal(group['Metrics']['UsageQuantity']['Amount'])
usage_unit = group['Metrics']['UsageQuantity']['Unit']
if cost == 0:
continue
tags = _fetch_resource_tags(resource) if resource else {}
rows.append((
'aws', billing_date, service, resource,
json.dumps(tags), float(usage), usage_unit, float(cost)
))
if rows:
db.executemany("""
INSERT INTO CloudCostLineItem
(cloud_provider, billing_date, service, resource_id, tags,
usage_amount, usage_unit, unblended_cost)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s)
ON CONFLICT DO NOTHING
""", rows)
paginator_token = resp.get('NextPageToken')
if not paginator_token:
break
def _fetch_resource_tags(resource_id: str) -> dict:
"""
Fetch resource tags from AWS Resource Groups Tagging API.
Tags like {"team":"payments","env":"prod"} drive allocation.
Cache results in Redis (1-hour TTL) — tag calls are rate-limited.
"""
import redis
r = redis.Redis(decode_responses=True)
cache_key = f"tags:{resource_id}"
cached = r.get(cache_key)
if cached:
return json.loads(cached)
tagging = boto3.client('resourcegroupstaggingapi', region_name='us-east-1')
try:
resp = tagging.get_resources(ResourceARNList=[resource_id])
tags = {t['Key']: t['Value'] for t in resp['ResourceTagMappingList'][0].get('Tags', [])} if resp['ResourceTagMappingList'] else {}
except Exception:
tags = {}
r.setex(cache_key, 3600, json.dumps(tags))
return tags
Allocation Engine
def allocate_costs(billing_date: datetime.date):
"""
Three-pass allocation:
1. Tagged resources → direct allocation to team in tag
2. Shared resources (no team tag) → proportion by tagged usage
3. Remaining unallocated → split equally across all teams
"""
# Pass 1: direct tag-based allocation
tagged_rows = db.fetchall("""
SELECT line_item_id, service, tags->>'team' AS team,
tags->>'product' AS product, unblended_cost
FROM CloudCostLineItem
WHERE billing_date=%s AND tags->>'team' IS NOT NULL
""", (billing_date,))
for row in tagged_rows:
db.execute("""
INSERT INTO CostAllocation
(billing_date, team, product, service, allocated_cost, allocation_method, source_line_items)
VALUES (%s, %s, %s, %s, %s, 'tag', %s)
""", (billing_date, row['team'], row['product'], row['service'],
row['unblended_cost'], [row['line_item_id']]))
# Pass 2: untagged shared costs → split by each team's proportion of tagged spend
total_tagged = db.fetchone(
"SELECT COALESCE(SUM(allocated_cost),0) AS total FROM CostAllocation WHERE billing_date=%s",
(billing_date,)
)['total']
untagged_rows = db.fetchall("""
SELECT line_item_id, service, unblended_cost
FROM CloudCostLineItem
WHERE billing_date=%s AND tags->>'team' IS NULL
""", (billing_date,))
if untagged_rows and total_tagged > 0:
team_shares = db.fetchall("""
SELECT team, SUM(allocated_cost)/(%s) AS share
FROM CostAllocation WHERE billing_date=%s GROUP BY team
""", (total_tagged, billing_date))
for row in untagged_rows:
for ts in team_shares:
db.execute("""
INSERT INTO CostAllocation
(billing_date, team, service, allocated_cost, allocation_method, source_line_items)
VALUES (%s, %s, %s, %s, 'proportion', %s)
""", (billing_date, ts['team'], row['service'],
float(row['unblended_cost']) * float(ts['share']),
[row['line_item_id']]))
Anomaly Detection
def detect_anomalies(billing_date: datetime.date, lookback_days: int = 28, z_threshold: float = 3.0):
"""
For each (team, service) pair, compute z-score vs. trailing 28-day window.
z > 3.0 → anomaly. Fires an alert to Slack.
"""
import numpy as np
combos = db.fetchall("""
SELECT DISTINCT team, service FROM CostAllocation
WHERE billing_date >= %s AND billing_date = %s AND billing_date < %s
GROUP BY billing_date ORDER BY billing_date
""", (team, service,
billing_date - datetime.timedelta(lookback_days), billing_date))
if len(history) < 7:
continue # not enough data
costs = [float(r['daily_cost']) for r in history]
mean, std = np.mean(costs), np.std(costs)
if std = z_threshold:
db.execute("""
INSERT INTO CostAnomaly
(billing_date, team, service, expected_cost, actual_cost, z_score)
VALUES (%s,%s,%s,%s,%s,%s)
""", (billing_date, team, service, round(mean, 4), today_cost, round(z, 2)))
_send_anomaly_alert(team, service, mean, float(today_cost), z)
def _send_anomaly_alert(team, service, expected, actual, z_score):
direction = "spike" if z_score > 0 else "drop"
pct_change = abs(actual - expected) / expected * 100
message = (
f":warning: Cost anomaly detected — {team} / {service}n"
f"Expected: ${expected:.2f} | Actual: ${actual:.2f} | {pct_change:.0f}% {direction} (z={z_score:.1f})"
)
slack_client.post_message(channel=f"#cost-alerts-{team}", text=message)
Budget Alert Check
def check_budget_alerts():
"""Run daily after allocation. Alert when a team's month-to-date spend crosses alert_pct."""
today = datetime.date.today()
month_start = today.replace(day=1)
budgets = db.fetchall("""
SELECT b.*, COALESCE(SUM(a.allocated_cost),0) AS mtd_spend
FROM CostBudget b
LEFT JOIN CostAllocation a
ON a.team=b.team AND a.billing_date >= b.period_start AND a.billing_date = budget['alert_pct']:
slack_client.post_message(
channel=f"#cost-alerts-{budget['team']}",
text=(
f":money_with_wings: Budget alert — {budget['team']}n"
f"${budget['mtd_spend']:.0f} of ${budget['budget_usd']:.0f} "
f"({pct_used:.0f}%) used with {(today.replace(month=today.month%12+1,day=1)-datetime.timedelta(1)-today).days+1} days left"
)
)
db.execute(
"UPDATE CostBudget SET alerted_at=NOW() WHERE budget_id=%s",
(budget['budget_id'],)
)
Key Design Decisions
- Tag-first allocation: teams that tag their resources properly get exact attribution. The proportion pass for untagged resources is deliberately visible — engineers can see “15% of your allocation comes from untagged shared resources” and are incentivized to add tags. Report untagged fraction in dashboards to drive tagging compliance.
- Partition by billing_date: cost data grows ~30 rows per resource per day. Monthly partitions let old data be archived to cold storage (S3 via partition DETACH) without touching the live table. Anomaly detection uses the last 28 days — always within the current + prior partition.
- Z-score over fixed thresholds: a fixed threshold ($50/day anomaly) misses a service that normally costs $5,000/day spiking to $7,500 (50% jump, z=1.5 — not anomalous) while alerting on a $0 → $50 jump in a new service. Z-score correctly normalizes by historical variance. Require at least 7 days of history to avoid false positives for new services.
- 24-hour billing lag: AWS Cost Explorer finalizes daily costs ~24 hours after the billing day ends. Ingest billing_date T at the end of day T+1. Never ingest the current day’s partial costs — they skew anomaly baselines.
Cost allocation and cloud billing system design is discussed in Amazon system design interview questions.
Cost allocation and compute resource attribution design is covered in Databricks system design interview preparation.
Cost allocation and cloud spend attribution design is discussed in Netflix system design interview guide.