
Redis 高级模式
发布/订阅消息
import { createClient } from 'redis';
// 发布者
const publisher = createClient();
await publisher.connect();
async function publishEvent(channel: string, event: object): Promise<void> {
await publisher.publish(channel, JSON.stringify(event));
}
// 订阅者
const subscriber = createClient();
await subscriber.connect();
await subscriber.subscribe('user-events', (message) => {
const event = JSON.parse(message);
console.log('Received:', event);
});
await subscriber.subscribe('order-events', (message) => {
handleOrderEvent(JSON.parse(message));
});
// 发布
await publishEvent('user-events', { type: 'user.created', userId: '123' });

Redis Streams(持久化发布/订阅)
// 生产者:添加到流
await client.xAdd('events', '*', {
type: 'order.placed',
orderId: 'order_123',
userId: 'user_456',
amount: '99.99',
timestamp: Date.now().toString(),
});
// 消费者组设置
await client.xGroupCreate('events', 'order-processor', '#39;, { MKSTREAM: true });
// 消费者:从组中读取
while (true) {
const messages = await client.xReadGroup(
'order-processor',
'consumer-1',
[{ key: 'events', id: '>' }],
{ COUNT: 10, BLOCK: 2000 }
);
if (messages) {
for (const { name, messages: msgs } of messages) {
for (const { id, message } of msgs) {
await processEvent(message);
// 确认已处理的消息
await client.xAck('events', 'order-processor', id);
}
}
}
}

滑动窗口速率限制
async function checkRateLimit(
userId: string,
limit: number,
windowMs: number
): Promise<{ allowed: boolean; remaining: number; resetIn: number }> {
const key = `rate:${userId}`;
const now = Date.now();
const windowStart = now - windowMs;
// 使用管道进行原子操作
const pipeline = client.multi();
pipeline.zRemRangeByScore(key, '-inf', windowStart.toString()); // 移除旧请求
pipeline.zAdd(key, { score: now, value: now.toString() }); // 添加当前请求
pipeline.zCard(key); // 统计窗口内请求数
pipeline.expire(key, Math.ceil(windowMs / 1000)); // 设置 TTL
const results = await pipeline.exec();
const count = results[2] as number;
return {
allowed: count <= limit,
remaining: Math.max(0, limit - count),
resetIn: windowMs,
};
}
// 中间件
app.use(async (req, res, next) => {
const result = await checkRateLimit(req.ip, 100, 60000);
res.setHeader('X-RateLimit-Limit', 100);
res.setHeader('X-RateLimit-Remaining', result.remaining);
if (!result.allowed) {
return res.status(429).json({ error: 'Too many requests' });
}
next();
});

分布式锁
class RedisLock {
constructor(private client: RedisClient) {}
async acquire(resource: string, ttlMs: number): Promise<string | null> {
const token = crypto.randomUUID();
const key = `lock:${resource}`;
const acquired = await this.client.set(key, token, {
NX: true, // 仅当键不存在时设置
PX: ttlMs, // 在 ttlMs 毫秒后过期
});
return acquired ? token : null;
}
async release(resource: string, token: string): Promise<boolean> {
// 使用 Lua 脚本进行原子检查并删除
const script = `
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
`;
const result = await this.client.eval(script, { keys: [`lock:${resource}`], arguments: [token] });
return result === 1;
}
}
// 使用示例
const lock = new RedisLock(client);
async function processOrder(orderId: string): Promise<void> {
const token = await lock.acquire(`order:${orderId}`, 30000);
if (!token) {
throw new Error('Order is already being processed');
}
try {
await doProcessOrder(orderId);
} finally {
await lock.release(`order:${orderId}`, token);
}
}
有序集合实现排行榜
// 添加/更新分数
await client.zAdd('game:leaderboard', { score: 1500, value: 'player:alice' });
// 获取前十名
const top10 = await client.zRangeWithScores('game:leaderboard', 0, 9, { REV: true });
// 返回:[{ value: 'player:alice', score: 1500 }, ...]
// 获取玩家排名(从 0 开始)
const rank = await client.zRevRank('game:leaderboard', 'player:alice');
// 获取分数范围内的玩家
const midLevel = await client.zRangeByScoreWithScores('game:leaderboard', '1000', '2000');
Redis 的数据结构为复杂的实时问题提供了优雅的解决方案。