Designing a Slack-like team collaboration application tests your knowledge of real-time messaging, WebSocket management, distributed storage, and search. This question appears at Meta, Microsoft, Atlassian, and any company building communication infrastructure.
Requirements Clarification
- Scale: 10M DAU, 1B messages/day, 500k concurrent WebSocket connections
- Features: Real-time messaging, channels, DMs, threads, search, file sharing, presence
- Latency: Message delivery <100ms for online recipients; push notification <5s for offline
- Storage: Messages retained indefinitely (or per workspace plan); searchable
- Workspace isolation: Strict data isolation between organizations
WebSocket Gateway
import asyncio
import json
import logging
import uuid
from dataclasses import dataclass, field
from typing import Dict, Optional, Set

import websockets
@dataclass
class Connection:
    """One live WebSocket for a single user/device pair.

    A user may hold several Connection records at once (desktop, browser,
    phone) — the gateway tracks them per-user in `user_connections`.
    """
    user_id: str        # owner of this socket
    workspace_id: str   # workspace this socket authenticated against
    websocket: object   # websockets.WebSocketServerProtocol (untyped to avoid a hard import dependency)
    subscribed_channels: Set[str] = field(default_factory=set)  # channel_ids fanned out to this socket
    device_type: str = "desktop"  # desktop | mobile | browser
class WebSocketGateway:
    """
    Stateful WebSocket gateway. Horizontally scalable via Redis pub/sub fanout.
    Each gateway instance handles ~50k connections.

    Fixes vs. previous revision:
    - `_handle_join` / `_handle_typing` were referenced in the dispatch table
      but never defined; `_authenticate` / `_subscribe_to_channels` were called
      but never defined. All are implemented below.
    - A malformed JSON frame no longer tears down the whole connection.
    - Offline presence is only announced when the user's LAST device drops.
    """

    def __init__(self, redis_client, message_router):
        self.connections: Dict[str, "Connection"] = {}    # conn_id → Connection
        self.user_connections: Dict[str, Set[str]] = {}   # user_id → {conn_ids}
        self.redis = redis_client
        self.router = message_router
        self._log = logging.getLogger(__name__)

    async def handle_connection(self, websocket, path: str):
        """Lifecycle of one socket: authenticate, greet, pump frames, clean up."""
        conn_id = await self._authenticate(websocket)
        if not conn_id:
            return  # _authenticate already closed the socket
        conn = self.connections[conn_id]
        try:
            await self._subscribe_to_channels(conn)
            await self._send_message(websocket, {
                "type": "hello",
                "conn_id": conn_id,
                "heartbeat_interval": 30,
            })
            async for raw_msg in websocket:
                # Bug fix: a malformed frame must not kill the connection loop.
                try:
                    msg = json.loads(raw_msg)
                except (ValueError, TypeError):
                    await self._send_message(websocket, {"type": "error", "error": "invalid_json"})
                    continue
                await self._handle_message(conn, msg)
        except websockets.exceptions.ConnectionClosed:
            pass  # normal client disconnect
        finally:
            await self._cleanup_connection(conn_id)

    async def _authenticate(self, websocket) -> Optional[str]:
        """Expect an auth frame as the client's first message; register the connection.

        Expected frame: {"type": "auth", "token": ..., "user_id": ..., "workspace_id": ...}.
        NOTE(review): real token verification belongs in an auth service; this
        only validates frame shape — confirm against the auth flow.
        Returns the new conn_id, or None (socket closed) on failure.
        """
        try:
            raw = await asyncio.wait_for(websocket.recv(), timeout=10)
            frame = json.loads(raw)
        except (asyncio.TimeoutError, ValueError, TypeError):
            await websocket.close()
            return None
        if frame.get("type") != "auth" or not frame.get("user_id") or not frame.get("workspace_id"):
            await websocket.close()
            return None
        conn_id = uuid.uuid4().hex
        conn = Connection(
            user_id=frame["user_id"],
            workspace_id=frame["workspace_id"],
            websocket=websocket,
        )
        self.connections[conn_id] = conn
        self.user_connections.setdefault(conn.user_id, set()).add(conn_id)
        return conn_id

    async def _subscribe_to_channels(self, conn: "Connection"):
        """Load the user's channel memberships and mark them for local fanout.

        NOTE(review): membership lookup is delegated to the message router;
        gateway-wide Redis subscriptions are handled by the instance's single
        subscriber task, so only the local set is updated here.
        """
        channels = await self.router.get_user_channels(conn.workspace_id, conn.user_id)
        conn.subscribed_channels.update(channels or [])

    async def _handle_message(self, conn: "Connection", msg: dict):
        """Dispatch one inbound client frame by its `type` field; unknown types are ignored."""
        msg_type = msg.get("type")
        handlers = {
            "ping": self._handle_ping,
            "message:send": self._handle_send,
            "channel:join": self._handle_join,
            "typing:start": self._handle_typing,
        }
        handler = handlers.get(msg_type)
        if handler:
            await handler(conn, msg)

    async def _handle_send(self, conn: "Connection", msg: dict):
        """Persist a message, then fan it out to every gateway via Redis pub/sub."""
        channel_id = msg.get("channel_id")
        content = msg.get("text")
        # Bug fix: a frame missing either field used to KeyError and kill the socket.
        if not channel_id or content is None:
            await self._send_message(conn.websocket, {"type": "error", "error": "missing_field"})
            return
        # Persist first — fanout only announces durable messages.
        saved_msg = await self.router.persist_message({
            "workspace_id": conn.workspace_id,
            "channel_id": channel_id,
            "user_id": conn.user_id,
            "text": content,
            "client_msg_id": msg.get("client_msg_id"),  # Dedup key for client retries
        })
        # Fanout via Redis pub/sub to all gateway instances
        await self.redis.publish(
            f"channel:{channel_id}",
            json.dumps({"type": "message:new", "message": saved_msg})
        )

    async def _handle_join(self, conn: "Connection", msg: dict):
        """Subscribe this connection to a channel's fanout (was missing from the class)."""
        channel_id = msg.get("channel_id")
        if not channel_id:
            await self._send_message(conn.websocket, {"type": "error", "error": "missing_field"})
            return
        conn.subscribed_channels.add(channel_id)
        await self._send_message(conn.websocket, {"type": "channel:joined", "channel_id": channel_id})

    async def _handle_typing(self, conn: "Connection", msg: dict):
        """Broadcast a typing indicator (was missing from the class). Ephemeral — never persisted."""
        channel_id = msg.get("channel_id")
        if not channel_id:
            return
        await self.redis.publish(
            f"channel:{channel_id}",
            json.dumps({"type": "typing", "channel_id": channel_id, "user_id": conn.user_id}),
        )

    async def _handle_ping(self, conn: "Connection", msg: dict):
        """Heartbeat reply; TTL refresh is handled by the presence service."""
        await self._send_message(conn.websocket, {"type": "pong"})

    async def fanout_to_local(self, channel_id: str, payload: dict):
        """Called by Redis subscriber — deliver to local connections."""
        # Snapshot via list(): connections may be removed while we await sends.
        for conn_id, conn in list(self.connections.items()):
            if channel_id not in conn.subscribed_channels:
                continue
            try:
                await self._send_message(conn.websocket, payload)
            except Exception:
                # Best-effort delivery: a dead socket will be reaped by its own
                # handler's cleanup, but don't lose the signal entirely.
                self._log.warning("fanout to %s failed", conn_id, exc_info=True)

    async def _send_message(self, ws, payload: dict):
        """Serialize and send one frame to a single socket."""
        await ws.send(json.dumps(payload))

    async def _cleanup_connection(self, conn_id: str):
        """Deregister a connection; announce offline only when the LAST device drops.

        Bug fix: the old code published `online: False` on every disconnect,
        flapping presence for multi-device users.
        """
        conn = self.connections.pop(conn_id, None)
        if not conn:
            return
        remaining = self.user_connections.get(conn.user_id)
        if remaining is not None:
            remaining.discard(conn_id)
            if not remaining:
                del self.user_connections[conn.user_id]
        if not remaining:  # None (untracked) or now-empty set → user fully offline
            await self.redis.publish(
                f"presence:{conn.workspace_id}",
                json.dumps({"user_id": conn.user_id, "online": False})
            )
Message Storage — Cassandra Schema
"""
Cassandra is ideal for Slack messages:
- Time-series access pattern: fetch recent messages in a channel
- Write-heavy: 1B messages/day → ~11k writes/second
- Linear scale: add nodes, partition key distributes load
- TTL support: free tier workspaces auto-expire old messages
Schema design:
"""
MESSAGES_TABLE = """
CREATE TABLE messages (
workspace_id UUID,
channel_id UUID,
message_id TIMEUUID, -- Sort key: time-ordered UUID
user_id UUID,
text TEXT,
thread_ts TIMEUUID, -- NULL = top-level; set = thread reply
edited_at TIMESTAMP,
deleted BOOLEAN,
reactions MAP<TEXT, FROZEN<SET>>, -- emoji → {user_ids}
attachments LIST<FROZEN<MAP>>,
PRIMARY KEY ((workspace_id, channel_id), message_id)
) WITH CLUSTERING ORDER BY (message_id DESC)
AND default_time_to_live = 0 -- Set per-workspace for tiered plans
AND compaction = {'class': 'TimeWindowCompactionStrategy',
'compaction_window_unit': 'DAYS',
'compaction_window_size': 1};
"""
# Query: Load last 50 messages in a channel.
# Reads a single (workspace_id, channel_id) partition; the clustering order is
# already message_id DESC, so the explicit ORDER BY is redundant but harmless.
LOAD_CHANNEL_HISTORY = """
SELECT message_id, user_id, text, reactions, thread_ts
FROM messages
WHERE workspace_id = ? AND channel_id = ?
ORDER BY message_id DESC
LIMIT 50;
"""
# Query: Load thread replies.
# Bug fix: thread_ts is a regular (non-key, non-indexed) column, so Cassandra
# rejects this predicate without ALLOW FILTERING. The scan stays bounded
# because it is restricted to one (workspace_id, channel_id) partition; at
# higher thread volumes, a dedicated replies table (or an index on thread_ts)
# would avoid the in-partition filtering entirely.
LOAD_THREAD_REPLIES = """
SELECT message_id, user_id, text
FROM messages
WHERE workspace_id = ? AND channel_id = ?
AND thread_ts = ?
ORDER BY message_id ASC
ALLOW FILTERING;
"""
Search — Elasticsearch Integration
# Elasticsearch index definition for message search.
MESSAGES_INDEX_MAPPING = {
    "settings": {
        "number_of_shards": 5,
        "number_of_replicas": 1,
        "analysis": {
            "analyzer": {
                # Custom text analyzer: standard tokenization, then lowercase,
                # English stopword removal, and snowball stemming.
                "message_analyzer": {
                    "type": "custom",
                    "tokenizer": "standard",
                    "filter": ["lowercase", "stop", "snowball"]
                }
            }
        }
    },
    "mappings": {
        "properties": {
            # keyword fields are exact-match filters, never analyzed
            "workspace_id": {"type": "keyword"},  # Strict isolation
            "channel_id": {"type": "keyword"},
            "user_id": {"type": "keyword"},
            "text": {"type": "text", "analyzer": "message_analyzer"},
            "timestamp": {"type": "date"},
            "has_attachment": {"type": "boolean"},
            "reactions_count": {"type": "integer"},
        }
    }
}
def search_messages(es, workspace_id: str, query: str,
                    channel_ids=None, from_user=None, limit=20) -> dict:
    """Full-text message search scoped to one workspace.

    Builds a bool query: fuzzy match on `text`, hard-filtered by workspace
    (isolation) plus optional channel and author filters. Results are ranked
    by relevance, ties broken by recency, with a single 150-char highlight
    fragment per hit.
    """
    # Workspace filter is unconditional — search must never cross tenants.
    filters = [{"term": {"workspace_id": workspace_id}}]
    if channel_ids:
        filters.append({"terms": {"channel_id": channel_ids}})
    if from_user:
        filters.append({"term": {"user_id": from_user}})

    search_body = {
        "query": {
            "bool": {
                "must": [{"match": {"text": {"query": query, "fuzziness": "AUTO"}}}],
                "filter": filters,
            }
        },
        "sort": [{"_score": "desc"}, {"timestamp": "desc"}],
        "size": limit,
        "highlight": {
            "fields": {"text": {"number_of_fragments": 1, "fragment_size": 150}}
        },
    }
    return es.search(index="messages", body=search_body)
Presence Service
import redis
class PresenceService:
    """
    Track online/away/offline status.

    A user is "online" while a TTL'd Redis key exists. Client heartbeats
    refresh the TTL; when heartbeats stop, the key expires on its own, so
    there is no explicit "set offline" step to forget.
    """

    # Slightly longer than the 30s client ping interval so a single dropped
    # ping doesn't flap the user offline.
    HEARTBEAT_TTL = 35  # seconds

    def __init__(self, redis_client: "redis.Redis"):
        # Annotation is quoted (lazy) so this module doesn't hard-require the
        # redis package just to define the class.
        self.redis = redis_client

    def set_online(self, workspace_id: str, user_id: str, device: str = "desktop"):
        """Mark a user online and broadcast the transition to workspace subscribers."""
        key = f"presence:{workspace_id}:{user_id}"
        pipe = self.redis.pipeline()
        pipe.setex(key, self.HEARTBEAT_TTL, device)
        # Notify channel subscribers
        pipe.publish(f"presence_updates:{workspace_id}", f"{user_id}:online")
        pipe.execute()

    def heartbeat(self, workspace_id: str, user_id: str):
        """Refresh the presence TTL. Called from each client ping (~every 30s).

        Bug fix: the old EXISTS-then-EXPIRE pair was racy (the key could expire
        between the two commands) and cost an extra round trip. A single EXPIRE
        is atomic and is already a no-op when the key is gone.
        """
        self.redis.expire(f"presence:{workspace_id}:{user_id}", self.HEARTBEAT_TTL)

    def get_presence(self, workspace_id: str, user_ids: list) -> dict:
        """Batch presence lookup — one pipelined round trip for all user_ids."""
        pipe = self.redis.pipeline()
        for uid in user_ids:
            pipe.get(f"presence:{workspace_id}:{uid}")
        results = pipe.execute()
        return {
            uid: ("online" if val else "offline")
            for uid, val in zip(user_ids, results)
        }

    def get_workspace_online_count(self, workspace_id: str) -> int:
        """Count online users via SCAN — non-blocking but O(keyspace); use sparingly."""
        pattern = f"presence:{workspace_id}:*"
        return sum(1 for _ in self.redis.scan_iter(pattern, count=100))
Architecture Diagram
┌──────────────────┐
Clients ──────────► │ Load Balancer │
(WebSocket) │ (L4, sticky) │
└────────┬─────────┘
│
┌────────────┴────────────┐
│ │
┌───────▼───────┐ ┌───────▼───────┐
│ WS Gateway 1 │ │ WS Gateway 2 │
│ 50k conns │ │ 50k conns │
└───────┬───────┘ └───────┬───────┘
└──────────┬──────────────┘
│ Redis Pub/Sub (fanout)
┌──────────▼──────────┐
│ Message Service │
└──────────┬──────────┘
┌────────────────┼────────────────┐
│ │ │
┌───────▼──────┐ ┌───────▼──────┐ ┌──────▼────────┐
│ Cassandra │ │Elasticsearch │ │ Notification │
│ (messages) │ │ (search) │ │ Service │
└──────────────┘ └──────────────┘ └───────────────┘
Key Design Decisions
| Decision | Choice | Rationale |
|---|---|---|
| Real-time transport | WebSocket over SSE | Bidirectional needed (typing, reactions, presence) |
| Fanout mechanism | Redis pub/sub | Sub-millisecond; gateway instances subscribe to channels |
| Message storage | Cassandra | Time-series pattern, linear scale, built-in TTL |
| Search | Elasticsearch | Full-text with per-workspace filtering |
| Presence | Redis SETEX | Auto-expire on disconnect; O(1) lookup |
| Offline delivery | APNs/FCM push | WebSocket closed — need push notification |
| File storage | S3 + CDN | Large blobs outside message store; pre-signed URLs |
Scale Bottlenecks and Solutions
- Hot channels: General channel in large workspaces — use server-side fan-out batching, not per-message fanout
- WebSocket affinity: Need consistent hashing to route user’s reconnect to same (or nearby) gateway — use L4 load balancer with session persistence by user_id
- Search lag: Elasticsearch indexing is async — messages appear in search within 1s of send (near-real-time, not real-time)
- Workspace isolation: Always include workspace_id in every query — enforce at service layer, not just application code
Companies That Ask This System Design Question
This problem type commonly appears in interviews at companies building messaging, collaboration, or other real-time infrastructure.
See our company interview guides for the full interview process, compensation data, and preparation tips.