在上一篇文章中,我们探讨了“当队列为空时,消费者可以通过 WaitStrategy 优雅地等待”。今天,我们将实现这个队列的本体——一个基于链表的高性能无界无锁队列(UnboundedQueue)

如果你阅读过百度 Apollo CyberRT 的底层源码,你会发现这种结构在消息分发、对象池回收等高并发场景中被高频使用。

写无锁队列就像是在刀尖上跳舞,稍不留神就会踩中“ABA 问题”、“内存泄漏”或者“内存指令重排(Memory Reordering)”的陷阱。

无界队列的最大痛点,在于复杂的动态内存管理。 因为它在运行时涉及到不断地 new/delete 节点,并发时极容易造成 Use-After-Free (UAF) 或野指针访问,最终引发 Core Dump。因此,这篇文章的核心,就是教你如何使用“引用计数 (Ref Counting)”和指针 CAS 来彻底驯服这些内存幽灵。

接下来,我们将硬核拆解这个无锁队列的实现细节。


一、 核心设计:为什么选择链表?

由于是**无界(Unbounded)**队列,其大小理论上只受限于机器物理内存,这意味我们无法像在“有界队列(如 RingBuffer)”中那样预先分配一块连续的数组。我们只能采用“动态节点分配”的单向链表结构:

  • Head 指针:指向队列的哨兵节点(Dummy Node)或头节点,用于出队(Dequeue)。
  • Tail 指针:指向队列的末尾节点,用于入队(Enqueue)。

为了支持多线程并发,我们对 head_tail_ 以及节点内部的 next 指针,一律使用 std::atomic 并配以 CAS (Compare-And-Swap) 操作。


二、 破局点:基于引用计数的内存安全释放

在无锁编程中,比“多线程入队”更难的是**“多线程如何安全回收内存”**。
试想:如果线程 A 刚刚完成出队并准备 delete 旧头节点,而线程 B 此时因为调度延迟,还在读取这个其实已经被释放的旧节点的 next 指针,就会触发严重的 Use-After-Free (UAF) 段错误崩溃!

为了解决这个问题,本代码采用了一种经典的“引用计数 (Ref Counting)”模型

struct Node {
    T data;
    std::atomic<uint32_t> ref_count;
    std::atomic<Node*> next; // 保证多线程并发读取 next 时的可见性
    
    Node() : next(nullptr) { 
        // 关键点:初始计数为 2
        // 表示有 2 个“逻辑所有者”:
        // 1 个代表队列本身结构持有着该节点(链表链接)
        // 1 个代表外部可能有线程(如入队/出队线程)正在操作它
        ref_count.store(2, std::memory_order_relaxed); 
    } 
    
    void release() {
        // 使用 release,并在归零时加上 acquire 内存屏障,保证对象析构的安全
        if (ref_count.fetch_sub(1, std::memory_order_release) == 1) {
            std::atomic_thread_fence(std::memory_order_acquire);
            delete this;
        }
    }
};

当节点出队时,我们不直接 delete 它,而是调用它内部的 release() 函数减去一层计数。只有当所有持有它的逻辑引用(包括尾指针的引用)都被释放时,物理内存才会被真正销毁。


三、 CAS 的艺术:入队与出队

结合 C++11 的内存模型(Memory Order),我们来看看核心逻辑如何避开各种坑。

1. 入队 (Enqueue) - 乐观自旋

void Enqueue(const T& element) {
    auto node = new Node();
    node->data = element;
    Node* old_tail = tail_.load(std::memory_order_acquire);

    // 核心循环:将新的 node CAS 替换到 tail_ 上
    while (!tail_.compare_exchange_weak(old_tail, node, std::memory_order_acq_rel, std::memory_order_acquire)) {
        // 使用 weak 比 strong 性能更好,偶尔 spurious fail 会由于外层 while 继续重试
    }
    // ------ 此时我们独占了把老 tail 往新 node 连线的过程 ------
    
    old_tail->next.store(node, std::memory_order_release); 
    old_tail->release(); // 回收原 tail 的逻辑身份引用
    size_.fetch_add(1, std::memory_order_relaxed);
}

亮点:在这个算法中,生产者是**Wait-Free (无等待)**的局部退化版本。生产者通过 CAS 直接霸占 tail_ 位置,然后从容不迫地将 old_tail->next 接过来。

2. 出队 (Dequeue)

bool Dequeue(T& element) {
    Node* old_head = head_.load(std::memory_order_acquire);
    Node* head_next = nullptr;
    do {
        head_next = old_head->next.load(std::memory_order_acquire);
        if (head_next == nullptr) {
            return false; // 队列为空
        }
    } while (!head_.compare_exchange_weak(old_head, head_next, std::memory_order_acq_rel, std::memory_order_acquire));
    
    // 取出数据
    element = head_next->data; 
    size_.fetch_sub(1, std::memory_order_relaxed);
    
    old_head->release(); // 剥夺老头节点的身份,如果它是该节点的最后一个引用,则物理回收
    return true;
}

四、 联动:当无锁队列遇上 WaitStrategy

你可能会问:“当上面的 Dequeue 因为 head_next == nullptr 返回 false 时,外部业务会怎么办?”

直接返回给业务去死循环调用 Dequeue() 是极不优雅的,这时候我们上一篇文章讲的 由多态实现的 WaitStrategy 等待策略 就完全派上用场了!

在一个完整的“生产者-消费者”引擎封装中,你通常会看到这样的联动代码:

T data;
// 一直取,直到取到为止
while (!queue.Dequeue(data)) {
    // 根据业务注入的不同的多态等待策略执行等待
    // 例如是 YieldWaitStrategy, SleepWaitStrategy 还是超时阻塞
    wait_strategy_->EmptyWait(); 
}
// 走到这里,代表安全拿到了 data
Process(data);

将底层的“纯粹状态(空还是非空)”与上层业务的“处理动作(阻塞还是自旋)”完美分离,这就是高级并发组件的架构魅力。

总结与预告

手写无锁队列并不简单,除了理解 CAS,你还必须像本文展示的那样,通过 std::atomic 强制保证 next 的多核可见性,并通过经典的引用计数彻底消灭了链表并发释放带来的 Use-After-Free 痛点。在这个地基打好后,结合之前封装的 WaitStrategy,我们就拥有了一个极为健壮的底层投递引擎。

但你发现了吗?无论我们的 CAS 怎么优化,基于链表的无界队列在入队时始终无法避免 new Node() 带来的动态内存分配开销。在普通场景下这毫无问题,但对于追求极限纳秒级延迟的系统(如高频交易/自动驾驶)来说,这依然是致命的。

为了彻底消除运行时的 new/delete 开销,我们需要一种内存预分配的连续介质
在下一篇文章中,我们将抛弃链表,直面无锁编程皇冠上的明珠——手写基于数组的有界无锁环形队列(Lock-Free Ring Buffer),敬请期待!

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐