正在加载,请稍候…

Redis 高级数据结构:Streams、HyperLogLog 与概率型

超越字符串和哈希,探索 Redis 的 Streams 事件日志、HyperLogLog 基数估计、布隆过滤器、时间序列及向量搜索等高级数据结构。

Redis 高级数据结构:Streams、HyperLogLog 与概率型

Redis 不仅仅是缓存

大多数开发者将 Redis 用作键值存储。但它拥有丰富的数据结构,可以解决整类问题。

Redis 高级数据结构:Streams、HyperLogLog 与概率型 示意图

Redis Streams(轻量级 Kafka)

import Redis from 'ioredis'
const redis = new Redis()

// 生产者:向流中添加事件
await redis.xadd('events:user', '*',  // * = 自动生成 ID
  'type', 'USER_CREATED',
  'userId', '123',
  'email', 'alice@example.com'
)

// 消费者:从流中读取
const messages = await redis.xread(
  'COUNT', 10,
  'BLOCK', 5000,        // 阻塞 5 秒等待新消息
  'STREAMS', 'events:user',
  '
#39; // $ = 仅新消息 ) // 消费者组(用于竞争消费者) await redis.xgroup('CREATE', 'events:user', 'processors', '
#39;, 'MKSTREAM') // 作为消费者组中的消费者读取 const pending = await redis.xreadgroup( 'GROUP', 'processors', 'worker-1', 'COUNT', 10, 'BLOCK', 5000, 'STREAMS', 'events:user', '>' ) // 确认已处理的消息 await redis.xack('events:user', 'processors', messageId) // 检查待处理(未确认)的消息 const info = await redis.xpending('events:user', 'processors', '-', '+', 10)

HyperLogLog(独立访客计数)

// 统计独立页面访客,无需存储所有 ID
// 无论基数多大,仅使用 12KB 内存

async function trackVisit(page: string, userId: string) {
  await redis.pfadd(`visitors:${page}:${getTodayKey()}`, userId)
}

async function getUniqueVisitors(page: string): Promise<number> {
  return redis.pfcount(`visitors:${page}:${getTodayKey()}`)
}

// 合并多个计数器
async function getWeeklyUniqueVisitors(page: string): Promise<number> {
  const keys = getLastNDayKeys(7).map(day => `visitors:${page}:${day}`)
  await redis.pfmerge(`visitors:${page}:week`, ...keys)
  return redis.pfcount(`visitors:${page}:week`)
}

Redis 高级数据结构:Streams、HyperLogLog 与概率型 示意图

布隆过滤器(使用 RedisBloom)

// 检查成员是否存在,无假阴性
// 假阳性率较低(约 1%)

// 这个邮箱之前用过吗?
await redis.bf.add('used:emails', 'alice@example.com')

const exists = await redis.bf.exists('used:emails', 'newuser@example.com')
if (exists) {
  // 可能存在——执行数据库查询确认
} else {
  // 肯定不存在——跳过数据库查询
}

// 创建自定义错误率的布隆过滤器
await redis.bf.reserve('bf:urls', 0.001, 1_000_000) // 0.1% 错误率,100 万个元素

有序集合(Sorted Sets)用于排名和时间窗口

// 实时排行榜
await redis.zadd('scores', 9500, 'player:alice')
await redis.zadd('scores', 8200, 'player:bob')
await redis.zadd('scores', 9800, 'player:charlie')

// 前 10 名玩家
const top10 = await redis.zrevrange('scores', 0, 9, 'WITHSCORES')

// 玩家排名
const rank = await redis.zrevrank('scores', 'player:alice')  // 0 索引

// 时间窗口滑动计数器(用于限流、分析)
const now = Date.now()
const windowMs = 60 * 1000

await redis.zadd('api:calls:user:123', now, now.toString())
await redis.zremrangebyscore('api:calls:user:123', '-inf', now - windowMs)
const callsInWindow = await redis.zcard('api:calls:user:123')

Redis 高级数据结构:Streams、HyperLogLog 与概率型 示意图

Redis 时间序列(Time Series)

// 高效存储指标
await redis.call('TS.CREATE', 'cpu:usage', 'RETENTION', 86400000, 'LABELS', 'host', 'server1')

// 添加数据点
await redis.call('TS.ADD', 'cpu:usage', '*', '75.5')

// 查询范围
const data = await redis.call('TS.RANGE', 'cpu:usage',
  Date.now() - 3600000,  // 1 小时前
  '+',
  'AGGREGATION', 'avg', '60000'  // 1 分钟平均值
)

地理空间查询(Geo Spatial)

// 添加位置
await redis.geoadd('restaurants',
  -73.9857, 40.7484, 'restaurant:1',  // 经度, 纬度, 成员
  -73.9901, 40.7580, 'restaurant:2'
)

// 查找附近餐厅(1 公里内)
const nearby = await redis.georadius(
  'restaurants',
  -73.9857, 40.7484,
  1, 'km',
  'WITHCOORD', 'WITHDIST', 'ASC', 'COUNT', 10
)

Lua 脚本实现原子操作

const incrementIfLess = `
local current = redis.call('GET', KEYS[1])
if current == false then
  redis.call('SET', KEYS[1], ARGV[1])
  return tonumber(ARGV[1])
end
if tonumber(current) < tonumber(ARGV[2]) then
  redis.call('SET', KEYS[1], ARGV[1])
  return tonumber(ARGV[1])
end
return tonumber(current)
`

const result = await redis.eval(incrementIfLess, 1, 'counter', '1', '100')