Operating Systems Interview Questions (Processes, Memory, Scheduling)

Operating systems fundamentals come up in interviews at companies that build systems software or operate in kernel-adjacent spaces (Cloudflare, HashiCorp, Apple), and anywhere performance matters at a deep level. This guide covers the most common OS interview questions.

Processes vs Threads

"""
Process:
  - Isolated address space (virtual memory, heap, stack, code segment)
  - Own file descriptors, PID, signal handlers
  - Communication: IPC (pipes, sockets, shared memory, message queues)
  - Context switch cost: ~1-10 microseconds (flush TLB, save/restore full state)
  - Crash isolation: one process crash does not kill others

Thread:
  - Shares address space with parent process (code, heap, globals)
  - Own: stack, registers, thread-local storage, signal mask
  - Communication: shared memory (needs synchronization)
  - Context switch cost: ~100-1000 nanoseconds (less state to save)
  - One thread crash can kill the entire process

When to use processes vs threads:
  Processes: CPU-bound work (bypass GIL in Python), strict isolation, fault tolerance
  Threads:   I/O-bound work, frequent communication, lower overhead
"""

import os
import threading
import multiprocessing

def process_example():
    pid = os.getpid()
    ppid = os.getppid()
    print(f"Process PID={pid}, Parent PID={ppid}")

    child_pid = os.fork()  # Unix only
    if child_pid == 0:
        # Child process
        print(f"Child: PID={os.getpid()}, copy-on-write fork")
        os._exit(0)
    else:
        os.waitpid(child_pid, 0)  # Reap child to avoid zombie

def thread_example():
    shared_counter = [0]
    lock = threading.Lock()

    def increment(n):
        for _ in range(n):
            with lock:
                shared_counter[0] += 1

    threads = [threading.Thread(target=increment, args=(100_000,)) for _ in range(4)]
    for t in threads: t.start()
    for t in threads: t.join()
    print(f"Counter: {shared_counter[0]}")  # 400000 (lock ensures correctness)
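The "bypass GIL in Python" point above can be sketched with `multiprocessing`. This is an illustrative example, not a benchmark: `cpu_bound` and the pool size are arbitrary choices, and the `__main__` guard is required so child processes can import the worker function safely under the spawn start method.

```python
import multiprocessing as mp

def cpu_bound(n: int) -> int:
    """Pure-Python arithmetic: threads can't parallelize this (GIL), processes can."""
    total = 0
    for i in range(n):
        total += i * i
    return total

if __name__ == "__main__":
    # Each worker process has its own interpreter and its own GIL
    with mp.Pool(processes=4) as pool:
        results = pool.map(cpu_bound, [200_000] * 4)
    print(f"Got {len(results)} results")
```

With threads, the four calls would serialize on the GIL; with processes they can run on four cores, at the cost of pickling arguments and results across process boundaries.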

Virtual Memory and Paging

"""
Virtual Memory allows each process to see a large, contiguous address space
even though physical memory is fragmented and smaller.

Address Translation:
  Virtual Address → Page Table → Physical Address (via MMU/TLB)

  Page: fixed-size block (typically 4KB on x86-64)
  Page Table Entry (PTE): maps virtual page → physical frame + protection bits
  TLB (Translation Lookaside Buffer): cache for recently used page table entries

  Virtual Address (48-bit on x86-64):
  [ 9 bits: PML4 | 9 bits: PDP | 9 bits: PD | 9 bits: PT | 12 bits: offset ]

Key concepts:
  Page Fault: access to unmapped or not-present page → kernel handles it
    - Minor fault: page in memory but not mapped (e.g., CoW) → fix mapping
    - Major fault: page contents on disk → block on I/O to load from swap or file

  Copy-on-Write (CoW): fork() shares pages; copy only when written → fast fork
  Memory-mapped files: map file into address space → no read/write syscalls
  Huge Pages: 2MB/1GB pages → reduce TLB pressure for large allocations
"""
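The 48-bit address split above can be verified with a few shifts and masks. A minimal sketch of the x86-64 4-level index extraction; the helper name and the constructed test address are illustrative, not a real kernel API:

```python
def split_vaddr(vaddr: int):
    """Split an x86-64 virtual address into 4-level paging indices."""
    offset = vaddr & 0xFFF          # 12-bit offset within the 4KB page
    pt     = (vaddr >> 12) & 0x1FF  # 9-bit page table index
    pd     = (vaddr >> 21) & 0x1FF  # 9-bit page directory index
    pdp    = (vaddr >> 30) & 0x1FF  # 9-bit page directory pointer index
    pml4   = (vaddr >> 39) & 0x1FF  # 9-bit top-level (PML4) index
    return pml4, pdp, pd, pt, offset

addr = (1 << 39) | (2 << 30) | (3 << 21) | (4 << 12) | 5
print(split_vaddr(addr))  # (1, 2, 3, 4, 5)
```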

import mmap
import os

def memory_mapped_io(filename: str):
    """Zero-copy file access via mmap."""
    with open(filename, "r+b") as f:
        with mmap.mmap(f.fileno(), 0) as mm:
            data = mm[:1024]  # Reads directly from page cache
            mm.seek(0)
            mm.write(b"Hello")  # Writes directly; OS flushes to disk

Page Replacement Algorithms

from collections import OrderedDict
from typing import List

class LRUCache:
    """LRU page replacement — O(1) get/put via OrderedDict."""
    def __init__(self, capacity: int):
        self.cap = capacity
        self.cache = OrderedDict()

    def get(self, key: int) -> int:
        if key not in self.cache:
            return -1
        self.cache.move_to_end(key)
        return self.cache[key]

    def put(self, key: int, value: int):
        if key in self.cache:
            self.cache.move_to_end(key)
        self.cache[key] = value
        if len(self.cache) > self.cap:
            self.cache.popitem(last=False)  # Remove LRU (front)

def clock_algorithm(frames: int, pages: List[int]) -> int:
    """
    Clock (second chance) page replacement — approximates LRU.
    Each page has a reference bit; on eviction, give one more chance.
    """
    memory = {}    # page → position in frame
    ref_bits = {}  # page → reference bit (0 or 1)
    frame_list = [None] * frames
    hand = 0
    faults = 0

    for page in pages:
        if page in memory:
            ref_bits[page] = 1  # Page accessed — set reference bit
            continue

        faults += 1
        while True:
            victim = frame_list[hand]
            if victim is None:
                frame_list[hand] = page
                memory[page] = hand
                ref_bits[page] = 0
                hand = (hand + 1) % frames
                break
            if ref_bits.get(victim, 0) == 0:
                del memory[victim]
                del ref_bits[victim]
                frame_list[hand] = page
                memory[page] = hand
                ref_bits[page] = 0
                hand = (hand + 1) % frames
                break
            else:
                ref_bits[victim] = 0  # Second chance: clear bit, move on
                hand = (hand + 1) % frames

    return faults

print("Page faults (LRU/Clock comparison):", clock_algorithm(3, [1,2,3,4,1,2,5,1,2,3,4,5]))

CPU Scheduling Algorithms

import heapq
from dataclasses import dataclass, field
from typing import List

@dataclass
class Process:
    pid: int
    arrival: int
    burst:   int
    priority: int = 0

@dataclass
class ScheduleResult:
    pid:           int
    start:         int
    end:           int
    turnaround:    int
    waiting:       int

def round_robin(processes: List[Process], quantum: int) -> List[ScheduleResult]:
    """Round Robin: each process gets at most `quantum` time units."""
    from collections import deque
    queue = deque()
    time = 0
    results = []
    remaining = {p.pid: p.burst for p in processes}
    sorted_ps = sorted(processes, key=lambda p: p.arrival)
    ps_iter = iter(sorted_ps)
    next_p = next(ps_iter, None)

    while queue or next_p:
        while next_p and next_p.arrival <= time:
            queue.append(next_p)
            next_p = next(ps_iter, None)

        if not queue:
            time = next_p.arrival
            continue

        p = queue.popleft()
        run = min(quantum, remaining[p.pid])
        start = time
        time += run
        remaining[p.pid] -= run

        # Check for new arrivals during this time slice
        while next_p and next_p.arrival <= time:
            queue.append(next_p)
            next_p = next(ps_iter, None)

        if remaining[p.pid] > 0:
            queue.append(p)  # Not finished — re-enqueue
        else:
            turnaround = time - p.arrival
            results.append(ScheduleResult(
                pid=p.pid, start=start, end=time,
                turnaround=turnaround, waiting=turnaround - p.burst
            ))
    return results

def sjf_preemptive(processes: List[Process]) -> List[ScheduleResult]:
    """Shortest Job First (preemptive / SRTF) — optimal average waiting time."""
    heap = []  # (remaining_burst, arrival, pid)
    time = 0
    remaining = {p.pid: p.burst for p in processes}
    results = {}
    start_times = {}
    sorted_ps = sorted(processes, key=lambda p: p.arrival)
    i = 0

    while i < len(sorted_ps) or heap:
        while i < len(sorted_ps) and sorted_ps[i].arrival <= time:
            p = sorted_ps[i]
            heapq.heappush(heap, (remaining[p.pid], p.arrival, p.pid))
            i += 1

        if not heap:
            time = sorted_ps[i].arrival
            continue

        burst, arrival, pid = heapq.heappop(heap)
        if pid not in start_times:
            start_times[pid] = time
        time += 1
        remaining[pid] -= 1

        if remaining[pid] == 0:
            p_orig = next(p for p in sorted_ps if p.pid == pid)
            turnaround = time - p_orig.arrival
            results[pid] = ScheduleResult(
                pid=pid, start=start_times[pid], end=time,
                turnaround=turnaround, waiting=turnaround - p_orig.burst
            )
        else:
            heapq.heappush(heap, (remaining[pid], arrival, pid))

    return list(results.values())
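SJF's optimality claim is easiest to see against a naive baseline. A minimal FCFS waiting-time calculation (illustrative; it ignores arrival times and assumes all jobs are queued at t=0) shows the convoy effect that SJF avoids:

```python
from typing import List

def fcfs_waiting(bursts: List[int]) -> List[int]:
    """Waiting time per job under FCFS: the sum of all bursts queued before it."""
    waiting, elapsed = [], 0
    for burst in bursts:
        waiting.append(elapsed)
        elapsed += burst
    return waiting

# Convoy effect: one long job at the head inflates everyone's wait
print(fcfs_waiting([24, 3, 3]))  # [0, 24, 27] -> average wait 17
print(fcfs_waiting([3, 3, 24]))  # [0, 3, 6]   -> average wait 3
```

Running the shortest jobs first is exactly what SJF does greedily at every step, which is why it minimizes average waiting time.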

Deadlocks — Detection and Prevention

"""
Deadlock conditions (all four must hold):
  1. Mutual exclusion: resource can be held by only one process
  2. Hold and wait: process holding a resource waits for another
  3. No preemption: resources cannot be forcibly taken
  4. Circular wait: a closed chain of processes, each waiting for a resource held by the next (A → B → ... → A)

Banker's Algorithm — deadlock avoidance:
  Determines if granting a request leads to a "safe state"
  (an ordering of processes that can complete without deadlock)
"""

def is_safe_state(available: list, allocation: list, max_need: list) -> bool:
    """
    Banker's algorithm safety check.
    available:  current available resources [R1, R2, ...]
    allocation: current allocation per process
    max_need:   maximum need per process
    """
    n_procs = len(allocation)
    n_res = len(available)
    work = list(available)
    need = [[max_need[i][j] - allocation[i][j]
             for j in range(n_res)] for i in range(n_procs)]
    finish = [False] * n_procs
    safe_seq = []

    for _ in range(n_procs):
        progressed = False
        for i in range(n_procs):
            if not finish[i] and all(need[i][j] <= work[j] for j in range(n_res)):
                # Process i can run to completion, then releases everything it holds
                for j in range(n_res):
                    work[j] += allocation[i][j]
                finish[i] = True
                safe_seq.append(i)
                progressed = True
        if not progressed:
            break  # No runnable process found — remaining processes may deadlock

    return all(finish)  # Safe state iff every process can finish in some order
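Breaking the circular-wait condition in practice usually means imposing a global lock order. A sketch of a bank-transfer example; the `Account` class and the lock-by-lowest-id rule are illustrative conventions, not a library API:

```python
import threading

class Account:
    def __init__(self, acct_id: int, balance: int):
        self.id = acct_id
        self.balance = balance
        self.lock = threading.Lock()

def transfer(src: Account, dst: Account, amount: int) -> bool:
    # Always lock the lower-id account first: concurrent transfer(a, b) and
    # transfer(b, a) calls acquire locks in the same order, so no cycle forms
    first, second = (src, dst) if src.id < dst.id else (dst, src)
    with first.lock:
        with second.lock:
            if src.balance >= amount:
                src.balance -= amount
                dst.balance += amount
                return True
            return False
```

Without the ordering, two threads transferring in opposite directions could each hold one lock while waiting for the other: hold-and-wait plus circular wait, i.e. deadlock.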

Inter-Process Communication (IPC)

Mechanism            Speed     Use Case
Shared memory        Fastest   High-bandwidth data between related processes
Anonymous pipe       Fast      Parent-child byte stream (unidirectional)
Named pipe (FIFO)    Fast      Unrelated processes, file-system based
Unix domain socket   Fast      Bidirectional, same host (nginx ↔ PHP-FPM)
TCP socket           Moderate  Cross-host, network-transparent
Message queue        Moderate  Async, decoupled producers/consumers
Signal               Fast      Notifications only (SIGTERM, SIGCHLD)
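The anonymous-pipe row can be demonstrated with `os.pipe`. For brevity both ends live in one process here; in real use the parent would create the pipe, fork, and each side would close the end it doesn't use:

```python
import os

r, w = os.pipe()            # kernel-buffered, unidirectional byte stream
os.write(w, b"hello ipc")   # writer end (parent, typically)
msg = os.read(r, 1024)      # reader end (forked child, typically)
os.close(w)
os.close(r)
print(msg)  # b'hello ipc'
```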