The Problem: Multi-Service Data Assembly
In a microservices architecture, a single client-facing response often requires data from several independent services. A product detail page might need: product data from the catalog service, pricing from the pricing service, inventory from the warehouse service, and reviews from the review service. If the client calls each service sequentially, total latency is the sum of all calls. If any service is slow, the entire page is slow.
The API composition pattern solves this by introducing an aggregator service that fans out to all downstream services in parallel, joins the results, and returns a single unified response to the client. The aggregator hides the service topology from the client and handles failure gracefully.
Aggregator Service Architecture
The aggregator sits between the client (browser, mobile app, BFF) and the downstream microservices. Its responsibilities:
- Receive the incoming request and identify the required downstream calls (the aggregation plan).
- Fan out to all downstream services in parallel.
- Collect results and merge them into a unified schema.
- Handle partial failures — return what is available with fallbacks for failed services.
- Cache the assembled response to avoid redundant fan-out for identical requests.
Parallel Fan-Out
Parallel fan-out is the core performance mechanism. Instead of sequential calls where total latency = sum of all latencies, parallel calls give total latency = max(all latencies).
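The sum-versus-max arithmetic can be demonstrated with simulated downstream calls; this sketch uses `asyncio.sleep` as a stand-in for network I/O, with made-up 50ms latencies:

```python
import asyncio
import time

async def call(latency_s: float) -> float:
    # Stand-in for a downstream HTTP call.
    await asyncio.sleep(latency_s)
    return latency_s

async def main() -> tuple[float, float]:
    latencies = [0.05, 0.05, 0.05, 0.05, 0.05]

    start = time.monotonic()
    for latency in latencies:  # sequential: total = sum of latencies
        await call(latency)
    sequential = time.monotonic() - start

    start = time.monotonic()
    # parallel: total = max of latencies
    await asyncio.gather(*(call(latency) for latency in latencies))
    parallel = time.monotonic() - start
    return sequential, parallel

sequential, parallel = asyncio.run(main())
print(f"sequential={sequential:.3f}s parallel={parallel:.3f}s")
```

With five 50ms calls, the sequential loop takes roughly 250ms while the gathered version finishes in roughly the time of a single call.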
In Python, use asyncio.gather() for async I/O-bound calls, or ThreadPoolExecutor for wrapping synchronous HTTP clients:
import asyncio
import httpx
async def fetch_parallel(requests: list[dict]) -> list:
    async with httpx.AsyncClient(timeout=0.2) as client:
        tasks = [client.get(r['url'], params=r.get('params', {})) for r in requests]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results
The return_exceptions=True flag is essential: it prevents a single failed downstream from cancelling all other in-flight requests. Each result is either a response object or an exception, which the assembly step handles independently.
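A minimal sketch of that per-result handling, with simulated downstreams and hypothetical service names (the `fail` flag stands in for a real downstream error):

```python
import asyncio

async def fetch(name: str, fail: bool = False) -> dict:
    # Stand-in for a downstream call.
    await asyncio.sleep(0.01)
    if fail:
        raise RuntimeError(f"{name} unavailable")
    return {"service": name, "data": "ok"}

async def main() -> tuple[list, list]:
    results = await asyncio.gather(
        fetch("catalog"),
        fetch("pricing", fail=True),
        fetch("reviews"),
        return_exceptions=True,
    )
    # Each element is either a return value or the raised exception instance.
    successes = [r for r in results if not isinstance(r, BaseException)]
    failures = [r for r in results if isinstance(r, BaseException)]
    return successes, failures

successes, failures = asyncio.run(main())
print(len(successes), len(failures))
```

The failed pricing call surfaces as a `RuntimeError` object in the results list while the other two calls complete normally.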
Partial Failure Handling
Distributed systems fail partially. The aggregator must not return an error to the client just because one of five downstream services is down. Instead:
- For non-critical fields (e.g., recommendations, reviews), return a fallback default or cached last-known value and mark the field with an error indicator.
- For critical fields (e.g., price, availability), the aggregator may choose to fail the entire request or return a degraded response with a clear indicator.
- Log every partial failure with the downstream service name and error type for observability.
The response schema should accommodate partial data: use nullable fields or an explicit errors array at the top level to communicate which services failed.
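One possible shape for such a schema; the field names here, including the `_errors` key and the error message, are illustrative:

```python
import json

# Hypothetical assembled response after the reviews service failed:
# critical fields are present, the failed field falls back to a default,
# and a top-level errors array names the failing service.
partial_response = {
    "product": {"id": 42, "name": "Widget"},
    "price": {"amount": 19.99, "currency": "USD"},
    "stock": {"available": True},
    "reviews": [],  # fallback default for the failed service
    "_errors": [
        {"service": "reviews", "error": "timeout after 300ms"},
    ],
}

print(json.dumps(partial_response, indent=2))
```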
Per-Downstream Timeout
Each downstream service gets its own timeout (e.g., 200ms). If a service does not respond within its timeout:
- Return the cached last-known value for that field if available.
- Otherwise, omit the field and include it in the errors list.
- Do not let the slow service block the response for all other successfully fetched data.
Set per-service timeouts based on the service's P99 latency plus a small buffer — not a single global timeout. A reporting service might tolerate 500ms while a pricing service should respond within 100ms.
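A sketch of deriving the threshold from observed latency samples; the 20% buffer and the sample distributions are assumptions for illustration:

```python
import statistics

def timeout_from_samples(latency_samples_ms: list[float], buffer: float = 0.2) -> int:
    """Derive a per-service timeout: P99 latency plus a relative buffer."""
    # quantiles(n=100) returns the 1st..99th percentile cut points;
    # the last one is the 99th percentile.
    p99 = statistics.quantiles(latency_samples_ms, n=100)[-1]
    return int(p99 * (1 + buffer))

# Hypothetical latency samples for two services with different profiles.
pricing_samples = [40.0 + (i % 30) for i in range(500)]   # tight distribution
reviews_samples = [80.0 + (i % 180) for i in range(500)]  # long tail

print(timeout_from_samples(pricing_samples))
print(timeout_from_samples(reviews_samples))
```

Recomputing these thresholds periodically from live latency histograms keeps timeouts tracking each service's actual behavior rather than a guess made at deploy time.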
Response Caching at the Aggregator
Assembled responses can be cached at the aggregator level, avoiding redundant fan-out for identical requests. The cache key is typically a hash of the request parameters. The TTL is min(individual service TTLs) — the assembled response cannot be fresher than its least-fresh component.
Caching is especially valuable for high-read, low-write data (product pages, public profiles) where the same composition is served to many users. Cache invalidation follows the same cache-aside pattern: on any downstream data change, invalidate the affected aggregated responses.
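A minimal sketch of both pieces, the key derivation and the min-TTL rule; the TTL values and key prefix are illustrative:

```python
import hashlib
import json

def aggregate_cache_key(plan_id: str, params: dict) -> str:
    """Cache key: stable hash of the plan id plus sorted request parameters."""
    raw = plan_id + json.dumps(params, sort_keys=True)
    return "agg:" + hashlib.sha256(raw.encode()).hexdigest()[:16]

def aggregate_ttl(service_ttls: dict[str, int]) -> int:
    """The assembled response is only as fresh as its least-fresh component."""
    return min(service_ttls.values())

key = aggregate_cache_key("product-detail", {"product_id": 42})
ttl = aggregate_ttl({"catalog": 300, "pricing": 30, "inventory": 10, "reviews": 600})
print(key, ttl)  # ttl is 10, bounded by the inventory service
```

Sorting the parameters before hashing makes the key order-insensitive, so `{"a": 1, "b": 2}` and `{"b": 2, "a": 1}` hit the same cache entry.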
Schema Stitching (GraphQL)
In GraphQL-based architectures, schema stitching (or federation) is a form of API composition at the type system level. Each downstream service exposes its own GraphQL schema. The aggregator gateway stitches them into a unified schema and resolves each field from the appropriate service. Apollo Federation uses an @key directive to declare entity ownership; the gateway routes field resolution to the owning service automatically.
Circuit Breaker Per Downstream
A circuit breaker prevents the aggregator from repeatedly calling a failing downstream service. After a configurable number of consecutive failures, the circuit opens and the aggregator returns a cached fallback immediately without attempting the call. After a cooldown period, the circuit moves to half-open and allows a single probe request. On success, the circuit closes and normal routing resumes.
Implement per-service circuit breakers, not a global one. One failing recommendation service should not open the circuit for the catalog service.
SQL: Schema Design
-- Downstream service registry
CREATE TABLE DownstreamService (
    id SERIAL PRIMARY KEY,
    name VARCHAR(128) NOT NULL UNIQUE,
    base_url TEXT NOT NULL,
    timeout_ms INT NOT NULL DEFAULT 200,
    cache_ttl_seconds INT NOT NULL DEFAULT 60,
    circuit_breaker_threshold INT NOT NULL DEFAULT 5, -- failures before open
    created_at TIMESTAMP NOT NULL DEFAULT NOW()
);
-- Aggregation plan: which services to call and how to join
CREATE TABLE AggregationPlan (
    id SERIAL PRIMARY KEY,
    name VARCHAR(128) NOT NULL UNIQUE,
    steps JSONB NOT NULL, -- [{service_id, endpoint, field_mapping}]
    cache_ttl_seconds INT NOT NULL DEFAULT 30,
    created_at TIMESTAMP NOT NULL DEFAULT NOW()
);
-- Metrics per plan execution
CREATE TABLE CompositionMetric (
    id BIGSERIAL PRIMARY KEY,
    plan_id INT NOT NULL REFERENCES AggregationPlan(id),
    success_count BIGINT NOT NULL DEFAULT 0,
    partial_failure_count BIGINT NOT NULL DEFAULT 0,
    full_failure_count BIGINT NOT NULL DEFAULT 0,
    avg_latency_ms FLOAT NOT NULL DEFAULT 0,
    sampled_at TIMESTAMP NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_composition_metric_plan_time ON CompositionMetric(plan_id, sampled_at DESC);
Python: AggregatorService
import asyncio
import hashlib
import json
import time
from dataclasses import dataclass, field
from typing import Any, Optional
import httpx
import redis
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
@dataclass
class ServiceRequest:
    service_name: str
    url: str
    params: dict = field(default_factory=dict)
    timeout_ms: int = 200
    field_key: str = ''  # key in assembled response

@dataclass
class ServiceResult:
    service_name: str
    field_key: str
    data: Optional[Any] = None
    error: Optional[str] = None

    @property
    def ok(self) -> bool:
        return self.error is None
# Simple in-process circuit breaker
class CircuitBreaker:
    def __init__(self, threshold: int = 5, cooldown_seconds: int = 30):
        self.threshold = threshold
        self.cooldown = cooldown_seconds
        self.failures: dict[str, int] = {}
        self.open_until: dict[str, float] = {}

    def is_open(self, service: str) -> bool:
        if service in self.open_until:
            if time.monotonic() < self.open_until[service]:
                return True
            # Cooldown expired: reset and allow traffic again. This is a
            # simplified half-open; a stricter implementation would admit
            # only a single probe request before fully closing.
            del self.open_until[service]
            self.failures[service] = 0
        return False

    def record_failure(self, service: str):
        self.failures[service] = self.failures.get(service, 0) + 1
        if self.failures[service] >= self.threshold:
            self.open_until[service] = time.monotonic() + self.cooldown
            print(f'Circuit OPEN for {service}')

    def record_success(self, service: str):
        self.failures[service] = 0

cb = CircuitBreaker()
class AggregatorService:
    def __init__(self, redis_client: redis.Redis):
        self.r = redis_client

    def _cache_key(self, plan_id: str, requests: list[ServiceRequest]) -> str:
        raw = plan_id + json.dumps([req.params for req in requests], sort_keys=True)
        return 'agg:' + hashlib.sha256(raw.encode()).hexdigest()[:16]

    async def _fetch_one(self, client: httpx.AsyncClient, req: ServiceRequest) -> ServiceResult:
        if cb.is_open(req.service_name):
            return ServiceResult(req.service_name, req.field_key, error='circuit_open')
        try:
            resp = await client.get(
                req.url,
                params=req.params,
                timeout=req.timeout_ms / 1000,
            )
            resp.raise_for_status()
            cb.record_success(req.service_name)
            return ServiceResult(req.service_name, req.field_key, data=resp.json())
        except Exception as exc:
            cb.record_failure(req.service_name)
            return ServiceResult(req.service_name, req.field_key, error=str(exc))

    async def fetch_parallel(self, requests: list[ServiceRequest]) -> list[ServiceResult]:
        async with httpx.AsyncClient() as client:
            tasks = [self._fetch_one(client, req) for req in requests]
            return await asyncio.gather(*tasks)

    def assemble_response(
        self,
        results: list[ServiceResult],
        fallbacks: Optional[dict] = None,
    ) -> dict:
        fallbacks = fallbacks or {}
        response: dict[str, Any] = {}
        errors = []
        for result in results:
            if result.ok:
                response[result.field_key] = result.data
            else:
                errors.append({'service': result.service_name, 'error': result.error})
                response[result.field_key] = fallbacks.get(result.field_key)
        if errors:
            response['_errors'] = errors
        return response

    def cache_response(self, plan_id: str, requests: list[ServiceRequest], response: dict, ttl: int):
        key = self._cache_key(plan_id, requests)
        self.r.setex(key, ttl, json.dumps(response))

    def get_cached(self, plan_id: str, requests: list[ServiceRequest]) -> Optional[dict]:
        key = self._cache_key(plan_id, requests)
        raw = self.r.get(key)
        return json.loads(raw) if raw else None

    async def compose(
        self,
        plan_id: str,
        requests: list[ServiceRequest],
        ttl: int = 30,
        fallbacks: Optional[dict] = None,
    ) -> dict:
        cached = self.get_cached(plan_id, requests)
        if cached:
            print(f'Cache hit for plan {plan_id}')
            return cached
        results = await self.fetch_parallel(requests)
        response = self.assemble_response(results, fallbacks)
        self.cache_response(plan_id, requests, response, ttl)
        return response
# Usage example
async def main():
    aggregator = AggregatorService(r)
    requests = [
        ServiceRequest('catalog', 'http://catalog/products/42', field_key='product'),
        ServiceRequest('pricing', 'http://pricing/price/42', field_key='price', timeout_ms=100),
        ServiceRequest('inventory', 'http://inventory/stock/42', field_key='stock'),
        ServiceRequest('reviews', 'http://reviews/product/42', field_key='reviews', timeout_ms=300),
    ]
    response = await aggregator.compose(
        plan_id='product-detail',
        requests=requests,
        ttl=30,
        fallbacks={'reviews': [], 'stock': {'available': False}},
    )
    print(json.dumps(response, indent=2))

asyncio.run(main())
Design Trade-offs Summary
| Concern | Approach | Trade-off |
|---|---|---|
| Latency | Parallel fan-out (asyncio.gather) | Latency = max(services); adds aggregator hop |
| Partial failure | Fallbacks + error array | Degraded but useful response; requires fallback design |
| Slow downstream | Per-service timeout | Fast response; may omit field on transient slowness |
| Repeated fan-out | Aggregator-level cache | Low latency for hot requests; cache invalidation complexity |
| Cascading failures | Per-service circuit breaker | Fast fallback; brief window of stale data on open circuit |