一、LangGraph 意图识别框架的底层原理

LangGraph 是基于状态机工作流图计算模型的框架,其核心思想是将复杂任务拆解为可复用的节点(Node),通过定义节点间的连接关系(Edges)构建有向图,实现任务的动态流转。在意图识别场景中,这一框架解决了传统线性流程无法处理复杂分支、中断恢复等问题。

  • 有限状态机(FSM):每个节点代表一个状态,状态转换由路由规则(Route)决定,确保流程按预定逻辑或动态条件流转。
  • 声明式编程:通过定义节点功能和连接关系描述 “做什么”,而非 “怎么做”,框架自动处理节点调度和状态管理。
  • 检查点(Checkpoint)机制:通过持久化中间状态实现流程中断与恢复,解决长对话、多轮交互场景的状态延续问题。

与传统框架的差异
传统函数调用链(如 LangChain 的 Chain)是线性执行,无法动态调整流程;而 LangGraph 通过图结构支持分支、循环、并行等复杂逻辑,更适合意图识别中 “根据用户输入动态选择工具 / 检索” 的场景。

from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver  # 用于中断恢复
from typing import List, Dict, Any, Optional
import json

# 1. 定义状态结构(融合RAG流程所需的所有数据)
class RAGState:
    """RAG系统的状态管理类,保存整个流程的中间结果"""
    user_query: str = ""  # 用户原始查询
    intent: Optional[str] = None  # 识别的意图
    retrieval_results: List[Dict] = []  # 检索结果(RAG的检索环节输出)
    tool_calls: List[Dict] = []  # 工具调用记录
    tool_results: List[Dict] = []  # 工具返回结果
    thinking_process: List[str] = []  # 思考过程(下周计划实现)
    streaming_output: List[str] = []  # 流式输出内容
    checkpoint: Optional[Dict] = None  # 检查点数据(用于中断恢复)

# 2. 初始化图和检查点(支持中断恢复的基础)
memory = MemorySaver()  # 存储检查点的内存管理器
workflow = StateGraph(RAGState)

# 3. 定义核心节点函数
def intent_recognition_node(state: RAGState) -> Dict[str, Any]:
    """意图识别节点:判断用户查询是否需要检索或工具调用"""
    # 调用意图分类模型(可使用LLM或微调模型)
    from langchain_openai import ChatOpenAI
    llm = ChatOpenAI(model="gpt-3.5-turbo")
    
    # 构建意图识别提示词
    prompt = f"""
    分析用户查询的意图类型,返回以下之一:
    - "retrieval": 需要检索知识库(如事实性问题)
    - "tool": 需要调用工具(如计算、查询API)
    - "direct": 可直接回答(如常识性问题)
    
    用户查询:{state.user_query}
    """
    
    # 调用LLM获取意图
    response = llm.predict(prompt)
    intent = response.strip().lower()
    
    # 日志记录
    print(f"识别意图: {intent}")
    return {"intent": intent}

# 4. 知识检索节点(当前版本固定参数)
def retrieval_node(state: RAGState) -> Dict[str, Any]:
    """RAG检索节点:从知识库获取相关文档"""
    if state.intent != "retrieval":
        return {"retrieval_results": []}
    
    # 模拟知识库检索(实际项目中替换为向量数据库查询)
    from langchain_community.vectorstores import FAISS
    from langchain_community.embeddings import OpenAIEmbeddings
    
    # 初始化向量存储(实际中应从持久化存储加载)
    embeddings = OpenAIEmbeddings()
    vector_db = FAISS.from_texts(
        texts=["示例文档1内容...", "示例文档2内容..."],
        embedding=embeddings
    )
    
    # 当前版本:固定检索参数(下周计划改为动态参数)
    results = vector_db.similarity_search(
        query=state.user_query,
        k=3  # 硬编码的top_k参数
    )
    
    # 格式化检索结果
    formatted_results = [
        {"content": doc.page_content, "score": doc.metadata.get("score", 0)}
        for doc in results
    ]
    
    return {"retrieval_results": formatted_results}
二、工具调用系统的实现原理

工具调用系统是连接意图识别与外部功能的桥梁,其核心是统一接口抽象多类型适配,实现对不同来源工具的无缝调用。

1.设计思想:依赖注入与接口隔离
  • 依赖注入(DI):工具实例通过注册机制注入框架,避免硬编码依赖,便于扩展和替换。
  • 接口隔离原则(ISP):不同类型工具(MCP/HTTP/ 本地函数)通过统一的invoke方法暴露功能,调用方无需关注工具内部实现。
2. 多类型工具的适配逻辑


当意图识别为 “需要调用工具” 时,系统通过工具管理器的invoke方法调用目标工具,无论工具类型如何,调用方均使用相同的接口:

# 统一调用接口伪代码
result = tool_manager.invoke(tool_name, params)
# 内部根据工具类型路由到具体实现
class ToolManager:
    """工具管理器:统一注册和调用各类工具"""
    def __init__(self):
        self.tools = {}  # 工具注册表: {工具名: 工具配置}
    
    def register_mcp_tool(self, tool_name: str, endpoint: str, api_key: str):
        """注册MCP格式工具(微服务通用协议)"""
        self.tools[tool_name] = {
            "type": "mcp",
            "endpoint": endpoint,
            "headers": {"Authorization": f"Bearer {api_key}"}
        }
    
    def register_http_tool(self, tool_name: str, url: str, method: str = "POST"):
        """注册HTTP工具"""
        self.tools[tool_name] = {
            "type": "http",
            "url": url,
            "method": method
        }
    
    def register_local_tool(self, tool_name: str, func):
        """注册本地函数工具"""
        self.tools[tool_name] = {
            "type": "local",
            "function": func
        }
    
    async def invoke(self, tool_name: str, params: Dict) -> Dict:
        """统一调用入口:根据工具类型执行不同调用逻辑"""
        tool = self.tools.get(tool_name)
        if not tool:
            return {"error": f"工具 {tool_name} 未注册"}
        
        try:
            if tool["type"] == "mcp":
                # MCP工具调用:遵循特定协议格式
                import aiohttp
                async with aiohttp.ClientSession() as session:
                    response = await session.post(
                        tool["endpoint"],
                        headers=tool["headers"],
                        json={"params": params}
                    )
                    return await response.json()
            
            elif tool["type"] == "http":
                # HTTP工具调用
                import aiohttp
                async with aiohttp.ClientSession() as session:
                    method = tool["method"].lower()
                    func = getattr(session, method)
                    response = await func(tool["url"], json=params)
                    return await response.json()
            
            elif tool["type"] == "local":
                # 本地工具调用
                return tool["function"](**params)
        
        except Exception as e:
            return {"error": str(e)}

# 工具调用节点
async def tool_invoke_node(state: RAGState) -> Dict[str, Any]:
    """工具调用节点:根据意图调用相应工具"""
    if state.intent != "tool":
        return {"tool_results": []}
    
    # 初始化工具管理器并注册工具
    tool_manager = ToolManager()
    tool_manager.register_local_tool("calculator", lambda a, b: a + b)  # 本地计算器
    tool_manager.register_http_tool("weather", "https://api.weather.com/query")  # 天气API
    
    # 确定需要调用的工具(实际中应由LLM根据意图生成)
    tool_name = "calculator"
    tool_params = {"a": 10, "b": 20}
    
    # 调用工具
    result = await tool_manager.invoke(tool_name, tool_params)
    
    # 记录调用和结果
    return {
        "tool_calls": [{"name": tool_name, "params": tool_params}],
        "tool_results": [result]
    }
三、路由工作流与中断恢复的核心逻辑

路由工作流实现了 “根据意图动态选择下一步操作”,中断恢复则解决了流程中途需要用户输入或外部干预的场景。

1. 路由工作流的理论基础
  • 条件路由:基于当前状态(如意图类型、工具返回结果)通过路由函数动态选择下一个节点,实现 “意图→检索”“意图→工具” 等分支逻辑。
  • 决策树模型:路由函数本质是决策树的实现,输入为当前状态,输出为下一个节点名称,支持多分支(如 “检索→生成”“检索→工具→生成” 等)。

路由函数的设计原则

  • 单一职责:每个路由函数只负责一个分支判断,避免逻辑臃肿。
  • 可测试性:通过输入状态预测输出节点,便于单元测试。
2. 中断恢复的实现机制
  • 检查点存储:通过MemorySaver或分布式存储(如 Redis)保存流程状态(包括用户输入、中间结果、当前节点位置)。
  • 状态序列化:将 Python 对象(如RAGState)序列化为 JSON/msgpack 等格式,确保可持久化和反序列化。
  • 中断触发条件:当流程需要用户澄清、权限验证等外部输入时,主动抛出中断异常,框架自动保存检查点并暂停执行。

恢复流程
用户提供新输入后,框架从检查点加载历史状态,跳过已执行节点,从中断处继续流程,确保状态一致性。

# 路由决策函数:根据当前状态决定下一步
def route_decider(state: RAGState) -> str:
    """根据意图和中间结果决定工作流走向"""
    if state.intent == "retrieval" and not state.retrieval_results:
        return "retrieval"  # 需要检索
    elif state.intent == "tool" and not state.tool_results:
        return "tool_invoke"  # 需要调用工具
    elif need_user_input(state):  # 自定义判断是否需要用户输入
        # 触发中断:保存当前状态并等待用户输入
        raise InterruptWorkflow("需要用户澄清", state)
    else:
        return "response_generator"  # 生成响应

# 构建完整工作流
workflow.add_node("intent_recognition", intent_recognition_node)
workflow.add_node("retrieval", retrieval_node)
workflow.add_node("tool_invoke", tool_invoke_node)
workflow.add_node("response_generator", response_generator_node)

# 设置流程路径
workflow.set_entry_point("intent_recognition")
workflow.add_edge("intent_recognition", "route")
workflow.add_conditional_edges("route", route_decider)
workflow.add_edge("retrieval", "response_generator")
workflow.add_edge("tool_invoke", "response_generator")
workflow.add_edge("response_generator", END)

# 编译工作流(启用检查点支持中断恢复)
app = workflow.compile(checkpointer=memory, interrupt_before=["route"])

# 运行工作流并支持中断恢复
async def run_workflow(user_query: str, thread_id: str = "default", checkpoint=None):
    """运行工作流,支持从检查点恢复"""
    config = {"configurable": {"thread_id": thread_id}}  # 用于区分不同会话
    
    if checkpoint:
        # 从检查点恢复
        result = await app.ainvoke(
            {"user_query": user_query},
            config=config,
            checkpoint=checkpoint
        )
    else:
        # 全新运行
        result = await app.ainvoke({"user_query": user_query}, config=config)
    
    return {
        "output": result["streaming_output"],
        "checkpoint": app.get_checkpoint(result)  # 保存检查点用于恢复
    }
四、流式输出与后端对接的技术解析

流式输出解决了大模型生成响应慢、用户体验差的问题,其核心是增量传输服务器推送技术的结合。

1. 流式输出的实现原理
  • 生成器模式(Generator):节点函数通过异步生成器(async def + yield)增量返回结果,而非等待完整结果生成后一次性返回。
  • Server-Sent Events(SSE):基于 HTTP 协议的服务器推送技术,通过text/event-stream格式持续向客户端发送数据,无需客户端轮询。

与 WebSocket 的对比
SSE 更适合单向推送(如模型生成结果),实现简单且兼容 HTTP 生态;WebSocket 适合双向实时通信,但在意图识别场景中必要性较低。

2. 后端对接的接口设计
# 响应生成节点(支持流式输出)
async def response_generator_node(state: RAGState) -> Dict[str, Any]:
    """生成最终响应,支持流式输出"""
    # 构建提示词(整合检索结果和工具返回)
    prompt = f"""
    根据以下信息回答用户问题:
    用户问题:{state.user_query}
    检索结果:{state.retrieval_results}
    工具结果:{state.tool_results}
    """
    
    # 流式生成响应
    from langchain_openai import ChatOpenAI
    llm = ChatOpenAI(model="gpt-3.5-turbo", streaming=True)
    
    streaming_output = []
    async for chunk in llm.astream(prompt):
        content = chunk.content
        if content:
            streaming_output.append(content)
            # 每生成一块就返回一次(实现流式)
            yield {"streaming_output": streaming_output.copy()}
    
    return {"streaming_output": streaming_output}

# 后端接口对接(FastAPI示例)
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import asyncio
import json

app = FastAPI()

class QueryRequest(BaseModel):
    user_query: str
    thread_id: str = "default"
    checkpoint: Optional[Dict] = None

@app.post("/api/intent-rag")
async def rag_endpoint(request: QueryRequest):
    """RAG意图识别接口,支持流式响应"""
    try:
        async def stream_generator():
            """生成SSE格式的流式响应"""
            async for update in run_workflow(
                request.user_query,
                request.thread_id,
                request.checkpoint
            ):
                yield f"data: {json.dumps(update)}\n\n"
                await asyncio.sleep(0.05)  # 控制流速度
            yield "data: [DONE]\n\n"
        
        return StreamingResponse(stream_generator(), media_type="text/event-stream")
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

    五、知识检索模块的参数动态传入机制

    知识检索模块的参数动态传入解决了 “固定检索参数无法适配多样化用户需求” 的问题,核心是意图解析参数提取的结合。

    1. 参数来源与优先级
    • 用户显式输入:如 “查询前 5 条结果” 中的top_k=5,优先级最高。
    • 意图隐式提取:通过大模型分析用户查询,提取潜在参数(如 “最近发布的文档” 对应time_filter="recent")。
    • 系统默认配置:当用户未指定且无法提取时,使用预设参数(如top_k=3)。
    2. 实现逻辑:自然语言理解(NLU)

    通过提示词工程引导大模型将用户查询转换为结构化参数:

    提示词核心逻辑:
    1. 识别用户是否指定检索数量(如“前N条”)、过滤条件(如“2023年后的文档”)。
    2. 将识别结果转换为JSON格式(如{"top_k": 5, "filters": {"year": ">2023"}})。
    3. 无明确参数时返回默认值。

    # 优化后的检索节点(支持动态参数)
    def retrieval_node(state: RAGState) -> Dict[str, Any]:
        """支持动态参数的RAG检索节点"""
        if state.intent != "retrieval":
            return {"retrieval_results": []}
        
        # 1. 从用户查询中提取检索参数
        # 例如:用户说"查询最近3篇关于AI的文档",应提取top_k=3, filter={"topic": "AI"}
        param_extractor_prompt = f"""
        从用户查询中提取检索参数,格式为JSON:
        {{
            "top_k": 整数(返回结果数量),
            "filters": {{字段: 值}}(过滤条件),
            "threshold": 浮点数(相似度阈值)
        }}
        如无明确参数,使用默认值:{{"top_k": 3, "filters": {{}}, "threshold": 0.7}}
        
        用户查询:{state.user_query}
        """
        
        from langchain_openai import ChatOpenAI
        llm = ChatOpenAI(model="gpt-3.5-turbo")
        param_response = llm.predict(param_extractor_prompt)
        retrieval_params = json.loads(param_response)
        
        # 2. 执行带参数的检索
        from langchain_community.vectorstores import FAISS
        from langchain_community.embeddings import OpenAIEmbeddings
        
        vector_db = FAISS.load_local("knowledge_base", OpenAIEmbeddings(), allow_dangerous_deserialization=True)
        
        # 应用过滤条件(实际实现取决于向量数据库支持)
        results = vector_db.similarity_search(
            query=state.user_query,
            k=retrieval_params["top_k"],
            filter=retrieval_params["filters"]
        )
        
        # 3. 按阈值过滤结果
        filtered_results = [
            {"content": doc.page_content, "score": doc.metadata.get("score", 0)}
            for doc in results
            if doc.metadata.get("score", 0) >= retrieval_params["threshold"]
        ]
        
        return {
            "retrieval_results": filtered_results,
            "retrieval_params": retrieval_params  # 记录使用的参数
        }

    六、思考大模型与思考过程输出的设计逻辑

    思考过程输出(“思维链”)增强了系统的可解释性,让用户理解 “为什么系统会这样回答”,核心是提示词引导增量输出

    1. 理论基础:思维链(Chain-of-Thought, CoT)
    • 认知模拟:通过引导模型输出推理步骤,模拟人类思考过程,提升复杂问题的解决能力。
    • 可解释性增强:将 “黑箱” 决策过程转化为 “白箱” 步骤,降低用户对系统的不信任感。
    2. 实现要点
    • 提示词设计:明确要求模型分步骤推理,如 “1. 是否需要检索?2. 需要调用什么工具?3. 如何整合结果?”。
    • 流式输出:思考过程与最终结果分离,通过不同的 SSE 事件类型(如thinking/output)分别传输

    • RESTful 风格:通过 POST 接口接收用户查询,返回 SSE 流,符合 HTTP 规范。
    • 会话隔离:通过thread_id区分不同对话,确保多用户并发时状态不混淆。
    • 错误处理:流式传输中通过特定事件(如error类型)传递异常信息,客户端可据此中断接收并提示用户。
    # 思考过程节点
    async def thinking_node(state: RAGState) -> Dict[str, Any]:
        """思考过程生成节点:输出模型的推理逻辑"""
        # 构建思考提示词(引导模型输出推理过程)
        thinking_prompt = f"""
        你需要分析如何回答用户的问题,逐步思考并记录你的推理过程:
        1. 这个问题需要检索知识库吗?为什么?
        2. 需要调用工具吗?应该调用什么工具?
        3. 回答这个问题需要哪些关键信息?
        
        用户问题:{state.user_query}
        已知信息:{state.retrieval_results + state.tool_results}
        """
        
        # 调用大模型生成思考过程
        from langchain_openai import ChatOpenAI
        llm = ChatOpenAI(model="gpt-4", streaming=True)
        
        thinking_process = []
        async for chunk in llm.astream(thinking_prompt):
            content = chunk.content
            if content:
                thinking_process.append(content)
                # 流式输出思考过程
                yield {
                    "thinking_process": thinking_process.copy(),
                    "streaming_output": [f"思考中:{''.join(thinking_process)}"]
                }
        
        return {"thinking_process": thinking_process}
    
    # 将思考节点添加到工作流
    workflow.add_node("thinking", thinking_node)
    # 调整流程:意图识别 → 思考 → 路由
    workflow.add_edge("intent_recognition", "thinking")
    workflow.add_edge("thinking", "route")

    七、致谢

    谢谢大家的阅读,很多不足支出,欢迎大家在评论区指出,如果我的内容对你有帮助,

    可以点赞 , 收藏 ,大家的支持就是我坚持下去的动力!

    请赐予我平静,去接受我无法改变的 :赐予我勇气,去改变我能改变的!

    Logo

    有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

    更多推荐