LangChain入门-Runnable 协议完全解析

标签: LangChain, Runnable, LCEL, 协议设计, Python, 异步编程

深入讲解 LangChain 核心 Runnable 协议,包括 LCEL 管道操作符、并行执行、路由分支、流式输出和批量处理

Runnable 协议


用途

LangChain 的核心抽象协议,为所有组件提供统一的执行接口。这是 LangChain 表达式语言(LCEL)的基础。

位置

libs/core/langchain_core/runnables/

知识图谱上下文

组合

使用

Runnable

语言模型

工具

检索器

嵌入模型

Agent

核心类/模块

Runnable 类

  • Runnable: 基础抽象类,定义核心接口
  • RunnableLambda: 将函数转换为 Runnable
  • RunnableParallel: 并行执行多个 Runnable
  • RunnableSequence: 顺序执行(LCEL 的 | 操作符)
  • RunnablePassthrough: 传递输入不变
  • RunnableConfig: 运行时配置

核心接口

方法 用途 参数 返回值
invoke() 同步执行 input, config output
ainvoke() 异步执行 input, config output
batch() 批量执行 inputs, config List[output]
abatch() 异步批量 inputs, config List[output]
stream() 流式输出 input, config Iterator
astream() 异步流式 input, config AsyncIterator
stream_events() 事件流 input, config Iterator

依赖关系

内部依赖

  • langchain_core.messages: 消息类型
  • langchain_core.callbacks: 回调系统
  • langchain_core.output_parsers: 输出解析

外部依赖

  • typing: 类型提示
  • asyncio: 异步支持
  • operator: 管道操作符

数据流

invoke

处理

return

stream

yield

yield

输入

Runnable

转换

输出

输入

Runnable

数据块

数据块

LCEL 管道操作符

使用 | 组合组件

from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

# 管道组合
chain = (
    ChatPromptTemplate.from_template("Tell me a joke about {topic}")
    | ChatOpenAI(model="gpt-4")
    | StrOutputParser()
)

实际执行流程

格式化

生成消息

提取文本

提示模板

LLM

解析器

最终输出

并行执行

RunnableParallel

from langchain_core.runnables import RunnableParallel

# 并行执行多个链
chain = RunnableParallel(
    joke=chain1,
    poem=chain2,
    fact=chain3
)

并行流程

分支1

分支2

分支3

结果

结果

结果

输入

RunnableParallel

链1

链2

链3

合并结果

最终输出

路由和条件分支

RunnableBranch

from langchain_core.runnables import RunnableBranch

# 条件路由
branch = RunnableBranch(
    (lambda x: x["language"] == "spanish", spanish_chain),
    (lambda x: x["language"] == "english", english_chain),
    default_chain
)

路由流程

条件1

条件2

默认

输入

条件判断

链1

链2

默认链

输出

流式输出

stream() 方法

# 流式文本生成
for chunk in chain.stream({"topic": "cats"}):
    print(chunk, end="", flush=True)

astream() 异步流

async for chunk in chain.astream({"topic": "cats"}):
    print(chunk, end="", flush=True)

批量处理

batch() 方法

# 批量执行
results = chain.batch([
    {"topic": "cats"},
    {"topic": "dogs"},
    {"topic": "birds"}
])

并行批量

# 使用 max_concurrency 控制并发
results = chain.batch(inputs, config={"max_concurrency": 5})

配置和元数据

RunnableConfig

from langchain_core.runnables import RunnableConfig

config = RunnableConfig(
    tags=["my-tag"],
    metadata={"user_id": "123"},
    callbacks=[my_handler],
    run_name="my-run"
)

配置传递

传递

传递

传递

使用

使用

使用

RunnableConfig

invoke/ainvoke

batch/abatch

stream/astream

标签

元数据

回调

自定义 Runnable

使用 RunnableLambda

from langchain_core.runnables import RunnableLambda

# 将函数转为 Runnable
def custom_function(input: dict) -> str:
    return f"Processed: {input['text']}"

custom_runnable = RunnableLambda(custom_function)

继承 Runnable 类

from langchain_core.runnables import Runnable

class CustomRunnable(Runnable):
    def invoke(self, input, config=None):
        # 自定义处理逻辑
        return self._process(input)

重要设计模式

1. 装饰器模式

# 使用 bind() 绑定参数
llm = ChatOpenAI(model="gpt-4")
llm_with_stop = llm.bind(stop=["\n"])

2. 组合模式

组合

组合

形成

Runnable A

Runnable B

Runnable C

复杂链

3. 适配器模式

# 将函数适配为 Runnable
@runnable
def my_function(x: str) -> str:
    return x.upper()

# 现在可以使用管道
chain = my_function | another_runnable

错误处理

重试机制

from langchain_core.runnables import RunnableLambda
from tenacity import retry

@retry(stop=stop_after_attempt(3))
def flaky_function(x):
    # 可能失败的函数
    return process(x)

runnable_with_retry = RunnableLambda(flaky_function)

回退机制

from langchain_core.runnables import RunnableWithFallbacks

# 主流程失败时使用备选
chain_with_fallback = primary_chain.with_fallbacks([fallback_chain])

性能优化

1. 流式处理

  • 大数据集处理
  • 实时响应
  • 减少内存使用

2. 批量处理

  • 并行执行多个请求
  • 减少 API 调用开销
  • 提高吞吐量

3. 异步执行

  • 非阻塞 I/O
  • 提高并发能力
  • 更好的资源利用

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐