当前位置: 首页 > news >正文

Spring AI Alibaba 项目源码学习(八)-Flow Agent 分析

Flow Agent 分析

请关注微信公众号:阿呆-bot

概述

本文档分析 Spring AI Alibaba Agent Framework 中的 Flow Agent 系列,包括 FlowAgent 基类、SequentialAgentParallelAgentLoopAgentLlmRoutingAgent 的具体实现,重点分析其关键方法、功能职责和节点能力。

入口类说明

FlowAgent - Flow Agent 基类

FlowAgent 是所有 Flow Agent 的抽象基类,继承自 Agent,提供了多 Agent 编排的基础能力。

核心职责

  • 管理子 Agent 列表
  • 提供 Graph 构建的抽象方法
  • 委托给 FlowGraphBuilder 构建图

关键代码

public abstract class FlowAgent extends Agent {protected List<String> interruptBefore;protected List<Agent> subAgents;protected FlowAgent(String name, String description, CompileConfig compileConfig, List<Agent> subAgents)throws GraphStateException {super(name, description);this.compileConfig = compileConfig;this.subAgents = subAgents;}@Overrideprotected StateGraph initGraph() throws GraphStateException {// Use FlowGraphBuilder to construct the graphFlowGraphBuilder.FlowGraphConfig config = FlowGraphBuilder.FlowGraphConfig.builder().name(this.name()).rootAgent(this).subAgents(this.subAgents());// Delegate to specific graph builder based on agent typereturn buildSpecificGraph(config);}@Overridepublic ScheduledAgentTask schedule(ScheduleConfig scheduleConfig) throws GraphStateException {CompiledGraph compiledGraph = getAndCompileGraph();return compiledGraph.schedule(scheduleConfig);}public StateGraph asStateGraph(){return getGraph();}/*** Abstract method for subclasses to specify their graph building strategy. This* method should be implemented by concrete FlowAgent subclasses to define how their* specific graph structure should be built.* @param config the graph configuration* @return the constructed StateGraph* @throws GraphStateException if graph construction fails*/protected abstract StateGraph buildSpecificGraph(FlowGraphBuilder.FlowGraphConfig config)throws GraphStateException;public CompileConfig compileConfig() {return compileConfig;}public List<Agent> subAgents() {return this.subAgents;}/*** Creates a map with messages and input for String message*/private Map<String, Object> createInputMap(String message) {return Map.of("messages", convertToMessages(message), "input", message);}}

关键方法

  • buildSpecificGraph():抽象方法,子类实现具体的图构建策略
  • subAgents():获取子 Agent 列表

SequentialAgent - 顺序执行 Agent

SequentialAgent 按顺序执行多个子 Agent,前一个 Agent 的输出作为下一个 Agent 的输入。

关键代码

public class SequentialAgent extends FlowAgent {protected SequentialAgent(SequentialAgentBuilder builder) throws GraphStateException {super(builder.name, builder.description, builder.compileConfig, builder.subAgents);}public static SequentialAgentBuilder builder() {return new SequentialAgentBuilder();}@Overrideprotected StateGraph buildSpecificGraph(FlowGraphBuilder.FlowGraphConfig config) throws GraphStateException {return FlowGraphBuilder.buildGraph(FlowAgentEnum.SEQUENTIAL.getType(), config);}/*** Builder for creating SequentialAgent instances. Extends the common FlowAgentBuilder* to provide type-safe building.*/public static class SequentialAgentBuilder extends FlowAgentBuilder<SequentialAgent, SequentialAgentBuilder> {@Overrideprotected SequentialAgentBuilder self() {return this;}@Overrideprotected void validate() {super.validate();// Add any SequentialAgent-specific validation here if needed}@Overridepublic SequentialAgent build() throws GraphStateException {validate();return new SequentialAgent(this);}}}

执行模式

  • Agent1 → Agent2 → Agent3 → ... → END
  • 每个 Agent 的输出作为下一个 Agent 的输入

ParallelAgent - 并行执行 Agent

ParallelAgent 并行执行多个子 Agent,然后合并结果。

关键代码

public class ParallelAgent extends FlowAgent {private static final Logger logger = LoggerFactory.getLogger(ParallelAgent.class);private final MergeStrategy mergeStrategy;private String mergeOutputKey;private final Integer maxConcurrency;protected ParallelAgent(ParallelAgentBuilder builder) throws GraphStateException {super(builder.name, builder.description, builder.compileConfig, builder.subAgents);this.mergeStrategy = builder.mergeStrategy != null ? builder.mergeStrategy : new DefaultMergeStrategy();this.maxConcurrency = builder.maxConcurrency;this.mergeOutputKey = builder.mergeOutputKey;}public static ParallelAgentBuilder builder() {return new ParallelAgentBuilder();}@Overrideprotected StateGraph buildSpecificGraph(FlowGraphBuilder.FlowGraphConfig config) throws GraphStateException {// Add parallel-specific properties to configconfig.customProperty("mergeStrategy", this.mergeStrategy);config.customProperty("maxConcurrency", this.maxConcurrency);return FlowGraphBuilder.buildGraph(FlowAgentEnum.PARALLEL.getType(), config);}/*** Gets the merge strategy used by this ParallelAgent.* @return the merge strategy*/public MergeStrategy mergeStrategy() {return mergeStrategy;}public String mergeOutputKey() {return mergeOutputKey;}/*** Gets the maximum concurrency limit for this ParallelAgent.* @return the max concurrency, or null if unlimited*/public Integer maxConcurrency() {

执行模式

  • 所有子 Agent 同时执行(Fan-Out)
  • 执行完成后合并结果(Gather)
  • 支持最大并发数限制

LoopAgent - 循环执行 Agent

LoopAgent 循环执行一个子 Agent,支持多种循环策略。

关键代码

public class LoopAgent extends FlowAgent {private final LoopStrategy loopStrategy;public static final String LOOP_STRATEGY = "loopStrategy";private LoopAgent(LoopAgentBuilder builder) throws GraphStateException {super(builder.name, builder.description, builder.compileConfig, builder.subAgents);this.loopStrategy = builder.loopStrategy;}@Overrideprotected StateGraph buildSpecificGraph(FlowGraphBuilder.FlowGraphConfig config) throws GraphStateException {config.customProperty(LOOP_STRATEGY, loopStrategy);return FlowGraphBuilder.buildGraph(FlowAgentEnum.LOOP.getType(), config);}public static LoopAgentBuilder builder() {return new LoopAgentBuilder();}public static class LoopAgentBuilder extends FlowAgentBuilder<LoopAgent, LoopAgentBuilder> {private LoopStrategy loopStrategy = null;@Overrideprotected LoopAgentBuilder self() {return this;}public LoopAgentBuilder subAgent(Agent subAgent) {this.subAgents = List.of(subAgent);return self();}@Overridepublic LoopAgentBuilder subAgents(List<Agent> subAgents) {throw new UnsupportedOperationException("LoopAgent must have only one subAgent, please use subAgent() method.");}public LoopAgentBuilder loopStrategy(LoopStrategy loopStrategy) {this.loopStrategy = loopStrategy;return self();

循环策略

  • CountLoopStrategy:固定次数循环
  • ConditionLoopStrategy:条件循环(条件为 true 时终止)
  • ArrayLoopStrategy:遍历数组元素

LlmRoutingAgent - LLM 路由 Agent

LlmRoutingAgent 使用 LLM 根据输入内容路由到不同的子 Agent。

关键代码

public class LlmRoutingAgent extends FlowAgent {private final ChatModel chatModel;protected LlmRoutingAgent(LlmRoutingAgentBuilder builder) throws GraphStateException {super(builder.name, builder.description, builder.compileConfig, builder.subAgents);this.chatModel = builder.chatModel;}public static LlmRoutingAgentBuilder builder() {return new LlmRoutingAgentBuilder();}@Overrideprotected StateGraph buildSpecificGraph(FlowGraphBuilder.FlowGraphConfig config) throws GraphStateException {config.setChatModel(this.chatModel);return FlowGraphBuilder.buildGraph(FlowAgentEnum.ROUTING.getType(), config);}/*** Builder for creating LlmRoutingAgent instances. Extends the common FlowAgentBuilder* and adds LLM-specific configuration.*/public static class LlmRoutingAgentBuilder extends FlowAgentBuilder<LlmRoutingAgent, LlmRoutingAgentBuilder> {private ChatModel chatModel;/*** Sets the ChatModel for LLM-based routing decisions.* @param chatModel the chat model to use for routing* @return this builder instance for method chaining*/public LlmRoutingAgentBuilder model(ChatModel chatModel) {this.chatModel = chatModel;return this;}@Overrideprotected LlmRoutingAgentBuilder self() {return this;}@Overrideprotected void validate() {super.validate();if (chatModel == null) {throw new IllegalArgumentException("ChatModel must be provided for LLM routing agent");}}@Overridepublic LlmRoutingAgent build() throws GraphStateException {validate();return new LlmRoutingAgent(this);}}}

路由机制

  • 使用 LLM 分析输入内容
  • 根据分析结果选择目标 Agent
  • 支持动态路由决策

节点能力

ConditionEvaluator - 条件评估器

ConditionEvaluator 用于评估条件,支持条件路由。

功能

  • 评估状态中的条件表达式
  • 返回布尔值用于路由决策
  • 支持复杂条件逻辑

RoutingEdgeAction - 路由边动作

RoutingEdgeAction 用于实现路由逻辑,根据条件选择目标节点。

功能

  • 根据状态评估路由条件
  • 返回目标节点 ID
  • 支持 LLM 路由和条件路由

TransparentNode - 透明节点

TransparentNode 是一个透明节点,直接传递状态,不进行任何处理。

功能

  • 用于图结构的占位
  • 保持状态不变
  • 简化图结构

关键类关系

以下 PlantUML 类图展示了 Flow Agent 的类关系:

image.png

关键流程

以下 PlantUML 时序图展示了 SequentialAgent 的执行流程:
image.png

实现关键点说明

1. 策略模式

Flow Agent 使用策略模式构建图:

  • FlowGraphBuildingStrategy 定义图构建策略接口
  • 每种 Flow Agent 对应一个具体的策略实现
  • FlowGraphBuilder 根据类型选择策略

2. Builder 模式

所有 Flow Agent 都使用 Builder 模式:

  • FlowAgentBuilder 提供通用构建能力
  • 每个具体 Agent 有自己的 Builder 类
  • 支持链式调用和参数验证

3. 节点转换

子 Agent 通过 asNode() 方法转换为 Graph 节点:

  • BaseAgent.asNode() 将 Agent 转换为 SubGraphNode
  • 支持 Agent 嵌套和组合
  • 保持状态传递和隔离

4. 条件路由

支持条件路由能力:

  • ConditionEvaluator 评估条件表达式
  • RoutingEdgeAction 实现路由逻辑
  • 支持复杂条件判断

5. 并行执行

ParallelAgent 实现真正的并行执行:

  • 使用 Graph 的并行节点能力
  • 支持最大并发数限制
  • 提供结果合并策略

6. 循环策略

LoopAgent 支持多种循环策略:

  • CountLoopStrategy:固定次数
  • ConditionLoopStrategy:条件循环
  • ArrayLoopStrategy:数组遍历
  • 可扩展自定义策略

总结说明

核心设计理念

  1. 编排模式:提供顺序、并行、循环、路由四种编排模式
  2. 策略模式:通过策略模式实现灵活的图构建
  3. 节点能力:支持条件评估、路由选择等节点能力
  4. 可组合性:支持 Agent 嵌套和组合

关键优势

  • 灵活性:支持多种编排模式,满足不同场景需求
  • 可扩展性:通过策略模式支持自定义编排逻辑
  • 性能:并行执行提高处理效率
  • 智能路由:LLM 路由实现智能 Agent 选择

使用场景

  • SequentialAgent:需要顺序处理的多步骤任务
  • ParallelAgent:可以并行处理的独立任务
  • LoopAgent:需要循环处理的任务(如批量处理)
  • LlmRoutingAgent:需要根据内容智能选择处理方式的场景

Flow Agent 系列为多 Agent 编排提供了完整的解决方案,使开发者能够灵活构建复杂的 Agent 工作流。

http://www.jsqmd.com/news/41866/

相关文章:

  • Why did Hitler become a greater Napoleon?
  • vcpkg交叉编译
  • 详细介绍:什么是机械设备制造ERP?哲霖软件如何助力企业实现降本增效?
  • python -m pip install 就行 我pip install就不行?
  • Personalized QRCode - 个性化自定义二维码生成器
  • 对“机器人VCU”进行一个详细、架构的讲解。
  • Qt编写28181推流分发服务/统计访问数量/无人观看超时关闭/等待重新点播/复用点播
  • 20232407 2025-2026-1 《网络与系统攻防技术》 实验五实验报告
  • 实现string类
  • 实用指南:Vue 实例生命周期
  • React Native创建AndroidIOS流程完整指南
  • Ducky - BPMN 工作流管理系统
  • 图论建模问题
  • python多进程通信中的Queue、SimpleQueue、Pipe
  • 旧版本SiK数传的对频问题
  • 2025年甘肃广告策划服务综合推荐排行榜
  • 2025年甘肃兰州专业的广告物料制作公司推荐
  • 2025年甘肃兰州比较好的广告物料制作服务团队
  • wordpress批量删除文章
  • OpenAI Agent Kit 全网首发深度解读与上手指南 - 详解
  • supabase
  • 2025年加工型辣椒种子生产厂家排名前十:权威评测与选择攻略
  • 251116
  • 2025年加工型辣椒种子品牌前十强排行榜:镇江市镇研种业有限公司领跑行业
  • 2025年螺丝椒种子品牌综合实力排行榜前十强揭晓
  • 2025年线椒种子品牌前十强排名:专业选购指南与厂家实力解析
  • 2025年辣椒种子品牌前十强排行榜及深度解析
  • 华为鲲鹏 Aarch64 环境下多 Oracle 数据库汇聚操作指南 CMP(类 Cloudera CDP 7.3) - 指南
  • 这段时间的NOIP模拟赛
  • 《重生之我成为世界顶级黑客》第三章:艰难的抉择