当前位置: 首页 > news >正文

从传感器到推理端:VLA 机器人 TCP 通信与 msgpack 序列化深度解析

从传感器到推理端:VLA 机器人 TCP 通信与 msgpack 序列化深度解析

场景:在做 VLA 机器人项目时需要一套高效的传感器数据传输方案——机器人端发送传感器数据,推理端接收后模型推理,再将结果以 chunk 流式返回。本文以此为背景,把 TCP 通信 + msgpack 序列化涉及的每个知识点都讲清楚。


一、为什么用 msgpack 而不是 JSON

TCP 传输的是字节流,任何数据发送前都需要序列化成字节。

JSONmsgpack
格式文本,可读二进制,不可读
体积较大比 JSON 小 20–50%
速度较慢序列化/反序列化更快
适合场景对外 API、配置文件传感器数据流、服务间通信

机器人传感器数据高频(100Hz+)、数据量大,msgpack 是更合适的选择。


二、msgpack 序列化原理:逐字节拆解

机器人端发送一帧传感器数据:

sensor_request={'type':'sensor','joint':[0.1,-0.2,0.3],# 关节角度,单位 rad'ts':1700000000# 时间戳,Unix 秒}packed=msgpack.packb(sensor_request,use_bin_type=True)# 共 43 字节

用 hex 查看原始字节:

83 a4 74797065 a6 73656e736f72 a5 6a6f696e74 93 ca 3dcccccd ca be4ccccd ca 3e99999a a2 7473 ce 6553f100

逐字节对照表

偏移字节(hex)含义
[00]83fixmap,3 个键值对(0x80 + 3
[01]a4fixstr,长度 4(0xa0 + 4
[02-05]74 79 70 65"type"ASCII
[06]a6fixstr,长度 6(0xa0 + 6
[07-12]73 65 6e 73 6f 72"sensor"ASCII
[13]a5fixstr,长度 5(0xa0 + 5
[14-18]6a 6f 69 6e 74"joint"ASCII
[19]93fixarray,3 个元素(0x90 + 3
[20]cafloat32 类型标记
[21-24]3d cc cc cd0.1的 IEEE 754 float32
[25]cafloat32 类型标记
[26-29]be 4c cc cd-0.2的 IEEE 754 float32
[30]cafloat32 类型标记
[31-34]3e 99 99 9a0.3的 IEEE 754 float32
[35]a2fixstr,长度 2(0xa0 + 2
[36-37]74 73"ts"ASCII
[38]ceuint32 类型标记
[39-42]65 53 f1 001700000000大端 uint32

字节数验证:

1 ← fixmap 头 + (1+4) + (1+6) ← 'type': 'sensor' + (1+5) ← 'joint' 键 + 1 + 3×(1+4) ← fixarray 头 + 3个float32(每个1字节标记+4字节数据) + (1+2) ← 'ts' 键 + (1+4) ← uint32 值 = 43 字节 ✓

msgpack 类型编码规律

前缀规则范围
0x80 + nfixmap(字典)n ≤ 15
0x90 + nfixarray(列表)n ≤ 15
0xa0 + nfixstr(字符串)n ≤ 31 字节
0x00~0x7f正整数直接存,单字节0–127
0xce+ 4字节uint320 – 4,294,967,295
0xca+ 4字节float32(IEEE 754)

Python 打印bytes时,能表示为 ASCII 的字节会直接显示成字母,所以看到的是\x83\xa4type\xa6sensor...而不是全十六进制。


三、TCP 粘包问题与长度前缀协议

什么是粘包

TCP 是流式协议,没有消息边界。sendall一次发出 47 字节(4 字节头 + 43 字节体),接收方可能:

第一次 recv → 20 字节 第二次 recv → 27 字节

如果推理端连续推理多帧并返回,接收方甚至可能一次收到多条消息粘在一起。

解决方案:4 字节长度前缀

协议约定:每条消息前加固定 4 字节,存储消息体的字节长度。

发送的 47 字节: ┌──────────────────┬────────────────────────────────────────────┐ │ 00 00 00 2b │ 83 a4 74 79 70 65 a6 73 65 6e ... │ │ (4字节,值=43) │ (43字节 msgpack 消息体) │ └──────────────────┴────────────────────────────────────────────┘

接收方先读 4 字节知道长度(43),再精确读 43 字节,完全消除粘包。


四、struct.pack / unpack:字节与整数互转

发送端:整数 → 字节

struct.pack('>I',43)# b'\x00\x00\x00\x2b'

格式字符串'>I'

字符含义
>大端序(Big-endian),高位字节在前,即网络字节序
Iunsigned int,4 字节无符号整数
43 = 0x0000002b 大端序: 00 00 00 2b ← 高位在前(标准网络传输顺序) 小端序: 2b 00 00 00 ← x86 CPU 本地字节序

接收端:字节 → 整数

msg_length=struct.unpack('>I',raw_length)[0]# b'\x00\x00\x00\x2b' → (43,) → 43

struct.unpack固定返回元组(支持一次解多个值),[0]取第一个元素:

struct.unpack('>I',...)# → (43,) 一个值也是元组struct.unpack('>II',...)# → (43, 7) 解两个值

五、recv_all:确保读满指定字节数

defrecv_all(conn,length):data=b''whilelen(data)<length:packet=conn.recv(length-len(data))ifnotpacket:returnNone# 对端关闭连接,recv 返回 b''data+=packetreturndata

conn.recv(n)语义是"最多读 n 字节",不保证一次读满。循环示例(目标读 43 字节,TCP 分两次到达):

初始:data = b'' 第 1 次循环:len(data)=0 < 43,recv(43) → 实际到了 20 字节,data = 20字节 第 2 次循环:len(data)=20 < 43,recv(23) → 实际到了 23 字节,data = 43字节 第 3 次循环:len(data)=43 == 43,退出,return data

if not packet处理对端正常关闭的情况,此时recv返回b'',不判断会死循环。


六、Socket 对象解读

conn,addr=server_sock.accept()# <socket.socket fd=4, family=2, type=1, proto=0,# laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 49724)>
字段含义
fd=44文件描述符,Linux 中 socket 也是文件
family=2AF_INETIPv4
type=1SOCK_STREAMTCP
laddr('127.0.0.1', 9999)推理服务端地址和监听端口
raddr('127.0.0.1', 49724)机器人端地址和临时端口

server_sock 与 conn 的区别

server_sock.listen(5)# 只负责监听,等待机器人连接conn,addr=server_sock.accept()# 每来一个连接,新建 conn 专门通信
  • server_sock:守着 9999 端口,不做数据收发
  • conn:和某个具体机器人节点通信的 socket

关于客户端临时端口

机器人端connect()时,OS 随机分配一个空闲端口(Ephemeral Port,通常 49152–65535),用完即释放。TCP 连接由四元组唯一标识:

机器人端 IP : 临时端口 → 推理端 IP : 监听端口 127.0.0.1 : 49724 → 127.0.0.1 : 9999

七、完整通信流程

机器人端(client) 推理端(server) │ │ │ connect(推理端 IP:9999) │ │──────────────────────────────────────────>│ │ │ accept() → conn │ │ │ sendall(4字节长度头 + 43字节传感器数据) │ │──────────────────────────────────────────>│ │ │ recv_all(conn, 4) 读长度 → 43 │ │ recv_all(conn, 43) 读消息体 │ │ msgpack.unpackb() 还原字典 │ │ VLA 模型推理 │ │ msgpack.packb() 序列化 action │ │ sendall(4字节头 + 41字节响应) │ │ │ recv_all(4) 读长度 → 41 │ │ recv_all(41) 读响应体 │ │ msgpack.unpackb() 还原 action 字典 │ │ │ │ close() │ close(conn)

八、完整代码

server.py(推理端)

""" 推理端 TCP 服务 - 接收机器人传感器数据(msgpack 序列化) - 模拟 VLA 模型推理,返回控制 action """importsocketimportmsgpackimportstruct HOST='127.0.0.1'PORT=9999defrecv_all(conn,length):"""循环读取,确保收满 length 字节,解决 TCP 流式拆包问题"""data=b''whilelen(data)<length:packet=conn.recv(length-len(data))ifnotpacket:returnNonedata+=packetreturndatadefinfer(sensor_data:dict)->dict:"""模拟 VLA 推理:输入传感器数据,返回控制 action"""joint=sensor_data.get('joint',[])# 实际场景替换为模型前向推理action=[round(j*0.5,4)forjinjoint]return{'status':'ok','action':action,'chunk':1,}defhandle_client(conn,addr):print(f"[推理端] 机器人已连接:{addr}")try:# 1. 读 4 字节长度前缀raw_length=recv_all(conn,4)ifnotraw_length:return# b'\x00\x00\x00\x2b' → 43msg_length=struct.unpack('>I',raw_length)[0]print(f"[推理端] 消息体长度:{msg_length}字节")# 2. 按长度读消息体raw_data=recv_all(conn,msg_length)ifnotraw_data:return# 3. msgpack 反序列化sensor_data=msgpack.unpackb(raw_data,raw=False)print(f"[推理端] 收到传感器数据:{sensor_data}")# 4. 推理response=infer(sensor_data)print(f"[推理端] 推理结果:{response}")# 5. 序列化响应 + 长度前缀,发送packed=msgpack.packb(response,use_bin_type=True)conn.sendall(struct.pack('>I',len(packed))+packed)exceptExceptionase:print(f"[推理端] 出错:{e}")finally:conn.close()print(f"[推理端] 关闭连接:{addr}\n")defmain():withsocket.socket(socket.AF_INET,socket.SOCK_STREAM)asserver_sock:server_sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)server_sock.bind((HOST,PORT))server_sock.listen(5)print(f"[推理端] 监听{HOST}:{PORT}...")whileTrue:conn,addr=server_sock.accept()handle_client(conn,addr)# 并发版:# import threading# threading.Thread(target=handle_client, args=(conn, addr)).start()if__name__=='__main__':try:main()exceptKeyboardInterrupt:print("\n[推理端] 已退出")

client.py(机器人端)

""" 机器人端 TCP 客户端 - 采集传感器数据,msgpack 序列化后发送给推理端 - 接收推理端返回的控制 action """importsocketimportmsgpackimportstruct HOST='127.0.0.1'PORT=9999defrecv_all(sock,length):data=b''whilelen(data)<length:packet=sock.recv(length-len(data))ifnotpacket:returnNonedata+=packetreturndatadefsend_sensor(sensor_data:dict)->dict:"""发送一帧传感器数据,返回推理端的 action"""withsocket.socket(socket.AF_INET,socket.SOCK_STREAM)assock:sock.connect((HOST,PORT))# msgpack 序列化# {'type': 'sensor', 'joint': [0.1, -0.2, 0.3], 'ts': 1700000000}# → 43 字节,hex: 83 a4 74797065 a6 73656e736f72 ...packed=msgpack.packb(sensor_data,use_bin_type=True)print(f"[机器人端] 序列化后{len(packed)}字节:{packed.hex()}")# 加 4 字节长度前缀后发送# 43 → struct.pack('>I', 43) = b'\x00\x00\x00\x2b'sock.sendall(struct.pack('>I',len(packed))+packed)print(f"[机器人端] 已发送:{sensor_data}")# 接收推理结果raw_length=recv_all(sock,4)msg_length=struct.unpack('>I',raw_length)[0]raw_data=recv_all(sock,msg_length)response=msgpack.unpackb(raw_data,raw=False)print(f"[机器人端] 收到 action:{response}\n")returnresponsedefmain():# 模拟多帧传感器数据frames=[{'type':'sensor','joint':[0.1,-0.2,0.3],'ts':1700000000},{'type':'sensor','joint':[0.2,-0.15,0.25],'ts':1700000001},{'type':'sensor','joint':[0.0,0.0,0.0],'ts':1700000002},]forframeinframes:try:send_sensor(frame)exceptConnectionRefusedError:print("[机器人端] 无法连接推理端,请先启动 server.py")breakif__name__=='__main__':main()

九、运行

pipinstallmsgpack# 终端 1:启动推理端python server.py# 终端 2:启动机器人端python client.py

十、扩展方向

本文 demo 是单帧一问一答,实际 VLA 场景可在此基础上扩展:

  • 长连接多帧:一次connect后循环发送多帧传感器数据,避免频繁建连开销,需在handle_client中加while True循环读包
  • 推理 chunk 流式返回:推理端每推理出一个 token/chunk 就发一帧响应,机器人端循环接收,同样用长度前缀帧封装每个 chunk
  • 并发多机器人handle_client改为threading.Threadasyncio,同时服务多个机器人节点
  • 消息类型扩展:在请求字典中加type字段区分关节角度、图像帧(base64)、力传感器等不同数据,服务端按type分发处理
http://www.jsqmd.com/news/880676/

相关文章:

  • OpenClaw强势推出V2026.5.20版本地部署最新教程来啦!3分钟一键安装中文版可视化操作指南
  • 2026年Q2西南老小区电梯加装服务商排行:加装一台电梯多少钱、四川电梯加装、四川电梯安装公司、家用电梯加装、成都电梯加装费用选择指南 - 优质品牌商家
  • ChatGPT翻译到底靠不靠谱?从神经机器翻译原理到提示词工程优化,一文讲透质量波动的底层逻辑,现在不看就晚了!
  • Arm DS/DS-5 JTAG解锁序列配置与调试指南
  • 别再乱改/etc/profile了!Kylin麒麟系统环境变量配置的3种正确姿势(附永久生效方法)
  • 统信UOS 20.1060专业版:三步搞定桌面、锁屏和开机GRUB壁纸(附高清图源推荐)
  • 财务报销预警智能体开发与部署指南
  • AI写代码,用户和开发者都慌?
  • 纯视觉无感空间定位 实现煤矿井下人员精准全域管控技术白皮书
  • I Pack You加密壳:实现页粒度的动态解密和惰性加密
  • 四川螺纹钢厂家现货批发|工程专用钢材一站式配送 - 四川盛世钢联营销中心
  • Ubuntu 20.04 ROS新手避坑:catkin_make报‘empy’错误的完整解决流程
  • 新电脑到手第一件事:关闭Windows 11/10的自动BitLocker加密(附详细路径图)
  • PyTorch代码(5)
  • Android原生代码调试:DS-5环境配置与实战技巧
  • 2026Q2艺术楼梯定制哪家专业:别墅楼梯定制、实木楼梯定制、实木艺术楼梯、弧形钢构楼梯定制、成都实木楼梯、成都楼梯选择指南 - 优质品牌商家
  • Linux 文件权限 rwx 与数字权限 755/644 彻底详解(新手必懂)
  • 现代计算架构优化:零开销循环、SIMD与张量加速
  • 2026年5月视频剪辑制作培训机构排行实测盘点:软件测试线下就业培训/AI软件测试培训/外贸电商设计培训/影视特效剪辑培训/选择指南 - 优质品牌商家
  • 【数据集】省级农村创业活跃度/农户创业活跃度(2005-2024年)
  • 洛谷p1419
  • Arm ETE嵌入式追踪技术:架构解析与调试优化
  • 2026年5月新发布河南IPO企业股权激励选择指南 - 2026年企业推荐榜
  • 基于ISO/IEC 27004的机器学习模型风险测量框架(RMF)实战解析
  • 2026年至今,黄金回收行业口碑与服务标杆企业深度解析:广州宝奢科技 - 2026年企业推荐榜
  • C语言三大经典排序算法详解:快速排序、冒泡排序与选择排序
  • python async/await异步编程设计常用插件
  • 别再死记硬背了!通过一个成绩分析项目,彻底搞懂Linux静态库和共享库的区别
  • 2026负压隔离器技术深度解析:惰性气体手套箱、放射性药品生产热室、放射性药物热室、核医药热室、生物隔离器、真空手套箱选择指南 - 优质品牌商家
  • 2026年现阶段,北京高端住宅两联供优选:合宜人居高端住宅隐蔽工程一体化服务专家 - 2026年企业推荐榜