上篇回顾:我们学习了如何通过 守卫机制 保障 AI 安全,通过 运行时系统 管理用户上下文与工具调用,并通过 MCP 协议 集成标准化外部服务。
本篇(下篇) 将深入三大核心能力:长期记忆的高级用法人机协同审批流程三种 RAG 架构实战,助你打造生产级智能体系统。


第四部分:长期记忆(Long-Term Memory)

1. 长期记忆是什么?

LangChain 的长期记忆基于 LangGraph Store,以 命名空间(namespace) + 键(key) 的结构存储任意 JSON 数据,支持:

  • 按用户/组织隔离数据(如 ("users", "user_001")
  • 向量检索(通过嵌入模型实现语义搜索)
  • 结构化过滤(如查找 language = "中文" 的用户)

💡 提示:InMemoryStore 仅用于测试,生产环境应使用 PostgresStoreRedisStore 等持久化后端。


2. 基础用法:读写用户配置

from dataclasses import dataclass
from typing_extensions import TypedDict
from langchain.tools import tool, ToolRuntime
from langchain.agents import create_agent
from langgraph.store.memory import InMemoryStore

# 定义上下文结构
@dataclass
class UserContext:
    user_id: str

# 定义要保存的用户信息结构(供 LLM 理解)
class UserPreference(TypedDict):
    name: str
    language: str
    style: str  # 如 "简洁"、"详细"

# 创建内存存储
memory_store = InMemoryStore()

# 工具 1:保存用户偏好
@tool
def save_user_preference(preference: UserPreference, runtime: ToolRuntime[UserContext]) -> str:
    """将用户的语言偏好和回复风格保存到长期记忆中。"""
    user_id = runtime.context.user_id
    store = runtime.store
    
    # 写入:命名空间 ("users",),键为 用户ID
    store.put(("users",), user_id, preference)
    return "已成功保存您的偏好设置!"

# 工具 2:读取用户偏好
@tool
def get_user_preference(runtime: ToolRuntime[UserContext]) -> str:
    """从长期记忆中读取用户的语言和风格偏好。"""
    user_id = runtime.context.user_id
    store = runtime.store
    
    memory_item = store.get(("users",), user_id)
    if memory_item:
        return f"用户偏好: {memory_item.value}"
    else:
        return "尚未设置偏好,请先告诉我您的姓名、语言和回复风格。"

# 创建智能体
agent = create_agent(
    model="claude-sonnet-4-5-20250929",
    tools=[save_user_preference, get_user_preference],
    store=memory_store,          # 传递存储
    context_schema=UserContext   # 声明上下文类型
)

# 第一次:用户设置偏好
agent.invoke(
    {"messages": [{"role": "user", "content": "我叫李明,我喜欢中文,回复要简洁。"}]},
    context=UserContext(user_id="user_001")
)

# 第二次:读取偏好
result = agent.invoke(
    {"messages": [{"role": "user", "content": "我的偏好是什么?"}]},
    context=UserContext(user_id="user_001")
)
print(result["messages"][-1].content)
# 输出:用户偏好:{'姓名': '李明', '语言': '中文', '风格': '简洁'}
2.1 代码示例 (以硅基流动API为例子)
import asyncio

from langchain.agents import create_agent
from langchain.chat_models import init_chat_model

from dataclasses import dataclass
from typing_extensions import TypedDict
from langchain.tools import tool, ToolRuntime
from langgraph.store.memory import InMemoryStore  


# 定义上下文结构
@dataclass
class UserContext:
    user_id: str

# 定义要保存的用户信息结构(供 LLM 理解)
class UserPreference(TypedDict):
    name: str
    language: str
    style: str  # 如 "简洁"、"详细"

# 创建内存存储
memory_store = InMemoryStore()

# 工具 1:保存用户偏好
@tool
def save_user_preference(preference: UserPreference, runtime: ToolRuntime[UserContext]) -> str:
    """将用户的语言偏好和回复风格保存到长期记忆中。"""
    user_id = runtime.context.user_id
    store = runtime.store
    
    # 写入:命名空间 ("users",),键为 用户ID
    store.put(namespace = ("users",), key = user_id, value = preference)
    return "已成功保存您的偏好设置!"

# 工具 2:读取用户偏好
@tool
def get_user_preference(runtime: ToolRuntime[UserContext]) -> str:
    """从长期记忆中读取用户的语言和风格偏好。"""
    user_id = runtime.context.user_id
    store = runtime.store
    
    memory_item = store.get(("users",), user_id)
    if memory_item:
        return f"用户偏好: {memory_item.value}"
    else:
        return "尚未设置偏好,请先告诉我您的姓名、语言和回复风格。"


if __name__ == "__main__":
    open_model = init_chat_model(
        model="deepseek-ai/DeepSeek-R1-Distill-Qwen-32B",
        model_provider="openai",
        api_key="sk-*********************************",
        base_url="https://api.siliconflow.cn/v1/",
    )
    
    agent = create_agent(
        model=open_model,
        tools=[save_user_preference, get_user_preference],
        store=memory_store,          # 传递存储
        context_schema=UserContext   # 声明上下文类型
    )
    
    result =agent.invoke(
        {"messages": [{"role": "user", "content": "我叫李明,我喜欢中文,回复要简洁。"}]},
        context=UserContext(user_id="user_001")
    )
    print(result["messages"][-1].content)

    # 第二次:读取偏好
    result = agent.invoke(
        {"messages": [{"role": "user", "content": "我的偏好是什么?"}]},
        context=UserContext(user_id="user_001")
    )
    print(result["messages"][-1].content)


3. 高级用法:语义搜索记忆

from langchain_community.embeddings import HuggingFaceEmbeddings

# 初始化嵌入模型(用于向量检索)
embedding_model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")

# 创建支持向量检索的存储
memory_store = InMemoryStore(
    index={
        "embed": lambda texts: embedding_model.embed_documents(texts),  # 嵌入函数
        "dims": 384  # all-MiniLM-L6-v2 的维度
    }
)

# 写入多条记忆(模拟历史对话)
memory_store.put(("chitchat", "user_001"), "1-爱好", {"内容": "用户喜欢编程和爬山。"})
memory_store.put(("chitchat", "user_001"), "2-工作", {"内容": "用户是数据工程师,使用 Python 和 SQL。"})
memory_store.put(("chitchat", "user_001"), "3-目标", {"内容": "用户想学习 LangChain 构建智能体。"})

# 语义搜索:查找与“兴趣”相关的记忆
search_results = memory_store.search(
    namespace=("chitchat", "user_001"),
    query="用户的兴趣爱好是什么?",  # 用自然语言查询
    limit=2  # 返回最相关的 2 条
)

for item in search_results:
    print(f"Key: {item.key}, 内容: {item.value['内容']}")
# 输出可能包含:
# Key: 1-爱好, 内容: 用户喜欢编程和爬山。
# Key: 3-目标, 内容: 用户想学习 LangChain 构建智能体。

应用场景:在聊天机器人中自动召回用户历史兴趣,实现个性化回复。

3.1 代码示例 (以硅基流动API为例子)
import os

from langgraph.store.memory import InMemoryStore
from langchain.embeddings import init_embeddings

os.environ["OPENAI_API_KEY"] = "sk-*********************************"
os.environ["OPENAI_API_BASE"] = "https://api.siliconflow.cn/v1/"

# 初始化嵌入模型(用于向量检索)
embedding_model = init_embeddings(
    model="BAAI/bge-m3",
    provider="openai"
)

# 创建支持向量检索的存储
memory_store = InMemoryStore(
    index={
        "embed": lambda texts: embedding_model.embed_documents(texts),  # 嵌入函数
        "dims": 1024  # all-MiniLM-L6-v2 的维度
    }
)

# 写入多条记忆(模拟历史对话)
memory_store.put(("chitchat", "user_001"), "1-爱好", {"内容": "用户喜欢编程和爬山。"})
memory_store.put(("chitchat", "user_001"), "2-工作", {"内容": "用户是数据工程师,使用 Python 和 SQL。"})
memory_store.put(("chitchat", "user_001"), "3-目标", {"内容": "用户想学习 LangChain 构建智能体。"})

# 语义搜索:查找与“兴趣”相关的记忆
search_results = memory_store.search(
    ("chitchat", "user_001"),
    query="用户的职业是什么?",  # 用自然语言查询
    limit=2  # 返回最相关的 2 条
)

for item in search_results:
    print(f"Key: {item.key}, 内容: {item.value['内容']}")

第五部分:人机协同(Human-in-the-Loop)

1. 什么场景需要人工审批?

  • 高风险操作:转账、删除数据、发送邮件
  • 合规要求:医疗建议、法律文书
  • 质量控制:内容审核、敏感决策

LangChain 的 HumanInTheLoopMiddleware 可在工具调用前暂停执行,等待人工决策。


2. 基础审批流程:允许/拒绝/编辑

from langchain.agents import create_agent
from langchain.agents.middleware import HumanInTheLoopMiddleware
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import Command

# 假设有两个工具
from langchain.tools import tool

@tool
def send_email(recipient: str, subject: str, body: str) -> str:
    return f"邮件已发送至 {recipient}"

@tool
def delete_database_record(table: str, condition: str) -> str:
    return f"已从 {table} 中删除满足 {condition} 的记录"

# 创建带人机协同的智能体
agent = create_agent(
    model="openai:gpt-4o",
    tools=[send_email, delete_database_record],
    middleware=[
        HumanInTheLoopMiddleware(
            interrupt_on={
                # 发送邮件:允许批准、编辑、拒绝
                "send_email": True,
                # 删除操作:只允许批准或拒绝,禁止编辑(避免误改条件)
                "delete_database_record": {"allowed_decisions": ["approve", "reject"]},
                # 安全工具无需审批(如查询)
                # "query_user_info": False
            },
            description_prefix="⚠️ 敏感操作待审批"
        )
    ],
    checkpointer=InMemorySaver()  # 必须提供检查点以支持暂停/恢复
)

# 配置线程 ID(用于关联同一会话)
config = {"configurable": {"thread_id": "session_123"}}

# 第一步:触发审批
result = agent.invoke(
    {"messages": [{"role": "user", "content": "给 alice@example.com 发一封主题为‘会议提醒’的邮件,内容是‘明天上午10点开会’"}]},
    config=config
)

# 检查是否中断
if "__interrupt__" in result:
    interrupt_info = result["__interrupt__"][0].value
    print("等待人工审批的操作:")
    for request in interrupt_info["action_requests"]:
        print(f"- 工具: {request['name']}, 参数: {request['arguments']}")

# 第二步:人工批准(模拟)
approval_result = agent.invoke(
    Command(resume={"decisions": [{"type": "approve"}]}),
    config=config  # 必须使用相同 thread_id
)

print("最终结果:", approval_result["messages"][-1].content)
# 输出:邮件已发送至 alice@example.com

3. 拒绝并提供反馈

# 拒绝邮件发送,并解释原因
rejection_result = agent.invoke(
    Command(
        resume={
            "decisions": [{
                "type": "reject",
                "message": "收件人域名不合法,请使用公司邮箱(@company.com)"
            }]
        }
    ),
    config=config
)

# 智能体会收到拒绝消息,并可能重新生成
print("AI 收到反馈后:", rejection_result["messages"][-1].content)
# 输出可能为:"抱歉,我将使用您的公司邮箱重新发送。"

最佳实践:拒绝时务必提供清晰、可操作的反馈,帮助 AI 改进下一次尝试。


第六部分:检索增强生成(RAG)三种架构

1. 2 步 RAG:简单高效(适合 FAQ、文档问答)

from langchain_community.document_loaders import TextLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
from langchain.chat_models import init_chat_model

# 步骤 1:构建知识库
loader = TextLoader("公司政策.txt", encoding="utf-8")
documents = loader.load()
splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
chunks = splitter.split_documents(documents)

# 步骤 2:创建向量库
embedding = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
vectorstore = FAISS.from_documents(chunks, embedding)
retriever = vectorstore.as_retriever()

# 步骤 3:构建 2 步 RAG 链
template = """根据以下上下文回答问题:
{context}

问题:{question}
"""
prompt = ChatPromptTemplate.from_template(template)
model = init_chat_model("openai:gpt-4o")

rag_chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | prompt
    | model
    | StrOutputParser()
)

# 使用
answer = rag_chain.invoke("公司年假有多少天?")
print(answer)

优点:一次检索 + 一次生成,延迟低,逻辑清晰。
缺点:无法动态决定是否需要检索,或多次检索。

1.1 代码示例 (以硅基流动API为例子)
import os

from langchain.chat_models import init_chat_model
from langchain.embeddings import init_embeddings

from langchain_community.vectorstores import FAISS
from langchain_community.document_loaders import TextLoader

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser

from langchain_text_splitters import RecursiveCharacterTextSplitter


os.environ["OPENAI_API_KEY"] = "sk-******************************************"
os.environ["OPENAI_API_BASE"] = "https://api.siliconflow.cn/v1/"

# 初始化嵌入模型(用于向量检索)
embedding_model = init_embeddings(model="BAAI/bge-m3",provider="openai")

open_model = init_chat_model(
        model="deepseek-ai/DeepSeek-R1-Distill-Qwen-32B",
        model_provider="openai",
        api_key="sk-******************************************",
        base_url="https://api.siliconflow.cn/v1/",
    )

if __name__ == "__main__":
    loader = TextLoader("三国演义(原文版).txt", encoding="utf-8")
    documents = loader.load()
    splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
    chunks = splitter.split_documents(documents)
    
    vectorstore = FAISS.from_documents(chunks, embedding_model)
    retriever = vectorstore.as_retriever()
    
    template = """根据以下上下文回答问题:
    {context}

    问题:{question}
    """
    prompt = ChatPromptTemplate.from_template(template)

    rag_chain = (
        {"context": retriever, "question": RunnablePassthrough()}
        | prompt
        | open_model
        | StrOutputParser()
    )

    # 使用
    answer = rag_chain.invoke("桃园结义的三人是谁")
    print(answer)

2. Agentic RAG:灵活推理(适合复杂查询)

from langchain.tools import tool
from langchain.agents import create_agent

@tool
def retrieve_company_policy(keyword: str) -> str:
    """根据关键词从公司政策知识库中检索相关内容。"""
    results = vector_store.similarity_search(keyword, k=2)
    return "\n".join([doc.page_content for doc in results])

# Create agent with retrieval tool
agent = create_agent(
    model="claude-sonnet-4-5-20250929",
    tools=[retrieve_company_policy],
    system_prompt="当用户问题涉及公司政策时,必须先使用`检索公司政策`工具获取信息,再回答。"
)

# The agent will autonomously decide whether to invoke the tool
result = agent.invoke({
    "messages": [{"role": "user", "content": "我可以请多少天病假?需要什么手续?"}]
})
print(result["messages"][-1].content)
# 智能体流程:
# 1. 判断需要外部知识 → 调用 检索公司政策("病假")
# 2. 根据检索结果生成答案

优点:LLM 自主决策,支持多轮检索、多工具组合。
缺点:可能不调用工具(需强系统提示),延迟不可控。


3. 混合 RAG:带验证的高质量系统

# 混合 RAG 核心思想:检索 → 验证 → 生成 → 再验证

```python
@tool
def query_knowledge_base(question: str) -> list[str]:
    docs = vector_store.similarity_search(question, k=3)
    return [d.page_content for d in docs]

@tool
def validate_answer_relevance(question: str, context: list[str], answer: str) -> bool:
    """用 LLM 判断答案是否基于上下文且相关。"""
    validation_model = init_chat_model("gpt-4o-mini")
    prompt = f"""
        问题:{question}
        上下文:{' '.join(context)[:1000]}
        答案:{answer}

        请判断答案是否:
        1. 基于提供的上下文
        2. 准确回答了问题

        只回答 "YES" 或 "NO"。
        """
    response = validation_model.invoke([{"role": "user", "content": prompt}])
    return "YES" in response.content

# 在智能体中组合使用
agent = create_agent(
    model="claude-sonnet-4-5",
    tools=[query_knowledge_base, validate_answer_relevance],
    system_prompt="""
        你必须:
        1. 先用 `查询知识库` 获取信息
        2. 生成初步答案
        3. 用 `验证答案相关性` 检查答案质量
        4. 如果验证失败,重新思考或告知用户信息不足
        """
)

适用场景:医疗、金融、法律等对准确性要求极高的领域。

总结

能力 关键组件 适用场景
守卫 PIIMiddleware, 自定义中间件 安全合规、内容过滤
运行时 Context, Store, StreamWriter 多租户、个性化、记忆
MCP MCP 服务器 + 客户端 工具标准化、微服务集成
长期记忆 LangGraph Store + 命名空间 用户画像、对话历史
人机协同 HITL + Checkpointer 高风险操作审批
RAG 2步 / Agentic / 混合 知识问答、动态检索
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐