当前位置: 首页 > news >正文

CyberRT共享内存通信原理详解

CyberRT 共享内存通信原理详解

目录

  1. 概述
  2. 整体架构
  3. 核心数据结构
  4. 内存管理
  5. 同步机制
  6. 通知机制
  7. 通信流程详解
  8. 高级特性:Arena 消息
  9. 代码示例

概述

CyberRT 是 Apollo 自动驾驶平台的通信框架,共享内存(Shared Memory,简称 SHM)是其实现高吞吐量、低延迟进程间通信(IPC)的核心机制之一。

为什么使用共享内存?

相比其他 IPC 方式(如 Socket、管道),共享内存具有以下优势:

  • 零拷贝或极少拷贝:多个进程直接访问同一块物理内存
  • 低延迟:避免了内核态与用户态之间的频繁切换
  • 高吞吐量:适合高频大数据传输场景

整体架构

核心组件

┌─────────────────────────────────────────────────────────────────┐ │ 应用层(Writer/Reader) │ └─────────────────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────┐ │ ShmTransmitter / ShmReceiver │ └─────────────────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────┐ │ ShmDispatcher(分发器) │ │ ┌──────────────────────┐ ┌──────────────────────┐ │ │ │ Notifier(通知器) │◄───────►│ Segment(段) │ │ │ └──────────────────────┘ └──────────────────────┘ │ └─────────────────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────┐ │ 共享内存区域(物理内存) │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │ State │ │ Blocks[] │ │ Arena │ │ Block │ │ │ │ │ │ │ │ Blocks[] │ │ Buffers │ │ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ └─────────────────────────────────────────────────────────────────┘

关键组件说明

  1. ShmTransmitter: 发送端,负责将消息写入共享内存
  2. ShmReceiver: 接收端,负责从共享内存读取消息
  3. ShmDispatcher: 单例分发器,管理所有共享内存段和通知
  4. Notifier: 通知器,用于进程间消息通知
  5. Segment: 共享内存段,每个 Channel 对应一个 Segment
  6. Block: 内存块,用于存储单个消息

核心数据结构

1. State(共享内存状态)

State位于共享内存的开头,用于管理整个共享内存段的元数据。

位置:cyber/transport/shm/state.h

classState{private:std::atomic<bool>need_remap_={false};// 是否需要重新映射std::atomic<uint32_t>seq_={0};// 普通消息序列号std::atomic<uint32_t>arena_seq_={0};// Arena消息序列号std::atomic<uint32_t>reference_count_={0};// 引用计数std::atomic<uint64_t>ceiling_msg_size_;// 最大消息大小};

关键成员解释

  • seq_:轮询分配 Block 时使用的序列号,通过FetchAddSeq(1)递增
  • reference_count_:记录使用该共享内存段的进程数,用于安全销毁
  • need_remap_:当共享内存需要重建时标记

2. Block(内存块)

Block管理单个消息块的锁和元数据。

位置:cyber/transport/shm/block.h

classBlock{private:std::atomic<int32_t>lock_num_={0};// 读写锁计数器uint64_tmsg_size_;// 消息大小uint64_tmsg_info_size_;// 消息信息大小};

读写锁机制

  • kRWLockFree = 0:空闲状态
  • kWriteExclusive = -1:写者独占
  • > 0:读者数量(允许多个读者同时读)

3. WritableBlock / ReadableBlock

用于操作 Block 的结构体。

位置:cyber/transport/shm/segment.h

structWritableBlock{uint32_tindex=0;// Block 在数组中的索引Block*block=nullptr;// 指向 Block 的指针uint8_t*buf=nullptr;// 指向实际数据缓冲区的指针};usingReadableBlock=WritableBlock;

4. ReadableInfo(可读信息)

用于在进程间通知哪个 Block 有新消息。

位置:cyber/transport/shm/readable_info.h

classReadableInfo{private:uint64_thost_id_;// 主机IDint32_tblock_index_;// 普通Block索引int32_tarena_block_index_;// Arena Block索引uint64_tchannel_id_;// Channel ID};

内存管理

1. 共享内存段布局

每个 Channel 对应一个共享内存段,布局如下:

┌───────────────────────────────────────────────────────────────┐ │ Offset 0 │ State │ ├───────────────────┼────────────────────────────────────────────┤ │ Offset sizeof(State) │ Block[0], Block[1], ..., Block[N-1] │ ├───────────────────┼────────────────────────────────────────────┤ │ ... │ ArenaBlock[0], ..., ArenaBlock[M-1] │ ├───────────────────┼────────────────────────────────────────────┤ │ ... │ BlockBuf[0], BlockBuf[1], ... │ │ │ (每个BlockBuf对应一个Block) │ ├───────────────────┼────────────────────────────────────────────┤ │ ... │ ArenaBlockBuf[0], ... │ └───────────────────────────────────────────────────────────────┘

2. ShmConf(共享内存配置)

ShmConf根据消息大小动态计算所需的内存配置。

位置:cyber/transport/shm/shm_conf.h

消息大小档位:

消息范围天花板大小Block 数量
0 - 10K16K512
10K - 100K128K128
100K - 1M1M64
1M - 6M8M32
6M - 10M16M16
10M+32M8

3. Segment 类

Segment是共享内存段的抽象基类,有两种实现:

  • PosixSegment:使用 POSIX 共享内存(shm_open
  • XsiSegment:使用 System V 共享内存(shmget

关键方法

// 写入流程boolAcquireBlockToWrite(size_t msg_size,WritableBlock*writable_block);voidReleaseWrittenBlock(constWritableBlock&writable_block);// 读取流程boolAcquireBlockToRead(ReadableBlock*readable_block);voidReleaseReadBlock(constReadableBlock&readable_block);

4. 内存初始化流程(OpenOrCreate)

PosixSegment::OpenOrCreate()为例(cyber/transport/shm/posix_segment.cc:39):

  1. 创建共享内存文件

    intfd=shm_open(shm_name_.c_str(),O_RDWR|O_CREAT|O_EXCL,0644);
  2. 设置大小

    ftruncate(fd,conf_.managed_shm_size());
  3. 映射到进程地址空间

    managed_shm_=mmap(nullptr,conf_.managed_shm_size(),PROT_READ|PROT_WRITE,MAP_SHARED,fd,0);
  4. 在共享内存上构造对象

    state_=new(managed_shm_)State(conf_.ceiling_msg_size());blocks_=new(managed_shm_+sizeof(State))Block[conf_.block_num()];

这里使用了Placement New技术,在已分配的内存上构造对象。


同步机制

1. Block 的读写锁(关键!)

这是最不容易理解的部分,让我们详细分析。

位置:cyber/transport/shm/block.cc

写锁获取(TryLockForWrite)
boolBlock::TryLockForWrite(){int32_trw_lock_free=kRWLockFree;// 0if(!lock_num_.compare_exchange_weak(rw_lock_free,kWriteExclusive,// 从 0 变为 -1std::memory_order_acq_rel,std::memory_order_relaxed)){returnfalse;}returntrue;}

原理:使用原子 CAS 操作,只有当lock_num_为 0(空闲)时,才成功将其设置为 -1(写独占)。

读锁获取(TryLockForRead)
boolBlock::TryLockForRead(){int32_tlock_num=lock_num_.load();if(lock_num<kRWLockFree){// 如果正在被写returnfalse;}int32_ttry_times=0;while(!lock_num_.compare_exchange_weak(lock_num,lock_num+1,// 读者数 +1std::memory_order_acq_rel,std::memory_order_relaxed)){++try_times;if(try_times==kMaxTryLockTimes){returnfalse;}lock_num=lock_num_.load();if(lock_num<kRWLockFree){returnfalse;}}returntrue;}

原理

  1. 检查当前是否正在被写(lock_num < 0
  2. 使用 CAS 原子递增读者计数
  3. 重试最多 5 次
锁释放
voidBlock::ReleaseWriteLock(){lock_num_.fetch_add(1);// -1 + 1 = 0,恢复空闲}voidBlock::ReleaseReadLock(){lock_num_.fetch_sub(1);// 读者数 -1}

2. Block 分配策略(轮询)

位置:cyber/transport/shm/segment.cc:295

uint32_tSegment::GetNextWritableBlockIndex(){constautoblock_num=conf_.block_num();while(1){uint32_ttry_idx=state_->FetchAddSeq(1)%block_num;if(blocks_[try_idx].TryLockForWrite()){returntry_idx;}}}

原理

  1. 序列号seq_原子递增
  2. 对 Block 数量取模,得到尝试索引
  3. 尝试获取该 Block 的写锁
  4. 如果失败,继续下一个(循环直到成功)

这是一个无锁的轮询分配策略,保证了公平性。


通知机制

当发送者写完消息后,需要通知接收者。CyberRT 提供了两种通知方式:

1. ConditionNotifier(条件变量通知)

使用共享内存 + 信号量实现,适合同一主机内的进程。

位置:cyber/transport/shm/condition_notifier.h

内部结构:

structIndicator{std::atomic<uint64_t>next_seq={0};ReadableInfo infos[kBufLength];// 4096 个槽位uint64_tseqs[kBufLength]={0};};

通知流程

  1. 发送者:Notify(info)→ 将 info 写入 ring buffer,更新 seq
  2. 接收者:Listen(timeout, info)→ 轮询等待 seq 变化,读取 info

2. MulticastNotifier(组播通知)

使用 UDP 组播,适合跨主机通信。

3. NotifierFactory

工厂类,根据配置创建对应的 Notifier。


通信流程详解

完整的发送流程

让我们跟随一条消息从 Writer 到 Reader 的完整旅程。

步骤 1:初始化 ShmTransmitter

位置:cyber/transport/transmitter/shm_transmitter.h:160

voidShmTransmitter::Enable(){// 创建 Segmentsegment_=SegmentFactory::CreateSegment(channel_id_);// 创建 Notifiernotifier_=NotifierFactory::CreateNotifier();}
步骤 2:发送消息(Transmit)

位置:cyber/transport/transmitter/shm_transmitter.h:201

1. AcquireBlockToWrite(msg_size, &wb) └─> 获取一个可写的 Block,会阻塞直到成功 2. message::SerializeToArray(msg, wb.buf, msg_size) └─> 将 Protobuf 消息序列化到 Block 的缓冲区 3. wb.block->set_msg_size(msg_size) └─> 设置消息大小 4. msg_info.SerializeTo(msg_info_addr, MessageInfo::kSize) └─> 在消息后面追加 MessageInfo(包含 seq, timestamp 等) 5. ReleaseWrittenBlock(wb) └─> 释放写锁,此时 Block 变为可读 6. notifier_->Notify(readable_info) └─> 通知接收者有新消息

完整的接收流程

步骤 1:初始化 ShmReceiver 和 ShmDispatcher

位置:cyber/transport/receiver/shm_receiver.h:62

voidShmReceiver::Enable(){// 向 Dispatcher 添加监听器dispatcher_->AddListener<M>(this->attr_,listener);}

ShmDispatcher是单例,内部有一个独立线程监听通知:

位置:cyber/transport/dispatcher/shm_dispatcher.cc

voidShmDispatcher::ThreadFunc(){while(!is_shutdown_.load()){ReadableInfo info;if(notifier_->Listen(100,&info)){// 等待通知ReadMessage(info.channel_id(),info.block_index());ReadArenaMessage(info.channel_id(),info.arena_block_index());}}}
步骤 2:读取消息

位置:cyber/transport/dispatcher/shm_dispatcher.cc

1. AcquireBlockToRead(&rb) └─> 获取读锁 2. message::ParseFromArray(rb->buf, msg_size, msg) └─> 反序列化消息 3. listener(msg, msg_info) └─> 调用用户注册的回调函数 4. ReleaseReadBlock(rb) └─> 释放读锁

高级特性:Arena 消息

这是另一个不容易理解的部分!Arena 消息是为了进一步减少内存拷贝而设计的优化。

为什么需要 Arena 消息?

普通消息流程:

用户消息 → 序列化到共享内存 → 接收者从共享内存反序列化 (两次拷贝/序列化)

Arena 消息流程:

用户直接在共享内存上构造消息 → 接收者直接使用 (零拷贝!)

Protobuf Arena

Arena 是 Protobuf 提供的内存分配器,可以在预分配的内存块上构造消息对象。

位置:cyber/transport/shm/protobuf_arena_manager.h

Arena 消息的通信流程

发送端:
  1. AcquireMessage(msg):从 Arena 获取一个消息对象
  2. 用户直接填充这个消息对象
  3. Transmit(msg):发送时只需要发送一个 wrapper(包含指向 arena 中消息的指针)
接收端:
  1. 从 wrapper 中恢复出消息指针
  2. 直接使用该消息,无需反序列化
  3. 消息析构时自动归还 arena 内存

代码示例

1. 基本使用示例(Writer)

#include"cyber/cyber.h"#include"cyber/proto/unit_test.pb.h"usingapollo::cyber::proto::Chatter;intmain(intargc,char**argv){apollo::cyber::Init(argv[0]);autonode=apollo::cyber::CreateNode("shm_writer");// 创建 Writer(默认使用 SHM)autowriter=node->CreateWriter<Chatter>("channel/chatter");automsg=std::make_shared<Chatter>();msg->set_timestamp(apollo::cyber::Time::Now().ToNanosecond());msg->set_seq(0);msg->set_content("Hello, SHM!");// 发送消息writer->Write(msg);apollo::cyber::WaitForShutdown();return0;}

2. 基本使用示例(Reader)

#include"cyber/cyber.h"#include"cyber/proto/unit_test.pb.h"usingapollo::cyber::proto::Chatter;voidMessageCallback(conststd::shared_ptr<Chatter>&msg,constapollo::cyber::transport::MessageInfo&msg_info){AINFO<<"Received message: seq="<<msg->seq()<<", content="<<msg->content();}intmain(intargc,char**argv){apollo::cyber::Init(argv[0]);autonode=apollo::cyber::CreateNode("shm_reader");// 创建 Readerautoreader=node->CreateReader<Chatter>("channel/chatter",MessageCallback);apollo::cyber::WaitForShutdown();return0;}

总结

关键要点回顾

  1. 每个 Channel 一个 Segment:Channel ID 作为共享内存名称
  2. Segment 包含多个 Block:Block 是消息存储的基本单位
  3. 原子读写锁:使用std::atomic实现无锁读写同步
  4. 轮询分配:序列号 + 取模实现公平的 Block 分配
  5. 双重通知:Notifier 负责进程间的消息到达通知
  6. Arena 优化:进一步减少序列化开销

难以理解的点总结

  1. Placement New 的使用:在共享内存上构造对象
  2. 原子读写锁的实现lock_num_的三种状态转换
  3. 轮询分配策略:为什么seq % block_num能工作
  4. Arena 消息的零拷贝原理:Protobuf Arena 与共享内存的结合

通过这份文档,你应该对 CyberRT 中共享内存通信的原理有了完整的理解!

http://www.jsqmd.com/news/564633/

相关文章:

  • 仙侠H5手游【九州封魔劫代金券内购版】服务端图文搭建教程(含资源下载+部署过程)
  • FreeRTOS任务调度优化:精准统计CPU使用率的实践指南
  • Qwen3-ForcedAligner批量处理技巧:Shell脚本自动化对齐音频
  • 3分钟突破9大平台资源限制:res-downloader让网络资源触手可及
  • Ubuntu 20.04下快速部署realsense SDK 2.0的完整指南
  • Qwen3-14B镜像部署效果展示:中文长文本生成、逻辑推理、代码补全实测
  • 突破B站缓存限制:m4s-converter视频格式转换完全指南
  • 2026最新上海人才引进落户/居转户/留学生落户推荐 - 十大品牌榜
  • 程序实现环境温度对传感器的误差补偿,不同温度下测量精度一致,颠覆温漂难题。
  • 保姆级教程:圣女司幼幽-造相Z-Turbo文生图模型快速入门
  • Phi-4-mini-reasoning vLLM动态批处理:吞吐量提升与首token延迟平衡策略
  • 一条命令克隆整个网站?这个开源项目把AI玩出了新高度
  • 深度学习炼丹避坑:运行Mamba模型时遇到selective_scan_fn未定义,我是如何一步步调试并修复的
  • Windows驱动管理与系统优化:DriverStore Explorer全方位解决方案
  • STM32 Bootloader开源方案|含IAP/ISP/DFU固件升级源码+上位机+图文视频教程,支持OTA远程更新
  • Phi-4-mini-reasoning应用场景:开源AI数学社区共建推理验证平台
  • 5分钟快速上手:AsrTools智能语音转文字工具全攻略
  • 2026年采购BOSE会议音响:设备商、集成商与代理商模式深度对比与选择策略 - 速递信息
  • 新手零基础入门:借助快马AI轻松制作你的第一个域名查询网页
  • 当仿真与FPGA打架时,你该信谁?
  • Nano Banana 相机控制
  • 2026年钢格板厂家推荐,多维度对比助你轻松选择,钢格板口碑推荐解决方案与实力解析 - 品牌推荐师
  • 2026年制药设备维修厂家推荐:制药设备生产厂家/制药设备应用技术服务商精选指南 - 品牌推荐官
  • Phi-4-mini-reasoning一文详解:专为多步推理设计的开源大模型实战
  • 异步上下文丢失、流式中断、内存泄漏——FastAPI 2.0 AI流式响应的3大“静默崩塌”场景(附可复用诊断工具包)
  • 嵌入式国际象棋规则引擎:纯C轻量级实现
  • Nginx四层代理实战:从数据库到游戏服务的全能端口转发
  • 避坑指南:在K210上跑人脸68关键点,这些细节让你的疲劳检测更准
  • Qt6 安卓环境配置
  • Web3D开发入门:5大引擎(Direct3D、OpenGL、UE、Unity、Three.js)选型指南