Concurrency and Multithreading Interview Questions (2026)
Concurrency is one of the hardest topics in software engineering interviews — and one of the most frequently tested at senior levels. Google, Apple, Meta, Uber, and financial tech companies (Jane Street, Citadel, Two Sigma) all test concurrency depth extensively. This guide covers the most important concepts with working Python and conceptual examples.
Core Concepts You Must Know
1. The Dining Philosophers Problem
Five philosophers sit at a table with five forks (one between each pair). Each needs two forks to eat. If all pick up the left fork simultaneously, deadlock occurs — no one can get their right fork.
```python
import threading
import time
import random


class DiningPhilosophers:
    """
    Classic deadlock-prevention problem.

    Deadlock conditions (ALL must hold for deadlock):
      1. Mutual exclusion: a fork can be held by only one philosopher
      2. Hold and wait: holding the left fork while waiting for the right
      3. No preemption: can't forcibly take a fork from a philosopher
      4. Circular wait: circular dependency of waiting

    Solution 1: Resource ordering — always acquire the lower-numbered fork first.
    This breaks circular wait: philosopher 4 (who would pick fork 4 then fork 0)
    now picks fork 0 then fork 4, the same order as philosopher 0.
    Solution 2: Arbitrator — a central waiter grants permission.
    Solution 3: Chandy-Misra — message-passing, fully distributed.
    """

    def __init__(self, n=5):
        self.n = n
        self.forks = [threading.Lock() for _ in range(n)]

    def philosopher(self, phil_id: int, eat_times: int):
        """Philosopher with resource-ordering deadlock prevention."""
        left = phil_id
        right = (phil_id + 1) % self.n
        # Resource ordering: acquire in ascending fork-index order
        first, second = (left, right) if left < right else (right, left)
        for _ in range(eat_times):
            # Think
            time.sleep(random.uniform(0.01, 0.05))
            # Acquire forks in ordered fashion
            with self.forks[first]:
                with self.forks[second]:
                    # Eat
                    print(f"Philosopher {phil_id} is eating")
                    time.sleep(random.uniform(0.01, 0.03))

    def run(self, eat_times: int = 3):
        """Start all philosophers. No deadlock thanks to resource ordering."""
        threads = [
            threading.Thread(target=self.philosopher, args=(i, eat_times))
            for i in range(self.n)
        ]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        print("All philosophers finished eating.")
```
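The arbitrator idea (Solution 2) can be sketched with a semaphore standing in for the waiter: admit at most n-1 philosophers to the table at once, so a cycle involving all n forks can never form. This is a minimal sketch; the function and variable names are illustrative, not from the original.

```python
import threading


def dine_with_waiter(n: int = 5, eat_times: int = 3) -> list:
    """Arbitrator (waiter) solution: at most n-1 philosophers may reach
    for forks concurrently, so circular wait cannot involve all n."""
    forks = [threading.Lock() for _ in range(n)]
    waiter = threading.Semaphore(n - 1)  # the waiter seats at most n-1 diners
    meals = []                           # record of completed meals
    log_lock = threading.Lock()          # protects the shared meals list

    def philosopher(phil_id: int):
        left, right = phil_id, (phil_id + 1) % n
        for _ in range(eat_times):
            with waiter:                 # ask the waiter for permission
                with forks[left]:
                    with forks[right]:
                        with log_lock:
                            meals.append(phil_id)

    threads = [threading.Thread(target=philosopher, args=(i,)) for i in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return meals


meals = dine_with_waiter()
print(sorted(set(meals)))  # [0, 1, 2, 3, 4] — everyone ate, no deadlock
```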
2. Producer-Consumer with Bounded Buffer
```python
import threading
import queue
import time


class BoundedBuffer:
    """
    Producer-Consumer with a bounded buffer.

    Classic synchronization problem — appears in OS, distributed systems,
    and streaming-pipeline interviews.

    Key insight: producers block when the buffer is full;
    consumers block when the buffer is empty.
    Semaphores (or a blocking queue) provide an elegant solution.
    """

    def __init__(self, capacity: int):
        self.buffer = queue.Queue(maxsize=capacity)
        self.capacity = capacity

    def producer(self, producer_id: int, items: list):
        """Producer: adds items to the buffer, blocks when full."""
        for item in items:
            self.buffer.put(item)  # blocks if full (Queue handles the locking internally)
            print(f"Producer {producer_id} produced: {item} (size={self.buffer.qsize()})")
            time.sleep(0.01)

    def consumer(self, consumer_id: int, n_items: int):
        """Consumer: takes items from the buffer, blocks when empty."""
        for _ in range(n_items):
            item = self.buffer.get()  # blocks if empty
            print(f"Consumer {consumer_id} consumed: {item}")
            self.buffer.task_done()
            time.sleep(0.02)  # consumers slower than producers


class SemaphoreBoundedBuffer:
    """
    Explicit semaphore implementation (interview whiteboard version).
    Shows understanding of semaphore mechanics.
    """

    def __init__(self, capacity: int):
        self.capacity = capacity
        self.buffer = []
        self.mutex = threading.Lock()                    # mutual exclusion for buffer access
        self.empty_slots = threading.Semaphore(capacity) # available space
        self.filled_slots = threading.Semaphore(0)       # filled items

    def produce(self, item):
        self.empty_slots.acquire()   # wait for an empty slot
        with self.mutex:
            self.buffer.append(item)
        self.filled_slots.release()  # signal that a filled slot is available

    def consume(self):
        self.filled_slots.acquire()  # wait for a filled slot
        with self.mutex:
            item = self.buffer.pop(0)
        self.empty_slots.release()   # signal that an empty slot is available
        return item
```
3. Read-Write Lock
```python
import threading


class ReadWriteLock:
    """
    Readers-writer lock: multiple concurrent readers, one exclusive writer.

    Use case: database with frequent reads, rare writes.
    In production: Python's stdlib has no readers-writer lock
    (threading.RLock is merely reentrant, not reader/writer);
    Java offers ReentrantReadWriteLock.

    This implementation: readers have priority (writers can starve).
    Writer-priority variant: track waiting writers and stop admitting new readers.
    """

    def __init__(self):
        self._read_lock = threading.Lock()
        self._write_lock = threading.Lock()
        self._reader_count = 0

    def acquire_read(self):
        with self._read_lock:
            self._reader_count += 1
            if self._reader_count == 1:
                self._write_lock.acquire()  # first reader blocks writers

    def release_read(self):
        with self._read_lock:
            self._reader_count -= 1
            if self._reader_count == 0:
                self._write_lock.release()  # last reader unblocks writers

    def acquire_write(self):
        self._write_lock.acquire()

    def release_write(self):
        self._write_lock.release()

    # Context-manager interface
    class ReadContext:
        def __init__(self, rwlock):
            self.rwlock = rwlock

        def __enter__(self):
            self.rwlock.acquire_read()

        def __exit__(self, *args):
            self.rwlock.release_read()

    class WriteContext:
        def __init__(self, rwlock):
            self.rwlock = rwlock

        def __enter__(self):
            self.rwlock.acquire_write()

        def __exit__(self, *args):
            self.rwlock.release_write()

    def read(self):
        return self.ReadContext(self)

    def write(self):
        return self.WriteContext(self)
```
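The writer-priority variant mentioned in the docstring can be sketched with a single condition variable: readers are held back whenever a writer is active *or* waiting. This is a minimal sketch under that one design choice; the class name is mine, not from the original.

```python
import threading


class WriterPriorityRWLock:
    """Writer-priority readers-writer lock: new readers are held back
    while any writer is waiting, so writers cannot starve."""

    def __init__(self):
        self._cond = threading.Condition()
        self._readers = 0          # active readers
        self._writer = False       # True while a writer holds the lock
        self._waiting_writers = 0  # writers queued up

    def acquire_read(self):
        with self._cond:
            # Block new readers while a writer is active OR waiting
            while self._writer or self._waiting_writers > 0:
                self._cond.wait()
            self._readers += 1

    def release_read(self):
        with self._cond:
            self._readers -= 1
            if self._readers == 0:
                self._cond.notify_all()  # wake a waiting writer

    def acquire_write(self):
        with self._cond:
            self._waiting_writers += 1
            while self._writer or self._readers > 0:
                self._cond.wait()
            self._waiting_writers -= 1
            self._writer = True

    def release_write(self):
        with self._cond:
            self._writer = False
            self._cond.notify_all()  # wake readers and writers alike
```

The trade-off versus the reader-priority version above: writers never starve, but read throughput drops whenever writers queue up.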
4. Thread-Safe Singleton
```python
import threading


class ThreadSafeSingleton:
    """
    Singleton with double-checked locking.

    Classic interview question: "How do you make a Singleton thread-safe?"

    A naive singleton is not thread-safe:
      - Thread A checks: instance is None
      - Context switch to Thread B
      - Thread B also checks: instance is None
      - Thread B creates the instance
      - Thread A creates ANOTHER instance (race condition)

    Solution: lock + double-check pattern.
    """

    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        if cls._instance is None:          # First check (no lock, fast path)
            with cls._lock:
                if cls._instance is None:  # Second check (inside lock, safe)
                    cls._instance = super().__new__(cls)
        return cls._instance


# Python-idiomatic alternative: module-level singleton.
# Python imports are thread-safe by default (GIL + import lock).
# Just put your singleton in a module and import it everywhere.
```
5. Lock-Free Data Structure: Atomic Counter
```python
import threading


class AtomicCounter:
    """
    Thread-safe counter.

    A truly lock-free counter requires a hardware atomic (CAS) instruction,
    which pure Python does not expose; this version uses a Lock and
    simulates compare-and-swap to show the mechanics.

    Lock-free algorithms matter in high-throughput systems:
      - They eliminate lock contention (a bottleneck at high thread counts)
      - No deadlock risk
      - Better cache behavior (no false sharing via lock state)
    """

    def __init__(self):
        self._value = 0
        self._lock = threading.Lock()  # explicit lock for true atomicity

    def increment(self) -> int:
        with self._lock:
            self._value += 1
            return self._value

    def decrement(self) -> int:
        with self._lock:
            self._value -= 1
            return self._value

    def get(self) -> int:
        return self._value  # a single attribute read is atomic under CPython's GIL

    def compare_and_swap(self, expected: int, new_value: int) -> bool:
        """
        CAS operation: atomically set to new_value if current == expected.
        Foundation of lock-free algorithms.
        Used in: reference counting, lock-free queues, concurrent hash maps.
        """
        with self._lock:
            if self._value == expected:
                self._value = new_value
                return True
            return False
```
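The canonical way to build on CAS is a retry loop: read the current value, compute the new one, and retry if another thread won the race. A self-contained sketch — the `SimCAS` class below simulates a hardware CAS cell with a lock purely for illustration, and the names are mine:

```python
import threading


class SimCAS:
    """Simulates a hardware compare-and-swap cell (illustration only)."""

    def __init__(self, value: int = 0):
        self.value = value
        self._lock = threading.Lock()

    def compare_and_swap(self, expected: int, new_value: int) -> bool:
        with self._lock:
            if self.value == expected:
                self.value = new_value
                return True
            return False


def cas_increment(cell: SimCAS) -> int:
    """Canonical lock-free retry loop: re-read and retry until CAS succeeds."""
    while True:
        old = cell.value                          # read the current value
        if cell.compare_and_swap(old, old + 1):   # try to publish old + 1
            return old + 1                        # our CAS won the race
        # else: another thread changed the value first — retry


cell = SimCAS()
threads = [
    threading.Thread(target=lambda: [cas_increment(cell) for _ in range(1000)])
    for _ in range(4)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(cell.value)  # 4000 — no increments lost despite the race
```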
Python Concurrency: GIL, Threading, Multiprocessing
Python's concurrency model — critical to understand for interviews:

1. GIL (Global Interpreter Lock)
   - Only ONE Python thread executes Python bytecode at a time
   - Released during I/O operations (network, disk)
   - Implication: threading doesn't help for CPU-bound tasks
2. When to use threading
   - I/O-bound work (web requests, file reads, DB queries)
   - Large numbers of lightweight concurrent operations
   - Shared state is easy to manage with locks
3. When to use multiprocessing
   - CPU-bound work (data processing, ML inference, image resizing)
   - True parallelism across CPU cores
   - Processes have separate memory spaces (no shared-state bugs)
   - Drawback: more memory, IPC overhead
4. When to use asyncio
   - High-concurrency I/O (thousands of concurrent connections)
   - Event loop: one thread, cooperative multitasking
   - async/await syntax; much less overhead than threads
   - Drawback: every library in the call chain must be async-compatible

```python
# Quick reference:
import threading        # I/O-bound, shared state
import multiprocessing  # CPU-bound, isolated state
import asyncio          # async I/O, high concurrency
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

# ThreadPoolExecutor: clean API for a thread pool
with ThreadPoolExecutor(max_workers=10) as executor:
    futures = [executor.submit(fetch_url, url) for url in urls]
    results = [f.result() for f in futures]

# ProcessPoolExecutor: same API, but with processes
with ProcessPoolExecutor(max_workers=8) as executor:
    results = list(executor.map(process_image, image_paths))
```
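The asyncio point above is easiest to see with timings: many simulated I/O waits overlap on a single thread. A minimal sketch, where `fetch` stands in for a real network call:

```python
import asyncio
import time


async def fetch(i: int) -> str:
    """Simulated I/O call: awaiting the sleep yields control to the event loop."""
    await asyncio.sleep(0.1)  # stand-in for a network round trip
    return f"response-{i}"


async def main() -> list:
    # 100 "requests" run concurrently on ONE thread via cooperative multitasking
    return await asyncio.gather(*(fetch(i) for i in range(100)))


start = time.perf_counter()
results = asyncio.run(main())
elapsed = time.perf_counter() - start
# Sequential execution would take ~10s; concurrent execution takes ~0.1s
print(len(results), round(elapsed, 1))
```

Run sequentially, the same 100 calls would take roughly 100 × 0.1s; run under `asyncio.gather`, the waits overlap and the wall-clock time stays near a single round trip.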
Common Interview Patterns
Thread-Safe LRU Cache
```python
from collections import OrderedDict
import threading


class ThreadSafeLRUCache:
    """
    LRU cache with thread safety.

    Combines LeetCode 146 with concurrent-access requirements.
    Seen at: Meta, Google, Amazon, Uber.
    """

    def __init__(self, capacity: int):
        self.capacity = capacity
        self.cache = OrderedDict()
        self.lock = threading.RLock()  # RLock allows re-entry (e.g., get → put)

    def get(self, key: int) -> int:
        with self.lock:
            if key not in self.cache:
                return -1
            self.cache.move_to_end(key)  # mark as recently used
            return self.cache[key]

    def put(self, key: int, value: int) -> None:
        with self.lock:
            if key in self.cache:
                self.cache.move_to_end(key)
            self.cache[key] = value
            if len(self.cache) > self.capacity:
                self.cache.popitem(last=False)  # evict least recently used
```
Interview Questions by Difficulty
Junior (L3/E4)
- What is a race condition? Give an example.
- What is the difference between a mutex and a semaphore?
- When would you use multiprocessing over threading in Python?
- What is deadlock? Name the four conditions for deadlock.
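A deterministic illustration of the first question: two threads each read a shared counter, then both write back read + 1, losing one update. The `Barrier` forces the bad interleaving so the race reproduces every time; in real code the interleaving is random, which is exactly what makes races hard to debug. The names here are illustrative.

```python
import threading

counter = 0
barrier = threading.Barrier(2)  # forces both threads to pause between read and write


def unsafe_increment():
    global counter
    local = counter       # 1) read shared state (both threads read 0)
    barrier.wait()        # 2) both threads have now finished their read
    counter = local + 1   # 3) both write 1 — one update is lost


threads = [threading.Thread(target=unsafe_increment) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # 1, not 2: the classic lost update
```

Wrapping the read-modify-write in a single `with lock:` block removes the window between steps 1 and 3, which is the entire fix.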
Senior (L5/E5)
- Implement a thread-safe singleton with double-checked locking.
- Design a rate limiter that works across multiple server instances.
- What is the difference between optimistic and pessimistic locking?
- Explain the readers-writers problem and its variants.
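For the rate-limiter question, a single-instance token bucket is the usual warm-up before the distributed follow-up (which typically moves the bucket state into a shared store such as Redis, with an atomic script doing the refill-and-take). A thread-safe sketch under those assumptions; the class and parameter names are mine:

```python
import threading
import time


class TokenBucket:
    """Thread-safe token-bucket rate limiter: refills at `rate` tokens/sec,
    bursts up to `capacity`; allow() is non-blocking."""

    def __init__(self, rate: float, capacity: float):
        self.rate = rate              # tokens added per second
        self.capacity = capacity      # maximum burst size
        self.tokens = capacity        # start full
        self.last = time.monotonic()  # timestamp of the last refill
        self._lock = threading.Lock()

    def allow(self) -> bool:
        with self._lock:
            now = time.monotonic()
            # Lazily refill based on elapsed time, capped at capacity
            self.tokens = min(self.capacity,
                              self.tokens + (now - self.last) * self.rate)
            self.last = now
            if self.tokens >= 1:
                self.tokens -= 1
                return True
            return False


bucket = TokenBucket(rate=10, capacity=2)
print([bucket.allow() for _ in range(3)])  # burst of 2 allowed, third denied
```

Lazy refill (computing tokens from elapsed time on each call) avoids a background refill thread, which is the design choice interviewers usually probe.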
Staff (L6/E6)
- Design a lock-free queue using compare-and-swap.
- Explain MVCC (multi-version concurrency control) in PostgreSQL.
- How does the Linux kernel scheduler handle thread priorities?
- Design a distributed lock service (like Google Chubby or Redis Redlock).
Key Terms Cheat Sheet
| Term | Definition |
|---|---|
| Mutex | Binary lock; only one thread can hold it at a time |
| Semaphore | Counter-based lock; N threads can hold concurrently |
| Monitor | Mutex + condition variables; high-level synchronization construct |
| Spinlock | Busy-waits (loops) instead of sleeping; good for very short waits |
| Deadlock | Circular dependency; no thread can proceed |
| Livelock | Threads respond to each other but make no progress |
| Starvation | Thread is perpetually denied access (e.g., low-priority thread) |
| CAS | Compare-and-swap; atomic read-modify-write; foundation of lock-free algorithms |
| MVCC | Multi-Version Concurrency Control; readers don’t block writers (Postgres) |
| Memory barrier | Prevents CPU/compiler reordering; ensures visibility across threads |