https://blog.csdn.net/qq_52592968/article/details/155103993?spm=1001.2014.3001.5501

基于我的上一篇文章 进行了部分改进

/**
 * 首先 并行处理的数据必须保存在一个事务之内,会基于Spring进行管理
 */
@Transactional
public Long save(Request request, Long masterId) { 
        // 1. 保存/更新主表
        Long id = tbDomainService.saveOrUpdateMain(mainEntity);
        // 2. 批量保存所有明细表
        batchSaveAllDetails(request,id);
}

public void batchSaveAllDetails(Request request,Long id) {
    // 失败信号量:原子类保证线程安全,标记是否有任务失败
    AtomicBoolean hasFailed = new AtomicBoolean(false);
    // 存储异常信息:捕获第一个失败的异常(避免后续异常覆盖)
    AtomicReference<Throwable> failureException = new AtomicReference<>();
    RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes(); // 捕获上下文
    List<CompletableFuture<Void>> futures = new ArrayList<>();
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
            10, 20, 10, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(200),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy()
    );
    // 并行处理各个明细表
    try {
        List<Runnable> businessTasks = Arrays.asList(
                () -> batchPlanDetails(request, id),
                () -> batchDetails(request, id)
        );
        businessTasks.forEach(task ->
                futures.add(wrapTask(executor, hasFailed, failureException, requestAttributes, task))
        );
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        //  检查是否有任务失败,有则抛出异常触发事务回滚
        if (hasFailed.get()) {
            Throwable cause = failureException.get();
            if (cause instanceof UdpNotifyAppException) {
                throw (UdpNotifyAppException) cause;
            } else {
                throw new UdpNotifyAppException("明细表处理失败", cause);
            }
        }
    } finally {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                log.warn("线程池未能在5秒内关闭,将强制中断正在执行的任务...");
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            log.error("线程池关闭过程中被中断", e);
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

/**
 * @param hasFailed 失败标记
 * @param failureException 异常存储
 * @param requestAttributes 主线程请求上下文
 * @param businessTask 实际要执行的业务逻辑(Runnable 承载)
 * @return 包装后的 CompletableFuture 任务
 */
private CompletableFuture<Void> wrapTask(ExecutorService executor,
                                         AtomicBoolean hasFailed,
                                         AtomicReference<Throwable> failureException,
                                         RequestAttributes requestAttributes,
                                         Runnable businessTask) {
    return CompletableFuture.runAsync(() -> {
        if (hasFailed.get()) {
            log.debug("已有任务失败,当前任务跳过执行");
            return;
        }
        RequestContextHolder.setRequestAttributes(requestAttributes);
        try {
            checkInterrupted();
            businessTask.run();
            checkInterrupted();
        } catch (InterruptedException e) {
            handleFailure(hasFailed, failureException, new UdpNotifyAppException("任务被中断", e));
            Thread.currentThread().interrupt();
        } catch (Throwable t) {
            handleFailure(hasFailed, failureException, t);
        } finally {
            RequestContextHolder.resetRequestAttributes();
        }
    }, executor); // 使用传入的线程池
}

/** 检查线程是否被中断 */
private void checkInterrupted() throws InterruptedException {
    if (Thread.currentThread().isInterrupted()) {
        throw new InterruptedException("任务被中断,停止执行");
    }
}

/**
 * 处理任务失败:标记信号量+存储异常(确保只记录第一个失败的异常)
 */
private void handleFailure(AtomicBoolean hasFailed, AtomicReference<Throwable> failureException, Throwable t) {
    // compareAndSet:保证只有第一个失败的任务会设置异常(避免后续异常覆盖)
    if (hasFailed.compareAndSet(false, true)) {
        failureException.set(t);
    }
    // 抛出异常,让 CompletableFuture 标记为失败状态(不影响信号量逻辑)
    if (t instanceof RuntimeException) {
        throw (RuntimeException) t;
    } else {
        throw new RuntimeException(t);
    }
}

基于上一篇文字,添加了方法内线程池,这是要应用到生产环境的代码,所以没有创建公共线程。

以及优化了并行处理方案,我只写了两个,其实我实际应用到了10个明细并行处理

如果缺点,请指正

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐