FastGPT 工作流后端实现分析

核心架构概览

FastGPT 工作流后端采用节点调度器模式,通过统一的调度引擎执行各种类型的节点,实现复杂的 AI 工作流编排。

1. 调度引擎核心 (dispatchWorkFlow)

主要职责

  • 节点执行调度: 按依赖关系执行节点
  • 边状态管理: 控制数据流向
  • 变量传递: 节点间数据共享
  • 流式响应: 实时推送执行状态
  • 错误处理: 异常捕获和恢复

核心流程

export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowResponse> {
  // 1. 初始化执行环境
  let { runtimeNodes, runtimeEdges, variables, histories } = data;
  
  // 2. 设置系统变量
  variables = {
    ...getSystemVariable(data),
    ...externalProvider.externalWorkflowVariables,
    ...variables
  };
  
  // 3. 递归执行节点
  const entryNodes = runtimeNodes.filter(item => item.isEntry);
  await Promise.all(entryNodes.map(node => checkNodeCanRun(node)));
  
  // 4. 返回执行结果
  return {
    flowResponses: chatResponses,
    flowUsages: chatNodeUsages,
    assistantResponses: chatAssistantResponse,
    newVariables: removeSystemVariable(variables)
  };
}

2. 节点类型映射 (callbackMap)

节点分发器

const callbackMap: Record<FlowNodeTypeEnum, Function> = {
  [FlowNodeTypeEnum.chatNode]: dispatchChatCompletion,        // AI对话
  [FlowNodeTypeEnum.datasetSearchNode]: dispatchDatasetSearch, // 知识库搜索
  [FlowNodeTypeEnum.httpRequest468]: dispatchHttp468Request,   // HTTP请求
  [FlowNodeTypeEnum.ifElseNode]: dispatchIfElse,              // 条件判断
  [FlowNodeTypeEnum.code]: dispatchRunCode,                   // 代码执行
  [FlowNodeTypeEnum.loop]: dispatchLoop,                      // 循环控制
  [FlowNodeTypeEnum.tools]: dispatchRunTools,                 // 工具调用
  // ... 30+ 种节点类型
};

节点执行模式

  • Active 模式: 正常执行节点逻辑
  • Skip 模式: 跳过执行,传递默认值
  • Wait 模式: 等待前置条件满足

3. 节点执行机制

节点运行状态检查

async function checkNodeCanRun(node: RuntimeNodeItemType): Promise<RuntimeNodeItemType[]> {
  // 1. 检查节点运行状态
  const status = checkNodeRunStatus({ node, runtimeEdges });
  
  // 2. 根据状态执行不同逻辑
  if (status === 'run') {
    return nodeRunWithActive(node);
  }
  if (status === 'skip') {
    return nodeRunWithSkip(node);
  }
  
  // 3. 递归执行下游节点
  const { nextStepActiveNodes } = nodeOutput(node, result);
  return Promise.all(nextStepActiveNodes.map(node => checkNodeCanRun(node)));
}

节点参数注入

function getNodeRunParams(node: RuntimeNodeItemType) {
  const params: Record<string, any> = {};
  
  node.inputs.forEach(input => {
    // 1. 变量替换 {{$xx.xx$}} 和 {{xx}}
    let value = replaceEditorVariable({
      text: input.value,
      nodes: runtimeNodes,
      variables
    });
    
    // 2. 引用变量解析
    value = getReferenceVariableValue({
      value,
      nodes: runtimeNodes,
      variables
    });
    
    // 3. 类型格式化
    params[input.key] = valueTypeFormat(value, input.valueType);
  });
  
  return params;
}

4. 数据流管理

边状态控制

// 边的三种状态
type EdgeStatus = 'waiting' | 'active' | 'skipped';

// 更新边状态
targetEdges.forEach(edge => {
  if (skipHandleId.includes(edge.sourceHandle)) {
    edge.status = 'skipped';  // 跳过分支
  } else {
    edge.status = 'active';   // 激活分支
  }
});

节点输出传递

function nodeOutput(node: RuntimeNodeItemType, result: Record<string, any>) {
  // 1. 更新节点输出值
  node.outputs.forEach(outputItem => {
    if (result[outputItem.key] !== undefined) {
      outputItem.value = result[outputItem.key];
    }
  });
  
  // 2. 获取下游节点
  const nextStepActiveNodes = runtimeNodes.filter(node => 
    targetEdges.some(item => 
      item.target === node.nodeId && item.status === 'active'
    )
  );
  
  return { nextStepActiveNodes };
}

5. 特殊功能实现

流式响应处理

// SSE 流式响应配置
if (stream && res) {
  res.setHeader('Content-Type', 'text/event-stream;charset=utf-8');
  res.setHeader('Access-Control-Allow-Origin', '*');
  res.setHeader('Cache-Control', 'no-cache, no-transform');
}

// 推送执行状态
props.workflowStreamResponse?.({
  event: SseResponseEventEnum.flowNodeStatus,
  data: { status: 'running', name: node.name }
});

交互节点处理

// 处理用户交互节点(表单输入、用户选择等)
const interactiveResponse = nodeRunResult.result?.[DispatchNodeResponseKeyEnum.interactive];
if (interactiveResponse) {
  nodeInteractiveResponse = {
    entryNodeIds: [nodeRunResult.node.nodeId],
    interactiveResponse
  };
  // 暂停工作流,等待用户交互
  return [];
}

循环控制

// 循环节点实现
[FlowNodeTypeEnum.loop]: dispatchLoop,
[FlowNodeTypeEnum.loopStart]: dispatchLoopStart,
[FlowNodeTypeEnum.loopEnd]: dispatchLoopEnd,

// 循环逻辑:通过边的连接实现循环回路

6. 错误处理机制

节点级错误处理

const dispatchRes = await (async () => {
  try {
    return await callbackMap[node.flowNodeType](dispatchData);
  } catch (error) {
    // 获取所有输出边
    const targetEdges = runtimeEdges.filter(item => item.source === node.nodeId);
    const skipHandleIds = targetEdges.map(item => item.sourceHandle);
    
    // 跳过所有边并返回错误
    return {
      [DispatchNodeResponseKeyEnum.nodeResponse]: {
        error: formatHttpError(error)
      },
      [DispatchNodeResponseKeyEnum.skipHandleId]: skipHandleIds
    };
  }
})();

工作流级错误恢复

  • 深度限制: 防止无限递归 (最大20层)
  • 运行次数控制: 避免死循环
  • 资源清理: 连接关闭时自动清理

7. 性能优化

并发执行

// 并行执行无依赖的节点
const nextStepActiveNodesResults = (
  await Promise.all(nextStepActiveNodes.map(node => checkNodeCanRun(node)))
).flat();

内存管理

// 进程让步,避免阻塞
await surrenderProcess();

// 移除系统变量,减少内存占用
newVariables: removeSystemVariable(variables, externalProvider.externalWorkflowVariables)

去重优化

// 确保节点只执行一次
nextStepActiveNodes = nextStepActiveNodes.filter(
  (node, index, self) => self.findIndex(t => t.nodeId === node.nodeId) === index
);

8. 调试支持

Debug 模式

if (props.mode === 'debug') {
  debugNextStepRunNodes = debugNextStepRunNodes.concat([
    ...nextStepActiveNodes,
    ...nextStepSkipNodes
  ]);
  // Debug 模式下不继续执行,返回调试信息
  return { nextStepActiveNodes: [], nextStepSkipNodes: [] };
}

执行追踪

return {
  debugResponse: {
    finishedNodes: runtimeNodes,      // 已完成的节点
    finishedEdges: runtimeEdges,      // 边的状态
    nextStepRunNodes: debugNextStepRunNodes  // 下一步要执行的节点
  }
};

总结

FastGPT 工作流后端实现了一个高度可扩展的节点调度引擎,具备以下特点:

  1. 统一调度: 通过 callbackMap 实现节点类型的统一分发
  2. 状态管理: 精确控制节点和边的执行状态
  3. 数据流控制: 支持条件分支、循环、并行等复杂逻辑
  4. 实时响应: SSE 流式推送执行状态和结果
  5. 错误恢复: 完善的异常处理和资源清理机制
  6. 性能优化: 并发执行、内存管理、去重优化
  7. 调试支持: 完整的调试模式和执行追踪

这套架构为 FastGPT 提供了强大的 AI 工作流编排能力,支持复杂的业务逻辑实现。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐