第13节:LangGraph 平台实战指南
摘要 本文介绍了LangGraph平台及其在构建复杂AI工作流中的应用。LangGraph是一个基于LangChain生态的有状态、多参与者应用框架,支持图结构工作流设计,适用于多步骤决策、工具协作等场景。文章详细讲解了LangGraph的核心概念(节点、边、状态、图等)和部署方法,包括环境搭建、依赖安装和基础工作流示例。通过一个简单的对话工作流示例,展示了如何使用LangGraph接收用户输入、

文章目录
一、前言
在人工智能技术飞速发展的今天,大型语言模型(LLM)的应用已经渗透到各个行业领域。然而,如何有效地将多个LLM、外部工具和业务逻辑组织成可靠的工作流,仍然是开发者面临的重要挑战。LangGraph 应运而生,它提供了一个强大的框架,用于构建复杂、有状态的图结构应用程序,特别适合需要多步骤决策、工具调用和状态管理的AI应用场景。
本文旨在为初级到中级的技术人员提供一个实践指南,无论您是后端开发者、前端工程师、运维人员、AI爱好者,还是在校学生,都能通过本文学会如何使用
LangGraph 平台构建实际的AI应用。我们将从基础概念入手,逐步深入到实际部署和开发,最后通过练习题巩固所学知识。
通过阅读本文,您将掌握 LangGraph 的核心概念、工作流设计方法以及实际部署技巧,能够独立构建基于工作流的AI应用程序。让我们开始这段探索之旅,一起解锁LangGraph的强大能力。
二、LangGraph 平台
2.1 平台介绍
2.1.1 什么是LangGraph
LangGraph 是一个用于构建有状态、多参与者的应用框架,建立在 LangChain 生态系统之上。与传统的线性链式调用不同,LangGraph 允许开发者创建图结构的工作流,其中节点可以是LLM调用、工具使用或自定义函数,边定义了节点之间的流转逻辑。
LangGraph 的核心特点:
- 有状态性:工作流可以维护和更新状态,支持复杂、多轮次的交互
- 图结构:支持分支、循环、并行执行等复杂流程控制
- 可观察性:提供完整的执行跟踪和调试工具
- 多参与者:支持多个"智能体"协同完成任务
- 人类参与:可以在工作流中插入人类干预步骤
2.1.2 核心概念解析
1. 节点(Node)
节点是工作流中的基本执行单元,可以是:
- LLM调用
- 工具函数
- 自定义Python函数
- 条件判断
2. 边(Edge)
边定义了节点之间的流转关系,可以是:
- 无条件流转
- 基于条件的流转
- 并行流转
3. 状态(State)
状态是工作流中共享的数据结构,通常是一个字典,包含所有节点需要访问和修改的数据。
4. 图(Graph)
图是节点和边的集合,定义了完整的工作流结构。
5. 检查点(Checkpoint)
LangGraph 支持检查点机制,允许工作流在任意点暂停、恢复,这对于长时运行任务至关重要。
2.1.3 应用场景
LangGraph 适用于多种复杂AI应用场景:
- 复杂决策系统:需要多步骤推理和决策的应用
- 多工具协作:需要调用多个外部工具或API的应用
- 对话系统:复杂的多轮对话管理
- 工作流自动化:业务流程的自动化处理
- 数据分析流水线:多步骤的数据处理和分析流程
2.1.4 与其他框架对比
与其他工作流框架相比,LangGraph 的主要优势在于:
| 特性 | LangGraph | LangChain | AutoGen |
|---|---|---|---|
| 状态管理 | 内置强大状态管理 | 基础状态管理 | 有限状态管理 |
| 循环支持 | 原生支持 | 需要手动实现 | 有限支持 |
| 检查点 | 内置支持 | 不支持 | 不支持 |
| 复杂性 | 适合复杂工作流 | 适合简单链 | 适合多智能体 |
2.2 部署 LangGraph 项目
2.2.1 环境搭建
1. 基础环境要求
首先,确保您的系统满足以下要求:
- Python 3.8+
- pip 包管理器
- 虚拟环境(推荐)
2. 创建虚拟环境
# 创建项目目录
mkdir langgraph-project
cd langgraph-project
# 创建虚拟环境
python -m venv venv
# 激活虚拟环境
# Windows
venv\Scripts\activate
# Linux/Mac
source venv/bin/activate
3. 安装依赖包
创建 requirements.txt 文件:
langgraph==0.0.1
langchain==0.1.0
openai==1.3.0
python-dotenv==1.0.0
fastapi==0.104.1
uvicorn==0.24.0
pydantic==2.5.0
安装依赖:
pip install -r requirements.txt
4. 环境变量配置
创建 .env 文件:
# OpenAI API 配置
OPENAI_API_KEY=your_api_key_here
# 其他配置
APP_ENV=development
APP_PORT=8000
创建 config.py 配置文件:
import os
from dotenv import load_dotenv
from pydantic_settings import BaseSettings
# 加载环境变量
load_dotenv()
class Settings(BaseSettings):
"""应用配置类"""
# OpenAI 配置
openai_api_key: str = os.getenv("OPENAI_API_KEY", "")
# 应用配置
app_env: str = os.getenv("APP_ENV", "development")
app_port: int = int(os.getenv("APP_PORT", 8000))
# LangGraph 配置
max_iterations: int = 10
debug_mode: bool = os.getenv("APP_ENV") == "development"
class Config:
env_file = ".env"
# 创建全局配置实例
settings = Settings()
2.2.2 第一个 LangGraph 应用
让我们创建一个简单的对话工作流,展示 LangGraph 的基本用法。
1. 基础工作流示例
创建 basic_workflow.py:
"""
基础 LangGraph 工作流示例
演示如何创建一个简单的对话流程
"""
from typing import TypedDict, Annotated, List
import operator
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage
from config import settings
# 定义状态结构
class ConversationState(TypedDict):
"""对话状态定义"""
messages: Annotated[List, "对话消息历史"]
query: str = "用户查询"
response: str = "AI回复"
def create_basic_workflow():
"""
创建基础对话工作流
返回:
app: 编译后的工作流应用
"""
# 1. 初始化LLM
llm = ChatOpenAI(
model="gpt-3.5-turbo",
temperature=0.7,
api_key=settings.openai_api_key
)
# 2. 定义节点函数
def receive_input(state: ConversationState) -> dict:
"""
接收用户输入节点
参数:
state: 当前状态
返回:
更新后的状态
"""
print(f"收到用户查询: {state['query']}")
# 将用户消息添加到历史
messages = state.get("messages", [])
messages.append(HumanMessage(content=state["query"]))
return {"messages": messages}
def generate_response(state: ConversationState) -> dict:
"""
生成AI回复节点
参数:
state: 当前状态
返回:
更新后的状态
"""
messages = state["messages"]
# 调用LLM生成回复
response = llm.invoke(messages)
# 将AI回复添加到历史
messages.append(AIMessage(content=response.content))
print(f"生成AI回复: {response.content[:50]}...")
return {
"messages": messages,
"response": response.content
}
def format_output(state: ConversationState) -> dict:
"""
格式化输出节点
参数:
state: 当前状态
返回:
格式化后的状态
"""
response = state["response"]
# 简单的格式化处理
formatted_response = f"🤖 AI回复: {response}"
print(f"格式化后的输出: {formatted_response[:50]}...")
return {"response": formatted_response}
# 3. 创建工作流图
workflow = StateGraph(ConversationState)
# 4. 添加节点
workflow.add_node("receive_input", receive_input)
workflow.add_node("generate_response", generate_response)
workflow.add_node("format_output", format_output)
# 5. 设置入口点
workflow.set_entry_point("receive_input")
# 6. 添加边
workflow.add_edge("receive_input", "generate_response")
workflow.add_edge("generate_response", "format_output")
workflow.add_edge("format_output", END)
# 7. 编译工作流
app = workflow.compile()
return app
def run_basic_workflow():
"""运行基础工作流示例"""
print("=" * 50)
print("开始运行基础工作流示例")
print("=" * 50)
# 创建工作流
app = create_basic_workflow()
# 初始状态
initial_state = {
"messages": [],
"query": "请介绍一下人工智能的发展历史",
"response": ""
}
# 执行工作流
print("\n执行工作流...")
result = app.invoke(initial_state)
print("\n" + "=" * 50)
print("工作流执行完成")
print("=" * 50)
print(f"\n最终回复: {result['response']}")
return result
if __name__ == "__main__":
# 运行示例
run_basic_workflow()
2. 带条件分支的工作流
创建 conditional_workflow.py:
"""
带条件分支的 LangGraph 工作流示例
演示如何根据条件选择不同的执行路径
"""
from typing import TypedDict, Annotated, Literal
import operator
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage
from config import settings
# 定义状态结构
class ConditionalState(TypedDict):
"""条件工作流状态定义"""
query: str
query_type: Literal["simple", "complex", "unknown"]
response: str
steps: Annotated[list, "执行步骤记录"]
def create_conditional_workflow():
"""
创建条件分支工作流
返回:
app: 编译后的工作流应用
"""
# 初始化LLM
llm = ChatOpenAI(
model="gpt-3.5-turbo",
temperature=0.7,
api_key=settings.openai_api_key
)
# 定义节点函数
def analyze_query(state: ConditionalState) -> dict:
"""
分析查询类型节点
参数:
state: 当前状态
返回:
更新后的状态
"""
query = state["query"]
steps = state.get("steps", [])
# 简单的查询类型分析
if len(query) < 20:
query_type = "simple"
elif "计算" in query or "分析" in query:
query_type = "complex"
else:
query_type = "unknown"
steps.append(f"分析查询: '{query}' -> 类型: {query_type}")
print(f"查询分析结果: {query_type}")
return {
"query_type": query_type,
"steps": steps
}
def handle_simple_query(state: ConditionalState) -> dict:
"""
处理简单查询节点
参数:
state: 当前状态
返回:
更新后的状态
"""
query = state["query"]
steps = state.get("steps", [])
# 简单查询直接回复
messages = [HumanMessage(content=f"请简要回答: {query}")]
response = llm.invoke(messages)
steps.append(f"处理简单查询: 直接生成回复")
return {
"response": response.content,
"steps": steps
}
def handle_complex_query(state: ConditionalState) -> dict:
"""
处理复杂查询节点
参数:
state: 当前状态
返回:
更新后的状态
"""
query = state["query"]
steps = state.get("steps", [])
# 复杂查询分步处理
steps.append(f"处理复杂查询: 分步分析")
# 第一步:分析问题
analysis_prompt = f"请分析这个问题需要哪些步骤来解决: {query}"
analysis = llm.invoke([HumanMessage(content=analysis_prompt)])
# 第二步:生成详细回答
answer_prompt = f"根据以下分析,给出详细回答: {analysis.content}"
answer = llm.invoke([HumanMessage(content=answer_prompt)])
steps.append(f"分析步骤: {analysis.content[:50]}...")
return {
"response": answer.content,
"steps": steps
}
def handle_unknown_query(state: ConditionalState) -> dict:
"""
处理未知类型查询节点
参数:
state: 当前状态
返回:
更新后的状态
"""
query = state["query"]
steps = state.get("steps", [])
steps.append(f"处理未知查询: 请求澄清")
return {
"response": f"我不确定如何处理这个查询: '{query}'。请提供更多细节或重新表述您的问题。",
"steps": steps
}
def route_query(state: ConditionalState) -> str:
"""
路由函数:根据查询类型选择下一个节点
参数:
state: 当前状态
返回:
下一个节点的名称
"""
query_type = state["query_type"]
routes = {
"simple": "handle_simple_query",
"complex": "handle_complex_query",
"unknown": "handle_unknown_query"
}
next_node = routes.get(query_type, "handle_unknown_query")
print(f"路由决策: {query_type} -> {next_node}")
return next_node
# 创建工作流图
workflow = StateGraph(ConditionalState)
# 添加节点
workflow.add_node("analyze_query", analyze_query)
workflow.add_node("handle_simple_query", handle_simple_query)
workflow.add_node("handle_complex_query", handle_complex_query)
workflow.add_node("handle_unknown_query", handle_unknown_query)
# 设置入口点
workflow.set_entry_point("analyze_query")
# 添加条件边
workflow.add_conditional_edges(
"analyze_query",
route_query,
{
"handle_simple_query": "handle_simple_query",
"handle_complex_query": "handle_complex_query",
"handle_unknown_query": "handle_unknown_query"
}
)
# 添加普通边
workflow.add_edge("handle_simple_query", END)
workflow.add_edge("handle_complex_query", END)
workflow.add_edge("handle_unknown_query", END)
# 编译工作流
app = workflow.compile()
return app
def run_conditional_workflow():
"""运行条件工作流示例"""
print("=" * 50)
print("开始运行条件分支工作流示例")
print("=" * 50)
# 创建工作流
app = create_conditional_workflow()
# 测试不同查询
test_queries = [
("你好", "simple"),
("请分析人工智能在医疗领域的应用前景,并给出详细的市场分析报告", "complex"),
("@#$%^&*", "unknown")
]
for query, expected_type in test_queries:
print(f"\n{'='*30}")
print(f"测试查询: {query}")
print(f"期望类型: {expected_type}")
print(f"{'='*30}")
# 初始状态
initial_state = {
"query": query,
"query_type": "unknown",
"response": "",
"steps": []
}
# 执行工作流
result = app.invoke(initial_state)
print(f"\n查询类型: {result['query_type']}")
print(f"执行步骤: {len(result['steps'])} 步")
for i, step in enumerate(result['steps'], 1):
print(f" 步骤{i}: {step}")
print(f"回复: {result['response'][:100]}...")
return True
if __name__ == "__main__":
# 运行示例
run_conditional_workflow()
2.2.3 工具调用工作流
创建 tool_workflow.py:
"""
工具调用 LangGraph 工作流示例
演示如何在工作流中调用外部工具
"""
from typing import TypedDict, Annotated, List, Optional
from datetime import datetime
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain.tools import Tool
from langchain.agents import create_react_agent, AgentExecutor
from langchain_core.prompts import PromptTemplate
from config import settings
# 定义状态结构
class ToolState(TypedDict):
"""工具工作流状态定义"""
query: str
tools_used: Annotated[List[str], "已使用的工具列表"]
intermediate_results: Annotated[List[str], "中间结果"]
final_answer: str
error: Optional[str]
def create_tool_workflow():
"""
创建工具调用工作流
返回:
app: 编译后的工作流应用
"""
# 初始化LLM
llm = ChatOpenAI(
model="gpt-3.5-turbo",
temperature=0.7,
api_key=settings.openai_api_key
)
# 定义自定义工具
def get_current_time(*args, **kwargs) -> str:
"""获取当前时间工具"""
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"[工具调用] get_current_time: {current_time}")
return f"当前时间是: {current_time}"
def calculate(expression: str) -> str:
"""计算器工具(简化版)"""
print(f"[工具调用] calculate: {expression}")
try:
# 安全评估表达式
allowed_chars = set("0123456789+-*/(). ")
if not all(c in allowed_chars for c in expression):
return "错误: 表达式包含非法字符"
# 使用eval计算(注意:生产环境需要更安全的方法)
result = eval(expression)
return f"{expression} = {result}"
except Exception as e:
return f"计算错误: {str(e)}"
def search_knowledge(query: str) -> str:
"""知识搜索工具(模拟)"""
print(f"[工具调用] search_knowledge: {query}")
# 模拟知识库
knowledge_base = {
"python": "Python是一种高级编程语言,以其简洁易读的语法而闻名。",
"人工智能": "人工智能是计算机科学的一个分支,致力于创建智能机器。",
"机器学习": "机器学习是人工智能的子领域,使计算机能够从数据中学习。",
"LangGraph": "LangGraph是用于构建有状态、多参与者应用的框架。"
}
query_lower = query.lower()
for key, value in knowledge_base.items():
if key in query_lower:
return value
return f"未找到关于'{query}'的详细信息。"
# 创建工具列表
tools = [
Tool(
name="get_current_time",
func=get_current_time,
description="获取当前日期和时间"
),
Tool(
name="calculate",
func=calculate,
description="计算数学表达式,例如:calculate('2 + 3 * 4')"
),
Tool(
name="search_knowledge",
func=search_knowledge,
description="搜索知识库获取信息"
)
]
# 定义节点函数
def analyze_and_plan(state: ToolState) -> dict:
"""
分析查询并制定计划节点
参数:
state: 当前状态
返回:
更新后的状态
"""
query = state["query"]
prompt = f"""
用户查询: {query}
请分析这个查询是否需要使用工具,以及需要使用哪些工具。
可用工具:
1. get_current_time - 获取当前时间
2. calculate - 计算数学表达式
3. search_knowledge - 搜索知识库
如果不需要工具,直接回复"无需工具"。
如果需要工具,列出需要的工具名称。
"""
analysis = llm.invoke([HumanMessage(content=prompt)])
print(f"[分析结果] {analysis.content}")
return {
"intermediate_results": [f"分析: {analysis.content}"],
"tools_used": []
}
def execute_tools(state: ToolState) -> dict:
"""
执行工具节点
参数:
state: 当前状态
返回:
更新后的状态
"""
query = state["query"]
intermediate_results = state.get("intermediate_results", [])
tools_used = state.get("tools_used", [])
# 根据查询决定使用哪个工具
tool_results = []
if "时间" in query or "几点" in query:
result = tools[0].func()
tool_results.append(f"时间查询: {result}")
tools_used.append("get_current_time")
if "计算" in query or any(op in query for op in ["+", "-", "*", "/"]):
# 提取数学表达式
import re
numbers = re.findall(r"\d+", query)
if numbers:
# 简单示例:计算所有数字的和
numbers = [int(n) for n in numbers]
expression = "+".join(map(str, numbers))
result = tools[1].func(expression)
tool_results.append(f"计算: {result}")
tools_used.append("calculate")
if "是什么" in query or "介绍" in query or "知识" in query:
# 提取关键词
keywords = ["Python", "人工智能", "机器学习", "LangGraph"]
for keyword in keywords:
if keyword in query:
result = tools[2].func(keyword)
tool_results.append(f"知识搜索({keyword}): {result}")
tools_used.append("search_knowledge")
break
intermediate_results.extend(tool_results)
print(f"[工具执行] 使用了 {len(tool_results)} 个工具")
return {
"intermediate_results": intermediate_results,
"tools_used": tools_used
}
def generate_final_answer(state: ToolState) -> dict:
"""
生成最终答案节点
参数:
state: 当前状态
返回:
更新后的状态
"""
query = state["query"]
intermediate_results = state.get("intermediate_results", [])
tools_used = state.get("tools_used", [])
# 构建提示
prompt_parts = [f"用户查询: {query}"]
if intermediate_results:
prompt_parts.append("工具执行结果:")
prompt_parts.extend(intermediate_results)
prompt_parts.append("请根据以上信息,生成完整、友好的回答。")
prompt = "\n".join(prompt_parts)
response = llm.invoke([HumanMessage(content=prompt)])
print(f"[生成回答] 使用了 {len(tools_used)} 个工具")
return {
"final_answer": response.content,
"intermediate_results": intermediate_results
}
def handle_error(state: ToolState) -> dict:
"""
错误处理节点
参数:
state: 当前状态
返回:
更新后的状态
"""
error = state.get("error", "未知错误")
return {
"final_answer": f"抱歉,处理过程中出现错误: {error}。请稍后重试或联系支持。",
"error": error
}
# 创建工作流图
workflow = StateGraph(ToolState)
# 添加节点
workflow.add_node("analyze_and_plan", analyze_and_plan)
workflow.add_node("execute_tools", execute_tools)
workflow.add_node("generate_final_answer", generate_final_answer)
workflow.add_node("handle_error", handle_error)
# 设置入口点
workflow.set_entry_point("analyze_and_plan")
# 添加边
workflow.add_edge("analyze_and_plan", "execute_tools")
workflow.add_edge("execute_tools", "generate_final_answer")
workflow.add_edge("generate_final_answer", END)
# 添加错误处理边
workflow.add_edge("handle_error", END)
# 编译工作流
app = workflow.compile()
return app
def run_tool_workflow():
"""运行工具工作流示例"""
print("=" * 50)
print("开始运行工具调用工作流示例")
print("=" * 50)
# 创建工作流
app = create_tool_workflow()
# 测试查询
test_queries = [
"现在几点了?",
"计算一下 25 加上 38 等于多少",
"请介绍一下Python语言",
"今天天气怎么样?"
]
for query in test_queries:
print(f"\n{'='*30}")
print(f"测试查询: {query}")
print(f"{'='*30}")
# 初始状态
initial_state = {
"query": query,
"tools_used": [],
"intermediate_results": [],
"final_answer": "",
"error": None
}
try:
# 执行工作流
result = app.invoke(initial_state)
print(f"\n使用的工具: {result['tools_used']}")
print(f"中间结果: {len(result['intermediate_results'])} 个")
for i, res in enumerate(result['intermediate_results'], 1):
print(f" 结果{i}: {res}")
print(f"最终回答: {result['final_answer'][:150]}...")
except Exception as e:
error_state = {"error": str(e)}
result = app.invoke({**initial_state, **error_state})
print(f"错误处理结果: {result['final_answer']}")
return True
if __name__ == "__main__":
# 运行示例
run_tool_workflow()
2.2.4 多智能体协作工作流
创建 multi_agent_workflow.py:
"""
多智能体协作 LangGraph 工作流示例
演示多个智能体如何协同工作完成任务
"""
from typing import TypedDict, Annotated, List
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from config import settings
# 定义状态结构
class MultiAgentState(TypedDict):
"""多智能体工作流状态定义"""
task: str
specialist_analysis: str
reviewer_feedback: str
final_output: str
status: Annotated[str, "任务状态"]
def create_multi_agent_workflow():
"""
创建多智能体协作工作流
返回:
app: 编译后的工作流应用
"""
# 初始化不同角色的LLM
specialist_llm = ChatOpenAI(
model="gpt-3.5-turbo",
temperature=0.8, # 更高的创造力
api_key=settings.openai_api_key
)
reviewer_llm = ChatOpenAI(
model="gpt-3.5-turbo",
temperature=0.3, # 更保守
api_key=settings.openai_api_key
)
coordinator_llm = ChatOpenAI(
model="gpt-3.5-turbo",
temperature=0.5, # 中等创造力
api_key=settings.openai_api_key
)
# 定义节点函数
def task_analyzer(state: MultiAgentState) -> dict:
"""
任务分析节点
参数:
state: 当前状态
返回:
更新后的状态
"""
task = state["task"]
system_prompt = """你是一个任务分析专家。你的职责是:
1. 分析任务的复杂性和要求
2. 确定完成任务需要的步骤
3. 预估可能遇到的挑战
4. 提出初步解决方案
请提供详细的分析报告。"""
messages = [
SystemMessage(content=system_prompt),
HumanMessage(content=f"请分析以下任务: {task}")
]
analysis = specialist_llm.invoke(messages)
print(f"[任务分析] 完成分析,长度: {len(analysis.content)} 字符")
return {
"specialist_analysis": analysis.content,
"status": "已分析"
}
def quality_reviewer(state: MultiAgentState) -> dict:
"""
质量审查节点
参数:
state: 当前状态
返回:
更新后的状态
"""
analysis = state["specialist_analysis"]
task = state["task"]
system_prompt = """你是一个质量审查专家。你的职责是:
1. 审查分析报告的质量
2. 指出可能的问题或遗漏
3. 提出改进建议
4. 评估报告的完整性和准确性
请提供详细的审查反馈。"""
messages = [
SystemMessage(content=system_prompt),
HumanMessage(content=f"任务: {task}\n\n分析报告: {analysis}\n\n请审查这份分析报告。")
]
feedback = reviewer_llm.invoke(messages)
print(f"[质量审查] 完成审查,反馈长度: {len(feedback.content)} 字符")
return {
"reviewer_feedback": feedback.content,
"status": "已审查"
}
def solution_integrator(state: MultiAgentState) -> dict:
"""
解决方案整合节点
参数:
state: 当前状态
返回:
更新后的状态
"""
task = state["task"]
analysis = state["specialist_analysis"]
feedback = state["reviewer_feedback"]
system_prompt = """你是一个解决方案整合专家。你的职责是:
1. 综合考虑分析报告和审查反馈
2. 整合最佳的解决方案
3. 生成最终的输出
4. 确保方案的完整性和可行性
请基于分析和反馈,生成最终的解决方案。"""
messages = [
SystemMessage(content=system_prompt),
HumanMessage(content=f"任务: {task}\n\n分析报告: {analysis}\n\n审查反馈: {feedback}\n\n请生成最终的解决方案。")
]
final_output = coordinator_llm.invoke(messages)
print(f"[解决方案整合] 生成最终方案,长度: {len(final_output.content)} 字符")
return {
"final_output": final_output.content,
"status": "已完成"
}
def quality_check(state: MultiAgentState) -> str:
"""
质量检查决策节点
参数:
state: 当前状态
返回:
下一个节点的名称
"""
feedback = state["reviewer_feedback"]
# 简单的质量检查逻辑
if "需要修改" in feedback or "不足" in feedback or "缺少" in feedback:
print("[质量检查] 需要重新分析")
return "redo_analysis"
else:
print("[质量检查] 质量合格,继续整合")
return "proceed_integration"
def redo_analysis(state: MultiAgentState) -> dict:
"""
重新分析节点
参数:
state: 当前状态
返回:
更新后的状态
"""
task = state["task"]
feedback = state["reviewer_feedback"]
system_prompt = """你是一个任务分析专家。根据审查反馈,请重新分析任务并改进分析报告。"""
messages = [
SystemMessage(content=system_prompt),
HumanMessage(content=f"任务: {task}\n\n审查反馈: {feedback}\n\n请根据反馈重新分析任务。")
]
revised_analysis = specialist_llm.invoke(messages)
print(f"[重新分析] 完成修订,长度: {len(revised_analysis.content)} 字符")
return {
"specialist_analysis": revised_analysis.content,
"status": "已修订"
}
# 创建工作流图
workflow = StateGraph(MultiAgentState)
# 添加节点
workflow.add_node("task_analyzer", task_analyzer)
workflow.add_node("quality_reviewer", quality_reviewer)
workflow.add_node("solution_integrator", solution_integrator)
workflow.add_node("redo_analysis", redo_analysis)
# 设置入口点
workflow.set_entry_point("task_analyzer")
# 添加边
workflow.add_edge("task_analyzer", "quality_reviewer")
# 添加条件边
workflow.add_conditional_edges(
"quality_reviewer",
quality_check,
{
"redo_analysis": "redo_analysis",
"proceed_integration": "solution_integrator"
}
)
# 重新分析后返回审查
workflow.add_edge("redo_analysis", "quality_reviewer")
# 整合后结束
workflow.add_edge("solution_integrator", END)
# 编译工作流
app = workflow.compile()
return app
def run_multi_agent_workflow():
"""运行多智能体工作流示例"""
print("=" * 50)
print("开始运行多智能体协作工作流示例")
print("=" * 50)
# 创建工作流
app = create_multi_agent_workflow()
# 测试任务
test_tasks = [
"设计一个简单的电商网站的用户注册和登录系统",
"制定一个为期一个月的Python学习计划",
"分析人工智能在金融风控中的应用场景"
]
for i, task in enumerate(test_tasks, 1):
print(f"\n{'='*30}")
print(f"测试任务 {i}: {task}")
print(f"{'='*30}")
# 初始状态
initial_state = {
"task": task,
"specialist_analysis": "",
"reviewer_feedback": "",
"final_output": "",
"status": "开始"
}
# 执行工作流
result = app.invoke(initial_state)
print(f"\n任务状态: {result['status']}")
print(f"分析报告摘要: {result['specialist_analysis'][:100]}...")
print(f"审查反馈摘要: {result['reviewer_feedback'][:100]}...")
print(f"最终输出摘要: {result['final_output'][:150]}...")
# 显示执行步骤
print(f"\n执行步骤统计:")
print(f" 1. 任务分析: 完成")
print(f" 2. 质量审查: 完成")
if "已修订" in result['status']:
print(f" 3. 重新分析: 完成")
print(f" 4. 方案整合: 完成")
return True
if __name__ == "__main__":
# 运行示例
run_multi_agent_workflow()
2.2.5 API 服务部署
创建 api_service.py:
"""
LangGraph API 服务
将工作流部署为 RESTful API 服务
"""
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional, List, Dict, Any
import uvicorn
from basic_workflow import create_basic_workflow
from conditional_workflow import create_conditional_workflow
from tool_workflow import create_tool_workflow
from multi_agent_workflow import create_multi_agent_workflow
from config import settings
# 创建 FastAPI 应用
app = FastAPI(
title="LangGraph API 服务",
description="提供多种 LangGraph 工作流的 API 接口",
version="1.0.0"
)
# 预加载工作流
workflows = {}
class WorkflowRequest(BaseModel):
"""工作流请求模型"""
input_data: Dict[str, Any]
workflow_type: str = "basic"
max_iterations: Optional[int] = None
class WorkflowResponse(BaseModel):
"""工作流响应模型"""
success: bool
result: Optional[Dict[str, Any]] = None
error: Optional[str] = None
execution_time: float
workflow_type: str
@app.on_event("startup")
async def startup_event():
"""启动时初始化工作流"""
print("初始化工作流...")
try:
workflows["basic"] = create_basic_workflow()
workflows["conditional"] = create_conditional_workflow()
workflows["tool"] = create_tool_workflow()
workflows["multi_agent"] = create_multi_agent_workflow()
print(f"工作流初始化完成: {list(workflows.keys())}")
except Exception as e:
print(f"工作流初始化失败: {e}")
raise
@app.get("/")
async def root():
"""根端点"""
return {
"service": "LangGraph API 服务",
"version": "1.0.0",
"available_workflows": list(workflows.keys()),
"endpoints": [
"GET / - 本页面",
"GET /health - 健康检查",
"GET /workflows - 可用工作流",
"POST /execute - 执行工作流"
]
}
@app.get("/health")
async def health_check():
"""健康检查端点"""
return {
"status": "healthy",
"workflows_loaded": len(workflows),
"service": "running"
}
@app.get("/workflows")
async def list_workflows():
"""列出所有可用的工作流"""
workflow_info = {}
for name, workflow in workflows.items():
workflow_info[name] = {
"nodes": list(workflow.nodes.keys()),
"edges": len(workflow.edges),
"has_entry_point": workflow.entry_point is not None
}
return {
"workflows": workflow_info,
"count": len(workflows)
}
@app.post("/execute", response_model=WorkflowResponse)
async def execute_workflow(request: WorkflowRequest):
"""
执行工作流
参数:
request: 工作流请求
返回:
执行结果
"""
import time
start_time = time.time()
# 检查工作流类型
if request.workflow_type not in workflows:
raise HTTPException(
status_code=400,
detail=f"未知的工作流类型: {request.workflow_type}。可用类型: {list(workflows.keys())}"
)
try:
# 获取工作流
workflow = workflows[request.workflow_type]
# 设置最大迭代次数
config = {}
if request.max_iterations:
config["recursion_limit"] = request.max_iterations
# 执行工作流
result = workflow.invoke(request.input_data, config=config)
execution_time = time.time() - start_time
return WorkflowResponse(
success=True,
result=result,
execution_time=execution_time,
workflow_type=request.workflow_type
)
except Exception as e:
execution_time = time.time() - start_time
return WorkflowResponse(
success=False,
error=str(e),
execution_time=execution_time,
workflow_type=request.workflow_type
)
@app.post("/execute/basic")
async def execute_basic_workflow(query: str):
"""执行基础工作流(简化接口)"""
workflow = workflows.get("basic")
if not workflow:
raise HTTPException(status_code=500, detail="基础工作流未加载")
try:
result = workflow.invoke({
"messages": [],
"query": query,
"response": ""
})
return {
"success": True,
"query": query,
"response": result.get("response", ""),
"message_count": len(result.get("messages", []))
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"执行失败: {str(e)}")
@app.post("/execute/conditional")
async def execute_conditional_workflow(query: str):
"""执行条件工作流(简化接口)"""
workflow = workflows.get("conditional")
if not workflow:
raise HTTPException(status_code=500, detail="条件工作流未加载")
try:
result = workflow.invoke({
"query": query,
"query_type": "unknown",
"response": "",
"steps": []
})
return {
"success": True,
"query": query,
"query_type": result.get("query_type", "unknown"),
"response": result.get("response", ""),
"steps": result.get("steps", [])
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"执行失败: {str(e)}")
@app.get("/workflow/{workflow_name}/graph")
async def get_workflow_graph(workflow_name: str):
"""获取工作流图结构"""
if workflow_name not in workflows:
raise HTTPException(
status_code=404,
detail=f"工作流不存在: {workflow_name}"
)
workflow = workflows[workflow_name]
# 构建图结构信息
graph_info = {
"name": workflow_name,
"nodes": list(workflow.nodes.keys()),
"edges": [],
"entry_point": workflow.entry_point,
"graph_structure": str(workflow.graph)
}
return graph_info
if __name__ == "__main__":
# 启动API服务
print(f"启动 LangGraph API 服务,端口: {settings.app_port}")
print(f"环境: {settings.app_env}")
print(f"OpenAI API 密钥: {'已设置' if settings.openai_api_key else '未设置'}")
uvicorn.run(
"api_service:app",
host="0.0.0.0",
port=settings.app_port,
reload=settings.debug_mode
)
2.2.6 部署脚本
创建 deploy.py:
"""
LangGraph 项目部署脚本
提供一键部署和配置功能
"""
import os
import sys
import subprocess
import argparse
from pathlib import Path
def check_environment():
"""检查环境配置"""
print("=" * 50)
print("检查环境配置")
print("=" * 50)
# 检查Python版本
python_version = sys.version_info
if python_version.major < 3 or (python_version.major == 3 and python_version.minor < 8):
print(f"✗ Python 版本不满足要求: {python_version.major}.{python_version.minor}")
print(" 需要 Python 3.8 或更高版本")
return False
print(f"✓ Python 版本: {python_version.major}.{python_version.minor}.{python_version.micro}")
# 检查虚拟环境
if os.getenv("VIRTUAL_ENV"):
print(f"✓ 虚拟环境: {os.path.basename(os.getenv('VIRTUAL_ENV'))}")
else:
print("⚠ 警告: 未检测到虚拟环境,建议在虚拟环境中运行")
# 检查依赖
try:
import langgraph
import langchain
import openai
print("✓ 核心依赖已安装")
except ImportError as e:
print(f"✗ 依赖未安装: {e}")
return False
# 检查环境变量
env_file = Path(".env")
if env_file.exists():
print("✓ 找到 .env 文件")
# 检查必要的环境变量
with open(env_file, 'r') as f:
content = f.read()
if "OPENAI_API_KEY" in content:
print("✓ OpenAI API 密钥配置存在")
else:
print("⚠ 警告: OpenAI API 密钥未配置")
else:
print("⚠ 警告: 未找到 .env 文件,创建示例配置...")
create_example_env()
return True
def create_example_env():
"""创建示例环境变量文件"""
example_env = """# OpenAI API 配置
OPENAI_API_KEY=your_api_key_here
# 应用配置
APP_ENV=development
APP_PORT=8000
# 日志配置
LOG_LEVEL=INFO
LOG_FILE=langgraph.log
# 工作流配置
MAX_ITERATIONS=20
TIMEOUT_SECONDS=30
"""
with open(".env.example", "w") as f:
f.write(example_env)
print(" 已创建 .env.example 文件,请复制为 .env 并填写配置")
def install_dependencies():
"""安装依赖"""
print("\n" + "=" * 50)
print("安装依赖")
print("=" * 50)
requirements_file = "requirements.txt"
if not os.path.exists(requirements_file):
print(f"✗ 未找到 {requirements_file} 文件")
return False
try:
subprocess.check_call([sys.executable, "-m", "pip", "install", "-r", requirements_file])
print("✓ 依赖安装完成")
return True
except subprocess.CalledProcessError as e:
print(f"✗ 依赖安装失败: {e}")
return False
def run_tests():
"""运行测试"""
print("\n" + "=" * 50)
print("运行测试")
print("=" * 50)
test_files = [
"basic_workflow.py",
"conditional_workflow.py",
"tool_workflow.py",
"multi_agent_workflow.py"
]
all_passed = True
for test_file in test_files:
if os.path.exists(test_file):
print(f"运行测试: {test_file}")
try:
# 动态导入并运行测试
module_name = test_file[:-3]
module = __import__(module_name)
if hasattr(module, f"run_{module_name}"):
test_func = getattr(module, f"run_{module_name}")
test_func()
print(f" ✓ {test_file} 测试通过")
else:
print(f" ⚠ {test_file} 无测试函数")
except Exception as e:
print(f" ✗ {test_file} 测试失败: {e}")
all_passed = False
else:
print(f" ⚠ {test_file} 不存在")
return all_passed
def start_service():
"""启动API服务"""
print("\n" + "=" * 50)
print("启动API服务")
print("=" * 50)
api_file = "api_service.py"
if not os.path.exists(api_file):
print(f"✗ 未找到 {api_file} 文件")
return False
try:
print("服务正在启动...")
print(f"访问地址: http://localhost:8000")
print(f"API文档: http://localhost:8000/docs")
print("按 Ctrl+C 停止服务")
subprocess.call([sys.executable, api_file])
return True
except KeyboardInterrupt:
print("\n服务已停止")
return True
except Exception as e:
print(f"✗ 服务启动失败: {e}")
return False
def deploy_all():
"""一键部署"""
print("开始一键部署 LangGraph 项目")
print("=" * 50)
steps = [
("检查环境", check_environment),
("安装依赖", install_dependencies),
("运行测试", run_tests),
("启动服务", start_service)
]
for step_name, step_func in steps:
print(f"\n步骤: {step_name}")
print("-" * 30)
if not step_func():
print(f"\n✗ 部署失败在步骤: {step_name}")
return False
print("\n" + "=" * 50)
print("✓ 部署完成!")
print("=" * 50)
return True
def main():
"""主函数"""
parser = argparse.ArgumentParser(description="LangGraph 项目部署工具")
parser.add_argument("action", nargs="?", default="deploy",
choices=["deploy", "check", "install", "test", "run"],
help="部署动作 (默认: deploy)")
args = parser.parse_args()
if args.action == "deploy":
deploy_all()
elif args.action == "check":
check_environment()
elif args.action == "install":
install_dependencies()
elif args.action == "test":
run_tests()
elif args.action == "run":
start_service()
if __name__ == "__main__":
main()
2.2.7 Docker 部署
创建 Dockerfile:
# 使用官方 Python 镜像
FROM python:3.9-slim
# 设置工作目录
WORKDIR /app
# 设置环境变量
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
ENV PORT=8000
# 安装系统依赖
RUN apt-get update && apt-get install -y \
gcc \
g++ \
&& rm -rf /var/lib/apt/lists/*
# 复制依赖文件
COPY requirements.txt .
# 安装 Python 依赖
RUN pip install --no-cache-dir -r requirements.txt
# 复制应用代码
COPY . .
# 创建非 root 用户
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser
# 暴露端口
EXPOSE $PORT
# 启动命令
CMD ["python", "api_service.py"]
创建 docker-compose.yml:
version: '3.8'
services:
langgraph-api:
build: .
ports:
- "8000:8000"
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY:-your_api_key_here}
- APP_ENV=${APP_ENV:-production}
- LOG_LEVEL=${LOG_LEVEL:-INFO}
volumes:
- ./logs:/app/logs
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
interval: 30s
timeout: 10s
retries: 3
networks:
- langgraph-network
# 可选: 添加 Redis 用于状态持久化
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis-data:/data
command: redis-server --appendonly yes
networks:
- langgraph-network
# 可选: 添加监控面板
monitoring:
image: grafana/grafana:latest
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=admin
volumes:
- grafana-data:/var/lib/grafana
networks:
- langgraph-network
depends_on:
- langgraph-api
networks:
langgraph-network:
driver: bridge
volumes:
redis-data:
grafana-data:
创建部署脚本 deploy.sh:
#!/bin/bash
# LangGraph 项目部署脚本
set -e # 遇到错误退出
echo "开始部署 LangGraph 项目..."
# 检查 Docker
if ! command -v docker &> /dev/null; then
echo "错误: Docker 未安装"
exit 1
fi
# 检查 Docker Compose
if ! command -v docker-compose &> /dev/null; then
echo "错误: Docker Compose 未安装"
exit 1
fi
# 创建环境变量文件
if [ ! -f .env ]; then
echo "创建 .env 文件..."
cp .env.example .env
echo "请编辑 .env 文件设置您的配置"
exit 1
fi
# 加载环境变量
source .env
# 检查必要的环境变量
if [ -z "$OPENAI_API_KEY" ] || [ "$OPENAI_API_KEY" = "your_api_key_here" ]; then
echo "错误: 请设置 OPENAI_API_KEY 环境变量"
exit 1
fi
# 构建和启动服务
echo "构建 Docker 镜像..."
docker-compose build
echo "启动服务..."
docker-compose up -d
echo "等待服务启动..."
sleep 10
# 检查服务状态
if curl -f http://localhost:8000/health > /dev/null 2>&1; then
echo "✓ LangGraph API 服务启动成功"
echo "访问地址: http://localhost:8000"
echo "API文档: http://localhost:8000/docs"
if [ "$APP_ENV" = "production" ]; then
echo "监控面板: http://localhost:3000 (用户名: admin, 密码: admin)"
fi
else
echo "✗ 服务启动失败,检查日志: docker-compose logs"
exit 1
fi
echo "部署完成!"
三、课后练习题及其答案
3.1 选择题
1. LangGraph 的主要特点是什么?
A) 只能处理线性工作流
B) 无状态的工作流管理
C) 支持有状态、图结构的工作流
D) 只能用于前端开发
2. 在 LangGraph 中,节点(Node)不能是以下哪种类型?
A) LLM 调用
B) 工具函数
C) 数据库连接
D) 自定义 Python 函数
3. 条件边(Conditional Edge)的作用是什么?
A) 强制所有节点按顺序执行
B) 根据条件决定下一个执行节点
C) 并行执行多个节点
D) 终止工作流执行
4. 以下哪个是 LangGraph 状态(State)的正确描述?
A) 只包含字符串类型的数据
B) 在工作流执行过程中保持不变
C) 是节点之间共享的数据结构
D) 只能被单个节点访问
5. 检查点(Checkpoint)机制的主要作用是什么?
A) 加速工作流执行
B) 减少内存使用
C) 允许工作流暂停和恢复
D) 自动修复错误
答案:
- C) 支持有状态、图结构的工作流
- C) 数据库连接(节点可以是LLM调用、工具函数、自定义函数,但不是直接的数据库连接)
- B) 根据条件决定下一个执行节点
- C) 是节点之间共享的数据结构
- C) 允许工作流暂停和恢复
3.2 填空题
1. LangGraph 建立在 ______ 生态系统之上。
2. 在条件分支工作流中,使用 ______ 函数来决定下一个执行的节点。
3. 多智能体工作流中,不同角色可以使用不同的 ______ 参数来调整创造力水平。
4. 部署 LangGraph 应用时,建议使用 ______ 来管理 Python 依赖环境。
5. Docker 部署时,通过 ______ 文件定义多容器应用的服务配置。
答案:
- LangChain
- 路由(或 route)
- temperature
- 虚拟环境(virtual environment)
- docker-compose.yml
3.3 简答题
1. 解释 LangGraph 中"有状态"工作流和传统链式调用的主要区别。
答: 传统链式调用通常是线性的、无状态的,每个步骤独立执行,步骤之间不共享状态。而 LangGraph 的有状态工作流维护一个共享的状态对象,所有节点都可以读取和修改这个状态,使得工作流可以处理复杂的、多轮次的交互,支持条件分支、循环和状态依赖的执行路径。
2. 描述在什么场景下应该使用条件分支工作流。
答: 应该在以下场景使用条件分支工作流:
- 需要根据输入内容选择不同处理路径时
- 需要根据中间结果决定后续步骤时
- 需要错误处理或异常恢复机制时
- 需要根据用户类型或权限提供不同服务时
- 需要实现复杂的决策逻辑时
3. 在多智能体工作流中,如何确保不同角色的 LLM 有不同行为?
答: 可以通过以下方式确保不同行为:
- 使用不同的 system prompt 定义角色职责
- 调整 temperature 参数控制创造力水平
- 为不同角色设置不同的模型或参数
- 通过上下文隔离确保角色专注特定任务
- 使用角色特定的工具和知识库
4. 解释工具调用工作流中工具函数的作用和设计原则。
答: 工具函数的作用是扩展 LLM 的能力,使其能够执行特定任务,如计算、搜索、API调用等。设计原则包括:
- 单一职责:每个工具只做一件事
- 明确接口:清晰的输入输出定义
- 错误处理:完善的异常处理机制
- 安全性:防止不安全操作
- 文档化:清晰的工具描述和使用示例
5. 列出部署 LangGraph 应用时需要考虑的三个重要因素。
答:
- 环境配置:正确设置环境变量,特别是 API 密钥
- 可观察性:实现日志记录、监控和跟踪机制
- 扩展性:设计支持水平扩展的架构
- 安全性:实现适当的认证和授权机制
- 错误处理:完善的异常处理和恢复机制
3.4 简单实操题
题目:创建一个简单的内容审核工作流
要求:
- 创建一个 LangGraph 工作流,用于审核用户提交的文本内容
- 工作流应包含以下节点:
- 内容接收节点
- 敏感词检测节点
- 情感分析节点
- 审核决策节点
- 根据检测结果,工作流应该能够:
- 如果没有敏感词且情感积极,直接通过
- 如果有敏感词,拒绝并给出原因
- 如果情感消极但无敏感词,发送人工审核
- 编写测试代码验证工作流的正确性
参考实现:
"""
内容审核工作流实操题实现
"""
from typing import TypedDict, Annotated, Literal
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from config import settings
# 定义状态结构
class ModerationState(TypedDict):
"""内容审核状态定义"""
content: str
has_sensitive_words: bool
sentiment_score: float
decision: Literal["approve", "reject", "manual_review"]
reason: str
sensitive_words_found: Annotated[list, "检测到的敏感词"]
def create_moderation_workflow():
"""
创建内容审核工作流
返回:
app: 编译后的工作流应用
"""
# 初始化LLM
llm = ChatOpenAI(
model="gpt-3.5-turbo",
temperature=0.3,
api_key=settings.openai_api_key
)
# 敏感词列表(示例)
SENSITIVE_WORDS = ["暴力", "色情", "诈骗", "毒品", "政治"]
def receive_content(state: ModerationState) -> dict:
"""
接收内容节点
"""
content = state["content"]
print(f"[审核] 收到内容: {content[:50]}...")
return {"content": content}
def check_sensitive_words(state: ModerationState) -> dict:
"""
检查敏感词节点
"""
content = state["content"]
found_words = []
# 简单的敏感词检测
for word in SENSITIVE_WORDS:
if word in content:
found_words.append(word)
has_sensitive = len(found_words) > 0
print(f"[敏感词检测] 发现敏感词: {has_sensitive}, 具体: {found_words}")
return {
"has_sensitive_words": has_sensitive,
"sensitive_words_found": found_words
}
def analyze_sentiment(state: ModerationState) -> dict:
"""
情感分析节点
"""
content = state["content"]
prompt = f"""
请分析以下文本的情感倾向,返回一个-1到1之间的分数:
-1表示非常消极,0表示中性,1表示非常积极
文本:{content}
只返回分数数字,不要有其他文字。
"""
try:
response = llm.invoke([{"role": "user", "content": prompt}])
score = float(response.content.strip())
# 确保分数在合理范围内
score = max(-1.0, min(1.0, score))
except:
score = 0.0 # 解析失败时使用中性分数
print(f"[情感分析] 分数: {score}")
return {"sentiment_score": score}
def make_decision(state: ModerationState) -> dict:
"""
审核决策节点
"""
has_sensitive = state["has_sensitive_words"]
sentiment = state["sentiment_score"]
sensitive_words = state.get("sensitive_words_found", [])
if has_sensitive:
decision = "reject"
reason = f"包含敏感词: {', '.join(sensitive_words)}"
elif sentiment < -0.3: # 消极情感
decision = "manual_review"
reason = f"情感倾向消极(分数: {sentiment:.2f}),需要人工审核"
else: # 积极或中性
decision = "approve"
reason = "内容合规,情感积极或中性"
print(f"[审核决策] 决定: {decision}, 原因: {reason}")
return {
"decision": decision,
"reason": reason
}
def approve_content(state: ModerationState) -> dict:
"""
通过内容节点
"""
print("[处理] 内容已通过审核")
return {"decision": "approve"}
def reject_content(state: ModerationState) -> dict:
"""
拒绝内容节点
"""
reason = state["reason"]
print(f"[处理] 内容被拒绝: {reason}")
return {"decision": "reject"}
def send_to_manual_review(state: ModerationState) -> dict:
"""
发送人工审核节点
"""
reason = state["reason"]
print(f"[处理] 已发送人工审核: {reason}")
return {"decision": "manual_review"}
def decide_flow(state: ModerationState) -> str:
"""
决策路由函数
"""
decision = state["decision"]
routes = {
"approve": "approve_content",
"reject": "reject_content",
"manual_review": "send_to_manual_review"
}
return routes.get(decision, "send_to_manual_review")
# 创建工作流图
workflow = StateGraph(ModerationState)
# 添加节点
workflow.add_node("receive_content", receive_content)
workflow.add_node("check_sensitive_words", check_sensitive_words)
workflow.add_node("analyze_sentiment", analyze_sentiment)
workflow.add_node("make_decision", make_decision)
workflow.add_node("approve_content", approve_content)
workflow.add_node("reject_content", reject_content)
workflow.add_node("send_to_manual_review", send_to_manual_review)
# 设置入口点
workflow.set_entry_point("receive_content")
# 添加边
workflow.add_edge("receive_content", "check_sensitive_words")
workflow.add_edge("check_sensitive_words", "analyze_sentiment")
workflow.add_edge("analyze_sentiment", "make_decision")
# 添加条件边
workflow.add_conditional_edges(
"make_decision",
decide_flow,
{
"approve_content": "approve_content",
"reject_content": "reject_content",
"send_to_manual_review": "send_to_manual_review"
}
)
# 添加结束边
workflow.add_edge("approve_content", END)
workflow.add_edge("reject_content", END)
workflow.add_edge("send_to_manual_review", END)
# 编译工作流
app = workflow.compile()
return app
def test_moderation_workflow():
"""测试内容审核工作流"""
print("=" * 50)
print("测试内容审核工作流")
print("=" * 50)
# 创建工作流
app = create_moderation_workflow()
# 测试用例
test_cases = [
{
"content": "这是一个非常好的产品,我非常喜欢!",
"expected": "approve" # 应该通过
},
{
"content": "我讨厌这一切,太糟糕了!",
"expected": "manual_review" # 应该人工审核
},
{
"content": "这里有暴力和色情内容",
"expected": "reject" # 应该拒绝
},
{
"content": "政治话题讨论需要谨慎",
"expected": "reject" # 应该拒绝
}
]
for i, test in enumerate(test_cases, 1):
print(f"\n测试用例 {i}: {test['content'][:30]}...")
print("-" * 40)
# 初始状态
initial_state = {
"content": test["content"],
"has_sensitive_words": False,
"sentiment_score": 0.0,
"decision": "",
"reason": "",
"sensitive_words_found": []
}
# 执行工作流
result = app.invoke(initial_state)
# 检查结果
decision = result["decision"]
expected = test["expected"]
print(f"审核结果: {decision}")
print(f"期望结果: {expected}")
print(f"原因: {result.get('reason', '无')}")
if decision == expected:
print("✓ 测试通过")
else:
print("✗ 测试失败")
return True
if __name__ == "__main__":
# 运行测试
test_moderation_workflow()
实操题答案要点:
- 工作流设计:正确实现了四个核心节点,状态流转合理
- 敏感词检测:使用预定义列表进行简单检测
- 情感分析:调用 LLM 进行情感评分
- 决策逻辑:根据敏感词和情感分数做出合理决策
- 测试用例:涵盖了三种可能的决策结果
- 错误处理:包含基本的异常处理机制
- 可扩展性:设计便于添加新的审核规则
这个实操题展示了如何构建一个实用的内容审核系统,涵盖了工作流设计、条件分支、LLM集成和业务逻辑实现等关键概念。通过这个练习,学员可以加深对
LangGraph 的理解,并掌握实际应用开发的能力。
四、总结
LangGraph 作为一个强大的有状态工作流框架,为构建复杂的 AI 应用提供了全新的可能性。通过本文的学习,我们系统地掌握了 LangGraph 的核心概念、工作流设计模式和实际部署方法。
首先,我们了解了 LangGraph 的基本概念,包括节点、边、状态和图等核心要素,理解了它与传统链式调用的本质区别。LangGraph 的有状态特性和图结构支持使得它能够处理更加复杂、多分支的工作流,特别适合需要多步骤决策和状态管理的应用场景。
接着,我们通过四个循序渐进的实际示例,深入掌握了 LangGraph 的使用方法:
- 基础工作流展示了最简单的线性工作流实现
- 条件分支工作流演示了如何根据条件选择不同执行路径
- 工具调用工作流介绍了如何集成外部工具扩展 LLM 能力
- 多智能体工作流展示了多个智能体协同工作的复杂场景
在部署方面,我们学习了完整的项目部署流程,从环境搭建、API 服务开发到 Docker 容器化部署。特别是通过 FastAPI 封装工作流为 RESTful API,使得 LangGraph 应用能够方便地集成到现有的技术栈中。Docker 和 Docker Compose 的使用进一步确保了应用的可移植性和可扩展性。
最后,通过课后练习题,我们巩固了所学知识,从选择题的基础概念回顾,到填空题的关键技术点记忆,再到简答题的深入理解,最后通过实操题的动手实践,全面检验了学习效果。特别是内容审核工作流的实现,展示了如何将 LangGraph 应用于实际的业务场景。
LangGraph 的优势在于其灵活性和表达能力,它不仅仅是一个工具,更是一种构建复杂 AI 应用的方法论。通过将复杂流程分解为可管理的节点,并通过状态和边有机地连接起来,开发者可以构建出既强大又易于维护的 AI 应用。
随着 AI 技术的不断发展,工作流编排框架如 LangGraph 将在 AI 应用开发中扮演越来越重要的角色。无论是简单的自动化任务,还是复杂的多智能体系统,LangGraph 都提供了强大而优雅的解决方案。希望本文能为您的 LangGraph 学习之旅提供坚实的起点,助您在实际项目中充分发挥其强大能力。
🌟 感谢您耐心阅读到这里!
🚀 技术成长没有捷径,但每一次的阅读、思考和实践,都在默默缩短您与成功的距离。
💡 如果本文对您有所启发,欢迎点赞👍、收藏📌、分享📤给更多需要的伙伴!
🗣️ 期待在评论区看到您的想法、疑问或建议,我会认真回复,让我们共同探讨、一起进步~
🔔 关注我,持续获取更多干货内容!
🤗 我们下篇文章见!
更多推荐


所有评论(0)