📌 一句话速览

本文将深入剖析如何通过 LangGraph、MCP 协议与 ReactAgent 三者融合,构建一个支持多轮对话、工具调用、状态持久化与任务中断的生产级智能代理系统。


运行效果


智能问答

📌 智能代理系统的演进需求

随着大模型能力的不断增强,单纯的“问答机器人”已无法满足企业级应用场景。用户期望的是:

  • ✅ 多轮对话记忆 —— 能记住上下文,支持连续追问

  • ✅ 工具调用能力 —— 可连接数据库、API、业务系统

  • ✅ 状态可持久化 —— 对话中断后能恢复现场

  • ✅ 任务可中断 —— 用户可随时取消长耗时操作

  • ✅ 流式输出体验 —— 实时响应,提升交互感

传统单次 Prompt 调用模式已力不从心。我们需要一个状态驱动、工具感知、可中断、可恢复的智能代理架构 —— 这正是 LangGraph + MCP + ReactAgent 技术组合的价值所在。


技术组合全景图

技术组件

角色定位

解决的问题

LangGraph

状态图引擎

对话状态管理、流程控制、持久化

MCP协议

工具调用标准协议

统一接入外部工具,解耦工具实现

ReactAgent

推理-行动循环框架

实现“思考→调用→观察→再思考”闭环

LangChain

LLM与工具抽象层

统一模型调用、消息封装、工具集成

💡 本系统通过 create_react_agent 创建基于状态图的代理,使用 MultiServerMCPClient 动态加载工具,通过 InMemorySaver 实现对话状态持久化,最终形成一个高内聚、低耦合、易扩展的智能代理系统。


核心代码结构详解

1. 初始化与环境配置

```python
def __init__(self):
    # 校验环境变量,确保关键配置不缺失
    required_env_vars = [
        "MODEL_NAME", "MODEL_TEMPERATURE", "MODEL_BASE_URL",
        "MODEL_API_KEY", "MCP_HUB_COMMON_QA_GROUP_URL",
    ]
    for var in required_env_vars:
        if not os.getenv(var):
            raise ValueError(f"Missing required environment variable: {var}")

    # 初始化LLM客户端,支持流式、重试、超时等生产级配置
    self.llm = ChatOpenAI(...)

    # 初始化MCP客户端,支持多工具服务器动态接入
    self.client = MultiServerMCPClient({
        "mcp-hub": {
            "url": os.getenv("MCP_HUB_COMMON_QA_GROUP_URL"),
            "transport": "streamable_http",
        },
    })

    # 使用内存检查点保存对话状态(生产环境建议替换为Redis)
    self.checkpointer = InMemorySaver()
    
    # 运行中任务字典,用于支持任务取消
    self.running_tasks = {}

🛠️ 设计要点:环境变量校验前置,避免运行时崩溃;工具配置支持热插拔;状态存储可替换,便于扩展。

2. 流式响应封装

@staticmethod
def _create_response(content: str, message_type: str = "continue", data_type: str = DataTypeEnum.ANSWER.value[0]) -> str:
    """
    封装SSE格式响应,支持前端区分消息类型(继续/结束/错误/信息)
    """
    res = {
        "data": {"messageType": message_type, "content": content},
        "dataType": data_type,
    }
    return "data:" + json.dumps(res, ensure_ascii=False) + "\n\n"

🌊 作用:实现真正的流式输出,前端可实时渲染模型思考过程、工具调用、最终答案,大幅提升用户体验。

3. 上下文记忆管理

@staticmethod
def short_trim_messages(state):
    """
    模型调用前自动修剪历史消息,防止上下文溢出
    保留系统消息 + 最新的人类消息序列,确保对话连贯性
    """
    trimmed_messages = trim_messages(
        messages=state["messages"],
        max_tokens=20000,
        token_counter=lambda msgs: sum(len(m.content or "") for m in msgs),
        strategy="last",      # 保留最新消息
        start_on="human",     # 从用户消息开始保留
        include_system=True,  # 必须保留系统提示
    )
    return {"llm_input_messages": trimmed_messages}

🧠 智能裁剪:避免因历史消息过长导致模型性能下降或API报错,同时保持对话逻辑连贯。

4. 主运行逻辑:run_agent

async def run_agent(self, query: str, response, session_id: Optional[str] = None, uuid_str: str = None, user_token=None):
    """
    核心执行方法:启动智能代理,处理用户查询,流式返回结果
    支持:身份验证、任务取消、工具调用监听、对话状态持久化、记录存储
    """
    # 解码用户Token,获取用户ID作为任务标识
    user_dict = await decode_jwt_token(user_token)
    task_id = user_dict["id"]
    task_context = {"cancelled": False}
    self.running_tasks[task_id] = task_context

    try:
        t02_answer_data = []  # 收集完整回答用于后续存储

        # 动态获取可用工具列表
        tools = await self.client.get_tools()

        # 使用session_id作为thread_id,实现多轮对话状态隔离
        thread_id = session_id if session_id else"default_thread"
        config = {"configurable": {"thread_id": thread_id}}

        # 定义系统提示词,约束模型行为与输出格式
        system_message = SystemMessage(content="""...""")  # 内容略,见完整代码

        # 创建React代理,绑定模型、工具、状态管理器和预处理钩子
        agent = create_react_agent(
            model=self.llm,
            tools=tools,
            prompt=system_message,
            checkpointer=self.checkpointer,
            pre_model_hook=self.short_trim_messages,  # 注册消息修剪钩子
        )

        # 流式执行代理,监听每一步输出
        async for message_chunk, metadata in agent.astream(
            input={"messages": [HumanMessage(content=query)]},
            config=config,
            stream_mode="messages",
        ):
            # 检查任务是否被取消
            if self.running_tasks[task_id]["cancelled"]:
                await response.write(self._create_response("\n> 这条消息已停止", "info"))
                await response.write(self._create_response("", "end", DataTypeEnum.STREAM_END.value[0]))
                break

            # 处理工具调用节点
            if metadata["langgraph_node"] == "tools":
                tool_name = message_chunk.name or "未知工具"
                tool_use = "> 调用工具:" + tool_name + "\n\n"
                await response.write(self._create_response(tool_use))
                t02_answer_data.append(tool_use)
                continue

            # 输出模型生成内容
            if message_chunk.content:
                content = message_chunk.content
                t02_answer_data.append(content)
                await response.write(self._create_response(content))
                if hasattr(response, "flush"):
                    await response.flush()
                await asyncio.sleep(0)  # 让出事件循环

        # 仅在未取消时保存对话记录
        if not self.running_tasks[task_id]["cancelled"]:
            await add_user_record(
                uuid_str, session_id, query, t02_answer_data, {},
                DiFyAppEnum.COMMON_QA.value[0], user_token
            )

    except asyncio.CancelledError:
        ... # 处理取消异常
    except Exception as e:
        ... # 处理运行时异常
    finally:
        # 清理任务记录
        if task_id in self.running_tasks:
            del self.running_tasks[task_id]

⚙️ 核心价值: 状态隔离:通过 thread_id 区分不同用户/会话 工具透明:实时输出工具调用过程,增强可信度 可中断:支持用户主动取消长时间运行任务 可审计:完整记录对话过程,便于复盘与优化

5. 任务取消与状态管理

async def cancel_task(self, task_id: str) -> bool:
    """取消指定任务,通过设置标志位实现优雅中断"""
    if task_id in self.running_tasks:
        self.running_tasks[task_id]["cancelled"] = True
        return True
    return False

def get_running_tasks(self):
    """获取当前所有运行中任务ID,用于监控与管理"""
    return list(self.running_tasks.keys())

🚫 用户体验:当模型“思考太久”,用户可点击“停止”按钮,系统立即响应,避免资源浪费与体验卡顿。

6. MCP使用姿势

  • streamable_http方式调用

self.client = MultiServerMCPClient({
 "mcp-hub": {
        "url": "http://xxxx.com",
        "transport": "streamable_http",
  }
}
  • 本地子进程方式调用三方开源工具

self.client = MultiServerMCPClient({
 "undoom-douyin-data-analysis": {
      "command": "uvx",
      "transport": "stdio",
      "args": [
           "--index-url",
           "https://mirrors.aliyun.com/pypi/simple/",
           "--from",
           "undoom-douyin-data-analysis",
           "undoom-douyin-mcp",
           ],
  },
}
  • 本地子进程方式调用本地开发的工具

current_dir = os.path.dirname(os.path.abspath(__file__))
mcp_tool_path = os.path.join(current_dir, "mcp", "query_db_tool.py")
self.client = MultiServerMCPClient({
  "query_qa_record": {
        "command": "python",
        "args": [mcp_tool_path],
        "transport": "stdio",
     }
}

总结

LangGraph + MCP + ReactAgent 的组合,不是简单的技术堆砌,而是面向复杂、真实、生产环境的智能代理架构解决方案。

它解决了传统 Agent 的三大痛点:

❌ 状态丢失 → ✅ LangGraph 状态图持久化 ❌ 工具混乱 → ✅ MCP 协议标准化接入 ❌ 黑盒执行 → ✅ ReactAgent 透明化推理过程

📚 完整代码

参考我的开源项目: git@github.com:apconw/sanic-web.git

🌈 项目亮点

  • ✅ 集成 MCP 多智能体架构

  • ✅ 支持 Dify / LangChain / LlamaIndex / Ollama / vLLM / Neo4j

  • ✅ 前端采用 Vue3 + TypeScript + Vite5,现代化交互体验

  • ✅ 内置 ECharts / AntV 图表问答 + CSV 表格问答

  • ✅ 支持对接主流 RAG 系统 与 Text2SQL 引擎

  • ✅ 轻量级 Sanic 后端,适合快速部署与二次开发

运行效果:

数据问答

数据问答

数据问答

 AI大模型从0到精通全套学习大礼包

我在一线互联网企业工作十余年里,指导过不少同行后辈。帮助很多人得到了学习和成长。

只要你是真心想学AI大模型,我这份资料就可以无偿共享给你学习。大模型行业确实也需要更多的有志之士加入进来,我也真心希望帮助大家学好这门技术,如果日后有什么学习上的问题,欢迎找我交流,有技术上面的问题,我是很愿意去帮助大家的!

如果你也想通过学大模型技术去帮助就业和转行,可以点扫描下方👇👇
大模型重磅福利:入门进阶全套104G学习资源包免费分享!
在这里插入图片描述

01.从入门到精通的全套视频教程

包含提示词工程、RAG、Agent等技术点
在这里插入图片描述

02.AI大模型学习路线图(还有视频解说)

全过程AI大模型学习路线

在这里插入图片描述

​​在这里插入图片描述

03.学习电子书籍和技术文档

市面上的大模型书籍确实太多了,这些是我精选出来的

在这里插入图片描述

04.大模型面试题目详解

在这里插入图片描述

在这里插入图片描述

05.这些资料真的有用吗?

这份资料由我和鲁为民博士共同整理,鲁为民博士先后获得了北京清华大学学士和美国加州理工学院博士学位,在包括IEEE Transactions等学术期刊和诸多国际会议上发表了超过50篇学术论文、取得了多项美国和中国发明专利,同时还斩获了吴文俊人工智能科学技术奖。目前我正在和鲁博士共同进行人工智能的研究。

所有的视频由智泊AI老师录制,且资料与智泊AI共享,相互补充。这份学习大礼包应该算是现在最全面的大模型学习资料了。

资料内容涵盖了从入门到进阶的各类视频教程和实战项目,无论你是小白还是有些技术基础的,这份资料都绝对能帮助你提升薪资待遇,转行大模型岗位。

在这里插入图片描述
在这里插入图片描述

智泊AI始终秉持着“让每个人平等享受到优质教育资源”的育人理念‌,通过动态追踪大模型开发、数据标注伦理等前沿技术趋势‌,构建起"前沿课程+智能实训+精准就业"的高效培养体系。

课堂上不光教理论,还带着学员做了十多个真实项目。学员要亲自上手搞数据清洗、模型调优这些硬核操作,把课本知识变成真本事‌!

在这里插入图片描述
如果说你是以下人群中的其中一类,都可以来智泊AI学习人工智能,找到高薪工作,一次小小的“投资”换来的是终身受益!

应届毕业生‌:无工作经验但想要系统学习AI大模型技术,期待通过实战项目掌握核心技术。

零基础转型‌:非技术背景但关注AI应用场景,计划通过低代码工具实现“AI+行业”跨界‌。

业务赋能 ‌突破瓶颈:传统开发者(Java/前端等)学习Transformer架构与LangChain框架,向AI全栈工程师转型‌。

👉获取方式:
😝有需要的小伙伴,可以保存图片到wx扫描二v码免费领取【保证100%免费】🆓

在这里插入图片描述

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐