LangChain精通- 核心模块分析
本文深入剖析了LangChain框架的核心模块设计原理与实现细节。重点分析了Runnable系统的统一接口设计,通过抽象Runnable接口实现各类组件(提示模板、语言模型、解析器等)的无缝协作。详细展示了RunnableSequence顺序执行流程和RunnableParallel并发执行机制的具体实现,包括异常处理、中间结果追踪等关键技术点。同时介绍了语言模型抽象层BaseLanguageMo
LangChain精通-核心模块分析
深入分析 LangChain 的核心模块,包括 Runnable 系统、语言模型抽象、工具系统、检索系统、记忆系统、代理系统和链系统的设计原理与实现细节
LangChain 核心模块分析
概述
本文档深入分析 LangChain 的核心模块,包括其设计原理、实现细节和使用场景。
1. Runnable 系统
1.1 设计理念
问题: 在 LLM 应用中,需要组合多种组件(提示词、模型、解析器等),如何让它们无缝协作?
解决方案: 定义统一的 Runnable 接口,所有组件都实现这个接口。
1.2 核心接口
文件: libs/core/langchain_core/runnables/base.py
class Runnable(Generic[Input, Output], ABC):
"""所有组件的基础接口"""
@abstractmethod
def invoke(self, input: Input, config: Optional[RunnableConfig] = None) -> Output:
"""同步执行:将输入转换为输出"""
pass
def batch(self, inputs: list[Input]) -> list[Output]:
"""批量处理:并行执行多个输入"""
pass
def stream(self, input: Input) -> Iterator[Output]:
"""流式输出:逐步产生结果"""
pass
async def ainvoke(self, input: Input) -> Output:
"""异步执行"""
pass
def __or__(self, other: Runnable) -> RunnableSequence:
"""管道操作符:创建顺序链"""
pass
1.3 类型层次
1.4 LCEL 执行流程
1.5 实现细节
RunnableSequence 的 invoke 实现:
def invoke(self, input: Input, config: Optional[RunnableConfig] = None) -> Output:
"""
顺序执行所有步骤
关键点:
1. 将每个步骤的输出作为下一步的输入
2. 处理异常并提供上下文
3. 支持中间结果的追踪
"""
value: Any = input
for i, step in enumerate(self.steps):
try:
# 核心逻辑:传递值到下一步
value = step.invoke(value, config)
except Exception as e:
# 提供有用的错误信息
raise RuntimeError(
f"Error in step {i} ({step.get_name()}): {str(e)}"
) from e
return value
RunnableParallel 的并发执行:
async def ainvoke(
self,
input: Input,
config: Optional[RunnableConfig] = None,
) -> dict[str, Any]:
"""
并行执行所有步骤
关键点:
1. 使用 asyncio.gather 并行执行
2. 所有步骤接收相同的输入
3. 结果合并到字典中
"""
# 为每个步骤创建异步任务
tasks = [
step.ainvoke(input, config)
for step in self.steps.values()
]
# 并行等待所有任务完成
results = await asyncio.gather(*tasks)
# 将结果映射到键
return dict(zip(self.steps.keys(), results))
1.6 最佳实践
# ✓ 推荐:使用类型注解
from typing import Dict
chain: Runnable[Dict[str, str], str] = prompt | model | parser
# ✓ 推荐:使用 RunnablePassthrough 传递数据
from langchain_core.runnables import RunnablePassthrough
chain = {
"context": retriever | format_docs,
"question": RunnablePassthrough(), # 保留原始问题
} | prompt | model
# ✓ 推荐:使用 RunnableBranch 处理条件
from langchain_core.runnables import RunnableBranch
chain = RunnableBranch(
(lambda x: x["category"] == "tech", tech_chain),
(lambda x: x["category"] == "news", news_chain),
default_chain
)
# ✗ 避免:在链中执行耗时操作
# 应该使用异步或流式处理
2. 语言模型抽象
2.1 设计理念
问题: 不同的 LLM 提供商(OpenAI、Anthropic、Google 等)有不同的 API,如何统一使用?
解决方案: 定义 BaseLanguageModel 抽象接口,各提供商实现此接口。
2.2 核心接口
文件: libs/core/langchain_core/language_models/base.py
class BaseLanguageModel(RunnableSerializable[LanguageModelInput, LanguageModelOutputVar], ABC):
"""所有语言模型的抽象基类"""
# 可配置的字段
cache: Union[BaseCache, bool, None]
verbose: bool
callbacks: Callbacks
tags: list[str]
# 核心方法
@abstractmethod
def generate_prompt(
self,
prompts: list[PromptValue],
stop: list[str] | None = None,
**kwargs: Any,
) -> LLMResult:
"""生成文本的核心方法"""
pass
# Runnable 实现
def invoke(
self,
input: LanguageModelInput,
config: Optional[RunnableConfig] = None,
) -> LanguageModelOutputVar:
"""将输入转换为 PromptValue 并生成"""
pass
2.3 消息类型系统
文件: libs/core/langchain_core/messages/
2.4 工具调用机制
绑定工具到模型:
# 文件: libs/core/langchain_core/language_models/chat_models.py
def bind_tools(
self,
tools: Sequence[BaseTool],
**kwargs: Any,
) -> "RunnableBinding":
"""
绑定工具到模型
步骤:
1. 将工具转换为模型特定的格式
2. 创建 RunnableBinding 包装器
3. 在调用时自动添加工具信息
"""
# 格式化工具
formatted_tools = [self._format_tool(tool) for tool in tools]
# 创建绑定
return RunnableBinding(
bound=self,
kwargs={**kwargs, "tools": formatted_tools}
)
2.5 流式生成
def stream(
self,
input: LanguageModelInput,
config: Optional[RunnableConfig] = None,
) -> Iterator[BaseMessage]:
"""
流式生成文本
关键点:
1. 与提供商建立 SSE 连接
2. 逐步接收和产生数据块
3. 累积完整的消息
"""
accumulated_content = ""
for chunk in self._stream(input, config):
accumulated_content += chunk.content
yield AIMessage(content=chunk.content)
3. 工具系统
3.1 设计理念
问题: LLM 只能生成文本,如何让它执行实际操作(搜索、计算等)?
解决方案: 定义工具接口,LLM 可以选择调用工具,工具执行结果返回给 LLM。
3.2 工具结构
文件: libs/core/langchain_core/tools/base.py
class BaseTool(RunnableSerializable[Union[str, dict], Any], ABC):
"""工具的抽象基类"""
# 元数据
name: str
description: str
args_schema: Type[BaseModel]
# 执行方法
@abstractmethod
def _run(self, *args, **kwargs) -> Any:
"""同步执行工具"""
pass
async def _arun(self, *args, **kwargs) -> Any:
"""异步执行工具(可选)"""
return await asyncio.get_event_loop().run_in_executor(
None, functools.partial(self._run, *args, **kwargs)
)
3.3 工具调用流程
3.4 自定义工具示例
from langchain_core.tools import StructuredTool
from pydantic import BaseModel, Field
# 定义输入模式
class SearchInput(BaseModel):
query: str = Field(description="搜索查询")
limit: int = Field(default=10, description="结果数量")
# 定义工具函数
def search_func(query: str, limit: int = 10) -> str:
"""执行搜索"""
# 实际的搜索逻辑
return f"搜索 '{query}',返回 {limit} 个结果"
# 创建工具
search_tool = StructuredTool(
name="search",
description="搜索互联网",
func=search_func,
args_schema=SearchInput,
)
4. 检索系统
4.1 设计理念
问题: LLM 有知识截止日期,如何让它访问最新或私有数据?
解决方案: RAG (检索增强生成) - 先检索相关文档,再基于文档生成答案。
4.2 检索器接口
文件: libs/core/langchain_core/retrievers.py
class BaseRetriever(Runnable[str, list[Document]], ABC):
"""检索器的抽象基类"""
@abstractmethod
def _get_relevant_documents(
self,
query: str,
run_manager: CallbackManagerForChainRun,
) -> list[Document]:
"""获取相关文档"""
pass
def invoke(self, input: str) -> list[Document]:
"""Runnable 接口实现"""
return self._get_relevant_documents(input, None)
4.3 向量存储检索器
# 文件: libs/langchain/langchain/vectorstores/base.py
class VectorStoreRetriever(BaseRetriever):
"""基于向量存储的检索器"""
vectorstore: VectorStore
search_type: str = "similarity"
search_kwargs: dict[str, Any] = Field(default_factory=dict)
def _get_relevant_documents(
self,
query: str,
run_manager: CallbackManagerForChainRun,
) -> list[Document]:
"""执行向量相似度搜索"""
if self.search_type == "similarity":
return self.vectorstore.similarity_search(
query,
k=self.search_kwargs.get("k", 4)
)
elif self.search_type == "mmr":
return self.vectorstore.max_marginal_relevance_search(
query,
k=self.search_kwargs.get("k", 4)
)
4.4 RAG 链实现
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
# RAG 提示词模板
template = """根据以下上下文回答问题:
上下文:
{context}
问题:{question}
回答:"""
prompt = ChatPromptTemplate.from_template(template)
# 格式化文档
def format_docs(docs):
return "\n\n".join(doc.page_content for doc in docs)
# 构建 RAG 链
rag_chain = (
{
"context": retriever | format_docs,
"question": RunnablePassthrough()
}
| prompt
| llm
| StrOutputParser()
)
# 使用
result = rag_chain.invoke("什么是 LangChain?")
5. 记忆系统
5.1 设计理念
问题: LLM 是无状态的,如何在多轮对话中保持上下文?
解决方案: 记忆组件 - 自动管理对话历史并在每次调用时注入提示词。
5.2 记忆接口
文件: libs/langchain/langchain/memory/base.py
class BaseMemory(Serializable, ABC):
"""记忆的抽象基类"""
@property
@abstractmethod
def memory_variables(self) -> list[str]:
"""返回此记忆管理的变量名"""
pass
@abstractmethod
def load_memory_variables(self, inputs: dict[str, Any]) -> dict[str, Any]:
"""加载记忆变量"""
pass
@abstractmethod
def save_context(self, inputs: dict[str, Any], outputs: dict[str, Any]) -> None:
"""保存当前对话上下文"""
pass
5.3 记忆类型
5.4 在链中使用记忆
from langchain.memory import ConversationBufferMemory
# 创建记忆
memory = ConversationBufferMemory(
memory_key="chat_history",
return_messages=True
)
# 创建带记忆的提示词
prompt = ChatPromptTemplate.from_messages([
("system", "你是一个友好的助手。"),
("placeholder", "{chat_history}"), # 记忆占位符
("human", "{input}")
])
# 创建链
chain = prompt | llm
# 使用 RunnableWithMessageHistory 添加记忆
from langchain_core.runnables.history import RunnableWithMessageHistory
chain_with_history = RunnableWithMessageHistory(
chain,
lambda session_id: memory, # 获取记忆的函数
input_messages_key="input",
history_messages_key="chat_history"
)
6. 代理系统
6.1 设计理念
问题: 如何让 LLM 自主决策和执行复杂任务?
解决方案: 代理 - LLM 作为"大脑",决定调用哪些工具以及何时停止。
6.2 代理类型
| 类型 | 描述 | 使用场景 |
|---|---|---|
| ReActAgent | 推理-行动循环 | 需要多步推理 |
| OpenAIToolsAgent | OpenAI 工具调用 | 使用 OpenAI API |
| StructuredChatAgent | 结构化输入输出 | 复杂工具参数 |
| SelfAskWithSearch | 自问自答 | 需要验证事实 |
6.3 代理执行流程
# 文件: libs/langchain/langchain/agents/agent.py
class AgentExecutor:
"""代理执行器"""
agent: Agent
tools: list[BaseTool]
max_iterations: int = 15
def invoke(self, inputs: dict[str, Any]) -> AgentFinish:
"""执行代理循环"""
iterations = 0
intermediate_steps = []
while iterations < self.max_iterations:
# 1. 让代理决定下一步动作
action = self.agent.plan(
inputs,
intermediate_steps
)
# 2. 检查是否完成
if isinstance(action, AgentFinish):
return action
# 3. 执行工具
tool = self._get_tool(action.tool)
observation = tool.invoke(action.tool_input)
# 4. 记录观察结果
intermediate_steps.append((action, observation))
iterations += 1
raise RuntimeError("代理超过最大迭代次数")
7. 链系统
7.1 链的类型
7.2 使用 LCEL 构建链
# 传统方式(已废弃)
from langchain.chains import LLMChain
chain = LLMChain(
llm=llm,
prompt=prompt,
output_parser=parser
)
result = chain.run({"topic": "AI"})
# LCEL 方式(推荐)
chain = prompt | llm | parser
result = chain.invoke({"topic": "AI"})
7.3 复杂链示例
from langchain_core.runnables import RunnableBranch, RunnableParallel
# 条件分支
branch = RunnableBranch(
(lambda x: x["language"] == "chinese", chinese_chain),
(lambda x: x["language"] == "english", english_chain),
default_chain
)
# 并行处理
parallel = RunnableParallel(
translation=translation_chain,
summary=summary_chain,
sentiment=sentiment_chain
)
# 组合在一起
complex_chain = {
"input": RunnablePassthrough(),
"language": detect_language_chain
} | branch | parallel | final_formatter
8. 性能优化
8.1 批量处理
# 批量调用可以提高吞吐量
results = chain.batch([
{"input": "query 1"},
{"input": "query 2"},
{"input": "query 3"}
], config={"max_concurrency": 5})
8.2 流式处理
# 流式处理可以减少延迟
async for chunk in chain.astream({"input": "长文本"}):
print(chunk, end="", flush=True)
8.3 缓存
from langchain_core.cache import InMemoryCache
# 启用缓存
llm = ChatOpenAI(cache=InMemoryCache())
# 第一次调用会执行实际 API 调用
result1 = llm.invoke("Hello")
# 第二次会返回缓存结果
result2 = llm.invoke("Hello")
9. 调试和监控
9.1 使用回调
from langchain_core.callbacks import StdOutCallbackHandler
# 在链执行时打印详细信息
chain.invoke(
{"input": "test"},
config={"callbacks": [StdOutCallbackHandler()]}
)
9.2 使用 LangSmith
import os
# 配置 LangSmith
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-api-key"
# 所有执行都会被追踪
result = chain.invoke({"input": "test"})
10. 总结
LangChain 的核心模块通过统一的 Runnable 接口实现了高度的可组合性和灵活性:
- Runnable 系统: 统一的执行接口和组合能力
- 语言模型抽象: 提供商无关的模型接口
- 工具系统: 让 LLM 能执行实际操作
- 检索系统: RAG 模式的核心
- 记忆系统: 多轮对话的状态管理
- 代理系统: 自主决策和执行
- 链系统: 工作流编排
理解这些模块的设计和实现,将帮助你更有效地使用和扩展 LangChain。
更多推荐



所有评论(0)