Log Aggregation Pipeline Low-Level Design: Ingestion, Parsing, Indexing, and Alerting

A log aggregation pipeline collects structured logs from thousands of services, parses and enriches them, builds an inverted index for full-text search, enforces retention policies, and evaluates alert rules in near real-time. This article designs each stage from Kafka ingestion through index construction to threshold-based alerting with sliding window aggregation.

Ingestion via Kafka

Services emit structured JSON logs to a Kafka topic, logs.raw, keyed by service name. Kafka hashes the record key to choose a partition, so all logs from one service land on the same partition and keep their relative order, without the cost of global ordering. The parsing tier runs as a Kafka consumer group: each partition is assigned to exactly one consumer in the group at a time, so parsing proceeds in parallel without coordination locks.


# Log producer (service side)
import json, time
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=["kafka:9092"],
    value_serializer=lambda v: json.dumps(v).encode(),
    compression_type="lz4",
    acks="all",
    retries=5,
)

def emit_log(level: str, message: str, service: str, **extra):
    record = {
        "ts": int(time.time() * 1000),
        "level": level.upper(),
        "service": service,
        "message": message,
        **extra,
    }
    producer.send("logs.raw", value=record, key=service.encode())
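
To see why keying by service preserves per-service ordering, the sketch below mimics key-based partition assignment. It is an illustration only: Kafka's default partitioner uses a murmur2 hash of the key bytes, whereas this sketch substitutes zlib.crc32 from the standard library, and NUM_PARTITIONS, partition_for, and route are hypothetical names.

```python
import zlib
from collections import defaultdict

NUM_PARTITIONS = 12  # assumed partition count for logs.raw


def partition_for(key: bytes, num_partitions: int = NUM_PARTITIONS) -> int:
    """Map a record key to a partition with a stable hash (CRC32 stand-in)."""
    return zlib.crc32(key) % num_partitions


def route(records: list) -> dict:
    """Group records by partition, keeping arrival order within each partition."""
    partitions = defaultdict(list)
    for record in records:
        partitions[partition_for(record["service"].encode())].append(record)
    return dict(partitions)


records = [
    {"service": "payments", "message": "charge started"},
    {"service": "search", "message": "query received"},
    {"service": "payments", "message": "charge settled"},
]
routed = route(records)
payments_partition = partition_for(b"payments")
# Both payments records land on one partition, in the order they were sent.
payments_msgs = [
    r["message"] for r in routed[payments_partition] if r["service"] == "payments"
]
```

Because the mapping depends only on the key, a service with unusually high log volume concentrates its traffic on a single partition.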

SQL Schema


-- Partitioned by month for efficient retention drops
CREATE TABLE LogEntry (
    id            BIGINT UNSIGNED   NOT NULL AUTO_INCREMENT,
    ts            DATETIME(3)       NOT NULL,
    level         ENUM('DEBUG','INFO','WARN','ERROR','FATAL') NOT NULL,
    service       VARCHAR(128)      NOT NULL,
    host          VARCHAR(255)      NOT NULL,
    trace_id      CHAR(32)          NULL,
    message       TEXT              NOT NULL,
    fields        JSON              NULL,      -- enriched fields (geo, env, version)
    ingested_at   DATETIME(3)       NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
    PRIMARY KEY (id, ts),
    INDEX idx_service_ts  (service, ts DESC),
    INDEX idx_level_ts    (level, ts DESC),
    INDEX idx_trace       (trace_id)
) ENGINE=InnoDB
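  -- MySQL allows UNIX_TIMESTAMP() as a partitioning function only on TIMESTAMP
  -- columns; TO_DAYS() works with DATETIME and still supports partition pruning.
  PARTITION BY RANGE (TO_DAYS(ts)) (
    PARTITION p_2025_01 VALUES LESS THAN (TO_DAYS('2025-02-01')),
    PARTITION p_2025_02 VALUES LESS THAN (TO_DAYS('2025-03-01')),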
    PARTITION p_future  VALUES LESS THAN MAXVALUE
);

CREATE TABLE LogIndex (
    term          VARCHAR(255)      NOT NULL,
    log_id        BIGINT UNSIGNED   NOT NULL,
    ts            DATETIME(3)       NOT NULL,   -- denormalized for range pruning
    PRIMARY KEY (term, log_id),
    INDEX idx_log_terms (log_id)
) ENGINE=InnoDB;

CREATE TABLE AlertRule (
    id            INT UNSIGNED      NOT NULL AUTO_INCREMENT,
    name          VARCHAR(255)      NOT NULL,
    filter_expr   TEXT              NOT NULL,   -- DSL: 'level=ERROR AND service=payments'
    metric        ENUM('count','rate','p99') NOT NULL DEFAULT 'count',
    threshold     DECIMAL(12,4)     NOT NULL,
    window_secs   INT UNSIGNED      NOT NULL DEFAULT 300,
    severity      ENUM('P1','P2','P3','P4') NOT NULL DEFAULT 'P3',
    enabled       TINYINT(1)        NOT NULL DEFAULT 1,
    PRIMARY KEY (id)
) ENGINE=InnoDB;

CREATE TABLE AlertFiring (
    id            BIGINT UNSIGNED   NOT NULL AUTO_INCREMENT,
    rule_id       INT UNSIGNED      NOT NULL,
    fired_at      DATETIME(3)       NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
    resolved_at   DATETIME(3)       NULL,
    value         DECIMAL(12,4)     NOT NULL,   -- metric value that breached threshold
    PRIMARY KEY (id),
    INDEX idx_rule_open (rule_id, resolved_at)
) ENGINE=InnoDB;

Parsing and Enrichment

Each Kafka consumer worker reads a batch of raw log records, applies grok-style regex patterns to extract structured fields, and enriches with geo-IP data and service metadata before writing to LogEntry.


import re, json, maxminddb
from dataclasses import dataclass, field
from typing import Any

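# Named groups become keys in the enriched fields dict.
# Only client_ip is consumed downstream; the other group names are illustrative.
GROK_PATTERNS = {
    "nginx_access": re.compile(
        r'(?P<client_ip>\d+\.\d+\.\d+\.\d+) .+ "(?P<method>[A-Z]+) (?P<path>\S+)'
        r' HTTP/\S+" (?P<status>\d{3}) (?P<bytes>\d+)'
    ),
    "java_exception": re.compile(
        r"(?P<exception_class>[A-Za-z.]+Exception): (?P<exception_message>.+)"
    ),
}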

_geoip_reader = maxminddb.open_database("/var/lib/GeoLite2-City.mmdb")

@dataclass
class ParsedLog:
    ts: int
    level: str
    service: str
    host: str
    message: str
    trace_id: str | None = None
    fields: dict[str, Any] = field(default_factory=dict)


def parse_log_line(raw: dict) -> ParsedLog:
    """Parse and enrich a raw log dict from Kafka."""
    log = ParsedLog(
        ts=raw.get("ts", 0),
        level=raw.get("level", "INFO"),
        service=raw.get("service", "unknown"),
        host=raw.get("host", ""),
        message=raw.get("message", ""),
        trace_id=raw.get("trace_id"),
    )

    # field extraction via grok
    extra_fields: dict[str, Any] = {}
    for pattern_name, pattern in GROK_PATTERNS.items():
        m = pattern.search(log.message)
        if m:
            extra_fields["grok_pattern"] = pattern_name
            extra_fields.update(m.groupdict())
            break

    # geo-IP enrichment for client_ip if present
    client_ip = extra_fields.get("client_ip") or raw.get("client_ip")
    if client_ip:
        try:
            geo = _geoip_reader.get(client_ip)
            if geo:
                extra_fields["geo_country"] = geo.get("country", {}).get("iso_code")
                extra_fields["geo_city"] = (
                    geo.get("city", {}).get("names", {}).get("en")
                )
        except ValueError:
            pass  # malformed IP string: skip geo enrichment

    # service metadata from environment tag
    extra_fields["env"] = raw.get("env", "prod")
    log.fields = {**extra_fields, **{k: v for k, v in raw.items()
                                      if k not in ("ts","level","service","host","message","trace_id")}}
    return log


def index_log(log_id: int, ts: str, message: str, fields: dict, conn) -> None:
    """Build inverted index posting list entries for the log entry."""
    tokens = set(_tokenize(message))
    for fval in fields.values():
        if isinstance(fval, str):
            tokens.update(_tokenize(fval))
    if not tokens:
        return
    rows = [(tok, log_id, ts) for tok in tokens if len(tok) >= 3]
    cursor = conn.cursor()
    cursor.executemany(
        "INSERT IGNORE INTO LogIndex (term, log_id, ts) VALUES (%s, %s, %s)", rows
    )


def _tokenize(text: str) -> list[str]:
    # The hyphen goes last in the class so it is a literal, not a range operator.
    return re.findall(r"[a-z0-9_.-]{3,}", text.lower())
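
A quick way to sanity-check the extraction and tokenization steps is to run them on a sample line. The sketch below is self-contained; the pattern, the sample log line, and the group names method, path, status, and bytes are assumptions made for illustration.

```python
import re

# Hypothetical pattern in the spirit of the nginx_access entry.
NGINX_ACCESS = re.compile(
    r'(?P<client_ip>\d+\.\d+\.\d+\.\d+) .+ "(?P<method>[A-Z]+) (?P<path>\S+)'
    r' HTTP/\S+" (?P<status>\d{3}) (?P<bytes>\d+)'
)


def tokenize(text: str) -> list:
    """Lowercase the text and keep runs of 3+ letters, digits, '_', '.', or '-'."""
    return re.findall(r"[a-z0-9_.-]{3,}", text.lower())


line = '203.0.113.7 - - [01/Feb/2025:10:00:00 +0000] "GET /api/orders HTTP/1.1" 502 1234'
match = NGINX_ACCESS.search(line)
fields = match.groupdict() if match else {}
tokens = tokenize("OOM killer invoked on host-17.example")
```

Every captured value is a string (status is "502", not 502), so numeric comparisons on extracted fields need an explicit cast. The two-letter word "on" is dropped by the length floor.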

Full-Text Search with Inverted Index

The LogIndex table stores posting lists: each (term, log_id) row records that a log entry contains a term, so a lookup on term returns every matching log ID. A multi-term query fetches the posting list for each term and intersects them on log_id. Time-range pruning uses the denormalized ts column: WHERE term = 'oom' AND ts BETWEEN '...' AND '...' seeks to the term through the primary key (term, log_id) and filters on ts within those rows, so out-of-range entries are discarded before the intersection and without a lookup in LogEntry.
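
The prune-then-intersect flow can be sketched in memory. This is a simplified stand-in for the LogIndex table, not the SQL implementation: InvertedIndex and its methods are hypothetical names, and timestamps are plain integers.

```python
from collections import defaultdict


class InvertedIndex:
    """In-memory stand-in for LogIndex: term -> {log_id: ts}."""

    def __init__(self):
        self._postings = defaultdict(dict)

    def add(self, log_id: int, ts: int, terms: set) -> None:
        for term in terms:
            self._postings[term][log_id] = ts

    def search(self, terms: list, ts_from: int, ts_to: int) -> list:
        """Return log_ids that contain every term with ts_from <= ts <= ts_to."""
        if not terms:
            return []
        # Prune each posting list by time range first, then intersect,
        # starting from the shortest list to keep the working set small.
        pruned = [
            {lid for lid, ts in self._postings.get(t, {}).items()
             if ts_from <= ts <= ts_to}
            for t in terms
        ]
        pruned.sort(key=len)
        result = pruned[0]
        for ids in pruned[1:]:
            result &= ids
            if not result:
                break  # no log can match once the intersection is empty
        return sorted(result)


index = InvertedIndex()
index.add(1, 1000, {"oom", "killer"})
index.add(2, 2000, {"oom", "payments"})
index.add(3, 3000, {"oom", "killer"})
hits = index.search(["oom", "killer"], ts_from=500, ts_to=2500)
```

Here hits contains only log 1: log 2 lacks "killer", and log 3 falls outside the time range.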

Alerting with Sliding Window Aggregation


import re
from datetime import datetime, timedelta
import db  # project-local DB helper exposing fetchall / fetchone / execute

def evaluate_alert_rules() -> list[dict]:
    """Run periodically (e.g., every 60 s). Returns list of newly fired alerts."""
    rules = db.fetchall("SELECT * FROM AlertRule WHERE enabled = 1")
    fired = []

    for rule in rules:
        window_start = datetime.utcnow() - timedelta(seconds=rule["window_secs"])
        where_clause, params = _parse_filter_expr(rule["filter_expr"])

        count = db.fetchone(
            f"SELECT COUNT(*) AS cnt FROM LogEntry WHERE ts >= %s AND {where_clause}",
            [window_start] + params
        )["cnt"]

        if count >= rule["threshold"]:
            # deduplicate: only fire if no open alert for this rule
            open_alert = db.fetchone(
                "SELECT id FROM AlertFiring WHERE rule_id = %s AND resolved_at IS NULL",
                (rule["id"],)
            )
            if not open_alert:
                alert_id = db.execute(
                    "INSERT INTO AlertFiring (rule_id, value) VALUES (%s, %s)",
                    (rule["id"], count)
                )
                fired.append({"rule": rule["name"], "value": count, "alert_id": alert_id})
        else:
            # resolve any open alert
            db.execute(
                "UPDATE AlertFiring SET resolved_at = NOW(3) WHERE rule_id = %s AND resolved_at IS NULL",
                (rule["id"],)
            )

    return fired


def _parse_filter_expr(expr: str) -> tuple[str, list]:
    """Minimal DSL: 'level=ERROR AND service=payments' -> SQL fragment + params."""
    clauses, params = [], []
    for token in re.split(r"\bAND\b", expr, flags=re.IGNORECASE):
        token = token.strip()
        m = re.match(r"(\w+)\s*=\s*(.+)", token)
        if m:
            # \w+ limits the column to identifier characters; values are bound as parameters
            col, val = m.group(1).strip(), m.group(2).strip().strip("'\"")
            clauses.append(f"{col} = %s")
            params.append(val)
    return " AND ".join(clauses) if clauses else "1=1", params
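
The evaluator above recomputes the count with a SQL query on every run. As an alternative sketch, not the article's method, the same fire, deduplicate, and resolve behavior can be modeled in memory with a deque of event timestamps. SlidingWindowAlert and its methods are hypothetical names, and timestamps are plain seconds.

```python
from __future__ import annotations

from collections import deque


class SlidingWindowAlert:
    """Counts matching events in the last window_secs and tracks open/resolved state."""

    def __init__(self, threshold: int, window_secs: int) -> None:
        self.threshold = threshold
        self.window_secs = window_secs
        self._events = deque()  # event timestamps, oldest first
        self.firing = False

    def record(self, ts: float) -> None:
        """Register one log line that matched the rule's filter."""
        self._events.append(ts)

    def evaluate(self, now: float) -> str | None:
        """Return 'fired' or 'resolved' on a state change, else None."""
        # Drop timestamps that have slid out of the window.
        while self._events and self._events[0] <= now - self.window_secs:
            self._events.popleft()
        breached = len(self._events) >= self.threshold
        if breached and not self.firing:
            self.firing = True
            return "fired"
        if not breached and self.firing:
            self.firing = False
            return "resolved"
        return None  # no state change: an open alert is not fired again


alert = SlidingWindowAlert(threshold=3, window_secs=300)
for ts in (0, 10, 20):
    alert.record(ts)
first = alert.evaluate(now=30)    # three events in window: fires
second = alert.evaluate(now=60)   # still breached: deduplicated
third = alert.evaluate(now=315)   # events at 0 and 10 have expired: resolves
```

The trade-off is that the window state lives in process memory, so a restart loses it, whereas the SQL evaluator can always recompute from LogEntry.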

Retention Policy

The LogEntry table is range-partitioned by month. Dropping a month’s data is a partition-level DDL operation: ALTER TABLE LogEntry DROP PARTITION p_2025_01 discards the partition’s storage in one step instead of deleting rows one at a time. LogIndex is not partitioned, so the posting-list rows that pointed into a dropped partition are purged by a background job that deletes in batches on ts < retention_cutoff.
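
A maintenance job needs to decide which partitions are old enough to drop. The sketch below assumes the p_YYYY_MM naming used in the schema and a caller-supplied cutoff date; expired_partitions and drop_statements are hypothetical helper names, and the partition list would in practice come from the database catalog.

```python
import re
from datetime import date

PARTITION_NAME = re.compile(r"^p_(\d{4})_(\d{2})$")


def expired_partitions(names: list, cutoff: date) -> list:
    """Return monthly partitions whose entire month ends on or before cutoff."""
    expired = []
    for name in names:
        m = PARTITION_NAME.match(name)
        if not m:
            continue  # skip catch-all partitions such as p_future
        year, month = int(m.group(1)), int(m.group(2))
        # Upper bound of the partition: first day of the following month.
        upper = date(year + 1, 1, 1) if month == 12 else date(year, month + 1, 1)
        if upper <= cutoff:
            expired.append(name)
    return expired


def drop_statements(names: list, cutoff: date) -> list:
    """Build one DROP PARTITION statement per expired partition."""
    # Names are validated by PARTITION_NAME, so interpolating them is safe here.
    return [
        f"ALTER TABLE LogEntry DROP PARTITION {name}"
        for name in expired_partitions(names, cutoff)
    ]


stmts = drop_statements(
    ["p_2025_01", "p_2025_02", "p_future"], cutoff=date(2025, 2, 15)
)
```

With a cutoff of 2025-02-15, only p_2025_01 qualifies: February still contains rows newer than the cutoff, and p_future does not match the monthly naming pattern.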

Frequently Asked Questions

Why use Kafka for log ingestion instead of writing directly to a database?

Kafka decouples producers from consumers and absorbs traffic spikes without back-pressuring services. It provides durable, replayable storage, so consumers can be restarted or scaled without losing logs. Keying records by service name keeps the logs for each service ordered on a single partition, enabling efficient parallel parsing. Writing directly to a database would create a write bottleneck and a hard operational dependency between every service and the storage tier.

How does an inverted index enable full-text search over logs?

An inverted index maps each token (term) to the list of log IDs containing it, called a posting list. For a multi-term query, the system fetches the posting list for each term and intersects them to find the log IDs that match all terms. A denormalized timestamp in each posting-list row allows time-range pruning before the intersection, which shrinks the working set. Lucene and Elasticsearch are built on the same basic structure.

How do sliding window alert rules work in a log aggregation system?

A sliding window alert rule defines a time window (e.g., 5 minutes), a filter expression (e.g., level=ERROR AND service=payments), and a threshold (e.g., 100 occurrences). An evaluator runs periodically, computes COUNT(*) from LogEntry for rows matching the filter within the window, and fires an alert if the count reaches the threshold. Open alerts are deduplicated so a single breach does not create multiple notifications, and the alert resolves automatically when the count drops below the threshold.

How do you implement log retention without expensive deletes?

Range-partition the LogEntry table by time period (e.g., monthly). When a partition ages out of the retention window, issue ALTER TABLE … DROP PARTITION, which discards the whole partition at once rather than deleting its rows individually. Purge the corresponding inverted index posting-list entries separately with batched deletes on the ts column to avoid long-running transactions.

