Columnar Storage: Analytical Query Optimization
Welcome to TopperBlog! 👋
Why Row-Oriented Storage Fails Modern Analytical Workloads
Row-oriented databases store complete records sequentially on disk, optimizing for transactional patterns where applications read and write entire rows. This design worked well for OLTP systems but creates severe penalties for analytical queries that typically aggregate or filter on specific columns across millions of rows.
Consider a customer transaction table with 50 columns storing behavioral data, demographics, and transaction details. An analytical query calculating monthly revenue by region needs only three columns: transaction_date, amount, and region. In row storage, the database must read all 50 columns for every row scanned. Assuming columns of similar width, that is roughly 16x (50 / 3) more data from disk than necessary. At petabyte scale, this translates to hours of unnecessary I/O and wasted compute.
The problem intensifies with modern requirements. Real-time dashboards need sub-second query responses. Machine learning pipelines read feature columns across billions of training examples. Regulatory compliance requires efficient access to specific attributes without exposing entire records. And many cloud query services bill by the bytes scanned or transferred, so inefficient scans translate directly into cost.
Traditional optimization techniques—indexes, materialized views, query caching—provide limited relief. Indexes help point lookups but don't reduce scan costs for aggregations. Materialized views require predicting query patterns and maintaining multiple copies of data. Caching works only for repeated queries, not ad-hoc exploration. These approaches add complexity without addressing the fundamental storage inefficiency.
Columnar Storage Architecture for Query Optimization
Columnar storage reorganizes data by column rather than row, storing all values for a single attribute contiguously. This seemingly simple change enables multiple optimization layers that dramatically improve analytical query performance.
The architecture stores each column in its own file or block structure. When a query requests specific columns, the system reads only those columns, eliminating unnecessary data transfer. For our 50-column transaction table, reading three columns means accessing about 6% (3 / 50) of the data that row storage would read, again assuming columns of similar width.
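The toy sketch below makes the 6% figure concrete. It stores each column as its own contiguous `Float64Array` and counts the bytes a projection touches. It assumes 50 fixed-width 8-byte columns, and its names (`makeColumnStore`, `project`, `col_0` through `col_49`) are hypothetical. Real formats add encoding, compression, and metadata on top of this layout.

```typescript
// Toy columnar table: one contiguous Float64Array per column (8 bytes/value).
type ColumnStore = Map<string, Float64Array>;

function makeColumnStore(columnNames: string[], numRows: number): ColumnStore {
  const store: ColumnStore = new Map();
  for (const name of columnNames) {
    store.set(name, new Float64Array(numRows));
  }
  return store;
}

// Column projection: fetch only the requested columns and report bytes read.
function project(
  store: ColumnStore,
  wanted: string[]
): { columns: Float64Array[]; bytesRead: number } {
  const columns = wanted.map((name) => {
    const column = store.get(name);
    if (column === undefined) {
      throw new Error(`unknown column: ${name}`);
    }
    return column;
  });
  const bytesRead = columns.reduce((sum, c) => sum + c.byteLength, 0);
  return { columns, bytesRead };
}

// A row layout keeps whole records together, so a scan reads every column.
function rowLayoutBytes(store: ColumnStore): number {
  let total = 0;
  store.forEach((column) => {
    total += column.byteLength;
  });
  return total;
}

const names = Array.from({ length: 50 }, (_, i) => `col_${i}`);
const store = makeColumnStore(names, 1000);
const { bytesRead } = project(store, ["col_0", "col_1", "col_2"]);
console.log(bytesRead / rowLayoutBytes(store)); // 0.06
```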
Beyond I/O reduction, columnar layout enables aggressive compression. Values within a column share the same data type and often exhibit patterns—repeated values, sorted sequences, limited cardinality. Modern compression algorithms exploit these patterns far more effectively than row-based compression.
// Modern columnar storage implementation using Apache Arrow format
import * as arrow from 'apache-arrow';
import { ParquetReader, ParquetWriter } from '@dsnp/parquetjs';
interface TransactionRecord {
transaction_id: string;
transaction_date: Date;
customer_id: string;
amount: number;
region: string;
// ... 45 additional columns
}
class ColumnarQueryEngine {
private schema: arrow.Schema;
constructor() {
// Define schema with optimized data types
this.schema = new arrow.Schema([
new arrow.Field('transaction_date', new arrow.DateDay()),
// Arrow JS takes (scale, precision[, bitWidth]), so DECIMAL(10, 2) is Decimal(2, 10)
new arrow.Field('amount', new arrow.Decimal(2, 10)),
new arrow.Field('region', new arrow.Dictionary(new arrow.Utf8(), new arrow.Int8())),
]);
}
async executeColumnProjection(
filePath: string,
columns: string[],
predicate?: (batch: arrow.RecordBatch) => arrow.Vector
): Promise<arrow.Table> {
const reader = await ParquetReader.openFile(filePath);
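// Simplified: this sketch assumes a metadata object exposing rowGroups with
// per-column metaData and statistics; check the actual metadata shape of
// your parquetjs version before relying on these field names.
const metadata = await reader.getMetadata();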
// Identify row groups that satisfy predicates using statistics
const relevantRowGroups = this.pruneRowGroups(
metadata,
predicate
);
// Read only requested columns from relevant row groups
const batches: arrow.RecordBatch[] = [];
for (const rowGroupIndex of relevantRowGroups) {
const rowGroup = metadata.rowGroups[rowGroupIndex];
const columnChunks = await this.readColumnChunks(
reader,
rowGroup,
columns
);
// Apply vectorized operations on decompressed column data
const batch = this.createRecordBatch(columnChunks, columns);
if (predicate) {
const filtered = this.applyVectorizedFilter(batch, predicate);
batches.push(filtered);
} else {
batches.push(batch);
}
}
await reader.close();
return new arrow.Table(batches);
}
private pruneRowGroups(
metadata: any,
predicate?: (batch: arrow.RecordBatch) => arrow.Vector
): number[] {
if (!predicate) {
return metadata.rowGroups.map((_: any, idx: number) => idx);
}
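// Use column statistics (min/max, null count) to skip row groups.
// Simplified: an opaque function predicate cannot be checked against
// min/max values, so statisticsSatisfyPredicate is a stub, and columns[0]
// stands in for "the column the predicate filters on". A real engine
// passes a declarative predicate (column, operator, value) instead.
return metadata.rowGroups
.map((rg: any, idx: number) => ({ rg, idx }))
.filter(({ rg }: { rg: any }) => {
const stats = rg.columns[0].metaData.statistics;
return this.statisticsSatisfyPredicate(stats, predicate);
})
.map(({ idx }: { idx: number }) => idx);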
}
private async readColumnChunks(
reader: any,
rowGroup: any,
columns: string[]
): Promise<Map<string, arrow.Vector>> {
const chunks = new Map<string, arrow.Vector>();
// Parallel column reads with decompression
await Promise.all(
columns.map(async (colName) => {
const columnChunk = rowGroup.columns.find(
(c: any) => c.metaData.path_in_schema[0] === colName
);
if (columnChunk) {
// Read compressed column data (readColumnChunk is an assumed helper here,
// not a documented ParquetReader method)
const compressedData = await reader.readColumnChunk(
columnChunk.metaData
);
// Decompress using codec specified in metadata
const decompressed = await this.decompress(
compressedData,
columnChunk.metaData.codec
);
// Convert to Arrow vector for vectorized operations
const vector = this.decodeToVector(
decompressed,
columnChunk.metaData.type
);
chunks.set(colName, vector);
}
})
);
return chunks;
}
private applyVectorizedFilter(
batch: arrow.RecordBatch,
predicate: (batch: arrow.RecordBatch) => arrow.Vector
): arrow.RecordBatch {
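// Evaluate the predicate once for the whole batch, yielding a boolean mask
// (native engines use SIMD for this step; plain JavaScript does not guarantee it)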
const mask = predicate(batch);
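// Apply the same mask to every column
const filteredColumns = batch.schema.fields.map((_field, idx) => {
const column = batch.getChildAt(idx);
return this.filterVector(column!, mask);
});
// Arrow JS (v9+) builds a RecordBatch from a schema plus struct Data;
// this assumes each filtered vector holds a single Data chunk.
const data = arrow.makeData({
type: new arrow.Struct(batch.schema.fields),
length: filteredColumns[0].length,
nullCount: 0,
children: filteredColumns.map((v) => v.data[0])
});
return new arrow.RecordBatch(batch.schema, data);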
}
private filterVector(
vector: arrow.Vector,
mask: arrow.Vector
): arrow.Vector {
// Gather the values whose mask bit is set in a single O(n) pass, then
// rebuild a vector of the same type from them
const kept: unknown[] = [];
for (let i = 0; i < mask.length; i++) {
if (mask.get(i)) {
kept.push(vector.get(i));
}
}
return arrow.vectorFromArray(kept as any[], vector.type);
}
// Additional helper methods for compression, encoding, statistics
private async decompress(data: Buffer, codec: string): Promise<Buffer> {
// Implementation depends on codec (SNAPPY, ZSTD, LZ4)
return data; // Simplified
}
private decodeToVector(data: Buffer, type: string): arrow.Vector {
// Convert Parquet encoding to Arrow vector
return arrow.vectorFromArray([]); // Simplified
}
private statisticsSatisfyPredicate(stats: any, predicate: any): boolean {
// Evaluate predicate against min/max statistics
return true; // Simplified
}
private createRecordBatch(
chunks: Map<string, arrow.Vector>,
columns: string[]
): arrow.RecordBatch {
const fields = columns.map(col =>
new arrow.Field(col, chunks.get(col)!.type)
);
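const schema = new arrow.Schema(fields);
const vectors = columns.map(col => chunks.get(col)!);
// Arrow JS (v9+) builds a RecordBatch from a schema plus struct Data;
// this assumes each column vector holds a single Data chunk.
const data = arrow.makeData({
type: new arrow.Struct(fields),
length: vectors[0].length,
nullCount: 0,
children: vectors.map((v) => v.data[0])
});
return new arrow.RecordBatch(schema, data);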
}
}
// Usage example for analytical query
async function analyzeMonthlyRevenue() {
const engine = new ColumnarQueryEngine();
// Query reads only 3 columns instead of all 50
const result = await engine.executeColumnProjection(
'/data/transactions.parquet',
['transaction_date', 'amount', 'region'],
(batch) => {
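// Predicate: transaction_date >= '2025-01-01', returned as a boolean mask
const dates = batch.getChild('transaction_date')!;
const threshold = new Date('2025-01-01').getTime();
return arrow.vectorFromArray(
Array.from({ length: dates.length }, (_, i) => {
// Depending on the apache-arrow version, date values may come back as
// Date objects or as epoch-millisecond numbers; handle both
const value = dates.get(i);
const ms = value instanceof Date ? value.getTime() : Number(value);
return ms >= threshold;
})
);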
}
);
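// Aggregate revenue per region with a simple loop over the projected columns
const revenueByRegion = new Map<string, number>();
const regionCol = result.getChild('region')!;
const amountCol = result.getChild('amount')!;
for (let i = 0; i < result.numRows; i++) {
const region = regionCol.get(i);
if (region == null) continue; // skip rows with no region
// Assumes amount decodes to a JS number (e.g. a float64 column);
// Arrow Decimal values may need explicit conversion
const amount = Number(amountCol.get(i));
revenueByRegion.set(
region,
(revenueByRegion.get(region) ?? 0) + amount
);
}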
return revenueByRegion;
}
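This implementation sketches several optimization layers. Column projection reads only the requested attributes from Parquet files. Partition pruning uses row group statistics to skip irrelevant data blocks before reading them; in the code above, statisticsSatisfyPredicate is a placeholder that keeps every row group. Vectorized filtering evaluates predicates one batch at a time over decompressed column vectors. Native engines accelerate this step with SIMD instructions, which plain JavaScript does not guarantee. Each layer compounds the gains of the previous one by shrinking the data it must process.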
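The mask-then-gather pattern behind vectorized filtering can be shown without Arrow, using plain typed arrays. This is a sketch that assumes dates are already decoded to numbers (days since 1970-01-01). `buildMask` and `applyMask` are hypothetical names. Both steps are tight loops over contiguous arrays, which is the shape native engines compile to SIMD; a JavaScript runtime may or may not do the same.

```typescript
// Step 1: evaluate the predicate over a whole column, producing a 0/1 mask.
function buildMask(values: Float64Array, threshold: number): Uint8Array {
  const mask = new Uint8Array(values.length);
  for (let i = 0; i < values.length; i++) {
    mask[i] = values[i] >= threshold ? 1 : 0;
  }
  return mask;
}

// Step 2: gather the selected positions from any column of the same batch.
function applyMask(column: Float64Array, mask: Uint8Array): Float64Array {
  let selected = 0;
  for (let i = 0; i < mask.length; i++) {
    selected += mask[i];
  }
  const out = new Float64Array(selected);
  let j = 0;
  for (let i = 0; i < mask.length; i++) {
    if (mask[i] === 1) {
      out[j++] = column[i];
    }
  }
  return out;
}

// Dates as days since 1970-01-01 (20089 is 2025-01-01).
const days = new Float64Array([20088, 20089, 20120]);
const amounts = new Float64Array([10, 25, 40]);
const mask = buildMask(days, 20089);
console.log(Array.from(applyMask(amounts, mask))); // [ 25, 40 ]
```

The mask is built from one column and reused for every other column in the batch, so the predicate is evaluated only once per row.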
Compression Strategies for Columnar Data
Columnar storage enables compression ratios that row-oriented formats rarely reach. Because each column holds a single data type, the engine can choose a specialized encoding scheme that exploits that column's value patterns.
Dictionary encoding works exceptionally well for low-cardinality columns like region, status, or category. Instead of storing repeated string values, the system maintains a dictionary mapping integers to unique strings and stores only the integer references. With 50 distinct regions, each reference needs just 6 bits (2^6 = 64 ≥ 50) instead of a multi-byte string, so a region column spanning billions of rows shrinks from many gigabytes of strings to a small fraction of that size.
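Here is a minimal in-memory sketch of dictionary encoding, assuming string values. The function names are hypothetical, and this is not Parquet's on-disk layout: Parquet keeps the dictionary in a separate page and packs the ids further. With 50 distinct regions, each id fits in 6 bits.

```typescript
interface DictionaryColumn {
  dictionary: string[]; // id -> distinct value, in first-seen order
  ids: Uint32Array; // one small integer per row
}

function dictionaryEncode(values: string[]): DictionaryColumn {
  const dictionary: string[] = [];
  const idOf = new Map<string, number>();
  const ids = new Uint32Array(values.length);
  values.forEach((value, row) => {
    let id = idOf.get(value);
    if (id === undefined) {
      id = dictionary.length;
      dictionary.push(value);
      idOf.set(value, id);
    }
    ids[row] = id;
  });
  return { dictionary, ids };
}

function dictionaryDecode(column: DictionaryColumn): string[] {
  return Array.from(column.ids, (id) => column.dictionary[id]);
}

// Bits needed to store ids 0..size-1 (at least 1), counted exactly in binary.
function idBitWidth(dictionarySize: number): number {
  return Math.max(1, dictionarySize - 1).toString(2).length;
}

const regions = ["EU", "US", "EU", "APAC", "US", "EU"];
const encoded = dictionaryEncode(regions);
console.log(encoded.dictionary); // [ 'EU', 'US', 'APAC' ]
console.log(Array.from(encoded.ids)); // [ 0, 1, 0, 2, 1, 0 ]
console.log(idBitWidth(50)); // 6
```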
Run-length encoding (RLE) compresses sorted or semi-sorted columns by storing (value, count) pairs. A date column in a table sorted by date can contain millions of consecutive identical values, and each such run compresses to a single entry.
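A minimal RLE sketch over an in-memory array, assuming a date column that is already sorted. `rleEncode` and `rleDecode` are hypothetical names. Parquet's actual RLE is a hybrid with bit-packing over integers, which this sketch does not reproduce.

```typescript
interface Run<T> {
  value: T;
  count: number;
}

// Collapse each stretch of equal adjacent values into one (value, count) pair.
// Uses === equality, so it suits primitives such as strings and numbers.
function rleEncode<T>(values: T[]): Run<T>[] {
  const runs: Run<T>[] = [];
  for (const value of values) {
    const last = runs.length > 0 ? runs[runs.length - 1] : undefined;
    if (last !== undefined && last.value === value) {
      last.count += 1;
    } else {
      runs.push({ value, count: 1 });
    }
  }
  return runs;
}

// Expand each run back into `count` copies of its value.
function rleDecode<T>(runs: Run<T>[]): T[] {
  const out: T[] = [];
  for (const run of runs) {
    for (let i = 0; i < run.count; i++) {
      out.push(run.value);
    }
  }
  return out;
}

const dates = ["2025-01-01", "2025-01-01", "2025-01-01", "2025-01-02", "2025-01-02"];
console.log(rleEncode(dates));
```

Five rows collapse to two runs here. The saving grows with run length, which is why RLE pays off on sorted columns.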
Delta encoding stores differences between consecutive values rather than absolute values. Monotonically increasing columns like auto-increment IDs or timestamps compress dramatically—storing deltas of 1 requires far fewer bits than storing full 64-bit integers.
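A minimal delta-encoding sketch on plain number arrays (the function names are hypothetical). It only computes the deltas and the bit width they need. A real format such as Parquet's DELTA_BINARY_PACKED then bit-packs the deltas in blocks. For consecutive IDs, every delta is 1 and fits in a single bit.

```typescript
// Delta encoding: keep the first value, then store each value minus its
// predecessor. Assumes integers within Number.MAX_SAFE_INTEGER.
function deltaEncode(values: number[]): number[] {
  return values.map((value, i) => (i === 0 ? value : value - values[i - 1]));
}

// Decoding is a running (prefix) sum over the deltas.
function deltaDecode(encoded: number[]): number[] {
  const out: number[] = [];
  for (let i = 0; i < encoded.length; i++) {
    out.push(i === 0 ? encoded[i] : out[i - 1] + encoded[i]);
  }
  return out;
}

// Bits per delta, assuming a non-decreasing input (all deltas >= 0).
function bitsPerDelta(encoded: number[]): number {
  let maxDelta = 0;
  for (let i = 1; i < encoded.length; i++) {
    maxDelta = Math.max(maxDelta, encoded[i]);
  }
  return maxDelta.toString(2).length;
}

// Millisecond timestamps one second apart, with one 3-second gap.
const timestamps = [1735689600000, 1735689601000, 1735689602000, 1735689605000];
const encoded = deltaEncode(timestamps);
console.log(encoded); // [ 1735689600000, 1000, 1000, 3000 ]
console.log(bitsPerDelta(encoded)); // 12
```

Each delta here needs 12 bits instead of the 64 bits of a full timestamp.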
Bit-packing reduces storage for integer columns with limited ranges. If non-negative values never exceed 1000, each fits in 10 bits (2^10 = 1024) instead of 32, saving about 69% of the space (1 - 10/32 = 0.6875).
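A minimal bit-packing sketch, assuming non-negative integers that fit in `bitWidth` bits (at most 32). The names are hypothetical, and Parquet's own bit-packing has ordering rules this sketch does not claim to match. Values up to 1000 fit in 10 bits, so four of them occupy two 32-bit words instead of four.

```typescript
// Pack non-negative integers into 32-bit words, `bitWidth` bits per value.
// Assumes 1 <= bitWidth <= 32 and every value < 2 ** bitWidth.
function bitPack(values: number[], bitWidth: number): Uint32Array {
  const words = new Uint32Array(Math.ceil((values.length * bitWidth) / 32));
  values.forEach((value, i) => {
    const bitPos = i * bitWidth;
    const word = Math.floor(bitPos / 32);
    const offset = bitPos % 32;
    // Low bits land in the current word; << drops anything past bit 31.
    words[word] |= value << offset;
    // Any remaining high bits spill into the next word.
    if (offset + bitWidth > 32) {
      words[word + 1] |= value >>> (32 - offset);
    }
  });
  return words;
}

function bitUnpack(words: Uint32Array, bitWidth: number, count: number): number[] {
  const mask = bitWidth === 32 ? 0xffffffff : (1 << bitWidth) - 1;
  const out: number[] = [];
  for (let i = 0; i < count; i++) {
    const bitPos = i * bitWidth;
    const word = Math.floor(bitPos / 32);
    const offset = bitPos % 32;
    let value = words[word] >>> offset;
    if (offset + bitWidth > 32) {
      value |= words[word + 1] << (32 - offset);
    }
    out.push((value & mask) >>> 0); // >>> 0 keeps the result unsigned
  }
  return out;
}

const values = [0, 1000, 513, 7]; // all <= 1000, so 10 bits each
const packed = bitPack(values, 10);
console.log(packed.length); // 2
console.log(bitUnpack(packed, 10, values.length)); // [ 0, 1000, 513, 7 ]
```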
import * as arrow from 'apache-arrow';
class ColumnCompressionStrategy {
selectOptimalEncoding(
column: arrow.Vector,
statistics: ColumnStatistics
): EncodingScheme {
const cardinality = statistics.distinctCount;
const totalRows = column.length;
const sortedness = this.calculateSortedness(column);
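// Dictionary encoding for low cardinality
if (cardinality < totalRows * 0.1 && cardinality < 10000) {
return {
type: 'DICTIONARY',
// Rough proxy: average repetitions per distinct value. The real ratio
// also depends on the width of the original values versus indexBits.
estimatedRatio: totalRows / cardinality,
indexBits: Math.max(1, Math.ceil(Math.log2(cardinality)))
};
}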
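// RLE for sorted or semi-sorted data with long runs. Requiring a minimum
// average run length (a heuristic threshold) stops strictly increasing
// columns such as IDs, which are fully sorted but have no runs, from being
// routed here instead of to delta encoding below.
const runs = this.countRuns(column);
const avgRunLength = totalRows / runs;
if (sortedness > 0.8 && avgRunLength >= 4) {
return {
type: 'RLE',
estimatedRatio: avgRunLength,
runLengthBits: Math.ceil(Math.log2(avgRunLength))
};
}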
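// Delta encoding for monotonic sequences
if (this.isMonotonic(column)) {
const maxDelta = this.calculateMaxDelta(column);
// maxDelta + 1 so a constant step of 1 needs 1 bit; log2(1) = 0 would
// make the ratio divide by zero. The ratio assumes 64-bit source values.
const deltaBits = Math.max(1, Math.ceil(Math.log2(maxDelta + 1)));
return {
type: 'DELTA',
estimatedRatio: 64 / deltaBits,
deltaBits
};
}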
// Bit-packing for bounded integers, stored as offsets from min
if (column.type.typeId === arrow.Type.Int) {
const sourceBits = (column.type as arrow.Int).bitWidth;
const range = statistics.max - statistics.min;
const requiredBits = Math.max(1, Math.ceil(Math.log2(range + 1)));
if (requiredBits < sourceBits) {
return {
type: 'BIT_PACKED',
estimatedRatio: sourceBits / requiredBits,
bits: requiredBits
};
}
}
// Fallback to general compression
return {
type: 'ZSTD',
estimatedRatio: 2.5, // Placeholder; actual ZSTD ratios depend heavily on the data
level: 3
};
}
private calculateSortedness(column: arrow.Vector): number {
if (column.length < 2) return 1; // trivially sorted; avoids dividing by zero
let orderedPairs = 0;
for (let i = 1; i < column.length; i++) {
if (column.get(i) >= column.get(i - 1)) {
orderedPairs++;
}
}
return orderedPairs / (column.length - 1);
}
private countRuns(column: arrow.Vector): number {
let runs = 1;
for (let i = 1; i < column.length; i++) {
if (column.get(i) !== column.get(i - 1)) {
runs++;
}
}
return runs;
}
private isMonotonic(column: arrow.Vector): boolean {
let increasing = true;
let decreasing = true;
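// Heuristic: only the first 1000 values are sampled, so a column that
// changes direction later is misclassified; a full check scans every value
for (let i = 1; i < Math.min(1000, column.length); i++) {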
const curr = column.get(i);
const prev = column.get(i - 1);
if (curr < prev) increasing = false;
if (curr > prev) decreasing = false;
}
return increasing || decreasing;
}
private calculateMaxDelta(column: arrow.Vector): number {
let maxDelta = 0;
for (let i = 1; i < column.length; i++) {
const delta = Math.abs(column.get(i) - column.get(i - 1));
maxDelta = Math.max(maxDelta, delta);
}
return maxDelta;
}
}
interface EncodingScheme {
type: 'DICTIONARY' | 'RLE' | 'DELTA' | 'BIT_PACKED' | 'ZSTD';
estimatedRatio: number;
[key: string]: any;
}
interface ColumnStatistics {
distinctCount: number;
min: any;
max: any;
nullCount: number;
}
Modern columnar formats like Parquet and ORC apply multiple encoding layers. A column might use dictionary encoding, then RLE on the dictionary indices, then a general-purpose codec on the result. On real-world datasets with repetitive, well-sorted columns, this cascading approach can reach compression ratios of 10:1 or higher, though results vary with the data.
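The first two layers of such a cascade can be sketched in a few lines, assuming an in-memory string column. The names are hypothetical. A real Parquet writer stores the ids with an RLE/bit-packing hybrid and then applies a page-level codec such as Snappy or ZSTD; this sketch leaves out that final stage.

```typescript
interface CascadeEncoded {
  dictionary: string[]; // stage 1: distinct values, in first-seen order
  runs: Array<[number, number]>; // stage 2: [id, runLength] pairs over the ids
}

// Dictionary-encode each value, then run-length encode the resulting ids.
function cascadeEncode(values: string[]): CascadeEncoded {
  const dictionary: string[] = [];
  const idOf = new Map<string, number>();
  const runs: Array<[number, number]> = [];
  for (const value of values) {
    let id = idOf.get(value);
    if (id === undefined) {
      id = dictionary.length;
      dictionary.push(value);
      idOf.set(value, id);
    }
    const last = runs.length > 0 ? runs[runs.length - 1] : undefined;
    if (last !== undefined && last[0] === id) {
      last[1] += 1;
    } else {
      runs.push([id, 1]);
    }
  }
  return { dictionary, runs };
}

function cascadeDecode(encoded: CascadeEncoded): string[] {
  const out: string[] = [];
  for (const [id, length] of encoded.runs) {
    for (let i = 0; i < length; i++) {
      out.push(encoded.dictionary[id]);
    }
  }
  return out;
}

// A region column from a table sorted by region: 6 rows become 3 runs.
const regions = ["APAC", "APAC", "APAC", "EU", "EU", "US"];
const encoded = cascadeEncode(regions);
console.log(encoded.dictionary); // [ 'APAC', 'EU', 'US' ]
console.log(JSON.stringify(encoded.runs)); // [[0,3],[1,2],[2,1]]
```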
Partition Pruning and Predicate Pushdown
Reading only necessary columns solves half the problem. Partition pruning eliminates entire data blocks before reading, while predicate pushdown applies filters during scan rather than after.
Columnar formats organize data into row groups (Parquet) or stripes (ORC), typically 100 MB to 1 GB blocks that each contain a subset of rows. Each row group can store statistics for every column, such as min/max values and null counts; Parquet also defines an optional distinct count. Query engines use these statistics to skip row groups that cannot contain matching rows.
A query filtering WHERE transaction_date >= '2025-01-01' checks each row group's date-column statistics. If a row group's maximum date is '2024-12-31', the engine skips it entirely without reading any of its data. When data is sorted or clustered by date, most row groups cover narrow date ranges, and at petabyte scale this can eliminate terabytes of unnecessary I/O.
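The skip decision itself is a conservative interval test. Below is a minimal sketch that assumes each row group's min/max for the filtered column is already available (the interface and function names are hypothetical). Min/max can only rule a group out, never confirm a match, so rows in surviving groups must still be filtered individually. Group 1 in the example straddles the cutoff date, so it survives.

```typescript
// Hypothetical per-row-group statistics for one column.
interface RowGroupStats {
  min: string; // ISO-8601 dates (YYYY-MM-DD) sort lexically in date order
  max: string;
  numRows: number;
}

type Operator = ">=" | "<=" | "=";

// Returns false only when no value in [min, max] can satisfy the predicate.
function mayContain(stats: RowGroupStats, op: Operator, value: string): boolean {
  if (op === ">=") return stats.max >= value;
  if (op === "<=") return stats.min <= value;
  return stats.min <= value && value <= stats.max;
}

// Indices of the row groups that must still be read.
function pruneRowGroups(groups: RowGroupStats[], op: Operator, value: string): number[] {
  const keep: number[] = [];
  groups.forEach((stats, index) => {
    if (mayContain(stats, op, value)) keep.push(index);
  });
  return keep;
}

const groups: RowGroupStats[] = [
  { min: "2024-11-01", max: "2024-12-31", numRows: 1000 },
  { min: "2024-12-15", max: "2025-01-20", numRows: 1000 },
  { min: "2025-01-21", max: "2025-02-28", numRows: 1000 },
];
console.log(pruneRowGroups(groups, ">=", "2025-01-01")); // [ 1, 2 ]
```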
```typescript
class PartitionPruningOptimizer {
  async optimizeQueryPlan(
    query: AnalyticalQuery,
    fileMetadata: ParquetMetadata[]
  ): Promise {
    const prunedFiles: PrunedFile[] = [];

    for (const file of fileMetadata) {
      const relevantRowGroups = this.pruneRowGroups(
        file.rowGroups,
        query.predicates
      );

      if (relevantRowGroups.length > 0) {
        prunedFiles.push({
          path: file.path,
          rowGroups: relevantRowGroups,
          estimatedRows: relevantRowGroups.reduce(
            (sum, rg) => sum + rg.numRows,
            0
          )
        });
      }
    }

    // Sort files by estimated selectivity for optimal scheduling
    prunedFiles.sort((a, b) => a.estimatedRows - b.estimatedRows);

    return {
      files: prunedFiles,
      projectedColumns: query.columns,
      pushedPredicates: this.identifyPushablePredicates(query.predicates),
      estimatedScanBytes: this.estimateScanSize(prunedFiles, query.columns)
    };
  }

  private pruneRowGroups(
    rowGroups: RowGroupMetadata[],
    predicates: Predicate[]
  ): RowGroupMetadata[] {
    return rowGroups.filter(rg => {
      return predicates.every(pred =>
        this.rowGroupSatisfiesPredicate(rg, pred)
      );
    });
  }

  private rowGroupSatisfiesPredicate(
    rowGroup: RowGroupMetadata,
    predicate: Predicate
  ): boolean {
    const columnStats = rowGroup.columns.find(
      c => c.name === predicate.column
    )?.statistics;

    if (!columnStats) return true; // No statistics, must scan

    switch (predicate.operator) {
      case