【LangChain】 Runnable 链式调用深度解析:从 `itemgetter` 到 `RunnableLambda`
LangChain Runnable 链式调用深度解析:从itemgetter到RunnableLambda
本文基于 LangChain 框架,深入解析 Runnable 链式调用中的核心机制,重点剖析
itemgetter、|管道符以及RunnableLambda的用法与设计哲学。
一、从一个典型示例说起
先看一段典型的 LangChain 链式代码:
fromoperatorimportitemgetterfromlangchain_core.runnablesimportRunnableLambdafromlangchain.promptsimportChatPromptTemplatefromlangchain_community.chat_modelsimportChatTongyifromlangchain_core.output_parsersimportStrOutputParser# 计算字符串长度的普通函数deflength_function(text):returnlen(text)# Prompt 模板prompt=ChatPromptTemplate.from_template("{a} + {b} = ? 计算结果是多少?")# 大模型model=ChatTongyi()# 输出解析器out=StrOutputParser()# 构建链chain=({"a":itemgetter("k1")|RunnableLambda(length_function),"b":itemgetter("k2")|RunnableLambda(length_function)}|prompt|model|out)# 调用result=chain.invoke({"k1":"hello","k2":"world"})# 结果:模型回答 "10"这段代码的核心是将数据提取、函数处理、Prompt 构造、大模型推理串联成一个流水线。下面逐层拆解。
二、itemgetter:从标准库走来的"提取器"
2.1 基本用法
itemgetter来自 Python 标准库operator模块,用于从可迭代对象中提取指定索引或键的值:
fromoperatorimportitemgetter# 从字典提取data={"k1":"hello","k2":"world"}getter=itemgetter("k1")print(getter(data))# 输出: hello# 从列表提取(按索引)arr=["a","b","c"]getter=itemgetter(0,2)print(getter(arr))# 输出: ('a', 'c')2.2 本质:一个可调用对象
itemgetter("k1")返回的是一个函数对象,等价于:
lambdax:x["k1"]它本身不是 LangChain 的组件,也不懂什么是 Runnable、什么是管道符|。
三、|管道符:LangChain 的链式编排语法糖
3.1 Python 运算符重载机制
Python 执行A | B时,按以下顺序尝试:
1. 调用 A.__or__(B) ← 左边对象的"或"方法 2. 如果返回 NotImplemented 调用 B.__ror__(A) ← 右边对象的"反向或"方法Python 只负责"打电话",接不接、怎么接,完全由对象自己决定。(ror 就是 right or)
何时返回 NotImplemented
一句话总结NotImplemented 是"我处理不了,让别人试试"的信号。在 A | B 中:
A.__or__(B) 返回 NotImplemented → 尝试 B.__ror__(A)
两边都返回 NotImplemented → 抛 TypeError
方法根本不存在 → 等同于返回 NotImplemented ,直接进入下一步LangChain 的 Runnable.__ror__ 之所以能自动包装左边,正是因为 Python 的这个回退机制:普通函数没有 __or__ ,所以解释器自动去尝试 RunnableLambda.__ror__() ,LangChain 在那里拦截并包装。
3.2 LangChain 的双向策略
LangChain 的Runnable基类实现了__or__和__ror__,但两边策略截然不同:
| 位置 | 处理逻辑 | 原因 |
|---|---|---|
左边(__ror__) | 如果不是 Runnable,自动包一层RunnableLambda | 入口要兼容各种原始数据格式 |
右边(__or__) | 如果不是 Runnable,直接抛TypeError | 中间环节必须类型安全 |
# 源码逻辑示意(伪代码)classRunnable:def__ror__(self,other):# other 在左边ifnotisinstance(other,Runnable):other=RunnableLambda(other)# 自动包装returnRunnableSequence(other,self)def__or__(self,other):# other 在右边ifnotisinstance(other,Runnable):raiseTypeError(f"Expected a Runnable, got{type(other)}")returnRunnableSequence(self,other)3.3 为什么是"左包右不包"?
这是LangChain 的设计选择,不是 Python 语法限制:
- 左边是"数据源":需要兼容字典、函数、常量等各种输入形式,自动包装降低使用门槛
- 右边是"处理环节":链的中间节点必须是可控的 Runnable,避免隐式转换带来的调试困难
- 显式优于隐式:右边想用普通函数?请显式声明
RunnableLambda(func)
四、RunnableLambda:让普通函数融入流水线
4.1 核心作用
RunnableLambda是 LangChain 提供的适配器,将普通 Python 函数包装成符合Runnable接口的对象:
fromlangchain_core.runnablesimportRunnableLambdadefmy_func(x):returnx.upper()# 包装前:普通函数my_func("hello")# 直接调用# 包装后:Runnable 对象runnable=RunnableLambda(my_func)runnable.invoke("hello")# 通过 Runnable 接口调用包装后,该函数就具备了 Runnable 的全部能力:
invoke()—— 同步单条执行batch()—— 批量执行stream()—— 流式输出(如果函数支持生成器)ainvoke()—— 异步执行- 可以用
|与其他 Runnable 串联
4.2 完整用法示例
示例 1:基础包装与调用
fromlangchain_core.runnablesimportRunnableLambdadefadd_one(x:int)->int:returnx+1runnable=RunnableLambda(add_one)# 各种调用方式print(runnable.invoke(5))# 6print(runnable.batch([1,2,3]))# [2, 3, 4]# 异步调用importasyncioasyncdefmain():result=awaitrunnable.ainvoke(5)print(result)# 6asyncio.run(main())示例 2:函数接收字典输入
defextract_and_count(data:dict)->int:text=data.get("text","")returnlen(text)runnable=RunnableLambda(extract_and_count)print(runnable.invoke({"text":"hello"}))# 5示例 3:在链中串联使用
fromlangchain_core.runnablesimportRunnableLambda,RunnablePassthrough chain=(RunnablePassthrough()# 透传输入|RunnableLambda(lambdax:x*2)# 乘以 2|RunnableLambda(lambdax:x+1)# 加 1|RunnableLambda(lambdax:f"结果:{x}")# 格式化)print(chain.invoke(5))# 结果: 11 (5*2+1=11)示例 4:与 itemgetter 配合使用(本文开头示例的变体)
fromoperatorimportitemgetterfromlangchain_core.runnablesimportRunnableLambda# 原始数据data={"user":{"name":"Alice","age":30}}# 构建链:提取嵌套字段 -> 格式化defformat_user(name:str)->str:returnf"用户名:{name}"# itemgetter 提取嵌套值chain=(itemgetter("user")# 提取 {"name": "Alice", "age": 30}|itemgetter("name")# 提取 "Alice"|RunnableLambda(format_user)# 格式化为 "用户名: Alice")print(chain.invoke(data))# 用户名: Alice4.3 输入输出类型标注
LangChain 支持通过类型提示自动推断输入输出模式:
fromtypingimportTypedDictclassInput(TypedDict):text:strclassOutput(TypedDict):length:intdefcount_chars(data:Input)->Output:return{"length":len(data["text"])}runnable=RunnableLambda(count_chars)# LangChain 会自动识别 Input/Output 的结构4.4 错误处理与重试
RunnableLambda 继承 Runnable 的全部能力,包括重试机制:
fromlangchain_core.runnablesimportRunnableLambdadefflaky_function(x):importrandomifrandom.random()<0.5:raiseValueError("随机失败")returnx*2# 配置重试runnable=RunnableLambda(flaky_function).with_retry(stop_after_attempt=3,wait_exponential_jitter=True)print(runnable.invoke(5))# 自动重试最多 3 次五、完整数据流回顾
以本文开头的示例为例,完整数据流如下:
输入: {"k1": "hello", "k2": "world"} ↓ ┌─────────────────────────────────────────────┐ │ 并行分支处理(字典构造) │ │ ├─ "k1" → itemgetter("k1") → "hello" │ │ │ ↓ │ │ │ RunnableLambda(length_function) │ │ │ ↓ │ │ │ 5 ────────────┐ │ │ │ │ │ │ └─ "k2" → itemgetter("k2") → "world" │ │ ↓ │ │ RunnableLambda(length_function) │ │ ↓ │ │ 5 ─────────────┘ │ │ ↓ │ │ {"a": 5, "b": 5} │ └─────────────────────────────────────────────┘ ↓ ChatPromptTemplate.from_template("{a} + {b} = ? ...") ↓ "5 + 5 = ? 计算结果是多少?" ↓ ChatTongyi() 大模型推理 ↓ StrOutputParser() 解析输出 ↓ 最终结果: "10"六、总结
| 概念 | 核心要点 |
|---|---|
itemgetter | Python 标准库函数,用于提取字典键/列表索引值 |
|管道符 | LangChain 的链式编排语法,依赖 Python 运算符重载 |
| 左包右不包 | LangChain 设计选择:__ror__自动包装左边,__or__要求右边必须是 Runnable |
RunnableLambda | 将普通 Python 函数包装为 Runnable,使其可接入流水线 |
| 设计哲学 | 入口宽松(兼容原始数据),中间严格(保证类型安全),显式优于隐式 |
RunnableLambda是连接"普通 Python 代码"与"LangChain 流水线"的桥梁,理解它的用法,是掌握 LangChain 链式编排的关键一步。
本文基于 LangChain 框架源码及实践总结,如有错误欢迎指正。
