Multi-factor authentication (MFA) adds a second verification layer beyond passwords — protecting accounts even when passwords are compromised. The three main methods are TOTP (Time-based One-Time Password, used by Authy/Google Authenticator), SMS OTP, and backup codes. Core challenges: secure TOTP secret storage, handling clock skew, SMS delivery reliability, backup code single-use enforcement, and account recovery when the second factor is lost.
Core Data Model
CREATE TYPE mfa_method AS ENUM ('totp','sms','backup_code');
CREATE TABLE MfaCredential (
    credential_id    UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id          UUID NOT NULL,
    method           mfa_method NOT NULL,
    -- TOTP fields
    totp_secret      BYTEA,     -- Fernet token (AES-128-CBC + HMAC-SHA256); never store plaintext
    -- SMS fields
    phone_number     TEXT,      -- E.164 format, e.g. +14155552671
    -- Backup codes (stored as individual rows, each single-use)
    backup_code_hash CHAR(64),  -- hex SHA-256 of the plaintext code
    is_used          BOOLEAN NOT NULL DEFAULT FALSE,
    -- Common
    is_verified      BOOLEAN NOT NULL DEFAULT FALSE,  -- enrollment confirmed
    created_at       TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- One TOTP and one SMS credential per user. Backup codes are excluded so that each
-- code can have its own row; a plain UNIQUE (user_id, method) would block them.
-- Upserts must repeat the predicate: ON CONFLICT (user_id, method) WHERE method <> 'backup_code'
CREATE UNIQUE INDEX uq_mfa_user_method ON MfaCredential (user_id, method)
    WHERE method <> 'backup_code';
CREATE INDEX idx_mfa_user ON MfaCredential (user_id, method);
-- Track pending MFA challenges (SMS OTP in flight)
CREATE TABLE MfaChallenge (
    challenge_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id      UUID NOT NULL,
    method       mfa_method NOT NULL,
    code_hash    CHAR(64) NOT NULL,  -- hex SHA-256 of the 6-digit OTP
    attempts     INT NOT NULL DEFAULT 0,
    expires_at   TIMESTAMPTZ NOT NULL,
    created_at   TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_mfa_challenge_user ON MfaChallenge (user_id, expires_at DESC);
TOTP Enrollment and Verification
import os

import pyotp
from cryptography.fernet import Fernet

# Key management: load from environment / KMS in production.
# The value must be a url-safe base64 Fernet key, e.g. from Fernet.generate_key().
FERNET_KEY = os.environ["MFA_ENCRYPTION_KEY"].encode()
fernet = Fernet(FERNET_KEY)
TOTP_WINDOW = 1  # accept codes from ±1 time step (±30s)
def enroll_totp(conn, user_id: str) -> dict:
    """
    Generate a TOTP secret, encrypt it, and return the provisioning URI
    for QR code display. The user must verify a valid TOTP code to complete enrollment.
    """
    # 32 base32 characters = 160 bits, the secret length RFC 4226 recommends
    # (its stated minimum is 128 bits)
    raw_secret = pyotp.random_base32(length=32)
    # Encrypt before storing
    encrypted_secret = fernet.encrypt(raw_secret.encode())
    with conn.cursor() as cur:
        # The WHERE predicate lets this upsert target a partial unique index that
        # exempts backup codes; it also matches a plain unique constraint.
        cur.execute("""
            INSERT INTO MfaCredential (user_id, method, totp_secret, is_verified)
            VALUES (%s, 'totp', %s, FALSE)
            ON CONFLICT (user_id, method) WHERE method <> 'backup_code' DO UPDATE
            SET totp_secret = EXCLUDED.totp_secret, is_verified = FALSE
        """, (user_id, encrypted_secret))
    conn.commit()
    # Generate provisioning URI for QR code
    totp = pyotp.TOTP(raw_secret)
    uri = totp.provisioning_uri(name=user_id, issuer_name="TechInterview App")
    return {"provisioning_uri": uri, "secret": raw_secret}  # raw_secret shown once
def verify_totp_enrollment(conn, user_id: str, code: str) -> bool:
    """
    User scans QR code, enters first TOTP code to confirm enrollment.
    Marks is_verified = TRUE on success.
    """
    with conn.cursor() as cur:
        cur.execute(
            "SELECT totp_secret FROM MfaCredential WHERE user_id=%s AND method='totp'",
            (user_id,)
        )
        row = cur.fetchone()
    if not row:
        return False
    # Some drivers (e.g. psycopg2) return BYTEA as memoryview; Fernet needs bytes
    raw_secret = fernet.decrypt(bytes(row[0])).decode()
    totp = pyotp.TOTP(raw_secret)
    if totp.verify(code, valid_window=TOTP_WINDOW):
        with conn.cursor() as cur:
            cur.execute(
                "UPDATE MfaCredential SET is_verified=TRUE WHERE user_id=%s AND method='totp'",
                (user_id,)
            )
        conn.commit()
        return True
    return False
def verify_totp_login(conn, user_id: str, code: str) -> bool:
    """Verify TOTP during login; reject if enrollment was never confirmed."""
    with conn.cursor() as cur:
        cur.execute(
            "SELECT totp_secret, is_verified FROM MfaCredential WHERE user_id=%s AND method='totp'",
            (user_id,)
        )
        row = cur.fetchone()
    if not row or not row[1]:  # not enrolled, or enrollment not confirmed
        return False
    # Some drivers (e.g. psycopg2) return BYTEA as memoryview; Fernet needs bytes
    raw_secret = fernet.decrypt(bytes(row[0])).decode()
    return pyotp.TOTP(raw_secret).verify(code, valid_window=TOTP_WINDOW)
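To make the time-step window concrete, here is a minimal standard-library sketch of the HOTP/TOTP computation (RFC 4226 and RFC 6238, HMAC-SHA1, 6 digits, 30-second steps). It is an illustration under those assumptions, not pyotp's implementation; the names hotp and matching_step are hypothetical and do not appear elsewhere in this article.

```python
import hashlib
import hmac
import struct


def hotp(secret: bytes, counter: int, digits: int = 6) -> str:
    """RFC 4226: HMAC-SHA1 over the 8-byte big-endian counter, then dynamic truncation."""
    digest = hmac.new(secret, struct.pack(">Q", counter), hashlib.sha1).digest()
    offset = digest[-1] & 0x0F  # low nibble of the last byte selects 4 bytes
    value = struct.unpack(">I", digest[offset:offset + 4])[0] & 0x7FFFFFFF
    return str(value % 10**digits).zfill(digits)


def matching_step(secret: bytes, code: str, unix_time: int,
                  window: int = 1, step: int = 30):
    """Return the time-step counter whose code matches, or None if none does.

    window=1 checks the previous, current, and next 30-second step.
    """
    current = unix_time // step
    for counter in range(max(0, current - window), current + window + 1):
        if hmac.compare_digest(hotp(secret, counter).encode(), code.encode()):
            return counter
    return None
```

Returning the matched step rather than a bare boolean is a design choice made here so that a caller could remember the last accepted step and refuse to accept it twice.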
SMS OTP Challenge
import secrets, hashlib
from datetime import datetime, timezone, timedelta
SMS_OTP_LENGTH = 6
SMS_OTP_TTL_MIN = 10
MAX_ATTEMPTS = 3
def send_sms_challenge(conn, user_id: str) -> str:
    """Generate a 6-digit OTP, send via SMS, store hash in DB."""
    with conn.cursor() as cur:
        cur.execute(
            "SELECT phone_number, is_verified FROM MfaCredential WHERE user_id=%s AND method='sms'",
            (user_id,)
        )
        row = cur.fetchone()
    if not row or not row[1]:
        raise ValueError("SMS MFA not enrolled")
    phone_number = row[0]
    otp = str(secrets.randbelow(10**SMS_OTP_LENGTH)).zfill(SMS_OTP_LENGTH)
    code_hash = hashlib.sha256(otp.encode()).hexdigest()
    expires_at = datetime.now(timezone.utc) + timedelta(minutes=SMS_OTP_TTL_MIN)
    # Invalidate any existing pending challenges for this user
    with conn.cursor() as cur:
        cur.execute(
            "DELETE FROM MfaChallenge WHERE user_id=%s AND method='sms'",
            (user_id,)
        )
        cur.execute("""
            INSERT INTO MfaChallenge (user_id, method, code_hash, expires_at)
            VALUES (%s, 'sms', %s, %s)
        """, (user_id, code_hash, expires_at))
    conn.commit()
    # send_sms stands in for the SMS provider client; it is not defined in this article
    send_sms(phone_number, f"Your verification code is {otp}. Valid for {SMS_OTP_TTL_MIN} minutes.")
    return str(user_id)  # return user_id for logging; never return OTP
def verify_sms_otp(conn, user_id: str, otp: str) -> bool:
    """Verify the submitted OTP against the stored challenge."""
    code_hash = hashlib.sha256(otp.encode()).hexdigest()
    now = datetime.now(timezone.utc)
    with conn.cursor() as cur:
        # FOR UPDATE locks the row so concurrent guesses cannot race the attempt counter
        cur.execute("""
            SELECT challenge_id, attempts, expires_at
            FROM MfaChallenge
            WHERE user_id=%s AND method='sms' AND expires_at > %s
            ORDER BY created_at DESC LIMIT 1
            FOR UPDATE
        """, (user_id, now))
        row = cur.fetchone()
    if not row:
        conn.rollback()  # end the transaction before returning
        return False  # No pending challenge or expired
    challenge_id, attempts, expires_at = row
    if attempts >= MAX_ATTEMPTS:
        conn.rollback()  # release the row lock taken by FOR UPDATE
        return False  # Too many failed attempts; force new OTP
    with conn.cursor() as cur:
        cur.execute(
            "SELECT 1 FROM MfaChallenge WHERE challenge_id=%s AND code_hash=%s",
            (challenge_id, code_hash)
        )
        match = cur.fetchone() is not None
        if match:
            # Single use: delete the challenge once it has been satisfied
            cur.execute("DELETE FROM MfaChallenge WHERE challenge_id=%s", (challenge_id,))
        else:
            cur.execute(
                "UPDATE MfaChallenge SET attempts=attempts+1 WHERE challenge_id=%s",
                (challenge_id,)
            )
    conn.commit()
    return match
Backup Codes
def generate_backup_codes(conn, user_id: str, count: int = 10) -> list[str]:
    """Generate one-time backup codes. Invalidate existing codes first."""
    with conn.cursor() as cur:
        cur.execute(
            "DELETE FROM MfaCredential WHERE user_id=%s AND method='backup_code'",
            (user_id,)
        )
    codes = []
    with conn.cursor() as cur:
        for _ in range(count):
            # 8 hex characters (32 bits of entropy), e.g. "A3F9-0C2E"
            raw = secrets.token_hex(4).upper()
            formatted = f"{raw[:4]}-{raw[4:]}"
            code_hash = hashlib.sha256(formatted.encode()).hexdigest()
            cur.execute("""
                INSERT INTO MfaCredential (user_id, method, backup_code_hash, is_verified)
                VALUES (%s, 'backup_code', %s, TRUE)
            """, (user_id, code_hash))
            codes.append(formatted)
    # The delete and inserts commit together, so old codes are replaced atomically
    conn.commit()
    return codes  # Shown to user ONCE; never stored in plaintext
def use_backup_code(conn, user_id: str, code: str) -> bool:
    """Consume a backup code: single use, atomically marked as used."""
    # Codes are issued in upper case, so tolerate lower-case input and stray whitespace
    code_hash = hashlib.sha256(code.strip().upper().encode()).hexdigest()
    with conn.cursor() as cur:
        # The is_used=FALSE predicate makes check-and-mark one atomic statement
        cur.execute("""
            UPDATE MfaCredential
            SET is_used = TRUE
            WHERE user_id=%s AND method='backup_code'
            AND backup_code_hash=%s AND is_used=FALSE
            RETURNING credential_id
        """, (user_id, code_hash))
        consumed = cur.fetchone() is not None
    conn.commit()
    return consumed
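If codes really should be alphanumeric, as the "A3K9-MX2P" example suggests, one possible variant is sketched below. It assumes a 32-symbol alphabet with the look-alike characters 0, O, 1 and I removed (40 bits of entropy for 8 characters), and it normalizes user input before hashing. The names ALPHABET, new_backup_code, normalize and backup_code_hash are hypothetical.

```python
import hashlib
import secrets

# 24 letters (no I, O) + 8 digits (no 0, 1) = 32 symbols, 5 bits each
ALPHABET = "ABCDEFGHJKLMNPQRSTUVWXYZ23456789"


def new_backup_code(length: int = 8) -> str:
    """Return a random code formatted as XXXX-XXXX."""
    raw = "".join(secrets.choice(ALPHABET) for _ in range(length))
    return f"{raw[:4]}-{raw[4:]}"


def normalize(code: str) -> str:
    """Upper-case, drop whitespace and hyphens, then restore the XXXX-XXXX shape."""
    compact = "".join(ch for ch in code.upper() if ch.isalnum())
    return f"{compact[:4]}-{compact[4:]}"


def backup_code_hash(code: str) -> str:
    """Hex SHA-256 of the normalized code, suitable for a CHAR(64) column."""
    return hashlib.sha256(normalize(code).encode()).hexdigest()
```

Hashing the normalized form at both issue time and redemption time means a user who types the code in lower case or without the hyphen still matches the stored hash.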
Key Interview Points
- TOTP clock skew tolerance: TOTP generates a new 6-digit code every 30 seconds. A ±1 window (valid_window=1 in pyotp) accepts codes from ±30 seconds, accommodating devices with slightly drifted clocks. Never accept a wider window (±5 steps = ±2.5 minutes) — that weakens the security significantly. If a user’s device clock is severely drifted, prompt them to sync time rather than widening the window.
- Encrypt TOTP secrets at rest: The TOTP secret is the user’s second factor. If it leaks from the database, an attacker can generate valid codes indefinitely. Encrypt it with an authenticated scheme (Fernet, used in the code above, is AES-128-CBC with HMAC-SHA256; choose AES-256-GCM if policy requires 256-bit keys) and keep the key in AWS KMS or a hardware security module (HSM), not in the database. A database breach alone then does not compromise MFA secrets, because the attacker also needs the key.
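- SMS OTP timing attack: If the server compares the submitted code to the stored code character by character, response timing can reveal how many leading digits matched. Comparing SHA-256 digests, as the code above does, removes most of that signal, because a partial match between digests says nothing useful about the code’s digits. An ordinary equality check on digests is still not constant-time, so use hmac.compare_digest() when comparing in application code. Also cap each challenge at MAX_ATTEMPTS=3 and then force a new OTP: an attacker gets at most 3 guesses out of 1,000,000 possible codes per challenge, so rate-limit how often new challenges can be requested as well.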
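- TOTP replay prevention: With a ±1 window, a TOTP code is accepted for up to ~90 seconds. An attacker who intercepts the code can replay it within that window. Prevent this by recording the last accepted code (or, more robustly, the last accepted time step) per user and rejecting a repeat, for example: UPDATE MfaCredential SET last_used_totp=%s WHERE user_id=%s AND method='totp' AND last_used_totp IS DISTINCT FROM %s, treating zero updated rows as a replay. This assumes a last_used_totp column, which the schema above does not define. IS DISTINCT FROM is needed because != never matches while the column is still NULL. Each code then works only once, which closes the replay window.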
- MFA bypass and account recovery: Users who lose their TOTP device need a recovery path. Options: (1) backup codes (provided at enrollment, stored offline); (2) identity verification via support (manual review); (3) email-to-new-device flow with time delay (72-hour wait deters attackers). The recovery path is often the weakest link — attackers use social engineering to trigger “I lost my phone” flows. Log and alert on recovery attempts with rate limiting.