当前位置: 首页 > news >正文

LLM 驱动的智能工作流引擎:从 Prompt 编排到 DAG 调度的工程实践

LLM 驱动的智能工作流引擎:从 Prompt 编排到 DAG 调度的工程实践

一、当 Prompt 链变成意大利面条:AI 工作流的编排困境

大语言模型(LLM)的能力集成正在从"单次对话"向"多步骤工作流"演进。然而,许多团队在构建 AI 工作流时,仍然用线性 Prompt 链串联多个模型调用,导致系统脆弱且难以维护。

某内容审核系统中,审核流程包含 7 个步骤:文本预处理→敏感词过滤→语义分析→风险评估→人工复审判定→结果聚合→通知推送。最初的实现用 7 个顺序函数调用完成,每个函数内部硬编码了 Prompt 模板和模型参数。当业务要求"风险评估"和"语义分析"可以并行执行时,开发者不得不重构整个调用链;当需要根据"敏感词过滤"结果跳过后续步骤时,又引入了层层嵌套的条件判断。

核心痛点:

  • 线性链无法表达并行与条件分支:真实业务流程是 DAG(有向无环图),不是链表
  • Prompt 与代码耦合:修改 Prompt 需要重新部署服务,无法热更新
  • 缺乏可观测性:工作流执行到哪一步、哪一步耗时最长、哪一步失败率最高,全靠日志翻查

二、DAG 调度引擎:AI 工作流的核心抽象

AI 工作流的本质是一个 DAG:每个节点是一次 LLM 调用或工具执行,边表示数据依赖关系。DAG 调度引擎负责拓扑排序、并行执行、条件路由和错误恢复。

graph TD A[输入节点: 原始文本] --> B[预处理: 文本清洗] B --> C[并行分支1: 敏感词检测] B --> D[并行分支2: 语义分析] C --> E[条件路由: 是否命中敏感词?] E -->|是| F[高风险流程: 人工复审标记] E -->|否| G[低风险流程: 自动通过] D --> H[风险评估: 综合打分] F --> I[结果聚合] G --> I H --> I I --> J[输出节点: 审核结果] style A fill:#e1f5fe style J fill:#e8f5e9 style E fill:#fff3e0

调度引擎的核心时序:

sequenceDiagram participant Client participant Engine as DAG调度引擎 participant Executor as 节点执行器 participant LLM as LLM服务 Client->>Engine: 提交工作流定义+输入数据 Engine->>Engine: 拓扑排序,识别可并行节点 Engine->>Executor: 调度首批节点(无依赖) Executor->>LLM: 执行Prompt调用 LLM-->>Executor: 返回结果 Executor-->>Engine: 节点完成,输出数据 Engine->>Engine: 更新DAG状态,解锁下游节点 Engine->>Executor: 调度条件路由节点 Executor->>Executor: 评估条件,选择分支 Executor-->>Engine: 路由结果 Engine->>Executor: 调度后续节点(并行) Executor-->>Engine: 全部完成 Engine-->>Client: 返回最终结果+执行轨迹

三、生产级 DAG 工作流引擎实现

3.1 工作流定义与调度核心

// workflow-engine.ts — DAG工作流引擎核心 import { z } from 'zod'; // 节点类型定义 type NodeType = 'input' | 'llm' | 'tool' | 'condition' | 'output'; // 工作流节点定义 interface WorkflowNode { id: string; type: NodeType; // LLM节点配置 prompt?: string; // Prompt模板,支持{{变量}}插值 model?: string; // 模型名称 temperature?: number; // 条件节点配置 condition?: { expression: string; // 条件表达式,引用上游节点输出 branches: Record<string, string[]>; // 分支→下游节点ID列表 }; // 工具节点配置 toolName?: string; toolParams?: Record<string, string>; // 参数映射,支持引用上游输出 // 超时与重试 timeoutMs?: number; maxRetries?: number; } // 工作流边定义 interface WorkflowEdge { from: string; to: string; dataMapping?: Record<string, string>; // 输出字段→输入字段映射 } // 工作流定义 interface WorkflowDefinition { name: string; version: string; nodes: WorkflowNode[]; edges: WorkflowEdge[]; } // 节点执行结果 interface NodeResult { nodeId: string; output: unknown; durationMs: number; tokenUsage?: { prompt: number; completion: number }; retryCount: number; } // 工作流执行上下文 class WorkflowContext { private nodeOutputs: Map<string, unknown> = new Map(); private nodeResults: NodeResult[] = []; // 存储节点输出 setOutput(nodeId: string, output: unknown): void { this.nodeOutputs.set(nodeId, output); } // 获取节点输出,支持路径访问(如 "node1.result.score") getOutput(path: string): unknown { const [nodeId, ...rest] = path.split('.'); const output = this.nodeOutputs.get(nodeId); if (!output || rest.length === 0) return output; return rest.reduce((obj, key) => { if (obj && typeof obj === 'object') return (obj as Record<string, unknown>)[key]; return undefined; }, output as unknown); } addResult(result: NodeResult): void { this.nodeResults.push(result); } getResults(): NodeResult[] { return [...this.nodeResults]; } } // DAG调度引擎 class WorkflowEngine { private nodeExecutors: Map<NodeType, NodeExecutor> = new Map(); constructor( private llmClient: LLMClient, private toolRegistry: ToolRegistry, ) { // 注册节点执行器 this.nodeExecutors.set('llm', new LLMNodeExecutor(llmClient)); this.nodeExecutors.set('tool', new ToolNodeExecutor(toolRegistry)); this.nodeExecutors.set('condition', new ConditionNodeExecutor()); this.nodeExecutors.set('input', new PassThroughExecutor()); this.nodeExecutors.set('output', new PassThroughExecutor()); } // 执行工作流 async execute( definition: WorkflowDefinition, input: unknown, ): Promise<{ output: unknown; results: NodeResult[] }> { // 1. 构建邻接表和入度表 const graph = this.buildGraph(definition); const context = new WorkflowContext(); context.setOutput('__input__', input); // 2. 拓扑排序+并行调度 const completed = new Set<string>(); const inDegree = new Map<string, number>(); for (const nodeId of graph.keys()) { inDegree.set(nodeId, 0); } for (const [, targets] of graph) { for (const target of targets) { inDegree.set(target, (inDegree.get(target) || 0) + 1); } } // 使用队列管理就绪节点 const readyQueue: string[] = []; for (const [nodeId, degree] of inDegree) { if (degree === 0) readyQueue.push(nodeId); } while (readyQueue.length > 0) { // 取出所有就绪节点,并行执行 const batch = readyQueue.splice(0); const promises = batch.map(nodeId => this.executeNode(nodeId, definition, context, graph)); const batchResults = await Promise.allSettled(promises); // 处理执行结果 for (let i = 0; i < batchResults.length; i++) { const result = batchResults[i]; const nodeId = batch[i]; if (result.status === 'fulfilled') { completed.add(nodeId); // 更新下游节点入度 for (const target of graph.get(nodeId) || []) { const newDegree = (inDegree.get(target) || 1) - 1; inDegree.set(target, newDegree); if (newDegree === 0 && !completed.has(target)) { readyQueue.push(target); } } } else { // 节点执行失败,根据策略决定是否终止 throw new Error(`节点 ${nodeId} 执行失败: ${result.reason}`); } } } // 从输出节点获取最终结果 const outputNode = definition.nodes.find(n => n.type === 'output'); const output = outputNode ? context.getOutput(outputNode.id) : undefined; return { output, results: context.getResults() }; } // 执行单个节点(带重试) private async executeNode( nodeId: string, definition: WorkflowDefinition, context: WorkflowContext, graph: Map<string, string[]>, ): Promise<void> { const node = definition.nodes.find(n => n.id === nodeId); if (!node) throw new Error(`节点 ${nodeId} 不存在`); const executor = this.nodeExecutors.get(node.type); if (!executor) throw new Error(`未注册的节点类型: ${node.type}`); const maxRetries = node.maxRetries ?? 2; let lastError: Error | undefined; for (let attempt = 0; attempt <= maxRetries; attempt++) { const start = Date.now(); try { const output = await executor.execute(node, context); const durationMs = Date.now() - start; context.setOutput(nodeId, output); context.addResult({ nodeId, output, durationMs, retryCount: attempt, }); return; } catch (error) { lastError = error as Error; if (attempt < maxRetries) { // 指数退避重试 await this.sleep(Math.pow(2, attempt) * 1000); } } } throw lastError; } private buildGraph(definition: WorkflowDefinition): Map<string, string[]> { const graph = new Map<string, string[]>(); for (const node of definition.nodes) { graph.set(node.id, []); } for (const edge of definition.edges) { graph.get(edge.from)?.push(edge.to); } return graph; } private sleep(ms: number): Promise<void> { return new Promise(resolve => setTimeout(resolve, ms)); } }

3.2 节点执行器实现

// executors.ts — 各类型节点执行器 interface NodeExecutor { execute(node: WorkflowNode, context: WorkflowContext): Promise<unknown>; } // LLM节点执行器:Prompt模板插值+模型调用 class LLMNodeExecutor implements NodeExecutor { constructor(private llmClient: LLMClient) {} async execute(node: WorkflowNode, context: WorkflowContext): Promise<unknown> { if (!node.prompt) throw new Error('LLM节点缺少prompt配置'); // Prompt模板插值:将{{nodeId.field}}替换为上游输出 const resolvedPrompt = node.prompt.replace( /\{\{(\w+(?:\.\w+)*)\}\}/g, (_, path) => { const value = context.getOutput(path); if (value === undefined) { throw new Error(`模板变量 {{${path}}} 未找到对应的上游输出`); } return typeof value === 'string' ? value : JSON.stringify(value); }, ); const response = await this.llmClient.chat({ model: node.model || 'qwen2:7b', messages: [{ role: 'user', content: resolvedPrompt }], temperature: node.temperature ?? 0.1, timeoutMs: node.timeoutMs ?? 30000, }); // 尝试解析JSON输出,失败则返回原始文本 try { return JSON.parse(response.content); } catch { return { text: response.content }; } } } // 条件节点执行器:评估表达式,返回选中的分支 class ConditionNodeExecutor implements NodeExecutor { async execute(node: WorkflowNode, context: WorkflowContext): Promise<unknown> { if (!node.condition) throw new Error('条件节点缺少condition配置'); // 安全评估条件表达式 // 生产环境应使用表达式引擎(如jsonpath-plus),此处简化实现 const expression = node.condition.expression; const evaluated = this.evaluateExpression(expression, context); // 返回评估结果,调度引擎根据branches配置路由 return { _conditionResult: String(evaluated), _branches: node.condition.branches, }; } private evaluateExpression(expr: string, context: WorkflowContext): unknown { // 支持: {{nodeId.field}} === "value", {{nodeId.score}} > 0.8 const match = expr.match(/\{\{(\w+(?:\.\w+)*)\}\}\s*(===|!==|>|<|>=|<=)\s*(.+)/); if (!match) throw new Error(`无法解析条件表达式: ${expr}`); const [, path, operator, rightRaw] = match; const left = context.getOutput(path); const right = rightRaw.trim().replace(/^["']|["']$/g, ''); switch (operator) { case '===': return String(left) === right; case '!==': return String(left) !== right; case '>': return Number(left) > Number(right); case '<': return Number(left) < Number(right); case '>=': return Number(left) >= Number(right); case '<=': return Number(left) <= Number(right); default: throw new Error(`不支持的操作符: ${operator}`); } } } // 工具节点执行器 class ToolNodeExecutor implements NodeExecutor { constructor(private toolRegistry: ToolRegistry) {} async execute(node: WorkflowNode, context: WorkflowContext): Promise<unknown> { if (!node.toolName) throw new Error('工具节点缺少toolName配置'); const tool = this.toolRegistry.get(node.toolName); if (!tool) throw new Error(`未注册的工具: ${node.toolName}`); // 解析工具参数,支持引用上游输出 const params: Record<string, unknown> = {}; if (node.toolParams) { for (const [key, value] of Object.entries(node.toolParams)) { if (value.startsWith('{{') && value.endsWith('}}')) { const path = value.slice(2, -2); params[key] = context.getOutput(path); } else { params[key] = value; } } } return tool.execute(params); } } // 透传执行器(输入/输出节点) class PassThroughExecutor implements NodeExecutor { async execute(_node: WorkflowNode, _context: WorkflowContext): Promise<unknown> { return null; // 输入/输出节点的数据通过context直接管理 } } // LLM客户端接口(适配不同供应商) interface LLMClient { chat(params: { model: string; messages: Array<{ role: string; content: string }>; temperature: number; timeoutMs: number; }): Promise<{ content: string; usage: { promptTokens: number; completionTokens: number } }>; } // 工具注册表接口 interface ToolRegistry { get(name: string): { execute: (params: Record<string, unknown>) => Promise<unknown> } | undefined; }

3.3 工作流定义示例

# content-review-workflow.yaml — 内容审核工作流定义 name: content-review version: "1.0" nodes: - id: input type: input - id: preprocess type: llm prompt: | 对以下文本进行预处理:去除特殊字符、标准化空格、提取关键段落。 输出JSON格式: {"cleaned_text": "...", "key_paragraphs": [...]} 原始文本: {{__input__}} - id: sensitive_check type: llm model: qwen2:7b temperature: 0 prompt: | 检测以下文本是否包含敏感内容(政治、色情、暴力)。 输出JSON: {"has_sensitive": true/false, "keywords": [...], "confidence": 0.0-1.0} 文本: {{preprocess.cleaned_text}} - id: semantic_analysis type: llm model: qwen2:7b prompt: | 对以下文本进行语义分析:情感倾向、主题分类、意图识别。 输出JSON: {"sentiment": "positive/negative/neutral", "topic": "...", "intent": "..."} 文本: {{preprocess.cleaned_text}} - id: risk_route type: condition condition: expression: '{{sensitive_check.has_sensitive}} === true' branches: 'true': [human_review_flag] 'false': [auto_approve] - id: human_review_flag type: tool toolName: create_review_ticket toolParams: content: '{{preprocess.cleaned_text}}' keywords: '{{sensitive_check.keywords}}' priority: high - id: auto_approve type: tool toolName: approve_content toolParams: content: '{{preprocess.cleaned_text}}' - id: aggregate type: llm prompt: | 聚合审核结果,生成最终报告。 敏感词检测结果: {{sensitive_check}} 语义分析结果: {{semantic_analysis}} 处理动作: {{human_review_flag}}{{auto_approve}} 输出JSON: {"decision": "approve/review/reject", "reason": "...", "confidence": 0.0-1.0} - id: output type: output edges: - from: input to: preprocess - from: preprocess to: sensitive_check - from: preprocess to: semantic_analysis - from: sensitive_check to: risk_route - from: risk_route to: human_review_flag - from: risk_route to: auto_approve - from: human_review_flag to: aggregate - from: auto_approve to: aggregate - from: semantic_analysis to: aggregate - from: aggregate to: output

四、DAG 工作流的架构权衡

1. 条件路由的图完整性问题

条件节点选择分支后,未选中分支的下游节点不会执行,但这些节点在 DAG 中仍然存在。聚合节点(如aggregate)需要处理"部分上游输出缺失"的情况。当前实现通过context.getOutput返回undefined来处理,但这要求每个聚合节点的 Prompt 都要考虑字段缺失的场景,增加了 Prompt 设计的复杂度。

2. Prompt 模板的安全边界

当前模板插值直接将上游输出嵌入 Prompt,如果上游输出包含 Prompt 注入攻击内容,可能篡改工作流行为。生产环境需要对插值内容做转义处理,或使用 ChatML 等结构化格式隔离用户输入与系统指令。

3. 状态持久化与恢复

当前引擎是内存态的,进程崩溃后工作流状态丢失。对于长时间运行的工作流(如人工审核可能持续数小时),需要将中间状态持久化到数据库,并在重启后从断点恢复。这会引入序列化/反序列化开销和分布式锁问题。

4. 并行度的资源控制

DAG 调度器会并行执行所有就绪节点,在节点数量多或 LLM 调用密集时,可能耗尽 API 配额或内存。需要引入信号量或令牌桶限制并行度,但这会降低 DAG 的天然并行性。

禁用场景

  • 需要人工审批等长时间等待的节点(应改用事件驱动模式,而非轮询阻塞)
  • 工作流步骤之间存在循环依赖(DAG 不允许环,需要拆分为多个工作流)
  • 对延迟极度敏感的实时场景(DAG 调度本身有拓扑计算开销)

五、总结

LLM 驱动的智能工作流引擎,核心抽象是将多步骤 AI 调用建模为 DAG,通过拓扑排序实现自动并行调度,通过条件路由实现动态分支,通过模板插值实现节点间的数据传递。声明式的工作流定义(YAML)将 Prompt 与代码解耦,支持热更新和版本管理。

DAG 模型的局限在于无法表达循环和长时间等待,条件路由引入了图完整性问题,模板插值存在注入风险,内存态引擎缺乏故障恢复能力。这些权衡不是缺陷,而是架构边界的清晰标注——知道哪里不能用,比知道哪里能用更重要。工作流引擎的留白,是对复杂度的克制,也是对可维护性的守护。

http://www.jsqmd.com/news/1077703/

相关文章:

  • 终极指南:Pyodide - 如何在浏览器中高效运行完整的Python科学计算生态
  • 德布鲁因图独立数:渐近公式推导与精确构造方法详解
  • 突破性抖音直播数据采集方案:5分钟实现智能弹幕抓取系统
  • TscanCode实战指南:构建企业级C++/C/Lua代码安全防线
  • STM32-S03-时钟定时+坐姿监测+蜂鸣器+人体感应+光敏+手自动+10档+TFT彩屏+(无线方式选择)-3(设计源文件+万字报告+讲解)(支持资料、图片参考_相关定制)_文章底部可以扫码
  • 博弈论实战指南:从纳什均衡到日常决策操作系统
  • 计算机毕业设计之“汉画像砖” 文化宣传网站
  • 新手必看的美食视频背景音乐选曲指南:5个高性价比素材网站深度评测
  • LPC315x微控制器PCM/IOM接口配置与SysCReg寄存器详解
  • 网易云QQ音乐歌词下载神器:三分钟让本地音乐“开口说话“
  • iPhone本地大模型实战:Gemma 2量化部署与Core ML优化指南
  • 网站有流量为什么没有询盘?很多时候不是SEO没用,而是页面没接住客户
  • 彻底告别风扇噪音:用Fan Control打造你的静音电脑工作站
  • DSP5685x主机接口驱动API详解:hiOpen/hiWrite/hiRead/hiIoctl实战指南
  • Rook:在 Kubernetes 上管理 Ceph 存储
  • 音乐格式解密终极指南:如何快速解锁QQ音乐、网易云等加密音频文件
  • 电池管理系统MOSFET:选型要求与工程设计要点
  • 20种复利一齐发力,我为何越努力越不满?
  • Theano符号计算原理与GPU加速实践指南
  • 还在为B站视频下载发愁?这个开源工具让你3分钟搞定高清资源
  • 智能重建中的三维建模与纹理映射
  • Self-Attention自注意力机制
  • 《2025-2026年中国网络安全行业观察:实战为王》
  • VRCT终极指南:免费实时翻译工具彻底打破VRChat语言障碍
  • Python之richtypo包语法、参数和实际应用案例
  • 明日方舟素材资源库:一站式获取高清游戏素材的终极指南
  • ROS 2 自定义 rosdep 规则实战:私有依赖管理全指南
  • 智能择优调度深度实测:多 AI 聚合平台自动匹配任务模型的原理与实效
  • Qwen3-VL实战指南:端到端视觉语言建模与工业级部署
  • 山东大学创新实训第十二阶段汇报