Skip to main content

Command Palette

Search for a command to run...

How to Implement Event Sourcing with PostgreSQL and TypeScript

Building audit trails and temporal queries in production systems

Published
9 min read
T

Welcome to TopperBlog! 👋

I'm a tech content creator passionate about helping developers level up their careers and master cutting-edge technologies.

🎯 What I Write About: • AI/ML Engineering & LLMs • Web3 & Blockchain Development
• System Design & Architecture • Interview Preparation (FAANG) • Freelancing & Remote Work • Modern Tech Stacks (Next.js, React, Rust, TypeScript) • Performance Optimization & Best Practices

💼 Mission: Sharing practical, actionable insights that accelerate your tech career and maximize your earning potential.

📚 15+ In-Depth Guides covering everything from earning $10k/month as a freelancer to cracking FAANG interviews.

🌐 Let's connect and grow together in this amazing tech journey!

#TechBlogger #SoftwareEngineering #CareerGrowth #WebDevelopment #AIEngineering

Content Role: pillar


How to Implement Event Sourcing with PostgreSQL and TypeScript

Modern applications face increasing demands for complete audit trails, temporal queries, and the ability to reconstruct system state at any point in time. Traditional CRUD-based architectures delete or overwrite data, making it impossible to answer critical questions: Who changed this record? What was the state last Tuesday? How did we arrive at this corrupted state? These limitations create compliance risks under GDPR and SOC 2, complicate debugging in distributed systems, and prevent sophisticated analytics on historical data.

Event sourcing implementation addresses these challenges by persisting every state change as an immutable event rather than updating records in place. Instead of storing current state, you store the sequence of events that led to that state. This architectural pattern has moved from niche applications to mainstream adoption in 2025, driven by regulatory requirements, the rise of AI systems that need training data with full context, and the complexity of distributed microservices where understanding causality matters.

The problem is that most event sourcing tutorials demonstrate toy examples with in-memory stores or rely on specialized event store databases that add operational complexity. Production systems need event sourcing that integrates with existing PostgreSQL infrastructure, handles millions of events efficiently, supports concurrent writes without conflicts, and provides practical patterns for projections and queries.

Why Traditional Approaches Fail at Scale

Standard relational database patterns update rows in place. When you modify a user's email address, the old value disappears. This approach worked when applications were simpler, but modern requirements expose critical weaknesses.

First, compliance frameworks now mandate complete audit trails. Healthcare systems under HIPAA, financial platforms under PSD2, and any system handling EU citizen data under GDPR must prove who changed what and when. Bolt-on audit logging tables create performance bottlenecks and don't capture the full context of changes.

Second, distributed systems make debugging nearly impossible without event history. When a microservice produces incorrect output, you need to trace the exact sequence of events across service boundaries. Traditional logs expire, and database snapshots don't show causality.

Third, AI and machine learning systems in 2025 require rich historical data with temporal context. Training models on current state alone produces inferior results compared to learning from event sequences that capture user behavior patterns, system evolution, and causal relationships.

Finally, traditional update-in-place patterns create race conditions in concurrent systems. Two services updating the same record simultaneously can lose data or create inconsistent state. Event sourcing's append-only model eliminates these conflicts naturally.

Core Architecture for Event Sourcing with PostgreSQL

Event sourcing implementation centers on three components: the event store, aggregate roots, and projections. The event store persists immutable events. Aggregate roots enforce business rules and generate events. Projections build queryable read models from event streams.

PostgreSQL serves as an excellent event store foundation. Its ACID guarantees ensure event ordering, JSONB columns efficiently store event payloads, and partitioning handles billions of events. Unlike specialized event stores, PostgreSQL integrates with existing infrastructure, reducing operational overhead.

Here's a production-grade event store schema:

CREATE TABLE events (
    event_id BIGSERIAL PRIMARY KEY,
    aggregate_id UUID NOT NULL,
    aggregate_type VARCHAR(100) NOT NULL,
    event_type VARCHAR(100) NOT NULL,
    event_version INT NOT NULL,
    event_data JSONB NOT NULL,
    metadata JSONB NOT NULL DEFAULT '{}',
    occurred_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
) PARTITION BY RANGE (occurred_at);

CREATE INDEX idx_events_aggregate ON events(aggregate_id, event_version);
CREATE INDEX idx_events_type ON events(aggregate_type, occurred_at);
CREATE INDEX idx_events_occurred ON events(occurred_at);

-- Create partitions for time-based queries
CREATE TABLE events_2025_q1 PARTITION OF events
    FOR VALUES FROM ('2025-01-01') TO ('2025-04-01');

The schema uses time-based partitioning for query performance and data lifecycle management. The event_version provides optimistic concurrency control. The metadata field captures correlation IDs, user context, and causation chains for distributed tracing.

Implementing Aggregate Roots in TypeScript

Aggregate roots encapsulate business logic and maintain consistency boundaries. They load events, apply them to rebuild state, and generate new events when commands execute.

interface DomainEvent {
  eventType: string;
  eventVersion: number;
  eventData: Record<string, any>;
  metadata: {
    correlationId: string;
    causationId?: string;
    userId?: string;
  };
}

abstract class AggregateRoot {
  protected aggregateId: string;
  protected version: number = 0;
  private uncommittedEvents: DomainEvent[] = [];

  constructor(aggregateId: string) {
    this.aggregateId = aggregateId;
  }

  protected addEvent(event: DomainEvent): void {
    this.version++;
    event.eventVersion = this.version;
    this.uncommittedEvents.push(event);
    this.applyEvent(event);
  }

  protected abstract applyEvent(event: DomainEvent): void;

  public getUncommittedEvents(): DomainEvent[] {
    return [...this.uncommittedEvents];
  }

  public markEventsAsCommitted(): void {
    this.uncommittedEvents = [];
  }

  public loadFromHistory(events: DomainEvent[]): void {
    events.forEach(event => {
      this.applyEvent(event);
      this.version = event.eventVersion;
    });
  }
}

Here's a concrete implementation for an order aggregate:

interface OrderCreatedEvent extends DomainEvent {
  eventType: 'OrderCreated';
  eventData: {
    customerId: string;
    items: Array<{ productId: string; quantity: number; price: number }>;
  };
}

interface OrderItemAddedEvent extends DomainEvent {
  eventType: 'OrderItemAdded';
  eventData: {
    productId: string;
    quantity: number;
    price: number;
  };
}

interface OrderShippedEvent extends DomainEvent {
  eventType: 'OrderShipped';
  eventData: {
    trackingNumber: string;
    carrier: string;
  };
}

type OrderEvent = OrderCreatedEvent | OrderItemAddedEvent | OrderShippedEvent;

class Order extends AggregateRoot {
  private customerId?: string;
  private items: Map<string, { quantity: number; price: number }> = new Map();
  private status: 'draft' | 'confirmed' | 'shipped' = 'draft';
  private trackingNumber?: string;

  static create(
    orderId: string,
    customerId: string,
    items: Array<{ productId: string; quantity: number; price: number }>,
    metadata: DomainEvent['metadata']
  ): Order {
    const order = new Order(orderId);
    order.addEvent({
      eventType: 'OrderCreated',
      eventVersion: 0,
      eventData: { customerId, items },
      metadata
    });
    return order;
  }

  addItem(
    productId: string,
    quantity: number,
    price: number,
    metadata: DomainEvent['metadata']
  ): void {
    if (this.status !== 'draft') {
      throw new Error('Cannot add items to non-draft order');
    }

    this.addEvent({
      eventType: 'OrderItemAdded',
      eventVersion: 0,
      eventData: { productId, quantity, price },
      metadata
    });
  }

  ship(trackingNumber: string, carrier: string, metadata: DomainEvent['metadata']): void {
    if (this.status === 'shipped') {
      throw new Error('Order already shipped');
    }

    if (this.items.size === 0) {
      throw new Error('Cannot ship empty order');
    }

    this.addEvent({
      eventType: 'OrderShipped',
      eventVersion: 0,
      eventData: { trackingNumber, carrier },
      metadata
    });
  }

  protected applyEvent(event: DomainEvent): void {
    switch (event.eventType) {
      case 'OrderCreated':
        this.customerId = event.eventData.customerId;
        event.eventData.items.forEach((item: any) => {
          this.items.set(item.productId, {
            quantity: item.quantity,
            price: item.price
          });
        });
        this.status = 'draft';
        break;

      case 'OrderItemAdded':
        const existing = this.items.get(event.eventData.productId);
        this.items.set(event.eventData.productId, {
          quantity: (existing?.quantity || 0) + event.eventData.quantity,
          price: event.eventData.price
        });
        break;

      case 'OrderShipped':
        this.status = 'shipped';
        this.trackingNumber = event.eventData.trackingNumber;
        break;
    }
  }

  getTotal(): number {
    return Array.from(this.items.values())
      .reduce((sum, item) => sum + (item.quantity * item.price), 0);
  }
}

Building the Event Store Repository

The repository handles persistence, optimistic concurrency, and event retrieval:

import { Pool } from 'pg';

class EventStore {
  constructor(private pool: Pool) {}

  async save(
    aggregateId: string,
    aggregateType: string,
    events: DomainEvent[],
    expectedVersion: number
  ): Promise<void> {
    const client = await this.pool.connect();

    try {
      await client.query('BEGIN');

      // Check for concurrency conflicts
      const result = await client.query(
        `SELECT MAX(event_version) as current_version 
         FROM events 
         WHERE aggregate_id = $1`,
        [aggregateId]
      );

      const currentVersion = result.rows[0]?.current_version || 0;

      if (currentVersion !== expectedVersion) {
        throw new Error(
          `Concurrency conflict: expected version ${expectedVersion}, found ${currentVersion}`
        );
      }

      // Insert events
      for (const event of events) {
        await client.query(
          `INSERT INTO events (
            aggregate_id, aggregate_type, event_type, 
            event_version, event_data, metadata, occurred_at
          ) VALUES ($1, $2, $3, $4, $5, $6, NOW())`,
          [
            aggregateId,
            aggregateType,
            event.eventType,
            event.eventVersion,
            JSON.stringify(event.eventData),
            JSON.stringify(event.metadata)
          ]
        );
      }

      await client.query('COMMIT');
    } catch (error) {
      await client.query('ROLLBACK');
      throw error;
    } finally {
      client.release();
    }
  }

  async getEvents(aggregateId: string): Promise<DomainEvent[]> {
    const result = await this.pool.query(
      `SELECT event_type, event_version, event_data, metadata
       FROM events
       WHERE aggregate_id = $1
       ORDER BY event_version ASC`,
      [aggregateId]
    );

    return result.rows.map(row => ({
      eventType: row.event_type,
      eventVersion: row.event_version,
      eventData: row.event_data,
      metadata: row.metadata
    }));
  }

  async getEventsByType(
    aggregateType: string,
    fromTimestamp: Date,
    limit: number = 1000
  ): Promise<Array<DomainEvent & { aggregateId: string }>> {
    const result = await this.pool.query(
      `SELECT aggregate_id, event_type, event_version, event_data, metadata
       FROM events
       WHERE aggregate_type = $1 AND occurred_at >= $2
       ORDER BY occurred_at ASC
       LIMIT $3`,
      [aggregateType, fromTimestamp, limit]
    );

    return result.rows.map(row => ({
      aggregateId: row.aggregate_id,
      eventType: row.event_type,
      eventVersion: row.event_version,
      eventData: row.event_data,
      metadata: row.metadata
    }));
  }
}

Implementing Projections for Query Models

Projections transform event streams into queryable read models. This implements the CQRS pattern, separating write models (aggregates) from read models (projections).

interface OrderProjection {
  orderId: string;
  customerId: string;
  status: string;
  totalAmount: number;
  itemCount: number;
  trackingNumber?: string;
  createdAt: Date;
  shippedAt?: Date;
}

class OrderProjectionBuilder {
  constructor(private pool: Pool) {}

  async initialize(): Promise<void> {
    await this.pool.query(`
      CREATE TABLE IF NOT EXISTS order_projections (
        order_id UUID PRIMARY KEY,
        customer_id UUID NOT NULL,
        status VARCHAR(50) NOT NULL,
        total_amount DECIMAL(10, 2) NOT NULL,
        item_count INT NOT NULL,
        tracking_number VARCHAR(100),
        created_at TIMESTAMPTZ NOT NULL,
        shipped_at TIMESTAMPTZ,
        last_event_version INT NOT NULL
      );

      CREATE INDEX IF NOT EXISTS idx_order_customer 
        ON order_projections(customer_id);
      CREATE INDEX IF NOT EXISTS idx_order_status 
        ON order_projections(status);
    `);
  }

  async handleEvent(aggregateId: string, event: DomainEvent): Promise<void> {
    switch (event.eventType) {
      case 'OrderCreated':
        await this.handleOrderCreated(aggregateId, event as OrderCreatedEvent);
        break;
      case 'OrderItemAdded':
        await this.handleOrderItemAdded(aggregateId, event as OrderItemAddedEvent);
        break;
      case 'OrderShipped':
        await this.handleOrderShipped(aggregateId, event as OrderShippedEvent);
        break;
    }
  }

  private async handleOrderCreated(
    orderId: string,
    event: OrderCreatedEvent
  ): Promise<void> {
    const totalAmount = event.eventData.items.reduce(
      (sum, item) => sum + (item.quantity * item.price),
      0
    );
    const itemCount = event.eventData.items.reduce(
      (sum, item) => sum + item.quantity,
      0
    );

    await this.pool.query(
      `INSERT INTO order_projections (
        order_id, customer_id, status, total_amount, 
        item_count, created_at, last_event_version
      ) VALUES ($1, $2, $3, $4, $5, NOW(), $6)`,
      [orderId, event.eventData.customerId, 'draft', totalAmount, itemCount, event.eventVersion]
    );
  }

  private async handleOrderItemAdded(
    orderId: string,
    event: OrderItemAddedEvent
  ): Promise<void> {
    await this.pool.query(
      `UPDATE order_projections
       SET total_amount = total_amount + ($1 * $2),
           item_count = item_count + $1,
           last_event_version = $3
       WHERE order_id = $4`,
      [event.eventData.quantity, event.eventData.price, event.eventVersion, orderId]
    );
  }

  private async handleOrderShipped(
    orderId: string,
    event: OrderShippedEvent
  ): Promise<void> {
    await this.pool.query(
      `UPDATE order_projections
       SET status = 'shipped',
           tracking_number = $1,
           shipped_at = NOW(),
           last_event_version = $2
       WHERE order_id = $3`,
      [event.eventData.trackingNumber, event.eventVersion, orderId]
    );
  }

  async rebuildProjection(orderId: string, eventStore: EventStore): Promise<void> {
    const events = await eventStore.getEvents(orderId);

    await this.pool.query('DELETE FROM order_projections WHERE order_id = $1', [orderId]);

    for (const event of events) {
      await this.handleEvent(orderId, event);
    }
  }
}

Handling Temporal Queries and Point-in-Time Reconstruction

One of event sourcing's most powerful capabilities is reconstructing state at any point in time. This enables temporal queries like "What was this order's status on March 15th?" or "Show me all orders that were pending last quarter."

class TemporalQueryService {
  constructor(
    private pool: Pool,
    private eventStore: EventStore
  ) {}

  async getAggregateStateAt(
    aggregateId: string,
    timestamp: Date
  ): Promise<Order> {
    const result = await this.pool.query(
      `SELECT event_type, event_version, event_data, metadata
       FROM events
       WHERE aggregate_id = $1 AND occurred_at <= $2
       ORDER BY event_version ASC`,
      [aggregateId, timestamp]
    );

    const events: DomainEvent[] = result.rows.map(row => ({
      eventType: row.event_type,
      eventVersion: row.event_version,
      eventData: row.event_data,
      metadata: row.metadata
    }));

    const order = new Order(aggregateId);
    order.loadFromHistory(events);
    return order;
  }

  async getOrdersByStatusAt(
    status: string,
    timestamp: Date
  ): Promise<string[]> {
    // This requires replaying events up to the timestamp
    // In production, maintain temporal projections or snapshots
    const result = await this.pool.query(
      `SELECT DISTINCT aggregate_id
       FROM events
       WHERE aggregate_type = 'Order' 
         AND occurred_at <= $1
       ORDER BY aggregate_id`,
      [timestamp]
    );

    const orderIds: string[] = [];

    for (const row of result.rows) {
      const order = await this.getAggregateStateAt(row.aggregate_id, timestamp);
      // Access internal state through a getter method in production
      if ((order as any).status === status) {
        orderIds.push(row.aggregate_id);
      }
    }

    return orderIds;
  }
}

Optimizing Performance with Snapshots

Replaying thousands of events for every aggregate load becomes expensive. Snapshots cache aggregate state at specific versions, reducing replay overhead.

```typescript interface Snapshot { aggregateId: string; aggregateType: string; version: number; state: Record; createdAt: Date; }

class SnapshotStore { constructor(private pool: Pool) {}

async initialize(): Promise { await this.pool.query(` CREATE TABLE IF NOT EXISTS snapshots ( aggregate_id UUID PRIMARY KEY, aggregate_type VARCHAR(100) NOT NULL, version INT NOT NULL, state JSONB NOT NULL, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() );

CREATE INDEX idx_snapshots_type ON snapshots(aggregate_type); `); }

async saveSnapshot(snapshot: Snapshot): Promise { await this.pool.query( `INSERT INTO snapshots