Low-Level Design: Task Scheduler / Job Queue (OOP Interview)

Low-Level Design: Task Scheduler / Job Queue

A task scheduler executes jobs at specified times or with specified delays, tracks their status, and handles failures with retry logic. This design appears at companies like Airbnb (Airflow), Uber (Cadence), and Databricks. It tests: priority queuing, state machines, concurrency, and the Worker pattern.

Core Classes

Enums

from enum import Enum

class TaskStatus(Enum):
    """Lifecycle states of a Task as it moves through the scheduler."""

    PENDING = "PENDING"      # submitted, waiting for a worker to pick it up
    RUNNING = "RUNNING"      # a worker is currently executing it
    SUCCEEDED = "SUCCEEDED"  # finished without raising
    FAILED = "FAILED"        # terminal failure: every retry attempt exhausted
    RETRYING = "RETRYING"    # failed once, re-queued for a backoff retry
    CANCELLED = "CANCELLED"  # cancelled before it started running

class TaskPriority(Enum):
    """Relative urgency of a task; a larger value runs sooner."""

    LOW = 1
    NORMAL = 5
    HIGH = 10
    URGENT = 20

Task

from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Any, Callable, Optional
import uuid

@dataclass
class Task:
    """A schedulable unit of work.

    Carries the callable to run, its arguments, priority/ordering data,
    retry bookkeeping, and execution results. Ordering (``__lt__``) is
    defined so a plain heapq min-heap pops the highest-priority, earliest-
    scheduled task first.
    """

    task_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    name: str = ""
    # The function to execute; None only for placeholder/unset tasks.
    func: Optional[Callable] = None
    args: tuple = field(default_factory=tuple)
    kwargs: dict = field(default_factory=dict)
    priority: TaskPriority = TaskPriority.NORMAL
    status: TaskStatus = TaskStatus.PENDING
    max_retries: int = 3
    retry_count: int = 0
    retry_delay_seconds: int = 60      # base delay, doubles each retry
    scheduled_at: datetime = field(default_factory=datetime.now)
    started_at: Optional[datetime] = None    # set when a worker begins execution
    completed_at: Optional[datetime] = None  # set on terminal success/failure
    result: Any = None                       # return value of func on success
    error: Optional[str] = None              # last exception message on failure

    def __lt__(self, other: 'Task') -> bool:
        """For the priority queue: higher priority = earlier execution.

        heapq is a min-heap, so "less than" must mean "should run first":
        higher priority value wins; ties break on earlier scheduled_at.
        """
        if self.priority.value != other.priority.value:
            return self.priority.value > other.priority.value  # higher priority first
        return self.scheduled_at < other.scheduled_at

    def can_retry(self) -> bool:
        """Return True while retry attempts remain."""
        return self.retry_count < self.max_retries

    def next_retry_time(self) -> datetime:
        """Exponential backoff: 60s, 120s, 240s, ..."""
        delay = self.retry_delay_seconds * (2 ** self.retry_count)
        return datetime.now() + timedelta(seconds=delay)

TaskScheduler

import heapq
import threading
from typing import Optional

class TaskScheduler:
    """Thread-pool scheduler: priority ordering, delayed execution, retries.

    A single heap (ordered via Task.__lt__) holds all queued tasks. A
    Condition variable (wrapping the heap's lock) lets idle workers sleep
    until a task is submitted or a scheduled task becomes due.
    """

    def __init__(self, num_workers: int = 4):
        self._queue: list[Task] = []       # min-heap (uses Task.__lt__)
        self._lock = threading.Lock()
        self._not_empty = threading.Condition(self._lock)
        self._tasks: dict[str, Task] = {}  # task_id -> Task (registry)
        self._running = False
        self._workers: list[threading.Thread] = []
        self._num_workers = num_workers

    def submit(self, task: Task) -> str:
        """Register *task*, push it on the heap, and wake one worker.

        Returns the task's id so callers can poll get_status()/cancel().
        """
        with self._not_empty:
            self._tasks[task.task_id] = task
            heapq.heappush(self._queue, task)
            self._not_empty.notify()       # wake a waiting worker
        print(f"Submitted task {task.task_id[:8]} [{task.name}] priority={task.priority.name}")
        return task.task_id

    def schedule_at(self, task: Task, run_at: datetime) -> str:
        """Enqueue *task* to run no earlier than *run_at*."""
        task.scheduled_at = run_at
        return self.submit(task)

    def cancel(self, task_id: str) -> bool:
        """Cancel a still-PENDING task; returns False if unknown or already started.

        The task is only marked CANCELLED here — it stays in the heap and is
        lazily discarded by _get_next_task when it reaches the top.
        """
        with self._lock:
            task = self._tasks.get(task_id)
            if not task or task.status != TaskStatus.PENDING:
                return False
            task.status = TaskStatus.CANCELLED
            return True

    def get_status(self, task_id: str) -> Optional[TaskStatus]:
        """Return the task's current status, or None if the id is unknown."""
        task = self._tasks.get(task_id)
        return task.status if task else None

    def _get_next_task(self) -> Optional[Task]:
        """Block until the highest-priority *due* task is available; pop it.

        Returns None only when the scheduler is shutting down. Cancelled
        tasks are discarded here instead of being removed from the heap at
        cancel time (an O(n) heap deletion).
        """
        with self._not_empty:
            while self._running:
                now = datetime.now()
                if self._queue and self._queue[0].scheduled_at <= now:
                    task = heapq.heappop(self._queue)
                    if task.status == TaskStatus.CANCELLED:
                        continue  # lazily drop cancelled tasks
                    return task
                if self._queue:
                    # Head task is in the future: sleep until it is due, or
                    # until a submit() notify delivers an earlier task.
                    timeout = (self._queue[0].scheduled_at - now).total_seconds()
                    self._not_empty.wait(timeout=timeout)
                else:
                    self._not_empty.wait()  # empty queue: sleep until notified
            return None  # scheduler stopped

    def _worker_loop(self) -> None:
        """Worker thread body: pull due tasks and execute until shutdown."""
        while self._running:
            task = self._get_next_task()
            if task is None:
                break  # stop() was called while we were waiting
            self._execute(task)

    def _execute(self, task: Task) -> None:
        """Run one task; on failure schedule a retry or mark it FAILED."""
        task.status = TaskStatus.RUNNING
        task.started_at = datetime.now()
        print(f"Executing: {task.task_id[:8]} [{task.name}]")
        try:
            task.result = task.func(*task.args, **task.kwargs)
            task.status = TaskStatus.SUCCEEDED
            task.completed_at = datetime.now()
            print(f"Succeeded: {task.task_id[:8]} [{task.name}]")
        except Exception as e:
            task.error = str(e)
            print(f"Failed: {task.task_id[:8]} [{task.name}] error={e}")
            if task.can_retry():
                task.retry_count += 1
                task.status = TaskStatus.RETRYING
                task.scheduled_at = task.next_retry_time()
                # Re-enqueue for retry with exponential backoff.
                with self._not_empty:
                    heapq.heappush(self._queue, task)
                    self._not_empty.notify()
                print(f"Retry {task.retry_count}/{task.max_retries} scheduled for {task.name}")
            else:
                task.status = TaskStatus.FAILED
                task.completed_at = datetime.now()

    def start(self) -> None:
        """Spawn the worker threads (daemon, so they never block exit)."""
        self._running = True
        for i in range(self._num_workers):
            t = threading.Thread(target=self._worker_loop, daemon=True)
            t.start()
            self._workers.append(t)
        print(f"TaskScheduler started with {self._num_workers} workers")

    def stop(self, wait: bool = True) -> None:
        """Signal shutdown, wake all sleeping workers, optionally join them."""
        self._running = False
        with self._not_empty:
            self._not_empty.notify_all()   # wake all waiting workers
        if wait:
            for t in self._workers:
                t.join(timeout=5)  # bounded join so stop() cannot hang
        print("TaskScheduler stopped")

Usage Example

import time
from datetime import datetime, timedelta

def send_email(to: str, subject: str) -> str:
    """Simulate sending an email and return a delivery confirmation."""
    # `subject` is accepted for interface realism but unused by this stub.
    time.sleep(0.1)  # stand-in for an SMTP round-trip
    confirmation = f"Email sent to {to}"
    return confirmation

def generate_report(report_type: str) -> str:
    """Simulate building a report and return a completion message."""
    time.sleep(0.5)  # stand-in for real report-generation work
    summary = f"Report {report_type} generated"
    return summary

# Demo: two workers, one immediate high-priority task and one delayed task.
# NOTE(review): output depends on wall-clock sleeps below, so exact printed
# statuses can vary on a loaded machine.
scheduler = TaskScheduler(num_workers=2)
scheduler.start()

# High-priority email
email_task = Task(
    name="send_welcome_email",
    func=send_email,
    args=("user@example.com", "Welcome!"),
    priority=TaskPriority.HIGH
)
tid1 = scheduler.submit(email_task)

# Scheduled report (run in 5 seconds)
report_task = Task(
    name="daily_report",
    func=generate_report,
    args=("revenue",),
    priority=TaskPriority.NORMAL,
    max_retries=2  # tighter than the default of 3 retries
)
tid2 = scheduler.schedule_at(report_task, datetime.now() + timedelta(seconds=5))

time.sleep(2)   # enough time for the email task to complete
print(f"Email status: {scheduler.get_status(tid1)}")
time.sleep(10)  # wait past the 5s schedule plus execution time
print(f"Report status: {scheduler.get_status(tid2)}")
scheduler.stop()

Interview Follow-ups

  • Distributed scheduler: Replace the in-memory priority queue with a database table. Workers poll for PENDING tasks older than scheduled_at using SELECT FOR UPDATE SKIP LOCKED (PostgreSQL) — prevents two workers from claiming the same task. This scales across multiple servers.
  • Dead letter queue: Tasks that exhaust retries are moved to a dead_letter_tasks table for manual inspection and reprocessing.
  • Cron-style scheduling: Store a cron expression with each recurring task. A separate scheduler thread wakes up every minute, generates next-run instances for all cron tasks due in the next interval, and enqueues them.
  • Heartbeat / task lease: Workers acquire a lease (DB row lock with TTL). If a worker crashes, the lease expires and another worker reclaims the task. Prevents orphaned RUNNING tasks.

{"@context":"https://schema.org","@type":"FAQPage","mainEntity":[{"@type":"Question","name":"How do you design a distributed task scheduler that prevents duplicate execution?","acceptedAnswer":{"@type":"Answer","text":"Use a database as the task queue with pessimistic locking. Workers poll the database for PENDING tasks where scheduled_at <= now(), claiming each row with SELECT FOR UPDATE SKIP LOCKED (PostgreSQL) so no two workers can claim the same task. Each claimed task records a worker id and heartbeat timestamp; a reaper finds RUNNING tasks with a stale heartbeat (no update for > 2 minutes), resets them to PENDING for retry. This lease-based approach ensures exactly-one-worker-at-a-time execution without a coordinator service. At-most-once delivery: mark as RUNNING before executing. At-least-once: re-enqueue on failure. Make tasks idempotent for safety."}},{"@type":"Question","name":"How do you implement exponential backoff retry in a job queue?","acceptedAnswer":{"@type":"Answer","text":"Exponential backoff delays each retry attempt: delay = base_delay * (2 ^ retry_count). For base_delay=60s: retry 1 at 60s, retry 2 at 120s, retry 3 at 240s. Add random jitter (multiply by a random factor between 0.5 and 1.5) to prevent retry storms — if 1000 jobs fail simultaneously and all retry at the same time, you recreate the original problem. In code: next_retry_at = now + base_delay * (2 ** retry_count) * random.uniform(0.5, 1.5). Cap the maximum delay (e.g., 1 hour) so retries don't wait indefinitely. Track retry_count on the task. On failure: if retry_count < max_retries: increment retry_count, set status=RETRYING, update scheduled_at=next_retry_at, re-enqueue. Else: status=FAILED, move to dead letter queue. The dead letter queue preserves failed tasks for manual investigation and reprocessing without losing data."}},{"@type":"Question","name":"How does a priority queue work in a task scheduler?","acceptedAnswer":{"@type":"Answer","text":"A priority queue (min-heap in Python via heapq) ensures higher-priority tasks are always executed before lower-priority ones. Each task has a priority value (e.g., URGENT=20, HIGH=10, NORMAL=5, LOW=1). The heap orders tasks so the highest-priority task is always at the top. When priorities are equal, tasks are ordered by scheduled_at (earlier scheduled tasks run first). In Python, heapq is a min-heap — to get max-priority behavior, negate the priority value or implement __lt__ on the Task class to reverse the ordering. Thread safety: use a threading.Condition (wraps a Lock) to protect the heap. Workers call wait() on the Condition when the queue is empty or all tasks are in the future; publishers call notify() after adding a task to wake a waiting worker. The Condition's wait(timeout) allows workers to sleep until either a new task arrives or a scheduled task becomes due."}}]}

🏢 Asked at: Uber Interview Guide 2026: Dispatch Systems, Geospatial Algorithms, and Marketplace Engineering

🏢 Asked at: Airbnb Interview Guide 2026: Search Systems, Trust and Safety, and Full-Stack Engineering

🏢 Asked at: Databricks Interview Guide 2026: Spark Internals, Delta Lake, and Lakehouse Architecture

🏢 Asked at: Atlassian Interview Guide

🏢 Asked at: LinkedIn Interview Guide 2026: Social Graph Engineering, Feed Ranking, and Professional Network Scale

Scroll to Top