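When calling the invoke method of a Pregel object, we can use the durability parameter to choose the persistence mode. The mode determines how persisting the Checkpoint after Superstep N completes successfully relates to starting Superstep N+1. There are three options: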

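  • sync: Superstep N+1 starts only after the Checkpoint for Superstep N has been persisted. This is the "safest" mode. Because state is written synchronously, a crash during execution can always be recovered from the latest Checkpoint. The cost is higher latency, since each step has to wait for the disk or network write to finish.
  • async: Persisting the Checkpoint for Superstep N and executing Superstep N+1 start at the same time. This mode balances performance against safety: overlapping computation with I/O raises throughput and reduces waiting. It is the default.
  • exit: No Checkpoint is persisted after individual Supersteps. Persistence happens only when the whole invocation finishes or is interrupted. This gives the best performance, because intermediate steps carry no persistence overhead, but it is also the riskiest: if the program crashes before the graph finishes, all intermediate state and the final result of that run are lost.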
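The ordering rules above can be modeled with a small stdlib-only sketch. This is a toy, not LangGraph's implementation. `run` and `persist` are hypothetical names, the Checkpoint for the original input (Superstep -1) is left out, and a single-worker `ThreadPoolExecutor` stands in for background persistence so that the toy's asynchronous saves still complete in submission order.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor


def run(durability: str, num_steps: int = 2, delay: float = 0.05) -> list[str]:
    """Toy model of the three durability modes (hypothetical, not LangGraph code)."""
    log: list[str] = []

    def persist(label: str) -> None:
        time.sleep(delay)  # simulate slow storage, like the sleep in put()
        log.append(f"put {label}")

    # One worker thread: background saves complete in submission order.
    with ThreadPoolExecutor(max_workers=1) as pool:
        pending: list[Future] = []
        for step in range(num_steps):
            log.append(f"run {step}")  # execute superstep `step`
            if durability == "sync":
                persist(str(step))  # block until this checkpoint is saved
            elif durability == "async":
                # hand the save to the worker and move straight on to the next step
                pending.append(pool.submit(persist, str(step)))
            # "exit": no per-step save at all
        for fut in pending:
            fut.result()  # wait for (and surface errors from) background saves
        if durability == "exit":
            persist("final")  # a single save when the invocation ends
    return log


for mode in ("sync", "async", "exit"):
    print(mode, run(mode))
```

Under async, the exact interleaving of `run` and `put` entries depends on thread timing. Only the relative order of the `put` entries is stable.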

1. Checkpoint persistence

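The following simple examples make these three modes more concrete. To observe when Checkpoints are persisted, we define the ExtendedInMemorySaver class below, which derives from InMemorySaver. Its overridden put method simulates a one-second delay and prints a message before returning the result of the base-class method.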

from langgraph.checkpoint.memory import InMemorySaver
import time
from langchain_core.runnables import RunnableConfig
from langgraph.checkpoint.base import Checkpoint, CheckpointMetadata, ChannelVersions

class ExtendedInMemorySaver(InMemorySaver):   
    def put(
        self,
        config: RunnableConfig,
        checkpoint: Checkpoint,
        metadata: CheckpointMetadata,
        new_versions: ChannelVersions,
    ) -> RunnableConfig:
        time.sleep(1)  # Simulate some delay
        print(f"put called with checkpoint for step {metadata['step']}")
        return super().put(config, checkpoint, metadata, new_versions)

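Next we build a Pregel made up of four Nodes, using the ExtendedInMemorySaver above as its Checkpointer. Writing to channel foo triggers nodes foo1 and foo2 in parallel. When they finish, foo1 and foo2 write to their respective Channels (bar1 and bar2), which in turn trigger nodes bar1 and bar2. To observe when each Node runs, the handler function also prints a message.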

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.pregel import Pregel, NodeBuilder
from langgraph.channels import LastValue, BinaryOperatorAggregate
import operator,time
from functools import partial
from langchain_core.runnables import RunnableConfig
from langgraph.checkpoint.base import Checkpoint, CheckpointMetadata, ChannelVersions

def handle(node: str, arg: dict):
    print(f"node '{node}' is called.")
    return [node]

foo1 = (NodeBuilder()
        .subscribe_to("foo",read = False)
        .do(partial(handle, "foo1"))
        .write_to("bar1")
)
foo2 = (NodeBuilder()
        .subscribe_to("foo",read = False)
        .do(partial(handle, "foo2"))
        .write_to("bar2")
)
bar1 = (NodeBuilder()
        .subscribe_to("bar1",read = False)
        .do(partial(handle, "bar1"))
        .write_to("output")
)
bar2 = (NodeBuilder()
        .subscribe_to("bar2",read = False)
        .do(partial(handle, "bar2"))
        .write_to("output")
)
app = Pregel(
    nodes={"foo1": foo1, "foo2": foo2, "bar1": bar1, "bar2": bar2},
    channels={
        "foo": LastValue(str),
        "bar1": LastValue(str),
        "bar2": LastValue(str),
        "output": BinaryOperatorAggregate(list, operator.add),
    },  
    input_channels=["foo"],
    output_channels=["output"],
    checkpointer=ExtendedInMemorySaver(),
)

config = {"configurable": {"thread_id": "123"}}
result = app.invoke(input={"foo": "start"}, config=config, durability="sync")
assert result["output"] == ["bar1", "bar2"]
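The superstep structure of this graph can be reproduced with a small plan/execute/update loop. This is a hypothetical stdlib toy in the spirit of Pregel's bulk-synchronous model, not LangGraph code. `NODES` and `simulate` are illustrative names, nodes within a step are simply sorted by name, and every node returns `[its own name]` just as `handle` does.

```python
import operator
from functools import reduce

# Hypothetical re-creation of the example's wiring: node -> (trigger channel, output channel)
NODES = {
    "foo1": ("foo", "bar1"),
    "foo2": ("foo", "bar2"),
    "bar1": ("bar1", "output"),
    "bar2": ("bar2", "output"),
}


def simulate(start_channel: str = "foo") -> tuple[list[list[str]], list[str]]:
    """Plan-execute-update loop in the spirit of Pregel supersteps (toy model)."""
    updated = {start_channel}  # channels written in the previous step
    schedule: list[list[str]] = []
    output_writes: list[list[str]] = []
    while True:
        # Plan: a node runs if the channel it subscribes to was just updated.
        tasks = sorted(n for n, (trigger, _) in NODES.items() if trigger in updated)
        if not tasks:
            break
        schedule.append(tasks)
        # Execute: every node returns [its own name], as handle() does.
        writes = [(NODES[n][1], [n]) for n in tasks]
        # Update: writes become visible only after the whole step has finished.
        updated = {channel for channel, _ in writes}
        output_writes += [value for channel, value in writes if channel == "output"]
    # "output" folds its writes with operator.add, like BinaryOperatorAggregate(list, operator.add).
    return schedule, reduce(operator.add, output_writes, [])


schedule, output = simulate()
print(schedule)  # [['foo1', 'foo2'], ['bar1', 'bar2']]
print(output)    # ['bar1', 'bar2']
```

The two entries in `schedule` correspond to Superstep 0 and Superstep 1. These are the step numbers that appear in the Checkpoint outputs below.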

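Here we call invoke with durability explicitly set to sync. The output below shows that, after nodes foo1 and foo2 finish, nodes bar1 and bar2 in Superstep 1 start only once the Checkpoint for Superstep 0 has been persisted. The output also reveals a second point. Even in sync mode, persisting the Checkpoint for Superstep -1 (which holds the original input) is not guaranteed to complete before Superstep 0 begins, even though Superstep 0 is where the first nodes, foo1 and foo2, run.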

node 'foo1' is called.
node 'foo2' is called.
put called with checkpoint for step -1
put called with checkpoint for step 0
node 'bar1' is called.
node 'bar2' is called.
put called with checkpoint for step 1
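The ordering claim can be checked mechanically by scanning the printed log. `sync_order_holds` and `first_index` are hypothetical helpers, not part of LangGraph. The log text is copied from the sync output above.

```python
SYNC_LOG = """\
node 'foo1' is called.
node 'foo2' is called.
put called with checkpoint for step -1
put called with checkpoint for step 0
node 'bar1' is called.
node 'bar2' is called.
put called with checkpoint for step 1
"""


def first_index(lines: list[str], needle: str) -> int:
    """Index of the first line containing `needle`."""
    return next(i for i, line in enumerate(lines) if needle in line)


def sync_order_holds(log: str) -> bool:
    """True if the step-0 checkpoint was saved before any Superstep 1 node ran."""
    lines = log.splitlines()
    saved = first_index(lines, "for step 0")
    started = min(first_index(lines, "'bar1'"), first_index(lines, "'bar2'"))
    return saved < started


print(sync_order_holds(SYNC_LOG))  # True
```

Run against the async output below, the same check returns False, because bar1 and bar2 are printed before the step-0 put.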

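The following output comes from async mode. Executing nodes bar1 and bar2 in Superstep 1 now runs concurrently with persisting the Checkpoint for Superstep 0. Because put simulates a one-second delay, persistence is the last thing to finish.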

node 'foo1' is called.
node 'foo2' is called.
node 'bar1' is called.
node 'bar2' is called.
put called with checkpoint for step -1
put called with checkpoint for step 0
put called with checkpoint for step 1

Setting the mode to exit produces the output below. In this mode the Checkpoint is persisted only once, after the entire invocation has finished.

node 'foo1' is called.
node 'foo2' is called.
node 'bar1' is called.
node 'bar2' is called.
put called with checkpoint for step 2

2. Pending Write persistence

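Persistence covers two things: Checkpoints saved after each Superstep completes, and Pending Writes saved during a Superstep. For Pending Writes, sync and async modes behave identically. Whenever a Node finishes or hits an interrupt, the Pending Writes it produced are persisted right away. To confirm this, we modify ExtendedInMemorySaver and override its put_writes method as follows.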

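import time
from typing import Any, Sequence
from langchain_core.runnables import RunnableConfig
from langgraph.checkpoint.memory import InMemorySaver

class ExtendedInMemorySaver(InMemorySaver):
    def put_writes(
        self,
        config: RunnableConfig,
        writes: Sequence[tuple[str, Any]],
        task_id: str,
        task_path: str = "",
    ) -> None:
        time.sleep(1)  # Simulate some delay
        print(f"put_writes called with writes: {writes}")
        # Delegate to the base class so the writes are actually stored
        super().put_writes(config, writes, task_id, task_path)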

Even in sync mode, the Pending Writes of all four Node tasks are persisted asynchronously, which produces the following output.

node 'foo1' is called.
node 'foo2' is called.
node 'bar1' is called.
node 'bar2' is called.
put_writes called with writes: deque([('bar2', ['foo2'])])
put_writes called with writes: deque([('bar1', ['foo1'])])
put_writes called with writes: deque([('output', ['bar1'])])
put_writes called with writes: deque([('output', ['bar2'])])

With the mode set to exit, however, no Pending Write is persisted as long as no interrupt occurs, so the output looks like this:

node 'foo1' is called.
node 'foo2' is called.
node 'bar1' is called.
node 'bar2' is called.
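The difference between the modes for Pending Writes can be sketched with a toy store. `ToyWriteStore` and `run_tasks` are hypothetical names, not LangGraph APIs. The tasks run sequentially here. In the real sync output above, the put_writes calls happen in the background and their order can vary: the write from foo2 appears before the one from foo1.

```python
from collections import deque


class ToyWriteStore:
    """Hypothetical stand-in for a checkpointer's put_writes (not LangGraph code)."""

    def __init__(self) -> None:
        self.saved: list[tuple[str, list[tuple[str, list[str]]]]] = []

    def put_writes(self, task: str, writes: deque) -> None:
        self.saved.append((task, list(writes)))


def run_tasks(durability: str, store: ToyWriteStore) -> dict[str, deque]:
    """Run four tasks; save each task's writes as soon as it finishes unless in exit mode."""
    plan = [("foo1", "bar1"), ("foo2", "bar2"), ("bar1", "output"), ("bar2", "output")]
    buffered: dict[str, deque] = {}
    for task, channel in plan:
        writes = deque([(channel, [task])])  # what the task wrote, e.g. ('bar1', ['foo1'])
        buffered[task] = writes
        if durability != "exit":
            store.put_writes(task, writes)  # persisted right away, sync or async alike
    return buffered  # in exit mode these stay in memory unless an interrupt occurs


sync_store = ToyWriteStore()
run_tasks("sync", sync_store)
print(len(sync_store.saved))  # 4

exit_store = ToyWriteStore()
run_tasks("exit", exit_store)
print(len(exit_store.saved))  # 0
```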

To simulate an interrupt in exit mode, we modify the handle function that all four Nodes ultimately call so that it triggers a manual interrupt in node bar1.

from langgraph.types import interrupt

def handle(node: str, arg: dict):
    print(f"node '{node}' is called.")
    if node == "bar1":
        interrupt("manual interrupt")
    return [node]

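Although bar1, the node that hits the interrupt, runs in Superstep 1, no Checkpoint has been persisted at any point during the run, so the entire run has to be rolled back. At this point the four Pending Writes shown below are written. Apart from the interrupt-type Pending Write for bar1, the other three are Channel writes from tasks that completed successfully.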

node 'foo1' is called.
node 'foo2' is called.
node 'bar1' is called.
node 'bar2' is called.
put_writes called with writes: deque([('bar1', ['foo1'])])
put_writes called with writes: deque([('bar2', ['foo2'])])
put_writes called with writes: deque([('output', ['bar2'])])
put_writes called with writes: [('__interrupt__', (Interrupt(value='manual interrupt', id='f10d2458e1d1ff38c6b55d008907af52'),))]
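The exit-mode behavior under an interrupt can be modeled the same way. In the hypothetical `run_exit_mode` below, writes are only buffered in memory. When a task in a superstep is interrupted, the remaining tasks of that superstep still finish, as bar2 does in the output above, and the buffer is flushed together with an `__interrupt__` record. The string `"manual interrupt"` stands in for the Interrupt object that LangGraph actually records. This is a sketch, not LangGraph's implementation.

```python
from typing import Optional


def run_exit_mode(interrupt_at: Optional[str]) -> list[tuple[str, object]]:
    """Exit-mode toy: buffer writes and flush them only if an interrupt stops the run."""
    supersteps = [
        [("foo1", "bar1"), ("foo2", "bar2")],        # Superstep 0
        [("bar1", "output"), ("bar2", "output")],    # Superstep 1
    ]
    buffered: list[tuple[str, object]] = []
    for tasks in supersteps:
        interrupt_record = None
        for task, channel in tasks:
            if task == interrupt_at:
                interrupt_record = ("__interrupt__", "manual interrupt")  # task stops here
                continue
            buffered.append((channel, [task]))  # successful task's channel write
        if interrupt_record is not None:
            return buffered + [interrupt_record]  # these Pending Writes get persisted
    return []  # no interrupt: nothing is persisted as Pending Writes


print(run_exit_mode("bar1"))
print(run_exit_mode(None))  # []
```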