Microsoft Agent Framework - Workflow 示例 — Checkpoint 与状态恢复
本篇包含三个渐进式示例,围绕同一个猜数字游戏展示 Workflow 的检查点(Checkpoint)机制:
示例 | 定位 | 核心点 |
|---|---|---|
CheckpointAndResume | 同一 Run 实例上恢复 | RestoreCheckpointAsync |
CheckpointAndRehydrate | 新实例"注水"恢复 | ResumeStreamingAsync |
CheckpointWithHumanInTheLoop | Checkpoint + RequestPort | 交互节点上的检查点 |
之前写了一篇文章,分享了我基于 Microsoft Agent Framework 打造的开源项目 Inkwell 的实战经验:
用 Microsoft Agent Framework 造一个 AI 内容工厂:一次 Harness Engineering 实战
开源项目:https://github.com/shuaihuadu/inkwell
Microsoft Agent Framework 官方项目地址:https://github.com/microsoft/agent-framework
Microsoft Agent Framework 正在定义下一代 AI 应用的标准——你的技术栈该刷新了!我建了个交流群,欢迎加入,一起学习,不掉队。
猜数字游戏逻辑
三个示例共享同一套猜数字游戏逻辑,理解它是后续分析检查点机制的前提。
游戏规则
Workflow 由两个 Executor 组成一个反馈循环:
GuessNumberExecutor(猜测者):维护一个搜索范围
[LowerBound, UpperBound],初始为[1, 100],每次取中间值(L+U)/2作为猜测JudgeExecutor(裁判):持有目标数字
42,判断猜测值并回复Above(猜大了)、Below(猜小了)或直接输出结果(猜对了)
两者通过NumberSignal枚举通信:Init(开始猜)、Above、Below。
二分查找执行过程
GuessNumberExecutor的核心逻辑:收到反馈后先调整边界,再用新边界计算下一次猜测:
case NumberSignal.Above: this.UpperBound = this.NextGuess - 1; // 用旧边界的 NextGuess 调整上界 await context.SendMessageAsync(this.NextGuess, ...); // 用新边界算猜测完整的 7 轮执行追踪:
轮次 | 信号 | 执行前 L/U | 边界调整 | 调整后 L/U | 猜测 |
|---|---|---|---|---|---|
1 | Init | 1/100 | — | 1/100 | 50 |
2 | Above | 1/100 | U=50-1=49 | 1/49 | 25 |
3 | Below | 1/49 | L=25+1=26 | 26/49 | 37 |
4 | Below | 26/49 | L=37+1=38 | 38/49 | 43 |
5 | Above | 38/49 | U=43-1=42 | 38/42 | 40 |
6 | Below | 38/42 | L=40+1=41 | 41/42 | 41 |
7 | Below | 41/42 | L=41+1=42 | 42/42 | 42 ✅ |
范围[1, 100]二分查找最多 ⌈log₂100⌉ = 7 次,目标值 42 恰好走满 7 步。两个 Executor 交替执行,每次执行各占一个 Super Step,因此整个游戏会产生14 个 Super Step(GuessNumberExecutor 执行 7 次 + JudgeExecutor 执行 7 次,其中 6 次反馈 + 1 次输出结果)。
示例 3 的变体
第三个示例(CheckpointWithHumanInTheLoop)中,GuessNumberExecutor被替换为RequestPort,由用户手动输入猜测值而非自动二分。信号类型也从NumberSignal枚举升级为SignalWithNumber类,携带上次猜测值以提示用户。
核心概念
在分析具体示例之前,需要理解三个关键概念:
Super Step:Workflow 的执行划分为多个阶段,每个阶段运行一批 Executor 并等待它们完成。一个 Super Step 结束时就是一个天然的检查点时机
Checkpoint:在 Super Step 结束时自动保存 Workflow 的完整状态快照,包括所有 Executor 的内部状态和共享状态
CheckpointManager:管理检查点的序列化/反序列化。
CheckpointManager.Default提供内存中的默认实现
示例 1:CheckpointAndResume——同实例恢复
示例实现的功能
在猜数字 Workflow 运行过程中自动创建检查点,运行完成后从之前的某个检查点在同一个 Run 实例上恢复状态并重新执行。
代码完整解析
启用检查点
var checkpointManager = CheckpointManager.Default; await using StreamingRun checkpointedRun = await InProcessExecution .RunStreamingAsync(workflow, NumberSignal.Init, checkpointManager);第三个参数checkpointManager让框架在每个 Super Step 结束时自动生成CheckpointInfo。
收集检查点
case SuperStepCompletedEvent superStepCompletedEvt: CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo!.Checkpoint; if (checkpoint is not null) { checkpoints.Add(checkpoint); }通过SuperStepCompletedEvent事件获取每个 Super Step 的检查点信息。
恢复执行
await checkpointedRun.RestoreCheckpointAsync(savedCheckpoint, CancellationToken.None); await foreach (WorkflowEvent evt in checkpointedRun.WatchStreamAsync()) { ... }RestoreCheckpointAsync在同一个 Run 实例上恢复状态。恢复后再次调用WatchStreamAsync从恢复点继续执行。
Executor 的状态保存/恢复
// 保存状态 protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, ...) => context.QueueStateUpdateAsync(StateKey, (LowerBound, UpperBound), ...); // 恢复状态 protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, ...) => (LowerBound, UpperBound) = await context.ReadStateAsync<(int, int)>(StateKey, ...);每个有状态的 Executor 需要覆写两个方法:
OnCheckpointingAsync:在 Super Step 结束时被框架调用,通过QueueStateUpdateAsync将状态排队写入检查点OnCheckpointRestoredAsync:恢复检查点时被框架调用,通过ReadStateAsync从检查点中读回状态
示例 2:CheckpointAndRehydrate——新实例重建
示例实现的功能
与示例 1 的区别在于,不在同一个 Run 上恢复,而是创建一个全新的 Workflow 实例并从检查点"注水"(Rehydrate),模拟进程重启后的恢复场景。
代码完整解析
恢复到新实例
var newWorkflow = WorkflowFactory.BuildWorkflow(); // 全新 Workflow 实例 await using StreamingRun newCheckpointedRun = await InProcessExecution.ResumeStreamingAsync(newWorkflow, savedCheckpoint, checkpointManager);关键 API 是InProcessExecution.ResumeStreamingAsync(而非RestoreCheckpointAsync):
接受一个全新的
Workflow实例从
savedCheckpoint恢复状态返回新的
StreamingRun实例
这模拟了真实的持久化恢复场景——应用崩溃后重建 Workflow 并从保存的检查点继续。
与示例 1 的核心区别
特性 | CheckpointAndResume | CheckpointAndRehydrate |
|---|---|---|
恢复目标 | 同一个 Run 实例 | 全新 Workflow + 新 Run |
API | run.RestoreCheckpointAsync | InProcessExecution.ResumeStreamingAsync |
应用场景 | 运行中的回退/重试 | 进程重启后的恢复 |
Executor 实例 | 复用原有实例 | 全新实例 |
示例 3:CheckpointWithHumanInTheLoop——检查点 + 人机交互
示例实现的功能
将 Checkpoint 与 RequestPort(HumanInTheLoop)结合,展示更复杂的场景:每次等待用户输入时都会产生检查点,恢复后可以从任意交互点重新开始。
代码完整解析
信号类型升级
internal sealed class SignalWithNumber { public NumberSignal Signal { get; } public int? Number { get; } }从简单的NumberSignal枚举升级为包含当前猜测值的复合类型,让用户知道上次猜了什么数字。
每次交互产生两个检查点
每个 RequestPort 交互循环经历两个 Super Step:
RequestPort 发出
RequestInfoEvent,Super Step 结束 →检查点 1(等待输入的状态)收到响应后 JudgeExecutor 处理完成,Super Step 结束 →检查点 2(处理完成的状态)
因此如果猜了 3 次,会产生约 6 个检查点。
恢复交互
await checkpointedRun.RestoreCheckpointAsync(savedCheckpoint, CancellationToken.None); await foreach (WorkflowEvent evt in checkpointedRun.WatchStreamAsync()) { switch (evt) { case RequestInfoEvent requestInputEvt: ExternalResponse response = HandleExternalRequest(requestInputEvt.Request); await checkpointedRun.SendResponseAsync(response); break; // 其他事件处理同首次运行... } }恢复后的处理逻辑与首次运行完全一致,体现了 Checkpoint + HumanInTheLoop 的无缝结合。
深入剖析
检查点的存储粒度
检查点不是保存整个 Workflow 对象的序列化,而是保存:
每个 Executor 通过
OnCheckpointingAsync主动写入的状态边的状态数据(EdgeStateData),记录各条边上的消息传递状态
共享状态(Shared State)中的全部数据
Workflow 引擎的执行位置信息(当前 Super Step 编号、Runner 状态等)
这个设计要求开发者显式声明需要持久化的状态。没有覆写OnCheckpointingAsync的 Executor,恢复后将以初始状态开始。
OnCheckpointingAsync 的设计哲学
框架不自动序列化 Executor 的全部字段,而是让开发者手动选择要持久化的状态。这看似麻烦,但有明确的好处:
避免序列化问题:Executor 可能持有不可序列化的资源(如 IChatClient、文件句柄)
状态精简:只持久化恢复所必需的最小状态,减少检查点大小
版本兼容:Executor 代码升级后只要 state key 和数据格式一致,旧检查点仍可恢复
Super Step 与检查点的关系
Input → [Super Step 1: Executor A 执行] → Checkpoint 1 → [Super Step 2: Executor B 执行] → Checkpoint 2 → [Super Step 3: Executor A 再次执行] → Checkpoint 3 → ...每个 Super Step 的边界就是一个一致性快照点。在循环 Workflow 中,每次循环迭代(A → B → A)可能跨越多个 Super Step,每个 Step 结束都生成检查点。
CheckpointManager 的扩展性
CheckpointManager.Default是内存实现,等价于CheckpointManager.CreateInMemory()。要接入自定义存储后端,使用:
CheckpointManager.CreateJson(ICheckpointStore<JsonElement> store)开发者只需实现ICheckpointStore<JsonElement>接口,即可将检查点持久化到任意后端(Redis、Azure Blob Storage、数据库等)。MAF 内置了FileSystemJsonCheckpointStore和JsonCheckpointStore作为参考实现。
这与 MAF 的 Durable Agent 架构互补——Durable Agent 通过 DurableTask 框架实现更完整的持久化,Checkpoint 则提供了轻量级的本地快照能力。
应用场景
长流程恢复:运行时长的 Workflow 崩溃后从最后检查点恢复,避免从头运行
状态机分支探查:一个检查点多次恢复,比较不同后续分支的表现
HITL 中断保抜:用户长时间未响应时保存现场,下次恢复就像从未中断
发布协调:代码升级后用旬heckpoint测试兼容性
调试与排错:定位问题 Executor 后从前一个检查点重跑,缩小复现范围
