SPSC 无锁队列详解 (Lock-Free Single Producer Single Consumer Queue)

概述 (Overview)

SPSC Queue 是一种特殊的无锁队列(Lock-Free Queue),专为单生产者单消费者(Single Producer Single Consumer)场景设计。由于只有一个线程写入、一个线程读取,我们可以避免使用互斥锁(Mutex)或复杂的 CAS 循环,从而获得极高的性能。

这种数据结构在高频交易(HFT, High-Frequency Trading)、音视频处理、日志系统等低延迟场景中广泛使用。


基础实现 (Basic Implementation)

环形缓冲区设计 (Ring Buffer Design)

SPSC Queue 通常基于环形缓冲区(Circular Buffer / Ring Buffer)实现。我们使用两个索引:

  • head:消费者(Consumer)读取位置
  • tail:生产者(Producer)写入位置

为了区分队列满和队列空的状态,我们浪费一个槽位(Slot):

  • 空(Empty)head == tail
  • 满(Full)(tail + 1) % size == head
#include <array>
#include <atomic>
#include <cstddef>

template <typename T, size_t Cap>
class SPSCQueue {
  // 缓冲区大小为 Cap + 1,浪费一个槽位用于区分空和满
  std::array<T, Cap + 1> buffer;

  // head: 消费者读取位置,由消费者修改,生产者读取
  std::atomic<size_t> head{0};

  // tail: 生产者写入位置,由生产者修改,消费者读取
  std::atomic<size_t> tail{0};

public:
  // 生产者调用:将元素推入队列
  bool push(const T& val) {
    // relaxed: 只有生产者修改 tail,无需同步
    size_t pos = tail.load(std::memory_order_relaxed);
    size_t next = (pos + 1) % (Cap + 1);

    // acquire: 需要看到消费者对 head 的修改
    if (next == head.load(std::memory_order_acquire))
      return false;  // 队列已满

    buffer[pos] = val;

    // release: 确保 buffer 写入对消费者可见
    tail.store(next, std::memory_order_release);
    return true;
  }

  // 消费者调用:从队列弹出元素
  bool pop(T& val) {
    // relaxed: 只有消费者修改 head,无需同步
    size_t pos = head.load(std::memory_order_relaxed);

    // acquire: 需要看到生产者对 tail 的修改
    if (pos == tail.load(std::memory_order_acquire))
      return false;  // 队列为空

    val = std::move(buffer[pos]);
    size_t next = (pos + 1) % (Cap + 1);

    // release: 确保 buffer 读取完成后再更新 head
    head.store(next, std::memory_order_release);
    return true;
  }
};

内存序解释 (Memory Order Explanation)

为什么用 relaxed 加载自己的索引?

在 SPSC 场景下:

  • 只有生产者写 tail
  • 只有消费者写 head

因此,加载自己的索引时不存在数据竞争(Data Race),使用 memory_order_relaxed 足够。

为什么用 acquire 加载对方的索引?

生产者读取 head 时,需要确保能看到消费者之前对 head 的写入。这形成了 acquire-release 同步关系(Synchronizes-With Relationship):

Consumer: head.store(next, release)  --->  Producer: head.load(acquire)

同理,消费者读取 tail 也需要 acquire。

为什么用 release 存储?

存储时使用 memory_order_release,确保之前的写操作(buffer 写入或读取)对其他线程可见。


缓存行优化 (Cache Line Optimization)

伪共享问题 (False Sharing)

在上述实现中,headtail 可能位于同一缓存行(Cache Line,通常 64 字节)。当生产者更新 tail 时,会导致消费者的缓存行失效;反之亦然。

这种不必要的缓存失效称为伪共享(False Sharing),会严重影响性能。

解决方案:缓存行对齐 (Cache Line Alignment)

#include <array>
#include <atomic>
#include <cstddef>

template <typename T, size_t Cap>
class SPSCQueue {
  // 将 head 和 tail 放在不同的缓存行上
  // 热数据放在结构体前部,便于访问
  alignas(64) std::atomic<size_t> head{0};
  alignas(64) std::atomic<size_t> tail{0};

  // 缓冲区也对齐,避免与 tail 共享缓存行
  alignas(64) std::array<T, Cap + 1> buffer;

public:
  bool push(const T& val) {
    size_t pos = tail.load(std::memory_order_relaxed);
    size_t next = (pos + 1) % (Cap + 1);

    if (next == head.load(std::memory_order_acquire))
      return false;

    buffer[pos] = val;
    tail.store(next, std::memory_order_release);
    return true;
  }

  bool pop(T& val) {
    size_t pos = head.load(std::memory_order_relaxed);

    if (pos == tail.load(std::memory_order_acquire))
      return false;

    val = std::move(buffer[pos]);
    size_t next = (pos + 1) % (Cap + 1);
    head.store(next, std::memory_order_release);
    return true;
  }
};

使用 alignas(64) 确保每个原子变量独占一个缓存行。


索引缓存优化 (Index Caching Optimization)

动机 (Motivation)

每次 push 都需要读取 head(跨核访问),每次 pop 都需要读取 tail。这些原子加载开销较大。

观察关键特性:

  • head 只会增加(消费者向前移动)
  • tail 只会增加(生产者向前移动)

因此,我们可以缓存对方的索引。即使缓存过时,也只会保守地认为"空间更少"或"数据更少",不会导致正确性问题。

实现 (Implementation)

#include <array>
#include <atomic>
#include <cstddef>

template <typename T, size_t Cap>
class SPSCQueue {
  // 原子索引:用于跨线程通信
  alignas(64) std::atomic<size_t> head{0};
  alignas(64) std::atomic<size_t> tail{0};

  // 缓存索引:本地缓存,避免频繁跨核读取
  // cached_head: 生产者缓存的 head 值,仅生产者读写
  // cached_tail: 消费者缓存的 tail 值,仅消费者读写
  alignas(64) size_t cached_head{0};
  alignas(64) size_t cached_tail{0};

  alignas(64) std::array<T, Cap + 1> buffer;

public:
  bool push(const T& val) {
    size_t pos = tail.load(std::memory_order_relaxed);
    size_t next = (pos + 1) % (Cap + 1);

    // 先检查缓存的 head
    if (next == cached_head) {
      // 缓存显示已满,刷新真实值
      cached_head = head.load(std::memory_order_acquire);
      if (next == cached_head)
        return false;  // 确实满了
    }
    // 缓存显示有空间,或刷新后有空间

    buffer[pos] = val;
    tail.store(next, std::memory_order_release);
    return true;
  }

  bool pop(T& val) {
    size_t pos = head.load(std::memory_order_relaxed);

    // 先检查缓存的 tail
    if (pos == cached_tail) {
      // 缓存显示为空,刷新真实值
      cached_tail = tail.load(std::memory_order_acquire);
      if (pos == cached_tail)
        return false;  // 确实空了
    }
    // 缓存显示有数据,或刷新后有数据

    val = std::move(buffer[pos]);
    size_t next = (pos + 1) % (Cap + 1);
    head.store(next, std::memory_order_release);
    return true;
  }
};

为什么 cached_head/cached_tail 不需要原子?

每个缓存变量只有一个线程访问:

  • cached_head:仅生产者读写
  • cached_tail:仅消费者读写

没有数据竞争,无需原子操作或同步。

性能提升原理

常见情况(队列非空非满)下,只访问本地缓存变量,避免昂贵的跨核原子读取。只有当缓存"认为"队列满或空时,才刷新真实值。


为什么不用 CAS? (Why Not CAS?)

在实现自旋锁或 MPMC 队列时,我们通常使用 compare_exchange(CAS)来保证原子性。但 SPSC 不需要:

// 错误示范:SPSC 中不需要 CAS
while (!tail.compare_exchange_weak(pos, next, ...));

原因:只有一个线程修改 tail,不存在竞争。简单的 store 即可:

// 正确:直接存储
tail.store(next, std::memory_order_release);

CAS 引入不必要的开销和复杂性。


size() 方法的注意事项 (Notes on size())

size_t size() const {
  size_t h = head.load(std::memory_order_acquire);
  size_t t = tail.load(std::memory_order_acquire);
  return (t - h + Cap + 1) % (Cap + 1);
}

需要注意:这不是一致性快照(Consistent Snapshot)。在加载 ht 之间,另一个线程可能已经修改了值。返回的大小是某一时刻的近似值,不能用于同步决策。


完整实现 (Complete Implementation)

#include <array>
#include <atomic>
#include <cstddef>

template <typename T, size_t Cap>
class SPSCQueue {
  // === 跨线程共享的原子变量 ===
  // head: 消费者的读取位置
  alignas(64) std::atomic<size_t> head{0};
  // tail: 生产者的写入位置
  alignas(64) std::atomic<size_t> tail{0};

  // === 线程本地缓存 ===
  // cached_head: 生产者缓存的 head(仅生产者访问)
  alignas(64) size_t cached_head{0};
  // cached_tail: 消费者缓存的 tail(仅消费者访问)
  alignas(64) size_t cached_tail{0};

  // === 数据存储 ===
  // 容量为 Cap + 1,牺牲一个槽位区分空和满
  alignas(64) std::array<T, Cap + 1> buffer;

  // 辅助函数:计算下一个位置
  static constexpr size_t next_pos(size_t pos) {
    return (pos + 1) % (Cap + 1);
  }

public:
  // 生产者调用:推入元素
  // 返回 true 表示成功,false 表示队列已满
  bool push(const T& val) {
    size_t pos = tail.load(std::memory_order_relaxed);
    size_t next = next_pos(pos);

    // 检查是否已满(先用缓存,必要时刷新)
    if (next == cached_head) {
      cached_head = head.load(std::memory_order_acquire);
      if (next == cached_head)
        return false;
    }

    // 写入数据并发布新的 tail
    buffer[pos] = val;
    tail.store(next, std::memory_order_release);
    return true;
  }

  // 支持移动语义的 push
  bool push(T&& val) {
    size_t pos = tail.load(std::memory_order_relaxed);
    size_t next = next_pos(pos);

    if (next == cached_head) {
      cached_head = head.load(std::memory_order_acquire);
      if (next == cached_head)
        return false;
    }

    buffer[pos] = std::move(val);
    tail.store(next, std::memory_order_release);
    return true;
  }

  // 消费者调用:弹出元素
  // 返回 true 表示成功,false 表示队列为空
  bool pop(T& val) {
    size_t pos = head.load(std::memory_order_relaxed);

    // 检查是否为空(先用缓存,必要时刷新)
    if (pos == cached_tail) {
      cached_tail = tail.load(std::memory_order_acquire);
      if (pos == cached_tail)
        return false;
    }

    // 读取数据并发布新的 head
    val = std::move(buffer[pos]);
    head.store(next_pos(pos), std::memory_order_release);
    return true;
  }

  // 近似大小(非一致性快照)
  size_t size() const {
    size_t h = head.load(std::memory_order_acquire);
    size_t t = tail.load(std::memory_order_acquire);
    return (t - h + Cap + 1) % (Cap + 1);
  }

  // 容量
  static constexpr size_t capacity() {
    return Cap;
  }

  // 是否为空(近似)
  bool empty() const {
    return head.load(std::memory_order_acquire) ==
           tail.load(std::memory_order_acquire);
  }
};

使用示例 (Usage Example)

以下是一个完整可运行的示例,展示如何使用 SPSCQueue 在两个线程间传递数据:

#include <array>
#include <atomic>
#include <chrono>
#include <cstddef>
#include <iostream>
#include <thread>

template <typename T, size_t Cap>
class SPSCQueue {
  alignas(64) std::atomic<size_t> head{0};
  alignas(64) std::atomic<size_t> tail{0};
  alignas(64) size_t cached_head{0};
  alignas(64) size_t cached_tail{0};
  alignas(64) std::array<T, Cap + 1> buffer;

  static constexpr size_t next_pos(size_t pos) {
    return (pos + 1) % (Cap + 1);
  }

public:
  bool push(const T& val) {
    size_t pos = tail.load(std::memory_order_relaxed);
    size_t next = next_pos(pos);

    if (next == cached_head) {
      cached_head = head.load(std::memory_order_acquire);
      if (next == cached_head)
        return false;
    }

    buffer[pos] = val;
    tail.store(next, std::memory_order_release);
    return true;
  }

  bool pop(T& val) {
    size_t pos = head.load(std::memory_order_relaxed);

    if (pos == cached_tail) {
      cached_tail = tail.load(std::memory_order_acquire);
      if (pos == cached_tail)
        return false;
    }

    val = std::move(buffer[pos]);
    head.store(next_pos(pos), std::memory_order_release);
    return true;
  }

  size_t size() const {
    size_t h = head.load(std::memory_order_acquire);
    size_t t = tail.load(std::memory_order_acquire);
    return (t - h + Cap + 1) % (Cap + 1);
  }
};

// ============== 示例 1: 基础用法 ==============

void basic_example() {
  std::cout << "=== 基础用法示例 ===" << std::endl;

  SPSCQueue<int, 8> queue;
  std::atomic<bool> done{false};

  // 生产者线程(Producer Thread)
  std::thread producer([&]() {
    for (int i = 1; i <= 20; ++i) {
      // 自旋等待直到 push 成功
      while (!queue.push(i)) {
        std::this_thread::yield();  // 让出 CPU
      }
      std::cout << "[Producer] Pushed: " << i << std::endl;
    }
    done.store(true, std::memory_order_release);
  });

  // 消费者线程(Consumer Thread)
  std::thread consumer([&]() {
    int val;
    int count = 0;

    while (count < 20) {
      if (queue.pop(val)) {
        std::cout << "[Consumer] Popped: " << val << std::endl;
        ++count;
      } else {
        std::this_thread::yield();
      }
    }
  });

  producer.join();
  consumer.join();

  std::cout << std::endl;
}

// ============== 示例 2: 性能测试 ==============

void benchmark_example() {
  std::cout << "=== 性能测试示例 ===" << std::endl;

  constexpr size_t NUM_ITEMS = 10'000'000;
  SPSCQueue<uint64_t, 1024> queue;
  std::atomic<bool> done{false};

  auto start = std::chrono::high_resolution_clock::now();

  // 生产者:尽可能快地推送数据
  std::thread producer([&]() {
    for (uint64_t i = 0; i < NUM_ITEMS; ++i) {
      while (!queue.push(i)) {
        // 忙等待(Busy Wait)
      }
    }
    done.store(true, std::memory_order_release);
  });

  // 消费者:尽可能快地消费数据
  std::thread consumer([&]() {
    uint64_t val;
    uint64_t count = 0;
    uint64_t sum = 0;

    while (count < NUM_ITEMS) {
      if (queue.pop(val)) {
        sum += val;
        ++count;
      }
    }

    // 验证正确性:sum 应该等于 0 + 1 + ... + (N-1)
    uint64_t expected = (NUM_ITEMS - 1) * NUM_ITEMS / 2;
    if (sum == expected) {
      std::cout << "[Consumer] 校验通过! Sum = " << sum << std::endl;
    } else {
      std::cout << "[Consumer] 校验失败! Got " << sum
                << ", expected " << expected << std::endl;
    }
  });

  producer.join();
  consumer.join();

  auto end = std::chrono::high_resolution_clock::now();
  auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);

  double ops_per_sec = static_cast<double>(NUM_ITEMS) / duration.count() * 1000;
  std::cout << "传输 " << NUM_ITEMS << " 个元素" << std::endl;
  std::cout << "耗时: " << duration.count() << " ms" << std::endl;
  std::cout << "吞吐量: " << ops_per_sec / 1'000'000 << " M ops/sec" << std::endl;
  std::cout << std::endl;
}

// ============== 示例 3: 结构体传输 ==============

struct Order {
  uint64_t id;
  double price;
  int quantity;
  char side;  // 'B' for buy, 'S' for sell
};

void struct_example() {
  std::cout << "=== 结构体传输示例 ===" << std::endl;

  SPSCQueue<Order, 16> order_queue;
  std::atomic<bool> done{false};

  // 交易引擎(Producer):生成订单
  std::thread trading_engine([&]() {
    for (uint64_t i = 1; i <= 5; ++i) {
      Order order{
        .id = i,
        .price = 100.0 + i * 0.5,
        .quantity = static_cast<int>(i * 100),
        .side = (i % 2 == 0) ? 'B' : 'S'
      };

      while (!order_queue.push(order)) {
        std::this_thread::yield();
      }

      std::cout << "[TradingEngine] 发送订单 #" << order.id << std::endl;
    }
    done.store(true, std::memory_order_release);
  });

  // 风控系统(Consumer):处理订单
  std::thread risk_system([&]() {
    Order order;
    int count = 0;

    while (count < 5) {
      if (order_queue.pop(order)) {
        std::cout << "[RiskSystem] 收到订单 #" << order.id
                  << " | " << (order.side == 'B' ? "买入" : "卖出")
                  << " | 价格: " << order.price
                  << " | 数量: " << order.quantity
                  << std::endl;
        ++count;
      } else {
        std::this_thread::yield();
      }
    }
  });

  trading_engine.join();
  risk_system.join();

  std::cout << std::endl;
}

// ============== 主函数 ==============

int main() {
  basic_example();
  benchmark_example();
  struct_example();

  std::cout << "所有示例运行完毕!" << std::endl;
  return 0;
}

编译与运行 (Compile and Run)

# 使用 g++ 编译(需要 C++20 支持)
g++ -std=c++20 -O3 -pthread -o spsc_demo spsc_demo.cpp

# 运行
./spsc_demo

预期输出 (Expected Output)

=== 基础用法示例 ===
[Producer] Pushed: 1
[Consumer] Popped: 1
[Producer] Pushed: 2
[Consumer] Popped: 2
...
[Producer] Pushed: 20
[Consumer] Popped: 20

=== 性能测试示例 ===
[Consumer] 校验通过! Sum = 49999995000000
传输 10000000 个元素
耗时: 45 ms
吞吐量: 222.22 M ops/sec

=== 结构体传输示例 ===
[TradingEngine] 发送订单 #1
[RiskSystem] 收到订单 #1 | 卖出 | 价格: 100.5 | 数量: 100
[TradingEngine] 发送订单 #2
[RiskSystem] 收到订单 #2 | 买入 | 价格: 101 | 数量: 200
...

所有示例运行完毕!

总结 (Summary)

优化技术 目的
环形缓冲区(Ring Buffer) 避免动态内存分配,O(1) 操作
浪费一个槽位 区分空和满,无需额外原子变量
缓存行对齐(alignas(64)) 消除伪共享
索引缓存(cached_head/tail) 减少跨核原子读取
release-acquire 语义 保证数据可见性,避免过度同步
无 CAS SPSC 无竞争,直接 store 即可

SPSC Queue 是无锁编程的入门案例,也是实际生产中高性能通信的基石。理解其设计思想后,可以进一步学习 MPMC Queue、无锁栈等更复杂的数据结构。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐