Multi-Factor Authentication Low-Level Design: TOTP, SMS OTP, Backup Codes, and Replay Prevention

Multi-factor authentication (MFA) adds a second verification layer beyond passwords — protecting accounts even when passwords are compromised. The three main methods are TOTP (Time-based One-Time Password, used by Authy/Google Authenticator), SMS OTP, and backup codes. Core challenges: secure TOTP secret storage, handling clock skew, SMS delivery reliability, backup code single-use enforcement, and account recovery when the second factor is lost.

Core Data Model

CREATE TYPE mfa_method AS ENUM ('totp','sms','backup_code');

CREATE TABLE MfaCredential (
    credential_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id       UUID NOT NULL,
    method        mfa_method NOT NULL,
    -- TOTP fields
    totp_secret   BYTEA,           -- Fernet token (AES-128-CBC + HMAC-SHA256); never store plaintext
    -- SMS fields
    phone_number  TEXT,            -- E.164 format, e.g. +14155552671
    -- Backup codes (stored as individual rows, each single-use)
    backup_code_hash CHAR(64),     -- SHA-256 of the plaintext code
    is_used       BOOLEAN NOT NULL DEFAULT FALSE,
    -- Common
    is_verified   BOOLEAN NOT NULL DEFAULT FALSE,  -- enrollment confirmed
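    created_at    TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- One TOTP and one SMS credential per user. Backup codes are exempt because each
-- code is its own row; a plain UNIQUE (user_id, method) would reject the second code.
-- An ON CONFLICT clause that targets this index must repeat the WHERE predicate.
CREATE UNIQUE INDEX uq_mfa_user_method ON MfaCredential (user_id, method)
    WHERE method <> 'backup_code';
CREATE INDEX idx_mfa_user ON MfaCredential (user_id, method);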

-- Track pending MFA challenges (SMS OTP in flight)
CREATE TABLE MfaChallenge (
    challenge_id   UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id        UUID NOT NULL,
    method         mfa_method NOT NULL,
    code_hash      CHAR(64) NOT NULL,    -- SHA-256 of 6-digit OTP
    attempts       INT NOT NULL DEFAULT 0,
    expires_at     TIMESTAMPTZ NOT NULL,
    created_at     TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_mfa_challenge_user ON MfaChallenge (user_id, expires_at DESC);

TOTP Enrollment and Verification

import os

import pyotp
from cryptography.fernet import Fernet

# Key management: load from environment / KMS in production
FERNET_KEY = os.environ["MFA_ENCRYPTION_KEY"].encode()
fernet = Fernet(FERNET_KEY)

TOTP_WINDOW = 1  # accept codes from ±1 time step (±30s)

def enroll_totp(conn, user_id: str) -> dict:
    """
    Generate a TOTP secret, encrypt it, and return the provisioning URI
    for QR code display. The user must verify a valid TOTP code to complete enrollment.
    """
    # 160-bit secret (32 base32 characters, the default in current pyotp releases);
    # RFC 4226 requires at least 128 bits and recommends 160
    raw_secret = pyotp.random_base32()

    # Encrypt before storing
    encrypted_secret = fernet.encrypt(raw_secret.encode())

    with conn.cursor() as cur:
        cur.execute("""
            INSERT INTO MfaCredential (user_id, method, totp_secret, is_verified)
            VALUES (%s, 'totp', %s, FALSE)
            ON CONFLICT (user_id, method) WHERE method <> 'backup_code' DO UPDATE
            SET totp_secret = EXCLUDED.totp_secret, is_verified = FALSE
        """, (user_id, encrypted_secret))
    conn.commit()

    # Generate provisioning URI for QR code
    totp = pyotp.TOTP(raw_secret)
    uri = totp.provisioning_uri(name=user_id, issuer_name="TechInterview App")
    return {"provisioning_uri": uri, "secret": raw_secret}  # raw_secret shown once

def verify_totp_enrollment(conn, user_id: str, code: str) -> bool:
    """
    User scans QR code, enters first TOTP code to confirm enrollment.
    Marks is_verified = TRUE on success.
    """
    with conn.cursor() as cur:
        cur.execute(
            "SELECT totp_secret FROM MfaCredential WHERE user_id=%s AND method='totp'",
            (user_id,)
        )
        row = cur.fetchone()
    if not row:
        return False

    # bytes(): psycopg2 returns BYTEA as memoryview, which Fernet rejects
    raw_secret = fernet.decrypt(bytes(row[0])).decode()
    totp = pyotp.TOTP(raw_secret)

    if totp.verify(code, valid_window=TOTP_WINDOW):
        with conn.cursor() as cur:
            cur.execute(
                "UPDATE MfaCredential SET is_verified=TRUE WHERE user_id=%s AND method='totp'",
                (user_id,)
            )
        conn.commit()
        return True
    return False

def verify_totp_login(conn, user_id: str, code: str) -> bool:
    """Verify TOTP during login — reject if enrollment not confirmed."""
    with conn.cursor() as cur:
        cur.execute(
            "SELECT totp_secret, is_verified FROM MfaCredential WHERE user_id=%s AND method='totp'",
            (user_id,)
        )
        row = cur.fetchone()
    if not row or not row[1]:  # not enrolled
        return False

    # bytes(): psycopg2 returns BYTEA as memoryview, which Fernet rejects
    raw_secret = fernet.decrypt(bytes(row[0])).decode()
    return pyotp.TOTP(raw_secret).verify(code, valid_window=TOTP_WINDOW)
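
To make the clock-skew window concrete, here is a minimal standard-library sketch of what a TOTP check does under the hood. It is illustrative only and not a replacement for pyotp; the function names hotp and matching_step are hypothetical. It assumes the RFC 6238 defaults (HMAC-SHA1, 30-second steps) and returns the matched time step, which is the value a replay check would need to remember.

```python
import hashlib
import hmac
import struct
from typing import Optional


def hotp(secret: bytes, counter: int, digits: int = 6) -> str:
    """RFC 4226 HOTP: HMAC-SHA1 over the 8-byte big-endian counter."""
    digest = hmac.new(secret, struct.pack(">Q", counter), hashlib.sha1).digest()
    offset = digest[-1] & 0x0F  # dynamic truncation: low nibble picks the offset
    value = struct.unpack(">I", digest[offset:offset + 4])[0] & 0x7FFFFFFF
    return str(value % 10**digits).zfill(digits)


def matching_step(secret: bytes, code: str, now: float,
                  window: int = 1, step: int = 30) -> Optional[int]:
    """Return the time step whose code matches, or None if none does.

    Checks the current step plus `window` steps on either side, which is
    the behaviour the text attributes to valid_window=1.
    """
    current = int(now) // step
    for candidate in range(current - window, current + window + 1):
        expected = hotp(secret, candidate, digits=len(code))
        # Constant-time comparison of the two codes
        if hmac.compare_digest(expected.encode(), code.encode()):
            return candidate
    return None
```

With the RFC 4226 test secret b"12345678901234567890", the code for step 1 is "287082"; it is accepted at t=59 (step 1) and at t=89 (step 2, one step late), but not at t=120 (step 4).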

SMS OTP Challenge

import secrets, hashlib
from datetime import datetime, timezone, timedelta

SMS_OTP_LENGTH = 6
SMS_OTP_TTL_MIN = 10
MAX_ATTEMPTS = 3

def send_sms_challenge(conn, user_id: str) -> str:
    """Generate a 6-digit OTP, send via SMS, store hash in DB."""
    with conn.cursor() as cur:
        cur.execute(
            "SELECT phone_number, is_verified FROM MfaCredential WHERE user_id=%s AND method='sms'",
            (user_id,)
        )
        row = cur.fetchone()
    if not row or not row[1]:
        raise ValueError("SMS MFA not enrolled")

    phone_number = row[0]
    otp = str(secrets.randbelow(10**SMS_OTP_LENGTH)).zfill(SMS_OTP_LENGTH)
    code_hash = hashlib.sha256(otp.encode()).hexdigest()
    expires_at = datetime.now(timezone.utc) + timedelta(minutes=SMS_OTP_TTL_MIN)

    # Invalidate any existing pending challenges for this user
    with conn.cursor() as cur:
        cur.execute(
            "DELETE FROM MfaChallenge WHERE user_id=%s AND method='sms'",
            (user_id,)
        )
        cur.execute("""
            INSERT INTO MfaChallenge (user_id, method, code_hash, expires_at)
            VALUES (%s, 'sms', %s, %s)
        """, (user_id, code_hash, expires_at))
    conn.commit()

    send_sms(phone_number, f"Your verification code is {otp}. Valid for {SMS_OTP_TTL_MIN} minutes.")
    return str(user_id)  # return user_id for logging; never return OTP

def verify_sms_otp(conn, user_id: str, otp: str) -> bool:
    """Verify the submitted OTP against the stored challenge."""
    code_hash = hashlib.sha256(otp.encode()).hexdigest()
    now = datetime.now(timezone.utc)

    with conn.cursor() as cur:
        # Lock the newest unexpired challenge so concurrent guesses serialize
        cur.execute("""
            SELECT challenge_id, attempts
            FROM MfaChallenge
            WHERE user_id=%s AND method='sms' AND expires_at > %s
            ORDER BY created_at DESC LIMIT 1
            FOR UPDATE
        """, (user_id, now))
        row = cur.fetchone()

    if not row:
        conn.rollback()  # end the transaction before returning
        return False     # No pending challenge or expired

    challenge_id, attempts = row
    if attempts >= MAX_ATTEMPTS:
        conn.rollback()  # release the row lock taken by FOR UPDATE
        return False     # Too many failed attempts; force a new OTP

    with conn.cursor() as cur:
        cur.execute(
            "SELECT 1 FROM MfaChallenge WHERE challenge_id=%s AND code_hash=%s",
            (challenge_id, code_hash)
        )
        match = cur.fetchone() is not None

        if match:
            # Single use: a verified challenge is deleted immediately
            cur.execute("DELETE FROM MfaChallenge WHERE challenge_id=%s", (challenge_id,))
        else:
            cur.execute(
                "UPDATE MfaChallenge SET attempts=attempts+1 WHERE challenge_id=%s",
                (challenge_id,)
            )
    conn.commit()
    return match
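
The attempt cap bounds guessing within a single challenge, but an attacker who can keep requesting fresh challenges gets three more guesses each time. The following back-of-the-envelope sketch estimates the cumulative odds; the function name guess_probability is hypothetical, and it assumes each challenge carries an independent, uniformly random code.

```python
def guess_probability(attempts_per_challenge: int, challenges: int,
                      digits: int = 6) -> float:
    """Chance that at least one guess succeeds across many challenges.

    Within one challenge the attacker tries distinct codes, so the
    per-challenge success chance is attempts / 10**digits.
    """
    per_challenge = attempts_per_challenge / 10**digits
    return 1 - (1 - per_challenge) ** challenges


if __name__ == "__main__":
    for n in (1, 1_000, 100_000):
        print(f"{n:>7} challenges: {guess_probability(3, n):.6f}")
```

One challenge gives 3 in 1,000,000, but 100,000 challenges push the odds to roughly 26%, which is why the number of challenges issued per user and per phone number likely needs its own rate limit.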

Backup Codes

def generate_backup_codes(conn, user_id: str, count: int = 10) -> list[str]:
    """Generate one-time backup codes. Invalidate existing codes first."""
    with conn.cursor() as cur:
        cur.execute(
            "DELETE FROM MfaCredential WHERE user_id=%s AND method='backup_code'",
            (user_id,)
        )

    codes = []
    with conn.cursor() as cur:
        for _ in range(count):
            # 8 hex characters (32 bits of entropy), e.g. "A3F9-0C2B"
            raw = secrets.token_hex(4).upper()
            formatted = f"{raw[:4]}-{raw[4:]}"
            code_hash = hashlib.sha256(formatted.encode()).hexdigest()
            cur.execute("""
                INSERT INTO MfaCredential (user_id, method, backup_code_hash, is_verified)
                VALUES (%s, 'backup_code', %s, TRUE)
            """, (user_id, code_hash))
            codes.append(formatted)
    conn.commit()
    return codes  # Shown to user ONCE — never stored in plaintext

def use_backup_code(conn, user_id: str, code: str) -> bool:
    """Consume a backup code — single use, atomically marked as used."""
    # Normalize so "a3f9-0c2b " hashes the same as the stored "A3F9-0C2B"
    code_hash = hashlib.sha256(code.strip().upper().encode()).hexdigest()
    with conn.cursor() as cur:
        cur.execute("""
            UPDATE MfaCredential
            SET is_used = TRUE
            WHERE user_id=%s AND method='backup_code'
              AND backup_code_hash=%s AND is_used=FALSE
            RETURNING credential_id
        """, (user_id, code_hash))
        consumed = cur.fetchone() is not None
    conn.commit()
    return consumed
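
The codes above draw from hex digits only, which gives 32 bits per code. If a wider alphabet is wanted, one possible variant is sketched below. The alphabet, the helper names, and the normalization rules are all assumptions for illustration, not part of the design above; the alphabet omits I, O, 0, and 1 to avoid transcription mistakes.

```python
import hashlib
import secrets

# 32 symbols, so each character carries 5 bits: 8 characters = 40 bits
ALPHABET = "ABCDEFGHJKLMNPQRSTUVWXYZ23456789"


def new_backup_code(length: int = 8) -> str:
    """Generate a code such as 'A3K9-MX2P' from a CSPRNG."""
    raw = "".join(secrets.choice(ALPHABET) for _ in range(length))
    return f"{raw[:4]}-{raw[4:]}"


def normalize_backup_code(user_input: str) -> str:
    """Uppercase, drop separators and stray characters, re-insert the dash."""
    cleaned = "".join(ch for ch in user_input.upper() if ch in ALPHABET)
    return f"{cleaned[:4]}-{cleaned[4:]}"


def backup_code_hash(user_input: str) -> str:
    """Hash the normalized form so enrollment and login agree on the input."""
    return hashlib.sha256(normalize_backup_code(user_input).encode()).hexdigest()
```

Hashing the normalized form means a user who types the code in lowercase, or with a space instead of the dash, still matches the stored hash.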

Key Interview Points

  • TOTP clock skew tolerance: TOTP generates a new 6-digit code every 30 seconds. A ±1 window (valid_window=1 in pyotp) accepts codes from ±30 seconds, accommodating devices with slightly drifted clocks. Never accept a wider window (±5 steps = ±2.5 minutes) — that weakens the security significantly. If a user’s device clock is severely drifted, prompt them to sync time rather than widening the window.
  • Encrypt TOTP secrets at rest: The TOTP secret is equivalent to the user’s MFA factor. If it leaks from the database, an attacker can generate valid codes forever. Use authenticated encryption, such as Fernet (AES-128-CBC with HMAC-SHA256, as in the code above) or AES-256-GCM, with the key held in AWS KMS or a hardware security module (HSM), not in the database. This way, a database breach alone doesn’t compromise MFA secrets; the attacker also needs the KMS key.
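  • SMS OTP timing attack: Without constant-time comparison, an attacker who times verification responses can infer partial code matches. Comparing SHA-256 hashes instead of raw codes blunts this, because any timing difference then reveals only hash prefixes, which the attacker cannot steer toward a chosen code. A plain hash comparison is still not strictly constant-time; hmac.compare_digest() removes the variation when the comparison runs in application code. Also cap attempts at MAX_ATTEMPTS=3 before invalidating the challenge, which limits each OTP to a 3-in-1,000,000 chance of being guessed, and rate-limit how often new challenges can be requested so the attacker cannot simply retry against fresh OTPs.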
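  • TOTP replay prevention: With a ±1 window, each TOTP code is accepted for up to ~90 seconds, so an attacker who intercepts a code can replay it within that time. Prevent this by recording the last accepted code per user in an additional column (last_used_totp, which the schema above does not yet include) and rejecting a repeat: UPDATE MfaCredential SET last_used_totp=%s WHERE user_id=%s AND method='totp' AND last_used_totp IS DISTINCT FROM %s, treating zero updated rows as a replay. IS DISTINCT FROM matters because a plain != never matches while the column is still NULL. Storing the accepted time-step counter instead of the code, and requiring each new step to be strictly greater, is slightly more robust because it cannot reject a legitimate code that happens to repeat later. Either way, the replay window drops from ~90s to ~0s.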
  • MFA bypass and account recovery: Users who lose their TOTP device need a recovery path. Options: (1) backup codes (provided at enrollment, stored offline); (2) identity verification via support (manual review); (3) email-to-new-device flow with time delay (72-hour wait deters attackers). The recovery path is often the weakest link — attackers use social engineering to trigger “I lost my phone” flows. Log and alert on recovery attempts with rate limiting.
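
The replay check described in the list above can be sketched as a single conditional UPDATE. This is a minimal illustration, not the article's implementation: SQLite stands in for PostgreSQL so the example runs on its own, and the table totp_state, the column last_step, and both function names are hypothetical. It stores the accepted time step rather than the code itself.

```python
import sqlite3


def accept_totp_step(conn: sqlite3.Connection, user_id: str, step: int) -> bool:
    """Record `step` as consumed; return False if it was already used.

    The UPDATE only touches the row when the new step is strictly newer,
    so checking and recording happen in one atomic statement.
    """
    cur = conn.execute(
        "UPDATE totp_state SET last_step = ? "
        "WHERE user_id = ? AND (last_step IS NULL OR last_step < ?)",
        (step, user_id, step),
    )
    conn.commit()
    return cur.rowcount == 1


def demo_connection() -> sqlite3.Connection:
    """In-memory database with one enrolled user, for demonstration only."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE totp_state (user_id TEXT PRIMARY KEY, last_step INTEGER)"
    )
    conn.execute("INSERT INTO totp_state (user_id) VALUES ('alice')")
    conn.commit()
    return conn
```

A caller would first confirm the code is valid and learn which time step it matched, then call accept_totp_step and reject the login if it returns False. One consequence of this design is that a code from an earlier step is refused once a later step has been accepted.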
