在这里插入图片描述

  【个人主页:玄同765

大语言模型(LLM)开发工程师中国传媒大学·数字媒体技术(智能交互与游戏设计)

深耕领域:大语言模型开发 / RAG知识库 / AI Agent落地 / 模型微调

技术栈:Python / LangChain/RAG(Dify+Redis+Milvus)| SQL/NumPy | FastAPI+Docker ️

工程能力:专注模型工程化部署、知识库构建与优化,擅长全流程解决方案 

     

「让AI交互更智能,让技术落地更高效」

欢迎技术探讨/项目合作! 关注我,解锁大模型与智能交互的无限可能!

引言

在 LangChain v1.0 之前,Chain 模块存在类体系冗余、组合灵活性差、生产级特性缺失等问题,比如 LLMChain、SequentialChain 等类各自为政,难以实现复杂工作流的灵活组合。v1.0 版本通过引入 Runnable 接口,将所有可执行组件(Prompt、LLM、工具、自定义逻辑等)统一为具有相同交互方法的对象,Chain 则成为这些 Runnable 组件的组合体。基于这一设计,LangChain v1.0+ 的 Chain 可清晰划分为 5 大类,每类对应特定的业务场景,开发者可根据需求参考选型。

一、基础流转型 Chain:处理数据的核心单元

基础流转型 Chain 是构建复杂工作流的 “原子组件”,负责处理输入输出的基础传递、转换和增强,核心特点是轻量、专注单一功能。

1. RunnablePassthrough:透传与增强输入

适用场景:需要将原始输入直接传递到下游组件,或对输入进行简单增强(如添加固定参数、合并多字段)的场景。核心特性:

  • 透传输入数据,无需修改格式;
  • 支持 assign 方法扩展输入字段,实现数据增强。
from langchain_core.runnables import RunnablePassthrough
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
from dotenv import load_dotenv

load_dotenv()

prompt = ChatPromptTemplate.from_messages([
    ("system", "你是{role},请基于用户问题给出专业回答。"),
    ("user", "{question}")
])
llm = ChatOpenAI(model="gpt-4o-mini")
output_parser = StrOutputParser()

# 1. 基础透传:直接传递输入到Prompt
chain = RunnablePassthrough() | prompt | llm | output_parser
print(chain.invoke({"role": "Python技术专家", "question": "如何实现异步IO?"}))

# 2. 数据增强:为输入添加额外字段
enhanced_chain = RunnablePassthrough.assign(
    role=lambda x: "资深架构师"  # 固定增强角色字段
) | prompt | llm | output_parser
print(enhanced_chain.invoke({"question": "如何设计高并发系统?"}))

2. RunnableLambda:嵌入自定义逻辑

适用场景:需要将自定义 Python 逻辑(如数据预处理、结果格式化、业务规则校验)插入到 Chain 中的场景。核心特性:

  • 支持同步 / 异步自定义函数;
  • 自动适配 Runnable 接口,可与其他组件无缝组合。
from langchain_core.runnables import RunnableLambda
import asyncio

# 同步自定义逻辑:格式化输出
def format_response(output):
    return f"【技术回复】\n{output}"

# 异步自定义逻辑:模拟异步数据校验
async def async_validate_input(input_data):
    question = input_data["question"]
    if len(question) < 5:
        raise ValueError("问题过短,请提供更多细节")
    return input_data

# 构建带自定义逻辑的Chain
chain = (
    RunnableLambda(async_validate_input)
    | prompt
    | llm
    | output_parser
    | RunnableLambda(format_response)
)

# 异步调用
async def main():
    response = await chain.ainvoke({"role": "Python专家", "question": "如何优化FastAPI性能?"})
    print(response)

asyncio.run(main())

二、组合编排型 Chain:构建复杂工作流的核心

组合编排型 Chain 是 LangChain v1.0+ 的核心能力,通过对基础 Runnable 组件的组合,实现顺序、并行、分支等复杂工作流,满足多步骤业务需求。

1. RunnableSequence:顺序工作流

适用场景:需要按固定顺序执行多个步骤的场景(如预处理→LLM 调用→后处理),是最常用的 Chain 类型。核心特性:

  • 用管道符 | 串联组件,语法简洁直观;
  • 前一个组件的输出自动作为后一个组件的输入,无需手动传递数据。
from langchain_core.runnables import RunnableSequence

# 显式定义顺序Chain(与|操作符等价)
preprocess = RunnableLambda(lambda x: {"role": "Python专家", "question": f"【紧急咨询】{x['question']}"})
generate = prompt | llm | output_parser
postprocess = RunnableLambda(lambda x: f"【最终回复】\n{x}")

# 方式1:用RunnableSequence显式定义
sequence_chain = RunnableSequence(preprocess, generate, postprocess)

# 方式2:用|操作符简化定义(推荐)
sequence_chain_simple = preprocess | generate | postprocess

print(sequence_chain.invoke({"question": "如何排查FastAPI 500错误?"}))

2. RunnableParallel:并行工作流

适用场景:需要同时执行多个独立任务的场景(如批量生成摘要、并行调用多个工具、多维度数据处理),可显著提升工作流效率。核心特性:

  • 并行执行多个 Runnable 组件;
  • 结果以字典形式返回,键为组件名称,值为组件输出。
from langchain_core.runnables import RunnableParallel

# 并行生成技术摘要和代码示例
parallel_chain = RunnableParallel(
    summary=(
        ChatPromptTemplate.from_messages([
            ("system", "生成一段100字以内的技术摘要。"),
            ("user", "{topic}")
        ]) | llm | output_parser
    ),
    code_example=(
        ChatPromptTemplate.from_messages([
            ("system", "生成一个简单的Python代码示例。"),
            ("user", "{topic}")
        ]) | llm | output_parser
    )
)

result = parallel_chain.invoke({"topic": "FastAPI异步接口"})
print("摘要:", result["summary"])
print("代码示例:", result["code_example"])

3. RunnableBranch:条件分支工作流

适用场景:需要根据输入的不同条件,动态选择不同处理逻辑的场景(如根据问题类型选择不同专家 Prompt、根据用户权限选择不同工具)。核心特性:

  • 支持多条件分支判断;
  • 按顺序匹配条件,匹配成功则执行对应分支,否则执行默认分支。
from langchain_core.runnables import RunnableBranch

# 构建条件分支Chain
branch_chain = RunnableBranch(
    # 条件1:问题包含"Python",使用Python专家分支
    (lambda x: "Python" in x["question"],
     ChatPromptTemplate.from_messages([
         ("system", "你是Python专家,仅输出Python相关技术方案。"),
         ("user", "{question}")
     ]) | llm | output_parser),
    # 条件2:问题包含"RAG",使用RAG架构分支
    (lambda x: "RAG" in x["question"],
     ChatPromptTemplate.from_messages([
         ("system", "你是RAG架构专家,详细解释实现步骤。"),
         ("user", "{question}")
     ]) | llm | output_parser),
    # 默认分支:通用技术顾问
    ChatPromptTemplate.from_messages([
         ("system", "你是通用技术顾问,解答各类技术问题。"),
         ("user", "{question}")
     ]) | llm | output_parser
)

# 测试不同分支(移除冗余的role参数)
print(branch_chain.invoke({"question": "Python如何实现协程?"}))
print(branch_chain.invoke({"question": "RAG系统的核心组件有哪些?"}))
print(branch_chain.invoke({"question": "如何设计分布式缓存?"}))

4. RunnableMap:字典键的映射式并行处理

适用场景:需要对字典输入的不同键绑定不同 Runnable 逻辑,并行执行并返回同结构字典结果的场景,是针对字典输入的并行处理工具。核心特性:

  • 针对字典输入的键值对进行映射处理;
  • 不同键的 Runnable 逻辑独立并行执行;
  • 输出与输入字典结构一致,键不变,值为对应 Runnable 的执行结果。
from langchain_core.runnables import RunnableMap

# 构建映射式Chain,对不同键执行不同逻辑
map_chain = RunnableMap({
    "processed_question": lambda x: f"【技术提问】{x['question']}",
    "expert_role": lambda x: "资深后端架构师" if "系统设计" in x["question"] else "Python技术专家",
    "raw_input": RunnablePassthrough()
})

# 调用测试
result = map_chain.invoke({
    "question": "如何设计高可用的微服务系统?"
})

print(f"处理后问题:{result['processed_question']}")
print(f"匹配专家角色:{result['expert_role']}")
print(f"原始输入:{result['raw_input']}")

# 结合其他Chain实现完整流程
full_chain = map_chain | (lambda x: prompt.invoke({"role": x["expert_role"], "question": x["processed_question"]}) | llm | output_parser)
print(full_chain.invoke({"question": "如何设计高可用的微服务系统?"}))

# 补充:列表批量处理的参考方式
def batch_qa(inputs: dict):
    """批量处理列表问题的函数"""
    single_chain = prompt | llm | output_parser
    return single_chain.batch([{"role": "Python专家", "question": q} for q in inputs["questions"]])

batch_chain = RunnableLambda(batch_qa)
batch_result = batch_chain.invoke({
    "questions": [
        "FastAPI的核心特性是什么?",
        "如何优化大模型推理速度?",
        "RAG的工作原理是什么?"
    ]
})

for idx, res in enumerate(batch_result):
    print(f"问题{idx+1}回复:{res}")

三、工具调用型 Chain:连接外部系统的桥梁

工具调用型 Chain 是 LangChain 实现大模型与外部系统交互的核心,支持大模型调用计算器、搜索引擎、数据库、API 等外部工具,扩展大模型的能力边界。

1. RunnableWithTools:基础工具调用

适用场景:需要大模型调用指定工具的场景,适合工具调用逻辑简单、无需自主决策的业务需求。核心特性:

  • 直接绑定工具列表,大模型根据 Prompt 生成工具调用指令;
  • 自动处理工具调用结果的解析和返回;
  • 建议搭配内置解析器使用,避免手动格式定义错误。
from langchain_core.runnables import RunnableWithTools
from langchain_core.tools import tool
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import JsonOutputParser
from pydantic import BaseModel, Field

# 定义工具
@tool
def add(a: int, b: int) -> int:
    """计算两个整数的和"""
    return a + b

@tool
def search_internet(query: str) -> str:
    """搜索互联网获取最新信息"""
    return f"搜索结果:关于{query}的最新信息是..."

# 定义工具调用格式模型(内置解析器,避免手动格式错误)
class ToolCallSchema(BaseModel):
    name: str = Field(description="要调用的工具名称,可选add或search_internet")
    parameters: dict = Field(description="工具的入参,add需要a和b,search_internet需要query")

parser = JsonOutputParser(pydantic_object=ToolCallSchema)

# 构建工具调用Prompt(集成格式说明)
tool_prompt = PromptTemplate(
    template="请根据用户问题调用合适的工具\n用户问题:{question}\n格式要求:{format_instructions}",
    input_variables=["question"],
    partial_variables={"format_instructions": parser.get_format_instructions()}
)

# 构建工具调用Chain
tool_chain = tool_prompt | llm | parser | RunnableWithTools(tools=[add, search_internet])

print(tool_chain.invoke({"question": "计算123+456的结果"}))

2. AgentExecutor:智能代理 Chain

适用场景:需要大模型自主决策是否调用工具、调用哪个工具、如何处理工具结果的复杂场景(如智能客服、代码助手、企业级 RAG 系统)。核心特性:

  • 大模型自主规划工具调用流程;
  • 支持多轮工具调用,自动迭代处理结果;
  • 内置记忆能力,支持多轮对话中的工具调用。
from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain_core.tools import tool

# 定义工具
@tool
def search_internet(query: str) -> str:
    """搜索互联网获取最新信息"""
    return f"搜索结果:2024年Python最新稳定版本是3.12.4"

@tool
def calculate(a: int, b: int) -> int:
    """计算两个整数的运算结果"""
    return a + b

# 构建Agent Prompt
agent_prompt = ChatPromptTemplate.from_messages([
    ("system", "你是智能助手,可以调用工具解决问题。"),
    ("user", "{question}"),
    ("ai", "{agent_scratchpad}")
])

# 构建Agent
agent = create_openai_tools_agent(llm, [search_internet, calculate], agent_prompt)
agent_executor = AgentExecutor(agent=agent, tools=[search_internet, calculate], verbose=True)

# 执行智能代理
result = agent_executor.invoke({"question": "2024年Python最新版本是多少?再计算100+200的结果"})
print(result["output"])

四、会话记忆型 Chain:实现多轮对话的核心

会话记忆型 Chain 用于处理多轮对话场景,通过与 Memory 模块的集成,让大模型能够保留对话历史,实现上下文感知的交互。

RunnableWithMessageHistory:会话记忆 Chain

适用场景:需要实现多轮对话的场景(如聊天机器人、智能客服、个人助手),支持会话隔离,每个用户拥有独立的对话历史。核心特性:

  • 自动加载和保存对话历史;
  • 支持自定义会话存储(内存、Redis、数据库);
  • 与 LCEL 体系无缝集成,无需手动处理历史传递。
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_core.chat_history import InMemoryChatMessageHistory
from langchain_core.memory import ConversationBufferMemory

# 会话存储:用字典模拟内存存储(生产环境可替换为Redis/数据库)
session_store = {}

def get_session_history(session_id: str) -> InMemoryChatMessageHistory:
    if session_id not in session_store:
        session_store[session_id] = InMemoryChatMessageHistory()
    return session_store[session_id]

# 构建基础对话Chain
base_chain = prompt | llm | output_parser

# 包装为带会话记忆的Chain
history_chain = RunnableWithMessageHistory(
    base_chain,
    get_session_history=get_session_history,
    input_messages_key="question",
    history_messages_key="history"
)

# 多轮对话测试
response1 = history_chain.invoke(
    {"role": "Python专家", "question": "你好,我叫张三,是一名Python开发者。"},
    config={"configurable": {"session_id": "user_zhangsan"}}
)
print(response1)

response2 = history_chain.invoke(
    {"role": "Python专家", "question": "我之前提到过我的名字和职业是什么?"},
    config={"configurable": {"session_id": "user_zhangsan"}}
)
print(response2)

五、自定义扩展型 Chain:适配业务专属需求

当内置 Chain 无法满足业务需求时,开发者可通过自定义扩展型 Chain 实现专属业务逻辑,LangChain v1.0+ 提供了两种自定义方式:@chain 装饰器和自定义 Runnable 类。

1. @chain 装饰器:快速自定义 Chain

适用场景:需要将业务逻辑封装为可复用 Chain 的场景,语法简洁,无需手动实现 Runnable 接口。核心特性:

  • 自动将普通函数转换为 Runnable 对象;
  • 支持同步 / 异步函数;
  • 自动处理输入输出的类型校验。
from langchain_core.runnables import chain

@chain
def custom_tech_chain(inputs: dict) -> str:
    """自定义技术咨询Chain:预处理→LLM调用→后处理"""
    # 预处理
    processed_input = {"role": "Python专家", "question": f"【业务咨询】{inputs['question']}"}
    # LLM调用
    response = base_chain.invoke(processed_input)
    # 后处理
    return f"【业务专属回复】\n{response}"

print(custom_tech_chain.invoke({"question": "如何实现FastAPI的JWT认证?"}))

2. 自定义 Runnable 类:精细控制 Chain 逻辑

适用场景:需要对 Chain 的执行逻辑进行精细控制的场景(如自定义错误处理、特殊数据流转、性能优化)。核心特性:

  • 完全自定义 invokestreambatch 等方法;
  • 可实现复杂的业务逻辑和性能优化;
  • 支持传递 config 参数,适配生产级配置需求。
from langchain_core.runnables import Runnable
from typing import Any, AsyncIterator, List

class CustomRunnable(Runnable):
    def invoke(self, input: dict, config: dict | None = None) -> str:
        """同步执行方法:整合知识库+LLM调用"""
        # 处理config参数,获取会话ID等配置
        session_id = config.get("configurable", {}).get("session_id", "default") if config else "default"
        
        # 自定义业务逻辑:调用内部知识库
        knowledge_result = f"【会话{session_id}知识库参考】FastAPI JWT认证可使用python-jose和passlib库实现。"
        
        # 结合输入生成最终请求
        final_input = {
            "role": "Python专家",
            "question": f"{knowledge_result}\n用户问题:{input['question']}"
        }
        return base_chain.invoke(final_input, config=config)

    async def ainvoke(self, input: dict, config: dict | None = None) -> str:
        """异步执行方法"""
        return self.invoke(input, config)

    def stream(self, input: dict, config: dict | None = None) -> AsyncIterator[str]:
        """流式输出方法:逐字返回结果"""
        final_input = {
            "role": "Python专家",
            "question": f"用户问题:{input['question']}"
        }
        return base_chain.stream(final_input, config=config)
    
    def batch(self, inputs: List[dict], config: dict | None = None) -> List[str]:
        """批量执行方法:提升批量处理效率"""
        final_inputs = [
            {"role": "Python专家", "question": f"用户问题:{item['question']}"}
            for item in inputs
        ]
        return base_chain.batch(final_inputs, config=config)

# 实例化并调用
custom_runnable = CustomRunnable()
# 同步调用
print(custom_runnable.invoke({"question": "如何实现FastAPI的JWT认证?"}))

# 批量调用
batch_inputs = [
    {"question": "如何实现FastAPI的JWT认证?"},
    {"question": "如何优化FastAPI的查询性能?"}
]
batch_outputs = custom_runnable.batch(batch_inputs)
for idx, out in enumerate(batch_outputs):
    print(f"批量任务{idx+1}:{out}")

六、Chain 选型指南:快速匹配业务需求

为了帮助开发者快速选型,下面整理了各类 Chain 的适用场景、核心特性及选型建议(基于 LangChain v1.0+ 官方规范整理):

Chain 类型 适用场景 核心特性 选型建议
RunnablePassthrough 透传输入、增强输入字段 轻量、无侵入、支持字段扩展 基础数据流转优先参考选择
RunnableLambda 插入自定义 Python 逻辑 灵活、支持同步 / 异步、适配 Runnable 接口 简单业务逻辑优先参考选择
RunnableSequence 顺序执行多步骤任务 语法简洁、数据自动传递 线性工作流优先参考选择
RunnableParallel 并行执行多个独立任务 提升效率、结果字典返回 批量处理、多维度分析优先参考选择
RunnableBranch 条件分支处理 动态选择逻辑、多条件匹配 多场景适配、权限控制优先参考选择
RunnableMap 字典键的映射式并行处理 针对字典键值对、并行执行、结构保持一致 字典输入的多逻辑并行处理优先参考选择
RunnableWithTools 基础工具调用 直接绑定工具、自动处理调用结果 简单工具调用场景优先参考选择
AgentExecutor 复杂工具调用、自主决策 智能规划、多轮工具调用、内置记忆 复杂业务场景、需要自主决策优先参考选择
RunnableWithMessageHistory 多轮对话、会话隔离 自动管理历史、支持自定义存储 聊天机器人、智能客服优先参考选择
@chain 装饰器 快速封装业务逻辑为 Chain 语法简洁、自动转换为 Runnable 快速开发、复用性高的业务逻辑优先参考选择
自定义 Runnable 类 精细控制 Chain 逻辑、特殊业务需求 完全自定义、支持复杂逻辑 复杂业务场景、性能优化需求优先参考选择

七、生产级落地注意事项

  1. 类型安全:所有 Chain 的输入输出均需保证类型一致,建议添加类型注解,避免运行时错误。
  2. 异步支持:高并发场景下优先使用异步方法(ainvokeastream),提升系统吞吐量。
  3. 监控调试:启用 LangSmith 追踪,实时监控 Chain 的执行流程、性能指标和错误日志。
  4. 性能优化:批量处理场景使用 batch 方法,流式输出场景使用 stream 方法,减少用户等待感知。
  5. 会话存储:生产环境下避免使用内存存储会话历史,应使用 Redis、数据库等持久化存储方案。
  6. 容错机制:使用 RunnableFallbacks 为 Chain 添加降级策略,应对工具调用失败、LLM 响应超时等异常情况。
    from langchain_core.runnables import RunnableFallbacks
    
    # 定义降级策略:LLM调用失败时返回默认提示
    fallback_chain = RunnableLambda(lambda x: "抱歉,当前服务繁忙,请稍后重试。")
    
    # 为核心chain添加降级
    robust_chain = base_chain.with_fallbacks([fallback_chain])
    

八、总结

LangChain v1.0+ 基于 Runnable 接口重构了 Chain 模块,实现了组件的统一编排和灵活组合,各类 Chain 覆盖了从基础数据流转到生产级复杂工作流的核心场景,可为开发者提供选型和落地参考。

开发者在选型时,应根据业务场景的复杂度、性能需求、扩展性要求等因素,参考本文的选型建议构建大模型应用。未来,LangChain 将持续优化 Chain 模块的性能和特性,推出更多适配复杂业务场景的组件,开发者可持续关注官方更新,提升大模型应用的构建效率和质量。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐