当前位置: 首页 > news >正文

FastAPI + SQLAlchemy 2.0 通用CRUD操作手册 —— 从同步到异步,一次讲透

你是不是也遇到过这样的场景——FastAPI 项目一开始用同步写得好好的,接口响应压测也漂亮,后来心血来潮想全面拥抱异步,结果 Session 报错、延迟加载直接炸了,甚至连一个简单的 refresh 都能把你卡半天?

今天这篇,就是想跟你好好聊聊,在 FastAPI 里用 SQLAlchemy 做 CRUD,同步和异步到底怎么玩、怎么选、怎么避开那些让人头秃的深坑。🎯

📌 本文能帮你解决什么

• 彻底搞懂 SQLAlchemy 2.0 的核心变化,告别旧式 Column 写法

• 搭建同步/异步双引擎,掌握连接池的最佳配置

• 亲手写出通用 CRUD,从单条插入到复杂关联查询全涵盖

• 给出“同步还是异步”的务实答案,让你不再盲目跟风

• 奉上我踩过的坑和排查清单,出了问题可以直接照单抓药

🧭 内容主要脉络

🔹 问题与背景:异步真的更快吗?
🔹 核心原理:从模型定义到引擎配置
🔹 实战 CRUD:增删改查 + 高级查询
🔹 事务与异常:显式边界 + FastAPI 依赖注入
🔹 注意事项:同步异步共存策略 & 常见翻车现场

🐢 第一部分:问题与背景——你其实不用那么焦虑

先说一个反直觉的结论:异步不会自动让你的接口变快

如果你的接口只是查一下数据库然后返回,计算量很小,那同步和异步的性能差距微乎其微。

真正让异步闪光的是高并发 I/O 密集场景——比如你的服务要同时请求多个外部 API、读写大量 WebSocket,这时候异步才能把事件循环的优势发挥出来。

很多团队一上来就非异步不可,结果发现 SQLAlchemy 1.x 时代异步支持根本是半残废,各种 hack,维护得想哭。2.0 之后异步终于能打了,但也有一堆前置条件。

所以,简单项目、低并发、团队对异步不熟,同步完全够用。如果你正准备起一个新项目,而且能保证全链路异步(FastAPI + async DB driver + 异步任务队列),那上异步很香。否则,别给自己找麻烦。

⚙️ 第二部分:核心原理与基础配置

SQLAlchemy 2.0 把声明式映射彻底革新了。现在模型定义长这样:

from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_columnclass Base(DeclarativeBase):passclass User(Base):__tablename__ = "users"id: Mapped[int] = mapped_column(primary_key=True)name: Mapped[str] = mapped_column(String(50))version: Mapped[int] = mapped_column(default=1)  # 乐观锁版本号

老式的 Column() 也能用,但我强烈建议你全部换上 Mapped + mapped_column ,类型提示更清晰,IDE 提示也舒服多了。

接下来是引擎配置,同步异步的差别主要在这:

# 同步引擎
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmakersync_engine = create_engine("postgresql://user:pass@localhost/db",pool_size=20,max_overflow=10,pool_pre_ping=True,
)
SyncSessionLocal = sessionmaker(bind=sync_engine)# 异步引擎
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSessionasync_engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db",pool_size=20,max_overflow=10,pool_pre_ping=True,
)
AsyncSessionLocal = async_sessionmaker(async_engine, class_=AsyncSession, expire_on_commit=False)

你可能会问:为毛异步 URL 要变成 postgresql+asyncpg ?因为 asyncpg 是真正的异步驱动,psycopg2 是同步的,混用必定翻车。MySQL 用户请选 aiomysql

连接池参数 pool_sizemax_overflow 在两种模式下意义一样,记得加上 pool_pre_ping=True ,防止连接被数据库回收后应用还傻傻复用。

🛠️ 第三部分:全程实战——CRUD 一把梭

下面我会同步异步对照着写,你会发现核心逻辑几乎一样,区别只在于 async/await 关键字和会话对象

✅ CREATE 操作

# 同步单条插入
def create_user_sync(session, name: str):user = User(name=name)session.add(user)session.commit()session.refresh(user)  # 获取数据库生成字段return user# 异步单条插入
async def create_user_async(session: AsyncSession, name: str):user = User(name=name)session.add(user)await session.commit()await session.refresh(user)  # 千万别忘 awaitreturn user

批量插入时, add_all 很直观,但对于大数据量,我更喜欢 bulk_insert_mappings ,速度快得多。
但注意:异步下 bulk 操作不支持自动刷新和关系绑定,用完记得手动 commit。

🛎️ async with session.begin() 事务管理

# 异步批量插入最佳实践:使用 async with session.begin() 
# 更新和删除同理,写操作一律用 begin() 管事务,读操作直接用裸 session(因为读一般不需要事务边界)
async def bulk_create_users(session: AsyncSession, names: list[str]):async with session.begin():session.add_all([User(name=n) for n in names])# 事务已提交,无需显式 commit# 当然,同步时也可使用 with session.begin() 事务自动提交,user 已持有数据库生成字段

✅ READ 操作

主键查询用 session.get() ,简单粗暴。高级查询统一用 select() 构造,再也没 query() 什么事了。

from sqlalchemy import select, and_, or_, func
from sqlalchemy.orm import selectinload# 异步条件查询 + 排序 + 分页
async def search_users(session: AsyncSession, keyword: str, page: int, size: int):stmt = (select(User).where(User.name.ilike(f"%{keyword}%")).order_by(User.id.desc()).limit(size).offset((page - 1) * size))result = await session.execute(stmt)return result.scalars().all()

关联查询时,强烈建议显式预加载,用 selectinloadjoinedload 把 N+1 问题扼杀在摇篮里。
异步下尤其注意,懒加载属性一访问就会抛出 MissingGreenlet 错误,血的教训啊。

stmt = select(User).options(selectinload(User.posts)).where(User.id == uid)

聚合查询用 func

stmt = select(func.count(User.id)).where(User.name.like("%小%"))
total = (await session.execute(stmt)).scalar()

🧩 再来一个复杂综合案例:搜索用户及其文章统计

需求:搜用户名或邮箱包含关键词的活跃用户,带出他们发表的文章数量,按文章数降序,分页。

async def search_users_with_post_count(session: AsyncSession,keyword: str,page: int = 1,size: int = 10,
):# 子查询:每篇文章所属作者post_count_sub = (select(Post.author_id, func.count(Post.id).label("post_count")).group_by(Post.author_id).subquery())# 主查询:用户 left join 文章统计stmt = (select(User, func.coalesce(post_count_sub.c.post_count, 0).label("post_count")).outerjoin(post_count_sub, User.id == post_count_sub.c.author_id).where(and_(User.is_active == True,or_(User.username.ilike(f"%{keyword}%"),User.email.ilike(f"%{keyword}%"),),)).order_by(desc("post_count"))   # 按别名排.limit(size).offset((page - 1) * size))result = await session.execute(stmt)rows = result.all()return [{"user": row.User, "post_count": row.post_count}for row in rows]

这个案例一口气用上了 子查询、outerjoin、and_/or_、ilike、coalesce、别名排序、分页,基本上日常查询的"天花板难度"也就这样了。你把它拆开看,每一块都是前面讲过的基础零件拼起来的。

✅ UPDATE 操作

方式一:查出对象,改属性,commit。直观,适合小更新。

方式二:批量更新 update() ,直接发 SQL,不经过 ORM 对象。适合批量改状态。

from sqlalchemy import updateasync def deactivate_users(session: AsyncSession, user_ids: list[int]):async with session.begin():stmt = update(User).where(User.id.in_(user_ids)).values(active=False)result = await session.execute(stmt)return result.rowcount

再说个容易被忽略的:乐观锁更新。利用版本号字段 version ,修改时带上版本条件,防止并发覆盖。

stmt = (update(User).where(User.id == uid, User.version == old_version).values(name=new_name, version=old_version + 1)
)
result = await session.execute(stmt)
if result.rowcount == 0:raise Exception("数据已被修改,请重试")

✅ DELETE 操作

from sqlalchemy import deleteasync def remove_user(session: AsyncSession, uid: int):async with session.begin():stmt = delete(User).where(User.id == uid)await session.execute(stmt)

🔐 第四部分:事务与异常——别把连接池打爆了

在 FastAPI 中,我习惯用依赖注入管理会话生命周期,这样每个请求都会自动获取、自动关闭。

async def get_db():async with AsyncSessionLocal() as session:try:yield sessionawait session.commit()except Exception:await session.rollback()raisefinally:await session.close()

关键一条:显式事务边界。哪怕你用了依赖注入,也建议在业务逻辑里用 async with session.begin() 包裹写操作,这样出错了自动回滚,绝对不会出现连接未释放的惨案。

有时候你需要设置事务隔离级别,特别是报表类查询,加 with_for_update() 或修改 session 连接参数就行,异步下完全一致。

🔄 第五部分:同步异步共存——俩种方案,一个原则

项目大了,难免有遗留模块还在同步,新的又想用异步。怎么搞?

如果在异步路由里调用同步数据库方法,可以用 asyncio.to_thread 把同步操作扔进线程池,避免阻塞事件循环。

import asyncioasync def async_endpoint():user = await asyncio.to_thread(sync_get_user, user_id=1)return user

反过来,在同步路由里调异步方法,千万别用 asyncio.run() ,它会新建事件循环,和 FastAPI 当前循环冲突,结果就是玄学报错。要么老实用同步,要么把整个路由改成 async。

归根结底,我的建议是:新项目从一开始就全异步栈——FastAPI + SQLAlchemy 2.0 async + asyncpg/aiomysql。即使现在并发不高,也能免去未来重构的痛苦。

💣 第六部分:常见翻车现场 & 排查指南

1. 异步 refresh 前没 await

现象:获取不到数据库默认值或自增 ID。
原因: session.refresh() 是协程,必须 await

2. MissingGreenlet 错误

现象:异步会话访问懒加载属性直接炸。
解决:把所有要用的关联全部 selectinloadjoinedload 。别偷懒。

3. 连接池耗尽

现象:突然所有请求卡住,日志显示“等待连接超时”。
大概率是事务没关闭,连接一直占着。马上检查是否有 commit/rollback 缺失。

4. 同步异步引擎混用

现象:在同一个模块里不小心把异步 session 传给了同步方法。
对策:严格分层,文件名带 _sync_async 后缀,写清楚。

🏁 总结:别让选择本身变成负担

说到底,CRUD 就是那几个固定动作,2.0 的 API 也帮我们抹平了很多心智负担。同步还是异步,真正要看的只有两件事:你的并发模型团队的肌肉记忆。选定一套栈,写清楚规范,然后沉下心写业务就好。

老规矩,如果这篇文章帮你少踩了一个坑,或者帮你下定了用同步/异步的决心,点个赞加关注不过分吧?☺️ 转发给那个还在纠结的同事,说不定会请你喝奶茶呢。🙌

http://www.jsqmd.com/news/693817/

相关文章:

  • Weka中CSV数据加载的完整指南与实战技巧
  • 终极指南:如何在foobar2000中安装和配置OpenLyrics歌词插件
  • 2026全球扭矩传感器十大品牌权威发布:广东犸力登顶,国产精密测量实现历史性突破 - 速递信息
  • PyCharm 下载安装教程,免激活码下载安装和使用教程
  • 2026年塑料管帽/塑料托盘/中空板箱子/塑料周转箱/法兰保护盖厂家怎么选? - 深度智识库
  • 外贸逆势大涨?全球每卖10台3D打印机,9台来自深圳|华南3d打印展 TCT深圳展
  • 上海乐时宜实业:崇明H型钢批发公司电话推荐 - LYL仔仔
  • 如何解决小龙虾 OpenClaw 上下文或session的token超限的问题
  • STM32CubeMX + VL53L5CX:手把手教你配置长距离ToF测距(避坑LPn/INT引脚)
  • 成都创意广告机构推荐与优势分析
  • Jetson Xavier NX功耗与性能的平衡术:DVFS动态调频详解与jetson_clocks使用指南
  • 哪家少儿编程机构最靠谱?2026 年五大机构深度测评与选择指南 - 速递信息
  • 5分钟精通:ES-Client Elasticsearch客户端的完整使用手册
  • Conda换源后还是安装失败?试试这个‘组合拳’:官方源+国内源+conda-forge的混合配置指南
  • 给iOS开发新手的礼物:5分钟在Windows虚拟机里搭好Xcode测试环境(macOS Catalina版)
  • 资深采购分享:串口屏选型与项目落地经验谈 - 浴缸里的巡洋舰
  • 国产AI音乐工具中文效果实测对比:哪款适配最优
  • Ⅱ–Ⅵ族多壳结构量子点分类:以CdSe/CdS/ZnS QDs为例
  • 2026年微信小程序开发工具哪个服务好? - FaiscoJeff
  • 用2美元的Attiny85芯片,DIY一个能自动填表的USB小键盘(附完整代码)
  • 告别混乱共享!用群晖DSM的SMB协议精细控制文件夹访问权限(附网络邻居隐藏技巧)
  • 杰理之触摸互斥配置项【篇】
  • 2026熙琦科技专业提供便携迷你机贴牌加工全流程定制服务 - 热敏感科技蜂
  • 按摩椅品牌排名 艾力斯特、荣泰、奥佳华第一梯队品牌对比分析 - 速递信息
  • Python 项目创建+依赖管理+版本控制
  • 2026年西藏装配式建筑与拉萨轻质混凝土墙板全景指南:官方渠道、品牌深度横评与高原建筑避坑秘籍 - 企业名录优选推荐
  • Linux 中-nan 字符串的匹配
  • 2026年苏州古风写真机构权威发布榜:写真/个性写真/旗袍写真/园林写真/国风写真 - 品牌策略师
  • 若依(RuoYi)整合异构数据库:基于MyBatis-Plus与Dynamic-Datasource的多源实战
  • 温州广成地坪:瑞安环氧平涂施工推荐几家 - LYL仔仔