为什么我们需要“自己的Qt式通信机制”?

在GUI编程、异步系统、事件驱动架构中,信号与槽(Signal & Slot) 是一种极为经典的设计模式。它实现了对象之间的松耦合通信,避免了复杂的回调嵌套和直接依赖。

Qt框架中的信号槽机制广为人知,但其依赖庞大的元对象编译器(moc)和运行时类型信息(RTTI),对于轻量级项目或嵌入式系统来说显得“过于沉重”。

本文将带你深入剖析一段纯C++17实现的线程安全、支持同步/异步调用、可定制线程池调度的信号与槽机制代码,从原理、架构、实现细节到性能优化,全方位解读这一优雅的通信模型。

整体架构概览

我们先来看整个系统的模块划分:

+------------------+
|   Object 基类     | <-- 提供线程池绑定能力
+------------------+
         |
         v
+------------------+
|   Signal<...>     | <-- 模板类,管理连接和发射信号
+------------------+
         |
         v
+------------------+
|  Connection<...>  | <-- 管理单个连接的生命周期
+------------------+
         |
         v
+------------------+
|  ThreadPool       | <-- 外部依赖,提供异步执行能力
+------------------+

核心目标:

  • 支持成员函数、自由函数、Lambda作为槽函数;
  • 支持同步调用(emit)与异步调用(emitAsync)
  • 支持线程安全,多线程环境下安全连接、断开、发射;
  • 支持自动内存管理,避免悬挂指针;
    可扩展至任意参数类型的信号;
    通过Object基类实现接收者线程池感知。

核心组件详解

Object 基类:线程上下文绑定

class Object {
public:
    Object() : thread_pool_(&global_thread_pool) {}
    virtual ~Object() = default;

    void setThreadPool(ThreadPool* pool) { ... }
    ThreadPool* getThreadPool() const { return thread_pool_; }

private:
    ThreadPool* thread_pool_;
};

设计意图

  • 所有能作为“信号接收者”的类必须继承 Object。
  • 每个 Object 实例可绑定一个 ThreadPool,表示其“所属线程环境”。
  • 当使用 connect(signal, obj, &Obj::slot) 时,系统会自动获取 obj->getThreadPool(),决定该槽函数应在哪个线程池中执行。

⚠️ 注意事项

  • 使用了 static_assert(std::is_base_of_v<Object, R>) 来强制约束接收者类型,确保类型安全。
  • 禁止拷贝构造和赋值(delete),防止浅拷贝导致线程池指针错误共享。

Connection 类:连接的生命周期管理

template<typename... SignalArgs>
class Connection {
    struct ConnectionData {
        bool connected = true;
        size_t id = 0;
        mutable std::shared_mutex signal_mutex;
        std::function<void(size_t)> disconnect_callback;
    };
    std::weak_ptr<ConnectionData> connection_data_;
};

核心设计:共享状态 + RAII + 弱引用

  • ConnectionData 是所有连接共享的状态对象,包含:
    • connected:是否已断开;
    • id:唯一标识符;
    • disconnect_callback:当连接断开时通知 Signal 清理自己。
    • connection_data_ 是 weak_ptr,避免循环引用导致内存泄漏。
  • disconnect() 方法通过 lock() 获取 shared_ptr,若存在则标记断开并触发回调。
  • isConnected() 安全判断连接有效性。

🧩 为什么用 weak_ptr?
防止 Signal 持有 ConnectionData 的 shared_ptr,而 Connection 又反过来持有同一个对象,造成无法释放。

Signal 类:信号的注册与发射中枢

这是整个机制的核心,我们分功能逐步拆解。

(1) 连接槽函数:connect() 的两种重载
① 自由函数 / Lambda

template<typename T>
Connection<SignalArgs...> connect(T&& slot) {
    return connect_impl(nullptr, std::forward<T>(slot));
}
  • 目标线程池为 nullptr,意味着同步调用时直接执行,异步时使用全局线程池。

② 成员函数(绑定到 Object 派生类)

template<typename T, typename R>
Connection<SignalArgs...> connect(R* receiver, T&& slot) {
    static_assert(...);
    ThreadPool* pool = receiver ? receiver->getThreadPool() : nullptr;
    auto wrapper = [receiver, slot](SignalArgs... args) {
        (receiver->*slot)(args...);
    };
    return connect_impl(pool, std::move(wrapper));
}
  • 将成员函数指针 &Class::func 包装成一个可调用的 lambda;
  • 同时提取接收者的线程池;
  • 最终统一转为 std::function<void(SignalArgs…)> 存储。

💡 技巧:使用 [receiver, slot] 捕获 this 和成员函数指针,形成闭包。

(2) 内部存储结构:ConnectedSlot

struct ConnectedSlot {
    std::function<void(SignalArgs...)> slot;
    std::shared_ptr<ConnectionData> connection_data;
    size_t id;
    ThreadPool* target_thread_pool;
};
  • 所有连接信息集中存储在一个 std::list 中;
  • 使用 std::list 而非 vector:插入/删除高效,迭代器不失效(重要!);
  • target_thread_pool 记录接收者期望运行的线程池。

同步发射:emit(…)

void emit(SignalArgs... args) {
    std::shared_lock<std::shared_mutex> lock(mutex_);
    auto local_slots = slots_; // 拷贝副本
    lock.unlock();

    for (const auto& slot_info : local_slots) {
        if (slot_info.connection_data->connected) {
            try {
                slot_info.slot(args...);
            } catch (...) { ... }
        }
    }
}

关键点分析

  • 使用 std::shared_mutex 实现读写分离:多个 emit 可并发进行(只读),但 connect/disconnect 需独占。
  • 拷贝 slots_ 到局部变量:防止在遍历过程中其他线程修改列表(如 disconnect() 删除元素),避免迭代器失效。
  • 加锁时间极短,仅用于拷贝,提升并发性能。
  • 异常捕获保护机制,防止某个槽函数崩溃影响其他槽。

异步发射:emitAsync(…) —— 真正的亮点!

void emitAsync(SignalArgs... args) {
    std::shared_lock<std::shared_mutex> lock(mutex_);
    auto local_slots = slots_;
    lock.unlock();

    for (const auto& slot_info : local_slots) {
        if (slot_info.connection_data->connected) {
            auto slot_copy = std::move(slot_info.slot);
            auto args_tuple = std::make_tuple(std::forward<SignalArgs>(args)...);
            ThreadPool* target_pool = slot_info.target_thread_pool ? ... : &global_thread_pool;

            auto task = [slot_copy = std::move(slot_copy), args_tuple = std::move(args_tuple)]() mutable {
                try {
                    std::apply(slot_copy, std::move(args_tuple));
                } catch (...) { ... }
            };

            target_pool->push(std::move(task));
        }
    }
}
  • std::make_tuple(std::forward<…>) 将变长参数完美转发并打包成元组
  • auto task = mutable {} 构造一个无参可调用对象,适配线程池接口
  • 移动语义 std::move(slot_info.slot) 减少拷贝开销,提高性能
  • std::apply(func, tuple) 在运行时展开元组调用函数
  • 捕获 mutable 允许 lambda 修改捕获的副本(如移动 args_tuple)

完全解耦信号发射与槽执行,发射线程不阻塞。

⚠️ 注意:参数被复制或移动到任务中,要求所有参数支持拷贝或移动语义。

全局 connect 函数与宏

#define SLOT(...) (&__VA_ARGS__)

template<typename... SigArgs, typename SlotFunc>
Connection<SigArgs...> connect(Signal<SigArgs...> &signal, SlotFunc&& slot) {
    return signal.connect(std::forward<SlotFunc>(slot));
}

// 重载版本支持 receiver
template<typename SignalType, typename ReceiverType, typename MemberFuncType>
auto connect(SignalType& signal, ReceiverType* receiver, MemberFuncType slot)
-> decltype(signal.connect(receiver, slot))

设计优点

  • 提供类似 Qt 的 connect(…) 全局函数语法;
  • 支持函数重载解析,自动匹配正确版本;
  • SLOT(…) 宏用于取成员函数地址,语义清晰。

线程安全与性能分析

线程安全策略

操作 锁机制 说明
connect / disconnect unique_lock<shared_mutex> 写操作,互斥访问
emit / emitAsync shared_lock<shared_mutex> 多个发射可并发
遍历 slots_ 拷贝副本 避免持有锁遍历

优点:读多写少场景下性能优秀。
潜在问题:若连接数极大,slots_ 拷贝成本高。

内存管理与异常安全

  • 使用 std::shared_ptr 和 std::weak_ptr 实现自动生命周期管理;
  • disconnect_callback 在 ConnectionData 销毁时自动清理 Signal 中的条目;
  • 所有异常被捕获,防止崩溃传播;
  • 移动语义减少临时对象开销。

完整源码

#pragma once

#include <iostream>
#include <functional>
#include <thread>
#include <mutex>
#include <shared_mutex>
#include <condition_variable>
#include <queue>
#include <list>
#include <memory>
#include <atomic>
#include <sstream>
#include <tuple>

#include "ThreadPool.hpp"

// --- 全局线程池管理 (简化) ---
static ThreadPool global_thread_pool(4);

// --- Object 基类 ---
class Object
{
public:
	Object() : thread_pool_(&global_thread_pool)
	{ }
	virtual ~Object() = default;

	void setThreadPool(ThreadPool* pool)
	{
		if(pool)
		{
			thread_pool_ = pool;
		}
	}

	ThreadPool *getThreadPool() const
	{
		return thread_pool_;
	}

	Object(const Object &) = delete;
	Object &operator=(const Object &) = delete;

private:
	ThreadPool * thread_pool_;
};

// --- 信号与槽机制 ---

// Connection 类
template<typename... SignalArgs>
class Signal;

template<typename... SignalArgs>
class Connection
{
	friend class Signal<SignalArgs...>;
public:
	Connection() = default;
	~Connection() = default;

	void disconnect()
	{
		if(auto shared_con_data = connection_data_.lock())
		{
			std::unique_lock<std::shared_mutex> lock(shared_con_data->signal_mutex);
			if(shared_con_data->connected)
			{
				shared_con_data->connected = false;
				if(shared_con_data->disconnect_callback)
				{
					shared_con_data->disconnect_callback(shared_con_data->id);
				}
			}
		}
	}

	bool isConnected() const
	{
		auto shared_con_data = connection_data_.lock();
		return shared_con_data && shared_con_data->connected;
	}

private:
	struct ConnectionData
	{
		bool connected = true;
		size_t id = 0;
		mutable std::shared_mutex signal_mutex;
		std::function<void(size_t)> disconnect_callback;
	};
	std::weak_ptr<typename Connection::ConnectionData> connection_data_;
};

// Signal 类
template<typename... SignalArgs>
class Signal
{
private:
	// 存储连接的槽信息
	struct ConnectedSlot
	{
		std::function<void(SignalArgs...)> slot; // 统一为 std::function
		std::shared_ptr<typename Connection<SignalArgs...>::ConnectionData> connection_data;
		size_t id;
		ThreadPool *target_thread_pool; // 接收者关联的线程池
	};

	mutable std::shared_mutex mutex_;
	std::list<ConnectedSlot> slots_;
	std::atomic<size_t> current_id_{0};

	// 连接实现辅助函数
	template<typename Callable>
	Connection<SignalArgs...> connect_impl(ThreadPool* pool, Callable&& callable)
	{
		auto con_data = std::make_shared<typename Connection<SignalArgs...>::ConnectionData>();
		con_data->disconnect_callback = [this](size_t slot_id)
		{
			std::unique_lock<std::shared_mutex> lock(this->mutex_);
			this->slots_.remove_if([slot_id](const ConnectedSlot & info)
			{
				return info.id == slot_id;
			});
		};

		std::unique_lock<std::shared_mutex> lock(mutex_);
		size_t id = ++current_id_;
		con_data->id = id;

		slots_.emplace_back(ConnectedSlot
		{
			std::function<void(SignalArgs...)>(std::forward<Callable>(callable)),
			con_data,
			id,
			pool
		});

		Connection<SignalArgs...> conn;
		conn.connection_data_ = con_data;
		return conn;
	}


public:
	Signal() = default;
	~Signal() = default;

	// 连接自由函数或Lambda
	template<typename T>
	Connection<SignalArgs...> connect(T&& slot)
	{
		return connect_impl(nullptr, std::forward<T>(slot));
	}

	// 连接成员函数到 Object 派生类实例
	template<typename T, typename R>
	Connection<SignalArgs...> connect(R* receiver, T&& slot)
	{
		static_assert(std::is_base_of_v<Object, R>, "Receiver must inherit from Object");
		ThreadPool* pool = (receiver) ? receiver->getThreadPool() : nullptr;
		// 将成员函数调用包装成 std::function
		auto wrapper = [receiver, slot](SignalArgs... args)
		{
			(receiver->*slot)(args...);
		};
		return connect_impl(pool, std::move(wrapper));
	}

	// 同步发射信号
	void emit(SignalArgs... args)
	{
		std::shared_lock<std::shared_mutex> lock(mutex_);
		// 拷贝连接列表以避免在迭代时因槽断开而修改列表
		auto local_slots = slots_;
		lock.unlock();

		for(const auto& slot_info : local_slots)
		{
			if(slot_info.connection_data->connected)
			{
				try
				{
					// 直接调用已包装好的 std::function
					slot_info.slot(args...);
				}
				catch(...)
				{
					std::cerr << "[Signal] Exception caught in slot (sync)." << std::endl;
				}
			}
		}
	}

	// 异步发射信号
	void emitAsync(SignalArgs... args)
	{
		std::shared_lock<std::shared_mutex> lock(mutex_);
		// 拷贝连接列表以避免在迭代时因槽断开而修改列表
		auto local_slots = slots_;
		lock.unlock();

		for(const auto& slot_info : local_slots)
		{
			if(slot_info.connection_data->connected)
			{
				// 1. 移动 std::function 副本
				auto slot_copy = std::move(slot_info.slot); // Move the slot function
				// 2. 移动参数打包成的 tuple
				auto args_tuple = std::make_tuple(std::forward<SignalArgs>(args)...); // Forward args to tuple
				// 3. 确定目标线程池
				ThreadPool* target_pool = slot_info.target_thread_pool ? slot_info.target_thread_pool : &global_thread_pool;


				// 4. 创建最终的无参任务 lambda
				//    捕获所有需要的数据(通过移动)
				auto task = [slot_copy = std::move(slot_copy), args_tuple = std::move(args_tuple)]() mutable
				{
					try
					{
						// 5. 使用 std::apply 展开 tuple 并调用槽函数
						std::apply(slot_copy, std::move(args_tuple)); // Apply with moved tuple
					}
					catch(...)
					{
						std::cerr << "[Signal] Exception caught in slot (async)." << std::endl;
					}
				};

				// 6. 将这个无参任务提交到正确的线程池
				try
				{
					target_pool->push(std::move(task)); // Enqueue the moved task
				}
				catch(const std::exception& e)
				{
					std::cerr << "[Signal] Failed to enqueue task: " << e.what() << std::endl;
				}
			}
		}
	}

	size_t getConnectionCount() const
	{
		std::shared_lock<std::shared_mutex> lock(mutex_);
		return slots_.size();
	}
};

// --- 宏定义 ---
#define SLOT(...) (&__VA_ARGS__)

// --- 全局 connect 函数模板 ---
// 为自由函数/Lambda连接提供便利
template<typename... SigArgs, typename SlotFunc>
Connection<SigArgs...> connect(Signal<SigArgs...> &signal, SlotFunc&& slot)
{
	return signal.connect(std::forward<SlotFunc>(slot));
}

// 通过让编译器推断 ReceiverType 和自动匹配 SignalArgs
template<typename SignalType, typename ReceiverType, typename MemberFuncType>
auto connect(SignalType& signal, ReceiverType* receiver, MemberFuncType slot)
-> decltype(signal.connect(receiver, slot))
{
	// SFINAE: 只有当 signal.connect(...) 有效时才启用此重载
	static_assert(std::is_base_of_v<Object, ReceiverType>, "Receiver must inherit from Object");
	return signal.connect(receiver, slot);
}
// ThreadPool.hpp
#pragma once
#include <vector>
#include <thread>
#include <mutex>
#include <queue>
#include <functional>
#include <condition_variable>
#include <future>
#include <memory>
#include <stdexcept>
#include <iostream>

class ThreadPool
{
public:
	/**
	 * @brief 线程池构造函数
	 *
	 * @param[in] threads 线程数量,默认使用硬件并发线程数
	 */
	explicit ThreadPool(size_t threads = std::thread::hardware_concurrency())
		: stop(false)
	{
		if(threads == 0)
		{
			threads = 1;
		}

		for (size_t i = 0; i < threads; ++i)
		{
			workers.emplace_back([this]
			{
				while (true)
				{
					std::function<void()> task;
					{
						std::unique_lock<std::mutex> lock(this->queue_mutex);
						this->condition.wait(lock, [this]
						{
							return this->stop || !this->tasks.empty();
						});

						if (this->stop && this->tasks.empty())
						{
							return;
						}

						task = std::move(this->tasks.front());
						this->tasks.pop();
					}
					try
					{
						task();
					}
					catch (...)
					{
						std::cerr << "Thread Pool: something wrong.";
					}
				}
			});
		}
	}

	/**
	 * @brief 将任务添加到线程池队列中执行
	 *
	 * 此函数接收一个可调用对象和其参数,将其包装成 packaged_task 并加入任务队列,
	 * 然后通知工作线程有新任务到来。返回一个 future 对象用于获取任务执行结果。
	 *
	 * @tparam[in] F 可调用对象的类型
	 * @tparam[in] Args 可调用对象参数的类型包
	 * @param[in] f 可调用对象
	 * @param[in] args 可调用对象的参数包
	 *
	 * @return std::future<typename std::result_of<F(Args...)>::type>
	 *         返回一个 future 对象,可用于获取任务执行结果或等待任务完成
	 *
	 * @throws std::runtime_error 当线程池已停止时抛出异常
	 */
	template<class F, class... Args>
	auto enqueue(F&& f, Args&&... args) -> std::future<typename std::invoke_result<F(Args...)>::type>
	{
		// 获取可调用对象的返回类型
		using return_type = typename std::invoke_result<F(Args...)>::type;

		// 将可调用对象和参数绑定,创建 packaged_task 对象
		auto task = std::make_shared< std::packaged_task<return_type()> >(
		                std::bind(std::forward<F>(f), std::forward<Args>(args)...)
		            );

		std::future<return_type> res = task->get_future();
		{
			std::unique_lock<std::mutex> lock(queue_mutex);
			if(stop)
			{
				throw std::runtime_error("enqueue on stopped ThreadPool");
			}

			tasks.emplace([task]()
			{
				(*task)();
			});
		}
		condition.notify_one();
		return res;
	}

	template<class F>
	void push(F&& f)
	{
		{
			std::unique_lock<std::mutex> lock(queue_mutex);
			if (stop)
			{
				throw std::runtime_error("enqueue on stopped ThreadPool");
			}
			tasks.emplace(std::forward<F>(f));
		}
		condition.notify_one();
	}

	/**
	 * @brief 线程池析构函数
	 *
	 * 负责安全地停止所有工作线程并清理资源
	 */
	~ThreadPool()
	{
		{
			std::unique_lock<std::mutex> lock(queue_mutex);
			stop = true;
		}
		condition.notify_all();
		for(std::thread &worker : workers)
			if(worker.joinable())
			{
				worker.join();
			}
	}

	/**
	 * @brief 获取待处理任务的数量
	 *
	 * 该函数通过加锁访问任务队列,返回当前队列中等待执行的任务数量。
	 *
	 * @return size_t 返回任务队列中的任务数量
	 */
	size_t pending_tasks()
	{
		std::unique_lock<std::mutex> lock(queue_mutex);
		return tasks.size();
	}

	/**
	 * @brief 阻塞等待所有任务执行完成
	 *
	 * 此函数会等待任务队列中的所有任务执行完毕,但不会停止线程池接收新任务。
	 * 它通过检查任务队列是否为空来确定所有任务是否已完成。
	 *
	 * @note 此函数不会阻塞其他线程向队列中添加新任务
	 */
	void wait_until_empty()
	{
		std::unique_lock<std::mutex> lock(queue_mutex);
		condition.wait(lock, [this] { return tasks.empty(); });
	}

private:
	std::vector<std::thread> workers; // 工作线程
	std::queue<std::function<void()>> tasks; // 任务队列

	std::mutex queue_mutex; // 互斥锁, stop, tasks
	std::condition_variable condition; // 条件变量, queue_mutex
	bool stop; // 线程池是否停止
};

使用示例

// --- 示例用法 ---
class Sender : public Object
{
public:
    Signal<int, std::string> dataReady;
    Signal<> finished;

    void doWork()
    {
        for (int i = 0; i < 3; ++i)
        {
            std::cout << "[Sender] Emitting dataReady(" << i << ", 'Item " << i << "')\n";
            dataReady.emit(i, "Item " + std::to_string(i));
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
        std::cout << "[Sender] Emitting finished()\n";
        finished.emit();
    }

    void doWorkAsync()
    {
        for (int i = 10; i < 13; ++i)
        {
            std::cout << "[Sender] Async Emitting dataReady(" << i << ", 'AsyncItem " << i << "')\n";
            dataReady.emitAsync(i, "AsyncItem " + std::to_string(i));
            std::this_thread::sleep_for(std::chrono::milliseconds(50));
        }
        std::cout << "[Sender] Async Emitting finished()\n";
        finished.emitAsync();
    }
};

class Receiver : public Object
{
public:
    Receiver(const std::string& name) : name(name)
    { }

    void onData(int value, const std::string& msg)
    {
        std::ostringstream oss;
        oss << "[Receiver: " << name << "] Received data: " << value << ", " << msg
            << " (Thread ID: " << std::this_thread::get_id() << ")\n";
        std::cout << oss.str();
        // Simulate work
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
    }

    void onFinish()
    {
        std::ostringstream oss;
        oss << "[Receiver: " << name << "] Received finish signal."
            << " (Thread ID: " << std::this_thread::get_id() << ")\n";
        std::cout << oss.str();
    }

    const std::string& getName() const
    {
        return name;
    }

private:
    std::string name;
};

int main()
{
    std::cout << "=== Qt-Style Signal/Slot with Inheritance (Fixed) ===" << std::endl;

    Sender sender;
    Receiver receiver1("R1");
    Receiver receiver2("R2");

    // 为 receiver1 创建并关联专用线程池
    ThreadPool receiver1_pool(2);
    receiver1.setThreadPool(&receiver1_pool);

    // 连接信号和槽
    auto conn1 = connect(sender.dataReady, &receiver1, &Receiver::onData);
    auto conn2 = connect(sender.dataReady, &receiver2, &Receiver::onData);
    auto conn3 = connect(sender.finished, &receiver1, &Receiver::onFinish);
    auto conn4 = connect(sender.finished, []()
    {
        std::cout << "[Lambda Slot] Work is done!\n";
    });

    std::cout << "Connections established.\n\n";

    std::cout << "--- Synchronous Execution ---\n";
    sender.doWork();

    std::this_thread::sleep_for(std::chrono::milliseconds(200)); // 等待同步槽处理完毕

    conn2.disconnect();
    std::cout << "\nDisconnected receiver2. Sending data again...\n";
    sender.dataReady.emit(-1, "After disconnection");

    std::cout << "\n--- Asynchronous Execution ---\n";
    sender.doWorkAsync();

    // 等待异步任务完成
    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    std::cout << "\nMain function finished.\n";

    return 0; // ThreadPool 析构函数会等待线程结束
}

运行结果:
运行结果

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐