简单复刻manus, 使用 LangGraph 开发智能体的入门教程

智能体架构设计

  • LangGraph 是一个专为大语言模型应用设计的框架,它通过有向图结构组织智能体的工作流程,使复杂任务分解为可管理的节点和状态转换。开发者可以定义明确的执行路径、条件分支和循环逻辑,同时保持状态的一致性和可追踪性。LangGraph 智能体具备记忆管理、上下文感知、工具调用和错误恢复等核心能力,特别适合构建需要多步推理、持续对话和复杂决策的AI应用。
  • 最近manus火上天际,我们使用LangGraph简单开发一个能做任务分解和调用工具执行的简单智能体,整个架构包含以下核心组件:
    架构总览
+------------------------------------------+
|            旅行规划智能体                 |
+------------------------------------------+
                    |
        +-----------+-----------+
        |                       |
+-------v-------+      +--------v--------+
|   状态管理     |      |    工作流管理    |
| (AgentState)  |      | (StateGraph)    |
+---------------+      +-----------------+
        |                       |
        |              +--------+--------+
        |              |                 |
        |       +------v------+   +------v------+   +------v------+
        |       | create_plan |-->|  collect_   |-->| generate_   |
        |       |             |   | information |   |  summary    |
        |       +-------------+   +-------------+   +-------------+
        |                               |
        |                               |
        |                               v
        |                      +----------------+
        |                      | should_continue|
        |                      |  _collecting   |
        |                      +----------------+
        |                         |         |
        |                         |         |
        |                    +----v---+ +---v----+
        |                    |continue| |summarize|
        |                    +--------+ +--------+
        |                         |         |
        |                         |         |
        |                         +---------+
        |                               |
        v                               v
+---------------+              +-----------------+
|  工具集合      |              |    LLM 引擎     |
| (Tools)       |<------------>| (ChatOpenAI)    |
+---------------+              +-----------------+
        |
        |
+-------v--------+
| search_web     |
| get_weather    |
| search_        |
|  attractions   |
| search_hotels  |
| search_        |
|  restaurants   |
| search_        |
| transportation |
+----------------+
  • 架构图比较简单, 可以看到,我们使用langgraph开发智能体只需要两个组件,StateGraph管理流程,AgentState管理状态
  • AgentState 需要我们自己定义, 可以自由定义我们的任务执行需要哪些字段。
# 定义状态模型
class AgentState(TypedDict):
    """智能体状态模型"""
    messages: List[BaseMessage]  # 对话历史
    tool_results: Dict[str, Any]  # 工具执行结果
    plan: Optional[Dict[str, Any]]  # 任务规划
    current_step: Optional[str]  # 当前执行步骤
    final_answer: Optional[str]  # 最终回答
    error_count: int  # 错误计数

2. 代码讲解

2.1 用户输入

  • 我们先定义一个main函数,在初始化initial_state 的时候messages 模拟用户输入。
  • create_travel_agent 返回我们经过grpah构建的工作流,传入initial_state 调用。
def main():
    """主函数"""
    # 创建智能体
    agent = create_travel_agent()
    
    # 初始化状态
    initial_state = {
        "messages": [
            SystemMessage(content=SYSTEM_PROMPT),
            HumanMessage(content="我想去北京旅游5天,预算适中,喜欢历史文化和美食,请帮我规划一下行程。")
        ],
        "tool_results": {},
        "plan": None,
        "current_step": None,
        "final_answer": None,
        "error_count": 0
    }
    
    # 执行智能体
    result = agent.invoke(initial_state)
    
    # 打印结果
    for message in result["messages"]:
        if isinstance(message, HumanMessage):
            print(f"\n用户: {message.content}")
        elif isinstance(message, AIMessage):
            print(f"\n助手: {message.content}")
        elif isinstance(message, ToolMessage):
            print(f"\n工具 ({message.name}): {message.content[:100]}...")

2.2 状态图管理器, 构建工作流程

  • 我们使用了langgraph的StateGraph来管理我们的工作流。 通过create_agent 来构建我们的工作图。
# 创建工作流
def create_travel_agent() -> StateGraph:
    """创建旅行规划智能体工作流"""
    # 初始化工作流
    workflow = StateGraph(AgentState)
    
    # 添加节点
    workflow.add_node("create_plan", create_plan)
    workflow.add_node("collect_information", collect_information)
    workflow.add_node("generate_summary", generate_summary)
    
    # 设置入口点
    workflow.set_entry_point("create_plan")
    
    # 添加边
    workflow.add_edge("create_plan", "collect_information")
    
    # 添加条件边
    workflow.add_conditional_edges(
        "collect_information",
        should_continue_collecting,
        {
            "continue": "collect_information",
            "summarize": "generate_summary"
        }
    )
    
    # 添加结束边
    workflow.add_edge("generate_summary", END)
    
    return workflow.compile()
  • StateGraph 创建一个graph, 用于下面的流程编排。
  • set_entry_point 这是起始节点
  • add_node 添加工作节点,
  • add_edge 连接节点直接的边。会把参数的节点1 和节点2 构建一条连接边,执行完节点1后执行节点2
  • add_conditional_edges 添加带跳转条件的边, 根据条件决定是否执下一个节点
  • 用户请求进来, 线执行 create_plan 拆解任务。 然后让ai去分析执行这个计划需要
    收集那些信息, collect_information根据ai返回的工具调用信息, 去执行工具选择信息, collect_information节点根据条件should_continue_collecting 函数来判断是要继续执行,还是采集结束, 去generate_summary 总结报告
    节点graph流程
'
+-------------+
|   __start__ |
+------+------+
       |
       v
+------+------+
| create_plan |
+------+------+
       |
       v
+------+-----------------+
| collect_information    |<---------+
+------+-----------------+          |
       |                            |
       |                            |
       |            |               | 
       |            |               |
       |            v               |
       |        [continue]          |
       |            |               |
   [summarize]      +---------------+
       |
       |    
       |
       v
+------+-----------------+
| generate_summary       |
+------+-----------------+
       |
       v
+------+------+
|   __end__   |
+-------------+
'

2.3 任务理解节点,拆分任务

  • create_plan 是流程图里面的第一个节点, 负责拆分任务。拆分任务后,返回的是AgentState
  • langgraph拿到返回的状态后,会更新state, 然后根据状态,决策判断要执行的下一个节点。
  • 通过 AgentState 可以控制grpah的调用流程。
  • 代码功能比较简单, 调用了llm, 让llm帮我们拆分任务。和确认每个任务要调用的工具。然后把plan 存储在了我们的AgentState里面。
def create_plan(state: AgentState) -> AgentState:
    """创建旅行计划"""
    llm = get_llm()
    messages = state["messages"]
    
    # 构建提示
    planning_prompt = ChatPromptTemplate.from_messages([
        SystemMessage(content="""你是一位专业的旅行规划专家。请分析用户的旅行需求,并制定一个详细的信息收集计划。
        
你需要确定:
1. 旅行目的地
2. 旅行时间和持续时间
3. 旅行预算
4. 用户的兴趣爱好和偏好
5. 需要收集的关键信息

请以JSON格式返回你的计划,包括以下字段:
- destination: 目的地
- duration: 旅行持续时间(天数)
- budget_level: 预算级别(经济型/舒适型/豪华型)
- interests: 兴趣爱好列表
- information_needs: 需要收集的信息列表
- tool_calls: 计划使用的工具调用列表,每个调用包含tool_name和parameters

确保你的计划全面且有针对性。"""),
        MessagesPlaceholder(variable_name="messages")
    ])
    
    # 调用LLM创建计划
    response = llm.invoke(planning_prompt.format(messages=messages))
    
    # 尝试解析JSON响应
    try:
        plan_text = response.content
        # 提取JSON部分
        json_match = re.search(r'```json\n(.*?)\n```', plan_text, re.DOTALL)
        if json_match:
            plan_json = json_match.group(1)
        else:
            # 尝试直接解析
            plan_json = plan_text
        
        plan = json.loads(plan_json)
    except Exception as e:
        # 如果解析失败,创建一个基本计划
        plan = {
            "destination": "未指定",
            "duration": 0,
            "budget_level": "未指定",
            "interests": [],
            "information_needs": ["基本旅游信息"],
            "tool_calls": [{"tool_name": "search_web", "parameters": {"query": "旅游规划建议"}}]
        }
    
    # 更新状态
    return {
        **state,
        "messages": messages + [AIMessage(content=f"我将为您规划一次{plan.get('destination', '旅行')}之旅。正在收集相关信息...")],
        "plan": plan,
        "current_step": "collect_information"
    }

2.4 执行计划 collect_information

  • 拿到计划后, 我们让大模型线帮我们分析要执行哪些工具, 和参数
  • 然后 我们通过 tool_function.invoke 调用对应的工具,填充结果。
  • 工具返回的信息, 我们依然存储到AgentState 中,
  • 示例中, 我们简单判断了采集信息的梳理 if len(tool_results) >= 8: 够了就去总结报告
def collect_information(state: AgentState) -> AgentState:
    """收集旅行相关信息"""
    llm = get_llm()
    messages = state["messages"]
    plan = state["plan"]
    tool_results = state.get("tool_results", {})
    
    # 检查是否已经收集了足够的信息
    if len(tool_results) >= 8:
        return {
            **state,
            "current_step": "generate_summary"
        }
    
    # 构建提示
    tool_selection_prompt = ChatPromptTemplate.from_messages([
        SystemMessage(content="""你是一位旅行规划助手。基于当前的旅行计划和已收集的信息,确定下一步需要使用的工具。
        
可用的工具包括:
- search_web: 搜索网络获取旅游相关信息
- get_weather: 获取指定地点的天气信息
- search_attractions: 搜索特定地点的旅游景点
- search_hotels: 搜索特定地点的酒店信息
- search_restaurants: 搜索特定地点的餐厅信息
- search_transportation: 搜索两地之间的交通信息

请选择最合适的工具并提供必要的参数。以JSON格式返回,包含以下字段:
- tool_name: 工具名称
- parameters: 工具参数字典

如果已经收集了足够的信息,请返回 {"tool_name": "none", "parameters": {}}

注意:如果你认为已经收集了足够的信息来生成旅行计划,必须返回tool_name为"none"。"""),
        MessagesPlaceholder(variable_name="messages"),
        SystemMessage(content="当前旅行计划:{plan}"),
        SystemMessage(content="已收集的信息:{tool_results}"),
        SystemMessage(content="已收集的信息数量:{info_count}。如果已收集6条或更多信息,请考虑返回none。")
    ])
    
    # 调用LLM选择工具
    response = llm.invoke(
        tool_selection_prompt.format(
            messages=messages,
            plan=json.dumps(plan, ensure_ascii=False),
            tool_results=json.dumps(tool_results, ensure_ascii=False),
            info_count=len(tool_results)
        )
    )
    
    # 尝试解析JSON响应
    try:
        tool_call_text = response.content
        # 提取JSON部分
        json_match = re.search(r'```json\n(.*?)\n```', tool_call_text, re.DOTALL)
        if json_match:
            tool_call_json = json_match.group(1)
        else:
            # 尝试直接解析
            tool_call_json = tool_call_text
        
        tool_call = json.loads(tool_call_json)
        
        # 确保tool_call是字典格式
        if isinstance(tool_call, list) and len(tool_call) > 0:
            tool_call = tool_call[0]  # 取第一个元素
    except Exception as e:
        # 如果解析失败,记录错误并进入总结阶段
        error_message = AIMessage(content=f"在解析工具调用时发生错误: {str(e)}。将进入总结阶段。")
        return {
            **state,
            "messages": messages + [error_message],
            "current_step": "generate_summary"
        }
    
    # 检查是否决定不再使用工具
    tool_name = tool_call.get("tool_name", "").lower()
    if tool_name == "none" or tool_name == "" or tool_name not in [t.name for t in tools]:
        return {
            **state,
            "messages": messages + [AIMessage(content="已收集足够的信息,正在为您生成旅行计划...")],
            "current_step": "generate_summary"
        }
    
    # 执行工具调用
    parameters = tool_call.get("parameters", {})
    tool_function = next((tool for tool in tools if tool.name == tool_name), None)
    result = tool_function.invoke(input=parameters,config={"callbacks": [CustomCallbackHandler()]})
    
    # 更新工具结果
    updated_tool_results = {
        **tool_results,
        f"{tool_name}_{datetime.now().strftime('%H%M%S')}": {
            "parameters": parameters,
            "result": result
        }
    }
    
    # 添加工具消息
    tool_message = ToolMessage(
        content=result,
        tool_call_id=f"{tool_name}_{datetime.now().strftime('%H%M%S')}",
        name=tool_name
    )
    
    # 更新状态
    return {
        **state,
        "messages": messages + [tool_message],
        "tool_results": updated_tool_results,
        "current_step": "collect_information"  # 继续收集信息
    }

2.5 总结报告 generate_summary

  • 总结报告也很简单, 我们把之前的计划, 和采集的信息。 丢给大模型,让大模型去给我们总结,是不是很省事.
  • 总结的报告, 我们依然放回 AgentState, 然后返回。
def generate_summary(state: AgentState) -> AgentState:
    """生成旅行计划总结"""
    llm = get_llm()
    messages = state["messages"]
    plan = state["plan"]
    tool_results = state.get("tool_results", {})
    
    # 构建提示
    summary_prompt = ChatPromptTemplate.from_messages([
        SystemMessage(content="""你是一位专业的旅行规划师。请基于收集到的所有信息,为用户生成一份详细、实用的旅行计划。
        
你的旅行计划应包括以下内容:
1. 目的地概览:简要介绍目的地的特色和亮点
2. 行程安排:每天的详细行程安排,包括景点、活动、用餐等
3. 住宿推荐:适合用户预算的住宿选择
4. 交通信息:往返交通和当地交通建议
5. 美食推荐:当地特色美食和餐厅推荐
6. 旅行贴士:实用的旅行建议、注意事项和小技巧
7. 预算规划:旅行预算的详细规划

请确保你的计划具体、实用,并且考虑到用户的偏好和需求。使用清晰的标题和分段,使计划易于阅读和理解。"""),
        MessagesPlaceholder(variable_name="messages"),
        SystemMessage(content="旅行计划:{plan}"),
        SystemMessage(content="收集到的信息:{tool_results}")
    ])
    
    # 调用LLM生成总结
    response = llm.invoke(
        summary_prompt.format(
            messages=messages,
            plan=json.dumps(plan, ensure_ascii=False),
            tool_results=json.dumps(tool_results, ensure_ascii=False)
        )
    )
    
    # 生成最终回答
    final_answer = response.content
    
    # 更新状态
    return {
        **state,
        "messages": messages + [AIMessage(content=final_answer)],
        "final_answer": final_answer,
        "current_step": "end"
    }

2.6 callback调试

  • langgraph 支持callback调试, 让我们可以清晰的知道大模型走到那步了。
  • callback 可以监听的地方我一一列出来了。
class CustomCallbackHandler(BaseCallbackHandler):
    """增强版回调处理器,提供详细执行监控和错误处理"""
    
    def __init__(self):
        self.execution_stack = []
        self.timers = {}

    def on_llm_start(self, serialized, prompts, **kwargs):
        self._start_timer('llm')
        print(f"🕒 [{self._get_timestamp()}] LLM开始生成 | 提示数量: {len(prompts)}")

    def on_llm_end(self, response, **kwargs):
        duration = self._end_timer('llm')
        print(f"✅ [{self._get_timestamp()}] LLM生成完成 | 耗时: {duration:.2f}s | 响应长度: {len(str(response))}")

    def on_tool_start(self, serialized, input_str, **kwargs):
        self._start_timer('tool')
        print(f"🔧 [{self._get_timestamp()}] 工具开始执行 | 工具: {serialized.get('name')} | 输入: {input_str[:200]}")

    def on_tool_end(self, output, **kwargs):
        duration = self._end_timer('tool')
        print(f"✅ [{self._get_timestamp()}] 工具执行完成 | 耗时: {duration:.2f}s | 输出: {str(output)[:200]}")

    def on_tool_error(self, error, **kwargs):
        print(f"❌ [{self._get_timestamp()}] 工具执行错误 | 错误类型: {type(error).__name__} | 详情: {str(error)[:500]}")

    def on_chain_start(self, serialized, inputs, **kwargs):
        self._start_timer('chain')
        chain_type = serialized.get('name', '未知链')
        print(f"⛓️ [{self._get_timestamp()}] 链开始执行 | 类型: {chain_type} | 输入参数: {self._format_inputs(inputs)}")

    def on_chain_end(self, outputs, **kwargs):
        duration = self._end_timer('chain')
        print(f"✅ [{self._get_timestamp()}] 链执行完成 | 耗时: {duration:.2f}s | 输出参数: {self._format_outputs(outputs)}")

    def on_agent_action(self, action, **kwargs):
        print(f"🤔 [{self._get_timestamp()}] Agent决策 | 选择工具: {action.tool} | 输入: {action.tool_input[:200]}")

    def _start_timer(self, event_type):
        self.timers[event_type] = time.time()

    def _end_timer(self, event_type):
        return time.time() - self.timers.pop(event_type, time.time())

    def _get_timestamp(self):
        return datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    def _format_inputs(self, inputs):
        return json.dumps(inputs, indent=2, ensure_ascii=False)[:300]

    def _format_outputs(self, outputs):
        return json.dumps(outputs, indent=2, ensure_ascii=False)[:300]

3.总结

通过以上步骤,您可以使用 LangGraph 框架快速构建智能体。该框架提供了灵活的状态管理和任务分解能力,使得开发复杂的智能体变得更加简单和高效。希望本教程能帮助您快速上手 LangGraph 的使用。

关注我获取更多信息
在这里插入图片描述

git源码:https://github.com/lovelly/aicode.git

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐