Prompt 工程进阶:从单次调用到 Agent 工作流的结构化编排
Prompt 工程进阶:从单次调用到 Agent 工作流的结构化编排
一、当 Prompt 不再够用——从指令到工作流
单次 Prompt 调用解决不了的问题,正在变得越来越多。用户说“帮我整理这周的会议纪要,提取待办事项,分配给对应负责人,并同步到项目管理工具”——这不是一个 Prompt 能搞定的事,而是一个多步骤、有依赖、需纠错的工作流。
生产环境里,Prompt 工程主要卡在三个地方。第一,复杂任务单 Prompt 容易崩溃:把多步任务塞进一个 Prompt,模型要么漏步骤,要么在步骤间产生幻觉。一个“分析报告生成”任务,单 Prompt 的步骤完成率只有 63%,拆成多步工作流后能提升到 94%。第二,上下文窗口是硬约束:长 Prompt 一旦超过模型上限,要么截断丢失信息,要么被迫换更贵的模型。第三,错误传播不可控:单 Prompt 里某一步出错,后续所有步骤都基于错误前提继续,没法中途修正。
某内容团队的自动化工作流实践显示,把“选题调研→大纲生成→初稿撰写→审校修订”拆成 4 步 Agent 工作流后,产出质量评分从 3.2 提升到 4.5(5 分制)。而且每步都能独立重试,整体 Token 消耗反而下降了 28%,因为每步只需要传递必要上下文,不用全量信息。
二、Agent 工作流的编排模型:从 ReAct 到 DAG 调度
Agent 工作流的核心问题是怎么编排多步任务,让它既灵活又可控。
flowchart TB subgraph 任务解析层 P1[用户输入] P2[任务分解器<br/>Task Decomposer] P3[依赖图构建<br/>DAG Builder] end subgraph 执行编排层 E1{调度器<br/>Scheduler} E2[并行执行组 A] E3[并行执行组 B] E4[串行执行步骤] end subgraph 单步执行引擎 S1[Prompt 模板渲染<br/>变量注入 + 约束注入] S2[LLM 调用<br/>带重试与超时] S3[输出解析与验证<br/>JSON Schema 校验] S4{校验通过?} S5[自修复重试<br/>将错误信息回注 Prompt] end subgraph 上下文管理 C1[步骤间上下文传递<br/>只传必要信息] C2[全局状态存储<br/>共享变量池] C3[上下文压缩<br/>摘要 + 关键信息提取] end subgraph 监控与恢复 M1[步骤级日志] M2[断点续跑<br/>从失败步骤恢复] M3[人工介入点<br/>关键决策暂停] end P1 --> P2 --> P3 --> E1 E1 --> E2 E1 --> E3 E1 --> E4 E2 --> S1 E3 --> S1 E4 --> S1 S1 --> S2 --> S3 --> S4 S4 -->|否| S5 --> S1 S4 -->|是| C1 C1 --> C2 C2 --> C3 C2 --> M1 M1 --> M2 M1 --> M3这套架构有三个关键点。一是 DAG 调度:任务分解成有向无环图,没依赖的步骤并行执行,有依赖的串行执行,最大化吞吐。二是自修复循环:每步输出经过 JSON Schema 校验,不通过时把错误信息回注 Prompt 重试,而不是直接失败。三是上下文压缩:步骤间只传递必要信息,通过摘要和关键信息提取控制上下文膨胀。
三、生产级 Agent 工作流引擎的代码实现
下面这段代码实现了一个完整的 Agent 工作流引擎,包含任务分解、DAG 调度、自修复执行和上下文管理。
import asyncio import json import re import time from dataclasses import dataclass, field from enum import Enum from typing import Any, Callable, Optional import httpx # === 工作流基础模型 === class StepStatus(Enum): """步骤状态""" PENDING = "pending" RUNNING = "running" SUCCESS = "success" FAILED = "failed" SKIPPED = "skipped" @dataclass class WorkflowStep: """工作流步骤定义""" step_id: str name: str prompt_template: str # 支持 {variable} 占位符 dependencies: list[str] = field(default_factory=list) # 依赖的步骤 ID output_schema: dict = field(default_factory=dict) # JSON Schema 校验 max_retries: int = 2 timeout_seconds: int = 60 # 运行时状态 status: StepStatus = StepStatus.PENDING output: Any = None error: str = "" attempts: int = 0 latency_ms: float = 0.0 @dataclass class WorkflowContext: """工作流上下文:步骤间共享的状态""" variables: dict = field(default_factory=dict) step_outputs: dict[str, Any] = field(default_factory=dict) def set(self, key: str, value: Any) -> None: self.variables[key] = value def get(self, key: str, default: Any = None) -> Any: return self.variables.get(key, default) def save_step_output(self, step_id: str, output: Any) -> None: self.step_outputs[step_id] = output def get_step_output(self, step_id: str) -> Any: return self.step_outputs.get(step_id) def get_step_summary(self, step_ids: list[str], max_chars: int = 500) -> str: """获取指定步骤的压缩摘要,控制上下文长度""" parts = [] total_chars = 0 for sid in step_ids: output = self.step_outputs.get(sid) if output is None: continue text = json.dumps(output, ensure_ascii=False) if not isinstance(output, str) else output if total_chars + len(text) > max_chars: # 截断并标注 remaining = max_chars - total_chars text = text[:remaining] + "...[已截断]" parts.append(f"[{sid}的输出] {text}") total_chars += len(text) return "\n".join(parts) # === DAG 调度器 === class DAGScheduler: """DAG 调度器:分析步骤依赖,生成执行计划""" @staticmethod def build_execution_plan(steps: list[WorkflowStep]) -> list[list[str]]: """构建执行计划:每层包含可并行执行的步骤 ID""" # 拓扑排序 + 分层 step_map = {s.step_id: s for s in steps} in_degree: dict[str, int] = {s.step_id: 0 for s in steps} dependents: dict[str, list[str]] = {s.step_id: [] for s in steps} for step in steps: for dep in step.dependencies: if dep not in step_map: raise ValueError(f"步骤 {step.step_id} 依赖不存在的步骤 {dep}") dependents[dep].append(step.step_id) in_degree[step.step_id] += 1 # Kahn 算法分层 layers: list[list[str]] = [] queue = [sid for sid, deg in in_degree.items() if deg == 0] while queue: # 当前层所有入度为 0 的步骤可并行 layers.append(sorted(queue)) next_queue = [] for sid in queue: for dependent in dependents[sid]: in_degree[dependent] -= 1 if in_degree[dependent] == 0: next_queue.append(dependent) queue = next_queue # 检测循环依赖 total_scheduled = sum(len(layer) for layer in layers) if total_scheduled != len(steps): raise ValueError("检测到循环依赖,无法构建执行计划") return layers # === Prompt 模板渲染器 === class PromptRenderer: """Prompt 模板渲染:变量注入 + 约束注入""" @staticmethod def render(template: str, context: WorkflowContext, step: WorkflowStep) -> str: """渲染 Prompt 模板,注入变量和输出约束""" # 替换 {variable} 占位符 rendered = template for key, value in context.variables.items(): placeholder = "{" + key + "}" if placeholder in rendered: rendered = rendered.replace(placeholder, str(value)) # 替换 {step:step_id} 占位符(引用其他步骤的输出) step_ref_pattern = r"\{step:([\w_]+)\}" for match in re.finditer(step_ref_pattern, rendered): ref_step_id = match.group(1) output = context.get_step_output(ref_step_id) if output is not None: text = json.dumps(output, ensure_ascii=False) if not isinstance(output, str) else output rendered = rendered.replace(match.group(0), text) # 注入输出格式约束 if step.output_schema: schema_str = json.dumps(step.output_schema, ensure_ascii=False, indent=2) format_constraint = ( f"\n\n请严格按照以下 JSON Schema 格式输出," f"不要输出任何其他内容:\n```json\n{schema_str}\n```" ) rendered += format_constraint return rendered # === 输出解析与校验器 === class OutputValidator: """输出解析与校验:JSON 提取 + Schema 校验""" @staticmethod def parse_and_validate(raw_output: str, schema: dict) -> tuple[Any, str]: """解析 LLM 输出并校验,返回 (解析结果, 错误信息)""" # 尝试提取 JSON json_str = OutputValidator._extract_json(raw_output) if json_str is None: return None, "无法从输出中提取 JSON,请确保输出为合法 JSON 格式" try: parsed = json.loads(json_str) except json.JSONDecodeError as e: return None, f"JSON 解析失败:{str(e)}" # 简化的 Schema 校验:检查必需字段 if schema: required = schema.get("required", []) if isinstance(parsed, dict): missing = [f for f in required if f not in parsed] if missing: return None, f"缺少必需字段:{', '.join(missing)}" return parsed, "" @staticmethod def _extract_json(text: str) -> Optional[str]: """从文本中提取 JSON 字符串""" # 尝试提取 ```json ... ``` 代码块 pattern = r"```json\s*([\s\S]*?)\s*```" match = re.search(pattern, text) if match: return match.group(1) # 尝试提取 { ... } 或 [ ... ] for start_char, end_char in [("{", "}"), ("[", "]")]: start = text.find(start_char) if start != -1: depth = 0 for i in range(start, len(text)): if text[i] == start_char: depth += 1 elif text[i] == end_char: depth -= 1 if depth == 0: return text[start : i + 1] return None # === Agent 工作流引擎 === class AgentWorkflowEngine: """Agent 工作流引擎:串联 DAG 调度、执行、校验、自修复""" def __init__(self, llm_client, checkpoint_callback: Optional[Callable] = None): self._llm = llm_client self._scheduler = DAGScheduler() self._renderer = PromptRenderer() self._validator = OutputValidator() self._checkpoint = checkpoint_callback # 断点回调 async def execute( self, steps: list[WorkflowStep], initial_context: WorkflowContext, ) -> dict: """执行完整工作流""" start_time = time.monotonic() # 构建执行计划 execution_plan = self._scheduler.build_execution_plan(steps) step_map = {s.step_id: s for s in steps} # 逐层执行 for layer_idx, layer in enumerate(execution_plan): # 同层步骤并行执行 tasks = [ self._execute_step(step_map[sid], initial_context) for sid in layer ] results = await asyncio.gather(*tasks, return_exceptions=True) # 处理结果 for sid, result in zip(layer, results): step = step_map[sid] if isinstance(result, Exception): step.status = StepStatus.FAILED step.error = str(result) elif result.get("success"): step.status = StepStatus.SUCCESS step.output = result["output"] initial_context.save_step_output(sid, result["output"]) else: step.status = StepStatus.FAILED step.error = result.get("error", "未知错误") # 检查是否有失败步骤,决定是否继续 failed_in_layer = [ sid for sid in layer if step_map[sid].status == StepStatus.FAILED ] if failed_in_layer: # 标记依赖失败步骤的后续步骤为 SKIPPED self._mark_dependents_skipped( failed_in_layer, step_map, execution_plan[layer_idx + 1 :] ) # 断点回调 if self._checkpoint: self._checkpoint(initial_context, step_map) total_latency = (time.monotonic() - start_time) * 1000 return { "status": "completed" if all( s.status in (StepStatus.SUCCESS, StepStatus.SKIPPED) for s in steps ) else "partial_failure", "steps": [ { "step_id": s.step_id, "name": s.name, "status": s.status.value, "attempts": s.attempts, "latency_ms": round(s.latency_ms, 2), "error": s.error, } for s in steps ], "outputs": initial_context.step_outputs, "total_latency_ms": round(total_latency, 2), } async def _execute_step( self, step: WorkflowStep, context: WorkflowContext ) -> dict: """执行单个步骤,带自修复重试""" step.status = StepStatus.RUNNING step_start = time.monotonic() for attempt in range(step.max_retries + 1): step.attempts = attempt + 1 # 渲染 Prompt prompt = self._renderer.render(step.prompt_template, context, step) # 调用 LLM(带超时) try: raw_output = await asyncio.wait_for( self._llm.chat(prompt), timeout=step.timeout_seconds, ) except asyncio.TimeoutError: if attempt < step.max_retries: continue step.latency_ms = (time.monotonic() - step_start) * 1000 return {"success": False, "error": f"步骤超时({step.timeout_seconds}s)"} except Exception as e: if attempt < step.max_retries: continue step.latency_ms = (time.monotonic() - step_start) * 1000 return {"success": False, "error": f"LLM 调用失败:{str(e)}"} # 校验输出 if step.output_schema: parsed, error = self._validator.parse_and_validate( raw_output, step.output_schema ) if error: if attempt < step.max_retries: # 自修复:将错误信息回注 Prompt step.prompt_template += ( f"\n\n[上一次尝试的错误:{error}]" f"\n请修正上述问题后重新输出。" ) continue step.latency_ms = (time.monotonic() - step_start) * 1000 return {"success": False, "error": error} else: parsed = raw_output step.latency_ms = (time.monotonic() - step_start) * 1000 return {"success": True, "output": parsed} step.latency_ms = (time.monotonic() - step_start) * 1000 return {"success": False, "error": "超过最大重试次数"} @staticmethod def _mark_dependents_skipped( failed_ids: list[str], step_map: dict[str, WorkflowStep], remaining_layers: list[list[str]], ) -> None: """将依赖失败步骤的后续步骤标记为 SKIPPED""" for layer in remaining_layers: for sid in layer: step = step_map[sid] if any(dep in failed_ids for dep in step.dependencies): step.status = StepStatus.SKIPPED step.error = "上游步骤失败" # === 使用示例:会议纪要整理工作流 === async def demo_meeting_workflow(): """演示:会议纪要整理 Agent 工作流""" # 模拟 LLM 客户端 class MockLLM: async def chat(self, prompt: str) -> str: # 模拟不同步骤的输出 if "提取关键议题" in prompt: return '```json\n{"topics": ["Q3 目标回顾", "新功能排期", "资源分配"], "decisions": ["Q3 目标下调 10%"]}\n```' elif "待办事项" in prompt: return '```json\n{"action_items": [{"task": "更新 Q3 目标文档", "assignee": "张三", "deadline": "2025-07-01"}, {"task": "新功能技术方案评审", "assignee": "李四", "deadline": "2025-07-05"}]}\n```' elif "格式化" in prompt: return '```json\n{"summary": "本次会议回顾了 Q3 目标并决定下调 10%,讨论了新功能排期和资源分配问题。", "formatted_output": "会议纪要已生成"}\n```' return '{"result": "ok"}' engine = AgentWorkflowEngine(llm_client=MockLLM()) # 定义工作流步骤 steps = [ WorkflowStep( step_id="extract_topics", name="提取关键议题", prompt_template="从以下会议记录中提取关键议题和决策:\n{meeting_notes}", output_schema={ "type": "object", "required": ["topics", "decisions"], }, max_retries=2, ), WorkflowStep( step_id="extract_actions", name="提取待办事项", prompt_template=( "基于会议记录和已提取的议题,提取待办事项:\n" "会议记录:{meeting_notes}\n" "关键议题:{step:extract_topics}" ), output_schema={ "type": "object", "required": ["action_items"], }, dependencies=["extract_topics"], max_retries=2, ), WorkflowStep( step_id="format_output", name="格式化输出", prompt_template=( "将以下信息整理为结构化会议纪要:\n" "议题:{step:extract_topics}\n" "待办:{step:extract_actions}" ), output_schema={ "type": "object", "required": ["summary", "formatted_output"], }, dependencies=["extract_topics", "extract_actions"], max_retries=2, ), ] # 初始化上下文 context = WorkflowContext() context.set("meeting_notes", "张三:Q3 目标完成度只有 70%,建议下调 10%...") # 执行工作流 result = await engine.execute(steps, context) print(json.dumps(result, ensure_ascii=False, indent=2)) if __name__ == "__main__": asyncio.run(demo_meeting_workflow())核心设计要点如下。DAGScheduler用 Kahn 算法对步骤做拓扑排序和分层,同层步骤可并行执行。PromptRenderer支持变量占位符{variable}和步骤引用{step:step_id},自动注入输出格式约束。OutputValidator从 LLM 输出中提取 JSON 并校验必需字段,校验失败时将错误信息回注 Prompt 实现自修复。AgentWorkflowEngine逐层调度执行,失败步骤的下游自动标记为 SKIPPED,断点回调支持从失败步骤恢复。
四、Agent 工作流的架构成本与适用边界
Agent 工作流显著提升了复杂任务的处理质量,但也引入了不可忽视的架构复杂度。
延迟的线性叠加。串行步骤的延迟是累加的,4 步工作流如果每步平均 3 秒,总延迟至少 12 秒。虽然 DAG 调度允许并行,但多数实际工作流中步骤间存在强依赖,并行度有限。流式输出可以缓解用户感知延迟,但无法缩短实际执行时间。
Token 消耗的放大效应。每步都需要独立的系统提示和上下文注入,4 步工作流的 Token 消耗通常是单次调用的 2-3 倍。自修复重试进一步放大消耗,一次校验失败就多一轮完整调用。
错误处理的复杂度。步骤失败后的处理策略(重试、跳过、降级、人工介入)需要针对每个步骤单独配置,配置不当可能导致工作流卡死或产出不完整。_mark_dependents_skipped是简化处理,生产环境需要更细粒度的降级策略。
适用边界。Agent 工作流适合多步骤、有依赖、需校验的复杂任务,如内容生产流水线、数据分析管道、自动化测试流程。不适合简单的单轮问答或实时交互场景,这些场景中工作流的编排开销远大于收益。
禁用场景。当任务步骤间的依赖关系不确定或动态变化时,静态 DAG 无法处理,需要更灵活的动态规划策略。当步骤数量超过 20 时,DAG 的可维护性急剧下降,应考虑子工作流拆分。
五、总结
Agent 工作流的核心价值,是将复杂任务从单次 Prompt 调用拆解为多步骤、可校验、可自修复的结构化编排。DAG 调度实现步骤并行,Prompt 模板渲染实现变量注入与约束注入,输出校验与自修复循环保障每步质量,上下文压缩控制 Token 膨胀。架构代价集中在延迟叠加、Token 放大和错误处理复杂度三个方面。选择 Agent 工作流需确认任务具备多步骤、有依赖、需校验三个特征,且延迟预算允许秒级响应。结构化编排不是过度工程,而是复杂任务走向生产可用的必经之路。
