当前位置: 首页 > news >正文

Python gRPC

📘 基于CSDN博客的Python gRPC详细教程

本教程整合了CSDN博客《gRPC Python 详细入门教程(一)》的核心内容,并针对你关注的流式传输(特别是客户端流式RPC)进行深入解析,辅以固件上传的实战代码示例。

一、环境准备与快速入门

在开始编写代码前,需要安装必要的Python包。

1.1 安装gRPC核心库

pip install grpcio

1.2 安装gRPC工具(包含protoc编译器及插件)

pip install grpcio-tools

1.3 快速体验:运行示例程序

  1. 获取示例代码
    git clone -b v1.74.0 --depth 1 --shallow-submodules https://github.com/grpc/grpc
    cd grpc/examples/python/helloworld
    
  2. 运行服务器
    python greeter_server.py
    
  3. 运行客户端(新终端):
    python greeter_client.py
    
    如果看到"Greeter client received: Hello, you!",则环境搭建成功。

二、核心概念:Protocol Buffers与服务定义

gRPC的核心是使用Protocol Buffers(protobuf)作为接口定义语言(IDL)。

2.1 .proto文件结构

一个典型的.proto文件包含:

  • 消息类型(Message):定义数据结构。
  • 服务(Service):定义RPC方法接口。

示例helloworld.proto):

syntax = "proto3";// 定义服务
service Greeter {rpc SayHello (HelloRequest) returns (HelloReply) {}
}// 定义请求消息
message HelloRequest {string name = 1;
}// 定义响应消息
message HelloReply {string message = 1;
}

2.2 四种服务方法类型

gRPC支持四种RPC类型,这在处理不同业务场景时非常关键:

类型 语法 描述 应用场景
一元 RPC (Unary) rpc Method(Request) returns (Response) 客户端发送一个请求,服务器返回一个响应。 简单的查询、状态获取(如GetSWInfos)。
服务器流式 RPC rpc Method(Request) returns (stream Response) 客户端发送一个请求,服务器返回一个响应流。 服务器推送大量数据,如地图路线点列表。
客户端流式 RPC rpc Method(stream Request) returns (Response) 客户端发送一个请求流,服务器返回一个响应。 大文件上传(如固件升级)、数据采集。
双向流式 RPC rpc Method(stream Request) returns (stream Response) 双方使用流同时发送和接收消息。 实时聊天、游戏状态同步。

你的固件升级功能正属于客户端流式RPC


三、从.proto生成Python代码

使用grpcio-tools提供的protoc编译器生成Python代码。

3.1 基本生成命令

python -m grpc_tools.protoc \-I../../protos \                # 指定.proto文件搜索路径--python_out=. \                 # 生成消息类的输出目录--grpc_python_out=. \            # 生成gRPC服务类的输出目录../../protos/route_guide.proto   # 要编译的.proto文件

执行后会生成两个核心文件:

  • route_guide_pb2.py:包含protobuf消息类。
  • route_guide_pb2_grpc.py:包含gRPC服务相关的Stub(客户端存根)Servicer(服务端抽象基类)

3.2 代码生成与导入逻辑

生成的*_pb2_grpc.py文件会自动导入*_pb2文件中的消息类,因此在客户端/服务端代码中,你只需导入*_pb2_grpc即可。


四、实现Python gRPC服务器

以CSDN博客中的RouteGuide服务为例,展示如何实现各类RPC方法。

4.1 创建Servicer子类

你需要继承生成的RouteGuideServicer基类,并实现所有RPC方法。

# route_guide_server.py (简化版)
import route_guide_pb2
import route_guide_pb2_grpcclass RouteGuideServicer(route_guide_pb2_grpc.RouteGuideServicer):# 1. 实现一元RPC: GetFeaturedef GetFeature(self, request, context):# request 是 Point 对象feature = get_feature_from_db(request.latitude, request.longitude)if feature is None:return route_guide_pb2.Feature(name="", location=request)return feature# 2. 实现客户端流式RPC: RecordRoutedef RecordRoute(self, request_iterator, context):# request_iterator 是一个迭代器,用于逐个获取客户端发来的Pointpoint_count = 0for point in request_iterator:point_count += 1# 处理每个point,例如记录距离、时间等pass# 返回一个RouteSummary响应return route_guide_pb2.RouteSummary(point_count=point_count)# 3. 实现双向流式RPC: RouteChatdef RouteChat(self, request_iterator, context):# 对于客户端发来的每个RouteNote,回复一个或多个RouteNotefor note in request_iterator:# 处理收到的note,并可能发送回复yield route_guide_pb2.RouteNote(message=f"Echo: {note.message}", location=note.location)

4.2 启动gRPC服务器

def serve():server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))route_guide_pb2_grpc.add_RouteGuideServicer_to_server(RouteGuideServicer(), server)server.add_insecure_port('[::]:50051')server.start()server.wait_for_termination()

五、实现Python gRPC客户端

客户端通过Stub调用远程方法。

5.1 创建客户端Stub

import grpc
import route_guide_pb2
import route_guide_pb2_grpcchannel = grpc.insecure_channel('localhost:50051')
stub = route_guide_pb2_grpc.RouteGuideStub(channel)

5.2 调用不同类型RPC

一元RPC调用

point = route_guide_pb2.Point(latitude=409146138, longitude=-746188906)
feature = stub.GetFeature(point)
print(feature.name)

客户端流式RPC调用(核心:生成器)
这是你固件升级功能的核心模式。通过一个生成器函数来产生请求流。

def generate_points():# 模拟从文件或数据源逐个产生Pointpoints = [route_guide_pb2.Point(latitude=407838351, longitude=-746143763),route_guide_pb2.Point(latitude=408122808, longitude=-743999179),]for point in points:print(f"Sending point: {point}")yield point# 生成器结束,自动通知服务器流结束# 调用客户端流式RPC,将生成器作为参数传入
summary = stub.RecordRoute(generate_points())
print(f"Route summary: {summary}")

双向流式RPC调用
同时使用生成器发送,并通过迭代响应流接收。

def generate_notes():notes = [route_guide_pb2.RouteNote(message="First", location=point1),route_guide_pb2.RouteNote(message="Second", location=point2),]for note in notes:yield note# 调用双向流式RPC,返回的也是一个迭代器
responses = stub.RouteChat(generate_notes())
for response in responses:print(f"Received echo: {response.message}")

六、实战解析:固件升级的客户端流式RPC

结合你的具体需求,我们将固件上传流程映射到Python gRPC的客户端流式RPC模式。

6.1 假设的.proto定义

service FirmwareUpdate {// 客户端流式RPC:上传固件rpc UploadFirmware(stream FirmwareChunk) returns (UploadStatus);
}message FirmwareChunk {bytes data = 1;          // 固件数据块string product_id = 2;   // 可选:第一个块携带产品ID
}message UploadStatus {int32 code = 1;string message = 2;
}

6.2 Python客户端实现(带超时和进度)

import grpc
import firmware_pb2
import firmware_pb2_grpc
import osdef upload_firmware(file_path, product_id, server_addr='192.168.2.61:50050'):# 1. 创建channel和stubchannel = grpc.insecure_channel(server_addr)stub = firmware_pb2_grpc.FirmwareUpdateStub(channel)# 2. 定义生成器函数:分块读取文件并yielddef chunk_generator():file_size = os.path.getsize(file_path)sent = 0# 第一个块可以携带元信息,例如产品IDfirst_chunk = firmware_pb2.FirmwareChunk(product_id=product_id, data=b'')yield first_chunkwith open(file_path, 'rb') as f:while True:chunk_data = f.read(1024 * 1024)  # 1MB per chunkif not chunk_data:breaksent += len(chunk_data)progress = (sent / file_size) * 100print(f"Progress: {progress:.1f}%")yield firmware_pb2.FirmwareChunk(data=chunk_data)# 3. 调用流式RPC,设置超时try:response = stub.UploadFirmware(chunk_generator(),timeout=600  # 10分钟超时,适用于大文件)if response.code == 0:print("Upload successful:", response.message)else:print("Upload failed:", response.message)except grpc.RpcError as e:print(f"gRPC error: {e.code()} - {e.details()}")

关键点阐述:

  1. 生成器即请求流chunk_generator函数每次yield一个FirmwareChunk消息,gRPC运行时负责将其打包成HTTP/2 DATA帧发送。
  2. 流结束信号:当生成器函数执行完毕(文件读完),gRPC自动发送半关闭信号,通知服务器客户端已无更多数据。
  3. 超时控制timeout参数为整个RPC调用设置了截止时间,包含所有数据块的发送时间和服务器处理时间。
  4. 错误处理:通过捕获grpc.RpcError可以处理网络问题、超时(DEADLINE_EXCEEDED)或服务器返回的错误。

七、总结与对比(Python vs C++)

特性 Python gRPC C++ gRPC (基于你的博客)
请求流实现 生成器 (Generator)。通过yield产生消息,隐式控制流。 ClientWriter对象。显式调用Write()WritesDone()Finish()
代码风格 简洁、声明式,符合Python惯用法。 控制精细、显式,符合C++ RAII理念。
错误处理 主要使用异常grpc.RpcError)。 检查每个Write()的返回值,通过Finish()返回的Status判断。
超时设置 在Stub方法调用时直接传入timeout参数。 通过ClientContextset_deadline()方法设置。
资源管理 生成器内的with语句自动管理文件;gRPC资源通过引用计数管理。 智能指针管理ClientWriter,RAII管理资源。

核心结论:尽管实现风格迥异,但两者底层都基于相同的gRPC C Core,因此网络传输、流控、消息序列化等机制完全一致。Python的生成器模型让流式编程更加直观,非常适合快速开发和测试工具,与你固件升级客户端的定位完美契合。

希望这份详尽的阐述能帮助你深入理解Python gRPC,特别是客户端流式RPC的实现原理。如果后续在编码过程中遇到具体问题,欢迎随时继续探讨。

http://www.jsqmd.com/news/417421/

相关文章:

  • 政策引领・数字赋能 临研通重塑临床研究数字化新范式 - 速递信息
  • AI-Native企业系统架构实践(总篇)
  • PolarDB支撑悠悠有品多地域业务,实现库存毫秒级智能检索
  • 2026厂房环保工程改造扩建公司推荐,靠谱服务商怎么选 - 品牌2025
  • Phpstudy如何安装软件不带的PHP版本?
  • 赋能商用车 ADAS 研发:多工况、多车型适配的多传感器数据采集方案
  • 2月最新!市面上口碑好的零售平台口碑排行信息,数字经济电商/全渠道零售/数字化商品交易/消费升级零售,零售系统找哪家 - 品牌推荐师
  • Solutions P2678 [NOIP 2015 提高组] 跳石头
  • 聊聊湖南有机肥生产线全套设备价格多少、怎么选购更合适? - 工业设备
  • pg_dump/pg_dumpall/pgbackrest
  • RWS 前置驱动,临研通解锁慢病真实价值与研发定位 - 速递信息
  • 2026 厂房机电安装工程专业公司推荐 设计施工一体化承包商参考_ - 品牌2025
  • 2026年2月上海搬家服务公司最新推荐,本地靠谱搬家公司精选 - 品牌鉴赏师
  • 说说高频电火花,汉霸数控靠谱吗,在上海地区口碑咋样 - mypinpai
  • 模拟器市场2026新看点:优质厂家亮点解析,知名的模拟器厂家口碑排行优质品牌选购指南 - 品牌推荐师
  • 2026年 铝合金桥架型材厂家推荐排行榜,铝合金大跨距桥架、不锈钢线槽、热镀锌桥架及各类工业型材源头实力精选 - 品牌企业推荐师(官方)
  • 2026厂房恒温恒湿工程哪家好 靠谱承包商与设计施工一体化企业推荐 - 品牌2025
  • 2026驻马店时尚全屋定制品牌推荐,哪家比较靠谱 - 工业品网
  • Linux Shell 到底是什么?从 0 讲清命令解释器本质
  • 细聊青岛新顺办公家具厂家,究竟靠不靠谱,口碑怎么样? - 工业设备
  • 2026年推荐MRO工业品一站式采购专业公司,选购要点有哪些 - 工业推荐榜
  • 解读泰艺包装特色,金华地区首饰包装如何选择 - 工业品网
  • 聊聊免清洗油烟机合作案例多的加工厂,哪家性价比更高 - 工业品牌热点
  • 昆仑保险深耕齐鲁基层 济南新泺北健康之家开启便民健康保障新征程 - 速递信息
  • 使用插件pg_stat_monitor监控PG数据库性能
  • 导师严选 8个AI论文平台:专科生毕业论文+开题报告全攻略
  • 聊聊上海靠谱的万国府优质楼盘房产服务商有哪些 - myqiye
  • 论文写不动?AI论文工具千笔AI VS 学术猹,专科生专属神器!
  • 论文省心了!9个降AIGC软件测评:本科生降AI率必备指南
  • AI大模型就业风口已来!掌握这些技能,月入过万不是梦!AI大模型的就业岗位及薪资(附学习指南)