当前位置: 首页 > news >正文

【LangChain-AI】聊天模型--流式传输

1. stream() 同步传输

fromlangchain.chat_modelsimportinit_chat_model# 流式传输model=init_chat_model(model="deepseek-chat",model_provider="deepseek")# stream方法返回的是一个迭代器,产生的是消息块print(model.invoke("写一个关于春天的作文,1000字"))

stream方法返回的是一个迭代器,产生的是消息块,那么就可以遍历拿到所有的消息块。

fromlangchain.chat_modelsimportinit_chat_model# 流式传输model=init_chat_model(model="deepseek-chat",model_provider="deepseek")# stream方法返回的是一个迭代器,产生的是消息块# 那么就可以遍历拿到所有的消息块forchunkinmodel.stream("写一个关于春天的作文,1000字"):print(chunk.content,end="|",flush=True)# end="|":每个块结尾用|进行分割;flush=True:强制立即刷新输出缓冲区

输出结果:

|##| |春|分||||||||时分|||站在|学校|操场|中央||地理|老师说||这时|太阳||||||南北|半球|昼夜|平分|||我怎么||感受|不到|这种|平衡|——|脚下的|影子|几乎|||成一个|||仿佛|随时||消失||阳光|||||||头皮||||空气||||||||极了|冬天|残留||雪花|||抬头|望向||||||||||||出的|||||||显得|格外|||||让我|想起|爷爷|家的|春天||爷爷|住在|太行||深处||那里|没有|高大的|教学楼||只有|连绵|起伏|的山||||月的|||依然||||带着|冬天|最后的||||阳光|穿过|稀疏||树枝||在地上|留下|||的影子||去年|这时候|||||爷爷||||||看他||土豆||||||||||在地上||出一条|浅浅||||然后把|发芽||土豆|||进去|||用手|轻轻|盖上||||这样|就好了|?”|我问|||好了|。”|爷爷|||||拍拍|手上的|泥土|,“|等着|||再过||日子||它们||发芽||。”|||下来||盯着|那片|平整|的土地|||看不见|任何|变化||阳光||||色的|泥土|||反射||淡淡|的光||远处的||还是||黄的||只有||处的||||||出了||黄的|||爷爷|说的|||日子|”,|到底||多久|||现在||当我|站在|城市的|校园|||望着||高楼|切割|||块的|天空||突然|理解了|爷爷|的话|||说的|||日子|”,|不是|日历|||过去|的一|||而是|土地||种子|一点点|积蓄|力量|的过程||就像|学校的|铃声||定点|响起||准时|下课|||春天的|生长|从来不||铃声|||||||科学||上学||植物的|光合|作用||老师说|植物|通过|光合|作用||阳光|变成|能量||于是就||出了||||出了||||我觉得||事情|哪有|这么|简单||去年|春天||爷爷|||我看到|一颗|蒲公英|种子|落在|||||没有|阳光||没有|土壤||只有|一点|雨水||灰尘||但它|还是|发芽|||||出了一||小小的|黄花||那一刻||我才|明白||春天|不是|偶然|降临||礼物|||大概|就是|春天的|秘密||||不需要|大声|宣告|自己的|到来||也不需要|人们|时刻|惦记||就像|爷爷||下的|土豆|||看不见|的地方|悄悄|生长||而我们|||我们|||等待|一个||重的|开始||一个|完美的|春天|||忘了||春天|就在|每一次|呼吸|之间|||每一个|隐秘||角落里|静静|等待||||

我们调试可以看到整个chunk是一个AIMessageChunk,也就是AIMessage其中的一个块。

2. astream() 异步传输

2.1 同步(阻塞)方式

# 同步IOimporttimedefboil_water():print("开始烧水...")time.sleep(5)# 模拟烧水5s, CPU 完全空闲print("烧水完成...")defsend_message():print("开始发消息...")time.sleep(2)# 模拟烧水2sprint("发消息完成...")defmain():# 1、烧水boil_water()# 2、发消息send_message()# 共耗时7smain()

问题: 在boil_water 函数等待的5秒里,CPU 完全空闲,但却不能去做send_message 任务,效率低下。

2.2 协程

多进程通常利用的是多核 CPU 的优势,同时执行多个计算任务。每个进程有自己独立的内存管理,所以不同进程之间要进行数据通信比较麻烦。

多线程是在一个 cpu 上创建多个子任务,当某一个子任务休息的时候其他任务接着执行。多线程的控制是由 python 自己控制的。线程存在数据同步问题,所以要有锁机制。

协程的实现是在一个线程内实现的,相当于流水线作业。由于线程切换的消耗比较大,所以对于并发编程,可以优先使用协程。

协程,作为一种轻量级的并发编程模型,可以被视为用户态的“轻量级线程”。与传统线程相比,协程的核心优势在于其调度完全由用户空间掌控,避免了操作系统内核的频繁介入,从而显著降低了上下文切换的开销。

2.3 异步方式

# 异步 IOimportasyncio# 协程1asyncdefboil_water_async():print("开始烧水...")awaitasyncio.sleep(5)# 关键! await表示等待这个操作完成,但期间可以做别的事print("烧水完成...")# 协程2asyncdefsend_message_async():print("开始发消息...")awaitasyncio.sleep(2)# 模拟烧水2sprint("发消息完成...")

我们将这两个方法定义成了异步的,实际上boil_water_asyncsend_message_async就是两个协程

我们让main函数来管理这两个协程的流程:

# 异步 IOimportasyncio# 协程1asyncdefboil_water_async():print("开始烧水...")awaitasyncio.sleep(5)# 关键! await表示等待这个操作完成,但期间可以做别的事print("烧水完成...")# 协程2asyncdefsend_message_async():print("开始发消息...")awaitasyncio.sleep(2)# 模拟烧水2sprint("发消息完成...")# 协程:调度# 事件循环asyncdefmain():# 1、烧水(任务),使用asyncio.create_task将其创建成任务task1=asyncio.create_task(boil_water_async())# 2、发消息(任务)task2=asyncio.create_task(send_message_async())# 等待任务1和任务2都完成awaittask1awaittask2# 5s# run函数 会创建一个事件循环,并运行指定的协程。asyncio.run(main())

输出结果:

开始烧水...开始发消息...发消息完成...烧水完成...

什么是事件循环???

事件循环是 asyncio(Python 标准库中的模块,用于编写异步 I/O
操作的代码)的核心,你可以把它想象成一个总调度员或一个高效的待办事项 (To-Do List) 管理员。 它的工作流程非常简单:

  1. 它维护着一个任务列表(比如:煮水、发短信)。
  2. 它不断地循环检查每个任务: a. 如果任务处于“等待I/O” 状态(比如等水开、等网络响应),就暂停它,立即去执行下一个已经“就绪” 的任务。 b. 如果任务的等待时间到了或者 I/O 操作完成了,事件循环就恢复执行这个任务。

通过使用 asyncio ,我们可以在单线程中同时处理多个任务。一个在单线程内调度和管理所有协程的核心机制,就是事件循环。它不停地检查哪些协程可以执行,哪些在等待。
总结一下:

  • 协程是 asyncio 的核心概念之一。它是一个特殊的函数,可以在执行过程中暂停,并在稍后恢复执行。协程通过 async def 关键字定义,并通过 await 关键字暂停执行,等待异步操作完成。
  • 要运行一个协程,可以使用 asyncio.run() 函数。它会创建一个事件循环,并运行指定的协程。事件循环是 asyncio 的核心组件,负责调度和执行协程。它不断地检查是否有任务需要执行,并在任务完成后调用相应的回调函数。

2.4 异步流式输出

fromlangchain.chat_modelsimportinit_chat_modelimportasyncio model=init_chat_model(model="deepseek-chat",model_provider="deepseek")# 异步流式输出asyncdefasync_stream():print("======异步调用======")asyncforchunkinmodel.astream("写一段关于春天的作文,100字"):print(chunk.content,end='|',flush=True)# 创建事件循环,执行协程asyncio.run(async_stream())

输出结果:

======异步调用======|#| |春||窗外||梧桐||出了|||||||||||来的|||阳光|||||下来|||融融|||||日的|寒气|||化了||||带着||泥土||味儿||还有|花的||||||||||绿|||燕子|||||||屋檐|||||||||||||热闹||春天|就这样|悄悄地|来了|||一切都||醒了||||

3. 自定义输出格式

fromtypingimportIterator,Listfromlangchain.chat_modelsimportinit_chat_modelfromlangchain_core.output_parsersimportStrOutputParser# 组件一:聊天模型model=init_chat_model(model="deepseek-chat",model_provider="deepseek")# 组件二:定义输出解析器parser=StrOutputParser()# 自定义生成器defsplit_into_list(input:Iterator[str])->Iterator[List[str]]:buffer=""forchunkininput:buffer+=chunk# 遇到句号,刷新while"。"inbuffer:# 找到句号的位置stop_index=buffer.index("。")yield[buffer[:stop_index].strip()]buffer=buffer[stop_index+1:]# 处理buffer最后几个字yield[buffer[:].strip()]# 定义链chain=model|parser|split_into_listforchunkinchain.stream("写一个关于春天的作文,5句话,每句话用中文句号隔开"):# print(chunk.content, end="|", flush=True)# 这里使用的是输出解析器,结果就是一个str,不用再chunk.contentprint(chunk,end="|",flush=True)

yield 是 Python 中用来创建生成器的关键字。包含 yield的函数不会一次性执行完并返回一个值,而是返回一个迭代器(生成器对象)。每次调用 next()或循环迭代时,函数会从上一次暂停的地方继续执行,直到遇到下一个 yield,然后把 yield 后面的值“吐”出来,再次暂停。

输出结果:

['春天总是悄然而至,像一位羞涩的画家,用嫩绿的草芽和粉白的杏花悄悄地涂抹着大地']|['清晨的露珠挂在刚刚舒展的柳叶上,折射出细碎的光芒,仿佛每一滴都藏着一个苏醒的梦']|['风也变得温柔了,不再像冬天那样尖锐,而是一阵一阵地送来泥土和青草的香气']|['人们脱去厚重的冬衣,脚步不自觉地轻快起来,连说话的声音里都带着笑意']|['春天不像夏天那样热烈,也不像秋天那样浓艳,它只是轻轻地、坚定地告诉我们:一切都可以重新开始']|['']|

4. 流式传输原理

在流式传输中,客户端向服务端发送消息之后,服务端需要向客户端持续的推送消息,WebSocket可以支持双向连接,但是我们需要维护状态。

4.1 SSE协议

HTTP 协议本身设计为无状态的请求-响应模式,严格来说,是无法做到服务器主动推送消息到客户端,但通过Server-Sent Events (服务器发送事件,简称 SSE)技术可实现流式传输,允许服务器主动向浏览器推送数据流。
也就是说,服务器向客户端声明,接下来要发送的是流消息(streaming),这时客户端不会关闭连接,会一直等待服务器发送过来新的数据流。
SSE(Server-Sent Events)是一种基于 HTTP 的轻量级实时通信协议,浏览器可以通过内置的EventSource API 接收并处理这些实时事件。

SSE核心特点

  • 基于 HTTP 协议
    复用标准 HTTP/HTTPS 协议,无需额外端口或协议,兼容性好且易于部署。
  • 单向通信机制
    SSE 仅支持服务器向客户端的单向数据推送,客户端通过普通 HTTP 请求建立连接后,服务器可持续发送数据流,但客户端无法通过同一连接向服务器发送数据。
  • 自动重连机制
    支持断线重连,连接中断时,浏览器会自动尝试重新连接(支持 retry 字段指定重连间隔)。
  • 自定义消息类型
    客户端发起请求后,服务器保持连接开放,响应头设置Content-Type: text/eventstream,标识为事件流格式,持续推送事件流。

数据格式

服务端向浏览器发送 SSE 数据,需要设置必要的 HTTP 头信息:

# 告诉浏览器返回的数据类型是“事件流”,浏览器会据此保持连接并等待后续数据。Content-Type:text/event-stream;charset=utf-8# 要求 HTTP 连接保持打开状态,而不是每次发送完数据就关闭。这样服务器可以持续推送消息。Connection:keep-alive

SSE 的消息由若干行组成,每条消息之间用 两个换行符 \n\n 分隔。
每一行的基本格式是:

[field]:value\n

4.2 LangChain实现方式

上面的代码中,我们的model/chain调用的stream方法都是LangChain里面的组件的流式传输能力。

LangChain 本身并不“创造”或“规定”一个底层的网络传输协议,而是依赖于其底层的大模型供应商(如 OpenAI)和我们自身服务应用所使用的 Web 框架(如 FastAPI)的协议。

因此对于 LangChain 的流式传输能力,本身是因为大模型供应商提供了流式传输能力,由 LangChain进行调用后接收并处理成一个个的AIMessageChunk 。

http://www.jsqmd.com/news/965963/

相关文章:

  • YOLO11部署优化:ONNX精简 | 使用ONNX GraphSurgeon剔除冗余节点,配合算子融合,推理延迟再降20%
  • Python速通实战课:90分钟掌握文件处理与错误调试
  • MinIO文件分享与权限管理实战:mc share/policy命令生成临时链接与设置桶策略
  • PDFBox实战:批量清理上百份带斜体水印的PDF文档,我是如何用Java自动化搞定的
  • Web Speech API语音识别实战:从‘玩具Demo’到‘可用产品’的避坑指南
  • 2026年6月国内口碑好的纸箱包装袋生产厂家推荐,成都PE平口袋/油脂纸箱包装袋,纸箱包装袋直销厂家哪家靠谱 - 品牌推荐师
  • DsHidMini终极指南:如何在Windows 10/11上完美使用PS3手柄
  • DP2232H的MPSSE双引擎怎么玩?一个USB口同时调试JTAG和UART的实战配置
  • 2026万向导缆器选型全攻略:船用掣链器/单点式系泊导缆孔/卷车/导缆滚轮/托架/滚柱导缆器/系缆桩/羊角单滚轮导缆器/选择指南 - 优质品牌商家
  • RAPTOR检索框架:多粒度分层融合的工程化实践
  • 超越提示词工程:构建下一代智能 AI Agent 的技术架构与实践指南
  • AI测试入门:如何设计LLM的Prompt?这份提示词工程指南请收好
  • 程序员读《不速之客》:从间谍故事里学到的3个系统安全设计原则
  • ICC实战笔记:Chip Finishing阶段这6个坑,新手最容易踩(附详细命令与避坑指南)
  • Flowable实战:如何动态获取流程当前节点与候选人信息(附完整Java代码)
  • TensorFlow图像批量输入实战:构建健壮tf.data数据管道
  • 2026年遥控晾衣架专业品牌排行:全自动晾衣机/全自动晾衣架/升降晾衣机/升降衣架/小户型晾衣架/手摇衣架/晒衣架/选择指南 - 优质品牌商家
  • 逻辑回归:二分类决策的底层原理与工程实践
  • MM-REACT:基于ReAct框架的可验证视觉推理范式
  • e2 studio调试断点总失灵?一文搞懂Software与Hardware断点的区别与正确用法
  • 2026年武汉离婚律师推荐 丁嫣13年婚姻家事实战经验 - 本地品牌推荐
  • Python collections模块五大核心组件实战指南
  • 别再被FQDN卡住了!手把手教你搞定TDengine 2.x的远程连接(附Windows/Linux双端配置)
  • CSDN AI引流效果断崖式下跌?紧急预警:平台算法于2024年Q2完成重大升级,这4类内容已失效(附迁移清单)
  • 保姆级教程:在Win10上为STK11.6手动配置MATLAB2018b连接器(Connector 1.0.11)
  • ICPC/CCPC选手必备:2018-2022年所有赛题在线评测链接整理(附VJ/牛客/PTA直达)
  • 从一道CTF题复盘CVE-2021-3129:手把手解密Laravel漏洞流量中的Webshell与CobaltStrike密钥
  • 2026年盘扣租赁站技术维度评测与合规选型指南:方管租赁、江苏盘扣租赁、江苏钢管租赁、盘扣式脚手架租赁、脚手架钢管选择指南 - 优质品牌商家
  • 别再为多重共线性头疼了!用sklearn的RidgeCV和Lasso,5分钟搞定特征筛选与模型稳定
  • 拉夏贝尔Infor WMS实战交付包:五地仓协同、SAP双向集成、主流电商直连与即用型报表配置