1.REST2MCP

本文不是使用教程,而是从 源码实现角度 分析该项目如何完成「REST API 到 MCP 工具」的适配(REST2MCP)。


一、总体架构概览

mcp-context-forge 的核心目标之一是让外部 REST API 可以作为 MCP 工具(Tool) 被智能体调用。 整个 REST→MCP 适配链路分为五层:

层级 文件位置 主要职责
入口层 mcpgateway/main.py FastAPI 应用入口;接收来自 MCP 客户端的 JSON-RPC / HTTP / WS / SSE 调用请求
服务层(核心) mcpgateway/services/tool_service.py 工具执行调度;识别工具类型(REST/MCP/其他),构造 HTTP 请求,执行并封装为 MCP 响应
模型层 mcpgateway/schemas.py 定义工具的 Pydantic 数据结构,包括 REST Passthrough 字段(base_url、path_template、query_mapping、header_mapping 等)
持久化层 mcpgateway/db.py SQLAlchemy ORM 存储工具配置与调用统计
管理层 mcpgateway/admin.py 工具 CRUD、REST 字段编辑、调用指标与 Top Tools 统计
传输层(桥接) mcpgateway/transports/* 提供 SSE / Streamable HTTP / Stdio 等多传输桥接,确保不同客户端都能访问相同工具

二、REST → MCP 调用全链路

目标 REST APISSE/Streamable 桥transports/*httpx.AsyncClientToolServicemcpgateway/services/tool_service.pyFastAPI 路由mcpgateway/main.pyMCP 客户端目标 REST APISSE/Streamable 桥transports/*httpx.AsyncClientToolServicemcpgateway/services/tool_service.pyFastAPI 路由mcpgateway/main.pyMCP 客户端读取工具配置(schemas/db)判定类型=REST(passthrough)alt[非流式][流式]tools/call(JSON-RPC / HTTP / WS / SSE)1invoke_tool(name, arguments, ctx)2_build_rest_request()3_apply_pre_plugins()4request(...)5Response6sse_client()7on_event(data)8_apply_post_plugins()9to_mcp_tool_result()10MCP Tool Result11


四、总结

入口是 ToolService.invoke_tool(self, db, name, arguments, request_headers, app_user_email)。它先按名字把工具 ORM 取出来,做启用/可达性校验,然后根据 integration_type 分派到不同执行路径:

  • integration_type == "REST" → 走 REST 适配器(我们关心的部分)

  • integration_type == "MCP" → 用 SSE / Streamable HTTP 去调用上游 MCP 服务器 (A2A 还有一支,但与 REST→MCP 无关)


REST 分支:逐步展开到底做了什么

1) 组装请求头(鉴权 + 白名单透传 + 插件预处理)

  • 起点:headers = tool.headers or {}

  • 如果工具 auth_type == "oauth":通过 OAuthManager 先取 access_token,加 Authorization: Bearer ...。否则,调用 decode_auth(tool.auth_value) 解出固定鉴权头(Basic/Bearer/自定义 Auth-Headers),并过滤空 key/value后并入 headers(避免非法头名问题)。

  • 如果调用方带了 request_headers(来自网关入口),再经过 get_passthrough_headers(request_headers, headers, db) 选择性透传(白名单/策略在这个工具函数里做;不把敏感头无脑下传)。

  • 若启用了插件系统(PluginManager):

    • 把工具元数据塞进 GlobalContextTOOL_METADATA);

    • 执行 tool_pre_invoke:插件有机会改写名称、参数、甚至 headers;若返回了 modified_payload 就用修改后的值继续后面流程。 以上逻辑都在 invoke_tool 的 REST 分支中实现。tool_service

这一步的本质:把“平台/工具配置的固定认证头” + “调用端允许透传的头” + “插件可能注入/修改的头” 合成“受控请求头”,既能携带鉴权/追踪,又不让危险头穿透。


2) URL 模板替换 + 查询串合并 + 请求方法选择

  • 路径参数替换: 若 tool.url 里包含花括号参数(如 .../users/{id}),代码用 re.findall(r"\{(\w+)\}", tool.url) 抓出 {id},然后arguments 里取同名 key替换掉,且把已用掉的参数从 payload 里 pop 掉;缺参数就直接抛 ToolInvocationError

  • 查询串合并: 如果 tool.url 自身带了查询串(?a=1&b=2),会用 urlparse + parse_qs 取出来;随后把剩余 payload 合并入 query(注意 parse_qs 得到的是 Dict[str, List[str]],它这里取了第一个值)。最后 final_url 只保留 scheme://netloc/path,真正的 query 在 params 里。

  • HTTP 方法与载荷位置: 由 tool.request_type.upper() 决定:

    • GET → 用 params=payload(把 payload 都进查询字符串);

    • 其他方法(POST/PUT/PATCH/DELETE…)→ json=payload(作为 JSON body)。 这些细节都写在 REST 分支里“Build the payload / Handle URL path parameter substitution / Extract query params / Merge leftover payload + query params / Use the tool’s request_type”这一串代码段中。tool_service

这一步的本质:把 MCP 的 arguments 用“路径模板替换 + 查询/Body 映射”的规则,翻译成“完整可发的 HTTP 请求(URL + params/json)”。


3) 发请求:用 ResilientHttpClient 统一重试/超时/SSL 行为

ToolService 在构造时就创建了 _http_client = ResilientHttpClient(client_args={"timeout": settings.federation_timeout, "verify": not settings.skip_ssl_verify})。 REST 分支里真正发包用的就是这个 _http_client

  • GET 分支:await self._http_client.get(final_url, params=payload, headers=headers)

  • 其他方法:await self._http_client.request(method, final_url, json=payload, headers=headers) 紧接着 response.raise_for_status()——失败状态码直接抛异常,后面由统一异常处理路径转 ToolInvocationError 并打点。tool_service

这一步的本质:用网关内的“弹性 HTTP 客户端”把外部 REST 的“不稳定”隔离在边界上(重试/超时/证书校验开关都在这层统一起来)。


4) 响应→MCP ToolResult:内容过滤、状态映射与“204 支持”

  • 204:若 status_code == 204,直接返回文本 "Request completed successfully (No Content)"ToolResult

  • 非 2xx(但没在 raise_for_status 阶段抛):读取 JSON,若有 error 字段用其消息,否则返回通用 "Tool error encountered",并 is_error=True

  • 2xx 正常:取 response.json(),然后调用 extract_using_jq(result, tool.jsonpath_filter)响应过滤/重塑(jq 风格表达式);最终把过滤后的 JSON 序列化为字符串放进 TextContent 返回。 这段逻辑确保了调用侧最终总拿到一个符合 MCP 协议的 ToolResult(文本/结构化),而不是原始 HTTP 响应。tool_service

这一步的本质:把“HTTP 响应”翻译成“MCP 工具返回”的统一语义,并可选用 jq 做“后置抽取/裁剪”。


5) 可观测性与指标:create_span + _record_tool_metric(...)

  • 整个 invoke_toolcreate_span("tool.invoke", {...}) 包裹,span 里打了 tool id/name、integration_type、gateway_id、参数个数、是否有 headers 等属性;异常时设置 error 属性;最终写入 success 和耗时(ms)。

  • 最后无论成功/失败都会 await self._record_tool_metric(db, tool, start_time, success, error_message) 写数据库指标(时延、成功标记、错误消息)。 这些让 Admin/监控能做 Top Tools、成功率、P90/P99 等。tool_service


6) 安全侧重点:受控头透传 + OAuth/Bearer/Basic 支持 + 插件审计

  • 认证:支持 oauth(client credentials 或上游 gateway 的 auth code 场景在 MCP 分支里演示)、bearerbasic、以及“自定义认证头”。

  • 透传:必须通过 get_passthrough_headers(...),由数据库/策略判定哪些头能过,避免把敏感的调用端头(如 Cookie、Host、X-Internal-*)传到外部 REST。

  • 插件:tool_pre_invoke / tool_post_invoke 可以做鉴权、PII 过滤、审计记录、字段规范化等;若违反规则可抛 PluginViolationError 阻断调用。 这些均体现在同一 REST 分支的头部组合、插件钩子与异常类型里。tool_service


跟“传统 REST 代理”相比,这里多做了哪些“适配”工作?

  1. 路径参数模板:不是简单拼 query,而是严格要求把 {param}arguments 填上;缺失就 fail-fast(错误更早暴露)。tool_service

  2. URL 原生查询串 + arguments 合并:兼容“工具 URL 就自带 query”的情况,避免双写或覆盖。tool_service

  3. 方法→载荷的策略GET 强制把 payload 进 params,其他方法才进 json,避免误用。tool_service

  4. jq 抽取:下游返回很大时可按 jsonpath_filter 裁剪出需要的段,返回给 MCP 客户端更“干净”。tool_service

  5. 插件化治理:在“发请求前/返回后”都有钩子点,便于企业侧做审计/风控/内容治理。tool_service

  6. 网关级弹性 HTTP 客户端:请求统一走 ResilientHttpClient,便于集中做重试/超时/证书策略。tool_service


一句话收束

在这份实现里,“REST→MCP 适配”的本质就是:把 MCP 的 arguments 通过“路径模板替换 + 查询/Body 映射 + 受控头合并 + 弹性 HTTP 请求”,执行外部 REST,并把响应(可经 jq 过滤)包装成 MCP 的 ToolResult;同时插件钩子与度量/追踪把安全与可观测性嵌在调用前后两端。整个链路都在 ToolService.invoke_tool(..., integration_type == "REST") 的这一段里闭环完成。tool_service

2.MCP2Tools原理

一、原理全貌(握手 → 枚举工具 → 映射/落库 → 可调用)

  1. 建立到远端 MCP 服务器的传输连接

    • 远端如果跑的是 SSE,用 mcp.client.sse.sse_client(url, headers)

    • 如果是 Streamable HTTP,用 mcp.client.streamable_http.streamablehttp_client(url, headers)

    • 二者都通过 ClientSession(read_stream, write_stream) 构建 MCP 会话。

  2. 初始化 MCP 会话

    • await session.initialize():做 MCP 规范规定的握手(能力交换/协议版本/会话 ID)。

  3. 枚举远端工具

    • 用会话 API 拉取远端的工具清单(通常是 session.list_tools();不同客户端库名字可能略有差异,但都会有工具枚举能力)。

  4. 把每个远端工具映射成网关本地的 Tool 记录

    • 填充:原始名 original_name、显示名/别名、integration_type="MCP"request_type(= 传输类型 ssestreamablehttp)、gateway_id(指向这台远端 MCP 服务器的网关记录)、工具的 JSON Schema/参数定义、鉴权/标签/可见性/团队归属等;

    • 使用你文件里现成的 ToolService.register_tool(...)统一落库,并利用它的重复名冲突处理、可见性策略、指标初始化等。

  5. 后续调用

    • 当用户通过 MCP 网关调用该工具时,ToolService.invoke_tool(...) 会根据 integration_type == "MCP" 这一路,再次动态连接到对应的远端 MCP 服务器,并且:

      • await session.initialize()

      • await session.call_tool(tool.original_name, arguments) —— 这段你文件里已经写好了(见下文代码定位)。

这意味着:自动注册只做“同步目录 + 入库”;真正“使用工具”走的仍是 invoke_tool 里的 MCP 分支,按存档的 gateway_idrequest_type(SSE/Streamable)等信息动态发起远端调用。


二、 tool_service.py(关键位置)

  • 远端 MCP 连接与调用

    from mcp import ClientSession
    from mcp.client.sse import sse_client
    from mcp.client.streamable_http import streamablehttp_client

    invoke_toolMCP 分支里,能看到两段典型代码:

    • SSE 版本(片段定位到 async with sse_client(url=server_url, headers=headers) as streamsClientSession(*streams)await session.initialize()await session.call_tool(tool.original_name, arguments))。

    • Streamable HTTP 版本async with streamablehttp_client(url=server_url, headers=headers) as (read_stream, write_stream, ...)ClientSession(read_stream, write_stream)await session.initialize()await session.call_tool(...))。

    这说明调用端已经打通:你只要把“工具目录”预先入库,调用时会按 gateway_id + request_type 自动走对的路径。

  • 工具落库 APIToolService.register_tool(...):支持 federation_sourceimport_batch_idteam_idowner_emailvisibility 等元数据,用于外部导入/联邦来源标记与冲突处理(同名的 public/team 工具冲突抛 ToolNameConflictError)。这正好给“自动注册”用。

  • MCP 调用时的 OAuth / 头部透传 / 插件钩子(你已经有)invoke_tool MCP 分支里已经对 Gateway 级 OAuth白名单 Header 透传Plugin Pre/Post Invoke 做了处理。自动注册只需把工具关联到对应 gateway_id,调用时天然生效。


四、调用路径如何“串起来”(把发现到使用串成一条链)

  1. Admin / API 配置远端网关(DbGateway):写入 endpoint_urltransport(sse/streamablehttp)、auth_type 等。

  2. 一次性/定期调用 sync_gateway_tools(db, gateway)

    • 拉目录 → 映射为 integration_type="MCP" 的本地 Tool → 调 register_tool 入库。

  3. 用户侧调用

    • ToolService.invoke_tool(...) 读到这个 Tool(integration_type="MCP"gateway_idrequest_type),走 MCP 分支:

      • SSEsse_client(...)ClientSession(*streams)initialize()call_tool(name, args)

      • Streamablestreamablehttp_client(...)ClientSession(read, write)initialize()call_tool(...)


一句话总结

“自动读取并注册远端 MCP 工具”*的本质是:**用和“调用远端工具”相同的会话**(ClientSession on SSE/Streamable),在 initialize() 之后调用*工具枚举 APIlist_tools()),再把每个工具映射成你本地的 ToolCreate 并通过 register_tool 落库。注册后,使用时仍走你 invoke_toolMCP 分支(你已实现),按 gateway_id 与传输类型动态连接远端 call_tool(...)

3. “把 A2A Agent 变成一个 Tool”——注册阶段

入口是 create_tool_from_a2a_agent(...)。它做了三件事:

  1. 先检查是否已经为该 Agent 生成过对应的 Tool(用 original_name = a2a_{agent.slug} 唯一化)。若已存在就直接返回旧 Tool;否则继续创建。tool_service

  2. 按 Agent 元数据拼出一份 ToolCreate

    • integration_type="A2A"(关键标识,后续 invoke_tool 会据此走 A2A 分支)

    • request_type="POST"(缺省通过 HTTP POST 调用 A2A Agent)

    • url=agent.endpoint_url(调用目标)

    • input_schema 提供一个最小的 {query: string}(方便 Admin/UI 直接试调)

    • annotations 写入 a2a_agent_ida2a_agent_type(这两个是后续定位 Agent 的关键“桥”)

    • 以及继承 auth_type/auth_valuetags 等字段 最后把这份 ToolCreate 交给已有的 register_tool(...) 落库(统一处理冲突/可见性/事件通知/指标初始化)。tool_service

  3. 返回标准化的 ToolRead。tool_service

要点:A2A Tool 本质是 A2A Agent 的“本地代理条目”。它把 Agent 的 endpoint、鉴权、类型、ID 放进 Tool 的字段与 annotations 中,保证调用时能从 Tool 反查到 Agent。


2) “调用 A2A Tool 时怎么转给 A2A Agent”——执行阶段

在统一的 invoke_tool(...) 顶层路由里,先根据 integration_type 分派。当读到某工具是 A2A 且带有 a2a_agent_id 注解时,直接走 A2A 专用分支 _invoke_a2a_tool(...):tool_service

2.1 _invoke_a2a_tool(...) 做了什么?

  • 从 Tool 的 annotations["a2a_agent_id"] 反查 DbA2AAgent;若不存在/禁用则报错。tool_service

  • 调用私有方法 _call_a2a_agent(agent, arguments) 发起真正的 HTTP 请求,并把结果转换为 MCP 的 ToolResult(如果返回是 dict 且含 response 字段,就取 response;否则字符串化)。tool_service

2.2 _call_a2a_agent(...) 的“协议适配”

  • 输入规范化(兼容 Admin UI 的“扁平 query 字段”): 如果 arguments 里有 query(字符串),就自动包成 JSON-RPC 的消息结构

    {"message":{"messageId":"admin-test-...","role":"user","parts":[{"type":"text","text":"..."}]}}

    同时默认 method="message/send";否则就回退为透传 params。tool_service

  • “请求格式”选择

    • agent.agent_type in ["generic","jsonrpc"]endpoint_url/ 结尾 → 走 JSON-RPC 2.0{"jsonrpc":"2.0","method":..., "params":..., "id":1}

    • 否则 → 走 自定义 A2A 格式{"interaction_type": "...","parameters": params,"protocol_version": agent.protocol_version}

  • HTTP 调用与鉴权: 用 httpx.AsyncClientPOSTagent.endpoint_url,并按 auth_typeAuthorization: Bearer ...(支持 api_key / bearer)。状态 200 则返回 json(),否则抛错。tool_service

要点:A2A 分支把“本地 Tool 调用”翻译为“对 Agent endpoint 的 HTTP(或 JSON-RPC) 调用”,并且内置了一个输入整形器(把 {query: "..."} 转为更丰富的 message 结构)以及两种协议打包逻辑(标准 JSON-RPC vs. 自定义 A2A)。


3) 调用路径总览(从入口到结果)

  1. invoke_tool(...) 找到目标 Tool,断言可达;若 integration_type=="A2A" 且 annotations 有 a2a_agent_id_invoke_a2a_tool。tool_service

  2. _invoke_a2a_tool(...)a2a_agent_id 查到 DbA2AAgent;然后调用 _call_a2a_agent(...)。tool_service

  3. _call_a2a_agent(...) 把 UI/客户端传来的 arguments 整形成目标协议(JSON-RPC 或自定义 A2A),加上鉴权头,httpx.post() 发给 agent.endpoint_url,收回响应。tool_service

  4. _invoke_a2a_tool(...) 把响应包成 ToolResult(content=[TextContent(...)] , is_error=...) 返回;顶层 invoke_tool(...) 负责统一打点记录指标(成功/失败/时延)。


4) “A2A → Tool”的元数据桥接点(为什么能打通)

  • Tool.annotations:写入 a2a_agent_ida2a_agent_type;调用时据此反查 Agent。tool_service

  • Tool.url/request_type:保存 Agent 的 endpoint 与调用方法(POST),供 HTTP 客户端使用。tool_service

  • Tool.auth_type/auth_value:继承 Agent 的鉴权方式,调用时组装 Authorization。tool_service

  • ToolCreate/register_tool(...):统一入库、冲突处理、事件通知。tool_service


5) 和 REST/MCP 两类工具的差异

  • REST 工具:用 ResilientHttpClient 拼 URL 模板/Query/Header 白名单透传,直接打到外部 REST;响应可用 jq 过滤。tool_service

  • MCP 工具:走 sse_client/streamablehttp_client + ClientSession.initialize()session.call_tool(...);可带网关 OAuth、请求头透传。tool_service

  • A2A 工具不经 MCP 协议,而是直接 HTTP/JSON-RPC 到 Agent endpoint;有自带的输入整形(把 {query} 转 message)和两种请求打包。tool_service


一句话总结

A2A2tools 的实现思路是:先把 A2A Agent“映射/注册”为一个本地 Tool(integration_type=A2A,annotations 记录 agent_id);当调用这个 Tool 时,按 annotations 反查 Agent → 把入参整形成 JSON-RPC 或自定义 A2A 格式 → 带鉴权用 HTTP POST 调 Agent endpoint → 把结果包回 MCP 的 ToolResult,并复用统一的指标/追踪体系。对应代码分别在 create_tool_from_a2a_agent(...)invoke_tool(...)(A2A 分支)、_invoke_a2a_tool(...)_call_a2a_agent(...)

一句话总览

  • “组合成 MCP Server” = 定义一个 Server 实体 + 绑定多条 Tool

  • ServerService 负责:注册/更新 Server、验证并关联工具、提供按 Server 聚合后的工具清单与指标、以及事件通知

  • MCP 协议面的 tools/list 就是:读这个 Server 的工具关联,转换成工具描述返回tools/call 则:把调用路由给 ToolService.invoke_tool(这个在你的 tool_service.py 里),这里的 ServerService 主要提供“这个 Server 下应该有哪些工具可见”的编排数据。server_service


其中核心,就是

如果注册进来的是REST请求,就使用http客户端发起请求,然后将返回转化为MCP工具可以识别的返回

如果是MCP格式(即从远端的sse mcp服务传递过来的),则使用mcp客户端发起请求,

a2a走的是_invoke_a2a_tool接口,将输入转化为agent可以接受的格式

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐