java快速接入mcp以及结合mysql动态管理
本文介绍了如何使用AI4J快速接入MCP服务。首先需引入AI4J的Spring Boot依赖,然后通过三种方式连接MCP服务:stdio方式适用于本地MCP服务,SSE方式用于连接高德等远程服务,HTTP方式用于标准接口。文章详细展示了构建传输层、创建MCP客户端并调用服务的代码示例,包括获取可用工具和执行特定功能的方法。此外还介绍了MCP网关的三种配置方式:通过JSON配置文件、动态添加服务器信
使用AI4J为应用快速接入MCP服务
本博文给大家介绍一下如何使用AI4J
快速接入MCP服务。
介绍
由于SpringAI需要使用JDK17和Spring Boot3,但是目前很多应用依旧使用的JDK8版本,所以使用可以支持JDK8的AI4J来接入OpenAI大模型。
AI4J是一款JavaSDK用于快速接入AI大模型应用,整合多平台大模型,如OpenAi、智谱Zhipu(ChatGLM)、深度求索DeepSeek、月之暗面Moonshot(Kimi)、腾讯混元Hunyuan、零一万物(01)等等,提供统一的输入输出(对齐OpenAi)消除差异化,优化函数调用(Tool Call),优化RAG调用、支持向量数据库(Pinecone),并且支持JDK1.8,为用户提供快速整合AI的能力。
支持MCP协议,支持STDIO,SSE,Streamable HTTP; 支持MCP Server与MCP Client; 支持MCP网关; 支持自定义MCP数据源; 支持MCP自动重连
引入Ai4j依赖
<!-- Spring应用 -->
<dependency>
<groupId>io.github.lnyo-cly</groupId>
<artifactId>ai4j-spring-boot-stater</artifactId>
<version>1.4.2</version>
</dependency>
利用MCP-Client接入MCP服务
使用MCP-Client接入stdio
@Test
public void test_mcp_client_stdio() throws Exception {
// 1. 构建传输层
McpTransport transport = McpTransportFactory.createTransport("stdio", TransportConfig.stdio("npx", Arrays.asList("-y", "12306-mcp")));
// 2. 构建mcp client
McpClient mcpClient = new McpClient("12306-mcp","1.0.0", transport);
// 3. 开启连接
mcpClient.connect().join();
// 4. 获取可用工具
List<McpToolDefinition> tools = mcpClient.getAvailableTools().join();
System.out.println(tools);
}
上述代码,使用stdio方式接入MCP服务,使用前需要本地安装12306的mcp,运行结果如下:
使用MCP-Client接入SSE服务
@Test
public void test_mcp_client_sse() throws Exception {
// 1. 构建传输层
McpTransport transport = McpTransportFactory.createTransport("sse", TransportConfig.sse("https://mcp.amap.com/sse?key=您在高德官网上申请的key"));
// 2. 构建mcp client
McpClient mcpClient = new McpClient("高德MCP","1.0.0", transport);
// 3. 开启连接
mcpClient.connect().join();
// 4. 获取可用工具
List<McpToolDefinition> tools = mcpClient.getAvailableTools().join();
System.out.println(tools);
// 5. 创建参数对象
Map<String, Object> arguments = new HashMap<>();
arguments.put("city", "北京");
// 6. 调用函数
String result = mcpClient.callTool("maps_weather", arguments).join();
System.out.println(result);
}
上述代码,使用SSE方式接入高德的MCP服务,注意,高德MCP需要key,需要自行官网申请。
并进行调用mcp当中的服务,调用结果如下:
使用MCP-Client接入Streamable-HTTP
接入方法同理
@Test
public void test_mcp_client_http() throws Exception {
// 1. 构建传输层
McpTransport transport = McpTransportFactory.createTransport("http", TransportConfig.http("http://localhost:8080/mcp"));
// 2. 构建mcp client
McpClient mcpClient = new McpClient("Test-MCP","1.0.0", transport);
// 3. 开启连接
mcpClient.connect().join();
// 4. 获取可用工具
List<McpToolDefinition> tools = mcpClient.getAvailableTools().join();
System.out.println(tools);
}
MCP网关连接MCP服务
使用配置文件接入mcp网关
创建 mcp-servers-config.json
配置文件,配置mcp:
{
"mcpServers": {
"map-mcp": {
"type": "sse",
"url": "https://mcp.amap.com/sse?key=xxxxxxx"
},
"12306-mcp": {
"type": "stdio",
"command": "npx",
"args": ["-y", "12306-mcp"]
}
}
}
@Test
public void test_mcp_gateway() throws Exception {
// 1. 创建并初始化MCP网关
McpGateway gateway = new McpGateway();
gateway.initialize("mcp-servers-config.json").join();
// 2. 获取可用的工具
List<Tool.Function> tools = gateway.getAvailableTools().join();
System.out.println(tools);
// 3. 创建参数对象
Map<String, Object> arguments = new HashMap<>();
arguments.put("city", "北京");
// 4. 调用工具
String result = gateway.callTool("maps_weather", arguments).join();
System.out.println(result);
// 5. 关闭网关
gateway.shutdown().join();
}
使用serverinfo连接mcp网关
@Test
public void test_mcp_gateway_config() {
McpGateway gateway = new McpGateway();
// 不从配置文件初始化,手动添加服务器
gateway.initialize().join();
// 动态添加服务器
McpServerConfig.McpServerInfo serverInfo = new McpServerConfig.McpServerInfo();
serverInfo.setName("12306-mcp");
serverInfo.setType("stdio");
serverInfo.setCommand("npx");
serverInfo.setArgs(Arrays.asList("-y", "12306-mcp"));
serverInfo.setEnabled(true);
gateway.addMcpServer("12306-mcp", serverInfo).join();
List<Tool.Function> tools = gateway.getAvailableTools().join();
System.out.println(tools);
gateway.shutdown().join();
}
使用client连接mcp网关
@Test
public void test_mcp_gateway_client() {
McpGateway gateway = new McpGateway();
gateway.initialize().join();
// 直接创建并添加MCP客户端
McpTransport transport = McpTransportFactory.createTransport("stdio",
TransportConfig.stdio("npx", Arrays.asList("-y", "12306-mcp")));
McpClient mcpClient = new McpClient("12306", "1.0.0", transport);
gateway.addMcpClient("12306", mcpClient).join();
List<Tool.Function> tools = gateway.getAvailableTools().join();
System.out.println(tools);
gateway.shutdown().join();
}
OpenAi 使用 MCP服务
@Test
public void test_mcp_gateway_chat() throws Exception {
// 1. 创建并初始化MCP网关
McpGateway gateway = new McpGateway();
gateway.initialize("mcp-servers-config.json").join();
// 2. 检查网关状态
Map<String, Object> status = gateway.getGatewayStatus();
System.out.println("网关状态: " + status);
// 3. 获取Chat服务
IChatService chatService = aiService.getChatService(PlatformType.OPENAI);
// 4. 构建Chat请求
String content = "帮我查询今天北京到上海的高铁";
ChatCompletion chatCompletion = ChatCompletion.builder()
.model("gpt-4o-mini")
.mcpServices("12306-mcp", "map-mcp")
.message(ChatMessage.withUser(content))
.build();
// 5. 发送Chat请求
ChatCompletionResponse chatCompletionResponse = chatService.chatCompletion(chatCompletion);
System.out.println("问题:" + content);
System.out.println("回答:" + chatCompletionResponse.getChoices().get(0).getMessage().getContent());
System.out.println("花费tokens:" + chatCompletionResponse.getUsage().getTotalTokens());
// 6. 关闭网关
gateway.shutdown().join();
}
结合MySQL实现MCP动态管理(使用自定义数据源)
建立数据库
实体类、mapper、以及有关的service不再提供(比较占用篇幅),可以自行编写。
构建service实现自定义数据源
@Service
@Slf4j
public class McpGatewayService {
@Autowired
private McpServiceConfigMapper mcpServiceConfigMapper;
@Autowired
private UserMcpServiceMapper userMcpServiceMapper;
@Autowired
private UserMcpAuthMapper userMcpAuthMapper;
private McpGateway mcpGateway;
private DatabaseMcpConfigSource databaseConfigSource;
@PostConstruct
public void init() {
// 初始化MCP网关
mcpGateway = new McpGateway();
// 使用数据库配置源
databaseConfigSource = new DatabaseMcpConfigSource();
mcpGateway.setConfigSource(databaseConfigSource);
// 初始化网关
mcpGateway.initialize().join();
log.info("MCP网关初始化完成");
}
/**
* 为用户设置MCP服务
*/
public CompletableFuture<Void> setupUserMcpServices(Long userId, List<String> serviceIds) {
return CompletableFuture.runAsync(() -> {
try {
// 清除用户之前的MCP客户端
mcpGateway.clearUserMcpClients(String.valueOf(userId)).join();
for (String serviceId : serviceIds) {
setupSingleUserMcpService(userId, serviceId);
}
log.info("为用户 {} 设置了 {} 个MCP服务", userId, serviceIds.size());
} catch (Exception e) {
log.error("为用户 {} 设置MCP服务失败", userId, e);
throw new RuntimeException("设置MCP服务失败", e);
}
});
}
/**
* 设置单个用户MCP服务
*/
private void setupSingleUserMcpService(Long userId, String serviceId) {
try {
// 1. 获取服务配置
McpServiceConfig serviceConfig = mcpServiceConfigMapper.selectByServiceId(serviceId);
if (serviceConfig == null || serviceConfig.getStatus() != 1) {
log.warn("用户 {} 请求的服务 {} 不可用", userId, serviceId);
return;
}
// 2. 检查用户是否已添加该服务
UserMcpService userService = userMcpServiceMapper.selectByUserIdAndServiceId(userId, serviceId);
if (userService == null || userService.getEnabled() != 1) {
log.warn("用户 {} 未添加服务 {} 或已禁用", userId, serviceId);
return;
}
// 3. 构建MCP服务配置
McpServerConfig.McpServerInfo mcpConfig = buildMcpConfig(serviceConfig, userId);
// 4. 创建MCP客户端
String clientId = "user_" + userId + "_service_" + serviceId;
TransportConfig transportConfig = createTransportConfig(mcpConfig);
McpClient client = new McpClient(clientId, "1.0.0",
McpTransportFactory.createTransport(serviceConfig.getTransportType(), transportConfig));
// 5. 添加到网关(用户级别)
mcpGateway.addUserMcpClient(String.valueOf(userId), serviceId, client).join();
log.info("为用户 {} 成功设置MCP服务: {}", userId, serviceId);
} catch (Exception e) {
log.error("为用户 {} 设置MCP服务 {} 失败", userId, serviceId, e);
}
}
/**
* 构建MCP配置(包含用户认证信息)
*/
private McpServerConfig.McpServerInfo buildMcpConfig(McpServiceConfig serviceConfig, Long userId) {
McpServerConfig.McpServerInfo config = new McpServerConfig.McpServerInfo();
config.setName(serviceConfig.getServiceName());
config.setType(serviceConfig.getTransportType());
config.setCommand(serviceConfig.getCommand());
config.setUrl(serviceConfig.getUrl());
// 解析命令参数
if (serviceConfig.getArgs() != null) {
List<String> args = JSON.parseArray(serviceConfig.getArgs(), String.class);
config.setArgs(args);
}
// 构建环境变量
Map<String, String> env = new HashMap<>();
// 1. 添加默认环境变量
if (serviceConfig.getEnvVars() != null) {
Map<String, String> defaultEnv = JSON.parseObject(serviceConfig.getEnvVars(),
new TypeReference<Map<String, String>>() {});
env.putAll(defaultEnv);
}
// 2. 如果需要认证,添加用户认证信息
if (serviceConfig.getRequireAuth() == 1) {
UserMcpAuth userAuth = userMcpAuthMapper.selectByUserIdAndServiceId(userId, serviceConfig.getServiceId());
if (userAuth != null && userAuth.getStatus() == 1) {
Map<String, String> authData = JSON.parseObject(userAuth.getAuthData(),
new TypeReference<Map<String, String>>() {});
env.putAll(authData);
log.debug("为用户 {} 的服务 {} 添加了认证信息", userId, serviceConfig.getServiceId());
} else {
log.warn("用户 {} 的服务 {} 需要认证但未配置认证信息", userId, serviceConfig.getServiceId());
}
}
config.setEnv(env);
config.setEnabled(true);
return config;
}
/**
* 创建传输配置
*/
private TransportConfig createTransportConfig(McpServerConfig.McpServerInfo config) {
String transportType = config.getType();
switch (transportType.toLowerCase()) {
case "stdio":
return TransportConfig.stdio(config.getCommand(), config.getArgs(), config.getEnv());
case "sse":
return TransportConfig.sse(config.getUrl());
case "streamable_http":
case "http":
return TransportConfig.streamableHttp(config.getUrl());
default:
throw new IllegalArgumentException("不支持的传输类型: " + transportType);
}
}
/**
* 获取用户可用的MCP工具
*/
public CompletableFuture<List<Tool.Function>> getUserAvailableTools(Long userId) {
return mcpGateway.getUserAvailableTools(String.valueOf(userId));
}
/**
* 调用用户的MCP工具
*/
public CompletableFuture<String> callUserTool(Long userId, String toolName, Object arguments) {
return mcpGateway.callUserTool(String.valueOf(userId), toolName, arguments);
}
/**
* 获取网关状态
*/
public Map<String, Object> getGatewayStatus() {
return mcpGateway.getGatewayStatus();
}
/**
* 数据库配置源实现
*/
private class DatabaseMcpConfigSource implements io.github.lnyocly.ai4j.mcp.config.McpConfigSource {
private final List<ConfigChangeListener> listeners = new ArrayList<>();
private volatile Map<String, McpServerConfig.McpServerInfo> cachedConfigs = new HashMap<>();
public DatabaseMcpConfigSource() {
loadConfigsFromDatabase();
}
@Override
public Map<String, McpServerConfig.McpServerInfo> getAllConfigs() {
return new HashMap<>(cachedConfigs);
}
@Override
public McpServerConfig.McpServerInfo getConfig(String serverId) {
return cachedConfigs.get(serverId);
}
@Override
public void addConfigChangeListener(ConfigChangeListener listener) {
listeners.add(listener);
}
@Override
public void removeConfigChangeListener(ConfigChangeListener listener) {
listeners.remove(listener);
}
/**
* 从数据库加载全局配置(不需要认证的服务)
*/
private void loadConfigsFromDatabase() {
try {
// 只加载不需要认证的全局服务
List<McpServiceConfig> configs = mcpServiceConfigMapper.selectByCondition(1, null);
Map<String, McpServerConfig.McpServerInfo> newConfigs = new HashMap<>();
for (McpServiceConfig config : configs) {
// 只加载不需要认证的服务作为全局服务
if (config.getRequireAuth() == 0) {
McpServerConfig.McpServerInfo serverInfo = convertToServerInfo(config);
newConfigs.put(config.getServiceId(), serverInfo);
}
}
this.cachedConfigs = newConfigs;
log.info("从数据库加载了 {} 个全局MCP服务配置", newConfigs.size());
} catch (Exception e) {
log.error("从数据库加载MCP配置失败", e);
}
}
private McpServerConfig.McpServerInfo convertToServerInfo(McpServiceConfig config) {
McpServerConfig.McpServerInfo serverInfo = new McpServerConfig.McpServerInfo();
serverInfo.setName(config.getServiceName());
serverInfo.setType(config.getTransportType());
serverInfo.setCommand(config.getCommand());
serverInfo.setUrl(config.getUrl());
if (config.getArgs() != null) {
List<String> args = JSON.parseArray(config.getArgs(), String.class);
serverInfo.setArgs(args);
}
if (config.getEnvVars() != null) {
Map<String, String> env = JSON.parseObject(config.getEnvVars(),
new TypeReference<Map<String, String>>() {});
serverInfo.setEnv(env);
}
serverInfo.setEnabled(true);
return serverInfo;
}
/**
* 刷新配置(管理员修改配置后调用)
*/
public void refreshConfigs() {
Map<String, McpServerConfig.McpServerInfo> oldConfigs = new HashMap<>(cachedConfigs);
loadConfigsFromDatabase();
// 检测变更并通知监听器
detectAndNotifyChanges(oldConfigs, cachedConfigs);
}
private void detectAndNotifyChanges(Map<String, McpServerConfig.McpServerInfo> oldConfigs,
Map<String, McpServerConfig.McpServerInfo> newConfigs) {
// 检测新增
newConfigs.forEach((serverId, config) -> {
if (!oldConfigs.containsKey(serverId)) {
notifyConfigAdded(serverId, config);
} else if (!configEquals(oldConfigs.get(serverId), config)) {
notifyConfigUpdated(serverId, config);
}
});
// 检测删除
oldConfigs.forEach((serverId, config) -> {
if (!newConfigs.containsKey(serverId)) {
notifyConfigRemoved(serverId);
}
});
}
private boolean configEquals(McpServerConfig.McpServerInfo config1, McpServerConfig.McpServerInfo config2) {
if (config1 == null && config2 == null) return true;
if (config1 == null || config2 == null) return false;
try {
String json1 = JSON.toJSONString(config1);
String json2 = JSON.toJSONString(config2);
return json1.equals(json2);
} catch (Exception e) {
log.warn("比较配置时发生错误", e);
return false;
}
}
private void notifyConfigAdded(String serverId, McpServerConfig.McpServerInfo config) {
listeners.forEach(listener -> {
try {
listener.onConfigAdded(serverId, config);
} catch (Exception e) {
log.error("通知配置添加失败: {}", serverId, e);
}
});
}
private void notifyConfigRemoved(String serverId) {
listeners.forEach(listener -> {
try {
listener.onConfigRemoved(serverId);
} catch (Exception e) {
log.error("通知配置删除失败: {}", serverId, e);
}
});
}
private void notifyConfigUpdated(String serverId, McpServerConfig.McpServerInfo config) {
listeners.forEach(listener -> {
try {
listener.onConfigUpdated(serverId, config);
} catch (Exception e) {
log.error("通知配置更新失败: {}", serverId, e);
}
});
}
}
/**
* 刷新全局配置
*/
public void refreshGlobalConfigs() {
if (databaseConfigSource != null) {
databaseConfigSource.refreshConfigs();
log.info("全局MCP配置已刷新");
}
}
}
构建controller,实现admin端在线增加mcp服务
@RestController
@RequestMapping("/api/admin/mcp/services")
public class AdminMcpController extends BaseController {
@Autowired
private McpServiceConfigService mcpServiceConfigService;
@Autowired
private McpGatewayService mcpGatewayService;
/**
* 获取所有MCP服务配置
*/
@GetMapping("/list")
public AjaxResult list(McpServiceConfig mcpServiceConfig) {
List<McpServiceConfig> services = mcpServiceConfigService.selectList(null);
return AjaxResult.success(services);
}
/**
* 分页获取MCP服务配置
*/
@GetMapping("/page")
public TableDataInfo page(McpServiceConfig mcpServiceConfig) {
startPage();
List<McpServiceConfig> services = mcpServiceConfigService.selectList(mcpServiceConfig);
return getDataTable(services);
}
/**
* 获取单个MCP服务配置
*/
@GetMapping("/{id}")
public AjaxResult getService(@PathVariable Long id) {
McpServiceConfig service = mcpServiceConfigService.getServiceById(id);
if (service == null) {
return AjaxResult.error("服务不存在");
}
return AjaxResult.success(service);
}
/**
* 添加MCP服务配置
*/
@PostMapping
public AjaxResult addService(@RequestBody McpServiceConfig config) {
// 检查serviceId是否已存在
if (mcpServiceConfigService.getServiceByServiceId(config.getServiceId()) != null) {
return AjaxResult.error("服务ID已存在");
}
config.setCreatedBy(StpUtil.getLoginIdAsString());
config.setStatus(1); // 默认启用
boolean success = mcpServiceConfigService.addService(config);
if (success) {
// 自动刷新MCP配置
try {
mcpGatewayService.refreshGlobalConfigs();
} catch (Exception e) {
// 记录日志但不影响主流程
logger.error("刷新MCP配置失败", e);
}
return AjaxResult.success();
} else {
return AjaxResult.error("添加失败");
}
}
/**
* 更新MCP服务配置
*/
@PutMapping
public AjaxResult updateService(@RequestBody McpServiceConfig config) {
McpServiceConfig existingConfig = mcpServiceConfigService.getServiceById(config.getId());
// 如果修改了serviceId,检查新的serviceId是否已存在
if (!existingConfig.getServiceId().equals(config.getServiceId())) {
if (mcpServiceConfigService.getServiceByServiceId(config.getServiceId()) != null) {
return AjaxResult.error("服务ID已存在");
}
}
boolean success = mcpServiceConfigService.updateService(config);
if (success) {
return AjaxResult.success();
} else {
return AjaxResult.error("更新失败");
}
}
/**
* 上架/下架MCP服务
*/
@PutMapping("/{id}/status")
public AjaxResult updateServiceStatus(@PathVariable Long id, @RequestParam Integer status) {
if (status != 0 && status != 1) {
return AjaxResult.error("状态值无效");
}
boolean success = mcpServiceConfigService.updateServiceStatus(id, status);
if (success) {
return AjaxResult.success();
} else {
return AjaxResult.error("更新状态失败");
}
}
/**
* 删除MCP服务配置
*/
@DeleteMapping("/{id}")
public AjaxResult deleteService(@PathVariable Long id) {
boolean success = mcpServiceConfigService.deleteService(id);
if (success) {
return AjaxResult.success();
} else {
return AjaxResult.error("删除失败");
}
}
/**
* 刷新MCP配置(管理员修改配置后调用)
*/
@PostMapping("/refresh")
public AjaxResult refreshMcpConfigs() {
try {
// 方式1: 通过ApplicationEventPublisher发布事件
// applicationEventPublisher.publishEvent(new McpConfigRefreshEvent());
// 方式2: 直接调用刷新方法(需要暴露接口)
mcpGatewayService.refreshGlobalConfigs();
return AjaxResult.success();
} catch (Exception e) {
return AjaxResult.error("刷新配置失败: " + e.getMessage());
}
}
}
更多推荐
所有评论(0)