LangGraph(二)具体案例设计
该案例使用 LangGraph 构建了一个处理客户支持邮件的智能体,将流程拆解为读取邮件、分类意图、文档搜索、缺陷追踪、起草回复、人工审核和发送回复等节点,通过共享状态传递原始数据,并结合 LLM、外部工具与人工干预,实现自动分类、响应生成与复杂问题升级,同时支持错误处理与断点续传。
具体案例设计

使用 LangGraph 构建智能体时,首先需要将其拆解为一个个离散的步骤,称为 节点(Nodes)。接着,你需要描述从每个节点出发的不同决策和转换逻辑。最后,通过一个共享的 状态(State) 将这些节点连接起来,每个节点都可以对该状态进行读取和写入。
假设你需要构建一个处理客户支持邮件的 AI 智能体。产品团队给出了以下需求:
智能体应当:
- 读取收到的客户邮件
- 按紧迫性和主题进行分类
- 搜索相关文档以回答问题
- 起草适当的回复
- 将复杂问题升级给人工客服
- 根据需要安排后续跟进
需要处理的典型场景:
1. 简单的产品问题:“如何重置密码?”
2. 缺陷报告:“选择 PDF 格式时,导出功能会崩溃。”
3. 紧急账单问题:“我的订阅被扣了两次费!”
4. 功能需求:“能在移动应用中加入深色模式吗?”
5. 复杂的个人技术问题:“我们的 API 集成间歇性出现 504 错误。”
工作流映射
首先识别流程中的各个独立步骤。每一步都将成为一个 节点(执行特定任务的函数)。然后,勾勒出这些步骤如何相互连接。
读取邮件:提取并解析邮件内容。分类意图:使用大模型(LLM)分类紧迫性和主题,然后路由到相应的动作。文档搜索:查询知识库获取相关信息。缺陷追踪:在追踪系统中创建或更新问题。起草回复:生成合适的响应内容。人工审核:升级给人工进行审批或处理。发送回复:发出邮件响应。
注意某些节点会根据情况做出“去向”决策(如分类意图、起草回复、人工审核),而其他节点则总是流向同一个下一步(如读取邮件总是流向分类意图)。
明确职责
针对图中的每个节点,确定它属于哪种操作类型以及需要哪些上下文。
- LLM 步骤:用于理解、分析、生成文本或进行推理决策。
- 数据步骤:用于从外部源检索信息。
- 动作步骤:用于执行外部操作。
- 用户输入步骤:用于需要人工干预的环节。
LLM 步骤
- 分类意图:
- 静态上下文(提示词):分类类别、紧急程度定义、输出格式。
- 动态上下文(来自状态):邮件内容、发件人信息。
- 预期结果:决定路由的结构化分类数据。
- 起草回复:
- 静态上下文:语气指南、公司政策、回复模板。
- 动态上下文:分类结果、搜索结果、客户历史。
数据步骤
- 文档搜索:根据意图和主题构建查询。支持重试策略和缓存。
- 客户历史查询:根据发件人 ID 获取 CRM 数据。
动作步骤
- 发送回复:在获批后执行,需要网络重试策略,不应缓存。
- 缺陷追踪:当意图为“bug”时执行,返回工单 ID。
用户输入步骤
- 人工审核节点:提供原始邮件、草稿和分类信息,等待人工审批或编辑。
设计状态
状态是智能体中所有节点都能访问的共享内存。
什么应该放入状态?
- 存入状态:数据是否需要跨步骤持久化?如果是,放入状态。
- 不要存储:数据是否可以从其他数据推导出来?如果是,在使用时计算即可。
对于邮件智能体,我们需要追踪:
- 原始邮件和发件人信息(无法重建)。
- 分类结果(后续多个节点需要)。
- 搜索结果和客户数据(重新获取成本高)。
- 回复草案(需要经过人工审核环节)。
核心原则:保持状态原始化,按需格式化提示词
你的状态应该存储 原始数据,而不是格式化好的文本。在节点内部再将数据填入提示词模板。
这种分离意味着:
- 不同的节点可以根据自身需求以不同方式格式化相同的数据
- 你可以在不修改状态结构(state schema)的情况下更改提示模板(prompt templates)
- 调试更加清晰——你能清楚地看到每个节点接收到的具体数据
- 你的智能体(agent)可以持续演进,而不会破坏现有的状态
构建节点
节点就是一个接收当前状态并返回状态更新的 Python 函数。
错误处理策略
| 错误类型 | 修复者 | 策略 | 适用场景 |
|---|---|---|---|
| 瞬时错误 (网络、限流) | 系统 | 重试策略 | 自动恢复的临时失败 |
| LLM 可恢复错误 (解析失败) | LLM | 存入状态并回溯 | LLM 看到错误后可调整重试 |
| 用户可修复错误 (信息缺失) | 人工 | 使用 interrupt() 暂停 |
需要人工补充关键信息 |
| 非预期错误 | 开发者 | 让错误抛出 | 需要调试的程序缺陷 |
组装图
最后,我们将节点连接成一个运行的图。由于我们的节点通过 Command 对象自行处理路由决策,因此只需定义基础的起始和结束连接。
总结
构建此邮件智能体展示了 LangGraph 的思维方式:
- 拆解为离散步骤:每个节点只做一件事,便于调试、流式输出和断点续传。
- 状态即共享内存:存储原始数据,避免在状态中存储提示词或说明文字。
- 节点即函数:接收状态,返回更新,通过
Command控制去向。 - 将错误纳入流程:不同类型的失败有不同的处理路径。
- 人工干预是一等公民:
interrupt()可以在任何节点暂停并在几天后安全恢复。 - 结构自然涌现:由节点内部逻辑驱动路由,使控制流清晰且可追溯。
代码实现
from typing import TypedDict, Literal
from langgraph.graph import StateGraph, START, END
from langgraph.types import interrupt, Command, RetryPolicy
from langgraph.checkpoint.memory import MemorySaver
from langchain.messages import HumanMessage
# 定义邮件分类结构
class EmailClassification(TypedDict):
intent: Literal["question", "bug", "billing", "feature", "complex"]
urgency: Literal["low", "medium", "high", "critical"]
topic: str
summary: str
class EmailAgentState(TypedDict):
# 原始邮件数据
email_content: str
sender_email: str
email_id: str
# 分类结果
classification: EmailClassification | None
# 原始搜索/API结果
search_results: list[str] | None # 原始文档块列表
customer_history: dict | None # CRM中的原始客户数据
# 生成的内容
draft_response: str | None
messages: list[str] | None
# 初始化LLM
llm = init_chat_model("openai:qwen-flash",
api_key="sk-xxxx",
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
extra_body={"enable_thinking": False}
)
def read_email(state: EmailAgentState) -> dict:
"""提取和解析邮件内容"""
# 在生产环境中,这将连接到你的邮件服务
return {
"messages": [HumanMessage(content=f"正在处理邮件:{state['email_content']}")]
}
def classify_intent(state: EmailAgentState) -> Command[Literal["search_documentation", "human_review", "draft_response", "bug_tracking"]]:
"""使用LLM对邮件意图和紧急程度进行分类,然后相应地路由"""
# 创建返回EmailClassification字典的结构化LLM
structured_llm = llm.with_structured_output(EmailClassification)
# 按需格式化提示,不存储在状态中
classification_prompt = f"""
分析这封客户邮件并进行分类:
邮件内容:{state['email_content']}
发件人:{state['sender_email']}
提供分类,包括意图、紧急程度、主题和摘要。
"""
# 直接以字典形式获取结构化响应
classification = structured_llm.invoke(classification_prompt)
# 根据分类确定下一个节点
if classification['intent'] == 'billing' or classification['urgency'] == 'critical':
goto = "human_review"
elif classification['intent'] in ['question', 'feature']:
goto = "search_documentation"
elif classification['intent'] == 'bug':
goto = "bug_tracking"
else:
goto = "draft_response"
# 将分类作为单个字典存储在状态中
return Command(
update={"classification": classification},
goto=goto
)
def search_documentation(state: EmailAgentState) -> Command[Literal["draft_response"]]:
"""搜索知识库以获取相关信息"""
# 从分类构建搜索查询
classification = state.get('classification', {})
query = f"{classification.get('intent', '')} {classification.get('topic', '')}"
try:
# 在此实现你的搜索逻辑
# 存储原始搜索结果,而不是格式化的文本
search_results = [
"通过设置 > 安全 > 更改密码来重置密码",
"密码必须至少12个字符",
"包括大写字母、小写字母、数字和符号"
]
except SearchAPIError as e:
# 对于可恢复的搜索错误,存储错误并继续
search_results = [f"搜索暂时不可用:{str(e)}"]
return Command(
update={"search_results": search_results}, # 存储原始结果或错误
goto="draft_response"
)
def bug_tracking(state: EmailAgentState) -> Command[Literal["draft_response"]]:
"""创建或更新错误跟踪工单"""
# 在你的错误跟踪系统中创建工单
ticket_id = "BUG-12345" # 将通过API创建
return Command(
update={
"search_results": [f"错误工单 {ticket_id} 已创建"],
"current_step": "bug_tracked"
},
goto="draft_response"
)
def draft_response(state: EmailAgentState) -> Command[Literal["human_review", "send_reply"]]:
"""使用上下文生成响应并根据质量进行路由"""
classification = state.get('classification', {})
# 按需从原始状态数据格式化上下文
context_sections = []
if state.get('search_results'):
# 为提示格式化搜索结果
formatted_docs = "\n".join([f"- {doc}" for doc in state['search_results']])
context_sections.append(f"相关文档:\n{formatted_docs}")
if state.get('customer_history'):
# 为提示格式化客户数据
context_sections.append(f"客户等级:{state['customer_history'].get('tier', 'standard')}")
# 使用格式化的上下文构建提示
draft_prompt = f"""
为这封客户邮件起草回复:
{state['email_content']}
邮件意图:{classification.get('intent', 'unknown')}
紧急程度:{classification.get('urgency', 'medium')}
{chr(10).join(context_sections)}
指南:
- 保持专业和乐于助人
- 针对他们的具体问题
- 在相关时使用提供的文档
"""
response = llm.invoke(draft_prompt)
# 根据紧急程度和意图确定是否需要人工审核
needs_review = (
classification.get('urgency') in ['high', 'critical'] or
classification.get('intent') == 'complex'
)
# 路由到适当的下一个节点
goto = "human_review" if needs_review else "send_reply"
return Command(
update={"draft_response": response.content}, # 仅存储原始响应
goto=goto
)
def human_review(state: EmailAgentState) -> Command[Literal["send_reply", END]]:
"""使用中断暂停以进行人工审核,并根据决策进行路由"""
classification = state.get('classification', {})
# interrupt()必须首先调用 - 在它之前的任何代码都会在恢复时重新运行
human_decision = interrupt({
"email_id": state.get('email_id', ''),
"original_email": state.get('email_content', ''),
"draft_response": state.get('draft_response', ''),
"urgency": classification.get('urgency'),
"intent": classification.get('intent'),
"action": "请审核并批准/编辑此回复"
})
# 现在处理人工的决策
if human_decision.get("approved"):
return Command(
update={"draft_response": human_decision.get("edited_response", state.get('draft_response', ''))},
goto="send_reply"
)
else:
# 拒绝意味着人工将直接处理
return Command(update={}, goto=END)
def send_reply(state: EmailAgentState) -> dict:
"""发送邮件回复"""
# 与邮件服务集成
print(f"正在发送回复:{state['draft_response'][:100]}...")
return {}
# 创建工作流图
workflow = StateGraph(EmailAgentState)
# 添加具有适当错误处理的节点
workflow.add_node("read_email", read_email)
workflow.add_node("classify_intent", classify_intent)
# 为可能出现瞬态故障的节点添加重试策略
workflow.add_node(
"search_documentation",
search_documentation,
retry_policy=RetryPolicy(max_attempts=3)
)
workflow.add_node("bug_tracking", bug_tracking)
workflow.add_node("draft_response", draft_response)
workflow.add_node("human_review", human_review)
workflow.add_node("send_reply", send_reply)
# 仅添加必要的边
workflow.add_edge(START, "read_email")
workflow.add_edge("read_email", "classify_intent")
workflow.add_edge("send_reply", END)
# 使用检查点器编译以实现持久性,如果使用Local_Server运行图则请不使用检查点器编译
memory = MemorySaver()
app = workflow.compile(checkpointer=memory)
# 测试紧急的账单问题
initial_state = {
"email_content": "我的订阅被重复收费了!这很紧急!",
"sender_email": "customer@example.com",
"email_id": "email_123",
"messages": []
}
# 使用thread_id运行以实现持久性
config = {"configurable": {"thread_id": "customer_123"}}
result = app.invoke(initial_state, config)
# 图将在human_review处暂停
print(f"人工审核中断:{result['__interrupt__']}")
# 准备好后,提供人工输入以恢复执行
human_response = Command(
resume={
"approved": True,
"edited_response": "我们对重复收费深表歉意。我已经启动了立即退款..."
}
)
# 恢复执行
final_result = app.invoke(human_response, config)
print("邮件发送成功!")
更多推荐



所有评论(0)