Event Sourcing and CQRS: A Complete Implementation Guide

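Implement event sourcing and CQRS in production. Learn about event stores, projections, snapshots, temporal queries, and handling schema evolution in event-driven systems.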

Event Sourcing Fundamentals

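Instead of storing only the current state, store every change as an immutable event. The current state is then derived by replaying those events in order.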

// Events are facts: they have already happened and never change
type BankAccountEvent =
  | { type: 'AccountOpened'; accountId: string; ownerId: string; initialDeposit: number; timestamp: Date }
  | { type: 'MoneyDeposited'; accountId: string; amount: number; timestamp: Date }
  | { type: 'MoneyWithdrawn'; accountId: string; amount: number; timestamp: Date }
  | { type: 'AccountClosed'; accountId: string; timestamp: Date };

// Rebuild the aggregate from its events
class BankAccount {
  id!: string;
  ownerId!: string;
  balance = 0;
  closed = false;

  static fromEvents(events: BankAccountEvent[]): BankAccount {
    const account = new BankAccount();
    for (const event of events) {
      account.apply(event);
    }
    return account;
  }

  // Public rather than private: BankAccountRepository replays events through this method.
  apply(event: BankAccountEvent): void {
    switch (event.type) {
      case 'AccountOpened':
        this.id = event.accountId;
        this.ownerId = event.ownerId;
        this.balance = event.initialDeposit;
        break;
      case 'MoneyDeposited':
        this.balance += event.amount;
        break;
      case 'MoneyWithdrawn':
        this.balance -= event.amount;
        break;
      case 'AccountClosed':
        this.closed = true;
        break;
    }
  }

  deposit(amount: number): BankAccountEvent {
    if (this.closed) throw new Error('Account is closed');
    if (amount <= 0) throw new Error('Amount must be positive');

    return { type: 'MoneyDeposited', accountId: this.id, amount, timestamp: new Date() };
  }

  withdraw(amount: number): BankAccountEvent {
    if (this.closed) throw new Error('Account is closed');
    if (amount <= 0) throw new Error('Amount must be positive');
    if (amount > this.balance) throw new Error('Insufficient funds');

    return { type: 'MoneyWithdrawn', accountId: this.id, amount, timestamp: new Date() };
  }
}
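
To make the replay idea concrete, here is a minimal, self-contained sketch that folds a short event history into a balance. It uses a pure reducer instead of the class above, and it drops timestamps for brevity; the names `AccountState`, `evolve`, and `replay` are hypothetical and chosen only for this illustration.

```typescript
// Simplified event union (timestamps omitted; an assumption of this sketch).
type AccountEvent =
  | { type: 'AccountOpened'; accountId: string; ownerId: string; initialDeposit: number }
  | { type: 'MoneyDeposited'; accountId: string; amount: number }
  | { type: 'MoneyWithdrawn'; accountId: string; amount: number }
  | { type: 'AccountClosed'; accountId: string };

interface AccountState {
  id: string;
  ownerId: string;
  balance: number;
  closed: boolean;
}

const emptyState: AccountState = { id: '', ownerId: '', balance: 0, closed: false };

// Pure transition function: returns the next state and never mutates its input.
function evolve(state: AccountState, event: AccountEvent): AccountState {
  switch (event.type) {
    case 'AccountOpened':
      return { ...state, id: event.accountId, ownerId: event.ownerId, balance: event.initialDeposit };
    case 'MoneyDeposited':
      return { ...state, balance: state.balance + event.amount };
    case 'MoneyWithdrawn':
      return { ...state, balance: state.balance - event.amount };
    case 'AccountClosed':
      return { ...state, closed: true };
  }
}

// The current state is a left fold over the event history.
function replay(events: AccountEvent[]): AccountState {
  return events.reduce(evolve, emptyState);
}

const accountHistory: AccountEvent[] = [
  { type: 'AccountOpened', accountId: 'acc-1', ownerId: 'user-1', initialDeposit: 100 },
  { type: 'MoneyDeposited', accountId: 'acc-1', amount: 50 },
  { type: 'MoneyWithdrawn', accountId: 'acc-1', amount: 30 },
];

const current = replay(accountHistory); // balance: 100 + 50 - 30 = 120
```

Because `evolve` is pure, replaying the same history always yields the same state, which is what makes the event log a reliable source of truth.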

Event Store

interface EventStore {
  append(streamId: string, events: BankAccountEvent[], expectedVersion: number): Promise<void>;
  load(streamId: string, fromVersion?: number): Promise<BankAccountEvent[]>;
}

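class ConcurrencyError extends Error {}

class PostgresEventStore implements EventStore {
  // `Database` is a placeholder for your Postgres client wrapper (assumed to expose `transaction` and `query`).
  constructor(private db: Database) {}

  async append(streamId: string, events: BankAccountEvent[], expectedVersion: number): Promise<void> {
    await this.db.transaction(async (trx) => {
      // Optimistic concurrency check. Back it with a UNIQUE (stream_id, version) constraint:
      // without one, two concurrent transactions can both pass this check.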
      const current = await trx.query(
        'SELECT MAX(version) as version FROM events WHERE stream_id = $1',
        [streamId]
      );
      const currentVersion = current.rows[0].version ?? -1;

      if (currentVersion !== expectedVersion) {
        throw new ConcurrencyError(`Expected version ${expectedVersion}, got ${currentVersion}`);
      }

      for (let i = 0; i < events.length; i++) {
        await trx.query(
          'INSERT INTO events (stream_id, version, type, data, timestamp) VALUES ($1, $2, $3, $4, $5)',
          [streamId, expectedVersion + i + 1, events[i].type, JSON.stringify(events[i]), events[i].timestamp]
        );
      }
    });
  }

  async load(streamId: string, fromVersion = 0): Promise<BankAccountEvent[]> {
    const result = await this.db.query(
      'SELECT data FROM events WHERE stream_id = $1 AND version >= $2 ORDER BY version',
      [streamId, fromVersion]
    );
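    // JSON stores Date values as ISO strings, so revive `timestamp` when loading.
    // If `data` is a json/jsonb column, drivers such as node-postgres already return
    // a parsed object, in which case JSON.parse should be skipped.
    return result.rows.map(r => {
      const event = JSON.parse(r.data);
      return { ...event, timestamp: new Date(event.timestamp) };
    });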
  }
}
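
The optimistic concurrency rule is easier to see without a database in the way. The following is a minimal in-memory sketch, not the Postgres implementation above: it is synchronous for brevity, treats events as opaque records, and the names `InMemoryEventStore`, `StoredEvent`, and `currentVersion` are hypothetical. It keeps the same convention as the listing above, where an empty stream has version -1 and the first event gets version 0.

```typescript
// Events are opaque records in this sketch; only `type` is required.
type StoredEvent = { type: string; [key: string]: unknown };

class ConcurrencyError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'ConcurrencyError';
  }
}

class InMemoryEventStore {
  private streams = new Map<string, StoredEvent[]>();

  // Version of the last event in the stream, or -1 when the stream is empty.
  currentVersion(streamId: string): number {
    return (this.streams.get(streamId)?.length ?? 0) - 1;
  }

  // Appends only if the caller's view of the stream is still up to date.
  append(streamId: string, events: StoredEvent[], expectedVersion: number): void {
    const actual = this.currentVersion(streamId);
    if (actual !== expectedVersion) {
      throw new ConcurrencyError(`Expected version ${expectedVersion}, got ${actual}`);
    }
    const stream = this.streams.get(streamId) ?? [];
    stream.push(...events);
    this.streams.set(streamId, stream);
  }

  // Array index doubles as the event version, so slicing implements `fromVersion`.
  load(streamId: string, fromVersion = 0): StoredEvent[] {
    return (this.streams.get(streamId) ?? []).slice(fromVersion);
  }
}

const store = new InMemoryEventStore();
store.append('acc-1', [{ type: 'AccountOpened', initialDeposit: 100 }], -1);

// Two writers both observe the stream at version 0, then race to append.
const seenByA = store.currentVersion('acc-1');
const seenByB = store.currentVersion('acc-1');

store.append('acc-1', [{ type: 'MoneyWithdrawn', amount: 80 }], seenByA); // succeeds

let conflict = false;
try {
  // Writer B still expects version 0, but the stream is now at version 1.
  store.append('acc-1', [{ type: 'MoneyWithdrawn', amount: 80 }], seenByB);
} catch (err) {
  conflict = err instanceof ConcurrencyError;
}
```

The losing writer is expected to reload the stream, re-run its business checks against the new state (here the second withdrawal would now fail for insufficient funds), and retry only if the command is still valid.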

Snapshots for Performance

class BankAccountRepository {
  constructor(private eventStore: EventStore, private snapshotStore: SnapshotStore) {}

  async load(accountId: string): Promise<BankAccount> {
    // Load from the latest snapshot plus the events recorded after it
    const snapshot = await this.snapshotStore.getLatest(accountId);
    const fromVersion = snapshot ? snapshot.version + 1 : 0;
    const events = await this.eventStore.load(accountId, fromVersion);

    let account: BankAccount;
    if (snapshot) {
      account = BankAccount.fromSnapshot(snapshot.state);
    } else {
      account = new BankAccount();
    }

    for (const event of events) account.apply(event);

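    // Save a fresh snapshot once 50 or more events have accumulated since the last one.
    // The snapshot must record the version of the last applied event so the next load can resume from it.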
    if (events.length >= 50) {
      await this.snapshotStore.save(accountId, account.toSnapshot());
    }

    return account;
  }
}
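
The repository above relies on each snapshot carrying the version of the last event it covers. Here is a minimal sketch of that bookkeeping, reduced to a plain balance so it runs on its own. The `Snapshot` shape, `loadWithSnapshot`, and `SNAPSHOT_EVERY` are hypothetical names for illustration; the threshold of 50 mirrors the listing above.

```typescript
type BalanceEvent = { type: 'MoneyDeposited' | 'MoneyWithdrawn'; amount: number };

interface Snapshot {
  version: number; // version of the last event folded into `balance`
  balance: number;
}

const SNAPSHOT_EVERY = 50;

function applyBalance(balance: number, e: BalanceEvent): number {
  return e.type === 'MoneyDeposited' ? balance + e.amount : balance - e.amount;
}

interface LoadResult {
  balance: number;
  version: number;                  // version of the last applied event
  replayed: number;                 // how many events had to be replayed
  newSnapshot: Snapshot | undefined;
}

// `stream` holds the full history; the event at index i has version i.
function loadWithSnapshot(stream: BalanceEvent[], snapshot: Snapshot | undefined): LoadResult {
  const fromVersion = snapshot ? snapshot.version + 1 : 0;
  const tail = stream.slice(fromVersion); // only the events after the snapshot
  const balance = tail.reduce(applyBalance, snapshot?.balance ?? 0);
  const version = fromVersion + tail.length - 1;

  // Take a new snapshot only when enough events have piled up since the last one.
  const newSnapshot = tail.length >= SNAPSHOT_EVERY ? { version, balance } : undefined;
  return { balance, version, replayed: tail.length, newSnapshot };
}

// 120 deposits of 1 each.
const stream: BalanceEvent[] = Array.from({ length: 120 }, () => ({
  type: 'MoneyDeposited' as const,
  amount: 1,
}));

// First load sees 60 events, replays all of them, and snapshots at version 59.
const first = loadWithSnapshot(stream.slice(0, 60), undefined);

// Second load sees all 120 events but replays only the 60 after the snapshot.
const second = loadWithSnapshot(stream, first.newSnapshot);
```

The result is identical to a full replay; the snapshot only changes how much of the stream has to be read.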

CQRS Read Models (Projections)

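// A separate read model, optimized for queries
class AccountBalanceProjection {
  // `Database` is a placeholder for your SQL client wrapper (assumed to expose `insert` and `query`).
  constructor(private db: Database) {}

  async handle(event: BankAccountEvent): Promise<void> {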
    switch (event.type) {
      case 'AccountOpened':
        await this.db.insert('account_balances', {
          account_id: event.accountId,
          balance: event.initialDeposit,
          owner_id: event.ownerId,
          updated_at: event.timestamp,
        });
        break;
      case 'MoneyDeposited':
        await this.db.query(
          'UPDATE account_balances SET balance = balance + $1, updated_at = $2 WHERE account_id = $3',
          [event.amount, event.timestamp, event.accountId]
        );
        break;
      case 'MoneyWithdrawn':
        await this.db.query(
          'UPDATE account_balances SET balance = balance - $1, updated_at = $2 WHERE account_id = $3',
          [event.amount, event.timestamp, event.accountId]
        );
        break;
    }
  }
}
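
A useful property of projections is that they are disposable: since they are derived entirely from events, a read model can be dropped and rebuilt by replaying the log. The sketch below shows this with an in-memory map standing in for the `account_balances` table. `InMemoryBalanceProjection`, `LedgerEvent`, and `rebuild` are hypothetical names, and timestamps are omitted for brevity.

```typescript
type LedgerEvent =
  | { type: 'AccountOpened'; accountId: string; ownerId: string; initialDeposit: number }
  | { type: 'MoneyDeposited'; accountId: string; amount: number }
  | { type: 'MoneyWithdrawn'; accountId: string; amount: number };

interface BalanceRow {
  ownerId: string;
  balance: number;
}

class InMemoryBalanceProjection {
  // Stands in for the account_balances table.
  private rows = new Map<string, BalanceRow>();

  handle(event: LedgerEvent): void {
    if (event.type === 'AccountOpened') {
      this.rows.set(event.accountId, { ownerId: event.ownerId, balance: event.initialDeposit });
      return;
    }
    const row = this.rows.get(event.accountId);
    if (!row) return; // event for an unknown account: ignored in this sketch
    row.balance += event.type === 'MoneyDeposited' ? event.amount : -event.amount;
  }

  // Rebuilding means clearing the table and replaying the full log in order.
  rebuild(log: LedgerEvent[]): void {
    this.rows.clear();
    for (const event of log) this.handle(event);
  }

  getBalance(accountId: string): number | undefined {
    return this.rows.get(accountId)?.balance;
  }
}

const log: LedgerEvent[] = [
  { type: 'AccountOpened', accountId: 'acc-1', ownerId: 'user-1', initialDeposit: 100 },
  { type: 'AccountOpened', accountId: 'acc-2', ownerId: 'user-2', initialDeposit: 10 },
  { type: 'MoneyDeposited', accountId: 'acc-1', amount: 50 },
  { type: 'MoneyWithdrawn', accountId: 'acc-2', amount: 4 },
];

// Fed incrementally, as events arrive.
const live = new InMemoryBalanceProjection();
for (const event of log) live.handle(event);

// Reconstructed from the log alone.
const rebuilt = new InMemoryBalanceProjection();
rebuilt.rebuild(log);
```

Both projections end up with the same balances, which is why a new read model, or a fix to a buggy one, can be introduced without touching the write side.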

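// The query side stays simple: balances are read straight from the projection table
class AccountQueryService {
  // `Database` and its `queryOne` helper are placeholders for your SQL client wrapper.
  constructor(private db: Database, private eventStore: EventStore) {}

  async getBalance(accountId: string): Promise<{ balance: number }> {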
    const row = await this.db.queryOne(
      'SELECT balance FROM account_balances WHERE account_id = $1',
      [accountId]
    );
    return { balance: row.balance };
  }

  async getTransactionHistory(accountId: string): Promise<Transaction[]> {
    const events = await this.eventStore.load(accountId);
    return events
      .filter(e => e.type === 'MoneyDeposited' || e.type === 'MoneyWithdrawn')
      .map(e => ({
        type: e.type === 'MoneyDeposited' ? 'credit' : 'debit',
        amount: e.amount,
        timestamp: e.timestamp,
      }));
  }
}
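
Because the full history is kept, the same event stream can also answer temporal questions such as "what was the balance on a given date". A minimal sketch, assuming each event carries a reliable `timestamp`; `TimedEvent` and `balanceAt` are hypothetical names introduced for this example.

```typescript
type TimedEvent =
  | { type: 'AccountOpened'; initialDeposit: number; timestamp: Date }
  | { type: 'MoneyDeposited'; amount: number; timestamp: Date }
  | { type: 'MoneyWithdrawn'; amount: number; timestamp: Date };

// Replays only the events that happened at or before `asOf`.
function balanceAt(events: TimedEvent[], asOf: Date): number {
  return events
    .filter(e => e.timestamp.getTime() <= asOf.getTime())
    .reduce((balance, e) => {
      switch (e.type) {
        case 'AccountOpened':
          return e.initialDeposit;
        case 'MoneyDeposited':
          return balance + e.amount;
        case 'MoneyWithdrawn':
          return balance - e.amount;
      }
    }, 0);
}

const timeline: TimedEvent[] = [
  { type: 'AccountOpened', initialDeposit: 100, timestamp: new Date('2024-01-10T00:00:00Z') },
  { type: 'MoneyDeposited', amount: 50, timestamp: new Date('2024-02-10T00:00:00Z') },
  { type: 'MoneyWithdrawn', amount: 30, timestamp: new Date('2024-03-10T00:00:00Z') },
];

const endOfJanuary = balanceAt(timeline, new Date('2024-01-31T00:00:00Z'));  // only the opening deposit
const endOfFebruary = balanceAt(timeline, new Date('2024-02-29T00:00:00Z')); // opening deposit plus 50
const endOfYear = balanceAt(timeline, new Date('2024-12-31T00:00:00Z'));     // all three events
```

A state-only table cannot answer this kind of question after the fact, since earlier values have been overwritten.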

Event sourcing gives you a complete audit trail, but it adds complexity. Use it only where that audit trail is genuinely valuable.