
Apache Kafka with Node.js: Event Streaming for Microservices

Building event-driven microservices with Kafka and Node.js: producers, consumers, consumer groups, exactly-once semantics, dead letter queues, and stream processing.

Kafka Core Concepts

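  • Topic: a named stream of records
  • Partition: an ordered, immutable sequence of records within a topic; ordering is guaranteed only inside a single partition
  • Offset: the sequential position of a record within its partition
  • Consumer Group: a set of consumers that share a topic's partitions, with each partition read by exactly one member of the group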
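The following toy in-memory model makes the four terms concrete. It illustrates the concepts only and does not show how Kafka actually stores data. Each partition is an append-only array whose indexes act as offsets. A round-robin helper splits the partitions among the members of a consumer group. `Topic` and `assignPartitions` are hypothetical names.

```javascript
// Toy model of topics, partitions, offsets and consumer groups (illustration only).
class Topic {
  constructor(name, numPartitions) {
    this.name = name;
    // Each partition is an append-only array; a record's index is its offset.
    this.partitions = Array.from({ length: numPartitions }, () => []);
  }

  append(partition, value) {
    const log = this.partitions[partition];
    log.push(value);
    return log.length - 1; // offset of the record just written
  }

  read(partition, offset) {
    return this.partitions[partition][offset];
  }
}

// Within one consumer group every partition goes to exactly one consumer.
// Round-robin is one simple way to spread them.
function assignPartitions(numPartitions, consumerIds) {
  const assignment = Object.fromEntries(consumerIds.map((id) => [id, []]));
  for (let p = 0; p < numPartitions; p++) {
    assignment[consumerIds[p % consumerIds.length]].push(p);
  }
  return assignment;
}

const topic = new Topic('user-events', 3);
topic.append(0, 'a');
const offset = topic.append(0, 'b');
console.log(offset, topic.read(0, offset)); // 1 b
console.log(assignPartitions(3, ['c1', 'c2'])); // { c1: [ 0, 2 ], c2: [ 1 ] }
```

When a consumer joins or leaves the group, the assignment is recomputed. Kafka calls this a rebalance.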


Setup with KafkaJS

npm install kafkajs

import { Kafka, Partitioners } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['kafka1:9092', 'kafka2:9092'], // bootstrap brokers; the client discovers the rest
  ssl: true,                               // TLS to the brokers
  sasl: {
    mechanism: 'scram-sha-256',
    username: process.env.KAFKA_USERNAME,
    password: process.env.KAFKA_PASSWORD,
  },
  retry: {
    initialRetryTime: 100,                 // ms before the first retry, then exponential backoff
    retries: 8,
  },
});
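The config above passes `process.env` values straight to the client. If a variable is missing, the problem only shows up later as an authentication failure at connect time. The minimal sketch below validates the environment first. `buildKafkaConfig`, `KAFKA_BROKERS` and `KAFKA_CLIENT_ID` are assumptions of this sketch and are not part of KafkaJS.

```javascript
// Hypothetical helper: build the KafkaJS client config from environment
// variables and fail fast if anything required is missing.
function buildKafkaConfig(env) {
  const required = ['KAFKA_BROKERS', 'KAFKA_USERNAME', 'KAFKA_PASSWORD'];
  const missing = required.filter((name) => !env[name]);
  if (missing.length > 0) {
    throw new Error(`Missing Kafka settings: ${missing.join(', ')}`);
  }
  return {
    clientId: env.KAFKA_CLIENT_ID ?? 'my-app',
    // KAFKA_BROKERS is assumed to be a comma-separated host:port list
    brokers: env.KAFKA_BROKERS.split(',').map((broker) => broker.trim()),
    ssl: true,
    sasl: {
      mechanism: 'scram-sha-256',
      username: env.KAFKA_USERNAME,
      password: env.KAFKA_PASSWORD,
    },
    retry: { initialRetryTime: 100, retries: 8 },
  };
}

// Usage: const kafka = new Kafka(buildKafkaConfig(process.env));
```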

Producer

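// Idempotent producer: the broker discards duplicates caused by retries,
// so each message is written once per partition (acks stays at -1, the default)
const producer = kafka.producer({
  createPartitioner: Partitioners.LegacyPartitioner, // keep KafkaJS v1 partitioning behavior
  idempotent: true,
});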

await producer.connect();

// Send a message (userId, email and requestId come from the request being handled)
await producer.send({
  topic: 'user-events',
  messages: [
    {
      key: userId,             // same key → same partition → per-user ordering
      value: JSON.stringify({
        type: 'USER_CREATED',
        userId,
        email,
        timestamp: Date.now(),
      }),
      headers: {
        'correlation-id': requestId,
        'service': 'user-service',
      },
    },
  ],
});
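Keying by `userId` preserves per-user order because the partition is chosen by hashing the key modulo the partition count. Kafka's Java client, and KafkaJS's default partitioner, use murmur2 for this. The sketch below substitutes FNV-1a over UTF-16 code units because only the property matters: a deterministic hash means the same key always gets the same partition. `fnv1a` and `partitionFor` are illustrative names.

```javascript
// FNV-1a (32-bit) over UTF-16 code units: a stand-in for murmur2, illustration only.
function fnv1a(str) {
  let hash = 0x811c9dc5; // FNV offset basis
  for (let i = 0; i < str.length; i++) {
    hash ^= str.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0; // FNV prime, kept as unsigned 32-bit
  }
  return hash >>> 0;
}

// Deterministic hash of the key, modulo the number of partitions.
function partitionFor(key, numPartitions) {
  return fnv1a(key) % numPartitions;
}

// Every event for 'user-42' lands on the same partition, so consumers read it in order.
console.log(partitionFor('user-42', 6) === partitionFor('user-42', 6)); // true
```

The modulo has a consequence: if you add partitions to an existing topic, many keys move to a different partition. Ordering across that change is no longer guaranteed.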

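// Transactional producer (exactly-once consume-process-produce).
// KafkaJS requires a transactionalId, idempotence and maxInFlightRequests: 1.
const txProducer = kafka.producer({
  transactionalId: 'order-processor-tx', // illustrative; unique per instance, stable across restarts
  idempotent: true,
  maxInFlightRequests: 1,
  transactionTimeout: 30000,
});
await txProducer.connect();

const transaction = await txProducer.transaction();
try {
  // orderId and order come from the payment being processed
  await transaction.send({
    topic: 'orders',
    messages: [{ key: orderId, value: JSON.stringify(order) }],
  });
  // Commit the consumed offset in the same transaction. The committed offset
  // is the NEXT offset to read, i.e. the last processed offset + 1.
  await transaction.sendOffsets({
    consumerGroupId: 'order-processor',
    topics: [{ topic: 'payments', partitions: [{ partition: 0, offset: '42' }] }],
  });
  await transaction.commit();
} catch (err) {
  await transaction.abort();
  throw err;
}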
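The offset passed to `sendOffsets` in the transaction is hard-coded. In practice it is derived from the messages just processed. It must be the highest processed offset plus one, and KafkaJS represents offsets as strings. The sketch below builds that `topics` payload. The `{ topic, partition, offset }` input shape and the name `offsetsToCommit` are assumptions. `BigInt` avoids precision loss on 64-bit offsets.

```javascript
// Hypothetical helper: turn processed messages into the `topics` payload
// for transaction.sendOffsets(), committing (highest offset + 1) per partition.
function offsetsToCommit(processed) {
  const byTopic = new Map(); // topic -> Map(partition -> next offset as BigInt)
  for (const { topic, partition, offset } of processed) {
    const partitions = byTopic.get(topic) ?? new Map();
    const next = BigInt(offset) + 1n;
    const current = partitions.get(partition);
    if (current === undefined || next > current) partitions.set(partition, next);
    byTopic.set(topic, partitions);
  }
  return [...byTopic].map(([topic, partitions]) => ({
    topic,
    partitions: [...partitions].map(([partition, next]) => ({ partition, offset: next.toString() })),
  }));
}

console.log(JSON.stringify(offsetsToCommit([
  { topic: 'payments', partition: 0, offset: '40' },
  { topic: 'payments', partition: 0, offset: '41' },
])));
// [{"topic":"payments","partitions":[{"partition":0,"offset":"42"}]}]
```

Usage inside the transaction: `await transaction.sendOffsets({ consumerGroupId: 'order-processor', topics: offsetsToCommit(processed) })`.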


Consumer with Graceful Shutdown

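const consumer = kafka.consumer({ groupId: 'user-processor' });
const dlqProducer = kafka.producer(); // separate producer for dead-lettered messages

await consumer.connect();
await dlqProducer.connect();
await consumer.subscribe({ topics: ['user-events'], fromBeginning: false });

await consumer.run({
  eachBatchAutoResolve: false, // offsets are resolved manually, message by message
  eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale }) => {
    for (const message of batch.messages) {
      // Stop if the consumer is shutting down or a rebalance made this batch stale
      if (!isRunning() || isStale()) break;

      try {
        const event = JSON.parse(message.value.toString());
        await processEvent(event); // application-specific handler
        resolveOffset(message.offset);
        await heartbeat(); // keep the session alive during slow processing
      } catch (err) {
        // Send the failed message to the dead letter queue
        await dlqProducer.send({
          topic: 'user-events-dlq',
          messages: [{
            key: message.key,
            value: message.value,
            headers: {
              ...message.headers,
              'error': err.message,
              'original-topic': batch.topic,
            },
          }],
        });
        resolveOffset(message.offset); // skip the bad message so the partition is not blocked
      }
    }
  },
});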
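The DLQ record in the catch block can be built by a pure function, which makes it unit-testable without a broker. The minimal sketch below does that. The `original-partition` and `original-offset` headers are an addition of this sketch for tracing a message back to its source. `toDlqMessage` is a hypothetical name.

```javascript
// Hypothetical helper: build the dead-letter record for a failed message.
// `source` is anything with { topic, partition }, e.g. the KafkaJS `batch`.
function toDlqMessage(message, err, source) {
  return {
    key: message.key,
    value: message.value,
    headers: {
      ...message.headers,                          // keep correlation-id etc.
      error: String(err?.message ?? err),          // works for Error objects and plain values
      'original-topic': source.topic,
      'original-partition': String(source.partition),
      'original-offset': String(message.offset),   // offsets are strings in KafkaJS
    },
  };
}

// Inside eachBatch's catch block:
// await dlqProducer.send({ topic: 'user-events-dlq', messages: [toDlqMessage(message, err, batch)] });
```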

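// Graceful shutdown: disconnect() stops the run loop (isRunning() turns false)
// and leaves the group so its partitions are reassigned promptly
const shutdown = async (signal) => {
  console.log(`${signal} received, shutting down consumer...`);
  await consumer.disconnect();
  process.exit(0);
};
process.once('SIGTERM', () => shutdown('SIGTERM'));
process.once('SIGINT', () => shutdown('SIGINT'));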
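With more than one client to close, two details matter. The handler should run only once, even if SIGINT and SIGTERM both arrive. The clients should also be disconnected in order, so the consumer stops before the producer its in-flight batch may still send through. `createShutdown` is a hypothetical helper. Its `exit` parameter is injectable so the logic can be tested without ending the process.

```javascript
// Hypothetical helper: shut down at most once, disconnecting clients in order.
function createShutdown(clients, exit = (code) => process.exit(code)) {
  let pending = null; // set by the first signal; later signals reuse it
  return (signal) => {
    if (pending) return pending;
    pending = (async () => {
      console.log(`${signal} received, disconnecting Kafka clients...`);
      let failed = false;
      // Sequential on purpose: consumer first, then the producers it sends through
      for (const client of clients) {
        try {
          await client.disconnect();
        } catch (err) {
          console.error(err);
          failed = true;
        }
      }
      exit(failed ? 1 : 0);
    })();
    return pending;
  };
}

// Usage: const shutdown = createShutdown([consumer, dlqProducer]);
// ['SIGTERM', 'SIGINT'].forEach((s) => process.once(s, () => shutdown(s)));
```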

Dead Letter Queue Pattern

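// DLQ consumer for monitoring and limited re-delivery
const dlqConsumer = kafka.consumer({ groupId: 'dlq-processor' });
await dlqConsumer.connect();
await dlqConsumer.subscribe({ topic: 'user-events-dlq' });

await dlqConsumer.run({
  eachMessage: async ({ message }) => {
    const headers = message.headers; // header values arrive as Buffers
    const errorMsg = headers.error?.toString();
    const originalTopic = headers['original-topic']?.toString();

    // Report to monitoring (`alerting` stands for your alerting client)
    await alerting.send({
      title: 'Message processing failed',
      topic: originalTopic,
      error: errorMsg,
      payload: message.value?.toString(),
    });

    // Re-deliver at most 3 times. This sends immediately; waiting N hours
    // first requires a delay mechanism (Kafka has no delayed delivery).
    const retryCount = Number(headers['retry-count']?.toString() ?? 0);
    if (retryCount < 3) {
      await producer.send({ // the idempotent producer from the Producer section
        topic: originalTopic,
        messages: [{
          key: message.key,
          value: message.value,
          headers: {
            ...headers,
            'retry-count': String(retryCount + 1),
          },
        }],
      });
    }
    // After 3 retries the message is only reported, not re-sent
  },
});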
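The retry step is meant to wait N hours before re-delivering. Kafka has no built-in delayed delivery, so the application must compute the delay and enforce it, for example with a retry consumer that holds a message until `timestamp + delay`. The sketch below covers the delay calculation using exponential backoff. `retryDelayMs`, `MAX_RETRIES`, the 1-hour base and the 24-hour cap are illustrative assumptions, not values from this article.

```javascript
// Illustrative retry policy: exponential backoff with a cap, null when exhausted.
const MAX_RETRIES = 3;

function retryDelayMs(retryCount, { baseMs = 60 * 60 * 1000, maxMs = 24 * 60 * 60 * 1000 } = {}) {
  if (retryCount >= MAX_RETRIES) return null; // give up: leave it for manual inspection
  return Math.min(baseMs * 2 ** retryCount, maxMs);
}

// retry 0 -> 1 h, retry 1 -> 2 h, retry 2 -> 4 h, retry 3 -> give up
console.log([0, 1, 2, 3].map((n) => retryDelayMs(n)));
// [ 3600000, 7200000, 14400000, null ]
```

Backoff spreads retries out, which avoids hammering a dependency that is still down. The cap keeps a late retry from slipping past the topic's retention window.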


Schema Registry with Avro

import { SchemaRegistry } from '@kafkajs/confluent-schema-registry';

const registry = new SchemaRegistry({ host: 'http://schema-registry:8081' });

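// Encode a message with the latest schema registered under the subject
const schemaId = await registry.getLatestSchemaId('user-events-value');
const encodedValue = await registry.encode(schemaId, {
  type: 'USER_CREATED',
  userId: '123',
  email: 'user@example.com',
});
// encodedValue is a Buffer; use it as the message value in producer.send()

// Decode in the consumer (e.g. inside eachMessage); the schema ID travels with the message
const decodedValue = await registry.decode(message.value);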
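`registry.decode()` needs no schema ID argument because `encode()` prefixes the Avro payload with the Confluent wire-format header. The header is a magic byte `0` followed by the schema ID as a 4-byte big-endian integer. The small sketch below reads that header by hand, which is useful for logging which schema a message was written with. `readSchemaId` is a hypothetical helper and is not part of `@kafkajs/confluent-schema-registry`.

```javascript
// Hypothetical helper: read the schema ID from a Confluent wire-format buffer.
// Layout: byte 0 = magic byte (0), bytes 1-4 = schema ID (big-endian), rest = Avro data.
function readSchemaId(buffer) {
  if (buffer.length < 5 || buffer.readUInt8(0) !== 0) {
    throw new Error('Not in Confluent wire format');
  }
  return buffer.readInt32BE(1);
}

const sample = Buffer.from([0x00, 0x00, 0x00, 0x00, 0x2a, 0x02]); // schema 42 + 1 payload byte
console.log(readSchemaId(sample)); // 42
```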

Kafka Best Practices

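  • Partitioning: key messages by entity (e.g. userId); Kafka orders records only within a partition
  • Replication factor: at least 3 in production
  • Retention: size it to how long consumers may fall behind (Kafka's default is 7 days)
  • Monitoring: track consumer lag and alert when it exceeds a threshold
  • Idempotent consumers: design for at-least-once delivery, so handling the same message twice is harmless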
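The last two practices are sketched below. Consumer lag per partition is the latest offset (high watermark) minus the committed offset. An idempotent consumer skips events it has already handled. The `{ partition, high, committed }` shape and the `eventId` field are assumptions; the events in this article carry no ID. In KafkaJS the numbers could come from the admin client's `fetchTopicOffsets` and `fetchOffsets`. A committed offset of `-1` (nothing committed yet) would need special handling.

```javascript
// 1) Consumer lag: sum over partitions of (high watermark - committed offset).
//    Offsets are strings in KafkaJS, hence BigInt.
function totalLag(partitions) {
  return partitions.reduce(
    (sum, { high, committed }) => sum + (BigInt(high) - BigInt(committed)),
    0n,
  );
}

function lagExceeds(partitions, threshold) {
  return totalLag(partitions) > BigInt(threshold);
}

// 2) Idempotent consumer: skip events whose ID was already handled.
//    The in-memory Set is for illustration; a real service would persist IDs,
//    ideally in the same transaction as the side effect.
function createIdempotentHandler(handle, seen = new Set()) {
  return async (event) => {
    if (seen.has(event.eventId)) return false; // duplicate delivery: skip
    await handle(event);
    seen.add(event.eventId); // record only after success, so a failed event is retried
    return true;
  };
}

console.log(totalLag([
  { partition: 0, high: '120', committed: '100' },
  { partition: 1, high: '50', committed: '50' },
])); // 20n
```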