一、中间件介绍与存在价值

1.1 什么是中间件

中间件是在 Agent 执行流程的关键节点插入自定义逻辑的机制。

核心思想:切面编程(AOP)

  • 不修改 Agent 核心代码
  • 在特定钩子点注入业务逻辑
  • 多个中间件可链式组合

中间件提供类似 Web 服务器中间件(middleware)的模式:在流程的关键节点之前/之后插入逻辑,从而实现 “监控/修改/控制/强制执行” 四类能力。

执行流程:
用户请求 → [中间件层] → Agent 核心 → [中间件层] → 返回结果

1.2 中间件存在的价值

痛点场景 中间件解决方案
安全合规 PII 脱敏、敏感操作人工审批、命令白名单
成本控制 Token 统计、动态模型选择、调用频率限制
可观测性 请求日志、性能监控、调用链路追踪
上下文管理 长对话自动摘要、历史消息压缩
性能优化 响应缓存、并发控制、超时处理
错误处理 自动重试、降级策略、异常捕获

早期的 Agent 抽象(包括 LangChain 之前版本或其他代理框架)虽然能快速搭建 “模型 → 工具 →模型…” 的循环,但在 复杂场景(生产环境)下,开发者往往需要修改很多环节:比如动态地修改提示词、切换模型、控制工具调用、限制调用频次、摘要会话、监控日志、人工干预等。

核心价值:

  1. 解耦:业务逻辑与核心 Agent 代码分离
  2. 复用:中间件可在多个 Agent 间共享
  3. 组合:多个中间件链式组合,构建完整防护体系
  4. 标准化:统一的钩子接口,降低学习成本

二、6 个核心钩子方法对比总览

钩子方法 触发时机 返回值 主要用途
before_agent Agent 启动前 None / State 初始化、输入校验
before_model 模型调用前(每次) None / Request 修改 Prompt、上下文注入
wrap_model_call 包装整个模型调用 Callable 缓存、重试、流式控制
wrap_tool_call 包装每个工具调用 Callable 权限控制、参数校验、审计
after_model 模型调用后(每次) None / Response 输出过滤、内容脱敏
after_agent Agent 完全结束后 None / State 资源清理、性能统计

执行顺序:
before_agent

[ before_model → wrap_model_call → after_model ] ←(可能循环多次)

(如需调用工具)→ wrap_tool_call

after_agent


三、6 个钩子方法详解与代码示例


三、6 个钩子方法详解与代码示例

3.1 before_agent - Agent 启动前钩子

触发时机:Agent 收到请求后,执行任何逻辑之前(整个流程入口)
方法签名def before_agent(self, state: dict, runtime)
返回值None 或修改后的 state
典型用途:输入校验、权限检查、状态初始化

# -*- coding: utf-8 -*-
"""
# -*- coding: utf-8 -*-
"""
before_agent_middle
Author: user
Date: 2026/3/17
Description: 
"""
from langchain.agents.middleware import before_agent
from typing import Dict, Any, Optional

from langchain.agents import create_agent
from langgraph.checkpoint.memory import InMemorySaver
from langchain_core.tools import tool
from langchain_community.callbacks import get_openai_callback
from langchain.chat_models import init_chat_model
from langchain.agents.middleware import AgentMiddleware, ModelRequest
from langchain.agents.middleware import wrap_model_call, ModelRequest, ModelResponse
from langchain.messages import HumanMessage
from dotenv import load_dotenv
load_dotenv(override=True)
from typing import Optional, Callable
import asyncio
import os


@before_agent
def validate_input(state: Dict[str, Any], runtime) -> Optional[Dict[str, Any]]:
    """
    校验输入并初始化上下文

    Args:
        state: Agent 状态字典,必须包含 "messages" 字段
        runtime: 运行时对象,包含 context、user_id 等属性

    Returns:
        修改后的 state 或 None(表示不修改)
    """
    print("enter before_agent")
    print(state,runtime)
    messages = state.get("messages", [])
    if not messages:
        raise ValueError("无效的用户输入")
    elif not (isinstance(messages[-1],HumanMessage) or messages[-1].get("role",'') == "user"):
        raise ValueError("无效的用户输入")
    else:
        print("before_agent 检查成功!")

    # 初始化计数器
    if runtime.context==None:
        runtime.context = {}
    runtime.context["model_calls"] = 0
    runtime.context["start_time"] = __import__("time").time()

    print(f"[before_agent] 用户 {getattr(runtime, 'user_id', 'anonymous')} 启动请求")
    return state

model = init_chat_model(model="deepseek-v3.1", #
                        model_provider='openai',
                        api_key= os.getenv("api_key"),
                        base_url= os.getenv("base_url"),
                        temperature=0.3,
                        max_retries=4,
                        #max_tokens=10
                        )


# 2. 定义工具
@tool
def get_weather(city: str) -> str:
    """获取指定城市的天气"""
    return f"{city}:晴,25度,微风徐徐"

@tool
def get_location() -> str:
    """获取用户位置"""
    return "北京"

# 4. 添加记忆
checkpointer = InMemorySaver()


agent = create_agent(
    model=model,  # 默认模型(会被中间件覆盖)
    tools=[get_weather,get_location],
    middleware=[validate_input],
)

config = {"configurable": {"thread_id": "user-001"}}
for event in agent.stream(
        {"messages": [{"role": "user", "content": "我在哪里?天气如何?"}]},
        context={"user_role": "admin", "session_id": "sess_123"},
        config=config,
        stream_mode="values"
):
    event['messages'][-1].pretty_print()

在这里插入图片描述

3.2 before_model - 模型调用前钩子

触发时机:每次调用 LLM 前(可能多次)
方法签名:def before_model(self, request: ModelRequest)
返回值:None 或修改后的 ModelRequest
典型用途:动态 Prompt、模型路由、上下文增强

# -*- coding: utf-8 -*-
"""
before_model_middle_3
Author: user
Date: 2026/3/17
Description: 
"""
from langchain.agents.middleware import before_model
from langchain.agents.middleware.types import ModelRequest, AgentState
from typing import Optional
from langchain.chat_models import init_chat_model
from langchain.tools import tool
from langgraph.checkpoint.memory import InMemorySaver
from langchain.agents import create_agent
import os
from dotenv import load_dotenv
from langgraph.runtime import Runtime
from langchain.messages import SystemMessage,HumanMessage

load_dotenv(override=True)

model = init_chat_model(model="deepseek-v3.1", #
                        model_provider='openai',
                        api_key= os.getenv("api_key"),
                        base_url= os.getenv("base_url"),
                        temperature=0.3,
                        max_retries=4,
                        #max_tokens=10
                        )

@before_model
def inject_system_prompt(state: AgentState, runtime: Runtime) -> Optional[AgentState]:
    """
    注入系统提示和时间上下文

    Args:
        request: 模型请求对象,包含 messages、model、state、runtime 等属性

    Returns:
        修改后的 request 或 None
    """
    print('enter before_model---------------------------')
    #print(f"state = {state}")
    messages = state['messages']

    print(messages)
    # 确保系统消息存在
    if not isinstance(messages[0],SystemMessage):
        print("注入系统消息~")
        #{"role": "system", "content": "你是一个专业助手,叫小芳,回答用户问题的时候告诉他你是谁。"}
        messages.insert(0, SystemMessage(content="你是一个专业助手,叫小芳,回答用户问题的时候告诉他你是谁。"))

    if isinstance(messages[-1],HumanMessage):
        # 添加当前时间
        import time
        #print("new messages:",messages)
        user_message = messages[-1].model_dump()
        user_message["content"] += f"\n\n[当前时间: {time.strftime('%Y-%m-%d %H:%M:%S')}]"
        new_user_message = HumanMessage(**user_message)  # ✅ 重新构造
        #print(f"new_user_message = {new_user_message}")
        messages[-1] = new_user_message
        #print(messages)
        state['messages'] = messages
        print("[before_model] 已注入系统提示")
    return state

# 1. 系统提示词
system_prompt = """你是一位幽默的天气预报员。
根据天气给出穿衣建议,用轻松的方式表达。"""

@tool
def get_weather(city: str) -> str:
    """获取指定城市的天气"""
    return f"{city}:晴,25度,微风徐徐"

@tool
def get_location() -> str:
    """获取用户位置"""
    return "北京"

# 4. 添加记忆
checkpointer = InMemorySaver()

agent = create_agent(
    model=model,  # 默认模型(会被中间件覆盖)
    tools=[get_weather,get_location],
    middleware=[inject_system_prompt],
)


config = {"configurable": {"thread_id": "user-001"}}
for event in agent.stream(
        {"messages": [{"role": "user", "content": "我在哪里?天气如何?分析一下"}]},
        config=config,
        stream_mode="values"
):
    event['messages'][-1].pretty_print()


四 对比总结

1. 核心区别对比

特性 before_agent before_model
触发时机 Agent 开始执行任务前(agent.run() 前) 调用 LLM 模型前(model.predict() 前)
作用对象 控制整个 Agent 的执行流程 仅干预模型输入/输出
可修改的内容 Agent 输入、工具、记忆等配置 模型的 Prompt、参数、上下文
常见用途 动态加载工具、权限校验、输入预处理 Prompt 增强、敏感内容过滤、缓存优化

  • 用 before_agent 当需要:
    • 影响 Agent 的整体行为(如工具、记忆、流程控制)
    • 在早期阶段拦截或修改输入(如权限校验)
  • 用 before_model 当需要:
    • 精细控制模型输入(如 Prompt 工程)
    • 干预单次调用的上下文或参数

2 典型流程

before_agent → Agent 逻辑 → before_model → 模型调用 → 输出结果

避免冲突:

  • 不要在 before_model 中修改 Agent 级别的配置
  • 确保钩子逻辑轻量,避免性能瓶颈
  • 调试技巧:
    • 通过打印日志确认钩子触发顺序
    • 使用 kwargs 传递跨阶段数据(如 user_id)
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐