正在加载,请稍候…

API 速率限制:策略、算法与实现

了解 API 速率限制的工作原理,何时使用令牌桶、漏桶或固定窗口算法,以及如何在 Node.js、Python 和 Redis 中实现速率限制。

API 速率限制:策略、算法与实现

为什么速率限制很重要

速率限制控制客户端调用 API 的频率。如果没有它,一个行为异常的客户端——无论是错误脚本、DDoS 攻击,还是过于热情的用户——都可能耗尽服务器资源,降低所有用户的服务质量,并增加基础设施成本。

但速率限制不仅仅是一种防御措施。它也是一种业务工具:免费用户每天 100 次请求,付费用户 10,000 次,企业客户无限制。同一机制既保障安全又执行计费。

API 速率限制:策略、算法与实现 插图

四种主要速率限制算法

固定窗口计数器

工作原理: 在固定时间窗口内计数请求(例如 1 分钟桶)。每个窗口开始时重置计数器。

窗口:12:00:00 – 12:01:00 → 95 次请求 → 正常
窗口:12:01:00 – 12:02:00 → 计数器重置为 0

实现:

// 基于 Redis 的固定窗口
async function fixedWindowRateLimit(clientId, limit, windowSeconds) {
  const key = `ratelimit:${clientId}:${Math.floor(Date.now() / (windowSeconds * 1000))}`;
  const count = await redis.incr(key);
  if (count === 1) {
    await redis.expire(key, windowSeconds);
  }
  return count <= limit;
}

弱点: 边界问题。如果允许 100 次请求/分钟,客户端可以在 12:00:59 发送 100 次,在 12:01:01 再发送 100 次——2 秒内 200 次请求。这种“边界突发”可能压垮未为此设计的服务。

滑动窗口日志

工作原理: 记录每个请求的时间戳。要检查新请求是否允许,统计最近 N 秒内的时间戳。丢弃更早的时间戳。

优点: 精确——无边界突发问题。

弱点: 对于高流量 API 内存消耗大(存储每个时间戳)。

import redis
import time

def sliding_window_log(client_id: str, limit: int, window_seconds: int) -> bool:
    r = redis.Redis()
    key = f"ratelimit:{client_id}"
    now = time.time()
    window_start = now - window_seconds

    pipe = r.pipeline()
    pipe.zremrangebyscore(key, 0, window_start)  # 移除旧条目
    pipe.zadd(key, {str(now): now})              # 添加当前请求
    pipe.zcard(key)                               # 统计窗口内请求数
    pipe.expire(key, window_seconds)
    results = pipe.execute()

    return results[2] <= limit

滑动窗口计数器

一种更节省内存的滑动窗口日志近似方法。将当前固定窗口计数与前一窗口计数的加权部分相结合。

当前请求数 = 当前窗口计数 + 前一窗口计数 × (1 - 已过时间比例)

如果限制为 100 次/分钟,当前时间为 12:00:45(窗口已过 75%),前一窗口有 80 次请求:

当前请求数 = 30 + 80 × (1 - 0.75) = 30 + 20 = 50

对于均匀流量,此近似值与精确速率的误差在 0.003% 以内。

API 速率限制:策略、算法与实现 插图

令牌桶

工作原理: 每个客户端有一个“桶”,具有最大容量。令牌以恒定速率添加。每个请求消耗一个令牌。如果桶为空,请求被拒绝(或排队)。

最大容量:100 个令牌
填充速率:10 个令牌/秒

客户端可以突发最多 100 个请求(消耗所有令牌),之后只能维持 10 个请求/秒。

class TokenBucket {
  constructor(capacity, refillRate) {
    this.capacity = capacity;       // 最大令牌数
    this.tokens = capacity;         // 初始满桶
    this.refillRate = refillRate;   // 每毫秒令牌数
    this.lastRefill = Date.now();
  }

  consume(tokens = 1) {
    this.refill();
    if (this.tokens >= tokens) {
      this.tokens -= tokens;
      return true;  // 允许
    }
    return false;   // 拒绝
  }

  refill() {
    const now = Date.now();
    const elapsed = now - this.lastRefill;
    this.tokens = Math.min(this.capacity, this.tokens + elapsed * this.refillRate);
    this.lastRefill = now;
  }
}

const bucket = new TokenBucket(100, 0.01);  // 容量 100,10 个令牌/秒

优点: 优雅处理突发。一小时未发出请求的用户可以突发到容量上限。

漏桶

工作原理: 请求进入固定大小的队列。一个工作者以恒定速率处理它们。如果队列已满,新请求被拒绝。

平滑流量——无论请求如何到达,都以恒定速率处理。适用于保护后端服务免受突发影响。

令牌桶 vs 漏桶:

  • 令牌桶:允许短时突发,长期速率平滑
  • 漏桶:严格恒定输出速率,无突发

标准速率限制 HTTP 头部

使用这些头部,以便客户端可以调整其行为,而不是盲目地遇到 429 错误:

HTTP/1.1 200 OK
X-RateLimit-Limit: 100
X-RateLimit-Remaining: 74
X-RateLimit-Reset: 1716912000          # 窗口重置的 Unix 时间戳
Retry-After: 60                        # 重试前等待秒数(429 时)

IETF RateLimit 头部草案(RFC 6585 + draft-ietf-httpapi-ratelimit-headers)对此进行了标准化:

RateLimit-Limit: 100
RateLimit-Remaining: 74
RateLimit-Reset: 60

始终返回这些头部——遵守它们的客户端会减少 429 调用,从而减轻基础设施负载。

实现示例

API 速率限制:策略、算法与实现 插图

Node.js 与 express-rate-limit

const rateLimit = require('express-rate-limit');
const RedisStore = require('rate-limit-redis');
const redis = require('redis');

const client = redis.createClient({ url: process.env.REDIS_URL });

const limiter = rateLimit({
  windowMs: 60 * 1000,  // 1 分钟
  max: 100,             // 每个窗口的请求数
  standardHeaders: true, // 返回 RateLimit-* 头部
  legacyHeaders: false,
  store: new RedisStore({
    sendCommand: (...args) => client.sendCommand(args),
  }),
  keyGenerator: (req) => req.user?.id || req.ip,  // 如果已认证则按用户限速
  handler: (req, res) => {
    res.status(429).json({
      error: '请求过多',
      retryAfter: Math.ceil(req.rateLimit.resetTime / 1000 - Date.now() / 1000),
    });
  },
});

app.use('/api/', limiter);

// 对认证端点更严格的限制器
const authLimiter = rateLimit({
  windowMs: 15 * 60 * 1000,  // 15 分钟
  max: 10,
  message: { error: '登录尝试过多。请在 15 分钟后重试。' },
});

app.use('/api/auth/', authLimiter);

Python 与 FastAPI + slowapi

from fastapi import FastAPI, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.get("/api/data")
@limiter.limit("100/minute")
async def get_data(request: Request):
    return {"data": "..."}

@app.post("/api/login")
@limiter.limit("10/15minutes")
async def login(request: Request):
    return {"token": "..."}

Redis Lua 脚本(原子令牌桶)

对于分布式系统,使用 Lua 脚本使桶操作原子化:

-- KEYS[1] = 桶键
-- ARGV[1] = 容量, ARGV[2] = 填充速率(令牌/秒), ARGV[3] = 当前时间(毫秒), ARGV[4] = 请求的令牌数
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local refill_rate = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])

local bucket = redis.call('HMGET', key, 'tokens', 'last_refill')
local tokens = tonumber(bucket[1]) or capacity
local last_refill = tonumber(bucket[2]) or now

local elapsed = (now - last_refill) / 1000
tokens = math.min(capacity, tokens + elapsed * refill_rate)

if tokens >= requested then
  tokens = tokens - requested
  redis.call('HMSET', key, 'tokens', tokens, 'last_refill', now)
  redis.call('EXPIRE', key, math.ceil(capacity / refill_rate) + 1)
  return 1  -- 允许
else
  redis.call('HMSET', key, 'tokens', tokens, 'last_refill', now)
  return 0  -- 拒绝
end

按层级限速

实际 API 根据订阅层级区分:

const RATE_LIMITS = {
  free:       { requests: 100,   window: 60 * 1000 },
  pro:        { requests: 1000,  window: 60 * 1000 },
  enterprise: { requests: 10000, window: 60 * 1000 },
};

function getTierLimiter(tier) {
  const { requests, window } = RATE_LIMITS[tier] || RATE_LIMITS.free;
  return rateLimit({
    windowMs: window,
    max: requests,
    keyGenerator: (req) => `${req.user.id}:${req.user.tier}`,
  });
}

app.use('/api/', (req, res, next) => {
  const tier = req.user?.tier || 'free';
  getTierLimiter(tier)(req, res, next);
});

优雅的客户端处理

不要遇到 429 就崩溃。实现指数退避:

async function fetchWithRetry(url, options = {}, maxRetries = 3) {
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    const res = await fetch(url, options);

    if (res.status !== 429) return res;

    if (attempt === maxRetries) throw new Error('重试后仍超出速率限制');

    const retryAfter = parseInt(res.headers.get('Retry-After') || '60', 10);
    const delay = retryAfter * 1000 * Math.pow(2, attempt);  // 指数
    console.log(`速率受限。${delay}ms 后重试...`);
    await new Promise(resolve => setTimeout(resolve, delay));
  }
}

→ 使用 Token Generator 为你的限速 API 生成安全 API 令牌。