AI 自动化工作流设计:从单次调用到多步编排的可靠性实践
AI 自动化工作流设计:从单次调用到多步编排的可靠性实践
一、单次调用到多步编排:AI 工作流的复杂性跃迁
当 AI 能力从单次问答走向多步骤自动化工作流时,系统的复杂度会发生质变。一个典型的内容审核工作流可能包含:文本提取、敏感词检测、语义分析、审核决策生成、结果通知五个步骤。每个步骤都依赖上一步的输出,而每一步都可能失败。
单次 AI 调用的失败处理相对简单——重试或降级即可。但在多步工作流中,第三步失败后,是否需要重试前两步?如果第五步的模型输出格式异常,是否需要从第四步重新开始?这些问题在单次调用场景中不存在,但在工作流场景中必须明确回答。
更关键的是,多步工作流的成本是累加的。一个五步工作流,每步平均消耗 2000 Token,单次执行就需要 10000 Token。如果缺乏执行控制,一个异常触发的工作流循环可能在几分钟内消耗掉整月的 API 预算。因此,AI 自动化工作流的设计,核心不是"如何编排步骤",而是"如何安全地编排步骤"。
二、工作流引擎的状态机模型
AI 工作流的本质是一个状态机:每个步骤是一个状态,步骤间的依赖关系定义了状态转移规则。引入状态机模型后,工作流具备了可观测性(当前执行到哪一步)、可恢复性(从任意步骤重新开始)和可审计性(每步的输入输出都有记录)。
stateDiagram-v2 [*] --> 初始化: 触发工作流 初始化 --> 数据采集: 参数校验通过 初始化 --> 失败: 参数校验失败 数据采集 --> AI预处理: 数据获取成功 数据采集 --> 重试队列: 获取超时 重试队列 --> 数据采集: 重试触发 AI预处理 --> AI推理: 预处理完成 AI预处理 --> 失败: 数据格式异常 AI推理 --> 结果校验: 推理完成 AI推理 --> 降级推理: 主模型超时 降级推理 --> 结果校验: 降级模型返回 结果校验 --> 结果输出: 校验通过 结果校验 --> AI推理: 结果不合格(重试) 结果校验 --> 失败: 重试次数耗尽 结果输出 --> 通知: 输出完成 通知 --> [*]: 工作流结束 失败 --> [*]: 记录日志并告警状态机模型的核心优势在于:每个状态的退出条件是明确的。数据采集超时后进入重试队列,而非无限等待;AI 推理超时后切换到降级模型,而非直接失败;结果校验不通过时,可以选择重试推理而非从头开始。这种细粒度的错误处理,是工作流可靠性的基础。
三、生产级工作流引擎的代码实现
3.1 工作流定义与状态管理
type WorkflowStatus = 'pending' | 'running' | 'paused' | 'completed' | 'failed'; interface WorkflowStep<TInput, TOutput> { name: string; execute: (input: TInput, context: WorkflowContext) => Promise<TOutput>; // 超时时间,超时后自动标记为失败 timeoutMs: number; // 最大重试次数 maxRetries: number; // 是否可跳过(跳过后使用默认值继续) skippable: boolean; // 跳过时的默认输出 fallbackOutput?: TOutput; } interface WorkflowContext { workflowId: string; currentStep: number; totalSteps: number; retryCount: number; metadata: Record<string, unknown>; // 步骤执行记录,用于审计和恢复 stepHistory: Array<{ stepName: string; startedAt: number; completedAt: number; status: 'success' | 'failed' | 'skipped'; error?: string; }>; } interface WorkflowDefinition { name: string; steps: WorkflowStep<unknown, unknown>[]; // 全局超时:整个工作流的最大执行时间 globalTimeoutMs: number; // 全局 Token 预算:累计消耗超过此值时中止 maxTokenBudget: number; }3.2 工作流执行引擎
class WorkflowEngine { private tokenUsage: number = 0; /** * 执行工作流,支持断点恢复和超时控制 * 设计考量: * - 每步执行前检查全局预算,防止成本失控 * - 步骤失败时根据配置决定重试/跳过/中止 * - 执行记录持久化,支持从任意步骤恢复 */ async execute( definition: WorkflowDefinition, initialInput: unknown, resumeFromStep?: number ): Promise<{ status: WorkflowStatus; output: unknown; context: WorkflowContext }> { const context: WorkflowContext = { workflowId: crypto.randomUUID(), currentStep: resumeFromStep ?? 0, totalSteps: definition.steps.length, retryCount: 0, metadata: {}, stepHistory: [], }; let currentInput = initialInput; const globalStart = Date.now(); for (let i = context.currentStep; i < definition.steps.length; i++) { const step = definition.steps[i]; context.currentStep = i; // 全局超时检查 if (Date.now() - globalStart > definition.globalTimeoutMs) { return this.fail(context, '工作流全局超时'); } // Token 预算检查 if (this.tokenUsage > definition.maxTokenBudget) { return this.fail(context, `Token 预算耗尽:已消耗 ${this.tokenUsage}`); } const stepResult = await this.executeStep(step, currentInput, context); if (stepResult.status === 'success') { currentInput = stepResult.output; this.tokenUsage += stepResult.tokensUsed ?? 0; } else if (stepResult.status === 'skipped') { // 使用降级输出继续执行 currentInput = step.fallbackOutput; } else { // 步骤彻底失败 return this.fail(context, `步骤 ${step.name} 执行失败:${stepResult.error}`); } } context.currentStep = definition.steps.length; return { status: 'completed', output: currentInput, context }; } private async executeStep( step: WorkflowStep<unknown, unknown>, input: unknown, context: WorkflowContext ): Promise<{ status: 'success' | 'failed' | 'skipped'; output?: unknown; error?: string; tokensUsed?: number }> { let lastError: string = ''; const stepStart = Date.now(); for (let attempt = 0; attempt <= step.maxRetries; attempt++) { try { // 单步超时控制 const result = await Promise.race([ step.execute(input, context), new Promise<never>((_, reject) => setTimeout(() => reject(new Error('步骤超时')), step.timeoutMs) ), ]); context.stepHistory.push({ stepName: step.name, startedAt: stepStart, completedAt: Date.now(), status: 'success', }); return { status: 'success', output: result }; } catch (error) { lastError = (error as Error).message; // 重试前等待,避免立即重试加重服务端压力 if (attempt < step.maxRetries) { await new Promise(resolve => setTimeout(resolve, 1000 * Math.pow(2, attempt))); } } } // 重试耗尽,判断是否可跳过 if (step.skippable && step.fallbackOutput !== undefined) { context.stepHistory.push({ stepName: step.name, startedAt: stepStart, completedAt: Date.now(), status: 'skipped', error: lastError, }); return { status: 'skipped' }; } context.stepHistory.push({ stepName: step.name, startedAt: stepStart, completedAt: Date.now(), status: 'failed', error: lastError, }); return { status: 'failed', error: lastError }; } private fail(context: WorkflowContext, reason: string) { return { status: 'failed' as WorkflowStatus, output: null, context }; } }3.3 内容审核工作流的具体编排
const contentModerationWorkflow: WorkflowDefinition = { name: '内容审核工作流', globalTimeoutMs: 60000, // 全局 60 秒超时 maxTokenBudget: 15000, // 单次执行最多消耗 15000 Token steps: [ { name: '文本提取', execute: async (input, ctx) => { const { content, format } = input as { content: string; format: string }; // HTML 内容清洗,提取纯文本 if (format === 'html') { return content.replace(/<[^>]*>/g, '').trim(); } return content; }, timeoutMs: 5000, maxRetries: 1, skippable: false, }, { name: 'AI 敏感词检测', execute: async (text, ctx) => { // 调用大模型进行语义级敏感词检测 const result = await callModel('gpt-4o-mini', text as string, '检测文本中是否包含敏感内容'); return { text, sensitiveResult: result }; }, timeoutMs: 10000, maxRetries: 2, skippable: true, fallbackOutput: { text: '', sensitiveResult: { flagged: false, reason: '检测服务不可用' } }, }, { name: '审核决策生成', execute: async (data, ctx) => { const { text, sensitiveResult } = data as { text: string; sensitiveResult: unknown }; if ((sensitiveResult as { flagged: boolean }).flagged) { return { decision: 'reject', reason: '内容包含敏感信息' }; } return { decision: 'approve', reason: '内容审核通过' }; }, timeoutMs: 3000, maxRetries: 0, skippable: false, }, ], };四、工作流编排的隐性风险与适用边界
状态爆炸问题:当工作流步骤超过 10 个,且步骤间存在条件分支时,状态机的状态数量会呈指数增长。一个包含 3 个条件分支的 10 步工作流,可能的状态路径多达数千条。此时,状态机模型的可读性会急剧下降,维护成本远超收益。对于这种复杂场景,应考虑使用 DAG(有向无环图)模型替代线性状态机,或者将工作流拆分为多个子工作流。
成本控制的精度问题:Token 预算是粗粒度的成本控制手段,但实际场景中,不同步骤的 Token 消耗差异巨大。文本提取步骤几乎不消耗 Token,而 AI 推理步骤可能消耗数千 Token。如果统一预算,简单步骤会"浪费"预算配额。更精细的做法是为每个步骤设置独立的 Token 预算,但这也增加了配置复杂度。
断点恢复的一致性风险:从某个步骤恢复执行时,前序步骤的输出可能已经过期。例如,数据采集步骤获取的是 10 分钟前的数据,从第三步恢复时,这些数据可能已经不再准确。对于时效性要求高的工作流,断点恢复可能引入数据一致性问题,需要根据业务场景决定是否允许恢复。
工作流 vs 简单管道:并非所有多步骤场景都需要工作流引擎。如果一个流程是严格线性的、无需重试和断点恢复,一个简单的 Promise 链就足够了。工作流引擎的引入成本包括:状态持久化、步骤定义配置、执行日志存储。当流程步骤少于 5 个且失败率极低时,这些成本可能不值得支付。
五、总结
AI 自动化工作流的设计核心,是在编排灵活性与执行可靠性之间取得平衡。状态机模型为工作流提供了可观测、可恢复、可审计的执行框架,但同时也引入了状态管理和配置复杂度。在实际落地中,工作流引擎的引入时机应基于具体的业务需求:当流程步骤超过 5 个、存在条件分支、需要断点恢复或成本控制时,工作流引擎的价值才会显现。
落地建议:第一步,从线性管道开始,用 Promise 链串联步骤,验证流程逻辑的正确性;第二步,当需要重试和超时控制时,引入步骤级别的错误处理;第三步,当需要断点恢复和成本控制时,再升级为完整的工作流引擎。渐进式演进可以避免过早引入不必要的复杂度。
