Operating systems fundamentals appear in interviews at companies building systems software, operating in kernel-adjacent spaces (Cloudflare, HashiCorp, Apple), and anywhere performance matters at a deep level. This guide covers the most common OS interview questions.
Processes vs Threads
"""
Process:
- Isolated address space (virtual memory, heap, stack, code segment)
- Own file descriptors, PID, signal handlers
- Communication: IPC (pipes, sockets, shared memory, message queues)
- Context switch cost: ~1-10 microseconds (flush TLB, save/restore full state)
- Crash isolation: one process crash does not kill others
Thread:
- Shares address space with parent process (code, heap, globals)
- Own: stack, registers, thread-local storage, signal mask
- Communication: shared memory (needs synchronization)
- Context switch cost: ~100-1000 nanoseconds (less state to save)
- One thread crash can kill the entire process
When to use processes vs threads:
Processes: CPU-bound work (bypass GIL in Python), strict isolation, fault tolerance
Threads: I/O-bound work, frequent communication, lower overhead
"""
import os
import threading
import multiprocessing
import ctypes
def process_example():
    """Demonstrate fork(): child gets a copy-on-write view of the parent."""
    print(f"Process PID={os.getpid()}, Parent PID={os.getppid()}")
    pid = os.fork()  # Unix only: returns 0 in the child, child's PID in the parent
    if pid:
        # Parent: reap the child so it never lingers as a zombie.
        os.waitpid(pid, 0)
    else:
        # Child: shares physical pages with the parent until either writes.
        print(f"Child: PID={os.getpid()}, copy-on-write fork")
        os._exit(0)  # exit child immediately, skipping atexit/stdio cleanup
def thread_example():
    """Four threads increment a shared counter; a lock prevents lost updates."""
    counter = [0]              # list as a mutable cell shared by all threads
    guard = threading.Lock()

    def worker(iterations):
        for _ in range(iterations):
            with guard:        # serialize the read-modify-write
                counter[0] += 1

    workers = [threading.Thread(target=worker, args=(100_000,)) for _ in range(4)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print(f"Counter: {counter[0]}")  # 400000 (lock ensures correctness)
Virtual Memory and Paging
"""
Virtual Memory allows each process to see a large, contiguous address space
even though physical memory is fragmented and smaller.
Address Translation:
Virtual Address → Page Table → Physical Address (via MMU/TLB)
Page: fixed-size block (typically 4KB on x86-64)
Page Table Entry (PTE): maps virtual page → physical frame + protection bits
TLB (Translation Lookaside Buffer): cache for recently used page table entries
Virtual Address (48-bit on x86-64):
[ 9 bits: PML4 | 9 bits: PDP | 9 bits: PD | 9 bits: PT | 12 bits: offset ]
Key concepts:
Page Fault: access to unmapped or not-present page → kernel handles it
- Minor fault: page in memory but not mapped (e.g., CoW) → fix mapping
- Major fault: page on disk → load from swap
Copy-on-Write (CoW): fork() shares pages; copy only when written → fast fork
Memory-mapped files: map file into address space → no read/write syscalls
Huge Pages: 2MB/1GB pages → reduce TLB pressure for large allocations
"""
import mmap
import os
def memory_mapped_io(filename: str) -> bytes:
    """Zero-copy file access via mmap.

    Maps the whole file into the address space, reads up to the first
    1024 bytes straight from the page cache (no read() syscall), then
    overwrites the start of the file with b"Hello" in place.

    Returns the bytes read *before* the write.
    Raises ValueError if the file is empty (mmap cannot map length 0)
    or shorter than 5 bytes (the write would run past the mapping).
    """
    with open(filename, "r+b") as f:
        with mmap.mmap(f.fileno(), 0) as mm:
            data = mm[:1024]    # slice is served directly from the page cache
            mm.seek(0)
            mm.write(b"Hello")  # in-place write; OS flushes the dirty page later
            return data
Page Replacement Algorithms
from collections import OrderedDict
from typing import List
class LRUCache:
    """LRU page replacement — O(1) get/put.

    An OrderedDict keeps entries in recency order: the front is the
    least recently used page, the back the most recently used.
    """

    def __init__(self, capacity: int):
        self.cap = capacity          # maximum number of resident pages
        self.cache = OrderedDict()   # page -> value, ordered by recency

    def get(self, key: int) -> int:
        """Return the cached value and mark *key* most-recent; -1 on miss."""
        try:
            value = self.cache[key]
        except KeyError:
            return -1
        self.cache.move_to_end(key)
        return value

    def put(self, key: int, value: int):
        """Insert or refresh *key*; evict the LRU page when over capacity."""
        self.cache.pop(key, None)    # drop any stale position first
        self.cache[key] = value      # (re)insert at the most-recent end
        if len(self.cache) > self.cap:
            self.cache.popitem(last=False)  # evict from the LRU front
def clock_algorithm(frames: int, pages: List[int]) -> int:
    """
    Clock (second chance) page replacement — approximates LRU.
    Each page has a reference bit; on eviction, give one more chance.
    Returns the number of page faults for the given reference string.
    """
    resident = {}              # pages currently mapped -> frame index
    referenced = {}            # page -> reference bit (0 or 1)
    slots = [None] * frames    # the physical frames, indexed by position
    pointer = 0                # the clock hand
    fault_count = 0

    def install(page):
        """Place *page* in the frame under the hand and advance the hand."""
        nonlocal pointer
        slots[pointer] = page
        resident[page] = pointer
        referenced[page] = 0   # newly loaded pages start with a cleared bit
        pointer = (pointer + 1) % frames

    for page in pages:
        if page in resident:
            referenced[page] = 1       # hit: mark as recently used
            continue
        fault_count += 1
        # Sweep the hand until a free slot or an unreferenced victim is found.
        while True:
            occupant = slots[pointer]
            if occupant is None:
                install(page)
                break
            if referenced.get(occupant, 0):
                referenced[occupant] = 0           # second chance: clear and move on
                pointer = (pointer + 1) % frames
            else:
                del resident[occupant]             # evict the victim
                del referenced[occupant]
                install(page)
                break
    return fault_count
print("Page faults (LRU/Clock comparison):", clock_algorithm(3, [1,2,3,4,1,2,5,1,2,3,4,5]))
CPU Scheduling Algorithms
import heapq
from dataclasses import dataclass, field
from typing import List
@dataclass
class Process:
    """One simulated process — input to the schedulers below."""
    pid: int            # unique process identifier
    arrival: int        # time unit at which the process becomes runnable
    burst: int          # total CPU time required
    priority: int = 0   # unused by RR/SRTF here; kept for priority schedulers

@dataclass
class ScheduleResult:
    """Per-process metrics produced by one scheduling run."""
    pid: int
    start: int          # start of the slice in which the process finished
    end: int            # completion time
    turnaround: int     # end - arrival
    waiting: int        # turnaround - burst

def round_robin(processes: List[Process], quantum: int) -> List[ScheduleResult]:
    """Round Robin: each process gets at most `quantum` time units per turn.

    Results are returned in completion order. Processes that arrive during a
    running slice are queued ahead of the preempted process (the common
    textbook convention).

    NOTE(review): the arrival-admission loop and the re-enqueue test were
    reconstructed from a corrupted line in the original source.
    """
    from collections import deque
    ready = deque()
    time = 0
    results: List[ScheduleResult] = []
    remaining = {p.pid: p.burst for p in processes}
    ps_iter = iter(sorted(processes, key=lambda p: p.arrival))
    next_p = next(ps_iter, None)

    def admit_arrivals():
        """Move every process that has arrived by `time` onto the ready queue."""
        nonlocal next_p
        while next_p and next_p.arrival <= time:
            ready.append(next_p)
            next_p = next(ps_iter, None)

    while ready or next_p:
        admit_arrivals()
        if not ready:
            time = next_p.arrival   # CPU idles until the next arrival
            continue
        p = ready.popleft()
        run = min(quantum, remaining[p.pid])
        start = time
        time += run
        remaining[p.pid] -= run
        admit_arrivals()            # arrivals during this slice enter first
        if remaining[p.pid] > 0:
            ready.append(p)         # not finished — back of the queue
        else:
            turnaround = time - p.arrival
            results.append(ScheduleResult(
                pid=p.pid, start=start, end=time,
                turnaround=turnaround, waiting=turnaround - p.burst
            ))
    return results
def sjf_preemptive(processes: List[Process]) -> List[ScheduleResult]:
    """Shortest Job First (preemptive / SRTF) — optimal average waiting time."""
    # Min-heap ordered by remaining burst, tie-broken by arrival then pid.
    heap = []  # (remaining_burst, arrival, pid)
    time = 0
    remaining = {p.pid: p.burst for p in processes}
    results = {}      # pid -> ScheduleResult, filled in on completion
    start_times = {}  # pid -> first time the process was dispatched
    sorted_ps = sorted(processes, key=lambda p: p.arrival)
    i = 0  # index of the next not-yet-admitted process in sorted_ps
    while i < len(sorted_ps) or heap:
        # Admit every process that has arrived by the current time.
        while i < len(sorted_ps) and sorted_ps[i].arrival <= time:
            p = sorted_ps[i]
            heapq.heappush(heap, (remaining[p.pid], p.arrival, p.pid))
            i += 1
        if not heap:
            # CPU idle: jump straight to the next arrival instead of ticking.
            time = sorted_ps[i].arrival
            continue
        # Run the shortest-remaining job for ONE time unit; re-evaluating the
        # heap every unit is what makes this preemptive (SRTF).
        burst, arrival, pid = heapq.heappop(heap)
        if pid not in start_times:
            start_times[pid] = time  # first dispatch (response time)
        time += 1
        remaining[pid] -= 1
        if remaining[pid] == 0:
            # Completed: compute metrics from the original arrival/burst.
            p_orig = next(p for p in sorted_ps if p.pid == pid)
            turnaround = time - p_orig.arrival
            results[pid] = ScheduleResult(
                pid=pid, start=start_times[pid], end=time,
                turnaround=turnaround, waiting=turnaround - p_orig.burst
            )
        else:
            # Still has work: push back with the updated remaining time so it
            # competes against any newly arrived shorter jobs.
            heapq.heappush(heap, (remaining[pid], arrival, pid))
    return list(results.values())
Deadlocks — Detection and Prevention
"""
Deadlock conditions (all four must hold):
1. Mutual exclusion: resource can be held by only one process
2. Hold and wait: process holding a resource waits for another
3. No preemption: resources cannot be forcibly taken
4. Circular wait: a closed chain of processes, each waiting for a resource held by the next (simplest case: A waits for B while B waits for A)
Banker's Algorithm — deadlock avoidance:
Determines if granting a request leads to a "safe state"
(an ordering of processes that can complete without deadlock)
"""
def is_safe_state(available: list, allocation: list, max_need: list) -> bool:
    """
    Banker's algorithm safety check.
    available: current available resources [R1, R2, ...]
    allocation: current allocation per process
    max_need: maximum need per process
    Returns True iff there is an ordering in which every process can acquire
    its remaining need, finish, and release its resources (a "safe state").

    NOTE(review): the core work-comparison loop was reconstructed from a
    corrupted line in the original source.
    """
    n_procs = len(allocation)
    n_res = len(available)
    work = list(available)  # resources free at this point of the simulation
    need = [[max_need[i][j] - allocation[i][j]
             for j in range(n_res)] for i in range(n_procs)]
    finish = [False] * n_procs
    safe_seq = []  # completion order found (diagnostic only)
    for _ in range(n_procs):
        progressed = False
        for i in range(n_procs):
            # Process i can run to completion iff its remaining need fits
            # entirely within the currently available resources.
            if not finish[i] and all(need[i][j] <= work[j] for j in range(n_res)):
                for j in range(n_res):
                    work[j] += allocation[i][j]  # it finishes, releasing all it holds
                finish[i] = True
                safe_seq.append(i)
                progressed = True
        if not progressed:
            break  # nobody can proceed — the rest would deadlock
    return all(finish)

def transfer(account_from: dict, account_to: dict, amount: int) -> bool:
    """Deadlock-free transfer between two lock-protected accounts.

    NOTE(review): reconstructed from a corrupted fragment — the original
    name/signature was lost; verify against any callers. Each account is a
    dict with "balance" and "lock" keys.

    Prevents circular wait (deadlock condition 4) by acquiring the two
    account locks in a globally consistent order (here: by object id), so
    two concurrent opposite-direction transfers cannot deadlock.
    Returns True on success, False if funds are insufficient.
    """
    first, second = sorted((account_from, account_to), key=id)
    with first["lock"]:
        with second["lock"]:
            if account_from["balance"] >= amount:
                account_from["balance"] -= amount
                account_to["balance"] += amount
                return True
            return False
Inter-Process Communication (IPC)
| Mechanism | Speed | Use Case |
|---|---|---|
| Shared memory | Fastest | High-bandwidth data between related processes |
| Anonymous pipe | Fast | Parent-child byte stream (unidirectional) |
| Named pipe (FIFO) | Fast | Unrelated processes, file-system based |
| Unix domain socket | Fast | Bidirectional, same host (nginx ↔ PHP-FPM) |
| TCP socket | Moderate | Cross-host, network-transparent |
| Message queue | Moderate | Async, decoupled producers/consumers |
| Signal | Fast | Notifications only (SIGTERM, SIGCHLD) |