
API 安全最佳实践
OWASP API 安全十大风险
| # |
风险 |
预防措施 |
| 1 |
对象级授权失效 |
每次请求验证所有权 |
| 2 |
身份验证失效 |
强身份验证 + 速率限制 |
| 3 |
对象属性级授权失效 |
白名单响应字段 |
| 4 |
资源消耗无限制 |
速率限制 + 请求大小限制 |
| 5 |
功能级授权失效 |
每个端点实施 RBAC |
| 6 |
敏感操作无限制访问 |
关键操作升级身份验证 |
| 7 |
SSRF |
验证/白名单 URL |
| 8 |
安全配置错误 |
禁用调试、CORS、标头 |
| 9 |
资产管理不当 |
API 版本控制 + 弃用 |
| 10 |
不安全 API 消费 |
验证第三方响应 |

API 密钥管理
import crypto from 'crypto'
import { hash, compare } from 'bcryptjs'
class APIKeyManager {
// 生成 API 密钥,带前缀便于识别
generate(prefix = 'sk') {
const key = crypto.randomBytes(32).toString('base64url')
const apiKey = `${prefix}_${key}`
const keyId = crypto.randomUUID()
return { keyId, apiKey }
}
// 存储哈希后的密钥(绝不存储明文)
async store(keyId, apiKey, userId, scopes = []) {
const hashedKey = await hash(apiKey, 12)
await db.query(
'INSERT INTO api_keys (id, hashed_key, user_id, scopes, created_at) VALUES ($1, $2, $3, $4, NOW())',
[keyId, hashedKey, userId, JSON.stringify(scopes)]
)
}
async validate(apiKey) {
// 提取密钥前缀以查找候选
const [prefix] = apiKey.split('_')
const rows = await db.query('SELECT * FROM api_keys WHERE prefix = $1 AND revoked = false', [prefix])
for (const row of rows) {
if (await compare(apiKey, row.hashed_key)) {
await this.recordUsage(row.id)
return { userId: row.user_id, scopes: row.scopes }
}
}
return null
}
async rotate(oldKeyId, userId) {
const { keyId, apiKey } = this.generate()
await this.store(keyId, apiKey, userId)
await db.query('UPDATE api_keys SET revoked = true WHERE id = $1', [oldKeyId])
return { keyId, apiKey }
}
}

对象级授权失效(BOLA)
// 有漏洞:未检查所有权
app.get('/api/orders/:id', authMiddleware, async (req, res) => {
const order = await db.findOne('orders', { id: req.params.id })
res.json(order) // 返回任何订单,无论所有者
})
// 安全:始终验证所有权
app.get('/api/orders/:id', authMiddleware, async (req, res) => {
const order = await db.findOne('orders', {
id: req.params.id,
user_id: req.user.id // 强制所有权
})
if (!order) return res.status(404).json({ error: 'Not found' })
res.json(order)
})
// RBAC 中间件
function requireScope(scope) {
return (req, res, next) => {
if (!req.user.scopes.includes(scope)) {
return res.status(403).json({ error: `Requires scope: ${scope}` })
}
next()
}
}
app.delete('/api/users/:id', authMiddleware, requireScope('users:delete'), handler)

速率限制策略
import Bottleneck from 'bottleneck'
import { RateLimiterRedis } from 'rate-limiter-flexible'
import { createClient } from 'redis'
const redis = createClient()
// 基于用户的速率限制器
const userLimiter = new RateLimiterRedis({
storeClient: redis,
keyPrefix: 'rl_user',
points: 100, // 请求数
duration: 60, // 每 60 秒
blockDuration: 600, // 封禁 10 分钟
})
// 基于 IP 的速率限制器
const ipLimiter = new RateLimiterRedis({
storeClient: redis,
keyPrefix: 'rl_ip',
points: 200,
duration: 60,
})
async function rateLimitMiddleware(req, res, next) {
const ip = req.ip
const userId = req.user?.id
try {
// 检查 IP 限制
await ipLimiter.consume(ip)
// 如果已认证,检查用户限制
if (userId) await userLimiter.consume(userId)
next()
} catch (rejInfo) {
const retryAfter = Math.round(rejInfo.msBeforeNext / 1000)
res.set('Retry-After', retryAfter)
res.status(429).json({
error: 'Too Many Requests',
retry_after: retryAfter,
})
}
}
使用 Zod 进行请求验证
import { z } from 'zod'
const CreateOrderSchema = z.object({
product_id: z.string().uuid(),
quantity: z.number().int().positive().max(100),
delivery_address: z.object({
street: z.string().max(200),
city: z.string().max(100),
country: z.string().length(2).toUpperCase(),
zip: z.string().regex(/^\d{5}(-\d{4})?$/),
}),
notes: z.string().max(500).optional(),
})
function validateBody(schema) {
return (req, res, next) => {
try {
req.validatedBody = schema.parse(req.body)
next()
} catch (err) {
res.status(400).json({
error: 'Validation failed',
details: err.errors.map(e => ({ path: e.path.join('.'), message: e.message })),
})
}
}
}
app.post('/api/orders', authMiddleware, validateBody(CreateOrderSchema), handler)
SSRF 防护
import { URL } from 'url'
import ipRangeCheck from 'ip-range-check'
const BLOCKED_RANGES = [
'10.0.0.0/8', // 私有网络
'172.16.0.0/12',
'192.168.0.0/16',
'127.0.0.0/8', // 回环地址
'169.254.0.0/16', // 链路本地
'::1/128', // IPv6 回环
]
const ALLOWED_HOSTS = ['api.example.com', 'cdn.example.com']
async function safeRequest(url) {
const parsed = new URL(url)
// 白名单检查
if (!ALLOWED_HOSTS.includes(parsed.hostname)) {
throw new Error(`Host not allowed: ${parsed.hostname}`)
}
// 解析 DNS 并检查 IP
const { address } = await dns.promises.lookup(parsed.hostname)
if (ipRangeCheck(address, BLOCKED_RANGES)) {
throw new Error(`IP address blocked: ${address}`)
}
// 仅允许 HTTPS
if (parsed.protocol !== 'https:') {
throw new Error('Only HTTPS allowed')
}
return fetch(url, {
headers: { 'User-Agent': 'MyApp/1.0' },
redirect: 'manual', // 不自动跟随重定向
})
}