Event Replay Low-Level Design: Replaying Historical Events for State Rebuild and Backfill

Event replay re-processes historical events through a consumer to rebuild state, backfill a new downstream system, fix a processing bug, or audit system behavior. It is a critical operational capability for event-sourced systems, CDC pipelines, and Kafka-based architectures. The design challenges are: accessing historical events (retention policy, storage), ensuring replay is idempotent (processing the same event twice produces the same result), and isolating replay traffic from live processing so that a replay job does not cause duplicate side effects in production systems.

Core Data Model (Event Store)

CREATE TABLE Event (
    event_id        UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregate_id    VARCHAR(100) NOT NULL,   -- the entity this event belongs to
    aggregate_type  VARCHAR(50) NOT NULL,    -- 'Order', 'User', 'Payment'
    event_type      VARCHAR(100) NOT NULL,   -- 'OrderPlaced', 'PaymentFailed'
    payload         JSONB NOT NULL,
    metadata        JSONB NOT NULL DEFAULT '{}',  -- correlation_id, causation_id
    sequence_num    BIGINT NOT NULL,              -- monotonic per aggregate
    occurred_at     TIMESTAMPTZ NOT NULL,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    UNIQUE (aggregate_id, sequence_num)
);

-- UNIQUE (aggregate_id, sequence_num) above already creates an index that serves
-- per-aggregate replay, so no separate index on those columns is needed.
CREATE INDEX idx_event_type_time ON Event(event_type, occurred_at);
CREATE INDEX idx_event_occurred ON Event(occurred_at);  -- for time-range replay

-- Replay job tracking
CREATE TABLE ReplayJob (
    job_id          UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    consumer_name   VARCHAR(100) NOT NULL,
    replay_type     VARCHAR(50) NOT NULL,   -- 'full', 'aggregate', 'time_range', 'event_type'
    filter_params   JSONB,                  -- {aggregate_id, from_time, to_time, event_types}
    status          VARCHAR(20) NOT NULL DEFAULT 'pending',
    total_events    BIGINT,
    processed_count BIGINT NOT NULL DEFAULT 0,
    last_event_id   UUID,                   -- checkpoint for resume
    started_at      TIMESTAMPTZ,
    completed_at    TIMESTAMPTZ,
    created_at      TIMESTAMPTZ DEFAULT NOW()
);

Idempotent Event Handler

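# Assumes module-level `redis` (a redis-py style client) and `db` (a thin database wrapper) objects.
class IdempotentEventHandler: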
    """Wrap any event handler with idempotency tracking."""

    def __init__(self, handler_name: str):
        self.handler_name = handler_name

    def handle(self, event: dict, is_replay: bool = False) -> bool:
        event_id = event['event_id']
        idempotency_key = f"{self.handler_name}:{event_id}"

        # Check if already processed
        if redis.exists(f"processed:{idempotency_key}"):
            return False  # skip — already handled

        # For DB-backed idempotency (survives Redis flush):
        existing = db.fetchone("""
            SELECT 1 FROM ProcessedEvent
            WHERE handler_name=%s AND event_id=%s
        """, [self.handler_name, event_id])
        if existing:
            redis.setex(f"processed:{idempotency_key}", 86400, 1)
            return False

        # Process the event
        with db.transaction():
            self._process(event, is_replay=is_replay)
            db.execute("""
                INSERT INTO ProcessedEvent (handler_name, event_id, processed_at)
                VALUES (%s, %s, NOW())
                ON CONFLICT DO NOTHING
            """, [self.handler_name, event_id])

        redis.setex(f"processed:{idempotency_key}", 86400, 1)
        return True

    def _process(self, event: dict, is_replay: bool = False):
        # Subclasses implement actual business logic here
        # is_replay flag can suppress side effects (emails, webhooks) during replay
        raise NotImplementedError
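
As a minimal sketch of how a subclass might gate side effects on is_replay, the example below replaces Redis and the ProcessedEvent table with an in-memory set so that it runs on its own. That substitution is an assumption made for illustration, and the names InMemoryIdempotentHandler, OrderPlacedHandler, order_status, and sent_emails are hypothetical.

```python
class InMemoryIdempotentHandler:
    """Stand-in for IdempotentEventHandler; a set replaces Redis and ProcessedEvent."""

    def __init__(self, handler_name: str):
        self.handler_name = handler_name
        self._processed = set()  # hypothetical substitute for the ProcessedEvent table

    def handle(self, event: dict, is_replay: bool = False) -> bool:
        key = (self.handler_name, event["event_id"])
        if key in self._processed:
            return False  # already handled, skip
        self._process(event, is_replay=is_replay)
        self._processed.add(key)
        return True

    def _process(self, event: dict, is_replay: bool = False):
        raise NotImplementedError


class OrderPlacedHandler(InMemoryIdempotentHandler):
    """Hypothetical handler: always updates internal state, emails only on live traffic."""

    def __init__(self):
        super().__init__("order_placed_projection")
        self.order_status = {}  # internal state, rebuilt on replay
        self.sent_emails = []   # records external side effects for inspection

    def _process(self, event: dict, is_replay: bool = False):
        order_id = event["aggregate_id"]
        self.order_status[order_id] = "placed"  # always runs
        if not is_replay:
            self.sent_emails.append(order_id)   # suppressed during replay


event = {"event_id": "e1", "aggregate_id": "order-1", "event_type": "OrderPlaced"}

live = OrderPlacedHandler()
first = live.handle(event)    # live delivery: state update plus email
second = live.handle(event)   # duplicate delivery: skipped

rebuilt = OrderPlacedHandler()                     # fresh instance used for a rebuild
replayed = rebuilt.handle(event, is_replay=True)   # state update only
```

The rebuild works here because the fresh instance starts with an empty processed set. With the DB-backed handler above, a full rebuild would presumably need a new handler_name or cleared ProcessedEvent rows, since the idempotency check would otherwise skip every event that was already handled live.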

Replay Execution with Checkpointing

def run_replay_job(job_id: str, handler: IdempotentEventHandler,
                   batch_size: int = 1000):
    job = db.fetchone("SELECT * FROM ReplayJob WHERE job_id=%s", [job_id])
    # COALESCE keeps the original start time when a crashed job is resumed
    db.execute("UPDATE ReplayJob SET status='running', "
               "started_at=COALESCE(started_at, NOW()) WHERE job_id=%s",
               [job_id])

    # Build query based on replay type
    filters = job['filter_params'] or {}
    where_clauses = ["1=1"]
    params = []

    if filters.get('aggregate_id'):
        where_clauses.append("aggregate_id = %s")
        params.append(filters['aggregate_id'])
    if filters.get('from_time'):
        where_clauses.append("occurred_at >= %s")
        params.append(filters['from_time'])
    if filters.get('to_time'):
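        where_clauses.append("occurred_at <= %s")
        params.append(filters['to_time'])

    # Resume from the checkpoint: keyset condition matching the ORDER BY below
    if job['last_event_id']:
        where_clauses.append("""
            (occurred_at, event_id) > (
                SELECT occurred_at, event_id FROM Event WHERE event_id=%s
            )
        """)
        params.append(job['last_event_id'])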

    query = f"""
        SELECT * FROM Event
        WHERE {' AND '.join(where_clauses)}
        ORDER BY occurred_at ASC, event_id ASC
    """

    processed = job['processed_count'] or 0  # continue the count when resuming
    last_id = job['last_event_id']

    for event in db.fetchall_cursor(query, params, batch_size=batch_size):
        handler.handle(event, is_replay=True)
        last_id = event['event_id']
        processed += 1

        # Checkpoint every 1000 events
        if processed % 1000 == 0:
            db.execute("""
                UPDATE ReplayJob
                SET processed_count=%s, last_event_id=%s
                WHERE job_id=%s
            """, [processed, last_id, job_id])

    db.execute("""
        UPDATE ReplayJob
        SET status='completed', processed_count=%s, completed_at=NOW()
        WHERE job_id=%s
    """, [processed, job_id])
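
The resume condition has to compare on both ordering columns, otherwise events that share the checkpoint's timestamp are skipped. The following sketch illustrates this with an in-memory SQLite database standing in for the event store (an assumption made so the example is self-contained). The helper name events_after is hypothetical, and the row comparison is written in its expanded form so it does not depend on row-value support.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE Event (event_id TEXT PRIMARY KEY, occurred_at TEXT NOT NULL)")
conn.executemany(
    "INSERT INTO Event VALUES (?, ?)",
    [("a", "2024-01-01T00:00:00"),
     ("b", "2024-01-01T00:00:00"),  # same timestamp as "a": the tie-breaker matters
     ("c", "2024-01-01T00:00:05"),
     ("d", "2024-01-01T00:00:09")],
)


def events_after(conn, last_event_id=None):
    """Return event_ids ordered by (occurred_at, event_id), strictly after the checkpoint."""
    if last_event_id is None:
        rows = conn.execute(
            "SELECT event_id FROM Event ORDER BY occurred_at, event_id")
    else:
        ts = conn.execute(
            "SELECT occurred_at FROM Event WHERE event_id = ?", (last_event_id,)
        ).fetchone()[0]
        # Expanded form of (occurred_at, event_id) > (ts, last_event_id)
        rows = conn.execute(
            """SELECT event_id FROM Event
               WHERE occurred_at > ? OR (occurred_at = ? AND event_id > ?)
               ORDER BY occurred_at, event_id""",
            (ts, ts, last_event_id))
    return [r[0] for r in rows]


full_run = events_after(conn)                    # fresh job: every event
resumed = events_after(conn, last_event_id="a")  # resumed job: everything after "a"
```

Filtering on occurred_at alone with a strict comparison would drop event "b" in this data set, because it carries the same timestamp as the checkpointed event "a".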

Kafka-Based Replay

For Kafka-backed systems, replay is built in: Kafka retains messages for a configurable retention period (days to forever with tiered storage). To replay, create a new consumer group and set its offset to the desired start position:

from kafka import KafkaConsumer, TopicPartition

def replay_from_timestamp(topic: str, start_ms: int, consumer_group: str):
    consumer = KafkaConsumer(bootstrap_servers=['kafka:9092'],
                             group_id=consumer_group,
                             auto_offset_reset='earliest',
                             enable_auto_commit=False)
    partitions = consumer.partitions_for_topic(topic)
    tps = [TopicPartition(topic, p) for p in partitions]
    consumer.assign(tps)

    # Seek all partitions to the timestamp
    offsets = consumer.offsets_for_times({tp: start_ms for tp in tps})
    for tp, offset_and_ts in offsets.items():
        if offset_and_ts:
            consumer.seek(tp, offset_and_ts.offset)

    for message in consumer:
        process_event(message.value, is_replay=True)
        consumer.commit()
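
Iterating over a consumer blocks while waiting for new messages, so the loop above never ends by itself. One option, sketched here under assumptions, is to capture each partition's end offset before the replay starts (kafka-python exposes consumer.end_offsets(tps)) and stop once every partition's position (consumer.position(tp)) has reached it. The stop check is written as a pure function over plain dictionaries so it runs without a broker. The name replay_finished and the (topic, partition) tuple keys are hypothetical stand-ins.

```python
from typing import Dict, Tuple

Partition = Tuple[str, int]  # (topic, partition); stands in for kafka's TopicPartition


def replay_finished(positions: Dict[Partition, int],
                    end_offsets: Dict[Partition, int]) -> bool:
    """True once every partition has consumed up to the end offset captured at start.

    `positions` holds the next offset to be read per partition; `end_offsets` holds
    the offset one past the last message that existed when the replay began.
    """
    return all(positions.get(tp, 0) >= end for tp, end in end_offsets.items())


end_offsets = {("orders", 0): 120, ("orders", 1): 80}  # snapshot taken before replay
mid_replay = {("orders", 0): 120, ("orders", 1): 42}
caught_up = {("orders", 0): 125, ("orders", 1): 80}

still_running = replay_finished(mid_replay, end_offsets)
done = replay_finished(caught_up, end_offsets)
```

In a real replay loop the check would run after each processed message or batch, and the loop would break once it returns True.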

Key Interview Points

  • Idempotency is the prerequisite for replay — if handlers are not idempotent, replaying events causes double-charges, duplicate emails, and corrupted state. Design for idempotency first.
  • The is_replay=True flag lets handlers suppress external side effects (sending emails, calling payment APIs) during replay — only update internal state.
  • Checkpointing makes replay resumable after failure — save the last processed event_id periodically so a crashed replay job restarts near where it left off, not from the beginning.
  • Replay in a separate consumer group (Kafka) or a separate handler instance to avoid interfering with live processing — different offsets, different DB connections, separate rate limits.
  • Kafka tiered storage enables infinite event retention at S3 cost (~$0.023/GB/month) vs local disk — essential for systems that need replay of events months or years old.
  • Sequence number per aggregate enables “rebuild this entity’s state from scratch” by replaying only its events in order — the foundation of event sourcing.
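
The last point, rebuilding one entity from its own events, amounts to folding a transition function over the events in sequence_num order. The sketch below illustrates that idea. The event types come from the examples in this article, while the payload fields (items, item) and the function names apply_event and rebuild are assumptions.

```python
def apply_event(state: dict, event: dict) -> dict:
    """Pure transition function: returns the next state for one event."""
    kind, payload = event["event_type"], event["payload"]
    if kind == "OrderPlaced":
        return {"status": "placed", "items": list(payload["items"])}
    if kind == "ItemAdded":
        return {**state, "items": state["items"] + [payload["item"]]}
    if kind == "OrderShipped":
        return {**state, "status": "shipped"}
    return state  # unknown event types leave the state unchanged


def rebuild(events: list) -> dict:
    """Fold events in sequence_num order, regardless of the order they arrive in."""
    state = {}
    for event in sorted(events, key=lambda e: e["sequence_num"]):
        state = apply_event(state, event)
    return state


events = [
    {"sequence_num": 3, "event_type": "OrderShipped", "payload": {}},
    {"sequence_num": 1, "event_type": "OrderPlaced", "payload": {"items": ["book"]}},
    {"sequence_num": 2, "event_type": "ItemAdded", "payload": {"item": "pen"}},
]
order = rebuild(events)
```

Because apply_event has no side effects, running rebuild any number of times over the same events yields the same state.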

Frequently Asked Questions

Why is idempotency a prerequisite for event replay and how do you implement it?

Event replay re-processes events that were already processed, often months later. If handlers are not idempotent, replay causes double-charges, duplicate emails, and corrupted aggregate state. Idempotency means processing the same event_id twice has the same effect as processing it once. Implementation: maintain a ProcessedEvent table (handler_name, event_id, processed_at) with a unique constraint on (handler_name, event_id). Before processing, check whether this (handler_name, event_id) pair exists. If yes, skip. If no, process the event and insert the pair in the same transaction. This DB-backed approach survives Redis flushes. Use Redis as a fast cache (SET processed:{handler}:{event_id} EX 86400) to avoid a DB lookup on every event during normal (non-replay) processing.

How does the is_replay flag prevent side effects during replay?

Many event handlers have side effects beyond updating the database: sending emails, calling payment APIs, pushing notifications, posting to Slack. During replay, you want to rebuild internal state without triggering these external effects. Pass is_replay=True to handlers during replay and gate side effects on it: if not is_replay: send_confirmation_email(user_id). The internal state update (INSERT or UPDATE in the database) always runs. The external call only runs during live processing. This separation is why event sourcing practitioners say "events should only rebuild state; side effects are commanded by projections, not replayed." For partial replays (fixing a bug in one handler), only that handler needs replay; all others retain their current state.

How do you replay only the events affecting a single entity?

Filter by aggregate_id: SELECT * FROM Event WHERE aggregate_id=%s ORDER BY sequence_num ASC. The sequence_num is a monotonically increasing per-aggregate counter that guarantees you process events in the order they were applied to that entity. This is the foundation of event sourcing: to get the current state of an order, replay all OrderPlaced, ItemAdded, PaymentProcessed, and OrderShipped events for order_id=abc in sequence. For efficiency, combine with a snapshot: store a full state snapshot every 50 events (sequence_num % 50 = 0), then load the most recent snapshot and replay only the events after it. This bounds replay to at most 50 events regardless of total history length.

How do you replay Kafka events from a specific point in time?

Kafka stores messages on disk with a configurable retention period (default 7 days; unlimited with tiered storage). To replay from a timestamp: (1) Create a new consumer group (or reset an existing one's offsets). (2) Call consumer.offsets_for_times({TopicPartition: timestamp_ms}), which returns the earliest offset with a timestamp >= the requested time for each partition. (3) Seek each partition to that offset. (4) Consume forward. This is built into the Kafka consumer API and requires no changes to the producer or topic configuration. The consumer group offset tracks progress; after a crash, consumption resumes from the last committed offset. Replay in a separate consumer group to avoid interfering with the live consumer group's offsets.

How do you checkpoint a replay job so it can resume after failure?

Persist the last successfully processed event_id (or Kafka offset) to a ReplayJob table periodically (e.g., every 1,000 events): UPDATE ReplayJob SET last_event_id=%s, processed_count=%s WHERE job_id=%s. On resume, query the last checkpoint and continue from there using a keyset condition: WHERE (occurred_at, event_id) > (SELECT occurred_at, event_id FROM Event WHERE event_id=%s). This avoids re-processing the entire history on restart: for a 100M-event replay, checkpointing every 1,000 events means at most 1,000 events are re-processed after a failure. The idempotency guard handles the few duplicates at the checkpoint boundary. Store checkpoints in the database (not just in memory) so they survive process restarts.
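
The snapshot approach described in the single-entity answer above (a snapshot every 50 events, then replay of the tail) can be sketched as follows. The account-balance aggregate, the amount field, and the names load_state and SNAPSHOT_EVERY are assumptions chosen to keep the example small; a real system would persist snapshots in a table instead of a dictionary.

```python
SNAPSHOT_EVERY = 50  # interval taken from the text; tune per workload


def apply_event(balance: int, event: dict) -> int:
    """Hypothetical account aggregate: each event carries a signed amount."""
    return balance + event["amount"]


def load_state(events: list, snapshots: dict):
    """Return (state, events_replayed), starting from the newest snapshot if any.

    `events` is ordered by sequence_num starting at 1; `snapshots` maps a
    sequence_num to the state after applying that event.
    """
    start_seq = max(snapshots, default=0)
    state = snapshots.get(start_seq, 0)
    tail = [e for e in events if e["sequence_num"] > start_seq]
    for event in tail:
        state = apply_event(state, event)
    return state, len(tail)


events = [{"sequence_num": n, "amount": 1} for n in range(1, 121)]

# Build snapshots at every multiple of SNAPSHOT_EVERY, as live processing would.
snapshots, running = {}, 0
for e in events:
    running = apply_event(running, e)
    if e["sequence_num"] % SNAPSHOT_EVERY == 0:
        snapshots[e["sequence_num"]] = running

state, replayed = load_state(events, snapshots)        # starts from snapshot 100
full_state, full_replayed = load_state(events, {})     # no snapshot: full replay
```

Both paths arrive at the same state; the snapshot path replays only the events recorded after sequence number 100.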
