正在加载,请稍候…

Node.js 事件驱动架构:模式与实现

使用 Node.js 构建松散耦合的事件驱动系统。学习事件发射器、消息队列、Kafka 和事件溯源模式。

Event-Driven Architecture with Node.js: Patterns and Implementation

Node.js 事件驱动架构

事件驱动系统通过事件而非直接调用来响应,从而实现松散耦合和可扩展性。

Node.js EventEmitter

import { EventEmitter } from 'events';

interface UserEvents {
  'user.created': [userId: string, email: string];
  'user.deleted': [userId: string];
  'order.placed': [orderId: string, userId: string, amount: number];
}

class ApplicationEventBus extends EventEmitter {
  emit<K extends keyof UserEvents>(event: K, ...args: UserEvents[K]): boolean {
    return super.emit(event, ...args);
  }

  on<K extends keyof UserEvents>(event: K, listener: (...args: UserEvents[K]) => void): this {
    return super.on(event, listener);
  }
}

const bus = new ApplicationEventBus();

// 发布者
bus.emit('user.created', 'user_123', 'alice@example.com');

// 订阅者
bus.on('user.created', async (userId, email) => {
  await emailService.sendWelcome(email);
  await analyticsService.track('user_signup', { userId });
});

Event-Driven Architecture with Node.js: Patterns and Implementation illustration

使用 Bull(Redis)的消息队列

import Queue from 'bull';

const emailQueue = new Queue('email', { redis: { host: 'localhost', port: 6379 } });

// 生产者
async function onUserCreated(user: User): Promise<void> {
  await emailQueue.add('welcome', { userId: user.id, email: user.email }, {
    attempts: 3,
    backoff: { type: 'exponential', delay: 2000 },
    removeOnComplete: true,
  });
}

// 消费者
emailQueue.process('welcome', async (job) => {
  const { userId, email } = job.data;
  await sendWelcomeEmail(email);
  console.log(`Sent welcome email to ${email}`);
});

emailQueue.on('failed', (job, err) => {
  console.error(`Job ${job.id} failed: ${err.message}`);
});

Event-Driven Architecture with Node.js: Patterns and Implementation illustration

使用 kafkajs 的 Kafka

import { Kafka } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['kafka:9092'],
});

// 生产者
const producer = kafka.producer();
await producer.connect();

await producer.send({
  topic: 'user-events',
  messages: [{
    key: 'user.created',
    value: JSON.stringify({ userId: 'user_123', email: 'alice@example.com' }),
    headers: { 'event-type': 'user.created' },
  }],
});

// 消费者
const consumer = kafka.consumer({ groupId: 'notification-service' });
await consumer.connect();
await consumer.subscribe({ topic: 'user-events', fromBeginning: false });

await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    const event = JSON.parse(message.value!.toString());
    const eventType = message.headers?.['event-type']?.toString();

    switch (eventType) {
      case 'user.created':
        await handleUserCreated(event);
        break;
      case 'order.placed':
        await handleOrderPlaced(event);
        break;
    }
  },
});

Event-Driven Architecture with Node.js: Patterns and Implementation illustration

Outbox 模式

确保事件与数据库更改原子性地发布。

// 在数据库事务内:
async function createUserWithOutbox(dto: CreateUserDto): Promise<User> {
  return await db.transaction(async (trx) => {
    const user = await trx.insert('users', { ...dto });

    // 在同一事务中将事件写入 outbox
    await trx.insert('outbox_events', {
      id: generateId(),
      aggregate_type: 'User',
      aggregate_id: user.id,
      event_type: 'user.created',
      payload: JSON.stringify({ userId: user.id, email: user.email }),
      created_at: new Date(),
      processed: false,
    });

    return user;
  });
}

// 独立进程轮询 outbox 并发布到 Kafka
async function processOutbox(): Promise<void> {
  const events = await db.query(
    'SELECT * FROM outbox_events WHERE processed = false ORDER BY created_at LIMIT 100'
  );

  for (const event of events) {
    await kafkaProducer.send({ topic: event.event_type, messages: [{ value: event.payload }] });
    await db.query('UPDATE outbox_events SET processed = true WHERE id = $1', [event.id]);
  }
}

Outbox 模式保证了至少一次投递,无需分布式事务。