Spring AI Alibaba 1.x 系列【55】Interrupts 中断机制:静态中断源码分析
文章目录
- 1. interruptBefore 模式
- 1.1 中断判断逻辑
- 1.2 构建中断元数据
- 1.3 返回中断响应
- 1.4 初始化【中断执行】上下文
- 1.5 合并状态(BUG)
- 1.6 执行结束
- 2. interruptsAfter 模式
- 2.1 设置 INTERRUPT_AFTER 标记
- 2.2 动态计算下一个节点
- 3. 中断时机对比
1. interruptBefore 模式
核心特性:
- 执行前中断:节点业务逻辑执行前触发
- 静态配置:节点
ID配置在interruptsBefore列表 - 无动态路由:恢复后直接执行原节点
1.1 中断判断逻辑
MainGraphExecutor#execute关于中断处理的核心逻辑:
- 中断判断:调用上下文方法,校验是否满足流程中断条件
- 元数据构建:记录当前节点
ID+ 克隆的状态数据(防止原状态被篡改) - 响应返回:返回流程中断完成的响应,携带中断信息
// 3. 判断是否需要中断 → 触发中断,返回中断元数据if(context.shouldInterrupt()){InterruptionMetadatametadata=InterruptionMetadata.builder(context.getCurrentNodeId(),context.cloneState(context.getCurrentStateData())).build();returnFlux.just(GraphResponse.done(metadata));}GraphRunnerContext#shouldInterrupt()是中断判断的入口,只要执行前中断或执行后中断满足一个,就会返回true:
/** * 判断当前流程是否需要中断 * @return true-需要中断,false-不需要中断 */publicbooleanshouldInterrupt(){// 满足【节点执行前中断】 或 【节点执行后中断】 任一条件,即触发中断returnshouldInterruptBefore(nextNodeId,currentNodeId)||shouldInterruptAfter(currentNodeId,nextNodeId);}执行前中断shouldInterruptBefore()判断:
/** * 判断是否需要在【目标节点执行前】进行中断 * @param nodeId 即将执行的下一个节点ID * @param previousNodeId 当前执行完成的节点ID * @return true-执行前中断,false-不中断 */privatebooleanshouldInterruptBefore(StringnodeId,StringpreviousNodeId){// 无上一个节点(流程初始状态),不执行前置中断if(previousNodeId==null)returnfalse;// 判断节点ID是否配置在【执行前中断点】集合中returncompiledGraph.compileConfig.interruptsBefore().contains(nodeId);}执行后中断shouldInterruptAfter()判断:
/** * 判断是否需要在【当前节点执行后】进行中断 * @param nodeId 即将执行的下一个节点ID * @param previousNodeId 当前执行完成的节点ID * @return true-执行后中断,false-不中断 */privatebooleanshouldInterruptAfter(StringnodeId,StringpreviousNodeId){// 无下一个节点 或 下一个节点与当前节点是同一个,不执行后置中断if(nodeId==null||Objects.equals(nodeId,previousNodeId))returnfalse;// 满足任一条件则后置中断:// 1. 开启了条件边执行前中断 + 节点标记为固定后置中断节点// 2. 节点ID配置在【执行后中断点】集合中return(compiledGraph.compileConfig.interruptBeforeEdge()&&Objects.equals(nodeId,INTERRUPT_AFTER))||compiledGraph.compileConfig.interruptsAfter().contains(nodeId);}1.2 构建中断元数据
需要中断时,会构建InterruptionMetadata:
// 创建中断元数据,包含当前节点ID和状态InterruptionMetadatametadata=InterruptionMetadata.builder(context.getCurrentNodeId(),context.cloneState(context.getCurrentStateData())).build();构建参数:
currentNodeId:当前执行中断的节点名称currentStateData:调用overallState.data()获取的状态data数据
build()构建逻辑:
// 1. 私有构造方法,只能通过内部 Builder 类创建对象privateInterruptionMetadata(Builderbuilder){// 2. 调用父类构造方法,传入 builder 中的 nodeId 和 statesuper(builder.nodeId,builder.state);// 3. 给当前类的 metadata 字段赋值(直接引用 builder 中的值)this.metadata=builder.metadata();// 4. 给 toolFeedbacks 赋值:创建新 ArrayList,拷贝 builder 中的集合this.toolFeedbacks=newArrayList<>(builder.toolFeedbacks);// 5. 安全赋值 toolsAutomaticallyApproved:空值防护if(builder.toolsAutomaticallyApproved!=null){this.toolsAutomaticallyApproved=builder.toolsAutomaticallyApproved;}else{// 6. 如果 builder 中为 null,赋值为空集合,避免后续 NPEthis.toolsAutomaticallyApproved=newArrayList<>();}}构建完成后的对象:
1.3 返回中断响应
MainGraphExecutor#execute将中断数据包装为GraphResponse返回:
returnFlux.just(GraphResponse.done(metadata));调用方需要判断输出类型为InterruptionMetadata时,说明流程中断了,需要向用户显示可处理的操作,用户执行操作后,进入到流程恢复阶段。
1.4 初始化【中断执行】上下文
用户操作后进入到流程恢复阶段,首先进入到GraphRunner#run方法初始化【执行】上下文,再调用执行器执行,和上篇动态中断一致。
1.5 合并状态(BUG)
和上篇动态中断一致。
1.6 执行结束
合并状态之后,说明这个暂停节点已经被正式恢复了,按照正常流程继续执行直到结束。
2. interruptsAfter 模式
核心特性:
- 执行后中断:节点业务逻辑执行完成后触发
- 动态路由:开启
interruptBeforeEdge,恢复时重新计算下一个节点
interruptsAfter和interruptBefore的处理流程一致,主要区别是interruptsAfter支持配置interruptBeforeEdge参数,支持在恢复时动态计算下一个节点,所以下面只介绍下不一样的地方。
2.1 设置 INTERRUPT_AFTER 标记
在NodeExecutor节点执行完成处理时,当前节点是否配置了【执行后中断】,并开启了interruptBeforeEdge配置, 会将下一个节点设置为固定的INTERRUPT_AFTER:
// 判断是否开启【边中断】机制 + 当前节点是否配置了【执行后中断】if(context.getCompiledGraph().compileConfig.interruptBeforeEdge()&&context.getCompiledGraph().compileConfig.interruptsAfter().contains(context.getCurrentNodeId())){// ==============================================// 场景:节点执行后,走【中断】,不自动走向下一个节点// 下一个节点设置为固定的 INTERRUPT_AFTER// ==============================================context.setNextNodeId(INTERRUPT_AFTER);}else{// ==============================================// 正常场景:根据当前状态 + 节点路由规则,计算下一个真实节点// ==============================================CommandnextCommand=context.nextNodeId(context.getCurrentNodeId(),context.getCurrentStateData());context.setNextNodeId(nextCommand.gotoNode());}2.2 动态计算下一个节点
创建好上下文后,调用MainGraphExecutor执行中,进入到处理中断恢复处理逻辑,如果配置了interruptBeforeEdge = true且下一个节点是INTERRUPT_AFTER,说明要重新计算下一个节点:
// 从哪里中断的,由 创建上下文时设置finalvarresumeFrom=context.getResumeFromAndReset();if(resumeFrom.isPresent()){// 开启延迟路由 + 当前是中断标记 → 重新计算下一个节点if(context.getCompiledGraph().compileConfig.interruptBeforeEdge()&&java.util.Objects.equals(context.getNextNodeId(),INTERRUPT_AFTER)){varnextNode=context.nextNodeId(resumeFrom.get(),context.getCurrentStateData());context.setNextNodeId(nextNode.gotoNode());context.setCurrentNodeId(null);}}3. 中断时机对比
| 中断类型 | 触发位置 | 状态已更新 | nextNodeId 已计算 | checkpoint 已创建 | 适用场景 |
|---|---|---|---|---|---|
interruptsBefore | MainGraphExecutor | ❌ | ❌ | ❌ | 节点执行前审批 |
interruptsAfter | MainGraphExecutor | ✅ | ✅ | ✅ | 节点执行后审批 |
interruptBeforeEdge | NodeExecutor | ✅ | ❌ (INTERRUPT_AFTER) | ✅ | 分支决策前审批 |
InterruptableAction.interrupt() | NodeExecutor | ❌ | ❌ | ❌ | 自定义执行前中断 |
InterruptableAction.interruptAfter() | NodeExecutor | ✅ (先合并) | ✅ | ✅ | 自定义执行后中断 |
