🧠 LangGraph 智能工作流引擎:让 AI 自动拆解需求并执行任务

本教程将带你从零开始,构建一个能理解模糊需求、自动规划、执行任务、自我反思并最终交付结果的高级智能体。你将学到 LangGraph 的核心概念:状态(State)、节点(Node)、边(Edge),并亲手实现一个可用于生产环境的自动化工作流框架。


🎯 项目背景与目标

在日常工作中,我们常常会遇到这样的请求:

“帮我分析下最近 AI 行业的热门公司,并总结他们的融资情况。”

这是一个典型的模糊、复合型需求。传统程序无法处理,但我们可以教会 AI 如何像人类项目经理一样工作:

  1. 规划(Plan):拆解成具体任务。
  2. 执行(Execute):调用工具完成任务。
  3. 反思(Review):检查结果是否满足需求。
  4. 交付(Deliver):整合结果,给出最终答案。

这就是我们要构建的 PIER(Plan-Execute-Inspect-Report)智能体


🧱 第一步:环境准备与核心概念

1.1 安装依赖

pip install langchain langgraph langchain-openai

1.2 LangGraph 核心概念

  • 状态(State): 智能体的“大脑”和“记事本”,用于在工作流各步骤间传递数据。
  • 节点(Node): 工作流中的一个处理单元,是一个 Python 函数(如“规划”、“执行”)。
  • 边(Edge): 定义节点之间的连接关系,决定了工作流的走向。

我们的目标:用这些乐高积木,搭建一个会思考、会干活的 AI 机器人。


📦 第二步:定义状态(State)—— 智能体的“大脑”

首先,我们需要设计一个数据结构,来记录整个 PIER 流程中的所有关键信息。

import operator
from typing_extensions import TypedDict, Annotated, List, Optional
from langchain_core.messages import AnyMessage


class AgentState(TypedDict):
    """PIER 智能体的全局状态,记录了整个工作流的进展。"""
    # 用户的原始模糊需求
    original_request: str
    
    # 1. 【规划阶段 - Plan】AI 拆解出的具体、可执行的任务列表
    task_list: List[str] 
    
    # 2. 【执行阶段 - Execute】每个任务的执行结果
    #   格式: [{"task": "搜索A公司", "result": "..."}, ...]
    execution_results: List[dict]
    
    # 3. 【反思阶段 - Inspect】对当前结果的评估
    review_feedback: Optional[str] 
    
    # 4. 所有对话历史,用于 LLM 上下文
    # `operator.add` 确保新消息被追加,而不是覆盖旧消息
    messages: Annotated[List[AnyMessage], operator.add]
    
    # 流程控制:记录当前处于 PIER 的哪个阶段?
    current_stage: str # 取值: "planning", "executing", "reviewing", "delivering"

🛠 第三步:定义工具(Tools)—— 智能体的“手”

为了让 AI 能完成实际工作,我们需要给它提供一些“工具”。这里我们用模拟数据来代替真实的 API 调用。

from langchain.tools import tool


# 模拟一个网络搜索工具(在真实项目中,这里会是 SerpAPI, Tavily 等)
@tool
def web_search(query: str) -> str:
    """根据查询语句在网络上搜索信息。"""
    # 模拟真实的搜索结果
    mock_results = {
        "热门AI公司": "2024年热门的AI公司包括:DeepSeek, Moonshot。",
        "DeepSeek 融资": "DeepSeek 在2024年完成了 5000 万美元的 B 轮融资。",
        "Moonshot 融资": "Moonshot 在2024年完成了 1 亿美元的 C 轮融资。",
    }
    return mock_results.get(query, f"未找到关于 '{query}' 的详细信息。")


# 模拟一个报告生成工具
@tool
def generate_report(data: str) -> str:
    """根据提供的数据生成一份结构化的分析报告。"""
    return f"【AI行业分析报告】\n{data}"

💡 关键点:我们刻意简化了模拟数据,只包含 DeepSeekMoonshot,确保任务能被完整执行,避免因数据不全导致的无限循环。


🧠 第四步:定义工作流节点(Nodes)—— PIER 的四个阶段

4.1 初始化 LLM

我们使用 硅基流动 平台的 GLM-Z1-9B 模型,并配置好 API。

from langchain.chat_models import init_chat_model

# 初始化大语言模型
llm = init_chat_model(
    model="THUDM/GLM-Z1-9B-0414",
    model_provider="openai", # 硅基流动兼容 OpenAI API
    api_key="sk-************************************", # 你的 API 密钥
    base_url="https://api.siliconflow.cn/v1/",
)

4.2 阶段一:规划(Planning)

AI 需要将模糊的用户需求拆解成一系列具体的、可执行的任务。这里我们使用 Pydantic + 结构化输出 来确保输出格式的可靠性。

from pydantic import BaseModel, Field


class TaskList(BaseModel):
    """用于强制 LLM 返回结构化任务列表的数据模型。"""
    tasks: List[str] = Field(description="拆解出的具体任务列表", min_length=1)


def planner_node(state: AgentState):
    """【规划阶段】AI 将原始需求拆解成具体的、可执行的任务列表。"""
    prompt = f"""
    你是一个专业的项目规划师。请将用户的原始需求拆解成一系列清晰、具体的、可独立执行的子任务。
    子任务应按逻辑顺序排列,并且每个任务都应能通过调用 `web_search` 工具来完成。

    **用户原始需求**:
    {state['original_request']}

    **重要提示**:
    - 请将“热门AI公司”作为一个任务。
    - 请为提到的每家公司单独创建一个“公司名+融资情况”的任务。
    - 例如,如果提到A和B公司,任务列表应为:["热门AI公司", "A公司融资情况", "B公司融资情况"]

    请直接返回任务列表。
    """
    
    # 使用 LangChain 的结构化输出功能,强制模型返回 TaskList 对象
    structured_llm = llm.with_structured_output(TaskList)
    task_obj = structured_llm.invoke(prompt)
    
    return {
        "task_list": task_obj.tasks,
        "current_stage": "executing", # 进入下一阶段
        "messages": [AIMessage(content=f"【规划】完成,任务列表: {task_obj.tasks}")]
    }

4.3 阶段二:执行(Executing)

AI 按顺序执行规划阶段生成的任务列表,并收集结果。

from langchain_core.messages import AIMessage


def executor_node(state: AgentState):
    """【执行阶段】按顺序执行任务列表中的每一个任务,并收集结果。"""
    results = []
    for task in state["task_list"]:
        # 调用 web_search 工具执行任务
        result = web_search.invoke(task)
        results.append({"task": task, "result": result})
    
    return {
        "execution_results": results,
        "current_stage": "reviewing" # 进入下一阶段
    }

4.4 阶段三:反思(Reviewing)

这是避免无限循环的关键!AI 会严格评审执行结果是否满足原始需求。我们同样使用结构化输出来获取明确的布尔判断。

class ReviewResult(BaseModel):
    """评审结果的数据模型。"""
    is_successful: bool = Field(description="所有任务是否已成功完成并满足用户需求")
    feedback: str = Field(description="具体的评审意见或改进建议")


def reviewer_node(state: AgentState):
    """【反思阶段】评估执行结果是否满足原始需求。"""
    # 将所有执行结果格式化为字符串,便于 LLM 理解
    all_results = "\n".join([f"- {r['task']}: {r['result']}" for r in state["execution_results"]])
    
    prompt = f"""
    你是一个严格的评审员。请根据以下信息,判断是否已充分满足用户的原始需求。
    
    **用户原始需求**: {state['original_request']}
    **当前执行结果**: 
    {all_results}
    
    **你的任务**:
    1. 判断信息是否完整、准确且覆盖了所有需求点。
    2. 如果是,将 `is_successful` 设为 `true`。
    3. 如果否,将 `is_successful` 设为 `false`,并在 `feedback` 中明确指出需要补充哪些信息。
    """
    
    # 使用结构化输出获取明确的评审结果
    structured_llm = llm.with_structured_output(ReviewResult)
    review_obj = structured_llm.invoke(prompt)
    
    # 无论评审结果如何,我们强制进入交付阶段以结束循环
    # 注:在更复杂的系统中,这里会根据 `is_successful` 决定是否重新规划
    next_stage = "delivering" 
    
    return {
        "review_feedback": review_obj.feedback,
        "current_stage": next_stage,
        "messages": [AIMessage(content=f"【反思】{review_obj.feedback}")]
    }

注意:为了确保本教程示例能100%成功运行,我们在 reviewer_node暂时强制 next_stage = "delivering"。在生产环境中,你会根据 review_obj.is_successful 的值来决定是重新规划还是交付。

4.5 阶段四:交付(Delivering)

AI 整合所有执行结果,生成最终的、用户友好的交付物。

def deliverer_node(state: AgentState):
    """【交付阶段】整合所有结果,生成最终的交付报告。"""
    # 将所有任务结果拼接成一个字符串
    all_data = "\n".join([f"{r['task']}: {r['result']}" for r in state["execution_results"]])
    # 调用报告生成工具
    final_report = generate_report.invoke(all_data)
    
    return {
        "messages": [AIMessage(content=f"【交付】{final_report}")],
        "current_stage": "completed"
    }

🔀 第五步:组装工作流(Edges)—— 定义执行路径

现在,我们有了所有零件,是时候把它们组装成一个完整的机器人了。

from langgraph.graph import StateGraph, START, END
from typing import Literal


# 创建状态图
workflow = StateGraph(AgentState)

# 添加所有工作流节点
workflow.add_node("planner_node", planner_node)
workflow.add_node("executor_node", executor_node)
workflow.add_node("reviewer_node", reviewer_node)
workflow.add_node("deliverer_node", deliverer_node)

# 设置工作流的入口点
workflow.set_entry_point("planner_node")

# 定义节点之间的连接(边)
# P -> E -> R -> D 的线性流程
workflow.add_edge("planner_node", "executor_node")
workflow.add_edge("executor_node", "reviewer_node")
workflow.add_edge("reviewer_node", "deliverer_node")
workflow.add_edge("deliverer_node", END)

# 编译工作流,生成可执行的智能体
app = workflow.compile()

💡 简化说明:为了本教程的清晰和稳定,我们使用了线性流程(无循环)。在真实项目中,你会在 reviewer_nodeplanner_node 之间加一条条件边,形成 P-E-R-P 的循环,直到任务成功完成。


▶️ 第六步:运行与测试

一切就绪,让我们看看我们的 PIER 智能体如何工作!

# === 运行示例 ===
if __name__ == "__main__":
    # 用户提出一个模糊的复合型需求
    user_request = "帮我分析下最近 AI 行业的热门公司,并总结他们的融资情况。"
    
    print(f"👤 用户需求: {user_request}\n")
    
    # 执行 PIER 工作流
    result = app.invoke(
        input={
            "original_request": user_request,
            "task_list": [],
            "execution_results": [],
            "messages": [],
            "current_stage": "planning"
        },
        config={"recursion_limit": 10}  # 设置最大循环次数作为安全保护
    )
    
    # 打印智能体的完整思考和执行过程
    print("🤖 智能体工作日志:")
    for m in result["messages"]:
        m.pretty_print()

🖨️ 预期输出

👤 用户需求: 帮我分析下最近 AI 行业的热门公司,并总结他们的融资情况。

🤖 智能体工作日志:
================================== Ai Message ==================================
【规划】完成,任务列表: ['热门AI公司', 'DeepSeek 融资', 'Moonshot 融资']
================================== Ai Message ==================================
【反思】信息完整,已覆盖用户需求。
================================== Ai Message ==================================
【交付】
【AI行业分析报告】
热门AI公司: 2024年热门的AI公司包括:DeepSeek, Moonshot。
DeepSeek 融资: DeepSeek 在2024年完成了 5000 万美元的 B 轮融资。
Moonshot 融资: Moonshot 在2024年完成了 1 亿美元的 C 轮融资。

🏁 总结与展望

恭喜!你已经成功构建了一个 PIER 智能工作流引擎。这个框架具备了高级自主智能体的核心能力。

下一步你可以做什么?

  • 引入真实工具:将 web_search 替换为 Tavily、SerpAPI 等真实搜索引擎。
  • 实现真正的循环:修改 reviewer_node 和路由逻辑,让智能体在任务失败时能自主重试。
  • 加入长期记忆:集成 LangGraph Store,让智能体记住用户的偏好和历史。
  • 加入人机协同:对高风险操作(如删除数据)加入人工审批流程。

通过这个项目,你已经掌握了构建复杂、可靠、可扩展的 AI 智能体的核心方法论。现在,你可以将这个框架应用到任何需要自动化处理复杂任务的场景中!🚀

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐