Before We Start

The previous article published an mcp-server. Can that server actually be reached, and does it return proper results? I found a client implementation on GitHub and swapped its LLM calls for a setup that supports domestic Chinese models. Let's verify it together~

Main Features

  • Connects to the mcp-server
  • Fetches the MCP tool list
  • Calls the LLM to decide which tool to invoke and with what arguments
  • Executes the tool and collects its return value
  • Calls the LLM again to summarize the answer
  • Detailed logging so you can follow the whole flow (a condensed sketch of this loop follows right after this list)
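
For orientation, here is a condensed skeleton of that loop. It is only a sketch: error handling, the logging, and the interactive loop are stripped out, and the server URL, endpoint, and model name are simply the ones the full code below uses.

import asyncio
import json

from mcp import ClientSession
from mcp.client.sse import sse_client
from openai import OpenAI

async def one_round(query: str) -> str:
    llm = OpenAI(api_key="your-bailian-api-key",  # see "Choosing an LLM" below
                 base_url="https://dashscope.aliyuncs.com/compatible-mode/v1")
    async with sse_client(url="http://localhost:8080/sse") as streams:
        async with ClientSession(*streams) as session:
            await session.initialize()                       # 1. connect to the mcp-server
            tools = [{"type": "function", "function": {      # 2. fetch + convert the tool list
                "name": t.name, "description": t.description,
                "parameters": t.inputSchema,
            }} for t in (await session.list_tools()).tools]
            messages = [{"role": "user", "content": query}]
            msg = llm.chat.completions.create(               # 3. let the model pick tool + args
                model="qwen3-32b", messages=messages, tools=tools,
                extra_body={"enable_thinking": False},
            ).choices[0].message
            if not msg.tool_calls:
                return msg.content
            call = msg.tool_calls[0]
            result = await session.call_tool(                # 4. execute the tool
                call.function.name, json.loads(call.function.arguments))
            messages += [
                {"role": "assistant", "content": None, "tool_calls": [call.model_dump()]},
                {"role": "tool", "tool_call_id": call.id, "content": str(result.content)},
            ]
            return llm.chat.completions.create(              # 5. summarize the result
                model="qwen3-32b", messages=messages,
                extra_body={"enable_thinking": False},
            ).choices[0].message.content

# asyncio.run(one_round("..."))  # needs the mcp-server from the previous article running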

Some Notes

Choosing an LLM

The code uses the OpenAI SDK, but since the almighty Alibaba Cloud bridges to the international API (Bailian's compatible-mode endpoint speaks the OpenAI protocol), an Alibaba Bailian api-key works here as well. Bailian also gives every new user a free model-call quota, so feel free to use it.
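
Before wiring up MCP, a quick sanity check is a plain completion against the compatible-mode endpoint. A minimal sketch with the same base_url and model as the client code below (fill in your own key):

from openai import OpenAI

# The OpenAI SDK only needs a different base_url plus a Bailian API key
client = OpenAI(
    api_key="your-bailian-api-key",
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)
resp = client.chat.completions.create(
    model="qwen3-32b",
    messages=[{"role": "user", "content": "Hello"}],
    extra_body={"enable_thinking": False},  # DashScope-specific switch for Qwen3
)
print(resp.choices[0].message.content)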

Using the LLM

For the details, see Alibaba Bailian's API documentation, which lists every parameter; if you are interested, you can assemble the requests by hand and experiment, as sketched below.
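
If you do assemble the request by hand, the compatible-mode endpoint is plain HTTP. A minimal sketch with requests, assuming the standard OpenAI-compatible conventions (Bearer auth, /chat/completions path); note that whatever the SDK passes via extra_body is simply merged into the top-level JSON body:

import requests

resp = requests.post(
    "https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions",
    headers={
        "Authorization": "Bearer your-bailian-api-key",
        "Content-Type": "application/json",
    },
    json={
        "model": "qwen3-32b",
        "messages": [{"role": "user", "content": "Hello"}],
        "enable_thinking": False,  # top level here; the SDK sends it via extra_body
    },
)
print(resp.json()["choices"][0]["message"]["content"])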

Code

import asyncio
import json
import os
import sys
from typing import Optional, List, Dict
from contextlib import AsyncExitStack

from mcp import ClientSession
from mcp.client.sse import sse_client

from openai import OpenAI
from dotenv import load_dotenv

load_dotenv()  # load environment variables from .env

class MCPClient:
    def __init__(self):
        # Initialize session and client objects
        # Optional[...] means the attribute may be None or a ClientSession
        self.session: Optional[ClientSession] = None
        # Context managers are stored by connect_to_sse_server() and checked in cleanup()
        self._streams_context = None
        self._session_context = None
        self.exit_stack = AsyncExitStack()
        self.openai = OpenAI(
            # Read the Bailian key from the environment (populated by load_dotenv above);
            # DASHSCOPE_API_KEY is DashScope's conventional variable name
            api_key=os.getenv("DASHSCOPE_API_KEY", "your-api-key"),
            base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
        )

    async def connect_to_sse_server(self, server_url: str):
        """Connect to an MCP server running with SSE transport"""
        # Store the context managers so they stay alive
        self._streams_context = sse_client(url=server_url)
        streams = await self._streams_context.__aenter__()
        # * unpacks streams into positional arguments: sse_client yields a
        # (read_stream, write_stream) pair, so this is ClientSession(read_stream, write_stream)

        self._session_context = ClientSession(*streams)
        self.session: ClientSession = await self._session_context.__aenter__()

        # Initialize
        await self.session.initialize()

        # List available tools to verify connection
        print("Initialized SSE client...")
        print("Listing tools...")
        response = await self.session.list_tools()
        tools = response.tools
        print("工具列表:", json.dumps(response.model_dump(), ensure_ascii=False, indent=2))
        print("\nConnected to server with tools:", [tool.name for tool in tools])

    async def cleanup(self):
        """Properly clean up the session and streams"""
        if self._session_context:
            await self._session_context.__aexit__(None, None, None)
        if self._streams_context:
            await self._streams_context.__aexit__(None, None, None)

    async def process_query(self, query: str) -> str:
        """Process a query using OpenAI and available tools"""
        messages = [
            {
                "role": "user",
                "content": query
            }
        ]

        response = await self.session.list_tools()
        # Convert the MCP tool list into OpenAI's function-calling format
        available_tools: List[Dict] = [{ 
            "type": "function",
            "function": {
                "name": tool.name,
                "description": tool.description,
                "parameters": tool.inputSchema  # 假设inputSchema符合OpenAI的参数格式要求
            }
        } for tool in response.tools]
        print(f"大模型调用详情: {json.dumps(messages, ensure_ascii=False, indent=2)}")
        # 初始OpenAI API调用
        response = self.openai.chat.completions.create(
            model="qwen3-32b",  # 使用OpenAI模型
            max_tokens=1000,
            messages=messages,
            tools=available_tools,
            tool_choice="auto",  # 自动决定是否调用工具
            extra_body={
                "enable_thinking": False,
            }
        )
        # 打印大模型决策过程
        print("\n===== 大模型工具调用决策 =====")
        print(f"原始响应: {json.dumps(response.model_dump(), ensure_ascii=False, indent=2)}")
        # Process the response and any tool calls
        tool_results = []
        final_text = []

        response_message = response.choices[0].message
        tool_calls = response_message.tool_calls

        print(f"Tool call requested: {'yes' if tool_calls else 'no'}")
        # Print tool-call details
        if tool_calls:
            print(f"Number of tool calls: {len(tool_calls)}")
            for i, call in enumerate(tool_calls):
                print(f"Tool {i+1}: {call.function.name}, arguments: {call.function.arguments}")

        # If the model requested tool calls
        if tool_calls:
            for tool_call in tool_calls:
                tool_name = tool_call.function.name
                tool_args = json.loads(tool_call.function.arguments)

                # Execute the tool via the MCP session
                print(f"\n===== Calling tool '{tool_name}' =====")
                print(f"Arguments: {json.dumps(tool_args, ensure_ascii=False, indent=2)}")
                result = await self.session.call_tool(tool_name, tool_args)

                print(f"Result: {json.dumps(str(result.content), ensure_ascii=False, indent=2)}")
                
                tool_results.append({"call": tool_name, "result": result})
                final_text.append(f"[Calling tool {tool_name} with args {tool_args}]")

                # Append this tool call and its result to the conversation history
                messages.append({
                    "role": "assistant",
                    "content": None,
                    "tool_calls": [tool_call.model_dump()]
                })
                messages.append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "content": str(result.content)
                })
                
                print(f"大模型调用详情: {json.dumps(messages, ensure_ascii=False, indent=2)}")
                # 获取OpenAI的下一次响应
                response = self.openai.chat.completions.create(
                    model="qwen3-32b",
                    max_tokens=1000,
                    messages=messages,
                    extra_body={
                        "enable_thinking": False,
                    }
                )

                final_response = response.choices[0].message.content
                if final_response:
                    final_text.append(final_response)
        else:
            # No tool calls: use the model's answer directly
            if response_message.content:
                final_text.append(response_message.content)

        return "\n".join(final_text)
    

    async def chat_loop(self):
        """Run an interactive chat loop"""
        print("\nMCP Client Started!")
        print("Type your queries or 'quit' to exit.")
        
        while True:
            try:
                query = input("\nQuery: ").strip()
                
                if query.lower() == 'quit':
                    break
                    
                response = await self.process_query(query)
                print("\n" + response)
                    
            except Exception as e:
                print(f"\nError: {str(e)}")


async def main():
    # if len(sys.argv) < 2:
    #     print("Usage: uv run client.py <URL of SSE MCP server (i.e. http://localhost:8080/sse)>")
    #     sys.exit(1)

    client = MCPClient()
    try:
        # await client.connect_to_sse_server(server_url=sys.argv[1])
        await client.connect_to_sse_server(server_url="http://localhost:8080/sse")
        await client.chat_loop()
    finally:
        print("Cleaning up...")
        await client.cleanup()


if __name__ == "__main__":
    asyncio.run(main())
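
To try it out: start the mcp-server from the previous article so it is listening on http://localhost:8080/sse, put your Bailian key in a .env file as DASHSCOPE_API_KEY (the variable name the code above reads; it follows DashScope's convention), then run the client with python client.py (or uv run client.py, per the commented-out usage string) and type a query at the prompt. Typing quit exits.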

Final Words

That's all.

With "AI", 1024 = 2048. Everyone is welcome to join the 2048 AI community.