当前位置: 首页 > news >正文

FastAPI进阶开发:ORM

目录

1、ORM 的核心优势

2、主流 ORM 工具对比

3、SQLAlchemy 异步版实战步骤

步骤 1:安装依赖

步骤 2:创建数据库(提前准备)

步骤 3:初始化 FastAPI 项目,创建异步数据库引擎

步骤 4:定义 ORM 模型类(映射数据库表)

步骤 5:创建数据库会话依赖项,注入到路由

步骤 6:ORM 核心操作:增删改查(CRUD)

1. 查询:基础查询、条件查询

2. 新增:创建对象 → 添加到会话 → 提交事务

3. 更新:先查询 → 重新赋值 → 提交事务

4. 删除:先查询 → 删除对象 → 提交事务

4、ORM 操作核心总结

完整代码

建环境(项目初始化)

数据操作(CRUD)


在 Web 开发中,数据库操作是核心环节。原生 SQL 编写繁琐、易出错且存在 SQL 注入风险,而ORM(对象关系映射) 技术可以让我们通过操作 Python 对象的方式与数据库交互,无需直接编写 SQL,大幅提升开发效率。

FastAPI 中推荐使用SQLAlchemy(异步版)作为 ORM 工具,它功能强大、灵活性高,是企业级 Python 项目的首选,本文将基于SQLAlchemy[asyncio] + MySQL讲解异步 ORM 的实战使用。

1、ORM 的核心优势

  • 告别原生 SQL:通过 Python 对象操作数据库,代码更简洁易读
  • 避免 SQL 注入:ORM 自动做参数转义,从根源上防止注入攻击
  • 代码复用性高:减少重复的 SQL 语句,统一数据库操作逻辑
  • 自动管理连接:处理数据库连接、会话和事务,无需手动管理
  • 跨数据库兼容:修改数据库类型时,无需大幅修改业务代码

2、主流 ORM 工具对比

排名

ORM 工具

特点

适应场景

1

SQLAlchemy

功能最强、最灵活,企业级

各类 API、微服务、数据应用

2

Django ORM

封装好、上手快

Django 项目、管理后台

3

Tortoise ORM

全异步设计

高并发异步 Web 服务

3、SQLAlchemy 异步版实战步骤

步骤 1:安装依赖

需要安装异步版 SQLAlchemy 和 MySQL 的异步驱动aiomysql:

# 安装异步SQLAlchemy pip install sqlalchemy[asyncio] -i https://mirrors.aliyun.com/pypi/simple/ # 安装MySQL异步驱动 pip install aiomysql -i https://mirrors.aliyun.com/pypi/simple/

步骤 2:创建数据库(提前准备)

  1. 确保 MySQL 服务已启动;

  1. 连接 MySQL 并创建项目数据库:

按下 Win + R,输入 cmd 回车打开命令提示符

# 假设 MySQL 安装在 C 盘 Program Files 下(根据你的实际路径修改)

cd C:\Program Files\MySQL\MySQL Server 8.0\bin

输入连接命令,回车后输入MySQL 密码(安装时设置的root密码)

输入密码后回车,如下图所示说明连接成功

接下来创建数据库

-- 创建数据库,指定字符集为utf8mb4(支持emoji) CREATE DATABASE IF NOT EXISTS fastapi_orm DEFAULT CHARACTER SET utf8mb4 DEFAULT COLLATE utf8mb4_general_ci;

打开pycharm

如下图就是连接成功了

步骤 3:初始化 FastAPI 项目,创建异步数据库引擎

数据库引擎是 SQLAlchemy 与数据库的连接核心,异步版使用create_async_engine创建,同时配置连接池参数(提升数据库访问性能)。

from fastapi import FastAPI from sqlalchemy.ext.asyncio import create_async_engine, AsyncEngine from contextlib import asynccontextmanager # 数据库配置 DB_URL = "mysql+aiomysql://root:你的密码@localhost:3306/fastapi_orm?charset=utf8mb4" # 全局异步引擎(避免重复创建) async_engine: AsyncEngine | None = None # 全局会话工厂(延迟初始化) AsyncSessionLocal: async_sessionmaker | None = None # 项目生命周期管理器(替代@app.on_event,优化资源释放) @asynccontextmanager async def lifespan(app: FastAPI): global async_engine # 应用启动:初始化异步引擎 async_engine = create_async_engine( DB_URL, echo=True, # 输出SQL日志,开发阶段开启,生产阶段关闭 pool_size=10, # 连接池常驻连接数 max_overflow=20, # 连接池允许的额外连接数 pool_pre_ping=True # 检查连接有效性,避免无效连接 ) print("应用启动,数据库引擎初始化完成") yield # 应用关闭:优雅释放引擎和连接池 if async_engine: await async_engine.dispose() print("应用关闭,数据库连接已释放") # 初始化FastAPI,绑定生命周期 app = FastAPI(lifespan=lifespan)

步骤 4:定义 ORM 模型类(映射数据库表)

ORM 的核心是模型类与数据库表的一一映射,先定义基础模型类(包含所有表的公共字段,如创建时间、更新时间),再定义业务模型类(如书籍表、用户表)。

需要使用DeclarativeBase作为基类,Mapped约定属性类型,mapped_column映射数据库字段。

from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column from sqlalchemy import DateTime, func, String, Float from datetime import datetime # 基础模型类:包含公共的创建时间、更新时间 class Base(DeclarativeBase): # 创建时间:默认当前时间,仅插入时生效 create_time: Mapped[datetime] = mapped_column(DateTime, server_default=func.now(), comment="创建时间") # 更新时间:默认当前时间,更新时自动刷新 update_time: Mapped[datetime] = mapped_column(DateTime, server_default=func.now(), onupdate=func.now(), comment="更新时间") # 书籍模型类:映射book表 class Book(Base): __tablename__ = "book" # 数据库表名 __table_args__ = {'extend_existing': True} # 允许重复加载模型,避免报错 # 字段映射:主键id、书名、作者、价格、出版社 id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True, comment="书籍ID") name: Mapped[str] = mapped_column(String(255), comment="书名") author: Mapped[str] = mapped_column(String(255), comment="作者") price: Mapped[float] = mapped_column(Float, comment="价格") publisher: Mapped[str] = mapped_column(String(255), comment="出版社") # 建表函数:基于模型类创建数据库表 async def create_tables(): if not async_engine: raise RuntimeError("数据库引擎未初始化") # 开启事务,创建所有未存在的表 async with async_engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) # 注意:需要在lifespan的应用启动阶段调用建表函数 # 改造lifespan的yield上方代码: # await create_tables() # print("数据库表创建完成")

改造后的lifespan:

@asynccontextmanager async def lifespan(app: FastAPI): global async_engine async_engine = create_async_engine(DB_URL, echo=True, pool_size=10, max_overflow=20, pool_pre_ping=True) await create_tables() # 调用建表函数 print("应用启动,数据库引擎初始化完成,表创建成功") yield if async_engine: await async_engine.dispose() print("应用关闭,数据库连接已释放")

步骤 5:创建数据库会话依赖项,注入到路由

数据库会话(Session)是 ORM 操作数据库的核心对象,通过依赖注入将会话注入到路由中,实现会话的统一管理(创建、使用、关闭、事务回滚)。

from sqlalchemy.ext.asyncio import async_sessionmaker, AsyncSession from fastapi import Depends # 创建异步会话工厂,绑定数据库引擎 AsyncSessionLocal = async_sessionmaker( bind=async_engine, # 绑定数据库引擎 class_=AsyncSession, # 指定会话类 expire_on_commit=False # 提交后会话对象不过期,避免重复查询 autoflush=True # 开启自动刷新,确保数据及时写入数据库 ) # 定义数据库会话依赖项:路由中通过Depends调用 async def get_db(): if not AsyncSessionLocal: raise RuntimeError("数据库会话工厂未初始化") db = AsyncSessionLocal() try: yield db # 将会话传递给路由 await db.commit() # 无异常则提交事务 except Exception as e: await db.rollback() # 有异常则回滚事务 raise e finally: await db.close() # 无论是否异常,最终关闭会话

改造后的lifespan:

# 项目生命周期管理器(替代@app.on_event,优化资源释放) @asynccontextmanager async def lifespan(app: FastAPI): global async_engine, AsyncSessionLocal # 应用启动:初始化异步引擎 async_engine = create_async_engine( DB_URL, echo=True, # 输出SQL日志,开发阶段开启,生产阶段关闭 pool_size=10, # 连接池常驻连接数 max_overflow=20, # 连接池允许的额外连接数 pool_pre_ping=True # 检查连接有效性,避免无效连接 ) await create_tables() # 调用建表函数 # 引擎初始化完成后,再创建会话工厂 AsyncSessionLocal = async_sessionmaker( bind=async_engine, class_=AsyncSession, expire_on_commit=False, autoflush=True # 开启自动刷新,确保数据及时写入数据库 ) print("应用启动,数据库引擎初始化完成,表创建成功") yield # 应用关闭:优雅释放引擎和连接池 if async_engine: await async_engine.dispose() print("应用关闭,数据库连接已释放") # 初始化FastAPI,绑定生命周期 app = FastAPI(lifespan=lifespan)

步骤 6:ORM 核心操作:增删改查(CRUD)

基于上述配置,我们可以通过操作 Python 对象实现数据库的增删改查,无需编写任何原生 SQL,以下是书籍表的完整 CRUD 实战。

前置:定义 Pydantic 模型(请求体校验)

FastAPI 中推荐使用 Pydantic 做请求参数校验,定义与 Book 模型对应的 Pydantic 类,接收前端传入的参数:

from pydantic import BaseModel # 书籍新增/更新的参数校验模型 class BookBase(BaseModel): name: str author: str price: float publisher: str # 支持Pydantic模型解析ORM对象 class BookResponse(BookBase): id: int create_time: datetime update_time: datetime # 查询数据库得到 ORM 对象后,直接返回给前端,无需手动转字典 class Config: orm_mode = True # V1 写法(V2 用 from_attributes = True)

1. 查询:基础查询、条件查询

查询是数据库操作中最常用的场景,SQLAlchemy 提供了丰富的查询语法,核心是select()函数,配合where()/offset()/limit()实现复杂查询。

from fastapi import APIRouter from sqlalchemy import select, func from typing import List # 1. 查询所有书籍(直接使用app定义路由) @app.get("/book/all", response_model=List[BookResponse]) async def get_all_books(db: AsyncSession = Depends(get_db)): result = await db.execute(select(Book)) return result.scalars().all() # 2. 根据ID查询单本书籍(直接使用app定义路由) @app.get("/book/{book_id}", response_model=BookResponse | dict) async def get_book_by_id(book_id: int, db: AsyncSession = Depends(get_db)): book = await db.get(Book, book_id) if not book: return {"code": 404, "message": "书籍不存在"} return book # 3. 条件查询:价格小于指定值 + 作者模糊查询(直接使用app定义路由) @app.get("/book/search/condition", response_model=List[BookResponse]) async def search_book(price: float, author: str, db: AsyncSession = Depends(get_db)): # 模糊匹配 like # result = await db.execute(select(Book).where(Book.author.like("曹_"))) # & 逻辑与 ; , 作用同于& ; | 逻辑或 # result = await db.execute(select(Book).where((Book.author.like("曹%"))&(Book.price>100))) # result = await db.execute(select(Book).where((Book.author.like("曹%")),(Book.price>100))) # result = await db.execute(select(Book).where((Book.author.like("曹%"))|(Book.price>100))) # 需求:书籍id列表,数据库里面的id在这个列表中 就返回 # id_list = [1,3,5,7] # result = await db.execute(select(Book).where(Book.id.in_(id_list))) result = await db.execute( select(Book).where(Book.price < price).where(Book.author.like(f"%{author}%")) ) return result.scalars().all()

2. 新增:创建对象 → 添加到会话 → 提交事务

核心步骤:通过 Pydantic 模型创建 ORM 对象 → 用db.add()添加到会话 → 提交事务(依赖项中自动提交)。

# 新增书籍 # 用户输入图书信息(id、书名、作者、价格、出版社) -> 新增 # 用户输入 -> 参数 -> 请求体 @book_router.post("/add", response_model=BookResponse) async def add_book(book: BookBase, db: AsyncSession = Depends(get_db)): # 获取 book 参数 ,创建图书对象(__dict__ 返回 book 对象的属性字典) book_obj = Book(**book.dict()) # 添加到数据库会话 db.add(book_obj) await db.commit() return book_obj

3. 更新:先查询 → 重新赋值 → 提交事务

FastAPI 中 ORM 更新的核心是先查后改:先根据主键查询到对象,再对对象的属性重新赋值,提交事务后自动同步到数据库。

from fastapi import HTTPException # 7. 更新书籍信息 # 先查在改 # 设计思路:路径参数 书籍id:作用是查找;请求体参数:作用是新数据(书名、作者、价格、出版社) @app.put("/book/update/{book_id}", response_model=BookResponse) async def update_book(book_id: int, book: BookBase, db: AsyncSession = Depends(get_db)): db_book = await db.get(Book, book_id) # 未找到抛出异常 if not db_book: raise HTTPException(status_code=404, detail="书籍不存在") # 更新字段(update_time会自动刷新) db_book.name = book.name db_book.author = book.author db_book.price = book.price db_book.publisher = book.publisher # 刷新会话,确保字段更新 await db.flush() await db.refresh(db_book) return db_book

4. 删除:先查询 → 删除对象 → 提交事务

核心步骤:先查询对象 → 用db.delete()删除对象 → 提交事务,完成数据库删除。

# 8. 删除书籍(直接使用app定义路由) @app.delete("/book/delete/{book_id}") async def delete_book(book_id: int, db: AsyncSession = Depends(get_db)): db_book = await db.get(Book, book_id) # 未找到抛出异常 if not db_book: raise HTTPException(status_code=404, detail="书籍不存在") await db.delete(db_book) await db.commit() return {"code": 200, "message": "书籍删除成功"}

4、ORM 操作核心总结

完整代码

from fastapi import FastAPI, Depends, HTTPException from sqlalchemy.ext.asyncio import create_async_engine, AsyncEngine, async_sessionmaker, AsyncSession from contextlib import asynccontextmanager from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column from sqlalchemy import DateTime, func, String, Float, select from datetime import datetime from pydantic import BaseModel from typing import List # 数据库配置 DB_URL = "mysql+aiomysql://root:458362@localhost:3306/fastapi_orm?charset=utf8mb4" # 全局异步引擎和会话工厂 async_engine: AsyncEngine | None = None AsyncSessionLocal: async_sessionmaker | None = None # 基础模型类 class Base(DeclarativeBase): create_time: Mapped[datetime] = mapped_column( DateTime, comment="创建时间", default=datetime.now() ) update_time: Mapped[datetime] = mapped_column( DateTime, comment="更新时间", default=datetime.now(), onupdate=datetime.now() ) # 书籍模型类 class Book(Base): __tablename__ = "book" __table_args__ = {'extend_existing': True} id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True, comment="书籍ID") name: Mapped[str] = mapped_column(String(255), comment="书名") author: Mapped[str] = mapped_column(String(255), comment="作者") price: Mapped[float] = mapped_column(Float, comment="价格") publisher: Mapped[str] = mapped_column(String(255), comment="出版社") # 建表函数 async def create_tables(): if not async_engine: raise RuntimeError("数据库引擎未初始化") async with async_engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) # 项目生命周期管理器 @asynccontextmanager async def lifespan(app: FastAPI): global async_engine, AsyncSessionLocal async_engine = create_async_engine( DB_URL, echo=True, pool_size=10, max_overflow=20, pool_pre_ping=True ) await create_tables() AsyncSessionLocal = async_sessionmaker( bind=async_engine, class_=AsyncSession, expire_on_commit=False, autoflush=True ) print("应用启动,数据库引擎初始化完成,表创建成功") yield if async_engine: await async_engine.dispose() print("应用关闭,数据库连接已释放") # 初始化FastAPI app = FastAPI(lifespan=lifespan) # 数据库会话依赖项 async def get_db(): if not AsyncSessionLocal: raise RuntimeError("数据库会话工厂未初始化") db = AsyncSessionLocal() try: yield db await db.commit() except Exception as e: await db.rollback() raise e finally: await db.close() # Pydantic模型 class BookBase(BaseModel): name: str author: str price: float publisher: str class BookResponse(BookBase): id: int create_time: datetime update_time: datetime class Config: from_attributes = True orm_mode = True # 根路由 @app.get("/") async def root(): return {"message": "FastAPI ORM项目运行成功", "docs_url": "/docs"} # 查询所有书籍 @app.get("/book/all", response_model=List[BookResponse]) async def get_all_books(db: AsyncSession = Depends(get_db)): result = await db.execute(select(Book)) return result.scalars().all() # 根据ID查询单本书籍 @app.get("/book/{book_id}", response_model=BookResponse | dict) async def get_book_by_id(book_id: int, db: AsyncSession = Depends(get_db)): book = await db.get(Book, book_id) if not book: return {"code": 404, "message": "书籍不存在"} return book # 条件查询:价格小于指定值 + 作者模糊查询 @app.get("/book/search/condition", response_model=List[BookResponse]) async def search_book(price: float, author: str, db: AsyncSession = Depends(get_db)): result = await db.execute( select(Book).where(Book.price < price).where(Book.author.like(f"%{author}%")) ) return result.scalars().all() # 新增书籍 @app.post("/book/add", response_model=BookResponse) async def add_book(book: BookBase, db: AsyncSession = Depends(get_db)): book_obj = Book(**book.__dict__) db.add(book_obj) await db.commit() return book_obj # 更新书籍信息 @app.put("/book/update/{book_id}", response_model=BookResponse) async def update_book(book_id: int, book: BookBase, db: AsyncSession = Depends(get_db)): db_book = await db.get(Book, book_id) if not db_book: raise HTTPException(status_code=404, detail="书籍不存在") db_book.name = book.name db_book.author = book.author db_book.price = book.price db_book.publisher = book.publisher await db.flush() await db.refresh(db_book) return db_book # 删除书籍 @app.delete("/book/delete/{book_id}") async def delete_book(book_id: int, db: AsyncSession = Depends(get_db)): db_book = await db.get(Book, book_id) if not db_book: raise HTTPException(status_code=404, detail="书籍不存在") await db.delete(db_book) await db.commit() return {"code": 200, "message": "书籍删除成功"}

SQLAlchemy 异步版操作数据库的核心流程可概括为3 步建环境,4 步做操作:

建环境(项目初始化)

  1. 安装依赖:sqlalchemy[asyncio] + 对应数据库的异步驱动;
  2. 创建异步引擎:create_async_engine,配置连接池和日志;
  3. 定义模型类:继承DeclarativeBase,实现表与对象的映射;

数据操作(CRUD)

  1. 创建会话工厂:async_sessionmaker,绑定引擎;
  2. 定义会话依赖项:get_db(),统一管理会话的创建和释放;
  3. 路由中注入会话:db: AsyncSession = Depends(get_db);
  4. 操作对象:通过select()/add()/delete() + 对象属性赋值实现 CRUD。
http://www.jsqmd.com/news/450741/

相关文章:

  • Ostrakon-VL-8B镜像免配置:start.sh一键拉起Gradio服务,省去环境踩坑
  • MT5 Zero-Shot中文增强镜像实操手册:从安装到批量生成全流程
  • [ARM原生加速]:M1/M2开发者的Android模拟器性能优化指南
  • 用Obsidian-Git构建知识安全网:从数据防护到协作管理的完整指南
  • DCT-Net人像卡通化效果提升:输入图像分辨率与输出质量关系
  • GLM-OCR模型Typora伴侣工具开发:自动识别图片并插入Markdown
  • RMBG-2.0GPU算力优化:梯度检查点+内存映射减少峰值显存
  • 7天精通REINVENT4:AI驱动分子设计全流程指南
  • 通义千问3-Reranker-0.6B效果惊艳展示:中英文混合查询下Top-1准确率实录
  • AIGlasses_for_navigation高清展示:盲道与人行横道交界处像素级分割边界
  • 3步永久保存QQ空间回忆:GetQzonehistory数据备份工具全解析
  • 从手写代码到日提 30 个 PR:Claude Code 缔造者的 AI 编程启示录
  • 加密MCP保险库:人工智能系统中安全凭证管理的关键
  • 如何借助ChanlunX实现缠论技术分析的可视化与实战应用
  • 南北阁Nanbeige 4.1-3B代码生成效果:Java面试算法题一键解答
  • Flutter 三方库 enough_icalendar 的鸿蒙化适配指南 - 掌控日历日程资产、RFC-5545 治理实战、鸿蒙级精密时轴专家
  • AI辅助开发:让快马AI设计一个高可扩展的openclaw爬虫框架架构
  • 3个步骤构建个人知识管理中心:本地化工具让学习资源永久掌控
  • SmolVLA生产环境部署:Nginx反向代理+7860端口安全访问配置指南
  • 5分钟搞定WhisperLiveKit本地部署:实时语音转文字+说话人识别全流程
  • 手把手教你用Cartographer给MickX4小车实现室外3D建图(附避坑指南)
  • 基于影刀RPA构建智能客服回复系统的技术实践与性能优化
  • DAMOYOLO-S快速上手:Postman调试API接口与返回字段完整性校验
  • 开源图像分割模型 RMBG-1.4 部署案例:免配置镜像实测
  • MediaPipeUnityPlugin实战指南:面部追踪与手势识别技术解析
  • ERNIE-4.5-0.3B-PT效果展示:生成符合ISO/IEC 27001标准的信息安全报告框架
  • 提升效率:用快马AI自动生成222yn页面升级访问优化脚本
  • 如何实现PDF智能转换?揭秘PDF Craft的高效解决方案
  • REINVENT4分子设计实战指南:从入门到进阶的AI药物发现之旅
  • ChatTTS模型自训练实战:从零构建个性化语音合成系统