Low-Level Design: Inventory Management System
An Inventory Management System (IMS) tracks product stock levels, warehouse locations, reorder triggers, and stock movements. It comes up in interviews at Shopify, Amazon, and DoorDash, typically framed as an e-commerce or supply chain design problem.
Core Entities
from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime
from typing import Optional
import uuid


class MovementType(Enum):
    RECEIVE = "receive"    # stock added (purchase order received)
    SELL = "sell"          # stock consumed (order fulfillment)
    ADJUST = "adjust"      # manual correction (damage, stocktake)
    TRANSFER = "transfer"  # move between warehouses
    RESERVE = "reserve"    # hold stock for pending order
    RELEASE = "release"    # undo reservation


class StockStatus(Enum):
    NORMAL = "normal"
    LOW_STOCK = "low_stock"        # below reorder_point
    OUT_OF_STOCK = "out_of_stock"
    OVERSTOCKED = "overstocked"    # above max_stock


@dataclass
class Product:
    product_id: str
    name: str
    sku: str
    unit_cost_cents: int
    reorder_point: int     # trigger reorder when quantity falls below
    reorder_quantity: int  # how much to order
    max_stock: int


@dataclass
class Warehouse:
    warehouse_id: str
    name: str
    location: str


@dataclass
class InventoryRecord:
    inventory_id: str
    product_id: str
    warehouse_id: str
    quantity_on_hand: int    # physically present
    quantity_reserved: int   # held for pending orders
    quantity_available: int  # on_hand - reserved

    @property
    def status(self) -> StockStatus:
        if self.quantity_available <= 0:
            return StockStatus.OUT_OF_STOCK
        if self.quantity_on_hand < 10:  # simplified; use reorder_point
            return StockStatus.LOW_STOCK
        return StockStatus.NORMAL


@dataclass
class StockMovement:
    movement_id: str
    product_id: str
    warehouse_id: str
    movement_type: MovementType
    quantity: int      # positive for additions, negative for removals
    reference_id: str  # order_id, po_id, etc.
    created_at: datetime = field(default_factory=datetime.utcnow)
    notes: str = ""
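The status property above hard-codes a threshold of 10 and never returns OVERSTOCKED. As a rough sketch of what a threshold-aware version might look like, the helper below takes the per-product limits as arguments. The name classify_stock is hypothetical, and comparing available stock (rather than on_hand) against reorder_point is an assumption made to match the reorder check used later in the service.

```python
from enum import Enum


class StockStatus(Enum):
    NORMAL = "normal"
    LOW_STOCK = "low_stock"
    OUT_OF_STOCK = "out_of_stock"
    OVERSTOCKED = "overstocked"


def classify_stock(on_hand: int, reserved: int,
                   reorder_point: int, max_stock: int) -> StockStatus:
    """Hypothetical helper: derive a status from per-product thresholds."""
    available = on_hand - reserved
    if available <= 0:
        return StockStatus.OUT_OF_STOCK
    if available < reorder_point:   # assumption: compare available, not on_hand
        return StockStatus.LOW_STOCK
    if on_hand > max_stock:         # physical stock exceeds the configured cap
        return StockStatus.OVERSTOCKED
    return StockStatus.NORMAL


# Example thresholds: reorder_point=10, max_stock=100
print(classify_stock(on_hand=12, reserved=4, reorder_point=10, max_stock=100))
```

The checks run from most to least urgent, so a record that is both fully reserved and above max_stock still reports OUT_OF_STOCK.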
Inventory Service
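import threading


class InventoryService:
    def __init__(self):
        self._inventory: dict[tuple, InventoryRecord] = {}  # (product_id, warehouse_id)
        self._movements: list[StockMovement] = []
        self._products: dict[str, Product] = {}
        self._lock = threading.Lock()  # per-record in production

    def _key(self, product_id: str, warehouse_id: str) -> tuple:
        return (product_id, warehouse_id)

    def _get_record(self, product_id: str, warehouse_id: str) -> InventoryRecord:
        key = self._key(product_id, warehouse_id)
        if key not in self._inventory:
            self._inventory[key] = InventoryRecord(
                inventory_id=str(uuid.uuid4()),
                product_id=product_id,
                warehouse_id=warehouse_id,
                quantity_on_hand=0,
                quantity_reserved=0,
                quantity_available=0,
            )
        return self._inventory[key]

    # NOTE: the source listing is garbled between receive_stock and adjust_stock.
    # The guard comparisons, method bodies, and the method names
    # commit_reservation, release_reservation, and adjust_stock are reconstructed
    # from the surviving docstrings and fragments and should be read as assumptions.
    def receive_stock(self, product_id: str, warehouse_id: str,
                      quantity: int, po_id: str) -> InventoryRecord:
        if quantity <= 0:
            raise ValueError("quantity must be positive")
        with self._lock:
            record = self._get_record(product_id, warehouse_id)
            record.quantity_on_hand += quantity
            record.quantity_available += quantity
            self._log_movement(product_id, warehouse_id, MovementType.RECEIVE,
                               quantity, po_id)
            self._check_reorder(product_id, record)
            return record

    def reserve_stock(self, product_id: str, warehouse_id: str,
                      quantity: int, order_id: str) -> bool:
        with self._lock:
            record = self._get_record(product_id, warehouse_id)
            if record.quantity_available < quantity:
                return False  # insufficient stock; nothing is reserved
            record.quantity_reserved += quantity
            record.quantity_available -= quantity
            self._log_movement(product_id, warehouse_id, MovementType.RESERVE,
                               quantity, order_id)
            self._check_reorder(product_id, record)
            return True

    def commit_reservation(self, product_id: str, warehouse_id: str,
                           quantity: int, order_id: str) -> None:
        """Commit reserved stock: deduct from on_hand after shipment."""
        with self._lock:
            record = self._get_record(product_id, warehouse_id)
            if record.quantity_reserved < quantity:
                raise ValueError("cannot commit more than is reserved")
            record.quantity_on_hand -= quantity
            record.quantity_reserved -= quantity  # available is unchanged
            self._log_movement(product_id, warehouse_id, MovementType.SELL,
                               -quantity, order_id)

    def release_reservation(self, product_id: str, warehouse_id: str,
                            quantity: int, order_id: str) -> None:
        """Release reserved stock back to available (e.g., order cancelled)."""
        with self._lock:
            record = self._get_record(product_id, warehouse_id)
            if record.quantity_reserved < quantity:
                raise ValueError("cannot release more than is reserved")
            record.quantity_reserved -= quantity
            record.quantity_available += quantity
            self._log_movement(product_id, warehouse_id, MovementType.RELEASE,
                               quantity, order_id)

    def adjust_stock(self, product_id: str, warehouse_id: str,
                     new_on_hand: int, reason: str) -> InventoryRecord:
        """Stocktake correction: set on_hand to actual counted quantity."""
        with self._lock:
            record = self._get_record(product_id, warehouse_id)
            delta = new_on_hand - record.quantity_on_hand
            record.quantity_on_hand = new_on_hand
            record.quantity_available = new_on_hand - record.quantity_reserved
            self._log_movement(product_id, warehouse_id, MovementType.ADJUST,
                               delta, reason)
            return record

    def _log_movement(self, product_id, warehouse_id, movement_type, quantity, ref):
        # Append-only: callers already hold self._lock
        self._movements.append(StockMovement(
            movement_id=str(uuid.uuid4()),
            product_id=product_id,
            warehouse_id=warehouse_id,
            movement_type=movement_type,
            quantity=quantity,
            reference_id=ref,
        ))

    def _check_reorder(self, product_id: str, record: InventoryRecord) -> None:
        product = self._products.get(product_id)
        if not product:
            return
        if record.quantity_available < product.reorder_point:
            # In production: emit a reorder event to purchasing system
            print(f"REORDER ALERT: {product_id} at {record.quantity_available} units "
                  f"(reorder point: {product.reorder_point})")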
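The service uses one global lock and notes that production code would lock per record. One way this might look, purely as a sketch: keep a table of locks keyed by (product_id, warehouse_id), guarded by a small lock of its own. RecordLocks, for_key, and the in-memory available dict are hypothetical names introduced for illustration and are not part of the original service.

```python
import threading


class RecordLocks:
    """Hypothetical helper: one lock per (product_id, warehouse_id) key."""

    def __init__(self) -> None:
        self._guard = threading.Lock()  # protects the lock table itself
        self._locks: dict[tuple[str, str], threading.Lock] = {}

    def for_key(self, product_id: str, warehouse_id: str) -> threading.Lock:
        key = (product_id, warehouse_id)
        with self._guard:
            # Create the lock on first use, reuse the same object afterwards
            return self._locks.setdefault(key, threading.Lock())


locks = RecordLocks()
available = {("sku-1", "wh-a"): 100}  # stand-in for quantity_available


def reserve(product_id: str, warehouse_id: str, quantity: int) -> bool:
    """Check and decrement under the record's own lock."""
    key = (product_id, warehouse_id)
    with locks.for_key(product_id, warehouse_id):
        if available[key] < quantity:
            return False
        available[key] -= quantity
        return True


# 150 threads each try to reserve one unit from a record holding 100.
results: list[bool] = []


def worker() -> None:
    results.append(reserve("sku-1", "wh-a", 1))


threads = [threading.Thread(target=worker) for _ in range(150)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sum(results), available[("sku-1", "wh-a")])
```

Reservations on different records no longer contend with each other, while the check-and-decrement on any single record stays atomic. The lock table grows with the number of distinct keys, which is acceptable for a sketch but would need eviction in a long-running process.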
Multi-Warehouse Allocation
    # Method of InventoryService
    def allocate_from_best_warehouse(self, product_id: str, quantity: int,
                                     order_id: str, ship_to_lat: float,
                                     ship_to_lng: float) -> Optional[str]:
        """Reserve from the closest warehouse that has sufficient stock."""
        with self._lock:
            # Snapshot so concurrent inserts cannot change the dict mid-iteration
            snapshot = list(self._inventory.items())
        candidates = []
        for (pid, wid), record in snapshot:
            if pid != product_id:
                continue
            if record.quantity_available >= quantity:
                distance = self._distance(wid, ship_to_lat, ship_to_lng)
                candidates.append((distance, wid))
        candidates.sort(key=lambda x: x[0])
        # The scan is not atomic with the reservation, so try candidates in
        # distance order and accept the first reserve_stock call that succeeds.
        for _, wid in candidates:
            if self.reserve_stock(product_id, wid, quantity, order_id):
                return wid
        return None

    def _distance(self, warehouse_id: str, lat: float, lng: float) -> float:
        # In production: fetch warehouse coordinates and compute haversine
        return 0.0  # placeholder
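The _distance placeholder mentions haversine. A minimal sketch of that calculation follows, assuming warehouse coordinates are available in a lookup table. WAREHOUSE_COORDS, its warehouse IDs, and nearest_warehouse are hypothetical, and the Earth is treated as a sphere with a mean radius of 6371 km.

```python
import math

EARTH_RADIUS_KM = 6371.0  # mean Earth radius; spherical approximation


def haversine_km(lat1: float, lng1: float, lat2: float, lng2: float) -> float:
    """Great-circle distance in kilometres between two (lat, lng) points in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    d_phi = phi2 - phi1
    d_lambda = math.radians(lng2 - lng1)
    a = (math.sin(d_phi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(d_lambda / 2) ** 2)
    # Clamp to 1.0 so rounding error cannot push asin out of its domain
    return 2 * EARTH_RADIUS_KM * math.asin(min(1.0, math.sqrt(a)))


# Hypothetical warehouse coordinates (latitude, longitude)
WAREHOUSE_COORDS = {
    "wh-east": (40.71, -74.01),
    "wh-west": (34.05, -118.24),
}


def nearest_warehouse(ship_to_lat: float, ship_to_lng: float,
                      warehouse_ids: list[str]) -> str:
    """Pick the warehouse with the smallest great-circle distance to the destination."""
    return min(
        warehouse_ids,
        key=lambda wid: haversine_km(*WAREHOUSE_COORDS[wid], ship_to_lat, ship_to_lng),
    )


print(nearest_warehouse(32.72, -117.16, ["wh-east", "wh-west"]))
```

Straight-line distance is only a proxy for shipping cost or delivery time; a real allocator would likely weigh carrier zones and transit estimates as well.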
Design Decisions
| Decision | Choice | Rationale |
|---|---|---|
| Stock tracking | on_hand + reserved + available | Prevents overselling during concurrent orders |
| Movement log | Append-only StockMovement records | Audit trail, enables replay, supports stocktake reconciliation |
| Concurrency | Lock per operation | For LLD; production uses row-level DB locks or optimistic locking |
| Reorder trigger | On receive/reserve check | Lazy evaluation; background job for bulk checks |
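The movement-log row claims the log enables replay. As an illustration of what that could mean, the sketch below rebuilds the three quantities from a list of movements. The sign convention is an assumption stated in the comments (the original only says "positive for additions, negative for removals"), and Movement and replay are hypothetical names.

```python
from dataclasses import dataclass
from enum import Enum


class MovementType(Enum):
    RECEIVE = "receive"
    SELL = "sell"
    ADJUST = "adjust"
    RESERVE = "reserve"
    RELEASE = "release"


@dataclass(frozen=True)
class Movement:
    movement_type: MovementType
    quantity: int


def replay(movements: list[Movement]) -> tuple[int, int, int]:
    """Rebuild (on_hand, reserved, available) from an append-only log.

    Assumed convention: RECEIVE and ADJUST carry a signed on_hand delta,
    SELL carries a negative quantity that leaves both on_hand and reserved,
    RESERVE and RELEASE carry the positive number of units held or freed.
    """
    on_hand = 0
    reserved = 0
    for m in movements:
        if m.movement_type in (MovementType.RECEIVE, MovementType.ADJUST):
            on_hand += m.quantity
        elif m.movement_type is MovementType.SELL:
            on_hand += m.quantity   # quantity is negative
            reserved += m.quantity  # shipped units were previously reserved
        elif m.movement_type is MovementType.RESERVE:
            reserved += m.quantity
        elif m.movement_type is MovementType.RELEASE:
            reserved -= m.quantity
    return on_hand, reserved, on_hand - reserved


log = [
    Movement(MovementType.RECEIVE, 100),
    Movement(MovementType.RESERVE, 30),
    Movement(MovementType.SELL, -20),
    Movement(MovementType.RELEASE, 10),
    Movement(MovementType.ADJUST, -5),
]
print(replay(log))  # (75, 0, 75)
```

Comparing the replayed totals against the stored InventoryRecord is one way to detect a corrupted balance. TRANSFER is left out here because it would need a source and destination warehouse, which the single-record sketch does not model.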
Frequently Asked Questions

What is the difference between quantity_on_hand, quantity_reserved, and quantity_available?
quantity_on_hand is the number of physical units in the warehouse (what you can count). quantity_reserved is the number of units committed to pending orders but not yet shipped: physically present but logically allocated. quantity_available is on_hand minus reserved, which is what new orders can be promised. This three-field model prevents overselling: when a new order arrives, check available (not on_hand). When the order ships, deduct from both on_hand and reserved. When an order is cancelled, release the reserved units back to available.

How do you prevent race conditions when reserving inventory?
Use an atomic compare-and-swap at the database level: UPDATE inventory SET reserved = reserved + qty, available = available - qty WHERE product_id = X AND warehouse_id = W AND available >= qty. If 0 rows are updated, the reservation fails (insufficient stock). This prevents two concurrent orders from both reading available=5 and both reserving 4 (overselling by 3). In a distributed system, use optimistic locking (version column) or a Redis Lua script for atomic check-and-decrement.

Why keep an immutable stock movement log?
An append-only movement log (audit trail) enables: (1) Replay: reconstruct current inventory state from movements if the balance is corrupted. (2) Reconciliation: compare logged movements against a physical count to find discrepancies (theft, damage, data entry errors). (3) Reporting: sales velocity, receiving frequency, adjustment patterns by warehouse. (4) Debugging: trace why a product went out of stock unexpectedly. Never delete movement records; corrections are new ADJUST movements, not modifications.

How does a reorder point trigger work?
A reorder point is a minimum quantity threshold: when quantity_available drops below this threshold, trigger a purchase order. Example: a product has reorder_point=50 and reorder_quantity=200. When a sale brings available stock to 48, the system emits a reorder event: "create PO for 200 units from default supplier". Check the trigger on every reservation and adjustment. In production, add lead_time_days to the reorder calculation: reorder when stock will cover demand until the next shipment arrives, not just when it hits the minimum.

How would you scale inventory management to 1000 warehouses?
Shard inventory records by warehouse_id: each shard owns a set of warehouses. For global queries (total available across all warehouses), maintain a materialized aggregate table updated by an event stream (Kafka): each movement emits an event, and a consumer updates per-product global totals. For single-warehouse writes (most operations), route to the appropriate shard. For multi-warehouse allocation, use a coordinator service that queries each shard and picks the best warehouse. Use Redis for frequently queried availability data to reduce database load.
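The conditional UPDATE from the race-condition answer can be tried end to end with an in-memory SQLite database. This is a sketch under stated assumptions: the inventory table layout and the try_reserve helper are hypothetical, and SQLite stands in for whatever database a production system would use.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE inventory (
        product_id   TEXT,
        warehouse_id TEXT,
        on_hand      INTEGER,
        reserved     INTEGER,
        available    INTEGER,
        PRIMARY KEY (product_id, warehouse_id)
    )
    """
)
conn.execute("INSERT INTO inventory VALUES ('sku-1', 'wh-a', 5, 0, 5)")
conn.commit()


def try_reserve(db: sqlite3.Connection, product_id: str,
                warehouse_id: str, qty: int) -> bool:
    """Reserve qty units only if enough are available, in a single statement."""
    cur = db.execute(
        "UPDATE inventory "
        "SET reserved = reserved + ?, available = available - ? "
        "WHERE product_id = ? AND warehouse_id = ? AND available >= ?",
        (qty, qty, product_id, warehouse_id, qty),
    )
    db.commit()
    # rowcount is 0 when the WHERE clause matched nothing (insufficient stock)
    return cur.rowcount == 1


first = try_reserve(conn, "sku-1", "wh-a", 4)   # succeeds: 5 available
second = try_reserve(conn, "sku-1", "wh-a", 4)  # fails: only 1 left
row = conn.execute(
    "SELECT on_hand, reserved, available FROM inventory "
    "WHERE product_id = 'sku-1' AND warehouse_id = 'wh-a'"
).fetchone()
print(first, second, row)  # True False (5, 4, 1)
```

Because the availability check and the decrement happen in one statement, there is no window between reading available and writing it back, which is the gap the two-orders example exploits.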