构建跨模型智能调度系统:复刻Claude Dispatch体验的技术实践
1. 项目概述:一次跨模型工作流复刻的探索
最近在尝试把Claude的“Dispatch”体验搬到Gemini和OpenCode上,这活儿听起来有点意思,本质上是在做一次跨模型工作流的复刻与适配。Claude的“Dispatch”功能,简单来说,就是一种智能的任务分发与上下文管理机制,它能理解你的复杂指令,自动将任务拆解、分配给合适的内部“专家”模块,并整合结果,提供连贯、精准的响应。这种体验的核心魅力在于“丝滑”——用户感觉是在和一个高度协同的团队对话,而非一个单一的、可能在某些领域存在短板的模型。
那么,为什么要把这个体验带到Gemini和OpenCode呢?原因很直接:生态互补与能力拓展。Claude固然强大,但每个模型都有其独特的优势领域和资源限制。Gemini在多模态理解和长上下文处理上表现突出,而OpenCode则在代码生成与结构化输出方面有独到之处。如果能将“Dispatch”式的智能协调能力赋予它们,就意味着我们可以根据任务特性,动态调用最合适的模型或工具,构建一个更强大、更灵活的AI应用后端。这适合任何希望提升复杂任务处理自动化水平、追求更高响应质量的开发者、产品经理或是AI应用构建者。
这个项目的目标,不是简单地复制一个功能,而是理解“Dispatch”体验背后的设计哲学——智能路由、上下文保持与结果合成——并在一套新的技术栈上实现它。我们将深入拆解其核心组件,并一步步构建一个能够协调Gemini和OpenCode的调度系统。
2. 核心设计思路与架构拆解
2.1 “Dispatch”体验的本质解构
要复刻体验,首先要解构它。经过分析,Claude的“Dispatch”体验可以拆解为三个核心环节:
- 意图识别与任务分解:系统需要准确理解用户的自然语言请求,判断其复杂程度,并将其分解为一系列原子化的子任务。例如,用户请求“分析这份财报PDF,总结其财务亮点,并用Python画一个趋势图”,这至少涉及文档解析、文本总结、数据提取和代码生成四个子任务。
- 智能路由与执行器调度:针对每个原子子任务,系统需要决定由哪个“执行器”来处理最合适。在我们的场景中,执行器就是Gemini、OpenCode,或者未来可能接入的其他模型、API甚至本地函数。路由决策的依据包括任务类型(文本、代码、多模态)、对模型能力的先验知识、成本考量以及当前上下文。
- 上下文管理与结果整合:这是保证体验“连贯性”的关键。子任务之间可能存在依赖关系(例如,画图需要先用总结出的数据)。系统必须妥善管理整个会话的上下文,将上游任务的输出作为下游任务的输入,并最终将所有子任务的结果整合成一个自然、统一的回复呈现给用户。
2.2 我们的系统架构设计
基于以上解构,我们设计了一个轻量级但功能完整的调度系统架构。整个系统围绕一个中央调度器(Dispatcher)展开。
用户请求 | v [中央调度器 (Dispatcher)] | |-- 1. 意图解析 & 任务分解 |-- 2. 为每个子任务选择最佳执行器 (Router) |-- 3. 编排执行顺序,管理共享上下文 (Orchestrator) | |--> [执行器池] | | | |-- [Gemini 执行器] (处理文本分析、多模态理解、创意生成) | |-- [OpenCode 执行器] (处理代码生成、代码解释、结构化输出) | |-- [未来可扩展的其他执行器...] | v 结果整合与格式化输出为什么选择这样的架构?
- 松耦合:调度器与执行器分离,方便我们独立升级或替换任一模型。例如,如果OpenCode发布了新版本,我们只需更新对应的执行器模块,无需改动核心调度逻辑。
- 可扩展性:新的模型或工具可以很容易地以“执行器”的形式接入系统,只需实现统一的接口。
- 可控性:中央调度器让我们能够集中实现复杂的路由策略、限流、熔断和日志记录,这是直接链式调用多个API难以做到的。
2.3 关键技术选型与考量
- 后端框架:选择FastAPI。它异步性能好,能高效处理多个并发的模型API调用,自动生成API文档,并且编写简洁。对于需要高并发的调度任务,异步IO至关重要。
- 模型API接入:
- Google Gemini API:使用官方
google-generativeaiPython SDK。其优势在于对多模态输入(图片、PDF)的原生支持,以及可配置的安全策略。 - OpenCode:由于其可能指代不同的开源或特定代码模型,我们假设通过其提供的API端点(如 OpenAI 格式的兼容API)进行调用。使用
httpx库进行异步HTTP请求,保持灵活性。
- Google Gemini API:使用官方
- 上下文管理:使用简单的内存字典或Redis来管理会话上下文。对于简单的演示或低频使用,内存存储足够;但如果需要持久化或支持多用户会话,Redis是更可靠的选择,它支持设置过期时间,避免内存泄漏。
- 任务队列(可选进阶):对于耗时较长的任务(如处理超大文档),可以引入Celery或RQ搭配 Redis 作为消息代理,实现异步任务队列,避免HTTP请求超时。
注意:在实际开发中,直接、频繁地调用商业模型API会产生费用。务必在代码中实现成本监控和用量限制,例如为每个用户或每个会话设置Token消耗上限。
3. 核心模块实现详解
3.1 中央调度器(Dispatcher)的实现
调度器是整个系统的大脑。我们将其核心功能实现为一个Python类。
from typing import List, Dict, Any, Optional from pydantic import BaseModel import asyncio class SubTask(BaseModel): """原子子任务数据模型""" id: str description: str task_type: str # 如 “text_analysis”, “code_generation”, “multimodal” depends_on: List[str] = [] # 依赖的其他子任务ID required_capabilities: List[str] = [] # 所需能力,如 “vision”, “long_context” class Dispatcher: def __init__(self, router, orchestrator): self.router = router # 路由决策模块 self.orchestrator = orchestrator # 编排执行模块 async def dispatch(self, user_query: str, session_id: str) -> Dict[str, Any]: """ 核心调度流程 """ # 1. 意图解析与任务分解 subtasks = await self._analyze_and_decompose(user_query, session_id) # 2. 为每个子任务路由决策 routed_tasks = [] for task in subtasks: best_executor = self.router.select_executor(task, session_id) routed_tasks.append({"task": task, "executor": best_executor}) # 3. 编排执行并获取结果 final_result = await self.orchestrator.execute(routed_tasks, session_id) return final_result async def _analyze_and_decompose(self, query: str, session_id: str) -> List[SubTask]: """ 利用一个LLM(这里我们先用Gemini)来分析用户查询并分解任务。 这是一个递归提示工程的过程。 """ # 构建一个专门用于任务分解的提示词 decomposition_prompt = f""" 你是一个高级任务规划AI。请将以下用户请求分解为一系列可以独立或顺序执行的原子子任务。 每个子任务应该足够简单,可以由一个专门的AI模型(如文本专家、代码专家、视觉专家)处理。 用户请求: {query} 请以JSON格式输出一个子任务列表,每个子任务包含以下字段: - id: 唯一标识符 (如 “task_1”) - description: 清晰的任务描述 - task_type: 任务类型 (从以下选择: text_analysis, code_generation, data_extraction, multimodal_understanding, summary) - depends_on: 该任务所依赖的其他任务id列表 (如果没有,则为空列表) - required_capabilities: 执行此任务所需的能力 (如 [“long_context”], [“vision”]) 只输出JSON,不要有其他解释。 """ # 调用Gemini API获取分解结果 # 此处为伪代码,实际调用需处理API密钥和错误 from .executors.gemini_executor import GeminiExecutor gemini = GeminiExecutor() decomposition_result = await gemini.generate_text(decomposition_prompt) # 解析返回的JSON,转化为SubTask对象列表 import json try: task_dicts = json.loads(decomposition_result) return [SubTask(**t) for t in task_dicts] except json.JSONDecodeError: # 如果模型没有返回标准JSON,这里需要更健壮的解析或重试逻辑 # 一个备选方案是让模型以特定标记格式输出,我们再解析 raise ValueError("Failed to parse task decomposition result.")关键点解析:
_analyze_and_decompose方法:这是实现“智能”的起点。我们利用LLM(此处是Gemini)来理解复杂意图并拆解任务。提示词工程的质量直接决定了分解的准确性。在实际应用中,可能需要多轮调试和优化提示词,甚至准备一些示例(Few-shot)来引导模型。- 错误处理:模型输出不一定总是规整的JSON,必须有完善的异常处理机制。生产环境中,可以考虑使用像
instructor或Pydantic与OpenAI函数调用结合的方式,来强制结构化输出。
3.2 路由决策器(Router)的逻辑
路由器的职责是:给定一个子任务,返回最适合处理它的执行器名称。
class Router: def __init__(self): # 定义执行器能力矩阵。这是一个静态配置,也可以设计成动态学习的。 self.executor_capabilities = { "gemini-pro": ["text_analysis", "summary", "multimodal_understanding", "long_context", "creative_writing"], "gemini-pro-vision": ["multimodal_understanding", "text_analysis"], "opencode": ["code_generation", "code_explanation", "structured_output", "data_extraction"], } # 执行器成本(每千Token的假设成本,单位任意,用于比较) self.executor_cost = { "gemini-pro": 1.0, "gemini-pro-vision": 1.5, "opencode": 0.8, } def select_executor(self, task: SubTask, session_context: Dict) -> str: """ 基于任务需求和策略选择执行器。 策略可以基于:能力匹配、成本、负载、历史表现等。 """ candidates = [] for executor_name, capabilities in self.executor_capabilities.items(): # 基础筛选:执行器是否具备任务所需的所有能力? if all(req in capabilities for req in task.required_capabilities): candidates.append(executor_name) if not candidates: # 如果没有完全匹配的,降级处理:选择能满足核心需求的 # 这里简化处理,直接返回一个默认的 return "gemini-pro" # 策略:从候选者中,选择成本最低的 # 更复杂的策略可以在这里实现,如加权评分(能力匹配度、成本、响应速度) selected = min(candidates, key=lambda x: self.executor_cost.get(x, float('inf'))) # 可以在此处记录路由决策,用于后续分析和优化 # log_route_decision(task.id, selected, session_context) return selected路由策略的考量:
- 能力匹配优先:这是底线,不能让一个不懂代码的模型去生成代码。
- 成本优化:在能力满足的前提下,选择更经济的模型。这对于控制项目运营成本非常重要。
- 进阶策略:可以引入性能历史数据,比如某个模型对某类任务的平均响应时间和准确率,实现基于效用的路由。甚至可以引入简单的负载均衡,避免所有请求涌向同一个模型端点。
3.3 编排执行器(Orchestrator)与上下文管理
编排器负责按照依赖关系执行任务,并管理任务间的数据传递。
class Orchestrator: def __init__(self, executor_registry): self.executors = executor_registry # 一个包含所有执行器实例的字典 self.context_store = {} # 简化版内存上下文存储 {session_id: {task_id: result, ...}} async def execute(self, routed_tasks: List[Dict], session_id: str) -> str: """ 执行路由后的任务列表,处理依赖,整合结果。 """ # 初始化或获取本次会话的上下文 session_context = self.context_store.setdefault(session_id, {}) # 构建任务依赖图(简化版,使用拓扑排序思想) # 这里我们假设依赖关系不构成复杂环,简单按顺序执行依赖已满足的任务 task_results = {} executed = set() # 一个简单的执行循环,直到所有任务完成 while len(executed) < len(routed_tasks): progress = False for item in routed_tasks: task = item["task"] executor_name = item["executor"] if task.id in executed: continue # 检查依赖是否都已满足 if all(dep in executed for dep in task.depends_on): progress = True # 准备输入:合并用户原始query和依赖任务的输出 input_context = f"原始用户请求的上下文。\n" for dep_id in task.depends_on: input_context += f"\n[子任务 {dep_id} 的结果]:\n{task_results[dep_id]}\n" full_prompt = f"{input_context}\n请执行以下任务:{task.description}" # 获取执行器并运行 executor = self.executors.get(executor_name) if not executor: raise ValueError(f"Executor {executor_name} not found.") try: result = await executor.execute(full_prompt, task_type=task.task_type) task_results[task.id] = result executed.add(task.id) # 更新到会话上下文 session_context[task.id] = result except Exception as e: # 任务执行失败,记录错误,可以尝试重试或降级 task_results[task.id] = f"任务执行失败: {str(e)}" executed.add(task.id) # 标记为已执行(失败) # 根据策略决定是否终止整个流程 if not progress: # 检测到循环依赖或死锁 raise RuntimeError("无法解决任务依赖关系,可能存在循环依赖。") # 所有子任务完成,进行最终整合 final_integration_prompt = f""" 你是一个结果整合专家。以下是用户原始请求和一系列子任务的结果。 请将这些结果整合成一个连贯、完整、直接回答用户问题的最终回复。 回复应自然流畅,就像由一个专家一次性完成的一样。 原始用户请求:{routed_tasks[0]['task'].description if routed_tasks else 'N/A'} (这是初始分解的源头) 子任务结果汇总: {self._format_subtask_results_for_integration(task_results)} 请开始你的最终整合回复: """ # 通常,最终整合也由一个LLM执行器完成,比如Gemini final_executor = self.executors.get("gemini-pro") final_output = await final_executor.execute(final_integration_prompt, task_type="text_analysis") return final_output def _format_subtask_results_for_integration(self, results: Dict) -> str: formatted = "" for task_id, result in results.items(): formatted += f"\n--- 子任务 {task_id} 结果 ---\n{result}\n" return formatted编排器的核心挑战与解决方案:
- 依赖管理:我们实现了一个简单的轮询检查依赖的算法。对于更复杂的DAG(有向无环图)依赖,可以使用像
networkx这样的库进行拓扑排序,实现真正的并行执行独立任务。 - 上下文构建:如何将上游任务的结果有效地传递给下游任务,是体验“连贯性”的关键。我们采用的方式是将所有依赖结果以清晰的结构化格式拼接到提示词中。对于非常长的上下文,可能需要采用摘要、选择性注入等更高级的技术。
- 错误处理与重试:生产系统中,必须为每个子任务执行配置超时、重试策略和降级方案(如主执行器失败后切换到备用执行器)。
3.4 执行器(Executor)的抽象与实现
执行器是对不同模型API的一层统一封装。
from abc import ABC, abstractmethod import httpx import google.generativeai as genai class BaseExecutor(ABC): @abstractmethod async def execute(self, prompt: str, **kwargs) -> str: pass class GeminiExecutor(BaseExecutor): def __init__(self, model_name="gemini-1.5-pro"): genai.configure(api_key=os.getenv("GEMINI_API_KEY")) self.model = genai.GenerativeModel(model_name) async def execute(self, prompt: str, task_type: str = None, image_data: bytes = None) -> str: """ 执行任务。支持纯文本和多模态。 """ try: if image_data: # 处理多模态请求 image_part = genai.upload_file(image_data, mime_type="image/jpeg") # 需根据实际类型调整 contents = [image_part, prompt] else: contents = [prompt] # 根据任务类型调整生成配置(如温度、max_tokens) generation_config = self._get_generation_config(task_type) response = await self.model.generate_content_async( contents, generation_config=generation_config ) return response.text except Exception as e: # 记录日志,并抛出或返回一个友好的错误信息 return f"[Gemini 处理错误]:{str(e)}" def _get_generation_config(self, task_type): """根据任务类型返回不同的生成配置""" configs = { "code_generation": genai.GenerationConfig(temperature=0.1, max_output_tokens=2048), "creative_writing": genai.GenerationConfig(temperature=0.8, max_output_tokens=1024), "text_analysis": genai.GenerationConfig(temperature=0.2, max_output_tokens=1024), } return configs.get(task_type, genai.GenerationConfig(temperature=0.3, max_output_tokens=1024)) class OpenCodeExecutor(BaseExecutor): def __init__(self, api_base: str, api_key: str): self.api_base = api_base self.api_key = api_key self.client = httpx.AsyncClient(timeout=30.0) async def execute(self, prompt: str, task_type: str = None) -> str: """ 调用类OpenAI API格式的OpenCode端点。 """ headers = { "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json" } payload = { "model": "opencode-model", # 根据实际模型名调整 "messages": [{"role": "user", "content": prompt}], "temperature": 0.1 if task_type == "code_generation" else 0.3, "max_tokens": 2048 } try: resp = await self.client.post(f"{self.api_base}/v1/chat/completions", json=payload, headers=headers) resp.raise_for_status() data = resp.json() return data["choices"][0]["message"]["content"] except httpx.HTTPStatusError as e: return f"[OpenCode API 错误] HTTP {e.response.status_code}" except Exception as e: return f"[OpenCode 处理错误]:{str(e)}" finally: await self.client.aclose()执行器设计的要点:
- 统一接口:所有执行器都继承自
BaseExecutor并实现execute方法,这使调度器可以用相同的方式调用它们。 - 配置化:在
execute方法中接收task_type等参数,允许根据任务动态调整调用参数(如温度、最大Token数),这是优化输出质量的重要手段。 - 健壮性:每个执行器内部都应有完善的错误处理和日志记录,避免一个模型的故障导致整个调度流程崩溃。
4. 系统集成与API暴露
将上述模块组装起来,并通过FastAPI暴露为一个服务。
from fastapi import FastAPI, HTTPException from pydantic import BaseModel import uuid app = FastAPI(title="Gemini & OpenCode Dispatcher API") # 初始化全局组件 router = Router() executor_registry = { "gemini-pro": GeminiExecutor(model_name="gemini-1.5-pro"), "opencode": OpenCodeExecutor(api_base=os.getenv("OPENCODE_API_BASE"), api_key=os.getenv("OPENCODE_API_KEY")), } orchestrator = Orchestrator(executor_registry) dispatcher = Dispatcher(router, orchestrator) class DispatchRequest(BaseModel): query: str session_id: Optional[str] = None # 如果提供,则继续之前的会话 @app.post("/dispatch") async def handle_dispatch(request: DispatchRequest): """ 主调度接口。 """ session_id = request.session_id or str(uuid.uuid4()) try: result = await dispatcher.dispatch(request.query, session_id) return { "session_id": session_id, "response": result, "status": "success" } except Exception as e: # 记录详细日志 logger.error(f"Dispatch failed for session {session_id}: {e}") raise HTTPException(status_code=500, detail=f"调度处理失败: {str(e)}") @app.get("/session/{session_id}/context") async def get_session_context(session_id: str): """获取某个会话的上下文(用于调试或前端状态保持)""" if session_id in orchestrator.context_store: return {"session_id": session_id, "context": orchestrator.context_store[session_id]} else: raise HTTPException(status_code=404, detail="Session not found")这个API提供了一个/dispatch端点,接收用户查询,返回处理结果,并维护一个会话ID以支持多轮对话的上下文延续。
5. 实战测试、常见问题与优化策略
5.1 端到端测试案例
假设我们向部署好的服务发送如下请求:
curl -X POST "http://localhost:8000/dispatch" \ -H "Content-Type: application/json" \ -d '{ "query": "请解析这张图片中的图表,告诉我2023年Q4的趋势数据,然后用Python的matplotlib生成一个类似的折线图代码。" }'系统内部流转:
- Dispatcher收到请求,调用
_analyze_and_decompose。 - Gemini(作为任务分解器)可能返回如下子任务:
[ { "id": "task_1", "description": "识别并描述图片中图表的数据,特别是2023年Q4的趋势。", "task_type": "multimodal_understanding", "depends_on": [], "required_capabilities": ["vision"] }, { "id": "task_2", "description": "根据任务1提取的数据,生成一段Python代码,使用matplotlib绘制类似的折线图。", "task_type": "code_generation", "depends_on": ["task_1"], "required_capabilities": [] } ] - Router为
task_1选择gemini-pro-vision,为task_2选择opencode。 - Orchestrator先执行
task_1,将图片和提示词发给Gemini Vision,得到文本描述(如“图表显示Q4销售额从10月的120万增长到12月的150万”)。 - Orchestrator将
task_1的结果作为上下文,与task_2的描述合并,发送给OpenCode执行器。 - OpenCode生成对应的Matplotlib代码。
- Orchestrator最后可能调用Gemini进行一次结果整合,将数据描述和代码块组织成一个友好的回答。
5.2 常见问题与排查技巧
任务分解不准确或过于琐碎
- 现象:模型把简单请求拆成太多无意义的子任务,或漏掉了关键步骤。
- 排查:检查任务分解的提示词。尝试提供更明确的指令和格式要求。在提示词中加入几个高质量的示例(Few-shot learning)通常能极大改善效果。
- 技巧:可以设置一个“复杂度阈值”。先让一个轻量级模型判断请求是否真的需要分解,对于简单查询,直接路由给单一模型处理,避免不必要的开销和延迟。
上下文过长导致后续任务失败
- 现象:上游任务输出内容很长,导致下游任务的提示词超出模型上下文窗口。
- 排查:监控每个子任务调用时的Token消耗。在Orchestrator构建提示词时进行长度检查。
- 技巧:实现“上下文摘要”功能。对于需要传递给下游的长文本,先调用一次模型进行摘要浓缩,只传递核心信息。或者,在设计任务链时,让下游任务主动去“询问”所需的具体数据,而非被动接收全部上下文。
模型API调用失败或超时
- 现象:某个执行器频繁报错,导致整个流程中断。
- 排查:检查网络、API密钥配额和模型服务状态。在执行器内部实现详细的错误日志记录。
- 技巧:为每个执行器实现重试机制(如使用
tenacity库)和熔断器模式。当某个模型失败率超过阈值时,暂时将其从路由候选池中移除,并尝试降级到其他可用模型。
最终整合结果生硬、不连贯
- 现象:最终回复读起来像是拼凑的,有明显的段落割裂感。
- 排查:检查最终整合的提示词。是否提供了足够的指引让模型进行“创作性”的融合,而不是简单罗列。
- 技巧:在最终整合提示词中,明确要求模型以特定的口吻(如“专业的分析师”)和格式进行回复。可以将用户最初的问题再次强调,让整合工作始终围绕核心问题展开。
5.3 性能与成本优化策略
- 异步并发执行:利用
asyncio.gather并行执行没有依赖关系的子任务,可以显著降低总响应时间。Orchestrator中的执行循环可以优化为依赖图驱动的并行执行。 - 缓存策略:对于常见、耗时的子任务结果(如“将这张图转成表格数据”),可以考虑将输入(图片哈希)和输出(表格文本)缓存起来,下次遇到相同请求直接返回,节省成本和时间。
- 成本监控与预算:在每个执行器的调用前后记录Token使用量,并在路由决策中引入成本因子。可以为每个用户或每个会话设置预算,超出后自动切换到更经济的模型或拒绝服务。
- 路由策略动态优化:定期收集任务执行结果的质量反馈(如通过人工评估或自动化评分),用于动态调整
Router中的能力矩阵和权重,让系统越用越“聪明”。
将Claude的“Dispatch”体验迁移到Gemini和OpenCode的生态中,是一次富有挑战但极具价值的工程实践。它迫使我们去深入思考复杂AI工作流的设计模式,而不仅仅是简单的API调用。这套系统的核心价值在于其可观测性和可控性——每一个决策、每一次调用都清晰可见,并且可以通过策略进行调整。在实际部署中,你会不断在分解的粒度、路由的准确性、上下文的有效性和系统的响应速度之间进行权衡。从我搭建类似系统的经验来看,起步时不必追求全自动的完美分解,可以从一些预设的、高频的复杂任务模板开始,逐步增加系统的智能,这样更容易获得稳定可靠的效果。
