Swoole $server->on('task', function ($server, $taskId, $workerId, $data) { ... });
这行代码是 Swoole 实现 异步非阻塞、解耦耗时操作 的核心。


一、知识体系:Swoole on('task') 的完整知识体系(六大模块)

要真正掌握 onTask 回调,必须构建以下完整知识体系:


1. Swoole 的多进程模型与 Task Worker

组件 说明
Worker 进程 处理网络请求(HTTP、TCP、WebSocket)
Task Worker 进程 专门处理异步任务(如发邮件、写日志、调用第三方 API)
Task Worker 模式 通过 task_worker_num 配置数量
同步 vs 异步任务 task() 异步,taskwait() 同步等待结果

onTask 只在 Task Worker 进程 中执行


2. task 机制的核心方法

方法 说明
$server->task($data) 发送异步任务,立即返回
$server->taskwait($data) 发送同步任务,等待结果返回
$server->finish($result) onTask 中调用,返回结果
$server->finish() 可触发 onFinish 通知 Worker 任务完成

示例:

// Worker 进程中触发任务
$server->task("send_email_to_user_123");

// Task Worker 中处理
$server->on('task', function ($server, $taskId, $workerId, $data) {
    sendEmail($data);
    $server->finish("Email sent");
});

// Worker 进程接收结果
$server->on('finish', function ($server, $taskId, $data) {
    echo "Task $taskId result: $data\n";
});

3. 任务的序列化与反序列化

机制 说明
$data 必须可序列化 只能是字符串、数组、基本类型
Swoole 自动 serialize() / unserialize() 跨进程传递
不能传递资源(如文件句柄、数据库连接)
大任务可分片或存数据库,只传 ID

$data 是“任务描述”,不是“执行环境”


4. 同步任务 vs 异步任务

类型 方法 是否阻塞 适用场景
异步任务 task() 发邮件、写日志
同步任务 taskwait() 是(等待结果) 获取计算结果

⚠️ taskwait() 会阻塞 Worker,慎用在高并发场景


5. 错误处理与超时控制

问题 解决方案
taskwait() 超时 设置超时时间:$server->taskwait($data, 3.0)
任务处理失败 finish() 中返回错误信息
Task Worker 崩溃 Swoole 会自动重启
任务队列积压 监控 task_queue_size

配置:

$server->set([
    'task_worker_num' => 4,
    'task_max_request' => 1000, // 每个 Task Worker 处理 1000 次后重启,防内存泄漏
]);

6. 实际应用场景

场景 实现方式
发送邮件 task() 调用邮件服务
记录操作日志 task() 写入文件或数据库
图片处理 task() 调用 Imagick
调用第三方 API task() 避免阻塞请求
定时任务 结合 Swoole\Timer + task()
消息推送 task() 触发 WebSocket 广播

onTask 是“解耦业务逻辑”的利器


二、底层原理:on('task') 的实现机制

我们深入 Swoole 的 C 源码,解析 onTask 的底层原理。


1. Swoole 的整体架构回顾

+------------------+
|   Master 进程     |
|   +--------------+ |
|   | Reactor 线程  | | ← 监听连接
|   +--------------+ |
+---------+--------+
          |
          v
+------------------+
|   Manager 进程    | ← fork Worker 和 Task Worker
+---------+--------+
          |
          +------------------+
          |                  |
          v                  v
+------------------+   +------------------+
|   Worker 进程     |   |   Task Worker     | ← 执行 onTask
+------------------+   +------------------+
      ↑                      ↓
      |                  [onTask] 处理任务
      +-------- task() <----+
               ↓
         [onFinish] 触发

onTask 只在 Task Worker 进程 中执行


2. 任务队列(Task Queue)机制

Swoole 使用 内存队列(基于 RingBuffer 或 Shared Memory)在 Worker 和 Task Worker 之间通信。

队列结构:
  • 入队:Worker 调用 task() → 数据写入队列
  • 出队:Task Worker 循环读取队列
  • 负载均衡:多个 Task Worker 自动竞争任务

✅ 队列是“生产者-消费者”模型


3. task() 的底层流程

$server->task($data);
C 层执行流程:
  1. 检查 $data 是否可序列化
  2. serialize($data) → 二进制数据
  3. 选择一个空闲的 Task Worker(轮询或抢占)
  4. 将数据写入该 Task Worker 的管道(pipe)
  5. 返回 $taskId(任务 ID)

task() 是非阻塞的,立即返回


4. onTask 回调的触发机制

Task Worker 的事件循环:

while (true) {
    $data = read_from_pipe(); // 从管道读取任务
    if ($data) {
        $unserialized_data = unserialize($data);
        $callback = swoole_server_get_callback(SW_SERVER_CB_onTask);
        php_swoole_server_call_callback(callback, server, taskId, workerId, unserialized_data);
    }
}
参数说明:
参数 来源
$server Swoole\Server 实例
$taskId 系统分配的唯一任务 ID
$workerId Task Worker 的进程 ID(从 0 开始)
$data 用户传入的任务数据(已反序列化)

onTask 是 Task Worker 的“主循环”


5. finish()onFinish 的通信机制

$server->finish("OK");
流程:
  1. finish() 将结果序列化
  2. 通过 Task Worker → Worker 的反向管道 发送
  3. 对应的 Worker 进程收到后触发 onFinish 回调

✅ 这是一个“请求-响应”闭环


6. 内存与性能优化

优化点 说明
task_max_request 每个 Task Worker 处理 N 次后重启,防内存泄漏
连接池 onWorkerStart 中创建数据库/Redis 连接池
避免大 $data 大数据存数据库,只传 ID
监控队列长度 $server->stats()['task_queue_size']

✅ Task Worker 也是“常驻内存”的,需注意内存管理


三、典型代码示例

$server = new Swoole\Http\Server("0.0.0.0", 9501);

$server->set([
    'task_worker_num' => 4,
    'task_max_request' => 1000,
]);

$server->on('request', function ($request, $response) use ($server) {
    // 触发异步任务
    $taskId = $server->task("send_email:123");
    $response->end("Task sent, ID: $taskId");
});

$server->on('task', function ($server, $taskId, $workerId, $data) {
    echo "Task $taskId running in worker $workerId: $data\n";
    
    // 模拟耗时操作
    sleep(2);
    
    // 处理完成,返回结果
    $server->finish("Task $taskId done");
});

$server->on('finish', function ($server, $taskId, $data) {
    echo "Task $taskId finished: $data\n";
});

$server->start();

四、总结

✅ 知识体系(六大模块)

模块 核心内容
1. 多进程模型 Worker vs Task Worker
2. task 方法 task() vs taskwait()
3. 序列化机制 $data 必须可序列化
4. 同步 vs 异步 阻塞与非阻塞任务
5. 错误与超时 taskwait 超时、finish 返回结果
6. 实际应用 发邮件、日志、API 调用

✅ 底层原理(四大核心)

原理 说明
Task Worker 进程 专门处理任务,不处理网络请求
任务队列(RingBuffer) Worker 与 Task Worker 间通信
序列化传递 $data 被 serialize/unserialize
finish → onFinish 闭环 实现异步任务回调

最终结论

$server->on('task') 是 Swoole 实现 异步解耦 的核心机制。
它让 Worker 进程专注于处理高并发请求,而将耗时操作交给 Task Worker 异步执行。
掌握 onTask,你就能构建高性能、高可用的 PHP 后端服务,避免阻塞、提升响应速度。

🚀 推荐:结合 HyperfSwoft 框架,使用 @Inject@Task 注解,实现更高级的任务调度。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐