使用AI4J为应用快速接入MCP服务

本博文给大家介绍一下如何使用AI4J快速接入MCP服务。

介绍

由于SpringAI需要使用JDK17和Spring Boot3,但是目前很多应用依旧使用的JDK8版本,所以使用可以支持JDK8的AI4J来接入OpenAI大模型。

AI4J是一款JavaSDK用于快速接入AI大模型应用,整合多平台大模型,如OpenAi、智谱Zhipu(ChatGLM)、深度求索DeepSeek、月之暗面Moonshot(Kimi)、腾讯混元Hunyuan、零一万物(01)等等,提供统一的输入输出(对齐OpenAi)消除差异化,优化函数调用(Tool Call),优化RAG调用、支持向量数据库(Pinecone),并且支持JDK1.8,为用户提供快速整合AI的能力。

支持MCP协议,支持STDIO,SSE,Streamable HTTP; 支持MCP Server与MCP Client; 支持MCP网关; 支持自定义MCP数据源; 支持MCP自动重连

AI4J-GitHub
AI4J-Gitee

引入Ai4j依赖

<!-- Spring应用 -->
<dependency>
    <groupId>io.github.lnyo-cly</groupId>
    <artifactId>ai4j-spring-boot-stater</artifactId>
    <version>1.4.2</version>
</dependency>

利用MCP-Client接入MCP服务

使用MCP-Client接入stdio

    @Test
    public void test_mcp_client_stdio() throws Exception {

        // 1. 构建传输层
        McpTransport transport = McpTransportFactory.createTransport("stdio", TransportConfig.stdio("npx", Arrays.asList("-y", "12306-mcp")));

        // 2. 构建mcp client
        McpClient mcpClient = new McpClient("12306-mcp","1.0.0", transport);

        // 3. 开启连接
        mcpClient.connect().join();

        // 4. 获取可用工具
        List<McpToolDefinition> tools = mcpClient.getAvailableTools().join();
        System.out.println(tools);
    }

上述代码,使用stdio方式接入MCP服务,使用前需要本地安装12306的mcp,运行结果如下:
20250819143230

使用MCP-Client接入SSE服务

    @Test
    public void test_mcp_client_sse() throws Exception {

        // 1. 构建传输层
        McpTransport transport = McpTransportFactory.createTransport("sse", TransportConfig.sse("https://mcp.amap.com/sse?key=您在高德官网上申请的key"));

        // 2. 构建mcp client
        McpClient mcpClient = new McpClient("高德MCP","1.0.0", transport);

        // 3. 开启连接
        mcpClient.connect().join();

        // 4. 获取可用工具
        List<McpToolDefinition> tools = mcpClient.getAvailableTools().join();
        System.out.println(tools);

        // 5. 创建参数对象
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("city", "北京");

        // 6. 调用函数
        String result = mcpClient.callTool("maps_weather", arguments).join();
        System.out.println(result);
    }

上述代码,使用SSE方式接入高德的MCP服务,注意,高德MCP需要key,需要自行官网申请。

并进行调用mcp当中的服务,调用结果如下:

20250819153054

使用MCP-Client接入Streamable-HTTP

接入方法同理

    @Test
    public void test_mcp_client_http() throws Exception {

        // 1. 构建传输层
        McpTransport transport = McpTransportFactory.createTransport("http", TransportConfig.http("http://localhost:8080/mcp"));

        // 2. 构建mcp client
        McpClient mcpClient = new McpClient("Test-MCP","1.0.0", transport);

        // 3. 开启连接
        mcpClient.connect().join();

        // 4. 获取可用工具
        List<McpToolDefinition> tools = mcpClient.getAvailableTools().join();
        System.out.println(tools);
    }

MCP网关连接MCP服务

使用配置文件接入mcp网关

创建 mcp-servers-config.json配置文件,配置mcp:

{
  "mcpServers": {
    "map-mcp": {
      "type": "sse",
      "url": "https://mcp.amap.com/sse?key=xxxxxxx"
    },
    "12306-mcp": {
      "type": "stdio",
      "command": "npx",
      "args": ["-y", "12306-mcp"]
    }
  }
}

    @Test
    public void test_mcp_gateway() throws Exception {
        // 1. 创建并初始化MCP网关
        McpGateway gateway = new McpGateway();
        gateway.initialize("mcp-servers-config.json").join();

        // 2. 获取可用的工具
        List<Tool.Function> tools = gateway.getAvailableTools().join();
        System.out.println(tools);

        // 3. 创建参数对象
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("city", "北京");

        // 4. 调用工具
        String result = gateway.callTool("maps_weather", arguments).join();
        System.out.println(result);

        // 5. 关闭网关
        gateway.shutdown().join();
    }

使用serverinfo连接mcp网关

    @Test
    public void test_mcp_gateway_config() {
        McpGateway gateway = new McpGateway();

        // 不从配置文件初始化,手动添加服务器
        gateway.initialize().join();

        // 动态添加服务器
        McpServerConfig.McpServerInfo serverInfo = new McpServerConfig.McpServerInfo();
        serverInfo.setName("12306-mcp");
        serverInfo.setType("stdio");
        serverInfo.setCommand("npx");
        serverInfo.setArgs(Arrays.asList("-y", "12306-mcp"));
        serverInfo.setEnabled(true);

        gateway.addMcpServer("12306-mcp", serverInfo).join();

        List<Tool.Function> tools = gateway.getAvailableTools().join();
        System.out.println(tools);

        gateway.shutdown().join();
    }

使用client连接mcp网关

    @Test
    public void test_mcp_gateway_client() {
        McpGateway gateway = new McpGateway();
        gateway.initialize().join();

        // 直接创建并添加MCP客户端
        McpTransport transport = McpTransportFactory.createTransport("stdio",
                TransportConfig.stdio("npx", Arrays.asList("-y", "12306-mcp")));
        McpClient mcpClient = new McpClient("12306", "1.0.0", transport);

        gateway.addMcpClient("12306", mcpClient).join();

        List<Tool.Function> tools = gateway.getAvailableTools().join();
        System.out.println(tools);

        gateway.shutdown().join();
    }

OpenAi 使用 MCP服务

    @Test
    public void test_mcp_gateway_chat() throws Exception {
        // 1. 创建并初始化MCP网关
        McpGateway gateway = new McpGateway();
        gateway.initialize("mcp-servers-config.json").join();

        // 2. 检查网关状态
        Map<String, Object> status = gateway.getGatewayStatus();
        System.out.println("网关状态: " + status);


        // 3. 获取Chat服务
        IChatService chatService = aiService.getChatService(PlatformType.OPENAI);

        // 4. 构建Chat请求
        String content = "帮我查询今天北京到上海的高铁";
        ChatCompletion chatCompletion = ChatCompletion.builder()
                .model("gpt-4o-mini")
                .mcpServices("12306-mcp", "map-mcp")
                .message(ChatMessage.withUser(content))
                .build();

        // 5. 发送Chat请求
        ChatCompletionResponse chatCompletionResponse = chatService.chatCompletion(chatCompletion);
        System.out.println("问题:" + content);
        System.out.println("回答:" + chatCompletionResponse.getChoices().get(0).getMessage().getContent());
        System.out.println("花费tokens:" + chatCompletionResponse.getUsage().getTotalTokens());

        // 6. 关闭网关
        gateway.shutdown().join();
    }

结合MySQL实现MCP动态管理(使用自定义数据源)

建立数据库

20250819181720

20250819181746

20250819182347

实体类、mapper、以及有关的service不再提供(比较占用篇幅),可以自行编写。

构建service实现自定义数据源

@Service
@Slf4j
public class McpGatewayService {

    @Autowired
    private McpServiceConfigMapper mcpServiceConfigMapper;

    @Autowired
    private UserMcpServiceMapper userMcpServiceMapper;

    @Autowired
    private UserMcpAuthMapper userMcpAuthMapper;

    private McpGateway mcpGateway;
    private DatabaseMcpConfigSource databaseConfigSource;
    @PostConstruct
    public void init() {
        // 初始化MCP网关
        mcpGateway = new McpGateway();

        // 使用数据库配置源
        databaseConfigSource = new DatabaseMcpConfigSource();
        mcpGateway.setConfigSource(databaseConfigSource);

        // 初始化网关
        mcpGateway.initialize().join();

        log.info("MCP网关初始化完成");
    }

    /**
     * 为用户设置MCP服务
     */
    public CompletableFuture<Void> setupUserMcpServices(Long userId, List<String> serviceIds) {
        return CompletableFuture.runAsync(() -> {
            try {
                // 清除用户之前的MCP客户端
                mcpGateway.clearUserMcpClients(String.valueOf(userId)).join();

                for (String serviceId : serviceIds) {
                    setupSingleUserMcpService(userId, serviceId);
                }

                log.info("为用户 {} 设置了 {} 个MCP服务", userId, serviceIds.size());
            } catch (Exception e) {
                log.error("为用户 {} 设置MCP服务失败", userId, e);
                throw new RuntimeException("设置MCP服务失败", e);
            }
        });
    }

    /**
     * 设置单个用户MCP服务
     */
    private void setupSingleUserMcpService(Long userId, String serviceId) {
        try {
            // 1. 获取服务配置
            McpServiceConfig serviceConfig = mcpServiceConfigMapper.selectByServiceId(serviceId);
            if (serviceConfig == null || serviceConfig.getStatus() != 1) {
                log.warn("用户 {} 请求的服务 {} 不可用", userId, serviceId);
                return;
            }

            // 2. 检查用户是否已添加该服务
            UserMcpService userService = userMcpServiceMapper.selectByUserIdAndServiceId(userId, serviceId);
            if (userService == null || userService.getEnabled() != 1) {
                log.warn("用户 {} 未添加服务 {} 或已禁用", userId, serviceId);
                return;
            }

            // 3. 构建MCP服务配置
            McpServerConfig.McpServerInfo mcpConfig = buildMcpConfig(serviceConfig, userId);

            // 4. 创建MCP客户端
            String clientId = "user_" + userId + "_service_" + serviceId;
            TransportConfig transportConfig = createTransportConfig(mcpConfig);
            McpClient client = new McpClient(clientId, "1.0.0",
                    McpTransportFactory.createTransport(serviceConfig.getTransportType(), transportConfig));

            // 5. 添加到网关(用户级别)
            mcpGateway.addUserMcpClient(String.valueOf(userId), serviceId, client).join();

            log.info("为用户 {} 成功设置MCP服务: {}", userId, serviceId);

        } catch (Exception e) {
            log.error("为用户 {} 设置MCP服务 {} 失败", userId, serviceId, e);
        }
    }

    /**
     * 构建MCP配置(包含用户认证信息)
     */
    private McpServerConfig.McpServerInfo buildMcpConfig(McpServiceConfig serviceConfig, Long userId) {
        McpServerConfig.McpServerInfo config = new McpServerConfig.McpServerInfo();
        config.setName(serviceConfig.getServiceName());
        config.setType(serviceConfig.getTransportType());
        config.setCommand(serviceConfig.getCommand());
        config.setUrl(serviceConfig.getUrl());

        // 解析命令参数
        if (serviceConfig.getArgs() != null) {
            List<String> args = JSON.parseArray(serviceConfig.getArgs(), String.class);
            config.setArgs(args);
        }

        // 构建环境变量
        Map<String, String> env = new HashMap<>();

        // 1. 添加默认环境变量
        if (serviceConfig.getEnvVars() != null) {
            Map<String, String> defaultEnv = JSON.parseObject(serviceConfig.getEnvVars(),
                    new TypeReference<Map<String, String>>() {});
            env.putAll(defaultEnv);
        }

        // 2. 如果需要认证,添加用户认证信息
        if (serviceConfig.getRequireAuth() == 1) {
            UserMcpAuth userAuth = userMcpAuthMapper.selectByUserIdAndServiceId(userId, serviceConfig.getServiceId());
            if (userAuth != null && userAuth.getStatus() == 1) {
                Map<String, String> authData = JSON.parseObject(userAuth.getAuthData(),
                        new TypeReference<Map<String, String>>() {});
                env.putAll(authData);
                log.debug("为用户 {} 的服务 {} 添加了认证信息", userId, serviceConfig.getServiceId());
            } else {
                log.warn("用户 {} 的服务 {} 需要认证但未配置认证信息", userId, serviceConfig.getServiceId());
            }
        }

        config.setEnv(env);
        config.setEnabled(true);

        return config;
    }

    /**
     * 创建传输配置
     */
    private TransportConfig createTransportConfig(McpServerConfig.McpServerInfo config) {
        String transportType = config.getType();

        switch (transportType.toLowerCase()) {
            case "stdio":
                return TransportConfig.stdio(config.getCommand(), config.getArgs(), config.getEnv());
            case "sse":
                return TransportConfig.sse(config.getUrl());
            case "streamable_http":
            case "http":
                return TransportConfig.streamableHttp(config.getUrl());
            default:
                throw new IllegalArgumentException("不支持的传输类型: " + transportType);
        }
    }

    /**
     * 获取用户可用的MCP工具
     */
    public CompletableFuture<List<Tool.Function>> getUserAvailableTools(Long userId) {
        return mcpGateway.getUserAvailableTools(String.valueOf(userId));
    }

    /**
     * 调用用户的MCP工具
     */
    public CompletableFuture<String> callUserTool(Long userId, String toolName, Object arguments) {
        return mcpGateway.callUserTool(String.valueOf(userId), toolName, arguments);
    }

    /**
     * 获取网关状态
     */
    public Map<String, Object> getGatewayStatus() {
        return mcpGateway.getGatewayStatus();
    }

    /**
     * 数据库配置源实现
     */
    private class DatabaseMcpConfigSource implements io.github.lnyocly.ai4j.mcp.config.McpConfigSource {

        private final List<ConfigChangeListener> listeners = new ArrayList<>();
        private volatile Map<String, McpServerConfig.McpServerInfo> cachedConfigs = new HashMap<>();

        public DatabaseMcpConfigSource() {
            loadConfigsFromDatabase();
        }

        @Override
        public Map<String, McpServerConfig.McpServerInfo> getAllConfigs() {
            return new HashMap<>(cachedConfigs);
        }

        @Override
        public McpServerConfig.McpServerInfo getConfig(String serverId) {
            return cachedConfigs.get(serverId);
        }

        @Override
        public void addConfigChangeListener(ConfigChangeListener listener) {
            listeners.add(listener);
        }

        @Override
        public void removeConfigChangeListener(ConfigChangeListener listener) {
            listeners.remove(listener);
        }

        /**
         * 从数据库加载全局配置(不需要认证的服务)
         */
        private void loadConfigsFromDatabase() {
            try {
                // 只加载不需要认证的全局服务
                List<McpServiceConfig> configs = mcpServiceConfigMapper.selectByCondition(1, null);
                Map<String, McpServerConfig.McpServerInfo> newConfigs = new HashMap<>();

                for (McpServiceConfig config : configs) {
                    // 只加载不需要认证的服务作为全局服务
                    if (config.getRequireAuth() == 0) {
                        McpServerConfig.McpServerInfo serverInfo = convertToServerInfo(config);
                        newConfigs.put(config.getServiceId(), serverInfo);
                    }
                }

                this.cachedConfigs = newConfigs;
                log.info("从数据库加载了 {} 个全局MCP服务配置", newConfigs.size());

            } catch (Exception e) {
                log.error("从数据库加载MCP配置失败", e);
            }
        }

        private McpServerConfig.McpServerInfo convertToServerInfo(McpServiceConfig config) {
            McpServerConfig.McpServerInfo serverInfo = new McpServerConfig.McpServerInfo();
            serverInfo.setName(config.getServiceName());
            serverInfo.setType(config.getTransportType());
            serverInfo.setCommand(config.getCommand());
            serverInfo.setUrl(config.getUrl());

            if (config.getArgs() != null) {
                List<String> args = JSON.parseArray(config.getArgs(), String.class);
                serverInfo.setArgs(args);
            }

            if (config.getEnvVars() != null) {
                Map<String, String> env = JSON.parseObject(config.getEnvVars(),
                        new TypeReference<Map<String, String>>() {});
                serverInfo.setEnv(env);
            }

            serverInfo.setEnabled(true);
            return serverInfo;
        }

        /**
         * 刷新配置(管理员修改配置后调用)
         */
        public void refreshConfigs() {
            Map<String, McpServerConfig.McpServerInfo> oldConfigs = new HashMap<>(cachedConfigs);
            loadConfigsFromDatabase();

            // 检测变更并通知监听器
            detectAndNotifyChanges(oldConfigs, cachedConfigs);
        }

        private void detectAndNotifyChanges(Map<String, McpServerConfig.McpServerInfo> oldConfigs,
                                            Map<String, McpServerConfig.McpServerInfo> newConfigs) {

            // 检测新增
            newConfigs.forEach((serverId, config) -> {
                if (!oldConfigs.containsKey(serverId)) {
                    notifyConfigAdded(serverId, config);
                } else if (!configEquals(oldConfigs.get(serverId), config)) {
                    notifyConfigUpdated(serverId, config);
                }
            });

            // 检测删除
            oldConfigs.forEach((serverId, config) -> {
                if (!newConfigs.containsKey(serverId)) {
                    notifyConfigRemoved(serverId);
                }
            });
        }

        private boolean configEquals(McpServerConfig.McpServerInfo config1, McpServerConfig.McpServerInfo config2) {
            if (config1 == null && config2 == null) return true;
            if (config1 == null || config2 == null) return false;

            try {
                String json1 = JSON.toJSONString(config1);
                String json2 = JSON.toJSONString(config2);
                return json1.equals(json2);
            } catch (Exception e) {
                log.warn("比较配置时发生错误", e);
                return false;
            }
        }

        private void notifyConfigAdded(String serverId, McpServerConfig.McpServerInfo config) {
            listeners.forEach(listener -> {
                try {
                    listener.onConfigAdded(serverId, config);
                } catch (Exception e) {
                    log.error("通知配置添加失败: {}", serverId, e);
                }
            });
        }

        private void notifyConfigRemoved(String serverId) {
            listeners.forEach(listener -> {
                try {
                    listener.onConfigRemoved(serverId);
                } catch (Exception e) {
                    log.error("通知配置删除失败: {}", serverId, e);
                }
            });
        }

        private void notifyConfigUpdated(String serverId, McpServerConfig.McpServerInfo config) {
            listeners.forEach(listener -> {
                try {
                    listener.onConfigUpdated(serverId, config);
                } catch (Exception e) {
                    log.error("通知配置更新失败: {}", serverId, e);
                }
            });
        }
    }


    /**
     * 刷新全局配置
     */
    public void refreshGlobalConfigs() {
        if (databaseConfigSource != null) {
            databaseConfigSource.refreshConfigs();
            log.info("全局MCP配置已刷新");
        }
    }
}

构建controller,实现admin端在线增加mcp服务

@RestController
@RequestMapping("/api/admin/mcp/services")
public class AdminMcpController extends BaseController {

    @Autowired
    private McpServiceConfigService mcpServiceConfigService;

    @Autowired
    private McpGatewayService mcpGatewayService;

    /**
     * 获取所有MCP服务配置
     */
    @GetMapping("/list")
    public AjaxResult list(McpServiceConfig mcpServiceConfig) {
        List<McpServiceConfig> services = mcpServiceConfigService.selectList(null);
        return AjaxResult.success(services);
    }


    /**
     * 分页获取MCP服务配置
     */
    @GetMapping("/page")
    public TableDataInfo page(McpServiceConfig mcpServiceConfig) {
        startPage();
        List<McpServiceConfig> services = mcpServiceConfigService.selectList(mcpServiceConfig);
        return getDataTable(services);
    }


    /**
     * 获取单个MCP服务配置
     */
    @GetMapping("/{id}")
    public AjaxResult getService(@PathVariable Long id) {
        McpServiceConfig service = mcpServiceConfigService.getServiceById(id);
        if (service == null) {
            return AjaxResult.error("服务不存在");
        }
        return AjaxResult.success(service);
    }

    /**
     * 添加MCP服务配置
     */
    @PostMapping
    public AjaxResult addService(@RequestBody McpServiceConfig config) {
        // 检查serviceId是否已存在
        if (mcpServiceConfigService.getServiceByServiceId(config.getServiceId()) != null) {
            return AjaxResult.error("服务ID已存在");
        }

        config.setCreatedBy(StpUtil.getLoginIdAsString());
        config.setStatus(1); // 默认启用

        boolean success = mcpServiceConfigService.addService(config);
        if (success) {
            // 自动刷新MCP配置
            try {
                mcpGatewayService.refreshGlobalConfigs();
            } catch (Exception e) {
                // 记录日志但不影响主流程
                logger.error("刷新MCP配置失败", e);
            }
            return AjaxResult.success();
        } else {
            return AjaxResult.error("添加失败");
        }
    }

    /**
     * 更新MCP服务配置
     */
    @PutMapping
    public AjaxResult updateService(@RequestBody McpServiceConfig config) {
        McpServiceConfig existingConfig = mcpServiceConfigService.getServiceById(config.getId());
        // 如果修改了serviceId,检查新的serviceId是否已存在
        if (!existingConfig.getServiceId().equals(config.getServiceId())) {
            if (mcpServiceConfigService.getServiceByServiceId(config.getServiceId()) != null) {
                return AjaxResult.error("服务ID已存在");
            }
        }
        boolean success = mcpServiceConfigService.updateService(config);
        if (success) {
            return AjaxResult.success();
        } else {
            return AjaxResult.error("更新失败");
        }
    }

    /**
     * 上架/下架MCP服务
     */
    @PutMapping("/{id}/status")
    public AjaxResult updateServiceStatus(@PathVariable Long id, @RequestParam Integer status) {
        if (status != 0 && status != 1) {
            return AjaxResult.error("状态值无效");
        }

        boolean success = mcpServiceConfigService.updateServiceStatus(id, status);
        if (success) {
            return AjaxResult.success();
        } else {
            return AjaxResult.error("更新状态失败");
        }
    }

    /**
     * 删除MCP服务配置
     */
    @DeleteMapping("/{id}")
    public AjaxResult deleteService(@PathVariable Long id) {
        boolean success = mcpServiceConfigService.deleteService(id);
        if (success) {
            return AjaxResult.success();
        } else {
            return AjaxResult.error("删除失败");
        }
    }

    /**
     * 刷新MCP配置(管理员修改配置后调用)
     */
    @PostMapping("/refresh")
    public AjaxResult refreshMcpConfigs() {
        try {
            // 方式1: 通过ApplicationEventPublisher发布事件
            // applicationEventPublisher.publishEvent(new McpConfigRefreshEvent());

            // 方式2: 直接调用刷新方法(需要暴露接口)
            mcpGatewayService.refreshGlobalConfigs();

            return AjaxResult.success();
        } catch (Exception e) {
            return AjaxResult.error("刷新配置失败: " + e.getMessage());
        }
    }
}
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐