MCP客户端实现
本文介绍了Open-Streamui项目中MCP客户端的完整实现方案,重点解决了四大核心技术挑战:1) 通过MCPService类实现多协议自适应连接,支持SSE和HTTP传输;2) 构建智能工具调用机制,包括预处理、执行和结果格式化;3) 设计MCPServiceSync同步包装器解决异步/同步兼容问题;4) 开发基于AI的智能工具推荐系统,通过语义分析自动匹配最优工具。该方案显著提升了AI应用
Open-Streamui MCP客户端实现:从异步连接到AI集成的完整指南
文章目录
在AI应用开发中,如何让AI模型智能调用外部工具一直是个技术难题。今天,我将分享我们在Open-Streamui项目中实现MCP(Model Context Protocol)客户端的完整技术方案,从基础连接到AI智能调用的每一步都清晰呈现。
引言:为什么我们需要MCP客户端?
在开发AI应用时,我们经常遇到这样的场景:用户问"北京天气怎么样?",AI需要调用天气API。传统的做法是硬编码这些工具调用逻辑,但这样既不灵活也难以扩展。
MCP协议的出现解决了这个问题,它提供了一种标准化的方式让AI模型与外部工具交互。我们的任务就是实现一个强大的MCP客户端,让AI能够智能地选择和使用工具。
第一步:建立稳定的MCP服务器连接
问题分析
MCP服务器可能使用不同的传输协议(SSE、HTTP),我们需要一个能够自适应协议、稳定管理的连接方案。
解决方案:MCPService类
我们设计了MCPService类,核心连接逻辑如下:
async def connect_to_server(self, server_name: str) -> bool:
# 检查配置和连接状态
if server_name not in self.mcp_servers:
self.logger.error(f"MCP服务器 '{server_name}' 未在配置中定义")
return False
# 使用异步锁确保线程安全
async with self._lock:
if server_name in self.sessions:
self.logger.info(f"MCP服务器 '{server_name}' 已连接")
return True
# 根据URL选择传输方式
url = server_config.get("url", "")
if url.endswith("/sse"):
# SSE传输
sse_transport = await exit_stack.enter_async_context(
sse_client(url, headers=headers)
)
read, write = sse_transport
else:
# Streamable HTTP传输
http_transport = await exit_stack.enter_async_context(
streamablehttp_client(url, headers=headers)
)
read, write = http_transport
第二步:实现智能工具调用机制
工具调用流程
当AI模型需要调用工具时,我们的系统会执行以下步骤:
async def call_tool(self, server_name: str, tool_name: str, arguments: Dict[str, Any]) -> Any:
# 确保连接
if server_name not in self.sessions:
success = await self.connect_to_server(server_name)
if not success:
raise RuntimeError(f"无法连接到MCP服务器 '{server_name}'")
# 执行调用
session = self.sessions[server_name]
result = await session.call_tool(tool_name, arguments)
# 格式化结果
if isinstance(result, list):
# 处理多内容结果
content_list = []
for content in result:
if hasattr(content, 'text'):
content_list.append(content.text)
return "\n".join(content_list)
else:
return str(result)
工具管理特性
- 工具预加载:连接成功后自动加载工具列表并缓存
- 智能调用:AI模型只需指定工具名称和参数
- 结果处理:自动处理不同格式的返回结果
第三步:解决异步与同步的兼容性问题
问题背景
Streamlit是同步框架,但MCP操作本质上是异步的。我们需要一个桥梁来连接这两个世界。
解决方案:MCPServiceSync同步包装器
class MCPServiceSync:
"""MCP服务同步包装器"""
def __init__(self, config_manager):
self._service = MCPService(config_manager)
# 使用当前线程的事件循环
try:
self._loop = asyncio.get_event_loop()
except RuntimeError:
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
def connect_to_server(self, server_name: str) -> bool:
"""同步连接到MCP服务器"""
try:
return self._loop.run_until_complete(
self._service.connect_to_server(server_name)
)
except Exception as e:
self._service.logger.error(f"连接MCP服务器 '{server_name}' 失败: {e}")
return False
设计优势
- 兼容性:让异步MCP服务能在同步环境中使用
- 性能:复用事件循环,避免重复创建
- 简洁性:对外提供简单的同步接口
第四步:AI智能工具选择算法
智能工具检测系统
我们实现了基于AI的智能工具推荐系统,而不是简单的关键词匹配。系统会分析用户意图并推荐最合适的工具:
def auto_detect_mcp_tools(self, user_message: str) -> List[Dict]:
"""自动检测用户消息中可能需要使用的MCP工具"""
# 获取所有可用工具
all_tools = []
for server in self.get_mcp_servers():
server_tools = self.get_mcp_tools(server)
for tool in server_tools:
tool["server"] = server # 添加服务器信息
all_tools.append(tool)
if not all_tools:
return []
# 构建工具描述信息,供AI分析
tools_description = ""
for i, tool in enumerate(all_tools):
tools_description += f"{i+1}. {tool['name']}: {tool['description']}\n"
# 使用AI分析用户意图并推荐工具
system_prompt = f"""你是一个智能工具推荐助手。请分析用户的问题,判断是否需要使用工具,并推荐最合适的工具。
可用的工具列表:
{tools_description}
请根据以下规则进行推荐:
1. 仔细分析用户问题的意图和需求
2. 选择与用户需求最匹配的工具
3. 考虑工具的适用性和相关性
4. 如果用户问题不需要工具,可以返回空列表
5. 最多推荐3个最相关的工具
请用JSON格式返回推荐结果,格式如下:
{{
"recommended_tools": ["tool_name1", "tool_name2", "tool_name3"]
}}
如果不需要工具,返回:
{{
"recommended_tools": []
}}
"""
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_message}
]
try:
# 调用AI进行分析
response = self.chat_completion(messages)
# 解析AI的JSON响应
import json
import re
# 尝试从响应中提取JSON
json_match = re.search(r'\{[^{}]*\}', response)
if json_match:
json_str = json_match.group()
result = json.loads(json_str)
recommended_tool_names = result.get("recommended_tools", [])
# 根据工具名称找到对应的工具信息
recommended_tools = []
for tool_name in recommended_tool_names:
for tool in all_tools:
if tool["name"] == tool_name:
recommended_tools.append(tool)
break
return recommended_tools[:3] # 最多返回3个推荐工具
else:
# 如果无法解析JSON,使用备用逻辑
return self._fallback_tool_detection(user_message, all_tools)
except Exception as e:
# 如果AI分析失败,使用备用逻辑
return self._fallback_tool_detection(user_message, all_tools)
备用检测逻辑
当AI分析失败时,系统会使用基于语义相似度的备用检测逻辑:
def _fallback_tool_detection(self, user_message: str, all_tools: List[Dict]) -> List[Dict]:
"""备用工具检测逻辑(当AI分析失败时使用)"""
# 工具类型映射
tool_categories = {
"search": ["search", "query", "find", "lookup", "搜索", "查询", "查找"],
"calculation": ["calculate", "math", "compute", "calculator", "计算", "数学", "算数"],
"file": ["file", "document", "read", "write", "文件", "文档", "读取"],
"web": ["web", "http", "api", "internet", "网络", "网页", "在线"],
"time": ["time", "date", "calendar", "schedule", "时间", "日期", "日历"],
"weather": ["weather", "forecast", "temperature", "天气", "预报", "气温"],
"translation": ["translate", "language", "translation", "翻译", "语言"],
"database": ["database", "sql", "query", "db", "数据库", "查询"]
}
# 检测用户意图
user_message_lower = user_message.lower()
detected_categories = []
for category, keywords in tool_categories.items():
if any(keyword in user_message_lower for keyword in keywords):
detected_categories.append(category)
# 根据检测到的类别推荐工具
recommended_tools = []
for tool in all_tools:
tool_name_lower = tool["name"].lower()
tool_desc_lower = tool["description"].lower()
# 检查工具是否匹配检测到的类别
for category in detected_categories:
category_keywords = tool_categories[category]
if any(keyword in tool_name_lower or keyword in tool_desc_lower for keyword in category_keywords):
if tool not in recommended_tools:
recommended_tools.append(tool)
break
# 如果没有匹配到特定类别,但用户问题看起来需要工具,返回所有工具
if not recommended_tools and len(user_message) > 10:
return all_tools[:5]
return recommended_tools[:5] # 最多返回5个推荐工具
智能对话生成
当AI检测到需要调用工具时,会生成包含工具调用的回答:
def generate_chat_response_with_mcp(self, user_message: str, chat_history: Optional[List] = None) -> Dict:
"""生成包含MCP工具调用的聊天回答"""
# 自动检测可能需要使用的工具
recommended_tools = self.auto_detect_mcp_tools(user_message)
# 构建包含工具调用的AI回答
if recommended_tools:
# 让AI生成包含工具调用的回答
response = self._generate_response_with_tools(user_message, recommended_tools)
return {
"response": response,
"tool_calls": recommended_tools
}
else:
# 普通AI回答
return {
"response": self.generate_chat_response(user_message, chat_history),
"tool_calls": []
}
第五步:前端用户体验设计
工具调用详情显示
为了让用户直观地看到工具调用过程,我们实现了:
- 实时显示:工具调用时立即显示进度
- 结果持久化:调用详情保存在对话历史中
- 可展开面板:用户可以查看详细的调用信息
配置界面功能
- 服务器管理:添加、编辑、删除MCP服务器
- 连接测试:一键测试服务器连接状态
- 工具预览:显示每个服务器的可用工具列表
实际应用场景演示
场景1:天气查询
# 用户:"北京天气怎么样?"
# AI自动检测到需要天气工具
# 调用weather_server的get_weather工具
result = ai_service.generate_chat_response_with_mcp("北京天气怎么样?")
# 返回:"北京今天晴,温度15-25度"
项目成果与未来展望
通过这个MCP客户端实现,我们成功地将MCP协议集成到了Open-Streamui项目中,实现了:
- 标准化工具调用:统一的工具调用接口
- 智能AI集成:AI模型能够自动选择合适的工具
- 优秀用户体验:直观的工具调用界面
- 高度可扩展:易于添加新的MCP服务器和工具
未来优化方向
- 更智能的工具推荐算法:基于语义理解而非关键词匹配
- 支持更多的MCP协议特性:如工具调用链、批量调用等
这个实现不仅解决了当前的技术需求,更为项目的长期发展奠定了坚实的基础。希望这个技术分享对你在AI应用开发中集成外部工具有所帮助!
本文是Open-Streamui项目技术分享系列的一部分,欢迎关注我们的GitHub仓库获取更多技术内容。
更多推荐



所有评论(0)