Concurrency and Threading Interview Patterns: Locks, Semaphores, and Deadlock Prevention (2025)

Thread Safety Fundamentals

A race condition occurs when two threads read and write shared state concurrently without synchronization, producing non-deterministic results. The fix is to identify the critical section – the code that accesses shared state – and make it atomic: only one thread executes it at a time.

Python’s threading.Lock provides mutual exclusion. threading.RLock (reentrant lock) allows the same thread to acquire the lock multiple times without deadlocking – useful when a locked method calls another locked method on the same object. Java’s synchronized keyword is equivalent to wrapping a block with a reentrant lock.

import threading

counter = 0
lock = threading.Lock()

def increment():
    global counter
    with lock:          # critical section
        counter += 1    # read-modify-write is now atomic

threads = [threading.Thread(target=increment) for _ in range(1000)]
for t in threads: t.start()
for t in threads: t.join()
print(counter)  # always 1000

# RLock example - same thread can re-acquire
rlock = threading.RLock()

def outer():
    with rlock:
        inner()   # safe - same thread re-acquires

def inner():
    with rlock:   # would deadlock with regular Lock
        pass

Producer-Consumer with queue.Queue

queue.Queue is thread-safe by design – it handles its own internal locking. The producer puts items; the consumer gets items. Use a bounded buffer (Queue(maxsize=N)) to apply backpressure: producers block when the queue is full, preventing unbounded memory growth.

import threading
import queue
import time

def producer(q, n):
    for i in range(n):
        q.put(i)          # blocks if queue full (bounded)
        print(f'Produced {i}')
    q.put(None)           # sentinel to signal done

def consumer(q):
    while True:
        item = q.get()    # blocks until item available
        if item is None:
            break
        print(f'Consumed {item}')
        q.task_done()

# Bounded buffer: at most 5 items in flight
buf = queue.Queue(maxsize=5)

p = threading.Thread(target=producer, args=(buf, 20))
c = threading.Thread(target=consumer, args=(buf,))
p.start(); c.start()
p.join(); c.join()

For multiple consumers, put N sentinels (one per consumer). q.join() blocks until all items have been processed (requires calling task_done() for each item).

Readers-Writers Problem

Multiple readers can read simultaneously (no mutation), but a writer needs exclusive access. Naive implementation uses a read_lock (mutex protecting reader_count) and a write_lock (held by any reader while readers are active, and held exclusively by writers).

import threading

class ReadWriteLock:
    def __init__(self):
        self.read_lock = threading.Lock()
        self.write_lock = threading.Lock()
        self.readers = 0

    def acquire_read(self):
        with self.read_lock:
            self.readers += 1
            if self.readers == 1:
                self.write_lock.acquire()  # first reader blocks writers

    def release_read(self):
        with self.read_lock:
            self.readers -= 1
            if self.readers == 0:
                self.write_lock.release()  # last reader unblocks writers

    def acquire_write(self):
        self.write_lock.acquire()

    def release_write(self):
        self.write_lock.release()

rw = ReadWriteLock()

def reader(name):
    rw.acquire_read()
    print(f'{name} reading')
    rw.release_read()

def writer(name):
    rw.acquire_write()
    print(f'{name} writing')
    rw.release_write()

Starvation risk: if readers arrive continuously, writers starve. Fix with a write-priority variant or use Python’s threading.Condition. In Python 3.3+, threading.RWLock does not exist in stdlib – use rwlock package or the above pattern.

Semaphore for Connection Pools

A counting semaphore allows up to N threads to hold it simultaneously. Classic use: limit concurrent database connections. When a thread calls acquire() and the count is zero, it blocks until another thread calls release().

import threading
import time

MAX_CONNECTIONS = 3
db_semaphore = threading.Semaphore(MAX_CONNECTIONS)

def query_db(thread_id):
    with db_semaphore:       # acquire - blocks if 3 connections in use
        print(f'Thread {thread_id} acquired DB connection')
        time.sleep(0.5)      # simulate query
        print(f'Thread {thread_id} released DB connection')
    # release happens automatically

threads = [threading.Thread(target=query_db, args=(i,)) for i in range(10)]
for t in threads: t.start()
for t in threads: t.join()
# At most 3 threads in the critical section simultaneously

threading.BoundedSemaphore raises ValueError if you release more than you acquired – catches bugs. Use it instead of plain Semaphore when the count should never exceed the initial value.

Deadlock: Conditions and Prevention

Deadlock requires all four Coffman conditions simultaneously: mutual exclusion (resources are non-shareable), hold-and-wait (thread holds a resource while waiting for another), no preemption (resources cannot be forcibly taken), circular wait (thread A waits for B, B waits for A).

Break any one condition to prevent deadlock:

  • Lock ordering: always acquire locks in the same global order (e.g., by lock ID). Eliminates circular wait. Simple and effective.
  • Lock timeout: use lock.acquire(timeout=1.0) – if it times out, release held locks and retry. Turns deadlock into livelock risk, but recoverable.
  • Avoid nested locks: restructure code to never hold one lock while acquiring another. Use a single coarse-grained lock.
  • Try-lock with backoff: if you can’t acquire all needed locks immediately, release what you hold and wait with exponential backoff.
import threading

lock_a = threading.Lock()
lock_b = threading.Lock()

# DEADLOCK: thread 1 holds A waits B, thread 2 holds B waits A
def bad_thread1():
    with lock_a:
        with lock_b:   # deadlock if thread2 holds lock_b
            pass

def bad_thread2():
    with lock_b:
        with lock_a:   # circular wait
            pass

# FIX: consistent lock ordering by id
def safe_thread(first, second):
    # always acquire lower-id lock first
    l1, l2 = sorted([lock_a, lock_b], key=id)
    with l1:
        with l2:
            pass

# FIX: timeout approach
def timeout_thread():
    while True:
        if lock_a.acquire(timeout=1.0):
            if lock_b.acquire(timeout=1.0):
                try:
                    pass  # do work
                finally:
                    lock_b.release()
                    lock_a.release()
                break
            else:
                lock_a.release()  # release and retry

threading.Event and threading.Barrier

threading.Event is a simple flag for thread signaling. One thread calls set(); others call wait() and block until the event is set. clear() resets it. Use case: signal worker threads to stop, or notify that initialization is complete.

threading.Barrier(N) is a synchronization point: all N threads must call barrier.wait() before any of them can proceed. Use case: parallel phases where all workers must finish phase 1 before any starts phase 2.

import threading
import time

# Event: start signal
ready = threading.Event()

def worker(name):
    print(f'{name} waiting for start signal')
    ready.wait()           # block until set()
    print(f'{name} started')

threads = [threading.Thread(target=worker, args=(f'W{i}',)) for i in range(4)]
for t in threads: t.start()
time.sleep(1)
ready.set()                # unblock all workers at once
for t in threads: t.join()

# Barrier: phase synchronization
barrier = threading.Barrier(3)

def phase_worker(name):
    print(f'{name} completing phase 1')
    barrier.wait()         # wait for all 3 threads
    print(f'{name} starting phase 2')  # all start together

threads = [threading.Thread(target=phase_worker, args=(f'T{i}',)) for i in range(3)]
for t in threads: t.start()
for t in threads: t.join()

Interview checklist: when asked about concurrency bugs, name race conditions and deadlocks. For thread safety, reach for queue.Queue first (producer-consumer), then locks. Know the four deadlock conditions by heart and state lock ordering as the primary prevention. Mention that Python’s GIL limits true parallelism for CPU-bound work – use multiprocessing instead.

Scroll to Top