【AI大模型第11集】深入了解LangChain框架,解读大模型应用底层设计
LangChain是由Lang.AI开发的开源框架,专注于基于大语言模型(LLM)的应用开发。它采用三层架构设计:架构层提供核心功能,组件层连接外部系统,部署层支持企业级应用。框架包含六大核心组件:模型交互接口、检索系统、记忆模块、链式工作流、智能体和回调机制。LangChain通过模块化设计简化了AI应用开发流程,支持快速构建智能体、问答系统等复杂功能。开发者可通过Python环境安装相关包,利
文章目录
- 一、什么是LangChain?
- 二、LangChain 核心组件概览
- 三、LangChain 环境安装
- 四、LangChain 组件之 Model I/O
- 五、LangChain 组件之 Retrieval
- 六、LangChain 组件之 Chains
- 七、LangChain 组件之 Callbacks
- 八、LangChain 组件之 Agents
- 九、LangChain 组件之 Memory
- 十、与 LlamaIndex 框架对比
- 十一、浅谈 LangChain
LangChain是由Lang.AI开发的开源框架,专注于基于大语言模型(LLM)的应用开发,提供了 Python 和 Javascript 两种语言的版本支持。由 Harrison Chase 于 2022 年10月创建,核心目标是通过模块化组件、简化与LLM的交互、数据检索及功能模块整合,扩展模型能力边界。
一、什么是LangChain?
LangChain 是一个由多个包组成的开源框架,它提供了构建基于 LLM 的 AI 应用所需的各种模块(Modules)与工具,极大的简化了AI应用的构建过程,让开发者能快速组合各种模块来实现复杂功能,如搭建智能体、问答系统、对话机器人、知识库等。
LangChain 的整个生态体系采用三层架构设计:架构层(Architecture)、组件层(Components)和部署层(Deployment),并配套有强大的开发与调试平台 LangSmith。
图中OSS表示开源,Commercial表示商业产品。
1.1. LangChain 的生态体系
- 架构层 (Architecture):这是整个生态的基础技术底座,由两个核心开源项目构成:
- LangChain:基于
langchain-core(核心抽象层)构建,提供语言模型调用、链式逻辑、记忆与工具集成等基础能力。 - LangGraph:是LangChain 的一个扩展功能,引入了有状态的图式工作流,通过将步骤建模为图中的边和节点,使用 LLM 构建健壮的有状态多代理应用程序。
- LangChain:基于
- 组件层 (Components):组件层是 LangChain 连接外部生态的桥梁,通过 Integrations 模块实现对外部系统的连接,为上层工作流提供数据和工具的统一访问接口,使 LangChain 不再局限于“纯文本生成”,而是能无缝融入现有 IT 架构。
- 部署层 (Deployment):部署层 LangChain Platform 是基于 LangGraph 的企业级SaaS化服务平台,支持企业级部署、提供可视化界面配置复杂 AI 工作流(类似低代码平台)、内置安全策略、权限管理、审计日志等企业级功能。
- LangSmith:虽然不在主架构图中,但 LangSmith 是贯穿整个生态的独立开发平台,与上述三层协同工作。可用于构建、调试、测试、评估、监控和链路追踪大模型应用程序,提供了6大功能,涉及Debugging (调试)、Playground (沙盒)、Prompt Management (提示管理)、Annotation (注释)、Testing (测试)、Monitoring (监控)等。
1.2. LangChain 核心包
- langchain-core:此包包含不同组件的基础抽象以及将它们组合在一起的方式。核心组件(如聊天模型、向量存储、工具等)的接口在此处定义。此处未定义任何第三方集成。依赖项非常轻量。
- langchain:主 langchain 包包含构成应用程序认知架构的链和检索策略。这些不是第三方集成。此处的所有链、代理和检索策略并非特定于任何一个集成,而是适用于所有集成的通用策略。
- langchain-community:此包包含由 LangChain 社区维护的第三方集成。关键集成包已分离出来(参见上文)。这包含各种组件(聊天模型、向量存储、工具等)的集成。此包中的所有依赖项都是可选的,以尽可能保持包的轻量级。
- 集成包:流行的集成有它们自己的包(例如 langchain-openai、langchain-anthropic 等),以便它们可以正确地进行版本控制并保持适当的轻量级。由 LangChain 团队和集成开发者共同维护。
- **LangGraph **:LangGraph 是 langchain 的一个扩展,提供了用于创建常见类型代理的高级接口,以及用于组合自定义流程的低级 API,旨在通过将步骤建模为图中的边和节点,使用 LLM 构建健壮的有状态多代理应用程序。
- LangServe:一个用于将 LangChain 链部署为 REST API 的包。这使得部署生产就绪的 API 变得容易。LangServe 主要设计用于部署简单的 Runnable 并与 langchain-core 中众所周知的原语协同工作。
- LangSmith:一个开发者平台,可让您调试、测试、评估和监控 LLM 应用程序。
二、LangChain 核心组件概览
LangChain 提供了一个标准模块化、可扩展的的开发框架,开发者能通过灵活集成其六大核心组件,来构建功能复杂的 LLM 应用。
- Model I/O(模型交互标准化接口):提供统一的模型交互接口,封装提示模板调用、模型推理与输出解析,实现不同大语言模型输入的标准化与输出的结构化处理。
- Retrieval(检索,RAG核心):从大量的文档或数据源中查找相关信息的核心模块,它是构建RAG(检索增强生成)的基础。
- Memory(记忆):用于保存历史对话和上下文信息,以便在后续对话中使用,从而实现有状态的对话。
- Chains(链):用于链式工作流编排,将多个模块串联起来组成一个完整的流程,例如,一个 Chain 可能包括一个 Prompt 模板、一个大模型和一个输出解析器,它们协同工作,处理用户输入并返回结果。
- Agents(智能体):自主决策并调用工具,赋予大模型行动能力。
- Callbacks(回调):回调机制,允许连接到 LLM 应用程序的各个阶段,可以监控和分析LangChain的运行情况,比如日志记录、监控、流传输等,以优化性能。
三、LangChain 环境安装
因本文是基于 Python 的开发,故需要安装 Python 运行环境,及 LangChain 相关包。
以下为本文涉及使用的 LangChain 功能相关包安装:
pip install langchain langgraph langchain_openai langchain_community langchain-text-splitters pymupdf feedparser newspaper3k listparser
四、LangChain 组件之 Model I/O
Model I/O 是应用程序与大模型进行交互的组件,它与大模型的关系类似于JDBC与数据库的关系,本质上都是为了解耦应用逻辑与底层实现,提供统一标准化的交互接口,使应用程序无需关注大模型底层的实现,可与各种大模型进行交互。
Model I/O 组件封装主要包括三大块:输入提示(Format)、调用模型(Predict)、输出解析(Parse)。
1. 模型的调用
1.1. 语言模型类型
LangChain为两种类型的模型提供接口和集成,即 LLMs 和 ChatModels 。
1.1.1. LLMs 模型
LLMs 模型是将文本字符串作为输入并返回文本字符串的模型。它是 LangChain 的核心组件,LangChain 不提供自己的 LLM,而是提供了一个标准接口,用于与许多不同的 LLMs 进行交互。
from langchain_openai import ChatOpenAI
import os
## 这里我读的是配好的系统环境变量,你们可以替换成自己的模型相关信息
BASE_URL=os.getenv("DASHSCOPE_BASE_URL")
API_KEY=os.getenv("DASHSCOPE_API_KEY")
LLM_MODEL=os.getenv("DASHSCOPE_LLM_MODEL")
llm = ChatOpenAI(model=LLM_MODEL, base_url=BASE_URL, api_key=API_KEY)
response = llm.invoke("你是谁?")
print(response.content)
输出结果:
我是通义千问(Qwen),由阿里云研发的超大规模语言模型。我能够回答问题、创作文字,比如写故事、写公文、写邮件、写剧本、逻辑推理、编程等等,还能表达观点,玩游戏等。如果你有任何问题或需要帮助,尽管告诉我,我会尽力提供支持!
1.1.2. ChatModels 对话模型
对话模型(ChatModels )是语言模型的变体,ChatModels的输入和输出都是结构化的消息对象,能更好地理解和维护对话的上下文。Message(消息)是对话模型中通信的基本单位,用于表示与模型通讯的输入与输出,以及包含与对话相关的上下文信息和元数据,每一条消息都包含一个角色(系统、用户、AI等)和内容(用户的输入或模型的输出)以及其他元信息(id、名称、令牌使用情况等)。
LangChain提供了一个统一的Message(消息)格式,可以在不同模型之间使用,目前主要有以下四种消息类型:
- System Message(系统消息):为AI设定行为规范,用于定义大模型的角色、运行规则、环境信息等。如果模型不支持System Message则 LangChain 会将消息合并到 Human Message 中一起发送给大模型。这个消息相当于OpenAI接口中的
system role。 - Human Message(用户消息):代表用户的输入内容,用户向模型发出的提问或指令。这个消息相当于OpenAI接口中的
user role。 - AI Message(大模型消息):大模型的输出,这是大模型对 Human Message 和 System Message 的响应,不仅包含生成的文本内容(content属性),还可能包含以下结构化信息(外部工具、调用令牌使用情况等元数据)。这个消息相当于OpenAI接口中的
assistant role。 - Tool Message(工具调用消息):AI调用外部工具后返回的查询结果,提供连接外部能力,向模型传递外部工具或函数调用的执行结果,常用于Agent调用tool。
from langchain_openai import ChatOpenAI
import os
from langchain.messages import SystemMessage, HumanMessage, AIMessage
## 这里我读的是配好的系统环境变量,你们可以替换成自己的模型相关信息
BASE_URL=os.getenv("DASHSCOPE_BASE_URL")
API_KEY=os.getenv("DASHSCOPE_API_KEY")
LLM_MODEL=os.getenv("DASHSCOPE_LLM_MODEL")
llm = ChatOpenAI(model=LLM_MODEL, base_url=BASE_URL, api_key=API_KEY)
messages = [
SystemMessage(content="你是一个会哄人的聊天机器人。"),
HumanMessage(content="我是东方求败"),
AIMessage(content="你好,我在等你说话"),
HumanMessage(content="我是谁,你只要回答名字就可以。")
]
response = llm.invoke(messages)
print(response.content)
输出结果:
东方求败
1.2. 模型的调用方式
LangChain中的 invoke()、stream() 和 batch() 是模型调用的三大核心同步方法,围绕单次处理、流式交互、批量效率三大场景设计。同时也提供了类似的异步调用的方法 ainvoke()、astream()、abatch() ,在服务端高并发场景(如Web API)中,通过 asyncio 实现非阻塞调用,提升资源利用率。‘’
1.2.1. invoke()
阻塞式调用方法,适用于需完整输出的单次任务。例如问答、摘要生成、代码补全等场景,要求模型一次性返回全部结果。
from langchain_openai import ChatOpenAI
import os
## 这里我读的是配好的系统环境变量,你们可以替换成自己的模型相关信息
BASE_URL=os.getenv("DASHSCOPE_BASE_URL")
API_KEY=os.getenv("DASHSCOPE_API_KEY")
LLM_MODEL=os.getenv("DASHSCOPE_LLM_MODEL")
llm = ChatOpenAI(model=LLM_MODEL, base_url=BASE_URL, api_key=API_KEY)
# 输入参数:str 或 List[BaseMessage]
response = llm.invoke("请简单介绍秦始皇的一生。")
print(response.content)
输出结果:
秦始皇(公元前259年-公元前210年),姓嬴名政。。。
1.2.2. stream()
流式响应,逐token实时返回内容,提升交互体验。适用于聊天机器人、实时内容生成、长文本生成等需要即时反馈的场景。
from langchain_openai import ChatOpenAI
import os
## 这里我读的是配好的系统环境变量,你们可以替换成自己的模型相关信息
BASE_URL=os.getenv("DASHSCOPE_BASE_URL")
API_KEY=os.getenv("DASHSCOPE_API_KEY")
LLM_MODEL=os.getenv("DASHSCOPE_LLM_MODEL")
llm = ChatOpenAI(model=LLM_MODEL, base_url=BASE_URL, api_key=API_KEY)
# 流式调用
def stream_with_progress(prompt):
print("开始流式生成...\n", end="", flush=True)
full_response = ""
for chunk in llm.stream(prompt):
print(chunk.content, end="", flush=True)
full_response += chunk.content
print("\n流式生成完成...")
return full_response
# 输入参数:str 或 List[BaseMessage]
response = stream_with_progress("请简单介绍秦始皇的一生。")
输出结果:
秦始皇(公元前259年-公元前210年),姓嬴名政。。。
1.2.3. batch()
批量高效处理多个输入,通过并行或优化调度提升吞吐量。适用于文档批量摘要、情感分析、大规模数据标注等场景。
from langchain_openai import ChatOpenAI
import os
from langchain.messages import HumanMessage, AIMessage
## 这里我读的是配好的系统环境变量,你们可以替换成自己的模型相关信息
BASE_URL=os.getenv("DASHSCOPE_BASE_URL")
API_KEY=os.getenv("DASHSCOPE_API_KEY")
LLM_MODEL=os.getenv("DASHSCOPE_LLM_MODEL")
llm = ChatOpenAI(model=LLM_MODEL, base_url=BASE_URL, api_key=API_KEY)
# 创建三个消息
message1 = [
HumanMessage(content="中国的首都是哪里?"),
AIMessage(content="请用一句话精简的回答问题,要求答案精简准确。")
]
message2 = [
HumanMessage(content="广东省的省会城市是哪里?"),
AIMessage(content="请用一句话精简的回答问题,要求答案精简准确。")
]
message3 = [
HumanMessage(content="深圳市一共有几个行政区,分别是哪些?"),
AIMessage(content="请用一句话精简的回答问题,要求答案精简准确。")
]
messages = [message1, message2, message3]
def batch_example(prompts):
print(f"开始执行批量调用:{prompts}")
response = llm.batch(prompts)
print(f"结束执行批量调用:{prompts}")
return response
# 输入参数:List[str] 或 List[List[BaseMessage]]
responses = batch_example(messages)
for i, response in enumerate(responses):
print(f"问题{i+1}: {response.content}")
输出结果:
开始执行批量调用:[[HumanMessage(content='中国的首都是哪里?', additional_kwargs={}, response_metadata={}), AIMessage(content='请用一句话精简的回答问题,要求答案精简准确。', additional_kwargs={}, response_metadata={}, tool_calls=[], invalid_tool_calls=[])], [HumanMessage(content='广东省的省会城市是哪里?', additional_kwargs={}, response_metadata={}), AIMessage(content='请用一句话精简的回答问题,要求答案精简准确。', additional_kwargs={}, response_metadata={}, tool_calls=[], invalid_tool_calls=[])], [HumanMessage(content='深圳市一共有几个行政区,分别是哪些?', additional_kwargs={}, response_metadata={}), AIMessage(content='请用一句话精简的回答问题,要求答案精简准确。', additional_kwargs={}, response_metadata={}, tool_calls=[], invalid_tool_calls=[])]]
结束执行批量调用:[[HumanMessage(content='中国的首都是哪里?', additional_kwargs={}, response_metadata={}), AIMessage(content='请用一句话精简的回答问题,要求答案精简准确。', additional_kwargs={}, response_metadata={}, tool_calls=[], invalid_tool_calls=[])], [HumanMessage(content='广东省的省会城市是哪里?', additional_kwargs={}, response_metadata={}), AIMessage(content='请用一句话精简的回答问题,要求答案精简准确。', additional_kwargs={}, response_metadata={}, tool_calls=[], invalid_tool_calls=[])], [HumanMessage(content='深圳市一共有几个行政区,分别是哪些?', additional_kwargs={}, response_metadata={}), AIMessage(content='请用一句话精简的回答问题,要求答案精简准确。', additional_kwargs={}, response_metadata={}, tool_calls=[], invalid_tool_calls=[])]]
问题1: 中国的首都是北京。
问题2: 广东省的省会是广州。
问题3: 深圳市下辖9个行政区:福田、罗湖、南山、盐田、宝安、龙岗、龙华、坪山、光明。
1.2.4. ainvoke()、astream()、abatch()
LangChain还提供了与同步方法对应的异步调用方法:ainvoke()、astream()、abatch(),在服务端高并发场景(如Web API)中,通过asyncio实现非阻塞调用,提升资源利用率。
import asyncio
from langchain_openai import ChatOpenAI
import os
## 这里我读的是配好的系统环境变量,你们可以替换成自己的模型相关信息
BASE_URL=os.getenv("DASHSCOPE_BASE_URL")
API_KEY=os.getenv("DASHSCOPE_API_KEY")
LLM_MODEL=os.getenv("DASHSCOPE_LLM_MODEL")
llm = ChatOpenAI(model=LLM_MODEL, base_url=BASE_URL, api_key=API_KEY)
# 异步调用函数
async def ainvoke_example(prompts):
print(f"启动异步单次调用:{prompts}")
response = await llm.ainvoke(prompts)
print(f"结束异步单次调用:{prompts}")
return response
tasks=[
ainvoke_example(prompts="中国首都是哪里?请用一句话精简的回答问题,要求答案精简准确。"),
ainvoke_example(prompts="广东省的省会城市是哪里?请用一句话精简的回答问题,要求答案精简准确。"),
ainvoke_example(prompts="深圳市一共有几个行政区,分别是哪些?请用一句话精简的回答问题,要求答案精简准确。")
]
# 直接调用异步函数
responses = await asyncio.gather(*tasks)
for i, response in enumerate(responses):
print(f"问题{i+1}: {response.content}")
输出结果:
启动异步单次调用:中国首都是哪里?请用一句话精简的回答问题,要求答案精简准确。
启动异步单次调用:广东省的省会城市是哪里?请用一句话精简的回答问题,要求答案精简准确。
启动异步单次调用:深圳市一共有几个行政区,分别是哪些?请用一句话精简的回答问题,要求答案精简准确。
结束异步单次调用:中国首都是哪里?请用一句话精简的回答问题,要求答案精简准确。
结束异步单次调用:广东省的省会城市是哪里?请用一句话精简的回答问题,要求答案精简准确。
结束异步单次调用:深圳市一共有几个行政区,分别是哪些?请用一句话精简的回答问题,要求答案精简准确。
问题1: 中国首都是北京。
问题2: 广东省的省会是广州。
问题3: 深圳市共10个行政区,分别为福田、罗湖、南山、盐田、宝安、龙岗、龙华、坪山、光明、大鹏。
2. 模型的输入
2.1. 提示词(Prompt)
提示词是与大模型交互时输入的内容,用来指导大模型生成特定类型的回答或执行特定的任务。它可以是一个简单的问题、一段详细的任务说明,或包含角色、背景、示例的复杂文本。其核心目的是约束行为,减少错误,引导模型生成用户期望的响应。好的提示词可以解决60%以上的模型幻觉问题,优化提示词也是我们开发中解决大模型幻觉最多且最有效的方式之一。
比如我们要定义一个Java 单测生成助手的提示词可以如下:
"""
您是一名资深 Java 单元测试专家,具备以下专业背景:
- 精通 Java 8+ 语言特性
- 熟练掌握 JUnit 5、Mockito、AssertJ 等主流测试框架
- 熟悉 Spring / Spring Boot 项目的单元测试与切片测试
- 擅长 Mock 外部依赖(数据库、HTTP 调用、消息队列等)
- 熟悉 TDD 实践、覆盖率优化、CI 集成(JaCoCo 等)
- 具备企业级项目测试体系建设经验
请根据用户提供的 Java 代码或需求,输出以下内容:
1. 单元测试设计思路
- 被测试类的职责分析
- 关键业务路径识别
- 边界条件与异常分支拆解
- 是否需要 Mock 依赖(说明原因)
2. 完整可运行的测试代码
- 使用 JUnit 5
- 合理使用 Mockito 进行依赖隔离
- 必要时使用 @ExtendWith(MockitoExtension.class)
- 测试方法命名清晰(given_when_then 或 should_xxx)
- 覆盖正常流程 + 异常流程
- 使用 AssertJ 或标准断言进行清晰断言
3. 覆盖率优化建议
- 是否遗漏分支判断
- 是否存在未覆盖异常路径
- 如何提升分支覆盖率(而非仅行覆盖率)
4. 可测试性改进建议
- 是否存在强耦合(new 对象、静态方法等)
- 是否建议使用依赖注入
- 是否建议拆分方法或重构逻辑
- 是否存在隐藏副作用
5. 高级补充(如适用)
- SpringBootTest vs 单元测试的选择建议
- 并发测试建议
- 参数化测试建议
- 测试数据构造最佳实践
输出要求:
- 代码必须完整、可直接运行
- 不要省略 import
- 保持代码结构清晰
- 如有假设,请明确说明
用户输入:
{input}
"""
2.2. 提示词模板(Prompt Template)
固定的提示词限制了模型的灵活性和适用范围,所以 Prompt Template 是一个模板化的字符串,可以将变量(如用户提问等)插入到模板的占位符中,从而创建出不同的提示。
LangChain提供了很多提示词模版,用于与大模型交互,常见的提示词模版如下:
2.2.1. 基础文本提示词模板(PromptTemplate)
最基础、最核心的模板,用于生成字符串提示,生成的是单一、无角色区分的纯文本字符串,适用于"单轮文本生成"场景。
from langchain_openai import ChatOpenAI
import os
from langchain_core.prompts import PromptTemplate
## 这里我读的是配好的系统环境变量,你们可以替换成自己的模型相关信息
BASE_URL=os.getenv("DASHSCOPE_BASE_URL")
API_KEY=os.getenv("DASHSCOPE_API_KEY")
LLM_MODEL=os.getenv("DASHSCOPE_LLM_MODEL")
llm = ChatOpenAI(model=LLM_MODEL, base_url=BASE_URL, api_key=API_KEY)
prompt_template = PromptTemplate.from_template("你是一个文学专家,能根据历史文人的笔名或外号猜出人名。请你准确这个人的姓名:{name},你只要回答结果就可以。")
messages = prompt_template.format(name='诗仙')
response = llm.invoke(messages)
print(response.content)
输出结果:
李白
2.2.2. 聊天模型提示词模板(ChatPromptTemplate)
创建聊天消息列表,支持System/AI/Human等角色,处理多角色、多轮次对话场景的高级模板。
from langchain_openai import ChatOpenAI
import os
from langchain_core.prompts import ChatPromptTemplate
## 这里我读的是配好的系统环境变量,你们可以替换成自己的模型相关信息
BASE_URL=os.getenv("DASHSCOPE_BASE_URL")
API_KEY=os.getenv("DASHSCOPE_API_KEY")
LLM_MODEL=os.getenv("DASHSCOPE_LLM_MODEL")
llm = ChatOpenAI(model=LLM_MODEL, base_url=BASE_URL, api_key=API_KEY)
prompt_template = ChatPromptTemplate.from_messages([
("system", "你是一个文学专家,你的名字叫{name}"),
("human", "我需要你的帮助。"),
("ai", "请说出你的问题吧,我可以回答你的任何问题。"),
("human", "帮我回答这个问题:{question}。只要回答结果可以。")
])
messages = prompt_template.format_messages(name='万事通', question='你是谁?诗仙又是谁?')
response = llm.invoke(messages)
print(response.content)
输出结果:
我是万事通,诗仙是李白。
2.2.3. 使用消息对象的模型提示词模板(XxxMessage)
在 ChatPromptTemplate 中使用消息对象 SystemMessage, HumanMessage, AIMessage 等进行封装处理。
from langchain_openai import ChatOpenAI
import os
from langchain_core.prompts import ChatPromptTemplate
from langchain.messages import SystemMessage, HumanMessage, AIMessage
## 这里我读的是配好的系统环境变量,你们可以替换成自己的模型相关信息
BASE_URL=os.getenv("DASHSCOPE_BASE_URL")
API_KEY=os.getenv("DASHSCOPE_API_KEY")
LLM_MODEL=os.getenv("DASHSCOPE_LLM_MODEL")
llm = ChatOpenAI(model=LLM_MODEL, base_url=BASE_URL, api_key=API_KEY)
input = '帮我回答这个问题:你是谁?诗仙又是谁?。只要回答结果可以。'
prompt_template = ChatPromptTemplate.from_messages([
SystemMessage('你是一个文学专家,你的名字叫万事通'),
HumanMessage('我需要你的帮助。'),
AIMessage('请说出你的问题吧,我可以回答你的任何问题。'),
HumanMessage(content=input)
])
messages = prompt_template.format_messages()
response = llm.invoke(messages)
print(response.content)
输出结果:
我是万事通,诗仙是李白。
2.2.4. 特定角色的消息模型提示词模板(XxxMessagePromptTemplate)
在 ChatPromptTemplate 中使用特定角色的模型消息提示模板包括:SystemMessagePromptTemplate、HumanMessagePromptTemplate、AIMessagePromptTemplate、ChatMessagePromptTemplate等进行封装处理。
from langchain_openai import ChatOpenAI
import os
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.prompts import SystemMessagePromptTemplate, HumanMessagePromptTemplate, AIMessagePromptTemplate
## 这里我读的是配好的系统环境变量,你们可以替换成自己的模型相关信息
BASE_URL=os.getenv("DASHSCOPE_BASE_URL")
API_KEY=os.getenv("DASHSCOPE_API_KEY")
LLM_MODEL=os.getenv("DASHSCOPE_LLM_MODEL")
llm = ChatOpenAI(model=LLM_MODEL, base_url=BASE_URL, api_key=API_KEY)
prompt_template = ChatPromptTemplate.from_messages([
SystemMessagePromptTemplate.from_template('你是一个文学专家,你的名字叫{name}'),
HumanMessagePromptTemplate.from_template('我需要你的帮助。'),
AIMessagePromptTemplate.from_template('请说出你的问题吧,我可以回答你的任何问题。'),
HumanMessagePromptTemplate.from_template('帮我回答这个问题:{question}。只要回答结果可以。')
])
messages = prompt_template.format_messages(name='万事通', question='你是谁?诗仙又是谁?')
response = llm.invoke(messages)
print(response.content)
输出结果:
我是万事通,诗仙是李白。
2.2.5. 多轮对话动态消息模型提示词模板(MessagePlaceholder)
在 ChatPromptTemplate 中使用 MessagePlaceholder 插入动态消息列表,将消息列表直接扩展为结构化的消息对象,而非字符串。
from langchain_openai import ChatOpenAI
import os
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.prompts import SystemMessagePromptTemplate, HumanMessagePromptTemplate, AIMessagePromptTemplate
from langchain_community.chat_message_histories import ChatMessageHistory
from langchain.messages import SystemMessage, HumanMessage, AIMessage
## 这里我读的是配好的系统环境变量,你们可以替换成自己的模型相关信息
BASE_URL=os.getenv("DASHSCOPE_BASE_URL")
API_KEY=os.getenv("DASHSCOPE_API_KEY")
LLM_MODEL=os.getenv("DASHSCOPE_LLM_MODEL")
llm = ChatOpenAI(model=LLM_MODEL, base_url=BASE_URL, api_key=API_KEY)
history = ChatMessageHistory()
# 添加用户内容
history.add_user_message('我需要你的帮助。')
# 添加模型回答内容
history.add_ai_message('请说出你的问题吧,我可以回答你的任何问题。')
# 限制只保留最近的5条对话(注意:这里不能覆盖 history 变量)
recent_history = history.messages[-5:]
# 此处可以是思考的过程,查询知识库或外部借口的返回结果
queryData = [SystemMessage(content='查询知识库返回结果:诗仙是人们对李白的尊称。')]
prompt_template = ChatPromptTemplate.from_messages([
SystemMessagePromptTemplate.from_template('你是一个文学专家,你的名字叫{name}'),
MessagesPlaceholder(variable_name='history'),
MessagesPlaceholder(variable_name='query'), # 可以添加多个,拿到思考过程结果
HumanMessagePromptTemplate.from_template('帮我回答这个问题:{question}。只要回答结果可以。')
])
messages = prompt_template.invoke({
'name': '万事通',
'history': recent_history,
'query': queryData,
'question': '你是谁?诗仙又是谁?'
})
response = llm.invoke(messages)
print(response.content)
输出结果:
我是万事通,诗仙是李白。
2.2.6. 少量示例提示词模板(FewShotPromptTemplate)
FewShotPromptTemplate 是指通过提供少量的样例作为参考来引导大模型按照特定的格式和风格输出。
from langchain_openai import ChatOpenAI
import os
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder, FewShotPromptTemplate
from langchain_core.prompts import SystemMessagePromptTemplate, HumanMessagePromptTemplate, AIMessagePromptTemplate
from langchain_community.chat_message_histories import ChatMessageHistory
from langchain.messages import SystemMessage, HumanMessage, AIMessage
## 这里我读的是配好的系统环境变量,你们可以替换成自己的模型相关信息
BASE_URL=os.getenv("DASHSCOPE_BASE_URL")
API_KEY=os.getenv("DASHSCOPE_API_KEY")
LLM_MODEL=os.getenv("DASHSCOPE_LLM_MODEL")
llm = ChatOpenAI(model=LLM_MODEL, base_url=BASE_URL, api_key=API_KEY)
examples_data=[
{
'question': '诗神是谁?',
'answer': '诗神是苏轼,字子瞻,又字和仲,号东坡居士,又号铁冠道人。'
},
{
'question': '诗圣是谁?',
'answer': '诗圣是杜甫,字子美,号少陵野老。'
},
{
'question': '诗魔是谁?',
'answer': '诗魔是白居易,字乐天,号香山居士,又号醉吟先生。'
}
]
example_template = PromptTemplate(
input_variables=['question', 'answer'],
template='问题:{question}\n答案:{answer}'
)
prompt_template = FewShotPromptTemplate(
examples=examples_data,
example_prompt=example_template,
prefix='请根据示例回答问题:',
suffix='问题:{question}',
input_variables=['question']
)
messages = prompt_template.format(question='诗仙是谁?')
response = llm.invoke(messages)
print(response.content)
输出结果:
诗仙是李白,字太白,号青莲居士,又号谪仙人。
3. 模型的输出
大模型通常返回的内容都是字符串格式,但是我们实际开发中更擅长处理结构化数据,输出解析器(OutputParser)就是将大模型输出的结果转换成特定的结构化数据,以便应用程序能更方便的处理。LangChain提供了一些常见的输出解析器,这里我们挑几个比较常见的说一下:
3.1. StrOutputParser(字符串输出解析器)
StrOutputParser 最基础的输出解析器,将模型输出作为纯字符串返回。专为不需要结构化数据的简单文本输出场景设计,提供快速、无需配置的字符串解析能力,是处理基础文本生成任务的理想选择。
from langchain_openai import ChatOpenAI
import os
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate
## 这里我读的是配好的系统环境变量,你们可以替换成自己的模型相关信息
BASE_URL=os.getenv("DASHSCOPE_BASE_URL")
API_KEY=os.getenv("DASHSCOPE_API_KEY")
LLM_MODEL=os.getenv("DASHSCOPE_LLM_MODEL")
llm = ChatOpenAI(model=LLM_MODEL, base_url=BASE_URL, api_key=API_KEY)
parser = StrOutputParser()
prompt_template = PromptTemplate.from_template('请用一句话描述LangChain的核心价值:{description}')
messages = prompt_template.format(description='一个用于构建大语言模型应用的框架')
response = llm.invoke(messages)
result = parser.parse(response.content)
print('模型原始输出:', response.content)
print('解析后输出:', result)
print('类型:', type(result))
输出结果:
模型原始输出: LangChain的核心价值在于提供模块化工具链,简化大语言模型应用的开发流程,使开发者能快速构建、集成和部署基于LLM的智能应用。
解析后输出: LangChain的核心价值在于提供模块化工具链,简化大语言模型应用的开发流程,使开发者能快速构建、集成和部署基于LLM的智能应用。
类型: <class 'str'>
3.2. PydanticOutputParser(Pydantic结构化解析器)
PydanticOutputParser 是基于Pydantic模型的结构化解析器,将模型输出转换为Python对象。专为需要强类型结构化数据的场景设计,提供类型安全、可验证的输出,是构建专业级AI应用的首选解析器。
from langchain_openai import ChatOpenAI
import os
from langchain_core.output_parsers import PydanticOutputParser
from langchain_core.prompts import PromptTemplate
from pydantic import BaseModel, Field
## 这里我读的是配好的系统环境变量,你们可以替换成自己的模型相关信息
BASE_URL=os.getenv("DASHSCOPE_BASE_URL")
API_KEY=os.getenv("DASHSCOPE_API_KEY")
LLM_MODEL=os.getenv("DASHSCOPE_LLM_MODEL")
llm = ChatOpenAI(model=LLM_MODEL, base_url=BASE_URL, api_key=API_KEY)
class Summary(BaseModel):
summary: str = Field(description="文本的简短摘要")
keywords: list[str] = Field(description="文本中的关键点")
sentiment: str = Field(description="文本的情感倾向(正面/负面/中性)")
parser = PydanticOutputParser(pydantic_object=Summary)
format_instructions = parser.get_format_instructions()
prompt_template = PromptTemplate(
template="请总结以下文本,按照指定格式输出:\n{format_instructions}\n文本:{text}",
input_variables=["text"],
partial_variables={"format_instructions": format_instructions}
)
text = "LangChain是一个强大的框架,用于构建基于大语言模型的应用。"
messages = prompt_template.format(text=text)
response = llm.invoke(messages)
result = parser.parse(response.content)
print('模型原始输出:\n', response.content)
print('\n解析后输出:')
print(f'摘要: {result.summary}')
print(f'关键词: {result.keywords}')
print(f'情感倾向: {result.sentiment}')
print('\n类型:', type(result))
输出结果:
模型原始输出:
{
"summary": "LangChain是一个用于构建基于大语言模型应用的强大框架。",
"keywords": ["LangChain", "大语言模型", "应用框架"],
"sentiment": "正面"
}
解析后输出:
摘要: LangChain是一个用于构建基于大语言模型应用的强大框架。
关键词: ['LangChain', '大语言模型', '应用框架']
情感倾向: 正面
类型: <class '__main__.Summary'>
3.3. JsonOutputParser(JSON格式解析器)
JsonOutputParser专为将JSON格式字符串转换为Python字典设计,提供标准化、易用的JSON数据处理能力,是处理API响应和数据交换的标准选择。
from langchain_openai import ChatOpenAI
import os
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.prompts import PromptTemplate
## 这里我读的是配好的系统环境变量,你们可以替换成自己的模型相关信息
BASE_URL=os.getenv("DASHSCOPE_BASE_URL")
API_KEY=os.getenv("DASHSCOPE_API_KEY")
LLM_MODEL=os.getenv("DASHSCOPE_LLM_MODEL")
llm = ChatOpenAI(model=LLM_MODEL, base_url=BASE_URL, api_key=API_KEY)
parser = JsonOutputParser()
format_instructions = parser.get_format_instructions()
prompt_template = PromptTemplate.from_template("请生成一个JSON对象,包含姓名、年龄和城市信息:{format_instructions}")
messages = prompt_template.format(format_instructions=format_instructions)
response = llm.invoke(messages)
result = parser.parse(response.content)
print('模型原始输出:\n', response.content)
print('解析后输出:', result)
print('类型:', type(result))
输出结果:
模型原始输出:
{
"姓名": "张三",
"年龄": 28,
"城市": "北京"
}
解析后输出: {'姓名': '张三', '年龄': 28, '城市': '北京'}
类型: <class 'dict'>
3.4. XMLOutputParser(XML格式解析器)
XMLOutputParser专为将XML格式字符串转换为Python字典设计,提供与企业级系统无缝集成的XML数据处理能力,是处理传统企业系统数据的理想选择。
from langchain_openai import ChatOpenAI
import os
from langchain_core.output_parsers import XMLOutputParser
from langchain_core.prompts import PromptTemplate
## 这里我读的是配好的系统环境变量,你们可以替换成自己的模型相关信息
BASE_URL=os.getenv("DASHSCOPE_BASE_URL")
API_KEY=os.getenv("DASHSCOPE_API_KEY")
LLM_MODEL=os.getenv("DASHSCOPE_LLM_MODEL")
llm = ChatOpenAI(model=LLM_MODEL, base_url=BASE_URL, api_key=API_KEY)
parser = XMLOutputParser()
format_instructions = parser.get_format_instructions()
prompt_template = PromptTemplate.from_template("请生成一个XML格式的用户对象,包含姓名、年龄和城市信息,要给随机默认值:{format_instructions}")
messages = prompt_template.format(format_instructions=format_instructions)
response = llm.invoke(messages)
result = parser.parse(response.content)
print('模型原始输出:\n', response.content)
print('解析后输出:', result)
print('类型:', type(result))
输出结果:
模型原始输出:
<user>
<name>John Doe</name>
<age>28</age>
<city>New York</city>
</user>
解析后输出: {'user': [{'name': 'John Doe'}, {'age': '28'}, {'city': 'New York'}]}
类型: <class 'dict'>
3.5. CommaSeparatedListOutputParser(逗号分隔列表解析器)
CommaSeparatedListOutputParser专为处理逗号分隔列表数据而设计,提供简单高效的方式将模型输出转换为Python列表,是提取关键词、标签等常见场景的理想选择。
from langchain_openai import ChatOpenAI
import os
from langchain_core.output_parsers import CommaSeparatedListOutputParser
from langchain_core.prompts import PromptTemplate
## 这里我读的是配好的系统环境变量,你们可以替换成自己的模型相关信息
BASE_URL=os.getenv("DASHSCOPE_BASE_URL")
API_KEY=os.getenv("DASHSCOPE_API_KEY")
LLM_MODEL=os.getenv("DASHSCOPE_LLM_MODEL")
llm = ChatOpenAI(model=LLM_MODEL, base_url=BASE_URL, api_key=API_KEY)
parser = CommaSeparatedListOutputParser()
format_instructions = parser.get_format_instructions()
prompt_template = PromptTemplate.from_template("列出3个与LangChain相关的关键词,使用小写逗号分隔:{text}")
text = "LangChain是一个强大的框架,用于构建基于大语言模型的应用。"
messages = prompt_template.format(text=text)
response = llm.invoke(messages)
result = parser.parse(response.content)
print('模型原始输出:', response.content)
print('解析后输出:', result)
print('类型:', type(result))
输出结果:
模型原始输出: langchain,大语言模型,应用框架
解析后输出: ['langchain', '大语言模型', '应用框架']
类型: <class 'list'>
3.6. ListOutputParser(列表解析器)
ListOutputParser专为将模型输出解析为列表设计,提供直接返回列表的能力,便于处理多个项目,是提取关键词、列表数据等场景的理想选择。
from langchain_openai import ChatOpenAI
import os
from langchain_core.output_parsers import ListOutputParser
from langchain_core.prompts import PromptTemplate
## 这里我读的是配好的系统环境变量,你们可以替换成自己的模型相关信息
BASE_URL=os.getenv("DASHSCOPE_BASE_URL")
API_KEY=os.getenv("DASHSCOPE_API_KEY")
LLM_MODEL=os.getenv("DASHSCOPE_LLM_MODEL")
llm = ChatOpenAI(model=LLM_MODEL, base_url=BASE_URL, api_key=API_KEY)
# 因为ListOutputParser是抽象类,需要自定义解析器
class CustomListOutputParser(ListOutputParser):
def parse(self, text: str) -> list:
text = text.strip()
if not text:
return []
items = [item.strip() for item in text.split('、') if item.strip()]
return items
# 初始化解析器和提示模板
parser = CustomListOutputParser()
prompt_template = PromptTemplate.from_template(
"列出3个与LangChain相关的关键词,使用中文顿号“、”分隔,例如:关键词1、关键词2、关键词3。文本:{text}"
)
text = "LangChain是一个强大的框架,用于构建基于大语言模型的应用。"
messages = prompt_template.format(text=text)
response = llm.invoke(messages)
result = parser.parse(response.content)
print('模型原始输出:', response.content)
print('解析后输出:', result)
print('类型:', type(result))
输出结果:
模型原始输出: 大语言模型、应用构建、框架
解析后输出: ['大语言模型', '应用构建', '框架']
类型: <class 'list'>
五、LangChain 组件之 Retrieval
Retrieval是用于从大量的文档或数据源中查找相关信息的核心模块,它是构建RAG(检索增强生成)的基础,能够让大模型访问外部知识源,解决模型本身的知识局限性和时效性问题。
LangChain 为 RAG 应用程序提供了所有集成组件,其中关键组件包括:文档加载器(Document Loaders)、文档转换器(Document Transformers)、文本嵌入模型(Text Embedding Models)、向量存储(Vector Stores)、检索器(Retrievers)。
1. 文档加载器(Document Loaders)
文档加载器是LangChain中用于从各种数据源(本地文件、网页、API、数据库等)加载原始文档并转换为标准化Document对象的工具,确保了无论数据来源如何,都能以一致的方式处理数据。
每个Document 对象都包含两部分:
- page_content:文档的文本内容(字符串)。
- metadata:元数据(字典,包含来源、文件名、时间戳等信息)。
1.1. 基础 API 接口
所有文档加载器都实现了 BaseLoader 接口,每个文档加载器都可以定义自己的参数,但它们共享一个通用的 API:
- load():一次性加载所有文档。
- lazy_load():延迟加载文档流,适用于大型数据集。
from langchain_community.document_loaders import PyMuPDFLoader
loader = PyMuPDFLoader("./test_chroma.pdf")
# 一次性加载全部内容
documents = loader.load()
print("开始一次性加载:")
print(documents)
print("\n开始懒加载:")
for document in loader.lazy_load():
print("懒加载分批次内容:")
print(document)
1.2. 文档加载器分类
LangChain 支持的全部文档加载器可查看官方文档:https://docs.langchain.com/oss/python/integrations/document_loaders
LangChain 支持的文档加载器按照数据源类型进行分类,主要分为以下5大类:
1.2.1. 本地文件加载器(Local File Loaders)
从本地文件系统加载各种格式的文档。
常见加载器及适用场景
| 加载器 | 适用格式 | 优势 | 适用场景 |
|---|---|---|---|
TextLoader |
.txt |
简单快速 | 纯文本文件 |
PyMuPDFLoader |
.pdf |
PDF解析 | 常规PDF文档 |
UnstructuredFileLoader |
多种格式 | 支持最广 | 复杂文档(PDF、Word、HTML等) |
DirectoryLoader |
任意格式 | 批量加载 | 目录中所有文件 |
CSVLoader |
.csv |
结构化数据 | CSV表格数据 |
JSONLoader |
.json |
结构化数据 | JSON格式数据 |
ExcelLoader |
.xlsx |
表格数据 | Excel电子表格 |
代码示例
from langchain_community.document_loaders import (
TextLoader,
PyMuPDFLoader,
UnstructuredFileLoader,
DirectoryLoader,
CSVLoader,
JSONLoader
)
# 1. 加载单个文本文件
text_loader = TextLoader("data/sample.txt")
text_docs = text_loader.load()
# 2. 加载PDF文件
pdf_loader = PyMuPDFLoader("data/sample.pdf")
pdf_docs = pdf_loader.load()
# 3. 加载多种格式的文件(使用Unstructured)
unstructured_loader = UnstructuredFileLoader("data/sample.docx")
unstructured_docs = unstructured_loader.load()
# 4. 批量加载目录中的所有文件
directory_loader = DirectoryLoader(
"data/",
glob="**/*",
show_progress=True,
loader_cls=UnstructuredFileLoader
)
all_docs = directory_loader.load()
# 5. 加载CSV文件
csv_loader = CSVLoader(file_path="data/data.csv")
csv_docs = csv_loader.load()
# 6. 加载JSON文件
json_loader = JSONLoader(
file_path="data/data.json",
jq_schema=".[]",
content_key="text"
)
json_docs = json_loader.load()
1.2.2. 网络内容加载器(Web Content Loaders)
从网络获取内容,如网页、博客等。
常见加载器及适用场景
| 加载器 | 适用场景 | 优势 | 限制 |
|---|---|---|---|
WebBaseLoader |
单个网页 | 简单高效 | 可能受反爬虫机制影响 |
SitemapLoader |
网站sitemap | 批量获取网站内容 | 依赖网站sitemap |
RSSFeedLoader |
RSS订阅源 | 获取新闻/博客更新 | 依赖RSS源存在 |
GmailLoader |
Gmail邮件 | 获取邮件内容 | 需要Gmail API权限 |
代码示例
from langchain_community.document_loaders import WebBaseLoader, SitemapLoader, RSSFeedLoader
# 1. 加载单个网页
web_loader = WebBaseLoader("https://www.example.com")
web_docs = web_loader.load()
# 2. 加载多个网页
urls = [
"https://www.example.com/page1",
"https://www.example.com/page2"
]
web_loader = WebBaseLoader(urls)
web_docs = web_loader.load()
# 3. 从sitemap加载网站内容
sitemap_loader = SitemapLoader("https://www.example.com/sitemap.xml")
sitemap_docs = sitemap_loader.load()
# 4. 加载RSS订阅源
rss_loader = RSSFeedLoader("https://www.example.com/rss")
rss_docs = rss_loader.load()
1.2.3. 数据库加载器(Database Loaders)
从数据库中加载结构化数据。
常见加载器及适用场景
| 加载器 | 数据库类型 | 优势 | 适用场景 |
|---|---|---|---|
SQLDatabaseLoader |
SQL数据库 | 通用SQL支持 | MySQL, PostgreSQL等 |
MongoDBLoader |
MongoDB | 无SQL依赖 | MongoDB文档数据库 |
PostgresLoader |
PostgreSQL | 优化PostgreSQL | PostgreSQL专用 |
代码示例
from langchain_community.document_loaders import SQLDatabaseLoader, MongodbLoader
# 1. 从SQL数据库加载
sql_loader = SQLDatabaseLoader(
"sqlite:///data.db",
query="SELECT * FROM documents"
)
sql_docs = sql_loader.load()
# 2. 从MongoDB加载(需要pymongo库)
mongo_loader = MongodbLoader(
connection_string="mongodb://localhost:27017/",
db_name="sample_restaurants",
collection_name="restaurants",
filter_criteria={"borough": "Bronx", "cuisine": "Bakery"},
field_names=["name", "address"],
)
mongo_docs = mongo_loader.load()
1.2.4. API内容加载器(API Content Loaders)
通过API获取内容,如GitHub、华为云等。
常见加载器及适用场景
| 加载器 | API服务 | 优势 | 适用场景 |
|---|---|---|---|
GithubFileLoader |
GitHub | 获取代码仓库 | 代码文档、README |
TelegramChatApiLoader |
Telegram | 获取Telegram数据 | Telegram信息 |
OBSFileLoader |
HuaWei Cloud | 获取HuaWei Cloud文件 | 云存储文档 |
代码示例
from langchain_community.document_loaders import GithubFileLoader, TelegramChatApiLoader, OBSFileLoader
from obs import ObsClient
# 1. 从GitHub加载代码仓库
github_loader = GithubFileLoader(
repo="langchain-ai/langchain", # the repo name
branch="master", # the branch name
access_token="your ACCESS_TOKEN",
github_api_url="https://api.github.com",
file_filter=lambda file_path: file_path.endswith(
".md"
), # load all markdowns files.
)
github_docs = github_loader.load()
# 2.通过 Telegram API获取
Telegram_loader = TelegramChatApiLoader(
chat_entity="<CHAT_URL>", # recommended to use Entity here
api_hash="<API HASH >",
api_id="<API_ID>",
username="", # needed only for caching the session.
)
Telegram_docs = Telegram_loader.load()
# 3. 从华为云获取文件
obs_client = ObsClient(
access_key_id="your-access-key",
secret_access_key="your-secret-key",
server="your-endpoint"
)
obs_loader = OBSFileLoader("your-bucket-name", "your-object-key", client=obs_client)
obs_docs = obs_loader.load()
1.2.5. 特殊格式加载器(Special Format Loaders)
处理特定格式的文件,如Markdown、HTML等。
常见加载器及适用场景
| 加载器 | 适用格式 | 优势 | 适用场景 |
|---|---|---|---|
UnstructuredMarkdownLoader |
.md |
Markdown解析 | Markdown文档 |
BSHTMLLoader |
.html |
HTML解析 | 网页内容 |
PDFPlumberLoader |
.pdf |
高级PDF解析 | 需要精确文本提取的PDF |
代码示例
from langchain_community.document_loaders import UnstructuredMarkdownLoader, BSHTMLLoader, PDFPlumberLoader
# 1. 加载Markdown文件
markdown_loader = UnstructuredMarkdownLoader(
file_path="./example_data/example.md",
mode="single",
strategy="fast",
)
markdown_docs = markdown_loader.load()
# 2. 加载HTML文件
html_loader = BSHTMLLoader(
file_path="./example_data/fake-content.html",
)
html_docs = html_loader.load()
# 3. 高级PDF解析(需要pdfplumber库)
pdf_plumber_loader = PDFPlumberLoader("data/sample.pdf")
pdf_plumber_docs = pdf_plumber_loader.load()
2. 文档转换器(Document Transformers)
文档转换器是LangChain中连接文档加载器与下游任务的"质量转换层",通过标准化处理文档,确保RAG应用的输入数据语义连贯、结构清晰、元数据丰富。它们是RAG工作流中不可或缺的预处理环节,直接影响检索准确率和生成质量。
文档转换器提供了一致的接口来操作文档,主要包括以下几类:
2.1. 文本拆分器(Text Splitters)
文本拆分器是RAG(Retrieval-Augmented Generation)工作流中最关键的基础组件,负责将长文档分割成适合嵌入模型处理的语义单元。在RAG应用中,错误的文本拆分会导致信息丢失、语义断裂和检索失败,直接影响整个系统的性能。
常用的文本拆分器及适用场景:
| 拆分器名称 | 核心特点 | 适用场景 |
|---|---|---|
| RecursiveCharacterTextSplitter | 递归分割,动态调整分隔符,避免词语断裂 | 代码处理、中文长文本、混合内容(如代码+诗词) |
| MarkdownTextSplitter | 基于Markdown结构(标题/列表/代码块)分割,保持语义完整性 | Markdown文档、技术文档、学术文献(含LaTeX) |
| SpacyTextSplitter | 基于spaCy的复杂句子分词,支持多语言实体识别 | RAG系统、对话系统、英文/多语言文本处理 |
| NLTKTextSplitter | 基于NLTK的句子分词,兼容LangChain/LlamaIndex | 英文文本分割、句子级Node构建 |
| TokenTextSplitter | 按token数量分割,适配大语言模型token限制 | 嵌入模型(如Sentence-BERT)、长文本控制 |
| SentenceTransformersTokenTextSplitter | 专为句子嵌入模型设计,确保token数符合模型窗口 | 语义一致文本块生成、跨块截断避免 |
| CharacterTextSplitter | 固定字符数或简单分隔符(如换行/句号)分割 | 简单文本(TXT/日志)、快速原型验证 |
| KonlpyTextSplitter | 韩语形态分析,基于Konlpy库 | 韩语文本处理 |
| JSFrameworkTextSplitter | 针对JavaScript框架的结构化分割 | JS代码/框架文档处理 |
| LatexTextSplitter | 保留LaTeX公式和章节结构 | LaTeX文档处理 |
代码示例:
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
# 创建分割器(中文优化配置)
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=50,
chunk_overlap=5,
length_function=len,
separators=[
"\n\n", # 优先按段落分割
"\n", # 其次按行分割
"。", # 按中文句号分割
"!", # 按感叹号分割
"?", # 按问号分割
" ", # 最后按空格分割
""
]
)
# 示例文档
document = Document(
page_content="LangChain是一个用于构建大语言模型应用的框架。"
"它的核心价值是提供一个统一的接口来集成各种大语言模型。"
"LangChain支持多种模型,包括OpenAI、Hugging Face和Anthropic。"
"用户可以通过LangChain轻松构建复杂的LLM应用,如聊天机器人和内容生成器。",
metadata={"source": "langchain_guide.txt"}
)
# 分割文档
split_documents = text_splitter.split_documents([document])
# 打印结果
for i, doc in enumerate(split_documents):
print(f"分割块 {i+1} (长度: {len(doc.page_content)}):")
print(doc.page_content)
print("-" * 50)
输出结果:
分割块 1 (长度: 26):
LangChain是一个用于构建大语言模型应用的框架
--------------------------------------------------
分割块 2 (长度: 27):
。它的核心价值是提供一个统一的接口来集成各种大语言模型
--------------------------------------------------
分割块 3 (长度: 48):
。LangChain支持多种模型,包括OpenAI、Hugging Face和Anthropic
--------------------------------------------------
分割块 4 (长度: 42):
。用户可以通过LangChain轻松构建复杂的LLM应用,如聊天机器人和内容生成器。
--------------------------------------------------
2.2. 冗余过滤器(Redundancy Filters)
冗余过滤器是LangChain检索流程中提升检索结果质量的关键组件,用于过滤检索到的文档中的重复内容,确保返回的文档具有多样性和高信息价值。在RAG应用中,高质量的冗余过滤能将检索准确率提升20%-30%。
常用的冗余过滤器及适用场景:
| 冗余过滤器名称 | 核心特点 | 适用场景 |
|---|---|---|
| EmbeddingsRedundantFilter | 基于语义相似性,通过嵌入向量计算文档相似度,移除内容高度相似的文档 | 优化RAG或语义搜索的文档集,减少重复信息,提升检索效率 |
| LLMChainExtractor | 使用LLM分析文档内容,提取与查询相关的关键片段 | 深度语义理解的复杂查询,如技术文档、学术文献的精准提取 |
| EmbeddingsFilter | 基于嵌入相似度快速过滤文档,处理速度快 | 大规模文档的快速过滤,如网页内容、批量导入数据的初步筛选 |
| LLMChainFilter | 仅过滤无关文档,不修改文档内容,处理速度较快 | 需保留完整上下文的场景,如法律文书、合同条款的初步筛选 |
| LLMListwiseRerank | 使用零样本重排优化结果顺序,提升结果相关性 | 精细化排序,如问答系统、语义搜索的最终结果优化 |
| ContextualCompressionRetriever | 结合基础检索器和文档压缩器,先过滤冗余文档,再精细排序 | 提升检索结果质量,如企业知识库、FAQ系统的精准检索 |
| BM25Retriever | 基于关键词匹配,计算成本低,效果依赖字面匹配 | 传统搜索场景,如文档管理系统、内部知识库的快速检索 |
| TFIDFRetriever | 基于TF-IDF算法,适合关键词匹配,性能优于BM25在某些场景 | 文本搜索、关键词提取,如新闻聚合、内容推荐系统 |
代码示例:
from langchain_community.document_loaders import TextLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_community.document_transformers import EmbeddingsRedundantFilter
from langchain_core.documents import Document
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI
import os
# 设置OpenAI API密钥(实际使用时请替换为您的API密钥)
os.environ["OPENAI_API_KEY"] = "YOUR_OPENAI_API_KEY"
# 1. 创建示例文档(模拟多源文档)
documents = [
Document(
page_content="LangChain是一个用于构建大语言模型应用的框架。"
"它的核心价值是提供一个统一的接口来集成各种大语言模型。",
metadata={"source": "source1"}
),
Document(
page_content="LangChain提供了一个框架,用于构建基于大语言模型的应用。"
"它允许开发者轻松集成多种大语言模型,如OpenAI、Hugging Face和Anthropic。",
metadata={"source": "source2"}
),
Document(
page_content="LangChain框架的核心是提供一个统一的接口,用于集成各种大语言模型。"
"通过LangChain,开发者可以快速构建LLM应用,如聊天机器人和内容生成器。",
metadata={"source": "source3"}
),
Document(
page_content="LangChain是一个开源框架,用于构建大语言模型应用。"
"它简化了集成不同大语言模型的过程,提供了丰富的组件和工具。",
metadata={"source": "source4"}
),
Document(
page_content="LangChain框架支持多种大语言模型,包括OpenAI、Hugging Face和Anthropic。"
"它的主要优势是提供了一个统一的接口,使开发者能够轻松构建LLM应用。",
metadata={"source": "source5"}
),
Document(
page_content="LangChain是一个用于构建大语言模型应用的框架,提供了一个统一的接口来集成各种模型。",
metadata={"source": "source6"}
)
]
# 2. 文本分割(使用优化的中文分割器)
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=500,
chunk_overlap=50,
separators=[
"\n\n",
"\n",
"。",
"!",
"?",
" ",
""
]
)
split_documents = text_splitter.split_documents(documents)
# 3. 创建嵌入模型
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
# 4. 创建向量存储
vectorstore = FAISS.from_documents(split_documents, embeddings)
# 5. 创建冗余过滤器
redundant_filter = EmbeddingsRedundantFilter(
embeddings=embeddings,
similarity_threshold=0.85 # 相似度阈值,0.85表示高度相似
)
# 6. 创建检索器
retriever = vectorstore.as_retriever(
search_type="similarity",
search_kwargs={"k": 10} # 检索10个文档
)
# 7. 集成冗余过滤器
# 注意:在LangChain中,冗余过滤器需要作为检索器的前置处理
filtered_retriever = redundant_filter | retriever
# 8. 执行检索并展示结果
query = "LangChain的核心价值是什么?"
print("="*50)
print(f"检索问题: {query}")
print("="*50)
# 原始检索结果(未过滤)
raw_docs = filtered_retriever.invoke(query)
print(f"\n原始检索文档数量: {len(raw_docs)}")
# 打印原始检索结果
print("\n原始检索结果:")
for i, doc in enumerate(raw_docs):
print(f"文档 {i+1} (来源: {doc.metadata.get('source', '未知')}):")
print(f"内容: {doc.page_content[:200]}...")
print("-" * 50)
# 应用冗余过滤后的结果
filtered_docs = [doc for doc in raw_docs if doc.metadata.get("is_redundant", False) == False]
print(f"\n过滤后文档数量: {len(filtered_docs)}")
# 打印过滤后的结果
print("\n过滤后结果:")
for i, doc in enumerate(filtered_docs):
print(f"文档 {i+1} (来源: {doc.metadata.get('source', '未知')}):")
print(f"内容: {doc.page_content[:200]}...")
print("-" * 50)
# 9. 创建RAG链(可选,展示如何在完整RAG流程中使用)
prompt = ChatPromptTemplate.from_template(
"根据以下上下文回答问题:\n\n{context}\n\n问题: {question}"
)
model = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.1)
rag_chain = (
{"context": filtered_retriever, "question": RunnablePassthrough()}
| prompt
| model
)
# 执行RAG链
response = rag_chain.invoke(query)
print("\nRAG生成的答案:")
print(response.content)
2.3. 元数据提取器(Metadata Extractors)
元数据提取器是RAG(Retrieval-Augmented Generation)工作流中关键的增强组件,它们从文档中提取结构化信息,为检索和生成提供更丰富的上下文。在RAG应用中,高质量的元数据能将检索准确率提升20%-30%,并显著改善生成质量。
常见元数据提取器:
| 元数据提取器 | 核心特点 | 适用场景 |
|---|---|---|
| OpenAIMetadataTagger | 基于大语言模型生成高质量语义标签 | 通用文档,需要语义级元数据的RAG应用 |
| DoctranPropertyExtractor | 基于正则表达式提取预定义结构化属性 | 结构化文档(含标题/作者/日期等信息) |
| MarkdownHeaderTextSplitter | 自动提取Markdown标题层次结构作为元数据 | Markdown格式文档(技术文档、笔记、博客) |
| HtmlMetadataExtractor | 提取HTML标准元标签(title/description/keywords) | 网页内容、HTML格式文档 |
| PDFMetadataExtractor | 提取PDF文档标准属性(Title/Author/Subject) | PDF格式文档(技术报告、论文、文档) |
代码示例:
from langchain_community.document_loaders import TextLoader
from langchain_community.document_transformers import OpenAIMetadataTagger
from langchain_core.documents import Document
import os
# 设置OpenAI API密钥(实际使用时请替换为您的API密钥)
os.environ["OPENAI_API_KEY"] = "YOUR_OPENAI_API_KEY"
# 创建示例文档
with open("sample_doc.txt", "w") as f:
f.write("LangChain是一个用于构建大语言模型应用的框架。"
"它的核心价值是提供一个统一的接口来集成各种大语言模型。"
"LangChain支持多种模型,包括OpenAI、Hugging Face和Anthropic。")
# 加载文档
loader = TextLoader("sample_doc.txt")
documents = loader.load()
# 创建OpenAI元数据标签器(最常用配置)
tagger = OpenAIMetadataTagger(
model="gpt-3.5-turbo",
temperature=0.1,
max_tokens=50,
prompt_template="提取以下文本的关键主题标签,用逗号分隔:{text}"
)
# 应用元数据提取
tagged_documents = tagger.transform_documents(documents)
# 打印结果
for doc in tagged_documents:
print(f"文档内容摘要: {doc.page_content[:100]}...")
print(f"关键标签: {doc.metadata.get('tags', [])}")
print("-" * 50)
# 清理临时文件
import os
os.remove("sample_doc.txt")
2.4. 多语言转换器(Multi-lingual Transformers)
多语言转换器是RAG(Retrieval-Augmented Generation)系统中处理多语言文档的关键组件,它们使RAG应用能够处理和检索不同语言的文档,显著扩展了应用的适用范围。在多语言RAG应用中,正确的语言处理能将检索准确率至少提升25%。
常见多语言转换器及适用场景:
| 多语言转换器 | 核心功能 | 适用场景 |
|---|---|---|
LanguageDetector |
自动检测文档内容的语言 | 多语言文档处理、语言上下文识别 |
Translator |
将文档内容翻译成目标语言 | 多语言文档统一处理、跨语言检索 |
MultiLingualEmbeddings |
使用支持多语言的嵌入模型生成语义表示 | 多语言文档检索、跨语言语义匹配 |
MultiLingualRetriever |
集成语言检测和翻译的多语言检索器 | 全球化RAG应用、多语言查询处理 |
代码示例:
from langchain_community.document_transformers import LanguageDetector, Translator
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_community.retrievers import MultiLingualRetriever
from langchain_core.documents import Document
# 创建多语言文档
documents = [
Document(page_content="Hello, this is a test document.", metadata={"language": "en"}),
Document(page_content="Bonjour, ceci est un document de test.", metadata={"language": "fr"}),
Document(page_content="Hola, este es un documento de prueba.", metadata={"language": "es"})
]
# 创建语言检测器
language_detector = LanguageDetector(
confidence_threshold=0.8,
language_column="language"
)
# 创建翻译器(翻译成英语)
translator = Translator(
target_language="en",
source_language=None,
translation_model="google"
)
# 创建多语言嵌入模型
embeddings = HuggingFaceEmbeddings(
model_name="sentence-transformers/all-MiniLM-L6-v2"
)
# 创建向量存储
vectorstore = FAISS.from_documents(documents, embeddings)
# 创建多语言检索器
retriever = MultiLingualRetriever(
retriever=vectorstore.as_retriever(),
language_detector=language_detector,
translator=translator
)
# 检索示例(用法语查询)
query = "document de test" # 法语查询
results = retriever.invoke(query)
# 打印结果
print(f"查询: {query}")
print("检索结果:")
for i, doc in enumerate(results):
print(f"结果 {i+1} (语言: {doc.metadata.get('language', 'unknown')}):")
print(f"内容: {doc.page_content}")
print("-" * 50)
2.5. 对话转换器(Conversational Transformers)
将非结构化对话转换为问答格式,提升对话式RAG的检索效率。在对话式RAG应用中,对话处理组件是"对话连贯性保障",它们确保系统能够理解对话上下文,提供连贯、相关、高质量的响应。
常用对话转换器及适用场景:
| 组件名称 | 核心功能 | 适用场景 |
|---|---|---|
ConversationalRetrievalChain |
结合检索器和对话历史,提供上下文相关的对话式RAG | 聊天机器人、对话式问答系统、需要维护对话上下文的RAG应用 |
ConversationBufferMemory |
存储对话历史作为消息列表,保留最近N轮对话 | 需要简单对话历史记录的RAG应用,对话历史长度有限的场景 |
ConversationBufferWindowMemory |
保留最近N条消息,精确控制对话历史长度 | 需要更精细控制对话历史长度的RAG应用,对话历史长度需要精确控制的场景 |
ConversationSummaryBufferMemory |
存储对话摘要而非完整对话历史,通过LLM生成高质量摘要 | 长对话场景,需要减少上下文长度的RAG应用,对话历史非常长的场景 |
代码示例:
from langchain_community.chat_models import ChatOpenAI
from langchain_community.vectorstores import FAISS
from langchain_community.embeddings import OpenAIEmbeddings
from langchain_community.memory import ConversationBufferWindowMemory
from langchain.chains import ConversationalRetrievalChain
from langchain_core.documents import Document
# 创建示例文档
documents = [
Document(page_content="LangChain是一个用于构建大语言模型应用的框架。", metadata={"source": "langchain_guide"}),
Document(page_content="LangChain的核心价值是提供一个统一的接口来集成各种大语言模型。", metadata={"source": "langchain_guide"}),
Document(page_content="LangChain支持多种模型,包括OpenAI、Hugging Face和Anthropic。", metadata={"source": "langchain_guide"})
]
# 创建嵌入和向量存储
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vectorstore = FAISS.from_documents(documents, embeddings)
retriever = vectorstore.as_retriever()
# 创建对话记忆(保留最近2轮对话)
memory = ConversationBufferWindowMemory(
k=2,
memory_key="chat_history",
return_messages=True
)
# 创建对话检索链
qa = ConversationalRetrievalChain.from_llm(
llm=ChatOpenAI(model="gpt-3.5-turbo", temperature=0.1),
retriever=retriever,
memory=memory,
return_source_documents=True
)
# 执行对话
print("对话系统已启动,输入'你好'开始对话")
print("-" * 50)
# 第一轮对话
response = qa.invoke("LangChain的核心价值是什么?")
print(f"系统: {response['answer']}")
print(f"来源: {response['source_documents']}")
print("-" * 50)
# 第二轮对话(系统会记住之前的对话)
response = qa.invoke("它支持哪些模型?")
print(f"系统: {response['answer']}")
print(f"来源: {response['source_documents']}")
print("-" * 50)
# 第三轮对话(系统继续记住对话历史)
response = qa.invoke("请总结一下LangChain的主要特点")
print(f"系统: {response['answer']}")
print(f"来源: {response['source_documents']}")
print("-" * 50)
3. 文本嵌入模型(Text Embedding Models)
文本嵌入模型是将原始文本(例如句子、段落或推文)转换为固定长度的数字向量,通过比较向量的相似度,捕捉其语义含义。这些向量使机器能够基于含义而非确切的词语来比较和搜索文本。
LangChain 通过Embeddings接口为文本嵌入模型(例如 OpenAI、Cohere、Hugging Face)提供标准接口。主要有两种方法:
- embed_documents(texts: List[str]):将入参转为向量后保存。
- embed_query(text: str):将入参转为向量并查询,通过相似性判断查询结果数据。
常见文本嵌入模型及适用场景:
| 模型名称 | 提供商 | 特点 | 适用场景 |
|---|---|---|---|
text-embedding-ada-002 |
OpenAI | 低成本、高性能,适合大多数场景,支持1536维向量 | 通用RAG应用,预算有限的项目 |
text-embedding-3-small |
OpenAI | 更小的模型,成本更低,适合轻量级应用,支持1536维向量 | 轻量级RAG,移动应用,预算敏感型项目 |
text-embedding-3-large |
OpenAI | 最高性能,适合高精度需求,支持3072维向量 | 企业级RAG,高精度要求的场景 |
all-MiniLM-L6-v2 |
Hugging Face | 轻量级,支持100+语言,速度较快,128维向量 | 通用多语言嵌入,小型项目,资源受限环境 |
all-mpnet-base-v2 |
Hugging Face | 高性能,适合需要高精度的场景,768维向量 | 高精度检索,专业RAG应用,学术研究 |
all-distilroberta-v1 |
Hugging Face | 基于DistilBERT,比BERT更轻量,768维向量 | 速度敏感型应用,资源有限的环境 |
BGE-M3 |
BAAI | 支持多语言,支持密集和稀疏嵌入,多模态,1024维向量 | 复杂RAG应用,需要多模态支持,企业级应用 |
Universal Sentence Encoder |
支持多种语言,预训练在大量文本上,512维向量 | 多语言应用,Google生态系统 | |
Cohere Embeddings |
Cohere | 高质量,支持多种语言,专为RAG优化 | 企业级RAG应用,Cohere生态系统 |
Qwen-Embedding |
Alibaba | 通义千问系列,支持中文和多语言,1024维向量 | 中文RAG应用,阿里云生态系统 |
代码示例:
from langchain_community.embeddings import OpenAIEmbeddings
from langchain_core.embeddings import Embeddings
from langchain_core.documents import Document
import os
# 设置OpenAI API密钥(实际使用时请替换为您的API密钥)
os.environ["OPENAI_API_KEY"] = "YOUR_OPENAI_API_KEY"
# 创建OpenAIEmbeddings实例,指定使用text-embedding-3-small模型
embeddings = OpenAIEmbeddings(
model="text-embedding-3-small",
dimensions=1536 # 可选:指定维度,text-embedding-3-small默认是1536
)
# 示例文本
sample_text = "LangChain是一个用于构建大语言模型应用的框架。"
# 生成嵌入(使用LangChain方式)
embedding = embeddings.embed_query(sample_text)
# 打印嵌入信息
print(f"文本: '{sample_text}'")
print(f"嵌入维度: {len(embedding)}")
print(f"嵌入前5个值: {embedding[:5]}")
print(f"嵌入后5个值: {embedding[-5:]}")
print(f"嵌入类型: {type(embedding)}")
print(f"嵌入数据类型: {type(embedding[0])}")
# 验证:使用相同的文本获取嵌入,确保结果一致
embedding2 = embeddings.embed_query(sample_text)
print(f"\n两次嵌入是否一致: {embedding == embedding2}")
print(f"嵌入向量类型: {type(embedding2)}")
# 与LangChain文档嵌入示例
documents = [
Document(page_content="LangChain是一个用于构建大语言模型应用的框架。"),
Document(page_content="LangChain的核心价值是提供一个统一的接口来集成各种大语言模型。"),
Document(page_content="LangChain支持多种模型,包括OpenAI、Hugging Face和Anthropic。")
]
# 生成文档嵌入
document_embeddings = embeddings.embed_documents([doc.page_content for doc in documents])
# 打印文档嵌入示例
print("\n文档嵌入示例:")
for i, doc_embedding in enumerate(document_embeddings):
print(f"文档 {i+1} 嵌入维度: {len(doc_embedding)}")
print(f"文档 {i+1} 嵌入前5个值: {doc_embedding[:5]}")
4. 向量存储(Vector Stores)
向量存储是RAG(Retrieval-Augmented Generation)应用的"大脑",它决定了系统能多快、多准地找到相关信息。在LangChain中,向量存储是VectorStore类的实现,提供了统一的接口来与各种向量数据库交互,基础操作如下:
- add_documents:将文档保存到向量库。
- delete:按ID删除已存储的文档。
- similarity_search:查询语义相似的文档。
代码示例:
# 完整操作流程
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import OpenAIEmbeddings
from langchain_core.documents import Document
# 创建嵌入模型
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
# 创建文档
documents = [
Document(page_content="LangChain是一个用于构建大语言模型应用的框架。", metadata={"source": "langchain_guide"}),
Document(page_content="LangChain的核心价值是提供一个统一的接口来集成各种大语言模型。", metadata={"source": "langchain_guide"}),
Document(page_content="LangChain支持多种模型,包括OpenAI、Hugging Face和Anthropic。", metadata={"source": "langchain_guide"})
]
# 创建Chroma向量存储
vectorstore = Chroma.from_documents(
documents=documents,
embedding=embeddings,
persist_directory="./chroma_db"
)
# 按ID删除文档
ids = vectorstore._collection.get()['ids']
vectorstore.delete([ids[1]])
# 过滤查询
results = vectorstore.similarity_search(
query="LangChain的框架",
k=2,
filter={"source": "langchain_guide"}
)
# 按向量搜索
vector_results = vectorstore.similarity_search(
query="LangChain的文档",
k=2
)
# 更新文档
vectorstore.delete([ids[0]])
updated_doc = Document(
page_content="LangChain是一个用于构建大语言模型应用的框架,文档非常详细。",
metadata={"source": "langchain_guide"}
)
vectorstore.add_documents([updated_doc])
# 保存并重新加载
vectorstore.persist()
reloaded_vectorstore = Chroma(
persist_directory="./chroma_db",
embedding_function=embeddings
)
# 验证
print("\n验证结果:")
print(f"向量存储文档数量: {len(reloaded_vectorstore._collection.get()['ids'])}")
print(f"检索结果数量: {len(vectorstore.similarity_search('框架'))}")
# 清理
vectorstore.delete_collection()
5. 检索器(Retrievers)
检索器是RAG(Retrieval-Augmented Generation)应用的"眼睛",它决定了系统能多快、多准地找到相关信息。在LangChain中,检索器是Retriever类的实现,提供了统一的接口来从各种数据源检索相关信息,所有向量存储都可以转换为检索器,也可以通过继承 BaseRetriever 实现的自定义检索器。
常见检索器类型:
| 检索器类型 | 核心特点 | 适用场景 | 典型用例 |
|---|---|---|---|
| VectorStoreRetriever | 基于向量存储的相似度检索,与FAISS、Chroma等向量数据库无缝集成 | 通用RAG应用,需要语义相似度搜索 | 文档问答系统、知识库查询 |
| MultiRetriever | 组合多个检索器,通过加权或排序合并结果 | 需要结合多种检索策略的复杂应用 | 多源数据检索、混合检索策略 |
| MultiLingualRetriever | 自动处理多语言查询,集成语言检测和翻译 | 全球化RAG应用、多语言支持 | 国际化聊天机器人、多语言知识库 |
| ConversationalRetrievalChain | 结合检索器和对话历史,维护对话上下文 | 对话式RAG应用、聊天机器人 | 会话式问答系统、客服聊天机器人 |
| ContextualCompressionRetriever | 使用压缩模型过滤冗余结果,提高检索质量 | 需要高质量检索结果的场景 | 专业问答系统、高质量文档检索 |
| BM25Retriever | 基于关键词的检索,适合精确匹配 | 需要精确关键词匹配的场景 | 专业文档搜索、技术手册查询 |
| HybridRetriever | 结合关键词和向量检索,提供混合搜索结果 | 需要结合语义和关键词搜索的场景 | 企业知识库、综合搜索系统 |
| SelfQueryRetriever | 基于查询的元数据过滤,自动提取过滤条件 | 需要根据元数据过滤的高级检索 | 结构化文档检索、基于属性的搜索 |
代码示例:
# 安装依赖(如果尚未安装)
# pip install langchain langchain-community openai chromadb
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import OpenAIEmbeddings
from langchain_core.documents import Document
# 创建示例文档
documents = [
Document(page_content="LangChain是一个用于构建大语言模型应用的框架。", metadata={"source": "langchain_guide"}),
Document(page_content="LangChain的核心价值是提供一个统一的接口来集成各种大语言模型。", metadata={"source": "langchain_guide"}),
Document(page_content="LangChain支持多种模型,包括OpenAI、Hugging Face和Anthropic。", metadata={"source": "langchain_guide"})
]
# 创建嵌入模型
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
# 创建Chroma向量存储(本地模式)
vectorstore = Chroma.from_documents(documents, embeddings)
# 创建检索器
retriever = vectorstore.as_retriever(
search_type="similarity",
search_kwargs={"k": 2}
)
# 执行检索
results = retriever.invoke("LangChain的核心价值是什么?")
# 打印结果
print("检索结果:")
for i, doc in enumerate(results):
print(f"结果 {i+1}:")
print(f"内容: {doc.page_content}")
print(f"来源: {doc.metadata['source']}")
print("-" * 50)
六、LangChain 组件之 Chains
Chain是LangChain框架中用于编排语言模型工作流的核心组件,它将提示模板、语言模型、输出解析器、检索器等组件以"链式"方式连接,形成可执行的工作流。Chain是LangChain实现复杂LLM应用的基础,使开发者能够轻松构建从简单文本生成到复杂RAG系统的各种应用。

需要注意的是,Chain已从传统的"顺序执行"方式演进为LCEL(LangChain Expression Language),这是目前官方推荐的现代链构造方式,使链的构建更加简洁、高效和可维护。
1. 传统 chain 调用
在LCEL出来之前,LangChain 提供了很多类型的传统 chain 进行链式调用。
| 名称 | 定义/用途 | 优势 |
|---|---|---|
| LLMChain | 将提示模板与LLM直接结合的最基础Chain | 代码简单,适合快速原型开发,无需额外配置 |
| SimpleSequentialChain | 顺序执行多个Chain,每个Chain的输出作为下一个Chain的输入 | 实现线性工作流简单直接,适合快速多步骤处理 |
| SequentialChain | 精确控制输入输出变量的顺序Chain | 支持指定输入/输出变量,适合复杂工作流 |
| RetrievalQA | 专为RAG设计的Chain,集成检索器和LLM | 专为文档问答系统设计,简化RAG实现 |
| ConversationalRetrievalChain | 结合Memory和RetrievalQA的Chain,维护对话上下文 | 支持多轮对话,提供连贯的对话体验 |
| Custom Chain | 自定义Chain,允许完全控制工作流逻辑 | 满足高度定制化需求,可集成任何业务逻辑 |
代码示例:
# 安装依赖: pip install langchain langchain-community openai
from langchain_community.llms import OpenAI
from langchain_core.prompts import PromptTemplate
from langchain.chains import LLMChain, SequentialChain
# 设置API密钥(实际使用时替换)
import os
os.environ["OPENAI_API_KEY"] = "YOUR_OPENAI_API_KEY"
# 创建描述Chain
describe_prompt = PromptTemplate(
template="请描述{topic}:",
input_variables=["topic"]
)
describe_chain = LLMChain(
llm=OpenAI(temperature=0),
prompt=describe_prompt,
output_key="description"
)
# 创建摘要Chain
summarize_prompt = PromptTemplate(
template="请用100字总结:{description}",
input_variables=["description"]
)
summarize_chain = LLMChain(
llm=OpenAI(temperature=0),
prompt=summarize_prompt,
output_key="summary"
)
# 创建顺序Chain
chain = SequentialChain(
chains=[describe_chain, summarize_chain],
input_variables=["topic"],
output_variables=["summary"],
verbose=True
)
# 执行Chain
result = chain.invoke({"topic": "LangChain"})
print(result["summary"])
2. LCEL(LangChain Expression Language)
LCEL(LangChain Expression Language)是LangChain框架中用于高效构建和编排语言模型工作流的声明式工具,它通过简洁的语法和标准化接口,将提示词模板、语言模型、输出解析器、检索器等组件以"管道"形式串联,支持流式处理、并行执行、错误重试等高级功能。
2.1. LCEL 核心特性
2.1.1. 极简语法与流式支持
LCEL使用|运算符(类似Unix管道)无缝连接组件,例如:
prompt | model | parser
这种语法不仅使代码简洁,还原生支持流式输出——语言模型生成的令牌(token)可直接流式传输到解析器,实现与模型输出同步的增量响应。例如,用户输入主题后,链可立即开始流式返回结果,显著提升交互体验。
2.1.2. 异步与并行执行
LCEL链同时支持:
- 同步调用:如Jupyter调试
- 异步调用:如LangServe服务器
这使得可以在原型和生产中使用相同的代码,获得出色的性能,并能在同一服务器上处理多个并发请求。
2.1.3. 优化的并行执行
当LCEL链具有可并行执行的步骤(如从多个检索器获取文档),LCEL会自动优化并行执行,以尽可能减少延迟。
2.1.4. 重试和回退
为LCEL链的任何部分配置重试和回退,使链在规模上更加可靠。LCEL正在努力为重试/回退添加流式支持,使可靠性提升无需延迟成本。
2.1.5. 访问中间结果
对于复杂链,可以在最终输出产生前访问中间步骤的结果,用于:
- 让终端用户知道正在发生什么
- 便于调试链
LCEL支持流式传输中间结果,可在LangServe服务器中使用。
2.1.6. 输入和输出模式
LCEL链自动提供从结构推断出的Pydantic和JSONSchema模式,可用于:
- 验证输入
- 验证输出
- 是LangServe不可或缺的组成部分
2.1.7. 与 LangSmith 无缝集成
随着链条变得越来越复杂,日志跟踪也变得越来越重要,通过LCEL,所有步骤都自动记录到 LangSmith,以实现最大化的监控记录和调试。
2.1.8. 与 LangServe 无缝集成
任何使用LCEL创建的链都可以快速简单的使用 LangServe 集成部署。
2.2 LCEL的实现原理
LCEL之所以能如此流畅地连接不同组件,得益于LangChain设计的核心抽象接口:Runnable。
class Runnable(Protocol):
def invoke(self, input: Any) -> Any: ... # 同步调用
def batch(self, inputs: List[Any]) -> List[Any]: ... # 同步批量处理
def stream(self, input: Any) -> Iterator[Any]: ... # 流式输出
# 还有异步方法:ainvoke, abatch, astream
为什么需要Runnable协议?
在早期LangChain版本中,组件的调用方式不统一,导致代码冗长且难以维护。而Runnable协议强制要求所有LCEL组件实现标准方法,使它们"共享一套统一的调用方式"。
在LangChain中,几乎所有核心组件(提示词模板、语言模型、输出解析器、检索器、代理等)都实现了Runnable接口,这意味着它们都是"LCEL兼容"的。
2.3 LCEL的应用
LCEL通过声明式语法和自动化能力,显著降低AI应用开发复杂度,提升代码可维护性和团队协作效率,是构建复杂语言模型应用的核心工具。
2.3.1. LCEL 基础调用与流式输出
from langchain_openai import ChatOpenAI
import os
from langchain_core.prompts import PromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
## 这里我读的是配好的系统环境变量,你们可以替换成自己的模型相关信息
BASE_URL=os.getenv("DASHSCOPE_BASE_URL")
API_KEY=os.getenv("DASHSCOPE_API_KEY")
LLM_MODEL=os.getenv("DASHSCOPE_LLM_MODEL")
llm = ChatOpenAI(model=LLM_MODEL, base_url=BASE_URL, api_key=API_KEY)
prompt = PromptTemplate.from_template("你给我讲讲{topic}的故事")
output_parser = StrOutputParser()
chains = (
{"topic": RunnablePassthrough()}
| prompt
| llm
| output_parser
)
# 一次性返回
response = chains.invoke("守株待兔")
print(response)
# 流式返回
for response in chains.stream("守株待兔"):
print(response, end="", flush=True)
2.3.2. 使用 LCEL 实现 RAG
# 安装依赖
# pip install dashscope langchain langchain-community chromadb langchain_text_splitters
import os
from langchain_community.embeddings import DashScopeEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_text_splitters import RecursiveCharacterTextSplitter
import chromadb
## 这里我读的是配好的系统环境变量,你们可以替换成自己的模型相关信息
BASE_URL=os.getenv("DASHSCOPE_BASE_URL")
API_KEY=os.getenv("DASHSCOPE_API_KEY")
LLM_MODEL=os.getenv("DASHSCOPE_LLM_MODEL")
EMBEDDING_MODEL=os.getenv("DASHSCOPE_EMBEDDING_MODEL")
# 创建阿里云Embedding模型
embeddings = DashScopeEmbeddings(
model=EMBEDDING_MODEL,
dashscope_api_key=API_KEY,
)
# 创建示例文档
documents = [
"孙悟空是花果山水帘洞美猴王,齐天大圣,孙行者,斗战胜佛。",
"唐僧是金蝉子,唐三藏,玄奘,旃檀功德佛。",
"孙悟空是一只猴子,传说他从石头蹦出来的,他是齐天大圣。",
"猪八戒是天蓬元帅,猪悟能,猪八戒,净坛使者。",
"沙悟净是卷帘大将,沙和尚,沙悟净,金身罗汉。",
"白龙马是西海玉龙三太子,小白龙,八部天龙广力菩萨。",
"齐天大圣是孙悟空,他当年大闹天宫,自己取名齐天大圣。",
]
# 创建分割器(中文优化配置)
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=50,
chunk_overlap=5,
length_function=len,
separators=[
"\n\n", # 优先按段落分割
"\n", # 其次按行分割
"。", # 按中文句号分割
"!", # 按感叹号分割
"?", # 按问号分割
" ", # 最后按空格分割
""
]
)
# 分割文档
split_documents = text_splitter.create_documents(documents)
print("分割后的文档:")
for document in split_documents:
print(document.page_content)
# 创建本地客户端
chroma_client=chromadb.Client()
# 创建ChromaDB向量存储(本地存储)
vectorstore = Chroma.from_documents(
documents=split_documents,
embedding=embeddings,
client=chroma_client,
collection_name="test_chroma"
)
# 获取向量存储内容
print("向量存储内容:")
print(vectorstore.get())
# 创建检索器
retriever = vectorstore.as_retriever(
search_type="similarity",
search_kwargs={"k": 2} # 返回最相关的2个结果
)
# 执行检索
query = "齐天大圣是谁?"
results = retriever.invoke(query)
# 打印检索结果
print("检索结果:")
for i, result in enumerate(results):
print(f"结果 {i+1}: {result.page_content}")
输出结果:
分割后的文档:
孙悟空是花果山水帘洞美猴王,齐天大圣,孙行者,斗战胜佛。
唐僧是金蝉子,唐三藏,玄奘,旃檀功德佛。
孙悟空是一只猴子,传说他从石头蹦出来的,他是齐天大圣。
猪八戒是天蓬元帅,猪悟能,猪八戒,净坛使者。
沙悟净是卷帘大将,沙和尚,沙悟净,金身罗汉。
白龙马是西海玉龙三太子,小白龙,八部天龙广力菩萨。
齐天大圣是孙悟空,他当年大闹天宫,自己取名齐天大圣。
向量存储内容:
{'ids': ['fd2f43e0-7310-4890-9e2a-0494e3f54a56', '20ba7139-d205-4025-9380-b76af7ce7989', 'fbaf6518-8353-4315-84c1-eeb4137b86c2', '1fc889af-bd04-4155-a663-b27a22dd0098', '44b95764-8cef-474b-af54-1ca5b97c1146', '6d8e3db3-633b-4f42-8088-81588b7b245f', 'ddb39c63-2db0-49c7-9501-17a0292f752b'], 'embeddings': None, 'documents': ['孙悟空是花果山水帘洞美猴王,齐天大圣,孙行者,斗战胜佛。', '唐僧是金蝉子,唐三藏,玄奘,旃檀功德佛。', '孙悟空是一只猴子,传说他从石头蹦出来的,他是齐天大圣。', '猪八戒是天蓬元帅,猪悟能,猪八戒,净坛使者。', '沙悟净是卷帘大将,沙和尚,沙悟净,金身罗汉。', '白龙马是西海玉龙三太子,小白龙,八部天龙广力菩萨。', '齐天大圣是孙悟空,他当年大闹天宫,自己取名齐天大圣。'], 'uris': None, 'included': ['metadatas', 'documents'], 'data': None, 'metadatas': [None, None, None, None, None, None, None]}
检索结果:
结果 1: 齐天大圣是孙悟空,他当年大闹天宫,自己取名齐天大圣。
结果 2: 孙悟空是花果山水帘洞美猴王,齐天大圣,孙行者,斗战胜佛。
2.3.3. 使用 LCEL 实现动态切换模型
import os
from langchain_openai import ChatOpenAI
from langchain_core.runnables.configurable import ConfigurableField
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_core.prompts import PromptTemplate
## 这里我读的是配好的系统环境变量,你们可以替换成自己的模型相关信息
BASE_URL=os.getenv("DASHSCOPE_BASE_URL")
API_KEY=os.getenv("DASHSCOPE_API_KEY")
LLM_MODEL=os.getenv("DASHSCOPE_LLM_MODEL")
EMBEDDING_MODEL=os.getenv("DASHSCOPE_EMBEDDING_MODEL")
# 模型1
qianwen_llm = ChatOpenAI(model=LLM_MODEL, api_key=API_KEY, base_url=BASE_URL)
# 模型2
deepseek_llm = ChatOpenAI(model="deepseek-v1", api_key=API_KEY, base_url=BASE_URL)
model = deepseek_llm.configurable_alternatives(
ConfigurableField(id="llm"),
default_key="deepseek",
qianwen=qianwen_llm,
deepseek=deepseek_llm,
)
prompt = PromptTemplate.from_template("""
You are a helpful assistant.
{question}
""")
chains = (
{"question": RunnablePassthrough()}
| prompt
| model
|StrOutputParser()
)
response = chains.with_config(configurable={"llm":"qianwen"}).invoke("你是哪个公司的产品?")
print(response)
输出结果:
我是阿里巴巴集团旗下的通义实验室自主研发的超大规模语言模型...
通过 LCEL 还可以实现以下功能:
- 存储与管理对话历史
- 多模态调用
- 配置运行时变量
- 异常故障处理
- 并行调用
- 逻辑分支
- 动态创建 chain
七、LangChain 组件之 Callbacks
Callbacks是LangChain框架中用于监控、日志记录和调试的核心组件,它使开发者能够在链(Chain)的执行过程中插入自定义逻辑,捕获执行过程中的关键事件。Callbacks是LangChain实现"可观测性"的关键组件,帮助开发者了解链的执行流程、性能和问题。
LangChain Callback实现提供了一个公共基类:BaseCallbackHandler。所有Callback都实现了该接口,该接口定义了所有回调事件:
| 回调事件 | 触发时机 | 说明 |
|---|---|---|
| on_llm_start | LLM开始执行时 | 捕获LLM调用的输入和参数 |
| on_llm_end | LLM执行结束时 | 捕获LLM的输出和响应时间 |
| on_chain_start | Chain开始执行时 | 捕获Chain的输入和参数 |
| on_chain_end | Chain执行结束时 | 捕获Chain的输出 |
| on_chain_error | Chain执行出错时 | 捕获异常信息 |
| on_tool_start | 工具开始执行时 | 捕获工具调用的输入 |
| on_tool_end | 工具执行结束时 | 捕获工具的输出 |
| on_tool_error | 工具执行出错时 | 捕获工具执行的异常 |
| on_retriever_start | 检索器开始执行时 | 捕获检索查询 |
| on_retriever_end | 检索器执行结束时 | 捕获检索结果 |
| on_retriever_error | 检索器执行出错时 | 捕获检索异常 |
| on_chat_model_start | Chat模型开始执行时 | 适用于Chat模型的特定事件 |
| on_chat_model_end | Chat模型执行结束时 | 适用于Chat模型的特定事件 |
标准CallbackHandler
from langchain.callbacks import StdOutCallbackHandler
# 添加标准输出回调
callbacks = [StdOutCallbackHandler()]
# 使用LCEL链
chain = prompt | llm | parser
# 执行链并添加回调
result = chain.invoke({"topic": "LangChain"}, config={"callbacks": callbacks})
自定义CallbackHandler
from langchain.callbacks.base import BaseCallbackHandler
class CustomCallbackHandler(BaseCallbackHandler):
def on_llm_start(self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any) -> None:
print(f"LLM开始执行: {prompts[0]}")
def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
print(f"LLM执行完成: {response.generations[0][0].text}")
def on_chain_start(self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs: Any) -> None:
print(f"Chain开始执行: {inputs}")
def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> None:
print(f"Chain执行完成: {outputs}")
# 创建自定义回调
custom_callback = CustomCallbackHandler()
# 使用LCEL链
chain = prompt | llm | parser
# 执行链并添加自定义回调
result = chain.invoke({"topic": "LangChain"}, config={"callbacks": [custom_callback]})
多个CallbackHandler组合使用
from langchain.callbacks import StdOutCallbackHandler, FileCallbackHandler
# 创建多个回调
callbacks = [
StdOutCallbackHandler(),
FileCallbackHandler(file_path="chain_log.txt")
]
# 执行链
result = chain.invoke({"topic": "LangChain"}, config={"callbacks": callbacks})
错误重试
from langchain.callbacks import BaseCallbackHandler
from langchain_core.exceptions import OutputParserException
class ErrorHandlingCallbackHandler(BaseCallbackHandler):
def on_chain_error(self, error: Exception, **kwargs: Any) -> None:
if isinstance(error, OutputParserException):
print("解析错误,尝试重新生成")
# 重新执行链
# chain.invoke(...)
else:
print(f"未知错误: {error}")
# 使用错误处理
error_callback = ErrorHandlingCallbackHandler()
chain = prompt | llm | parser
result = chain.invoke({"topic": "LangChain"}, config={"callbacks": [error_callback]})
八、LangChain 组件之 Agents
1. Agents的定义与核心作用
Agents是LangChain框架中用于创建能够自主决策和执行任务的AI代理的核心组件,它结合语言模型与工具,使AI系统能够推理任务、决定使用哪些工具、并迭代工作以达到解决方案。Agents是实现复杂LLM应用的关键,使AI系统能够超越简单的文本生成,执行实际操作。
2025年也被称为Agent元年,标志着人工智能正式从“思考与对话” 转向 “自主决策与行动”。
一个Agent应该具备以下核心能力:
- 大模型(LLM):作为大脑,提供推理、知识理解和逻辑判断能力。
- 记忆(Memory):用于保持Agent的上下文信息,具备短期记忆(上下文)和长期记忆(向量存储),支持快速知识检索。使交互具有连续性。
- 工具(Tools):扩展Agent的能力边界,通过调用外部工具(搜索、数据库等)Agent可以获取实时信息,弥补自身的功能局限性。
- 规划(Planning):将具体的任务分解,并具备反思和调整能力,当效果不符合预期时,能够重新规划方案并执行,确保目标最终实现。
- 行动(Action):实际执行决策的能力。比如:检索、推理、编程
- 协作(cooperation):单一的Agent功能通常是有限的,而复杂问题往往需要不同的专业知识的Agent协同解决,不同Agent之间通过A2A协议交互。
2. Agents的核心工作原理
2.1. ReAct模式
Agents遵循ReAct (“Reasoning + Acting”) 模式,交替进行:
- Reasoning(推理):分析当前状态,决定下一步行动。
- Acting(行动):调用工具执行具体操作。
2.2. Agent执行流程
Agent运行直到满足停止条件(模型输出最终结果或达到迭代限制):
- 接收输入
- 进行推理
- 调用工具
- 获取工具结果
- 重复步骤2-4直到达到目标
- 生成最终答案
3. Agents的核心组件
Agents已通过create_agent函数提供生产就绪的实现,使用LangGraph构建基于图的Agent运行时,支持流式处理、错误处理和高级功能。
3.1. Model(模型)
3.1.1. Static Model(静态模型)
- 定义:配置一次,保持不变。
- 使用方式:
from langchain.agents import create_agent agent = create_agent("openai:gpt-5", tools=tools) - 优势:简单直接,最常用
3.1.2. Dynamic Model(动态模型)
- 定义:在运行时基于当前状态和上下文选择模型。
- 使用方式:
from langchain_openai import ChatOpenAI from langchain.agents import create_agent from langchain.agents.middleware import wrap_model_call, ModelRequest, ModelResponse basic_model = ChatOpenAI(model="gpt-4.1-mini") advanced_model = ChatOpenAI(model="gpt-4.1") @wrap_model_call def dynamic_model_selection(request: ModelRequest, handler) -> ModelResponse: message_count = len(request.state["messages"]) if message_count > 10: model = advanced_model else: model = basic_model return handler(request.override(model=model)) agent = create_agent( model=basic_model, tools=tools, middleware=[dynamic_model_selection] ) - 优势:支持成本优化和复杂路由逻辑。
3.2. Tools(工具)
3.2.1. Static Tools(静态工具)
- 定义:在创建Agent时定义,保持不变。
- 使用方式:
from langchain.tools import tool @tool def search(query: str) -> str: """搜索信息""" return f"结果: {query}" @tool def get_weather(location: str) -> str: """获取天气信息""" return f"{location}天气: 晴朗, 72°F" agent = create_agent(model, tools=[search, get_weather]) - 优势:简单直接,最常用
3.2.2. Dynamic Tools(动态工具)
- 定义:在运行时修改可用工具集。
- 使用方式:
# 过滤预注册工具 @wrap_model_call def state_based_tools(request: ModelRequest, handler) -> ModelResponse: state = request.state is_authenticated = state.get("authenticated", False) message_count = len(state["messages"]) if not is_authenticated: tools = [t for t in request.tools if t.name.startswith("public_")] request = request.override(tools=tools) elif message_count < 5: tools = [t for t in request.tools if t.name != "advanced_search"] request = request.override(tools=tools) return handler(request) agent = create_agent( model="gpt-4.1", tools=[public_search, private_search, advanced_search], middleware=[state_based_tools] ) - 优势:根据权限、状态动态调整可用工具
3.2.3. Tool Error Handling(工具错误处理)
- 定义:自定义工具错误处理逻辑。
- 使用方式:
from langchain.agents import create_agent from langchain.agents.middleware import wrap_tool_call from langchain.messages import ToolMessage @wrap_tool_call def handle_tool_errors(request, handler): try: return handler(request) except Exception as e: return ToolMessage( content=f"工具错误: 请检查输入并重试。({str(e)})", tool_call_id=request.tool_call["id"] ) agent = create_agent( model="gpt-4.1", tools=[search, get_weather], middleware=[handle_tool_errors] )
3.3. System Prompt(系统提示)
- 定义:定义Agent如何处理任务。
- 使用方式:
agent = create_agent( model, tools, system_prompt="你是一个帮助用户解决技术问题的助手。" ) - 高级用法:
from langchain.messages import SystemMessage from langchain.agents import create_agent literary_agent = create_agent( model="anthropic:claude-sonnet-4-5", system_prompt=SystemMessage( content=[ {"type": "text", "text": "你是一个分析文学作品的AI助手。"}, {"type": "text", "text": "<整个《傲慢与偏见》的内容>", "cache_control": {"type": "ephemeral"}} ] ) ) - 优势:支持Anthropic的提示缓存功能。
3.4. Name(名称)
- 定义:Agent的标识符。
- 使用方式:
agent = create_agent( model, tools, name="research_assistant" ) - 重要提示:推荐使用
snake_case(如research_assistant),避免空格和特殊字符
4. Agents的高级特性
4.1. Structured Output(结构化输出)
4.1.1. ToolStrategy
- 定义:使用人工工具调用生成结构化输出。
- 使用方式:
from pydantic import BaseModel from langchain.agents import create_agent from langchain.agents.structured_output import ToolStrategy class ContactInfo(BaseModel): name: str email: str phone: str agent = create_agent( model="gpt-4.1-mini", tools=[search_tool], response_format=ToolStrategy(ContactInfo) )
4.1.2. ProviderStrategy
- 定义:使用模型提供商的原生结构化输出功能。
- 使用方式:
from langchain.agents.structured_output import ProviderStrategy agent = create_agent( model="gpt-4.1", response_format=ProviderStrategy(ContactInfo) )
4.2. Memory(记忆)
- 定义:维护对话历史,使Agent能够记住之前交互。
- 使用方式:
from langchain.memory import ConversationBufferWindowMemory memory = ConversationBufferWindowMemory( k=2, memory_key="chat_history", return_messages=True ) agent = create_agent( model, tools, memory=memory ) - 优势:支持多轮对话,提供连贯的用户体验。
4.3. Streaming(流式处理)
- 定义:支持流式输出,逐步返回结果。
- 使用方式:
for chunk in agent.stream( {"messages": [{"role": "user", "content": "搜索AI新闻并总结"}]}, stream_mode="values" ): latest_message = chunk["messages"][-1] if latest_message.content: print(f"Agent: {latest_message.content}") elif latest_message.tool_calls: print(f"Calling tools: {[tc['name'] for tc in latest_message.tool_calls]}")
5. Agents的创建与调用
5.1. 创建Agent
from langchain.agents import create_agent
from langchain_openai import ChatOpenAI
from langchain.tools import tool
# 定义工具
@tool
def get_weather(location: str) -> str:
"""获取天气信息"""
return f"Weather in {location}: Sunny, 72°F"
# 创建Agent
agent = create_agent(
model=ChatOpenAI(model="gpt-4.1"),
tools=[get_weather],
system_prompt="你是一个天气助手,只回答天气相关问题。"
)
5.2. 调用Agent
# 同步调用
result = agent.invoke(
{"messages": [{"role": "user", "content": "今天北京的天气怎么样?"}]}
)
# 流式调用
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "今天北京的天气怎么样?"}]},
stream_mode="values"
):
latest_message = chunk["messages"][-1]
if latest_message.content:
print(f"Agent: {latest_message.content}")
6. Agents的应用
6.1. RAG with Agent(带Agent的RAG)
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import OpenAIEmbeddings
from langchain.agents import create_agent
from langchain.tools import Tool
# 创建向量存储
embeddings = OpenAIEmbeddings()
vectorstore = Chroma.from_texts(["示例文档"], embeddings)
# 创建检索工具
def search_documents(query: str) -> str:
results = vectorstore.similarity_search(query)
return "\n".join([doc.page_content for doc in results])
search_tool = Tool(
name="search_documents",
func=search_documents,
description="搜索文档库以获取相关信息"
)
# 创建Agent
agent = create_agent(
model=ChatOpenAI(model="gpt-4.1"),
tools=[search_tool],
system_prompt="你是一个文档检索助手,使用提供的工具搜索文档并回答问题。"
)
# 执行查询
result = agent.invoke(
{"messages": [{"role": "user", "content": "LangChain的核心价值是什么?"}]}
)
print(result["output"])
6.2. 多工具协作Agent
from langchain.tools import tool
@tool
def search(query: str) -> str:
"""搜索信息"""
return f"结果: {query}"
@tool
def get_weather(location: str) -> str:
"""获取天气信息"""
return f"{location}天气: 晴朗, 72°F"
@tool
def get_stock_price(symbol: str) -> str:
"""获取股票价格"""
return f"{symbol}股票价格: $100.00"
# 创建Agent
agent = create_agent(
model=ChatOpenAI(model="gpt-4.1"),
tools=[search, get_weather, get_stock_price],
system_prompt="你是一个多功能助手,可以搜索、获取天气和股票信息。"
)
# 执行查询
result = agent.invoke(
{"messages": [{"role": "user", "content": "查询北京天气和AAPL股票价格"}]}
)
print(result["output"])
6.3. 优先使用LCEL方式
# 优先使用LCEL方式
from langchain.agents import create_agent
from langchain_openai import ChatOpenAI
from langchain.tools import tool
# 定义工具
@tool
def get_weather(location: str) -> str:
return f"Weather in {location}: Sunny, 72°F"
# 创建Agent
agent = create_agent(
model=ChatOpenAI(model="gpt-4.1"),
tools=[get_weather],
system_prompt="你是一个天气助手。"
)
6.4. 使用动态工具选择
from langchain.agents.middleware import wrap_model_call, ModelRequest
@wrap_model_call
def dynamic_tool_selection(request: ModelRequest, handler) -> ModelResponse:
state = request.state
if "authenticated" in state and state["authenticated"]:
# 已认证用户可用高级工具
tools = [t for t in request.tools if t.name != "basic_search"]
else:
# 未认证用户只能使用基础工具
tools = [t for t in request.tools if t.name == "basic_search"]
return handler(request.override(tools=tools))
agent = create_agent(
model=ChatOpenAI(model="gpt-4.1"),
tools=[basic_search, advanced_search],
middleware=[dynamic_tool_selection]
)
6.5. 配置结构化输出
from pydantic import BaseModel
from langchain.agents.structured_output import ToolStrategy
class WeatherInfo(BaseModel):
location: str
temperature: str
condition: str
agent = create_agent(
model="gpt-4.1",
tools=[get_weather],
response_format=ToolStrategy(WeatherInfo)
)
6.6. 使用流式处理
# 流式调用Agent
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "搜索AI新闻并总结"}]},
stream_mode="values"
):
latest_message = chunk["messages"][-1]
if latest_message.content:
print(f"Agent: {latest_message.content}")
elif latest_message.tool_calls:
print(f"Calling tools: {[tc['name'] for tc in latest_message.tool_calls]}")
7. Agents与传统方式的对比
| 特性 | 传统Agents | LangChain Agents |
|---|---|---|
| 创建方式 | 需要手动实现复杂逻辑 | create_agent提供生产就绪实现 |
| 工具管理 | 需要手动处理工具调用 | 自动处理工具调用和错误 |
| 结构化输出 | 需要手动实现 | response_format参数支持 |
| 流式处理 | 不支持 | 原生支持 |
| 动态工具 | 需要手动实现 | 通过middleware支持 |
| 系统提示 | 需要手动处理 | 通过system_prompt参数支持 |
| 内存管理 | 需要手动实现 | 通过memory参数支持 |
九、LangChain 组件之 Memory
Memory是LangChain框架中用于存储和管理交互历史的关键组件,它使AI应用能够记住先前的交互,提供连贯、自然的对话体验。对于AI代理(Agents)而言,记忆至关重要,因为它让系统能够记住先前交互、从反馈中学习,并适应用户偏好。
在LangChain中,Memory主要分为短期记忆(Short-term memory)和长期记忆(Long-term memory):
- 短期记忆:管理单个会话或线程内的对话历史。
- 长期记忆:跨会话存储用户偏好和重要信息。
1. 短期记忆(Short-term memory)
1.1. 短期记忆的定义与工作原理
短期记忆让应用记住单个线程或会话中的先前交互。对话历史是短期记忆最常见的形式。
长对话对LLM的挑战:
- 完整历史可能超出LLM的上下文窗口,导致上下文丢失或错误。
- 即使模型支持完整上下文长度,LLM在长上下文中表现仍然不佳。
- LLM可能会被过时或不相关的上下文分散注意力。
- 长上下文导致响应时间变慢和成本增加。
1.2. 短期记忆的实现方式
要添加短期记忆(线程级持久化)到代理,需要在创建代理时指定checkpointer:
from langchain.agents import create_agent
from langgraph.checkpoint.memory import InMemorySaver
agent = create_agent(
"gpt-5",
tools=[get_user_info],
checkpointer=InMemorySaver(),
)
1.3. 短期记忆的实现
在生产环境中,应使用数据库支持的checkpointer:
# pip install langgraph-checkpoint-postgres
from langchain.agents import create_agent
from langgraph.checkpoint.postgres import PostgresSaver
DB_URI = "postgresql://postgres:postgres@localhost:5442/postgres?sslmode=disable"
with PostgresSaver.from_conn_string(DB_URI) as checkpointer:
checkpointer.setup()
agent = create_agent(
"gpt-5",
tools=[get_user_info],
checkpointer=checkpointer,
)
1.4. 自定义Agent记忆
默认情况下,Agent使用AgentState管理短期记忆,特别是通过messages键的对话历史。
可以扩展AgentState以添加额外字段:
from langchain.agents import create_agent, AgentState
from langgraph.checkpoint.memory import InMemorySaver
class CustomAgentState(AgentState):
user_id: str
preferences: dict
agent = create_agent(
"gpt-5",
tools=[get_user_info],
state_schema=CustomAgentState,
checkpointer=InMemorySaver(),
)
1.5. 常见模式
当启用短期记忆时,长对话可能会超出LLM的上下文窗口。以下是常见解决方案:
| 模式 | 描述 | 适用场景 |
|---|---|---|
| Trim messages | 移除第一个或最后一个N条消息(在调用LLM之前) | 适用于需要保持上下文长度的场景 |
| Delete messages | 从LangGraph状态永久删除消息 | 适用于需要清理特定消息或清除整个消息历史的场景 |
| Summarize messages | 总结早期消息并用摘要替换 | 适用于需要保留重要信息但避免上下文过长的场景 |
| Custom strategies | 自定义策略(如消息过滤等) | 适用于需要特定消息管理逻辑的场景 |
1.5.1. Trim messages代码示例
from langchain.messages import RemoveMessage
from langgraph.graph.message import REMOVE_ALL_MESSAGES
from langgraph.checkpoint.memory import InMemorySaver
from langchain.agents import create_agent, AgentState
from langchain.agents.middleware import before_model
from langgraph.runtime import Runtime
from langchain_core.runnables import RunnableConfig
from typing import Any
@before_model
def trim_messages(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
"""Keep only the last few messages to fit context window."""
messages = state["messages"]
if len(messages) <= 3:
return None
first_msg = messages[0]
recent_messages = messages[-3:] if len(messages) % 2 == 0 else messages[-4:]
new_messages = [first_msg] + recent_messages
return {
"messages": [RemoveMessage(id=REMOVE_ALL_MESSAGES), *new_messages]
}
agent = create_agent(
your_model_here,
tools=your_tools_here,
middleware=[trim_messages],
checkpointer=InMemorySaver(),
)
1.5.2. Summarize messages代码示例
from langchain.agents import create_agent
from langchain.agents.middleware import SummarizationMiddleware
from langgraph.checkpoint.memory import InMemorySaver
from langchain_core.runnables import RunnableConfig
checkpointer = InMemorySaver()
agent = create_agent(
model="gpt-4.1",
tools=[],
middleware=[
SummarizationMiddleware(
model="gpt-4.1-mini",
trigger=("tokens", 4000),
keep=("messages", 20)
)
],
checkpointer=checkpointer,
)
1.6. 访问和修改短期记忆
1.6.1 通过工具访问
在工具中使用runtime参数(类型为ToolRuntime)访问短期记忆:
from langchain.agents import create_agent, AgentState
from langchain.tools import tool, ToolRuntime
class CustomState(AgentState):
user_id: str
@tool
def get_user_info(runtime: ToolRuntime) -> str:
"""Look up user info."""
user_id = runtime.state["user_id"]
return "User is John Smith" if user_id == "user_123" else "Unknown user"
agent = create_agent(
model="gpt-5-nano",
tools=[get_user_info],
state_schema=CustomState,
)
result = agent.invoke(
{"messages": "look up user information", "user_id": "user_123"}
)
print(result["messages"][-1].content)
1.6.2 通过中间件访问
在中间件中访问短期记忆以创建动态提示:
from langchain.agents import create_agent
from typing import TypedDict
from langchain.agents.middleware import dynamic_prompt, ModelRequest
class CustomContext(TypedDict):
user_name: str
def get_weather(city: str) -> str:
"""Get the weather in a city."""
return f"The weather in {city} is always sunny!"
@dynamic_prompt
def dynamic_system_prompt(request: ModelRequest) -> str:
user_name = request.runtime.context["user_name"]
system_prompt = f"You are a helpful assistant. Address the user as {user_name}."
return system_prompt
agent = create_agent(
model="gpt-5-nano",
tools=[get_weather],
middleware=[dynamic_system_prompt],
context_schema=CustomContext,
)
result = agent.invoke(
{"messages": [{"role": "user", "content": "What is the weather in SF?"}]},
context=CustomContext(user_name="John Smith"),
)
2. 长期记忆(Long-term memory)
2.1. 定义与工作原理
LangChain代理使用 LangGraph persistence 来实现长期记忆。这是一种更高级的主题,需要了解LangGraph才能使用。长期记忆将记忆作为JSON文档存储在 store 中。
2.2. 存储结构
LangGraph将长期记忆存储为JSON文档,组织在自定义的namespace(类似文件夹)和特定的key(类似文件名)下:
from langgraph.store.memory import InMemoryStore
def embed(texts: list[str]) -> list[list[float]]:
# Replace with an actual embedding function or LangChain embeddings object
return [[1.0, 2.0] * len(texts)]
# InMemoryStore saves data to an in-memory dictionary. Use a DB-backed store in production
store = InMemoryStore(index={"embed": embed, "dims": 2})
user_id = "my-user"
application_context = "chitchat"
namespace = (user_id, application_context)
store.put(
namespace,
"a-memory",
{
"rules": [
"User likes short, direct language",
"User only speaks English & python",
],
"my-key": "my-value",
},
)
# get the "memory" by ID
item = store.get(namespace, "a-memory")
# search for "memories" within this namespace, filtering on content equivalence, sorted by vector similarity
items = store.search(
namespace,
filter={"my-key": "my-value"},
query="language preferences"
)
2.3. 读取长期记忆
在工具中读取长期记忆:
from dataclasses import dataclass
from langchain_core.runnables import RunnableConfig
from langchain.agents import create_agent
from langchain.tools import tool, ToolRuntime
from langgraph.store.memory import InMemoryStore
@dataclass
class Context:
user_id: str
# InMemoryStore saves data to an in-memory dictionary. Use a DB-backed store in production.
store = InMemoryStore()
# Write sample data to the store using the put method
store.put(
("users",), # Namespace to group related data together (users namespace for user data)
"user_123", # Key within the namespace (user ID as key)
{
"name": "John Smith",
"language": "English",
} # Data to store for the given user
)
@tool
def get_user_info(runtime: ToolRuntime[Context]) -> str:
"""Look up user info."""
# Access the store - same as that provided to `create_agent`
store = runtime.store
user_id = runtime.context.user_id # Retrieve data from store - returns StoreValue object with value and metadata
user_info = store.get(("users",), user_id)
return str(user_info.value) if user_info else "Unknown user"
agent = create_agent(
model="claude-sonnet-4-6",
tools=[get_user_info],
store=store,
context_schema=Context
)
agent.invoke(
{"messages": [{"role": "user", "content": "look up user information"}]},
context=Context(user_id="user_123")
)
2.4. 写入长期记忆
从工具写入长期记忆:
from dataclasses import dataclass
from typing_extensions import TypedDict
from langchain.agents import create_agent
from langchain.tools import tool, ToolRuntime
from langgraph.store.memory import InMemoryStore
# InMemoryStore saves data to an in-memory dictionary. Use a DB-backed store in production.
store = InMemoryStore()
@dataclass
class Context:
user_id: str
# TypedDict defines the structure of user information for the LLM
class UserInfo(TypedDict):
name: str
# Tool that allows agent to update user information (useful for chat applications)
@tool
def save_user_info(user_info: UserInfo, runtime: ToolRuntime[Context]) -> str:
"""Save user info."""
# Access the store - same as that provided to `create_agent`
store = runtime.store
user_id = runtime.context.user_id
# Store data in the store (namespace, key, data)
store.put(("users",), user_id, user_info)
return "Successfully saved user info."
agent = create_agent(
model="claude-sonnet-4-6",
tools=[save_user_info],
store=store,
context_schema=Context
)
agent.invoke(
{"messages": [{"role": "user", "content": "My name is John Smith"}]},
context=Context(user_id="user_123")
)
# You can access the store directly to get the value
store.get(("users",), "user_123").value
3. Memory的实战应用
3.1. 短期记忆
- 优先使用窗口式记忆:
ConversationBufferWindowMemory是大多数场景下的最佳选择 - 控制对话历史长度:通过
k参数限制对话历史长度,避免内存溢出 - 为不同用户创建独立记忆:在多用户场景中,确保对话历史隔离
from langchain_community.memory import ConversationBufferWindowMemory
memory = ConversationBufferWindowMemory(
k=5, # 保留最近5轮对话
memory_key="chat_history",
return_messages=True
)
3.2. 长期记忆
- 使用命名空间组织数据:使用
(user_id, context)作为命名空间,便于组织和检索 - 使用适当的存储后端:在生产环境中使用数据库支持的存储(如PostgreSQL)
- 结合短期和长期记忆:在RAG应用中,短期记忆用于当前会话,长期记忆用于用户偏好
# 结合短期和长期记忆
from langchain_community.memory import ConversationBufferWindowMemory
from langgraph.store.memory import InMemoryStore
# 创建短期记忆
short_term_memory = ConversationBufferWindowMemory(
k=3,
memory_key="chat_history",
return_messages=True
)
# 创建长期记忆
long_term_store = InMemoryStore()
# 读取长期记忆
user_info = long_term_store.get(("users",), "user_123")
# 使用两者
agent = create_agent(
model="gpt-4.1",
tools=[get_user_info, search],
memory=short_term_memory,
store=long_term_store,
state_schema=CustomState,
)
十、与 LlamaIndex 框架对比
LlamaIndex(原名GPT Index)是专为构建基于知识库的LLM应用而设计的开源框架,专注于文档处理、知识库构建和检索增强生成(RAG)应用。它提供了一套完整的工具链,使开发者能够高效地将结构化和非结构化数据转化为LLM可访问的知识库。
LlamaIndex 框架侧重于数据交互的封装:
- 提供了大量数据加载、切割、索引、检索、排序等工具。
- Prompt、LLM 等底层封装比较少。
- 配套实现 RAG 相关工具。
- 集成 Agent 相关工具,无亮点。
LangChain 框架侧重与 LLM 交互的封装:
- 提供了大量Prompt、LLM、Message、Structured output等工具。
- 在数据处理和 RAG 集成方面相对较少。
- 主打 LCEL 链式调用,流程封装。
- 提供了 Agent、LangGraph、LangServe、LangSmith等生态集成。
两者可以说是功能突出的方向有较大区别,形成错位的竞争。当然网上也有很多人吐槽LlamaIndex “不够成熟”、“不好用”等,只能说仁者见仁,智者见智。框架的没有好坏,只有适不适合,但是在使用框架之前尽量考虑好是否需要,毕竟如果后期发现框架功能无法适应需求的时候,项目重构的话就会增加可怕的成本。
十一、浅谈 LangChain
LangChain是一个强大的框架,但并非适用于所有场景。随着 AI 模型的高速迭代升级,网上有很多人高呼“Langchain已死”,对 LangChain 框架提出了一系列的质疑:
- 学习运维成本高:LangChain提供了大量抽象组件,学习曲线陡峭,初学者难以快速上手,组件的复杂性也导致了运维成本升高。
- 模型内部封装:随着GPT-4、Claude 3等大模型原生支持函数调用、工具链集成,LangChain核心优势被持续削弱。
- 版本迭代太快:为了适应模型的更新,框架也在不断的更新,API 也在不断的变化,旧版本却无法在新版本中使用。
- 增加性能开销:框架大量的抽象层增加了额外的运行时开销。
- 功能集成有限:内置工具、集成组件有限,面对复杂需求需要自定义工具或组件。
LangChain的局限性不是框架的缺陷,而是其设计目标与实际应用场景之间的权衡。对于一个AI入门学者,LangChain框架告诉我们的不是组件有多全面,工具有多少,而是我们应该从学习框架的过程中,从整个框架体系组件的设计,我们可以认识到一个AI应用开发所覆盖的所有知识点的底层逻辑,基于这些底层知识,我们能更好的理解并掌握如何开发出高性能的AI应用,这才是我们需要去思考学习的地方。
注:文中有一些组件的很多案例在最新版本中已经无法使用,是因为有了更好的解决方案,但还是可以学习一下框架最初的设计思想。
由于 LangChain 框架迭代速度过快,想了解最新框架情况的也可以直达官方平台学习。
LangChain 官方文档平台:https://docs.langchain.com/oss/python/langchain/overview
更多推荐


所有评论(0)