LangGraph从入门到精通完整教程
✅状态管理:设计清晰的状态结构✅图结构:合理划分节点和边✅条件控制:使用条件边实现复杂逻辑✅人工介入:在关键节点添加人工审核✅错误处理:每个节点都要考虑异常情况从简单的单节点图开始,逐步增加复杂度,最终构建出强大的AI Agent应用!
第一章:LangGraph基础认知
1.1 什么是LangGraph?
LangGraph是LangChain团队开发的一个用于构建有状态、多角色应用程序的框架。简单来说,它就像是给AI Agent装上了一个"大脑神经网络",让AI能够:
- 记住之前做过什么(状态管理)
- 按照设定的流程工作(图结构)
- 在不同任务之间灵活切换(节点跳转)
形象比喻:如果把AI Agent比作一个员工,LangGraph就是这个员工的工作流程图,规定了他在什么情况下做什么事,以及如何处理各种突发情况。
1.2 为什么需要LangGraph?
传统的LangChain是线性的链式调用,而LangGraph引入了图结构,解决了以下问题:
- ✅ 循环处理:可以让AI重复某个步骤直到满足条件
- ✅ 条件分支:根据不同情况执行不同逻辑
- ✅ 状态持久化:记住整个对话的上下文
- ✅ 人工介入:在关键节点可以暂停等待人工审核
1.3 核心概念
节点(Nodes)
节点就是执行具体任务的函数,比如"调用LLM"、"搜索数据库"、"计算结果"等。
边(Edges)
边决定了节点之间的连接关系,有两种:
- 普通边:直接连接两个节点
- 条件边:根据条件决定跳转到哪个节点
状态(State)
状态是在整个图执行过程中共享的数据,每个节点都可以读取和修改。
图(Graph)
由节点、边、状态组成的完整工作流。
第二章:环境搭建
2.1 安装依赖
# 安装LangGraph和相关依赖
pip install langgraph langchain langchain-openai
# 如果使用其他模型
pip install langchain-anthropic # Anthropic Claude
pip install langchain-google-genai # Google Gemini
2.2 配置API密钥
import os
# 设置OpenAI API密钥
os.environ["OPENAI_API_KEY"] = "your-api-key-here"
# 或者使用环境变量文件
from dotenv import load_dotenv
load_dotenv()
第三章:第一个LangGraph应用
3.1 最简单的示例:单节点图
from langgraph.graph import StateGraph, END
from typing import TypedDict
# 1. 定义状态
class State(TypedDict):
message: str
count: int
# 2. 定义节点函数
def process_node(state: State) -> State:
return {
"message": f"处理了: {state['message']}",
"count": state["count"] + 1
}
# 3. 创建图
workflow = StateGraph(State)
# 4. 添加节点
workflow.add_node("process", process_node)
# 5. 设置入口和出口
workflow.set_entry_point("process")
workflow.add_edge("process", END)
# 6. 编译图
app = workflow.compile()
# 7. 运行
result = app.invoke({"message": "Hello", "count": 0})
print(result)
# 输出: {'message': '处理了: Hello', 'count': 1}
代码解析:
State:定义了图中共享的数据结构process_node:节点函数,接收状态并返回新状态workflow.compile():将图编译成可执行的应用
3.2 多节点线性流程
from langgraph.graph import StateGraph, END
from typing import TypedDict
class State(TypedDict):
input: str
output: str
steps: list
# 节点1:数据清洗
def clean_data(state: State) -> State:
cleaned = state["input"].strip().lower()
return {
"input": state["input"],
"output": cleaned,
"steps": state["steps"] + ["数据清洗完成"]
}
# 节点2:数据处理
def process_data(state: State) -> State:
processed = f"[已处理] {state['output']}"
return {
"input": state["input"],
"output": processed,
"steps": state["steps"] + ["数据处理完成"]
}
# 节点3:数据保存
def save_data(state: State) -> State:
return {
"input": state["input"],
"output": state["output"],
"steps": state["steps"] + ["数据保存完成"]
}
# 构建图
workflow = StateGraph(State)
workflow.add_node("clean", clean_data)
workflow.add_node("process", process_data)
workflow.add_node("save", save_data)
# 连接节点
workflow.set_entry_point("clean")
workflow.add_edge("clean", "process")
workflow.add_edge("process", "save")
workflow.add_edge("save", END)
# 编译运行
app = workflow.compile()
result = app.invoke({
"input": " Hello World ",
"output": "",
"steps": []
})
print(result)
# 输出所有步骤和最终结果
第四章:条件分支与循环
4.1 条件边:根据条件选择路径
from langgraph.graph import StateGraph, END
from typing import TypedDict, Literal
class State(TypedDict):
number: int
result: str
# 判断奇偶性
def check_number(state: State) -> State:
return state
# 处理偶数
def handle_even(state: State) -> State:
return {
"number": state["number"],
"result": f"{state['number']} 是偶数"
}
# 处理奇数
def handle_odd(state: State) -> State:
return {
"number": state["number"],
"result": f"{state['number']} 是奇数"
}
# 路由函数:决定下一步走哪
def route_number(state: State) -> Literal["even", "odd"]:
if state["number"] % 2 == 0:
return "even"
else:
return "odd"
# 构建图
workflow = StateGraph(State)
workflow.add_node("check", check_number)
workflow.add_node("even", handle_even)
workflow.add_node("odd", handle_odd)
workflow.set_entry_point("check")
# 添加条件边
workflow.add_conditional_edges(
"check", # 从哪个节点出发
route_number, # 路由函数
{
"even": "even", # 如果返回"even",跳转到even节点
"odd": "odd" # 如果返回"odd",跳转到odd节点
}
)
workflow.add_edge("even", END)
workflow.add_edge("odd", END)
app = workflow.compile()
# 测试
print(app.invoke({"number": 4, "result": ""}))
# {'number': 4, 'result': '4 是偶数'}
print(app.invoke({"number": 7, "result": ""}))
# {'number': 7, 'result': '7 是奇数'}
4.2 循环处理:重试机制
from langgraph.graph import StateGraph, END
from typing import TypedDict, Literal
class State(TypedDict):
attempts: int
max_attempts: int
success: bool
message: str
# 模拟可能失败的操作
def attempt_operation(state: State) -> State:
import random
success = random.random() > 0.5 # 50%成功率
return {
"attempts": state["attempts"] + 1,
"max_attempts": state["max_attempts"],
"success": success,
"message": f"第{state['attempts'] + 1}次尝试: {'成功' if success else '失败'}"
}
# 检查是否需要重试
def should_retry(state: State) -> Literal["retry", "finish"]:
if state["success"]:
return "finish"
elif state["attempts"] >= state["max_attempts"]:
return "finish"
else:
return "retry"
# 构建图
workflow = StateGraph(State)
workflow.add_node("attempt", attempt_operation)
workflow.set_entry_point("attempt")
# 条件循环
workflow.add_conditional_edges(
"attempt",
should_retry,
{
"retry": "attempt", # 继续尝试
"finish": END # 结束
}
)
app = workflow.compile()
# 运行
result = app.invoke({
"attempts": 0,
"max_attempts": 3,
"success": False,
"message": ""
})
print(f"总共尝试: {result['attempts']}次")
print(f"最终状态: {result['message']}")
print(f"是否成功: {result['success']}")
第五章:集成LLM构建智能Agent
5.1 简单的聊天Agent
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from typing import TypedDict, Annotated
from operator import add
class State(TypedDict):
messages: Annotated[list, add] # 自动追加消息
# 调用LLM
def call_model(state: State) -> State:
llm = ChatOpenAI(model="gpt-4", temperature=0)
response = llm.invoke(state["messages"])
return {"messages": [response]}
# 构建图
workflow = StateGraph(State)
workflow.add_node("agent", call_model)
workflow.set_entry_point("agent")
workflow.add_edge("agent", END)
app = workflow.compile()
# 使用
from langchain_core.messages import HumanMessage
result = app.invoke({
"messages": [HumanMessage(content="你好,请介绍一下自己")]
})
print(result["messages"][-1].content)
5.2 带工具调用的Agent
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolNode
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage, SystemMessage
from typing import TypedDict, Annotated, Literal
from operator import add
# 定义工具
@tool
def search_weather(city: str) -> str:
"""查询城市天气"""
# 模拟查询
weather_data = {
"北京": "晴天,25℃",
"上海": "多云,22℃",
"深圳": "雨天,28℃"
}
return weather_data.get(city, "未找到该城市天气信息")
@tool
def calculate(expression: str) -> str:
"""计算数学表达式"""
try:
result = eval(expression)
return f"计算结果: {result}"
except:
return "计算错误"
# 工具列表
tools = [search_weather, calculate]
tool_node = ToolNode(tools)
# 定义状态
class State(TypedDict):
messages: Annotated[list, add]
# LLM节点
def call_model(state: State) -> State:
llm = ChatOpenAI(model="gpt-4", temperature=0).bind_tools(tools)
response = llm.invoke(state["messages"])
return {"messages": [response]}
# 路由函数:决定是调用工具还是结束
def should_continue(state: State) -> Literal["tools", "end"]:
last_message = state["messages"][-1]
if hasattr(last_message, "tool_calls") and last_message.tool_calls:
return "tools"
return "end"
# 构建图
workflow = StateGraph(State)
workflow.add_node("agent", call_model)
workflow.add_node("tools", tool_node)
workflow.set_entry_point("agent")
workflow.add_conditional_edges(
"agent",
should_continue,
{
"tools": "tools",
"end": END
}
)
workflow.add_edge("tools", "agent") # 工具执行后回到agent
app = workflow.compile()
# 测试
result = app.invoke({
"messages": [HumanMessage(content="北京今天天气怎么样?")]
})
for msg in result["messages"]:
if hasattr(msg, "content") and msg.content:
print(msg.content)
第六章:人工介入(Human-in-the-Loop)
6.1 需要人工审核的流程
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
from typing import TypedDict
class State(TypedDict):
data: str
approved: bool
feedback: str
# 处理数据
def process_data(state: State) -> State:
return {
"data": f"已处理: {state['data']}",
"approved": False,
"feedback": ""
}
# 人工审核节点(会暂停)
def human_review(state: State) -> State:
# 这里会暂停,等待外部输入
return state
# 最终提交
def submit_data(state: State) -> State:
return {
"data": state["data"],
"approved": True,
"feedback": "已提交"
}
# 构建图(需要检查点保存器)
memory = MemorySaver()
workflow = StateGraph(State)
workflow.add_node("process", process_data)
workflow.add_node("review", human_review)
workflow.add_node("submit", submit_data)
workflow.set_entry_point("process")
workflow.add_edge("process", "review")
workflow.add_edge("review", "submit")
workflow.add_edge("submit", END)
app = workflow.compile(checkpointer=memory, interrupt_before=["review"])
# 使用
config = {"configurable": {"thread_id": "1"}}
# 第一步:执行到审核节点
result = app.invoke({
"data": "重要数据",
"approved": False,
"feedback": ""
}, config)
print("等待人工审核...")
print(result)
# 第二步:人工审核通过后继续
result = app.invoke(None, config) # 传入None继续执行
print("审核通过,已提交:")
print(result)
第七章:实战项目 - 智能客服Agent
7.1 项目需求
构建一个智能客服系统,功能包括:
- 意图识别(咨询、投诉、退款)
- 根据意图调用不同处理流程
- 查询订单信息
- 复杂问题转人工
7.2 完整代码
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
from typing import TypedDict, Literal, Annotated
from operator import add
# 状态定义
class CustomerServiceState(TypedDict):
messages: Annotated[list, add]
intent: str # 意图:query, complaint, refund, transfer
order_id: str
customer_info: dict
resolved: bool
# 1. 意图识别节点
def classify_intent(state: CustomerServiceState) -> CustomerServiceState:
llm = ChatOpenAI(model="gpt-4", temperature=0)
prompt = f"""
分析用户意图,只返回以下之一:query(咨询), complaint(投诉), refund(退款), transfer(转人工)
用户消息: {state['messages'][-1].content}
"""
response = llm.invoke([HumanMessage(content=prompt)])
intent = response.content.strip().lower()
return {
"intent": intent,
"messages": []
}
# 2. 订单查询节点
def handle_query(state: CustomerServiceState) -> CustomerServiceState:
# 模拟订单查询
order_info = {
"order_id": "12345",
"status": "已发货",
"tracking": "SF1234567890"
}
response_msg = f"您的订单信息如下:\n订单号:{order_info['order_id']}\n状态:{order_info['status']}\n物流单号:{order_info['tracking']}"
return {
"messages": [SystemMessage(content=response_msg)],
"resolved": True
}
# 3. 投诉处理节点
def handle_complaint(state: CustomerServiceState) -> CustomerServiceState:
response_msg = "非常抱歉给您带来不便。我们已记录您的投诉,客服专员会在24小时内联系您。投诉单号:C20240107001"
return {
"messages": [SystemMessage(content=response_msg)],
"resolved": True
}
# 4. 退款处理节点
def handle_refund(state: CustomerServiceState) -> CustomerServiceState:
response_msg = "退款申请已提交,预计3-5个工作日到账。退款单号:R20240107001"
return {
"messages": [SystemMessage(content=response_msg)],
"resolved": True
}
# 5. 转人工节点
def transfer_human(state: CustomerServiceState) -> CustomerServiceState:
response_msg = "正在为您转接人工客服,请稍候..."
return {
"messages": [SystemMessage(content=response_msg)],
"resolved": False
}
# 路由函数
def route_intent(state: CustomerServiceState) -> Literal["query", "complaint", "refund", "transfer"]:
return state["intent"]
# 构建图
workflow = StateGraph(CustomerServiceState)
# 添加节点
workflow.add_node("classify", classify_intent)
workflow.add_node("query", handle_query)
workflow.add_node("complaint", handle_complaint)
workflow.add_node("refund", handle_refund)
workflow.add_node("transfer", transfer_human)
# 设置入口
workflow.set_entry_point("classify")
# 添加条件边
workflow.add_conditional_edges(
"classify",
route_intent,
{
"query": "query",
"complaint": "complaint",
"refund": "refund",
"transfer": "transfer"
}
)
# 所有处理节点都指向END
workflow.add_edge("query", END)
workflow.add_edge("complaint", END)
workflow.add_edge("refund", END)
workflow.add_edge("transfer", END)
# 编译
app = workflow.compile()
# 测试不同场景
test_cases = [
"我的订单什么时候发货?",
"你们的产品质量太差了,我要投诉!",
"我要申请退款",
"这个问题太复杂了,我要找真人"
]
for test_input in test_cases:
print(f"\n用户: {test_input}")
result = app.invoke({
"messages": [HumanMessage(content=test_input)],
"intent": "",
"order_id": "",
"customer_info": {},
"resolved": False
})
print(f"意图: {result['intent']}")
print(f"回复: {result['messages'][-1].content}")
print(f"是否解决: {result['resolved']}")
第八章:高级特性
8.1 子图(Subgraphs)
当某个功能模块很复杂时,可以单独构建一个子图:
from langgraph.graph import StateGraph, END
# 子图状态
class SubState(TypedDict):
value: int
# 子图节点
def sub_step1(state: SubState) -> SubState:
return {"value": state["value"] * 2}
def sub_step2(state: SubState) -> SubState:
return {"value": state["value"] + 10}
# 创建子图
sub_workflow = StateGraph(SubState)
sub_workflow.add_node("step1", sub_step1)
sub_workflow.add_node("step2", sub_step2)
sub_workflow.set_entry_point("step1")
sub_workflow.add_edge("step1", "step2")
sub_workflow.add_edge("step2", END)
sub_graph = sub_workflow.compile()
# 主图状态
class MainState(TypedDict):
number: int
result: int
# 主图节点(调用子图)
def call_subgraph(state: MainState) -> MainState:
result = sub_graph.invoke({"value": state["number"]})
return {"number": state["number"], "result": result["value"]}
# 主图
main_workflow = StateGraph(MainState)
main_workflow.add_node("process", call_subgraph)
main_workflow.set_entry_point("process")
main_workflow.add_edge("process", END)
main_app = main_workflow.compile()
# 测试
result = main_app.invoke({"number": 5, "result": 0})
print(result) # number=5, result=20 (5*2+10)
8.2 并行执行节点
from langgraph.graph import StateGraph, END
from typing import TypedDict
import time
class State(TypedDict):
task1_result: str
task2_result: str
task3_result: str
def task1(state: State) -> State:
time.sleep(1) # 模拟耗时操作
return {"task1_result": "任务1完成"}
def task2(state: State) -> State:
time.sleep(1)
return {"task2_result": "任务2完成"}
def task3(state: State) -> State:
time.sleep(1)
return {"task3_result": "任务3完成"}
def aggregate(state: State) -> State:
return state
# 构建图
workflow = StateGraph(State)
workflow.add_node("task1", task1)
workflow.add_node("task2", task2)
workflow.add_node("task3", task3)
workflow.add_node("aggregate", aggregate)
workflow.set_entry_point("task1")
workflow.add_edge("task1", "task2")
workflow.add_edge("task1", "task3") # task1完成后,task2和task3并行
workflow.add_edge("task2", "aggregate")
workflow.add_edge("task3", "aggregate")
workflow.add_edge("aggregate", END)
app = workflow.compile()
start = time.time()
result = app.invoke({
"task1_result": "",
"task2_result": "",
"task3_result": ""
})
end = time.time()
print(f"总耗时: {end - start:.2f}秒")
print(result)
8.3 流式输出
from langgraph.graph import StateGraph, END
from typing import TypedDict
class State(TypedDict):
count: int
def counter(state: State) -> State:
return {"count": state["count"] + 1}
workflow = StateGraph(State)
workflow.add_node("count", counter)
workflow.set_entry_point("count")
workflow.add_edge("count", END)
app = workflow.compile()
# 流式执行
for chunk in app.stream({"count": 0}):
print(chunk)
# 输出每一步的状态变化
第九章:最佳实践
9.1 状态设计原则
- 最小化状态:只保存必要的信息
- 使用Annotated:对列表类型使用
Annotated[list, add]自动追加 - 明确类型:使用TypedDict严格定义状态结构
from typing import TypedDict, Annotated
from operator import add
# 好的状态设计
class GoodState(TypedDict):
messages: Annotated[list, add] # 消息自动追加
user_id: str # 用户ID
context: dict # 上下文信息
# 避免的状态设计
class BadState(TypedDict):
everything: dict # 太宽泛,不明确
9.2 错误处理
from langgraph.graph import StateGraph, END
from typing import TypedDict
class State(TypedDict):
data: str
error: str
success: bool
def risky_operation(state: State) -> State:
try:
# 可能失败的操作
result = process_data(state["data"])
return {
"data": result,
"error": "",
"success": True
}
except Exception as e:
return {
"data": state["data"],
"error": str(e),
"success": False
}
def handle_error(state: State) -> State:
# 错误处理逻辑
return {
"data": state["data"],
"error": f"已处理错误: {state['error']}",
"success": False
}
workflow = StateGraph(State)
workflow.add_node("operation", risky_operation)
workflow.add_node("error_handler", handle_error)
workflow.set_entry_point("operation")
workflow.add_conditional_edges(
"operation",
lambda s: "success" if s["success"] else "error",
{
"success": END,
"error": "error_handler"
}
)
workflow.add_edge("error_handler", END)
app = workflow.compile()
9.3 日志和调试
from langgraph.graph import StateGraph, END
from typing import TypedDict
import logging
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class State(TypedDict):
value: int
def logged_node(state: State) -> State:
logger.info(f"节点开始执行,当前状态: {state}")
result = {"value": state["value"] * 2}
logger.info(f"节点执行完成,新状态: {result}")
return result
workflow = StateGraph(State)
workflow.add_node("process", logged_node)
workflow.set_entry_point("process")
workflow.add_edge("process", END)
app = workflow.compile()
# 查看图结构
print(app.get_graph().draw_ascii())
# 执行时会输出日志
result = app.invoke({"value": 5})
9.4 性能优化
- 避免重复计算:将结果存储在状态中
- 使用缓存:对LLM调用使用缓存
- 批处理:合并多个小请求
- 并行执行:独立任务使用并行节点
from langchain.cache import InMemoryCache
from langchain.globals import set_llm_cache
# 启用LLM缓存
set_llm_cache(InMemoryCache())
# 节点函数中避免重复计算
def optimized_node(state: State) -> State:
# 检查是否已计算
if "cached_result" in state:
return state
# 执行计算
result = expensive_computation()
return {
**state,
"cached_result": result
}
第十章:常见问题解答
Q1: LangGraph和LangChain有什么区别?
A: LangChain是链式调用,适合简单的线性流程;LangGraph是图结构,支持循环、条件分支、状态管理,适合复杂的多步骤Agent。
Q2: 如何保存和恢复对话状态?
A: 使用Checkpointer:
from langgraph.checkpoint.memory import MemorySaver
memory = MemorySaver()
app = workflow.compile(checkpointer=memory)
# 使用thread_id标识会话
config = {"configurable": {"thread_id": "user_123"}}
result = app.invoke(input_data, config)
Q3: 如何可视化图结构?
from IPython.display import Image
# 生成图片
Image(app.get_graph().draw_mermaid_png())
# 或者ASCII形式
print(app.get_graph().draw_ascii())
Q4: 节点函数必须返回完整状态吗?
A: 不需要,只返回修改的字段即可,LangGraph会自动合并。
Q5: 如何处理长时间运行的任务?
A: 使用异步节点和流式输出:
async def async_node(state: State) -> State:
result = await long_running_task()
return {"result": result}
# 异步执行
result = await app.ainvoke(input_data)
总结
LangGraph是构建复杂AI Agent的强大工具,掌握以下核心点:
- ✅ 状态管理:设计清晰的状态结构
- ✅ 图结构:合理划分节点和边
- ✅ 条件控制:使用条件边实现复杂逻辑
- ✅ 人工介入:在关键节点添加人工审核
- ✅ 错误处理:每个节点都要考虑异常情况
从简单的单节点图开始,逐步增加复杂度,最终构建出强大的AI Agent应用!
学习路径建议
- 第1-3天:理解基础概念,完成简单示例
- 第4-7天:掌握条件分支和循环,集成LLM
- 第8-14天:完成实战项目,理解高级特性
- 第15天+:优化性能,探索复杂应用场景
祝你学习顺利!🚀
更简单的小白学习版本
LangGraph零基础学习指南 - 像搭积木一样简单
别担心,这份教程会用最简单的语言,像讲故事一样教你。即使你是小白也完全没问题!
📺 推荐的视频教程资源
在开始之前,我先为你推荐一些优质的学习资源:
英文资源(带字幕)
-
LangChain Academy - Introduction to LangGraph(免费)
- 官方课程,完全免费
- 地址:https://academy.langchain.com/courses/intro-to-langgraph
- 这是最权威的入门课程
-
DataCamp的LangGraph视频教程
- 有配套的文字讲解
- 地址:https://www.datacamp.com/tutorial/langgraph-tutorial
-
Real Python的LangGraph教程
- 有详细的代码示例
- 地址:https://realpython.com/langgraph-python/
中文资源
-
知乎上的LangGraph入门系列
- 搜索"LangGraph 入门"会有很多文章
- 博客园也有很多中文教程
-
B站视频
- 搜索"LangGraph 教程"有不少讲解视频
- 建议选择2024年以后的视频,因为库更新很快
建议学习路径
- 先看本文档理解概念(1-2小时)
- 看官方视频课程(2-3小时)
- 跟着本文档敲代码(3-5天)
- 做自己的小项目(1-2周)
第一部分:理论基础 - 先理解概念
1.1 什么是Agent?就像你的智能助理
想象一下,你有一个超级聪明的助理:
普通助理(传统程序):
- 你说:"帮我订机票"
- 助理:只能按照固定步骤操作,如果遇到问题就卡住了
AI Agent(智能助理):
- 你说:"帮我订机票"
- Agent会:
- 先问你:"去哪里?什么时候?"
- 搜索可用航班
- 如果没票,自动帮你查火车
- 找到选项后问你:"这几个可以吗?"
- 根据你的反馈继续操作
Agent的核心能力:
- 🤔 能思考:分析问题,制定计划
- 🔧 会使用工具:调用搜索、数据库、计算器等
- 🔄 能循环:不断尝试直到完成任务
- 🧠 有记忆:记得之前说过什么
1.2 为什么需要LangGraph?
想象你要开餐厅:
传统方式(LangChain的链式调用):
接待客人 → 点菜 → 做菜 → 上菜 → 结账
这是一条直线,每个步骤按顺序执行,不能回头,不能跳过。
LangGraph方式(图结构):
接待客人 → 点菜 → 做菜 → 上菜 → 结账
↑ ↓
└─ 客人要加菜 ←┘
↓
菜做错了
↓
重新做菜
看到区别了吗?可以循环、可以跳转、可以根据情况走不同的路径!
1.3 图(Graph)到底是什么?
图就是一张流程图,由3个部分组成:
1️⃣ 节点(Node)- 工作站
就像餐厅的各个工作区:
- 接待台(接待节点)
- 厨房(做菜节点)
- 收银台(结账节点)
在编程中,节点就是一个函数,负责做一件具体的事。
2️⃣ 边(Edge)- 路径
连接节点的箭头,告诉我们"接下来去哪里":
- 普通边:直接连接,"做完A就去B"
- 条件边:根据情况决定,"如果下雨去A,否则去B"
3️⃣ 状态(State)- 记事本
一个共享的记事本,记录所有重要信息:
- 客人点了什么菜?
- 菜做到哪一步了?
- 账单多少钱?
每个节点都可以读和写这个记事本。
第二部分:核心概念深度理解
2.1 状态管理 - 程序的记忆
场景故事: 假设你在和AI聊天:
没有状态管理(每次都失忆):
你:"我叫小明"
AI:"你好!"
你:"我叫什么名字?"
AI:"我不知道" ← 忘记了!
有状态管理:
你:"我叫小明"
AI:"你好小明!"(记在状态里:name="小明")
你:"我叫什么名字?"
AI:"你叫小明啊" ← 记得!
代码表示状态:
# 定义一个状态(记事本)
class State(TypedDict):
name: str # 用户名字
messages: list # 聊天记录
count: int # 计数器
2.2 节点执行 - 做事的地方
每个节点就是一个函数,它:
- 接收状态(读记事本)
- 做一些事情
- 返回新状态(更新记事本)
生活例子:
输入状态:{"钱": 100, "购物清单": ["苹果", "牛奶"]}
↓
【购物节点】
↓
输出状态:{"钱": 80, "购物清单": [], "购买物品": ["苹果", "牛奶"]}
代码表示:
def shopping(state):
# 1. 读状态
money = state["钱"]
items = state["购物清单"]
# 2. 做事情
total_cost = buy_items(items)
money = money - total_cost
# 3. 返回新状态
return {
"钱": money,
"购物清单": [],
"购买物品": items
}
2.3 边的流转 - 决定下一步
普通边 - 固定路线
就像地铁线:
A站 → B站 → C站(固定顺序)
条件边 - 智能路线
就像导航软件:
你在家 → 【判断天气】
↓
下雨 → 打车
晴天 → 骑车
代码表示:
def 判断天气(state):
if state["天气"] == "下雨":
return "打车"
else:
return "骑车"
2.4 循环 - 重复直到成功
生活场景:考试
开始 → 做题 → 检查
↑ ↓
└── 有错误(继续改)
↓
没错误(结束)
AI场景:写代码
开始 → 写代码 → 测试
↑ ↓
└── 有bug(继续修)
↓
没bug(完成)
第三部分:手把手实战 - 从0开始
3.1 安装环境
第1步:安装Python
- 确保你有Python 3.8以上版本
- 打开命令行,输入
python --version检查
第2步:安装库
pip install langgraph
pip install langchain
pip install langchain-openai
第3步:配置API密钥
- 去OpenAI官网申请API密钥
- 或者用国内的替代服务(智谱、通义千问等)
import os
os.environ["OPENAI_API_KEY"] = "你的密钥"
3.2 第一个程序 - Hello World
目标:创建一个简单的图,有一个节点,输出"Hello"
步骤1:导入库
from langgraph.graph import StateGraph, END
from typing import TypedDict
步骤2:定义状态(记事本)
class State(TypedDict):
message: str # 我们要传递的消息
步骤3:创建节点(做事的函数)
def say_hello(state: State) -> State:
# 这个函数接收状态,返回新状态
return {"message": "Hello, World!"}
步骤4:构建图
# 1. 创建一个空图
graph = StateGraph(State)
# 2. 添加节点
graph.add_node("hello", say_hello)
# 3. 设置入口(从哪里开始)
graph.set_entry_point("hello")
# 4. 设置出口(到哪里结束)
graph.add_edge("hello", END)
# 5. 编译成可运行的应用
app = graph.compile()
步骤5:运行
# 运行图
result = app.invoke({"message": ""})
print(result) # {'message': 'Hello, World!'}
完整代码:
from langgraph.graph import StateGraph, END
from typing import TypedDict
# 定义状态
class State(TypedDict):
message: str
# 定义节点
def say_hello(state: State) -> State:
return {"message": "Hello, World!"}
# 构建图
graph = StateGraph(State)
graph.add_node("hello", say_hello)
graph.set_entry_point("hello")
graph.add_edge("hello", END)
# 运行
app = graph.compile()
result = app.invoke({"message": ""})
print(result["message"]) # 输出:Hello, World!
理解这段代码:
State:定义了我们的"记事本"有什么内容say_hello:一个节点,负责说"Hello"graph.add_node:把节点添加到图里set_entry_point:告诉程序从哪里开始add_edge:告诉程序做完这个节点后去哪里compile:把图变成可执行的程序invoke:运行程序
3.3 第二个程序 - 简单对话
目标:创建一个会回答问题的AI
步骤1:准备工作
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
from typing import TypedDict, Annotated
from operator import add
# 创建AI模型
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
步骤2:定义状态
class ChatState(TypedDict):
# 这里的Annotated[list, add]是特殊语法
# 意思是:这个列表会自动追加新消息,不会覆盖旧的
messages: Annotated[list, add]
步骤3:创建对话节点
def chat_node(state: ChatState) -> ChatState:
# 调用AI回答问题
response = llm.invoke(state["messages"])
# 返回AI的回复
return {"messages": [response]}
步骤4:构建图
graph = StateGraph(ChatState)
graph.add_node("chat", chat_node)
graph.set_entry_point("chat")
graph.add_edge("chat", END)
app = graph.compile()
步骤5:使用
# 用户提问
result = app.invoke({
"messages": [HumanMessage(content="你好,请做个自我介绍")]
})
# 打印AI的回复
print(result["messages"][-1].content)
完整代码:
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
from typing import TypedDict, Annotated
from operator import add
# 状态
class ChatState(TypedDict):
messages: Annotated[list, add]
# 节点
def chat_node(state: ChatState) -> ChatState:
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
response = llm.invoke(state["messages"])
return {"messages": [response]}
# 构建图
graph = StateGraph(ChatState)
graph.add_node("chat", chat_node)
graph.set_entry_point("chat")
graph.add_edge("chat", END)
app = graph.compile()
# 使用
result = app.invoke({
"messages": [HumanMessage(content="你好!")]
})
print(result["messages"][-1].content)
3.4 第三个程序 - 条件分支
场景:判断数字是奇数还是偶数,走不同的路径
流程图:
输入数字
↓
【判断】
↓
偶数 → 【显示"是偶数"】 → 结束
↓
奇数 → 【显示"是奇数"】 → 结束
步骤1:定义状态
class State(TypedDict):
number: int # 输入的数字
result: str # 结果
步骤2:创建三个节点
# 节点1:判断节点(什么都不做,只是传递)
def check_node(state: State) -> State:
return state
# 节点2:处理偶数
def handle_even(state: State) -> State:
return {
"number": state["number"],
"result": f"{state['number']} 是偶数"
}
# 节点3:处理奇数
def handle_odd(state: State) -> State:
return {
"number": state["number"],
"result": f"{state['number']} 是奇数"
}
步骤3:创建路由函数(决定走哪条路)
def route_by_number(state: State):
# 根据数字决定下一步
if state["number"] % 2 == 0:
return "even" # 偶数走even节点
else:
return "odd" # 奇数走odd节点
步骤4:构建图
graph = StateGraph(State)
# 添加所有节点
graph.add_node("check", check_node)
graph.add_node("even", handle_even)
graph.add_node("odd", handle_odd)
# 设置入口
graph.set_entry_point("check")
# 关键:添加条件边
graph.add_conditional_edges(
"check", # 从check节点出发
route_by_number, # 用这个函数决定去哪
{
"even": "even", # 如果函数返回"even",去even节点
"odd": "odd" # 如果函数返回"odd",去odd节点
}
)
# 两个节点都结束
graph.add_edge("even", END)
graph.add_edge("odd", END)
app = graph.compile()
步骤5:测试
# 测试偶数
result = app.invoke({"number": 8, "result": ""})
print(result["result"]) # 8 是偶数
# 测试奇数
result = app.invoke({"number": 7, "result": ""})
print(result["result"]) # 7 是奇数
理解条件边:
add_conditional_edges:添加条件边- 第一个参数:从哪个节点出发
- 第二个参数:路由函数,返回下一个节点的名字
- 第三个参数:映射表,把返回值对应到实际节点
3.5 第四个程序 - 循环重试
场景:模拟一个可能失败的操作,失败了就重试
流程图:
开始
↓
【尝试操作】
↓
成功?
├─ 是 → 结束
└─ 否 → 【尝试操作】(循环回去)
完整代码:
from langgraph.graph import StateGraph, END
from typing import TypedDict
import random
# 状态
class RetryState(TypedDict):
attempts: int # 尝试次数
max_attempts: int # 最大尝试次数
success: bool # 是否成功
message: str # 结果信息
# 节点:尝试操作
def try_operation(state: RetryState) -> RetryState:
# 模拟50%成功率的操作
success = random.random() > 0.5
attempts = state["attempts"] + 1
message = f"第{attempts}次尝试: "
message += "成功!" if success else "失败"
return {
"attempts": attempts,
"max_attempts": state["max_attempts"],
"success": success,
"message": message
}
# 路由函数:决定是重试还是结束
def should_retry(state: RetryState):
# 如果成功了,结束
if state["success"]:
return "finish"
# 如果达到最大次数,也结束
if state["attempts"] >= state["max_attempts"]:
return "finish"
# 否则继续重试
return "retry"
# 构建图
graph = StateGraph(RetryState)
graph.add_node("try", try_operation)
graph.set_entry_point("try")
# 添加条件边(关键:形成循环)
graph.add_conditional_edges(
"try",
should_retry,
{
"retry": "try", # 重试:回到try节点(循环!)
"finish": END # 结束:到END
}
)
app = graph.compile()
# 运行
result = app.invoke({
"attempts": 0,
"max_attempts": 5,
"success": False,
"message": ""
})
print(f"总共尝试了 {result['attempts']} 次")
print(f"最终状态: {result['message']}")
print(f"是否成功: {result['success']}")
运行结果示例:
总共尝试了 3 次
最终状态: 第3次尝试: 成功!
是否成功: True
理解循环: 关键在于 "retry": "try",这让图可以回到之前的节点,形成循环!
第四部分:进阶理解
4.1 状态的更新机制
问题:如果多个节点都要修改同一个数据怎么办?
例子:记录操作历史
class State(TypedDict):
history: list # 操作历史
# 如果直接这样写,会有问题:
def node1(state):
return {"history": ["操作1"]} # 会覆盖原来的历史!
解决方案:使用Annotated
from operator import add
class State(TypedDict):
# add表示:追加,不覆盖
history: Annotated[list, add]
# 现在这样写就对了:
def node1(state):
return {"history": ["操作1"]} # 会追加到现有历史
def node2(state):
return {"history": ["操作2"]} # 继续追加
4.2 子图 - 把复杂任务拆分
场景:做一顿饭太复杂,拆成几个小任务
主任务(主图):
准备食材 → 【烹饪(子图)】 → 摆盘 → 结束
烹饪子图:
清洗 → 切菜 → 炒菜 → 调味
代码示例:
# 子图的状态
class CookingState(TypedDict):
dish: str
status: str
# 子图的节点
def wash(state):
return {"dish": state["dish"], "status": "已清洗"}
def cut(state):
return {"dish": state["dish"], "status": "已切好"}
def cook(state):
return {"dish": state["dish"], "status": "已炒好"}
# 创建子图
sub_graph = StateGraph(CookingState)
sub_graph.add_node("wash", wash)
sub_graph.add_node("cut", cut)
sub_graph.add_node("cook", cook)
sub_graph.set_entry_point("wash")
sub_graph.add_edge("wash", "cut")
sub_graph.add_edge("cut", "cook")
sub_graph.add_edge("cook", END)
cooking_app = sub_graph.compile()
# 主图可以调用子图
def cooking_node(state):
result = cooking_app.invoke({
"dish": state["dish"],
"status": ""
})
return {"dish": result["dish"], "status": result["status"]}
第五部分:学习建议
5.1 学习顺序
第1周:基础概念
- 理解图、节点、边、状态
- 完成3个简单示例
- 每天1-2小时
第2周:条件和循环
- 练习条件边
- 练习循环逻辑
- 尝试组合使用
第3周:集成LLM
- 学习如何调用AI模型
- 实现简单对话
- 添加工具调用
第4周:实战项目
- 选一个小项目
- 从头到尾实现
- 不断迭代优化
5.2 遇到问题怎么办?
1. 代码报错
- 仔细看错误信息
- 检查拼写、缩进
- Google搜索错误信息
2. 逻辑不通
- 画出流程图
- 在纸上模拟执行
- 用print打印状态
3. 概念不懂
- 看视频教程
- 找类比的生活场景
- 问AI助手解释
5.3 练习建议
每天必做:
- 回顾昨天的代码
- 写一个新的小例子
- 记录遇到的问题
每周必做:
- 完成一个小项目
- 复习本周知识点
- 总结学到的东西
不要做的:
- ❌ 一次学太多
- ❌ 只看不练
- ❌ 遇到困难就放弃
第六部分:常见问题解答
Q1: 我是零基础,能学会吗? A: 完全可以!只要你:
- 会基础的Python(会写函数就行)
- 愿意花时间练习
- 不怕出错,多试几次
Q2: 要学多久? A:
- 基础入门:1-2周
- 能做简单项目:3-4周
- 精通:2-3个月
Q3: 和LangChain什么关系? A:
- LangChain是工具箱(提供各种工具)
- LangGraph是流程管理(组织这些工具怎么用)
- 可以一起用,也可以单独用LangGraph
Q4: 必须要OpenAI的API吗? A: 不是必须的,可以用:
- 国内的通义千问、智谱AI
- 本地模型(Ollama)
- 其他支持的模型
Q5: 图画不出来怎么办? A:
- 先用纸笔画流程图
- 用简单的方框和箭头
- 不用画得很专业,自己看懂就行
第七部分:第一个实用项目 - 天气查询机器人
目标:做一个能查天气的聊天机器人
功能:
- 用户说"北京天气"
- AI识别出要查天气
- 调用天气查询工具
- 返回结果
代码(逐步理解):
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
from operator import add
# 1. 定义状态
class WeatherBotState(TypedDict):
messages: Annotated[list, add] # 对话历史
city: str # 要查询的城市
weather_result: str # 天气结果
# 2. 工具:查询天气
def get_weather(city: str) -> str:
# 这里模拟查询(真实项目会调用API)
weather_data = {
"北京": "晴天,15°C",
"上海": "多云,18°C",
"深圳": "小雨,25°C"
}
return weather_data.get(city, "未找到该城市")
# 3. 节点1:理解用户意图
def understand_intent(state: WeatherBotState):
# 简化版:直接从消息中提取城市名
user_input = state["messages"][-1]
# 提取城市(真实项目用AI理解)
if "北京" in user_input:
city = "北京"
elif "上海" in user_input:
city = "上海"
elif "深圳" in user_input:
city = "深圳"
else:
city = ""
return {"city": city}
# 4. 节点2:查询天气
def query_weather(state: WeatherBotState):
city = state["city"]
weather = get_weather(city)
return {"weather_result": weather}
# 5. 节点3:回复用户
def reply_to_user(state: WeatherBotState):
city = state["city"]
weather = state["weather_result"]
reply = f"{city}的天气是:{weather}"
return {"messages": [reply]}
# 6. 构建图
graph = StateGraph(WeatherBotState)
graph.add_node("understand", understand_intent)
graph.add_node("query", query_weather)
graph.add_node("reply", reply_to_user)
graph.set_entry_point("understand")
graph.add_edge("understand", "query")
graph.add_edge("query", "reply")
graph.add_edge("reply", END)
app = graph.compile()
# 7. 使用
result = app.invoke({
"messages": ["北京天气怎么样?"],
"city": "",
"weather_result": ""
})
print(result["messages"][-1])
# 输出:北京的天气是:晴天,15°C
理解这个项目:
- 三个节点依次执行
- 每个节点负责一件事
- 状态在节点间传递
- 最后得到结果
总结
你已经学到了: ✅ 什么是Agent和LangGraph ✅ 图、节点、边、状态的概念 ✅ 如何写简单的LangGraph程序 ✅ 条件分支和循环的用法 ✅ 第一个实用项目
下一步:
- 把每个代码示例都敲一遍
- 修改代码,看看会发生什么
- 试着做一个自己的小项目
- 看推荐的视频教程
记住:
- 🐌 慢慢来,不要急
- 💪 多练习,多动手
- 🤔 遇到问题就问
- 🎯 每天进步一点点
你一定可以学会的!加油! 🚀
附录:重要资源链接
- 官方文档:https://python.langchain.com/docs/langgraph
- 官方课程:https://academy.langchain.com/courses/intro-to-langgraph
- GitHub仓库:https://github.com/langchain-ai/langgraph
- 中文社区:搜索"LangGraph 中文教程"
- 视频教程:B站搜索"LangGraph"
有问题随时问我,我会一直陪着你学习!💪
更多推荐


所有评论(0)