第一章:LangGraph基础认知

1.1 什么是LangGraph?

LangGraph是LangChain团队开发的一个用于构建有状态、多角色应用程序的框架。简单来说,它就像是给AI Agent装上了一个"大脑神经网络",让AI能够:

  • 记住之前做过什么(状态管理)
  • 按照设定的流程工作(图结构)
  • 在不同任务之间灵活切换(节点跳转)

形象比喻:如果把AI Agent比作一个员工,LangGraph就是这个员工的工作流程图,规定了他在什么情况下做什么事,以及如何处理各种突发情况。

1.2 为什么需要LangGraph?

传统的LangChain是线性的链式调用,而LangGraph引入了图结构,解决了以下问题:

  • 循环处理:可以让AI重复某个步骤直到满足条件
  • 条件分支:根据不同情况执行不同逻辑
  • 状态持久化:记住整个对话的上下文
  • 人工介入:在关键节点可以暂停等待人工审核

1.3 核心概念

节点(Nodes)

节点就是执行具体任务的函数,比如"调用LLM"、"搜索数据库"、"计算结果"等。

边(Edges)

边决定了节点之间的连接关系,有两种:

  • 普通边:直接连接两个节点
  • 条件边:根据条件决定跳转到哪个节点
状态(State)

状态是在整个图执行过程中共享的数据,每个节点都可以读取和修改。

图(Graph)

由节点、边、状态组成的完整工作流。


第二章:环境搭建

2.1 安装依赖

# 安装LangGraph和相关依赖
pip install langgraph langchain langchain-openai

# 如果使用其他模型
pip install langchain-anthropic  # Anthropic Claude
pip install langchain-google-genai  # Google Gemini

2.2 配置API密钥

import os

# 设置OpenAI API密钥
os.environ["OPENAI_API_KEY"] = "your-api-key-here"

# 或者使用环境变量文件
from dotenv import load_dotenv
load_dotenv()

第三章:第一个LangGraph应用

3.1 最简单的示例:单节点图

from langgraph.graph import StateGraph, END
from typing import TypedDict

# 1. 定义状态
class State(TypedDict):
    message: str
    count: int

# 2. 定义节点函数
def process_node(state: State) -> State:
    return {
        "message": f"处理了: {state['message']}",
        "count": state["count"] + 1
    }

# 3. 创建图
workflow = StateGraph(State)

# 4. 添加节点
workflow.add_node("process", process_node)

# 5. 设置入口和出口
workflow.set_entry_point("process")
workflow.add_edge("process", END)

# 6. 编译图
app = workflow.compile()

# 7. 运行
result = app.invoke({"message": "Hello", "count": 0})
print(result)
# 输出: {'message': '处理了: Hello', 'count': 1}

代码解析

  • State:定义了图中共享的数据结构
  • process_node:节点函数,接收状态并返回新状态
  • workflow.compile():将图编译成可执行的应用

3.2 多节点线性流程

from langgraph.graph import StateGraph, END
from typing import TypedDict

class State(TypedDict):
    input: str
    output: str
    steps: list

# 节点1:数据清洗
def clean_data(state: State) -> State:
    cleaned = state["input"].strip().lower()
    return {
        "input": state["input"],
        "output": cleaned,
        "steps": state["steps"] + ["数据清洗完成"]
    }

# 节点2:数据处理
def process_data(state: State) -> State:
    processed = f"[已处理] {state['output']}"
    return {
        "input": state["input"],
        "output": processed,
        "steps": state["steps"] + ["数据处理完成"]
    }

# 节点3:数据保存
def save_data(state: State) -> State:
    return {
        "input": state["input"],
        "output": state["output"],
        "steps": state["steps"] + ["数据保存完成"]
    }

# 构建图
workflow = StateGraph(State)
workflow.add_node("clean", clean_data)
workflow.add_node("process", process_data)
workflow.add_node("save", save_data)

# 连接节点
workflow.set_entry_point("clean")
workflow.add_edge("clean", "process")
workflow.add_edge("process", "save")
workflow.add_edge("save", END)

# 编译运行
app = workflow.compile()
result = app.invoke({
    "input": "  Hello World  ",
    "output": "",
    "steps": []
})

print(result)
# 输出所有步骤和最终结果

第四章:条件分支与循环

4.1 条件边:根据条件选择路径

from langgraph.graph import StateGraph, END
from typing import TypedDict, Literal

class State(TypedDict):
    number: int
    result: str

# 判断奇偶性
def check_number(state: State) -> State:
    return state

# 处理偶数
def handle_even(state: State) -> State:
    return {
        "number": state["number"],
        "result": f"{state['number']} 是偶数"
    }

# 处理奇数
def handle_odd(state: State) -> State:
    return {
        "number": state["number"],
        "result": f"{state['number']} 是奇数"
    }

# 路由函数:决定下一步走哪
def route_number(state: State) -> Literal["even", "odd"]:
    if state["number"] % 2 == 0:
        return "even"
    else:
        return "odd"

# 构建图
workflow = StateGraph(State)
workflow.add_node("check", check_number)
workflow.add_node("even", handle_even)
workflow.add_node("odd", handle_odd)

workflow.set_entry_point("check")

# 添加条件边
workflow.add_conditional_edges(
    "check",  # 从哪个节点出发
    route_number,  # 路由函数
    {
        "even": "even",  # 如果返回"even",跳转到even节点
        "odd": "odd"     # 如果返回"odd",跳转到odd节点
    }
)

workflow.add_edge("even", END)
workflow.add_edge("odd", END)

app = workflow.compile()

# 测试
print(app.invoke({"number": 4, "result": ""}))
# {'number': 4, 'result': '4 是偶数'}

print(app.invoke({"number": 7, "result": ""}))
# {'number': 7, 'result': '7 是奇数'}

4.2 循环处理:重试机制

from langgraph.graph import StateGraph, END
from typing import TypedDict, Literal

class State(TypedDict):
    attempts: int
    max_attempts: int
    success: bool
    message: str

# 模拟可能失败的操作
def attempt_operation(state: State) -> State:
    import random
    success = random.random() > 0.5  # 50%成功率
    
    return {
        "attempts": state["attempts"] + 1,
        "max_attempts": state["max_attempts"],
        "success": success,
        "message": f"第{state['attempts'] + 1}次尝试: {'成功' if success else '失败'}"
    }

# 检查是否需要重试
def should_retry(state: State) -> Literal["retry", "finish"]:
    if state["success"]:
        return "finish"
    elif state["attempts"] >= state["max_attempts"]:
        return "finish"
    else:
        return "retry"

# 构建图
workflow = StateGraph(State)
workflow.add_node("attempt", attempt_operation)

workflow.set_entry_point("attempt")

# 条件循环
workflow.add_conditional_edges(
    "attempt",
    should_retry,
    {
        "retry": "attempt",  # 继续尝试
        "finish": END        # 结束
    }
)

app = workflow.compile()

# 运行
result = app.invoke({
    "attempts": 0,
    "max_attempts": 3,
    "success": False,
    "message": ""
})

print(f"总共尝试: {result['attempts']}次")
print(f"最终状态: {result['message']}")
print(f"是否成功: {result['success']}")

第五章:集成LLM构建智能Agent

5.1 简单的聊天Agent

from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from typing import TypedDict, Annotated
from operator import add

class State(TypedDict):
    messages: Annotated[list, add]  # 自动追加消息

# 调用LLM
def call_model(state: State) -> State:
    llm = ChatOpenAI(model="gpt-4", temperature=0)
    response = llm.invoke(state["messages"])
    return {"messages": [response]}

# 构建图
workflow = StateGraph(State)
workflow.add_node("agent", call_model)
workflow.set_entry_point("agent")
workflow.add_edge("agent", END)

app = workflow.compile()

# 使用
from langchain_core.messages import HumanMessage

result = app.invoke({
    "messages": [HumanMessage(content="你好,请介绍一下自己")]
})

print(result["messages"][-1].content)

5.2 带工具调用的Agent

from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolNode
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage, SystemMessage
from typing import TypedDict, Annotated, Literal
from operator import add

# 定义工具
@tool
def search_weather(city: str) -> str:
    """查询城市天气"""
    # 模拟查询
    weather_data = {
        "北京": "晴天,25℃",
        "上海": "多云,22℃",
        "深圳": "雨天,28℃"
    }
    return weather_data.get(city, "未找到该城市天气信息")

@tool
def calculate(expression: str) -> str:
    """计算数学表达式"""
    try:
        result = eval(expression)
        return f"计算结果: {result}"
    except:
        return "计算错误"

# 工具列表
tools = [search_weather, calculate]
tool_node = ToolNode(tools)

# 定义状态
class State(TypedDict):
    messages: Annotated[list, add]

# LLM节点
def call_model(state: State) -> State:
    llm = ChatOpenAI(model="gpt-4", temperature=0).bind_tools(tools)
    response = llm.invoke(state["messages"])
    return {"messages": [response]}

# 路由函数:决定是调用工具还是结束
def should_continue(state: State) -> Literal["tools", "end"]:
    last_message = state["messages"][-1]
    if hasattr(last_message, "tool_calls") and last_message.tool_calls:
        return "tools"
    return "end"

# 构建图
workflow = StateGraph(State)
workflow.add_node("agent", call_model)
workflow.add_node("tools", tool_node)

workflow.set_entry_point("agent")

workflow.add_conditional_edges(
    "agent",
    should_continue,
    {
        "tools": "tools",
        "end": END
    }
)

workflow.add_edge("tools", "agent")  # 工具执行后回到agent

app = workflow.compile()

# 测试
result = app.invoke({
    "messages": [HumanMessage(content="北京今天天气怎么样?")]
})

for msg in result["messages"]:
    if hasattr(msg, "content") and msg.content:
        print(msg.content)

第六章:人工介入(Human-in-the-Loop)

6.1 需要人工审核的流程

from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
from typing import TypedDict

class State(TypedDict):
    data: str
    approved: bool
    feedback: str

# 处理数据
def process_data(state: State) -> State:
    return {
        "data": f"已处理: {state['data']}",
        "approved": False,
        "feedback": ""
    }

# 人工审核节点(会暂停)
def human_review(state: State) -> State:
    # 这里会暂停,等待外部输入
    return state

# 最终提交
def submit_data(state: State) -> State:
    return {
        "data": state["data"],
        "approved": True,
        "feedback": "已提交"
    }

# 构建图(需要检查点保存器)
memory = MemorySaver()
workflow = StateGraph(State)

workflow.add_node("process", process_data)
workflow.add_node("review", human_review)
workflow.add_node("submit", submit_data)

workflow.set_entry_point("process")
workflow.add_edge("process", "review")
workflow.add_edge("review", "submit")
workflow.add_edge("submit", END)

app = workflow.compile(checkpointer=memory, interrupt_before=["review"])

# 使用
config = {"configurable": {"thread_id": "1"}}

# 第一步:执行到审核节点
result = app.invoke({
    "data": "重要数据",
    "approved": False,
    "feedback": ""
}, config)

print("等待人工审核...")
print(result)

# 第二步:人工审核通过后继续
result = app.invoke(None, config)  # 传入None继续执行
print("审核通过,已提交:")
print(result)

第七章:实战项目 - 智能客服Agent

7.1 项目需求

构建一个智能客服系统,功能包括:

  1. 意图识别(咨询、投诉、退款)
  2. 根据意图调用不同处理流程
  3. 查询订单信息
  4. 复杂问题转人工

7.2 完整代码

from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
from typing import TypedDict, Literal, Annotated
from operator import add

# 状态定义
class CustomerServiceState(TypedDict):
    messages: Annotated[list, add]
    intent: str  # 意图:query, complaint, refund, transfer
    order_id: str
    customer_info: dict
    resolved: bool

# 1. 意图识别节点
def classify_intent(state: CustomerServiceState) -> CustomerServiceState:
    llm = ChatOpenAI(model="gpt-4", temperature=0)
    
    prompt = f"""
    分析用户意图,只返回以下之一:query(咨询), complaint(投诉), refund(退款), transfer(转人工)
    
    用户消息: {state['messages'][-1].content}
    """
    
    response = llm.invoke([HumanMessage(content=prompt)])
    intent = response.content.strip().lower()
    
    return {
        "intent": intent,
        "messages": []
    }

# 2. 订单查询节点
def handle_query(state: CustomerServiceState) -> CustomerServiceState:
    # 模拟订单查询
    order_info = {
        "order_id": "12345",
        "status": "已发货",
        "tracking": "SF1234567890"
    }
    
    response_msg = f"您的订单信息如下:\n订单号:{order_info['order_id']}\n状态:{order_info['status']}\n物流单号:{order_info['tracking']}"
    
    return {
        "messages": [SystemMessage(content=response_msg)],
        "resolved": True
    }

# 3. 投诉处理节点
def handle_complaint(state: CustomerServiceState) -> CustomerServiceState:
    response_msg = "非常抱歉给您带来不便。我们已记录您的投诉,客服专员会在24小时内联系您。投诉单号:C20240107001"
    
    return {
        "messages": [SystemMessage(content=response_msg)],
        "resolved": True
    }

# 4. 退款处理节点
def handle_refund(state: CustomerServiceState) -> CustomerServiceState:
    response_msg = "退款申请已提交,预计3-5个工作日到账。退款单号:R20240107001"
    
    return {
        "messages": [SystemMessage(content=response_msg)],
        "resolved": True
    }

# 5. 转人工节点
def transfer_human(state: CustomerServiceState) -> CustomerServiceState:
    response_msg = "正在为您转接人工客服,请稍候..."
    
    return {
        "messages": [SystemMessage(content=response_msg)],
        "resolved": False
    }

# 路由函数
def route_intent(state: CustomerServiceState) -> Literal["query", "complaint", "refund", "transfer"]:
    return state["intent"]

# 构建图
workflow = StateGraph(CustomerServiceState)

# 添加节点
workflow.add_node("classify", classify_intent)
workflow.add_node("query", handle_query)
workflow.add_node("complaint", handle_complaint)
workflow.add_node("refund", handle_refund)
workflow.add_node("transfer", transfer_human)

# 设置入口
workflow.set_entry_point("classify")

# 添加条件边
workflow.add_conditional_edges(
    "classify",
    route_intent,
    {
        "query": "query",
        "complaint": "complaint",
        "refund": "refund",
        "transfer": "transfer"
    }
)

# 所有处理节点都指向END
workflow.add_edge("query", END)
workflow.add_edge("complaint", END)
workflow.add_edge("refund", END)
workflow.add_edge("transfer", END)

# 编译
app = workflow.compile()

# 测试不同场景
test_cases = [
    "我的订单什么时候发货?",
    "你们的产品质量太差了,我要投诉!",
    "我要申请退款",
    "这个问题太复杂了,我要找真人"
]

for test_input in test_cases:
    print(f"\n用户: {test_input}")
    result = app.invoke({
        "messages": [HumanMessage(content=test_input)],
        "intent": "",
        "order_id": "",
        "customer_info": {},
        "resolved": False
    })
    print(f"意图: {result['intent']}")
    print(f"回复: {result['messages'][-1].content}")
    print(f"是否解决: {result['resolved']}")

第八章:高级特性

8.1 子图(Subgraphs)

当某个功能模块很复杂时,可以单独构建一个子图:

from langgraph.graph import StateGraph, END

# 子图状态
class SubState(TypedDict):
    value: int

# 子图节点
def sub_step1(state: SubState) -> SubState:
    return {"value": state["value"] * 2}

def sub_step2(state: SubState) -> SubState:
    return {"value": state["value"] + 10}

# 创建子图
sub_workflow = StateGraph(SubState)
sub_workflow.add_node("step1", sub_step1)
sub_workflow.add_node("step2", sub_step2)
sub_workflow.set_entry_point("step1")
sub_workflow.add_edge("step1", "step2")
sub_workflow.add_edge("step2", END)

sub_graph = sub_workflow.compile()

# 主图状态
class MainState(TypedDict):
    number: int
    result: int

# 主图节点(调用子图)
def call_subgraph(state: MainState) -> MainState:
    result = sub_graph.invoke({"value": state["number"]})
    return {"number": state["number"], "result": result["value"]}

# 主图
main_workflow = StateGraph(MainState)
main_workflow.add_node("process", call_subgraph)
main_workflow.set_entry_point("process")
main_workflow.add_edge("process", END)

main_app = main_workflow.compile()

# 测试
result = main_app.invoke({"number": 5, "result": 0})
print(result)  # number=5, result=20 (5*2+10)

8.2 并行执行节点

from langgraph.graph import StateGraph, END
from typing import TypedDict
import time

class State(TypedDict):
    task1_result: str
    task2_result: str
    task3_result: str

def task1(state: State) -> State:
    time.sleep(1)  # 模拟耗时操作
    return {"task1_result": "任务1完成"}

def task2(state: State) -> State:
    time.sleep(1)
    return {"task2_result": "任务2完成"}

def task3(state: State) -> State:
    time.sleep(1)
    return {"task3_result": "任务3完成"}

def aggregate(state: State) -> State:
    return state

# 构建图
workflow = StateGraph(State)
workflow.add_node("task1", task1)
workflow.add_node("task2", task2)
workflow.add_node("task3", task3)
workflow.add_node("aggregate", aggregate)

workflow.set_entry_point("task1")
workflow.add_edge("task1", "task2")
workflow.add_edge("task1", "task3")  # task1完成后,task2和task3并行
workflow.add_edge("task2", "aggregate")
workflow.add_edge("task3", "aggregate")
workflow.add_edge("aggregate", END)

app = workflow.compile()

start = time.time()
result = app.invoke({
    "task1_result": "",
    "task2_result": "",
    "task3_result": ""
})
end = time.time()

print(f"总耗时: {end - start:.2f}秒")
print(result)

8.3 流式输出

from langgraph.graph import StateGraph, END
from typing import TypedDict

class State(TypedDict):
    count: int

def counter(state: State) -> State:
    return {"count": state["count"] + 1}

workflow = StateGraph(State)
workflow.add_node("count", counter)
workflow.set_entry_point("count")
workflow.add_edge("count", END)

app = workflow.compile()

# 流式执行
for chunk in app.stream({"count": 0}):
    print(chunk)
    
# 输出每一步的状态变化

第九章:最佳实践

9.1 状态设计原则

  1. 最小化状态:只保存必要的信息
  2. 使用Annotated:对列表类型使用Annotated[list, add]自动追加
  3. 明确类型:使用TypedDict严格定义状态结构
from typing import TypedDict, Annotated
from operator import add

# 好的状态设计
class GoodState(TypedDict):
    messages: Annotated[list, add]  # 消息自动追加
    user_id: str  # 用户ID
    context: dict  # 上下文信息

# 避免的状态设计
class BadState(TypedDict):
    everything: dict  # 太宽泛,不明确

9.2 错误处理

from langgraph.graph import StateGraph, END
from typing import TypedDict

class State(TypedDict):
    data: str
    error: str
    success: bool

def risky_operation(state: State) -> State:
    try:
        # 可能失败的操作
        result = process_data(state["data"])
        return {
            "data": result,
            "error": "",
            "success": True
        }
    except Exception as e:
        return {
            "data": state["data"],
            "error": str(e),
            "success": False
        }

def handle_error(state: State) -> State:
    # 错误处理逻辑
    return {
        "data": state["data"],
        "error": f"已处理错误: {state['error']}",
        "success": False
    }

workflow = StateGraph(State)
workflow.add_node("operation", risky_operation)
workflow.add_node("error_handler", handle_error)

workflow.set_entry_point("operation")
workflow.add_conditional_edges(
    "operation",
    lambda s: "success" if s["success"] else "error",
    {
        "success": END,
        "error": "error_handler"
    }
)
workflow.add_edge("error_handler", END)

app = workflow.compile()

9.3 日志和调试

from langgraph.graph import StateGraph, END
from typing import TypedDict
import logging

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class State(TypedDict):
    value: int

def logged_node(state: State) -> State:
    logger.info(f"节点开始执行,当前状态: {state}")
    result = {"value": state["value"] * 2}
    logger.info(f"节点执行完成,新状态: {result}")
    return result

workflow = StateGraph(State)
workflow.add_node("process", logged_node)
workflow.set_entry_point("process")
workflow.add_edge("process", END)

app = workflow.compile()

# 查看图结构
print(app.get_graph().draw_ascii())

# 执行时会输出日志
result = app.invoke({"value": 5})

9.4 性能优化

  1. 避免重复计算:将结果存储在状态中
  2. 使用缓存:对LLM调用使用缓存
  3. 批处理:合并多个小请求
  4. 并行执行:独立任务使用并行节点
from langchain.cache import InMemoryCache
from langchain.globals import set_llm_cache

# 启用LLM缓存
set_llm_cache(InMemoryCache())

# 节点函数中避免重复计算
def optimized_node(state: State) -> State:
    # 检查是否已计算
    if "cached_result" in state:
        return state
    
    # 执行计算
    result = expensive_computation()
    
    return {
        **state,
        "cached_result": result
    }

第十章:常见问题解答

Q1: LangGraph和LangChain有什么区别?

A: LangChain是链式调用,适合简单的线性流程;LangGraph是图结构,支持循环、条件分支、状态管理,适合复杂的多步骤Agent。

Q2: 如何保存和恢复对话状态?

A: 使用Checkpointer:

from langgraph.checkpoint.memory import MemorySaver

memory = MemorySaver()
app = workflow.compile(checkpointer=memory)

# 使用thread_id标识会话
config = {"configurable": {"thread_id": "user_123"}}
result = app.invoke(input_data, config)

Q3: 如何可视化图结构?

from IPython.display import Image

# 生成图片
Image(app.get_graph().draw_mermaid_png())

# 或者ASCII形式
print(app.get_graph().draw_ascii())

Q4: 节点函数必须返回完整状态吗?

A: 不需要,只返回修改的字段即可,LangGraph会自动合并。

Q5: 如何处理长时间运行的任务?

A: 使用异步节点和流式输出:

async def async_node(state: State) -> State:
    result = await long_running_task()
    return {"result": result}

# 异步执行
result = await app.ainvoke(input_data)

总结

LangGraph是构建复杂AI Agent的强大工具,掌握以下核心点:

  1. 状态管理:设计清晰的状态结构
  2. 图结构:合理划分节点和边
  3. 条件控制:使用条件边实现复杂逻辑
  4. 人工介入:在关键节点添加人工审核
  5. 错误处理:每个节点都要考虑异常情况

从简单的单节点图开始,逐步增加复杂度,最终构建出强大的AI Agent应用!


学习路径建议

  1. 第1-3天:理解基础概念,完成简单示例
  2. 第4-7天:掌握条件分支和循环,集成LLM
  3. 第8-14天:完成实战项目,理解高级特性
  4. 第15天+:优化性能,探索复杂应用场景

祝你学习顺利!🚀

更简单的小白学习版本

LangGraph零基础学习指南 - 像搭积木一样简单

别担心,这份教程会用最简单的语言,像讲故事一样教你。即使你是小白也完全没问题!


📺 推荐的视频教程资源

在开始之前,我先为你推荐一些优质的学习资源:

英文资源(带字幕)

  1. LangChain Academy - Introduction to LangGraph(免费)

    • 官方课程,完全免费
    • 地址:https://academy.langchain.com/courses/intro-to-langgraph
    • 这是最权威的入门课程
  2. DataCamp的LangGraph视频教程

    • 有配套的文字讲解
    • 地址:https://www.datacamp.com/tutorial/langgraph-tutorial
  3. Real Python的LangGraph教程

    • 有详细的代码示例
    • 地址:https://realpython.com/langgraph-python/

中文资源

  1. 知乎上的LangGraph入门系列

    • 搜索"LangGraph 入门"会有很多文章
    • 博客园也有很多中文教程
  2. B站视频

    • 搜索"LangGraph 教程"有不少讲解视频
    • 建议选择2024年以后的视频,因为库更新很快

建议学习路径

  1. 先看本文档理解概念(1-2小时)
  2. 看官方视频课程(2-3小时)
  3. 跟着本文档敲代码(3-5天)
  4. 做自己的小项目(1-2周)

第一部分:理论基础 - 先理解概念

1.1 什么是Agent?就像你的智能助理

想象一下,你有一个超级聪明的助理:

普通助理(传统程序)

  • 你说:"帮我订机票"
  • 助理:只能按照固定步骤操作,如果遇到问题就卡住了

AI Agent(智能助理)

  • 你说:"帮我订机票"
  • Agent会:
    1. 先问你:"去哪里?什么时候?"
    2. 搜索可用航班
    3. 如果没票,自动帮你查火车
    4. 找到选项后问你:"这几个可以吗?"
    5. 根据你的反馈继续操作

Agent的核心能力

  • 🤔 能思考:分析问题,制定计划
  • 🔧 会使用工具:调用搜索、数据库、计算器等
  • 🔄 能循环:不断尝试直到完成任务
  • 🧠 有记忆:记得之前说过什么

1.2 为什么需要LangGraph?

想象你要开餐厅:

传统方式(LangChain的链式调用)

接待客人 → 点菜 → 做菜 → 上菜 → 结账

这是一条直线,每个步骤按顺序执行,不能回头,不能跳过

LangGraph方式(图结构)

接待客人 → 点菜 → 做菜 → 上菜 → 结账
         ↑         ↓
         └─ 客人要加菜 ←┘
                ↓
            菜做错了
                ↓
           重新做菜

看到区别了吗?可以循环、可以跳转、可以根据情况走不同的路径

1.3 图(Graph)到底是什么?

图就是一张流程图,由3个部分组成:

1️⃣ 节点(Node)- 工作站

就像餐厅的各个工作区:

  • 接待台(接待节点)
  • 厨房(做菜节点)
  • 收银台(结账节点)

在编程中,节点就是一个函数,负责做一件具体的事。

2️⃣ 边(Edge)- 路径

连接节点的箭头,告诉我们"接下来去哪里":

  • 普通边:直接连接,"做完A就去B"
  • 条件边:根据情况决定,"如果下雨去A,否则去B"
3️⃣ 状态(State)- 记事本

一个共享的记事本,记录所有重要信息:

  • 客人点了什么菜?
  • 菜做到哪一步了?
  • 账单多少钱?

每个节点都可以读和写这个记事本


第二部分:核心概念深度理解

2.1 状态管理 - 程序的记忆

场景故事: 假设你在和AI聊天:

没有状态管理(每次都失忆)

你:"我叫小明"
AI:"你好!"
你:"我叫什么名字?"
AI:"我不知道"  ← 忘记了!

有状态管理

你:"我叫小明"
AI:"你好小明!"(记在状态里:name="小明")
你:"我叫什么名字?"
AI:"你叫小明啊"  ← 记得!

代码表示状态

# 定义一个状态(记事本)
class State(TypedDict):
    name: str          # 用户名字
    messages: list     # 聊天记录
    count: int         # 计数器

2.2 节点执行 - 做事的地方

每个节点就是一个函数,它:

  1. 接收状态(读记事本)
  2. 做一些事情
  3. 返回新状态(更新记事本)

生活例子

输入状态:{"钱": 100, "购物清单": ["苹果", "牛奶"]}
         ↓
    【购物节点】
         ↓
输出状态:{"钱": 80, "购物清单": [], "购买物品": ["苹果", "牛奶"]}

代码表示

def shopping(state):
    # 1. 读状态
    money = state["钱"]
    items = state["购物清单"]
    
    # 2. 做事情
    total_cost = buy_items(items)
    money = money - total_cost
    
    # 3. 返回新状态
    return {
        "钱": money,
        "购物清单": [],
        "购买物品": items
    }

2.3 边的流转 - 决定下一步

普通边 - 固定路线

就像地铁线:

A站 → B站 → C站(固定顺序)
条件边 - 智能路线

就像导航软件:

你在家 → 【判断天气】
           ↓
   下雨 → 打车
   晴天 → 骑车

代码表示

def 判断天气(state):
    if state["天气"] == "下雨":
        return "打车"
    else:
        return "骑车"

2.4 循环 - 重复直到成功

生活场景:考试

开始 → 做题 → 检查
         ↑      ↓
         └── 有错误(继续改)
              ↓
            没错误(结束)

AI场景:写代码

开始 → 写代码 → 测试
         ↑      ↓
         └── 有bug(继续修)
              ↓
           没bug(完成)

第三部分:手把手实战 - 从0开始

3.1 安装环境

第1步:安装Python

  • 确保你有Python 3.8以上版本
  • 打开命令行,输入 python --version 检查

第2步:安装库

pip install langgraph
pip install langchain
pip install langchain-openai

第3步:配置API密钥

  • 去OpenAI官网申请API密钥
  • 或者用国内的替代服务(智谱、通义千问等)
import os
os.environ["OPENAI_API_KEY"] = "你的密钥"

3.2 第一个程序 - Hello World

目标:创建一个简单的图,有一个节点,输出"Hello"

步骤1:导入库

from langgraph.graph import StateGraph, END
from typing import TypedDict

步骤2:定义状态(记事本)

class State(TypedDict):
    message: str  # 我们要传递的消息

步骤3:创建节点(做事的函数)

def say_hello(state: State) -> State:
    # 这个函数接收状态,返回新状态
    return {"message": "Hello, World!"}

步骤4:构建图

# 1. 创建一个空图
graph = StateGraph(State)

# 2. 添加节点
graph.add_node("hello", say_hello)

# 3. 设置入口(从哪里开始)
graph.set_entry_point("hello")

# 4. 设置出口(到哪里结束)
graph.add_edge("hello", END)

# 5. 编译成可运行的应用
app = graph.compile()

步骤5:运行

# 运行图
result = app.invoke({"message": ""})
print(result)  # {'message': 'Hello, World!'}

完整代码

from langgraph.graph import StateGraph, END
from typing import TypedDict

# 定义状态
class State(TypedDict):
    message: str

# 定义节点
def say_hello(state: State) -> State:
    return {"message": "Hello, World!"}

# 构建图
graph = StateGraph(State)
graph.add_node("hello", say_hello)
graph.set_entry_point("hello")
graph.add_edge("hello", END)

# 运行
app = graph.compile()
result = app.invoke({"message": ""})
print(result["message"])  # 输出:Hello, World!

理解这段代码

  1. State:定义了我们的"记事本"有什么内容
  2. say_hello:一个节点,负责说"Hello"
  3. graph.add_node:把节点添加到图里
  4. set_entry_point:告诉程序从哪里开始
  5. add_edge:告诉程序做完这个节点后去哪里
  6. compile:把图变成可执行的程序
  7. invoke:运行程序

3.3 第二个程序 - 简单对话

目标:创建一个会回答问题的AI

步骤1:准备工作

from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
from typing import TypedDict, Annotated
from operator import add

# 创建AI模型
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)

步骤2:定义状态

class ChatState(TypedDict):
    # 这里的Annotated[list, add]是特殊语法
    # 意思是:这个列表会自动追加新消息,不会覆盖旧的
    messages: Annotated[list, add]

步骤3:创建对话节点

def chat_node(state: ChatState) -> ChatState:
    # 调用AI回答问题
    response = llm.invoke(state["messages"])
    # 返回AI的回复
    return {"messages": [response]}

步骤4:构建图

graph = StateGraph(ChatState)
graph.add_node("chat", chat_node)
graph.set_entry_point("chat")
graph.add_edge("chat", END)
app = graph.compile()

步骤5:使用

# 用户提问
result = app.invoke({
    "messages": [HumanMessage(content="你好,请做个自我介绍")]
})

# 打印AI的回复
print(result["messages"][-1].content)

完整代码

from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
from typing import TypedDict, Annotated
from operator import add

# 状态
class ChatState(TypedDict):
    messages: Annotated[list, add]

# 节点
def chat_node(state: ChatState) -> ChatState:
    llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
    response = llm.invoke(state["messages"])
    return {"messages": [response]}

# 构建图
graph = StateGraph(ChatState)
graph.add_node("chat", chat_node)
graph.set_entry_point("chat")
graph.add_edge("chat", END)
app = graph.compile()

# 使用
result = app.invoke({
    "messages": [HumanMessage(content="你好!")]
})
print(result["messages"][-1].content)

3.4 第三个程序 - 条件分支

场景:判断数字是奇数还是偶数,走不同的路径

流程图

输入数字
   ↓
【判断】
   ↓
偶数 → 【显示"是偶数"】 → 结束
   ↓
奇数 → 【显示"是奇数"】 → 结束

步骤1:定义状态

class State(TypedDict):
    number: int     # 输入的数字
    result: str     # 结果

步骤2:创建三个节点

# 节点1:判断节点(什么都不做,只是传递)
def check_node(state: State) -> State:
    return state

# 节点2:处理偶数
def handle_even(state: State) -> State:
    return {
        "number": state["number"],
        "result": f"{state['number']} 是偶数"
    }

# 节点3:处理奇数
def handle_odd(state: State) -> State:
    return {
        "number": state["number"],
        "result": f"{state['number']} 是奇数"
    }

步骤3:创建路由函数(决定走哪条路)

def route_by_number(state: State):
    # 根据数字决定下一步
    if state["number"] % 2 == 0:
        return "even"  # 偶数走even节点
    else:
        return "odd"   # 奇数走odd节点

步骤4:构建图

graph = StateGraph(State)

# 添加所有节点
graph.add_node("check", check_node)
graph.add_node("even", handle_even)
graph.add_node("odd", handle_odd)

# 设置入口
graph.set_entry_point("check")

# 关键:添加条件边
graph.add_conditional_edges(
    "check",           # 从check节点出发
    route_by_number,   # 用这个函数决定去哪
    {
        "even": "even",  # 如果函数返回"even",去even节点
        "odd": "odd"     # 如果函数返回"odd",去odd节点
    }
)

# 两个节点都结束
graph.add_edge("even", END)
graph.add_edge("odd", END)

app = graph.compile()

步骤5:测试

# 测试偶数
result = app.invoke({"number": 8, "result": ""})
print(result["result"])  # 8 是偶数

# 测试奇数
result = app.invoke({"number": 7, "result": ""})
print(result["result"])  # 7 是奇数

理解条件边

  • add_conditional_edges:添加条件边
  • 第一个参数:从哪个节点出发
  • 第二个参数:路由函数,返回下一个节点的名字
  • 第三个参数:映射表,把返回值对应到实际节点

3.5 第四个程序 - 循环重试

场景:模拟一个可能失败的操作,失败了就重试

流程图

开始
  ↓
【尝试操作】
  ↓
成功?
  ├─ 是 → 结束
  └─ 否 → 【尝试操作】(循环回去)

完整代码

from langgraph.graph import StateGraph, END
from typing import TypedDict
import random

# 状态
class RetryState(TypedDict):
    attempts: int       # 尝试次数
    max_attempts: int   # 最大尝试次数
    success: bool       # 是否成功
    message: str        # 结果信息

# 节点:尝试操作
def try_operation(state: RetryState) -> RetryState:
    # 模拟50%成功率的操作
    success = random.random() > 0.5
    attempts = state["attempts"] + 1
    
    message = f"第{attempts}次尝试: "
    message += "成功!" if success else "失败"
    
    return {
        "attempts": attempts,
        "max_attempts": state["max_attempts"],
        "success": success,
        "message": message
    }

# 路由函数:决定是重试还是结束
def should_retry(state: RetryState):
    # 如果成功了,结束
    if state["success"]:
        return "finish"
    
    # 如果达到最大次数,也结束
    if state["attempts"] >= state["max_attempts"]:
        return "finish"
    
    # 否则继续重试
    return "retry"

# 构建图
graph = StateGraph(RetryState)
graph.add_node("try", try_operation)
graph.set_entry_point("try")

# 添加条件边(关键:形成循环)
graph.add_conditional_edges(
    "try",
    should_retry,
    {
        "retry": "try",    # 重试:回到try节点(循环!)
        "finish": END      # 结束:到END
    }
)

app = graph.compile()

# 运行
result = app.invoke({
    "attempts": 0,
    "max_attempts": 5,
    "success": False,
    "message": ""
})

print(f"总共尝试了 {result['attempts']} 次")
print(f"最终状态: {result['message']}")
print(f"是否成功: {result['success']}")

运行结果示例

总共尝试了 3 次
最终状态: 第3次尝试: 成功!
是否成功: True

理解循环: 关键在于 "retry": "try",这让图可以回到之前的节点,形成循环!


第四部分:进阶理解

4.1 状态的更新机制

问题:如果多个节点都要修改同一个数据怎么办?

例子:记录操作历史

class State(TypedDict):
    history: list  # 操作历史

# 如果直接这样写,会有问题:
def node1(state):
    return {"history": ["操作1"]}  # 会覆盖原来的历史!

解决方案:使用Annotated

from operator import add

class State(TypedDict):
    # add表示:追加,不覆盖
    history: Annotated[list, add]

# 现在这样写就对了:
def node1(state):
    return {"history": ["操作1"]}  # 会追加到现有历史

def node2(state):
    return {"history": ["操作2"]}  # 继续追加

4.2 子图 - 把复杂任务拆分

场景:做一顿饭太复杂,拆成几个小任务

主任务(主图)

准备食材 → 【烹饪(子图)】 → 摆盘 → 结束

烹饪子图

清洗 → 切菜 → 炒菜 → 调味

代码示例

# 子图的状态
class CookingState(TypedDict):
    dish: str
    status: str

# 子图的节点
def wash(state):
    return {"dish": state["dish"], "status": "已清洗"}

def cut(state):
    return {"dish": state["dish"], "status": "已切好"}

def cook(state):
    return {"dish": state["dish"], "status": "已炒好"}

# 创建子图
sub_graph = StateGraph(CookingState)
sub_graph.add_node("wash", wash)
sub_graph.add_node("cut", cut)
sub_graph.add_node("cook", cook)
sub_graph.set_entry_point("wash")
sub_graph.add_edge("wash", "cut")
sub_graph.add_edge("cut", "cook")
sub_graph.add_edge("cook", END)
cooking_app = sub_graph.compile()

# 主图可以调用子图
def cooking_node(state):
    result = cooking_app.invoke({
        "dish": state["dish"],
        "status": ""
    })
    return {"dish": result["dish"], "status": result["status"]}

第五部分:学习建议

5.1 学习顺序

第1周:基础概念

  • 理解图、节点、边、状态
  • 完成3个简单示例
  • 每天1-2小时

第2周:条件和循环

  • 练习条件边
  • 练习循环逻辑
  • 尝试组合使用

第3周:集成LLM

  • 学习如何调用AI模型
  • 实现简单对话
  • 添加工具调用

第4周:实战项目

  • 选一个小项目
  • 从头到尾实现
  • 不断迭代优化

5.2 遇到问题怎么办?

1. 代码报错

  • 仔细看错误信息
  • 检查拼写、缩进
  • Google搜索错误信息

2. 逻辑不通

  • 画出流程图
  • 在纸上模拟执行
  • 用print打印状态

3. 概念不懂

  • 看视频教程
  • 找类比的生活场景
  • 问AI助手解释

5.3 练习建议

每天必做

  1. 回顾昨天的代码
  2. 写一个新的小例子
  3. 记录遇到的问题

每周必做

  1. 完成一个小项目
  2. 复习本周知识点
  3. 总结学到的东西

不要做的

  • ❌ 一次学太多
  • ❌ 只看不练
  • ❌ 遇到困难就放弃

第六部分:常见问题解答

Q1: 我是零基础,能学会吗? A: 完全可以!只要你:

  • 会基础的Python(会写函数就行)
  • 愿意花时间练习
  • 不怕出错,多试几次

Q2: 要学多久? A:

  • 基础入门:1-2周
  • 能做简单项目:3-4周
  • 精通:2-3个月

Q3: 和LangChain什么关系? A:

  • LangChain是工具箱(提供各种工具)
  • LangGraph是流程管理(组织这些工具怎么用)
  • 可以一起用,也可以单独用LangGraph

Q4: 必须要OpenAI的API吗? A: 不是必须的,可以用:

  • 国内的通义千问、智谱AI
  • 本地模型(Ollama)
  • 其他支持的模型

Q5: 图画不出来怎么办? A:

  • 先用纸笔画流程图
  • 用简单的方框和箭头
  • 不用画得很专业,自己看懂就行

第七部分:第一个实用项目 - 天气查询机器人

目标:做一个能查天气的聊天机器人

功能

  1. 用户说"北京天气"
  2. AI识别出要查天气
  3. 调用天气查询工具
  4. 返回结果

代码(逐步理解)

from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
from operator import add

# 1. 定义状态
class WeatherBotState(TypedDict):
    messages: Annotated[list, add]  # 对话历史
    city: str                        # 要查询的城市
    weather_result: str              # 天气结果

# 2. 工具:查询天气
def get_weather(city: str) -> str:
    # 这里模拟查询(真实项目会调用API)
    weather_data = {
        "北京": "晴天,15°C",
        "上海": "多云,18°C",
        "深圳": "小雨,25°C"
    }
    return weather_data.get(city, "未找到该城市")

# 3. 节点1:理解用户意图
def understand_intent(state: WeatherBotState):
    # 简化版:直接从消息中提取城市名
    user_input = state["messages"][-1]
    
    # 提取城市(真实项目用AI理解)
    if "北京" in user_input:
        city = "北京"
    elif "上海" in user_input:
        city = "上海"
    elif "深圳" in user_input:
        city = "深圳"
    else:
        city = ""
    
    return {"city": city}

# 4. 节点2:查询天气
def query_weather(state: WeatherBotState):
    city = state["city"]
    weather = get_weather(city)
    return {"weather_result": weather}

# 5. 节点3:回复用户
def reply_to_user(state: WeatherBotState):
    city = state["city"]
    weather = state["weather_result"]
    reply = f"{city}的天气是:{weather}"
    return {"messages": [reply]}

# 6. 构建图
graph = StateGraph(WeatherBotState)
graph.add_node("understand", understand_intent)
graph.add_node("query", query_weather)
graph.add_node("reply", reply_to_user)

graph.set_entry_point("understand")
graph.add_edge("understand", "query")
graph.add_edge("query", "reply")
graph.add_edge("reply", END)

app = graph.compile()

# 7. 使用
result = app.invoke({
    "messages": ["北京天气怎么样?"],
    "city": "",
    "weather_result": ""
})

print(result["messages"][-1])
# 输出:北京的天气是:晴天,15°C

理解这个项目

  1. 三个节点依次执行
  2. 每个节点负责一件事
  3. 状态在节点间传递
  4. 最后得到结果

总结

你已经学到了: ✅ 什么是Agent和LangGraph ✅ 图、节点、边、状态的概念 ✅ 如何写简单的LangGraph程序 ✅ 条件分支和循环的用法 ✅ 第一个实用项目

下一步

  1. 把每个代码示例都敲一遍
  2. 修改代码,看看会发生什么
  3. 试着做一个自己的小项目
  4. 看推荐的视频教程

记住

  • 🐌 慢慢来,不要急
  • 💪 多练习,多动手
  • 🤔 遇到问题就问
  • 🎯 每天进步一点点

你一定可以学会的!加油! 🚀


附录:重要资源链接

  1. 官方文档:https://python.langchain.com/docs/langgraph
  2. 官方课程:https://academy.langchain.com/courses/intro-to-langgraph
  3. GitHub仓库:https://github.com/langchain-ai/langgraph
  4. 中文社区:搜索"LangGraph 中文教程"
  5. 视频教程:B站搜索"LangGraph"

有问题随时问我,我会一直陪着你学习!💪

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐