[拆解LangChain执行引擎] PregelNode——无状态的功能节点
一个Pregel由Node和Channel构建而成,后者保持状态并以Pub/Sub的方式驱动Node执行,Pregel中的Node是一个PregelNode对象,是一个完全无状态的功能节点。
Pregel中的Node对应的类型为PregelNode。对于一个PregelNode对象来说,它最核心的部分就是绑定在它上面的一个可执行操作,它是抽象类Runnable的子类。在LangChain整个体系中,Runnable类型几乎无处不在,包括语言模型(不论是传统的LLM模型还是Chat模型)、实现RAG的Retriever和提示词模板等组件都是一个Runnable对象,实际上Pregel本身也是一个Runnable对象。LangChain的“Chain”就是由一组Runnable对象按照指定的顺序构建而成。Runnable如此重要,值得单开一个系列进行独立介绍,这里我们可用先将它理解成一个可执行对象,可用帮助我们执行定义Node的函数。
class PregelNode:
bound: Runnable[Any, Any]
1. 输入
PregelNode的channels和triggers字段表示作为输入和触发器的Channel的名称。由于ManagedValue也可用作为输入,所以channels字段也可用包含ManagerValue的名称,这里我们统称为Channel。同一个Channel可以同时作为输入和触发器,出现在这两个字段中。
class PregelNode:
channels : str | list[str]
triggers : list[str]
对于单一的输入,Channel名称可以设置为字符串,也可以封装成列表,它们会影响Node处理函数的输入参数传递方式。对于前者(字符串),对应Channel的值会作为原始参数传递给Node的处理函数,后者则会封装成字典进行传递,字典的Key为Channel的名称。如下的演示实例体现了这一点。
from langgraph.channels import LastValue
from langgraph.pregel import Pregel, NodeBuilder
from langgraph.pregel._read import PregelNode
def build_node(node_name:str)->PregelNode:
channel_in = f"{node_name}_in"
channel_out = f"{node_name}_out"
node = (NodeBuilder()
.do(lambda args:args)
.write_to(channel_out)).build()
node.triggers = [channel_in]
node.channels = channel_in if node_name == "foo" else [channel_in]
return node
app = Pregel(
nodes= {name: build_node(name) for name in ["foo","bar"]},
channels= {
"foo_in": LastValue(str),
"bar_in": LastValue(str),
"foo_out": LastValue(object),
"bar_out": LastValue(object),
},
input_channels=["foo_in", "bar_in"],
output_channels=["foo_out", "bar_out"])
result = app.invoke(input={"foo_in":"foobar", "bar_in":"foobar"})
assert result["foo_out"] == "foobar"
assert result["bar_out"] == {'bar_in': 'foobar'}
如果一个Channel被多个Node作为输入,只要该Channel被任一Node以列表的形式注册,引擎内部的对齐机制将统一使用字典作为所有Node处理函数的输入。比如我们按照如下的方式修改了上面的程序,是foo和bar两个Node都使用foobarChannel作为输入,该输入Channel在bar中以列表的形式进行了设置,以字符串形式设置的fooNode的处理函数的输入参数依然会编程字典。这是一个不为人知的细节。
from langgraph.channels import LastValue
from langgraph.pregel import Pregel, NodeBuilder
from langgraph.pregel._read import PregelNode
def build_node(node_name:str)->PregelNode:
node = (NodeBuilder()
.do(lambda args:args)
.write_to(node_name)).build()
node.triggers = ["input"]
node.channels = "input" if node_name == "foo" else ["input"]
return node
app = Pregel(
nodes= {name: build_node(name) for name in ["foo","bar"]},
channels= {
"input": LastValue(str),
"foo": LastValue(object),
"bar": LastValue(object),
},
input_channels=["input"],
output_channels=["foo", "bar"])
result = app.invoke(input={"input":"foobar"})
assert result["foo"] == {'input': 'foobar'}
assert result["bar"] == {'input': 'foobar'}
2. 输出
Node的执行结果依赖于PregelNode的writers字段返回的一组Runnable对象输出到对应的Channel,系统默认使用的是一个ChannelWrite。如代码片段所示,初始化ChannelWrite对象时需要提供一组ChannelWriteEntry或ChannelWriteTupleEntry来表示针对目标Channel的写入意图。另一个应用了@cached_property装饰器的flat_writers返回一组扁平化的Runnable对象以提供性能。
class PregelNode:
writers : list[Runnable]
@cached_property
def flat_writers(self) -> list[Runnable]
class ChannelWrite(RunnableCallable):
writes: list[ChannelWriteEntry | ChannelWriteTupleEntry | Send]
def __init__(
self,
writes: Sequence[ChannelWriteEntry | ChannelWriteTupleEntry | Send],
*,
tags: Sequence[str] | None = None,
)
ChannelWriteEntry的channel和value字段分别表示输出Channel名称和值。skip_none字段决定是否需要忽略None值,如果指定了mapper字段,value还会被它作进一步处理并最终生成输出到Channel的值。一个ChannelWriteEntry对应一个单一Channel的输出,而ChannelWriteTupleEntry可以完成针对多Channel的输出。具体来说,Node处理函数返回的对象可以绑定在它的value字段上,通过mapper提供的可执行对象映射为一个“二元组序列”,此二元组的两部分对应输出Channel的名称和值。ChannelWriteTupleEntry的static设置的三元组序列仅作静态分析用,可以忽略。
class ChannelWriteEntry(NamedTuple):
channel: str
value: Any = PASSTHROUGH
skip_none: bool = False
mapper: Callable | None = None
class ChannelWriteTupleEntry(NamedTuple):
mapper: Callable[[Any], Sequence[tuple[str, Any]] | None]
value: Any = PASSTHROUGH
static: Sequence[tuple[str, Any, str | None]] | None = None
PASSTHROUGH = object()
在如下所示的演示实例中,我们创建了一个包含唯一Node的Pregel对象,并创建了一个包含单一ChannelWrite对象的列表作为该Node的writers字段。此ChannelWrite的writes列表包含两个ChannelWriteEntry和一个ChannelWriteTupleEntry,它们分别完成了针对三个Channel的输出。
from langgraph.channels import LastValue
from langgraph.pregel import Pregel
from langgraph.pregel._read import PregelNode
from langgraph.pregel._write import ChannelWrite, ChannelWriteEntry, ChannelWriteTupleEntry
entry1 = ChannelWriteEntry(
channel="foo",
value= "123"
)
entry2 = ChannelWriteEntry(
channel="bar",
value= "456",
mapper= lambda v: int(v)
)
tuple_entry = ChannelWriteTupleEntry(
value= {"foo":"123", "bar":"456"},
mapper= lambda v: [("baz", int(v["foo"]) + int(v["bar"]))]
)
node = PregelNode(
triggers=["start"],
channels=[],
writers=[ChannelWrite(writes=[entry1, entry2, tuple_entry])]
)
app = Pregel(
nodes={"body": node},
channels={
"start": LastValue(None),
"foo": LastValue(str),
"bar": LastValue(int),
"baz": LastValue(int)
},
input_channels=["start"],
output_channels=["foo", "bar", "baz"],
)
result = app.invoke(input={"start": None})
assert result == {"foo": "123", "bar": 456, "baz": 579}
对于通过PregelNode对象表示的Node来说,其bound字段返回的Runnable对象用于执行处理函数,操作执行的结果由writers列表的一组Runnable对象写入相应的Channel,这两个核心工作最终会被如下这个node属性合并。对于Pregel来说,该属性在功能上基本就代表了整个Node,这也是该属性如此命名的原因。
class PregelNode:
@cached_property
def node(self) -> Runnable[Any, Any] | None
3. 输入映射
Node基于Channel的输入、触发和输出分别对应channels、triggers和writers字段。如果所有输入Channel读取的原始输入(以字典形式表示)和提交给处理函数的参数有出入,我们还可以利用mapper字段返回的可执行对象(Callable[[Any], Any])作进一步映射。
class PregelNode:
mapper : Callable[[Any], Any] | None
如下的演示程序通过Pregel的mapper字段设置为了一个Lambda表达式,将提供给Node处理函数处理的字典转换成元组。
from langgraph.channels import LastValue
from langgraph.pregel import Pregel, NodeBuilder
from langgraph.pregel._read import PregelNode
from typing import Tuple
node: PregelNode = (NodeBuilder()
.subscribe_to("foo","bar")
.do(lambda args:args)
.write_to("output")).build()
node.mapper = lambda args:tuple(args.values())
app = Pregel(
nodes={"body": node},
channels={
"foo": LastValue(str),
"bar": LastValue(str),
"output": LastValue(Tuple[str,str]),
},
input_channels=["foo","bar"],
output_channels=["output"],
)
result = app.invoke(input={"foo":"hello", "bar":"world"})
assert result["output"] == ("hello","world")
4. 失败重试
Agent中的Node可能会涉及网络传输、数据检索等会导致瞬时错误的操作,失败后自动重试机制是确保可靠性的主要手段,我们可以利用PregelNode 的retry_policy字段设置相应的重试策略。具体的重试策略通过如下所示的RetryPolicy具名元组表示。RetryPolicy的max_attempts和initial_interval分别表示最大重试次数(包含初次调用)和第一次重试前的初始等待时间,单位为秒。如果没有为Node设置针对性的重试策略,Pregel的retry_policy字段设置的重试策略将作为兜底。
class PregelNode:
retry_policy : Sequence[RetryPolicy] | None
class RetryPolicy(NamedTuple):
initial_interval: float = 0.5
backoff_factor: float = 2.0
max_interval: float = 128.0
max_attempts: int = 3
jitter: bool = True
retry_on: (
type[Exception] | Sequence[type[Exception]] | Callable[[Exception], bool]
) = default_retry_on
class Pregel(
PregelProtocol[StateT, ContextT, InputT, OutputT],
Generic[StateT, ContextT, InputT, OutputT]):
retry_policy : Sequence[RetryPolicy] = ()
重试策略采用基于“间隔倍增”的退避机制(Back off),也就是下次重试等待时间是前一次等待的N倍,这个倍数通过backoff_factor字段来提供,max_interval字段为等待实现设置了上限。为了防止多个并发Node同时重试而产生“惊群效应”,我们可以在重试间隔中添加随机抖动,jitter是这一特性的开关。调用失败有很多原因,重试在任何错误场景中都有意义,我们可以利用retry_on字段设置为重置设置前置条件。该字段的默认值对应如下这个default_retry_on函数。
def default_retry_on(exc: Exception) -> bool:
import httpx
import requests
if isinstance(exc, ConnectionError):
return True
if isinstance(exc, httpx.HTTPStatusError):
return 500 <= exc.response.status_code < 600
if isinstance(exc, requests.HTTPError):
return 500 <= exc.response.status_code < 600 if exc.response else True
if isinstance(
exc,
(
ValueError,
TypeError,
ArithmeticError,
ImportError,
LookupError,
NameError,
SyntaxError,
RuntimeError,
ReferenceError,
StopIteration,
StopAsyncIteration,
OSError,
),
):
return False
return True
在如下的演示程序中,我们定义了一个用于创建Pregel对象的build_pregel函数,该函数的max_attempts参数决定组成返回Pregel对象的Node采用的RetryPolicy的重试次数。Node的处理函数是一个由get_handler函数返回的闭包,该闭包在前两次执行的时候总是会排除异常。程序反映的情况是,重试次数若设置为2,调用会抛出异常;但是若设置为3,会保证成功调用。
from langgraph.channels import LastValue
from langgraph.types import RetryPolicy
from langgraph.pregel import Pregel, NodeBuilder
from langgraph.pregel._read import PregelNode
from typing import Any
def get_handler():
times = 0
def handle(args: dict[str, Any]) -> str:
nonlocal times
times += 1
if times < 3:
raise Exception("manually thrown error.")
return "Success"
return handle
def build_node(max_attempts: int) -> PregelNode:
node = (
NodeBuilder().subscribe_to("start").do(get_handler()).write_to("output")
).build()
node.retry_policy = [RetryPolicy(max_attempts=max_attempts)]
return node
def build_pregel(max_attempts: int) -> Pregel:
return Pregel(
nodes={"body": build_node(max_attempts)},
channels={
"start": LastValue(None),
"output": LastValue(str),
},
input_channels=["start"],
output_channels=["output"],
)
app = build_pregel(max_attempts=2)
try:
app.invoke(input={"start": None})
assert False, "Expected an exception but none was raised."
except Exception as e:
assert str(e) == "manually thrown error."
app = build_pregel(max_attempts=3)
result = app.invoke(input={"start": None})
assert result["output"] == "Success"
5. 结果缓存
如果Node绑定一个相对耗时的计算,并且结果完全由给定的输入决定,那么针对输入对结果予以缓存无疑是改善时延的好办法。基于结果的缓存可以通过PregelNode 的cache_policy字段返回的缓存策略来控制。缓存策略通过CachePolicy类型表示,它具有key_func和ttl两个字段,前者提供一个用于解析缓存键(字符串或者字节数组)的可执行对象,后者用于设置缓存过期时间(如果没有显示设置,意味着永不过期)。
class PregelNode:
cache_policy : CachePolicy | None
@cached_property
def input_cache_key(self) -> INPUT_CACHE_KEY_TYPE
@dataclass(**_DC_KWARGS)
class CachePolicy(Generic[KeyFuncT]):
key_func: KeyFuncT = default_cache_key
ttl: int | None = None
KeyFuncT = TypeVar("KeyFuncT", bound=Callable[..., str | bytes])
INPUT_CACHE_KEY_TYPE = tuple[Callable[..., Any], tuple[str, ...]]
def default_cache_key(*args: Any, **kwargs: Any) -> str | bytes:
return pickle.dumps((_freeze(args), _freeze(kwargs)), protocol=5, fix_imports=False)
对结果实施缓存的前提是需要将输入的“指纹”作为缓存键,这里的缓存键根据通过INPUT_CACHE_KEY_TYPE定义的二元组进行计算,该二元组前半部分提供的可执行对象相当于一个哈希函数,能够将原始内容转换成“指纹”;后者提供的多元组以路径的方式唯一标识当前的节点。CachePolicy的key_func字段对应的可执行对象将会作为INPUT_CACHE_KEY_TYPE二元组的前半部分,从定义可以看出默认设置的default_cache_key函数会利用pickle以序列化的方式将原始输入转换成字节作为指纹。该指纹和二元组后半部分合并称为Node执行结果缓存项的Key。
这里之所以需要强制使用字符串或者字节来表示缓存键,是因为Pregel针对缓存的实现并不限于内存存储,原则上可以与任意的内存数据库进行整合(比如redis)。缓存存储在Pregel中通过如下这个抽象基类BaseCache表示,它定义了一系列的抽象方法完成针对缓存的读取、写入和清除,我们可以通过派生此基类实现自定义的缓存存储。开发测试阶段我们经常会使用基于内存存储的InMemoryCache。如果没有对Node的缓存策略作针对性设置,在Pregel对象上设置的缓存策略将作为兜底。
class BaseCache(ABC, Generic[ValueT]):
serde: SerializerProtocol = JsonPlusSerializer(pickle_fallback=True)
def __init__(self, *, serde: SerializerProtocol | None = None) -> None:
self.serde = serde or self.serde
@abstractmethod
def get(self, keys: Sequence[FullKey]) -> dict[FullKey, ValueT]:
@abstractmethod
async def aget(self, keys: Sequence[FullKey]) -> dict[FullKey, ValueT]:
@abstractmethod
def set(self, pairs: Mapping[FullKey, tuple[ValueT, int | None]]) -> None:
@abstractmethod
async def aset(self, pairs: Mapping[FullKey, tuple[ValueT, int | None]]) -> None:
@abstractmethod
def clear(self, namespaces: Sequence[Namespace] | None = None) -> None:
@abstractmethod
async def aclear(self, namespaces: Sequence[Namespace] | None = None) -> None:
class Pregel(
PregelProtocol[StateT, ContextT, InputT, OutputT],
Generic[StateT, ContextT, InputT, OutputT]):
cache: BaseCache | None = None
cache_policy: CachePolicy | None = None
在如下所示的演示程序中,对于创建的Pregel的唯一Node,它虽然具有两个输入Channel(foo和bar),但是对应的处理函数会将当前时间戳作为返回结果。我们为该Node设置了缓存策略,并将过期时间设置为30秒。
from langgraph.channels import LastValue
from langgraph.types import CachePolicy
from langgraph.pregel import Pregel,NodeBuilder
import datetime,time
from langgraph.cache.memory import InMemoryCache
node = (NodeBuilder()
.subscribe_to("foo","bar")
.do(lambda _: datetime.datetime.now())
.write_to("output")).build()
node.cache_policy = CachePolicy(ttl=30)
app = Pregel(
nodes={"body": node},
cache=InMemoryCache(),
channels={
"foo": LastValue(str),
"bar": LastValue(str),
"output": LastValue(str),
},
input_channels=["foo","bar"],
output_channels=["output"])
input = {"foo":"abc", "bar":"xyz"}
result = app.invoke(input=input)
print(f"[{datetime.datetime.now()}]{input} -> {result['output']}")
time.sleep(5)
result = app.invoke(input=input)
print(f"[{datetime.datetime.now()}]{input} -> {result['output']}")
time.sleep(5)
input = {"foo":"xyz", "bar":"abc"}
result = app.invoke(input=input)
print(f"[{datetime.datetime.now()}]{input} -> {result['output']}")
我们以5秒为间隔调用了Pregel对象三次,前两次使用相同的输入({“foo”:“abc”, “bar”:“xyz”})。三次调用的时间戳和输入输出会以如下的形式打印出来,我们可以清晰地看到前两次由于提供了相同的参数,所以得到了相同的结果,很明显第二次得到的是缓存的结果。
[2026-01-31 23:51:20.178285]{'foo': 'abc', 'bar': 'xyz'} -> 2026-01-31 23:51:20.177192
[2026-01-31 23:51:25.180527]{'foo': 'abc', 'bar': 'xyz'} -> 2026-01-31 23:51:20.177192
[2026-01-31 23:51:30.183805]{'foo': 'xyz', 'bar': 'abc'} -> 2026-01-31 23:51:30.182490
6.补遗
前面已经介绍了PregelNode类型的大部分核心成员,对于如下几个遗漏的成员,我们在这里作一下概况性介绍。tags使我们可以在Node上打上相应的标签,而metadata则可以在它上面附加任意的元数据。
class PregelNode:
tags : Sequence[str] | None
metadata : Mapping[str, Any] | None
subgraphs : Sequence[PregelProtocol]
def copy(self, update: dict[str, Any]) -> PregelNode
def invoke(
self,
input: Any,
config: RunnableConfig | None = None,
**kwargs: Any | None,
) -> Any
async def ainvoke(
self,
input: Any,
config: RunnableConfig | None = None,
**kwargs: Any | None,
) -> Any
def stream(
self,
input: Any,
config: RunnableConfig | None = None,
**kwargs: Any | None,
) -> Iterator[Any]
async def astream(
self,
input: Any,
config: RunnableConfig | None = None,
**kwargs: Any | None,
) -> AsyncIterator[Any]
Node结合边构成了图,而图本身也可以作为一个Node参与构建一个更大的图,所以图具有一个嵌套的层级结构,一个Node可以包含一组子图,体现在subgraphs字段上。方法copy对返回自身的一个浅拷贝,至于四个方法(invoke、ainvoke、stream和astream)实现的两种调用模式,最终还是通过调用bound字段的Runnable对象的同名方法实现的。
我们最后使用最简单的语言对Pregel做一个总结:我们可以将 PregelNode 想象成一个智能反应堆,其中triggers是点火装置(决定什么时候开始),channels是原料管道(输入数据),mapper 是入料加工(数据预处理),bound 是核心反应室(业务逻辑),writers 是成品输送带(更新状态)。
7. NodeBuilder
为了让大家对表示Node的PregelNode类型有深入地理解,在前面的演示中我们大都采用直接对其字段进行设置的方式,实际上在真正的开发中基本不会这么做,而是选择使用NodeBuilder来构建它,后者提供更加精简的API。
class NodeBuilder:
def subscribe_only(
self,
channel: str,
) -> Self
def subscribe_to(
self,
*channels: str,
read: bool = True,
) -> Self
def read_from(
self,
*channels: str,
) -> Self
def write_to(
self,
*channels: str | ChannelWriteEntry,
**kwargs: _WriteValue,
) -> Self
def meta(self, *tags: str, **metadata: Any) -> Self
def add_retry_policies(self, *policies: RetryPolicy) -> Self
def add_cache_policy(self, policy: CachePolicy) -> Self
def build(self) -> PregelNode
如果构建的Node只需定义一个Channel,我们可以调用subscribe_only方法,它将以字符串的(不是列表)形式赋值给channels字段。subscribe_to方法默认会将指定的Channel同时添加到triggers和channels列表中,如果将read参数设置为False,指定的Channel只会作为输入添加到channels列表中。read_from方法指定的仅仅是输入Channel,所以只会添加到channels列表中。
如果自行构建PregelNode,针对Channel的输出会很麻烦,使用NodeBuilder的write_to方法就简单多了,我们只需要指定输出Channel的名称列表就可以了。如果需要多输出作更细粒度的控制,也可以指定一组ChannelWriteEntry对象。我们也可以利用write_to方法提供的关键字参数针对Channel的写入(此时参数名会作为输出Channel的名称),其类型_WriteValue定义如下,所以我们可以使用兼容的Lambda表达式简单快捷地完成输出。
_WriteValue = Callable[[Input], Output] | Any
前面介绍的打标签和附加元数据的功能可以调用NodeBuilder的meta方法来完成,失败重试和结果缓存策略则由add_retry_policies和add_cache_policy方法来提供。等所有设置完成之后,我们直接调用build方法将目标Node构建出来。
更多推荐



所有评论(0)