20、【C++】多线程+IO文件流
C++ IO流通过// 输出并截断文件// 输入模式// 二进制读写常用打开模式ios::in:读模式。ios::out:写模式(默认截断)。ios::app:追加模式。:二进制模式。ios::trunc:截断文件(默认)。ios::ate:打开后定位到文件末尾。最小化锁粒度:仅在必要时加锁,避免全局锁。优先使用RAIIlock_guard自动释放资源。避免共享状态:使用消息队列或原子操作替代共享
20、【C++】多线程+IO文件流
目录
- 一、多线程基础
- 二、线程高级主题
- 三、C++文件IO基础
- 四、多线程文件IO
- 五、IO性能优化
- 六、案例分析
- 七、总结与最佳实践
一、多线程基础
1.1 线程创建与管理
1.1.1 std::thread构造与析构
std::thread用于创建和管理线程,构造时传入可调用对象(函数、lambda、函数对象):
#include <thread>
void func() { /* 线程函数 */ }
int main() {
std::thread t1(func); // 函数作为线程入口
std::thread t2([]() { /* lambda作为线程入口 */ });
return 0;
}
析构要求:线程对象销毁前必须调用join()或detach(),否则会调用std::terminate()终止程序。
1.1.2 join与detach
- join():阻塞当前线程,等待子线程完成。
- detach():分离线程,子线程在后台运行,主线程不等待。
std::thread t(func);
t.join(); // 主线程等待t完成
// t.detach(); // 子线程独立运行,主线程继续
1.1.3 线程标识(std::thread::id)
通过get_id()获取线程唯一标识:
std::thread t(func);
std::thread::id tid = t.get_id();
if (tid == std::thread::id()) {
std::cout << "线程未启动" << std::endl;
} else {
std::cout << "线程ID:" << tid << std::endl;
}
1.2 互斥锁(Mutex)
1.2.1 std::mutex与std::lock_guard
std::mutex提供基本互斥功能,std::lock_guard是RAII封装,自动加锁解锁:
#include <mutex>
std::mutex mtx;
int shared_data = 0;
void increment() {
std::lock_guard<std::mutex> lock(mtx); // 构造时加锁,析构时解锁
shared_data++;
}
1.2.2 std::unique_lock的灵活性
std::unique_lock比lock_guard更灵活,支持延迟加锁、手动解锁:
std::unique_lock<std::mutex> lock(mtx, std::defer_lock); // 延迟加锁
if (condition) {
lock.lock(); // 手动加锁
// ... 操作共享数据 ...
lock.unlock(); // 手动解锁
}
1.2.3 递归锁(std::recursive_mutex)
允许同一线程多次加锁,避免递归函数中的死锁:
std::recursive_mutex rmtx;
void recursive_func(int n) {
std::lock_guard<std::recursive_mutex> lock(rmtx);
if (n > 0) {
recursive_func(n - 1); // 递归调用,允许再次加锁
}
}
1.3 条件变量(Condition Variable)
1.3.1 wait/notify_one/notify_all
条件变量用于线程间同步,实现“等待-通知”机制:
std::condition_variable cv;
std::mutex mtx;
bool ready = false;
void consumer() {
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, []{ return ready; }); // 等待ready为true
// ... 处理数据 ...
}
void producer() {
{
std::lock_guard<std::mutex> lock(mtx);
ready = true; // 修改条件
}
cv.notify_one(); // 通知一个等待线程
// cv.notify_all(); // 通知所有等待线程
}
1.3.2 虚假唤醒与条件变量的正确使用
虚假唤醒:线程可能在未收到通知时被唤醒,因此wait()的条件需用谓词判断:
// 错误:无谓词,可能虚假唤醒
cv.wait(lock);
// 正确:带谓词,确保条件满足
cv.wait(lock, []{ return ready; });
1.4 原子操作(std::atomic)
1.4.1 基本原子类型
std::atomic模板封装基本类型,提供线程安全的操作:
#include <atomic>
std::atomic<int> cnt(0); // 原子整数
void increment() {
cnt++; // 原子自增,无数据竞争
}
int main() {
std::thread t1(increment);
std::thread t2(increment);
t1.join();
t2.join();
std::cout << cnt << std::endl; // 输出2,无竞争
return 0;
}
1.4.2 内存序(Memory Order)
原子操作的内存序控制指令重排和可见性,默认memory_order_seq_cst(顺序一致性):
std::atomic<int> x(0), y(0);
void write_x_then_y() {
x.store(1, std::memory_order_relaxed); // 仅保证原子性,不保证顺序
y.store(2, std::memory_order_relaxed);
}
1.5 死锁及避免
1.5.1 死锁产生的四个条件
- 互斥条件:资源不可共享。
- 占有且等待:持有资源并等待其他资源。
- 不可剥夺条件:资源不可被强制剥夺。
- 循环等待条件:线程间形成资源等待环。
1.5.2 避免死锁的方法(顺序加锁、超时锁)
顺序加锁:多个线程按固定顺序获取锁:
std::mutex mtx1, mtx2;
void func1() {
std::lock_guard<std::mutex> lock1(mtx1); // 先锁mtx1
std::lock_guard<std::mutex> lock2(mtx2); // 再锁mtx2
}
void func2() {
std::lock_guard<std::mutex> lock1(mtx1); // 同样先锁mtx1
std::lock_guard<std::mutex> lock2(mtx2);
}
超时锁:使用std::try_lock_for避免无限等待:
std::unique_lock<std::mutex> lock1(mtx1, std::defer_lock);
std::unique_lock<std::mutex> lock2(mtx2, std::defer_lock);
if (std::try_lock(lock1, lock2) == -1) {
// 成功获取所有锁
} else {
// 获取锁失败,处理超时
}
二、线程高级主题
2.1 线程池(Thread Pool)
2.1.1 线程池的组成(任务队列+工作线程)
线程池管理多个工作线程,从任务队列获取任务执行:
class ThreadPool {
public:
ThreadPool(size_t num_threads) {
for (size_t i = 0; i < num_threads; ++i) {
workers.emplace_back([this] {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queue_mutex);
condition.wait(lock, [this] { return stop || !tasks.empty(); });
if (stop && tasks.empty()) return;
task = std::move(tasks.front());
tasks.pop();
}
task(); // 执行任务
}
});
}
}
private:
std::vector<std::thread> workers;
std::queue<std::function<void()>> tasks;
std::mutex queue_mutex;
std::condition_variable condition;
bool stop = false;
};
2.1.2 任务提交与结果获取
通过std::packaged_task和std::future获取任务结果:
template <typename F, typename... Args>
auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> {
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> res = task->get_future();
{
std::lock_guard<std::mutex> lock(queue_mutex);
tasks.emplace([task]() { (*task)(); });
}
condition.notify_one();
return res;
}
2.2 异步任务(std::async/std::future)
2.2.1 std::future与std::promise
std::future获取异步任务结果,std::promise设置结果:
std::promise<int> prom;
std::future<int> fut = prom.get_future();
std::thread t([&prom]() {
prom.set_value(42); // 设置结果
});
std::cout << fut.get() << std::endl; // 获取结果(阻塞)
t.join();
2.2.2 std::async的启动策略
std::async异步执行任务,返回std::future:
// 策略1:std::launch::async 立即创建线程
auto fut1 = std::async(std::launch::async, []{ return 1; });
// 策略2:std::launch::deferred 延迟执行(调用get时在当前线程执行)
auto fut2 = std::async(std::launch::deferred, []{ return 2; });
// 默认策略:由实现决定(通常为async|deferred)
auto fut3 = std::async([]{ return 3; });
2.3 线程局部存储(thread_local)
thread_local变量在每个线程有独立实例:
thread_local int counter = 0;
void increment() {
counter++; // 每个线程的counter独立
}
int main() {
std::thread t1(increment);
std::thread t2(increment);
t1.join();
t2.join();
return 0;
}
三、C++文件IO基础
3.1 IO流概述
3.1.1 流对象与打开模式
C++ IO流通过std::fstream(文件流)操作文件,打开模式指定读写方式:
#include <fstream>
std::ofstream ofs("file.txt", std::ios::out | std::ios::trunc); // 输出并截断文件
std::ifstream ifs("file.txt", std::ios::in); // 输入模式
std::fstream fs("file.bin", std::ios::in | std::ios::out | std::ios::binary); // 二进制读写
常用打开模式:
ios::in:读模式。ios::out:写模式(默认截断)。ios::app:追加模式。ios::binary:二进制模式。ios::trunc:截断文件(默认)。ios::ate:打开后定位到文件末尾。
3.1.2 流状态与错误处理
流对象的状态通过成员函数检查:
std::ifstream ifs("nonexistent.txt");
if (!ifs.is_open()) { // 检查文件是否打开成功
std::cerr << "文件打开失败" << std::endl;
}
int x;
ifs >> x;
if (ifs.fail()) { // 检查提取失败
std::cerr << "读取失败" << std::endl;
}
异常处理:设置异常掩码,错误时抛出异常:
ifs.exceptions(std::ios::failbit | std::ios::badbit);
try {
ifs >> x; // 失败时抛出std::ios_base::failure
} catch (const std::exception& e) {
std::cerr << e.what() << std::endl;
}
3.2 文本文件读写
3.2.1 输出流(ofstream)
写入文本文件:
std::ofstream ofs("output.txt");
ofs << "Hello, World!" << std::endl; // 写入字符串
ofs << 42 << " " << 3.14 << std::endl; // 写入数值
3.2.2 输入流(ifstream)与getline
读取文本文件:
std::ifstream ifs("input.txt");
std::string line;
while (std::getline(ifs, line)) { // 按行读取
std::cout << line << std::endl;
}
int x;
ifs >> x; // 提取整数(跳过空白字符)
3.2.3 格式化输入输出(iomanip)
使用<iomanip>控制格式:
#include <iomanip>
std::ofstream ofs("formatted.txt");
ofs << std::setw(10) << "Name" << std::setw(5) << "Age" << std::endl; // 设置字段宽度
ofs << std::setw(10) << "Alice" << std::setw(5) << 30 << std::endl;
ofs << std::fixed << std::setprecision(2) << 3.1415 << std::endl; // 保留两位小数
3.3 二进制文件读写
3.3.1 read与write函数
二进制模式下使用read/write读写字节流:
std::ofstream ofs("data.bin", std::ios::binary);
int num = 42;
ofs.write(reinterpret_cast<const char*>(&num), sizeof(num)); // 写入整数
std::ifstream ifs("data.bin", std::ios::binary);
int num2;
ifs.read(reinterpret_cast<char*>(&num2), sizeof(num2)); // 读取整数
3.3.2 结构化数据读写
写入结构体:
struct Person {
char name[20];
int age;
};
Person p{"Alice", 30};
std::ofstream ofs("person.bin", std::ios::binary);
ofs.write(reinterpret_cast<const char*>(&p), sizeof(p));
3.4 文件定位与随机访问
3.4.1 seekg与seekp
seekg(读指针)和seekp(写指针)定位文件位置:
std::fstream fs("file.txt", std::ios::in | std::ios::out);
fs.seekg(5, std::ios::beg); // 从文件开始偏移5字节
fs.seekp(0, std::ios::end); // 定位到文件末尾(写指针)
偏移基准:
ios::beg:文件开始。ios::cur:当前位置。ios::end:文件末尾。
3.4.2 tellg与tellp
获取当前指针位置:
std::ifstream ifs("file.txt");
ifs.seekg(0, std::ios::end);
std::streampos size = ifs.tellg(); // 获取文件大小
std::cout << "文件大小:" << size << "字节" << std::endl;
四、多线程文件IO
4.1 多线程读写同一文件的同步策略
4.1.1 文件锁(互斥锁保护文件句柄)
多个线程通过互斥锁同步访问同一文件:
std::mutex file_mutex;
void write_to_file(const std::string& data) {
std::lock_guard<std::mutex> lock(file_mutex);
std::ofstream ofs("shared.txt", std::ios::app);
ofs << data << std::endl;
}
4.1.2 分块读写(并行IO)
将文件分为多个块,每个线程读写不同块,避免锁竞争:
void read_block(const std::string& filename, size_t block_size, size_t block_num) {
std::ifstream ifs(filename, std::ios::binary);
ifs.seekg(block_num * block_size, std::ios::beg);
char* buffer = new char[block_size];
ifs.read(buffer, block_size);
// 处理数据
delete[] buffer;
}
int main() {
const size_t block_size = 4096;
std::thread t1(read_block, "largefile.dat", block_size, 0);
std::thread t2(read_block, "largefile.dat", block_size, 1);
t1.join();
t2.join();
return 0;
}
4.2 多线程日志系统实现
4.2.1 日志队列(生产者-消费者模型)
多个线程(生产者)将日志写入队列,单个线程(消费者)从队列写入文件:
#include <queue>
#include <condition_variable>
std::queue<std::string> log_queue;
std::mutex queue_mutex;
std::condition_variable queue_cv;
bool running = true;
void log_consumer() {
std::ofstream ofs("log.txt", std::ios::app);
while (running) {
std::unique_lock<std::mutex> lock(queue_mutex);
queue_cv.wait(lock, []{ return !log_queue.empty() || !running; });
while (!log_queue.empty()) {
ofs << log_queue.front() << std::endl;
log_queue.pop();
}
}
}
void log_producer(const std::string& msg) {
std::lock_guard<std::mutex> lock(queue_mutex);
log_queue.push(msg);
queue_cv.notify_one();
}
4.2.2 线程安全的日志写入
消费者线程负责实际IO,避免多线程直接写文件的竞争:
int main() {
std::thread consumer(log_consumer);
std::thread t1(log_producer, "Thread 1 log");
std::thread t2(log_producer, "Thread 2 log");
t1.join();
t2.join();
running = false;
queue_cv.notify_one(); // 唤醒消费者线程
consumer.join();
return 0;
}
4.3 大文件分块处理
4.3.1 多线程读取大文件
将大文件分为N块,每个线程读取一块并处理:
void process_block(const std::string& filename, size_t start, size_t end) {
std::ifstream ifs(filename, std::ios::binary);
ifs.seekg(start);
size_t size = end - start;
char* buffer = new char[size];
ifs.read(buffer, size);
// 处理数据(如计算哈希)
delete[] buffer;
}
int main() {
const std::string filename = "largefile.iso";
std::ifstream ifs(filename, std::ios::binary | std::ios::ate);
size_t file_size = ifs.tellg();
size_t block_size = 1024 * 1024; // 1MB/块
size_t num_blocks = (file_size + block_size - 1) / block_size;
std::vector<std::thread> threads;
for (size_t i = 0; i < num_blocks; ++i) {
size_t start = i * block_size;
size_t end = std::min((i + 1) * block_size, file_size);
threads.emplace_back(process_block, filename, start, end);
}
for (auto& t : threads) {
t.join();
}
return 0;
}
五、IO性能优化
5.1 缓冲区大小调整
增大IO缓冲区减少系统调用次数:
std::ifstream ifs("largefile.dat");
char buffer[1024 * 1024]; // 1MB缓冲区
ifs.rdbuf()->pubsetbuf(buffer, sizeof(buffer));
5.2 内存映射文件(mmap)
将文件映射到内存,通过指针直接访问,适用于大文件:
#include <sys/mman.h> // POSIX mmap
#include <fcntl.h>
int fd = open("file.txt", O_RDONLY);
struct stat sb;
fstat(fd, &sb);
char* addr = static_cast<char*>(mmap(nullptr, sb.st_size, PROT_READ, MAP_PRIVATE, fd, 0));
// 使用addr访问文件内容
munmap(addr, sb.st_size);
close(fd);
5.3 异步IO(Asynchronous IO)
Windows的ReadFileEx或POSIX的aio_read实现异步IO,避免阻塞线程:
// POSIX异步IO示例
#include <aio.h>
struct aiocb cb;
cb.aio_fildes = fd;
cb.aio_buf = buffer;
cb.aio_nbytes = size;
cb.aio_offset = offset;
aio_read(&cb);
aio_suspend(&cb, 1, nullptr); // 等待完成
六、案例分析
6.1 多线程文件复制工具
分块读取源文件,多线程并行写入目标文件:
void copy_block(const std::string& src, const std::string& dst, size_t start, size_t size) {
std::ifstream ifs(src, std::ios::binary);
std::ofstream ofs(dst, std::ios::binary | std::ios::in | std::ios::out);
ifs.seekg(start);
ofs.seekp(start);
char* buffer = new char[size];
ifs.read(buffer, size);
ofs.write(buffer, size);
delete[] buffer;
}
6.2 并发日志系统
使用条件变量和队列实现线程安全的日志写入:
// 见4.2.1节代码示例
七、总结与最佳实践
7.1 多线程编程最佳实践
- 最小化锁粒度:仅在必要时加锁,避免全局锁。
- 优先使用RAII:
lock_guard/unique_lock自动释放资源。 - 避免共享状态:使用消息队列或原子操作替代共享内存。
- 使用线程池:复用线程,减少创建销毁开销。
7.2 文件IO最佳实践
- 二进制模式处理结构化数据:避免文本模式的格式转换开销。
- 合理设置缓冲区:增大缓冲区提升大文件读写性能。
- 多线程IO分块处理:并行读写不同文件块,避免锁竞争。
- 使用内存映射处理大文件:mmap减少IO次数,提高访问速度。
通过合理结合多线程和文件IO技术,可显著提升程序的并发性能和数据处理效率,尤其在处理大文件或高并发场景下。
更多推荐



所有评论(0)