brpc异步请求封装
使用brpc发送proto格式的消息时,针对不同类型的请求,每个模块可能各自使用一套接口。但是对于该客户端上的各类请求,其链接管理,故障重发,这些是一致的。为了统一这些请求的处理,因此做了如何封装。https://gitee.com/lang2020/brpc_async
一、核心设计定位和价值
基于 BRPC 框架封装的异步 RPC 客户端通用框架,核心目标是屏蔽底层 BRPC 异步调用细节,为上层业务提供统一、简洁、可复用的异步 / 同步 RPC 调用能力,同时内置完善的可靠性保障机制,实现业务逻辑与 RPC 通信逻辑解耦。
- 简化使用:业务方只需关注业务请求和回调逻辑,无需编写复杂的 BRPC 异步代码。
- 统一规范:全系统 RPC 调用遵循统一范式,降低协作和维护成本。
- 高性能高可用:异步架构保证高并发,内置重试、超时保证服务可靠性。
- 易扩展:模板化 + 多态设计,快速支持新业务 RPC 接口和新特性扩展。
二、使用示例
针对一个具体的proto的rpc协议,只需要加两行声明具体的模板类。
/* package example; service EchoService { rpc Echo(EchoRequest) returns (EchoResponse); }; */ struct EchoTypes { using Client = BRPCAsyncClientTemplate<example::EchoRequest, example::EchoResponse, example::EchoService_Stub, &example::EchoService_Stub::Echo>; using ORequest = ObjectRequest<example::EchoRequest, example::EchoResponse, Client>; using Operation = ObjectOperation<example::EchoRequest, example::EchoResponse>; }; using EchoObjectRequest = EchoTypes::ORequest; using EchoObjectOperation = EchoTypes::Operation;然后就可以直接定义自己的请求,实现具体的同步或者异步发送。每一个请求new一个ObjectRequest,然后发送出去,请求完成后会自动释放资源。
class TestOperation { public: explicit TestOperation() = default; void test_sync() { EchoObjectOperation op; op.request.set_message("test_value"); op.request.set_id(1); auto* sync_req = new EchoObjectRequest(op); sync_req->request(); ldout(nullptr, 0) << "test1: sync_req->request() end" << dendl; } void test_async_callback() { EchoObjectOperation op; op.request.set_message("test_value"); op.request.set_id(2); //支持ceph的Context回调,可以改成通用lambda Context* ctx = create_context_callback<TestOperation, &TestOperation::handle_echo_response>(this); auto* async_req = new EchoObjectRequest(op, ctx); async_req->async_request(); } void handle_echo_response(int result) { ldout(nullptr, 0) << "handle_echo_response: result=" << result << dendl; } void test_async_lambda() { EchoObjectOperation op; op.request.set_message("test_value"); op.request.set_id(3); Context* ctx = make_lambda_context([this, response = op.response](int result) { ldout(nullptr, 0) << " test3: response.use_count()=" << response.use_count() << dendl; handle_echo_response(result, response.get()); }); auto* async_req = new EchoObjectRequest(op, ctx); async_req->async_request(); } void handle_echo_response(int result, example::EchoResponse* response) { ldout(nullptr, 0) << "handle_echo_response: " << result << ", message: " << response->message() << dendl; } ~TestOperation() {} private: };三、核心设计思想
1. 模板化泛型设计:通用化 + 高复用
- 采用模板元编程抽象 RPC 客户端核心逻辑,通过模板参数绑定请求 / 响应体、RPC 方法,一套模板支撑所有业务 RPC 接口(如 Echo、Kv)。
- 无需为每个业务接口重复编写 RPC 调用代码,极大提升框架复用性和扩展性,新增 RPC 服务只需实例化模板即可。
2. 异步为主、同步兼容:灵活适配业务场景
- 核心能力:原生支持异步非阻塞 RPC 调用,通过回调机制实现请求发送后立即返回,不阻塞主线程,适配高并发、高性能场景。
- 异步仿照ceph rbd层、rados的aiocompletion实现。后续可扩展回调函数为lamba表达式,提高灵活性。
- 兼容能力:封装同步调用接口,基于异步能力实现等待完成的同步语义,满足简单业务的同步调用需求,兼顾灵活性与易用性。
3. 分层解耦:职责清晰、易于维护
代码严格分为三层架构,层与层之间低耦合、高内聚:
- 底层通信层:封装 BRPC Channel、Controller、定时器等底层组件,负责网络通信、请求发送 / 接收。
- 中间框架层:提供异步上下文管理、回调调度、重试机制、超时控制等通用能力,是框架核心。
- 上层业务层:业务请求封装(如 读写请求的封装),仅关注业务逻辑,无需关心 RPC 底层细节。
4. 可靠性内置
框架原生集成高可用机制,无需业务层关注:
- 自动重试:支持可配置的重试次数、退避延迟、抖动策略,应对网络抖动、临时故障。
- 超时控制:区分单次调用超时和总超时,精细化管控请求耗时。
- 故障容错:支持服务地址计算、通道管理、定时器取消,保障异常场景下的资源释放和系统稳定。
四、待实现
- server端考虑开启brpc自带的自适应限流。
- 由于server端可能开启qos限流导致client即时延迟发送也可能由于和非重试请求导致重试风暴,因此client端重试和非重试请求准备加上基于令牌桶的限流,且重试请求优先级高于普通请求。
