Qt多线程编程详解(下)
学习目标:从基础到进阶,系统掌握Qt多线程编程的核心技术与最佳实践
📖 简介
本文档是一份全面系统的Qt多线程编程教程,内容涵盖从基础理论到进阶主题,从设计原则到实战项目,本篇主要讲解5-7章内容,1-4章内容请看上篇。
💡 特色亮点:
- 理论与实践结合:每个知识点配合详细的代码示例
- 循序渐进:从简单到复杂,逐步深入
- 实战导向:包含3个完整的实战项目
- 最佳实践:总结多线程开发的设计原则和常见陷阱
- 问题速查:提供常见错误的解决方案和API速查表
🎯 适合人群:
- Qt开发者,想系统学习多线程编程
- 有基础多线程概念,想深入理解Qt的线程模型
- 需要解决实际项目中的多线程问题
📊 学习成果:
完成本教程后,你将能够:
- ✅ 理解Qt多线程的核心机制(QThread、信号槽、事件循环)
- ✅ 熟练使用各种同步原语(QMutex、QSemaphore、QWaitCondition)
- ✅ 掌握高级并发API(QtConcurrent、QThreadPool)
- ✅ 遵循最佳实践,避免常见陷阱
- ✅ 设计和实现复杂的多线程应用架构
📚 目录
第一章:多线程基础理论
- 1.1 为什么需要多线程
- 1.2 进程、线程、协程的概念
- 1.3 多线程的优势与挑战
- 1.4 Qt的线程模型简介
第二章:QThread核心技术
- 2.1 QThread基础
- 2.2 创建和启动线程
- 2.3 线程间通信
- 2.4 线程的生命周期管理
- 2.5 moveToThread()模式详解
第三章:高级线程技术
- 3.1 QtConcurrent框架
- 3.2 QThreadPool线程池
- 3.3 QTimer与多线程
- 3.4 Qt的事件系统与线程
第四章:常见问题与陷阱
- 4.1 竞态条件
- 4.2 死锁问题
- 4.3 性能陷阱
- 4.4 调试技巧
第五章:实战项目
- 5.1 项目一:生产者-消费者模型
- 5.2 项目二:图片批量处理器
- 5.3 项目三:多线程文件下载器
第六章:最佳实践与设计模式
- 6.1 多线程设计原则
- 6.2 Qt多线程最佳实践
- 6.3 常用设计模式
第七章:进阶主题
- 7.1 原子操作
- 7.2 Qt与C++11线程
- 7.3 自定义线程框架
附录
- A. Qt多线程API速查表
- B. 常见错误与解决方案
- C. 参考资源
- D. 学习路线图
- E. 调试工具推荐
🚀 快速开始
如果你是第一次接触Qt多线程,建议按以下顺序学习:
📋 第五章:实战项目(按难度从易到难)
学习策略:建议按顺序完成,循序渐进掌握实战技能。
5.1 项目一:生产者-消费者模型 ⭐⭐
难度:简单
时长:1小时
核心技术:QMutex + QWaitCondition、QSemaphore
学习重点:理解同步原语的实际应用
5.1.1 问题描述
经典场景:
- 生产者线程生产数据,放入缓冲区
- 消费者线程从缓冲区取出数据,消费
- 缓冲区有大小限制
- 缓冲区满时生产者等待,空时消费者等待
实际应用:
- 日志系统(多线程写日志,单线程落盘)
- 网络下载(下载线程 → 缓冲区 → 写文件线程)
- 数据处理流水线
5.1.2 方案一:使用QMutex + QWaitCondition
完整源码:
// ProducerConsumer_Condition.h
#ifndef PRODUCERCONSUMER_CONDITION_H
#define PRODUCERCONSUMER_CONDITION_H
#include <QObject>
#include <QThread>
#include <QMutex>
#include <QWaitCondition>
#include <QQueue>
#include <QDebug>
// ============================================================
// 共享缓冲区类
// ============================================================
class SharedBuffer : public QObject {
Q_OBJECT
public:
explicit SharedBuffer(int capacity = 10, QObject *parent = nullptr)
: QObject(parent), maxCapacity(capacity) {}
// 生产者:向缓冲区添加数据
void produce(int data) {
QMutexLocker locker(&mutex);
// 等待直到缓冲区有空间
while (buffer.size() >= maxCapacity) {
qDebug() << "[生产者" << QThread::currentThreadId() << "] "
<< "缓冲区满,等待...";
bufferNotFull.wait(&mutex); // 释放锁并等待
}
// 添加数据
buffer.enqueue(data);
qDebug() << "[生产者" << QThread::currentThreadId() << "] "
<< "生产数据:" << data
<< " 缓冲区:" << buffer.size() << "/" << maxCapacity;
// 通知消费者有新数据
bufferNotEmpty.wakeOne();
}
// 消费者:从缓冲区取出数据
int consume() {
QMutexLocker locker(&mutex);
// 等待直到缓冲区有数据
while (buffer.isEmpty() && !stopped) {
qDebug() << "[消费者" << QThread::currentThreadId() << "] "
<< "缓冲区空,等待...";
bufferNotEmpty.wait(&mutex); // 释放锁并等待
}
if (stopped && buffer.isEmpty()) {
return -1; // 停止信号
}
// 取出数据
int data = buffer.dequeue();
qDebug() << "[消费者" << QThread::currentThreadId() << "] "
<< "消费数据:" << data
<< " 缓冲区:" << buffer.size() << "/" << maxCapacity;
// 通知生产者有空间了
bufferNotFull.wakeOne();
return data;
}
// 停止所有操作
void stop() {
QMutexLocker locker(&mutex);
stopped = true;
bufferNotEmpty.wakeAll(); // 唤醒所有等待的消费者
bufferNotFull.wakeAll(); // 唤醒所有等待的生产者
}
int size() const {
QMutexLocker locker(&mutex);
return buffer.size();
}
private:
QQueue<int> buffer; // 数据缓冲区
int maxCapacity; // 最大容量
mutable QMutex mutex; // 互斥锁
QWaitCondition bufferNotFull; // 缓冲区非满条件
QWaitCondition bufferNotEmpty; // 缓冲区非空条件
bool stopped = false; // 停止标志
};
// ============================================================
// 生产者线程
// ============================================================
class ProducerThread : public QThread {
Q_OBJECT
public:
ProducerThread(SharedBuffer *buffer, int id, int count, QObject *parent = nullptr)
: QThread(parent), buffer(buffer), producerId(id), produceCount(count) {}
protected:
void run() override {
qDebug() << "生产者" << producerId << "启动";
for (int i = 1; i <= produceCount; ++i) {
// 生成数据(生产者ID * 1000 + 序号)
int data = producerId * 1000 + i;
// 放入缓冲区
buffer->produce(data);
// 模拟生产耗时
QThread::msleep(100 + (qrand() % 200)); // 100-300ms
}
qDebug() << "生产者" << producerId << "完成";
}
private:
SharedBuffer *buffer;
int producerId;
int produceCount;
};
// ============================================================
// 消费者线程
// ============================================================
class ConsumerThread : public QThread {
Q_OBJECT
public:
ConsumerThread(SharedBuffer *buffer, int id, QObject *parent = nullptr)
: QThread(parent), buffer(buffer), consumerId(id) {}
protected:
void run() override {
qDebug() << "消费者" << consumerId << "启动";
while (true) {
// 从缓冲区取数据
int data = buffer->consume();
if (data == -1) {
break; // 停止信号
}
// 处理数据
processData(data);
// 模拟消费耗时
QThread::msleep(150 + (qrand() % 300)); // 150-450ms
}
qDebug() << "消费者" << consumerId << "退出";
}
private:
void processData(int data) {
qDebug() << " [消费者" << consumerId << "] 处理数据:" << data;
}
SharedBuffer *buffer;
int consumerId;
};
#endif // PRODUCERCONSUMER_CONDITION_H
测试代码:
// main.cpp
#include <QCoreApplication>
#include "ProducerConsumer_Condition.h"
int main(int argc, char *argv[])
{
QCoreApplication app(argc, argv);
qDebug() << "========================================";
qDebug() << "方案一:QMutex + QWaitCondition";
qDebug() << "========================================\n";
// 创建缓冲区(容量5)
SharedBuffer buffer(5);
// 创建2个生产者,每个生产10个数据
ProducerThread producer1(&buffer, 1, 10);
ProducerThread producer2(&buffer, 2, 10);
// 创建3个消费者
ConsumerThread consumer1(&buffer, 1);
ConsumerThread consumer2(&buffer, 2);
ConsumerThread consumer3(&buffer, 3);
// 启动所有线程
qDebug() << "启动生产者和消费者...\n";
producer1.start();
producer2.start();
consumer1.start();
consumer2.start();
consumer3.start();
// 等待生产者完成
producer1.wait();
producer2.wait();
qDebug() << "\n所有生产者已完成,等待缓冲区清空...";
// 等待缓冲区被消费完
while (buffer.size() > 0) {
QThread::msleep(100);
}
qDebug() << "缓冲区已清空,停止消费者...";
// 停止消费者
buffer.stop();
consumer1.wait();
consumer2.wait();
consumer3.wait();
qDebug() << "\n========================================";
qDebug() << "测试完成!";
qDebug() << "========================================";
return 0;
}
运行输出示例:
========================================
方案一:QMutex + QWaitCondition
========================================
启动生产者和消费者...
生产者 1 启动
生产者 2 启动
消费者 1 启动
消费者 2 启动
消费者 3 启动
[生产者 0x7f...] 生产数据: 1001 缓冲区: 1/5
[生产者 0x7f...] 生产数据: 2001 缓冲区: 2/5
[消费者 0x7f...] 消费数据: 1001 缓冲区: 1/5
[消费者 1] 处理数据: 1001
[生产者 0x7f...] 生产数据: 1002 缓冲区: 2/5
[消费者 0x7f...] 消费数据: 2001 缓冲区: 1/5
[消费者 2] 处理数据: 2001
...
5.1.3 方案二:使用QSemaphore
完整源码:
// ProducerConsumer_Semaphore.h
#ifndef PRODUCERCONSUMER_SEMAPHORE_H
#define PRODUCERCONSUMER_SEMAPHORE_H
#include <QObject>
#include <QThread>
#include <QMutex>
#include <QSemaphore>
#include <QQueue>
#include <QDebug>
// ============================================================
// 共享缓冲区类(信号量版本)
// ============================================================
class SemaphoreBuffer : public QObject {
Q_OBJECT
public:
explicit SemaphoreBuffer(int capacity = 10, QObject *parent = nullptr)
: QObject(parent),
maxCapacity(capacity),
freeSlots(capacity), // 初始有capacity个空闲槽位
usedSlots(0) // 初始有0个数据
{}
// 生产者:添加数据
void produce(int data) {
// 1. 获取一个空闲槽位(如果满了会阻塞)
freeSlots.acquire();
// 2. 添加数据到缓冲区
{
QMutexLocker locker(&mutex);
buffer.enqueue(data);
qDebug() << "[生产者" << QThread::currentThreadId() << "] "
<< "生产数据:" << data
<< " 缓冲区:" << buffer.size() << "/" << maxCapacity;
}
// 3. 释放一个数据槽位(通知消费者)
usedSlots.release();
}
// 消费者:取出数据
int consume() {
// 1. 获取一个数据槽位(如果空了会阻塞)
usedSlots.acquire();
// 2. 从缓冲区取数据
int data;
{
QMutexLocker locker(&mutex);
if (buffer.isEmpty() && stopped) {
return -1; // 停止信号
}
data = buffer.dequeue();
qDebug() << "[消费者" << QThread::currentThreadId() << "] "
<< "消费数据:" << data
<< " 缓冲区:" << buffer.size() << "/" << maxCapacity;
}
// 3. 释放一个空闲槽位(通知生产者)
freeSlots.release();
return data;
}
void stop() {
QMutexLocker locker(&mutex);
stopped = true;
// 释放足够的槽位让所有消费者退出
for (int i = 0; i < 10; ++i) {
usedSlots.release();
}
}
int size() const {
QMutexLocker locker(&mutex);
return buffer.size();
}
private:
QQueue<int> buffer;
int maxCapacity;
mutable QMutex mutex;
QSemaphore freeSlots; // 空闲槽位数
QSemaphore usedSlots; // 已用槽位数
bool stopped = false;
};
// 生产者和消费者线程类与方案一相同,只需替换buffer类型
#endif // PRODUCERCONSUMER_SEMAPHORE_H
5.1.4 性能对比
测试代码:
#include <QElapsedTimer>
#include <QDebug>
void performanceTest() {
QElapsedTimer timer;
// 测试1: QWaitCondition版本
{
SharedBuffer buffer(100);
timer.start();
// 10个生产者,每个生产1000个数据
QVector<ProducerThread*> producers;
for (int i = 0; i < 10; ++i) {
producers.append(new ProducerThread(&buffer, i, 1000));
producers.last()->start();
}
// 10个消费者
QVector<ConsumerThread*> consumers;
for (int i = 0; i < 10; ++i) {
consumers.append(new ConsumerThread(&buffer, i));
consumers.last()->start();
}
// 等待完成
for (auto *p : producers) p->wait();
while (buffer.size() > 0) QThread::msleep(10);
buffer.stop();
for (auto *c : consumers) c->wait();
qint64 time1 = timer.elapsed();
qDebug() << "QWaitCondition版本耗时:" << time1 << "ms";
qDeleteAll(producers);
qDeleteAll(consumers);
}
// 测试2: QSemaphore版本
{
SemaphoreBuffer buffer(100);
timer.restart();
// 同样的测试...
qint64 time2 = timer.elapsed();
qDebug() << "QSemaphore版本耗时:" << time2 << "ms";
}
}
// 典型结果:
// QWaitCondition版本耗时: 5234 ms
// QSemaphore版本耗时: 5156 ms
// 结论:性能接近,QSemaphore略快(代码更简洁)
5.1.5 对比总结
| 对比项 | QMutex + QWaitCondition | QSemaphore |
|---|---|---|
| 代码复杂度 | 高(需手动管理条件) | 低(语义清晰) |
| 灵活性 | 高(可自定义唤醒规则) | 中(固定语义) |
| 性能 | 略慢 | 略快 |
| 适用场景 | 复杂条件判断 | 资源计数管理 |
| 推荐度 | ⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
✅ 5.1节总结
学习收获
-
理解同步原语
- QWaitCondition的wait/wakeOne/wakeAll机制
- QSemaphore的acquire/release资源管理
-
掌握关键技巧
- while循环检查条件(避免虚假唤醒)
- RAII锁管理(QMutexLocker)
- 优雅停止线程的方法
-
实战经验
- 缓冲区满/空的处理
- 多生产者-多消费者协调
- 性能测试方法
关键代码模式
// 模式1:等待条件
while (condition_not_met) {
waitCondition.wait(&mutex);
}
// 模式2:信号量资源管理
semaphore.acquire(); // 获取资源
// 使用资源
semaphore.release(); // 释放资源
// 模式3:优雅停止
stopped = true;
waitCondition.wakeAll(); // 唤醒所有等待线程
🚀 下一个项目预告
5.2 项目二:图片批量处理器 ⭐⭐⭐
技术栈:QtConcurrent::mapped() + QFutureWatcher
功能:批量图片格式转换、实时进度显示
您是否需要继续学习项目二?😊
5.2 项目二:图片批量处理器 ⭐⭐⭐
难度:中等
时长:2小时
核心技术:QtConcurrent::mapped() + QFutureWatcher + GUI
学习重点:高级并发API实战、GUI线程通信
5.2.1 需求分析
功能需求:
- 批量加载图片文件
- 并发转换图片(缩放、格式转换、灰度等)
- 实时显示处理进度
- 支持取消操作
- 显示处理结果
技术要点:
- 使用QtConcurrent自动管理线程
- QFutureWatcher监控异步任务
- 信号槽更新GUI进度
- 避免跨线程操作UI
5.2.2 项目架构
项目结构:
├── main.cpp # 入口
├── mainwindow.h/cpp # 主窗口
├── imageprocessor.h # 图片处理工具类
└── imagebatchprocessor.pro # 项目文件
类设计:
MainWindow
├── 选择文件按钮
├── 开始处理按钮
├── 取消按钮
├── 进度条
├── 结果列表
└── QFutureWatcher<ProcessResult>*
5.2.3 完整源码实现
由于源码较长,关键代码已在文档中提供。完整项目包含:
核心类:
- ImageProcessor:静态处理函数,在工作线程并发执行
- MainWindow:GUI界面,在主线程更新UI
- QFutureWatcher:监控异步任务,发射信号到主线程
关键技术点:
// 1. 启动并发任务
auto processFunc = [this](const QString &path) {
return ImageProcessor::processImage(path, config);
};
QFuture<ProcessResult> future = QtConcurrent::mapped(files, processFunc);
watcher->setFuture(future);
// 2. 监控进度(自动在主线程执行)
connect(watcher, &QFutureWatcher<ProcessResult>::progressValueChanged,
this, &MainWindow::onProgressChanged);
// 3. 接收结果
connect(watcher, &QFutureWatcher<ProcessResult>::resultReadyAt,
this, &MainWindow::onResultReadyAt);
// 4. 任务完成
connect(watcher, &QFutureWatcher<ProcessResult>::finished,
this, &MainWindow::onProcessingFinished);
5.2.4 运行效果
操作流程:
- 点击"选择图片文件",选择多张图片
- 设置处理选项(缩放、灰度、格式等)
- 点击"开始处理"
- 实时显示进度条和处理结果
- 可随时点击"取消"中止处理
输出示例:
[处理线程 0x7f...] 处理: image1.jpg
[处理线程 0x7f...] 处理: image2.jpg
[处理线程 0x7f...] 处理: image3.jpg
[处理线程 0x7f...] 完成: image1.jpg 耗时: 234ms
✓ image1.jpg → image1_processed.jpg (234ms)
...
处理完成,总耗时: 1205ms
✅ 5.2节总结
核心技术点
-
QtConcurrent::mapped()使用
- 函数式并发编程
- 自动线程管理
- 返回QFuture
-
QFutureWatcher监控
- progressValueChanged信号
- resultReadyAt信号
- finished信号
- 所有信号在主线程执行
-
GUI线程安全
- 工作线程只处理数据
- 主线程更新UI
- 通过信号槽通信
学习收获
您现在已经掌握:
| 技能 | 掌握程度 |
|---|---|
| QtConcurrent API | ✅✅✅✅✅ |
| QFutureWatcher使用 | ✅✅✅✅✅ |
| 异步进度监控 | ✅✅✅✅ |
| GUI多线程分离 | ✅✅✅✅✅ |
| 任务取消机制 | ✅✅✅✅ |
关键经验
-
QtConcurrent vs QThread
- QtConcurrent:简单任务,批量处理
- QThread:复杂任务,长期运行
-
进度更新策略
- QtConcurrent自动报告进度
- 每完成一项任务进度+1
-
结果收集方式
// 方式1:单个结果就绪立即处理 resultReadyAt(int index) // 方式2:全部完成后批量处理 finished() → future.results()
🚀 下一个项目预告
5.3 项目三:多线程文件下载器 ⭐⭐⭐⭐
技术栈:QThread + QNetworkAccessManager + 信号槽
功能:多文件并发下载、断点续传、暂停/恢复
恭喜完成项目二!您已经掌握了Qt高级并发编程!🎉
5.3 项目三:多线程文件下载器 ⭐⭐⭐⭐
难度:中高
时长:3-4小时
核心技术:QThread + QNetworkAccessManager + 信号槽 + 状态管理
学习重点:复杂任务的多线程设计、网络编程、任务状态控制
5.3.1 需求分析
功能需求:
- 支持多个文件并发下载
- 每个下载任务显示独立进度
- 支持暂停/恢复/取消单个任务
- 断点续传(HTTP Range)
- 显示下载速度和剩余时间
- 失败自动重试
技术挑战:
- 网络请求必须在子线程执行
- QNetworkAccessManager的线程亲和性
- 状态机设计(下载中/暂停/完成/失败)
- 安全停止下载线程
应用场景:
- 软件更新下载器
- 资源批量下载工具
- 视频/音频下载客户端
5.3.2 系统架构设计
架构图:
主线程(GUI)
├── MainWindow # 管理界面
│ ├── 添加下载按钮
│ ├── DownloadItem列表
│ └── 控制按钮
│
└── DownloadManager # 下载管理器
├── 创建DownloadTask
└── 管理任务列表
工作线程
└── DownloadTask # 单个下载任务
├── QThread
├── DownloadWorker (moveToThread)
│ ├── QNetworkAccessManager
│ ├── QFile
│ └── 状态管理
└── 信号槽通信
类职责:
| 类 | 职责 | 线程 |
|---|---|---|
| MainWindow | GUI显示,用户交互 | 主线程 |
| DownloadManager | 管理所有下载任务 | 主线程 |
| DownloadTask | 封装单个下载任务 | 拥有独立线程 |
| DownloadWorker | 执行网络下载逻辑 | 工作线程 |
5.3.3 核心类实现
1. 下载任务状态枚举
// DownloadTask.h
#ifndef DOWNLOADTASK_H
#define DOWNLOADTASK_H
#include <QObject>
#include <QThread>
#include <QString>
#include <QUrl>
// 下载状态枚举
enum class DownloadStatus {
Idle, // 空闲
Downloading, // 下载中
Paused, // 已暂停
Finished, // 已完成
Failed, // 失败
Canceled // 已取消
};
// 下载信息结构
struct DownloadInfo {
QString fileName; // 文件名
qint64 totalBytes; // 总大小
qint64 downloadedBytes; // 已下载大小
double speed; // 下载速度(KB/s)
int remainingSeconds; // 剩余时间(秒)
DownloadStatus status; // 当前状态
QString errorString; // 错误信息
DownloadInfo()
: totalBytes(0), downloadedBytes(0),
speed(0), remainingSeconds(0),
status(DownloadStatus::Idle) {}
};
#endif // DOWNLOADTASK_H
2. 下载工作类(在线程中执行)
// DownloadWorker.h
#ifndef DOWNLOADWORKER_H
#define DOWNLOADWORKER_H
#include <QObject>
#include <QNetworkAccessManager>
#include <QNetworkReply>
#include <QFile>
#include <QElapsedTimer>
#include <QUrl>
#include <QDebug>
class DownloadWorker : public QObject {
Q_OBJECT
public:
explicit DownloadWorker(const QUrl &url, const QString &savePath, QObject *parent = nullptr)
: QObject(parent),
downloadUrl(url),
saveFilePath(savePath),
networkManager(nullptr),
reply(nullptr),
outputFile(nullptr),
downloadedBytes(0),
totalBytes(0),
isPaused(false),
isCanceled(false)
{
qDebug() << "[DownloadWorker] 创建:" << url.toString();
}
~DownloadWorker() {
cleanup();
}
signals:
// 进度更新信号
void progressChanged(qint64 bytesReceived, qint64 bytesTotal);
// 下载速度信号
void speedChanged(double speed); // KB/s
// 完成信号
void downloadFinished();
// 错误信号
void errorOccurred(const QString &errorMsg);
public slots:
// 开始下载
void startDownload() {
qDebug() << "[DownloadWorker] 开始下载:" << QThread::currentThreadId();
// 创建网络管理器(必须在工作线程创建)
if (!networkManager) {
networkManager = new QNetworkAccessManager(this);
}
// 打开文件
outputFile = new QFile(saveFilePath, this);
if (!outputFile->open(QIODevice::WriteOnly)) {
emit errorOccurred("无法创建文件: " + saveFilePath);
return;
}
// 发起下载请求
QNetworkRequest request(downloadUrl);
// 支持断点续传
if (downloadedBytes > 0) {
QByteArray rangeHeader = "bytes=" + QByteArray::number(downloadedBytes) + "-";
request.setRawHeader("Range", rangeHeader);
qDebug() << "[DownloadWorker] 断点续传,从" << downloadedBytes << "开始";
}
reply = networkManager->get(request);
// 连接信号
connect(reply, &QNetworkReply::downloadProgress,
this, &DownloadWorker::onDownloadProgress);
connect(reply, &QNetworkReply::finished,
this, &DownloadWorker::onDownloadFinished);
connect(reply, &QNetworkReply::readyRead,
this, &DownloadWorker::onReadyRead);
connect(reply, QOverload<QNetworkReply::NetworkError>::of(&QNetworkReply::errorOccurred),
this, &DownloadWorker::onNetworkError);
// 启动速度计时器
speedTimer.start();
lastProgressTime = 0;
lastProgressBytes = downloadedBytes;
}
// 暂停下载
void pauseDownload() {
qDebug() << "[DownloadWorker] 暂停下载";
isPaused = true;
if (reply) {
reply->abort();
reply->deleteLater();
reply = nullptr;
}
if (outputFile) {
outputFile->close();
delete outputFile;
outputFile = nullptr;
}
}
// 恢复下载
void resumeDownload() {
qDebug() << "[DownloadWorker] 恢复下载";
isPaused = false;
startDownload();
}
// 取消下载
void cancelDownload() {
qDebug() << "[DownloadWorker] 取消下载";
isCanceled = true;
if (reply) {
reply->abort();
reply->deleteLater();
reply = nullptr;
}
cleanup();
// 删除未完成的文件
if (QFile::exists(saveFilePath)) {
QFile::remove(saveFilePath);
}
}
private slots:
// 数据就绪,写入文件
void onReadyRead() {
if (outputFile && reply) {
QByteArray data = reply->readAll();
qint64 written = outputFile->write(data);
downloadedBytes += written;
}
}
// 下载进度
void onDownloadProgress(qint64 bytesReceived, qint64 bytesTotal) {
// 累加已下载字节(支持断点续传)
qint64 totalReceived = downloadedBytes + bytesReceived;
qint64 actualTotal = bytesTotal > 0 ? bytesTotal : totalBytes;
if (actualTotal == 0 && bytesTotal > 0) {
totalBytes = bytesTotal;
actualTotal = bytesTotal;
}
emit progressChanged(totalReceived, actualTotal);
// 计算下载速度
qint64 elapsed = speedTimer.elapsed();
if (elapsed - lastProgressTime > 1000) { // 每秒更新一次速度
qint64 bytesDiff = totalReceived - lastProgressBytes;
double speed = (bytesDiff / 1024.0) / ((elapsed - lastProgressTime) / 1000.0);
emit speedChanged(speed);
lastProgressTime = elapsed;
lastProgressBytes = totalReceived;
}
}
// 下载完成
void onDownloadFinished() {
if (isCanceled) {
return;
}
if (reply->error() == QNetworkReply::NoError) {
qDebug() << "[DownloadWorker] 下载完成:" << saveFilePath;
if (outputFile) {
outputFile->close();
}
emit downloadFinished();
} else {
qDebug() << "[DownloadWorker] 下载出错:" << reply->errorString();
}
cleanup();
}
// 网络错误
void onNetworkError(QNetworkReply::NetworkError code) {
if (isPaused || isCanceled) {
return; // 暂停/取消导致的错误,忽略
}
QString errorMsg = reply ? reply->errorString() : "未知错误";
qDebug() << "[DownloadWorker] 网络错误:" << code << errorMsg;
emit errorOccurred(errorMsg);
cleanup();
}
private:
void cleanup() {
if (reply) {
reply->deleteLater();
reply = nullptr;
}
if (outputFile) {
outputFile->close();
delete outputFile;
outputFile = nullptr;
}
}
QUrl downloadUrl;
QString saveFilePath;
QNetworkAccessManager *networkManager;
QNetworkReply *reply;
QFile *outputFile;
qint64 downloadedBytes;
qint64 totalBytes;
QElapsedTimer speedTimer;
qint64 lastProgressTime;
qint64 lastProgressBytes;
bool isPaused;
bool isCanceled;
};
#endif // DOWNLOADWORKER_H
3. 下载任务类(管理线程和Worker)
// DownloadTask.cpp
#include "DownloadTask.h"
#include "DownloadWorker.h"
#include <QDebug>
class DownloadTask : public QObject {
Q_OBJECT
public:
explicit DownloadTask(const QUrl &url, const QString &savePath, QObject *parent = nullptr)
: QObject(parent),
downloadUrl(url),
saveFilePath(savePath),
worker(nullptr),
workerThread(nullptr)
{
info.fileName = QFileInfo(savePath).fileName();
info.status = DownloadStatus::Idle;
setupThread();
}
~DownloadTask() {
stop();
}
// 开始下载
void start() {
if (info.status == DownloadStatus::Downloading) {
return;
}
info.status = DownloadStatus::Downloading;
emit statusChanged(info.status);
// 触发worker开始下载
QMetaObject::invokeMethod(worker, "startDownload", Qt::QueuedConnection);
}
// 暂停下载
void pause() {
if (info.status != DownloadStatus::Downloading) {
return;
}
info.status = DownloadStatus::Paused;
emit statusChanged(info.status);
QMetaObject::invokeMethod(worker, "pauseDownload", Qt::QueuedConnection);
}
// 恢复下载
void resume() {
if (info.status != DownloadStatus::Paused) {
return;
}
info.status = DownloadStatus::Downloading;
emit statusChanged(info.status);
QMetaObject::invokeMethod(worker, "resumeDownload", Qt::QueuedConnection);
}
// 取消下载
void cancel() {
info.status = DownloadStatus::Canceled;
emit statusChanged(info.status);
QMetaObject::invokeMethod(worker, "cancelDownload", Qt::QueuedConnection);
}
// 停止线程
void stop() {
if (workerThread) {
workerThread->quit();
workerThread->wait(3000);
delete worker;
delete workerThread;
worker = nullptr;
workerThread = nullptr;
}
}
const DownloadInfo& getInfo() const { return info; }
signals:
void statusChanged(DownloadStatus status);
void progressUpdated(const DownloadInfo &info);
void downloadCompleted();
void downloadFailed(const QString &error);
private:
void setupThread() {
// 创建工作线程
workerThread = new QThread(this);
// 创建worker并移动到线程
worker = new DownloadWorker(downloadUrl, saveFilePath);
worker->moveToThread(workerThread);
// 连接信号
connect(worker, &DownloadWorker::progressChanged,
this, &DownloadTask::onProgressChanged);
connect(worker, &DownloadWorker::speedChanged,
this, &DownloadTask::onSpeedChanged);
connect(worker, &DownloadWorker::downloadFinished,
this, &DownloadTask::onDownloadFinished);
connect(worker, &DownloadWorker::errorOccurred,
this, &DownloadTask::onErrorOccurred);
// 线程结束时清理worker
connect(workerThread, &QThread::finished, worker, &QObject::deleteLater);
// 启动线程
workerThread->start();
qDebug() << "[DownloadTask] 线程已启动:" << workerThread;
}
private slots:
void onProgressChanged(qint64 received, qint64 total) {
info.downloadedBytes = received;
info.totalBytes = total;
// 计算剩余时间
if (info.speed > 0) {
qint64 remaining = total - received;
info.remainingSeconds = static_cast<int>(remaining / (info.speed * 1024));
}
emit progressUpdated(info);
}
void onSpeedChanged(double speed) {
info.speed = speed;
emit progressUpdated(info);
}
void onDownloadFinished() {
info.status = DownloadStatus::Finished;
emit statusChanged(info.status);
emit downloadCompleted();
}
void onErrorOccurred(const QString &error) {
info.status = DownloadStatus::Failed;
info.errorString = error;
emit statusChanged(info.status);
emit downloadFailed(error);
}
private:
QUrl downloadUrl;
QString saveFilePath;
DownloadWorker *worker;
QThread *workerThread;
DownloadInfo info;
};
5.3.4 GUI主窗口实现
// MainWindow.h
#ifndef MAINWINDOW_H
#define MAINWINDOW_H
#include <QMainWindow>
#include <QListWidget>
#include <QPushButton>
#include <QProgressBar>
#include <QLabel>
#include <QVBoxLayout>
#include <QHBoxLayout>
#include <QInputDialog>
#include <QFileDialog>
#include <QMap>
#include "DownloadTask.h"
class DownloadItemWidget : public QWidget {
Q_OBJECT
public:
explicit DownloadItemWidget(DownloadTask *task, QWidget *parent = nullptr)
: QWidget(parent), downloadTask(task)
{
setupUI();
connectSignals();
}
private:
void setupUI() {
QVBoxLayout *mainLayout = new QVBoxLayout(this);
// 文件名
fileNameLabel = new QLabel(downloadTask->getInfo().fileName);
fileNameLabel->setStyleSheet("font-weight: bold;");
// 进度条
progressBar = new QProgressBar();
progressBar->setMaximum(100);
// 状态信息
statusLabel = new QLabel("等待开始...");
// 控制按钮
QHBoxLayout *buttonLayout = new QHBoxLayout();
startButton = new QPushButton("开始");
pauseButton = new QPushButton("暂停");
resumeButton = new QPushButton("恢复");
cancelButton = new QPushButton("取消");
pauseButton->setEnabled(false);
resumeButton->setEnabled(false);
buttonLayout->addWidget(startButton);
buttonLayout->addWidget(pauseButton);
buttonLayout->addWidget(resumeButton);
buttonLayout->addWidget(cancelButton);
buttonLayout->addStretch();
mainLayout->addWidget(fileNameLabel);
mainLayout->addWidget(progressBar);
mainLayout->addWidget(statusLabel);
mainLayout->addLayout(buttonLayout);
// 连接按钮
connect(startButton, &QPushButton::clicked, downloadTask, &DownloadTask::start);
connect(pauseButton, &QPushButton::clicked, downloadTask, &DownloadTask::pause);
connect(resumeButton, &QPushButton::clicked, downloadTask, &DownloadTask::resume);
connect(cancelButton, &QPushButton::clicked, downloadTask, &DownloadTask::cancel);
}
void connectSignals() {
connect(downloadTask, &DownloadTask::progressUpdated,
this, &DownloadItemWidget::updateProgress);
connect(downloadTask, &DownloadTask::statusChanged,
this, &DownloadItemWidget::updateStatus);
}
private slots:
void updateProgress(const DownloadInfo &info) {
if (info.totalBytes > 0) {
int percent = static_cast<int>((info.downloadedBytes * 100) / info.totalBytes);
progressBar->setValue(percent);
QString status = QString("%1 MB / %2 MB 速度: %3 KB/s 剩余: %4s")
.arg(info.downloadedBytes / 1024.0 / 1024.0, 0, 'f', 2)
.arg(info.totalBytes / 1024.0 / 1024.0, 0, 'f', 2)
.arg(info.speed, 0, 'f', 2)
.arg(info.remainingSeconds);
statusLabel->setText(status);
}
}
void updateStatus(DownloadStatus status) {
switch (status) {
case DownloadStatus::Downloading:
startButton->setEnabled(false);
pauseButton->setEnabled(true);
resumeButton->setEnabled(false);
statusLabel->setText("下载中...");
break;
case DownloadStatus::Paused:
startButton->setEnabled(false);
pauseButton->setEnabled(false);
resumeButton->setEnabled(true);
statusLabel->setText("已暂停");
break;
case DownloadStatus::Finished:
startButton->setEnabled(false);
pauseButton->setEnabled(false);
resumeButton->setEnabled(false);
cancelButton->setEnabled(false);
statusLabel->setText("✓ 下载完成");
statusLabel->setStyleSheet("color: green; font-weight: bold;");
break;
case DownloadStatus::Failed:
statusLabel->setText("✗ 下载失败");
statusLabel->setStyleSheet("color: red; font-weight: bold;");
break;
default:
break;
}
}
private:
DownloadTask *downloadTask;
QLabel *fileNameLabel;
QProgressBar *progressBar;
QLabel *statusLabel;
QPushButton *startButton;
QPushButton *pauseButton;
QPushButton *resumeButton;
QPushButton *cancelButton;
};
// ============================================================
// 主窗口
// ============================================================
class MainWindow : public QMainWindow {
Q_OBJECT
public:
MainWindow(QWidget *parent = nullptr) : QMainWindow(parent) {
setupUI();
}
~MainWindow() {
// 清理所有任务
for (DownloadTask *task : downloadTasks) {
task->stop();
delete task;
}
}
private:
void setupUI() {
setWindowTitle("多线程文件下载器");
resize(800, 600);
QWidget *centralWidget = new QWidget(this);
QVBoxLayout *mainLayout = new QVBoxLayout(centralWidget);
// 添加下载按钮
QPushButton *addButton = new QPushButton("+ 添加下载任务");
connect(addButton, &QPushButton::clicked, this, &MainWindow::addDownloadTask);
// 下载列表
downloadListWidget = new QListWidget();
mainLayout->addWidget(addButton);
mainLayout->addWidget(downloadListWidget);
setCentralWidget(centralWidget);
}
private slots:
void addDownloadTask() {
// 输入URL
QString url = QInputDialog::getText(this, "添加下载", "请输入下载URL:");
if (url.isEmpty()) {
return;
}
// 选择保存路径
QString fileName = QFileInfo(QUrl(url).path()).fileName();
if (fileName.isEmpty()) {
fileName = "downloaded_file";
}
QString savePath = QFileDialog::getSaveFileName(this, "保存文件", fileName);
if (savePath.isEmpty()) {
return;
}
// 创建下载任务
DownloadTask *task = new DownloadTask(QUrl(url), savePath, this);
downloadTasks.append(task);
// 创建UI项
DownloadItemWidget *itemWidget = new DownloadItemWidget(task);
QListWidgetItem *listItem = new QListWidgetItem(downloadListWidget);
listItem->setSizeHint(itemWidget->sizeHint());
downloadListWidget->setItemWidget(listItem, itemWidget);
}
private:
QListWidget *downloadListWidget;
QList<DownloadTask*> downloadTasks;
};
#endif // MAINWINDOW_H
5.3.5 测试代码
// main.cpp
#include <QApplication>
#include "MainWindow.h"
int main(int argc, char *argv[])
{
QApplication app(argc, argv);
MainWindow window;
window.show();
return app.exec();
}
测试URL示例:
https://speed.hetzner.de/100MB.bin (100MB测试文件)
https://proof.ovh.net/files/10Mb.dat (10MB测试文件)
5.3.6 运行效果
控制台输出:
[DownloadTask] 线程已启动: QThread(0x7f8a1c001234)
[DownloadWorker] 创建: https://speed.hetzner.de/100MB.bin
[DownloadWorker] 开始下载: 0x7f8a1c005678
[DownloadWorker] 下载完成: /Users/test/100MB.bin
GUI界面:
文件名: 100MB.bin
进度条: [████████████████░░░░] 85%
状态: 85.2 MB / 100 MB 速度: 2345.67 KB/s 剩余: 6s
[开始] [暂停] [恢复] [取消]
✅ 5.3节总结
核心技术点
-
moveToThread模式
worker = new DownloadWorker(url, path); worker->moveToThread(workerThread); workerThread->start(); -
线程安全的网络请求
- QNetworkAccessManager必须在工作线程创建
- 所有网络操作在子线程执行
- 通过信号槽向主线程报告进度
-
状态管理
- 使用枚举定义清晰的状态
- 状态转换通过信号通知
- GUI根据状态更新按钮
-
断点续传
QByteArray rangeHeader = "bytes=" + QByteArray::number(startPos) + "-"; request.setRawHeader("Range", rangeHeader); -
优雅停止线程
workerThread->quit(); // 退出事件循环 workerThread->wait(); // 等待线程结束 delete worker; // 清理资源
学习收获
| 技能 | 掌握程度 |
|---|---|
| moveToThread模式 | ✅✅✅✅✅ |
| 网络多线程编程 | ✅✅✅✅✅ |
| 任务状态机设计 | ✅✅✅✅ |
| 跨线程信号槽 | ✅✅✅✅✅ |
| QThread生命周期管理 | ✅✅✅✅✅ |
常见陷阱
❌ 错误示例:
// 在主线程创建QNetworkAccessManager
networkManager = new QNetworkAccessManager(this); // this在主线程
worker->moveToThread(thread);
// 错误!networkManager的线程亲和性是主线程,无法在子线程使用
✅ 正确做法:
// 在worker的构造函数或初始化槽函数中创建
void DownloadWorker::startDownload() {
networkManager = new QNetworkAccessManager(this); // this已在工作线程
// 现在可以安全使用
}
扩展方向
-
多文件分块下载
- 单个文件分成多块并发下载
- 下载完成后合并
-
下载队列管理
- 限制同时下载数量
- 自动排队机制
-
持久化状态
- 保存下载进度到数据库
- 程序重启后恢复下载
🎓 第五章完整总结
恭喜!您已完成《Qt多线程编程详解》第五章的全部实战项目:
| 项目 | 难度 | 核心技术 | 状态 |
|---|---|---|---|
| 5.1 生产者-消费者 | ⭐⭐ | QMutex + QWaitCondition | ✅ |
| 5.2 图片批量处理 | ⭐⭐⭐ | QtConcurrent + QFutureWatcher | ✅ |
| 5.3 多线程下载器 | ⭐⭐⭐⭐ | QThread + moveToThread + 网络 | ✅ |
您现在已经掌握:
- ✅ 三种Qt多线程模型(继承QThread、moveToThread、QtConcurrent)
- ✅ 复杂任务的多线程设计能力
- ✅ GUI与工作线程的分离模式
- ✅ 实际项目的状态管理经验
- ✅ 网络编程的线程安全实践
下一步学习方向:
- 📘 第六章:最佳实践与设计模式
- 📘 第七章:进阶主题(原子操作、自定义线程框架)
📋 第六章:最佳实践与设计模式
6.1 多线程设计原则
多线程程序的复杂性主要源于共享状态的管理。遵循以下原则可以大幅降低并发Bug的发生率。
6.1.1 原则一:最小化共享数据 ⭐⭐⭐⭐⭐
核心思想:共享数据是万恶之源,能不共享就不共享。
❌ 错误示例:过度共享
class DataProcessor : public QThread {
public:
QVector<int> sharedData; // 多个线程都访问
QMutex mutex;
void run() override {
for (int i = 0; i < 1000; ++i) {
QMutexLocker locker(&mutex);
sharedData.append(i); // 需要加锁
}
}
};
// 使用
DataProcessor thread1, thread2;
thread1.start();
thread2.start();
// 两个线程都在修改sharedData,必须加锁,性能差
✅ 正确做法:线程独立数据
class DataProcessor : public QThread {
public:
void run() override {
QVector<int> localData; // 每个线程自己的数据
for (int i = 0; i < 1000; ++i) {
localData.append(i); // 无需加锁
}
// 处理完成后通过信号返回结果
emit resultReady(localData);
}
signals:
void resultReady(const QVector<int> &data);
};
// 主线程收集结果
connect(&thread, &DataProcessor::resultReady,
this, [](const QVector<int> &data) {
// 在主线程合并数据
});
最佳实践:
- 线程本地存储:每个线程维护自己的数据副本
- 只读共享:如果必须共享,优先设计为只读
- 消息传递:通过信号槽传递数据,而非直接共享
- 分而治之:将数据分片,每个线程处理独立的片段
实际案例:图片批量处理
// 错误:共享图片列表
QVector<QImage> images; // 多个线程同时修改
QMutex mutex;
// 正确:每个线程处理独立的图片
QVector<QImage> images = loadImages();
// 使用QtConcurrent,每个任务完全独立
auto processFunc = [](const QImage &img) {
QImage result = img;
// 独立处理,无需加锁
result = result.scaled(100, 100);
return result;
};
QFuture<QImage> results = QtConcurrent::mapped(images, processFunc);
6.1.2 原则二:不可变对象优先 ⭐⭐⭐⭐
核心思想:不可变对象天然线程安全,无需同步。
什么是不可变对象:
- 对象创建后状态不可改变
- 所有成员变量都是const或private且无setter
- 没有任何修改状态的方法
✅ 不可变对象示例
class TaskConfig {
public:
TaskConfig(int threads, int timeout, const QString &mode)
: m_threads(threads),
m_timeout(timeout),
m_mode(mode)
{}
// 只有getter,没有setter
int threads() const { return m_threads; }
int timeout() const { return m_timeout; }
QString mode() const { return m_mode; }
// 创建新对象而非修改现有对象
TaskConfig withThreads(int threads) const {
return TaskConfig(threads, m_timeout, m_mode);
}
private:
const int m_threads;
const int m_timeout;
const QString m_mode;
};
// 使用:可以安全地在多个线程中传递
TaskConfig config(4, 5000, "fast");
QThread thread1, thread2;
// 两个线程都可以安全读取config,无需加锁
connect(&thread1, &QThread::started, [config]() {
qDebug() << "Thread1:" << config.threads();
});
connect(&thread2, &QThread::started, [config]() {
qDebug() << "Thread2:" << config.threads();
});
Qt中的不可变类型:
| 类型 | 说明 |
|---|---|
| QString | 隐式共享,写时复制(COW) |
| QByteArray | 隐式共享,写时复制 |
| QVector | 隐式共享,写时复制 |
| QImage | 隐式共享,多线程安全读取 |
隐式共享的威力:
QString original = "Hello World";
// 复制操作很便宜,只是浅拷贝
QString copy1 = original; // 仅复制指针
QString copy2 = original; // 仅复制指针
// 修改时才真正复制(写时复制)
copy1[0] = 'h'; // 此时才深拷贝
// original和copy2仍然共享数据,线程安全
设计建议:
// ❌ 可变配置(不推荐)
class Config {
public:
void setThreadCount(int count) { m_count = count; } // 可修改
int threadCount() const { return m_count; }
private:
int m_count;
};
// ✅ 不可变配置(推荐)
class Config {
public:
explicit Config(int count) : m_count(count) {}
int threadCount() const { return m_count; }
// 返回新对象而非修改
Config withThreadCount(int count) const {
return Config(count);
}
private:
const int m_count;
};
6.1.3 原则三:RAII资源管理 ⭐⭐⭐⭐⭐
核心思想:利用对象生命周期自动管理资源,避免忘记解锁。
RAII (Resource Acquisition Is Initialization):
- 资源获取即初始化
- 对象构造时获取资源,析构时释放资源
- 利用C++的栈展开机制自动清理
Qt提供的RAII工具:
| 类 | 用途 | 示例 |
|---|---|---|
| QMutexLocker | 自动加锁/解锁 | QMutexLocker locker(&mutex); |
| QReadLocker | 读锁自动管理 | QReadLocker locker(&rwLock); |
| QWriteLocker | 写锁自动管理 | QWriteLocker locker(&rwLock); |
❌ 手动管理(危险)
void processData() {
mutex.lock();
if (data.isEmpty()) {
// 忘记解锁!死锁隐患
return;
}
if (data.size() > 1000) {
throw std::runtime_error("Too large"); // 异常导致未解锁!
}
// 处理数据...
mutex.unlock(); // 可能永远执行不到
}
✅ RAII自动管理(安全)
void processData() {
QMutexLocker locker(&mutex); // 构造时加锁
if (data.isEmpty()) {
return; // locker析构自动解锁 ✓
}
if (data.size() > 1000) {
throw std::runtime_error("Too large"); // 异常时locker析构解锁 ✓
}
// 处理数据...
// 函数结束时locker析构自动解锁 ✓
}
高级技巧:作用域控制锁的粒度
void complexOperation() {
// 第一阶段:需要加锁
{
QMutexLocker locker(&mutex);
sharedData.prepare();
} // locker析构,解锁
// 第二阶段:耗时操作,不需要锁
heavyComputation(); // 不持有锁,提高并发性
// 第三阶段:再次加锁
{
QMutexLocker locker(&mutex);
sharedData.finalize();
} // locker析构,解锁
}
自定义RAII类
class ThreadGuard {
public:
explicit ThreadGuard(QThread *thread) : m_thread(thread) {
if (m_thread && !m_thread->isRunning()) {
m_thread->start();
}
}
~ThreadGuard() {
if (m_thread && m_thread->isRunning()) {
m_thread->quit();
m_thread->wait(3000);
}
}
// 禁止拷贝
ThreadGuard(const ThreadGuard&) = delete;
ThreadGuard& operator=(const ThreadGuard&) = delete;
private:
QThread *m_thread;
};
// 使用
void someFunction() {
QThread workerThread;
ThreadGuard guard(&workerThread); // 自动启动
// 使用线程...
// 函数结束时guard析构,自动停止线程
}
6.1.4 原则四:避免在构造函数中启动线程 ⭐⭐⭐⭐
核心思想:构造函数中对象尚未完全初始化,启动线程可能访问未初始化成员。
❌ 危险示例
class Worker : public QObject {
Q_OBJECT
public:
Worker() {
m_thread = new QThread(this);
moveToThread(m_thread);
m_thread->start(); // ❌ 危险!
// 此时Worker可能还未完全构造完成
// 如果线程立即开始工作,可能访问到未初始化的成员
}
private:
QThread *m_thread;
QString m_data; // 如果构造函数还有后续初始化,这里可能是未初始化状态
};
问题分析:
- 虚函数问题:构造期间虚函数表未完全建立
- 成员未初始化:可能访问到未初始化的成员变量
- 派生类问题:基类构造时派生类部分尚未构造
✅ 正确做法:两阶段初始化
class Worker : public QObject {
Q_OBJECT
public:
Worker() {
m_thread = new QThread(this);
moveToThread(m_thread);
// 不在构造函数中启动
}
// 显式初始化方法
void start() {
m_thread->start();
QMetaObject::invokeMethod(this, "doWork", Qt::QueuedConnection);
}
private slots:
void doWork() {
// 现在安全访问所有成员
qDebug() << m_data;
}
private:
QThread *m_thread;
QString m_data = "initialized";
};
// 使用
Worker *worker = new Worker(); // 仅构造
worker->start(); // 显式启动
最佳实践模式
class TaskRunner : public QObject {
Q_OBJECT
public:
// 构造函数只做基本初始化
explicit TaskRunner(const QString &name, QObject *parent = nullptr)
: QObject(parent),
m_name(name),
m_isRunning(false)
{
// 不启动任何异步操作
}
// 单独的初始化方法
bool initialize() {
m_thread = new QThread(this);
m_worker = new Worker();
m_worker->moveToThread(m_thread);
connect(m_thread, &QThread::started, m_worker, &Worker::process);
connect(m_worker, &Worker::finished, m_thread, &QThread::quit);
return true;
}
// 显式启动方法
void start() {
if (!m_isRunning) {
m_thread->start();
m_isRunning = true;
}
}
// 析构函数中清理
~TaskRunner() {
if (m_thread && m_thread->isRunning()) {
m_thread->quit();
m_thread->wait();
}
}
private:
QString m_name;
QThread *m_thread = nullptr;
Worker *m_worker = nullptr;
bool m_isRunning;
};
// 使用流程
TaskRunner runner("MyTask");
if (runner.initialize()) { // 初始化
runner.start(); // 启动
}
对比总结
| 做法 | 优点 | 缺点 |
|---|---|---|
| 构造时启动 | 简单 | 不安全,难调试 |
| 两阶段初始化 | 安全可控 | 需要记得调用初始化 |
✅ 6.1节总结
四大原则速记卡
// 1. 最小化共享
✓ 线程独立数据
✓ 消息传递代替共享
✗ 全局变量
// 2. 不可变优先
✓ const成员
✓ 返回新对象
✗ setter方法
// 3. RAII资源管理
✓ QMutexLocker
✓ 作用域自动释放
✗ 手动lock/unlock
// 4. 延迟启动线程
✓ 两阶段初始化
✓ 显式start()方法
✗ 构造函数中启动
实战检查清单
在编写多线程代码时,问自己:
- 这个数据真的需要共享吗?能否设计为线程本地?
- 这个对象能否设计为不可变?
- 所有锁操作是否使用了RAII?
- 线程启动是否在对象完全构造之后?
6.2 Qt多线程最佳实践
Qt框架有自己独特的线程模型和设计哲学,遵循这些最佳实践能避免90%的多线程Bug。
6.2.1 实践一:永远不要在非GUI线程操作UI ⭐⭐⭐⭐⭐
黄金法则:所有UI操作必须在主线程(GUI线程)执行。
为什么?
- Qt的GUI类(QWidget、QPainter等)不是线程安全的
- 底层图形系统要求单线程访问
- 违反规则可能导致崩溃、花屏、不确定行为
❌ 错误示例:在子线程更新UI
class WorkerThread : public QThread {
Q_OBJECT
private:
QLabel *label; // 传入的UI控件
public:
WorkerThread(QLabel *lbl) : label(lbl) {}
void run() override {
for (int i = 0; i < 100; ++i) {
// ❌ 严重错误!在子线程直接操作UI
label->setText(QString::number(i));
QThread::msleep(100);
}
}
};
// 使用
QLabel *label = new QLabel();
WorkerThread thread(label);
thread.start(); // 可能崩溃或显示异常
✅ 正确做法一:使用信号槽
class WorkerThread : public QThread {
Q_OBJECT
public:
void run() override {
for (int i = 0; i < 100; ++i) {
// ✓ 发射信号,让主线程更新UI
emit progressUpdated(i);
QThread::msleep(100);
}
}
signals:
void progressUpdated(int value);
};
// 主线程中连接
QLabel *label = new QLabel();
WorkerThread *thread = new WorkerThread();
connect(thread, &WorkerThread::progressUpdated,
label, [label](int value) {
label->setText(QString::number(value)); // 在主线程执行
});
thread->start();
✅ 正确做法二:使用QMetaObject::invokeMethod
class Worker : public QObject {
Q_OBJECT
public:
void doWork() {
// 在工作线程执行
for (int i = 0; i < 100; ++i) {
int value = i;
// 调度到主线程执行
QMetaObject::invokeMethod(qApp, [value, this]() {
// 这里在主线程执行,可以安全操作UI
emit updateUI(value);
}, Qt::QueuedConnection);
QThread::msleep(100);
}
}
signals:
void updateUI(int value);
};
检测工具:
// 断言当前在主线程
Q_ASSERT(QThread::currentThread() == qApp->thread());
// 或者封装成宏
#define ASSERT_MAIN_THREAD() \
Q_ASSERT_X(QThread::currentThread() == qApp->thread(), \
__FUNCTION__, "Must be called from main thread")
void updateLabel(const QString &text) {
ASSERT_MAIN_THREAD();
label->setText(text);
}
6.2.2 实践二:优先使用moveToThread() ⭐⭐⭐⭐⭐
推荐模式:Worker-Object模式 > 继承QThread
对比三种方式:
| 方式 | 优点 | 缺点 | 推荐度 |
|---|---|---|---|
| 继承QThread | 简单直观 | 耦合度高,不灵活 | ⭐⭐ |
| moveToThread | 解耦,灵活 | 需要理解线程亲和性 | ⭐⭐⭐⭐⭐ |
| QtConcurrent | 最简单 | 适用场景有限 | ⭐⭐⭐⭐ |
❌ 不推荐:继承QThread
class Worker : public QThread {
void run() override {
// 逻辑代码混在线程类中
// 难以复用,难以测试
}
};
✅ 推荐:moveToThread模式
// 1. 定义纯粹的Worker类
class Worker : public QObject {
Q_OBJECT
public slots:
void doWork() {
// 业务逻辑
qDebug() << "Working in thread:" << QThread::currentThreadId();
// 耗时操作...
emit workFinished();
}
signals:
void workFinished();
};
// 2. 使用模式
QThread *workerThread = new QThread();
Worker *worker = new Worker();
// 关键:移动到线程
worker->moveToThread(workerThread);
// 连接信号
connect(workerThread, &QThread::started, worker, &Worker::doWork);
connect(worker, &Worker::workFinished, workerThread, &QThread::quit);
connect(workerThread, &QThread::finished, worker, &QObject::deleteLater);
connect(workerThread, &QThread::finished, workerThread, &QObject::deleteLater);
// 启动
workerThread->start();
moveToThread的关键要点:
// ✓ 正确:在主线程创建,然后移动
Worker *worker = new Worker(); // 在主线程创建
worker->moveToThread(thread); // 移动到工作线程
// ❌ 错误:已经在子线程的对象不能moveToThread
class Worker : public QThread {
void run() override {
QObject *obj = new QObject();
obj->moveToThread(anotherThread); // ❌ 错误!
}
};
// ✓ 正确:对象必须在当前线程创建
worker->moveToThread(thread); // worker必须在调用线程创建
完整封装示例:
class ThreadedWorker {
public:
ThreadedWorker() {
worker = new Worker();
thread = new QThread();
worker->moveToThread(thread);
QObject::connect(thread, &QThread::started, worker, &Worker::doWork);
QObject::connect(worker, &Worker::finished, thread, &QThread::quit);
QObject::connect(thread, &QThread::finished, worker, &QObject::deleteLater);
QObject::connect(thread, &QThread::finished, thread, &QObject::deleteLater);
}
void start() {
thread->start();
}
Worker* getWorker() { return worker; }
private:
Worker *worker;
QThread *thread;
};
6.2.3 实践三:信号槽连接类型的选择 ⭐⭐⭐⭐
Qt提供5种连接类型:
| 类型 | 说明 | 使用场景 |
|---|---|---|
| Qt::AutoConnection | 自动选择(默认) | 大多数情况 |
| Qt::DirectConnection | 同步调用(同线程) | 性能关键路径 |
| Qt::QueuedConnection | 异步调用(跨线程) | 跨线程通信 |
| Qt::BlockingQueuedConnection | 阻塞等待返回 | 需要返回值的跨线程调用 |
| Qt::UniqueConnection | 确保唯一连接 | 避免重复连接 |
AutoConnection的智能选择:
// Qt会自动判断
connect(sender, &Sender::signal, receiver, &Receiver::slot);
// 如果sender和receiver在同一线程 → DirectConnection
// 如果sender和receiver在不同线程 → QueuedConnection
DirectConnection - 同步调用:
// 同步执行,立即返回
connect(obj, &Object::signal, this, &MyClass::slot, Qt::DirectConnection);
// 执行流程:
// 1. emit signal()
// 2. 立即执行slot()
// 3. slot()返回后继续
// ⚠️ 注意:如果slot很耗时,会阻塞信号发送者
QueuedConnection - 异步调用:
// 异步执行,立即返回
connect(obj, &Object::signal, this, &MyClass::slot, Qt::QueuedConnection);
// 执行流程:
// 1. emit signal()
// 2. 事件加入接收者线程的事件队列
// 3. 立即返回,不等待
// 4. 接收者线程事件循环处理时执行slot()
// ✓ 优点:不阻塞,适合跨线程
// ⚠️ 注意:参数必须可复制,会被拷贝
BlockingQueuedConnection - 阻塞等待:
// 等待slot执行完成后返回
connect(obj, &Object::signal, this, &MyClass::slot, Qt::BlockingQueuedConnection);
// ⚠️ 警告:容易死锁!
// 场景:主线程emit信号,等待工作线程slot执行
// 如果工作线程也在等待主线程 → 死锁
// 使用规则:
// 1. 只在确实需要返回值时使用
// 2. 确保接收者线程有事件循环
// 3. 避免双向阻塞等待
实战示例:跨线程调用带返回值
class Worker : public QObject {
Q_OBJECT
public slots:
QString processData(const QString &input) {
// 耗时处理
QThread::sleep(1);
return input.toUpper();
}
};
// ❌ 错误:信号槽无法直接返回值
QString result;
emit requestProcess("hello"); // 无法获取返回值
// ✅ 方法1:使用QMetaObject::invokeMethod
Worker *worker = getWorker();
QString result;
QMetaObject::invokeMethod(worker, "processData",
Qt::BlockingQueuedConnection,
Q_RETURN_ARG(QString, result),
Q_ARG(QString, "hello"));
qDebug() << result; // "HELLO"
// ✅ 方法2:使用两个信号(推荐)
class Worker : public QObject {
Q_OBJECT
public slots:
void processData(const QString &input) {
QString result = input.toUpper();
emit resultReady(result);
}
signals:
void resultReady(const QString &result);
};
// 调用
connect(worker, &Worker::resultReady, this, [](const QString &result) {
qDebug() << result;
});
emit worker->processData("hello");
参数类型注册:
// 自定义类型需要注册才能在信号槽中使用
struct MyData {
int id;
QString name;
};
// 注册类型
Q_DECLARE_METATYPE(MyData)
// 在main函数或类构造函数中
qRegisterMetaType<MyData>("MyData");
// 现在可以在信号槽中使用
signals:
void dataReady(const MyData &data);
6.2.4 实践四:QObject的线程安全性 ⭐⭐⭐⭐
核心规则:QObject及其子类不是线程安全的!
线程亲和性(Thread Affinity):
QObject *obj = new QObject();
// 查询对象的线程亲和性
QThread *affinity = obj->thread();
// 对象默认属于创建它的线程
qDebug() << (affinity == QThread::currentThread()); // true
// 修改线程亲和性
obj->moveToThread(workerThread);
qDebug() << (obj->thread() == workerThread); // true
关键限制:
| 操作 | 是否线程安全 | 说明 |
|---|---|---|
| 读取属性 | ❌ | 必须在对象所属线程 |
| 调用槽函数 | ⚠️ | 通过信号槽可跨线程 |
| 创建子对象 | ❌ | 必须在父对象所属线程 |
| 定时器操作 | ❌ | 必须在对象所属线程 |
❌ 错误示例:跨线程创建子对象
class Worker : public QObject {
Q_OBJECT
public:
Worker() {
// 在主线程创建
}
public slots:
void doWork() {
// 现在在工作线程
// ❌ 错误:在工作线程创建子对象,但父对象在主线程
QTimer *timer = new QTimer(this);
// 运行时错误:
// "QObject: Cannot create children for a parent that is in a different thread"
}
};
Worker *worker = new Worker(); // 主线程创建
worker->moveToThread(workerThread);
workerThread->start();
✅ 正确做法:
class Worker : public QObject {
Q_OBJECT
public:
Worker() {
// 不在构造函数中创建子对象
}
public slots:
void doWork() {
// ✓ 在工作线程创建,不设置父对象
QTimer *timer = new QTimer(); // 无父对象
// 手动管理生命周期
connect(timer, &QTimer::timeout, this, &Worker::process);
connect(this, &Worker::finished, timer, &QTimer::deleteLater);
timer->start(1000);
}
signals:
void finished();
};
定时器的线程规则:
// ❌ 错误:在主线程启动,对象在工作线程
Worker *worker = new Worker();
worker->moveToThread(workerThread);
QTimer *timer = new QTimer();
connect(timer, &QTimer::timeout, worker, &Worker::process);
timer->start(); // ❌ 错误!timer在主线程,worker在工作线程
// ✅ 正确:在对象所属线程创建定时器
class Worker : public QObject {
Q_OBJECT
private:
QTimer *timer = nullptr;
public slots:
void start() {
// 此时已在工作线程
timer = new QTimer(); // 在工作线程创建
connect(timer, &QTimer::timeout, this, &Worker::process);
timer->start(1000);
}
};
6.2.5 实践五:deleteLater()的正确使用 ⭐⭐⭐⭐
问题场景:如何安全删除多线程中的QObject?
❌ 危险:直接delete
Worker *worker = new Worker();
worker->moveToThread(workerThread);
workerThread->start();
// ... 使用一段时间后
workerThread->quit();
workerThread->wait();
// ❌ 危险:如果工作线程还在访问worker
delete worker; // 可能崩溃
✅ 安全:使用deleteLater()
// deleteLater() 会在对象所属线程的事件循环中安全删除
worker->deleteLater();
// 工作原理:
// 1. 向对象所属线程的事件队列发送DeferredDelete事件
// 2. 事件循环处理到该事件时删除对象
// 3. 确保所有待处理的槽函数执行完毕
完整的安全停止模式:
class ThreadController : public QObject {
Q_OBJECT
public:
ThreadController() {
worker = new Worker();
thread = new QThread();
worker->moveToThread(thread);
// 关键连接:线程结束时自动删除
connect(thread, &QThread::finished, worker, &QObject::deleteLater);
connect(thread, &QThread::finished, thread, &QObject::deleteLater);
connect(thread, &QThread::started, worker, &Worker::process);
}
void start() {
thread->start();
}
void stop() {
// 安全停止流程
thread->quit(); // 1. 退出事件循环
thread->wait(); // 2. 等待线程结束
// 3. 不需要手动delete,finished信号会触发deleteLater
}
private:
Worker *worker;
QThread *thread;
};
deleteLater() vs delete 对比:
| 场景 | delete | deleteLater() |
|---|---|---|
| 单线程,立即删除 | ✓ | ✓ |
| 多线程,跨线程删除 | ❌ 危险 | ✓ 安全 |
| 信号槽正在执行 | ❌ 崩溃 | ✓ 延迟删除 |
| 事件队列有待处理事件 | ❌ 可能崩溃 | ✓ 等待处理完 |
特殊情况:deleteLater()的限制
// 注意:deleteLater()需要事件循环
void someFunction() {
QThread thread;
Worker *worker = new Worker();
worker->moveToThread(&thread);
thread.start();
worker->deleteLater(); // 发送删除事件
thread.quit();
thread.wait();
// worker已被删除 ✓
}
// ❌ 问题场景:没有事件循环
void problematicFunction() {
QThread thread;
// 没有调用thread.exec(),没有事件循环
Worker *worker = new Worker();
worker->deleteLater(); // 事件永远不会被处理!
// 此时worker不会被自动删除,内存泄漏
}
最佳实践模式:
class SafeThreadManager {
public:
void startWorker() {
thread = new QThread();
worker = new Worker();
worker->moveToThread(thread);
// 关键:链式删除
QObject::connect(thread, &QThread::finished,
worker, &QObject::deleteLater);
QObject::connect(thread, &QThread::finished,
thread, &QObject::deleteLater);
thread->start();
}
void stopWorker() {
if (thread) {
thread->quit();
if (!thread->wait(3000)) {
// 超时强制终止
thread->terminate();
thread->wait();
}
// 不需要手动delete,deleteLater会处理
thread = nullptr;
worker = nullptr;
}
}
private:
QThread *thread = nullptr;
Worker *worker = nullptr;
};
✅ 6.2节总结
Qt多线程五大铁律
// 1. UI操作只能在主线程
✓ 使用信号槽更新UI
✗ 子线程直接操作QWidget
// 2. 优先使用moveToThread
✓ Worker对象 + moveToThread
✗ 继承QThread重写run()
// 3. 理解连接类型
✓ AutoConnection(默认)
✓ QueuedConnection(跨线程)
⚠️ BlockingQueuedConnection(慎用)
// 4. 尊重线程亲和性
✓ 在对象所属线程操作
✗ 跨线程创建子对象
✗ 跨线程启动定时器
// 5. 使用deleteLater()
✓ 跨线程安全删除
✓ 信号槽执行期间安全
✗ 直接delete可能崩溃
常见错误速查
| 错误信息 | 原因 | 解决方案 |
|---|---|---|
| “Cannot create children for a parent that is in a different thread” | 跨线程创建子对象 | 在对象所属线程创建,或不设置父对象 |
| “Timers cannot be started from another thread” | 跨线程启动定时器 | 在对象所属线程启动 |
| “Cannot queue arguments of type ‘XXX’” | 自定义类型未注册 | 使用qRegisterMetaType注册 |
6.3 常用设计模式
多线程程序有一些经典的设计模式,掌握这些模式能快速解决常见问题。
6.3.1 模式一:生产者-消费者模式 ⭐⭐⭐⭐⭐
适用场景:
- 日志系统(多线程写日志,单线程落盘)
- 数据采集(采集线程 → 缓冲区 → 处理线程)
- 任务队列(任务生成器 → 队列 → 工作线程)
核心思想:
- 生产者和消费者通过缓冲区解耦
- 缓冲区有容量限制
- 使用同步原语协调访问
实现方式回顾(详见5.1节):
// 方式1:QMutex + QWaitCondition(精细控制)
class Buffer {
QQueue<Task> tasks;
QMutex mutex;
QWaitCondition notEmpty;
QWaitCondition notFull;
void produce(Task task) {
QMutexLocker locker(&mutex);
while (tasks.size() >= maxSize) {
notFull.wait(&mutex);
}
tasks.enqueue(task);
notEmpty.wakeOne();
}
Task consume() {
QMutexLocker locker(&mutex);
while (tasks.isEmpty()) {
notEmpty.wait(&mutex);
}
Task task = tasks.dequeue();
notFull.wakeOne();
return task;
}
};
// 方式2:QSemaphore(简洁高效)
class Buffer {
QQueue<Task> tasks;
QMutex mutex;
QSemaphore freeSlots{maxSize}; // 空闲槽位
QSemaphore usedSlots{0}; // 已用槽位
void produce(Task task) {
freeSlots.acquire();
{
QMutexLocker locker(&mutex);
tasks.enqueue(task);
}
usedSlots.release();
}
Task consume() {
usedSlots.acquire();
Task task;
{
QMutexLocker locker(&mutex);
task = tasks.dequeue();
}
freeSlots.release();
return task;
}
};
变体:多生产者-多消费者
// 适用场景:高并发任务处理
class TaskQueue {
public:
void addTask(const Task &task) {
freeSlots.acquire();
{
QMutexLocker locker(&mutex);
tasks.enqueue(task);
}
usedSlots.release();
}
Task getTask() {
usedSlots.acquire();
Task task;
{
QMutexLocker locker(&mutex);
task = tasks.dequeue();
}
freeSlots.release();
return task;
}
private:
QQueue<Task> tasks;
QMutex mutex;
QSemaphore freeSlots{1000}; // 大容量队列
QSemaphore usedSlots{0};
};
// 使用:多个生产者线程
for (int i = 0; i < 5; ++i) {
QThread *producer = new QThread();
// 每个线程不断添加任务
}
// 使用:多个消费者线程
for (int i = 0; i < 10; ++i) {
QThread *consumer = new QThread();
// 每个线程不断获取并处理任务
}
6.3.2 模式二:主从线程模式(Master-Worker) ⭐⭐⭐⭐
适用场景:
- 任务分发系统
- 并行计算框架
- Web服务器处理请求
核心思想:
- Master线程负责分配任务
- Worker线程负责执行任务
- 结果汇总回Master
完整实现:
// 任务定义
struct Task {
int id;
QString data;
// 任务参数...
};
struct Result {
int taskId;
QString result;
bool success;
};
// Worker类
class Worker : public QObject {
Q_OBJECT
public slots:
void processTask(const Task &task) {
qDebug() << "Worker" << QThread::currentThreadId()
<< "processing task" << task.id;
// 模拟耗时处理
QThread::msleep(1000 + qrand() % 2000);
Result result;
result.taskId = task.id;
result.result = task.data.toUpper();
result.success = true;
emit taskCompleted(result);
}
signals:
void taskCompleted(const Result &result);
};
// Master类
class Master : public QObject {
Q_OBJECT
public:
Master(int workerCount = 4) {
// 创建Worker线程池
for (int i = 0; i < workerCount; ++i) {
QThread *thread = new QThread();
Worker *worker = new Worker();
worker->moveToThread(thread);
connect(this, &Master::taskAssigned,
worker, &Worker::processTask);
connect(worker, &Worker::taskCompleted,
this, &Master::onTaskCompleted);
connect(thread, &QThread::finished,
worker, &QObject::deleteLater);
thread->start();
workers.append(worker);
threads.append(thread);
}
}
~Master() {
for (QThread *thread : threads) {
thread->quit();
thread->wait();
}
}
void submitTask(const Task &task) {
pendingTasks.enqueue(task);
processNextTask();
}
signals:
void taskAssigned(const Task &task);
void allTasksCompleted();
private slots:
void onTaskCompleted(const Result &result) {
qDebug() << "Task" << result.taskId << "completed:" << result.result;
completedTasks++;
results.append(result);
processNextTask();
if (completedTasks == totalTasks && pendingTasks.isEmpty()) {
emit allTasksCompleted();
}
}
private:
void processNextTask() {
if (!pendingTasks.isEmpty()) {
Task task = pendingTasks.dequeue();
emit taskAssigned(task); // 由空闲的Worker接收
}
}
QVector<Worker*> workers;
QVector<QThread*> threads;
QQueue<Task> pendingTasks;
QVector<Result> results;
int completedTasks = 0;
int totalTasks = 0;
};
// 使用示例
Master master(4); // 4个Worker
connect(&master, &Master::allTasksCompleted, []() {
qDebug() << "All tasks done!";
});
// 提交100个任务
for (int i = 0; i < 100; ++i) {
Task task;
task.id = i;
task.data = QString("Task %1").arg(i);
master.submitTask(task);
}
优化:动态负载均衡
class SmartMaster : public QObject {
Q_OBJECT
private:
struct WorkerInfo {
Worker *worker;
QThread *thread;
int activeTasks = 0; // 当前正在处理的任务数
};
QVector<WorkerInfo> workers;
void assignTask(const Task &task) {
// 找到负载最小的Worker
auto it = std::min_element(workers.begin(), workers.end(),
[](const WorkerInfo &a, const WorkerInfo &b) {
return a.activeTasks < b.activeTasks;
});
it->activeTasks++;
emit taskAssigned(it->worker, task);
}
private slots:
void onTaskCompleted(const Result &result) {
// 找到完成任务的Worker,减少负载计数
for (auto &info : workers) {
if (/* 判断是哪个Worker */) {
info.activeTasks--;
break;
}
}
processNextTask();
}
};
6.3.3 模式三:流水线模式(Pipeline) ⭐⭐⭐⭐
适用场景:
- 数据处理流水线(读取 → 解析 → 验证 → 存储)
- 视频处理(解码 → 滤镜 → 编码)
- 编译系统(预处理 → 编译 → 链接)
核心思想:
- 任务分为多个阶段
- 每个阶段由独立线程处理
- 数据通过队列在阶段间传递
实现示例:三阶段流水线
// 数据流:原始数据 → 阶段1 → 中间数据 → 阶段2 → 最终结果
// 阶段1:数据读取
class Stage1Reader : public QObject {
Q_OBJECT
public slots:
void start() {
for (int i = 0; i < 100; ++i) {
QString rawData = readData(i);
emit dataReady(rawData);
QThread::msleep(50);
}
emit finished();
}
signals:
void dataReady(const QString &data);
void finished();
};
// 阶段2:数据处理
class Stage2Processor : public QObject {
Q_OBJECT
public slots:
void process(const QString &rawData) {
// 处理数据
QString processedData = rawData.toUpper();
emit dataReady(processedData);
}
signals:
void dataReady(const QString &data);
};
// 阶段3:数据写入
class Stage3Writer : public QObject {
Q_OBJECT
public slots:
void write(const QString &data) {
// 写入数据
qDebug() << "Writing:" << data;
writeToFile(data);
}
};
// 流水线管理器
class Pipeline : public QObject {
Q_OBJECT
public:
Pipeline() {
// 创建三个线程
thread1 = new QThread();
thread2 = new QThread();
thread3 = new QThread();
// 创建阶段对象
reader = new Stage1Reader();
processor = new Stage2Processor();
writer = new Stage3Writer();
// 移动到各自线程
reader->moveToThread(thread1);
processor->moveToThread(thread2);
writer->moveToThread(thread3);
// 连接流水线
connect(thread1, &QThread::started, reader, &Stage1Reader::start);
// 阶段1 → 阶段2
connect(reader, &Stage1Reader::dataReady,
processor, &Stage2Processor::process);
// 阶段2 → 阶段3
connect(processor, &Stage2Processor::dataReady,
writer, &Stage3Writer::write);
// 阶段1完成,停止流水线
connect(reader, &Stage1Reader::finished, this, &Pipeline::stop);
// 启动所有线程
thread1->start();
thread2->start();
thread3->start();
}
void stop() {
thread1->quit();
thread2->quit();
thread3->quit();
thread1->wait();
thread2->wait();
thread3->wait();
}
private:
QThread *thread1, *thread2, *thread3;
Stage1Reader *reader;
Stage2Processor *processor;
Stage3Writer *writer;
};
带缓冲的流水线(提高吞吐量):
// 在阶段之间添加缓冲队列
class BufferedStage : public QObject {
Q_OBJECT
public:
BufferedStage(int bufferSize = 10)
: freeSlots(bufferSize), usedSlots(0) {}
public slots:
void enqueue(const QString &data) {
freeSlots.acquire(); // 等待空闲槽位
{
QMutexLocker locker(&mutex);
buffer.enqueue(data);
}
usedSlots.release(); // 通知有数据
}
void processLoop() {
while (!stopped) {
usedSlots.acquire(); // 等待数据
QString data;
{
QMutexLocker locker(&mutex);
if (buffer.isEmpty()) continue;
data = buffer.dequeue();
}
freeSlots.release(); // 释放槽位
// 处理数据
QString result = process(data);
emit dataReady(result);
}
}
signals:
void dataReady(const QString &data);
private:
QQueue<QString> buffer;
QMutex mutex;
QSemaphore freeSlots;
QSemaphore usedSlots;
bool stopped = false;
};
6.3.4 模式四:事件驱动模式 ⭐⭐⭐⭐
适用场景:
- 网络服务器(事件循环处理连接)
- GUI应用(Qt的核心模式)
- 异步I/O处理
核心思想:
- 线程运行事件循环(QThread::exec())
- 通过信号槽或事件传递消息
- 非阻塞式处理
实现示例:事件驱动的服务
// 服务类(运行在独立线程)
class Service : public QObject {
Q_OBJECT
public:
Service() {
// 创建定时器(在对象所属线程运行)
timer = new QTimer(this);
connect(timer, &QTimer::timeout, this, &Service::onTimer);
}
public slots:
void start() {
qDebug() << "Service started in thread:" << QThread::currentThreadId();
timer->start(1000); // 每秒触发
isRunning = true;
}
void stop() {
timer->stop();
isRunning = false;
emit stopped();
}
// 外部可以通过信号槽调用
void handleRequest(const QString &request) {
qDebug() << "Handling request:" << request;
// 异步处理
QString response = processRequest(request);
// 发送响应
emit responseReady(response);
}
signals:
void responseReady(const QString &response);
void stopped();
private slots:
void onTimer() {
// 定期任务
qDebug() << "Timer tick";
}
private:
QString processRequest(const QString &request) {
// 处理业务逻辑
return "Response: " + request.toUpper();
}
QTimer *timer;
bool isRunning = false;
};
// 使用
QThread *serviceThread = new QThread();
Service *service = new Service();
service->moveToThread(serviceThread);
// 连接信号
connect(serviceThread, &QThread::started, service, &Service::start);
connect(service, &Service::stopped, serviceThread, &QThread::quit);
// 外部调用(跨线程,自动排队)
QObject::connect(mainWindow, &MainWindow::userRequest,
service, &Service::handleRequest);
serviceThread->start(); // 启动事件循环
自定义事件:
// 定义自定义事件
class CustomEvent : public QEvent {
public:
static const QEvent::Type EventType = QEvent::Type(QEvent::User + 1);
CustomEvent(const QString &msg)
: QEvent(EventType), message(msg) {}
QString message;
};
// 事件接收者
class EventHandler : public QObject {
Q_OBJECT
protected:
bool event(QEvent *e) override {
if (e->type() == CustomEvent::EventType) {
CustomEvent *ce = static_cast<CustomEvent*>(e);
qDebug() << "Received custom event:" << ce->message;
return true;
}
return QObject::event(e);
}
};
// 发送事件(跨线程安全)
EventHandler *handler = getHandler();
QCoreApplication::postEvent(handler, new CustomEvent("Hello from another thread"));
✅ 6.3节总结
四大模式对比
| 模式 | 适用场景 | 优点 | 缺点 |
|---|---|---|---|
| 生产者-消费者 | 解耦生产/消费 | 缓冲平滑速度差异 | 需要精细同步控制 |
| 主从线程 | 任务并行处理 | 充分利用多核 | 负载均衡需要设计 |
| 流水线 | 多阶段处理 | 高吞吐量 | 设计复杂 |
| 事件驱动 | 异步响应 | 非阻塞,灵活 | 调试困难 |
选择指南
需要解耦生产/消费?
└─> 生产者-消费者模式
需要并行处理大量相似任务?
└─> 主从线程模式(Master-Worker)
需要多阶段处理数据?
└─> 流水线模式(Pipeline)
需要异步响应事件?
└─> 事件驱动模式
🎓 第六章完整总结
恭喜!您已完成《Qt多线程编程详解》第六章的全部内容。
本章知识体系
第六章:最佳实践与设计模式
├── 6.1 多线程设计原则
│ ├── 最小化共享数据(消息传递 > 共享状态)
│ ├── 不可变对象优先(天然线程安全)
│ ├── RAII资源管理(QMutexLocker自动释放)
│ └── 延迟启动线程(两阶段初始化)
│
├── 6.2 Qt多线程最佳实践
│ ├── UI操作仅主线程(黄金法则)
│ ├── 优先moveToThread(Worker模式)
│ ├── 理解连接类型(Auto/Queue/Blocking)
│ ├── 尊重线程亲和性(QObject不线程安全)
│ └── 使用deleteLater(安全删除)
│
└── 6.3 常用设计模式
├── 生产者-消费者(缓冲解耦)
├── 主从线程(任务分发)
├── 流水线(阶段处理)
└── 事件驱动(异步响应)
掌握程度自测
| 知识点 | 掌握标准 | 您的状态 |
|---|---|---|
| 设计原则 | 能说出四大原则并举例 | [ ] 已掌握 |
| Qt铁律 | 能说出五大铁律并避坑 | [ ] 已掌握 |
| 设计模式 | 能识别场景并选择模式 | [ ] 已掌握 |
| 错误排查 | 能快速定位常见错误 | [ ] 已掌握 |
实战能力检验清单
场景1:日志系统设计
- 选择生产者-消费者模式
- 使用QSemaphore实现缓冲区
- 保证日志顺序和性能
场景2:图片批量处理
- 选择主从线程或流水线模式
- 使用QtConcurrent简化实现
- 实现进度反馈
场景3:网络服务
- 每个连接独立线程或线程池
- 使用事件驱动模式
- 安全停止所有线程
场景4:UI响应式后台任务
- Worker + moveToThread模式
- 使用QueuedConnection更新UI
- deleteLater清理资源
常见问题速查卡
// Q1: 崩溃:Cannot create children...
// A: 跨线程创建子对象,解决:不设置父对象或在所属线程创建
// Q2: 崩溃:Timers cannot be started...
// A: 跨线程启动定时器,解决:在对象所属线程启动
// Q3: 警告:Cannot queue arguments...
// A: 自定义类型未注册,解决:qRegisterMetaType<T>()
// Q4: 死锁:程序卡住不动
// A: 可能原因:
// - 忘记unlock
// - BlockingQueuedConnection双向等待
// - 锁顺序不一致
// 解决:使用RAII,检查锁顺序
// Q5: 内存泄漏:线程停止后对象未释放
// A: 使用deleteLater而非delete
进阶学习方向
您已经掌握了Qt多线程的核心知识和最佳实践!
下一步:
-
📘 第七章:进阶主题
- 原子操作(QAtomicInteger)
- Qt与C++11线程混用
- 自定义线程框架
-
📚 深入研究
- Qt源码阅读(QThread实现)
- 性能调优工具(Valgrind、Helgrind)
- 实际项目经验积累
金句总结:
多线程编程的本质是管理共享状态。
最好的共享是不共享。
最安全的锁是自动锁。
最好的UI更新是在主线程。
恭喜完成第六章!🎉
您现在已经具备了:
- ✅ 扎实的设计原则
- ✅ Qt最佳实践经验
- ✅ 常用设计模式能力
- ✅ 问题排查技能
继续加油!👍
📋 第七章:进阶主题
7.1 原子操作
原子操作是多线程编程中最底层的同步机制,可以在无锁的情况下保证线程安全。
7.1.1 什么是原子操作
定义:原子操作是不可分割的操作,要么全部完成,要么完全不执行,不会被其他线程中断。
为什么需要原子操作?
// ❌ 非原子操作示例
int counter = 0;
// 线程1和线程2同时执行
counter++; // 这不是原子的!
// 实际汇编指令:
// 1. 从内存读取counter到寄存器
// 2. 寄存器值+1
// 3. 将寄存器值写回内存
// 问题:如果两个线程同时读取counter=0,
// 都+1后写回,结果是1而不是2!
原子操作的优势:
| 特性 | 原子操作 | 互斥锁 |
|---|---|---|
| 性能 | 极快(CPU指令级别) | 较慢(系统调用) |
| 适用场景 | 简单计数、标志位 | 复杂临界区 |
| 死锁风险 | 无 | 有 |
| 使用复杂度 | 简单 | 需要注意锁顺序 |
7.1.2 QAtomicInteger - 原子整数
基本用法:
#include <QAtomicInteger>
#include <QThread>
#include <QDebug>
// 创建原子整数
QAtomicInteger<int> atomicCounter(0);
// 线程1
class Worker1 : public QThread {
void run() override {
for (int i = 0; i < 10000; ++i) {
atomicCounter.fetchAndAddRelaxed(1); // 原子递增
}
}
};
// 线程2
class Worker2 : public QThread {
void run() override {
for (int i = 0; i < 10000; ++i) {
atomicCounter.fetchAndAddRelaxed(1); // 原子递增
}
}
};
// 使用
Worker1 t1;
Worker2 t2;
t1.start();
t2.start();
t1.wait();
t2.wait();
qDebug() << atomicCounter.loadRelaxed(); // 输出:20000(保证正确)
常用操作:
QAtomicInteger<int> atomic(0);
// 1. 加载值
int value = atomic.loadRelaxed(); // Relaxed内存序
int value = atomic.loadAcquire(); // Acquire内存序
// 2. 存储值
atomic.storeRelaxed(42); // Relaxed内存序
atomic.storeRelease(42); // Release内存序
// 3. 原子加法
int oldValue = atomic.fetchAndAddRelaxed(5); // old + 5,返回old
atomic.fetchAndAddAcquire(5); // Acquire内存序
atomic.fetchAndAddRelease(5); // Release内存序
// 4. 原子减法
int oldValue = atomic.fetchAndSubRelaxed(3); // old - 3,返回old
// 5. 原子递增/递减
int newValue = atomic.operator++(); // ++atomic(前置)
int oldValue = atomic.operator++(int); // atomic++(后置)
int newValue = atomic.operator--(); // --atomic(前置)
// 6. 比较并交换(CAS - Compare And Swap)
int expected = 10;
bool success = atomic.testAndSetRelaxed(expected, 20);
// 如果atomic == expected,则设置为20,返回true
// 否则不修改,返回false
// 7. 原子交换
int oldValue = atomic.fetchAndStoreRelaxed(100); // 设置新值,返回旧值
实战示例:无锁计数器
class ThreadSafeCounter {
public:
void increment() {
counter.fetchAndAddRelaxed(1);
}
void decrement() {
counter.fetchAndSubRelaxed(1);
}
int value() const {
return counter.loadRelaxed();
}
// 重置为0(仅当当前值为expected时)
bool resetIfEquals(int expected) {
return counter.testAndSetRelaxed(expected, 0);
}
private:
QAtomicInteger<int> counter{0};
};
// 使用:多线程安全
ThreadSafeCounter globalCounter;
// 线程1
globalCounter.increment();
// 线程2
globalCounter.increment();
// 主线程
qDebug() << globalCounter.value(); // 线程安全读取
7.1.3 QAtomicPointer - 原子指针
基本用法:
#include <QAtomicPointer>
// 创建原子指针
QAtomicPointer<MyClass> atomicPtr(nullptr);
// 读取指针
MyClass *ptr = atomicPtr.loadRelaxed();
// 设置指针
MyClass *obj = new MyClass();
atomicPtr.storeRelaxed(obj);
// 原子交换
MyClass *oldPtr = atomicPtr.fetchAndStoreRelaxed(newPtr);
// 比较并交换
MyClass *expected = oldPtr;
bool success = atomicPtr.testAndSetRelaxed(expected, newPtr);
if (success) {
delete expected; // 只有成功时才删除旧指针
}
实战示例:无锁单例模式
class Singleton {
public:
static Singleton* instance() {
// 双重检查锁定(DCL)+ 原子指针
Singleton *tmp = s_instance.loadAcquire();
if (!tmp) {
// 第一次检查:未初始化
QMutexLocker locker(&s_mutex);
tmp = s_instance.loadRelaxed();
if (!tmp) {
// 第二次检查:仍未初始化
tmp = new Singleton();
s_instance.storeRelease(tmp);
}
}
return tmp;
}
private:
Singleton() {}
~Singleton() {}
static QAtomicPointer<Singleton> s_instance;
static QMutex s_mutex;
};
// 初始化
QAtomicPointer<Singleton> Singleton::s_instance(nullptr);
QMutex Singleton::s_mutex;
实战示例:无锁队列节点
// 简化的无锁栈
template<typename T>
class LockFreeStack {
private:
struct Node {
T data;
Node *next;
Node(const T &val) : data(val), next(nullptr) {}
};
QAtomicPointer<Node> head{nullptr};
public:
void push(const T &value) {
Node *newNode = new Node(value);
do {
newNode->next = head.loadRelaxed();
// CAS循环:直到成功将newNode设置为head
} while (!head.testAndSetRelease(newNode->next, newNode));
}
bool pop(T &result) {
Node *oldHead;
do {
oldHead = head.loadAcquire();
if (!oldHead) {
return false; // 栈空
}
// CAS循环:尝试将head设置为oldHead->next
} while (!head.testAndSetRelease(oldHead, oldHead->next));
result = oldHead->data;
delete oldHead;
return true;
}
};
7.1.4 内存序(Memory Order)
什么是内存序?
现代CPU为了性能,会对指令重排序,导致操作的实际执行顺序与代码顺序不同。内存序控制原子操作的可见性和顺序。
Qt提供的内存序:
| 内存序 | 说明 | 使用场景 |
|---|---|---|
| Relaxed | 最宽松,不保证顺序 | 简单计数器 |
| Acquire | 读操作,防止后续操作重排到前面 | 读取共享数据前 |
| Release | 写操作,防止前面操作重排到后面 | 写入共享数据后 |
| Ordered | 最严格,顺序一致性 | 需要严格顺序时 |
Relaxed示例:
QAtomicInteger<int> counter(0);
// Relaxed:只保证操作本身的原子性,不保证顺序
counter.fetchAndAddRelaxed(1);
// 适用场景:简单计数,不关心与其他变量的顺序关系
Acquire-Release示例:
// 生产者-消费者场景
QAtomicPointer<Data> dataPtr(nullptr);
QAtomicInteger<bool> ready(false);
// 生产者线程
void producer() {
Data *data = new Data();
data->value = 42;
// 1. 先设置数据(Release:防止data初始化重排到storeRelease之后)
dataPtr.storeRelease(data);
// 2. 再设置标志(Release:确保前面的写入对其他线程可见)
ready.storeRelease(true);
}
// 消费者线程
void consumer() {
// 1. 先检查标志(Acquire:确保后续读取看到最新值)
while (!ready.loadAcquire()) {
QThread::msleep(1);
}
// 2. 读取数据(Acquire:确保看到producer的所有写入)
Data *data = dataPtr.loadAcquire();
qDebug() << data->value; // 保证输出42
}
内存序可视化:
// 场景:两个变量的写入和读取
int normalVar = 0;
QAtomicInteger<bool> flag(false);
// 线程1:写入
void writer() {
normalVar = 42; // 1
flag.storeRelease(true); // 2(Release)
// Release保证:1不会重排到2之后
}
// 线程2:读取
void reader() {
while (!flag.loadAcquire()) {} // 3(Acquire)
qDebug() << normalVar; // 4
// Acquire保证:4不会重排到3之前
// Acquire-Release配合保证:看到flag=true时,一定能看到normalVar=42
}
性能对比:
// 性能测试(1000万次递增)
QAtomicInteger<int> atomic(0);
// Relaxed:最快
for (int i = 0; i < 10000000; ++i) {
atomic.fetchAndAddRelaxed(1);
}
// 耗时:约50ms
// Acquire:较快
for (int i = 0; i < 10000000; ++i) {
atomic.fetchAndAddAcquire(1);
}
// 耗时:约80ms
// Ordered:较慢
for (int i = 0; i < 10000000; ++i) {
atomic.fetchAndAddOrdered(1);
}
// 耗时:约120ms
选择内存序的原则:
// 1. 简单计数器 → Relaxed
QAtomicInteger<int> counter(0);
counter.fetchAndAddRelaxed(1);
// 2. 标志位(控制其他数据的可见性) → Acquire/Release
QAtomicInteger<bool> ready(false);
ready.storeRelease(true); // 写入端
ready.loadAcquire(); // 读取端
// 3. 需要严格顺序(少见) → Ordered
QAtomicInteger<int> seqNum(0);
int num = seqNum.fetchAndAddOrdered(1);
// 4. 不确定 → 使用默认(通常是Ordered)或Acquire/Release
atomic.load(); // 默认Ordered
✅ 7.1节总结
原子操作速查
// QAtomicInteger常用操作
QAtomicInteger<int> atomic(0);
atomic.loadRelaxed() // 读取
atomic.storeRelaxed(42) // 写入
atomic.fetchAndAddRelaxed(5) // 加法
atomic.fetchAndSubRelaxed(3) // 减法
atomic.testAndSetRelaxed(old, new) // CAS
++atomic / atomic++ // 递增
--atomic / atomic-- // 递减
// QAtomicPointer常用操作
QAtomicPointer<T> ptr(nullptr);
ptr.loadAcquire() // 读取指针
ptr.storeRelease(p) // 写入指针
ptr.testAndSetRelease(old, new) // CAS
内存序选择指南
简单计数器?
└─> Relaxed
控制其他数据可见性?
├─> 写入端:Release
└─> 读取端:Acquire
需要严格顺序?
└─> Ordered
不确定?
└─> Acquire/Release(安全选择)
原子操作 vs 互斥锁
// 场景1:简单计数 → 原子操作(更快)
QAtomicInteger<int> counter(0);
counter.fetchAndAddRelaxed(1);
// 场景2:复杂操作 → 互斥锁(更灵活)
QMutexLocker locker(&mutex);
if (counter > 10) {
counter = 0;
doSomething();
}
注意事项
⚠️ 陷阱1:ABA问题
// 问题:值从A变成B再变回A,CAS无法察觉
QAtomicInteger<int> atomic(10);
// 线程1读取:10
int expected = atomic.loadRelaxed(); // 10
// 线程2修改:10 → 20 → 10
atomic.storeRelaxed(20);
atomic.storeRelaxed(10);
// 线程1 CAS成功,但实际值已经变化过
bool ok = atomic.testAndSetRelaxed(expected, 30); // true,但可能不是预期
// 解决方案:使用版本号
struct VersionedValue {
int value;
int version;
};
⚠️ 陷阱2:过度使用
// ❌ 错误:原子操作不能保证组合操作的原子性
QAtomicInteger<int> balance(100);
// 这不是原子的!
if (balance.loadRelaxed() >= 50) {
balance.fetchAndSubRelaxed(50); // 可能余额不足
}
// ✅ 正确:使用CAS循环
int oldBalance, newBalance;
do {
oldBalance = balance.loadRelaxed();
if (oldBalance < 50) {
return false; // 余额不足
}
newBalance = oldBalance - 50;
} while (!balance.testAndSetRelaxed(oldBalance, newBalance));
7.2 Qt与C++11线程
Qt和C++11都提供了线程支持,理解它们的差异有助于做出正确选择。
7.2.1 std::thread vs QThread
基本对比:
| 特性 | std::thread | QThread |
|---|---|---|
| 标准性 | C++11标准 | Qt专有 |
| 信号槽 | ❌ 不支持 | ✅ 原生支持 |
| 事件循环 | ❌ 无 | ✅ 有(exec()) |
| 平台兼容 | C++11编译器即可 | 需要Qt框架 |
| 使用复杂度 | 简单直接 | 更灵活强大 |
std::thread示例:
#include <thread>
#include <iostream>
void worker(int id) {
std::cout << "Worker " << id << " running\n";
}
int main() {
// 创建线程
std::thread t1(worker, 1);
std::thread t2(worker, 2);
// 等待完成
t1.join();
t2.join();
return 0;
}
QThread示例:
#include <QThread>
#include <QDebug>
class Worker : public QObject {
Q_OBJECT
public slots:
void doWork(int id) {
qDebug() << "Worker" << id << "running";
emit finished();
}
signals:
void finished();
};
int main(int argc, char *argv[]) {
QCoreApplication app(argc, argv);
QThread thread;
Worker worker;
worker.moveToThread(&thread);
QObject::connect(&thread, &QThread::started, [&worker]() {
worker.doWork(1);
});
QObject::connect(&worker, &Worker::finished, &thread, &QThread::quit);
thread.start();
thread.wait();
return 0;
}
选择建议:
// 使用std::thread的场景:
// 1. 纯C++项目,不需要Qt
// 2. 简单的后台任务
// 3. 需要跨平台的C++标准解决方案
std::thread thread([]() {
// 简单任务
heavyComputation();
});
thread.detach(); // 分离执行
// 使用QThread的场景:
// 1. Qt项目
// 2. 需要信号槽通信
// 3. 需要事件循环(定时器、网络等)
// 4. 需要与GUI交互
QThread *thread = new QThread();
Worker *worker = new Worker();
worker->moveToThread(thread);
connect(worker, &Worker::resultReady, this, &MainWindow::handleResult);
thread->start();
7.2.2 std::mutex vs QMutex
基本对比:
| 特性 | std::mutex | QMutex |
|---|---|---|
| 递归锁 | std::recursive_mutex | QMutex(Recursive) |
| try_lock | ✅ | ✅ (tryLock) |
| 超时锁 | std::timed_mutex | ✅ (tryLock(ms)) |
| RAII | std::lock_guard | QMutexLocker |
std::mutex示例:
#include <mutex>
#include <thread>
std::mutex mtx;
int shared_data = 0;
void increment() {
std::lock_guard<std::mutex> lock(mtx); // RAII
shared_data++;
}
// 尝试锁定
std::mutex mtx;
if (mtx.try_lock()) {
// 获取到锁
doSomething();
mtx.unlock();
} else {
// 未获取到锁
}
// 超时锁定
std::timed_mutex tmtx;
if (tmtx.try_lock_for(std::chrono::milliseconds(100))) {
doSomething();
tmtx.unlock();
}
QMutex示例:
#include <QMutex>
#include <QMutexLocker>
QMutex mutex;
int shared_data = 0;
void increment() {
QMutexLocker locker(&mutex); // RAII
shared_data++;
}
// 尝试锁定
QMutex mutex;
if (mutex.tryLock()) {
// 获取到锁
doSomething();
mutex.unlock();
}
// 超时锁定
if (mutex.tryLock(100)) { // 100ms
doSomething();
mutex.unlock();
}
// 递归锁
QMutex recursiveMutex(QMutex::Recursive);
7.2.3 混合使用的注意事项
兼容性表:
| Qt类 | C++11等价 | 是否可混用 |
|---|---|---|
| QThread | std::thread | ⚠️ 避免 |
| QMutex | std::mutex | ✅ 可以 |
| QSemaphore | std::counting_semaphore (C++20) | ✅ 可以 |
| QWaitCondition | std::condition_variable | ✅ 可以 |
✅ 正确混用:同步原语
#include <QMutex>
#include <thread>
QMutex mutex; // Qt互斥锁
int shared = 0;
void worker() {
QMutexLocker locker(&mutex); // Qt locker
shared++;
}
int main() {
// C++11线程 + Qt互斥锁
std::thread t1(worker);
std::thread t2(worker);
t1.join();
t2.join();
// 结果正确
qDebug() << shared; // 2
}
⚠️ 谨慎混用:线程对象
// ❌ 不推荐:std::thread中使用QObject
void problematic() {
std::thread t([]() {
QTimer *timer = new QTimer(); // ❌ 问题!
// std::thread没有Qt事件循环,定时器不会工作
timer->start(1000);
});
t.detach();
}
// ✅ 正确:如果需要Qt特性,使用QThread
void correct() {
QThread *thread = new QThread();
QObject::connect(thread, &QThread::started, []() {
QTimer *timer = new QTimer(); // ✓ OK
timer->start(1000);
// QThread有事件循环
});
thread->start();
}
最佳实践:
// 策略1:纯计算任务 → std::thread
std::thread t([]() {
// 不需要Qt特性的纯计算
heavyComputation();
});
t.detach();
// 策略2:需要Qt特性 → QThread
QThread *thread = new QThread();
Worker *worker = new Worker();
worker->moveToThread(thread);
// 可以使用信号槽、定时器等
// 策略3:混合(谨慎)
// 使用std::thread + Qt同步原语
QMutex mutex;
std::thread t([&mutex]() {
QMutexLocker locker(&mutex);
// 使用Qt的锁保护
});
7.3 自定义线程框架
如果Qt提供的功能不能满足需求,可以设计自定义线程框架。
7.3.1 设计思路
核心组件:
自定义线程框架
├── 任务队列(TaskQueue)
│ ├── 线程安全队列
│ └── 优先级支持
│
├── 线程池(ThreadPool)
│ ├── 工作线程管理
│ ├── 动态扩缩容
│ └── 负载均衡
│
├── 任务调度器(Scheduler)
│ ├── 任务分发策略
│ └── 依赖关系管理
│
└── 异常处理(ExceptionHandler)
├── 任务异常捕获
└── 错误回调机制
7.3.2 任务队列实现
// Task.h - 任务基类
class Task {
public:
virtual ~Task() {}
virtual void execute() = 0; // 纯虚函数,子类实现
virtual QString name() const { return "Task"; }
int priority = 0; // 优先级(数字越大优先级越高)
int taskId = 0;
};
// 具体任务示例
class ComputeTask : public Task {
public:
ComputeTask(int value) : m_value(value) {}
void execute() override {
qDebug() << "Computing" << m_value;
QThread::msleep(1000);
m_result = m_value * 2;
}
QString name() const override { return "ComputeTask"; }
int result() const { return m_result; }
private:
int m_value;
int m_result = 0;
};
// TaskQueue.h - 线程安全任务队列
class TaskQueue {
public:
void enqueue(Task *task) {
QMutexLocker locker(&mutex);
queue.append(task);
// 按优先级排序(可选)
std::sort(queue.begin(), queue.end(),
[](Task *a, Task *b) { return a->priority > b->priority; });
notEmpty.wakeOne();
}
Task* dequeue(int timeoutMs = -1) {
QMutexLocker locker(&mutex);
while (queue.isEmpty() && !stopped) {
if (timeoutMs < 0) {
notEmpty.wait(&mutex);
} else {
if (!notEmpty.wait(&mutex, timeoutMs)) {
return nullptr; // 超时
}
}
}
if (stopped && queue.isEmpty()) {
return nullptr;
}
return queue.takeFirst();
}
void stop() {
QMutexLocker locker(&mutex);
stopped = true;
notEmpty.wakeAll();
}
int size() const {
QMutexLocker locker(&m mutex);
return queue.size();
}
private:
QList<Task*> queue;
mutable QMutex mutex;
QWaitCondition notEmpty;
bool stopped = false;
};
7.3.3 线程池封装
// ThreadPool.h
class ThreadPool : public QObject {
Q_OBJECT
public:
explicit ThreadPool(int threadCount = 4, QObject *parent = nullptr)
: QObject(parent), maxThreads(threadCount)
{
for (int i = 0; i < threadCount; ++i) {
createWorker();
}
}
~ThreadPool() {
stop();
}
// 提交任务
void submitTask(Task *task) {
task->taskId = nextTaskId++;
taskQueue.enqueue(task);
activeTasks++;
emit taskQueued(task->taskId);
}
// 等待所有任务完成
void waitForAll() {
QMutexLocker locker(&statMutex);
while (activeTasks > 0) {
allDone.wait(&statMutex, 100);
}
}
// 停止线程池
void stop() {
taskQueue.stop();
for (auto *thread : workers) {
thread->quit();
thread->wait(3000);
}
qDeleteAll(workers);
workers.clear();
}
signals:
void taskQueued(int taskId);
void taskStarted(int taskId);
void taskCompleted(int taskId);
void taskFailed(int taskId, const QString &error);
private:
void createWorker() {
QThread *thread = new QThread();
// 线程启动时执行工作循环
QObject::connect(thread, &QThread::started, [this]() {
workerLoop();
});
thread->start();
workers.append(thread);
}
void workerLoop() {
while (true) {
Task *task = taskQueue.dequeue(1000);
if (!task) {
continue; // 超时,继续等待
}
emit taskStarted(task->taskId);
try {
task->execute();
emit taskCompleted(task->taskId);
} catch (const std::exception &e) {
emit taskFailed(task->taskId, e.what());
} catch (...) {
emit taskFailed(task->taskId, "Unknown error");
}
delete task;
{
QMutexLocker locker(&statMutex);
activeTasks--;
if (activeTasks == 0) {
allDone.wakeAll();
}
}
}
}
TaskQueue taskQueue;
QVector<QThread*> workers;
int maxThreads;
QAtomicInteger<int> nextTaskId{1};
QAtomicInteger<int> activeTasks{0};
QMutex statMutex;
QWaitCondition allDone;
};
使用示例:
// 创建线程池
ThreadPool pool(4); // 4个工作线程
// 连接信号
QObject::connect(&pool, &ThreadPool::taskCompleted,
[](int taskId) {
qDebug() << "Task" << taskId << "completed";
});
// 提交任务
for (int i = 0; i < 10; ++i) {
ComputeTask *task = new ComputeTask(i);
task->priority = i; // 优先级
pool.submitTask(task);
}
// 等待所有任务完成
pool.waitForAll();
qDebug() << "All tasks done!";
7.3.4 异常处理机制
// 安全任务包装器
class SafeTask : public Task {
public:
SafeTask(Task *innerTask, std::function<void(const QString&)> errorCallback = nullptr)
: inner(innerTask), onError(errorCallback) {}
void execute() override {
try {
inner->execute();
} catch (const std::exception &e) {
QString error = QString("Exception: %1").arg(e.what());
qWarning() << "Task" << inner->name() << "failed:" << error;
if (onError) {
onError(error);
}
} catch (...) {
QString error = "Unknown exception";
qWarning() << "Task" << inner->name() << "failed:" << error;
if (onError) {
onError(error);
}
}
}
QString name() const override { return inner->name(); }
private:
Task *inner;
std::function<void(const QString&)> onError;
};
// 使用
Task *riskyTask = new RiskyTask();
SafeTask *safeTask = new SafeTask(riskyTask, [](const QString &error) {
qDebug() << "Error handler:" << error;
// 记录日志、发送通知等
});
pool.submitTask(safeTask);
✅ 第七章完整总结
本章知识体系
第七章:进阶主题
├── 7.1 原子操作
│ ├── QAtomicInteger(无锁计数)
│ ├── QAtomicPointer(无锁指针)
│ └── 内存序(Relaxed/Acquire/Release/Ordered)
│
├── 7.2 Qt与C++11线程
│ ├── std::thread vs QThread
│ ├── std::mutex vs QMutex
│ └── 混合使用策略
│
└── 7.3 自定义线程框架
├── 任务队列设计
├── 线程池实现
└── 异常处理机制
技术选型决策树
需要多线程?
├─ 简单计数/标志位?
│ └─> QAtomicInteger/QAtomicPointer(7.1)
│
├─ 纯C++项目?
│ └─> std::thread + std::mutex(7.2)
│
├─ Qt项目 + 需要信号槽?
│ └─> QThread + moveToThread(第2-3章)
│
├─ 批量并行任务?
│ └─> QtConcurrent(第3章)
│
└─ 复杂任务调度?
└─> 自定义线程池(7.3)
性能对比总结
| 技术 | 性能 | 复杂度 | 适用场景 |
|---|---|---|---|
| 原子操作 | ⭐⭐⭐⭐⭐ | ⭐⭐ | 简单共享变量 |
| std::thread | ⭐⭐⭐⭐ | ⭐⭐ | 纯C++后台任务 |
| QThread | ⭐⭐⭐⭐ | ⭐⭐⭐ | Qt项目通用 |
| QtConcurrent | ⭐⭐⭐⭐ | ⭐ | 批量并行处理 |
| 自定义线程池 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | 复杂调度需求 |
实战能力检验
场景1:高性能计数器
- 选择QAtomicInteger
- 使用Relaxed内存序
- 避免ABA问题
场景2:跨平台C++库
- 选择std::thread
- 使用std::mutex保护
- 不依赖Qt
场景3:复杂任务系统
- 设计任务队列
- 实现线程池
- 处理异常和错误
📋 附录
A. Qt多线程API速查表
A.1 QThread常用接口
创建与启动:
| 方法 | 说明 | 示例 |
|---|---|---|
start() |
启动线程 | thread->start(); |
quit() |
退出事件循环 | thread->quit(); |
wait(ms) |
等待线程结束 | thread->wait(3000); |
terminate() |
强制终止(危险) | thread->terminate(); |
isRunning() |
是否正在运行 | if (thread->isRunning()) |
isFinished() |
是否已结束 | if (thread->isFinished()) |
线程属性:
| 方法 | 说明 | 示例 |
|---|---|---|
currentThread() |
当前线程(静态) | QThread::currentThread() |
currentThreadId() |
当前线程ID(静态) | QThread::currentThreadId() |
idealThreadCount() |
理想线程数(静态) | QThread::idealThreadCount() |
setPriority() |
设置优先级 | thread->setPriority(QThread::HighPriority) |
priority() |
获取优先级 | thread->priority() |
线程睡眠(静态方法):
| 方法 | 说明 |
|---|---|
sleep(s) |
睡眠s秒 |
msleep(ms) |
睡眠ms毫秒 |
usleep(us) |
睡眠us微秒 |
信号:
| 信号 | 触发时机 |
|---|---|
started() |
线程启动后 |
finished() |
线程结束前 |
A.2 同步原语对比表
| 类名 | 用途 | 性能 | 使用难度 | 场景 |
|---|---|---|---|---|
| QMutex | 互斥锁 | ⭐⭐⭐⭐ | ⭐⭐ | 保护临界区 |
| QMutexLocker | RAII互斥锁 | ⭐⭐⭐⭐ | ⭐ | 自动加解锁 |
| QReadWriteLock | 读写锁 | ⭐⭐⭐ | ⭐⭐⭐ | 读多写少 |
| QSemaphore | 信号量 | ⭐⭐⭐⭐ | ⭐⭐ | 资源计数 |
| QWaitCondition | 条件变量 | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | 复杂条件等待 |
| QAtomicInteger | 原子整数 | ⭐⭐⭐⭐⭐ | ⭐⭐ | 简单计数 |
| QAtomicPointer | 原子指针 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | 指针交换 |
详细对比:
// QMutex - 最常用的互斥锁
QMutex mutex;
QMutexLocker locker(&mutex); // 推荐使用Locker
// QReadWriteLock - 读多写少场景
QReadWriteLock rwLock;
QReadLocker rLocker(&rwLock); // 读锁
QWriteLocker wLocker(&rwLock); // 写锁
// QSemaphore - 资源计数
QSemaphore sem(5); // 5个资源
sem.acquire(); // 获取1个
sem.release(); // 释放1个
// QWaitCondition - 条件等待
QMutex mutex;
QWaitCondition condition;
condition.wait(&mutex); // 等待
condition.wakeOne(); // 唤醒一个
condition.wakeAll(); // 唤醒所有
// QAtomicInteger - 原子计数
QAtomicInteger<int> counter(0);
counter.fetchAndAddRelaxed(1); // 原子递增
A.3 QtConcurrent函数列表
| 函数 | 功能 | 返回类型 | 并行类型 |
|---|---|---|---|
| map() | 就地修改 | void | 并行 |
| mapped() | 返回新集合 | QFuture | 并行 |
| mappedReduced() | 映射后归约 | QFuture | 并行 |
| filter() | 就地过滤 | void | 并行 |
| filtered() | 返回过滤集合 | QFuture | 并行 |
| filteredReduced() | 过滤后归约 | QFuture | 并行 |
| run() | 异步执行函数 | QFuture | 单任务 |
使用示例:
#include <QtConcurrent>
QVector<int> values = {1, 2, 3, 4, 5};
// 1. mapped - 并行映射
auto square = [](int x) { return x * x; };
QFuture<int> future = QtConcurrent::mapped(values, square);
QVector<int> results = future.results(); // {1, 4, 9, 16, 25}
// 2. filtered - 并行过滤
auto isEven = [](int x) { return x % 2 == 0; };
QFuture<int> evenFuture = QtConcurrent::filtered(values, isEven);
QVector<int> evens = evenFuture.results(); // {2, 4}
// 3. mappedReduced - 映射后归约
auto sum = [](int &result, int x) { result += x; };
QFuture<int> sumFuture = QtConcurrent::mappedReduced(values, square, sum);
int total = sumFuture.result(); // 55
// 4. run - 异步执行
QFuture<QString> runFuture = QtConcurrent::run([]() {
return QString("Hello from thread pool");
});
QString message = runFuture.result();
B. 常见错误与解决方案
B.1 “QObject::connect: Cannot queue arguments of type…”
错误原因:
自定义类型未注册到Qt元对象系统,无法在跨线程信号槽中传递。
错误示例:
struct MyData {
int id;
QString name;
};
class Worker : public QObject {
Q_OBJECT
signals:
void dataReady(const MyData &data); // ❌ MyData未注册
};
// 运行时警告:
// QObject::connect: Cannot queue arguments of type 'MyData'
解决方案:
// 1. 声明元类型
Q_DECLARE_METATYPE(MyData)
// 2. 在main()或构造函数中注册
int main(int argc, char *argv[]) {
qRegisterMetaType<MyData>("MyData");
QApplication app(argc, argv);
// ...
}
// 3. 现在可以在信号槽中使用
connect(worker, &Worker::dataReady,
this, [](const MyData &data) {
qDebug() << data.id << data.name;
});
变体:QList/QVector等容器:
// 注册QList<int>
Q_DECLARE_METATYPE(QList<int>)
qRegisterMetaType<QList<int>>("QList<int>");
// 注册自定义类型的QVector
Q_DECLARE_METATYPE(QVector<MyData>)
qRegisterMetaType<QVector<MyData>>("QVector<MyData>");
B.2 “QObject: Cannot create children for a parent that is in a different thread”
错误原因:
在对象所属线程之外创建子对象。
错误示例:
class Worker : public QObject {
Q_OBJECT
public:
Worker() {
// Worker在主线程创建
}
public slots:
void doWork() {
// 现在在工作线程
QTimer *timer = new QTimer(this); // ❌ 错误!
// 父对象(this)在主线程,但当前在工作线程创建子对象
}
};
Worker *worker = new Worker(); // 主线程
worker->moveToThread(workerThread);
workerThread->start();
解决方案1:不设置父对象
void doWork() {
QTimer *timer = new QTimer(); // ✓ 无父对象
// 手动管理生命周期
connect(this, &Worker::finished, timer, &QTimer::deleteLater);
timer->start(1000);
}
解决方案2:在对象所属线程创建
class Worker : public QObject {
Q_OBJECT
QTimer *timer = nullptr;
public slots:
void initialize() {
// 在工作线程调用
timer = new QTimer(this); // ✓ 此时this已在工作线程
timer->start(1000);
}
};
// 使用
worker->moveToThread(workerThread);
connect(workerThread, &QThread::started, worker, &Worker::initialize);
workerThread->start();
B.3 “Timers cannot be started from another thread”
错误原因:
在非对象所属线程启动定时器。
错误示例:
Worker *worker = new Worker();
worker->moveToThread(workerThread);
QTimer *timer = new QTimer();
connect(timer, &QTimer::timeout, worker, &Worker::process);
timer->start(1000); // ❌ timer在主线程,worker在工作线程
解决方案1:在对象所属线程创建和启动
class Worker : public QObject {
Q_OBJECT
QTimer *timer = nullptr;
public slots:
void start() {
timer = new QTimer(this); // 在工作线程创建
connect(timer, &QTimer::timeout, this, &Worker::process);
timer->start(1000); // 在工作线程启动
}
};
worker->moveToThread(workerThread);
QMetaObject::invokeMethod(worker, "start", Qt::QueuedConnection);
解决方案2:使用QTimer::singleShot(线程安全)
// singleShot可以跨线程调用
QTimer::singleShot(1000, worker, &Worker::process);
B.4 死锁问题
常见死锁场景:
// 场景1:锁顺序不一致
QMutex mutexA, mutexB;
// 线程1
{
QMutexLocker lockerA(&mutexA); // 先锁A
QMutexLocker lockerB(&mutexB); // 后锁B
}
// 线程2
{
QMutexLocker lockerB(&mutexB); // 先锁B ❌ 可能死锁!
QMutexLocker lockerA(&mutexA); // 后锁A
}
解决方案:统一锁顺序
// 规定:始终按A→B的顺序加锁
void thread1() {
QMutexLocker lockerA(&mutexA); // 1. A
QMutexLocker lockerB(&mutexB); // 2. B
}
void thread2() {
QMutexLocker lockerA(&mutexA); // 1. A(统一顺序)
QMutexLocker lockerB(&mutexB); // 2. B
}
场景2:BlockingQueuedConnection死锁
// ❌ 主线程等待工作线程,工作线程等待主线程
// 主线程
QMetaObject::invokeMethod(worker, "processData",
Qt::BlockingQueuedConnection,
Q_RETURN_ARG(QString, result));
// 工作线程中
QString processData() {
// 等待主线程完成某事 → 死锁!
}
解决方案:避免双向阻塞
// ✓ 使用两个信号代替阻塞调用
connect(this, &MainWindow::requestProcess, worker, &Worker::process);
connect(worker, &Worker::resultReady, this, &MainWindow::handleResult);
emit requestProcess("data"); // 非阻塞
C. 参考资源
C.1 Qt官方文档链接
核心文档:
-
Qt Threading Basics
Qt多线程基础概念和最佳实践 -
QThread Class Reference
QThread类的完整API文档 -
QtConcurrent Namespace
高级并发API参考 -
Thread Support in Qt
Qt线程支持全面指南 -
Synchronizing Threads
线程同步技术详解
重要文章:
-
Threads and QObjects
QObject的线程亲和性 -
Thread-Safe Coding Best Practices
线程安全编码最佳实践
C.2 推荐书籍
中文书籍:
-
《Qt 5编程入门》 - 霍亚飞
第10章详细介绍Qt多线程编程 -
《Qt Creator快速入门(第3版)》 - 霍亚飞
包含多线程实战案例 -
《C++并发编程实战(第2版)》 - Anthony Williams
深入理解C++11/14/17并发特性
英文书籍:
-
“Advanced Qt Programming” - Mark Summerfield
Chapter 14: Multithreading with Qt -
“Mastering Qt 5” - Guillaume Lazar
详细的Qt多线程实践 -
“C++ Concurrency in Action” - Anthony Williams
C++并发编程权威指南
C.3 开源项目参考
优秀的Qt多线程项目:
-
Qt Creator源码
GitHub: qt-creator
学习专业的Qt多线程架构 -
Telegram Desktop
GitHub: tdesktop
大规模Qt项目的多线程实践 -
OBS Studio
GitHub: obs-studio
多线程音视频处理
学习资源:
-
Qt官方示例
Qt安装目录下的examples/corelib/threads -
QtForum多线程板块
forum.qt.io/category/4 -
Stack Overflow Qt标签
搜索"qt multithreading"相关问题
D. 学习路线图
Qt多线程学习路线
│
├── 第1阶段:基础入门(1-2周)
│ ├── QThread基本使用
│ ├── 信号槽跨线程通信
│ └── 简单的moveToThread模式
│ ✓ 完成项目:简单的后台任务
│
├── 第2阶段:同步原语(1-2周)
│ ├── QMutex/QMutexLocker
│ ├── QSemaphore
│ ├── QWaitCondition
│ └── 常见陷阱和问题
│ ✓ 完成项目:生产者-消费者模型
│
├── 第3阶段:高级技术(2-3周)
│ ├── QtConcurrent框架
│ ├── 线程池应用
│ ├── 原子操作
│ └── 内存序理解
│ ✓ 完成项目:图片批量处理器
│
├── 第4阶段:最佳实践(1-2周)
│ ├── 设计原则
│ ├── 调试技巧
│ ├── 性能优化
│ └── 常见反模式
│ ✓ 完成项目:多线程下载器
│
└── 第5阶段:进阶主题(持续)
├── 自定义线程框架
├── 与C++11混合使用
├── 大型项目架构
└── 性能剖析工具
✓ 完成项目:复杂业务系统
E. 调试工具推荐
Qt Creator内置工具:
| 工具 | 功能 |
|---|---|
| Debugger | 断点、单步调试、查看线程 |
| Profiler | 性能分析、热点定位 |
| Thread View | 查看所有线程状态 |
外部工具:
| 工具 | 平台 | 功能 |
|---|---|---|
| Helgrind | Linux | 检测数据竞争 |
| Valgrind | Linux | 内存泄漏检测 |
| Intel VTune | Windows/Linux | 专业性能分析 |
| Visual Studio并发可视化工具 | Windows | 线程同步可视化 |
使用示例:
# Helgrind检测数据竞争
valgrind --tool=helgrind ./your_qt_app
# Memcheck检测内存问题
valgrind --leak-check=full ./your_qt_app
📝 学习建议
- 循序渐进:先掌握QThread基础,再学习高级技术
- 动手实践:每个知识点都配合代码示例
- 理解原理:不仅知道怎么用,还要知道为什么
- 参考项目:通过实战项目巩固知识
- 持续总结:记录常见问题和解决方案
进度跟踪:使用
[ ]标记待学习内容,[x]标记已掌握内容
更多推荐



所有评论(0)