无锁队列的设计
文章目录
- 什么是无锁队列
- 有锁队列:
- 无锁队列:
- 为什么需要无锁队列
- 锁的局限:
- SPSC的简单思路
- 用ringbuffer实现SPSC无锁队列
什么是无锁队列
有锁队列:
通过互斥锁或其他同步机制保证线程安全的队列。
无锁队列:
通过原子操作来实现线程安全的队列,属于一种非阻塞队列。
为什么需要无锁队列
锁的局限:
- 线程阻塞带来的切换。
- 死锁风险。
- 性能瓶颈,高并发下锁竞争激烈,吞吐率下降。
SPSC的简单思路
用ringbuffer实现SPSC无锁队列
基本实现:头尾指针+固定大小的数组
存储任意类型:POD类型与非POD类型,模板类
FIFO类型:提供push和pop接口,用于生产者消费者模型
SPSC场景下线程安全:原子共享
性能优化:内存对齐,避免伪共享
什么是伪共享:
cpu的缓存是以缓存行为单位加载数据的
线程1修改A或线程2修改B都会造成缓存行标记为脏,从而导致缓存一致性协议将脏缓存行同步给其他核心,缓存行同步会导致性能下降。
因为A和B没有数据竞争。
所以我们要把A和B放在两个不同的缓存行。
下方用alignas(64)实现
#pragmaoncetemplate<typenameT,std::size_t Capacity>classRingBuffer{public:static_assert(Capacity&&!(Capacity&(Capacity-1)),"Capacity is not power of 2");RingBuffer():read_(0),write_(0){}~RingBuffer(){conststd::size_t r=read_.load(std::memory_order_relaxed);conststd::size_t w=write_.load(std::memory_order_relaxed);while(r!=w){reinterpret_cast<T*>(&buffer_[r])->~T();r=(r+1)&(Capacity-1);}}// 万能引用,完美转发template<typenameU>boolPush(U&&value){conststd::size_t w=write_.load(std::memory_order_relaxed);conststd::size_t next_w=(w+1)&(Capacity-1);if(next_w==read_.load(std::memory_order_acquire)){returnfalse;}new(&buffer_[w])T(std::forward<U>(value));write_.store(next_w,std::memory_order_release);returntrue;}boolPop(T&value){conststd::size_t r=read_.load(std::memory_order_relaxed);if(r==write_.load(std::memory_order_acquire)){returnfalse;}value=std::move(*reinterpret_cast<T*>(&buffer_[r]));reinterpret_cast<T*>(&buffer_[r])->~T();read_.store((r+1)&(Capacity-1),std::memory_order_release);returntrue;}std::size_tSize()const{conststd::size_t w=write_.load(std::memory_order_acquire);conststd::size_t r=read_.load(std::memory_order_acquire);return(w>=r)?(w-r):(Capacity-r+w);}private:alignas(64)std::atomic<std::size_t>read_;alignas(64)std::atomic<std::size_t>write_;alignas(64)std::aligned_storage_t<sizeof(T),alignof(T)>buffer_[];};RingBuffer<int,7>rb;