LangChain精通-核心模块分析

深入分析 LangChain 的核心模块,包括 Runnable 系统、语言模型抽象、工具系统、检索系统、记忆系统、代理系统和链系统的设计原理与实现细节

LangChain 核心模块分析


概述

本文档深入分析 LangChain 的核心模块,包括其设计原理、实现细节和使用场景。

1. Runnable 系统

1.1 设计理念

问题: 在 LLM 应用中,需要组合多种组件(提示词、模型、解析器等),如何让它们无缝协作?

解决方案: 定义统一的 Runnable 接口,所有组件都实现这个接口。

组合

组合

组合

Runnable 接口

提示模板

语言模型

输出解析器

工具

检索器

LCEL 链

1.2 核心接口

文件: libs/core/langchain_core/runnables/base.py

class Runnable(Generic[Input, Output], ABC):
    """所有组件的基础接口"""

    @abstractmethod
    def invoke(self, input: Input, config: Optional[RunnableConfig] = None) -> Output:
        """同步执行:将输入转换为输出"""
        pass

    def batch(self, inputs: list[Input]) -> list[Output]:
        """批量处理:并行执行多个输入"""
        pass

    def stream(self, input: Input) -> Iterator[Output]:
        """流式输出:逐步产生结果"""
        pass

    async def ainvoke(self, input: Input) -> Output:
        """异步执行"""
        pass

    def __or__(self, other: Runnable) -> RunnableSequence:
        """管道操作符:创建顺序链"""
        pass

1.3 类型层次

«abstract»

Runnable

+invoke()

+batch()

+stream()

+ainvoke()

RunnableLambda

+func Callable

+invoke()

RunnableSequence

+steps Sequence

+invoke()

RunnableParallel

+steps Mapping

+invoke()

RunnablePassthrough

+invoke()

1.4 LCEL 执行流程

Parser Model Prompt RunnableSequence 用户代码 Parser Model Prompt RunnableSequence 用户代码 chain.invoke({"topic": "AI"}) prompt.invoke({"topic": "AI"}) "Tell me about AI" model.invoke("Tell me about AI") AIMessage("AI is...") parser.invoke(AIMessage) "AI is..." "AI is..."

1.5 实现细节

RunnableSequence 的 invoke 实现:

def invoke(self, input: Input, config: Optional[RunnableConfig] = None) -> Output:
    """
    顺序执行所有步骤

    关键点:
    1. 将每个步骤的输出作为下一步的输入
    2. 处理异常并提供上下文
    3. 支持中间结果的追踪
    """
    value: Any = input

    for i, step in enumerate(self.steps):
        try:
            # 核心逻辑:传递值到下一步
            value = step.invoke(value, config)
        except Exception as e:
            # 提供有用的错误信息
            raise RuntimeError(
                f"Error in step {i} ({step.get_name()}): {str(e)}"
            ) from e

    return value

RunnableParallel 的并发执行:

async def ainvoke(
    self,
    input: Input,
    config: Optional[RunnableConfig] = None,
) -> dict[str, Any]:
    """
    并行执行所有步骤

    关键点:
    1. 使用 asyncio.gather 并行执行
    2. 所有步骤接收相同的输入
    3. 结果合并到字典中
    """
    # 为每个步骤创建异步任务
    tasks = [
        step.ainvoke(input, config)
        for step in self.steps.values()
    ]

    # 并行等待所有任务完成
    results = await asyncio.gather(*tasks)

    # 将结果映射到键
    return dict(zip(self.steps.keys(), results))

1.6 最佳实践

# ✓ 推荐:使用类型注解
from typing import Dict

chain: Runnable[Dict[str, str], str] = prompt | model | parser

# ✓ 推荐:使用 RunnablePassthrough 传递数据
from langchain_core.runnables import RunnablePassthrough

chain = {
    "context": retriever | format_docs,
    "question": RunnablePassthrough(),  # 保留原始问题
} | prompt | model

# ✓ 推荐:使用 RunnableBranch 处理条件
from langchain_core.runnables import RunnableBranch

chain = RunnableBranch(
    (lambda x: x["category"] == "tech", tech_chain),
    (lambda x: x["category"] == "news", news_chain),
    default_chain
)

# ✗ 避免:在链中执行耗时操作
# 应该使用异步或流式处理

2. 语言模型抽象

2.1 设计理念

问题: 不同的 LLM 提供商(OpenAI、Anthropic、Google 等)有不同的 API,如何统一使用?

解决方案: 定义 BaseLanguageModel 抽象接口,各提供商实现此接口。

使用

BaseLanguageModel

ChatOpenAI

ChatAnthropic

ChatGoogle

HuggingFace

应用代码

2.2 核心接口

文件: libs/core/langchain_core/language_models/base.py

class BaseLanguageModel(RunnableSerializable[LanguageModelInput, LanguageModelOutputVar], ABC):
    """所有语言模型的抽象基类"""

    # 可配置的字段
    cache: Union[BaseCache, bool, None]
    verbose: bool
    callbacks: Callbacks
    tags: list[str]

    # 核心方法
    @abstractmethod
    def generate_prompt(
        self,
        prompts: list[PromptValue],
        stop: list[str] | None = None,
        **kwargs: Any,
    ) -> LLMResult:
        """生成文本的核心方法"""
        pass

    # Runnable 实现
    def invoke(
        self,
        input: LanguageModelInput,
        config: Optional[RunnableConfig] = None,
    ) -> LanguageModelOutputVar:
        """将输入转换为 PromptValue 并生成"""
        pass

2.3 消息类型系统

文件: libs/core/langchain_core/messages/

«abstract»

BaseMessage

+content Any

+type str

+to_dict()

HumanMessage

+content str

+example bool

AIMessage

+content str

+tool_calls list

+tool_call_id str

SystemMessage

+content str

ToolMessage

+content Any

+tool_call_id str

2.4 工具调用机制

绑定工具到模型:

# 文件: libs/core/langchain_core/language_models/chat_models.py

def bind_tools(
    self,
    tools: Sequence[BaseTool],
    **kwargs: Any,
) -> "RunnableBinding":
    """
    绑定工具到模型

    步骤:
    1. 将工具转换为模型特定的格式
    2. 创建 RunnableBinding 包装器
    3. 在调用时自动添加工具信息
    """
    # 格式化工具
    formatted_tools = [self._format_tool(tool) for tool in tools]

    # 创建绑定
    return RunnableBinding(
        bound=self,
        kwargs={**kwargs, "tools": formatted_tools}
    )

2.5 流式生成

def stream(
    self,
    input: LanguageModelInput,
    config: Optional[RunnableConfig] = None,
) -> Iterator[BaseMessage]:
    """
    流式生成文本

    关键点:
    1. 与提供商建立 SSE 连接
    2. 逐步接收和产生数据块
    3. 累积完整的消息
    """
    accumulated_content = ""

    for chunk in self._stream(input, config):
        accumulated_content += chunk.content
        yield AIMessage(content=chunk.content)

3. 工具系统

3.1 设计理念

问题: LLM 只能生成文本,如何让它执行实际操作(搜索、计算等)?

解决方案: 定义工具接口,LLM 可以选择调用工具,工具执行结果返回给 LLM。

外部系统 工具 LLM 用户 外部系统 工具 LLM 用户 "巴黎天气怎么样?" 分析需要天气信息 调用 get_weather("巴黎") 请求天气数据 {"temp": 20, "condition": "晴天"} 返回结果 根据结果生成回答 "巴黎今天20度,晴天"

3.2 工具结构

文件: libs/core/langchain_core/tools/base.py

class BaseTool(RunnableSerializable[Union[str, dict], Any], ABC):
    """工具的抽象基类"""

    # 元数据
    name: str
    description: str
    args_schema: Type[BaseModel]

    # 执行方法
    @abstractmethod
    def _run(self, *args, **kwargs) -> Any:
        """同步执行工具"""
        pass

    async def _arun(self, *args, **kwargs) -> Any:
        """异步执行工具(可选)"""
        return await asyncio.get_event_loop().run_in_executor(
            None, functools.partial(self._run, *args, **kwargs)
        )

3.3 工具调用流程

LLM 输出

args_schema

_run/_arun

ToolMessage

接收输入

解析参数

验证参数

执行工具

格式化结果

3.4 自定义工具示例

from langchain_core.tools import StructuredTool
from pydantic import BaseModel, Field

# 定义输入模式
class SearchInput(BaseModel):
    query: str = Field(description="搜索查询")
    limit: int = Field(default=10, description="结果数量")

# 定义工具函数
def search_func(query: str, limit: int = 10) -> str:
    """执行搜索"""
    # 实际的搜索逻辑
    return f"搜索 '{query}',返回 {limit} 个结果"

# 创建工具
search_tool = StructuredTool(
    name="search",
    description="搜索互联网",
    func=search_func,
    args_schema=SearchInput,
)

4. 检索系统

4.1 设计理念

问题: LLM 有知识截止日期,如何让它访问最新或私有数据?

解决方案: RAG (检索增强生成) - 先检索相关文档,再基于文档生成答案。

相似度搜索

用户查询

检索器

向量存储

相关文档

提示词模板

LLM

最终答案

4.2 检索器接口

文件: libs/core/langchain_core/retrievers.py

class BaseRetriever(Runnable[str, list[Document]], ABC):
    """检索器的抽象基类"""

    @abstractmethod
    def _get_relevant_documents(
        self,
        query: str,
        run_manager: CallbackManagerForChainRun,
    ) -> list[Document]:
        """获取相关文档"""
        pass

    def invoke(self, input: str) -> list[Document]:
        """Runnable 接口实现"""
        return self._get_relevant_documents(input, None)

4.3 向量存储检索器

# 文件: libs/langchain/langchain/vectorstores/base.py

class VectorStoreRetriever(BaseRetriever):
    """基于向量存储的检索器"""

    vectorstore: VectorStore
    search_type: str = "similarity"
    search_kwargs: dict[str, Any] = Field(default_factory=dict)

    def _get_relevant_documents(
        self,
        query: str,
        run_manager: CallbackManagerForChainRun,
    ) -> list[Document]:
        """执行向量相似度搜索"""
        if self.search_type == "similarity":
            return self.vectorstore.similarity_search(
                query,
                k=self.search_kwargs.get("k", 4)
            )
        elif self.search_type == "mmr":
            return self.vectorstore.max_marginal_relevance_search(
                query,
                k=self.search_kwargs.get("k", 4)
            )

4.4 RAG 链实现

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

# RAG 提示词模板
template = """根据以下上下文回答问题:

上下文:
{context}

问题:{question}

回答:"""

prompt = ChatPromptTemplate.from_template(template)

# 格式化文档
def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

# 构建 RAG 链
rag_chain = (
    {
        "context": retriever | format_docs,
        "question": RunnablePassthrough()
    }
    | prompt
    | llm
    | StrOutputParser()
)

# 使用
result = rag_chain.invoke("什么是 LangChain?")

5. 记忆系统

5.1 设计理念

问题: LLM 是无状态的,如何在多轮对话中保持上下文?

解决方案: 记忆组件 - 自动管理对话历史并在每次调用时注入提示词。

5.2 记忆接口

文件: libs/langchain/langchain/memory/base.py

class BaseMemory(Serializable, ABC):
    """记忆的抽象基类"""

    @property
    @abstractmethod
    def memory_variables(self) -> list[str]:
        """返回此记忆管理的变量名"""
        pass

    @abstractmethod
    def load_memory_variables(self, inputs: dict[str, Any]) -> dict[str, Any]:
        """加载记忆变量"""
        pass

    @abstractmethod
    def save_context(self, inputs: dict[str, Any], outputs: dict[str, Any]) -> None:
        """保存当前对话上下文"""
        pass

5.3 记忆类型

记忆类型

缓冲记忆

ConversationBufferMemory

保存所有消息

窗口记忆

BufferWindowMemory

只保存最近 N 条

摘要记忆

SummaryMemory

保存对话摘要

向量记忆

VectorStoreMemory

基于向量检索

实体记忆

EntityMemory

跟踪实体信息

5.4 在链中使用记忆

from langchain.memory import ConversationBufferMemory

# 创建记忆
memory = ConversationBufferMemory(
    memory_key="chat_history",
    return_messages=True
)

# 创建带记忆的提示词
prompt = ChatPromptTemplate.from_messages([
    ("system", "你是一个友好的助手。"),
    ("placeholder", "{chat_history}"),  # 记忆占位符
    ("human", "{input}")
])

# 创建链
chain = prompt | llm

# 使用 RunnableWithMessageHistory 添加记忆
from langchain_core.runnables.history import RunnableWithMessageHistory

chain_with_history = RunnableWithMessageHistory(
    chain,
    lambda session_id: memory,  # 获取记忆的函数
    input_messages_key="input",
    history_messages_key="chat_history"
)

6. 代理系统

6.1 设计理念

问题: 如何让 LLM 自主决策和执行复杂任务?

解决方案: 代理 - LLM 作为"大脑",决定调用哪些工具以及何时停止。

LLM 分析

选择工具

调用工具

获取输出

继续推理

满足条件

接收任务

思考

决定动作

执行工具

观察结果

完成任务

6.2 代理类型

类型 描述 使用场景
ReActAgent 推理-行动循环 需要多步推理
OpenAIToolsAgent OpenAI 工具调用 使用 OpenAI API
StructuredChatAgent 结构化输入输出 复杂工具参数
SelfAskWithSearch 自问自答 需要验证事实

6.3 代理执行流程

# 文件: libs/langchain/langchain/agents/agent.py

class AgentExecutor:
    """代理执行器"""

    agent: Agent
    tools: list[BaseTool]
    max_iterations: int = 15

    def invoke(self, inputs: dict[str, Any]) -> AgentFinish:
        """执行代理循环"""
        iterations = 0
        intermediate_steps = []

        while iterations < self.max_iterations:
            # 1. 让代理决定下一步动作
            action = self.agent.plan(
                inputs,
                intermediate_steps
            )

            # 2. 检查是否完成
            if isinstance(action, AgentFinish):
                return action

            # 3. 执行工具
            tool = self._get_tool(action.tool)
            observation = tool.invoke(action.tool_input)

            # 4. 记录观察结果
            intermediate_steps.append((action, observation))

            iterations += 1

        raise RuntimeError("代理超过最大迭代次数")

7. 链系统

7.1 链的类型

链类型

基础链

LLMChain

最简单的 LLM 调用

顺序链

SequentialChain

SimpleSequentialChain

按顺序执行多个链

路由链

RouterChain

MultiPromptChain

根据输入路由到不同链

高级链

RetrievalQA

ConversationalRetrievalChain

TransformChain

7.2 使用 LCEL 构建链

# 传统方式(已废弃)
from langchain.chains import LLMChain

chain = LLMChain(
    llm=llm,
    prompt=prompt,
    output_parser=parser
)
result = chain.run({"topic": "AI"})

# LCEL 方式(推荐)
chain = prompt | llm | parser
result = chain.invoke({"topic": "AI"})

7.3 复杂链示例

from langchain_core.runnables import RunnableBranch, RunnableParallel

# 条件分支
branch = RunnableBranch(
    (lambda x: x["language"] == "chinese", chinese_chain),
    (lambda x: x["language"] == "english", english_chain),
    default_chain
)

# 并行处理
parallel = RunnableParallel(
    translation=translation_chain,
    summary=summary_chain,
    sentiment=sentiment_chain
)

# 组合在一起
complex_chain = {
    "input": RunnablePassthrough(),
    "language": detect_language_chain
} | branch | parallel | final_formatter

8. 性能优化

8.1 批量处理

# 批量调用可以提高吞吐量
results = chain.batch([
    {"input": "query 1"},
    {"input": "query 2"},
    {"input": "query 3"}
], config={"max_concurrency": 5})

8.2 流式处理

# 流式处理可以减少延迟
async for chunk in chain.astream({"input": "长文本"}):
    print(chunk, end="", flush=True)

8.3 缓存

from langchain_core.cache import InMemoryCache

# 启用缓存
llm = ChatOpenAI(cache=InMemoryCache())

# 第一次调用会执行实际 API 调用
result1 = llm.invoke("Hello")

# 第二次会返回缓存结果
result2 = llm.invoke("Hello")

9. 调试和监控

9.1 使用回调

from langchain_core.callbacks import StdOutCallbackHandler

# 在链执行时打印详细信息
chain.invoke(
    {"input": "test"},
    config={"callbacks": [StdOutCallbackHandler()]}
)

9.2 使用 LangSmith

import os

# 配置 LangSmith
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-api-key"

# 所有执行都会被追踪
result = chain.invoke({"input": "test"})

10. 总结

LangChain 的核心模块通过统一的 Runnable 接口实现了高度的可组合性和灵活性:

  1. Runnable 系统: 统一的执行接口和组合能力
  2. 语言模型抽象: 提供商无关的模型接口
  3. 工具系统: 让 LLM 能执行实际操作
  4. 检索系统: RAG 模式的核心
  5. 记忆系统: 多轮对话的状态管理
  6. 代理系统: 自主决策和执行
  7. 链系统: 工作流编排

理解这些模块的设计和实现,将帮助你更有效地使用和扩展 LangChain。


Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐