Concurrency and Multithreading Interview Questions (2026)

Concurrency is one of the hardest topics in software engineering interviews — and one of the most frequently tested at senior levels. Google, Apple, Meta, Uber, and financial tech companies (Jane Street, Citadel, Two Sigma) all test concurrency depth extensively. This guide covers the most important concepts with working Python and conceptual examples.

Core Concepts You Must Know

1. The Dining Philosophers Problem

Five philosophers sit at a table with five forks (one between each pair). Each needs two forks to eat. If all pick up the left fork simultaneously, deadlock occurs — no one can get their right fork.

import threading
import time
import random

class DiningPhilosophers:
    """
    Classic deadlock-prevention problem.

    Deadlock conditions (ALL must hold for deadlock):
    1. Mutual exclusion: forks can only be held by one philosopher
    2. Hold and wait: holding left fork while waiting for right
    3. No preemption: can't forcibly take a fork from a philosopher
    4. Circular wait: circular dependency of waiting

    Solution 1: Resource Ordering — always acquire the lower-numbered fork first.
    This breaks circular wait: the last philosopher (id n-1, who would naively
    pick fork n-1 then fork 0) now picks fork 0 first, so no cycle can form.

    Solution 2: Arbitrator — a central waiter grants permission.
    Solution 3: Chandy-Misra — message-passing, fully distributed.
    """

    def __init__(self, n=5):
        self.n = n
        self.forks = [threading.Lock() for _ in range(n)]

    def philosopher(self, phil_id: int, eat_times: int):
        """Philosopher with resource-ordering deadlock prevention."""
        # Always acquire lower-numbered fork first
        left = phil_id
        right = (phil_id + 1) % self.n

        # Resource ordering: acquire in ascending order
        first, second = (left, right) if left < right else (right, left)

        for _ in range(eat_times):
            # Think
            time.sleep(random.uniform(0.01, 0.05))

            # Acquire forks in ordered fashion
            with self.forks[first]:
                with self.forks[second]:
                    # Eat
                    print(f"Philosopher {phil_id} is eating")
                    time.sleep(random.uniform(0.01, 0.03))

    def run(self, eat_times: int = 3):
        """Start all philosophers. No deadlock due to resource ordering."""
        threads = [
            threading.Thread(target=self.philosopher, args=(i, eat_times))
            for i in range(self.n)
        ]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        print("All philosophers finished eating.")
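The docstring above names an arbitrator as Solution 2. Here is a minimal sketch of that variant, assuming the waiter is modeled as a counting semaphore that seats at most n-1 philosophers at once (the class name and the `meals` counter are illustrative additions, not part of the classic formulation):

```python
import threading
import time
import random

class ArbitratorPhilosophers:
    """Solution 2 sketch: a 'waiter' (semaphore) seats at most n-1
    philosophers at once, so at least one seated philosopher can always
    acquire both forks — circular wait among all n is impossible."""

    def __init__(self, n=5):
        self.n = n
        self.forks = [threading.Lock() for _ in range(n)]
        self.waiter = threading.Semaphore(n - 1)  # at most n-1 seated at once
        self.meals = [0] * n                      # per-philosopher meal count

    def philosopher(self, phil_id, eat_times):
        left, right = phil_id, (phil_id + 1) % self.n
        for _ in range(eat_times):
            time.sleep(random.uniform(0.001, 0.005))  # think
            with self.waiter:                  # ask the waiter for a seat
                with self.forks[left]:         # naive left-then-right order
                    with self.forks[right]:    # is now deadlock-free
                        self.meals[phil_id] += 1  # eat

    def run(self, eat_times=3):
        threads = [threading.Thread(target=self.philosopher, args=(i, eat_times))
                   for i in range(self.n)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
```

Note that the philosophers here acquire forks in the naive left-then-right order; the semaphore alone is what prevents deadlock.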

2. Producer-Consumer with Bounded Buffer

import threading
import queue
import time

class BoundedBuffer:
    """
    Producer-Consumer with bounded buffer.
    Classic synchronization problem — appears in OS, distributed systems,
    and streaming pipeline interviews.

    Key insight: producers block when buffer full;
    consumers block when buffer empty.
    Semaphores provide an elegant solution.
    """

    def __init__(self, capacity: int):
        self.buffer = queue.Queue(maxsize=capacity)
        self.capacity = capacity

    def producer(self, producer_id: int, items: list):
        """Producer: adds items to buffer, blocks when full."""
        for item in items:
            self.buffer.put(item)  # blocks if full (Queue handles semaphore internally)
            print(f"Producer {producer_id} produced: {item} (size={self.buffer.qsize()})")
            time.sleep(0.01)

    def consumer(self, consumer_id: int, n_items: int):
        """Consumer: takes items from buffer, blocks when empty."""
        for _ in range(n_items):
            item = self.buffer.get()  # blocks if empty
            print(f"Consumer {consumer_id} consumed: {item}")
            self.buffer.task_done()
            time.sleep(0.02)  # consumers slower than producers


class SemaphoreBoundedBuffer:
    """
    Explicit semaphore implementation (interview whiteboard version).
    Shows understanding of semaphore mechanics.
    """

    def __init__(self, capacity: int):
        self.capacity = capacity
        self.buffer = []
        self.mutex = threading.Lock()            # mutual exclusion for buffer access
        self.empty_slots = threading.Semaphore(capacity)  # available space
        self.filled_slots = threading.Semaphore(0)        # filled items

    def produce(self, item):
        self.empty_slots.acquire()    # wait for empty slot
        with self.mutex:
            self.buffer.append(item)
        self.filled_slots.release()   # signal that a filled slot is available

    def consume(self):
        self.filled_slots.acquire()   # wait for filled slot
        with self.mutex:
            item = self.buffer.pop(0)
        self.empty_slots.release()    # signal that an empty slot is available
        return item
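A short driver for the semaphore buffer — one producer, one consumer — makes the blocking behavior concrete. The class is restated compactly here so the snippet runs on its own; with a single producer and a single consumer, items come out in FIFO order (`run_demo` is an illustrative helper name):

```python
import threading

class SemaphoreBoundedBuffer:
    # restated from the section above so this snippet is standalone
    def __init__(self, capacity):
        self.buffer = []
        self.mutex = threading.Lock()
        self.empty_slots = threading.Semaphore(capacity)
        self.filled_slots = threading.Semaphore(0)

    def produce(self, item):
        self.empty_slots.acquire()
        with self.mutex:
            self.buffer.append(item)
        self.filled_slots.release()

    def consume(self):
        self.filled_slots.acquire()
        with self.mutex:
            item = self.buffer.pop(0)
        self.empty_slots.release()
        return item

def run_demo(n_items=20, capacity=3):
    """One producer, one consumer; the producer blocks whenever the
    buffer holds `capacity` items, the consumer whenever it is empty."""
    buf = SemaphoreBoundedBuffer(capacity)
    consumed = []

    def producer():
        for i in range(n_items):
            buf.produce(i)

    def consumer():
        for _ in range(n_items):
            consumed.append(buf.consume())

    threads = [threading.Thread(target=producer), threading.Thread(target=consumer)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return consumed
```

With multiple consumers the global FIFO guarantee disappears (each item is still consumed exactly once), which is worth mentioning in an interview.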

3. Read-Write Lock

import threading

class ReadWriteLock:
    """
    Readers-Writer lock: multiple concurrent readers, exclusive writer.

    Use case: database with frequent reads, rare writes.
    In production: Java's ReentrantReadWriteLock, C++'s std::shared_mutex.
    (Python's stdlib has no read-write lock; threading.RLock is a
    reentrant mutex, not a readers-writer lock.)

    This implementation: readers have priority (writers can starve).
    Writer-priority variant: track waiting writers, stop new readers.
    """

    def __init__(self):
        self._read_lock = threading.Lock()
        self._write_lock = threading.Lock()
        self._reader_count = 0

    def acquire_read(self):
        with self._read_lock:
            self._reader_count += 1
            if self._reader_count == 1:
                self._write_lock.acquire()  # first reader blocks writers

    def release_read(self):
        with self._read_lock:
            self._reader_count -= 1
            if self._reader_count == 0:
                self._write_lock.release()  # last reader unblocks writers

    def acquire_write(self):
        self._write_lock.acquire()

    def release_write(self):
        self._write_lock.release()

    # Context manager interface
    class ReadContext:
        def __init__(self, rwlock):
            self.rwlock = rwlock
        def __enter__(self):
            self.rwlock.acquire_read()
        def __exit__(self, *args):
            self.rwlock.release_read()

    class WriteContext:
        def __init__(self, rwlock):
            self.rwlock = rwlock
        def __enter__(self):
            self.rwlock.acquire_write()
        def __exit__(self, *args):
            self.rwlock.release_write()

    def read(self):
        return self.ReadContext(self)

    def write(self):
        return self.WriteContext(self)
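One way to demonstrate (and test) that readers really run concurrently: have two reader threads rendezvous at a `threading.Barrier` while both hold the read lock. If the lock were exclusive, the second reader could never reach the barrier and `wait()` would time out. The lock is restated compactly below using `contextlib.contextmanager` in place of the inner context classes (`prove_concurrent_readers` is an illustrative helper):

```python
import threading
from contextlib import contextmanager

class ReadWriteLock:
    # compact restatement of the reader-priority lock above
    def __init__(self):
        self._read_lock = threading.Lock()
        self._write_lock = threading.Lock()
        self._readers = 0

    @contextmanager
    def read(self):
        with self._read_lock:
            self._readers += 1
            if self._readers == 1:
                self._write_lock.acquire()  # first reader blocks writers
        try:
            yield
        finally:
            with self._read_lock:
                self._readers -= 1
                if self._readers == 0:
                    self._write_lock.release()  # last reader unblocks writers

    @contextmanager
    def write(self):
        with self._write_lock:
            yield

def prove_concurrent_readers():
    """Two readers meet at a Barrier while both hold the read lock;
    this only succeeds if the lock admits concurrent readers."""
    rw = ReadWriteLock()
    barrier = threading.Barrier(2)
    done = []

    def reader():
        with rw.read():
            barrier.wait(timeout=5)  # raises if the other reader is locked out
            done.append(True)

    threads = [threading.Thread(target=reader) for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return len(done)
```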

4. Thread-Safe Singleton

import threading

class ThreadSafeSingleton:
    """
    Singleton with double-checked locking.
    Classic interview question: "How do you make a Singleton thread-safe?"

    Naive singleton is not thread-safe:
    - Thread A checks: instance is None
    - Context switch to Thread B
    - Thread B also checks: instance is None
    - Thread B creates instance
    - Thread A creates ANOTHER instance (race condition)

    Solution: lock + double-check pattern.
    """
    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        if cls._instance is None:             # First check (no lock, fast)
            with cls._lock:
                if cls._instance is None:     # Second check (inside lock, safe)
                    cls._instance = super().__new__(cls)
        return cls._instance


# Python-idiomatic alternative: module-level singleton
# Python imports are thread-safe by default (GIL + import lock)
# Just put your singleton in a module and import it everywhere.
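A quick stress test for the pattern: line many threads up at a `Barrier` so they all race the constructor simultaneously, then check that only one instance was ever created (the `singleton_stress` helper is illustrative; the class is restated so the snippet runs standalone):

```python
import threading

class ThreadSafeSingleton:
    # restated double-checked-locking singleton from above
    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance

def singleton_stress(n_threads=50):
    """All threads wait at the barrier, then race __new__ together."""
    instances = []
    barrier = threading.Barrier(n_threads)

    def make():
        barrier.wait()
        instances.append(ThreadSafeSingleton())  # list.append is thread-safe in CPython

    threads = [threading.Thread(target=make) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return len({id(obj) for obj in instances})  # distinct instances created
```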

5. Lock-Free Data Structure: Atomic Counter

import threading
import time
from typing import Optional

class AtomicCounter:
    """
    Thread-safe counter. Note: in CPython, `self._value += 1` is NOT atomic;
    it is a read-modify-write spanning several bytecodes, and the GIL can
    switch threads between them. An explicit Lock therefore guards updates.

    The stdlib has no atomic integer type; in production, use a Lock as
    here, or a C extension that exposes hardware atomics.

    Lock-free algorithms are critical in high-throughput systems:
    - Eliminates lock contention (bottleneck at high thread counts)
    - No deadlock risk
    - Better cache behavior (no false sharing via lock state)
    """

    def __init__(self):
        self._value = 0
        self._lock = threading.Lock()  # explicit lock for true atomicity

    def increment(self) -> int:
        with self._lock:
            self._value += 1
            return self._value

    def decrement(self) -> int:
        with self._lock:
            self._value -= 1
            return self._value

    def get(self) -> int:
        return self._value  # a single attribute read is atomic under the GIL

    def compare_and_swap(self, expected: int, new_value: int) -> bool:
        """
        CAS operation: atomically set to new_value if current == expected.
        Foundation of all lock-free algorithms.

        Used in: reference counting, lock-free queues, concurrent hash maps.
        """
        with self._lock:
            if self._value == expected:
                self._value = new_value
                return True
            return False
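The `compare_and_swap` method enables the canonical lock-free pattern: read the current value, compute a new one, attempt the CAS, and retry on conflict. A sketch of that retry loop, using the same Lock-simulated CAS as above since CPython's stdlib exposes no hardware atomic (`cas_increment` and `cas_stress` are illustrative names):

```python
import threading

class AtomicCounter:
    # restated from above: a Lock stands in for the hardware CAS instruction
    def __init__(self):
        self._value = 0
        self._lock = threading.Lock()

    def get(self):
        return self._value

    def compare_and_swap(self, expected, new_value):
        with self._lock:
            if self._value == expected:
                self._value = new_value
                return True
            return False

def cas_increment(counter):
    """Canonical lock-free retry loop: read, compute, CAS, retry on conflict.
    A failed CAS means another thread won the race; just try again."""
    while True:
        current = counter.get()
        if counter.compare_and_swap(current, current + 1):
            return

def cas_stress(n_threads=8, n_increments=500):
    counter = AtomicCounter()

    def worker():
        for _ in range(n_increments):
            cas_increment(counter)

    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter.get()
```

Every successful CAS increments exactly once, so no updates are lost no matter how the threads interleave.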

Python Concurrency: GIL, Threading, Multiprocessing

Python Concurrency Model — critical to understand for interviews:

1. GIL (Global Interpreter Lock):
   - Only ONE thread executes Python bytecode at a time (CPython)
   - Released during I/O operations (network, disk) and many C-extension calls
   - Implication: threading doesn't help for CPU-bound pure-Python work
   - Note: CPython 3.13+ also ships an optional free-threaded build without a GIL

2. When to use threading:
   - I/O-bound work (web requests, file reads, DB queries)
   - Large number of lightweight concurrent operations
   - Shared state is easy to manage with locks

3. When to use multiprocessing:
   - CPU-bound work (data processing, ML inference, image resizing)
   - True parallelism across CPU cores
   - Processes have separate memory spaces (no shared-state bugs)
   - Drawback: more memory, IPC/serialization overhead

4. When to use asyncio:
   - High-concurrency I/O (thousands of concurrent connections)
   - Event loop: one thread, cooperative multitasking
   - async/await syntax; much less overhead than threads
   - Drawback: all libraries on the hot path must be async-compatible

# Quick reference:
import threading                # I/O-bound, shared state
import multiprocessing          # CPU-bound, isolated state
import asyncio                  # async I/O, high concurrency
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

# ThreadPoolExecutor: clean API for a thread pool
# (fetch_url and urls stand in for your own I/O task)
with ThreadPoolExecutor(max_workers=10) as executor:
    futures = [executor.submit(fetch_url, url) for url in urls]
    results = [f.result() for f in futures]

# ProcessPoolExecutor: same API, but with processes
# (process_image and image_paths stand in for a CPU-bound task)
with ProcessPoolExecutor(max_workers=8) as executor:
    results = list(executor.map(process_image, image_paths))
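To see the GIL's I/O exception in action, here is a small timing sketch: `time.sleep` releases the GIL (just as a blocking socket call would), so ten 0.1-second "requests" in a thread pool complete in roughly 0.1 s rather than 1 s. `fake_io` is a stand-in for a real network call:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_io(task_id, delay=0.1):
    """Stand-in for a network call; sleep releases the GIL,
    so the waits overlap across threads."""
    time.sleep(delay)
    return task_id

def run_concurrent(n_tasks=10):
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=n_tasks) as ex:
        results = list(ex.map(fake_io, range(n_tasks)))
    elapsed = time.perf_counter() - start
    return results, elapsed  # elapsed is ~0.1 s, not n_tasks * 0.1 s
```

Replace `fake_io` with a pure-Python CPU loop and the speedup vanishes; that's the demonstration interviewers usually want.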

Common Interview Patterns

Thread-Safe LRU Cache

from collections import OrderedDict
import threading

class ThreadSafeLRUCache:
    """
    LRU Cache with thread safety.
    Combines LeetCode 146 with concurrent access requirements.

    Seen at: Meta, Google, Amazon, Uber.
    """

    def __init__(self, capacity: int):
        self.capacity = capacity
        self.cache = OrderedDict()
        self.lock = threading.RLock()  # RLock allows re-entry (e.g., get → put)

    def get(self, key: int) -> int:
        with self.lock:
            if key not in self.cache:
                return -1
            self.cache.move_to_end(key)  # Mark as recently used
            return self.cache[key]

    def put(self, key: int, value: int) -> None:
        with self.lock:
            if key in self.cache:
                self.cache.move_to_end(key)
            self.cache[key] = value
            if len(self.cache) > self.capacity:
                self.cache.popitem(last=False)  # Remove least recently used
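A quick trace of the eviction behavior, following the classic LeetCode 146 example (the class is restated so the snippet runs standalone):

```python
from collections import OrderedDict
import threading

class ThreadSafeLRUCache:
    # restated from above
    def __init__(self, capacity):
        self.capacity = capacity
        self.cache = OrderedDict()
        self.lock = threading.RLock()

    def get(self, key):
        with self.lock:
            if key not in self.cache:
                return -1
            self.cache.move_to_end(key)  # mark as recently used
            return self.cache[key]

    def put(self, key, value):
        with self.lock:
            if key in self.cache:
                self.cache.move_to_end(key)
            self.cache[key] = value
            if len(self.cache) > self.capacity:
                self.cache.popitem(last=False)  # evict least recently used

cache = ThreadSafeLRUCache(2)
cache.put(1, 1)
cache.put(2, 2)
cache.get(1)     # touches key 1, so key 2 is now least recently used
cache.put(3, 3)  # capacity exceeded: evicts key 2
```

After this sequence, `cache.get(2)` returns -1 (evicted) while keys 1 and 3 are still present.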

Interview Questions by Difficulty

Junior (L3/E3)

  • What is a race condition? Give an example.
  • What is the difference between a mutex and a semaphore?
  • When would you use multiprocessing over threading in Python?
  • What is deadlock? Name the four conditions for deadlock.

Senior (L5/E5)

  • Implement a thread-safe singleton with double-checked locking.
  • Design a rate limiter that works across multiple server instances.
  • What is the difference between optimistic and pessimistic locking?
  • Explain the readers-writers problem and its variants.
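For the rate-limiter question above, the usual single-node building block is a token bucket; a distributed version keeps the bucket state in shared storage (e.g., Redis) and updates it atomically. A minimal single-process sketch (class and parameter names are my own):

```python
import threading
import time

class TokenBucket:
    """Single-process token bucket: allows bursts up to `capacity`,
    sustained throughput of `rate` requests per second."""

    def __init__(self, rate, capacity):
        self.rate = rate            # tokens refilled per second
        self.capacity = capacity    # maximum burst size
        self.tokens = capacity
        self.last = time.monotonic()
        self.lock = threading.Lock()

    def allow(self):
        """Return True if the request may proceed, False if rate-limited."""
        with self.lock:
            now = time.monotonic()
            # lazily refill based on elapsed time, capped at capacity
            self.tokens = min(self.capacity,
                              self.tokens + (now - self.last) * self.rate)
            self.last = now
            if self.tokens >= 1:
                self.tokens -= 1
                return True
            return False
```

The lazy-refill trick (compute tokens from elapsed time instead of running a refill thread) is also what makes the algorithm easy to port to a Redis Lua script for the distributed case.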

Staff (L6/E6)

  • Design a lock-free queue using compare-and-swap.
  • Explain MVCC (multi-version concurrency control) in PostgreSQL.
  • How does the Linux kernel scheduler handle thread priorities?
  • Design a distributed lock service (like Google Chubby or Redis Redlock).

Key Terms Cheat Sheet

Mutex: Binary lock; only one thread can hold it at a time
Semaphore: Counter-based lock; up to N threads can hold it concurrently
Monitor: Mutex + condition variables; high-level synchronization construct
Spinlock: Busy-waits (loops) instead of sleeping; good for very short waits
Deadlock: Circular dependency; no thread can proceed
Livelock: Threads respond to each other but make no progress
Starvation: A thread is perpetually denied access (e.g., a low-priority thread)
CAS: Compare-and-swap; atomic read-modify-write; foundation of lock-free algorithms
MVCC: Multi-Version Concurrency Control; readers don't block writers (Postgres)
Memory barrier: Prevents CPU/compiler reordering; ensures visibility across threads