Open-Streamui MCP客户端实现:从异步连接到AI集成的完整指南

在AI应用开发中,如何让AI模型智能调用外部工具一直是个技术难题。今天,我将分享我们在Open-Streamui项目中实现MCP(Model Context Protocol)客户端的完整技术方案,从基础连接到AI智能调用的每一步都清晰呈现。

引言:为什么我们需要MCP客户端?

在开发AI应用时,我们经常遇到这样的场景:用户问"北京天气怎么样?",AI需要调用天气API。传统的做法是硬编码这些工具调用逻辑,但这样既不灵活也难以扩展。

MCP协议的出现解决了这个问题,它提供了一种标准化的方式让AI模型与外部工具交互。我们的任务就是实现一个强大的MCP客户端,让AI能够智能地选择和使用工具。

第一步:建立稳定的MCP服务器连接

问题分析

MCP服务器可能使用不同的传输协议(SSE、HTTP),我们需要一个能够自适应协议、稳定管理的连接方案。

解决方案:MCPService类

我们设计了MCPService类,核心连接逻辑如下:

async def connect_to_server(self, server_name: str) -> bool:
    # 检查配置和连接状态
    if server_name not in self.mcp_servers:
        self.logger.error(f"MCP服务器 '{server_name}' 未在配置中定义")
        return False
    
    # 使用异步锁确保线程安全
    async with self._lock:
        if server_name in self.sessions:
            self.logger.info(f"MCP服务器 '{server_name}' 已连接")
            return True
        
        # 根据URL选择传输方式
        url = server_config.get("url", "")
        if url.endswith("/sse"):
            # SSE传输
            sse_transport = await exit_stack.enter_async_context(
                sse_client(url, headers=headers)
            )
            read, write = sse_transport
        else:
            # Streamable HTTP传输
            http_transport = await exit_stack.enter_async_context(
                streamablehttp_client(url, headers=headers)
            )
            read, write = http_transport

第二步:实现智能工具调用机制

工具调用流程

当AI模型需要调用工具时,我们的系统会执行以下步骤:

async def call_tool(self, server_name: str, tool_name: str, arguments: Dict[str, Any]) -> Any:
    # 确保连接
    if server_name not in self.sessions:
        success = await self.connect_to_server(server_name)
        if not success:
            raise RuntimeError(f"无法连接到MCP服务器 '{server_name}'")
    
    # 执行调用
    session = self.sessions[server_name]
    result = await session.call_tool(tool_name, arguments)
    
    # 格式化结果
    if isinstance(result, list):
        # 处理多内容结果
        content_list = []
        for content in result:
            if hasattr(content, 'text'):
                content_list.append(content.text)
        return "\n".join(content_list)
    else:
        return str(result)

工具管理特性

  • 工具预加载:连接成功后自动加载工具列表并缓存
  • 智能调用:AI模型只需指定工具名称和参数
  • 结果处理:自动处理不同格式的返回结果

第三步:解决异步与同步的兼容性问题

问题背景

Streamlit是同步框架,但MCP操作本质上是异步的。我们需要一个桥梁来连接这两个世界。

解决方案:MCPServiceSync同步包装器

class MCPServiceSync:
    """MCP服务同步包装器"""
    
    def __init__(self, config_manager):
        self._service = MCPService(config_manager)
        # 使用当前线程的事件循环
        try:
            self._loop = asyncio.get_event_loop()
        except RuntimeError:
            self._loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self._loop)
    
    def connect_to_server(self, server_name: str) -> bool:
        """同步连接到MCP服务器"""
        try:
            return self._loop.run_until_complete(
                self._service.connect_to_server(server_name)
            )
        except Exception as e:
            self._service.logger.error(f"连接MCP服务器 '{server_name}' 失败: {e}")
            return False

设计优势

  • 兼容性:让异步MCP服务能在同步环境中使用
  • 性能:复用事件循环,避免重复创建
  • 简洁性:对外提供简单的同步接口

第四步:AI智能工具选择算法

智能工具检测系统

我们实现了基于AI的智能工具推荐系统,而不是简单的关键词匹配。系统会分析用户意图并推荐最合适的工具:

def auto_detect_mcp_tools(self, user_message: str) -> List[Dict]:
    """自动检测用户消息中可能需要使用的MCP工具"""
    
    # 获取所有可用工具
    all_tools = []
    for server in self.get_mcp_servers():
        server_tools = self.get_mcp_tools(server)
        for tool in server_tools:
            tool["server"] = server  # 添加服务器信息
            all_tools.append(tool)
    
    if not all_tools:
        return []
    
    # 构建工具描述信息,供AI分析
    tools_description = ""
    for i, tool in enumerate(all_tools):
        tools_description += f"{i+1}. {tool['name']}: {tool['description']}\n"
    
    # 使用AI分析用户意图并推荐工具
    system_prompt = f"""你是一个智能工具推荐助手。请分析用户的问题,判断是否需要使用工具,并推荐最合适的工具。

可用的工具列表:
{tools_description}

请根据以下规则进行推荐:
1. 仔细分析用户问题的意图和需求
2. 选择与用户需求最匹配的工具
3. 考虑工具的适用性和相关性
4. 如果用户问题不需要工具,可以返回空列表
5. 最多推荐3个最相关的工具

请用JSON格式返回推荐结果,格式如下:
{{
    "recommended_tools": ["tool_name1", "tool_name2", "tool_name3"]
}}

如果不需要工具,返回:
{{
    "recommended_tools": []
}}
"""
    
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_message}
    ]
    
    try:
        # 调用AI进行分析
        response = self.chat_completion(messages)
        
        # 解析AI的JSON响应
        import json
        import re
        
        # 尝试从响应中提取JSON
        json_match = re.search(r'\{[^{}]*\}', response)
        if json_match:
            json_str = json_match.group()
            result = json.loads(json_str)
            recommended_tool_names = result.get("recommended_tools", [])
            
            # 根据工具名称找到对应的工具信息
            recommended_tools = []
            for tool_name in recommended_tool_names:
                for tool in all_tools:
                    if tool["name"] == tool_name:
                        recommended_tools.append(tool)
                        break
            
            return recommended_tools[:3]  # 最多返回3个推荐工具
        else:
            # 如果无法解析JSON,使用备用逻辑
            return self._fallback_tool_detection(user_message, all_tools)
            
    except Exception as e:
        # 如果AI分析失败,使用备用逻辑
        return self._fallback_tool_detection(user_message, all_tools)

备用检测逻辑

当AI分析失败时,系统会使用基于语义相似度的备用检测逻辑:

def _fallback_tool_detection(self, user_message: str, all_tools: List[Dict]) -> List[Dict]:
    """备用工具检测逻辑(当AI分析失败时使用)"""
    
    # 工具类型映射
    tool_categories = {
        "search": ["search", "query", "find", "lookup", "搜索", "查询", "查找"],
        "calculation": ["calculate", "math", "compute", "calculator", "计算", "数学", "算数"],
        "file": ["file", "document", "read", "write", "文件", "文档", "读取"],
        "web": ["web", "http", "api", "internet", "网络", "网页", "在线"],
        "time": ["time", "date", "calendar", "schedule", "时间", "日期", "日历"],
        "weather": ["weather", "forecast", "temperature", "天气", "预报", "气温"],
        "translation": ["translate", "language", "translation", "翻译", "语言"],
        "database": ["database", "sql", "query", "db", "数据库", "查询"]
    }
    
    # 检测用户意图
    user_message_lower = user_message.lower()
    detected_categories = []
    for category, keywords in tool_categories.items():
        if any(keyword in user_message_lower for keyword in keywords):
            detected_categories.append(category)
    
    # 根据检测到的类别推荐工具
    recommended_tools = []
    for tool in all_tools:
        tool_name_lower = tool["name"].lower()
        tool_desc_lower = tool["description"].lower()
        
        # 检查工具是否匹配检测到的类别
        for category in detected_categories:
            category_keywords = tool_categories[category]
            if any(keyword in tool_name_lower or keyword in tool_desc_lower for keyword in category_keywords):
                if tool not in recommended_tools:
                    recommended_tools.append(tool)
                    break
    
    # 如果没有匹配到特定类别,但用户问题看起来需要工具,返回所有工具
    if not recommended_tools and len(user_message) > 10:
        return all_tools[:5]
    
    return recommended_tools[:5]  # 最多返回5个推荐工具

智能对话生成

当AI检测到需要调用工具时,会生成包含工具调用的回答:

def generate_chat_response_with_mcp(self, user_message: str, chat_history: Optional[List] = None) -> Dict:
    """生成包含MCP工具调用的聊天回答"""
    
    # 自动检测可能需要使用的工具
    recommended_tools = self.auto_detect_mcp_tools(user_message)
    
    # 构建包含工具调用的AI回答
    if recommended_tools:
        # 让AI生成包含工具调用的回答
        response = self._generate_response_with_tools(user_message, recommended_tools)
        return {
            "response": response,
            "tool_calls": recommended_tools
        }
    else:
        # 普通AI回答
        return {
            "response": self.generate_chat_response(user_message, chat_history),
            "tool_calls": []
        }

第五步:前端用户体验设计

工具调用详情显示

为了让用户直观地看到工具调用过程,我们实现了:

  • 实时显示:工具调用时立即显示进度
  • 结果持久化:调用详情保存在对话历史中
  • 可展开面板:用户可以查看详细的调用信息

配置界面功能

  • 服务器管理:添加、编辑、删除MCP服务器
  • 连接测试:一键测试服务器连接状态
  • 工具预览:显示每个服务器的可用工具列表

实际应用场景演示

场景1:天气查询

# 用户:"北京天气怎么样?"
# AI自动检测到需要天气工具
# 调用weather_server的get_weather工具
result = ai_service.generate_chat_response_with_mcp("北京天气怎么样?")
# 返回:"北京今天晴,温度15-25度"

项目成果与未来展望

通过这个MCP客户端实现,我们成功地将MCP协议集成到了Open-Streamui项目中,实现了:

  1. 标准化工具调用:统一的工具调用接口
  2. 智能AI集成:AI模型能够自动选择合适的工具
  3. 优秀用户体验:直观的工具调用界面
  4. 高度可扩展:易于添加新的MCP服务器和工具

未来优化方向

  • 更智能的工具推荐算法:基于语义理解而非关键词匹配
  • 支持更多的MCP协议特性:如工具调用链、批量调用等

这个实现不仅解决了当前的技术需求,更为项目的长期发展奠定了坚实的基础。希望这个技术分享对你在AI应用开发中集成外部工具有所帮助!


本文是Open-Streamui项目技术分享系列的一部分,欢迎关注我们的GitHub仓库获取更多技术内容。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐