正在加载,请稍候…

FastAPI 生产环境实战:认证、缓存、后台任务与 Kubernetes 部署

使用 FastAPI 构建生产级 REST API,涵盖 JWT 认证、Redis 缓存、Celery 后台任务、数据库连接池、中间件及生产部署。

FastAPI 生产环境实战:认证、缓存、后台任务与 Kubernetes 部署

FastAPI 将 Python 类型提示、自动验证和异步支持结合在一起,成为构建正确 API 的最快路径。

使用 pydantic-settings 进行配置

from pydantic_settings import BaseSettings, SettingsConfigDict
from functools import lru_cache

class Settings(BaseSettings):
    debug: bool = False
    database_url: str
    redis_url: str = "redis://localhost:6379"
    secret_key: str
    algorithm: str = "HS256"
    access_token_expire_minutes: int = 30
    model_config = SettingsConfigDict(env_file='.env')

@lru_cache
def get_settings() -> Settings:
    return Settings()

FastAPI 生产环境实战:认证、缓存、后台任务与 Kubernetes 部署 插图

JWT 认证

from datetime import datetime, timedelta
from jose import JWTError, jwt
from fastapi import Depends, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

bearer = HTTPBearer()

def create_access_token(sub: str) -> str:
    payload = {
        "sub": sub,
        "exp": datetime.utcnow() + timedelta(minutes=get_settings().access_token_expire_minutes),
        "type": "access"
    }
    return jwt.encode(payload, get_settings().secret_key, algorithm=get_settings().algorithm)

async def get_current_user(
    creds: HTTPAuthorizationCredentials = Depends(bearer),
    db = Depends(get_db)
) -> User:
    try:
        payload = jwt.decode(creds.credentials, get_settings().secret_key,
                             algorithms=[get_settings().algorithm])
        if payload.get("type") != "access":
            raise HTTPException(status_code=401)
        user = await db.get(User, int(payload["sub"]))
        if not user or not user.is_active:
            raise HTTPException(status_code=401)
        return user
    except JWTError:
        raise HTTPException(status_code=401, detail="Invalid token")

FastAPI 生产环境实战:认证、缓存、后台任务与 Kubernetes 部署 插图

Redis 缓存装饰器

import redis.asyncio as aioredis
import json
from functools import wraps

cache = aioredis.from_url(get_settings().redis_url, decode_responses=True)

def cached(key_prefix: str, ttl: int = 300):
    def decorator(fn):
        @wraps(fn)
        async def wrapper(*args, **kwargs):
            key = f"{key_prefix}:{':'.join(str(a) for a in args)}"
            hit = await cache.get(key)
            if hit:
                return json.loads(hit)
            result = await fn(*args, **kwargs)
            await cache.setex(key, ttl, json.dumps(result))
            return result
        return wrapper
    return decorator

FastAPI 生产环境实战:认证、缓存、后台任务与 Kubernetes 部署 插图

使用 Celery 处理后台任务

from celery import Celery

celery_app = Celery('tasks', broker=get_settings().redis_url)

@celery_app.task(bind=True, max_retries=3)
def send_welcome_email(self, user_id: int, email: str):
    try:
        mailer.send(to=email, template='welcome')
    except Exception as e:
        self.retry(exc=e, countdown=60 * (2 ** self.request.retries))

@router.post("/users/", status_code=201)
async def create_user(data: UserCreate, bg: BackgroundTasks, db=Depends(get_db)):
    user = await user_service.create(db, data)
    bg.add_task(send_welcome_email.delay, user.id, user.email)
    return UserResponse.from_orm(user)

请求日志中间件

import time, uuid, structlog
from starlette.middleware.base import BaseHTTPMiddleware

log = structlog.get_logger()

class RequestLoggingMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        req_id = str(uuid.uuid4())
        start = time.perf_counter()
        resp = await call_next(request)
        log.info("request", id=req_id, method=request.method,
                 path=request.url.path, status=resp.status_code,
                 ms=round((time.perf_counter() - start) * 1000, 2))
        resp.headers["X-Request-ID"] = req_id
        return resp

FastAPI 将验证、业务逻辑和序列化整合为一个连贯的系统。

→ 使用 JWT Parser 工具测试你的 API 的 JWT 令牌。