
Redis 是后端基础设施的瑞士军刀。以下模式将初级与高级后端工作区分开来。
旁路缓存(懒加载)
import redis, json, time
# Shared Redis client used by the examples below; decode_responses=True is
# passed so replies are returned as str rather than bytes.
r = redis.Redis(host='localhost', decode_responses=True)
def cache_aside(key: str, ttl: int, fetch_fn):
    """Return the value stored under ``key``, loading and caching it on a miss.

    Args:
        key: Redis key to read and, on a miss, populate.
        ttl: Expiry passed to ``SETEX`` for the stored entry.
        fetch_fn: Zero-argument callable invoked on a miss; its result is
            serialized with ``json.dumps`` before being stored.

    Returns:
        The JSON-decoded cached value on a hit, otherwise the fresh result
        of ``fetch_fn()``.
    """
    hit = r.get(key)
    if hit is None:
        # Miss: load from the source and store the serialized form for later readers.
        fresh = fetch_fn()
        payload = json.dumps(fresh)
        r.setex(key, ttl, payload)
        return fresh
    return json.loads(hit)
def get_user(user_id: int):
    """Look up a user row by id through ``cache_aside`` with a 3600 TTL."""

    def load_from_db():
        # Runs only when the cache has no entry for this user.
        return db.query("SELECT * FROM users WHERE id = ?", user_id)

    cache_key = f"user:{user_id}"
    return cache_aside(cache_key, 3600, load_from_db)
缓存击穿(缓存踩踏)预防
当热门键过期时,数百个请求同时击中数据库。
def get_with_lock(key: str, ttl: int, fetch_fn):
    """Read ``key`` through the cache, letting one caller rebuild a missing entry.

    On a miss, the caller that wins a short-lived ``lock:<key>`` lock runs
    ``fetch_fn`` and stores the result; other callers poll the cache while
    the lock is held instead of all hitting the backing store at once.

    Args:
        key: Redis key to read and, on a miss, populate.
        ttl: Expiry passed to ``SETEX`` for the stored entry.
        fetch_fn: Zero-argument callable producing a JSON-serializable value.

    Returns:
        The JSON-decoded cached value, or the result of ``fetch_fn()`` when
        this caller rebuilt the entry or waiting did not yield a value.
    """
    import uuid

    cached = r.get(key)
    if cached is not None:
        return json.loads(cached)

    lock_key = f"lock:{key}"
    lock_ttl = 5  # seconds; bounds both the lock lifetime and the waiters' patience
    # A unique token lets us release only a lock we still own.
    token = uuid.uuid4().hex

    if r.set(lock_key, token, nx=True, ex=lock_ttl):
        try:
            value = fetch_fn()
            r.setex(key, ttl, json.dumps(value))
            return value
        finally:
            # Compare-and-delete: if the fetch outlived the lock's expiry,
            # another worker may hold the lock now and a plain DEL would
            # remove *their* lock.
            r.eval(
                "if redis.call('get', KEYS[1]) == ARGV[1] then "
                "return redis.call('del', KEYS[1]) else return 0 end",
                1,
                lock_key,
                token,
            )

    # Another caller is rebuilding the entry: poll instead of querying the
    # backing store ourselves, which would defeat the purpose of the lock.
    deadline = time.monotonic() + lock_ttl
    while time.monotonic() < deadline:
        time.sleep(0.1)
        # Check the lock *before* the cache: the holder writes the value and
        # then releases, so once the lock is gone a written value is visible.
        lock_held = r.exists(lock_key)
        cached = r.get(key)
        if cached is not None:
            return json.loads(cached)
        if not lock_held:
            break  # holder finished without populating the cache (e.g. it failed)

    # Last resort so the caller always gets a value.
    return fetch_fn()
使用 Redlock 的分布式锁
from redlock import RedLock
def process_payment(payment_id: str, amount: float) -> None:
    """Charge a pending payment while holding a per-payment Redlock.

    The lock is named ``payment:<payment_id>`` and is taken across the
    hosts ``redis-1`` .. ``redis-3`` with ``ttl=30000`` (presumably
    milliseconds; confirm against the redlock package).

    Args:
        payment_id: Identifier used for the lock name and the DB lookups.
        amount: Amount passed to ``charge_card``.

    Raises:
        Exception: If the value bound by the lock context manager is falsy.
    """
    redis_nodes = [{'host': f'redis-{n}'} for n in range(1, 4)]
    lock_name = f"payment:{payment_id}"
    with RedLock(lock_name, connection_details=redis_nodes, ttl=30000) as lock:
        if not lock:
            raise Exception("Could not acquire lock")
        payment = db.get_payment(payment_id)
        # Idempotency check: only a payment that is still pending is charged.
        if payment.status == 'pending':
            charge_card(payment.card_token, amount)
            db.update_payment(payment_id, status='completed')
用于实时事件的发布/订阅
def notify_order_update(order_id: str, status: str):
    """Publish a JSON status event on the ``order:<order_id>`` channel.

    The event carries the given ``status`` and the current ``time.time()``
    value under the ``ts`` key.
    """
    event = {'status': status, 'ts': time.time()}
    channel = f"order:{order_id}"
    r.publish(channel, json.dumps(event))
def watch_order(order_id: str):
    """Relay status events for one order until it reaches a terminal status.

    Subscribes to the ``order:<order_id>`` channel, JSON-decodes each
    published message, and passes it to ``notify_customer``. Stops after a
    message whose ``status`` is ``'delivered'`` or ``'cancelled'``.

    Args:
        order_id: Order whose channel is watched.
    """
    # The context manager tears down the subscription and its connection on
    # every exit path (terminal status, or an exception from the handler),
    # so the dedicated pub/sub connection is not leaked.
    with r.pubsub() as pubsub:
        pubsub.subscribe(f"order:{order_id}")
        for message in pubsub.listen():
            if message['type'] != 'message':
                continue  # skip non-payload events such as subscribe confirmations
            data = json.loads(message['data'])
            notify_customer(data)
            if data['status'] in ('delivered', 'cancelled'):
                break
使用哈希标签的 Redis 集群
from redis.cluster import RedisCluster
# Cluster client seeded with three startup nodes (redis-1 .. redis-3, port 6379).
rc = RedisCluster(startup_nodes=[{"host": f"redis-{i}", "port": 6379} for i in range(1, 4)])
def cache_session(user_id: int, session: dict):
    """Write a user's session and last-seen timestamp in one cluster pipeline.

    Args:
        user_id: Id embedded in the ``{user:<id>}`` hash tag of both keys.
        session: Session data, stored JSON-encoded with a 3600 expiry; the
            last-seen timestamp is stored with an 86400 expiry.
    """
    batch = rc.pipeline()
    # The hash tag ensures related keys land in the same cluster slot.
    entries = (
        (f"{{user:{user_id}}}:session", 3600, json.dumps(session)),
        (f"{{user:{user_id}}}:last_seen", 86400, str(time.time())),
    )
    for name, expiry, payload in entries:
        batch.setex(name, expiry, payload)
    batch.execute()
针对每类需求谨慎选择缓存模式:读取使用旁路缓存,一致性使用直写,协调使用 Redlock。
→ 使用 URL 编码器工具对缓存键进行编码。