
为什么需要缓存?为什么是 Redis?
数据库查询通常是 Web 请求中最慢的部分——单个页面可能触发 20-50 次查询。低流量时没问题,但在规模下是灾难性的。
Redis 是一个内存数据结构存储,每秒可处理数百万请求,延迟低于毫秒。它不仅仅是缓存:还是消息代理、会话存储、速率限制器和实时排行榜。但本指南专注于其最常见的用途:缓存。

用于缓存的核心 Redis 数据类型
# String — 最常用,将任何值存储为字符串
SET user:123 '{"id":123,"name":"Alice","email":"alice@example.com"}'
GET user:123
SETEX user:123 3600 '...' # 设置 60 分钟过期
# Hash — 适合对象(避免序列化开销)
HSET user:123 name "Alice" email "alice@example.com" age 30
HGET user:123 name # 获取单个字段
HGETALL user:123 # 获取所有字段
HINCRBY user:123 loginCount 1 # 递增字段
# List — 有序集合,队列
RPUSH queue:emails "email1" "email2"
LPOP queue:emails # 出队
LRANGE queue:emails 0 -1 # 获取所有
# Set — 唯一成员,标签
SADD user:123:tags "premium" "beta-tester"
SMEMBERS user:123:tags
SISMEMBER user:123:tags "premium" # O(1) 成员检查
# Sorted Set — 排名/排行榜
ZADD leaderboard 1500 "player:alice"
ZADD leaderboard 2300 "player:bob"
ZRANGE leaderboard 0 -1 REV WITHSCORES # 顶级玩家
ZRANK leaderboard "player:alice" # Alice 的排名
使用 Node.js 设置 Redis
import { createClient } from 'redis';
const redis = createClient({
url: process.env.REDIS_URL ?? 'redis://localhost:6379',
socket: {
reconnectStrategy: (retries) => Math.min(retries * 50, 2000),
connectTimeout: 5000,
},
});
redis.on('error', (err) => console.error('Redis error:', err));
redis.on('connect', () => console.log('Redis connected'));
redis.on('reconnecting', () => console.log('Redis reconnecting...'));
await redis.connect();
// 辅助函数:获取或设置,自动 JSON 序列化
async function getOrSet(key, ttlSeconds, fetchFn) {
const cached = await redis.get(key);
if (cached !== null) {
return JSON.parse(cached);
}
const value = await fetchFn();
await redis.setEx(key, ttlSeconds, JSON.stringify(value));
return value;
}
// 使用
const user = await getOrSet(
`user:${userId}`,
3600, // 1 小时 TTL
() => db.users.findById(userId)
);
四种缓存模式
1. 旁路缓存(惰性加载)—— 最常见
应用程序代码管理缓存:
async function getUser(userId) {
// 1. 先检查缓存
const cacheKey = `user:${userId}`;
const cached = await redis.get(cacheKey);
if (cached) {
return JSON.parse(cached); // 缓存命中
}
// 2. 缓存未命中:从数据库获取
const user = await db.users.findById(userId);
if (!user) return null;
// 3. 存入缓存供下次使用
await redis.setEx(cacheKey, 3600, JSON.stringify(user));
return user;
}
// 更新时:使缓存失效
async function updateUser(userId, data) {
await db.users.update(userId, data);
await redis.del(`user:${userId}`); // 强制下次重新获取
}
优点:只缓存实际请求的数据。缓存故障不会破坏应用。 缺点:第一次请求慢(缓存未命中)。可能存在过期数据。

2. 直写
同时写入缓存和数据库:
async function updateUserWriteThrough(userId, data) {
// 同时更新两者
const [updatedUser] = await Promise.all([
db.users.update(userId, data),
redis.setEx(`user:${userId}`, 3600, JSON.stringify({ ...data, id: userId }))
]);
return updatedUser;
}
优点:缓存始终新鲜。读取始终快速。 缺点:写入延迟增加。为可能永远不会读取的数据预热缓存。
3. 回写
立即写入缓存,异步持久化到数据库:
// 立即写入缓存,将数据库写入排队
async function updateUserWriteBehind(userId, data) {
const key = `user:${userId}`;
// 即时写入缓存
await redis.setEx(key, 3600, JSON.stringify(data));
// 排队后台数据库写入
await redis.lPush('write-queue', JSON.stringify({
type: 'update_user',
userId,
data
}));
return data;
}
// 后台工作进程处理队列
async function processWriteQueue() {
while (true) {
const item = await redis.brPop('write-queue', 0); // 阻塞弹出
const { type, userId, data } = JSON.parse(item.element);
if (type === 'update_user') {
await db.users.update(userId, data);
}
}
}
优点:写入极快。处理写入突发。 缺点:如果 Redis 在数据库写入前崩溃,存在数据丢失风险。实现复杂。
4. 通读
缓存位于数据库前面,对应用程序透明:
// 在通读设置中,缓存层本身在未命中时从数据库获取
// 通常作为中间件或代理层实现
class ReadThroughCache {
constructor(redis, db) {
this.redis = redis;
this.db = db;
}
async get(key, fetchFn, ttl = 3600) {
const cached = await this.redis.get(key);
if (cached !== null) return JSON.parse(cached);
// 缓存本身从源获取
const value = await fetchFn();
if (value !== null) {
await this.redis.setEx(key, ttl, JSON.stringify(value));
}
return value;
}
}
const cache = new ReadThroughCache(redis, db);
const user = await cache.get(`user:${id}`, () => db.users.findById(id));
缓存失效策略
缓存失效是出了名的困难。以下是主要方法:
// 1. 基于 TTL(最简单)—— 让缓存自然过期
await redis.setEx('product:123', 300, JSON.stringify(product)); // 5 分钟 TTL
// 2. 事件驱动失效 —— 更新时删除
async function updateProduct(productId, data) {
await db.products.update(productId, data);
// 使相关缓存键失效
await redis.del(`product:${productId}`);
await redis.del(`category:${data.categoryId}:products`); // 同时使分类列表失效
}
// 3. 缓存版本控制 —— 模式更改时更改键前缀
const CACHE_VERSION = 'v2';
const key = `${CACHE_VERSION}:user:${userId}`;
// 4. 基于标签的失效,使用 Redis 集合
async function setWithTags(key, value, ttl, tags) {
const pipeline = redis.multi();
pipeline.setEx(key, ttl, JSON.stringify(value));
// 跟踪哪些键具有每个标签
for (const tag of tags) {
pipeline.sAdd(`tag:${tag}`, key);
pipeline.expire(`tag:${tag}`, ttl + 60);
}
await pipeline.exec();
}
async function invalidateByTag(tag) {
const keys = await redis.sMembers(`tag:${tag}`);
if (keys.length > 0) {
await redis.del([...keys, `tag:${tag}`]);
}
}
// 使用:一次性使所有标记为 "user-profile" 的键失效
await invalidateByTag('user-profile');

避免缓存雪崩
当缓存过期时,许多请求同时命中数据库:
// 问题:缓存过期时,许多请求同时命中数据库
// 解决方案 1:互斥锁
import { Mutex } from 'async-mutex';
const locks = new Map();
async function getWithLock(key, ttl, fetchFn) {
const cached = await redis.get(key);
if (cached) return JSON.parse(cached);
// 一次只有一个获取者
if (!locks.has(key)) locks.set(key, new Mutex());
return locks.get(key).runExclusive(async () => {
// 获取锁后双重检查
const cached2 = await redis.get(key);
if (cached2) return JSON.parse(cached2);
const value = await fetchFn();
await redis.setEx(key, ttl, JSON.stringify(value));
return value;
});
}
// 解决方案 2:概率性提前过期
async function getWithEarlyExpiry(key, ttl, fetchFn) {
const data = await redis.get(key);
if (data) {
const { value, expiry } = JSON.parse(data);
const now = Date.now() / 1000;
const timeLeft = expiry - now;
// 在过期前随机重新计算(避免同步雪崩)
if (timeLeft > 0 && Math.random() > timeLeft / (ttl * 0.1)) {
return value; // 使用缓存值
}
}
const value = await fetchFn();
await redis.setEx(key, ttl, JSON.stringify({
value,
expiry: Date.now() / 1000 + ttl
}));
return value;
}
Redis 用于会话存储
// express-session 与 Redis
import session from 'express-session';
import { RedisStore } from 'connect-redis';
app.use(session({
store: new RedisStore({ client: redis }),
secret: process.env.SESSION_SECRET,
resave: false,
saveUninitialized: false,
cookie: {
secure: process.env.NODE_ENV === 'production',
httpOnly: true,
maxAge: 7 * 24 * 60 * 60 * 1000, // 1 周
sameSite: 'lax',
},
name: '__session', // 不要暴露 'connect.sid'
}));
Redis 用于速率限制
// 滑动窗口速率限制器
async function checkRateLimit(identifier, limit, windowSeconds) {
const key = `ratelimit:${identifier}`;
const now = Date.now();
const windowStart = now - windowSeconds * 1000;
const pipeline = redis.multi();
pipeline.zRemRangeByScore(key, '-inf', windowStart); // 移除旧条目
pipeline.zAdd(key, { score: now, value: now.toString() });
pipeline.zCard(key); // 计数请求
pipeline.expire(key, windowSeconds);
const results = await pipeline.exec();
const requestCount = results[2];
return {
allowed: requestCount <= limit,
count: requestCount,
remaining: Math.max(0, limit - requestCount),
};
}
// 在 Express 中间件中使用
app.use(async (req, res, next) => {
const identifier = req.ip;
const { allowed, remaining } = await checkRateLimit(identifier, 100, 60);
res.set('X-RateLimit-Remaining', remaining.toString());
if (!allowed) {
return res.status(429).json({ error: 'Rate limit exceeded' });
}
next();
});
Redis 发布/订阅用于实时功能
// 发布者
async function publishUserUpdate(userId, event) {
await redis.publish(`user:${userId}:events`, JSON.stringify(event));
}
// 订阅者(单独连接——订阅者只能订阅)
const subscriber = redis.duplicate();
await subscriber.connect();
await subscriber.subscribe('user:*:events', (message, channel) => {
const event = JSON.parse(message);
const userId = channel.split(':')[1];
console.log(`User ${userId} event:'`, event);
// 广播给 WebSocket 客户端
wsServer.to(`user-${userId}`).emit('update', event);
});
键命名约定
# ✅ 好的键命名——分层、描述性
user:123 # 用户对象
user:123:sessions # 用户的会话
user:123:preferences # 用户偏好
product:456:details # 产品详情
product:456:reviews:page:1 # 分页评论
search:q:javascript:page:1 # 搜索结果缓存
ratelimit:192.168.1.1 # 速率限制跟踪器
lock:payment:order:789 # 分布式锁
# ❌ 坏的键命名——扁平、无结构
user123data
product_456
search_javascript_1
→ 使用 JSON Viewer 查看和探索复杂的 JSON 数据结构。