SAA Graph 内存管理源码分析

短期记忆 CheckpointSaver

BaseCheckpointSaver

  • 核心职责:在 AI 状态图中,负责保存对话的记忆和执行状态执行引擎的“快照”机制。它是一个“基础设施组件”。

    • 断点续传:当工作流因人工审批(Human-in-the-loop)或错误中断时,可以从最后一个 Checkpoint 恢复。
    • 多轮对话管理:通过 threadId 区分不同的用户会话,确保状态互不干扰。
    • 版本管理/回溯:保存了状态的历史记录,允许回滚到之前的某个状态。
  • BaseCheckpointSaver接口

public interface BaseCheckpointSaver {
	String THREAD_ID_DEFAULT = "$default";


	Collection<Checkpoint> list(RunnableConfig config);

	Optional<Checkpoint> get(RunnableConfig config);

	RunnableConfig put(RunnableConfig config, Checkpoint checkpoint) throws Exception;

	Tag release(RunnableConfig config) throws Exception;

	record Tag(String threadId, Collection<Checkpoint> checkpoints) {
		public Tag(String threadId, Collection<Checkpoint> checkpoints) {
			this.threadId = threadId;
			this.checkpoints = ofNullable(checkpoints).map(List::copyOf).orElseGet(List::of);
		}
	}
}
  • BaseCheckpointSaver类图

image-20260208103305153

MemorySaver

  • 它是一个基于内存的状态持久化器(Checkpoint Saver),主要用于在 AI 智能体(Agent)或工作流执行过程中,记录、读取和管理各个节点的“状态快照”(Checkpoints)
  • OverAllState 是执行时的内存数据,而 MemorySaver 存储的是 OverAllState 在某一时刻的“快照”(Snapshot)。MemorySaver 和 OverAllState 是通过 Checkpoint 对象紧密结合在一起使用的。

image-20260208134150359

image-20260208132558082

关键数据结构
  • _checkpointsByThread:
    • 类型:Map<String, LinkedList>
    • Key 是 threadId(会话 ID)。
    • Value 是一个 LinkedList,按时间顺序存储了该会话的所有状态快照。
  • _lock:
    • 类型:ReentrantLock
    • 重要性:由于 AI 图任务通常是异步或并发执行的(如多个节点并行运行),使用重入锁确保了对内存中状态读写操作的线程安全
核心方法解析

A. loadOrInitCheckpoints (模板方法)

这是内部最核心的辅助方法。它封装了“加锁 -> 获取 ThreadId -> 执行逻辑 -> 释放锁”的标准化流程。

  • 它使用 transformer(一个函数式接口)来处理具体的增删改查逻辑,保持了代码的简洁和高并发下的安全性。

B. put (保存/更新状态

  • 更新模式:如果 config 中带了 checkPointId,它会在历史记录中找到那个旧快照并替换。
  • 新增模式:如果不带 ID,它会使用 checkpoints.push(checkpoint) 将新状态压入链表顶部(最新的状态始终在最前面)。
  • 钩子函数:调用了 insertedCheckpoint 或 updatedCheckpoint,方便子类扩展(例如打印日志或指标统计)。

C. get (获取状态)

  • 支持精准获取:通过 checkPointId 获取历史中某个特定时刻的状态。
  • 支持默认获取:如果不指定 ID,调用 getLast 获取当前会话的最新状态。

D. release (释放/清理)

  • 根据 threadId 彻底删除该会话的所有历史记录。这通常用于会话结束或主动清空记忆。
public class MemorySaver implements BaseCheckpointSaver {

	final Map<String, LinkedList<Checkpoint>> _checkpointsByThread = new HashMap<>();
	private final ReentrantLock _lock = new ReentrantLock();

  //钩子方法 (Hooks):loadedCheckpoints, insertedCheckpoint,updatedCheckpoint,releasedCheckpoints 等方法都是 protected 且为空实现。这是一种模板模式,允许开发者继承 MemorySaver 来实现例如“持久化到 Redis”或“异步写入数据库”的增强功能,而不需要修改核心锁逻辑。
	protected LinkedList<Checkpoint> loadedCheckpoints(RunnableConfig config, LinkedList<Checkpoint> checkpoints) throws Exception {
		return checkpoints;
	}

	protected void insertedCheckpoint(RunnableConfig config, LinkedList<Checkpoint> checkpoints, Checkpoint checkpoint) throws Exception {
	}

	protected void updatedCheckpoint(RunnableConfig config, LinkedList<Checkpoint> checkpoints, Checkpoint checkpoint) throws Exception {
	}

	protected void releasedCheckpoints(RunnableConfig config, LinkedList<Checkpoint> checkpoints, Tag releaseTag) throws Exception {
	}

	protected final <T> T loadOrInitCheckpoints(RunnableConfig config,
			TryFunction<LinkedList<Checkpoint>, T, Exception> transformer) throws Exception {
    //它封装了“加锁 -> 获取 ThreadId -> 执行逻辑 -> 释放锁”的标准化流程。
		_lock.lock();
		try {
			var threadId = config.threadId().orElse(THREAD_ID_DEFAULT);
			return transformer.tryApply(loadedCheckpoints(config, _checkpointsByThread.computeIfAbsent(threadId, k -> new LinkedList<>())));
		}
		finally {
			_lock.unlock();
		}
	}

	protected final Collection<Checkpoint> remove(String threadId) {
		return _checkpointsByThread.remove(Objects.requireNonNull(threadId));
	}

	@Override
	public final Collection<Checkpoint> list(RunnableConfig config) {
		try {
      // 使用了 Collections::unmodifiableCollection 返回不可变集合,防止外部代码意外修改内存中的原始数据。
			return loadOrInitCheckpoints(config, Collections::unmodifiableCollection);
		}
		catch (Exception e) {
			throw new RuntimeException(e);
		}
	}

	@Override
	public final Optional<Checkpoint> get(RunnableConfig config) {

		try {
			return loadOrInitCheckpoints(config, checkpoints -> {
        //支持精准获取:通过 checkPointId 获取历史中某个特定时刻的状态。
				if (config.checkPointId().isPresent()) {
					return config.checkPointId()
							.flatMap(id -> checkpoints.stream()
									.filter(checkpoint -> checkpoint.getId().equals(id))
									.findFirst());
				}
        //支持默认获取:如果不指定 ID,调用 getLast 获取当前会话的最新状态。
				return getLast(checkpoints, config);

			});
		}
		catch (Exception e) {
			throw new RuntimeException(e);
		}
	}

	@Override
	public final RunnableConfig put(RunnableConfig config, Checkpoint checkpoint) throws Exception {

		return loadOrInitCheckpoints(config, checkpoints -> {

			if (config.checkPointId().isPresent()) { 
        // 更新模式:如果 config 中带了 checkPointId,它会在历史记录中找到那个旧快照并替换。
				String checkPointId = config.checkPointId().get();
				int index = IntStream.range(0, checkpoints.size())
						.filter(i -> checkpoints.get(i).getId().equals(checkPointId))
						.findFirst()
						.orElseThrow(() -> (new NoSuchElementException(format("Checkpoint with id %s not found!", checkPointId))));
				checkpoints.set(index, checkpoint);
        // 钩子函数:便于mysql等持久化实现
				updatedCheckpoint(config, checkpoints, checkpoint);
				return config;
			}

      //新增模式:如果不带 ID,它会使用 checkpoints.push(checkpoint) 将新状态压入链表顶部(最新的状态始终在最前面)。
			checkpoints.push(checkpoint); 
      // 钩子函数:便于mysql等持久化实现
			insertedCheckpoint(config, checkpoints, checkpoint);

			return RunnableConfig.builder(config)
					.checkPointId(checkpoint.getId())
					.build();

		});
	}

	@Override
	public final Tag release(RunnableConfig config) throws Exception {

		return loadOrInitCheckpoints(config, checkpoints -> {
      //根据 threadId 彻底删除该会话的所有历史记录。这通常用于会话结束或主动清空记忆。
			var threadId = config.threadId().orElse(THREAD_ID_DEFAULT);
			var tag = new Tag(threadId, remove(threadId));
      // 钩子函数:便于mysql等持久化实现
			releasedCheckpoints(config, checkpoints, tag);
			return tag;
		});
	}
}

实战场景分析

在你的 AI 项目中,MemorySaver 通常这样使用:

  • 场景 1:流式对话
    当用户问“它是谁?”时,引擎通过 threadId 调用 get 拿到上一轮的 Checkpoint,从而知道“它”指的是什么。
  • 场景 1:人工审查 (Human-in-the-loop)
    工作流运行到“转账确认”节点,引擎调用 put 保存状态并暂停。用户点击网页上的“同意”后,后端从内存中取出 Checkpoint,恢复图执行。

长期记忆 Store

Store

  • 核心职责:用于持久化存储跨会话的数据,语义存储、用户画像、知识沉淀当然你想让 AI 具备“成长性”(记住关于用户的长久信息):你需要自己定义数据结构并存入 MemoryStore。

    • 用户偏好设置 (User Profile)
      • Namespace: users/123/preferences
      • Key: language -> Value: zh-CN
    • 跨会话的“事实性”知识 (Learned Facts)
      • AI 经过分析发现:devices/sensor_01 的正常电压范围是 220V-240V。它将此存入 Store。以后任何会话询问该设备,AI 都能直接查表获取。
    • 简单的 RAG(无向量搜索需求)
      • 存储一些 FAQ 或者是配置信息,不需要做复杂的向量转换,直接根据 Key 或前缀匹配。
  • Strore接口

public interface Store {

	/**
	 * 在指定的命名空间中存储具有给定键的项。如果具有相同命名空间和键的项已存在,则会更新该项。
	 */
	void putItem(StoreItem item);

	/**
	 * 从指定的命名空间中检索具有给定键的项。
	 */
	Optional<StoreItem> getItem(List<String> namespace, String key);

	/**
	 * 从指定命名空间中删除具有给定键的项。
	 */
	boolean deleteItem(List<String> namespace, String key);

	/**
	 * 根据提供的搜索条件搜索。
	 */
	StoreSearchResult searchItems(StoreSearchRequest searchRequest);

	/**
	 * 根据所提供的条件列出可用的命名空间。
	 */
	List<String> listNamespaces(NamespaceListRequest namespaceRequest);

	/**
	 * 清空,警告:此操作不可逆,并将删除所有已存储的内容。
	 */
	void clear();

	/**
	 * 获取总数。
	 */
	long size();

	/**
	 * 校验是否为空
	 */
	boolean isEmpty();

}
  • Store类图

image-20260208171820491

MemoryStore

  • MemoryStore 本质上是一个带 命名空间(Namespace) 隔离的高级内存数据库。
  1. 数据结构 (ConcurrentHashMap + ReadWriteLock)
    • 使用 ConcurrentHashMap 保证基础读写的并发安全。
    • 引入了 ReentrantReadWriteLock(读写锁):
      • 读锁 (readLock):允许多个线程同时进行 getItem、searchItems 或 listNamespaces。
      • 写锁 (writeLock):在 putItem 或 deleteItem 时是互斥的。
    • 设计意图:这通常是为了优化 “搜索” 性能。因为 searchItems 涉及全表扫描、过滤、排序和分页,耗时较长,读写锁能防止在搜索时数据被修改导致的不一致。
  2. 分层命名空间 (Namespace)
    • Key 的格式是 namespace1/namespace2/key。
    • listNamespaces 支持 maxDepth 参数。你可以像查询文件目录一样查询记忆的结构(例如:查询 user_1/ 下的所有子分类)。
  3. 高级查询能力 (searchItems)
    • 它不是简单的 KV 获取,它支持:
      • Filtering:根据自定义条件过滤。
      • Sorting:根据字段排序。
      • Pagination:支持 offset 和 limit(分页)。
    • 实战含义:这证明了它的定位是存储大量、零散的信息,而不是像 MemorySaver 那样只存一个大的状态快照。

业务层记忆 MultiTurnContextManager

  • 我们可以基于自身的业务设计的多轮对话上下文(记忆)管理器MultiTurnContextManager

关键数据结构

  • 该类维护了两个关键的 Map,将“进行中的对话”与“已完成的对话”分开处理:
    • history (历史记录):
      • 存储已经完成的对话回合(User Question + AI Plan)。
      • 使用 Deque(双端队列)实现一个滑动窗口。当记忆超过配置的 maxTurnHistory 时,会自动丢弃最老的一轮(FIFO),防止 Prompt 长度爆炸。
    • pendingTurns (缓冲区):
      • 存储当前正在生成中的对话。
      • 因为 AI 的响应通常是**流式(Streaming)**返回的,所以需要一个 StringBuilder (planBuilder) 来不断追加(Append)碎片字符,直到回答结束。

image-20260208222328570

MultiTurnContextManager源码

@Slf4j
@Component
@AllArgsConstructor
public class MultiTurnContextManager {

	private final AgentProperties properties;

	// todo:考虑持久化存储
	private final Map<String, Deque<ConversationTurn>> history = new ConcurrentHashMap<>();

	private final Map<String, PendingTurn> pendingTurns = new ConcurrentHashMap<>();

	/**
	 * 当系统接收到用户的新问题时调用。它在 pendingTurns 中开辟一块空间,记录当前的问题,并初始化一个新的计划缓冲区。
	 */
	public void beginTurn(String threadId, String userQuestion) {
		if (StringUtils.isAnyBlank(threadId, userQuestion)) {
			return;
		}
		pendingTurns.put(threadId, new PendingTurn(userQuestion.trim()));
	}

	/**
	 * 流式追加 (appendPlannerChunk)
	 */
	public void appendPlannerChunk(String threadId, String chunk) {
		if (StringUtils.isAnyBlank(threadId, chunk)) {
			return;
		}
		PendingTurn pending = pendingTurns.get(threadId);
		if (pending != null) {
			pending.planBuilder.append(chunk);
		}
	}

	/**
	 * 完成归档 (finishTurn),当流式输出结束(Complete)时调用
	 */
	public void finishTurn(String threadId) {
		PendingTurn pending = pendingTurns.remove(threadId);
		if (pending == null) {
			return;
		}
		String plan = StringUtils.trimToEmpty(pending.planBuilder.toString());
		if (StringUtils.isBlank(plan)) {
			log.debug("No planner output recorded for thread {}, skipping history update", threadId);
			return;
		}

		String trimmedPlan = StringUtils.abbreviate(plan, properties.getMaxPlanLength());
		Deque<ConversationTurn> deque = history.computeIfAbsent(threadId, k -> new ArrayDeque<>());
		synchronized (deque) {
			while (deque.size() >= properties.getMaxTurnHistory()) {
				deque.pollFirst();
			}
			deque.addLast(new ConversationTurn(pending.userQuestion, trimmedPlan));
		}
	}

	/**
	 * 当运行停止的时候丢失缓存的对话
	 */
	public void discardPending(String threadId) {
		pendingTurns.remove(threadId);
	}

	/**
	 * 重新启动上一轮,以便新的规划器输出可以替换它(例如,在人工反馈之后)。 上一个已保存的轮次将被删除,其问题将被重新使用。
	 */
	public void restartLastTurn(String threadId) {
		Deque<ConversationTurn> deque = history.get(threadId);
		if (deque == null || deque.isEmpty()) {
			return;
		}
		ConversationTurn lastTurn;
		synchronized (deque) {
			lastTurn = deque.pollLast();
		}
		if (lastTurn != null) {
			pendingTurns.put(threadId, new PendingTurn(lastTurn.userQuestion()));
		}
	}

	/**
	 * 构建多轮对话提示词
	 * 用户: [问题A] 
	 * AI计划: [计划A] 
	 * 用户: [问题B]
	 * AI计划: [计划B]
	 */
	public String buildContext(String threadId) {
		Deque<ConversationTurn> deque = history.get(threadId);
		if (deque == null || deque.isEmpty()) {
			return "(无)";
		}
		return deque.stream()
			.map(turn -> "用户: " + turn.userQuestion() + "\nAI计划: " + turn.plan())
			.collect(Collectors.joining("\n"));
	}

	private record ConversationTurn(String userQuestion, String plan) {
	}

	private static class PendingTurn {

		private final String userQuestion;

		private final StringBuilder planBuilder = new StringBuilder();

		private PendingTurn(String userQuestion) {
			this.userQuestion = userQuestion;
		}

	}

}

对比总结

核心概念对比

维度 MultiTurnContextManager Saver (Checkpoint) Store (Long-term)
人类类比 语境意识 (Dialogue Flow) 瞬时/短期记忆 (Working Memory) 长久记忆/知识库 (Semantic Memory)
框架层级 业务逻辑层 (Application) 执行引擎层 (Graph Infrastructure) 全局资源层 (Global Resource)
存储内容 对话文本对(问与答的字符串) 状态机快照(变量、中间结果、标志位) 结构化事实/偏好(KV键值对、JSON)
关注点 语义连贯性(用于注入 Prompt) 执行可靠性(断点续传、人工审查) 个性化与进化(跨会话记住用户特征)
数据范围 当前会话的最近 N 轮 当前 Thread 的全量执行状态 全局或按 Namespace 隔离的长久数据

开发总结:什么时候该用谁?

  • 如果你发现 AI “前言不搭后语”

    →→ 优化 MultiTurnContextManager(增加滑动窗口大小,改进 Prompt 模板)。

  • 如果你希望程序报错后能接着跑,或者需要人工点击按钮才继续

    →→ 配置 Saver(将 MemorySaver 升级为 JdbcCheckpointSaver)。

  • 如果你希望 AI 具备“成长性”,能记住用户的名字、常用偏好、或者之前总结的规律

    →→ 使用 Store(定义 Namespace,将业务事实写入 StoreItem)。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐