
Kafka 核心概念
- Topic(主题):命名的记录流
- Partition(分区):主题内的有序、不可变序列
- Offset(偏移量):记录在分区中的位置
- Consumer Group(消费者组):并行消费分区的消费者集合

使用 KafkaJS 进行设置
npm install kafkajs
import { Kafka, Partitioners } from 'kafkajs';
// SASL/SCRAM-SHA-256 credentials, read from the environment at module load.
const saslCredentials = {
  mechanism: 'scram-sha-256',
  username: process.env.KAFKA_USERNAME,
  password: process.env.KAFKA_PASSWORD,
};

// Client-level retry settings handed to KafkaJS unchanged.
const retryPolicy = { initialRetryTime: 100, retries: 8 };

// Shared KafkaJS client: two seed brokers, TLS enabled, SASL authentication.
const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['kafka1:9092', 'kafka2:9092'],
  ssl: true,
  sasl: saslCredentials,
  retry: retryPolicy,
});
生产者
// Producer used by both the plain send and the transactional send below.
const producer = kafka.producer({
  createPartitioner: Partitioners.LegacyPartitioner,
  // Required for producer.transaction(): KafkaJS refuses to start a
  // transaction on a producer that has no transactional id. Choose a stable
  // id that is unique per producer instance.
  transactionalId: 'my-app-transactional-producer',
  // Idempotence lets the broker de-duplicate producer retries. On its own it
  // is NOT end-to-end exactly-once delivery; that also needs transactions on
  // the producer side and idempotent handling on the consumer side.
  idempotent: true,
  // The KafkaJS transaction docs pair maxInFlightRequests: 1 with
  // idempotent: true to uphold the exactly-once guarantees.
  maxInFlightRequests: 1,
  transactionTimeout: 30000,
});
await producer.connect();
// Publish a USER_CREATED event to the user-events topic.
const userCreatedEvent = {
  type: 'USER_CREATED',
  userId,
  email,
  timestamp: Date.now(),
};

// Tracing metadata carried alongside the payload.
const traceHeaders = {
  'correlation-id': requestId,
  'service': 'user-service',
};

await producer.send({
  topic: 'user-events',
  messages: [
    {
      // Keyed by userId so that one user's events keep their relative order.
      key: userId,
      value: JSON.stringify(userCreatedEvent),
      headers: traceHeaders,
    },
  ],
});
// Transactional produce: the order record and the consumed offsets are
// committed together, or the whole transaction is aborted.
const transaction = await producer.transaction();
try {
  const orderMessage = { key: orderId, value: JSON.stringify(order) };
  await transaction.send({ topic: 'orders', messages: [orderMessage] });

  // Offsets to commit on behalf of the 'order-processor' group as part of
  // this transaction.
  const consumedOffsets = [
    { topic: 'payments', partitions: [{ partition: 0, offset: '42' }] },
  ];
  await transaction.sendOffsets({
    consumerGroupId: 'order-processor',
    topics: consumedOffsets,
  });

  await transaction.commit();
} catch (err) {
  // Roll back everything sent in this transaction, then surface the failure.
  await transaction.abort();
  throw err;
}

消费者与优雅关闭
// Consumer for user-events with manual offset resolution and a dead-letter
// queue for messages that cannot be parsed or processed.
// NOTE(review): processEvent and dlqProducer are not defined in this section;
// they are assumed to be provided elsewhere (dlqProducer already connected).
const consumer = kafka.consumer({ groupId: 'user-processor' });
await consumer.connect();
await consumer.subscribe({ topics: ['user-events'], fromBeginning: false });
await consumer.run({
  // Offsets are resolved explicitly, one message at a time, in the loop below.
  eachBatchAutoResolve: false,
  eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale }) => {
    for (const message of batch.messages) {
      // Stop on shutdown, and also when the batch has been rendered stale
      // (KafkaJS documents isStale() for cases such as consumer.seek).
      if (!isRunning() || isStale()) break;

      try {
        const event = JSON.parse(message.value.toString());
        await processEvent(event);
      } catch (err) {
        // Only parse/processing failures land here. Park the raw message in
        // the dead-letter topic so the partition is not blocked by it.
        await dlqProducer.send({
          topic: 'user-events-dlq',
          messages: [{
            key: message.key,
            value: message.value,
            headers: {
              ...message.headers,
              // Header values must be defined; fall back for non-Error throws.
              'error': err instanceof Error ? err.message : String(err),
              'original-topic': 'user-events',
            },
          }],
        });
      }

      // Kept outside the try block on purpose: a heartbeat failure (for
      // example during a rebalance) must propagate to KafkaJS instead of
      // being mistaken for a processing error and dead-lettering a message
      // that was handled successfully.
      resolveOffset(message.offset);
      await heartbeat(); // keeps the session alive while processing is slow
    }
  },
});
// Graceful shutdown: disconnect the consumer on SIGTERM, then exit.
process.on('SIGTERM', async () => {
  console.log('正在关闭消费者...');
  let exitCode = 0;
  try {
    await consumer.disconnect();
  } catch (err) {
    // Without this catch a failed disconnect would escape the async handler
    // as an unhandled rejection; report it and signal failure instead.
    console.error('Failed to disconnect consumer during shutdown:', err);
    exitCode = 1;
  }
  process.exit(exitCode);
});
死信队列模式
// DLQ consumer: reports every dead-lettered message and re-publishes it to
// its original topic a bounded number of times.
// NOTE(review): alerting is not defined in this section; it is assumed to be
// provided elsewhere.
const dlqConsumer = kafka.consumer({ groupId: 'dlq-processor' });
// Connect before subscribing, mirroring the main consumer.
await dlqConsumer.connect();
await dlqConsumer.subscribe({ topics: ['user-events-dlq'] });
await dlqConsumer.run({
  eachMessage: async ({ message }) => {
    const headers = message.headers ?? {};
    // Header values arrive as Buffers; convert before using them as text.
    const errorMsg = headers.error?.toString();
    const originalTopic = headers['original-topic']?.toString();

    // Report the failure to the monitoring system.
    await alerting.send({
      title: '消息处理失败',
      topic: originalTopic,
      error: errorMsg,
      // The value may be null: the main consumer forwards it unchanged.
      payload: message.value?.toString(),
    });

    // Optional bounded retry (at most 3 attempts). This re-publishes
    // immediately; no delay is implemented here.
    const retryCount = Number.parseInt(
      headers['retry-count']?.toString() ?? '0',
      10,
    );
    // Without an original-topic header there is nowhere to retry to.
    if (originalTopic !== undefined && retryCount < 3) {
      await producer.send({
        topic: originalTopic,
        messages: [{
          // Forward only the fields a producer message accepts; consumer-side
          // fields such as offset and attributes are intentionally dropped.
          key: message.key,
          value: message.value,
          timestamp: message.timestamp,
          headers: {
            ...headers,
            'retry-count': String(retryCount + 1),
          },
        }],
      });
    }
  },
});

使用 Avro 的 Schema Registry
import { SchemaRegistry } from '@kafkajs/confluent-schema-registry';
// Schema Registry client used to encode and decode message values.
const registry = new SchemaRegistry({ host: 'http://schema-registry:8081' });

// Encode a record with the latest schema registered under the
// 'user-events-value' subject.
const userCreatedRecord = {
  type: 'USER_CREATED',
  userId: '123',
  email: 'user@example.com',
};
const schemaId = await registry.getLatestSchemaId('user-events-value');
const encodedValue = await registry.encode(schemaId, userCreatedRecord);

// Decode a message value inside a consumer handler (message comes from the
// handler's arguments).
const decodedValue = await registry.decode(message.value);
Kafka 最佳实践
- 分区策略:使用消息键让同一键的消息进入同一分区,从而保证该键内的消息顺序(Kafka 不保证跨分区的全局顺序)
- 副本因子:生产环境至少 3
- 保留策略:根据消费者 SLA 设置(默认 7 天)
- 监控:跟踪消费者滞后——超过阈值时告警
- 幂等消费者:Kafka 默认提供至少一次投递,消息可能被重复消费,因此消费者的处理逻辑应设计为幂等