
Redis 不仅仅是缓存
大多数开发者将 Redis 用作键值存储。但它拥有丰富的数据结构,可以解决整类问题。

Redis Streams(轻量级 Kafka)
import Redis from 'ioredis'
const redis = new Redis()
// 生产者:向流中添加事件
await redis.xadd('events:user', '*', // * = 自动生成 ID
'type', 'USER_CREATED',
'userId', '123',
'email', 'alice@example.com'
)
// 消费者:从流中读取
const messages = await redis.xread(
'COUNT', 10,
'BLOCK', 5000, // 阻塞 5 秒等待新消息
'STREAMS', 'events:user',
'#39; // $ = 仅新消息
)
// 消费者组(用于竞争消费者)
await redis.xgroup('CREATE', 'events:user', 'processors', '#39;, 'MKSTREAM')
// 作为消费者组中的消费者读取
const pending = await redis.xreadgroup(
'GROUP', 'processors', 'worker-1',
'COUNT', 10,
'BLOCK', 5000,
'STREAMS', 'events:user', '>'
)
// 确认已处理的消息
await redis.xack('events:user', 'processors', messageId)
// 检查待处理(未确认)的消息
const info = await redis.xpending('events:user', 'processors', '-', '+', 10)
HyperLogLog(独立访客计数)
// 统计独立页面访客,无需存储所有 ID
// 无论基数多大,仅使用 12KB 内存
async function trackVisit(page: string, userId: string) {
await redis.pfadd(`visitors:${page}:${getTodayKey()}`, userId)
}
async function getUniqueVisitors(page: string): Promise<number> {
return redis.pfcount(`visitors:${page}:${getTodayKey()}`)
}
// 合并多个计数器
async function getWeeklyUniqueVisitors(page: string): Promise<number> {
const keys = getLastNDayKeys(7).map(day => `visitors:${page}:${day}`)
await redis.pfmerge(`visitors:${page}:week`, ...keys)
return redis.pfcount(`visitors:${page}:week`)
}

布隆过滤器(使用 RedisBloom)
// 检查成员是否存在,无假阴性
// 假阳性率较低(约 1%)
// 这个邮箱之前用过吗?
await redis.bf.add('used:emails', 'alice@example.com')
const exists = await redis.bf.exists('used:emails', 'newuser@example.com')
if (exists) {
// 可能存在——执行数据库查询确认
} else {
// 肯定不存在——跳过数据库查询
}
// 创建自定义错误率的布隆过滤器
await redis.bf.reserve('bf:urls', 0.001, 1_000_000) // 0.1% 错误率,100 万个元素
有序集合(Sorted Sets)用于排名和时间窗口
// 实时排行榜
await redis.zadd('scores', 9500, 'player:alice')
await redis.zadd('scores', 8200, 'player:bob')
await redis.zadd('scores', 9800, 'player:charlie')
// 前 10 名玩家
const top10 = await redis.zrevrange('scores', 0, 9, 'WITHSCORES')
// 玩家排名
const rank = await redis.zrevrank('scores', 'player:alice') // 0 索引
// 时间窗口滑动计数器(用于限流、分析)
const now = Date.now()
const windowMs = 60 * 1000
await redis.zadd('api:calls:user:123', now, now.toString())
await redis.zremrangebyscore('api:calls:user:123', '-inf', now - windowMs)
const callsInWindow = await redis.zcard('api:calls:user:123')

Redis 时间序列(Time Series)
// 高效存储指标
await redis.call('TS.CREATE', 'cpu:usage', 'RETENTION', 86400000, 'LABELS', 'host', 'server1')
// 添加数据点
await redis.call('TS.ADD', 'cpu:usage', '*', '75.5')
// 查询范围
const data = await redis.call('TS.RANGE', 'cpu:usage',
Date.now() - 3600000, // 1 小时前
'+',
'AGGREGATION', 'avg', '60000' // 1 分钟平均值
)
地理空间查询(Geo Spatial)
// 添加位置
await redis.geoadd('restaurants',
-73.9857, 40.7484, 'restaurant:1', // 经度, 纬度, 成员
-73.9901, 40.7580, 'restaurant:2'
)
// 查找附近餐厅(1 公里内)
const nearby = await redis.georadius(
'restaurants',
-73.9857, 40.7484,
1, 'km',
'WITHCOORD', 'WITHDIST', 'ASC', 'COUNT', 10
)
Lua 脚本实现原子操作
const incrementIfLess = `
local current = redis.call('GET', KEYS[1])
if current == false then
redis.call('SET', KEYS[1], ARGV[1])
return tonumber(ARGV[1])
end
if tonumber(current) < tonumber(ARGV[2]) then
redis.call('SET', KEYS[1], ARGV[1])
return tonumber(ARGV[1])
end
return tonumber(current)
`
const result = await redis.eval(incrementIfLess, 1, 'counter', '1', '100')