Designing a real-time multiplayer game backend is a fascinating system design problem that combines the constraints of ultra-low latency, authoritative game state, lag compensation, and matchmaking. This question appears at gaming companies and any company with real-time features.
Requirements Clarification
- Game type: 5v5 competitive game (like League of Legends / Valorant), 100-player battle royale, or casual mobile game — each has different latency requirements
- Scale: 50M MAU, 5M concurrent players, 100k concurrent game sessions
- Latency: <50ms client-to-server round trip for competitive games; <150ms for casual
- Consistency: Server-authoritative game state (client prediction with server reconciliation)
Game Server Architecture
"""
Dedicated game server (authoritative):
- Single source of truth for game state
- Runs game simulation at 20-128 ticks/second (tick = one simulation step)
- Clients send inputs; server validates and applies them
- Server sends game state snapshots to all clients
Client-side prediction:
- Client immediately applies its own inputs locally (responsive feel)
- Client also receives authoritative server state
- If server state differs from predicted state: reconcile (snap back)
- Entity interpolation: smoothly animate other players between server snapshots
Alternative: Peer-to-peer (not recommended for competitive)
- One peer is "host" (lockstep deterministic)
- Cheating: host advantage, easier to hack
- Used by fighting games (Street Fighter), retro RTS
"""
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple
import time
@dataclass
class PlayerInput:
player_id: str
tick: int
sequence: int # Client sequence number for reconciliation
move_x: float
move_y: float
actions: List[str] # ["jump", "shoot", "use_ability"]
timestamp: float
@dataclass
class GameState:
tick: int
timestamp: float
players: Dict[str, "PlayerState"]
entities: Dict[str, "Entity"] # Projectiles, pickups, etc.
@dataclass
class PlayerState:
player_id: str
x: float
y: float
health: int
velocity_x: float
velocity_y: float
last_processed_input: int # Last input sequence the server applied
class GameServer:
TICK_RATE = 60 # 60 simulation steps per second
TICK_DELTA = 1.0 / TICK_RATE
MAX_PLAYERS = 10 # Per game session
def __init__(self, session_id: str):
self.session_id = session_id
self.tick = 0
self.state = GameState(tick=0, timestamp=time.time(), players={}, entities={})
self.input_buffer: Dict[str, List[PlayerInput]] = {} # player_id -> buffered inputs
self.running = True
def receive_input(self, player_input: PlayerInput):
"""Buffer inputs from client. Applied during tick processing."""
pid = player_input.player_id
if pid not in self.input_buffer:
self.input_buffer[pid] = []
# Keep at most 3 ticks of buffered inputs (prevent input stuffing attacks)
self.input_buffer[pid].append(player_input)
self.input_buffer[pid] = self.input_buffer[pid][-3*3:]
def process_tick(self):
"""Run one simulation step. Called TICK_RATE times per second."""
self.tick += 1
self.state.tick = self.tick
# Apply all buffered inputs for this tick
for player_id, inputs in self.input_buffer.items():
current_tick_inputs = [i for i in inputs if i.tick == self.tick]
if current_tick_inputs:
self._apply_input(player_id, current_tick_inputs[-1])
else:
# Input drop: extrapolate last known velocity
self._extrapolate_player(player_id)
# Physics simulation
self._simulate_physics()
# Broadcast snapshot to all clients
return self._build_snapshot()
def _apply_input(self, player_id: str, inp: PlayerInput):
player = self.state.players.get(player_id)
if not player:
return
# Validate input (anti-cheat: cap max speed, check cooldowns)
speed = 5.0 # Units per tick
player.velocity_x = max(-speed, min(speed, inp.move_x * speed))
player.velocity_y = max(-speed, min(speed, inp.move_y * speed))
player.x += player.velocity_x * self.TICK_DELTA
player.y += player.velocity_y * self.TICK_DELTA
player.last_processed_input = inp.sequence
def _simulate_physics(self):
"""Apply physics: gravity, collision detection, projectile movement."""
pass # Game-specific implementation
def _build_snapshot(self) -> dict:
"""Compact snapshot for network transmission."""
return {
"tick": self.tick,
"players": {
pid: {
"x": round(p.x, 2), "y": round(p.y, 2),
"hp": p.health,
"last_seq": p.last_processed_input,
}
for pid, p in self.state.players.items()
}
}
Lag Compensation
"""
Problem: Player A shoots at Player B. Due to latency, by the time the
server receives the shot, Player B has moved. Should the shot hit?
Lag compensation: rewind server state to when the client fired.
Algorithm:
1. Client sends shot with timestamp T (when they pressed fire)
2. Server stores N ticks of history (N = max_latency / tick_interval)
3. Server rewinds to the state at time T
4. Validates hit detection in the rewound state
5. Restores current state and applies damage if hit confirmed
"""
from collections import deque
import copy
class LagCompensator:
def __init__(self, max_history_ticks: int = 128):
self.history: deque = deque(maxlen=max_history_ticks)
def record_tick(self, state: GameState):
self.history.append(copy.deepcopy(state))
def get_state_at_time(self, timestamp: float) -> Optional[GameState]:
"""Find closest recorded state to the given timestamp."""
best = None
best_diff = float("inf")
for state in self.history:
diff = abs(state.timestamp - timestamp)
if diff bool:
"""
Rewind to shot_timestamp, check if shot_direction intersects target hitbox.
"""
past_state = self.get_state_at_time(shot_timestamp)
if not past_state:
return False
target = past_state.players.get(target_id)
if not target or target.health <= 0:
return False
# Simple AABB hitbox check (real games use capsule/sphere)
hitbox_size = 1.0
ox, oy = shot_origin
dx, dy = shot_direction
tx, ty = target.x, target.y
# Parametric ray-AABB intersection
tmin_x = (tx - hitbox_size/2 - ox) / (dx or 0.001)
tmax_x = (tx + hitbox_size/2 - ox) / (dx or 0.001)
tmin_y = (ty - hitbox_size/2 - oy) / (dy or 0.001)
tmax_y = (ty + hitbox_size/2 - oy) / (dy or 0.001)
t_enter = max(min(tmin_x, tmax_x), min(tmin_y, tmax_y))
t_exit = min(max(tmin_x, tmax_x), max(tmin_y, tmax_y))
return 0 <= t_enter <= t_exit
Matchmaking System
import heapq
import time
from dataclasses import dataclass, field
from typing import List, Optional
@dataclass
class Player:
player_id: str
mmr: float # Matchmaking rating (ELO-like)
region: str # "us-west", "eu-central", etc.
queued_at: float = field(default_factory=time.time)
ping_ms: int = 50 # Average latency to regional servers
@dataclass(order=True)
class MatchmakingTicket:
priority: float # Lower = higher priority (use negative queue_time)
player_id: str = field(compare=False)
player: Player = field(compare=False)
class Matchmaker:
"""
MMR-based matchmaking with expanding search window (flex window).
As players wait longer, acceptable MMR range grows.
"""
BASE_MMR_WINDOW = 100 # Initial MMR tolerance
MAX_MMR_WINDOW = 500 # Max MMR spread in a match
WINDOW_EXPAND_RATE = 50 # Expand by this per 10 seconds waiting
TEAM_SIZE = 5
GAME_SIZE = TEAM_SIZE * 2
def __init__(self):
self.queue: List[MatchmakingTicket] = [] # Min-heap by priority
def enqueue(self, player: Player):
ticket = MatchmakingTicket(
priority=-time.time(), # Negative: longer wait = lower number = higher priority
player_id=player.player_id,
player=player,
)
heapq.heappush(self.queue, ticket)
def get_mmr_window(self, player: Player) -> float:
wait_s = time.time() - player.queued_at
expand = (wait_s / 10) * self.WINDOW_EXPAND_RATE
return min(self.BASE_MMR_WINDOW + expand, self.MAX_MMR_WINDOW)
def try_form_match(self) -> Optional[List[Player]]:
"""Try to form a balanced match from the queue."""
if len(self.queue) < self.GAME_SIZE:
return None
# Sort by wait time (priority) — process longest-waiting first
candidates = sorted(self.queue, key=lambda t: t.priority)
for anchor in candidates[:10]: # Try top 10 longest-waiting as anchor
window = self.get_mmr_window(anchor.player)
group = [
t.player for t in candidates
if abs(t.player.mmr - anchor.player.mmr) = self.GAME_SIZE:
# Found enough players — form the match
match_players = group[:self.GAME_SIZE]
# Remove matched players from queue
matched_ids = {p.player_id for p in match_players}
self.queue = [t for t in self.queue if t.player_id not in matched_ids]
heapq.heapify(self.queue)
return match_players
return None
def balance_teams(self, players: List[Player]) -> tuple:
"""Split players into two balanced teams by MMR."""
sorted_players = sorted(players, key=lambda p: p.mmr, reverse=True)
team_a, team_b = [], []
mmr_a = mmr_b = 0
for player in sorted_players:
if mmr_a <= mmr_b:
team_a.append(player)
mmr_a += player.mmr
else:
team_b.append(player)
mmr_b += player.mmr
return team_a, team_b
Key Architecture Decisions
| Decision | Choice | Rationale |
|---|---|---|
| Server authority | Dedicated authoritative server | Prevents cheating; single source of truth |
| Protocol | UDP (not TCP) | Lower latency; TCP retransmission causes stalls worse than packet loss |
| Tick rate | 60 ticks/second (competitive) | 30ms window per tick; higher = more responsive, more bandwidth |
| Lag compensation | Server-side rewind | Fairness: compensate for network latency on hit detection |
| Snapshot delta encoding | Send only changed state | Reduce bandwidth; only diff since last acknowledged snapshot |
| Anti-cheat | Server validates all inputs | Client sends intent, not results; server runs physics |
| Global servers | Regional data centers | GeoDNS routes players to nearest server; <50ms within region |