04-LLM与MCP集成实践
本文介绍了如何将MCP工具集成到大语言模型(LLM)中,使用LangGraph构建智能代理应用。主要内容包括: LangGraph的核心概念:状态、节点、边和图,用于管理对话历史和工具调用 项目准备工作:安装依赖和配置环境 基础集成架构:展示数据流和组件交互 具体实现方案: 状态定义和LLM配置 会话初始化流程 Chat Node实现 消息过滤机制 该方案实现了自动化工具调用和状态管理,支持复杂的
·
04-LLM与MCP集成实践
概述
在前三篇文章中,我们分别介绍了 MCP 协议、服务器开发和客户端开发。本文将深入探讨如何将 MCP 工具集成到大语言模型(LLM)中,使用 LangGraph 构建智能代理应用,实现自动化的工具调用和状态管理。
LangGraph简介
LangGraph 是 LangChain 生态中的状态机工作流引擎,专门用于构建复杂的 AI 应用。
核心概念
- State (状态): 存储对话历史和中间结果
- Node (节点): 执行特定操作的函数
- Edge (边): 定义节点之间的连接关系
- Graph (图): 由节点和边组成的有向图
为什么使用 LangGraph
- 状态管理: 自动管理对话历史
- 工具编排: 智能调用 MCP 工具
- 条件分支: 根据结果执行不同逻辑
- 可扩展性: 支持复杂的多步骤任务
项目准备
依赖安装
# 使用 uv
uv sync
# 或使用 pip
pip install langchain langchain-openai langgraph langchain-mcp-adapters
环境配置
创建 .env 文件:
# DeepSeek API 配置
DEEPSEEK_API_KEY=sk-your-actual-api-key-here
# 调试模式(可选)
DEBUG=false
基础集成架构
架构图
┌─────────────────────────────────────────────────────────┐
│ LangGraph 应用 │
├─────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ Chat Node │─────>│ Tool Node │ │
│ │ (LLM + 工具) │ │ (执行工具) │ │
│ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │
│ │ │ │
│ ▼ ▼ │
│ ┌──────────────────────────────────────┐ │
│ │ MCP Server (stdio) │ │
│ │ - 数学计算工具 │ │
│ │ - 资源和提示模板 │ │
│ └──────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────┘
数据流
用户输入
↓
Chat Node (LLM处理)
↓
需要工具? → 是 → Tool Node (调用MCP工具)
↓ ↓
否 返回结果
↓ ↓
生成响应 ←─────────┘
↓
返回给用户
LangGraph代理实现
基础结构
import asyncio
import json
from typing import List, Annotated
from typing_extensions import TypedDict
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage
from langgraph.prebuilt import tools_condition, ToolNode
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import AnyMessage, add_messages
from langgraph.checkpoint.memory import MemorySaver
from mcp.client.stdio import stdio_client
from mcp import ClientSession, StdioServerParameters
from langchain_mcp_adapters.tools import load_mcp_tools
from langchain_mcp_adapters.prompts import load_mcp_prompt
from dotenv import load_dotenv
# 加载环境变量
load_dotenv()
# 服务器参数
server_params = StdioServerParameters(
command="python",
args=["math_mcp_server_stdio.py"],
)
状态定义
class State(TypedDict):
"""状态类型定义"""
messages: Annotated[List[AnyMessage], add_messages]
LLM 配置
def create_llm():
"""创建 LLM 实例"""
api_key = os.getenv("DEEPSEEK_API_KEY")
if not api_key:
raise ValueError("DEEPSEEK_API_KEY not found in environment variables")
llm = ChatOpenAI(
model="deepseek-chat",
temperature=0,
api_key=api_key,
base_url="https://api.deepseek.com"
)
return llm
会话初始化
async def initialize_session():
"""初始化 MCP 会话并加载工具"""
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
# 初始化会话
await session.initialize()
# 加载 MCP 工具
tools = await load_mcp_tools(session)
# 加载系统提示
system_prompt_messages = await load_mcp_prompt(session, "system_prompt")
# 提取提示文本
system_prompt_text = ""
if system_prompt_messages and len(system_prompt_messages) > 0:
content = system_prompt_messages[0].content
if hasattr(content, 'text'):
system_prompt_text = content.text
elif isinstance(content, str):
system_prompt_text = content
return session, tools, system_prompt_text
Chat Node 实现
def create_chat_node(llm, tools, system_prompt_text):
"""创建 Chat Node"""
# 绑定工具到 LLM
llm_with_tool = llm.bind_tools(tools)
# 创建提示模板
prompt_template = ChatPromptTemplate.from_messages([
("system", system_prompt_text),
MessagesPlaceholder("messages")
])
chat_llm = prompt_template | llm_with_tool
# 定义节点函数
def chat_node(state: State) -> State:
messages = state["messages"]
# 智能过滤消息
filtered_messages = filter_messages(messages)
# 调用 LLM
result = chat_llm.invoke({"messages": filtered_messages})
return {"messages": [result]}
return chat_node
消息过滤
def filter_messages(messages):
"""过滤消息,只保留必要的消息"""
filtered = []
i = 0
while i < len(messages):
msg = messages[i]
# 保留 HumanMessage
if isinstance(msg, HumanMessage):
filtered.append(msg)
i += 1
# 保留 AIMessage
elif isinstance(msg, AIMessage):
filtered.append(msg)
# 如果包含 tool_calls,包含后续的 ToolMessage
if hasattr(msg, 'tool_calls') and msg.tool_calls:
tool_call_ids = {tc['id'] for tc in msg.tool_calls}
j = i + 1
while j < len(messages) and isinstance(messages[j], ToolMessage):
tool_msg = messages[j]
if hasattr(tool_msg, 'tool_call_id') and tool_msg.tool_call_id in tool_call_ids:
# 确保 ToolMessage content 是字符串
if not isinstance(tool_msg.content, str):
if hasattr(tool_msg.content, '__dict__'):
tool_msg.content = json.dumps(tool_msg.content.__dict__, ensure_ascii=False)
else:
tool_msg.content = str(tool_msg.content)
filtered.append(tool_msg)
j += 1
else:
break
i = j
else:
i += 1
# 跳过独立的 ToolMessage
elif isinstance(msg, ToolMessage):
i += 1
else:
i += 1
return filtered
构建图
async def create_graph(session):
"""构建 LangGraph"""
# 创建 LLM
llm = create_llm()
# 加载工具和提示
tools = await load_mcp_tools(session)
system_prompt_messages = await load_mcp_prompt(session, "system_prompt")
# 提取提示文本
system_prompt_text = system_prompt_messages[0].content.text if system_prompt_messages else ""
# 创建 Chat Node
chat_node = create_chat_node(llm, tools, system_prompt_text)
# 构建 StateGraph
graph_builder = StateGraph(State)
graph_builder.add_node("agent", chat_node)
graph_builder.add_node("tools", ToolNode(tools=tools))
# 添加边
graph_builder.add_edge(START, "agent")
graph_builder.add_conditional_edges(
"agent",
tools_condition,
{
"tools": "tools",
END: END
}
)
graph_builder.add_edge("tools", "agent")
# 编译图(带内存保存)
graph = graph_builder.compile(checkpointer=MemorySaver())
return graph
主函数
async def main():
"""主函数"""
config = {"configurable": {"thread_id": "test-thread-1"}}
# 初始化会话
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
await session.initialize()
# 创建图
agent = await create_graph(session)
# 显示可用工具
tools = await load_mcp_tools(session)
print("可用工具:", [tool.name for tool in tools])
# 对话循环
print("\n" + "=" * 50)
print("LangGraph Agent with DeepSeek is ready!")
print("=" * 50 + "\n")
while True:
try:
message = input("User: ").strip()
# 退出命令
if message.lower() in ['quit', 'exit', 'q']:
print("\n正在退出程序...")
break
if not message:
continue
# 创建用户消息
user_message = HumanMessage(content=message)
# 运行代理
result = await agent.ainvoke(
{"messages": [user_message]},
config=config
)
# 打印响应
final_message = result["messages"][-1]
if hasattr(final_message, 'content'):
print(f"AI: {final_message.content}")
print() # 空行
except KeyboardInterrupt:
print("\n\n检测到中断信号 (Ctrl+C)")
print("正在安全退出程序...")
break
except Exception as e:
print(f"\n发生错误: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\n\n程序已终止")
except Exception as e:
print(f"\n程序异常退出: {e}")
完整实现
项目文件结构
mcp_demo/
├── math_mcp_server_stdio.py # 数学计算服务器
├── mcp_langgraph_serever.py # LangGraph 集成服务器
└── .env # 环境配置
运行应用
# 启动 LangGraph 服务器
python mcp_langgraph_serever.py
对话示例
User: 你好!
AI: 你好!我是你的AI助手,可以帮助你进行数学计算。
User: 请计算 23 + 45
AI: 我来帮你计算 23 + 45。
23 + 45 = 68
User: 再算一下 12 * 8
AI: 我来帮你计算 12 * 8。
12 * 8 = 96
User: 计算圆周率乘以 2
AI: 我来帮你计算圆周率乘以 2。
π ≈ 3.141592653589793
3.141592653589793 * 2 ≈ 6.283185307179586
User: 退出
正在退出程序...
调试功能实现
调试模式开关
import os
# 调试配置
_DEBUG_STATE = {"enabled": os.getenv("DEBUG", "false").lower() in ["true", "1", "yes", "on"]}
def debug_print(*args, **kwargs):
"""打印调试信息"""
if _DEBUG_STATE["enabled"]:
print("[DEBUG]", *args, **kwargs)
def toggle_debug_mode():
"""切换调试模式"""
_DEBUG_STATE["enabled"] = not _DEBUG_STATE["enabled"]
return _DEBUG_STATE["enabled"]
def is_debug_enabled():
"""检查是否启用调试"""
return _DEBUG_STATE["enabled"]
添加调试信息
def chat_node(state: State) -> State:
messages = state["messages"]
# 调试信息
debug_print(f"\nchat_node received {len(messages)} messages")
# 过滤消息
filtered_messages = filter_messages(messages)
debug_print(f"Filtered to {len(filtered_messages)} messages for LLM")
# 打印消息详情
for i, msg in enumerate(filtered_messages):
debug_print(f"Message {i}: {type(msg).__name__}")
if isinstance(msg.content, str):
preview = msg.content[:80] if len(msg.content) > 80 else msg.content
debug_print(f" Content: {preview}...")
# 调用 LLM
result = chat_llm.invoke({"messages": filtered_messages})
debug_print(f"LLM returned: {type(result).__name__}")
return {"messages": [result]}
运行时切换调试
while True:
message = input("User: ").strip()
# 切换调试模式
if message.lower() == '/debug':
new_status = toggle_debug_mode()
status = "ENABLED" if new_status else "DISABLED"
print(f"\nDEBUG MODE: {status}\n")
continue
# ... 其他逻辑
高级功能
1. 对话历史持久化
from langgraph.checkpoint.postgres import PostgresSaver
# 使用 PostgreSQL 持久化
checkpointer = PostgresSaver.from_conn_string("postgresql://...")
graph = graph_builder.compile(checkpointer=checkpointer)
2. 工具调用限制
from langchain_core.tools import tool
@tool
def limited_multiply(a: int, b: int) -> str:
"""限制范围的乘法"""
if a > 1000 or b > 1000:
return "数字太大,超出计算范围"
return f"{a} * {b} = {a * b}"
3. 自定义错误处理
def create_error_handling_node():
"""错误处理节点"""
def error_handler(state: State) -> State:
messages = state["messages"]
last_message = messages[-1]
if isinstance(last_message, AIMessage) and hasattr(last_message, 'tool_calls'):
# 检查是否有错误
for tool_call in last_message.tool_calls:
# 执行错误处理逻辑
pass
return state
return error_handler
4. 工具调用验证
def validate_tool_call(tool_name, arguments):
"""验证工具调用参数"""
if tool_name == "divide":
if arguments.get("b", 0) == 0:
raise ValueError("除数不能为零")
if tool_name == "sqrt":
if arguments.get("number", 0) < 0:
raise ValueError("不能计算负数的平方根")
return True
性能优化
1. 工具缓存
from functools import lru_cache
@lru_cache(maxsize=100)
async def cached_tool_call(session, tool_name, arguments):
"""带缓存工具调用"""
return await session.call_tool(tool_name, arguments)
2. 并发工具调用
async def parallel_tool_calls(session, tool_calls):
"""并发调用多个工具"""
tasks = [
session.call_tool(call.name, call.args)
for call in tool_calls
]
results = await asyncio.gather(*tasks)
return results
3. 批量资源加载
async def batch_load_resources(session, uris):
"""批量加载资源"""
tasks = [session.read_resource(uri) for uri in uris]
results = await asyncio.gather(*tasks)
return results
测试策略
单元测试
import pytest
from unittest.mock import Mock, AsyncMock
@pytest.mark.asyncio
async def test_chat_node():
# 创建 mock LLM
mock_llm = AsyncMock()
mock_llm.invoke.return_value = AIMessage(content="Test response")
# 创建状态
state = {
"messages": [HumanMessage(content="Test")]
}
# 测试节点
node = create_chat_node(mock_llm, [], "")
result = node(state)
assert "messages" in result
assert len(result["messages"]) == 1
集成测试
@pytest.mark.asyncio
async def test_graph_execution():
# 创建图
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
await session.initialize()
graph = await create_graph(session)
# 执行测试
result = await graph.ainvoke(
{"messages": [HumanMessage(content="计算 1 + 1")]},
config={"configurable": {"thread_id": "test"}}
)
assert "messages" in result
assert "2" in result["messages"][-1].content
故障排查
问题 1: 工具调用失败
错误信息:
Tool execution error: Tool not found
解决方案:
- 检查工具名称拼写
- 验证工具是否已加载
- 查看 MCP 服务器日志
问题 2: 状态丢失
问题: 多轮对话无法保持上下文
解决方案:
# 使用相同的 thread_id
config = {"configurable": {"thread_id": "user-session-123"}}
# 所有会话使用相同配置
result = await agent.ainvoke(
{"messages": [HumanMessage(content="...")]},
config=config
)
问题 3: 工具格式错误
错误信息:
Invalid ToolMessage format
解决方案:
- 确保 ToolMessage.content 是字符串
- 实现消息过滤逻辑
- 验证 LLM 返回格式
最佳实践
1. 模块化设计
# 独立的工具管理模块
class MCPToolManager:
def __init__(self, session):
self.session = session
self.tools = None
self.cache = {}
async def load_tools(self):
if not self.tools:
self.tools = await load_mcp_tools(self.session)
return self.tools
2. 配置管理
from pydantic import BaseSettings
class AgentConfig(BaseSettings):
model: str = "deepseek-chat"
temperature: float = 0.0
max_retries: int = 3
debug: bool = False
class Config:
env_file = ".env"
3. 日志记录
import logging
logger = logging.getLogger(__name__)
def setup_logging(level=logging.INFO):
logging.basicConfig(
level=level,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
4. 监控和指标
import time
from functools import wraps
def measure_time(func):
@wraps(func)
async def wrapper(*args, **kwargs):
start = time.time()
result = await func(*args, **kwargs)
duration = time.time() - start
logger.info(f"{func.__name__} took {duration:.2f}s")
return result
return wrapper
阅读顺序建议
- 01-MCP协议入门指南: 了解 MCP 基本概念和核心组件
- 02-快速构建MCP服务器: 使用 FastMCP 构建服务器
- 03-MCP客户端开发实战: 开发 stdio 和 HTTP 客户端
- 04-LLM与MCP集成实践: 集成到 LangGraph 构建智能代理
- 05-多服务器架构与最佳实践: 多服务器架构和生产部署
总结
本文详细介绍了如何将 MCP 工具集成到 LangGraph 中,包括:
- LangGraph 基础概念和架构
- 状态定义和节点实现
- 工具调用和消息过滤
- 调试功能和错误处理
- 性能优化和最佳实践
在下一篇《多服务器架构与最佳实践》中,我们将学习如何同时连接多个 MCP 服务器,构建更复杂的应用架构。
参考资源
文章标签
LangGraph, LLM集成, MCP集成, 智能代理, 工具调用, 状态管理, Python开发, DeepSeek, LangChain
更多推荐


所有评论(0)