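Event-Driven Architecture: Patterns and Implementation
In an event-driven system, components communicate by publishing and consuming events rather than by calling each other directly, which keeps producers and consumers loosely coupled.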
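
To make the loose coupling concrete, here is a minimal in-process sketch. The names `InMemoryEventBus`, `DemoEvent`, and `demoBus` are hypothetical and not part of any library. A real system would usually place a message broker between publisher and subscribers.

```typescript
// Minimal in-process event bus (illustrative only; all names are hypothetical).
type Handler<E> = (event: E) => void;

class InMemoryEventBus<E extends { eventType: string }> {
  private handlers = new Map<string, Handler<E>[]>();

  // Register a handler for one event type.
  subscribe(eventType: string, handler: Handler<E>): void {
    const list = this.handlers.get(eventType) ?? [];
    list.push(handler);
    this.handlers.set(eventType, list);
  }

  // Deliver the event to every handler registered for its type.
  publish(event: E): void {
    for (const handler of this.handlers.get(event.eventType) ?? []) {
      handler(event);
    }
  }
}

// The publisher knows only the bus, not who is listening.
type DemoEvent = { eventType: string; orderId: string };
const demoBus = new InMemoryEventBus<DemoEvent>();
const notified: string[] = [];

demoBus.subscribe('order.placed', e => { notified.push(`email:${e.orderId}`); });
demoBus.subscribe('order.placed', e => { notified.push(`inventory:${e.orderId}`); });
demoBus.publish({ eventType: 'order.placed', orderId: 'o-1' });
```

Adding a third subscriber requires no change to the code that publishes `order.placed`, which is the decoupling the paragraph above describes.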
Core Concepts
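// Node.js import; in browsers the global crypto.randomUUID() can be used instead
import { randomUUID } from 'node:crypto';

// Line item on an order (shape assumed for illustration; the original does not define it)
interface OrderItem {
  productId: string;
  quantity: number;
  unitPrice: number;
}

// Domain event: the fields every event carries
interface DomainEvent {
  eventId: string;
  eventType: string;
  occurredAt: Date;
  aggregateId: string;
}

class OrderPlaced implements DomainEvent {
  readonly eventId = randomUUID();
  readonly eventType = 'order.placed';
  readonly occurredAt = new Date();

  constructor(
    readonly aggregateId: string,
    readonly customerId: string,
    readonly items: OrderItem[],
    readonly total: number
  ) {}
}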
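
Events usually cross a process boundary as JSON, and `JSON.stringify` writes a `Date` as an ISO-8601 string that `JSON.parse` does not turn back into a `Date`. The sketch below shows one way to handle the round trip. `StoredEvent`, its `payload` field, `serializeEvent`, and `deserializeEvent` are hypothetical names chosen for this example.

```typescript
// Illustrative envelope mirroring the DomainEvent fields; `payload` is an assumption.
interface StoredEvent {
  eventId: string;
  eventType: string;
  occurredAt: Date;
  aggregateId: string;
  payload: Record<string, unknown>;
}

// Serialize for transport; the Date becomes an ISO-8601 string.
function serializeEvent(e: StoredEvent): string {
  return JSON.stringify(e);
}

// Parse, validate the required fields, and restore occurredAt to a Date.
function deserializeEvent(json: string): StoredEvent {
  const raw = JSON.parse(json) as Record<string, unknown>;
  for (const field of ['eventId', 'eventType', 'occurredAt', 'aggregateId']) {
    if (typeof raw[field] !== 'string') {
      throw new Error(`Invalid event: missing string field "${field}"`);
    }
  }
  const occurredAt = new Date(raw['occurredAt'] as string);
  if (Number.isNaN(occurredAt.getTime())) {
    throw new Error('Invalid event: occurredAt is not a valid date');
  }
  return {
    eventId: raw['eventId'] as string,
    eventType: raw['eventType'] as string,
    occurredAt,
    aggregateId: raw['aggregateId'] as string,
    payload: (raw['payload'] ?? {}) as Record<string, unknown>,
  };
}

const sample: StoredEvent = {
  eventId: 'e-1',
  eventType: 'order.placed',
  occurredAt: new Date('2024-01-15T10:30:00.000Z'),
  aggregateId: 'order-1',
  payload: { total: 42 },
};
const restored = deserializeEvent(serializeEvent(sample));
```

Validating at the boundary means a malformed message fails with a clear error instead of surfacing later as an undefined field inside a handler.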

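Event Sourcing
Event sourcing stores state as an ordered sequence of events rather than as a snapshot of the current state. The current state is derived by replaying those events.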
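class OrderAggregate {
  // Events recorded by commands on this instance
  private events: DomainEvent[] = [];
  private state: OrderState = { status: 'pending', items: [] };

  constructor(readonly id: string) {}

  // Apply an event to rebuild state.
  // instanceof matches class instances only; events loaded from storage as plain
  // JSON must be rehydrated into classes first, or matched on eventType instead.
  private apply(event: DomainEvent): void {
    if (event instanceof OrderPlaced) {
      this.state = { status: 'placed', items: event.items };
    } else if (event instanceof OrderShipped) {
      this.state = { ...this.state, status: 'shipped', trackingId: event.trackingId };
    }
  }

  // Command: place the order
  place(customerId: string, items: OrderItem[]): OrderPlaced {
    const event = new OrderPlaced(this.id, customerId, items, this.calculateTotal(items));
    this.events.push(event);
    this.apply(event);
    return event;
  }

  // Sum of line totals; assumes OrderItem exposes quantity and unitPrice
  private calculateTotal(items: OrderItem[]): number {
    return items.reduce((sum, item) => sum + item.quantity * item.unitPrice, 0);
  }

  // Rebuild an aggregate from its event history
  static fromEvents(id: string, events: DomainEvent[]): OrderAggregate {
    const order = new OrderAggregate(id);
    events.forEach(e => order.apply(e));
    return order;
  }
}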
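
The replay idea can also be written as a fold over the event log. The sketch below is a simplified, self-contained variant, not the aggregate above: the event shapes, `OrderView`, `evolve`, and `replay` are hypothetical names, and events are matched on a `type` tag rather than with `instanceof`.

```typescript
// Hypothetical event shapes for this sketch, discriminated by `type`.
type OrderEvent =
  | { type: 'order.placed'; items: string[] }
  | { type: 'order.shipped'; trackingId: string };

interface OrderView {
  status: 'pending' | 'placed' | 'shipped';
  items: string[];
  trackingId?: string;
}

// Pure transition function: current state + event -> next state.
function evolve(state: OrderView, e: OrderEvent): OrderView {
  switch (e.type) {
    case 'order.placed':
      return { status: 'placed', items: e.items };
    case 'order.shipped':
      return { ...state, status: 'shipped', trackingId: e.trackingId };
  }
}

// Fold the first `upTo` events into a state, starting from the empty order.
function replay(events: OrderEvent[], upTo: number = events.length): OrderView {
  const initial: OrderView = { status: 'pending', items: [] };
  return events.slice(0, upTo).reduce(evolve, initial);
}

// The log is the source of truth; state is always derived from it.
const orderLog: OrderEvent[] = [
  { type: 'order.placed', items: ['book'] },
  { type: 'order.shipped', trackingId: 'T-42' },
];

const latest = replay(orderLog);      // state after all events
const asPlaced = replay(orderLog, 1); // state as it was after the first event
```

Because the log is kept rather than overwritten, any earlier state can be reconstructed by replaying a prefix, which a current-state snapshot alone cannot provide.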

CQRS (Command Query Responsibility Segregation)
// Write side (commands)
class PlaceOrderCommand {
  constructor(
    readonly customerId: string,
    readonly items: OrderItem[]
  ) {}
}

class PlaceOrderHandler {
  constructor(private repo: OrderRepository, private bus: EventBus) {}

  async handle(cmd: PlaceOrderCommand): Promise<string> {
    const order = Order.create(cmd.customerId, cmd.items);
    await this.repo.save(order);
    // save and publish are separate writes: if publish fails after save succeeds,
    // the events must be retried or they never reach subscribers
    await this.bus.publish(order.events);
    return order.id;
  }
}

// Read side (queries): a separate read model optimized for lookups
class OrderSummaryQuery {
  constructor(readonly customerId: string) {}
}

class OrderSummaryHandler {
  constructor(private readDb: ReadDatabase) {}

  async handle(query: OrderSummaryQuery): Promise<OrderSummary[]> {
    // Parameterized query against a denormalized summary table
    return this.readDb.query(
      'SELECT * FROM order_summaries WHERE customer_id = $1',
      [query.customerId]
    );
  }
}
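
The read model queried above has to be populated from the write side. One common approach is a projector that consumes events and maintains denormalized rows. The sketch below uses an in-memory map in place of the `order_summaries` table; `OrderPlacedEvent`, `OrderSummaryRow`, and `OrderSummaryProjector` are hypothetical names.

```typescript
// Hypothetical event and read-model shapes for this sketch.
interface OrderPlacedEvent {
  eventType: 'order.placed';
  aggregateId: string;
  customerId: string;
  total: number;
}

interface OrderSummaryRow {
  orderId: string;
  customerId: string;
  total: number;
  status: string;
}

// Projector: consumes write-side events and maintains a denormalized read model.
class OrderSummaryProjector {
  private rows = new Map<string, OrderSummaryRow>();

  // Upsert keyed by order ID, so applying the same event twice leaves one row.
  onOrderPlaced(e: OrderPlacedEvent): void {
    this.rows.set(e.aggregateId, {
      orderId: e.aggregateId,
      customerId: e.customerId,
      total: e.total,
      status: 'placed',
    });
  }

  // Query side: a filter over prepared rows, with no aggregate loading.
  findByCustomer(customerId: string): OrderSummaryRow[] {
    return Array.from(this.rows.values()).filter(r => r.customerId === customerId);
  }
}

const projector = new OrderSummaryProjector();
const placedA: OrderPlacedEvent = { eventType: 'order.placed', aggregateId: 'o-1', customerId: 'c-1', total: 30 };
const placedB: OrderPlacedEvent = { eventType: 'order.placed', aggregateId: 'o-2', customerId: 'c-1', total: 12 };
const placedC: OrderPlacedEvent = { eventType: 'order.placed', aggregateId: 'o-3', customerId: 'c-2', total: 99 };

[placedA, placedB, placedC, placedA].forEach(e => projector.onOrderPlaced(e)); // placedA delivered twice
const summaries = projector.findByCustomer('c-1');
```

The read model lags the write model by however long event delivery takes, so queries issued immediately after a command may not yet see its effect.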

The Saga Pattern for Distributed Transactions
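// Choreography-based saga:
// each service publishes and subscribes to events,
// and there is no central coordinator.
// This class shows the OrderService side.
class OrderSaga {
  constructor(private orderRepo: OrderRepository, private bus: EventBus) {}

  async handlePaymentCompleted(event: PaymentCompleted): Promise<void> {
    await this.orderRepo.updateStatus(event.orderId, 'payment_confirmed');
    // publish takes an array of events, matching its use in PlaceOrderHandler
    await this.bus.publish([new OrderReadyToShip(event.orderId)]);
  }

  // Compensation when a step fails: undo the local change and announce it
  async handlePaymentFailed(event: PaymentFailed): Promise<void> {
    await this.orderRepo.cancel(event.orderId);
    await this.bus.publish([new OrderCancelled(event.orderId, 'payment_failed')]);
  }
}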
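
The flow of a choreographed saga is easier to follow when both paths run end to end. The sketch below simulates an order service and a payment service on a synchronous in-memory bus. Everything in it is hypothetical, including `SimpleBus`, `placeOrder`, and the rule that payments over 100 fail, which exists only to trigger the compensation path.

```typescript
// Hypothetical saga events, discriminated by `type`.
type SagaEvent =
  | { type: 'order.placed'; orderId: string; amount: number }
  | { type: 'payment.completed'; orderId: string }
  | { type: 'payment.failed'; orderId: string; reason: string }
  | { type: 'order.cancelled'; orderId: string; reason: string };

type SagaHandler = (e: SagaEvent) => void;

// Synchronous bus that also records every published event type.
class SimpleBus {
  private subscribers: SagaHandler[] = [];
  readonly log: string[] = [];

  subscribe(h: SagaHandler): void {
    this.subscribers.push(h);
  }

  publish(e: SagaEvent): void {
    this.log.push(e.type);
    this.subscribers.forEach(h => h(e));
  }
}

const sagaBus = new SimpleBus();
const orderStatus = new Map<string, string>();

// Payment service: reacts to order.placed and knows nothing about the order service.
sagaBus.subscribe(e => {
  if (e.type !== 'order.placed') return;
  if (e.amount > 100) { // arbitrary rule for the demo
    sagaBus.publish({ type: 'payment.failed', orderId: e.orderId, reason: 'limit_exceeded' });
  } else {
    sagaBus.publish({ type: 'payment.completed', orderId: e.orderId });
  }
});

// Order service: confirms on success, compensates on failure.
sagaBus.subscribe(e => {
  if (e.type === 'payment.completed') {
    orderStatus.set(e.orderId, 'payment_confirmed');
  } else if (e.type === 'payment.failed') {
    orderStatus.set(e.orderId, 'cancelled');
    sagaBus.publish({ type: 'order.cancelled', orderId: e.orderId, reason: 'payment_failed' });
  }
});

// Order service command: record the order locally, then announce it.
function placeOrder(orderId: string, amount: number): void {
  orderStatus.set(orderId, 'placed');
  sagaBus.publish({ type: 'order.placed', orderId, amount });
}

placeOrder('o-1', 50);  // happy path
placeOrder('o-2', 500); // payment fails, order is compensated
```

No component in this sketch sees the whole workflow. The sequence emerges from each service reacting to the previous event, which is what distinguishes choreography from a centrally orchestrated saga.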

Kafka Integration
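import { Kafka, Producer } from 'kafkajs';

const kafka = new Kafka({ clientId: 'order-service', brokers: ['kafka:9092'] });

// Producer
class EventPublisher {
  private producer: Producer = kafka.producer();

  // Call once at startup, before the first publish
  async connect(): Promise<void> {
    await this.producer.connect();
  }

  async publish(event: DomainEvent): Promise<void> {
    await this.producer.send({
      // Replace every dot; replace('.', '-') would change only the first one
      topic: event.eventType.replace(/\./g, '-'),
      messages: [{
        // Keying by aggregate ID routes one aggregate's events to the same partition
        key: event.aggregateId,
        value: JSON.stringify(event),
        headers: { 'event-type': event.eventType },
      }],
    });
  }
}

// Consumer
class OrderEventConsumer {
  async start(): Promise<void> {
    const consumer = kafka.consumer({ groupId: 'shipping-service' });
    await consumer.connect();
    await consumer.subscribe({ topic: 'order-placed', fromBeginning: false });
    await consumer.run({
      eachMessage: async ({ message }) => {
        // Skip messages with a null value (for example, tombstones)
        if (message.value === null) return;
        // After JSON.parse, occurredAt is an ISO string, not a Date
        const event = JSON.parse(message.value.toString());
        await this.handleOrderPlaced(event);
      },
    });
  }

  // Placeholder: the original does not show this handler
  private async handleOrderPlaced(event: unknown): Promise<void> {}
}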
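
A consumer can receive the same message more than once, for example after a retry or a consumer group rebalance, so handlers are usually written to tolerate duplicates. The sketch below shows the idea without a broker. `topicFor`, `parseEvent`, and `idempotent` are hypothetical helpers, and the in-memory `Set` stands in for a durable store of processed event IDs.

```typescript
// Minimal event shape needed by this sketch.
interface IncomingEvent {
  eventId: string;
  eventType: string;
  aggregateId: string;
}

// Map a dotted event type to a topic name, replacing every dot.
function topicFor(eventType: string): string {
  return eventType.split('.').join('-');
}

// Decode a raw message value defensively; returns null for anything unusable.
function parseEvent(raw: string | null): IncomingEvent | null {
  if (raw === null) return null;
  try {
    const data = JSON.parse(raw) as Partial<IncomingEvent>;
    if (
      typeof data.eventId !== 'string' ||
      typeof data.eventType !== 'string' ||
      typeof data.aggregateId !== 'string'
    ) {
      return null;
    }
    return { eventId: data.eventId, eventType: data.eventType, aggregateId: data.aggregateId };
  } catch (err) {
    return null; // not valid JSON, or not an object
  }
}

// Wrap a handler so each event ID is processed at most once.
// Returns true if the handler ran, false if the event was a duplicate.
function idempotent<E extends { eventId: string }>(handler: (e: E) => void): (e: E) => boolean {
  const seen = new Set<string>(); // stand-in for a durable store
  return (e: E): boolean => {
    if (seen.has(e.eventId)) return false;
    handler(e);
    seen.add(e.eventId); // mark only after the handler succeeded
    return true;
  };
}

const shipments: string[] = [];
const handleOnce = idempotent<IncomingEvent>(e => { shipments.push(e.aggregateId); });

const rawMessage = JSON.stringify({ eventId: 'e-1', eventType: 'order.placed', aggregateId: 'o-1' });
const decoded = parseEvent(rawMessage);
const firstRun = decoded !== null && handleOnce(decoded);
const secondRun = decoded !== null && handleOnce(decoded); // redelivery of the same message
```

Marking the ID only after the handler returns means a handler that throws will be retried on redelivery rather than silently skipped.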

Event-driven architecture is well suited to decoupling services in a microservice environment.