04-LLM与MCP集成实践

概述

在前三篇文章中,我们分别介绍了 MCP 协议、服务器开发和客户端开发。本文将深入探讨如何将 MCP 工具集成到大语言模型(LLM)中,使用 LangGraph 构建智能代理应用,实现自动化的工具调用和状态管理。

LangGraph简介

LangGraph 是 LangChain 生态中的状态机工作流引擎,专门用于构建复杂的 AI 应用。

核心概念

  1. State (状态): 存储对话历史和中间结果
  2. Node (节点): 执行特定操作的函数
  3. Edge (边): 定义节点之间的连接关系
  4. Graph (图): 由节点和边组成的有向图

为什么使用 LangGraph

  • 状态管理: 自动管理对话历史
  • 工具编排: 智能调用 MCP 工具
  • 条件分支: 根据结果执行不同逻辑
  • 可扩展性: 支持复杂的多步骤任务

项目准备

依赖安装

# 使用 uv
uv sync

# 或使用 pip
pip install langchain langchain-openai langgraph langchain-mcp-adapters

环境配置

创建 .env 文件:

# DeepSeek API 配置
DEEPSEEK_API_KEY=sk-your-actual-api-key-here

# 调试模式(可选)
DEBUG=false

基础集成架构

架构图

┌─────────────────────────────────────────────────────────┐
│                   LangGraph 应用                      │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  ┌──────────────┐      ┌──────────────┐            │
│  │  Chat Node   │─────>│  Tool Node   │            │
│  │  (LLM + 工具) │      │  (执行工具)  │            │
│  └──────┬───────┘      └──────┬───────┘            │
│         │                     │                         │
│         │                     │                         │
│         ▼                     ▼                         │
│  ┌──────────────────────────────────────┐             │
│  │         MCP Server (stdio)         │             │
│  │  - 数学计算工具                    │             │
│  │  - 资源和提示模板                  │             │
│  └──────────────────────────────────────┘             │
│                                                         │
└─────────────────────────────────────────────────────────┘

数据流

用户输入
  ↓
Chat Node (LLM处理)
  ↓
需要工具? → 是 → Tool Node (调用MCP工具)
  ↓                ↓
否             返回结果
  ↓                ↓
生成响应 ←─────────┘
  ↓
返回给用户

LangGraph代理实现

基础结构

import asyncio
import json
from typing import List, Annotated
from typing_extensions import TypedDict
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage
from langgraph.prebuilt import tools_condition, ToolNode
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import AnyMessage, add_messages
from langgraph.checkpoint.memory import MemorySaver
from mcp.client.stdio import stdio_client
from mcp import ClientSession, StdioServerParameters
from langchain_mcp_adapters.tools import load_mcp_tools
from langchain_mcp_adapters.prompts import load_mcp_prompt
from dotenv import load_dotenv

# 加载环境变量
load_dotenv()

# 服务器参数
server_params = StdioServerParameters(
    command="python",
    args=["math_mcp_server_stdio.py"],
)

状态定义

class State(TypedDict):
    """状态类型定义"""
    messages: Annotated[List[AnyMessage], add_messages]

LLM 配置

def create_llm():
    """创建 LLM 实例"""
    api_key = os.getenv("DEEPSEEK_API_KEY")
    if not api_key:
        raise ValueError("DEEPSEEK_API_KEY not found in environment variables")

    llm = ChatOpenAI(
        model="deepseek-chat",
        temperature=0,
        api_key=api_key,
        base_url="https://api.deepseek.com"
    )
    return llm

会话初始化

async def initialize_session():
    """初始化 MCP 会话并加载工具"""
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            # 初始化会话
            await session.initialize()

            # 加载 MCP 工具
            tools = await load_mcp_tools(session)

            # 加载系统提示
            system_prompt_messages = await load_mcp_prompt(session, "system_prompt")

            # 提取提示文本
            system_prompt_text = ""
            if system_prompt_messages and len(system_prompt_messages) > 0:
                content = system_prompt_messages[0].content
                if hasattr(content, 'text'):
                    system_prompt_text = content.text
                elif isinstance(content, str):
                    system_prompt_text = content

            return session, tools, system_prompt_text

Chat Node 实现

def create_chat_node(llm, tools, system_prompt_text):
    """创建 Chat Node"""
    # 绑定工具到 LLM
    llm_with_tool = llm.bind_tools(tools)

    # 创建提示模板
    prompt_template = ChatPromptTemplate.from_messages([
        ("system", system_prompt_text),
        MessagesPlaceholder("messages")
    ])

    chat_llm = prompt_template | llm_with_tool

    # 定义节点函数
    def chat_node(state: State) -> State:
        messages = state["messages"]

        # 智能过滤消息
        filtered_messages = filter_messages(messages)

        # 调用 LLM
        result = chat_llm.invoke({"messages": filtered_messages})

        return {"messages": [result]}

    return chat_node

消息过滤

def filter_messages(messages):
    """过滤消息,只保留必要的消息"""
    filtered = []
    i = 0

    while i < len(messages):
        msg = messages[i]

        # 保留 HumanMessage
        if isinstance(msg, HumanMessage):
            filtered.append(msg)
            i += 1

        # 保留 AIMessage
        elif isinstance(msg, AIMessage):
            filtered.append(msg)

            # 如果包含 tool_calls,包含后续的 ToolMessage
            if hasattr(msg, 'tool_calls') and msg.tool_calls:
                tool_call_ids = {tc['id'] for tc in msg.tool_calls}
                j = i + 1
                while j < len(messages) and isinstance(messages[j], ToolMessage):
                    tool_msg = messages[j]
                    if hasattr(tool_msg, 'tool_call_id') and tool_msg.tool_call_id in tool_call_ids:
                        # 确保 ToolMessage content 是字符串
                        if not isinstance(tool_msg.content, str):
                            if hasattr(tool_msg.content, '__dict__'):
                                tool_msg.content = json.dumps(tool_msg.content.__dict__, ensure_ascii=False)
                            else:
                                tool_msg.content = str(tool_msg.content)
                        filtered.append(tool_msg)
                        j += 1
                    else:
                        break
                i = j
            else:
                i += 1

        # 跳过独立的 ToolMessage
        elif isinstance(msg, ToolMessage):
            i += 1
        else:
            i += 1

    return filtered

构建图

async def create_graph(session):
    """构建 LangGraph"""
    # 创建 LLM
    llm = create_llm()

    # 加载工具和提示
    tools = await load_mcp_tools(session)
    system_prompt_messages = await load_mcp_prompt(session, "system_prompt")

    # 提取提示文本
    system_prompt_text = system_prompt_messages[0].content.text if system_prompt_messages else ""

    # 创建 Chat Node
    chat_node = create_chat_node(llm, tools, system_prompt_text)

    # 构建 StateGraph
    graph_builder = StateGraph(State)
    graph_builder.add_node("agent", chat_node)
    graph_builder.add_node("tools", ToolNode(tools=tools))

    # 添加边
    graph_builder.add_edge(START, "agent")
    graph_builder.add_conditional_edges(
        "agent",
        tools_condition,
        {
            "tools": "tools",
            END: END
        }
    )
    graph_builder.add_edge("tools", "agent")

    # 编译图(带内存保存)
    graph = graph_builder.compile(checkpointer=MemorySaver())

    return graph

主函数

async def main():
    """主函数"""
    config = {"configurable": {"thread_id": "test-thread-1"}}

    # 初始化会话
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()

            # 创建图
            agent = await create_graph(session)

            # 显示可用工具
            tools = await load_mcp_tools(session)
            print("可用工具:", [tool.name for tool in tools])

            # 对话循环
            print("\n" + "=" * 50)
            print("LangGraph Agent with DeepSeek is ready!")
            print("=" * 50 + "\n")

            while True:
                try:
                    message = input("User: ").strip()

                    # 退出命令
                    if message.lower() in ['quit', 'exit', 'q']:
                        print("\n正在退出程序...")
                        break

                    if not message:
                        continue

                    # 创建用户消息
                    user_message = HumanMessage(content=message)

                    # 运行代理
                    result = await agent.ainvoke(
                        {"messages": [user_message]},
                        config=config
                    )

                    # 打印响应
                    final_message = result["messages"][-1]
                    if hasattr(final_message, 'content'):
                        print(f"AI: {final_message.content}")

                    print()  # 空行

                except KeyboardInterrupt:
                    print("\n\n检测到中断信号 (Ctrl+C)")
                    print("正在安全退出程序...")
                    break
                except Exception as e:
                    print(f"\n发生错误: {e}")
                    import traceback
                    traceback.print_exc()

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\n\n程序已终止")
    except Exception as e:
        print(f"\n程序异常退出: {e}")

完整实现

项目文件结构

mcp_demo/
├── math_mcp_server_stdio.py       # 数学计算服务器
├── mcp_langgraph_serever.py      # LangGraph 集成服务器
└── .env                          # 环境配置

运行应用

# 启动 LangGraph 服务器
python mcp_langgraph_serever.py

对话示例

User: 你好!
AI: 你好!我是你的AI助手,可以帮助你进行数学计算。

User: 请计算 23 + 45
AI: 我来帮你计算 23 + 45。
23 + 45 = 68

User: 再算一下 12 * 8
AI: 我来帮你计算 12 * 8。
12 * 8 = 96

User: 计算圆周率乘以 2
AI: 我来帮你计算圆周率乘以 2。
π ≈ 3.141592653589793
3.141592653589793 * 2 ≈ 6.283185307179586

User: 退出
正在退出程序...

调试功能实现

调试模式开关

import os

# 调试配置
_DEBUG_STATE = {"enabled": os.getenv("DEBUG", "false").lower() in ["true", "1", "yes", "on"]}

def debug_print(*args, **kwargs):
    """打印调试信息"""
    if _DEBUG_STATE["enabled"]:
        print("[DEBUG]", *args, **kwargs)

def toggle_debug_mode():
    """切换调试模式"""
    _DEBUG_STATE["enabled"] = not _DEBUG_STATE["enabled"]
    return _DEBUG_STATE["enabled"]

def is_debug_enabled():
    """检查是否启用调试"""
    return _DEBUG_STATE["enabled"]

添加调试信息

def chat_node(state: State) -> State:
    messages = state["messages"]

    # 调试信息
    debug_print(f"\nchat_node received {len(messages)} messages")

    # 过滤消息
    filtered_messages = filter_messages(messages)

    debug_print(f"Filtered to {len(filtered_messages)} messages for LLM")

    # 打印消息详情
    for i, msg in enumerate(filtered_messages):
        debug_print(f"Message {i}: {type(msg).__name__}")
        if isinstance(msg.content, str):
            preview = msg.content[:80] if len(msg.content) > 80 else msg.content
            debug_print(f"  Content: {preview}...")

    # 调用 LLM
    result = chat_llm.invoke({"messages": filtered_messages})

    debug_print(f"LLM returned: {type(result).__name__}")

    return {"messages": [result]}

运行时切换调试

while True:
    message = input("User: ").strip()

    # 切换调试模式
    if message.lower() == '/debug':
        new_status = toggle_debug_mode()
        status = "ENABLED" if new_status else "DISABLED"
        print(f"\nDEBUG MODE: {status}\n")
        continue

    # ... 其他逻辑

高级功能

1. 对话历史持久化

from langgraph.checkpoint.postgres import PostgresSaver

# 使用 PostgreSQL 持久化
checkpointer = PostgresSaver.from_conn_string("postgresql://...")

graph = graph_builder.compile(checkpointer=checkpointer)

2. 工具调用限制

from langchain_core.tools import tool

@tool
def limited_multiply(a: int, b: int) -> str:
    """限制范围的乘法"""
    if a > 1000 or b > 1000:
        return "数字太大,超出计算范围"
    return f"{a} * {b} = {a * b}"

3. 自定义错误处理

def create_error_handling_node():
    """错误处理节点"""
    def error_handler(state: State) -> State:
        messages = state["messages"]
        last_message = messages[-1]

        if isinstance(last_message, AIMessage) and hasattr(last_message, 'tool_calls'):
            # 检查是否有错误
            for tool_call in last_message.tool_calls:
                # 执行错误处理逻辑
                pass

        return state

    return error_handler

4. 工具调用验证

def validate_tool_call(tool_name, arguments):
    """验证工具调用参数"""
    if tool_name == "divide":
        if arguments.get("b", 0) == 0:
            raise ValueError("除数不能为零")

    if tool_name == "sqrt":
        if arguments.get("number", 0) < 0:
            raise ValueError("不能计算负数的平方根")

    return True

性能优化

1. 工具缓存

from functools import lru_cache

@lru_cache(maxsize=100)
async def cached_tool_call(session, tool_name, arguments):
    """带缓存工具调用"""
    return await session.call_tool(tool_name, arguments)

2. 并发工具调用

async def parallel_tool_calls(session, tool_calls):
    """并发调用多个工具"""
    tasks = [
        session.call_tool(call.name, call.args)
        for call in tool_calls
    ]
    results = await asyncio.gather(*tasks)
    return results

3. 批量资源加载

async def batch_load_resources(session, uris):
    """批量加载资源"""
    tasks = [session.read_resource(uri) for uri in uris]
    results = await asyncio.gather(*tasks)
    return results

测试策略

单元测试

import pytest
from unittest.mock import Mock, AsyncMock

@pytest.mark.asyncio
async def test_chat_node():
    # 创建 mock LLM
    mock_llm = AsyncMock()
    mock_llm.invoke.return_value = AIMessage(content="Test response")

    # 创建状态
    state = {
        "messages": [HumanMessage(content="Test")]
    }

    # 测试节点
    node = create_chat_node(mock_llm, [], "")
    result = node(state)

    assert "messages" in result
    assert len(result["messages"]) == 1

集成测试

@pytest.mark.asyncio
async def test_graph_execution():
    # 创建图
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            graph = await create_graph(session)

            # 执行测试
            result = await graph.ainvoke(
                {"messages": [HumanMessage(content="计算 1 + 1")]},
                config={"configurable": {"thread_id": "test"}}
            )

            assert "messages" in result
            assert "2" in result["messages"][-1].content

故障排查

问题 1: 工具调用失败

错误信息:

Tool execution error: Tool not found

解决方案:

  • 检查工具名称拼写
  • 验证工具是否已加载
  • 查看 MCP 服务器日志

问题 2: 状态丢失

问题: 多轮对话无法保持上下文

解决方案:

# 使用相同的 thread_id
config = {"configurable": {"thread_id": "user-session-123"}}

# 所有会话使用相同配置
result = await agent.ainvoke(
    {"messages": [HumanMessage(content="...")]},
    config=config
)

问题 3: 工具格式错误

错误信息:

Invalid ToolMessage format

解决方案:

  • 确保 ToolMessage.content 是字符串
  • 实现消息过滤逻辑
  • 验证 LLM 返回格式

最佳实践

1. 模块化设计

# 独立的工具管理模块
class MCPToolManager:
    def __init__(self, session):
        self.session = session
        self.tools = None
        self.cache = {}

    async def load_tools(self):
        if not self.tools:
            self.tools = await load_mcp_tools(self.session)
        return self.tools

2. 配置管理

from pydantic import BaseSettings

class AgentConfig(BaseSettings):
    model: str = "deepseek-chat"
    temperature: float = 0.0
    max_retries: int = 3
    debug: bool = False

    class Config:
        env_file = ".env"

3. 日志记录

import logging

logger = logging.getLogger(__name__)

def setup_logging(level=logging.INFO):
    logging.basicConfig(
        level=level,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )

4. 监控和指标

import time
from functools import wraps

def measure_time(func):
    @wraps(func)
    async def wrapper(*args, **kwargs):
        start = time.time()
        result = await func(*args, **kwargs)
        duration = time.time() - start
        logger.info(f"{func.__name__} took {duration:.2f}s")
        return result
    return wrapper

阅读顺序建议

  1. 01-MCP协议入门指南: 了解 MCP 基本概念和核心组件
  2. 02-快速构建MCP服务器: 使用 FastMCP 构建服务器
  3. 03-MCP客户端开发实战: 开发 stdio 和 HTTP 客户端
  4. 04-LLM与MCP集成实践: 集成到 LangGraph 构建智能代理
  5. 05-多服务器架构与最佳实践: 多服务器架构和生产部署

总结

本文详细介绍了如何将 MCP 工具集成到 LangGraph 中,包括:

  • LangGraph 基础概念和架构
  • 状态定义和节点实现
  • 工具调用和消息过滤
  • 调试功能和错误处理
  • 性能优化和最佳实践

在下一篇《多服务器架构与最佳实践》中,我们将学习如何同时连接多个 MCP 服务器,构建更复杂的应用架构。

参考资源

文章标签

LangGraph, LLM集成, MCP集成, 智能代理, 工具调用, 状态管理, Python开发, DeepSeek, LangChain
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐