一、目标:做一个“可用”的最小任务系统

我们要实现一个简化版任务系统,支持:

  • ✅ 提交任务(任意函数)

  • ✅ 工作线程自动执行

  • ✅ 无任务时阻塞等待

  • ✅ 支持安全关闭

  • ✅ 正确释放线程

你可以理解为:

线程池的前身。

二、设计思路(系统取向)

我们需要三类能力:


1️⃣ 资源保护(队列)

  • 任务队列
  • 多线程访问
  • 必须用 mutex 保护

2️⃣ 线程协作(等待任务)

  • 队列空 → 线程休眠
  • 新任务 → 唤醒线程
  • 使用 condition_variable

3️⃣ 状态控制(停止系统)

  • 停止标志
  • 使用 atomic<bool>

三、整体结构图

主线程
   │
   ├── submit() → push 任务 → notify
   │
   ▼
工作线程
   │
   ├── wait 等待任务
   ├── 取任务
   ├── unlock
   ├── 执行
   └── 循环

四、完整实现(可直接运行)

1️⃣ 头文件

#include <iostream>
#include <thread>
#include <vector>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <functional>

2️⃣ 任务系统类

class TaskSystem {
public:
    TaskSystem(size_t threadCount)
        : running_(true)
    {
        for (size_t i = 0; i < threadCount; ++i) {
            workers_.emplace_back([this] {
                this->workerLoop();
            });
        }
    }

    ~TaskSystem() {
        shutdown();
    }

    // 提交任务
    void submit(std::function<void()> task) {
        {
            std::unique_lock<std::mutex> lock(mtx_);
            tasks_.push(task);
        }
        cv_.notify_one();
    }

    // 关闭系统
    void shutdown() {
        running_.store(false);
        cv_.notify_all();

        for (auto& t : workers_) {
            if (t.joinable()) {
                t.join();
            }
        }
    }

private:
    void workerLoop() {
        while (true) {
            std::function<void()> task;

            {
                std::unique_lock<std::mutex> lock(mtx_);

                cv_.wait(lock, [this] {
                    return !tasks_.empty() || !running_.load();
                });

                if (!running_.load() && tasks_.empty()) {
                    return;
                }

                task = tasks_.front();
                tasks_.pop();
            }

            task();  // 在锁外执行任务
        }
    }

private:
    std::vector<std::thread> workers_;
    std::queue<std::function<void()>> tasks_;
    std::mutex mtx_;
    std::condition_variable cv_;
    std::atomic<bool> running_;
};

五、测试代码

int main() {
    TaskSystem pool(3);

    for (int i = 0; i < 10; ++i) {
        pool.submit([i] {
            std::cout << "Task " << i
                      << " running in thread "
                      << std::this_thread::get_id()
                      << std::endl;
        });
    }

    std::this_thread::sleep_for(std::chrono::seconds(1));

    pool.shutdown();

    return 0;
}

输出类似:

Task 0 running in thread 1234
Task 1 running in thread 5678
...

六、关键点逐条拆解(非常重要)


1️⃣ 为什么 running_ 用 atomic?

因为:

  • 多线程读写
  • 只控制“状态”
  • 不涉及复杂资源

符合 atomic 使用场景。

2️⃣ 为什么任务执行必须在锁外?

如果这样写:

task();

放在锁内:

  • 所有线程会被阻塞
  • 并发能力消失

原则:

锁保护资源,不保护业务逻辑

3️⃣ 为什么 wait 要带 predicate?

cv_.wait(lock, condition);

防止:

  • 假唤醒
  • 丢通知

4️⃣ 为什么 shutdown 要 notify_all?

因为:

  • 可能多个线程在等待
  • 必须全部唤醒退出

七、Java 类比

Java 线程池:

ExecutorService pool = Executors.newFixedThreadPool(3);
pool.submit(() -> {});
pool.shutdown();

Java 帮你封装了:

  • 任务队列
  • 条件变量
  • 状态控制

C++:

你自己写。

这就是 C++ 并发“系统级能力”的核心差异。

八、系统层面你已经掌握了什么?

现在你已经具备:

✔ 线程模型理解

谁共享数据?

✔ 资源保护

mutex + RAII

✔ 线程协作

condition_variable

✔ 状态控制

atomic

✔ 生命周期管理

join + shutdown

九、工程升级方向(后续可以做)

  • 支持有界队列
  • 支持优先级任务
  • 支持 future
  • 支持返回值
  • 支持任务取消

十、本篇总结口诀

队列用 mutex
等待用 cv
状态用 atomic
生命周期用 join

十一、下一篇(最终篇)

第七篇我们讲:

并发排障与工程纪律

  • 死锁四条件
  • 锁顺序策略
  • 竞态如何定位
  • 性能如何优化
  • 锁粒度怎么拆
  • 工程规范 checklist

这才是“系统取向”的终章。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐