FastAPI学习笔记:二、ORM
ORM(Object-RelationalMapping,对象关系映射)是一种编程技术,用于在面向对象编程语言和关系型数据库之间建立映射。它允许开发者通过操作对象的方式与数据库进行交互,而无需直接编写复杂的SQL语句。
优势:
减少重复的 SQL 代码
代码更简洁易读
自动处理数据库连接和事务
自动防止 SQL 注入攻击
ORM 分类表
| 排名 | ORM 工具 | 特点 | 适应场景 |
|---|---|---|---|
| 1 | SQLAlchemy ORM | 功能最强、最灵活、企业级 | 各类 API、微服务、数据应用 |
| 2 | Django ORM | 封装好、上手快 | Django 项目、管理后台 |
| 3 | Tortoise ORM | 全异步 | 异步 Web 服务、高并发 API |
1、ORM 使用流程
安装依赖
打开终端输入命令:
pip install sqlalchemy[asyncio] aiomysqlsqlalchemy[asyncio]:SQLAlchemy 的异步支持版本,适配 FastAPI 异步场景aiomysql:MySQL 的异步数据库驱动,和异步 ORM 配套使用
1. 创建异步引擎、建库、建表
创建异步引擎
from sqlalchemy.ext.asyncio import create_async_engine ASYNC_DATABASE_URL = "mysql+aiomysql://root:123456@localhost:3306/fastAPI_first?charset=utf8" # 创建异步引擎 async_engine = create_async_engine( ASYNC_DATABASE_URL, echo=True, # 可选:输出SQL日志 pool_size=10, # 设置连接池中保持的持久连接数 max_overflow=20 # 设置连接池允许创建的额外连接数 )(1)create_async_engine是什么?
它是 SQLAlchemy 提供的异步数据库引擎创建方法,专门用于 FastAPI 这类异步 Web 项目,配合aiomysql驱动实现异步数据库操作。
(2)数据库 URL 解析
mysql+aiomysql://root:123456@localhostmysql+aiomysql:指定使用 MySQL 数据库 + 异步aiomysql驱动root:123456:数据库账号密码localhost:3306:数据库地址和端口fastAPI_first:目标数据库名charset=utf8:指定字符集
(3)参数
| 参数 | 作用 | 场景 |
|---|---|---|
echo=True | 控制台打印所有执行的 SQL 语句 | 开发调试用,生产环境建议关闭 |
pool_size=10 | 连接池保持的持久连接数 | 高并发场景建议根据业务量调大 |
max_overflow=20 | 连接池允许的临时额外连接数 | 峰值流量时可临时扩容,用完自动回收 |
2. 建库建表
首先自己在数据库手动创建一个名为fastAPI_first的库
接着:
from datetime import datetime from fastapi import FastAPI from sqlalchemy import func, Float, DateTime, String from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column from sqlalchemy.ext.asyncio import create_async_engine ASYNC_DATABASE_URL = "mysql+aiomysql://root:1234@localhost:3306/fastAPI_first?charset=utf8" #记得更换为自己的数据库账号密码 app= FastAPI() # 1、 创建异步引擎 async_engine = create_async_engine( ASYNC_DATABASE_URL, echo=True, # 可选:输出SQL日志 pool_size=10, # 设置连接池中保持的持久连接数 ) # ---------------------------------------------------------------------- # 2. 定义模型类: 基类 + 表对应的模型类 # 基类:创建时间、更新时间;书籍表:id、书名、作者、价格、出版社 class Base(DeclarativeBase): create_time: Mapped[datetime] = mapped_column(DateTime, insert_default=func.now(), default=func.now, comment="创建时间") update_time: Mapped[datetime] = mapped_column(DateTime, insert_default=func.now(), default=func.now, onupdate=func.now()) # 定义公共基类 Base # 定义了所有表的公共字段:create_time(创建时间)、update_time(更新时间) # 配置了时间自动管理: # insert_default=func.now():插入数据时自动填充当前时间 # onupdate=func.now():数据更新时自动刷新时间 # 其他表模型继承 Base 后,会自动带上这两个字段,避免重复代码 class Book(Base): __tablename__ = "book" id: Mapped[int] = mapped_column(primary_key=True, comment="书籍id") bookname: Mapped[str] = mapped_column(String(255), comment="书名") author: Mapped[str] = mapped_column(String(255), comment="作者") price: Mapped[float] = mapped_column(Float, comment="价格") publisher: Mapped[str] = mapped_column(String(255), comment="出版社") # __tablename__ = "book":声明该类对应数据库中的 book 表 # 定义了书籍表的业务字段:id(主键)、书名、作者、价格、出版社 # 继承了 Base 的 create_time 和 update_time 字段 # 用 comment 给每个字段添加了数据库注释,方便维护 # ---------------------------------------------------------------------- # 3. 建表:定义函数建表 → FastAPI 启动的时候调用建表的函数 async def create_tables(): # 获取异步引擎,创建事务 - 建表 async with async_engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) # Base 模型类的元数据创建 # async with async_engine.begin():开启一个数据库事务,保证建表操作的原子性 # conn.run_sync(Base.metadata.create_all): # Base.metadata 是所有 ORM 模型(比如写的 Book 表)的元数据集合 # create_all 会根据这些元数据,自动生成并执行 CREATE TABLE SQL 语句,在数据库中创建所有定义好的表 # run_sync 是异步 SQLAlchemy 的写法,用来在异步环境中执行同步的建表操作 @app.on_event("startup") async def startup_event(): await create_tables() # @app.on_event("startup") 是 FastAPI 的启动事件钩子 # 作用:在服务启动时自动调用 create_tables() 建表 # 效果:你一运行 FastAPI 服务,它就会自动检查并创建所有模型对应的表,不用手动执行 SQL @app.get("/") async def root(): return {"message": "Hello World"}run_sync(Base.metadata.create_all)是关键:它会把 ORM 模型同步转换为数据库表结构,自动执行建表语句。接着启动FastAPI程序
然后登录数据库,查看表是否成功创建。
注意:因为这里的on_event 在 FastAPI 的新版本中已经被弃用了。应该使用 lifespan 事件处理器来替代。
3. 操作数据(CRUD)
ORM 的核心能力就是用 Python 代码替代原生 SQL,完成数据操作:
查询:用
select()语法筛选数据,无需写SELECT新增:创建模型对象,添加到会话并提交
修改:查询到对象后修改属性,提交会话
删除:查询到对象后从会话删除并提交
2、路由匹配中使用 ORM
我们有一个搜索的功能,用户点击搜索的时候,完成的是数据库查询的工作。
那么当我们点击注册的功能时,需要在数据库表中新出入一条数据。
那么在一个项目当时都是通过一个接口来实现数据库的增删改查的。接下来就用到了路由匹配中使用ORM了
核心:创建依赖项,使用 Depends 注入到处理函数
创建方式如下:
# 创建异步会话工厂 AsyncSessionLocal = async_sessionmaker( bind=async_engine, # 绑定数据库引擎 class_=AsyncSession, # 指定会话类 expire_on_commit=False # 会话对象不过期,不重新查询数据库 ) # 依赖项,用于获取数据库会话 async def get_database(): async with AsyncSessionLocal() as session: try: yield session # 返回数据库会话给路由处理函数 await session.commit() # 无异常,提交事务 except Exception: await session.rollback() # 有异常则回滚 raise finally: await session.close() # 关闭会话 @app.get("/book/books") async def get_book_list( db: AsyncSession = Depends(get_database)#依赖注入数据库对象 ): # 查询所有书籍 result = await db.execute(select(Book)) # Book 模型类 user = result.scalars().all()#查询所有 return user直接查询book表中的数据
from datetime import datetime from contextlib import asynccontextmanager from fastapi import FastAPI, Depends from sqlalchemy import DateTime, func, String, Float, select from sqlalchemy.ext.asyncio import create_async_engine,async_sessionmaker,AsyncSession from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column # 创建 FastAPI 应用实例,使用 lifespan 管理应用生命周期 @asynccontextmanager async def lifespan(app: FastAPI): """ 应用生命周期管理器 - yield 之前:应用启动时执行(如:数据库初始化、连接池创建等) - yield 之后:应用关闭时执行(如:关闭数据库连接、清理资源等) """ # 启动时创建数据库表 await create_tables() yield # 如果有关闭时需要执行的清理操作,可以在这里添加 app = FastAPI(lifespan=lifespan) # 1、创建异步引擎 ASYNC_DATABASE_URL = "mysql+aiomysql://root:123456@localhost:3306/fastapi_first?charset=utf8" async_engine = create_async_engine( ASYNC_DATABASE_URL, echo=True, # 可选,输出SQL日志 pool_size=10, # 设置连接池活跃的连接数 max_overflow=20 # 允许额外的连接数 ) # 2、定义模型类: 基类 + 表对应的模型类 # 基类:创建时间、更新时间;书籍表:id、书名、作者、价格、出版社 class Base(DeclarativeBase): # 创建时间字段:插入时自动设置为当前时间,默认值为当前时间 create_time: Mapped[datetime] = mapped_column(DateTime, insert_default=func.now(), default=func.now, comment="创建时间") # 更新时间字段:插入和更新时自动设置为当前时间 update_time: Mapped[datetime] = mapped_column(DateTime, insert_default=func.now(), default=func.now, onupdate=func.now(), comment="更新时间") class Book(Base): """书籍表模型类""" __tablename__ = "book" # 数据库表名 id: Mapped[int] = mapped_column(primary_key=True, comment="书籍id") # 主键 bookname: Mapped[str] = mapped_column(String(255), comment="书名") # 书名 author: Mapped[str] = mapped_column(String(255), comment="作者") # 作者 price: Mapped[float] = mapped_column(Float, comment="价格") # 价格 publisher: Mapped[str] = mapped_column(String(255), comment="出版社") # 出版社 # 3、建表:定义函数建表->FastAPI启动时调用建表的函数 async def create_tables(): """ 创建数据库表 使用异步引擎创建所有继承自 Base 的模型类对应的表 """ # 获取异步引擎,创建事务-建表 async with async_engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) # Base 模型类的元数据创建 @app.get("/") async def root(): """根路径接口""" return {"message": "Hello World"} # 需求:查询功能的接口,查询图书->依赖注入:创建依赖项获取数据库会话 + Depends 注入路由处理函数 # 创建异步会话工厂 AsyncSessionLocal = async_sessionmaker( bind = async_engine, # 绑定上面的数据库引擎 class_=AsyncSession, # 指定会话类 expire_on_commit=False # 提交后不关闭会话,不会重新查询数据库 ) # 创建依赖项:获取数据库会话 async def get_database(): async with AsyncSessionLocal() as session: try: yield session # 返回数据会话给路由处理函数 await session.commit() # 提交事务 except Exception as e: await session.rollback() # 有异常则回滚事务 raise e finally: await session.close() # 关闭会话 @app.get("/book/books") async def get_book_list(db: AsyncSession = Depends(get_database)): """ 就是这里为什么传递的依赖是get_database,而不是get_database()函数呢? ✅ 不加括号:传递函数本身,让 FastAPI 在合适的时机调用 ❌ 加括号:立即执行函数,传递的是返回值(这会导致错误) 这是 FastAPI 依赖注入的核心机制,通过传递函数引用,框架可以控制何时调用、如何管理生命周期(比如处理 yield 前后的逻辑)。 """ # 查询 result = await db.execute(select(Book)) book = result.scalars().all() return book这里是空的,因为我们还没有往数据库表中插入数据
在PyCharm中连接数据库,然后使用图形化工具插入一条数据
然后重新进入到接口文档,刷新,再次发起请求进行查询
3、ORM数据库操作
先插入几条书籍的数据
INSERT INTO fastapi_first.book (bookname, author, price, publisher, create_time, update_time) VALUES ('活着', '余华', 39.0, '作家出版社', NOW(), NOW()), ('平凡的世界', '路遥', 78.0, '北京十月文艺出版社', NOW(), NOW()), ('百年孤独', '加西亚·马尔克斯', 56.8, '南海出版公司', NOW(), NOW()), ('解忧杂货店', '东野圭吾', 42.0, '南海出版公司', NOW(), NOW());(1)查询
核心语句:await db.execute( select(模型类) ),返回一个 ORM 对象
获取所有数据:scalars().all()
@app.get("/book/get_books") async def get_book_list(db: AsyncSession=Depends(get_database)): result = await db.execute(select(Book)) book = result.scalars().all() return book获取单条数据:scalars().first();get(模型类, 主键值)
@app.get("/book/get_book") async def get_book(db: AsyncSession=Depends(get_database)): # result = await db.execute(select(Book))#获取DEM对象 # book=result.scalars().first()#获取对象第一条数据 book = await db.get(Book, 1)#根据主键来获取单条数据 return book更改这个方法get_book_list
@app.get("/book/books") async def get_book_list(db: AsyncSession = Depends(get_database)): # 查询所有图书 result = await db.execute(select(Book)) books = result.scalars().all() return books查询全部数据、查询第一条数据
@app.get("/book/books") async def get_book_list(db: AsyncSession = Depends(get_database)): # 查询指定的图书 result = await db.execute(select(Book)) # books = result.scalars().all() # 获取所有结果 books =result.scalars().first() return books查询指定的图书
@app.get("/book/books") async def get_book_list(db: AsyncSession = Depends(get_database)): # 查询指定的图书 # result = await db.execute(select(Book)) # books = result.scalars().all() # 获取所有结果 # books =result.scalars().first() book = await db.get(Book,4) return book查询条件
select(Book).where(条件, 条件2, ...)
条件:
比较判断:==; >; <; >=; <= 等
模糊查询:like()
与非查询:&; |; ~
包含查询:in_()
比较判断
比较判断:==; >; <; >=; <= 等
# 需求:路径参数 书籍id @app.get("/book/get_book/{book_id}") async def get_book_list(book_id,db: AsyncSession = Depends(get_database)): result = await db.execute(select(Book).where(Book.id == book_id)) book = result.scalar_one_or_none() return book # 需求:条件 价格大于等于200 @app.get("/book/get_book_price/{price}") async def get_book_price(price,db: AsyncSession = Depends(get_database)): result = await db.execute(select(Book).where(Book.price >= price)) books = result.scalars().all() return books模糊查询
模糊查询:like()
%:零个、一个或多个字符
_:一个单个字符
# 需求:作者以 曹 开头 % _ @app.get("/book/get_book_by_author") async def get_book_list(db: AsyncSession = Depends(get_database)): result = await db.execute(select(Book).where(Book.author.like("曹%"))) book = result.scalars().all() return book @app.get("/book/get_book_by_author") async def get_book_list(db: AsyncSession = Depends(get_database)): result = await db.execute(select(Book).where(Book.author.like("曹_"))) # 这个只能查询两个字并且姓曹的人 book = result.scalars().all() return book与非查询
与非查询:
&:与
|:或
~:非
# 需求:作者以 曹 开头 % _ @app.get("/book/get_book_by_author") async def get_book_list(db: AsyncSession = Depends(get_database)): # result = await db.execute(select(Book).where(Book.author.like("曹%"))) result = await db.execute(select(Book).where(Book.author.like("曹%") & (Book.price>100))) result = await db.execute(select(Book).where(Book.author.like("曹%") | (Book.price>100))) result = await db.execute(select(Book).where(~Book.author.like("曹%"))) book = result.scalars().all() return book包含查询
in_()
@app.get("/book/get_book_by_author") async def get_book_list(db: AsyncSession = Depends(get_database)): # result = await db.execute(select(Book).where(Book.author.like("曹%"))) # result = await db.execute(select(Book).where(Book.author.like("曹%") & (Book.price>100))) # result = await db.execute(select(Book).where(Book.author.like("曹%") | (Book.price>100))) # result = await db.execute(select(Book).where(~Book.author.like("曹%"))) # 需求:书籍id列表,数据库里面的id如果在 id列表里面,就返回 id_list = [1,2,3] result = await db.execute(select(Book).where(Book.id.in_(id_list))) book = result.scalars().all() return book聚合查询
聚合计算:func.方法(模型类.属性)
count:统计行数量
avg:求平均值
max:求最大值
min:求最小值
sum:求和
@app.get("/book/count") async def get_count(db: AsyncSession = Depends(get_database)): # 聚合查询 select(func.方法名(模型类.属性)) # result = await db.execute(select(func.count(Book.id))) # result = await db.execute(select(func.max(Book.price))) # result = await db.execute(select(func.sum(Book.price))) result = await db.execute(select(func.avg(Book.price))) count = result.scalar() # 用来提取一个数值->标量值 return count分页查询
分页查询:select().offset().limit()
offset:跳过的记录数
limit:返回的记录数
@app.get("/book/get_books") async def get_book_list( page: int = 1, # 需要查询哪一页 page_size: int = 3, # 每页展示多少条数据 db: AsyncSession = Depends(get_database) ): # 跳过的多少条数据 skip = (page-1) * page_size # 查询【第 page 页】的那一页数据,offset 跳过的记录数;limit 每页的记录数 stmt = select(Book).offset(skip).limit(page_size) result = await db.execute(stmt) books = result.scalars().all() return {"books": books}查询总结
核心思路:
select() → db.execute() → 从 ORM 对象获取数据 → 响应结果
db.get(模型类, 主键值)
从 ORM 对象获取数据的方式
获取所有数据 scalars().all()
获取单条数据 scalars().first(): 提取第一个数据
scalar_one_or_none(): 提取一个或 null
scalar(): 提取标量值(配合聚合查询使用)
(2)新增
步骤:定义 ORM 对象 → 添加对象到事务:add(添加) → commit 提交到数据库
from pydantic import BaseModel # 导入基类 # 需求:用户输入图书信息(id、书名、作者、价格、出版社)->新增 # 用书输入->参数->请求体 # 定义需要新增的数据类 """ BookBase(继承BaseModel)是 Pydantic 请求体模型,用来接收、校验前端传参,相当于 Java 里的 DTO。 Book(继承DeclarativeBase)是 SQLAlchemy ORM 模型,对应数据库表,相当于 Java 里的 Entity。 新增时,先把请求体数据转成 ORM 对象,再通过 ORM 方法写入数据库。 """ class BookBase(BaseModel): id: int bookname: str author: str price: float publisher: str @app.post("/book/add_book") async def add_book(book: BookBase, db: AsyncSession = Depends(get_database)):#获取数据库连接 # ORM对象->add->commit # 获取 book 参数,创建图书对象(__dict__ 返回 book 对象的属性字典) book_obj = Book(**book.__dict__) # 先把book通过__dict__转化为字典,然后**解包,对字典进行展开 db.add(book_obj) await db.commit() return book(3)更新
步骤:查询 get → 属性重新赋值 → commit 提交到数据库
from pydantic import BaseModel # 导入基类 from fastapi import FastAPI,Depends,HTTPException # 需求:修改图书信息:先查再改 # 设计思路:路径参数书籍id:作用是查找;请求体参数:作用是新数据(书名、作者、价格、出版社) class BookUpdate(BaseModel): # bookname: str # author: str # price: float # publisher: str #可选字段 bookname: Optional[str] = None author: Optional[str] = None price: Optional[float] = None publisher: Optional[str] = None @app.put("/book/update_book/{book_id}") # 通过主键id来找,同时也需要准备一个请求体参数:BookUpdate async def update_book(book_id: int, data: BookUpdate, db: AsyncSession = Depends(get_database)): # 1. 查询 book = await db.get(Book, book_id) # 如果未找到,抛出异常 if book is None: raise HTTPException(status_code=404, detail="Book not found") # 2. 修改属性(重新赋值) # book.bookname = data.bookname # book.author = data.author # book.price = data.price # book.publisher = data.publisher # 2. 只更新前端传了的字段(没传的不修改) update_data = data.dict(exclude_unset=True) # 🔥只保留前端真正传过来的字段,忽略没传的字段 for key, value in update_data.items(): setattr(book, key, value) # 3. 提交 await db.commit() return book(4)删除
步骤:查询 get → delete 删除 → commit 提交到数据库
@app.delete("/book/delete_book/{book_id}") async def delete_book(book_id: int, db: AsyncSession = Depends(get_database)): # 先查再删 提交 db_book = await db.get(Book, book_id) # 如果查不到,抛出异常 if db_book is None: raise HTTPException(status_code=404, detail="Book not found") # 查到谁就删除谁 await db.delete(db_book) await db.commit() return {"message": "Book deleted"}总结
安装sqlalchemy[asyncio]、aiomysql两个包,提供异步 ORM 与 MySQL 异步驱动。
建表
①create_async_engine创建异步连接引擎
② 继承DeclarativeBase编写公共基类与业务表模型
③run_sync(Base.metadata.create_all)配合启动事件自动建表
操作数据
①Depends注入数据库会话
② 查询用select()、新增用add()、更新直接赋值、删除用delete(),操作后 commit 提交事务
