【无标题】
handleFailure(hasFailed, failureException, new UdpNotifyAppException("任务被中断", e));log.warn("线程池未能在5秒内关闭,将强制中断正在执行的任务...");// compareAndSet:保证只有第一个失败的任务会设置异常(避免后续异常覆盖)* 处理任务失败:标记信号量+存储异常(确保只记录第一个失败的异常)
·
https://blog.csdn.net/qq_52592968/article/details/155103993?spm=1001.2014.3001.5501
基于我的上一篇文章 进行了部分改进
/**
* 首先 并行处理的数据必须保存在一个事务之内,会基于Spring进行管理
*/
@Transactional
public Long save(Request request, Long masterId) {
// 1. 保存/更新主表
Long id = tbDomainService.saveOrUpdateMain(mainEntity);
// 2. 批量保存所有明细表
batchSaveAllDetails(request,id);
}
public void batchSaveAllDetails(Request request,Long id) {
// 失败信号量:原子类保证线程安全,标记是否有任务失败
AtomicBoolean hasFailed = new AtomicBoolean(false);
// 存储异常信息:捕获第一个失败的异常(避免后续异常覆盖)
AtomicReference<Throwable> failureException = new AtomicReference<>();
RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes(); // 捕获上下文
List<CompletableFuture<Void>> futures = new ArrayList<>();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
10, 20, 10, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(200),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
// 并行处理各个明细表
try {
List<Runnable> businessTasks = Arrays.asList(
() -> batchPlanDetails(request, id),
() -> batchDetails(request, id)
);
businessTasks.forEach(task ->
futures.add(wrapTask(executor, hasFailed, failureException, requestAttributes, task))
);
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
// 检查是否有任务失败,有则抛出异常触发事务回滚
if (hasFailed.get()) {
Throwable cause = failureException.get();
if (cause instanceof UdpNotifyAppException) {
throw (UdpNotifyAppException) cause;
} else {
throw new UdpNotifyAppException("明细表处理失败", cause);
}
}
} finally {
executor.shutdown();
try {
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
log.warn("线程池未能在5秒内关闭,将强制中断正在执行的任务...");
executor.shutdownNow();
}
} catch (InterruptedException e) {
log.error("线程池关闭过程中被中断", e);
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
/**
* @param hasFailed 失败标记
* @param failureException 异常存储
* @param requestAttributes 主线程请求上下文
* @param businessTask 实际要执行的业务逻辑(Runnable 承载)
* @return 包装后的 CompletableFuture 任务
*/
private CompletableFuture<Void> wrapTask(ExecutorService executor,
AtomicBoolean hasFailed,
AtomicReference<Throwable> failureException,
RequestAttributes requestAttributes,
Runnable businessTask) {
return CompletableFuture.runAsync(() -> {
if (hasFailed.get()) {
log.debug("已有任务失败,当前任务跳过执行");
return;
}
RequestContextHolder.setRequestAttributes(requestAttributes);
try {
checkInterrupted();
businessTask.run();
checkInterrupted();
} catch (InterruptedException e) {
handleFailure(hasFailed, failureException, new UdpNotifyAppException("任务被中断", e));
Thread.currentThread().interrupt();
} catch (Throwable t) {
handleFailure(hasFailed, failureException, t);
} finally {
RequestContextHolder.resetRequestAttributes();
}
}, executor); // 使用传入的线程池
}
/** 检查线程是否被中断 */
private void checkInterrupted() throws InterruptedException {
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("任务被中断,停止执行");
}
}
/**
* 处理任务失败:标记信号量+存储异常(确保只记录第一个失败的异常)
*/
private void handleFailure(AtomicBoolean hasFailed, AtomicReference<Throwable> failureException, Throwable t) {
// compareAndSet:保证只有第一个失败的任务会设置异常(避免后续异常覆盖)
if (hasFailed.compareAndSet(false, true)) {
failureException.set(t);
}
// 抛出异常,让 CompletableFuture 标记为失败状态(不影响信号量逻辑)
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
} else {
throw new RuntimeException(t);
}
}
基于上一篇文字,添加了方法内线程池,这是要应用到生产环境的代码,所以没有创建公共线程。
以及优化了并行处理方案,我只写了两个,其实我实际应用到了10个明细并行处理
如果缺点,请指正
更多推荐

所有评论(0)