当前位置: 首页 > news >正文

Agent Framework 定义流程节点以及节点的流式输出

今天我们开启Agent Framework Workflow系列,当我们在构建 AI Agent 或多步骤自动化系统时,工作通常不是一步完成的。而是一个任务可能会被拆分成多个独立步骤,每个步骤单独处理的过程。

在 Agent Framework 的 workflow 中,执行器(Executor)是工作流里的基本处理单元。它接收输入,执行一段具体逻辑,然后输出结果给下一个执行器。

比如,一个任务可以通过工作流编排进行串联,在执行过程中依次完成输入预处理、模型推理以及结果后处理等阶段。

如何创建执行器

我们定义两个 executor。

第一个是UppercaseExecutor,负责把输入字符串转换成大写:

internal sealed class UppercaseExecutor() : Executor<string, string>("UppercaseExecutor") { public override ValueTask<string> HandleAsync( string message, IWorkflowContext context, CancellationToken cancellationToken = default) => ValueTask.FromResult(message.ToUpperInvariant()); }

它继承自:

Executor

这表示该执行器接收一个string类型输入,并返回一个string类型结果。

构造函数中的"UppercaseExecutor"是执行器 ID,在后续流式事件输出中可以用来识别是哪一个执行器完成了处理。

第二个执行器是ReverseTextExecutor,负责反转字符串:

internal sealed class ReverseTextExecutor() : Executor<string, string>("ReverseTextExecutor") { public override ValueTask<string> HandleAsync( string message, IWorkflowContext context, CancellationToken cancellationToken = default) { return ValueTask.FromResult(string.Concat(message.Reverse())); } }

如果输入是:

HELLO, WORLD!

那么它会返回:

!DLROW ,OLLEH

如何构建工作流

当我们有了执行器之后,就可以使用WorkflowBuilder将它们连接起来。

示例中的核心代码如下:

UppercaseExecutor uppercase = new(); ReverseTextExecutor reverse = new(); WorkflowBuilder builder = new(uppercase); builder.AddEdge(uppercase, reverse).WithOutputFrom(reverse); var workflow = builder.Build();

这里主要做了几件事:

完成了工作流的构建:首先创建 UppercaseExecutor 和 ReverseTextExecutor 两个执行器实例,分别保存为 uppercase 和 reverse;然后以 uppercase 作为工作流起点,在 uppercase 和 reverse 之间建立执行顺序,使 UppercaseExecutor 的输出作为 ReverseTextExecutor 的输入;最后指定 reverse 的输出作为整个 workflow 的最终结果,并构建出 workflow 实例。

其中:

builder.AddEdge(uppercase, reverse)

表示上游执行器uppercase的输出会传递给下游执行器reverse

而:

.WithOutputFrom(reverse)

表示reverse执行器的结果会作为工作流的输出。

工作流的流式输出

在Agent Framework Workflow中,工作流不是等待全部执行完之后才返回,而是使用的是流式输出:

我们接下来看一下流式工作流的原理:

await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, input: "Hello, World!"); awaitforeach (WorkflowEvent evt in run.WatchStreamAsync()) { if (evt is ExecutorCompletedEvent executorCompleted) { Console.WriteLine($"{executorCompleted.ExecutorId}: {executorCompleted.Data}"); } elseif (evt is WorkflowErrorEvent workflowError) { Console.ForegroundColor = ConsoleColor.Red; Console.Error.WriteLine(workflowError.Exception?.ToString() ?? "Unknown workflow error occurred."); Console.ResetColor(); } elseif (evt is ExecutorFailedEvent executorFailed) { Console.ForegroundColor = ConsoleColor.Red; Console.Error.WriteLine($"Executor '{executorFailed.ExecutorId}' failed with {(executorFailed.Data == null ? "unknown error" : $"exception {executorFailed.Data}")}."); Console.ResetColor(); } }

这里的使用了RunStreamingAsync。它不会只返回最终结果,而是返回一个StreamingRun对象。通过这个对象,可以监听工作流执行过程中的事件流。

接下来使用:

await foreach (WorkflowEvent evt in run.WatchStreamAsync())

逐个读取事件。这里是最最核心的地方。

监听执行器完成事件

当某个执行器完成处理后,会产生ExecutorCompletedEvent

示例代码中这样处理:

if (evt is ExecutorCompletedEvent executorCompleted) { Console.WriteLine($"{executorCompleted.ExecutorId}: {executorCompleted.Data}"); }

这意味着每当一个 executor 完成时,程序都会打印:

  • 执行器 ID

  • 当前执行器输出的数据

运行后,可以看到如下输出:

我们不需要等到整个工作流全部结束,才能知道中间发生了什么。每一步完成后,都可以立即拿到对应事件。

处理工作流错误

除了正常完成事件,示例中还处理了两类错误事件。

第一类是WorkflowErrorEvent

else if (evt is WorkflowErrorEvent workflowError) { Console.ForegroundColor = ConsoleColor.Red; Console.Error.WriteLine( workflowError.Exception?.ToString() ?? "Unknown workflow error occurred."); Console.ResetColor(); }

这类事件表示整个工作流层面发生了错误。

第二类是ExecutorFailedEvent

else if (evt is ExecutorFailedEvent executorFailed) { Console.ForegroundColor = ConsoleColor.Red; Console.Error.WriteLine( $"Executor '{executorFailed.ExecutorId}' failed with " + $"{(executorFailed.Data == null ? "unknown error" : $"exception {executorFailed.Data}")}."); Console.ResetColor(); }

这类事件表示某个具体 executor 执行失败。

相比WorkflowErrorEventExecutorFailedEvent更具体,因为它包含了失败的 executor ID。对于复杂工作流来说,这对于排查问题非常有帮助。

小结

这一节我们介绍了Agent Framework Workflow的一些基础感念:

  • 如何定义执行器(Executor)

  • 如何使用 WorkflowBuilder 构建工作流

  • 如何使用 RunStreamingAsync 启动流式工作流

  • 如何使用 WatchStreamAsync 监听工作流事件

  • 如何区分处理执行器完成事件、执行器失败事件和工作流错误事件

源代码地址

https://github.com/bingbing-gui/dotnet-agent-playbook/tree/master/src/ai-agent/Agent-Framework/31-WorkFlow-Executor

http://www.jsqmd.com/news/784883/

相关文章:

  • 2026年GEO技术底座哪家强?T-GEO 5级标准深度拆解 - GrowthUME
  • Z-Image Turbo实战案例:营销文案配图一体化生成
  • AI驱动的网络安全渗透测试:原理、挑战与未来
  • CANN驱动AICPU信息获取
  • 强化学习与微随机化试验在移动健康干预中的融合应用
  • 边缘计算安全实战:从架构威胁到AI驱动的防护体系
  • Python项目打包实战:以MockingBird为例,详解cxfreeze的--packages参数如何解决第三方库依赖问题
  • 使用Taotoken CLI工具一键配置本地开发环境所需的所有API密钥
  • 在Node.js后端服务中集成Taotoken多模型API的步骤详解
  • 低比特量化技术:INT与FP格式性能对比与实践
  • AIGC率从94%降到7%:10款免费降ai率工具深度测评(附工具优缺点对比) - 殷念写论文
  • 2025年机器学习工作流中的7大AI代理框架解析
  • 别再花钱买设备了!旧电脑+免费iKuai系统,DIY一个家庭PPPoE服务器全记录
  • 调 Agent 的 Prompt 太痛苦了?这套“写法 + 测评”救了我
  • 《龙虾OpenClaw系列:从嵌入式裸机到芯片级系统深度实战60课》045、外设总线矩阵:AHB/APB桥接与带宽优化
  • 中国企业DevOps工具链选型新趋势:本土化与安全可控成核心竞争力
  • 2026企业 PCT 全球布局解读:专业专利代理机构甄选核心要点 - GrowthUME
  • CANN 3DGS负载均衡优化策略
  • Cloudflare 共享字典压缩:一行代码改动,不再触发全量重下载
  • 大气层系统进阶配置完全手册:从架构解析到性能调优
  • Nodejs后端服务如何稳定集成大模型并实现成本可控
  • 过渡(transition)高级:贝塞尔曲线、硬件加速
  • Java复习题
  • 技术中立原则:AI全球合规的工程解码与实践指南
  • 负责任AI实践指南:从伦理、可解释性到隐私安全的技术框架
  • 【图解】Claude Code 源码解析 |Prompt 提示词模块
  • 别让你的Arduino项目突然‘死机’!7个新手最易踩的坑与实战避雷指南
  • 字节Agent岗三面:你们线上跑了 RAG,那你怎么衡量它的效果好不好?
  • CANN/ops-cv TensorScalar互推导关系
  • 中心化吸引子模型的数学严谨性与应用前景