Thread Safety Fundamentals
A race condition occurs when two threads read and write shared state concurrently without synchronization, producing non-deterministic results. The fix is to identify the critical section – the code that accesses shared state – and make it atomic: only one thread executes it at a time.
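To make the race concrete, here is a sketch of the unsynchronized version; the sleep between read and write is artificial, inserted only to widen the window in which updates get lost (thread count and timing are illustrative):

```python
import threading
import time

counter = 0

def racy_increment():
    global counter
    current = counter      # read shared state
    time.sleep(0.01)       # widen the window between read and write
    counter = current + 1  # write - may clobber another thread's update

threads = [threading.Thread(target=racy_increment) for _ in range(50)]
for t in threads: t.start()
for t in threads: t.join()
print(counter)  # typically far less than 50: updates were lost
```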
Python’s threading.Lock provides mutual exclusion. threading.RLock (reentrant lock) allows the same thread to acquire the lock multiple times without deadlocking – useful when a locked method calls another locked method on the same object. Java’s synchronized keyword is equivalent to wrapping a block with a reentrant lock.
import threading

counter = 0
lock = threading.Lock()

def increment():
    global counter
    with lock:  # critical section
        counter += 1  # read-modify-write is now atomic

threads = [threading.Thread(target=increment) for _ in range(1000)]
for t in threads: t.start()
for t in threads: t.join()
print(counter)  # always 1000

# RLock example - same thread can re-acquire
rlock = threading.RLock()

def outer():
    with rlock:
        inner()  # safe - same thread re-acquires

def inner():
    with rlock:  # would deadlock with a regular Lock
        pass
Producer-Consumer with queue.Queue
queue.Queue is thread-safe by design – it handles its own internal locking. The producer puts items; the consumer gets items. Use a bounded buffer (Queue(maxsize=N)) to apply backpressure: producers block when the queue is full, preventing unbounded memory growth.
import threading
import queue

def producer(q, n):
    for i in range(n):
        q.put(i)  # blocks if queue full (bounded)
        print(f'Produced {i}')
    q.put(None)  # sentinel to signal done

def consumer(q):
    while True:
        item = q.get()  # blocks until item available
        if item is None:
            break
        print(f'Consumed {item}')
        q.task_done()

# Bounded buffer: at most 5 items in flight
buf = queue.Queue(maxsize=5)
p = threading.Thread(target=producer, args=(buf, 20))
c = threading.Thread(target=consumer, args=(buf,))
p.start(); c.start()
p.join(); c.join()
For multiple consumers, put N sentinels (one per consumer). q.join() blocks until all items have been processed (requires calling task_done() for each item).
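A sketch of the multi-consumer version (item count and consumer count are arbitrary): each consumer exits on its own sentinel, and task_done() is called for every get(), sentinels included, so q.join() can account for every put().

```python
import queue
import threading

NUM_CONSUMERS = 3
buf = queue.Queue(maxsize=5)
consumed = []
consumed_lock = threading.Lock()  # guard the shared results list

def consumer(q):
    while True:
        item = q.get()
        if item is None:       # sentinel: this consumer is done
            q.task_done()
            break
        with consumed_lock:
            consumed.append(item)
        q.task_done()

consumers = [threading.Thread(target=consumer, args=(buf,)) for _ in range(NUM_CONSUMERS)]
for c in consumers: c.start()
for i in range(20):
    buf.put(i)
for _ in range(NUM_CONSUMERS):
    buf.put(None)              # one sentinel per consumer
buf.join()                     # blocks until task_done() matched every put()
for c in consumers: c.join()
print(sorted(consumed))        # all 20 items, consumed across 3 threads
```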
Readers-Writers Problem
Multiple readers can read simultaneously (no mutation), but a writer needs exclusive access. A common implementation uses a read_lock (a mutex protecting the reader count) and a write_lock: the first reader in acquires write_lock on behalf of all readers, the last reader out releases it, and a writer acquires write_lock directly for exclusive access.
import threading

class ReadWriteLock:
    def __init__(self):
        self.read_lock = threading.Lock()
        self.write_lock = threading.Lock()
        self.readers = 0

    def acquire_read(self):
        with self.read_lock:
            self.readers += 1
            if self.readers == 1:
                self.write_lock.acquire()  # first reader blocks writers

    def release_read(self):
        with self.read_lock:
            self.readers -= 1
            if self.readers == 0:
                self.write_lock.release()  # last reader unblocks writers

    def acquire_write(self):
        self.write_lock.acquire()

    def release_write(self):
        self.write_lock.release()

rw = ReadWriteLock()

def reader(name):
    rw.acquire_read()
    print(f'{name} reading')
    rw.release_read()

def writer(name):
    rw.acquire_write()
    print(f'{name} writing')
    rw.release_write()
Starvation risk: if readers arrive continuously, the write_lock is never released and writers starve. Fix with a write-priority variant built on threading.Condition. The standard library has no reader-writer lock (there is no threading.RWLock) – use a third-party package such as rwlock, or the pattern above.
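One possible write-priority variant, sketched with threading.Condition (class and attribute names here are made up for illustration): new readers wait whenever a writer is active or waiting, so a steady stream of readers can no longer starve writers.

```python
import threading

class WritePriorityRWLock:
    """Write-preferring readers-writer lock (illustrative sketch)."""
    def __init__(self):
        self._cond = threading.Condition()
        self._readers = 0
        self._writers_waiting = 0
        self._writer_active = False

    def acquire_read(self):
        with self._cond:
            # Write priority: readers also yield to *waiting* writers
            while self._writer_active or self._writers_waiting:
                self._cond.wait()
            self._readers += 1

    def release_read(self):
        with self._cond:
            self._readers -= 1
            if self._readers == 0:
                self._cond.notify_all()  # wake a waiting writer

    def acquire_write(self):
        with self._cond:
            self._writers_waiting += 1
            while self._writer_active or self._readers:
                self._cond.wait()
            self._writers_waiting -= 1
            self._writer_active = True

    def release_write(self):
        with self._cond:
            self._writer_active = False
            self._cond.notify_all()  # wake readers and writers alike
```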
Semaphore for Connection Pools
A counting semaphore allows up to N threads to hold it simultaneously. Classic use: limit concurrent database connections. When a thread calls acquire() and the count is zero, it blocks until another thread calls release().
import threading
import time

MAX_CONNECTIONS = 3
db_semaphore = threading.Semaphore(MAX_CONNECTIONS)

def query_db(thread_id):
    with db_semaphore:  # acquire - blocks if 3 connections in use
        print(f'Thread {thread_id} acquired DB connection')
        time.sleep(0.5)  # simulate query
        print(f'Thread {thread_id} released DB connection')
    # release happens automatically

threads = [threading.Thread(target=query_db, args=(i,)) for i in range(10)]
for t in threads: t.start()
for t in threads: t.join()
# At most 3 threads in the critical section simultaneously
threading.BoundedSemaphore raises ValueError if you release more than you acquired – catches bugs. Use it instead of plain Semaphore when the count should never exceed the initial value.
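A minimal demonstration of the difference:

```python
import threading

sem = threading.BoundedSemaphore(2)
sem.acquire()
sem.release()          # fine: count back at its initial value of 2
caught = False
try:
    sem.release()      # extra release would push the count past 2
except ValueError:
    caught = True      # BoundedSemaphore flags the bug
print('extra release caught:', caught)
# A plain threading.Semaphore(2) would silently allow the extra release
```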
Deadlock: Conditions and Prevention
Deadlock requires all four Coffman conditions simultaneously: mutual exclusion (resources are non-shareable), hold-and-wait (thread holds a resource while waiting for another), no preemption (resources cannot be forcibly taken), circular wait (thread A waits for B, B waits for A).
Break any one condition to prevent deadlock:
- Lock ordering: always acquire locks in the same global order (e.g., by lock ID). Eliminates circular wait. Simple and effective.
- Lock timeout: use lock.acquire(timeout=1.0); if it times out, release held locks and retry. Turns deadlock into a livelock risk, but recoverable.
- Avoid nested locks: restructure code to never hold one lock while acquiring another. Use a single coarse-grained lock.
- Try-lock with backoff: if you can’t acquire all needed locks immediately, release what you hold and wait with exponential backoff.
import threading

lock_a = threading.Lock()
lock_b = threading.Lock()

# DEADLOCK: thread 1 holds A and waits for B, thread 2 holds B and waits for A
def bad_thread1():
    with lock_a:
        with lock_b:  # deadlocks if thread2 holds lock_b
            pass

def bad_thread2():
    with lock_b:
        with lock_a:  # circular wait
            pass
# FIX: consistent lock ordering by id
def safe_thread(first, second):
    # always acquire the lower-id lock first, regardless of argument order
    l1, l2 = sorted([first, second], key=id)
    with l1:
        with l2:
            pass
# FIX: timeout approach
def timeout_thread():
    while True:
        if lock_a.acquire(timeout=1.0):
            if lock_b.acquire(timeout=1.0):
                try:
                    pass  # do work
                finally:
                    lock_b.release()
                    lock_a.release()
                break
            else:
                lock_a.release()  # release and retry
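The fourth strategy, try-lock with backoff, can be sketched as follows (delay values are illustrative): non-blocking acquires mean a thread never waits while holding a lock, and randomized, growing delays keep two retrying threads from colliding in lockstep.

```python
import random
import threading
import time

lock_a = threading.Lock()
lock_b = threading.Lock()

def backoff_thread():
    """Acquire both locks or release everything and retry with backoff."""
    delay = 0.001
    while True:
        if lock_a.acquire(blocking=False):
            if lock_b.acquire(blocking=False):
                try:
                    return 'did work'  # both locks held
                finally:
                    lock_b.release()
                    lock_a.release()
            lock_a.release()  # couldn't get both: back off, never hold-and-wait
        time.sleep(delay + random.uniform(0, delay))  # jitter avoids lockstep retries
        delay = min(delay * 2, 0.1)  # exponential backoff, capped
```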
threading.Event and threading.Barrier
threading.Event is a simple flag for thread signaling. One thread calls set(); others call wait() and block until the event is set. clear() resets it. Use case: signal worker threads to stop, or notify that initialization is complete.
threading.Barrier(N) is a synchronization point: all N threads must call barrier.wait() before any of them can proceed. Use case: parallel phases where all workers must finish phase 1 before any starts phase 2.
import threading
import time

# Event: start signal
ready = threading.Event()

def worker(name):
    print(f'{name} waiting for start signal')
    ready.wait()  # block until set()
    print(f'{name} started')

threads = [threading.Thread(target=worker, args=(f'W{i}',)) for i in range(4)]
for t in threads: t.start()
time.sleep(1)
ready.set()  # unblock all workers at once
for t in threads: t.join()

# Barrier: phase synchronization
barrier = threading.Barrier(3)

def phase_worker(name):
    print(f'{name} completing phase 1')
    barrier.wait()  # wait for all 3 threads
    print(f'{name} starting phase 2')  # all start together

threads = [threading.Thread(target=phase_worker, args=(f'T{i}',)) for i in range(3)]
for t in threads: t.start()
for t in threads: t.join()
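The stop-signal use case can be sketched as follows (interval and runtime are arbitrary): stop.wait(timeout=...) doubles as an interruptible sleep, so the worker wakes immediately when stop.set() is called instead of finishing a full sleep.

```python
import threading
import time

stop = threading.Event()
ticks = []

def poller():
    # Worker loop: do a unit of work, then sleep interruptibly
    while not stop.is_set():
        ticks.append(time.monotonic())
        stop.wait(timeout=0.05)  # returns early if stop.set() is called

t = threading.Thread(target=poller)
t.start()
time.sleep(0.2)
stop.set()  # signal shutdown; poller exits its loop promptly
t.join()
print(f'worker ran {len(ticks)} iterations, then stopped cleanly')
```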
Interview checklist: when asked about concurrency bugs, name race conditions and deadlocks. For thread safety, reach for queue.Queue first (producer-consumer), then locks. Know the four deadlock conditions by heart and state lock ordering as the primary prevention. Mention that Python’s GIL limits true parallelism for CPU-bound work – use multiprocessing instead.