Columnar Storage: Analytical Query Optimization

Why Row-Oriented Storage Fails Modern Analytical Workloads

Row-oriented databases store complete records sequentially on disk, optimizing for transactional patterns where applications read and write entire rows. This design worked well for OLTP systems but creates severe penalties for analytical queries that typically aggregate or filter on specific columns across millions of rows.

Consider a customer transaction table with 50 columns storing behavioral data, demographics, and transaction details. An analytical query calculating monthly revenue by region needs only three columns: transaction_date, amount, and region. In row storage, the database must read all 50 columns for every row scanned, transferring roughly 16x more data from disk than necessary (50 / 3 ≈ 16.7, assuming columns of similar width). At petabyte scale, this translates to hours of unnecessary I/O and compute costs.

The problem intensifies with modern requirements. Real-time dashboards need sub-second query responses. Machine learning pipelines process feature columns across billions of training examples. Regulatory compliance requires efficient access to specific attributes without exposing entire records. Many cloud query services bill by the amount of data scanned, so inefficient scans translate directly into higher costs.

Traditional optimization techniques—indexes, materialized views, query caching—provide limited relief. Indexes help point lookups but don't reduce scan costs for aggregations. Materialized views require predicting query patterns and maintaining multiple copies of data. Caching works only for repeated queries, not ad-hoc exploration. These approaches add complexity without addressing the fundamental storage inefficiency.

Columnar Storage Architecture for Query Optimization

Columnar storage reorganizes data by column rather than row, storing all values for a single attribute contiguously. This seemingly simple change enables multiple optimization layers that dramatically improve analytical query performance.

The architecture separates each column into its own file or block structure. When a query requests specific columns, the system reads only those column files, eliminating unnecessary data transfer. For our 50-column transaction table, reading three columns means accessing only 6% of the data compared to row storage.
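The 6% figure can be checked with a few lines of arithmetic. The sketch below uses a hypothetical helper, `estimateScanBytes`, and assumes every column has the same average width, which real tables rarely do; treat it as a back-of-the-envelope estimate rather than a cost model.

```typescript
// Hypothetical helper: estimates bytes scanned for a projection query.
// Assumption: all columns share the same average value width.
function estimateScanBytes(
  rowCount: number,
  totalColumns: number,
  projectedColumns: number,
  avgBytesPerValue: number
): { rowStore: number; columnStore: number; fraction: number } {
  // A row store reads every column of every scanned row
  const rowStore = rowCount * totalColumns * avgBytesPerValue;
  // A column store reads only the projected columns
  const columnStore = rowCount * projectedColumns * avgBytesPerValue;
  return { rowStore, columnStore, fraction: columnStore / rowStore };
}

// 1 billion rows, 50 columns, 3 projected, 8 bytes per value (assumed)
const estimate = estimateScanBytes(1e9, 50, 3, 8);
console.log(estimate.fraction); // 0.06
```

With wide string columns left out of the projection, the real saving is often larger than this uniform-width estimate suggests.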

Beyond I/O reduction, columnar layout enables aggressive compression. Values within a column share the same data type and often exhibit patterns—repeated values, sorted sequences, limited cardinality. Modern compression algorithms exploit these patterns far more effectively than row-based compression.

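// Illustrative columnar read path built on Apache Arrow and Parquet.
// The decoding helpers are stubbed and the reader calls are simplified,
// so they may not match the exact @dsnp/parquetjs API.
import * as arrow from 'apache-arrow';
import { ParquetReader } from '@dsnp/parquetjs';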

interface TransactionRecord {
  transaction_id: string;
  transaction_date: Date;
  customer_id: string;
  amount: number;
  region: string;
  // ... 45 additional columns
}

class ColumnarQueryEngine {
  private schema: arrow.Schema;

  constructor() {
    // Define schema with optimized data types
    this.schema = new arrow.Schema([
      new arrow.Field('transaction_date', new arrow.DateDay()),
      // Arrow JS takes (scale, precision): 2 decimal places, 10 digits total
      new arrow.Field('amount', new arrow.Decimal(2, 10)),
      new arrow.Field('region', new arrow.Dictionary(new arrow.Utf8(), new arrow.Int8())),
    ]);
  }

  async executeColumnProjection(
    filePath: string,
    columns: string[],
    predicate?: (batch: arrow.RecordBatch) => arrow.Vector
  ): Promise<arrow.Table> {
    const reader = await ParquetReader.openFile(filePath);
    const metadata = await reader.getMetadata();

    // Identify row groups that satisfy predicates using statistics
    const relevantRowGroups = this.pruneRowGroups(
      metadata,
      predicate
    );

    // Read only requested columns from relevant row groups
    const batches: arrow.RecordBatch[] = [];

    for (const rowGroupIndex of relevantRowGroups) {
      const rowGroup = metadata.rowGroups[rowGroupIndex];
      const columnChunks = await this.readColumnChunks(
        reader,
        rowGroup,
        columns
      );

      // Apply vectorized operations on decompressed column data
      const batch = this.createRecordBatch(columnChunks, columns);

      if (predicate) {
        const filtered = this.applyVectorizedFilter(batch, predicate);
        batches.push(filtered);
      } else {
        batches.push(batch);
      }
    }

    await reader.close();
    return new arrow.Table(batches);
  }

  private pruneRowGroups(
    metadata: any,
    predicate?: (batch: arrow.RecordBatch) => arrow.Vector
  ): number[] {
    if (!predicate) {
      return metadata.rowGroups.map((_: any, idx: number) => idx);
    }

    // Use column statistics (min/max, null count) for partition pruning
    return metadata.rowGroups
      .map((rg: any, idx: number) => ({ rg, idx }))
      .filter(({ rg }) => {
        // Check if row group statistics satisfy predicate
        const stats = rg.columns[0].metaData.statistics;
        return this.statisticsSatisfyPredicate(stats, predicate);
      })
      .map(({ idx }) => idx);
  }

  private async readColumnChunks(
    reader: any,
    rowGroup: any,
    columns: string[]
  ): Promise<Map<string, arrow.Vector>> {
    const chunks = new Map<string, arrow.Vector>();

    // Parallel column reads with decompression
    await Promise.all(
      columns.map(async (colName) => {
        const columnChunk = rowGroup.columns.find(
          (c: any) => c.metaData.path_in_schema[0] === colName
        );

        if (columnChunk) {
          // Read compressed column data
          const compressedData = await reader.readColumnChunk(
            columnChunk.metaData
          );

          // Decompress using codec specified in metadata
          const decompressed = await this.decompress(
            compressedData,
            columnChunk.metaData.codec
          );

          // Convert to Arrow vector for vectorized operations
          const vector = this.decodeToVector(
            decompressed,
            columnChunk.metaData.type
          );

          chunks.set(colName, vector);
        }
      })
    );

    return chunks;
  }

  private applyVectorizedFilter(
    batch: arrow.RecordBatch,
    predicate: (batch: arrow.RecordBatch) => arrow.Vector
  ): arrow.RecordBatch {
    // Evaluate the predicate once per batch to produce a boolean mask
    const mask = predicate(batch);

    // Apply the same mask to every column
    const filteredColumns = batch.schema.fields.map((field, idx) => {
      const column = batch.getChildAt(idx);
      return this.filterVector(column!, mask);
    });

    return new arrow.RecordBatch(
      batch.schema,
      filteredColumns[0].length,
      filteredColumns
    );
  }

  private filterVector(
    vector: arrow.Vector,
    mask: arrow.Vector
  ): arrow.Vector {
    // Single linear pass: keep the values whose mask entry is truthy.
    // This avoids the quadratic cost of looking up each index in a list.
    const kept: any[] = [];
    for (let i = 0; i < mask.length; i++) {
      if (mask.get(i)) {
        kept.push(vector.get(i));
      }
    }
    // Rebuild a vector of the same Arrow type from the surviving values
    return arrow.vectorFromArray(kept, vector.type);
  }

  // Additional helper methods for compression, encoding, statistics
  private async decompress(data: Buffer, codec: string): Promise<Buffer> {
    // Implementation depends on codec (SNAPPY, ZSTD, LZ4)
    return data; // Simplified
  }

  private decodeToVector(data: Buffer, type: string): arrow.Vector {
    // Convert Parquet encoding to Arrow vector
    return arrow.vectorFromArray([]); // Simplified
  }

  private statisticsSatisfyPredicate(stats: any, predicate: any): boolean {
    // Evaluate predicate against min/max statistics
    return true; // Simplified
  }

  private createRecordBatch(
    chunks: Map<string, arrow.Vector>,
    columns: string[]
  ): arrow.RecordBatch {
    const fields = columns.map(col => 
      new arrow.Field(col, chunks.get(col)!.type)
    );
    const schema = new arrow.Schema(fields);
    const vectors = columns.map(col => chunks.get(col)!);

    return new arrow.RecordBatch(schema, vectors[0].length, vectors);
  }
}

// Usage example for analytical query
async function analyzeMonthlyRevenue() {
  const engine = new ColumnarQueryEngine();

  // Query reads only 3 columns instead of all 50
  const result = await engine.executeColumnProjection(
    '/data/transactions.parquet',
    ['transaction_date', 'amount', 'region'],
    (batch) => {
      // Vectorized predicate: transaction_date >= '2025-01-01'
      const dates = batch.getChild('transaction_date')!;
      const threshold = new Date('2025-01-01').getTime();

      return arrow.vectorFromArray(
        Array.from({ length: dates.length }, (_, i) => 
          dates.get(i).getTime() >= threshold
        )
      );
    }
  );

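  // Aggregate with one pass over the projected columns
  // (assumes amount decodes to a JS number; a Decimal column needs explicit conversion)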
  const revenueByRegion = new Map<string, number>();
  const regionCol = result.getChild('region')!;
  const amountCol = result.getChild('amount')!;

  for (let i = 0; i < result.numRows; i++) {
    const region = regionCol.get(i);
    const amount = amountCol.get(i);
    revenueByRegion.set(
      region,
      (revenueByRegion.get(region) || 0) + amount
    );
  }

  return revenueByRegion;
}

This implementation demonstrates several critical optimization layers. Column projection reads only the requested attributes from Parquet files. Row group pruning uses min/max statistics to skip irrelevant data blocks before they are read. Batch-at-a-time filtering evaluates each predicate over whole column vectors, the access pattern that native engines accelerate with SIMD instructions. The layers compound: each one shrinks the data the next has to touch.

Compression Strategies for Columnar Data

Columnar storage enables compression ratios impossible with row-oriented formats. Homogeneous data types within columns allow specialized encoding schemes that exploit value patterns.

Dictionary encoding works exceptionally well for low-cardinality columns like region, status, or category. Instead of storing repeated string values, the system maintains a dictionary mapping integers to unique strings and stores only integer references. A region column with 50 distinct values needs just 6 bits per row for the index (2^6 = 64 ≥ 50), so a billion rows shrink from roughly 10 GB of strings (assuming about 10 bytes each) to about 750 MB of indices, and further still once the indices are run-length encoded.
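A minimal sketch of dictionary encoding, independent of any particular file format. The names `dictionaryEncode` and `dictionaryDecode` are illustrative, and the one-byte index (at most 256 distinct values) is an assumption made to keep the example short; real formats size the index to the dictionary.

```typescript
interface DictionaryEncoded {
  dictionary: string[]; // unique values in first-seen order
  indices: Uint8Array;  // one small integer per row (assumes <= 256 distinct values)
}

function dictionaryEncode(values: string[]): DictionaryEncoded {
  const lookup = new Map<string, number>();
  const dictionary: string[] = [];
  const indices = new Uint8Array(values.length);

  values.forEach((value, row) => {
    let code = lookup.get(value);
    if (code === undefined) {
      if (dictionary.length >= 256) {
        throw new RangeError('more than 256 distinct values');
      }
      // First time this value appears: assign the next free code
      code = dictionary.length;
      dictionary.push(value);
      lookup.set(value, code);
    }
    indices[row] = code;
  });

  return { dictionary, indices };
}

function dictionaryDecode(encoded: DictionaryEncoded): string[] {
  return Array.from(encoded.indices, (code) => encoded.dictionary[code]);
}

const regions = ['us-east', 'eu-west', 'us-east', 'us-east'];
const encodedRegions = dictionaryEncode(regions);
console.log(encodedRegions.dictionary);          // [ 'us-east', 'eu-west' ]
console.log(Array.from(encodedRegions.indices)); // [ 0, 1, 0, 0 ]
```

Each string is stored once in the dictionary; every row after that costs one byte here, regardless of how long the original string was.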

Run-length encoding (RLE) compresses sorted or semi-sorted columns by storing value-count pairs. A date column sorted chronologically might contain millions of consecutive identical values, and each such run compresses to a single value-count pair.
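As a rough illustration, run-length encoding can be sketched in a few lines. `rleEncode` and `rleDecode` are hypothetical names, and the in-memory array of pairs stands in for whatever binary layout a real format would use.

```typescript
type Run<T> = { value: T; count: number };

function rleEncode<T>(values: T[]): Run<T>[] {
  const runs: Run<T>[] = [];
  for (const value of values) {
    const last = runs[runs.length - 1];
    if (last !== undefined && last.value === value) {
      // Same value as the previous row: extend the current run
      last.count++;
    } else {
      // Value changed: start a new run
      runs.push({ value, count: 1 });
    }
  }
  return runs;
}

function rleDecode<T>(runs: Run<T>[]): T[] {
  const values: T[] = [];
  for (const run of runs) {
    for (let i = 0; i < run.count; i++) {
      values.push(run.value);
    }
  }
  return values;
}

const days = ['2025-01-01', '2025-01-01', '2025-01-01', '2025-01-02', '2025-01-02'];
const dayRuns = rleEncode(days);
console.log(dayRuns.length); // 2
```

The benefit depends entirely on run length: on an unsorted column with no repeats, this produces one pair per row and makes the data larger.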

Delta encoding stores differences between consecutive values rather than absolute values. Monotonically increasing columns like auto-increment IDs or timestamps compress dramatically—storing deltas of 1 requires far fewer bits than storing full 64-bit integers.
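The idea can be sketched as follows, assuming integer values. `deltaEncode`, `deltaDecode`, and `bitsForDeltas` are illustrative names, and the bit-width estimate assumes non-negative deltas (a monotonically increasing column).

```typescript
interface DeltaEncoded {
  count: number;    // number of original values
  first: number;    // first value, stored in full
  deltas: number[]; // differences between consecutive values
}

function deltaEncode(values: number[]): DeltaEncoded {
  const deltas: number[] = [];
  for (let i = 1; i < values.length; i++) {
    deltas.push(values[i] - values[i - 1]);
  }
  return { count: values.length, first: values.length > 0 ? values[0] : 0, deltas };
}

function deltaDecode(encoded: DeltaEncoded): number[] {
  if (encoded.count === 0) return [];
  const values = [encoded.first];
  for (const delta of encoded.deltas) {
    // Each value is the previous value plus the stored difference
    values.push(values[values.length - 1] + delta);
  }
  return values;
}

// Bits needed to store the largest delta (assumes non-negative deltas)
function bitsForDeltas(deltas: number[]): number {
  const maxDelta = deltas.reduce((max, d) => Math.max(max, Math.abs(d)), 0);
  return Math.max(1, Math.ceil(Math.log2(maxDelta + 1)));
}

const ids = [1000, 1001, 1002, 1004];
const encodedIds = deltaEncode(ids);
console.log(encodedIds.deltas);                // [ 1, 1, 2 ]
console.log(bitsForDeltas(encodedIds.deltas)); // 2
```

Here three deltas fit in 2 bits each, compared with 64 bits per value for the raw integers.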

Bit-packing reduces storage for integer columns with limited ranges. If values never exceed 1000, they fit in 10 bits (2^10 = 1024), and storing them in 10 bits instead of 32 saves about 69% of the space (1 - 10/32).
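A small sketch of bit-packing, assuming non-negative integers and a bit width between 1 and 31. The function names and the least-significant-bit-first layout are choices made for this example, not the layout of any specific file format.

```typescript
// Bits needed to represent values in the range 0..maxValue
function bitsRequired(maxValue: number): number {
  return Math.max(1, Math.ceil(Math.log2(maxValue + 1)));
}

// Pack each value into bitWidth bits, least significant bit first
function bitPack(values: number[], bitWidth: number): Uint8Array {
  const packed = new Uint8Array(Math.ceil((values.length * bitWidth) / 8));
  let bitPos = 0;
  for (const value of values) {
    for (let b = 0; b < bitWidth; b++, bitPos++) {
      if ((value >>> b) & 1) {
        packed[bitPos >> 3] |= 1 << (bitPos & 7);
      }
    }
  }
  return packed;
}

function bitUnpack(packed: Uint8Array, bitWidth: number, count: number): number[] {
  const values: number[] = [];
  let bitPos = 0;
  for (let i = 0; i < count; i++) {
    let value = 0;
    for (let b = 0; b < bitWidth; b++, bitPos++) {
      if ((packed[bitPos >> 3] >> (bitPos & 7)) & 1) {
        value |= 1 << b;
      }
    }
    values.push(value);
  }
  return values;
}

const small = [0, 1000, 512, 7];
const width = bitsRequired(1000);          // 10
const packedSmall = bitPack(small, width);
console.log(packedSmall.length);           // 5 bytes, versus 16 bytes as 32-bit integers
```

Four values take 40 bits, so 5 bytes instead of 16, which matches the 1 - 10/32 saving described above.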

class ColumnCompressionStrategy {
  selectOptimalEncoding(
    column: arrow.Vector,
    statistics: ColumnStatistics
  ): EncodingScheme {
    const cardinality = statistics.distinctCount;
    const totalRows = column.length;
    const sortedness = this.calculateSortedness(column);

    // Dictionary encoding for low cardinality
    if (cardinality < totalRows * 0.1 && cardinality < 10000) {
      return {
        type: 'DICTIONARY',
        estimatedRatio: totalRows / cardinality,
        indexBits: Math.ceil(Math.log2(cardinality))
      };
    }

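    // RLE for sorted or semi-sorted data, but only when values actually repeat.
    // Without the run check, a strictly increasing column (sortedness 1.0,
    // one run per row) would pick RLE and never reach the delta branch.
    const runs = this.countRuns(column);
    if (sortedness > 0.8 && runs <= totalRows / 2) {
      return {
        type: 'RLE',
        estimatedRatio: totalRows / runs,
        runLengthBits: Math.max(1, Math.ceil(Math.log2(totalRows / runs)))
      };
    }

    // Delta encoding for monotonic sequences
    if (this.isMonotonic(column)) {
      const maxDelta = this.calculateMaxDelta(column);
      // log2(maxDelta + 1) so a max delta of 1 needs 1 bit;
      // clamp to 1 to avoid dividing by zero when all deltas are 0
      const deltaBits = Math.max(1, Math.ceil(Math.log2(maxDelta + 1)));
      return {
        type: 'DELTA',
        estimatedRatio: 64 / deltaBits,
        deltaBits
      };
    }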

    // Bit-packing for bounded integers
    if (column.type.typeId === arrow.Type.Int) {
      const range = statistics.max - statistics.min;
      // Clamp to 1 bit so a constant column (range 0) does not divide by zero
      const requiredBits = Math.max(1, Math.ceil(Math.log2(range + 1)));
      if (requiredBits < 32) {
        return {
          type: 'BIT_PACKED',
          estimatedRatio: 32 / requiredBits,
          bits: requiredBits
        };
      }
    }

    // Fallback to general compression
    return {
      type: 'ZSTD',
      estimatedRatio: 2.5, // Typical ratio
      level: 3
    };
  }

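  private calculateSortedness(column: arrow.Vector): number {
    // Zero or one value is trivially sorted; this also avoids dividing by zero
    if (column.length < 2) return 1;

    // Fraction of adjacent pairs that are in non-decreasing order
    let orderedPairs = 0;
    for (let i = 1; i < column.length; i++) {
      if (column.get(i) >= column.get(i - 1)) {
        orderedPairs++;
      }
    }
    return orderedPairs / (column.length - 1);
  }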

  private countRuns(column: arrow.Vector): number {
    let runs = 1;
    for (let i = 1; i < column.length; i++) {
      if (column.get(i) !== column.get(i - 1)) {
        runs++;
      }
    }
    return runs;
  }

  private isMonotonic(column: arrow.Vector): boolean {
    let increasing = true;
    let decreasing = true;

    for (let i = 1; i < Math.min(1000, column.length); i++) {
      const curr = column.get(i);
      const prev = column.get(i - 1);
      if (curr < prev) increasing = false;
      if (curr > prev) decreasing = false;
    }

    return increasing || decreasing;
  }

  private calculateMaxDelta(column: arrow.Vector): number {
    let maxDelta = 0;
    for (let i = 1; i < column.length; i++) {
      const delta = Math.abs(column.get(i) - column.get(i - 1));
      maxDelta = Math.max(maxDelta, delta);
    }
    return maxDelta;
  }
}

interface EncodingScheme {
  type: 'DICTIONARY' | 'RLE' | 'DELTA' | 'BIT_PACKED' | 'ZSTD';
  estimatedRatio: number;
  [key: string]: any;
}

interface ColumnStatistics {
  distinctCount: number;
  min: any;
  max: any;
  nullCount: number;
}

Modern columnar formats like Parquet and ORC apply multiple encoding layers. A column might use dictionary encoding, then RLE on the dictionary indices, then general compression on the result. This cascading approach can reach compression ratios of 10:1 or higher on real-world datasets, although the result depends heavily on cardinality and sort order.

Partition Pruning and Predicate Pushdown

Reading only necessary columns solves half the problem. Partition pruning eliminates entire data blocks before reading, while predicate pushdown applies filters during scan rather than after.

Columnar formats organize data into row groups—typically 100MB to 1GB blocks containing a subset of rows. Each row group stores metadata including min/max values, null counts, and distinct counts for every column. Query engines use these statistics to skip row groups that cannot contain matching rows.

A query filtering WHERE transaction_date >= '2025-01-01' checks each row group's date column statistics. If a row group's maximum date is '2024-12-31', the engine skips it entirely without reading any data. At petabyte scale, this eliminates terabytes of unnecessary I/O.

```typescript
class PartitionPruningOptimizer {
  async optimizeQueryPlan(
    query: AnalyticalQuery,
    fileMetadata: ParquetMetadata[]
  ): Promise {
    const prunedFiles: PrunedFile[] = [];

    for (const file of fileMetadata) {
      const relevantRowGroups = this.pruneRowGroups(
        file.rowGroups,
        query.predicates
      );

      if (relevantRowGroups.length > 0) {
        prunedFiles.push({
          path: file.path,
          rowGroups: relevantRowGroups,
          estimatedRows: relevantRowGroups.reduce(
            (sum, rg) => sum + rg.numRows,
            0
          )
        });
      }
    }

    // Sort files by estimated selectivity for optimal scheduling
    prunedFiles.sort((a, b) => a.estimatedRows - b.estimatedRows);

    return {
      files: prunedFiles,
      projectedColumns: query.columns,
      pushedPredicates: this.identifyPushablePredicates(query.predicates),
      estimatedScanBytes: this.estimateScanSize(prunedFiles, query.columns)
    };
  }

  private pruneRowGroups(
    rowGroups: RowGroupMetadata[],
    predicates: Predicate[]
  ): RowGroupMetadata[] {
    return rowGroups.filter(rg => {
      return predicates.every(pred =>
        this.rowGroupSatisfiesPredicate(rg, pred)
      );
    });
  }

  private rowGroupSatisfiesPredicate(
    rowGroup: RowGroupMetadata,
    predicate: Predicate
  ): boolean {
    const columnStats = rowGroup.columns.find(
      c => c.name === predicate.column
    )?.statistics;

    if (!columnStats) return true; // No statistics, must scan

    switch (predicate.operator) {
      case