Low-Level Design: Task Scheduler / Job Queue
A task scheduler executes jobs at specified times or with specified delays, tracks their status, and handles failures with retry logic. This design appears at companies like Airbnb (Airflow), Uber (Cadence), and Databricks. It tests: priority queuing, state machines, concurrency, and the Worker pattern.
Core Classes
Enums
from enum import Enum
class TaskStatus(Enum):
    """Lifecycle states of a task as it moves through the scheduler."""
    PENDING = "PENDING"        # waiting to be picked up by a worker
    RUNNING = "RUNNING"        # currently executing
    SUCCEEDED = "SUCCEEDED"    # completed successfully
    FAILED = "FAILED"          # final failure (retries exhausted)
    RETRYING = "RETRYING"      # failed, scheduled for another attempt
    CANCELLED = "CANCELLED"    # cancelled before execution started
class TaskPriority(Enum):
    """Task priority levels; a larger value means it runs sooner."""
    LOW = 1
    NORMAL = 5
    HIGH = 10
    URGENT = 20
Task
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Callable, Any
import uuid
@dataclass
class Task:
    """A unit of work with scheduling, priority, and retry metadata.

    Instances are pushed onto a min-heap; ``__lt__`` inverts the priority
    comparison so that higher-priority tasks surface first.
    """
    task_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    name: str = ""
    func: Callable = None               # the function to execute
    args: tuple = field(default_factory=tuple)
    kwargs: dict = field(default_factory=dict)
    priority: TaskPriority = TaskPriority.NORMAL
    status: TaskStatus = TaskStatus.PENDING
    max_retries: int = 3
    retry_count: int = 0
    retry_delay_seconds: int = 60       # base delay, doubles each retry
    scheduled_at: datetime = field(default_factory=datetime.now)
    started_at: datetime = None
    completed_at: datetime = None
    result: Any = None
    error: str = None

    def __lt__(self, other: 'Task') -> bool:
        """For priority queue: higher priority = earlier execution."""
        if self.priority.value != other.priority.value:
            return self.priority.value > other.priority.value  # higher priority first
        # Equal priority: earlier scheduled time wins.
        return self.scheduled_at < other.scheduled_at

    def can_retry(self) -> bool:
        """Return True while retry attempts remain."""
        return self.retry_count < self.max_retries

    def next_retry_time(self) -> datetime:
        """Exponential backoff: 60s, 120s, 240s, ..."""
        delay = self.retry_delay_seconds * (2 ** self.retry_count)
        return datetime.now() + timedelta(seconds=delay)
TaskScheduler
import heapq
import threading
from typing import Optional
class TaskScheduler:
    """Thread-pool task scheduler backed by a priority heap.

    Workers block on a Condition until a task is due, execute it, and
    re-enqueue it with exponential backoff on failure.
    """

    def __init__(self, num_workers: int = 4):
        self._queue: list[Task] = []        # min-heap (uses Task.__lt__)
        self._lock = threading.Lock()
        self._not_empty = threading.Condition(self._lock)
        self._tasks: dict[str, Task] = {}   # task_id -> Task (registry)
        self._running = False
        self._workers: list[threading.Thread] = []
        self._num_workers = num_workers

    def submit(self, task: Task) -> str:
        """Enqueue a task for execution as soon as it is due; returns its id."""
        with self._not_empty:
            self._tasks[task.task_id] = task
            heapq.heappush(self._queue, task)
            self._not_empty.notify()  # wake a waiting worker
        print(f"Submitted task {task.task_id[:8]} [{task.name}] priority={task.priority.name}")
        return task.task_id

    def schedule_at(self, task: Task, run_at: datetime) -> str:
        """Enqueue a task to run no earlier than ``run_at``."""
        task.scheduled_at = run_at
        return self.submit(task)

    def cancel(self, task_id: str) -> bool:
        """Cancel a PENDING task; returns False if unknown or already started.

        The task stays in the heap; workers discard CANCELLED tasks on pop.
        """
        with self._lock:
            task = self._tasks.get(task_id)
            if not task or task.status != TaskStatus.PENDING:
                return False
            task.status = TaskStatus.CANCELLED
            return True

    def get_status(self, task_id: str) -> Optional[TaskStatus]:
        """Return the task's current status, or None for an unknown id."""
        task = self._tasks.get(task_id)
        return task.status if task else None

    def _get_next_task(self) -> Optional[Task]:
        """Pop the highest-priority task that is due.

        Blocks until a task is due or the scheduler stops; returns None on
        shutdown. ``now`` is recomputed on every pass so long waits don't
        use a stale clock.
        """
        with self._not_empty:
            while self._running:
                now = datetime.now()
                if self._queue and self._queue[0].scheduled_at <= now:
                    task = heapq.heappop(self._queue)
                    if task.status == TaskStatus.CANCELLED:
                        continue  # lazily discard cancelled tasks
                    return task
                if self._queue:
                    # Sleep until the earliest task becomes due, or a new
                    # (possibly earlier) task is submitted and notifies us.
                    timeout = (self._queue[0].scheduled_at - now).total_seconds()
                    self._not_empty.wait(timeout=timeout)
                else:
                    # Empty queue: wake periodically to re-check _running.
                    self._not_empty.wait(timeout=1.0)
        return None

    def _worker_loop(self) -> None:
        """Worker thread body: pull due tasks and execute until shutdown."""
        while self._running:
            task = self._get_next_task()
            if task is None:
                break
            self._execute(task)

    def _execute(self, task: Task) -> None:
        """Run one task; on failure re-enqueue with backoff or mark FAILED."""
        task.status = TaskStatus.RUNNING
        task.started_at = datetime.now()
        print(f"Executing: {task.task_id[:8]} [{task.name}]")
        try:
            task.result = task.func(*task.args, **task.kwargs)
        except Exception as e:
            task.error = str(e)
            print(f"Failed: {task.task_id[:8]} [{task.name}] error={e}")
            if task.can_retry():
                task.retry_count += 1
                task.status = TaskStatus.RETRYING
                task.scheduled_at = task.next_retry_time()
                # Re-enqueue for retry
                with self._not_empty:
                    heapq.heappush(self._queue, task)
                    self._not_empty.notify()
                print(f"Retry {task.retry_count}/{task.max_retries} scheduled for {task.name}")
            else:
                task.status = TaskStatus.FAILED
                task.completed_at = datetime.now()
        else:
            # Success path kept out of the try so bookkeeping errors are
            # never misreported as a task failure.
            task.status = TaskStatus.SUCCEEDED
            task.completed_at = datetime.now()
            print(f"Succeeded: {task.task_id[:8]} [{task.name}]")

    def start(self) -> None:
        """Spawn the worker threads (daemons, so they don't block exit)."""
        self._running = True
        for _ in range(self._num_workers):
            t = threading.Thread(target=self._worker_loop, daemon=True)
            t.start()
            self._workers.append(t)
        print(f"TaskScheduler started with {self._num_workers} workers")

    def stop(self, wait: bool = True) -> None:
        """Signal shutdown and optionally join workers (5s timeout each)."""
        self._running = False
        with self._not_empty:
            self._not_empty.notify_all()  # wake all waiting workers
        if wait:
            for t in self._workers:
                t.join(timeout=5)
        print("TaskScheduler stopped")
Usage Example
import time
from datetime import datetime, timedelta
def send_email(to: str, subject: str) -> str:
    """Pretend to send an email and return a confirmation message."""
    time.sleep(0.1)  # simulate work
    return f"Email sent to {to}"
def generate_report(report_type: str) -> str:
    """Pretend to build a report and return a confirmation message."""
    time.sleep(0.5)  # simulate work
    return f"Report {report_type} generated"
# Demo: submit an immediate high-priority task and a delayed one,
# then poll their statuses.
scheduler = TaskScheduler(num_workers=2)
scheduler.start()

# High-priority email — should be picked up almost immediately.
welcome_email = Task(
    name="send_welcome_email",
    func=send_email,
    args=("user@example.com", "Welcome!"),
    priority=TaskPriority.HIGH,
)
email_id = scheduler.submit(welcome_email)

# Report scheduled to run 5 seconds from now, with a lower retry budget.
revenue_report = Task(
    name="daily_report",
    func=generate_report,
    args=("revenue",),
    priority=TaskPriority.NORMAL,
    max_retries=2,
)
report_id = scheduler.schedule_at(revenue_report, datetime.now() + timedelta(seconds=5))

time.sleep(2)
print(f"Email status: {scheduler.get_status(email_id)}")
time.sleep(10)
print(f"Report status: {scheduler.get_status(report_id)}")
scheduler.stop()
Interview Follow-ups
- Distributed scheduler: Replace the in-memory priority queue with a database table. Workers poll for PENDING tasks whose scheduled_at is in the past, claiming rows with SELECT FOR UPDATE SKIP LOCKED (PostgreSQL) — prevents two workers from claiming the same task. This scales across multiple servers.
- Dead letter queue: Tasks that exhaust retries are moved to a dead_letter_tasks table for manual inspection and reprocessing.
- Cron-style scheduling: Store a cron expression with each recurring task. A separate scheduler thread wakes up every minute, generates next-run instances for all cron tasks due in the next interval, and enqueues them.
- Heartbeat / task lease: Workers acquire a lease (DB row lock with TTL). If a worker crashes, the lease expires and another worker reclaims the task. Prevents orphaned RUNNING tasks.
{"@context":"https://schema.org","@type":"FAQPage","mainEntity":[{"@type":"Question","name":"How do you design a distributed task scheduler that prevents duplicate execution?","acceptedAnswer":{"@type":"Answer","text":"Use a database as the task queue with pessimistic locking. Workers poll the database for PENDING tasks where scheduled_at <= now, claiming rows with SELECT ... FOR UPDATE SKIP LOCKED (PostgreSQL) so two workers can never claim the same task. Each worker updates a heartbeat timestamp while running; a reaper thread scans for tasks stuck in RUNNING with a stale heartbeat (no update for more than 2 minutes), resets them to PENDING for retry. This lease-based approach ensures exactly-one-worker-at-a-time execution without a coordinator service. At-most-once delivery: mark as RUNNING before executing. At-least-once: re-enqueue on failure. Make tasks idempotent for safety."}},{"@type":"Question","name":"How do you implement exponential backoff retry in a job queue?","acceptedAnswer":{"@type":"Answer","text":"Exponential backoff delays each retry attempt: delay = base_delay * (2 ^ retry_count). For base_delay=60s: retry 1 at 60s, retry 2 at 120s, retry 3 at 240s. Add random jitter (multiply by a random factor between 0.5 and 1.5) to prevent retry storms — if 1000 jobs fail simultaneously and all retry at the same time, you recreate the original problem. In code: next_retry_at = now + base_delay * (2 ** retry_count) * random.uniform(0.5, 1.5). Cap the maximum delay (e.g., 1 hour) so retries don't wait indefinitely. Track retry_count on the task. On failure: if retry_count < max_retries: increment retry_count, set status=RETRYING, update scheduled_at=next_retry_at, re-enqueue. Else: status=FAILED, move to dead letter queue. The dead letter queue preserves failed tasks for manual investigation and reprocessing without losing data."}},{"@type":"Question","name":"How does a priority queue work in a task scheduler?","acceptedAnswer":{"@type":"Answer","text":"A priority queue (min-heap in Python via heapq) ensures higher-priority tasks are always executed before lower-priority ones. Each task has a priority value (e.g., URGENT=20, HIGH=10, NORMAL=5, LOW=1). The heap orders tasks so the highest-priority task is always at the top. When priorities are equal, tasks are ordered by scheduled_at (earlier scheduled tasks run first). In Python, heapq is a min-heap — to get max-priority behavior, negate the priority value or implement __lt__ on the Task class to reverse the ordering. Thread safety: use a threading.Condition (wraps a Lock) to protect the heap. Workers call wait() on the Condition when the queue is empty or all tasks are in the future; publishers call notify() after adding a task to wake a waiting worker. The Condition's wait(timeout) allows workers to sleep until either a new task arrives or a scheduled task becomes due."}}]}
🏢 Asked at: Uber Interview Guide 2026: Dispatch Systems, Geospatial Algorithms, and Marketplace Engineering
🏢 Asked at: Airbnb Interview Guide 2026: Search Systems, Trust and Safety, and Full-Stack Engineering
🏢 Asked at: Databricks Interview Guide 2026: Spark Internals, Delta Lake, and Lakehouse Architecture
🏢 Asked at: Atlassian Interview Guide
🏢 Asked at: LinkedIn Interview Guide 2026: Social Graph Engineering, Feed Ranking, and Professional Network Scale