LangChain v1.0+ Chain 全类型指南:从基础流转到生产级工作流的选型手册
【个人主页:玄同765】
大语言模型(LLM)开发工程师|中国传媒大学·数字媒体技术(智能交互与游戏设计)
深耕领域:大语言模型开发 / RAG知识库 / AI Agent落地 / 模型微调
技术栈:Python / LangChain/RAG(Dify+Redis+Milvus)| SQL/NumPy | FastAPI+Docker ️
工程能力:专注模型工程化部署、知识库构建与优化,擅长全流程解决方案
「让AI交互更智能,让技术落地更高效」
欢迎技术探讨/项目合作! 关注我,解锁大模型与智能交互的无限可能!
引言
在 LangChain v1.0 之前,Chain 模块存在类体系冗余、组合灵活性差、生产级特性缺失等问题,比如 LLMChain、SequentialChain 等类各自为政,难以实现复杂工作流的灵活组合。v1.0 版本通过引入 Runnable 接口,将所有可执行组件(Prompt、LLM、工具、自定义逻辑等)统一为具有相同交互方法的对象,Chain 则成为这些 Runnable 组件的组合体。基于这一设计,LangChain v1.0+ 的 Chain 可清晰划分为 5 大类,每类对应特定的业务场景,开发者可根据需求参考选型。
一、基础流转型 Chain:处理数据的核心单元
基础流转型 Chain 是构建复杂工作流的 “原子组件”,负责处理输入输出的基础传递、转换和增强,核心特点是轻量、专注单一功能。
1. RunnablePassthrough:透传与增强输入
适用场景:需要将原始输入直接传递到下游组件,或对输入进行简单增强(如添加固定参数、合并多字段)的场景。核心特性:
- 透传输入数据,无需修改格式;
- 支持 assign 方法扩展输入字段,实现数据增强。
from langchain_core.runnables import RunnablePassthrough
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
from dotenv import load_dotenv
load_dotenv()
prompt = ChatPromptTemplate.from_messages([
("system", "你是{role},请基于用户问题给出专业回答。"),
("user", "{question}")
])
llm = ChatOpenAI(model="gpt-4o-mini")
output_parser = StrOutputParser()
# 1. 基础透传:直接传递输入到Prompt
chain = RunnablePassthrough() | prompt | llm | output_parser
print(chain.invoke({"role": "Python技术专家", "question": "如何实现异步IO?"}))
# 2. 数据增强:为输入添加额外字段
enhanced_chain = RunnablePassthrough.assign(
role=lambda x: "资深架构师" # 固定增强角色字段
) | prompt | llm | output_parser
print(enhanced_chain.invoke({"question": "如何设计高并发系统?"}))
2. RunnableLambda:嵌入自定义逻辑
适用场景:需要将自定义 Python 逻辑(如数据预处理、结果格式化、业务规则校验)插入到 Chain 中的场景。核心特性:
- 支持同步 / 异步自定义函数;
- 自动适配 Runnable 接口,可与其他组件无缝组合。
from langchain_core.runnables import RunnableLambda
import asyncio
# 同步自定义逻辑:格式化输出
def format_response(output):
return f"【技术回复】\n{output}"
# 异步自定义逻辑:模拟异步数据校验
async def async_validate_input(input_data):
question = input_data["question"]
if len(question) < 5:
raise ValueError("问题过短,请提供更多细节")
return input_data
# 构建带自定义逻辑的Chain
chain = (
RunnableLambda(async_validate_input)
| prompt
| llm
| output_parser
| RunnableLambda(format_response)
)
# 异步调用
async def main():
response = await chain.ainvoke({"role": "Python专家", "question": "如何优化FastAPI性能?"})
print(response)
asyncio.run(main())
二、组合编排型 Chain:构建复杂工作流的核心
组合编排型 Chain 是 LangChain v1.0+ 的核心能力,通过对基础 Runnable 组件的组合,实现顺序、并行、分支等复杂工作流,满足多步骤业务需求。
1. RunnableSequence:顺序工作流
适用场景:需要按固定顺序执行多个步骤的场景(如预处理→LLM 调用→后处理),是最常用的 Chain 类型。核心特性:
- 用管道符
|串联组件,语法简洁直观; - 前一个组件的输出自动作为后一个组件的输入,无需手动传递数据。
from langchain_core.runnables import RunnableSequence
# 显式定义顺序Chain(与|操作符等价)
preprocess = RunnableLambda(lambda x: {"role": "Python专家", "question": f"【紧急咨询】{x['question']}"})
generate = prompt | llm | output_parser
postprocess = RunnableLambda(lambda x: f"【最终回复】\n{x}")
# 方式1:用RunnableSequence显式定义
sequence_chain = RunnableSequence(preprocess, generate, postprocess)
# 方式2:用|操作符简化定义(推荐)
sequence_chain_simple = preprocess | generate | postprocess
print(sequence_chain.invoke({"question": "如何排查FastAPI 500错误?"}))
2. RunnableParallel:并行工作流
适用场景:需要同时执行多个独立任务的场景(如批量生成摘要、并行调用多个工具、多维度数据处理),可显著提升工作流效率。核心特性:
- 并行执行多个 Runnable 组件;
- 结果以字典形式返回,键为组件名称,值为组件输出。
from langchain_core.runnables import RunnableParallel
# 并行生成技术摘要和代码示例
parallel_chain = RunnableParallel(
summary=(
ChatPromptTemplate.from_messages([
("system", "生成一段100字以内的技术摘要。"),
("user", "{topic}")
]) | llm | output_parser
),
code_example=(
ChatPromptTemplate.from_messages([
("system", "生成一个简单的Python代码示例。"),
("user", "{topic}")
]) | llm | output_parser
)
)
result = parallel_chain.invoke({"topic": "FastAPI异步接口"})
print("摘要:", result["summary"])
print("代码示例:", result["code_example"])
3. RunnableBranch:条件分支工作流
适用场景:需要根据输入的不同条件,动态选择不同处理逻辑的场景(如根据问题类型选择不同专家 Prompt、根据用户权限选择不同工具)。核心特性:
- 支持多条件分支判断;
- 按顺序匹配条件,匹配成功则执行对应分支,否则执行默认分支。
from langchain_core.runnables import RunnableBranch
# 构建条件分支Chain
branch_chain = RunnableBranch(
# 条件1:问题包含"Python",使用Python专家分支
(lambda x: "Python" in x["question"],
ChatPromptTemplate.from_messages([
("system", "你是Python专家,仅输出Python相关技术方案。"),
("user", "{question}")
]) | llm | output_parser),
# 条件2:问题包含"RAG",使用RAG架构分支
(lambda x: "RAG" in x["question"],
ChatPromptTemplate.from_messages([
("system", "你是RAG架构专家,详细解释实现步骤。"),
("user", "{question}")
]) | llm | output_parser),
# 默认分支:通用技术顾问
ChatPromptTemplate.from_messages([
("system", "你是通用技术顾问,解答各类技术问题。"),
("user", "{question}")
]) | llm | output_parser
)
# 测试不同分支(移除冗余的role参数)
print(branch_chain.invoke({"question": "Python如何实现协程?"}))
print(branch_chain.invoke({"question": "RAG系统的核心组件有哪些?"}))
print(branch_chain.invoke({"question": "如何设计分布式缓存?"}))
4. RunnableMap:字典键的映射式并行处理
适用场景:需要对字典输入的不同键绑定不同 Runnable 逻辑,并行执行并返回同结构字典结果的场景,是针对字典输入的并行处理工具。核心特性:
- 针对字典输入的键值对进行映射处理;
- 不同键的 Runnable 逻辑独立并行执行;
- 输出与输入字典结构一致,键不变,值为对应 Runnable 的执行结果。
from langchain_core.runnables import RunnableMap
# 构建映射式Chain,对不同键执行不同逻辑
map_chain = RunnableMap({
"processed_question": lambda x: f"【技术提问】{x['question']}",
"expert_role": lambda x: "资深后端架构师" if "系统设计" in x["question"] else "Python技术专家",
"raw_input": RunnablePassthrough()
})
# 调用测试
result = map_chain.invoke({
"question": "如何设计高可用的微服务系统?"
})
print(f"处理后问题:{result['processed_question']}")
print(f"匹配专家角色:{result['expert_role']}")
print(f"原始输入:{result['raw_input']}")
# 结合其他Chain实现完整流程
full_chain = map_chain | (lambda x: prompt.invoke({"role": x["expert_role"], "question": x["processed_question"]}) | llm | output_parser)
print(full_chain.invoke({"question": "如何设计高可用的微服务系统?"}))
# 补充:列表批量处理的参考方式
def batch_qa(inputs: dict):
"""批量处理列表问题的函数"""
single_chain = prompt | llm | output_parser
return single_chain.batch([{"role": "Python专家", "question": q} for q in inputs["questions"]])
batch_chain = RunnableLambda(batch_qa)
batch_result = batch_chain.invoke({
"questions": [
"FastAPI的核心特性是什么?",
"如何优化大模型推理速度?",
"RAG的工作原理是什么?"
]
})
for idx, res in enumerate(batch_result):
print(f"问题{idx+1}回复:{res}")
三、工具调用型 Chain:连接外部系统的桥梁
工具调用型 Chain 是 LangChain 实现大模型与外部系统交互的核心,支持大模型调用计算器、搜索引擎、数据库、API 等外部工具,扩展大模型的能力边界。
1. RunnableWithTools:基础工具调用
适用场景:需要大模型调用指定工具的场景,适合工具调用逻辑简单、无需自主决策的业务需求。核心特性:
- 直接绑定工具列表,大模型根据 Prompt 生成工具调用指令;
- 自动处理工具调用结果的解析和返回;
- 建议搭配内置解析器使用,避免手动格式定义错误。
from langchain_core.runnables import RunnableWithTools
from langchain_core.tools import tool
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import JsonOutputParser
from pydantic import BaseModel, Field
# 定义工具
@tool
def add(a: int, b: int) -> int:
"""计算两个整数的和"""
return a + b
@tool
def search_internet(query: str) -> str:
"""搜索互联网获取最新信息"""
return f"搜索结果:关于{query}的最新信息是..."
# 定义工具调用格式模型(内置解析器,避免手动格式错误)
class ToolCallSchema(BaseModel):
name: str = Field(description="要调用的工具名称,可选add或search_internet")
parameters: dict = Field(description="工具的入参,add需要a和b,search_internet需要query")
parser = JsonOutputParser(pydantic_object=ToolCallSchema)
# 构建工具调用Prompt(集成格式说明)
tool_prompt = PromptTemplate(
template="请根据用户问题调用合适的工具\n用户问题:{question}\n格式要求:{format_instructions}",
input_variables=["question"],
partial_variables={"format_instructions": parser.get_format_instructions()}
)
# 构建工具调用Chain
tool_chain = tool_prompt | llm | parser | RunnableWithTools(tools=[add, search_internet])
print(tool_chain.invoke({"question": "计算123+456的结果"}))
2. AgentExecutor:智能代理 Chain
适用场景:需要大模型自主决策是否调用工具、调用哪个工具、如何处理工具结果的复杂场景(如智能客服、代码助手、企业级 RAG 系统)。核心特性:
- 大模型自主规划工具调用流程;
- 支持多轮工具调用,自动迭代处理结果;
- 内置记忆能力,支持多轮对话中的工具调用。
from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain_core.tools import tool
# 定义工具
@tool
def search_internet(query: str) -> str:
"""搜索互联网获取最新信息"""
return f"搜索结果:2024年Python最新稳定版本是3.12.4"
@tool
def calculate(a: int, b: int) -> int:
"""计算两个整数的运算结果"""
return a + b
# 构建Agent Prompt
agent_prompt = ChatPromptTemplate.from_messages([
("system", "你是智能助手,可以调用工具解决问题。"),
("user", "{question}"),
("ai", "{agent_scratchpad}")
])
# 构建Agent
agent = create_openai_tools_agent(llm, [search_internet, calculate], agent_prompt)
agent_executor = AgentExecutor(agent=agent, tools=[search_internet, calculate], verbose=True)
# 执行智能代理
result = agent_executor.invoke({"question": "2024年Python最新版本是多少?再计算100+200的结果"})
print(result["output"])
四、会话记忆型 Chain:实现多轮对话的核心
会话记忆型 Chain 用于处理多轮对话场景,通过与 Memory 模块的集成,让大模型能够保留对话历史,实现上下文感知的交互。
RunnableWithMessageHistory:会话记忆 Chain
适用场景:需要实现多轮对话的场景(如聊天机器人、智能客服、个人助手),支持会话隔离,每个用户拥有独立的对话历史。核心特性:
- 自动加载和保存对话历史;
- 支持自定义会话存储(内存、Redis、数据库);
- 与 LCEL 体系无缝集成,无需手动处理历史传递。
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_core.chat_history import InMemoryChatMessageHistory
from langchain_core.memory import ConversationBufferMemory
# 会话存储:用字典模拟内存存储(生产环境可替换为Redis/数据库)
session_store = {}
def get_session_history(session_id: str) -> InMemoryChatMessageHistory:
if session_id not in session_store:
session_store[session_id] = InMemoryChatMessageHistory()
return session_store[session_id]
# 构建基础对话Chain
base_chain = prompt | llm | output_parser
# 包装为带会话记忆的Chain
history_chain = RunnableWithMessageHistory(
base_chain,
get_session_history=get_session_history,
input_messages_key="question",
history_messages_key="history"
)
# 多轮对话测试
response1 = history_chain.invoke(
{"role": "Python专家", "question": "你好,我叫张三,是一名Python开发者。"},
config={"configurable": {"session_id": "user_zhangsan"}}
)
print(response1)
response2 = history_chain.invoke(
{"role": "Python专家", "question": "我之前提到过我的名字和职业是什么?"},
config={"configurable": {"session_id": "user_zhangsan"}}
)
print(response2)
五、自定义扩展型 Chain:适配业务专属需求
当内置 Chain 无法满足业务需求时,开发者可通过自定义扩展型 Chain 实现专属业务逻辑,LangChain v1.0+ 提供了两种自定义方式:@chain 装饰器和自定义 Runnable 类。
1. @chain 装饰器:快速自定义 Chain
适用场景:需要将业务逻辑封装为可复用 Chain 的场景,语法简洁,无需手动实现 Runnable 接口。核心特性:
- 自动将普通函数转换为 Runnable 对象;
- 支持同步 / 异步函数;
- 自动处理输入输出的类型校验。
from langchain_core.runnables import chain
@chain
def custom_tech_chain(inputs: dict) -> str:
"""自定义技术咨询Chain:预处理→LLM调用→后处理"""
# 预处理
processed_input = {"role": "Python专家", "question": f"【业务咨询】{inputs['question']}"}
# LLM调用
response = base_chain.invoke(processed_input)
# 后处理
return f"【业务专属回复】\n{response}"
print(custom_tech_chain.invoke({"question": "如何实现FastAPI的JWT认证?"}))
2. 自定义 Runnable 类:精细控制 Chain 逻辑
适用场景:需要对 Chain 的执行逻辑进行精细控制的场景(如自定义错误处理、特殊数据流转、性能优化)。核心特性:
- 完全自定义
invoke、stream、batch等方法; - 可实现复杂的业务逻辑和性能优化;
- 支持传递
config参数,适配生产级配置需求。
from langchain_core.runnables import Runnable
from typing import Any, AsyncIterator, List
class CustomRunnable(Runnable):
def invoke(self, input: dict, config: dict | None = None) -> str:
"""同步执行方法:整合知识库+LLM调用"""
# 处理config参数,获取会话ID等配置
session_id = config.get("configurable", {}).get("session_id", "default") if config else "default"
# 自定义业务逻辑:调用内部知识库
knowledge_result = f"【会话{session_id}知识库参考】FastAPI JWT认证可使用python-jose和passlib库实现。"
# 结合输入生成最终请求
final_input = {
"role": "Python专家",
"question": f"{knowledge_result}\n用户问题:{input['question']}"
}
return base_chain.invoke(final_input, config=config)
async def ainvoke(self, input: dict, config: dict | None = None) -> str:
"""异步执行方法"""
return self.invoke(input, config)
def stream(self, input: dict, config: dict | None = None) -> AsyncIterator[str]:
"""流式输出方法:逐字返回结果"""
final_input = {
"role": "Python专家",
"question": f"用户问题:{input['question']}"
}
return base_chain.stream(final_input, config=config)
def batch(self, inputs: List[dict], config: dict | None = None) -> List[str]:
"""批量执行方法:提升批量处理效率"""
final_inputs = [
{"role": "Python专家", "question": f"用户问题:{item['question']}"}
for item in inputs
]
return base_chain.batch(final_inputs, config=config)
# 实例化并调用
custom_runnable = CustomRunnable()
# 同步调用
print(custom_runnable.invoke({"question": "如何实现FastAPI的JWT认证?"}))
# 批量调用
batch_inputs = [
{"question": "如何实现FastAPI的JWT认证?"},
{"question": "如何优化FastAPI的查询性能?"}
]
batch_outputs = custom_runnable.batch(batch_inputs)
for idx, out in enumerate(batch_outputs):
print(f"批量任务{idx+1}:{out}")
六、Chain 选型指南:快速匹配业务需求
为了帮助开发者快速选型,下面整理了各类 Chain 的适用场景、核心特性及选型建议(基于 LangChain v1.0+ 官方规范整理):
| Chain 类型 | 适用场景 | 核心特性 | 选型建议 |
|---|---|---|---|
| RunnablePassthrough | 透传输入、增强输入字段 | 轻量、无侵入、支持字段扩展 | 基础数据流转优先参考选择 |
| RunnableLambda | 插入自定义 Python 逻辑 | 灵活、支持同步 / 异步、适配 Runnable 接口 | 简单业务逻辑优先参考选择 |
| RunnableSequence | 顺序执行多步骤任务 | 语法简洁、数据自动传递 | 线性工作流优先参考选择 |
| RunnableParallel | 并行执行多个独立任务 | 提升效率、结果字典返回 | 批量处理、多维度分析优先参考选择 |
| RunnableBranch | 条件分支处理 | 动态选择逻辑、多条件匹配 | 多场景适配、权限控制优先参考选择 |
| RunnableMap | 字典键的映射式并行处理 | 针对字典键值对、并行执行、结构保持一致 | 字典输入的多逻辑并行处理优先参考选择 |
| RunnableWithTools | 基础工具调用 | 直接绑定工具、自动处理调用结果 | 简单工具调用场景优先参考选择 |
| AgentExecutor | 复杂工具调用、自主决策 | 智能规划、多轮工具调用、内置记忆 | 复杂业务场景、需要自主决策优先参考选择 |
| RunnableWithMessageHistory | 多轮对话、会话隔离 | 自动管理历史、支持自定义存储 | 聊天机器人、智能客服优先参考选择 |
| @chain 装饰器 | 快速封装业务逻辑为 Chain | 语法简洁、自动转换为 Runnable | 快速开发、复用性高的业务逻辑优先参考选择 |
| 自定义 Runnable 类 | 精细控制 Chain 逻辑、特殊业务需求 | 完全自定义、支持复杂逻辑 | 复杂业务场景、性能优化需求优先参考选择 |
七、生产级落地注意事项
- 类型安全:所有 Chain 的输入输出均需保证类型一致,建议添加类型注解,避免运行时错误。
- 异步支持:高并发场景下优先使用异步方法(
ainvoke、astream),提升系统吞吐量。 - 监控调试:启用 LangSmith 追踪,实时监控 Chain 的执行流程、性能指标和错误日志。
- 性能优化:批量处理场景使用
batch方法,流式输出场景使用stream方法,减少用户等待感知。 - 会话存储:生产环境下避免使用内存存储会话历史,应使用 Redis、数据库等持久化存储方案。
- 容错机制:使用
RunnableFallbacks为 Chain 添加降级策略,应对工具调用失败、LLM 响应超时等异常情况。from langchain_core.runnables import RunnableFallbacks # 定义降级策略:LLM调用失败时返回默认提示 fallback_chain = RunnableLambda(lambda x: "抱歉,当前服务繁忙,请稍后重试。") # 为核心chain添加降级 robust_chain = base_chain.with_fallbacks([fallback_chain])
八、总结
LangChain v1.0+ 基于 Runnable 接口重构了 Chain 模块,实现了组件的统一编排和灵活组合,各类 Chain 覆盖了从基础数据流转到生产级复杂工作流的核心场景,可为开发者提供选型和落地参考。
开发者在选型时,应根据业务场景的复杂度、性能需求、扩展性要求等因素,参考本文的选型建议构建大模型应用。未来,LangChain 将持续优化 Chain 模块的性能和特性,推出更多适配复杂业务场景的组件,开发者可持续关注官方更新,提升大模型应用的构建效率和质量。
更多推荐



所有评论(0)