open webui源码分析14-MCP
本文以一个简单的 mcp工具为例,对于在open webui中如何通过MCP集成外部服务进行串联讲解,并对系统中的相关代码进行分析
open webui具有与MCP的集成能力,从而使open webui有更强大的外部工具调用能力,本文仍以一个简单的 time外部工具,对于在open webui中如何通过MCP集成外部服务进行串联讲解,并对系统中的相关代码进行分析,让你打开open webui的MCP能力,为使用agent奠定基础。
1.架构分析
open webui与MCP集成式通过一个名为mcpo(MCP-to-OpenAPI proxy server)的组件实现的,mcpo作为一个服务来运行,该服务可承载各种MCP工具(架构与Pipelines类似,只不过Pipelines服务承载的是流水线,mcpo承载的是MCP 工具),mcpo架构如下图所示:
非容器化部署时,一个mcpo作为服务可以承载多个MCP tool;容器化部署时,一般一个mcpo承载一个MCP tool。本文以非容器化部署为例。
2.与MCP集成
2.1运行环境准备
运行MCP工具之前,需要先安装uv和mcpo
#pip install uv mcpo
2.2运行MCP工具
以time MCP工具为例。查看https://github.com/modelcontextprotocol/servers/blob/main/src/time/README.md文件,
从这里看到time工具的名字为mcp_server_time,加载MCP tool:
#uvx mcpo --port 8000 -- uvx mcp-server-time
加载成功后,访问http://{ip:8000}/docs,可查看该工具的API说明:
2.3配置MCP工具
在open webui中增加MCP工具,通过【管理员面板】->【设置】->【工具】进入MCP Tool管理页面,点击【+】,填写刚刚启动的mcpo地址,并保存:
2.4使用MCP工具
在对话页面,点击【+】,可看到新增加MCP tool列表:
启用mcp time tool,就可以在对话中使用该工具中
提问时间相关问题,可见调用了该MCP工具:
3.MCP相关源码
3.1增加MCP工具
增加MCP工具时,请求数据如下:
#对应 http://192.168.21.201:8080/api/v1/configs/tool_servers—POST
{
"TOOL_SERVER_CONNECTIONS": [
{
"url": "http://192.168.21.201:8000",
"path": "openapi.json",
"auth_type": "bearer",
"key": "",
"config": {
"enable": true,
"access_control": null
},
"info": {
"name": "mcp time tool",
"description": "本工具可以获取当前时间以及不同时区时间转换"
}
}
]
}
对应方法为config.py文件中set_tool_servers_config,代码如下:
该方法很简单,主要是调用get_tool_servers_data方法获取所有mcpo服务中工具并赋值到全局变量中,MCP工具并未持久化存储。
@router.post("/tool_servers", response_model=ToolServersConfigForm)
async def set_tool_servers_config(
request: Request,
form_data: ToolServersConfigForm,
user=Depends(get_admin_user),
):'''
把MCPO连接信息,也就是请求中的TOOL_SERVER_CONNECTIONS保存到全局变量
request.app.state.config.TOOL_SERVER_CONNECTIONS
'''
#遍历mcpo服务,获取所有可用的MCP工具规格信息并赋值给全局变量
request.app.state.config.TOOL_SERVER_CONNECTIONS = [
connection.model_dump() for connection in form_data.TOOL_SERVER_CONNECTIONS
]'''
所有可用MCP工具规格信息保存在全局变量request.app.state.TOOL_SERVERS中,每个工具规格信息内容如下:
{
"idx": idx,
"url": server.get("url"),
"openapi": openapi_data,
"info": response.get("info"),
"specs": response.get("specs"), #工具函数标准定义
}'''
request.app.state.TOOL_SERVERS = await get_tool_servers_data(
request.app.state.config.TOOL_SERVER_CONNECTIONS
)return {#返回MCOP连接信息
"TOOL_SERVER_CONNECTIONS": request.app.state.config.TOOL_SERVER_CONNECTIONS,
}
下面重点对get_tool_servers_data方法源码进行分析:
本方法办理所有被启用的mcpo服务,mcpo返回工具规格信息。
async def get_tool_servers_data(
servers: List[Dict[str, Any]], session_token: Optional[str] = None
) -> List[Dict[str, Any]]:
# Prepare list of enabled servers along with their original index
server_entries = []
for idx, server in enumerate(servers): #遍历所有mcpo服务列表,归入server_entries
if server.get("config", {}).get("enable"):#仅针对被启用的mcpo服务进行处理
#openai spec路径可以是全路径,也可以是相对路径
openapi_path = server.get("path", "openapi.json")
if "://" in openapi_path: //全路径
full_url = openapi_path
else:
if not openapi_path.startswith("/"):#相对路径,需要拼接前面的{url}
openapi_path = f"/{openapi_path}"full_url = f"{server.get('url')}{openapi_path}"
info = server.get("info", {})
'''
获取认证信息,认证类型可以是bearer,也可以是session。如果是bearer,则使用
表单中的{key},否则使用调用参数传入的session_key
'''
auth_type = server.get("auth_type", "bearer")
token = Noneif auth_type == "bearer":
token = server.get("key", "")
elif auth_type == "session":
token = session_token
server_entries.append((idx, server, full_url, info, token))# 创建异步任务,从所有的mcpo服务列表获取MCP工具
tasks = [
get_tool_server_data(token, url) for (_, _, url, _, token) in server_entries
]#异步执行并汇总所有异步任务执行结果
responses = await asyncio.gather(*tasks, return_exceptions=True)# Build final results with index and server metadata
results = []
for (idx, server, url, info, _), response in zip(server_entries, responses):
if isinstance(response, Exception):#防错处理,对出错的mcpo不予处理
log.error(f"Failed to connect to {url} OpenAPI tool server")
continue'''
openapi_data为open-webui调用mcpo获取工具返回的数据,示例如下:
{ "openapi": "3.1.0", "info": { "title": "mcp-time", "description": "mcp-time MCP Server", "version": "1.13.1" }, "paths": { "/get_current_time": { "post": { "summary": "Get Current Time", "description": "Get current time in a specific timezones", "operationId": "tool_get_current_time_post", "requestBody": { "content": { "application/json": { "schema": { "$ref": "#/components/schemas/get_current_time_form_model" } } }, "required": true }, "responses": { "200": { "description": "Successful Response", "content": { "application/json": { "schema": { "title": "Response Tool Get Current Time Post" } } } }, "422": { "description": "Validation Error", "content": { "application/json": { "schema": { "$ref": "#/components/schemas/HTTPValidationError" } } } } } } }, "/convert_time": { "post": { "summary": "Convert Time", "description": "Convert time between timezones", "operationId": "tool_convert_time_post", "requestBody": { "content": { "application/json": { "schema": { "$ref": "#/components/schemas/convert_time_form_model" } } }, "required": true }, "responses": { "200": { "description": "Successful Response", "content": { "application/json": { "schema": { "title": "Response Tool Convert Time Post" } } } }, "422": { "description": "Validation Error", "content": { "application/json": { "schema": { "$ref": "#/components/schemas/HTTPValidationError" } } } } } } } }, "components": { "schemas": { "HTTPValidationError": { "properties": { "detail": { "items": { "$ref": "#/components/schemas/ValidationError" }, "type": "array", "title": "Detail" } }, "type": "object", "title": "HTTPValidationError" }, "ValidationError": { "properties": { "loc": { "items": { "anyOf": [ { "type": "string" }, { "type": "integer" } ] }, "type": "array", "title": "Location" }, "msg": { "type": "string", "title": "Message" }, "type": { "type": "string", "title": "Error Type" } }, "type": "object", "required": [ "loc", "msg", "type" ], "title": "ValidationError" }, "convert_time_form_model": { "properties": { "source_timezone": { "type": "string", "title": "Source Timezone", "description": "Source IANA timezone name (e.g., 'America/New_York', 'Europe/London'). Use 'America/New_York' as local timezone if no source timezone provided by the user." }, "time": { "type": "string", "title": "Time", "description": "Time to convert in 24-hour format (HH:MM)" }, "target_timezone": { "type": "string", "title": "Target Timezone", "description": "Target IANA timezone name (e.g., 'Asia/Tokyo', 'America/San_Francisco'). Use 'America/New_York' as local timezone if no target timezone provided by the user." } }, "type": "object", "required": [ "source_timezone", "time", "target_timezone" ], "title": "convert_time_form_model" }, "get_current_time_form_model": { "properties": { "timezone": { "type": "string", "title": "Timezone", "description": "IANA timezone name (e.g., 'America/New_York', 'Europe/London'). Use 'America/New_York' as local timezone if no timezone provided by the user." } }, "type": "object", "required": [ "timezone" ], "title": "get_current_time_form_model" } } } }
'''
openapi_data = response.get("openapi", {})
if info and isinstance(openapi_data, dict):#用请求中的name和description赋值
if "name" in info:
openapi_data["info"]["title"] = info.get("name", "Tool Server")if "description" in info:
openapi_data["info"]["description"] = info.get("description", "")results.append(
{#工具规格信息
"idx": idx, #对应工具索引
"url": server.get("url"), #工具远程地址
"openapi": openapi_data, #调用工具返回的工具信息
"info": response.get("info"),#工具描述信息
"specs": response.get("specs"), #工具函数标准定义'''
一个函数的规格信息如下:
{ "type": "function", "name": "tool_get_current_time_post", "description": "Get current time in a specific timezones", "parameters": { "type": "object", "properties": { "timezone": { "type": "string", "title": "Timezone", "description": "IANA timezone name (e.g., 'America/New_York', 'Europe/London'). Use 'America/New_York' as local timezone if no timezone provided by the user." } }, "required": ["timezone"] } }
'''
}
)return results
3.2工具调用
在会话中选择MCP工具后,发起对话时请求如下:
#对应http://{ip:port}/api/chat/completions
{
"stream": true,
"model": "qwen3:1.7b",
"messages": [
{
"role": "user",
"content": "北京当前时间?"
}
],
"params": {},
"tool_ids": [ #关注这里。MCP工具在这里,实际是mcpo信息 。
"server:0"
],
"tool_servers": [],#本地自有工具在这里
"features": {
"image_generation": false,
"code_interpreter": false,
"web_search": false,
"memory": false
},
"variables": {
"{{USER_NAME}}": "acaluis",
"{{USER_LOCATION}}": "Unknown",
"{{CURRENT_DATETIME}}": "2025-09-02 11:18:47",
"{{CURRENT_DATE}}": "2025-09-02",
"{{CURRENT_TIME}}": "11:18:47",
"{{CURRENT_WEEKDAY}}": "Tuesday",
"{{CURRENT_TIMEZONE}}": "Etc/GMT-8",
"{{USER_LANGUAGE}}": "zh-CN"
},
"model_item": {
"id": "qwen3:1.7b",
"name": "qwen3:1.7b",
"object": "model",
"created": 1756782836,
"owned_by": "ollama",
"ollama": {
"name": "qwen3:1.7b",
"model": "qwen3:1.7b",
"modified_at": "2025-08-20T03:50:50.085066919Z",
"size": 1359293444,
"digest": "8f68893c685c3ddff2aa3fffce2aa60a30bb2da65ca488b61fff134a4d1730e7",
"details": {
"parent_model": "",
"format": "gguf",
"family": "qwen3",
"families": [
"qwen3"
],
"parameter_size": "2.0B",
"quantization_level": "Q4_K_M"
},
"connection_type": "local",
"urls": [
0
]
},
"connection_type": "local",
"tags": [],
"actions": [],
"filters": []
},
"session_id": "QpBpP8qJic_kvTBNAAAP",
"chat_id": "0c8a0fb6-c1cb-42e7-a269-26f5e4a581d6",
"id": "648a0d1f-bded-4146-905b-ead4cee58c9a",
"background_tasks": {
"title_generation": true,
"tags_generation": true,
"follow_up_generation": true
}
}
相关入口代码在process_chat_payload:
该方法中涉及MCP工具的代码很简单,就是调用 get_tools,后继代码前面已经陆续做过分析,不再赘述。
async def process_chat_payload(request, form_data, user, metadata, model):
……
#处理MCP 工具
tool_ids = metadata.get("tool_ids", None) #从表单中获取MCP 工具列表
# 处理自有工具
tool_servers = metadata.get("tool_servers", None)log.debug(f"{tool_ids=}")
log.debug(f"{tool_servers=}")tools_dict = {}
if tool_ids:#根据表单中的tool_ids获取对应的工具,get_tools方法下面分析
tools_dict = get_tools(
request,
tool_ids,
user,
{
**extra_params,
"__model__": models[task_model_id],
"__messages__": form_data["messages"],
"__files__": metadata.get("files", []),
},
)……
get_tools方法代码如下:
本方法从可用的MCP工具列表中获取用户在对话中指定的所有工具的所有方法,返回dict,每个字典项是一个工具中的函数。主要逻辑是一个嵌套循环,外循环为mcpo列表,内循环为一个工具的所有方法列表。
def get_tools(
request: Request, tool_ids: list[str], user: UserModel, extra_params: dict
) -> dict[str, dict]:
tools_dict = {}for tool_id in tool_ids:#遍历请求中mcpo
tool = Tools.get_tool_by_id(tool_id)#MCP工具未做持久化存储
if tool is None:
if tool_id.startswith("server:"):#看请求数据,使用MCP工具时走这个分支
server_idx = int(tool_id.split(":")[1])
tool_server_connection = (#根据mcpo索引定位mcop服务
request.app.state.config.TOOL_SERVER_CONNECTIONS[server_idx]
)
tool_server_data = None#获取指定mcpo承载的工具信息
for server in request.app.state.TOOL_SERVERS:
if server["idx"] == server_idx:
tool_server_data = server
break
assert tool_server_data is not None
specs = tool_server_data.get("specs", [])for spec in specs:#遍历工具中的所有接口
function_name = spec["name"] #设置函数名# 以下是认证相关处理,根据认证类型设置令牌
auth_type = tool_server_connection.get("auth_type", "bearer")
token = Noneif auth_type == "bearer":
token = tool_server_connection.get("key", "")
elif auth_type == "session":
token = request.state.token.credentials#该方法返回一个异步函数,该异步函数执行最终的函数调用
def make_tool_function(function_name, token, tool_server_data):
async def tool_function(**kwargs):
return await execute_tool_server(
token=token,
url=tool_server_data["url"],
name=function_name,
params=kwargs,
server_data=tool_server_data,
)return tool_function
#生成工具函数
tool_function = make_tool_function(
function_name, token, tool_server_data
)#工具函数异步化
callable = get_async_tool_function_and_apply_extra_params(
tool_function,
{},
)#生成工具函数字典
tool_dict = {
"tool_id": tool_id,
"callable": callable,
"spec": spec,
}# TODO: if collision, prepend toolkit name
if function_name in tools_dict:
log.warning(
f"Tool {function_name} already exists in another tools!"
)
log.warning(f"Discarding {tool_id}.{function_name}")
else:
tools_dict[function_name] = tool_dict#把工具函数字典增加到tools_dict中
else:
continue
else:#目前尚不清楚何时会走本分支,暂不分析
module = request.app.state.TOOLS.get(tool_id, None)
if module is None:
module, _ = load_tool_module_by_id(tool_id)
request.app.state.TOOLS[tool_id] = moduleextra_params["__id__"] = tool_id
# Set valves for the tool
if hasattr(module, "valves") and hasattr(module, "Valves"):
valves = Tools.get_tool_valves_by_id(tool_id) or {}
module.valves = module.Valves(**valves)
if hasattr(module, "UserValves"):
extra_params["__user__"]["valves"] = module.UserValves( # type: ignore
**Tools.get_user_valves_by_id_and_user_id(tool_id, user.id)
)for spec in tool.specs:
# TODO: Fix hack for OpenAI API
# Some times breaks OpenAI but others don't. Leaving the comment
for val in spec.get("parameters", {}).get("properties", {}).values():
if val.get("type") == "str":
val["type"] = "string"# Remove internal reserved parameters (e.g. __id__, __user__)
spec["parameters"]["properties"] = {
key: val
for key, val in spec["parameters"]["properties"].items()
if not key.startswith("__")
}# convert to function that takes only model params and inserts custom params
function_name = spec["name"]
tool_function = getattr(module, function_name)
callable = get_async_tool_function_and_apply_extra_params(
tool_function, extra_params
)# TODO: Support Pydantic models as parameters
if callable.__doc__ and callable.__doc__.strip() != "":
s = re.split(":(param|return)", callable.__doc__, 1)
spec["description"] = s[0]
else:
spec["description"] = function_nametool_dict = {
"tool_id": tool_id,
"callable": callable,
"spec": spec,
# Misc info
"metadata": {
"file_handler": hasattr(module, "file_handler")
and module.file_handler,
"citation": hasattr(module, "citation") and module.citation,
},
}# TODO: if collision, prepend toolkit name
if function_name in tools_dict:
log.warning(
f"Tool {function_name} already exists in another tools!"
)
log.warning(f"Discarding {tool_id}.{function_name}")
else:
tools_dict[function_name] = tool_dictreturn tools_dict
4.扩展
open webui源码分析5-Tools_vivetool webui-CSDN博客中已经使用了Tools,MCP Tool本质是也是工具。二者的区别在于Tools是系统自带的工具,数据在数据库中持久化存储,与MCP无关;MCP Tool则通过MCP协议接入到open webui,数据未做持久化存储,仅保存在全局变量request.app.state.config中。
更多推荐
所有评论(0)