正在加载,请稍候…

事件驱动架构:模式与实现

通过事件驱动架构构建可扩展系统。学习事件溯源、CQRS、Saga模式以及Kafka和RabbitMQ消息代理集成。

事件驱动架构:模式与实现

事件驱动架构:模式与实现

事件驱动系统通过事件而非直接调用来通信,从而实现松耦合。

核心概念

// 领域事件
interface DomainEvent {
  eventId: string;
  eventType: string;
  occurredAt: Date;
  aggregateId: string;
}

class OrderPlaced implements DomainEvent {
  readonly eventId = crypto.randomUUID();
  readonly eventType = 'order.placed';
  readonly occurredAt = new Date();

  constructor(
    readonly aggregateId: string,
    readonly customerId: string,
    readonly items: OrderItem[],
    readonly total: number
  ) {}
}

事件驱动架构:模式与实现插图

事件溯源

将状态存储为一系列事件,而非当前快照。

class OrderAggregate {
  private events: DomainEvent[] = [];
  private state: OrderState = { status: 'pending', items: [] };

  // 应用事件以重建状态
  private apply(event: DomainEvent): void {
    if (event instanceof OrderPlaced) {
      this.state = { status: 'placed', items: event.items };
    } else if (event instanceof OrderShipped) {
      this.state = { ...this.state, status: 'shipped', trackingId: event.trackingId };
    }
  }

  // 命令:下单
  place(customerId: string, items: OrderItem[]) {
    const event = new OrderPlaced(this.id, customerId, items, this.calculateTotal(items));
    this.events.push(event);
    this.apply(event);
    return event;
  }

  // 从事件历史重建
  static fromEvents(id: string, events: DomainEvent[]): OrderAggregate {
    const order = new OrderAggregate(id);
    events.forEach(e => order.apply(e));
    return order;
  }
}

事件驱动架构:模式与实现插图

CQRS(命令查询职责分离)

// 写端(命令)
class PlaceOrderCommand {
  constructor(
    readonly customerId: string,
    readonly items: OrderItem[]
  ) {}
}

class PlaceOrderHandler {
  constructor(private repo: OrderRepository, private bus: EventBus) {}

  async handle(cmd: PlaceOrderCommand): Promise<string> {
    const order = Order.create(cmd.customerId, cmd.items);
    await this.repo.save(order);
    await this.bus.publish(order.events);
    return order.id;
  }
}

// 读端(查询)——独立的优化读取模型
class OrderSummaryQuery {
  constructor(readonly customerId: string) {}
}

class OrderSummaryHandler {
  constructor(private readDb: ReadDatabase) {}

  async handle(query: OrderSummaryQuery): Promise<OrderSummary[]> {
    return this.readDb.query(
      'SELECT * FROM order_summaries WHERE customer_id = $1',
      [query.customerId]
    );
  }
}

事件驱动架构:模式与实现插图

分布式事务的Saga模式

// 基于编排的Saga
class OrderSaga {
  // 每个服务发布/订阅事件
  // 无中央协调器

  // OrderService
  async handlePaymentCompleted(event: PaymentCompleted) {
    await this.orderRepo.updateStatus(event.orderId, 'payment_confirmed');
    await this.bus.publish(new OrderReadyToShip(event.orderId));
  }

  // 处理失败时的补偿
  async handlePaymentFailed(event: PaymentFailed) {
    await this.orderRepo.cancel(event.orderId);
    await this.bus.publish(new OrderCancelled(event.orderId, 'payment_failed'));
  }
}

Kafka集成

import { Kafka, Producer, Consumer } from 'kafkajs';

const kafka = new Kafka({ clientId: 'order-service', brokers: ['kafka:9092'] });

// 生产者
class EventPublisher {
  private producer: Producer;

  async publish(event: DomainEvent) {
    await this.producer.send({
      topic: event.eventType.replace('.', '-'),
      messages: [{
        key: event.aggregateId,
        value: JSON.stringify(event),
        headers: { 'event-type': event.eventType },
      }],
    });
  }
}

// 消费者
class OrderEventConsumer {
  async start() {
    const consumer = kafka.consumer({ groupId: 'shipping-service' });
    await consumer.subscribe({ topic: 'order-placed', fromBeginning: false });

    await consumer.run({
      eachMessage: async ({ message }) => {
        const event = JSON.parse(message.value!.toString());
        await this.handleOrderPlaced(event);
      },
    });
  }
}

事件驱动架构在微服务环境中擅长解耦服务。