正在加载,请稍候…

Redis 分布式缓存:模式、陷阱与生产策略

掌握 Redis 缓存模式:旁路缓存、直写、分布式锁、发布/订阅、缓存雪崩预防以及 Redis 集群高可用搭建。

Redis 分布式缓存:模式、陷阱与生产策略

Redis 是后端基础设施的瑞士军刀。以下模式将初级与高级后端工作区分开来。

旁路缓存(懒加载)

import redis, json, time
r = redis.Redis(host='localhost', decode_responses=True)

def cache_aside(key: str, ttl: int, fetch_fn):
    cached = r.get(key)
    if cached is not None:
        return json.loads(cached)
    value = fetch_fn()
    r.setex(key, ttl, json.dumps(value))
    return value

def get_user(user_id: int):
    return cache_aside(
        f"user:{user_id}", 3600,
        lambda: db.query("SELECT * FROM users WHERE id = ?", user_id)
    )

Redis 分布式缓存:模式、陷阱与生产策略 插图

缓存雪崩预防

当热门键过期时,数百个请求同时击中数据库。

def get_with_lock(key: str, ttl: int, fetch_fn):
    cached = r.get(key)
    if cached:
        return json.loads(cached)

    if r.set(f"lock:{key}", "1", nx=True, ex=5):
        try:
            value = fetch_fn()
            r.setex(key, ttl, json.dumps(value))
            return value
        finally:
            r.delete(f"lock:{key}")
    else:
        time.sleep(0.1)
        cached = r.get(key)
        return json.loads(cached) if cached else fetch_fn()

Redis 分布式缓存:模式、陷阱与生产策略 插图

使用 Redlock 的分布式锁

from redlock import RedLock

def process_payment(payment_id: str, amount: float) -> None:
    connections = [{'host': f'redis-{i}'} for i in range(1, 4)]
    with RedLock(f"payment:{payment_id}", connection_details=connections, ttl=30000) as lock:
        if not lock:
            raise Exception("Could not acquire lock")
        payment = db.get_payment(payment_id)
        if payment.status != 'pending':
            return  # 幂等检查
        charge_card(payment.card_token, amount)
        db.update_payment(payment_id, status='completed')

Redis 分布式缓存:模式、陷阱与生产策略 插图

用于实时事件的发布/订阅

def notify_order_update(order_id: str, status: str):
    r.publish(f"order:{order_id}", json.dumps({'status': status, 'ts': time.time()}))

def watch_order(order_id: str):
    pubsub = r.pubsub()
    pubsub.subscribe(f"order:{order_id}")
    for message in pubsub.listen():
        if message['type'] == 'message':
            data = json.loads(message['data'])
            notify_customer(data)
            if data['status'] in ('delivered', 'cancelled'):
                break

使用哈希标签的 Redis 集群

from redis.cluster import RedisCluster

rc = RedisCluster(startup_nodes=[{"host": f"redis-{i}", "port": 6379} for i in range(1, 4)])

def cache_session(user_id: int, session: dict):
    # 哈希标签确保相关键进入同一集群槽位
    pipeline = rc.pipeline()
    pipeline.setex(f"{{user:{user_id}}}:session", 3600, json.dumps(session))
    pipeline.setex(f"{{user:{user_id}}}:last_seen", 86400, str(time.time()))
    pipeline.execute()

谨慎地缓存每个关注点。读取使用旁路缓存,一致性使用直写,协调使用 Redlock。

→ 使用 URL 编码器 工具编码缓存键。