正在加载,请稍候…

API 安全最佳实践:身份验证、速率限制与输入验证

通过全面的最佳实践保护您的 API。了解 API 密钥管理、OAuth 作用域、速率限制、输入验证以及 OWASP API 安全十大风险。

API 安全最佳实践:身份验证、速率限制与输入验证

API 安全最佳实践

OWASP API 安全十大风险

# 风险 预防措施
1 对象级授权失效 每次请求验证所有权
2 身份验证失效 强身份验证 + 速率限制
3 对象属性级授权失效 白名单响应字段
4 资源消耗无限制 速率限制 + 请求大小限制
5 功能级授权失效 每个端点实施 RBAC
6 敏感操作无限制访问 关键操作升级身份验证
7 SSRF 验证/白名单 URL
8 安全配置错误 禁用调试、CORS、标头
9 资产管理不当 API 版本控制 + 弃用
10 不安全 API 消费 验证第三方响应

API 安全最佳实践:身份验证、速率限制与输入验证 插图

API 密钥管理

import crypto from 'crypto'
import { hash, compare } from 'bcryptjs'

class APIKeyManager {
  // 生成 API 密钥,带前缀便于识别
  generate(prefix = 'sk') {
    const key = crypto.randomBytes(32).toString('base64url')
    const apiKey = `${prefix}_${key}`
    const keyId = crypto.randomUUID()
    return { keyId, apiKey }
  }

  // 存储哈希后的密钥(绝不存储明文)
  async store(keyId, apiKey, userId, scopes = []) {
    const hashedKey = await hash(apiKey, 12)
    await db.query(
      'INSERT INTO api_keys (id, hashed_key, user_id, scopes, created_at) VALUES ($1, $2, $3, $4, NOW())',
      [keyId, hashedKey, userId, JSON.stringify(scopes)]
    )
  }

  async validate(apiKey) {
    // 提取密钥前缀以查找候选
    const [prefix] = apiKey.split('_')
    const rows = await db.query('SELECT * FROM api_keys WHERE prefix = $1 AND revoked = false', [prefix])

    for (const row of rows) {
      if (await compare(apiKey, row.hashed_key)) {
        await this.recordUsage(row.id)
        return { userId: row.user_id, scopes: row.scopes }
      }
    }
    return null
  }

  async rotate(oldKeyId, userId) {
    const { keyId, apiKey } = this.generate()
    await this.store(keyId, apiKey, userId)
    await db.query('UPDATE api_keys SET revoked = true WHERE id = $1', [oldKeyId])
    return { keyId, apiKey }
  }
}

API 安全最佳实践:身份验证、速率限制与输入验证 插图

对象级授权失效(BOLA)

// 有漏洞:未检查所有权
app.get('/api/orders/:id', authMiddleware, async (req, res) => {
  const order = await db.findOne('orders', { id: req.params.id })
  res.json(order)  // 返回任何订单,无论所有者
})

// 安全:始终验证所有权
app.get('/api/orders/:id', authMiddleware, async (req, res) => {
  const order = await db.findOne('orders', {
    id: req.params.id,
    user_id: req.user.id  // 强制所有权
  })

  if (!order) return res.status(404).json({ error: 'Not found' })
  res.json(order)
})

// RBAC 中间件
function requireScope(scope) {
  return (req, res, next) => {
    if (!req.user.scopes.includes(scope)) {
      return res.status(403).json({ error: `Requires scope: ${scope}` })
    }
    next()
  }
}

app.delete('/api/users/:id', authMiddleware, requireScope('users:delete'), handler)

API 安全最佳实践:身份验证、速率限制与输入验证 插图

速率限制策略

import Bottleneck from 'bottleneck'
import { RateLimiterRedis } from 'rate-limiter-flexible'
import { createClient } from 'redis'

const redis = createClient()

// 基于用户的速率限制器
const userLimiter = new RateLimiterRedis({
  storeClient: redis,
  keyPrefix: 'rl_user',
  points: 100,      // 请求数
  duration: 60,     // 每 60 秒
  blockDuration: 600, // 封禁 10 分钟
})

// 基于 IP 的速率限制器
const ipLimiter = new RateLimiterRedis({
  storeClient: redis,
  keyPrefix: 'rl_ip',
  points: 200,
  duration: 60,
})

async function rateLimitMiddleware(req, res, next) {
  const ip = req.ip
  const userId = req.user?.id

  try {
    // 检查 IP 限制
    await ipLimiter.consume(ip)

    // 如果已认证,检查用户限制
    if (userId) await userLimiter.consume(userId)

    next()
  } catch (rejInfo) {
    const retryAfter = Math.round(rejInfo.msBeforeNext / 1000)
    res.set('Retry-After', retryAfter)
    res.status(429).json({
      error: 'Too Many Requests',
      retry_after: retryAfter,
    })
  }
}

使用 Zod 进行请求验证

import { z } from 'zod'

const CreateOrderSchema = z.object({
  product_id: z.string().uuid(),
  quantity: z.number().int().positive().max(100),
  delivery_address: z.object({
    street: z.string().max(200),
    city: z.string().max(100),
    country: z.string().length(2).toUpperCase(),
    zip: z.string().regex(/^\d{5}(-\d{4})?$/),
  }),
  notes: z.string().max(500).optional(),
})

function validateBody(schema) {
  return (req, res, next) => {
    try {
      req.validatedBody = schema.parse(req.body)
      next()
    } catch (err) {
      res.status(400).json({
        error: 'Validation failed',
        details: err.errors.map(e => ({ path: e.path.join('.'), message: e.message })),
      })
    }
  }
}

app.post('/api/orders', authMiddleware, validateBody(CreateOrderSchema), handler)

SSRF 防护

import { URL } from 'url'
import ipRangeCheck from 'ip-range-check'

const BLOCKED_RANGES = [
  '10.0.0.0/8',    // 私有网络
  '172.16.0.0/12',
  '192.168.0.0/16',
  '127.0.0.0/8',   // 回环地址
  '169.254.0.0/16', // 链路本地
  '::1/128',        // IPv6 回环
]

const ALLOWED_HOSTS = ['api.example.com', 'cdn.example.com']

async function safeRequest(url) {
  const parsed = new URL(url)

  // 白名单检查
  if (!ALLOWED_HOSTS.includes(parsed.hostname)) {
    throw new Error(`Host not allowed: ${parsed.hostname}`)
  }

  // 解析 DNS 并检查 IP
  const { address } = await dns.promises.lookup(parsed.hostname)
  if (ipRangeCheck(address, BLOCKED_RANGES)) {
    throw new Error(`IP address blocked: ${address}`)
  }

  // 仅允许 HTTPS
  if (parsed.protocol !== 'https:') {
    throw new Error('Only HTTPS allowed')
  }

  return fetch(url, {
    headers: { 'User-Agent': 'MyApp/1.0' },
    redirect: 'manual', // 不自动跟随重定向
  })
}