LangChain入门-Runnable 协议完全解析
摘要: LangChain的Runnable协议是核心抽象层,为所有组件提供统一执行接口,支持同步/异步调用、批量处理和流式输出。通过LCEL管道操作符(|)实现组件组合,RunnableParallel支持并行执行,RunnableBranch实现条件路由。协议包含invoke/ainvoke同步异步方法、batch/abatch批量处理、stream/astream流式输出等核心接口,采用装饰
·
LangChain入门-Runnable 协议完全解析
标签: LangChain, Runnable, LCEL, 协议设计, Python, 异步编程
深入讲解 LangChain 核心 Runnable 协议,包括 LCEL 管道操作符、并行执行、路由分支、流式输出和批量处理
Runnable 协议
用途
LangChain 的核心抽象协议,为所有组件提供统一的执行接口。这是 LangChain 表达式语言(LCEL)的基础。
位置
libs/core/langchain_core/runnables/
知识图谱上下文
核心类/模块
Runnable 类
Runnable: 基础抽象类,定义核心接口RunnableLambda: 将函数转换为 RunnableRunnableParallel: 并行执行多个 RunnableRunnableSequence: 顺序执行(LCEL 的|操作符)RunnablePassthrough: 传递输入不变RunnableConfig: 运行时配置
核心接口
| 方法 | 用途 | 参数 | 返回值 |
|---|---|---|---|
invoke() |
同步执行 | input, config | output |
ainvoke() |
异步执行 | input, config | output |
batch() |
批量执行 | inputs, config | List[output] |
abatch() |
异步批量 | inputs, config | List[output] |
stream() |
流式输出 | input, config | Iterator |
astream() |
异步流式 | input, config | AsyncIterator |
stream_events() |
事件流 | input, config | Iterator |
依赖关系
内部依赖
langchain_core.messages: 消息类型langchain_core.callbacks: 回调系统langchain_core.output_parsers: 输出解析
外部依赖
typing: 类型提示asyncio: 异步支持operator: 管道操作符
数据流
LCEL 管道操作符
使用 | 组合组件
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
# 管道组合
chain = (
ChatPromptTemplate.from_template("Tell me a joke about {topic}")
| ChatOpenAI(model="gpt-4")
| StrOutputParser()
)
实际执行流程
并行执行
RunnableParallel
from langchain_core.runnables import RunnableParallel
# 并行执行多个链
chain = RunnableParallel(
joke=chain1,
poem=chain2,
fact=chain3
)
并行流程
路由和条件分支
RunnableBranch
from langchain_core.runnables import RunnableBranch
# 条件路由
branch = RunnableBranch(
(lambda x: x["language"] == "spanish", spanish_chain),
(lambda x: x["language"] == "english", english_chain),
default_chain
)
路由流程
流式输出
stream() 方法
# 流式文本生成
for chunk in chain.stream({"topic": "cats"}):
print(chunk, end="", flush=True)
astream() 异步流
async for chunk in chain.astream({"topic": "cats"}):
print(chunk, end="", flush=True)
批量处理
batch() 方法
# 批量执行
results = chain.batch([
{"topic": "cats"},
{"topic": "dogs"},
{"topic": "birds"}
])
并行批量
# 使用 max_concurrency 控制并发
results = chain.batch(inputs, config={"max_concurrency": 5})
配置和元数据
RunnableConfig
from langchain_core.runnables import RunnableConfig
config = RunnableConfig(
tags=["my-tag"],
metadata={"user_id": "123"},
callbacks=[my_handler],
run_name="my-run"
)
配置传递
自定义 Runnable
使用 RunnableLambda
from langchain_core.runnables import RunnableLambda
# 将函数转为 Runnable
def custom_function(input: dict) -> str:
return f"Processed: {input['text']}"
custom_runnable = RunnableLambda(custom_function)
继承 Runnable 类
from langchain_core.runnables import Runnable
class CustomRunnable(Runnable):
def invoke(self, input, config=None):
# 自定义处理逻辑
return self._process(input)
重要设计模式
1. 装饰器模式
# 使用 bind() 绑定参数
llm = ChatOpenAI(model="gpt-4")
llm_with_stop = llm.bind(stop=["\n"])
2. 组合模式
3. 适配器模式
# 将函数适配为 Runnable
@runnable
def my_function(x: str) -> str:
return x.upper()
# 现在可以使用管道
chain = my_function | another_runnable
错误处理
重试机制
from langchain_core.runnables import RunnableLambda
from tenacity import retry
@retry(stop=stop_after_attempt(3))
def flaky_function(x):
# 可能失败的函数
return process(x)
runnable_with_retry = RunnableLambda(flaky_function)
回退机制
from langchain_core.runnables import RunnableWithFallbacks
# 主流程失败时使用备选
chain_with_fallback = primary_chain.with_fallbacks([fallback_chain])
性能优化
1. 流式处理
- 大数据集处理
- 实时响应
- 减少内存使用
2. 批量处理
- 并行执行多个请求
- 减少 API 调用开销
- 提高吞吐量
3. 异步执行
- 非阻塞 I/O
- 提高并发能力
- 更好的资源利用
更多推荐


所有评论(0)