Event replay re-processes historical events through a consumer to rebuild state, backfill a new downstream system, fix a processing bug, or audit system behavior. It is a critical operational capability for event-sourced systems, CDC pipelines, and Kafka-based architectures. The design challenges are: accessing historical events (retention policy, storage), making replay idempotent (processing the same event twice has the same effect as processing it once), and isolating replay traffic from live processing so that a replay job does not cause duplicate side effects in production systems.
Core Data Model (Event Store)
CREATE TABLE Event (
event_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_id VARCHAR(100) NOT NULL, -- the entity this event belongs to
aggregate_type VARCHAR(50) NOT NULL, -- 'Order', 'User', 'Payment'
event_type VARCHAR(100) NOT NULL, -- 'OrderPlaced', 'PaymentFailed'
payload JSONB NOT NULL,
metadata JSONB NOT NULL DEFAULT '{}', -- correlation_id, causation_id
sequence_num BIGINT NOT NULL, -- monotonic per aggregate
occurred_at TIMESTAMPTZ NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE (aggregate_id, sequence_num)
);
CREATE INDEX idx_event_aggregate ON Event(aggregate_id, sequence_num);
CREATE INDEX idx_event_type_time ON Event(event_type, occurred_at);
CREATE INDEX idx_event_occurred ON Event(occurred_at); -- for time-range replay
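As a concrete illustration of the per-aggregate sequence_num, here is a minimal append sketch. It uses an in-memory SQLite stand-in for the Event table above (types simplified; the real schema uses UUID, JSONB, and TIMESTAMPTZ), and the retry-on-conflict pattern relies on the UNIQUE (aggregate_id, sequence_num) constraint. The helper name append_event is introduced here for illustration.

```python
import json
import sqlite3
import uuid
from datetime import datetime, timezone

# In-memory stand-in for the Postgres Event table defined above (sketch only).
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE Event (
        event_id     TEXT PRIMARY KEY,
        aggregate_id TEXT NOT NULL,
        event_type   TEXT NOT NULL,
        payload      TEXT NOT NULL,
        sequence_num INTEGER NOT NULL,
        occurred_at  TEXT NOT NULL,
        UNIQUE (aggregate_id, sequence_num)
    )
""")

def append_event(aggregate_id: str, event_type: str, payload: dict) -> int:
    """Append with a monotonic per-aggregate sequence_num.

    Two concurrent writers can compute the same next sequence number; the
    UNIQUE (aggregate_id, sequence_num) constraint rejects the loser, which
    simply recomputes and retries (optimistic concurrency).
    """
    while True:
        next_seq = conn.execute(
            "SELECT COALESCE(MAX(sequence_num), 0) + 1 FROM Event "
            "WHERE aggregate_id = ?",
            (aggregate_id,)).fetchone()[0]
        try:
            conn.execute(
                "INSERT INTO Event VALUES (?, ?, ?, ?, ?, ?)",
                (str(uuid.uuid4()), aggregate_id, event_type,
                 json.dumps(payload), next_seq,
                 datetime.now(timezone.utc).isoformat()))
            conn.commit()
            return next_seq
        except sqlite3.IntegrityError:
            continue  # lost the race: recompute the sequence and retry

append_event("order-1", "OrderPlaced", {"total": 100})   # sequence_num 1
append_event("order-1", "ItemAdded", {"sku": "A"})       # sequence_num 2
```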
-- Replay job tracking
CREATE TABLE ReplayJob (
job_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
consumer_name VARCHAR(100) NOT NULL,
replay_type VARCHAR(50) NOT NULL, -- 'full', 'aggregate', 'time_range', 'event_type'
filter_params JSONB, -- {aggregate_id, from_time, to_time, event_types}
status VARCHAR(20) NOT NULL DEFAULT 'pending',
total_events BIGINT,
processed_count BIGINT NOT NULL DEFAULT 0,
last_event_id UUID, -- checkpoint for resume
started_at TIMESTAMPTZ,
completed_at TIMESTAMPTZ,
created_at TIMESTAMPTZ DEFAULT NOW()
);
Idempotent Event Handler
class IdempotentEventHandler:
    """Wrap any event handler with idempotency tracking."""

    def __init__(self, handler_name: str):
        self.handler_name = handler_name

    def handle(self, event: dict, is_replay: bool = False) -> bool:
        event_id = event['event_id']
        idempotency_key = f"{self.handler_name}:{event_id}"
        # Fast path: Redis cache of recently processed events
        if redis.exists(f"processed:{idempotency_key}"):
            return False  # skip — already handled
        # DB-backed idempotency check (survives a Redis flush)
        existing = db.fetchone("""
            SELECT 1 FROM ProcessedEvent
            WHERE handler_name=%s AND event_id=%s
        """, [self.handler_name, event_id])
        if existing:
            redis.setex(f"processed:{idempotency_key}", 86400, 1)
            return False
        # Process the event and record it — both in the same transaction
        with db.transaction():
            self._process(event, is_replay=is_replay)
            db.execute("""
                INSERT INTO ProcessedEvent (handler_name, event_id, processed_at)
                VALUES (%s, %s, NOW())
                ON CONFLICT DO NOTHING
            """, [self.handler_name, event_id])
        redis.setex(f"processed:{idempotency_key}", 86400, 1)
        return True

    def _process(self, event: dict, is_replay: bool = False):
        # Subclasses implement the actual business logic here.
        # The is_replay flag can suppress side effects (emails, webhooks) during replay.
        raise NotImplementedError
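A hypothetical subclass sketch showing how _process can gate side effects on is_replay: internal state always updates, external calls fire only during live processing. The names send_confirmation_email and read_model are stand-ins introduced here, and the idempotency bookkeeping from the wrapper above is omitted for brevity.

```python
# Stand-ins for real integrations, for illustration only.
sent_emails = []   # would be an email service in production
read_model = {}    # would be a projection table in production

def send_confirmation_email(order_id: str):   # hypothetical external call
    sent_emails.append(order_id)

class OrderPlacedHandler:
    """Mirrors IdempotentEventHandler._process; idempotency tracking omitted."""

    def _process(self, event: dict, is_replay: bool = False):
        order_id = event["payload"]["order_id"]
        # Internal state always updates: replay must rebuild projections.
        read_model[order_id] = {"status": "placed"}
        # External side effects only fire during live processing.
        if not is_replay:
            send_confirmation_email(order_id)

h = OrderPlacedHandler()
h._process({"payload": {"order_id": "o1"}})                  # live: email sent
h._process({"payload": {"order_id": "o2"}}, is_replay=True)  # replay: no email
```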
Replay Execution with Checkpointing
def run_replay_job(job_id: str, handler: IdempotentEventHandler,
                   batch_size: int = 1000):
    job = db.fetchone("SELECT * FROM ReplayJob WHERE job_id=%s", [job_id])
    db.execute("UPDATE ReplayJob SET status='running', started_at=NOW() WHERE job_id=%s",
               [job_id])
    # Build query based on replay type
    filters = job['filter_params'] or {}
    where_clauses = ["1=1"]
    params = []
    if filters.get('aggregate_id'):
        where_clauses.append("aggregate_id = %s")
        params.append(filters['aggregate_id'])
    if filters.get('from_time'):
        where_clauses.append("occurred_at >= %s")
        params.append(filters['from_time'])
    if filters.get('to_time'):
        where_clauses.append("occurred_at <= %s")
        params.append(filters['to_time'])
    # Resume from checkpoint: keyset condition on (occurred_at, event_id)
    if job['last_event_id']:
        where_clauses.append("""(occurred_at, event_id) > (
            SELECT occurred_at, event_id FROM Event WHERE event_id=%s
        )""")
        params.append(job['last_event_id'])
    query = f"""
        SELECT * FROM Event
        WHERE {' AND '.join(where_clauses)}
        ORDER BY occurred_at ASC, event_id ASC
    """
    processed = job['processed_count']  # resume the running count, not from zero
    last_id = job['last_event_id']
    for event in db.fetchall_cursor(query, params, batch_size=batch_size):
        handler.handle(event, is_replay=True)
        last_id = event['event_id']
        processed += 1
        # Checkpoint every 1000 events so a crash resumes near here
        if processed % 1000 == 0:
            db.execute("""
                UPDATE ReplayJob
                SET processed_count=%s, last_event_id=%s
                WHERE job_id=%s
            """, [processed, last_id, job_id])
    db.execute("""
        UPDATE ReplayJob
        SET status='completed', processed_count=%s, completed_at=NOW()
        WHERE job_id=%s
    """, [processed, job_id])
Kafka-Based Replay
For Kafka-backed systems, replay is built in: Kafka retains messages for a configurable retention period (days to forever with tiered storage). To replay, create a new consumer group and set its offset to the desired start position:
from kafka import KafkaConsumer, TopicPartition

def replay_from_timestamp(topic: str, start_ms: int, consumer_group: str):
    consumer = KafkaConsumer(bootstrap_servers=['kafka:9092'],
                             group_id=consumer_group,
                             auto_offset_reset='earliest',
                             enable_auto_commit=False)
    partitions = consumer.partitions_for_topic(topic)
    tps = [TopicPartition(topic, p) for p in partitions]
    consumer.assign(tps)
    # Seek all partitions to the earliest offset at or after the timestamp
    offsets = consumer.offsets_for_times({tp: start_ms for tp in tps})
    for tp, offset_and_ts in offsets.items():
        if offset_and_ts:  # None if no message exists at or after start_ms
            consumer.seek(tp, offset_and_ts.offset)
    for message in consumer:
        process_event(message.value, is_replay=True)
        consumer.commit()
Key Interview Points
- Idempotency is the prerequisite for replay — if handlers are not idempotent, replaying events causes double-charges, duplicate emails, and corrupted state. Design for idempotency first.
- The is_replay=True flag lets handlers suppress external side effects (sending emails, calling payment APIs) during replay — only update internal state.
- Checkpointing makes replay resumable after failure — save the last processed event_id periodically so a crashed replay job restarts near where it left off, not from the beginning.
- Replay in a separate consumer group (Kafka) or a separate handler instance to avoid interfering with live processing — different offsets, different DB connections, separate rate limits.
- Kafka tiered storage enables infinite event retention at S3 cost (~$0.023/GB/month) vs local disk — essential for systems that need replay of events months or years old.
- Sequence number per aggregate enables “rebuild this entity’s state from scratch” by replaying only its events in order — the foundation of event sourcing.
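The last point can be sketched as a pure fold: rebuild an aggregate's state by applying its events in sequence_num order. The event types follow the examples above; the reducer logic itself is illustrative.

```python
# Pure reducer: applies one event to the current state and returns the new state.
def apply(state: dict, event: dict) -> dict:
    etype, payload = event["event_type"], event["payload"]
    if etype == "OrderPlaced":
        return {"status": "placed", "items": [], "paid": False}
    if etype == "ItemAdded":
        return {**state, "items": state["items"] + [payload["sku"]]}
    if etype == "PaymentProcessed":
        return {**state, "paid": True}
    return state  # unknown event types are ignored

def rebuild_state(events: list[dict]) -> dict:
    """Fold the aggregate's events in sequence_num order into current state."""
    state = {}
    for event in sorted(events, key=lambda e: e["sequence_num"]):
        state = apply(state, event)
    return state

history = [
    {"sequence_num": 1, "event_type": "OrderPlaced", "payload": {}},
    {"sequence_num": 3, "event_type": "PaymentProcessed", "payload": {}},
    {"sequence_num": 2, "event_type": "ItemAdded", "payload": {"sku": "A"}},
]
rebuild_state(history)  # -> {'status': 'placed', 'items': ['A'], 'paid': True}
```

Because the fold is deterministic, replaying the same event sequence always yields the same state, which is what makes full rebuilds and snapshot-plus-tail replays safe.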
Frequently Asked Questions
Why is idempotency a prerequisite for event replay and how do you implement it?
Event replay re-processes events that were already processed — often months later. If handlers are not idempotent, replay causes double-charges, duplicate emails, and corrupted aggregate state. Idempotency means processing the same event_id twice has the same effect as processing it once. Implementation: maintain a ProcessedEvent table (handler_name, event_id, processed_at). Before processing, check whether the (handler_name, event_id) pair exists; if yes, skip; if no, process and insert the pair — both in the same transaction. This DB-backed approach survives Redis flushes. Use Redis as a fast cache (SET processed:{handler}:{event_id} EX 86400) to avoid a DB lookup on every event during normal (non-replay) processing.
How does the is_replay flag prevent side effects during replay?
Many event handlers have side effects beyond updating the database: sending emails, calling payment APIs, pushing notifications, posting to Slack. During replay, you want to rebuild internal state without triggering these external effects. Pass is_replay=True to handlers during replay and gate side effects on it: if not is_replay: send_confirmation_email(user_id). The internal state update (INSERT or UPDATE in the database) always runs; the external call only runs during live processing. This separation is why event sourcing practitioners say "events should only rebuild state — side effects are commanded by projections, not replayed." For partial replays (fixing a bug in one handler), only that handler needs replay; all others retain their current state.
How do you replay only the events affecting a single entity?
Filter by aggregate_id: SELECT * FROM Event WHERE aggregate_id=%s ORDER BY sequence_num ASC. The sequence_num is a monotonically increasing per-aggregate counter that guarantees you process events in the order they were applied to that entity. This is the foundation of event sourcing: to get the current state of an order, replay all OrderPlaced, ItemAdded, PaymentProcessed, OrderShipped events for order_id=abc in sequence. For efficiency, combine with a snapshot: store a full state snapshot every 50 events (sequence_num % 50 = 0), then load the most recent snapshot and replay only the events after it. This bounds replay time to at most 50 events regardless of total history length.
How do you replay Kafka events from a specific point in time?
Kafka stores messages on disk with a configurable retention period (default 7 days; unlimited with tiered storage). To replay from a timestamp: (1) create a new consumer group (or reset an existing one's offsets); (2) call consumer.offsets_for_times({TopicPartition: timestamp_ms}), which returns the earliest offset with a timestamp >= the requested time for each partition; (3) seek each partition to that offset; (4) consume forward. This is built into the Kafka consumer API and requires no changes to the producer or topic configuration. The consumer group offset tracks progress; a crash resumes from the last committed offset. Replay in a separate consumer group to avoid interfering with the live consumer group's offsets.
How do you checkpoint a replay job so it can resume after failure?
Persist the last successfully processed event_id (or Kafka offset) to a ReplayJob table periodically (e.g., every 1,000 events): UPDATE ReplayJob SET last_event_id=%s, processed_count=%s WHERE job_id=%s. On resume, query the last checkpoint and continue from there using a keyset condition: WHERE (occurred_at, event_id) > (SELECT occurred_at, event_id FROM Event WHERE event_id=%s). This avoids re-processing the entire history on restart — for a 100M-event replay, checkpointing every 1,000 events means at most 1,000 events are re-processed after a failure. The idempotency guard handles the few duplicates at the checkpoint boundary. Store checkpoints in the database (not just in memory) so they survive process restarts.