当前位置: 首页 > news >正文

Gliding Horse 的 L2 作战地图:让多 Agent 协作从“摸黑”变成“透明”

一、L2 黑板的定位:不只是“共享内存”

在 Gliding Horse 的四层记忆架构中,L2 是承上启下的关键一层:

  • 向下,它缓存 L0 持久化层中的热数据,提供毫秒级的读写性能。
  • 向上,它为 L3 投影引擎提供实时数据源,按需裁剪上下文注入 LLM。
  • 横向,它是多个 Agent 实例唯一共享的工作区,承载着任务状态、节点数据和协调消息。

传统 Agent 框架往往通过消息队列或轮询来传递状态,而流马的 L2 黑板则将这些能力内建在同一个图存储中,所有 Agent 通过 SPARQL 直接读写,无需额外的通信中间件。

Agent 集群

读写

读写

读写

读写

SPARQL 查询

SA 态势面板

实时仪表盘
Agent 状态 / 任务进度 / 资源冲突

L2 作战地图 (Blackboard)

节点缓存 + Oxigraph SPARQL

AgentTracker
实时状态跟踪

任务树 DAG
依赖管理

资源锁
并发控制

SharedZone
协调消息

PA (计划)

DA-1 (执行)

DA-2 (执行)

CA (检查)

二、AgentTracker:每个 Agent 的“生命体征”

L2 作战地图的核心组件是AgentTracker——一个实时跟踪所有 Agent 状态的子系统。每当一个 Agent 实例启动、执行任务或结束时,它都会在 L2 中更新自己的“生命体征”。

pub struct AgentStatus { pub agent_id: String, // Agent 唯一标识 pub agent_role: String, // PA / DA / CA / AA / SA pub task_iri: String, // 当前执行的任务 IRI pub status: AgentActivity, // Idle / Working / Blocked / Error pub started_at: DateTime<Utc>, pub last_heartbeat: DateTime<Utc>, // 最后心跳时间 pub current_operation: Option<String>, // 当前操作描述 pub resource_locks: Vec<ResourceLock>, // 持有的资源锁 } pub enum AgentActivity { Idle, Working, Blocked, Error, }

pub enum AgentActivity {
Idle,
Working,
Blocked,
Error,
}

下面是一个完整的 Python 示例,展示如何创建 `AgentStatus` 对象、更新心跳、以及 SA 如何通过 SPARQL 查询超时 Agent: ```python import time from datetime import datetime, timezone from typing import Optional, List from enum import Enum class AgentActivity(Enum): """Agent 活动状态枚举""" IDLE = "Idle" WORKING = "Working" BLOCKED = "Blocked" ERROR = "Error" class ResourceLock: """资源锁信息(简化版)""" def __init__(self, resource_type: str, resource_id: str, lock_type: str): self.resource_type = resource_type self.resource_id = resource_id self.lock_type = lock_type class AgentStatus: """Agent 状态对象,对应 Rust 中的 AgentStatus 结构体""" def __init__(self, agent_id: str, agent_role: str): self.agent_id = agent_id self.agent_role = agent_role # PA / DA / CA / AA / SA self.task_iri: Optional[str] = None # 当前执行的任务 IRI self.status: AgentActivity = AgentActivity.IDLE self.started_at: datetime = datetime.now(timezone.utc) self.last_heartbeat: datetime = self.started_at self.current_operation: Optional[str] = None # 当前操作描述 self.resource_locks: List[ResourceLock] = [] # 持有的资源锁 def update_heartbeat(self): """更新心跳时间戳,Agent 定期调用""" self.last_heartbeat = datetime.now(timezone.utc) print(f"[{self.agent_id}] 心跳已更新: {self.last_heartbeat.isoformat()}") def start_task(self, task_iri: str, operation: str): """开始执行任务""" self.task_iri = task_iri self.status = AgentActivity.WORKING self.current_operation = operation self.update_heartbeat() print(f"[{self.agent_id}] 开始任务: {task_iri} → {operation}") def mark_blocked(self, reason: str): """标记为阻塞状态""" self.status = AgentActivity.BLOCKED self.current_operation = reason self.update_heartbeat() print(f"[{self.agent_id}] 阻塞: {reason}") def mark_error(self, error_msg: str): """标记为错误状态""" self.status = AgentActivity.ERROR self.current_operation = error_msg self.update_heartbeat() print(f"[{self.agent_id}] 错误: {error_msg}") class SchedulerAgent: """调度器(SA),负责监控所有 Agent 状态""" HEARTBEAT_TIMEOUT = 30 # 心跳超时阈值(秒) def __init__(self): self.agents: dict[str, AgentStatus] = {} def register_agent(self, agent: AgentStatus): """注册 Agent 到 SA 的监控列表""" self.agents[agent.agent_id] = agent print(f"[SA] 注册 Agent: {agent.agent_id} ({agent.agent_role})") def detect_stale_agents(self) -> List[str]: """ 检测超时 Agent(对应 Rust 中的 detect_stale_agents()) 返回所有超过 30 秒未心跳的 Agent ID 列表 """ now = datetime.now(timezone.utc) stale = [] for agent_id, status in self.agents.items(): elapsed = (now - status.last_heartbeat).total_seconds() if elapsed > self.HEARTBEAT_TIMEOUT: stale.append(agent_id) print(f"[SA] ⚠️ 检测到僵尸 Agent: {agent_id},已离线 {elapsed:.1f} 秒") return stale def query_working_agents(self) -> List[AgentStatus]: """ 模拟 SPARQL 查询:获取所有 Working 状态的 Agent 对应原文中的 SPARQL: SELECT ?agent ?role ?status ?operation WHERE { GRAPH <blackboard:shared> { ?agent a <http://agent-os.org/type/AgentStatus> ; <http://agent-os.org/prop/status> "Working" ; <http://agent-os.org/prop/current_operation> ?operation . } } """ return [a for a in self.agents.values() if a.status == AgentActivity.WORKING] # ========== 使用示例 ========== if __name__ == "__main__": # 1. 创建 SA 调度器 sa = SchedulerAgent() # 2. 创建三个 Agent 并注册 da1 = AgentStatus(agent_id="da-001", agent_role="DA") da1.start_task("task:code-gen", "生成用户模块代码") da2 = AgentStatus(agent_id="da-002", agent_role="DA") da2.start_task("task:db-migrate", "执行数据库迁移") ca1 = AgentStatus(agent_id="ca-001", agent_role="CA") ca1.mark_blocked("等待代码审查结果") for agent in [da1, da2, ca1]: sa.register_agent(agent) # 3. 模拟心跳更新 time.sleep(1) da1.update_heartbeat() # DA-001 正常心跳 da2.update_heartbeat() # DA-002 正常心跳 # CA-001 故意不更新心跳,模拟超时 # 4. 模拟等待 32 秒后检测超时 print("\n--- 等待 32 秒后检测超时 ---") # 手动将 ca1 的心跳时间调早,模拟超时 ca1.last_heartbeat = datetime.now(timezone.utc).replace(year=2020) stale_agents = sa.detect_stale_agents() print(f"超时 Agent 列表: {stale_agents}") # 5. 查询当前正在工作的 Agent print("\n--- 查询 Working 状态的 Agent ---") working = sa.query_working_agents() for w in working: print(f" {w.agent_id} ({w.agent_role}) → {w.current_operation}")

心跳超时检测是这个子系统的关键机制。每个 Agent 定期更新自己的心跳时间戳,SA 则通过detect_stale_agents()方法扫描所有超过阈值(默认 30 秒)未心跳的 Agent。一旦发现“僵尸” Agent,SA 可以立即回收其持有的资源锁,并将它负责的子任务重新分配给其他 Agent。

所有状态数据同步写入 Oxigraph 的blackboard:shared命名图,这意味着 SA 可以通过 SPARQL 直接查询实时态势:

SELECT ?agent ?role ?status ?operation WHERE { GRAPH <blackboard:shared> { ?agent a <http://agent-os.org/type/AgentStatus> ; <http://agent-os.org/prop/status> "Working" ; <http://agent-os.org/prop/current_operation> ?operation . } }

这种设计让 SA 的态势感知从“被动等待通知”变为“主动实时查询”,调度决策不再依赖猜测。

三、资源锁:防止 Agent 互相踩脚

并行 Agent 最常见的冲突场景是资源竞争——两个 DA 同时试图修改同一个文件,或者一个 Agent 正在读数据,另一个 Agent 却开始写。传统的做法是通过消息队列串行化,但这会牺牲并行性。

流马的方案是三级资源锁,直接在 L2 中实现:

pub struct ResourceLock { pub resource_type: String, // "file", "db", "api", "graph" pub resource_id: String, // 如 "file:///data/sales.csv" pub acquired_at: DateTime<Utc>, pub acquired_by: String, // agent_id pub lock_type: LockType, // Read / Write / Exclusive } pub enum LockType { Read, // 多个 Agent 可同时持有 Write, // 仅一个 Agent 可持有,与其他 Write/Exclusive 互斥 Exclusive, // 仅一个 Agent 可持有,与其他所有锁互斥 }

锁冲突检测是实时的:当一个 Agent 尝试获取资源锁时,系统会检查该资源的当前锁状态。如果锁冲突(例如两个 Agent 同时请求 Write 锁),后来的请求会被拒绝,Agent 需要等待或选择其他方案。所有锁信息同步到blackboard:shared图,SA 可以随时查看“哪些资源正在被谁锁定”。

四、跨任务依赖:让任务树从“孤立”到“关联”

在实际的软件工程流程中,任务之间往往存在复杂的依赖关系——“设计文档”完成之后才能开始“编码”,“数据库迁移”完成之后才能执行“API 测试”。如果 L2 只跟踪单个任务的子树,SA 就无法判断跨任务的阻塞状态。

我们在TaskTreeNode中新增了跨任务依赖边

pub struct TaskTreeNode { pub task_iri: String, pub parent: Option<String>, // 父任务 pub children: Vec<String>, // 子任务 pub dependencies: Vec<String>, // 依赖的其他任务 pub dependents: Vec<String>, // 依赖此任务的其他任务(反向索引) pub status: String, }

通过add_task_dependency()方法,任意两个任务之间可以建立依赖关系。get_task_dag()方法则利用拓扑排序,将任务树展开为层级化的有向无环图(DAG)。

这使得 SA 可以回答关键调度问题:“任务 B 为什么还没开始?”——因为它依赖的任务 A 还在执行。“如果我取消任务 C,哪些任务会受影响?”——查询dependents列表即可。

五、SharedZone:Agent 间的“群聊”

除了结构化的状态数据和资源锁,Agent 有时还需要松耦合的沟通——比如一个 DA 发现了某个潜在问题,希望通知 CA 特别关注;或者 SA 广播一条紧急指令让所有 Agent 暂停当前操作。

SharedZone 提供了这样的能力:

pub struct CoordinationMessage { pub from_agent: String, // 发送者 pub msg_type: CoordinationMsgType, // 消息类型 pub payload: serde_json::Value, // 消息内容 pub timestamp: DateTime<Utc>, // 时间戳 } pub enum CoordinationMsgType { TaskAnnouncement, // 任务公告 ProgressUpdate, // 进度更新 ResourceRequest, // 资源请求 ConflictWarning, // 冲突警告 SyncRequest, // 同步请求 }

Agent 可以发布协调消息到blackboard:shared,其他 Agent 则可以按时间戳或发送者过滤读取。这相当于给 Agent 们开了一个“群聊”,但消息是有结构的、可查询的、持久化的——不是简单的文本广播。

六、给平台带来的核心优势

能力传统方案L2 作战地图
Agent 状态可见性通过日志或外部监控间接推断SA 可实时 SPARQL 查询每个 Agent 的状态、心跳、当前操作
资源冲突处理事后检测,人工介入锁冲突实时拒绝,所有锁状态可视
跨任务依赖靠文档或约定,容易遗漏结构化依赖关系,拓扑排序,自动化调度
Agent 间通信消息队列,额外基础设施同图存储内协调消息,零延迟,可查询
故障恢复手动排查心跳超时自动检测僵尸 Agent,自动回收资源和任务

一句话总结:L2 作战地图让多 Agent 协作从“摸黑干活”变成了“开着雷达飞行”。调度器可以实时掌握全局态势,Agent 之间可以在同一个图空间里安全地共享数据和协调行动,而所有这些信息都是可追溯、可审计的。

http://www.jsqmd.com/news/1093410/

相关文章:

  • 具身智能2.0时代洗牌局:2026国内头部具身企业第一梯队为何是“宇树、智元、越疆”?
  • 暗黑3终极自动化战斗宏:D3KeyHelper技术解析与实战应用
  • STC8H单片机IAP串口升级实战:告别冷启动,实现远程程序更新
  • 【单片机毕业设计】基于 STM32 的智能感应开盖垃圾桶设计,基于单片机的溢满检测自动垃圾桶控制系统(013101)
  • 应用场景与方案优势
  • 告别会议低效:智能会议系统的本地化部署方案
  • Java毕设项目:基于 SpringBoot+Vue 的网络域名管理系统设计与实现 前后端分离架构下 Web 域名运维管理平台 (源码+文档,讲解、调试运行,定制等)
  • tensorRT整个系列的总结(包括量化,减枝)
  • 立个flag。周四发表一篇文章。
  • Python变量作用域全解析:从局部到全局,彻底掌握LEGB规则
  • 无需备份即可从 iPhone 恢复已删除短信的 4 种方法
  • 智慧安防行业物联网技术与方案指南:从监控到应急响应的全方位解决方案
  • 【RISC-V】解决WSL2命令行总是出现bash: warning: setlocale: LC_ALL: cannot change locale (en_US.UTF-8)的问题
  • 【计算机毕业设计案例】网络域名资源分配与统筹管理系统设计 信息化视角下域名生命周期管理系统设计(程序+文档+讲解+定制)
  • Android 开发问题:Invalid <color> for given resource value.
  • Shopify分销系统搭建指南:适合初创团队的低成本增长方案
  • 我用 Claude Code 做 Code Review 两个月,Bug 漏检率从 41% 降到 11%
  • 服装收银系统究竟哪个好?最后我选了这个
  • 别再混着说了:2026 AI Agent 技术栈分层(tool / Skill / MCP / A2A / Context Harness Engineering)
  • Codex Agent Legion 实现原理与 GitHub 使用指南
  • 剪流AI员工手机数据安全架构解析:企业客户资料是否存在泄露风险?
  • 墨香情手游全域自由轻功,无束缚飞檐走壁闯江湖
  • .Net如何在AgentFramework中给AI智能体给AI添加执行python脚本和运行代码的能力后——后续可用于对接openClaw技能
  • Mybatis基础操作
  • Rust的async函数中的await点优化与编译器在状态机生成中的转换
  • 各类幕墙验收时应提供的资料
  • Skill用得好,下班走得早:一文讲透Skill的结构与设计
  • AI native: Casebook 面向 AI Agent 时代的测试用例工程化工作流
  • 149期目录 黄大年茶思屋“难题揭榜”第149期--云核心网领域第四期
  • 一篇搞懂SpringMVC XML 配置标签<context:component-scan>