引擎设计要点(一):进程与线程管理
引擎设计要点:
进程与线程管理:
- 主线程负责UI/渲染
- 工作线程处理计算密集型任务
- 合理划分线程边界,减少锁竞争

我们以游戏引擎为例,详细说明进程与线程管理的设计。我们将设计一个简化的游戏引擎,包含以下部分:
- 主线程(UI/渲染)
- 工作线程(处理物理、AI等计算密集型任务)
- 资源加载线程(异步加载资源)
- 网络线程(处理网络通信)

我们将使用以下技术:
- 线程池管理多个工作线程
- 任务队列(主线程与工作线程之间的通信)
- 事件系统(用于线程间通信)

注意:为了简化,我们将重点放在架构上,代码可能不完整,但会展示核心思想。
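为直观感受"任务队列"这一核心机制,下面先用一个极简的 Python 草图示意主线程与工作线程的交互(仅为独立示意,并非正文引擎的实际实现;`queue.Queue` 本身是线程安全的):

```python
import queue
import threading

# 任务队列:主线程 -> 工作线程;结果队列:工作线程 -> 主线程
task_queue = queue.Queue()
result_queue = queue.Queue()

def worker():
    # 工作线程循环:取任务、执行、回传结果;None 作为退出哨兵
    while True:
        task = task_queue.get()
        if task is None:
            break
        result_queue.put(task())

t = threading.Thread(target=worker, daemon=True)
t.start()

# 主线程提交一个"计算密集型"任务(这里用乘法代替)
task_queue.put(lambda: 21 * 2)
result = result_queue.get()   # 主线程取回结果
print(result)                 # 42
task_queue.put(None)          # 通知工作线程退出
t.join()
```

主线程与工作线程之间只通过两个队列交换数据,不共享可变状态,这正是"合理划分线程边界,减少锁竞争"的最小体现。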
首先,我们用文字描述引擎的线程架构(类图与时序图要点)。

类图:
- Engine(引擎主类)
  - 属性:threadPool, renderThread, resourceThread, networkThread
  - 方法:start(), stop(), update(), render()
- ThreadPool(线程池)
  - 属性:workers(工作线程列表), taskQueue(任务队列)
  - 方法:submitTask(task)
- Task(任务基类)
  - 方法:execute()
- RenderThread(渲染线程,继承自Thread)
  - 方法:run()
- ResourceThread(资源线程,继承自Thread)
  - 方法:run()
- NetworkThread(网络线程,继承自Thread)
  - 方法:run()
- EventSystem(事件系统)
  - 属性:listeners(事件监听器映射)
  - 方法:subscribe(eventType, listener), publish(event)

时序图:
主线程(Main Thread) -> Engine: start()
Engine -> RenderThread: 启动渲染线程
Engine -> ResourceThread: 启动资源线程
Engine -> NetworkThread: 启动网络线程
Engine -> ThreadPool: 初始化工作线程池
工作线程(Worker Thread)从ThreadPool的任务队列中获取任务并执行。
现在,我们分别用C++、Java、Python和JavaScript实现。
注意:由于不同语言的线程模型和库不同,实现方式会有差异。
我们假设引擎运行在一个进程中,多个线程共享内存,重点放在线程间的任务调度和通信上。
由于篇幅限制,每个语言只实现核心部分。我们先从C++开始。
引擎设计要点:进程与线程管理详细实现
一、UML 设计
1. 线程架构 UML 类图
@startuml
package "引擎线程架构" {
abstract class ThreadManager {
+startAll() : void
+stopAll() : void
+submitTask(task: Runnable, priority: int) : Future
+waitForCompletion(timeout: long) : boolean
}
class MainThread {
-RenderSystem renderSystem
-UISystem uiSystem
-InputSystem inputSystem
-MessageQueue messageQueue
-bool shouldExit
+run() : void
+processMessages() : void
+scheduleRender() : void
+postMessage(msg: Message) : void
}
class WorkerThreadPool {
-List<WorkerThread> workers
-Queue<Task> taskQueue
-Map<Thread, TaskAffinity> affinityMap
+execute(task: Task) : Future
+executeWithAffinity(task: Task, affinity: AffinityMask) : Future
+balanceLoad() : void
}
class ComputeThread {
-TaskQueue computeQueue
-ComputeContext context
-AtomicLong workCounter
+runComputeKernel(kernel: ComputeKernel, data: Buffer) : void
+synchronize() : void
+getWorkGroupSize() : int
}
class ResourceThread {
-AsyncLoader loader
-CacheManager cache
-PriorityQueue<LoadRequest> loadQueue
+loadAsync(path: String) : Promise<Resource>
+preload(paths: List<String>) : void
+cancelLoad(path: String) : boolean
}
class NetworkThread {
-SocketManager sockets
-PacketQueue incomingQueue
-PacketQueue outgoingQueue
+sendPacket(packet: NetworkPacket) : void
+receivePackets() : List<NetworkPacket>
+processIncoming() : void
}
class TaskScheduler {
-TaskGraph taskGraph
-DependencyResolver dependencies
-PerformanceMonitor perfMonitor
+schedule(task: Task) : void
+parallelize(tasks: List<Task>) : void
+optimizeSchedule() : void
}
ThreadManager <|-- MainThread
ThreadManager <|-- WorkerThreadPool
WorkerThreadPool o-- ComputeThread
WorkerThreadPool o-- ResourceThread
WorkerThreadPool o-- NetworkThread
ThreadManager o-- TaskScheduler
note right of WorkerThreadPool
使用工作窃取算法
避免锁竞争
支持任务亲和性
end note
}
@enduml
2. 线程通信 UML 序列图
@startuml
actor User
participant MainThread as MT
participant WorkerPool as WP
participant ResourceThread as RT
participant ComputeThread as CT
participant "GPU Driver" as GPU
User -> MT: 用户输入(鼠标点击)
MT -> MT: 处理UI事件
MT -> WP: 提交计算任务\n(高优先级)
WP -> CT: 分发计算任务
CT -> CT: 并行计算
par
MT -> MT: 继续渲染
MT -> GPU: 提交渲染命令
GPU --> MT: 渲染完成
and
CT -> CT: 计算物理模拟
CT --> WP: 返回计算结果
end
WP -> MT: 通知计算完成
MT -> MT: 更新场景状态
MT -> GPU: 提交新帧渲染
MT -> User: 显示更新后的画面
note over MT, CT
主线程和工作线程并行工作
减少锁竞争是关键
通过消息队列通信
end note
@enduml
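时序图中"主线程提交计算任务、继续渲染、稍后取回结果"的模式,可以用 Python 的 `concurrent.futures` 做一个最小示意(假设性草图,与正文各语言实现相互独立,`calculate_physics` 仅为占位函数):

```python
from concurrent.futures import ThreadPoolExecutor

def calculate_physics(step: int) -> int:
    # 占位的"计算密集型"物理模拟
    return step * step

with ThreadPoolExecutor(max_workers=2) as pool:
    # 主线程提交计算任务后立即返回,不被阻塞
    future = pool.submit(calculate_physics, 7)
    # 主线程继续做自己的事(渲染、UI 等)
    frame = "rendered"
    # 需要结果时再同步;future.done() 可用于非阻塞轮询
    result = future.result()

print(frame, result)  # rendered 49
```

这对应时序图中的 par 分支:主线程与计算线程并行推进,只在结果就绪点汇合。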
二、C++ 实现示例
1. 现代C++线程管理框架
#include <atomic>
#include <thread>
#include <vector>
#include <queue>
#include <future>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <memory>
#include <optional>      // ConcurrentQueue::pop 返回 std::optional
#include <unordered_map> // 线程 id 到 ThreadInfo 的映射
#include <type_traits>
#include <iostream>
#include <chrono>
#include <sstream>
#include <iomanip>
#ifdef _WIN32
#include <windows.h>     // 线程命名与亲和性 API
#endif
// ==================== 线程安全的队列 ====================
template<typename T>
class ConcurrentQueue {
private:
mutable std::mutex mutex_;
std::queue<T> queue_;
std::condition_variable cond_;
public:
void push(T value) {
{
std::lock_guard<std::mutex> lock(mutex_);
queue_.push(std::move(value));
}
cond_.notify_one();
}
bool try_pop(T& value) {
std::lock_guard<std::mutex> lock(mutex_);
if (queue_.empty()) return false;
value = std::move(queue_.front());
queue_.pop();
return true;
}
std::optional<T> pop() {
std::lock_guard<std::mutex> lock(mutex_);
if (queue_.empty()) return std::nullopt;
T value = std::move(queue_.front());
queue_.pop();
return value;
}
bool empty() const {
std::lock_guard<std::mutex> lock(mutex_);
return queue_.empty();
}
size_t size() const {
std::lock_guard<std::mutex> lock(mutex_);
return queue_.size();
}
};
// ==================== 无锁环形缓冲区(仅适用于单生产者/单消费者) ====================
template<typename T, size_t Capacity>
class LockFreeRingBuffer {
private:
static_assert(Capacity > 0 && (Capacity & (Capacity - 1)) == 0,
"Capacity must be power of two");
alignas(64) std::atomic<size_t> head_{0};
alignas(64) std::atomic<size_t> tail_{0};
T buffer_[Capacity];
public:
bool try_push(const T& value) {
size_t current_head = head_.load(std::memory_order_relaxed);
size_t current_tail = tail_.load(std::memory_order_acquire);
if (full(current_head, current_tail)) {
return false;
}
buffer_[current_head & (Capacity - 1)] = value;
head_.store(current_head + 1, std::memory_order_release);
return true;
}
bool try_pop(T& value) {
size_t current_tail = tail_.load(std::memory_order_relaxed);
size_t current_head = head_.load(std::memory_order_acquire);
if (empty(current_head, current_tail)) {
return false;
}
value = buffer_[current_tail & (Capacity - 1)];
tail_.store(current_tail + 1, std::memory_order_release);
return true;
}
private:
bool full(size_t head, size_t tail) const {
return (head - tail) == Capacity;
}
bool empty(size_t head, size_t tail) const {
return head == tail;
}
};
// ==================== 任务亲和性调度器 ====================
class TaskAffinityScheduler {
public:
enum class Affinity {
ANY, // 任意线程
MAIN, // 主线程
COMPUTE, // 计算密集型
IO, // IO密集型
RENDER, // 渲染相关
NETWORK // 网络相关
};
private:
struct PendingTask {
std::function<void()> task;
Affinity affinity;
int priority;
uint64_t submitTime;
};
struct ThreadInfo {
std::thread thread;
Affinity affinity;
std::atomic<bool> running{true};
ConcurrentQueue<PendingTask> taskQueue;
std::atomic<uint64_t> workCounter{0};
// 缓存行填充,避免伪共享
char padding[64];
};
std::vector<std::unique_ptr<ThreadInfo>> threads_;
std::unordered_map<std::thread::id, ThreadInfo*> threadMap_;
std::mutex mapMutex_;
// 主线程特殊处理
ThreadInfo* mainThreadInfo_{nullptr};
public:
TaskAffinityScheduler() {
// 获取CPU核心数
unsigned int numCores = std::thread::hardware_concurrency();
// 创建不同亲和性的线程
createThreads(Affinity::COMPUTE, numCores / 2); // 50%用于计算
createThreads(Affinity::IO, numCores / 4); // 25%用于IO
createThreads(Affinity::RENDER, 2); // 2个渲染线程
createThreads(Affinity::NETWORK, 1); // 1个网络线程
// 注册主线程
registerMainThread();
}
~TaskAffinityScheduler() {
for (auto& info : threads_) {
info->running.store(false, std::memory_order_release);
if (info->thread.joinable()) {
info->thread.join();
}
}
}
template<typename F>
auto submit(F&& func, Affinity affinity = Affinity::ANY,
int priority = 0) -> std::future<decltype(func())> {
using ResultType = decltype(func());
auto task = std::make_shared<std::packaged_task<ResultType()>>(
std::forward<F>(func));
std::future<ResultType> future = task->get_future();
PendingTask pending{
[task]() { (*task)(); },
affinity,
priority,
static_cast<uint64_t>(std::chrono::steady_clock::now().time_since_epoch().count())
};
// 根据亲和性选择线程
ThreadInfo* target = selectThread(affinity);
if (target) {
target->taskQueue.push(std::move(pending));
target->workCounter.fetch_add(1, std::memory_order_relaxed);
}
return future;
}
void processMainThreadTasks() {
if (!mainThreadInfo_) return;
// 处理主线程任务队列
constexpr size_t MAX_TASKS_PER_FRAME = 100;
size_t processed = 0;
while (processed < MAX_TASKS_PER_FRAME) {
auto task = mainThreadInfo_->taskQueue.pop();
if (!task) break;
try {
task->task();
processed++;
} catch (const std::exception& e) {
std::cerr << "Main thread task failed: " << e.what() << std::endl;
}
}
}
private:
void createThreads(Affinity affinity, size_t count) {
for (size_t i = 0; i < count; ++i) {
auto info = std::make_unique<ThreadInfo>();
info->affinity = affinity;
info->thread = std::thread([this, info = info.get()]() {
workerThreadFunc(info);
});
{
std::lock_guard<std::mutex> lock(mapMutex_);
threadMap_[info->thread.get_id()] = info.get();
}
threads_.push_back(std::move(info));
}
}
void workerThreadFunc(ThreadInfo* info) {
// 设置线程名称(平台相关)
setThreadName(getAffinityName(info->affinity));
// 设置CPU亲和性(平台相关)
setThreadAffinity(info->affinity);
// 工作循环
while (info->running.load(std::memory_order_acquire)) {
PendingTask task;
if (info->taskQueue.try_pop(task)) {
try {
task.task();
info->workCounter.fetch_sub(1, std::memory_order_relaxed);
} catch (const std::exception& e) {
std::cerr << "Worker task failed: " << e.what() << std::endl;
}
} else {
// 工作窃取:尝试从其他线程窃取任务
if (tryStealWork(info)) {
continue;
}
// 无任务时短暂休眠
std::this_thread::sleep_for(std::chrono::microseconds(100));
}
}
}
bool tryStealWork(ThreadInfo* thief) {
// 从最忙的线程窃取工作
ThreadInfo* busiest = nullptr;
uint64_t maxWork = 0;
for (const auto& info : threads_) {
if (info.get() == thief) continue;
uint64_t work = info->workCounter.load(std::memory_order_relaxed);
if (work > maxWork && !info->taskQueue.empty()) {
maxWork = work;
busiest = info.get();
}
}
if (busiest) {
PendingTask task;
if (busiest->taskQueue.try_pop(task)) {
thief->taskQueue.push(std::move(task));
busiest->workCounter.fetch_sub(1, std::memory_order_relaxed);
thief->workCounter.fetch_add(1, std::memory_order_relaxed);
return true;
}
}
return false;
}
ThreadInfo* selectThread(Affinity affinity) {
if (affinity == Affinity::MAIN) {
return mainThreadInfo_;
}
// 选择最少工作的合适线程
ThreadInfo* best = nullptr;
uint64_t minWork = UINT64_MAX;
for (const auto& info : threads_) {
if (affinity != Affinity::ANY && info->affinity != affinity) {
continue;
}
uint64_t work = info->workCounter.load(std::memory_order_relaxed);
if (work < minWork) {
minWork = work;
best = info.get();
}
}
return best;
}
void registerMainThread() {
auto info = std::make_unique<ThreadInfo>();
info->affinity = Affinity::MAIN;
info->thread = std::thread([](){}); // 占位线程对象:主线程并非由调度器创建
info->thread.detach();
mainThreadInfo_ = info.get();
{
std::lock_guard<std::mutex> lock(mapMutex_);
threadMap_[std::this_thread::get_id()] = info.get();
}
threads_.push_back(std::move(info));
}
// 平台相关函数
static void setThreadName(const std::string& name) {
#ifdef _WIN32
// Windows设置线程名
const DWORD MS_VC_EXCEPTION = 0x406D1388;
#pragma pack(push,8)
typedef struct tagTHREADNAME_INFO {
DWORD dwType;
LPCSTR szName;
DWORD dwThreadID;
DWORD dwFlags;
} THREADNAME_INFO;
#pragma pack(pop)
THREADNAME_INFO info;
info.dwType = 0x1000;
info.szName = name.c_str();
info.dwThreadID = GetCurrentThreadId();
info.dwFlags = 0;
__try {
RaiseException(MS_VC_EXCEPTION, 0, sizeof(info)/sizeof(ULONG_PTR),
(ULONG_PTR*)&info);
} __except(EXCEPTION_EXECUTE_HANDLER) {
}
#elif defined(__linux__)
// Linux设置线程名
pthread_setname_np(pthread_self(), name.substr(0, 15).c_str());
#endif
}
static void setThreadAffinity(Affinity affinity) {
#ifdef _WIN32
DWORD_PTR affinityMask = 0;
SYSTEM_INFO sysInfo;
GetSystemInfo(&sysInfo);
switch (affinity) {
case Affinity::COMPUTE:
// 使用所有CPU核心(用 DWORD_PTR 移位,避免核心数较多时 int 溢出)
affinityMask = (DWORD_PTR(1) << sysInfo.dwNumberOfProcessors) - 1;
break;
case Affinity::IO:
// 使用一半核心
affinityMask = (DWORD_PTR(1) << (sysInfo.dwNumberOfProcessors / 2)) - 1;
break;
case Affinity::RENDER:
// 使用特定核心
affinityMask = 0x1; // 第一个核心
break;
default:
affinityMask = 0;
}
if (affinityMask != 0) {
SetThreadAffinityMask(GetCurrentThread(), affinityMask);
}
#endif
}
static std::string getAffinityName(Affinity affinity) {
switch (affinity) {
case Affinity::MAIN: return "MainThread";
case Affinity::COMPUTE: return "ComputeThread";
case Affinity::IO: return "IOThread";
case Affinity::RENDER: return "RenderThread";
case Affinity::NETWORK: return "NetworkThread";
default: return "WorkerThread";
}
}
};
// ==================== 主线程渲染系统 ====================
// 占位的计算结果类型(实际引擎中应为完整数据结构)
struct PhysicsData {};
struct AIData {};
class RenderEngine {
private:
TaskAffinityScheduler scheduler_;
std::atomic<bool> running_{false};
std::thread renderThread_;
// 渲染状态
struct FrameData {
std::vector<float> vertices;
std::vector<uint32_t> indices;
std::vector<float> transforms;
};
ConcurrentQueue<FrameData> frameQueue_;
LockFreeRingBuffer<FrameData, 4> frameBuffer_; // 多重缓冲(环形缓冲容量须为2的幂)
public:
void start() {
running_.store(true, std::memory_order_release);
renderThread_ = std::thread([this]() {
mainRenderLoop();
});
}
void stop() {
running_.store(false, std::memory_order_release);
if (renderThread_.joinable()) {
renderThread_.join();
}
}
void updateScene() {
// 在主线程处理UI和输入
processInput();
updateUI();
// 提交计算任务到工作线程
auto physicsFuture = scheduler_.submit([this]() {
return calculatePhysics();
}, TaskAffinityScheduler::Affinity::COMPUTE, 10);
auto aiFuture = scheduler_.submit([this]() {
return calculateAI();
}, TaskAffinityScheduler::Affinity::COMPUTE, 5);
// 继续处理渲染
prepareRendering();
// 等待计算完成(非阻塞方式)
if (physicsFuture.wait_for(std::chrono::milliseconds(0)) ==
std::future_status::ready) {
applyPhysicsResults(physicsFuture.get());
}
// 处理主线程任务
scheduler_.processMainThreadTasks();
// 提交渲染帧
submitFrame();
}
private:
void mainRenderLoop() {
auto lastTime = std::chrono::high_resolution_clock::now();
while (running_.load(std::memory_order_acquire)) {
auto currentTime = std::chrono::high_resolution_clock::now();
float deltaTime = std::chrono::duration<float>(
currentTime - lastTime).count();
lastTime = currentTime;
// 限制帧率
constexpr float TARGET_FPS = 60.0f;
constexpr float FRAME_TIME = 1.0f / TARGET_FPS;
if (deltaTime < FRAME_TIME) {
std::this_thread::sleep_for(
std::chrono::duration<float>(FRAME_TIME - deltaTime));
deltaTime = FRAME_TIME;
}
// 渲染一帧
renderFrame(deltaTime);
}
}
void renderFrame(float deltaTime) {
// 从缓冲区获取帧数据
FrameData frame;
if (frameBuffer_.try_pop(frame)) {
// 实际渲染逻辑
uploadToGPU(frame.vertices, frame.indices, frame.transforms);
draw();
present();
}
// 处理渲染线程的任务
scheduler_.processMainThreadTasks();
}
void processInput() {
// 处理用户输入(必须在主线程)
}
void updateUI() {
// 更新UI状态(必须在主线程)
}
PhysicsData calculatePhysics() {
// 计算密集型物理模拟
PhysicsData result;
// ... 复杂计算
return result;
}
AIData calculateAI() {
// 计算密集型AI逻辑
AIData result;
// ... 复杂计算
return result;
}
void applyPhysicsResults(PhysicsData data) {
// 将物理计算结果写回场景(占位)
}
void prepareRendering() {
// 准备渲染数据
}
void submitFrame() {
FrameData frame;
// 填充帧数据
frameBuffer_.try_push(frame);
}
void uploadToGPU(const std::vector<float>& vertices,
const std::vector<uint32_t>& indices,
const std::vector<float>& transforms) {
// 上传数据到GPU
}
void draw() {
// 执行绘制命令
}
void present() {
// 呈现到屏幕
}
};
// ==================== 使用示例 ====================
int main() {
RenderEngine engine;
engine.start();
// 模拟游戏循环
for (int i = 0; i < 1000; ++i) {
engine.updateScene();
std::this_thread::sleep_for(std::chrono::milliseconds(16));
}
engine.stop();
return 0;
}
2. C++20 协程支持的任务系统
#include <coroutine>
#include <concepts>
// 协程任务
template<typename T>
struct Task {
using value_type = T;   // 供 when_all 等组合器使用
struct promise_type {
T value;
std::exception_ptr exception;
Task get_return_object() {
return Task{std::coroutine_handle<promise_type>::from_promise(*this)};
}
std::suspend_always initial_suspend() { return {}; }
auto final_suspend() noexcept {
struct awaiter {
bool await_ready() noexcept { return false; }
std::coroutine_handle<> await_suspend(
std::coroutine_handle<promise_type> h) noexcept {
if (h.promise().continuation) {
return h.promise().continuation;
}
return std::noop_coroutine();
}
void await_resume() noexcept {}
};
return awaiter{};
}
void unhandled_exception() {
exception = std::current_exception();
}
template<std::convertible_to<T> From>
void return_value(From&& from) {
value = std::forward<From>(from);
}
std::coroutine_handle<> continuation;
};
std::coroutine_handle<promise_type> handle;
explicit Task(std::coroutine_handle<promise_type> h) : handle(h) {}
~Task() {
if (handle) handle.destroy();
}
Task(Task&& other) noexcept : handle(other.handle) {
other.handle = nullptr;
}
Task& operator=(Task&& other) noexcept {
if (this != &other) {
if (handle) handle.destroy();
handle = other.handle;
other.handle = nullptr;
}
return *this;
}
bool await_ready() const { return false; }
std::coroutine_handle<> await_suspend(
std::coroutine_handle<> awaiting) {
handle.promise().continuation = awaiting;
return handle;
}
T await_resume() {
if (handle.promise().exception) {
std::rethrow_exception(handle.promise().exception);
}
return std::move(handle.promise().value);
}
};
// 并行任务执行器
class ParallelExecutor {
private:
TaskAffinityScheduler& scheduler_;
public:
explicit ParallelExecutor(TaskAffinityScheduler& scheduler)
: scheduler_(scheduler) {}
template<typename... Tasks>
auto when_all(Tasks&&... tasks) -> Task<std::tuple<typename Tasks::value_type...>> {
using ResultType = std::tuple<typename Tasks::value_type...>;
auto all_done = [](auto&... tasks) {
return (... && tasks.handle.done());
};
// 等待所有任务完成(此处为简化的轮询示意;实际实现应在任务完成时恢复本协程)
while (!all_done(tasks...)) {
co_await std::suspend_always{};
}
// 收集结果
co_return ResultType{co_await std::move(tasks)...};
}
template<typename F, typename... Args>
auto async(F&& func, Args&&... args)
-> Task<std::invoke_result_t<F, Args...>> {
using ResultType = std::invoke_result_t<F, Args...>;
// 将任务提交到线程池
auto future = scheduler_.submit(
[func = std::forward<F>(func),
args = std::make_tuple(std::forward<Args>(args)...)]() mutable {
return std::apply(func, std::move(args));
},
TaskAffinityScheduler::Affinity::COMPUTE
);
// 等待结果
co_return future.get();
}
};
// 使用协程的示例
Task<int> computeSomething(int x) {
// 模拟计算密集型任务
co_await std::suspend_always{};
int result = x * x;
co_return result;
}
Task<float> computeSomethingElse(float y) {
// 另一个计算任务
co_await std::suspend_always{};
float result = y * 2.0f;
co_return result;
}
Task<void> mainAsyncLogic() {
ParallelExecutor executor(/* scheduler */);
// 并行执行多个任务
auto [result1, result2] = co_await executor.when_all(
computeSomething(42),
computeSomethingElse(3.14f)
);
std::cout << "Results: " << result1 << ", " << result2 << std::endl;
// 继续执行其他任务
auto result3 = co_await executor.async([](int a, int b) {
return a + b;
}, 10, 20);
std::cout << "Result3: " << result3 << std::endl;
}
三、Java 实现示例
1. Java虚拟线程(Project Loom)架构
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.*;
import java.util.function.Function;
import java.util.function.Supplier;
// 线程优先级和亲和性枚举
enum ThreadAffinity {
MAIN, // 主线程(UI/渲染)
COMPUTE, // 计算密集型
IO, // IO密集型
RENDER, // 渲染辅助
NETWORK, // 网络处理
BACKGROUND // 后台任务
}
// 高级线程池配置
class ThreadPoolConfig {
private final int corePoolSize;
private final int maxPoolSize;
private final long keepAliveTime;
private final TimeUnit unit;
private final ThreadAffinity affinity;
private final int priority;
private final String threadNamePrefix;
private ThreadPoolConfig(Builder builder) {
this.corePoolSize = builder.corePoolSize;
this.maxPoolSize = builder.maxPoolSize;
this.keepAliveTime = builder.keepAliveTime;
this.unit = builder.unit;
this.affinity = builder.affinity;
this.priority = builder.priority;
this.threadNamePrefix = builder.threadNamePrefix;
}
static class Builder {
private int corePoolSize = Runtime.getRuntime().availableProcessors();
private int maxPoolSize = corePoolSize * 2;
private long keepAliveTime = 60L;
private TimeUnit unit = TimeUnit.SECONDS;
private ThreadAffinity affinity = ThreadAffinity.COMPUTE;
private int priority = Thread.NORM_PRIORITY;
private String threadNamePrefix = "EngineThread";
Builder corePoolSize(int size) {
this.corePoolSize = size;
return this;
}
Builder maxPoolSize(int size) {
this.maxPoolSize = size;
return this;
}
Builder affinity(ThreadAffinity affinity) {
this.affinity = affinity;
return this;
}
Builder priority(int priority) {
this.priority = priority;
return this;
}
Builder threadNamePrefix(String prefix) {
this.threadNamePrefix = prefix;
return this;
}
ThreadPoolConfig build() {
return new ThreadPoolConfig(this);
}
}
// 供 VirtualThreadAwareExecutor 等处使用的访问器
int getCorePoolSize() { return corePoolSize; }
int getMaxPoolSize() { return maxPoolSize; }
long getKeepAliveTime() { return keepAliveTime; }
TimeUnit getUnit() { return unit; }
ThreadAffinity getAffinity() { return affinity; }
int getPriority() { return priority; }
String getThreadNamePrefix() { return threadNamePrefix; }
}
// 虚拟线程感知的线程池(Java 19+)
class VirtualThreadAwareExecutor implements ExecutorService {
private final ExecutorService delegate;
private final boolean useVirtualThreads;
private final ThreadAffinity affinity;
public VirtualThreadAwareExecutor(ThreadPoolConfig config) {
this.affinity = config.getAffinity();
// 根据任务类型选择线程类型
if (shouldUseVirtualThreads(config.getAffinity())) {
this.useVirtualThreads = true;
this.delegate = createVirtualThreadExecutor(config);
} else {
this.useVirtualThreads = false;
this.delegate = createPlatformThreadExecutor(config);
}
}
private boolean shouldUseVirtualThreads(ThreadAffinity affinity) {
// 虚拟线程适合IO密集型任务
return affinity == ThreadAffinity.IO ||
affinity == ThreadAffinity.NETWORK ||
affinity == ThreadAffinity.BACKGROUND;
}
private ExecutorService createVirtualThreadExecutor(ThreadPoolConfig config) {
// Java 19+ 虚拟线程
ThreadFactory factory = Thread.ofVirtual()
.name(config.getThreadNamePrefix() + "-", 0)
.factory();
return Executors.newThreadPerTaskExecutor(factory);
}
private ExecutorService createPlatformThreadExecutor(ThreadPoolConfig config) {
// 平台线程(内核线程)适合计算密集型任务
ThreadFactory factory = new ThreadFactory() {
private final AtomicInteger counter = new AtomicInteger();
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName(config.getThreadNamePrefix() + "-" +
counter.getAndIncrement());
thread.setPriority(config.getPriority());
thread.setDaemon(true);
// 设置CPU亲和性(需要JNI或第三方库)
setThreadAffinity(thread, config.getAffinity());
return thread;
}
};
return new ThreadPoolExecutor(
config.getCorePoolSize(),
config.getMaxPoolSize(),
config.getKeepAliveTime(),
config.getUnit(),
new LinkedBlockingQueue<>(),
factory
);
}
private void setThreadAffinity(Thread thread, ThreadAffinity affinity) {
// 实际实现需要JNI调用本地方法
// 这里只是示例
switch (affinity) {
case COMPUTE:
// 绑定到高性能核心
break;
case RENDER:
// 绑定到与GPU通信的核心
break;
default:
// 使用默认调度
break;
}
}
@Override
public <T> Future<T> submit(Callable<T> task) {
if (useVirtualThreads) {
// 虚拟线程优化:避免线程池队列
return delegate.submit(wrapForVirtualThread(task));
}
return delegate.submit(task);
}
private <T> Callable<T> wrapForVirtualThread(Callable<T> task) {
return () -> {
// 虚拟线程特定优化
Thread current = Thread.currentThread();
if (current.isVirtual()) {
// 设置虚拟线程的调度提示
setVirtualThreadHint(affinity);
}
return task.call();
};
}
private void setVirtualThreadHint(ThreadAffinity affinity) {
// 虚拟线程调度提示
// 实际实现需要Java内部API
}
// 委托方法
@Override public void execute(Runnable command) { delegate.execute(command); }
@Override public <T> Future<T> submit(Runnable task, T result) { return delegate.submit(task, result); }
@Override public Future<?> submit(Runnable task) { return delegate.submit(task); }
@Override public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { return delegate.invokeAll(tasks); }
@Override public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException { return delegate.invokeAll(tasks, timeout, unit); }
@Override public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { return delegate.invokeAny(tasks); }
@Override public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return delegate.invokeAny(tasks, timeout, unit); }
@Override public void shutdown() { delegate.shutdown(); }
@Override public List<Runnable> shutdownNow() { return delegate.shutdownNow(); }
@Override public boolean isShutdown() { return delegate.isShutdown(); }
@Override public boolean isTerminated() { return delegate.isTerminated(); }
@Override public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { return delegate.awaitTermination(timeout, unit); }
}
// 主线程调度器
class MainThreadScheduler {
private static final MainThreadScheduler INSTANCE = new MainThreadScheduler();
private final LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
private final ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r, "Main-Thread-Scheduler");
t.setPriority(Thread.MAX_PRIORITY);
return t;
});
private volatile Thread mainThread;
private MainThreadScheduler() {
// 注册主线程
registerMainThread();
}
public static MainThreadScheduler getInstance() {
return INSTANCE;
}
private void registerMainThread() {
this.mainThread = Thread.currentThread();
// 设置线程优先级
mainThread.setPriority(Thread.MAX_PRIORITY);
// 设置线程名称
mainThread.setName("Main-Render-Thread");
}
public boolean isMainThread() {
return Thread.currentThread() == mainThread;
}
public void runOnMainThread(Runnable task) {
if (isMainThread()) {
// 已经在主线程,直接执行
task.run();
} else {
// 提交到主线程队列
taskQueue.offer(task);
}
}
public void runOnMainThread(Runnable task, long delayMillis) {
scheduler.schedule(() -> runOnMainThread(task), delayMillis, TimeUnit.MILLISECONDS);
}
public <T> CompletableFuture<T> callOnMainThread(Supplier<T> supplier) {
CompletableFuture<T> future = new CompletableFuture<>();
runOnMainThread(() -> {
try {
future.complete(supplier.get());
} catch (Exception e) {
future.completeExceptionally(e);
}
});
return future;
}
public void processTasks() {
if (!isMainThread()) {
throw new IllegalStateException("Must be called from main thread");
}
// 每帧处理有限数量的任务,避免阻塞
int maxTasksPerFrame = 100;
int processed = 0;
while (processed < maxTasksPerFrame) {
Runnable task = taskQueue.poll();
if (task == null) break;
try {
task.run();
processed++;
} catch (Exception e) {
System.err.println("Main thread task failed: " + e.getMessage());
}
}
}
public void shutdown() {
scheduler.shutdown();
taskQueue.clear();
}
}
// 渲染引擎主循环
class RenderEngine {
private final MainThreadScheduler mainScheduler = MainThreadScheduler.getInstance();
private final VirtualThreadAwareExecutor computeExecutor;
private final VirtualThreadAwareExecutor ioExecutor;
private final VirtualThreadAwareExecutor networkExecutor;
private volatile boolean running = false;
private long frameCount = 0;
private double averageFrameTime = 0.0;
// 线程间通信的消息队列
private final ConcurrentLinkedQueue<Runnable> renderQueue = new ConcurrentLinkedQueue<>();
private final ConcurrentLinkedQueue<ComputeResult> computeResults = new ConcurrentLinkedQueue<>();
public RenderEngine() {
// 创建不同亲和性的执行器
this.computeExecutor = new VirtualThreadAwareExecutor(
new ThreadPoolConfig.Builder()
.affinity(ThreadAffinity.COMPUTE)
.threadNamePrefix("ComputeWorker")
.corePoolSize(Runtime.getRuntime().availableProcessors())
.priority(Thread.MAX_PRIORITY - 1)
.build()
);
this.ioExecutor = new VirtualThreadAwareExecutor(
new ThreadPoolConfig.Builder()
.affinity(ThreadAffinity.IO)
.threadNamePrefix("IOWorker")
.corePoolSize(2)
.maxPoolSize(4)
.priority(Thread.NORM_PRIORITY)
.build()
);
this.networkExecutor = new VirtualThreadAwareExecutor(
new ThreadPoolConfig.Builder()
.affinity(ThreadAffinity.NETWORK)
.threadNamePrefix("NetworkWorker")
.corePoolSize(1)
.maxPoolSize(2)
.priority(Thread.MIN_PRIORITY)
.build()
);
}
public void start() {
running = true;
mainLoop();
}
public void stop() {
running = false;
computeExecutor.shutdown();
ioExecutor.shutdown();
networkExecutor.shutdown();
mainScheduler.shutdown();
}
private void mainLoop() {
long lastFrameTime = System.nanoTime();
final double targetFrameTime = 1.0 / 60.0; // 60 FPS
while (running) {
long frameStartTime = System.nanoTime();
// 1. 处理输入(必须在主线程)
processInput();
// 2. 并行提交计算任务
submitComputeTasks();
// 3. 更新UI状态(必须在主线程)
updateUI();
// 4. 处理渲染命令队列
processRenderQueue();
// 5. 处理计算结果
processComputeResults();
// 6. 执行渲染
renderFrame();
// 7. 处理主线程调度任务
mainScheduler.processTasks();
// 8. 帧率控制
long frameEndTime = System.nanoTime();
double frameTime = (frameEndTime - frameStartTime) / 1_000_000_000.0;
// 更新平均帧时间
averageFrameTime = (averageFrameTime * 0.9) + (frameTime * 0.1);
// 休眠以维持目标帧率
if (frameTime < targetFrameTime) {
try {
long sleepMillis = (long)((targetFrameTime - frameTime) * 1000);
Thread.sleep(Math.max(1, sleepMillis));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
frameCount++;
lastFrameTime = frameEndTime;
}
}
private void processInput() {
// 处理用户输入事件
// 这必须在主线程完成
}
private void submitComputeTasks() {
// 提交并行计算任务
// 物理计算
CompletableFuture<PhysicsResult> physicsFuture =
CompletableFuture.supplyAsync(() -> {
return calculatePhysics();
}, computeExecutor);
// AI计算
CompletableFuture<AIResult> aiFuture =
CompletableFuture.supplyAsync(() -> {
return calculateAI();
}, computeExecutor);
// 资源加载(IO密集型)
CompletableFuture<Resource> resourceFuture =
CompletableFuture.supplyAsync(() -> {
return loadResourceAsync("texture.png");
}, ioExecutor);
// 组合结果
CompletableFuture.allOf(physicsFuture, aiFuture, resourceFuture)
.thenAcceptAsync(v -> {
// 计算结果处理(在主线程)
PhysicsResult physics = physicsFuture.join();
AIResult ai = aiFuture.join();
Resource resource = resourceFuture.join();
// 将结果添加到队列供主线程处理
computeResults.offer(new ComputeResult(physics, ai, resource));
}, Runnable::run); // 在完成最后一个任务的线程上执行,仅把结果入队,真正的应用在主线程进行
}
private void updateUI() {
// 更新用户界面
// 这必须在主线程完成
}
private void processRenderQueue() {
// 处理渲染命令队列
Runnable command;
while ((command = renderQueue.poll()) != null) {
command.run();
}
}
private void processComputeResults() {
// 处理计算结果
ComputeResult result;
while ((result = computeResults.poll()) != null) {
applyPhysics(result.physics);
applyAI(result.ai);
applyResource(result.resource);
}
}
private void renderFrame() {
// 执行实际渲染
// 这通常在OpenGL/DirectX/Vulkan上下文中完成
}
// 辅助类
private static class ComputeResult {
final PhysicsResult physics;
final AIResult ai;
final Resource resource;
ComputeResult(PhysicsResult physics, AIResult ai, Resource resource) {
this.physics = physics;
this.ai = ai;
this.resource = resource;
}
}
// 示例方法
private PhysicsResult calculatePhysics() {
// 模拟物理计算
return new PhysicsResult();
}
private AIResult calculateAI() {
// 模拟AI计算
return new AIResult();
}
private Resource loadResourceAsync(String path) {
// 模拟异步资源加载
try {
Thread.sleep(50); // 模拟IO延迟
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return new Resource(path);
}
private void applyPhysics(PhysicsResult physics) {
// 应用物理计算结果
}
private void applyAI(AIResult ai) {
// 应用AI计算结果
}
private void applyResource(Resource resource) {
// 应用加载的资源
}
}
// 占位的结果/资源类型(实际引擎中应为完整实现)
class PhysicsResult {}
class AIResult {}
class Resource {
final String path;
Resource(String path) { this.path = path; }
}
// 线程安全的资源管理器
class ThreadSafeResourceManager {
private final ConcurrentHashMap<String, Resource> cache =
new ConcurrentHashMap<>();
private final VirtualThreadAwareExecutor ioExecutor;
private final StripedLock<String> loadLocks = new StripedLock<>(16);
public ThreadSafeResourceManager() {
this.ioExecutor = new VirtualThreadAwareExecutor(
new ThreadPoolConfig.Builder()
.affinity(ThreadAffinity.IO)
.threadNamePrefix("ResourceIO")
.corePoolSize(2)
.maxPoolSize(8)
.build()
);
}
public CompletableFuture<Resource> getOrLoad(String path) {
// 1. 检查缓存
Resource cached = cache.get(path);
if (cached != null) {
return CompletableFuture.completedFuture(cached);
}
// 2. 异步加载
return CompletableFuture.supplyAsync(() -> {
// 使用细粒度锁避免重复加载
return loadLocks.compute(path, key -> {
// 双重检查
Resource existing = cache.get(key);
if (existing != null) {
return existing;
}
// 实际加载
Resource loaded = loadResource(key);
cache.put(key, loaded);
return loaded;
});
}, ioExecutor);
}
private Resource loadResource(String path) {
// 实际资源加载逻辑
try {
Thread.sleep(100); // 模拟IO
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return new Resource(path);
}
}
// 条纹锁实现(减少锁竞争)
class StripedLock<K> {
private final Object[] locks;
private final int mask;
public StripedLock(int concurrencyLevel) {
// 确保是2的幂
int size = Integer.highestOneBit(concurrencyLevel) << 1;
locks = new Object[size];
mask = size - 1;
for (int i = 0; i < locks.length; i++) {
locks[i] = new Object();
}
}
public <V> V compute(K key, Function<K, V> function) {
int hash = key.hashCode();
int index = hash & mask;
synchronized (locks[index]) {
return function.apply(key);
}
}
}
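For readers following along in Python, the same lock-striping idea can be sketched directly; `StripedLock` and `num_stripes` below are illustrative names for this sketch, not part of the Java class above:

```python
import threading

class StripedLock:
    """Map each key to one of a fixed set of locks via its hash.
    Keys on different stripes never contend for the same lock."""

    def __init__(self, num_stripes: int = 16):
        # Round up to a power of two so we can mask instead of using modulo
        size = 1 << (num_stripes - 1).bit_length()
        self._locks = [threading.Lock() for _ in range(size)]
        self._mask = size - 1

    def lock_for(self, key) -> threading.Lock:
        return self._locks[hash(key) & self._mask]

striped = StripedLock(16)
counts = {}

def bump(key):
    # Only threads whose keys share a stripe serialize here
    with striped.lock_for(key):
        counts[key] = counts.get(key, 0) + 1

threads = [threading.Thread(target=bump, args=(k,)) for k in ("a", "b", "a")]
for t in threads:
    t.start()
for t in threads:
    t.join()
# counts == {"a": 2, "b": 1}
```

The trade-off is the same as in the Java version: more stripes mean less contention but more memory for lock objects.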
// Usage example
public class GameEngineDemo {
public static void main(String[] args) {
RenderEngine engine = new RenderEngine();
// Start the engine
Thread engineThread = new Thread(() -> {
try {
engine.start();
} catch (Exception e) {
e.printStackTrace();
}
}, "Game-Engine-Main");
engineThread.setPriority(Thread.MAX_PRIORITY);
engineThread.start();
// Run for 5 seconds
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// Stop the engine
engine.stop();
try {
engineThread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
IV. Python Implementation Examples
1. Async/Await Engine Architecture
import asyncio
import threading
import multiprocessing
import queue
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Tuple
import heapq
import uuid
# ==================== Thread affinity enum ====================
class ThreadAffinity(Enum):
MAIN = "main" # main thread (UI/rendering)
COMPUTE = "compute" # compute-bound
IO = "io" # IO-bound
RENDER = "render" # render helpers
NETWORK = "network" # network handling
# ==================== Task priority wrapper ====================
# A smaller priority value is dequeued first (heapq is a min-heap)
class PrioritizedTask:
def __init__(self, priority: int, task: Callable, task_id: str = None):
self.priority = priority
self.timestamp = time.time()
self.task_id = task_id or str(uuid.uuid4())
self.task = task
def __lt__(self, other: "PrioritizedTask") -> bool:
# Order by (priority, timestamp) only; callables are not comparable,
# so a field-by-field @dataclass(order=True) would raise TypeError on ties
return (self.priority, self.timestamp) < (other.priority, other.timestamp)
# ==================== Thread-safe priority queue ====================
class ThreadSafePriorityQueue:
def __init__(self):
self._queue = []
self._lock = threading.Lock()
self._not_empty = threading.Condition(self._lock)
def put(self, item: PrioritizedTask):
with self._lock:
heapq.heappush(self._queue, item)
self._not_empty.notify()
def get(self, timeout: float = None) -> PrioritizedTask:
with self._not_empty:
if timeout is None:
while not self._queue:
self._not_empty.wait()
else:
end_time = time.time() + timeout
while not self._queue:
remaining = end_time - time.time()
if remaining <= 0:
raise queue.Empty
self._not_empty.wait(remaining)
return heapq.heappop(self._queue)
def qsize(self) -> int:
with self._lock:
return len(self._queue)
def empty(self) -> bool:
with self._lock:
return len(self._queue) == 0
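Before wiring the queue into worker threads, the ordering contract — smaller priority value first, ties broken by enqueue order — is easy to verify with plain `heapq` tuples (a standalone check, not engine code):

```python
import heapq
import itertools

counter = itertools.count()  # enqueue-order tie-breaker, like the timestamp field
heap = []
for priority, name in [(5, "ai"), (1, "physics"), (5, "pathfinding")]:
    # The counter keeps ties stable and keeps heapq from comparing payloads
    heapq.heappush(heap, (priority, next(counter), name))

order = [heapq.heappop(heap)[2] for _ in range(len(heap))]
# order == ["physics", "ai", "pathfinding"]
```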
# ==================== Worker thread manager ====================
class WorkerThread:
def __init__(self, name: str, affinity: ThreadAffinity):
self.name = name
self.affinity = affinity
self.thread = None
self.running = False
self.task_queue = ThreadSafePriorityQueue()
self.work_counter = 0
def start(self):
self.running = True
self.thread = threading.Thread(
target=self._worker_loop,
name=self.name,
daemon=True
)
self.thread.start()
def stop(self):
self.running = False
if self.thread:
self.thread.join(timeout=5)
def submit(self, task: Callable, priority: int = 0) -> str:
"""Submit a task and return its task id."""
task_item = PrioritizedTask(priority, task)
self.task_queue.put(task_item)
self.work_counter += 1
return task_item.task_id
def _worker_loop(self):
"""Worker thread main loop."""
print(f"Worker {self.name} started with affinity {self.affinity}")
while self.running:
try:
# Fetch a task (with a timeout to avoid busy-waiting)
task_item = self.task_queue.get(timeout=0.1)
try:
# Execute the task
task_item.task()
self.work_counter -= 1
except Exception as e:
print(f"Task execution failed in {self.name}: {e}")
except queue.Empty:
# Queue empty; loop again (a work-stealing step could be added here)
continue
except Exception as e:
print(f"Worker {self.name} error: {e}")
time.sleep(0.1)
# ==================== Thread pool manager ====================
class ThreadPoolManager:
def __init__(self):
self.workers: Dict[ThreadAffinity, List[WorkerThread]] = {}
self.thread_executors: Dict[ThreadAffinity, ThreadPoolExecutor] = {}
self.process_executor: Optional[ProcessPoolExecutor] = None
self.main_thread_id = threading.get_ident()
# Initialize the thread pools
self._init_pools()
def _init_pools(self):
"""Initialize separate pools per thread affinity."""
cpu_count = multiprocessing.cpu_count()
# Compute pool (a process pool sidesteps the GIL; submitted callables must be picklable)
self.process_executor = ProcessPoolExecutor(
max_workers=cpu_count,
mp_context=multiprocessing.get_context('spawn')
)
# IO-bound thread pool
self.thread_executors[ThreadAffinity.IO] = ThreadPoolExecutor(
max_workers=cpu_count * 2,
thread_name_prefix='IO-Worker'
)
# Network thread pool
self.thread_executors[ThreadAffinity.NETWORK] = ThreadPoolExecutor(
max_workers=4,
thread_name_prefix='Network-Worker'
)
# Render helper thread pool
self.thread_executors[ThreadAffinity.RENDER] = ThreadPoolExecutor(
max_workers=2,
thread_name_prefix='Render-Worker'
)
# Create worker threads
for affinity in [ThreadAffinity.COMPUTE, ThreadAffinity.RENDER]:
self.workers[affinity] = []
worker_count = cpu_count if affinity == ThreadAffinity.COMPUTE else 2
for i in range(worker_count):
worker = WorkerThread(
name=f"{affinity.value}-Worker-{i}",
affinity=affinity
)
worker.start()
self.workers[affinity].append(worker)
def submit(self, func: Callable, affinity: ThreadAffinity = ThreadAffinity.COMPUTE,
priority: int = 0) -> str:
"""Submit a task to the pool matching the given affinity."""
if affinity == ThreadAffinity.COMPUTE:
# Compute-bound tasks go to the process pool
future = self.process_executor.submit(func)
# Wrap the Future so it can be tracked
def wrapper():
try:
future.result(timeout=30)
except Exception as e:
print(f"Compute task failed: {e}")
# Run the wrapper on a worker thread
worker = self._select_worker(affinity)
return worker.submit(wrapper, priority)
elif affinity in self.thread_executors:
# IO/network/render tasks use thread pools
future = self.thread_executors[affinity].submit(func)
def wrapper():
try:
future.result(timeout=30)
except Exception as e:
print(f"{affinity.value} task failed: {e}")
worker = self._select_worker(affinity)
return worker.submit(wrapper, priority)
else:
raise ValueError(f"Unsupported affinity: {affinity}")
def _select_worker(self, affinity: ThreadAffinity) -> WorkerThread:
"""Pick the least-loaded worker."""
if affinity not in self.workers:
# Create a new worker on demand
worker = WorkerThread(
name=f"{affinity.value}-Worker",
affinity=affinity
)
worker.start()
self.workers[affinity] = [worker]
return worker
workers = self.workers[affinity]
# Choose the worker with the smallest work counter
return min(workers, key=lambda w: w.work_counter)
def is_main_thread(self) -> bool:
return threading.get_ident() == self.main_thread_id
def run_on_main_thread(self, func: Callable):
"""Run a function on the main thread."""
if self.is_main_thread():
func()
else:
# Enqueue onto the main thread's work queue
# (in a real engine the main thread polls this queue each frame)
pass
def shutdown(self):
"""Shut down all pools and worker threads."""
if self.process_executor:
self.process_executor.shutdown(wait=True)
for executor in self.thread_executors.values():
executor.shutdown(wait=True)
for worker_list in self.workers.values():
for worker in worker_list:
worker.stop()
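`run_on_main_thread` above leaves the cross-thread hand-off as a stub. One common pattern is a queue that worker threads push onto and that the main loop drains once per frame; a minimal standalone sketch, with illustrative names:

```python
import queue
import threading

main_queue = queue.Queue()
results = []

def run_on_main_thread(func):
    # Callable from any thread: enqueue instead of executing directly
    main_queue.put(func)

def drain_main_queue():
    # Called by the main loop once per frame
    while True:
        try:
            main_queue.get_nowait()()
        except queue.Empty:
            break

t = threading.Thread(target=lambda: run_on_main_thread(lambda: results.append("ui-update")))
t.start()
t.join()
drain_main_queue()  # simulate one tick of the main loop
# results == ["ui-update"]
```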
# ==================== Async render engine ====================
class AsyncRenderEngine:
def __init__(self):
self.thread_pool = ThreadPoolManager()
self.running = False
self.render_loop_task = None
self.last_frame_time = 0
self.target_fps = 60
self.frame_time = 1.0 / self.target_fps
# Render state
self.scene_objects = []
self.pending_updates = queue.Queue()
self.render_commands = queue.Queue()
async def start(self):
"""Start the engine."""
self.running = True
self.last_frame_time = time.time()
# Start the async render loop
self.render_loop_task = asyncio.create_task(self._render_loop())
# Start the task-processing loop
asyncio.create_task(self._process_tasks())
async def stop(self):
"""Stop the engine."""
self.running = False
if self.render_loop_task:
await self.render_loop_task
self.thread_pool.shutdown()
async def _render_loop(self):
"""Main render loop."""
print("Render loop started")
while self.running:
frame_start = time.time()
# 1. Process input (must stay on the main thread)
await self._process_input()
# 2. Submit compute tasks in parallel
await self._submit_compute_tasks()
# 3. Update the UI
await self._update_ui()
# 4. Process render commands
await self._process_render_commands()
# 5. Execute rendering
await self._execute_render()
# 6. Frame-rate control
frame_time = time.time() - frame_start
sleep_time = max(0, self.frame_time - frame_time)
if sleep_time > 0:
await asyncio.sleep(sleep_time)
# Update frame statistics
self._update_frame_stats(frame_time)
async def _process_input(self):
"""Process user input (simulated)."""
# A real implementation would handle mouse and keyboard events
await asyncio.sleep(0.001) # simulate processing time
async def _submit_compute_tasks(self):
"""Submit compute tasks in parallel."""
# Create a group of async tasks
physics_task = asyncio.create_task(self._run_physics())
ai_task = asyncio.create_task(self._run_ai())
pathfinding_task = asyncio.create_task(self._run_pathfinding())
# Run them concurrently
await asyncio.gather(physics_task, ai_task, pathfinding_task)
# Hand the results back to the main loop
physics_result = physics_task.result()
ai_result = ai_task.result()
pathfinding_result = pathfinding_task.result()
# Enqueue the results for application
self.pending_updates.put(('physics', physics_result))
self.pending_updates.put(('ai', ai_result))
self.pending_updates.put(('pathfinding', pathfinding_result))
async def _run_physics(self):
"""Run physics in an executor so the event loop is not blocked."""
loop = asyncio.get_running_loop()
# Offload the CPU-bound work (with a process pool the callable must be picklable)
result = await loop.run_in_executor(
self.thread_pool.process_executor,
self._calculate_physics
)
return result
async def _run_ai(self):
"""Run AI computation."""
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(
self.thread_pool.process_executor,
self._calculate_ai
)
return result
async def _run_pathfinding(self):
"""Run pathfinding."""
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(
self.thread_pool.process_executor,
self._calculate_pathfinding
)
return result
@staticmethod
def _calculate_physics():
"""Simulate a CPU-bound physics step (static so it pickles for the process pool; a bound method would drag the unpicklable engine instance along)."""
time.sleep(0.005) # simulate computation time
return {"velocity": 1.0, "position": [0, 0, 0]}
@staticmethod
def _calculate_ai():
"""Simulate a CPU-bound AI step."""
time.sleep(0.003)
return {"state": "idle", "target": None}
@staticmethod
def _calculate_pathfinding():
"""Simulate a CPU-bound pathfinding step."""
time.sleep(0.002)
return {"path": [], "cost": 0}
async def _update_ui(self):
"""Update UI state."""
# Handle UI updates
await asyncio.sleep(0.001)
async def _process_render_commands(self):
"""Drain and execute the render command queue."""
# Pull all queued commands, then run them
commands = []
while not self.render_commands.empty():
try:
command = self.render_commands.get_nowait()
commands.append(command)
except queue.Empty:
break
for command in commands:
await command()
async def _execute_render(self):
"""Perform the actual rendering."""
# Simulate render time
await asyncio.sleep(0.008)
async def _process_tasks(self):
"""Process the pending-update queue."""
while self.running:
try:
# Apply pending updates
while not self.pending_updates.empty():
update_type, data = self.pending_updates.get_nowait()
self._apply_update(update_type, data)
# Sleep briefly to avoid busy-waiting
await asyncio.sleep(0.001)
except Exception as e:
print(f"Task processing error: {e}")
def _apply_update(self, update_type: str, data: Any):
"""Apply an update to the scene."""
if update_type == 'physics':
# Apply the physics update
pass
elif update_type == 'ai':
# Apply the AI update
pass
elif update_type == 'pathfinding':
# Apply the pathfinding update
pass
def _update_frame_stats(self, frame_time: float):
"""Update frame statistics."""
fps = 1.0 / frame_time if frame_time > 0 else 0
# Could record FPS, frame time, and other stats here
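`_update_frame_stats` is left empty above. A common choice is to smooth the frame time with an exponential moving average instead of reporting noisy per-frame FPS; a sketch, with an assumed smoothing factor of 0.1:

```python
class FrameStats:
    def __init__(self, alpha: float = 0.1):
        self.alpha = alpha            # smoothing factor (assumed value)
        self.avg_frame_time = None    # seconds

    def update(self, frame_time: float):
        if self.avg_frame_time is None:
            self.avg_frame_time = frame_time
        else:
            # New sample contributes alpha; history keeps (1 - alpha)
            self.avg_frame_time = (1 - self.alpha) * self.avg_frame_time + self.alpha * frame_time

    @property
    def fps(self) -> float:
        return 1.0 / self.avg_frame_time if self.avg_frame_time else 0.0

stats = FrameStats()
for ft in [0.016, 0.016, 0.033]:  # two smooth frames, then one spike
    stats.update(ft)
# The spike only nudges the average: 0.016 -> 0.0177 seconds
```

Smoothing matters for the frame-rate controller in `_render_loop`: reacting to a single slow frame would make the sleep time oscillate.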
# ==================== Async resource loader ====================
class AsyncResourceLoader:
def __init__(self, thread_pool: ThreadPoolManager):
self.thread_pool = thread_pool
self.cache = {}
self.loading_tasks = {}
async def load(self, resource_path: str) -> Any:
"""Load a resource asynchronously."""
# Check the cache
if resource_path in self.cache:
return self.cache[resource_path]
# Check whether it is already loading
if resource_path in self.loading_tasks:
# Await the existing task
return await self.loading_tasks[resource_path]
# Create a new loading task
task = asyncio.create_task(self._load_resource(resource_path))
self.loading_tasks[resource_path] = task
try:
resource = await task
self.cache[resource_path] = resource
return resource
finally:
# Clean up the task record
del self.loading_tasks[resource_path]
async def _load_resource(self, resource_path: str) -> Any:
"""Actually load the resource (on the IO thread pool)."""
loop = asyncio.get_running_loop()
# Run the file operation on the IO pool
return await loop.run_in_executor(
self.thread_pool.thread_executors[ThreadAffinity.IO],
self._load_from_disk,
resource_path
)
def _load_from_disk(self, resource_path: str) -> Any:
"""Load from disk (simulated)."""
time.sleep(0.1) # simulate IO latency
return f"Resource data for {resource_path}"
# ==================== Usage example ====================
async def main():
"""Example main program."""
# Create the render engine
engine = AsyncRenderEngine()
# Create the resource loader
resource_loader = AsyncResourceLoader(engine.thread_pool)
# Start the engine
await engine.start()
# Simulate game logic
try:
# Run for 5 seconds
for i in range(5):
print(f"Frame {i}")
# Simulate async resource loading
resource = await resource_loader.load(f"texture_{i}.png")
print(f"Loaded: {resource}")
await asyncio.sleep(1)
except KeyboardInterrupt:
print("Shutting down...")
finally:
# Stop the engine
await engine.stop()
if __name__ == "__main__":
# Run the async main program
asyncio.run(main())
2. Multi-Process Render Pipeline
import multiprocessing as mp
from multiprocessing.managers import BaseManager
import numpy as np
from typing import List, Tuple
import struct
import time # needed below for result timestamps
# ==================== Shared memory manager ====================
class SharedMemoryManager:
"""Manage shared memory between processes to avoid data copies."""
def __init__(self):
self.buffers = {}
self.locks = {}
def create_buffer(self, name: str, size: int) -> memoryview:
"""Create a shared memory buffer."""
if name in self.buffers:
return self.buffers[name]
# Create the shared memory (memoryview objects are not picklable, so child processes must inherit these buffers, e.g. via a fork-based start method)
buffer = mp.RawArray('B', size)
self.buffers[name] = memoryview(buffer)
self.locks[name] = mp.Lock()
return self.buffers[name]
def get_buffer(self, name: str) -> memoryview:
"""Get a shared memory buffer."""
return self.buffers.get(name)
def lock_buffer(self, name: str):
"""Lock a buffer for writing."""
return self.locks[name]
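The zero-copy reads in the worker below rely on the fact that a `RawArray` and a NumPy array can wrap the same memory; in a single process this is easy to demonstrate (illustrative, assuming NumPy is available):

```python
import multiprocessing as mp
import numpy as np

# A shared buffer big enough for four float32 values (16 bytes)
raw = mp.RawArray('B', 4 * np.dtype(np.float32).itemsize)

# Writer and reader views share the same underlying memory
writer = np.frombuffer(raw, dtype=np.float32)
writer[:] = [1.0, 2.0, 3.0, 4.0]

reader = np.frombuffer(raw, dtype=np.float32)
# reader sees the written values without any copy having been made
```

Across processes the same views work as long as both sides hold the same `RawArray`, which is why the manager hands out named buffers rather than copies.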
# ==================== Render worker process ====================
class RenderWorkerProcess(mp.Process):
"""Dedicated render worker process."""
def __init__(self, worker_id: int, input_queue: mp.Queue,
output_queue: mp.Queue, shared_memory: SharedMemoryManager):
super().__init__()
self.worker_id = worker_id
self.input_queue = input_queue
self.output_queue = output_queue
self.shared_memory = shared_memory
self.running = True
def run(self):
"""Worker process main loop."""
print(f"Render worker {self.worker_id} started")
while self.running:
try:
# Fetch a render task
task = self.input_queue.get(timeout=0.1)
if task == 'STOP':
break
# Perform rendering
result = self._render_frame(task)
# Send the result
self.output_queue.put(result)
except mp.queues.Empty:
continue
except Exception as e:
print(f"Render worker {self.worker_id} error: {e}")
print(f"Render worker {self.worker_id} stopped")
def _render_frame(self, task_data) -> dict:
"""Render one frame (simulated)."""
# A real implementation would use OpenGL/Vulkan/DirectX;
# this is only a simulation
# Read vertex data from shared memory
vertex_buffer = self.shared_memory.get_buffer('vertices')
if vertex_buffer:
# Process vertex data
vertices = np.frombuffer(vertex_buffer, dtype=np.float32)
# ... rendering logic
return {
'worker_id': self.worker_id,
'frame_data': b'rendered_frame_data',
'timestamp': time.time()
}
# ==================== Multi-process render engine ====================
class MultiProcessRenderEngine:
"""Render engine that uses multiple processes to sidestep the GIL."""
def __init__(self, num_workers: int = None):
self.num_workers = num_workers or mp.cpu_count()
self.workers: List[RenderWorkerProcess] = []
self.task_queues: List[mp.Queue] = []
self.result_queue = mp.Queue()
self.shared_memory = SharedMemoryManager()
# Initialize shared memory
self._init_shared_memory()
def _init_shared_memory(self):
"""Initialize shared memory buffers."""
# Vertex buffer
vertex_size = 1024 * 1024 * 10 # 10MB
self.shared_memory.create_buffer('vertices', vertex_size)
# Texture buffer
texture_size = 1024 * 1024 * 50 # 50MB
self.shared_memory.create_buffer('textures', texture_size)
def start(self):
"""Start all worker processes."""
for i in range(self.num_workers):
task_queue = mp.Queue()
self.task_queues.append(task_queue)
worker = RenderWorkerProcess(
worker_id=i,
input_queue=task_queue,
output_queue=self.result_queue,
shared_memory=self.shared_memory
)
worker.start()
self.workers.append(worker)
def stop(self):
"""Stop all worker processes."""
for i, worker in enumerate(self.workers):
self.task_queues[i].put('STOP')
worker.join(timeout=5)
self.workers.clear()
def render_frame(self, frame_data: dict) -> List[dict]:
"""Render one frame (in parallel)."""
# Write the data into shared memory
self._update_shared_memory(frame_data)
# Dispatch tasks to the worker processes
tasks = self._split_frame_tasks(frame_data)
for i, task in enumerate(tasks):
worker_idx = i % self.num_workers
self.task_queues[worker_idx].put(task)
# Collect results
results = []
for _ in range(len(tasks)):
try:
result = self.result_queue.get(timeout=2.0)
results.append(result)
except mp.queues.Empty:
print("Timeout waiting for render result")
return results
def _update_shared_memory(self, frame_data: dict):
"""Update the data in shared memory."""
# Update vertex data
if 'vertices' in frame_data:
vertex_buffer = self.shared_memory.get_buffer('vertices')
vertices = frame_data['vertices']
# Lock the buffer for writing
with self.shared_memory.lock_buffer('vertices'):
# Copy the data into shared memory
vertex_buffer[:len(vertices)] = vertices
def _split_frame_tasks(self, frame_data: dict) -> List[dict]:
"""Split a frame render into multiple sub-tasks."""
# Split by scene complexity
num_tasks = min(self.num_workers * 2, 16)
tasks = []
for i in range(num_tasks):
task = {
'task_id': i,
'scene_section': i,
'total_sections': num_tasks,
**frame_data
}
tasks.append(task)
return tasks
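The split-and-dispatch policy in `render_frame` and `_split_frame_tasks` is round-robin over a fixed number of sections; isolated from the engine it looks like this (helper names are illustrative):

```python
def split_frame_tasks(frame_data: dict, num_workers: int, cap: int = 16):
    """Split one frame into sections, mirroring _split_frame_tasks."""
    num_tasks = min(num_workers * 2, cap)
    return [
        {"task_id": i, "scene_section": i, "total_sections": num_tasks, **frame_data}
        for i in range(num_tasks)
    ]

def assign_round_robin(num_tasks: int, num_workers: int):
    """Worker index per task, as in render_frame's dispatch loop."""
    return [i % num_workers for i in range(num_tasks)]

tasks = split_frame_tasks({"frame_number": 0}, num_workers=4)
assignment = assign_round_robin(len(tasks), 4)
# 8 tasks dealt to 4 workers twice over: [0, 1, 2, 3, 0, 1, 2, 3]
```

Creating twice as many sections as workers gives some slack: a worker that finishes a cheap section early picks up another one instead of idling.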
# ==================== Usage example ====================
def run_multiprocess_render_demo():
"""Multi-process rendering demo."""
# Create the engine
engine = MultiProcessRenderEngine(num_workers=4)
try:
# Start the engine
engine.start()
# Render a few frames
for frame in range(10):
print(f"Rendering frame {frame}")
# Prepare frame data
frame_data = {
'frame_number': frame,
'vertices': np.random.randn(1000, 3).astype(np.float32).tobytes(),
'camera_position': [0, 0, 5],
'light_position': [10, 10, 10]
}
# Render in parallel
results = engine.render_frame(frame_data)
# Merge results (per-section merging is engine-specific and omitted here)
print(f"Frame {frame} rendered by {len(results)} workers")
time.sleep(0.5)
finally:
# Stop the engine
engine.stop()
if __name__ == "__main__":
# Note: multiprocessing code must run under the __main__ guard
run_multiprocess_render_demo()
V. JavaScript/TypeScript Implementation Examples
1. Web Workers Architecture for a Browser Engine
// types.ts - type definitions
type ThreadAffinity = 'main' | 'compute' | 'io' | 'render' | 'network';
interface Task {
id: string;
priority: number;
affinity: ThreadAffinity;
execute: () => Promise<any>;
createdAt: number;
}
interface WorkerMessage {
type: 'task' | 'result' | 'error' | 'status';
taskId?: string;
payload?: any;
}
// Main-thread scheduler
class MainThreadScheduler {
private static instance: MainThreadScheduler;
private taskQueue: Array<{task: () => void, priority: number}> = [];
private animationFrameId: number | null = null;
private isProcessing = false;
private constructor() {
this.startProcessingLoop();
}
static getInstance(): MainThreadScheduler {
if (!MainThreadScheduler.instance) {
MainThreadScheduler.instance = new MainThreadScheduler();
}
return MainThreadScheduler.instance;
}
scheduleTask(task: () => void, priority: number = 0): void {
this.taskQueue.push({task, priority});
this.taskQueue.sort((a, b) => b.priority - a.priority); // sort descending by priority
if (!this.isProcessing) {
this.startProcessingLoop();
}
}
scheduleAnimationTask(task: () => void): void {
// Use requestAnimationFrame so it runs before the next paint
requestAnimationFrame(() => {
task();
this.processTasks();
});
}
private startProcessingLoop(): void {
if (this.animationFrameId !== null) {
return;
}
const processLoop = () => {
this.processTasks();
this.animationFrameId = requestAnimationFrame(processLoop);
};
this.animationFrameId = requestAnimationFrame(processLoop);
}
private processTasks(): void {
this.isProcessing = true;
// Cap the number of tasks per frame so rendering is not blocked
const MAX_TASKS_PER_FRAME = 50;
let processed = 0;
while (this.taskQueue.length > 0 && processed < MAX_TASKS_PER_FRAME) {
const item = this.taskQueue.shift();
if (item) {
try {
item.task();
processed++;
} catch (error) {
console.error('Main thread task error:', error);
}
}
}
if (this.taskQueue.length === 0) {
this.isProcessing = false;
if (this.animationFrameId !== null) {
cancelAnimationFrame(this.animationFrameId);
this.animationFrameId = null;
}
}
}
}
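The per-frame cap in `processTasks` (`MAX_TASKS_PER_FRAME`) is a general cooperative-scheduling trick: drain at most N tasks per tick so the frame deadline holds. The budgeting logic, reduced to a language-neutral Python sketch with a small cap for illustration:

```python
from collections import deque

MAX_TASKS_PER_FRAME = 3  # the scheduler above uses 50
task_queue = deque(f"task-{i}" for i in range(7))
frames = []

while task_queue:
    # One "frame": run a bounded batch, then yield back to rendering
    batch_size = min(MAX_TASKS_PER_FRAME, len(task_queue))
    frames.append([task_queue.popleft() for _ in range(batch_size)])

# 7 tasks at 3 per frame -> batches of 3, 3 and 1
```

Tuning the cap trades task latency against frame-time headroom; a time budget (e.g. "stop after 4 ms") is a common refinement of the same idea.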
// Web Worker manager
class WebWorkerManager {
private workers: Map<string, Worker> = new Map();
private taskCallbacks: Map<string, {
resolve: (value: any) => void;
reject: (error: any) => void;
}> = new Map();
private workerConfigs = {
compute: {
url: '/workers/compute.worker.js',
count: navigator.hardwareConcurrency || 4
},
io: {
url: '/workers/io.worker.js',
count: 2
},
network: {
url: '/workers/network.worker.js',
count: 1
}
};
async initialize(): Promise<void> {
// Initialize the worker pool
const initPromises: Promise<void>[] = [];
for (const [type, config] of Object.entries(this.workerConfigs)) {
for (let i = 0; i < config.count; i++) {
const workerId = `${type}-${i}`;
initPromises.push(this.createWorker(workerId, config.url));
}
}
await Promise.all(initPromises);
}
private async createWorker(id: string, scriptUrl: string): Promise<void> {
return new Promise((resolve, reject) => {
try {
const worker = new Worker(scriptUrl, {
name: id,
type: 'module' // enable ES module support if needed
});
worker.onmessage = (event: MessageEvent<WorkerMessage>) => {
this.handleWorkerMessage(event.data);
};
worker.onerror = (error) => {
console.error(`Worker ${id} error:`, error);
this.workers.delete(id);
};
this.workers.set(id, worker);
resolve();
} catch (error) {
reject(error);
}
});
}
private handleWorkerMessage(message: WorkerMessage): void {
if (message.type === 'result' && message.taskId) {
const callback = this.taskCallbacks.get(message.taskId);
if (callback) {
callback.resolve(message.payload);
this.taskCallbacks.delete(message.taskId);
}
} else if (message.type === 'error' && message.taskId) {
const callback = this.taskCallbacks.get(message.taskId);
if (callback) {
callback.reject(new Error(message.payload));
this.taskCallbacks.delete(message.taskId);
}
}
}
async executeTask<T>(
affinity: ThreadAffinity,
task: () => T | Promise<T>,
priority: number = 0
): Promise<T> {
if (affinity === 'main' || affinity === 'render') {
// Main/render-affinity tasks run on the main thread
return new Promise((resolve, reject) => {
MainThreadScheduler.getInstance().scheduleTask(() => {
try {
const result = task();
if (result instanceof Promise) {
result.then(resolve).catch(reject);
} else {
resolve(result);
}
} catch (error) {
reject(error);
}
}, priority);
});
}
// Pick a suitable worker
const worker = this.selectWorker(affinity);
if (!worker) {
throw new Error(`No available worker for affinity: ${affinity}`);
}
const taskId = this.generateTaskId();
return new Promise((resolve, reject) => {
this.taskCallbacks.set(taskId, { resolve, reject });
// Serialize the task and send it to the worker
// (Function.prototype.toString drops closures; only self-contained functions survive the trip)
const taskString = task.toString();
const message: WorkerMessage = {
type: 'task',
taskId,
payload: {
code: taskString,
priority
}
};
worker.postMessage(message);
});
}
private selectWorker(affinity: ThreadAffinity): Worker | null {
// Naive load balancing: pick the first worker of the matching type
for (const [id, worker] of this.workers.entries()) {
if (id.startsWith(affinity)) {
return worker;
}
}
return null;
}
private generateTaskId(): string {
return `task-${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;
}
terminate(): void {
for (const worker of this.workers.values()) {
worker.terminate();
}
this.workers.clear();
this.taskCallbacks.clear();
}
}
// Render engine main class
class WebRenderEngine {
private workerManager: WebWorkerManager;
private canvas: HTMLCanvasElement;
private gl: WebGL2RenderingContext | null = null;
private isRunning = false;
private lastFrameTime = 0;
private frameCount = 0;
// Render state
private sceneObjects: any[] = [];
private pendingUpdates: Array<() => void> = [];
constructor(canvas: HTMLCanvasElement) {
this.canvas = canvas;
this.workerManager = new WebWorkerManager();
this.initWebGL();
}
private initWebGL(): void {
const gl = this.canvas.getContext('webgl2');
if (!gl) {
throw new Error('WebGL2 not supported');
}
this.gl = gl;
// Configure WebGL
gl.enable(gl.DEPTH_TEST);
gl.enable(gl.CULL_FACE);
gl.clearColor(0.1, 0.1, 0.1, 1.0);
}
async start(): Promise<void> {
if (this.isRunning) return;
// Initialize the workers
await this.workerManager.initialize();
this.isRunning = true;
this.lastFrameTime = performance.now();
this.requestAnimationFrame();
}
stop(): void {
this.isRunning = false;
this.workerManager.terminate();
}
private requestAnimationFrame(): void {
if (!this.isRunning) return;
const frameCallback = (timestamp: number) => {
this.renderFrame(timestamp);
if (this.isRunning) {
requestAnimationFrame(frameCallback);
}
};
requestAnimationFrame(frameCallback);
}
private async renderFrame(timestamp: number): Promise<void> {
const deltaTime = (timestamp - this.lastFrameTime) / 1000;
this.lastFrameTime = timestamp;
this.frameCount++;
// 1. Process input (main thread)
this.processInput();
// 2. Submit compute tasks in parallel
await this.submitComputeTasks();
// 3. Update UI (main thread)
this.updateUI();
// 4. Apply pending updates
this.processPendingUpdates();
// 5. Execute rendering
this.executeRender();
// 6. Performance monitoring
if (this.frameCount % 60 === 0) {
this.logPerformance(deltaTime);
}
}
private processInput(): void {
// Handle user input
// (this must happen on the main thread)
}
private async submitComputeTasks(): Promise<void> {
// Run several compute tasks in parallel
const tasks = [
this.workerManager.executeTask('compute', () => this.calculatePhysics(), 10),
this.workerManager.executeTask('compute', () => this.calculateAI(), 5),
this.workerManager.executeTask('compute', () => this.calculatePathfinding(), 3)
];
try {
const results = await Promise.allSettled(tasks);
// Handle the results
results.forEach((result, index) => {
if (result.status === 'fulfilled') {
this.pendingUpdates.push(() => {
this.applyComputeResult(index, result.value);
});
}
});
} catch (error) {
console.error('Compute tasks failed:', error);
}
}
private calculatePhysics(): any {
// Simulate physics computation
const start = performance.now();
while (performance.now() - start < 5) {
// simulate 5ms of computation
}
return { velocity: 1.0, position: [0, 0, 0] };
}
private calculateAI(): any {
// Simulate AI computation
const start = performance.now();
while (performance.now() - start < 3) {
// simulate 3ms of computation
}
return { state: 'idle', target: null };
}
private calculatePathfinding(): any {
// Simulate pathfinding
const start = performance.now();
while (performance.now() - start < 2) {
// simulate 2ms of computation
}
return { path: [], cost: 0 };
}
private applyComputeResult(taskIndex: number, result: any): void {
// Apply a compute result to the scene
switch (taskIndex) {
case 0: // physics
// apply physics update
break;
case 1: // AI
// apply AI update
break;
case 2: // pathfinding
// apply pathfinding update
break;
}
}
private updateUI(): void {
// Update the user interface
// (this must happen on the main thread)
}
private processPendingUpdates(): void {
// Run all pending update callbacks
while (this.pendingUpdates.length > 0) {
const update = this.pendingUpdates.shift();
if (update) {
try {
update();
} catch (error) {
console.error('Pending update error:', error);
}
}
}
}
private executeRender(): void {
if (!this.gl) return;
const gl = this.gl;
// Clear the buffers
gl.clear(gl.COLOR_BUFFER_BIT | gl.DEPTH_BUFFER_BIT);
// Set the viewport
gl.viewport(0, 0, this.canvas.width, this.canvas.height);
// Render scene objects
this.sceneObjects.forEach(obj => {
this.renderObject(obj);
});
}
private renderObject(object: any): void {
// Actual render logic
if (!this.gl) return;
// Set shaders, bind buffers, draw, etc.
}
private logPerformance(deltaTime: number): void {
const fps = 1 / deltaTime;
console.log(`FPS: ${fps.toFixed(1)}, Frame: ${this.frameCount}`);
}
// Async resource loading
async loadResource(url: string): Promise<any> {
return this.workerManager.executeTask('io', async () => {
const response = await fetch(url);
if (!response.ok) {
throw new Error(`Failed to load resource: ${url}`);
}
// Handle by file type
if (url.endsWith('.json')) {
return response.json();
} else if (url.endsWith('.png') || url.endsWith('.jpg')) {
const blob = await response.blob();
return createImageBitmap(blob);
} else {
return response.text();
}
});
}
}
// Example Web Worker implementation (compute.worker.js)
// Note: this is a separate worker file (plain JS, so no TypeScript annotations)
self.onmessage = async function(event) {
const { type, taskId, payload } = event.data;
if (type === 'task' && taskId) {
try {
// Deserialize and execute the task
const taskCode = payload.code;
// Note: a real implementation needs a safer execution strategy than eval,
// e.g. the Function constructor with a restricted scope, or WebAssembly
const task = eval(`(${taskCode})`);
const result = await task();
// Post the result back to the main thread
self.postMessage({
type: 'result',
taskId,
payload: result
});
} catch (error) {
self.postMessage({
type: 'error',
taskId,
payload: error.message
});
}
}
};
// Usage example
async function runWebEngineDemo() {
const canvas = document.getElementById('game-canvas') as HTMLCanvasElement;
if (!canvas) {
console.error('Canvas not found');
return;
}
const engine = new WebRenderEngine(canvas);
try {
// Start the engine
await engine.start();
// Load a resource
const texture = await engine.loadResource('/assets/texture.png');
console.log('Texture loaded:', texture);
// Simulated run
setTimeout(() => {
engine.stop();
console.log('Engine stopped');
}, 10000);
} catch (error) {
console.error('Engine error:', error);
engine.stop();
}
}
// Start once the page has loaded
if (document.readyState === 'loading') {
document.addEventListener('DOMContentLoaded', runWebEngineDemo);
} else {
runWebEngineDemo();
}
2. Node.js Worker Threads Architecture
// worker-threads-manager.js
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');
const os = require('os');
const EventEmitter = require('events');
// Task priority queue
class PriorityQueue {
constructor() {
this.queue = [];
}
enqueue(task, priority) {
const item = { task, priority, timestamp: Date.now() };
this.queue.push(item);
this.queue.sort((a, b) => {
if (a.priority !== b.priority) {
return b.priority - a.priority; // descending
}
return a.timestamp - b.timestamp; // earlier timestamps first
});
}
dequeue() {
return this.queue.shift();
}
isEmpty() {
return this.queue.length === 0;
}
size() {
return this.queue.length;
}
}
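The Node `PriorityQueue` above re-sorts the whole array on each enqueue; its ordering (priority descending, then timestamp ascending) collapses into a single sort key, shown in Python for brevity:

```python
items = [
    {"task": "ai",      "priority": 5, "timestamp": 2},
    {"task": "physics", "priority": 9, "timestamp": 3},
    {"task": "audio",   "priority": 5, "timestamp": 1},
]

# Higher priority first; equal priorities fall back to submission time
items.sort(key=lambda it: (-it["priority"], it["timestamp"]))
order = [it["task"] for it in items]
# order == ["physics", "audio", "ai"]
```

For queues with many enqueues, a binary heap (O(log n) per insert) is the usual upgrade over sort-on-insert (O(n log n)).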
// Worker thread manager
class WorkerThreadsManager extends EventEmitter {
constructor(config = {}) {
super();
this.config = {
maxWorkers: os.cpus().length,
workerScript: config.workerScript || './worker.js',
taskTimeout: config.taskTimeout || 30000,
...config
};
this.workers = new Map(); // workerId -> Worker instance
this.availableWorkers = new Set(); // idle workers
this.taskQueue = new PriorityQueue();
this.pendingTasks = new Map(); // taskId -> { resolve, reject, timeout }
this.workerCounter = 0;
this.isShuttingDown = false;
// Initialize the worker pool
this.initWorkerPool();
}
initWorkerPool() {
const workerCount = Math.min(this.config.maxWorkers, os.cpus().length);
console.log(`Initializing ${workerCount} worker threads`);
for (let i = 0; i < workerCount; i++) {
this.createWorker(`worker-${i}`);
}
}
createWorker(workerId) {
const worker = new Worker(this.config.workerScript, {
workerData: { workerId, config: this.config }
});
worker.on('message', (message) => {
this.handleWorkerMessage(workerId, message);
});
worker.on('error', (error) => {
console.error(`Worker ${workerId} error:`, error);
this.handleWorkerError(workerId, error);
});
worker.on('exit', (code) => {
console.log(`Worker ${workerId} exited with code ${code}`);
this.handleWorkerExit(workerId, code);
});
this.workers.set(workerId, worker);
this.availableWorkers.add(workerId);
this.emit('workerCreated', workerId);
return worker;
}
handleWorkerMessage(workerId, message) {
const { type, taskId, result, error } = message;
if (type === 'taskResult') {
const pendingTask = this.pendingTasks.get(taskId);
if (pendingTask) {
clearTimeout(pendingTask.timeout);
if (error) {
pendingTask.reject(new Error(error));
} else {
pendingTask.resolve(result);
}
this.pendingTasks.delete(taskId);
}
// Worker finished its task; mark it idle again
this.availableWorkers.add(workerId);
this.processTaskQueue();
} else if (type === 'ready') {
// Worker is ready
this.availableWorkers.add(workerId);
this.processTaskQueue();
} else if (type === 'status') {
this.emit('workerStatus', { workerId, ...message });
}
}
handleWorkerError(workerId, error) {
// Restart the worker
this.restartWorker(workerId);
}
handleWorkerExit(workerId, code) {
this.workers.delete(workerId);
this.availableWorkers.delete(workerId);
if (!this.isShuttingDown && code !== 0) {
// Abnormal exit; restart the worker
setTimeout(() => {
this.createWorker(workerId);
}, 1000);
}
}
restartWorker(workerId) {
const oldWorker = this.workers.get(workerId);
if (oldWorker) {
oldWorker.terminate();
}
this.workers.delete(workerId);
this.availableWorkers.delete(workerId);
setTimeout(() => {
this.createWorker(workerId);
}, 100);
}
submitTask(taskFn, taskData, priority = 0) {
return new Promise((resolve, reject) => {
const taskId = `task-${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;
// Set a timeout
const timeout = setTimeout(() => {
const pendingTask = this.pendingTasks.get(taskId);
if (pendingTask) {
pendingTask.reject(new Error(`Task ${taskId} timeout after ${this.config.taskTimeout}ms`));
this.pendingTasks.delete(taskId);
}
}, this.config.taskTimeout);
this.pendingTasks.set(taskId, { resolve, reject, timeout });
// Enqueue the task
this.taskQueue.enqueue({ taskId, taskFn: taskFn.toString(), taskData }, priority);
// Try to dispatch immediately
this.processTaskQueue();
});
}
processTaskQueue() {
if (this.taskQueue.isEmpty() || this.availableWorkers.size === 0) {
return;
}
// Grab an idle worker
const workerId = Array.from(this.availableWorkers)[0];
const taskItem = this.taskQueue.dequeue();
if (!workerId || !taskItem) {
return;
}
this.availableWorkers.delete(workerId);
const worker = this.workers.get(workerId);
if (!worker) return;
worker.postMessage({
type: 'executeTask',
taskId: taskItem.taskId,
taskFn: taskItem.taskFn,
taskData: taskItem.taskData
});
}
async shutdown() {
this.isShuttingDown = true;
console.log('Shutting down worker thread manager...');
// Drop queued tasks so no new work is dispatched
this.taskQueue.queue = [];
// Wait (up to 10s) for in-flight tasks to settle
const pendingTaskIds = Array.from(this.pendingTasks.keys());
if (pendingTaskIds.length > 0) {
console.log(`Waiting for ${pendingTaskIds.length} pending tasks to complete...`);
const allSettled = Promise.allSettled(
pendingTaskIds.map(taskId =>
new Promise(resolve => {
const pendingTask = this.pendingTasks.get(taskId);
if (pendingTask) {
// Redirect both callbacks so any outcome counts as settled
pendingTask.resolve = pendingTask.reject = resolve;
} else {
resolve();
}
})
)
);
// Promises have no built-in timeout; race against a 10s timer instead
await Promise.race([allSettled, new Promise(resolve => setTimeout(resolve, 10000))]);
}
// Terminate all workers
const terminationPromises = Array.from(this.workers.values()).map(worker => {
return worker.terminate();
});
await Promise.allSettled(terminationPromises);
this.workers.clear();
this.availableWorkers.clear();
this.pendingTasks.clear();
console.log('Worker thread manager shutdown complete');
}
getStats() {
return {
totalWorkers: this.workers.size,
availableWorkers: this.availableWorkers.size,
pendingTasks: this.pendingTasks.size,
queuedTasks: this.taskQueue.size()
};
}
}
// Worker thread script template
const workerScriptTemplate = `
const { parentPort, workerData } = require('worker_threads');
const workerId = workerData.workerId;
const config = workerData.config;
console.log(\`Worker \${workerId} started\`);
// Execution environment exposed to tasks
const taskContext = {
workerId,
require,
console,
setTimeout,
setInterval,
clearTimeout,
clearInterval,
Buffer,
// Add any other globals the tasks may need
};
// Execute a task with basic error containment
function executeTaskSafely(taskFnCode, taskData) {
try {
// Rebuild the function from its source text via the Function constructor.
// Note: a real production sandbox needs far stricter isolation than this.
const taskFn = new Function('context', 'data', \`
with(context) {
return (\${taskFnCode})(data);
}
\`);
return taskFn(taskContext, taskData);
} catch (error) {
throw new Error(\`Task execution failed: \${error.message}\`);
}
}
// Handle messages from the main thread
parentPort.on('message', async (message) => {
const { type, taskId, taskFn, taskData } = message;
if (type === 'executeTask') {
try {
const startTime = Date.now();
// Run the task
const result = await executeTaskSafely(taskFn, taskData);
const endTime = Date.now();
const executionTime = endTime - startTime;
// Post the result back to the main thread
parentPort.postMessage({
type: 'taskResult',
taskId,
result,
executionTime
});
} catch (error) {
parentPort.postMessage({
type: 'taskResult',
taskId,
error: error.message
});
}
}
});
// Tell the main thread this worker is ready
parentPort.postMessage({ type: 'ready', workerId });
`;
// A render engine built on the worker thread pool
class NodeRenderEngine {
constructor() {
this.workerManager = new WorkerThreadsManager({
maxWorkers: os.cpus().length,
workerScript: workerScriptTemplate, // in practice this should be a file path
taskTimeout: 60000
});
this.frameCounter = 0;
this.isRunning = false;
this.lastFrameTime = 0;
// Frame statistics
this.frameStats = {
totalFrames: 0,
averageFrameTime: 0,
minFrameTime: Infinity,
maxFrameTime: 0
};
}
async start() {
if (this.isRunning) return;
this.isRunning = true;
this.lastFrameTime = Date.now();
this.frameCounter = 0;
console.log('Render engine starting...');
// Enter the main loop
this.mainLoop();
}
stop() {
this.isRunning = false;
console.log('Render engine stopping...');
}
async mainLoop() {
while (this.isRunning) {
const frameStart = Date.now();
this.frameCounter++;
try {
// 1. Run computation tasks in parallel on the worker pool
await this.executeParallelTasks();
// 2. Update state
this.updateGameState();
// 3. Render the frame
await this.renderFrame();
// 4. Process events
this.processEvents();
} catch (error) {
console.error('Frame execution error:', error);
}
// Frame pacing
const frameTime = Date.now() - frameStart;
this.updateFrameStats(frameTime);
const targetFrameTime = 1000 / 60; // 60 FPS
const sleepTime = Math.max(0, targetFrameTime - frameTime);
if (sleepTime > 0) {
await this.sleep(sleepTime);
}
// Print statistics periodically
if (this.frameCounter % 60 === 0) {
this.printStats();
}
}
}
async executeParallelTasks() {
// Define this frame's parallel tasks
const tasks = [
{
fn: (data) => {
// Physics update
const start = Date.now();
while (Date.now() - start < 10) {
// busy-wait to simulate ~10ms of computation
}
return { physics: 'updated' };
},
data: { deltaTime: 0.016 },
priority: 10
},
{
fn: (data) => {
// AI update
const start = Date.now();
while (Date.now() - start < 5) {
// busy-wait to simulate ~5ms of computation
}
return { ai: 'processed' };
},
data: { entities: [] },
priority: 5
},
{
fn: (data) => {
// Pathfinding
const start = Date.now();
while (Date.now() - start < 3) {
// busy-wait to simulate ~3ms of computation
}
return { pathfinding: 'complete' };
},
data: { start: [0, 0], end: [10, 10] },
priority: 3
}
];
// Submit all tasks to the pool in parallel
const taskPromises = tasks.map(task =>
this.workerManager.submitTask(task.fn, task.data, task.priority)
);
// Wait for every task to settle
const results = await Promise.allSettled(taskPromises);
// Apply the results
results.forEach((result, index) => {
if (result.status === 'fulfilled') {
this.applyTaskResult(index, result.value);
} else {
console.error(`Task ${index} failed:`, result.reason);
}
});
}
applyTaskResult(taskIndex, result) {
// Merge a task result back into the game state
switch (taskIndex) {
case 0:
// Apply the physics update
break;
case 1:
// Apply the AI update
break;
case 2:
// Apply the pathfinding result
break;
}
}
updateGameState() {
// Update game state (on the main thread)
}
async renderFrame() {
// Perform rendering (software rendering, or a call out to a render service)
// Rendering itself could be further parallelized across workers
}
processEvents() {
// Process game events
}
updateFrameStats(frameTime) {
this.frameStats.totalFrames++;
this.frameStats.averageFrameTime =
(this.frameStats.averageFrameTime * 0.9) + (frameTime * 0.1);
this.frameStats.minFrameTime = Math.min(this.frameStats.minFrameTime, frameTime);
this.frameStats.maxFrameTime = Math.max(this.frameStats.maxFrameTime, frameTime);
}
printStats() {
const stats = this.workerManager.getStats();
console.log(`
Frame ${this.frameCounter} Stats:
FPS: ${(1000 / this.frameStats.averageFrameTime).toFixed(1)}
Avg Frame Time: ${this.frameStats.averageFrameTime.toFixed(2)}ms
Min Frame Time: ${this.frameStats.minFrameTime.toFixed(2)}ms
Max Frame Time: ${this.frameStats.maxFrameTime.toFixed(2)}ms
Workers: ${stats.totalWorkers} total, ${stats.availableWorkers} available
Tasks: ${stats.pendingTasks} pending, ${stats.queuedTasks} queued
`);
}
sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
async shutdown() {
this.stop();
await this.workerManager.shutdown();
}
}
// Usage example
async function runNodeEngineDemo() {
const engine = new NodeRenderEngine();
try {
console.log('Starting Node.js render engine demo...');
// Start the engine
await engine.start();
// Let it run for 10 seconds
await new Promise(resolve => setTimeout(resolve, 10000));
// Shut the engine down
await engine.shutdown();
console.log('Demo completed');
} catch (error) {
console.error('Demo failed:', error);
await engine.shutdown();
}
}
// Run the demo only when executed on the main thread
if (isMainThread) {
runNodeEngineDemo().catch(console.error);
}
VI. Summary
Key design points:
1. Main thread responsibilities:
- UI rendering and updates
- User input handling
- Coordinating the work of the other threads
- Must stay responsive (60 fps)
2. Worker thread partitioning:
- Compute-bound: physics, AI, pathfinding
- IO-bound: file and network operations
- Dedicated threads: audio, networking, render helpers
3. Strategies for reducing lock contention:
- Lock-free data structures: ring buffers, atomic operations
- Thread-local storage: avoid sharing data
- Work stealing: balance load, reduce idle time
- Message passing: in place of shared memory
- Read-write locks: for read-heavy, write-light workloads
- Striped locks: shrink lock granularity
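The lock-free point can be made concrete. Below is a deliberately simplified single-producer/single-consumer ring buffer over a `SharedArrayBuffer`, using `Atomics` for the index handoff; the `SpscRingBuffer` class is an illustrative sketch, not part of the engine code above, and a production version would need careful memory-ordering review and counter wrap-around handling:

```javascript
// Illustrative SPSC ring buffer over shared memory.
class SpscRingBuffer {
  constructor(capacity) {
    // Slot 0: read index (head), slot 1: write index (tail), slots 2+: data.
    this.buf = new Int32Array(new SharedArrayBuffer((capacity + 2) * 4));
    this.capacity = capacity;
  }
  push(value) {
    const head = Atomics.load(this.buf, 0);
    const tail = Atomics.load(this.buf, 1);
    if (tail - head >= this.capacity) return false; // buffer full
    this.buf[2 + (tail % this.capacity)] = value;   // write the slot first
    Atomics.store(this.buf, 1, tail + 1);           // then publish the new tail
    return true;
  }
  pop() {
    const head = Atomics.load(this.buf, 0);
    const tail = Atomics.load(this.buf, 1);
    if (head === tail) return undefined;            // buffer empty
    const value = this.buf[2 + (head % this.capacity)];
    Atomics.store(this.buf, 0, head + 1);           // release the slot
    return value;
  }
}

// Single-threaded smoke test of the FIFO behaviour.
const rb = new SpscRingBuffer(4);
rb.push(7);
rb.push(8);
console.log(rb.pop(), rb.pop()); // 7 8
```

With exactly one producer and one consumer, only the producer writes the tail and only the consumer writes the head, so no lock is needed; the `Atomics` load/store pair provides the ordering guarantee.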
4. Language-specific optimizations:
- C++: manual memory management, lock-free programming, CPU affinity pinning
- Java: virtual threads (Project Loom), concurrent collection classes
- Python: async/await, multiprocessing to bypass the GIL
- JavaScript: Web Workers, event-loop optimization
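For the JavaScript event-loop point, a common pattern is slicing a long computation into chunks so the main thread can keep servicing timers, IO, and input between slices. A hedged sketch, assuming Node.js (`setImmediate`); `sumChunked` is an illustrative name:

```javascript
// Illustrative: cooperative chunking keeps the event loop responsive
// during a long computation.
function sumChunked(n, chunk = 1e6) {
  return new Promise(resolve => {
    let total = 0;
    let i = 0;
    const step = () => {
      const end = Math.min(i + chunk, n);
      for (; i < end; i++) total += i; // one slice of work
      if (i < n) setImmediate(step);   // yield so timers/IO can run
      else resolve(total);
    };
    step();
  });
}

sumChunked(10).then(total => console.log(total)); // 45
```

In a browser the same idea applies with `setTimeout(step, 0)` or `requestIdleCallback` in place of `setImmediate`.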
5. Performance monitoring:
- Frame-rate statistics
- Thread utilization
- Task queue length
- Lock wait times
Best-practice recommendations:
- Avoid blocking operations on the main thread
- Use an appropriately sized thread pool (1-2x the CPU core count)
- Prefer task queues over creating threads directly
- Implement a graceful shutdown mechanism
- Add comprehensive error handling and recovery
- Monitor and tune thread priorities
- Account for platform differences (Windows/Linux/macOS/mobile)
With sound process and thread management, an engine can fully exploit multi-core CPU resources while keeping the UI responsive; this is the heart of high-performance engine design.