RCU 读写用例深度解析

一、数据结构与全局变量

// 被 RCU 保护的配置结构体
struct myconfig {
    int a;   // 配置项 A
    int b;   // 配置项 B
    // 注意:a 和 b 必须作为一个整体原子更新
    // 这正是 RCU 要解决的核心问题:
    // 读者要么看到旧的 {a,b},要么看到新的 {a,b}
    // 不能看到 {新a, 旧b} 这种中间状态
};
// 全局原子指针,指向当前配置
// memory_order_consume 配合 load 使用(见下文)
std::atomic<myconfig*> curconfig;  // 初始化略

二、读者实现逐步解析

2.1 完整读者代码

// 读者:获取当前配置的 a 和 b
// 保证:即使写者并发更新,读到的 {*cur_a, *cur_b} 一定来自同一个版本
void get(int* cur_a, int* cur_b) {
    struct myconfig* mcp;
    // ① 进入 RCU 读临界区
    // std::rcu_default_domain() 返回默认 RCU domain
    // scoped_lock 析构时自动退出临界区(RAII)
    // 对应 Linux 的 rcu_read_lock() / rcu_read_unlock()
    std::scoped_lock rlock(std::rcu_default_domain());
    // ② 读取指针(带 consume 语义)
    // memory_order_consume:
    //   - 比 acquire 更轻量
    //   - 只保证"依赖于 mcp 的读操作"不被重排到此 load 之前
    //   - *cur_a = mcp->a 依赖 mcp,所以保证 a 和 b 的读在 load 之后
    mcp = curconfig.load(std::memory_order_consume);
    // ③ 读取配置字段
    // 这两行都"依赖"mcp(通过指针解引用)
    // consume 语义保证:写者对 mcp->a 和 mcp->b 的写
    // happens-before 这里的读
    *cur_a = mcp->a;   // 依赖链:mcp → mcp->a
    *cur_b = mcp->b;   // 依赖链:mcp → mcp->b
    // ④ scoped_lock 析构 → 自动退出 RCU 临界区
    // 退出后 mcp 指针不再受保护,不能再使用
}

2.2 逐帧理解并发场景

幻灯片展示了写者在读者执行过程中更新配置的场景,用行号标注并发时序:

时间轴(行号对应代码执行位置):
读者线程                        写者线程
─────────────────────────────────────────────────────
rcu_read_lock()
mcp = load(curconfig)          // mcp → 旧配置{a=37, b=46}
*cur_a = mcp->a  → 37
                                mcp_new->a = 99
                                mcp_new->b = 88
                                curconfig.exchange(mcp_new)
                                // 此刻指针已切换!
*cur_b = mcp->b  → 46          // 读者仍持有旧 mcp,不受影响
rcu_read_unlock()
                                rcu_synchronize()  // 等读者退出
                                delete mcp_旧      // 现在安全了
─────────────────────────────────────────────────────
读者得到:{37, 46}  ← 来自同一个版本,一致!

关键结论:尽管写者在读者执行过程中替换了指针,读者仍然读到了一致的 {37, 46},因为:
读者持有旧 mcp 指针 → RCU 保证 旧对象在临界区内不会被 delete \text{读者持有旧 mcp 指针} \xrightarrow{\text{RCU 保证}} \text{旧对象在临界区内不会被 delete} 读者持有旧 mcp 指针RCU 保证 旧对象在临界区内不会被 delete

2.3 memory_order_consume 的依赖链

consume 保证的是"数据依赖链"上的顺序:
  mcp = load(consume)
    │
    ├──► mcp->a   ← 依赖 mcp(指针解引用)√ 受保护
    ├──► mcp->b   ← 依赖 mcp(指针解引用)√ 受保护
    └──► mcp->a + 1  ← 依赖 mcp->a,传递依赖  √ 受保护
  不依赖 mcp 的操作不受 consume 约束:
    int x = global_var;  ← 与 mcp 无关,可能被重排

consume ⊂ acquire (consume 是 acquire 的子集,开销更低) \text{consume} \subset \text{acquire} \quad \text{(consume 是 acquire 的子集,开销更低)} consumeacquireconsume  acquire 的子集,开销更低)
实际上,由于 consume 语义在编译器中难以正确实现,目前所有主流编译器都将 consume 提升为 acquire

// 编译器实际生成等价于:
mcp = curconfig.load(std::memory_order_acquire);
// 在 x86 上,acquire load = 普通 mov 指令,无额外开销
// 在 ARM 上,acquire load = ldar 指令

三、写者实现逐步解析

3.1 完整写者代码

// 写者:原子地更新配置
// 保证:读者要么看到完全旧的配置,要么看到完全新的配置
// 不存在"读到一半"的中间状态
void set(int cur_a, int cur_b) {
    // ① Copy:分配新配置对象
    struct myconfig* mcp = new myconfig();
    // ② Update:填写新配置的所有字段
    // 这些写操作在指针发布之前完成
    // release 语义保证它们对后续读者可见
    mcp->a = cur_a;
    mcp->b = cur_b;
    // ③ 原子替换指针(Publish)
    // exchange 返回旧指针,同时将新指针写入 curconfig
    // release 语义:保证 ①② 的写 happens-before 任何
    //              通过 consume/acquire 读到新指针的线程
    mcp = curconfig.exchange(mcp, std::memory_order_release);
    //     ↑ 此时 mcp 持有旧指针
    //     新指针已对所有新读者可见
    // ④ 等待宽限期(Grace Period)
    // 阻塞直到所有"在 exchange 之前进入临界区的读者"都退出
    // 此后没有任何线程持有旧指针
    rcu_synchronize();
    // 注:C++26 提案中为 std::rcu_default_domain().synchronize()
    // ⑤ 安全释放旧配置
    // 此时可以确保没有读者在访问旧 mcp
    delete mcp;
}

3.2 "一次写操作"的语义

幻灯片强调 set 函数代表一个原子写操作,理解如下:

从外部观察者角度,set(99, 88) 是原子的:
  任何读者只能观察到两种状态之一:
    状态 A(旧):{a=37, b=46}
    状态 B(新):{a=99, b=88}
  不可能观察到:
    {a=99, b=46}  ← 新a旧b(不可能)
    {a=37, b=88}  ← 旧a新b(不可能)
为什么?
  写者先把完整的新对象准备好(a和b都写完)
  然后才用一个原子操作 exchange 替换指针
  读者要么拿到旧指针(看到完整旧值)
  要么拿到新指针(看到完整新值)
  没有中间状态

原子性来源 = 指针的原子替换 ⏟ exchange + 新对象预先完整初始化 ⏟ 先写字段再发布 \text{原子性来源} = \underbrace{\text{指针的原子替换}}_{\text{exchange}} + \underbrace{\text{新对象预先完整初始化}}_{\text{先写字段再发布}} 原子性来源=exchange 指针的原子替换+先写字段再发布 新对象预先完整初始化

3.3 内存序保证的完整链条

写者:
  mcp->a = cur_a              } 普通写
  mcp->b = cur_b              }
       ↓ happens-before(sequenced-before)
  curconfig.exchange(release) ← Release 屏障
                    同步点:读者 load 读到新指针
                    ↓ synchronizes-with
读者:
  curconfig.load(consume)     ← Consume 屏障
       ↓ happens-before(依赖链)
  *cur_a = mcp->a             } 保证读到写者写的值
  *cur_b = mcp->b             }

写  a , b ⏟ 普通写 → s b e x c h a n g e ( r e l e a s e ) ⏟ 发布 → s w l o a d ( c o n s u m e ) ⏟ 订阅 → h b 读  a , b ⏟ 依赖链 \underbrace{写\ a,b}_{\text{普通写}} \xrightarrow{sb} \underbrace{exchange(release)}_{\text{发布}} \xrightarrow{sw} \underbrace{load(consume)}_{\text{订阅}} \xrightarrow{hb} \underbrace{读\ a,b}_{\text{依赖链}} 普通写  a,bsb 发布 exchange(release)sw 订阅 load(consume)hb 依赖链  a,b

四、完整可运行示例(C++26 风格)

// C++26 std::rcu 提案接口示例
// 编译:g++ -std=c++26 -O2 -lrcu main.cpp
// 当前可用:用 liburcu 替代,接口类似
#include <atomic>
#include <rcu>          // C++26 提案头文件
#include <mutex>
#include <thread>
#include <iostream>
#include <vector>
struct myconfig {
    int a;
    int b;
};
// 全局 RCU 保护指针
std::atomic<myconfig*> curconfig{new myconfig{37, 46}};
// ── 读者 ────────────────────────────────────────────────
void get(int* cur_a, int* cur_b) {
    myconfig* mcp;
    {
        // RAII:构造时 rcu_read_lock,析构时 rcu_read_unlock
        std::scoped_lock rlock(std::rcu_default_domain());
        // consume:只建立数据依赖链,比 acquire 开销更低
        // 实际编译器提升为 acquire,但语义上表达"我只依赖这个指针"
        mcp = curconfig.load(std::memory_order_consume);
        // 在临界区内读,mcp 不会被 delete
        *cur_a = mcp->a;
        *cur_b = mcp->b;
    } // ← 自动 rcu_read_unlock
}
// ── 写者 ────────────────────────────────────────────────
void set(int new_a, int new_b) {
    // Step 1: 分配并初始化新配置(临界区外,不影响读者)
    myconfig* mcp = new myconfig{new_a, new_b};
    // Step 2: 原子发布新指针,拿回旧指针
    // release 保证新配置的写对后续读者完全可见
    mcp = curconfig.exchange(mcp, std::memory_order_release);
    // Step 3: 等待所有持有旧指针的读者退出
    std::rcu_default_domain().synchronize();
    // Step 4: 安全释放(或用 retire 异步释放)
    delete mcp;
}
// ── 验证一致性的测试 ────────────────────────────────────
int main() {
    std::atomic<bool> stop{false};
    std::atomic<int>  inconsistent{0};  // 不一致计数
    // 4 个读者线程
    std::vector<std::thread> readers;
    for (int i = 0; i < 4; ++i) {
        readers.emplace_back([&] {
            while (!stop.load(std::memory_order_relaxed)) {
                int a, b;
                get(&a, &b);
                // 验证:合法的值对只有 {37,46} 或 {99,88}
                bool valid = (a == 37 && b == 46)
                          || (a == 99 && b == 88);
                if (!valid) {
                    // 读到了不一致的中间状态!
                    ++inconsistent;
                    std::cerr << "不一致!a=" << a << " b=" << b << "\n";
                }
            }
        });
    }
    // 1 个写者线程
    std::thread writer([&] {
        for (int i = 0; i < 100; ++i) {
            // 交替写两个版本
            if (i % 2 == 0) set(99, 88);
            else             set(37, 46);
        }
        stop.store(true);
    });
    writer.join();
    for (auto& t : readers) t.join();
    std::cout << "不一致次数:" << inconsistent.load()
              << "(应为 0)\n";
    delete curconfig.load();
    return 0;
}

五、与 Mutex 方案的对比

// ── 对比:用 Mutex 实现同样功能 ────────────────────────
struct myconfig_mutex {
    int a, b;
    std::shared_mutex mu;  // 读写锁
};
myconfig_mutex g_cfg{37, 46};
// 读者:需要加锁(前文已证明读多时性能极差)
void get_mutex(int* a, int* b) {
    std::shared_lock lk(g_cfg.mu);   // 原子 RMW → Cache 抖动
    *a = g_cfg.a;
    *b = g_cfg.b;
}
// 写者:独占锁,阻塞所有读者
void set_mutex(int a, int b) {
    std::unique_lock lk(g_cfg.mu);   // 等所有读者退出
    g_cfg.a = a;
    g_cfg.b = b;
}

对比项 Mutex/RWLock RCU
读者加锁代价 原子 RMW( ≈ 140 ns \approx 140\text{ns} 140ns 无( ≈ 1 ns \approx 1\text{ns} 1ns
读者扩展性 O ( 1 / N ) O(1/N) O(1/N)(越多越慢) O ( N ) O(N) O(N)(线性扩展)
一致性保证 临界区内强一致 同一版本内强一致
写者代价 低(等读者退出即可) 高(宽限期可能较长)
实现复杂度

RCU 的本质:用写者的等待换取读者的零开销 适用条件:读多写少,且能接受读到"稍旧"的数据 \boxed{ \text{RCU 的本质:用写者的等待换取读者的零开销}\\ \text{适用条件:读多写少,且能接受读到"稍旧"的数据} } RCU 的本质:用写者的等待换取读者的零开销适用条件:读多写少,且能接受读到"稍旧"的数据

RCU 语义(Redux 版本)

1⃣ RCU 语义的特点

RCU(Read-Copy-Update)语义相对于传统的读写锁来说有所弱化:

  1. 写者(Writer)仅在释放资源前等待读者完成
    • 与传统写锁不同,写者不会阻塞其他操作,它只在释放旧对象(或指针切换)前等待已经存在的读者结束。
    • 理解:写者延迟删除,而不是阻塞整个操作
  2. 读者(Reader)仍然会等待写者持有者
    • 也就是说,如果有写者正在更新指针或资源,新的读者会被延迟,保证不会读取正在被修改的数据。
    • 理解:读者只在必要时阻塞,保持数据一致性

RCU 用“弱化的时间语义(temporal semantics)” + “空间语义(spatial semantics)”来补偿传统读写锁的严格一致性要求。

2⃣ RCU 的时间语义限制

RCU 中 synchronize_rcu() 的规则:

如果 synchronize_rcu() 无法证明它在某个 rcu_read_lock() 之前开始,它必须等待,直到该读者调用 rcu_read_unlock() 完成之后才返回。

  • 理解:
    • synchronize_rcu() 保证在旧读者完成之前不会删除资源
    • 这样就不会出现读者访问已经被写者释放的数据的情况

3⃣ RCU 语义总结表


传统读写锁 RCU 语义 理解
写者锁定整个资源 写者只在释放前等待旧读者 写操作延迟删除,减少阻塞
读者读取阻塞写者 读者可能被短暂阻塞,确保不会读取正在删除的资源 读操作高并发、低延迟
一致性严格 一致性可能是“弱时间+空间保证” 可以读取旧值,只要逻辑一致即可

4⃣ 核心 API 使用示例(C++ 风格伪代码)

#include <rcu.hpp> // 假设存在 C++ 封装
rcu_domain mydomain;
// 读者
void reader() {
    rcu_domain::lock(mydomain);  // rcu_read_lock()
    int x = ptr->value;          // 读取受 RCU 保护的数据
    rcu_domain::unlock(mydomain);// rcu_read_unlock()
}
// 写者
void writer(Config* new_cfg) {
    Config* old_cfg = cfg_ptr.exchange(new_cfg, std::memory_order_release); // 更新指针
    synchronize_rcu();         // 等待旧读者完成
    old_cfg->retire();         // 安全释放旧数据
}

注释说明

  1. 读者操作几乎无阻塞,允许高并发读取。
  2. 写者操作只在释放资源时等待旧读者完成,保证不会破坏读者视图。
  3. RCU 的核心是 延迟删除 + 安全同步,而不是强制锁整个资源。

5⃣ 可视化理解

可以把 RCU 的时间语义理解为:

Reader 1  |=======|
Reader 2      |=======|
Writer       |---更新---| (只在释放前等待读者完成)
Reader 3           |=======|
  • 写者不会阻塞前面或后面的读者,只在删除旧对象时“排队”等待所有旧读者完成。
  • 因此读操作非常轻量,写操作也不会造成全局阻塞。

RCU 时间语义

Time rcu_read_lock() rcu_read_unlock() Remove synchronize_rcu() [return] Free Old Memory Remove synchronize_rcu() ::: ::: ::: [return] Free Old Memory rcu_read_lock() rcu_read_unlock()

1⃣ 图像总体说明

这张图展示了 RCU(Read-Copy-Update)核心时间语义和操作流程,特别强调了:

  • 读者(Reader):并发访问共享数据,几乎无阻塞
  • 写者/更新者(Updater):修改数据,但删除旧数据需等待读者完成
  • synchronize_rcu():写者等待所有旧读者完成临界区后再安全释放旧对象
    时间轴从左向右或从上到下(图中竖线表示时间方向):
  • 左侧竖线:时间轴
  • 水平虚线:读者临界区或同步点
  • 箭头/矩形:读者和写者操作
    RCU 的核心目标是 提供高并发读访问,同时保证写者安全释放旧数据

2⃣ 核心元素解析

2.1 时间轴

  • 蓝色竖线 + 箭头表示时间方向
  • “Time” 标签标注
  • 读者和写者的操作沿此方向展开
  • 可以理解为 事件发生的顺序

2.2 读者操作(Reader Box)

  • rcu_read_lock():进入读者临界区
  • rcu_read_unlock():退出临界区
  • 读者可以并发访问,不阻塞其他读者
  • 写者需要在删除旧数据前等待这些读者完成

理解:RCU 的读操作几乎无锁,非常轻量,适合高频访问场景。

2.3 写者操作(Updater Box)

  • Remove:标记删除操作开始
  • synchronize_rcu():阻塞等待,直到所有旧读者退出临界区
  • [return]:同步完成返回
  • Free Old Memory:安全释放旧对象

理解:写者不会立即删除旧数据,而是通过 synchronize_rcu() 延迟删除,确保读者安全。

2.4 时间虚线和阻塞点

  • 白色虚线:表示操作完成点或同步等待点
  • 红色圆圈 + 叉号:表示潜在错误或操作冲突(例如未遵循 RCU 语义导致的未定义行为)

理解:RCU 不会阻塞大部分读操作,但写者必须等待旧读者完成,否则可能出现访问已释放内存的危险。

3⃣ RCU 时间语义总结

  1. 读者临界区
    • rcu_read_lock() / rcu_read_unlock()
    • 高并发读,无阻塞
  2. 写者更新
    • 更新数据指针不阻塞读者
    • 调用 synchronize_rcu() 等待旧读者完成
    • 删除旧数据(Free Old Memory)
  3. RCU 核心约束
    • 写者等待旧读者完成
    • 读者不等待写者开始,只阻塞写者释放旧对象
    • 确保读者看到的数据一致且安全

4⃣ 核心 C++ 使用示例

// 读者
rcu_read_lock();
int val = cfg_ptr->value; // 安全读取共享数据
rcu_read_unlock();
// 写者
Config* new_cfg = new Config(...);
Config* old_cfg = cfg_ptr.exchange(new_cfg, std::memory_order_release);
synchronize_rcu(); // 等待旧读者完成
delete old_cfg;    // 安全释放旧对象
  • cfg_ptr 是 RCU 保护的指针
  • 旧读者完成后再删除旧对象,保证安全

✓ 总结

  1. RCU 提供轻量级读者-写者语义
  2. 高并发读者 → 几乎无阻塞
  3. 写者删除延迟 → 安全且高效
  4. synchronize_rcu() 是关键同步点,保证时间语义正确
    RCU Semantics (Exercise The Code)

RCU 语义:空间-时间图解深度解析

TIME STEP 0 初始状态:curconfig 指向旧配置 {37, 46} 旧配置对象 37, 46 curconfig 内存中只有一个配置对象,指针指向它。 读者通过 curconfig 读取 a=37, b=46。 STEP 1 左读者进入临界区,load 旧指针 → mcp={37,46} 旧配置对象 37, 46 curconfig // 左读者(早期) { rlock mcp = load(consume); ◀ *cur_a = mcp->a; *cur_b = mcp->b; } mcp 指向旧配置 关键: rcu_read_lock() 标记进入临界区, consume 建立依赖链,保证后续读不重排。 STEP 2 读者读 a=37,写者同时 Copy 新对象 旧配置对象 37, 46 新配置对象 39, 44 构建中... curconfig // 左读者 { rlock mcp = load(consume); *cur_a = mcp->a; ◀ (37) *cur_b = mcp->b; } // 写者 mcp = new myconfig(); ◀ mcp->a = 39; mcp->b = 44; ◀ curconfig.exchange(...); rcu_synchronize(); 读者在自己的 mcp(旧指针)上独立读取, 写者同时在私有内存上构建新对象,互不干扰。 ⚡ 写者构建新配置时不阻塞任何读者! STEP 3 写者 exchange:指针原子切换到新配置 旧配置对象 37, 46 (读者仍持有) 新配置对象 39, 44 curconfig ⚡ curconfig.exchange(new_mcp) // 左读者(仍在临界区中!) { rlock mcp = load(); → {37,46} *cur_a = mcp→a; (37) ✓ *cur_b = mcp→b; ◀ 仍读旧 } // 写者 mcp = new myconfig(); mcp→a=39; mcp→b=44; mcp=curconfig.exchange(mcp); ◀ rcu_synchronize(); delete mcp; exchange 是原子操作(release 语义): • 新读者立刻看到新配置 {39,44} • 旧读者的 mcp 不受影响,继续读旧配置 • 两块内存同时存在,RCU 保证旧块不被提前 delete! STEP 4 两个读者并发:左读旧 b=46,右读新配置 旧配置对象 37, 46 新配置对象 39, 44 curconfig // 左读者(临界区末尾) { rlock mcp = load(); → {37,46} *cur_a = mcp→a; ✓ 37 *cur_b = mcp→b; ◀ 46 ✓ } ← 退出临界区 // 右读者(晚期,exchange 后进入) { rlock mcp = load(); → {39,44} ◀ *cur_a = mcp→a; ◀ 39 ✓ *cur_b = mcp→b; ◀ 44 ✓ } 一致性保证: 左读者:{37, 46} ── 完整旧版本 ✓ 右读者:{39, 44} ── 完整新版本 ✓ 不可能出现:{37, 44} 或 {39, 46} 的混合状态! STEP 5 Grace Period:写者在 rcu_synchronize() 阻塞等待 旧配置(受保护) 37, 46 🔒 不能释放 新配置对象 39, 44 curconfig ⏳ Grace Period(宽限期) ⏳ // 左读者(已退出临界区) { rlock ... *cur_a=37 *cur_b=46 ✓ } ✅ 已退出 // 写者 curconfig.exchange(mcp_new); rcu_synchronize(); ◀ 阻塞中... delete mcp; // 右读者 { rlock ... {39,44} } ← 仍在临界区 rcu_synchronize() 等待的条件: 在 exchange 之前进入临界区的读者 全部退出 (右读者是 exchange 之后进入的,不计入等待条件) 旧配置 {37,46} 在宽限期内绝对不会被释放! Grace Period STEP 6 宽限期结束,delete 旧配置,内存安全回收 旧配置对象 37, 46 💥 DELETED 新配置对象 39, 44 curconfig // 写者 curconfig.exchange(mcp_new); rcu_synchronize(); ✅ 返回 delete mcp; ◀ 安全! // 更新完成,无悬挂指针 为什么此时 delete 是安全的? rcu_synchronize() 返回时,保证: ✓ 所有在 exchange 前进入临界区的读者已退出 ✓ 没有任何线程的 mcp 指向旧配置 ✓ 新读者只持有新指针,不受影响 ∴ delete 旧对象,零 UAF(Use-After-Free)风险! 总结 内存序因果链 & RCU 核心不变式 写者 mcp→a=39 mcp→b=44 (普通写) sb exchange (release) 发布新指针 sw load (consume) 读者订阅 hb 读者 mcp→a (依赖链) mcp→b (依赖链) 保证读到 39 和 44 RCU 三大核心保证: ① 读者零开销: 读者不修改任何共享状态,无 Cache 抖动,线性扩展 ② 一致性快照: 每个读者看到完整的某一版本,不存在{新a, 旧b}的中间态 ③ 安全回收: 宽限期后 delete,保证无任何读者持有旧指针,零 UAF 空间-时间语义由 Jonathan Walpole, Josh Triplett, Phil Howard 首次阐明

一、整体场景设定

这组图展示了 RCU 最核心的语义:在写者更新期间,不同时刻进入的读者各自看到一致的快照

// 全局被保护结构
struct myconfig { int a; int b; };
std::atomic<myconfig*> curconfig;  // 初始指向 {37, 46}

三个并发角色:

左侧读者(早):在写者替换之前进入临界区
中间写者      :替换配置 {37,46} → {39,44}
右侧读者(晚):在写者替换之后进入临界区

二、读者代码逐行解析

void get(int* cur_a, int* cur_b) {
    struct myconfig* mcp;
    // ① 进入 RCU 读临界区
    // scoped_lock 析构时自动调用 rcu_read_unlock
    // 对应 Linux 内核的 rcu_read_lock()
    // 开销极低:x86 上仅一条 memory barrier 或完全无指令
    std::scoped_lock rlock(std::rcu_default_domain());
    // ② 带 consume 语义的指针读取
    // memory_order_consume 建立数据依赖链:
    //   凡是"依赖 mcp"的后续读(mcp->a, mcp->b)
    //   都保证在此 load 之后发生,不会被重排到前面
    // 注:当前编译器普遍将 consume 提升为 acquire
    mcp = curconfig.load(std::memory_order_consume);
    // ③ 通过依赖链安全读取字段
    // consume 保证:写者对这两个字段的写
    //              happens-before 此处的读
    *cur_a = mcp->a;   // 依赖链:mcp ──► mcp->a
    *cur_b = mcp->b;   // 依赖链:mcp ──► mcp->b
    // ④ scoped_lock 析构 → 自动退出临界区
    // 退出后写者的 rcu_synchronize() 才可能返回
}

三、写者代码逐行解析

void set(int cur_a, int cur_b) {
    // ① Copy:在堆上分配全新的配置对象
    // 此时旧配置仍然有效,读者可以安全读旧配置
    // 写者在私有内存上操作,不影响任何读者
    struct myconfig* mcp = new myconfig();
    // ② Update:初始化新配置的所有字段
    // 在指针发布之前完成,保证读者看到完整的新对象
    mcp->a = cur_a;   // 新值 39
    mcp->b = cur_b;   // 新值 44
    // ③ Publish:原子替换指针
    // exchange 的 release 语义保证:
    //   上面的写(mcp->a=39, mcp->b=44)
    //   happens-before 任何通过 consume/acquire 读到新指针的线程
    // exchange 返回旧指针,mcp 现在指向旧配置 {37,46}
    mcp = curconfig.exchange(mcp, std::memory_order_release);
    // ④ 等待宽限期(Grace Period)
    // 阻塞直到:所有"在 exchange 之前进入临界区的读者"全部退出
    // 即:所有持有旧指针 {37,46} 的读者都已完成并退出
    // 此后新读者只会读到新指针 {39,44}
    rcu_synchronize();
    // ⑤ 安全释放旧配置
    // 宽限期结束 = 没有任何线程持有旧指针
    // 此时 delete 是完全安全的
    delete mcp;   // 释放旧的 {37,46}
}

四、空间-时间图解析

最后一张 SVG 是最重要的:用坐标轴表示 RCU 的时空语义。

纵轴(Time)↑ = 时间流逝
横轴         = 地址空间(两块内存:旧配置区 / 新配置区)
─────────────────────────────────────────────────────────────────
时间线:
  早期读者(readers)          │ 地址空间:
  ─────────────────────────  │
  { rlock                    │  [37,46]  curconfig ──► [37,46]
  mcp = load();              │
  *cur_a = mcp->a; (37)      │
  *cur_b = mcp->b; (46)      │
  }                          │
        ↓                    │
══════ Grace Period ══════   │  写者替换:curconfig ──► [39,44]
  rcu_synchronize()          │  [37,46] 仍存在,等宽限期
  (等待上方读者退出)         │
        ↓                    │
  delete mcp ({37,46})       │  [37,46] 现在安全释放
════════════════════════════ │
  晚期读者(readers)          │
  { rlock                    │  curconfig ──► [39,44]
  mcp = load();              │
  *cur_a = mcp->a; (39)      │
  *cur_b = mcp->b; (44)      │
  }                          │
─────────────────────────────────────────────────────────────────

三条关键分界线
早期读者区 ⏟ 看到  { 37 , 46 } ∥ 宽限期 GP ⏟ rcu_synchronize() ∥ 晚期读者区 ⏟ 看到  { 39 , 44 } \underbrace{\text{早期读者区}}_{\text{看到 } \{37,46\}} \quad \| \quad \underbrace{\text{宽限期 GP}}_{\text{rcu\_synchronize()}} \quad \| \quad \underbrace{\text{晚期读者区}}_{\text{看到 } \{39,44\}} 看到 {37,46} 早期读者区rcu_synchronize() 宽限期 GP看到 {39,44} 晚期读者区

五、多帧图的逐步演进

图中用蓝色高亮矩形标注"当前执行到的行",完整时序如下:

阶段 1:初始状态

curconfig ──► {37, 46}
左读者:即将执行 mcp = load()
写者:  即将执行 mcp = new ...
右读者:尚未开始

阶段 2:左读者进入临界区,读取旧配置

// 左读者高亮行:
mcp = curconfig.load(consume);  // mcp → {37,46}
*cur_a = mcp->a;                // 高亮:读到 37
此时写者可能已经在执行:
  mcp_new->a = 39
  mcp_new->b = 44
  (但还没 exchange,所以 curconfig 仍指向旧配置)

阶段 3:写者执行 exchange,指针切换

// 写者高亮行:
mcp = curconfig.exchange(mcp_new, release);
// curconfig 现在指向 {39,44}
// mcp 持有旧指针 {37,46}
两块内存同时存在:
  {37,46} ── 左读者的 mcp 仍指向这里(受 RCU 保护)
  {39,44} ── curconfig 现在指向这里(新读者可见)

阶段 4:左读者继续读(读旧值,仍然一致)

// 左读者继续(mcp 仍指向旧对象):
*cur_b = mcp->b;  // 读到 46(而非新值 44)
// 虽然 curconfig 已切换,但 mcp 是 exchange 之前 load 的
// 所以读者始终读到一致的 {37, 46}

一致性保证: { c u r _ a = 37 ,   c u r _ b = 46 } 来自同一个对象 \text{一致性保证:} \quad \{cur\_a=37,\ cur\_b=46\} \quad \text{来自同一个对象} 一致性保证:{cur_a=37, cur_b=46}来自同一个对象

阶段 5:宽限期等待

// 写者阻塞在:
rcu_synchronize();
// 等待左读者退出临界区(执行 scoped_lock 析构)
// 此期间 {37,46} 不能释放
宽限期(Grace Period)的精确语义:
  开始:curconfig.exchange() 之后
  结束:所有"在 exchange 之前进入临界区"的读者全部退出
  期间可以有任意多的新读者,但它们读的是新配置 {39,44}
  不影响宽限期判断

阶段 6:右读者读新配置

// 右读者(宽限期内或之后进入):
mcp = curconfig.load(consume);  // mcp → {39,44}
*cur_a = mcp->a;  // 39
*cur_b = mcp->b;  // 44

阶段 7:宽限期结束,安全释放

// 宽限期结束后:
delete mcp;  // 释放 {37,46},此时零引用

六、内存序的完整因果链

写者:
  mcp->a = 39  ─┐
  mcp->b = 44  ─┤ sequenced-before
                 ↓
  exchange(release) ──────────────────── 同步点
                                              │
                                    synchronizes-with
                                              │
读者(读到新指针):                            ↓
  load(consume) ──────────────────────── 同步点
                 ↓
  mcp->a  ─┐ happens-after(依赖链)
  mcp->b  ─┘ 保证读到 39 和 44

写  39 , 44 ⏟ 普通写 → s b e x c h a n g e ( r e l ) ⏟ 发布 → s w l o a d ( c o n s u m e ) ⏟ 订阅 → h b 读  39 , 44 ⏟ 依赖链 \underbrace{写\ 39,44}_{\text{普通写}} \xrightarrow{sb} \underbrace{exchange(rel)}_{\text{发布}} \xrightarrow{sw} \underbrace{load(consume)}_{\text{订阅}} \xrightarrow{hb} \underbrace{读\ 39,44}_{\text{依赖链}} 普通写  39,44sb 发布 exchange(rel)sw 订阅 load(consume)hb 依赖链  39,44

七、RCU 语义的核心不变式

∀ 读者  r ,   ∃ 版本  v ∈ { 旧 , 新 } : r  读到的 { a , b }  完全来自版本  v 不存在 { a 新 , b 旧 }  或 { a 旧 , b 新 }  的中间状态 \boxed{ \forall \text{读者}\ r,\ \exists \text{版本}\ v \in \{\text{旧}, \text{新}\}:\\ r \text{ 读到的} \{a, b\} \text{ 完全来自版本 } v\\ \text{不存在} \{a_{\text{新}}, b_{\text{旧}}\} \text{ 或} \{a_{\text{旧}}, b_{\text{新}}\} \text{ 的中间状态} } 读者 r, 版本 v{,}:r 读到的{a,b} 完全来自版本 v不存在{a,b} {a,b} 的中间状态
宽限期的精确定义
G P = { t ∣ t > t e x c h a n g e   ∧   ∀ r ∈ 早期读者 ,   r  已退出临界区 } GP = \{t \mid t > t_{exchange}\ \land\ \forall r \in \text{早期读者},\ r \text{ 已退出临界区}\} GP={tt>texchange  r早期读者, r 已退出临界区}
历史注记:这套空间-时间图的表达方式,据作者所知最早由 Jonathan Walpole 及其学生 Josh Triplett 和 Phil Howard 提出,是理解 RCU 语义最直观的工具。

RCU 核心 API:时间同步 vs 空间同步

一、六个核心 API 的分类

RCU 的 API 按职责分为时间维度空间维度两类:

时间维度(Temporal)── 控制"何时"可以安全操作
  rcu_domain::lock()      进入读临界区(标记读者开始)
  rcu_domain::unlock()    退出读临界区(标记读者结束)
  rcu_synchronize()       等待所有已存在的读者退出
空间维度(Spatial)── 控制"哪个版本"对读者可见
  memory_order_consume    读取 RCU 保护的指针(建立依赖链)
  memory_order_release    更新 RCU 保护的指针(发布新版本)
  .retire()               宽限期结束后异步调用 deleter

RCU = 时间同步 ⏟ lock/unlock/synchronize × 空间同步 ⏟ consume/release/retire \text{RCU} = \underbrace{\text{时间同步}}_{\text{lock/unlock/synchronize}} \times \underbrace{\text{空间同步}}_{\text{consume/release/retire}} RCU=lock/unlock/synchronize 时间同步×consume/release/retire 空间同步

二、空间/时间同步框架

读者的两个维度

读者:
  时间维度(何时进出临界区)
    ├─ rcu_domain::lock()   → 时间同步点 1(临界区开始)
    └─ rcu_domain::unlock() → 时间同步点 2(临界区结束)
  空间维度(读哪个版本)
    └─ memory_order_consume load → 获取当前版本指针
                                   建立依赖链,保证读到一致数据

写者的拆分原则

写者的操作必须拆分为读者可见读者不可见两个阶段:

Add(新增)操作:
  ① 初始化新数据(读者不可见阶段)
       ↓
  ② memory_order_release store(读者可见阶段)
     → 新数据对所有后续读者原子可见
Delete(删除)操作:
  ① memory_order_release exchange(读者不可见阶段)
     → 旧数据对新读者不再可见
       ↓
  ② rcu_synchronize()(时间同步)
     → 等待所有持有旧指针的读者退出
       ↓
  ③ free/delete(安全回收)

存在性保证(Existence Guarantee)

RCU 提供的不是所有权(ownership),而是存在性保证(existence guarantee):
在临界区内 ⇒ 对象一定存在(不会被 delete) \text{在临界区内} \Rightarrow \text{对象一定存在(不会被 delete)} 在临界区内对象一定存在(不会被 delete
退出临界区后 ⇒ 不再保证对象存在 \text{退出临界区后} \Rightarrow \text{不再保证对象存在} 退出临界区后不再保证对象存在
shared_ptr 的对比:

shared_ptr:所有权语义,引用计数归零才释放(有原子 RMW 开销)
RCU:       存在性语义,临界区内保证存在,退出后不持有(零开销)

三、读者的空间/时间同步完整注解

void get(int* cur_a, int* cur_b)
{
    struct myconfig* mcp;
    // ══════════════════════════════════════════════════
    // 时间同步 1(Temporal Synchronization 1)
    // ══════════════════════════════════════════════════
    // 作用:标记"读者开始"
    // 对写者的 rcu_synchronize() 而言:
    //   - lock 之前的 synchronize 不需要等待本读者
    //   - lock 之后的 synchronize 需要等待本读者退出
    // 开销:x86 上几乎为零(无原子 RMW,无 Cache 抖动)
    std::scoped_lock rlock(std::rcu_default_domain());
    //                     ↑ 析构时自动调用 unlock
    // ══════════════════════════════════════════════════
    // 空间同步(Spatial Synchronization)
    // ══════════════════════════════════════════════════
    // 作用:获取"当前版本"的指针
    // memory_order_consume 建立数据依赖链:
    //   mcp 的所有派生读(mcp->a, mcp->b)
    //   保证在此 load 之后发生,不被重排到前面
    // 写者的 release store ──synchronizes-with──► 此 consume load
    // 保证:写者对 mcp->a, mcp->b 的写 happens-before 此处的读
    mcp = curconfig.load(std::memory_order_consume);
    // ══════════════════════════════════════════════════
    // 存在性保证(Existence Guarantee)
    // ══════════════════════════════════════════════════
    // 因为处于临界区内(rlock 持有中),
    // RCU 保证 mcp 指向的对象在此期间不会被 delete
    // 即使写者已经调用了 exchange 切换了 curconfig 指针
    *cur_a = mcp->a;   // 空间同步保证:读到一致的 37
    *cur_b = mcp->b;   // 空间同步保证:读到一致的 46
    // ══════════════════════════════════════════════════
    // 时间同步 2(Temporal Synchronization 2)
    // ══════════════════════════════════════════════════
    // 作用:标记"读者结束"
    // scoped_lock 析构自动触发 rcu_domain::unlock()
    // 此后:
    //   - 写者的 rcu_synchronize() 可以不再等待本读者
    //   - mcp 不再受 RCU 保护,不能再使用!
} // ← 自动 unlock,时间同步点 2

时间与空间的配合关系:
l o c k ⏟ 时间同步1 → l o a d ( c o n s u m e ) ⏟ 空间同步 → 读  a , b ⏟ 存在性保证 → u n l o c k ⏟ 时间同步2 \underbrace{lock}_{\text{时间同步1}} \xrightarrow{\quad} \underbrace{load(consume)}_{\text{空间同步}} \xrightarrow{\quad} \underbrace{读\ a,b}_{\text{存在性保证}} \xrightarrow{\quad} \underbrace{unlock}_{\text{时间同步2}} 时间同步1 lock 空间同步 load(consume) 存在性保证  a,b 时间同步2 unlock

四、写者的空间/时间同步完整注解

void set(int cur_a, int cur_b)
{
    // ── 阶段一:构建新版本(读者不可见,无需同步)────────────
    struct myconfig* mcp = new myconfig();
    // 普通写,不需要任何屏障
    // 此时 mcp 是写者的私有对象,没有任何读者能访问到它
    mcp->a = cur_a;   // 写 39
    mcp->b = cur_b;   // 写 44
    // ══════════════════════════════════════════════════
    // 空间同步(Spatial Synchronization)
    // ══════════════════════════════════════════════════
    // exchange 是原子操作,包含 memory_order_release 语义
    //
    // 对新读者的效果("make new data accessible"):
    //   release 保证:上方 mcp->a=39, mcp->b=44 的写
    //   happens-before 任何通过 consume/acquire 读到新指针的读者
    //   → 新读者看到完整的 {39,44}
    //
    // 对旧读者的效果("make old data inaccessible"):
    //   exchange 后,新读者通过 curconfig 只能读到新指针
    //   旧读者已持有旧指针,不受影响,仍读旧数据
    //
    // exchange 返回旧指针,mcp 现在指向旧配置 {37,46}
    mcp = curconfig.exchange(mcp, std::memory_order_release);
    // ══════════════════════════════════════════════════
    // 时间同步(Temporal Synchronization)
    // ══════════════════════════════════════════════════
    // 等待所有"在 exchange 之前进入临界区"的读者退出
    // 即:等待所有可能持有旧指针 {37,46} 的读者全部退出
    //
    // 精确语义:
    //   设 T_x = exchange 执行的时刻
    //   rcu_synchronize() 返回时,保证:
    //   所有在 T_x 之前调用 lock() 的读者都已调用 unlock()
    rcu_synchronize();
    // rcu_synchronize() 返回 = 宽限期(Grace Period)结束
    // ══════════════════════════════════════════════════
    // 安全回收(Safe Reclamation)
    // ══════════════════════════════════════════════════
    // 宽限期结束后:
    //   - 没有任何线程的指针指向旧配置 {37,46}
    //   - delete 是完全安全的,零 UAF 风险
    delete mcp;   // 释放旧配置 {37,46}
}

写者操作的时空关系:
m c p → a = 39 ,   m c p → b = 44 ⏟ 初始化新数据(不可见) → s b e x c h a n g e ( r e l e a s e ) ⏟ 空间同步:新可见,旧不可访问 → G P r c u _ s y n c h r o n i z e ( ) ⏟ 时间同步:等待旧读者 → d e l e t e ⏟ 安全回收 \underbrace{mcp\text{→}a=39,\ mcp\text{→}b=44}_{\text{初始化新数据(不可见)}} \xrightarrow{sb} \underbrace{exchange(release)}_{\text{空间同步:新可见,旧不可访问}} \xrightarrow{GP} \underbrace{rcu\_synchronize()}_{\text{时间同步:等待旧读者}} \xrightarrow{} \underbrace{delete}_{\text{安全回收}} 初始化新数据(不可见) mcpa=39, mcpb=44sb 空间同步:新可见,旧不可访问 exchange(release)GP 时间同步:等待旧读者 rcu_synchronize() 安全回收 delete

五、时间 vs 空间的本质区别

时间同步解决的问题:
  "何时"可以安全释放内存?
  答:所有持有旧指针的读者退出之后
  API:lock / unlock / synchronize / retire
  类比:引用计数的"归零",但无原子 RMW 开销
空间同步解决的问题:
  "哪个版本"对读者可见?
  答:读者在 lock 之后、unlock 之前 load 到的那个版本
  API:consume load / release store
  类比:mutex 保护的临界区,但无锁竞争开销

RCU 的核心创新 = 时间同步(等待读者) 空间同步(指针原子切换) ⇒ 读者完全无锁 \text{RCU 的核心创新} = \frac{\text{时间同步(等待读者)}}{\text{空间同步(指针原子切换)}} \Rightarrow \text{读者完全无锁} RCU 的核心创新=空间同步(指针原子切换)时间同步(等待读者)读者完全无锁

六、存在性保证 vs 所有权保证

// ── 所有权语义(shared_ptr)──────────────────────────────
{
    auto sp = g_shared_ptr;   // 原子引用计数 +1(~10ns)
    use(sp->data);
}   // 引用计数 -1(~10ns),归零时 delete
// 每次读操作都有两次原子 RMW,随线程数线性增加开销
// ── 存在性语义(RCU)─────────────────────────────────────
{
    std::scoped_lock rlock(rcu_domain);  // ~0ns(无原子 RMW)
    auto* p = ptr.load(consume);         // 一条 load 指令
    use(p->data);
    // p 不能带出临界区!退出后不保证存在
}   // ~0ns
// 任意多读者并发,开销不随线程数增加

shared_ptr 开销 = O ( N ) (N 个读者,N 次原子 RMW) \text{shared\_ptr 开销} = O(N) \quad \text{(N 个读者,N 次原子 RMW)} shared_ptr 开销=O(N)个读者,次原子 RMW
RCU 读者开销 = O ( 1 ) ≈ 0 (任意多读者) \text{RCU 读者开销} = O(1) \approx 0 \quad \text{(任意多读者)} RCU 读者开销=O(1)0(任意多读者)

七、retire 与 synchronize 的选择

// ── 方式一:synchronize_rcu(同步,阻塞写者)───────────────
void set_sync(int a, int b) {
    auto* mcp = new myconfig{a, b};
    auto* old = curconfig.exchange(mcp, release);
    // 写者线程阻塞,等宽限期
    // 简单,但影响写者吞吐量
    rcu_synchronize();
    delete old;
}
// ── 方式二:retire(异步,写者立即返回)────────────────────
void set_async(int a, int b) {
    auto* mcp = new myconfig{a, b};
    auto* old = curconfig.exchange(mcp, release);
    // 注册回调,宽限期结束后 RCU 自动调用 deleter
    // 写者立即返回,不等待
    std::rcu_retire(old);   // C++26 或 liburcu::call_rcu
}

方式 写者延迟 内存峰值 适用场景
synchronize 高(等宽限期) 写频率低
retire 低(立即返回) 较高(堆积待释放) 写频率高

时间同步 ↔ lock/unlock/synchronize/retire 空间同步 ↔ consume load / release store 两者配合 ⇒ 读者存在性保证 + 写者安全回收 \boxed{ \text{时间同步} \leftrightarrow \text{lock/unlock/synchronize/retire}\\ \text{空间同步} \leftrightarrow \text{consume\ load\ /\ release\ store}\\ \text{两者配合} \Rightarrow \text{读者存在性保证} + \text{写者安全回收} } 时间同步lock/unlock/synchronize/retire空间同步consume load / release store两者配合读者存在性保证+写者安全回收

RCU 时空同步的实际应用与链表操作

一、谁在做空间同步?

空间同步(Spatial Synchronization)并不是 RCU 独有的概念,而是并发编程中普遍存在的模式:

每任务栈变量(Per-task stack locations)
  └─ 每个线程有自己的栈,天然隔离,无竞争
     int local_var;   // 栈上变量,只有本线程访问
每 CPU/线程变量(Per-CPU/-thread variables)
  └─ __thread int counter;  // 线程本地存储
     各线程读写自己的副本,汇总时才需同步
带 per-bucket 锁的哈希表(Hash tables with per-bucket locks)
  └─ 分片(Sharding)的本质:将一个全局锁拆成 N 个局部锁
     操作 key → hash(key) % N → 只锁第 N 个 bucket
     不同 bucket 的操作完全并行
风险指针(Hazard Pointers)及其他延迟回收
  └─ 读者发布"我正在用这个指针"的声明
     写者看到声明后延迟 delete

核心结论:空间同步无处不在,几乎所有人都在用,只是很多人没意识到这个术语。
空间同步 = 通过数据划分减少竞争,而非通过时间串行化 \text{空间同步} = \text{通过数据划分减少竞争,而非通过时间串行化} 空间同步=通过数据划分减少竞争,而非通过时间串行化

二、rwlock vs RCU 性能对比

空临界区(纯调度开销)

测试:线程数 1 → 56,临界区为空(只测锁本身的开销)
rwlock 读者吞吐量:
  线程数  1  →  吞吐量  1.0x(基准)
  线程数  4  →  吞吐量  0.8x  ← 已开始下降
  线程数 14  →  吞吐量  0.3x  ← 严重退化
  线程数 56  →  吞吐量  0.08x ← Worst Case
RCU 读者吞吐量:
  线程数  1  →  吞吐量  1.0x
  线程数  4  →  吞吐量  4.0x  ← 线性增长
  线程数 56  →  吞吐量  56.0x ← 完美线性扩展

rwlock 吞吐量 ∝ 1 N RCU 吞吐量 ∝ N \text{rwlock 吞吐量} \propto \frac{1}{N} \qquad \text{RCU 吞吐量} \propto N rwlock 吞吐量N1RCU 吞吐量N

非空临界区(含实际工作)

测试:临界区内有实际计算(读取若干字段)
rwlock:随线程增加,竞争导致大量等待,吞吐量仍然下降
RCU:   临界区内无锁,读者并行执行,吞吐量接近线性增长

根本原因:rwlock 的 reader_count 是所有读者共享的热点变量,每次加锁都触发全局 Cache Invalidate。RCU 读者不修改任何共享状态,Cache Line 保持 Shared 状态。
rwlock:读者加锁 = 写操作 ⏟ 修改 reader_count 本质上是写竞争 \text{rwlock}:\text{读者加锁} = \underbrace{\text{写操作}}_{\text{修改 reader\_count}} \quad \text{本质上是写竞争} rwlock读者加锁=修改 reader_count 写操作本质上是写竞争
RCU:读者加锁 ≈ 0 不修改任何共享变量 \text{RCU}:\text{读者加锁} \approx 0 \quad \text{不修改任何共享变量} RCU读者加锁0不修改任何共享变量

三、RCU 链表操作

原子指针替换是 RCU 的特殊用法,更常见的是链表的插入、删除、遍历

3.1 数据结构定义

struct mystruct {
    int a;                  // 数据字段 A
    int b;                  // 数据字段 B
    struct list_head list;  // 链表节点(内嵌到结构体中)
                            // 用于 list_add_rcu / list_del_rcu
    struct rcu_head rh;     // RCU 回调头
                            // 用于 kfree_rcu 异步释放
                            // 宽限期结束后自动 free(msp)
};
// 全局链表头(受 mylock 保护写操作,受 RCU 保护读操作)
LIST_HEAD(mylist);
DEFINE_SPINLOCK(mylock);

3.2 插入操作

void mystruct_insert(int a, int b)  // 允许重复键值
{
    // ① 分配并初始化新节点(在锁外进行,不阻塞其他操作)
    struct mystruct* msp = kmalloc(sizeof(*msp), GFP_KERNEL);
    initialize_mystruct(msp, a, b);
    // 此时 msp 是私有的,没有任何读者能看到它
    // ② 加锁(写者互斥,防止多个写者并发修改链表结构)
    // 注意:这里只需要保护写者之间的互斥,读者完全不需要这个锁
    spin_lock(&mylock);
    // ③ RCU 安全插入
    // list_add_rcu 包含 smp_wmb()(写内存屏障)
    // 保证:msp 的所有字段初始化 happens-before 链表指针的发布
    // 即:读者看到 msp 在链表中时,msp->a 和 msp->b 一定已初始化
    // 这就是"空间同步":release 语义保证新数据对读者可见且完整
    list_add_rcu(&msp->list, &mylist);
    spin_unlock(&mylock);
    // 解锁后,新节点对所有新读者立即可见(但旧读者不受影响)
}

插入的内存序保证:
m s p → a = a ,   m s p → b = b ⏟ 初始化(读者不可见) → w m b l i s t _ a d d _ r c u ⏟ 空间同步:发布 → s w 读者  l i s t _ f o r _ e a c h _ r c u ⏟ 读到完整节点 \underbrace{msp\text{→}a=a,\ msp\text{→}b=b}_{\text{初始化(读者不可见)}} \xrightarrow{wmb} \underbrace{list\_add\_rcu}_{\text{空间同步:发布}} \xrightarrow{sw} \underbrace{读者\ list\_for\_each\_rcu}_{\text{读到完整节点}} 初始化(读者不可见) mspa=a, mspb=bwmb 空间同步:发布 list_add_rcusw 读到完整节点 读者 list_for_each_rcu

3.3 删除操作

void mystruct_delete(int a)
{
    struct mystruct* msp = NULL;
    // ① 加锁,保护写者之间互斥
    spin_lock(&mylock);
    // ② 查找目标节点(在锁保护下,mystruct_search 可以不用 rcu_read_lock)
    msp = mystruct_search(a);
    if (msp)
        // ③ RCU 安全删除:从链表中摘除节点
        // list_del_rcu 只修改链表指针,不释放内存
        // 执行后:
        //   - 新读者的 list_for_each_entry_rcu 不会看到 msp
        //   - 旧读者(已在遍历中)仍然可以安全访问 msp
        //   - msp->list.next 被设为特殊值,但 RCU 读者不用 next 判断结束
        list_del_rcu(&msp->list);
    spin_unlock(&mylock);
    if (msp)
        // ④ 异步释放(关键!)
        // kfree_rcu(msp, rh) 等价于:
        //   call_rcu(&msp->rh, [](rcu_head* rh){
        //       kfree(container_of(rh, mystruct, rh));
        //   });
        // 宽限期结束后自动调用 kfree(msp)
        // 不阻塞当前线程,写者立即返回
        kfree_rcu(msp, rh);
        // rh 是 mystruct 中 rcu_head 字段的名字
}

删除的时空关系:

空间同步(list_del_rcu):
  将节点从链表摘除 → 新读者看不到该节点
  旧读者仍可通过已持有的指针访问(存在性保证)
时间同步(kfree_rcu 内部的宽限期):
  等待所有"在 list_del_rcu 之前进入临界区"的旧读者退出
  宽限期结束 → 节点真正释放
两者配合:
  摘除(立即)→ 宽限期(异步等待)→ 释放(安全)

3.4 搜索操作(底层,供内部使用)

// 调用者必须满足以下条件之一:
//   1. 已持有 rcu_read_lock()(读者路径)
//   2. 已持有 mylock(写者路径,如 mystruct_delete 中的调用)
struct mystruct* mystruct_search(int a)
{
    struct mystruct* msp;
    // list_for_each_entry_rcu:RCU 安全的链表遍历
    // 内部使用 READ_ONCE() + smp_read_barrier_depends()
    // 等价于对每个 next 指针做 memory_order_consume load
    // 保证:读到链表节点时,节点的所有字段已初始化(依赖链保证)
    list_for_each_entry_rcu(msp, &mylist, list) {
        if (msp->a == a)
            return msp;
        // 注意:返回的指针在调用者的临界区内才安全
        // 调用者退出临界区后不能再使用此指针
    }
    return NULL;
}

思考题(原文提问):如果要让 mystruct_search() 自己做同步(自带 rcu_read_lock),需要修改什么?

// 修改版:自带同步的 search(不要求调用者持有锁)
struct mystruct* mystruct_search_standalone(int a)
{
    struct mystruct* msp;
    struct mystruct* result = NULL;
    rcu_read_lock();   // 自己管理临界区
    list_for_each_entry_rcu(msp, &mylist, list) {
        if (msp->a == a) {
            // 问题:返回指针后临界区就结束了,指针不再受保护!
            // 解决方案一:复制数据而非返回指针
            // 解决方案二:调用者保证在使用指针期间持有 rcu_read_lock
            result = msp;
            break;
        }
    }
    rcu_read_unlock();
    // 返回 result 之后,result 指向的对象可能随时被 delete!
    // 这就是为什么原始设计让调用者管理锁:
    // mystruct_lookup 在整个"搜索+使用"期间持有读锁
    return result;   // ← 危险!除非调用者有额外保证
}

3.5 查找操作(对外接口,完整同步)

int mystruct_lookup(int a)
{
    struct mystruct* msp;
    int ret = -1;
    // ① 进入 RCU 读临界区
    rcu_read_lock();
    // ② 搜索(在临界区保护下,msp 的存在性有保证)
    msp = mystruct_search(a);
    if (msp)
        // ③ 在临界区内读取 msp->b
        // 如果在 rcu_read_unlock 之后再读,msp 可能已被 kfree_rcu 释放!
        ret = msp->b;
    // ④ 退出临界区(之后 msp 不再安全)
    rcu_read_unlock();
    // ret 是整数,已经从受保护的内存中拷贝出来,安全返回
    return ret;
}

关键设计原则:在临界区内完成所有对受保护指针的访问,拷贝出值而非带出指针
正确模式: r c u _ r e a d _ l o c k ⏟ 进入 → 读  m s p → b  到局部变量 ⏟ 在临界区内拷贝值 → r c u _ r e a d _ u n l o c k ⏟ 退出 → 使用局部变量 \text{正确模式}:\underbrace{rcu\_read\_lock}_{\text{进入}} \to \underbrace{读\ msp\text{→}b\ \text{到局部变量}}_{\text{在临界区内拷贝值}} \to \underbrace{rcu\_read\_unlock}_{\text{退出}} \to \text{使用局部变量} 正确模式进入 rcu_read_lock在临界区内拷贝值  mspb 到局部变量退出 rcu_read_unlock使用局部变量
错误模式: r c u _ r e a d _ l o c k → 获取 msp → r c u _ r e a d _ u n l o c k → 使用  m s p → b ⏟ UAF 风险! \text{错误模式}:rcu\_read\_lock \to \text{获取 msp} \to rcu\_read\_unlock \to \underbrace{使用\ msp\text{→}b}_{\text{UAF 风险!}} 错误模式rcu_read_lock获取 msprcu_read_unlockUAF 风险! 使用 mspb

四、RCU 作为准读写锁(Quasi Reader-Writer Lock)

RCU 可以被视为一种特殊的读写锁,在传统发布-订阅基础上增加了:

传统 publish/subscribe 模式:
  写者 release store → 读者 acquire/consume load
RCU 在此基础上增加的能力:
  1. 堆分配的链表结构(Heap-allocated linked structure)
     └─ 不限于单指针,支持复杂的链表/树等结构
        每个节点都通过 RCU 保护
  2. 延迟回收(Deferred reclamation)
     └─ list_del_rcu 摘除 + kfree_rcu 异步释放
        写者不等待,系统在宽限期后自动回收
  3. RCU 读者 = 持有读锁的读写锁
     └─ rcu_read_lock()  ≈ rwlock_rdlock()
        rcu_read_unlock() ≈ rwlock_rdunlock()
        但开销接近零,可以完美扩展
  4. 同时具备时间和空间同步
     └─ 时间:lock/unlock/synchronize(何时安全)
        空间:consume/release(哪个版本可见)

特性 传统 RWLock RCU 准读写锁
读者开销 O ( N ) O(N) O(N)(原子 RMW) ≈ 0 \approx 0 0
读者扩展性 退化 线性扩展
写者等待 等读者退出 等宽限期(异步可选)
支持链表 需要全链表锁 per-node RCU 保护
内存回收 立即 延迟(宽限期后)

RCU = 空间同步 ⏟ release/consume + 时间同步 ⏟ lock/unlock/GP + 延迟回收 ⏟ kfree_rcu/retire \boxed{ \text{RCU} = \underbrace{\text{空间同步}}_{\text{release/consume}} + \underbrace{\text{时间同步}}_{\text{lock/unlock/GP}} + \underbrace{\text{延迟回收}}_{\text{kfree\_rcu/retire}} } RCU=release/consume 空间同步+lock/unlock/GP 时间同步+kfree_rcu/retire 延迟回收

RCU 阶段性状态切换(Phased State Change)深度解析

一、问题背景

多线程应用的两种运行模式:
  Normal(正常):性能优先,快速执行,不需要格外小心
  Maintenance(维护):安全优先,需要特别谨慎地操作
  切换要求:
    ① 正常模式下 common-case 操作必须极快
    ② 维护模式下所有线程必须可靠地感知到"需要小心"
    ③ 维护结束后必须可靠地恢复到正常模式

三个阶段的状态机:

Normal          Maintenance         Normal
(快速)    →   (谨慎)        →   (快速)
  快速         过渡期:快/慢均可      快速
               do_maint() 执行中

核心挑战:如何可靠地同步这个状态标志be_carefulfalse 变为 true 时,必须保证所有正在执行的线程都看到这个变化,然后才能安全地执行维护操作。

二、代码实现

2.1 公共操作(Common-Case Operation)

// 全局状态标志:false = 正常模式,true = 维护模式
bool be_careful;
void cco(void)   // Common-Case Operation
{
    // ① 进入 RCU 读临界区
    // 作用:标记"本线程正在运行,且已读取了 be_careful 的某个值"
    // 维护操作的 synchronize_rcu() 需要等待本临界区结束
    rcu_read_lock();
    // ② 读取状态标志(空间同步)
    // READ_ONCE:防止编译器将多次读取优化为一次(缓存到寄存器)
    // 等价于 memory_order_relaxed load + 禁止编译器重排
    // 保证:每次进入临界区都重新从内存读取 be_careful 的当前值
    if (READ_ONCE(be_careful))
        // 维护模式:小心执行(可能有额外的锁/检查)
        cco_carefully();
    else
        // 正常模式:快速执行(零额外开销)
        cco_quickly();
    // ③ 退出 RCU 读临界区
    // 退出后:维护操作的 synchronize_rcu() 可以不再等待本线程
    rcu_read_unlock();
}

2.2 维护操作(Maintenance Operation)

void maint(void)
{
    // ══════════════════════════════════════════
    // 阶段一:进入维护模式
    // ══════════════════════════════════════════
    // ① 空间同步:发布"需要小心"的信号
    // WRITE_ONCE:防止编译器将写操作优化掉或重排
    // 此后新进入临界区的线程读到 be_careful=true,执行 cco_carefully()
    // 但此时可能还有线程已经读了 be_careful=false,正在执行 cco_quickly()
    WRITE_ONCE(be_careful, true);
    // ② 时间同步:等待所有"已读到 false"的线程退出
    // 等待条件:所有在 WRITE_ONCE 之前进入临界区的线程都退出
    // 宽限期结束后:保证没有任何线程在执行基于 false 的 cco_quickly()
    synchronize_rcu();
    // 宽限期结束 = 所有线程都已看到 be_careful=true(或不在临界区)
    // ══════════════════════════════════════════
    // 阶段二:执行维护操作
    // ══════════════════════════════════════════
    // 此时保证:所有活跃线程都在执行 cco_carefully() 或不在临界区
    // 可以安全地执行维护操作(修改共享数据结构等)
    do_maint();
    // ══════════════════════════════════════════
    // 阶段三:退出维护模式
    // ══════════════════════════════════════════
    // ③ 第二次时间同步(关键问题:为什么需要这一步?)
    // 等待所有正在执行 cco_carefully() 的线程退出
    // 宽限期结束后:保证没有线程还在使用"维护模式下的数据状态"
    synchronize_rcu();
    // ④ 空间同步:发布"恢复正常"的信号
    // 此后新进入临界区的线程读到 false,执行 cco_quickly()
    WRITE_ONCE(be_careful, false);
}

三、为什么需要两次 synchronize_rcu()

这是原文中的核心问题。

只用一次的错误尝试

错误版本:
  WRITE_ONCE(be_careful, true);
  synchronize_rcu();
  do_maint();
  WRITE_ONCE(be_careful, false);  // ← 没有第二次 synchronize_rcu
时序问题:
  线程 A                    维护线程
  ─────────────────────────────────────────────
  rcu_read_lock()
  READ_ONCE → true
    (进入 cco_carefully)
                            do_maint() 完成
                            WRITE_ONCE(be_careful, false)  ← 立即写!
    (cco_carefully 仍在执行中)
    (此时 be_careful 已经是 false)
    (但线程 A 基于 true 的状态还没结束)
  rcu_read_unlock()

后果do_maint() 之后立即写 false,此时可能还有线程基于 be_careful=true 的假设在执行 cco_carefully()。如果 cco_carefully() 依赖维护期间的某个状态(如临时锁、特殊数据结构),提前恢复 false 可能导致数据不一致。

两次 synchronize_rcu 的正确语义

第一次 synchronize_rcu:
  保证:所有读到 false 并执行 cco_quickly() 的线程都已退出
  效果:do_maint() 执行时,所有活跃的 cco 线程都在执行 cco_carefully()
第二次 synchronize_rcu:
  保证:所有读到 true 并执行 cco_carefully() 的线程都已退出
  效果:WRITE_ONCE(false) 写入后,没有线程还在"维护模式下的行为"中

W R I T E ( t r u e ) ⏟ 空间同步 → G P 1 d o _ m a i n t ( ) ⏟ 安全维护 → G P 2 W R I T E ( f a l s e ) ⏟ 空间同步 \underbrace{WRITE(true)}_{\text{空间同步}} \xrightarrow{GP_1} \underbrace{do\_maint()}_{\text{安全维护}} \xrightarrow{GP_2} \underbrace{WRITE(false)}_{\text{空间同步}} 空间同步 WRITE(true)GP1 安全维护 do_maint()GP2 空间同步 WRITE(false)
G P 1 :清除所有 "快速路径" 的执行 GP_1 \text{:清除所有 "快速路径" 的执行} GP1:清除所有 "快速路径的执行
G P 2 :清除所有 "小心路径" 的执行 GP_2 \text{:清除所有 "小心路径" 的执行} GP2:清除所有 "小心路径的执行

四、时空图详细解析

原文用多帧图展示了完整的时序,核心是一个二维坐标系:

纵轴(Time):时间流逝 ↓
横轴(Value Space):be_careful 的值域(false ↔ true)

完整时序

时间
  │
  │  读者(早期)              维护线程
  │  ──────────────────────────────────────────────────
  │  rcu_read_lock()
  │  READ_ONCE → false         WRITE_ONCE(be_careful, true)
  │  cco_quickly() 执行中      │
  │  rcu_read_unlock()         │
  │                            synchronize_rcu()  ← GP1 开始
  │  ┌──────────────────── GP1(宽限期1)────────────────┐
  │  │ 等待所有读到 false 的线程退出                      │
  │  └────────────────────────────────────────────────┘
  │
  │  读者(中期)              do_maint() 执行中
  │  rcu_read_lock()
  │  READ_ONCE → true
  │  cco_carefully() 执行中
  │                            synchronize_rcu()  ← GP2 开始
  │  ┌──────────────────── GP2(宽限期2)────────────────┐
  │  │ 等待所有读到 true 的线程退出                       │
  │  rcu_read_unlock()
  │  └────────────────────────────────────────────────┘
  │                            WRITE_ONCE(be_careful, false)
  │
  │  读者(晚期)
  │  rcu_read_lock()
  │  READ_ONCE → false
  │  cco_quickly() 执行
  │  rcu_read_unlock()
  ↓

值空间(Value Space)的变化

be_careful 的值:
false ──────────────────┐
                        │ WRITE_ONCE(true)
true                    └────────────────────────┐
                                                 │ WRITE_ONCE(false)
false                                            └─────────────────────
         GP1 边界         do_maint()          GP2 边界
         (第一个宽限期)                      (第二个宽限期)

过渡期的语义

GP1 期间:
  be_careful = true(空间上)
  但部分线程仍在执行 cco_quickly()(时间上还没退出)
  这是合法的!这些线程在 WRITE_ONCE(true) 之前就读了 false
  它们的行为基于旧状态,do_maint() 必须等它们全部退出
GP2 期间:
  do_maint() 完成,be_careful 仍为 true
  部分线程在执行 cco_carefully()
  WRITE_ONCE(false) 必须等它们全部退出才能安全

五、维护结束同步的替代方案

原文问题:“Alternatives for maintenance-end synchronization?”

方案一:两次 synchronize_rcu(原始方案)

synchronize_rcu();      // GP2:等所有 carefully 线程退出
WRITE_ONCE(be_careful, false);

优点:简单直观。缺点:阻塞维护线程,增加维护时间。

方案二:call_rcu 异步回调

// do_maint() 完成后,异步注册回调
struct rcu_head rh;
call_rcu(&rh, [](struct rcu_head*) {
    // GP2 结束后自动执行
    WRITE_ONCE(be_careful, false);
});
// 维护线程立即返回,不阻塞

方案三:等待引用计数归零

// 如果 cco_carefully() 内部有计数器
atomic_t carefully_count = ATOMIC_INIT(0);
// cco_carefully 入口:
atomic_inc(&carefully_count);
// ... 执行 ...
atomic_dec(&carefully_count);
// 维护结束:
wait_event(wq, atomic_read(&carefully_count) == 0);
WRITE_ONCE(be_careful, false);

缺点:引入了新的共享热点变量,退化为类似 rwlock 的问题。

方案四:利用 be_careful=false 本身的内存序

// 如果 cco_carefully 对 be_careful 有依赖(数据依赖链),
// 则 WRITE_ONCE(false) 配合第一次 synchronize_rcu 就足够:
synchronize_rcu();   // 等 GP1:所有快速路径退出
do_maint();
WRITE_ONCE(be_careful, false);
synchronize_rcu();   // 等 GP2(但此 GP2 可以更短)

最优选择取决于具体场景:

方案 维护线程是否阻塞 额外开销 复杂度
两次 synchronize_rcu 阻塞
call_rcu 异步 不阻塞
引用计数 阻塞 高(Cache 竞争)

六、RCU 阶段性状态切换的本质

RCU 为"等待完成(wait-to-finish)"模式增加了:
  checked state variable(带检查的状态变量)
传统 wait-to-finish:
  lock → 修改 → unlock → 等所有读者退出
RCU 阶段性状态切换:
  WRITE(flag) → synchronize → do_work → synchronize → WRITE(flag)
  ↑ 空间同步       ↑ 时间同步              ↑ 时间同步   ↑ 空间同步

完整协议 = W R I T E ( t r u e ) ⏟ 空间 → G P 1 d o _ m a i n t ( ) ⏟ 安全窗口 → G P 2 W R I T E ( f a l s e ) ⏟ 空间 \text{完整协议} = \underbrace{WRITE(true)}_{\text{空间}} \xrightarrow{GP_1} \underbrace{do\_maint()}_{\text{安全窗口}} \xrightarrow{GP_2} \underbrace{WRITE(false)}_{\text{空间}} 完整协议=空间 WRITE(true)GP1 安全窗口 do_maint()GP2 空间 WRITE(false)
两个宽限期分别清除两个"旧状态窗口": G P 1  清除所有基于 false 的执行 G P 2  清除所有基于 true 的执行 两者之间是唯一安全的维护窗口 \boxed{ \text{两个宽限期分别清除两个"旧状态窗口":}\\ GP_1 \text{ 清除所有基于 false 的执行}\\ GP_2 \text{ 清除所有基于 true 的执行}\\ \text{两者之间是唯一安全的维护窗口} } 两个宽限期分别清除两个"旧状态窗口"GP1 清除所有基于 false 的执行GP2 清除所有基于 true 的执行两者之间是唯一安全的维护窗口

Linux 内核 RCU 使用实践与调试

一、Linux 内核中 RCU 保护的核心数据结构

task_struct(进程描述符,内核最核心的结构体)
  ├─ parent     父进程指针
  │              父进程可能随时退出,RCU 保护防止悬挂指针
  ├─ cred       进程凭证(uid/gid/capabilities)
  │              权限检查是热路径,RCU 保证零开销读取
  ├─ sighand    信号处理器表
  │              信号处理注册/注销时需要 RCU 保护
  └─ cgroups    控制组归属
               cgroup 迁移时 RCU 保护读者
cgroup(控制组)
  └─ subsys[]   各子系统指针数组
               子系统挂载/卸载时动态变化,RCU 保护遍历
fdtable(文件描述符表)
  └─ fd[]       文件指针数组
               close/dup 操作并发时 RCU 保护
files_struct(文件表)
  ├─ fdt        指向 fdtable 的指针(可能被替换为更大的表)
  └─ fd_array   内嵌的小型 fd 数组(避免小进程频繁分配)
inode / super_block(VFS 层)
  └─ [is]_fsnotify_marks   文件系统通知标记链表
                           inotify/fanotify 监视器的注册/注销

共同特点:这些数据结构都是读多写少(进程频繁读取自己的 cred,但很少修改),非常适合 RCU。

二、rcu_sync:RCU 与轻量级读写信号量的结合

// include/linux/rcu_sync.h 提供的接口
struct rcu_sync {
    int             gp_state;   // 当前宽限期状态
    int             gp_count;   // 活跃的 enter 调用计数
    wait_queue_head_t gp_wait;  // 等待宽限期完成的队列
    struct rcu_head cb_head;    // 异步回调头
};
// 初始化
void rcu_sync_init(struct rcu_sync* rsp);
// 进入"快速路径不可用"状态(开始有写者)
void rcu_sync_enter_start(struct rcu_sync* rsp);  // 无需等待
void rcu_sync_enter(struct rcu_sync* rsp);         // 等待宽限期
// 退出"快速路径不可用"状态(写者离开)
void rcu_sync_exit(struct rcu_sync* rsp);

核心应用:per-CPU 读写信号量(percpu_rw_semaphore)

// 设计思想:无写者时,读者走 RCU 快速路径(零开销)
//           有写者时,读者走传统信号量慢速路径
struct percpu_rw_semaphore {
    struct rcu_sync     rss;        // RCU 同步状态
    unsigned int __percpu *read_count; // per-CPU 读者计数
    struct rcuwait      writer;     // 写者等待队列
    wait_queue_head_t   waiters;    // 读者等待队列
    atomic_t            block;      // 阻塞标志
};
// 读者加锁:
void percpu_down_read(struct percpu_rw_semaphore* sem) {
    rcu_read_lock();
    if (likely(rcu_sync_is_idle(&sem->rss))) {
        // 快速路径:无写者,只需 RCU 临界区(~0ns)
        // per-CPU 计数器递增(无竞争)
        this_cpu_inc(*sem->read_count);
        rcu_read_unlock();
    } else {
        // 慢速路径:有写者,走传统信号量
        rcu_read_unlock();
        down_read_slowpath(sem);
    }
}

无写者时读者开销 ≈ 0 有写者时退化为传统信号量 \text{无写者时读者开销} \approx 0 \qquad \text{有写者时退化为传统信号量} 无写者时读者开销0有写者时退化为传统信号量

三、Linux v5.15 中 RCU 的使用规模

按子系统统计


子系统 RCU API 调用次数 代码行数 使用密度(次/KLoc)
ipc 89 9,649 9.22(最高)
virt 65 8,465 7.68
net 7,411 1,192,884 6.21
security 595 104,460 5.70
kernel 1,689 406,868 4.15
mm 319 160,825 1.98
drivers 5,328 19,275,468 0.28(最低)
总计 18,710 27,557,130 0.68

解读

ipc(进程间通信)密度最高(9.22/KLoc):
  消息队列、共享内存、信号量的元数据频繁被多进程读取
  经典的"读多写少"场景
net(网络子系统)绝对数量最多(7,411次):
  路由表、邻居表、网络设备列表
  数据包转发是极热路径,RCU 是唯一可接受的同步方式
drivers 密度最低(0.28/KLoc):
  驱动代码多为设备专属,并发度不高
  但绝对数量(5,328)仍然可观

总 RCU 调用 = 18,710 跨越 27.5  百万行代码 \text{总 RCU 调用} = 18{,}710 \quad \text{跨越} \quad 27.5 \text{ 百万行代码}  RCU 调用=18,710跨越27.5 百万行代码

四、调试 Linux 内核 RCU 代码

4.1 三层调试工具体系

第一层:Assertions(断言)── 编译期/运行期静态检查
  lockdep    检测锁的使用违规(忘记加锁/重复加锁等)
  sparse     静态分析工具,检查 __rcu 标注的指针使用
  KCSAN      内核并发安全分析器,动态检测数据竞争
第二层:用户态 RCU 原型
  用 liburcu 在用户态复现内核逻辑
  可以使用 gdb / valgrind / AddressSanitizer
  比内核调试快 100 倍
第三层:RCU CPU stall warnings
  运行时检测"读者占用临界区过长"
  说明存在阻塞/死循环/禁中断时间过长等问题

4.2 lockdep 断言

// ── 断言"当前代码在 RCU 读临界区内"────────────────────
// 检查是否持有普通 RCU 读锁
WARN_ON(!rcu_read_lock_held());
// 检查是否持有 BH(Bottom Half)RCU 读锁
WARN_ON(!rcu_read_lock_bh_held());
// 检查是否持有 sched RCU 读锁(禁止抢占)
WARN_ON(!rcu_read_lock_sched_held());
// 检查以上任意一种
WARN_ON(!rcu_read_lock_any_held());
// ── 典型使用场景 ──────────────────────────────────────
// 声明某个函数必须在 RCU 读临界区或持有指定锁的情况下调用
struct mystruct *mystruct_search(int a)
{
    // 调用者必须满足:持有 RCU 读锁 OR 持有 mylock
    // rcu_dereference_check 在不满足条件时触发 WARN
    struct mystruct *msp;
    list_for_each_entry_rcu(msp, &mylist, list,
        lockdep_is_held(&mylock))  // 额外允许持有 mylock
        if (msp->a == a)
            return msp;
    return NULL;
}
// rcu_dereference_check 系列:
// 在满足条件时读取 RCU 指针(否则 WARN)
struct myconfig *mcp;
// 普通 RCU:要求在 rcu_read_lock() 内
mcp = rcu_dereference(curconfig);
// 带额外条件:允许持有某个锁(写者也可调用)
mcp = rcu_dereference_check(curconfig,
    lockdep_is_held(&mylock));
// BH 版本(用于软中断上下文)
mcp = rcu_dereference_bh(curconfig);
// sched 版本(用于禁抢占上下文)
mcp = rcu_dereference_sched(curconfig);

4.3 sparse 与 KCSAN 断言

// ── sparse:静态类型检查 ──────────────────────────────
// 用 __rcu 标注受 RCU 保护的指针
struct myconfig __rcu *curconfig;
void reader(void) {
    struct myconfig *mcp;
    // 正确:使用 rcu_dereference(sparse 检查通过)
    mcp = rcu_dereference(curconfig);
    // 错误:直接访问 __rcu 指针(sparse 警告)
    // mcp = curconfig;  // ← sparse: warning: incorrect access
}
void writer(void) {
    struct myconfig *new_cfg = alloc_and_init();
    // 正确:使用 rcu_assign_pointer(sparse 检查通过)
    rcu_assign_pointer(curconfig, new_cfg);
    // 错误:直接赋值(sparse 警告)
    // curconfig = new_cfg;  // ← sparse: warning
}
// ── KCSAN:动态数据竞争检测 ───────────────────────────
// 断言某个内存位置只有一个写者
void update_flags(void) {
    // 如果有其他线程同时写 flags,KCSAN 报告数据竞争
    ASSERT_EXCLUSIVE_WRITER(flags);
    flags |= MY_FLAG;
}
// 断言某个内存位置只有一个访问者(读或写)
void exclusive_access(void) {
    ASSERT_EXCLUSIVE_ACCESS(my_var);
    // ... 独占访问 my_var
}
// 断言某个内存位置的特定位只被一个写者修改
void update_bits(void) {
    // 只检查低 8 位是否有并发写
    ASSERT_EXCLUSIVE_BITS(bitmap, 0xFF);
    bitmap &= ~0xFF;
    bitmap |= new_low_byte;
}
// SCOPED 版本:在当前作用域内持续断言
void scoped_writer(void) {
    ASSERT_EXCLUSIVE_WRITER_SCOPED(shared_var);
    // 整个作用域内断言只有一个写者
    shared_var = compute_value();
    maybe_modify(shared_var);
}   // 作用域结束,断言解除

4.4 RCU CPU Stall Warning

正常情况(无 Stall)
CPU 0                      CPU 1
────────────────────────────────────────────────
rcu_read_lock()
  ... 快速执行 ...          rcu_read_lock()
rcu_read_unlock()             ... 执行 ...
                           rcu_read_unlock()
Idle(空闲)               Userspace(用户态)
         ↓↓↓ Grace Period 正常完成 ↓↓↓
RCU Time: Grace period done ✓
Stall 情况(超过 21 秒)
CPU 0                           CPU 1
──────────────────────────────────────────────────
rcu_read_lock()
  禁用中断!                    rcu_read_lock()
  ... 长时间执行(>21s)...       ... 执行 ...
  (无法被抢占,无法退出临界区)  rcu_read_unlock()
                                Userspace 执行
         ↓↓↓ 21 秒后 ↓↓↓
RCU CPU Stall Warning!
  内核打印:
  INFO: rcu_sched detected stall on CPU 0
  (通常说明 CPU 0 外部有问题:死循环、禁中断过长等)

Stall Warning 的根本含义
Stall = 某个 CPU 在 RCU 读临界区内停留超过阈值时间 \text{Stall} = \text{某个 CPU 在 RCU 读临界区内停留超过阈值时间} Stall=某个 CPU  RCU 读临界区内停留超过阈值时间
默认阈值 = 21  秒(CONFIG_RCU_CPU_STALL_TIMEOUT) \text{默认阈值} = 21 \text{ 秒}(\text{CONFIG\_RCU\_CPU\_STALL\_TIMEOUT}) 默认阈值=21 CONFIG_RCU_CPU_STALL_TIMEOUT
Stall 几乎总是外部问题,不是 RCU 本身的 bug:

常见原因:
  ① 长时间禁用中断(local_irq_disable 后死循环)
  ② 长时间禁用抢占(preempt_disable 后阻塞)
  ③ 内核线程死锁(持有锁后等另一个锁)
  ④ 硬件故障(CPU 停止响应)
  ⑤ 虚拟机暂停(VM 被 hypervisor 暂停)
Stall Warning 的配置
# 方式一:启动参数禁用警告
# 适合:生产环境中控制台输出被忽略的场景
bootparam: rcupdate.rcu_cpu_stall_suppress=1
# 方式二:启动参数调整超时时间(秒)
# 适合:慢速嵌入式系统(增大)或压力测试(减小)
bootparam: rcupdate.rcu_cpu_stall_timeout=60
# 方式三:编译期配置
CONFIG_RCU_CPU_STALL_TIMEOUT=60   # 范围:3~300 秒
# 方式四:运行时调整(sysctl)
echo 60 > /sys/module/rcupdate/parameters/rcu_cpu_stall_timeout

何时应该保留 Stall Warning

开发/测试阶段:必须保留!
  Stall warning 是发现真实 bug 的重要手段
  发现过的真实 bug 类型:
    - 驱动在 RCU 临界区内睡眠
    - 中断处理函数持有 RCU 读锁后死循环
    - NUMA 节点间的死锁
生产环境:
  ├─ 响应时间敏感(实时系统)→ 保留,Stall 本身就是问题
  ├─ 控制台无人监控(嵌入式)→ 可以禁用(suppress=1)
  └─ 云计算批处理(吞吐量优先)→ 可以增大超时时间

五、整体总结

Linux 内核 RCU 的地位:
  使用量:18,710 次 API 调用,遍布所有核心子系统
  历史:从 2.5.43(2002年)开始,持续演进超过 20 年
  覆盖:网络、VFS、进程管理、内存管理、安全……
调试工具体系(三层):
  静态:sparse(__rcu 标注)+ lockdep(锁使用断言)
  动态:KCSAN(运行时数据竞争)+ stall warning(活锁检测)
  原型:用户态 liburcu + 真正的调试器(gdb/valgrind)

RCU 在 Linux 内核中的核心价值: 以接近零的读者开销,换取整个内核热路径的线性扩展 从双核到 1024 核,读者性能几乎不变 \boxed{ \text{RCU 在 Linux 内核中的核心价值:}\\ \text{以接近零的读者开销,换取整个内核热路径的线性扩展}\\ \text{从双核到 1024 核,读者性能几乎不变} } RCU  Linux 内核中的核心价值:以接近零的读者开销,换取整个内核热路径的线性扩展从双核到 1024 核,读者性能几乎不变

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐