Overview
A schema registry is a centralized store for event and message schemas used in event-driven architectures. Producers register schemas before publishing; consumers fetch schemas to deserialize messages. This design covers Avro, JSON Schema, and Protobuf with compatibility enforcement and Kafka wire-format integration — as implemented in Confluent Schema Registry.
Data Model
Schema Table
CREATE TABLE schemas (
    id SERIAL PRIMARY KEY,
    subject VARCHAR(255) NOT NULL,        -- e.g. "payments-value"
    version INT NOT NULL,
    schema_definition TEXT NOT NULL,      -- raw Avro/JSON Schema/Protobuf
    schema_type VARCHAR(16) NOT NULL DEFAULT 'AVRO'
        CHECK (schema_type IN ('AVRO', 'JSON', 'PROTOBUF')),
    fingerprint CHAR(64) NOT NULL,        -- SHA-256 of normalized schema
    created_at TIMESTAMP DEFAULT NOW(),
    is_deleted BOOLEAN DEFAULT FALSE,     -- soft delete (tombstone)
    UNIQUE (subject, version),
    UNIQUE (fingerprint)                  -- dedup: same schema = same ID
);
Subject Config Table
CREATE TABLE subject_config (
    subject VARCHAR(255) PRIMARY KEY,
    compatibility VARCHAR(16) NOT NULL DEFAULT 'BACKWARD'
        CHECK (compatibility IN ('BACKWARD', 'FORWARD', 'FULL', 'NONE')),
    updated_at TIMESTAMP DEFAULT NOW()
);
Subject Naming Convention
-- Topic "payments", key schema
subject = "payments-key"
-- Topic "payments", value schema
subject = "payments-value"
-- Record name strategy (alternative)
subject = "com.example.Payment"
The TopicNameStrategy (topic + "-key"/"-value") is the default. RecordNameStrategy allows schema reuse across topics. The strategy is set per producer and must match the consumer.
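The strategy-to-subject mapping can be sketched as a small helper. The function name `subject_for` is illustrative; `TopicRecordNameStrategy` is a third built-in Confluent strategy combining both:

```python
def subject_for(strategy: str, topic: str, record_fullname: str,
                field: str = "value") -> str:
    """Derive the registry subject for a message under a naming strategy."""
    if strategy == "TopicNameStrategy":
        return f"{topic}-{field}"       # default: "payments-value" / "payments-key"
    if strategy == "RecordNameStrategy":
        return record_fullname          # reusable across topics
    if strategy == "TopicRecordNameStrategy":
        return f"{topic}-{record_fullname}"
    raise ValueError(f"unknown strategy: {strategy}")
```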
Compatibility Modes
BACKWARD — new schema can read data written by old schema (default; safe for consumers first)
FORWARD — old schema can read data written by new schema (safe for producers first)
FULL — both BACKWARD and FORWARD (most restrictive)
NONE — no compatibility check (use only in dev)
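All four modes reduce to at most two reader/writer checks. A minimal dispatcher, assuming a predicate `can_read(reader, writer)` that tests whether the reader schema can decode data written with the writer schema (both names are illustrative):

```python
def is_allowed(mode: str, new, old, can_read) -> bool:
    """Decide whether registering `new` after `old` is allowed under `mode`."""
    if mode == "BACKWARD":
        return can_read(new, old)                         # new code reads old data
    if mode == "FORWARD":
        return can_read(old, new)                         # old code reads new data
    if mode == "FULL":
        return can_read(new, old) and can_read(old, new)  # both directions
    if mode == "NONE":
        return True                                       # no check
    raise ValueError(f"unknown mode: {mode}")
```

With a toy `can_read` (reader must know every writer field), adding a field passes BACKWARD but fails FORWARD, as expected.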
Avro Compatibility Check Algorithm
def is_backward_compatible(new_schema: dict, old_schema: dict) -> bool:
    """
    New schema is backward compatible with old schema (new reader, old data) if:
    - Fields removed in new_schema are fine: the new reader ignores them
    - Fields added in new_schema have defaults (old data lacks them)
    - No field type changes that break deserialization (Avro promotions
      like int -> long are ok; int -> string is not)
    """
    # Numeric promotions the Avro spec allows (writer type -> legal reader types)
    PROMOTIONS = {
        "int": {"long", "float", "double"},
        "long": {"float", "double"},
        "float": {"double"},
    }
    old_fields = {f["name"]: f for f in old_schema.get("fields", [])}
    new_fields = {f["name"]: f for f in new_schema.get("fields", [])}
    for name, field in new_fields.items():
        if name not in old_fields:
            # Added field: needs a default so old data (which lacks it) is readable
            if "default" not in field:
                return False
            continue
        old_type, new_type = old_fields[name]["type"], field["type"]
        # Type must be unchanged or a legal promotion (simple types only here)
        if new_type != old_type and new_type not in PROMOTIONS.get(old_type, set()):
            return False
    # Fields removed in new_schema need no check for BACKWARD: old data still
    # carries their bytes, and the new reader simply skips unknown fields.
    return True
REST API
## Register a new schema version
POST /subjects/{subject}/versions
Content-Type: application/vnd.schemaregistry.v1+json
{"schema": "{\"type\":\"record\",\"name\":\"Payment\",...}"}
Response 200: {"id": 42}
Response 409: {"error_code": 409, "message": "Schema being registered is incompatible"}
Response 422: {"error_code": 422, "message": "Invalid schema"}
## Get latest schema for a subject
GET /subjects/{subject}/versions/latest
Response: {"subject":"payments-value","version":7,"id":42,"schema":"..."}
## Get schema by global ID (used by consumers)
GET /schemas/ids/{id}
Response: {"schema": "{...}"}
## List all versions of a subject
GET /subjects/{subject}/versions
Response: [1, 2, 3, 4, 5, 6, 7]
## Check compatibility before registering
POST /compatibility/subjects/{subject}/versions/latest
Content-Type: application/vnd.schemaregistry.v1+json
{"schema": "..."} -- candidate schema to test
Response: {"is_compatible": true}
## Soft delete a subject (tombstone)
DELETE /subjects/{subject}
Response: [1, 2, 3, 4, 5, 6, 7] -- deleted version numbers
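The register call can be exercised from a script. A sketch using only the standard library (the helper name and `REGISTRY` URL are assumptions); note the schema travels as a JSON-escaped string inside the `schema` field, so it is encoded twice:

```python
import json
from urllib import request

REGISTRY = "http://schema-registry:8081"  # assumed deployment URL

def build_register_request(subject: str, schema: dict) -> request.Request:
    """Build POST /subjects/{subject}/versions; the schema is double-encoded."""
    body = json.dumps({"schema": json.dumps(schema)}).encode()
    return request.Request(
        f"{REGISTRY}/subjects/{subject}/versions",
        data=body,
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        method="POST",
    )

# req = build_register_request("payments-value", payment_schema)
# with request.urlopen(req) as resp:      # returns {"id": ...} on success
#     schema_id = json.loads(resp.read())["id"]
```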
Schema Fingerprint Deduplication
import hashlib, json
def normalize_and_fingerprint(schema_str: str) -> str:
"""Canonical form: sorted keys, no whitespace."""
parsed = json.loads(schema_str)
canonical = json.dumps(parsed, sort_keys=True, separators=(',', ':'))
return hashlib.sha256(canonical.encode()).hexdigest()
# On POST /subjects/:subject/versions:
fp = normalize_and_fingerprint(incoming_schema)
existing = db.query("SELECT id FROM schemas WHERE fingerprint = %s", fp)
if existing:
return {"id": existing.id} # same schema already registered, return existing ID
Kafka Wire Format
Byte layout of a Kafka message value (Confluent wire format):
byte:   0        1        2        3        4        5 ...
      +--------+--------+--------+--------+--------+---
      | magic  |  schema_id (4 bytes, big-endian)  | avro payload ...
      |  0x00  |                                   |
      +--------+-----------------------------------+---
Magic byte = 0x00 (protocol version marker)
Schema ID = 4-byte big-endian int32 (looked up in schema registry)
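Framing and parsing this layout is five bytes of `struct` work; a self-contained sketch (the function names are illustrative):

```python
import struct

MAGIC_BYTE = 0x00

def frame(schema_id: int, payload: bytes) -> bytes:
    """Prepend the Confluent framing: magic byte + big-endian 4-byte schema ID."""
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + payload

def unframe(message: bytes) -> tuple[int, bytes]:
    """Split a framed message into (schema_id, payload); reject unknown magic."""
    if len(message) < 5:
        raise ValueError("message shorter than 5-byte header")
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic:#x}")
    return schema_id, message[5:]
```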
## Python producer (confluent-kafka + fastavro)
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField

# schema_str (Avro schema JSON) and payment_dict are assumed defined elsewhere
sr_client = SchemaRegistryClient({"url": "http://schema-registry:8081"})
avro_serializer = AvroSerializer(sr_client, schema_str, conf={"auto.register.schemas": False})
producer = Producer({"bootstrap.servers": "kafka:9092"})
producer.produce(
    topic="payments",
    value=avro_serializer(payment_dict, SerializationContext("payments", MessageField.VALUE)),
)
producer.flush()
Schema Caching in Producers and Consumers
from collections import OrderedDict
from confluent_kafka.schema_registry import SchemaRegistryClient

class SchemaCache:
    def __init__(self, registry_url: str, capacity: int = 1000):
        self.client = SchemaRegistryClient({"url": registry_url})
        self.cache: OrderedDict[int, str] = OrderedDict()  # LRU by schema_id
        self.capacity = capacity

    def get_schema(self, schema_id: int) -> str:
        if schema_id in self.cache:
            self.cache.move_to_end(schema_id)  # mark as most recently used
            return self.cache[schema_id]
        schema = self.client.get_schema(schema_id).schema_str
        self.cache[schema_id] = schema
        if len(self.cache) > self.capacity:
            self.cache.popitem(last=False)  # evict least recently used
        return schema
Cache hit rate is typically 99%+ because schema IDs are stable once registered. Producers cache schema ID by subject+version; consumers cache schema string by ID.
Schema Evolution Examples
-- Safe (BACKWARD compatible): add optional field with default
{"name": "currency", "type": "string", "default": "USD"}
-- Safe: remove a field (old data still has bytes; new reader skips them)
-- UNSAFE: rename a field (old data has old name, new reader won't find it)
-- UNSAFE: change type int -> string
-- UNSAFE: remove a field's default (breaks reading old data lacking the field)
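These rules can be spot-checked with a stripped-down version of the backward check from earlier (only the added-field rule, enough to separate the first safe case from the no-default unsafe case):

```python
def adds_are_safe(new_schema: dict, old_schema: dict) -> bool:
    """BACKWARD rule for additions: every field new to the schema needs a default."""
    old_names = {f["name"] for f in old_schema["fields"]}
    return all(
        f["name"] in old_names or "default" in f
        for f in new_schema["fields"]
    )

base = {"fields": [{"name": "amount", "type": "long"}]}
safe = {"fields": base["fields"] + [
    {"name": "currency", "type": "string", "default": "USD"}]}
unsafe = {"fields": base["fields"] + [
    {"name": "currency", "type": "string"}]}  # no default: old data unreadable
```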
Operational Notes
- Deploy schema registry with 3+ replicas behind a load balancer; use a shared Postgres or Kafka-backed store (not in-memory) so all nodes agree.
- Set BACKWARD compatibility by default; override to NONE only in dev namespaces.
- Pre-register schemas in CI before deployment using the /compatibility endpoint; fail the pipeline on incompatible schemas.
- Monitor: schema count per subject, registration rate, compatibility check failure rate.
Interview Tips
- Interviewers expect you to know the Confluent wire format (magic byte + 4-byte schema ID) — it comes up whenever Kafka serialization is discussed.
- Explain why fingerprint deduplication matters: without it, registering the same schema twice creates two IDs, bloating the registry and confusing consumers.
- BACKWARD vs FORWARD confusion is a common trap — mnemonic: BACKWARD = “new code, old data” (consumers upgrade first); FORWARD = “old code, new data” (producers upgrade first).