20、【C++】多线程+IO文件流

目录

一、多线程基础

1.1 线程创建与管理

1.1.1 std::thread构造与析构

std::thread用于创建和管理线程,构造时传入可调用对象(函数、lambda、函数对象):

#include <thread>

void func() { /* 线程函数 */ }

int main() {
    std::thread t1(func); // 函数作为线程入口
    std::thread t2([]() { /* lambda作为线程入口 */ });
    return 0;
}

析构要求:线程对象销毁前必须调用join()detach(),否则会调用std::terminate()终止程序。

1.1.2 join与detach
  • join():阻塞当前线程,等待子线程完成。
  • detach():分离线程,子线程在后台运行,主线程不等待。
std::thread t(func);
t.join(); // 主线程等待t完成
// t.detach(); // 子线程独立运行,主线程继续
1.1.3 线程标识(std::thread::id)

通过get_id()获取线程唯一标识:

std::thread t(func);
std::thread::id tid = t.get_id();
if (tid == std::thread::id()) {
    std::cout << "线程未启动" << std::endl;
} else {
    std::cout << "线程ID:" << tid << std::endl;
}

1.2 互斥锁(Mutex)

1.2.1 std::mutex与std::lock_guard

std::mutex提供基本互斥功能,std::lock_guard是RAII封装,自动加锁解锁:

#include <mutex>

std::mutex mtx;
int shared_data = 0;

void increment() {
    std::lock_guard<std::mutex> lock(mtx); // 构造时加锁,析构时解锁
    shared_data++;
}
1.2.2 std::unique_lock的灵活性

std::unique_locklock_guard更灵活,支持延迟加锁、手动解锁:

std::unique_lock<std::mutex> lock(mtx, std::defer_lock); // 延迟加锁
if (condition) {
    lock.lock(); // 手动加锁
    // ... 操作共享数据 ...
    lock.unlock(); // 手动解锁
}
1.2.3 递归锁(std::recursive_mutex)

允许同一线程多次加锁,避免递归函数中的死锁:

std::recursive_mutex rmtx;

void recursive_func(int n) {
    std::lock_guard<std::recursive_mutex> lock(rmtx);
    if (n > 0) {
        recursive_func(n - 1); // 递归调用,允许再次加锁
    }
}

1.3 条件变量(Condition Variable)

1.3.1 wait/notify_one/notify_all

条件变量用于线程间同步,实现“等待-通知”机制:

std::condition_variable cv;
std::mutex mtx;
bool ready = false;

void consumer() {
    std::unique_lock<std::mutex> lock(mtx);
    cv.wait(lock, []{ return ready; }); // 等待ready为true
    // ... 处理数据 ...
}

void producer() {
    {
        std::lock_guard<std::mutex> lock(mtx);
        ready = true; // 修改条件
    }
    cv.notify_one(); // 通知一个等待线程
    // cv.notify_all(); // 通知所有等待线程
}
1.3.2 虚假唤醒与条件变量的正确使用

虚假唤醒:线程可能在未收到通知时被唤醒,因此wait()的条件需用谓词判断:

// 错误:无谓词,可能虚假唤醒
cv.wait(lock); 

// 正确:带谓词,确保条件满足
cv.wait(lock, []{ return ready; }); 

1.4 原子操作(std::atomic)

1.4.1 基本原子类型

std::atomic模板封装基本类型,提供线程安全的操作:

#include <atomic>

std::atomic<int> cnt(0); // 原子整数

void increment() {
    cnt++; // 原子自增,无数据竞争
}

int main() {
    std::thread t1(increment);
    std::thread t2(increment);
    t1.join();
    t2.join();
    std::cout << cnt << std::endl; // 输出2,无竞争
    return 0;
}
1.4.2 内存序(Memory Order)

原子操作的内存序控制指令重排和可见性,默认memory_order_seq_cst(顺序一致性):

std::atomic<int> x(0), y(0);

void write_x_then_y() {
    x.store(1, std::memory_order_relaxed); // 仅保证原子性,不保证顺序
    y.store(2, std::memory_order_relaxed);
}

1.5 死锁及避免

1.5.1 死锁产生的四个条件
  1. 互斥条件:资源不可共享。
  2. 占有且等待:持有资源并等待其他资源。
  3. 不可剥夺条件:资源不可被强制剥夺。
  4. 循环等待条件:线程间形成资源等待环。
1.5.2 避免死锁的方法(顺序加锁、超时锁)

顺序加锁:多个线程按固定顺序获取锁:

std::mutex mtx1, mtx2;

void func1() {
    std::lock_guard<std::mutex> lock1(mtx1); // 先锁mtx1
    std::lock_guard<std::mutex> lock2(mtx2); // 再锁mtx2
}

void func2() {
    std::lock_guard<std::mutex> lock1(mtx1); // 同样先锁mtx1
    std::lock_guard<std::mutex> lock2(mtx2);
}

超时锁:使用std::try_lock_for避免无限等待:

std::unique_lock<std::mutex> lock1(mtx1, std::defer_lock);
std::unique_lock<std::mutex> lock2(mtx2, std::defer_lock);
if (std::try_lock(lock1, lock2) == -1) {
    // 成功获取所有锁
} else {
    // 获取锁失败,处理超时
}

二、线程高级主题

2.1 线程池(Thread Pool)

2.1.1 线程池的组成(任务队列+工作线程)

线程池管理多个工作线程,从任务队列获取任务执行:

class ThreadPool {
public:
    ThreadPool(size_t num_threads) {
        for (size_t i = 0; i < num_threads; ++i) {
            workers.emplace_back([this] {
                while (true) {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(queue_mutex);
                        condition.wait(lock, [this] { return stop || !tasks.empty(); });
                        if (stop && tasks.empty()) return;
                        task = std::move(tasks.front());
                        tasks.pop();
                    }
                    task(); // 执行任务
                }
            });
        }
    }

private:
    std::vector<std::thread> workers;
    std::queue<std::function<void()>> tasks;
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop = false;
};
2.1.2 任务提交与结果获取

通过std::packaged_taskstd::future获取任务结果:

template <typename F, typename... Args>
auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> {
    using return_type = typename std::result_of<F(Args...)>::type;
    auto task = std::make_shared<std::packaged_task<return_type()>>(
        std::bind(std::forward<F>(f), std::forward<Args>(args)...)
    );
    std::future<return_type> res = task->get_future();
    {
        std::lock_guard<std::mutex> lock(queue_mutex);
        tasks.emplace([task]() { (*task)(); });
    }
    condition.notify_one();
    return res;
}

2.2 异步任务(std::async/std::future)

2.2.1 std::future与std::promise

std::future获取异步任务结果,std::promise设置结果:

std::promise<int> prom;
std::future<int> fut = prom.get_future();

std::thread t([&prom]() {
    prom.set_value(42); // 设置结果
});

std::cout << fut.get() << std::endl; // 获取结果(阻塞)
t.join();
2.2.2 std::async的启动策略

std::async异步执行任务,返回std::future

// 策略1:std::launch::async 立即创建线程
auto fut1 = std::async(std::launch::async, []{ return 1; });

// 策略2:std::launch::deferred 延迟执行(调用get时在当前线程执行)
auto fut2 = std::async(std::launch::deferred, []{ return 2; });

// 默认策略:由实现决定(通常为async|deferred)
auto fut3 = std::async([]{ return 3; });

2.3 线程局部存储(thread_local)

thread_local变量在每个线程有独立实例:

thread_local int counter = 0;

void increment() {
    counter++; // 每个线程的counter独立
}

int main() {
    std::thread t1(increment);
    std::thread t2(increment);
    t1.join();
    t2.join();
    return 0;
}

三、C++文件IO基础

3.1 IO流概述

3.1.1 流对象与打开模式

C++ IO流通过std::fstream(文件流)操作文件,打开模式指定读写方式:

#include <fstream>

std::ofstream ofs("file.txt", std::ios::out | std::ios::trunc); // 输出并截断文件
std::ifstream ifs("file.txt", std::ios::in); // 输入模式
std::fstream fs("file.bin", std::ios::in | std::ios::out | std::ios::binary); // 二进制读写

常用打开模式

  • ios::in:读模式。
  • ios::out:写模式(默认截断)。
  • ios::app:追加模式。
  • ios::binary:二进制模式。
  • ios::trunc:截断文件(默认)。
  • ios::ate:打开后定位到文件末尾。
3.1.2 流状态与错误处理

流对象的状态通过成员函数检查:

std::ifstream ifs("nonexistent.txt");
if (!ifs.is_open()) { // 检查文件是否打开成功
    std::cerr << "文件打开失败" << std::endl;
}

int x;
ifs >> x;
if (ifs.fail()) { // 检查提取失败
    std::cerr << "读取失败" << std::endl;
}

异常处理:设置异常掩码,错误时抛出异常:

ifs.exceptions(std::ios::failbit | std::ios::badbit);
try {
    ifs >> x; // 失败时抛出std::ios_base::failure
} catch (const std::exception& e) {
    std::cerr << e.what() << std::endl;
}

3.2 文本文件读写

3.2.1 输出流(ofstream)

写入文本文件:

std::ofstream ofs("output.txt");
ofs << "Hello, World!" << std::endl; // 写入字符串
ofs << 42 << " " << 3.14 << std::endl; // 写入数值
3.2.2 输入流(ifstream)与getline

读取文本文件:

std::ifstream ifs("input.txt");
std::string line;
while (std::getline(ifs, line)) { // 按行读取
    std::cout << line << std::endl;
}

int x;
ifs >> x; // 提取整数(跳过空白字符)
3.2.3 格式化输入输出(iomanip)

使用<iomanip>控制格式:

#include <iomanip>

std::ofstream ofs("formatted.txt");
ofs << std::setw(10) << "Name" << std::setw(5) << "Age" << std::endl; // 设置字段宽度
ofs << std::setw(10) << "Alice" << std::setw(5) << 30 << std::endl;

ofs << std::fixed << std::setprecision(2) << 3.1415 << std::endl; // 保留两位小数

3.3 二进制文件读写

3.3.1 read与write函数

二进制模式下使用read/write读写字节流:

std::ofstream ofs("data.bin", std::ios::binary);
int num = 42;
ofs.write(reinterpret_cast<const char*>(&num), sizeof(num)); // 写入整数

std::ifstream ifs("data.bin", std::ios::binary);
int num2;
ifs.read(reinterpret_cast<char*>(&num2), sizeof(num2)); // 读取整数
3.3.2 结构化数据读写

写入结构体:

struct Person {
    char name[20];
    int age;
};

Person p{"Alice", 30};
std::ofstream ofs("person.bin", std::ios::binary);
ofs.write(reinterpret_cast<const char*>(&p), sizeof(p));

3.4 文件定位与随机访问

3.4.1 seekg与seekp

seekg(读指针)和seekp(写指针)定位文件位置:

std::fstream fs("file.txt", std::ios::in | std::ios::out);
fs.seekg(5, std::ios::beg); // 从文件开始偏移5字节
fs.seekp(0, std::ios::end); // 定位到文件末尾(写指针)

偏移基准

  • ios::beg:文件开始。
  • ios::cur:当前位置。
  • ios::end:文件末尾。
3.4.2 tellg与tellp

获取当前指针位置:

std::ifstream ifs("file.txt");
ifs.seekg(0, std::ios::end);
std::streampos size = ifs.tellg(); // 获取文件大小
std::cout << "文件大小:" << size << "字节" << std::endl;

四、多线程文件IO

4.1 多线程读写同一文件的同步策略

4.1.1 文件锁(互斥锁保护文件句柄)

多个线程通过互斥锁同步访问同一文件:

std::mutex file_mutex;

void write_to_file(const std::string& data) {
    std::lock_guard<std::mutex> lock(file_mutex);
    std::ofstream ofs("shared.txt", std::ios::app);
    ofs << data << std::endl;
}
4.1.2 分块读写(并行IO)

将文件分为多个块,每个线程读写不同块,避免锁竞争:

void read_block(const std::string& filename, size_t block_size, size_t block_num) {
    std::ifstream ifs(filename, std::ios::binary);
    ifs.seekg(block_num * block_size, std::ios::beg);
    char* buffer = new char[block_size];
    ifs.read(buffer, block_size);
    // 处理数据
    delete[] buffer;
}

int main() {
    const size_t block_size = 4096;
    std::thread t1(read_block, "largefile.dat", block_size, 0);
    std::thread t2(read_block, "largefile.dat", block_size, 1);
    t1.join();
    t2.join();
    return 0;
}

4.2 多线程日志系统实现

4.2.1 日志队列(生产者-消费者模型)

多个线程(生产者)将日志写入队列,单个线程(消费者)从队列写入文件:

#include <queue>
#include <condition_variable>

std::queue<std::string> log_queue;
std::mutex queue_mutex;
std::condition_variable queue_cv;
bool running = true;

void log_consumer() {
    std::ofstream ofs("log.txt", std::ios::app);
    while (running) {
        std::unique_lock<std::mutex> lock(queue_mutex);
        queue_cv.wait(lock, []{ return !log_queue.empty() || !running; });
        while (!log_queue.empty()) {
            ofs << log_queue.front() << std::endl;
            log_queue.pop();
        }
    }
}

void log_producer(const std::string& msg) {
    std::lock_guard<std::mutex> lock(queue_mutex);
    log_queue.push(msg);
    queue_cv.notify_one();
}
4.2.2 线程安全的日志写入

消费者线程负责实际IO,避免多线程直接写文件的竞争:

int main() {
    std::thread consumer(log_consumer);
    std::thread t1(log_producer, "Thread 1 log");
    std::thread t2(log_producer, "Thread 2 log");
    t1.join();
    t2.join();
    running = false;
    queue_cv.notify_one(); // 唤醒消费者线程
    consumer.join();
    return 0;
}

4.3 大文件分块处理

4.3.1 多线程读取大文件

将大文件分为N块,每个线程读取一块并处理:

void process_block(const std::string& filename, size_t start, size_t end) {
    std::ifstream ifs(filename, std::ios::binary);
    ifs.seekg(start);
    size_t size = end - start;
    char* buffer = new char[size];
    ifs.read(buffer, size);
    // 处理数据(如计算哈希)
    delete[] buffer;
}

int main() {
    const std::string filename = "largefile.iso";
    std::ifstream ifs(filename, std::ios::binary | std::ios::ate);
    size_t file_size = ifs.tellg();
    size_t block_size = 1024 * 1024; // 1MB/块
    size_t num_blocks = (file_size + block_size - 1) / block_size;

    std::vector<std::thread> threads;
    for (size_t i = 0; i < num_blocks; ++i) {
        size_t start = i * block_size;
        size_t end = std::min((i + 1) * block_size, file_size);
        threads.emplace_back(process_block, filename, start, end);
    }

    for (auto& t : threads) {
        t.join();
    }
    return 0;
}

五、IO性能优化

5.1 缓冲区大小调整

增大IO缓冲区减少系统调用次数:

std::ifstream ifs("largefile.dat");
char buffer[1024 * 1024]; // 1MB缓冲区
ifs.rdbuf()->pubsetbuf(buffer, sizeof(buffer));

5.2 内存映射文件(mmap)

将文件映射到内存,通过指针直接访问,适用于大文件:

#include <sys/mman.h> // POSIX mmap
#include <fcntl.h>

int fd = open("file.txt", O_RDONLY);
struct stat sb;
fstat(fd, &sb);
char* addr = static_cast<char*>(mmap(nullptr, sb.st_size, PROT_READ, MAP_PRIVATE, fd, 0));
// 使用addr访问文件内容
munmap(addr, sb.st_size);
close(fd);

5.3 异步IO(Asynchronous IO)

Windows的ReadFileEx或POSIX的aio_read实现异步IO,避免阻塞线程:

// POSIX异步IO示例
#include <aio.h>

struct aiocb cb;
cb.aio_fildes = fd;
cb.aio_buf = buffer;
cb.aio_nbytes = size;
cb.aio_offset = offset;
aio_read(&cb);
aio_suspend(&cb, 1, nullptr); // 等待完成

六、案例分析

6.1 多线程文件复制工具

分块读取源文件,多线程并行写入目标文件:

void copy_block(const std::string& src, const std::string& dst, size_t start, size_t size) {
    std::ifstream ifs(src, std::ios::binary);
    std::ofstream ofs(dst, std::ios::binary | std::ios::in | std::ios::out);
    ifs.seekg(start);
    ofs.seekp(start);
    char* buffer = new char[size];
    ifs.read(buffer, size);
    ofs.write(buffer, size);
    delete[] buffer;
}

6.2 并发日志系统

使用条件变量和队列实现线程安全的日志写入:

// 见4.2.1节代码示例

七、总结与最佳实践

7.1 多线程编程最佳实践

  • 最小化锁粒度:仅在必要时加锁,避免全局锁。
  • 优先使用RAIIlock_guard/unique_lock自动释放资源。
  • 避免共享状态:使用消息队列或原子操作替代共享内存。
  • 使用线程池:复用线程,减少创建销毁开销。

7.2 文件IO最佳实践

  • 二进制模式处理结构化数据:避免文本模式的格式转换开销。
  • 合理设置缓冲区:增大缓冲区提升大文件读写性能。
  • 多线程IO分块处理:并行读写不同文件块,避免锁竞争。
  • 使用内存映射处理大文件:mmap减少IO次数,提高访问速度。

通过合理结合多线程和文件IO技术,可显著提升程序的并发性能和数据处理效率,尤其在处理大文件或高并发场景下。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐