【C/C++】SPSC无锁队列
优化技术目的环形缓冲区(Ring Buffer)避免动态内存分配,O(1) 操作浪费一个槽位区分空和满,无需额外原子变量缓存行对齐(alignas(64))消除伪共享索引缓存(cached_head/tail)减少跨核原子读取release-acquire 语义保证数据可见性,避免过度同步无 CASSPSC 无竞争,直接 store 即可SPSC Queue 是无锁编程的入门案例,也是实际生产中高
SPSC 无锁队列详解 (Lock-Free Single Producer Single Consumer Queue)
概述 (Overview)
SPSC Queue 是一种特殊的无锁队列(Lock-Free Queue),专为单生产者单消费者(Single Producer Single Consumer)场景设计。由于只有一个线程写入、一个线程读取,我们可以避免使用互斥锁(Mutex)或复杂的 CAS 循环,从而获得极高的性能。
这种数据结构在高频交易(HFT, High-Frequency Trading)、音视频处理、日志系统等低延迟场景中广泛使用。
基础实现 (Basic Implementation)
环形缓冲区设计 (Ring Buffer Design)
SPSC Queue 通常基于环形缓冲区(Circular Buffer / Ring Buffer)实现。我们使用两个索引:
- head:消费者(Consumer)读取位置
- tail:生产者(Producer)写入位置
为了区分队列满和队列空的状态,我们浪费一个槽位(Slot):
- 空(Empty):
head == tail - 满(Full):
(tail + 1) % size == head
#include <array>
#include <atomic>
#include <cstddef>
template <typename T, size_t Cap>
class SPSCQueue {
// 缓冲区大小为 Cap + 1,浪费一个槽位用于区分空和满
std::array<T, Cap + 1> buffer;
// head: 消费者读取位置,由消费者修改,生产者读取
std::atomic<size_t> head{0};
// tail: 生产者写入位置,由生产者修改,消费者读取
std::atomic<size_t> tail{0};
public:
// 生产者调用:将元素推入队列
bool push(const T& val) {
// relaxed: 只有生产者修改 tail,无需同步
size_t pos = tail.load(std::memory_order_relaxed);
size_t next = (pos + 1) % (Cap + 1);
// acquire: 需要看到消费者对 head 的修改
if (next == head.load(std::memory_order_acquire))
return false; // 队列已满
buffer[pos] = val;
// release: 确保 buffer 写入对消费者可见
tail.store(next, std::memory_order_release);
return true;
}
// 消费者调用:从队列弹出元素
bool pop(T& val) {
// relaxed: 只有消费者修改 head,无需同步
size_t pos = head.load(std::memory_order_relaxed);
// acquire: 需要看到生产者对 tail 的修改
if (pos == tail.load(std::memory_order_acquire))
return false; // 队列为空
val = std::move(buffer[pos]);
size_t next = (pos + 1) % (Cap + 1);
// release: 确保 buffer 读取完成后再更新 head
head.store(next, std::memory_order_release);
return true;
}
};
内存序解释 (Memory Order Explanation)
为什么用 relaxed 加载自己的索引?
在 SPSC 场景下:
- 只有生产者写
tail - 只有消费者写
head
因此,加载自己的索引时不存在数据竞争(Data Race),使用 memory_order_relaxed 足够。
为什么用 acquire 加载对方的索引?
生产者读取 head 时,需要确保能看到消费者之前对 head 的写入。这形成了 acquire-release 同步关系(Synchronizes-With Relationship):
Consumer: head.store(next, release) ---> Producer: head.load(acquire)
同理,消费者读取 tail 也需要 acquire。
为什么用 release 存储?
存储时使用 memory_order_release,确保之前的写操作(buffer 写入或读取)对其他线程可见。
缓存行优化 (Cache Line Optimization)
伪共享问题 (False Sharing)
在上述实现中,head 和 tail 可能位于同一缓存行(Cache Line,通常 64 字节)。当生产者更新 tail 时,会导致消费者的缓存行失效;反之亦然。
这种不必要的缓存失效称为伪共享(False Sharing),会严重影响性能。
解决方案:缓存行对齐 (Cache Line Alignment)
#include <array>
#include <atomic>
#include <cstddef>
template <typename T, size_t Cap>
class SPSCQueue {
// 将 head 和 tail 放在不同的缓存行上
// 热数据放在结构体前部,便于访问
alignas(64) std::atomic<size_t> head{0};
alignas(64) std::atomic<size_t> tail{0};
// 缓冲区也对齐,避免与 tail 共享缓存行
alignas(64) std::array<T, Cap + 1> buffer;
public:
bool push(const T& val) {
size_t pos = tail.load(std::memory_order_relaxed);
size_t next = (pos + 1) % (Cap + 1);
if (next == head.load(std::memory_order_acquire))
return false;
buffer[pos] = val;
tail.store(next, std::memory_order_release);
return true;
}
bool pop(T& val) {
size_t pos = head.load(std::memory_order_relaxed);
if (pos == tail.load(std::memory_order_acquire))
return false;
val = std::move(buffer[pos]);
size_t next = (pos + 1) % (Cap + 1);
head.store(next, std::memory_order_release);
return true;
}
};
使用 alignas(64) 确保每个原子变量独占一个缓存行。
索引缓存优化 (Index Caching Optimization)
动机 (Motivation)
每次 push 都需要读取 head(跨核访问),每次 pop 都需要读取 tail。这些原子加载开销较大。
观察关键特性:
head只会增加(消费者向前移动)tail只会增加(生产者向前移动)
因此,我们可以缓存对方的索引。即使缓存过时,也只会保守地认为"空间更少"或"数据更少",不会导致正确性问题。
实现 (Implementation)
#include <array>
#include <atomic>
#include <cstddef>
template <typename T, size_t Cap>
class SPSCQueue {
// 原子索引:用于跨线程通信
alignas(64) std::atomic<size_t> head{0};
alignas(64) std::atomic<size_t> tail{0};
// 缓存索引:本地缓存,避免频繁跨核读取
// cached_head: 生产者缓存的 head 值,仅生产者读写
// cached_tail: 消费者缓存的 tail 值,仅消费者读写
alignas(64) size_t cached_head{0};
alignas(64) size_t cached_tail{0};
alignas(64) std::array<T, Cap + 1> buffer;
public:
bool push(const T& val) {
size_t pos = tail.load(std::memory_order_relaxed);
size_t next = (pos + 1) % (Cap + 1);
// 先检查缓存的 head
if (next == cached_head) {
// 缓存显示已满,刷新真实值
cached_head = head.load(std::memory_order_acquire);
if (next == cached_head)
return false; // 确实满了
}
// 缓存显示有空间,或刷新后有空间
buffer[pos] = val;
tail.store(next, std::memory_order_release);
return true;
}
bool pop(T& val) {
size_t pos = head.load(std::memory_order_relaxed);
// 先检查缓存的 tail
if (pos == cached_tail) {
// 缓存显示为空,刷新真实值
cached_tail = tail.load(std::memory_order_acquire);
if (pos == cached_tail)
return false; // 确实空了
}
// 缓存显示有数据,或刷新后有数据
val = std::move(buffer[pos]);
size_t next = (pos + 1) % (Cap + 1);
head.store(next, std::memory_order_release);
return true;
}
};
为什么 cached_head/cached_tail 不需要原子?
每个缓存变量只有一个线程访问:
cached_head:仅生产者读写cached_tail:仅消费者读写
没有数据竞争,无需原子操作或同步。
性能提升原理
常见情况(队列非空非满)下,只访问本地缓存变量,避免昂贵的跨核原子读取。只有当缓存"认为"队列满或空时,才刷新真实值。
为什么不用 CAS? (Why Not CAS?)
在实现自旋锁或 MPMC 队列时,我们通常使用 compare_exchange(CAS)来保证原子性。但 SPSC 不需要:
// 错误示范:SPSC 中不需要 CAS
while (!tail.compare_exchange_weak(pos, next, ...));
原因:只有一个线程修改 tail,不存在竞争。简单的 store 即可:
// 正确:直接存储
tail.store(next, std::memory_order_release);
CAS 引入不必要的开销和复杂性。
size() 方法的注意事项 (Notes on size())
size_t size() const {
size_t h = head.load(std::memory_order_acquire);
size_t t = tail.load(std::memory_order_acquire);
return (t - h + Cap + 1) % (Cap + 1);
}
需要注意:这不是一致性快照(Consistent Snapshot)。在加载 h 和 t 之间,另一个线程可能已经修改了值。返回的大小是某一时刻的近似值,不能用于同步决策。
完整实现 (Complete Implementation)
#include <array>
#include <atomic>
#include <cstddef>
template <typename T, size_t Cap>
class SPSCQueue {
// === 跨线程共享的原子变量 ===
// head: 消费者的读取位置
alignas(64) std::atomic<size_t> head{0};
// tail: 生产者的写入位置
alignas(64) std::atomic<size_t> tail{0};
// === 线程本地缓存 ===
// cached_head: 生产者缓存的 head(仅生产者访问)
alignas(64) size_t cached_head{0};
// cached_tail: 消费者缓存的 tail(仅消费者访问)
alignas(64) size_t cached_tail{0};
// === 数据存储 ===
// 容量为 Cap + 1,牺牲一个槽位区分空和满
alignas(64) std::array<T, Cap + 1> buffer;
// 辅助函数:计算下一个位置
static constexpr size_t next_pos(size_t pos) {
return (pos + 1) % (Cap + 1);
}
public:
// 生产者调用:推入元素
// 返回 true 表示成功,false 表示队列已满
bool push(const T& val) {
size_t pos = tail.load(std::memory_order_relaxed);
size_t next = next_pos(pos);
// 检查是否已满(先用缓存,必要时刷新)
if (next == cached_head) {
cached_head = head.load(std::memory_order_acquire);
if (next == cached_head)
return false;
}
// 写入数据并发布新的 tail
buffer[pos] = val;
tail.store(next, std::memory_order_release);
return true;
}
// 支持移动语义的 push
bool push(T&& val) {
size_t pos = tail.load(std::memory_order_relaxed);
size_t next = next_pos(pos);
if (next == cached_head) {
cached_head = head.load(std::memory_order_acquire);
if (next == cached_head)
return false;
}
buffer[pos] = std::move(val);
tail.store(next, std::memory_order_release);
return true;
}
// 消费者调用:弹出元素
// 返回 true 表示成功,false 表示队列为空
bool pop(T& val) {
size_t pos = head.load(std::memory_order_relaxed);
// 检查是否为空(先用缓存,必要时刷新)
if (pos == cached_tail) {
cached_tail = tail.load(std::memory_order_acquire);
if (pos == cached_tail)
return false;
}
// 读取数据并发布新的 head
val = std::move(buffer[pos]);
head.store(next_pos(pos), std::memory_order_release);
return true;
}
// 近似大小(非一致性快照)
size_t size() const {
size_t h = head.load(std::memory_order_acquire);
size_t t = tail.load(std::memory_order_acquire);
return (t - h + Cap + 1) % (Cap + 1);
}
// 容量
static constexpr size_t capacity() {
return Cap;
}
// 是否为空(近似)
bool empty() const {
return head.load(std::memory_order_acquire) ==
tail.load(std::memory_order_acquire);
}
};
使用示例 (Usage Example)
以下是一个完整可运行的示例,展示如何使用 SPSCQueue 在两个线程间传递数据:
#include <array>
#include <atomic>
#include <chrono>
#include <cstddef>
#include <iostream>
#include <thread>
template <typename T, size_t Cap>
class SPSCQueue {
alignas(64) std::atomic<size_t> head{0};
alignas(64) std::atomic<size_t> tail{0};
alignas(64) size_t cached_head{0};
alignas(64) size_t cached_tail{0};
alignas(64) std::array<T, Cap + 1> buffer;
static constexpr size_t next_pos(size_t pos) {
return (pos + 1) % (Cap + 1);
}
public:
bool push(const T& val) {
size_t pos = tail.load(std::memory_order_relaxed);
size_t next = next_pos(pos);
if (next == cached_head) {
cached_head = head.load(std::memory_order_acquire);
if (next == cached_head)
return false;
}
buffer[pos] = val;
tail.store(next, std::memory_order_release);
return true;
}
bool pop(T& val) {
size_t pos = head.load(std::memory_order_relaxed);
if (pos == cached_tail) {
cached_tail = tail.load(std::memory_order_acquire);
if (pos == cached_tail)
return false;
}
val = std::move(buffer[pos]);
head.store(next_pos(pos), std::memory_order_release);
return true;
}
size_t size() const {
size_t h = head.load(std::memory_order_acquire);
size_t t = tail.load(std::memory_order_acquire);
return (t - h + Cap + 1) % (Cap + 1);
}
};
// ============== 示例 1: 基础用法 ==============
void basic_example() {
std::cout << "=== 基础用法示例 ===" << std::endl;
SPSCQueue<int, 8> queue;
std::atomic<bool> done{false};
// 生产者线程(Producer Thread)
std::thread producer([&]() {
for (int i = 1; i <= 20; ++i) {
// 自旋等待直到 push 成功
while (!queue.push(i)) {
std::this_thread::yield(); // 让出 CPU
}
std::cout << "[Producer] Pushed: " << i << std::endl;
}
done.store(true, std::memory_order_release);
});
// 消费者线程(Consumer Thread)
std::thread consumer([&]() {
int val;
int count = 0;
while (count < 20) {
if (queue.pop(val)) {
std::cout << "[Consumer] Popped: " << val << std::endl;
++count;
} else {
std::this_thread::yield();
}
}
});
producer.join();
consumer.join();
std::cout << std::endl;
}
// ============== 示例 2: 性能测试 ==============
void benchmark_example() {
std::cout << "=== 性能测试示例 ===" << std::endl;
constexpr size_t NUM_ITEMS = 10'000'000;
SPSCQueue<uint64_t, 1024> queue;
std::atomic<bool> done{false};
auto start = std::chrono::high_resolution_clock::now();
// 生产者:尽可能快地推送数据
std::thread producer([&]() {
for (uint64_t i = 0; i < NUM_ITEMS; ++i) {
while (!queue.push(i)) {
// 忙等待(Busy Wait)
}
}
done.store(true, std::memory_order_release);
});
// 消费者:尽可能快地消费数据
std::thread consumer([&]() {
uint64_t val;
uint64_t count = 0;
uint64_t sum = 0;
while (count < NUM_ITEMS) {
if (queue.pop(val)) {
sum += val;
++count;
}
}
// 验证正确性:sum 应该等于 0 + 1 + ... + (N-1)
uint64_t expected = (NUM_ITEMS - 1) * NUM_ITEMS / 2;
if (sum == expected) {
std::cout << "[Consumer] 校验通过! Sum = " << sum << std::endl;
} else {
std::cout << "[Consumer] 校验失败! Got " << sum
<< ", expected " << expected << std::endl;
}
});
producer.join();
consumer.join();
auto end = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
double ops_per_sec = static_cast<double>(NUM_ITEMS) / duration.count() * 1000;
std::cout << "传输 " << NUM_ITEMS << " 个元素" << std::endl;
std::cout << "耗时: " << duration.count() << " ms" << std::endl;
std::cout << "吞吐量: " << ops_per_sec / 1'000'000 << " M ops/sec" << std::endl;
std::cout << std::endl;
}
// ============== 示例 3: 结构体传输 ==============
struct Order {
uint64_t id;
double price;
int quantity;
char side; // 'B' for buy, 'S' for sell
};
void struct_example() {
std::cout << "=== 结构体传输示例 ===" << std::endl;
SPSCQueue<Order, 16> order_queue;
std::atomic<bool> done{false};
// 交易引擎(Producer):生成订单
std::thread trading_engine([&]() {
for (uint64_t i = 1; i <= 5; ++i) {
Order order{
.id = i,
.price = 100.0 + i * 0.5,
.quantity = static_cast<int>(i * 100),
.side = (i % 2 == 0) ? 'B' : 'S'
};
while (!order_queue.push(order)) {
std::this_thread::yield();
}
std::cout << "[TradingEngine] 发送订单 #" << order.id << std::endl;
}
done.store(true, std::memory_order_release);
});
// 风控系统(Consumer):处理订单
std::thread risk_system([&]() {
Order order;
int count = 0;
while (count < 5) {
if (order_queue.pop(order)) {
std::cout << "[RiskSystem] 收到订单 #" << order.id
<< " | " << (order.side == 'B' ? "买入" : "卖出")
<< " | 价格: " << order.price
<< " | 数量: " << order.quantity
<< std::endl;
++count;
} else {
std::this_thread::yield();
}
}
});
trading_engine.join();
risk_system.join();
std::cout << std::endl;
}
// ============== 主函数 ==============
int main() {
basic_example();
benchmark_example();
struct_example();
std::cout << "所有示例运行完毕!" << std::endl;
return 0;
}
编译与运行 (Compile and Run)
# 使用 g++ 编译(需要 C++20 支持)
g++ -std=c++20 -O3 -pthread -o spsc_demo spsc_demo.cpp
# 运行
./spsc_demo
预期输出 (Expected Output)
=== 基础用法示例 ===
[Producer] Pushed: 1
[Consumer] Popped: 1
[Producer] Pushed: 2
[Consumer] Popped: 2
...
[Producer] Pushed: 20
[Consumer] Popped: 20
=== 性能测试示例 ===
[Consumer] 校验通过! Sum = 49999995000000
传输 10000000 个元素
耗时: 45 ms
吞吐量: 222.22 M ops/sec
=== 结构体传输示例 ===
[TradingEngine] 发送订单 #1
[RiskSystem] 收到订单 #1 | 卖出 | 价格: 100.5 | 数量: 100
[TradingEngine] 发送订单 #2
[RiskSystem] 收到订单 #2 | 买入 | 价格: 101 | 数量: 200
...
所有示例运行完毕!
总结 (Summary)
| 优化技术 | 目的 |
|---|---|
| 环形缓冲区(Ring Buffer) | 避免动态内存分配,O(1) 操作 |
| 浪费一个槽位 | 区分空和满,无需额外原子变量 |
| 缓存行对齐(alignas(64)) | 消除伪共享 |
| 索引缓存(cached_head/tail) | 减少跨核原子读取 |
| release-acquire 语义 | 保证数据可见性,避免过度同步 |
| 无 CAS | SPSC 无竞争,直接 store 即可 |
SPSC Queue 是无锁编程的入门案例,也是实际生产中高性能通信的基石。理解其设计思想后,可以进一步学习 MPMC Queue、无锁栈等更复杂的数据结构。
更多推荐


所有评论(0)