正在加载,请稍候…

Python 异步模式与 FastAPI:后台任务、WebSocket 和流式传输

使用 FastAPI 构建高性能异步 Python API,涵盖后台任务、WebSocket 连接、服务器发送事件、异步数据库查询和任务队列。

Python 异步模式与 FastAPI:后台任务、WebSocket 和流式传输

Python 异步模式与 FastAPI

FastAPI 基于 Python 的 async/await 构建,适合实现高性能 API。本指南介绍基本 CRUD 之外的进阶异步模式。

后台任务

from fastapi import FastAPI, BackgroundTasks
from fastapi_mail import FastMail, MessageSchema, ConnectionConfig

# Application instance; the route decorators that follow register handlers on it.
app = FastAPI()

# Simple background task
def send_email_sync(email: str, message: str):
    """Send ``message`` to ``email`` by calling ``mail_client.send``.

    Synchronous helper meant to be scheduled as a background task.

    NOTE(review): ``mail_client`` is not defined in the visible source;
    confirm where it is created.
    """
    # Intended to run after the response has been sent
    mail_client.send(email, message)

@app.post("/register")
async def register(user: UserCreate, background_tasks: BackgroundTasks):
    """Create a user and schedule follow-up work to run after the response.

    Args:
        user: Payload describing the user to create.
        background_tasks: FastAPI-injected collector for post-response tasks.

    Returns:
        A dict with the new user's ``id`` and ``"status": "created"``.
    """
    db_user = await create_user(user)

    # Add to the background queue (non-blocking): add_task only records the
    # call, so the handler returns without waiting for it.
    #
    # send_welcome_email is declared as an arq job, ``(ctx, email)``.
    # BackgroundTasks forwards exactly the arguments given here, so the ctx
    # slot must be filled explicitly; passing only the email made the task
    # fail with a missing-argument TypeError after the response was sent.
    # The job body does not use ctx, so None is sufficient.
    background_tasks.add_task(send_welcome_email, None, db_user.email)
    background_tasks.add_task(update_analytics, "user_registered")

    return {"id": db_user.id, "status": "created"}

# 对于繁重任务,使用 Celery 或 arq
import arq

async def send_welcome_email(ctx, email: str):
    """Send the welcome email to ``email``; runs inside an arq worker.

    ``ctx`` occupies the first positional slot of the job signature and is
    not used by the body.
    """
    await email_service.send_welcome(email)

class WorkerSettings:
    # Settings holder for the arq worker process.
    # Job functions made available to the worker.
    functions = [send_welcome_email]
    # Redis connection used by the worker; the host name is 'redis'.
    redis_settings = arq.connections.RedisSettings(host='redis')

@app.post("/register-heavy")
async def register_heavy(user: UserCreate):
    """Create a user and hand the welcome email to the arq worker queue.

    Args:
        user: Payload describing the user to create.

    Returns:
        A dict with the new user's ``id``.
    """
    db_user = await create_user(user)

    # Enqueue on the same Redis instance the worker consumes from. Using a
    # default RedisSettings() here pointed at localhost while the worker was
    # configured with host='redis', so jobs could land where no worker looks.
    # NOTE(review): a pool is still created on every request and never
    # closed; consider creating it once at application startup and reusing it.
    redis = await arq.create_pool(WorkerSettings.redis_settings)
    await redis.enqueue_job('send_welcome_email', db_user.email)

    return {"id": db_user.id}

Python 异步模式与 FastAPI:后台任务、WebSocket 和流式传输 插图

WebSocket 连接

from fastapi import WebSocket, WebSocketDisconnect
from typing import Dict, Set

class ConnectionManager:
    """Track accepted WebSocket connections per room and fan messages out.

    ``active_connections`` maps a room name to the set of sockets currently
    registered in it. A room entry exists only while it has at least one
    socket, so the mapping does not grow with every room name ever seen.
    """

    def __init__(self):
        # room name -> sockets registered in that room
        self.active_connections: Dict[str, Set[WebSocket]] = {}

    async def connect(self, websocket: WebSocket, room: str):
        """Accept the handshake and register ``websocket`` under ``room``."""
        await websocket.accept()
        self.active_connections.setdefault(room, set()).add(websocket)

    async def disconnect(self, websocket: WebSocket, room: str):
        """Unregister ``websocket`` from ``room``.

        Unknown rooms and sockets that were never registered are ignored.
        """
        self._remove(room, {websocket})

    async def broadcast(self, message: dict, room: str):
        """Send ``message`` as JSON to every socket in ``room``.

        Sockets whose send raises are dropped from the room. Sockets that
        join while the broadcast is in flight do not receive this message.
        """
        members = self.active_connections.get(room)
        if not members:
            return
        dead = set()
        # Iterate over a snapshot: each send awaits, and other coroutines may
        # add or remove sockets meanwhile. Iterating the live set would raise
        # "Set changed size during iteration".
        for ws in tuple(members):
            try:
                await ws.send_json(message)
            except Exception:
                # Best effort: a failed send marks the socket as gone.
                dead.add(ws)
        if dead:
            self._remove(room, dead)

    def _remove(self, room: str, sockets: set) -> None:
        """Drop ``sockets`` from ``room`` and delete the room once empty."""
        members = self.active_connections.get(room)
        if members is None:
            return
        members.difference_update(sockets)
        if not members:
            del self.active_connections[room]

# Module-level manager instance used by the WebSocket endpoint.
manager = ConnectionManager()

@app.websocket("/ws/{room}/{user_id}")
async def websocket_endpoint(
    websocket: WebSocket,
    room: str,
    user_id: str,
):
    """Relay JSON messages from one client to everyone in the same room.

    Each incoming payload must be a JSON object with a ``"message"`` key; it
    is rebroadcast together with the sender id and a timestamp. When the
    handler exits for any reason, the socket is unregistered and the room is
    told that the user left.
    """
    await manager.connect(websocket, room)
    try:
        while True:
            data = await websocket.receive_json()
            await manager.broadcast({
                "from": user_id,
                "message": data["message"],
                "timestamp": datetime.utcnow().isoformat(),
            }, room)
    except WebSocketDisconnect:
        # Normal client departure; cleanup happens in the finally clause.
        pass
    finally:
        # Run cleanup on every exit path. Previously only WebSocketDisconnect
        # triggered it, so a malformed payload (invalid JSON, missing
        # "message" key) left the socket registered and sent no notice.
        await manager.disconnect(websocket, room)
        await manager.broadcast({"system": f"{user_id} left"}, room)

Python 异步模式与 FastAPI:后台任务、WebSocket 和流式传输 插图

服务器发送事件 (SSE)

from fastapi.responses import StreamingResponse
import asyncio

@app.get("/events/{user_id}")
async def sse_events(user_id: str, request: Request):
    """Stream events for ``user_id`` to the client as Server-Sent Events.

    Each item produced by ``event_stream(user_id)`` is JSON-encoded and sent
    as one ``data:`` frame. Streaming stops once the client is reported as
    disconnected.
    """
    async def event_generator():
        # Cancellation is deliberately not caught: asyncio expects
        # CancelledError to propagate so the cancelling scope can finish.
        # The generator still stops at the await where it is raised.
        async for event in event_stream(user_id):
            if await request.is_disconnected():
                break
            yield f"data: {json.dumps(event)}\n\n"
            await asyncio.sleep(0)  # yield control to the event loop

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # disable nginx response buffering
        },
    )

# Streaming LLM responses
def _sse_data(text: str) -> str:
    """Frame ``text`` as one SSE event, one ``data:`` line per text line.

    A raw newline inside a ``data:`` field ends the field, so multi-line text
    must be split across several ``data:`` lines; the client joins them back
    with newlines. Single-line text yields exactly ``data: <text>`` followed
    by a blank line.
    """
    normalized = text.replace("\r\n", "\n").replace("\r", "\n")
    return "".join(f"data: {line}\n" for line in normalized.split("\n")) + "\n"


@app.post("/chat/stream")
async def chat_stream(message: ChatMessage):
    """Stream a chat completion for ``message.content`` as Server-Sent Events.

    Every non-empty content delta becomes one SSE event; a final
    ``data: [DONE]`` event marks the end of the stream.
    """
    async def generate():
        async with openai.AsyncOpenAI() as client:
            stream = await client.chat.completions.create(
                model="gpt-4o",
                messages=[{"role": "user", "content": message.content}],
                stream=True,
            )
            async for chunk in stream:
                # Some chunks may carry no choices (e.g. usage-only chunks);
                # indexing [0] on them would raise IndexError.
                if not chunk.choices:
                    continue
                delta = chunk.choices[0].delta.content
                if delta:
                    yield _sse_data(delta)
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")

Python 异步模式与 FastAPI:后台任务、WebSocket 和流式传输 插图

使用 SQLAlchemy 的异步数据库

from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import DeclarativeBase, sessionmaker

# Connection URL for PostgreSQL through the asyncpg driver.
# NOTE(review): the credentials look like placeholders; presumably real values
# come from configuration, confirm before deploying.
DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/db"

# Shared async engine, created with pool_size=20 and max_overflow=10.
engine = create_async_engine(DATABASE_URL, pool_size=20, max_overflow=10)
# Factory producing AsyncSession objects bound to the engine;
# expire_on_commit=False is passed so instances are not expired on commit.
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

async def get_db():
    """Yield one ``AsyncSession`` created from ``AsyncSessionLocal``.

    The session is opened with ``async with``, so its context manager exits
    once the generator is resumed after the ``yield`` or is closed.
    """
    async with AsyncSessionLocal() as session:
        yield session

@app.get("/users/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
    """Return the user with ``user_id``, with its orders loaded.

    Raises:
        HTTPException: 404 with "User not found" when no row matches.
    """
    query = select(User).where(User.id == user_id)
    # Load the related orders together with the user
    query = query.options(selectinload(User.orders))

    rows = await db.execute(query)
    found = rows.scalar_one_or_none()
    if found:
        return found
    raise HTTPException(404, "User not found")

# Concurrent database queries
@app.get("/dashboard/{user_id}")
async def get_dashboard(user_id: int, db: AsyncSession = Depends(get_db)):
    """Fetch user, stats and notifications concurrently for the dashboard.

    Args:
        user_id: Id of the user whose dashboard is requested.
        db: Request-scoped session. Kept for interface compatibility; the
            concurrent queries below do not use it.

    Returns:
        A dict with ``user``, ``stats`` and ``notifications``.
    """
    async def run_isolated(query):
        # An AsyncSession must not be shared between concurrently running
        # tasks, so every query gets a session of its own.
        async with AsyncSessionLocal() as session:
            return await query(session, user_id)

    # gather schedules the coroutines concurrently by itself; wrapping each
    # one in create_task first is unnecessary.
    user, stats, notifications = await asyncio.gather(
        run_isolated(get_user_by_id),
        run_isolated(get_user_stats),
        run_isolated(get_notifications),
    )

    return {"user": user, "stats": stats, "notifications": notifications}

中间件和生命周期

from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create shared clients at startup and release them at shutdown.

    Stores a Redis pool on ``app.state.redis`` and an HTTP client on
    ``app.state.http_client`` before yielding, then closes both afterwards.
    """
    # Startup
    app.state.redis = await aioredis.create_redis_pool("redis://localhost")
    app.state.http_client = httpx.AsyncClient()
    print("App started")

    try:
        yield
    finally:
        # Shutdown. This runs even when an exception is thrown into the
        # generator at the yield, so the resources are not leaked.
        try:
            app.state.redis.close()
            await app.state.redis.wait_closed()
        finally:
            # Close the HTTP client even if closing Redis failed.
            await app.state.http_client.aclose()
        print("App shutdown")

# Build the application with the lifespan handler attached.
# NOTE(review): this rebinds `app`; handlers registered on an earlier `app`
# instance are not attached to this one. If these snippets are meant to live
# in one module, create the application only once.
app = FastAPI(lifespan=lifespan)

# Request timing middleware
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    """Attach the handling time in seconds as an ``X-Process-Time`` header.

    Args:
        request: Incoming request.
        call_next: Callable that forwards the request and returns the response.

    Returns:
        The downstream response with the extra header set.
    """
    # perf_counter is monotonic, so the difference cannot go negative or jump
    # when the system clock is adjusted, unlike time.time().
    started = time.perf_counter()
    response = await call_next(request)
    elapsed = time.perf_counter() - started
    response.headers["X-Process-Time"] = str(elapsed)
    return response

总结

FastAPI 异步模式:

  • 使用 BackgroundTasks 处理轻量级响应后工作
  • 使用 Celery/arq 处理 CPU 密集型或长时间运行的任务
  • WebSocket ConnectionManager 实现基于房间的广播
  • SSE 实现服务器到客户端的单向流式传输(LLM 响应)
  • asyncio.gather 实现并发数据库查询(每个并发任务应使用独立的 AsyncSession,同一个会话不能被并发使用)
  • 生命周期上下文管理器管理启动/关闭资源