当前位置: 首页 > news >正文

【C/C++】无锁SPSC环形队列

无锁 SPSC 环形队列

引言

环形队列(Ring Buffer)是系统编程中最常用的数据结构之一。本文将详细剖析一个**单生产者单消费者(SPSC, Single Producer Single Consumer)**无锁环形队列的实现,逐行解释每个设计决策背后的原因。


完整代码

#include<algorithm>#include<array>#include<atomic>#include<concepts>#include<cstddef>#include<type_traits>#include<utility>template<typenameT,std::size_t Capacity,boolPlacementNew=true>classRingBuffer{public:// Capacity 必须是 2 的幂,用于位运算优化取模static_assert(Capacity>0&&(Capacity&(Capacity-1))==0,"Capacity must be power of 2");// 使用 std::array 时,T 必须可默认构造static_assert(PlacementNew||std::is_default_constructible_v<T>,"std::array (PlacementNew=false) needs T to be default constructible");RingBuffer():head_(0),tail_(0){}~RingBuffer(){// 只有 placement new 模式需要手动析构ifconstexpr(PlacementNew){size_t head=head_.load(std::memory_order_relaxed);size_t tail=tail_.load(std::memory_order_relaxed);while(head!=tail){at(head)->~T();head=(head+1)&(Capacity-1);}}}// 禁止拷贝和移动RingBuffer(constRingBuffer&)=delete;RingBuffer(RingBuffer&&)=delete;RingBuffer&operator=(constRingBuffer&)=delete;RingBuffer&operator=(RingBuffer&&)=delete;// 完美转发的 Push 接口template<typenameU>requiresstd::constructible_from<T,U&&>boolPush(U&&value){size_t tail=tail_.load(std::memory_order_relaxed);size_t next_tail=(tail+1)&(Capacity-1);// acquire: 确保看到 consumer 最新的 headif(next_tail==head_.load(std::memory_order_acquire)){returnfalse;// 队列满}// 构造或赋值元素ifconstexpr(PlacementNew){new(at(tail))T(std::forward<U>(value));}else{*at(tail)=std::forward<U>(value);}// release: 确保元素构造完成后才更新 tailtail_.store(next_tail,std::memory_order_release);returntrue;}boolPop(T&value){size_t head=head_.load(std::memory_order_relaxed);// acquire: 确保看到 producer 最新的 tailif(head==tail_.load(std::memory_order_acquire)){returnfalse;// 队列空}// 移动元素并析构(如果需要)ifconstexpr(PlacementNew){T*ptr=at(head);value=std::move(*ptr);ptr->~T();}else{value=std::move(*at(head));}size_t next_head=(head+1)&(Capacity-1);// release: 确保元素移动完成后才更新 headhead_.store(next_head,std::memory_order_release);returntrue;}private:// 原始字节缓冲区,手动管理生命周期structByteBuffer{alignas(alignof(T))std::byte data[sizeof(T)*Capacity];};// 获取指定索引的元素指针T*at(size_t index){ifconstexpr(PlacementNew){returnreinterpret_cast<T*>(storage_.data)+index;}else{return&storage_[index];}}// 根据模式选择存储类型usingStorage=std::conditional_t<PlacementNew,ByteBuffer,std::array<T,Capacity>>;// 缓存行对齐,防止 false sharingalignas(64)std::atomic<std::size_t>head_;alignas(64)std::atomic<std::size_t>tail_;alignas(64)Storage storage_;};

第一部分:模板参数设计

template<typenameT,std::size_t Capacity,boolPlacementNew=true>classRingBuffer{

为什么 Capacity 是模板参数?

编译期确定大小意味着:

  1. 零动态分配:所有内存都在栈上或作为对象的一部分
  2. 编译器优化:编译器知道确切大小,可以展开循环、内联函数
  3. 位运算优化:当 Capacity 是编译期常量且为 2 的幂时,取模可以变成位与
// 运行时取模index%Capacity// 需要除法指令,很慢// 编译期已知 Capacity = 1024index&(Capacity-1)// 单条 AND 指令

PlacementNew 参数的作用

这个布尔参数控制两种完全不同的内存管理策略:

PlacementNew存储方式元素生命周期适用场景
true原始字节数组手动管理复杂类型、避免默认构造开销
falsestd::array<T, N>自动管理简单类型、POD 类型

第二部分:静态断言

static_assert(Capacity>0&&(Capacity&(Capacity-1))==0,"Capacity must be power of 2");

为什么必须是 2 的幂?

这是为了用位与运算代替取模运算

// 假设 Capacity = 8 (二进制: 1000)// Capacity - 1 = 7 (二进制: 0111)index=7next=(7+1)&7// 8 & 7 = 1000 & 0111 = 0// 正确回绕到 0index=5next=(5+1)&7// 6 & 7 = 0110 & 0111 = 6// 没有回绕

位与检测 2 的幂的原理:

8 = 1000 8 - 1 = 0111 8 & 7 = 0000 ✓ 是 2 的幂 6 = 0110 6 - 1 = 0101 6 & 5 = 0100 ✗ 不是 2 的幂

第二个静态断言

static_assert(PlacementNew||std::is_default_constructible_v<T>,"std::array (PlacementNew=false) needs T to be default constructible");

PlacementNew = false时,使用std::array<T, Capacity>。数组在创建时会默认构造所有元素,所以T必须支持默认构造。


第三部分:存储设计

原始字节缓冲区

structByteBuffer{alignas(alignof(T))std::byte data[sizeof(T)*Capacity];};

这是未初始化内存。要点:

  1. sizeof(T) * Capacity:足够存储 Capacity 个 T 对象
  2. alignas(alignof(T)):确保对齐满足 T 的要求
  3. std::byte:C++17 引入,表示原始字节,比char更语义化

为什么用 struct 包装?

直接用数组会有问题:

// 不好:alignas 可能不生效alignas(alignof(T))std::byte data[sizeof(T)*Capacity];// 好:包装成 struct,对齐一定生效structByteBuffer{alignas(alignof(T))std::byte data[sizeof(T)*Capacity];};

条件类型选择

usingStorage=std::conditional_t<PlacementNew,ByteBuffer,std::array<T,Capacity>>;

std::conditional_t<条件, 真类型, 假类型>是编译期的 “if-else”。


第四部分:False Sharing 与缓存行对齐

alignas(64)std::atomic<std::size_t>head_;alignas(64)std::atomic<std::size_t>tail_;alignas(64)Storage storage_;

什么是 False Sharing?

现代 CPU 以**缓存行(cache line)**为单位读写内存,通常是 64 字节。

不对齐时的内存布局: |---- 64 bytes cache line ----| [ head_ ][ tail_ ][ ... ] Producer 写 tail_ → 整个缓存行失效 Consumer 写 head_ → 整个缓存行失效 两个核心不断争抢同一缓存行!
对齐后的内存布局: |---- cache line 1 ----|---- cache line 2 ----|---- cache line 3 ----| [ head_ ][ tail_ ][ storage_ ] Producer 只访问 cache line 2 Consumer 只访问 cache line 1 互不干扰!

性能差异

False sharing 可能导致10 倍以上的性能下降。64 字节对齐是最简单有效的解决方案。


第五部分:内存序详解

这是整个实现中最关键的部分。

Push 函数的内存序

boolPush(U&&value){// (1) relaxed: 只有 producer 写 tail_,不需要同步size_t tail=tail_.load(std::memory_order_relaxed);size_t next_tail=(tail+1)&(Capacity-1);// (2) acquire: 需要看到 consumer 对 head_ 的最新写入if(next_tail==head_.load(std::memory_order_acquire)){returnfalse;}// (3) 构造新元素ifconstexpr(PlacementNew){new(at(tail))T(std::forward<U>(value));}else{*at(tail)=std::forward<U>(value);}// (4) release: 确保 (3) 完成后,consumer 才能看到新的 tailtail_.store(next_tail,std::memory_order_release);returntrue;}
为什么 (1) 用 relaxed?

tail_只有 producer 会写。自己读自己写的变量,不存在竞争,不需要同步。

为什么 (2) 用 acquire?

Producer 需要检查队列是否满。必须看到 consumer 最新的head_值。

acquire确保:consumer 在head_.store(release)之前的所有操作,对 producer 可见。

为什么 (4) 用 release?

这是最关键的一步。看这个时序:

Producer Consumer -------- -------- 构造元素 T tail_.store(release) ─────────→ tail_.load(acquire) 读取元素 T

如果没有 release-acquire:

可能的乱序: tail_.store() // CPU 先执行这个 构造元素 T // 然后才执行这个 Consumer 看到新的 tail,但元素还没构造完! 读到未初始化数据!

Release 语义保证:store之前的所有写操作,不会被重排到store之后。

Pop 函数的内存序

boolPop(T&value){// (1) relaxed: 只有 consumer 写 head_,不需要同步size_t head=head_.load(std::memory_order_relaxed);// (2) acquire: 需要看到 producer 对 tail_ 的最新写入// 以及 producer 构造的元素if(head==tail_.load(std::memory_order_acquire)){returnfalse;}// (3) 移动并析构元素ifconstexpr(PlacementNew){T*ptr=at(head);value=std::move(*ptr);ptr->~T();}else{value=std::move(*at(head));}size_t next_head=(head+1)&(Capacity-1);// (4) release: 确保 (3) 完成后,producer 才能看到新的 headhead_.store(next_head,std::memory_order_release);returntrue;}

对称的逻辑

  • Consumer 读tail_用 acquire → 与 producer 的 release 配对
  • Consumer 写head_用 release → 与 producer 的 acquire 配对

Acquire-Release 配对总结

Producer Consumer -------- -------- 构造元素 tail_.store(release) ──────────────→ tail_.load(acquire) 读取元素 head_.load(acquire) ←────────────── head_.store(release) 析构元素

两对配对:

  1. tail_.store(release)tail_.load(acquire)
  2. head_.load(acquire)head_.store(release)

第六部分:完美转发

template<typenameU>requiresstd::constructible_from<T,U&&>boolPush(U&&value){// ...new(at(tail))T(std::forward<U>(value));// ...}

U&&是转发引用,不是右值引用

U是模板参数时,U&&有特殊的推导规则:

RingBuffer<std::string,16>rb;std::string s="hello";rb.Push(s);// U = std::string&, U&& = std::string&rb.Push(std::string("world"));// U = std::string, U&& = std::string&&rb.Push("literal");// U = const char*, U&& = const char*&&

引用折叠规则

T& & → T& T& && → T& T&& & → T& T&& && → T&&

只有&& &&才保持&&

std::forward的作用

std::forward<U>(value)

根据U的类型决定返回 lvalue 还是 rvalue:

  • U = std::string&→ 返回std::string&(lvalue)→ 调用拷贝构造
  • U = std::string→ 返回std::string&&(rvalue)→ 调用移动构造

为什么约束用U&&

requiresstd::constructible_from<T,U&&>

因为传给构造函数的是std::forward<U>(value),它的类型是U&&。约束必须匹配实际调用。


第七部分:析构函数

~RingBuffer(){ifconstexpr(PlacementNew){size_t head=head_.load(std::memory_order_relaxed);size_t tail=tail_.load(std::memory_order_relaxed);while(head!=tail){at(head)->~T();head=(head+1)&(Capacity-1);}}}

为什么用 relaxed?

析构函数执行时,所有线程必须已经join()join()内部有 acquire-release 语义,已经完成了同步。不需要额外的内存序保证。

为什么std::array模式不需要手动析构?

std::array的析构函数会自动析构所有元素。但这意味着:

  • 所有 Capacity 个元素都会被析构
  • 包括从未使用的槽位

这就是为什么std::array模式要求T可默认构造——即使是"空"槽位也有有效对象。

对基本类型调用析构函数

int*p=newint(42);p->~int();// 完全合法,编译成空操作

C++ 允许对任何类型调用析构函数,对基本类型是 no-op。这让模板代码可以统一处理。


第八部分:正确使用方式

intmain(){RingBuffer<std::string,1024>queue;std::atomic<bool>done{false};std::threadproducer([&](){for(inti=0;i<10000;++i){while(!queue.Push(std::to_string(i))){// 自旋等待,或 yield}}done.store(true,std::memory_order_release);});std::threadconsumer([&](){std::string value;while(!done.load(std::memory_order_acquire)||/* 可能还有数据 */){if(queue.Pop(value)){// 处理 value}}});producer.join();// 必须 join!consumer.join();// 必须 join!// 析构函数在这里执行,此时没有其他线程访问 queue}

关键点:必须在join()之后才能析构RingBuffer。内存序不能解决生命周期问题。


总结

设计决策原因
Capacity 必须是 2 的幂用位与代替取模,提升性能
alignas(64)防止 false sharing
Placement new 模式避免默认构造开销,精确控制生命周期
转发引用U&&接受任意类型,保留值类别
std::forward完美转发,避免不必要的拷贝
acquire-release保证数据可见性,避免数据竞争
relaxed 读自己的变量没有竞争,不需要同步

这个实现展示了现代 C++ 的多个核心概念:模板元编程、完美转发、内存模型、缓存优化。理解这些概念,对于编写高性能系统软件至关重要。

http://www.jsqmd.com/news/453974/

相关文章:

  • JVM中的垃圾回收机制(速记版)
  • VMware虚拟机的安装
  • 毕设程序javaKTV点歌系统 基于SpringBoot的在线音乐点播与管理系统 智能化歌厅曲目服务平台的设计与实现
  • Nexpose 8.38.0 for Linux Windows 发布 - 漏洞扫描
  • 电力系统优化运行与编程:电网规划、负荷预测及潮流计算的Matlab代码模型复现
  • 让预测模型自己进化:BES-SVM黑科技实战
  • AI视频三巨头:一场关于未来想象力的终极PK
  • 瑞祥卡余额怎么提现到支付宝,高效变现指南 - 淘淘收小程序
  • 【C++初阶】:(3)C++基础类和对象(中)
  • 《从零开始的java从入门到入土的学习生活——JavaWeb前端篇》Chapter16——JavaWeb前端篇学习记录——HTML、CSS、盒子模型、flex弹性布局、表单标签
  • 毕设程序javaweb的计算机课程在线学习平台 基于Java Web的计算机技术在线教学与实训平台 计算机专业网络教育及技能测评系统
  • TechWiz LCD 1D应用:高延迟膜(彩虹mura仿真)
  • 企业策略路由(PBR)实战:原理、场景与故障排查(多出口必看)
  • 跨境卖家如何建立供应商考核指标提升稳定性
  • 2026年 喷雾干燥机厂家推荐排行榜:高速离心、气流喷雾、锂电池专用等十大机型核心优势与选购指南 - 品牌企业推荐师(官方)
  • Dify 实战系列(4):实现新闻内容概要生成
  • GLM-4.5 vs GLM-4.7 vs GLM-5 全方位技术演进对比
  • 如何选择优质品牌设计公司
  • 选购费氏粒度仪的关键指标:不仅仅是看测量范围 - 品牌推荐大师1
  • 数据同步备份软件:数字化时代的“双保险”策略
  • 西门子S7-1200PLC双轴定位算法在电池焊接控制中的应用:博图程序案例与威纶触摸屏操作界面
  • 觉察 改变
  • 全栈开发核心技术解析
  • 互联网大厂Java求职面试实战:三轮技术问答与热点技术深度解析
  • 并网逆变器VSG虚拟同步控制Matlab/Simulink仿真模型及其完全正确结果
  • 2026年阿里云企业邮箱代理商哪家好?真实案例解析靠谱伙伴 - 品牌2026
  • 2026年 拉力带厂家推荐排行榜:弹性拉力带/11件套拉力带/练背拉伸带,专业健身辅具助力科学塑形 - 品牌企业推荐师(官方)
  • 京东e卡怎么换成现金,亲测快捷的三种方式 - 猎卡回收公众号
  • 咱们直接动手搭个T型逆变器模型试试。先整明白核心结构:三相桥臂中间各接两个双向开关,形成T字拓扑。这种结构优势在于能输出五电平电压,谐波特性比传统三电平好不少
  • 国产化、安可、信创、自主可控说的是什么?一文读懂