正在加载,请稍候…

Redis 发布/订阅、BullMQ 任务队列与速率限制

将 Redis 用作消息代理——BullMQ 任务队列、发布/订阅、Redis Streams 以及基于 Lua 的滑动窗口速率限制器。

Redis 发布/订阅、BullMQ 任务队列与速率限制

Redis 超越缓存

Redis 在消息代理、任务队列和速率限制方面表现出色。

Redis 发布/订阅、BullMQ 任务队列与速率限制 插图

BullMQ 任务队列

import { Queue, Worker } from 'bullmq'
const queue = new Queue('email', { connection })

await queue.add('welcome', { to: 'user@example.com' }, {
  attempts: 3,
  backoff: { type: 'exponential', delay: 1000 },
  removeOnComplete: 100,
})

// Cron 任务
await queue.add('digest', {}, { repeat: { cron: '0 9 * * MON' } })

// Worker
const worker = new Worker('email', async (job) => {
  await sendEmail(job.data)
}, { connection, concurrency: 10 })

worker.on('failed', (job, err) => console.error('任务失败:', job?.id, err))

Redis 发布/订阅、BullMQ 任务队列与速率限制 插图

发布/订阅

await subscriber.subscribe('notifications', (msg) => {
  console.log('收到:', JSON.parse(msg))
})
await publisher.publish('notifications', JSON.stringify({ type: 'order_shipped', id: '123' }))

Redis 发布/订阅、BullMQ 任务队列与速率限制 插图

滑动窗口速率限制器(Lua)

const script = `
local key = KEYS[1]
local limit, now, window = tonumber(ARGV[1]), tonumber(ARGV[2]), tonumber(ARGV[3])
redis.call('ZREMRANGEBYSCORE', key, 0, now - window * 1000)
local count = redis.call('ZCARD', key)
if count >= limit then return 0 end
redis.call('ZADD', key, now, now)
redis.call('EXPIRE', key, window)
return 1
`

async function rateLimit(userId, limit, windowSecs) {
  return await redis.eval(script, {
    keys: ['rate:' + userId],
    arguments: [String(limit), String(Date.now()), String(windowSecs)]
  })
}

-> 使用 Token Generator 生成 API 密钥。