本系列共 4 部分,从日常使用到源码架构,完整拆解 LangChain 的 Runnable 体系与 LCEL。

  • 第 1 部分:5 分钟上手 LCEL(本文)
  • 第 2 部分:流式与并发——数据如何流动
  • 第 3 部分:组合模式全解
  • 第 4 部分:源码架构与设计哲学

LangChain Runnables 深度解析(一):5 分钟上手 LCEL

一个能跑的例子

from langchain_core.runnables import RunnableLambda

def add_one(x: int) -> int:
    return x + 1

def mul_two(x: int) -> int:
    return x * 2

chain = RunnableLambda(add_one) | RunnableLambda(mul_two)

# 三种调用方式
print(chain.invoke(1))        # 4    — (1+1)*2
print(chain.batch([1, 2, 3])) # [4, 6, 8]

for chunk in chain.stream(1):
    print(chunk)               # 4   — RunnableLambda 不分块,一次性输出

invoke 处理单个输入,batch 并行处理多个输入,stream 流式输出。三种方式都自带异步变体(ainvoke/abatch/astream)。


三种调用方式对比

方法 输入 输出 同步 异步
invoke 单个值 单个值 invoke() ainvoke()
batch list[Input] list[Output] batch() abatch()
stream 单个值 Iterator[Output] stream() astream()

三种方式的默认实现关系:

stream 默认实现 → 调用 invoke,yield 一次
batch  默认实现 → 用 ThreadPoolExecutor 并行调用 invoke
ainvoke 默认实现 → run_in_executor(self.invoke)

这意味着:只要你实现了 invoke,其他 5 个方法自动可用。但如果需要真正的流式输出高效批处理,子类应该覆盖对应方法。

看源码验证一下。Runnable.stream 的默认实现(base.py:1130):

# base.py:1130-1149
def stream(
    self,
    input: Input,
    config: RunnableConfig | None = None,
    **kwargs: Any | None,
) -> Iterator[Output]:
    yield self.invoke(input, config, **kwargs)  # ← 就是调用 invoke 然后 yield

ainvoke 的默认实现(base.py:844):

# base.py:844-865
async def ainvoke(
    self,
    input: Input,
    config: RunnableConfig | None = None,
    **kwargs: Any,
) -> Output:
    return await run_in_executor(config, self.invoke, input, config, **kwargs)
    # ← 把 sync invoke 丢到线程池执行

batch 的默认实现(base.py:867):

# base.py:867-915
def batch(self, inputs: list[Input], config=None, *, return_exceptions=False, **kwargs):
    if not inputs:
        return []
    configs = get_config_list(config, len(inputs))

    def invoke(input_: Input, config: RunnableConfig) -> Output | Exception:
        if return_exceptions:
            try:
                return self.invoke(input_, config, **kwargs)
            except Exception as e:
                return e
        else:
            return self.invoke(input_, config, **kwargs)

    # 只有 1 个输入时跳过线程池
    if len(inputs) == 1:
        return [invoke(inputs[0], configs[0])]
    # 多个输入用线程池并行
    with get_executor_for_config(configs[0]) as executor:
        return list(executor.map(invoke, inputs, configs))

管道操作符 | 的本质

| 是 LCEL 最核心的语法糖。当你写 a | b 时,实际调用的是 a.__or__(b)

源码(base.py:618):

# base.py:618-637
def __or__(
    self,
    other: Runnable[Any, Other]
    | Callable[[Any], Other]
    | Mapping[str, ...],
) -> RunnableSerializable[Input, Other]:
    return RunnableSequence(self, coerce_to_runnable(other))

两件事:

  1. other 通过 coerce_to_runnable 强制转换为 Runnable
  2. 创建 RunnableSequence(self, other)

coerce_to_runnablebase.py:6172)是自动类型转换的关键:

# base.py:6172-6196
def coerce_to_runnable(thing: RunnableLike) -> Runnable[Input, Output]:
    if isinstance(thing, Runnable):
        return thing                                    # 已经是 Runnable,直接返回
    if is_async_generator(thing) or inspect.isgeneratorfunction(thing):
        return RunnableGenerator(thing)                 # 生成器 → RunnableGenerator
    if callable(thing):
        return RunnableLambda(cast(..., thing))          # 普通函数 → RunnableLambda
    if isinstance(thing, dict):
        return cast(..., RunnableParallel(thing))        # dict → RunnableParallel
    raise TypeError(f"Expected a Runnable, callable or dict. Got: {type(thing)}")

所以你可以直接用普通函数、dict 来组合链,不需要手动包装:

# 这三种写法等价
chain = RunnableLambda(add_one) | RunnableLambda(mul_two)
chain = RunnableLambda(add_one) | mul_two                  # 函数自动转 RunnableLambda
chain = RunnableLambda(add_one) | {"doubled": mul_two}     # dict 自动转 RunnableParallel

RunnableSequence 的结构

RunnableSequence 内部将步骤拆分为 firstmiddle(列表)、last 三部分(base.py:2900):

# base.py:2897-2905
# The steps are broken into first, middle and last, solely for type checking
class RunnableSequence(RunnableSerializable[Input, Output]):
    first: Runnable[Input, Any]
    middle: list[Runnable[Any, Any]] = Field(default_factory=list)
    last: Runnable[Any, Output]

构造时会自动展平嵌套的 RunnableSequencebase.py:2927):

# base.py:2927-2946
def __init__(self, *steps, name=None, first=None, middle=None, last=None):
    steps_flat: list[Runnable] = []
    for step in steps:
        if isinstance(step, RunnableSequence):
            steps_flat.extend(step.steps)     # ← 展平!
        else:
            steps_flat.append(coerce_to_runnable(step))
    super().__init__(first=steps_flat[0], middle=list(steps_flat[1:-1]), last=steps_flat[-1])

这意味着 (a | b) | (c | d)a | b | c | d 产生的结构完全一样——都是一个 4 步的扁平序列。


一个真实的 LLM 链

不需要 API key 也能理解结构:

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableLambda

# 模拟一个 ChatModel(实际用 ChatOpenAI / ChatAnthropic 替换)
def fake_model(messages):
    from langchain_core.messages import AIMessage
    return AIMessage(content=f"Echo: {messages[-1].content}")

prompt = ChatPromptTemplate.from_template("翻译成英文:{text}")
model = RunnableLambda(fake_model)
parser = StrOutputParser()

chain = prompt | model | parser

result = chain.invoke({"text": "你好世界"})
print(result)  # "Echo: 翻译成英文:你好世界"
print(type(chain))  # <class 'langchain_core.runnables.base.RunnableSequence'>
print(len(chain.steps))  # 3

数据流:

{"text": "你好世界"}
        │
        ▼
┌──────────────────┐
│  ChatPromptTemplate  │  → ChatPromptValue (包含 messages)
└──────────────────┘
        │
        ▼
┌──────────────────┐
│  fake_model (LLM)    │  → AIMessage
└──────────────────┘
        │
        ▼
┌──────────────────┐
│  StrOutputParser     │  → str
└──────────────────┘
        │
        ▼
  "Echo: 翻译成英文:你好世界"

RunnableConfig 简介

每个 invoke/stream/batch 都接受一个可选的 config 参数。RunnableConfig 是一个 TypedDictconfig.py:51):

# config.py:51-123
class RunnableConfig(TypedDict, total=False):
    tags: list[str]              # 标签,用于过滤追踪
    metadata: dict[str, Any]     # 元数据,传给回调
    callbacks: Callbacks          # 回调处理器
    run_name: str                # 追踪 run 的名称
    max_concurrency: int | None  # 最大并行数
    recursion_limit: int         # 递归深度限制(默认 25)
    configurable: dict[str, Any] # 运行时动态配置
    run_id: uuid.UUID | None     # 唯一标识

使用示例:

chain.invoke(
    {"text": "hello"},
    config={
        "tags": ["production", "v2"],
        "metadata": {"user_id": "u123"},
        "max_concurrency": 5,
        "run_name": "翻译链",
    }
)

类型推断

每个 Runnable 都能自动推断输入/输出的 JSON Schema:

from langchain_core.runnables import RunnableLambda

def add_one(x: int) -> int:
    return x + 1

r = RunnableLambda(add_one)
print(r.input_schema.model_json_schema())
# {'properties': {'__root__': {'title': '__Root__', 'type': 'integer'}}, ...}

print(r.output_schema.model_json_schema())
# {'properties': {'__root__': {'title': '__Root__', 'type': 'integer'}}, ...}

对于 RunnableSequence,输入 schema 取第一步的,输出 schema 取最后一步的。对于 RunnableParallel,输出 schema 是所有分支输出的并集。


小结

这篇覆盖了 LCEL 的核心用法:

  1. invoke/stream/batch 三种调用方式及其默认实现关系
  2. | 操作符 = __or__RunnableSequence + coerce_to_runnable 自动转换
  3. RunnableSequence 的扁平化结构
  4. RunnableConfig 的 8 个配置字段
  5. 类型推断(input_schema/output_schema

但还有一些关键问题没回答:streamRunnableSequence 中到底是怎么串联的?batch 的线程池怎么工作?RunnableParallel 怎么实现真正的并行? 这些是下一篇的主题。


LangChain Runnables 深度解析(二):流式与并发——数据如何流动

一个能跑的例子

import time
from langchain_core.runnables import RunnableLambda, RunnableParallel

def slow_upper(x: str) -> str:
    time.sleep(1)
    return x.upper()

def slow_reverse(x: str) -> str:
    time.sleep(1)
    return x[::-1]

# 串行执行需要 2 秒
start = time.time()
r1 = slow_upper("hello")
r2 = slow_reverse("hello")
print(f"串行: {time.time() - start:.1f}s")  # ~2.0s

# RunnableParallel 并行执行只需 ~1 秒
parallel = RunnableParallel(upper=slow_upper, reverse=slow_reverse)
start = time.time()
result = parallel.invoke("hello")
print(f"并行: {time.time() - start:.1f}s")  # ~1.0s
print(result)  # {'upper': 'HELLO', 'reverse': 'olleh'}

RunnableParallel 通过线程池实现了真正的并行执行。接下来看它是怎么做到的。


RunnableSequence.invoke 的执行路径

先回顾最简单的情况——序列执行。RunnableSequence.invokebase.py:3127):

# base.py:3127-3160
def invoke(self, input: Input, config: RunnableConfig | None = None, **kwargs):
    config = ensure_config(config)
    callback_manager = get_callback_manager_for_config(config)
    # 1. 启动根 run
    run_manager = callback_manager.on_chain_start(
        None, input,
        name=config.get("run_name") or self.get_name(),
        run_id=config.pop("run_id", None),
    )
    input_ = input

    try:
        for i, step in enumerate(self.steps):
            # 2. 每一步创建子 run
            config = patch_config(
                config, callbacks=run_manager.get_child(f"seq:step:{i + 1}")
            )
            with set_config_context(config) as context:
                # 3. 输出传给下一步作为输入
                if i == 0:
                    input_ = context.run(step.invoke, input_, config, **kwargs)
                else:
                    input_ = context.run(step.invoke, input_, config)
    except BaseException as e:
        run_manager.on_chain_error(e)
        raise
    else:
        run_manager.on_chain_end(input_)
        return cast("Output", input_)

执行流程图:

invoke(input, config)
    │
    ├─ ensure_config(config)         ← 填充默认值,合并 ContextVar
    ├─ on_chain_start(...)           ← 启动根回调
    │
    ├─ step[0].invoke(input_, config)  ← 子 run: "seq:step:1"
    │     └─ output_0
    │
    ├─ step[1].invoke(output_0, config) ← 子 run: "seq:step:2"
    │     └─ output_1
    │
    ├─ step[2].invoke(output_1, config) ← 子 run: "seq:step:3"
    │     └─ output_2
    │
    └─ on_chain_end(output_2)        ← 结束根回调

关键点:每一步的输出直接作为下一步的输入,kwargs 只传给第一步。


stream 的真相:迭代器串联

RunnableSequence.stream 不是简单地调用每步的 invoke——它把整个管道变成一条迭代器链

stream 方法(base.py:3524)委托给 transform

# base.py:3524-3530
def stream(self, input, config=None, **kwargs):
    yield from self.transform(iter([input]), config, **kwargs)

transform 委托给 _transform_stream_with_config,核心逻辑在 _transformbase.py:3461):

# base.py:3461-3482
def _transform(self, inputs: Iterator[Input], run_manager, config, **kwargs):
    steps = [self.first, *self.middle, self.last]
    # 把每一步的 transform 串联起来——惰性求值链!
    final_pipeline = cast("Iterator[Output]", inputs)
    for idx, step in enumerate(steps):
        config = patch_config(
            config, callbacks=run_manager.get_child(f"seq:step:{idx + 1}")
        )
        if idx == 0:
            final_pipeline = step.transform(final_pipeline, config, **kwargs)
        else:
            final_pipeline = step.transform(final_pipeline, config)

    yield from final_pipeline  # ← 消费最终迭代器时,整条链才开始执行

迭代器串联示意:

iter([input])
    │
    ▼
step[0].transform(...)  →  Iterator[A]   ← 惰性,不立即执行
    │
    ▼
step[1].transform(...)  →  Iterator[B]   ← 惰性
    │
    ▼
step[2].transform(...)  →  Iterator[C]   ← 惰性
    │
    ▼
yield from final_pipeline               ← 此时才拉动整条链!

这就是"懒求值链"——所有 transform 调用只是建立了迭代器管道,真正的数据流动要等到最终消费时才发生。如果中间某个 step 不支持 transform(如 RunnableLambda),它会缓冲所有输入,执行完毕后再一次性输出。


_transform_stream_with_config:流式回调骨架

每个 Runnable 的 transform 方法最终都通过 _transform_stream_with_configbase.py:2261)来执行,它是流式处理的统一骨架:

# base.py:2261-2357(简化版)
def _transform_stream_with_config(self, inputs, transformer, config, **kwargs):
    # 1. tee:分叉输入流——一份给 transformer,一份给追踪
    input_for_tracing, input_for_transform = tee(inputs, 2)
    # 提前拉一个 chunk 确保上游启动
    final_input = next(input_for_tracing, None)

    final_output = None

    # 2. 启动回调
    config = ensure_config(config)
    callback_manager = get_callback_manager_for_config(config)
    run_manager = callback_manager.on_chain_start(...)

    try:
        child_config = patch_config(config, callbacks=run_manager.get_child())
        with set_config_context(child_config) as context:
            # 3. 调用实际的 transformer
            iterator = context.run(transformer, input_for_transform, **kwargs)

            # 4. 如果有 StreamingCallbackHandler,tap 输出流
            if stream_handler := ...:
                iterator = stream_handler.tap_output_iter(run_manager.run_id, iterator)

            # 5. 逐 chunk yield,同时累积 final_output
            while True:
                chunk = context.run(next, iterator)
                yield chunk
                if final_output is None:
                    final_output = chunk
                else:
                    try:
                        final_output = final_output + chunk  # ← 用 + 拼接
                    except TypeError:
                        final_output = chunk

        # 6. 累积 final_input(给追踪用)
        for ichunk in input_for_tracing:
            final_input = final_input + ichunk  # 同样用 + 拼接

    except BaseException as e:
        run_manager.on_chain_error(e, inputs=final_input)
        raise
    else:
        run_manager.on_chain_end(final_output, inputs=final_input)

这段代码做了 3 件事:

  1. tee 分叉:输入流分成两份——一份给实际处理,一份给追踪记录
  2. 逐 chunk yield:每个 chunk 一边 yield 给下游,一边用 + 累积为 final_output
  3. 回调通知:处理完毕后把累积的 final_inputfinal_output 传给 on_chain_end

final_output = final_output + chunk 这行很关键——LangChain 的流式 chunk 都实现了 __add__(如 AIMessageChunkAddableDict),通过 + 拼接成完整结果。


batch 并行机制

sync batch 使用 ContextThreadPoolExecutorconfig.py:527),async abatch 使用 asyncio.gather

sync batch

上面第一篇已经看过 Runnable.batch 的默认实现。关键在于 ContextThreadPoolExecutor

# config.py:527-576
class ContextThreadPoolExecutor(ThreadPoolExecutor):
    """ThreadPoolExecutor that copies the context to the child thread."""

    def submit(self, func, *args, **kwargs):
        # 关键:copy_context() 把当前线程的 ContextVar 复制到子线程
        return super().submit(
            partial(copy_context().run, func, *args, **kwargs)
        )

    def map(self, fn, *iterables, **kwargs):
        contexts = [copy_context() for _ in range(len(iterables[0]))]
        def _wrapped_fn(*args):
            return contexts.pop().run(fn, *args)
        return super().map(_wrapped_fn, *iterables, **kwargs)

它通过 copy_context() 确保 ContextVar(包括 var_child_runnable_config)在子线程中也能访问到——否则子线程里 ensure_config 拿不到父 config。

get_executor_for_configconfig.py:579)根据 max_concurrency 创建线程池:

# config.py:579-595
@contextmanager
def get_executor_for_config(config):
    config = config or {}
    with ContextThreadPoolExecutor(
        max_workers=config.get("max_concurrency")  # ← 控制并行度
    ) as executor:
        yield executor

async batch

Runnable.abatchbase.py:1002)使用 gather_with_concurrencyutils.py:63):

# utils.py:63-78
async def gather_with_concurrency(n: int | None, *coros) -> list:
    if n is None:
        return await asyncio.gather(*coros)       # 无限制
    semaphore = asyncio.Semaphore(n)
    return await asyncio.gather(*(gated_coro(semaphore, c) for c in coros))

async def gated_coro(semaphore, coro):
    async with semaphore:                         # ← 信号量限制并发
        return await coro

max_concurrency 的作用:

  • syncThreadPoolExecutor(max_workers=max_concurrency) — 限制线程数
  • asyncasyncio.Semaphore(max_concurrency) — 限制并发协程数

RunnableParallel.invoke 的实现

RunnableParallelbase.py:3830)用线程池并行执行所有分支:

# base.py:3830-3887(简化版)
def invoke(self, input, config=None, **kwargs):
    config = ensure_config(config)
    callback_manager = CallbackManager.configure(...)
    run_manager = callback_manager.on_chain_start(...)

    def _invoke_step(step, input_, config, key):
        child_config = patch_config(
            config, callbacks=run_manager.get_child(f"map:key:{key}")
        )
        with set_config_context(child_config) as context:
            return context.run(step.invoke, input_, child_config)

    try:
        steps = dict(self.steps__)
        with get_executor_for_config(config) as executor:
            # 每个分支提交到线程池
            futures = [
                executor.submit(_invoke_step, step, input, config, key)
                for key, step in steps.items()
            ]
            # 收集结果
            output = {
                key: future.result()
                for key, future in zip(steps, futures)
            }
    except BaseException as e:
        run_manager.on_chain_error(e)
        raise
    else:
        run_manager.on_chain_end(output)
        return output

异步版本用 asyncio.gatherbase.py:3923):

# base.py:3919-3935
results = await asyncio.gather(
    *(
        _ainvoke_step(step, input, config, key)
        for key, step in steps.items()
    )
)
output = dict(zip(steps, results))

Config 传播机制

Config 在 Runnable 链中的传播通过 ContextVar 实现(config.py:146):

# config.py:146-148
var_child_runnable_config: ContextVar[RunnableConfig | None] = ContextVar(
    "child_runnable_config", default=None
)

传播流程:

chain.invoke(input, config={"tags": ["prod"]})
    │
    ├─ ensure_config(config)
    │     └─ 读取 var_child_runnable_config,合并用户传入的 config
    │
    ├─ _call_with_config(...)
    │     ├─ child_config = patch_config(config, callbacks=run_manager.get_child())
    │     └─ set_config_context(child_config)
    │           └─ var_child_runnable_config.set(child_config)  ← 写入 ContextVar
    │
    └─ 子 Runnable.invoke(input, config)
          └─ ensure_config(config)
                └─ 读取 var_child_runnable_config  ← 拿到父级 config

ensure_configconfig.py:216)的合并逻辑:

# config.py:216-266
def ensure_config(config: RunnableConfig | None = None) -> RunnableConfig:
    # 1. 创建空 config(默认值)
    empty = RunnableConfig(tags=[], metadata={}, callbacks=None,
                           recursion_limit=25, configurable={})
    # 2. 先合并 ContextVar 中的父级 config
    if var_config := var_child_runnable_config.get():
        empty.update({k: v.copy() if k in COPIABLE_KEYS else v
                      for k, v in var_config.items() if v is not None})
    # 3. 再合并显式传入的 config(优先级更高)
    if config is not None:
        empty.update({k: v.copy() if k in COPIABLE_KEYS else v
                      for k, v in config.items() if v is not None and k in CONFIG_KEYS})
    return empty

merge_configsconfig.py:357)对不同字段有不同的合并策略:

字段 合并策略
tags 去重并排序合并
metadata dict 浅合并(后者覆盖前者)
configurable dict 浅合并
callbacks 合并 handler 列表或 CallbackManager
recursion_limit 非默认值覆盖
其他(run_name, run_id 后者覆盖前者

_call_with_config:回调骨架

_call_with_configbase.py:2027)是所有 Runnable 子类实现 invoke 时使用的统一骨架:

# base.py:2027-2074
def _call_with_config(self, func, input_, config, run_type=None, **kwargs):
    config = ensure_config(config)
    callback_manager = get_callback_manager_for_config(config)

    # 1. on_chain_start → 创建 run_manager
    run_manager = callback_manager.on_chain_start(
        serialized, input_,
        run_type=run_type,
        name=config.get("run_name") or self.get_name(),
        run_id=config.pop("run_id", None),
    )

    try:
        # 2. 创建子 config(给内部调用的 Runnable 使用)
        child_config = patch_config(config, callbacks=run_manager.get_child())
        with set_config_context(child_config) as context:
            # 3. 在正确的 context 中执行用户函数
            output = context.run(
                call_func_with_variable_args,
                func, input_, config, run_manager, **kwargs
            )
    except BaseException as e:
        run_manager.on_chain_error(e)  # 4a. 失败回调
        raise
    else:
        run_manager.on_chain_end(output)  # 4b. 成功回调
        return output

call_func_with_variable_argsconfig.py:423)会检查函数签名,按需传入 configrun_manager

# config.py:423-452
def call_func_with_variable_args(func, input, config, run_manager=None, **kwargs):
    if accepts_config(func):
        kwargs["config"] = patch_config(config, callbacks=run_manager.get_child())
    if run_manager is not None and accepts_run_manager(func):
        kwargs["run_manager"] = run_manager
    return func(input, **kwargs)

这就是为什么你的 RunnableLambda 函数可以选择性地接受 config 参数:

# 这两种函数签名都能用
def simple(x: int) -> int:
    return x + 1

def with_config(x: int, config: RunnableConfig) -> int:
    print(config["tags"])
    return x + 1

小结

这篇深入了数据流动的三条路径:

  1. invokeRunnableSequence 逐步传递输出→输入,通过 _call_with_config 统一管理回调
  2. stream:迭代器懒求值链(step.transform 串联),通过 _transform_stream_with_config 实现 tee 分叉 + chunk 累积
  3. batch:sync 用 ContextThreadPoolExecutor,async 用 asyncio.gather + Semaphore
  4. Config 传播ContextVar + ensure_config 自动继承父级 config

但 Runnable 的组合能力远不止序列和并行。条件分支怎么实现?透传和增强是什么模式?降级容错和自动重试怎么集成? 下一篇全面解析 8 种组合模式。


LangChain Runnables 深度解析(三):组合模式全解

一个能跑的例子

from langchain_core.runnables import RunnableBranch, RunnableLambda

branch = RunnableBranch(
    (lambda x: isinstance(x, str), lambda x: x.upper()),
    (lambda x: isinstance(x, int), lambda x: x + 1),
    lambda x: "default",  # 默认分支
)

print(branch.invoke("hello"))  # "HELLO"
print(branch.invoke(42))       # 43
print(branch.invoke(None))     # "default"

RunnableBranchif/elif/else——逐个检查条件,匹配就执行对应分支。接下来完整介绍 LangChain 的 8 种组合模式。


1. RunnableBranch(条件路由)

RunnableBranchbranch.py:42)接受 (condition, runnable) 元组列表和一个默认分支:

# branch.py:69-72
class RunnableBranch(RunnableSerializable[Input, Output]):
    branches: Sequence[tuple[Runnable[Input, bool], Runnable[Input, Output]]]
    default: Runnable[Input, Output]

构造时自动将条件和分支 coerce_to_runnablebranch.py:113-132):

# branch.py:113-132
for branch in branches[:-1]:
    condition, runnable = branch
    condition = coerce_to_runnable(condition)   # lambda → RunnableLambda
    runnable = coerce_to_runnable(runnable)     # lambda → RunnableLambda
    branches_.append((condition, runnable))

invoke 实现(branch.py:189):

# branch.py:189-245(简化版)
def invoke(self, input, config=None, **kwargs):
    config = ensure_config(config)
    callback_manager = get_callback_manager_for_config(config)
    run_manager = callback_manager.on_chain_start(...)

    try:
        for idx, branch in enumerate(self.branches):
            condition, runnable = branch
            # 执行条件检查(子 run: "condition:1")
            expression_value = condition.invoke(
                input,
                config=patch_config(config, callbacks=run_manager.get_child(
                    tag=f"condition:{idx + 1}"
                )),
            )
            if expression_value:
                # 条件为真,执行对应分支(子 run: "branch:1")
                output = runnable.invoke(
                    input,
                    config=patch_config(config, callbacks=run_manager.get_child(
                        tag=f"branch:{idx + 1}"
                    )),
                    **kwargs,
                )
                break
        else:
            # 所有条件都不满足,执行默认分支
            output = self.default.invoke(
                input,
                config=patch_config(config, callbacks=run_manager.get_child(
                    tag="branch:default"
                )),
                **kwargs,
            )
    except BaseException as e:
        run_manager.on_chain_error(e)
        raise
    run_manager.on_chain_end(output)
    return output

执行流程:

invoke(input)
    │
    ├─ condition_1.invoke(input) → False
    ├─ condition_2.invoke(input) → True ← 命中!
    │     └─ branch_2.invoke(input) → output
    │
    └─ on_chain_end(output)

2. RunnablePassthrough / RunnableAssign / RunnablePick

这三个类(passthrough.py)是 RAG 模式的基石。

RunnablePassthrough(透传)

RunnablePassthroughpassthrough.py:74)把输入原样传递到输出,可选地执行一个副作用函数:

from langchain_core.runnables import RunnablePassthrough, RunnableParallel

# 基本用法:原样透传
chain = RunnablePassthrough()
print(chain.invoke(42))  # 42

# 配合 RunnableParallel 实现"透传 + 增强"
chain = RunnableParallel(
    original=RunnablePassthrough(),
    doubled=lambda x: x * 2,
)
print(chain.invoke(5))  # {'original': 5, 'doubled': 10}

核心实现非常简单——invoke 调用 identity 函数(passthrough.py:226):

# passthrough.py:50-59
def identity(x: Other) -> Other:
    return x

# passthrough.py:226-233
def invoke(self, input, config=None, **kwargs):
    if self.func is not None:
        call_func_with_variable_args(self.func, input, ensure_config(config), **kwargs)
    return self._call_with_config(identity, input, config)

RunnablePassthrough.assign(增强)

assign 是 RAG 场景最常用的模式——在 dict 输入上"追加"新字段:

from langchain_core.runnables import RunnablePassthrough

chain = RunnablePassthrough.assign(
    length=lambda x: len(x["text"]),
    upper=lambda x: x["text"].upper(),
)

print(chain.invoke({"text": "hello"}))
# {'text': 'hello', 'length': 5, 'upper': 'HELLO'}

assign 返回 RunnableAssignpassthrough.py:352),它内部用 RunnableParallel 处理新字段,然后 {**原始输入, **新字段} 合并:

# passthrough.py:207-223
@classmethod
def assign(cls, **kwargs) -> RunnableAssign:
    return RunnableAssign(RunnableParallel(kwargs))

# passthrough.py:480-498
def _invoke(self, value, run_manager, config, **kwargs):
    if not isinstance(value, dict):
        raise ValueError("The input to RunnablePassthrough.assign() must be a dict.")
    return {
        **value,                            # ← 保留原始字段
        **self.mapper.invoke(value, ...),   # ← 添加新字段
    }

RunnablePick(提取)

RunnablePickpassthrough.py:671)从 dict 中提取指定的 key:

from langchain_core.runnables.passthrough import RunnablePick

data = {"name": "Alice", "age": 30, "city": "Beijing"}

pick_one = RunnablePick(keys="name")
print(pick_one.invoke(data))  # "Alice"(单 key 返回值本身)

pick_multi = RunnablePick(keys=["name", "age"])
print(pick_multi.invoke(data))  # {'name': 'Alice', 'age': 30}(多 key 返回 dict)

RAG 经典模式

三者组合的经典 RAG 模式:

from langchain_core.runnables import RunnablePassthrough, RunnableParallel

# 伪代码:展示 RAG 管道结构
chain = (
    RunnableParallel(
        context=retriever,                        # 检索上下文
        question=RunnablePassthrough(),           # 透传用户问题
    )
    | prompt                                       # 拼接 prompt
    | model                                        # 调用 LLM
    | parser                                       # 解析输出
)

3. RunnableWithFallbacks(降级容错)

RunnableWithFallbacksfallbacks.py:36)在主 Runnable 失败时自动切换到备选方案:

from langchain_core.runnables import RunnableLambda

def unstable(x):
    raise ValueError("primary failed!")

def backup(x):
    return f"backup: {x}"

chain = RunnableLambda(unstable).with_fallbacks(
    [RunnableLambda(backup)]
)
print(chain.invoke("hello"))  # "backup: hello"

核心字段(fallbacks.py:88-104):

# fallbacks.py:88-104
class RunnableWithFallbacks(RunnableSerializable[Input, Output]):
    runnable: Runnable[Input, Output]                         # 主 Runnable
    fallbacks: Sequence[Runnable[Input, Output]]             # 备选方案列表
    exceptions_to_handle: tuple[type[BaseException], ...] = (Exception,)  # 捕获哪些异常
    exception_key: str | None = None                         # 传递异常给 fallback

invoke 的核心逻辑(fallbacks.py:166):

# fallbacks.py:166-213(简化版)
def invoke(self, input, config=None, **kwargs):
    first_error = None
    last_error = None
    for runnable in self.runnables:   # ← 遍历 [主, fallback1, fallback2, ...]
        try:
            if self.exception_key and last_error is not None:
                input[self.exception_key] = last_error  # 把错误传给下一个
            output = runnable.invoke(input, config, **kwargs)
        except self.exceptions_to_handle as e:
            if first_error is None:
                first_error = e
            last_error = e
        else:
            return output              # ← 成功就返回
    raise first_error                  # ← 全部失败,抛出第一个错误

@property
def runnables(self):
    yield self.runnable
    yield from self.fallbacks

注意 __getattr__ 的巧妙设计(fallbacks.py:593)——当你在 RunnableWithFallbacks 上调用 bind_tools 等方法时,它会自动在主 Runnable 和所有 fallback 上都调用:

# fallbacks.py:625-646 —— bind_tools 等方法自动传播到所有 fallback
model = gpt_4o.with_fallbacks([claude_sonnet])
model.bind_tools([...])  # 等价于:
# gpt_4o.bind_tools([...]).with_fallbacks([claude_sonnet.bind_tools([...])])

4. RunnableRetry(自动重试)

RunnableRetryretry.py:48)基于 tenacity 库实现自动重试:

from langchain_core.runnables import RunnableLambda

call_count = 0

def flaky(x):
    global call_count
    call_count += 1
    if call_count < 3:
        raise ConnectionError("network error")
    return f"success: {x}"

chain = RunnableLambda(flaky).with_retry(
    retry_if_exception_type=(ConnectionError,),
    stop_after_attempt=3,
    wait_exponential_jitter=True,
)
print(chain.invoke("hello"))  # "success: hello"(第 3 次成功)

RunnableRetry 继承自 RunnableBindingBase(装饰器模式),核心配置:

# retry.py:48-132
class RunnableRetry(RunnableBindingBase[Input, Output]):
    retry_exception_types: tuple[type[BaseException], ...] = (Exception,)
    wait_exponential_jitter: bool = True
    exponential_jitter_params: ExponentialJitterParams | None = None
    max_attempt_number: int = 3

_invoke 实现(retry.py:179)——直接使用 tenacity 的 Retrying

# retry.py:179-195
def _invoke(self, input_, run_manager, config, **kwargs):
    for attempt in self._sync_retrying(reraise=True):
        with attempt:
            result = super().invoke(
                input_,
                self._patch_config(config, run_manager, attempt.retry_state),
                **kwargs,
            )
        if attempt.retry_state.outcome and not attempt.retry_state.outcome.failed:
            attempt.retry_state.set_result(result)
    return result

每次重试都会创建新的子 run(tag 为 retry:attempt:N),方便追踪。

注意:streamtransform 不支持重试retry.py:378),因为重试一个流不太直观。


5. RunnableBinding(装饰器模式)

RunnableBindingbase.py:5928)是所有 with_* 方法的基础——它包裹一个 Runnable,附加额外的 kwargs、config 或行为:

from langchain_core.runnables import RunnableLambda

def greet(x, greeting="Hello"):
    return f"{greeting}, {x}!"

chain = RunnableLambda(greet).bind(greeting="Hi")
print(chain.invoke("Alice"))  # "Hi, Alice!"

RunnableBindingBase 的核心字段(base.py:5536):

# base.py:5536-5554
class RunnableBindingBase(RunnableSerializable[Input, Output]):
    bound: Runnable[Input, Output]             # 被包裹的 Runnable
    kwargs: Mapping[str, Any]                  # 绑定的 kwargs
    config: RunnableConfig                     # 绑定的 config
    config_factories: list[Callable]           # config 工厂函数
    custom_input_type: Any | None = None       # 覆盖输入类型
    custom_output_type: Any | None = None      # 覆盖输出类型

所有 with_* 方法都返回新的 RunnableBinding 实例:

方法 作用 返回类型
bind(**kwargs) 绑定 kwargs RunnableBinding
with_config(config) 绑定 config RunnableBinding
with_retry(...) 添加重试 RunnableRetry(子类)
with_fallbacks(...) 添加降级 RunnableWithFallbacks
with_listeners(...) 添加生命周期监听 RunnableBinding
with_types(...) 覆盖类型标注 RunnableBinding

RunnableBinding.bind 的实现(base.py:5981)——合并 kwargs:

# base.py:5981-5999
def bind(self, **kwargs):
    return self.__class__(
        bound=self.bound,
        config=self.config,
        kwargs={**self.kwargs, **kwargs},  # ← 新旧 kwargs 合并
    )

6. Configurable(运行时动态配置)

configurable_fieldsconfigurable_alternatives 允许在运行时动态修改 Runnable 的参数或切换实现。

configurable_fields(修改参数)

from langchain_core.runnables import RunnableLambda, ConfigurableField

def greet(x, *, greeting="Hello"):
    return f"{greeting}, {x}!"

# 注意:实际使用通常在 ChatModel 上,这里用 RunnableLambda 仅做概念演示
# 真实用法:model = ChatOpenAI(temperature=0).configurable_fields(
#     temperature=ConfigurableField(id="temp", name="Temperature")
# )
# model.with_config(configurable={"temp": 0.9}).invoke(...)

configurable_fieldsbase.py:2610)返回 RunnableConfigurableFieldsconfigurable.py:318):

# base.py:2652-2666
def configurable_fields(self, **kwargs):
    from langchain_core.runnables.configurable import RunnableConfigurableFields
    return RunnableConfigurableFields(default=self, fields=kwargs)

RunnableConfigurableFields._prepareconfigurable.py:420)在调用时根据 config["configurable"] 动态重建 Runnable:

# configurable.py:420-459(简化版)
def _prepare(self, config=None):
    config = ensure_config(config)
    configurable_fields = {
        specs_by_id[k][0]: v
        for k, v in config.get("configurable", {}).items()
        if k in specs_by_id
    }
    if configurable_fields:
        # 用新参数重建 Runnable!
        return (self.default.__class__(**{**init_params, **configurable_fields}), config)
    return (self.default, config)

configurable_alternatives(切换实现)

# 概念示例(需要实际模型包)
# prompt = PromptTemplate.from_template("Tell me about {topic}").configurable_alternatives(
#     ConfigurableField(id="prompt"),
#     default_key="joke",
#     poem=PromptTemplate.from_template("Write a poem about {topic}"),
# )
# chain = prompt | model
# chain.with_config(configurable={"prompt": "poem"}).invoke({"topic": "cats"})

RunnableConfigurableAlternativesconfigurable.py:475)根据 config["configurable"][which.id] 选择要使用的 Runnable。


7. RunnableEach(map 操作)

Runnable.map() 返回 RunnableEach——把单个 Runnable 映射到列表的每个元素上:

from langchain_core.runnables import RunnableLambda

add_one = RunnableLambda(lambda x: x + 1)
mapped = add_one.map()
print(mapped.invoke([1, 2, 3]))  # [2, 3, 4]

本质上就是对输入列表调用 bound.batch()


8 种组合模式对比

模式 类名 用途 创建方式
序列 RunnableSequence 串联执行 a | b
并行 RunnableParallel 同一输入并行处理 {"k1": a, "k2": b}
条件分支 RunnableBranch if/elif/else 路由 RunnableBranch((cond, run), default)
透传增强 RunnablePassthrough / RunnableAssign 保留原始输入+添加字段 RunnablePassthrough.assign(...)
提取 RunnablePick 从 dict 中取 key RunnablePick("key")
降级容错 RunnableWithFallbacks 主失败→备选 a.with_fallbacks([b])
自动重试 RunnableRetry 异常自动重试 a.with_retry(...)
装饰绑定 RunnableBinding 绑定 kwargs/config a.bind(key=val)

小结

这篇介绍了 LangChain 的 8 种组合模式:

  1. RunnableBranch — 条件路由,逐个检查条件
  2. RunnablePassthrough/Assign/Pick — 透传、增强、提取,RAG 的基石
  3. RunnableWithFallbacks — 降级容错,自动切换备选
  4. RunnableRetry — 基于 tenacity 的自动重试
  5. RunnableBinding — 装饰器模式,所有 with_* 的基础
  6. Configurable — 运行时动态修改参数或切换实现

这些模式都遵循同一个接口——Runnable[Input, Output]——所以可以任意组合嵌套。那么这个接口是怎么设计出来的?16 个文件 13712 行代码是怎么组织的?继承体系是什么样的?最后一篇来看源码架构与设计哲学。


LangChain Runnables 深度解析(四):源码架构与设计哲学

一个能跑的例子

from langchain_core.runnables import Runnable, RunnableSerializable, RunnableLambda
from langchain_core.load.serializable import Serializable

# Runnable 是一切的基类
assert issubclass(RunnableLambda, Runnable)

# RunnableSerializable 同时继承 Serializable 和 Runnable
assert issubclass(RunnableSerializable, Runnable)
assert issubclass(RunnableSerializable, Serializable)

# 继承链
print(RunnableLambda.__mro__)
# (RunnableLambda, Runnable, ABC, Generic, BaseModel, ...)
# 注意:RunnableLambda 直接继承 Runnable,不继承 RunnableSerializable

Runnable 是抽象基类,RunnableSerializable 是加了序列化能力的 Runnable。但不是所有 Runnable 都需要序列化——比如 RunnableLambda 就直接继承 Runnable


完整目录结构

runnables/                    # 13712 行,16 文件
├── __init__.py          136  # 懒加载入口
├── base.py             6257  # 核心:Runnable/Sequence/Parallel/Lambda/Binding/Generator
├── config.py            632  # RunnableConfig + 线程池 + ContextVar
├── utils.py             759  # 工具函数 + ConfigurableField + AddableDict
├── passthrough.py       841  # RunnablePassthrough/Assign/Pick
├── graph.py             739  # DAG 图表示(可视化用)
├── configurable.py      716  # DynamicRunnable + ConfigurableFields/Alternatives
├── fallbacks.py         664  # RunnableWithFallbacks
├── history.py           622  # RunnableWithMessageHistory
├── graph_mermaid.py     498  # Mermaid 图渲染
├── branch.py            461  # RunnableBranch
├── retry.py             379  # RunnableRetry
├── graph_ascii.py       366  # ASCII 图渲染
├── router.py            239  # RouterRunnable(已少用)
├── graph_png.py         215  # PNG 图渲染
└── schema.py            188  # StreamEvent 类型定义

base.py 一个文件占了全部代码的 45%——这是有意为之,把最核心的 7 个类放在一起避免循环依赖。


继承体系

                    BaseModel (Pydantic)
                         │
                    Serializable (load/serializable.py:88)
                    │    │
     ┌──────────────┘    │
     │                   │
     │        Runnable[Input, Output] (ABC, Generic)
     │                   │        (base.py:124)
     │                   │
     │    ┌──────────────┼──────────────────┐
     │    │              │                  │
     │    │    RunnableSerializable     RunnableLambda
     │    │    (base.py:2586)          (base.py:4395)
     │    │    ← Serializable + Runnable
     │    │              │
     │    │    ┌─────────┼────────────────────────┐
     │    │    │         │                        │
     │    │  RunnableSequence  RunnableParallel  RunnableBindingBase
     │    │  (base.py:2813)   (base.py:3561)    (base.py:5526)
     │    │                                       │
     │    │                              ┌────────┼────────┐
     │    │                              │        │        │
     │    │                     RunnableBinding  RunnableRetry  ...
     │    │                     (base.py:5928)  (retry.py:48)
     │    │
     │    RunnableGenerator (base.py:4092)
     │
     ├── RunnableBranch (branch.py:42)
     ├── RunnablePassthrough (passthrough.py:74)
     ├── RunnableAssign (passthrough.py:352)
     ├── RunnablePick (passthrough.py:671)
     ├── RunnableWithFallbacks (fallbacks.py:36)
     └── DynamicRunnable (configurable.py:49)
             ├── RunnableConfigurableFields (configurable.py:318)
             └── RunnableConfigurableAlternatives (configurable.py:475)

关键设计决策:

  • RunnableLambda 继承 RunnableSerializable——因为 lambda 函数没法序列化
  • RunnableBindingBaseRunnableSerializable 的子类——因为 RunnableRetry/RunnableBinding 需要序列化配置
  • RunnableWithFallbacks 继承 RunnableSerializable——因为主 Runnable 和 fallback 列表都需要序列化

外部组件也是 Runnable:

# 以下都是 Runnable 的子类
from langchain_core.language_models.chat_models import BaseChatModel
from langchain_core.tools.base import BaseTool
from langchain_core.prompts import BasePromptTemplate
from langchain_core.output_parsers import BaseOutputParser
from langchain_core.retrievers import BaseRetriever

# BaseChatModel → ... → RunnableSerializable → Serializable + Runnable
# BaseTool → RunnableSerializable → Serializable + Runnable

这就是为什么 prompt | model | parser 能直接用 | 组合——它们都是 Runnable。


base.py 内部组织(6257 行)

base.py 这个巨型文件按区域划分:

行号范围 内容 行数
1-117 imports + TYPE_CHECKING ~117
124-2585 Runnable (抽象基类) ~2461
2586-2812 RunnableSerializable ~226
2813-3560 RunnableSequence ~747
3561-4090 RunnableParallel ~529
4092-4393 RunnableGenerator ~301
4395-5524 RunnableLambda ~1129
5526-5927 RunnableBindingBase ~401
5928-6171 RunnableBinding ~243
6172-6258 coerce_to_runnable + @chain ~86

Runnable 类本身 2461 行,包含了:

  • 核心方法:invoke/ainvoke/batch/abatch/stream/astream
  • 组合方法:__or__/__ror__/pipe
  • 配置方法:bind/with_config/with_retry/with_fallbacks/with_listeners
  • 回调骨架:_call_with_config/_acall_with_config/_transform_stream_with_config
  • schema 方法:input_schema/output_schema/config_schema
  • 事件流:astream_events/astream_log

类型推断机制

LangChain 通过两种方式推断 Runnable 的输入/输出类型:

1. 泛型参数(Generic metadata)

RunnableGeneric[Input, Output]。当子类指定具体类型时,如 RunnableSequence[str, int],通过 __pydantic_generic_metadata__ 获取:

# base.py:200-210(InputType 属性)
@property
def InputType(self) -> type[Input]:
    for cls in type(self).__mro__:
        if hasattr(cls, "__pydantic_generic_metadata__"):
            metadata = cls.__pydantic_generic_metadata__
            if "args" in metadata and len(metadata["args"]) >= _RUNNABLE_GENERIC_NUM_ARGS:
                return metadata["args"][0]  # ← 第一个泛型参数 = Input
    raise TypeError(...)

2. 函数签名检查(RunnableLambda)

RunnableLambda 通过 inspect 和 AST 分析推断类型:

def add_one(x: int) -> int:
    return x + 1

r = RunnableLambda(add_one)
# InputType 来自参数 x 的类型标注 → int
# OutputType 来自返回值类型标注 → int

更巧妙的是,它还能通过 AST 分析 lambda 函数对输入 dict 的 key 访问来推断输入 schema:

# utils.py:365-381
def get_function_first_arg_dict_keys(func) -> list[str] | None:
    code = inspect.getsource(func)
    tree = ast.parse(textwrap.dedent(code))
    visitor = IsFunctionArgDict()
    visitor.visit(tree)
    return sorted(visitor.keys) if visitor.keys else None
r = RunnableLambda(lambda x: x["name"] + x["age"])
print(r.input_schema.model_json_schema())
# 推断出输入需要 "name" 和 "age" 两个 key

回调集成架构

每个 Runnable 的执行都嵌入了回调系统,形成层级结构:

chain.invoke(input, config={"callbacks": [handler]})
    │
    ├─ CallbackManager.configure(config["callbacks"])
    │     └─ 创建根 CallbackManager
    │
    ├─ on_chain_start(input)              ← 根 run
    │     └─ run_manager (CallbackManagerForChainRun)
    │
    ├─ step[0].invoke(...)
    │     ├─ run_manager.get_child("seq:step:1")  ← 子 run
    │     └─ on_chain_start / on_chain_end
    │
    ├─ step[1].invoke(...)
    │     ├─ run_manager.get_child("seq:step:2")  ← 子 run
    │     └─ on_chain_start / on_chain_end
    │
    └─ on_chain_end(output)               ← 根 run 结束

子 run 的 tag 命名规则:

  • RunnableSequenceseq:step:1, seq:step:2, …
  • RunnableParallelmap:key:xxx
  • RunnableBranchcondition:1, branch:1, branch:default
  • RunnableRetryretry:attempt:2, retry:attempt:3, …

懒加载入口

runnables/__init__.py(136 行)使用 __getattr__ 实现懒加载:

# __init__.py:95-132
_dynamic_imports = {
    "Runnable": "base",
    "RunnableSequence": "base",
    "RunnableParallel": "base",
    "RunnableLambda": "base",
    "RunnableBranch": "branch",
    "RunnableConfig": "config",
    # ...
}

def __getattr__(attr_name: str) -> object:
    module_name = _dynamic_imports.get(attr_name)
    result = import_attr(attr_name, module_name, __spec__.parent)
    globals()[attr_name] = result    # ← 缓存,下次直接从 globals 取
    return result

好处:from langchain_core.runnables import RunnableLambda 只会加载 base.py,不会加载 branch.pyconfigurable.py 等不需要的模块。


Async "降级"实现

当子类没有覆盖 ainvoke 时,默认实现把 sync 版本扔到线程池(base.py:865):

# base.py:844-865
async def ainvoke(self, input, config=None, **kwargs):
    return await run_in_executor(config, self.invoke, input, config, **kwargs)

run_in_executorconfig.py:598)使用 asyncio.get_running_loop().run_in_executor

# config.py:598-632
async def run_in_executor(executor_or_config, func, *args, **kwargs):
    def wrapper():
        try:
            return func(*args, **kwargs)
        except StopIteration as exc:
            # StopIteration 不能设置到 asyncio.Future 上
            raise RuntimeError from exc

    if executor_or_config is None or isinstance(executor_or_config, dict):
        return await asyncio.get_running_loop().run_in_executor(
            None,
            partial(copy_context().run, wrapper),  # ← 复制 ContextVar
        )
    return await asyncio.get_running_loop().run_in_executor(executor_or_config, wrapper)

这意味着:每个 Runnable 自动获得 async 支持——即使只实现了 sync 版本。但这不是"真正的 async"——它只是在线程池中运行 sync 代码。如果需要真正的 async(例如 async HTTP 调用),子类应该覆盖 ainvoke


5 个架构设计原则

1. 统一接口(Protocol Conformance)

所有组件都实现同一个 Runnable[Input, Output] 接口:

class Runnable(ABC, Generic[Input, Output]):
    @abstractmethod
    def invoke(self, input: Input, config=None) -> Output: ...
    def stream(self, input: Input, config=None) -> Iterator[Output]: ...
    def batch(self, inputs: list[Input], config=None) -> list[Output]: ...
    # + async 变体

这使得 ChatModel、Tool、Prompt、Parser、Retriever 可以自由组合。

2. 装饰器模式(Decorator Pattern)

通过 RunnableBinding 实现 with_* 方法,不修改原 Runnable,而是包装一层:

RunnableBinding(bound=原始Runnable, kwargs={...}, config={...})
    └── RunnableRetry(bound=原始Runnable, max_attempt=3)
    └── RunnableWithFallbacks(runnable=原始, fallbacks=[备选])

3. 回调骨架模式(Template Method)

_call_with_config_transform_stream_with_config 提供统一的回调管理骨架,子类只需提供核心逻辑函数。

4. 惰性求值(Lazy Evaluation)

RunnableSequence.stream 通过迭代器链实现惰性求值,避免中间数据全量缓存。同样,模块导入通过 __getattr__ 实现惰性加载。

5. 渐进式 Async(Gradual Async)

默认通过 run_in_executor 把 sync 变 async,子类可以按需覆盖为真正的 async 实现。不强迫用户写 async 代码,但保留了 async 优化的空间。


依赖方向图

                    ┌────────────┐
                    │ serializable│
                    │  (load/)   │
                    └─────┬──────┘
                          │
                    ┌─────▼──────┐
                    │   utils    │  ConfigurableField, AddableDict,
                    │            │  accepts_config, gather_with_concurrency
                    └─────┬──────┘
                          │
                    ┌─────▼──────┐
                    │   config   │  RunnableConfig, ensure_config,
                    │            │  ContextThreadPoolExecutor, merge_configs
                    └─────┬──────┘
                          │
                    ┌─────▼──────┐
                    │    base    │  Runnable, RunnableSerializable,
                    │  (6257行)  │  Sequence, Parallel, Lambda,
                    │            │  Binding, Generator
                    └─────┬──────┘
                          │
          ┌───────┬───────┼───────┬──────────┐
          │       │       │       │          │
     ┌────▼──┐ ┌──▼───┐ ┌▼────┐ ┌▼───────┐ ┌▼──────────┐
     │branch │ │retry │ │fall-│ │pass-   │ │configurable│
     │       │ │      │ │backs│ │through │ │            │
     └───────┘ └──────┘ └─────┘ └────────┘ └────────────┘

依赖方向严格单向utilsconfigbase ← 各模块。没有循环依赖(需要时用局部 import)。


小结

4 篇文章走完了 LangChain Runnable 体系的全貌:

主题 核心收获
第 1 篇 5 分钟上手 LCEL invoke/stream/batch + | 操作符 + coerce_to_runnable
第 2 篇 流式与并发 迭代器懒求值链 + _transform_stream_with_config + ContextThreadPoolExecutor
第 3 篇 8 种组合模式 Branch/Passthrough/Assign/Pick/Fallbacks/Retry/Binding/Configurable
第 4 篇 源码架构 继承体系 + 回调骨架 + 懒加载 + async 降级 + 5 大设计原则

Runnable 是 LangChain 中最核心的抽象——13712 行代码,16 个文件,但核心思想只有一个:统一接口 + 自由组合。理解了 Runnable,就理解了 LangChain 的骨架。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐