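The Runnable types introduced next compose other Runnables and execute them according to a specified flow. RunnableSequence and RunnableParallel run multiple Runnable objects "sequentially" and "in parallel" respectively. RunnableBranch splits the logic into branches based on evaluated conditions. RunnablePassthrough simply passes data through, and on top of that supports adding new data members and overriding existing ones.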

1. RunnableSequence

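In LangChain's design, RunnableSequence is the core engine of LCEL (LangChain Expression Language). It connects multiple Runnable components (prompts, LLMs, parsers, lambdas, and so on) into a complete workflow. The pipe operator "|" that we use every day is simply a shortcut for creating a RunnableSequence.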

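As the following code shows, RunnableSequence derives from RunnableSerializable. When initializing it, we can either pass the complete sequence directly, or specify the first, middle, and last Runnable objects through keyword arguments.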

class RunnableSequence(RunnableSerializable[Input, Output]):
    def __init__(
        self,
        *steps: RunnableLike,
        name: str | None = None,
        first: Runnable[Any, Any] | None = None,
        middle: list[Runnable[Any, Any]] | None = None,
        last: Runnable[Any, Any] | None = None,
    ) -> None: ...
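To make the two constructor styles concrete, here is a minimal pure-Python sketch (no LangChain required) of how the positional *steps form and the first/middle/last keyword form can be normalized into one ordered list of steps. The helpers build_steps and run are hypothetical. The sketch rejects sequences shorter than two steps, which we believe matches LangChain's own check, but treat that as an assumption.

```python
from typing import Any, Callable, Optional, Sequence

Step = Callable[[Any], Any]


def build_steps(
    *steps: Step,
    first: Optional[Step] = None,
    middle: Optional[Sequence[Step]] = None,
    last: Optional[Step] = None,
) -> list:
    """Hypothetical helper: normalize both constructor styles into one list."""
    if steps:
        flat = list(steps)  # positional form: the full sequence is given
    elif first is not None and last is not None:
        flat = [first, *(middle or []), last]  # keyword form
    else:
        flat = []
    if len(flat) < 2:
        raise ValueError(f"a sequence needs at least 2 steps, got {len(flat)}")
    return flat


def run(steps: list, value: Any) -> Any:
    """Feed each step's output into the next step."""
    for step in steps:
        value = step(value)
    return value


inc, dbl, sq = (lambda x: x + 1), (lambda x: x * 2), (lambda x: x * x)
print(run(build_steps(inc, dbl, sq), 3))                      # 64
print(run(build_steps(first=inc, middle=[dbl], last=sq), 3))  # 64
```

Both calls describe the same three-step pipeline, so they produce the same result.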

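If we pass the complete sequence directly, the elements do not all have to be Runnable objects: anything that "looks like" a Runnable, i.e. a RunnableLike, is accepted. As the definition of RunnableLike below shows, this leaves quite a lot of freedom.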

RunnableLike = (
    Runnable[Input, Output]
    | Callable[[Input], Output]
    | Callable[[Input], Awaitable[Output]]
    | Callable[[Iterator[Input]], Iterator[Output]]
    | Callable[[AsyncIterator[Input]], AsyncIterator[Output]]
    | _RunnableCallableSync[Input, Output]
    | _RunnableCallableAsync[Input, Output]
    | _RunnableCallableIterator[Input, Output]
    | _RunnableCallableAsyncIterator[Input, Output]
    | Mapping[str, Any]
)

class _RunnableCallableSync(Protocol[Input, Output]):
    def __call__(self, _in: Input, /, *, config: RunnableConfig) -> Output: ...

class _RunnableCallableAsync(Protocol[Input, Output]):
    def __call__(
        self, _in: Input, /, *, config: RunnableConfig
    ) -> Awaitable[Output]: ...

class _RunnableCallableIterator(Protocol[Input, Output]):
    def __call__(
        self, _in: Iterator[Input], /, *, config: RunnableConfig
    ) -> Iterator[Output]: ...

class _RunnableCallableAsyncIterator(Protocol[Input, Output]):
    def __call__(
        self, _in: AsyncIterator[Input], /, *, config: RunnableConfig
    ) -> AsyncIterator[Output]: ...
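The protocols above describe callables that take the input as a positional-only argument plus a keyword-only config, so a wrapper has to decide whether to pass config at all. The sketch below does this by looking for a parameter literally named config via inspect.signature. That LangChain decides it the same way is our assumption, and the names accepts_config and call_step are hypothetical.

```python
import inspect
from typing import Any, Callable


def accepts_config(func: Callable[..., Any]) -> bool:
    """Hypothetical check: does func declare a parameter named 'config'?"""
    try:
        return "config" in inspect.signature(func).parameters
    except (TypeError, ValueError):  # some builtins have no inspectable signature
        return False


def call_step(func: Callable[..., Any], value: Any, config: dict) -> Any:
    """Pass config as a keyword argument only if func can accept it."""
    if accepts_config(func):
        return func(value, config=config)
    return func(value)


def plain(x: int) -> int:
    return x + 1


def with_config(x: int, /, *, config: dict) -> int:
    return x + config.get("offset", 0)


print(call_step(plain, 1, {"offset": 10}))        # 2
print(call_step(with_config, 1, {"offset": 10}))  # 11
```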

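Callables, functions, and dictionaries (Mapping[str, Any]) passed to the constructor are ultimately converted into corresponding Runnable objects, and the same rule applies to the other Runnable types as well. The conversion rules are: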

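  • A generator function (synchronous or asynchronous) is converted into a RunnableGenerator object;
  • Any other callable is converted into a RunnableLambda object;
  • A dictionary is converted into a RunnableParallel object.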
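The three conversion rules can be mimicked in a few lines. In the sketch below, LambdaStep, GeneratorStep, ParallelStep, and coerce are hypothetical stand-ins for RunnableLambda, RunnableGenerator, RunnableParallel, and LangChain's internal conversion logic. For brevity only synchronous generator functions are handled, and GeneratorStep simply collects the generated chunks into a list, which is a simplification.

```python
import inspect
from typing import Any, Callable, Iterator, Mapping


class LambdaStep:
    """Stand-in for RunnableLambda: call the function on the input."""
    def __init__(self, func: Callable[[Any], Any]) -> None:
        self.func = func

    def invoke(self, value: Any) -> Any:
        return self.func(value)


class GeneratorStep:
    """Stand-in for RunnableGenerator: feed the input as a one-item iterator."""
    def __init__(self, func: Callable[[Iterator[Any]], Iterator[Any]]) -> None:
        self.func = func

    def invoke(self, value: Any) -> list:
        return list(self.func(iter([value])))  # simplification: collect chunks


class ParallelStep:
    """Stand-in for RunnableParallel: run every value on the same input."""
    def __init__(self, mapping: Mapping[str, Any]) -> None:
        self.steps = {key: coerce(step) for key, step in mapping.items()}

    def invoke(self, value: Any) -> dict:
        return {key: step.invoke(value) for key, step in self.steps.items()}


def coerce(thing: Any) -> Any:
    """Apply the conversion rules; objects with invoke() are kept as-is."""
    if hasattr(thing, "invoke"):
        return thing
    if inspect.isgeneratorfunction(thing):
        return GeneratorStep(thing)
    if callable(thing):
        return LambdaStep(thing)
    if isinstance(thing, Mapping):
        return ParallelStep(thing)
    raise TypeError(f"cannot convert {type(thing).__name__} to a step")


def upper_chunks(chunks: Iterator[str]) -> Iterator[str]:
    for chunk in chunks:
        yield chunk.upper()


print(coerce(upper_chunks).invoke("ab"))  # ['AB']
print(coerce({"double": lambda x: x * 2, "neg": lambda x: -x}).invoke(3))  # {'double': 6, 'neg': -3}
```

The generator-function check has to run before the generic callable check, because a generator function is also callable.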

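Because a RunnableSequence is built from the given Runnable objects in order, its input is the input of the first Runnable and its output is the output of the last one; the methods that expose the input/output types and the related schemas are overridden according to this rule. The pipe method that Runnable uses to build a pipeline returns exactly such a RunnableSequence object, and so do __or__ and __ror__, the methods behind the pipe operator "|".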

class Runnable(ABC, Generic[Input, Output]):
    def __or__(
        self,
        other: Runnable[Any, Other]
        | Callable[[Iterator[Any]], Iterator[Other]]
        | Callable[[AsyncIterator[Any]], AsyncIterator[Other]]
        | Callable[[Any], Other]
        | Mapping[str, Runnable[Any, Other] | Callable[[Any], Other] | Any],
    ) -> RunnableSerializable[Input, Other]:
        return RunnableSequence(self, coerce_to_runnable(other))

    def __ror__(
        self,
        other: Runnable[Other, Any]
        | Callable[[Iterator[Other]], Iterator[Any]]
        | Callable[[AsyncIterator[Other]], AsyncIterator[Any]]
        | Callable[[Other], Any]
        | Mapping[str, Runnable[Other, Any] | Callable[[Other], Any] | Any],
    ) -> RunnableSerializable[Other, Output]:
        return RunnableSequence(coerce_to_runnable(other), self)

    def pipe(
        self,
        *others: Runnable[Any, Other] | Callable[[Any], Other],
        name: str | None = None,
    ) -> RunnableSerializable[Input, Other]:
        return RunnableSequence(self, *others, name=name)
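How __or__ and __ror__ cooperate can be seen in a stripped-down model. In the sketch below, Step and Pipeline are hypothetical classes, not LangChain's. Pipeline also flattens nested pipelines, so that a | b | c yields one three-step pipeline rather than a nested one.

```python
from typing import Any, Callable


class Step:
    """Hypothetical minimal runnable wrapping a one-argument function."""
    def __init__(self, func: Callable[[Any], Any]) -> None:
        self.func = func

    def invoke(self, value: Any) -> Any:
        return self.func(value)

    def __or__(self, other: Any) -> "Pipeline":   # self | other
        return Pipeline(self, _coerce(other))

    def __ror__(self, other: Any) -> "Pipeline":  # other | self, other is not a Step
        return Pipeline(_coerce(other), self)


class Pipeline(Step):
    """Hypothetical counterpart of RunnableSequence."""
    def __init__(self, *steps: Step) -> None:
        self.steps: list = []
        for step in steps:
            # Flatten nested pipelines so chaining stays one level deep.
            self.steps.extend(step.steps if isinstance(step, Pipeline) else [step])

    def invoke(self, value: Any) -> Any:
        for step in self.steps:
            value = step.invoke(value)
        return value


def _coerce(thing: Any) -> Step:
    return thing if isinstance(thing, Step) else Step(thing)


# The leading lambda has no __or__ for Step, so Python falls back to Step.__ror__.
chain = (lambda x: x + 1) | Step(lambda x: x * 2) | (lambda x: x * x)
print(len(chain.steps), chain.invoke(3))  # 3 64
```

The same fallback explains why at least one operand of | must be a runnable: two plain lambdas have no __or__ that understands each other, so (lambda x: x) | (lambda x: x) raises TypeError.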

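The following code demonstrates that pipe-style composition is equivalent to constructing a RunnableSequence. Neither the RunnableSequence constructor nor the pipe-operator methods require their arguments to be Runnable objects, so we can use lambda expressions directly. The example uses a RunnablePassthrough object, which can be thought of as a Runnable that does nothing but pass its argument through. It is needed at the head of the pipe chain because "|" between two plain lambdas would never reach Runnable's __or__/__ror__ and would raise a TypeError.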

from langchain_core.runnables import RunnableSequence, RunnablePassthrough

chain = RunnableSequence(
        lambda x: x + 1,
        lambda x: x * 2,
        lambda x: x * x)
result = chain.invoke(3)
assert result == 64  # ((3 + 1) * 2) ** 2 -> 64

chain = RunnablePassthrough() | (lambda x: x + 1) | (lambda x: x * 2) | (lambda x: x * x)
result = chain.invoke(3)
assert result == 64  # ((3 + 1) * 2) ** 2 -> 64

2. RunnableParallel

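RunnableParallel is the core component for parallel processing. It runs several different tasks at the same time and merges their results into a single dictionary as its output. It is commonly used for multi-way retrieval, task decomposition, and calling several models in parallel. It also derives from RunnableSerializable, and at initialization we can specify the mapping between names and their corresponding executable objects in several forms. Incidentally, RunnableMap is an alias of RunnableParallel.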

class RunnableParallel(RunnableSerializable[Input, dict[str, Any]]):
    def __init__(
        self,
        steps__: Mapping[
            str,
            Runnable[Input, Any]
            | Callable[[Input], Any]
            | Mapping[str, Runnable[Input, Any] | Callable[[Input], Any]],
        ]
        | None = None,
        **kwargs: Runnable[Input, Any]
        | Callable[[Input], Any]
        | Mapping[str, Runnable[Input, Any] | Callable[[Input], Any]],
    ) -> None: ...

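Both the steps__ argument and the keyword arguments of the RunnableParallel constructor map names to values. Each value can be a Runnable, a Callable[[Input], Any], or a nested dictionary (whose values are Runnable[Input, Any] or Callable[[Input], Any]), and these objects are converted into Runnable objects by the same rules as in RunnableSequence. How many of these Runnables execute concurrently is governed by the max_concurrency setting of the supplied RunnableConfig, as the following demo confirms: with max_concurrency set to 2, baz and qux only start after foo and bar have finished.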

from langchain_core.runnables import RunnableParallel
import time
from functools import partial
from typing import Any

def handle(task: str, _: Any) -> str:
    print(f"Starting task: {task}")
    time.sleep(1)
    print(f"Finished task: {task}")
    return task
        
runnable = RunnableParallel(
    foo=partial(handle, "foo"),
    bar=partial(handle, "bar"),
    baz=partial(handle, "baz"),
    qux=partial(handle, "qux"),
)

result = runnable.invoke({}, config={"max_concurrency": 2}) 
assert result == {
    "foo": "foo",
    "bar": "bar",
    "baz": "baz",
    "qux": "qux",
}

# output:
# Starting task: foo
# Starting task: bar
# Finished task: bar
# Finished task: foo
# Starting task: baz
# Starting task: qux
# Finished task: baz
# Finished task: qux
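The output above is what a thread pool capped at max_concurrency workers would produce. The sketch below reproduces that behaviour with concurrent.futures. The helpers run_parallel and slow are hypothetical, and we are assuming, not quoting, that LangChain's executor works along these lines.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Mapping, Optional


def run_parallel(
    steps: Mapping[str, Callable[[Any], Any]],
    value: Any,
    max_concurrency: Optional[int] = None,
) -> dict:
    """Hypothetical helper: run every step on the same input, with at most
    max_concurrency running at once, and collect results under their keys."""
    with ThreadPoolExecutor(max_workers=max_concurrency) as pool:
        futures = {key: pool.submit(step, value) for key, step in steps.items()}
        return {key: future.result() for key, future in futures.items()}


def slow(tag: str) -> Callable[[Any], str]:
    """Build a step that sleeps 0.1 s and returns its tag."""
    def step(_: Any) -> str:
        time.sleep(0.1)
        return tag
    return step


tasks = {name: slow(name) for name in ("foo", "bar", "baz", "qux")}
start = time.perf_counter()
result = run_parallel(tasks, {}, max_concurrency=2)
elapsed = time.perf_counter() - start
print(result)               # {'foo': 'foo', 'bar': 'bar', 'baz': 'baz', 'qux': 'qux'}
print(f"{elapsed:.2f} s")   # roughly 0.2 s: four 0.1 s tasks, two at a time
```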

3. RunnablePassthrough

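RunnablePassthrough's main job is to pass its input unchanged to the next step in the chain, or to dynamically add new data members while keeping the original input. It can be used as an identity function that simply returns whatever it receives, which is very useful when the same value needs to be fed to several downstream branches. As the code snippet below shows, if a callable was supplied to the constructor, invoke first calls it with the input (for its side effects only; its return value is discarded), and then, in every case, returns the input through the identity function. The ainvoke, stream/astream, and transform/atransform methods are overridden in a similar way.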

class RunnablePassthrough(RunnableSerializable[Other, Other]):
    def __init__(
        self,
        func: Callable[[Other], None]
        | Callable[[Other, RunnableConfig], None]
        | Callable[[Other], Awaitable[None]]
        | Callable[[Other, RunnableConfig], Awaitable[None]]
        | None = None,
        afunc: Callable[[Other], Awaitable[None]]
        | Callable[[Other, RunnableConfig], Awaitable[None]]
        | None = None,
        *,
        input_type: type[Other] | None = None,
        **kwargs: Any,
    ) -> None:        
        if inspect.iscoroutinefunction(func):
            afunc = func
            func = None
        super().__init__(func=func, afunc=afunc, input_type=input_type, **kwargs)

    @override
    def invoke(
        self, input: Other, config: RunnableConfig | None = None, **kwargs: Any
    ) -> Other:
        if self.func is not None:
            call_func_with_variable_args(
                self.func, input, ensure_config(config), **kwargs
            )
        return self._call_with_config(identity, input, config)

def identity(x: Other) -> Other:
    return x
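Stripped of config and callback handling, the invoke logic above reduces to calling an optional side-effect function and then returning the input unchanged. Passthrough below is a hypothetical stand-in covering only the synchronous path, not LangChain's class.

```python
from typing import Any, Callable, Optional


class Passthrough:
    """Hypothetical stand-in for RunnablePassthrough (sync path only)."""
    def __init__(self, func: Optional[Callable[[Any], None]] = None) -> None:
        self.func = func

    def invoke(self, value: Any) -> Any:
        if self.func is not None:
            self.func(value)  # called for its side effect; the result is discarded
        return value          # identity: the input is returned unchanged


seen: list = []
logger = Passthrough(seen.append)
print(logger.invoke({"question": "hi"}))    # {'question': 'hi'}
print(seen)                                 # [{'question': 'hi'}]
print(Passthrough(lambda x: 42).invoke(1))  # 1
```

A typical use of such a callback is logging or tracing the value that flows through a particular point in a chain without altering it.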

3.1 RunnableAssign

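What makes RunnablePassthrough even more important is the assign method defined on it. The RunnableAssign object it returns builds on passthrough to add new data members and override existing ones. When constructing a RunnableAssign, we supply a RunnableParallel[dict[str, Any]] object to initialize its mapper field. The overridden invoke method calls this mapper to obtain a dictionary, merges it into the original input dictionary, and returns the merged result; because the mapper's entries are unpacked after the original input, a key present in both takes the mapper's value. The ainvoke, stream/astream, and transform/atransform methods are overridden in a similar way.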

class RunnableAssign(RunnableSerializable[dict[str, Any], dict[str, Any]]):
    mapper: RunnableParallel

    def __init__(self, mapper: RunnableParallel[dict[str, Any]], **kwargs: Any) -> None:
        super().__init__(mapper=mapper, **kwargs)

    @override
    def invoke(
        self,
        input: dict[str, Any],
        config: RunnableConfig | None = None,
        **kwargs: Any,
    ) -> dict[str, Any]:
        return self._call_with_config(self._invoke, input, config, **kwargs)

    def _invoke(
        self,
        value: dict[str, Any],
        run_manager: CallbackManagerForChainRun,
        config: RunnableConfig,
        **kwargs: Any,
    ) -> dict[str, Any]:
        return {
            **value,
            **self.mapper.invoke(
                value,
                patch_config(config, callbacks=run_manager.get_child()),
                **kwargs,
            ),
        }

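Following the definition above, we can create a RunnableAssign object as shown below and use the supplied RunnableParallel object to compute and add two members representing the "total" and the "average".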

from langchain_core.runnables import RunnableAssign, RunnableParallel

mapper = RunnableParallel(
    total = lambda inputs: sum(inputs.values()),
    avg = lambda inputs: sum(inputs.values()) / len(inputs),    
)

assign = RunnableAssign(mapper=mapper)
result = assign.invoke({"foo": 1, "bar": 2, "baz": 3})
assert result == {
    "foo": 1,
    "bar": 2,
    "baz": 3,
    "total": 6,
    "avg": 2.0,
}
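The example above only adds new keys. The plain-Python sketch below, with assign_dict as a hypothetical helper mirroring the merge in RunnableAssign._invoke, also shows the override case: because the mapper's results are unpacked after the original input, an existing key such as bar is replaced, while every mapper function still sees the original, unmodified input.

```python
from typing import Any, Callable, Mapping


def assign_dict(
    value: dict,
    mapper: Mapping[str, Callable[[dict], Any]],
) -> dict:
    """Hypothetical helper: compute every mapper entry from the ORIGINAL
    input, then merge the results over it (later keys win)."""
    extra = {key: func(value) for key, func in mapper.items()}
    return {**value, **extra}


result = assign_dict(
    {"foo": 1, "bar": 2},
    {
        "bar": lambda d: d["bar"] * 10,          # overrides the existing "bar"
        "total": lambda d: d["foo"] + d["bar"],  # still sees the original bar == 2
    },
)
print(result)  # {'foo': 1, 'bar': 20, 'total': 3}
```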

3.2 The assign Method

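The class method assign defined on RunnablePassthrough creates and returns a RunnableAssign object as follows. The assign method of Runnable, in turn, returns a pipeline built from the Runnable itself and the newly created RunnableAssign object.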

class RunnablePassthrough(RunnableSerializable[Other, Other]):
    @classmethod
    @override
    def assign(
        cls,
        **kwargs: Runnable[dict[str, Any], Any]
        | Callable[[dict[str, Any]], Any]
        | Mapping[str, Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any]],
    ) -> RunnableAssign:
        return RunnableAssign(RunnableParallel[dict[str, Any]](kwargs))

class Runnable(ABC, Generic[Input, Output]):
    def assign(
        self,
        **kwargs: Runnable[dict[str, Any], Any]
        | Callable[[dict[str, Any]], Any]
        | Mapping[str, Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any]],
    ) -> RunnableSerializable[Any, Any]:
        return self | RunnableAssign(RunnableParallel[dict[str, Any]](kwargs))

The previous example can therefore be simplified as follows:

from langchain_core.runnables import RunnablePassthrough

chain = RunnablePassthrough.assign(
    total = lambda inputs: sum(inputs.values()),
    avg = lambda inputs: sum(inputs.values()) / len(inputs),    
)

result = chain.invoke({"foo": 1, "bar": 2, "baz": 3})
assert result == {
    "foo": 1,
    "bar": 2,
    "baz": 3,
    "total": 6,
    "avg": 2.0,
}
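Because Runnable.assign returns self | RunnableAssign(...), calls can be chained, and each later assign sees the keys added by earlier ones. The hypothetical Chain class below models just that composition in plain Python; it is not LangChain's API.

```python
from typing import Any, Callable


class Chain:
    """Hypothetical pipeline of dict -> dict functions; empty means passthrough."""
    def __init__(self, *funcs: Callable[[dict], dict]) -> None:
        self.funcs = list(funcs)

    def invoke(self, value: dict) -> dict:
        for func in self.funcs:
            value = func(value)
        return value

    def assign(self, **mapper: Callable[[dict], Any]) -> "Chain":
        """Append a step that merges the mapper's results into its input."""
        def _assign(value: dict) -> dict:
            return {**value, **{key: f(value) for key, f in mapper.items()}}
        return Chain(*self.funcs, _assign)


chain = (
    Chain()
    .assign(total=lambda d: d["a"] + d["b"])
    .assign(double=lambda d: d["total"] * 2)  # sees "total" from the previous step
)
print(chain.invoke({"a": 1, "b": 2}))  # {'a': 1, 'b': 2, 'total': 3, 'double': 6}
```

In LangChain the corresponding form would be RunnablePassthrough.assign(total=...).assign(double=...), where the second call goes through Runnable.assign and therefore produces a two-step pipeline.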

4. RunnableBranch

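RunnableBranch is the primary way to implement conditional routing. As its constructor definition shows, there are two kinds of branches. The first kind is a conditional branch expressed as a 2-tuple: the first element represents the precondition as a Runnable[Input, bool], Callable[[Input], bool], or Callable[[Input], Awaitable[bool]] object, and the second element carries the corresponding operation as a RunnableLike object. There can be multiple conditional branches but only one default branch, which is placed last and represented by a RunnableLike object. A RunnableBranch must be given at least two branches, and one of them must be the default branch.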

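For each branch, the callable or dictionary used as the condition or the handler is converted into the corresponding Runnable object, so the conditional branches are ultimately stored in the branches field as a Sequence[tuple[Runnable[Input, bool], Runnable[Input, Output]]] object. In the overridden invoke/ainvoke, stream/astream, and transform/atransform methods, the input is passed to each conditional branch's condition Runnable in turn, and the handler Runnable of the first condition that returns True is executed. If no condition is satisfied, the Runnable representing the default branch is executed.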

class RunnableBranch(RunnableSerializable[Input, Output]):
    branches: Sequence[tuple[Runnable[Input, bool], Runnable[Input, Output]]]
    def __init__(
        self,
        *branches: tuple[
            Runnable[Input, bool]
            | Callable[[Input], bool]
            | Callable[[Input], Awaitable[bool]],
            RunnableLike,
        ]
        | RunnableLike,
    ) -> None: ...
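The branch-selection rule just described (check the conditions in order, run the first matching handler, otherwise fall back to the default) can be sketched in plain Python. Branch is a hypothetical stand-in for RunnableBranch that accepts only plain callables; its argument-count check mirrors the "at least two branches" requirement stated above.

```python
from typing import Any, Callable, List, Tuple

Condition = Callable[[Any], bool]
Handler = Callable[[Any], Any]


class Branch:
    """Hypothetical stand-in for RunnableBranch (plain callables only)."""
    def __init__(self, *branches: Any) -> None:
        if len(branches) < 2:
            raise ValueError("need at least one conditional branch and a default")
        *conditional, default = branches
        if not callable(default):  # a (condition, handler) tuple is not callable
            raise TypeError("the last argument must be the default handler")
        self.conditional: List[Tuple[Condition, Handler]] = list(conditional)
        self.default: Handler = default

    def invoke(self, value: Any) -> Any:
        for condition, handler in self.conditional:
            if condition(value):
                return handler(value)  # first matching condition wins
        return self.default(value)     # no condition matched


sign = Branch(
    (lambda x: x < 0, lambda x: "negative"),
    (lambda x: x == 0, lambda x: "zero"),
    lambda x: "positive",
)
print([sign.invoke(n) for n in (-5, 0, 7)])  # ['negative', 'zero', 'positive']
```

Since evaluation stops at the first match, overlapping conditions should be ordered from most to least specific.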

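The following code shows how, in a pipeline that processes dictionaries, RunnableBranch can generate a greeting that matches the time supplied in the input. The greeting is added to the input dictionary under the key greeting, and the resulting dictionary is returned as the output.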

from langchain_core.runnables import RunnableBranch
from datetime import time
from typing import Callable

def build_conditional_branch(
    condition: Callable[[time], bool],
    greeting: str
) -> tuple[Callable[[dict], bool], Callable[[dict], dict]]:
    return (
        lambda input: condition(input["time"]),
        lambda input: {**input, "greeting": greeting}
    )

runnable = RunnableBranch[dict,dict](
    build_conditional_branch(lambda t: t.hour < 12, "Good morning"),
    build_conditional_branch(lambda t: 12 <= t.hour < 18, "Good afternoon"),
    lambda input: {**input, "greeting": "Good evening"})

result = runnable.invoke({"time": time(9, 0)})
assert result == {"time": time(9, 0), "greeting": "Good morning"}

result = runnable.invoke({"time": time(15, 0)})
assert result == {"time": time(15, 0), "greeting": "Good afternoon"}

result = runnable.invoke({"time": time(20, 0)})
assert result == {"time": time(20, 0), "greeting": "Good evening"}