
Python 异步模式与 FastAPI
FastAPI 基于 Python 的 async/await 构建,支持高性能 API。本指南涵盖基本 CRUD 之外的进阶异步模式。
后台任务
from fastapi import FastAPI, BackgroundTasks
from fastapi_mail import FastMail, MessageSchema, ConnectionConfig
# Application instance; the route decorators below register handlers on it.
app = FastAPI()
# Simple background task
def send_email_sync(email: str, message: str):
    """Deliver ``message`` to ``email`` through ``mail_client``.

    Meant to be scheduled as a background task, in which case it runs after
    the HTTP response has been sent.

    Args:
        email: Recipient address.
        message: Text to send.
    """
    # NOTE(review): mail_client is not defined in this snippet — presumably a
    # module-level mail client; confirm where it comes from.
    mail_client.send(email, message)
@app.post("/register")
async def register(user: UserCreate, background_tasks: BackgroundTasks):
    """Create a user, then queue follow-up work to run after the response.

    Args:
        user: Registration payload.
        background_tasks: Task queue for this request; ``add_task`` only
            registers the callables, so the response is not delayed by them.

    Returns:
        A dict with the new user's ``id`` and a ``"created"`` status.
    """
    new_user = await create_user(user)

    # NOTE(review): send_welcome_email is declared further down as an arq job
    # taking (ctx, email); scheduled here with a single argument it would
    # fail. send_email_sync looks like the intended task — confirm.
    background_tasks.add_task(send_welcome_email, new_user.email)
    background_tasks.add_task(update_analytics, "user_registered")

    response = {"id": new_user.id, "status": "created"}
    return response
# 对于繁重任务,使用 Celery 或 arq
import arq
async def send_welcome_email(ctx, email: str):
    """Send the welcome email to ``email``; runs inside an arq worker.

    Args:
        ctx: Job context passed as the first argument; unused here.
        email: Recipient address.
    """
    await email_service.send_welcome(email)
class WorkerSettings:
    """arq worker configuration: which Redis to use and which jobs to serve."""

    # Redis instance the worker connects to.
    redis_settings = arq.connections.RedisSettings(host='redis')
    # Coroutines the worker may execute.
    functions = [send_welcome_email]
@app.post("/register-heavy")
async def register_heavy(user: UserCreate):
    """Create a user and hand the welcome email off to the arq worker.

    Args:
        user: Registration payload.

    Returns:
        A dict with the new user's ``id``.
    """
    db_user = await create_user(user)

    # Enqueue on the same Redis the worker listens on. The default
    # RedisSettings() points at localhost, which a worker configured with
    # host='redis' never reads from.
    redis = await arq.create_pool(WorkerSettings.redis_settings)
    try:
        await redis.enqueue_job('send_welcome_email', db_user.email)
    finally:
        # A pool opened per request has to be closed, or its connections leak.
        # NOTE(review): assumes a redis-py based arq (>= 0.23) where close()
        # is awaitable; for real traffic, open one pool at startup and reuse it.
        await redis.close()
    return {"id": db_user.id}
WebSocket 连接
from fastapi import WebSocket, WebSocketDisconnect
from typing import Dict, Set
class ConnectionManager:
    """Track open WebSocket connections per room and fan messages out to them.

    State lives in an in-memory dict on the instance.
    """

    def __init__(self):
        # room name -> sockets currently joined to that room
        self.active_connections: Dict[str, Set[WebSocket]] = {}

    def _remove(self, room: str, sockets: Set[WebSocket]) -> None:
        """Drop ``sockets`` from ``room`` and delete the room once it is empty."""
        members = self.active_connections.get(room)
        if members is None:
            return
        members.difference_update(sockets)
        if not members:
            # Without this, every room name ever used would stay in the dict.
            del self.active_connections[room]

    async def connect(self, websocket: WebSocket, room: str):
        """Accept the handshake and add ``websocket`` to ``room``."""
        await websocket.accept()
        self.active_connections.setdefault(room, set()).add(websocket)

    async def disconnect(self, websocket: WebSocket, room: str):
        """Remove ``websocket`` from ``room``; unknown rooms are ignored."""
        self._remove(room, {websocket})

    async def broadcast(self, message: dict, room: str):
        """Send ``message`` as JSON to every socket in ``room``.

        Sockets whose send raises are treated as dead and removed from the
        room. Does nothing when the room is unknown or empty.
        """
        members = self.active_connections.get(room)
        if not members:
            return
        dead = set()
        # Iterate over a snapshot: every await hands control to the event
        # loop, where another handler may connect or disconnect and resize
        # the live set ("Set changed size during iteration").
        for ws in list(members):
            try:
                await ws.send_json(message)
            except Exception:
                # Best effort: one broken socket must not stop the fan-out.
                dead.add(ws)
        if dead:
            # Look the room up again; it may have been removed meanwhile.
            self._remove(room, dead)


manager = ConnectionManager()
@app.websocket("/ws/{room}/{user_id}")
async def websocket_endpoint(
    websocket: WebSocket,
    room: str,
    user_id: str,
):
    """Relay chat messages between the clients connected to ``room``.

    Every JSON frame received from this client is rebroadcast to the room
    with the sender id and a UTC timestamp attached. When the connection
    ends, the socket is removed and a "left" notice is broadcast.

    Args:
        websocket: The client connection.
        room: Room name taken from the URL path.
        user_id: Sender id taken from the URL path.
    """
    await manager.connect(websocket, room)
    try:
        while True:
            data = await websocket.receive_json()
            await manager.broadcast({
                "from": user_id,
                "message": data["message"],
                "timestamp": datetime.utcnow().isoformat(),
            }, room)
    except WebSocketDisconnect:
        # Normal client close; the cleanup below handles it.
        pass
    finally:
        # Runs on every exit path. Handling only WebSocketDisconnect left the
        # socket registered whenever anything else went wrong, e.g. a frame
        # without a "message" key.
        await manager.disconnect(websocket, room)
        await manager.broadcast({"system": f"{user_id} left"}, room)
服务器发送事件 (SSE)
from fastapi.responses import StreamingResponse
import asyncio
@app.get("/events/{user_id}")
async def sse_events(user_id: str, request: Request):
    """Stream the events of ``user_id`` to the client as Server-Sent Events.

    Args:
        user_id: Id whose event stream is forwarded.
        request: Incoming request, polled to detect a client disconnect.

    Returns:
        A ``text/event-stream`` StreamingResponse.
    """
    async def event_generator():
        # asyncio.CancelledError is intentionally not caught: suppressing it
        # hides the cancellation from whoever cancelled the task.
        async for event in event_stream(user_id):
            # Stop producing once the client has gone away.
            if await request.is_disconnected():
                break
            yield f"data: {json.dumps(event)}\n\n"
            await asyncio.sleep(0)  # hand control back to the event loop

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # disable nginx buffering
        },
    )
# Streaming LLM responses
@app.post("/chat/stream")
async def chat_stream(message: ChatMessage):
    """Stream a chat completion to the client as Server-Sent Events.

    Args:
        message: Chat request; its ``content`` is sent as the user prompt.

    Returns:
        A ``text/event-stream`` StreamingResponse whose last event is
        ``[DONE]``.
    """
    def sse_event(text: str) -> str:
        # Emit one "data:" line per line of text. A raw newline inside a
        # data field would end the event early and corrupt the stream;
        # clients join multiple data lines back together with "\n".
        lines = text.replace("\r\n", "\n").replace("\r", "\n").split("\n")
        return "".join(f"data: {line}\n" for line in lines) + "\n"

    async def generate():
        async with openai.AsyncOpenAI() as client:
            stream = await client.chat.completions.create(
                model="gpt-4o",
                messages=[{"role": "user", "content": message.content}],
                stream=True,
            )
            async for chunk in stream:
                # A chunk may arrive without choices; skip it rather than
                # index into an empty list.
                if not chunk.choices:
                    continue
                content = chunk.choices[0].delta.content
                if content:
                    yield sse_event(content)
            yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")
使用 SQLAlchemy 的异步数据库
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import DeclarativeBase, sessionmaker
# NOTE(review): placeholder credentials; load the real URL from configuration
# or the environment instead of committing it.
DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/db"
# Async engine with a pool of 20 connections plus up to 10 overflow.
engine = create_async_engine(DATABASE_URL, pool_size=20, max_overflow=10)
# Session factory producing AsyncSession objects; expire_on_commit=False keeps
# loaded attributes readable after a commit.
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
async def get_db():
    """Yield one database session and close it when the caller is done.

    Written as an async generator so it can be used with ``Depends``.
    """
    async with AsyncSessionLocal() as db_session:
        yield db_session
@app.get("/users/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
    """Return the user with ``user_id`` together with its orders.

    Args:
        user_id: Primary key taken from the URL path.
        db: Database session for this request.

    Raises:
        HTTPException: 404 when no such user exists.
    """
    # selectinload fetches the related orders up front.
    query = (
        select(User)
        .where(User.id == user_id)
        .options(selectinload(User.orders))
    )
    found = (await db.execute(query)).scalar_one_or_none()
    if found:
        return found
    raise HTTPException(404, "User not found")
# Concurrent database queries
@app.get("/dashboard/{user_id}")
async def get_dashboard(user_id: int, db: AsyncSession = Depends(get_db)):
    """Load user, stats and notifications for the dashboard concurrently.

    Args:
        user_id: Primary key taken from the URL path.
        db: Request-scoped session; used for the user lookup only.

    Returns:
        A dict with ``user``, ``stats`` and ``notifications``.
    """
    async def in_own_session(query):
        # An AsyncSession must not be shared between concurrent tasks, so
        # each extra query runs on a short-lived session of its own.
        # NOTE(review): objects returned from here are detached once the
        # session closes — confirm the helpers load everything they return.
        async with AsyncSessionLocal() as session:
            return await query(session, user_id)

    # Exactly one coroutine touches each session: ``db`` serves the user
    # lookup, the other two queries get their own.
    user, stats, notifications = await asyncio.gather(
        get_user_by_id(db, user_id),
        in_own_session(get_user_stats),
        in_own_session(get_notifications),
    )
    return {"user": user, "stats": stats, "notifications": notifications}
中间件和生命周期
from contextlib import asynccontextmanager
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Open the shared clients at startup and close them at shutdown.

    The Redis pool and the HTTP client are stored on ``app.state`` as
    ``redis`` and ``http_client``.

    Args:
        app: The application whose ``state`` receives the clients.
    """
    # Startup
    # NOTE(review): create_redis_pool / wait_closed are the aioredis 1.x API;
    # confirm the installed version still provides them.
    app.state.redis = await aioredis.create_redis_pool("redis://localhost")
    app.state.http_client = httpx.AsyncClient()
    print("App started")
    try:
        yield
    finally:
        # Shutdown: also runs when an exception is thrown in at the yield,
        # which would otherwise skip the cleanup.
        try:
            app.state.redis.close()
            await app.state.redis.wait_closed()
        finally:
            # Close the HTTP client even if closing Redis failed.
            await app.state.http_client.aclose()
        print("App shutdown")


# NOTE(review): this rebinds ``app``; handlers registered on an earlier
# FastAPI() instance in the same module would not belong to this one —
# confirm that a single instance is intended.
app = FastAPI(lifespan=lifespan)
# Request timing middleware
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    """Report how long the request took in the ``X-Process-Time`` header.

    Args:
        request: Incoming request.
        call_next: Callable that passes the request on and returns the
            response.

    Returns:
        The response, with the elapsed seconds added as a header.
    """
    # perf_counter is monotonic. time.time() follows the wall clock, which
    # can jump (NTP, manual changes) and yield wrong or negative durations.
    start_time = time.perf_counter()
    response = await call_next(request)
    process_time = time.perf_counter() - start_time
    response.headers["X-Process-Time"] = str(process_time)
    return response
总结
FastAPI 异步模式:
- 使用 `BackgroundTasks` 处理轻量级的响应后工作
- 使用 Celery/arq 处理 CPU 密集型或长时间运行的任务
- WebSocket `ConnectionManager` 实现基于房间的广播
- SSE 实现服务器到客户端的单向流式传输(如 LLM 响应)
- `asyncio.gather` 实现并发数据库查询(每个并发任务应使用独立的 `AsyncSession`)
- 生命周期上下文管理器管理启动/关闭资源