当前位置: 首页 > news >正文

Nanobot 从 gateway 启动命令来看个人助理Agent的实现

背景

在之前的文章中Nanobot 轻量级的个人AI助手,我们分析了nanobot onboard命令的实现,
该命令的主要作用是做一系列的初始化工作, 这次我们分析另一个命令nanobot gateway,
从整理上来看,该nanobot用到了Typer,Rich,Questionary,prompt_toolkit这种现代、美观且交互式命令行界面 (CLI) 的强大工具组合。
Typer 用于定义 CLI 结构和参数;Rich 负责文本样式、表格、面板和 Markdown 渲染;Questionary 用于创建交互式问答界面
其中Rich中的Console,Markdown,Table,Text用来进行渲染,支持颜色、表格、面板、语法高亮和 Markdown ,以更好的进行个性化的展示。

nanobot gateway命令

gateway 是 Typer 子命令,用于 启动 nanobot 网关进程:加载配置、同步工作区模板、创建消息总线与 LLM Provider、会话管理、Cron、AgentLoop、
ChannelManager(各聊天频道)、Heartbeat,最后用 asyncio.run 并发跑 agent 主循环 + 所有channel,并在退出时做清理.

defgateway(port:int|None=typer.Option(None,"--port","-p",help="Gateway port"),workspace:str|None=typer.Option(None,"--workspace","-w",help="Workspace directory"),verbose:bool=typer.Option(False,"--verbose","-v",help="Verbose output"),config:str|None=typer.Option(None,"--config","-c",help="Path to config file"),):
参数说明
-p/--port端口;未传入时使用配置中的config.gateway.port。貌似在这里也没有太大的作用
-w/--workspace覆盖配置中的工作区路径(经_load_runtime_config写回Config)。
-v/--verboseTrue时对标准库logging设置DEBUG,便于查看更底层日志。
-c/--config指定配置文件路径(同样由_load_runtime_config处理)。
  1. 核心组件实例化
  • MessageBus():进程内入站/出站消息队列。
  • _make_provider(config):按当前默认模型解析 provider,构造具体 Provider 实例(含 generation 设置);若缺少 API key 等会typer.Exit
  • SessionManager(config.workspace_path):会话持久化(JSONL 等),与工作区目录绑定。
  • CronService(cron_store_path),随后将cron.on_job设为下文所述异步回调。

MessageBus
这是消息总线,主要包括从channel传递过来的Message,和需要发送给channel的Message:

self.inbound:asyncio.Queue[InboundMessage]=asyncio.Queue()self.outbound:asyncio.Queue[OutboundMessage]=asyncio.Queue()

asyncio.Queue 是 Python asyncio 库中用于在异步协程(Coroutine)之间安全传递数据的先进先出(FIFO)数据结构。这里默认允许放入任意数量的元素。
它是协程同步的,能安全地在协程间共享数据,类似于线程同步。

make_provider
根据当前Config(主要是默认 model 和 providers.*)构造并返回一个 LLM Provider 实例,供 gateway、agent 等命令里的 AgentLoop 使用。
这里的provider主要有以下几种:

  • azure_openai 对应 AzureOpenAIProvider
  • openai_compat 对应 OpenAICompatProvider
  • openai_codex 对应 OpenAICodexProvider
  • anthropic 对应 AnthropicProvider
    这些provider提供与大模型的交互

SessionManager
这里主要负责在工作区里 按「会话键」加载、缓存、保存对话会话。每个会话对应磁盘上的一个 JSONL 文件,键一般是 channel:chat_id
如下:

├── sessions │ ├── cli_direct.jsonl │ └── feishu_ou_52720638d0cxxxxf.jsonl

这里的目录在workspace/session下
2. Cron 服务
任务持久化路径:workspace/cron/jobs.json。
CronService(cron_store_path),下面会把 cron.on_job 设为异步回调

if is_default_workspace(config.workspace_path): _migrate_cron_store(config) # Create cron service with workspace-scoped store cron_store_path = config.workspace_path / "cron" / "jobs.json" cron = CronService(cron_store_path) ... async def on_cron_job(job: CronJob) -> str | None: """Execute a cron job through the agent.""" from nanobot.agent.tools.cron import CronTool from nanobot.agent.tools.message import MessageTool from nanobot.utils.evaluator import evaluate_response reminder_note = ( "[Scheduled Task] Timer finished.\n\n" f"Task '{job.name}' has been triggered.\n" f"Scheduled instruction: {job.payload.message}" ) cron_tool = agent.tools.get("cron") cron_token = None if isinstance(cron_tool, CronTool): cron_token = cron_tool.set_cron_context(True) try: resp = await agent.process_direct( reminder_note, session_key=f"cron:{job.id}", channel=job.payload.channel or "cli", chat_id=job.payload.to or "direct", ) finally: if isinstance(cron_tool, CronTool) and cron_token is not None: cron_tool.reset_cron_context(cron_token) response = resp.content if resp else "" message_tool = agent.tools.get("message") if isinstance(message_tool, MessageTool) and message_tool._sent_in_turn: return response if job.payload.deliver and job.payload.to and response: should_notify = await evaluate_response( response, job.payload.message, provider, agent.model, ) if should_notify: from nanobot.bus.events import OutboundMessage await bus.publish_outbound(OutboundMessage( channel=job.payload.channel or "cli", chat_id=job.payload.to, content=response, )) return response cron.on_job = on_cron_job

on_cron_job 这个是在定时任务触发时调用该方法,主要是逻辑是
构建一个Message信息并直接 让大模型去处理对应的信息,并且保留该job对应的channelchat_id(这个会在消息处理的时候进行传递,
具体是在_run_agent_loop方法中调用_set_tool_context),同时也会保留当前协程的上下文(用contextvars保留),防止在执行任务的增加任务而导致重复增加调度任务(具体见CronTool的execute方法中add的判断)
根据大模型的反馈来决定需不需要进行通知。

  1. ChannelManager
    ChannelManager(config, bus):按配置启动feishu/dingding等channel插件,与MessageBus对接,用于从channel发送消息到处理队列,以及传递消息给对应的Channel,
    这里会通过pkgutil.iter_modules(nanobot.channels)遍历nanobot.channels路径下所有子模块和子包,
    并结合importlib.import_module(如果模块已导入,则直接返回) dir(mod)动态的获取模块的BaseChannel子类。

  2. Heartbeat 服务
    周期性的读取 Heartbeat.md 文件,并使用大模型总结该文件下的活跃任务调用大模型去执行,最后还是调用大模型去判断是否需要把任务的结果回传给Channel
    对于 heartbeat session则只会保留最近几条信息,默认是8条,由gateway.heartbeat.keepRecentMessages来决定,
    对于 回传给哪个 Channel,会按照激活的Channel从最新的活跃时间排序,取最新的Channel(排除纯本地(CLI) / 系统(System))

  3. 启动cron hearbeat 服务,并开启主流程循环,启动所有的channel

    awaitcron.start()awaitheartbeat.start()awaitasyncio.gather(agent.run(),channels.start_all(),

注意这里的AgentLoop.run是个死循环。所以该命令会使进程hang在这里。

http://www.jsqmd.com/news/621775/

相关文章:

  • VC Spyglass实战指南:从零开始掌握CDC约束配置与调试
  • STAR-CCM+软件许可优化管理:如何降低许可成本、提升仿真效率与实现规范化管理
  • WAN2.2文生视频ComfyUI工作流定制:支持批量提示词导入与队列执行
  • Git 提交 LF will be replaced by CRLF the next time Git touches it 报错
  • 作业 2.0
  • 嘉善老房翻新质量哪家
  • 环瑞测试:老化试验技术如何解决电子产品早期失效难题
  • 滤波电路与谐振电路
  • FT-Mamba:一种高效的表回归的新深度学习模型
  • JxBrowser 8.18.1 版本发布啦!
  • ADS2011实战:基于Smith圆图的功率放大器宽带匹配设计技巧
  • Cookie 真的不能解决去中心化鉴权问题吗?——深度解析 Cookie + JWT 无状态分布式方案
  • 探针台主流品牌GBITEST(易捷测试)在全自动和射频测试领域的领先优势解析
  • EcomGPT-7B多语言部署教程:越南语Unicode处理+特殊符号过滤最佳实践
  • 【CSDN程序员副业图谱】干了3年私活后我醒了:技术人搞副业,为什么一定要戒掉“外包思维”?
  • Skill技术爆火背后:祛魅与理性看待其能力边界
  • 第四篇:认知刷新——GEO不是SEO的替代品,而是进化体
  • WordPress导航菜单进阶指南:从基础创建到个性化定制全解析
  • 告别“以刊评文”,中国顶刊《Vita》启航:一份不收费的CNS挑战者正式来了
  • SpringBoot + MyBatis实战:5分钟搞定用户查询接口(附完整代码)
  • Qwen Pixel Art实战案例:用‘8-bit robot wearing sunglasses’生成可商用素材
  • Qwen2.5-7B-Instruct完整指南:从部署到应用,一站式解决方案
  • 为什么你读论文这么慢?可能不是英语问题
  • CCS工程报错找不到库?别慌,手把手教你用XGCONF和工程属性搞定RTSC/裸机配置
  • 生命科学+AI深度融合:未来六年复合年增长率锁定20.3%,产业增长动能强劲
  • Revit插件越装越卡?选品茗HiBIM搞定建模、深化、机电出图......
  • OpenClaw 安装配置教程 - 峰哥版
  • 系统复位与防护总结
  • 保姆级教程:手把手教你为ROS机器人定制Rviz多目标点导航插件(基于move_base)
  • Mac 预览应用隐藏技巧:快速编辑图片的完整指南