Event Sourcing: Implementation Guide

Why Traditional State-Based Systems Fail Modern Requirements

State-based databases store only the current state of entities. When a user updates their profile, the old data vanishes. This approach worked when systems were monolithic and compliance requirements were minimal, but it fundamentally conflicts with modern operational needs.

Consider a payment processing system handling 50,000 transactions per second. Traditional approaches require complex change data capture (CDC) mechanisms, separate audit tables, and fragile triggers that introduce race conditions. When regulators demand a complete history of a disputed transaction, teams must reconstruct events from scattered logs, database backups, and external monitoring systems—a process that takes days and often produces incomplete results.

The shift to distributed microservices compounds these problems. Multiple services updating shared state create consistency nightmares. Eventual consistency models help with availability but make causality hard to reconstruct. Did the inventory decrease because of a purchase, a return, or a correction? Without a record of the events themselves, you're guessing.

AI and machine learning workloads often depend on complete historical datasets for model training. Deleting or overwriting data discards the temporal patterns those models learn from. Event sourcing keeps an immutable, time-ordered record that ML pipelines can consume directly.

Core Event Sourcing Implementation Architecture

Event sourcing implementation centers on three components: the event store, the write model, and the read model. The event store is an append-only log of domain events. The write model validates commands and produces events. The read model projects events into queryable views optimized for specific use cases.

Here's an event store sketch using TypeScript and PostgreSQL, accessed through the node-postgres (pg) package:

import pg from 'pg'; // node-postgres; assumes esModuleInterop is enabled

interface DomainEvent {
  eventId: string;
  aggregateId: string;
  aggregateType: string;
  eventType: string;
  eventVersion: number;
  payload: Record<string, unknown>;
  metadata: {
    timestamp: Date;
    userId: string;
    correlationId: string;
    causationId: string;
  };
}

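// Thrown when the stream has moved past the version the caller expected.
class ConcurrencyError extends Error {}

class EventStore {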
  constructor(private pool: pg.Pool) {}

  async appendEvents(
    aggregateId: string,
    expectedVersion: number,
    events: DomainEvent[]
  ): Promise<void> {
    const client = await this.pool.connect();

    try {
      await client.query('BEGIN');

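      // Serialize writers per aggregate. PostgreSQL rejects FOR UPDATE
      // combined with aggregates such as MAX(), so take a
      // transaction-scoped advisory lock keyed on the aggregate ID instead.
      await client.query(
        'SELECT pg_advisory_xact_lock(hashtext($1))',
        [aggregateId]
      );

      // Optimistic concurrency check
      const versionCheck = await client.query(
        `SELECT COALESCE(MAX(event_version), 0) AS current_version
         FROM events
         WHERE aggregate_id = $1`,
        [aggregateId]
      );

      // Number() guards against BIGINT columns, which pg returns as strings
      const currentVersion = Number(versionCheck.rows[0].current_version);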

      if (currentVersion !== expectedVersion) {
        throw new ConcurrencyError(
          `Expected version ${expectedVersion}, found ${currentVersion}`
        );
      }

      // Append events atomically
      for (const event of events) {
        await client.query(
          `INSERT INTO events (
            event_id, aggregate_id, aggregate_type, event_type,
            event_version, payload, metadata, created_at
          ) VALUES ($1, $2, $3, $4, $5, $6, $7, NOW())`,
          [
            event.eventId,
            event.aggregateId,
            event.aggregateType,
            event.eventType,
            event.eventVersion,
            JSON.stringify(event.payload),
            JSON.stringify(event.metadata)
          ]
        );
      }

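      await client.query('COMMIT');
    } catch (error) {
      await client.query('ROLLBACK');
      throw error;
    } finally {
      client.release();
    }

    // Publish to the message bus only after a successful commit. A publish
    // failure must not trigger ROLLBACK: the events are already durable.
    // publishEvents is assumed to be defined elsewhere on this class.
    await this.publishEvents(events);
  }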

  async loadEvents(
    aggregateId: string,
    fromVersion: number = 0
  ): Promise<DomainEvent[]> {
    const result = await this.pool.query(
      `SELECT * FROM events 
       WHERE aggregate_id = $1 AND event_version > $2
       ORDER BY event_version ASC`,
      [aggregateId, fromVersion]
    );

    return result.rows.map(row => ({
      eventId: row.event_id,
      aggregateId: row.aggregate_id,
      aggregateType: row.aggregate_type,
      eventType: row.event_type,
      eventVersion: row.event_version,
      payload: row.payload,
      metadata: row.metadata
    }));
  }
}

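This implementation handles optimistic concurrency through version checking, so concurrent writes to the same aggregate fail fast rather than corrupting state. The lock taken around the version check serializes writers to a single aggregate, which closes the race between reading the current version and inserting new events. A UNIQUE constraint on (aggregate_id, event_version) is a sensible backstop in case a writer ever bypasses the lock.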
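To see the version check in isolation, here is a minimal in-memory sketch of the same optimistic concurrency rule. A Map stands in for PostgreSQL, and InMemoryEventStore, StoredEvent, and VersionConflict are hypothetical names used only for illustration.

```typescript
// Hypothetical in-memory stand-in for the PostgreSQL-backed store above.
interface StoredEvent {
  aggregateId: string;
  eventType: string;
  eventVersion: number;
  payload: Record<string, unknown>;
}

class VersionConflict extends Error {
  constructor(expected: number, actual: number) {
    super(`Expected version ${expected}, found ${actual}`);
  }
}

class InMemoryEventStore {
  private streams = new Map<string, StoredEvent[]>();

  append(aggregateId: string, expectedVersion: number, events: StoredEvent[]): void {
    const stream = this.streams.get(aggregateId) ?? [];
    // Versions start at 1, so the current version equals the stream length
    // and an empty stream is at version 0.
    const currentVersion = stream.length;
    if (currentVersion !== expectedVersion) {
      throw new VersionConflict(expectedVersion, currentVersion);
    }
    this.streams.set(aggregateId, [...stream, ...events]);
  }

  load(aggregateId: string): StoredEvent[] {
    return [...(this.streams.get(aggregateId) ?? [])];
  }
}

// Two writers both read the stream at version 0, then race to append.
const store = new InMemoryEventStore();
const accountOpened: StoredEvent = {
  aggregateId: 'acc-1',
  eventType: 'AccountOpened',
  eventVersion: 1,
  payload: { initialDeposit: 100 },
};
store.append('acc-1', 0, [accountOpened]); // first writer wins

let conflict: unknown = null;
try {
  // The second writer still believes the stream is at version 0.
  store.append('acc-1', 0, [
    { ...accountOpened, eventType: 'MoneyDeposited', payload: { amount: 5 } },
  ]);
} catch (error) {
  conflict = error;
}
```

The losing writer gets an error instead of silently overwriting. The usual response is to reload the aggregate, re-run the command against the fresh state, and retry the append.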

Building Aggregates with Event Replay

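Aggregates reconstruct their state by replaying events. Command methods validate business rules and emit events, while a separate mutate step applies each event to state, so the same code path serves both new events and replayed history: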

abstract class AggregateRoot {
  protected aggregateId: string = '';
  protected version: number = 0;
  private uncommittedEvents: DomainEvent[] = [];

  protected applyEvent(event: DomainEvent): void {
    this.mutate(event);
    this.version = event.eventVersion;
  }

  protected abstract mutate(event: DomainEvent): void;

  protected addEvent(eventType: string, payload: Record<string, unknown>): void {
    const event: DomainEvent = {
      eventId: crypto.randomUUID(),
      aggregateId: this.aggregateId,
      aggregateType: this.constructor.name,
      eventType,
      eventVersion: this.version + 1,
      payload,
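      // getCurrentUserId(), getCorrelationId() and getCausationId() are
      // assumed to be request-context helpers defined elsewhere.
      metadata: {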
        timestamp: new Date(),
        userId: getCurrentUserId(),
        correlationId: getCorrelationId(),
        causationId: getCausationId()
      }
    };

    this.applyEvent(event);
    this.uncommittedEvents.push(event);
  }

  getUncommittedEvents(): DomainEvent[] {
    return [...this.uncommittedEvents];
  }

  markEventsAsCommitted(): void {
    this.uncommittedEvents = [];
  }

  loadFromHistory(events: DomainEvent[]): void {
    events.forEach(event => this.applyEvent(event));
  }
}

class BankAccount extends AggregateRoot {
  private balance: number = 0;
  private isActive: boolean = true;

  static create(accountId: string, initialDeposit: number): BankAccount {
    const account = new BankAccount();
    account.aggregateId = accountId;
    account.addEvent('AccountOpened', { initialDeposit });
    return account;
  }

  deposit(amount: number): void {
    if (!this.isActive) {
      throw new Error('Cannot deposit to closed account');
    }
    if (amount <= 0) {
      throw new Error('Deposit amount must be positive');
    }

    this.addEvent('MoneyDeposited', { amount });
  }

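  withdraw(amount: number): void {
    if (!this.isActive) {
      throw new Error('Cannot withdraw from closed account');
    }
    // Without this check a negative withdrawal would increase the balance
    if (amount <= 0) {
      throw new Error('Withdrawal amount must be positive');
    }
    if (amount > this.balance) {
      throw new Error('Insufficient funds');
    }

    this.addEvent('MoneyWithdrawn', { amount });
  }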

  protected mutate(event: DomainEvent): void {
    switch (event.eventType) {
      case 'AccountOpened':
        this.balance = event.payload.initialDeposit as number;
        this.isActive = true;
        break;
      case 'MoneyDeposited':
        this.balance += event.payload.amount as number;
        break;
      case 'MoneyWithdrawn':
        this.balance -= event.payload.amount as number;
        break;
      case 'AccountClosed':
        this.isActive = false;
        break;
    }
  }
}
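Replay is effectively a left fold over the event stream. The following self-contained sketch shows that idea without the class machinery. AccountEvent, AccountState, applyAccountEvent, and replay are hypothetical names that mirror the BankAccount example rather than reuse it.

```typescript
// Hypothetical event and state types mirroring the BankAccount example.
type AccountEvent =
  | { eventType: 'AccountOpened'; payload: { initialDeposit: number } }
  | { eventType: 'MoneyDeposited'; payload: { amount: number } }
  | { eventType: 'MoneyWithdrawn'; payload: { amount: number } }
  | { eventType: 'AccountClosed'; payload: {} };

interface AccountState {
  balance: number;
  isActive: boolean;
  version: number;
}

const initialState: AccountState = { balance: 0, isActive: false, version: 0 };

// Pure transition: (state, event) -> next state. No I/O and no validation,
// because replayed events are facts that were already accepted.
function applyAccountEvent(state: AccountState, event: AccountEvent): AccountState {
  const version = state.version + 1;
  switch (event.eventType) {
    case 'AccountOpened':
      return { balance: event.payload.initialDeposit, isActive: true, version };
    case 'MoneyDeposited':
      return { ...state, balance: state.balance + event.payload.amount, version };
    case 'MoneyWithdrawn':
      return { ...state, balance: state.balance - event.payload.amount, version };
    case 'AccountClosed':
      return { ...state, isActive: false, version };
  }
}

// Rebuild state by folding the events, oldest first, over a starting state.
function replay(events: AccountEvent[], from: AccountState = initialState): AccountState {
  return events.reduce<AccountState>(applyAccountEvent, from);
}

const accountHistory: AccountEvent[] = [
  { eventType: 'AccountOpened', payload: { initialDeposit: 100 } },
  { eventType: 'MoneyDeposited', payload: { amount: 50 } },
  { eventType: 'MoneyWithdrawn', payload: { amount: 30 } },
];

const currentState = replay(accountHistory);
// currentState is { balance: 120, isActive: true, version: 3 }
```

Because the transition function is pure, replaying the same history always yields the same state, which is what makes audits and time-travel queries (replaying only a prefix of the history) possible.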

Implementing CQRS Read Models

Event sourcing pairs naturally with CQRS (Command Query Responsibility Segregation). Write models enforce business rules; read models optimize queries:

class AccountProjection {
  constructor(private db: pg.Pool) {}

  async handleEvent(event: DomainEvent): Promise<void> {
    switch (event.eventType) {
      case 'AccountOpened':
        await this.db.query(
          `INSERT INTO account_balances (account_id, balance, status, updated_at)
           VALUES ($1, $2, 'active', $3)`,
          [event.aggregateId, event.payload.initialDeposit, event.metadata.timestamp]
        );
        break;

      case 'MoneyDeposited':
        await this.db.query(
          `UPDATE account_balances 
           SET balance = balance + $1, updated_at = $2
           WHERE account_id = $3`,
          [event.payload.amount, event.metadata.timestamp, event.aggregateId]
        );
        break;

      case 'MoneyWithdrawn':
        await this.db.query(
          `UPDATE account_balances 
           SET balance = balance - $1, updated_at = $2
           WHERE account_id = $3`,
          [event.payload.amount, event.metadata.timestamp, event.aggregateId]
        );
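        break;

      // Keep the status column in sync with the write model.
      // The 'closed' status value is an assumption about the table's schema.
      case 'AccountClosed':
        await this.db.query(
          `UPDATE account_balances
           SET status = 'closed', updated_at = $1
           WHERE account_id = $2`,
          [event.metadata.timestamp, event.aggregateId]
        );
        break;
    }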
  }

  async getAccountBalance(accountId: string): Promise<number> {
    const result = await this.db.query(
      'SELECT balance FROM account_balances WHERE account_id = $1',
      [accountId]
    );
    // node-postgres returns NUMERIC columns as strings by default
    return Number(result.rows[0]?.balance ?? 0);
  }
}

Read models can be rebuilt from scratch by replaying all events. This enables schema migrations, bug fixes in projection logic, and creating new views without touching the event store.
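A rebuild amounts to discarding the view and folding the full event log into a fresh one. The sketch below uses an in-memory Map in place of the account_balances table. LedgerEvent, BalanceView, project, and rebuild are hypothetical names, and the 'closed' status is an assumed value.

```typescript
// Hypothetical, simplified event shape for the projection sketch.
interface LedgerEvent {
  aggregateId: string;
  eventType: string;
  payload: { initialDeposit?: number; amount?: number };
}

type BalanceRow = { balance: number; status: 'active' | 'closed' };
type BalanceView = Map<string, BalanceRow>;

// Apply one event to the view. Unknown event types are ignored so that
// new events can be introduced without breaking older projections.
function project(view: BalanceView, event: LedgerEvent): void {
  const row = view.get(event.aggregateId);
  switch (event.eventType) {
    case 'AccountOpened':
      view.set(event.aggregateId, {
        balance: event.payload.initialDeposit ?? 0,
        status: 'active',
      });
      break;
    case 'MoneyDeposited':
      if (row) row.balance += event.payload.amount ?? 0;
      break;
    case 'MoneyWithdrawn':
      if (row) row.balance -= event.payload.amount ?? 0;
      break;
    case 'AccountClosed':
      if (row) row.status = 'closed';
      break;
  }
}

// Rebuild from scratch: start with an empty view and replay every event
// in its original order.
function rebuild(allEvents: LedgerEvent[]): BalanceView {
  const view: BalanceView = new Map();
  for (const ledgerEvent of allEvents) {
    project(view, ledgerEvent);
  }
  return view;
}

const eventLog: LedgerEvent[] = [
  { aggregateId: 'acc-1', eventType: 'AccountOpened', payload: { initialDeposit: 100 } },
  { aggregateId: 'acc-2', eventType: 'AccountOpened', payload: { initialDeposit: 20 } },
  { aggregateId: 'acc-1', eventType: 'MoneyWithdrawn', payload: { amount: 30 } },
  { aggregateId: 'acc-2', eventType: 'AccountClosed', payload: {} },
];

const balances = rebuild(eventLog);
// acc-1 -> { balance: 70, status: 'active' }
// acc-2 -> { balance: 20, status: 'closed' }
```

With a real database, the same idea is often run into a new table that is swapped in once it has caught up, so queries keep working during the rebuild.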

Handling Event Versioning and Schema Evolution

Events are immutable, but business requirements change. Event versioning strategies prevent breaking existing projections:

interface EventUpgrader {
  canUpgrade(event: DomainEvent): boolean;
  upgrade(event: DomainEvent): DomainEvent;
}

class MoneyDepositedV1ToV2Upgrader implements EventUpgrader {
  canUpgrade(event: DomainEvent): boolean {
    return event.eventType === 'MoneyDeposited' && 
           !event.payload.currency;
  }

  upgrade(event: DomainEvent): DomainEvent {
    return {
      ...event,
      eventType: 'MoneyDepositedV2',
      payload: {
        ...event.payload,
        currency: 'USD' // Default for legacy events
      }
    };
  }
}

class EventUpgraderPipeline {
  private upgraders: EventUpgrader[] = [];

  register(upgrader: EventUpgrader): void {
    this.upgraders.push(upgrader);
  }

  upgrade(event: DomainEvent): DomainEvent {
    let upgradedEvent = event;

    for (const upgrader of this.upgraders) {
      if (upgrader.canUpgrade(upgradedEvent)) {
        upgradedEvent = upgrader.upgrade(upgradedEvent);
      }
    }

    return upgradedEvent;
  }
}
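As a usage sketch, the same upgrade-on-read idea can be written with plain functions. Everything here is hypothetical: RawEvent, Upcaster, the upcast helper, and in particular the V3 step, which is invented purely to show that registration order matters when upgrades are chained.

```typescript
interface RawEvent {
  eventType: string;
  payload: Record<string, unknown>;
}

// An upcaster returns the upgraded event, or null when it does not apply.
type Upcaster = (event: RawEvent) => RawEvent | null;

// V1 -> V2: add a currency, defaulting legacy events to 'USD' as above.
const addCurrency: Upcaster = (event) =>
  event.eventType === 'MoneyDeposited'
    ? { eventType: 'MoneyDepositedV2', payload: { ...event.payload, currency: 'USD' } }
    : null;

// V2 -> V3 (hypothetical): store amounts as integer minor units.
const toMinorUnits: Upcaster = (event) =>
  event.eventType === 'MoneyDepositedV2'
    ? {
        eventType: 'MoneyDepositedV3',
        payload: {
          currency: event.payload.currency,
          amountMinor: Math.round((event.payload.amount as number) * 100),
        },
      }
    : null;

// Run the upcasters in registration order. Each step sees the output of
// the previous one, so V1 events flow through V2 and on to V3.
function upcast(event: RawEvent, upcasters: Upcaster[]): RawEvent {
  let current = event;
  for (const step of upcasters) {
    current = step(current) ?? current;
  }
  return current;
}

const storedEvent: RawEvent = { eventType: 'MoneyDeposited', payload: { amount: 12.5 } };
const upgradedEvent = upcast(storedEvent, [addCurrency, toMinorUnits]);
// upgradedEvent: MoneyDepositedV3 with { currency: 'USD', amountMinor: 1250 }
// storedEvent is untouched: the log keeps the original V1 shape.
```

Upcasters build new objects instead of mutating their input, so the stored event stays exactly as it was written.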

Snapshot Strategies for Performance Optimization

Replaying thousands of events for every command becomes prohibitively expensive. Snapshots cache aggregate state at specific versions:

interface Snapshot {
  aggregateId: string;
  aggregateType: string;
  version: number;
  state: Record<string, unknown>;
  createdAt: Date;
}

class SnapshotStore {
  constructor(private pool: pg.Pool) {}

  async saveSnapshot(snapshot: Snapshot): Promise<void> {
    await this.pool.query(
      `INSERT INTO snapshots (aggregate_id, aggregate_type, version, state, created_at)
       VALUES ($1, $2, $3, $4, $5)
       ON CONFLICT (aggregate_id) 
       DO UPDATE SET version = $3, state = $4, created_at = $5`,
      [
        snapshot.aggregateId,
        snapshot.aggregateType,
        snapshot.version,
        JSON.stringify(snapshot.state),
        snapshot.createdAt
      ]
    );
  }

  async loadSnapshot(aggregateId: string): Promise<Snapshot | null> {
    const result = await this.pool.query(
      'SELECT * FROM snapshots WHERE aggregate_id = $1',
      [aggregateId]
    );

    if (result.rows.length === 0) return null;

    const row = result.rows[0];
    return {
      aggregateId: row.aggregate_id,
      aggregateType: row.aggregate_type,
      version: row.version,
      state: row.state,
      createdAt: row.created_at
    };
  }
}

class AggregateRepository {
  constructor(
    private eventStore: EventStore,
    private snapshotStore: SnapshotStore,
    private snapshotFrequency: number = 100
  ) {}

  async load<T extends AggregateRoot>(
    aggregateId: string,
    factory: () => T
  ): Promise<T> {
    const aggregate = factory();
    const snapshot = await this.snapshotStore.loadSnapshot(aggregateId);

    let fromVersion = 0;
    if (snapshot) {
      Object.assign(aggregate, snapshot.state);
      fromVersion = snapshot.version;
    }

    const events = await this.eventStore.loadEvents(aggregateId, fromVersion);
    aggregate.loadFromHistory(events);

    return aggregate;
  }

  async save(aggregate: AggregateRoot): Promise<void> {
    const events = aggregate.getUncommittedEvents();
    if (events.length === 0) return;

    const currentVersion = aggregate['version'] - events.length;
    await this.eventStore.appendEvents(
      aggregate['aggregateId'],
      currentVersion,
      events
    );

    aggregate.markEventsAsCommitted();

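    // Create a snapshot whenever this save crossed a multiple of N.
    // A plain `version % N === 0` check misses the boundary when a
    // multi-event batch jumps over it.
    if (
      Math.floor(aggregate['version'] / this.snapshotFrequency) >
      Math.floor(currentVersion / this.snapshotFrequency)
    ) {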
      await this.snapshotStore.saveSnapshot({
        aggregateId: aggregate['aggregateId'],
        aggregateType: aggregate.constructor.name,
        version: aggregate['version'],
        state: this.serializeAggregate(aggregate),
        createdAt: new Date()
      });
    }
  }

  private serializeAggregate(aggregate: AggregateRoot): Record<string, unknown> {
    // Serialize only state properties, not methods
    return JSON.parse(JSON.stringify(aggregate));
  }
}

Snapshot frequency depends on aggregate characteristics. High-frequency aggregates (thousands of events) benefit from snapshots every 50-100 events. Low-frequency aggregates may never need snapshots.
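To make the saving concrete, here is a small sketch with a toy counter aggregate: loading from a snapshot replays only the events after the snapshot version. It also includes a helper that decides when a save should trigger a snapshot. CounterState, CounterEvent, loadState, and crossedSnapshotBoundary are hypothetical names.

```typescript
interface CounterState {
  total: number;
  version: number;
}

interface CounterEvent {
  eventVersion: number;
  delta: number;
}

function applyDelta(state: CounterState, event: CounterEvent): CounterState {
  return { total: state.total + event.delta, version: event.eventVersion };
}

// Start from the snapshot if there is one, then replay only newer events.
function loadState(
  snapshot: CounterState | null,
  events: CounterEvent[]
): { state: CounterState; replayed: number } {
  const start: CounterState = snapshot ?? { total: 0, version: 0 };
  const tail = events.filter((e) => e.eventVersion > start.version);
  return { state: tail.reduce<CounterState>(applyDelta, start), replayed: tail.length };
}

// True when a save that moved the aggregate from previousVersion to
// newVersion crossed a multiple of frequency, even in a multi-event batch.
function crossedSnapshotBoundary(
  previousVersion: number,
  newVersion: number,
  frequency: number
): boolean {
  return Math.floor(newVersion / frequency) > Math.floor(previousVersion / frequency);
}

// 250 events, each adding 1.
const allEvents: CounterEvent[] = Array.from({ length: 250 }, (_, i) => ({
  eventVersion: i + 1,
  delta: 1,
}));
const snapshotAt200: CounterState = { total: 200, version: 200 };

const withSnapshot = loadState(snapshotAt200, allEvents);
const withoutSnapshot = loadState(null, allEvents);
// Both reach { total: 250, version: 250 }, but withSnapshot replays 50
// events while withoutSnapshot replays all 250.
```

Both paths must arrive at the same state. A snapshot is only a cache, so it can be deleted and recreated from the events at any time.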

Common Pitfalls and Failure Modes

Event Store as Message Queue: Teams often conflate the event store with a message bus. The event store is the source of truth; the message bus handles distribution. Losing messages from the bus is acceptable if you can rebuild projections from the event store. Losing events from the store is catastrophic.

Large Event Payloads: Storing entire documents in events bloats storage and slows queries. Events should contain only the data that changed. Reference external storage for large payloads like images or documents.

Missing Correlation IDs: Without correlation and causation IDs in metadata, debugging distributed workflows becomes impossible. Every event must trace back to the originating command and user action.

Synchronous Projection Updates: Updating read models synchronously during command handling couples write and read performance. Use asynchronous event handlers with at-least-once delivery guarantees.

Ignoring Idempotency: Event handlers will receive duplicate events due to retries. Every handler must be idempotent, typically by tracking processed event IDs.

Unbounded Event Streams: Aggregates that accumulate events indefinitely (like user activity logs) need archival strategies. Move old events to cold storage and implement time-based snapshots.
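The idempotency pitfall is the easiest of these to show in code. Below is a minimal sketch of a handler that tracks processed event IDs, using in-memory collections as a stand-in for database tables. IdempotentBalanceHandler and DeliveredEvent are hypothetical names.

```typescript
interface DeliveredEvent {
  eventId: string;
  aggregateId: string;
  amount: number;
}

class IdempotentBalanceHandler {
  private processed = new Set<string>();
  private balances = new Map<string, number>();

  // Returns true if the event was applied, false if it was a duplicate.
  handle(event: DeliveredEvent): boolean {
    if (this.processed.has(event.eventId)) {
      return false; // already applied: a redelivery changes nothing
    }
    const current = this.balances.get(event.aggregateId) ?? 0;
    this.balances.set(event.aggregateId, current + event.amount);
    this.processed.add(event.eventId);
    return true;
  }

  balanceOf(aggregateId: string): number {
    return this.balances.get(aggregateId) ?? 0;
  }
}

const handler = new IdempotentBalanceHandler();
const deposit: DeliveredEvent = { eventId: 'evt-1', aggregateId: 'acc-1', amount: 50 };

const firstDelivery = handler.handle(deposit); // applied
const redelivery = handler.handle(deposit); // ignored: same eventId
// handler.balanceOf('acc-1') is 50, not 100
```

With a real database, the processed-ID record and the read-model update should commit in the same transaction. Otherwise a crash between the two writes can still apply an event twice or skip it entirely.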

Best Practices for Production Event Sourcing

Implement Event Metadata Standards: Every event needs timestamp, user ID, correlation ID, and causation ID. This metadata enables distributed tracing, audit trails, and debugging.

Use Semantic Event Names: Name events in past tense describing business facts: OrderPlaced, PaymentProcessed, InventoryReserved. Avoid technical names like DataUpdated.

Design Events for Replay: Events must be self-contained. Avoid references to external state that might change. Include all data needed to process the event independently.

Partition Event Streams: Distribute events across partitions by aggregate ID for horizontal scaling. Ensure all events for a single aggregate land in the same partition to maintain ordering.

Monitor Projection Lag: Track the delay between event creation and projection updates. Lag exceeding SLAs indicates scaling issues or handler failures.

Implement Event Archival: Move events that have aged out of your hot-retention window to cold storage, and keep them retrievable for as long as your compliance requirements demand. Keep recent events in hot storage for fast replay.

Version Events from Day One: Even if you don't need versioning initially, include a version field in every event. Retrofitting versioning is painful.

Test Event Replay: Regularly rebuild projections from scratch in staging environments. This validates that your event store contains complete information and projection logic is correct.
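The partitioning practice above can be sketched as a stable hash of the aggregate ID, taken modulo the partition count. The hash below is an FNV-1a-style function over UTF-16 code units, chosen only because it is short and deterministic. It is an assumption for illustration, not what any particular broker uses, and fnv1a and partitionFor are hypothetical names.

```typescript
// FNV-1a-style 32-bit hash over the string's UTF-16 code units.
function fnv1a(input: string): number {
  let hash = 0x811c9dc5; // FNV offset basis
  for (let i = 0; i < input.length; i++) {
    hash ^= input.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193); // multiply by the FNV prime, mod 2^32
  }
  return hash >>> 0; // force an unsigned 32-bit result
}

// Every event for a given aggregate ID maps to the same partition, which
// preserves per-aggregate ordering.
function partitionFor(aggregateId: string, partitionCount: number): number {
  return fnv1a(aggregateId) % partitionCount;
}

const partitionCount = 8;
const firstLookup = partitionFor('account-42', partitionCount);
const secondLookup = partitionFor('account-42', partitionCount);
// firstLookup === secondLookup: routing is deterministic per aggregate
```

Changing partitionCount remaps most aggregates, so the count is usually fixed up front or changed only through a planned migration.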

FAQ

What is event sourcing implementation and how does it differ from event-driven architecture?

Event sourcing implementation is a specific pattern where all state changes are stored as immutable events in an append-only log, and current state is derived by replaying these events. Event-driven architecture is a broader architectural style where components communicate through events. Event sourcing is one pattern within event-driven architecture, focused specifically on persistence and state management rather than just inter-service communication.

How does event sourcing work in distributed systems in 2026?

Modern event sourcing implementations typically use a dedicated event store such as EventStoreDB, or a partitioned log such as Kafka. Kafka's exactly-once semantics cover transactional reads and writes within Kafka itself, so handlers with external side effects still need to be idempotent. Events are partitioned by aggregate ID for parallel processing while maintaining ordering guarantees within each aggregate. Read models run as separate services consuming events asynchronously, enabling independent scaling. Cloud-native implementations leverage managed services for event storage, reducing operational overhead while maintaining consistency guarantees through optimistic concurrency control.

What is the best way to handle event versioning in production systems?

Use an event upgrader pipeline that transforms old event versions to new schemas during replay. Store events in their original format but apply transformations when loading. For breaking changes, create new event types (e.g., OrderPlacedV2) and maintain handlers for both versions. Include a schema version in event metadata. Avoid in-place event mutations—immutability is fundamental to event sourcing. Plan for versioning from the start; retrofitting is significantly more complex.

When should you avoid using event sourcing?

Avoid event sourcing for simple CRUD applications