LangGraph 智能工作流引擎:让 AI 自动拆解需求并执行任务
本文介绍了如何利用LangGraph构建一个智能工作流引擎(PIER智能体),能够自动处理模糊需求并完成任务。主要内容包括: 项目目标:构建Plan-Execute-Inspect-Report(PIER)智能体,实现需求理解、任务拆解、执行和结果交付的全流程自动化 核心组件: 状态(State):设计AgentState数据结构记录工作流进展 工具(Tools):定义web_search和gen
🧠 LangGraph 智能工作流引擎:让 AI 自动拆解需求并执行任务
本教程将带你从零开始,构建一个能理解模糊需求、自动规划、执行任务、自我反思并最终交付结果的高级智能体。你将学到 LangGraph 的核心概念:状态(State)、节点(Node)、边(Edge),并亲手实现一个可用于生产环境的自动化工作流框架。
🎯 项目背景与目标
在日常工作中,我们常常会遇到这样的请求:
“帮我分析下最近 AI 行业的热门公司,并总结他们的融资情况。”
这是一个典型的模糊、复合型需求。传统程序无法处理,但我们可以教会 AI 如何像人类项目经理一样工作:
- 规划(Plan):拆解成具体任务。
- 执行(Execute):调用工具完成任务。
- 反思(Review):检查结果是否满足需求。
- 交付(Deliver):整合结果,给出最终答案。
这就是我们要构建的 PIER(Plan-Execute-Inspect-Report)智能体。
🧱 第一步:环境准备与核心概念
1.1 安装依赖
pip install langchain langgraph langchain-openai
1.2 LangGraph 核心概念
- 状态(State): 智能体的“大脑”和“记事本”,用于在工作流各步骤间传递数据。
- 节点(Node): 工作流中的一个处理单元,是一个 Python 函数(如“规划”、“执行”)。
- 边(Edge): 定义节点之间的连接关系,决定了工作流的走向。
我们的目标:用这些乐高积木,搭建一个会思考、会干活的 AI 机器人。
📦 第二步:定义状态(State)—— 智能体的“大脑”
首先,我们需要设计一个数据结构,来记录整个 PIER 流程中的所有关键信息。
import operator
from typing_extensions import TypedDict, Annotated, List, Optional
from langchain_core.messages import AnyMessage
class AgentState(TypedDict):
"""PIER 智能体的全局状态,记录了整个工作流的进展。"""
# 用户的原始模糊需求
original_request: str
# 1. 【规划阶段 - Plan】AI 拆解出的具体、可执行的任务列表
task_list: List[str]
# 2. 【执行阶段 - Execute】每个任务的执行结果
# 格式: [{"task": "搜索A公司", "result": "..."}, ...]
execution_results: List[dict]
# 3. 【反思阶段 - Inspect】对当前结果的评估
review_feedback: Optional[str]
# 4. 所有对话历史,用于 LLM 上下文
# `operator.add` 确保新消息被追加,而不是覆盖旧消息
messages: Annotated[List[AnyMessage], operator.add]
# 流程控制:记录当前处于 PIER 的哪个阶段?
current_stage: str # 取值: "planning", "executing", "reviewing", "delivering"
🛠 第三步:定义工具(Tools)—— 智能体的“手”
为了让 AI 能完成实际工作,我们需要给它提供一些“工具”。这里我们用模拟数据来代替真实的 API 调用。
from langchain.tools import tool
# 模拟一个网络搜索工具(在真实项目中,这里会是 SerpAPI, Tavily 等)
@tool
def web_search(query: str) -> str:
"""根据查询语句在网络上搜索信息。"""
# 模拟真实的搜索结果
mock_results = {
"热门AI公司": "2024年热门的AI公司包括:DeepSeek, Moonshot。",
"DeepSeek 融资": "DeepSeek 在2024年完成了 5000 万美元的 B 轮融资。",
"Moonshot 融资": "Moonshot 在2024年完成了 1 亿美元的 C 轮融资。",
}
return mock_results.get(query, f"未找到关于 '{query}' 的详细信息。")
# 模拟一个报告生成工具
@tool
def generate_report(data: str) -> str:
"""根据提供的数据生成一份结构化的分析报告。"""
return f"【AI行业分析报告】\n{data}"
💡 关键点:我们刻意简化了模拟数据,只包含
DeepSeek和Moonshot,确保任务能被完整执行,避免因数据不全导致的无限循环。
🧠 第四步:定义工作流节点(Nodes)—— PIER 的四个阶段
4.1 初始化 LLM
我们使用 硅基流动 平台的 GLM-Z1-9B 模型,并配置好 API。
from langchain.chat_models import init_chat_model
# 初始化大语言模型
llm = init_chat_model(
model="THUDM/GLM-Z1-9B-0414",
model_provider="openai", # 硅基流动兼容 OpenAI API
api_key="sk-************************************", # 你的 API 密钥
base_url="https://api.siliconflow.cn/v1/",
)
4.2 阶段一:规划(Planning)
AI 需要将模糊的用户需求拆解成一系列具体的、可执行的任务。这里我们使用 Pydantic + 结构化输出 来确保输出格式的可靠性。
from pydantic import BaseModel, Field
class TaskList(BaseModel):
"""用于强制 LLM 返回结构化任务列表的数据模型。"""
tasks: List[str] = Field(description="拆解出的具体任务列表", min_length=1)
def planner_node(state: AgentState):
"""【规划阶段】AI 将原始需求拆解成具体的、可执行的任务列表。"""
prompt = f"""
你是一个专业的项目规划师。请将用户的原始需求拆解成一系列清晰、具体的、可独立执行的子任务。
子任务应按逻辑顺序排列,并且每个任务都应能通过调用 `web_search` 工具来完成。
**用户原始需求**:
{state['original_request']}
**重要提示**:
- 请将“热门AI公司”作为一个任务。
- 请为提到的每家公司单独创建一个“公司名+融资情况”的任务。
- 例如,如果提到A和B公司,任务列表应为:["热门AI公司", "A公司融资情况", "B公司融资情况"]
请直接返回任务列表。
"""
# 使用 LangChain 的结构化输出功能,强制模型返回 TaskList 对象
structured_llm = llm.with_structured_output(TaskList)
task_obj = structured_llm.invoke(prompt)
return {
"task_list": task_obj.tasks,
"current_stage": "executing", # 进入下一阶段
"messages": [AIMessage(content=f"【规划】完成,任务列表: {task_obj.tasks}")]
}
4.3 阶段二:执行(Executing)
AI 按顺序执行规划阶段生成的任务列表,并收集结果。
from langchain_core.messages import AIMessage
def executor_node(state: AgentState):
"""【执行阶段】按顺序执行任务列表中的每一个任务,并收集结果。"""
results = []
for task in state["task_list"]:
# 调用 web_search 工具执行任务
result = web_search.invoke(task)
results.append({"task": task, "result": result})
return {
"execution_results": results,
"current_stage": "reviewing" # 进入下一阶段
}
4.4 阶段三:反思(Reviewing)
这是避免无限循环的关键!AI 会严格评审执行结果是否满足原始需求。我们同样使用结构化输出来获取明确的布尔判断。
class ReviewResult(BaseModel):
"""评审结果的数据模型。"""
is_successful: bool = Field(description="所有任务是否已成功完成并满足用户需求")
feedback: str = Field(description="具体的评审意见或改进建议")
def reviewer_node(state: AgentState):
"""【反思阶段】评估执行结果是否满足原始需求。"""
# 将所有执行结果格式化为字符串,便于 LLM 理解
all_results = "\n".join([f"- {r['task']}: {r['result']}" for r in state["execution_results"]])
prompt = f"""
你是一个严格的评审员。请根据以下信息,判断是否已充分满足用户的原始需求。
**用户原始需求**: {state['original_request']}
**当前执行结果**:
{all_results}
**你的任务**:
1. 判断信息是否完整、准确且覆盖了所有需求点。
2. 如果是,将 `is_successful` 设为 `true`。
3. 如果否,将 `is_successful` 设为 `false`,并在 `feedback` 中明确指出需要补充哪些信息。
"""
# 使用结构化输出获取明确的评审结果
structured_llm = llm.with_structured_output(ReviewResult)
review_obj = structured_llm.invoke(prompt)
# 无论评审结果如何,我们强制进入交付阶段以结束循环
# 注:在更复杂的系统中,这里会根据 `is_successful` 决定是否重新规划
next_stage = "delivering"
return {
"review_feedback": review_obj.feedback,
"current_stage": next_stage,
"messages": [AIMessage(content=f"【反思】{review_obj.feedback}")]
}
注意:为了确保本教程示例能100%成功运行,我们在
reviewer_node中暂时强制next_stage = "delivering"。在生产环境中,你会根据review_obj.is_successful的值来决定是重新规划还是交付。
4.5 阶段四:交付(Delivering)
AI 整合所有执行结果,生成最终的、用户友好的交付物。
def deliverer_node(state: AgentState):
"""【交付阶段】整合所有结果,生成最终的交付报告。"""
# 将所有任务结果拼接成一个字符串
all_data = "\n".join([f"{r['task']}: {r['result']}" for r in state["execution_results"]])
# 调用报告生成工具
final_report = generate_report.invoke(all_data)
return {
"messages": [AIMessage(content=f"【交付】{final_report}")],
"current_stage": "completed"
}
🔀 第五步:组装工作流(Edges)—— 定义执行路径
现在,我们有了所有零件,是时候把它们组装成一个完整的机器人了。
from langgraph.graph import StateGraph, START, END
from typing import Literal
# 创建状态图
workflow = StateGraph(AgentState)
# 添加所有工作流节点
workflow.add_node("planner_node", planner_node)
workflow.add_node("executor_node", executor_node)
workflow.add_node("reviewer_node", reviewer_node)
workflow.add_node("deliverer_node", deliverer_node)
# 设置工作流的入口点
workflow.set_entry_point("planner_node")
# 定义节点之间的连接(边)
# P -> E -> R -> D 的线性流程
workflow.add_edge("planner_node", "executor_node")
workflow.add_edge("executor_node", "reviewer_node")
workflow.add_edge("reviewer_node", "deliverer_node")
workflow.add_edge("deliverer_node", END)
# 编译工作流,生成可执行的智能体
app = workflow.compile()
💡 简化说明:为了本教程的清晰和稳定,我们使用了线性流程(无循环)。在真实项目中,你会在
reviewer_node和planner_node之间加一条条件边,形成 P-E-R-P 的循环,直到任务成功完成。
▶️ 第六步:运行与测试
一切就绪,让我们看看我们的 PIER 智能体如何工作!
# === 运行示例 ===
if __name__ == "__main__":
# 用户提出一个模糊的复合型需求
user_request = "帮我分析下最近 AI 行业的热门公司,并总结他们的融资情况。"
print(f"👤 用户需求: {user_request}\n")
# 执行 PIER 工作流
result = app.invoke(
input={
"original_request": user_request,
"task_list": [],
"execution_results": [],
"messages": [],
"current_stage": "planning"
},
config={"recursion_limit": 10} # 设置最大循环次数作为安全保护
)
# 打印智能体的完整思考和执行过程
print("🤖 智能体工作日志:")
for m in result["messages"]:
m.pretty_print()
🖨️ 预期输出
👤 用户需求: 帮我分析下最近 AI 行业的热门公司,并总结他们的融资情况。
🤖 智能体工作日志:
================================== Ai Message ==================================
【规划】完成,任务列表: ['热门AI公司', 'DeepSeek 融资', 'Moonshot 融资']
================================== Ai Message ==================================
【反思】信息完整,已覆盖用户需求。
================================== Ai Message ==================================
【交付】
【AI行业分析报告】
热门AI公司: 2024年热门的AI公司包括:DeepSeek, Moonshot。
DeepSeek 融资: DeepSeek 在2024年完成了 5000 万美元的 B 轮融资。
Moonshot 融资: Moonshot 在2024年完成了 1 亿美元的 C 轮融资。
🏁 总结与展望
恭喜!你已经成功构建了一个 PIER 智能工作流引擎。这个框架具备了高级自主智能体的核心能力。
下一步你可以做什么?
- 引入真实工具:将
web_search替换为 Tavily、SerpAPI 等真实搜索引擎。 - 实现真正的循环:修改
reviewer_node和路由逻辑,让智能体在任务失败时能自主重试。 - 加入长期记忆:集成 LangGraph Store,让智能体记住用户的偏好和历史。
- 加入人机协同:对高风险操作(如删除数据)加入人工审批流程。
通过这个项目,你已经掌握了构建复杂、可靠、可扩展的 AI 智能体的核心方法论。现在,你可以将这个框架应用到任何需要自动化处理复杂任务的场景中!🚀
更多推荐


所有评论(0)