基础知识部分

ThreadPoolExecutor类的构造方法

为了让读者更好的理解文中的示例,笔者在讲解拒绝策略之前,列出了Java线程池的基础知识,本部分可跳过。

代码一:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler);

代码一是ThreadPoolExecutor类的中参数最多的构造方法的方法签名。该方法有7个参数,它们分别是:

  1. int corePoolSize 核心线程数
  2. int maximumPoolSize 最大线程数
  3. long keepAliveTime 空闲线程等待工作的超时时间的数值
  4. TimeUnit unit 空闲线程等待工作的超时时间的单位
  5. BlockingQueue<Runnable> workQueue 阻塞队列
  6. ThreadFactory threadFactory 线程工厂
  7. RejectedExecutionHandler handler 拒绝策略
线程池处理任务的过程

代码二是ThreadPoolExecutor类中execute方法的源码。该方法是我们向线程池中添加任务时调用的方法。

当我们调用execute(Runnable command)方法向线程池中添加任务时,线程池将依次按照下面几种情况处理:

  1. 检查线程池中正在运行的线程数是否小于corePoolSize,如果小于,则来新建一个线程执行该任务。
  2. 若前一步中的条件不满足,即正在运行的线程数不小于corePoolSize,则尝试将该任务添加到workQueue中,当线程池中出现空闲的线程时,workQueue中的任务将会被取出并执行。但是,如果workQueue已经满了,将会出现添加不成功的情况,这时会执行下一步。
  3. 若在之前两步中,任务没有执行也没有被成功的放入workQueue中,线程池将会尝试创建一个新的线程来执行该任务。若创建失败(可能的原因有线程池中的线程数已经达到了指定的maximumPoolSize),则会使用指定的拒绝策略来处理。则会调用RejectedExecutionHandler中的rejectedExecution方法。

代码二:

 /**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current {@code RejectedExecutionHandler}.
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null
     */ 
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

reject方法源码:

  /**
     * Invokes the rejected execution handler for the given command.
     * Package-protected for use by ScheduledThreadPoolExecutor.
     */
    final void reject(Runnable command) {
        handler.rejectedExecution(command, this);
    }

为了更直观的感受线程池处理任务的过程,我写了下面这个例子(代码三)。

在这个例子中,我创建了一个核心线程数为3,最大线程数为6,阻塞队列容量为3的线程池。然后向线程池中依次添加9个任务(任务1,任务2……任务9),执行每个任务都需要花三秒的时间,这样保证最后一个任务提交时,第一个任务仍没有执行完。

代码三:

public class ThreadPoolDemo4 {
    public static void main(String[] args) {
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(3, 6, 3,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(3));
        try {
            // 先放入三个任务 这三个任务将直接在常驻线程中运行
            for (int i = 1; i <= 3; i++) {
                int id = i;
                threadPool.execute(() -> {
                    try {
                        System.out.println(Thread.currentThread().getName() + " Task " + id + " 开始运行");
                        TimeUnit.SECONDS.sleep(3);
                        System.out.println(Thread.currentThread().getName() + " Task " + id + " 运行完毕*");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
                try {
                    TimeUnit.MILLISECONDS.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            // 这三个任务将放入阻塞队列
            for (int i = 4; i <= 6; i++) {
                int id = i;
                threadPool.execute(() -> {
                    try {
                        System.out.println(Thread.currentThread().getName() + " Task " + id + " 开始运行");
                        TimeUnit.SECONDS.sleep(3);
                        System.out.println(Thread.currentThread().getName() + " Task " + id + " 运行完毕*");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
                try {
                    TimeUnit.MILLISECONDS.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            // 这三个线程将新创建线程来运行
            for (int i = 7; i <= 9; i++) {
                int id = i;
                threadPool.execute(() -> {
                    try {
                        System.out.println(Thread.currentThread().getName() + " Task " + id + " 开始运行");
                        TimeUnit.SECONDS.sleep(3);
                        System.out.println(Thread.currentThread().getName() + " Task " + id + " 运行完毕*");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
                try {
                    TimeUnit.MILLISECONDS.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } finally {
            threadPool.shutdown();
        }
    }
}

程序输出如下:

pool-1-thread-1 Task 1 开始运行
pool-1-thread-2 Task 2 开始运行
pool-1-thread-3 Task 3 开始运行
pool-1-thread-4 Task 7 开始运行
pool-1-thread-5 Task 8 开始运行
pool-1-thread-6 Task 9 开始运行
pool-1-thread-1 Task 1 运行完毕*
pool-1-thread-1 Task 4 开始运行
pool-1-thread-2 Task 2 运行完毕*
pool-1-thread-2 Task 5 开始运行
pool-1-thread-3 Task 3 运行完毕*
pool-1-thread-3 Task 6 开始运行
pool-1-thread-4 Task 7 运行完毕*
pool-1-thread-5 Task 8 运行完毕*
pool-1-thread-6 Task 9 运行完毕*
pool-1-thread-1 Task 4 运行完毕*
pool-1-thread-2 Task 5 运行完毕*
pool-1-thread-3 Task 6 运行完毕*

通过程序输出可以看出:

  1. 当线程池中已有的线程数小于核心线程数时,线程池将新建线程来执行收到的任务(线程池新建了线程1、线程2、线程3来执行任务1、任务2、任务3)。

  2. 对于随后收到的任务4、任务5和任务6,由于线程池中正在运行的线程已经达到了核心线程数,所以这三个线程被放进了线程池的阻塞队列(容量为3)中。此时,阻塞队列也已经满了。

  3. 对于最后收到的任务7、任务8和任务9,线程池创建了新的线程4、线程5和线程6来运行。

  4. 随后,最先开始运行的任务1、任务2和任务3陆续运行完毕,阻塞队列中的任务就被执行了。

四种拒绝策略

当线程池中正在运行的线程已经达到了指定的最大线程数量maximumPoolSize且线程池的阻塞队列也已经满了时,向线程池提交任务将触发拒绝处理逻辑。而juc中提供了四种拒绝策略,它们分别是AbortPolicy,CallerRunsPolicy,DiscardOldestPolicyDiscardPolicy.

对于这四种拒绝策略,我将使用代码四中的代码模板来演示它们的运行效果。第四行代码用来指定所使用的拒绝策略。在这段程序中,我们先创建了一个核心线程数,最大线程数和阻塞队列大小均为3的线程池。然后依次向线程池中提交7个任务(任务1,任务2……任务7)。由于单个任务执行时间需要3秒,所以当任务7被提交时,之前的任务都没有执行完,线程池中运行的线程数已经达到了指定的最大线程数,且阻塞队列也已经满了,所以将执行拒绝策略中的拒绝处理方法。

代码四:

public class RejectedPolicyDemo {
    public static void main(String[] args) {
        // 指定所使用的拒绝策略
        RejectedExecutionHandler handler = new ThreadPoolExecutor.XXXPolicy();
        // 新建线程池 核心线程数3 最大线程数3 阻塞队列容量3
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(3, 3, 3,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(3), handler);
        // 依次向线程池中提交7个任务,触发拒绝处理逻辑
        try {
            for (int i = 1; i <= 7; i++) {
                int id = i;
                threadPool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + " 开始执行: 任务" + id);
                    try {
                        TimeUnit.SECONDS.sleep(3);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + " 完成: 任务" + id);
                });
                try {
                    TimeUnit.MILLISECONDS.sleep(50);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } finally {
            threadPool.shutdown();
        }
    }
}
AbortPolicy

终止策略,这是ThreadPoolExecutor线程池默认的拒绝策略,程序将会抛出RejectedExecutionException异常。

AbortPolicy源代码:

  /**
     * A handler for rejected tasks that throws a
     * {@code RejectedExecutionException}.
     */
    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

代码四中的程序输出:

pool-1-thread-1 开始执行: 任务1
pool-1-thread-2 开始执行: 任务2
pool-1-thread-3 开始执行: 任务3
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.example.pool.ThreadPoolDemo5$$Lambda$1/1587487668@7c3df479 rejected from java.util.concurrent.ThreadPoolExecutor@7106e68e[Running, pool size = 3, active threads = 3, queued tasks = 3, completed tasks = 0]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
	at com.example.pool.ThreadPoolDemo5.main(ThreadPoolDemo5.java:19)
pool-1-thread-1 完成: 任务1
pool-1-thread-1 开始执行: 任务4
pool-1-thread-2 完成: 任务2
pool-1-thread-2 开始执行: 任务5
pool-1-thread-3 完成: 任务3
pool-1-thread-3 开始执行: 任务6
pool-1-thread-1 完成: 任务4
pool-1-thread-2 完成: 任务5
pool-1-thread-3 完成: 任务6
CallerRunsPolicy

调用者运行策略,线程池中没办法运行,那么就由提交任务的这个线程运行(哪儿来的回哪儿儿去~)。

CallerRunsPolicy源码:

  /**
     * A handler for rejected tasks that runs the rejected task
     * directly in the calling thread of the {@code execute} method,
     * unless the executor has been shut down, in which case the task
     * is discarded.
     */
    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() { }

        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

代码四中的程序输出:

pool-1-thread-1 开始执行: 任务1
pool-1-thread-2 开始执行: 任务2
pool-1-thread-3 开始执行: 任务3
main 开始执行: 任务7
pool-1-thread-1 完成: 任务1
pool-1-thread-1 开始执行: 任务4
pool-1-thread-2 完成: 任务2
pool-1-thread-2 开始执行: 任务5
pool-1-thread-3 完成: 任务3
pool-1-thread-3 开始执行: 任务6
main 完成: 任务7
DiscardOldestPolicy

丢弃最早未处理请求策略,丢弃最先进入阻塞队列的任务以腾出空间让新的任务入队列。

DiscardOldestPolicy源码:

 /**
     * A handler for rejected tasks that discards the oldest unhandled
     * request and then retries {@code execute}, unless the executor
     * is shut down, in which case the task is discarded.
     */
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardOldestPolicy} for the given executor.
         */
        public DiscardOldestPolicy() { }

        /**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

代码四中程序输出:

pool-1-thread-1 开始执行: 任务1
pool-1-thread-2 开始执行: 任务2
pool-1-thread-3 开始执行: 任务3
pool-1-thread-1 完成: 任务1
pool-1-thread-1 开始执行: 任务5
pool-1-thread-2 完成: 任务2
pool-1-thread-2 开始执行: 任务6
pool-1-thread-3 完成: 任务3
pool-1-thread-3 开始执行: 任务7
pool-1-thread-1 完成: 任务5
pool-1-thread-2 完成: 任务6
pool-1-thread-3 完成: 任务7
DiscardPolicy

丢弃策略,什么都不做,即丢弃新提交的任务。

DiscardPolicy源码:

 /**
     * A handler for rejected tasks that silently discards the
     * rejected task.
     */
    public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() { }

        /**
         * Does nothing, which has the effect of discarding task r.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

代码四中程序输出:

pool-1-thread-1 开始执行: 任务1
pool-1-thread-2 开始执行: 任务2
pool-1-thread-3 开始执行: 任务3
pool-1-thread-1 完成: 任务1
pool-1-thread-1 开始执行: 任务4
pool-1-thread-2 完成: 任务2
pool-1-thread-2 开始执行: 任务5
pool-1-thread-3 完成: 任务3
pool-1-thread-3 开始执行: 任务6
pool-1-thread-1 完成: 任务4
pool-1-thread-2 完成: 任务5
pool-1-thread-3 完成: 任务6
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐