基于langgraph意图识别框架搭建,包括知识检索(不支持自定义检索参数),工具调用 :验证并兼容mcp工具的调用,http提供服务的工具注册以及调用,本地工具注册、意图识别兼容路由工作流的能力
本文介绍了LangGraph框架在意图识别和任务处理中的应用原理。该框架基于状态机工作流和图计算模型,将复杂任务拆解为可复用的节点,通过定义节点间的连接关系构建有向图,实现任务的动态流转。核心内容包括:1)框架底层原理,采用有限状态机、声明式编程和检查点机制;2)工具调用系统实现,通过依赖注入和接口隔离支持多类型工具适配;3)路由工作流与中断恢复机制,基于条件路由和决策树模型实现动态流程控制;4)
一、LangGraph 意图识别框架的底层原理
LangGraph 是基于状态机工作流和图计算模型的框架,其核心思想是将复杂任务拆解为可复用的节点(Node),通过定义节点间的连接关系(Edges)构建有向图,实现任务的动态流转。在意图识别场景中,这一框架解决了传统线性流程无法处理复杂分支、中断恢复等问题。
- 有限状态机(FSM):每个节点代表一个状态,状态转换由路由规则(Route)决定,确保流程按预定逻辑或动态条件流转。
- 声明式编程:通过定义节点功能和连接关系描述 “做什么”,而非 “怎么做”,框架自动处理节点调度和状态管理。
- 检查点(Checkpoint)机制:通过持久化中间状态实现流程中断与恢复,解决长对话、多轮交互场景的状态延续问题。
与传统框架的差异:
传统函数调用链(如 LangChain 的 Chain)是线性执行,无法动态调整流程;而 LangGraph 通过图结构支持分支、循环、并行等复杂逻辑,更适合意图识别中 “根据用户输入动态选择工具 / 检索” 的场景。
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver # 用于中断恢复
from typing import List, Dict, Any, Optional
import json
# 1. 定义状态结构(融合RAG流程所需的所有数据)
class RAGState:
"""RAG系统的状态管理类,保存整个流程的中间结果"""
user_query: str = "" # 用户原始查询
intent: Optional[str] = None # 识别的意图
retrieval_results: List[Dict] = [] # 检索结果(RAG的检索环节输出)
tool_calls: List[Dict] = [] # 工具调用记录
tool_results: List[Dict] = [] # 工具返回结果
thinking_process: List[str] = [] # 思考过程(下周计划实现)
streaming_output: List[str] = [] # 流式输出内容
checkpoint: Optional[Dict] = None # 检查点数据(用于中断恢复)
# 2. 初始化图和检查点(支持中断恢复的基础)
memory = MemorySaver() # 存储检查点的内存管理器
workflow = StateGraph(RAGState)
# 3. 定义核心节点函数
def intent_recognition_node(state: RAGState) -> Dict[str, Any]:
"""意图识别节点:判断用户查询是否需要检索或工具调用"""
# 调用意图分类模型(可使用LLM或微调模型)
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-3.5-turbo")
# 构建意图识别提示词
prompt = f"""
分析用户查询的意图类型,返回以下之一:
- "retrieval": 需要检索知识库(如事实性问题)
- "tool": 需要调用工具(如计算、查询API)
- "direct": 可直接回答(如常识性问题)
用户查询:{state.user_query}
"""
# 调用LLM获取意图
response = llm.predict(prompt)
intent = response.strip().lower()
# 日志记录
print(f"识别意图: {intent}")
return {"intent": intent}
# 4. 知识检索节点(当前版本固定参数)
def retrieval_node(state: RAGState) -> Dict[str, Any]:
"""RAG检索节点:从知识库获取相关文档"""
if state.intent != "retrieval":
return {"retrieval_results": []}
# 模拟知识库检索(实际项目中替换为向量数据库查询)
from langchain_community.vectorstores import FAISS
from langchain_community.embeddings import OpenAIEmbeddings
# 初始化向量存储(实际中应从持久化存储加载)
embeddings = OpenAIEmbeddings()
vector_db = FAISS.from_texts(
texts=["示例文档1内容...", "示例文档2内容..."],
embedding=embeddings
)
# 当前版本:固定检索参数(下周计划改为动态参数)
results = vector_db.similarity_search(
query=state.user_query,
k=3 # 硬编码的top_k参数
)
# 格式化检索结果
formatted_results = [
{"content": doc.page_content, "score": doc.metadata.get("score", 0)}
for doc in results
]
return {"retrieval_results": formatted_results}
二、工具调用系统的实现原理
工具调用系统是连接意图识别与外部功能的桥梁,其核心是统一接口抽象与多类型适配,实现对不同来源工具的无缝调用。
1.设计思想:依赖注入与接口隔离
- 依赖注入(DI):工具实例通过注册机制注入框架,避免硬编码依赖,便于扩展和替换。
- 接口隔离原则(ISP):不同类型工具(MCP/HTTP/ 本地函数)通过统一的
invoke
方法暴露功能,调用方无需关注工具内部实现。
2. 多类型工具的适配逻辑
当意图识别为 “需要调用工具” 时,系统通过工具管理器的invoke
方法调用目标工具,无论工具类型如何,调用方均使用相同的接口:
# 统一调用接口伪代码
result = tool_manager.invoke(tool_name, params)
# 内部根据工具类型路由到具体实现
class ToolManager:
"""工具管理器:统一注册和调用各类工具"""
def __init__(self):
self.tools = {} # 工具注册表: {工具名: 工具配置}
def register_mcp_tool(self, tool_name: str, endpoint: str, api_key: str):
"""注册MCP格式工具(微服务通用协议)"""
self.tools[tool_name] = {
"type": "mcp",
"endpoint": endpoint,
"headers": {"Authorization": f"Bearer {api_key}"}
}
def register_http_tool(self, tool_name: str, url: str, method: str = "POST"):
"""注册HTTP工具"""
self.tools[tool_name] = {
"type": "http",
"url": url,
"method": method
}
def register_local_tool(self, tool_name: str, func):
"""注册本地函数工具"""
self.tools[tool_name] = {
"type": "local",
"function": func
}
async def invoke(self, tool_name: str, params: Dict) -> Dict:
"""统一调用入口:根据工具类型执行不同调用逻辑"""
tool = self.tools.get(tool_name)
if not tool:
return {"error": f"工具 {tool_name} 未注册"}
try:
if tool["type"] == "mcp":
# MCP工具调用:遵循特定协议格式
import aiohttp
async with aiohttp.ClientSession() as session:
response = await session.post(
tool["endpoint"],
headers=tool["headers"],
json={"params": params}
)
return await response.json()
elif tool["type"] == "http":
# HTTP工具调用
import aiohttp
async with aiohttp.ClientSession() as session:
method = tool["method"].lower()
func = getattr(session, method)
response = await func(tool["url"], json=params)
return await response.json()
elif tool["type"] == "local":
# 本地工具调用
return tool["function"](**params)
except Exception as e:
return {"error": str(e)}
# 工具调用节点
async def tool_invoke_node(state: RAGState) -> Dict[str, Any]:
"""工具调用节点:根据意图调用相应工具"""
if state.intent != "tool":
return {"tool_results": []}
# 初始化工具管理器并注册工具
tool_manager = ToolManager()
tool_manager.register_local_tool("calculator", lambda a, b: a + b) # 本地计算器
tool_manager.register_http_tool("weather", "https://api.weather.com/query") # 天气API
# 确定需要调用的工具(实际中应由LLM根据意图生成)
tool_name = "calculator"
tool_params = {"a": 10, "b": 20}
# 调用工具
result = await tool_manager.invoke(tool_name, tool_params)
# 记录调用和结果
return {
"tool_calls": [{"name": tool_name, "params": tool_params}],
"tool_results": [result]
}
三、路由工作流与中断恢复的核心逻辑
路由工作流实现了 “根据意图动态选择下一步操作”,中断恢复则解决了流程中途需要用户输入或外部干预的场景。
1. 路由工作流的理论基础
- 条件路由:基于当前状态(如意图类型、工具返回结果)通过路由函数动态选择下一个节点,实现 “意图→检索”“意图→工具” 等分支逻辑。
- 决策树模型:路由函数本质是决策树的实现,输入为当前状态,输出为下一个节点名称,支持多分支(如 “检索→生成”“检索→工具→生成” 等)。
路由函数的设计原则:
- 单一职责:每个路由函数只负责一个分支判断,避免逻辑臃肿。
- 可测试性:通过输入状态预测输出节点,便于单元测试。
2. 中断恢复的实现机制
- 检查点存储:通过
MemorySaver
或分布式存储(如 Redis)保存流程状态(包括用户输入、中间结果、当前节点位置)。 - 状态序列化:将 Python 对象(如
RAGState
)序列化为 JSON/msgpack 等格式,确保可持久化和反序列化。 - 中断触发条件:当流程需要用户澄清、权限验证等外部输入时,主动抛出中断异常,框架自动保存检查点并暂停执行。
恢复流程:
用户提供新输入后,框架从检查点加载历史状态,跳过已执行节点,从中断处继续流程,确保状态一致性。
# 路由决策函数:根据当前状态决定下一步
def route_decider(state: RAGState) -> str:
"""根据意图和中间结果决定工作流走向"""
if state.intent == "retrieval" and not state.retrieval_results:
return "retrieval" # 需要检索
elif state.intent == "tool" and not state.tool_results:
return "tool_invoke" # 需要调用工具
elif need_user_input(state): # 自定义判断是否需要用户输入
# 触发中断:保存当前状态并等待用户输入
raise InterruptWorkflow("需要用户澄清", state)
else:
return "response_generator" # 生成响应
# 构建完整工作流
workflow.add_node("intent_recognition", intent_recognition_node)
workflow.add_node("retrieval", retrieval_node)
workflow.add_node("tool_invoke", tool_invoke_node)
workflow.add_node("response_generator", response_generator_node)
# 设置流程路径
workflow.set_entry_point("intent_recognition")
workflow.add_edge("intent_recognition", "route")
workflow.add_conditional_edges("route", route_decider)
workflow.add_edge("retrieval", "response_generator")
workflow.add_edge("tool_invoke", "response_generator")
workflow.add_edge("response_generator", END)
# 编译工作流(启用检查点支持中断恢复)
app = workflow.compile(checkpointer=memory, interrupt_before=["route"])
# 运行工作流并支持中断恢复
async def run_workflow(user_query: str, thread_id: str = "default", checkpoint=None):
"""运行工作流,支持从检查点恢复"""
config = {"configurable": {"thread_id": thread_id}} # 用于区分不同会话
if checkpoint:
# 从检查点恢复
result = await app.ainvoke(
{"user_query": user_query},
config=config,
checkpoint=checkpoint
)
else:
# 全新运行
result = await app.ainvoke({"user_query": user_query}, config=config)
return {
"output": result["streaming_output"],
"checkpoint": app.get_checkpoint(result) # 保存检查点用于恢复
}
四、流式输出与后端对接的技术解析
流式输出解决了大模型生成响应慢、用户体验差的问题,其核心是增量传输与服务器推送技术的结合。
1. 流式输出的实现原理
- 生成器模式(Generator):节点函数通过异步生成器(
async def
+yield
)增量返回结果,而非等待完整结果生成后一次性返回。 - Server-Sent Events(SSE):基于 HTTP 协议的服务器推送技术,通过
text/event-stream
格式持续向客户端发送数据,无需客户端轮询。
与 WebSocket 的对比:
SSE 更适合单向推送(如模型生成结果),实现简单且兼容 HTTP 生态;WebSocket 适合双向实时通信,但在意图识别场景中必要性较低。
2. 后端对接的接口设计
# 响应生成节点(支持流式输出)
async def response_generator_node(state: RAGState) -> Dict[str, Any]:
"""生成最终响应,支持流式输出"""
# 构建提示词(整合检索结果和工具返回)
prompt = f"""
根据以下信息回答用户问题:
用户问题:{state.user_query}
检索结果:{state.retrieval_results}
工具结果:{state.tool_results}
"""
# 流式生成响应
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-3.5-turbo", streaming=True)
streaming_output = []
async for chunk in llm.astream(prompt):
content = chunk.content
if content:
streaming_output.append(content)
# 每生成一块就返回一次(实现流式)
yield {"streaming_output": streaming_output.copy()}
return {"streaming_output": streaming_output}
# 后端接口对接(FastAPI示例)
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import asyncio
import json
app = FastAPI()
class QueryRequest(BaseModel):
user_query: str
thread_id: str = "default"
checkpoint: Optional[Dict] = None
@app.post("/api/intent-rag")
async def rag_endpoint(request: QueryRequest):
"""RAG意图识别接口,支持流式响应"""
try:
async def stream_generator():
"""生成SSE格式的流式响应"""
async for update in run_workflow(
request.user_query,
request.thread_id,
request.checkpoint
):
yield f"data: {json.dumps(update)}\n\n"
await asyncio.sleep(0.05) # 控制流速度
yield "data: [DONE]\n\n"
return StreamingResponse(stream_generator(), media_type="text/event-stream")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
五、知识检索模块的参数动态传入机制
知识检索模块的参数动态传入解决了 “固定检索参数无法适配多样化用户需求” 的问题,核心是意图解析与参数提取的结合。
1. 参数来源与优先级
- 用户显式输入:如 “查询前 5 条结果” 中的
top_k=5
,优先级最高。 - 意图隐式提取:通过大模型分析用户查询,提取潜在参数(如 “最近发布的文档” 对应
time_filter="recent"
)。 - 系统默认配置:当用户未指定且无法提取时,使用预设参数(如
top_k=3
)。
2. 实现逻辑:自然语言理解(NLU)
通过提示词工程引导大模型将用户查询转换为结构化参数:
提示词核心逻辑:
1. 识别用户是否指定检索数量(如“前N条”)、过滤条件(如“2023年后的文档”)。
2. 将识别结果转换为JSON格式(如{"top_k": 5, "filters": {"year": ">2023"}})。
3. 无明确参数时返回默认值。
# 优化后的检索节点(支持动态参数)
def retrieval_node(state: RAGState) -> Dict[str, Any]:
"""支持动态参数的RAG检索节点"""
if state.intent != "retrieval":
return {"retrieval_results": []}
# 1. 从用户查询中提取检索参数
# 例如:用户说"查询最近3篇关于AI的文档",应提取top_k=3, filter={"topic": "AI"}
param_extractor_prompt = f"""
从用户查询中提取检索参数,格式为JSON:
{{
"top_k": 整数(返回结果数量),
"filters": {{字段: 值}}(过滤条件),
"threshold": 浮点数(相似度阈值)
}}
如无明确参数,使用默认值:{{"top_k": 3, "filters": {{}}, "threshold": 0.7}}
用户查询:{state.user_query}
"""
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-3.5-turbo")
param_response = llm.predict(param_extractor_prompt)
retrieval_params = json.loads(param_response)
# 2. 执行带参数的检索
from langchain_community.vectorstores import FAISS
from langchain_community.embeddings import OpenAIEmbeddings
vector_db = FAISS.load_local("knowledge_base", OpenAIEmbeddings(), allow_dangerous_deserialization=True)
# 应用过滤条件(实际实现取决于向量数据库支持)
results = vector_db.similarity_search(
query=state.user_query,
k=retrieval_params["top_k"],
filter=retrieval_params["filters"]
)
# 3. 按阈值过滤结果
filtered_results = [
{"content": doc.page_content, "score": doc.metadata.get("score", 0)}
for doc in results
if doc.metadata.get("score", 0) >= retrieval_params["threshold"]
]
return {
"retrieval_results": filtered_results,
"retrieval_params": retrieval_params # 记录使用的参数
}
六、思考大模型与思考过程输出的设计逻辑
思考过程输出(“思维链”)增强了系统的可解释性,让用户理解 “为什么系统会这样回答”,核心是提示词引导与增量输出。
1. 理论基础:思维链(Chain-of-Thought, CoT)
- 认知模拟:通过引导模型输出推理步骤,模拟人类思考过程,提升复杂问题的解决能力。
- 可解释性增强:将 “黑箱” 决策过程转化为 “白箱” 步骤,降低用户对系统的不信任感。
2. 实现要点
- 提示词设计:明确要求模型分步骤推理,如 “1. 是否需要检索?2. 需要调用什么工具?3. 如何整合结果?”。
- 流式输出:思考过程与最终结果分离,通过不同的 SSE 事件类型(如
thinking
/output
)分别传输
- RESTful 风格:通过 POST 接口接收用户查询,返回 SSE 流,符合 HTTP 规范。
- 会话隔离:通过
thread_id
区分不同对话,确保多用户并发时状态不混淆。 - 错误处理:流式传输中通过特定事件(如
error
类型)传递异常信息,客户端可据此中断接收并提示用户。
# 思考过程节点
async def thinking_node(state: RAGState) -> Dict[str, Any]:
"""思考过程生成节点:输出模型的推理逻辑"""
# 构建思考提示词(引导模型输出推理过程)
thinking_prompt = f"""
你需要分析如何回答用户的问题,逐步思考并记录你的推理过程:
1. 这个问题需要检索知识库吗?为什么?
2. 需要调用工具吗?应该调用什么工具?
3. 回答这个问题需要哪些关键信息?
用户问题:{state.user_query}
已知信息:{state.retrieval_results + state.tool_results}
"""
# 调用大模型生成思考过程
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4", streaming=True)
thinking_process = []
async for chunk in llm.astream(thinking_prompt):
content = chunk.content
if content:
thinking_process.append(content)
# 流式输出思考过程
yield {
"thinking_process": thinking_process.copy(),
"streaming_output": [f"思考中:{''.join(thinking_process)}"]
}
return {"thinking_process": thinking_process}
# 将思考节点添加到工作流
workflow.add_node("thinking", thinking_node)
# 调整流程:意图识别 → 思考 → 路由
workflow.add_edge("intent_recognition", "thinking")
workflow.add_edge("thinking", "route")
七、致谢
谢谢大家的阅读,很多不足支出,欢迎大家在评论区指出,如果我的内容对你有帮助,
可以点赞 , 收藏 ,大家的支持就是我坚持下去的动力!
请赐予我平静,去接受我无法改变的 :赐予我勇气,去改变我能改变的!
更多推荐
所有评论(0)