Low Level Design: Schema Registry

Overview

A schema registry is a centralized store for event and message schemas used in event-driven architectures. Producers register schemas before publishing; consumers fetch schemas to deserialize messages. This design covers Avro, JSON Schema, and Protobuf with compatibility enforcement and Kafka wire-format integration, modeled on Confluent Schema Registry.

Data Model

Schema Table

CREATE TABLE schemas (
    id                SERIAL PRIMARY KEY,
    subject           VARCHAR(255) NOT NULL,       -- e.g. "payments-value"
    version           INT NOT NULL,
    schema_definition TEXT NOT NULL,               -- raw Avro/JSON Schema/Protobuf
    schema_type       VARCHAR(16) NOT NULL DEFAULT 'AVRO'
                      CHECK (schema_type IN ('AVRO','JSON','PROTOBUF')),
    fingerprint       CHAR(64) NOT NULL,            -- SHA-256 of normalized schema
    created_at        TIMESTAMP DEFAULT NOW(),
    is_deleted        BOOLEAN DEFAULT FALSE,        -- soft delete (tombstone)
    UNIQUE (subject, version),
    UNIQUE (fingerprint)                            -- dedup: same schema = same ID
);

Note: the global UNIQUE on fingerprint gives one row per schema body, so reusing one schema across subjects (RecordNameStrategy) requires splitting schema bodies from subject-version mappings into separate tables, as Confluent does internally. Postgres has no inline ENUM type, so the type columns use VARCHAR plus a CHECK constraint.

Subject Config Table

CREATE TABLE subject_config (
    subject           VARCHAR(255) PRIMARY KEY,
    compatibility     VARCHAR(16) NOT NULL DEFAULT 'BACKWARD'
                      CHECK (compatibility IN ('BACKWARD','FORWARD','FULL','NONE')),
    updated_at        TIMESTAMP DEFAULT NOW()
);
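Most subjects never get a row here and fall back to the registry-wide default. A minimal lookup sketch, with an in-memory dict standing in for the table (DEFAULT_COMPATIBILITY is an assumed constant, not part of the schema above):

```python
# Hypothetical in-memory stand-in for the subject_config table.
DEFAULT_COMPATIBILITY = "BACKWARD"

subject_config = {
    "payments-value": "FULL",   # stricter override for a critical subject
}

def effective_compatibility(subject: str) -> str:
    """Resolve a subject's compatibility mode, falling back to the global default."""
    return subject_config.get(subject, DEFAULT_COMPATIBILITY)
```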

Subject Naming Convention

-- Topic "payments", key schema
subject = "payments-key"

-- Topic "payments", value schema
subject = "payments-value"

-- Record name strategy (alternative)
subject = "com.example.Payment"

The TopicNameStrategy (topic + "-key"/"-value") is the default. RecordNameStrategy allows schema reuse across topics. The strategy is set per producer and must match the consumer.
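The strategies above can be sketched as one resolution function (the function name is illustrative; TopicRecordNameStrategy, Confluent's third built-in strategy, is included for completeness):

```python
def subject_name(strategy: str, topic: str, record_fullname: str, is_key: bool = False) -> str:
    """Derive the registry subject from the configured naming strategy."""
    suffix = "key" if is_key else "value"
    if strategy == "TopicNameStrategy":          # default: one subject per topic key/value
        return f"{topic}-{suffix}"
    if strategy == "RecordNameStrategy":         # one subject per record type, reusable across topics
        return record_fullname
    if strategy == "TopicRecordNameStrategy":    # scoped to both topic and record type
        return f"{topic}-{record_fullname}"
    raise ValueError(f"unknown naming strategy: {strategy}")
```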

Compatibility Modes

BACKWARD  — new schema can read data written by old schema (default; safe for consumers first)
FORWARD   — old schema can read data written by new schema (safe for producers first)
FULL      — both BACKWARD and FORWARD (most restrictive)
NONE      — no compatibility check (use only in dev)

Avro Compatibility Check Algorithm

def is_backward_compatible(new_schema: dict, old_schema: dict) -> bool:
    """
    New schema (reader) is backward compatible with old schema (writer) if:
    - Fields removed in new_schema are fine: the new reader simply ignores them
    - Fields added in new_schema must have defaults (old data lacks them)
    - No breaking type changes (int -> long is a legal promotion; int -> string is not)
    Type-promotion checking is omitted here for brevity.
    """
    old_fields = {f["name"]: f for f in old_schema.get("fields", [])}
    new_fields = {f["name"]: f for f in new_schema.get("fields", [])}

    # Removed fields need no check for BACKWARD: old data carries a value,
    # and the new reader skips fields it does not declare.

    # Added fields must have defaults so the reader can fill them in
    # when decoding old data that predates the field.
    for name, field in new_fields.items():
        if name not in old_fields and "default" not in field:
            return False

    return True
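FORWARD and FULL checks fall out of the backward check by swapping the reader and writer roles. A self-contained sketch (field-presence rules only; type promotion is again omitted):

```python
def is_backward_compatible(new_schema: dict, old_schema: dict) -> bool:
    # New reader vs. old writer: every field added in the new schema needs a default.
    old_names = {f["name"] for f in old_schema.get("fields", [])}
    return all(
        f["name"] in old_names or "default" in f
        for f in new_schema.get("fields", [])
    )

def is_forward_compatible(new_schema: dict, old_schema: dict) -> bool:
    # Old reader vs. new writer: the same check with roles swapped.
    return is_backward_compatible(old_schema, new_schema)

def is_full_compatible(new_schema: dict, old_schema: dict) -> bool:
    # FULL = BACKWARD and FORWARD.
    return (is_backward_compatible(new_schema, old_schema)
            and is_forward_compatible(new_schema, old_schema))
```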

REST API

## Register a new schema version
POST /subjects/{subject}/versions
Content-Type: application/vnd.schemaregistry.v1+json

{"schema": "{\"type\":\"record\",\"name\":\"Payment\",...}"}

Response 200: {"id": 42}
Response 409: {"error_code": 409, "message": "Schema being registered is incompatible"}
Response 422: {"error_code": 422, "message": "Invalid schema"}
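The schema travels as a JSON-encoded string inside the JSON body, which is why its inner quotes appear escaped. A sketch of building that body (the function name is illustrative; `schemaType` is the field Confluent uses for non-Avro schemas):

```python
import json

def registration_payload(schema: dict, schema_type: str = "AVRO") -> str:
    """Build the POST body: the schema is itself a JSON-encoded string field,
    so json.dumps escapes its inner quotes automatically."""
    body = {"schema": json.dumps(schema)}
    if schema_type != "AVRO":            # AVRO is the implied default
        body["schemaType"] = schema_type
    return json.dumps(body)
```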

## Get latest schema for a subject
GET /subjects/{subject}/versions/latest
Response: {"subject":"payments-value","version":7,"id":42,"schema":"..."}

## Get schema by global ID (used by consumers)
GET /schemas/ids/{id}
Response: {"schema": "{...}"}

## List all versions of a subject
GET /subjects/{subject}/versions
Response: [1, 2, 3, 4, 5, 6, 7]

## Check compatibility before registering
POST /compatibility/subjects/{subject}/versions/latest
{"schema": "{\"type\":\"record\",...}"}        -- candidate schema in the request body
Response: {"is_compatible": true}

## Soft delete a subject (tombstone)
DELETE /subjects/{subject}
Response: [1, 2, 3, 4, 5, 6, 7]   -- deleted version numbers

Schema Fingerprint Deduplication

import hashlib, json

def normalize_and_fingerprint(schema_str: str) -> str:
    """Canonical form: sorted keys, no whitespace."""
    parsed = json.loads(schema_str)
    canonical = json.dumps(parsed, sort_keys=True, separators=(',', ':'))
    return hashlib.sha256(canonical.encode()).hexdigest()

# On POST /subjects/:subject/versions:
fp = normalize_and_fingerprint(incoming_schema)
existing = db.query("SELECT id FROM schemas WHERE fingerprint = %s", fp)
if existing:
    return {"id": existing.id}   # same schema already registered, return existing ID
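Two spellings of the same schema hash to the same fingerprint. (Caveat: key-sorted JSON is an approximation; Avro's Parsing Canonical Form additionally strips attributes such as doc that don't affect serialization.)

```python
import hashlib, json

def normalize_and_fingerprint(schema_str: str) -> str:
    """Canonical form: sorted keys, no whitespace, then SHA-256."""
    parsed = json.loads(schema_str)
    canonical = json.dumps(parsed, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode()).hexdigest()

# Same schema, different whitespace and key order:
a = '{"type": "record", "name": "Payment", "fields": []}'
b = '{"fields":[],"name":"Payment","type":"record"}'
```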

Kafka Wire Format

Byte layout of a Kafka message value (Confluent wire format):

 0        1        2        3        4        5       ...
+--------+--------+--------+--------+--------+--------+---
| magic  |        schema_id (4 bytes, big-endian)     | avro payload ...
| 0x00   |                                            |
+--------+--------------------------------------------+---

Magic byte = 0x00 (protocol version marker)
Schema ID  = 4-byte big-endian int32 (looked up in schema registry)
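Framing and unframing can be sketched with the standard struct module (the payload here is opaque bytes standing in for the Avro-encoded record):

```python
import struct

MAGIC_BYTE = 0

def frame(schema_id: int, payload: bytes) -> bytes:
    """Prepend Confluent framing: magic byte, then big-endian int32 schema ID."""
    return struct.pack(">bi", MAGIC_BYTE, schema_id) + payload

def unframe(message: bytes) -> tuple[int, bytes]:
    """Split a framed message back into (schema_id, payload)."""
    magic, schema_id = struct.unpack(">bi", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, message[5:]
```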

## Python producer (confluent-kafka + fastavro)
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField

sr_client = SchemaRegistryClient({"url": "http://schema-registry:8081"})
# schema_str holds the Avro schema JSON for the "payments-value" subject
avro_serializer = AvroSerializer(sr_client, schema_str, conf={"auto.register.schemas": False})

producer = Producer({"bootstrap.servers": "kafka:9092"})
producer.produce(
    topic="payments",
    value=avro_serializer(payment_dict, SerializationContext("payments", MessageField.VALUE))
)

Schema Caching in Producers and Consumers

from collections import OrderedDict

from confluent_kafka.schema_registry import SchemaRegistryClient

class SchemaCache:
    def __init__(self, registry_url: str, capacity: int = 1000):
        self.client = SchemaRegistryClient({"url": registry_url})  # client takes a config dict
        self.cache: OrderedDict[int, str] = OrderedDict()  # LRU by schema_id
        self.capacity = capacity

    def get_schema(self, schema_id: int) -> str:
        if schema_id in self.cache:
            self.cache.move_to_end(schema_id)
            return self.cache[schema_id]
        schema = self.client.get_schema(schema_id).schema_str
        self.cache[schema_id] = schema
        if len(self.cache) > self.capacity:
            self.cache.popitem(last=False)  # evict LRU
        return schema

Cache hit rate is typically 99%+ because schema IDs are stable once registered. Producers cache schema ID by subject+version; consumers cache schema string by ID.
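The LRU behavior can be exercised with a stub in place of the registry client (StubRegistryClient and its fetch counter are illustrative, not part of confluent-kafka; the cache logic mirrors the class above):

```python
from collections import OrderedDict

class StubRegistryClient:
    """Hypothetical stand-in for the registry client: counts remote fetches."""
    def __init__(self):
        self.fetches = 0
    def get_schema_str(self, schema_id: int) -> str:
        self.fetches += 1
        return f'{{"type":"record","id":{schema_id}}}'

class SchemaCache:
    def __init__(self, client, capacity: int = 2):
        self.client, self.capacity = client, capacity
        self.cache: OrderedDict[int, str] = OrderedDict()
    def get_schema(self, schema_id: int) -> str:
        if schema_id in self.cache:
            self.cache.move_to_end(schema_id)   # refresh recency on hit
            return self.cache[schema_id]
        schema = self.client.get_schema_str(schema_id)
        self.cache[schema_id] = schema
        if len(self.cache) > self.capacity:
            self.cache.popitem(last=False)      # evict least recently used
        return schema
```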

Schema Evolution Examples

-- Safe (BACKWARD compatible): add optional field with default
{"name": "currency", "type": "string", "default": "USD"}

-- Safe: remove a field (old data still has bytes; new reader skips them)

-- UNSAFE: rename a field (old data has old name, new reader won't find it)
-- UNSAFE: change type int -> string
-- UNSAFE: remove a field's default (breaks reading old data lacking the field)

Operational Notes

  • Deploy schema registry with 3+ replicas behind a load balancer; use a shared Postgres or Kafka-backed store (not in-memory) so all nodes agree.
  • Set BACKWARD compatibility by default; override to NONE only in dev namespaces.
  • Pre-register schemas in CI before deployment using /compatibility endpoint — fail the pipeline on incompatible schemas.
  • Monitor: schema count per subject, registration rate, compatibility check failure rate.
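The CI pre-check in the list above can be a small script that POSTs to the /compatibility endpoint and fails the build on an incompatible result. A sketch with the HTTP call left out (endpoint path per the REST section; function names and the base URL are illustrative):

```python
import json
from urllib.request import Request

def compatibility_request(base_url: str, subject: str, schema: dict) -> Request:
    """Build the POST against /compatibility/subjects/{subject}/versions/latest."""
    body = json.dumps({"schema": json.dumps(schema)}).encode()
    return Request(
        f"{base_url}/compatibility/subjects/{subject}/versions/latest",
        data=body,
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        method="POST",
    )

def ci_gate(response_body: str) -> int:
    """Pipeline exit code from the registry's response: 0 if compatible, 1 otherwise."""
    return 0 if json.loads(response_body).get("is_compatible") else 1
```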

Interview Tips

  • Interviewers expect you to know the Confluent wire format (magic byte + 4-byte schema ID) — it comes up whenever Kafka serialization is discussed.
  • Explain why fingerprint deduplication matters: without it, registering the same schema twice creates two IDs, bloating the registry and confusing consumers.
  • BACKWARD vs FORWARD confusion is a common trap — mnemonic: BACKWARD = “new code, old data” (consumers upgrade first); FORWARD = “old code, new data” (producers upgrade first).


