Spring AI:自定义线程池 - 通过 CompletableFuture 并发获取搜索结果页面内容
Spring AI:自定义线程池 - 通过 CompletableFuture 并发获取搜索结果页面内容
历史文章
Spring AI:对接DeepSeek实战
Spring AI:对接官方 DeepSeek-R1 模型 —— 实现推理效果
Spring AI:ChatClient实现对话效果
Spring AI:使用 Advisor 组件 - 打印请求大模型出入参日志
Spring AI:ChatMemory 实现聊天记忆功能
Spring AI:本地安装 Ollama 并运行 Qwen3 模型
Spring AI:提示词工程
Spring AI:提示词工程 - Prompt 角色分类(系统角色与用户角色)
Spring AI:基于 “助手角色” 消息实现聊天记忆功能
Spring AI:结构化输出 - 大模型响应内容
Spring AI:Docker 安装 Cassandra 5.x(限制内存占用)&& CQL
Spring AI:整合 Cassandra - 实现聊天消息持久化
Spring AI:多模态 AI 大模型
Spring AI:文生图:调用通义万相 AI 大模型
Spring AI:文生音频 - cosyvoice-V2
Spring AI:文生视频 - wanx2.1-i2v-plus
Spring AI:上手体验工具调用(Tool Calling)
Spring AI:整合 MCP Client - 调用高德地图 MCP 服务
Spring AI:搭建自定义 MCP Server:获取 QQ 信息
Spring AI:对接自定义 MCP Server
Spring AI:RAG 增强检索介绍
Spring AI:Docker 安装向量数据库 - Redis Stack
Spring AI:文档向量化存储与检索
Spring AI:提取 txt、Json、Markdown、Html、Pdf 文件数据,转换为 Document 文档
Spring AI:Apache Tika 读取 Word、PPT 文档
Spring AI:Docker 安装 SearXNG 搜索引擎
Spring AI:整合 OKHttp3:获取 SearXNG 搜索结果
本文中,我们尝试通过并发的方式,请求搜索页面 URL , 并获取到所有页面的内容。
自定义线程池
在 /config 包下,新建一个 ThreadPoolConfig 线程池配置类,分别定义两个线程池,代码如下:
@Configuration
public class ThreadPoolConfig {
/**
* HTTP 请求线程池(IO 密集型任务)
* @return
*/
@Bean("httpRequestExecutor")
public ThreadPoolTaskExecutor httpRequestExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(50); // 核心线程数(保持常驻)
executor.setMaxPoolSize(200); // 最大线程数(突发流量时扩容)
executor.setQueueCapacity(1000); // 任务队列容量(缓冲突发请求)
executor.setKeepAliveSeconds(120); // 空闲线程存活时间(秒)
executor.setThreadNamePrefix("http-fetcher-"); // 线程名前缀(便于监控)
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 拒绝策略(由调用线程执行)
executor.initialize(); // 初始化线程池
return executor;
}
/**
* 结果处理线程池(CPU 密集型任务)
* @return
*/
@Bean("resultProcessingExecutor")
public ThreadPoolTaskExecutor resultProcessingExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(Runtime.getRuntime().availableProcessors()); // 核心线程数(等于CPU核心数)
executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 2); // 最大线程数(不超过CPU核心数2倍)
executor.setQueueCapacity(200); // 较小队列(避免任务堆积)
executor.setThreadNamePrefix("result-processor-"); // 线程名前缀(便于监控)
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); // 拒绝策略(直接抛出异常)
executor.initialize(); // 初始化线程池
return executor;
}
}
解释一下这两个线程池各自的作用:
- httpRequestExecutor : 此线程池用于发送 Http 请求,任务场景属于 IO 密集型,因为 Http 请求涉及网络等待(延迟较高),线程大部分时间处于等待状态,所以设置了较多的线程数,能够提升并发处理性能;
- resultProcessingExecutor : 此线程池用于提取每个请求的结果,任务场景属于是 CPU 密集型,所以,线程数定义的较少,防止 CPU 调度时,上下文切换带来的性能损耗;
定义 service
前置工作完成后,在 /service 包下,新建 SearchResultContentFetcherService 接口,声明一个 batchFetch() 方法,用于批量获取搜索结果页面的内容:
public interface SearchResultContentFetcherService {
/**
* 并发批量获取搜索结果页面的内容
*
* @param searchResults
* @param timeout
* @param unit
* @return
*/
CompletableFuture<List<SearchResult>> batchFetch(List<SearchResult> searchResults,
long timeout,
TimeUnit unit);
}
接着,在 /impl 包下,创建上述接口的具体实现类 SearchResultContentFetcherServiceImpl,里面的逻辑暂时写个 TODO 标识,等会再来补充完整,代码如下:
@Service
@Slf4j
public class SearchResultContentFetcherServiceImpl implements SearchResultContentFetcherService {
/**
* 并发批量获取搜索结果页面的内容
*
* @param searchResults
* @param timeout 超时时间
* @param unit 单位
* @return
*/
@Override
public CompletableFuture<List<SearchResult>> batchFetch(List<SearchResult> searchResults, long timeout, TimeUnit unit) {
// TODO 待完善
return null;
}
}
同步获取页面 html
然后,编辑 SearchResultContentFetcherServiceImpl, 定义一个 syncFetchHtmlContent() 私有方法,以同步阻塞的方式获取指定 url 页面的 html 内容, 代码如下:
@Service
@Slf4j
public class SearchResultContentFetcherServiceImpl implements SearchResultContentFetcherService {
@Resource
private OkHttpClient okHttpClient;
@Override
public CompletableFuture<List<SearchResult>> batchFetch(List<SearchResult> searchResults, long timeout, TimeUnit unit) {
// TODO
return null;
}
/**
* 同步获取指定 URL 的 HTML 内容
* @param url
* @return
*/
private String syncFetchHtmlContent(String url) {
// 构建 HTTP GET 请求
Request request = new Request.Builder()
.url(url) // 设置要访问的目标 URL
.header("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36") // 设置浏览器标识,模拟真实浏览器访问
.header("Accept", "text/html") // 指定接受 HTML 格式的响应
.build();
try (Response response = okHttpClient.newCall(request).execute()) { // 执行请求并自动关闭响应资源
// 检查响应状态和内容
if (!response.isSuccessful() || response.body() == null) { // 响应失败或响应体为空
return ""; // 返回空字符串
}
// 读取响应体内容并返回
return response.body().string();
} catch (IOException e) { // 捕获网络IO异常
return ""; // 异常时返回空字符串
}
}
}
Future 异步处理
编写好同步获取页面内容的方法后,再定义一个 asynFetchContentForResult() 方法,通过 CompletableFuture 来做异步处理,同时指定超时时间,以及异常处理,若执行超时,比如超过 7s 还没拿到页面内容,或者请求发生了异常,则直接返回一个 “空字符串内容” 的结果,代码如下:
@Service
@Slf4j
public class SearchResultContentFetcherServiceImpl implements SearchResultContentFetcherService {
// 省略...
@Resource(name = "httpRequestExecutor")
private ThreadPoolTaskExecutor httpExecutor;
// 省略...
/**
* 异步获取单个 SearchResult 对象对应的页面内容
* @param result
* @param timeout 超时时间
* @param unit 时间单位
* @return
*/
private CompletableFuture<SearchResult> asynFetchContentForResult(
SearchResult result,
long timeout,
TimeUnit unit) {
// 异步线程处理
return CompletableFuture.supplyAsync(() -> {
// 获取 HTML 内容
String html = syncFetchHtmlContent(result.getUrl());
return SearchResult.builder()
.url(result.getUrl())
.score(result.getScore())
.content(html)
.build();
}, httpExecutor) // 使用专用的 httpExecutor 线程池
// 超时处理
.completeOnTimeout(createFallbackResult(result), timeout, unit)
// 异常处理
.exceptionally(e -> {
// 记录错误日志
log.error("## 获取页面内容异常, URL: {}", result.getUrl(), e);
return createFallbackResult(result);
});
}
/**
* 创建回退结果(请求失败时使用)
*/
private SearchResult createFallbackResult(SearchResult searchResult) {
return SearchResult.builder()
.url(searchResult.getUrl())
.score(searchResult.getScore())
.content("") // 空字符串表示获取页面内容失败
.build();
}
// 省略...
}
批量获取
上面的代码,还只是针对单个页面 url ,为了能够提高接口的响应速度,我们需要批量的、并发的对多个页面 url 做处理。补充 batchFetch() 方法中的逻辑,添加代码如下:
@Service
@Slf4j
public class SearchResultContentFetcherServiceImpl implements SearchResultContentFetcherService {
// 省略...
@Resource(name = "resultProcessingExecutor")
private ThreadPoolTaskExecutor processingExecutor;
/**
* 并发批量获取搜索结果页面的内容
*
* @param searchResults
* @param timeout 超时时间
* @param unit 单位
* @return
*/
@Override
public CompletableFuture<List<SearchResult>> batchFetch(List<SearchResult> searchResults, long timeout, TimeUnit unit) {
// 步骤1:为每个搜索结果创建独立的异步获取任务
List<CompletableFuture<SearchResult>> futures = searchResults.stream()
.map(result -> asynFetchContentForResult(result, timeout, unit))
.toList();
// 步骤2:合并所有独立任务为一个聚合任务
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0])
);
// 步骤3:当所有任务完成后收集结果
return allFutures.thenApplyAsync(v -> // 所有任务完成后触发
futures.stream() // 遍历所有已完成的任务
.map(CompletableFuture::join) // 提取每个任务的结果
.collect(Collectors.toList()), // 合并所有结果为一个集合,并返回
processingExecutor // 使用专用的 processingExecutor 线程池
);
}
// 省略...
}
解释一下上述代码的流程:
- 针对入参 searchResults 集合, 为每个搜索结果创建独立的异步获取任务;
- 将这一批独立任务,合并为一个聚合任务;
- 使用专用的 processingExecutor 线程池,当所有任务完成后,收集它们的结果并返回;
添加测试接口
完成上述步骤后,在 /controller 包下,新建一个 NetworkSearchController 控制器,声明一个 /network/test 测试接口,代码如下:
@RestController
@RequestMapping("/network")
public class NetworkSearchController {
@Resource
private SearXNGService searXNGService;
@Resource
private SearchResultContentFetcherService searchResultContentFetcherService;
/**
* 测试
* @param message
* @return
*/
@GetMapping(value = "/test")
public List<SearchResult> generateStream(@RequestParam(value = "message") String message) {
// 调用 SearXNG 获取搜索结果
List<SearchResult> searchResults = searXNGService.search(message);
// 并发请求,获取搜索结果页面的内容
CompletableFuture<List<SearchResult>> resultsFuture = searchResultContentFetcherService.batchFetch(searchResults, 7, TimeUnit.SECONDS);
List<SearchResult> searchResultList = resultsFuture.join();
// TODO 后续处理
return searchResultList;
}
}
测试
最后,我们来测试一下上述代码,看看功能是否正常。先编辑 application.yml, 将 searxng.count 值修改为 3,减少从搜索结果中提取的最大数量,目的是为了测试时,更方便的观察返参数量是否对应的上:
重启后端项目,在浏览器中请求 http://localhost:8090/network/test?message=小马教学
更多推荐
所有评论(0)