Low-Level Design: Chat Application (OOP Interview)
A chat application combines real-time messaging, user management, conversation threading, and notification delivery. It’s a popular LLD question that tests your ability to model a complex domain with multiple interacting entities. This guide covers a complete OOP implementation for one-on-one and group chats.
Requirements
- Users can create one-on-one or group conversations
- Send and receive messages; each message has a status (SENT, DELIVERED, READ)
- Group conversations: add/remove members, admin roles
- Search messages by content within a conversation
- Paginated message history with cursor-based pagination
Core Data Models
from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime
from typing import Optional
import uuid
class MessageStatus(Enum):
    """Delivery state of a message, listed in lifecycle order."""

    # Default for every newly created Message.
    SENT = "sent"
    # Assigned at send time when a recipient is online.
    DELIVERED = "delivered"
    # Assigned when a member marks the conversation as read.
    READ = "read"
class ConversationType(Enum):
    """Distinguishes a two-person direct message from a named group chat."""

    # Direct message between a pair of users.
    ONE_ON_ONE = "one_on_one"
    # Multi-member conversation with a display name.
    GROUP = "group"
class MemberRole(Enum):
    """Role of a user inside one conversation."""

    # May perform admin-only operations such as adding members.
    ADMIN = "admin"
    # Regular participant; the default role.
    MEMBER = "member"
@dataclass
class User:
    """A registered account together with its presence information."""

    user_id: str
    username: str
    email: str
    # Presence flag; a new user starts offline.
    is_online: bool = False
    # Timestamp of the last transition to offline; None until that happens.
    last_seen: Optional[datetime] = None
@dataclass
class Message:
    """One chat message belonging to a single conversation.

    Every field has a default, so a message can be built from keyword
    arguments alone; ``message_id`` and ``sent_at`` are generated per instance.
    """

    # Fresh random UUID string for each message.
    message_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    conversation_id: str = ""
    sender_id: str = ""
    content: str = ""
    # Local wall-clock time at construction (naive datetime).
    sent_at: datetime = field(default_factory=datetime.now)
    status: MessageStatus = MessageStatus.SENT
    # message_id of the quoted message, if this message is a reply.
    reply_to: Optional[str] = None
@dataclass
class Member:
    """Per-conversation membership record for one user."""

    user_id: str
    role: MemberRole = MemberRole.MEMBER
    # Local wall-clock time at which this record was created.
    joined_at: datetime = field(default_factory=datetime.now)
    # message_id of the newest message this user has seen; None means the
    # user has not read (or sent) anything in the conversation yet.
    last_read_msg_id: Optional[str] = None
@dataclass
class Conversation:
    """A one-on-one or group chat: its members plus the message log.

    ``members`` maps user_id to that user's ``Member`` record. ``messages``
    is a list that callers append to, so index order is send order
    (oldest first, newest last).
    """

    conversation_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    type: ConversationType = ConversationType.ONE_ON_ONE
    # Display name; None when the conversation was created without one.
    name: Optional[str] = None
    members: dict[str, Member] = field(default_factory=dict)
    messages: list[Message] = field(default_factory=list)
    created_at: datetime = field(default_factory=datetime.now)

    def is_member(self, user_id: str) -> bool:
        """Return True if ``user_id`` has a membership record here."""
        return user_id in self.members

    def is_admin(self, user_id: str) -> bool:
        """Return True if ``user_id`` is a member whose role is ADMIN."""
        member = self.members.get(user_id)
        return member is not None and member.role == MemberRole.ADMIN

    def get_last_message(self) -> Optional[Message]:
        """Return the newest message, or None for an empty conversation."""
        return self.messages[-1] if self.messages else None

    def unread_count(self, user_id: str) -> int:
        """Return how many messages are newer than the user's read pointer.

        The log is scanned newest-first until ``last_read_msg_id`` is found;
        the number of messages passed on the way is the unread count.

        Every message counts as unread when the user is not a member, has no
        read pointer yet, or has a pointer that no longer matches any message
        in the log.
        """
        member = self.members.get(user_id)
        if member is None or not member.last_read_msg_id:
            return len(self.messages)
        for newer, msg in enumerate(reversed(self.messages)):
            if msg.message_id == member.last_read_msg_id:
                return newer
        # Stale pointer: the referenced message is not in the log, so nothing
        # present can be proven read. Reporting 0 here would silently hide
        # every unread message.
        return len(self.messages)

    def search_messages(self, query: str) -> list[Message]:
        """Return messages whose content contains ``query``, ignoring case.

        Results keep log order. An empty query matches every message.
        """
        needle = query.lower()
        return [msg for msg in self.messages if needle in msg.content.lower()]
Chat Service
class PermissionError(Exception):
    """Raised when a user attempts an operation they are not allowed to perform.

    NOTE(review): this name shadows Python's builtin ``PermissionError`` (an
    ``OSError`` subclass) for every module that imports it; this class derives
    directly from ``Exception`` and is unrelated to the builtin.
    """
class NotFoundError(Exception):
    """Raised when a user or conversation id cannot be resolved."""
class NotificationService:
    """Notification sink; this implementation prints a push line to stdout."""

    def notify(self, user: User, message: Message, conv: Conversation) -> None:
        """Emit a push notification for ``user`` unless they are online.

        ``message`` is accepted for interface completeness but is not read by
        this implementation. Conversations without a name are labelled 'DM'.
        """
        if user.is_online:
            # Online users need no push.
            return
        label = conv.name or 'DM'
        print(f"[PUSH] {user.username}: new message in {label}")
class ChatService:
    """In-memory orchestrator for users, conversations and messages.

    All state lives in two dictionaries keyed by id. Notifications are
    delegated to the ``NotificationService`` passed to the constructor.
    """

    def __init__(self, notification_service: NotificationService):
        """Create an empty service that notifies through ``notification_service``."""
        self._users: dict[str, User] = {}
        self._conversations: dict[str, Conversation] = {}
        self._notifier = notification_service

    def register_user(self, username: str, email: str) -> User:
        """Create, store and return a new user with a generated UUID id."""
        user = User(user_id=str(uuid.uuid4()), username=username, email=email)
        self._users[user.user_id] = user
        return user

    def set_online(self, user_id: str, online: bool) -> None:
        """Update a user's presence flag.

        Going offline also stamps ``last_seen`` with the current local time.

        Raises:
            NotFoundError: if ``user_id`` is not registered.
        """
        user = self._get_user(user_id)
        user.is_online = online
        if not online:
            user.last_seen = datetime.now()

    def create_dm(self, user1_id: str, user2_id: str) -> Conversation:
        """Return the direct-message conversation for a pair, creating it if needed.

        An existing DM is reused only when its member set is exactly the
        requested pair. A plain "both ids are members" test would, for
        ``user1_id == user2_id``, hand back any DM that user takes part in.
        """
        pair = {user1_id, user2_id}
        for existing in self._conversations.values():
            if (existing.type == ConversationType.ONE_ON_ONE
                    and set(existing.members) == pair):
                return existing

        conv = Conversation(type=ConversationType.ONE_ON_ONE)
        # Both participants are peers, so each one gets the ADMIN role.
        for uid in (user1_id, user2_id):
            conv.members[uid] = Member(user_id=uid, role=MemberRole.ADMIN)
        self._conversations[conv.conversation_id] = conv
        return conv

    def create_group(self, creator_id: str, name: str, member_ids: list[str]) -> Conversation:
        """Create a named group with ``creator_id`` as its only admin.

        ``member_ids`` may include the creator; that entry is skipped so the
        creator keeps the ADMIN role.
        """
        conv = Conversation(type=ConversationType.GROUP, name=name)
        conv.members[creator_id] = Member(user_id=creator_id, role=MemberRole.ADMIN)
        for uid in member_ids:
            if uid != creator_id:
                conv.members[uid] = Member(user_id=uid)
        self._conversations[conv.conversation_id] = conv
        return conv

    def add_member(self, requester_id: str, conv_id: str, new_uid: str) -> None:
        """Add ``new_uid`` to a group conversation as a regular member.

        Adding someone who already belongs is a no-op. Replacing the record
        would demote an admin and discard their ``joined_at`` and read pointer.

        Raises:
            NotFoundError: if the conversation does not exist.
            PermissionError: if the requester is not an admin, or the
                conversation is one-on-one (a DM must stay a pair).
        """
        conv = self._get_conversation(conv_id)
        if not conv.is_admin(requester_id):
            raise PermissionError("Only admins can add members")
        if conv.type == ConversationType.ONE_ON_ONE:
            raise PermissionError("Cannot add members to a one-on-one conversation")
        if conv.is_member(new_uid):
            return
        conv.members[new_uid] = Member(user_id=new_uid)

    def send_message(self, sender_id: str, conv_id: str, content: str,
                     reply_to: Optional[str] = None) -> Message:
        """Append a message to a conversation and notify the other members.

        The content is stored stripped of surrounding whitespace. Sending
        advances the sender's own read pointer to the new message. The message
        is marked DELIVERED when at least one registered recipient is online
        at send time, otherwise it stays SENT. ``reply_to`` is stored as
        given and is not validated.

        Raises:
            NotFoundError: if the conversation or the sender does not exist.
            PermissionError: if the sender is not a member.
            ValueError: if the content is empty or whitespace only.
        """
        conv = self._get_conversation(conv_id)
        self._get_user(sender_id)  # existence check only
        if not conv.is_member(sender_id):
            raise PermissionError("Not a member of this conversation")
        text = content.strip()
        if not text:
            raise ValueError("Message content cannot be empty")

        msg = Message(conversation_id=conv_id, sender_id=sender_id,
                      content=text, reply_to=reply_to)
        conv.messages.append(msg)
        conv.members[sender_id].last_read_msg_id = msg.message_id

        # Member ids that are not registered users are skipped silently.
        recipients = [self._users[uid] for uid in conv.members
                      if uid != sender_id and uid in self._users]
        # Decide the status once. Assigning it per recipient lets the last
        # member iterated win and can move DELIVERED back to SENT.
        if any(user.is_online for user in recipients):
            msg.status = MessageStatus.DELIVERED
        for user in recipients:
            self._notifier.notify(user, msg, conv)
        return msg

    def mark_read(self, user_id: str, conv_id: str) -> None:
        """Move the user's read pointer to the newest message.

        Every message from other senders that is newer than the previous
        pointer is flagged READ; the reader's own messages keep their status.
        A message carries a single status, so in a group it becomes READ as
        soon as any one recipient reads it. An empty conversation is a no-op.

        Raises:
            NotFoundError: if the conversation does not exist.
            PermissionError: if the user is not a member.
        """
        conv = self._get_conversation(conv_id)
        if not conv.is_member(user_id):
            raise PermissionError("Not a member")
        last = conv.get_last_message()
        if last is None:
            return
        member = conv.members[user_id]
        previous = member.last_read_msg_id
        member.last_read_msg_id = last.message_id
        for msg in reversed(conv.messages):
            if msg.message_id == previous:
                break  # everything older was already read
            if msg.sender_id != user_id:
                msg.status = MessageStatus.READ

    def get_messages(self, user_id: str, conv_id: str,
                     limit: int = 50, before_id: Optional[str] = None) -> list[Message]:
        """Return one page of history, oldest first.

        Args:
            user_id: requesting user; must be a member.
            conv_id: conversation to read.
            limit: maximum number of messages; zero or negative yields ``[]``.
            before_id: cursor; when it matches a message, the page ends just
                before that message. An unknown cursor is ignored and the
                newest page is returned.

        Raises:
            NotFoundError: if the conversation does not exist.
            PermissionError: if the user is not a member.
        """
        conv = self._get_conversation(conv_id)
        if not conv.is_member(user_id):
            raise PermissionError("Not a member")
        if limit <= 0:
            # messages[-0:] would be the whole list, not an empty page.
            return []
        messages = conv.messages
        end = len(messages)
        if before_id:
            for index, msg in enumerate(messages):
                if msg.message_id == before_id:
                    end = index
                    break
        return messages[max(0, end - limit):end]

    def get_user_conversations(self, user_id: str) -> list[Conversation]:
        """Return the user's conversations, most recently active first.

        Activity is the newest message's ``sent_at``, falling back to the
        conversation's ``created_at`` when it has no messages.
        """
        def last_activity(conv: Conversation) -> datetime:
            last = conv.get_last_message()
            return last.sent_at if last else conv.created_at

        mine = [c for c in self._conversations.values() if c.is_member(user_id)]
        return sorted(mine, key=last_activity, reverse=True)

    def _get_user(self, user_id: str) -> User:
        """Look up a user by id or raise NotFoundError."""
        user = self._users.get(user_id)
        if user is None:
            raise NotFoundError(f"User {user_id} not found")
        return user

    def _get_conversation(self, conv_id: str) -> Conversation:
        """Look up a conversation by id or raise NotFoundError."""
        conv = self._conversations.get(conv_id)
        if conv is None:
            raise NotFoundError(f"Conversation {conv_id} not found")
        return conv
Usage Example
chat = ChatService(NotificationService())
alice = chat.register_user("alice", "alice@example.com")
bob = chat.register_user("bob", "bob@example.com")
carol = chat.register_user("carol", "carol@example.com")

# Direct message
dm = chat.create_dm(alice.user_id, bob.user_id)
m1 = chat.send_message(alice.user_id, dm.conversation_id, "Hey Bob!")
m2 = chat.send_message(bob.user_id, dm.conversation_id, "Hi Alice!")
chat.mark_read(alice.user_id, dm.conversation_id)

# Group chat
group = chat.create_group(alice.user_id, "Project Team", [bob.user_id, carol.user_id])
welcome = chat.send_message(alice.user_id, group.conversation_id, "Welcome to the team!")
# A reply must quote a message from the same conversation, so quote the
# group's own welcome message rather than m1 from the DM.
chat.send_message(bob.user_id, group.conversation_id, "Thanks!", reply_to=welcome.message_id)

print(f"DM unread for Alice: {dm.unread_count(alice.user_id)}")  # 0
# Also 0: sending m2 moved Bob's read pointer to the newest message.
print(f"DM unread for Bob: {dm.unread_count(bob.user_id)}")  # 0
results = group.search_messages("team")
print([m.content for m in results])  # ["Welcome to the team!"]
history = chat.get_messages(alice.user_id, group.conversation_id, limit=20)
Design Patterns Applied
| Pattern | Where | Benefit |
|---|---|---|
| Observer (simplified) | NotificationService | A single injected notifier decouples message send from notification logic; a full Observer would keep a list of subscribers |
| Factory | create_dm(), create_group() | Centralizes conversation creation: DM reuse for an existing pair, creator assigned as group admin |
| State Machine | MessageStatus | Explicit progression: SENT → DELIVERED → READ |
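| Cursor Pagination | get_messages(before_id) | Pages stay stable while new messages arrive; this in-memory version finds the cursor with an O(n) scan, whereas an index on message ID makes a page fetch O(log n + limit) |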
Interview Discussion Points
- Real-time delivery: This handles the data layer. Real-time requires WebSocket connections (per the WhatsApp system design) or SSE — a separate concern from OOP design
- Unread count optimization: Current O(n) scan is fine for small histories. At scale: store unread_count directly on Member, increment it for every recipient when a message arrives, and reset it to 0 on mark_read
- Admin promotion on leave: When the last admin leaves a group, promote the longest-standing member — prevents groups becoming permanently admin-less
- Message ordering: In distributed systems, use server-assigned Snowflake IDs as ordering keys rather than client timestamps (which can drift)
{"@context":"https://schema.org","@type":"FAQPage","mainEntity":[{"@type":"Question","name":"How do you design the data model for a chat application in an OOP interview?","acceptedAnswer":{"@type":"Answer","text":"Four core classes: User (user_id, username, online status, last_seen), Message (message_id, conversation_id, sender_id, content, status: SENT/DELIVERED/READ, optional reply_to), Member (user_id, role: ADMIN/MEMBER, last_read_msg_id for unread count tracking), and Conversation (conversation_id, type: ONE_ON_ONE/GROUP, members dict, messages list). The ChatService orchestrator handles: create_dm() (reuses existing DM between same pair), create_group(), send_message() (appends to conversation, notifies members), mark_read() (updates last_read_msg_id), and get_messages() with cursor-based pagination (pass before_id to get messages before a given message ID)."}},{"@type":"Question","name":"How do you calculate unread message count for a user in a chat application?","acceptedAnswer":{"@type":"Answer","text":"Each Member object stores last_read_msg_id — the message_id of the last message the user has read. unread_count() scans the conversation’s message list in reverse until it finds last_read_msg_id, returning the number of messages scanned (i.e., messages after the last read). Time: O(unread_count), which is acceptable since unread counts are typically small. Optimization for scale: store an explicit unread_count integer on the Member object, increment it when new messages arrive (skipping the sender), reset to 0 on mark_read(). This makes unread count O(1) at the cost of consistency risk if a message is deleted. Both approaches are valid interview answers — mention the tradeoff."}},{"@type":"Question","name":"How do you handle admin roles when the only admin leaves a group chat?","acceptedAnswer":{"@type":"Answer","text":"When a user leaves a group and they are the last admin: (1) Before removing them, scan other members for existing admins. (2) If no other admins exist, promote the next member — by convention use the longest-standing member (first in join order) or the most active member. (3) If no other members remain, dissolve the group (delete the conversation). This prevents groups from becoming permanently admin-less, which would block future admin operations (adding/removing members, changing group name). An alternative design: designate a \"super admin\" (group creator) who cannot be removed by other admins, only by themselves — but this is more complex and not always necessary for an OOP interview."}}]}
🏢 Asked at: Meta Interview Guide 2026: Facebook, Instagram, WhatsApp Engineering
🏢 Asked at: Snap Interview Guide
🏢 Asked at: LinkedIn Interview Guide 2026: Social Graph Engineering, Feed Ranking, and Professional Network Scale
🏢 Asked at: Twitter/X Interview Guide 2026: Timeline Algorithms, Real-Time Search, and Content at Scale
🏢 Asked at: Atlassian Interview Guide
🏢 Asked at: Airbnb Interview Guide 2026: Search Systems, Trust and Safety, and Full-Stack Engineering