MemMachine 原理深度解析:从源码出发,一文搞懂 AI Agent 的“长期记忆层“
文章摘要:本文介绍了大语言模型(LLM)存在的两个关键限制——无状态性和上下文窗口有限性,以及传统RAG式记忆解决方案的不足。作者提出MemMachine记忆系统,通过模拟人类认知分层(工作记忆、情景记忆、语义记忆)来解决这些问题。系统架构包含客户端层、API层、核心服务层和存储层,采用模块化设计支持多租户隔离和模型替换。关键创新点包括事件记忆流处理、结构化语义抽取和向量图存储技术,最终实现细粒度
文章目录
第 0 章 引子:为什么 LLM 需要一个"记忆层"?
大语言模型有两个先天限制:
- 无状态:每次 API 调用之间不共享任何信息,所有"上下文"必须显式写进
messages数组; - 上下文窗口有限:即便是 200K token 的模型,把所有历史塞进去也会导致延迟、成本、注意力稀释一起恶化。
最朴素的解法是"把对话写进向量库,查询时召回 Top-K",这就是早期的 RAG-style memory。但实践中很快暴露出几个新问题:
- 粒度太粗:一条用户消息可能同时包含"姓名"、“偏好”、“日程”,向量召回时只能整段返回,无法精确利用;
- 结构化缺失:用户说过"我喜欢川菜"和"我讨厌香菜",应该作为可独立查询、可独立删除的两条事实,而不是绑在同一段文本里;
- 时间线丢失:对话是有顺序的,"昨天用户改主意了"这种信息必须保留顺序;
- 多租户隔离:一个 Agent 通常要同时服务多个用户/会话,记忆必须可按维度切割;
- 模型可替换:今天用 OpenAI,明天换 Bedrock,记忆不能跟着丢。
MemMachine 的设计哲学,就是模拟人类的认知分层来解决这些问题:
┌────────────────────────────────────────────────────────────┐
│ 人类大脑 │
├────────────────────────────────────────────────────────────┤
│ 工作记忆 : 正在想的事 (秒级) │
│ 情景记忆 : 发生过什么 (按时间,可回放) │
│ 语义记忆 : 我知道什么 (脱离时间的事实/常识) │
└────────────────────────────────────────────────────────────┘
│
▼
┌────────────────────────────────────────────────────────────┐
│ MemMachine │
├────────────────────────────────────────────────────────────┤
│ Working Memory : 当前会话内存(短期上下文) │
│ Episodic Memory : 对话/事件流(时间序列,可检索) │
│ Semantic Memory : 用户画像(LLM 抽取的结构化事实) │
└────────────────────────────────────────────────────────────┘
接下来的所有章节,都是围绕这三种记忆,以及围绕它们的存储、检索、生命周期管理展开的。
第 1 章 30 秒看懂 MemMachine
1.1 一张图看懂
1.2 关键模块的源码位置
packages/server/src/memmachine_server/
├── main/memmachine.py # 总入口 MemMachine 主类
├── episodic_memory/
│ ├── episodic_memory.py # 单会话情景记忆 (STM+LTM 编排)
│ ├── episodic_memory_manager.py # 多会话/LRU 缓存
│ ├── short_term_memory/ # deque + 滚动摘要
│ ├── long_term_memory/ # 基于 VectorGraphStore 的 LTM
│ ├── declarative_memory/ # LTM 的底层:派生 + 嵌入
│ └── event_memory/ # v0.3.x 新一代 LTM
│ ├── event_memory.py
│ ├── segmenter/ # Event → Segment
│ ├── deriver/ # Segment → Derivative
│ └── segment_store/ # Segment 持久化
├── semantic_memory/
│ ├── semantic_memory.py # 后台抽取服务
│ ├── semantic_ingestion.py # 单 set 的处理循环
│ ├── semantic_llm.py # LLM 抽取/巩固调用
│ ├── cluster_manager.py # 相邻语义特征聚类
│ └── util/semantic_prompt_template.py # Profile 提示词
├── retrieval_agent/
│ ├── service_locator.py # 工厂函数
│ └── agents/
│ ├── tool_select_agent.py # LLM 路由
│ ├── coq_agent.py # Chain-of-Query 迭代式
│ ├── split_query_agent.py # 多实体并发查询
│ └── memmachine_retriever.py# 直接查 LTM
├── common/
│ ├── vector_store/ # 向量库统一抽象
│ ├── vector_graph_store/ # 图+向量统一抽象
│ ├── episode_store/ # 原始 Episode 持久化
│ ├── embedder/, reranker/, language_model/
│ ├── filter/ # FilterExpr DSL
│ └── metrics_factory/ # Prometheus
└── server/ # FastAPI + MCP HTTP/Stdio
后续所有章节都会回链到具体文件与行号。
第 2 章 核心数据模型
理解 MemMachine 之前,先认清下面几个数据类型。
2.1 Episode:一切的起点
所有写入 MemMachine 的最小单位都是 Episode。它定义在 packages/server/src/memmachine_server/common/episode_store/ 之下:
class Episode:
uid: str # 唯一标识符
content: str # 消息内容
session_key: str # 所属会话,例如 "my_org/my_project"
producer_id: str # 内容产生者,例如 "alice"
producer_role: str # 角色:user / assistant / system
produced_for_id: str # 内容接收者,例如 "travel_agent"
episode_type: EpisodeType # MESSAGE / TEXT / ...
content_type: ContentType # MESSAGE / TEXT / ...
sequence_num: int # 会话内的递增序号
created_at: datetime
metadata: dict # 业务自定义元数据
filterable_metadata: dict # 用于检索过滤的元数据子集
Episode 与一个 OpenAI/Anthropic 消息几乎一一对应;但相比裸消息,它额外携带了 session_key、filterable_metadata 与 sequence_num,这些字段是后续多租户隔离、按维度过滤、跨会话排序的关键。
2.2 Segment / Derivative:事件记忆里的派生单元
EventMemory 引入了"事件 → 片段 → 派生"的三级抽象,源码:packages/server/src/memmachine_server/episodic_memory/event_memory/data_types.py:96-188。
class Event:
uuid: UUID
timestamp: datetime
context: Context # ProducerContext | NullContext
blocks: list[Block] # 一条事件可包含多个 Block
class Segment: # Event 的最小可检索片段
uuid: UUID
event_uuid: UUID
index: int # 在 event.blocks 中的位置
offset: int # 在 block 内的切分偏移
timestamp: datetime
block: Block
class Derivative: # Segment 派生出的可向量化文本
uuid: UUID
segment_uuid: UUID
block: Block # 通常是 TextBlock
为什么要分这么细?
- Block:一个 Block 代表一种内容类型(文本/JSON/HTML),不同 Block 类型对应不同的下游处理。当前主要实现是
TextBlock。 - Segment:把一条事件按"语义边界"切成小块,让向量召回更精准。文本场景下,由
TextSegmenter(packages/server/src/memmachine_server/episodic_memory/event_memory/segmenter/text_segmenter.py)使用 LangChain 的RecursiveCharacterTextSplitter在中文/英文标点上递归切分。 - Derivative:在 Segment 之上再做派生(如"格式化为 ‘producer: text’"、“按句子切分”)。由
WholeTextDeriver/SentenceTextDeriver(packages/server/src/memmachine_server/episodic_memory/event_memory/deriver/text_deriver.py:47-80)产出。
真正写进向量库的是 Derivative,不是 Event 本身。一条 Event 可能展开为 5 个 Segment、10 个 Derivative,从而把召回粒度精细到句子级别。
2.3 SemanticFeature:语义记忆的最小单元
# packages/server/src/memmachine_server/semantic_memory/semantic_model.py:70-103
class SemanticFeature:
set_id: SetIdT # 所属"集合"(典型为 org/project 或 user)
category: str # 分类,如 "profile_prompt"
tag: str # 顶级标签,如 "Demographic Information"
feature_name: str # 二级特征名,如 "name"
value: str # 实际值,如 "张三"
metadata: Metadata # 包含 id、citations(关联 Episode UID)
注意它是 三元组 + 引用:(tag, feature_name, value) 描述事实,metadata.citations 反向链回原始 Episode,便于做"为什么记忆里有这条?"的可追溯审计。
2.4 三层命名空间
MemMachine 使用三层结构隔离记忆:
Organization (org_id)
└── Project (project_id)
└── Session (session_id)
└── 具体的 Episode / Feature
源码里它们被合并成一个字符串:
session_key = f"{org_id}/{project_id}" # 形式由配置决定
session_key 是所有底层存储的过滤主键。多租户场景下:
org_id用来隔离组织(一般对应公司/客户);project_id用来隔离产品/应用(不同 Agent 互不可见);metadata.user_id/metadata.agent_id/metadata.session_id在同一个(org, project)内做更细粒度切分。
提示:SDK 里
Project.memory(user_id=..., agent_id=..., session_id=...)调用,最终都会把这些字段写到 Episode 的metadata和filterable_metadata,从而保证后续检索可以按它们过滤。
第 3 章 写入路径:一条消息进来之后发生了什么
这是 MemMachine 工作流里最关键的一段代码。理解它,你就理解了 80% 的 MemMachine。
3.1 入口:MemMachine.add_episodes()
代码位置:packages/server/src/memmachine_server/main/memmachine.py:649-703。删掉日志、计数器之后的核心逻辑:
async def add_episodes(
self,
session_data,
episode_entries,
*,
target_memories=ALL_MEMORY_TYPES,
) -> list[EpisodeIdT]:
# 1) 原始 Episode 永久落盘(关系型/嵌入式存储)
episode_storage = await self._resources.get_episode_storage()
episodes = await episode_storage.add_episodes(
session_data.session_key, episode_entries,
)
tasks = []
# 2) 情景记忆(STM + LTM/EventMemory)
if MemoryType.Episodic in target_memories:
async with episodic_memory_manager.open_or_create_episodic_memory(...) as ep:
tasks.append(ep.add_memory_episodes(episodes))
# 3) 语义记忆(异步排队,等待后台抽取)
if MemoryType.Semantic in target_memories:
tasks.append(
semantic_session_manager.add_message(
episodes=episodes, session_data=session_data,
)
)
await asyncio.gather(*tasks) # 步骤 2、3 并发执行
return [e.uid for e in episodes]
划重点:
- 始终先落 EpisodeStore:哪怕你显式禁用了 Episodic/Semantic,原始事件也已经持久化了,这是事后审计与回溯的基础;
- Episodic / Semantic 写入并发:通过
asyncio.gather,不会互相阻塞; - Semantic 不是同步抽取:
add_message只是把 Episode 加入"待抽取"队列,真正的 LLM 调用发生在SemanticService的后台任务里(见 3.5 节)。
3.2 Episodic 路径:EpisodicMemory.add_memory_episodes()
源码:packages/server/src/memmachine_server/episodic_memory/episodic_memory.py:208-242。
async def add_memory_episodes(self, episodes: list[Episode]) -> None:
# 把 metadata 中所有"标量"提取到 filterable_metadata,
# 这是后续按用户/会话/标签过滤的关键
for episode in episodes:
if episode.metadata is not None and episode.filterable_metadata is None:
episode.filterable_metadata = {
k: v for k, v in episode.metadata.items()
if isinstance(v, get_args(PropertyValue))
}
tasks: list[Coroutine] = []
if self._short_term_memory:
tasks.append(self._short_term_memory.add_episodes(episodes))
if self._long_term_memory:
tasks.append(self._long_term_memory.add_episodes(episodes))
await asyncio.gather(*tasks)
注意 STM 与 LTM 是并发写入 的。它们彼此独立、互不依赖;这意味着你可以单独开启 STM(最便宜,只有内存 + 偶尔的 LLM 摘要)或单独开启 LTM(重一些,需要向量库与图库),也可以同时启用。
3.3 ShortTermMemory:deque + 滚动摘要
源码:packages/server/src/memmachine_server/episodic_memory/short_term_memory/short_term_memory.py。
它的逻辑非常直观——一个带容量的双端队列:
# packages/server/src/memmachine_server/episodic_memory/short_term_memory/short_term_memory.py:187-209
async def add_episodes(self, episodes: list[Episode]) -> bool:
async with self._lock.write_lock():
self._memory.extend(episodes)
self._current_episode_count += len(episodes)
self._current_message_len += sum(len(e.content) for e in episodes)
full = await self._is_full()
if full:
await self._do_evict()
return full
当总长度 = 已存 episode 字符数 + 摘要字符数 超过 message_capacity(默认 64000,单位为字符)时,触发 eviction:
# packages/server/src/memmachine_server/episodic_memory/short_term_memory/short_term_memory.py:222-240
async def _do_evict(self) -> None:
# 1) 先把已经被摘要过的旧 episode 直接淘汰
while len(self._memory) > self._current_episode_count and await self._is_full():
self._current_message_len -= len(self._memory[0].content)
self._memory.popleft()
# 2) 还是满,就把所有 episode 异步压缩成一份新摘要
if len(self._memory) > 0 and await self._is_full():
result = list(self._memory)
self._current_episode_count = 0
await self._consolidator.summarize(result)
摘要的提示词模板由 packages/server/src/memmachine_server/common/configuration/default_episode_summary_system_prompt.txt 等文件提供,包含"保留实体/事件/未决问题"等关键约束。
为什么用滚动摘要而不是滑动窗口?
- 单纯丢弃旧 Episode 会导致长会话失忆;
- 全部塞入上下文会爆 token;
- 滚动摘要做"压缩 + 增量重写",让短期记忆始终保持一个有界但语义完整的视图。
3.4 LongTermMemory:声明式记忆 + 图存储
LTM 的核心抽象是 DeclarativeMemory:把 Episode 派生为可向量召回的 Derivative,再落到一个同时具备图能力与向量能力的存储(默认 Neo4j)。
async def add_episodes(self, episodes: Iterable[Episode]) -> None:
episodes = sorted(episodes, key=lambda e: (e.timestamp, e.uid))
# 1) Episode → Node:保留原始字段 + 业务可过滤字段
episode_nodes = [Node(uid=e.uid, properties={...}) for e in episodes]
# 2) 并发派生 Derivative(默认 1 episode → N derivative)
episodes_derivatives = await asyncio.gather(
*[self._derive_derivatives(e) for e in episodes]
)
derivatives = [d for lst in episodes_derivatives for d in lst]
# 3) 批量嵌入 Derivative 文本
derivative_embeddings = await self._embedder.ingest_embed(
[d.content for d in derivatives],
)
# 4) 写入两类节点
derivative_nodes = [
Node(uid=d.uid, properties={...},
embeddings={emb_name: (embedding, similarity_metric)})
for d, embedding in zip(derivatives, derivative_embeddings)
]
await asyncio.gather(
self._vector_graph_store.add_nodes(self._episode_collection, episode_nodes),
self._vector_graph_store.add_nodes(self._derivative_collection, derivative_nodes),
)
# 5) 建立 (Derivative)-[DERIVED_FROM]->(Episode) 关系
await self._vector_graph_store.add_edges(
relation=self._derived_from_relation,
source_collection=self._derivative_collection,
target_collection=self._episode_collection,
edges=derivative_episode_edges,
)
派生逻辑见 _derive_derivatives:
# packages/server/src/memmachine_server/episodic_memory/declarative_memory/declarative_memory.py:213-272
match episode.content_type:
case ContentType.MESSAGE:
if not self._message_sentence_chunking:
return [Derivative(content=f"{episode.source}: {episode.content}", ...)]
sentences = extract_sentences(episode.content)
return [
Derivative(content=f"{episode.source}: {sentence}", ...)
for sentence in sentences
]
case ContentType.TEXT:
return [Derivative(content=episode.content, ...)]
两个值得关注的设计:
- 每条 derivative 文本前会拼接
{producer}:。这是给向量模型"加 context"的一种廉价手法,让 “alice: 我喜欢川菜” 与 “bob: 我讨厌川菜” 在嵌入空间天然分开。 - 可选
message_sentence_chunking:开启后,按句子切分嵌入;适合"一条消息里夹杂多种事实"的场景。
最终的 Neo4j 图结构如下:
(:Episode {uid, content, producer_id, ...})
▲
│ [DERIVED_FROM]
│
(:Derivative {uid, content, embedding, ...})
向量索引建立在 Derivative 上;命中之后通过 DERIVED_FROM 反向找回 Episode。
3.5 EventMemory:v0.3.x 引入的新一代情景记忆
EventMemory 与 LongTermMemory 在功能上等价(都是"长期、可检索的情景记忆"),但实现思路不同。它的核心在于:
- 把"图能力"从存储底座中剥离(不再强依赖 Neo4j);
- 用通用的
VectorStore(Qdrant / SQLite-USearch / SQLite-vec)做向量召回; - 用独立的
SegmentStore维护片段邻接关系,实现"以 seed 为中心扩展前后文"。
源码:packages/server/src/memmachine_server/episodic_memory/event_memory/event_memory.py:204-307。
async def _encode_events(self, events: Iterable[Event]) -> None:
events = list(events); self._validate_events(events)
# 阶段 1:Segmenter 把 Event 切成 Segment
segment_lists = await asyncio.gather(
*(self._segmenter.segment(event) for event in events)
)
segments = [s for lst in segment_lists for s in lst]
# 阶段 2:Deriver 从 Segment 派生 Derivative
derivative_lists = await asyncio.gather(
*(self._deriver.derive(seg) for seg in segments)
)
segments_to_derivatives = dict(zip(segments, derivative_lists))
derivatives = [d for lst in derivative_lists for d in lst]
# 阶段 3:批量嵌入
derivative_texts = [EventMemory._extract_text(d.block) for d in derivatives]
derivative_embeddings = await self._embedder.ingest_embed(derivative_texts)
# 阶段 4:写 SegmentStore(保留 Segment 之间的邻接顺序)
await self._segment_store_partition.add_segments({
seg: [d.uuid for d in ds] for seg, ds in segments_to_derivatives.items()
})
# 阶段 5:写 VectorStore(按 Derivative 维度)
derivative_records = [EventMemory._build_derivative_record(d, e)
for d, e in zip(derivatives, derivative_embeddings)]
if derivative_records:
await self._vector_store_collection.upsert(records=derivative_records)
更进一步:
- 五个阶段都在 Prometheus 上以 histogram 形式上报(标签
phase=segmentation/derivation/embedding/segment_store/vector_store),便于运维定位性能瓶颈; _validate_events会拒绝带保留字段的事件(_segment_uuid、_timestamp是 EventMemory 占用的保留字段);- 写入 derivative 时同时写两类属性:系统属性(
_segment_uuid、_timestamp)与用户自定义属性(直接落到向量库 record 的 properties)。
EventMemory vs LongTermMemory 怎么选?
- 想用 Neo4j 一体化做"图遍历 + 向量召回",选 LTM;
- 想跑在轻量级单机(SQLite/sqlite-vec),或想把向量库换成 Qdrant 集群、不引入图数据库,选 EventMemory;
- 两者可以并存,但同一会话通常二选一即可。
3.6 SemanticMemory:把对话蒸馏成"用户画像"
写入路径里最有意思的一段是语义记忆。它不是同步抽取,而是依赖一个后台轮询任务:
# packages/server/src/memmachine_server/semantic_memory/semantic_memory.py:783-836
async def _background_ingestion_task(self) -> None:
ingestion_service = IngestionService(
params=IngestionService.Params(
semantic_storage=self._semantic_storage,
resource_retriever=self._set_id_resource,
history_store=self._episode_storage,
max_features_per_update=self._max_features_per_update,
),
)
backoff_sec = self._background_ingestion_interval_sec
while not self._is_shutting_down:
# 1) 找出"有未抽取消息 / 距上次抽取已经超时"的 set
dirty_sets = [
s async for s in self._semantic_storage.get_history_set_ids(
min_uningested_messages=self._feature_update_message_limit,
older_than=datetime.now(tz=UTC) - self._feature_time_limit,
)
]
if len(dirty_sets) == 0:
await self._interruptible_sleep(self._background_ingestion_interval_sec)
continue
# 2) 批量处理,失败则指数退避
try:
await ingestion_service.process_set_ids(dirty_sets)
except Exception:
had_errors = True
...
# 3) 清理已被消化的历史行
purged = await self._semantic_storage.purge_ingested_rows(dirty_sets)
每个 set 的处理逻辑:
# packages/server/src/memmachine_server/semantic_memory/semantic_ingestion.py:122-278
async def _process_single_set(self, set_id: str) -> None:
resources = await self._resource_retriever(set_id) # 拿 LLM / Embedder / Category
history_ids = [h async for h in self._semantic_storage.get_history_messages(
set_ids=[set_id], limit=5, is_ingested=False,
)]
... # 加载原始 Episode
async def process_semantic_type(semantic_category):
for message in messages:
features = [...] # 已有特征作为"旧 profile"喂给 LLM
commands = await llm_feature_update(
features=features,
message_content=message.content,
model=resources.language_model,
update_prompt=semantic_category.prompt.update_prompt,
)
await self._apply_commands(commands=commands, ...)
await asyncio.gather(*[
process_semantic_type(t) for t in resources.semantic_categories
])
await self._semantic_storage.mark_messages_ingested(...)
await self._consolidate_set_memories_if_applicable(set_id=set_id, resources=resources)
LLM 调用本身在 llm_feature_update 中(packages/server/src/memmachine_server/semantic_memory/semantic_llm.py:69-101),输出格式严格约束为:
{ "commands": [
{"command": "add", "tag": "Demographic Information", "feature": "name", "value": "张三"},
{"command": "delete", "tag": "Hobbies & Interests", "feature": "smoker"}
]}
为什么要走"命令"而不是"覆盖式更新"?
- 命令易于撤销与回放;
- 命令天然支持并发:多个 category 各自独立产出命令;
delete命令使得"用户改主意"这种场景可以优雅落地:旧值删除 + 新值新增。
Profile 抽取提示词的完整骨架在 packages/server/src/memmachine_server/semantic_memory/util/semantic_prompt_template.py:6-153。它把语义记忆设计成两级 K-V:
tag (大类,如 "Demographic Information")
└─ feature (具体特征名,如 "name")
└─ value (值,如 "张三")
预置的 6 个 category(profile_prompt / coding_prompt / writing_assistant_prompt / financial_analyst_prompt / health_assistant_prompt / crm_prompt)都基于这套骨架定制(见 packages/server/src/memmachine_server/server/prompt/default_prompts.py:19-26)。
最后一步是巩固(consolidation):当语义特征积累到阈值(默认 20),就调用 llm_consolidate_features 把"重复 / 关联 / 矛盾"的特征合并、改写、删除。提示词在同一文件的 build_consolidation_prompt,思路类似神经网络的"长时程巩固"——把"原始记忆矿石 → 纯净的记忆颗粒 → 分箱 → 合金记忆"。
3.7 写入路径全景图
到这里,我们已经看完了"写"的完整故事。接下来是"读"。
第 4 章 读取路径:一次查询是如何被回答的
4.1 入口:MemMachine.query_search()
源码:packages/server/src/memmachine_server/main/memmachine.py:916-986。
async def query_search(self, session_data, *, target_memories, query, limit, expand_context,
score_threshold, search_filter, agent_mode=False, ...) -> SearchResponse:
property_filter = parse_filter(search_filter) if search_filter else None
episodic_task = None
semantic_task = None
if MemoryType.Episodic in target_memories:
retrieval_agent = await self._get_retrieval_agent() if agent_mode else None
episodic_task = asyncio.create_task(self._search_episodic_memory(
session_data=session_data, query=query, limit=limit, ...,
retrieval_agent=retrieval_agent,
))
if MemoryType.Semantic in target_memories:
semantic_session = await self._resources.get_semantic_session_manager()
async def _collect_semantic_results():
return [f async for f in semantic_session.search(
message=query, session_data=session_data, ..., search_filter=property_filter,
)]
semantic_task = asyncio.create_task(_collect_semantic_results())
return MemMachine.SearchResponse(
episodic_memory=await episodic_task if episodic_task else None,
semantic_memory=await semantic_task if semantic_task else None,
)
要点:
- 情景与语义查询并发:通过两个
asyncio.create_task; agent_mode=True时启用 RetrievalAgent(第 5 章详述);search_filter是 DSL 字符串:parse_filter把它解析成FilterExpr(见 6.4 节)。
4.2 Episodic 查询的双路径
源码:packages/server/src/memmachine_server/main/memmachine.py:711-767。
async with episodic_memory_manager.open_or_create_episodic_memory(...) as episodic_session:
if retrieval_agent is None or episodic_session.long_term_memory is None:
# 普通路径:直接打底层 query_memory
response = await episodic_session.query_memory(
query=query, limit=limit, expand_context=expand_context,
score_threshold=score_threshold, property_filter=search_filter,
)
else:
# Agent 路径:由 RetrievalAgent 编排 LLM + 多个子代理
response = await self._query_episodic_with_retrieval_agent(
episodic_session=episodic_session, retrieval_agent=retrieval_agent, ...,
)
return response
4.3 普通路径:EpisodicMemory.query_memory()
源码:packages/server/src/memmachine_server/episodic_memory/episodic_memory.py:352-476。简化后的核心:
async def query_memory(self, query, *, limit=None, expand_context=0,
score_threshold=-inf, property_filter=None,
mode=QueryMode.BOTH) -> QueryResponse | None:
search_limit = limit if limit is not None else 20
# 1) STM + LTM 并发查询
session_result, scored_long_episodes = await asyncio.gather(
self._query_short_term_memory(query=query, limit=search_limit, ...),
self._query_long_term_memory(query=query, limit=search_limit,
expand_context=expand_context, ...),
)
short_episode, short_summary = session_result
# 2) 去重:STM 优先(命中的 LTM 如果 uid 已经在 STM,则跳过)
episode_uid_set = {e.uid for e in short_episode}
unique_scored_long_episodes = []
for score, episode in scored_long_episodes:
if episode.uid not in episode_uid_set:
episode_uid_set.add(episode.uid)
unique_scored_long_episodes.append((score, episode))
# 3) 组装 QueryResponse(短期 + 长期 + 摘要)
return EpisodicMemory.QueryResponse(
short_term_memory=ShortTermMemoryResponse(
episodes=[...], episode_summary=[short_summary],
),
long_term_memory=LongTermMemoryResponse(
episodes=[EpisodeResponse(score=score, **e.model_dump())
for score, e in unique_scored_long_episodes],
),
)
注意几个细节:
- STM 优先:同一条 Episode 如果 STM 已经包含,就不会再从 LTM 返回。这避免了"近期会话里说过的话被重复列出"。
- QueryMode 支持三种模式:
BOTH/LONG_TERM_ONLY/SHORT_TERM_ONLY,方便调用方做"只看长期 / 只看短期"。 - STM 不仅返回 episode,还返回 summary:当用户问"刚刚我们聊了啥",summary 就是天然的答案。
4.4 EventMemory 的查询流水线
如果情景记忆走 EventMemory,调用链最终落到 EventMemory.query(),源码:packages/server/src/memmachine_server/episodic_memory/event_memory/event_memory.py:344-524。它分四个阶段:
代码骨架:
async def _query(self, query, *, vector_search_limit, expand_context,
property_filter, format_options) -> QueryResult:
# 阶段 1: 嵌入查询
query_embedding = (await self._embedder.search_embed([query]))[0]
# 阶段 2: 在 derivative 集合上做向量召回
[query_result] = await self._vector_store_collection.query(
query_vectors=[query_embedding], limit=vector_search_limit,
property_filter=collection_filter, return_vector=False, return_properties=True,
)
# 多个 derivative 可能指向同一个 segment,按 segment 去重
seed_embedding_scores: dict[UUID, float] = {}
for match in query_result.matches:
segment_uuid = UUID(str(match.record.properties[_SEGMENT_UUID_FIELD_NAME]))
if segment_uuid not in seed_embedding_scores:
seed_embedding_scores[segment_uuid] = match.score
seed_segment_uuids = list(seed_embedding_scores)
# 阶段 3: 以 seed 为中心向前/向后扩展上下文
max_backward_segments = expand_context // 3
max_forward_segments = expand_context - max_backward_segments
segment_contexts_by_seed = await self._segment_store_partition.get_segment_contexts(
seed_segment_uuids=seed_segment_uuids,
max_backward_segments=max_backward_segments,
max_forward_segments=max_forward_segments,
property_filter=property_filter,
)
# 阶段 4: 评分。无 reranker 用 embedding 分数,有 reranker 用语义重排
if self._reranker is None:
scores = [seed_embedding_scores[u] for u in kept_seed_segment_uuids]
else:
scores = await self._score_segment_contexts(query, segment_contexts, format_options)
# 排序后返回 ScoredSegmentContext 列表
return QueryResult(scored_segment_contexts=[...])
两个值得展开的点:
expand_context1:2 倾斜:源码里max_backward_segments = expand_context // 3,剩下的给前向。原因是大多数对话场景下,后文比前文更能解释当前段(用户提到的"它"通常指代后面才出现的实体)。property_filter的字段名转换:用户写filter_dict={"user_id": "alice"},MemMachine 内部会把user_id翻译为_user_id或m.user_id(依赖于是否为系统字段),具体在_to_vector_record_property方法里。这套规则使得"用户元数据"和"系统元数据"在同一个向量库 record 上共存而不冲突。
4.5 Semantic 查询:向量召回 + 跨 set 过滤
源码:packages/server/src/memmachine_server/semantic_memory/semantic_memory.py:169-200。
async def _set_id_search(self, *, set_id, embedding, min_distance=None, limit=30,
load_citations=False, filter_expr=None) -> AsyncIterator[SemanticFeature]:
filter_expr = _with_has_set_ids(set_ids=[set_id], filter_expr=filter_expr)
async for feature in self._semantic_storage.get_feature_set(
filter_expr=filter_expr, page_size=limit,
vector_search_opts=SemanticStorage.VectorSearchOpts(
query_embedding=np.array(embedding), min_distance=min_distance,
),
load_citations=load_citations,
):
yield feature
特点:
- 语义查询走的是pgvector(默认)或 Neo4j 的向量索引;
- 一次查询可以跨多个
set_id(用merge_async_iterators合并多个 set 的结果); load_citations=True时,返回的特征会带上原始 Episode UID,便于上层做"引用展示"。
4.6 读取路径全景图
第 5 章 RetrievalAgent:Agent Lightning 思路下的智能检索
RetrievalAgent 是 MemMachine 在 v0.3.x 里引入的一个新功能:把"检索"本身视为一个 Agent,而不是一次确定性的向量查询。它的灵感来自 Agent Lightning: Train ANY AI Agents with Reinforcement Learning(arXiv: 2508.03680,见仓库 README.md 的 BibTeX)。
5.1 角色卡:三个 child agent 加一个路由器
源码:packages/server/src/memmachine_server/retrieval_agent/service_locator.py:17-57。
def create_retrieval_agent(*, model, reranker, agent_name="ToolSelectAgent"):
memory_agent = MemMachineAgent(...)
if agent_name == memory_agent.agent_name:
return memory_agent
coq_agent = ChainOfQueryAgent(...)
split_agent = SplitQueryAgent(...)
if agent_name == coq_agent.agent_name:
return coq_agent
if agent_name == split_agent.agent_name:
return split_agent
return ToolSelectAgent(AgentToolBaseParam(
model=model,
children_tools=[split_agent, coq_agent, memory_agent],
extra_params={"default_tool_name": coq_agent.agent_name},
reranker=reranker,
))
四种 agent 的分工:
| Agent | 思路 | accuracy | token_cost | time_cost | 适用查询 |
|---|---|---|---|---|---|
MemMachineAgent |
直接调 EpisodicMemory.query_memory(LONG_TERM_ONLY) |
0 | 0 | 0 | 单跳、直接的事实型查询 |
SplitQueryAgent |
把"包含多个实体的单跳查询"拆成 N 个独立查询并行执行 | - | 中 | 中 | “比较 A 和 B”、“列出 X、Y、Z” |
ChainOfQueryAgent |
迭代式:查 → 判断证据是否充分 → 重写 → 再查 | 10 | 9 | 10 | 多跳推理、必须 hop 才能回答 |
ToolSelectAgent |
LLM 路由器,决定具体调哪一个 child agent | 8 | 6 | 8 | 默认入口;不知道查询类型时 |
5.2 ToolSelectAgent:用 LLM 做路由
核心提示词(节选自 packages/server/src/memmachine_server/retrieval_agent/agents/tool_select_agent.py:21-81):
You are a tool router. ...
GOAL
- Choose exactly one of: {coq}, {split_query}, {memory_retrieval}
- Output NONE only when the query type cannot be determined...
MECHANISM
1) Validate input
2) Classify the query type:
A) MULTI-HOP -> {coq}
B) SINGLE-HOP MULTI ENTITY -> {split_query}
C) SINGLE-HOP / DIRECT -> {memory_retrieval}
它在调用链上的角色:
do_query 实现见 packages/server/src/memmachine_server/retrieval_agent/agents/tool_select_agent.py:179-197,会把所选 agent 的名字写回 perf_metrics["selected_tool"],方便观测。
5.3 ChainOfQueryAgent:迭代式查询
ChainOfQuery 的关键创新是用 单个 LLM 调用 同时完成"充分性判断 + 查询重写"。看核心提示词(packages/server/src/memmachine_server/retrieval_agent/agents/coq_agent.py:25-125)的输出契约:
{
"is_sufficient": true | false,
"evidence_indices": [0, 2, 5],
"new_query": "...",
"confidence_score": 0.0 ~ 1.0
}
调用流程:
attempt = 0
while attempt < max_attempts:
docs = memmachine_agent.do_query(current_query)
rsp = LLM(COMBINED_SUFFICIENCY_AND_REWRITE_PROMPT,
original_query, used_queries, retrieved_docs=docs)
if rsp.is_sufficient and rsp.confidence_score >= 0.8:
break
current_query = rsp.new_query
attempt += 1
return aggregated_docs
实践中你会看到这种回放:
[CoQ #0] query="Marie Curie 的丈夫从事什么领域?" -> 不充分
[CoQ #1] query="Pierre Curie 的研究领域" -> 充分,置信 0.92
这正是 Agent Lightning 论文中的"小模型迭代式 RAG"思路。
5.4 SplitQueryAgent:单跳多实体并发
当 ToolSelectAgent 判定 query 是"单跳但包含多个独立实体"(如 Compare GDP of Japan and South Korea)时,会路由到 SplitQueryAgent。它会用 LLM 把 query 拆成多个子查询并并发执行,再用 reranker 合并去重。源码在 packages/server/src/memmachine_server/retrieval_agent/agents/split_query_agent.py。
5.5 MemMachineAgent:直查 LTM
最便宜的 agent,定义见 packages/server/src/memmachine_server/retrieval_agent/agents/memmachine_retriever.py:47-91。它直接调用 EpisodicMemory.query_memory(mode=LONG_TERM_ONLY),不做任何重写、不调任何额外 LLM。
async def do_query(self, policy, query):
query_response = await query.memory.query_memory(
query=query.query, limit=query.limit,
expand_context=query.expand_context,
score_threshold=query.score_threshold,
property_filter=query.property_filter,
mode=EpisodicMemory.QueryMode.LONG_TERM_ONLY,
)
return [Episode(...) for episode in query_response.long_term_memory.episodes], perf
5.6 Reranker 在 RetrievalAgent 中的作用
AgentToolBase._do_rerank 是所有 child agent 在返回之前的最后一道工序(packages/server/src/memmachine_server/retrieval_agent/common/agent_api.py:97-137):
async def _do_rerank(self, query, episodes):
if query.limit <= 0:
return sorted(episodes, key=lambda x: x.created_at)
if len(episodes) <= query.limit or self._reranker is None:
return sorted(episodes[:query.limit], key=lambda x: x.created_at)
contents = [episodes_to_string([e]) for e in episodes]
scores = await self._reranker.score(query.query, contents) # 带重试 + 限流
result = sorted(zip(episodes, scores), key=lambda x: x[1], reverse=True)
return sorted([r[0] for r in result[:query.limit]], key=lambda x: x.created_at)
排序策略很务实:
- 先按重排分数挑 Top-K,保证质量;
- 再按
created_at升序输出,让上层把"语义最相关但时间正确"的 Episode 串起来; - 重排出错且是限流时,自动
await sleep(5)后重试,最多 60 次。
可选的 reranker(在 packages/server/src/memmachine_server/common/reranker/ 下):
identity:不改变顺序,作为占位;bm25:纯文本匹配;rrf-hybrid:Reciprocal Rank Fusion 把 embedding 分与 BM25 分融合;cohere:Cohere Rerank API;amazon-bedrock:AWS Bedrock Rerank API。
第 6 章 存储抽象:MemMachine 如何在不同后端间自由切换
MemMachine 的"可移植性"来自三个核心抽象:EpisodeStore、VectorStore、VectorGraphStore。它们各自承担不同的职责。
6.1 三类存储抽象
| 抽象 | 主要职责 | 典型实现 | 适用场景 |
|---|---|---|---|
EpisodeStore |
永久持久化原始 Episode | PostgreSQL / SQLite | 关系/SQL 风格的事件存档 |
VectorStore |
纯向量检索 + 属性过滤 | Qdrant、SQLite-USearch、SQLite-vec | EventMemory、可独立扩展的向量层 |
VectorGraphStore |
向量 + 图遍历一体 | Neo4j、NebulaGraph | 经典长期情景记忆(DERIVED_FROM 等关系) |
SemanticStorage |
结构化特征 + 向量索引 | PostgreSQL + pgvector、Neo4j | 语义记忆(Profile) |
6.2 VectorStore 抽象
源码:packages/server/src/memmachine_server/common/vector_store/vector_store.py。
核心接口(伪代码):
class VectorStore:
async def create_collection(namespace: str, name: str,
config: VectorStoreCollectionConfig) -> VectorStoreCollection
async def get_collection(...) -> VectorStoreCollection | None
async def drop_collection(...) -> None
class VectorStoreCollection:
async def upsert(records: list[Record]) -> None
async def query(query_vectors, *, limit, property_filter,
return_vector, return_properties) -> list[QueryResult]
async def get(uuids) -> list[Record]
async def delete(uuids) -> None
约束:
- 每个 Collection 在创建时固定
dimensions、similarity_metric、indexed_properties_schema。换嵌入模型要么用新 Collection,要么用别名机制。 - 命名约束
[a-z0-9_]+、单段 ≤ 32 字节,是为了和 PostgreSQL 表名、Neo4j label、Qdrant collection 名等都兼容。 Record.properties同时支持系统属性(如_segment_uuid)与用户属性(直接进入 record)。EventMemory 利用这一点把"切片元数据"和"业务元数据"放在一张表里。
6.3 VectorGraphStore 抽象
源码:packages/server/src/memmachine_server/common/vector_graph_store/。
这一抽象的特点是:节点既能向量检索,又能图遍历。Neo4j 的 vector index + Cypher 是天然实现;NebulaGraph 5.2+ 也内置了向量类型。
它的主要接口可以理解为 VectorStore 的超集:
add_nodes / add_edges:写节点与关系;query_nodes:向量召回 + 属性过滤;traverse:从命中的节点出发做图遍历(如沿[DERIVED_FROM]拿回 Episode)。
这就是为什么 LongTermMemory 用一个抽象解决了"召回 + 关联",不需要再调用 EpisodeStore。
6.4 FilterExpr:跨存储的过滤 DSL
源码:packages/server/src/memmachine_server/common/filter/filter_parser.py。
用户写:
user_id = "alice" AND (tag IN ("flight", "hotel") OR session_id = "s001")
被解析成 FilterExpr 树:
And(
left=Comparison(field="user_id", op="=", value="alice"),
right=Or(
left=In(field="tag", values=["flight", "hotel"]),
right=Comparison(field="session_id", op="=", value="s001"),
)
)
然后由每种存储实现各自把这棵树翻译成原生过滤:
| 存储 | 翻译目标 |
|---|---|
| PostgreSQL | WHERE 子句 + JSONB 操作符 |
| Qdrant | Filter 嵌套 must / should |
| Neo4j | Cypher WHERE 子句 |
| SQLite-vec | WHERE 子句 |
这样调用方写一份 filter 字符串,无论后端怎么切换都能工作。
6.5 公共组件总览
packages/server/src/memmachine_server/common/ 下的组件构成了 MemMachine 的"标准件库":
| 组件 | 作用 | 支持的实现 |
|---|---|---|
embedder/ |
嵌入模型抽象 | OpenAI、Bedrock、Ollama、任意 OpenAI 兼容 |
language_model/ |
LLM 抽象,统一 generate_response / generate_parsed_response |
OpenAI、Anthropic、Bedrock、Ollama、OpenAI 兼容 |
reranker/ |
重排序器 | identity、bm25、rrf-hybrid、cohere、amazon-bedrock |
metrics_factory/ |
Prometheus 指标 | Counter / Summary / Histogram |
resource_manager/ |
按名字索取上述资源 + 缓存 | - |
session_manager/ |
会话信息存取 | PostgreSQL / SQLite |
episode_store/ |
原始 Episode 持久化 | PostgreSQL / SQLite |
ResourceManager 是这套组件的"中央集线器":MemMachine 主类里所有 await self._resources.get_embedder(...) 都从它出。
第 7 章 工程细节:决定生产可用的关键
7.1 实例 LRU 缓存
每个 (org_id, project_id) 都需要一个 EpisodicMemory 实例(持有 STM 内存、LTM 连接),频繁创建/销毁会拖慢响应。源码:packages/server/src/memmachine_server/episodic_memory/instance_lru_cache.py 与 episodic_memory_manager.py。
# packages/server/src/memmachine_server/episodic_memory/episodic_memory_manager.py:43-52
class EpisodicMemoryManagerParams(BaseModel):
instance_cache_size: int = 100 # 同时驻留的最大实例数
max_life_time: int = 600 # 空闲多久后回收(秒)
...
机制要点:
- 引用计数:通过
release_ref跟踪有多少协程正在访问,正在用的不淘汰; - 后台清理协程:每 2 秒扫一次,把空闲超时的实例 close 掉;
- session-level 读写锁:保证同一会话同时只有一个 create 在跑。
这套机制让你可以同时服务上万会话,而 Neo4j / Qdrant 的连接池不会爆。
7.2 全异步 IO 模型
整个 MemMachine 服务端是纯 async def:
asyncio.gather用来 fan-out(写入 STM/LTM/Semantic 并发、STM/LTM 并发查询);asyncio.TaskGroup用在 ingestion 时收集 episode(Python 3.12+);- 所有外部 IO(数据库、嵌入、LLM)都是协程友好的库;
rw_locks.AsyncRWLock实现了非阻塞读写锁(一写多读)。
理解这一点之后就不会惊讶为什么 MemMachine 在单 worker 下也能撑住几千 QPS——FastAPI 默认 single-process + uvicorn 已经够了。
7.3 多 Worker 部署
packages/server/src/memmachine_server/server/app.py:100-144 给了如何起多 worker:
workers_env = os.getenv("MEMMACHINE_WORKERS")
if workers_env:
workers = int(workers_env)
else:
workers = 1 # 容器化环境默认 1,避免误用宿主 CPU 数
uvicorn.run("memmachine_server.server.app:app",
host=config.server.host, port=config.server.port,
workers=workers, access_log=True, log_level=...)
提醒:
- 不要无脑设
os.cpu_count():容器里这个值经常是宿主机的核数,不是 cgroup 限额; - 多 worker 共用同一份 Postgres/Neo4j 后端:每个 worker 自己起一个 Python 进程,不共享内存中的 LRU 缓存,因此实际容量是
workers × instance_cache_size。
7.4 会话删除的异步队列
源码:packages/server/src/memmachine_server/main/memmachine.py:309-359。删除流程是软删除 + 异步清理:
async def delete_session(self, session_data) -> None:
await session_manager.update_session_status(
session_key=session_data.session_key,
status=SessionDataManager.SessionStatus.Deleted, # 先打软删除标
)
self._deletion_queue.put_nowait(session_data) # 进入清理队列
后台 _delete_session_worker 依次:
- 删 EpisodeStore 里的所有 episode(分批
EPISODE_DELETE_BATCH_SIZE=1000); - 调用
_cleanup_semantic_history清掉对应的语义引用; - 删 episodic memory;
- 删 semantic memory;
- 最后把 session 行从 session 管理器移除。
为什么要异步?
- 大会话动辄 10 万 episode,同步删会让 API 超时;
- 删除失败时实例仍然是"软删除"状态,可以重试;
- 服务重启时通过
get_sessions_by_status(Deleted)还能把未完成的清理恢复出来(见start()内的key_to_session钩子)。
7.5 配置合并:分阶段使用不同模型
源码:packages/server/src/memmachine_server/main/memmachine.py:420-460。
_with_default_episodic_memory_conf 把"全局默认"和"用户 per-session 配置"做深度合并。这使得:
- 全局可以指定
default_long_term_memory_embedder = openai_large、default_long_term_memory_reranker = cohere; - 单会话可以覆盖:
{"long_term_memory": {"embedder": "bedrock_titan"}}; - 三类 LLM(
agent/answer/judge)可以各自使用不同模型,节省成本(便宜模型做路由 + 重写,贵模型只在最后一步)。
7.6 可观测性
MemMachine 内嵌 Prometheus 指标,几个最有用的指标:
| 指标名 | 含义 |
|---|---|
Ingestion_latency |
EpisodicMemory 写入延迟(毫秒,Summary) |
query_latency |
EpisodicMemory 查询延迟 |
Ingestion_count / query_count |
调用次数 |
event_memory_encode_events_phase_seconds{phase=...} |
EventMemory 五阶段时序 |
event_memory_query_phase_seconds{phase=...} |
EventMemory 查询四阶段时序 |
直接挂 Grafana 即可。配合 event_memory_* 系列指标,你可以一眼看出"是嵌入慢还是 segment_store 慢"。
第 8 章 端到端实战:把所有知识拼成一张图
8.1 写入:memory.add("我喜欢靠窗的座位") 究竟做了多少事?
client = MemMachineClient(base_url="http://localhost:8080")
project = client.get_or_create_project(org_id="acme", project_id="travel")
memory = project.memory(user_id="alice", agent_id="travel_agent", session_id="s001")
memory.add("我喜欢靠窗的座位,最好是商务舱。", metadata={"category": "travel"})
服务端发生了:
- REST 解析 →
AddMemoriesSpec→EpisodeEntry; - 路由到 MemMachine 主类 →
add_episodes(session_data, episode_entries); - EpisodeStore 写入 → 一行
episodes落库,返回新分配的uid; - 并发开启两个任务:
- EpisodicMemory.add_memory_episodes:
- 把
category=travel拷贝到filterable_metadata; - 并发跑 STM 与 LTM:
- STM:append 到 deque;若总长度超阈值,弹出旧 episode + 异步走滚动摘要;
- LTM/EventMemory:把消息切成 segment / derivative,嵌入后写 Neo4j 或 VectorStore;
- 把
- SemanticSessionManager.add_message:仅把 episode_id 写入"待抽取"表;
- EpisodicMemory.add_memory_episodes:
- API 立即返回 episode_ids;
- 若干秒后,后台的
_background_ingestion_task扫到该 set “脏了”,触发:- 拉取 5 条未抽取 episode;
- 对每个语义 category(如
profile_prompt、crm_prompt),调 LLM 产出 SemanticCommand:[{"command":"add", "tag":"Travel Preferences", "feature":"seat_preference", "value":"User prefers window seat in business class"}] - apply commands → 嵌入 → 写 pgvector,并把这条 episode 标记为
is_ingested=True; - 若特征数 ≥ 20,触发巩固 LLM 调用,合并/删除冗余特征。
8.2 读取:memory.search("用户的座位偏好?")
results = memory.search("用户的座位偏好?")
print(results.content.episodic_memory.long_term_memory.episodes[0].content)
# → "我喜欢靠窗的座位,最好是商务舱。"
print(results.content.semantic_memory[0])
# → SemanticFeature(tag="Travel Preferences", feature_name="seat_preference",
# value="User prefers window seat in business class", ...)
服务端:
MemMachine.query_search(session_data, query=...)并发开两个 task;- Episodic task:
- 打开 EpisodicMemory 实例(从 LRU 缓存);
- 普通模式:并发查 STM + LTM;STM 优先去重;按嵌入分或 reranker 分数排序;
- Agent 模式:
ToolSelectAgent用 LLM 判定"这是单跳直接查询" → 路由到MemMachineAgent→ 直查 LTM;
- Semantic task:
- 嵌入 query;
- 在 pgvector 上做向量召回(按 set_id 过滤、
min_distance阈值); - 返回
SemanticFeature列表(含citations回链到原 Episode);
- 合并成
SearchResponse返回。
8.3 跟踪建议
当你想看清楚一条请求的实际行为:
# 1) 提高 server 日志级别
export MEMMACHINE_LOG_LEVEL=DEBUG
# 2) 实时看 EventMemory 五阶段时序
curl http://localhost:8080/metrics | grep event_memory_
# 3) 在 client 端开 timing
import time; t=time.time(); memory.search("..."); print(time.time()-t)
EventMemory 的 encode_events timing: segmentation=... derivation=... ... 日志非常直观,定位瓶颈一目了然。
第 9 章 把 MemMachine 接到你的 Agent
这里只给出"原理视角的决策表"。具体的 SDK / REST / MCP 接入示例可在 MemMachine 仓库的
README.md与examples/目录中找到。
| 接入方式 | 推荐场景 | 关键点 |
|---|---|---|
| Python SDK 内嵌 | 你的 Agent 是 Python 写的,希望最低延迟 | 直接调用 memmachine-client;与 LangChain / LangGraph / CrewAI / LlamaIndex 都有现成 wrapper |
| REST API | 跨语言、跨服务部署(Go / Node / Rust 等);多个 Agent 共享一个 Memory 服务 | 走 /api/v2/memories/*;注意 agent_mode 参数选 RetrievalAgent 与否 |
| MCP Server | Claude Desktop、Cursor 等 MCP 客户端,或想把记忆作为标准 MCP 工具暴露 | 启动 memmachine-mcp-stdio 或 mcp-http,工具签名即记忆 CRUD |
| Function Calling Tool | 在 OpenAI Function Calling、Anthropic Tool Use 框架下显式调用 | 把 add / search 包装成 tool spec;让 LLM 自己决定何时调用 |
| 隐式注入(Sidecar/Hook) | 想让旧 Agent "零侵入"获得记忆 | 在 LLM 请求前 hook,把 memory.search(query) 的结果拼到 system prompt |
原理层面上要选对方式,只需要回答两个问题:
- 你希望 LLM 主动决定何时记忆 / 何时回忆吗?
- 是 → 用 Function Calling Tool 或 MCP;
- 否 → 用 SDK / REST 在应用代码里"埋点"。
- 你能容忍多少端到端延迟?
- <100ms 的本地 Agent → SDK + 嵌入式 SQLite 后端;
- 多租户云服务 → REST + Qdrant/Neo4j;
- 离线分析 → 全部走 REST,启用 RetrievalAgent + 大模型重排。
附录 A:源码路径速查表
附录 B:术语对照
| 中文 | 英文 | 在 MemMachine 中的含义 |
|---|---|---|
| 事件 | Episode | 写入 MemMachine 的最小单位,对应一条消息或一段文本 |
| 工作记忆 | Working Memory | 单次请求的临时上下文,不持久化 |
| 短期记忆 | Short-Term Memory (STM) | 当前会话的滚动摘要 + 最近 episode |
| 长期记忆 | Long-Term Memory (LTM) | 经过派生 / 嵌入后落到图存储的情景记忆 |
| 事件记忆 | Event Memory | v0.3.x 引入的新一代 LTM,基于 VectorStore + SegmentStore |
| 语义记忆 | Semantic Memory | LLM 抽取的结构化用户画像 (tag/feature/value) |
| 派生 | Derivative | 由 Segment 派生出的"可向量化文本" |
| 片段 | Segment | 由 Segmenter 把 Event 切分而成的最小检索单元 |
| 集合 | Set | 语义记忆里的逻辑分组,通常对应 (org_id, project_id) 或某个 user |
| 巩固 | Consolidation | 语义记忆里的"合并/删除/重写"过程 |
| 检索代理 | RetrievalAgent | 把检索看作 Agent 的智能编排模块 |
结语
如果只用一句话总结 MemMachine 的设计:
它把"AI 长期记忆"建模成多层的、可分离的、可观测的存储抽象,再用 LLM 把对话蒸馏成结构化事实,最后用 Agent 化的检索代理把找回来的内容做精准编排。
它不是简单的"向量库套一层 API",而是把人类认知科学(工作 / 情景 / 语义记忆)与现代 LLM 工程实践(异步抽取、Agent Lightning、可插拔后端)有机融合的工程作品。
更多推荐


所有评论(0)