当前位置: 首页 > news >正文

FastAPI 流式响应中,如何优雅处理客户端断连后的数据库操作?

FastAPI 流式响应中,如何优雅处理客户端断连后的数据库操作?

在使用 FastAPI 构建 AI 对话应用时,StreamingResponse 是实现打字机效果的绝佳工具。通过 yield 逐步返回内容,用户体验非常流畅。但一个棘手的问题随之而来:如果用户在 AI 回答的过程中取消对话或中断了连接,后端会发生什么?我们如何确保对话记录等重要数据依然能被可靠地保存到数据库中?

问题出现过程

1. 客户端发起流式对话请求

我们从一个典型的流式对话接口开始。我们使用依赖注入来获取一个 SQLAlchemy 的 AsyncSession,在对话开始时创建消息,在对话结束后更新 AI 的回答。

流式对话原始代码(伪代码)
from fastapi import APIRouter, Depends
from fastapi.responses import StreamingResponse
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import get_session # 依赖注入函数router = APIRouter()async def stream_chat_generator(user_input: str, conversation_id:str, session: AsyncSession):# 模拟流式生成try:# 1.创建一对消息 query answermessage_user_id = create_message(conversation_id, query, session)message_ai_id = create_message(conversation_id, "", seesion)# 2.ai对话full_response = ""for chunk in model.generate(user_input): # 假设这是你的 AI 模型yield chunkfull_response += chunkexcept Exception as e:passfinally:# 更新answer消息async save_conversation(session, full_response)print("对话已保存。")async def save_conversation(session:AsyncSession, full_response:str, message_ai_id:str):# 根据传来的session和message_ai_id 更新当前消息即可await session.commit()@router.post("/chat")
async def chat_endpoint(user_input: str, conversation_id:str, session: AsyncSession = Depends(get_session)):# get_session 单例generator = stream_chat_generator(user_input, conversation_id, session)return StreamingResponse(generator, media_type="text/event-stream")

2. 客户端取消对话(主动断开)

当用户取消发送时,会抛出这个异常

pymysql.err.InterfaceError: 

原因:当客户端断开时 ,FastAPI 会立即把它的 session连接回收掉,底层的那个物理连接被标记为 Cancelled,然后执行finally的时候,再往下传原来session连接就不对了,save_conversation函数就会抛pymysql.err.InterfaceError。


问题解决尝试

尝试一:在 save_conversation 函数中创建新连接

一个自然的想法是:既然旧的 session 不能用了,那就在保存的时候检查一下,如果不可用就创建一个新的。

代码更新
from fastapi import APIRouter, Depends
from fastapi.responses import StreamingResponse
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import get_session # 依赖注入函数router = APIRouter()async def stream_chat_generator(user_input: str, conversation_id:str, session: AsyncSession):# 模拟流式生成try:# 1.创建一对消息 query answermessage_user_id = create_message(conversation_id, query, session)message_ai_id = create_message(conversation_id, "", seesion)# 2.ai对话full_response = ""for chunk in model.generate(user_input): # 假设这是你的 AI 模型yield chunkfull_response += chunkexcept Exception as e:passfinally:# 更新answer消息async save_conversation(session, full_response)print("对话已保存。")async def save_conversation(session:AsyncSession, full_response:str, message_ai_id:str):# 根据传来的session和message_ai_id 更新当前消息即可# 1.先判断传入的连接是否可用is_pass = session.inspect(self.db).closedif is_pass:# 继续更新消息else:# 创建新连接  async with AsyncSessionLocal() as session:# 继续更新消息  这又报错了⚠await session.commit()@router.post("/chat")
async def chat_endpoint(user_input: str, conversation_id:str, session: AsyncSession = Depends(get_session)):# get_session 单例generator = stream_chat_generator(user_input, conversation_id, session)return StreamingResponse(generator, media_type="text/event-stream")

结果:失败! 没想到,即使创建了新的 session,依然抛出了 pymysql.err.InterfaceError

原因分析:之所以还会抛错误,原因是这个新会话 依然在使用已经被取消的连接池资源,因为 FastAPI/Starlette 在主请求取消时,会把整个 AsyncSessionLocal() 对象的连接都标记为 “cancelled”。即便你重新 async with AsyncSessionLocal(),底层复用的还是同一个数据库连接池里的连接,而那个连接刚被 cancel。

重新创建个数据库引擎 是肯定可以的,但是只是对话后更新,这么搞完全没必要。
或者创建个独立线程,在新线程中去创建新连接,应该是可以的,个人还是感觉比较重,浪费资源。


尝试二:创建个协程去执行save_conversation

代码更新
from fastapi import APIRouter, Depends
from fastapi.responses import StreamingResponse
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import get_session # 依赖注入函数router = APIRouter()async def stream_chat_generator(user_input: str, conversation_id:str, session: AsyncSession):# 模拟流式生成try:# 1.创建一对消息 query answermessage_user_id = create_message(conversation_id, query, session)message_ai_id = create_message(conversation_id, "", seesion)# 2.ai对话full_response = ""for chunk in model.generate(user_input): # 假设这是你的 AI 模型yield chunkfull_response += chunkexcept Exception as e:passfinally:# 更新answer消息# 创建协程执行asyncio.create_task(save_conversation(session, full_response))print("对话已保存。")async def save_conversation(session:AsyncSession, full_response:str, message_ai_id:str):# 根据传来的session和message_ai_id 更新当前消息即可# 1.先判断传入的连接是否可用is_pass = session.inspect(self.db).closedif is_pass:# 继续更新消息else:# 创建新连接  async with AsyncSessionLocal() as session:# 继续更新消息  这又报错了⚠await session.commit()@router.post("/chat")
async def chat_endpoint(user_input: str, conversation_id:str, session: AsyncSession = Depends(get_session)):# get_session 单例generator = stream_chat_generator(user_input, conversation_id, session)return StreamingResponse(generator, media_type="text/event-stream")

疑惑点:asyncio.create_task 启动的协程仍然跑在同一个线程和进程里,也会复用那个全局的连接池,理论上确实还有可能拿到刚才那个被 cancel 的连接啊?

主要在于操作的时序和上下文隔离

  1. 先清理,后执行

    当原始请求被取消后,FastAPI 会立即开始清理与该请求相关的资源(包括回收它持有的数据库连接)。这个清理动作在 finally 块中调用 create_task 之前就已经触发了。我们派生出的后台任务是在这个清理逻辑之后才启动的。

  2. 上下文隔离

    这个后台协程已经完全不挂在 HTTP 请求的上下文上了。客户端断开与否,都影响不了它的独立运行。只要连接池中还有任意一个好连接,它就能完成写入。

  3. 高成功率

    因为顺序已经变成了:先断开、先清理 → 再新建、再执行,所以新任务向连接池请求时,拿到那个“坏掉”连接的概率已经大大降低。连接池会优先分配一个健康的、空闲的连接。

即使在极端情况下又拿到了旧连接,它也很有可能在 session.begin() 阶段就失败,我们还可以在后台任务的 try...except 块里加入重试逻辑(比如 await asyncio.sleep(0.1) 后重试),进一步提高健壮性。

大量测试后,发现真没问题😁。

结论

对于需要确保最终操作(如数据写入)一定执行的流式 API,asyncio.create_task 提供了一种轻量级且非常有效的解决方案。它避免了引入像 Celery 这样复杂的任务队列,同时优雅地解决了因客户端断连导致资源状态污染的问题。

通过将关键的收尾工作与不稳定的 HTTP 请求生命周期解耦,我们能构建出更加健壮和可靠的 FastAPI 应用。"

http://www.jsqmd.com/news/50722/

相关文章:

  • 上海有哪些AI企业值得投资?行业潜力机构盘点
  • 2025年AI最具投资价值的企业排行及发展洞察
  • 2025 长效阻垢马桶权威榜单:95% 阻垢率才达标,告别管路发黄烦恼
  • 2025 年成都蜂窝铝扣板生产厂家口碑推荐榜出炉
  • 2025年行业内四川噪声治理厂家口碑最好的厂家榜
  • 2025 年最新推荐冲击试验机优质厂家排行榜:摆锤 / 落锤 / 低温型设备精选,助力企业精准采购优质供应商低温冲击试验机/冲击试验机低温槽/冲击试验机缺口拉床公司推荐
  • 北京知名婚姻律所推荐:专注家事纠纷解决的专业选择
  • 2025 年试验机厂家最新推荐排行榜:覆盖多品类检测设备,揭晓国际认证齐全的优质品牌拉力试验机/万能试验机/弯曲试验机/扭转试验机/压力试验机/杯突试验机/高低温拉伸试验机公司推荐
  • 争取孩子抚养权找哪个律师靠谱?婚姻纠纷律师选择参考
  • 2025 最新硅芯管源头厂家推荐排行榜:权威甄选高密度聚乙烯 / 通信 / 光缆用优质管材供应企业通信用硅芯管/光缆保护用硅芯管/高强度硅芯管/内壁润滑硅芯管公司推荐
  • 2025年11月山东石材雕刻机/墓碑雕刻机/绳锯机综合测评TOP10
  • 2025 卫浴健康革命!全链路杀菌马桶榜单,99% 家庭都需要
  • 2025年质量好的西安净化板实力厂家推荐排行榜
  • 2025年评价高的西安净化板实力厂家最新用户好评榜
  • 2025年11月山东石材雕刻机/墓碑雕刻机/绳锯机综合选购指南与十大推荐:山东永福泰登顶
  • 时间序列信息预测:14种机器学习与深度学习模型
  • 2025年口碑好的成都制造业短视频运营公司最新权威实力榜
  • 2025年盐雾试验箱厂家口碑评分排行榜,淋雨试验箱/恒温恒湿试验箱/恒温恒湿房/光伏组件湿演式验箱/高低温试验箱盐雾试验箱厂商推荐排行
  • 2025 最新推荐!金刚石量子传感器厂家权威榜单:聚焦技术创新、产业应用与国际测评领先品牌金刚石量子磁力仪/金刚石量子探针扫描仪/金刚石量子显微镜/金刚石量子温度探针公司推荐
  • 2025国内私有云服务厂商排名 | China’s Private Cloud Providers Ranking 2025
  • rman备份顺序详解
  • 2025年办公场地租赁优质推荐:十大口碑场地,办公场地/园区/企业独栋办公场地出租口碑推荐
  • 中国私有云格局2025:Top 5 Private Cloud Providers Hybrid Cloud Trends
  • 【中山大学主办,IEEE出版】第五届通信技术与信息科技国际学术会议(ICCTIT 2025)
  • 创建随机数组
  • luogu P2015 二叉苹果树
  • 创业企业如何选云?AWS、Azure、Google Cloud差异全解析(IDC Gartner洞察)
  • 2025海外云服务器推荐报告:Why AWS Dominates the Global Cloud Market
  • 创建同值数组
  • 国标GB28181算法算力平台EasyGBS助力构建食品安全监督管理系统全流程可视化监管方案