FastGPT源码解析 Agent工作流编排后端详解
FastGPT 工作流后端实现了一个高度可扩展的节点调度引擎统一调度: 通过实现节点类型的统一分发状态管理: 精确控制节点和边的执行状态数据流控制: 支持条件分支、循环、并行等复杂逻辑实时响应: SSE 流式推送执行状态和结果错误恢复: 完善的异常处理和资源清理机制性能优化: 并发执行、内存管理、去重优化调试支持: 完整的调试模式和执行追踪这套架构为 FastGPT 提供了强大的 AI 工作流编排
·
FastGPT 工作流后端实现分析
核心架构概览
FastGPT 工作流后端采用节点调度器模式,通过统一的调度引擎执行各种类型的节点,实现复杂的 AI 工作流编排。
1. 调度引擎核心 (dispatchWorkFlow
)
主要职责
- 节点执行调度: 按依赖关系执行节点
- 边状态管理: 控制数据流向
- 变量传递: 节点间数据共享
- 流式响应: 实时推送执行状态
- 错误处理: 异常捕获和恢复
核心流程
export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowResponse> {
// 1. 初始化执行环境
let { runtimeNodes, runtimeEdges, variables, histories } = data;
// 2. 设置系统变量
variables = {
...getSystemVariable(data),
...externalProvider.externalWorkflowVariables,
...variables
};
// 3. 递归执行节点
const entryNodes = runtimeNodes.filter(item => item.isEntry);
await Promise.all(entryNodes.map(node => checkNodeCanRun(node)));
// 4. 返回执行结果
return {
flowResponses: chatResponses,
flowUsages: chatNodeUsages,
assistantResponses: chatAssistantResponse,
newVariables: removeSystemVariable(variables)
};
}
2. 节点类型映射 (callbackMap
)
节点分发器
const callbackMap: Record<FlowNodeTypeEnum, Function> = {
[FlowNodeTypeEnum.chatNode]: dispatchChatCompletion, // AI对话
[FlowNodeTypeEnum.datasetSearchNode]: dispatchDatasetSearch, // 知识库搜索
[FlowNodeTypeEnum.httpRequest468]: dispatchHttp468Request, // HTTP请求
[FlowNodeTypeEnum.ifElseNode]: dispatchIfElse, // 条件判断
[FlowNodeTypeEnum.code]: dispatchRunCode, // 代码执行
[FlowNodeTypeEnum.loop]: dispatchLoop, // 循环控制
[FlowNodeTypeEnum.tools]: dispatchRunTools, // 工具调用
// ... 30+ 种节点类型
};
节点执行模式
- Active 模式: 正常执行节点逻辑
- Skip 模式: 跳过执行,传递默认值
- Wait 模式: 等待前置条件满足
3. 节点执行机制
节点运行状态检查
async function checkNodeCanRun(node: RuntimeNodeItemType): Promise<RuntimeNodeItemType[]> {
// 1. 检查节点运行状态
const status = checkNodeRunStatus({ node, runtimeEdges });
// 2. 根据状态执行不同逻辑
if (status === 'run') {
return nodeRunWithActive(node);
}
if (status === 'skip') {
return nodeRunWithSkip(node);
}
// 3. 递归执行下游节点
const { nextStepActiveNodes } = nodeOutput(node, result);
return Promise.all(nextStepActiveNodes.map(node => checkNodeCanRun(node)));
}
节点参数注入
function getNodeRunParams(node: RuntimeNodeItemType) {
const params: Record<string, any> = {};
node.inputs.forEach(input => {
// 1. 变量替换 {{$xx.xx$}} 和 {{xx}}
let value = replaceEditorVariable({
text: input.value,
nodes: runtimeNodes,
variables
});
// 2. 引用变量解析
value = getReferenceVariableValue({
value,
nodes: runtimeNodes,
variables
});
// 3. 类型格式化
params[input.key] = valueTypeFormat(value, input.valueType);
});
return params;
}
4. 数据流管理
边状态控制
// 边的三种状态
type EdgeStatus = 'waiting' | 'active' | 'skipped';
// 更新边状态
targetEdges.forEach(edge => {
if (skipHandleId.includes(edge.sourceHandle)) {
edge.status = 'skipped'; // 跳过分支
} else {
edge.status = 'active'; // 激活分支
}
});
节点输出传递
function nodeOutput(node: RuntimeNodeItemType, result: Record<string, any>) {
// 1. 更新节点输出值
node.outputs.forEach(outputItem => {
if (result[outputItem.key] !== undefined) {
outputItem.value = result[outputItem.key];
}
});
// 2. 获取下游节点
const nextStepActiveNodes = runtimeNodes.filter(node =>
targetEdges.some(item =>
item.target === node.nodeId && item.status === 'active'
)
);
return { nextStepActiveNodes };
}
5. 特殊功能实现
流式响应处理
// SSE 流式响应配置
if (stream && res) {
res.setHeader('Content-Type', 'text/event-stream;charset=utf-8');
res.setHeader('Access-Control-Allow-Origin', '*');
res.setHeader('Cache-Control', 'no-cache, no-transform');
}
// 推送执行状态
props.workflowStreamResponse?.({
event: SseResponseEventEnum.flowNodeStatus,
data: { status: 'running', name: node.name }
});
交互节点处理
// 处理用户交互节点(表单输入、用户选择等)
const interactiveResponse = nodeRunResult.result?.[DispatchNodeResponseKeyEnum.interactive];
if (interactiveResponse) {
nodeInteractiveResponse = {
entryNodeIds: [nodeRunResult.node.nodeId],
interactiveResponse
};
// 暂停工作流,等待用户交互
return [];
}
循环控制
// 循环节点实现
[FlowNodeTypeEnum.loop]: dispatchLoop,
[FlowNodeTypeEnum.loopStart]: dispatchLoopStart,
[FlowNodeTypeEnum.loopEnd]: dispatchLoopEnd,
// 循环逻辑:通过边的连接实现循环回路
6. 错误处理机制
节点级错误处理
const dispatchRes = await (async () => {
try {
return await callbackMap[node.flowNodeType](dispatchData);
} catch (error) {
// 获取所有输出边
const targetEdges = runtimeEdges.filter(item => item.source === node.nodeId);
const skipHandleIds = targetEdges.map(item => item.sourceHandle);
// 跳过所有边并返回错误
return {
[DispatchNodeResponseKeyEnum.nodeResponse]: {
error: formatHttpError(error)
},
[DispatchNodeResponseKeyEnum.skipHandleId]: skipHandleIds
};
}
})();
工作流级错误恢复
- 深度限制: 防止无限递归 (最大20层)
- 运行次数控制: 避免死循环
- 资源清理: 连接关闭时自动清理
7. 性能优化
并发执行
// 并行执行无依赖的节点
const nextStepActiveNodesResults = (
await Promise.all(nextStepActiveNodes.map(node => checkNodeCanRun(node)))
).flat();
内存管理
// 进程让步,避免阻塞
await surrenderProcess();
// 移除系统变量,减少内存占用
newVariables: removeSystemVariable(variables, externalProvider.externalWorkflowVariables)
去重优化
// 确保节点只执行一次
nextStepActiveNodes = nextStepActiveNodes.filter(
(node, index, self) => self.findIndex(t => t.nodeId === node.nodeId) === index
);
8. 调试支持
Debug 模式
if (props.mode === 'debug') {
debugNextStepRunNodes = debugNextStepRunNodes.concat([
...nextStepActiveNodes,
...nextStepSkipNodes
]);
// Debug 模式下不继续执行,返回调试信息
return { nextStepActiveNodes: [], nextStepSkipNodes: [] };
}
执行追踪
return {
debugResponse: {
finishedNodes: runtimeNodes, // 已完成的节点
finishedEdges: runtimeEdges, // 边的状态
nextStepRunNodes: debugNextStepRunNodes // 下一步要执行的节点
}
};
总结
FastGPT 工作流后端实现了一个高度可扩展的节点调度引擎,具备以下特点:
- 统一调度: 通过
callbackMap
实现节点类型的统一分发 - 状态管理: 精确控制节点和边的执行状态
- 数据流控制: 支持条件分支、循环、并行等复杂逻辑
- 实时响应: SSE 流式推送执行状态和结果
- 错误恢复: 完善的异常处理和资源清理机制
- 性能优化: 并发执行、内存管理、去重优化
- 调试支持: 完整的调试模式和执行追踪
这套架构为 FastGPT 提供了强大的 AI 工作流编排能力,支持复杂的业务逻辑实现。
更多推荐
所有评论(0)