玄同 765

大语言模型 (LLM) 开发工程师 | 中国传媒大学 · 数字媒体技术(智能交互与游戏设计)

CSDN · 个人主页 | GitHub · Follow


关于作者

  • 深耕领域:大语言模型开发 / RAG 知识库 / AI Agent 落地 / 模型微调
  • 技术栈:Python | RAG (LangChain / Dify + Milvus) | FastAPI + Docker
  • 工程能力:专注模型工程化部署、知识库构建与优化,擅长全流程解决方案

「让 AI 交互更智能,让技术落地更高效」
欢迎技术探讨与项目合作,解锁大模型与智能交互的无限可能!


LangChain 记忆模块深度解析

摘要

记忆是 AI 代理的核心能力之一。LangChain v1.0+ 提供了一套完整的记忆解决方案,包括消息历史存储、键值存储、向量检索等多种机制。本文将深入解析 LangChain 的记忆架构,并通过实际代码示例展示如何构建生产级的记忆系统。


一、记忆系统概述

1.1 为什么需要记忆

AI 代理需要记忆来:

场景 记忆类型 示例
对话连贯 短期记忆 用户刚才说了什么
用户偏好 长期记忆 用户喜欢用 Python
知识积累 语义记忆 项目相关的技术文档
任务追踪 工作记忆 待办事项列表

1.2 LangChain 记忆架构

管理层

存储层

策略层

窗口截断

Token限制

摘要压缩

ChatMessageHistory
消息历史

BaseStore
键值存储

VectorStore
向量存储

RunnableWithMessageHistory
历史管理器

MemoryManager
自定义管理器

大语言模型


二、核心组件详解

2.1 BaseStore - 通用键值存储

BaseStore 是 LangChain v1.0+ 引入的通用存储接口:

from langchain_core.stores import BaseStore
from typing import Generic, TypeVar, Iterator, Sequence

V = TypeVar("V")

class BaseStore(Generic[V], ABC):
    """
    通用键值存储抽象基类。
  
    核心方法:
    - get: 获取单个值
    - set: 设置单个值
    - delete: 删除单个值
    - mget: 批量获取
    - mset: 批量设置
    - yield_keys: 遍历键
    """
  
    @abstractmethod
    def get(self, key: str) -> V | None:
        """获取单个值。"""
        pass
  
    @abstractmethod
    def set(self, key: str, value: V) -> None:
        """设置单个值。"""
        pass
  
    @abstractmethod
    def delete(self, key: str) -> None:
        """删除单个值。"""
        pass
  
    def mget(self, keys: Sequence[str]) -> list[V | None]:
        """批量获取值。"""
        return [self.get(key) for key in keys]
  
    def mset(self, key_value_pairs: Sequence[tuple[str, V]]) -> None:
        """批量设置值。"""
        for key, value in key_value_pairs:
            self.set(key, value)
  
    def yield_keys(self, prefix: str | None = None) -> Iterator[str]:
        """遍历所有键。"""
        raise NotImplementedError

2.2 内置存储实现

LangChain 提供了多种开箱即用的实现:

存储类型 类名 适用场景
内存存储 InMemoryStore 开发测试
文件存储 FileStore 本地持久化
Redis 存储 RedisStore 分布式部署
SQLite 存储 SQLiteStore 轻量级持久化
Elasticsearch ElasticsearchStore 企业级搜索

使用示例

from langchain_core.stores import InMemoryStore
from langchain_community.storage import RedisStore, SQLiteStore

# 内存存储
memory_store = InMemoryStore()
memory_store.set("user:1", {"name": "Alice", "preferences": {"theme": "dark"}})

# Redis 存储
redis_store = RedisStore(redis_url="redis://localhost:6379", ttl=3600)
redis_store.set("session:abc123", {"user_id": "user:1", "created_at": "2026-02-12"})

# SQLite 存储
sqlite_store = SQLiteStore(connection_string="sqlite:///memories.db")
sqlite_store.set("memory:1", "用户喜欢使用 Python 编程")

# 批量操作
sqlite_store.mset([
    ("memory:2", "用户正在学习 LangChain"),
    ("memory:3", "用户偏好 VS Code 编辑器"),
])

# 批量获取
memories = sqlite_store.mget(["memory:1", "memory:2", "memory:3"])

2.3 ChatMessageHistory - 消息历史存储

专门用于存储对话历史的接口:

from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage

class BaseChatMessageHistory(ABC):
    """
    聊天消息历史抽象基类。
  
    核心方法:
    - messages: 获取所有消息
    - add_message: 添加消息
    - clear: 清空历史
    """
  
    @property
    @abstractmethod
    def messages(self) -> list[BaseMessage]:
        """获取所有消息。"""
        pass
  
    @abstractmethod
    def add_message(self, message: BaseMessage) -> None:
        """添加消息到历史。"""
        pass
  
    def clear(self) -> None:
        """清空消息历史。"""
        pass
  
    # 便捷方法
    def add_user_message(self, message: str) -> None:
        """添加用户消息。"""
        self.add_message(HumanMessage(content=message))
  
    def add_ai_message(self, message: str) -> None:
        """添加 AI 消息。"""
        self.add_message(AIMessage(content=message))

自定义文件存储实现

import json
from pathlib import Path
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, SystemMessage

class FileChatMessageHistory(BaseChatMessageHistory):
    """
    基于文件的消息历史存储。
  
    将对话历史以 JSON 格式存储在文件中。
    文件格式:
    [
        {"role": "user", "content": "你好"},
        {"role": "assistant", "content": "你好!有什么可以帮助你的?"}
    ]
    """
  
    def __init__(self, file_path: str):
        self.file_path = Path(file_path)
        self._messages: list[BaseMessage] = []
        self._load_messages()
  
    @property
    def messages(self) -> list[BaseMessage]:
        """获取所有消息。"""
        return self._messages
  
    def _load_messages(self) -> None:
        """从文件加载消息。"""
        if self.file_path.exists():
            data = json.loads(self.file_path.read_text())
            for msg in data:
                match msg["role"]:
                    case "user":
                        self._messages.append(HumanMessage(content=msg["content"]))
                    case "assistant":
                        self._messages.append(AIMessage(content=msg["content"]))
                    case "system":
                        self._messages.append(SystemMessage(content=msg["content"]))
  
    def add_message(self, message: BaseMessage) -> None:
        """添加消息到历史。"""
        self._messages.append(message)
        self._save_messages()
  
    def _save_messages(self) -> None:
        """保存消息到文件。"""
        self.file_path.parent.mkdir(parents=True, exist_ok=True)
      
        data = []
        for msg in self._messages:
            match msg:
                case HumanMessage():
                    data.append({"role": "user", "content": msg.content})
                case AIMessage():
                    data.append({"role": "assistant", "content": msg.content})
                case SystemMessage():
                    data.append({"role": "system", "content": msg.content})
      
        self.file_path.write_text(json.dumps(data, indent=2, ensure_ascii=False))
  
    def clear(self) -> None:
        """清空消息历史。"""
        self._messages = []
        if self.file_path.exists():
            self.file_path.unlink()

三、构建多层记忆系统

3.1 三层记忆架构

第三层 - 长期记忆

Long-term Memory
用户偏好/知识库

第二层 - 工作记忆

Daily Notes
每日笔记

第一层 - 会话记忆

Session History
当前对话

上下文构建

LLM

3.2 完整实现

"""
LangChain 多层记忆系统实现。

三层记忆架构:
1. 会话记忆 - 当前对话历史
2. 工作记忆 - 每日笔记
3. 长期记忆 - 用户偏好和知识库

作者: LangChain 学习者
日期: 2026-02
"""

from datetime import datetime
from pathlib import Path
from typing import Optional

from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, SystemMessage
from langchain_core.stores import BaseStore
from pydantic import BaseModel


class LongTermMemory(BaseStore[str]):
    """
    长期记忆存储 - 持久化的用户偏好和知识。
  
    存储内容:
    - 用户偏好(编程语言、编辑器等)
    - 项目信息
    - 重要事实
  
    存储格式: Markdown 文件
    """
  
    def __init__(self, workspace: Path):
        self.workspace = workspace
        self.memory_file = workspace / "MEMORY.md"
  
    def get(self, key: str) -> str | None:
        """获取长期记忆。"""
        if self.memory_file.exists():
            content = self.memory_file.read_text(encoding="utf-8")
            return content if content.strip() else None
        return None
  
    def set(self, key: str, value: str) -> None:
        """设置长期记忆。"""
        self.memory_file.parent.mkdir(parents=True, exist_ok=True)
        self.memory_file.write_text(value, encoding="utf-8")
  
    def delete(self, key: str) -> None:
        """删除长期记忆。"""
        if self.memory_file.exists():
            self.memory_file.unlink()
  
    def append(self, content: str) -> None:
        """追加内容到长期记忆。"""
        existing = self.get("main") or ""
        self.set("main", existing + "\n\n" + content)
  
    def as_system_message(self) -> Optional[SystemMessage]:
        """转换为系统消息格式。"""
        content = self.get("main")
        if content and content.strip():
            return SystemMessage(
                content=f"<long_term_memory>\n{content}\n</long_term_memory>"
            )
        return None


class DailyNotes(BaseStore[str]):
    """
    每日笔记存储 - 当天的工作记录。
  
    存储内容:
    - 今日任务
    - 会议记录
    - 临时想法
  
    存储格式: YYYY-MM-DD.md
    """
  
    def __init__(self, workspace: Path):
        self.workspace = workspace
  
    def _get_today_file(self) -> Path:
        """获取今日笔记文件路径。"""
        today = datetime.now().strftime("%Y-%m-%d")
        return self.workspace / f"{today}.md"
  
    def get(self, key: str) -> str | None:
        """获取今日笔记。"""
        file = self._get_today_file()
        if file.exists():
            content = file.read_text(encoding="utf-8")
            return content if content.strip() else None
        return None
  
    def set(self, key: str, value: str) -> None:
        """设置今日笔记。"""
        file = self._get_today_file()
        file.parent.mkdir(parents=True, exist_ok=True)
        file.write_text(value, encoding="utf-8")
  
    def delete(self, key: str) -> None:
        """删除今日笔记。"""
        file = self._get_today_file()
        if file.exists():
            file.unlink()
  
    def append(self, content: str) -> None:
        """追加内容到今日笔记。"""
        timestamp = datetime.now().strftime("%H:%M")
        existing = self.get("main") or ""
        new_content = f"{existing}\n- [{timestamp}] {content}"
        self.set("main", new_content.strip())
  
    def as_system_message(self) -> Optional[SystemMessage]:
        """转换为系统消息格式。"""
        content = self.get("main")
        if content and content.strip():
            today = datetime.now().strftime("%Y-%m-%d")
            return SystemMessage(
                content=f"<daily_notes date=\"{today}\">\n{content}\n</daily_notes>"
            )
        return None


class SessionHistory(BaseChatMessageHistory, BaseModel):
    """
    会话历史存储 - 当前对话的消息记录。
  
    存储格式: JSON 文件
    """
  
    session_id: str
    workspace: Path
    _messages: list[BaseMessage] = []
  
    class Config:
        arbitrary_types_allowed = True
  
    def __init__(self, **data):
        super().__init__(**data)
        self._messages = []
        self._load_messages()
  
    @property
    def _file_path(self) -> Path:
        safe_id = self.session_id.replace(":", "_").replace("/", "_")
        return self.workspace / "sessions" / f"{safe_id}.json"
  
    @property
    def messages(self) -> list[BaseMessage]:
        return self._messages
  
    def _load_messages(self) -> None:
        """从文件加载消息。"""
        import json
      
        if self._file_path.exists():
            data = json.loads(self._file_path.read_text())
            for msg in data:
                match msg["role"]:
                    case "user":
                        self._messages.append(HumanMessage(content=msg["content"]))
                    case "assistant":
                        self._messages.append(AIMessage(content=msg["content"]))
                    case "system":
                        self._messages.append(SystemMessage(content=msg["content"]))
  
    def add_message(self, message: BaseMessage) -> None:
        """添加消息。"""
        self._messages.append(message)
        self._save_messages()
  
    def _save_messages(self) -> None:
        """保存消息到文件。"""
        import json
      
        self._file_path.parent.mkdir(parents=True, exist_ok=True)
      
        data = []
        for msg in self._messages:
            match msg:
                case HumanMessage():
                    data.append({"role": "user", "content": msg.content})
                case AIMessage():
                    data.append({"role": "assistant", "content": msg.content})
                case SystemMessage():
                    data.append({"role": "system", "content": msg.content})
      
        self._file_path.write_text(json.dumps(data, indent=2, ensure_ascii=False))
  
    def clear(self) -> None:
        """清空历史。"""
        self._messages = []
        if self._file_path.exists():
            self._file_path.unlink()


class MemoryManager:
    """
    统一记忆管理器 - 整合三层记忆。
  
    使用示例:
        >>> manager = MemoryManager(Path("./workspace"))
        >>> messages = manager.build_context("session:123")
        >>> response = await llm.ainvoke(messages)
    """
  
    def __init__(self, workspace: Path):
        self.workspace = workspace
        self.long_term_memory = LongTermMemory(workspace)
        self.daily_notes = DailyNotes(workspace)
        self._sessions: dict[str, SessionHistory] = {}
  
    def get_session_history(self, session_id: str) -> SessionHistory:
        """获取或创建会话历史。"""
        if session_id not in self._sessions:
            self._sessions[session_id] = SessionHistory(
                session_id=session_id,
                workspace=self.workspace
            )
        return self._sessions[session_id]
  
    def build_context(
        self,
        session_id: str,
        system_prompt: str | None = None
    ) -> list[BaseMessage]:
        """
        构建完整的上下文消息列表。
      
        组装顺序:
        1. 系统提示
        2. 长期记忆
        3. 每日笔记
        4. 会话历史
      
        Args:
            session_id: 会话标识符
            system_prompt: 自定义系统提示
      
        Returns:
            消息列表
        """
        messages: list[BaseMessage] = []
      
        # 1. 系统提示
        if system_prompt:
            messages.append(SystemMessage(content=system_prompt))
        else:
            messages.append(SystemMessage(content=self._default_system_prompt()))
      
        # 2. 长期记忆
        ltm_msg = self.long_term_memory.as_system_message()
        if ltm_msg:
            messages.append(ltm_msg)
      
        # 3. 每日笔记
        notes_msg = self.daily_notes.as_system_message()
        if notes_msg:
            messages.append(notes_msg)
      
        # 4. 会话历史
        session = self.get_session_history(session_id)
        messages.extend(session.messages)
      
        return messages
  
    def _default_system_prompt(self) -> str:
        """默认系统提示。"""
        now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        return f"""你是一个智能 AI 助手。

当前时间: {now}
工作空间: {self.workspace}

你可以使用工具来帮助用户完成任务。"""
  
    def save_memory(self, content: str) -> None:
        """保存到长期记忆。"""
        self.long_term_memory.append(content)
  
    def save_note(self, content: str) -> None:
        """保存到每日笔记。"""
        self.daily_notes.append(content)

3.3 使用示例

import asyncio
from pathlib import Path
from langchain_openai import ChatOpenAI

async def main():
    # 初始化记忆管理器
    workspace = Path.home() / ".ai_assistant" / "workspace"
    memory = MemoryManager(workspace)
  
    # 初始化 LLM
    llm = ChatOpenAI(model="gpt-4o", temperature=0.7)
  
    # 会话 ID
    session_id = "telegram:user_12345"
  
    # 用户消息
    user_msg = "我喜欢用 Python 编程,正在学习 LangChain"
  
    # 添加到会话历史
    session = memory.get_session_history(session_id)
    session.add_user_message(user_msg)
  
    # 构建上下文
    messages = memory.build_context(session_id)
  
    # 调用 LLM
    response = await llm.ainvoke(messages)
  
    # 保存 AI 响应
    session.add_ai_message(response.content)
  
    # 保存重要信息到长期记忆
    memory.save_memory("用户偏好: 喜欢用 Python 编程")
    memory.save_memory("学习目标: LangChain")
  
    # 保存到每日笔记
    memory.save_note("讨论了编程语言偏好和学习计划")
  
    print(response.content)

if __name__ == "__main__":
    asyncio.run(main())

四、与 LangGraph 集成

4.1 在 StateGraph 中使用记忆

from langgraph.graph import StateGraph, MessagesState
from langgraph.checkpoint.memory import MemorySaver
from langchain_openai import ChatOpenAI

class AgentState(MessagesState):
    """代理状态。"""
    session_id: str

def build_agent_with_memory(
    llm: ChatOpenAI,
    memory_manager: MemoryManager
):
    """构建带记忆的代理图。"""
  
    async def call_model(state: AgentState) -> dict:
        """调用 LLM 节点。"""
        session_id = state.get("session_id", "default")
      
        # 构建完整上下文
        messages = memory_manager.build_context(session_id)
        messages.extend(state["messages"])
      
        # 调用 LLM
        response = await llm.ainvoke(messages)
      
        return {"messages": [response]}
  
    # 构建图
    graph = StateGraph(AgentState)
    graph.add_node("call_model", call_model)
    graph.set_entry_point("call_model")
  
    # 编译(带 checkpoint 持久化)
    return graph.compile(checkpointer=MemorySaver())

4.2 使用 LangGraph 内置 Store

from langgraph.store.memory import InMemoryStore

# 创建存储
store = InMemoryStore()

# 在节点中访问
def node_with_memory(state, config, *, store):
    # 读取记忆
    memory = store.get(("memories", state["user_id"]))
  
    # 更新记忆
    store.set(
        ("memories", state["user_id"]),
        {"last_interaction": datetime.now().isoformat()}
    )
  
    return state

五、高级功能

5.1 语义搜索(向量存储)

from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings

class SemanticMemory:
    """
    语义记忆 - 使用向量数据库存储和检索记忆。
  
    支持语义搜索,可以找到相关的历史记忆。
    """
  
    def __init__(self, persist_directory: Path):
        self.embeddings = OpenAIEmbeddings()
        self.vectorstore = Chroma(
            embedding_function=self.embeddings,
            persist_directory=str(persist_directory)
        )
  
    def add_memory(self, content: str, metadata: dict = None) -> None:
        """添加记忆到向量存储。"""
        self.vectorstore.add_texts(
            texts=[content],
            metadatas=[metadata or {"timestamp": datetime.now().isoformat()}]
        )
  
    def search(self, query: str, k: int = 5) -> list[tuple[str, float]]:
        """语义搜索相关记忆。"""
        results = self.vectorstore.similarity_search_with_score(query, k=k)
        return [(doc.page_content, score) for doc, score in results]
  
    def as_context(self, query: str, k: int = 5) -> str:
        """获取与查询相关的记忆上下文。"""
        results = self.search(query, k)
        if not results:
            return ""
      
        context = "\n".join([f"- {content}" for content, _ in results])
        return f"<relevant_memories>\n{context}\n</relevant_memories>"

5.2 记忆压缩

from langchain.chains.summarize import load_summarize_chain
from langchain_core.documents import Document

class MemoryCompressor:
    """
    记忆压缩器 - 使用 LLM 压缩历史记忆。
  
    当记忆过长时,自动生成摘要以节省 Token。
    """
  
    def __init__(self, llm):
        self.llm = llm
        self.chain = load_summarize_chain(llm, chain_type="map_reduce")
  
    async def compress(self, messages: list[BaseMessage]) -> str:
        """压缩消息历史为摘要。"""
        docs = [Document(page_content=m.content) for m in messages]
        result = await self.chain.arun(docs)
        return result
  
    async def compress_if_needed(
        self,
        messages: list[BaseMessage],
        max_tokens: int = 4000
    ) -> list[BaseMessage]:
        """如果超过 Token 限制,则压缩历史。"""
        # 估算 Token 数
        total_chars = sum(len(m.content) for m in messages)
        estimated_tokens = total_chars // 4  # 粗略估算
      
        if estimated_tokens <= max_tokens:
            return messages
      
        # 压缩旧消息
        recent_messages = messages[-10:]  # 保留最近 10 条
        old_messages = messages[:-10]
      
        if old_messages:
            summary = await self.compress(old_messages)
            return [
                SystemMessage(content=f"<conversation_summary>\n{summary}\n</conversation_summary>")
            ] + recent_messages
      
        return recent_messages

六、最佳实践

6.1 记忆分层策略

记忆类型 存储位置 保留时间 示例内容
会话记忆 内存/文件 会话期间 当前对话
工作记忆 文件 1-7 天 今日任务
长期记忆 文件/数据库 永久 用户偏好
语义记忆 向量数据库 永久 知识库

6.2 Token 管理策略

from langchain_core.messages import trim_messages

def manage_context(messages: list[BaseMessage], max_tokens: int = 4000):
    """管理上下文 Token 数量。"""
    return trim_messages(
        messages,
        max_tokens=max_tokens,
        strategy="last",  # 保留最后的消息
        token_counter=len,  # 简单计数器
        include_system=True,  # 保留系统消息
    )

6.3 存储后端选择

场景 推荐存储 原因
开发测试 InMemoryStore 简单快速
单机部署 SQLiteStore 轻量级持久化
分布式部署 RedisStore 高性能共享
企业级应用 ElasticsearchStore 可扩展搜索

七、总结

LangChain 的记忆系统提供了完整的解决方案:

组件 用途 适用场景
BaseStore 通用键值存储 用户偏好、配置
ChatMessageHistory 对话历史存储 会话记忆
VectorStore 向量存储 语义搜索
MemoryManager 统一管理 生产应用

通过合理组合这些组件,可以构建出功能强大的 AI 记忆系统,让代理具备持续学习和个性化服务的能力。


参考资料

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐