Columnar Storage Engine Low-Level Design: Column Grouping, Encoding, Predicate Pushdown, and Vectorized Execution

Columnar vs Row Storage: The Fundamental Tradeoff

In a row-oriented storage engine (PostgreSQL, MySQL, SQL Server), all column values for a given row are stored together on disk. To read column A across 10 million rows, the engine must read every page containing those rows; even though only column A is needed, most of the I/O is spent on unused columns.

In a columnar storage engine (Parquet, Apache ORC, Redshift, ClickHouse), all values for column A are stored together, all values for column B together, and so on. To read column A across 10 million rows, the engine reads only the pages containing column A — eliminating I/O for unused columns entirely. For OLAP queries that typically access 5–10 of 100+ columns, this can reduce I/O by 90%+.

Row Groups

Columnar files are divided into row groups: horizontal partitions each containing a bounded number of rows (typically on the order of 100K–1M rows; Parquet's reference implementation targets roughly 128 MB of data per row group rather than a fixed row count). Within each row group, data is stored in column chunks: all values for column A in the row group, then all values for column B, etc.

Row groups serve several purposes:

  • They make per-column statistics (min, max, null_count) meaningful for skipping: if the row group's min/max for column “age” is [25, 35] and the query filters age > 50, the entire row group can be skipped without reading any data.
  • They allow partial decompression: reading column A from row group 3 only decompresses that specific column chunk.
  • They set the granularity for parallel processing: different workers can process different row groups concurrently.
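The splitting and per-column statistics described above can be sketched in a few lines of Python (a minimal illustration; the tiny group size and the "age" column are arbitrary, and a real engine would persist these stats in file metadata):

```python
from typing import Any

GROUP_SIZE = 4  # tiny for illustration; real engines use ~100K+ rows per group

def build_row_groups(columns: dict[str, list[Any]]) -> list[dict]:
    """Split a columnar table into row groups and record min/max/null stats
    for each column chunk."""
    n_rows = len(next(iter(columns.values())))
    groups = []
    for start in range(0, n_rows, GROUP_SIZE):
        stats = {}
        for name, values in columns.items():
            chunk = values[start:start + GROUP_SIZE]
            non_null = [v for v in chunk if v is not None]
            stats[name] = {
                "min": min(non_null, default=None),
                "max": max(non_null, default=None),
                "null_count": len(chunk) - len(non_null),
            }
        groups.append({"start_row": start, "stats": stats})
    return groups

groups = build_row_groups({"age": [25, 30, 35, 28, 60, 55, 70, 65]})
# Group 0 covers ages 25-35, group 1 covers 55-70: a filter age > 50
# can skip group 0 using only its max statistic.
```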

Column Encoding Schemes

Because all values in a column chunk are the same data type and often have limited cardinality, columnar formats achieve much higher compression than row formats:

Dictionary encoding: maintain a per-chunk dictionary mapping each distinct string value to a small integer code. Store the column as an array of integer codes plus the dictionary. For a “country” column with 195 distinct values, each multi-byte string is replaced by a 1-byte code. Dictionary encoding is highly effective for string columns with repeated values; cardinality below ~10,000 distinct values is the typical sweet spot. Parquet applies dictionary encoding by default and falls back to plain encoding when the dictionary grows too large.

RLE (Run-Length Encoding): for sorted or clustered data, store (value, count) pairs instead of repeated values. A column with 1 million rows sorted by status where “active” appears 800,000 consecutive times is encoded as a single pair (active, 800000). RLE compresses aggressively when data is sorted on the encoded column (low-cardinality columns yield the longest runs), a key reason columnar tables are often sorted on frequently filtered columns.
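A minimal RLE sketch over a Python list (a real encoder operates on typed buffers and interleaves RLE with bit packing, as Parquet does):

```python
def rle_encode(values: list) -> list[tuple]:
    """Run-length encode: collapse consecutive repeats into (value, count) pairs."""
    runs = []
    for v in values:
        if runs and runs[-1][0] == v:
            runs[-1] = (v, runs[-1][1] + 1)  # extend the current run
        else:
            runs.append((v, 1))              # start a new run
    return runs

def rle_decode(runs: list[tuple]) -> list:
    """Expand (value, count) pairs back into the original sequence."""
    return [v for v, count in runs for _ in range(count)]

# Sorted data collapses dramatically; unsorted data barely compresses.
rle_encode(["active"] * 5 + ["closed"] * 3)  # [('active', 5), ('closed', 3)]
```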

Bit packing: if an integer column's maximum value fits in N bits (N less than the native 32- or 64-bit width), pack multiple values into a single word. A column whose values are all 0–255 needs only 8 bits per value instead of 32. Combined with SIMD, packed integers allow processing 8–16 values per CPU instruction.
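A bit-packing sketch in pure Python (the 4-bit width below is arbitrary; a real engine packs into machine words and unpacks with SIMD shuffles):

```python
def bit_pack(values: list[int], bits: int) -> bytes:
    """Pack unsigned integers, each assumed to fit in `bits` bits, into a
    contiguous little-endian byte buffer."""
    buf, acc, n = bytearray(), 0, 0
    for v in values:
        acc |= v << n           # append value at the current bit offset
        n += bits
        while n >= 8:           # flush completed bytes
            buf.append(acc & 0xFF)
            acc >>= 8
            n -= 8
    if n:
        buf.append(acc & 0xFF)  # flush any trailing partial byte
    return bytes(buf)

# Four values that each fit in 4 bits pack into 2 bytes
# instead of 16 bytes (4 x int32).
bit_pack([1, 2, 3, 4], bits=4)
```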

Delta encoding: store differences between consecutive values. Useful for monotonically increasing columns like timestamps: instead of storing [1700000000, 1700000001, 1700000002], store [1700000000, 1, 1] — deltas that are tiny and compress further with RLE or bit packing.
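A delta-encoding sketch; the round trip shows that only the first absolute value is retained, with every later value reconstructed from a running sum:

```python
def delta_encode(values: list[int]) -> list[int]:
    """Store the first value, then differences between consecutive values."""
    return values[:1] + [b - a for a, b in zip(values, values[1:])]

def delta_decode(deltas: list[int]) -> list[int]:
    """Rebuild the original values by accumulating the deltas."""
    out, running = [], 0
    for d in deltas:
        running += d
        out.append(running)
    return out

delta_encode([1700000000, 1700000001, 1700000002])  # [1700000000, 1, 1]
```

The tiny deltas are exactly what RLE or bit packing then compresses further.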

Parquet Format and Nested Column Encoding

Apache Parquet encodes nested schemas (arrays, maps, structs) using the Dremel encoding from Google's Dremel paper. Each column value is associated with two auxiliary integers:

  • Definition level: how deeply the value's path is defined in the schema (distinguishes null from missing)
  • Repetition level: how many levels in the path have been repeated (for arrays within arrays)

These levels are themselves RLE-encoded and stored alongside the column data. This allows Parquet to flatten fully nested JSON-like structures into columnar storage without losing structural information.
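A hand-worked illustration, using a hypothetical one-field schema, of how values in a repeated field flatten into (value, repetition level, definition level) triples:

```python
# Hypothetical schema: message Doc { repeated string tags }
# Max definition level = 1 (tags present), max repetition level = 1.
records = [
    {"tags": ["x", "y"]},  # two values in one record
    {},                    # tags missing entirely
    {"tags": ["z"]},
]

# Flattened "tags" column: (value, repetition_level, definition_level)
column = [
    ("x", 0, 1),   # r=0: starts a new record; d=1: value is present
    ("y", 1, 1),   # r=1: repeats at the tags level within the same record
    (None, 0, 0),  # d=0: path undefined; placeholder marks the empty record
    ("z", 0, 1),
]
```

Record boundaries are recoverable without any extra markers: every triple with repetition level 0 begins a new record.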

Predicate Pushdown and Zone Maps

Zone maps (also called min/max indexes or small materialized aggregates) store per-row-group statistics for each column: minimum value, maximum value, null count, and optionally distinct count. These statistics are stored in the file metadata and can be read without decompressing any column data.

During query execution, the engine evaluates filter predicates against zone maps before reading any row group. A row group is skipped entirely if the predicate cannot possibly match any row in it. For example, a query with WHERE event_date = '2024-01-15' on a dataset sorted by event_date can skip all row groups whose max event_date < 2024-01-15 or min event_date > 2024-01-15. In practice, predicate pushdown with zone maps eliminates 80–99% of I/O for selective queries on sorted columnar data.

Late Materialization

Late materialization defers assembling full rows until after filter predicates have been applied. The query engine:

  1. Reads only the filtered columns (e.g., “status”, “amount”) from the column chunks.
  2. Evaluates the WHERE predicates on those columns, producing a bitmask of matching row positions.
  3. Only then reads the remaining projected columns (e.g., “user_id”, “product_name”) for the rows that matched.

If 1% of rows match the filter, late materialization reduces I/O for projected columns by 99×. This is a significant optimization for selective OLAP queries on wide tables.
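The three steps above can be sketched over in-memory column chunks (the column names and predicate are illustrative; a real engine would decompress the projected chunks lazily and per row group):

```python
from typing import Any, Callable

def late_materialize(chunks: dict[str, list], filter_col: str,
                     predicate: Callable[[Any], bool],
                     project_cols: list[str]) -> list[dict]:
    """Evaluate the filter on one column first, then fetch projected
    columns only at the matching positions."""
    # Steps 1-2: read only the filter column and build matching positions
    matches = [i for i, v in enumerate(chunks[filter_col]) if predicate(v)]
    # Step 3: materialize the projected columns for matching rows only
    return [{c: chunks[c][i] for c in project_cols} for i in matches]

chunks = {
    "status":  ["active", "closed", "active"],
    "user_id": [1, 2, 3],
    "amount":  [10.0, 20.0, 30.0],
}
late_materialize(chunks, "status", lambda v: v == "active",
                 ["user_id", "amount"])
# [{'user_id': 1, 'amount': 10.0}, {'user_id': 3, 'amount': 30.0}]
```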

Vectorized Execution and SIMD

Traditional row-at-a-time execution engines call operator functions once per row — significant per-row overhead from function calls, branch mispredictions, and interpreter dispatch. Vectorized execution processes batches of 1024+ values per operator call, amortizing this overhead.

On columnar data, adjacent values for the same column are already stored contiguously in memory — exactly the layout SIMD (Single Instruction Multiple Data) instructions require. A SIMD comparison instruction processes 8 32-bit integers or 4 64-bit integers simultaneously. Combined with bit-packed columnar data and vectorized operators, modern columnar engines (DuckDB, Velox, ClickHouse) achieve throughput 10–100× higher than row-at-a-time engines for aggregation and filter-heavy OLAP queries.
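The gap can be felt even from Python by comparing a row-at-a-time loop against a NumPy batch comparison, which runs as compiled loops over contiguous memory that the CPU can vectorize:

```python
import numpy as np

# One column chunk: 100K contiguous 32-bit integers
ages = np.random.default_rng(0).integers(0, 100, size=100_000, dtype=np.int32)

# Row-at-a-time: one interpreted comparison and branch per value
matches_rows = [i for i, v in enumerate(ages) if v > 50]

# Vectorized: a single call compares the whole contiguous batch in
# compiled code, with no per-row interpreter dispatch
matches_vec = np.flatnonzero(ages > 50)

assert len(matches_vec) == len(matches_rows)
```

Timing the two paths (e.g., with `timeit`) typically shows the vectorized version winning by one to two orders of magnitude, mirroring the row-at-a-time vs vectorized gap described above.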

SQL Data Model

-- Row group metadata
CREATE TABLE ColumnGroup (
    id              BIGSERIAL PRIMARY KEY,
    table_name      VARCHAR(255) NOT NULL,
    columns         JSONB NOT NULL,         -- list of column names in this group
    row_count       BIGINT NOT NULL,
    row_group_size  INT NOT NULL DEFAULT 131072,  -- rows per group
    file_path       VARCHAR(1024) NOT NULL,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT now()
);

-- Per-column chunk metadata within a row group
CREATE TABLE ColumnChunk (
    id              BIGSERIAL PRIMARY KEY,
    group_id        BIGINT NOT NULL REFERENCES ColumnGroup(id),
    column_name     VARCHAR(255) NOT NULL,
    encoding        VARCHAR(64) NOT NULL,   -- DICTIONARY, RLE, PLAIN, DELTA, BIT_PACKED
    compressed_size BIGINT NOT NULL,
    uncompressed_size BIGINT NOT NULL,
    min_value       TEXT,
    max_value       TEXT,
    null_count      BIGINT NOT NULL DEFAULT 0,
    data            BYTEA,
    UNIQUE (group_id, column_name)
);

-- Row group statistics for zone map pruning
CREATE TABLE RowGroupStats (
    group_id    BIGINT PRIMARY KEY REFERENCES ColumnGroup(id),
    min_row_id  BIGINT NOT NULL,
    max_row_id  BIGINT NOT NULL,
    file_offset BIGINT NOT NULL
);

CREATE INDEX idx_columnchunk_group ON ColumnChunk(group_id);
CREATE INDEX idx_columnchunk_name ON ColumnChunk(column_name);

Python Implementation Sketch

import zlib
from typing import Any
import pandas as pd

ROW_GROUP_SIZE = 131_072  # 128K rows per row group

def write_column_group(table_name: str, df: pd.DataFrame,
                       file_path: str) -> list[dict]:
    """Write DataFrame to columnar format in row groups. Returns row group metadata."""
    metadata = []
    for rg_idx, start in enumerate(range(0, len(df), ROW_GROUP_SIZE)):
        chunk = df.iloc[start:start + ROW_GROUP_SIZE]
        rg_meta = {"row_group": rg_idx, "row_count": len(chunk), "columns": {}}
        for col in df.columns:
            values = chunk[col].tolist()
            encoded, encoding_name = encode_dictionary(values)
            compressed = zlib.compress(encoded, level=6)
            rg_meta["columns"][col] = {
                "encoding": encoding_name,
                "compressed_size": len(compressed),
                "uncompressed_size": len(encoded),
                "min_value": min((v for v in values if v is not None), default=None),
                "max_value": max((v for v in values if v is not None), default=None),
                "null_count": sum(1 for v in values if v is None),
            }
        metadata.append(rg_meta)
    return metadata

def encode_dictionary(values: list) -> tuple[bytes, str]:
    """Apply dictionary encoding to a list of values."""
    distinct = list(dict.fromkeys(v for v in values if v is not None))
    if len(distinct) > 255:
        # Too many distinct values for 1-byte codes: fall back to plain encoding
        return repr(values).encode(), "PLAIN"
    codes = {v: i for i, v in enumerate(distinct)}
    # Code 255 is reserved for NULL; a real format would serialize the
    # dictionary itself alongside the code array.
    encoded = bytes(codes[v] if v is not None else 255 for v in values)
    return encoded, "DICTIONARY"

def read_columns(row_group_stats: list[dict], columns: list[str],
                 predicates: dict[str, tuple[str, Any]]) -> pd.DataFrame:
    """Read specified columns, applying zone map pruning and predicate pushdown."""
    results = []
    for rg in row_group_stats:
        # Zone map pruning: skip row groups that cannot match predicates
        skip = False
        for col, (op, val) in predicates.items():
            col_stats = rg.get("columns", {}).get(col, {})
            if op == "eq":
                if (col_stats.get("min_value") is not None and
                        col_stats.get("max_value") is not None):
                    if val < col_stats["min_value"] or val > col_stats["max_value"]:
                        skip = True; break
        if skip:
            continue
        # Load only requested columns (late materialization)
        # In practice: read compressed column chunk bytes, decompress, decode
        results.append(rg)
    return pd.DataFrame(results)  # placeholder

def vectorized_filter(column_chunk: list, predicate: tuple) -> list[int]:
    """Apply a predicate to a column chunk, returning matching row indices."""
    op, val = predicate
    # In practice: use NumPy vectorized comparison or SIMD-backed operations
    if op == "eq":
        return [i for i, v in enumerate(column_chunk) if v == val]
    elif op == "gt":
        return [i for i, v in enumerate(column_chunk) if v is not None and v > val]
    elif op == "lt":
        return [i for i, v in enumerate(column_chunk) if v is not None and v < val]
    return list(range(len(column_chunk)))

Frequently Asked Questions

When should I use columnar storage instead of row storage?

Use columnar storage for OLAP workloads: analytical queries that scan large portions of the table, aggregate across many rows, and access only a few columns at a time. Examples include reporting, dashboarding, log analytics, and data warehousing. Row storage is better for OLTP workloads: transactional queries that read or write a small number of specific rows and touch all or most columns (INSERT, UPDATE, SELECT by primary key). The fundamental rule: if your queries scan columns, use columnar; if your queries scan rows, use row-oriented storage.

How much I/O does predicate pushdown save?

Predicate pushdown with zone maps can eliminate 80-99% of I/O for selective queries on sorted columnar data. If the queried column is the sort key of the columnar file (or one of the sort keys in a compound sort), most row groups have non-overlapping value ranges and can be skipped by a min/max comparison. For example, a query filtering on a date column in a table sorted by date needs to read only the row groups covering the queried date range, potentially 1-5 row groups out of thousands. On unsorted data the benefit is far smaller: when values are spread uniformly across row groups, most groups' min/max ranges overlap the predicate and little can be skipped, so pruning pays off mainly when data is sorted or naturally clustered on the filter column.

What is the typical compression ratio for dictionary encoding?

Dictionary encoding compression ratio depends on column cardinality and value size. For a string column with 100 distinct values (average 10 bytes each) in a 10-million-row table, the raw column is about 100 MB. With dictionary encoding, the dictionary is about 1 KB and each row stores a 1-byte code: roughly 10 MB total, a 10x compression ratio. Block compression (zstd, snappy) on the encoded data typically adds another 2-5x. Parquet files with dictionary-encoded string columns and zstd compression commonly achieve 10-50x compression over raw CSV data for typical business data.

What is late materialization and why does it matter?

Late materialization defers reading non-filter columns until after filter predicates have been applied. Instead of reading all projected columns for every row and then filtering, the engine reads filter columns first, evaluates predicates to get a set of matching row positions, and only then reads the remaining projected columns for those rows. If 1% of rows match the filter, I/O for the non-filter projected columns drops by 99x. This optimization is especially natural in columnar storage because each column is read independently: the engine reads only the column chunks it needs, for only the row groups that pass zone map pruning.



