
消息队列:Kafka vs RabbitMQ
对比概览
Kafka:
- 基于日志,消息保留(数天/永久)
- 高吞吐量(百万/秒)
- 消费者组可从任意偏移量重放
- 适用于事件流、审计日志、分析
- 运维复杂(Zookeeper/KRaft、分区、主题)
RabbitMQ:
- 传统消息队列(消息消费后删除)
- 复杂路由(交换机、绑定)
- 内置死信队列
- 适用于任务队列、RPC、复杂路由
- 更易运维

Kafka 模式
import { Kafka, Partitioners } from 'kafkajs';
const kafka = new Kafka({ clientId: 'app', brokers: ['kafka:9092'] });
// 生产者,保证投递
const producer = kafka.producer({
createPartitioner: Partitioners.DefaultPartitioner,
});
await producer.connect();
async function publishEvent(topic: string, event: object, key?: string): Promise<void> {
await producer.send({
topic,
messages: [{
key: key || null,
value: JSON.stringify(event),
headers: {
'event-type': topic,
'timestamp': Date.now().toString(),
},
}],
});
}
// 消费者组(多个消费者 = 并行处理)
const consumer = kafka.consumer({ groupId: 'order-processor' });
await consumer.connect();
await consumer.subscribe({ topics: ['orders', 'payments'], fromBeginning: false });
await consumer.run({
eachBatch: async ({ batch, resolveOffset, heartbeat }) => {
for (const message of batch.messages) {
await processMessage(JSON.parse(message.value!.toString()));
resolveOffset(message.offset);
await heartbeat(); // 防止再平衡超时
}
},
});

RabbitMQ 模式
import amqp from 'amqplib';
const connection = await amqp.connect(process.env.RABBITMQ_URL!);
const channel = await connection.createChannel();
// 工作队列(任务分发)
await channel.assertQueue('tasks', {
durable: true,
arguments: {
'x-dead-letter-exchange': 'dlx', // 失败消息发送至此
'x-message-ttl': 3600000, // 1 小时 TTL
}
});
// 发布任务
channel.sendToQueue('tasks', Buffer.from(JSON.stringify({ type: 'email', userId: '123' })), {
persistent: true, // 持久化,重启后保留
messageId: generateId(), // 幂等性
});
// 消费并确认
channel.prefetch(10); // 最多处理 10 条未确认消息
channel.consume('tasks', async (msg) => {
if (!msg) return;
try {
await processTask(JSON.parse(msg.content.toString()));
channel.ack(msg); // 成功:从队列移除
} catch (err) {
const retries = (msg.properties.headers['x-retries'] || 0) + 1;
if (retries < 3) {
channel.nack(msg, false, false); // 死信,用于重试
} else {
channel.ack(msg); // 放弃:移至死信
}
}
});
// 死信队列:重试或检查失败消息
await channel.assertExchange('dlx', 'direct');
await channel.assertQueue('dead-letters', { durable: true });
await channel.bindQueue('dead-letters', 'dlx', '');

发布/订阅模式
// Kafka:天然发布/订阅(多个消费者组)
// 每个消费者组独立接收所有消息
kafka.consumer({ groupId: 'email-service' }).subscribe({ topic: 'user-events' });
kafka.consumer({ groupId: 'analytics-service' }).subscribe({ topic: 'user-events' });
// RabbitMQ:fanout 交换机实现发布/订阅
await channel.assertExchange('events', 'fanout');
const q1 = await channel.assertQueue('email-notifications', { durable: true });
const q2 = await channel.assertQueue('analytics-tracking', { durable: true });
await channel.bindQueue(q1.queue, 'events', '');
await channel.bindQueue(q2.queue, 'events', '');
// 发布给所有订阅者
channel.publish('events', '', Buffer.from(JSON.stringify(event)));
选择 Kafka 还是 RabbitMQ
使用 Kafka 的场景:
- 事件流(Kafka 作为事实来源)
- 高吞吐量(>1万条/秒)
- 多个消费者需要相同消息
- 需要审计日志或事件重放
- 构建数据管道
使用 RabbitMQ 的场景:
- 任务队列(工作者拉取任务)
- 复杂路由(主题/标头路由)
- RPC(请求-回复模式)
- 吞吐量较低但逻辑复杂
- 运维要求更简单
对于大多数微服务,RabbitMQ 更易运维;Kafka 在数据管道方面表现出色。