当前位置: 首页 > news >正文

LangChain 自定义 Chain 手写实现

核心原理

LangChain所有链都集成Chain基类

必须实现的两个核心方法

_call:同步执行逻辑

_acall:异步执行逻辑(FastAPI项目必用)

核心属性

input_variables:定义输入参数名

output_variables:定义输出结果

手写自定义业务链

class TestCaseGenerateChain(Chain): """ 自定义专属链:业务需求 → 自动生成标准测试用例 """ # 定义输入变量 必须传入一个叫 business_require 的参数 input_variables: list[str] = ["business_require"] # 定义输出变量 output_variables: list[str] = ["test_case_result"] @property #只读属性 不加括号()就能调用 def _chain_type(self) -> str: # 链标识名称 給自定义链去了唯一标识名称 return "test_case_generate_chain" def _call( self, inputs: Dict[str, Any], run_manager: Optional[CallbackManagerForChainRun] = None ) -> Dict[str, Any]: """同步执行逻辑""" # 取出输入参数 require_content = inputs["business_require"] # 自定义业务提示词 prompt = f""" 你是资深测试工程师,根据业务需求生成标准测试用例 格式:用例标题 | 前置条件 | 操作步骤 | 预期结果 业务需求:{require_content} """.strip() # 调用大模型 response = llm.invoke(prompt) return {"test_case_result": response.content} async def _acall( self, inputs: Dict[str, Any], run_manager: Optional[CallbackManagerForChainRun] = None ) -> Dict[str, Any]: """异步执行逻辑,适配FastAPI异步接口""" require_content = inputs["business_require"] prompt = f""" 你是资深测试工程师,根据业务需求生成标准测试用例 格式:用例标题 | 前置条件 | 操作步骤 | 预期结果 业务需求:{require_content} """.strip() response = await llm.ainvoke(prompt) return {"test_case_result": response.content} #实例化自定义链 case_chain = TestCaseGeneratrChain() #同步调用 if __name__ == '__main__': res = case_chain.invoke({ "business_reqiore":"实现手机号验证码登录功能" }) print("生成测试用例:\n", res["test_case_result"])

实战 2:多字段输入复合型自定义链

class DocAnalysisChain(Chain): #文档分析链:文档内容+业务场景 - 摘要+核心要点双输出 input_variables = ["doc_content","scene"] output_variables = ["doc_summary","core_points"] @property def _chain_type(self): return "doc_analysis_chain" def _call(self, inputs:Dict,run_manager=None): content = inputs["doc_content"] scene = inputs["scene"] prompt1 = f"基于{scene}场景,精简摘要:{content}" summary = llm.invoke(prompt1).content prompt2 = f"从内容提取核心业务要点:{content}" points = llm.invoke(prompt2).content return { "doc_summary": summary, "core_points": points, } analyze_chain = DocAnalysisChain() result = analyze_chain.invoke({ "doc_content":"登录模块包含账号密码登录、异常拦截校验", "scene":"接口测试" }) print("摘要:",result["doc_summary"]) print("要点:",result["core_points"])

进阶用法: 自定义链 + 之前自定义检索器组合

#先检索知识库内容 retriever = HybridRerankRetriever(collection_name="langchain_rag_kb",all_docs=[]) docs = retriever._get_relevant_documents("登录异常场景") context_text = "\n".join([d.page_content for d in docs]) #传入自定义测试用例链 chain = TestCaseGenerateChain() final_res= chain.invoke({"business_require":context_text})

核心知识

1、所有自定义链必须集成Chain基类

2、固定声明input_variablesoutput_variables规范入参出参

3、_call同步、_acall异步双实现,后端项目必须写异步

4、抛弃固定模板,可自由编写提示词、组合业务逻辑

5、可自由搭配检索器、工具、记忆组件,灵活组装业务流程

6、自定义链可直接接入SequentialChain顺序工作流串联执行

:LangChain 回调函数进阶|流式回调 + 日志追踪 + 异常监听

1、回调核心概念

LangChain执行任何组件(LLM/Chain/Retriever/Tools)都会触发生命周期事件:

开启执行,结束执行

llm发起请求,逐快返回内容

检索开始,检索结束

任务失败抛出异常

BaseCallbackHandler就是用来监听这些事件,自定义业务行为。

内置常用的回调事件

on_chain_start 链开始执行

on_chain_end 链执行结束

on_chain_error 链执行异常

on_llm_start 调用大模型开始

on_llm_new_token 大模型逐字返回Token(流式核心)

on_llm_end 大模型调用结束

on_retriever_start 检索开始 on_retriever_end 检索结束

手写通用自定义回调类

class FullChainCallBackHandler(BaseCallbackHandler): #全功能自定义回调:日志+耗时+流式+异常 def __init__(self): self.start_time = 0 self.stream_content = "" #对外暴露流式字符缓存 self.chunk_queue = [] #链的声明周期 def on_chain_start(self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs) -> None: self.start_time = time.time() logger.info(f"【链开始执行】类型:{serialized.get('name')},入参:{list(inputs.keys())}") def on_chain_end(self,output:Dict[str,Any],** kwargs): cost = round(time.time() - self.start_time,2) logger.info(f"【链执行完成】耗时:{cost}s,输出字段:{list(outputs.keys())}") def on_chain_error(self,error:BaseException,**kwargs): logger.error(f"【链执行异常】错误信息:{str(error)}") #llm大模型回调(流式核心) def on_llm_start(self,serialized: dict[str, Any],prompts: list[str], **kwargs): logger.info("【发起大模型请求】") self.stream_content = "" self.chunk_queue.clear() def on_llm_new_token(self,token:str, **kwargs): #逐字接受流式token self.stream_content += token self.chunk_queue.append(token) #控制台实时打印流式内容 print((token, end="", flush=True) def on_llm_end(self,response:LLMResult,**kwargs): logger.info("【大模型返回完成】完整内容长度:{len(self.stream_content)}" #检索器回调 def on_retriever_start(self,query:str, **kwargs): logger.info(f"知识库开始,查询问题{query}") def on_retriever_end(self,documents:List[Document],**kwargs): logging.info(f"【检索完成】召回文档数量:{len(documents)}")

绑定回调到自定义 Chain / RAG 链

1. 普通调用绑定回调

def on_retriever_end(self,documents:List[Document],**kwargs): logging.info(f"【检索完成】召回文档数量:{len(documents)}") from langchain_core.callbacks import CallbackManager #实例化自定义回调 my_callback = FullChainCallBackHandler() #回调管理器 用来装载一大堆回调函数的管理员 cb_manager = CallbackManager([my_callback]) #实例化自定义链,注入回调 case_chain = TestCaseGenerateChain(callback_manager=cb_manager) if __name__ == "__main__": print("\n===== 开始执行自定义业务链 =====") res = case_chain.invoke({ "business_require": "用户忘记密码找回功能" }) print("\n最终结果:", res["test_case_result"])

2. 绑定到 RAG 检索问答链

llm = get_rag_llm() retriever = HybridRerankRetriever(collection_name="langchain_rag_kb",all_docs=[]) rag_cb = FullChainCallBackHandler() rag_manager = CallbackManager([rag_cb]) rag_chain = RetrievalQA.from_chain_type( llm=llm, retriever=retriever, retriever_manager=rag_manager, ) rag_chain.invoke({"query":"登录异常有哪些情况"})

FastAPI 对接回调实现 SSE 流式

把回调里的chunk_queue取出,结合生成器推送前端,抛弃旧流式写法,统一用回调管控

def sse_stream_generator(query:str): cb = FullChainCallBackHandler() cb_mgr = CallbackManager([cb]) rag_chain = RetrievalQA.from_chain_type(llm=get_rag_llm(),retriever=retriever,callbacks=cb_mgr) #异步后台执行 import threading threading.Thread(target=rag_chain.invoke, args=({"query":query},)).start() #循环去出流式片段推送 import time while True: if cb.chunk_queue: yield f"data: {char}\n\n" time.sleep(0.02) if not cb.chunk_queue and len(cb.stream_content) > 0 : break yield "data: {DONE}\n\n"

优势:

全链路统一监听,流式,日志,耗时统计一套代码搞定

六、企业级实际用途

  1. 性能监控:统计每一条问答、每一条接口 LLM 调用耗时
  2. 日志审计:记录用户提问、模型输出,方便回溯问题
  3. 异常兜底:模型超时、接口报错、检索失败统一捕获
  4. 流式统一管控:所有 AI 输出流式全部走回调,不用多处写流式逻辑
  5. Token 统计扩展:可在回调内对接接口统计输入输出 token,核算成本

今日核心总结

1、所有 LangChain 组件都支持注入CallbackManager绑定自定义回调

2、on_llm_new_token是实现流式输出最标准、企业最常用方式

3、回调可监听:链、大模型、检索器、工具全生命周期

4、同步 / 异步 Chain、RAG、Agent 全部通用这套回调体系

5、线上项目排查问题、性能优化、流式交互优先用回调实现

http://www.jsqmd.com/news/845673/

相关文章:

  • 从地图导航到网络路由:深入理解Floyd-Warshall算法的动态规划内核与空间优化技巧
  • 从防潮修复到智能升级:2026年佛山卫生间改造市场深度解析 - 优家闲谈
  • pc16550 LSTAT 位定义
  • 告别PLINK原始数据:用R包CMplot三步搞定SNP密度图(附完整代码)
  • TEdit终极指南:3步掌握开源泰拉瑞亚地图编辑器的完整教程
  • Obsidian个性化首页终极指南:3种配置方案提升知识管理效率70%
  • Vue-Codemirror 6:为什么它成为Vue3项目代码编辑器的首选方案?
  • 通过Taotoken CLI交互菜单快速完成团队开发环境统一配置
  • 终极指南:用DDrawCompat在现代Windows上完美复活经典游戏
  • 2026年乌鲁木齐搬家公司怎么选?同城搬迁、大件搬运一站式对标指南 - 企业名录优选推荐
  • 2026年智慧化实验室品牌推荐:国产IVD品牌横向对比,谁更接近医学检验“黑灯实验室”? - 博客万
  • 5个技巧彻底解决鸣潮性能卡顿:WaveTools终极优化指南
  • Perplexity招聘搜索失效?别再用Google了!工程师亲测有效的4层穿透式检索法(含Chrome插件配置清单)
  • 贝叶斯优化为何比DOE更高效?
  • 【ACM稳检索、河北美术学院主办、人文社科可投】2026年人工智能和数字人文国际学术会议(AIDH 2026) - 爱写稿的小帅哥
  • NoFences:重新定义Windows桌面管理的开源解决方案
  • Leetcode56 Merge Intervals 合并区间 -- C++实现
  • bugku——PWN——overflow2
  • 本地大模型部署终极指南:llama-cpp-python实战深度解析
  • QRazyBox:轻松修复损坏二维码的专业工具箱
  • 终极隐私保护神器:Boss-Key窗口隐藏工具的完整使用指南
  • 2026年4月评价高的活性炭箱优质厂家推荐,活性炭箱/沸石转轮/除尘器/催化燃烧,活性炭箱制造企业推荐分析 - 品牌推荐师
  • 支付系统在文旅场景的进阶之路:聚合收单、分账与自动化对账
  • 避开这些坑!STM32H743 FDCAN搭配TJA1042T的滤波器与中断配置避坑指南
  • 长沙二手房全屋定制公司实测评测:适配性与服务能力对比 - 奔跑123
  • PP/PPH储罐、PP/PPH搅拌罐
  • Illustrator智能对象替换引擎:如何将设计效率提升20倍?
  • 存量焕新与品质重塑:2026年东莞厨卫翻新市场深度洞察 - 优家闲谈
  • 从CTF靶场到实战:手把手复现UUCTF Web赛题中的PHP反序列化字符串逃逸漏洞
  • Perplexity字体调用失败?揭秘API响应延迟、字体缓存失效及跨域加载失败的5大根因