关于智能体实现工作流的流转自定义实现在网络上的智能体平台都有自己的实现,于是我根据自身搭建与网络学习搭配出自己的一个自定义实现方法

关键技术概念解释:

  1. 有向无环图(DAG)

    • 定义:Directed Acyclic Graph,节点间有方向性,无循环

    • 作用:确保工作流节点按正确顺序执行,避免死循环

    • 算法:拓扑排序确定执行序列

  2. json Schema

    • 作用:标准化数据结构定义

    • 优势:类型验证、自动文档生成

    • 应用:工作流节点的输入输出参数约束

  3. 事件驱动架构

    • 模式:发布-订阅模式

    • 实现:SSE单向推送 + WebSocket双向通信

    • 优势:解耦合、异步处理、高并发

  4. 状态机

    • 应用:每个节点和工作流都有状态管理

    • 状态:待执行、执行中、成功、失败、重试中

    • 转换:基于事件和条件自动转换

口述是真的难以诉说,所以我进行举例来实现这个功能

背景 例子:

假设我们装配了3个主要节点的工作流:
​
**节点1**:提示词优化智能体客户端
- 输入:用户原始文本
- 输出:优化后的提示词(string类型)
​
**节点2**:图片生成智能体客户端
- 输入:节点1优化后的提示词
- 输出:生成的图片
​
**节点3**:故事生成智能体客户端
- 输入:节点2生成的图片
- 输出:幽默的故事描述
​
**执行流程**:用户输入 → 节点1处理 → 节点2生成图片 → 节点3生成故事 → 返回图片+故事

1,节点信息存储方式

user_lang_chain_info 工作流配置存储 存储表

sql 语句:

CREATE TABLE user_lang_chain_info ( id bigint NOT NULL AUTO_INCREMENT COMMENT '非业务主键', bot_id int NULL DEFAULT NULL COMMENT '智能体ID', name varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL COMMENT 'LangChain名称', desc text CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL COMMENT '智能体描述', open text CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL COMMENT '开放配置信息, 包含节点和边', gcy text CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL COMMENT 'GCY配置信息, 包含虚拟节点和边', uid varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL COMMENT '用户ID', flow_id varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL COMMENT 'Flow ID', maas_id bigint NULL DEFAULT NULL COMMENT 'Group ID', bot_name varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL COMMENT '智能体名称', extra_inputs text CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL COMMENT '额外输入项', extra_inputs_config text CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL COMMENT '多文件参数', space_id bigint NULL DEFAULT NULL, create_time datetime NULL DEFAULT NULL COMMENT '创建时间', update_time datetime NULL DEFAULT NULL COMMENT '更新时间', PRIMARY KEY (id) USING BTREE, INDEX idx_bot_id(bot_id ASC) USING BTREE, INDEX idx_uid(uid ASC) USING BTREE, INDEX idx_flow_id(flow_id ASC) USING BTREE, INDEX idx_maas_id(maas_id ASC) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci COMMENT = '用户语言链信息表' ROW_FORMAT = Dynamic;

关键字段 :

-- 关键字段
`open` text - 存储完整的节点和边配置(JSON格式)
`extra_inputs` text - 存储额外输入参数配置
`extra_inputs_config` text - 存储多文件参数配置

open 字段的 json 结构例子:

```json
{
  "nodes": [
    {
      "id": "node_1_prompt_optimize",
      "type": "spark-llm",
      "data": {
        "label": "提示词优化",
        "nodeParam": {
          "serviceId": "spark-4.0",
          "model": "spark-4.0-ultra",
          "temperature": 0.7
        },
        "inputs": [
          {
            "id": "input_1",
            "name": "user_input",
            "schema": {"type": "string"},
            "required": true
          }
        ],
        "outputs": [
          {
            "id": "output_1",
            "name": "output_optimized_prompt",
            "schema": {"type": "string"}
          }
        ]
      }
    },
    {
      "id": "node_2_image_generate",
      "type": "image-generation",
      "data": {
        "label": "图片生成",
        "inputs": [
          {
            "name": "input_prompt",
            "schema": {"type": "string"}
          }
        ],
        "outputs": [
          {
            "name": "generated_image",
            "schema": {"type": "file", "fileType": "image"}
          }
        ]
      }
    },
    {
      "id": "node_3_story_generate",
      "type": "spark-llm",
      "data": {
        "label": "故事生成",
        "inputs": [
          {
            "name": "genenate_image",
            "schema": {"type": "file", "fileType": "image"}
          }
        ],
        "outputs": [
          {
            "name": "story",
            "schema": {"type": "string"}
          }
        ]
      }
    }
  ],
  "edges": [
    {
      "source": "node_1_prompt_optimize",
      "sourceHandle": "output_optimized_prompt",
      "target": "node_2_image_generate",
      "targetHandle": "input_prompt"
    },
    {
      "source": "node_2_image_generate",
      "sourceHandle": "generated_image",
      "target": "node_3_story_generate",
      "targetHandle": "genenate_image"
    }
  ]
}

2,数据间的流转机制

主要是通过边参数 edgessourceHandletargetHandle 来确定流转的数据内容,通过 sourcetarget来确定位置

// React Flow中的节点定义 const nodes = [ { id: "node_1_prompt_optimize", type: "customNode", data: { inputs: [ { id: "input_user_text", name: "用户输入" } ], outputs: [ { id: "output_optimized_prompt", name: "优化提示词" } // sourceHandle对应这个 ] } } ];

// 连接线定义 const edges = [ { id: "edge_1_to_2", source: "node_1_prompt_optimize", sourceHandle: "output_optimized_prompt", // 从这个输出端口引出 target: "node_2_image_generate", targetHandle: "input_prompt" // 连接到这个输入端口 } ];

实际代码中的数据流转

用户输入 → 节点1.input_user_text → 处理 → 节点1.output_optimized_prompt ↓ 节点2.input_prompt ← 节点1.output_optimized_prompt → 节点2.output_image_url ↓ 节点3.input_image ← 节点2.output_image_url → 处理 → 节点3.output_story ↓ 前端显示:图片 + 故事

也就是正常我们理解的 从sourceHandle 从哪个 端口出来的数据 ? targetHandle 装到哪个端口去装配 给智能体 ?

复杂多出入端口场景

怎么处理的呢 ?也就是像树一样,不是一个线连着一个线,一个节点的输入参数可能可以由前端多个节点的执行结果作为处理|

// 复杂节点示例 { "id": "node_complex", "data": { "inputs": [ { "id": "input_text", "name": "text" }, { "id": "input_image", "name": "image" }, { "id": "input_audio", "name": "audio" } ], "outputs": [ { "id": "output_result", "name": "result" }, { "id": "output_metadata", "name": "metadata" } ] } }

// 对应的连接可以是 {

"sourceHandle": "output_result", // 从result输出端口 "target": "node_next", "targetHandle": "input_data" // 连接到data输入端口 }

const edges = [ { id: "edge_1_to_2", "source": "node_complex", sourceHandle: "output_result", // 从这个输出端口引出

"sourceHandle": "output_metadata",

target: "node_2_image_generate", targetHandle: "next_node_prompt" // 连接到这个输入端口 }

关键代码实现

// 工作流引擎解析连接关系
public class WorkflowExecutor {
// 解析edges,建立变量映射表
private Map<String, String> buildVariableMappings(JSONArray edges) {
    Map<String, String> mappings = new HashMap<>();
    
    for (JSONObject edge : edges) {
        String sourceKey = edge.getString("source") + "." + edge.getString("sourceHandle");
        String targetKey = edge.getString("target") + "." + edge.getString("targetHandle");
        
        // 建立映射:node_1.output_optimized_prompt -> node_2.input_prompt
        mappings.put(sourceKey, targetKey);
    }
    
    return mappings;
}
​
// 执行单个节点
private Map<String, Object> executeNode(String nodeId, Map<String, Object> inputs) {
    // 获取节点配置
    JSONObject nodeConfig = getNodeConfig(nodeId);
    
    // 调用对应的AI服务
    if (nodeId.equals("node_1_prompt_optimize")) {
        return executePromptOptimization(inputs);
    } else if (nodeId.equals("node_2_image_generate")) {
        return executeImageGeneration(inputs);
    }
    
    return new HashMap<>();
}
​
// 执行提示词优化
private Map<String, Object> executePromptOptimization(Map<String, Object> inputs) {
    String userInput = (String) inputs.get("user_input"); // 从input_user_text端口接收
    
    // 调用Spark LLM优化提示词
    String optimizedPrompt = sparkLLM.optimizePrompt(userInput);
    
    // 返回结果(通过output_optimized_prompt端口输出)
    Map<String, Object> outputs = new HashMap<>();
    outputs.put("optimized_prompt", optimizedPrompt); // 变量名与端口对应
    
    return outputs;
}
​
// 执行图片生成
private Map<String, Object> executeImageGeneration(Map<String, Object> inputs) {
    String prompt = (String) inputs.get("prompt"); // 从input_prompt端口接收
    
    // 调用图片生成API
    String imageUrl = imageGenerator.generate(prompt);
    
    // 返回结果
    Map<String, Object> outputs = new HashMap<>();
    outputs.put("image_url", imageUrl);
    
    return outputs;
}
​
// 主执行流程
public void executeWorkflow(JSONObject workflowConfig) {
    JSONArray edges = workflowConfig.getJSONArray("edges");
    Map<String, String> variableMappings = buildVariableMappings(edges);
    
    // 初始化上下文
    Map<String, Object> globalContext = new HashMap<>();
    globalContext.put("user_input", "画一只可爱的猫咪");
    
    // 按拓扑顺序执行节点
    List<String> executionOrder = getExecutionOrder(edges);
    
    for (String nodeId : executionOrder) {
        // 1. 准备当前节点的输入参数
        Map<String, Object> nodeInputs = prepareNodeInputs(nodeId, globalContext, variableMappings);
        
        // 2. 执行节点
        Map<String, Object> nodeOutputs = executeNode(nodeId, nodeInputs);
        
        // 3. 将输出结果存入全局上下文(用于传递给后续节点)
        for (Map.Entry<String, Object> output : nodeOutputs.entrySet()) {
            String outputKey = nodeId + "." + output.getKey();
            globalContext.put(outputKey, output.getValue());
        }
    }
}
​
// 准备节点输入(关键的变量映射逻辑)
private Map<String, Object> prepareNodeInputs(String nodeId, Map<String, Object> context, 
                                             Map<String, String> mappings) {
    Map<String, Object> nodeInputs = new HashMap<>();
    JSONObject nodeConfig = getNodeConfig(nodeId);
    
    // 遍历节点的输入端口
    JSONArray inputs = nodeConfig.getJSONObject("data").getJSONArray("inputs");
    for (JSONObject input : inputs) {
        String inputId = input.getString("id");           // input_prompt
        String varName = input.getString("name");         // prompt
        
        // 查找是否有映射关系:node_2.input_prompt -> 来自哪个输出
        String mappingKey = nodeId + "." + inputId;
        String sourceKey = mappings.get(mappingKey);     // node_1.output_optimized_prompt
        
        if (sourceKey != null) {
            // 从全局上下文中获取前序节点的结果
            Object value = context.get(sourceKey);
            nodeInputs.put(varName, value);               // prompt = optimized_prompt的值
        }
    }
    
    return nodeInputs;
}
​

代码在上述案例中的执行过程

  1. 初始状态

    // 用户输入 globalContext.put("user_input", "画一只可爱的猫咪");

    // 映射关系 mappings.put("node_1.output_optimized_prompt", "node_2.input_prompt"); mappings.put("node_2.output_image_url", "node_3.input_image");

  2. 执行节点 1

    // 节点1输入准备 node1Inputs = prepareNodeInputs("node_1_prompt_optimize", globalContext, mappings); // node1Inputs = {"user_input": "画一只可爱的猫咪"}

    // 执行节点1 node1Outputs = executePromptOptimization(node1Inputs); // node1Outputs = {"optimized_prompt": "A cute cat sitting on a windowsill, photorealistic, 4k"}

    // 保存到全局上下文 globalContext.put("node_1.optimized_prompt", "A cute cat sitting on a windowsill, photorealistic, 4k");

  3. 执行节点2

    // 节点2输入准备 node2Inputs = prepareNodeInputs("node_2_image_generate", globalContext, mappings);

    // 查找映射:node_2.input_prompt -> node_1.output_optimized_prompt // 从globalContext获取:node_1.optimized_prompt = "A cute cat sitting on a windowsill, photorealistic, 4k" // node2Inputs = {"prompt": "A cute cat sitting on a windowsill, photorealistic, 4k"}

    // 执行节点2 node2Outputs = executeImageGeneration(node2Inputs); // node2Outputs = {"image_url": "https://cdn.example.com/generated_cat.jpg"}

    // 保存到全局上下文 globalContext.put("node_2.image_url", "https://cdn.example.com/generated_cat.jpg");

  4. 执行节点3

    // 节点3输入准备 node3Inputs = prepareNodeInputs("node_3_story_generate", globalContext, mappings);

    // 查找映射:node_3.input_image -> node_2.output_image_url // 从globalContext获取:node_2.image_url = "https://cdn.example.com/generated_cat.jpg" // node3Inputs = {"image": "https://cdn.example.com/generated_cat.jpg"}

    // 执行节点3生成故事...

3,调度图

简化代码展示

// 简化的执行调度逻辑
public class WorkflowScheduler {
    private Map<String, Node> nodes;
    private Map<String, Edge> edges;
​
    public void execute(String startNodeId, Map<String, Object> initialInputs) {
        // 1. 构建执行图
        Map<String, List<String>> graph = buildExecutionGraph();
​
        // 2. 拓扑排序确定执行顺序
        List<String> executionOrder = topologicalSort(graph);
​
        // 3. 按顺序执行节点
        Map<String, Object> context = new HashMap<>(initialInputs);
        for (String nodeId : executionOrder) {
            Node node = nodes.get(nodeId);
            Map<String, Object> nodeInputs = extractNodeInputs(node, context);
            Map<String, Object> nodeOutputs = node.execute(nodeInputs);
            context.putAll(nodeOutputs);
        }
    }
}

里面设计的知识点

- **拓扑排序**:确保节点按依赖关系顺序执行
- **状态机**:每个节点有等待、执行中、成功、失败等状态
- **错误处理**:支持重试机制和降级策略
- **并发控制**:可并行执行无依赖关系的节点

4,画图深度理解

  1. 三节点案例的 时序图

    ```mermaid 代码例

    sequenceDiagram

    participant U as 用户

    participant W as 工作流引擎

    participant N1 as 节点1 提示词优化器

    participant N2 as 节点2 图片生成器

    participant N3 as 节点3 故事生成器

    participant C as 全局上下文

    Note over U,C: 初始化阶段

    U->>W: 输入:"画一只可爱的猫咪"

    W->>C: 存储全局变量 user_input = "画一只可爱的猫咪"

    Note over W,C: 执行节点1

    W->>W: 准备节点1输入参数

    W->>C: 查询映射关系 node_1.input_user_text

    C-->>W: 返回:user_input变量

    W->>N1: 执行提示词优化 输入:user_input

    N1-->>W: 返回优化结果 optimized_prompt

    W->>C: 存储节点1输出 node_1.optimized_prompt

    Note over W,C: 执行节点2(关键变量传递)

    W->>W: 准备节点2输入参数

    W->>C: 查询映射关系 node_2.input_prompt ← node_1.output_optimized_prompt

    C-->>W: 返回:optimized_prompt值

    W->>N2: 执行图片生成 输入:prompt = optimized_prompt

    N2-->>W: 返回生成图片 image_url

    W->>C: 存储节点2输出 node_2.image_url

    Note over W,C: 执行节点3

    W->>W: 准备节点3输入参数

    W->>C: 查询映射关系 node_3.input_image ← node_2.output_image_url

    C-->>W: 返回:image_url值

    W->>N3: 执行故事生成 输入:image = image_url

    N3-->>W: 返回幽默故事 story_text

    W->>C: 存储节点3输出 node_3.story_text

    Note over W,U: 输出结果

    W->>U: 返回最终结果 图片URL + 故事文本

    图示:

    image-20260123222628330

  2. Handle映射关系详细图

    mermaid 代码

    graph LR

    subgraph "节点1输出端口"

    N1_OUT1[output_optimized_prompt Handle ID]:::handle

    N1_VAR1[optimized_prompt 变量名]:::variable

    N1_VAL1["A cute cat sitting on... 变量值"]:::value

    end

    subgraph "映射关系"

    MAP1[sourceHandle → targetHandle 变量传递桥梁]:::mapping

    end

    subgraph "节点2输入端口"

    N2_IN1[input_prompt Handle ID]:::handle

    N2_VAR1[prompt 变量名]:::variable

    N2_VAL1["A cute cat sitting on... 接收到的值"]:::value

    end

    N1_OUT1 --> MAP1

    N1_VAR1 --> MAP1

    N1_VAL1 --> MAP1

    MAP1 --> N2_IN1

    MAP1 --> N2_VAR1

    MAP1 --> N2_VAL1

    %% 样式定义

    classDef handle fill:#e1f5fe,stroke:#01579b,stroke-width:2px

    classDef variable fill:#f3e5f5,stroke:#6a1b9a,stroke-width:2px

    classDef value fill:#e8f5e8,stroke:#2e7d32,stroke-width:2px

    classDef mapping fill:#fff3e0,stroke:#e65100,stroke-width:2px

图“

image-20260123222840094

3.完整工作流程状态图

image-20260123223025849

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐