
Redis 生产级高级数据结构
大多数开发者将 Redis 用作缓存或会话存储。但 Redis 包含精巧的数据结构,能优雅地解决特定问题。
Redis Streams:持久化事件日志
Streams 提供持久化的、仅追加(append-only)的消息日志,并支持消费者组:
import datetime
import time
import uuid

import redis
# Shared client used by every snippet below; replies are decoded to str.
r = redis.Redis(decode_responses=True)

# Producer: append one event to the 'user-events' stream. The stream is
# trimmed to roughly one million entries (approximate trimming).
purchase_event = {'user_id': '12345', 'event_type': 'purchase', 'amount': '99.99'}
msg_id = r.xadd('user-events', purchase_event, maxlen=1000000, approximate=True)
# Consumer group: distributes the stream's entries across several workers.
# id='$' starts the group at the current end of the stream, so only entries
# appended after this point are delivered (use id='0' to replay existing
# entries instead). mkstream=True creates the stream if it is missing.
try:
    r.xgroup_create('user-events', 'payment-processors', id='$', mkstream=True)
except redis.ResponseError as exc:
    # Redis replies BUSYGROUP when the group already exists, which is the
    # normal case on every start after the first. Anything else is a real error.
    if 'BUSYGROUP' not in str(exc):
        raise

messages = r.xreadgroup(
    groupname='payment-processors',
    consumername='worker-1',
    streams={'user-events': '>'},  # '>' = only entries not yet delivered
    count=10,
    block=2000,
)
for _stream, events in messages:
    for event_id, data in events:
        try:
            process_payment(data)
            r.xack('user-events', 'payment-processors', event_id)
        except Exception:
            # Deliberate best-effort: an entry that is not acknowledged stays
            # in the group's pending list so it can be retried later.
            continue
# Reclaim stale entries left behind by dead workers: pending entries idle for
# at least STALE_AFTER_MS are claimed by consumer 'worker-2', scanning from
# the start of the pending list.
STALE_AFTER_MS = 300000
r.xautoclaim(
    'user-events',
    'payment-processors',
    'worker-2',
    min_idle_time=STALE_AFTER_MS,
    start_id='0-0',
)
HyperLogLog:数十亿级去重计数
无论数据集多大,每个键最多仅占用约 12KB 内存(标准误差约 0.81%):
def track_page_view(page_id: str, user_id: str):
    """Record that *user_id* viewed *page_id* today.

    The visitor is added to a per-page, per-day HyperLogLog, and the key's
    expiry is (re)set to 30 days on every call.
    """
    today = datetime.date.today()
    daily_key = f"hll:views:{page_id}:{today}"
    thirty_days = 86400 * 30  # TTL in seconds
    r.pfadd(daily_key, user_id)
    r.expire(daily_key, thirty_days)
def get_unique_visitors(page_id: str) -> int:
    """Return the approximate number of distinct visitors of *page_id* today."""
    today = datetime.date.today()
    daily_key = f"hll:views:{page_id}:{today}"
    return r.pfcount(daily_key)
def visitors_across_pages(page_ids: list) -> int:
    """Return the approximate number of distinct visitors across pages today.

    A visitor who viewed several of the pages is counted once.

    Args:
        page_ids: Pages whose daily HyperLogLogs are combined.

    Returns:
        The estimated cardinality of the union, or 0 when *page_ids* is empty.
    """
    # Evaluate the date once so every key refers to the same day even if the
    # call straddles midnight.
    today = datetime.date.today()
    keys = [f"hll:views:{pid}:{today}" for pid in page_ids]
    if not keys:
        return 0  # nothing to merge; avoids sending a command with no keys
    # PFCOUNT with several keys returns the cardinality of their union, so no
    # temporary merged key has to be written and expired.
    return r.pfcount(*keys)
# 1亿独立用户的内存对比:
# Python Set: ~3.2GB
# HyperLogLog: 12KB (少 267,000 倍!)
布隆过滤器
# Reserve a Bloom filter sized for 10 million items at a 1% false-positive rate.
# NOTE(review): BF.RESERVE is expected to fail if the key already exists, so
# this should run once at provisioning time - confirm for your deployment.
r.bf().create('seen_emails', error_rate=0.01, capacity=10_000_000)


def process_email(email: str) -> bool:
    """Process *email* unless it has (probably) been seen before.

    Args:
        email: Address to de-duplicate on.

    Returns:
        True if the address was new and was processed. False if the filter
        reports it as already present, which is wrong for roughly 1% of
        genuinely new addresses.
    """
    # BF.ADD reports whether the item was newly added, so one command both
    # tests and records the address. A separate exists-then-add pair would let
    # two workers pass the check for the same address at the same time.
    newly_added = r.bf().add('seen_emails', email)
    if not newly_added:
        return False  # probably seen before (1% false-positive rate)
    process_new_email(email)
    return True
# A Cuckoo filter supports deletion, which a Bloom filter does not.
sessions = r.cf()
sessions.create('active_sessions', capacity=1_000_000)
sessions.add('active_sessions', session_token)
sessions.delete('active_sessions', session_token)  # on logout

有序集合:排行榜与限流
class Leaderboard:
    """Per-game leaderboard stored in a Redis sorted set, highest score first."""

    def __init__(self, game_id: str):
        """Bind the leaderboard to the sorted set for *game_id*."""
        self.key = f"leaderboard:{game_id}"

    def add_score(self, player_id: str, score: float):
        """Set *player_id*'s score to *score*, adding the player if needed."""
        r.zadd(self.key, {player_id: score})

    def get_rank(self, player_id: str):
        """Return the player's 1-based rank, or None if the player is absent."""
        rank = r.zrevrank(self.key, player_id)
        return rank + 1 if rank is not None else None

    def get_top(self, n: int):
        """Return the *n* best (player, score) pairs, best first.

        Returns an empty list when *n* is zero or negative.
        """
        if n <= 0:
            # An end index of n - 1 == -1 means "through the last element" to
            # Redis, so n == 0 would otherwise return the entire leaderboard.
            return []
        return r.zrevrange(self.key, 0, n - 1, withscores=True)

    def get_nearby(self, player_id: str, window: int = 5):
        """Return up to *window* entries on each side of the player, inclusive.

        Returns an empty list if the player is not on the leaderboard.
        """
        rank = r.zrevrank(self.key, player_id)
        if rank is None:
            return []
        # Clamp at 0: a negative start index would count from the end.
        first = max(0, rank - window)
        return r.zrevrange(self.key, first, rank + window, withscores=True)
class SlidingWindowRateLimiter:
    """Sliding-window rate limiter backed by one sorted set per user."""

    def is_allowed(self, user_id: str, max_req: int, window_sec: int) -> bool:
        """Record a request and report whether it is within the limit.

        Args:
            user_id: Identity the limit applies to.
            max_req: Maximum number of requests allowed per window.
            window_sec: Length of the sliding window in seconds.

        Returns:
            True if fewer than *max_req* requests were already in the window.

        Note:
            The request is recorded even when it is rejected, so rejected
            requests also count against later calls.
        """
        key = f"rate:{user_id}"
        now = time.time()
        # The member must be unique per request. Using only the timestamp
        # would collapse two requests with the same clock reading into one
        # entry and under-count them.
        member = f"{now}:{uuid.uuid4().hex}"
        pipe = r.pipeline()
        pipe.zremrangebyscore(key, 0, now - window_sec)  # drop expired entries
        pipe.zcard(key)  # count before this request is added
        pipe.zadd(key, {member: now})
        pipe.expire(key, window_sec * 2)  # let idle users' keys disappear
        count = pipe.execute()[1]
        return count < max_req
Lua 脚本:原子操作
# Atomic distributed lock implemented with Lua scripts.
# Acquire: set KEYS[1] to the caller's token with a millisecond TTL, but only
# if the key does not exist yet. Returns 1 on success, 0 otherwise.
_ACQUIRE_LUA = (
    "if redis.call('exists', KEYS[1]) == 0 then "
    "redis.call('set', KEYS[1], ARGV[1], 'px', tonumber(ARGV[2])) "
    "return 1 end return 0"
)
# Release: delete KEYS[1] only if it still holds the caller's token.
# Returns 1 if the key was deleted, 0 otherwise.
_RELEASE_LUA = (
    "if redis.call('get', KEYS[1]) == ARGV[1] then "
    "redis.call('del', KEYS[1]) return 1 end return 0"
)
acquire_script = r.register_script(_ACQUIRE_LUA)
release_script = r.register_script(_RELEASE_LUA)
class RedisLock:
    """Distributed lock usable as a context manager.

    Each instance has its own random token, so only the instance that
    acquired the lock can release it.
    """

    def __init__(self, resource: str, ttl_ms: int = 30000):
        """Prepare a lock on *resource* that expires after *ttl_ms* milliseconds."""
        self.key = f"lock:{resource}"
        self.token = str(uuid.uuid4())
        self.ttl_ms = ttl_ms

    def acquire(self, timeout: float = 10) -> bool:
        """Try to take the lock, retrying until *timeout* seconds have passed.

        At least one attempt is made, even when *timeout* is zero or negative.

        Returns:
            True if the lock was acquired, False if the timeout elapsed first.
        """
        # The monotonic clock is unaffected by wall-clock adjustments.
        deadline = time.monotonic() + timeout
        while True:
            if acquire_script(keys=[self.key], args=[self.token, self.ttl_ms]):
                return True
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return False
            # Never sleep past the deadline.
            time.sleep(min(0.05, remaining))

    def release(self) -> bool:
        """Release the lock if this instance still holds it.

        Returns:
            True if the key was deleted, False if it no longer held this
            instance's token (for example because it had expired).
        """
        return bool(release_script(keys=[self.key], args=[self.token]))

    def __enter__(self):
        """Acquire the lock or raise TimeoutError."""
        if not self.acquire():
            raise TimeoutError(f"Cannot acquire lock: {self.key}")
        return self

    def __exit__(self, *_):
        """Release the lock; exceptions from the with-body are not suppressed."""
        self.release()
# Usage: the lock is taken on entry to the with-block and released on exit.
payment_lock = RedisLock("payment:12345")
with payment_lock:
    process_payment(12345)
Pub/Sub 与 Streams 对比
Pub/Sub:即发即忘的广播。离线则丢失消息。用于实时通知。
Streams:持久化、可重放的日志。消费者组支持分布式处理。用于事件溯源、任务队列、审计日志。
Redis 的专用数据结构可以让你省去一整套独立服务:有序集合实现的排行榜不逊于专用服务,HyperLogLog 以几乎可以忽略的内存开销处理任意规模的去重计数,Streams 则可以替代简单的消息队列。