
事件溯源与 CQRS:完整实现
事件溯源基础
事件溯源不直接存储当前状态,而是将每一次变更存储为一条不可变事件;当前状态通过按顺序重放这些事件得到。
/**
 * Events are facts — they have already happened and never change.
 *
 * Discriminated union (tagged by `type`) of everything that can happen to a
 * bank account. Every variant carries the `accountId` it belongs to and the
 * `timestamp` at which it occurred:
 *
 * - `AccountOpened`  — also records the owner and the initial deposit.
 * - `MoneyDeposited` — records the deposited `amount`.
 * - `MoneyWithdrawn` — records the withdrawn `amount`.
 * - `AccountClosed`  — carries no extra data.
 */
type BankAccountEvent =
| { type: 'AccountOpened'; accountId: string; ownerId: string; initialDeposit: number; timestamp: Date }
| { type: 'MoneyDeposited'; accountId: string; amount: number; timestamp: Date }
| { type: 'MoneyWithdrawn'; accountId: string; amount: number; timestamp: Date }
| { type: 'AccountClosed'; accountId: string; timestamp: Date };
/**
 * Bank-account aggregate whose state is rebuilt by replaying events.
 *
 * The command methods (`deposit`, `withdraw`) validate the request against the
 * current state and return the event describing the change. They do NOT mutate
 * the aggregate; state only changes when an event is passed to `apply`.
 *
 * NOTE(review): `BankAccountRepository` in this file also calls
 * `BankAccount.fromSnapshot(...)` and `account.toSnapshot()`, which this class
 * does not define. They still need to be added once the snapshot shape is
 * agreed with the snapshot store.
 */
class BankAccount {
  /** Set by the `AccountOpened` event; undefined until that event is applied. */
  id!: string;
  /** Set by the `AccountOpened` event; undefined until that event is applied. */
  ownerId!: string;
  balance = 0;
  closed = false;

  /**
   * Rebuilds an account by applying `events` in array order to a fresh instance.
   *
   * @param events Event history of a single account, oldest first.
   * @returns The account in the state reached after the last event.
   */
  static fromEvents(events: BankAccountEvent[]): BankAccount {
    const account = new BankAccount();
    for (const event of events) {
      account.apply(event);
    }
    return account;
  }

  /**
   * Folds one event into the aggregate state.
   *
   * Public (not `private`) because `BankAccountRepository.load` applies events
   * on top of a restored snapshot from outside this class.
   *
   * @param event The already-recorded event to apply. No validation happens
   *   here: an event is a fact, so it is applied unconditionally.
   */
  apply(event: BankAccountEvent): void {
    switch (event.type) {
      case 'AccountOpened':
        this.id = event.accountId;
        this.ownerId = event.ownerId;
        this.balance = event.initialDeposit;
        break;
      case 'MoneyDeposited':
        this.balance += event.amount;
        break;
      case 'MoneyWithdrawn':
        this.balance -= event.amount;
        break;
      case 'AccountClosed':
        this.closed = true;
        break;
    }
  }

  /**
   * Validates a deposit and returns the resulting `MoneyDeposited` event.
   *
   * @param amount Amount to deposit; must be a finite number greater than zero.
   * @returns The event to append to the account's stream.
   * @throws Error 'Account is closed' if the account has been closed.
   * @throws Error 'Amount must be positive' if `amount` is not a finite positive number.
   */
  deposit(amount: number): BankAccountEvent {
    if (this.closed) throw new Error('Account is closed');
    BankAccount.assertPositiveAmount(amount);
    return { type: 'MoneyDeposited', accountId: this.id, amount, timestamp: new Date() };
  }

  /**
   * Validates a withdrawal and returns the resulting `MoneyWithdrawn` event.
   *
   * @param amount Amount to withdraw; must be a finite number greater than zero
   *   and no larger than the current balance.
   * @returns The event to append to the account's stream.
   * @throws Error 'Account is closed' if the account has been closed.
   * @throws Error 'Amount must be positive' if `amount` is not a finite positive number.
   * @throws Error 'Insufficient funds' if `amount` exceeds the balance.
   */
  withdraw(amount: number): BankAccountEvent {
    if (this.closed) throw new Error('Account is closed');
    // Without this check a negative amount passes the funds check below and the
    // resulting event would *increase* the balance when applied.
    BankAccount.assertPositiveAmount(amount);
    if (amount > this.balance) throw new Error('Insufficient funds');
    return { type: 'MoneyWithdrawn', accountId: this.id, amount, timestamp: new Date() };
  }

  /** Rejects zero, negative, NaN and infinite amounts with one shared error message. */
  private static assertPositiveAmount(amount: number): void {
    if (!Number.isFinite(amount) || amount <= 0) {
      throw new Error('Amount must be positive');
    }
  }
}

事件存储
/**
 * Append-only storage for per-stream event histories.
 *
 * A stream is identified by `streamId`; elsewhere in this file the account id
 * is used as the stream id.
 */
interface EventStore {
// Appends `events` to the stream, guarded by `expectedVersion`. In the
// PostgresEventStore implementation in this file the call fails with a
// ConcurrencyError unless the stream's highest stored version equals
// `expectedVersion` (-1 for a stream that has no events yet).
append(streamId: string, events: BankAccountEvent[], expectedVersion: number): Promise<void>;
// Loads the stream's events. In the PostgresEventStore implementation in this
// file, events are returned in ascending version order starting at
// `fromVersion` (inclusive), which defaults to 0.
load(streamId: string, fromVersion?: number): Promise<BankAccountEvent[]>;
}
/**
 * `EventStore` backed by a SQL `events` table with the columns
 * `stream_id`, `version`, `type`, `data` and `timestamp`.
 *
 * Versions are zero-based per stream: the first event of a stream is stored
 * with version 0, so an empty stream is addressed with `expectedVersion = -1`.
 */
class PostgresEventStore implements EventStore {
  /**
   * @param db Database handle. Only the two members used here are required:
   *   `query` for a single statement and `transaction` to run a callback
   *   against a transactional handle. The original snippet used `this.db`
   *   without ever declaring or initialising it.
   */
  constructor(
    private readonly db: {
      query(sql: string, params?: unknown[]): Promise<{ rows: Array<Record<string, unknown>> }>;
      transaction<T>(
        work: (trx: {
          query(sql: string, params?: unknown[]): Promise<{ rows: Array<Record<string, unknown>> }>;
        }) => Promise<T>,
      ): Promise<T>;
    },
  ) {}

  /**
   * Appends `events` to a stream if nobody else has written to it in the meantime.
   *
   * @param streamId Stream to append to.
   * @param events Events to store, in order; they receive the versions
   *   `expectedVersion + 1`, `expectedVersion + 2`, ...
   * @param expectedVersion Highest version the caller has seen (-1 for a new stream).
   * @throws ConcurrencyError if the stream's highest stored version differs
   *   from `expectedVersion`.
   */
  async append(streamId: string, events: BankAccountEvent[], expectedVersion: number): Promise<void> {
    await this.db.transaction(async (trx) => {
      // Optimistic concurrency check.
      // NOTE(review): SELECT-then-INSERT alone does not stop two concurrent
      // transactions from both passing this check; this presumably relies on a
      // UNIQUE (stream_id, version) constraint on the table — confirm it exists.
      const current = await trx.query(
        'SELECT MAX(version) as version FROM events WHERE stream_id = $1',
        [streamId]
      );
      // MAX() over an empty stream yields NULL. Number() also covers drivers
      // that hand back 64-bit integer columns as strings, which would
      // otherwise never be strictly equal to the numeric expectedVersion.
      const storedVersion = current.rows[0]?.version;
      const currentVersion = storedVersion == null ? -1 : Number(storedVersion);
      if (currentVersion !== expectedVersion) {
        throw new ConcurrencyError(`Expected version ${expectedVersion}, got ${currentVersion}`);
      }
      // Inserts stay sequential: they share one transactional handle and the
      // versions must be written in order.
      for (const [offset, event] of events.entries()) {
        await trx.query(
          'INSERT INTO events (stream_id, version, type, data, timestamp) VALUES ($1, $2, $3, $4, $5)',
          [streamId, expectedVersion + offset + 1, event.type, JSON.stringify(event), event.timestamp]
        );
      }
    });
  }

  /**
   * Loads a stream's events in ascending version order.
   *
   * @param streamId Stream to read.
   * @param fromVersion First version to include (inclusive); defaults to 0.
   * @returns The stored events with `timestamp` restored to a `Date`.
   */
  async load(streamId: string, fromVersion = 0): Promise<BankAccountEvent[]> {
    const result = await this.db.query(
      'SELECT data FROM events WHERE stream_id = $1 AND version >= $2 ORDER BY version',
      [streamId, fromVersion]
    );
    return result.rows.map((row) => PostgresEventStore.reviveEvent(row.data));
  }

  /**
   * Turns a stored `data` value back into an event.
   *
   * `append` stores events with `JSON.stringify`, which writes `timestamp` as
   * an ISO string; `JSON.parse` does not turn it back into a `Date`, so it is
   * rebuilt here. A value that is not a string is assumed to have been parsed
   * by the driver already (e.g. a JSON-typed column) and is used as is.
   */
  private static reviveEvent(raw: unknown): BankAccountEvent {
    const parsed = (typeof raw === 'string' ? JSON.parse(raw) : raw) as BankAccountEvent;
    return { ...parsed, timestamp: new Date(parsed.timestamp) };
  }
}

快照优化性能
/**
 * Loads `BankAccount` aggregates from the latest snapshot plus the events
 * recorded after it.
 */
class BankAccountRepository {
  constructor(private eventStore: EventStore, private snapshotStore: SnapshotStore) {}

  /**
   * Restores an account to its most recent state.
   *
   * The latest snapshot (if any) is the starting point; only events with a
   * version above the snapshot's are read and applied on top of it. Without a
   * snapshot the whole stream is replayed onto a fresh `BankAccount`.
   *
   * As a side effect, a new snapshot is saved whenever 50 or more events had
   * to be applied during this load.
   *
   * @param accountId Account (and event stream) identifier.
   * @returns The account after all loaded events have been applied.
   */
  async load(accountId: string): Promise<BankAccount> {
    const latestSnapshot = await this.snapshotStore.getLatest(accountId);
    // Resume right after the snapshot, or from the very first event.
    const startVersion = latestSnapshot ? latestSnapshot.version + 1 : 0;
    const pendingEvents = await this.eventStore.load(accountId, startVersion);

    const account: BankAccount = latestSnapshot
      ? BankAccount.fromSnapshot(latestSnapshot.state)
      : new BankAccount();
    for (const pending of pendingEvents) {
      account.apply(pending);
    }

    // Save a snapshot once 50 or more events had to be replayed.
    const replayedEnoughToSnapshot = pendingEvents.length >= 50;
    if (replayedEnoughToSnapshot) {
      await this.snapshotStore.save(accountId, account.toSnapshot());
    }
    return account;
  }
}

CQRS 读模型(投影)
/**
 * Separate read model, optimised for queries.
 *
 * Keeps one row per account in the `account_balances` table up to date by
 * handling account events one at a time.
 */
class AccountBalanceProjection {
  /**
   * @param db Database handle. Only the two members used here are required:
   *   `insert` to add a row to a table and `query` to run a parameterised
   *   statement. The original snippet used `this.db` without ever declaring or
   *   initialising it.
   */
  constructor(
    private readonly db: {
      insert(table: string, row: Record<string, unknown>): Promise<unknown>;
      query(sql: string, params?: unknown[]): Promise<unknown>;
    },
  ) {}

  /**
   * Applies one event to the `account_balances` table.
   *
   * NOTE(review): the updates are relative (`balance + $1`), so handling the
   * same event twice changes the balance twice — confirm the caller delivers
   * each event exactly once.
   *
   * @param event The event to project.
   */
  async handle(event: BankAccountEvent): Promise<void> {
    switch (event.type) {
      case 'AccountOpened':
        await this.db.insert('account_balances', {
          account_id: event.accountId,
          balance: event.initialDeposit,
          owner_id: event.ownerId,
          updated_at: event.timestamp,
        });
        break;
      case 'MoneyDeposited':
        await this.db.query(
          'UPDATE account_balances SET balance = balance + $1, updated_at = $2 WHERE account_id = $3',
          [event.amount, event.timestamp, event.accountId]
        );
        break;
      case 'MoneyWithdrawn':
        await this.db.query(
          'UPDATE account_balances SET balance = balance - $1, updated_at = $2 WHERE account_id = $3',
          [event.amount, event.timestamp, event.accountId]
        );
        break;
      case 'AccountClosed':
        // Closing does not change the balance, and none of the columns written
        // above records a closed state, so there is nothing to update.
        break;
    }
  }
}
/**
 * The query side is simple: balances are read straight from the projection
 * table, and the transaction history is derived from the account's events.
 */
class AccountQueryService {
  /**
   * The original snippet used `this.db` and `this.eventStore` without ever
   * declaring or initialising them. Both parameter types list only the members
   * this class uses.
   *
   * @param db Database handle used to read the `account_balances` table.
   * @param eventStore Source of an account's event history; anything with a
   *   compatible `load` method is accepted.
   */
  constructor(
    private readonly db: {
      queryOne(sql: string, params?: unknown[]): Promise<{ balance: number } | null | undefined>;
    },
    private readonly eventStore: {
      load(streamId: string, fromVersion?: number): Promise<
        Array<
          | { type: 'MoneyDeposited' | 'MoneyWithdrawn'; amount: number; timestamp: Date }
          | { type: 'AccountOpened' | 'AccountClosed'; timestamp: Date }
        >
      >;
    },
  ) {}

  /**
   * Reads the current balance from the `account_balances` projection table.
   *
   * @param accountId Account to look up.
   * @returns The projected balance.
   * @throws Error if no row exists for `accountId`.
   */
  async getBalance(accountId: string): Promise<{ balance: number }> {
    const row = await this.db.queryOne(
      'SELECT balance FROM account_balances WHERE account_id = $1',
      [accountId]
    );
    // Assumes queryOne resolves to null/undefined when nothing matches (TODO
    // confirm). Without this guard a missing account surfaces as an opaque
    // "cannot read properties of undefined" TypeError.
    if (row == null) {
      throw new Error(`Account ${accountId} not found`);
    }
    return { balance: row.balance };
  }

  /**
   * Lists an account's deposits and withdrawals in event order.
   *
   * @param accountId Account whose history to load.
   * @returns One entry per `MoneyDeposited` ('credit') or `MoneyWithdrawn`
   *   ('debit') event; all other event types are skipped.
   */
  async getTransactionHistory(accountId: string): Promise<Transaction[]> {
    const events = await this.eventStore.load(accountId);
    const history: Transaction[] = [];
    for (const event of events) {
      // Narrowing inside the `if` gives typed access to `amount`, which only
      // the two money-movement variants carry.
      if (event.type === 'MoneyDeposited' || event.type === 'MoneyWithdrawn') {
        history.push({
          type: event.type === 'MoneyDeposited' ? 'credit' : 'debit',
          amount: event.amount,
          timestamp: event.timestamp,
        });
      }
    }
    return history;
  }
}
事件溯源提供了完整的审计跟踪,但也增加了系统的复杂性——仅在审计跟踪确实有价值时才使用。