基于Python sdk发布自己的第一个mcp-client
本文介绍了一个MCP客户端实现,用于连接mcp-server进行工具调用和大模型交互。主要功能包括:连接服务器获取工具列表、通过大模型确定调用方法及参数、执行工具并获取返回值、利用大模型进行结果总结,并提供详细日志。文章特别说明可使用阿里百炼API替代OpenAI,并提供了相关参数配置示例。核心代码展示了如何初始化客户端、处理查询请求、管理工具调用流程,以及如何清理资源。该实现支持自动决策是否调用
·
说在前面
上一篇文章发布了一个mcp-server
,具体的server
是否能被正确的访问到?是否能够得到正常的返回? 在github
上找到一个客户端的代码实现,我把里面的大模型调用换成了支持国内大模型的方式,一起来验证一下吧~
主要功能
- 连接
mcp-server
- 获取
mcp
工具列表 - 调用大模型明确需要调用的方法以及参数
- 执行工具获取返回值
- 调用大模型进行问题总结
- 详尽的日志信息,帮助你更好的了解整个过程
一些说明
关于大模型的选择
文章里用的是open-ai sdk
,但是因为万能的阿里云连接国际,所以阿里百炼的api-key
也是通用的。百炼给新用户都是有免费的大模型调用额度的,放心使用。
大模型的使用
可以参考阿里百炼的api说明,里面有详细的参数,有兴趣的可以自行拼接尝试。
代码
import asyncio
import json
import os
import sys
from typing import Optional, List, Dict
from contextlib import AsyncExitStack
from mcp import ClientSession
from mcp.client.sse import sse_client
from openai import OpenAI
from dotenv import load_dotenv
load_dotenv() # load environment variables from .env
class MCPClient:
def __init__(self):
# Initialize session and client objects
# 表示对象可以是None,也可以是ClientSession类型
self.session: Optional[ClientSession] = None
self.exit_stack = AsyncExitStack()
self.openai = OpenAI(
api_key="your key your key your key",
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)
async def connect_to_sse_server(self, server_url: str):
"""Connect to an MCP server running with SSE transport"""
# Store the context managers so they stay alive
self._streams_context = sse_client(url=server_url)
streams = await self._streams_context.__aenter__()
# * 表示将streams解包,将其作为参数传递给ClientSession,假设streams是一个元组,
# 那么*streams就等价于ClientSession(stream1, stream2, stream3)
self._session_context = ClientSession(*streams)
self.session: ClientSession = await self._session_context.__aenter__()
# Initialize
await self.session.initialize()
# List available tools to verify connection
print("Initialized SSE client...")
print("Listing tools...")
response = await self.session.list_tools()
tools = response.tools
print("工具列表:", json.dumps(response.model_dump(), ensure_ascii=False, indent=2))
print("\nConnected to server with tools:", [tool.name for tool in tools])
async def cleanup(self):
"""Properly clean up the session and streams"""
if self._session_context:
await self._session_context.__aexit__(None, None, None)
if self._streams_context:
await self._streams_context.__aexit__(None, None, None)
async def process_query(self, query: str) -> str:
"""Process a query using OpenAI and available tools"""
messages = [
{
"role": "user",
"content": query
}
]
response = await self.session.list_tools()
# 转换工具格式以适应OpenAI的函数调用要求
available_tools: List[Dict] = [{
"type": "function",
"function": {
"name": tool.name,
"description": tool.description,
"parameters": tool.inputSchema # 假设inputSchema符合OpenAI的参数格式要求
}
} for tool in response.tools]
print(f"大模型调用详情: {json.dumps(messages, ensure_ascii=False, indent=2)}")
# 初始OpenAI API调用
response = self.openai.chat.completions.create(
model="qwen3-32b", # 使用OpenAI模型
max_tokens=1000,
messages=messages,
tools=available_tools,
tool_choice="auto", # 自动决定是否调用工具
extra_body={
"enable_thinking": False,
}
)
# 打印大模型决策过程
print("\n===== 大模型工具调用决策 =====")
print(f"原始响应: {json.dumps(response.model_dump(), ensure_ascii=False, indent=2)}")
# 处理响应和工具调用
tool_results = []
final_text = []
response_message = response.choices[0].message
tool_calls = response_message.tool_calls
print(f"是否调用工具: {'是' if tool_calls else '否'}")
# 打印工具调用详情
if tool_calls:
print(f"调用工具数量: {len(tool_calls)}")
for i, call in enumerate(tool_calls):
print(f"工具 {i+1}: {call.function.name}, 参数: {call.function.arguments}")
# 如果有工具调用
if tool_calls:
for tool_call in tool_calls:
tool_name = tool_call.function.name
tool_args = json.loads(tool_call.function.arguments)
# 执行工具调用
print(f"\n===== 调用工具 '{tool_name}' =====")
print(f"参数: {json.dumps(tool_args, ensure_ascii=False, indent=2)}")
result = await self.session.call_tool(tool_name, tool_args)
print(f"返回结果: {json.dumps(str(result.content), ensure_ascii=False, indent=2)}")
tool_results.append({"call": tool_name, "result": result})
final_text.append(f"[Calling tool {tool_name} with args {tool_args}]")
# 将工具调用结果添加到对话历史
messages.append({
"role": "assistant",
"content": None,
"tool_calls": [tool_call.model_dump()]
})
messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": str(result.content)
})
print(f"大模型调用详情: {json.dumps(messages, ensure_ascii=False, indent=2)}")
# 获取OpenAI的下一次响应
response = self.openai.chat.completions.create(
model="qwen3-32b",
max_tokens=1000,
messages=messages,
extra_body={
"enable_thinking": False,
}
)
final_response = response.choices[0].message.content
if final_response:
final_text.append(final_response)
else:
# 没有工具调用,直接使用响应内容
if response_message.content:
final_text.append(response_message.content)
return "\n".join(final_text)
async def chat_loop(self):
"""Run an interactive chat loop"""
print("\nMCP Client Started!")
print("Type your queries or 'quit' to exit.")
while True:
try:
query = input("\nQuery: ").strip()
if query.lower() == 'quit':
break
response = await self.process_query(query)
print("\n" + response)
except Exception as e:
print(f"\nError: {str(e)}")
async def main():
# if len(sys.argv) < 2:
# print("Usage: uv run client.py <URL of SSE MCP server (i.e. http://localhost:8080/sse)>")
# sys.exit(1)
client = MCPClient()
try:
# await client.connect_to_sse_server(server_url=sys.argv[1])
await client.connect_to_sse_server(server_url="http://localhost:8080/sse")
await client.chat_loop()
finally:
print("Cleaning up...")
# await client.cleanup()
if __name__ == "__main__":
asyncio.run(main())
说到最后
以上。
更多推荐
所有评论(0)