Live Chat System: Low-Level Design
A live chat system must maintain persistent WebSocket connections, fan out messages in milliseconds to all room members, track who is online, handle typing indicators, and paginate message history efficiently. This article designs each layer from the connection registry through Redis pub/sub fan-out down to the database schema and Python worker code.
Architecture Overview
Each chat server node holds a set of open WebSocket connections. When a message arrives on node A for a room whose members are spread across nodes A, B, and C, node A publishes to a Redis pub/sub channel; all nodes subscribed to that channel push the message to their local connections. This decouples horizontal scaling of WebSocket nodes from message routing.
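To make the routing concrete, here is a minimal in-process sketch of the fan-out. It does not talk to Redis: the Bus class stands in for the pub/sub channel registry and Node for a chat server, and both names (along with the inboxes lists that stand in for WebSocket connections) are illustrative assumptions rather than part of the design.

```python
from collections import defaultdict


class Bus:
    """In-memory stand-in for Redis pub/sub (illustrative assumption)."""

    def __init__(self) -> None:
        self._subscribers = defaultdict(list)  # channel -> list of callbacks

    def subscribe(self, channel, callback) -> None:
        self._subscribers[channel].append(callback)

    def publish(self, channel, payload) -> int:
        # Deliver to every subscriber; return how many subscribers received it.
        for callback in self._subscribers[channel]:
            callback(channel, payload)
        return len(self._subscribers[channel])


class Node:
    """A chat server node that only knows about its own local connections."""

    def __init__(self, name: str, bus: Bus) -> None:
        self.name = name
        self.bus = bus
        self.inboxes = {}              # user_id -> list of delivered payloads
        self.rooms = defaultdict(set)  # channel -> local user_ids in that room

    def connect(self, user_id: int, room: str) -> None:
        self.inboxes[user_id] = []
        if room not in self.rooms:
            # Subscribe once per room per node, not once per connection.
            self.bus.subscribe(room, self._on_message)
        self.rooms[room].add(user_id)

    def send(self, room: str, payload: str) -> int:
        # The sending node never routes directly; it only publishes.
        return self.bus.publish(room, payload)

    def _on_message(self, room: str, payload: str) -> None:
        # Each node pushes the payload to its own local members of the room.
        for user_id in self.rooms[room]:
            self.inboxes[user_id].append(payload)


bus = Bus()
a, b, c = Node("A", bus), Node("B", bus), Node("C", bus)
a.connect(1, "room:42")
b.connect(2, "room:42")
c.connect(3, "room:42")
c.connect(4, "room:7")
receivers = a.send("room:42", "hello")
```

Node A publishes once, and the three subscribed nodes each deliver to their own members; user 4, who is in a different room, receives nothing.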
SQL Schema
CREATE TABLE ChatRoom (
id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
name VARCHAR(255) NOT NULL,
room_type ENUM('direct','group','channel') NOT NULL DEFAULT 'group',
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id),
INDEX idx_name (name)
) ENGINE=InnoDB;
CREATE TABLE ChatMember (
room_id BIGINT UNSIGNED NOT NULL,
user_id BIGINT UNSIGNED NOT NULL,
role ENUM('owner','admin','member') NOT NULL DEFAULT 'member',
last_read_at DATETIME NULL,
joined_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (room_id, user_id),
INDEX idx_user_rooms (user_id)
) ENGINE=InnoDB;
CREATE TABLE ChatMessage (
id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
room_id BIGINT UNSIGNED NOT NULL,
sender_id BIGINT UNSIGNED NOT NULL,
body TEXT NOT NULL,
msg_type ENUM('text','image','file','system') NOT NULL DEFAULT 'text',
reply_to_id BIGINT UNSIGNED NULL,
deleted_at DATETIME NULL,
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id),
INDEX idx_room_cursor (room_id, id DESC),
INDEX idx_sender (sender_id)
) ENGINE=InnoDB;
CREATE TABLE PresenceEvent (
id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
user_id BIGINT UNSIGNED NOT NULL,
event_type ENUM('connect','disconnect','heartbeat') NOT NULL,
node_id VARCHAR(64) NOT NULL,
recorded_at DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
PRIMARY KEY (id),
INDEX idx_user_presence (user_id, recorded_at DESC)
) ENGINE=InnoDB;
Redis Data Structures
- Connection registry: HSET conn:{user_id} connection_id node_id. Maps a user to their active connection and the node holding it.
- Room pub/sub: channel name room:{room_id}. All nodes subscribe; messages are published as JSON payloads.
- Typing indicator: SET typing:{room_id}:{user_id} 1 EX 5. A key with a 5-second TTL; no explicit “stopped typing” event is needed.
- Offline queue: LPUSH offline:{user_id} <msg_json>. Drained on reconnect, capped with LTRIM.
- Presence set: ZADD online_users {epoch_ts} {user_id}. Members whose score is older than 30 s are swept as offline.
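The offline queue is filled with LPUSH, so the newest message sits at the head of the list and a reconnecting client needs the list reversed to read messages in order. The sketch below shows one way to drain it. The function name drain_offline_queue and the FakeRedis class are hypothetical: FakeRedis is a tiny in-memory stand-in so the example runs without a server, and it mirrors only the lpush, ltrim, lrange, and delete calls of the redis.asyncio client.

```python
import asyncio
import json


async def drain_offline_queue(client, user_id: int) -> list[dict]:
    """Return the queued messages oldest-first and clear the queue."""
    key = f"offline:{user_id}"
    raw = await client.lrange(key, 0, -1)  # LPUSH order: newest first
    await client.delete(key)
    return [json.loads(item) for item in reversed(raw)]


class FakeRedis:
    """In-memory stand-in for the list commands used here (assumption)."""

    def __init__(self) -> None:
        self.lists: dict[str, list[str]] = {}

    async def lpush(self, key: str, value: str) -> None:
        self.lists.setdefault(key, []).insert(0, value)

    async def ltrim(self, key: str, start: int, end: int) -> None:
        # Only non-negative indexes are modelled, which is all the demo needs.
        self.lists[key] = self.lists.get(key, [])[start:end + 1]

    async def lrange(self, key: str, start: int, end: int) -> list[str]:
        items = self.lists.get(key, [])
        return items[start:] if end == -1 else items[start:end + 1]

    async def delete(self, key: str) -> None:
        self.lists.pop(key, None)


async def demo() -> list[dict]:
    client = FakeRedis()
    for i in range(1, 4):
        await client.lpush("offline:7", json.dumps({"id": i}))
        await client.ltrim("offline:7", 0, 1)  # cap of 2, just for the demo
    return await drain_offline_queue(client, 7)


drained = asyncio.run(demo())
```

With a cap of two, the oldest of the three queued messages is trimmed away and the remaining two come back in send order. Against a real server the read and the delete are separate round trips, so a message pushed between them could be lost; wrapping both in a transaction is one way to close that gap.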
Python Implementation
import asyncio
import json
import time
import uuid

import redis.asyncio as aioredis
import websockets

import db  # project-local async DB helper, assumed to expose execute() and fetchall()

NODE_ID = str(uuid.uuid4())[:8]
TYPING_TTL = 5            # seconds
OFFLINE_QUEUE_CAP = 200   # max queued messages per user

redis_client: aioredis.Redis = None  # assigned at startup
connections: dict[int, websockets.WebSocketServerProtocol] = {}  # user_id -> ws

async def handle_ws_message(user_id: int, ws, raw: str) -> None:
    """Dispatch an incoming WebSocket frame."""
    try:
        msg = json.loads(raw)
    except json.JSONDecodeError:
        await ws.send(json.dumps({"error": "invalid_json"}))
        return
    kind = msg.get("type")
    if kind == "message":
        await _handle_chat_message(user_id, msg)
    elif kind == "typing":
        await _handle_typing(user_id, msg)
    elif kind == "read":
        await _handle_read_receipt(user_id, msg)
    elif kind == "ping":
        await ws.send(json.dumps({"type": "pong"}))
        # the ping doubles as the presence heartbeat
        await redis_client.zadd("online_users", {str(user_id): time.time()})
    else:
        await ws.send(json.dumps({"error": "unknown_type"}))

async def _handle_chat_message(sender_id: int, msg: dict) -> None:
    room_id = int(msg["room_id"])
    body = msg.get("body", "").strip()
    if not body:
        return
    # persist to DB; db.execute is assumed to return the new row's id
    message_id = await db.execute(
        """INSERT INTO ChatMessage (room_id, sender_id, body, reply_to_id)
           VALUES (%s, %s, %s, %s)""",
        (room_id, sender_id, body, msg.get("reply_to_id"))
    )
    payload = json.dumps({
        "type": "message",
        "id": message_id,
        "room_id": room_id,
        "sender_id": sender_id,
        "body": body,
        "ts": int(time.time() * 1000),
    })
    # the sender also receives the message, which serves as the delivery echo
    await broadcast_to_room(room_id, payload, exclude_user=None)

async def broadcast_to_room(room_id: int, payload: str, exclude_user: int | None) -> None:
    """Publish payload to Redis channel; each node fans out to local connections."""
    channel = f"room:{room_id}"
    envelope = json.dumps({
        "channel": channel,
        "payload": payload,
        "exclude": exclude_user,
        "origin": NODE_ID,  # lets exactly one node handle offline queueing
    })
    await redis_client.publish(channel, envelope)

async def redis_subscriber(room_ids: list[int]) -> None:
    """Subscribe to room channels and push messages to local WebSocket connections."""
    pubsub = redis_client.pubsub()
    channels = [f"room:{rid}" for rid in room_ids]
    await pubsub.subscribe(*channels)
    async for message in pubsub.listen():
        if message["type"] != "message":
            continue
        envelope = json.loads(message["data"])
        payload = envelope["payload"]
        exclude = envelope.get("exclude")
        msg_data = json.loads(payload)
        target_room = msg_data.get("room_id")
        # Only chat messages are queued for offline users, and only by the node
        # that published them. Without this check every subscribed node would
        # enqueue its own copy, and typing events would be queued as well.
        queue_offline = (
            msg_data.get("type") == "message"
            and envelope.get("origin") == NODE_ID
        )
        # determine which local users belong to this room
        members = await db.fetchall(
            "SELECT user_id FROM ChatMember WHERE room_id = %s", (target_room,)
        )
        for m in members:
            uid = m["user_id"]
            if uid == exclude:
                continue
            ws = connections.get(uid)
            if ws is not None:
                try:
                    await ws.send(payload)
                except websockets.ConnectionClosed:
                    pass
            elif queue_offline and not await redis_client.exists(f"conn:{uid}"):
                # no connection registered on any node: queue the message
                key = f"offline:{uid}"
                await redis_client.lpush(key, payload)
                await redis_client.ltrim(key, 0, OFFLINE_QUEUE_CAP - 1)

async def _handle_typing(user_id: int, msg: dict) -> None:
    room_id = int(msg["room_id"])  # normalise, as the other handlers do
    key = f"typing:{room_id}:{user_id}"
    await redis_client.set(key, 1, ex=TYPING_TTL)
    # broadcast typing event (not persisted)
    payload = json.dumps({"type": "typing", "user_id": user_id, "room_id": room_id})
    await broadcast_to_room(room_id, payload, exclude_user=user_id)

async def _handle_read_receipt(user_id: int, msg: dict) -> None:
    room_id = int(msg["room_id"])
    await db.execute(
        "UPDATE ChatMember SET last_read_at = NOW() WHERE room_id = %s AND user_id = %s",
        (room_id, user_id)
    )

async def get_history(room_id: int, before_id: int | None, limit: int = 50) -> list[dict]:
    """Cursor-based pagination: fetch messages older than before_id."""
    if before_id is not None:
        rows = await db.fetchall(
            """SELECT id, sender_id, body, msg_type, reply_to_id, created_at
               FROM ChatMessage
               WHERE room_id = %s AND id < %s AND deleted_at IS NULL
               ORDER BY id DESC LIMIT %s""",
            (room_id, before_id, limit)
        )
    else:
        # no cursor: return the newest page
        rows = await db.fetchall(
            """SELECT id, sender_id, body, msg_type, reply_to_id, created_at
               FROM ChatMessage
               WHERE room_id = %s AND deleted_at IS NULL
               ORDER BY id DESC LIMIT %s""",
            (room_id, limit)
        )
    # rows come back newest-first; return them oldest-first for display
    return list(reversed(rows))
Presence and Typing Indicators
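Presence uses a Redis sorted set, online_users, with the epoch timestamp as the score. The client sends a heartbeat every 15 seconds, which refreshes its score; a background sweep removes entries older than 30 seconds, so a user is marked offline after two missed heartbeats. Typing indicators use short-lived keys with a 5-second TTL. Redis does not notify anyone when such a key expires (unless keyspace notifications are enabled). Instead, clients treat each typing event as valid for the TTL and hide the indicator when no refresh arrives, so no explicit “stopped typing” event is needed.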
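The heartbeat-and-sweep cycle can be sketched with a small in-memory model. PresenceTracker is a hypothetical class that mimics the online_users sorted set with a dictionary; the comments name the Redis command each method would correspond to, and timestamps are passed in explicitly so the behaviour is deterministic.

```python
HEARTBEAT_TIMEOUT = 30  # seconds without a heartbeat before a user is offline


class PresenceTracker:
    """In-memory model of the online_users sorted set (user_id -> last seen)."""

    def __init__(self) -> None:
        self.last_seen: dict[int, float] = {}

    def heartbeat(self, user_id: int, now: float) -> None:
        # Corresponds to: ZADD online_users <now> <user_id>
        self.last_seen[user_id] = now

    def sweep(self, now: float) -> list[int]:
        # Corresponds to: ZREMRANGEBYSCORE online_users -inf (<now - 30>
        cutoff = now - HEARTBEAT_TIMEOUT
        stale = sorted(uid for uid, ts in self.last_seen.items() if ts < cutoff)
        for uid in stale:
            del self.last_seen[uid]
        return stale

    def is_online(self, user_id: int) -> bool:
        return user_id in self.last_seen


tracker = PresenceTracker()
tracker.heartbeat(1, now=0)
tracker.heartbeat(2, now=0)
tracker.heartbeat(1, now=15)  # user 1 keeps sending heartbeats
tracker.heartbeat(1, now=30)
went_offline = tracker.sweep(now=31)  # user 2 has been silent for 31 s
```

User 2 stopped sending heartbeats at time 0, so the sweep at time 31 removes them, while user 1 stays online.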
Read Receipts
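The ChatMember.last_read_at column is updated when the client sends a read event. Unread counts for a room are computed as COUNT(*) FROM ChatMessage WHERE room_id = X AND created_at > (SELECT last_read_at ...). The schema stores a timestamp rather than a message ID, so the comparison is on created_at; storing a last-read message ID instead would let the comparison use id, matching the pagination cursor. On large rooms this query is replaced with an approximate per-user counter in Redis that is incremented on each new message and reset when the user reads the room.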
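The unread-count query can be tried out end to end with SQLite, which ships with Python. This is a sketch under assumptions: the tables are trimmed to the columns the query needs, timestamps are stored as ISO-8601 text (which compares correctly as strings), and the unread_count helper is a hypothetical name. The production schema above targets MySQL.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE ChatMember (
    room_id INTEGER, user_id INTEGER, last_read_at TEXT,
    PRIMARY KEY (room_id, user_id)
);
CREATE TABLE ChatMessage (
    id INTEGER PRIMARY KEY, room_id INTEGER, sender_id INTEGER,
    body TEXT, deleted_at TEXT, created_at TEXT
);
""")

conn.executemany(
    "INSERT INTO ChatMember (room_id, user_id, last_read_at) VALUES (?, ?, ?)",
    [(1, 10, "2024-01-01 12:00:00"),  # read up to noon
     (1, 11, None)],                  # has never opened the room
)
conn.executemany(
    "INSERT INTO ChatMessage (room_id, sender_id, body, deleted_at, created_at) "
    "VALUES (?, ?, ?, ?, ?)",
    [(1, 12, "before noon", None, "2024-01-01 11:59:00"),
     (1, 12, "after noon", None, "2024-01-01 12:00:30"),
     (1, 12, "deleted", "2024-01-01 12:05:00", "2024-01-01 12:01:00"),
     (1, 12, "latest", None, "2024-01-01 12:02:00")],
)

UNREAD_SQL = """
SELECT COUNT(*)
FROM ChatMessage m
JOIN ChatMember cm ON cm.room_id = m.room_id
WHERE cm.room_id = ? AND cm.user_id = ?
  AND m.deleted_at IS NULL
  AND (cm.last_read_at IS NULL OR m.created_at > cm.last_read_at)
"""


def unread_count(conn: sqlite3.Connection, room_id: int, user_id: int) -> int:
    """Count non-deleted messages newer than the member's last_read_at."""
    return conn.execute(UNREAD_SQL, (room_id, user_id)).fetchone()[0]


unread_for_10 = unread_count(conn, 1, 10)
unread_for_11 = unread_count(conn, 1, 11)
```

User 10 has two unread messages (the deleted one is skipped), and user 11, whose last_read_at is NULL, sees all three surviving messages as unread.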
Message History Pagination
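Cursor-based pagination on id (rather than OFFSET) is stable under concurrent inserts: new messages never shift the rows behind an existing cursor. The composite index (room_id, id DESC) lets each page fetch run as an index range scan that stops after about limit rows, instead of reading and discarding skipped rows as OFFSET does. Rows filtered out by deleted_at IS NULL add a few extra reads.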