具体案例设计

在这里插入图片描述

使用 LangGraph 构建智能体时,首先需要将其拆解为一个个离散的步骤,称为 节点(Nodes)。接着,你需要描述从每个节点出发的不同决策和转换逻辑。最后,通过一个共享的 状态(State) 将这些节点连接起来,每个节点都可以对该状态进行读取和写入。

假设你需要构建一个处理客户支持邮件的 AI 智能体。产品团队给出了以下需求:

智能体应当:
- 读取收到的客户邮件
- 按紧迫性和主题进行分类
- 搜索相关文档以回答问题
- 起草适当的回复
- 将复杂问题升级给人工客服
- 根据需要安排后续跟进

需要处理的典型场景:
1. 简单的产品问题:“如何重置密码?”
2. 缺陷报告:“选择 PDF 格式时,导出功能会崩溃。”
3. 紧急账单问题:“我的订阅被扣了两次费!”
4. 功能需求:“能在移动应用中加入深色模式吗?”
5. 复杂的个人技术问题:“我们的 API 集成间歇性出现 504 错误。”

工作流映射

首先识别流程中的各个独立步骤。每一步都将成为一个 节点(执行特定任务的函数)。然后,勾勒出这些步骤如何相互连接。

开始

读取邮件

分类意图

文档搜索

缺陷追踪

人工审核

起草回复

人工审核

发送回复

结束

  • 读取邮件:提取并解析邮件内容。
  • 分类意图:使用大模型(LLM)分类紧迫性和主题,然后路由到相应的动作。
  • 文档搜索:查询知识库获取相关信息。
  • 缺陷追踪:在追踪系统中创建或更新问题。
  • 起草回复:生成合适的响应内容。
  • 人工审核:升级给人工进行审批或处理。
  • 发送回复:发出邮件响应。

注意某些节点会根据情况做出“去向”决策(如分类意图、起草回复、人工审核),而其他节点则总是流向同一个下一步(如读取邮件总是流向分类意图)。


明确职责

针对图中的每个节点,确定它属于哪种操作类型以及需要哪些上下文。

  • LLM 步骤:用于理解、分析、生成文本或进行推理决策。
  • 数据步骤:用于从外部源检索信息。
  • 动作步骤:用于执行外部操作。
  • 用户输入步骤:用于需要人工干预的环节。

LLM 步骤

  • 分类意图
    • 静态上下文(提示词):分类类别、紧急程度定义、输出格式。
    • 动态上下文(来自状态):邮件内容、发件人信息。
    • 预期结果:决定路由的结构化分类数据。
  • 起草回复
    • 静态上下文:语气指南、公司政策、回复模板。
    • 动态上下文:分类结果、搜索结果、客户历史。

数据步骤

  • 文档搜索:根据意图和主题构建查询。支持重试策略和缓存。
  • 客户历史查询:根据发件人 ID 获取 CRM 数据。

动作步骤

  • 发送回复:在获批后执行,需要网络重试策略,不应缓存。
  • 缺陷追踪:当意图为“bug”时执行,返回工单 ID。

用户输入步骤

  • 人工审核节点:提供原始邮件、草稿和分类信息,等待人工审批或编辑。

设计状态

状态是智能体中所有节点都能访问的共享内存

什么应该放入状态?

  • 存入状态:数据是否需要跨步骤持久化?如果是,放入状态。
  • 不要存储:数据是否可以从其他数据推导出来?如果是,在使用时计算即可。

对于邮件智能体,我们需要追踪:

  • 原始邮件和发件人信息(无法重建)。
  • 分类结果(后续多个节点需要)。
  • 搜索结果和客户数据(重新获取成本高)。
  • 回复草案(需要经过人工审核环节)。

核心原则:保持状态原始化,按需格式化提示词

你的状态应该存储 原始数据,而不是格式化好的文本。在节点内部再将数据填入提示词模板。

这种分离意味着:

  • 不同的节点可以根据自身需求以不同方式格式化相同的数据
  • 你可以在不修改状态结构(state schema)的情况下更改提示模板(prompt templates)
  • 调试更加清晰——你能清楚地看到每个节点接收到的具体数据
  • 你的智能体(agent)可以持续演进,而不会破坏现有的状态

构建节点

节点就是一个接收当前状态并返回状态更新的 Python 函数。

错误处理策略

错误类型 修复者 策略 适用场景
瞬时错误 (网络、限流) 系统 重试策略 自动恢复的临时失败
LLM 可恢复错误 (解析失败) LLM 存入状态并回溯 LLM 看到错误后可调整重试
用户可修复错误 (信息缺失) 人工 使用 interrupt() 暂停 需要人工补充关键信息
非预期错误 开发者 让错误抛出 需要调试的程序缺陷

组装图

最后,我们将节点连接成一个运行的图。由于我们的节点通过 Command 对象自行处理路由决策,因此只需定义基础的起始和结束连接。

总结

构建此邮件智能体展示了 LangGraph 的思维方式:

  1. 拆解为离散步骤:每个节点只做一件事,便于调试、流式输出和断点续传。
  2. 状态即共享内存:存储原始数据,避免在状态中存储提示词或说明文字。
  3. 节点即函数:接收状态,返回更新,通过 Command 控制去向。
  4. 将错误纳入流程:不同类型的失败有不同的处理路径。
  5. 人工干预是一等公民interrupt() 可以在任何节点暂停并在几天后安全恢复。
  6. 结构自然涌现:由节点内部逻辑驱动路由,使控制流清晰且可追溯。

代码实现

from typing import TypedDict, Literal
from langgraph.graph import StateGraph, START, END
from langgraph.types import interrupt, Command, RetryPolicy
from langgraph.checkpoint.memory import MemorySaver

from langchain.messages import HumanMessage


# 定义邮件分类结构
class EmailClassification(TypedDict):
    intent: Literal["question", "bug", "billing", "feature", "complex"]
    urgency: Literal["low", "medium", "high", "critical"]
    topic: str
    summary: str


class EmailAgentState(TypedDict):
    # 原始邮件数据
    email_content: str
    sender_email: str
    email_id: str

    # 分类结果
    classification: EmailClassification | None

    # 原始搜索/API结果
    search_results: list[str] | None  # 原始文档块列表
    customer_history: dict | None  # CRM中的原始客户数据

    # 生成的内容
    draft_response: str | None
    messages: list[str] | None



# 初始化LLM
llm = init_chat_model("openai:qwen-flash",
    api_key="sk-xxxx",
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
    extra_body={"enable_thinking": False}
)



def read_email(state: EmailAgentState) -> dict:
    """提取和解析邮件内容"""
    # 在生产环境中,这将连接到你的邮件服务
    return {
        "messages": [HumanMessage(content=f"正在处理邮件:{state['email_content']}")]
    }


def classify_intent(state: EmailAgentState) -> Command[Literal["search_documentation", "human_review", "draft_response", "bug_tracking"]]:
    """使用LLM对邮件意图和紧急程度进行分类,然后相应地路由"""

    # 创建返回EmailClassification字典的结构化LLM
    structured_llm = llm.with_structured_output(EmailClassification)

    # 按需格式化提示,不存储在状态中
    classification_prompt = f"""
    分析这封客户邮件并进行分类:

    邮件内容:{state['email_content']}
    发件人:{state['sender_email']}

    提供分类,包括意图、紧急程度、主题和摘要。
    """

    # 直接以字典形式获取结构化响应
    classification = structured_llm.invoke(classification_prompt)

    # 根据分类确定下一个节点
    if classification['intent'] == 'billing' or classification['urgency'] == 'critical':
        goto = "human_review"
    elif classification['intent'] in ['question', 'feature']:
        goto = "search_documentation"
    elif classification['intent'] == 'bug':
        goto = "bug_tracking"
    else:
        goto = "draft_response"

    # 将分类作为单个字典存储在状态中
    return Command(
        update={"classification": classification},
        goto=goto
    )
    

def search_documentation(state: EmailAgentState) -> Command[Literal["draft_response"]]:
    """搜索知识库以获取相关信息"""

    # 从分类构建搜索查询
    classification = state.get('classification', {})
    query = f"{classification.get('intent', '')} {classification.get('topic', '')}"

    try:
        # 在此实现你的搜索逻辑
        # 存储原始搜索结果,而不是格式化的文本
        search_results = [
            "通过设置 > 安全 > 更改密码来重置密码",
            "密码必须至少12个字符",
            "包括大写字母、小写字母、数字和符号"
        ]
    except SearchAPIError as e:
        # 对于可恢复的搜索错误,存储错误并继续
        search_results = [f"搜索暂时不可用:{str(e)}"]

    return Command(
        update={"search_results": search_results},  # 存储原始结果或错误
        goto="draft_response"
    )


def bug_tracking(state: EmailAgentState) -> Command[Literal["draft_response"]]:
    """创建或更新错误跟踪工单"""

    # 在你的错误跟踪系统中创建工单
    ticket_id = "BUG-12345"  # 将通过API创建

    return Command(
        update={
            "search_results": [f"错误工单 {ticket_id} 已创建"],
            "current_step": "bug_tracked"
        },
        goto="draft_response"
    )
    

def draft_response(state: EmailAgentState) -> Command[Literal["human_review", "send_reply"]]:
    """使用上下文生成响应并根据质量进行路由"""

    classification = state.get('classification', {})

    # 按需从原始状态数据格式化上下文
    context_sections = []

    if state.get('search_results'):
        # 为提示格式化搜索结果
        formatted_docs = "\n".join([f"- {doc}" for doc in state['search_results']])
        context_sections.append(f"相关文档:\n{formatted_docs}")

    if state.get('customer_history'):
        # 为提示格式化客户数据
        context_sections.append(f"客户等级:{state['customer_history'].get('tier', 'standard')}")

    # 使用格式化的上下文构建提示
    draft_prompt = f"""
    为这封客户邮件起草回复:
    {state['email_content']}

    邮件意图:{classification.get('intent', 'unknown')}
    紧急程度:{classification.get('urgency', 'medium')}

    {chr(10).join(context_sections)}

    指南:
    - 保持专业和乐于助人
    - 针对他们的具体问题
    - 在相关时使用提供的文档
    """

    response = llm.invoke(draft_prompt)

    # 根据紧急程度和意图确定是否需要人工审核
    needs_review = (
        classification.get('urgency') in ['high', 'critical'] or
        classification.get('intent') == 'complex'
    )

    # 路由到适当的下一个节点
    goto = "human_review" if needs_review else "send_reply"

    return Command(
        update={"draft_response": response.content},  # 仅存储原始响应
        goto=goto
    )


def human_review(state: EmailAgentState) -> Command[Literal["send_reply", END]]:
    """使用中断暂停以进行人工审核,并根据决策进行路由"""

    classification = state.get('classification', {})

    # interrupt()必须首先调用 - 在它之前的任何代码都会在恢复时重新运行
    human_decision = interrupt({
        "email_id": state.get('email_id', ''),
        "original_email": state.get('email_content', ''),
        "draft_response": state.get('draft_response', ''),
        "urgency": classification.get('urgency'),
        "intent": classification.get('intent'),
        "action": "请审核并批准/编辑此回复"
    })

    # 现在处理人工的决策
    if human_decision.get("approved"):
        return Command(
            update={"draft_response": human_decision.get("edited_response", state.get('draft_response', ''))},
            goto="send_reply"
        )
    else:
        # 拒绝意味着人工将直接处理
        return Command(update={}, goto=END)


def send_reply(state: EmailAgentState) -> dict:
    """发送邮件回复"""
    # 与邮件服务集成
    print(f"正在发送回复:{state['draft_response'][:100]}...")
    return {}
    


# 创建工作流图
workflow = StateGraph(EmailAgentState)

# 添加具有适当错误处理的节点
workflow.add_node("read_email", read_email)
workflow.add_node("classify_intent", classify_intent)

# 为可能出现瞬态故障的节点添加重试策略
workflow.add_node(
    "search_documentation",
    search_documentation,
    retry_policy=RetryPolicy(max_attempts=3)
)
workflow.add_node("bug_tracking", bug_tracking)
workflow.add_node("draft_response", draft_response)
workflow.add_node("human_review", human_review)
workflow.add_node("send_reply", send_reply)

# 仅添加必要的边
workflow.add_edge(START, "read_email")
workflow.add_edge("read_email", "classify_intent")
workflow.add_edge("send_reply", END)

# 使用检查点器编译以实现持久性,如果使用Local_Server运行图则请不使用检查点器编译
memory = MemorySaver()
app = workflow.compile(checkpointer=memory)


# 测试紧急的账单问题
initial_state = {
    "email_content": "我的订阅被重复收费了!这很紧急!",
    "sender_email": "customer@example.com",
    "email_id": "email_123",
    "messages": []
}

# 使用thread_id运行以实现持久性
config = {"configurable": {"thread_id": "customer_123"}}
result = app.invoke(initial_state, config)
# 图将在human_review处暂停
print(f"人工审核中断:{result['__interrupt__']}")

# 准备好后,提供人工输入以恢复执行
human_response = Command(
    resume={
        "approved": True,
        "edited_response": "我们对重复收费深表歉意。我已经启动了立即退款..."
    }
)

# 恢复执行
final_result = app.invoke(human_response, config)
print("邮件发送成功!")
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐