很多 RAG 系统失败,并不是因为 LLM 不够聪明,而是因为它们的架构太简单。它们试图用线性的一次性方式,处理一个本质上循环、多步骤的问题。

许多复杂查询需要推理、反思,以及何时行动的聪明决策,这与我们面对问题时如何检索信息非常相似。这正是 RAG 流水线中引入“agent 驱动行为”的用武之地。下面看看一个典型的深度思考 RAG 流水线长什么样……

Deep Thinking RAG Pipeline (Created by Fareed Khan)

  1. Plan:首先,agent 将复杂用户查询拆解成结构化的多步骤研究计划,并决定每一步使用何种工具(内部文档搜索或 web 搜索)。
  2. Retrieve:对每一步,执行自适应的多阶段检索漏斗,由一个 supervisor 动态选择最佳搜索策略(vector、keyword 或 hybrid)。
  3. Refine:使用高精度 Cross-Encoder 对初始结果进行重排,并由 distiller agent 将最佳证据压缩为简洁的上下文。
  4. Reflect:每一步后,agent 总结当前发现并更新研究历史,逐步构建对问题的累积理解。
  5. Critique:随后,一个 policy agent 检查这段历史,策略性决策是继续下一步、遇到死胡同时修订计划,还是结束。
  6. Synthesize:研究完成后,最终的 agent 将来自所有来源的证据综合为单一、全面且可引用的答案。

在这篇文章中,我们将实现完整的“深度思考 RAG 流水线”,并与基础 RAG 流水线做对比,展示它是如何解决复杂的 multi-hop 查询的。

所有代码与理论都在我的 GitHub 仓库:

GitHub - FareedKhan-dev/deep-thinking-rag: A Deep Thinking RAG Pipeline to Solve Complex Queries

📘 目录

  • 📘 目录
  • 环境配置
  • 知识库来源
  • 理解多源、多跳查询
  • 构建一个会失败的浅层 RAG 流水线
  • 定义中央智能体系统的 RAG 状态
  • 战略规划与查询制定
  • 使用工具感知规划器分解问题
  • 使用查询重写智能体优化检索
  • 通过元数据感知分块提升精度
  • 创建多阶段检索漏斗
  • 使用监督器动态选择策略
  • 利用混合、关键词与语义搜索进行广泛召回
  • 使用交叉编码器重排器实现高精度
  • 通过上下文蒸馏进行综合
  • 使用网络搜索增强知识
  • 自我评估与控制流策略
  • 更新并反映累积研究历史
  • B构建用于控制流的策略智能体
  • 定义图节点
  • 定义条件边
  • 连接深度思考 RAG 机器
  • 编译与可视化迭代工作流
  • 运行深度思考流水线
  • 分析最终高质量答案
  • 并排对比
  • 评估框架与结果分析
  • 总结整个流水线
  • 使用马尔可夫决策过程(MDP)学习策略

环境配置

在开始编写 Deep RAG 流水线前,我们需要打好基础,因为一个生产级 AI 系统不仅仅是最终算法,还包括在搭建时做出的深思熟虑的选择。

我们将要实现的每个步骤,都会直接影响最终系统的有效性和可靠性。

当开始开发流水线并不断试错时,最好把配置定义为一个简单的字典。等流程复杂起来,就可以直接回到这个字典,调整配置并观察对整体性能的影响。

# Central Configuration Dictionary to manage all system parametersconfig = {    "data_dir": "./data",                           # Directory to store raw and cleaned data    "vector_store_dir": "./vector_store",           # Directory to persist our vector store    "llm_provider": "openai",                       # The LLM provider we are using    "reasoning_llm": "gpt-4o",                      # The powerful model for planning and synthesis    "fast_llm": "gpt-4o-mini",                      # A faster, cheaper model for simpler tasks like the baseline RAG    "embedding_model": "text-embedding-3-small",    # The model for creating document embeddings    "reranker_model": "cross-encoder/ms-marco-MiniLM-L-6-v2", # The model for precision reranking    "max_reasoning_iterations": 7,                  # A safeguard to prevent the agent from getting into an infinite loop    "top_k_retrieval": 10,                          # Number of documents for initial broad recall    "top_n_rerank": 3,                              # Number of documents to keep after precision reranking}

这些键大都很好理解,但有三个值得强调:

  • llm_provider:我们使用的 LLM 提供方,这里用的是 OpenAI。之所以选择 OpenAI,是因为在 LangChain 中我们可以很容易地切换模型和提供方;你也可以选择适合自己的,比如 Ollama。
  • reasoning_llm:在整个系统里它必须是最强的,因为要承担规划与综合。
  • fast_llm:用在更简单的任务上(比如 baseline RAG),应更快更省。

接下来导入流水线会用到的库,并把 API keys 设为环境变量,避免在代码中暴露。

import osimport reimport jsonfrom getpass import getpassfrom pprint import pprintimport uuidfrom typing importList, Dict, TypedDict, Literal, Optionaldef_set_env(var: str):    ifnot os.environ.get(var):        os.environ[var] = getpass(f"Enter your {var}: ")_set_env("OPENAI_API_KEY")_set_env("LANGSMITH_API_KEY")_set_env("TAVILY_API_KEY")os.environ["LANGSMITH_TRACING"] = "true"os.environ["LANGSMITH_PROJECT"] = "Advanced-Deep-Thinking-RAG"

我们同时启用了 LangSmith 的 tracing。在一个 agentic 系统里,工作流复杂且循环,tracing 并非可有可无,而是很重要。它帮助你可视化内部过程,更容易调试 agent 的思考路径。

知识库来源

一个生产级 RAG 系统需要既复杂又有挑战性的知识库,才能真正体现其有效性。我们将使用 NVIDIA 的 2023 年 10-K 报告(链接),这是一份超过百页的文件,详述公司的业务运营、财务表现和风险因素披露。

Sourcing the Knowledge Base (Created by Fareed Khan)

首先实现一个自定义函数,直接从 SEC EDGAR 数据库下载 10-K 报告,解析原始 HTML,并转换成干净、结构化的文本,供我们的 RAG 流水线摄取。

import requestsfrom bs4 import BeautifulSoupfrom langchain.docstore.document import Documentdefdownload_and_parse_10k(url, doc_path_raw, doc_path_clean):    if os.path.exists(doc_path_clean):        print(f"Cleaned 10-K file already exists at: {doc_path_clean}")        return    print(f"Downloading 10-K filing from {url}...")    headers = {'User-Agent': 'Mozilla/5.0'}    response = requests.get(url, headers=headers)    response.raise_for_status()    withopen(doc_path_raw, 'w', encoding='utf-8') as f:        f.write(response.text)    print(f"Raw document saved to {doc_path_raw}")    soup = BeautifulSoup(response.content, 'html.parser')    text = ''    for p in soup.find_all(['p', 'div', 'span']):        text += p.get_text(strip=True) + '\n\n'    clean_text = re.sub(r'\n{3,}', '\n\n', text).strip()    clean_text = re.sub(r'\s{2,}', ' ', clean_text).strip()    withopen(doc_path_clean, 'w', encoding='utf-8') as f:        f.write(clean_text)    print(f"Cleaned text content extracted and saved to {doc_path_clean}")

这段代码很直观,使用 beautifulsoup4 解析 HTML 并提取文本,可方便地在 HTML 结构中导航,获取有效信息,忽略脚本或样式等无关元素。

现在执行看看效果。

print("Downloading and parsing NVIDIA's 2023 10-K filing...")download_and_parse_10k(url_10k, doc_path_raw, doc_path_clean)with open(doc_path_clean, 'r', encoding='utf-8') as f:    print("\n--- Sample content from cleaned 10-K ---")    print(f.read(1000) + "...")
``````plaintext
#### OUTPUT ####Downloading and parsing NVIDIA 2023 10-K filing...Successfully downloaded 10-K filing from https://www.sec.gov/Archives/edgar/data/1045810/000104581023000017/nvda-20230129.htmRaw document saved to ./data/nvda_10k_2023_raw.htmlCleaned text content extracted and saved to ./data/nvda_10k_2023_clean.txt# --- Sample content from cleaned 10-K ---Item 1. Business.  OVERVIEW  NVIDIA is the pioneer of accelerated computing. We are a full-stack computing company with a platform strategy that brings together hardware, systems, software, algorithms, libraries, and services to create unique value for the markets we serve. Our work in accelerated computing and AI is reshaping the worlds largest industries and profoundly impacting society.  Founded in 1993, we started as a PC graphics chip company, inventing the graphics processing unit, or GPU. The GPU was essential for the growth of the PC gaming market and has since been repurposed to revolutionize computer graphics, high performance computing, or HPC, and AI.  The programmability of our GPUs made them ...

我们调用函数,把内容存到 txt 文件,作为后续 RAG 的上下文。运行上述代码后会自动下载,并可预览样例。

理解多源、多跳查询

为了测试我们实现的流水线,并与基础 RAG 对比,我们需要一个非常复杂的查询,覆盖我们所用文档的不同方面。

Our Complex Query:"Based on NVIDIA's 2023 10-K filing, identify their key risks related tocompetition. Then, find recent news (post-filing, from 2024) about AMD'sAI chip strategy and explain how this new strategy directly addresses orexacerbates one of NVIDIA's stated risks."

为什么这个查询会难倒标准 RAG 流水线?

  1. Multi-Hop 推理:它不能一步完成。系统必须先识别风险,再找 AMD 的新闻,最后把二者综合起来。
  2. 多源知识:所需信息在完全不同的地方。风险在内部静态文档(10-K)中,而 AMD 的新闻是外部的,需要实时 web 访问。
  3. 综合与分析:不是简单列出事实,而是要解释“后者如何加剧前者”,需要真正的综合推理。

下一节我们实现一个基础 RAG 流水线,看看它是如何失败的。

构建一个会失败的浅层 RAG 流水线

现在环境和挑战性知识库都准备好了,下一步是构建一个标准的“vanilla” RAG 流水线。这很重要……

先从最简单可行的方案开始,然后把复杂查询在其上运行,观察到底是如何以及为何失败。

我们将做如下事情:

Shallow RAG Pipeline (Created by Fareed Khan)

  • 加载并切分文档:读取清洗后的 10-K,并按固定大小切片——这是常见但语义上“天真”的方法。
  • 创建 vector store:对这些切片做 embedding,并用 ChromaDB 建索引,支持基础语义搜索。
  • 组装 RAG Chain:使用 LangChain Expression Language(LCEL)把 retriever、prompt 模板和 LLM 串起来,形成线性流程。
  • 演示关键失败点:用我们的多跳多源查询去执行,分析其不充分的回答。

先加载清洗后的文档并切分。我们用 LangChain 的 RecursiveCharacterTextSplitter

from langchain_community.document_loaders import TextLoaderfrom langchain.text_splitter import RecursiveCharacterTextSplitterprint("Loading and chunking the document...")loader = TextLoader(doc_path_clean, encoding='utf-8')documents = loader.load()text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=150)doc_chunks = text_splitter.split_documents(documents)print(f"Document loaded and split into {len(doc_chunks)} chunks.")
``````plaintext
#### OUTPUT ####Loading and chunking the document...Document loaded and split into 378 chunks.

有了 378 个 chunk,下一步是让它们可检索——创建向量并存入数据库。我们用 ChromaDB 作为 vector store,并用 OpenAI 的 text-embedding-3-small 作为 embedding 模型(配置中已定义)。

from langchain_community.vectorstores import Chromafrom langchain_openai import OpenAIEmbeddingsprint("Creating baseline vector store...")embedding_function = OpenAIEmbeddings(model=config['embedding_model'])baseline_vector_store = Chroma.from_documents(    documents=doc_chunks,    embedding=embedding_function)baseline_retriever = baseline_vector_store.as_retriever(search_kwargs={"k": 3})print(f"Vector store created with {baseline_vector_store._collection.count()} embeddings.")
``````plaintext
#### OUTPUT ####Creating baseline vector store...Vector store created with 378 embeddings.

Chroma.from_documents 会组织以上过程,把向量存入可检索的索引。最后用 LCEL 把它们装配成单一可运行的 RAG chain。数据流:用户问题 -> retriever -> prompt -> LLM。

from langchain_core.prompts import ChatPromptTemplatefrom langchain_openai import ChatOpenAIfrom langchain_core.runnable import RunnablePassthroughfrom langchain_core.output_parsers import StrOutputParsertemplate = """You are an AI financial analyst. Answer the question based only on the following context:{context}Question: {question}"""prompt = ChatPromptTemplate.from_template(template)llm = ChatOpenAI(model=config["fast_llm"], temperature=0)defformat_docs(docs):    return"\n\n---\n\n".join(doc.page_content for doc in docs)baseline_rag_chain = (    {"context": baseline_retriever | format_docs, "question": RunnablePassthrough()}    | prompt    | llm    | StrOutputParser())

注意第一步是个字典。context 由子链生成:输入问题 -> baseline_retriever -> format_docs;而 question 则原样透传(RunnablePassthrough)。

运行看看哪里会失败。

from rich.console import Consolefrom rich.markdown import Markdownconsole = Console()complex_query_adv = "Based on NVIDIA's 2023 10-K filing, identify their key risks related to competition. Then, find recent news (post-filing, from 2024) about AMD's AI chip strategy and explain how this new strategy directly addresses or exacerbates one of NVIDIA's stated risks."print("Executing complex query on the baseline RAG chain...")baseline_result = baseline_rag_chain.invoke(complex_query_adv)console.print("\n--- BASELINE RAG FAILED OUTPUT ---")console.print(Markdown(baseline_result))
``````plaintext
#### OUTPUT ####Executing complex query on the baseline RAG chain...--- BASELINE RAG FAILED OUTPUT ---Based on the provided context, NVIDIA operates in an intensely competitive semiconductorindustry and faces competition from companies like AMD. The context mentionsthat the industry is characterized by rapid technological change. However, the provided documents do not contain any specific information about AMD's recent AI chip strategy from 2024 or how it might impact NVIDIA's stated risks.

可以看到三个明显问题:

  • 语境不相关:retriever 抓来一些“泛泛的 NVIDIA/competition/AMD”段落,却没有 2024 年 AMD 策略的具体细节。
  • 信息缺失:2023 年的数据不可能覆盖 2024 年事件,系统没有意识到自己“缺关键信息”。
  • 无规划与工具使用:把复杂问题当成简单问答,不能拆分步骤,也不会用 web 搜索来补齐。

系统失败不是因为 LLM 笨,而是因为架构过于简单。它用线性的一次性流程,试图解决一个循环的多步骤问题。

理解了基础 RAG 的问题后,接下来开始实现深度思考的方法论,看看如何解决复杂查询。

定义中央智能体系统的 RAG 状态

要构建推理 agent,首先需要管理它的“状态”。简单 RAG chain 的每一步都是无状态的,但……

智能的 agent 需要“记忆”。它需要记住最初的问题、它制定的计划、以及迄今为止收集到的证据。

RAG State (Created by Fareed Khan)

RAGState 将作为中央记忆,在我们的 LangGraph 工作流中在各节点之间传递。首先定义数据结构,从最基本的构件开始:研究计划中的单一步骤。

我们希望定义 agent 计划的原子单元。每个 Step 不仅要包含一个待回答的子问题,还要包含其背后的理由,尤其是指定要用的工具。这迫使 agent 的规划过程明确且结构化。

from langchain_core.documents import Documentfrom langchain_core.pydantic_v1 import BaseModel, Fieldclass Step(BaseModel):    sub_question: str = Field(description="A specific, answerable question for this step.")    justification: str = Field(description="A brief explanation of why this step is necessary to answer the main query.")    tool: Literal["search_10k", "search_web"] = Field(description="The tool to use for this step.")    keywords: List[str] = Field(description="A list of critical keywords for searching relevant document sections.")    document_section: Optional[str] = Field(description="A likely document section title (e.g., 'Item 1A. Risk Factors') to search within. Only for 'search_10k' tool.")

Step 类(基于 Pydantic BaseModel)为 Planner Agent 输出提供严格契约。tool: Literal[...] 强制 LLM 明确在内部知识(search_10k)与外部知识(search_web)之间做出选择。

这种结构化输出比解析自然语言计划可靠得多。

定义了单个 Step 后,需要一个容器保存整个步骤序列。创建 Plan 类,它是 Step 对象的列表,代表 agent 端到端的研究策略。

class Plan(BaseModel):    steps: List[Step] = Field(description="A detailed, multi-step plan to answer the user's query.")

Plan 类为整个研究过程提供结构。调用 Planner Agent 时,我们会要求返回符合该 schema 的 JSON 对象。这样在任何检索行动前,agent 的策略都是清晰、按序的,且机器可读。

执行计划时,agent 需要记住自己学到了什么。定义 PastStep 字典,存储每个已完成步骤的结果,构成 agent 的“研究历史”或“实验手记”。

class PastStep(TypedDict):    step_index: int    sub_question: str    retrieved_docs: List[Document]    summary: str

该结构对 agent 的自我批判(self-critique)循环至关重要。每一步后,我们填充这个字典并加入 state。agent 就能通过回顾这份逐步增长的摘要列表,理解自己已知/未知,决定是否已具备完成任务所需的信息。

最后,把这些拼装为主 RAGState 字典。它在整个图中流动,包含原始问题、完整计划、过往步骤历史,以及当前正在执行步骤的中间数据。

class RAGState(TypedDict):    original_question: str    plan: Plan    past_steps: List[PastStep]    current_step_index: int    retrieved_docs: List[Document]    reranked_docs: List[Document]    synthesized_context: str    final_answer: str

RAGState 就是 agent 的“心智”。图中的每个节点接受此字典为输入,并返回更新后的版本。

例如,plan_node 会填充 plan 字段,retrieval_node 会填充 retrieved_docs,以此类推。这个共享、持久的状态使复杂的迭代推理成为可能,这是简单 RAG 链所缺失的。

准备好 agent 的记忆蓝图后,开始构建第一个认知组件:Planner Agent。

战略规划与查询制定

Strategic Planning (Created by Fareed Khan)

本节分为三步工程:

  • Tool-Aware Planner:构建 LLM 驱动的 agent,把用户查询分解为结构化 Plan,并为每步选择工具。
  • Query Rewriter:创建专门 agent,把 planner 的简单子问题改写为高效检索的查询。
  • Metadata-Aware Chunking:对源文档重新处理,增加 section 级 metadata,这是实现高精度过滤检索的关键。

使用工具感知规划器分解问题

Decomposing Step (Created by Fareed Khan)

不能把完整问题扔给数据库,指望运气。要教会 agent 将问题拆成更小、更易处理的部分。

为此,我们创建专门的 Planner Agent,并给出非常清晰的指令(prompt),告诉它该如何工作。

from langchain_core.prompts import ChatPromptTemplatefrom langchain_openai import ChatOpenAIfrom rich.pretty import pprint as rprintplanner_prompt = ChatPromptTemplate.from_messages([    ("system", """You are an expert research planner. Your task is to create a clear, multi-step plan to answer a complex user query by retrieving information from multiple sources.You have two tools available:1. `search_10k`: Use this to search for information within NVIDIA's 2023 10-K financial filing. This is best for historical facts, financial data, and stated company policies or risks from that specific time period.2. `search_web`: Use this to search the public internet for recent news, competitor information, or any topic that is not specific to NVIDIA's 2023 10-K.Decompose the user's query into a series of simple, sequential sub-questions. For each step, decide which tool is more appropriate.For `search_10k` steps, also identify the most likely section of the 10-K (e.g., 'Item 1A. Risk Factors', 'Item 7. Management's Discussion and Analysis...').It is critical to use the exact section titles found in a 10-K filing where possible."""),    ("human", "User Query: {question}")])

这里给 LLM 一个新 persona:expert research planner。明确告知它有两个工具(search_10ksearch_web),以及各自适用场景——这就是“工具感知(tool-aware)”的部分。

我们要求它输出一个能直接映射到系统能力的计划,而不是模糊表述。

接下来初始化 reasoning 模型,并与 prompt 串接。关键是告诉 LLM 最终输出必须符合 Pydantic 的 Plan 类格式,确保结构化、可预测。

reasoning_llm = ChatOpenAI(model=config["reasoning_llm"], temperature=0)planner_agent = planner_prompt | reasoning_llm.with_structured_output(Plan)print("Tool-Aware Planner Agent created successfully.")print("\n--- Testing Planner Agent ---")test_plan = planner_agent.invoke({"question": complex_query_adv})rprint(test_plan)

我们把 planner_prompt 通过 reasoning_llm,再用 .with_structured_output(Plan),让 LangChain 用函数调用能力,返回完全匹配 Plan schema 的 JSON 对象,比解析纯文本可靠得多。

测试输出如下:

#### OUTPUT ####Tool-Aware Planner Agent created successfully.--- Testing Planner Agent ---Plan(│   steps=[│   │   Step(│   │   │   sub_question="What are the key risks related to competition as stated in NVIDIA's 2023 10-K filing?",│   │   │   justification="This step is necessary to extract the foundational information about competitive risks directly from the source document as requested by the user.",│   │   │   tool='search_10k',│   │   │   keywords=['competition', 'risk factors', 'semiconductor industry', 'competitors'],│   │   │   document_section='Item 1A. Risk Factors'│   │   ),│   │   Step(│   │   │   sub_question="What are the recent news and developments in AMD's AI chip strategy in 2024?",│   │   │   justification="This step requires finding up-to-date, external information that is not available in the 2023 10-K filing. A web search is necessary to get the latest details on AMD's strategy.",│   │   │   tool='search_web',│   │   │   keywords=['AMD', 'AI chip strategy', '2024', 'MI300X', 'Instinct accelerator'],│   │   │   document_section=None│   │   )│   ])

可以看到,agent 不仅给了一个清晰的 Plan,还正确识别出问题包含两部分:

  1. 第一部分答案在 10-K 中,选了 search_10k,且正确猜测了可能的 section。
  2. 第二部分是“2024 年新闻”,10-K 中不可能有,正确选了 search_web。这说明我们的流程在“思考层面”已有希望。

使用查询重写智能体优化检索

目前我们有一个包含好子问题的计划。

但“有哪些风险?”这样的问法并不是好的检索查询,太笼统。无论是向量数据库还是 web 搜索,引擎更偏好具体、关键词丰富的查询。

Query Rewriting Agent (Creted by Fareed Khan)

为此我们构建一个小而专的 agent:Query Rewriter。它的唯一工作是,把当前步骤的子问题,结合已知上下文,改写为更适合检索的 query。

先设计 prompt:

from langchain_core.output_parsers import StrOutputParserquery_rewriter_prompt = ChatPromptTemplate.from_messages([    ("system", """You are a search query optimization expert. Your task is to rewrite a given sub-question into a highly effective search query for a vector database or web search engine, using keywords and context from the research plan.The rewritten query should be specific, use terminology likely to be found in the target source (a financial 10-K or news articles), and be structured to retrieve the most relevant text snippets."""),    ("human", "Current sub-question: {sub_question}\n\nRelevant keywords from plan: {keywords}\n\nContext from past steps:\n{past_context}")])

我们让这个 agent 扮演“search query optimization expert”。它接收三类信息:sub_questionkeywordspast_context,以此构造更强的查询。

初始化 agent:

query_rewriter_agent = query_rewriter_prompt | reasoning_llm | StrOutputParser()print("Query Rewriter Agent created successfully.")print("\n--- Testing Query Rewriter Agent ---")test_sub_q = "How does AMD's 2024 AI chip strategy potentially exacerbate the competitive risks identified in NVIDIA's 10-K?"test_keywords = ['impact', 'threaten', 'competitive pressure', 'market share', 'technological change']test_past_context = "Step 1 Summary: NVIDIA's 10-K lists intense competition and rapid technological change as key risks. Step 2 Summary: AMD launched its MI300X AI accelerator in 2024 to directly compete with NVIDIA's H100."rewritten_q = query_rewriter_agent.invoke({    "sub_question": test_sub_q,    "keywords": test_keywords,    "past_context": test_past_context})print(f"Original sub-question: {test_sub_q}")print(f"Rewritten Search Query: {rewritten_q}")

结果如下:

#### OUTPUT ####Query Rewriter Agent created successfully.--- Testing Query Rewriter Agent ---Original sub-question: How does AMD 2024 AI chip strategy potentially exacerbate the competitive risks identified in NVIDIA 10-K?Rewritten Search Query: analysis of how AMD 2024 AI chip strategy, including products like the MI300X, exacerbates NVIDIA's stated competitive risks such as rapid technological change and market share erosion in the data center and AI semiconductor industry

原问题面向分析师;改写后的查询面向搜索引擎,包含更具体术语,如“MI300X”、“market share erosion”、“data center”等,这些都从关键词和过往上下文中归纳出来。这样的查询更可能检回准确文档,提升系统准确与效率。

通过元数据感知分块提升精度

Planner Agent 让我们有了“额外线索”:它不仅说“找风险”,还提示“看 Item 1A. Risk Factors 这一节”。

但当前 retriever 用不上这个提示。vector store 只是 378 个 chunk 的“扁平列表”,不知道什么是“section”。

Meta aware chunking (Created by Fareed Khan)

我们需要重建 chunks。这次,每个 chunk 都要加上“它来自 10-K 的哪一节”的 metadata 标签。这样 agent 就能执行更精确的“过滤检索”。

首先,需要在原始文本中程序化定位每个 section 的起始。观察文档格式,每个主 section 以 “ITEM”+编号开头(如“ITEM 1A”、“ITEM 7”),非常适合用正则。

section_pattern = r"(ITEM\\s+\\d[A-Z]?\\.\\s*.*?)(?=\\nITEM\\s+\\d[A-Z]?\\.|$)"

这条 pattern 用于检测 section 标题,既要足够灵活以适配多种格式,又要足够具体避免误抓。

应用该 pattern,把文档切分为两个列表:section 标题列表、对应内容列表。

raw_text = documents[0].page_contentsection_titles = re.findall(section_pattern, raw_text, re.IGNORECASE | re.DOTALL)section_titles = [title.strip().replace('\\n', ' ') for title in section_titles]sections_content = re.split(section_pattern, raw_text, flags=re.IGNORECASE | re.DOTALL)sections_content = [content.strip() for content in sections_content if content.strip() and not content.strip().lower().startswith('item ')]print(f"Identified {len(section_titles)} document sections.")assert len(section_titles) == len(sections_content), "Mismatch between titles and content sections"

这是一种高效解析半结构化文档的方法。用一次 findall 获得所有 section 标题,再用一次 split 按标题切分全文。assert 是健全性检查,确保解析正确。

接着,将标题与内容逐一对应,生成最终带 metadata 的 chunks。

import uuiddoc_chunks_with_metadata = []for i, content inenumerate(sections_content):    section_title = section_titles[i]    section_chunks = text_splitter.split_text(content)        for chunk in section_chunks:        chunk_id = str(uuid.uuid4())        doc_chunks_with_metadata.append(            Document(                page_content=chunk,                metadata={                    "section": section_title,                    "source_doc": doc_path_clean,                    "id": chunk_id                }            )        )print(f"Created {len(doc_chunks_with_metadata)} chunks with section metadata.")print("\n--- Sample Chunk with Metadata ---")sample_chunk = next(c for c in doc_chunks_with_metadata if"Risk Factors"in c.metadata.get("section", ""))print(sample_chunk)

核心在于:为每个 chunk 附加 metadata,将 section_title 作为标签。输出如下:

#### OUTPUT ####Processing document and adding metadata...Identified 22 document sections.Created 381 chunks with section metadata.--- Sample Chunk with Metadata ---Document(│   page_content='Our industry is intensely competitive. We operate in the semiconductor\\nindustry, which is intensely competitive and characterized by rapid\\ntechnological change and evolving industry standards. ...│   metadata={│   │   'section': 'Item 1A. Risk Factors.',│   │   'source_doc': './data/nvda_10k_2023_clean.txt',│   │   'id': '...'│   })

看到 metadata 中的 'section': 'Item 1A. Risk Factors.' 了吗?现在,当 agent 需要找“风险”时,可以对 retriever 说:“只在 section=‘Item 1A. Risk Factors’ 的 chunks 中检索”。一个简单的改造,就让检索从“钝器”变成“手术刀”。这是构建生产级 RAG 的关键原则。

创建多阶段检索漏斗

到目前为止,我们已经做了智能规划,并为文档添加了 metadata。现在构建系统的核心:复杂的检索流水线。

简单的一次性语义搜索已经不够。生产级 agent 需要自适应、分阶段的检索流程。

Multi Stage Funnel (Created by Fareed Khan)

  • Retrieval Supervisor:构建 supervisor agent 作为动态路由器,分析每个子问题并选择最佳检索策略(vector、keyword 或 hybrid)。
  • 第一阶段(广覆盖 Recall):实现 supervisor 可选的不同检索策略,尽可能广泛地捕获潜在相关文档。
  • 第二阶段(高精度 Precision):使用 Cross-Encoder 模型对初始结果进行重排,去噪并将最相关文档置顶。
  • 第三阶段(综合 Synthesis):创建 Distiller Agent 将 top 文档压缩为单一、简洁的上下文。

使用监督器动态选择策略

并非所有查询都相同。比如“Compute & Networking 分部 2023 财年的 revenue 增长是多少?”包含非常具体的术语,keyword 搜索更合适;而“公司对市场竞争的整体态度如何?”则是概念性问题,semantic 搜索更优。

Supervisor Agent (Created by Fareed Khan)

我们不硬编码策略,而是构建一个小而智能的 agent——Retrieval Supervisor。它的职责就是分析查询,决定用哪种检索方式。

先定义其输出的结构:

class RetrievalDecision(BaseModel):    strategy: Literal["vector_search", "keyword_search", "hybrid_search"]    justification: str

然后是 prompt:

retrieval_supervisor_prompt = ChatPromptTemplate.from_messages([    ("system", """You are a retrieval strategy expert. Based on the user's query, you must decide the best retrieval strategy.You have three options:1. `vector_search`: Best for conceptual, semantic, or similarity-based queries.2. `keyword_search`: Best for queries with specific, exact terms, names, or codes (e.g., 'Item 1A', 'Hopper architecture').3. `hybrid_search`: A good default that combines both, but may be less precise than a targeted strategy."""),    ("human", "User Query: {sub_question}")])

装配该 agent 并测试:

retrieval_supervisor_agent = retrieval_supervisor_prompt | reasoning_llm.with_structured_output(RetrievalDecision)print("Retrieval Supervisor Agent created.")print("\n--- Testing Retrieval Supervisor Agent ---")query1 = "revenue growth for the Compute & Networking segment in fiscal year 2023"decision1 = retrieval_supervisor_agent.invoke({"sub_question": query1})print(f"Query: '{query1}'")print(f"Decision: {decision1.strategy}, Justification: {decision1.justification}")query2 = "general sentiment about market competition and technological innovation"decision2 = retrieval_supervisor_agent.invoke({"sub_question": query2})print(f"\nQuery: '{query2}'")print(f"Decision: {decision2.strategy}, Justification: {decision2.justification}")
``````plaintext
#### OUTPUT ####Retrieval Supervisor Agent created.# --- Testing Retrieval Supervisor Agent ---Query: 'revenue growth for the Compute & Networking segment in fiscal year 2023'Decision: keyword_search, Justification: The query contains specific keywords like 'revenue growth', 'Compute & Networking', and 'fiscal year 2023' which are ideal for a keyword-based search to find exact financial figures.Query: 'general sentiment about market competition and technological innovation'Decision: vector_search, Justification: This query is conceptual and seeks to understand sentiment and broader themes. Vector search is better suited to capture the semantic meaning of 'market competition' and 'technological innovation' rather than relying on exact keywords.

它能正确地为具体术语选 keyword_search,为概念性问题选 vector_search。动态决策比一刀切强得多。

利用混合、关键词与语义搜索进行广泛召回

有了 supervisor 选择策略,我们需要实现这些策略。第一阶段的目标是 Recall(广覆盖):尽可能捕获所有潜在相关文档,即使带入一些噪声也没关系。

Broad Recall (Created by Fareed Khan)

我们实现三种搜索函数:

  1. Vector Search:标准语义搜索,升级为支持 metadata filter。
  2. Keyword Search(BM25):传统且强大的算法,擅长匹配具体术语。
  3. Hybrid Search:结合二者,用 RRF(Reciprocal Rank Fusion)融合。

先用带 metadata 的 chunks 创建一个高级 vector store。

import numpy as npfrom rank_bm25 import BM25Okapiprint("Creating advanced vector store with metadata...")advanced_vector_store = Chroma.from_documents(    documents=doc_chunks_with_metadata,    embedding=embedding_function)print(f"Advanced vector store created with {advanced_vector_store._collection.count()} embeddings.")

接着构建 BM25 的索引:

print("\nBuilding BM25 index for keyword search...")tokenized_corpus = [doc.page_content.split(" ") for doc in doc_chunks_with_metadata]doc_ids = [doc.metadata["id"] for doc in doc_chunks_with_metadata]doc_map = {doc.metadata["id"]: doc for doc in doc_chunks_with_metadata}bm25 = BM25Okapi(tokenized_corpus)

定义三个检索函数:

def vector_search_only(query: str, section_filter: str = None, k: int = 10):    filter_dict = {"section": section_filter} if section_filter and"Unknown"notin section_filter elseNone    return advanced_vector_store.similarity_search(query, k=k, filter=filter_dict)defbm25_search_only(query: str, k: int = 10):    tokenized_query = query.split(" ")    bm25_scores = bm25.get_scores(tokenized_query)    top_k_indices = np.argsort(bm25_scores)[::-1][:k]    return [doc_map[doc_ids[i]] for i in top_k_indices]defhybrid_search(query: str, section_filter: str = None, k: int = 10):    bm25_docs = bm25_search_only(query, k=k)    semantic_docs = vector_search_only(query, section_filter=section_filter, k=k)    all_docs = {doc.metadata["id"]: doc for doc in bm25_docs + semantic_docs}.values()    ranked_lists = [[doc.metadata["id"] for doc in bm25_docs], [doc.metadata["id"] for doc in semantic_docs]]        rrf_scores = {}    for doc_list in ranked_lists:        for i, doc_id inenumerate(doc_list):            if doc_id notin rrf_scores:                rrf_scores[doc_id] = 0            rrf_scores[doc_id] += 1 / (i + 61)    sorted_doc_ids = sorted(rrf_scores.keys(), key=lambda x: rrf_scores[x], reverse=True)    final_docs = [doc_map[doc_id] for doc_id in sorted_doc_ids[:k]]    return final_docsprint("\nAll retrieval strategy functions ready.")

快速测试 keyword 搜索是否能精确命中目标 section:

print("\n--- Testing Keyword Search ---")test_query = "Item 1A. Risk Factors"test_results = bm25_search_only(test_query)print(f"Query: {test_query}")print(f"Found {len(test_results)} documents. Top result section: {test_results[0].metadata['section']}")
``````plaintext
#### OUTPUT ####Creating advanced vector store with metadata...Advanced vector store created with 381 embeddings.Building BM25 index for keyword search...All retrieval strategy functions ready.# --- Testing Keyword Search ---Query: Item 1A. Risk FactorsFound 10 documents. Top result section: Item 1A. Risk Factors.

如预期,BM25 能精确、迅速地检回 “Item 1A. Risk Factors” 相关文档。当查询包含具体关键词(如 section 标题)时,supervisor 就可以选择这一精准工具。

接下来进入高精度阶段,进行重排。

使用交叉编码器重排器实现高精度

第一阶段的 Recall 能拿到 10 个“潜在相关”的文档。

但“潜在相关”还不够,直接把这 10 个 chunk 全喂给主推理 LLM 会既低效又有风险——成本高,还可能被噪声干扰。

High Precision (Created by Fareed Khan)

我们需要 Precision 阶段,用 Reranker 来从这 10 个候选中挑出最相关的少数。区别在于模型工作方式:

  1. 初始检索用的是 bi-encoder(embedding 模型),分别对 query 与文档编码,速度快、适合海量搜索。
  2. Cross-Encoder 则将“query + 单个文档”作为一对,一起输入,做更深入的比较。它更慢,但更准。

我们要写一个函数,把 10 个文档用 Cross-Encoder 打分重排,只保留 config 里的 top 3。

初始化 Cross-Encoder 模型:

from sentence_transformers import CrossEncoderprint("Initializing CrossEncoder reranker...")reranker = CrossEncoder(config["reranker_model"])

定义重排函数:

def rerank_documents_function(query: str, documents: List[Document]) -> List[Document]:    if not documents:         return []            pairs = [(query, doc.page_content) for doc in documents]    scores = reranker.predict(pairs)    doc_scores = list(zip(documents, scores))    doc_scores.sort(key=lambda x: x[1], reverse=True)    reranked_docs = [doc for doc, score in doc_scores[:config["top_n_rerank"]]]    return reranked_docs

该函数用 cross-encoder 对每个(query, doc)对进行打分,排序后截取前 3,输出短小而高相关的文档列表,作为后续 agent 的完美上下文。这样的漏斗式处理(先高召回,再高精度)是生产级 RAG 的关键。

通过上下文蒸馏进行综合

现在我们有 10 -> 3 的高相关文档,但仍然是三个独立块。为进一步精炼,再加入最后一道“压缩”——Contextual Distillation:将前 3 个文档“蒸馏”为一个简洁、干净的段落,去除冗余,构建一段信息密度极高的上下文。

Synthesization (Created by Fareed Khan)

这一步是针对文本处理,不负责回答问题。我们创建 Distiller Agent:

distiller_prompt = ChatPromptTemplate.from_messages([    ("system", """You are a helpful assistant. Your task is to synthesize the following retrieved document snippets into a single, concise paragraph.The goal is to provide a clear and coherent context that directly answers the question: '{question}'.Focus on removing redundant information and organizing the content logically. Answer only with the synthesized context."""),    ("human", "Retrieved Documents:\n{context}")])distiller_agent = distiller_prompt | reasoning_llm | StrOutputParser()print("Contextual Distiller Agent created.")

在主循环中,每一步的流程将是:

  1. Supervisor:选择检索策略(vector/keyword/hybrid)。
  2. Recall:执行选择的策略,取 top 10 文档。
  3. Precision:用 rerank_documents_function 取 top 3。
  4. Distillation:用 distiller_agent 压缩为单段精华。

这样我们的证据质量达到最佳。下一步,给 agent “看向外部世界”的能力:web 检索。

使用网络搜索增强知识

目前的检索漏斗很强,但有一个致命盲点:

它只能看到 2023 年 10-K 中的内容。而我们的挑战需要“2024 年 AMD 的 AI 芯片策略”的最新新闻——这些在静态知识库中根本不存在。

真正的“深度思考” agent,必须意识到自身知识的边界,并能到别处找答案。我们需要给它一扇“窗”。

Augemtation using Web (Created by Fareed Khan)

这一步我们为系统增加一个新工具:Web Search,使其从“文档特定问答机器人”变成真正的多源研究助手。

我们使用 Tavily Search API——专为 LLM 构建的搜索引擎,返回干净、无广告、相关的结果,非常适合 RAG;同时与 LangChain 集成顺畅。

初始化 Tavily 搜索工具:

from langchain_community.tools.tavily_search import TavilySearchResultsweb_search_tool = TavilySearchResults(k=3)

原始 API 响应需要包装为标准的 Document 列表,以便与我们的 reranker、distiller 无缝衔接。写一个小包装函数:

def web_search_function(query: str) -> List[Document]:    results = web_search_tool.invoke({"query": query})    return [        Document(            page_content=res["content"],            metadata={"source": res["url"]}        ) for res in results    ]

测试:

print("\n--- Testing Web Search Tool ---")test_query_web = "AMD AI chip strategy 2024"test_results_web = web_search_function(test_query_web)print(f"Found {len(test_results_web)} results for query: '{test_query_web}'")if test_results_web:    print(f"Top result snippet: {test_results_web[0].page_content[:250]}...")
``````plaintext
#### OUTPUT ####Web search tool (Tavily) initialized.--- Testing Web Search Tool ---Found 3 results for query: 'AMD AI chip strategy 2024'Top result snippet: AMD has intensified its battle with Nvidia in the AI chip market with the release of the Instinct MI300X accelerator, a powerful GPU designed to challenge Nvidia's H100 in training and inference for large language models. Major cloud providers like Microsoft Azure and Oracle Cloud are adopting the MI300X, indicating strong market interest...

结果如愿,找到了 3 篇相关网页。摘要提到了 AMD “Instinct MI300X” 与 NVIDIA “H100”的对抗——正是解决第二部分问题所需的证据。现在 agent 拥有通往外部世界的窗口,planner 可以智能地决定何时使用它。下一步是让 agent 能够“反思”并决定何时结束研究。

自我评估与控制流策略

到目前为止,agent 能制定计划、选择工具,并执行复杂的检索漏斗。但还缺少一个关键能力:对自身进展进行“思考”。盲目照搬计划逐步执行的 agent 并非真正智能,需要一个自我批判机制。

Self Critique and Policy Making (Created by Fareed Khan)

每次研究步骤后,agent 都会停下来反思,比较新信息与既有知识,然后做出策略性决策:研究是否已完成,还是要继续?

这个自我批判循环让系统从脚本化工作流,跃升为自治 agent。它能判断自己是否已经收集到足够的证据,来有信心地回答用户问题。

我们将实现两个专门 agent:

  1. Reflection Agent:读取当前步骤的精炼上下文,写一条简洁的一句话摘要,加入“研究历史”。
  2. Policy Agent:作为总指挥,在反思之后,审视整个历史与最初计划,做出关键决策:CONTINUE_PLANFINISH

更新并反映累积研究历史

每完成一步(例如:检索并蒸馏出 NVIDIA 的风险),不要直接进入下一步。需要把新知识整合到 agent 的记忆中。

Reflective Cumulative (Created by Fareed Khan)

构建 Reflection Agent:任务是读入当前步骤的精炼上下文,写一条事实性的一句话摘要,并把它添加到 RAGStatepast_steps 中。

reflection_prompt = ChatPromptTemplate.from_messages([    ("system", """You are a research assistant. Based on the retrieved context for the current sub-question, write a concise, one-sentence summary of the key findings.This summary will be added to our research history. Be factual and to the point."""),    ("human", "Current sub-question: {sub_question}\n\nDistilled context:\n{context}")])reflection_agent = reflection_prompt | reasoning_llm | StrOutputParser()print("Reflection Agent created.")

它是认知循环的重要组成:通过这些简洁摘要,构建干净易读的“研究历史”,为下一个、也是最重要的 agent——策略决策者,提供输入。

B构建用于控制流的策略智能体

这是 agent 自主性的“大脑”。在 reflection_agent 更新历史后,Policy Agent 上场,作为总调度,查看:原始问题、初始计划、已完成步骤摘要的全量历史,做出高阶策略决策。

Policy Agent (Created by Fareed Khan)

定义决策输出结构:

class Decision(BaseModel):    next_action: Literal["CONTINUE_PLAN", "FINISH"]    justification: str

设计 prompt:

policy_prompt = ChatPromptTemplate.from_messages([    ("system", """You are a master strategist. Your role is to analyze the research progress and decide the next action.You have the original question, the initial plan, and a log of completed steps with their summaries.- If the collected information in the Research History is sufficient to comprehensively answer the Original Question, decide to FINISH.- Otherwise, if the plan is not yet complete, decide to CONTINUE_PLAN."""),    ("human", "Original Question: {question}\n\nInitial Plan:\n{plan}\n\nResearch History (Completed Steps):\n{history}")])policy_agent = policy_prompt | reasoning_llm.with_structured_output(Decision)print("Policy Agent created.")

测试两个状态:

print("\n--- Testing Policy Agent (Incomplete State) ---")plan_str = json.dumps([s.dict() for s in test_plan.steps])incomplete_history = "Step 1 Summary: NVIDIA's 10-K states that the semiconductor industry is intensely competitive and subject to rapid technological change."decision1 = policy_agent.invoke({"question": complex_query_adv, "plan": plan_str, "history": incomplete_history})print(f"Decision: {decision1.next_action}, Justification: {decision1.justification}")print("\n--- Testing Policy Agent (Complete State) ---")complete_history = incomplete_history + "\nStep 2 Summary: In 2024, AMD launched its MI300X accelerator to directly compete with NVIDIA in the AI chip market, gaining adoption from major cloud providers."decision2 = policy_agent.invoke({"question": complex_query_adv, "plan": plan_str, "history": complete_history})print(f"Decision: {decision2.next_action}, Justification: {decision2.justification}")
``````plaintext
#### OUTPUT ####Policy Agent created.--- Testing Policy Agent (Incomplete State) ---Decision: CONTINUE_PLAN, Justification: The research has only identified NVIDIA's competitive risks from the 10-K. It has not yet gathered the required external information about AMD's 2024 strategy, which is the next step in the plan.--- Testing Policy Agent (Complete State) ---Decision: FINISH, Justification: The research history now contains comprehensive summaries of both NVIDIA's stated competitive risks and AMD's recent AI chip strategy. All necessary information has been gathered to perform the final synthesis and answer the user's question.

未完成状态时,正确选择 CONTINUE_PLAN;完成状态时,正确选择 FINISH。有了 policy_agent,我们具备自主系统的头脑。接下来用 LangGraph 把所有组件串起来。

定义图节点

我们已经设计好这些专门的 agent。现在要把它们变成工作流的“积木”。LangGraph 中的“节点(node)”就是干这事的:每个节点是一个 Python 函数,完成一项具体工作,接收 RAGState,更新并返回字典。

Graph Nodes (Created by Fareed Khan)

先写一个工具函数,把研究历史 past_steps 格式化成易读字符串,方便传给 prompt:

def get_past_context_str(past_steps: List[PastStep]) -> str:    return "\\n\\n".join([f"Step {s['step_index']}: {s['sub_question']}\\nSummary: {s['summary']}" for s in past_steps])

第一个节点:plan_node,调用 planner_agent 填充 plan 字段。

def plan_node(state: RAGState) -> Dict:    console.print("--- 🧠: Generating Plan ---")    plan = planner_agent.invoke({"question": state["original_question"]})    rprint(plan)    return {"plan": plan, "current_step_index": 0, "past_steps": []}

接着是两个检索节点:内部文档与 web。

def retrieval_node(state: RAGState) -> Dict:    current_step_index = state["current_step_index"]    current_step = state["plan"].steps[current_step_index]    console.print(f"--- 🔍 : Retrieving from 10-K (Step {current_step_index + 1}: {current_step.sub_question}) ---")        past_context = get_past_context_str(state['past_steps'])    rewritten_query = query_rewriter_agent.invoke({        "sub_question": current_step.sub_question,        "keywords": current_step.keywords,        "past_context": past_context    })    console.print(f"  Rewritten Query: {rewritten_query}")        retrieval_decision = retrieval_supervisor_agent.invoke({"sub_question": rewritten_query})    console.print(f"  Supervisor Decision: Use `{retrieval_decision.strategy}`. Justification: {retrieval_decision.justification}")    if retrieval_decision.strategy == 'vector_search':        retrieved_docs = vector_search_only(rewritten_query, section_filter=current_step.document_section, k=config['top_k_retrieval'])    elif retrieval_decision.strategy == 'keyword_search':        retrieved_docs = bm25_search_only(rewritten_query, k=config['top_k_retrieval'])    else:        retrieved_docs = hybrid_search(rewritten_query, section_filter=current_step.document_section, k=config['top_k_retrieval'])        return {"retrieved_docs": retrieved_docs}
``````plaintext
def web_search_node(state: RAGState) -> Dict:    current_step_index = state["current_step_index"]    current_step = state["plan"].steps[current_step_index]    console.print(f"--- 🌐 : Searching Web (Step {current_step_index + 1}: {current_step.sub_question}) ---")        past_context = get_past_context_str(state['past_steps'])    rewritten_query = query_rewriter_agent.invoke({        "sub_question": current_step.sub_question,        "keywords": current_step.keywords,        "past_context": past_context    })    console.print(f"  Rewritten Query: {rewritten_query}")    retrieved_docs = web_search_function(rewritten_query)    return {"retrieved_docs": retrieved_docs}

然后是 Precision 与 Distillation 节点:

def rerank_node(state: RAGState) -> Dict:    console.print("--- 🎯 : Reranking Documents ---")    current_step_index = state["current_step_index"]    current_step = state["plan"].steps[current_step_index]    reranked_docs = rerank_documents_function(current_step.sub_question, state["retrieved_docs"])    console.print(f"  Reranked to top {len(reranked_docs)} documents.")    return {"reranked_docs": reranked_docs}
``````plaintext
def compression_node(state: RAGState) -> Dict:    console.print("--- ✂️: Distilling Context ---")    current_step_index = state["current_step_index"]    current_step = state["plan"].steps[current_step_index]    context = format_docs(state["reranked_docs"])    synthesized_context = distiller_agent.invoke({"question": current_step.sub_question, "context": context})    console.print(f"  Distilled Context Snippet: {synthesized_context[:200]}...")    return {"synthesized_context": synthesized_context}

反思并更新历史:

def reflection_node(state: RAGState) -> Dict:    console.print("--- : Reflecting on Findings ---")    current_step_index = state["current_step_index"]    current_step = state["plan"].steps[current_step_index]    summary = reflection_agent.invoke({"sub_question": current_step.sub_question, "context": state['synthesized_context']})    console.print(f"  Summary: {summary}")        new_past_step = {        "step_index": current_step_index + 1,        "sub_question": current_step.sub_question,        "retrieved_docs": state['reranked_docs'],        "summary": summary    }    return {"past_steps": state["past_steps"] + [new_past_step], "current_step_index": current_step_index + 1}

最终答案生成:

def final_answer_node(state: RAGState) -> Dict:    console.print("--- ✅: Generating Final Answer with Citations ---")    final_context = ""    for i, step inenumerate(state['past_steps']):        final_context += f"\\n--- Findings from Research Step {i+1} ---\\n"        for doc in step['retrieved_docs']:            source = doc.metadata.get('section') or doc.metadata.get('source')            final_context += f"Source: {source}\\nContent: {doc.page_content}\\n\\n"        final_answer_prompt = ChatPromptTemplate.from_messages([        ("system", """You are an expert financial analyst. Synthesize the research findings from internal documents and web searches into a comprehensive, multi-paragraph answer for the user's original question.Your answer must be grounded in the provided context. At the end of any sentence that relies on specific information, you MUST add a citation. For 10-K documents, use [Source: ]. For web results, use [Source: ]."""),        ("human", "Original Question: {question}\n\nResearch History and Context:\n{context}")    ])        final_answer_agent = final_answer_prompt | reasoning_llm | StrOutputParser()    final_answer = final_answer_agent.invoke({"question": state['original_question'], "context": final_context})    return {"final_answer": final_answer}

节点齐备后,接下来定义“边”(edges),确定它们之间的连接关系与控制流。

定义条件边

我们需要两类关键的条件边:

  1. 工具路由器(route_by_tool):在 plan 之后,查看当前步骤应使用的工具,路由到 retrieve_10kretrieve_web
  2. 主控制循环(should_continue_node):每次反思后,调用 policy_agent 决定是继续下一步还是结束并生成答案。

工具路由器:

def route_by_tool(state: RAGState) -> str:    current_step_index = state["current_step_index"]    current_step = state["plan"].steps[current_step_index]    return current_step.tool

主控制循环:

def should_continue_node(state: RAGState) -> str:    console.print("--- 🚦 : Evaluating Policy ---")    current_step_index = state["current_step_index"]        if current_step_index >= len(state["plan"].steps):        console.print("  -> Plan complete. Finishing.")        return"finish"        if current_step_index >= config["max_reasoning_iterations"]:        console.print("  -> Max iterations reached. Finishing.")        return"finish"    if state.get("reranked_docs") isnotNoneandnot state["reranked_docs"]:        console.print("  -> Retrieval failed for the last step. Continuing with next step in plan.")        return"continue"    history = get_past_context_str(state['past_steps'])    plan_str = json.dumps([s.dict() for s in state['plan'].steps])    decision = policy_agent.invoke({"question": state["original_question"], "plan": plan_str, "history": history})    console.print(f"  -> Decision: {decision.next_action} | Justification: {decision.justification}")        if decision.next_action == "FINISH":        return"finish"    else:        return"continue"

有了节点(专家)与条件边(对话规则),我们就可以构建完整的 StateGraph

连接深度思考 RAG 机器

现在用 LangGraph 的 StateGraph 来定义完整的认知架构,也就是 agent 的思维流程蓝图。

from langgraph.graph import StateGraph, ENDgraph = StateGraph(RAGState)

添加节点:

graph.add_node("plan", plan_node)graph.add_node("retrieve_10k", retrieval_node)graph.add_node("retrieve_web", web_search_node)graph.add_node("rerank", rerank_node)graph.add_node("compress", compression_node)graph.add_node("reflect", reflection_node)graph.add_node("generate_final_answer", final_answer_node)

连接边与条件边:

graph.set_entry_point("plan")graph.add_conditional_edges(    "plan",    route_by_tool,    {        "search_10k": "retrieve_10k",        "search_web": "retrieve_web",    },)graph.add_edge("retrieve_10k", "rerank")graph.add_edge("retrieve_web", "rerank")graph.add_edge("rerank", "compress")graph.add_edge("compress", "reflect")graph.add_conditional_edges(    "reflect",    should_continue_node,    {        "continue": "plan",        "finish": "generate_final_answer",    },)graph.add_edge("generate_final_answer", END)print("StateGraph constructed successfully.")

流程回顾:

  1. plan 开始;
  2. route_by_tool 决定走 retrieve_10k 还是 retrieve_web
  3. 然后始终按 rerank -> compress -> reflect
  4. reflect 后,通过 should_continue_node 决定:
  • CONTINUE_PLAN,回到 plan,路由下一步;
  • FINISH,进入 generate_final_answer
  1. 生成最终答案后结束。

至此,我们完成了深度思考 Agent 的复杂、循环架构。下一步是编译与可视化。

编译与可视化迭代工作流

编译(.compile())会把抽象的节点与边定义,转化为可执行应用。我们还可以用内置工具生成图示,有助于理解与调试。

deep_thinking_rag_graph = graph.compile()print("Graph compiled successfully.")try:    from IPython.display import Image, display    png_image = deep_thinking_rag_graph.get_graph().draw_png()    display(Image(png_image))except Exception as e:    print(f"Graph visualization failed: {e}. Please ensure pygraphviz is installed.")
```![](http://cdn.zhipoai.cn/d380419c.jpg)

Deep Thinking Simpler Pipeline Flow (Created by Fareed Khan)

你会看到:

* `route_by_tool` 选择内部或外部检索的分支;
* 每个研究步骤的线性处理(`rerank -> compress -> reflect`);
* 关键的反馈循环:`should_continue` 把流程送回 `plan`,开始下一轮;
* 研究完成后进入 `generate_final_answer` 的“出口”。

这就是一个“会思考”的系统。接下来实际运行。

运行深度思考流水线
---------

我们要用相同的多跳多源查询来测试这个系统,看看它能否成功。

这里我们调用 `.stream()` 观察每个节点更新后的 state,实时追踪 agent 的“思考过程”。

```plaintext
final_state = Nonegraph_input = {"original_question": complex_query_adv}print("--- Invoking Deep Thinking RAG Graph ---")for chunk in deep_thinking_rag_graph.stream(graph_input, stream_mode="values"):    final_state = chunkprint("\n--- Graph Stream Finished ---")
``````plaintext
#### OUTPUT ####--- Invoking Deep Thinking RAG Graph ------ 🧠: Generating Plan ---plan:  steps:  - sub_question: What are the key risks related to competition as stated in NVIDIA's 2023 10-K filing?    tool: search_10k    ...  - sub_question: What are the recent news and developments in AMD's AI chip strategy in 2024?    tool: search_web    ...--- 🔍 : Retrieving from 10-K (Step 1: ...) ---  Rewritten Query: key competitive risks for NVIDIA in the semiconductor industry...  Supervisor Decision: Use `hybrid_search`. ...--- 🎯  : Reranking Documents ---  Reranked to top 3 documents.--- ✂️: Distilling Context ---  Distilled Context Snippet: NVIDIA operates in the intensely competitive semiconductor industry...--- 🤔: Reflecting on Findings ---  Summary: According to its 2023 10-K, NVIDIA operates in an intensely competitive semiconductor industry...--- 🚦  : Evaluating Policy ---  -> Decision: CONTINUE_PLAN | Justification: The first step...has been completed. The next step...is still pending...--- 🌐 : Searching Web (Step 2: ...) ---  Rewritten Query: AMD AI chip strategy news and developments 2024...--- 🎯  : Reranking Documents ---  Reranked to top 3 documents.--- ✂️: Distilling Context ---  Distilled Context Snippet: AMD has ramped up its challenge to Nvidia in the AI accelerator market with its Instinct MI300 series...--- 🤔: Reflecting on Findings ---  Summary: In 2024, AMD is aggressively competing with NVIDIA in the AI chip market through its Instinct MI300X accelerator...--- 🚦  : Evaluating Policy ---  -> Decision: FINISH | Justification: The research history now contains comprehensive summaries of both NVIDIA's stated risks and AMD's recent strategy...--- ✅: Generating Final Answer with Citations ------ Graph Stream Finished ---

可以看到系统完整执行了我们设计的流程:规划 -> 步骤 1 -> 自我评估继续 -> 步骤 2 -> 自我评估结束 -> 最终综合。

分析最终高质量答案

打印最终答案:

console.print("--- DEEP THINKING RAG FINAL ANSWER ---")console.print(Markdown(final_state['final_answer']))
``````plaintext
#### OUTPUT ####--- DEEP THINKING RAG FINAL ANSWER ---Based on an analysis of NVIDIA's 2023 10-K filing and recent news from 2024 regarding AMD's AI chip strategy, the following synthesis can be made:**NVIDIA's Stated Competitive Risks:**In its 2023 10-K filing, NVIDIA identifies its operating environment as the "intensely competitive" semiconductor industry, which is characterized by rapid technological change. A primary risk is that competitors, including AMD, could introduce new products with better performance or lower costs that gain significant market acceptance, which could materially and adversely affect its business [Source: Item 1A. Risk Factors.].**AMD's 2024 AI Chip Strategy:**In 2024, AMD has moved aggressively to challenge NVIDIA's dominance in the AI hardware market with its Instinct MI300 series of accelerators, particularly the MI300X. This product is designed to compete directly with NVIDIA's H100 GPU. AMD's strategy has gained significant traction, with major cloud providers such as Microsoft Azure and Oracle announcing plans to use the new chips [Source: https://www.reuters.com/technology/amd-forecasts-35-billion-ai-chip-revenue-2024-2024-01-30/].**Synthesis and Impact:**AMD's 2024 AI chip strategy directly exacerbates the competitive risks outlined in NVIDIA's 10-K. The successful launch and adoption of the MI300X is a materialization of the specific risk that a competitor could introduce a product with comparable performance. The adoption of AMD's chips by major cloud providers signifies a direct challenge to NVIDIA's market share in the lucrative data center segment, validating NVIDIA's stated concerns about rapid technological change [Source: Item 1A. Risk Factors. and https://www.cnbc.com/2023/12/06/amd-launches-new-mi300x-ai-chip-to-compete-with-nvidias-h100.html].

这是一次“完全成功”的综合性回答:

  • 正确总结了 10-K 的风险;
  • 正确总结了 2024 年 AMD 动向;
  • 关键在“综合与影响”部分:完成了多跳推理,解释“后者如何加剧前者”;
  • 并提供了来源溯源(内部 section 与外部 URL)。

并排对比

让我们把两种结果放在一起对比。

这个对比清晰地说明:采用循环、工具感知、自我批判的 agent 架构,在复杂真实查询上实现了显著且可量化的性能提升。

评估框架与结果分析

虽然我们在一个难题上取得了成功,但在生产环境中需要客观、量化、自动化的验证。

Evaluation Framework (Created by Fareed Khan)

我们使用 RAGAs(RAG Assessment)库,聚焦四个关键指标:

  • Context Precision & Recall:衡量检索质量。Precision 问:检回的文档有多少真相关?Recall 问:所有相关文档中,我们找到了多少?
  • Answer Faithfulness:答案是否扎根于提供的上下文,是防止 LLM 幻觉的主要检查。
  • Answer Correctness:最终质量度量,与人工撰写的“ground truth”答案对比,评估事实准确性与完整性。

准备评估数据集(包含问题、两套系统的答案与上下文、以及 ground truth)并评测:

from datasets import Datasetfrom ragas import evaluatefrom ragas.metrics import (    context_precision,    context_recall,    faithfulness,    answer_correctness,)import pandas as pdprint("Preparing evaluation dataset...")ground_truth_answer_adv = "NVIDIA's 2023 10-K lists intense competition and rapid technological change as key risks. This risk is exacerbated by AMD's 2024 strategy, specifically the launch of the MI300X AI accelerator, which directly competes with NVIDIA's H100 and has been adopted by major cloud providers, threatening NVIDIA's market share in the data center segment."retrieved_docs_for_baseline_adv = baseline_retriever.invoke(complex_query_adv)baseline_contexts = [[doc.page_content for doc in retrieved_docs_for_baseline_adv]]advanced_contexts_flat = []for step in final_state['past_steps']:    advanced_contexts_flat.extend([doc.page_content for doc in step['retrieved_docs']])advanced_contexts = [list(set(advanced_contexts_flat))]eval_data = {    'question': [complex_query_adv, complex_query_adv],    'answer': [baseline_result, final_state['final_answer']],    'contexts': baseline_contexts + advanced_contexts,    'ground_truth': [ground_truth_answer_adv, ground_truth_answer_adv]}eval_dataset = Dataset.from_dict(eval_data)metrics = [    context_precision,    context_recall,    faithfulness,    answer_correctness,]print("Running RAGAs evaluation...")result = evaluate(eval_dataset, metrics=metrics, is_async=False)print("Evaluation complete.")results_df = result.to_pandas()results_df.index = ['baseline_rag', 'deep_thinking_rag']print("\n--- RAGAs Evaluation Results ---")print(results_df[['context_precision', 'context_recall', 'faithfulness', 'answer_correctness']].T)

输出示例:

#### OUTPUT ####Preparing evaluation dataset...Running RAGAs evaluation...Evaluation complete.--- RAGAs Evaluation Results ---                     baseline_rag  deep_thinking_ragcontext_precision        0.500000           0.890000context_recall           0.333333           1.000000faithfulness             1.000000           1.000000answer_correctness       0.395112           0.991458

量化结果为 Deep Thinking 架构给出明确客观的优势:

  • Context Precision(0.50 vs 0.89):baseline 只有一半相关,因为只能检回关于“竞争”的泛化信息;advanced agent 通过多步骤、多工具检索,显著提升精度。
  • Context Recall(0.33 vs 1.00):baseline 完全错过了 web 信息,召回低;advanced 通过规划与工具使用,找齐全部必要信息,达到满分。
  • Faithfulness(1.00 vs 1.00):两者都很忠实。baseline 正确指出自己没有信息;advanced 正确使用了找到的信息。忠实但不正确的答案也没意义。
  • Answer Correctness(0.40 vs 0.99):最终质量指标。baseline 因缺失第二部分分析而低于 40%;advanced 接近完美。

总结整个流水线

本文中,我们从一个简单、脆弱的 RAG 流水线,构建到一个复杂的自治推理 agent:

  • 先搭建 vanilla RAG,并演示它在复杂多源查询上的必然失败;
  • 系统化地打造 Deep Thinking Agent,赋予其规划、多工具使用、与自适应检索策略的能力;
  • 构建多阶段检索漏斗:先广召回(hybrid search),再高精度(cross-encoder reranker),最后综合(distiller agent);
  • 使用 LangGraph 编排整个认知架构,创建循环、有状态的工作流,实现真正的多步推理;
  • 加入自我批判循环,让 agent 能识别失败、修订计划、并在无法得到答案时优雅退出;
  • 最后用 RAGAs 做生产级评估,客观量化证明 advanced agent 的优越性能。

使用马尔可夫决策过程(MDP)学习策略

目前,我们的 Policy Agent(决定 CONTINUEFINISH)依赖于像 GPT-4o 这样的通用 LLM,每次都要调用。尽管有效,但在生产环境可能较慢且昂贵。学术前沿提出了更优的路径。

  • 将 RAG 建模为 Decision Process:把 agent 的推理循环建模为 Markov Decision Process(MDP)。在这个模型中,每个 RAGState 是“状态”,每个 action(CONTINUEREVISEFINISH)会把系统带入新状态,并获得一定“奖励”(比如找到正确答案)。
  • 从经验中学习:我们在 LangSmith 中记录的成千上万次成功/失败的推理轨迹,都是宝贵的训练数据。每条轨迹都是 agent 在这个 MDP 中的一个例子。
  • 训练 Policy Model:利用这些数据,可以用 Reinforcement Learning 训练一个更小、更专门的 policy 模型。
  • 目标:速度与效率。把像 GPT-4o 这样复杂模型的决策能力,蒸馏到一个更小(例如 7B)的模型里,使 CONTINUE/FINISH 的决策更快、更省,同时高度针对我们的领域。这是诸多研究(如 DeepRAG)的核心思想,也是自治 RAG 系统优化的下一阶段。

如何学习大模型 AI ?

我国在AI大模型领域面临人才短缺,数量与质量均落后于发达国家。2023年,人才缺口已超百万,凸显培养不足。随着Al技术飞速发展,预计到2025年,这一缺口将急剧扩大至400万,严重制约我国Al产业的创新步伐。加强人才培养,优化教育体系,国际合作并进,是破解困局、推动AI发展的关键。

但是具体到个人,只能说是:

“最先掌握AI的人,将会比较晚掌握AI的人有竞争优势”。

这句话,放在计算机、互联网、移动互联网的开局时期,都是一样的道理。

我在一线互联网企业工作十余年里,指导过不少同行后辈。帮助很多人得到了学习和成长。

我意识到有很多经验和知识值得分享给大家,也可以通过我们的能力和经验解答大家在人工智能学习中的很多困惑,所以在工作繁忙的情况下还是坚持各种整理和分享。但苦于知识传播途径有限,很多互联网行业朋友无法获得正确的资料得到学习提升,故此将重要的AI大模型资料包括AI大模型入门学习思维导图、精品AI大模型学习书籍手册、视频教程、实战学习等录播视频免费分享出来。

在这里插入图片描述

2025最新大模型学习路线

明确的学习路线至关重要。它能指引新人起点、规划学习顺序、明确核心知识点。大模型领域涉及的知识点非常广泛,没有明确的学习路线可能会导致新人感到迷茫,不知道应该专注于哪些内容。

对于从来没有接触过AI大模型的同学,我帮大家准备了从零基础到精通学习成长路线图以及学习规划。可以说是最科学最系统的学习路线。

在这里插入图片描述

针对以上大模型的学习路线我们也整理了对应的学习视频教程,和配套的学习资料。

大模型经典PDF书籍

新手必备的大模型学习PDF书单来了!全是硬核知识,帮你少走弯路!

在这里插入图片描述

配套大模型项目实战

所有视频教程所涉及的实战项目和项目源码等
在这里插入图片描述

博主介绍+AI项目案例集锦

MoPaaS专注于Al技术能力建设与应用场景开发,与智学优课联合孵化,培养适合未来发展需求的技术性人才和应用型领袖。

在这里插入图片描述

在这里插入图片描述

这份完整版的大模型 AI 学习资料已经上传CSDN,朋友们如果需要可以微信扫描下方CSDN官方认证二维码免费领取【保证100%免费

在这里插入图片描述

为什么要学习大模型?

2025人工智能大模型的技术岗位与能力培养随着人工智能技术的迅速发展和应用 , 大模型作为其中的重要组成部分 , 正逐渐成为推动人工智能发展的重要引擎 。大模型以其强大的数据处理和模式识别能力, 广泛应用于自然语言处理 、计算机视觉 、 智能推荐等领域 ,为各行各业带来了革命性的改变和机遇 。

在这里插入图片描述

适合人群

  • 在校学生:包括专科、本科、硕士和博士研究生。学生应具备扎实的编程基础和一定的数学基础,有志于深入AGI大模型行业,希望开展相关的研究和开发工作。
  • IT行业从业人员:包括在职或失业者,涵盖开发、测试、运维、产品经理等职务。拥有一定的IT从业经验,至少1年以上的编程工作经验,对大模型技术感兴趣或有业务需求,希望通过课程提升自身在IT领域的竞争力。
  • IT管理及技术研究领域人员:包括技术经理、技术负责人、CTO、架构师、研究员等角色。这些人员需要跟随技术发展趋势,主导技术创新,推动大模型技术在企业业务中的应用与改造。
  • 传统AI从业人员:包括算法工程师、机器视觉工程师、深度学习工程师等。这些AI技术人才原先从事机器视觉、自然语言处理、推荐系统等领域工作,现需要快速补充大模型技术能力,获得大模型训练微调的实操技能,以适应新的技术发展趋势。
    在这里插入图片描述

课程精彩瞬间

大模型核心原理与Prompt:掌握大语言模型的核心知识,了解行业应用与趋势;熟练Python编程,提升提示工程技能,为Al应用开发打下坚实基础。

在这里插入图片描述

RAG应用开发工程:掌握RAG应用开发全流程,理解前沿技术,提升商业化分析与优化能力,通过实战项目加深理解与应用。 在这里插入图片描述

Agent应用架构进阶实践:掌握大模型Agent技术的核心原理与实践应用,能够独立完成Agent系统的设计与开发,提升多智能体协同与复杂任务处理的能力,为AI产品的创新与优化提供有力支持。
在这里插入图片描述

模型微调与私有化大模型:掌握大模型微调与私有化部署技能,提升模型优化与部署能力,为大模型项目落地打下坚实基础。 在这里插入图片描述

顶尖师资,深耕AI大模型前沿技术

实战专家亲授,让你少走弯路
在这里插入图片描述

一对一学习规划,职业生涯指导

  • 真实商业项目实训
  • 大厂绿色直通车

人才库优秀学员参与真实商业项目实训

以商业交付标准作为学习标准,具备真实大模型项目实践操作经验可写入简历,支持项目背调

在这里插入图片描述
大厂绿色直通车,冲击行业高薪岗位
在这里插入图片描述

文中涉及到的完整版的大模型 AI 学习资料已经上传CSDN,朋友们如果需要可以微信扫描下方CSDN官方认证二维码免费领取【保证100%免费

在这里插入图片描述

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐