内容来自:程序员老廖

本项目围绕一个"任务队列 TaskQueue"展开,核心是用清晰、可讲解的代码实现一个贴近真实业务的异步任务调度系统,支持:

  • 多线程任务队列 TaskQueue

  • 多命名队列管理 TaskQueueManager

  • 即时任务 + 延迟任务调度

不引入花哨语法、只用常见 C++11/14/17 特性的前提下,我们重点实现:

  1. 周期任务(PeriodicTask):类似心跳 / 定时清理

  2. 延迟重试任务(RetryTask + 退避策略)

  3. 任务取消(Cancelable Task)

  4. 任务优先级(高/中/低)

  5. 队列容量控制 & 拒绝策略(背压)

  6. 线程池模式:一个队列多线程消费

  7. 异常保护 & 监控指标(统计信息)

系统也会涉及到常用 C++11/14/17 语法

视频教程与源码领取:C++进阶项目:工业级多任务队列设计与实现

1. 系统整体架构

1.1 核心组件关系图

1.2 单个任务队列内部结构图

2. 关键竞态条件分析与修复

2.1 竞态条件:Lost Wakeup(唤醒丢失)

这是任务队列中最关键的并发问题,会导致任务延迟执行甚至挂起。

2.1.1 问题描述

在多线程环境下,工作线程和提交线程之间存在一个危险的时间窗口:

2.1.2 错误代码示例

// ❌ 错误的实现
void TaskQueueSTD::postTask(std::unique_ptr<QueuedTask> task, TaskPriority prio) {
    {
        std::unique_lock<std::mutex> lock(pending_mutex_);
        // 添加任务到队列
        pending_normal_.push(std::move(entry));
    }  // 锁在这里释放
​
    notifyWake();  // ❌ 在锁外调用,存在时间窗口!
}
​
void TaskQueueSTD::processTasks() {
    while (true) {
        auto task = getNextTask();  // 步骤1:检查队列(持有锁)
                                     // 步骤2:释放锁
        // ⚠️ 危险窗口:此时其他线程可能添加任务并唤醒
​
        if (!task.run_task_) {
            flag_notify_.wait(...);  // 步骤3:开始等待(可能错过唤醒)
        }
    }
}

2.1.3 修复方案

核心原则:在释放锁后立即调用 notifyWake(),最小化时间窗口。

// ✅ 正确的实现
void TaskQueueSTD::postTask(std::unique_ptr<QueuedTask> task, TaskPriority prio) {
    std::unique_lock<std::mutex> lock(pending_mutex_);
    // 添加任务到队列
    pending_normal_.push(std::move(entry));
​
    lock.unlock();  // ✅ 显式释放锁
    notifyWake();   // ✅ 立即通知,最小化时间窗口
}

2.1.4 为什么显式 unlock 很重要?

// 对比两种写法的时序差异
​
// ❌ 方式1:作用域结束自动释放锁
{
    std::unique_lock<std::mutex> lock(pending_mutex_);
    pending_normal_.push(std::move(entry));
}  // 锁释放
   // 可能执行其他清理代码
   // 可能发生线程切换
notifyWake();  // 延迟较大
​
// ✅ 方式2:显式unlock + 立即notify
std::unique_lock<std::mutex> lock(pending_mutex_);
pending_normal_.push(std::move(entry));
lock.unlock();  // 精确控制释放时机
notifyWake();   // 紧接着通知,时间窗口最小

2.2 延迟任务时间精度问题

2.2.1 问题描述

当延迟任务的剩余时间小于 1ms 时,std::chrono::duration_cast 会将其截断为 0,导致工作线程无法区分:

  • 情况 1:没有任何任务(应该无限等待)

  • 情况 2:有延迟任务即将到期(应该立即重新检查)

// 获取下一个任务
auto diff = std::chrono::duration_cast<Millis>(delay_info.next_fire_at_ - tick);
result.sleep_time_ms_ = diff.count();  // 可能被截断为 0
​
// 工作线程处理
if (task.sleep_time_ms_ == 0) {
    // ❌ 问题:无法区分"没任务"还是"即将到期"
    flag_notify_.wait(vi::Event::kForever);  // 可能导致任务延迟
}

2.2.2 修复方案

重新检查延迟队列来区分两种情况:

// ✅ 正确的处理逻辑
if (task.sleep_time_ms_ > 0) {
    // 情况1:有明确的等待时间
    flag_notify_.wait(static_cast<int>(task.sleep_time_ms_));
​
} else if (task.sleep_time_ms_ == 0) {
    // 情况2:时间为0,需要区分原因
    std::unique_lock<std::mutex> lock(pending_mutex_);
    bool has_delayed = !delayed_queue_.empty();
    lock.unlock();
​
    if (has_delayed) {
        // 有延迟任务即将到期,立即循环重新检查
        // 不等待,直接进入下一次循环
    } else {
        // 没有任何任务,无限等待
        flag_notify_.wait(vi::Event::kForever);
    }
​
} else {  // sleep_time_ms_ < 0
    // 情况3:任务已过期,立即处理
    // 不等待,直接进入下一次循环
}

2.3 完整的任务提交与执行流程

3. 核心功能实现

3.1 周期任务(PeriodicTask)

3.1.1 设计思路

周期任务通过自我重新投递实现循环执行:

template <typename Closure>
class PeriodicTask : public QueuedTask {
private:
    bool run() override {
        // 1. 执行业务逻辑
        closure_();
​
        // 2. 重新投递自己
        TaskQueueBase* current = TaskQueueBase::current();
        if (current) {
            current->postDelayedTask(
                std::unique_ptr<QueuedTask>(this),
                interval_ms_);
            // 3. 返回 false 表示所有权已转移
            return false;
        }
​
        return true;
    }
​
    typename std::decay<Closure>::type closure_;
    uint32_t interval_ms_{};
};

3.1.2 执行流程

3.2 重试任务(RetryTask)

3.2.1 退避策略

支持三种退避策略:

struct RetryStrategy {
    enum class Type {
        Fixed,      // 固定间隔:base, base, base...
        Linear,     // 线性退避:base, 2*base, 3*base...
        Exponential // 指数退避:base, 2*base, 4*base, 8*base...
    };
​
    Type     type{Type::Fixed};
    uint32_t base_delay_ms{1000};
};

3.3 任务优先级调度

3.3.1 三级优先级队列

// 内部实现
std::queue<PendingEntry> pending_high_;    // 高优先级
std::queue<PendingEntry> pending_normal_;  // 普通优先级
std::queue<PendingEntry> pending_low_;     // 低优先级

3.3.2 优先级调度逻辑

关键设计点:

  • 高优先级任务优先执行

  • 延迟任务到期后,根据 order ID 与即时任务比较

  • 保证 FIFO 语义的全局一致性

3.4 任务取消机制

3.4.1 取消流程

4. 线程池模式

4.1 单线程 vs 多线程队列

// 单线程队列(默认)
auto queue1 = TaskQueue::create("worker1");  // 1个工作线程
​
// 线程池模式
auto queue_pool = std::make_unique<TaskQueue>(
    std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(
        new TaskQueueSTD("pool", 4)  // 4个工作线程
    )
);

4.2 多线程竞争模型

关键点:

  • 所有工作线程共享同一个任务队列

  • 通过互斥锁保证线程安全

  • Event 的 set() 会唤醒所有等待线程(只有一个能获取锁)

5. 队列容量控制与背压

5.1 容量配置

struct CapacityConfig {
    std::size_t max_pending{0};   // 即时任务队列容量(0=不限制)
    std::size_t max_delayed{0};   // 延迟任务队列容量(0=不限制)
    std::function<void(std::unique_ptr<QueuedTask>)> on_reject;  // 拒绝回调
};
​
// 使用示例
TQ("worker1")->configureCapacity({
    .max_pending = 1000,
    .max_delayed = 500,
    .on_reject = [](std::unique_ptr<QueuedTask> task) {
        std::cerr << "任务被拒绝:队列已满" << std::endl;
    }
});

5.2 背压处理流程

6. 异常保护与监控

6.1 任务异常保护

// 执行任务时的异常捕获
QueuedTask* release_ptr = task.run_task_.release();
try {
    if (release_ptr->run()) {
        delete release_ptr;
    }
} catch (const std::exception& e) {
    std::cerr << "[TaskQueueSTD:" << name_ << "] std::exception: "
              << e.what() << std::endl;
    delete release_ptr;
} catch (...) {
    std::cerr << "[TaskQueueSTD:" << name_ << "] unknown exception" << std::endl;
    delete release_ptr;
}

6.2 队列统计信息

struct QueueStats {
    std::uint64_t executed_task_count{0};  // 已执行任务总数
    std::size_t   pending_task_count{0};   // 待执行即时任务数
    std::size_t   delayed_task_count{0};   // 待执行延迟任务数
};
​
// 使用示例
auto stats = TQ("worker1")->stats();
std::cout << "执行=" << stats.executed_task_count
          << ", 待处理=" << stats.pending_task_count
          << ", 延迟=" << stats.delayed_task_count << std::endl;

7. 关键设计模式

7.1 RAII 资源管理

// CurrentTaskQueueSetter 使用 RAII 模式
class CurrentTaskQueueSetter {
public:
    explicit CurrentTaskQueueSetter(TaskQueueBase* taskQueue)
        : _previous(_current) {
        _current = taskQueue;  // 构造时设置
    }
​
    ~CurrentTaskQueueSetter() {
        _current = _previous;  // 析构时恢复
    }
​
private:
    TaskQueueBase* const _previous;
};
​
// 使用
void processTasks() {
    CurrentTaskQueueSetter setCurrent(this);  // 自动管理 thread_local
    // 任务执行期间,current() 返回正确的队列指针
}

7.2 智能指针与所有权转移

// 任务所有权流转
Client                  TaskQueue               QueuedTask
  |                        |                        |
  |--postTask(task)------->|                        |
  |  (move ownership)      |                        |
  |                        |--store in queue------->|
  |                        |                        |
  |                   Worker Thread                 |
  |                        |                        |
  |                        |<--getNextTask()--------|
  |                        |                        |
  |                        |--run()---------------->|
  |                        |                        |
  |                        |  return true: delete   |
  |                        |  return false: requeue |

8. C++11/14/17 语法精讲

8.1 完美转发 std::forward

template <typename Closure>
class ClosureTask : public QueuedTask {
public:
    explicit ClosureTask(Closure&& closure)
        : closure_(std::forward<Closure>(closure)) {}
​
private:
    typename std::decay<Closure>::type closure_;
};

为什么使用 std::decay?

  • 移除引用:int& → int

  • 移除 cv 限定符:const int → int

  • 确保存储的是值类型,避免悬空引用

8.2 std::chrono 时间处理

using Clock      = std::chrono::steady_clock;
using TimePoint  = Clock::time_point;
using Millis     = std::chrono::milliseconds;
​
// 计算延迟任务触发时间
auto fire_at = Clock::now() + Millis(delay_ms);
​
// 计算剩余时间
auto diff = std::chrono::duration_cast<Millis>(fire_at - now());
int64_t remaining_ms = diff.count();

8.3 thread_local 线程局部存储

namespace {
    thread_local TaskQueueBase* _current = nullptr;
}
​
// 每个线程有独立的 _current 副本
TaskQueueBase* TaskQueueBase::current() {
    return _current;
}

8.4 std::enable_if SFINAE

// 只接受非 unique_ptr<QueuedTask> 类型的闭包
template <class Closure,
          typename std::enable_if<
              !std::is_convertible<Closure, std::unique_ptr<QueuedTask>>::value
          >::type* = nullptr>
void postTask(Closure&& closure) {
    postTask(ToQueuedTask(std::forward<Closure>(closure)));
}

9. 实战应用场景

9.1 心跳上报

TQ("heartbeat")->postPeriodicTask([](){
    // 每30秒上报一次心跳
    sendHeartbeat();
}, 30000);

9.2 订单超时取消

auto order_id = createOrder();
auto task_id = TQ("order")->postCancellableDelayedTask([order_id](){
    // 30分钟后自动取消订单
    cancelOrder(order_id);
}, 30 * 60 * 1000);
​
// 用户支付成功,取消定时任务
if (paymentSuccess) {
    TQ("order")->cancelTask(task_id);
}

9.3 网络请求重试

TQ("network")->postRetryTask([]() -> bool {
    return sendRequest();  // 返回 true 表示成功
}, 3, {  // 最多重试3次
    .type = TaskQueue::RetryStrategy::Type::Exponential,
    .base_delay_ms = 1000  // 1s, 2s, 4s 指数退避
});

10. 性能优化建议

10.1 减少锁竞争

  • ✅ 在持锁期间只做必要操作

  • ✅ 任务执行在锁外进行

  • ✅ 显式 unlock() 后立即 notifyWake()

10.2 避免频繁内存分配

  • ✅ 使用 std::queue 避免随机访问开销

  • ✅ 延迟队列使用 std::map 自动排序

  • ✅ 任务对象使用 unique_ptr 管理

10.3 线程池大小选择

// CPU 密集型任务
size_t thread_count = std::thread::hardware_concurrency();
​
// IO 密集型任务
size_t thread_count = std::thread::hardware_concurrency() * 2;
​
auto queue = std::make_unique<TaskQueue>(
    std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(
        new TaskQueueSTD("worker", thread_count)
    )
);

11. 总结

本任务队列系统通过清晰的设计和严谨的实现,展示了:

  1. ✅ 并发安全设计:正确处理竞态条件(Lost Wakeup)

  2. ✅ 时间精度处理:延迟任务时间截断问题的解决

  3. ✅ 功能完整性:周期、重试、取消、优先级、容量控制

  4. ✅ 异常安全性:任务异常不影响工作线程

  5. ✅ 可观测性:统计信息支持监控

  6. ✅ 现代 C++:智能指针、完美转发、chrono、thread_local 等

这是一个生产级别的任务队列实现,适合用于:

  • 教学和学习现代 C++ 并发编程

  • 理解任务调度系统的核心原理

  • 作为实际项目的基础组件

更多资源:原代码:https://github.com/ouxianghui/task-queue.git

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐