【LangChain 源码解析一:Runnable】
本系列共 4 部分,从日常使用到源码架构,完整拆解 LangChain 的 Runnable 体系与 LCEL。
- 第 1 部分:5 分钟上手 LCEL(本文)
- 第 2 部分:流式与并发——数据如何流动
- 第 3 部分:组合模式全解
- 第 4 部分:源码架构与设计哲学
LangChain Runnables 深度解析(一):5 分钟上手 LCEL
一个能跑的例子
from langchain_core.runnables import RunnableLambda
def add_one(x: int) -> int:
return x + 1
def mul_two(x: int) -> int:
return x * 2
chain = RunnableLambda(add_one) | RunnableLambda(mul_two)
# 三种调用方式
print(chain.invoke(1)) # 4 — (1+1)*2
print(chain.batch([1, 2, 3])) # [4, 6, 8]
for chunk in chain.stream(1):
print(chunk) # 4 — RunnableLambda 不分块,一次性输出
invoke 处理单个输入,batch 并行处理多个输入,stream 流式输出。三种方式都自带异步变体(ainvoke/abatch/astream)。
三种调用方式对比
| 方法 | 输入 | 输出 | 同步 | 异步 |
|---|---|---|---|---|
invoke |
单个值 | 单个值 | invoke() |
ainvoke() |
batch |
list[Input] |
list[Output] |
batch() |
abatch() |
stream |
单个值 | Iterator[Output] |
stream() |
astream() |
三种方式的默认实现关系:
stream 默认实现 → 调用 invoke,yield 一次
batch 默认实现 → 用 ThreadPoolExecutor 并行调用 invoke
ainvoke 默认实现 → run_in_executor(self.invoke)
这意味着:只要你实现了 invoke,其他 5 个方法自动可用。但如果需要真正的流式输出或高效批处理,子类应该覆盖对应方法。
看源码验证一下。Runnable.stream 的默认实现(base.py:1130):
# base.py:1130-1149
def stream(
self,
input: Input,
config: RunnableConfig | None = None,
**kwargs: Any | None,
) -> Iterator[Output]:
yield self.invoke(input, config, **kwargs) # ← 就是调用 invoke 然后 yield
ainvoke 的默认实现(base.py:844):
# base.py:844-865
async def ainvoke(
self,
input: Input,
config: RunnableConfig | None = None,
**kwargs: Any,
) -> Output:
return await run_in_executor(config, self.invoke, input, config, **kwargs)
# ← 把 sync invoke 丢到线程池执行
batch 的默认实现(base.py:867):
# base.py:867-915
def batch(self, inputs: list[Input], config=None, *, return_exceptions=False, **kwargs):
if not inputs:
return []
configs = get_config_list(config, len(inputs))
def invoke(input_: Input, config: RunnableConfig) -> Output | Exception:
if return_exceptions:
try:
return self.invoke(input_, config, **kwargs)
except Exception as e:
return e
else:
return self.invoke(input_, config, **kwargs)
# 只有 1 个输入时跳过线程池
if len(inputs) == 1:
return [invoke(inputs[0], configs[0])]
# 多个输入用线程池并行
with get_executor_for_config(configs[0]) as executor:
return list(executor.map(invoke, inputs, configs))
管道操作符 | 的本质
| 是 LCEL 最核心的语法糖。当你写 a | b 时,实际调用的是 a.__or__(b)。
源码(base.py:618):
# base.py:618-637
def __or__(
self,
other: Runnable[Any, Other]
| Callable[[Any], Other]
| Mapping[str, ...],
) -> RunnableSerializable[Input, Other]:
return RunnableSequence(self, coerce_to_runnable(other))
两件事:
- 把
other通过coerce_to_runnable强制转换为Runnable - 创建
RunnableSequence(self, other)
coerce_to_runnable(base.py:6172)是自动类型转换的关键:
# base.py:6172-6196
def coerce_to_runnable(thing: RunnableLike) -> Runnable[Input, Output]:
if isinstance(thing, Runnable):
return thing # 已经是 Runnable,直接返回
if is_async_generator(thing) or inspect.isgeneratorfunction(thing):
return RunnableGenerator(thing) # 生成器 → RunnableGenerator
if callable(thing):
return RunnableLambda(cast(..., thing)) # 普通函数 → RunnableLambda
if isinstance(thing, dict):
return cast(..., RunnableParallel(thing)) # dict → RunnableParallel
raise TypeError(f"Expected a Runnable, callable or dict. Got: {type(thing)}")
所以你可以直接用普通函数、dict 来组合链,不需要手动包装:
# 这三种写法等价
chain = RunnableLambda(add_one) | RunnableLambda(mul_two)
chain = RunnableLambda(add_one) | mul_two # 函数自动转 RunnableLambda
chain = RunnableLambda(add_one) | {"doubled": mul_two} # dict 自动转 RunnableParallel
RunnableSequence 的结构
RunnableSequence 内部将步骤拆分为 first、middle(列表)、last 三部分(base.py:2900):
# base.py:2897-2905
# The steps are broken into first, middle and last, solely for type checking
class RunnableSequence(RunnableSerializable[Input, Output]):
first: Runnable[Input, Any]
middle: list[Runnable[Any, Any]] = Field(default_factory=list)
last: Runnable[Any, Output]
构造时会自动展平嵌套的 RunnableSequence(base.py:2927):
# base.py:2927-2946
def __init__(self, *steps, name=None, first=None, middle=None, last=None):
steps_flat: list[Runnable] = []
for step in steps:
if isinstance(step, RunnableSequence):
steps_flat.extend(step.steps) # ← 展平!
else:
steps_flat.append(coerce_to_runnable(step))
super().__init__(first=steps_flat[0], middle=list(steps_flat[1:-1]), last=steps_flat[-1])
这意味着 (a | b) | (c | d) 和 a | b | c | d 产生的结构完全一样——都是一个 4 步的扁平序列。
一个真实的 LLM 链
不需要 API key 也能理解结构:
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableLambda
# 模拟一个 ChatModel(实际用 ChatOpenAI / ChatAnthropic 替换)
def fake_model(messages):
from langchain_core.messages import AIMessage
return AIMessage(content=f"Echo: {messages[-1].content}")
prompt = ChatPromptTemplate.from_template("翻译成英文:{text}")
model = RunnableLambda(fake_model)
parser = StrOutputParser()
chain = prompt | model | parser
result = chain.invoke({"text": "你好世界"})
print(result) # "Echo: 翻译成英文:你好世界"
print(type(chain)) # <class 'langchain_core.runnables.base.RunnableSequence'>
print(len(chain.steps)) # 3
数据流:
{"text": "你好世界"}
│
▼
┌──────────────────┐
│ ChatPromptTemplate │ → ChatPromptValue (包含 messages)
└──────────────────┘
│
▼
┌──────────────────┐
│ fake_model (LLM) │ → AIMessage
└──────────────────┘
│
▼
┌──────────────────┐
│ StrOutputParser │ → str
└──────────────────┘
│
▼
"Echo: 翻译成英文:你好世界"
RunnableConfig 简介
每个 invoke/stream/batch 都接受一个可选的 config 参数。RunnableConfig 是一个 TypedDict(config.py:51):
# config.py:51-123
class RunnableConfig(TypedDict, total=False):
tags: list[str] # 标签,用于过滤追踪
metadata: dict[str, Any] # 元数据,传给回调
callbacks: Callbacks # 回调处理器
run_name: str # 追踪 run 的名称
max_concurrency: int | None # 最大并行数
recursion_limit: int # 递归深度限制(默认 25)
configurable: dict[str, Any] # 运行时动态配置
run_id: uuid.UUID | None # 唯一标识
使用示例:
chain.invoke(
{"text": "hello"},
config={
"tags": ["production", "v2"],
"metadata": {"user_id": "u123"},
"max_concurrency": 5,
"run_name": "翻译链",
}
)
类型推断
每个 Runnable 都能自动推断输入/输出的 JSON Schema:
from langchain_core.runnables import RunnableLambda
def add_one(x: int) -> int:
return x + 1
r = RunnableLambda(add_one)
print(r.input_schema.model_json_schema())
# {'properties': {'__root__': {'title': '__Root__', 'type': 'integer'}}, ...}
print(r.output_schema.model_json_schema())
# {'properties': {'__root__': {'title': '__Root__', 'type': 'integer'}}, ...}
对于 RunnableSequence,输入 schema 取第一步的,输出 schema 取最后一步的。对于 RunnableParallel,输出 schema 是所有分支输出的并集。
小结
这篇覆盖了 LCEL 的核心用法:
invoke/stream/batch三种调用方式及其默认实现关系|操作符 =__or__→RunnableSequence+coerce_to_runnable自动转换RunnableSequence的扁平化结构RunnableConfig的 8 个配置字段- 类型推断(
input_schema/output_schema)
但还有一些关键问题没回答:stream 在 RunnableSequence 中到底是怎么串联的?batch 的线程池怎么工作?RunnableParallel 怎么实现真正的并行? 这些是下一篇的主题。
LangChain Runnables 深度解析(二):流式与并发——数据如何流动
一个能跑的例子
import time
from langchain_core.runnables import RunnableLambda, RunnableParallel
def slow_upper(x: str) -> str:
time.sleep(1)
return x.upper()
def slow_reverse(x: str) -> str:
time.sleep(1)
return x[::-1]
# 串行执行需要 2 秒
start = time.time()
r1 = slow_upper("hello")
r2 = slow_reverse("hello")
print(f"串行: {time.time() - start:.1f}s") # ~2.0s
# RunnableParallel 并行执行只需 ~1 秒
parallel = RunnableParallel(upper=slow_upper, reverse=slow_reverse)
start = time.time()
result = parallel.invoke("hello")
print(f"并行: {time.time() - start:.1f}s") # ~1.0s
print(result) # {'upper': 'HELLO', 'reverse': 'olleh'}
RunnableParallel 通过线程池实现了真正的并行执行。接下来看它是怎么做到的。
RunnableSequence.invoke 的执行路径
先回顾最简单的情况——序列执行。RunnableSequence.invoke(base.py:3127):
# base.py:3127-3160
def invoke(self, input: Input, config: RunnableConfig | None = None, **kwargs):
config = ensure_config(config)
callback_manager = get_callback_manager_for_config(config)
# 1. 启动根 run
run_manager = callback_manager.on_chain_start(
None, input,
name=config.get("run_name") or self.get_name(),
run_id=config.pop("run_id", None),
)
input_ = input
try:
for i, step in enumerate(self.steps):
# 2. 每一步创建子 run
config = patch_config(
config, callbacks=run_manager.get_child(f"seq:step:{i + 1}")
)
with set_config_context(config) as context:
# 3. 输出传给下一步作为输入
if i == 0:
input_ = context.run(step.invoke, input_, config, **kwargs)
else:
input_ = context.run(step.invoke, input_, config)
except BaseException as e:
run_manager.on_chain_error(e)
raise
else:
run_manager.on_chain_end(input_)
return cast("Output", input_)
执行流程图:
invoke(input, config)
│
├─ ensure_config(config) ← 填充默认值,合并 ContextVar
├─ on_chain_start(...) ← 启动根回调
│
├─ step[0].invoke(input_, config) ← 子 run: "seq:step:1"
│ └─ output_0
│
├─ step[1].invoke(output_0, config) ← 子 run: "seq:step:2"
│ └─ output_1
│
├─ step[2].invoke(output_1, config) ← 子 run: "seq:step:3"
│ └─ output_2
│
└─ on_chain_end(output_2) ← 结束根回调
关键点:每一步的输出直接作为下一步的输入,kwargs 只传给第一步。
stream 的真相:迭代器串联
RunnableSequence.stream 不是简单地调用每步的 invoke——它把整个管道变成一条迭代器链。
stream 方法(base.py:3524)委托给 transform:
# base.py:3524-3530
def stream(self, input, config=None, **kwargs):
yield from self.transform(iter([input]), config, **kwargs)
transform 委托给 _transform_stream_with_config,核心逻辑在 _transform(base.py:3461):
# base.py:3461-3482
def _transform(self, inputs: Iterator[Input], run_manager, config, **kwargs):
steps = [self.first, *self.middle, self.last]
# 把每一步的 transform 串联起来——惰性求值链!
final_pipeline = cast("Iterator[Output]", inputs)
for idx, step in enumerate(steps):
config = patch_config(
config, callbacks=run_manager.get_child(f"seq:step:{idx + 1}")
)
if idx == 0:
final_pipeline = step.transform(final_pipeline, config, **kwargs)
else:
final_pipeline = step.transform(final_pipeline, config)
yield from final_pipeline # ← 消费最终迭代器时,整条链才开始执行
迭代器串联示意:
iter([input])
│
▼
step[0].transform(...) → Iterator[A] ← 惰性,不立即执行
│
▼
step[1].transform(...) → Iterator[B] ← 惰性
│
▼
step[2].transform(...) → Iterator[C] ← 惰性
│
▼
yield from final_pipeline ← 此时才拉动整条链!
这就是"懒求值链"——所有 transform 调用只是建立了迭代器管道,真正的数据流动要等到最终消费时才发生。如果中间某个 step 不支持 transform(如 RunnableLambda),它会缓冲所有输入,执行完毕后再一次性输出。
_transform_stream_with_config:流式回调骨架
每个 Runnable 的 transform 方法最终都通过 _transform_stream_with_config(base.py:2261)来执行,它是流式处理的统一骨架:
# base.py:2261-2357(简化版)
def _transform_stream_with_config(self, inputs, transformer, config, **kwargs):
# 1. tee:分叉输入流——一份给 transformer,一份给追踪
input_for_tracing, input_for_transform = tee(inputs, 2)
# 提前拉一个 chunk 确保上游启动
final_input = next(input_for_tracing, None)
final_output = None
# 2. 启动回调
config = ensure_config(config)
callback_manager = get_callback_manager_for_config(config)
run_manager = callback_manager.on_chain_start(...)
try:
child_config = patch_config(config, callbacks=run_manager.get_child())
with set_config_context(child_config) as context:
# 3. 调用实际的 transformer
iterator = context.run(transformer, input_for_transform, **kwargs)
# 4. 如果有 StreamingCallbackHandler,tap 输出流
if stream_handler := ...:
iterator = stream_handler.tap_output_iter(run_manager.run_id, iterator)
# 5. 逐 chunk yield,同时累积 final_output
while True:
chunk = context.run(next, iterator)
yield chunk
if final_output is None:
final_output = chunk
else:
try:
final_output = final_output + chunk # ← 用 + 拼接
except TypeError:
final_output = chunk
# 6. 累积 final_input(给追踪用)
for ichunk in input_for_tracing:
final_input = final_input + ichunk # 同样用 + 拼接
except BaseException as e:
run_manager.on_chain_error(e, inputs=final_input)
raise
else:
run_manager.on_chain_end(final_output, inputs=final_input)
这段代码做了 3 件事:
- tee 分叉:输入流分成两份——一份给实际处理,一份给追踪记录
- 逐 chunk yield:每个 chunk 一边 yield 给下游,一边用
+累积为final_output - 回调通知:处理完毕后把累积的
final_input和final_output传给on_chain_end
final_output = final_output + chunk 这行很关键——LangChain 的流式 chunk 都实现了 __add__(如 AIMessageChunk、AddableDict),通过 + 拼接成完整结果。
batch 并行机制
sync batch 使用 ContextThreadPoolExecutor(config.py:527),async abatch 使用 asyncio.gather。
sync batch
上面第一篇已经看过 Runnable.batch 的默认实现。关键在于 ContextThreadPoolExecutor:
# config.py:527-576
class ContextThreadPoolExecutor(ThreadPoolExecutor):
"""ThreadPoolExecutor that copies the context to the child thread."""
def submit(self, func, *args, **kwargs):
# 关键:copy_context() 把当前线程的 ContextVar 复制到子线程
return super().submit(
partial(copy_context().run, func, *args, **kwargs)
)
def map(self, fn, *iterables, **kwargs):
contexts = [copy_context() for _ in range(len(iterables[0]))]
def _wrapped_fn(*args):
return contexts.pop().run(fn, *args)
return super().map(_wrapped_fn, *iterables, **kwargs)
它通过 copy_context() 确保 ContextVar(包括 var_child_runnable_config)在子线程中也能访问到——否则子线程里 ensure_config 拿不到父 config。
get_executor_for_config(config.py:579)根据 max_concurrency 创建线程池:
# config.py:579-595
@contextmanager
def get_executor_for_config(config):
config = config or {}
with ContextThreadPoolExecutor(
max_workers=config.get("max_concurrency") # ← 控制并行度
) as executor:
yield executor
async batch
Runnable.abatch(base.py:1002)使用 gather_with_concurrency(utils.py:63):
# utils.py:63-78
async def gather_with_concurrency(n: int | None, *coros) -> list:
if n is None:
return await asyncio.gather(*coros) # 无限制
semaphore = asyncio.Semaphore(n)
return await asyncio.gather(*(gated_coro(semaphore, c) for c in coros))
async def gated_coro(semaphore, coro):
async with semaphore: # ← 信号量限制并发
return await coro
max_concurrency 的作用:
- sync:
ThreadPoolExecutor(max_workers=max_concurrency)— 限制线程数 - async:
asyncio.Semaphore(max_concurrency)— 限制并发协程数
RunnableParallel.invoke 的实现
RunnableParallel(base.py:3830)用线程池并行执行所有分支:
# base.py:3830-3887(简化版)
def invoke(self, input, config=None, **kwargs):
config = ensure_config(config)
callback_manager = CallbackManager.configure(...)
run_manager = callback_manager.on_chain_start(...)
def _invoke_step(step, input_, config, key):
child_config = patch_config(
config, callbacks=run_manager.get_child(f"map:key:{key}")
)
with set_config_context(child_config) as context:
return context.run(step.invoke, input_, child_config)
try:
steps = dict(self.steps__)
with get_executor_for_config(config) as executor:
# 每个分支提交到线程池
futures = [
executor.submit(_invoke_step, step, input, config, key)
for key, step in steps.items()
]
# 收集结果
output = {
key: future.result()
for key, future in zip(steps, futures)
}
except BaseException as e:
run_manager.on_chain_error(e)
raise
else:
run_manager.on_chain_end(output)
return output
异步版本用 asyncio.gather(base.py:3923):
# base.py:3919-3935
results = await asyncio.gather(
*(
_ainvoke_step(step, input, config, key)
for key, step in steps.items()
)
)
output = dict(zip(steps, results))
Config 传播机制
Config 在 Runnable 链中的传播通过 ContextVar 实现(config.py:146):
# config.py:146-148
var_child_runnable_config: ContextVar[RunnableConfig | None] = ContextVar(
"child_runnable_config", default=None
)
传播流程:
chain.invoke(input, config={"tags": ["prod"]})
│
├─ ensure_config(config)
│ └─ 读取 var_child_runnable_config,合并用户传入的 config
│
├─ _call_with_config(...)
│ ├─ child_config = patch_config(config, callbacks=run_manager.get_child())
│ └─ set_config_context(child_config)
│ └─ var_child_runnable_config.set(child_config) ← 写入 ContextVar
│
└─ 子 Runnable.invoke(input, config)
└─ ensure_config(config)
└─ 读取 var_child_runnable_config ← 拿到父级 config
ensure_config(config.py:216)的合并逻辑:
# config.py:216-266
def ensure_config(config: RunnableConfig | None = None) -> RunnableConfig:
# 1. 创建空 config(默认值)
empty = RunnableConfig(tags=[], metadata={}, callbacks=None,
recursion_limit=25, configurable={})
# 2. 先合并 ContextVar 中的父级 config
if var_config := var_child_runnable_config.get():
empty.update({k: v.copy() if k in COPIABLE_KEYS else v
for k, v in var_config.items() if v is not None})
# 3. 再合并显式传入的 config(优先级更高)
if config is not None:
empty.update({k: v.copy() if k in COPIABLE_KEYS else v
for k, v in config.items() if v is not None and k in CONFIG_KEYS})
return empty
merge_configs(config.py:357)对不同字段有不同的合并策略:
| 字段 | 合并策略 |
|---|---|
tags |
去重并排序合并 |
metadata |
dict 浅合并(后者覆盖前者) |
configurable |
dict 浅合并 |
callbacks |
合并 handler 列表或 CallbackManager |
recursion_limit |
非默认值覆盖 |
其他(run_name, run_id) |
后者覆盖前者 |
_call_with_config:回调骨架
_call_with_config(base.py:2027)是所有 Runnable 子类实现 invoke 时使用的统一骨架:
# base.py:2027-2074
def _call_with_config(self, func, input_, config, run_type=None, **kwargs):
config = ensure_config(config)
callback_manager = get_callback_manager_for_config(config)
# 1. on_chain_start → 创建 run_manager
run_manager = callback_manager.on_chain_start(
serialized, input_,
run_type=run_type,
name=config.get("run_name") or self.get_name(),
run_id=config.pop("run_id", None),
)
try:
# 2. 创建子 config(给内部调用的 Runnable 使用)
child_config = patch_config(config, callbacks=run_manager.get_child())
with set_config_context(child_config) as context:
# 3. 在正确的 context 中执行用户函数
output = context.run(
call_func_with_variable_args,
func, input_, config, run_manager, **kwargs
)
except BaseException as e:
run_manager.on_chain_error(e) # 4a. 失败回调
raise
else:
run_manager.on_chain_end(output) # 4b. 成功回调
return output
call_func_with_variable_args(config.py:423)会检查函数签名,按需传入 config 和 run_manager:
# config.py:423-452
def call_func_with_variable_args(func, input, config, run_manager=None, **kwargs):
if accepts_config(func):
kwargs["config"] = patch_config(config, callbacks=run_manager.get_child())
if run_manager is not None and accepts_run_manager(func):
kwargs["run_manager"] = run_manager
return func(input, **kwargs)
这就是为什么你的 RunnableLambda 函数可以选择性地接受 config 参数:
# 这两种函数签名都能用
def simple(x: int) -> int:
return x + 1
def with_config(x: int, config: RunnableConfig) -> int:
print(config["tags"])
return x + 1
小结
这篇深入了数据流动的三条路径:
- invoke:
RunnableSequence逐步传递输出→输入,通过_call_with_config统一管理回调 - stream:迭代器懒求值链(
step.transform串联),通过_transform_stream_with_config实现 tee 分叉 + chunk 累积 - batch:sync 用
ContextThreadPoolExecutor,async 用asyncio.gather+Semaphore - Config 传播:
ContextVar+ensure_config自动继承父级 config
但 Runnable 的组合能力远不止序列和并行。条件分支怎么实现?透传和增强是什么模式?降级容错和自动重试怎么集成? 下一篇全面解析 8 种组合模式。
LangChain Runnables 深度解析(三):组合模式全解
一个能跑的例子
from langchain_core.runnables import RunnableBranch, RunnableLambda
branch = RunnableBranch(
(lambda x: isinstance(x, str), lambda x: x.upper()),
(lambda x: isinstance(x, int), lambda x: x + 1),
lambda x: "default", # 默认分支
)
print(branch.invoke("hello")) # "HELLO"
print(branch.invoke(42)) # 43
print(branch.invoke(None)) # "default"
RunnableBranch 像 if/elif/else——逐个检查条件,匹配就执行对应分支。接下来完整介绍 LangChain 的 8 种组合模式。
1. RunnableBranch(条件路由)
RunnableBranch(branch.py:42)接受 (condition, runnable) 元组列表和一个默认分支:
# branch.py:69-72
class RunnableBranch(RunnableSerializable[Input, Output]):
branches: Sequence[tuple[Runnable[Input, bool], Runnable[Input, Output]]]
default: Runnable[Input, Output]
构造时自动将条件和分支 coerce_to_runnable(branch.py:113-132):
# branch.py:113-132
for branch in branches[:-1]:
condition, runnable = branch
condition = coerce_to_runnable(condition) # lambda → RunnableLambda
runnable = coerce_to_runnable(runnable) # lambda → RunnableLambda
branches_.append((condition, runnable))
invoke 实现(branch.py:189):
# branch.py:189-245(简化版)
def invoke(self, input, config=None, **kwargs):
config = ensure_config(config)
callback_manager = get_callback_manager_for_config(config)
run_manager = callback_manager.on_chain_start(...)
try:
for idx, branch in enumerate(self.branches):
condition, runnable = branch
# 执行条件检查(子 run: "condition:1")
expression_value = condition.invoke(
input,
config=patch_config(config, callbacks=run_manager.get_child(
tag=f"condition:{idx + 1}"
)),
)
if expression_value:
# 条件为真,执行对应分支(子 run: "branch:1")
output = runnable.invoke(
input,
config=patch_config(config, callbacks=run_manager.get_child(
tag=f"branch:{idx + 1}"
)),
**kwargs,
)
break
else:
# 所有条件都不满足,执行默认分支
output = self.default.invoke(
input,
config=patch_config(config, callbacks=run_manager.get_child(
tag="branch:default"
)),
**kwargs,
)
except BaseException as e:
run_manager.on_chain_error(e)
raise
run_manager.on_chain_end(output)
return output
执行流程:
invoke(input)
│
├─ condition_1.invoke(input) → False
├─ condition_2.invoke(input) → True ← 命中!
│ └─ branch_2.invoke(input) → output
│
└─ on_chain_end(output)
2. RunnablePassthrough / RunnableAssign / RunnablePick
这三个类(passthrough.py)是 RAG 模式的基石。
RunnablePassthrough(透传)
RunnablePassthrough(passthrough.py:74)把输入原样传递到输出,可选地执行一个副作用函数:
from langchain_core.runnables import RunnablePassthrough, RunnableParallel
# 基本用法:原样透传
chain = RunnablePassthrough()
print(chain.invoke(42)) # 42
# 配合 RunnableParallel 实现"透传 + 增强"
chain = RunnableParallel(
original=RunnablePassthrough(),
doubled=lambda x: x * 2,
)
print(chain.invoke(5)) # {'original': 5, 'doubled': 10}
核心实现非常简单——invoke 调用 identity 函数(passthrough.py:226):
# passthrough.py:50-59
def identity(x: Other) -> Other:
return x
# passthrough.py:226-233
def invoke(self, input, config=None, **kwargs):
if self.func is not None:
call_func_with_variable_args(self.func, input, ensure_config(config), **kwargs)
return self._call_with_config(identity, input, config)
RunnablePassthrough.assign(增强)
assign 是 RAG 场景最常用的模式——在 dict 输入上"追加"新字段:
from langchain_core.runnables import RunnablePassthrough
chain = RunnablePassthrough.assign(
length=lambda x: len(x["text"]),
upper=lambda x: x["text"].upper(),
)
print(chain.invoke({"text": "hello"}))
# {'text': 'hello', 'length': 5, 'upper': 'HELLO'}
assign 返回 RunnableAssign(passthrough.py:352),它内部用 RunnableParallel 处理新字段,然后 {**原始输入, **新字段} 合并:
# passthrough.py:207-223
@classmethod
def assign(cls, **kwargs) -> RunnableAssign:
return RunnableAssign(RunnableParallel(kwargs))
# passthrough.py:480-498
def _invoke(self, value, run_manager, config, **kwargs):
if not isinstance(value, dict):
raise ValueError("The input to RunnablePassthrough.assign() must be a dict.")
return {
**value, # ← 保留原始字段
**self.mapper.invoke(value, ...), # ← 添加新字段
}
RunnablePick(提取)
RunnablePick(passthrough.py:671)从 dict 中提取指定的 key:
from langchain_core.runnables.passthrough import RunnablePick
data = {"name": "Alice", "age": 30, "city": "Beijing"}
pick_one = RunnablePick(keys="name")
print(pick_one.invoke(data)) # "Alice"(单 key 返回值本身)
pick_multi = RunnablePick(keys=["name", "age"])
print(pick_multi.invoke(data)) # {'name': 'Alice', 'age': 30}(多 key 返回 dict)
RAG 经典模式
三者组合的经典 RAG 模式:
from langchain_core.runnables import RunnablePassthrough, RunnableParallel
# 伪代码:展示 RAG 管道结构
chain = (
RunnableParallel(
context=retriever, # 检索上下文
question=RunnablePassthrough(), # 透传用户问题
)
| prompt # 拼接 prompt
| model # 调用 LLM
| parser # 解析输出
)
3. RunnableWithFallbacks(降级容错)
RunnableWithFallbacks(fallbacks.py:36)在主 Runnable 失败时自动切换到备选方案:
from langchain_core.runnables import RunnableLambda
def unstable(x):
raise ValueError("primary failed!")
def backup(x):
return f"backup: {x}"
chain = RunnableLambda(unstable).with_fallbacks(
[RunnableLambda(backup)]
)
print(chain.invoke("hello")) # "backup: hello"
核心字段(fallbacks.py:88-104):
# fallbacks.py:88-104
class RunnableWithFallbacks(RunnableSerializable[Input, Output]):
runnable: Runnable[Input, Output] # 主 Runnable
fallbacks: Sequence[Runnable[Input, Output]] # 备选方案列表
exceptions_to_handle: tuple[type[BaseException], ...] = (Exception,) # 捕获哪些异常
exception_key: str | None = None # 传递异常给 fallback
invoke 的核心逻辑(fallbacks.py:166):
# fallbacks.py:166-213(简化版)
def invoke(self, input, config=None, **kwargs):
first_error = None
last_error = None
for runnable in self.runnables: # ← 遍历 [主, fallback1, fallback2, ...]
try:
if self.exception_key and last_error is not None:
input[self.exception_key] = last_error # 把错误传给下一个
output = runnable.invoke(input, config, **kwargs)
except self.exceptions_to_handle as e:
if first_error is None:
first_error = e
last_error = e
else:
return output # ← 成功就返回
raise first_error # ← 全部失败,抛出第一个错误
@property
def runnables(self):
yield self.runnable
yield from self.fallbacks
注意 __getattr__ 的巧妙设计(fallbacks.py:593)——当你在 RunnableWithFallbacks 上调用 bind_tools 等方法时,它会自动在主 Runnable 和所有 fallback 上都调用:
# fallbacks.py:625-646 —— bind_tools 等方法自动传播到所有 fallback
model = gpt_4o.with_fallbacks([claude_sonnet])
model.bind_tools([...]) # 等价于:
# gpt_4o.bind_tools([...]).with_fallbacks([claude_sonnet.bind_tools([...])])
4. RunnableRetry(自动重试)
RunnableRetry(retry.py:48)基于 tenacity 库实现自动重试:
from langchain_core.runnables import RunnableLambda
call_count = 0
def flaky(x):
global call_count
call_count += 1
if call_count < 3:
raise ConnectionError("network error")
return f"success: {x}"
chain = RunnableLambda(flaky).with_retry(
retry_if_exception_type=(ConnectionError,),
stop_after_attempt=3,
wait_exponential_jitter=True,
)
print(chain.invoke("hello")) # "success: hello"(第 3 次成功)
RunnableRetry 继承自 RunnableBindingBase(装饰器模式),核心配置:
# retry.py:48-132
class RunnableRetry(RunnableBindingBase[Input, Output]):
retry_exception_types: tuple[type[BaseException], ...] = (Exception,)
wait_exponential_jitter: bool = True
exponential_jitter_params: ExponentialJitterParams | None = None
max_attempt_number: int = 3
_invoke 实现(retry.py:179)——直接使用 tenacity 的 Retrying:
# retry.py:179-195
def _invoke(self, input_, run_manager, config, **kwargs):
for attempt in self._sync_retrying(reraise=True):
with attempt:
result = super().invoke(
input_,
self._patch_config(config, run_manager, attempt.retry_state),
**kwargs,
)
if attempt.retry_state.outcome and not attempt.retry_state.outcome.failed:
attempt.retry_state.set_result(result)
return result
每次重试都会创建新的子 run(tag 为 retry:attempt:N),方便追踪。
注意:stream 和 transform 不支持重试(retry.py:378),因为重试一个流不太直观。
5. RunnableBinding(装饰器模式)
RunnableBinding(base.py:5928)是所有 with_* 方法的基础——它包裹一个 Runnable,附加额外的 kwargs、config 或行为:
from langchain_core.runnables import RunnableLambda
def greet(x, greeting="Hello"):
return f"{greeting}, {x}!"
chain = RunnableLambda(greet).bind(greeting="Hi")
print(chain.invoke("Alice")) # "Hi, Alice!"
RunnableBindingBase 的核心字段(base.py:5536):
# base.py:5536-5554
class RunnableBindingBase(RunnableSerializable[Input, Output]):
bound: Runnable[Input, Output] # 被包裹的 Runnable
kwargs: Mapping[str, Any] # 绑定的 kwargs
config: RunnableConfig # 绑定的 config
config_factories: list[Callable] # config 工厂函数
custom_input_type: Any | None = None # 覆盖输入类型
custom_output_type: Any | None = None # 覆盖输出类型
所有 with_* 方法都返回新的 RunnableBinding 实例:
| 方法 | 作用 | 返回类型 |
|---|---|---|
bind(**kwargs) |
绑定 kwargs | RunnableBinding |
with_config(config) |
绑定 config | RunnableBinding |
with_retry(...) |
添加重试 | RunnableRetry(子类) |
with_fallbacks(...) |
添加降级 | RunnableWithFallbacks |
with_listeners(...) |
添加生命周期监听 | RunnableBinding |
with_types(...) |
覆盖类型标注 | RunnableBinding |
RunnableBinding.bind 的实现(base.py:5981)——合并 kwargs:
# base.py:5981-5999
def bind(self, **kwargs):
return self.__class__(
bound=self.bound,
config=self.config,
kwargs={**self.kwargs, **kwargs}, # ← 新旧 kwargs 合并
)
6. Configurable(运行时动态配置)
configurable_fields 和 configurable_alternatives 允许在运行时动态修改 Runnable 的参数或切换实现。
configurable_fields(修改参数)
from langchain_core.runnables import RunnableLambda, ConfigurableField
def greet(x, *, greeting="Hello"):
return f"{greeting}, {x}!"
# 注意:实际使用通常在 ChatModel 上,这里用 RunnableLambda 仅做概念演示
# 真实用法:model = ChatOpenAI(temperature=0).configurable_fields(
# temperature=ConfigurableField(id="temp", name="Temperature")
# )
# model.with_config(configurable={"temp": 0.9}).invoke(...)
configurable_fields(base.py:2610)返回 RunnableConfigurableFields(configurable.py:318):
# base.py:2652-2666
def configurable_fields(self, **kwargs):
from langchain_core.runnables.configurable import RunnableConfigurableFields
return RunnableConfigurableFields(default=self, fields=kwargs)
RunnableConfigurableFields._prepare(configurable.py:420)在调用时根据 config["configurable"] 动态重建 Runnable:
# configurable.py:420-459(简化版)
def _prepare(self, config=None):
config = ensure_config(config)
configurable_fields = {
specs_by_id[k][0]: v
for k, v in config.get("configurable", {}).items()
if k in specs_by_id
}
if configurable_fields:
# 用新参数重建 Runnable!
return (self.default.__class__(**{**init_params, **configurable_fields}), config)
return (self.default, config)
configurable_alternatives(切换实现)
# 概念示例(需要实际模型包)
# prompt = PromptTemplate.from_template("Tell me about {topic}").configurable_alternatives(
# ConfigurableField(id="prompt"),
# default_key="joke",
# poem=PromptTemplate.from_template("Write a poem about {topic}"),
# )
# chain = prompt | model
# chain.with_config(configurable={"prompt": "poem"}).invoke({"topic": "cats"})
RunnableConfigurableAlternatives(configurable.py:475)根据 config["configurable"][which.id] 选择要使用的 Runnable。
7. RunnableEach(map 操作)
Runnable.map() 返回 RunnableEach——把单个 Runnable 映射到列表的每个元素上:
from langchain_core.runnables import RunnableLambda
add_one = RunnableLambda(lambda x: x + 1)
mapped = add_one.map()
print(mapped.invoke([1, 2, 3])) # [2, 3, 4]
本质上就是对输入列表调用 bound.batch()。
8 种组合模式对比
| 模式 | 类名 | 用途 | 创建方式 |
|---|---|---|---|
| 序列 | RunnableSequence |
串联执行 | a | b |
| 并行 | RunnableParallel |
同一输入并行处理 | {"k1": a, "k2": b} |
| 条件分支 | RunnableBranch |
if/elif/else 路由 | RunnableBranch((cond, run), default) |
| 透传增强 | RunnablePassthrough / RunnableAssign |
保留原始输入+添加字段 | RunnablePassthrough.assign(...) |
| 提取 | RunnablePick |
从 dict 中取 key | RunnablePick("key") |
| 降级容错 | RunnableWithFallbacks |
主失败→备选 | a.with_fallbacks([b]) |
| 自动重试 | RunnableRetry |
异常自动重试 | a.with_retry(...) |
| 装饰绑定 | RunnableBinding |
绑定 kwargs/config | a.bind(key=val) |
小结
这篇介绍了 LangChain 的 8 种组合模式:
- RunnableBranch — 条件路由,逐个检查条件
- RunnablePassthrough/Assign/Pick — 透传、增强、提取,RAG 的基石
- RunnableWithFallbacks — 降级容错,自动切换备选
- RunnableRetry — 基于 tenacity 的自动重试
- RunnableBinding — 装饰器模式,所有
with_*的基础 - Configurable — 运行时动态修改参数或切换实现
这些模式都遵循同一个接口——Runnable[Input, Output]——所以可以任意组合嵌套。那么这个接口是怎么设计出来的?16 个文件 13712 行代码是怎么组织的?继承体系是什么样的?最后一篇来看源码架构与设计哲学。
LangChain Runnables 深度解析(四):源码架构与设计哲学
一个能跑的例子
from langchain_core.runnables import Runnable, RunnableSerializable, RunnableLambda
from langchain_core.load.serializable import Serializable
# Runnable 是一切的基类
assert issubclass(RunnableLambda, Runnable)
# RunnableSerializable 同时继承 Serializable 和 Runnable
assert issubclass(RunnableSerializable, Runnable)
assert issubclass(RunnableSerializable, Serializable)
# 继承链
print(RunnableLambda.__mro__)
# (RunnableLambda, Runnable, ABC, Generic, BaseModel, ...)
# 注意:RunnableLambda 直接继承 Runnable,不继承 RunnableSerializable
Runnable 是抽象基类,RunnableSerializable 是加了序列化能力的 Runnable。但不是所有 Runnable 都需要序列化——比如 RunnableLambda 就直接继承 Runnable。
完整目录结构
runnables/ # 13712 行,16 文件
├── __init__.py 136 # 懒加载入口
├── base.py 6257 # 核心:Runnable/Sequence/Parallel/Lambda/Binding/Generator
├── config.py 632 # RunnableConfig + 线程池 + ContextVar
├── utils.py 759 # 工具函数 + ConfigurableField + AddableDict
├── passthrough.py 841 # RunnablePassthrough/Assign/Pick
├── graph.py 739 # DAG 图表示(可视化用)
├── configurable.py 716 # DynamicRunnable + ConfigurableFields/Alternatives
├── fallbacks.py 664 # RunnableWithFallbacks
├── history.py 622 # RunnableWithMessageHistory
├── graph_mermaid.py 498 # Mermaid 图渲染
├── branch.py 461 # RunnableBranch
├── retry.py 379 # RunnableRetry
├── graph_ascii.py 366 # ASCII 图渲染
├── router.py 239 # RouterRunnable(已少用)
├── graph_png.py 215 # PNG 图渲染
└── schema.py 188 # StreamEvent 类型定义
base.py 一个文件占了全部代码的 45%——这是有意为之,把最核心的 7 个类放在一起避免循环依赖。
继承体系
BaseModel (Pydantic)
│
Serializable (load/serializable.py:88)
│ │
┌──────────────┘ │
│ │
│ Runnable[Input, Output] (ABC, Generic)
│ │ (base.py:124)
│ │
│ ┌──────────────┼──────────────────┐
│ │ │ │
│ │ RunnableSerializable RunnableLambda
│ │ (base.py:2586) (base.py:4395)
│ │ ← Serializable + Runnable
│ │ │
│ │ ┌─────────┼────────────────────────┐
│ │ │ │ │
│ │ RunnableSequence RunnableParallel RunnableBindingBase
│ │ (base.py:2813) (base.py:3561) (base.py:5526)
│ │ │
│ │ ┌────────┼────────┐
│ │ │ │ │
│ │ RunnableBinding RunnableRetry ...
│ │ (base.py:5928) (retry.py:48)
│ │
│ RunnableGenerator (base.py:4092)
│
├── RunnableBranch (branch.py:42)
├── RunnablePassthrough (passthrough.py:74)
├── RunnableAssign (passthrough.py:352)
├── RunnablePick (passthrough.py:671)
├── RunnableWithFallbacks (fallbacks.py:36)
└── DynamicRunnable (configurable.py:49)
├── RunnableConfigurableFields (configurable.py:318)
└── RunnableConfigurableAlternatives (configurable.py:475)
关键设计决策:
RunnableLambda不继承RunnableSerializable——因为 lambda 函数没法序列化RunnableBindingBase是RunnableSerializable的子类——因为RunnableRetry/RunnableBinding需要序列化配置RunnableWithFallbacks继承RunnableSerializable——因为主 Runnable 和 fallback 列表都需要序列化
外部组件也是 Runnable:
# 以下都是 Runnable 的子类
from langchain_core.language_models.chat_models import BaseChatModel
from langchain_core.tools.base import BaseTool
from langchain_core.prompts import BasePromptTemplate
from langchain_core.output_parsers import BaseOutputParser
from langchain_core.retrievers import BaseRetriever
# BaseChatModel → ... → RunnableSerializable → Serializable + Runnable
# BaseTool → RunnableSerializable → Serializable + Runnable
这就是为什么 prompt | model | parser 能直接用 | 组合——它们都是 Runnable。
base.py 内部组织(6257 行)
base.py 这个巨型文件按区域划分:
| 行号范围 | 内容 | 行数 |
|---|---|---|
| 1-117 | imports + TYPE_CHECKING | ~117 |
| 124-2585 | Runnable (抽象基类) |
~2461 |
| 2586-2812 | RunnableSerializable |
~226 |
| 2813-3560 | RunnableSequence |
~747 |
| 3561-4090 | RunnableParallel |
~529 |
| 4092-4393 | RunnableGenerator |
~301 |
| 4395-5524 | RunnableLambda |
~1129 |
| 5526-5927 | RunnableBindingBase |
~401 |
| 5928-6171 | RunnableBinding |
~243 |
| 6172-6258 | coerce_to_runnable + @chain |
~86 |
Runnable 类本身 2461 行,包含了:
- 核心方法:
invoke/ainvoke/batch/abatch/stream/astream - 组合方法:
__or__/__ror__/pipe - 配置方法:
bind/with_config/with_retry/with_fallbacks/with_listeners - 回调骨架:
_call_with_config/_acall_with_config/_transform_stream_with_config - schema 方法:
input_schema/output_schema/config_schema - 事件流:
astream_events/astream_log
类型推断机制
LangChain 通过两种方式推断 Runnable 的输入/输出类型:
1. 泛型参数(Generic metadata)
Runnable 是 Generic[Input, Output]。当子类指定具体类型时,如 RunnableSequence[str, int],通过 __pydantic_generic_metadata__ 获取:
# base.py:200-210(InputType 属性)
@property
def InputType(self) -> type[Input]:
for cls in type(self).__mro__:
if hasattr(cls, "__pydantic_generic_metadata__"):
metadata = cls.__pydantic_generic_metadata__
if "args" in metadata and len(metadata["args"]) >= _RUNNABLE_GENERIC_NUM_ARGS:
return metadata["args"][0] # ← 第一个泛型参数 = Input
raise TypeError(...)
2. 函数签名检查(RunnableLambda)
RunnableLambda 通过 inspect 和 AST 分析推断类型:
def add_one(x: int) -> int:
return x + 1
r = RunnableLambda(add_one)
# InputType 来自参数 x 的类型标注 → int
# OutputType 来自返回值类型标注 → int
更巧妙的是,它还能通过 AST 分析 lambda 函数对输入 dict 的 key 访问来推断输入 schema:
# utils.py:365-381
def get_function_first_arg_dict_keys(func) -> list[str] | None:
code = inspect.getsource(func)
tree = ast.parse(textwrap.dedent(code))
visitor = IsFunctionArgDict()
visitor.visit(tree)
return sorted(visitor.keys) if visitor.keys else None
r = RunnableLambda(lambda x: x["name"] + x["age"])
print(r.input_schema.model_json_schema())
# 推断出输入需要 "name" 和 "age" 两个 key
回调集成架构
每个 Runnable 的执行都嵌入了回调系统,形成层级结构:
chain.invoke(input, config={"callbacks": [handler]})
│
├─ CallbackManager.configure(config["callbacks"])
│ └─ 创建根 CallbackManager
│
├─ on_chain_start(input) ← 根 run
│ └─ run_manager (CallbackManagerForChainRun)
│
├─ step[0].invoke(...)
│ ├─ run_manager.get_child("seq:step:1") ← 子 run
│ └─ on_chain_start / on_chain_end
│
├─ step[1].invoke(...)
│ ├─ run_manager.get_child("seq:step:2") ← 子 run
│ └─ on_chain_start / on_chain_end
│
└─ on_chain_end(output) ← 根 run 结束
子 run 的 tag 命名规则:
RunnableSequence:seq:step:1,seq:step:2, …RunnableParallel:map:key:xxxRunnableBranch:condition:1,branch:1,branch:defaultRunnableRetry:retry:attempt:2,retry:attempt:3, …
懒加载入口
runnables/__init__.py(136 行)使用 __getattr__ 实现懒加载:
# __init__.py:95-132
_dynamic_imports = {
"Runnable": "base",
"RunnableSequence": "base",
"RunnableParallel": "base",
"RunnableLambda": "base",
"RunnableBranch": "branch",
"RunnableConfig": "config",
# ...
}
def __getattr__(attr_name: str) -> object:
module_name = _dynamic_imports.get(attr_name)
result = import_attr(attr_name, module_name, __spec__.parent)
globals()[attr_name] = result # ← 缓存,下次直接从 globals 取
return result
好处:from langchain_core.runnables import RunnableLambda 只会加载 base.py,不会加载 branch.py、configurable.py 等不需要的模块。
Async "降级"实现
当子类没有覆盖 ainvoke 时,默认实现把 sync 版本扔到线程池(base.py:865):
# base.py:844-865
async def ainvoke(self, input, config=None, **kwargs):
return await run_in_executor(config, self.invoke, input, config, **kwargs)
run_in_executor(config.py:598)使用 asyncio.get_running_loop().run_in_executor:
# config.py:598-632
async def run_in_executor(executor_or_config, func, *args, **kwargs):
def wrapper():
try:
return func(*args, **kwargs)
except StopIteration as exc:
# StopIteration 不能设置到 asyncio.Future 上
raise RuntimeError from exc
if executor_or_config is None or isinstance(executor_or_config, dict):
return await asyncio.get_running_loop().run_in_executor(
None,
partial(copy_context().run, wrapper), # ← 复制 ContextVar
)
return await asyncio.get_running_loop().run_in_executor(executor_or_config, wrapper)
这意味着:每个 Runnable 自动获得 async 支持——即使只实现了 sync 版本。但这不是"真正的 async"——它只是在线程池中运行 sync 代码。如果需要真正的 async(例如 async HTTP 调用),子类应该覆盖 ainvoke。
5 个架构设计原则
1. 统一接口(Protocol Conformance)
所有组件都实现同一个 Runnable[Input, Output] 接口:
class Runnable(ABC, Generic[Input, Output]):
@abstractmethod
def invoke(self, input: Input, config=None) -> Output: ...
def stream(self, input: Input, config=None) -> Iterator[Output]: ...
def batch(self, inputs: list[Input], config=None) -> list[Output]: ...
# + async 变体
这使得 ChatModel、Tool、Prompt、Parser、Retriever 可以自由组合。
2. 装饰器模式(Decorator Pattern)
通过 RunnableBinding 实现 with_* 方法,不修改原 Runnable,而是包装一层:
RunnableBinding(bound=原始Runnable, kwargs={...}, config={...})
└── RunnableRetry(bound=原始Runnable, max_attempt=3)
└── RunnableWithFallbacks(runnable=原始, fallbacks=[备选])
3. 回调骨架模式(Template Method)
_call_with_config 和 _transform_stream_with_config 提供统一的回调管理骨架,子类只需提供核心逻辑函数。
4. 惰性求值(Lazy Evaluation)
RunnableSequence.stream 通过迭代器链实现惰性求值,避免中间数据全量缓存。同样,模块导入通过 __getattr__ 实现惰性加载。
5. 渐进式 Async(Gradual Async)
默认通过 run_in_executor 把 sync 变 async,子类可以按需覆盖为真正的 async 实现。不强迫用户写 async 代码,但保留了 async 优化的空间。
依赖方向图
┌────────────┐
│ serializable│
│ (load/) │
└─────┬──────┘
│
┌─────▼──────┐
│ utils │ ConfigurableField, AddableDict,
│ │ accepts_config, gather_with_concurrency
└─────┬──────┘
│
┌─────▼──────┐
│ config │ RunnableConfig, ensure_config,
│ │ ContextThreadPoolExecutor, merge_configs
└─────┬──────┘
│
┌─────▼──────┐
│ base │ Runnable, RunnableSerializable,
│ (6257行) │ Sequence, Parallel, Lambda,
│ │ Binding, Generator
└─────┬──────┘
│
┌───────┬───────┼───────┬──────────┐
│ │ │ │ │
┌────▼──┐ ┌──▼───┐ ┌▼────┐ ┌▼───────┐ ┌▼──────────┐
│branch │ │retry │ │fall-│ │pass- │ │configurable│
│ │ │ │ │backs│ │through │ │ │
└───────┘ └──────┘ └─────┘ └────────┘ └────────────┘
依赖方向严格单向:utils ← config ← base ← 各模块。没有循环依赖(需要时用局部 import)。
小结
4 篇文章走完了 LangChain Runnable 体系的全貌:
| 篇 | 主题 | 核心收获 |
|---|---|---|
| 第 1 篇 | 5 分钟上手 LCEL | invoke/stream/batch + | 操作符 + coerce_to_runnable |
| 第 2 篇 | 流式与并发 | 迭代器懒求值链 + _transform_stream_with_config + ContextThreadPoolExecutor |
| 第 3 篇 | 8 种组合模式 | Branch/Passthrough/Assign/Pick/Fallbacks/Retry/Binding/Configurable |
| 第 4 篇 | 源码架构 | 继承体系 + 回调骨架 + 懒加载 + async 降级 + 5 大设计原则 |
Runnable 是 LangChain 中最核心的抽象——13712 行代码,16 个文件,但核心思想只有一个:统一接口 + 自由组合。理解了 Runnable,就理解了 LangChain 的骨架。
更多推荐



所有评论(0)