java线程池的四种拒绝策略
本文主要介绍了java juc包中提供的四种拒绝策略。先是接受了部分java线程池ThreadPoolExecutor的构造方法,然后以源码和程序示例的方式介绍了四种拒绝策AbortPolicy,CallerRunsPolicy,DiscardPolicy,DiscardOldestPolicy的不同。
基础知识部分
ThreadPoolExecutor类的构造方法
为了让读者更好的理解文中的示例,笔者在讲解拒绝策略之前,列出了Java线程池的基础知识,本部分可跳过。
代码一:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler);
代码一是ThreadPoolExecutor
类的中参数最多的构造方法的方法签名。该方法有7个参数,它们分别是:
int corePoolSize
核心线程数int maximumPoolSize
最大线程数long keepAliveTime
空闲线程等待工作的超时时间的数值TimeUnit unit
空闲线程等待工作的超时时间的单位BlockingQueue<Runnable> workQueue
阻塞队列ThreadFactory threadFactory
线程工厂RejectedExecutionHandler handler
拒绝策略
线程池处理任务的过程
代码二是ThreadPoolExecutor类中execute方法的源码。该方法是我们向线程池中添加任务时调用的方法。
当我们调用execute(Runnable command)
方法向线程池中添加任务时,线程池将依次按照下面几种情况处理:
- 检查线程池中正在运行的线程数是否小于
corePoolSize
,如果小于,则来新建一个线程执行该任务。 - 若前一步中的条件不满足,即正在运行的线程数不小于
corePoolSize
,则尝试将该任务添加到workQueue
中,当线程池中出现空闲的线程时,workQueue
中的任务将会被取出并执行。但是,如果workQueue
已经满了,将会出现添加不成功的情况,这时会执行下一步。 - 若在之前两步中,任务没有执行也没有被成功的放入
workQueue
中,线程池将会尝试创建一个新的线程来执行该任务。若创建失败(可能的原因有线程池中的线程数已经达到了指定的maximumPoolSize
),则会使用指定的拒绝策略来处理。则会调用RejectedExecutionHandler
中的rejectedExecution
方法。
代码二:
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
reject方法源码:
/**
* Invokes the rejected execution handler for the given command.
* Package-protected for use by ScheduledThreadPoolExecutor.
*/
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
为了更直观的感受线程池处理任务的过程,我写了下面这个例子(代码三)。
在这个例子中,我创建了一个核心线程数为3,最大线程数为6,阻塞队列容量为3的线程池。然后向线程池中依次添加9个任务(任务1,任务2……任务9),执行每个任务都需要花三秒的时间,这样保证最后一个任务提交时,第一个任务仍没有执行完。
代码三:
public class ThreadPoolDemo4 {
public static void main(String[] args) {
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(3, 6, 3,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(3));
try {
// 先放入三个任务 这三个任务将直接在常驻线程中运行
for (int i = 1; i <= 3; i++) {
int id = i;
threadPool.execute(() -> {
try {
System.out.println(Thread.currentThread().getName() + " Task " + id + " 开始运行");
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + " Task " + id + " 运行完毕*");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 这三个任务将放入阻塞队列
for (int i = 4; i <= 6; i++) {
int id = i;
threadPool.execute(() -> {
try {
System.out.println(Thread.currentThread().getName() + " Task " + id + " 开始运行");
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + " Task " + id + " 运行完毕*");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 这三个线程将新创建线程来运行
for (int i = 7; i <= 9; i++) {
int id = i;
threadPool.execute(() -> {
try {
System.out.println(Thread.currentThread().getName() + " Task " + id + " 开始运行");
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + " Task " + id + " 运行完毕*");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} finally {
threadPool.shutdown();
}
}
}
程序输出如下:
pool-1-thread-1 Task 1 开始运行
pool-1-thread-2 Task 2 开始运行
pool-1-thread-3 Task 3 开始运行
pool-1-thread-4 Task 7 开始运行
pool-1-thread-5 Task 8 开始运行
pool-1-thread-6 Task 9 开始运行
pool-1-thread-1 Task 1 运行完毕*
pool-1-thread-1 Task 4 开始运行
pool-1-thread-2 Task 2 运行完毕*
pool-1-thread-2 Task 5 开始运行
pool-1-thread-3 Task 3 运行完毕*
pool-1-thread-3 Task 6 开始运行
pool-1-thread-4 Task 7 运行完毕*
pool-1-thread-5 Task 8 运行完毕*
pool-1-thread-6 Task 9 运行完毕*
pool-1-thread-1 Task 4 运行完毕*
pool-1-thread-2 Task 5 运行完毕*
pool-1-thread-3 Task 6 运行完毕*
通过程序输出可以看出:
-
当线程池中已有的线程数小于核心线程数时,线程池将新建线程来执行收到的任务(线程池新建了线程1、线程2、线程3来执行任务1、任务2、任务3)。
-
对于随后收到的任务4、任务5和任务6,由于线程池中正在运行的线程已经达到了核心线程数,所以这三个线程被放进了线程池的阻塞队列(容量为3)中。此时,阻塞队列也已经满了。
-
对于最后收到的任务7、任务8和任务9,线程池创建了新的线程4、线程5和线程6来运行。
-
随后,最先开始运行的任务1、任务2和任务3陆续运行完毕,阻塞队列中的任务就被执行了。
四种拒绝策略
当线程池中正在运行的线程已经达到了指定的最大线程数量maximumPoolSize
且线程池的阻塞队列也已经满了时,向线程池提交任务将触发拒绝处理逻辑。而juc中提供了四种拒绝策略,它们分别是AbortPolicy
,CallerRunsPolicy
,DiscardOldestPolicy
和DiscardPolicy
.
对于这四种拒绝策略,我将使用代码四中的代码模板来演示它们的运行效果。第四行代码用来指定所使用的拒绝策略。在这段程序中,我们先创建了一个核心线程数,最大线程数和阻塞队列大小均为3的线程池。然后依次向线程池中提交7个任务(任务1,任务2……任务7)。由于单个任务执行时间需要3秒,所以当任务7被提交时,之前的任务都没有执行完,线程池中运行的线程数已经达到了指定的最大线程数,且阻塞队列也已经满了,所以将执行拒绝策略中的拒绝处理方法。
代码四:
public class RejectedPolicyDemo {
public static void main(String[] args) {
// 指定所使用的拒绝策略
RejectedExecutionHandler handler = new ThreadPoolExecutor.XXXPolicy();
// 新建线程池 核心线程数3 最大线程数3 阻塞队列容量3
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(3, 3, 3,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(3), handler);
// 依次向线程池中提交7个任务,触发拒绝处理逻辑
try {
for (int i = 1; i <= 7; i++) {
int id = i;
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " 开始执行: 任务" + id);
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 完成: 任务" + id);
});
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} finally {
threadPool.shutdown();
}
}
}
AbortPolicy
终止策略,这是ThreadPoolExecutor
线程池默认的拒绝策略,程序将会抛出RejectedExecutionException
异常。
AbortPolicy
源代码:
/**
* A handler for rejected tasks that throws a
* {@code RejectedExecutionException}.
*/
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
代码四中的程序输出:
pool-1-thread-1 开始执行: 任务1
pool-1-thread-2 开始执行: 任务2
pool-1-thread-3 开始执行: 任务3
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.example.pool.ThreadPoolDemo5$$Lambda$1/1587487668@7c3df479 rejected from java.util.concurrent.ThreadPoolExecutor@7106e68e[Running, pool size = 3, active threads = 3, queued tasks = 3, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at com.example.pool.ThreadPoolDemo5.main(ThreadPoolDemo5.java:19)
pool-1-thread-1 完成: 任务1
pool-1-thread-1 开始执行: 任务4
pool-1-thread-2 完成: 任务2
pool-1-thread-2 开始执行: 任务5
pool-1-thread-3 完成: 任务3
pool-1-thread-3 开始执行: 任务6
pool-1-thread-1 完成: 任务4
pool-1-thread-2 完成: 任务5
pool-1-thread-3 完成: 任务6
CallerRunsPolicy
调用者运行策略,线程池中没办法运行,那么就由提交任务的这个线程运行(哪儿来的回哪儿儿去~)。
CallerRunsPolicy
源码:
/**
* A handler for rejected tasks that runs the rejected task
* directly in the calling thread of the {@code execute} method,
* unless the executor has been shut down, in which case the task
* is discarded.
*/
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
代码四中的程序输出:
pool-1-thread-1 开始执行: 任务1
pool-1-thread-2 开始执行: 任务2
pool-1-thread-3 开始执行: 任务3
main 开始执行: 任务7
pool-1-thread-1 完成: 任务1
pool-1-thread-1 开始执行: 任务4
pool-1-thread-2 完成: 任务2
pool-1-thread-2 开始执行: 任务5
pool-1-thread-3 完成: 任务3
pool-1-thread-3 开始执行: 任务6
main 完成: 任务7
DiscardOldestPolicy
丢弃最早未处理请求策略,丢弃最先进入阻塞队列的任务以腾出空间让新的任务入队列。
DiscardOldestPolicy
源码:
/**
* A handler for rejected tasks that discards the oldest unhandled
* request and then retries {@code execute}, unless the executor
* is shut down, in which case the task is discarded.
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
代码四中程序输出:
pool-1-thread-1 开始执行: 任务1
pool-1-thread-2 开始执行: 任务2
pool-1-thread-3 开始执行: 任务3
pool-1-thread-1 完成: 任务1
pool-1-thread-1 开始执行: 任务5
pool-1-thread-2 完成: 任务2
pool-1-thread-2 开始执行: 任务6
pool-1-thread-3 完成: 任务3
pool-1-thread-3 开始执行: 任务7
pool-1-thread-1 完成: 任务5
pool-1-thread-2 完成: 任务6
pool-1-thread-3 完成: 任务7
DiscardPolicy
丢弃策略,什么都不做,即丢弃新提交的任务。
DiscardPolicy
源码:
/**
* A handler for rejected tasks that silently discards the
* rejected task.
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
代码四中程序输出:
pool-1-thread-1 开始执行: 任务1
pool-1-thread-2 开始执行: 任务2
pool-1-thread-3 开始执行: 任务3
pool-1-thread-1 完成: 任务1
pool-1-thread-1 开始执行: 任务4
pool-1-thread-2 完成: 任务2
pool-1-thread-2 开始执行: 任务5
pool-1-thread-3 完成: 任务3
pool-1-thread-3 开始执行: 任务6
pool-1-thread-1 完成: 任务4
pool-1-thread-2 完成: 任务5
pool-1-thread-3 完成: 任务6
更多推荐
所有评论(0)