正在加载,请稍候…

Redis 生产级高级数据结构:Streams、HyperLogLog 与概率数据结构

超越基础 Redis,学习 Streams 实现事件溯源、HyperLogLog 进行大规模去重计数、布隆过滤器、有序集合排行榜以及 Lua 脚本实现原子操作。

Redis Streams, HyperLogLog, and Probabilistic Data Structures in Production

Redis 生产级高级数据结构

大多数开发者将 Redis 用作缓存或会话存储。但 Redis 包含精巧的数据结构,能优雅地解决特定问题。

Redis Streams:持久化事件日志

Streams 提供持久化、仅追加(append-only)的消息日志,并支持消费者组:

import redis
r = redis.Redis(decode_responses=True)

# 生产者:追加事件到流
msg_id = r.xadd(
    'user-events',
    { 'user_id': '12345', 'event_type': 'purchase', 'amount': '99.99' },
    maxlen=1000000, approximate=True
)

# 消费者组:分布式处理
r.xgroup_create('user-events', 'payment-processors', id='
#39;, mkstream=True) messages = r.xreadgroup( groupname='payment-processors', consumername='worker-1', streams={'user-events': '>'}, # 仅未投递的消息 count=10, block=2000 ) for stream, events in messages: for event_id, data in events: try: process_payment(data) r.xack('user-events', 'payment-processors', event_id) except Exception: pass # 消息保留在待处理列表中以供重试 # 回收死 worker 的陈旧消息 r.xautoclaim('user-events', 'payment-processors', 'worker-2', min_idle_time=300000, start_id='0-0')

Redis Streams, HyperLogLog, and Probabilistic Data Structures in Production illustration

HyperLogLog:数十亿级去重计数

无论数据集多大,每个 HyperLogLog 键最多只占用约 12KB 内存,标准误差约 0.81%:

def track_page_view(page_id: str, user_id: str):
    """Record that *user_id* viewed *page_id* today.

    Adds the user to today's per-page HyperLogLog and (re)sets a 30-day TTL
    on that key.

    Args:
        page_id: Page identifier; part of the daily HLL key.
        user_id: Visitor identifier added to the HLL.
    """
    key = f"hll:views:{page_id}:{datetime.date.today()}"
    # One MULTI/EXEC pipeline (redis-py pipelines are transactional by
    # default): a single round trip, and the key can never be left without a
    # TTL if the process dies between PFADD and EXPIRE.
    pipe = r.pipeline()
    pipe.pfadd(key, user_id)
    pipe.expire(key, 86400 * 30)
    pipe.execute()

def get_unique_visitors(page_id: str) -> int:
    """Return the approximate number of distinct users who viewed *page_id* today."""
    today_key = "hll:views:{}:{}".format(page_id, datetime.date.today())
    estimate = r.pfcount(today_key)
    return estimate

def visitors_across_pages(page_ids: list) -> int:
    """Return today's approximate count of unique visitors across *page_ids*.

    PFCOUNT given several keys returns the cardinality of the union of those
    HyperLogLogs. No temporary merged key has to be written, so there are
    fewer round trips and nothing can leak.

    Args:
        page_ids: Page identifiers whose daily HLL keys are unioned.

    Returns:
        Estimated number of distinct users across all pages; 0 if
        *page_ids* is empty.
    """
    if not page_ids:
        return 0  # PFCOUNT requires at least one key
    today = datetime.date.today()  # evaluate once so every key uses the same day
    keys = [f"hll:views:{pid}:{today}" for pid in page_ids]
    return r.pfcount(*keys)

# 1亿独立用户的内存对比:
# Python Set: ~3.2GB
# HyperLogLog: 12KB (少 267,000 倍!)

Redis Streams, HyperLogLog, and Probabilistic Data Structures in Production illustration

布隆过滤器

# Reserve a Bloom filter sized for ~10M items at a 1% false-positive rate.
# redis-py's BFCommands.create signature is (key, errorRate, capacity, ...);
# there is no ``error_rate`` keyword, so the values are passed positionally.
try:
    r.bf().create('seen_emails', 0.01, 10_000_000)
except redis.exceptions.ResponseError as exc:
    # BF.RESERVE fails if the key already exists (e.g. on restart); keep the
    # existing filter in that case and surface any other error.
    if 'exists' not in str(exc).lower():
        raise

def process_email(email: str) -> bool:
    """Process *email* unless the Bloom filter reports it as probably seen.

    BF.ADD inserts the item and, in the same atomic command, reports whether
    it was new (1) or may already have been present (0). This avoids a
    BF.EXISTS-then-BF.ADD sequence, in which two concurrent callers can both
    see "absent" and both process the same email. It also saves a round trip.

    Returns:
        True if the email was processed. False if it was skipped as a probable
        duplicate; a small fraction of genuinely new emails (bounded by the
        filter's configured error rate) can be skipped this way.
    """
    if not r.bf().add('seen_emails', email):
        return False  # probably seen already (false positives possible)
    process_new_email(email)
    return True

# Cuckoo filters support deleting items (Bloom filters do not).
# NOTE(review): per the RedisBloom docs, only delete items that were actually
# added; deleting a never-added item may remove a different item that shares
# its fingerprint. session_token is not defined in this snippet and is
# presumably the caller's session identifier.
r.cf().create('active_sessions', capacity=1_000_000)
r.cf().add('active_sessions', session_token)
r.cf().delete('active_sessions', session_token)  # on logout

Redis Streams, HyperLogLog, and Probabilistic Data Structures in Production illustration

有序集合:排行榜与限流

class Leaderboard:
    """Game leaderboard stored in a Redis sorted set.

    Members are player ids and scores are their points. Ranks are ordered by
    descending score, so the highest score is rank 1.
    """

    def __init__(self, game_id: str):
        self.key = f"leaderboard:{game_id}"

    def add_score(self, player_id: str, score: float):
        """Set *player_id*'s score, overwriting any previous value."""
        r.zadd(self.key, {player_id: score})

    def get_rank(self, player_id: str):
        """Return the 1-based rank of *player_id*, or None if not on the board."""
        rank = r.zrevrank(self.key, player_id)
        return rank + 1 if rank is not None else None

    def get_top(self, n: int):
        """Return up to *n* (player, score) pairs, highest score first.

        Returns [] when n <= 0. Without this guard, n=0 would send
        ZREVRANGE key 0 -1, where -1 means "last element", returning the
        entire board.
        """
        if n <= 0:
            return []
        return r.zrevrange(self.key, 0, n - 1, withscores=True)

    def get_nearby(self, player_id: str, window: int = 5):
        """Return (player, score) pairs within *window* ranks of *player_id*.

        The list is ordered by descending score and includes the player.
        Returns [] if the player is not on the board.
        """
        rank = r.zrevrank(self.key, player_id)
        if rank is None:
            return []
        start = max(0, rank - window)  # clamp at the top of the board
        return r.zrevrange(self.key, start, rank + window, withscores=True)

class SlidingWindowRateLimiter:
    """Sliding-window-log rate limiter backed by a per-user Redis sorted set.

    Each admitted request is stored as a member scored by its timestamp.
    Entries that have fallen out of the window are pruned on every call.
    """

    def is_allowed(self, user_id: str, max_req: int, window_sec: int) -> bool:
        """Return True if *user_id* may make another request now.

        Allows at most *max_req* requests in any trailing *window_sec*-second
        window. Rejected requests are not recorded.
        """
        import uuid  # local import keeps this block self-contained

        key = f"rate:{user_id}"
        now = time.time()
        # Members must be unique. With str(now) alone, two requests that read
        # the same clock value (coarse clocks, concurrent workers) collapse
        # into a single member, and the second is never counted.
        member = f"{now}:{uuid.uuid4().hex}"
        pipe = r.pipeline()  # transactional: prune, count and insert atomically
        pipe.zremrangebyscore(key, 0, now - window_sec)
        pipe.zcard(key)
        pipe.zadd(key, {member: now})
        pipe.expire(key, window_sec * 2)
        count = pipe.execute()[1]  # requests in the window before this one
        if count < max_req:
            return True
        # A rejected request must not occupy a slot. Otherwise a client that
        # keeps retrying stays blocked for as long as it keeps retrying.
        r.zrem(key, member)
        return False

Lua 脚本:原子操作

# Atomic distributed-lock primitives implemented as Lua scripts; each script
# executes atomically on the Redis server.

# Acquire: if KEYS[1] does not exist, set it to ARGV[1] (the holder's token)
# with a PX expiry of ARGV[2] milliseconds and return 1; otherwise return 0.
_LOCK_ACQUIRE_LUA = (
    "if redis.call('exists', KEYS[1]) == 0 then "
    "redis.call('set', KEYS[1], ARGV[1], 'px', tonumber(ARGV[2])) "
    "return 1 end return 0"
)

# Release: delete KEYS[1] only if it still holds ARGV[1] (the caller's token),
# so a holder whose lock already expired cannot delete someone else's lock.
# Returns 1 if deleted, 0 otherwise.
_LOCK_RELEASE_LUA = (
    "if redis.call('get', KEYS[1]) == ARGV[1] then "
    "redis.call('del', KEYS[1]) return 1 end return 0"
)

acquire_script = r.register_script(_LOCK_ACQUIRE_LUA)
release_script = r.register_script(_LOCK_RELEASE_LUA)

class RedisLock:
    """Distributed mutex on a single Redis key, usable as a context manager.

    Each instance carries a random token. Acquiring sets the key to that
    token with a millisecond TTL, so a crashed holder cannot block others
    forever. Releasing deletes the key only if it still holds this token.
    """

    def __init__(self, resource: str, ttl_ms: int = 30000):
        self.key = f"lock:{resource}"
        self.token = str(uuid.uuid4())  # identifies this holder
        self.ttl_ms = ttl_ms  # the lock auto-expires after this many ms

    def acquire(self, timeout: float = 10) -> bool:
        """Try to take the lock, polling every 50 ms for up to *timeout* seconds.

        At least one attempt is always made, so ``timeout=0`` means "try once
        without waiting". A monotonic clock is used so wall-clock adjustments
        cannot shorten or stretch the wait.

        Returns:
            True if the lock was acquired, False on timeout.
        """
        deadline = time.monotonic() + timeout
        while True:
            if acquire_script(keys=[self.key], args=[self.token, self.ttl_ms]):
                return True
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return False
            time.sleep(min(0.05, remaining))

    def release(self) -> bool:
        """Release the lock if this instance still owns it.

        Returns:
            True if the key was deleted. False if it no longer held this
            instance's token, e.g. the TTL expired before release.
        """
        return bool(release_script(keys=[self.key], args=[self.token]))

    def __enter__(self):
        if not self.acquire():
            raise TimeoutError(f"Cannot acquire lock: {self.key}")
        return self

    def __exit__(self, *_):
        self.release()

# Hold the distributed lock for payment 12345 while processing it; the
# context manager's __exit__ releases it even if process_payment raises.
with RedisLock(resource="payment:12345"):
    process_payment(12345)

Pub/Sub 与 Streams 对比

Pub/Sub:即发即忘的广播。订阅者离线期间发布的消息会直接丢失。适用于实时通知。

Streams:持久化、可重放的日志。消费者组支持分布式处理。用于事件溯源、任务队列、审计日志。

Redis 的专用数据结构往往能省掉一整个独立服务:有序集合足以满足许多排行榜需求,HyperLogLog 以固定的极小内存完成任意规模的去重计数,Streams 可以替代简单的消息队列。