前面几篇已经有了:

  • 基于向量库的 RAG(含多模态);
  • 多智能体中台(不同 Agent 各司其职);
  • 监控、日志、成本估算;
  • 向量数据库选型与性能调优。

实际写代码时,你多半是用「一长串 Python 逻辑」把这些 Agent 串起来的,比如:

# 伪代码
res1 = qa_agent.handle(...)
res2 = tool_agent.handle(res1, ...)
res3 = summary_agent.handle(res2, ...)
return res3

这个方式在 Demo 阶段没问题,但只要业务流程一复杂,就会马上遇到这些痛点:

  • if/else 分支越来越多,很难一眼看懂“流程图”;
  • 想插入一个新步骤,要改很多地方;
  • 无法直观追踪「一次调用走过哪些节点」;
  • 很难做「局部重试」「分支并行」。

这些问题,本质上都是工作流编排问题
这篇就用 LangGraph 的「图式工作流」思路,带你构建一套:

以“有向图”来描述和执行多智能体工作流的框架
让每个 Agent 变成图上的一个节点,流程变成一张可视化的有向图。


一、图式工作流的核心思想

一句话概括:

不再写“线性脚本”,而是写一张「有向图」,
每个节点是一个 Agent 或工具,每条边代表任务流向。

一个典型流程可以画成这样的结构(文字版):

用户请求
   │
   ▼
[路由节点 Router]
   ├──(问答)────► [QA_Agent] ──► [Answer_Formatter]
   ├──(数据处理)► [Data_Agent] ─► [Report_Agent]
   └──(多步流程)► [Planner] ─────► [执行子图 Subgraph]

优点很直接:

  • 流程结构可视化:一眼能看出有哪些节点、怎么连的;
  • 方便扩展:新增一个节点,只要接到图上合适的位置;
  • 易于调试:可以在节点级别记录输入/输出、耗时、错误;
  • 更适合并行:天然可以在图上跑分支并行。

下面用一个「多步骤任务处理」的小例子,从 0 开始搭一套极简图式工作流引擎(思路与 LangGraph 类似,但实现更轻量,更方便你直接改到项目里用)。


二、定义节点(Node)与有向边(Edge)

我们先抽象两个基础概念:

  • 节点(Node)​:可以是一个 Agent、一个工具调用、一个小函数;
  • 边(Edge)​:从节点 A 指向节点 B 的有向边,表示“执行完 A 之后进入 B”。

2.1 Node 抽象

from abc import ABC, abstractmethod
from typing import Dict, Any

class Node(ABC):
    """图上的一个节点:可以是 Agent、工具或简单函数"""

    def __init__(self, name: str):
        self.name = name

    @abstractmethod
    def run(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """
        :param state: 当前工作流的状态(上下文)
        :return: 更新后的状态
        """
        pass

这里的 state 是一个「全局上下文」,在整个图中流转,类似:

{
  "user_query": "...",
  "retrieved_docs": [...],
  "answer": "...",
  "plan": [...],
  ...
}

2.2 Edge 与 Graph

from typing import Callable, List, Optional

class Edge:
    def __init__(self,
                 source: str,
                 target: str,
                 condition: Optional[Callable[Dictstr, Any](# "Reference Dictstr, Any \|\|\| __GENERATING_DETAILS__"), bool]] = None):
        """
        :param source: 源节点名称
        :param target: 目标节点名称
        :param condition: 条件函数,返回 True 才会走这条边
        """
        self.source = source
        self.target = target
        self.condition = condition


class Graph:
    """极简有向图工作流"""

    def __init__(self):
        self.nodes: Dict[str, Node] = {}
        self.edges: List[Edge] = []
        self.start_node: Optional[str] = None

    def add_node(self, node: Node, is_start: bool = False):
        self.nodes[node.name] = node
        if is_start:
            self.start_node = node.name

    def add_edge(self,
                 source: str,
                 target: str,
                 condition: Optional[Callable[Dictstr, Any](# "Reference Dictstr, Any \|\|\| __GENERATING_DETAILS__"), bool]] = None):
        self.edges.append(Edge(source, target, condition))

    def _get_next_nodes(self, current: str, state: Dict[str, Any]) -> List[str]:
        result = []
        for e in self.edges:
            if e.source != current:
                continue
            if e.condition is None or e.condition(state):
                result.append(e.target)
        return result

    def run(self, init_state: Dict[str, Any]) -> Dict[str, Any]:
        if not self.start_node:
            raise RuntimeError("未设置起始节点")
        state = dict(init_state)
        current = self.start_node

        visited = set()
        while current is not None:
            node = self.nodes[current]
            # 这里可以加日志/监控
            state = node.run(state)

            visited.add(current)
            next_nodes = self._get_next_nodes(current, state)

            if not next_nodes:
                # 无后继节点,结束
                break
            elif len(next_nodes) == 1:
                current = next_nodes[0]
            else:
                # 简化:多分支时只取第一个,你可以改成并行执行
                current = next_nodes[0]

        return state

这个 Graph 就是一个最小可用的「工作流引擎」:
你只需要实现每个节点的 run,再用 add_node + add_edge 把它们接成一张图,调用 graph.run() 即可执行整套流程。


三、用节点封装 Agent / 工具

下面我们以一个「多步骤任务」为例,构建 4 个节点:

  1. RouterNode:根据用户输入,决定走什么流程;
  2. RetrieveNode:从向量库/RAG 检索文档;
  3. AnswerNode:调用大模型生成回答;
  4. SummaryNode:对多轮结果做最终总结。

3.1 RouterNode:路由节点

class RouterNode(Node):
    def __init__(self, llm_client):
        super().__init__("router")
        self.llm = llm_client

    def run(self, state: Dict[str, Any]) -> Dict[str, Any]:
        query = state["user_query"]

        # 这里用简单规则/模型判断意图
        # 真实项目里可以用分类模型或少样本提示词
        if "总结" in query or "概括" in query:
            intent = "summary"
        elif "怎么" in query or "如何" in query:
            intent = "qa"
        else:
            intent = "qa"

        state["intent"] = intent
        return state

3.2 RetrieveNode:检索节点

class RetrieveNode(Node):
    def __init__(self, retriever):
        super().__init__("retrieve")
        self.retriever = retriever

    def run(self, state: Dict[str, Any]) -> Dict[str, Any]:
        query = state["user_query"]
        docs = self.retriever.retrieve(query, k=5)
        state["retrieved_docs"] = docs
        return state

3.3 AnswerNode:问答节点

class AnswerNode(Node):
    def __init__(self, llm_client):
        super().__init__("answer")
        self.llm = llm_client

    def run(self, state: Dict[str, Any]) -> Dict[str, Any]:
        docs = state.get("retrieved_docs", [])
        context = "\n\n".join(docs) if isinstance(docs, list) else str(docs)
        query = state["user_query"]

        messages = [
            {"role": "system", "content": "你是一个文档问答助手,请基于给定上下文回答。"},
            {"role": "user", "content": f"问题:{query}\n\n上下文:\n{context}"}
        ]
        answer = self.llm.chat(messages, scene="qa", agent="answer_agent")
        state["answer"] = answer
        return state

3.4 SummaryNode:总结节点

class SummaryNode(Node):
    def __init__(self, llm_client):
        super().__init__("summary")
        self.llm = llm_client

    def run(self, state: Dict[str, Any]) -> Dict[str, Any]:
        # 这里假设有一个历史答案列表
        answers = state.get("history_answers", [])
        if "answer" in state:
            answers.append(state["answer"])
        state["history_answers"] = answers

        summary_input = "\n\n".join(
            f"- {a}" for a in answers
        )

        messages = [
            {"role": "system", "content": "你是一个总结助手,请对多条答复做归纳。"},
            {"role": "user", "content": f"请对下面内容做整体总结:\n{summary_input}"}
        ]
        summary = self.llm.chat(messages, scene="summary", agent="summary_agent")
        state["final_summary"] = summary
        return state

四、把节点连成一张图(Graph)

现在我们有了 4 个节点,接下来把它们用有向边连起来:

graph = Graph()

# 注册节点
graph.add_node(RouterNode(llm_client), is_start=True)
graph.add_node(RetrieveNode(retriever))
graph.add_node(AnswerNode(llm_client))
graph.add_node(SummaryNode(llm_client))

# 连边:Router → Retrieve
graph.add_edge("router", "retrieve")

# Retrieve → Answer
graph.add_edge("retrieve", "answer")

# Answer → Summary(仅当意图是 summary 时才走)
def need_summary(state):
    return state.get("intent") == "summary"

graph.add_edge("answer", "summary", condition=need_summary)

执行一次完整流程:

init_state = {"user_query": "请根据文档内容帮我总结一下这套系统的核心功能"}
final_state = graph.run(init_state)

print("中间回答:", final_state.get("answer"))
print("最终总结:", final_state.get("final_summary"))

你会得到一个「图式执行」的完整链路,而不是一长串 if/else。


五、与多智能体中台结合:每个 Agent 变成一个节点

在你现有的多智能体中台里,每个 Agent 已经是一个类,比如 QAAgentToolAgentReportAgent 等。

你可以很自然地定义一个「AgentNode」,把任意 Agent 包装成图节点:

class AgentNode(Node):
    def __init__(self, agent):
        super().__init__(agent.meta()["name"])  # 假设每个 agent 有 meta
        self.agent = agent

    def run(self, state: Dict[str, Any]) -> Dict[str, Any]:
        # 把 state 映射成 agent 需要的 task/context 结构
        task = {
            "payload": {
                "message": state["user_query"],
                "state": state
            }
        }
        result = self.agent.handle(task, state)
        # 合并结果到全局 state
        state.update(result.get("context_updates", {}))
        if "answer" in result:
            state["answer"] = result["answer"]
        return state

这样你可以:

  • 用一张 Graph 描述多个 Agent 的协作顺序;
  • 按节点级粒度加入监控、日志、超时控制;
  • 在图中插入「人类审核节点」、「工具节点」等各种能力。

六、如何扩展:并行执行与子图(Subgraph)

上面的 Graph.run 是顺序执行的,如果想要并行跑分支,可以做一个简单扩展:

  1. _get_next_nodes 返回多个目标节点时,不是取第一个,而是:

    • 为每个目标节点起一个子协程/线程/任务;
    • 收集各子任务的结果合并回 state。
  2. 对于一个复杂任务,可以定义一个 SubgraphNode,内部再持有一张子图:

class SubgraphNode(Node):
    def __init__(self, name: str, subgraph: Graph):
        super().__init__(name)
        self.subgraph = subgraph

    def run(self, state: Dict[str, Any]) -> Dict[str, Any]:
        # 把当前 state 作为子图的初始 state
        sub_state = self.subgraph.run(state)
        # 子图执行完后的状态合并回上层
        state.update(sub_state)
        return state

这样,大型流程可以拆成若干「子图」,每个子图负责一个子任务(比如「数据采集子图」「文档分析子图」「报告生成子图」),整体结构就会非常清晰。


七、监控与调试:按节点记录 Trace

有了图式结构,做监控就简单很多:

  • Graph.run 中,每次进入节点前后记录:
    • 节点名称;
    • 执行开始/结束时间;
    • 输入/输出大小;
    • 是否抛异常;

示例(只展示关键部分):

import time

class Graph:
    ...
    def run(self, init_state: Dict[str, Any]) -> Dict[str, Any]:
        ...
        trace = []
        while current is not None:
            node = self.nodes[current]
            start = time.time()
            try:
                state = node.run(state)
                status = "success"
            except Exception as e:
                status = f"error:{e.__class__.__name__}"
                # 也可以选择中断或跳过
                raise
            finally:
                end = time.time()
                trace.append({
                    "node": current,
                    "status": status,
                    "duration_ms": (end - start) * 1000
                })
            ...
        state["trace"] = trace
        return state

这样,每次请求的 state["trace"] 就是一条完整的「执行路径」,你可以:

  • 打到日志里排查问题;
  • 接入监控系统画成「各节点耗时分布图」。

这和你之前做的大模型调用监控是互补关系:
一个看「LLM 维度」,一个看「工作流节点维度」。


八、如何在现有项目里渐进接入?

给一个尽量务实的迁移路线:

  1. 选一个最简单的流程先图式化

    • 比如「RAG 问答」:Router → Retrieve → Answer
    • 用这篇的 Graph + 几个 Node 先跑起来。
  2. 把已有的 1–2 个 Agent 包装成 Node

    • AgentNode 包装 QAAgent / PolicyAgent 之类;
    • 在 Graph 里替换原来的函数调用。
  3. 逐步把复杂逻辑从 if/else 拆到图上

    • 把分支条件写成 Edge 的 condition
    • 把多步流程拆成多个节点,而不是一个超级函数。
  4. 最后再考虑并行执行、子图、可视化编辑器

    • 并行执行可以先用线程池/协程实现;
    • 可视化编辑可以后面用 JSON/YAML 描述图结构,再渲染成前端流程图。

做到这里,你的多智能体系统就从:

「一堆 Agent 互相调用的脚本」

升级成了:

「一张可以清晰画出来、可以动态调整的有向图工作流」,
既容易维护,又更适合做后续的自动化测试、回溯和优化。


Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐