CppCon 2024 学习:Multi Producer, Multi Consumer, Lock Free, Atomic Queue User APl and Implementation
理解:下面是一个简化版 C++ MPMC 队列示例,带注释:注释解析哑节点 (dummy node)用于简化边界条件处理, 和都指向哑节点开始。数学上可以理解为:head=tail=Node0\text{head} = \text{tail} = \text{Node}_0head=tail=Node0原子操作 (atomic)保证在多线程环境下队尾更新不会产生数据竞争。atomic_excha
背景说明
● Trading platform - every message is tracked, at microseconds resolution
● Queues are essential data structure used to transfer messages from one component to another
● Business decisions depends on the message content and arrival time
理解:
- 交易平台消息追踪
在交易平台中,每条消息都需要被追踪,而且精度达到 微秒级。也就是说系统对时间非常敏感,消息处理的延迟直接影响业务决策。- 微秒精度可以用 t arrival t_\text{arrival} tarrival 表示消息到达时间:
t arrival ∈ R , 单位: 微秒 t_\text{arrival} \in \mathbb{R},\quad \text{单位: 微秒} tarrival∈R,单位: 微秒
- 微秒精度可以用 t arrival t_\text{arrival} tarrival 表示消息到达时间:
- 队列是核心数据结构
队列(Queue)是连接系统各组件的桥梁,负责消息在不同模块之间的传递。- 如果有多个生产者(Producers)和多个消费者(Consumers),队列必须支持 MPMC(Multi-Producer Multi-Consumer)操作。
- 对延迟敏感的系统通常会使用 lock-free(无锁)队列,避免线程阻塞,提高吞吐量。
- 业务决策依赖消息内容和时间
- 不只是消息本身重要,消息的 到达时间 t arrival t_\text{arrival} tarrival 同样关键。
- 比如,高频交易策略可能依据先到先处理的原则执行买卖决策。
代码示例(Lock-free MPMC Queue)
下面是一个简化版 C++ MPMC 队列示例,带注释:
#include <atomic>
#include <vector>
#include <optional>
template<typename T>
class LockFreeMPMCQueue {
private:
struct Node {
T value;
std::atomic<Node*> next; // 指向下一个节点
Node(T val) : value(val), next(nullptr) {}
};
std::atomic<Node*> head; // 队头
std::atomic<Node*> tail; // 队尾
public:
LockFreeMPMCQueue() {
Node* dummy = new Node(T{}); // 哑节点,简化队列操作
head.store(dummy);
tail.store(dummy);
}
~LockFreeMPMCQueue() {
while (Node* n = head.load()) {
head.store(n->next);
delete n;
}
}
// 入队操作(生产者使用)
void enqueue(T value) {
Node* node = new Node(value);
Node* prevTail = tail.exchange(node); // 原子交换尾节点
prevTail->next.store(node); // 将前一个尾节点指向新节点
}
// 出队操作(消费者使用)
std::optional<T> dequeue() {
Node* h = head.load();
Node* next = h->next.load();
if (next == nullptr) return {}; // 队列为空
T value = next->value;
head.store(next); // 移动头节点
delete h; // 删除旧头节点
return value;
}
};
注释解析
- 哑节点 (dummy node)
- 用于简化边界条件处理,
head和tail都指向哑节点开始。 - 数学上可以理解为:
head = tail = Node 0 \text{head} = \text{tail} = \text{Node}_0 head=tail=Node0
- 用于简化边界条件处理,
- 原子操作 (atomic)
tail.exchange(node)保证在多线程环境下队尾更新不会产生数据竞争。- atomic_exchange \text{atomic\_exchange} atomic_exchange 确保:
∀ i , j , i ≠ j : tail i ≠ tail j \forall i, j,\ i \neq j: \text{tail}_i \neq \text{tail}_j ∀i,j, i=j:taili=tailj
- 入队和出队保证顺序一致
- 先进先出(FIFO),满足微秒级消息顺序的要求。
总结
- Lock-free MPMC 队列 是高频交易系统的核心组件,保证多生产者、多消费者之间的 高吞吐、低延迟。
- 每条消息不仅需要被处理,还需要被准确追踪到 微秒级时间。
- 数学上,队列保证了顺序 x 1 → x 2 → ⋯ → x n x_1 \rightarrow x_2 \rightarrow \dots \rightarrow x_n x1→x2→⋯→xn,并通过原子操作确保线程安全。
这个 SVG 图展示了 Lock-free MPMC 队列中消息的流动和时间关系:
图示的概念解析
这张图描述的是 多生产者多消费者(MPMC)队列中消息的流动与时间特性,在高频交易或低延迟系统中非常典型。
1. 生产者(Producers)
- 概念:左侧的多个生产者代表消息来源,可以是交易订单生成模块、行情数据源或者其他事件触发器。
- 特点:
- 多线程并发产生消息。
- 每条消息都有 生成时间 t in t_\text{in} tin。
- 数学表示:
P i = m i 1 , m i 2 , … , m i n , i = 1 , … , N P_i = { m_{i1}, m_{i2}, \dots, m_{in} },\quad i = 1,\dots,N Pi=mi1,mi2,…,min,i=1,…,N
其中 m i j m_{ij} mij 表示第 i i i 个生产者生成的第 j j j 条消息,生成时间为 t in i j t_\text{in}^{ij} tinij。
2. 队列(Queue)
- 概念:中间的 Queue 是核心缓冲区,负责连接生产者和消费者。
- 特点:
- 支持 多生产者多消费者(MPMC)。
- Lock-free 设计保证高并发下低延迟。
- 保证消息 先进先出(FIFO)顺序。
- 数学模型:
- 队列内消息顺序:
Queue = m 1 , m 2 , … , m n , m 1 最早入队 \text{Queue} = { m_1, m_2, \dots, m_n },\quad m_1 \text{最早入队} Queue=m1,m2,…,mn,m1最早入队 - 消息在队列中的等待时间(队列延迟):
t q = t dequeue − t enqueue t_q = t_\text{dequeue} - t_\text{enqueue} tq=tdequeue−tenqueue
- 队列内消息顺序:
3. 消费者(Consumers)
- 概念:右侧的消费者代表消息处理模块,例如交易策略、风控系统或日志处理。
- 特点:
- 可以并行消费队列中的消息。
- 消费顺序依赖队列顺序。
- 消息处理时间记为 t out t_\text{out} tout。
- 数学表示:
C j = f j ( Queue ) , j = 1 , … , M C_j = f_j(\text{Queue}),\quad j = 1,\dots,M Cj=fj(Queue),j=1,…,M
其中 f j f_j fj 表示第 j j j 个消费者的处理函数。
4. 消息流(Message Flow)
- 概念:
- 消息从生产者流入队列,再由队列分发到消费者。
- 流动的箭头体现了 并发、多路径和异步。
- 延迟计算:
Δ t = t out − t in \Delta t = t_\text{out} - t_\text{in} Δt=tout−tin - 意义:
- 在高频交易中, Δ t \Delta t Δt 越小,决策越及时。
- 队列的设计直接影响系统的吞吐量和延迟。
5. 时间轴(Time Axis)
- 概念:
- 底部的横轴表示 消息时间流向。
- 左端 t in t_\text{in} tin:消息产生时间。
- 右端 t out t_\text{out} tout:消息消费完成时间。
- 直观理解:
- 队列是消息在 t in → t out t_\text{in} \rightarrow t_\text{out} tin→tout 之间的 暂存器。
- 消息在队列停留的时间就是系统的 队列延迟 t q t_q tq。
6. 系统意义
- 微秒级消息追踪:
- 每条消息不仅内容重要,生成和消费的时间也是关键。
- 时间精度通常在微秒级别,用 t in , t out t_\text{in}, t_\text{out} tin,tout 表示。
- 并发与吞吐量:
- 多生产者、多消费者同时操作,队列必须 无锁设计,保证吞吐量和顺序。
- 业务决策依赖消息顺序和时间:
- 交易策略可能根据先到消息优先执行。
- 队列延迟直接影响策略执行效果。
7. 核心公式总结
- 消息延迟:
Δ t = t out − t in \Delta t = t_\text{out} - t_\text{in} Δt=tout−tin - 队列等待时间:
t q = t dequeue − t enqueue t_q = t_\text{dequeue} - t_\text{enqueue} tq=tdequeue−tenqueue - 消息顺序:
Queue = m 1 , m 2 , … , m n ( FIFO ) \text{Queue} = { m_1, m_2, \dots, m_n } \quad (\text{FIFO}) Queue=m1,m2,…,mn(FIFO)
1. 队列的核心作用
Queues transfer messages and synchronize thread
- 队列不仅 传递消息,还 同步线程。
- 在多线程环境下,队列可以避免线程直接阻塞等待,降低竞争,提高吞吐量。
- 数学上可以抽象为:
Q : P → C Q : P \rightarrow C Q:P→C
其中 P P P 是生产者线程集合, C C C 是消费者线程集合。消息从 P P P 到 C C C 的映射由队列保证顺序。
2. 使用场景解析
(1) 消息源到线程池(Message Source → Thread Pool)
- 概念:
消息源产生大量任务,需要线程池异步处理。队列在这里起到 缓冲和调度 的作用。 - 模型:
ThreadPoolQueue = m 1 , m 2 , … , m n , m i 等待线程处理 \text{ThreadPoolQueue} = { m_1, m_2, \dots, m_n }, \quad m_i \text{等待线程处理} ThreadPoolQueue=m1,m2,…,mn,mi等待线程处理 - 示例代码片段(C++):
LockFreeMPMCQueue<Task> taskQueue;
// 生产者线程
taskQueue.enqueue(Task{ "task1" });
// 线程池中的消费者线程
auto taskOpt = taskQueue.dequeue();
if (taskOpt) {
processTask(*taskOpt);
}
- 注释:
- 队列缓冲任务,线程池异步消费,避免线程频繁阻塞。
(2) 缓冲突发消息(Buffer to handle bursts)
- 概念:
队列可以作为 消息缓冲区,平滑处理高峰流量。 - 数学公式:
burst = ∑ i = 1 N m i \text{burst} = \sum_{i=1}^{N} m_i burst=i=1∑Nmi
Q size ( t ) ≥ max burst size Q_\text{size}(t) \ge \text{max burst size} Qsize(t)≥max burst size - 含义:队列大小应至少能够容纳峰值消息数,避免丢失消息。
(3) 对象池(Object Pool)共享线程使用
- 概念:
队列可以管理 可重用对象池,多个线程共享对象而不频繁创建/销毁。 - 示例:
struct Connection { int id; /* ... */ };
LockFreeMPMCQueue<Connection*> connPool;
// 初始化对象池
for(int i=0;i<10;i++) connPool.enqueue(new Connection{i});
// 线程获取对象使用
auto connOpt = connPool.dequeue();
if (connOpt) {
useConnection(*connOpt);
connPool.enqueue(*connOpt); // 使用后归还池
}
- 优势:
- 降低对象创建销毁开销。
- 提高系统性能,尤其在高并发场景。
(4) 进程间消息队列(IPC: Inter-Process Communication)
- 概念:
队列不仅在同一进程内使用,也可用于 进程间通信。 - 模型:
Q IPC : P 1 → P 2 Q_\text{IPC} : P_1 \rightarrow P_2 QIPC:P1→P2 - 说明:
- P 1 P_1 P1 和 P 2 P_2 P2 是不同进程。
- 队列通过共享内存或环形缓冲区实现消息传递。
- Lock-free 设计减少上下文切换延迟。
3. 总结
- 队列不仅传递消息,还同步线程。
- 应用场景:
- 消息源 → 线程池
- 缓冲突发消息
- 线程共享对象池
- 进程间消息队列(IPC)
- 公式抽象:
Queue : Producer/Source → Consumer/Thread/Process \text{Queue} : \text{Producer/Source} \rightarrow \text{Consumer/Thread/Process} Queue:Producer/Source→Consumer/Thread/Process
t q = t dequeue − t enqueue , Δ t = t out − t in t_q = t_\text{dequeue} - t_\text{enqueue}, \quad \Delta t = t_\text{out} - t_\text{in} tq=tdequeue−tenqueue,Δt=tout−tin - 代码实践:
- 使用
enqueue添加消息/对象 - 使用
dequeue消费消息/对象 - 无锁队列确保高并发下低延迟和线程安全
- 使用
1. 生产者/消费者数量分类
Number of producers / consumers: SPSC, SPMC, MPSC, MPMC
- SPSC (Single Producer Single Consumer):单生产者、单消费者
- 最简单的场景,无需复杂的原子操作。
- 消息流:
P → Q u e u e C P \xrightarrow{Queue} C PQueueC
- SPMC (Single Producer Multiple Consumers):单生产者、多消费者
- 消费端需要原子操作或 CAS(Compare-And-Swap)保证线程安全。
- MPSC (Multiple Producers Single Consumer):多生产者、单消费者
- 生产端需要原子操作,消费端可以直接读。
- MPMC (Multiple Producers Multiple Consumers):多生产者、多消费者
- 生产端和消费端都需要原子操作,最复杂也最通用。
2. 队列容量(Capacity)
Bounded / dynamic & memory allocation
- Bounded(有界队列):
- 队列容量固定,适合延迟敏感系统。
- 数学表示:
∣ Q ∣ ≤ N max |Q| \le N_\text{max} ∣Q∣≤Nmax - 优点:无额外分配,延迟可控。
- Dynamic(动态队列):
- 容量可扩展,支持内存动态分配。
- 优点:不丢消息;缺点:可能增加延迟。
3. 序列化(Serialization)
Strict global order or relaxed per producer order
- 严格全局顺序:
- 队列保证所有消息严格按入队顺序出队。
- 数学:
m i enqueue order < m j ⟹ m i dequeue order < m j m_i \text{ enqueue order } < m_j \implies m_i \text{ dequeue order } < m_j mi enqueue order <mj⟹mi dequeue order <mj
- 放宽顺序(per producer order):
- 保证同一个生产者的消息顺序,跨生产者的消息可能乱序。
- 优点:降低同步成本,提高吞吐。
4. API 设计
single item / multiple items, atomicity-ready or not
- 单条消息 API:
queue.enqueue(msg); // 推送单条消息
auto msg = queue.dequeue(); // 弹出单条消息
- 批量消息 API:
queue.enqueue_bulk(vec); // 批量入队
auto vec = queue.dequeue_bulk(n); // 批量出队
- 原子性准备(atomicity-ready):
- 支持多线程同时入队/出队而不加锁。
5. 消息大小(Message size)
fixed / dynamic
- 固定大小:
- 内存连续、效率高。
- 适合高频交易、延迟敏感系统。
- 动态大小:
- 内存分配灵活,支持变长数据。
- 可能产生内存分配延迟。
6. 基本操作(Push / Pop)
Queue - push(back / tail), pop(front / head)
- 入队(push back / tail):
Q new = Q old ∪ m Q_\text{new} = Q_\text{old} \cup {m} Qnew=Qold∪m - 出队(pop front / head):
m = dequeue ( Q ) , Q new = Q old ∖ m m = \text{dequeue}(Q), \quad Q_\text{new} = Q_\text{old} \setminus {m} m=dequeue(Q),Qnew=Qold∖m
7. 等待策略(Polling vs System Calls)
Busy polling on full / empty vs. sync using system calls
- 忙轮询 (Busy polling):
- 消费者/生产者不断检查队列状态。
- 优点:延迟低,适合微秒级系统。
- 缺点:CPU 占用高。
- 系统调用同步:
- 使用
mutex/condvar或semaphore。 - 优点:CPU 占用低。
- 缺点:延迟可能不可控。
- 使用
8. 阻塞策略(Blocking / Non-blocking / Wait-free)
- Blocking 队列:
- 队列空或满时,线程阻塞等待。
- Non-blocking 队列:
- 队列空或满时立即返回,线程可以继续做其他事情。
- Wait-free 队列:
- 保证每个操作在有限步内完成,无阻塞。
9. 数据所有权(Data ownership)
Data ownership or just value propagation
- 所有权转移:
- 入队后队列拥有消息,消费完成后释放。
- 仅值传递:
- 队列只传播消息引用,原对象仍由生产者管理。
- 代码示例:
// 所有权转移
queue.enqueue(std::move(msg)); // Queue 接管 msg 所有权
// 值传递
queue.enqueue(msg); // Queue 仅拷贝 msg
总结公式化
- 消息顺序:
m 1 , m 2 , . . . , m n FIFO m_1, m_2, ..., m_n \quad \text{FIFO} m1,m2,...,mnFIFO - 消息延迟:
Δ t = t out − t in , t q = t dequeue − t enqueue \Delta t = t_\text{out} - t_\text{in}, \quad t_q = t_\text{dequeue} - t_\text{enqueue} Δt=tout−tin,tq=tdequeue−tenqueue - 队列容量约束:
∣ Q ∣ ≤ N max (bounded) , ∣ Q ∣ 可动态扩展 |Q| \le N_\text{max} \quad \text{(bounded)}, \quad |Q| \text{可动态扩展} ∣Q∣≤Nmax(bounded),∣Q∣可动态扩展
整体理解:
- 队列分类从 生产者/消费者数量、容量、消息顺序、API、消息大小、操作方式、阻塞策略、数据所有权 八个维度区分。
- 不同场景选择不同队列类型:
- 高频交易 → 固定大小、无锁、SPSC 或 MPMC、忙轮询。
- 通用应用 → 动态大小、阻塞、MPSC/SPMC、系统调用同步。
1. 背景理解
- 该代码展示了一个 无锁队列 在高并发场景下可能出现的 调度器影响问题(Scheduler Interaction)。
- 关键点:
- 队列使用 原子索引
writeIndex和readIndex管理生产者和消费者。 - 消费者在读取时可能会 自旋等待(spin-wait),如果调度器将线程挂起,队列进度可能被阻塞。
- 队列使用 原子索引
- 核心问题:即使是无锁队列,线程调度器调度延迟仍会引起队列性能波动。
数学模型:
writeIndex → atomic increment next free slot \text{writeIndex} \xrightarrow{\text{atomic increment}} \text{next free slot} writeIndexatomic incrementnext free slot
readIndex → atomic increment next available item \text{readIndex} \xrightarrow{\text{atomic increment}} \text{next available item} readIndexatomic incrementnext available item
2. try_push 方法解析
bool try_push(T&& value) {
auto my_entry = writeIndex.fetch_add(1); // 获取当前写入索引,并原子+1
elements[my_entry].first = std::move(value); // 写入消息
// No code, does not mean no-time.
// scheduler will hit here, and block the queue progress.
elements[my_entry].second = my_entry; // 标记该槽已就绪
return true;
}
- 步骤解析:
fetch_add(1):原子操作,确保多个生产者同时入队时索引不冲突。- 写入消息到
elements[my_entry].first。 - 标记消息就绪
elements[my_entry].second = my_entry。
- 调度器影响:
- 即便这段代码看似没有阻塞,线程可能被调度器挂起。
- 对应数学公式:
t enqueue = t fetch_add + t write + t scheduler_delay t_\text{enqueue} = t_\text{fetch\_add} + t_\text{write} + t_\text{scheduler\_delay} tenqueue=tfetch_add+twrite+tscheduler_delay
3. try_pop 方法解析
bool try_pop(T& value) {
auto my_entry = readIndex.fetch_add(1); // 获取读取索引
while (elements[my_entry].second != my_entry)
; // spin and wait, maybe with pause();
value = std::move(elements[my_entry].first); // 取出消息
return true;
}
- 步骤解析:
readIndex.fetch_add(1):原子获取当前读取位置。while(elements[my_entry].second != my_entry):- 自旋等待,直到该槽被生产者写入完成。
- 可能用
_mm_pause()或类似指令减少 CPU 占用。
- 取出消息,并从队列获取所有权。
- 调度器问题:
- 自旋等待时,如果线程被操作系统挂起,队列可能 进度停滞。
- 消费者延迟:
t dequeue = t fetch_add + t spin_wait + t read t_\text{dequeue} = t_\text{fetch\_add} + t_\text{spin\_wait} + t_\text{read} tdequeue=tfetch_add+tspin_wait+tread - 其中:
t spin_wait ≈ 0 (理想情况) 或 t scheduler_pause (被挂起情况) t_\text{spin\_wait} \approx 0 \text{(理想情况)} \quad \text{或} \quad t_\text{scheduler\_pause} \text{(被挂起情况)} tspin_wait≈0(理想情况)或tscheduler_pause(被挂起情况)
4. 核心概念总结
| 概念 | 说明 |
|---|---|
| 原子索引 | writeIndex / readIndex 用于多线程安全访问队列槽 |
| 自旋等待 | 消费者在读取前等待生产者写入完成 |
| 调度器影响 | 即便队列无锁,线程调度器可能挂起,增加延迟 |
| 消息就绪标记 | elements[my_entry].second 确保消息可见性 |
5. 延迟公式化
- 入队延迟:
t enqueue = t write + t scheduler_delay t_\text{enqueue} = t_\text{write} + t_\text{scheduler\_delay} tenqueue=twrite+tscheduler_delay - 出队延迟:
t dequeue = t spin_wait + t read + t scheduler_delay t_\text{dequeue} = t_\text{spin\_wait} + t_\text{read} + t_\text{scheduler\_delay} tdequeue=tspin_wait+tread+tscheduler_delay - 总消息延迟:
Δ t = t dequeue − t enqueue \Delta t = t_\text{dequeue} - t_\text{enqueue} Δt=tdequeue−tenqueue
6. 注意事项
- 自旋等待风险:
- CPU 占用高,如果线程被调度器挂起,会阻塞整个队列。
- 优化手段:
- 使用 pause/yield 指令降低自旋开销:
while (elements[my_entry].second != my_entry)
_mm_pause(); // x86 pause hint
- 在延迟敏感系统中,调度器行为必须考虑。
7. 总结理解
- 这段伪代码展示了 Lock-free MPMC 队列在多线程和操作系统调度交互下的典型问题:
- 使用原子索引保证线程安全。
- 使用自旋等待保证顺序,但可能受调度器影响。
- 即使无锁队列,实际延迟仍包含操作系统调度延迟。
- 关键公式:
Δ t message = t dequeue − t enqueue = t write + t spin + t scheduler_delay + t read \Delta t_\text{message} = t_\text{dequeue} - t_\text{enqueue} = t_\text{write} + t_\text{spin} + t_\text{scheduler\_delay} + t_\text{read} Δtmessage=tdequeue−tenqueue=twrite+tspin+tscheduler_delay+tread
#include <atomic>
#include <memory>
#include <thread>
#include <iostream>
#include <optional>
#include <chrono>
// ============================
// Lock-free MPMC Queue
// ============================
// 多生产者多消费者无锁队列(Lock-free MPMC Queue)
// 适合高并发场景,保证线程安全而不使用互斥锁
template <typename T>
class LockFreeMPMCQueue {
public:
// 构造函数,初始化队列容量和索引
explicit LockFreeMPMCQueue(size_t capacity)
: capacity_(capacity), // 队列最大容量
writeIndex_(0), // 写索引(生产者使用)
readIndex_(0), // 读索引(消费者使用)
elements_(new Element[capacity]) // 分配环形缓冲区
{}
// 尝试入队(生产者使用)
bool try_push(T&& value) {
// 获取当前写入索引,并原子+1
// 使用 % capacity_ 实现环形缓冲区
unsigned my_entry = writeIndex_.fetch_add(1, std::memory_order_acq_rel) % capacity_;
// 写入消息到指定槽位
elements_[my_entry].value = std::move(value);
// 标记该槽位消息已就绪
// memory_order_release 确保对消费者可见
elements_[my_entry].ready.store(true, std::memory_order_release);
return true; // 入队成功
}
// 尝试出队(消费者使用)
std::optional<T> try_pop() {
// 获取当前读索引,并原子+1
unsigned my_entry = readIndex_.fetch_add(1, std::memory_order_acq_rel) % capacity_;
// 自旋等待消息就绪
// 如果生产者还没写完,该线程会不断轮询
while (!elements_[my_entry].ready.load(std::memory_order_acquire)) {
std::this_thread::yield(); // 避免长时间自旋占用 CPU
// 在 x86 上可以用 _mm_pause() 进一步优化自旋
}
// 消息就绪后,获取消息
T result = std::move(elements_[my_entry].value);
// 重置就绪标记,供下次循环或覆盖使用
elements_[my_entry].ready.store(false, std::memory_order_release);
return result; // 返回消息
}
private:
// 队列中的元素结构
struct Element {
T value; // 消息值
std::atomic<bool> ready{false}; // 就绪标记,保证消费者可见
};
size_t capacity_; // 队列容量
std::atomic<unsigned> writeIndex_; // 写索引(原子)
std::atomic<unsigned> readIndex_; // 读索引(原子)
std::unique_ptr<Element[]> elements_; // 元素数组
};
// ============================
// 测试示例:生产者/消费者
// ============================
int main() {
LockFreeMPMCQueue<int> queue(8); // 队列容量 8
// 生产者线程
auto producer = [&queue]() {
for (int i = 1; i <= 20; ++i) {
queue.try_push(std::move(i)); // 将消息入队
std::cout << "Producer pushed: " << i << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(10)); // 模拟生产间隔
}
};
// 消费者线程
auto consumer = [&queue]() {
for (int i = 1; i <= 20; ++i) {
auto val = queue.try_pop(); // 尝试从队列出队
if (val) {
std::cout << "Consumer popped: " << *val << std::endl;
}
std::this_thread::sleep_for(std::chrono::milliseconds(15)); // 模拟处理时间
}
};
// 启动生产者和消费者线程
std::thread t1(producer);
std::thread t2(consumer);
// 等待线程结束
t1.join();
t2.join();
return 0;
}
注释说明
- 原子索引
writeIndex_.fetch_add(1, std::memory_order_acq_rel)
readIndex_.fetch_add(1, std::memory_order_acq_rel)
- 保证多生产者/多消费者同时操作时索引不会冲突。
acq_rel保证内存顺序正确。
- 自旋等待
while (!elements_[my_entry].ready.load(std::memory_order_acquire)) {
std::this_thread::yield();
}
- 消费者等待生产者写入完成。
- 用
yield避免占用 CPU 过高。 - 可替换
_mm_pause()或更复杂的 backoff 策略降低自旋成本。
- 就绪标记
elements_[my_entry].ready.store(true, std::memory_order_release);
- 保证消息写入对消费者可见。
release+acquire保证 内存可见性。
- 循环取模
% capacity_
- 实现环形缓冲区,避免索引越界。
- 数学上:
slot = index m o d capacity \text{slot} = \text{index} \mod \text{capacity} slot=indexmodcapacity
- 线程调度影响
- 即使无锁,如果线程被操作系统挂起,消费者自旋等待也会延迟队列进度。
- 总延迟:
Δ t = t dequeue − t enqueue = t write + t spin + t scheduler_delay + t read \Delta t = t_\text{dequeue} - t_\text{enqueue} = t_\text{write} + t_\text{spin} + t_\text{scheduler\_delay} + t_\text{read} Δt=tdequeue−tenqueue=twrite+tspin+tscheduler_delay+tread
这个代码可以直接编译运行,并展示 生产者/消费者消息入队和出队过程。
1. 最小化延迟(Minimal latency for all messages)
- 概念:
- 队列设计的目标是 每条消息的延迟最小,不仅仅是平均延迟低。
- 高频交易或低延迟系统对每条消息的延迟都有严格要求。
- 数学表示:
∀ m i ∈ Q , t dequeue ( m i ) − t enqueue ( m i ) ≈ t avg \forall m_i \in Q, \quad t_\text{dequeue}(m_i) - t_\text{enqueue}(m_i) \approx t_\text{avg} ∀mi∈Q,tdequeue(mi)−tenqueue(mi)≈tavg - 含义:延迟的方差越小越好,避免偶发的长延迟。
2. 最坏情况接近平均(Worst-case close to average)
- 概念:
- 队列必须保证 最坏延迟尽量接近平均延迟。
- 避免峰值延迟影响系统决策。
- 数学公式:
max ( Δ t ) − Δ t ‾ → 0 \max(\Delta t) - \overline{\Delta t} \rightarrow 0 max(Δt)−Δt→0 - 实现考虑:
- 避免锁、系统调用。
- 避免动态内存分配。
- 使用固定大小的环形缓冲区。
3. 多生产者线程(Multi producers - multi CPU cores)
- 概念:
- 多个生产者线程在不同 CPU 核心并发写入队列。
- 实现要求:
- 写索引使用 原子操作。
- 避免锁和调度器干预。
- 数学公式:
writeIndex → atomic next free slot , 多个线程安全 \text{writeIndex} \xrightarrow{\text{atomic}} \text{next free slot}, \quad \text{多个线程安全} writeIndexatomicnext free slot,多个线程安全
4. 多消费者线程(Multi consumers - multi CPU cores)
- 概念:
- 多个消费者线程在不同 CPU 核心并发读取队列。
- 实现要求:
- 读索引使用 原子操作。
- 自旋等待或非阻塞方式处理空队列。
- 数学公式:
readIndex → atomic next available item , 线程安全 \text{readIndex} \xrightarrow{\text{atomic}} \text{next available item}, \quad \text{线程安全} readIndexatomicnext available item,线程安全
5. C++17 支持
- 概念:
- 只需要支持 C++17,不必兼容 C++14。
- 好处:
- 可以使用
std::optional、std::atomic_ref、if constexpr等特性。 - 避免向后兼容性复杂性。
- 可以使用
6. 关键路径无系统调用(No system calls during critical path)
- 概念:
- 入队和出队操作 不使用 mutex/condvar/semaphore 等系统调用。
- 原因:
- 系统调用可能导致调度器挂起线程,增加最坏延迟。
- 关键原则:
t critical = t enqueue + t dequeue 无系统调用 t_\text{critical} = t_\text{enqueue} + t_\text{dequeue} \quad \text{无系统调用} tcritical=tenqueue+tdequeue无系统调用
7. 无锁(No locking)
- 概念:
- 队列操作不使用互斥锁或其他阻塞机制。
- 避免线程与调度器交互,提高可预测性。
- 实现方法:
- 使用原子索引 + 自旋等待。
- 消息就绪标记保证可见性。
- 公式:
enqueue/dequeue = atomic ops only + memory order fence \text{enqueue/dequeue} = \text{atomic ops only} + \text{memory order fence} enqueue/dequeue=atomic ops only+memory order fence
8. 非阻塞(Non-blocking)
- 概念:
- 入队满了立即返回 full-status。
- 出队空了立即返回 empty-status。
- 好处:
- 不阻塞线程,线程可以去做其他工作。
- 代码示例:
bool try_push(T&& value) {
unsigned idx = writeIndex_.load(std::memory_order_relaxed);
if ((idx - readIndex_.load(std::memory_order_acquire)) >= capacity_)
return false; // 队列已满
// 否则入队...
}
9. 简单数据类型(No need of C++ object transfers)
- 概念:
- 队列只存储简单数据类型(例如 int, double, struct POD)。
- 避免复杂对象的移动或析构开销。
- 好处:
- 避免构造/析构带来的额外延迟。
- 保证 predictable latency。
10. 核心公式总结
- 消息延迟:
Δ t = t dequeue − t enqueue \Delta t = t_\text{dequeue} - t_\text{enqueue} Δt=tdequeue−tenqueue - 队列容量约束:
0 ≤ writeIndex − readIndex ≤ N capacity 0 \le \text{writeIndex} - \text{readIndex} \le N_\text{capacity} 0≤writeIndex−readIndex≤Ncapacity - 最坏延迟约束:
max ( Δ t ) − Δ t ‾ ≈ 0 \max(\Delta t) - \overline{\Delta t} \approx 0 max(Δt)−Δt≈0 - 多线程原子操作:
writeIndex i + 1 = writeIndex ∗ i + 1 ( atomic ) \text{writeIndex}_{i+1} = \text{writeIndex}*i + 1 \quad (\text{atomic}) writeIndexi+1=writeIndex∗i+1(atomic)
readIndex ∗ j + 1 = readIndex j + 1 ( atomic ) \text{readIndex}*{j+1} = \text{readIndex}_j + 1 \quad (\text{atomic}) readIndex∗j+1=readIndexj+1(atomic)
总结
为了满足这些需求,Lock-free MPMC Queue 必须:
- 使用 固定容量环形缓冲区,避免动态分配。
- 使用 原子索引 管理生产者和消费者。
- 通过 非阻塞自旋等待 或返回 full/empty 状态保证线程不阻塞。
- 只存储 简单数据类型,不涉及复杂对象移动。
- 避免 任何系统调用 在入队/出队的关键路径上。
- 在多核 CPU 上保证 多生产者多消费者安全且延迟可预测。
1. 严格顺序(Strict Ordering)
prevents the use queues with relaxed ordering - multiple send queues (SPMC)
理解:
- 严格顺序要求队列中的消息严格按照入队顺序出队。
- 不能使用“放宽顺序(relaxed ordering)”的队列(例如 SPMC 队列只保证每个生产者的顺序,跨生产者消息可能乱序)。
- 在 高频交易或微秒级延迟场景中,每条消息的顺序非常重要。
数学表示:
m i enqueue order < m j ⟹ m i dequeue order < m j m_i \text{ enqueue order } < m_j \implies m_i \text{ dequeue order } < m_j mi enqueue order <mj⟹mi dequeue order <mj - 对于多生产者多消费者(MPMC),队列必须保证全局 FIFO。
实现考虑:
- 使用 单个环形缓冲区 + 原子索引 保证顺序。
- 每个槽位写入完成后,通过就绪标记通知消费者,防止乱序读取。
代码注释示例:
// 写入消息
elements_[my_entry].value = std::move(value);
// 标记就绪,确保消费者读取顺序与生产者一致
elements_[my_entry].ready.store(true, std::memory_order_release);
2. 保证进度(Guaranteed Progress)
no blocking due to scheduler interruption between two operations, like placing data and increment index
理解:
- 保证进度意味着队列操作不能因为 操作系统调度器挂起 而阻塞队列进度。
- 具体来说:
- 在生产者写入数据和更新索引之间,线程被挂起可能会导致消费者等待长时间。
- 队列必须设计为 无阻塞(non-blocking),即使线程被挂起,也不会影响其他线程操作。
数学公式:
- 生产者写入:
t enqueue = t write + t index_increment t_\text{enqueue} = t_\text{write} + t_\text{index\_increment} tenqueue=twrite+tindex_increment - 消费者读取:
t dequeue = t wait_for_ready + t read t_\text{dequeue} = t_\text{wait\_for\_ready} + t_\text{read} tdequeue=twait_for_ready+tread - 保证进度:
∃ , t enqueue/dequeue < ∞ ∀ 线程调度器挂起情况 \exists , t_\text{enqueue/dequeue} < \infty \quad \forall \text{线程调度器挂起情况} ∃,tenqueue/dequeue<∞∀线程调度器挂起情况
实现考虑:
- 使用 原子索引 + 自旋等待。
- 不依赖 系统调用或锁,确保队列操作不被调度器阻塞。
- 通过 就绪标记,消费者知道槽位什么时候可用。
代码示例注释:
// 尝试出队(消费者)
while (!elements_[my_entry].ready.load(std::memory_order_acquire)) {
std::this_thread::yield(); // 非阻塞等待,线程被挂起不会影响其他线程
}
3. 总结 Unique Requirements
| 需求 | 理解 | 关键实现方法 |
|---|---|---|
| Strict Ordering | 全局严格 FIFO,消息顺序不可乱 | 单环形缓冲区 + 原子索引 + 就绪标记 |
| Guaranteed Progress | 队列操作不能被调度器挂起阻塞 | 非阻塞自旋等待 + 原子操作,无锁设计 |
公式化总结
- 全局顺序保证:
∀ i < j , m i . enqueue < m j . enqueue ⟹ m i . dequeue < m j . dequeue \forall i < j, \quad m_i.\text{enqueue} < m_j.\text{enqueue} \implies m_i.\text{dequeue} < m_j.\text{dequeue} ∀i<j,mi.enqueue<mj.enqueue⟹mi.dequeue<mj.dequeue - 进度保证:
t enqueue , t dequeue < ∞ ( 无阻塞,即使线程被挂起 ) t_\text{enqueue}, t_\text{dequeue} < \infty \quad (\text{无阻塞,即使线程被挂起}) tenqueue,tdequeue<∞(无阻塞,即使线程被挂起) - 消息可见性保证:
ready [ i ] . store(true) ⟹ 消费者可见 value[i] \text{ready}[i].\text{store(true)} \implies \text{消费者可见 value[i]} ready[i].store(true)⟹消费者可见 value[i]
1. C++ 原子类型与基本操作
#include <atomic>
std::atomic<uint64_t> u8{45}; // 64位原子整数,初始值为45
理解:
std::atomic<T>提供 原子操作接口,保证多线程访问时数据一致性。- 常用操作:
load():原子读取值store(value):原子写入值compare_exchange_strong(old, value):原子比较并交换(CAS)
- 可以检查
std::atomic<T>::is_always_lock_free来确认类型在当前平台是否总是无锁。
2. 64位原子操作示例
uint64_t a_get() { return u8.load(); } // 原子读取
void a_set(uint64_t v) { u8.store(v); } // 原子写入
void a_cas(uint64_t& old, uint64_t value) { // 原子比较并交换
u8.compare_exchange_strong(old, value);
}
理解:
- a_get()
- 原子读取当前值,不会被其他线程打断。
- 数学表示:
v = u8.load() ( atomic ) v = \text{u8.load()} \quad (\text{atomic}) v=u8.load()(atomic)
- a_set(v)
- 原子写入新值,保证对其他线程可见。
u8 = v ( atomic store ) \text{u8} = v \quad (\text{atomic store}) u8=v(atomic store)
- 原子写入新值,保证对其他线程可见。
- a_cas(old, value)
- 如果当前值等于
old,原子更新为value,否则old被更新为当前值。 - CAS 是 MPMC 队列的核心原子操作。
if u8 = o l d then u8 ← v a l u e else o l d ← u8 \text{if } \text{u8} = old \text{ then } \text{u8} \gets value \text{ else } old \gets \text{u8} if u8=old then u8←value else old←u8
- 如果当前值等于
3. 128位原子操作示例(CAS16)
__extension__ using uint128_t = unsigned __int128; // GCC 扩展,128位整数
std::atomic<uint128_t> big16{45}; // 128位原子整数
uint128_t b_get() { return big16.load(); } // 原子读取
void b_set(uint128_t v) { big16.store(v); } // 原子写入
bool b_cas(uint128_t& old, uint128_t value) { // 原子比较并交换
return big16.compare_exchange_strong(old, value);
}
理解:
uint128_t用于 128 位原子操作(在 x86_64 上通常对应CMPXCHG16B指令)。- CAS16 能够一次性更新两个 64 位槽或复合状态,非常适合高性能 MPMC 队列。
- 原子保证:
- 线程安全
- 防止 ABA 问题(通常配合版本号使用)
公式表示:
if big16 = o l d then big16 ← v a l u e else o l d ← big16 \text{if } \text{big16} = old \text{ then } \text{big16} \gets value \text{ else } old \gets \text{big16} if big16=old then big16←value else old←big16
- CAS16 可以同时修改队列的索引 + 就绪标记等多个字段,保证操作的原子性。
4. 原子操作在 MPMC 队列中的作用
- 保证多生产者/多消费者安全
- 写索引 (
writeIndex) 和读索引 (readIndex) 使用 CAS 或fetch_add原子更新:
writeIndex i + 1 = writeIndex ∗ i + 1 ( atomic ) \text{writeIndex}_{i+1} = \text{writeIndex}*i + 1 \quad (\text{atomic}) writeIndexi+1=writeIndex∗i+1(atomic)
readIndex ∗ j + 1 = readIndex j + 1 ( atomic ) \text{readIndex}*{j+1} = \text{readIndex}_j + 1 \quad (\text{atomic}) readIndex∗j+1=readIndexj+1(atomic)
- 写索引 (
- 保证顺序和进度
- 使用 128 位 CAS 可同时更新数据值和序号,确保:
- 消息严格 FIFO
- 不依赖调度器,不阻塞
- 使用 128 位 CAS 可同时更新数据值和序号,确保:
- 实现非阻塞队列核心
- 原子操作代替锁和系统调用
- 关键路径无需调用操作系统,延迟可预测
5. 核心概念总结
| 操作 | 描述 | 数学公式 |
|---|---|---|
| load | 原子读取 | v = atomic_var.load() v = \text{atomic\_var.load()} v=atomic_var.load() |
| store | 原子写入 | atomic_var = v \text{atomic\_var} = v atomic_var=v |
| CAS / compare_exchange | 原子比较并交换 | if atomic_var = old then atomic_var = new \text{if atomic\_var = old then atomic\_var = new} if atomic_var = old then atomic_var = new |
| fetch_add | 原子加法 | atomic_var i + 1 = atomic_var i + 1 \text{atomic\_var}_{i+1} = \text{atomic\_var}_i + 1 atomic_vari+1=atomic_vari+1 |
小结
- 64位 CAS 适用于单槽更新。
- 128位 CAS16 适用于复合状态(例如队列槽位 + 序号)。
- 原子操作是 MPMC 队列的基石,保证:
- 多线程安全
- 非阻塞
- 严格顺序
- 可预测延迟
1. CAS(Compare-And-Swap)比较并交换
概念
- CAS 是一种 原子操作,用于多线程场景下安全修改共享变量。
- 操作原理:
- 提供三个值:
address:目标内存地址expected:期望的旧值new_value:想要写入的新值
- 操作逻辑:
- 如果内存值 ==
expected,则写入new_value并返回 true - 否则,将内存实际值写回
expected,返回 false
- 如果内存值 ==
数学表示
CAS ( V , E , N ) = { V ← N if V = E E ← V otherwise \text{CAS}(V, E, N) = \begin{cases} V \gets N & \text{if } V = E \ E \gets V & \text{otherwise} \end{cases} CAS(V,E,N)={V←Nif V=E E←Votherwise
- V V V:目标值(内存)
- E E E:期望值
- N N N:新值
C++ 示例
std::atomic<uint64_t> u8{45};
uint64_t old = 45;
bool ok = u8.compare_exchange_strong(old, 100);
// 如果 u8 == 45,则更新为100,ok = true
// 否则 old = u8 的当前值,ok = false
- 用途:
- 在 无锁队列中,更新
writeIndex、readIndex或槽位状态。 - 原子操作保证多线程下不冲突。
- 在 无锁队列中,更新
2. ABA 问题
概念
- ABA 问题是 CAS 在多线程环境下的经典问题。
- 情景:
- 线程 T1 读取内存值 A,计划将其改为 C
- 在 T1 CAS 之前:
- 线程 T2 将 A 改为 B
- 然后又改回 A
- T1 执行 CAS,发现值仍是 A,于是认为可以安全更新,但实际上内存经历过变化
问题:CAS 仅检测值是否等于期望值,但无法检测中间是否被其他线程修改过。
图示
initial: A → T 2 B → T 2 A \text{initial: } A \quad \xrightarrow{T2} B \quad \xrightarrow{T2} A initial: AT2BT2A
T1 CAS: expect = A, actual = A ⟹ CAS succeeds erroneously \text{T1 CAS: } \text{expect = A, actual = A} \implies \text{CAS succeeds erroneously} T1 CAS: expect = A, actual = A⟹CAS succeeds erroneously
解决方法
- 版本号/标记(Tagged Pointer)
- 在指针或索引中附加版本号,每次修改版本号 +1
- CAS 比较 值 + 版本号,避免 ABA
atomic_var = ( value , version ) \text{atomic\_var} = (\text{value}, \text{version}) atomic_var=(value,version)
CAS ( ( V , v e r ) , ( E , v e r E ) , ( N , v e r + 1 ) ) \text{CAS}((V, ver), (E, verE), (N, ver+1)) CAS((V,ver),(E,verE),(N,ver+1))
- 双宽 CAS / 128位 CAS
- 使用 CAS16 同时更新数据 + 版本号
- 避免 ABA
C++ 示例
struct Node {
int value;
uint64_t version;
};
std::atomic<uint128_t> atomic_node; // 64bit value + 64bit version
uint128_t old = pack(node.value, node.version);
uint128_t new_val = pack(new_value, node.version + 1);
atomic_node.compare_exchange_strong(old, new_val);
3. CAS + ABA 在 Lock-free MPMC 队列中的应用
| 场景 | 描述 |
|---|---|
| 写索引更新 | 使用 CAS / fetch_add 原子更新 writeIndex,防止多生产者冲突 |
| 读索引更新 | 使用 CAS / fetch_add 原子更新 readIndex,防止多消费者冲突 |
| 槽位就绪状态 | 用 CAS16 更新 (value, version),防止 ABA |
| 环形缓冲区 | 使用版本号 + 索引判断槽位是否安全被覆盖 |
- 公式表示:
- 带版本号 CAS:
( V , v e r ) → CAS ( N , v e r + 1 ) if ( V , v e r ) = ( E , v e r E ) (V, ver) \xrightarrow{\text{CAS}} (N, ver+1) \quad \text{if } (V, ver) = (E, verE) (V,ver)CAS(N,ver+1)if (V,ver)=(E,verE) - 队列安全更新:
writeIndex i + 1 = writeIndex ∗ i + 1 ( atomic, versioned ) \text{writeIndex}_{i+1} = \text{writeIndex}*i + 1 \quad (\text{atomic, versioned}) writeIndexi+1=writeIndex∗i+1(atomic, versioned)
readIndex ∗ j + 1 = readIndex j + 1 ( atomic, versioned ) \text{readIndex}*{j+1} = \text{readIndex}_j + 1 \quad (\text{atomic, versioned}) readIndex∗j+1=readIndexj+1(atomic, versioned)
4. 总结
- CAS(Compare-And-Swap)
- 核心原子操作,保证无锁线程安全
- 用于索引更新、槽位就绪标记等
- ABA 问题
- CAS 可能误判,原因是值被修改后又恢复
- 解决方法:
- 添加版本号 / 标记
- 使用双宽 CAS(128位 CAS16)
- 在 Lock-free MPMC 队列中
- CAS + 版本号保证:
- 严格顺序
- 无阻塞
- 多生产者多消费者安全
- CAS + 版本号保证:
1. std::atomic<T> 基本理解
std::atomic<T>提供 原子访问,保证多线程读写不会被中断。- 如果类型
T太大,编译器可能使用内部锁(lock)来实现原子性。 - 如果类型
T足够小,且 CPU 支持对应原子操作,std::atomic<T>::is_always_lock_free返回true。
理解:
- 对小数据类型(例如 8/16/32/64 位整数),CPU 指令本身可以保证原子性。
- 对大类型(例如结构体、128位整数),可能需要:
- 双宽 CAS 指令(如
CMPXCHG16B) - 或编译器内部锁
- 双宽 CAS 指令(如
2. 队列对硬件原子性的需求
- Lock-free MPMC 队列必须保证硬件原子性,否则无法安全操作多生产者多消费者。
- 如果编译器提供的
std::atomic<>实现 无法提供原子性,队列会使用 编译器内置函数(intrinsic functions),通常依赖 CPU 指令如:
CMPXCHG16B ( Compare-and-Swap 128-bit ) \text{CMPXCHG16B} \quad (\text{Compare-and-Swap 128-bit}) CMPXCHG16B(Compare-and-Swap 128-bit) - 作用:在一个原子操作中比较并更新 128 位数据(例如同时更新值 + 序号/版本号)。
3. 公式化理解
- 小类型原子操作:
atomic<T>::load()/store() 保证原子性,如果 T <= CPU 支持的原子大小 \text{atomic<T>::load()/store()} \text{ 保证原子性,如果 T <= CPU 支持的原子大小} atomic<T>::load()/store() 保证原子性,如果 T <= CPU 支持的原子大小 - 大类型或复合结构原子操作:
atomic<T>::compare_exchange_strong() → CPU intrinsic CMPXCHG16B \text{atomic<T>::compare\_exchange\_strong()} \xrightarrow{\text{CPU intrinsic}} \text{CMPXCHG16B} atomic<T>::compare_exchange_strong()CPU intrinsicCMPXCHG16B - 队列操作保证:
∀ 线程 i , j : enqueue/dequeue 操作是原子的且无锁 \forall \text{线程 } i, j: \text{ enqueue/dequeue 操作是原子的且无锁 } ∀线程 i,j: enqueue/dequeue 操作是原子的且无锁
4. C++ 示例
#include <atomic>
#include <iostream>
struct Data128 {
uint64_t a;
uint64_t b;
};
std::atomic<uint64_t> u64{45};
std::atomic<Data128> big16; // 如果CPU支持,可以是lock-free,否则编译器会加锁
int main() {
// 检查原子性
std::cout << "u64 lock-free? " << u64.is_always_lock_free << std::endl;
std::cout << "big16 lock-free? " << big16.is_always_lock_free << std::endl;
// 原子操作示例
uint64_t old = u64.load();
u64.compare_exchange_strong(old, 100); // CAS 更新
}
输出
u64 lock-free? 1
big16 lock-free? 0
注释说明:
is_always_lock_free- 检查类型是否 总是无锁原子,决定是否需要 CPU 指令支持。
- 小类型(如
uint64_t)通常是 lock-free。 - 大类型(如
Data128)在 x86_64 平台如果支持CMPXCHG16B指令,也可以 lock-free,否则内部加锁。 - CAS 是队列安全的核心操作。
5. 与 MPMC 队列的关系
- 队列索引(writeIndex/readIndex)使用
uint64_t原子类型。 - 槽位状态 + 版本号可能使用 128 位原子类型(
CMPXCHG16B)。 - 核心要求:
- 所有关键路径操作必须原子、无锁
- 避免系统调用和调度器干预
公式表示:
writeIndex/readIndex/value+version → atomic 线程安全 \text{writeIndex/readIndex/value+version} \xrightarrow{\text{atomic}} \text{线程安全} writeIndex/readIndex/value+versionatomic线程安全
6. 总结
std::atomic<T>是实现 Lock-free 队列的基础。- 小类型直接使用 CPU 原子指令,无锁。
- 大类型使用双宽 CAS(如 CMPXCHG16B)或编译器内置函数保证原子性。
- MPMC 队列关键路径要求原子、无锁、硬件支持,以保证最小延迟和严格顺序。
1. std::atomic<T>::is_always_lock_free
- 概念:
- C++17 提供接口检查类型
T是否总是无锁原子(lock-free)。
- C++17 提供接口检查类型
- 用途:
- 决定队列能否在关键路径上完全依赖 CPU 原子指令,而不调用库函数或内部锁。
- 代码示例:
std::atomic<uint64_t> a;
if (a.is_always_lock_free) {
// 可以直接使用原子操作,无需锁
}
- 公式表示:
is_always_lock_free ( T ) ⟹ CPU 原子指令支持 T \text{is\_always\_lock\_free}(T) \implies \text{CPU 原子指令支持 T} is_always_lock_free(T)⟹CPU 原子指令支持 T
2. Padding 与 Alignment(填充与对齐)
- 概念:
- 避免 false sharing:不同线程操作的变量如果在同一缓存行,会导致频繁缓存一致性同步,影响性能。
- 实现方式:
- 使用填充(padding)或对齐(alignment)保证每个原子变量占满一个或多个缓存行。
- 代码示例:
struct alignas(64) Element { // 对齐到64字节缓存行
std::atomic<uint64_t> value;
char padding[64 - sizeof(std::atomic<uint64_t>)]; // 填充避免 false sharing
};
- 公式表示:
sizeof(Element) ≥ 缓存行大小 ⟹ 避免 false sharing \text{sizeof(Element)} \ge \text{缓存行大小} \quad \implies \text{避免 false sharing} sizeof(Element)≥缓存行大小⟹避免 false sharing
3. CAS(Compare-and-Swap)16字节原子操作
- 概念:
- 在 MPMC 队列中,有时需要对 16 字节连续内存做原子更新(例如
value + version)。 - 需要使用 128 位整数类型,配合 CMPXCHG16B 指令。
- 在 MPMC 队列中,有时需要对 16 字节连续内存做原子更新(例如
- 代码示例:
__extension__ using uint128_t = unsigned __int128;
std::atomic<uint128_t> big16;
uint128_t old = big16.load();
uint128_t new_val = ...;
big16.compare_exchange_strong(old, new_val); // 原子CAS16
- 公式表示:
CAS16 ( V , E , N ) = { V ← N V = E E ← V V ≠ E \text{CAS16}(V, E, N) = \begin{cases} V \gets N & V = E \\ E \gets V & V \neq E \end{cases} CAS16(V,E,N)={V←NE←VV=EV=E
4. 对齐要求(Alignment requirements)
- 概念:
- 16 字节条目必须对齐到 16 字节边界,否则 CMPXCHG16B 可能失败。
- 其他类型根据 CPU 原子指令要求对齐。
- 代码示例:
struct alignas(16) Entry {
uint64_t value;
uint64_t version;
};
- 公式:
Address \text{Address} % 16 = 0 \quad \text{(16字节对齐)} Address
5. 编译器优化选项
- GCC / Clang 提供
-mcx16选项:- 指示编译器使用 CMPXCHG16B 指令,而非调用原子库。
- 作用:
- 减少关键路径函数调用开销
- 提高队列性能和可预测延迟
6. NUMA 考虑
- 概念:
- 在多 CPU 节点系统(NUMA)中:
- 生产者与消费者可能在不同 NUMA 节点
- 队列的
entries数组最好靠近 消费者所在节点,减少跨节点访问延迟
- 在多 CPU 节点系统(NUMA)中:
- 公式表示:
t memory access ≈ { low , local NUMA node high , remote NUMA node t_\text{memory access} \approx \begin{cases} \text{low}, & \text{local NUMA node} \\ \text{high}, & \text{remote NUMA node} \end{cases} tmemory access≈{low,high,local NUMA noderemote NUMA node - 实现方式:
- 使用 NUMA 分配函数(如
numa_alloc_onnode) - 保证每个线程操作的数据尽量局部化
- 使用 NUMA 分配函数(如
7. 总结硬件交互要点
| 要点 | 理解 | 实现方式 |
|---|---|---|
| is_always_lock_free | 检查原子类型是否完全 lock-free | 使用 std::atomic<T> |
| Padding / Alignment | 避免 false sharing | alignas + 填充 |
| CAS16 | 16 字节原子操作 | std::atomic<uint128_t> + CMPXCHG16B |
| 对齐要求 | 16 字节条目必须对齐 | alignas(16) |
| 编译器指令 | 提高关键路径性能 | GCC/Clang -mcx16 |
| NUMA | 降低跨节点延迟 | entries 数组分配靠近读/写线程 |
8. 核心公式总结
- 原子性保证:
atomic<T> → CPU instr lock-free \text{atomic<T>} \xrightarrow{\text{CPU instr}} \text{lock-free} atomic<T>CPU instrlock-free - 16 字节 CAS 原子操作:
( V , v e r ) → CAS16 ( N , v e r + 1 ) (V, ver) \xrightarrow{\text{CAS16}} (N, ver+1) (V,ver)CAS16(N,ver+1) - 避免 false sharing:
sizeof(Element) ≥ 缓存行大小 \text{sizeof(Element)} \ge \text{缓存行大小} sizeof(Element)≥缓存行大小 - NUMA 本地化:
Memory node affinity ⟹ min ( t access ) \text{Memory node affinity} \implies \min(t_\text{access}) Memory node affinity⟹min(taccess)
1. 固定大小数组(Fixed-size array)
- 队列采用 固定大小数组,大小为 2 N 2^N 2N(通常是 2 的幂次方便环形取模操作)。
- 每个 Entry 是原子变量(8 或 16 字节),并对齐到缓存行,避免 false sharing。
公式表示
capacity = 2 N \text{capacity} = 2^N capacity=2N
Entry address \text{Entry address } % \text{cacheline size} = 0 Entry address
C++ 示例
struct alignas(64) Entry { // 对齐到缓存行
uint64_t sequence; // 序号/索引
uint64_t value; // 数据值
};
std::unique_ptr<Entry[]> entries(new Entry[1 << N]);
2. 每个 Entry 的结构
- 每个数组元素包含:
- Sequence / Index:序号,用于判断槽位是否可用
- Data flag:标记数据是否就绪
- Data value:实际存储的消息数据
C++ 示例
struct alignas(16) Entry {
uint64_t sequence; // entry 序号
std::atomic<bool> ready; // 数据就绪标志
int value; // 实际数据
};
- 其中
ready是原子布尔,保证生产者/消费者之间可见性。
3. CAS 操作修改数组元素
- 数组中的元素只能通过 CAS 操作修改。
- 推送(push)或弹出(pop)操作的成功标志就是 CAS 成功。
- CAS 可以同时修改 数据 + 序号,保证原子性和顺序。
公式表示
CAS ( e n t r y i ) ⟹ push/pop 成功 \text{CAS}(entry_i) \implies \text{push/pop 成功} CAS(entryi)⟹push/pop 成功
- 假设
entry_i初始值(seq, ready, value):
CAS ( ( s e q , r e a d y , o l d v a l u e ) , ( s e q + 1 , t r u e , n e w v a l u e ) ) \text{CAS}((seq, ready, old_value), (seq+1, true, new_value)) CAS((seq,ready,oldvalue),(seq+1,true,newvalue))
C++ 示例
uint64_t expected_seq = entry.sequence;
Entry new_entry{expected_seq + 1, true, value};
entry_cas(entry, expected_seq, new_entry); // 原子 CAS
4. 索引的原子修改
- **读索引(readIndex)和写索引(writeIndex)**也使用原子变量,CAS 更新。
- 保证 多生产者/多消费者操作安全。
公式表示
writeIndex i + 1 = writeIndex ∗ i + 1 ( atomic ) \text{writeIndex}_{i+1} = \text{writeIndex}*i + 1 \quad (\text{atomic}) writeIndexi+1=writeIndex∗i+1(atomic)
readIndex ∗ j + 1 = readIndex j + 1 ( atomic ) \text{readIndex}*{j+1} = \text{readIndex}_j + 1 \quad (\text{atomic}) readIndex∗j+1=readIndexj+1(atomic)
C++ 示例
std::atomic<uint64_t> writeIndex{0};
std::atomic<uint64_t> readIndex{0};
uint64_t my_entry = writeIndex.fetch_add(1) % capacity; // 原子自增
5. Push/Pop 的核心逻辑
- Push(生产者):
- 读取
writeIndex,计算目标槽位 - 使用 CAS 写入数据 + 标记 ready
- CAS 成功 → 推送完成
- 读取
- Pop(消费者):
- 读取
readIndex,计算目标槽位 - 检查
ready标记 - 使用 CAS 读取数据并标记 slot 可重用
- CAS 成功 → 弹出完成
- 读取
公式表示
push ( v ) ≡ CAS ( e n t r y w r i t e I n d e x % c a p a c i t y , old , new ) \text{push}(v) \equiv \text{CAS}(entry_{writeIndex \% capacity}, \text{old}, \text{new}) push(v)≡CAS(entrywriteIndex%capacity,old,new)
pop ( ) ≡ CAS ( e n t r y r e a d I n d e x % c a p a c i t y , old , new ) \text{pop}() \equiv \text{CAS}(entry_{readIndex \% capacity}, \text{old}, \text{new}) pop()≡CAS(entryreadIndex%capacity,old,new)
6. 总结
| 设计要点 | 解释 |
|---|---|
| 固定大小数组 | 队列容量为 2 N 2^N 2N,方便环形索引取模 |
| 原子 Entry | 每个 Entry 8 或 16 字节,原子操作,缓存行对齐 |
| Entry 内容 | 包含序号/索引、就绪标志、实际数据 |
| CAS 操作 | Entry 修改必须使用 CAS,成功即完成 push/pop |
| 原子索引 | 写/读索引使用 CAS 或 fetch_add 原子操作,保证多线程安全 |
这样设计可以保证:
- 多生产者多消费者安全
- 严格 FIFO 顺序
- 无锁操作,关键路径延迟可预测
1. 模板参数说明
// 多生产者多消费者无锁队列模板类
template<
typename DataT, // 数据类型:1~12 字节,存储的消息类型
size_t N = 0, // 队列容量,0 表示在构造函数中动态设置
typename IndexT = uint32_t,// 队列索引类型,用于 readIndex / writeIndex,4 或 8 字节
bool lazy_push = false, // 是否延迟更新写索引(提高性能,但可能暂时不精确)
bool lazy_pop = false // 是否延迟更新读索引(提高性能,但可能暂时不精确)
>
class mpmc_queue {
public:
// ===========================
// 构造函数
// ===========================
explicit mpmc_queue(uint64_t n = N);
// 构造队列,容量为 n(如果模板参数 N != 0,则默认使用 N)
// 内部会分配固定大小数组(环形缓冲区),每个元素对齐缓存行
// 用于无锁操作避免 false sharing
// ===========================
// 基本 push / pop
// ===========================
bool push(value_type d);
// 尝试将数据 d 入队
// 返回 true 表示入队成功
// 返回 false 表示队列已满,无法入队(非阻塞)
bool pop(value_type& d);
// 尝试将队列头的数据弹出到 d
// 返回 true 表示成功获取
// 返回 false 表示队列为空(非阻塞)
// ===========================
// push / pop 带索引(Sequence Number)
// ===========================
bool push(value_type d, index_type& i);
// 功能同 push(d),但同时返回该入队操作对应的索引(或序列号)
// i 可用于追踪操作顺序或实现严格顺序消费
bool pop(value_type& d, index_type& i);
// 功能同 pop(d),但同时返回该出队操作对应的索引(或序列号)
// i 可用于追踪操作顺序或实现严格顺序消费
// ===========================
// 原子交换操作
// ===========================
bool exchange(index_type& i, value_type old_value, value_type new_value);
// 对指定索引的 entry 执行原子交换:
// 仅当 entry 的当前值等于 old_value 时,才更新为 new_value
// 成功返回 true,否则返回 false
// CAS 操作实现原子交换,可避免多线程冲突
// ===========================
// push_keep_n 系列操作
// ===========================
bool push_keep_n(value_type d);
// 入队新数据 d,如果队列已满则丢弃最旧的数据
// 保证队列始终保留最新 N 条数据(环形缓冲策略)
bool push_keep_n(value_type d, index_type& i);
// 与 push_keep_n(d) 功能相同,同时返回操作的序列号/索引
// 可用于跟踪最新数据的顺序
};
template<typename DataT, // 数据类型:1-12 字节
size_t N = 0, // 队列容量,0 表示在构造函数中设置
typename IndexT = uint32_t,// 索引类型:4 或 8 字节
bool lazy_push = false, // 是否延迟更新写索引
bool lazy_pop = false> // 是否延迟更新读索引
class mpmc_queue { ... };
理解:
- DataT:存储的数据类型,通常 1~12 字节(小类型保证 CPU 原子操作)。
- N:队列容量,若模板参数未指定,可在构造函数中动态设置。
- IndexT:索引类型,用于
readIndex/writeIndex,可选择 32/64 位。 - lazy_push / lazy_pop:延迟更新索引,用于优化性能,减少 CAS 次数,但可能增加延迟或使读写索引短暂不一致。
2. 构造函数
explicit mpmc_queue(uint64_t n = N);
- 作用:
- 初始化队列容量(如果模板参数 N=0)
- 分配数组,每个元素对齐到缓存行
- 公式表示:
capacity = { N 模板指定 n 构造函数参数指定 \text{capacity} = \begin{cases} N & \text{模板指定} \\ n & \text{构造函数参数指定} \end{cases} capacity={Nn模板指定构造函数参数指定
3. push / pop 基本操作
bool push(value_type d); // 尝试入队,队列满则返回 false
bool pop(value_type& d); // 尝试出队,队列空则返回 false
理解:
- push(d):
- 读取
writeIndex,计算槽位 - CAS 写入数据 + 更新序号
- 成功返回 true,队列满返回 false
- 读取
- pop(d):
- 读取
readIndex,计算槽位 - 检查数据就绪标志
- CAS 读取数据 + 更新序号
- 成功返回 true,队列空返回 false
- 读取
公式表示
push ( d ) ≡ CAS ( e n t r y _ w r i t e I n d e x % c a p a c i t y , o l d , n e w ) \text{push}(d) \equiv \text{CAS}(entry\_{writeIndex \% capacity}, old, new) push(d)≡CAS(entry_writeIndex%capacity,old,new)
pop ( ) ≡ CAS ( e n t r y r e a d I n d e x % c a p a c i t y , o l d , n e w ) \text{pop}() \equiv \text{CAS}(entry_{readIndex \% capacity}, old, new) pop()≡CAS(entryreadIndex%capacity,old,new)
4. push / pop 带索引版本
bool push(value_type d, index_type& i);
bool pop(value_type& d, index_type& i);
- 作用:
- 除了 push/pop 功能,还返回该操作对应的 序列号 / 索引号。
- 方便做严格顺序保证或事件追踪。
数学表示
i op = writeIndex or readIndex after successful CAS i_\text{op} = \text{writeIndex or readIndex after successful CAS} iop=writeIndex or readIndex after successful CAS
5. exchange 操作
bool exchange(index_type& i, value_type old_value, value_type new_value);
- 作用:
- 原子交换槽位值,仅当槽位值为
old_value时更新为new_value - CAS 原子操作实现
- 原子交换槽位值,仅当槽位值为
- 公式表示
if e n t r y i = o l d _ v a l u e then e n t r y i ← n e w _ v a l u e \text{if } entry_i = old\_value \text{ then } entry_i \gets new\_value if entryi=old_value then entryi←new_value
6. push_keep_n 操作
bool push_keep_n(value_type d);
bool push_keep_n(value_type d, index_type& i);
- 作用:
- 当队列已满时,丢弃最旧的数据,保留最新 N 条
- 用于环形缓冲区或实时数据流场景
- 公式表示
if queue full: remove oldest entry ⟹ push new entry \text{if queue full: remove oldest entry} \implies \text{push new entry} if queue full: remove oldest entry⟹push new entry
7. API 设计总结
| API | 功能 | 理解 |
|---|---|---|
push(d) |
入队 | 尝试入队,队列满返回 false |
pop(d) |
出队 | 尝试出队,队列空返回 false |
push(d, i) |
入队 + 返回索引 | push 完成后返回序列号/索引 |
pop(d, i) |
出队 + 返回索引 | pop 完成后返回序列号/索引 |
exchange(i, old, new) |
原子交换 | 仅当 entry = old 时更新为 new |
push_keep_n(d) |
入队并保留最新 N 条 | 队列满时丢弃最旧数据 |
push_keep_n(d, i) |
入队 + 返回索引 | 同上,同时返回序列号 |
8. 核心公式总结
- 入队原子操作:
push ( d ) ≡ CAS ( e n t r y _ w r i t e I n d e x % c a p a c i t y , o l d , n e w ) \text{push}(d) \equiv \text{CAS}(entry\_{writeIndex \% capacity}, old, new) push(d)≡CAS(entry_writeIndex%capacity,old,new) - 出队原子操作:
pop ( ) ≡ CAS ( e n t r y r e a d I n d e x % c a p a c i t y , o l d , n e w ) \text{pop}() \equiv \text{CAS}(entry_{readIndex \% capacity}, old, new) pop()≡CAS(entryreadIndex%capacity,old,new) - 带索引操作:
i op = writeIndex / readIndex after CAS success i_\text{op} = \text{writeIndex / readIndex after CAS success} iop=writeIndex / readIndex after CAS success - 保留最新 N 条操作:
if full: oldest entry removed ⟹ push new entry \text{if full: oldest entry removed} \implies \text{push new entry} if full: oldest entry removed⟹push new entry
这样设计的 MPMC Queue API 满足:
- 多生产者多消费者安全
- 原子无锁操作
- 支持严格顺序、索引追踪
- 可选择延迟索引更新以优化性能
template<
typename DataT, // 数据类型:1~12字节,用于存储消息
size_t N = 0, // 队列容量,0 表示在构造函数中设置
typename IndexT = uint32_t,// 索引类型:4或8字节,用于 readIndex / writeIndex
bool lazy_push = false, // 是否延迟更新写索引(性能优化)
bool lazy_pop = false // 是否延迟更新读索引(性能优化)
>
class mpmc_queue {
// ...
private:
std::atomic<index_type> _write_index alignas(2 * cachelinesize);
// ------------------------------
// 写索引(生产者使用)
// 原子类型,保证多线程安全
// alignas(2*cachelinesize):对齐到两个缓存行
// 目的:
// 1. 避免 false sharing(生产者和消费者同时操作时不会在同一缓存行)
// 2. 提高 CPU 缓存效率
// ------------------------------
std::atomic<index_type> _read_index alignas(2 * cachelinesize);
// ------------------------------
// 读索引(消费者使用)
// 原子类型,多线程安全
// alignas 同样避免 false sharing
// ------------------------------
array_t _array;
// ------------------------------
// Entry 数组,固定大小或构造时设置大小
// 每个 Entry 包含:
// 1. Sequence/Index:序号,用于判断槽位可用性
// 2. Data flag:数据就绪标志(原子布尔)
// 3. Data value:实际存储的数据
// 所有 Entry 的修改均通过 CAS 原子操作完成
// _array 尺寸通常为 2^N,方便环形取模操作
// ------------------------------
};
理解总结
- _write_index:
- 生产者线程更新,用于计算下一个写入槽位
- 原子类型保证多线程安全
- 对齐 2 个缓存行,防止和 _read_index 共享缓存行导致 false sharing
- _read_index:
- 消费者线程更新,用于计算下一个读取槽位
- 同样是原子类型,多线程安全
- 对齐 2 个缓存行,避免和 _write_index 共享缓存行
- _array:
- 队列的核心数组,每个元素是 Entry
- Entry 内部包含 序号 + 就绪标志 + 数据
- 修改 Entry 必须使用 CAS 原子操作,保证 多生产者多消费者安全
公式化表示
- 写索引计算下一个槽位:
write_slot = _write_index \text{write\_slot} = \text{\_write\_index} % \text{capacity} write_slot=_write_index - 读索引计算下一个槽位:
read_slot = _read_index \text{read\_slot} = \text{\_read\_index} % \text{capacity} read_slot=_read_index - Entry 修改:
CAS ( Entry i ) ⟹ 数据写入或读取成功 \text{CAS}(\text{Entry}_i) \implies \text{数据写入或读取成功} CAS(Entryi)⟹数据写入或读取成功 - 对齐保证:
& _ w r i t e _ i n d e x % 2*cacheline = 0 , & _ r e a d _ i n d e x % 2*cacheline = 0 \&\_write\_index \% \text{2*cacheline} = 0, \quad \&\_read\_index \% \text{2*cacheline} = 0 &_write_index%2*cacheline=0,&_read_index%2*cacheline=0
设计要点
- 原子索引 + CAS Entry:保证多生产者/多消费者安全
- 对齐和填充:避免 false sharing,降低缓存一致性开销
- 固定数组或 2^N 环形缓冲区:支持快速索引和高效入队/出队
#include <mpmc_queue.h> // 引入 MPMC Queue 实现头文件
int main()
{
// ------------------------------
// 创建一个锁自由 MPMC 队列,存储 unsigned 类型,容量 32
// ------------------------------
es::lockfree::mpmc_queue<unsigned> q{32};
// ------------------------------
// 总消息数量 N 和生产者/消费者数量 P
// ------------------------------
constexpr unsigned N{1000000}; // 每个生产者产生的元素数量
constexpr unsigned P{2}; // 生产者和消费者线程数
// ------------------------------
// 原子变量记录生产者和消费者累加和
// ------------------------------
std::atomic<uint64_t> prod_sum{0}; // 生产者累加值
std::atomic<uint64_t> cons_sum{0}; // 消费者累加值
// ==============================
// 生产者 lambda
// ==============================
auto producer = [&]() {
for (unsigned x = 0; x < N; ++x) {
// 循环尝试入队,直到成功(非阻塞队列)
while (!q.push(x))
; // 空循环自旋
// 累加生产者总和
prod_sum += x;
}
};
// ------------------------------
// 创建生产者线程
// ------------------------------
std::vector<std::thread> producers;
producers.resize(P);
for (auto& p : producers)
p = std::thread{producer}; // 启动 P 个生产者线程
// ==============================
// 消费者 lambda
// ==============================
auto consumer = [&]() {
unsigned v{0};
for (unsigned x = 0; x < N; ++x) {
// 循环尝试出队,直到成功
while (!q.pop(v))
; // 空循环自旋
// 累加消费者总和
cons_sum += v;
}
};
// ------------------------------
// 创建消费者线程
// ------------------------------
std::vector<std::thread> consumers;
consumers.resize(P);
for (auto& c : consumers)
c = std::thread{consumer}; // 启动 P 个消费者线程
// ------------------------------
// 等待所有线程完成
// ------------------------------
for (auto& p : producers) p.join();
for (auto& c : consumers) c.join();
// ------------------------------
// 校验生产者与消费者累加和是否一致
// ------------------------------
std::cout << (cons_sum && cons_sum == prod_sum ? "OK" : "ERROR")
<< " " << cons_sum << '\n';
return 0;
}
https://godbolt.org/z/Mnqrqhjsq
理解总结
- 队列初始化:
mpmc_queue<unsigned> q{32};- 队列容量为 32
- 数据类型为
unsigned(简单整数) - 队列是 Lock-free,支持多生产者多消费者并发访问
- 生产者线程逻辑:
- 循环生成 0 到 N-1 的整数
while (!q.push(x));保证非阻塞入队,若队列满则自旋等待- 使用原子变量
prod_sum记录生产者累加总和
- 消费者线程逻辑:
- 循环从队列中取 N 个元素
while (!q.pop(v));保证非阻塞出队,若队列为空则自旋等待- 使用原子变量
cons_sum累加消费者总和
- 线程创建与同步:
- P 个生产者线程 + P 个消费者线程
join()等待所有线程完成
- 结果校验:
- 检查累加和
cons_sum == prod_sum - 保证 数据完整性,验证 Lock-free 队列正确性
- 检查累加和
核心公式表示
- 生产者累加:
prod_sum = ∑ p = 0 P − 1 ∑ x = 0 N − 1 x \text{prod\_sum} = \sum_{p=0}^{P-1} \sum_{x=0}^{N-1} x prod_sum=p=0∑P−1x=0∑N−1x - 消费者累加:
cons_sum = ∑ c = 0 P − 1 ∑ x = 0 N − 1 x \text{cons\_sum} = \sum_{c=0}^{P-1} \sum_{x=0}^{N-1} x cons_sum=c=0∑P−1x=0∑N−1x - 正确性校验:
cons_sum = ? prod_sum ⟹ Queue OK \text{cons\_sum} \stackrel{?}{=} \text{prod\_sum} \implies \text{Queue OK} cons_sum=?prod_sum⟹Queue OK - 环形索引与入队/出队:
write_slot = _write_index % capacity , read_slot = _read_index % capacity \text{write\_slot} = \text{\_write\_index} \% \text{capacity},\quad \text{read\_slot} = \text{\_read\_index} \% \text{capacity} write_slot=_write_index%capacity,read_slot=_read_index%capacity
特别说明
- 这个例子 传递简单数值类型,无需复杂对象构造/析构
- 队列使用 自旋等待 处理满/空情况
- 使用原子累加
prod_sum/cons_sum验证并发操作正确性 - 适合测试 Lock-free MPMC Queue 的 基本正确性与性能
#include <pointer_mpmc_queue.h>
// 引入 Pointer MPMC Queue 的实现头文件
// 该队列支持存储 std::unique_ptr<T>,实现对象指针的无锁传递
// ================================
// 数据结构定义
// ================================
struct DataRecord {
std::array<uint64_t, 64> _sample;
// 一个数据记录,内部有固定大小数组 _sample
// 示例中只使用 _sample[0] 存储消息值
};
// ================================
// 队列类型定义
// ================================
using queue_type =
es::lockfree::pointer_mpmc_queue<es::lockfree::mpmc_queue, DataRecord>;
// 使用 pointer_mpmc_queue 包装 mpmc_queue<DataRecord>
// 这样队列存储对象指针,而非直接存储对象
// 好处:避免大对象复制,支持动态对象生命周期管理
int main()
{
// ------------------------------
// 创建队列实例,容量 1024
// ------------------------------
queue_type q0{1024};
// ------------------------------
// 总消息数量 N 和生产者/消费者数量 PCPC
// ------------------------------
constexpr unsigned N{10000}; // 每个生产者生成的消息数
constexpr unsigned PCPC{2}; // 生产者和消费者线程数
// ------------------------------
// 原子变量记录生产者和消费者累加和
// ------------------------------
std::atomic<uint64_t> prod_sum{0};
std::atomic<uint64_t> cons_sum{0};
// ==============================
// 生产者 lambda
// ==============================
auto producer = [&](queue_type* const q) {
uint64_t m{0}; // 本地累加器
for (unsigned x = 0; x < N; ++x)
{
// 动态创建一个 DataRecord 对象
auto p = std::make_unique<DataRecord>();
p->_sample[0] = x; // 准备消息数据
// 循环尝试入队,直到成功(非阻塞)
while (!q->push(std::move(p)))
;
m += x; // 累加本地生产者总和
}
prod_sum += m; // 原子更新全局生产者累加和
};
// ==============================
// 消费者 lambda
// ==============================
auto consumer = [&](queue_type* const q) {
uint64_t m{0}; // 本地累加器
for (unsigned x = 0; x < N; ++x)
{
std::unique_ptr<DataRecord> p{};
// 循环尝试出队,直到成功
while (!q->pop(p))
;
m += p->_sample[0]; // 处理消息
// 指针出队后自动释放对象,不需手动 delete
}
cons_sum += m; // 原子更新全局消费者累加和
};
// ------------------------------
// 创建消费者线程
// ------------------------------
std::vector<std::thread> consumers;
consumers.resize(PCPC);
for (auto& c : consumers)
c = std::thread{consumer, &q0}; // 启动 PCPC 个消费者线程
// ------------------------------
// 创建生产者线程
// ------------------------------
std::vector<std::thread> producers;
producers.resize(PCPC);
for (auto& p : producers)
p = std::thread{producer, &q0}; // 启动 PCPC 个生产者线程
// ------------------------------
// 等待所有线程完成
// ------------------------------
for (auto& p : producers) p.join();
for (auto& c : consumers) c.join();
// ------------------------------
// 校验生产者和消费者累加和是否一致
// ------------------------------
std::cout << (cons_sum && cons_sum == prod_sum ? "OK" : "ERROR")
<< " " << cons_sum << '\n';
return 0;
}
wandbox:https://wandbox.org/permlink/vdby5pO3kTwHC1zZ
or
godbolt:https://godbolt.org/z/zrv6hbKdr
理解总结
- 队列初始化:
pointer_mpmc_queue包装mpmc_queue,存储std::unique_ptr<DataRecord>- 解决大对象复制问题,同时支持对象生命周期管理
- 队列容量 1024,支持多生产者多消费者
- 生产者线程逻辑:
- 动态创建
DataRecord对象 - 使用
push(std::move(p))将对象指针入队 - 自旋等待直到入队成功
- 累加本地消息值,最后更新全局生产者累加和
- 动态创建
- 消费者线程逻辑:
- 从队列中出队对象指针
- 处理消息(累加
_sample[0]) - 出队后
unique_ptr自动释放对象 - 累加本地消费者总和,最后更新全局原子累加和
- 线程同步:
- PCPC 个生产者 + PCPC 个消费者
- 使用
join()等待所有线程完成
- 数据完整性校验:
- 校验
cons_sum == prod_sum - 保证 Lock-free 指针队列正确传递每条消息
- 非阻塞多线程安全
- 校验
核心公式表示
- 生产者累加:
prod_sum = ∑ p = 0 P C P C − 1 ∑ x = 0 N − 1 x \text{prod\_sum} = \sum_{p=0}^{PCPC-1} \sum_{x=0}^{N-1} x prod_sum=p=0∑PCPC−1x=0∑N−1x - 消费者累加:
cons_sum = ∑ c = 0 P C P C − 1 ∑ x = 0 N − 1 x \text{cons\_sum} = \sum_{c=0}^{PCPC-1} \sum_{x=0}^{N-1} x cons_sum=c=0∑PCPC−1x=0∑N−1x - 正确性校验:
cons_sum = ? prod_sum ⟹ Queue OK \text{cons\_sum} \stackrel{?}{=} \text{prod\_sum} \implies \text{Queue OK} cons_sum=?prod_sum⟹Queue OK - 队列入队/出队流程(指针版本):
push ( ptr ) ⟹ CAS 原子操作更新队列槽位 \text{push}(\text{ptr}) \implies \text{CAS 原子操作更新队列槽位} push(ptr)⟹CAS 原子操作更新队列槽位
pop ( ptr ) ⟹ CAS 原子操作读取队列槽位 \text{pop}(\text{ptr}) \implies \text{CAS 原子操作读取队列槽位} pop(ptr)⟹CAS 原子操作读取队列槽位
特别说明
- 对象指针传递:
- 使用
unique_ptr避免对象复制和内存泄漏 - 出队后自动释放,不需要手动 delete
- 使用
- 自旋等待:
while (!q->push(p));/while (!q->pop(p));- 非阻塞队列满/空时,使用自旋等待 CPU,适合高性能低延迟场景
- 生产者和消费者计数累加验证:
- 保证所有消息都正确传递且未丢失
1. 队列的基本单元——Entry / Cell
- 每个 Entry(单元格)保存以下内容:
- 数据值(Data Value):可以是指针、
unique_ptr<>或简单类型(4~12 字节)。 - 序列号 + 数据标记位(Sequence & Data-present flag):最低位(LSB)用作数据是否有效的标记位。
- 数据值(Data Value):可以是指针、
- 对应数学表示:
Entry = ( seq , data_flag , data_value ) \text{Entry} = (\text{seq}, \text{data\_flag}, \text{data\_value}) Entry=(seq,data_flag,data_value)
- 序列号(Sequence Number)
- 32 位或 64 位,用于判断该 Entry 是否已经被写入或读取过。
- 序列号加上数据标记位,可以唯一确定这个 Cell 的状态:
条件 状态 seq == index && data_flag==1 可读(已写入数据) seq != index && data_flag==0 可写(空槽)
- 写索引(Write_Index)
- 指向下一个应该写入的 Entry。
- 推送(push)操作使用它定位目标 Entry。
- 注意:写入数据和索引增加可以由不同线程完成,这样提高并发性。
- 读索引(Read_Index)
- 指向下一个应该读取的 Entry。
- 弹出(pop)操作使用它定位目标 Entry。
- 读取数据和索引增加也可以由不同线程完成,从而实现线程协作。
- CAS 原子操作
- Entry 内容和读写索引的更新,全部通过 Compare-And-Swap(CAS) 原子操作完成:
- 写数据:
do { old_seq = entry.seq } while (!CAS(&entry.seq, old_seq, old_seq + 1)) // 成功才算写入 - 推动索引:
do { old_index = write_index } while (!CAS(&write_index, old_index, old_index + 1))
- 写数据:
- 这样保证队列操作 无锁、线程安全。
- Entry 内容和读写索引的更新,全部通过 Compare-And-Swap(CAS) 原子操作完成:
- 线程协作的特点
- 不要求同一个线程完成写入数据和索引递增。
- 不要求同一个线程完成读取数据和索引递增。
- 利用不同线程共同推进队列状态,提高并发吞吐量。
伪代码示例
struct Entry {
std::atomic<uint64_t> seq; // 序列号 + 数据标记位
T value; // 数据
};
// 全局队列状态
std::atomic<uint64_t> write_index{0};
std::atomic<uint64_t> read_index{0};
Entry entries[N];
// 推送数据
bool push(T val) {
uint64_t idx;
Entry* e;
do {
idx = write_index.load();
e = &entries[idx % N];
// 检查 Entry 是否可写
} while (e->seq.load() != idx);
e->value = val; // 写入数据
e->seq.store(idx + 1); // 更新 seq + data_flag
write_index.fetch_add(1); // 移动写索引
return true;
}
// 弹出数据
bool pop(T& val) {
uint64_t idx;
Entry* e;
do {
idx = read_index.load();
e = &entries[idx % N];
// 检查 Entry 是否可读
} while (e->seq.load() != idx + 1);
val = e->value; // 读取数据
e->seq.store(idx + N); // 重置 Entry 状态为空
read_index.fetch_add(1); // 移动读索引
return true;
}
核心理解
- 每个 Entry 的状态由
(seq, data_flag)决定。 - 写索引/读索引 + CAS 保证多线程安全。
- 推/弹操作可由不同线程分阶段完成,实现高并发无锁设计。
- 适合 MPMC(多生产者、多消费者)队列场景。
1⃣ 类定义
// 队列中的基本单元 Entry
// alignas(sizeof(helper_entry)) 保证该类实例的内存对齐与 helper_entry 相同
// 主要是为了防止伪共享(false sharing),提高多线程访问性能
class alignas(sizeof(helper_entry)) entry {
public:
// 使用 union 提供两种访问方式:
// 1. _value: 整体原子访问,用于 CAS 操作
// 2. _x: 结构化访问,包含具体数据和序列号
union entry_union {
mutable entry_as_value _value; // 原子操作访问整个 Entry
struct entry_data {
value_type _data; // 数据值,可以是简单类型或指针
index_type _seq; // 序列号,最低位作为“数据是否就绪”标记
} _x;
// 构造函数,将整个 Entry 初始化为 0
entry_union() { _value = 0; }
} _u;
// 获取序列号
index_type get_seq() noexcept {
return _u._x._seq;
}
// 获取 Entry 中的数据值
value_type get_data() noexcept {
return _u._x._data;
}
// 判断 Entry 是否为空(可写)
// 原理:最低位 0 表示空闲
bool is_empty() const {
return !(_u._x._seq & 1U);
}
// 判断 Entry 是否已满(可读)
// 与 is_empty 相反
bool is_full() const {
return !is_empty();
}
};
class alignas(sizeof(helper_entry)) entry
entry是队列的基本单元(Entry/Cell)。alignas(sizeof(helper_entry)):- 使用 对齐,保证每个 Entry 占用的内存大小等于
helper_entry。 - 目的是 避免 false sharing(伪共享),提高多线程访问性能。
- 使用 对齐,保证每个 Entry 占用的内存大小等于
2⃣ 内部联合体 entry_union
union entry_union
{
mutable entry_as_value _value;
struct entry_data
{
value_type _data;
index_type _seq;
} _x;
entry_union() { _value = 0; }
} _u;
- 使用 union 节省空间,并提供两种访问方式:
_value:整体按整数/原子值访问,可以做 CAS 操作。_x:结构化访问,包含:_data:存储 Entry 的具体数据值_seq:序列号(Sequence Number),最低位通常作为 数据是否就绪标记
- 构造函数
_value = 0初始化整个 Entry 为“空”。 - 原理:
- CAS 操作可以直接对
_value进行原子比较交换。 _data和_seq提供结构化读写,便于逻辑处理。
- CAS 操作可以直接对
3⃣ 成员函数
index_type get_seq() noexcept { return _u._x._seq; }
- 获取 Entry 的序列号
seq。 - 用于判断该 Entry 当前是否可读/可写。
value_type get_data() noexcept { return _u._x._data; }
- 获取 Entry 中存储的数据。
bool is_empty() const { return !(_u._x._seq & 1U); }
- 判断 Entry 是否为空:
_seq & 1U→ 最低位为 1 表示已写入数据- 取反
!→ 如果最低位是 0,说明 Entry 空闲,可写。
bool is_full() const { return !is_empty(); }
- 判断 Entry 是否已满(可读),与
is_empty相反。
4⃣ 内部原理总结
- 每个 Entry 占用固定大小,并与缓存行对齐,防止伪共享。
- 通过 union + seq 标记 同时支持:
- 原子操作(CAS)
- 结构化访问(_data / _seq)
_seq的最低位用于标记数据是否就绪(类似 “data-present flag”)。- 判断 Entry 状态:
Entry is empty ⟺ ( _ s e q & 1 ) = = 0 \text{Entry is empty} \iff (\_seq \& 1) == 0 Entry is empty⟺(_seq&1)==0
Entry is full ⟺ ( _ s e q & 1 ) = = 1 \text{Entry is full} \iff (\_seq \& 1) == 1 Entry is full⟺(_seq&1)==1 - CAS 操作针对
_value,保证 无锁、多线程安全。
队列为空 (Queue is empty)
- 条件:
- 读索引(read_index) 指向的 Entry 的
data_flag为false(数据未就绪) - Entry 的序列号
seqnum与当前read_index相等
Queue empty ⟺ ( entry.data_flag = = 0 ) ∧ ( entry.seqnum = = read_index ) \text{Queue empty} \iff (\text{entry.data\_flag} == 0) \wedge (\text{entry.seqnum} == \text{read\_index}) Queue empty⟺(entry.data_flag==0)∧(entry.seqnum==read_index)
- 读索引(read_index) 指向的 Entry 的
- 含义:
- 当前 Entry 为空,队列没有可读数据。
- 消费者线程不需要阻塞,只需返回空状态。
2⃣ 顺序递增读索引
- 当 Entry 的序列号指示下一个循环周期时:
if (entry.seqnum == read_index + size) {
// 当前 Entry 已处理,增加 read_index
++read_index;
}
- 含义:
size为队列容量- 这种情况出现在 环形队列循环一周 后,Entry 仍然保留上一次的 seqnum
- 消费者只需递增
read_index,即可继续扫描下一个 Entry
3⃣ 队列有可读数据
if (entry.seqnum == read_index && entry.data_flag) {
// 消费者获取数据后,将 Entry 重置为空
// 并更新序列号为 read_index + size
}
- 含义:
entry.data_flag= 1 → 表示数据已就绪- 消费者可以读取数据
- 为了让生产者继续写入,Entry 需要被“清空”:
data_flag置 0seqnum增加size(下一个循环周期的标记)
- 这样保证:
- 多生产者/多消费者无锁协作
- 顺序和状态的正确性
总结
- 队列状态由 Entry 的两个字段决定:
seqnum→ 序列号,控制 Entry 循环周期和顺序data_flag→ 数据是否就绪
- 空队列:
data_flag = false且seqnum == read_index
- 循环一周的 Entry:
seqnum == read_index + size→ 更新read_index
- 可读 Entry:
seqnum == read_index && data_flag = true→ 读取数据,并重置 Entry 为下一个循环周期
- 无锁协作:
- 通过 CAS 原子操作更新 Entry
- 生产者和消费者可以在不同线程安全操作
- 保证严格顺序和进度
Lockfree, MPMC Queue - Depth Capacity 4 - S₀ - Empty SVG 状态图,并结合理解和注释。
理解
这个图描述的是一个 容量为 4 的无锁多生产者多消费者队列(MPMC Queue) 在 初始状态 S₀ 的情况:
1⃣ 队列容量与索引
- 队列容量:4 个槽位(slots)
- 写索引(Write Index) = 0
- 读索引(Read Index) = 0
含义:队列为空,下一条写入的消息将放在槽位 0,下一条读取操作也会从槽位 0 取。
2⃣ 队列槽位(Buffer Slots)状态
每个槽位显示三个重要信息:
| 槽位 | State | Seq | D.Flag |
|---|---|---|---|
| 0 | Empty | 0 | 0 |
| 1 | Empty | 1 | 0 |
| 2 | Empty | 2 | 0 |
| 3 | Empty | 3 | 0 |
- State: Empty → 当前槽位没有数据
- Seq: x → Entry 的序列号(Sequence Number),用于检测循环状态
- D.Flag: 0 → 数据标记,0 表示数据未就绪,1 表示数据已就绪
这里 D.Flag = 0,说明所有槽位都为空,初始状态下没有可读数据。
3⃣ 写索引和读索引指向关系
- 写索引箭头 指向槽位 0 → 下一条写入操作会在槽位 0
- 读索引箭头 指向槽位 0 → 下一条读取操作会尝试从槽位 0 读取
因为队列为空,所以读操作会发现 D.Flag = 0,需要等待生产者写入。
4⃣ 公式和逻辑
- 队列空的判断公式:
empty ⟺ ( entry.seqnum = = read_index ) ∧ ( entry.data_flag = 0 ) \text{empty} \iff (\text{entry.seqnum} == \text{read\_index}) \wedge (\text{entry.data\_flag} = 0) empty⟺(entry.seqnum==read_index)∧(entry.data_flag=0) - 写入操作(push)将会更新:
- 数据内容(entry.data)
- 数据标记(entry.data_flag)
- 序列号(entry.seqnum += size)
- 读取操作(pop)则会检查:
seqnum == read_index并且data_flag = 1才能读取- 读取后将
data_flag清零,并增加seqnum为下一个循环周期
总结
- 初始状态 S₀:
- 队列为空
- 所有槽位数据标记 D.Flag = 0
- 写索引和读索引都指向槽位 0
- 此状态下:
- 消费者无法读取数据
- 生产者可以从槽位 0 写入数据
- 队列利用序列号 Seq 和数据标记 D.Flag 来管理环形循环和并发安全
Lockfree, MPMC Queue - Depth Capacity 4 - S₁(插入一个元素后的状态) 的 SVG 状态图,并添加理解和注释。
理解
这个图描述了一个 容量为 4 的无锁多生产者多消费者队列(MPMC Queue) 在 状态 S₁:已经插入一个数据项后的情况。
1⃣ 队列容量与索引
- 队列容量:4 个槽位(slots)
- 写索引(Write Index) = 1
写索引增加 1,表示下一条写入操作会放到槽位 1。
- 读索引(Read Index) = 0
读索引仍然指向槽位 0,表示下一条读取操作会从槽位 0 读取。
2⃣ 队列槽位(Buffer Slots)状态
| 槽位 | State | Seq | D.Flag |
|---|---|---|---|
| 0 | Full | 0 | 1 |
| 1 | Empty | 1 | 0 |
| 2 | Empty | 2 | 0 |
| 3 | Empty | 3 | 0 |
- State: Full → 表示槽位 0 存在一个已经写入的数据
- D.Flag = 1 → 数据标记为已就绪,消费者可以读取
- 其他槽位仍为空,D.Flag = 0
这体现了无锁队列的 单元格状态管理:通过序列号 Seq 和数据标记 D.Flag 控制循环队列状态。
3⃣ 写索引与读索引箭头关系
- 写索引箭头 指向槽位 1 → 下一条写入操作会放在槽位 1
- 读索引箭头 指向槽位 0 → 下一条读取操作会从槽位 0 取
写和读索引独立更新,实现了多线程协作:
- push 和 pop 不一定由同一个线程完成
- 保证了 队列无锁、非阻塞、并发安全。
4⃣ 公式和逻辑
- 写入操作公式(push):
entry.data ← new value , entry.data_flag ← 1 , write_index ← ( write_index + 1 ) m o d capacity \text{entry.data} \gets \text{new value}, \quad \text{entry.data\_flag} \gets 1, \quad \text{write\_index} \gets (\text{write\_index} + 1) \bmod \text{capacity} entry.data←new value,entry.data_flag←1,write_index←(write_index+1)modcapacity - 读取操作公式(pop):
if ( entry.seqnum = read_index ∧ entry.data_flag = 1 ) then read data and reset D.Flag = 0 \text{if } (\text{entry.seqnum} = \text{read\_index} \wedge \text{entry.data\_flag} = 1) \text{ then read data and reset D.Flag} = 0 if (entry.seqnum=read_index∧entry.data_flag=1) then read data and reset D.Flag=0
这样保证了:
- 读取时不会读取未就绪的数据
- 写入时不会覆盖未读取的数据
总结
- 状态 S₁ 描述 队列中已有一个元素:
- 槽位 0 已写入数据,D.Flag = 1 → 可读
- 写索引已递增,指向槽位 1 → 下一个写入目标
- 读索引仍指向槽位 0 → 下一个读取目标
- 队列利用 序列号 Seq + 数据标记 D.Flag 实现:
- 并发安全(多生产者、多消费者)
- 无锁循环队列管理
- 写入和读取操作可由不同线程完成
Lockfree, MPMC Queue - Depth Capacity 4 - S₂(1 in, 1 out) 的 SVG 状态图,并添加理解和注释。
理解
这个图描述了一个 容量为 4 的无锁多生产者多消费者队列(MPMC Queue) 在 状态 S₂:已经插入一个元素,并且该元素被读取出队后的状态。
1⃣ 队列索引状态
- 写索引(Write Index) = 1
写索引仍然指向槽位 1,表示下一条写入操作会放到槽位 1。
- 读索引(Read Index) = 1
读索引已经移动到槽位 1,表示槽位 0 的数据已经被读取,下一条读取操作会从槽位 1 取。
这体现了 先进先出(FIFO) 的循环队列机制。
2⃣ 队列槽位状态
| 槽位 | State | Seq | D.Flag |
|---|---|---|---|
| 0 | Empty | 4 | 0 |
| 1 | Empty | 1 | 0 |
| 2 | Empty | 2 | 0 |
| 3 | Empty | 3 | 0 |
- 槽位 0(原先 Full)现在变 Empty:
- 数据已经被 pop 取出
- D.Flag = 0 → 数据标记已清空
- Seq = 4 → 表示循环队列已经往下一轮移动
- 其他槽位仍为空
- 所有槽位均 可供写入
这是 无锁循环队列的循环利用,通过序列号 Seq 管理循环逻辑。
3⃣ 写索引和读索引箭头
- 写索引箭头 → 指向槽位 1
- 下一条写入操作会放在槽位 1
- 读索引箭头 → 指向槽位 1
- 下一条读取操作会从槽位 1 取
通过 独立更新读写索引,实现多生产者、多消费者无锁协作。
push 和 pop 不必在同一线程完成,保证高并发和低延迟。
4⃣ 公式与逻辑说明
- 读取操作(pop)公式:
if ( entry.seqnum = read_index ∧ entry.data_flag = 1 ) then: read data and mark entry empty, update seqnum += capacity \text{if } (\text{entry.seqnum} = \text{read\_index} \wedge \text{entry.data\_flag} = 1) \\ \text{ then: read data and mark entry empty, update seqnum += capacity} if (entry.seqnum=read_index∧entry.data_flag=1) then: read data and mark entry empty, update seqnum += capacity - 写入操作(push)公式:
entry.data ← new value , entry.data_flag ← 1 , write_index ← ( write_index + 1 ) m o d capacity \text{entry.data} \gets \text{new value},\quad \text{entry.data\_flag} \gets 1,\quad \\ \text{write\_index} \gets (\text{write\_index} + 1) \bmod \text{capacity} entry.data←new value,entry.data_flag←1,write_index←(write_index+1)modcapacity
通过 seqnum 与容量 mod 运算实现循环队列,保证不会覆盖未读取的数据。
总结
- 状态 S₂ 描述 队列已经读出第一个元素后的状态:
- 槽位 0 数据已读取,状态变 Empty,Seq 递增
- 写索引仍指向槽位 1 → 下一个写入目标
- 读索引已移动到槽位 1 → 下一个读取目标
- 队列状态由 Seq + D.Flag 管理,保证:
- 读写分离
- 高并发安全
- 循环复用槽位
1⃣ 序列号 Seq 的作用
每个槽位的 entry 都有一个序列号 _seq,用于判断:
- 槽位是否为空(可写入)
- 槽位数据是否已被消费(可读取)
- 对于容量为
C的循环队列,Seq 初始值就是槽位索引:
初始状态: entry[0].seq = 0 , entry[1].seq = 1 , … \text{初始状态: } \text{entry[0].seq} = 0, \text{entry[1].seq} = 1, \ldots 初始状态: entry[0].seq=0,entry[1].seq=1,… - D.Flag = 0 → 空槽
- D.Flag = 1 → 有数据
2⃣ 写入和读取的 Seq 更新规则
假设队列容量 C = 4。
写入(push):
- 找到槽位
i - 确认槽位
i是空的(entry.seq == write_index) - 写入数据,并设置 D.Flag = 1
- write_index += 1
读取(pop):
- 找到槽位
i - 确认槽位
i有数据(entry.seq == read_index && D.Flag = 1) - 读取数据
- 标记槽位为空,并 Seq += 队列容量 C
保证下次写入时,write_index 与 seq 匹配判断正确
3⃣ 应用到 S₂ 状态
- 队列容量
C = 4 - 最开始插入的数据在槽位 0(Seq = 0)
- pop 读取后,槽位 0 被清空:
entry[0].seq = 0 + C = 0 + 4 = 4 \text{entry[0].seq} = 0 + C = 0 + 4 = 4 entry[0].seq=0+C=0+4=4 - 这个 Seq = 4 用于下轮循环判断:
- 下次写索引指向槽位 0 时,write_index = 4
- 判断
(entry.seq == write_index)→ 成功,允许写入
- D.Flag = 0 → 表示空槽
所以在 S₂ 状态,槽位 0 Seq = 4 是为了实现 循环队列的安全复用。
4⃣ 总结
- Seq 的更新公式:
entry.seq + = capacity (C) \text{entry.seq} += \text{capacity (C)} entry.seq+=capacity (C) - 这样保证了:
- 无锁安全(读写索引不需锁)
- 循环使用槽位,不覆盖未读取的数据
- 多生产者/多消费者场景下仍能通过 CAS 正确判断空/满
S₃ 状态(Depth = 4, 2 in, 1 out) 的 SVG 表示,以及它在无锁 MPMC 队列中的含义。
1⃣ 当前队列索引状态
- Write Index = 2
- 下一个 push 操作会写入槽位 2
- Read Index = 1
- 下一个 pop 操作会读取槽位 1
队列容量C = 4,槽位索引[0, 1, 2, 3]。
- 下一个 pop 操作会读取槽位 1
2⃣ 每个槽位状态
| Slot | Seq | D.Flag | 状态 | 解释 |
|---|---|---|---|---|
| 0 | 4 | 0 | Empty | 已被 pop 读取,Seq += 4 → 下轮可写入 |
| 1 | 1 | 1 | Full | 仍有数据,等待 pop |
| 2 | 2 | 0 | Empty | 下一个写入目标槽位 |
| 3 | 3 | 0 | Empty | 尚未写入 |
3⃣ Seq 的作用
Seq = 序列号,保证循环队列 安全复用:
- Slot 0 的 Seq = 4
- 最初 Seq = 0,数据被 pop 后,Seq 更新:
entry[0].seq = old seq + capacity = 0 + 4 = 4 \text{entry[0].seq} = \text{old seq} + \text{capacity} = 0 + 4 = 4 entry[0].seq=old seq+capacity=0+4=4 - 下次 write_index 指向槽位 0 时:
write_index = 4 ⟹ entry[0].seq == write_index → 可以写入 \text{write\_index} = 4 \implies \text{entry[0].seq == write\_index → 可以写入} write_index=4⟹entry[0].seq == write_index → 可以写入
- 最初 Seq = 0,数据被 pop 后,Seq 更新:
- Slot 1 的 Seq = 1
- 数据尚未被消费,D.Flag = 1 → Full
- Slot 2 的 Seq = 2
- 空槽,下一个写入目标
- Slot 3 的 Seq = 3
- 空槽,尚未写入
这样 Seq + D.Flag 的组合让多线程同时读写也能判断空/满状态,无锁安全。
4⃣ 可视化理解
- 箭头 Write Index → Slot 2
- 表示下一个 push 会写入槽位 2
- 箭头 Read Index → Slot 1
- 表示下一个 pop 会读取槽位 1
- Slot 0 Seq = 4 是因为它已经被 pop,更新了序列号,为循环复用做准备。
5⃣ 总结
- S₃ 状态 = 2 in, 1 out
- 队列里有 1 个数据已经被 pop
- 下一个写入在槽位 2
- Slot 0 已空,Seq 更新到 4 → 循环队列复用
- Seq 的核心作用:
- 保证槽位在循环复用时可以安全判断是否空
- 避免写入覆盖未消费的数据
- 多生产者/多消费者场景下,通过 CAS 判断空/满
1⃣ 当前队列索引状态
- Write Index = 2
- 下一个 push 操作仍然会写入槽位 2
- Read Index = 2
- 下一个 pop 操作会读取槽位 2
队列容量C = 4,槽位索引[0, 1, 2, 3]。
- 下一个 pop 操作会读取槽位 2
2⃣ 每个槽位状态
| Slot | Seq | D.Flag | 状态 | 解释 |
|---|---|---|---|---|
| 0 | 4 | 0 | Empty | 原先写入的数据已经被 pop,Seq 更新为 4 → 循环队列复用 |
| 1 | 5 | 0 | Empty | 数据已被 pop,Seq 更新为 5 → 下轮可写入 |
| 2 | 2 | 0 | Empty | 下一个写入目标槽位 |
| 3 | 3 | 0 | Empty | 空槽,尚未写入 |
注意:相比 S₃ 状态,Slot 0 和 Slot 1 都已经被 pop,队列里暂时没有实际数据,但 Write Index 和 Read Index 指向下一轮循环的位置。
3⃣ Seq 的计算逻辑
当数据被 pop 之后,序列号更新公式如下:
entry.seq = old seq + capacity = old seq + C \text{entry.seq} = \text{old seq} + \text{capacity} = \text{old seq} + C entry.seq=old seq+capacity=old seq+C
因此:
- Slot 0: 初始 Seq = 0 → pop → Seq = 0 + 4 = 4
- Slot 1: 初始 Seq = 1 → pop → Seq = 1 + 4 = 5
Slot 2 和 Slot 3 保持原来的 Seq 值(尚未被 push 或 pop)。
4⃣ 可视化理解
- 箭头 Write Index → Slot 2
- 下一个 push 会写入槽位 2
- 箭头 Read Index → Slot 2
- 下一个 pop 也会读取槽位 2
- Slot 0 和 Slot 1 的 Seq 更新,保证 循环复用安全
- 队列为空,但循环队列的 Seq 和 Index 确保后续操作正确进行。
5⃣ 总结
- S₄ 状态 = 2 in, 2 out
- 队列暂时为空,Write/Read Index 指向下一轮循环位置
- Slot 0、Slot 1 已被 pop,Seq 增加容量
C,可安全复用
- Seq + D.Flag 保证了无锁多生产者/多消费者情况下的 空/满判断
- Write Index 和 Read Index 不必相等即可安全协作
1⃣ 函数签名
bool push(T value);
- 输入参数
value:要写入队列的数据 - 返回值
bool:true→ 写入成功false→ 队列满,无法写入
注意:这是无锁多生产者多消费者队列(MPMC),多个线程可以同时调用
push。
2⃣ 操作步骤概览
push 操作主要执行 两次 CAS(Compare-And-Swap)操作,并依赖 Entry 里的 sequence 和 data_flag 来判断是否可写。
3⃣ 步骤详解
Step 1:检查目标 Entry 是否为空
- 获取当前 Write Index 指向的槽位
entry - 判断
entry是否为空:- 空条件:
entry.is_empty() ≡ ( entry.seq % C ) = = write_index 且 data_flag = 0 \text{entry.is\_empty()} \equiv (\text{entry.seq} \% C) == \text{write\_index} \quad \text{且} \quad \text{data\_flag} = 0 entry.is_empty()≡(entry.seq%C)==write_index且data_flag=0 C为队列容量- 如果不为空 → 队列已满 → 返回
false
- 空条件:
Step 2:CAS 更新 Entry
- 执行 第一次 CAS,将 Entry 更新为新数据:
entry.seq = old seq (保持不变)
entry.data = value
entry.data_flag = 1 // 数据有效标记
- CAS 确保:只有 Entry 状态仍为空时才写入成功
- 这是 无锁安全的核心,多个线程同时 push 时只会有一个线程成功
Step 3:CAS 更新 Write Index
- 执行 第二次 CAS,推进 Write Index 到下一个槽位:
write_index = ( write_index + 1 ) % C \text{write\_index} = (\text{write\_index} + 1) \% C write_index=(write_index+1)%C - 保证下一个 push 可以写入下一个循环槽位
- 多线程情况下,Write Index 的更新也是原子操作,避免竞态条件
4⃣ 队列满判断逻辑
- 如果 Write Index 指向的 Entry 是 full(上一轮数据未被 pop),就说明队列满:
if entry.is_full() return false;
- 此时
push直接返回false,调用者可以选择重试或丢弃数据
5⃣ 总结
push本质是 先 CAS 写入 Entry,再 CAS 推进 Write Index- Entry 中的 sequence + data_flag 用于判断空/满状态
- 队列满时不会阻塞,直接返回
false - 两次 CAS 确保在 MPMC 环境下 多线程安全
- 数据和索引的更新 可以由不同线程完成,支持线程协作
Lockfree MPMC Queue 的 push() 实现代码
[[using gnu: hot, flatten]]
bool push(value_type d) noexcept
{
while (true) {
// 1⃣ 读取当前写入索引(Write Index)
index_type wr_index = _write_index.load();
// 2⃣ 读取当前索引对应 Entry 的序列号
index_type seq = _array[wr_index].get_seq();
// 3⃣ 情况 1:Entry 为空(可写)
if (seq == static_cast<index_type>(wr_index << 1)) {
// 创建两个 Entry 对象
entry e{static_cast<index_type>(wr_index << 1)}; // 空 Entry
entry data_entry{static_cast<index_type>((wr_index << 1) | 1U), d}; // 带数据的 Entry(data_flag=1)
// 4⃣ CAS 尝试写入数据(双字 CAS, DWCAS)
if (_array[wr_index].compare_exchange(e, data_entry)) {
// 可选:推进写索引(非 lazy_push 模式)
if constexpr (!lazy_push) {
_write_index.compare_exchange_strong(wr_index, wr_index + 1);
}
return true; // 写入成功
}
}
// 5⃣ 情况 2:Entry 已写满 或 已被 wrap-around(循环队列)
else if ((seq == static_cast<index_type>((wr_index << 1) | 1U)) ||
(static_cast<index_type>(seq) == static_cast<index_type>((wr_index + _array.size()) << 1))) {
// 推进写索引,尝试写入下一个槽位
_write_index.compare_exchange_strong(wr_index, wr_index + 1);
}
// 6⃣ 情况 3:Entry 的序列号落在更远的环路,说明队列满
else if (static_cast<index_type>(seq + (_array.size() << 1)) ==
static_cast<index_type>((wr_index << 1) | 1U)) {
return false; // 队列满,写入失败
}
// 7⃣ 循环重试
}
}
理解
[[using gnu: hot, flatten]]- 告诉编译器这是热点函数(hot)并尽量内联展开(flatten),优化性能。
- Entry 序列号 (seq) 的设计
- 每个 Entry 使用 序列号和数据标志管理状态
- 空 Entry:
seq = wr_index << 1 - 满 Entry:
seq = (wr_index << 1) | 1(最低位 data_flag=1) - wrap-around(循环队列)序列号加上
array.size() << 1
- DWCAS(Double Word Compare-And-Swap)
_array[wr_index].compare_exchange(e, data_entry)是 原子更新 Entry- 同时更新数据和 data_flag,保证无锁线程安全
- 推进写索引
- 非 lazy 模式下,写成功后推进
_write_index - 如果该槽位已满或 wrap-around,也会尝试推进
_write_index - 防止单线程阻塞其他线程写入
- 非 lazy 模式下,写成功后推进
- 队列满判断
- 当序列号落在更远的循环轮次时,说明队列已满 → 返回
false
- 当序列号落在更远的循环轮次时,说明队列已满 → 返回
- 循环重试机制
while(true)保证在高并发环境下,多线程安全重试- 直到写入成功或检测到队列满
数学公式表达
- 空 Entry 条件:
seq = wr_index ≪ 1 \text{seq} = \text{wr\_index} \ll 1 seq=wr_index≪1 - 满 Entry 条件:
seq = ( wr_index ≪ 1 ) ∣ 1 \text{seq} = (\text{wr\_index} \ll 1) | 1 seq=(wr_index≪1)∣1 - wrap-around 条件(队列满/推进索引):
seq = ( wr_index + capacity ) ≪ 1 \text{seq} = (\text{wr\_index} + \text{capacity}) \ll 1 seq=(wr_index+capacity)≪1
Lockfree MPMC Queue 的 pop() / try_pop() 操作
理解
try_pop(T& value) 的作用:从多生产者多消费者队列中取出一个元素,如果队列为空则返回 false。整个操作使用 两次 CAS(Compare-And-Swap)原子操作保证线程安全。
1⃣ 对 Entry 执行 CAS 操作
- 目标:清空当前 Entry 并准备给下一轮写入使用,同时获取旧值。
- 操作内容:
- 更新序列号
- 将序列号增加队列容量 s i z e size size,表示该槽位可用于下一次写入。
- entry.seq ← entry.seq + size \text{entry.seq} \gets \text{entry.seq} + \text{size} entry.seq←entry.seq+size
- 清除数据标志
- 设置
data_present_flag = 0,表示该 Entry 已空。
- 设置
- 获取旧值
- 使用 CAS 的原子交换,将旧值写入传入的
value引用参数。
- 使用 CAS 的原子交换,将旧值写入传入的
- 更新序列号
这一步是核心:确保 在高并发环境下,多个消费者同时访问时仍能正确获取数据且不会覆盖其他线程的数据。
2⃣ 对 Read Index 执行 CAS 操作
- 目标:推进队列的读索引,使下一个 pop 操作访问下一个槽位。
- 操作内容:
- 原子更新
_read_index:
read_index ← read_index + 1 \text{read\_index} \gets \text{read\_index} + 1 read_index←read_index+1
- 原子更新
- 如果 CAS 成功,则下一个 pop 会访问下一个 Entry;如果失败,说明其他线程已经推进过读索引,当前线程会重试。
核心逻辑总结
- 取出数据 → 原子获取旧值
- 清空槽位 → 为下一轮写入准备
- 推进读索引 → 保持队列顺序
- 无锁并发安全 → 多生产者多消费者场景下线程安全
公式总结
- 清空 Entry 并准备下一次写入:
{ entry.seq ← entry.seq + size data_present_flag ← 0 value ← old entry value \begin{cases} \text{entry.seq} \gets \text{entry.seq} + \text{size} \\ \text{data\_present\_flag} \gets 0 \\ \text{value} \gets \text{old entry value} \end{cases} ⎩ ⎨ ⎧entry.seq←entry.seq+sizedata_present_flag←0value←old entry value - 推进读索引:
read_index ← read_index + 1 \text{read\_index} \gets \text{read\_index} + 1 read_index←read_index+1
Lockfree MPMC Queue 的 pop() 函数实现,
// pop 函数:从无锁 MPMC 队列中弹出一个元素
// 参数 d 用于存储弹出的值
bool pop(value_type& d) noexcept {
while (true) { // 无限循环直到成功弹出或队列为空
// 原子读取当前读索引 rd_index
index_type rd_index = _read_index.load();
// 原子读取队列槽位对应 Entry 的内容到本地变量 e
entry e{_array[rd_index].load()};
// 情况 1:当前 Entry 可读(已写入数据,data_flag = 1)
// seq 编码规则:低位表示 data_flag,奇数表示已写入
if (e.get_seq() == static_cast<index_type>((rd_index << 1) | 1U)) {
// 构造一个新的空 Entry,为下一轮写入做准备
// seq 增加队列容量 size,data_flag = 0 表示空
entry empty_entry{static_cast<index_type>((rd_index + _array.size()) << 1U)};
// 使用双字 CAS(DWCAS)原子操作尝试替换 Entry
// 成功表示当前线程成功弹出该元素
if (_array[rd_index].compare_exchange(e, empty_entry)) { // <=== DWCAS 原子指令
// 将弹出的数据存入外部引用 d
d = e.get_data();
// 非延迟模式下推进读索引
// 原子更新 _read_index,使其指向下一个可读槽位
if constexpr (!lazy_pop) {
_read_index.compare_exchange_strong(rd_index, rd_index + 1);
}
// 弹出成功,返回 true
return true;
}
}
// 情况 2:当前 Entry 已被消费(旧轮次数据)
// seq 的奇数位仍为 1,但属于上一轮数据
// 推进读索引到下一个槽位
else if (static_cast<index_type>(e.get_seq() | 1U) ==
static_cast<index_type>(((rd_index + _array.size()) << 1) | 1U)) {
_read_index.compare_exchange_strong(rd_index, rd_index + 1);
}
// 情况 3:当前 Entry 为空(data_flag = 0)
// 队列为空,无法弹出数据,返回 false
else if (e.get_seq() == static_cast<index_type>(rd_index << 1)) {
return false;
}
}
}
函数概览
bool pop(value_type& d) noexcept
- 功能:从队列中弹出一个元素,并将值写入
d。 - 返回值:
true:成功弹出一个元素false:队列为空,无法弹出
这是一个 无锁 (lock-free) 的操作,适用于多生产者多消费者队列(MPMC Queue)。
代码逐行解析
while (true) {
- 无限循环,直到成功弹出元素或确定队列为空。
- 高并发环境下,CAS 可能失败,需要重试。
index_type rd_index = _read_index.load();
- 原子读取当前 读索引
rd_index。 - 这个索引指向队列中下一个可读取的 Entry。
entry e{_array[rd_index].load()};
- 原子读取 队列槽位 Entry 的内容,保存在本地变量
e中。
if (e.get_seq() == static_cast<index_type>((rd_index << 1) | 1U)) {
- 判断当前 Entry 是否 已满且可读:
rd_index << 1:将读索引左移 1 位| 1U:设置 data_flag = 1 表示数据存在
- 条件成立说明 Entry 已写入数据,可弹出。
entry empty_entry{static_cast<index_type>((rd_index + _array.size()) << 1U)};
- 构造一个新的空 Entry,准备写回:
- 序列号增加队列容量
size,标记下一个写入循环可以使用 data_flag = 0表示 Entry 已空
empty_entry.seq = ( rd_index + size ) × 2 \text{empty\_entry.seq} = (\text{rd\_index} + \text{size}) \times 2 empty_entry.seq=(rd_index+size)×2
- 序列号增加队列容量
if (_array[rd_index].compare_exchange(e, empty_entry)) {
- 双字 CAS (DWCAS) 原子操作:
- 如果 Entry 当前仍为
e,就用empty_entry替换 - 保证在高并发下只有一个线程能成功弹出该 Entry
- 如果 Entry 当前仍为
d = e.get_data();
- 获取弹出的数据写入
d。
if constexpr (!lazy_pop) {
_read_index.compare_exchange_strong(rd_index, rd_index + 1);
}
- 非延迟模式下推进读索引:
- 原子更新
_read_index,指向下一个可读槽位
- 原子更新
- 延迟模式下索引可能稍后更新,由其他操作推进。
return true;
- 弹出成功,返回
true。
队列空或索引推进逻辑
} else if (static_cast<index_type>(e.get_seq() | 1U) ==
static_cast<index_type>(((rd_index + _array.size()) << 1) | 1U)) {
_read_index.compare_exchange_strong(rd_index, rd_index + 1);
- 如果 Entry 已被消费过(旧轮次的数据),推进读索引到下一个位置。
- CAS 保证原子更新。
} else if (e.get_seq() == static_cast<index_type>(rd_index << 1)) {
return false;
}
- 如果 Entry 的序列号表示该槽为空(data_flag = 0),说明队列空,直接返回
false。
pop() 核心总结
- 读取当前读索引 rd_index
- 读取对应 Entry
- 判断 Entry 状态:
- 可读 → 原子获取数据,清空 Entry,推进索引
- 已消费 → 推进索引
- 空 → 队列空,返回 false
- CAS 保证原子性和无锁并发安全
公式总结
- 弹出数据并清空 Entry:
{ empty_entry.seq = ( rd_index + size ) ≪ 1 empty_entry.data_flag = 0 d = e . data \begin{cases} \text{empty\_entry.seq} = (\text{rd\_index} + \text{size}) \ll 1 \\ \text{empty\_entry.data\_flag} = 0 \\ d = e.\text{data} \end{cases} ⎩ ⎨ ⎧empty_entry.seq=(rd_index+size)≪1empty_entry.data_flag=0d=e.data - 推进读索引(非延迟模式):
read_index ← read_index + 1 \text{read\_index} \gets \text{read\_index} + 1 read_index←read_index+1 - 队列为空判断:
if e . seq = rd_index ≪ 1 ⇒ return false \text{if } e.\text{seq} = \text{rd\_index} \ll 1 \Rightarrow \text{return false} if e.seq=rd_index≪1⇒return false
// exchange 函数:无锁队列中原子替换指定索引的值
// 参数说明:
// i - 目标索引(index)
// old_value - 期望的旧值
// new_value - 要替换的新值
// 返回值:
// true -> 替换成功
// false -> 替换失败(旧值不匹配)
bool exchange(index_type& i, value_type old_value, value_type new_value) noexcept
{
// 构造旧 Entry,seq 编码为 (i << 1) | 1U 表示该槽位已写入
entry old_entry{static_cast<index_type>((i << 1) | 1U), old_value};
// 构造新 Entry,seq 与旧 Entry 相同,保持 data_flag = 1
entry new_entry{static_cast<index_type>((i << 1) | 1U), new_value};
// 使用 CAS(Compare-And-Swap)原子操作替换槽位的值
// 如果当前槽位的内容与 old_entry 匹配,则用 new_entry 替换
// 成功返回 true,失败返回 false
return _array[i].compare_exchange(old_entry, new_entry);
}
理解与公式解释
- seq 编码规则
Entry.seq = ( i ≪ 1 ) ∣ 1 \text{Entry.seq} = (i \ll 1) | 1 Entry.seq=(i≪1)∣1
- 左移 1 位保留索引信息(轮次计算)
- 或上 1 表示 data_flag = 1(数据有效)
- CAS 原子操作
compare_exchange(old, new) ⟺ { _array[i] = new 如果 _array[i] == old 返回 false 否则不修改 \text{compare\_exchange(old, new)} \iff \begin{cases} \text{\_array[i] = new} & \text{如果 \_array[i] == old} \\ \text{返回 false} & \text{否则不修改} \end{cases} compare_exchange(old, new)⟺{_array[i] = new返回 false如果 _array[i] == old否则不修改
- 这是 典型的无锁操作,保证并发情况下不会出现竞争条件
- 适用于替换已经存在的值而无需锁机制
- 函数特点
- 线程安全:多个线程可以同时尝试替换同一个槽位
- 原子性:保证替换操作不可被中断
- 用途:类似 std::atomic 的 compare_exchange,可以在队列中做更新操作而不破坏其他线程的读写
理解
- Full 状态
- data_flag = 1 \text{data\_flag} = 1 data_flag=1 表示槽位中有数据。
- seq \text{seq} seq 保持当前值,用于判断索引匹配和轮次。
- Push 后写入数据,写索引 W . I n d e x W.Index W.Index 前进。
- Empty 状态
- data_flag = 0 \text{data\_flag} = 0 data_flag=0 表示槽位为空。
- Pop 后清空数据并更新序列号 s e q + = N seq += N seq+=N,读索引 R . I n d e x R.Index R.Index 前进。
- 下一个写入可重用此槽位。
- 操作流程
- Push:保持 seq,设置 data_flag = 1,放入数据,写索引前进。
- Pop:序列号加上队列容量 N N N,设置 data_flag = 0,读索引前进。
- 队列利用 seq 与 data_flag 判断槽位状态,实现 无锁多生产者多消费者(MPMC)。
- 公式说明
- 判断 Full: Entry.seq = ( index ≪ 1 ) ∣ 1 \text{Entry.seq} = (\text{index} \ll 1) | 1 Entry.seq=(index≪1)∣1
- 判断 Empty: Entry.seq = ( index ≪ 1 ) \text{Entry.seq} = (\text{index} \ll 1) Entry.seq=(index≪1)
<?xml version="1.0" encoding="UTF-8"?>
理解
- 初始化状态
- Cell / Entry 索引为 0。
- 队列轮次(Round)开始。
- 初始 Empty 状态:
- data_flag = 0 \text{data\_flag} = 0 data_flag=0
- seq = 0 \text{seq} = 0 seq=0
- Full 状态只是示意: data_flag = 1 \text{data\_flag} = 1 data_flag=1,seq = 0。
- Push 操作
- 条件:槽位 Empty 且 seq 匹配当前写索引。
- 操作步骤:
- 保持 seq 不变。
- data_flag 设置为 1,表示数据有效。
- 写入数据。
- 写索引 W . I n d e x W.Index W.Index 前进。
- 公式表示:
Entry.seq = Entry.seq (原值) Entry.data_flag = 1 \text{Entry.seq} = \text{Entry.seq (原值)} \quad \text{Entry.data\_flag} = 1 Entry.seq=Entry.seq (原值)Entry.data_flag=1
- Pop 操作
- 条件:槽位 Full 且 seq 匹配当前读索引。
- 操作步骤:
- 序列号加上队列容量 N N N(下轮可复用)。
- data_flag = 0,表示空。
- 读索引 R . I n d e x R.Index R.Index 前进。
- 公式表示:
Entry.seq + = N , Entry.data_flag = 0 \text{Entry.seq} += N, \quad \text{Entry.data\_flag} = 0 Entry.seq+=N,Entry.data_flag=0
- Round 概念
- 序列号 seq 用于判断槽位是否可写或可读。
- 随着 Push/Pop 进行,seq 会累加,确保无锁多生产者多消费者(MPMC)环境下的安全。
- 箭头说明
- 左箭头:Empty → Full,表示 Push。
- 右箭头:Full → Empty,表示 Pop。
详细理解
- 当前状态
- Cell/Entry 索引:0
- Round(轮次):0
- 状态:Full
- seq = 0(第一轮)
- data_flag = 1(表示槽位已有数据)
- Empty 状态为初始对照,data_flag = 0,seq = 0。
- Push 操作
- 条件:槽位为空且 seq 匹配写索引。
- 动作:
- seq 保持不变。
- data_flag = 1,表示已写入数据。
- 写入新数据。
- 写索引 W . I n d e x W.Index W.Index 前进。
- 公式:
Entry.seq = Entry.seq (原值) , Entry.data_flag = 1 \text{Entry.seq} = \text{Entry.seq (原值)}, \quad \text{Entry.data\_flag} = 1 Entry.seq=Entry.seq (原值),Entry.data_flag=1
- Pop 操作
- 条件:槽位已满且 seq 匹配读索引。
- 动作:
- seq 加上队列容量 N N N,准备下一轮写入。
- data_flag = 0,表示清空。
- 读索引 R . I n d e x R.Index R.Index 前进。
- 公式:
Entry.seq + = N , Entry.data_flag = 0 \text{Entry.seq} += N, \quad \text{Entry.data\_flag} = 0 Entry.seq+=N,Entry.data_flag=0
- 状态转换
- Push 箭头:Empty → Full
- Pop 箭头:Full → Empty
- 轮次(Round)概念
- seq 用于判断槽位是否可写或可读,确保多生产者多消费者(MPMC)环境下无锁安全。
- 第一轮操作后,seq 仍为 0,但 data_flag 已标记为 1。
详细理解
- 当前状态
- Cell/Entry 索引:0
- Round(轮次):1(上一轮 Push+Pop 后进入)
- 状态:Empty
- seq = N(队列容量,表示下一轮写入起始序列号)
- data_flag = 0(槽位已清空)
- Push 操作
- 条件:槽位为空且 seq 与写索引匹配。
- 动作:
- seq 保持不变。
- data_flag = 1,表示写入数据。
- 写入数据。
- 写索引 W . I n d e x W.Index W.Index 前进。
- 公式:
Entry.seq = Entry.seq (原值) , Entry.data_flag = 1 \text{Entry.seq} = \text{Entry.seq (原值)}, \quad \text{Entry.data\_flag} = 1 Entry.seq=Entry.seq (原值),Entry.data_flag=1
- Pop 操作
- 条件:槽位已满且 seq 与读索引匹配。
- 动作:
- seq += N,表示进入下一轮。
- data_flag = 0,槽位清空。
- 读索引 R . I n d e x R.Index R.Index 前进。
- 公式:
Entry.seq + = N , Entry.data_flag = 0 \text{Entry.seq} += N, \quad \text{Entry.data\_flag} = 0 Entry.seq+=N,Entry.data_flag=0
- 状态转换
- Push 箭头:Empty → Full
- Pop 箭头:Full → Empty
- 轮次(Round)概念
- seq 用于判断槽位是否可写或可读,确保多生产者多消费者(MPMC)无锁安全。
- Round 1 表示上一轮已完成 Pop,槽位可用于下一轮写入。
详细理解
- 当前状态
- Cell/Entry 索引:0
- Round:1
- 状态:Full(经过 push1、pop1,再 push2 后填充)
- seq = 0(Push 后槽位序列号),data_flag = 1(槽位含数据)
- 序列号演变
- 对于每个槽位 Cell_i:
Cell i : seq = i , N + i , 2 N + i , 3 N + i , … \text{Cell}_i:\ \text{seq} = i, N+i, 2N+i, 3N+i, \dots Celli: seq=i,N+i,2N+i,3N+i,…- Cell 0: 0, N, 2N, 3N, …
- Cell 1: 1, N+1, 2N+1, …
- Cell 2: 2, N+2, 2N+2, …
- seq 用于判断当前槽位是否可读或可写,保证 MPMC 无锁安全。
- 对于每个槽位 Cell_i:
- Push 操作
- 条件:槽位为空且 seq 与写索引匹配。
- 动作:
- seq 保持不变。
- data_flag = 1。
- 写入数据。
- 写索引 W . I n d e x W.Index W.Index 前进。
- 公式:
Entry.seq = Entry.seq (原值) , Entry.data_flag = 1 \text{Entry.seq} = \text{Entry.seq (原值)},\quad \text{Entry.data\_flag} = 1 Entry.seq=Entry.seq (原值),Entry.data_flag=1
- Pop 操作
- 条件:槽位已满且 seq 与读索引匹配。
- 动作:
- seq += N。
- data_flag = 0。
- 读索引 R . I n d e x R.Index R.Index 前进。
- 公式:
Entry.seq + = N , Entry.data_flag = 0 \text{Entry.seq} += N, \quad \text{Entry.data\_flag} = 0 Entry.seq+=N,Entry.data_flag=0
- 状态转换
- Push 箭头:Empty → Full
- Pop 箭头:Full → Empty
- 每个槽位循环使用 seq 来控制读写权限,多生产者多消费者之间不会发生冲突。
理解和注释
- 单 Entry 状态循环
- Entry 初始为空 (Empty),seq=0, data_flag=0。
- Push1 后填充数据,seq 保持 0, data_flag=1。
- Pop1 后清空,seq += N(容量 N),data_flag=0。
- Push2 再次写入,seq 保持 N, data_flag=1。
- seq 循环
- 每轮 Push / Pop 后,seq 增加队列容量 N,用于多轮循环。
seq i r o u n d = i + r o u n d ⋅ N \text{seq}_{i}^{round} = i + round \cdot N seqiround=i+round⋅N
- 每轮 Push / Pop 后,seq 增加队列容量 N,用于多轮循环。
- Push / Pop 动作
- Push:
Entry.data_flag = 1 , W . I n d e x + + \text{Entry.data\_flag} = 1,\quad W.Index++ Entry.data_flag=1,W.Index++ - Pop:
Entry.data_flag = 0 , Entry.seq + = N , R . I n d e x + + \text{Entry.data\_flag} = 0,\quad \text{Entry.seq} += N, \quad R.Index++ Entry.data_flag=0,Entry.seq+=N,R.Index++
- Push:
- 多轮循环特点
- 多生产者多消费者可安全操作同一个 Entry。
- seq 保证读写冲突被正确检测。
- data_flag 直接反映 Entry 是否可读。
Lockfree, MPMC Queue - 特殊特性
- 跨线程和跨进程可用
- 该队列不仅支持同一进程中不同线程间的数据传递,也可以用于 不同进程(不同地址空间)间的通信。
- 原理:底层结构可放置在 共享内存 中,读写操作通过原子指令保证一致性。
- 优势:在多进程场景下无需额外的 IPC 消息复制,性能高。
- 支持原子覆盖(Atomic Conflation)
- 写者可以 原子地替换一个值,前提是该值还没有被任何读者读取。
- 机制:
- 每个 Entry 有 seq 与 data_flag。
- 写者在写入前检查 Entry 是否未被读者读取(data_flag = 0)。
- 原子 CAS(Compare-And-Swap)替换值,保证不会覆盖已经被读取的值。
- 优势:
- 可用于缓存更新、状态覆盖。
- 保证不会丢失写入(只要 Entry 未被读)。
Lockfree, MPMC Queue - 测试
测试目标是保证队列在多线程/多进程下的可靠性和正确性:
- 无消息丢失
- 每条消息从写入到被读取必须保证 至少被一个读者消费。
- 核心依赖 seq 与 data_flag 的同步。
- 无消息重复
- 每条消息被读取 最多一次。
- seq 和 CAS 操作保证了读者不会重复读取同一 Entry。
- 消息顺序不被打乱
- 在同一生产者产生的消息序列中:
seq ∗ i < seq ∗ i + 1 \text{seq}*{i} < \text{seq}*{i+1} seq∗i<seq∗i+1 - 确保 FIFO(先进先出)语义。
- 注意:多生产者写入可能打乱整体顺序,但单生产者顺序保持。
- 在同一生产者产生的消息序列中:
- 读者或写者不会饥饿(No starvation)
- 理论上,原子操作保证每个读者/写者最终能执行成功。
- CPU 平台调度与自旋次数可能影响实际延迟,但在 Intel / AMD 平台经过测试可正常。
- 平台适配
- 在 Intel 和 AMD CPU 上,使用的原子指令(CAS / DWCAS 等)均有效。
- 注意:需要保证内存屏障和指令序列正确,避免乱序问题。
伪代码注释示例
// Push 操作(支持原子覆盖 atomic conflation)
bool push(value_type val) {
while(true) {
Entry& e = _array[write_index % N];
// 检查 Entry 是否可写(未被读者读取)
if(e.data_flag == 0) {
Entry new_e = {seq: e.seq, data_flag: 1, data: val};
// 原子 CAS 替换
if(e.compare_exchange(old: e, new: new_e)) {
++write_index; // 更新写索引
return true;
}
} else {
// Entry 正在被读者使用,等待或跳过
spin_wait();
}
}
}
// Pop 操作
bool pop(value_type& out) {
while(true) {
Entry& e = _array[read_index % N];
if(e.data_flag == 1) {
out = e.data;
// 原子修改 Entry,标记为空并推进 seq
Entry empty_e = {seq: e.seq + N, data_flag: 0};
if(e.compare_exchange(old: e, new: empty_e)) {
++read_index; // 更新读索引
return true;
}
} else {
// Entry 尚未写入数据,返回失败
return false;
}
}
}
Lockfree, MPMC Queue - Benchmark 测量指标
- 消息吞吐量(Messages per Second)
- 目标:衡量队列在单位时间内能处理多少条消息。
- 影响因素:
- 不同架构(Architecture)
- Intel CPU 与 AMD CPU 的缓存一致性协议、指令序列优化和 CAS 延迟不同,会影响吞吐量。
- 数据大小(Data Size)
- 测试常见数据大小:4B, 8B, 12B
- 数据越大,占用的缓存行越多,可能导致更多的缓存同步延迟。
- 生产者/消费者数量(Producers/Consumers Count)
- 测试模式包括:
- 1-1, 2-2, 3-3(生产者与消费者数量相等)
- 1-2, 1-3, 2-3, 2-1, 3-2, 3-3(不等数量组合)
- 观察多线程争用对吞吐量的影响。
- 测试模式包括:
- 懒增量操作(Lazy Increment)
- push/pop 中索引是否延迟更新(lazy)
- 默认情况下为 非懒更新,每次操作后立即更新索引。
- 不同架构(Architecture)
- 伪公式:
MessagesPerSecond = TotalMessages ElapsedTime \text{MessagesPerSecond} = \frac{\text{TotalMessages}}{\text{ElapsedTime}} MessagesPerSecond=ElapsedTimeTotalMessages
- 每条消息在队列内的时间(Time inside queue per message)
- 假设 读者速度快于写者,即队列不会积压太多消息。
- 测量从 push 到 pop 的时间:
T queue = t pop − t push T_\text{queue} = t_\text{pop} - t_\text{push} Tqueue=tpop−tpush - 用于衡量队列延迟(latency),尤其在高并发场景下的表现。
- 半往返时间(Half RTT)
- 使用 回声服务器(Echo Server) 测量:
- 生产者 push → 消费者 pop → 回写到生产者。
- 使用两个 Lockfree MPMC Queue(一个写,一个读)。
- 半 RTT 公式:
Half RTT = t echo − t send 2 \text{Half RTT} = \frac{t_\text{echo} - t_\text{send}}{2} Half RTT=2techo−tsend
- 使用 回声服务器(Echo Server) 测量:
Lockfree, MPMC Queue - 性能指标
- CPU 开销(CPU overhead)
- 测量每条消息在队列读写中占用的 CPU 周期。
- 影响因素:
- CAS / DWCAS 原子操作次数
- 索引更新策略(lazy 或非 lazy)
- 缓存一致性(cache line ping-pong)
- 往返时间(RTT)
- 消息从生产者到消费者再回到生产者的延迟。
- 对高频低延迟应用尤为关键。
- 带宽(Bandwidth)
- 每秒处理的消息数:
Bandwidth = MessagesPerSecond × DataSize 单位时间 \text{Bandwidth} = \frac{\text{MessagesPerSecond} \times \text{DataSize}}{\text{单位时间}} Bandwidth=单位时间MessagesPerSecond×DataSize - 数据大小和索引大小直接影响带宽。
- 每秒处理的消息数:
Benchmark 输入参数
| 参数 | 描述 |
|---|---|
| Data-size | 单条消息的数据大小(4B/8B/12B 等) |
| Producers-count | 生产者线程数量 |
| Consumer-count | 消费者线程数量 |
| Lazy read/write | push/pop 是否使用 lazy 索引更新 |
最坏情况测量
- Worst case: 队列中消息在生产者到消费者之间停留的最大时间。
- 用于评估在高并发和争用条件下的延迟峰值:
T max = max i ( t pop,i − t push,i ) T_\text{max} = \max_i (t_\text{pop,i} - t_\text{push,i}) Tmax=imax(tpop,i−tpush,i)
伪代码示意 Benchmark 测试流程
auto start = high_resolution_clock::now();
for(int i=0; i<total_messages; ++i) {
queue.push(data[i]); // 生产者写入
}
for(int i=0; i<total_messages; ++i) {
queue.pop(result[i]); // 消费者读取
}
auto end = high_resolution_clock::now();
auto total_time = duration_cast<nanoseconds>(end - start).count();
double messages_per_sec = total_messages / (total_time / 1e9);
double average_latency_ns = total_time / total_messages;
- 可以循环不同 数据大小、线程数量,记录吞吐量、延迟、CPU 占用。
- 对 Lazy / Non-lazy push/pop 分别测试对性能影响。
一、测试命令与参数先整体理解
示例命令:
./q_bandwidth -W4 -p1 -c1
含义说明(这是理解后面所有数据的基础):
-W4 : data size = 4 bytes(每条消息 4 字节)
-p1 : producers = 1(1 个生产者)
-c1 : consumers = 1(1 个消费者)
-d N : capacity = N(环形队列容量,entry 数)
二、输出字段逐项解释(非常关键)
以第一条输出为例:
Q BW: data size: 4 index size: 4 capacity: 32
producers: 1 consumers: 1 for: 1000ms
mpmc_queue<ff>
push: 73842434 pop: 73842434
tsc: 2470341002
tsc/op: 33
push/pop per sec: 73120193
1⃣ mpmc_queue<ff> / <ft> / <tf> / <tt>
这是 队列的配置变体:
| 标志 | 含义 |
|---|---|
| 第一个字母 | push 是否 lazy |
| 第二个字母 | pop 是否 lazy |
| f | false(非 lazy,立即更新 index) |
| t | true(lazy,延迟更新 index) |
| 所以: |
ff = push 非 lazy, pop 非 lazy
ft = push 非 lazy, pop lazy
tf = push lazy, pop 非 lazy
tt = push lazy, pop lazy
2⃣ push / pop
push: 73842434
pop: 73842434
说明:
- 无丢消息
- 无重复
- 严格一一对应
这是 Lockfree MPMC Queue 正确性的第一条铁证。
3⃣ tsc 与 tsc/op
tsc:测试期间消耗的 CPU 时间戳计数(TSC)tsc/op:每一次 push+pop 平均消耗的 CPU cycle
公式:
tsc/op = total tsc push 次数 \text{tsc/op} = \frac{\text{total tsc}}{\text{push 次数}} tsc/op=push 次数total tsc
这就是单条消息的 CPU 成本
4⃣ push/pop per sec
push/pop per sec: 73120193
表示:
每秒完成 7312 万次 push+pop
也就是吞吐量(bandwidth)
公式:
Throughput = Total operations Time (sec) \text{Throughput} = \frac{\text{Total operations}}{\text{Time (sec)}} Throughput=Time (sec)Total operations
三、1 Producer / 1 Consumer(1-1)性能分析(data = 4B)
汇总表(你给的数据)
| 模式 | tsc/op | 吞吐量(M/s) |
|---|---|---|
| ff | 33 | 73.1 |
| ft | 30 | 78.9 |
| tf | 28 | 85.1 |
| tt | 32 | 75.9 |
关键结论 1:tf 最快
mpmc_queue<tf>
tsc/op: 28
push/pop per sec: 85,112,818
原因:
- push 是 lazy → 写线程减少 index 原子操作
- pop 非 lazy → 读线程及时推进 read index
- 减少了 cache line ping-pong
在 1-1 场景下,写入路径更关键
关键结论 2:lazy ≠ 永远更快
你可以看到:
tt比tf慢- 因为:
- pop 也 lazy → read index 推进不及时
- 导致生产者看到“队列满”的概率上升
- 增加了失败 CAS 重试
四、2 Producer / 2 Consumer(2-2)性能分析
命令:
./q_bandwidth -W4 -p2 -c2 -d 1024
汇总表
| 模式 | tsc/op | 吞吐量(M/s) |
|---|---|---|
| ff | 206 | 11.84 |
| ft | 292 | 8.37 |
| tf | 296 | 8.24 |
| tt | 260 | 9.38 |
关键结论 3:并发上来后,性能骤降是正常现象
对比 1-1:
1-1 : ~85 M ops/s
2-2 : ~12 M ops/s
原因:
- CAS 冲突暴涨
- 多核 cache line 抢占
_write_index/_read_index成为热点- entry 上的 DWCAS 失败重试增多
关键结论 4:2-2 场景下 ff 反而最好
原因非常重要:
高并发下,确定性 > 延迟更新
- 非 lazy:
- index 推进明确
- 减少“误判 full / empty”
- lazy:
- 在多生产者、多消费者下
- 放大了状态不一致窗口
这是并发算法中的经典现象
五、4 Producer / 4 Consumer(4-4)初步结论
mpmc_queue<ff>
push/pop per sec: 7.3 M
tsc/op: 302
说明:
- CAS 竞争已经成为主瓶颈
- 系统进入 扩展性极限区
- 性能下降不是算法错误,而是:
- 内存一致性物理极限
六、从带宽角度总结(非常重要)
1⃣ 单生产者 / 单消费者
- 极限性能
- 接近 L1/L2 cache 带宽
- lazy push 非常有效
2⃣ 多生产者 / 多消费者
- 性能主要由:
- CAS 失败率
- cache line bouncing
- index 原子操作次数
- lazy 需要谨慎
3⃣ 数据宽度只有 4B 的意义
- 测到的是:
- 控制路径性能
- 而不是数据拷贝性能
- 非常适合验证:
- lock-free 结构设计优劣
七、一句话总结(工程视角)
这个 MPMC Queue 在 1-1 场景下已接近硬件极限,在多核竞争下行为稳定、无错误、无重排,是一个工程级别非常扎实的实现。
一、Derived work 是什么意思?
**Derived work(派生工作)**指的是:
在同一个 Lock-free MPMC 队列核心算法不变的前提下,
通过 组合、约束、扩展 entry 语义,得到新的并发通信模型。
核心不变的东西:
- 环形数组
- per-entry
sequence+data_flag - CAS / DWCAS
- 无锁、无阻塞、无 ABA
变化的只是 “entry 里装什么、怎么用”
二、Q Pack —— Multiple Queues(多队列打包)
1⃣ 概念
Q Pack = 多个 MPMC Queue 并行使用
不是一个大队列,而是:
Q0 Q1 Q2 Q3 ... Qk
写入策略(示例)
- Producer 根据:
- CPU id
- thread id
- hash(key)
- 选择一个子队列写入
2⃣ 为什么能提升带宽?
原因一:减少热点
单队列的瓶颈是:
_write_index_read_index- entry cache line 竞争
Q Pack 把竞争分散:
Effective Contention ≈ 1 Queue Count \text{Effective Contention} \approx \frac{1}{\text{Queue Count}} Effective Contention≈Queue Count1
原因二:cache locality 更好
- 每个队列:
- 更可能被固定线程访问
- cache line bounce 显著下降
3⃣ 代价:不保证全局顺序
Q0: A, C
Q1: B, D
读取顺序可能是:
A → B → C → D
或
B → A → D → C
只保证单队列 FIFO,不保证全局 FIFO
4⃣ 适用场景
日志
telemetry
消息聚合
网络包分发
事务
强顺序系统
三、Unique Pointer —— 通过队列转移所有权
1⃣ 核心思想
队列不传数据本身,而是传 所有权
典型类型:
std::unique_ptr<T>
2⃣ 为什么 lock-free 队列特别适合?
- push:
- move 所有权
- pop:
- 独占数据
- 无共享 → 无引用计数 → 无额外原子操作
3⃣ 代码示例(加注释)
// 生产者:把所有权放进队列
std::unique_ptr<Item> p = std::make_unique<Item>(42);
queue.push(std::move(p));
// ↑ p 变为空,所有权进入 queue
// 消费者:拿走所有权
std::unique_ptr<Item> out;
if (queue.pop(out)) {
// out 独占 Item
}
并发语义
- 所有权流动是 单向、不可复制
- 天然避免 data race
4⃣ 适用场景
内存池
对象生命周期复杂
高性能 pipeline
四、Shared Memory —— 进程间通信(IPC)
1⃣ 为什么它能用于 IPC?
这个 MPMC Queue:
- 不依赖 TLS
- 不依赖地址相关指针
- 所有状态是:
- index
- sequence
- data
只要放在:
shm / mmap / hugepage
2⃣ 必须满足的条件
关键限制
- value_type 必须是 POD / trivially copyable
- 禁止:
- 裸指针
- 虚表
- 进程私有地址
3⃣ 示例布局
Shared Memory:
+-------------------+
| queue metadata |
| ring buffer |
| entries[] |
+-------------------+
4⃣ 并发保证
- CAS 是 CPU 指令
- 跨进程共享 cache coherence
- 无 syscalls
五、Replace items in queue —— exchange()
1⃣ 功能目标
在 item 尚未被 pop 之前,原子替换其值
用于:
- conflation(只保留最新值)
- 状态更新
- 配置刷新
2⃣ 代码(逐行注释,不拆开)
bool exchange(index_type& i,
value_type old_value,
value_type new_value) noexcept
{
// 构造“期望中的旧 entry”
// seq = (i << 1) | 1 → 表示:
// - 属于 index i
// - data_flag = 1(full 状态)
entry old_entry{
static_cast<index_type>((i << 1) | 1U),
old_value
};
// 构造“要替换成的新 entry”
// seq 完全相同,只换 data
entry new_entry{
static_cast<index_type>((i << 1) | 1U),
new_value
};
// 原子 CAS:
// 只有当 entry 仍然是 (seq, old_value)
// 才会替换为 (seq, new_value)
return _array[i].compare_exchange(old_entry, new_entry);
}
3⃣ 并发语义
- 若返回
true:- 没有 reader 读走数据
- 若返回
false:- 要么被 pop
- 要么被其他 writer 修改
保证
要么读到旧值,要么读到新值,绝不会是中间态
六、Multiple Read of Same Data(TBI)
1⃣ TBI 是什么?
To Be Implemented
这里的目标是:
- 同一条数据
- 被多个 reader 消费
2⃣ 为什么现在不支持?
当前模型:
pop()会:data_flag = 0seq += N
- entry 被立即释放
这是 单消费者语义
3⃣ 要支持多读需要什么?
至少需要:
- reader counter
- 或 epoch / hazard pointer
- 或 RCU 风格机制
数学上:
entry 可释放 ⟺ readers = 0 \text{entry 可释放} \iff \text{readers} = 0 entry 可释放⟺readers=0
4⃣ 为什么复杂?
- 引入共享生命周期
- 增加原子操作
- 破坏极简 lock-free 路径
所以被明确标为 TBI
七、整体总结(一张脑图式结论)
MPMC Queue Core
├── Q Pack → 吞吐优先
├── unique_ptr → 所有权安全
├── shared memory → IPC
├── exchange() → conflation
└── multi-read (TBI) → 复杂并发扩展
一句话工程总结
这个 MPMC Queue 的核心价值在于:
用最少的原子操作,构建了一块可以被无限“组合”的并发积木。
Lockfree, MPMC Queue — 总结(工程视角)
这是一个为极低延迟、高并发、强确定性场景设计的队列
不是“通用容器”,而是“并发基础设施”
1⃣ Header only queue
含义
- 所有实现都在
.hpp - 无
.cpp、无链接期依赖
工程价值
- 可 inline
- 可跨 TU 使用
- 便于:
- 嵌入式
- 内核态 / 用户态共享
- IPC 共享内存
2⃣ Low latency queue(低延迟)
核心原因
- 只用原子指令
- 无系统调用
即:
no mutex
no futex
no syscall
延迟来源仅剩:
Latency ≈ CAS + Cache Miss + Fence \text{Latency} \approx \text{CAS} + \text{Cache Miss} + \text{Fence} Latency≈CAS+Cache Miss+Fence
延迟可预测、可建模
3⃣ Lock-free —— 不与调度器交互
lock-free 的真实含义
系统整体保证前进(system-wide progress)
- 某个线程可能失败
- 但总有线程成功
为什么“不与调度器交互”很重要?
- 无
sleep - 无
yield - 无 priority inversion
适合:
实时系统
高频交易
中断线程
NUMA 场景
4⃣ Collaborative —— 生产者 / 消费者协作式
非传统“对抗式”并发
这个队列的核心是:
生产者与消费者共同推进状态
- producer 推进
write_index - consumer 推进
read_index - entry 的
sequence由双方共同维护
数学不变式(核心)
entry 可写 ⟺ s e q = i n d e x \text{entry 可写} \iff seq = index entry 可写⟺seq=index
entry 可读 ⟺ s e q = i n d e x + 1 \text{entry 可读} \iff seq = index + 1 entry 可读⟺seq=index+1
没有“锁谁谁等待”的概念
5⃣ Bounded —— 有界队列
含义
- 固定容量
- 初始化时一次性分配
mpmc_queue<T, Capacity> q;
好处
- 无
new - 无
delete - 无 allocator 抖动
工程代价
- 容量必须提前估算
- 满队列时 push 失败(或自旋)
6⃣ Multi Producer / Multi Consumer
支持任意组合
1P-1C
1P-NC
NP-1C
NP-NC
并发安全来自:
- 原子
write_index - 原子
read_index - 每个 entry 的
sequence
7⃣ Limited data size(≤ 12 bytes)
为什么限制数据大小?
因为 entry 通常设计为:
struct entry {
uint32_t seq_and_flag; // seq << 1 | data_flag
value_type data; // ≤ 12 bytes
};
目标是:
- 一个 entry ≤ 16 或 32 字节
- 恰好落在 cache line 内
性能动机
Entry Size ≤ Cache Line ⇒ No false sharing \text{Entry Size} \le \text{Cache Line} \Rightarrow \text{No false sharing} Entry Size≤Cache Line⇒No false sharing
8⃣ Ownership transfer —— unique_ptr<T>
设计理念
队列不是“共享数据”,而是“传递所有权”
示例代码(带注释)
// Producer
std::unique_ptr<Item> p = std::make_unique<Item>(123);
queue.push(std::move(p));
// ↑ p 被清空,队列成为唯一所有者
// Consumer
std::unique_ptr<Item> out;
queue.pop(out);
// ↑ out 独占 Item,无需加锁
并发优势
- 无引用计数
- 无 shared state
- 生命周期清晰
9⃣ Between threads or processes(线程 / 进程间)
为什么支持 IPC?
- 不依赖 TLS
- 不依赖进程私有指针
- 只使用:
- 原子整数
- POD 数据
IPC 使用条件
value_type 是 trivially copyable
不含指针
不含虚表
为什么“我们需要这个队列”
总结成一句话:
当你需要:确定性延迟 + 极高并发 + 零系统调用 + 强内存控制
典型使用场景
高频交易
网络 IO pipeline
实时日志
Telemetry
音视频处理
共享内存 IPC
最终工程总结(高度凝练)
这是一个:
- 为性能而生
- 为并发正确性而设计
- 为系统级使用而约束
的 MPMC 队列
更多推荐


所有评论(0)