基于C++17的线程安全信号与槽机制实现
本文解析了一种基于C++17的轻量级信号槽通信机制,包含Object基类、Connection连接管理和Signal信号发射三大模块。该系统支持同步/异步调用、线程安全通信、自动内存管理,通过弱引用和RAII机制避免内存泄漏,利用线程池实现异步任务调度。相比Qt的信号槽机制,该实现更轻量且不依赖元对象编译器,适合嵌入式等资源受限场景。核心创新在于结合std::function、可变参数模板和线程池
为什么我们需要“自己的Qt式通信机制”?
在GUI编程、异步系统、事件驱动架构中,信号与槽(Signal & Slot) 是一种极为经典的设计模式。它实现了对象之间的松耦合通信,避免了复杂的回调嵌套和直接依赖。
Qt框架中的信号槽机制广为人知,但其依赖庞大的元对象编译器(moc)和运行时类型信息(RTTI),对于轻量级项目或嵌入式系统来说显得“过于沉重”。
本文将带你深入剖析一段纯C++17实现的线程安全、支持同步/异步调用、可定制线程池调度的信号与槽机制代码,从原理、架构、实现细节到性能优化,全方位解读这一优雅的通信模型。
整体架构概览
我们先来看整个系统的模块划分:
+------------------+
| Object 基类 | <-- 提供线程池绑定能力
+------------------+
|
v
+------------------+
| Signal<...> | <-- 模板类,管理连接和发射信号
+------------------+
|
v
+------------------+
| Connection<...> | <-- 管理单个连接的生命周期
+------------------+
|
v
+------------------+
| ThreadPool | <-- 外部依赖,提供异步执行能力
+------------------+
核心目标:
- 支持成员函数、自由函数、Lambda作为槽函数;
- 支持同步调用(emit)与异步调用(emitAsync);
- 支持线程安全,多线程环境下安全连接、断开、发射;
- 支持自动内存管理,避免悬挂指针;
可扩展至任意参数类型的信号;
通过Object基类实现接收者线程池感知。
核心组件详解
Object 基类:线程上下文绑定
class Object {
public:
Object() : thread_pool_(&global_thread_pool) {}
virtual ~Object() = default;
void setThreadPool(ThreadPool* pool) { ... }
ThreadPool* getThreadPool() const { return thread_pool_; }
private:
ThreadPool* thread_pool_;
};
设计意图
- 所有能作为“信号接收者”的类必须继承 Object。
- 每个 Object 实例可绑定一个 ThreadPool,表示其“所属线程环境”。
- 当使用 connect(signal, obj, &Obj::slot) 时,系统会自动获取 obj->getThreadPool(),决定该槽函数应在哪个线程池中执行。
⚠️ 注意事项
- 使用了 static_assert(std::is_base_of_v<Object, R>) 来强制约束接收者类型,确保类型安全。
- 禁止拷贝构造和赋值(delete),防止浅拷贝导致线程池指针错误共享。
Connection 类:连接的生命周期管理
template<typename... SignalArgs>
class Connection {
struct ConnectionData {
bool connected = true;
size_t id = 0;
mutable std::shared_mutex signal_mutex;
std::function<void(size_t)> disconnect_callback;
};
std::weak_ptr<ConnectionData> connection_data_;
};
核心设计:共享状态 + RAII + 弱引用
- ConnectionData 是所有连接共享的状态对象,包含:
- connected:是否已断开;
- id:唯一标识符;
- disconnect_callback:当连接断开时通知 Signal 清理自己。
- connection_data_ 是 weak_ptr,避免循环引用导致内存泄漏。
- disconnect() 方法通过 lock() 获取 shared_ptr,若存在则标记断开并触发回调。
- isConnected() 安全判断连接有效性。
🧩 为什么用 weak_ptr?
防止 Signal 持有 ConnectionData 的 shared_ptr,而 Connection 又反过来持有同一个对象,造成无法释放。
Signal 类:信号的注册与发射中枢
这是整个机制的核心,我们分功能逐步拆解。
(1) 连接槽函数:connect() 的两种重载
① 自由函数 / Lambda
template<typename T>
Connection<SignalArgs...> connect(T&& slot) {
return connect_impl(nullptr, std::forward<T>(slot));
}
- 目标线程池为 nullptr,意味着同步调用时直接执行,异步时使用全局线程池。
② 成员函数(绑定到 Object 派生类)
template<typename T, typename R>
Connection<SignalArgs...> connect(R* receiver, T&& slot) {
static_assert(...);
ThreadPool* pool = receiver ? receiver->getThreadPool() : nullptr;
auto wrapper = [receiver, slot](SignalArgs... args) {
(receiver->*slot)(args...);
};
return connect_impl(pool, std::move(wrapper));
}
- 将成员函数指针 &Class::func 包装成一个可调用的 lambda;
- 同时提取接收者的线程池;
- 最终统一转为 std::function<void(SignalArgs…)> 存储。
💡 技巧:使用 [receiver, slot] 捕获 this 和成员函数指针,形成闭包。
(2) 内部存储结构:ConnectedSlot
struct ConnectedSlot {
std::function<void(SignalArgs...)> slot;
std::shared_ptr<ConnectionData> connection_data;
size_t id;
ThreadPool* target_thread_pool;
};
- 所有连接信息集中存储在一个 std::list 中;
- 使用 std::list 而非 vector:插入/删除高效,迭代器不失效(重要!);
- target_thread_pool 记录接收者期望运行的线程池。
同步发射:emit(…)
void emit(SignalArgs... args) {
std::shared_lock<std::shared_mutex> lock(mutex_);
auto local_slots = slots_; // 拷贝副本
lock.unlock();
for (const auto& slot_info : local_slots) {
if (slot_info.connection_data->connected) {
try {
slot_info.slot(args...);
} catch (...) { ... }
}
}
}
关键点分析
- 使用 std::shared_mutex 实现读写分离:多个 emit 可并发进行(只读),但 connect/disconnect 需独占。
- 拷贝 slots_ 到局部变量:防止在遍历过程中其他线程修改列表(如 disconnect() 删除元素),避免迭代器失效。
- 加锁时间极短,仅用于拷贝,提升并发性能。
- 异常捕获保护机制,防止某个槽函数崩溃影响其他槽。
异步发射:emitAsync(…) —— 真正的亮点!
void emitAsync(SignalArgs... args) {
std::shared_lock<std::shared_mutex> lock(mutex_);
auto local_slots = slots_;
lock.unlock();
for (const auto& slot_info : local_slots) {
if (slot_info.connection_data->connected) {
auto slot_copy = std::move(slot_info.slot);
auto args_tuple = std::make_tuple(std::forward<SignalArgs>(args)...);
ThreadPool* target_pool = slot_info.target_thread_pool ? ... : &global_thread_pool;
auto task = [slot_copy = std::move(slot_copy), args_tuple = std::move(args_tuple)]() mutable {
try {
std::apply(slot_copy, std::move(args_tuple));
} catch (...) { ... }
};
target_pool->push(std::move(task));
}
}
}
- std::make_tuple(std::forward<…>) 将变长参数完美转发并打包成元组
- auto task = … mutable {} 构造一个无参可调用对象,适配线程池接口
- 移动语义 std::move(slot_info.slot) 减少拷贝开销,提高性能
- std::apply(func, tuple) 在运行时展开元组调用函数
- 捕获 mutable 允许 lambda 修改捕获的副本(如移动 args_tuple)
完全解耦信号发射与槽执行,发射线程不阻塞。
⚠️ 注意:参数被复制或移动到任务中,要求所有参数支持拷贝或移动语义。
全局 connect 函数与宏
#define SLOT(...) (&__VA_ARGS__)
template<typename... SigArgs, typename SlotFunc>
Connection<SigArgs...> connect(Signal<SigArgs...> &signal, SlotFunc&& slot) {
return signal.connect(std::forward<SlotFunc>(slot));
}
// 重载版本支持 receiver
template<typename SignalType, typename ReceiverType, typename MemberFuncType>
auto connect(SignalType& signal, ReceiverType* receiver, MemberFuncType slot)
-> decltype(signal.connect(receiver, slot))
设计优点
- 提供类似 Qt 的 connect(…) 全局函数语法;
- 支持函数重载解析,自动匹配正确版本;
- SLOT(…) 宏用于取成员函数地址,语义清晰。
线程安全与性能分析
线程安全策略
| 操作 | 锁机制 | 说明 |
|---|---|---|
| connect / disconnect | unique_lock<shared_mutex> | 写操作,互斥访问 |
| emit / emitAsync | shared_lock<shared_mutex> | 多个发射可并发 |
| 遍历 slots_ | 拷贝副本 | 避免持有锁遍历 |
优点:读多写少场景下性能优秀。
潜在问题:若连接数极大,slots_ 拷贝成本高。
内存管理与异常安全
- 使用 std::shared_ptr 和 std::weak_ptr 实现自动生命周期管理;
- disconnect_callback 在 ConnectionData 销毁时自动清理 Signal 中的条目;
- 所有异常被捕获,防止崩溃传播;
- 移动语义减少临时对象开销。
完整源码
#pragma once
#include <iostream>
#include <functional>
#include <thread>
#include <mutex>
#include <shared_mutex>
#include <condition_variable>
#include <queue>
#include <list>
#include <memory>
#include <atomic>
#include <sstream>
#include <tuple>
#include "ThreadPool.hpp"
// --- 全局线程池管理 (简化) ---
static ThreadPool global_thread_pool(4);
// --- Object 基类 ---
class Object
{
public:
Object() : thread_pool_(&global_thread_pool)
{ }
virtual ~Object() = default;
void setThreadPool(ThreadPool* pool)
{
if(pool)
{
thread_pool_ = pool;
}
}
ThreadPool *getThreadPool() const
{
return thread_pool_;
}
Object(const Object &) = delete;
Object &operator=(const Object &) = delete;
private:
ThreadPool * thread_pool_;
};
// --- 信号与槽机制 ---
// Connection 类
template<typename... SignalArgs>
class Signal;
template<typename... SignalArgs>
class Connection
{
friend class Signal<SignalArgs...>;
public:
Connection() = default;
~Connection() = default;
void disconnect()
{
if(auto shared_con_data = connection_data_.lock())
{
std::unique_lock<std::shared_mutex> lock(shared_con_data->signal_mutex);
if(shared_con_data->connected)
{
shared_con_data->connected = false;
if(shared_con_data->disconnect_callback)
{
shared_con_data->disconnect_callback(shared_con_data->id);
}
}
}
}
bool isConnected() const
{
auto shared_con_data = connection_data_.lock();
return shared_con_data && shared_con_data->connected;
}
private:
struct ConnectionData
{
bool connected = true;
size_t id = 0;
mutable std::shared_mutex signal_mutex;
std::function<void(size_t)> disconnect_callback;
};
std::weak_ptr<typename Connection::ConnectionData> connection_data_;
};
// Signal 类
template<typename... SignalArgs>
class Signal
{
private:
// 存储连接的槽信息
struct ConnectedSlot
{
std::function<void(SignalArgs...)> slot; // 统一为 std::function
std::shared_ptr<typename Connection<SignalArgs...>::ConnectionData> connection_data;
size_t id;
ThreadPool *target_thread_pool; // 接收者关联的线程池
};
mutable std::shared_mutex mutex_;
std::list<ConnectedSlot> slots_;
std::atomic<size_t> current_id_{0};
// 连接实现辅助函数
template<typename Callable>
Connection<SignalArgs...> connect_impl(ThreadPool* pool, Callable&& callable)
{
auto con_data = std::make_shared<typename Connection<SignalArgs...>::ConnectionData>();
con_data->disconnect_callback = [this](size_t slot_id)
{
std::unique_lock<std::shared_mutex> lock(this->mutex_);
this->slots_.remove_if([slot_id](const ConnectedSlot & info)
{
return info.id == slot_id;
});
};
std::unique_lock<std::shared_mutex> lock(mutex_);
size_t id = ++current_id_;
con_data->id = id;
slots_.emplace_back(ConnectedSlot
{
std::function<void(SignalArgs...)>(std::forward<Callable>(callable)),
con_data,
id,
pool
});
Connection<SignalArgs...> conn;
conn.connection_data_ = con_data;
return conn;
}
public:
Signal() = default;
~Signal() = default;
// 连接自由函数或Lambda
template<typename T>
Connection<SignalArgs...> connect(T&& slot)
{
return connect_impl(nullptr, std::forward<T>(slot));
}
// 连接成员函数到 Object 派生类实例
template<typename T, typename R>
Connection<SignalArgs...> connect(R* receiver, T&& slot)
{
static_assert(std::is_base_of_v<Object, R>, "Receiver must inherit from Object");
ThreadPool* pool = (receiver) ? receiver->getThreadPool() : nullptr;
// 将成员函数调用包装成 std::function
auto wrapper = [receiver, slot](SignalArgs... args)
{
(receiver->*slot)(args...);
};
return connect_impl(pool, std::move(wrapper));
}
// 同步发射信号
void emit(SignalArgs... args)
{
std::shared_lock<std::shared_mutex> lock(mutex_);
// 拷贝连接列表以避免在迭代时因槽断开而修改列表
auto local_slots = slots_;
lock.unlock();
for(const auto& slot_info : local_slots)
{
if(slot_info.connection_data->connected)
{
try
{
// 直接调用已包装好的 std::function
slot_info.slot(args...);
}
catch(...)
{
std::cerr << "[Signal] Exception caught in slot (sync)." << std::endl;
}
}
}
}
// 异步发射信号
void emitAsync(SignalArgs... args)
{
std::shared_lock<std::shared_mutex> lock(mutex_);
// 拷贝连接列表以避免在迭代时因槽断开而修改列表
auto local_slots = slots_;
lock.unlock();
for(const auto& slot_info : local_slots)
{
if(slot_info.connection_data->connected)
{
// 1. 移动 std::function 副本
auto slot_copy = std::move(slot_info.slot); // Move the slot function
// 2. 移动参数打包成的 tuple
auto args_tuple = std::make_tuple(std::forward<SignalArgs>(args)...); // Forward args to tuple
// 3. 确定目标线程池
ThreadPool* target_pool = slot_info.target_thread_pool ? slot_info.target_thread_pool : &global_thread_pool;
// 4. 创建最终的无参任务 lambda
// 捕获所有需要的数据(通过移动)
auto task = [slot_copy = std::move(slot_copy), args_tuple = std::move(args_tuple)]() mutable
{
try
{
// 5. 使用 std::apply 展开 tuple 并调用槽函数
std::apply(slot_copy, std::move(args_tuple)); // Apply with moved tuple
}
catch(...)
{
std::cerr << "[Signal] Exception caught in slot (async)." << std::endl;
}
};
// 6. 将这个无参任务提交到正确的线程池
try
{
target_pool->push(std::move(task)); // Enqueue the moved task
}
catch(const std::exception& e)
{
std::cerr << "[Signal] Failed to enqueue task: " << e.what() << std::endl;
}
}
}
}
size_t getConnectionCount() const
{
std::shared_lock<std::shared_mutex> lock(mutex_);
return slots_.size();
}
};
// --- 宏定义 ---
#define SLOT(...) (&__VA_ARGS__)
// --- 全局 connect 函数模板 ---
// 为自由函数/Lambda连接提供便利
template<typename... SigArgs, typename SlotFunc>
Connection<SigArgs...> connect(Signal<SigArgs...> &signal, SlotFunc&& slot)
{
return signal.connect(std::forward<SlotFunc>(slot));
}
// 通过让编译器推断 ReceiverType 和自动匹配 SignalArgs
template<typename SignalType, typename ReceiverType, typename MemberFuncType>
auto connect(SignalType& signal, ReceiverType* receiver, MemberFuncType slot)
-> decltype(signal.connect(receiver, slot))
{
// SFINAE: 只有当 signal.connect(...) 有效时才启用此重载
static_assert(std::is_base_of_v<Object, ReceiverType>, "Receiver must inherit from Object");
return signal.connect(receiver, slot);
}
// ThreadPool.hpp
#pragma once
#include <vector>
#include <thread>
#include <mutex>
#include <queue>
#include <functional>
#include <condition_variable>
#include <future>
#include <memory>
#include <stdexcept>
#include <iostream>
class ThreadPool
{
public:
/**
* @brief 线程池构造函数
*
* @param[in] threads 线程数量,默认使用硬件并发线程数
*/
explicit ThreadPool(size_t threads = std::thread::hardware_concurrency())
: stop(false)
{
if(threads == 0)
{
threads = 1;
}
for (size_t i = 0; i < threads; ++i)
{
workers.emplace_back([this]
{
while (true)
{
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(lock, [this]
{
return this->stop || !this->tasks.empty();
});
if (this->stop && this->tasks.empty())
{
return;
}
task = std::move(this->tasks.front());
this->tasks.pop();
}
try
{
task();
}
catch (...)
{
std::cerr << "Thread Pool: something wrong.";
}
}
});
}
}
/**
* @brief 将任务添加到线程池队列中执行
*
* 此函数接收一个可调用对象和其参数,将其包装成 packaged_task 并加入任务队列,
* 然后通知工作线程有新任务到来。返回一个 future 对象用于获取任务执行结果。
*
* @tparam[in] F 可调用对象的类型
* @tparam[in] Args 可调用对象参数的类型包
* @param[in] f 可调用对象
* @param[in] args 可调用对象的参数包
*
* @return std::future<typename std::result_of<F(Args...)>::type>
* 返回一个 future 对象,可用于获取任务执行结果或等待任务完成
*
* @throws std::runtime_error 当线程池已停止时抛出异常
*/
template<class F, class... Args>
auto enqueue(F&& f, Args&&... args) -> std::future<typename std::invoke_result<F(Args...)>::type>
{
// 获取可调用对象的返回类型
using return_type = typename std::invoke_result<F(Args...)>::type;
// 将可调用对象和参数绑定,创建 packaged_task 对象
auto task = std::make_shared< std::packaged_task<return_type()> >(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
if(stop)
{
throw std::runtime_error("enqueue on stopped ThreadPool");
}
tasks.emplace([task]()
{
(*task)();
});
}
condition.notify_one();
return res;
}
template<class F>
void push(F&& f)
{
{
std::unique_lock<std::mutex> lock(queue_mutex);
if (stop)
{
throw std::runtime_error("enqueue on stopped ThreadPool");
}
tasks.emplace(std::forward<F>(f));
}
condition.notify_one();
}
/**
* @brief 线程池析构函数
*
* 负责安全地停止所有工作线程并清理资源
*/
~ThreadPool()
{
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for(std::thread &worker : workers)
if(worker.joinable())
{
worker.join();
}
}
/**
* @brief 获取待处理任务的数量
*
* 该函数通过加锁访问任务队列,返回当前队列中等待执行的任务数量。
*
* @return size_t 返回任务队列中的任务数量
*/
size_t pending_tasks()
{
std::unique_lock<std::mutex> lock(queue_mutex);
return tasks.size();
}
/**
* @brief 阻塞等待所有任务执行完成
*
* 此函数会等待任务队列中的所有任务执行完毕,但不会停止线程池接收新任务。
* 它通过检查任务队列是否为空来确定所有任务是否已完成。
*
* @note 此函数不会阻塞其他线程向队列中添加新任务
*/
void wait_until_empty()
{
std::unique_lock<std::mutex> lock(queue_mutex);
condition.wait(lock, [this] { return tasks.empty(); });
}
private:
std::vector<std::thread> workers; // 工作线程
std::queue<std::function<void()>> tasks; // 任务队列
std::mutex queue_mutex; // 互斥锁, stop, tasks
std::condition_variable condition; // 条件变量, queue_mutex
bool stop; // 线程池是否停止
};
使用示例
// --- 示例用法 ---
class Sender : public Object
{
public:
Signal<int, std::string> dataReady;
Signal<> finished;
void doWork()
{
for (int i = 0; i < 3; ++i)
{
std::cout << "[Sender] Emitting dataReady(" << i << ", 'Item " << i << "')\n";
dataReady.emit(i, "Item " + std::to_string(i));
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
std::cout << "[Sender] Emitting finished()\n";
finished.emit();
}
void doWorkAsync()
{
for (int i = 10; i < 13; ++i)
{
std::cout << "[Sender] Async Emitting dataReady(" << i << ", 'AsyncItem " << i << "')\n";
dataReady.emitAsync(i, "AsyncItem " + std::to_string(i));
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
std::cout << "[Sender] Async Emitting finished()\n";
finished.emitAsync();
}
};
class Receiver : public Object
{
public:
Receiver(const std::string& name) : name(name)
{ }
void onData(int value, const std::string& msg)
{
std::ostringstream oss;
oss << "[Receiver: " << name << "] Received data: " << value << ", " << msg
<< " (Thread ID: " << std::this_thread::get_id() << ")\n";
std::cout << oss.str();
// Simulate work
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
void onFinish()
{
std::ostringstream oss;
oss << "[Receiver: " << name << "] Received finish signal."
<< " (Thread ID: " << std::this_thread::get_id() << ")\n";
std::cout << oss.str();
}
const std::string& getName() const
{
return name;
}
private:
std::string name;
};
int main()
{
std::cout << "=== Qt-Style Signal/Slot with Inheritance (Fixed) ===" << std::endl;
Sender sender;
Receiver receiver1("R1");
Receiver receiver2("R2");
// 为 receiver1 创建并关联专用线程池
ThreadPool receiver1_pool(2);
receiver1.setThreadPool(&receiver1_pool);
// 连接信号和槽
auto conn1 = connect(sender.dataReady, &receiver1, &Receiver::onData);
auto conn2 = connect(sender.dataReady, &receiver2, &Receiver::onData);
auto conn3 = connect(sender.finished, &receiver1, &Receiver::onFinish);
auto conn4 = connect(sender.finished, []()
{
std::cout << "[Lambda Slot] Work is done!\n";
});
std::cout << "Connections established.\n\n";
std::cout << "--- Synchronous Execution ---\n";
sender.doWork();
std::this_thread::sleep_for(std::chrono::milliseconds(200)); // 等待同步槽处理完毕
conn2.disconnect();
std::cout << "\nDisconnected receiver2. Sending data again...\n";
sender.dataReady.emit(-1, "After disconnection");
std::cout << "\n--- Asynchronous Execution ---\n";
sender.doWorkAsync();
// 等待异步任务完成
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
std::cout << "\nMain function finished.\n";
return 0; // ThreadPool 析构函数会等待线程结束
}
运行结果:
更多推荐

所有评论(0)