从传感器到推理端:VLA 机器人 TCP 通信与 msgpack 序列化深度解析
从传感器到推理端:VLA 机器人 TCP 通信与 msgpack 序列化深度解析
场景:在做 VLA 机器人项目时需要一套高效的传感器数据传输方案——机器人端发送传感器数据,推理端接收后模型推理,再将结果以 chunk 流式返回。本文以此为背景,把 TCP 通信 + msgpack 序列化涉及的每个知识点都讲清楚。
一、为什么用 msgpack 而不是 JSON
TCP 传输的是字节流,任何数据发送前都需要序列化成字节。
| JSON | msgpack | |
|---|---|---|
| 格式 | 文本,可读 | 二进制,不可读 |
| 体积 | 较大 | 比 JSON 小 20–50% |
| 速度 | 较慢 | 序列化/反序列化更快 |
| 适合场景 | 对外 API、配置文件 | 传感器数据流、服务间通信 |
机器人传感器数据高频(100Hz+)、数据量大,msgpack 是更合适的选择。
二、msgpack 序列化原理:逐字节拆解
机器人端发送一帧传感器数据:
sensor_request={'type':'sensor','joint':[0.1,-0.2,0.3],# 关节角度,单位 rad'ts':1700000000# 时间戳,Unix 秒}packed=msgpack.packb(sensor_request,use_bin_type=True)# 共 43 字节用 hex 查看原始字节:
83 a4 74797065 a6 73656e736f72 a5 6a6f696e74 93 ca 3dcccccd ca be4ccccd ca 3e99999a a2 7473 ce 6553f100逐字节对照表
| 偏移 | 字节(hex) | 含义 |
|---|---|---|
| [00] | 83 | fixmap,3 个键值对(0x80 + 3) |
| [01] | a4 | fixstr,长度 4(0xa0 + 4) |
| [02-05] | 74 79 70 65 | "type"ASCII |
| [06] | a6 | fixstr,长度 6(0xa0 + 6) |
| [07-12] | 73 65 6e 73 6f 72 | "sensor"ASCII |
| [13] | a5 | fixstr,长度 5(0xa0 + 5) |
| [14-18] | 6a 6f 69 6e 74 | "joint"ASCII |
| [19] | 93 | fixarray,3 个元素(0x90 + 3) |
| [20] | ca | float32 类型标记 |
| [21-24] | 3d cc cc cd | 0.1的 IEEE 754 float32 |
| [25] | ca | float32 类型标记 |
| [26-29] | be 4c cc cd | -0.2的 IEEE 754 float32 |
| [30] | ca | float32 类型标记 |
| [31-34] | 3e 99 99 9a | 0.3的 IEEE 754 float32 |
| [35] | a2 | fixstr,长度 2(0xa0 + 2) |
| [36-37] | 74 73 | "ts"ASCII |
| [38] | ce | uint32 类型标记 |
| [39-42] | 65 53 f1 00 | 1700000000大端 uint32 |
字节数验证:
1 ← fixmap 头 + (1+4) + (1+6) ← 'type': 'sensor' + (1+5) ← 'joint' 键 + 1 + 3×(1+4) ← fixarray 头 + 3个float32(每个1字节标记+4字节数据) + (1+2) ← 'ts' 键 + (1+4) ← uint32 值 = 43 字节 ✓msgpack 类型编码规律
| 前缀 | 规则 | 范围 |
|---|---|---|
0x80 + n | fixmap(字典) | n ≤ 15 |
0x90 + n | fixarray(列表) | n ≤ 15 |
0xa0 + n | fixstr(字符串) | n ≤ 31 字节 |
0x00~0x7f | 正整数直接存,单字节 | 0–127 |
0xce+ 4字节 | uint32 | 0 – 4,294,967,295 |
0xca+ 4字节 | float32(IEEE 754) | — |
Python 打印
bytes时,能表示为 ASCII 的字节会直接显示成字母,所以看到的是\x83\xa4type\xa6sensor...而不是全十六进制。
三、TCP 粘包问题与长度前缀协议
什么是粘包
TCP 是流式协议,没有消息边界。sendall一次发出 47 字节(4 字节头 + 43 字节体),接收方可能:
第一次 recv → 20 字节 第二次 recv → 27 字节如果推理端连续推理多帧并返回,接收方甚至可能一次收到多条消息粘在一起。
解决方案:4 字节长度前缀
协议约定:每条消息前加固定 4 字节,存储消息体的字节长度。
发送的 47 字节: ┌──────────────────┬────────────────────────────────────────────┐ │ 00 00 00 2b │ 83 a4 74 79 70 65 a6 73 65 6e ... │ │ (4字节,值=43) │ (43字节 msgpack 消息体) │ └──────────────────┴────────────────────────────────────────────┘接收方先读 4 字节知道长度(43),再精确读 43 字节,完全消除粘包。
四、struct.pack / unpack:字节与整数互转
发送端:整数 → 字节
struct.pack('>I',43)# b'\x00\x00\x00\x2b'格式字符串'>I':
| 字符 | 含义 |
|---|---|
> | 大端序(Big-endian),高位字节在前,即网络字节序 |
I | unsigned int,4 字节无符号整数 |
43 = 0x0000002b 大端序: 00 00 00 2b ← 高位在前(标准网络传输顺序) 小端序: 2b 00 00 00 ← x86 CPU 本地字节序接收端:字节 → 整数
msg_length=struct.unpack('>I',raw_length)[0]# b'\x00\x00\x00\x2b' → (43,) → 43struct.unpack固定返回元组(支持一次解多个值),[0]取第一个元素:
struct.unpack('>I',...)# → (43,) 一个值也是元组struct.unpack('>II',...)# → (43, 7) 解两个值五、recv_all:确保读满指定字节数
defrecv_all(conn,length):data=b''whilelen(data)<length:packet=conn.recv(length-len(data))ifnotpacket:returnNone# 对端关闭连接,recv 返回 b''data+=packetreturndataconn.recv(n)语义是"最多读 n 字节",不保证一次读满。循环示例(目标读 43 字节,TCP 分两次到达):
初始:data = b'' 第 1 次循环:len(data)=0 < 43,recv(43) → 实际到了 20 字节,data = 20字节 第 2 次循环:len(data)=20 < 43,recv(23) → 实际到了 23 字节,data = 43字节 第 3 次循环:len(data)=43 == 43,退出,return dataif not packet处理对端正常关闭的情况,此时recv返回b'',不判断会死循环。
六、Socket 对象解读
conn,addr=server_sock.accept()# <socket.socket fd=4, family=2, type=1, proto=0,# laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 49724)>| 字段 | 值 | 含义 |
|---|---|---|
fd=4 | 4 | 文件描述符,Linux 中 socket 也是文件 |
family=2 | AF_INET | IPv4 |
type=1 | SOCK_STREAM | TCP |
laddr | ('127.0.0.1', 9999) | 推理服务端地址和监听端口 |
raddr | ('127.0.0.1', 49724) | 机器人端地址和临时端口 |
server_sock 与 conn 的区别
server_sock.listen(5)# 只负责监听,等待机器人连接conn,addr=server_sock.accept()# 每来一个连接,新建 conn 专门通信server_sock:守着 9999 端口,不做数据收发conn:和某个具体机器人节点通信的 socket
关于客户端临时端口
机器人端connect()时,OS 随机分配一个空闲端口(Ephemeral Port,通常 49152–65535),用完即释放。TCP 连接由四元组唯一标识:
机器人端 IP : 临时端口 → 推理端 IP : 监听端口 127.0.0.1 : 49724 → 127.0.0.1 : 9999七、完整通信流程
机器人端(client) 推理端(server) │ │ │ connect(推理端 IP:9999) │ │──────────────────────────────────────────>│ │ │ accept() → conn │ │ │ sendall(4字节长度头 + 43字节传感器数据) │ │──────────────────────────────────────────>│ │ │ recv_all(conn, 4) 读长度 → 43 │ │ recv_all(conn, 43) 读消息体 │ │ msgpack.unpackb() 还原字典 │ │ VLA 模型推理 │ │ msgpack.packb() 序列化 action │ │ sendall(4字节头 + 41字节响应) │ │ │ recv_all(4) 读长度 → 41 │ │ recv_all(41) 读响应体 │ │ msgpack.unpackb() 还原 action 字典 │ │ │ │ close() │ close(conn)八、完整代码
server.py(推理端)
""" 推理端 TCP 服务 - 接收机器人传感器数据(msgpack 序列化) - 模拟 VLA 模型推理,返回控制 action """importsocketimportmsgpackimportstruct HOST='127.0.0.1'PORT=9999defrecv_all(conn,length):"""循环读取,确保收满 length 字节,解决 TCP 流式拆包问题"""data=b''whilelen(data)<length:packet=conn.recv(length-len(data))ifnotpacket:returnNonedata+=packetreturndatadefinfer(sensor_data:dict)->dict:"""模拟 VLA 推理:输入传感器数据,返回控制 action"""joint=sensor_data.get('joint',[])# 实际场景替换为模型前向推理action=[round(j*0.5,4)forjinjoint]return{'status':'ok','action':action,'chunk':1,}defhandle_client(conn,addr):print(f"[推理端] 机器人已连接:{addr}")try:# 1. 读 4 字节长度前缀raw_length=recv_all(conn,4)ifnotraw_length:return# b'\x00\x00\x00\x2b' → 43msg_length=struct.unpack('>I',raw_length)[0]print(f"[推理端] 消息体长度:{msg_length}字节")# 2. 按长度读消息体raw_data=recv_all(conn,msg_length)ifnotraw_data:return# 3. msgpack 反序列化sensor_data=msgpack.unpackb(raw_data,raw=False)print(f"[推理端] 收到传感器数据:{sensor_data}")# 4. 推理response=infer(sensor_data)print(f"[推理端] 推理结果:{response}")# 5. 序列化响应 + 长度前缀,发送packed=msgpack.packb(response,use_bin_type=True)conn.sendall(struct.pack('>I',len(packed))+packed)exceptExceptionase:print(f"[推理端] 出错:{e}")finally:conn.close()print(f"[推理端] 关闭连接:{addr}\n")defmain():withsocket.socket(socket.AF_INET,socket.SOCK_STREAM)asserver_sock:server_sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)server_sock.bind((HOST,PORT))server_sock.listen(5)print(f"[推理端] 监听{HOST}:{PORT}...")whileTrue:conn,addr=server_sock.accept()handle_client(conn,addr)# 并发版:# import threading# threading.Thread(target=handle_client, args=(conn, addr)).start()if__name__=='__main__':try:main()exceptKeyboardInterrupt:print("\n[推理端] 已退出")client.py(机器人端)
""" 机器人端 TCP 客户端 - 采集传感器数据,msgpack 序列化后发送给推理端 - 接收推理端返回的控制 action """importsocketimportmsgpackimportstruct HOST='127.0.0.1'PORT=9999defrecv_all(sock,length):data=b''whilelen(data)<length:packet=sock.recv(length-len(data))ifnotpacket:returnNonedata+=packetreturndatadefsend_sensor(sensor_data:dict)->dict:"""发送一帧传感器数据,返回推理端的 action"""withsocket.socket(socket.AF_INET,socket.SOCK_STREAM)assock:sock.connect((HOST,PORT))# msgpack 序列化# {'type': 'sensor', 'joint': [0.1, -0.2, 0.3], 'ts': 1700000000}# → 43 字节,hex: 83 a4 74797065 a6 73656e736f72 ...packed=msgpack.packb(sensor_data,use_bin_type=True)print(f"[机器人端] 序列化后{len(packed)}字节:{packed.hex()}")# 加 4 字节长度前缀后发送# 43 → struct.pack('>I', 43) = b'\x00\x00\x00\x2b'sock.sendall(struct.pack('>I',len(packed))+packed)print(f"[机器人端] 已发送:{sensor_data}")# 接收推理结果raw_length=recv_all(sock,4)msg_length=struct.unpack('>I',raw_length)[0]raw_data=recv_all(sock,msg_length)response=msgpack.unpackb(raw_data,raw=False)print(f"[机器人端] 收到 action:{response}\n")returnresponsedefmain():# 模拟多帧传感器数据frames=[{'type':'sensor','joint':[0.1,-0.2,0.3],'ts':1700000000},{'type':'sensor','joint':[0.2,-0.15,0.25],'ts':1700000001},{'type':'sensor','joint':[0.0,0.0,0.0],'ts':1700000002},]forframeinframes:try:send_sensor(frame)exceptConnectionRefusedError:print("[机器人端] 无法连接推理端,请先启动 server.py")breakif__name__=='__main__':main()九、运行
pipinstallmsgpack# 终端 1:启动推理端python server.py# 终端 2:启动机器人端python client.py十、扩展方向
本文 demo 是单帧一问一答,实际 VLA 场景可在此基础上扩展:
- 长连接多帧:一次
connect后循环发送多帧传感器数据,避免频繁建连开销,需在handle_client中加while True循环读包 - 推理 chunk 流式返回:推理端每推理出一个 token/chunk 就发一帧响应,机器人端循环接收,同样用长度前缀帧封装每个 chunk
- 并发多机器人:
handle_client改为threading.Thread或asyncio,同时服务多个机器人节点 - 消息类型扩展:在请求字典中加
type字段区分关节角度、图像帧(base64)、力传感器等不同数据,服务端按type分发处理
