[拆解LangChain执行引擎]梳理Agent的执行流程
到目前为止,我们已经大体了解Pregel的`invoke/ainvoke`方法在背后都做了什么。接下来我们分两种情况简单梳理一下Pregel对象的执行的大体流程,这也是我们创建的Agent的执行流程。具体执行场景分两种, 一种是指定输入从头开始的常规调用,另一种是针对某个Checkpoint的恢复调用。
到目前为止,我们已经大体了解Pregel的invoke/ainvoke方法在背后都做了什么。接下来我们分两种情况简单梳理一下Pregel对象的执行的大体流程,这也是我们创建的Agent的执行流程。具体执行场景分两种, 一种是指定输入从头开始的常规调用,另一种是针对某个Checkpoint的恢复调用。
1. 常规调用
对于常规调用(这里假设Pregel设置了Checkpointer),我们针对输入Channel提供对应的输入参数调用invoke/ainvoke方法,并且利用RunnableConfig对Thread ID进行设置,此后便进入了基于Superstep的循环迭代。Pregel会根据提供的参数创建一个PregelLoop对象来实施迭代。
首次迭代的第一个Superstep(Superstep -1)相对特殊一些,执行引擎在将输入写入Channel后,它会根据Node针对Channel的订阅关系确定下一步该执行的Node,并生成对应可执行任务。这个可执行任务不是状态快照中的PregelTask对象,其类型为PregelExecutableTask对象。如果设置的持久化模式不是exit,引擎会针对当前的Channel状态生成一个Checkpoint,并调用Checkpointer的put/aput方法实施持久化。从上面的实例模拟的输出结果来看,这里的持久化应该是异步调用的。后续迭代基本采用这样的模式。
创建的任务将在新的Superstep中执行,但在并指执行这些PregelExecutableTask对象之前,一个PregelScratchpad对象会被创建出来。PregelScratchpad提供当前和最大允许的Superstep编号(由设置的迭代限制决定,默认为10000),同时还提供Resume Value列表(来源于持久化的基于Resume的Pending Write,这里为空)和一组分别服务于恢复调用和子图调用的计数器。
执行任务所需的输入具有两种来源,一种是常规Channel,另一种是ManagedValue,后者是基于当前的PregelScratchpad的实时计算的结果。任务在执行过程中只能读取Channel在上一个Superstep固化的数据,并且它们在执行过程中也不运行直接改变Channel的值,双重保障确保了数据的一致性,每个并发执行的任务看到的数据都是一致的。每个任务完成执行后会将针对Channel的更新诉求封装成Pending Write提交给执行引擎,后者对对其进行缓存。如果没有将持久化模式设置为exit,Checkpointer的put_writers/aput_writers方法会被调用其持久化。
当所有任务成功执行后,整个流程进入一个同步屏障。对于执行引擎之前收集到的所有针对Channel的写入(包括面向业务的常规Channel和用于存储Push任务的名为__pregel_tasks的Channel),在此同步屏障中被统一应用。随后,需要在下一Superstep执行的任务被解析出来,其中包括借助Channel订阅驱动的Pull任务,和利用__pregel_tasks通道存储的Send对象创建的Push任务。如果没有将持久化模式设置为exit,此时针对当前状态的Checkpoint被创建出来。对于sync持久化模式,下一Superstep的执行会却确保在Checkpoint被成功持久化之后开启,否则两者就是一个并行执行的过程。
如果采用exit持久化模式,执行引擎只会在整个流程结束后对最终的状态创建一个Checkpoint,并对它进行全程唯一的持久化。如果流程中发生中断,之前累计的Pending Writes在这个时候被统一持久化。不论中断与否,输出Channel的值都会被读取出来组合成字典作为返回值。如果迭代超出限制,执行就此中止,对于sync或者async持久化模式,之前的Checkpoint持久化已经实施了,但是对于exit模式,则会根据当前状态实施持久化。
2. 恢复调用
对于恢复执行,我们必须利用RunnableConfig配置提供Thread ID。如果同时指定了Checkpoint ID,意味着试图从它对应的历史时刻开始执行,相当于在那里重建了一个分支,开启了一个平行世界;否则则是行为从上次中断地方继续前行。恢复调用还可以利用Command的resume字段为对应的中断提供Resume Value。
执行引擎针对恢复调用会忽略提供的输入,转而调用Checkpointer的get_tuple/aget_tuple方法得到对应的CheckpointTuple元组。CheckpointTuple元组携带的Checkpoint、Pending Writes以及其他先相关的元数据和配置会用来创建上面介绍的PregelLoop对象。 Checkpoint随后被提取出来写入对应的Channel,并按照上面介绍的方式解析出接下来应该执行的任务。
对于利用PregelLoop实施的第一个迭代于后续迭代有所不同。首先,Resume类型的Pending Write会被提取用于填充PregelScratchpad的resume列表;其次,对于重建的任务,如果在Pending Write中有对应的条目,且状态为执行成功,它将不会重复执行,但是此Pending Write会提交给引擎。后续的步骤就与常规执行没有什么区别了。
更多推荐

所有评论(0)