当前位置: 首页 > news >正文

Microsoft Agent Framework - Workflow 示例 — Checkpoint 与状态恢复

本篇包含三个渐进式示例,围绕同一个猜数字游戏展示 Workflow 的检查点(Checkpoint)机制:

示例

定位

核心点

CheckpointAndResume

同一 Run 实例上恢复

RestoreCheckpointAsync
CheckpointAndRehydrate

新实例"注水"恢复

ResumeStreamingAsync
CheckpointWithHumanInTheLoop

Checkpoint + RequestPort

交互节点上的检查点


之前写了一篇文章,分享了我基于 Microsoft Agent Framework 打造的开源项目 Inkwell 的实战经验:

用 Microsoft Agent Framework 造一个 AI 内容工厂:一次 Harness Engineering 实战

开源项目:https://github.com/shuaihuadu/inkwell

Microsoft Agent Framework 官方项目地址:https://github.com/microsoft/agent-framework

Microsoft Agent Framework 正在定义下一代 AI 应用的标准——你的技术栈该刷新了!我建了个交流群,欢迎加入,一起学习,不掉队。


猜数字游戏逻辑

三个示例共享同一套猜数字游戏逻辑,理解它是后续分析检查点机制的前提。

游戏规则

Workflow 由两个 Executor 组成一个反馈循环:

  • GuessNumberExecutor(猜测者):维护一个搜索范围[LowerBound, UpperBound],初始为[1, 100],每次取中间值(L+U)/2作为猜测

  • JudgeExecutor(裁判):持有目标数字42,判断猜测值并回复Above(猜大了)、Below(猜小了)或直接输出结果(猜对了)

两者通过NumberSignal枚举通信:Init(开始猜)、AboveBelow

二分查找执行过程

GuessNumberExecutor的核心逻辑:收到反馈后先调整边界,再用新边界计算下一次猜测

case NumberSignal.Above: this.UpperBound = this.NextGuess - 1; // 用旧边界的 NextGuess 调整上界 await context.SendMessageAsync(this.NextGuess, ...); // 用新边界算猜测

完整的 7 轮执行追踪:

轮次

信号

执行前 L/U

边界调整

调整后 L/U

猜测

1

Init

1/100

1/100

50

2

Above

1/100

U=50-1=49

1/49

25

3

Below

1/49

L=25+1=26

26/49

37

4

Below

26/49

L=37+1=38

38/49

43

5

Above

38/49

U=43-1=42

38/42

40

6

Below

38/42

L=40+1=41

41/42

41

7

Below

41/42

L=41+1=42

42/42

42 ✅

范围[1, 100]二分查找最多 ⌈log₂100⌉ = 7 次,目标值 42 恰好走满 7 步。两个 Executor 交替执行,每次执行各占一个 Super Step,因此整个游戏会产生14 个 Super Step(GuessNumberExecutor 执行 7 次 + JudgeExecutor 执行 7 次,其中 6 次反馈 + 1 次输出结果)。

示例 3 的变体

第三个示例(CheckpointWithHumanInTheLoop)中,GuessNumberExecutor被替换为RequestPort,由用户手动输入猜测值而非自动二分。信号类型也从NumberSignal枚举升级为SignalWithNumber类,携带上次猜测值以提示用户。


核心概念

在分析具体示例之前,需要理解三个关键概念:

  • Super Step:Workflow 的执行划分为多个阶段,每个阶段运行一批 Executor 并等待它们完成。一个 Super Step 结束时就是一个天然的检查点时机

  • Checkpoint:在 Super Step 结束时自动保存 Workflow 的完整状态快照,包括所有 Executor 的内部状态和共享状态

  • CheckpointManager:管理检查点的序列化/反序列化。CheckpointManager.Default提供内存中的默认实现


示例 1:CheckpointAndResume——同实例恢复

示例实现的功能

在猜数字 Workflow 运行过程中自动创建检查点,运行完成后从之前的某个检查点在同一个 Run 实例上恢复状态并重新执行。

代码完整解析

启用检查点
var checkpointManager = CheckpointManager.Default; await using StreamingRun checkpointedRun = await InProcessExecution .RunStreamingAsync(workflow, NumberSignal.Init, checkpointManager);

第三个参数checkpointManager让框架在每个 Super Step 结束时自动生成CheckpointInfo

收集检查点
case SuperStepCompletedEvent superStepCompletedEvt: CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo!.Checkpoint; if (checkpoint is not null) { checkpoints.Add(checkpoint); }

通过SuperStepCompletedEvent事件获取每个 Super Step 的检查点信息。

恢复执行
await checkpointedRun.RestoreCheckpointAsync(savedCheckpoint, CancellationToken.None); await foreach (WorkflowEvent evt in checkpointedRun.WatchStreamAsync()) { ... }

RestoreCheckpointAsync同一个 Run 实例上恢复状态。恢复后再次调用WatchStreamAsync从恢复点继续执行。

Executor 的状态保存/恢复
// 保存状态 protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, ...) => context.QueueStateUpdateAsync(StateKey, (LowerBound, UpperBound), ...); // 恢复状态 protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, ...) => (LowerBound, UpperBound) = await context.ReadStateAsync<(int, int)>(StateKey, ...);

每个有状态的 Executor 需要覆写两个方法:

  • OnCheckpointingAsync:在 Super Step 结束时被框架调用,通过QueueStateUpdateAsync将状态排队写入检查点

  • OnCheckpointRestoredAsync:恢复检查点时被框架调用,通过ReadStateAsync从检查点中读回状态


示例 2:CheckpointAndRehydrate——新实例重建

示例实现的功能

与示例 1 的区别在于,不在同一个 Run 上恢复,而是创建一个全新的 Workflow 实例并从检查点"注水"(Rehydrate),模拟进程重启后的恢复场景。

代码完整解析

恢复到新实例
var newWorkflow = WorkflowFactory.BuildWorkflow(); // 全新 Workflow 实例 await using StreamingRun newCheckpointedRun = await InProcessExecution.ResumeStreamingAsync(newWorkflow, savedCheckpoint, checkpointManager);

关键 API 是InProcessExecution.ResumeStreamingAsync(而非RestoreCheckpointAsync):

  • 接受一个全新的Workflow实例

  • savedCheckpoint恢复状态

  • 返回新的StreamingRun实例

这模拟了真实的持久化恢复场景——应用崩溃后重建 Workflow 并从保存的检查点继续。

与示例 1 的核心区别

特性

CheckpointAndResume

CheckpointAndRehydrate

恢复目标

同一个 Run 实例

全新 Workflow + 新 Run

API

run.RestoreCheckpointAsyncInProcessExecution.ResumeStreamingAsync

应用场景

运行中的回退/重试

进程重启后的恢复

Executor 实例

复用原有实例

全新实例


示例 3:CheckpointWithHumanInTheLoop——检查点 + 人机交互

示例实现的功能

将 Checkpoint 与 RequestPort(HumanInTheLoop)结合,展示更复杂的场景:每次等待用户输入时都会产生检查点,恢复后可以从任意交互点重新开始。

代码完整解析

信号类型升级
internal sealed class SignalWithNumber { public NumberSignal Signal { get; } public int? Number { get; } }

从简单的NumberSignal枚举升级为包含当前猜测值的复合类型,让用户知道上次猜了什么数字。

每次交互产生两个检查点

每个 RequestPort 交互循环经历两个 Super Step:

  1. RequestPort 发出RequestInfoEvent,Super Step 结束 →检查点 1(等待输入的状态)

  2. 收到响应后 JudgeExecutor 处理完成,Super Step 结束 →检查点 2(处理完成的状态)

因此如果猜了 3 次,会产生约 6 个检查点。

恢复交互
await checkpointedRun.RestoreCheckpointAsync(savedCheckpoint, CancellationToken.None); await foreach (WorkflowEvent evt in checkpointedRun.WatchStreamAsync()) { switch (evt) { case RequestInfoEvent requestInputEvt: ExternalResponse response = HandleExternalRequest(requestInputEvt.Request); await checkpointedRun.SendResponseAsync(response); break; // 其他事件处理同首次运行... } }

恢复后的处理逻辑与首次运行完全一致,体现了 Checkpoint + HumanInTheLoop 的无缝结合。


深入剖析

检查点的存储粒度

检查点不是保存整个 Workflow 对象的序列化,而是保存:

  1. 每个 Executor 通过OnCheckpointingAsync主动写入的状态

  2. 边的状态数据(EdgeStateData),记录各条边上的消息传递状态

  3. 共享状态(Shared State)中的全部数据

  4. Workflow 引擎的执行位置信息(当前 Super Step 编号、Runner 状态等)

这个设计要求开发者显式声明需要持久化的状态。没有覆写OnCheckpointingAsync的 Executor,恢复后将以初始状态开始。

OnCheckpointingAsync 的设计哲学

框架不自动序列化 Executor 的全部字段,而是让开发者手动选择要持久化的状态。这看似麻烦,但有明确的好处:

  1. 避免序列化问题:Executor 可能持有不可序列化的资源(如 IChatClient、文件句柄)

  2. 状态精简:只持久化恢复所必需的最小状态,减少检查点大小

  3. 版本兼容:Executor 代码升级后只要 state key 和数据格式一致,旧检查点仍可恢复

Super Step 与检查点的关系

Input → [Super Step 1: Executor A 执行] → Checkpoint 1 → [Super Step 2: Executor B 执行] → Checkpoint 2 → [Super Step 3: Executor A 再次执行] → Checkpoint 3 → ...

每个 Super Step 的边界就是一个一致性快照点。在循环 Workflow 中,每次循环迭代(A → B → A)可能跨越多个 Super Step,每个 Step 结束都生成检查点。

CheckpointManager 的扩展性

CheckpointManager.Default是内存实现,等价于CheckpointManager.CreateInMemory()。要接入自定义存储后端,使用:

CheckpointManager.CreateJson(ICheckpointStore<JsonElement> store)

开发者只需实现ICheckpointStore<JsonElement>接口,即可将检查点持久化到任意后端(Redis、Azure Blob Storage、数据库等)。MAF 内置了FileSystemJsonCheckpointStoreJsonCheckpointStore作为参考实现。

这与 MAF 的 Durable Agent 架构互补——Durable Agent 通过 DurableTask 框架实现更完整的持久化,Checkpoint 则提供了轻量级的本地快照能力。

应用场景

  • 长流程恢复:运行时长的 Workflow 崩溃后从最后检查点恢复,避免从头运行

  • 状态机分支探查:一个检查点多次恢复,比较不同后续分支的表现

  • HITL 中断保抜:用户长时间未响应时保存现场,下次恢复就像从未中断

  • 发布协调:代码升级后用旬heckpoint测试兼容性

  • 调试与排错:定位问题 Executor 后从前一个检查点重跑,缩小复现范围

http://www.jsqmd.com/news/743319/

相关文章:

  • 2026年常州有名的短视频代运营品牌推荐 - 工业品牌热点
  • 小红书数据采集革命:XHS-Downloader如何重塑内容获取体验?
  • 大语言模型终端部署优化:从13B参数到4GB内存的实践
  • 为AI编程助手构建持久化记忆系统:agentmemory实战指南
  • 大模型推理优化:资源分配与自一致性技术实践
  • 从天气预报API实战解析:手把手教你用cJSON处理嵌套数组与对象(避坑指南)
  • 2026年分切复卷机选购指南,口碑如何? - 工业品牌热点
  • 5个实用技巧:用ZenTimings轻松监控AMD内存时序
  • 本地AI对话历史管理:基于SQLite与Flask的Cursor View工具实践
  • Nemotron-Cascade:级联强化学习框架提升AI推理能力
  • 企业AI模型评测:OfficeQA Pro框架解析与实践
  • LLM智能体核心技术:从记忆架构到自主决策
  • 别再为LoRaWAN入网失败抓狂了!手把手教你排查OTAA/ABP激活问题(以利尔达WB25模组为例)
  • 低资源语言机器翻译实战:数据策略与模型优化
  • Python自动化实现敏感信息脱敏与日志保护
  • 兴达矿业的影响力大吗?市场口碑怎么样? - 工业推荐榜
  • 物联网OTA包数字签名之Ed25519
  • 简单三步实现百度网盘免客户端高速下载:完整指南
  • 大模型后训练数据集评估平台OpenDataArena解析
  • 大语言模型安全测试实战:开源工具jimeng-free-api应用指南
  • OpenAPI与MCP协议融合:构建AI原生API网关的实践指南
  • 基于Next.js与React构建浏览器端AI会话日志分析工具
  • Kokonut UI:基于Tailwind CSS与Framer Motion的React交互动画增强方案
  • 如何快速定位电话号码归属地:开源工具的完整使用指南
  • OBS多平台直播终极指南:Multi RTMP插件一键搞定所有平台
  • 超声图像分割的半监督学习与Switch架构实践
  • 手把手教你用Arduino Nano驱动0.96寸OLED(IIC接口,含完整库文件)
  • BabelDOC:智能PDF双语翻译的终极解决方案,让学术文档翻译变得简单高效
  • Python自动化脚本:日期时间处理完全指南
  • 告别适配烦恼!一份表格搞定iOS开发中的iPhone屏幕尺寸与分辨率(含iPhone 15系列)