open webui具有与MCP的集成能力,从而使open webui有更强大的外部工具调用能力,本文仍以一个简单的 time外部工具,对于在open webui中如何通过MCP集成外部服务进行串联讲解,并对系统中的相关代码进行分析,让你打开open webui的MCP能力,为使用agent奠定基础。

1.架构分析

        open webui与MCP集成式通过一个名为mcpo(MCP-to-OpenAPI proxy server)的组件实现的,mcpo作为一个服务来运行,该服务可承载各种MCP工具(架构与Pipelines类似,只不过Pipelines服务承载的是流水线,mcpo承载的是MCP 工具),mcpo架构如下图所示:

        非容器化部署时,一个mcpo作为服务可以承载多个MCP tool;容器化部署时,一般一个mcpo承载一个MCP tool。本文以非容器化部署为例。

2.与MCP集成

2.1运行环境准备

        运行MCP工具之前,需要先安装uv和mcpo

       #pip  install uv mcpo

2.2运行MCP工具

       以time MCP工具为例。查看https://github.com/modelcontextprotocol/servers/blob/main/src/time/README.md文件,

从这里看到time工具的名字为mcp_server_time,加载MCP tool:

        #uvx mcpo --port 8000 -- uvx mcp-server-time

        加载成功后,访问http://{ip:8000}/docs,可查看该工具的API说明:

2.3配置MCP工具

        在open webui中增加MCP工具,通过【管理员面板】->【设置】->【工具】进入MCP Tool管理页面,点击【+】,填写刚刚启动的mcpo地址,并保存:

2.4使用MCP工具

        在对话页面,点击【+】,可看到新增加MCP tool列表:

        启用mcp time tool,就可以在对话中使用该工具中

        提问时间相关问题,可见调用了该MCP工具:

3.MCP相关源码

3.1增加MCP工具

        增加MCP工具时,请求数据如下:

#对应 http://192.168.21.201:8080/api/v1/configs/tool_servers—POST

{
  "TOOL_SERVER_CONNECTIONS": [
    {
      "url": "http://192.168.21.201:8000",
      "path": "openapi.json",
      "auth_type": "bearer",
      "key": "",
      "config": {
        "enable": true,
        "access_control": null
      },
      "info": {
        "name": "mcp time tool",
        "description": "本工具可以获取当前时间以及不同时区时间转换"
      }
    }
  ]
}

        对应方法为config.py文件中set_tool_servers_config,代码如下:

该方法很简单,主要是调用get_tool_servers_data方法获取所有mcpo服务中工具并赋值到全局变量中,MCP工具并未持久化存储。

@router.post("/tool_servers", response_model=ToolServersConfigForm)
async def set_tool_servers_config(
    request: Request,
    form_data: ToolServersConfigForm,
    user=Depends(get_admin_user),
):

    '''

        把MCPO连接信息,也就是请求中的TOOL_SERVER_CONNECTIONS保存到全局变量

        request.app.state.config.TOOL_SERVER_CONNECTIONS

    '''

    #遍历mcpo服务,获取所有可用的MCP工具规格信息并赋值给全局变量
    request.app.state.config.TOOL_SERVER_CONNECTIONS = [
        connection.model_dump() for connection in form_data.TOOL_SERVER_CONNECTIONS
    ]

    '''

        所有可用MCP工具规格信息保存在全局变量request.app.state.TOOL_SERVERS中,每个工具规格信息内容如下:

            {
                "idx": idx,
                "url": server.get("url"),
                "openapi": openapi_data,
                "info": response.get("info"),
                "specs": response.get("specs"), #工具函数标准定义
            }

    '''

    request.app.state.TOOL_SERVERS = await get_tool_servers_data(
        request.app.state.config.TOOL_SERVER_CONNECTIONS
    )

    return {#返回MCOP连接信息
        "TOOL_SERVER_CONNECTIONS": request.app.state.config.TOOL_SERVER_CONNECTIONS,
    }

        下面重点对get_tool_servers_data方法源码进行分析:

本方法办理所有被启用的mcpo服务,mcpo返回工具规格信息。

async def get_tool_servers_data(
    servers: List[Dict[str, Any]], session_token: Optional[str] = None
) -> List[Dict[str, Any]]:
    # Prepare list of enabled servers along with their original index
    server_entries = []
    for idx, server in enumerate(servers): #遍历所有mcpo服务列表,归入server_entries
        if server.get("config", {}).get("enable"):#仅针对被启用的mcpo服务进行处理
            #openai spec路径可以是全路径,也可以是相对路径
            openapi_path = server.get("path", "openapi.json")
            if "://" in openapi_path: //全路径
                full_url = openapi_path
            else:
                if not openapi_path.startswith("/"):#相对路径,需要拼接前面的{url}
                   openapi_path = f"/{openapi_path}"

                full_url = f"{server.get('url')}{openapi_path}"

            info = server.get("info", {})

            '''

                获取认证信息,认证类型可以是bearer,也可以是session。如果是bearer,则使用

               表单中的{key},否则使用调用参数传入的session_key

            '''

            auth_type = server.get("auth_type", "bearer")
            token = None

            if auth_type == "bearer":
                token = server.get("key", "")
            elif auth_type == "session":
                token = session_token
            server_entries.append((idx, server, full_url, info, token))

    # 创建异步任务,从所有的mcpo服务列表获取MCP工具
    tasks = [
        get_tool_server_data(token, url) for (_, _, url, _, token) in server_entries
    ]

    #异步执行并汇总所有异步任务执行结果
    responses = await asyncio.gather(*tasks, return_exceptions=True)

    # Build final results with index and server metadata
    results = []
    for (idx, server, url, info, _), response in zip(server_entries, responses):
        if isinstance(response, Exception):#防错处理,对出错的mcpo不予处理
            log.error(f"Failed to connect to {url} OpenAPI tool server")
            continue

        '''

           openapi_data为open-webui调用mcpo获取工具返回的数据,示例如下:

           

{
  "openapi": "3.1.0",
  "info": {
    "title": "mcp-time",
    "description": "mcp-time MCP Server",
    "version": "1.13.1"
  },
  "paths": {
    "/get_current_time": {
      "post": {
        "summary": "Get Current Time",
        "description": "Get current time in a specific timezones",
        "operationId": "tool_get_current_time_post",
        "requestBody": {
          "content": {
            "application/json": {
              "schema": {
                "$ref": "#/components/schemas/get_current_time_form_model"
              }
            }
          },
          "required": true
        },
        "responses": {
          "200": {
            "description": "Successful Response",
            "content": {
              "application/json": {
                "schema": {
                  "title": "Response Tool Get Current Time Post"
                }
              }
            }
          },
          "422": {
            "description": "Validation Error",
            "content": {
              "application/json": {
                "schema": {
                  "$ref": "#/components/schemas/HTTPValidationError"
                }
              }
            }
          }
        }
      }
    },
    "/convert_time": {
      "post": {
        "summary": "Convert Time",
        "description": "Convert time between timezones",
        "operationId": "tool_convert_time_post",
        "requestBody": {
          "content": {
            "application/json": {
              "schema": {
                "$ref": "#/components/schemas/convert_time_form_model"
              }
            }
          },
          "required": true
        },
        "responses": {
          "200": {
            "description": "Successful Response",
            "content": {
              "application/json": {
                "schema": {
                  "title": "Response Tool Convert Time Post"
                }
              }
            }
          },
          "422": {
            "description": "Validation Error",
            "content": {
              "application/json": {
                "schema": {
                  "$ref": "#/components/schemas/HTTPValidationError"
                }
              }
            }
          }
        }
      }
    }
  },
  "components": {
    "schemas": {
      "HTTPValidationError": {
        "properties": {
          "detail": {
            "items": {
              "$ref": "#/components/schemas/ValidationError"
            },
            "type": "array",
            "title": "Detail"
          }
        },
        "type": "object",
        "title": "HTTPValidationError"
      },
      "ValidationError": {
        "properties": {
          "loc": {
            "items": {
              "anyOf": [
                {
                  "type": "string"
                },
                {
                  "type": "integer"
                }
              ]
            },
            "type": "array",
            "title": "Location"
          },
          "msg": {
            "type": "string",
            "title": "Message"
          },
          "type": {
            "type": "string",
            "title": "Error Type"
          }
        },
        "type": "object",
        "required": [
          "loc",
          "msg",
          "type"
        ],
        "title": "ValidationError"
      },
      "convert_time_form_model": {
        "properties": {
          "source_timezone": {
            "type": "string",
            "title": "Source Timezone",
            "description": "Source IANA timezone name (e.g., 'America/New_York', 'Europe/London'). Use 'America/New_York' as local timezone if no source timezone provided by the user."
          },
          "time": {
            "type": "string",
            "title": "Time",
            "description": "Time to convert in 24-hour format (HH:MM)"
          },
          "target_timezone": {
            "type": "string",
            "title": "Target Timezone",
            "description": "Target IANA timezone name (e.g., 'Asia/Tokyo', 'America/San_Francisco'). Use 'America/New_York' as local timezone if no target timezone provided by the user."
          }
        },
        "type": "object",
        "required": [
          "source_timezone",
          "time",
          "target_timezone"
        ],
        "title": "convert_time_form_model"
      },
      "get_current_time_form_model": {
        "properties": {
          "timezone": {
            "type": "string",
            "title": "Timezone",
            "description": "IANA timezone name (e.g., 'America/New_York', 'Europe/London'). Use 'America/New_York' as local timezone if no timezone provided by the user."
          }
        },
        "type": "object",
        "required": [
          "timezone"
        ],
        "title": "get_current_time_form_model"
      }
    }
  }
}

        '''

        openapi_data = response.get("openapi", {})

        if info and isinstance(openapi_data, dict):#用请求中的name和description赋值
            if "name" in info:
                openapi_data["info"]["title"] = info.get("name", "Tool Server")

            if "description" in info:
                openapi_data["info"]["description"] = info.get("description", "")

        results.append(
            {#工具规格信息
                "idx": idx, #对应工具索引
                "url": server.get("url"), #工具远程地址
                "openapi": openapi_data, #调用工具返回的工具信息
                "info": response.get("info"),#工具描述信息
                "specs": response.get("specs"), #工具函数标准定义

                '''

                    一个函数的规格信息如下:

{
  "type": "function",
  "name": "tool_get_current_time_post",
  "description": "Get current time in a specific timezones",
  "parameters": {
    "type": "object",
    "properties": {
      "timezone": {
        "type": "string",
        "title": "Timezone",
        "description": "IANA timezone name (e.g., 'America/New_York', 'Europe/London'). Use 'America/New_York' as local timezone if no timezone provided by the user."
      }
    },
    "required": ["timezone"]
  }
}

                '''
            }
        )

    return results

3.2工具调用

        在会话中选择MCP工具后,发起对话时请求如下:

#对应http://{ip:port}/api/chat/completions 

 {
  "stream": true,
  "model": "qwen3:1.7b",
  "messages": [
    {
      "role": "user",
      "content": "北京当前时间?"
    }
  ],
  "params": {},
  "tool_ids": [ #关注这里。MCP工具在这里,实际是mcpo信息 。
    "server:0"
  ],
  "tool_servers": [],#本地自有工具在这里
  "features": {
    "image_generation": false,
    "code_interpreter": false,
    "web_search": false,
    "memory": false
  },
  "variables": {
    "{{USER_NAME}}": "acaluis",
    "{{USER_LOCATION}}": "Unknown",
    "{{CURRENT_DATETIME}}": "2025-09-02 11:18:47",
    "{{CURRENT_DATE}}": "2025-09-02",
    "{{CURRENT_TIME}}": "11:18:47",
    "{{CURRENT_WEEKDAY}}": "Tuesday",
    "{{CURRENT_TIMEZONE}}": "Etc/GMT-8",
    "{{USER_LANGUAGE}}": "zh-CN"
  },
  "model_item": {
    "id": "qwen3:1.7b",
    "name": "qwen3:1.7b",
    "object": "model",
    "created": 1756782836,
    "owned_by": "ollama",
    "ollama": {
      "name": "qwen3:1.7b",
      "model": "qwen3:1.7b",
      "modified_at": "2025-08-20T03:50:50.085066919Z",
      "size": 1359293444,
      "digest": "8f68893c685c3ddff2aa3fffce2aa60a30bb2da65ca488b61fff134a4d1730e7",
      "details": {
        "parent_model": "",
        "format": "gguf",
        "family": "qwen3",
        "families": [
          "qwen3"
        ],
        "parameter_size": "2.0B",
        "quantization_level": "Q4_K_M"
      },
      "connection_type": "local",
      "urls": [
        0
      ]
    },
    "connection_type": "local",
    "tags": [],
    "actions": [],
    "filters": []
  },
  "session_id": "QpBpP8qJic_kvTBNAAAP",
  "chat_id": "0c8a0fb6-c1cb-42e7-a269-26f5e4a581d6",
  "id": "648a0d1f-bded-4146-905b-ead4cee58c9a",
  "background_tasks": {
    "title_generation": true,
    "tags_generation": true,
    "follow_up_generation": true
  }
}      

        相关入口代码在process_chat_payload:

该方法中涉及MCP工具的代码很简单,就是调用 get_tools,后继代码前面已经陆续做过分析,不再赘述。

async def process_chat_payload(request, form_data, user, metadata, model):

    ……

    #处理MCP 工具

    tool_ids = metadata.get("tool_ids", None) #从表单中获取MCP 工具列表

    # 处理自有工具
    tool_servers = metadata.get("tool_servers", None)

    log.debug(f"{tool_ids=}")
    log.debug(f"{tool_servers=}")

    tools_dict = {}

    if tool_ids:#根据表单中的tool_ids获取对应的工具,get_tools方法下面分析
        tools_dict = get_tools(
            request,
            tool_ids,
            user,
            {
                **extra_params,
                "__model__": models[task_model_id],
                "__messages__": form_data["messages"],
                "__files__": metadata.get("files", []),
            },
        )

    ……    

        get_tools方法代码如下:

本方法从可用的MCP工具列表中获取用户在对话中指定的所有工具的所有方法,返回dict,每个字典项是一个工具中的函数。主要逻辑是一个嵌套循环,外循环为mcpo列表,内循环为一个工具的所有方法列表。

def get_tools(
    request: Request, tool_ids: list[str], user: UserModel, extra_params: dict
) -> dict[str, dict]:
    tools_dict = {}

    for tool_id in tool_ids:#遍历请求中mcpo
        tool = Tools.get_tool_by_id(tool_id)#MCP工具未做持久化存储
        if tool is None:
            if tool_id.startswith("server:"):#看请求数据,使用MCP工具时走这个分支
                server_idx = int(tool_id.split(":")[1])
                tool_server_connection = (#根据mcpo索引定位mcop服务
                    request.app.state.config.TOOL_SERVER_CONNECTIONS[server_idx]
                )
                tool_server_data = None

                #获取指定mcpo承载的工具信息
                for server in request.app.state.TOOL_SERVERS:
                    if server["idx"] == server_idx:
                        tool_server_data = server
                        break
                assert tool_server_data is not None
                specs = tool_server_data.get("specs", [])

                for spec in specs:#遍历工具中的所有接口
                    function_name = spec["name"] #设置函数名

                    # 以下是认证相关处理,根据认证类型设置令牌

                    auth_type = tool_server_connection.get("auth_type", "bearer")
                    token = None

                    if auth_type == "bearer":
                        token = tool_server_connection.get("key", "")
                    elif auth_type == "session":
                        token = request.state.token.credentials

                    #该方法返回一个异步函数,该异步函数执行最终的函数调用

                    def make_tool_function(function_name, token, tool_server_data):
                        async def tool_function(**kwargs):
                            return await execute_tool_server(
                                token=token,
                                url=tool_server_data["url"],
                                name=function_name,
                                params=kwargs,
                                server_data=tool_server_data,
                            )

                        return tool_function

                    #生成工具函数

                    tool_function = make_tool_function(
                        function_name, token, tool_server_data
                    )

                    #工具函数异步化

                    callable = get_async_tool_function_and_apply_extra_params(
                        tool_function,
                        {},
                    )

                    #生成工具函数字典

                    tool_dict = {
                        "tool_id": tool_id,
                        "callable": callable,
                        "spec": spec,
                    }

                    # TODO: if collision, prepend toolkit name
                    if function_name in tools_dict:
                        log.warning(
                            f"Tool {function_name} already exists in another tools!"
                        )
                        log.warning(f"Discarding {tool_id}.{function_name}")
                    else:
                        tools_dict[function_name] = tool_dict#把工具函数字典增加到tools_dict中
            else:
                continue
        else:#目前尚不清楚何时会走本分支,暂不分析
            module = request.app.state.TOOLS.get(tool_id, None)
            if module is None:
                module, _ = load_tool_module_by_id(tool_id)
                request.app.state.TOOLS[tool_id] = module

            extra_params["__id__"] = tool_id

            # Set valves for the tool
            if hasattr(module, "valves") and hasattr(module, "Valves"):
                valves = Tools.get_tool_valves_by_id(tool_id) or {}
                module.valves = module.Valves(**valves)
            if hasattr(module, "UserValves"):
                extra_params["__user__"]["valves"] = module.UserValves(  # type: ignore
                    **Tools.get_user_valves_by_id_and_user_id(tool_id, user.id)
                )

            for spec in tool.specs:
                # TODO: Fix hack for OpenAI API
                # Some times breaks OpenAI but others don't. Leaving the comment
                for val in spec.get("parameters", {}).get("properties", {}).values():
                    if val.get("type") == "str":
                        val["type"] = "string"

                # Remove internal reserved parameters (e.g. __id__, __user__)
                spec["parameters"]["properties"] = {
                    key: val
                    for key, val in spec["parameters"]["properties"].items()
                    if not key.startswith("__")
                }

                # convert to function that takes only model params and inserts custom params
                function_name = spec["name"]
                tool_function = getattr(module, function_name)
                callable = get_async_tool_function_and_apply_extra_params(
                    tool_function, extra_params
                )

                # TODO: Support Pydantic models as parameters
                if callable.__doc__ and callable.__doc__.strip() != "":
                    s = re.split(":(param|return)", callable.__doc__, 1)
                    spec["description"] = s[0]
                else:
                    spec["description"] = function_name

                tool_dict = {
                    "tool_id": tool_id,
                    "callable": callable,
                    "spec": spec,
                    # Misc info
                    "metadata": {
                        "file_handler": hasattr(module, "file_handler")
                        and module.file_handler,
                        "citation": hasattr(module, "citation") and module.citation,
                    },
                }

                # TODO: if collision, prepend toolkit name
                if function_name in tools_dict:
                    log.warning(
                        f"Tool {function_name} already exists in another tools!"
                    )
                    log.warning(f"Discarding {tool_id}.{function_name}")
                else:
                    tools_dict[function_name] = tool_dict

    return tools_dict

4.扩展

        open webui源码分析5-Tools_vivetool webui-CSDN博客中已经使用了Tools,MCP Tool本质是也是工具。二者的区别在于Tools是系统自带的工具,数据在数据库中持久化存储,与MCP无关;MCP Tool则通过MCP协议接入到open webui,数据未做持久化存储,仅保存在全局变量request.app.state.config中。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐