基于Python sdk发布自己的第一个mcp-client
本文介绍了一个MCP客户端实现,用于连接mcp-server进行工具调用和大模型交互。主要功能包括:连接服务器获取工具列表、通过大模型确定调用方法及参数、执行工具并获取返回值、利用大模型进行结果总结,并提供详细日志。文章特别说明可使用阿里百炼API替代OpenAI,并提供了相关参数配置示例。核心代码展示了如何初始化客户端、处理查询请求、管理工具调用流程,以及如何清理资源。该实现支持自动决策是否调用
·
说在前面
上一篇文章发布了一个mcp-server,具体的server是否能被正确的访问到?是否能够得到正常的返回? 在github上找到一个客户端的代码实现,我把里面的大模型调用换成了支持国内大模型的方式,一起来验证一下吧~
主要功能
- 连接
mcp-server - 获取
mcp工具列表 - 调用大模型明确需要调用的方法以及参数
- 执行工具获取返回值
- 调用大模型进行问题总结
- 详尽的日志信息,帮助你更好的了解整个过程
一些说明
关于大模型的选择
文章里用的是open-ai sdk,但是因为万能的阿里云连接国际,所以阿里百炼的api-key也是通用的。百炼给新用户都是有免费的大模型调用额度的,放心使用。
大模型的使用
可以参考阿里百炼的api说明,里面有详细的参数,有兴趣的可以自行拼接尝试。
代码
import asyncio
import json
import os
import sys
from typing import Optional, List, Dict
from contextlib import AsyncExitStack
from mcp import ClientSession
from mcp.client.sse import sse_client
from openai import OpenAI
from dotenv import load_dotenv
load_dotenv() # load environment variables from .env
class MCPClient:
def __init__(self):
# Initialize session and client objects
# 表示对象可以是None,也可以是ClientSession类型
self.session: Optional[ClientSession] = None
self.exit_stack = AsyncExitStack()
self.openai = OpenAI(
api_key="your key your key your key",
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)
async def connect_to_sse_server(self, server_url: str):
"""Connect to an MCP server running with SSE transport"""
# Store the context managers so they stay alive
self._streams_context = sse_client(url=server_url)
streams = await self._streams_context.__aenter__()
# * 表示将streams解包,将其作为参数传递给ClientSession,假设streams是一个元组,
# 那么*streams就等价于ClientSession(stream1, stream2, stream3)
self._session_context = ClientSession(*streams)
self.session: ClientSession = await self._session_context.__aenter__()
# Initialize
await self.session.initialize()
# List available tools to verify connection
print("Initialized SSE client...")
print("Listing tools...")
response = await self.session.list_tools()
tools = response.tools
print("工具列表:", json.dumps(response.model_dump(), ensure_ascii=False, indent=2))
print("\nConnected to server with tools:", [tool.name for tool in tools])
async def cleanup(self):
"""Properly clean up the session and streams"""
if self._session_context:
await self._session_context.__aexit__(None, None, None)
if self._streams_context:
await self._streams_context.__aexit__(None, None, None)
async def process_query(self, query: str) -> str:
"""Process a query using OpenAI and available tools"""
messages = [
{
"role": "user",
"content": query
}
]
response = await self.session.list_tools()
# 转换工具格式以适应OpenAI的函数调用要求
available_tools: List[Dict] = [{
"type": "function",
"function": {
"name": tool.name,
"description": tool.description,
"parameters": tool.inputSchema # 假设inputSchema符合OpenAI的参数格式要求
}
} for tool in response.tools]
print(f"大模型调用详情: {json.dumps(messages, ensure_ascii=False, indent=2)}")
# 初始OpenAI API调用
response = self.openai.chat.completions.create(
model="qwen3-32b", # 使用OpenAI模型
max_tokens=1000,
messages=messages,
tools=available_tools,
tool_choice="auto", # 自动决定是否调用工具
extra_body={
"enable_thinking": False,
}
)
# 打印大模型决策过程
print("\n===== 大模型工具调用决策 =====")
print(f"原始响应: {json.dumps(response.model_dump(), ensure_ascii=False, indent=2)}")
# 处理响应和工具调用
tool_results = []
final_text = []
response_message = response.choices[0].message
tool_calls = response_message.tool_calls
print(f"是否调用工具: {'是' if tool_calls else '否'}")
# 打印工具调用详情
if tool_calls:
print(f"调用工具数量: {len(tool_calls)}")
for i, call in enumerate(tool_calls):
print(f"工具 {i+1}: {call.function.name}, 参数: {call.function.arguments}")
# 如果有工具调用
if tool_calls:
for tool_call in tool_calls:
tool_name = tool_call.function.name
tool_args = json.loads(tool_call.function.arguments)
# 执行工具调用
print(f"\n===== 调用工具 '{tool_name}' =====")
print(f"参数: {json.dumps(tool_args, ensure_ascii=False, indent=2)}")
result = await self.session.call_tool(tool_name, tool_args)
print(f"返回结果: {json.dumps(str(result.content), ensure_ascii=False, indent=2)}")
tool_results.append({"call": tool_name, "result": result})
final_text.append(f"[Calling tool {tool_name} with args {tool_args}]")
# 将工具调用结果添加到对话历史
messages.append({
"role": "assistant",
"content": None,
"tool_calls": [tool_call.model_dump()]
})
messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": str(result.content)
})
print(f"大模型调用详情: {json.dumps(messages, ensure_ascii=False, indent=2)}")
# 获取OpenAI的下一次响应
response = self.openai.chat.completions.create(
model="qwen3-32b",
max_tokens=1000,
messages=messages,
extra_body={
"enable_thinking": False,
}
)
final_response = response.choices[0].message.content
if final_response:
final_text.append(final_response)
else:
# 没有工具调用,直接使用响应内容
if response_message.content:
final_text.append(response_message.content)
return "\n".join(final_text)
async def chat_loop(self):
"""Run an interactive chat loop"""
print("\nMCP Client Started!")
print("Type your queries or 'quit' to exit.")
while True:
try:
query = input("\nQuery: ").strip()
if query.lower() == 'quit':
break
response = await self.process_query(query)
print("\n" + response)
except Exception as e:
print(f"\nError: {str(e)}")
async def main():
# if len(sys.argv) < 2:
# print("Usage: uv run client.py <URL of SSE MCP server (i.e. http://localhost:8080/sse)>")
# sys.exit(1)
client = MCPClient()
try:
# await client.connect_to_sse_server(server_url=sys.argv[1])
await client.connect_to_sse_server(server_url="http://localhost:8080/sse")
await client.chat_loop()
finally:
print("Cleaning up...")
# await client.cleanup()
if __name__ == "__main__":
asyncio.run(main())
说到最后
以上。
更多推荐

所有评论(0)