Log Aggregation Pipeline Low-Level Design: Ingestion, Parsing, Indexing, and Alerting

Log Aggregation Pipeline: Low-Level Design

A log aggregation pipeline collects structured logs from thousands of services, parses and enriches them, builds an inverted index for full-text search, enforces retention policies, and evaluates alert rules in near real-time. This article designs each stage from Kafka ingestion through index construction to threshold-based alerting with sliding window aggregation.

Ingestion via Kafka

Services emit structured JSON logs to a Kafka topic logs.raw partitioned by service_name. Partitioning by service keeps logs for the same service on the same partition, preserving intra-service ordering without global ordering overhead. Consumers in the parsing tier form a single Kafka consumer group; each partition is owned by exactly one consumer at a time, enabling parallel parsing without coordination locks.


# Log producer (service side)
import json
import logging
import socket
import time

from kafka import KafkaProducer

# Shared, thread-safe producer for the service process. Records are JSON,
# lz4-compressed, and only count as delivered once all in-sync replicas ack.
producer = KafkaProducer(
    bootstrap_servers=["kafka:9092"],
    value_serializer=lambda v: json.dumps(v).encode(),
    compression_type="lz4",
    acks="all",
    retries=5,
    # With retries enabled, more than one in-flight request per broker lets a
    # retried batch land after a later one, which breaks the per-service
    # (per-key) ordering that partitioning by service_name is meant to give.
    max_in_flight_requests_per_connection=1,
)

# Resolved once per process; stamped on every record as its origin host.
_HOSTNAME = socket.gethostname()


def emit_log(level: str, message: str, service: str, **extra):
    """Publish one structured log record to the ``logs.raw`` topic.

    The record contains a millisecond epoch ``ts``, the upper-cased ``level``,
    ``service``, this process's ``host`` and ``message``. Any extra keyword
    arguments are merged in last, so they can add fields or override ``ts``
    / ``host``.

    The service name is used as the Kafka message key, so all records of one
    service go to the same partition.

    Returns:
        The future returned by ``producer.send``. Sending is asynchronous;
        callers that need delivery confirmation can wait on it.
    """
    record = {
        "ts": int(time.time() * 1000),
        "level": level.upper(),
        "service": service,
        # The consumer reads "host" and LogEntry.host is NOT NULL; without this
        # every row would be stored with an empty host.
        "host": _HOSTNAME,
        "message": message,
        **extra,
    }
    return producer.send("logs.raw", value=record, key=service.encode())

SQL Schema


-- Partitioned by month for efficient retention drops.
-- RANGE COLUMNS is required here: MySQL only accepts UNIX_TIMESTAMP() in a
-- partitioning expression for TIMESTAMP columns, and ts is DATETIME(3).
-- Every unique key (including the PK) must contain the partitioning column,
-- hence PRIMARY KEY (id, ts).
CREATE TABLE LogEntry (
    id            BIGINT UNSIGNED   NOT NULL AUTO_INCREMENT,
    ts            DATETIME(3)       NOT NULL,
    level         ENUM('DEBUG','INFO','WARN','ERROR','FATAL') NOT NULL,
    service       VARCHAR(128)      NOT NULL,
    host          VARCHAR(255)      NOT NULL,
    trace_id      CHAR(32)          NULL,
    message       TEXT              NOT NULL,
    fields        JSON              NULL,      -- enriched fields (geo, env, version)
    ingested_at   DATETIME(3)       NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
    PRIMARY KEY (id, ts),
    INDEX idx_service_ts  (service, ts DESC),
    INDEX idx_level_ts    (level, ts DESC),
    INDEX idx_trace       (trace_id)
) ENGINE=InnoDB
  PARTITION BY RANGE COLUMNS (ts) (
    PARTITION p_2025_01 VALUES LESS THAN ('2025-02-01'),
    PARTITION p_2025_02 VALUES LESS THAN ('2025-03-01'),
    -- Catch-all; split it with REORGANIZE PARTITION before each new month starts.
    PARTITION p_future  VALUES LESS THAN (MAXVALUE)
);

-- Inverted index: one posting per (term, log_id).
CREATE TABLE LogIndex (
    term          VARCHAR(255)      NOT NULL,
    log_id        BIGINT UNSIGNED   NOT NULL,
    ts            DATETIME(3)       NOT NULL,   -- denormalized for range pruning
    PRIMARY KEY (term, log_id),                 -- dedupes postings (INSERT IGNORE relies on it)
    INDEX idx_term_ts   (term, ts),             -- term lookup + ts range scan; covering, since InnoDB appends the PK (log_id)
    INDEX idx_ts        (ts),                   -- batched retention purge: DELETE ... WHERE ts < cutoff
    INDEX idx_log_terms (log_id)
) ENGINE=InnoDB;

-- One row per alert definition, read by evaluate_alert_rules().
CREATE TABLE AlertRule (
    id            INT UNSIGNED      NOT NULL AUTO_INCREMENT,
    name          VARCHAR(255)      NOT NULL,
    filter_expr   TEXT              NOT NULL,   -- DSL: 'level=ERROR AND service=payments'
    metric        ENUM('count','rate','p99') NOT NULL DEFAULT 'count',   -- NOTE(review): the evaluator computes COUNT(*) for every rule; 'rate'/'p99' are not implemented yet
    threshold     DECIMAL(12,4)     NOT NULL,   -- rule fires when the metric value >= threshold
    window_secs   INT UNSIGNED      NOT NULL DEFAULT 300,   -- look-back window ending at evaluation time
    severity      ENUM('P1','P2','P3','P4') NOT NULL DEFAULT 'P3',
    enabled       TINYINT(1)        NOT NULL DEFAULT 1,   -- only rows with enabled = 1 are evaluated
    PRIMARY KEY (id)
) ENGINE=InnoDB;

-- One row per firing episode of an AlertRule; resolved_at IS NULL means the
-- alert is still open.
-- NOTE(review): nothing here prevents two open rows for the same rule; the
-- evaluator's check-then-insert is the only deduplication.
CREATE TABLE AlertFiring (
    id            BIGINT UNSIGNED   NOT NULL AUTO_INCREMENT,
    rule_id       INT UNSIGNED      NOT NULL,   -- AlertRule.id (no foreign key declared)
    fired_at      DATETIME(3)       NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
    resolved_at   DATETIME(3)       NULL,       -- NULL while the alert is open
    value         DECIMAL(12,4)     NOT NULL,   -- metric value that breached threshold
    PRIMARY KEY (id),
    INDEX idx_rule_open (rule_id, resolved_at)   -- serves "rule_id = ? AND resolved_at IS NULL"
) ENGINE=InnoDB;

Parsing and Enrichment

Each Kafka consumer worker reads a batch of raw log records, applies grok-style regex patterns to extract structured fields, and enriches with geo-IP data and service metadata before writing to LogEntry.


import re, json, maxminddb
from dataclasses import dataclass, field
from typing import Any

# Grok-style extractors tried in order against each log message; the first
# match wins. Named groups become keys in ParsedLog.fields, and "client_ip"
# additionally feeds the geo-IP enrichment in parse_log_line.
GROK_PATTERNS = {
    # e.g. 10.0.0.1 - - [01/Jan/2025:00:00:00 +0000] "GET /health HTTP/1.1" 200 512
    "nginx_access": re.compile(
        r'(?P<client_ip>\d+\.\d+\.\d+\.\d+) .+ "(?P<method>[A-Z]+) (?P<path>\S+)'
        r' HTTP/\S+" (?P<status>\d{3}) (?P<bytes_sent>\d+)'
    ),
    # e.g. java.lang.IllegalStateException: pool exhausted
    "java_exception": re.compile(
        r"(?P<exception_class>[A-Za-z.]+Exception): (?P<exception_message>.+)"
    ),
}

# MaxMind GeoLite2 City database used for geo-IP enrichment.
_GEOIP_DB_PATH = "/var/lib/GeoLite2-City.mmdb"
# Opened once at import time and reused by parse_log_line for every lookup.
_geoip_reader = maxminddb.open_database(_GEOIP_DB_PATH)

@dataclass
class ParsedLog:
    ts: int
    level: str
    service: str
    host: str
    message: str
    trace_id: str | None = None
    fields: dict[str, Any] = field(default_factory=dict)


def parse_log_line(raw: dict) -> ParsedLog:
    """Parse and enrich a raw log dict from Kafka.

    The core envelope keys (ts, level, service, host, message, trace_id) map
    onto ``ParsedLog`` attributes, with defaults for missing keys. Everything
    else lands in ``ParsedLog.fields``:

    * the named groups of the first matching ``GROK_PATTERNS`` entry, plus
      ``grok_pattern`` naming that entry;
    * ``geo_country`` / ``geo_city`` from a best-effort geo-IP lookup of
      ``client_ip`` (the grok-extracted value is preferred over the raw one);
    * ``env`` (default ``"prod"``);
    * every non-core key of *raw*, which wins over the derived keys above on
      a name collision.

    Args:
        raw: One decoded JSON log record.

    Returns:
        The populated ``ParsedLog``.
    """
    import logging  # local: this parsing snippet does not import it at module level

    core_keys = ("ts", "level", "service", "host", "message", "trace_id")

    message = raw.get("message", "")
    if not isinstance(message, str):
        # A null or non-string message would make pattern.search() raise
        # TypeError and stall the consumer on a single bad record.
        message = "" if message is None else str(message)

    log = ParsedLog(
        ts=raw.get("ts", 0),
        # Normalise to the upper-case labels used by emit_log and the
        # LogEntry.level ENUM; a missing or empty level falls back to INFO.
        level=str(raw.get("level") or "INFO").upper(),
        service=raw.get("service", "unknown"),
        host=raw.get("host", ""),
        message=message,
        trace_id=raw.get("trace_id"),
    )

    # field extraction via grok: first matching pattern wins
    extra_fields: dict[str, Any] = {}
    for pattern_name, pattern in GROK_PATTERNS.items():
        m = pattern.search(message)
        if m:
            extra_fields["grok_pattern"] = pattern_name
            extra_fields.update(m.groupdict())
            break

    # geo-IP enrichment for client_ip if present
    client_ip = extra_fields.get("client_ip") or raw.get("client_ip")
    if client_ip:
        try:
            geo = _geoip_reader.get(client_ip)
            if geo:
                extra_fields["geo_country"] = geo.get("country", {}).get("iso_code")
                extra_fields["geo_city"] = (
                    geo.get("city", {}).get("names", {}).get("en")
                )
        except Exception:
            # Enrichment is best-effort and must never drop the log, but leave
            # a trace so systematic failures (bad DB, malformed IPs) are visible.
            logging.getLogger(__name__).debug(
                "geo-IP lookup failed for client_ip=%r", client_ip, exc_info=True
            )

    # service metadata from environment tag
    extra_fields["env"] = raw.get("env", "prod")
    passthrough = {k: v for k, v in raw.items() if k not in core_keys}
    log.fields = {**extra_fields, **passthrough}
    return log


def index_log(log_id: int, ts: str, message: str, fields: dict, conn) -> None:
    """Build inverted index posting list entries for the log entry."""
    tokens = set(_tokenize(message))
    for fval in fields.values():
        if isinstance(fval, str):
            tokens.update(_tokenize(fval))
    if not tokens:
        return
    rows = [(tok, log_id, ts) for tok in tokens if len(tok) >= 3]
    cursor = conn.cursor()
    cursor.executemany(
        "INSERT IGNORE INTO LogIndex (term, log_id, ts) VALUES (%s, %s, %s)", rows
    )


def _tokenize(text: str) -> list[str]:
    return re.findall(r"[a-z0-9_-.]{3,}", text.lower())

Full-Text Search with Inverted Index

The LogIndex table is a posting list: each (term, log_id) pair lets you look up all log entries containing a term. A multi-term query performs an index intersection: fetch the posting list for each term and intersect on log_id. Time-range pruning uses the denormalized ts column. With a composite (term, ts) index, WHERE term = 'oom' AND ts BETWEEN '...' AND '...' becomes a single index range scan that discards most of the posting list before the intersection.

Alerting with Sliding Window Aggregation


from datetime import datetime, timedelta
import db

def evaluate_alert_rules() -> list[dict]:
    """Evaluate every enabled AlertRule once and record open/resolve transitions.

    Meant to be scheduled periodically (e.g. every 60 s). For each enabled
    rule, this counts LogEntry rows matching the rule's filter whose ts falls
    within the last ``window_secs`` (UTC).

    * If the count reaches the threshold and the rule has no open
      AlertFiring, a new firing is inserted.
    * Otherwise, any open firing for the rule is marked resolved.

    Every rule is evaluated as a COUNT(*), regardless of ``AlertRule.metric``.

    Returns:
        One dict per firing opened during this pass, with keys ``rule``,
        ``value`` and ``alert_id``.
    """
    newly_fired = []

    for rule in db.fetchall("SELECT * FROM AlertRule WHERE enabled = 1"):
        since = datetime.utcnow() - timedelta(seconds=rule["window_secs"])
        predicate, bind_values = _parse_filter_expr(rule["filter_expr"])

        row = db.fetchone(
            f"SELECT COUNT(*) AS cnt FROM LogEntry WHERE ts >= %s AND {predicate}",
            [since] + bind_values
        )
        observed = row["cnt"]

        breached = observed >= rule["threshold"]
        if not breached:
            # Condition cleared: close whatever firing is still open.
            db.execute(
                "UPDATE AlertFiring SET resolved_at = NOW(3) WHERE rule_id = %s AND resolved_at IS NULL",
                (rule["id"],)
            )
            continue

        # Still breaching: only open a new firing if none is open yet.
        already_open = db.fetchone(
            "SELECT id FROM AlertFiring WHERE rule_id = %s AND resolved_at IS NULL",
            (rule["id"],)
        )
        if already_open:
            continue

        new_id = db.execute(
            "INSERT INTO AlertFiring (rule_id, value) VALUES (%s, %s)",
            (rule["id"], observed)
        )
        newly_fired.append({"rule": rule["name"], "value": observed, "alert_id": new_id})

    return newly_fired


def _parse_filter_expr(expr: str) -> tuple[str, list]:
    """Minimal DSL: 'level=ERROR AND service=payments' -> SQL fragment + params."""
    clauses, params = [], []
    for token in re.split(r"bANDb", expr, flags=re.IGNORECASE):
        token = token.strip()
        m = re.match(r"(w+)s*=s*(.+)", token)
        if m:
            col, val = m.group(1).strip(), m.group(2).strip().strip("'"")
            clauses.append(f"{col} = %s")
            params.append(val)
    return " AND ".join(clauses) if clauses else "1=1", params

Retention Policy

The LogEntry table is range-partitioned by month. Dropping a month’s data is a cheap DDL operation: ALTER TABLE LogEntry DROP PARTITION p_2025_01 discards the partition’s storage and avoids expensive row-level deletes. LogIndex is not partitioned, so the posting lists for dropped partitions are purged by a background job that deletes in batches on ts < retention_cutoff.

See also: Databricks Interview Guide 2026: Spark Internals, Delta Lake, and Lakehouse Architecture

See also: Scale AI Interview Guide 2026: Data Infrastructure, RLHF Pipelines, and ML Engineering

See also: Anthropic Interview Guide 2026: Process, Questions, and AI Safety

See also: Netflix Interview Guide 2026: Streaming Architecture, Recommendation Systems, and Engineering Excellence

Scroll to Top