历史文章

Spring AI:对接DeepSeek实战
Spring AI:对接官方 DeepSeek-R1 模型 —— 实现推理效果
Spring AI:ChatClient实现对话效果
Spring AI:使用 Advisor 组件 - 打印请求大模型出入参日志
Spring AI:ChatMemory 实现聊天记忆功能
Spring AI:本地安装 Ollama 并运行 Qwen3 模型
Spring AI:提示词工程
Spring AI:提示词工程 - Prompt 角色分类(系统角色与用户角色)
Spring AI:基于 “助手角色” 消息实现聊天记忆功能
Spring AI:结构化输出 - 大模型响应内容
Spring AI:Docker 安装 Cassandra 5.x(限制内存占用)&& CQL
Spring AI:整合 Cassandra - 实现聊天消息持久化
Spring AI:多模态 AI 大模型
Spring AI:文生图:调用通义万相 AI 大模型
Spring AI:文生音频 - cosyvoice-V2
Spring AI:文生视频 - wanx2.1-i2v-plus
Spring AI:上手体验工具调用(Tool Calling)
Spring AI:整合 MCP Client - 调用高德地图 MCP 服务
Spring AI:搭建自定义 MCP Server:获取 QQ 信息
Spring AI:对接自定义 MCP Server
Spring AI:RAG 增强检索介绍
Spring AI:Docker 安装向量数据库 - Redis Stack
Spring AI:文档向量化存储与检索
Spring AI:提取 txt、Json、Markdown、Html、Pdf 文件数据,转换为 Document 文档
Spring AI:Apache Tika 读取 Word、PPT 文档
Spring AI:Docker 安装 SearXNG 搜索引擎
Spring AI:整合 OKHttp3:获取 SearXNG 搜索结果

本文中,我们尝试通过并发的方式,请求搜索页面 URL , 并获取到所有页面的内容。

自定义线程池

在 /config 包下,新建一个 ThreadPoolConfig 线程池配置类,分别定义两个线程池,代码如下:

@Configuration
public class ThreadPoolConfig {

    /**
     * HTTP 请求线程池(IO 密集型任务)
     * @return
     */
    @Bean("httpRequestExecutor")
    public ThreadPoolTaskExecutor httpRequestExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(50); // 核心线程数(保持常驻)
        executor.setMaxPoolSize(200); // 最大线程数(突发流量时扩容)
        executor.setQueueCapacity(1000); // 任务队列容量(缓冲突发请求)
        executor.setKeepAliveSeconds(120); // 空闲线程存活时间(秒)
        executor.setThreadNamePrefix("http-fetcher-"); // 线程名前缀(便于监控)
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 拒绝策略(由调用线程执行)
        executor.initialize(); // 初始化线程池
        return executor;
    }

    /**
     * 结果处理线程池(CPU 密集型任务)
     * @return
     */
    @Bean("resultProcessingExecutor")
    public ThreadPoolTaskExecutor resultProcessingExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(Runtime.getRuntime().availableProcessors()); // 核心线程数(等于CPU核心数)
        executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 2); // 最大线程数(不超过CPU核心数2倍)
        executor.setQueueCapacity(200); // 较小队列(避免任务堆积)
        executor.setThreadNamePrefix("result-processor-"); // 线程名前缀(便于监控)
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); // 拒绝策略(直接抛出异常)
        executor.initialize(); // 初始化线程池
        return executor;
    }
}

解释一下这两个线程池各自的作用:

  • httpRequestExecutor : 此线程池用于发送 Http 请求,任务场景属于 IO 密集型,因为 Http 请求涉及网络等待(延迟较高),线程大部分时间处于等待状态,所以设置了较多的线程数,能够提升并发处理性能;
  • resultProcessingExecutor : 此线程池用于提取每个请求的结果,任务场景属于是 CPU 密集型,所以,线程数定义的较少,防止 CPU 调度时,上下文切换带来的性能损耗;

定义 service

前置工作完成后,在 /service 包下,新建 SearchResultContentFetcherService 接口,声明一个 batchFetch() 方法,用于批量获取搜索结果页面的内容:

public interface SearchResultContentFetcherService {


    /**
     * 并发批量获取搜索结果页面的内容
     *
     * @param searchResults
     * @param timeout
     * @param unit
     * @return
     */
    CompletableFuture<List<SearchResult>> batchFetch(List<SearchResult> searchResults,
                                                long timeout,
                                                TimeUnit unit);
}

接着,在 /impl 包下,创建上述接口的具体实现类 SearchResultContentFetcherServiceImpl,里面的逻辑暂时写个 TODO 标识,等会再来补充完整,代码如下:

@Service
@Slf4j
public class SearchResultContentFetcherServiceImpl implements SearchResultContentFetcherService {


    /**
     * 并发批量获取搜索结果页面的内容
     *
     * @param searchResults
     * @param timeout 超时时间
     * @param unit 单位
     * @return
     */
    @Override
    public CompletableFuture<List<SearchResult>> batchFetch(List<SearchResult> searchResults, long timeout, TimeUnit unit) {
        // TODO 待完善
        return null;
    }
}

同步获取页面 html

然后,编辑 SearchResultContentFetcherServiceImpl, 定义一个 syncFetchHtmlContent() 私有方法,以同步阻塞的方式获取指定 url 页面的 html 内容, 代码如下:

@Service
@Slf4j
public class SearchResultContentFetcherServiceImpl implements SearchResultContentFetcherService {

    @Resource
    private OkHttpClient okHttpClient;

    @Override
    public CompletableFuture<List<SearchResult>> batchFetch(List<SearchResult> searchResults, long timeout, TimeUnit unit) {
        // TODO
        return null;
    }

    /**
     * 同步获取指定 URL 的 HTML 内容
     * @param url
     * @return
     */
    private String syncFetchHtmlContent(String url) {
        // 构建 HTTP GET 请求
        Request request = new Request.Builder()
                .url(url)  // 设置要访问的目标 URL
                .header("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36")  // 设置浏览器标识,模拟真实浏览器访问
                .header("Accept", "text/html")  // 指定接受 HTML 格式的响应
                .build();

        try (Response response = okHttpClient.newCall(request).execute()) {  // 执行请求并自动关闭响应资源
            // 检查响应状态和内容
            if (!response.isSuccessful() || response.body() == null) {  // 响应失败或响应体为空
                return "";  // 返回空字符串
            }

            // 读取响应体内容并返回
            return response.body().string();
        } catch (IOException e) {  // 捕获网络IO异常
            return "";  // 异常时返回空字符串
        }
    }
}

Future 异步处理

编写好同步获取页面内容的方法后,再定义一个 asynFetchContentForResult() 方法,通过 CompletableFuture 来做异步处理,同时指定超时时间,以及异常处理,若执行超时,比如超过 7s 还没拿到页面内容,或者请求发生了异常,则直接返回一个 “空字符串内容” 的结果,代码如下:

@Service
@Slf4j
public class SearchResultContentFetcherServiceImpl implements SearchResultContentFetcherService {

	// 省略...
	
    @Resource(name = "httpRequestExecutor")
    private ThreadPoolTaskExecutor httpExecutor;

	// 省略...

    /**
     * 异步获取单个 SearchResult 对象对应的页面内容
     * @param result
     * @param timeout 超时时间
     * @param unit 时间单位
     * @return
     */
    private CompletableFuture<SearchResult> asynFetchContentForResult(
            SearchResult result,
            long timeout,
            TimeUnit unit) {

        // 异步线程处理
        return CompletableFuture.supplyAsync(() -> {
                    // 获取 HTML 内容
                    String html = syncFetchHtmlContent(result.getUrl());

                    return SearchResult.builder()
                            .url(result.getUrl())
                            .score(result.getScore())
                            .content(html)
                            .build();

                }, httpExecutor) // 使用专用的 httpExecutor 线程池
                // 超时处理
                .completeOnTimeout(createFallbackResult(result), timeout, unit)
                // 异常处理
                .exceptionally(e -> {
                    // 记录错误日志
                    log.error("## 获取页面内容异常, URL: {}", result.getUrl(), e);
                    return createFallbackResult(result);
                });
    }

    /**
     * 创建回退结果(请求失败时使用)
     */
    private SearchResult createFallbackResult(SearchResult searchResult) {
        return SearchResult.builder()
                .url(searchResult.getUrl())
                .score(searchResult.getScore())
                .content("") // 空字符串表示获取页面内容失败
                .build();
    }

    // 省略...
}

批量获取

上面的代码,还只是针对单个页面 url ,为了能够提高接口的响应速度,我们需要批量的、并发的对多个页面 url 做处理。补充 batchFetch() 方法中的逻辑,添加代码如下:

@Service
@Slf4j
public class SearchResultContentFetcherServiceImpl implements SearchResultContentFetcherService {

    // 省略...
    
    @Resource(name = "resultProcessingExecutor")
    private ThreadPoolTaskExecutor processingExecutor;

    /**
     * 并发批量获取搜索结果页面的内容
     *
     * @param searchResults
     * @param timeout 超时时间
     * @param unit 单位
     * @return
     */
    @Override
    public CompletableFuture<List<SearchResult>> batchFetch(List<SearchResult> searchResults, long timeout, TimeUnit unit) {
        // 步骤1:为每个搜索结果创建独立的异步获取任务
        List<CompletableFuture<SearchResult>> futures = searchResults.stream()
                .map(result -> asynFetchContentForResult(result, timeout, unit))
                .toList();

        // 步骤2:合并所有独立任务为一个聚合任务
        CompletableFuture<Void> allFutures = CompletableFuture.allOf(
                futures.toArray(new CompletableFuture[0])
        );

        // 步骤3:当所有任务完成后收集结果
        return allFutures.thenApplyAsync(v -> // 所有任务完成后触发
                        futures.stream() // 遍历所有已完成的任务
                                .map(CompletableFuture::join) // 提取每个任务的结果
                                .collect(Collectors.toList()), // 合并所有结果为一个集合,并返回
                processingExecutor // 使用专用的 processingExecutor 线程池
        );
    }

    // 省略...
}

解释一下上述代码的流程:

  • 针对入参 searchResults 集合, 为每个搜索结果创建独立的异步获取任务;
  • 将这一批独立任务,合并为一个聚合任务;
  • 使用专用的 processingExecutor 线程池,当所有任务完成后,收集它们的结果并返回;

添加测试接口

完成上述步骤后,在 /controller 包下,新建一个 NetworkSearchController 控制器,声明一个 /network/test 测试接口,代码如下:

@RestController
@RequestMapping("/network")
public class NetworkSearchController {

    @Resource
    private SearXNGService searXNGService;
    @Resource
    private SearchResultContentFetcherService searchResultContentFetcherService;

    /**
     * 测试
     * @param message
     * @return
     */
    @GetMapping(value = "/test")
    public List<SearchResult> generateStream(@RequestParam(value = "message") String message) {
        // 调用 SearXNG 获取搜索结果
        List<SearchResult> searchResults = searXNGService.search(message);

        // 并发请求,获取搜索结果页面的内容
        CompletableFuture<List<SearchResult>> resultsFuture = searchResultContentFetcherService.batchFetch(searchResults, 7, TimeUnit.SECONDS);

        List<SearchResult> searchResultList = resultsFuture.join();

        // TODO 后续处理

        return searchResultList;
    }

}

测试

最后,我们来测试一下上述代码,看看功能是否正常。先编辑 application.yml, 将 searxng.count 值修改为 3,减少从搜索结果中提取的最大数量,目的是为了测试时,更方便的观察返参数量是否对应的上:

重启后端项目,在浏览器中请求 http://localhost:8090/network/test?message=小马教学

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐