本文全面介绍AI智能体的概念、分类及实现技术,重点讲解CoT思维链、Agent Loop和ReAct模式等核心技术。通过分析OpenManus开源框架,详细阐述自主规划智能体的架构设计与实现原理,并基于Spring AI框架指导读者自主开发Manus智能体。最后探讨智能体工作流、OWL框架和A2A协议等前沿技术,为构建具有自主决策能力的AI系统提供完整解决方案。

前排提示,文末有大模型AGI-CSDN独家资料包哦!

本节重点

学习 AI 智能体的概念和构建方法,掌握如何实现具有自主决策能力的 AI 系统。通过 CoT 思维链、ReAc⁠t 模式等核心技术,深入理解智能体的工作原理,并参考 OpenManus 开源框架实战开发能够自主规划执行任务的 AI 智能体。

具体内容包括:

  • AI 智能体概念与特点
  • 智能体实现关键技术
  • 使用 AI 智能体的多种方式
  • OpenManus 实现原理
  • 自主实现 Manus 智能体
  • 智能体工作流编排
  • A2A 协议

一、什么是智能体?

智能体(Agent)是一个能够感知环境、进行推理、制定计划、做出决策并自主采取行动以实现特定目标的 AI 系统。它以大语言模型为核心,集成 记忆、知识库和工具 等能力为一体,构造了完整的决策能力、执行能力和记忆能力,就像一个有主观能动性的人类一样。

与普通的 AI 大模型不同,智能体能够:

  1. 感知环境:通过各种输入渠道获取信息(多模态),理解用户需求和环境状态
  2. 自主规划任务步骤:将复杂任务分解为可执行的子任务,并设计执行顺序
  3. 主动调用工具完成任务:根据需要选择并使用各种外部工具和 API,扩展自身能力边界
  4. 进行多步推理:通过思维链(Chain of Thought)逐步分析问题并推导解决方案
  5. 持续学习和记忆过去的交互:保持上下文连贯性,利用历史交互改进决策
  6. 根据环境反馈调整行为:根据执行结果动态调整策略,实现闭环优化

智能体的分类

跟人的生长阶段一样,智能体也是可以不断进化的。⁠按照自主性和规划能力,智能体可以分为几个层次:

1)反应式智能体:仅根据当前输入和固定规则做出反应,类似简单的聊天机器⁠人,没有真正的规划能力。23 年时的大多数 AI 聊天机器人应用,几乎都是反应式智能体。

2)有限规划智能体:能进行简单地多步骤执行,但执行路径通常是预设的或有严格限制的。鉴定为 “能干事、但干不了复杂的大事”。24 年流行的很多可联网搜索内容、调用知识库和工具的 AI 应用,都属于这类智能体。比如 ChatGPT + Plugins:

3)自主规划智能体:也叫目标导向智能体,能够根据任⁠务目标自主分解任务、制定计划、选择工具并一步步执行,直到完成任务。

比如 25 年初很火的 Manus 项目,它的核心亮点在于其 “自主执行” 能力。据官方介绍,Manus 能够在虚拟机中调用各⁠种工具(如编写代码、爬取数据)完成任务。其应用场景覆盖旅行规划、股票分析、教育内容生成等 40 余个领域,所以在当时给了很多人震撼感。

但其实早在这之前,就有类似的项目了,比如 AutoGPT,所以 Manus 大火的同时也被人诟病 “会营销而已”。甚至没隔多久就有小团队开源了 Manus 的复刻版 —— OpenManus,这类智能体通过 “思考-行动-观察” 的循环模式工作,能够持续推进任务直至完成目标。

需要注意,自主规划能力是智能体发展的重要方向,但并非所有应用场景都⁠需要完全的自主规划能力。在某些场景中,限制智能体的自主性反而能提高效率和安全性。

二、智能体实现关键技术

在自主开发智能体前,我们要先了解一下智能体的关⁠键实现技术,也就是方案设计阶段做的事情。

CoT 思维链

CoT(Chain of Thought)思维链是一种让 AI 像人类一样 “思考” 的技术,帮助 AI 在处理复杂问题⁠时能够按步骤思考。对于复杂的推理类问题,先思考后执行,效果往往更好。而且还可以让模型在生成答案时展示推理过程,便于我们理解和优化 AI。

CoT 的实现方式其实很简单,可以在输入 Prompt 时,给模型提供额外的提示或引导,比如 “让我们一步一步思考这个问题”,让模型以逐⁠步推理的方式生成回答。还可以运用 Prompt 的优化技巧 few shot,给模型提供包含思维链的示例问题和答案,让模型学习如何构建自己的思维链。

在 OpenManus 早期版本中,可以看到实⁠现 CoT 的系统提示词:

You are an assistant focused on Chain of Thought reasoning. For each question, please follow these steps:  1. Break down the problem: Divide complex problems into smaller, more manageable parts  2. Think step by step: Think through each part in detail, showing your reasoning process  3. Synthesize conclusions: Integrate the thinking from each part into a complete solution  4. Provide an answer: Give a final concise answer  Your response should follow this format:  Thinking: [Detailed thought process, including problem decomposition, reasoning for each step, and analysis]  Answer: [Final answer based on the thought process, clear and concise]  Remember, the thinking process is more important than the final answer, as it demonstrates how you reached your conclusion.

我们来测试一下:

```plaintext
// 普通提示词(直接要结果)String normalPrompt = “帮我规划周末野餐,直接说方案”;// CoT提示词(引导分步思考)String cotPrompt = “”" 帮我规划周末野餐,请按步骤思考: 1. 先考虑需要做哪些准备(天气、地点、物品等); 2. 分析每个步骤的具体做法; 3. 最后整理成完整方案。 请先输出你的思考过程,再给结果。 思考:[你的思考步骤] 方案:[最终方案]“”";// 调用AIString response = chatClient.prompt().user(cotPrompt).call().content();


### Agent Loop 执行循环

Agent Loop 是智能体最核心的工作机制,指⁠智能体在没有用户输入的情况下,自主重复执行推理和工具调用的过程。

在传统的聊天模型中,每次用户提问后,AI 回复一次就结束了。但在智能体中,AI 回复后可能会继续自主⁠执行后续动作(如调用工具、处理结果、继续推理),形成一个自主执行的循环,直到任务完成(或者超出预设的最大步骤数)。

Agent Loop 的实现很简单,参考代码如下:

```plaintext
public String execute() {      List<String> results = new ArrayList<>();      while (currentStep < MAX_STEPS && !isFinished) {          currentStep++;          // 这里实现具体的步骤逻辑  String stepResult = executeStep();          results.add("步骤 " + currentStep + ": " + stepResult);      }      if (currentStep >= MAX_STEPS) {          results.add("达到最大步骤数: " + MAX_STEPS);      }      return String.join("\n", results);  }public class SimpleAgent {    // 最大步骤数(防止无限循环)    private static final int MAX_STEPS = 5;    private int currentStep = 0;    private boolean isTaskFinished = false;    // 执行任务的循环方法    public String executeTask(String task) {        List<String> results = new ArrayList<>();        results.add("开始执行任务:" + task);        // 循环执行,直到任务完成或达到最大步骤        while (currentStep < MAX_STEPS && !isTaskFinished) {            currentStep++;            String stepResult = doStep(task); // 执行一步任务            results.add("步骤" + currentStep + ":" + stepResult);            // 检查是否完成(实际中根据任务结果判断)            if (stepResult.contains("完成")) {                isTaskFinished = true;            }        }        if (!isTaskFinished) {            results.add("已达最大步骤(" + MAX_STEPS + "),任务中止");        }        return String.join("\n", results);    }    // 每一步的具体执行逻辑    private String doStep(String task) {        // 实际中这里会调用AI推理、工具等        if (currentStep == 1) {            return "查询到周末天气晴朗,适合野餐";        } else if (currentStep == 2) {            return "选定中央公园作为野餐地点";        } else if (currentStep == 3) {            return "生成物品清单,任务完成";        }        return "等待下一步...";    }}

ReAct 模式

ReAct(Reasoning + Acting)是一种结合推理和行动的智能体架构,它模仿人类解决问题时 ”思考 - 行动 - 观察” 的循环,目的是通过交互式决策解决复杂任务,是目前最常用的智能体工作模式之一。

核心思想:

  1. 推理(Reason):将原始问题拆分为多步骤任务,明确当前要执行的步骤,比如 “第一步需要打开网站”。
  2. 行动(Act):调用外部工具执行动作,比如调用搜索引擎、打开浏览器访问网页等。
  3. 观察(Observe):获取工具返回的结果,反馈给智能体进行下一步决策。比如将打开的网页代码输入给 AI。
  4. 循环迭代:不断重复上述 3 个过程,直到任务完成或达到终止条件。

ReAct 流程如图:

示例实现代码:

void executeReAct(String task) {      String state = "开始";      while (!state.equals("完成")) {          // 1. 推理 (Reason)  String thought = "思考下一步行动";          System.out.println("推理: " + thought);          // 2. 行动 (Act)  String action = "执行具体操作";          System.out.println("行动: " + action);          // 3. 观察 (Observe)  String observation = "观察执行结果";          System.out.println("观察: " + observation);          // 更新状态          state = "完成";      }  }

所需支持系统

除了基本的工作机制外,智能体的实现还依赖于很多支持系统。

1)首先是 AI 大模型,这个就不多说了,大模型提供了思⁠考、推理和决策的核心能力,越强的 AI 大模型通常执行任务的效果越好。

2)记忆系统

智能体需要记忆系统来存储对话历史、中间结果和执行状态,这样它才能够进行连续对话并根据历史对⁠话分析接下来的工作步骤。之前我们学习过如何使用 Spring AI 的 ChatMemory 实现对话记忆。

3)知识库

尽管大语言模型拥有丰富的参数知识,但针对特定领域的专业知识往往需要额外的知识库支持。之⁠前我们学习过,通过 RAG 检索增强生成 + 向量数据库等技术,智能体可以检索并利用专业知识回答问题。

4)工具调用

工具是扩展智能体能力边界的关键,智能体通过工具调用可以访问搜索引擎、数据库、AP⁠I 接口等外部服务,极大地增强了其解决实际问题的能力。当然,MCP 也可以算是工具调用的一种。

💡 这里分享一种特殊的工具调用 —— Compute Use,它允许智能体直接与计算环境交互,⁠比如执行代码、操作文件系统等。目前 Claude 等平台已提供此类能力,感兴趣的同学可以阅读下面的资料学习:

  • Claude 支持 Compute Use:https://docs.anthropic.com/en/docs/agents-and-tools/computer-use
  • Compute Use 实现示例:https://github.com/anthropics/anthropic-quickstarts/tree/main/computer-use-demo
  • Compute Use 开源实现:https://github.com/e2b-dev/open-computer-use、https://github.com/showlab/computer_use_ootb

使用 Compute Use 功能时需要注意,这些操作会⁠在我们自己的系统上执行,建议在虚拟环境中运行,防止意外操作影响实际系统。

综合上面 4 类技术,并且结合 CoT、Agent Loop、ReAct 等机制⁠(可以总称为 “规划执行机制”),我们就可以构建一个完整的、有自主规划能力的智能体系统啦。

三、使用 AI 智能体

有 3 种方式可以使用 AI 智能体,之前的教⁠程中其实我们已经有接触过,这里再快速复习一遍。

1、平台中使用

许多 AI 大模型开发平台已经提供了智能体创建⁠和使用功能,最简单的方式就是直接在这些平台上创建和使用智能体。

比如可以在阿里云百炼、Dify 等平台上创建智能体

2、软件中使用

AI 开发工具 Cursor、tare 就集成了 AI 智能体,可以帮我们生成完整的项目代码、或⁠者解释项目代码。选择 Agent 模式就可以直接使用了,建议同时开启 Thinking 深度思考:

运行效果如图,典型的 CoT + Agent Loop + ReAct 模式:

3、程序中使用

之前有讲解过,如果使用 AI 大模型开发平台创建了智能体⁠,那么可以直接通过 SDK 或 API 在自己的程序中调用智能体。

当然也可以完全通过编程自主开发智能体,之前我们通过 Spring AI 创建⁠的模拟面试官 就已经是个智能体了,包含了大模型决策、记忆能力、知识库集成和工具调用。

ChatResponse response = chatClient          .prompt()          .user(message)          // 记忆          .advisors(spec -> spec.param(CHAT_MEMORY_CONVERSATION_ID_KEY, chatId)                  .param(CHAT_MEMORY_RETRIEVE_SIZE_KEY, 10))          // 开启日志,便于观察效果          .advisors(new MyLoggerAdvisor())          // 知识库          .advisors(new QuestionAnswerAdvisor(interviewAppVectorStore))          // 工具调用          .tools(toolCallbackProvider)          .call()          .chatResponse();

但是,这个实现还不具备完整的自主规划和执行能力⁠,下面我们将学习更高级的自主规划智能体架构。

四、OpenManus 实现原理

在我们开发超级智能体前,可以先学习下优秀的开源项目。下面以 OpenManus 项目 为例,会带大家从 0 开始阅读项目源码,学到超级智能体实现方式的同时,帮助大家掌握快速学习开源项目的方法。

💡 友情提示,带大家分析的源码是 2025 年 5⁠ 月 16 日的,后续源码可能会持续更新,重点在于学习分析源码的套路。

快速运行

学习开源项目的第一步是阅读官方提供的 README.md 项目介绍文档,先把项目跑起来,借此熟悉业务流程,而不是一上来就啃源码。

OpenManus 官方提供了一个 Demo ⁠视频,我们就不必再自己搭建环境来跑项目了:

下面让我们看一个 OpenManus 的对话示例,帮助我们了解它的工作⁠流程。假设用户要完成一个任务 —— 在网站上寻找热门文章,对话如下:

用户:帮我在网站上找到最近的几篇热门文章  系统:OpenManus 已启动,正在处理您的请求...  助手:我会帮您在网站查找最近的热门文章。让我先用浏览器工具访问网站。  系统:正在调用浏览器工具...  助手:我成功打开了浏览器并访问了网站。从网站内容来看,我现在需要导航到热门文章区域,然后提取这些文章的信息。  系统:正在执行浏览器操作...  助手:我已在网站找到了热门文章区域,以下是最近的几篇热门文章:  1. 《Spring Boot 实战:构建微服务应用》- 阅读量:3200  2. 《前端框架对比:React vs Vue vs Angular》- 阅读量:2850  3. 《Python 数据分析入门:从 Pandas 到可视化》- 阅读量:2400  4. 《Docker 容器化应用部署最佳实践》- 阅读量:2150  5. 《算法学习路线:从入门到精通》- 阅读量:1980  这些文章是根据网站显示的阅读量排序的热门文章。  系统:正在调用终止工具...  助手:已完成任务,成功找到网站上的热门文章。如果您对某篇文章感兴趣或有其他需求,请随时告诉我。

在这个示例中,OpenManus 首先理解用户任务,然后选择合适的工具(浏览器工具)来执行任务,最后整理信息并返回结果。整个过⁠程体现了 ReAct 模式的核心要素:思考(Thinking)、行动(Action)和观察(Observation)的循环,直到任务完成。

OpenManus 整体架构

了解了 OpenManus 的核心功能后,先 从整体到局部 地观察 OpenManus 的架构,简单来说就是看文件夹的组织形式、文件的命名规则、以及文件之间的关联,暂时不用深入源码。

1、整体文件夹组织形式

先看文件夹的组织形式,见名知意:

2、agent 目录

agent 目录是 OpenManus 实现的核心,采用⁠了分层的代理架构,不同层次的代理负责不同的功能,这样更利于系统的扩展。

OpenManus 的代理架构主要包含以下几层:

  • BaseAgent:最基础的代理抽象类,定义了所有代理的基本状态管理和执行循环
  • ReActAgent:实现 ReAct 模式的代理,具有思考(Think)和行动(Act)两个主要步骤
  • ToolCallAgent:能够调用工具的代理,继承自 ReActAgent 并扩展了工具调用能力
  • Manus:具体实现的智能体实例,集成了所有能力并添加了更多专业工具

还有更多适用于特定领域的智能体实例,比如 DataAnalysis 数据分析 Agent、SWE 软件开发工程⁠师 Agent、MCP 服务交互 Agent、Browser 浏览器操作 Agent,它们都继承了 ToolCallAgent。

3、tool 目录

tool 目录定义了各种各样的工具,比如网页搜⁠索、文件操作、询求用户帮助、代码执行器等等:

4、prompt 目录

prompt 目录定义了整个项目中可能会用到的提⁠示词。从下图中我们可以看到提示词写的比较专业,这块也是比较值得学习的。

5、其他支持

为了实现完整的智能体功能,OpenManus ⁠依赖以下关键组件:

  • 记忆系统:使用 Memory 类存储对话历史和中间状态
  • LLM 大模型:通过 LLM 类提供思考和决策能力
  • 工具系统:提供 BaseTool 和 ToolCollection 类扩展智能体的能力边界
  • 流程控制:通过 AgentState 和执行循环管理状态转换和任务流程

在 OpenManus 中,这些都是自主实现的:

AI 智能体核心实现

了解整体架构后,我们重点学习 Agent 分层代理架构。

1、BaseAgent

BaseAgent 是所有代理的基础,定义了代理状态管理和执行循环的核心逻辑。查看 base.py 文件,关键代码就是 Agent Loop 的实现,通过 while 实现循环,并且定义了死循环检查机制:

class BaseAgent(BaseModel, ABC):      async def run(self, request: Optional[str] = None) -> str:          """执行代理的主循环"""          if self.state != AgentState.IDLE:              raise RuntimeError(f"Cannot run agent from state: {self.state}")          if request:              self.update_memory("user", request)          results: List[str] = []          async with self.state_context(AgentState.RUNNING):              while (self.current_step < self.max_steps and                     self.state != AgentState.FINISHED):                  self.current_step += 1                  step_result = await self.step()                  # 检查是否陷入循环  if self.is_stuck():                      self.handle_stuck_state()                  results.append(f"Step {self.current_step}: {step_result}")              if self.current_step >= self.max_steps:                  self.current_step = 0                  self.state = AgentState.IDLE                  results.append(f"Terminated: Reached max steps ({self.max_steps})")          return "\n".join(results) if results else "No steps executed"      @abstractmethod  async def step(self) -> str:          """执行单步操作,必须由子类实现"""

这里其实使用了模板方法设计模式,父类定义执行流⁠程,具体的执行方法(step)交给子类实现。

2、ReActAgent

ReActAgent 实现了 ReAct 模式,将代理的执行过程分为思考(Think)和行动(Act)两个关键步骤。查看 react.py 文件:

class ReActAgent(BaseAgent, ABC):      @abstractmethod  async def think(self) -> bool:          """处理当前状态并决定下一步行动"""      @abstractmethod  async def act(self) -> str:          """执行决定的行动"""      async def step(self) -> str:          """执行单步:思考和行动"""          should_act = await self.think()          if not should_act:              return "Thinking complete - no action needed"          return await self.act()

上述代码同样运用了模板方法设计模式,这种设计体现了 ReAct 模式的核心思想,⁠也就是 “思考 - 行动 - 观察” 的循环过程。但是具体怎么思考、怎么行动,交给子类去实现。

3、ToolCallAgent

ToolCallAgent 在 ReAct 模式的基础上增加了工具调用能力,是 OpenManus 最重要的一个层次。查看 toolcall.py 文件,虽然代码比较复杂,但原理很简单,就是工具调用机制的具体实现:

  1. think:和 AI 交互思考使用什么工具
  2. act:程序执行工具
  3. observe:将结果返回给 AI
class ToolCallAgent(ReActAgent):      """能够执行工具调用的代理类"""      available_tools: ToolCollection = ToolCollection(          CreateChatCompletion(), Terminate()      )      tool_choices: TOOL_CHOICE_TYPE = ToolChoice.AUTO      special_tool_names: List[str] = Field(default_factory=lambda: [Terminate().name])      async def think(self) -> bool:          """处理当前状态并使用工具决定下一步行动"""          # 添加下一步提示到用户消息  if self.next_step_prompt:              user_msg = Message.user_message(self.next_step_prompt)              self.messages += [user_msg]          # 请求 LLM 选择工具          response = await self.llm.ask_tool(              messages=self.messages,              system_msgs=([Message.system_message(self.system_prompt)]                           if self.system_prompt else None),              tools=self.available_tools.to_params(),              tool_choice=self.tool_choices,          )          # 处理工具调用          self.tool_calls = tool_calls = (              response.tool_calls if response and response.tool_calls else []          )          content = response.content if response and response.content else ""          # 添加助手消息到记忆          assistant_msg = (              Message.from_tool_calls(content=content, tool_calls=self.tool_calls)              if self.tool_calls              else Message.assistant_message(content)          )          self.memory.add_message(assistant_msg)          # 决定是否应该执行行动  return bool(self.tool_calls or content)      async def act(self) -> str:          """执行工具调用并处理结果"""          if not self.tool_calls:              # 返回最后一条消息内容,如果没有工具调用  return self.messages[-1].content or "No content or commands to execute"          results = []          for command in self.tool_calls:              # 执行工具              result = await self.execute_tool(command)              # 记录工具响应到记忆              tool_msg = Message.tool_message(                  content=result,                  tool_call_id=command.id,                  name=command.function.name,                  base64_image=self._current_base64_image,              )              self.memory.add_message(tool_msg)              results.append(result)          return "\n\n".join(results)
4、Manus

Manus 类是 OpenManus 的核心智能体实例,集成了各种工具和能力。查看 manus.py 文件:

class Manus(ToolCallAgent):      """多功能通用智能体,支持本地和 MCP 工具"""      name: str = "Manus"      description: str = "A versatile agent that can solve various tasks using multiple tools"      # 添加各种通用工具到工具集合      available_tools: ToolCollection = Field(          default_factory=lambda: ToolCollection(              PythonExecute(),              BrowserUseTool(),              StrReplaceEditor(),              AskHuman(),              Terminate(),          )      )
MCP 客户端用于远程工具访问

mcp_clients: MCPClients = Field(default_factory=MCPClients)

@classmethod  async def create(cls, **kwargs) -> "Manus":          """工厂方法创建并初始化 Manus 实例"""          instance = cls(**kwargs)          await instance.initialize_mcp_servers()          instance._initialized = True          return instance      async def think(self) -> bool:          """处理当前状态并根据上下文决定下一步行动"""          if not self._initialized:              await self.initialize_mcp_servers()              self._initialized = True          # 为浏览器工具添加特殊上下文          original_prompt = self.next_step_prompt          browser_in_use = self._check_browser_in_use()          if browser_in_use:              self.next_step_prompt = (                  await self.browser_context_helper.format_next_step_prompt()              )          result = await super().think()          # 恢复原始提示          self.next_step_prompt = original_prompt          return result

关键实现细节

学完了超级智能体的核心实现后,我们再学习一些项⁠目中比较微妙的实现细节,对我们自己开发项目也会很有帮助。

1、工具系统设计

1)工具抽象层 BaseTool

所有工具均继承自 BaseTool 抽象基类,提供统一的接口和行为:

class BaseTool(ABC, BaseModel):      name: str      description: str      parameters: Optional[dict] = None      async def __call__(self, **kwargs) -> Any:          """使用给定参数执行工具"""          return await self.execute(**kwargs)      @abstractmethod  async def execute(self, **kwargs) -> Any:          """执行工具的具体逻辑,由子类实现"""      def to_param(self) -> Dict:          """将工具转换为函数调用格式"""          return {              "type": "function",              "function": {                  "name": self.name,                  "description": self.description,                  "parameters": self.parameters,              },          }

这种设计使得每个工具都有统一的调用方式,同时具⁠有规范化的参数描述,便于 LLM 理解工具的使用方法。

2)终止工具 Terminate

Terminate 工具是一个特殊的工具,允许智能⁠体通过 AI 大模型自主决定何时结束任务,避免无限循环或者过早结束。

class Terminate(BaseTool):      name: str = "terminate"      description: str = """Terminate the interaction when the request is met OR                             if the assistant cannot proceed further with the task.                            When you have finished all the tasks, call this tool to end the work."""      parameters: dict = {          "type": "object",          "properties": {              "status": {                  "type": "string",                  "description": "The finish status of the interaction.",                  "enum": ["success", "failure"],              }          },          "required": ["status"],      }      async def execute(self, status: str) -> str:          """完成当前执行"""          return f"The interaction has been completed with status: {status}"

在 agent 源码中,有一个 special_tool_names 变量,用于指定终止工具等特殊工具:

toolcall.py 源码中,对特殊工具进行处理:

3)询问工具 AskHuman

AskHuman 工具允许智能体在遇到无法自主解决的问题时向人类寻⁠求帮助,也就是给用户一个输入框,让我们能够更好地干预智能体完成任务的过程。

class AskHuman(BaseTool):      """Add a tool to ask human for help."""      name: str = "ask_human"      description: str = "Use this tool to ask human for help."      parameters: str = {          "type": "object",          "properties": {              "inquire": {                  "type": "string",                  "description": "The question you want to ask human.",              }          },          "required": ["inquire"],      }      async def execute(self, inquire: str) -> str:          return input(f"""Bot: {inquire}\n\nYou: """).strip()

这个工具实现虽然简单,但极大地提升了智能体的实⁠用性和安全性。

4)工具集合 ToolCollection

OpenManus 设计了 ToolCollect⁠ion 类来管理多个工具实例,提供统一的工具注册和执行接口:

class ToolCollection:      """A collection of defined tools."""      def __init__(self, *tools: BaseTool):          self.tools = tools          self.tool_map = {tool.name: tool for tool in tools}      def to_params(self) -> List[Dict[str, Any]]:          return [tool.to_param() for tool in self.tools]      async def execute(self, *, name: str, tool_input: Dict[str, Any] = None) -> ToolResult:          tool = self.tool_map.get(name)          if not tool:              return ToolFailure(error=f"Tool {name} is invalid")          try:              result = await tool(**tool_input)              return result          except ToolError as e:              return ToolFailure(error=e.message)      def add_tools(self, *tools: BaseTool):          """Add multiple tools to the collection."""          for tool in tools:              self.add_tool(tool)          return self

这种设计使得 OpenManus 可以灵活地添加、移除和管理工具,实现了工具⁠系统的可插拔性。我们之前利用 Spring AI 开发工具调用时,也写了个类似的工具注册类。

2、MCP 协议支持

1)MCP 与工具系统的集成

还记得么,之前在教程中提到过 “MCP 的本质就是工具调用”,OpenManus 的实现也是遵循了这一思想。通过 MCPClients 类(继承自 ToolCollection)将 MCP 服务集成到现有工具系统中。查看 tool/mcp.py 的源码:

class MCPClients(ToolCollection):      """      A collection of tools that connects to multiple MCP servers and manages available tools through the Model Context Protocol.      """      sessions: Dict[str, ClientSession] = {}      exit_stacks: Dict[str, AsyncExitStack] = {}      description: str = "MCP client tools for server interaction"      def __init__(self):          super().__init__()  # Initialize with empty tools list          self.name = "mcp"  # Keep name for backward compatibility

2)动态工具代理

每当连接到 MCP 服务器时,OpenManus 会动态创建 MCPClientTool 实例(继承自 BaseTool)作为每个远程工具的代理。查看 tool/mcp.py 的源码,通过向 MCP 服务器发送远程请求来执行工具:

class MCPClientTool(BaseTool):      """Represents a tool proxy that can be called on the MCP server from the client side."""      session: Optional[ClientSession] = None      server_id: str = ""  # Add server identifier      original_name: str = ""      async def execute(self, **kwargs) -> ToolResult:          """Execute the tool by making a remote call to the MCP server."""          if not self.session:              return ToolResult(error="Not connected to MCP server")          try:              result = await self.session.call_tool(self.original_name, kwargs)              content_str = ", ".join(                  item.text for item in result.content if isinstance(item, TextContent)              )              return ToolResult(output=content_str or "No output returned.")          except Exception as e:              return ToolResult(error=f"Error executing tool: {str(e)}")

3)Manus 中的 MCP 集成机制

Manus 智能体 通过工具调用 实现了与 MCP 服务器的无缝集成。查看 agent/manus.py 的源码,发现本质上是把 MCP 服务提供的工具动态添加到可用的工具集合中:

async def connect_mcp_server(      self,      server_url: str,      server_id: str = "",      use_stdio: bool = False,      stdio_args: List[str] = None,  ) -> None:      """Connect to an MCP server and add its tools."""      if use_stdio:          await self.mcp_clients.connect_stdio(server_url, stdio_args or [], server_id)      else:          await self.mcp_clients.connect_sse(server_url, server_id)      # 关键实现:动态添加该服务器的工具到可用工具集合      new_tools = [          tool for tool in self.mcp_clients.tools if tool.server_id == server_id      ]      self.available_tools.add_tools(*new_tools)

这样一来,对大模型来说,MCP 工具与本地工具⁠的调用方式一致,很巧妙地复用了代码。

3、其他值得学习的源码

1)Python 代码执行沙箱

PythonExecute 工具实现了一个安全的 Python 代码执行环境,这是一个值得学习的安全实现。查看 tool/python_execute.py 源码:

class PythonExecute(BaseTool):      name: str = "python_execute"      async def execute(self, code: str, timeout: int = 5) -> Dict:          """安全执行 Python 代码"""          with multiprocessing.Manager() as manager:              result = manager.dict({"observation": "", "success": False})              # 安全的全局环境              safe_globals = {"__builtins__": __builtins__.__dict__.copy()}              # 使用独立进程执行代码              proc = multiprocessing.Process(                  target=self._run_code, args=(code, result, safe_globals)              )              proc.start()              proc.join(timeout)              # 超时处理  if proc.is_alive():                  proc.terminate()                  proc.join(1)                  return {                      "observation": f"Execution timeout after {timeout} seconds",                      "success": False,                  }              return dict(result)

这段代码展示了几个安全编程的最佳实践:

  • 使用独立进程隔离代码执行
  • 实现了超时机制防止无限循环
  • 截获和处理所有异常
  • 重定向标准输出以捕获打印内容

2)状态管理与上下文切换机制

BaseAgent 实现了一个优雅的状态管理和上下文切换机制:

@asynccontextmanager  async def state_context(self, new_state: AgentState):      """Context manager for safe agent state transitions."""      if not isinstance(new_state, AgentState):          raise ValueError(f"Invalid state: {new_state}")      previous_state = self.state      self.state = new_state      try:          yield      except Exception as e:          self.state = AgentState.ERROR  # Transition to ERROR on failure  raise e      finally:          self.state = previous_state  # Revert to previous state

这个上下文管理器确保了状态转换的安全性和可靠性⁠,即使在异常情况下也能正确恢复状态,是一个值得学习的设计模式。

3)工具结果统一表示与组合

OpenManus 设计了 ToolResult 类来统一表示工具执行结果,并支持结果组合。查看 tool/base.py 源码:

class ToolResult(BaseModel):      """Represents the result of a tool execution."""      output: Any = Field(default=None)      error: Optional[str] = Field(default=None)      base64_image: Optional[str] = Field(default=None)      system: Optional[str] = Field(default=None)      def __add__(self, other: "ToolResult"):          """组合两个工具结果"""          def combine_fields(field: Optional[str], other_field: Optional[str], concatenate: bool = True):              if field and other_field:                  if concatenate:                      return field + other_field                  raise ValueError("Cannot combine tool results")              return field or other_field          return ToolResult(              output=combine_fields(self.output, other.output),              error=combine_fields(self.error, other.error),              base64_image=combine_fields(self.base64_image, other.base64_image, False),              system=combine_fields(self.system, other.system),          )

这种设计使得工具结果处理更加统一和灵活,特别是⁠在需要组合多个工具结果或处理异常情况时。

五、自主实现 Manus 智能体

虽然 OpenManus 代码量很大,但其实很多代码都是在实现智能体所需的支持系统,比如调用大模型、会话记忆、工具调用能力等。如果使用 ⁠AI 开发框架,这些能力都不需要我们自己实现,代码量会简单很多。下面就让我们基于 Spring AI 框架,实现一个简化版的 Manus 智能体。

定义数据模型

新建 agent.model 包,将所有用到的数据模型(实体类、枚举类等)都放到该包下。

目前我们只需要定义 Agent 的状态枚举,用⁠于控制智能体的执行。AgentState 代码如下:

/**   * 代理执行状态的枚举类   */  public enum AgentState {      /**       * 空闲状态       */      IDLE,      /**       * 运行中状态       */      RUNNING,      /**       * 已完成状态       */      FINISHED,      /**       * 错误状态       */      ERROR  }

核心架构开发

首先定义智能体的核心架构,包括以下类:

  • BaseAgent:智能体基类,定义基本信息和多步骤执行流程
  • ReActAgent:实现思考和行动两个步骤的智能体
  • ToolCallAgent:实现工具调用能力的智能体
  • 通用全能 Manus 智能体:最终可使用的 Manus 实例
1、开发基础 Agent 类

参考 OpenManus 的实现方式,Base⁠Agent 的代码如下:

/**   * 抽象基础代理类,用于管理代理状态和执行流程。   *    * 提供状态转换、内存管理和基于步骤的执行循环的基础功能。   * 子类必须实现step方法。   */  @Data  @Slf4j  public abstract class BaseAgent {      // 核心属性  private String name;      // 提示  private String systemPrompt;      private String nextStepPrompt;      // 状态  private AgentState state = AgentState.IDLE;      // 执行控制  private int maxSteps = 10;      private int currentStep = 0;      // LLM  private ChatClient chatClient;      // Memory(需要自主维护会话上下文)  private List<Message> messageList = new ArrayList<>();      /**       * 运行代理       *       * @param userPrompt 用户提示词       * @return 执行结果       */      public String run(String userPrompt) {          if (this.state != AgentState.IDLE) {              throw new RuntimeException("Cannot run agent from state: " + this.state);          }          if (StringUtil.isBlank(userPrompt)) {              throw new RuntimeException("Cannot run agent with empty user prompt");          }          // 更改状态          state = AgentState.RUNNING;          // 记录消息上下文          messageList.add(new UserMessage(userPrompt));          // 保存结果列表          List<String> results = new ArrayList<>();          try {              for (int i = 0; i < maxSteps && state != AgentState.FINISHED; i++) {                  int stepNumber = i + 1;                  currentStep = stepNumber;                  log.info("Executing step " + stepNumber + "/" + maxSteps);                  // 单步执行  String stepResult = step();                  String result = "Step " + stepNumber + ": " + stepResult;                  results.add(result);              }              // 检查是否超出步骤限制  if (currentStep >= maxSteps) {                  state = AgentState.FINISHED;                  results.add("Terminated: Reached max steps (" + maxSteps + ")");              }              return String.join("\n", results);          } catch (Exception e) {              state = AgentState.ERROR;              log.error("Error executing agent", e);              return "执行错误" + e.getMessage();          } finally {              // 清理资源  this.cleanup();          }      }      /**       * 执行单个步骤       *       * @return 步骤执行结果       */      public abstract String step();      /**       * 清理资源       */      protected void cleanup() {          // 子类可以重写此方法来清理资源      }  }

上述代码中,我们要注意 3 点:

  1. 包含 chatClient 属性,由调用方传入具体调用大模型的对象,而不是写死使用的大模型,更灵活
  2. 包含 messageList 属性,用于维护消息上下文列表
  3. 通过 state 属性来控制智能体的执行流程
2、开发 ReActAgent 类

参考 OpenManus 的实现方式,继承自 BaseAgent,并且将 s⁠tep 方法分解为 think 和 act 两个抽象方法。ReActAgent 的代码如下:

/**   * ReAct (Reasoning and Acting) 模式的代理抽象类   * 实现了思考-行动的循环模式   */  @EqualsAndHashCode(callSuper = true)  @Data  public abstract class ReActAgent extends BaseAgent {      /**       * 处理当前状态并决定下一步行动       *       * @return 是否需要执行行动,true表示需要执行,false表示不需要执行       */      public abstract boolean think();      /**       * 执行决定的行动       *       * @return 行动执行结果       */      public abstract String act();      /**       * 执行单个步骤:思考和行动       *       * @return 步骤执行结果       */      @Override      public String step() {          try {              boolean shouldAct = think();              if (!shouldAct) {                  return "思考完成 - 无需行动";              }              return act();          } catch (Exception e) {              // 记录异常日志              e.printStackTrace();              return "步骤执行失败: " + e.getMessage();          }      }  }
3、开发 ToolCallAgent 类

ToolCallAgent 负责实现工具调用能力,继承自 R⁠eActAgent,具体实现了 think 和 act 两个抽象方法。

我们有 3 种方案来实现 ToolCallAgent:

1)基于 Spring AI 的工具调用能力,⁠手动控制工具执行。

其实 Spring 的 ChatClient 已经支持选择工具进行调用(内部完成了 think、act、observe),但这里我们要自主实现,可以使用 Spring AI 提供的 手动控制工具执行。

2)基于 Spring AI 的工具调用能力,⁠简化调用流程。

由于 Spring AI 完全托管了工具调用,我们可以直接把⁠所有工具调用的代码作为 think 方法,而 act 方法不定义任何动作。

3)自主实现工具调用能力。

也就是工具调用章节提到的实现原理:自己写 Prompt,引导 AI⁠ 回复想要调用的工具列表和调用参数,然后再执行工具并将结果返送给 AI 再次执行。

使用哪种方案呢?

如果是为了学习 ReAct 模式,让流程更清晰,推荐第一种;如果只是为了快速实现,推荐第二种;不⁠建议采用第三种方案,过于原生,开发成本高。

下面我们采用第一种方案实现 ToolCallA⁠gent,先定义所需的属性和构造方法:

/**   * 处理工具调用的基础代理类,具体实现了 think 和 act 方法,可以用作创建实例的父类   */  @EqualsAndHashCode(callSuper = true)  @Data  @Slf4j  public class ToolCallAgent extends ReActAgent {      // 可用的工具  private final ToolCallback[] availableTools;      // 保存了工具调用信息的响应  private ChatResponse toolCallChatResponse;      // 工具调用管理者  private final ToolCallingManager toolCallingManager;      // 禁用内置的工具调用机制,自己维护上下文  private final ChatOptions chatOptions;      public ToolCallAgent(ToolCallback[] availableTools) {          super();          this.availableTools = availableTools;          this.toolCallingManager = ToolCallingManager.builder().build();          // 禁用 Spring AI 内置的工具调用机制,自己维护选项和消息上下文  this.chatOptions = DashScopeChatOptions.builder()                  .withProxyToolCalls(true)                  .build();      }  }

注意,在上述代码中,我们通过将 DashScopeChatOptions 的 withPr⁠oxyToolCalls 选项设置为 true,来禁止 Spring AI 托管工具调用,而是我们自主实现。

虽然官方提供的示例代码是设置 internalToolExecutionEnabled 为 false 来禁用 Spring AI⁠ 托管工具调用,但是由于我们使用的是阿里的 DashScopeChatModel 大模型客户端,如果按照下面的方式,会直接导致工具调用失效!

ChatOptions chatOptions = ToolCallingChatOptions.builder()      .toolCallbacks(new CustomerTools())      .internalToolExecutionEnabled(false)      .build();

下面我们实现 think 方法,传入工具列表并⁠调用大模型,得到需要调用的工具列表:

/**   * 处理当前状态并决定下一步行动   *   * @return 是否需要执行行动   */  @Override  public boolean think() {      if (getNextStepPrompt() != null && !getNextStepPrompt().isEmpty()) {          UserMessage userMessage = new UserMessage(getNextStepPrompt());          getMessageList().add(userMessage);      }      List<Message> messageList = getMessageList();      Prompt prompt = new Prompt(messageList, chatOptions);      try {          // 获取带工具选项的响应  ChatResponse chatResponse = getChatClient().prompt(prompt)                  .system(getSystemPrompt())                  .tools(availableTools)                  .call()                  .chatResponse();          // 记录响应,用于 Act  this.toolCallChatResponse = chatResponse;          AssistantMessage assistantMessage = chatResponse.getResult().getOutput();          // 输出提示信息  String result = assistantMessage.getText();          List<AssistantMessage.ToolCall> toolCallList = assistantMessage.getToolCalls();          log.info(getName() + "的思考: " + result);          log.info(getName() + "选择了 " + toolCallList.size() + " 个工具来使用");          String toolCallInfo = toolCallList.stream()                  .map(toolCall -> String.format("工具名称:%s,参数:%s",                          toolCall.name(),                          toolCall.arguments())                  )                  .collect(Collectors.joining("\n"));          log.info(toolCallInfo);          if (toolCallList.isEmpty()) {              // 只有不调用工具时,才记录助手消息              getMessageList().add(assistantMessage);              return false;          } else {              // 需要调用工具时,无需记录助手消息,因为调用工具时会自动记录  return true;          }      } catch (Exception e) {          log.error(getName() + "的思考过程遇到了问题: " + e.getMessage());          getMessageList().add(                  new AssistantMessage("处理时遇到错误: " + e.getMessage()));          return false;      }  }

最后实现 act 方法,执行工具调用列表,得到⁠返回结果,并将工具的响应添加到消息列表中:

/**   * 执行工具调用并处理结果   *   * @return 执行结果   */  @Override  public String act() {      if (!toolCallChatResponse.hasToolCalls()) {          return "没有工具调用";      }      // 调用工具  Prompt prompt = new Prompt(getMessageList(), chatOptions);      ToolExecutionResult toolExecutionResult = toolCallingManager.executeToolCalls(prompt, toolCallChatResponse);      // 记录消息上下文,conversationHistory 已经包含了助手消息和工具调用返回的结果      setMessageList(toolExecutionResult.conversationHistory());      // 当前工具调用的结果  ToolResponseMessage toolResponseMessage = (ToolResponseMessage) CollUtil.getLast(toolExecutionResult.conversationHistory());      String results = toolResponseMessage.getResponses().stream()              .map(response -> "工具 " + response.name() + " 完成了它的任务!结果: " + response.responseData())              .collect(Collectors.joining("\n"));      log.info(results);      return results;  }

注意维护消息上下文,不要重复添加了消息,因为 toolExecutionResult.conversationHistory() 方法已经包含了助手消息和工具调用返回的结果。

可以带看下 toolCallingManager.executeToolCalls 的源码,会自动追加助手消息到上下文中:

千万别忘了,我们还需要定义一个终止工具,让智能⁠体可以自行决定任务结束。

1)在 tools 包下新建 TerminateTool:

public class TerminateTool {      @Tool(description = """              Terminate the interaction when the request is met OR if the assistant cannot proceed further with the task.              "When you have finished all the tasks, call this tool to end the work.              """)      public String doTerminate() {          return "任务结束";      }  }

2)修改 ToolRegistration,注册中止工具:

TerminateTool terminateTool = new TerminateTool();  return ToolCallbacks.from(          fileOperationTool,          webSearchTool,          webScrapingTool,          resourceDownloadTool,          terminalOperationTool,          pdfGenerationTool,          terminateTool  );

3)完善 act 方法,当调用了终止工具时,修⁠改 agent 的状态为 “已结束”:

// 当前工具调用的结果  ToolResponseMessage toolResponseMessage = (ToolResponseMessage) CollUtil.getLast(toolExecutionResult.conversationHistory());  String results = toolResponseMessage.getResponses().stream()          .map(response -> "工具 " + response.name() + " 完成了它的任务!结果: " + response.responseData())          .collect(Collectors.joining("\n"));  // 判断是否调用了终止工具  boolean terminateToolCalled = toolResponseMessage.getResponses().stream()          .anyMatch(response -> "doTerminate".equals(response.name()));  if (terminateToolCalled) {      setState(AgentState.FINISHED);  }  log.info(results);  return results;
4、开发 通用全能 Manus 智能体 类

通用全能 Manus 智能体 是可以直接提供给其他方法调用的 AI 超级智能体实例,继承自 Tool⁠CallAgent,需要给智能体设置各种参数,比如对话客户端 chatClient、工具调用列表等。

代码如下:

@Component  public class 通用全能 Manus 智能体 extends ToolCallAgent {      public 通用全能 Manus 智能体(ToolCallback[] allTools, ChatModel dashscopeChatModel) {          super(allTools);          this.setName("通用全能 Manus 智能体");          String SYSTEM_PROMPT = """                  You are 通用全能 Manus 智能体, an all-capable AI assistant, aimed at solving any task presented by the user.                  You have various tools at your disposal that you can call upon to efficiently complete complex requests.                  """;          this.setSystemPrompt(SYSTEM_PROMPT);          String NEXT_STEP_PROMPT = """                  Based on user needs, proactively select the most appropriate tool or combination of tools.                  For complex tasks, you can break down the problem and use different tools step by step to solve it.                  After using each tool, clearly explain the execution results and suggest the next steps.                  If you want to stop the interaction at any point, use the `terminate` tool/function call.                  """;          this.setNextStepPrompt(NEXT_STEP_PROMPT);          this.setMaxSteps(20);          // 初始化客户端  ChatClient chatClient = ChatClient.builder(dashscopeChatModel)                  .defaultAdvisors(new MyLoggerAdvisor())                  .build();          this.setChatClient(chatClient);      }  }

测试智能体

编写单元测试类,让超级智能体完成一个较为复杂的任务:

@SpringBootTest  class 通用全能 Manus 智能体Test {      @Resource      private 通用全能 Manus 智能体 通用全能 Manus 智能体;      @Test      void run() {          String userPrompt = """                  我的另一半居住在上海静安区,请帮我找到 5 公里内合适的约会地点,                  并结合一些网络图片,制定一份详细的约会计划,                  并以 PDF 格式输出""";          String answer = 通用全能 Manus 智能体.run(userPrompt);          Assertions.assertNotNull(answer);      }  }

以 Debug 模式运行,能够查看到 AI 每⁠一步的思考过程:

执行完成后,可以看到每一步的执行结果,并且可以⁠在本地查看到已下载的图片和生成的 PDF。

注意,由于 AI 是存在随机性的,执行的步骤数也不固定。经过测试大家会发现,拥有自主规划能力的超级智能体虽然能够完成复杂的任务,但缺点是 非常浪费 tokens,并且可能陷入无限循环。 所以还是要按需使用。

感兴趣的同学可以参考 OpenManus 的实现来⁠进一步优化超级智能体,比如防止陷入无限循环、能够和用户交互等。

六、扩展知识

智能体工作流

当我们面对复杂任务时,单一智能体可能无法满足需求。因此智能体工作流(Agen⁠t Workflow)应运而生,通过简单的编排,允许多个专业智能体协同工作,各司其职。

智能体工作流编排的精髓在于 将复杂任务分解为连贯的节点链,每个节点由最适合的智能体处理,节点间通过条件路由灵活连接,形成一个高效、可靠的执行网络。

Anthropic 曾经在一篇 研究报告《Building effective agents》 中提到了多种不同的智能体工作模式,大家需要了解每种模式的特点和适用场景。

智能体工作模式

1)Prompt Chaining 提示链工作流

Prompt Chaining 是最常见的智能体工作流模式之一。它的核心思想是将⁠一个复杂任务拆解为一系列有序的子任务,每一步由 LLM 处理前一步的输出,逐步推进任务完成。

比如在内容生成场景中,可以先让模型生成大纲,再根据大纲生成详细内容⁠,最后进行润色和校对。每一步都可以插入校验和中间检查,确保流程正确、输出更精准。

这种模式结构清晰,易于调试,非常适合任务可以被⁠自然分解为多个阶段的场景。

2)Routing 路由分流工作流

Routing 工作流模式则更像是一个智能的路由器。系统会根据输入内容的类型⁠或特征,将任务分发给最合适的下游智能体或处理流程。非常适合多样化输入和多种处理策略的场景。

比如在客服系统中,可以将常见问题、退款请求、技术支持等分流到不同的处理模块;在多模型系统中⁠,可以将简单问题分配给小模型,复杂问题交给大模型。这样既提高了处理效率,也保证了每类问题都能得到最优解答。

3)Parallelization 并行化工作流

在 Parallelization 并行化模式下,⁠任务会被拆分为多个可以并行处理的子任务,最后聚合各自的结果。

比如在代码安全审查场景中,可以让多个智能体分别对同一段代码进行安全审查,最后 “投票” 决定是否有问题。又比如⁠在处理长文档时,可以将文档分段,每段由不同智能体并行总结。这种模式可以显著提升处理速度,并通过 “投票” 机制提升结果的准确度。

4)Orchestrator-Workers ⁠协调器-执行者工作流

对于复杂的任务、参与任务的智能体增多时,我们可以引入一位 “管理者”,会根据任务动态拆解出多个子任务,并将这些子任务分配⁠给多个 “工人” 智能体,最后再整合所有工人的结果。这种中央协调机制提高了复杂系统的整体效率,适合任务结构不确定、需要动态分解的复杂场景。

5)Evaluator-Optimizer 评⁠估-优化循环工作流

Evaluator-Optimizer 模式模拟了人类 “写 => 评 => 改” 的过程。一个智能体负责生成⁠初步结果,另一个智能体负责评估和反馈,二者循环迭代优化输出

举个例子,在机器翻译场景中,先由翻译智能体输出,再由评审智能体给出改进⁠建议,反复迭代直到达到满意的质量。这种模式特别适合需要多轮打磨和质量提升的任务。

智能体工作流编排框架

如果要实现上述工作流,我们可以采用 Activiti、LiteFlow 这种工作流框架,把智能体处理当做工作流的一个节点。也可以直接使用专门的 AI 智能体⁠工作流编排框架,比如 LangGraph 和 Spring AI Alibaba Graph。这两个框架大家简单了解一下,直到怎么组合工作流即可,实际学习成本都不高。

1)LangGraph

LangGraph 是 LangChain 团队开发的前沿工作流编排框架,专为大语言模型应用设计,是构建复杂 AI 系统的首选工具。LangGraph 的核心理念是将智能体工作流表示为状态转换图,每个节点可以是函数、智能体或子工作流,边则代表状态转换条件。

相比于传统的工作流框架,LangGraph 的独特之处在于它对大语言模型工作流的深度优化。框架提供了丰富的功能来满足大模型应用开发⁠场景,比如动态分支和并行执行、思维链支持、对话管理、内置监控与可视化等。开发者可以轻松实现复杂的推理路径,让模型在需要时进行反思、规划和纠错。

前面讲到的几种工作流模式,以及普通的工具调用代理场景,使用 LangGraph 都能轻松完成,具体用法和代码示例可以参考 官方文档。

下面给大家一段简单的工作流代码示例:

from langgraph.graph import StateGraph  from typing import TypedDict, List, Annotated  # 定义工作流状态  class AgentState(TypedDict):      messages: List[dict]      task: str      context: dict  # 创建状态图  workflow = StateGraph(AgentState)  # 添加节点  workflow.add_node("understand", understand_task)  workflow.add_node("research", research_information)  workflow.add_node("analyze", analyze_data)  workflow.add_node("generate", generate_response)  # 定义边和条件  workflow.add_edge("understand", "research")  workflow.add_conditional_edges(      "research",      decide_next_step,      {          "needs_more_info": "research",          "ready_for_analysis": "analyze",          "skip_analysis": "generate"      }  )  workflow.add_edge("analyze", "generate")  # 设置起始节点  workflow.set_entry_point("understand")

这段代码利用 LangGraph 框架定义了一个多阶段的智能体工作流:首先通过 AgentState 描述了每一步的状态结构,然后用 StateGraph 创建了一个状态流转图,依次添加了 “理解任务”、“⁠信息检索”、“数据分析” 和 “结果生成” 四个节点,并通过有向边和条件分支将它们串联起来,实现了流程的跳转和循环,最后指定工作流从 “理解任务” 节点开始,整体上实现了一个可自动决策、可循环的智能体任务处理流程。

2)Spring AI Alibaba Graph

目前 Spring AI 官方还没有提供工作流编排能力,但是国内的 Spring AI Alibaba 已经提供了工作流编排框架 Spring AI Alibaba Graph。

虽然目前还处于 Demo 阶段,已经支持了:

  • 基础工作流的编排
  • ReAct Agent 模式
  • Orchestrator-Workers 协调器-工作者模式

像我们前面自主开发的 ReAct 模式的 Manus 智能体⁠,使用 Spring AI Alibaba Graph,几行代码就搞定了:

ReactAgent reactAgent = ReactAgent.builder()      .name("React Agent Demo")      .prompt("请完成接下来用户输入给你的任务。")      .chatClient(chatClient)      .resolver(resolver)      .maxIterations(10)      .build();  reactAgent.invoke(Map.of("messages", new UserMessage(query)))

让我们通过一个实际案例来感受像 Spring AI Alibaba Graph ⁠这种框架的作用,比如编写一个数据分析和可视化报告生成任务的工作流,示例代码如下(仅供参考):

public AIGraph<String, String> dataAnalysisWorkflow(          PlanningAgent planningAgent,          DataCollectionAgent dataCollectionAgent,          AnalysisAgent analysisAgent,          VisualizationAgent visualizationAgent,          ReportingAgent reportingAgent) {      return AIGraph.builder(String.class, String.class)              // 任务规划节点              .addNode("planning", input -> planningAgent.run(input), Map.of(                      "success", "data_collection"              ))              // 数据收集节点              .addNode("data_collection", input -> dataCollectionAgent.run(input), Map.of(                      "complete", "analysis",                      "incomplete", "planning"              ))              // 数据分析节点              .addNode("analysis", input -> analysisAgent.run(input), Map.of(                      "success", "visualization",                      "failure", "data_collection"              ))              // 数据可视化节点              .addNode("visualization", input -> visualizationAgent.run(input), Map.of(                      "success", "reporting"              ))              // 报告生成节点              .addNode("reporting", input -> reportingAgent.run(input), Map.of())              .withStartAt("planning")              .build();  }

这个工作流展现了数据分析的完整生命周期:从规划智能体制定分析策略开始,到数据收集智能体获取必要数⁠据,然后是分析智能体处理这些数据,接着由可视化智能体创建直观图表,最后由报告智能体整合所有发现生成完整报告。

除了上面这些工作流开发框架外,之前我们还学习过一种可视化的智能体工作流开发方法 —— 使用⁠ AI 大模型应用开发平台创建工作流应用。目前很多公司都在用这种方法构建复杂的智能体工作流,确实很方便~

OWL 框架

前面提到过 AI 智能体依赖工具调用能力来拓展⁠能力边界,这里再给大家分享一个很实用的开源项目 OWL。

OWL (Optimized Workforce Learning) 是由 CAMEL-AI 团队开源的一款面向 多智能体协作与真实世界任务自动化 的前沿框架。通过 OWL,AI 智能体可以执行终端命令、访问网络资源、运行各种编程语言的代码、使用各种开发工具等等。

它的主要特点:

  • 多智能体协作:OWL 支持多个智能体之间的动态交互与协作,能够模拟真实团队协作场景,适合解决需要多角色、多技能配合的复杂任务。
  • 丰富的工具集成:内置了浏览器自动化、代码执行、文档处理、音视频分析、搜索引擎等多种工具包,支持多模态任务(如网页操作、图片/视频/音频分析等)。
  • MCP 协议支持:通过 MCP,OWL 能够与外部工具和数据源标准化对接,极大扩展了智能体的能力边界。
  • 可定制与易用性:用户可以根据实际需求灵活配置和组合所需工具,优化性能和资源消耗。
  • Web 可视化界面:提供基于 Gradio 的本地 Web UI,支持模型选择、环境变量管理、交互历史查看等功能,方便开发和调试。

其实 OWL 的本质就是利用 AI 来增强传统的自动化办公场⁠景,像自动化数据分析和处理、自动化网页信息检索,都能够用 OWL 轻松完成。

下图是官方介绍的 Demo,OWL 可以自动打开我⁠们的浏览器访问网页,执行滚动、点击、打字操作,并且实时抓取网页信息:

A2A 协议

什么是 A2A 协议?

A2A(Agent to Agent)也是最近很热门的一个概念,简单来说,A2A 协议 就是为 “智能体之间如何直接交流和协作” 制定的一套标准。

A2A 协议的核心,是让每个智能体都能像 “网络节点” 一样,拥有自己的身份、能力描述和通信接口。它不仅规定了消息的格式和传递⁠方式,还包括了身份认证、能力发现、任务委托、结果回传等机制。这样一来,智能体之间就可以像人类团队一样,互相打招呼、询问对方能做什么、请求协助。

可以把 A2A 类比为智能体世界里的 HTTP协议,HTTP 协议让全球不同服务器和电脑之间能够交换数据,A2A 协议则是让不同厂商、不同平台、不同能⁠力的智能体能够像团队成员一样互相理解、协作和分工。如果说 HTTP 协议让互联网成为了一个开放、互联的世界,那么 A2A 协议则让智能体世界变得开放、协作和高效。

A2A 协议的应用场景

A2A 协议的应用非常广泛,总结下来 4 个字就是 开放互联。比如在自动驾驶领域,不同车辆的智能体可以实时交换路况信息,协同避障和规划路线;在制造车间,生产线上的各类机器人智能体可以根据任务动态分工,互相补位;在金融风控、智能客服等场景,不同的智能体可以根据自身专长协作处理复杂业务流程。

我们还可以大胆想象,未来开发者可以像调用云服务一样,按需租用或组合不同的智能体服务,甚⁠至实现智能体之间的自动交易和结算。目前其实就有很多智能体平台,只不过智能体之间的连接协作甚少。

和 MCP 协议的区别

虽然 A2A 和 MCP 都算是协议(或者标准⁠),但二者存在本质上的区别。

MCP 协议是 智能体和外部工具之间的标准,它规定了智能体如何安全、规范地调用外部的数据库、搜索引擎、代码执行等工具资源。你可以把 MCP 理解为 “智能体-工具” 的 HTTP 协议。

而 A2A 协议则是 智能体之间的通信协议。它更像是让不同的 AI 角色之间可以直接对话、协作和分工。

从安全角度看,MCP 和 A2A 处理的是不同⁠层面的安全问题:

  • MCP 的安全关注点:主要集中在单个智能体与工具之间的安全交互,主要防范的是工具滥用和提示词注入攻击。
  • A2A 的安全关注点:更关注智能体网络中的身份认证、授权和信任链。A2A 需要解决“我怎么知道我在和谁通信”、“这个智能体有权限请求这项任务吗”、“如何防止恶意智能体窃取或篡改任务数据” 等问题。(这些问题也是 HTTP 协议需要考虑的)

显然,A2A 面临的安全挑战更加复杂,因为它处⁠理的是跨网络、跨平台、多方协作的场景。

对于一个成熟的智能体系统,可能会同时运用 MCP 和 A2A,MC⁠P 负责某个智能体内部调用工具完成任务,A2A 负责智能体之间协同完成任务。

目前 A2A 协议的落地并不成熟,暂时大家也不必关注如何进行 A2A 的开发,了解它是什么、有什么用就足够了。感兴趣的同学,可以阅读 这篇文章 来了解 A2A 协议的实现细节和交互流程。

扩展思路

1)给智能体添加循环检测和处理机制,防止智能体陷入无限⁠循环。

可以参考 OpenManus 源码实现,示例代码如下:

private int duplicateThreshold = 2;  /**   * 处理陷入循环的状态   */  protected void handleStuckState() {      String stuckPrompt = "观察到重复响应。考虑新策略,避免重复已尝试过的无效路径。";      this.nextStepPrompt = stuckPrompt + "\n" + (this.nextStepPrompt != null ? this.nextStepPrompt : "");      System.out.println("Agent detected stuck state. Added prompt: " + stuckPrompt);  }  /**   * 检查代理是否陷入循环   *   * @return 是否陷入循环   */  protected boolean isStuck() {      List<Message> messages = this.memory.getMessages();      if (messages.size() < 2) {          return false;      }      Message lastMessage = messages.get(messages.size() - 1);      if (lastMessage.getContent() == null || lastMessage.getContent().isEmpty()) {          return false;      }      // 计算重复内容出现次数  int duplicateCount = 0;      for (int i = messages.size() - 2; i >= 0; i--) {          Message msg = messages.get(i);          if (msg.getRole() == Role.ASSISTANT &&                  lastMessage.getContent().equals(msg.getContent())) {              duplicateCount++;          }      }      return duplicateCount >= this.duplicateThreshold;  }  // 每一步 step 执行完都要检查是否陷入循环  if (isStuck()) {      handleStuckState();  }

2)智能体支持交互式执行,可以向用户询问信息或⁠获取反馈,从而优化任务的完成效果。

实现思路可以参考 OpenManus,专门定义一个 As⁠kHuman 工具,让 AI 自主决定什么时候需要寻求人类帮助:

当然也可以通过编写 Prompt 实现,比如 Prompt 中提到 “如果你认为需要向人类寻求帮助,输出结果中需要包含 [ASK_USER]{寻求帮助的具体问题}”,并且检查每一条 AI 给出的消息,如果包含了 ASK_USER 标记,就通过系统控制台和用户交互。

示例代码如下:

public boolean think() {      boolean shouldAct = super.think();      // 获取最新的助手消息  Message lastMessage = getMessageList().get(getMessageList().size() - 1);      if (lastMessage instanceof AssistantMessage) {          String content = lastMessage.getContent();          // 检查是否包含向用户询问的标记  if (content.contains("[ASK_USER]")) {              // 提取问题  String question = content.substring(content.indexOf("[ASK_USER]") + 10);              // 向用户输出问题              System.out.println("智能体需要你的帮助: " + question);              // 获取用户输入  Scanner scanner = new Scanner(System.in);              String userAnswer = scanner.nextLine();              // 添加用户回答到消息列表  UserMessage userResponse = new UserMessage("用户回答: " + userAnswer);              getMessageList().add(userResponse);              // 需要继续思考  return true;          }      }      return shouldAct;  }

3)开发支持 MCP 协议的超级智能体

思路:其实利用 Spring AI,只需要把 MCP 服务中⁠的工具提取出来变成工具列表,就可以复用已有的 ToolCallAgent 了

4)自行开发一个特定领域的超级智能体,可以直接⁠继承 ToolCallAgent 实现

5)参考 OpenManus 的源码,实现更多功能,比如 “规划并执行” 模式的智能体工作流(参考 flow/planning.py

CSDN独家福利

最后,感谢每一个认真阅读我文章的人,礼尚往来总是要有的,下面资料虽然不是什么很值钱的东西,如果你用得到的话可以直接拿走:

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐