Live Chat System Low-Level Design: WebSocket Connections, Message Persistence, and Presence

A live chat system must maintain persistent WebSocket connections, fan out messages in milliseconds to all room members, track who is online, handle typing indicators, and paginate message history efficiently. This article designs each layer from the connection registry through Redis pub/sub fan-out down to the database schema and Python worker code.

Architecture Overview

Each chat server node holds a set of open WebSocket connections. When a message arrives on node A for a room whose members are spread across nodes A, B, and C, node A publishes to a Redis pub/sub channel; all nodes subscribed to that channel push the message to their local connections. This decouples horizontal scaling of WebSocket nodes from message routing.

SQL Schema


CREATE TABLE ChatRoom (
    id            BIGINT UNSIGNED   NOT NULL AUTO_INCREMENT,
    name          VARCHAR(255)      NOT NULL,
    room_type     ENUM('direct','group','channel') NOT NULL DEFAULT 'group',
    created_at    DATETIME          NOT NULL DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (id),
    INDEX idx_name (name)
) ENGINE=InnoDB;

CREATE TABLE ChatMember (
    room_id       BIGINT UNSIGNED   NOT NULL,
    user_id       BIGINT UNSIGNED   NOT NULL,
    role          ENUM('owner','admin','member') NOT NULL DEFAULT 'member',
    last_read_at  DATETIME          NULL,
    joined_at     DATETIME          NOT NULL DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (room_id, user_id),
    INDEX idx_user_rooms (user_id)
) ENGINE=InnoDB;

CREATE TABLE ChatMessage (
    id            BIGINT UNSIGNED   NOT NULL AUTO_INCREMENT,
    room_id       BIGINT UNSIGNED   NOT NULL,
    sender_id     BIGINT UNSIGNED   NOT NULL,
    body          TEXT              NOT NULL,
    msg_type      ENUM('text','image','file','system') NOT NULL DEFAULT 'text',
    reply_to_id   BIGINT UNSIGNED   NULL,
    deleted_at    DATETIME          NULL,
    created_at    DATETIME          NOT NULL DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (id),
    INDEX idx_room_cursor (room_id, id DESC),
    INDEX idx_sender      (sender_id)
) ENGINE=InnoDB;

CREATE TABLE PresenceEvent (
    id            BIGINT UNSIGNED   NOT NULL AUTO_INCREMENT,
    user_id       BIGINT UNSIGNED   NOT NULL,
    event_type    ENUM('connect','disconnect','heartbeat') NOT NULL,
    node_id       VARCHAR(64)       NOT NULL,
    recorded_at   DATETIME(3)       NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
    PRIMARY KEY (id),
    INDEX idx_user_presence (user_id, recorded_at DESC)
) ENGINE=InnoDB;

Redis Data Structures

  • Connection registry: HSET conn:{user_id} connection_id node_id — maps a user to their active connection and the node holding it.
  • Room pub/sub: channel name room:{room_id} — all nodes subscribe; messages are published as JSON payloads.
  • Typing indicator: SET typing:{room_id}:{user_id} 1 EX 5 — a 5-second TTL key; no explicit “stopped typing” event needed.
  • Offline queue: LPUSH offline:{user_id} <msg_json> — drained on reconnect, capped with LTRIM.
  • Presence set: ZADD online_users {epoch_ts} {user_id} — sweep members older than 30 s as offline.
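The connection registry and offline queue listed above could be wired together roughly as follows. This is a minimal sketch under stated assumptions, not the article's implementation: `register_connection`, `unregister_connection`, and `drain_offline_queue` are hypothetical helper names, `client` is assumed to be a `redis.asyncio.Redis` instance, and `send` is any coroutine that writes one payload to the user's socket.

```python
from typing import Any, Awaitable, Callable


async def register_connection(client: Any, user_id: int, connection_id: str, node_id: str) -> None:
    """Record that connection_id for user_id lives on node_id (HSET conn:{user_id})."""
    await client.hset(f"conn:{user_id}", connection_id, node_id)


async def unregister_connection(client: Any, user_id: int, connection_id: str) -> None:
    """Remove one connection; Redis deletes the hash once its last field is gone."""
    await client.hdel(f"conn:{user_id}", connection_id)


async def drain_offline_queue(
    client: Any, user_id: int, send: Callable[[str], Awaitable[None]]
) -> int:
    """Deliver queued payloads oldest-first and return how many were sent.

    LPUSH adds at the head, so the oldest payload sits at the tail; RPOP
    therefore yields messages in the order they were queued.
    """
    key = f"offline:{user_id}"
    delivered = 0
    while True:
        payload = await client.rpop(key)
        if payload is None:
            break  # queue is empty
        await send(payload)
        delivered += 1
    return delivered
```

A connect handler would call `register_connection` and then `drain_offline_queue` before joining live delivery. Note that a payload popped just before `send` fails is lost in this sketch; a stricter design would read with LRANGE and trim only after delivery succeeds.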

Python Implementation


import asyncio
import json
import time
import uuid
import redis.asyncio as aioredis
import websockets
import db

NODE_ID = str(uuid.uuid4())[:8]
TYPING_TTL = 5          # seconds
OFFLINE_QUEUE_CAP = 200 # max queued messages per user

redis_client: aioredis.Redis = None
connections: dict[int, websockets.WebSocketServerProtocol] = {}  # user_id -> ws


async def handle_ws_message(user_id: int, ws, raw: str) -> None:
    """Dispatch an incoming WebSocket frame."""
    try:
        msg = json.loads(raw)
    except json.JSONDecodeError:
        await ws.send(json.dumps({"error": "invalid_json"}))
        return

    kind = msg.get("type")

    if kind == "message":
        await _handle_chat_message(user_id, msg)
    elif kind == "typing":
        await _handle_typing(user_id, msg)
    elif kind == "read":
        await _handle_read_receipt(user_id, msg)
    elif kind == "ping":
        await ws.send(json.dumps({"type": "pong"}))
        await redis_client.zadd("online_users", {str(user_id): time.time()})
    else:
        await ws.send(json.dumps({"error": "unknown_type"}))


async def _handle_chat_message(sender_id: int, msg: dict) -> None:
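    try:
        room_id = int(msg["room_id"])
    except (KeyError, TypeError, ValueError):
        return  # malformed frame: no usable room_id
    body = str(msg.get("body", "")).strip()
    if not body:
        return

    # reject senders who are not members of the room
    member = await db.fetchall(
        "SELECT 1 FROM ChatMember WHERE room_id = %s AND user_id = %s",
        (room_id, sender_id)
    )
    if not member:
        return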

    # persist to DB
    message_id = await db.execute(
        """INSERT INTO ChatMessage (room_id, sender_id, body, reply_to_id)
           VALUES (%s, %s, %s, %s)""",
        (room_id, sender_id, body, msg.get("reply_to_id"))
    )

    payload = json.dumps({
        "type": "message",
        "id": message_id,
        "room_id": room_id,
        "sender_id": sender_id,
        "body": body,
        "ts": int(time.time() * 1000),
    })

    await broadcast_to_room(room_id, payload, exclude_user=None)


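async def broadcast_to_room(room_id: int, payload: str, exclude_user: int | None) -> None:
    """Publish payload to Redis channel; each node fans out to local connections."""
    channel = f"room:{room_id}"
    # "origin" lets exactly one node (the publisher) own offline queueing
    envelope = json.dumps({
        "channel": channel,
        "payload": payload,
        "exclude": exclude_user,
        "origin": NODE_ID,
    })
    await redis_client.publish(channel, envelope)


async def redis_subscriber(room_ids: list[int]) -> None:
    """Subscribe to room channels and push messages to local WebSocket connections."""
    pubsub = redis_client.pubsub()
    channels = [f"room:{rid}" for rid in room_ids]
    await pubsub.subscribe(*channels)

    async for message in pubsub.listen():
        if message["type"] != "message":
            continue
        envelope = json.loads(message["data"])
        payload = envelope["payload"]
        exclude = envelope.get("exclude")

        msg_data = json.loads(payload)
        target_room = msg_data.get("room_id")
        # queue only durable chat messages, and only on the publishing node;
        # otherwise every node would queue a copy for users connected elsewhere
        # and typing events would pile up in the offline queue.
        # assumes the publishing node is itself subscribed to the room channel
        queue_offline = (
            msg_data.get("type") == "message" and envelope.get("origin") == NODE_ID
        )

        # determine which users belong to this room
        # (one query per message; a per-room membership cache would avoid it)
        members = await db.fetchall(
            "SELECT user_id FROM ChatMember WHERE room_id = %s", (target_room,)
        )
        for m in members:
            uid = m["user_id"]
            if uid == exclude:
                continue
            if uid in connections:
                try:
                    await connections[uid].send(payload)
                except websockets.ConnectionClosed:
                    pass  # socket closed mid-send; the disconnect handler cleans up
            elif queue_offline and not await redis_client.exists(f"conn:{uid}"):
                # no node holds a connection for this user: queue the message
                # (assumes connect/disconnect handlers maintain conn:{user_id})
                key = f"offline:{uid}"
                await redis_client.lpush(key, payload)
                await redis_client.ltrim(key, 0, OFFLINE_QUEUE_CAP - 1)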


async def _handle_typing(user_id: int, msg: dict) -> None:
    room_id = int(msg["room_id"])
    key = f"typing:{room_id}:{user_id}"
    await redis_client.set(key, 1, ex=TYPING_TTL)

    # broadcast typing event (not persisted)
    payload = json.dumps({"type": "typing", "user_id": user_id, "room_id": room_id})
    await broadcast_to_room(room_id, payload, exclude_user=user_id)


async def _handle_read_receipt(user_id: int, msg: dict) -> None:
    room_id = int(msg["room_id"])
    await db.execute(
        "UPDATE ChatMember SET last_read_at = NOW() WHERE room_id = %s AND user_id = %s",
        (room_id, user_id)
    )


async def get_history(room_id: int, before_id: int | None, limit: int = 50) -> list[dict]:
    """Cursor-based pagination: fetch messages older than before_id."""
    if before_id is not None:
        rows = await db.fetchall(
            """SELECT id, sender_id, body, msg_type, reply_to_id, created_at
               FROM ChatMessage
               WHERE room_id = %s AND id < %s AND deleted_at IS NULL
               ORDER BY id DESC LIMIT %s""",
            (room_id, before_id, limit)
        )
    else:
        rows = await db.fetchall(
            """SELECT id, sender_id, body, msg_type, reply_to_id, created_at
               FROM ChatMessage
               WHERE room_id = %s AND deleted_at IS NULL
               ORDER BY id DESC LIMIT %s""",
            (room_id, limit)
        )
    return list(reversed(rows))

Presence and Typing Indicators

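Presence uses a Redis sorted set online_users with the epoch timestamp as the score. The client sends a heartbeat every 15 seconds, which refreshes the score; a background sweep removes entries older than 30 seconds, so a user is marked offline after missing two heartbeats. Typing indicators use short-lived keys with a TTL. Redis does not notify anyone when a key expires (unless keyspace notifications are enabled), so “stopped typing” is implied rather than broadcast: receiving clients hide the indicator when no typing event arrives within the TTL window, and the server can check whether the key still exists.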

Read Receipts

The ChatMember.last_read_at column is updated when the client sends a read event. The unread count for a room is the COUNT(*) of that room's non-deleted ChatMessage rows whose created_at is later than the member's last_read_at, treating a NULL last_read_at as “never read”. In large rooms this query is replaced with an approximate per-user counter in Redis that is incremented on each new message and reset on read.
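One way the Redis counter could be kept is a hash per user with one field per room. This is a sketch under assumptions: the key pattern unread:{user_id} and the three helper names are hypothetical, `client` is assumed to be a `redis.asyncio.Redis` instance, and the counter is incremented on new messages and cleared when the user reads the room.

```python
from typing import Any, Iterable


async def bump_unread(client: Any, room_id: int, member_ids: Iterable[int], sender_id: int) -> None:
    """Increment the room's unread counter for every member except the sender."""
    for uid in member_ids:
        if uid != sender_id:
            await client.hincrby(f"unread:{uid}", str(room_id), 1)


async def clear_unread(client: Any, room_id: int, user_id: int) -> None:
    """Reset the counter when the user reads the room."""
    await client.hdel(f"unread:{user_id}", str(room_id))


async def get_unread(client: Any, room_id: int, user_id: int) -> int:
    """Return the cached unread count, or 0 when no counter exists."""
    raw = await client.hget(f"unread:{user_id}", str(room_id))
    return int(raw) if raw is not None else 0
```

The counter is approximate because it can drift from the database (for example after a deleted message or a lost increment), so a periodic reconciliation against the SQL count is a reasonable companion.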

Message History Pagination

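Cursor-based pagination on id (rather than OFFSET) is stable under concurrent inserts. The composite index (room_id, id DESC) turns each page fetch into an index range scan that stops after limit matching rows; soft-deleted rows inside the range are read and skipped, so a page may touch slightly more than limit index entries.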

Frequently Asked Questions

How does Redis pub/sub enable WebSocket message fan-out across multiple servers?

Each chat server subscribes to Redis pub/sub channels named room:{room_id}. When a message arrives on any node, that node publishes a JSON payload to the channel. Every subscribed node, including the publisher, receives the event from Redis and pushes the message to whichever members of that room have local WebSocket connections, achieving cross-node fan-out without direct node-to-node communication.

How are typing indicators implemented without a “stopped typing” event?

Each typing event sets a Redis key typing:{room_id}:{user_id} with a short TTL (e.g., 5 seconds) and is broadcast to the room. If the user keeps typing, each event refreshes the TTL. When they stop, the key expires naturally, and receiving clients hide the indicator once no typing event has arrived within the TTL window. This avoids the complexity of explicit stop events and handles sudden disconnects gracefully.

How do you handle messages sent to offline users in a chat system?

When a fan-out attempt finds that a user has no active WebSocket connection, the message payload is pushed to a Redis list offline:{user_id}, capped at a configurable maximum (e.g., 200 messages) using LTRIM. On reconnect, the server drains this queue and delivers the queued messages in order before resuming real-time delivery.

Why use cursor-based pagination instead of OFFSET for chat history?

OFFSET pagination becomes unstable under concurrent inserts: a new message shifts every row's offset, causing duplicates or gaps as the user pages backward. Cursor-based pagination on an auto-increment message ID is stable because the cursor value does not shift with new inserts, and the composite index (room_id, id DESC) makes each page fetch a bounded index-range scan.

