
Node.js 事件驱动架构
在事件驱动系统中,组件通过发布和订阅事件(而非直接相互调用)进行通信,从而实现松耦合和可扩展性。
Node.js EventEmitter
import { EventEmitter } from 'events';
/**
 * Event map for the application event bus: each key is an event name and each
 * value is the labeled tuple of positional arguments passed to listeners of
 * that event.
 *
 * NOTE(review): despite the name, this map also contains 'order.placed' —
 * presumably it is the bus-wide event map; confirm before renaming.
 */
interface UserEvents {
// Arguments: user id, then email.
'user.created': [userId: string, email: string];
// Arguments: user id only.
'user.deleted': [userId: string];
// Arguments: order id, user id, then amount (unit/currency not shown here — TODO confirm).
'order.placed': [orderId: string, userId: string, amount: number];
}
/**
 * Application-wide event bus: an `EventEmitter` whose `emit` and `on` are
 * narrowed to the event names and argument tuples declared in `UserEvents`,
 * so a misspelled event name or wrong argument list is a compile error.
 */
class ApplicationEventBus extends EventEmitter {
  /**
   * Emits `event` with the arguments declared for it in `UserEvents`.
   *
   * @param event - Name of the event to emit.
   * @param args - Arguments forwarded to every listener of `event`.
   * @returns The result of `EventEmitter.emit` (per the Node.js docs, `true`
   *   when the event had listeners, `false` otherwise).
   */
  emit<TEvent extends keyof UserEvents>(event: TEvent, ...args: UserEvents[TEvent]): boolean {
    const hadListeners = super.emit(event, ...args);
    return hadListeners;
  }

  /**
   * Registers `listener` for `event`.
   *
   * @param event - Name of the event to listen for.
   * @param listener - Callback receiving the arguments declared for `event`.
   * @returns This bus, so calls can be chained.
   */
  on<TEvent extends keyof UserEvents>(
    event: TEvent,
    listener: (...args: UserEvents[TEvent]) => void,
  ): this {
    super.on(event, listener);
    return this;
  }
}
const bus = new ApplicationEventBus();

// Subscriber: must be registered BEFORE the event is emitted. EventEmitter
// delivers synchronously to the listeners that exist at emit time and does
// not buffer events, so an earlier emit would never reach this handler.
bus.on('user.created', async (userId, email) => {
  // EventEmitter ignores the promise returned by an async listener, so a
  // rejection here would otherwise surface as an unhandled rejection.
  try {
    await emailService.sendWelcome(email);
    await analyticsService.track('user_signup', { userId });
  } catch (err: unknown) {
    console.error(`'user.created' handler failed for ${userId}:`, err);
  }
});

// Publisher
bus.emit('user.created', 'user_123', 'alice@example.com');
使用 Bull(Redis)的消息队列
import Queue from 'bull';
// Bull queue named 'email', configured with the Redis connection at localhost:6379.
const emailQueue = new Queue('email', {
  redis: {
    host: 'localhost',
    port: 6379,
  },
});
// Producer
/**
 * Adds a 'welcome' job for the given user to `emailQueue`.
 *
 * The job carries the user's id and email. It is configured with up to 3
 * attempts, exponential backoff with a 2000 ms delay setting, and removal
 * from the queue once it completes.
 *
 * @param user - The user whose `id` and `email` go into the job payload.
 * @returns A promise that settles once the job has been added.
 */
async function onUserCreated(user: User): Promise<void> {
  const { id: userId, email } = user;
  const jobOptions = {
    attempts: 3,
    backoff: { type: 'exponential', delay: 2000 },
    removeOnComplete: true,
  };
  await emailQueue.add('welcome', { userId, email }, jobOptions);
}
// Consumer: handles jobs named 'welcome' from `emailQueue`. If the handler
// rejects, the rejection propagates to the queue, which reports it through
// the 'failed' listener below.
emailQueue.process('welcome', async (job) => {
  // Only the email is needed to send the message; the payload's userId is
  // intentionally not read here.
  const { email } = job.data;
  await sendWelcomeEmail(email);
  console.log(`Sent welcome email to ${email}`);
});

// Logs the id and error message of every job that fails.
emailQueue.on('failed', (job, err) => {
  console.error(`Job ${job.id} failed: ${err.message}`);
});
使用 kafkajs 的 Kafka
import { Kafka } from 'kafkajs';
// Kafka client identified as 'my-app', bootstrapping from the broker at kafka:9092.
const kafka = new Kafka({ clientId: 'my-app', brokers: ['kafka:9092'] });
// Producer
const producer = kafka.producer();
await producer.connect();

const userCreatedEvent = { userId: 'user_123', email: 'alice@example.com' };
await producer.send({
  topic: 'user-events',
  messages: [{
    // Key by the user id rather than by the event type: the key selects the
    // partition, so a constant key would funnel every event of that type
    // into one partition, while a per-user key keeps each user's events in
    // order and spreads load. The event type travels in the header instead.
    key: userCreatedEvent.userId,
    value: JSON.stringify(userCreatedEvent),
    headers: { 'event-type': 'user.created' },
  }],
});
// Consumer: joins the 'notification-service' group, reads new messages from
// 'user-events', and dispatches each one on its 'event-type' header.
const consumer = kafka.consumer({ groupId: 'notification-service' });
await consumer.connect();
await consumer.subscribe({ topic: 'user-events', fromBeginning: false });
await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    const position = `${topic}[${partition}]@${message.offset}`;

    // A Kafka record may have a null value (e.g. a tombstone); there is
    // nothing to parse, so skip it instead of crashing on the dereference.
    if (message.value == null) {
      console.warn(`Skipping message with no value at ${position}`);
      return;
    }

    // A malformed payload would fail identically on every redelivery, so
    // letting the parse error escape would block the partition. Log and skip.
    // NOTE(review): route to a dead-letter topic if dropped messages must be kept.
    let event;
    try {
      event = JSON.parse(message.value.toString());
    } catch (err: unknown) {
      console.error(`Skipping message with invalid JSON at ${position}:`, err);
      return;
    }

    const eventType = message.headers?.['event-type']?.toString();
    switch (eventType) {
      case 'user.created':
        await handleUserCreated(event);
        break;
      case 'order.placed':
        await handleOrderPlaced(event);
        break;
      default:
        // Make unrecognized or missing event types visible instead of
        // dropping them silently.
        console.warn(`Ignoring message with unhandled event type '${eventType}' at ${position}`);
        break;
    }
  },
});
Outbox 模式
将事件记录与业务数据的更改写入同一个数据库事务,使二者原子地提交,从而保证已提交的更改所对应的事件最终一定会被发布。
// Inside a database transaction:
/**
 * Inserts a user row and a matching 'user.created' row in `outbox_events`
 * through the same transaction handle, so both writes belong to one
 * `db.transaction` call.
 *
 * @param dto - Fields for the new user; a shallow copy is inserted into 'users'.
 * @returns The value returned by the 'users' insert.
 */
async function createUserWithOutbox(dto: CreateUserDto): Promise<User> {
  const createdUser = await db.transaction(async (trx) => {
    const user = await trx.insert('users', { ...dto });

    // Write the event to the outbox within the same transaction.
    const outboxRow = {
      id: generateId(),
      aggregate_type: 'User',
      aggregate_id: user.id,
      event_type: 'user.created',
      payload: JSON.stringify({ userId: user.id, email: user.email }),
      created_at: new Date(),
      processed: false,
    };
    await trx.insert('outbox_events', outboxRow);

    return user;
  });
  return createdUser;
}
// A separate process polls the outbox and publishes to Kafka
/**
 * Publishes up to 100 unprocessed outbox rows, oldest first, and marks each
 * row processed after its send resolves.
 *
 * A row is marked only after `send` resolves, so a failure between the two
 * steps leaves it unprocessed and it is sent again on a later run. A failed
 * send rejects this function and stops the batch, leaving later rows for the
 * next run.
 *
 * NOTE(review): the consumer in this file subscribes to 'user-events', while
 * this publishes to a topic named after the event type — confirm which topic
 * layout is intended.
 *
 * @returns A promise that settles once the current batch has been handled.
 */
async function processOutbox(): Promise<void> {
  const events = await db.query(
    'SELECT * FROM outbox_events WHERE processed = false ORDER BY created_at LIMIT 100'
  );
  for (const event of events) {
    // The payload is written as a JSON string, but some drivers return
    // JSON columns already parsed; serialize again in that case.
    const value =
      typeof event.payload === 'string' ? event.payload : JSON.stringify(event.payload);

    await kafkaProducer.send({
      topic: event.event_type,
      messages: [{
        // Key by aggregate id so events for one aggregate stay in order.
        key: String(event.aggregate_id),
        value,
        // Consumers dispatch on this header rather than on the payload.
        headers: { 'event-type': event.event_type },
      }],
    });
    await db.query('UPDATE outbox_events SET processed = true WHERE id = $1', [event.id]);
  }
}
Outbox 模式无需分布式事务即可保证至少一次投递;由于同一事件可能被重复发布,消费者必须以幂等的方式处理事件。