SPSC2环形队列
SPSC 无锁队列(2-slot)与 uint32_t 共享变量对比实验
目标
生产者线程将 [0, N) 范围内的有序数据依次发送给消费者线程
对比两种实现:
- ❌ uint32_t 共享变量(错误模型)
- ✔ SPSC 无锁队列(正确模型)
验证:是否满足有序、不丢失、可消费
核心原则
不能让两个线程同时修改同一个变量(每个共享变量只能有一个写入线程)
SpscQueue2
SpscQueue2.h
#pragma once

#include <atomic>
#include <stdint.h>

/**
 * @brief Lock-free single-producer / single-consumer ring queue with two slots.
 *
 * Exactly one thread may call Write() (the producer) and exactly one thread
 * may call Read() (the consumer). The write index is only ever modified by
 * the producer and the read index only by the consumer, so no variable has
 * two writers.
 *
 * One of the two slots is kept free to tell "full" from "empty", so the queue
 * holds at most one pending value at a time.
 *
 * The indices are std::atomic rather than volatile: volatile does not provide
 * inter-thread ordering or atomicity guarantees in C++. As a consequence the
 * class is not copyable, which is the desired behaviour for a shared queue.
 */
class SpscQueue2
{
private:
    uint32_t m_buf[2];       ///< Slot storage.
    std::atomic<uint8_t> w;  ///< Write index (0 or 1); modified by the producer only.
    std::atomic<uint8_t> r;  ///< Read index (0 or 1); modified by the consumer only.

public:
    /** @brief Create an empty queue. */
    SpscQueue2();

    /**
     * @brief Try to enqueue a value (producer thread).
     * @param v Value to store.
     * @return true if the value was stored, false if the queue is full.
     */
    bool Write(uint32_t v);

    /**
     * @brief Try to dequeue a value (consumer thread).
     * @param v Receives the dequeued value; left untouched when the queue is empty.
     * @return true if a value was read, false if the queue is empty.
     */
    bool Read(uint32_t &v);
};

// SpscQueue2.cpp
#include "SpscQueue2.h"

#include <atomic>

// NOTE(review): the fences below order the index and payload accesses on both
// the compiler and the CPU level (the previous DMB() macro expanded to nothing
// on non-ARM targets, leaving no barrier at all). The C++ memory model only
// gives formal guarantees when the indices themselves are std::atomic; if they
// are plain volatile integers this remains, strictly speaking, a data race.

/**
 * @brief Initialise an empty queue: both slots zeroed, both indices at 0.
 */
SpscQueue2::SpscQueue2()
{
    m_buf[0] = 0;
    m_buf[1] = 0;
    w = 0;
    r = 0;
}

/**
 * @brief Enqueue one value (producer thread only).
 * @param v Value to store.
 * @return true if stored, false if the queue is full.
 */
bool SpscQueue2::Write(uint32_t v)
{
    const uint8_t cur = w;  // only this thread modifies w, so one read is enough
    const uint8_t next = static_cast<uint8_t>((cur + 1) & 1);
    if (next == r)
        return false;  // full: advancing would collide with the read index

    // Do not touch the slot before the consumer's index update has been observed.
    std::atomic_thread_fence(std::memory_order_acquire);
    m_buf[cur] = v;

    // Publish order: the payload must be visible before the new write index.
    std::atomic_thread_fence(std::memory_order_release);
    w = next;
    return true;
}

/**
 * @brief Dequeue one value (consumer thread only).
 * @param v Receives the value; untouched when the queue is empty.
 * @return true if a value was read, false if the queue is empty.
 */
bool SpscQueue2::Read(uint32_t &v)
{
    const uint8_t cur = r;  // only this thread modifies r, so one read is enough
    if (cur == w)
        return false;  // empty

    // The slot must be loaded only after the producer's index update was
    // observed; without this barrier a weakly ordered CPU may return stale data.
    std::atomic_thread_fence(std::memory_order_acquire);
    v = m_buf[cur];

    // Consume order: finish reading the slot before handing it back.
    std::atomic_thread_fence(std::memory_order_release);
    r = static_cast<uint8_t>((cur + 1) & 1);
    return true;
}

// 测试 (Tests) -- heading of the next section in the original document
main.cpp 反例
// Counter-example: a single shared uint32_t used as a "channel" between two
// threads. The unsynchronised access to g_data is INTENTIONAL -- the program
// exists to show that values get lost or repeated -- so it must not be "fixed".
#include <iostream>
#include <thread>
#include <atomic>
#include <cstdint>  // uint32_t (previously only available through transitive includes)

volatile uint32_t g_data = 0;          // shared value, written by writer(), read by reader()
std::atomic<bool> start_flag{false};   // released by main() to start both threads together
std::atomic<bool> stop_flag{false};    // set by writer() when done; not read in this listing
static const uint32_t MAX = 100000;

/** @brief Overwrite g_data with 0 .. MAX-1 as fast as possible. */
void writer()
{
    while (!start_flag.load())
        ;  // spin until main() releases the threads
    for (uint32_t i = 0; i < MAX; i++) {
        g_data = i;
    }
    stop_flag.store(true);
}

/**
 * @brief Sample g_data and compare it against the expected sequence.
 *
 * Every mismatch is reported and counted, then the expectation is resynchronised
 * to the observed value so that a single gap is counted once.
 */
void reader()
{
    while (!start_flag.load(std::memory_order_acquire))
        ;  // spin until main() releases the threads
    uint32_t expect = 0;
    uint32_t error = 0;
    uint32_t v;
    while (expect < MAX) {
        v = g_data;
        if (v != expect) {
            std::cout << "ERROR: expect " << expect << " got " << v << std::endl;
            error++;
            expect = v;  // resynchronise to what was actually seen
        }
        expect++;
    }
    std::cout << "unsafe done, error = " << error << std::endl;
}

int main()
{
    std::cout << "start..." << std::endl;
    std::thread t1(writer);
    std::thread t2(reader);
    start_flag.store(true);
    t1.join();
    t2.join();
    std::cout << "finish" << std::endl;
}

// Captured run: C:\Users\PC\CLionProjects\untitled23\main1.exe
start...
ERROR: expect 1 got 0
ERROR: expect 1 got 99999
unsafe done, error = 2
finish
main.cpp 正例
#include<iostream>#include<thread>#include<atomic>#include"SpscQueue2.h"SpscQueue2 q;std::atomic<bool>start_flag{false};std::atomic<bool>stop_flag{false};staticconstuint32_tMAX=100000;voidwriter(){while(!start_flag.load(std::memory_order_acquire));for(uint32_ti=0;i<MAX;){if(q.Write(i))i++;}stop_flag.store(true,std::memory_order_release);}voidreader(){while(!start_flag.load(std::memory_order_acquire));uint32_texpect=0;uint32_tv;uint32_terror=0;while(expect<MAX){bool got=false;while(q.Read(v)){got=true;if(v!=expect){std::cout<<"ERROR: expect "<<expect<<" got "<<v<<std::endl;error++;expect=v;}expect++;}// writer结束 + queue空 → 退出if(stop_flag.load(std::memory_order_acquire)&&!got)break;}std::cout<<"reader done, error = "<<error<<std::endl;}intmain(){std::cout<<"start..."<<std::endl;std::threadt1(writer);std::threadt2(reader);start_flag.store(true,std::memory_order_release);t1.join();t2.join();std::cout<<"finish"<<std::endl;}C:\Users\PC\CLionProjects\untitled23\cmake-build-debug\untitled23.exe
start...
reader done, error = 0
finish
