当前位置: 首页 > news >正文

从零构建轻量级 DAG 编排引擎:处理大模型复杂工作流的实战

从零构建轻量级 DAG 编排引擎:处理大模型复杂工作流的实战

一、为什么简单的链式调用不够用

在真实业务里,单靠一个 Prompt 很难处理复杂的流程。开发者通常会把多个 LLM 调用、API 请求和数据清洗步骤串在一起。但一旦逻辑变复杂,这种线性调用就会出问题。

代码里开始出现大量的if-else嵌套,处理异步等待和节点依赖变得非常麻烦。这时候,有向无环图(DAG)是一个更稳妥的选择。它不仅能理清任务顺序,还能让没有依赖关系的节点并行执行,减少等待时间。


二、DAG 调度逻辑与拓扑排序

工作流里的每个步骤就是一个“节点”,节点间的依赖关系就是“边”。要执行这个图,核心是先做环路检测,再通过拓扑排序确定执行顺序。

下面的图展示了一个简单的流程:先清洗输入,然后并行做情感分类和关键词提取,最后汇总生成报告。

graph LR Start([启动]) --> NodeA[清洗输入] NodeA --> NodeB[情感分类] NodeA --> NodeC[关键词提取] NodeB --> NodeD[生成报告] NodeC --> NodeD NodeD --> End([结束]) style NodeB fill:#bbf,stroke:#333,stroke-width:2px style NodeD fill:#bfb,stroke:#333,stroke-width:2px

节点 B 和 C 都依赖 A,但它们之间没关系,所以引擎会让它们同时跑。


三、Node.js 轻量级实现

这是一个基于 JavaScript 的简单 DAG 引擎。它实现了拓扑排序来检查依赖,并支持异步并发执行。

class WorkflowNode { constructor(id, taskFunction) { this.id = id; this.taskFunction = taskFunction; this.dependencies = []; this.status = 'PENDING'; this.result = null; } addDependency(nodeId) { this.dependencies.push(nodeId); } } class DagEngine { constructor() { this.nodes = new Map(); } registerNode(node) { this.nodes.set(node.id, node); } // 拓扑排序:检查环路并决定顺序 resolveExecutionOrder() { const inDegree = new Map(); const adjList = new Map(); const order = []; for (const [id, node] of this.nodes) { inDegree.set(id, 0); adjList.set(id, []); } for (const [id, node] of this.nodes) { for (const depId of node.dependencies) { if (!this.nodes.has(depId)) { throw new Error(`节点 ${id} 依赖的 ${depId} 未注册`); } adjList.get(depId).push(id); inDegree.set(id, inDegree.get(id) + 1); } } const queue = []; for (const [id, degree] of inDegree) { if (degree === 0) queue.push(id); } while (queue.length > 0) { const currId = queue.shift(); order.push(currId); for (const nextId of adjList.get(currId)) { inDegree.set(nextId, inDegree.get(nextId) - 1); if (inDegree.get(nextId) === 0) { queue.push(nextId); } } } if (order.length !== this.nodes.size) { throw new Error("检测到循环依赖,无法执行"); } return order; } // 并发执行 async executeWorkflow(inputContext) { const completedResults = { ...inputContext }; const runningPromises = new Map(); while (true) { let hasPending = false; let progressed = false; for (const [id, node] of this.nodes) { if (node.status === 'COMPLETED' || node.status === 'FAILED') continue; hasPending = true; if (node.status === 'RUNNING') continue; // 检查依赖是否都完成了 const allDepsMet = node.dependencies.every(depId => { const depNode = this.nodes.get(depId); return depNode && depNode.status === 'COMPLETED'; }); if (allDepsMet) { node.status = 'RUNNING'; progressed = true; const promise = (async () => { try { const depData = {}; node.dependencies.forEach(depId => { depData[depId] = this.nodes.get(depId).result; }); node.result = await node.taskFunction(completedResults, depData); node.status = 'COMPLETED'; } catch (error) { node.status = 'FAILED'; throw error; } })(); runningPromises.set(id, promise); } } if (!hasPending) break; if (!progressed && runningPromises.size === 0) { throw new Error("死锁:没有节点能继续执行"); } await Promise.race(runningPromises.values()); for (const [id, promise] of runningPromises) { const node = this.nodes.get(id); if (node.status === 'COMPLETED' || node.status === 'FAILED') { runningPromises.delete(id); } } } const finalOutput = {}; for (const [id, node] of this.nodes) { finalOutput[id] = node.result; } return finalOutput; } } // 测试运行 (async () => { const engine = new DagEngine(); const nodeA = new WorkflowNode('CleanInput', async (context) => { return context.rawText.trim().replace(/[<>]/g, ''); }); const nodeB = new WorkflowNode('LlmClassify', async (context, depData) => { const text = depData.CleanInput; await new Promise(resolve => setTimeout(resolve, 500)); // 模拟 API 延迟 return text.includes("好") ? "POSITIVE" : "NEGATIVE"; }); nodeB.addDependency('CleanInput'); const nodeC = new WorkflowNode('ExtractKeywords', async (context, depData) => { const text = depData.CleanInput; return text.split(' ').filter(word => word.length > 1); }); nodeC.addDependency('CleanInput'); const nodeD = new WorkflowNode('GenerateReport', async (context, depData) => { const sentiment = depData.LlmClassify; const keywords = depData.ExtractKeywords; return `情感: ${sentiment}, 关键词: [${keywords.join(', ')}]`; }); nodeD.addDependency('LlmClassify'); nodeD.addDependency('ExtractKeywords'); engine.registerNode(nodeA); engine.registerNode(nodeB); engine.registerNode(nodeC); engine.registerNode(nodeD); const order = engine.resolveExecutionOrder(); console.log("执行顺序:", order.join(' -> ')); const result = await engine.executeWorkflow({ rawText: " 这个产品设计得非常 好,解决了我的痛点。 " }); console.log("结果:", result); })();

四、生产环境需要考虑的几个问题

上面的代码适合本地或简单场景,如果要上生产环境,还得考虑下面几点:

1. 内存 vs 持久化
内存里的调度很快,但服务器一挂,中间结果就没了。如果工作流跑了几分钟才失败,重头再来很浪费。生产环境通常要用 Redis 或像 Temporal 这样的状态机来存状态,但这会增加网络延迟。

2. 重试策略与成本
大模型 API 经常超时或限流,加重试机制是必须的。但要注意,如果上游节点因为超时一直重试,可能会在短时间内消耗大量 Token。给每个节点设置重试上限和超时时间是必要的。

3. 静态图 vs 动态分支
DAG 在运行前就定好了结构,容易校验。但 LLM 的输出是动态的,有时候需要根据结果决定下一步走哪条路。如果要支持这种动态分支,图的拓扑结构得在运行时变,这会大大增加调试难度。


五、小结

做智能工作流,核心是把杂乱的调用拆成清晰的节点和依赖。用拓扑排序处理并发,不需要复杂的框架,也能让多个模型任务协同工作。对于小团队来说,这种轻量级的方案既能控制成本,也能保证流程跑得通。

http://www.jsqmd.com/news/1017961/

相关文章:

  • 2026芜湖屹东金属材料贸易有限公司行业竞品测评 - 百航
  • 微博图片批量下载终极指南:免登录高效获取用户相册
  • Whisper本地部署实战:中文语音转文字全流程指南
  • CCF-GESP三级C++真题解析:进制判断这道题,用‘最大字符法’5分钟搞定
  • 廊坊安次区卖黄金去哪儿?跑了五家店,终于把“无损耗、零扣费”的门道摸清了 - 行行星
  • PXD10 PDI接口解析:嵌入式视频同步与BT.656标准应用实战
  • 佛山黄金回收连锁门店盘点,全国连锁更安心 - 讯息早知道
  • 2026年河南AI搜索推广与GEO优化服务商深度横评:开封、郑州企业获客新风口完全指南 - 年度推荐企业名录
  • GBase 8s数据库安装包运维监控类脚本解析
  • Windows系统瘦身神器:Win11Debloat让你的电脑焕然一新
  • 别再被MybatisPlus的saveBatch骗了!手把手教你配置MySQL的rewriteBatchedStatements参数实现真批量插入
  • VSCode、Typora里输入Emoji太麻烦?分享我的Markdown效率神器与自定义代码片段
  • ExDark数据集实战指南:如何用7363张低光照图像解决夜间视觉难题
  • 2026石家庄黄金回收,卖之前先搞懂这五件事,可以少走很多弯路 - 奢侈品回收测评
  • WaveTools鸣潮工具箱抽卡记录完整指南:从数据同步到故障排查的终极解决方案
  • 从协议到用例:如何用CANoe Test Package EV/EVSE自动化测试国标/欧标充电协议
  • 哪些NLP任务不该用预训练语言模型?4类负增益场景与工业决策框架
  • 告别PDF乱码!手把手教你配置MiKTeX与WinEdt的中文支持(UTF-8与字体设置详解)
  • 深度时序模型训练效率优化:早停策略的技术实现与性能提升方案
  • 开源小说下载器:200+网站一键离线保存的智能解决方案
  • 知识图谱事件流的增量学习:边看边学不遗忘的实时进化方案
  • 软考高项论文别再死记硬背!我用‘规划绩效域’和‘项目工作绩效域’搞定了一个真实项目复盘
  • 告别枯燥:用橙心主题让Typora写作体验焕然一新
  • MultiLogin:如何让正版与外置登录玩家在Minecraft服务器无缝共存?
  • 2026江诗丹顿回收人气榜:合扬领跑全场,六大优质商户全方位对比 - 开心测评
  • 内容即体验:从功能清单到用户参与
  • MoveIt! 四自由度机械臂规划避坑:set_position_target() 为啥还是报错?手把手教你改 Kinematics.yaml
  • MySQL忘记密码怎么办
  • Three.js 特效避坑指南:手把手教你调试魔法阵的旋转、缩放与粒子动画
  • Spring Boot项目里,MybatisPlus的saveBatch批量插入到底该怎么配才有效?(附完整yml示例)