FastAPI + SQLAlchemy 2.0 异步
将 SQLAlchemy 的异步会话与 FastAPI 的并发请求处理结合使用,可以在 I/O 密集型场景下获得更高的吞吐量。

数据库设置
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase
# Connection settings for the shared async engine (asyncpg driver on PostgreSQL).
_DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/db"
_POOL_SIZE = 20

# One engine for the whole module; sessions created below borrow its connections.
engine = create_async_engine(_DATABASE_URL, pool_size=_POOL_SIZE)

# Session factory. expire_on_commit=False keeps loaded attributes readable after
# commit, so objects can still be used without triggering a reload.
SessionLocal = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,
)
class Base(DeclarativeBase):
    """Declarative base class that the ORM models in this file inherit from."""
模型(SQLAlchemy 2.0 风格)
from sqlalchemy.orm import Mapped, mapped_column, relationship
class User(Base):
    """ORM model mapped to the ``users`` table (SQLAlchemy 2.0 typed style)."""

    __tablename__ = "users"

    # Primary key.
    id: Mapped[int] = mapped_column(primary_key=True)
    # Unique and indexed, so e-mail lookups can use the index.
    email: Mapped[str] = mapped_column(unique=True, index=True)
    # Password hash. UserRepo.create passes ``hashed_password=...`` to the
    # constructor; without this column the default declarative constructor
    # rejects that keyword with a TypeError.
    hashed_password: Mapped[str] = mapped_column()
    # New users are active unless stated otherwise.
    is_active: Mapped[bool] = mapped_column(default=True)
    # "selectin" loads the related posts eagerly with a follow-up SELECT ... IN,
    # which avoids implicit lazy-load I/O on attribute access under asyncio.
    # NOTE(review): "Post" is not defined in this section; it is expected to
    # declare an ``author`` relationship pointing back here - confirm.
    posts: Mapped[list["Post"]] = relationship(back_populates="author", lazy="selectin")
依赖注入
async def get_db():
    """Yield a database session and finalize its transaction afterwards.

    The session is committed once the consumer of the generator finishes
    without raising. If the consumer (or the commit itself) raises an
    ``Exception``, the session is rolled back and the exception is re-raised.
    The ``async with`` block closes the session in every case.
    """
    async with SessionLocal() as db_session:
        try:
            # Hand the session to the caller; execution resumes here afterwards.
            yield db_session
            await db_session.commit()
        except Exception:
            # Undo any pending work, then let the error propagate unchanged.
            await db_session.rollback()
            raise
仓储模式(Repository Pattern)
from sqlalchemy import select
class UserRepo:
    """Data-access helper for ``User`` rows, bound to a single ``AsyncSession``."""

    def __init__(self, db: AsyncSession):
        # Session used by every query issued through this repository.
        self.db = db

    async def get_by_id(self, uid: int):
        """Return the ``User`` whose ``id`` equals ``uid``, or ``None`` if no row matches."""
        stmt = select(User).where(User.id == uid)
        result = await self.db.execute(stmt)
        return result.scalar_one_or_none()

    async def create(self, email: str, pw: str):
        """Add a new ``User`` to the session and return it.

        ``pw`` is stored as ``hashed_password`` exactly as given; no hashing is
        done here. The row is flushed and refreshed but not committed, so
        committing is left to whoever owns the session.
        """
        user = User(email=email, hashed_password=pw)
        self.db.add(user)
        # Flush sends the pending INSERT; refresh reloads the row's attributes.
        await self.db.flush()
        await self.db.refresh(user)
        return user
-> 使用 JSON Viewer 校验 JSON 模式(schema)。