多 Agent 子任务编排

github:Schoober AI SDK GitHub 仓库
单个 Agent 的能力有上限。当任务足够复杂——比如"分析这个项目的代码质量,生成测试报告,然后部署到测试环境"——一个 Agent 很难同时精通代码分析、测试和部署。更好的做法是让多个专业 Agent 协作,每个 Agent 专注一个领域。

schoober-ai-sdk 通过父子任务机制实现这种协作:父 Agent 作为协调者,通过 new_task 工具将子任务委派给专业子 Agent,等待子任务完成后继续推理

1. 整体架构

用户: "分析代码质量并部署"
    │
    ▼
┌─ 主 Agent (协调者) ─────────────────────────────────┐
│                                                      │
│  ReAct Loop:                                         │
│    Reason: "需要先分析代码质量"                        │
│    Act: new_task(agentName: "codeReviewer", ...)     │
│    │                                                 │
│    │  状态: RUNNING → WAITING_FOR_SUBTASK            │
│    │  ReAct 循环暂停,等待子任务完成                   │
│    │                                                 │
│    │  ┌─ 子 Agent: codeReviewer ──────────┐         │
│    │  │  独立的 ReAct 循环                  │         │
│    │  │  独立的工具集                       │         │
│    │  │  独立的消息历史                     │         │
│    │  │  ...                               │         │
│    │  │  attempt_completion(result)         │         │
│    │  └────────────────────────────────────┘         │
│    │                                                 │
│    │  状态: WAITING_FOR_SUBTASK → RUNNING             │
│    │  子任务结果写入父任务消息历史                      │
│    │                                                 │
│    Observe: "代码分析完成,发现3个问题..."              │
│    Reason: "现在可以部署了"                            │
│    Act: new_task(agentName: "deployer", ...)          │
│    ...                                               │
│                                                      │
└──────────────────────────────────────────────────────┘

涉及的核心模块:

模块 职责
Agent 注册子 Agent,创建/恢复子任务
NewTaskTool 系统工具,LLM 通过它创建子任务
SubTaskManager 管理父子关系、状态转换、懒恢复
TaskExecutor 任务的执行引擎,连接以上所有模块

2. 子 Agent 的注册与获取

在创建 Agent 时,可以通过 subAgents 配置注册子 Agent:

const codeReviewer = new Agent({
    name: 'codeReviewer',
    description: '专注于代码质量分析',
    // ... LLM Provider、工具等独立配置
});

const deployer = new Agent({
    name: 'deployer',
    description: '负责部署到各种环境',
    // ...
});

const mainAgent = new Agent({
    name: 'coordinator',
    description: '任务协调者',
    subAgents: {
        codeReviewer,
        deployer,
    },
});

每个子 Agent 是完整独立的 Agent 实例——有自己的 LLM Provider、工具注册表、systemPrompt。主 Agent 和子 Agent 之间没有共享状态,唯一的通信方式是子任务的输入和输出。

子 Agent 信息会被注入到主 Agent 的 systemPrompt 中,告诉 LLM 有哪些子 Agent 可用:

// Agent.buildSystemPrompt() 中
const subAgentsPrompt = generateSubAgentsPrompt(this.subAgents);
// 生成类似:
// ## Available Sub-Agents
// - codeReviewer: 专注于代码质量分析
// - deployer: 负责部署到各种环境

LLM 看到这些信息后,就知道在需要时可以通过 new_task 工具调用对应的子 Agent。

3. NewTaskTool:子任务的创建入口

NewTaskTool 是一个系统工具(与 attempt_completion 一样,自动注册,不可移除)。它的参数定义很简单:

const NewTaskParamsSchema = z.object({
    agentName: z.string().describe('子Agent名称'),
    taskName: z.string().describe('子任务名称'),
    input: z.string().describe('子任务输入内容'),
});

execute 方法做的事情不简单。按照执行顺序:

async execute(params, context, isPartial): Promise<void> {
    // 1. 流式参数处理(isPartial 机制同其他工具)
    if (isPartial) {
        await this.sendToolStatus(requestId, ToolStatus.WAIT, { ... });
        return;
    }

    // 2. 获取父任务引用
    const parentTask = this.taskExecutor;

    // 3. 修改父任务状态: RUNNING → WAITING_FOR_SUBTASK
    await parentTask.setStatus(TaskStatus.WAITING_FOR_SUBTASK);

    // 4. 通过 TaskExecutor 创建子任务(委托给 SubTaskManager)
    const subTask = await parentTask.createSubTask(params.agentName, {
        id: subTaskRequestId,
        name: params.taskName,
        input: { message: params.input },
        parentId: parentTask.id,
    });

    // 5. 异步启动子任务(不阻塞等待)
    subTask.start({ message: params.input }).catch(error => { ... });

    // 注意:execute 在这里就返回了,不等子任务完成
}

关键点在第 5 步:子任务是异步启动的execute 方法不会 await subTask.start(),而是启动后立即返回。这意味着 NewTaskTool 的 execute 完成后,ReActEngine 的当前循环结束,进入下一轮 while 检查。此时父任务状态已经是 WAITING_FOR_SUBTASK,不再等于 RUNNING,循环自然退出。

// ReActEngine.run() 中
while (this.callbacks.getStatus() === TaskStatus.RUNNING) {
    // WAITING_FOR_SUBTASK !== RUNNING,循环退出
}

父任务的 ReAct 循环暂停了,不再消耗 token,静静等待子任务完成。

4. SubTaskManager:父子关系的管理者

4.1 创建子任务

SubTaskManager.createSubTask() 负责实际的子任务创建流程:

async createSubTask(agentName: string, config: StartTaskConfig): Promise<Task> {
    // 1. 通过 Agent 获取子 Agent
    const subAgent = this.agent.getSubAgent(agentName);

    // 2. 构建子任务上下文(复用会话信息,不复用任务状态)
    const subTaskContext = this.buildSubTaskContext();

    // 3. 构建子任务回调(关键:子任务完成时通知父任务)
    const subTaskCallbacks = this.buildSubTaskCallbacks();

    // 4. 通过子 Agent 创建任务
    const subTask = await subAgent.createTask(config, subTaskContext, subTaskCallbacks);

    // 5. 建立父子关系
    subTask.setParentTask(this.taskExecutor);
    this.subtasks.set(subTask.id, subTask);

    // 6. 更新父任务的 subtaskIds(持久化需要)
    await this.updateParentSubtaskIds(subTask.id, 'add');

    return subTask;
}

子任务上下文的构建策略是:复用会话信息,隔离任务状态

private buildSubTaskContext(): TaskContext {
    return {
        userId: parentContext.userId,      // 复用
        sessionId: parentContext.sessionId, // 复用
        custom: parentContext.custom,       // 复用(如 workspace 路径)
        // taskSummary: 不传递 — 子任务有自己的总结
        // tokenUsage: 不传递 — 子任务独立统计
    };
}

4.2 子任务完成通知

子任务完成时,是如何通知父任务的?通过 buildSubTaskCallbacks 注册的 onTaskStateUpdate 回调:

private buildSubTaskCallbacks(): TaskCallbacks {
    return {
        onMessage: async (message) => {
            // 转发消息到父任务的回调(前端能看到子任务的输出)
            if (parentCallbacks?.onMessage) {
                await parentCallbacks.onMessage(message);
            }
        },
        onTaskStateUpdate: async (state) => {
            // 子任务进入终态时,通知父任务
            if (state.status.isFinal()) {
                const result: SubTaskResult = {
                    success: state.status.equals(TaskStatus.COMPLETED),
                    summary: state.context?.taskSummary || '',
                    subtaskId: state.id,
                    error: state.error?.message,
                };
                await this.taskExecutor.subTaskDone(state.id, result);
            }
        },
    };
}

当子任务调用 attempt_completion 完成后,LifecycleManager 更新状态为 COMPLETED,触发 onTaskStateUpdate,回调中调用 this.taskExecutor.subTaskDone()

4.3 父任务恢复

subTaskDone 是整个编排流程中最关键的方法——它负责将暂停的父任务重新激活:

async subTaskDone(subTaskId: string, result: SubTaskResult): Promise<void> {
    // 1. 校验:父任务必须处于 WAITING_FOR_SUBTASK 状态
    if (!currentStatus.equals(TaskStatus.WAITING_FOR_SUBTASK)) {
        return;
    }

    // 2. 通知 NewTaskTool 更新工具状态和结果
    const newTaskTool = this.toolManager.getTool('new_task');
    if (newTaskTool) {
        newTaskTool.setTaskExecutor(this.taskExecutor);
        await newTaskTool.onSubTaskDone(subTaskId, result);
    }

    // 3. 恢复父任务状态: WAITING_FOR_SUBTASK → RUNNING
    await this.setStatus(TaskStatus.RUNNING);

    // 4. 重启 ReAct 循环
    if (!this.reactEngine.isExecuting()) {
        this.executeTask().catch(error => { ... });
    }
}

步骤 2 是串联工具系统和编排系统的关键——NewTaskTool.onSubTaskDone() 做了两件事:

async onSubTaskDone(subTaskId: string, result: SubTaskResult): Promise<void> {
    // a. 更新 UI 状态(前端看到子任务完成了)
    await this.sendToolStatus(requestId, ToolStatus.SUCCESS, {
        result: { success: result.success, summary: result.summary },
    });

    // b. 将子任务结果写入父任务的消息历史(LLM 能看到结果了)
    await this.setToolResult(requestId, JSON.stringify(result));
}

步骤 4 重启了父任务的 ReAct 循环。此时消息历史中已经包含了子任务的结果,LLM 会在下一轮推理中看到它,继续决定下一步行动。

完整的状态流转:

父任务: RUNNING
    │  LLM 调用 new_task
    ▼
父任务: WAITING_FOR_SUBTASK ←── NewTaskTool.execute()
    │
    │  子任务创建并异步启动
    │  父任务 ReAct 循环退出
    │
    │  ┌─── 子任务: PENDING → RUNNING ───┐
    │  │    独立 ReAct 循环               │
    │  │    ...                          │
    │  │    attempt_completion            │
    │  └─── 子任务: COMPLETED ───────────┘
    │
    │  onTaskStateUpdate 回调触发
    │  subTaskDone() 被调用
    ▼
父任务: RUNNING ←── SubTaskManager.subTaskDone()
    │
    │  NewTaskTool.onSubTaskDone() 写入结果
    │  ReAct 循环重启
    │
    │  LLM 看到子任务结果
    │  继续推理...
    ▼

5. 懒恢复:持久化场景下的子任务恢复

在生产环境中,服务可能在任务执行中途重启。重启后需要恢复父子任务关系。schoober-ai-sdk 采用懒恢复策略,而非启动时全量加载:

async routeToActiveSubTask(): Promise<Task | null> {
    // 只在父任务处于 WAITING_FOR_SUBTASK 时才尝试找子任务
    if (!state.status.equals(TaskStatus.WAITING_FOR_SUBTASK)) {
        return null;
    }
    return (await this.getActiveSubTask()) || null;
}

private async getActiveSubTask(): Promise<Task | undefined> {
    const subtaskIds = this.stateManager.getState().subtaskIds || [];

    // 从后往前遍历(最新的子任务最可能是活跃的)
    for (let i = subtaskIds.length - 1; i >= 0; i--) {
        const subTaskId = subtaskIds[i];

        // Level 1: 查内存缓存
        let subTask = this.subtasks.get(subTaskId);

        // Level 2: 缓存未命中,从持久化层恢复
        if (!subTask) {
            subTask = await this.restoreSubTask(subTaskId);
            if (subTask) {
                this.subtasks.set(subTaskId, subTask);
            }
        }

        // 找到第一个非终态的子任务
        if (subTask && !subTask.status.isFinal()) {
            return subTask;
        }
    }
    return undefined;
}

恢复流程:

private async restoreSubTask(subTaskId: string): Promise<Task | undefined> {
    // 1. 从持久化层加载子任务状态
    const subTaskState = await this.persistenceManager.loadTaskState(subTaskId);

    // 2. 从状态中获取 agentName,找到对应的子 Agent
    const subAgent = this.agent.getSubAgent(subTaskState.config.agentName);

    // 3. 通过子 Agent 恢复任务(加载消息历史、重建 TaskExecutor)
    const subTask = await subAgent.loadTask(subTaskId, subTaskContext, subTaskCallbacks);

    // 4. 重新建立父子关系
    subTask.setParentTask(this.taskExecutor);

    return subTask;
}

懒恢复的好处是:如果父任务已经完成(不处于 WAITING_FOR_SUBTASK),子任务根本不会被恢复,节省了不必要的资源开销。

6. 操作路由:透明的父子传播

当用户对一个任务调用 start()pause()abort() 时,如果该任务正在等待子任务,操作会自动路由到活跃的子任务:

// TaskExecutor.start()
async start(input?: TaskInput): Promise<void> {
    // 尝试路由到子任务
    const activeSubTask = await this.subTaskManager.routeToActiveSubTask();
    if (activeSubTask) {
        await activeSubTask.start(input);
        return;
    }
    // 否则走正常的生命周期流程
    await this.lifecycleManager.start(taskInput);
}

// TaskExecutor.pause()
async pause(needRollback = false): Promise<void> {
    const activeSubTask = await this.subTaskManager.routeToActiveSubTask();
    if (activeSubTask) {
        await activeSubTask.pause(needRollback);
        return;
    }
    await this.lifecycleManager.pause(needRollback);
}

// TaskExecutor.abort()
abort(): void {
    // 中止所有子任务(不仅是活跃的)
    this.subTaskManager.abortAllSubTasks();
    this.lifecycleManager.abort();
}

这种设计对调用者是透明的——调用者不需要知道任务是否有子任务,只需对父任务调用操作即可。abort 是个例外:它会中止所有子任务后再中止自身,因为中止是不可逆的,需要确保不留下孤立的子任务。

7. requestId 约定:串联工具状态和子任务结果

NewTaskTool 面临一个时序问题:execute 方法返回时子任务还没完成,但之后 onSubTaskDone 需要更新同一个工具调用的状态和结果。如何让前端和消息历史把这两次操作关联到同一个工具卡片?

答案是 requestId 约定

// 从子任务 ID 构建 requestId
private getRequestIdFromSubTaskId(subTaskId: string): string {
    return `sub_task_${subTaskId}`;
}

execute 阶段和 onSubTaskDone 阶段使用同一个 requestId:

execute():
    requestId = sub_task_${context.requestId}
    sendToolStatus(requestId, DOING, "子任务已创建,正在执行...")
    ↓
    ... 子任务异步执行 ...
    ↓
onSubTaskDone():
    requestId = sub_task_${subTaskId}  // 同一个 requestId
    sendToolStatus(requestId, SUCCESS, "子任务执行成功")
    setToolResult(requestId, result)

因为 requestId 相同,前端可以将这些状态更新合并到同一个工具卡片上——从"正在执行…“平滑过渡到"执行成功”。消息历史中的 tool_result 也能正确对应到之前的 tool_use。

8. 错误处理:子任务失败的传播

子任务并不总是成功的。当子任务失败时:

// buildSubTaskCallbacks 中
onTaskStateUpdate: async (state) => {
    if (state.status.isFinal()) {
        const result: SubTaskResult = {
            success: state.status.equals(TaskStatus.COMPLETED),
            // 失败时 success=false,error 字段携带错误信息
            error: state.error?.message,
        };
        await this.taskExecutor.subTaskDone(state.id, result);
    }
}

subTaskDone 不区分成功和失败——都会恢复父任务状态、写入结果、重启 ReAct 循环。失败的处理权交给了 LLM:它会在下一轮推理中看到 success: false 的结果,自行决定是重试、换一个策略、还是报告错误。

// LLM 在下一轮看到的工具结果
{
    "success": false,
    "summary": "",
    "subtaskId": "task_xxx",
    "error": "数据库连接超时"
}

如果 NewTaskTool.execute() 本身就失败了(比如子 Agent 不存在),则走不同的路径——直接恢复父任务状态为 RUNNING,并将错误写入消息历史:

catch (error) {
    // 恢复父任务状态(不让父任务卡在 WAITING_FOR_SUBTASK)
    await this.taskExecutor.setStatus(TaskStatus.RUNNING);

    // 写入错误状态和结果
    await this.sendToolStatus(requestId, ToolStatus.ERROR, { error: errorMessage });
    await this.setToolResult(requestId, JSON.stringify({ success: false, error: errorMessage }));
}

9. 小结

多 Agent 编排的设计围绕一个核心思路:将子任务视为一种特殊的工具调用

方面 普通工具调用 子任务调用
触发方式 LLM 在 ReAct 循环中调用 LLM 调用 new_task 工具
执行方式 同步执行,execute 返回即完成 异步执行,父任务暂停等待
结果回传 execute 中直接 setToolResult onSubTaskDone 回调中 setToolResult
状态影响 不改变任务状态 RUNNING → WAITING_FOR_SUBTASK → RUNNING
错误处理 ErrorTracker 追踪 结果传回 LLM,由 LLM 决定下一步

核心设计原则:

  • 子 Agent 完全独立:独立的 LLM Provider、工具集、消息历史、systemPrompt,父子之间仅通过输入/输出通信
  • 非阻塞等待:父任务通过状态切换(而非线程阻塞)等待子任务,不消耗额外资源
  • 懒恢复:子任务实例按需从持久化层恢复,不全量加载
  • 操作透明路由:start/pause/abort 自动传播到活跃子任务,调用者无需感知父子关系
  • LLM 驱动决策:子任务的创建和失败处理都由 LLM 在 ReAct 循环中决定,SDK 只提供机制
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐