IBM MCP网关的帽子戏法:如何实现的REST2MCP、A2A2MCP、MCP2Tools
入口是。它先按名字把工具 ORM 取出来,做启用/可达性校验,然后根据分派→ 走REST 适配器(我们关心的部分)→ 用 SSE / Streamable HTTP 去调用上游 MCP 服务器 (A2A 还有一支,但与 REST→MCP 无关)“自动读取并注册远端 MCP 工具”*的本质是:**用和“调用远端工具”相同的会话**(on SSE/Streamable),在之后调用*工具枚举 API)
1.REST2MCP
本文不是使用教程,而是从 源码实现角度 分析该项目如何完成「REST API 到 MCP 工具」的适配(REST2MCP)。
一、总体架构概览
mcp-context-forge 的核心目标之一是让外部 REST API 可以作为 MCP 工具(Tool) 被智能体调用。 整个 REST→MCP 适配链路分为五层:
| 层级 | 文件位置 | 主要职责 |
|---|---|---|
| 入口层 | mcpgateway/main.py |
FastAPI 应用入口;接收来自 MCP 客户端的 JSON-RPC / HTTP / WS / SSE 调用请求 |
| 服务层(核心) | mcpgateway/services/tool_service.py |
工具执行调度;识别工具类型(REST/MCP/其他),构造 HTTP 请求,执行并封装为 MCP 响应 |
| 模型层 | mcpgateway/schemas.py |
定义工具的 Pydantic 数据结构,包括 REST Passthrough 字段(base_url、path_template、query_mapping、header_mapping 等) |
| 持久化层 | mcpgateway/db.py |
SQLAlchemy ORM 存储工具配置与调用统计 |
| 管理层 | mcpgateway/admin.py |
工具 CRUD、REST 字段编辑、调用指标与 Top Tools 统计 |
| 传输层(桥接) | mcpgateway/transports/* |
提供 SSE / Streamable HTTP / Stdio 等多传输桥接,确保不同客户端都能访问相同工具 |
二、REST → MCP 调用全链路
目标 REST APISSE/Streamable 桥transports/*httpx.AsyncClientToolServicemcpgateway/services/tool_service.pyFastAPI 路由mcpgateway/main.pyMCP 客户端目标 REST APISSE/Streamable 桥transports/*httpx.AsyncClientToolServicemcpgateway/services/tool_service.pyFastAPI 路由mcpgateway/main.pyMCP 客户端读取工具配置(schemas/db)判定类型=REST(passthrough)alt[非流式][流式]tools/call(JSON-RPC / HTTP / WS / SSE)1invoke_tool(name, arguments, ctx)2_build_rest_request()3_apply_pre_plugins()4request(...)5Response6sse_client()7on_event(data)8_apply_post_plugins()9to_mcp_tool_result()10MCP Tool Result11
四、总结
入口是 ToolService.invoke_tool(self, db, name, arguments, request_headers, app_user_email)。它先按名字把工具 ORM 取出来,做启用/可达性校验,然后根据 integration_type 分派到不同执行路径:
-
integration_type == "REST"→ 走 REST 适配器(我们关心的部分) -
integration_type == "MCP"→ 用 SSE / Streamable HTTP 去调用上游 MCP 服务器 (A2A 还有一支,但与 REST→MCP 无关)
REST 分支:逐步展开到底做了什么
1) 组装请求头(鉴权 + 白名单透传 + 插件预处理)
-
起点:
headers = tool.headers or {}。 -
如果工具
auth_type == "oauth":通过OAuthManager先取 access_token,加Authorization: Bearer ...。否则,调用decode_auth(tool.auth_value)解出固定鉴权头(Basic/Bearer/自定义 Auth-Headers),并过滤空 key/value后并入headers(避免非法头名问题)。 -
如果调用方带了
request_headers(来自网关入口),再经过get_passthrough_headers(request_headers, headers, db)选择性透传(白名单/策略在这个工具函数里做;不把敏感头无脑下传)。 -
若启用了插件系统(
PluginManager):-
把工具元数据塞进
GlobalContext(TOOL_METADATA); -
执行
tool_pre_invoke:插件有机会改写名称、参数、甚至 headers;若返回了modified_payload就用修改后的值继续后面流程。 以上逻辑都在invoke_tool的 REST 分支中实现。tool_service
-
这一步的本质:把“平台/工具配置的固定认证头” + “调用端允许透传的头” + “插件可能注入/修改的头” 合成“受控请求头”,既能携带鉴权/追踪,又不让危险头穿透。
2) URL 模板替换 + 查询串合并 + 请求方法选择
-
路径参数替换: 若
tool.url里包含花括号参数(如.../users/{id}),代码用re.findall(r"\{(\w+)\}", tool.url)抓出{id},然后从arguments里取同名 key替换掉,且把已用掉的参数从 payload 里pop掉;缺参数就直接抛ToolInvocationError。 -
查询串合并: 如果
tool.url自身带了查询串(?a=1&b=2),会用urlparse+parse_qs取出来;随后把剩余 payload 合并入 query(注意parse_qs得到的是Dict[str, List[str]],它这里取了第一个值)。最后final_url只保留scheme://netloc/path,真正的 query 在params里。 -
HTTP 方法与载荷位置: 由
tool.request_type.upper()决定:-
GET→ 用params=payload(把 payload 都进查询字符串); -
其他方法(POST/PUT/PATCH/DELETE…)→
json=payload(作为 JSON body)。 这些细节都写在 REST 分支里“Build the payload / Handle URL path parameter substitution / Extract query params / Merge leftover payload + query params / Use the tool’s request_type”这一串代码段中。tool_service
-
这一步的本质:把 MCP 的
arguments用“路径模板替换 + 查询/Body 映射”的规则,翻译成“完整可发的 HTTP 请求(URL + params/json)”。
3) 发请求:用 ResilientHttpClient 统一重试/超时/SSL 行为
ToolService 在构造时就创建了 _http_client = ResilientHttpClient(client_args={"timeout": settings.federation_timeout, "verify": not settings.skip_ssl_verify})。 REST 分支里真正发包用的就是这个 _http_client:
-
GET分支:await self._http_client.get(final_url, params=payload, headers=headers) -
其他方法:
await self._http_client.request(method, final_url, json=payload, headers=headers)紧接着response.raise_for_status()——失败状态码直接抛异常,后面由统一异常处理路径转ToolInvocationError并打点。tool_service
这一步的本质:用网关内的“弹性 HTTP 客户端”把外部 REST 的“不稳定”隔离在边界上(重试/超时/证书校验开关都在这层统一起来)。
4) 响应→MCP ToolResult:内容过滤、状态映射与“204 支持”
-
204:若
status_code == 204,直接返回文本"Request completed successfully (No Content)"的ToolResult。 -
非 2xx(但没在
raise_for_status阶段抛):读取 JSON,若有error字段用其消息,否则返回通用"Tool error encountered",并is_error=True。 -
2xx 正常:取
response.json(),然后调用extract_using_jq(result, tool.jsonpath_filter)做响应过滤/重塑(jq 风格表达式);最终把过滤后的 JSON 序列化为字符串放进TextContent返回。 这段逻辑确保了调用侧最终总拿到一个符合 MCP 协议的ToolResult(文本/结构化),而不是原始 HTTP 响应。tool_service
这一步的本质:把“HTTP 响应”翻译成“MCP 工具返回”的统一语义,并可选用 jq 做“后置抽取/裁剪”。
5) 可观测性与指标:create_span + _record_tool_metric(...)
-
整个
invoke_tool用create_span("tool.invoke", {...})包裹,span 里打了 tool id/name、integration_type、gateway_id、参数个数、是否有 headers 等属性;异常时设置error属性;最终写入success和耗时(ms)。 -
最后无论成功/失败都会
await self._record_tool_metric(db, tool, start_time, success, error_message)写数据库指标(时延、成功标记、错误消息)。 这些让 Admin/监控能做 Top Tools、成功率、P90/P99 等。tool_service
6) 安全侧重点:受控头透传 + OAuth/Bearer/Basic 支持 + 插件审计
-
认证:支持
oauth(client credentials 或上游 gateway 的 auth code 场景在 MCP 分支里演示)、bearer、basic、以及“自定义认证头”。 -
透传:必须通过
get_passthrough_headers(...),由数据库/策略判定哪些头能过,避免把敏感的调用端头(如 Cookie、Host、X-Internal-*)传到外部 REST。 -
插件:
tool_pre_invoke/tool_post_invoke可以做鉴权、PII 过滤、审计记录、字段规范化等;若违反规则可抛PluginViolationError阻断调用。 这些均体现在同一 REST 分支的头部组合、插件钩子与异常类型里。tool_service
跟“传统 REST 代理”相比,这里多做了哪些“适配”工作?
-
路径参数模板:不是简单拼 query,而是严格要求把
{param}从arguments填上;缺失就 fail-fast(错误更早暴露)。tool_service -
URL 原生查询串 + arguments 合并:兼容“工具 URL 就自带 query”的情况,避免双写或覆盖。tool_service
-
方法→载荷的策略:
GET强制把 payload 进params,其他方法才进json,避免误用。tool_service -
jq 抽取:下游返回很大时可按
jsonpath_filter裁剪出需要的段,返回给 MCP 客户端更“干净”。tool_service -
插件化治理:在“发请求前/返回后”都有钩子点,便于企业侧做审计/风控/内容治理。tool_service
-
网关级弹性 HTTP 客户端:请求统一走
ResilientHttpClient,便于集中做重试/超时/证书策略。tool_service
一句话收束
在这份实现里,“REST→MCP 适配”的本质就是:把 MCP 的
arguments通过“路径模板替换 + 查询/Body 映射 + 受控头合并 + 弹性 HTTP 请求”,执行外部 REST,并把响应(可经 jq 过滤)包装成 MCP 的ToolResult;同时插件钩子与度量/追踪把安全与可观测性嵌在调用前后两端。整个链路都在ToolService.invoke_tool(..., integration_type == "REST")的这一段里闭环完成。tool_service
2.MCP2Tools原理
一、原理全貌(握手 → 枚举工具 → 映射/落库 → 可调用)
-
建立到远端 MCP 服务器的传输连接
-
远端如果跑的是 SSE,用
mcp.client.sse.sse_client(url, headers); -
如果是 Streamable HTTP,用
mcp.client.streamable_http.streamablehttp_client(url, headers); -
二者都通过
ClientSession(read_stream, write_stream)构建 MCP 会话。
-
-
初始化 MCP 会话
-
await session.initialize():做 MCP 规范规定的握手(能力交换/协议版本/会话 ID)。
-
-
枚举远端工具
-
用会话 API 拉取远端的工具清单(通常是
session.list_tools();不同客户端库名字可能略有差异,但都会有工具枚举能力)。
-
-
把每个远端工具映射成网关本地的 Tool 记录
-
填充:原始名
original_name、显示名/别名、integration_type="MCP"、request_type(= 传输类型sse或streamablehttp)、gateway_id(指向这台远端 MCP 服务器的网关记录)、工具的 JSON Schema/参数定义、鉴权/标签/可见性/团队归属等; -
使用你文件里现成的
ToolService.register_tool(...)去统一落库,并利用它的重复名冲突处理、可见性策略、指标初始化等。
-
-
后续调用
-
当用户通过 MCP 网关调用该工具时,
ToolService.invoke_tool(...)会根据integration_type == "MCP"这一路,再次动态连接到对应的远端 MCP 服务器,并且:-
await session.initialize(); -
await session.call_tool(tool.original_name, arguments)—— 这段你文件里已经写好了(见下文代码定位)。
-
-
这意味着:自动注册只做“同步目录 + 入库”;真正“使用工具”走的仍是
invoke_tool里的 MCP 分支,按存档的gateway_id、request_type(SSE/Streamable)等信息动态发起远端调用。
二、 tool_service.py(关键位置)
-
远端 MCP 连接与调用:
from mcp import ClientSession from mcp.client.sse import sse_client from mcp.client.streamable_http import streamablehttp_client
在
invoke_tool的 MCP 分支里,能看到两段典型代码:-
SSE 版本(片段定位到
async with sse_client(url=server_url, headers=headers) as streams→ClientSession(*streams)→await session.initialize()→await session.call_tool(tool.original_name, arguments))。 -
Streamable HTTP 版本(
async with streamablehttp_client(url=server_url, headers=headers) as (read_stream, write_stream, ...)→ClientSession(read_stream, write_stream)→await session.initialize()→await session.call_tool(...))。
这说明调用端已经打通:你只要把“工具目录”预先入库,调用时会按
gateway_id + request_type自动走对的路径。 -
-
工具落库 API:
ToolService.register_tool(...):支持federation_source、import_batch_id、team_id、owner_email、visibility等元数据,用于外部导入/联邦来源标记与冲突处理(同名的 public/team 工具冲突抛ToolNameConflictError)。这正好给“自动注册”用。 -
MCP 调用时的 OAuth / 头部透传 / 插件钩子(你已经有):
invoke_toolMCP 分支里已经对 Gateway 级 OAuth、白名单 Header 透传、Plugin Pre/Post Invoke 做了处理。自动注册只需把工具关联到对应gateway_id,调用时天然生效。
四、调用路径如何“串起来”(把发现到使用串成一条链)
-
Admin / API 配置远端网关(DbGateway):写入
endpoint_url、transport(sse/streamablehttp)、auth_type等。 -
一次性/定期调用
sync_gateway_tools(db, gateway):-
拉目录 → 映射为
integration_type="MCP"的本地 Tool → 调register_tool入库。
-
-
用户侧调用:
-
ToolService.invoke_tool(...)读到这个 Tool(integration_type="MCP"、gateway_id、request_type),走 MCP 分支:-
SSE:
sse_client(...)→ClientSession(*streams)→initialize()→call_tool(name, args); -
Streamable:
streamablehttp_client(...)→ClientSession(read, write)→initialize()→call_tool(...);
-
-
一句话总结
“自动读取并注册远端 MCP 工具”*的本质是:**用和“调用远端工具”相同的会话**(
ClientSessionon SSE/Streamable),在initialize()之后调用*工具枚举 API(list_tools()),再把每个工具映射成你本地的ToolCreate并通过register_tool落库。注册后,使用时仍走你invoke_tool的 MCP 分支(你已实现),按gateway_id与传输类型动态连接远端call_tool(...)。
3. “把 A2A Agent 变成一个 Tool”——注册阶段
入口是 create_tool_from_a2a_agent(...)。它做了三件事:
-
先检查是否已经为该 Agent 生成过对应的 Tool(用
original_name=a2a_{agent.slug}唯一化)。若已存在就直接返回旧 Tool;否则继续创建。tool_service -
按 Agent 元数据拼出一份 ToolCreate:
-
integration_type="A2A"(关键标识,后续invoke_tool会据此走 A2A 分支) -
request_type="POST"(缺省通过 HTTP POST 调用 A2A Agent) -
url=agent.endpoint_url(调用目标) -
input_schema提供一个最小的{query: string}(方便 Admin/UI 直接试调) -
annotations 写入
a2a_agent_id、a2a_agent_type(这两个是后续定位 Agent 的关键“桥”) -
以及继承
auth_type/auth_value、tags等字段 最后把这份ToolCreate交给已有的register_tool(...)落库(统一处理冲突/可见性/事件通知/指标初始化)。tool_service
-
-
返回标准化的
ToolRead。tool_service
要点:A2A Tool 本质是 A2A Agent 的“本地代理条目”。它把 Agent 的 endpoint、鉴权、类型、ID 放进 Tool 的字段与 annotations 中,保证调用时能从 Tool 反查到 Agent。
2) “调用 A2A Tool 时怎么转给 A2A Agent”——执行阶段
在统一的 invoke_tool(...) 顶层路由里,先根据 integration_type 分派。当读到某工具是 A2A 且带有 a2a_agent_id 注解时,直接走 A2A 专用分支 _invoke_a2a_tool(...):tool_service
2.1 _invoke_a2a_tool(...) 做了什么?
-
从 Tool 的
annotations["a2a_agent_id"]反查 DbA2AAgent;若不存在/禁用则报错。tool_service -
调用私有方法
_call_a2a_agent(agent, arguments)发起真正的 HTTP 请求,并把结果转换为 MCP 的ToolResult(如果返回是 dict 且含response字段,就取response;否则字符串化)。tool_service
2.2 _call_a2a_agent(...) 的“协议适配”
-
输入规范化(兼容 Admin UI 的“扁平 query 字段”): 如果
arguments里有query(字符串),就自动包成 JSON-RPC 的消息结构:{"message":{"messageId":"admin-test-...","role":"user","parts":[{"type":"text","text":"..."}]}}同时默认
method="message/send";否则就回退为透传params。tool_service -
“请求格式”选择:
-
若
agent.agent_type in ["generic","jsonrpc"]或endpoint_url以/结尾 → 走 JSON-RPC 2.0:{"jsonrpc":"2.0","method":..., "params":..., "id":1}。 -
否则 → 走 自定义 A2A 格式:
{"interaction_type": "...","parameters": params,"protocol_version": agent.protocol_version}。
-
-
HTTP 调用与鉴权: 用
httpx.AsyncClient发POST到agent.endpoint_url,并按auth_type加Authorization: Bearer ...(支持api_key/bearer)。状态 200 则返回json(),否则抛错。tool_service
要点:A2A 分支把“本地 Tool 调用”翻译为“对 Agent endpoint 的 HTTP(或 JSON-RPC) 调用”,并且内置了一个输入整形器(把
{query: "..."}转为更丰富的 message 结构)以及两种协议打包逻辑(标准 JSON-RPC vs. 自定义 A2A)。
3) 调用路径总览(从入口到结果)
-
invoke_tool(...)找到目标 Tool,断言可达;若integration_type=="A2A"且 annotations 有a2a_agent_id→ 走_invoke_a2a_tool。tool_service -
_invoke_a2a_tool(...)用a2a_agent_id查到DbA2AAgent;然后调用_call_a2a_agent(...)。tool_service -
_call_a2a_agent(...)把 UI/客户端传来的arguments整形成目标协议(JSON-RPC 或自定义 A2A),加上鉴权头,httpx.post()发给agent.endpoint_url,收回响应。tool_service -
_invoke_a2a_tool(...)把响应包成ToolResult(content=[TextContent(...)] , is_error=...)返回;顶层invoke_tool(...)负责统一打点记录指标(成功/失败/时延)。
4) “A2A → Tool”的元数据桥接点(为什么能打通)
-
Tool.annotations:写入
a2a_agent_id、a2a_agent_type;调用时据此反查 Agent。tool_service -
Tool.url/request_type:保存 Agent 的 endpoint 与调用方法(POST),供 HTTP 客户端使用。tool_service
-
Tool.auth_type/auth_value:继承 Agent 的鉴权方式,调用时组装
Authorization。tool_service -
ToolCreate/register_tool(...):统一入库、冲突处理、事件通知。tool_service
5) 和 REST/MCP 两类工具的差异
-
REST 工具:用
ResilientHttpClient拼 URL 模板/Query/Header 白名单透传,直接打到外部 REST;响应可用 jq 过滤。tool_service -
MCP 工具:走
sse_client/streamablehttp_client + ClientSession.initialize(),session.call_tool(...);可带网关 OAuth、请求头透传。tool_service -
A2A 工具:不经 MCP 协议,而是直接 HTTP/JSON-RPC 到 Agent endpoint;有自带的输入整形(把
{query}转 message)和两种请求打包。tool_service
一句话总结
A2A2tools 的实现思路是:先把 A2A Agent“映射/注册”为一个本地 Tool(integration_type=A2A,annotations 记录 agent_id);当调用这个 Tool 时,按 annotations 反查 Agent → 把入参整形成 JSON-RPC 或自定义 A2A 格式 → 带鉴权用 HTTP POST 调 Agent endpoint → 把结果包回 MCP 的 ToolResult,并复用统一的指标/追踪体系。对应代码分别在 create_tool_from_a2a_agent(...)、invoke_tool(...)(A2A 分支)、_invoke_a2a_tool(...)、_call_a2a_agent(...)。
一句话总览
-
“组合成 MCP Server” = 定义一个 Server 实体 + 绑定多条 Tool。
-
ServerService负责:注册/更新 Server、验证并关联工具、提供按 Server 聚合后的工具清单与指标、以及事件通知。 -
MCP 协议面的
tools/list就是:读这个 Server 的工具关联,转换成工具描述返回;tools/call则:把调用路由给 ToolService.invoke_tool(这个在你的tool_service.py里),这里的ServerService主要提供“这个 Server 下应该有哪些工具可见”的编排数据。server_service
其中核心,就是

如果注册进来的是REST请求,就使用http客户端发起请求,然后将返回转化为MCP工具可以识别的返回
如果是MCP格式(即从远端的sse mcp服务传递过来的),则使用mcp客户端发起请求,
a2a走的是_invoke_a2a_tool接口,将输入转化为agent可以接受的格式
更多推荐



所有评论(0)