schoober-ai-sdk: 多 Agent 子任务编排设计
多Agent子任务编排系统 本文介绍了一个基于父子任务机制的多Agent协作系统,通过schoober-ai-sdk实现。系统允许主Agent作为协调者,将复杂任务分解并委派给专业子Agent处理。核心架构包括: 分层结构:主Agent负责协调,子Agent专注特定领域 异步任务机制:通过NewTaskTool创建子任务,主任务暂停等待结果 独立运行环境:每个子Agent拥有独立的配置和工具集 状
多 Agent 子任务编排
github:Schoober AI SDK GitHub 仓库
单个 Agent 的能力有上限。当任务足够复杂——比如"分析这个项目的代码质量,生成测试报告,然后部署到测试环境"——一个 Agent 很难同时精通代码分析、测试和部署。更好的做法是让多个专业 Agent 协作,每个 Agent 专注一个领域。
schoober-ai-sdk 通过父子任务机制实现这种协作:父 Agent 作为协调者,通过 new_task 工具将子任务委派给专业子 Agent,等待子任务完成后继续推理。
1. 整体架构
用户: "分析代码质量并部署"
│
▼
┌─ 主 Agent (协调者) ─────────────────────────────────┐
│ │
│ ReAct Loop: │
│ Reason: "需要先分析代码质量" │
│ Act: new_task(agentName: "codeReviewer", ...) │
│ │ │
│ │ 状态: RUNNING → WAITING_FOR_SUBTASK │
│ │ ReAct 循环暂停,等待子任务完成 │
│ │ │
│ │ ┌─ 子 Agent: codeReviewer ──────────┐ │
│ │ │ 独立的 ReAct 循环 │ │
│ │ │ 独立的工具集 │ │
│ │ │ 独立的消息历史 │ │
│ │ │ ... │ │
│ │ │ attempt_completion(result) │ │
│ │ └────────────────────────────────────┘ │
│ │ │
│ │ 状态: WAITING_FOR_SUBTASK → RUNNING │
│ │ 子任务结果写入父任务消息历史 │
│ │ │
│ Observe: "代码分析完成,发现3个问题..." │
│ Reason: "现在可以部署了" │
│ Act: new_task(agentName: "deployer", ...) │
│ ... │
│ │
└──────────────────────────────────────────────────────┘
涉及的核心模块:
| 模块 | 职责 |
|---|---|
Agent |
注册子 Agent,创建/恢复子任务 |
NewTaskTool |
系统工具,LLM 通过它创建子任务 |
SubTaskManager |
管理父子关系、状态转换、懒恢复 |
TaskExecutor |
任务的执行引擎,连接以上所有模块 |
2. 子 Agent 的注册与获取
在创建 Agent 时,可以通过 subAgents 配置注册子 Agent:
const codeReviewer = new Agent({
name: 'codeReviewer',
description: '专注于代码质量分析',
// ... LLM Provider、工具等独立配置
});
const deployer = new Agent({
name: 'deployer',
description: '负责部署到各种环境',
// ...
});
const mainAgent = new Agent({
name: 'coordinator',
description: '任务协调者',
subAgents: {
codeReviewer,
deployer,
},
});
每个子 Agent 是完整独立的 Agent 实例——有自己的 LLM Provider、工具注册表、systemPrompt。主 Agent 和子 Agent 之间没有共享状态,唯一的通信方式是子任务的输入和输出。
子 Agent 信息会被注入到主 Agent 的 systemPrompt 中,告诉 LLM 有哪些子 Agent 可用:
// Agent.buildSystemPrompt() 中
const subAgentsPrompt = generateSubAgentsPrompt(this.subAgents);
// 生成类似:
// ## Available Sub-Agents
// - codeReviewer: 专注于代码质量分析
// - deployer: 负责部署到各种环境
LLM 看到这些信息后,就知道在需要时可以通过 new_task 工具调用对应的子 Agent。
3. NewTaskTool:子任务的创建入口
NewTaskTool 是一个系统工具(与 attempt_completion 一样,自动注册,不可移除)。它的参数定义很简单:
const NewTaskParamsSchema = z.object({
agentName: z.string().describe('子Agent名称'),
taskName: z.string().describe('子任务名称'),
input: z.string().describe('子任务输入内容'),
});
但 execute 方法做的事情不简单。按照执行顺序:
async execute(params, context, isPartial): Promise<void> {
// 1. 流式参数处理(isPartial 机制同其他工具)
if (isPartial) {
await this.sendToolStatus(requestId, ToolStatus.WAIT, { ... });
return;
}
// 2. 获取父任务引用
const parentTask = this.taskExecutor;
// 3. 修改父任务状态: RUNNING → WAITING_FOR_SUBTASK
await parentTask.setStatus(TaskStatus.WAITING_FOR_SUBTASK);
// 4. 通过 TaskExecutor 创建子任务(委托给 SubTaskManager)
const subTask = await parentTask.createSubTask(params.agentName, {
id: subTaskRequestId,
name: params.taskName,
input: { message: params.input },
parentId: parentTask.id,
});
// 5. 异步启动子任务(不阻塞等待)
subTask.start({ message: params.input }).catch(error => { ... });
// 注意:execute 在这里就返回了,不等子任务完成
}
关键点在第 5 步:子任务是异步启动的。execute 方法不会 await subTask.start(),而是启动后立即返回。这意味着 NewTaskTool 的 execute 完成后,ReActEngine 的当前循环结束,进入下一轮 while 检查。此时父任务状态已经是 WAITING_FOR_SUBTASK,不再等于 RUNNING,循环自然退出。
// ReActEngine.run() 中
while (this.callbacks.getStatus() === TaskStatus.RUNNING) {
// WAITING_FOR_SUBTASK !== RUNNING,循环退出
}
父任务的 ReAct 循环暂停了,不再消耗 token,静静等待子任务完成。
4. SubTaskManager:父子关系的管理者
4.1 创建子任务
SubTaskManager.createSubTask() 负责实际的子任务创建流程:
async createSubTask(agentName: string, config: StartTaskConfig): Promise<Task> {
// 1. 通过 Agent 获取子 Agent
const subAgent = this.agent.getSubAgent(agentName);
// 2. 构建子任务上下文(复用会话信息,不复用任务状态)
const subTaskContext = this.buildSubTaskContext();
// 3. 构建子任务回调(关键:子任务完成时通知父任务)
const subTaskCallbacks = this.buildSubTaskCallbacks();
// 4. 通过子 Agent 创建任务
const subTask = await subAgent.createTask(config, subTaskContext, subTaskCallbacks);
// 5. 建立父子关系
subTask.setParentTask(this.taskExecutor);
this.subtasks.set(subTask.id, subTask);
// 6. 更新父任务的 subtaskIds(持久化需要)
await this.updateParentSubtaskIds(subTask.id, 'add');
return subTask;
}
子任务上下文的构建策略是:复用会话信息,隔离任务状态:
private buildSubTaskContext(): TaskContext {
return {
userId: parentContext.userId, // 复用
sessionId: parentContext.sessionId, // 复用
custom: parentContext.custom, // 复用(如 workspace 路径)
// taskSummary: 不传递 — 子任务有自己的总结
// tokenUsage: 不传递 — 子任务独立统计
};
}
4.2 子任务完成通知
子任务完成时,是如何通知父任务的?通过 buildSubTaskCallbacks 注册的 onTaskStateUpdate 回调:
private buildSubTaskCallbacks(): TaskCallbacks {
return {
onMessage: async (message) => {
// 转发消息到父任务的回调(前端能看到子任务的输出)
if (parentCallbacks?.onMessage) {
await parentCallbacks.onMessage(message);
}
},
onTaskStateUpdate: async (state) => {
// 子任务进入终态时,通知父任务
if (state.status.isFinal()) {
const result: SubTaskResult = {
success: state.status.equals(TaskStatus.COMPLETED),
summary: state.context?.taskSummary || '',
subtaskId: state.id,
error: state.error?.message,
};
await this.taskExecutor.subTaskDone(state.id, result);
}
},
};
}
当子任务调用 attempt_completion 完成后,LifecycleManager 更新状态为 COMPLETED,触发 onTaskStateUpdate,回调中调用 this.taskExecutor.subTaskDone()。
4.3 父任务恢复
subTaskDone 是整个编排流程中最关键的方法——它负责将暂停的父任务重新激活:
async subTaskDone(subTaskId: string, result: SubTaskResult): Promise<void> {
// 1. 校验:父任务必须处于 WAITING_FOR_SUBTASK 状态
if (!currentStatus.equals(TaskStatus.WAITING_FOR_SUBTASK)) {
return;
}
// 2. 通知 NewTaskTool 更新工具状态和结果
const newTaskTool = this.toolManager.getTool('new_task');
if (newTaskTool) {
newTaskTool.setTaskExecutor(this.taskExecutor);
await newTaskTool.onSubTaskDone(subTaskId, result);
}
// 3. 恢复父任务状态: WAITING_FOR_SUBTASK → RUNNING
await this.setStatus(TaskStatus.RUNNING);
// 4. 重启 ReAct 循环
if (!this.reactEngine.isExecuting()) {
this.executeTask().catch(error => { ... });
}
}
步骤 2 是串联工具系统和编排系统的关键——NewTaskTool.onSubTaskDone() 做了两件事:
async onSubTaskDone(subTaskId: string, result: SubTaskResult): Promise<void> {
// a. 更新 UI 状态(前端看到子任务完成了)
await this.sendToolStatus(requestId, ToolStatus.SUCCESS, {
result: { success: result.success, summary: result.summary },
});
// b. 将子任务结果写入父任务的消息历史(LLM 能看到结果了)
await this.setToolResult(requestId, JSON.stringify(result));
}
步骤 4 重启了父任务的 ReAct 循环。此时消息历史中已经包含了子任务的结果,LLM 会在下一轮推理中看到它,继续决定下一步行动。
完整的状态流转:
父任务: RUNNING
│ LLM 调用 new_task
▼
父任务: WAITING_FOR_SUBTASK ←── NewTaskTool.execute()
│
│ 子任务创建并异步启动
│ 父任务 ReAct 循环退出
│
│ ┌─── 子任务: PENDING → RUNNING ───┐
│ │ 独立 ReAct 循环 │
│ │ ... │
│ │ attempt_completion │
│ └─── 子任务: COMPLETED ───────────┘
│
│ onTaskStateUpdate 回调触发
│ subTaskDone() 被调用
▼
父任务: RUNNING ←── SubTaskManager.subTaskDone()
│
│ NewTaskTool.onSubTaskDone() 写入结果
│ ReAct 循环重启
│
│ LLM 看到子任务结果
│ 继续推理...
▼
5. 懒恢复:持久化场景下的子任务恢复
在生产环境中,服务可能在任务执行中途重启。重启后需要恢复父子任务关系。schoober-ai-sdk 采用懒恢复策略,而非启动时全量加载:
async routeToActiveSubTask(): Promise<Task | null> {
// 只在父任务处于 WAITING_FOR_SUBTASK 时才尝试找子任务
if (!state.status.equals(TaskStatus.WAITING_FOR_SUBTASK)) {
return null;
}
return (await this.getActiveSubTask()) || null;
}
private async getActiveSubTask(): Promise<Task | undefined> {
const subtaskIds = this.stateManager.getState().subtaskIds || [];
// 从后往前遍历(最新的子任务最可能是活跃的)
for (let i = subtaskIds.length - 1; i >= 0; i--) {
const subTaskId = subtaskIds[i];
// Level 1: 查内存缓存
let subTask = this.subtasks.get(subTaskId);
// Level 2: 缓存未命中,从持久化层恢复
if (!subTask) {
subTask = await this.restoreSubTask(subTaskId);
if (subTask) {
this.subtasks.set(subTaskId, subTask);
}
}
// 找到第一个非终态的子任务
if (subTask && !subTask.status.isFinal()) {
return subTask;
}
}
return undefined;
}
恢复流程:
private async restoreSubTask(subTaskId: string): Promise<Task | undefined> {
// 1. 从持久化层加载子任务状态
const subTaskState = await this.persistenceManager.loadTaskState(subTaskId);
// 2. 从状态中获取 agentName,找到对应的子 Agent
const subAgent = this.agent.getSubAgent(subTaskState.config.agentName);
// 3. 通过子 Agent 恢复任务(加载消息历史、重建 TaskExecutor)
const subTask = await subAgent.loadTask(subTaskId, subTaskContext, subTaskCallbacks);
// 4. 重新建立父子关系
subTask.setParentTask(this.taskExecutor);
return subTask;
}
懒恢复的好处是:如果父任务已经完成(不处于 WAITING_FOR_SUBTASK),子任务根本不会被恢复,节省了不必要的资源开销。
6. 操作路由:透明的父子传播
当用户对一个任务调用 start()、pause() 或 abort() 时,如果该任务正在等待子任务,操作会自动路由到活跃的子任务:
// TaskExecutor.start()
async start(input?: TaskInput): Promise<void> {
// 尝试路由到子任务
const activeSubTask = await this.subTaskManager.routeToActiveSubTask();
if (activeSubTask) {
await activeSubTask.start(input);
return;
}
// 否则走正常的生命周期流程
await this.lifecycleManager.start(taskInput);
}
// TaskExecutor.pause()
async pause(needRollback = false): Promise<void> {
const activeSubTask = await this.subTaskManager.routeToActiveSubTask();
if (activeSubTask) {
await activeSubTask.pause(needRollback);
return;
}
await this.lifecycleManager.pause(needRollback);
}
// TaskExecutor.abort()
abort(): void {
// 中止所有子任务(不仅是活跃的)
this.subTaskManager.abortAllSubTasks();
this.lifecycleManager.abort();
}
这种设计对调用者是透明的——调用者不需要知道任务是否有子任务,只需对父任务调用操作即可。abort 是个例外:它会中止所有子任务后再中止自身,因为中止是不可逆的,需要确保不留下孤立的子任务。
7. requestId 约定:串联工具状态和子任务结果
NewTaskTool 面临一个时序问题:execute 方法返回时子任务还没完成,但之后 onSubTaskDone 需要更新同一个工具调用的状态和结果。如何让前端和消息历史把这两次操作关联到同一个工具卡片?
答案是 requestId 约定:
// 从子任务 ID 构建 requestId
private getRequestIdFromSubTaskId(subTaskId: string): string {
return `sub_task_${subTaskId}`;
}
execute 阶段和 onSubTaskDone 阶段使用同一个 requestId:
execute():
requestId = sub_task_${context.requestId}
sendToolStatus(requestId, DOING, "子任务已创建,正在执行...")
↓
... 子任务异步执行 ...
↓
onSubTaskDone():
requestId = sub_task_${subTaskId} // 同一个 requestId
sendToolStatus(requestId, SUCCESS, "子任务执行成功")
setToolResult(requestId, result)
因为 requestId 相同,前端可以将这些状态更新合并到同一个工具卡片上——从"正在执行…“平滑过渡到"执行成功”。消息历史中的 tool_result 也能正确对应到之前的 tool_use。
8. 错误处理:子任务失败的传播
子任务并不总是成功的。当子任务失败时:
// buildSubTaskCallbacks 中
onTaskStateUpdate: async (state) => {
if (state.status.isFinal()) {
const result: SubTaskResult = {
success: state.status.equals(TaskStatus.COMPLETED),
// 失败时 success=false,error 字段携带错误信息
error: state.error?.message,
};
await this.taskExecutor.subTaskDone(state.id, result);
}
}
subTaskDone 不区分成功和失败——都会恢复父任务状态、写入结果、重启 ReAct 循环。失败的处理权交给了 LLM:它会在下一轮推理中看到 success: false 的结果,自行决定是重试、换一个策略、还是报告错误。
// LLM 在下一轮看到的工具结果
{
"success": false,
"summary": "",
"subtaskId": "task_xxx",
"error": "数据库连接超时"
}
如果 NewTaskTool.execute() 本身就失败了(比如子 Agent 不存在),则走不同的路径——直接恢复父任务状态为 RUNNING,并将错误写入消息历史:
catch (error) {
// 恢复父任务状态(不让父任务卡在 WAITING_FOR_SUBTASK)
await this.taskExecutor.setStatus(TaskStatus.RUNNING);
// 写入错误状态和结果
await this.sendToolStatus(requestId, ToolStatus.ERROR, { error: errorMessage });
await this.setToolResult(requestId, JSON.stringify({ success: false, error: errorMessage }));
}
9. 小结
多 Agent 编排的设计围绕一个核心思路:将子任务视为一种特殊的工具调用。
| 方面 | 普通工具调用 | 子任务调用 |
|---|---|---|
| 触发方式 | LLM 在 ReAct 循环中调用 | LLM 调用 new_task 工具 |
| 执行方式 | 同步执行,execute 返回即完成 | 异步执行,父任务暂停等待 |
| 结果回传 | execute 中直接 setToolResult | onSubTaskDone 回调中 setToolResult |
| 状态影响 | 不改变任务状态 | RUNNING → WAITING_FOR_SUBTASK → RUNNING |
| 错误处理 | ErrorTracker 追踪 | 结果传回 LLM,由 LLM 决定下一步 |
核心设计原则:
- 子 Agent 完全独立:独立的 LLM Provider、工具集、消息历史、systemPrompt,父子之间仅通过输入/输出通信
- 非阻塞等待:父任务通过状态切换(而非线程阻塞)等待子任务,不消耗额外资源
- 懒恢复:子任务实例按需从持久化层恢复,不全量加载
- 操作透明路由:start/pause/abort 自动传播到活跃子任务,调用者无需感知父子关系
- LLM 驱动决策:子任务的创建和失败处理都由 LLM 在 ReAct 循环中决定,SDK 只提供机制
更多推荐



所有评论(0)