Low-Level Design: Logging Framework
A logging framework is a fundamental software engineering tool and a frequently asked OOP design question at companies like Uber, Google, and Atlassian. It tests the Chain of Responsibility pattern, the Strategy pattern for formatters, log-level filtering, and thread safety. Designing a clean, extensible logger demonstrates solid OOP principles.
Core Classes
Log Level
```python
from enum import IntEnum

class LogLevel(IntEnum):
    DEBUG = 10
    INFO = 20
    WARNING = 30
    ERROR = 40
    CRITICAL = 50
```
Using IntEnum allows comparison: LogLevel.ERROR > LogLevel.INFO is True. A handler set to WARNING level will drop DEBUG and INFO messages.
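The ordering property can be exercised directly. A minimal standalone sketch (the enum is repeated here only so the snippet runs on its own):

```python
from enum import IntEnum

class LogLevel(IntEnum):
    DEBUG = 10
    INFO = 20
    WARNING = 30
    ERROR = 40
    CRITICAL = 50

# IntEnum members compare by their integer values, so a handler's
# threshold check is a single comparison.
print(LogLevel.ERROR > LogLevel.INFO)      # True
print(LogLevel.DEBUG >= LogLevel.WARNING)  # False
```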
Log Record
```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
import traceback

@dataclass
class LogRecord:
    level: LogLevel
    message: str
    logger_name: str
    timestamp: datetime = field(default_factory=datetime.now)
    exc_info: Optional[str] = None  # formatted exception traceback

    @classmethod
    def from_exception(cls, level: LogLevel, message: str, logger_name: str) -> 'LogRecord':
        # Must be called from inside an `except` block so that
        # traceback.format_exc() can see the active exception.
        return cls(
            level=level,
            message=message,
            logger_name=logger_name,
            exc_info=traceback.format_exc()
        )
```
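One subtlety worth making explicit: `traceback.format_exc()` only returns a useful traceback while an exception is actively being handled, so `from_exception` is meant to be called from inside an `except` block. A minimal standalone illustration (`capture_exc` is a hypothetical helper, just for the demo):

```python
import traceback

def capture_exc() -> str:
    # traceback.format_exc() formats the exception currently being
    # handled -- calling it outside an `except` block yields no traceback.
    try:
        raise ValueError("Invalid card number")
    except ValueError:
        return traceback.format_exc()

tb = capture_exc()
print("ValueError" in tb)  # True
```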
Formatter (Strategy Pattern)
```python
from abc import ABC, abstractmethod
import json

class Formatter(ABC):
    @abstractmethod
    def format(self, record: LogRecord) -> str:
        pass

class SimpleFormatter(Formatter):
    def format(self, record: LogRecord) -> str:
        ts = record.timestamp.strftime('%Y-%m-%d %H:%M:%S')
        return f"[{ts}] [{record.level.name}] [{record.logger_name}] {record.message}"

class JSONFormatter(Formatter):
    def format(self, record: LogRecord) -> str:
        data = {
            "timestamp": record.timestamp.isoformat(),
            "level": record.level.name,
            "logger": record.logger_name,
            "message": record.message,
        }
        if record.exc_info:
            data["exception"] = record.exc_info
        return json.dumps(data)

class ColorFormatter(Formatter):
    # ANSI escape sequences: "\033" is the ESC character
    COLORS = {
        LogLevel.DEBUG: "\033[36m",     # Cyan
        LogLevel.INFO: "\033[32m",      # Green
        LogLevel.WARNING: "\033[33m",   # Yellow
        LogLevel.ERROR: "\033[31m",     # Red
        LogLevel.CRITICAL: "\033[35m",  # Magenta
    }
    RESET = "\033[0m"

    def format(self, record: LogRecord) -> str:
        color = self.COLORS.get(record.level, "")
        ts = record.timestamp.strftime('%H:%M:%S')
        return f"{color}[{ts}] {record.level.name}: {record.message}{self.RESET}"
```
Handler (Chain of Responsibility)
```python
import sys
from typing import Optional

class Handler(ABC):
    def __init__(
        self,
        level: LogLevel = LogLevel.DEBUG,
        formatter: Optional[Formatter] = None,
        next_handler: Optional['Handler'] = None
    ):
        self.level = level
        self.formatter = formatter or SimpleFormatter()
        self.next_handler = next_handler  # chain of responsibility

    def handle(self, record: LogRecord) -> None:
        """Filter by level, format, emit. Then pass to the next handler."""
        if record.level >= self.level:
            self.emit(self.formatter.format(record))
        if self.next_handler:
            self.next_handler.handle(record)

    @abstractmethod
    def emit(self, message: str) -> None:
        pass

class ConsoleHandler(Handler):
    def emit(self, message: str) -> None:
        print(message, file=sys.stdout, flush=True)

class FileHandler(Handler):
    def __init__(self, filepath: str, **kwargs):
        super().__init__(**kwargs)
        self._file = open(filepath, 'a', encoding='utf-8')

    def emit(self, message: str) -> None:
        self._file.write(message + '\n')
        self._file.flush()

    def close(self) -> None:
        self._file.close()
```
```python
from collections import deque

class RateLimitedHandler(Handler):
    """Drops messages once a per-minute budget is exhausted (sliding window)."""
    def __init__(self, max_per_minute: int = 100, **kwargs):
        super().__init__(**kwargs)
        self._timestamps: deque = deque()
        self._max = max_per_minute

    def emit(self, message: str) -> None:
        now = datetime.now()
        # Evict timestamps older than the 1-minute window
        while self._timestamps and (now - self._timestamps[0]).total_seconds() > 60:
            self._timestamps.popleft()
        if len(self._timestamps) < self._max:
            self._timestamps.append(now)
            print(message)
        # else: budget exhausted -- drop the message
```
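The eviction loop above implements a sliding window. Isolated as a standalone sketch, using float seconds instead of `datetime` so the window is easy to reason about (`make_limiter` is a hypothetical helper for illustration only):

```python
from collections import deque

def make_limiter(max_per_window: int, window_s: float):
    timestamps: deque = deque()

    def allow(now: float) -> bool:
        # Evict timestamps that have fallen out of the window.
        while timestamps and now - timestamps[0] > window_s:
            timestamps.popleft()
        if len(timestamps) < max_per_window:
            timestamps.append(now)
            return True
        return False  # budget exhausted: drop

    return allow

allow = make_limiter(max_per_window=2, window_s=60.0)
print(allow(0.0))   # True
print(allow(1.0))   # True
print(allow(2.0))   # False -- window is full
print(allow(61.5))  # True  -- both earlier timestamps have expired
```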
Logger
```python
import threading

class Logger:
    def __init__(self, name: str, level: LogLevel = LogLevel.DEBUG):
        self.name = name
        self.level = level
        self._handlers: list[Handler] = []
        self._lock = threading.Lock()

    def add_handler(self, handler: Handler) -> None:
        self._handlers.append(handler)

    def _log(self, level: LogLevel, message: str, exc: bool = False) -> None:
        if level < self.level:
            return  # filtered out at the logger level
        if exc:
            record = LogRecord.from_exception(level, message, self.name)
        else:
            record = LogRecord(level=level, message=message, logger_name=self.name)
        with self._lock:  # prevent interleaved output from concurrent threads
            for handler in self._handlers:
                handler.handle(record)

    def debug(self, msg: str) -> None: self._log(LogLevel.DEBUG, msg)
    def info(self, msg: str) -> None: self._log(LogLevel.INFO, msg)
    def warning(self, msg: str) -> None: self._log(LogLevel.WARNING, msg)
    def error(self, msg: str) -> None: self._log(LogLevel.ERROR, msg)
    def critical(self, msg: str) -> None: self._log(LogLevel.CRITICAL, msg)
    def exception(self, msg: str) -> None: self._log(LogLevel.ERROR, msg, exc=True)
```
```python
class LoggerFactory:
    """Singleton registry of named loggers."""
    _loggers: dict[str, Logger] = {}
    _lock = threading.Lock()

    @classmethod
    def get_logger(cls, name: str, level: LogLevel = LogLevel.DEBUG) -> Logger:
        with cls._lock:
            if name not in cls._loggers:
                cls._loggers[name] = Logger(name, level)
            return cls._loggers[name]
```
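To make the registry guarantee concrete (repeated lookups of the same name return the same object), here is a minimal standalone sketch with stub classes rather than the full Logger/LoggerFactory above:

```python
import threading

class StubLogger:
    def __init__(self, name: str):
        self.name = name

class StubFactory:
    _loggers: dict = {}
    _lock = threading.Lock()

    @classmethod
    def get_logger(cls, name: str) -> StubLogger:
        # Lock guards the check-then-create against concurrent callers.
        with cls._lock:
            if name not in cls._loggers:
                cls._loggers[name] = StubLogger(name)
            return cls._loggers[name]

a = StubFactory.get_logger("payment_service")
b = StubFactory.get_logger("payment_service")
print(a is b)  # True -- same instance, so configuration is shared everywhere
```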
Usage Example
```python
# Setup
logger = LoggerFactory.get_logger("payment_service", LogLevel.INFO)
console_handler = ConsoleHandler(level=LogLevel.DEBUG, formatter=ColorFormatter())
file_handler = FileHandler(
    "/var/log/app.log",
    level=LogLevel.WARNING,
    formatter=JSONFormatter()
)
logger.add_handler(console_handler)
logger.add_handler(file_handler)

# Usage
logger.info("Payment service started")
logger.warning("High latency detected: 450ms")

try:
    raise ValueError("Invalid card number")
except Exception:
    logger.exception("Payment processing failed")
```
Design Patterns Used
- Strategy: Formatters are interchangeable — SimpleFormatter, JSONFormatter, and ColorFormatter can be swapped without modifying Handler.
- Chain of Responsibility: Handlers form a chain via next_handler. A log record flows down the chain, each handler decides independently whether to emit.
- Factory / Registry: LoggerFactory maintains a registry of named loggers — same name always returns the same instance (similar to Python’s logging.getLogger()).
- Template Method: Handler.handle() defines the algorithm (filter → format → emit → chain); subclasses override only emit().
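The chain behavior can be sketched with stub handlers (integer levels stand in for LogLevel, and StubHandler is a hypothetical stand-in for the document's Handler, reduced to filtering and forwarding):

```python
class StubHandler:
    def __init__(self, name: str, threshold: int, next_handler=None):
        self.name = name
        self.threshold = threshold
        self.next_handler = next_handler
        self.emitted: list = []

    def handle(self, level: int, message: str) -> None:
        # Each handler filters independently...
        if level >= self.threshold:
            self.emitted.append(message)
        # ...then unconditionally forwards down the chain.
        if self.next_handler:
            self.next_handler.handle(level, message)

file_h = StubHandler("file", threshold=30)                        # WARNING+
console_h = StubHandler("console", threshold=10, next_handler=file_h)

console_h.handle(20, "service started")  # INFO: console only
console_h.handle(40, "charge failed")    # ERROR: both handlers
print(console_h.emitted)  # ['service started', 'charge failed']
print(file_h.emitted)     # ['charge failed']
```

The logger only ever talks to the head of the chain; adding an alerting handler means appending one more link, with no change to existing code.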
Interview Follow-ups
- Async logging: Replace synchronous emit() calls with queue-based dispatch. Logger.log() enqueues LogRecord; a background thread dequeues and calls handlers. Prevents logging from blocking the main thread.
- Log rotation: RotatingFileHandler tracks file size; when it exceeds maxBytes, rename current log to app.log.1 and open a new app.log. Keep backupCount rotated files.
- Structured logging: Log key-value pairs instead of strings — easier to query in log aggregation systems (ELK, Splunk). JSONFormatter already supports this pattern.
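The async-logging follow-up can be sketched with a `queue.Queue` and a single consumer thread. A minimal sketch under stated assumptions: plain strings stand in for LogRecord, a list stands in for the handlers, and a `None` sentinel drains the queue on shutdown:

```python
import queue
import threading

log_queue: "queue.Queue" = queue.Queue()
emitted = []  # stand-in for the real handlers

def worker() -> None:
    while True:
        record = log_queue.get()
        if record is None:       # sentinel: stop after draining
            break
        emitted.append(record)   # stand-in for handler.handle(record)

t = threading.Thread(target=worker, daemon=True)
t.start()

# Producers just enqueue -- no lock contention, no blocking I/O here.
log_queue.put("Payment service started")
log_queue.put("High latency detected")
log_queue.put(None)
t.join()
print(emitted)  # ['Payment service started', 'High latency detected']
```

The single consumer thread is the only code that touches handlers, so the handlers themselves need no locking; the trade-off, as noted above, is that unflushed records can be lost on a crash.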