正在加载,请稍候…

Redis 高级模式:发布/订阅、流、Lua 脚本与速率限制

超越简单缓存,深入 Redis 高级特性:实现发布/订阅消息、Redis Streams、速率限制、分布式锁以及 Lua 脚本原子操作。

Redis 高级模式:发布/订阅、流、Lua 脚本与速率限制

Redis 高级模式

发布/订阅消息

import { createClient } from 'redis';

// 发布者
const publisher = createClient();
await publisher.connect();

async function publishEvent(channel: string, event: object): Promise<void> {
  await publisher.publish(channel, JSON.stringify(event));
}

// 订阅者
const subscriber = createClient();
await subscriber.connect();

await subscriber.subscribe('user-events', (message) => {
  const event = JSON.parse(message);
  console.log('Received:', event);
});

await subscriber.subscribe('order-events', (message) => {
  handleOrderEvent(JSON.parse(message));
});

// 发布
await publishEvent('user-events', { type: 'user.created', userId: '123' });

Redis 高级模式:发布/订阅、流、Lua 脚本与速率限制 插图

Redis Streams(持久化发布/订阅)

// 生产者:添加到流
await client.xAdd('events', '*', {
  type: 'order.placed',
  orderId: 'order_123',
  userId: 'user_456',
  amount: '99.99',
  timestamp: Date.now().toString(),
});

// 消费者组设置
await client.xGroupCreate('events', 'order-processor', '
#39;, { MKSTREAM: true }); // 消费者:从组中读取 while (true) { const messages = await client.xReadGroup( 'order-processor', 'consumer-1', [{ key: 'events', id: '>' }], { COUNT: 10, BLOCK: 2000 } ); if (messages) { for (const { name, messages: msgs } of messages) { for (const { id, message } of msgs) { await processEvent(message); // 确认已处理的消息 await client.xAck('events', 'order-processor', id); } } } }

Redis 高级模式:发布/订阅、流、Lua 脚本与速率限制 插图

滑动窗口速率限制

async function checkRateLimit(
  userId: string,
  limit: number,
  windowMs: number
): Promise<{ allowed: boolean; remaining: number; resetIn: number }> {
  const key = `rate:${userId}`;
  const now = Date.now();
  const windowStart = now - windowMs;

  // 使用管道进行原子操作
  const pipeline = client.multi();
  pipeline.zRemRangeByScore(key, '-inf', windowStart.toString());  // 移除旧请求
  pipeline.zAdd(key, { score: now, value: now.toString() });        // 添加当前请求
  pipeline.zCard(key);                                               // 统计窗口内请求数
  pipeline.expire(key, Math.ceil(windowMs / 1000));                  // 设置 TTL

  const results = await pipeline.exec();
  const count = results[2] as number;

  return {
    allowed: count <= limit,
    remaining: Math.max(0, limit - count),
    resetIn: windowMs,
  };
}

// 中间件
app.use(async (req, res, next) => {
  const result = await checkRateLimit(req.ip, 100, 60000);
  res.setHeader('X-RateLimit-Limit', 100);
  res.setHeader('X-RateLimit-Remaining', result.remaining);

  if (!result.allowed) {
    return res.status(429).json({ error: 'Too many requests' });
  }
  next();
});

Redis 高级模式:发布/订阅、流、Lua 脚本与速率限制 插图

分布式锁

class RedisLock {
  constructor(private client: RedisClient) {}

  async acquire(resource: string, ttlMs: number): Promise<string | null> {
    const token = crypto.randomUUID();
    const key = `lock:${resource}`;

    const acquired = await this.client.set(key, token, {
      NX: true,          // 仅当键不存在时设置
      PX: ttlMs,         // 在 ttlMs 毫秒后过期
    });

    return acquired ? token : null;
  }

  async release(resource: string, token: string): Promise<boolean> {
    // 使用 Lua 脚本进行原子检查并删除
    const script = `
      if redis.call('get', KEYS[1]) == ARGV[1] then
        return redis.call('del', KEYS[1])
      else
        return 0
      end
    `;
    const result = await this.client.eval(script, { keys: [`lock:${resource}`], arguments: [token] });
    return result === 1;
  }
}

// 使用示例
const lock = new RedisLock(client);

async function processOrder(orderId: string): Promise<void> {
  const token = await lock.acquire(`order:${orderId}`, 30000);
  if (!token) {
    throw new Error('Order is already being processed');
  }

  try {
    await doProcessOrder(orderId);
  } finally {
    await lock.release(`order:${orderId}`, token);
  }
}

有序集合实现排行榜

// 添加/更新分数
await client.zAdd('game:leaderboard', { score: 1500, value: 'player:alice' });

// 获取前十名
const top10 = await client.zRangeWithScores('game:leaderboard', 0, 9, { REV: true });
// 返回:[{ value: 'player:alice', score: 1500 }, ...]

// 获取玩家排名(从 0 开始)
const rank = await client.zRevRank('game:leaderboard', 'player:alice');

// 获取分数范围内的玩家
const midLevel = await client.zRangeByScoreWithScores('game:leaderboard', '1000', '2000');

Redis 的数据结构为复杂的实时问题提供了优雅的解决方案。