文章介绍了一种深度思考型智能体RAG流水线,通过规划、检索、精炼、反思、批判和综合六个环节,解决了传统RAG系统无法处理复杂多跳、多源查询的问题。该系统使用LangGraph构建循环工作流,包含自适应检索漏斗、查询重写、元数据感知分块和自我批判机制等关键技术。与传统线性RAG相比,该架构能够分解复杂问题、选择适当工具、迭代优化检索结果,并通过策略智能体决定何时结束研究。实验证明,该系统在处理需要多源知识整合的复杂查询时表现显著优于基础RAG系统。


构建深度思考型智能体 RAG 流水线以解决复杂查询

涵盖规划、检索、反思、批判、综合等多个环节

一个 RAG 系统之所以会失败,通常不是因为大型语言模型(LLM)不够智能,而是因为它的架构过于简单。它试图用一种线性的、一次性的方法来处理一个循环的、多步骤的问题。

许多复杂查询需要推理反思以及在何时采取行动方面的明智决策,这与我们面对问题时检索信息的方式非常相似。这正是智能体驱动的操作在 RAG 流水线中发挥作用的地方。让我们来看看一个典型的深度思考 RAG 流水线是什么样的……

深度思考 RAG 流水线

    1. 规划 (Plan): 首先,智能体将复杂的用户查询分解为一个结构化的、多步骤的研究计划,并为每一步决定需要使用哪种工具(内部文档搜索或网络搜索)。
    1. 检索 (Retrieve): 对于每一步,它会执行一个自适应的多阶段检索漏斗,使用一个监督器(supervisor)来动态选择最佳的搜索策略(向量、关键词或混合搜索)。
    1. 精炼 (Refine): 接着,它使用一个高精度的交叉编码器(cross-encoder)对初步结果进行重排序,并使用一个蒸馏智能体(distiller agent)将最佳证据压缩为简洁的上下文。
    1. 反思 (Reflect): 每一步之后,智能体都会总结其发现并更新其研究历史,从而逐步建立对问题的累积理解。
    1. 批判 (Critique): 然后,一个策略智能体(policy agent)会检查这段历史,做出战略决策:是继续下一步研究,还是在遇到死胡同时修正计划,或是结束任务。
    1. 综合 (Synthesize): 一旦研究完成,一个最终的智能体会将所有来源收集到的证据综合成一个单一、全面且可引用的答案。

在本博客中,我们将实现整个深度思考 RAG 流水线,并将其与一个基础的 RAG 流水线进行比较,以展示它如何解决复杂的多跳查询(multi-hop queries)

所有代码和理论都可以在作者 GitHub 仓库中找到:

https://github.com/FareedKhan-dev/deep-thinking-rag

环境设置

在开始编写深度 RAG 流水线的代码之前,我们需要一个坚实的基础,因为一个生产级别的人工智能系统不仅关乎最终的算法,也关乎我们在设置过程中做出的审慎选择。

我们将要实现的每一步对于最终系统的有效性和可靠性都至关重要。

当我们开始开发一个流水线并进行反复试验时,最好将配置定义为一个简单的字典格式,因为后续当流水线变得复杂时,我们可以简单地回顾这个字典来更改配置,并观察其对整体性能的影响。

# 中央配置字典,用于管理所有系统参数config = {    "data_dir": "./data",                           # 存储原始和清理后数据的目录    "vector_store_dir": "./vector_store",           # 持久化向量存储的目录    "llm_provider": "openai",                       # 我们使用的 LLM 提供商    "reasoning_llm": "gpt-4o",                      # 用于规划和综合的强大模型    "fast_llm": "gpt-4o-mini",                      # 用于简单任务(如基线 RAG)的更快速、更便宜的模型    "embedding_model": "text-embedding-3-small",    # 用于创建文档嵌入的模型    "reranker_model": "cross-encoder/ms-marco-MiniLM-L-6-v2", # 用于精确重排序的模型    "max_reasoning_iterations": 7,                  # 防止智能体陷入无限循环的安全措施    "top_k_retrieval": 10,                          # 初始广泛召回的文档数量    "top_n_rerank": 3,                              # 精确重排序后保留的文档数量}

这些键都相当容易理解,但有三个键值得一提:

  • llm_provider:这是我们正在使用的 LLM 提供商,本文中使用 OpenAI。我使用 OpenAI 是因为在 LangChain 中可以轻松更换模型和提供商,但你可以选择任何适合你需求的提供商,比如 Ollama。
  • reasoning_llm:这必须是我们整个设置中最强大的模型,因为它将用于规划和综合。
  • fast_llm:这应该是一个更快、更便宜的模型,因为它将用于像基线 RAG 这样的简单任务。

现在我们需要导入将在整个流水线中使用的库,并将 API 密钥设置为环境变量,以避免在代码块中暴露它们。

import os                  # 用于与操作系统交互(例如,管理环境变量)import re                  # 用于正则表达式操作,有助于文本清理import json                # 用于处理 JSON 数据from getpass import getpass # 用于安全地提示用户输入(如 API 密钥),而不会在屏幕上回显from pprint import pprint   # 用于美观地打印 Python 对象,使其更具可读性import uuid                # 用于生成唯一标识符from typing importList, Dict, TypedDict, Literal, Optional# 用于类型提示,以创建清晰、可读和可维护的代码# 辅助函数,用于在环境变量不存在时安全地设置它们def_set_env(var: str):    # 检查环境变量是否尚未设置    ifnot os.environ.get(var):        # 如果未设置,则提示用户安全地输入        os.environ[var] = getpass(f"Enter your {var}: ")# 设置我们将使用的服务的 API 密钥_set_env("OPENAI_API_KEY")      # 用于访问 OpenAI 模型(GPT-4o, embeddings)_set_env("LANGSMITH_API_KEY")   # 用于使用 LangSmith 进行追踪和调试_set_env("TAVILY_API_KEY")      # 用于网络搜索工具# 启用 LangSmith 追踪,以获取我们智能体执行的详细日志和可视化os.environ["LANGSMITH_TRACING"] = "true"# 在 LangSmith 中定义一个项目名称,以组织我们的运行记录os.environ["LANGSMITH_PROJECT"] = "Advanced-Deep-Thinking-RAG"

我们还启用了 LangSmith 进行追踪。当你处理一个具有复杂、循环工作流的智能体系统时,追踪不仅仅是一个锦上添花的功能,它至关重要。它能帮助你可视化正在发生的事情,并使调试智能体的思考过程变得更加容易。

知识库溯源

一个生产级别的 RAG 系统需要一个既复杂又要求高的知识库,才能真正展示其有效性。为此,我们将使用 NVIDIA 的 2023 年 10-K 文件,这是一份超过一百页的综合性文件,详细说明了该公司的业务运营、财务表现以及披露的风险因素。

知识库溯源

首先,我们将实现一个自定义函数,该函数以编程方式直接从美国证券交易委员会(SEC)的 EDGAR 数据库下载 10-K 文件,解析原始 HTML,并将其转换为适合我们 RAG 流水线摄入的干净、结构化的文本格式。现在让我们来编写这个函数。

import requests # 用于发起 HTTP 请求以下载文档from bs4 import BeautifulSoup # 一个用于解析 HTML 和 XML 文档的强大库from langchain.docstore.document import Document # LangChain 中用于表示一段文本的标准数据结构defdownload_and_parse_10k(url, doc_path_raw, doc_path_clean):    # 检查清理后的文件是否已存在,以避免重复下载    if os.path.exists(doc_path_clean):        print(f"Cleaned 10-K file already exists at: {doc_path_clean}")        return    print(f"Downloading 10-K filing from {url}...")    # 设置一个 User-Agent 请求头来模拟浏览器,因为一些服务器会阻止脚本访问    headers = {'User-Agent': 'Mozilla/5.0'}    # 向 URL 发起 GET 请求    response = requests.get(url, headers=headers)    # 如果下载失败(例如,404 Not Found),则抛出错误    response.raise_for_status()    # 将原始 HTML 内容保存到文件中以便检查    withopen(doc_path_raw, 'w', encoding='utf-8') as f:        f.write(response.text)    print(f"Raw document saved to {doc_path_raw}")    # 使用 BeautifulSoup 解析和清理 HTML 内容    soup = BeautifulSoup(response.content, 'html.parser')    # 从常见的 HTML 标签中提取文本,并尝试保留段落结构    text = ''    for p in soup.find_all(['p', 'div', 'span']):        # 获取每个标签的文本,去除多余的空白,并添加换行符        text += p.get_text(strip=True) + '\n\n'    # 使用正则表达式清理多余的换行符和空格,以获得更干净的最终文本    clean_text = re.sub(r'\n{3,}', '\n\n', text).strip() # 将 3 个及以上的换行符合并为 2 个    clean_text = re.sub(r'\s{2,}', ' ', clean_text).strip() # 将 2 个及以上的空格合并为 1 个    # 将最终清理后的文本保存到 .txt 文件中    withopen(doc_path_clean, 'w', encoding='utf-8') as f:        f.write(clean_text)    print(f"Cleaned text content extracted and saved to {doc_path_clean}")

代码相当容易理解,我们使用 beautifulsoup4 来解析 HTML 内容并提取文本。它帮助我们轻松地导航 HTML 结构,检索相关信息,同时忽略像脚本或样式这样不必要的元素。

现在,让我们执行它,看看效果如何。

print("Downloading and parsing NVIDIA's 2023 10-K filing...")# 执行下载和解析函数download_and_parse_10k(url_10k, doc_path_raw, doc_path_clean)# 打开清理后的文件并打印一个样本以验证结果withopen(doc_path_clean, 'r', encoding='utf-8') as f:    print("\n--- Sample content from cleaned 10-K ---")    print(f.read(1000) + "...")#### OUTPUT ####Downloading and parsing NVIDIA 202310-K filing...Successfully downloaded 10-K filing from https://www.sec.gov/Archives/edgar/data/1045810/000104581023000017/nvda-20230129.htmRaw document saved to ./data/nvda_10k_2023_raw.htmlCleaned text content extracted and saved to ./data/nvda_10k_2023_clean.txt# --- Sample content from cleaned 10-K ---Item 1. Business.  OVERVIEW  NVIDIA is the pioneer of accelerated computing. We are a full-stack computing company with a platform strategy that brings together hardware, systems, software, algorithms, libraries, and services to create unique value for the markets we serve. Our work in accelerated computing and AI is reshaping the worlds largest industries and profoundly impacting society.  Founded in1993, we started as a PC graphics chip company, inventing the graphics processing unit, or GPU. The GPU was essential for the growth of the PC gaming market and has since been repurposed to revolutionize computer graphics, high performance computing, or HPC, and AI.  The programmability of our GPUs made them ...

我们只是简单地调用这个函数,将所有内容存储在一个 txt 文件中,这个文件将作为我们 RAG 流水线的上下文。

当我们运行上述代码时,你可以看到它开始为我们下载报告,并且我们能看到下载内容的样本是什么样的。

理解我们的多源、多跳查询

为了测试我们实现的流水线并将其与基础 RAG 进行比较,我们需要使用一个非常复杂的查询,它涵盖了我们正在处理的文档的不同方面。

我们的复杂查询:"根据 NVIDIA 的 2023 年 10-K 文件,找出其与竞争相关的关键风险。然后,查找关于 AMD 的 AI 芯片战略的最新消息(文件发布后,即 2024 年的新闻),并解释这一新战略如何直接应对或加剧了 NVIDIA 所陈述的风险之一。"

让我们来分析一下为什么这个查询对于一个标准的 RAG 流水线来说如此困难:

    1. 多跳推理 (Multi-Hop Reasoning): 它无法一步到位地回答。系统必须首先识别风险,然后找到关于 AMD 的新闻,最后将两者综合起来。
    1. 多源知识 (Multi-Source Knowledge): 所需信息存在于两个完全不同的地方。风险在我们静态的内部文档(10-K 文件)中,而关于 AMD 的新闻则是外部的,需要访问实时网络。
    1. 综合与分析 (Synthesis and Analysis): 查询并非要求简单地罗列事实。它要求解释一组事实如何使另一组事实恶化,这是一项需要真正综合能力的任务。

在下一节中,我们将实现一个基础的 RAG 流水线,并实际看看简单的 RAG 是如何在这个问题上失败的。

构建一个注定会失败的浅层 RAG 流水线

既然我们已经配置好了环境,并准备好了具有挑战性的知识库,我们下一个合乎逻辑的步骤就是构建一个标准的原生 RAG 流水线。这样做有一个至关重要的目的……

通过首先构建最简单的解决方案,我们可以用我们的复杂查询来测试它,并确切地观察它如何以及为何会失败。

在本节中,我们将要做以下事情:

浅层 RAG 流水线

  • 加载并分块文档: 我们将摄入清理后的 10-K 文件,并将其分割成小的、固定大小的块——这是一种常见但在语义上较为粗糙的方法。
  • 创建向量存储: 然后,我们将对这些块进行嵌入,并在 ChromaDB 向量存储中建立索引,以实现基本的语义搜索。
  • 组装 RAG 链: 我们将使用 LangChain 表达式语言(LangChain Expression Language, LCEL),它将我们的检索器、提示模板和一个 LLM 连接成一个线性的流水线。
  • 展示关键失败点: 我们将用我们的多跳、多源查询来执行这个简单的系统,并分析其不充分的响应。

首先,我们需要加载清理后的文档并进行分割。我们将使用 RecursiveCharacterTextSplitter,这是 LangChain 生态系统中的一个标准工具。

from langchain_community.document_loaders import TextLoader # 一个用于 .txt 文件的简单加载器from langchain.text_splitter import RecursiveCharacterTextSplitter # 一个标准的文本分割器print("Loading and chunking the document...")# 使用我们清理后的 10-K 文件的路径初始化加载器loader = TextLoader(doc_path_clean, encoding='utf-8')# 将文档加载到内存中documents = loader.load()# 使用定义的块大小和重叠来初始化文本分割器# chunk_size=1000: 每个块大约 1000 个字符长。# chunk_overlap=150: 每个块将与前一个块共享 150 个字符以保持一定的上下文。text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=150)# 将加载的文档分割成更小、更易于管理的块doc_chunks = text_splitter.split_documents(documents)print(f"Document loaded and split into {len(doc_chunks)} chunks.")#### OUTPUT ####Loading and chunking the document...Document loaded and split into 378 chunks.

我们的主文档被分成了 378 个块,下一步是让它们变得可搜索。为此,我们需要创建向量嵌入并将它们存储在数据库中。我们将使用 ChromaDB,一个流行的内存向量存储,以及我们配置中定义的 OpenAI text-embedding-3-small 模型。

from langchain_community.vectorstores import Chroma # 我们将使用的向量存储from langchain_openai import OpenAIEmbeddings # 用于创建嵌入的函数print("Creating baseline vector store...")# 使用我们配置中指定的模型初始化嵌入函数embedding_function = OpenAIEmbeddings(model=config['embedding_model'])# 从我们的文档块创建 Chroma 向量存储# 这个过程会为每个块创建一个嵌入,并将其索引化。baseline_vector_store = Chroma.from_documents(    documents=doc_chunks,    embedding=embedding_function)# 从向量存储创建一个检索器# 检索器是实际执行搜索的组件。# search_kwargs={"k": 3}: 这告诉检索器对于任何给定的查询,返回最相关的 3 个块。baseline_retriever = baseline_vector_store.as_retriever(search_kwargs={"k": 3})print(f"Vector store created with {baseline_vector_store._collection.count()} embeddings.")#### OUTPUT ####Creating baseline vector store...Vector store created with378 embeddings.

Chroma.from_documents 组织了这个过程,并将所有向量存储在一个可搜索的索引中。最后一步是使用 LangChain 表达式语言(LCEL)将它们组装成一个单一、可运行的 RAG 链。

这个链将定义数据的线性流程:从用户的问题到检索器,然后到提示,最后到 LLM。

from langchain_core.prompts import ChatPromptTemplate # 用于创建提示模板from langchain_openai import ChatOpenAI # OpenAI 聊天模型接口from langchain_core.runnable import RunnablePassthrough # 一个用于在链中传递输入的工具from langchain_core.output_parsers import StrOutputParser # 用于将 LLM 的输出解析为简单字符串# 这个模板指示 LLM 如何行事。# {context}: 这里我们将注入从检索到的文档中获得的内容。# {question}: 这里将放入用户的原始问题。template = """You are an AI financial analyst. Answer the question based only on the following context:{context}Question: {question}"""prompt = ChatPromptTemplate.from_template(template)# 我们使用配置中定义的 'fast_llm' 来完成这个简单的任务llm = ChatOpenAI(model=config["fast_llm"], temperature=0)# 一个辅助函数,用于将检索到的文档列表格式化为单个字符串defformat_docs(docs):    return"\n\n---\n\n".join(doc.page_content for doc in docs)# 使用 LCEL 的管道(|)语法定义的完整 RAG 链baseline_rag_chain = (    # 第一步是一个字典,定义了我们提示的输入    {"context": baseline_retriever | format_docs, "question": RunnablePassthrough()}    # context 是通过将问题传递给检索器并格式化结果来生成的    # 原始问题则原封不动地传递下去    | prompt # 然后这个字典被传递给提示模板    | llm # 格式化后的提示被传递给语言模型    | StrOutputParser() # LLM 的输出消息被解析为字符串)

你可能已经注意到,我们将一个字典定义为第一步。它的 context 键由一个子链填充:输入的问题进入 baseline_retriever,其输出(一个 Document 对象列表)由 format_docs 格式化为单个字符串。question 键则通过使用 RunnablePassthrough 简单地传递原始输入来填充。

让我们运行这个简单的流水线,并理解它在哪些地方失败了。

from rich.console import Console # 用于以 markdown 格式美观地打印输出from rich.markdown import Markdown# 初始化 rich 控制台以获得更好的输出格式console = Console()# 我们的复杂、多跳、多源查询complex_query_adv = "Based on NVIDIA's 2023 10-K filing, identify their key risks related to competition. Then, find recent news (post-filing, from 2024) about AMD's AI chip strategy and explain how this new strategy directly addresses or exacerbates one of NVIDIA's stated risks."print("Executing complex query on the baseline RAG chain...")# 使用我们具有挑战性的查询调用该链baseline_result = baseline_rag_chain.invoke(complex_query_adv)console.print("\n--- BASELINE RAG FAILED OUTPUT ---")# 使用 markdown 格式打印结果以提高可读性console.print(Markdown(baseline_result))

当你运行上述代码时,我们会得到以下输出。

#### OUTPUT ####Executing complex query on the baseline RAG chain...--- BASELINE RAG FAILED OUTPUT ---Based on the provided context, NVIDIA operates in an intensely competitive semiconductorindustry and faces competition from companies like AMD. The context mentionsthat the industry is characterized by rapid technological change. However, the provided documents do not contain any specific information about AMD's recent AI chip strategy from 2024 or how it might impact NVIDIA's stated risks.

在这个失败的 RAG 流水线及其输出中,你可能已经注意到了三件事。

  • 不相关的上下文: 检索器抓取了关于“NVIDIA”、“竞争”和“AMD”的一般性文本块,但错过了关于 2024 年 AMD 战略的具体细节。
  • 信息缺失: 关键的失败在于 2023 年的数据无法覆盖 2024 年的事件。系统没有意识到自己缺少关键信息。
  • 没有规划或工具使用: 将复杂查询当作简单问题处理。无法将其分解为步骤,也无法使用像网络搜索这样的工具来填补信息空白。

系统之所以失败,不是因为 LLM 愚蠢,而是因为架构过于简单。它是一个线性的、一次性的过程,却试图解决一个循环的、多步骤的问题。

既然我们已经理解了基础 RAG 流水线的问题所在,现在我们可以开始实现我们的深度思考方法,并看看它能多好地解决我们的复杂查询。

在这里插入图片描述

为中央智能体系统定义 RAG 状态

要构建我们的推理智能体,我们首先需要一种管理其状态的方法。在我们简单的 RAG 链中,每一步都是无状态的,但是……

一个智能的智能体需要记忆。它需要记住最初的问题、它创建的计划以及到目前为止收集到的证据。

RAG 状态

RAGState 将作为一个中央记忆库,在我们 LangGraph 工作流的每个节点之间传递。为了构建它,我们将定义一系列结构化的数据类,从最基本的构建块开始:一个研究计划中的单一步骤。

我们希望定义智能体计划的原子单位。每个 Step 不仅必须包含一个要回答的问题,还必须包含其背后的推理,以及至关重要的,智能体应该使用的具体工具。这迫使智能体的规划过程变得明确和结构化。

from langchain_core.documents import Documentfrom langchain_core.pydantic_v1 import BaseModel, Field# 用于表示智能体推理计划中单一步骤的 Pydantic 模型class Step(BaseModel):    # 此研究步骤的一个具体的、可回答的子问题    sub_question: str = Field(description="A specific, answerable question for this step.")    # 智能体对为何此步骤是必要的的论证    justification: str = Field(description="A brief explanation of why this step is necessary to answer the main query.")    # 此步骤要使用的具体工具:内部文档搜索或外部网络搜索    tool: Literal["search_10k", "search_web"] = Field(description="The tool to use for this step.")    # 一系列关键关键词,以提高搜索的准确性    keywords: List[str] = Field(description="A list of critical keywords for searching relevant document sections.")    # (可选) 一个可能的文档章节,用于执行更具针对性的、过滤后的搜索    document_section: Optional[str] = Field(description="A likely document section title (e.g., 'Item 1A. Risk Factors') to search within. Only for 'search_10k' tool.")

我们的 Step 类使用 Pydantic BaseModel,作为我们规划器智能体(Planner Agent)的严格契约。tool: Literal[...] 字段强制 LLM 在使用我们的内部知识(search_10k)和寻求外部信息(search_web)之间做出具体决策。

这种结构化的输出远比尝试解析自然语言计划要可靠得多。

现在我们已经定义了一个单独的 Step,我们需要一个容器来存放整个步骤序列。我们将创建一个 Plan 类,它只是一个 Step 对象的列表。这代表了智能体完整的、端到端的研究策略。

# 用于表示整个计划的 Pydantic 模型,它是一个包含多个独立步骤的列表class Plan(BaseModel):    # 一个 Step 对象的列表,概述了完整的系列研究计划    steps: List[Step] = Field(description="A detailed, multi-step plan to answer the user's query.")

我们编写的 Plan 类将为整个研究过程提供结构。当我们调用我们的规划器智能体时,我们会要求它返回一个符合此模式的 JSON 对象。这确保了智能体的策略在任何检索操作执行之前都是清晰、有序且机器可读的。

接下来,当我们的智能体执行其计划时,它需要一种方式来记住它学到了什么。我们将定义一个 PastStep 字典来存储每个已完成步骤的结果。这将构成智能体的研究历史实验笔记

# 一个 TypedDict,用于在我们的研究历史中存储已完成步骤的结果class PastStep(TypedDict):    step_index: int              # 已完成步骤的索引(例如,1, 2, 3)    sub_question: str            # 在此步骤中解决的子问题    retrieved_docs: List[Document] # 为此步骤检索和重排序的精确文档    summary: str                 # 智能体对此步骤发现的一句话总结

这个 PastStep 结构对于智能体的自我批判循环至关重要。在每一步之后,我们将填充一个这样的字典并将其添加到我们的状态中。然后,智能体将能够审查这个不断增长的摘要列表,以了解它所知道的内容,并决定是否拥有足够的信息来完成其任务。

最后,我们将所有这些部分整合到主 RAGState 字典中。这是将流经我们整个图的核心对象,它持有原始查询、完整计划、过去步骤的历史,以及正在执行的当前步骤的所有中间数据。

# 将在我们 LangGraph 智能体所有节点之间传递的主状态字典class RAGState(TypedDict):    original_question: str     # 用户发起的、启动整个流程的初始复杂查询    plan: Plan                 # 由规划器智能体生成的多步骤计划    past_steps: List[PastStep] # 已完成研究步骤及其发现的累积历史    current_step_index: int    # 正在执行的计划中当前步骤的索引    retrieved_docs: List[Document] # 当前步骤中检索到的文档(广泛召回的结果)    reranked_docs: List[Document]  # 当前步骤中经过精确重排序后的文档    synthesized_context: str   # 从重排序后的文档中生成的简洁、蒸馏的上下文    final_answer: str          # 对用户原始问题的最终、综合性答案

这个 RAGState TypedDict 是我们智能体的完整心智。我们图中的每个节点都将接收这个字典作为输入,并返回一个更新后的版本作为输出。

例如,plan_node 将填充 plan 字段,retrieval_node 将填充 retrieved_docs 字段,依此类推。这种共享的、持久的状态使得我们简单的 RAG 链所缺乏的复杂、迭代的推理成为可能。

随着我们智能体记忆蓝图的定义完成,我们准备构建我们系统的第一个认知组件:将填充此状态的规划器智能体。

战略规划与查询构建

在定义了我们的 RAGState 之后,我们现在可以构建我们智能体的第一个,也可以说是最关键的认知组件:它的规划能力。这是我们的系统从一个简单的数据获取器跃升为真正的推理引擎的地方。我们的智能体不会天真地将用户的复杂查询视为单次搜索,而是会首先停下来思考,并构建一个详细的、分步的研究策略。

战略规划

本节分为三个关键的工程步骤:

  • 具备工具意识的规划器 (The Tool-Aware Planner): 我们将构建一个由 LLM 驱动的智能体,其唯一的工作就是将用户的查询分解为一个结构化的 Plan 对象,并为每一步决定使用哪种工具。
  • 查询重写器 (The Query Rewriter): 我们将创建一个专门的智能体,将规划器的简单子问题转化为高效、优化的搜索查询。
  • 元数据感知的文本分块 (Metadata-Aware Chunking): 我们将重新处理我们的源文档,以添加章节级别的元数据,这是解锁高精度、过滤式检索的关键一步。

使用具备工具意识的规划器分解问题

基本上,我们要构建我们操作的大脑。当这个大脑收到一个复杂问题时,它需要做的第一件事就是制定一个行动计划。

分解步骤

我们不能 просто 将整个问题扔给我们的数据库,然后期望得到最好的结果。我们需要教会智能体如何将问题分解成更小、更易于管理的部分。

为此,我们将创建一个专门的规划器智能体(Planner Agent)。我们需要给它一套非常清晰的指令,或者说是一个提示(prompt),告诉它它的工作究竟是什么。

from langchain_core.prompts import ChatPromptTemplatefrom langchain_openai import ChatOpenAIfrom rich.pretty import pprint as rprint# 指示 LLM 如何作为规划器行事的系统提示planner_prompt = ChatPromptTemplate.from_messages([    ("system", """You are an expert research planner. Your task is to create a clear, multi-step plan to answer a complex user query by retrieving information from multiple sources.You have two tools available:1. `search_10k`: Use this to search for information within NVIDIA's 2023 10-K financial filing. This is best for historical facts, financial data, and stated company policies or risks from that specific time period.2. `search_web`: Use this to search the public internet for recent news, competitor information, or any topic that is not specific to NVIDIA's 2023 10-K.Decompose the user's query into a series of simple, sequential sub-questions. For each step, decide which tool is more appropriate.For `search_10k` steps, also identify the most likely section of the 10-K (e.g., 'Item 1A. Risk Factors', 'Item 7. Management's Discussion and Analysis...').It is critical to use the exact section titles found in a 10-K filing where possible."""),    ("human", "User Query: {question}") # 用户的原始、复杂查询])

我们基本上是赋予 LLM 一个新的角色:专家级研究规划师。我们明确告诉它它拥有两种工具(search_10ksearch_web),并指导它何时使用每一种。这就是“工具感知”的部分。

我们不只是要求它制定一个计划,而是要求它创建一个直接映射到我们所构建能力的计划。

现在我们可以初始化推理模型,并将其与我们的提示链接起来。这里一个非常重要的步骤是告诉 LLM,它的最终输出必须是我们 Pydantic Plan 类的格式。这使得输出结构化且可预测。

# 初始化我们强大的推理模型,如配置中所定义reasoning_llm = ChatOpenAI(model=config["reasoning_llm"], temperature=0)# 通过将提示管道连接到 LLM 并指示其使用我们的结构化 'Plan' 输出,来创建规划器智能体planner_agent = planner_prompt | reasoning_llm.with_structured_output(Plan)print("Tool-Aware Planner Agent created successfully.")# 让我们用我们的复杂查询来测试规划器智能体,看看它的输出print("\n--- Testing Planner Agent ---")test_plan = planner_agent.invoke({"question": complex_query_adv})# 使用 rich 的 pretty print 功能来清晰、可读地显示 Pydantic 对象rprint(test_plan)

我们将 planner_prompt 通过管道传递给我们强大的 reasoning_llm,然后使用 .with_structured_output(Plan) 方法。这告诉 LangChain 使用模型的功能调用能力,将其响应格式化为一个与我们的 Plan Pydantic 模式完美匹配的 JSON 对象。这比尝试解析纯文本响应要可靠得多。

让我们看看当我们用我们的挑战性查询测试它时的输出。

#### OUTPUT ####Tool-Aware Planner Agent created successfully.--- Testing Planner Agent ---Plan(│   steps=[│   │   Step(│   │   │   sub_question="What are the key risks related to competition as stated in NVIDIA's 2023 10-K filing?",│   │   │   justification="This step is necessary to extract the foundational information about competitive risks directly from the source document as requested by the user.",│   │   │   tool='search_10k',│   │   │   keywords=['competition', 'risk factors', 'semiconductor industry', 'competitors'],│   │   │   document_section='Item 1A. Risk Factors'│   │   ),│   │   Step(│   │   │   sub_question="What are the recent news and developments in AMD's AI chip strategy in 2024?",│   │   │   justification="This step requires finding up-to-date, external information that is not available in the 2023 10-K filing. A web search is necessary to get the latest details on AMD's strategy.",│   │   │   tool='search_web',│   │   │   keywords=['AMD', 'AI chip strategy', '2024', 'MI300X', 'Instinct accelerator'],│   │   │   document_section=None│   │   )│   ])

如果我们看输出,你会发现智能体不只是给了我们一个模糊的计划,它产生了一个结构化的 Plan 对象。它正确地识别出查询有两个部分。

    1. 对于第一部分,它知道答案在 10-K 文件中,并选择了 search_10k 工具,甚至正确地猜到了正确的文档章节。
    1. 对于第二部分,它知道“2024 年的新闻”不可能出现在 2023 年的文档中,并正确地选择了 search_web 工具。这是我们的流水线在思考过程中至少会给出有希望结果的第一个迹象。

通过查询重写智能体优化检索

好了,我们现在有了一个包含良好子问题的计划。

但像“风险有哪些?”这样的问题并不是一个好的搜索查询。它太笼统了。搜索引擎,无论是向量数据库还是网络搜索,对于具体、关键词丰富的查询效果最好。

查询重写智能体

为了解决这个问题,我们将构建另一个小型的、专门化的智能体:查询重写器 (Query Rewriter)。它唯一的工作就是获取当前步骤的子问题,并通过添加我们已经学到的相关关键词和上下文,使其更适合搜索。

首先,让我们为这个新智能体设计提示。

from langchain_core.output_parsers import StrOutputParser # 用于将 LLM 的输出解析为简单字符串# 我们查询重写器的提示,指示它扮演搜索专家的角色query_rewriter_prompt = ChatPromptTemplate.from_messages([    ("system", """You are a search query optimization expert. Your task is to rewrite a given sub-question into a highly effective search query for a vector database or web search engine, using keywords and context from the research plan.The rewritten query should be specific, use terminology likely to be found in the target source (a financial 10-K or news articles), and be structured to retrieve the most relevant text snippets."""),    ("human", "Current sub-question: {sub_question}\n\nRelevant keywords from plan: {keywords}\n\nContext from past steps:\n{past_context}")])

我们基本上是在告诉这个智能体要像一个搜索查询优化专家一样行事。我们给它三部分信息来处理:简单的 sub_question、我们规划器已经识别出的 keywords,以及来自任何先前研究步骤的 past_context。这为它构建一个更好的查询提供了所有必要的原材料。

现在我们可以初始化这个智能体了。这是一个简单的链,因为我们只需要一个字符串作为输出。

# 通过将提示管道连接到我们的推理 LLM 和一个字符串输出解析器来创建智能体query_rewriter_agent = query_rewriter_prompt | reasoning_llm | StrOutputParser()print("Query Rewriter Agent created successfully.")# 让我们来测试重写器智能体。我们将假装已经完成了我们计划的前两个步骤。print("\n--- Testing Query Rewriter Agent ---")# 假设我们正处于一个需要前两步上下文的最终综合步骤。test_sub_q = "How does AMD's 2024 AI chip strategy potentially exacerbate the competitive risks identified in NVIDIA's 10-K?"test_keywords = ['impact', 'threaten', 'competitive pressure', 'market share', 'technological change']# 我们创建一些模拟的“过去上下文”来模拟智能体在真实运行中此时会知道什么。test_past_context = "Step 1 Summary: NVIDIA's 10-K lists intense competition and rapid technological change as key risks. Step 2 Summary: AMD launched its MI300X AI accelerator in 2024 to directly compete with NVIDIA's H100."# 使用我们的测试数据调用智能体rewritten_q = query_rewriter_agent.invoke({    "sub_question": test_sub_q,    "keywords": test_keywords,    "past_context": test_past_context})print(f"Original sub-question: {test_sub_q}")print(f"Rewritten Search Query: {rewritten_q}")

为了正确测试这个,我们必须模拟一个真实场景。我们创建了一个 test_past_context 字符串,它代表了智能体已经从其计划的前两个步骤中生成的摘要。然后我们将这个,连同下一个子问题,一起喂给我们的 query_rewriter_agent

让我们看看结果。

#### OUTPUT ####Query Rewriter Agent created successfully.--- Testing Query Rewriter Agent ---Original sub-question: How does AMD 2024 AI chip strategy potentially exacerbate the competitive risks identified in NVIDIA 10-K?Rewritten Search Query: analysis of how AMD 2024 AI chip strategy, including products like the MI300X, exacerbates NVIDIA's stated competitive risks such as rapid technological change and market share erosion in the data center and AI semiconductor industry

原始问题是给分析师的,而重写后的查询是给搜索引擎的。它被赋予了诸如“MI300X”、“市场份额侵蚀”和“数据中心”等特定术语,所有这些都是从关键词和过去的上下文中综合出来的。

像这样的查询更有可能检索到完全正确的文档,使我们的整个系统更加准确和高效。这个重写步骤将是我们主要智能体循环中的一个关键部分。

利用元数据感知的分块实现精确检索

所以,基本上,我们的规划器智能体给了我们一个很好的机会。它不只是说找到风险,它给出了一个提示:在“Item 1A. Risk Factors”章节中寻找风险

但目前,我们的检索器无法使用这个提示。我们的向量存储只是一个包含 378 个文本块的庞大、扁平的列表。它根本不知道什么是“章节”。

元数据感知的分块

我们需要解决这个问题。我们将从头开始重建我们的文档块。这一次,对于我们创建的每一个块,我们都会添加一个标签标记——它的元数据——告诉我们的系统它究竟来自 10-K 文件的哪个章节。这将使我们的智能体能够在之后执行高精度的、过滤后的搜索。

首先,我们需要一种方法来以编程方式找到每个章节在我们原始文本文件中的起始位置。如果我们查看文档,可以发现一个清晰的模式:每个主要章节都以单词“ITEM”开头,后跟一个数字,如“ITEM 1A”或“ITEM 7”。这非常适合使用正则表达式来处理。

# 这个正则表达式旨在查找 10-K 文本中像 'ITEM 1A.' 或 'ITEM 7.' 这样的章节标题。# 它寻找单词 'ITEM',后跟一个空格、一个数字、一个可选的字母、一个句点,然后捕获标题文本。# `re.IGNORECASE | re.DOTALL` 标志使搜索不区分大小写,并允许 '.' 匹配换行符。section_pattern = r"(ITEM\\s+\\d[A-Z]?\\.\\s*.*?)(?=\\nITEM\\s+\\d[A-Z]?\\.|$)"

我们基本上是创建了一个模式,它将作为我们的章节探测器。它的设计应该足够灵活,以捕捉不同的格式,同时又足够具体,不会抓取错误的文本。

现在我们可以使用这个模式将我们的文档切分成两个独立的列表:一个只包含章节标题,另一个包含每个章节内的内容。

# 我们将使用之前从 Document 对象中加载的原始文本raw_text = documents[0].page_content# 使用 re.findall 应用我们的模式,并将所有章节标题提取到一个列表中section_titles = re.findall(section_pattern, raw_text, re.IGNORECASE | re.DOTALL)# 一个快速的清理步骤,去除标题中任何多余的空白或换行符section_titles = [title.strip().replace('\\n', ' ') for title in section_titles]# 现在,使用 re.split 在每个章节标题出现的地方将文档分割开sections_content = re.split(section_pattern, raw_text, flags=re.IGNORECASE | re.DOTALL)# 分割结果是一个混合了标题和内容的列表,所以我们过滤它以只获取内容部分sections_content = [content.strip() for content in sections_content if content.strip() andnot content.strip().lower().startswith('item ')]print(f"Identified {len(section_titles)} document sections.")# 这是一个至关重要的健全性检查:如果标题的数量与内容块的数量不匹配,说明出了问题。assertlen(section_titles) == len(sections_content), "Mismatch between titles and content sections"

这是一种解析半结构化文档的非常有效的方法。我们使用了两次正则表达式模式:一次是为了获得一个干净的所有章节标题列表,另一次是为了将主文本分割成一个内容块列表。assert 语句让我们相信我们的解析逻辑是可靠的。

好了,现在我们有了各个部分:一个标题列表和一个相应的内容列表。我们现在可以遍历它们,创建我们最终的、富含元数据的文本块。

import uuid # 我们将用它为每个块赋予一个唯一的 ID,这是一个好习惯# 这个列表将存放我们新的、富含元数据的文档块doc_chunks_with_metadata = []# 使用 enumerate 遍历每个章节的内容及其标题for i, content inenumerate(sections_content):    # 获取当前内容块对应的标题    section_title = section_titles[i]    # 使用和之前一样的文本分割器,但这次,我们只在当前章节的内容上运行它    section_chunks = text_splitter.split_text(content)        # 现在,遍历从这一个章节创建的更小的块    for chunk in section_chunks:        # 为这个特定的块生成一个唯一的 ID        chunk_id = str(uuid.uuid4())        # 为这个块创建一个新的 LangChain Document 对象        doc_chunks_with_metadata.append(            Document(                page_content=chunk,                # 这是最重要的部分:我们附加元数据                metadata={                    "section": section_title,      # 这个块所属的章节                    "source_doc": doc_path_clean,  # 文档来源                    "id": chunk_id                 # 这个块的唯一 ID                }            )        )print(f"Created {len(doc_chunks_with_metadata)} chunks with section metadata.")print("\n--- Sample Chunk with Metadata ---")# 为了证明它奏效了,我们找一个我们知道应该在'Risk Factors'章节的块并打印出来sample_chunk = next(c for c in doc_chunks_with_metadata if"Risk Factors"in c.metadata.get("section", ""))print(sample_chunk)

这是我们升级的核心。我们逐一遍历每个章节。对于每个章节,我们创建我们的文本块。但在将它们添加到我们的最终列表之前,我们创建一个 metadata 字典并附加上 section_title。这有效地为每一个块都打上了其来源的标签。

让我们看看输出,并比较一下差异。

#### OUTPUT ####Processing document and adding metadata...Identified 22 document sections.Created 381 chunks with section metadata.--- Sample Chunk with Metadata ---Document(│   page_content='Our industry is intensely competitive. We operate in the semiconductor\\nindustry, which is intensely competitive and characterized by rapid\\ntechnological change and evolving industry standards. We compete with a number of\\ncompanies that have different business models and different combinations of\\nhardware, software, and systems expertise, many of which have substantially\\ngreater resources than we have. We expect competition to increase from existing\\ncompetitors, as well as new and emerging companies. Our competitors include\\nIntel, AMD, and Qualcomm; cloud service providers, or CSPs, such as Amazon Web\\nServices, or AWS, Google Cloud, and Microsoft Azure; and various companies\\ndeveloping or that may develop processors or systems for the AI, HPC, data\\ncenter, gaming, professional visualization, and automotive markets. Some of our\\ncustomers are also our competitors. Our business could be materially and\\nadversely affected if our competitors announce or introduce new products, services,\\nor technologies that have better performance or features, are less expensive, or\\nthat gain market acceptance.',│   metadata={│   │   'section': 'Item 1A. Risk Factors.',│   │   'source_doc': './data/nvda_10k_2023_clean.txt',│   │   'id': '...'│   })

看看那个 metadata 块。和之前相同的文本块现在附加上了一段上下文信息:'section': 'Item 1A. Risk Factors.'

现在,当我们的智能体需要寻找风险时,它可以告诉检索器,“嘿,不要搜索全部 381 个块。只搜索那些章节元数据是‘Item 1A. Risk Factors’的块”。

这个简单的改变将我们的检索器从一个迟钝的工具转变为一个外科手术刀,这也是构建真正生产级别 RAG 系统的关键原则。

创建多阶段检索漏斗

到目前为止,我们已经设计了一个智能的规划器,并用元数据丰富了我们的文档。现在,我们准备构建我们系统的核心:一个复杂的检索流水线。

一个简单的、一次性的语义搜索已经不够好了。对于一个生产级别的智能体,我们需要一个既自适应多阶段的检索过程。

我们将我们的检索过程设计成一个漏斗,其中每个阶段都会精炼前一阶段的结果:

多阶段漏斗

  • 检索监督器 (The Retrieval Supervisor): 我们将构建一个新的监督器智能体,它作为一个动态路由器,分析每个子问题并选择最佳的搜索策略(向量、关键词或混合搜索)。
  • 阶段 1(广泛召回): 我们将实现我们的监督器可以选择的不同检索策略,重点是撒下一张大网,捕捉所有可能相关的文档。
  • 阶段 2(高精度): 我们将使用一个交叉编码器(Cross-Encoder)模型来重排序初步结果,剔除噪音并将最相关的文档提升到顶部。
  • 阶段 3(综合): 最后,我们将创建一个蒸馏智能体 (Distiller Agent),将排名靠前的文档压缩成一个单一、简洁的段落,作为我们下游智能体的上下文。

使用监督器动态选择策略

基本上,并非所有的搜索查询都是一样的。像“‘计算与网络’部门的收入是多少?”这样的问题包含具体的、精确的术语。基于关键词的搜索对此非常完美。

但像……

“公司对市场竞争的情绪如何?”这样的问题是概念性的。基于语义的、向量的搜索会好得多。

监督智能体

我们不会硬编码一种策略,而是将构建一个小型、智能的智能体,即检索监督器,来为我们做这个决定。它唯一的工作就是查看搜索查询,并决定我们的哪种检索方法最合适。

首先,我们需要定义我们的监督器可以做出的可能决策。我们将使用一个 Pydantic BaseModel 来结构化其输出。

class RetrievalDecision(BaseModel):    # 选择的检索策略。必须是这三个选项之一。    strategy: Literal["vector_search", "keyword_search", "hybrid_search"]    # 智能体对其选择的理由。    justification: str

监督器必须选择这三种策略中的一种,并解释其推理过程。这使其决策过程透明且可靠。

现在,让我们创建将指导该智能体行为的提示。

retrieval_supervisor_prompt = ChatPromptTemplate.from_messages([    ("system", """You are a retrieval strategy expert. Based on the user's query, you must decide the best retrieval strategy.You have three options:1. `vector_search`: Best for conceptual, semantic, or similarity-based queries.2. `keyword_search`: Best for queries with specific, exact terms, names, or codes (e.g., 'Item 1A', 'Hopper architecture').3. `hybrid_search`: A good default that combines both, but may be less precise than a targeted strategy."""),    ("human", "User Query: {sub_question}") # 重写后的搜索查询将在此处传递。])

我们在这里创建了一个非常直接的提示,告诉 LLM 它的角色是检索策略专家,并清楚地解释了其可用的每种策略在何时最为有效。

最后,我们可以组装我们的监督器智能体。

# 通过将我们的提示传递给推理 LLM 并使用我们的 Pydantic 类结构化其输出来创建智能体retrieval_supervisor_agent = retrieval_supervisor_prompt | reasoning_llm.with_structured_output(RetrievalDecision)print("Retrieval Supervisor Agent created.")# 让我们用两种不同类型的查询来测试它,看看它的行为如何print("\n--- Testing Retrieval Supervisor Agent ---")query1 = "revenue growth for the Compute & Networking segment in fiscal year 2023"decision1 = retrieval_supervisor_agent.invoke({"sub_question": query1})print(f"Query: '{query1}'")print(f"Decision: {decision1.strategy}, Justification: {decision1.justification}")query2 = "general sentiment about market competition and technological innovation"decision2 = retrieval_supervisor_agent.invoke({"sub_question": query2})print(f"\nQuery: '{query2}'")print(f"Decision: {decision2.strategy}, Justification: {decision2.justification}")

在这里,我们将所有部分连接起来。

我们的 .with_structured_output(RetrievalDecision) 再次承担了繁重的工作,确保我们从 LLM 那里得到一个干净、可预测的 RetrievalDecision 对象。让我们看看测试结果。

#### OUTPUT ####Retrieval Supervisor Agent created.# --- Testing Retrieval Supervisor Agent ---Query: 'revenue growth for the Compute & Networking segment in fiscal year 2023'Decision: keyword_search, Justification: The query contains specific keywords like 'revenue growth', 'Compute & Networking', and 'fiscal year 2023' which are ideal for a keyword-based search to find exact financial figures.Query: 'general sentiment about market competition and technological innovation'Decision: vector_search, Justification: This query is conceptual and seeks to understand sentiment and broader themes. Vector search is better suited to capture the semantic meaning of 'market competition' and 'technological innovation' rather than relying on exact keywords.

我们可以看到,它正确地识别出第一个查询充满了特定术语,并选择了 keyword_search

对于第二个概念性和抽象性的查询,它正确地选择了 vector_search。在我们检索漏斗开始时的这种动态决策,是对“一刀切”方法的一个很好的升级。

通过混合、关键词和语义搜索实现广泛召回

既然我们有了一个监督器来选择我们的策略,我们就需要构建检索策略本身。我们漏斗的这个第一阶段完全是关于召回(Recall)——我们的目标是撒下一张大网,捕捉每一个可能相关的文档,即使我们在此过程中会拾取一些噪音。

广泛召回

为此,我们将实现三个我们的监督器可以调用的不同搜索函数:

    1. 向量搜索 (Vector Search): 我们的标准语义搜索,但现在升级为使用元数据过滤器。
    1. 关键词搜索 (Keyword Search, BM25): 一种经典的、强大的算法,擅长查找包含特定、精确术语的文档。
    1. 混合搜索 (Hybrid Search): 一种两全其美的方法,它使用一种称为倒数排序融合(Reciprocal Rank Fusion, RRF)的技术结合了向量搜索和关键词搜索的结果。

首先,我们需要使用我们在上一节中创建的富含元数据的块来创建一个新的、高级的向量存储。

import numpy as np # Python 中用于数值运算的基础库from rank_bm25 import BM25Okapi # 用于实现 BM25 关键词搜索算法的库print("Creating advanced vector store with metadata...")# 我们创建一个新的 Chroma 向量存储,这次使用我们富含元数据的块advanced_vector_store = Chroma.from_documents(    documents=doc_chunks_with_metadata,    embedding=embedding_function)print(f"Advanced vector store created with {advanced_vector_store._collection.count()} embeddings.")

这是一个简单但关键的步骤。这个 advanced_vector_store 现在包含与我们基线相同的文本,但每个嵌入的块都用其章节标题进行了标记,这解锁了我们执行过滤搜索的能力。

接下来,我们需要为我们的关键词搜索做准备。BM25 算法通过分析文档中单词的频率来工作。要启用此功能,我们需要通过将每个文档的内容分割成一个单词列表(tokens)来预处理我们的语料库。

print("\nBuilding BM25 index for keyword search...")# 创建一个列表,其中每个元素都是来自一个文档的单词列表tokenized_corpus = [doc.page_content.split(" ") for doc in doc_chunks_with_metadata]# 创建一个包含所有唯一文档 ID 的列表doc_ids = [doc.metadata["id"] for doc in doc_chunks_with_metadata]# 创建一个从文档 ID 到完整 Document 对象的映射,以便于查找doc_map = {doc.metadata["id"]: doc for doc in doc_chunks_with_metadata}# 使用我们的分词语料库初始化 BM25Okapi 索引bm25 = BM25Okapi(tokenized_corpus)

我们基本上是为我们的 BM25 索引创建了必要的数据结构。tokenized_corpus 是算法将要搜索的对象,而 doc_map 将允许我们在搜索完成后快速检索完整的 Document 对象。

现在我们可以定义我们的三个检索函数。

# 策略 1: 纯向量搜索,带元数据过滤defvector_search_only(query: str, section_filter: str = None, k: int = 10):    # 这个字典定义了元数据过滤器。ChromaDB 只会搜索匹配此条件的文档。    filter_dict = {"section": section_filter} if section_filter and"Unknown"notin section_filter elseNone    # 执行相似性搜索,并可选地使用过滤器    return advanced_vector_store.similarity_search(query, k=k, filter=filter_dict)# 策略 2: 纯关键词搜索 (BM25)defbm25_search_only(query: str, k: int = 10):    # 对传入的查询进行分词    tokenized_query = query.split(" ")    # 获取查询相对于语料库中所有文档的 BM25 分数    bm25_scores = bm25.get_scores(tokenized_query)    # 获取得分最高的 k 个文档的索引    top_k_indices = np.argsort(bm25_scores)[::-1][:k]    # 使用我们的 doc_map 返回得分最高的文档的完整 Document 对象    return [doc_map[doc_ids[i]] for i in top_k_indices]# 策略 3: 混合搜索,采用倒数排序融合 (RRF)defhybrid_search(query: str, section_filter: str = None, k: int = 10):    # 1. 执行关键词搜索    bm25_docs = bm25_search_only(query, k=k)    # 2. 执行带元数据过滤的语义搜索    semantic_docs = vector_search_only(query, section_filter=section_filter, k=k)    # 3. 使用倒数排序融合 (RRF) 合并并重新排序结果    # 获取两种搜索方法找到的所有文档的唯一集合    all_docs = {doc.metadata["id"]: doc for doc in bm25_docs + semantic_docs}.values()    # 从每个搜索结果中创建仅包含文档 ID 的列表    ranked_lists = [[doc.metadata["id"] for doc in bm25_docs], [doc.metadata["id"] for doc in semantic_docs]]        # 初始化一个字典来存储每个文档的 RRF 分数    rrf_scores = {}    # 遍历每个排名列表(BM25 和语义搜索)    for doc_list in ranked_lists:        # 遍历列表中的每个文档 ID 及其排名 (i)        for i, doc_id inenumerate(doc_list):            if doc_id notin rrf_scores:                rrf_scores[doc_id] = 0            # RRF 公式:将 1 / (排名 + k) 加到分数上。我们使用 k=61 作为标准默认值。            rrf_scores[doc_id] += 1 / (i + 61)     # 根据它们的最终 RRF 分数按降序对文档 ID 进行排序    sorted_doc_ids = sorted(rrf_scores.keys(), key=lambda x: rrf_scores[x], reverse=True)    # 根据融合后的排名返回前 k 个 Document 对象    final_docs = [doc_map[doc_id] for doc_id in sorted_doc_ids[:k]]    return final_docsprint("\nAll retrieval strategy functions ready.")

我们现在已经实现了我们自适应检索系统的核心。

  • vector_search_only 函数是我们升级后的语义搜索。关键的增加是 filter=filter_dict 参数,它允许我们传递来自我们规划器 Step 中的 document_section,并强制搜索只考虑具有该元数据的块。
  • bm25_search_only 函数是我们的纯关键词检索器。对于查找语义搜索可能错过的特定术语,它非常快速和有效。
  • hybrid_search 函数并行运行两种搜索,然后使用 RRF 智能地合并结果。RRF 是一个简单但强大的算法,它根据文档在每个列表中的位置来排名,有效地给予在两种搜索结果中都排名靠前的文档更高的权重。

让我们做一个快速测试,看看我们的关键词搜索是如何工作的。我们将搜索我们的规划器识别出的确切章节标题。

# 测试关键词搜索,看它是否能精确找到特定章节print("\n--- Testing Keyword Search ---")test_query = "Item 1A. Risk Factors"test_results = bm25_search_only(test_query)print(f"Query: {test_query}")print(f"Found {len(test_results)} documents. Top result section: {test_results[0].metadata['section']}")
``````plaintext
#### OUTPUT ####Creating advanced vector store with metadata...Advanced vector store created with 381 embeddings.Building BM25 index for keyword search...All retrieval strategy functions ready.# --- Testing Keyword Search ---Query: Item 1A. Risk FactorsFound 10 documents. Top result section: Item 1A. Risk Factors.

输出正是我们想要的。BM25 搜索以关键词为中心,仅通过搜索标题就能完美、即时地检索到来自 Item 1A. Risk Factors 章节的文档。

我们的监督器现在可以在查询包含像章节标题这样的特定关键词时,选择这个精确的工具。

随着我们广泛召回阶段的构建完成,我们有了一个强大的机制来找到所有可能相关的文档。然而,这张大网也可能带来不相关的噪音。我们漏斗的下一个阶段将专注于用高精度来过滤这些噪音。

使用交叉编码器重排序器实现高精度

所以,我们第一阶段的检索在**召回(Recall)**方面做得很好。它提取了 10 份可能与我们子问题相关的文档。

但问题在于,它们只是可能相关。将这 10 个文本块全部直接喂给我们主要的推理 LLM 是低效且有风险的。

这会增加 token 成本,更重要的是,它可能会用嘈杂的、半相关的信息来混淆模型

高精度

我们现在需要的是一个精度阶段(Precision stage)。我们需要一种方法来检查那 10 份候选文档,并挑选出绝对最好的。这就是**重排序器(Reranker)**发挥作用的地方。

关键区别在于这些模型的工作方式。

    1. 我们最初的检索使用双编码器(bi-encoder)(即嵌入模型),它独立地为查询和文档创建向量。它速度快,非常适合在数百万个项目中进行搜索。
    1. 而**交叉编码器(cross-encoder)**则将查询和单个文档作为一个对(pair)一起处理,并进行更深入、更细致的比较。它速度较慢,但准确得多。

所以,基本上,我们想构建一个函数,它接收我们检索到的 10 份文档,并使用一个交叉编码器模型为每一份文档给出一个精确的相关性分数。然后,我们将只保留我们 config 中定义的前 3 份。

首先,让我们初始化我们的交叉编码器模型。我们将使用 sentence-transformers 库中一个小型但高效的模型,正如我们配置中所指定的。

from sentence_transformers import CrossEncoder # 使用交叉编码器模型的库print("Initializing CrossEncoder reranker...")# 使用我们中央配置字典中的名称初始化 CrossEncoder 模型。# 如果模型没有被缓存,库将自动从 Hugging Face Hub 下载。reranker = CrossEncoder(config["reranker_model"])

我们基本上是将预训练的重排序模型加载到内存中。这只需要做一次。我们选择的模型 ms-marco-MiniLM-L-6-v2 在这项任务中非常受欢迎,因为它在速度和准确性之间提供了很好的平衡。

现在我们可以创建将执行重排序的函数。

def rerank_documents_function(query: str, documents: List[Document]) -> List[Document]:    # 如果没有文档需要重排序,立即返回一个空列表。    ifnot documents:         return []            # 创建交叉编码器所需的 [查询, 文档内容] 对。    pairs = [(query, doc.page_content) for doc in documents]        # 使用重排序器为每个对预测一个相关性分数。这将返回一个分数列表。    scores = reranker.predict(pairs)        # 将原始文档与它们的新分数结合起来。    doc_scores = list(zip(documents, scores))        # 根据分数按降序对 (文档, 分数) 元组列表进行排序。    doc_scores.sort(key=lambda x: x[1], reverse=True)        # 从排序后的前 N 个结果中仅提取 Document 对象。    # 要保留的文档数量由我们配置中的 'top_n_rerank' 控制。    reranked_docs = [doc for doc, score in doc_scores[:config["top_n_rerank"]]]        return reranked_docs

这个函数 rerank_documents_function 是我们精度阶段的主要部分。它接收来自我们召回阶段的 query 和 10 份 documents 列表。最重要的步骤是 reranker.predict(pairs)

在这里,模型不是在创建嵌入,它是在对查询与每个文档内容进行全面比较,为每个文档生成一个相关性分数。

在得到分数后,我们简单地对文档进行排序,并切片列表以只保留前 3 个。这个函数的输出将是一个简短、干净且高度相关的文档列表——这是我们下游智能体的完美上下文。

这种漏斗式方法,从高召回率的第一阶段过渡到高精度的第二阶段,是生产级 RAG 系统的一个组成部分。它确保我们获得最佳证据,同时最小化噪音和成本。

利用上下文蒸馏进行综合

所以,我们的检索漏斗工作得非常出色。我们从一个广泛的搜索开始,得到了 10 个可能相关的文档。然后,我们的高精度重排序器将其过滤到前 3 个最相关的文本块。

我们现在处于一个好得多的位置,但在将这些信息交给我们的主要推理智能体之前,我们还可以做最后一个改进。目前,我们有三个独立的文本块。

综合

虽然它们都相关,但可能包含冗余信息或重叠的句子。将它们作为三个独立的块呈现,对于 LLM 来说处理起来仍然有点笨拙。

我们检索漏斗的最后阶段是上下文蒸馏(Contextual Distillation)。目标很简单:将我们排名前 3 的高度相关的文档块蒸馏成一个单一、干净、简洁的段落。这消除了最后的冗余,并为我们的下游智能体呈现了一份完美综合的证据。

这个蒸馏步骤就像一个最终的压缩层。它确保喂给我们更昂贵的推理智能体的上下文尽可能地密集和信息丰富,从而最大化信号并最小化噪音。

为此,我们将创建另一个小型的、专门的智能体,我们称之为蒸馏智能体(Distiller Agent)

首先,我们需要设计将指导其行为的提示。

# 我们蒸馏智能体的提示,指示它进行综合并保持简洁distiller_prompt = ChatPromptTemplate.from_messages([    ("system", """You are a helpful assistant. Your task is to synthesize the following retrieved document snippets into a single, concise paragraph.The goal is to provide a clear and coherent context that directly answers the question: '{question}'.Focus on removing redundant information and organizing the content logically. Answer only with the synthesized context."""),    ("human", "Retrieved Documents:\n{context}") # 我们排名前 3 的重排序文档内容将在此处传递])

我们基本上是给这个智能体一个非常集中的任务。我们告诉它:“这里有一些文本片段。你唯一的工作就是将它们合并成一个连贯的段落,来回答这个具体的问题”。“仅用综合后的上下文作答”的指令很重要,它防止智能体添加任何对话性的废话或试图自己回答问题。它纯粹是一个文本处理工具。

现在,我们可以组装我们简单的 distiller_agent

# 通过将我们的提示管道连接到推理 LLM 和一个字符串输出解析器来创建智能体distiller_agent = distiller_prompt | reasoning_llm | StrOutputParser()print("Contextual Distiller Agent created.")

这是另一个直接的 LCEL 链。我们使用 distiller_prompt,将其通过管道传递给我们强大的 reasoning_llm 来执行综合,然后使用 StrOutputParser 来获得最终的、干净的文本段落。

随着这个 distiller_agent 的创建,我们的多阶段检索漏斗现在已经完成。在我们主要的智能体循环中,每个研究步骤的流程将是:

    1. 监督器 (Supervisor): 选择一种检索策略(vectorkeywordhybrid)。
    1. 召回阶段 (Recall Stage): 执行所选策略以获取前 10 份文档。
    1. 精度阶段 (Precision Stage): 使用 rerank_documents_function 获取前 3 份文档。
    1. 蒸馏阶段 (Distillation Stage): 使用 distiller_agent 将前 3 份文档压缩成一个单一、干净的段落。

这个多阶段过程确保了我们的智能体所使用的证据质量尽可能高。下一步是让我们的智能体有能力超越其内部知识,去搜索网络。

通过网络搜索增强知识

所以,我们的检索漏斗现在非常强大,但它有一个巨大的盲点。

它只能看到我们 2023 年 10-K 文件内部的内容。为了解决我们的挑战性查询,我们的智能体需要找到关于 AMD AI 芯片战略的最新消息(文件发布后,即 2024 年的新闻)。这些信息在我们的静态知识库中根本不存在。

要真正构建一个“深度思考”的智能体,它需要能够认识到自身知识的局限性,并到别处寻找答案。我们需要给它一扇通往外部世界的窗户。

通过网络增强

在这一步,我们将用一个新工具来增强我们智能体的能力:网络搜索 (Web Search)。这将我们的系统从一个特定于文档的问答机器人转变为一个真正的、多源的研究助理。

为此,我们将使用 Tavily 搜索 API。这是一个专门为 LLM 构建的搜索引擎,提供干净、无广告、相关的搜索结果,非常适合 RAG 流水线。它还与 LangChain 无缝集成。

所以,基本上,我们需要做的第一件事是初始化 Tavily 搜索工具本身。

from langchain_community.tools.tavily_search import TavilySearchResults# 初始化 Tavily 搜索工具。# k=3: 这个参数指示工具为给定的查询返回前 3 个最相关的搜索结果。web_search_tool = TavilySearchResults(k=3)

我们基本上是创建了一个 Tavily 搜索工具的实例,我们的智能体可以调用它。k=3 参数是一个很好的起点,它提供了一些高质量的来源,而不会用太多信息压垮智能体。

现在,一个原始的 API 响应并不完全是我们所需要的。我们的下游组件,即重排序器和蒸馏器,都是设计用来处理一种特定数据结构的:一个 LangChain Document 对象列表。为确保无缝集成,我们需要创建一个简单的包装函数。这个函数将接收一个查询,调用 Tavily 工具,然后将原始结果格式化为标准的 Document 结构。

def web_search_function(query: str) -> List[Document]:    # 使用提供的查询调用 Tavily 搜索工具。    results = web_search_tool.invoke({"query": query})        # 将结果格式化为 LangChain Document 对象的列表。    # 我们使用列表推导式以实现简洁易读的实现。    return [        Document(            # 搜索结果的主要内容放入 'page_content'。            page_content=res["content"],            # 我们将源 URL 存储在 'metadata' 字典中以便引用。            metadata={"source": res["url"]}        ) for res in results    ]

这个 web_search_function 扮演了一个至关重要的适配器角色。它调用 web_search_tool.invoke,该函数返回一个字典列表,每个字典包含像 "content""url" 这样的键。

    1. 列表推导式随后遍历这些结果,并将它们整齐地重新包装成我们流水线所期望的 Document 对象。
    1. page_content 获取主要文本,并且重要的是,我们将 url 存储在 metadata 中。
    1. 这确保了当我们的智能体生成最终答案时,它可以正确地引用其网络来源。

这使得我们的外部知识源看起来和感觉上都与我们的内部知识源完全一样,允许我们对两者使用相同的处理流水线。

函数准备就绪后,让我们快速测试一下,确保它按预期工作。我们将使用一个与我们主要挑战的第二部分相关的查询。

# 使用关于 AMD 2024 年战略的查询测试网络搜索功能print("\n--- Testing Web Search Tool ---")test_query_web = "AMD AI chip strategy 2024"test_results_web = web_search_function(test_query_web)print(f"Found {len(test_results_web)} results for query: '{test_query_web}'")# 打印第一个结果的片段,看看我们得到了什么if test_results_web:    print(f"Top result snippet: {test_results_web[0].page_content[:250]}...")
``````plaintext
#### OUTPUT ####Web search tool (Tavily) initialized.--- Testing Web Search Tool ---Found 3 results for query: 'AMD AI chip strategy 2024'Top result snippet: AMD has intensified its battle with Nvidia in the AI chip market with the release of the Instinct MI300X accelerator, a powerful GPU designed to challenge Nvidia's H100 in training and inference for large language models. Major cloud providers like Microsoft Azure and Oracle Cloud are adopting the MI300X, indicating strong market interest...

输出确认了我们的工具工作正常。它为我们的查询找到了 3 个相关的网页。顶部结果的片段正是我们的智能体所缺少的最新、外部信息。

它提到了 AMD 的“Instinct MI300X”及其与 NVIDIA“H100”的竞争——这正是解决我们问题后半部分所需的证据。

我们的智能体现在有了一扇通往外部世界的窗户,并且它的规划器可以智能地决定何时通过它向外看。最后一块拼图是赋予智能体反思其发现并决定何时完成研究的能力。

自我批判与控制流策略

到目前
为止,我们已经构建了一个强大的研究机器。我们的智能体可以创建计划、选择正确的工具,并执行一个复杂的检索漏斗。但还缺少一个关键部分:思考自身进展的能力。一个盲目地、一步一步遵循计划的智能体并非真正的智能。它需要一个自我批判的机制。

自我批判与策略制定

在这里,我们将构建我们智能体自主性的认知核心。在每个研究步骤之后,我们的智能体将暂停并反思。它会审视刚刚找到的新信息,将其与已知信息进行比较,然后做出战略决策:我的研究完成了吗,还是我需要继续?

这个自我批判循环将我们的系统从一个脚本化的工作流提升为一个自主的智能体。正是这个机制让它能够决定何时收集了足够的证据来自信地回答用户的问题。

我们将使用两个新的专门智能体来实现这一点:

    1. 反思智能体 (The Reflection Agent): 这个智能体将接收一个已完成步骤中蒸馏出的上下文,并创建一个简洁的、一句话的摘要。这个摘要随后被添加到我们智能体的“研究历史”中。
    1. 策略智能体 (The Policy Agent): 这是总策略师。在反思之后,它将检查整个研究历史与原始计划的关系,并做出一个关键决策:CONTINUE_PLANFINISH

更新和反思累积的研究历史

在我们的智能体完成一个研究步骤(例如,检索并蒸馏关于 NVIDIA 风险的信息)之后,我们不希望只是简单地继续。我们需要将这些新知识整合到智能体的记忆中。

反思性累积

我们将构建一个反思智能体(Reflection Agent),其唯一的工作就是执行这种整合。它将接收当前步骤中丰富的、蒸馏过的上下文,并将其总结成一个单一的、事实性的句子。然后,这个摘要会被添加到我们 RAGStatepast_steps 列表中。

首先,让我们为这个智能体创建提示。

# 我们反思智能体的提示,指示它要简洁和事实。reflection_prompt = ChatPromptTemplate.from_messages([    ("system", """You are a research assistant. Based on the retrieved context for the current sub-question, write a concise, one-sentence summary of the key findings.This summary will be added to our research history. Be factual and to the point."""),    ("human", "Current sub-question: {sub_question}\n\nDistilled context:\n{context}")])

我们告诉这个智能体要像一个勤奋的研究助理。它的任务不是要有创造性,而是要做一个好的笔记记录者。它读取 context 并写下 summary。现在我们可以组装智能体本身。

# 通过将我们的提示管道连接到推理 LLM 和一个字符串输出解析器来创建智能体reflection_agent = reflection_prompt | reasoning_llm | StrOutputParser()print("Reflection Agent created.")

这个 reflection_agent 是我们认知循环的一部分。通过创建这些简洁的摘要,它建立了一个清晰、易读的研究历史。这段历史将成为我们下一个,也是最重要的智能体的输入:那个决定何时停止的智能体。

构建用于控制流的策略智能体

这是我们智能体自主性的大脑。在 reflection_agent 更新了研究历史之后,**策略智能体(Policy Agent)**开始发挥作用。它充当整个操作的监督者。

它的工作是审视智能体所知道的一切——原始问题、初始计划,以及已完成步骤的完整摘要历史——并做出一个高层次的战略决策。

策略智能体

我们将首先使用一个 Pydantic 模型来定义其决策的结构。

class Decision(BaseModel):    # 决策必须是这两个动作之一。    next_action: Literal["CONTINUE_PLAN", "FINISH"]    # 智能体必须为其决策提供理由。    justification: str

这个 Decision 类强制我们的策略智能体做出一个清晰的、二元的选择,并解释其推理过程。这使其行为透明且易于调试。

接下来,我们设计将指导其决策过程的提示。

# 我们策略智能体的提示,指示它扮演总策略师的角色policy_prompt = ChatPromptTemplate.from_messages([    ("system", """You are a master strategist. Your role is to analyze the research progress and decide the next action.You have the original question, the initial plan, and a log of completed steps with their summaries.- If the collected information in the Research History is sufficient to comprehensively answer the Original Question, decide to FINISH.- Otherwise, if the plan is not yet complete, decide to CONTINUE_PLAN."""),    ("human", "Original Question: {question}\n\nInitial Plan:\n{plan}\n\nResearch History (Completed Steps):\n{history}")])

我们基本上是要求 LLM 进行一次元分析。它不是在回答问题本身;它是在推理研究过程的状态。它将它所拥有的(history)与它所需要的(planquestion)进行比较,并做出判断。

现在,我们可以组装 policy_agent

# 通过将我们的提示管道连接到推理 LLM 并使用我们的 Decision 类来结构化其输出来创建智能体policy_agent = policy_prompt | reasoning_llm.with_structured_output(Decision)print("Policy Agent created.")# 现在,让我们用我们研究过程的两种不同状态来测试策略智能体print("\n--- Testing Policy Agent (Incomplete State) ---")# 首先,一个只完成了步骤 1 的状态。plan_str = json.dumps([s.dict() for s in test_plan.steps])incomplete_history = "Step 1 Summary: NVIDIA's 10-K states that the semiconductor industry is intensely competitive and subject to rapid technological change."decision1 = policy_agent.invoke({"question": complex_query_adv, "plan": plan_str, "history": incomplete_history})print(f"Decision: {decision1.next_action}, Justification: {decision1.justification}")print("\n--- Testing Policy Agent (Complete State) ---")# 其次,一个步骤 1 和步骤 2 都已完成的状态。complete_history = incomplete_history + "\nStep 2 Summary: In 2024, AMD launched its MI300X accelerator to directly compete with NVIDIA in the AI chip market, gaining adoption from major cloud providers."decision2 = policy_agent.invoke({"question": complex_query_adv, "plan": plan_str, "history": complete_history})print(f"Decision: {decision2.next_action}, Justification: {decision2.justification}")

为了正确测试我们的 policy_agent,我们模拟了我们智能体生命周期中的两个不同时刻。在第一个测试中,我们提供给它一个只包含步骤 1 摘要的历史。在第二个测试中,我们提供给它步骤 1 和步骤 2 的摘要。

让我们在每种情况下检查它的决策。

#### OUTPUT ####Policy Agent created.--- Testing Policy Agent (Incomplete State) ---Decision: CONTINUE_PLAN, Justification: The research has only identified NVIDIA's competitive risks from the 10-K. It has not yet gathered the required external information about AMD's 2024 strategy, which is the next step in the plan.--- Testing Policy Agent (Complete State) ---Decision: FINISH, Justification: The research history now contains comprehensive summaries of both NVIDIA's stated competitive risks and AMD's recent AI chip strategy. All necessary information has been gathered to perform the final synthesis and answer the user's question.

让我们理解一下输出……

  • 在不完整的状态下, 智能体正确地认识到它缺少关于 AMD 战略的信息。它查看了它的计划,看到下一步是使用网络搜索,并正确地决定 CONTINUE_PLAN
  • 在完整的状态下, 在被给予了来自网络搜索的摘要后,它再次分析了它的历史。这一次,它认识到它已经拥有了谜题的所有部分——NVIDIA 的风险和 AMD 的战略。它正确地决定它的研究已经完成,是时候 FINISH 了。

有了这个 policy_agent,我们已经构建了我们自主系统的大脑。最后一步是将所有这些组件连接成一个完整、可执行的工作流,使用 LangGraph。

定义图节点

我们已经设计了所有这些酷炫的、专门化的智能体。现在是时候把它们变成我们工作流的实际构建块了。在 LangGraph 中,这些构建块被称为节点(nodes)。一个节点就是一个执行特定工作的 Python 函数。它以智能体的当前记忆(RAGState)作为输入,执行其任务,然后返回一个包含对该记忆的任何更新的字典。

我们将为我们的智能体需要采取的每个主要步骤创建一个节点。

图节点

首先,我们需要一个简单的辅助函数。由于我们的智能体经常需要查看研究历史,我们想要一种简洁的方式将 past_steps 列表格式化为可读的字符串。

# 一个辅助函数,用于为提示格式化研究历史def get_past_context_str(past_steps: List[PastStep]) -> str:    # 这将 PastStep 字典列表连接成一个单一的字符串。    # 每个步骤都清楚地标记出来,以便 LLM 理解上下文。    return "\\n\\n".join([f"Step {s['step_index']}: {s['sub_question']}\\nSummary: {s['summary']}" for s in past_steps])

我们基本上是创建了一个实用工具,它将在我们的几个节点内部使用,为我们的提示提供历史上下文。

现在是我们的第一个真正的节点:plan_node。这是我们智能体推理的起点。它唯一的工作就是调用我们的 planner_agent 并填充我们 RAGState 中的 plan 字段。

# 节点 1: 规划器def plan_node(state: RAGState) -> Dict:    console.print("--- 🧠: Generating Plan ---")    # 我们调用之前创建的 planner_agent,传入用户的原始问题。    plan = planner_agent.invoke({"question": state["original_question"]})    rprint(plan)    # 我们返回一个包含我们 RAGState 更新的字典。    # LangGraph 会自动将这个合并到主状态中。    return {"plan": plan, "current_step_index": 0, "past_steps": []}

这个节点启动了一切。它从状态中获取 original_question,得到 plan,然后将 current_step_index 初始化为 0(从第一步开始),并为这次新的运行清除 past_steps 历史。

接下来,我们需要实际去查找信息的节点。由于我们的规划器可以在两个工具之间选择,我们需要两个独立的检索节点。让我们从用于搜索我们内部 10-K 文档的 retrieval_node 开始。

# 节点 2a: 从 10-K 文档中检索defretrieval_node(state: RAGState) -> Dict:    # 首先,获取计划中当前步骤的详细信息。    current_step_index = state["current_step_index"]    current_step = state["plan"].steps[current_step_index]    console.print(f"--- 🔍: Retrieving from 10-K (Step {current_step_index + 1}: {current_step.sub_question}) ---")        # 使用我们的查询重写器来优化用于搜索的子问题。    past_context = get_past_context_str(state['past_steps'])    rewritten_query = query_rewriter_agent.invoke({        "sub_question": current_step.sub_question,        "keywords": current_step.keywords,        "past_context": past_context    })    console.print(f"  Rewritten Query: {rewritten_query}")        # 获取监督器关于哪个检索策略最好的决定。    retrieval_decision = retrieval_supervisor_agent.invoke({"sub_question": rewritten_query})    console.print(f"  Supervisor Decision: Use `{retrieval_decision.strategy}`. Justification: {retrieval_decision.justification}")    # 根据决定,执行正确的检索函数。    if retrieval_decision.strategy == 'vector_search':        retrieved_docs = vector_search_only(rewritten_query, section_filter=current_step.document_section, k=config['top_k_retrieval'])    elif retrieval_decision.strategy == 'keyword_search':        retrieved_docs = bm25_search_only(rewritten_query, k=config['top_k_retrieval'])    else: # hybrid_search        retrieved_docs = hybrid_search(rewritten_query, section_filter=current_step.document_section, k=config['top_k_retrieval'])        # 返回检索到的文档以添加到状态中。    return {"retrieved_docs": retrieved_docs}

这个节点做了很多智能的工作。它不仅仅是一个简单的检索器。它协调了一个微型流水线:重写查询,询问监督器最佳策略,然后执行该策略。

现在,我们需要我们另一个工具:网络搜索的相应节点。

# 节点 2b: 从网络检索defweb_search_node(state: RAGState) -> Dict:    # 获取当前步骤的详细信息。    current_step_index = state["current_step_index"]    current_step = state["plan"].steps[current_step_index]    console.print(f"--- 🌐: Searching Web (Step {current_step_index + 1}: {current_step.sub_question}) ---")        # 为网络搜索引擎重写子问题。    past_context = get_past_context_str(state['past_steps'])    rewritten_query = query_rewriter_agent.invoke({        "sub_question": current_step.sub_question,        "keywords": current_step.keywords,        "past_context": past_context    })    console.print(f"  Rewritten Query: {rewritten_query}")    # 调用我们的网络搜索函数。    retrieved_docs = web_search_function(rewritten_query)    # 返回结果。    return {"retrieved_docs": retrieved_docs}

这个 web_search_node 更简单,因为它不需要监督器,它只有一种搜索网络的方式。但它仍然使用我们强大的查询重写器来确保搜索尽可能有效。

在我们检索文档(从任一来源)之后,我们需要运行我们的精度和综合漏斗。我们将为每个阶段创建一个节点。首先是 rerank_node

# 节点 3: 重排序器def rerank_node(state: RAGState) -> Dict:    console.print("--- 🎯: Reranking Documents ---")    # 获取当前步骤的详细信息。    current_step_index = state["current_step_index"]    current_step = state["plan"].steps[current_step_index]    # 对我们刚刚检索到的文档调用我们的重排序函数。    reranked_docs = rerank_documents_function(current_step.sub_question, state["retrieved_docs"])    console.print(f"  Reranked to top {len(reranked_docs)} documents.")    # 用高精度文档更新状态。    return {"reranked_docs": reranked_docs}

这个节点接收 retrieved_docs(我们广泛召回的 10 份文档),并使用交叉编码器将它们过滤到前 3 份,将结果放在 reranked_docs 中。

接下来,compression_node 将接收那前 3 份文档并进行蒸馏。

# 节点 4: 压缩器 / 蒸馏器def compression_node(state: RAGState) -> Dict:    console.print("--- ✂️: Distilling Context ---")    # 获取当前步骤的详细信息。    current_step_index = state["current_step_index"]    current_step = state["plan"].steps[current_step_index]    # 将前 3 份文档格式化为单个字符串。    context = format_docs(state["reranked_docs"])    # 调用我们的蒸馏器智能体将它们综合成一个段落。    synthesized_context = distiller_agent.invoke({"question": current_step.sub_question, "context": context})    console.print(f"  Distilled Context Snippet: {synthesized_context[:200]}...")    # 用最终的、干净的上下文更新状态。    return {"synthesized_context": synthesized_context}

这个节点是我们检索漏斗的最后一步。它接收 reranked_docs 并产生一个单一、干净的 synthesized_context 段落。

现在我们有了证据,我们需要反思它并更新我们的研究历史。这是 reflection_node 的工作。

# 节点 5: 反思 / 更新步骤defreflection_node(state: RAGState) -> Dict:    console.print("--- 🤔: Reflecting on Findings ---")    # 获取当前步骤的详细信息。    current_step_index = state["current_step_index"]    current_step = state["plan"].steps[current_step_index]    # 调用我们的反思智能体来总结发现。    summary = reflection_agent.invoke({"sub_question": current_step.sub_question, "context": state['synthesized_context']})    console.print(f"  Summary: {summary}")        # 创建一个包含此步骤所有结果的新 PastStep 字典。    new_past_step = {        "step_index": current_step_index + 1,        "sub_question": current_step.sub_question,        "retrieved_docs": state['reranked_docs'], # 我们保存重排序后的文档以供最终引用        "summary": summary    }    # 将新步骤附加到我们的历史中,并增加步骤索引以移至下一步。    return {"past_steps": state["past_steps"] + [new_past_step], "current_step_index": current_step_index + 1}

这个节点是我们智能体的记账员。它调用 reflection_agent 来创建摘要,然后将当前研究周期的所有结果整齐地打包成一个 new_past_step 对象。然后它将此添加到 past_steps 列表中,并递增 current_step_index,为智能体的下一个循环做好准备。

最后,当研究完成时,我们需要最后一个节点来生成最终答案。

# 节点 6: 最终答案生成器deffinal_answer_node(state: RAGState) -> Dict:    console.print("--- ✅: Generating Final Answer with Citations ---")    # 首先,我们需要收集我们从所有过去步骤中收集到的所有证据。    final_context = ""    for i, step inenumerate(state['past_steps']):        final_context += f"\\n--- Findings from Research Step {i+1} ---\\n"        # 我们包含每个文档的来源元数据(章节或 URL)以启用引用。        for doc in step['retrieved_docs']:            source = doc.metadata.get('section') or doc.metadata.get('source')            final_context += f"Source: {source}\\nContent: {doc.page_content}\\n\\n"        # 我们创建一个专门用于生成最终、可引用答案的新提示。    final_answer_prompt = ChatPromptTemplate.from_messages([        ("system", """You are an expert financial analyst. Synthesize the research findings from internal documents and web searches into a comprehensive, multi-paragraph answer for the user's original question.Your answer must be grounded in the provided context. At the end of any sentence that relies on specific information, you MUST add a citation. For 10-K documents, use [Source: <section title>]. For web results, use [Source: <URL>]."""),        ("human", "Original Question: {question}\n\nResearch History and Context:\n{context}")    ])        # 我们为这个最终任务创建一个临时智能体并调用它。    final_answer_agent = final_answer_prompt | reasoning_llm | StrOutputParser()    final_answer = final_answer_agent.invoke({"question": state['original_question'], "context": final_context})    # 用最终答案更新状态。    return {"final_answer": final_answer}

这个 final_answer_node 是我们的大结局。它将 past_steps 历史中每一步的所有高质量、重排序的文档整合到一个巨大的上下文中。然后,它使用一个专门的提示来指示我们强大的 reasoning_llm 将这些信息综合成一个全面的、多段落的、包含引用的答案,从而成功地结束我们的研究过程。

随着所有节点的定义完成,我们现在拥有了我们智能体的所有构建块。下一步是定义连接它们并控制图流程的“线路”。

定义条件边

我们已经构建了所有的节点。我们有规划器、检索器、重排序器、蒸馏器和反思器。可以把它们想象成一个房间里的一群专家。现在我们需要定义对话的规则。谁在什么时候发言?我们如何决定下一步做什么?

这就是 LangGraph 中边(edges)的工作。简单的边很直接,“在节点 A 之后,总是去节点 B”。但真正的智能来自于条件边(conditional edges)

一个条件边是一个函数,它查看智能体的当前记忆(RAGState)并做出决策,根据情况将工作流路由到不同的路径。

我们的智能体需要两个关键的决策函数:

    1. 一个工具路由器 (route_by_tool): 在计划制定后,这个函数将查看计划的当前步骤,并决定是将工作流发送到 retrieve_10k 节点还是 retrieve_web 节点。
    1. 主控制循环 (should_continue_node): 这是最重要的一个。在每个研究步骤完成并反思之后,这个函数将调用我们的 policy_agent 来决定是继续计划的下一步,还是结束研究并生成最终答案。

首先,让我们构建我们简单的工具路由器。

# 条件边 1: 工具路由器def route_by_tool(state: RAGState) -> str:    # 获取我们当前所处步骤的索引。    current_step_index = state["current_step_index"]    # 从计划中获取当前步骤的完整详情。    current_step = state["plan"].steps[current_step_index]    # 返回为此步骤指定的工具名称。    # LangGraph 将使用这个字符串来决定下一个要去的节点。    return current_step.tool

这个函数非常简单,但至关重要。它就像一个总机。它从状态中读取 current_step_index,在 plan 中找到相应的 Step,并返回其 tool 字段的值(将是 "search_10k""search_web")。当我们连接我们的图时,我们会告诉它使用这个函数的输出来选择下一个节点。

现在我们需要创建一个控制我们智能体主要推理循环的函数。这就是我们的 policy_agent 发挥作用的地方。

# 条件边 2: 主控制循环defshould_continue_node(state: RAGState) -> str:    console.print("--- 🚦: Evaluating Policy ---")    # 获取我们即将开始的步骤的索引。    current_step_index = state["current_step_index"]        # 首先,检查我们的基本停止条件。    # 条件 1: 我们是否已完成计划中的所有步骤?    if current_step_index >= len(state["plan"].steps):        console.print("  -> Plan complete. Finishing.")        return"finish"        # 条件 2: 我们是否已超过我们设定的迭代次数安全限制?    if current_step_index >= config["max_reasoning_iterations"]:        console.print("  -> Max iterations reached. Finishing.")        return"finish"    # 一个特例:如果上一步的检索未能找到任何文档,    # 那么进行反思就没有意义了。最好直接进入下一步。    if state.get("reranked_docs") isnotNoneandnot state["reranked_docs"]:        console.print("  -> Retrieval failed for the last step. Continuing with next step in plan.")        return"continue"    # 如果没有满足任何基本条件,就该询问我们的策略智能体了。    # 我们将历史和计划格式化为字符串以用于提示。    history = get_past_context_str(state['past_steps'])    plan_str = json.dumps([s.dict() for s in state['plan'].steps])    # 调用策略智能体以获取其战略决策。    decision = policy_agent.invoke({"question": state["original_question"], "plan": plan_str, "history": history})    console.print(f"  -> Decision: {decision.next_action} | Justification: {decision.justification}")        # 根据智能体的决策,返回适当的信号。    if decision.next_action == "FINISH":        return"finish"    else: # CONTINUE_PLAN        return "continue"

这个 should_continue_node 函数是我们智能体控制流的认知核心。它在每个 reflection_node 之后运行。

    1. 它首先检查简单的、硬编码的停止标准。计划是否已用完所有步骤?我们是否达到了 max_reasoning_iterations 的安全限制?这些可以防止智能体无限运行。
    1. 如果这些检查通过,它就会调用我们强大的 policy_agent。它为策略智能体提供了所需的所有上下文:原始目标(question)、完整 plan,以及已完成工作的 history
    1. 最后,它接收 policy_agent 的结构化输出(CONTINUE_PLANFINISH),并返回简单的字符串 "continue""finish"。LangGraph 将使用这个字符串来决定是循环回来进行另一个研究周期,还是前进到 final_answer_node

随着我们的节点(专家)和条件边(对话规则)的定义完成,我们已经拥有了所需的一切。

是时候将所有这些部分组装成一个完整、能正常工作的 StateGraph 了。

组装深度思考 RAG 机器

我们已经准备好了所有独立的组件:

    1. 我们的节点 (工作者)
    1. 我们的条件边 (管理者)。

现在是时候将它们全部连接成一个单一、内聚的系统了。

我们将使用 LangGraph 的 StateGraph 来定义我们智能体的完整认知架构。在这里,我们将勾勒出我们智能体思考过程的蓝图,精确定义信息如何从一个步骤流向下个步骤。

我们需要做的第一件事是创建一个 StateGraph 的实例。我们将告诉它,它将传递的“状态”是我们的 RAGState 字典。

from langgraph.graph import StateGraph, END # 导入主要的图组件# 实例化图,告诉它使用我们的 RAGState TypedDict 作为其状态模式。graph = StateGraph(RAGState)

我们现在有了一个空的图。下一步是添加我们之前定义的所有节点。.add_node() 方法接受两个参数:节点的唯一字符串名称,以及该节点将执行的 Python 函数。

# 将我们所有的 Python 函数添加为图中的节点graph.add_node("plan", plan_node)                     # 创建初始计划的节点graph.add_node("retrieve_10k", retrieval_node)        # 内部文档检索的节点graph.add_node("retrieve_web", web_search_node)       # 外部网络搜索的节点graph.add_node("rerank", rerank_node)                 # 执行精确重排序的节点graph.add_node("compress", compression_node)          # 蒸馏上下文的节点graph.add_node("reflect", reflection_node)            # 总结发现并更新历史的节点graph.add_node("generate_final_answer", final_answer_node) # 综合最终答案的节点

现在我们所有的专家都已就位。最后也是最关键的一步是定义连接它们的“线路”。在这里,我们使用 .add_edge().add_conditional_edges() 方法来定义控制流。

# 我们图的入口点是 "plan" 节点。每次运行都从这里开始。graph.set_entry_point("plan")# 在 "plan" 节点之后,我们使用我们的第一个条件边来决定使用哪个工具。graph.add_conditional_edges(    "plan",           # 源节点    route_by_tool,    # 做出决策的函数    {                 # 一个将函数输出字符串映射到目标节点的字典        "search_10k": "retrieve_10k",        "search_web": "retrieve_web",    },)# 从 10-K 或网络检索之后,流程在一段时间内是线性的。graph.add_edge("retrieve_10k", "rerank") # 内部检索后,总是转到重排序。graph.add_edge("retrieve_web", "rerank") # 网络检索后,也总是转到重排序。graph.add_edge("rerank", "compress")      # 重排序后,总是转到压缩。graph.add_edge("compress", "reflect")     # 压缩后,总是转到反思。# 在 "reflect" 节点之后,我们遇到我们的主条件边,它控制着推理循环。graph.add_conditional_edges(    "reflect",        # 源节点    should_continue_node, # 调用我们策略智能体的函数    {                 # 一个将决策映射到下一步的字典        "continue": "plan", # 如果决策是 "continue",我们循环回到 "plan" 节点以路由下一步。        "finish": "generate_final_answer", # 如果决策是 "finish",我们继续生成最终答案。    },)# "generate_final_answer" 节点是结束前的最后一步。graph.add_edge("generate_final_answer", END) # 生成答案后,图结束。print("StateGraph constructed successfully.")

这是我们智能体大脑的蓝图。让我们追踪一下流程:

    1. 它总是从 plan 开始。
    1. 然后,route_by_tool 条件边作为一个开关,将流程导向 retrieve_10kretrieve_web
    1. 无论哪个检索器运行,输出总是通过 rerank -> compress -> reflect 流水线进行处理。
    1. 这就到了最重要的部分:should_continue_node 条件边。这是我们循环推理的核心。
  • • 如果策略智能体说 CONTINUE_PLAN,边会将工作流一直送回 plan 节点。我们回到 plan(而不是直接到下一个检索器),这样 route_by_tool 就可以正确地路由计划中的下一个步骤。
  • • 如果策略智能体说 FINISH,边会打破循环,并将工作流发送到 generate_final_answer 节点。
  • • 最后,在生成答案之后,图在 END 处终止。

我们已经成功地定义了我们深度思考智能体的完整、复杂和循环的架构。唯一剩下要做的就是将这个蓝图编译成一个可运行的应用程序,并将其可视化,看看我们构建了什么。

编译和可视化迭代工作流

在我们的图完全连接好之后,组装过程的最后一步是编译它。.compile() 方法将我们对节点和边的抽象定义转化为一个具体的、可执行的应用程序。

然后,我们可以使用一个内置的 LangGraph 实用工具来生成我们图的图表。可视化工作流对于理解和调试复杂的智能体系统非常有帮助。它将我们的代码转化为一个直观的流程图,清晰地展示了智能体可能的推理路径。

所以,基本上,我们正在将我们的蓝图变成一台真正的机器。

# .compile() 方法接收我们的图定义并创建一个可运行的对象。deep_thinking_rag_graph = graph.compile()print("Graph compiled successfully.")# 现在,让我们可视化我们构建的架构。try:    from IPython.display import Image, display    # 我们可以得到图结构的 PNG 图像。    png_image = deep_thinking_rag_graph.get_graph().draw_png()    # 在我们的 notebook 中显示图像。    display(Image(png_image))except Exception as e:    # 如果 pygraphviz 及其系统依赖项未安装,这可能会失败。    print(f"Graph visualization failed: {e}. Please ensure pygraphviz is installed.")

deep_thinking_rag_graph 对象现在是我们功能齐全的智能体。然后,可视化代码调用 .get_graph().draw_png() 来生成我们构建的状态机的视觉表示。

深度思考简化流水线流程

我们可以清楚地看到:

  • • 初始的分支逻辑,其中 route_by_toolretrieve_10kretrieve_web 之间选择。
  • • 每个研究步骤的线性处理流水线 (rerank -> compress -> reflect)。
  • • 关键的反馈循环,其中 should_continue 边将工作流送回 plan 节点以开始下一个研究周期。
  • • 一旦研究完成,通往 generate_final_answer 的最终“出口匝道”。

这是一个能够思考的系统的架构。现在,让我们来测试一下它。

运行深度思考流水线

我们已经设计了一个推理引擎。现在是时候看看它是否能成功地完成我们基线系统惨败的任务了。

我们将使用完全相同的多跳、多源挑战性查询来调用我们编译好的 deep_thinking_rag_graph。我们将使用 .stream() 方法来获取智能体执行的实时、分步追踪,观察它在解决问题时的“思考过程”。

本节的计划如下:

  • 调用图: 我们将运行我们的智能体,观察它执行计划,在工具之间切换,并建立其研究历史。
  • 分析最终输出: 我们将检查最终的、综合的答案,看看它是否成功地整合了来自 10-K 文件和网络的信息。
  • 比较结果: 我们将进行最终的并排比较,以明确地突出我们深度思考智能体的架构优势。

我们将设置我们的初始输入,这只是一个包含 original_question 的字典,然后调用 .stream() 方法。stream 方法对于调试和观察非常棒,因为它在每个节点完成工作后都会产生图的状态。

# 这将保存运行完成后图的最终状态。final_state = None# 我们图的初始输入,包含原始用户查询。graph_input = {"original_question": complex_query_adv}print("--- Invoking Deep Thinking RAG Graph ---")# 我们使用 .stream() 来实时观察智能体的过程。# "values" 模式意味着我们在每一步之后都会得到完整的 RAGState 对象。for chunk in deep_thinking_rag_graph.stream(graph_input, stream_mode="values"):    # 流中的最后一个块将是图的最终状态。    final_state = chunkprint("\n--- Graph Stream Finished ---")

这个循环是我们智能体焕发生机的地方。在每次迭代中,LangGraph 执行工作流中的下一个节点,更新 RAGState,并将新状态提供给我们。我们嵌入在节点内部的 rich 库的 console.print 语句将为我们提供智能体行动和决策的实时解说。

#### OUTPUT ####--- Invoking Deep Thinking RAG Graph ------ 🧠: Generating Plan ---plan:  steps:  - sub_question: What are the key risks related to competition as stated in NVIDIA's 2023 10-K filing?    tool: search_10k    ...  - sub_question: What are the recent news and developments in AMD's AI chip strategy in2024?    tool: search_web    ...--- 🔍: Retrieving from10-K (Step 1: ...) ---  Rewritten Query: key competitive risks for NVIDIA in the semiconductor industry...  Supervisor Decision: Use `hybrid_search`. ...--- 🎯: Reranking Documents ---  Reranked to top 3 documents.--- ✂️: Distilling Context ---  Distilled Context Snippet: NVIDIA operates in the intensely competitive semiconductor industry...--- 🤔: Reflecting on Findings ---  Summary: According to its 202310-K, NVIDIA operates in an intensely competitive semiconductor industry...--- 🚦: Evaluating Policy ---  -> Decision: CONTINUE_PLAN | Justification: The first step...has been completed. The next step...is still pending...--- 🌐: Searching Web (Step 2: ...) ---  Rewritten Query: AMD AI chip strategy news and developments 2024...--- 🎯: Reranking Documents ---  Reranked to top 3 documents.--- ✂️: Distilling Context ---  Distilled Context Snippet: AMD has ramped up its challenge to Nvidia in the AI accelerator market with its Instinct MI300 series...--- 🤔: Reflecting on Findings ---  Summary: In 2024, AMD is aggressively competing with NVIDIA in the AI chip market through its Instinct MI300X accelerator...--- 🚦: Evaluating Policy ---  -> Decision: FINISH | Justification: The research history now contains comprehensive summaries of both NVIDIA's stated risks and AMD's recent strategy...--- ✅: Generating Final Answer with Citations ------ Graph Stream Finished ---

你可以看到我们设计的执行过程。智能体:

    1. 规划: 它创建了正确的两步、多工具计划。
    1. 执行步骤 1: 它使用了 search_10k,通过完整的检索漏斗运行它,并对发现进行反思。
    1. 自我批判: 策略智能体看到计划尚未完成,并决定 CONTINUE_PLAN
    1. 执行步骤 2: 它正确地切换到 search_web 工具,通过相同的漏斗运行它,并再次反思。
    1. 再次自我批判: 这次,策略智能体看到所有必要的信息都已收集,并正确地决定 FINISH
    1. 综合: 工作流随后进入 generate_final_answer 节点。

智能体已成功地处理了复杂的查询。现在,让我们检查它产生的最终答案。

分析最终的高质量答案

智能体已经完成了它的研究。final_state 变量现在持有完整的 RAGState,包括 final_answer。让我们打印出来,看看它是否成功地将来自两个来源的信息综合成一个单一的、分析性的响应,并附带引用。

console.print("--- DEEP THINKING RAG FINAL ANSWER ---")console.print(Markdown(final_state['final_answer']))
``````plaintext
#### OUTPUT ####--- DEEP THINKING RAG FINAL ANSWER ---Based on an analysis of NVIDIA's 2023 10-K filing and recent news from 2024 regarding AMD's AI chip strategy, the following synthesis can be made:**NVIDIA's Stated Competitive Risks:**In its 2023 10-K filing, NVIDIA identifies its operating environment as the "intensely competitive" semiconductor industry, which is characterized by rapid technological change. A primary risk is that competitors, including AMD, could introduce new products with better performance or lower costs that gain significant market acceptance, which could materially and adversely affect its business [Source: Item 1A. Risk Factors.].**AMD's 2024 AI Chip Strategy:**In 2024, AMD has moved aggressively to challenge NVIDIA's dominance in the AI hardware market with its Instinct MI300 series of accelerators, particularly the MI300X. This product is designed to compete directly with NVIDIA's H100 GPU. AMD's strategy has gained significant traction, with major cloud providers such as Microsoft Azure and Oracle announcing plans to use the new chips [Source: https://www.reuters.com/technology/amd-forecasts-35-billion-ai-chip-revenue-2024-2024-01-30/].**Synthesis and Impact:**AMD's 2024 AI chip strategy directly exacerbates the competitive risks outlined in NVIDIA's 10-K. The successful launch and adoption of the MI300X is a materialization of the specific risk that a competitor could introduce a product with comparable performance. The adoption of AMD's chips by major cloud providers signifies a direct challenge to NVIDIA's market share in the lucrative data center segment, validating NVIDIA's stated concerns about rapid technological change [Source: Item 1A. Risk Factors. and https://www.cnbc.com/2023/12/06/amd-launches-new-mi300x-ai-chip-to-compete-with-nvidias-h100.html].

这是一个完全的成功。答案是一份深入的分析报告。

  • • 它正确地总结了 10-K 文件中的风险。
  • • 它正确地总结了来自网络搜索的关于 AMD 的新闻。
  • • 至关重要的是,在“综合与影响”部分,它执行了原始查询所要求的多跳推理,解释了后者如何加剧前者。
  • • 最后,它提供了正确的出处,引用指向了内部文档章节和外部网址。

并排比较

让我们将两个结果并排放在一起,以使差异一目了然。

特性 浅层 RAG(失败) 深度思考 RAG(成功)
规划 无。将查询作为单次搜索处理。 将查询分解为两个逻辑步骤:(1) 从 10-K 文件中识别风险,(2) 搜索 AMD 2024 年战略的网络新闻。
工具使用 仅限于内部文档。 智能地为第一步使用 search_10k 工具,为第二步使用 search_web 工具。
检索 单一的、通用的向量搜索。检索到关于“竞争”的一般性但无关紧要的文本块。 自适应的。使用混合搜索来精确定位 10-K 文件中的风险,然后使用网络搜索来获取最新的外部新闻。
控制流 线性的,一次性的。 循环的,迭代的。在第一步之后,它自我批判,认识到计划尚未完成,并继续进行第二步。
最终答案 承认失败。指出它没有关于 AMD 2024 年战略的信息。 综合了两个来源的信息,解释了 AMD 的 MI300X 如何直接加剧了 NVIDIA 在其 10-K 文件中陈述的竞争风险。提供了两个来源的引用。

这个比较提供了明确的结论。向循环的、工具感知的、自我批判的智能体架构的转变,在处理复杂的、真实世界的查询时,带来了显著且可衡量的性能提升。

评估框架与结果分析

我们已经看到了我们的高级智能体在一个非常困难的查询上取得了轶事般的成功。但在生产环境中,我们需要的不仅仅是一个单一的成功故事。我们需要客观、量化和自动化的验证。

评估框架

为了实现这一点,我们现在将使用 RAGAs (RAG 评估, RAG Assessment) 库构建一个严格的评估框架。我们将专注于 RAGAs 提供的四个关键指标:

  • 上下文精确率与召回率 (Context Precision & Recall): 这些指标衡量我们检索流水线的质量。精确率问:“在我们检索到的文档中,有多少是真正相关的?”(信号 vs. 噪音)。召回率问:“在所有存在的相关文档中,我们实际找到了多少?”(完整性)。
  • 答案忠实度 (Answer Faithfulness): 这衡量生成的答案是否基于所提供的上下文,作为我们对抗 LLM 幻觉的主要检查。
  • 答案正确性 (Answer Correctness): 这是质量的最终衡量标准。它将生成的答案与一个手动制作的“基准真相”答案进行比较,以评估其事实的准确性和完整性。

所以,基本上,要运行 RAGAs 评估,我们需要准备一个数据集。这个数据集将包含我们的挑战性查询、我们的基线和高级流水线生成的答案、它们各自使用的上下文,以及一个我们自己编写的作为理想响应的“基准真相”答案。

from datasets import Dataset # 来自 Hugging Face datasets 库,RAGAs 使用它from ragas import evaluatefrom ragas.metrics import (    context_precision,    context_recall,    faithfulness,    answer_correctness,)import pandas as pdprint("Preparing evaluation dataset...")# 这是我们手动制作的,对复杂查询的理想答案。ground_truth_answer_adv = "NVIDIA's 2023 10-K lists intense competition and rapid technological change as key risks. This risk is exacerbated by AMD's 2024 strategy, specifically the launch of the MI300X AI accelerator, which directly competes with NVIDIA's H100 and has been adopted by major cloud providers, threatening NVIDIA's market share in the data center segment."# 我们需要为基线模型重新运行检索器,以获取其用于评估的上下文。retrieved_docs_for_baseline_adv = baseline_retriever.invoke(complex_query_adv)baseline_contexts = [[doc.page_content for doc in retrieved_docs_for_baseline_adv]]# 对于高级智能体,我们将整合它在所有研究步骤中检索到的所有文档。advanced_contexts_flat = []for step in final_state['past_steps']:    advanced_contexts_flat.extend([doc.page_content for doc in step['retrieved_docs']])# 我们使用一个集合来移除任何重复的文档,以进行更清晰的评估。advanced_contexts = [list(set(advanced_contexts_flat))]# 现在,我们构建将转变为我们评估数据集的字典。eval_data = {    'question': [complex_query_adv, complex_query_adv], # 两个系统使用相同的问题    'answer': [baseline_result, final_state['final_answer']], # 每个系统给出的答案    'contexts': baseline_contexts + advanced_contexts, # 每个系统使用的上下文    'ground_truth': [ground_truth_answer_adv, ground_truth_answer_adv] # 理想答案}# 创建 Hugging Face Dataset 对象。eval_dataset = Dataset.from_dict(eval_data)# 定义我们想要计算的指标列表。metrics = [    context_precision,    context_recall,    faithfulness,    answer_correctness,]print("Running RAGAs evaluation...")# 运行评估。RAGAs 将调用一个 LLM 来为每个指标进行评分。result = evaluate(eval_dataset, metrics=metrics, is_async=False)print("Evaluation complete.")# 将结果格式化为一个干净的 pandas DataFrame,以便于比较。results_df = result.to_pandas()results_df.index = ['baseline_rag', 'deep_thinking_rag']print("\n--- RAGAs Evaluation Results ---")print(results_df[['context_precision', 'context_recall', 'faithfulness', 'answer_correctness']].T)

我们正在建立一个正式的实验。我们为我们单一的、困难的查询收集所有必要的工件:问题、两种不同的答案、两组不同的上下文,以及我们的理想基准真相。然后,我们将这个整齐打包的 eval_dataset 喂给 ragas.evaluate 函数。

在幕后,RAGAs 会进行一系列 LLM 调用,要求它扮演一个裁判的角色。例如,对于 faithfulness,它会问:“这个答案是否完全由这个上下文支持?”对于 answer_correctness,它会问……

这个答案与这个基准真相答案在事实上有多相似?

我们可以看看数值分数……

#### OUTPUT ####Preparing evaluation dataset...Running RAGAs evaluation...Evaluation complete.--- RAGAs Evaluation Results ---                     baseline_rag  deep_thinking_ragcontext_precision        0.500000           0.890000context_recall           0.333333           1.000000faithfulness             1.000000           1.000000answer_correctness       0.395112           0.991458

量化结果为深度思考架构的优越性提供了明确而客观的评判。

  • 上下文精确率 (0.50 vs 0.89): 基线模型的上下文只有一半是相关的,因为它只能检索到关于竞争的一般信息。高级智能体的多步骤、多工具检索几乎达到了完美的精确率分数。
  • 上下文召回率 (0.33 vs 1.00): 基线检索器完全错过了来自网络的关键信息,导致召回率非常低。高级智能体的规划和工具使用确保了所有必要的信息都被找到,实现了完美的召回率。
  • 忠实度 (1.00 vs 1.00): 两个系统都非常忠实。基线模型正确地陈述了它没有信息,而高级智能体正确地使用了它找到的信息。这对两者来说都是一个好迹象,但没有正确性的忠实度是无用的。
  • 答案正确性 (0.40 vs 0.99): 这是质量的最终衡量标准。基线模型的答案正确率不到 40%,因为它缺少了所需分析的整个后半部分。高级智能体的答案几乎是完美的。

总结我们的整个流水线

在本指南中,我们完成了一次完整的架构升级,从一个简单、脆弱的 RAG 流水线演进为一个复杂的自主推理智能体。

  • • 我们从构建一个原生 RAG 系统开始,并展示了它在一个复杂、多源查询上的可预见失败。
  • • 然后,我们系统地设计了一个深度思考智能体,赋予它规划、使用多种工具和调整其检索策略的能力。
  • • 我们构建了一个多阶段检索漏斗,它从广泛召回(使用混合搜索)到高精度(使用交叉编码器重排序器),最终到综合(使用蒸馏智能体)。
  • • 我们使用 LangGraph 来协调整个认知架构,创建了一个循环的、有状态的工作流,从而实现了真正的多步推理。
  • • 我们实现了一个自我批判循环,让智能体能够识别失败、修正自己的计划,并在找不到答案时优雅地退出。
  • • 最后,我们通过生产级的评估验证了我们的成功,使用 RAGAs 提供了客观、量化的证据,证明了高级智能体的优越性。

使用马尔可夫决策过程(MDP)学习策略

我们的智能体中的策略智能体(Policy Agent)目前依赖于像 GPT-4o 这样昂贵的、通用的 LLM 来做每一个 CONTINUEFINISH 的决策。虽然有效,但这在生产环境中可能会很慢且成本高昂。学术前沿提供了一条更优化的前进道路。

  • 将 RAG 视为决策过程: 我们可以将我们智能体的推理循环框架化为一个马尔可夫决策过程(Markov Decision Process, MDP)。在这个模型中,每个 RAGState 是一个“状态”,每个动作(CONTINUEREVISEFINISH)都会导向一个带有特定奖励(例如,找到正确答案)的新状态。
  • 从经验中学习: 我们在 LangSmith 中记录的成千上万个成功和失败的推理轨迹是宝贵的训练数据。每个轨迹都是智能体在这个 MDP 中导航的一个例子。
  • 训练一个策略模型: 利用这些数据,我们可以应用强化学习(Reinforcement Learning)来训练一个更小、更专业的策略模型
  • 目标:速度和效率: 目标是将像 GPT-4o 这样的复杂模型的推理能力蒸馏到一个紧凑的、微调过的模型中(例如,一个 7B 参数的模型)。这个学习到的策略可以更快、更便宜地做出 CONTINUE/FINISH 的决策,同时针对我们的特定领域进行了高度优化。这是像 DeepRAG 这样的前沿研究论文背后的核心思想,并代表了自主 RAG 系统优化的下一个层次。

​最后

我在一线科技企业深耕十二载,见证过太多因技术卡位而跃迁的案例。那些率先拥抱 AI 的同事,早已在效率与薪资上形成代际优势,我意识到有很多经验和知识值得分享给大家,也可以通过我们的能力和经验解答大家在大模型的学习中的很多困惑。

我整理出这套 AI 大模型突围资料包:

  • ✅AI大模型学习路线图
  • ✅Agent行业报告
  • ✅100集大模型视频教程
  • ✅大模型书籍PDF
  • ✅DeepSeek教程
  • ✅AI产品经理入门资料

完整的大模型学习和面试资料已经上传带到CSDN的官方了,有需要的朋友可以扫描下方二维码免费领取【保证100%免费】👇👇
​​
在这里插入图片描述

为什么说现在普通人就业/升职加薪的首选是AI大模型?

人工智能技术的爆发式增长,正以不可逆转之势重塑就业市场版图。从DeepSeek等国产大模型引发的科技圈热议,到全国两会关于AI产业发展的政策聚焦,再到招聘会上排起的长队,AI的热度已从技术领域渗透到就业市场的每一个角落。

img
智联招聘的最新数据给出了最直观的印证:2025年2月,AI领域求职人数同比增幅突破200% ,远超其他行业平均水平;整个人工智能行业的求职增速达到33.4%,位居各行业榜首,其中人工智能工程师岗位的求职热度更是飙升69.6%。

AI产业的快速扩张,也让人才供需矛盾愈发突出。麦肯锡报告明确预测,到2030年中国AI专业人才需求将达600万人,人才缺口可能高达400万人,这一缺口不仅存在于核心技术领域,更蔓延至产业应用的各个环节。

在这里插入图片描述

​​
在这里插入图片描述

资料包有什么?

①从入门到精通的全套视频教程⑤⑥

包含提示词工程、RAG、Agent等技术点
在这里插入图片描述

② AI大模型学习路线图(还有视频解说)

全过程AI大模型学习路线

在这里插入图片描述

③学习电子书籍和技术文档

市面上的大模型书籍确实太多了,这些是我精选出来的

在这里插入图片描述

④各大厂大模型面试题目详解

在这里插入图片描述

⑤ 这些资料真的有用吗?

这份资料由我和鲁为民博士共同整理,鲁为民博士先后获得了北京清华大学学士和美国加州理工学院博士学位,在包括IEEE Transactions等学术期刊和诸多国际会议上发表了超过50篇学术论文、取得了多项美国和中国发明专利,同时还斩获了吴文俊人工智能科学技术奖。目前我正在和鲁博士共同进行人工智能的研究。

所有的视频教程由智泊AI老师录制,且资料与智泊AI共享,相互补充。这份学习大礼包应该算是现在最全面的大模型学习资料了。

资料内容涵盖了从入门到进阶的各类视频教程和实战项目,无论你是小白还是有些技术基础的,这份资料都绝对能帮助你提升薪资待遇,转行大模型岗位。

在这里插入图片描述
在这里插入图片描述

智泊AI始终秉持着“让每个人平等享受到优质教育资源”的育人理念‌,通过动态追踪大模型开发、数据标注伦理等前沿技术趋势‌,构建起"前沿课程+智能实训+精准就业"的高效培养体系。

课堂上不光教理论,还带着学员做了十多个真实项目。学员要亲自上手搞数据清洗、模型调优这些硬核操作,把课本知识变成真本事‌!

​​​​在这里插入图片描述
在这里插入图片描述

如果说你是以下人群中的其中一类,都可以来智泊AI学习人工智能,找到高薪工作,一次小小的“投资”换来的是终身受益!

应届毕业生‌:无工作经验但想要系统学习AI大模型技术,期待通过实战项目掌握核心技术。

零基础转型‌:非技术背景但关注AI应用场景,计划通过低代码工具实现“AI+行业”跨界‌。

业务赋能 ‌突破瓶颈:传统开发者(Java/前端等)学习Transformer架构与LangChain框架,向AI全栈工程师转型‌。

👉获取方式:

😝有需要的小伙伴,可以保存图片到wx扫描二v码免费领取【保证100%免费】🆓**

在这里插入图片描述

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐