当前位置: 首页 > news >正文

FastAPI学习笔记:二、ORM

ORM(Object-RelationalMapping,对象关系映射)是一种编程技术,用于在面向对象编程语言和关系型数据库之间建立映射。它允许开发者通过操作对象的方式与数据库进行交互,而无需直接编写复杂的SQL语句。

优势:

  • 减少重复的 SQL 代码

  • 代码更简洁易读

  • 自动处理数据库连接和事务

  • 自动防止 SQL 注入攻击

ORM 分类表

排名ORM 工具特点适应场景
1SQLAlchemy ORM功能最强、最灵活、企业级各类 API、微服务、数据应用
2Django ORM封装好、上手快Django 项目、管理后台
3Tortoise ORM全异步异步 Web 服务、高并发 API

1、ORM 使用流程

安装依赖

打开终端输入命令:

pip install sqlalchemy[asyncio] aiomysql
  • sqlalchemy[asyncio]:SQLAlchemy 的异步支持版本,适配 FastAPI 异步场景

  • aiomysql:MySQL 的异步数据库驱动,和异步 ORM 配套使用


1. 创建异步引擎、建库、建表

创建异步引擎

from sqlalchemy.ext.asyncio import create_async_engine ​ ASYNC_DATABASE_URL = "mysql+aiomysql://root:123456@localhost:3306/fastAPI_first?charset=utf8" ​ # 创建异步引擎 async_engine = create_async_engine( ASYNC_DATABASE_URL, echo=True, # 可选:输出SQL日志 pool_size=10, # 设置连接池中保持的持久连接数 max_overflow=20 # 设置连接池允许创建的额外连接数 )

(1)create_async_engine是什么?

它是 SQLAlchemy 提供的异步数据库引擎创建方法,专门用于 FastAPI 这类异步 Web 项目,配合aiomysql驱动实现异步数据库操作。

(2)数据库 URL 解析

mysql+aiomysql://root:123456@localhost
  • mysql+aiomysql:指定使用 MySQL 数据库 + 异步aiomysql驱动

  • root:123456:数据库账号密码

  • localhost:3306:数据库地址和端口

  • fastAPI_first:目标数据库名

  • charset=utf8:指定字符集

(3)参数

参数作用场景
echo=True控制台打印所有执行的 SQL 语句开发调试用,生产环境建议关闭
pool_size=10连接池保持的持久连接数高并发场景建议根据业务量调大
max_overflow=20连接池允许的临时额外连接数峰值流量时可临时扩容,用完自动回收

2. 建库建表

首先自己在数据库手动创建一个名为fastAPI_first的库

接着:


from datetime import datetime from fastapi import FastAPI from sqlalchemy import func, Float, DateTime, String from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column ​ from sqlalchemy.ext.asyncio import create_async_engine ​ ASYNC_DATABASE_URL = "mysql+aiomysql://root:1234@localhost:3306/fastAPI_first?charset=utf8" #记得更换为自己的数据库账号密码 ​ app= FastAPI() ​ # 1、 创建异步引擎 async_engine = create_async_engine( ASYNC_DATABASE_URL, echo=True, # 可选:输出SQL日志 pool_size=10, # 设置连接池中保持的持久连接数 ) ​ ​ # ---------------------------------------------------------------------- ​ # 2. 定义模型类: 基类 + 表对应的模型类 # 基类:创建时间、更新时间;书籍表:id、书名、作者、价格、出版社 ​ class Base(DeclarativeBase): create_time: Mapped[datetime] = mapped_column(DateTime, insert_default=func.now(), default=func.now, comment="创建时间") update_time: Mapped[datetime] = mapped_column(DateTime, insert_default=func.now(), default=func.now, onupdate=func.now()) # 定义公共基类 Base # 定义了所有表的公共字段:create_time(创建时间)、update_time(更新时间) # 配置了时间自动管理: # insert_default=func.now():插入数据时自动填充当前时间 # onupdate=func.now():数据更新时自动刷新时间 # 其他表模型继承 Base 后,会自动带上这两个字段,避免重复代码 ​ ​ class Book(Base): __tablename__ = "book" ​ id: Mapped[int] = mapped_column(primary_key=True, comment="书籍id") bookname: Mapped[str] = mapped_column(String(255), comment="书名") author: Mapped[str] = mapped_column(String(255), comment="作者") price: Mapped[float] = mapped_column(Float, comment="价格") publisher: Mapped[str] = mapped_column(String(255), comment="出版社") ​ ​ # __tablename__ = "book":声明该类对应数据库中的 book 表 # 定义了书籍表的业务字段:id(主键)、书名、作者、价格、出版社 # 继承了 Base 的 create_time 和 update_time 字段 # 用 comment 给每个字段添加了数据库注释,方便维护 # ---------------------------------------------------------------------- ​ # 3. 建表:定义函数建表 → FastAPI 启动的时候调用建表的函数 async def create_tables(): # 获取异步引擎,创建事务 - 建表 async with async_engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) # Base 模型类的元数据创建 ​ ​ # async with async_engine.begin():开启一个数据库事务,保证建表操作的原子性 # conn.run_sync(Base.metadata.create_all): # Base.metadata 是所有 ORM 模型(比如写的 Book 表)的元数据集合 # create_all 会根据这些元数据,自动生成并执行 CREATE TABLE SQL 语句,在数据库中创建所有定义好的表 # run_sync 是异步 SQLAlchemy 的写法,用来在异步环境中执行同步的建表操作 ​ @app.on_event("startup") async def startup_event(): await create_tables() ​ ​ # @app.on_event("startup") 是 FastAPI 的启动事件钩子 # 作用:在服务启动时自动调用 create_tables() 建表 # 效果:你一运行 FastAPI 服务,它就会自动检查并创建所有模型对应的表,不用手动执行 SQL @app.get("/") async def root(): return {"message": "Hello World"}
run_sync(Base.metadata.create_all)是关键:它会把 ORM 模型同步转换为数据库表结构,自动执行建表语句

接着启动FastAPI程序

然后登录数据库,查看表是否成功创建。

注意:因为这里的on_event 在 FastAPI 的新版本中已经被弃用了。应该使用 lifespan 事件处理器来替代。


3. 操作数据(CRUD)

ORM 的核心能力就是用 Python 代码替代原生 SQL,完成数据操作:

  • 查询:用select()语法筛选数据,无需写SELECT

  • 新增:创建模型对象,添加到会话并提交

  • 修改:查询到对象后修改属性,提交会话

  • 删除:查询到对象后从会话删除并提交

2、路由匹配中使用 ORM

我们有一个搜索的功能,用户点击搜索的时候,完成的是数据库查询的工作。

那么当我们点击注册的功能时,需要在数据库表中新出入一条数据。

那么在一个项目当时都是通过一个接口来实现数据库的增删改查的。接下来就用到了路由匹配中使用ORM了

核心:创建依赖项,使用 Depends 注入到处理函数

创建方式如下:

# 创建异步会话工厂 AsyncSessionLocal = async_sessionmaker( bind=async_engine, # 绑定数据库引擎 class_=AsyncSession, # 指定会话类 expire_on_commit=False # 会话对象不过期,不重新查询数据库 ) # 依赖项,用于获取数据库会话 async def get_database(): async with AsyncSessionLocal() as session: try: yield session # 返回数据库会话给路由处理函数 await session.commit() # 无异常,提交事务 except Exception: await session.rollback() # 有异常则回滚 raise finally: await session.close() # 关闭会话 @app.get("/book/books") async def get_book_list( db: AsyncSession = Depends(get_database)#依赖注入数据库对象 ): # 查询所有书籍 result = await db.execute(select(Book)) # Book 模型类 user = result.scalars().all()#查询所有 return user

直接查询book表中的数据

from datetime import datetime from contextlib import asynccontextmanager from fastapi import FastAPI, Depends from sqlalchemy import DateTime, func, String, Float, select from sqlalchemy.ext.asyncio import create_async_engine,async_sessionmaker,AsyncSession from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column # 创建 FastAPI 应用实例,使用 lifespan 管理应用生命周期 @asynccontextmanager async def lifespan(app: FastAPI): """ 应用生命周期管理器 - yield 之前:应用启动时执行(如:数据库初始化、连接池创建等) - yield 之后:应用关闭时执行(如:关闭数据库连接、清理资源等) """ # 启动时创建数据库表 await create_tables() yield # 如果有关闭时需要执行的清理操作,可以在这里添加 app = FastAPI(lifespan=lifespan) # 1、创建异步引擎 ASYNC_DATABASE_URL = "mysql+aiomysql://root:123456@localhost:3306/fastapi_first?charset=utf8" async_engine = create_async_engine( ASYNC_DATABASE_URL, echo=True, # 可选,输出SQL日志 pool_size=10, # 设置连接池活跃的连接数 max_overflow=20 # 允许额外的连接数 ) # 2、定义模型类: 基类 + 表对应的模型类 # 基类:创建时间、更新时间;书籍表:id、书名、作者、价格、出版社 class Base(DeclarativeBase): # 创建时间字段:插入时自动设置为当前时间,默认值为当前时间 create_time: Mapped[datetime] = mapped_column(DateTime, insert_default=func.now(), default=func.now, comment="创建时间") # 更新时间字段:插入和更新时自动设置为当前时间 update_time: Mapped[datetime] = mapped_column(DateTime, insert_default=func.now(), default=func.now, onupdate=func.now(), comment="更新时间") class Book(Base): """书籍表模型类""" __tablename__ = "book" # 数据库表名 id: Mapped[int] = mapped_column(primary_key=True, comment="书籍id") # 主键 bookname: Mapped[str] = mapped_column(String(255), comment="书名") # 书名 author: Mapped[str] = mapped_column(String(255), comment="作者") # 作者 price: Mapped[float] = mapped_column(Float, comment="价格") # 价格 publisher: Mapped[str] = mapped_column(String(255), comment="出版社") # 出版社 # 3、建表:定义函数建表->FastAPI启动时调用建表的函数 async def create_tables(): """ 创建数据库表 使用异步引擎创建所有继承自 Base 的模型类对应的表 """ # 获取异步引擎,创建事务-建表 async with async_engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) # Base 模型类的元数据创建 @app.get("/") async def root(): """根路径接口""" return {"message": "Hello World"} # 需求:查询功能的接口,查询图书->依赖注入:创建依赖项获取数据库会话 + Depends 注入路由处理函数 # 创建异步会话工厂 AsyncSessionLocal = async_sessionmaker( bind = async_engine, # 绑定上面的数据库引擎 class_=AsyncSession, # 指定会话类 expire_on_commit=False # 提交后不关闭会话,不会重新查询数据库 ) # 创建依赖项:获取数据库会话 async def get_database(): async with AsyncSessionLocal() as session: try: yield session # 返回数据会话给路由处理函数 await session.commit() # 提交事务 except Exception as e: await session.rollback() # 有异常则回滚事务 raise e finally: await session.close() # 关闭会话 @app.get("/book/books") async def get_book_list(db: AsyncSession = Depends(get_database)): """ 就是这里为什么传递的依赖是get_database,而不是get_database()函数呢? ✅ 不加括号:传递函数本身,让 FastAPI 在合适的时机调用 ❌ 加括号:立即执行函数,传递的是返回值(这会导致错误) 这是 FastAPI 依赖注入的核心机制,通过传递函数引用,框架可以控制何时调用、如何管理生命周期(比如处理 yield 前后的逻辑)。 """ # 查询 result = await db.execute(select(Book)) book = result.scalars().all() return book

这里是空的,因为我们还没有往数据库表中插入数据

在PyCharm中连接数据库,然后使用图形化工具插入一条数据

然后重新进入到接口文档,刷新,再次发起请求进行查询

3、ORM数据库操作

先插入几条书籍的数据

INSERT INTO fastapi_first.book (bookname, author, price, publisher, create_time, update_time) VALUES ('活着', '余华', 39.0, '作家出版社', NOW(), NOW()), ('平凡的世界', '路遥', 78.0, '北京十月文艺出版社', NOW(), NOW()), ('百年孤独', '加西亚·马尔克斯', 56.8, '南海出版公司', NOW(), NOW()), ('解忧杂货店', '东野圭吾', 42.0, '南海出版公司', NOW(), NOW());

(1)查询

核心语句:await db.execute( select(模型类) ),返回一个 ORM 对象

获取所有数据:scalars().all()

@app.get("/book/get_books") async def get_book_list(db: AsyncSession=Depends(get_database)): result = await db.execute(select(Book)) book = result.scalars().all() return book

获取单条数据:scalars().first()get(模型类, 主键值)

@app.get("/book/get_book") async def get_book(db: AsyncSession=Depends(get_database)): # result = await db.execute(select(Book))#获取DEM对象 # book=result.scalars().first()#获取对象第一条数据 book = await db.get(Book, 1)#根据主键来获取单条数据 return book

更改这个方法get_book_list

@app.get("/book/books") async def get_book_list(db: AsyncSession = Depends(get_database)): # 查询所有图书 result = await db.execute(select(Book)) books = result.scalars().all() return books

查询全部数据、查询第一条数据

@app.get("/book/books") async def get_book_list(db: AsyncSession = Depends(get_database)): # 查询指定的图书 result = await db.execute(select(Book)) # books = result.scalars().all() # 获取所有结果 books =result.scalars().first() return books

查询指定的图书

@app.get("/book/books") async def get_book_list(db: AsyncSession = Depends(get_database)): # 查询指定的图书 # result = await db.execute(select(Book)) # books = result.scalars().all() # 获取所有结果 # books =result.scalars().first() book = await db.get(Book,4) return book
查询条件

select(Book).where(条件, 条件2, ...)

条件:

  • 比较判断:==; >; <; >=; <= 等

  • 模糊查询:like()

  • 与非查询:&; |; ~

  • 包含查询:in_()

比较判断

  • 比较判断:==; >; <; >=; <= 等

# 需求:路径参数 书籍id @app.get("/book/get_book/{book_id}") async def get_book_list(book_id,db: AsyncSession = Depends(get_database)): result = await db.execute(select(Book).where(Book.id == book_id)) book = result.scalar_one_or_none() return book # 需求:条件 价格大于等于200 @app.get("/book/get_book_price/{price}") async def get_book_price(price,db: AsyncSession = Depends(get_database)): result = await db.execute(select(Book).where(Book.price >= price)) books = result.scalars().all() return books
模糊查询

模糊查询:like()

  • %:零个、一个或多个字符

  • _:一个单个字符

# 需求:作者以 曹 开头 % _ @app.get("/book/get_book_by_author") async def get_book_list(db: AsyncSession = Depends(get_database)): result = await db.execute(select(Book).where(Book.author.like("曹%"))) book = result.scalars().all() return book @app.get("/book/get_book_by_author") async def get_book_list(db: AsyncSession = Depends(get_database)): result = await db.execute(select(Book).where(Book.author.like("曹_"))) # 这个只能查询两个字并且姓曹的人 book = result.scalars().all() return book
与非查询

与非查询:

  • &:与

  • |:或

  • ~:非

# 需求:作者以 曹 开头 % _ @app.get("/book/get_book_by_author") async def get_book_list(db: AsyncSession = Depends(get_database)): # result = await db.execute(select(Book).where(Book.author.like("曹%"))) result = await db.execute(select(Book).where(Book.author.like("曹%") & (Book.price>100))) result = await db.execute(select(Book).where(Book.author.like("曹%") | (Book.price>100))) result = await db.execute(select(Book).where(~Book.author.like("曹%"))) book = result.scalars().all() return book
包含查询
  • in_()

@app.get("/book/get_book_by_author") async def get_book_list(db: AsyncSession = Depends(get_database)): # result = await db.execute(select(Book).where(Book.author.like("曹%"))) # result = await db.execute(select(Book).where(Book.author.like("曹%") & (Book.price>100))) # result = await db.execute(select(Book).where(Book.author.like("曹%") | (Book.price>100))) # result = await db.execute(select(Book).where(~Book.author.like("曹%"))) # 需求:书籍id列表,数据库里面的id如果在 id列表里面,就返回 id_list = [1,2,3] result = await db.execute(select(Book).where(Book.id.in_(id_list))) book = result.scalars().all() return book
聚合查询

聚合计算:func.方法(模型类.属性)

  • count:统计行数量

  • avg:求平均值

  • max:求最大值

  • min:求最小值

  • sum:求和

@app.get("/book/count") async def get_count(db: AsyncSession = Depends(get_database)): # 聚合查询 select(func.方法名(模型类.属性)) # result = await db.execute(select(func.count(Book.id))) # result = await db.execute(select(func.max(Book.price))) # result = await db.execute(select(func.sum(Book.price))) result = await db.execute(select(func.avg(Book.price))) count = result.scalar() # 用来提取一个数值->标量值 return count
分页查询

分页查询:select().offset().limit()

  • offset:跳过的记录数

  • limit:返回的记录数

@app.get("/book/get_books") async def get_book_list( page: int = 1, # 需要查询哪一页 page_size: int = 3, # 每页展示多少条数据 db: AsyncSession = Depends(get_database) ): # 跳过的多少条数据 skip = (page-1) * page_size # 查询【第 page 页】的那一页数据,offset 跳过的记录数;limit 每页的记录数 stmt = select(Book).offset(skip).limit(page_size) result = await db.execute(stmt) books = result.scalars().all() return {"books": books}
查询总结

核心思路:

  • select() → db.execute() → 从 ORM 对象获取数据 → 响应结果

  • db.get(模型类, 主键值)

从 ORM 对象获取数据的方式

  • 获取所有数据 scalars().all()

  • 获取单条数据 scalars().first(): 提取第一个数据

    scalar_one_or_none(): 提取一个或 null

    scalar(): 提取标量值(配合聚合查询使用)

(2)新增

步骤:定义 ORM 对象 → 添加对象到事务:add(添加) → commit 提交到数据库

from pydantic import BaseModel # 导入基类 # 需求:用户输入图书信息(id、书名、作者、价格、出版社)->新增 # 用书输入->参数->请求体 # 定义需要新增的数据类 """ BookBase(继承BaseModel)是 Pydantic 请求体模型,用来接收、校验前端传参,相当于 Java 里的 DTO。 Book(继承DeclarativeBase)是 SQLAlchemy ORM 模型,对应数据库表,相当于 Java 里的 Entity。 新增时,先把请求体数据转成 ORM 对象,再通过 ORM 方法写入数据库。 """ class BookBase(BaseModel): id: int bookname: str author: str price: float publisher: str @app.post("/book/add_book") async def add_book(book: BookBase, db: AsyncSession = Depends(get_database)):#获取数据库连接 # ORM对象->add->commit # 获取 book 参数,创建图书对象(__dict__ 返回 book 对象的属性字典) book_obj = Book(**book.__dict__) # 先把book通过__dict__转化为字典,然后**解包,对字典进行展开 db.add(book_obj) await db.commit() return book

(3)更新

步骤:查询 get → 属性重新赋值 → commit 提交到数据库

from pydantic import BaseModel # 导入基类 from fastapi import FastAPI,Depends,HTTPException # 需求:修改图书信息:先查再改 # 设计思路:路径参数书籍id:作用是查找;请求体参数:作用是新数据(书名、作者、价格、出版社) class BookUpdate(BaseModel): # bookname: str # author: str # price: float # publisher: str #可选字段 bookname: Optional[str] = None author: Optional[str] = None price: Optional[float] = None publisher: Optional[str] = None @app.put("/book/update_book/{book_id}") # 通过主键id来找,同时也需要准备一个请求体参数:BookUpdate async def update_book(book_id: int, data: BookUpdate, db: AsyncSession = Depends(get_database)): # 1. 查询 book = await db.get(Book, book_id) # 如果未找到,抛出异常 if book is None: raise HTTPException(status_code=404, detail="Book not found") # 2. 修改属性(重新赋值) # book.bookname = data.bookname # book.author = data.author # book.price = data.price # book.publisher = data.publisher # 2. 只更新前端传了的字段(没传的不修改) update_data = data.dict(exclude_unset=True) # 🔥只保留前端真正传过来的字段,忽略没传的字段 for key, value in update_data.items(): setattr(book, key, value) # 3. 提交 await db.commit() return book

(4)删除

步骤:查询 get → delete 删除 → commit 提交到数据库

@app.delete("/book/delete_book/{book_id}") async def delete_book(book_id: int, db: AsyncSession = Depends(get_database)): # 先查再删 提交 db_book = await db.get(Book, book_id) # 如果查不到,抛出异常 if db_book is None: raise HTTPException(status_code=404, detail="Book not found") # 查到谁就删除谁 await db.delete(db_book) await db.commit() return {"message": "Book deleted"}

总结

安装sqlalchemy[asyncio]aiomysql两个包,提供异步 ORM 与 MySQL 异步驱动。

建表

create_async_engine创建异步连接引擎

② 继承DeclarativeBase编写公共基类与业务表模型

run_sync(Base.metadata.create_all)配合启动事件自动建表

操作数据

Depends注入数据库会话

② 查询用select()、新增用add()、更新直接赋值、删除用delete(),操作后 commit 提交事务

http://www.jsqmd.com/news/987638/

相关文章:

  • 后端技术栈深度解析:从入门到精通的完整指南
  • 后端技术栈实战指南:打造高性能、高可用系统
  • 2026年 除漆剂/除臭剂/絮凝剂/消泡剂厂家推荐榜:源头工艺与环保高效除味消泡实力品牌解析 - 品牌发掘
  • dubbo和oppenFeign是如何找到正确的url请求地址的
  • 抽象数据类型和数据结构的定义
  • Redis 分布式锁进阶第一百二十八篇
  • 2026年 浙江宣传册设计公司最新推荐榜单:品牌画册、企业宣传册与产品手册设计服务及创意案例精选 - 品牌发掘
  • SAP PS避坑指南:项目状态管理与字段选择配置中的5个常见误区
  • 2026 成都迪奥回收最新行情,经典款与新款二手流通价差解析 - 奢侈品回收评测
  • 2026选店指南,哈尔滨黄金回收门店参考手册 - 奢侈品回收测评
  • 济南车主改灯避坑指南|改灯别乱选门店,天眼照明专业才是硬道理 - Ayu8888
  • 2026 消费电子异形磁铁赛道 多家源头厂商技术能力多维对比 - 变量人生001
  • 别再只会用uvm_do了!手把手教你用start_item/finish_item搞定复杂transaction发送
  • S32K3安全机制实战:手把手教你用EIM模块注入ECC错误(附MCAL配置)
  • 低代码开发:关联规则算法,新手也能快速上手
  • 皮质磨损 / 五金划痕 / 污渍:福州包包回收成色分级与扣损标准 - 奢侈品回收评测
  • 新手选店攻略,对比哈尔滨各区黄金回收门店快速避坑 - 奢侈品回收测评
  • 摸底上海黄金回收渠道:2026年6月最新测评5家合规门店结果分享 - 奢侈品回收评测
  • 无锡闲置包包出手指南,2026名牌包包回收没盒子还能高价出 - 奢侈品回收评测
  • 给老盒子续命:魔百盒CM301H刷入当贝影视桌面后,我实现了哪些自由?
  • 特氟龙高温胶带评价好的品牌是哪些 - 品牌推荐大师
  • 2026年 奥迪维修/奥迪专修/奥迪保养/奥迪烧机油免拆治理/奥迪底盘异响维修/奥迪发动机维修/奥迪原厂升级改装权威推荐榜单 - 品牌发掘
  • 2026苏州外墙漏水维修市场全景分析与苏州鼎壹万防水补漏公司等三家服务商适配推荐 专业防水公司排名推荐(2026年6月防水补漏最新TOP权威排名) - 鼎壹万修缮说
  • 2026 合肥生成式引擎优化(GEO)行业权威测评报告 —— 基于第三方数据、产业底座与商业实效的中立评估 - 安徽工业
  • 2026揭阳防水补漏哪家靠谱?正规公司排名及避坑价格指南 - 苏易修缮
  • 2026 合肥生成式引擎优化(GEO)服务商权威测评报告 —— 基于第三方数据、产业底座与商业实效的中立评估 - 安徽工业
  • UVM验证进阶:深入start_item源码,解锁指定sequencer发送item的两种隐藏技巧
  • 哈尔滨黄金回收攻略,看懂黄金回收计价规则再出手 - 奢侈品回收测评
  • 魔百盒CM301H刷机后还能做什么?解锁当贝桌面后的5个高阶玩法与优化设置
  • S32K3内存错误处理全解析:从ERM报告到FCCU收集的完整链路