当前位置: 首页 > news >正文

openclaw平替之nanobot源码解析(七):Gateway与多渠道集成素

背景

StreamJsonRpc 是微软官方维护的用于 .NET 和 TypeScript 的 JSON-RPC 通信库,以其强大的类型安全、自动代理生成和成熟的异常处理机制著称。在 HagiCode 项目中,为了通过 ACP (Agent Communication Protocol) 与外部 AI 工具(如 iflow CLI、OpenCode CLI)进行通信,并消除早期自定义 JSON-RPC 实现带来的维护成本和潜在 Bug,项目决定集成 StreamJsonRpc。然而,在集成过程中遇到了流式 JSON-RPC 特有的挑战,特别是在处理代理目标绑定和泛型参数识别时。

为了解决这些痛点,我们做了一个大胆的决定:整个构建系统推倒重来。这个决定带来的变化,可能比你想象的还要大——稍后我会具体说。

关于 HagiCode

先介绍一下本文的"主角项目"

如果你在开发中遇到过这些烦恼:

多项目、多技术栈,构建脚本维护成本高

CI/CD 流水线配置繁琐,每次改都要查文档

跨平台兼容性问题层出不穷

想让 AI 帮忙写代码,但现有工具不够智能

那么我们正在做的 HagiCode 可能你会感兴趣。

HagiCode 是什么?

一款 AI 驱动的代码智能助手

支持多语言、跨平台的代码生成与优化

内置游戏化机制,让编码不再枯燥

为什么在这里提它?

本文分享的 StreamJsonRpc 集成方案,正是我们在开发 HagiCode 过程中实践总结出来的。如果你觉得这套工程化方案有价值,说明我们的技术品味还不错——那么 HagiCode 本身也值得关注一下。想了

当前项目处于 ACP 协议集成的关键阶段,面临着以下几个技术痛点和架构挑战:

1. 自定义实现的局限

原有的 JSON-RPC 实现位于?src/HagiCode.ClaudeHelper/AcpImp/,包含?JsonRpcEndpoint?和?ClientSideConnection?等组件。维护这套自定义代码成本高,且缺乏成熟库的高级功能(如进度报告、取消支持)。

2. StreamJsonRpc 集成障碍

在尝试将现有的?CallbackProxyTarget?模式迁移到 StreamJsonRpc 时,发现?_rpc.AddLocalRpcTarget(target)?方法无法识别通过代理模式创建的目标。具体表现为,StreamJsonRpc 无法自动将泛型类型?T?的属性拆分为 RPC 方法参数,导致服务器端无法正确处理客户端发起的方法调用。

3. 架构分层混乱

现有的?ClientSideConnection?混合了传输层(WebSocket/Stdio)、协议层(JSON-RPC)和业务层(ACP Agent 接口),导致职责不清,且存在?AcpAgentCallbackRpcAdapter?方法绑定缺失的问题。

4. 日志缺失

WebSocket 传输层缺少对原始 JSON 内容的日志输出,导致在调试 RPC 通信问题时难以定位是序列化问题还是网络问题。

解决

针对上述问题,我们采用了以下系统化的解决方案,从架构重构、库集成和调试增强三个维度进行优化:

1. 全面迁移至 StreamJsonRpc

移除旧代码

删除?JsonRpcEndpoint.cs、AgentSideConnection.cs?及相关的自定义序列化转换器(JsonRpcMessageJsonConverter?等)。

集成官方库

引入?StreamJsonRpc?NuGet 包,利用其?JsonRpc?类处理核心通信逻辑。

抽象传输层

定义?IAcpTransport?接口,统一处理?WebSocket?和?Stdio?两种传输模式,确保协议层与传输层解耦。

// IAcpTransport 接口定义

public interface IAcpTransport

{

Task SendAsync(string message, CancellationToken cancellationToken = default);

Task ReceiveAsync(CancellationToken cancellationToken = default);

Task CloseAsync(CancellationToken cancellationToken = default);

}

// WebSocket 传输实现

public class WebSocketTransport : IAcpTransport

{

private readonly WebSocket _webSocket;

public WebSocketTransport(WebSocket webSocket)

{

_webSocket = webSocket;

}

// 实现发送和接收方法

// ...

}

// Stdio 传输实现

public class StdioTransport : IAcpTransport

{

private readonly StreamReader _reader;

private readonly StreamWriter _writer;

public StdioTransport(StreamReader reader, StreamWriter writer)

{

_reader = reader;

_writer = writer;

}

// 实现发送和接收方法

// ...

}

2. 修复代理目标识别问题

分析?CallbackProxyTarget

检查现有的动态代理生成逻辑,确定 StreamJsonRpc 无法识别的根本原因(通常是因为代理对象没有公开实际的方法签名,或者使用了 StreamJsonRpc 不支持的参数类型)。

重构参数传递

将泛型属性拆分为明确的 RPC 方法参数。不再依赖动态属性,而是定义具体的 Request/Response DTO(数据传输对象),确保 StreamJsonRpc 能通过反射正确识别方法签名。

// 原有的泛型属性方式

public class CallbackProxyTarget

{

public Func Callback { get; set; }

}

// 重构后的具体方法方式

public class ReadTextFileRequest

{

public string FilePath { get; set; }

}

public class ReadTextFileResponse

{

public string Content { get; set; }

}

public interface IAcpAgentCallback

{

Task ReadTextFileAsync(ReadTextFileRequest request);

// 其他方法...

}

使用?Attach?替代?AddLocalRpcTarget

在某些复杂场景下,手动代理?JsonRpc?对象并处理?RpcConnection?可能比直接添加目标更灵活。

3. 实现方法绑定与日志增强

实现?AcpAgentCallbackRpcAdapter

确保该组件显式实现 StreamJsonRpc 的代理接口,将 ACP 协议定义的方法(如?ReadTextFileAsync)映射到 StreamJsonRpc 的回调处理器上。

集成日志记录

在 WebSocket 或 Stdio 的消息处理管道中,拦截并记录 JSON-RPC 请求和响应的原始文本。利用?ILogger?在解析前和序列化后输出原始 payload,以便排查格式错误。

// 日志增强的传输包装器

public class LoggingAcpTransport : IAcpTransport

{

private readonly IAcpTransport _innerTransport;

private readonly ILogger _logger;

public LoggingAcpTransport(IAcpTransport innerTransport, ILogger logger)

{

_innerTransport = innerTransport;

_logger = logger;

}

public async Task SendAsync(string message, CancellationToken cancellationToken = default)

{

_logger.LogTrace("Sending message: {Message}", message);

await _innerTransport.SendAsync(message, cancellationToken);

}

public async Task ReceiveAsync(CancellationToken cancellationToken = default)

{

var message = await _innerTransport.ReceiveAsync(cancellationToken);

_logger.LogTrace("Received message: {Message}", message);

return message;

}

public async Task CloseAsync(CancellationToken cancellationToken = default)

{

_logger.LogDebug("Closing connection");

await _innerTransport.CloseAsync(cancellationToken);

}

}

4. 架构分层重构

传输层 (AcpRpcClient)

封装 StreamJsonRpc 连接,负责?InvokeAsync?和连接生命周期管理。

public class AcpRpcClient : IDisposable

{

private readonly JsonRpc _rpc;

private readonly IAcpTransport _transport;

public AcpRpcClient(IAcpTransport transport)

{

_transport = transport;

_rpc = new JsonRpc(new StreamRpcTransport(transport));

_rpc.StartListening();

}

public async Task InvokeAsync(string methodName, object parameters)

{

return await _rpc.InvokeAsync(methodName, parameters);

}

public void Dispose()

{

_rpc.Dispose();

_transport.Dispose();

}

// StreamRpcTransport 是对 IAcpTransport 的 StreamJsonRpc 适配器

private class StreamRpcTransport : IDuplexPipe

{

// 实现 IDuplexPipe 接口

// ...

}

}

协议层 (IAcpAgentClient?/?IAcpAgentCallback)

定义清晰的 client-to-agent 和 agent-to-client 接口,移除?Func?这种循环依赖的工厂模式,改用依赖注入或直接注册回调。

实践

基于 StreamJsonRpc 的最佳实践和项目经验,以下是实施过程中的关键建议:

1. 强类型 DTO 优于动态对象

StreamJsonRpc 的核心优势在于强类型。不要使用?dynamic?或?JObject?传递参数。应为每个 RPC 方法定义明确的 C# POCO 类作为参数。这不仅解决了代理目标识别问题,还能在编译时发现类型错误。

示例:将?CallbackProxyTarget?中的泛型属性替换为?ReadTextFileRequest?和?WriteTextFileRequest?等具体类。

2. 显式声明 Method Name

使用?[JsonRpcMethod]?特性显式指定 RPC 方法名称,不要依赖默认的方法名映射。这可以防止因命名风格差异(如 PascalCase vs camelCase)导致的调用失败。

public interface IAcpAgentCallback

{

[JsonRpcMethod("readTextFile")]

Task ReadTextFileAsync(ReadTextFileRequest request);

[JsonRpcMethod("writeTextFile")]

Task WriteTextFileAsync(WriteTextFileRequest request);

}

3. 利用连接状态回调

StreamJsonRpc 提供了?JsonRpc.ConnectionLost?事件。务必监听此事件以处理进程意外退出或网络断开的情况,这比单纯依赖 Orleans 的 Grain 失效检测更及时。

_rpc.ConnectionLost += (sender, e) =>

{

_logger.LogError("RPC connection lost: {Reason}", e.ToString());

// 处理重连逻辑或通知用户

};

4. 日志分层记录

Trace 级别:记录完整的 JSON Request/Response 原文。

Debug 级别:记录方法调用栈和参数摘要。

注意:确保日志中不包含敏感的 Authorization Token 或大文件内容的 Base64 编码。

5. 处理流式传输的特殊性

StreamJsonRpc 原生支持?IAsyncEnumerable。在实现 ACP 的流式 Prompt 响应时,应直接使用?IAsyncEnumerable?而不是自定义的分页逻辑。这能极大简化流式处理的代码量。

public interface IAcpAgentCallback

{

[JsonRpcMethod("streamText")]

IAsyncEnumerable StreamTextAsync(StreamTextRequest request);

}

6. 适配器模式 (Adapter Pattern)

保持?ACPSession?和?ClientSideConnection?的分离。ACPSession?应专注于 Orleans 的状态管理和业务逻辑(如消息入队),通过组合而非继承的方式使用 StreamJsonRpc 连接对象。

总结

通过全面集成 StreamJsonRpc,HagiCode 项目成功解决了原自定义实现的维护成本高、功能局限性和架构分层混乱等问题。关键改进包括:

采用强类型 DTO 替代动态属性,提高了代码的可维护性和可靠性

实现了传输层抽象和协议层分离,提升了架构的清晰性

增强了日志记录功能,便于排查通信问题粕拾夭谅

http://www.jsqmd.com/news/630531/

相关文章:

  • SP3485自动收发电路设计实战:如何用1个IO口搞定RS485通信(附电路图详解)
  • Xilinx 7系列FPGA配置接口实战:手把手教你搞定CCLK与Bank电压选择
  • 电转气与碳捕集的综合能源系统优化调度模型研究及MATLAB代码实现
  • 从一次调试失败讲起:Aurora链路不通,问题可能出在Shared Logic的时钟没连对
  • Chrome跑不动Cesium大场景?这5个浏览器层优化技巧让你的帧率翻倍
  • 保姆级教程:用R语言limma包搞定GSE65682数据集的差异表达分析
  • SiameseAOE模型助力互联网产品迭代:从用户反馈中挖掘需求
  • 星露谷物语模组加载器SMAPI终极指南:从安装到高级配置
  • 2026免费降AI率工具大PK:价格、效果、售后全面对比横评
  • ComfyUI-Inpaint-CropAndStitch:智能局部修复与拼接技术完全指南
  • 如何永久保存微信聊天记录:完整免费的数据守护指南
  • 3大核心功能重塑macOS滚动体验:Scroll Reverser智能滚动控制革新
  • 技术成长周记05|项目收官,调整节奏,沉淀后再出发
  • 浪潮NF5280M5装ESXi 6.7踩坑记:手把手教你给镜像注入PM8060 RAID驱动
  • Unity射线检测实战:从基础原理到LineRenderer动态可视化
  • 基于AI与多源数据融合的工业能耗智能优化平台:从实时监测到碳排管理
  • Toffoli 门:开启可逆计算新时代
  • 代谢组学数据分析终极解决方案:MetaboAnalystR 4.0全面指南
  • 利用 wget 高效镜像网站的实用指南
  • 明日方舟自动化助手终极指南:5分钟解放双手,智能刷本基建全搞定
  • 【实践】YOLOv8赋能视障出行:从模型训练到PyQt5界面集成的盲道守护系统
  • 如何通过手机号快速找回QQ账号?phone2qq工具终极指南
  • 利用代理服务器突破地域限制:OpenAI与Claude API的国内访问指南
  • Ansoft Maxwell 永磁同步直线电机仿真项目分析
  • L1-079 天梯赛的善良(Python)
  • YOLOv8模型‘长高’了?深入聊聊P2层如何让模型‘看’得更清楚,以及背后的计算代价
  • 清华大学PPT模板:3步打造专业学术演示文稿
  • LoRaWAN协议-MAC帧加密与校验机制解析
  • AI Agent的入门开发指南
  • 2026本地外卖系统技术解析:本地配送系统/校园外卖小程序/校园外卖系统/校园跑腿系统/校园配送系统/第三方配送系统/选择指南 - 优质品牌商家