threadpoolexecutor 的基础构造方法如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
参数作用注释:
corePoolSize核心线程池大小
maximumPoolSize最大线程池大小
keepAliveTime线程池中超过corePoolSize数目的空闲线程最大存活时间;可以allowCoreThreadTimeOut(true)使得核心线程有效时间
TimeUnitkeepAliveTime时间单位
workQueue阻塞任务队列
threadFactory新建线程工厂
RejectedExecutionHandler当提交任务数超过maxmumPoolSize+workQueue之和时,任务会交给RejectedExecutionHandler来处理
大致的流程如下:
1.当线程池小于corePoolSize时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线程。
2.当线程池达到corePoolSize时,新提交任务将被放入workQueue中,等待线程池中任务调度执行
3.当workQueue已满,且maximumPoolSize>corePoolSize时,新提交任务会创建新线程执行任务
4.当提交任务数超过maximumPoolSize时,新提交任务由RejectedExecutionHandler处理
5.当线程池中超过corePoolSize线程,空闲时间达到keepAliveTime时,关闭空闲线程
6.当设置allowCoreThreadTimeOut(true)时,线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭
jdk提供的RejectedExecutionHandler主要包含以下几个策略:
1 abort 抛出的RejectedExecutionException
2 discardOldest 不能执行的任务会删除任务dui'l队列中最先添加进来的任务
3 discard 直接丢弃
4 callrunsPolicy 使用者运行,既不会丢弃哪个任务,也不会抛出任何异常,把一些任务推回到调用者那里,以此减缓新任务流。它不会在池线程中执行最新提交的任务,但它会在一个调用了execute的线程中执行
接下来看threadpoolexecutor的具体实现:
execute
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { //小于corepoolSize
if (addWorker(command, true)) //新生成的线程添加到corePool
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { //添加成功进入队列
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command)) //检查是否还存活
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false); //不再添加worker
}
else if (!addWorker(command, false)) //往maxpool中添加线程
reject(command); //添加失败执行拒绝策略
}
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess(); //检查线程权限
advanceRunState(SHUTDOWN);//自旋锁设置线程的状态
interruptIdleWorkers();//中断所有正在的执行的线程
onShutdown(); // 监听的shutdown的线程
} finally {
mainLock.unlock();
}
tryTerminate();//尝试停止executor
}
shutdown 不再接受新的任务
shutdownnow()
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP); //设置为stop状态
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
主要所做的事情就是切换ThreadPoolExecutor到STOP状态,中断所有worker,并将任务队列中的任务取出来,不再执行。最后尝试修改状态到TERMINATED。
shutdown()和shutdownNow()的区别:
shutdown()新的任务不会再被提交到线程池,但之前的都会依旧执行,通过中断方式停止空闲的(根据没有获取锁来确定)线程。
shutdownNow()则向所有正在执行的线程发出中断信号以尝试终止线程,并将工作队列中的任务以列表方式的结果返回。
一个要将线程池推到SHUTDOWN状态,一个将推到STOP状态
并且对运行的线程处理方式不同,shutdown()只中断空闲线程,而shutdownNow()会尝试中断所有活动线程
还有就是对队列中的任务处理,shutdown()队列中已有任务会继续执行,而shutdownNow()会直接取出不被执行
相同的是都在最后尝试将线程池推到TERMINATED状态。
创建线程池的方法 与对比
BlockingQueue<Runnable> queue=new ArrayBlockingQueue<>(10);
ThreadPoolExecutor executor=new ThreadPoolExecutor(5,10,10, TimeUnit.SECONDS,queue);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
cachePool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
1 这种线程池内部没有核心线程,线程的数量是有没限制的。
2 在创建任务时,若有空闲的线程时则复用空闲的线程,若没有则新建线程。
3 没有工作的线程(闲置状态)在超过了60S还不做事,就会销毁。
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
该线程池的最大线程数等于核心线程数,所以在默认情况下,该线程池的线程不会因为闲置状态超时而被销毁。
如果当前线程数小于核心线程数,并且也有闲置线程的时候提交了任务,这时也不会去复用之前的闲置线程,会创建新的线程去执行任务。如果当前执行任务数大于了核心线程数,大于的部分就会进入队列等待。等着有闲置的线程来执行这个任务。
SingleThreadPool
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
有且仅有一个工作线程执行任务
所有任务按照指定顺序执行,即遵循队列的入队出队规则
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
//ScheduledThreadPoolExecutor():
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
根据源码可以看出:
DEFAULT_KEEPALIVE_MILLIS就是默认10L,这里就是10秒。这个线程池有点像是吧CachedThreadPool和FixedThreadPool 结合了一下。
不仅设置了核心线程数,最大线程数也是Integer.MAX_VALUE。
这个线程池是上述4个中为唯一个有延迟执行和周期执行任务的线程池。
ThreadPool 关闭的标准代码:
public static void shutdownAndAwaitTermination(ExecutorService pool) {
pool.shutdown(); // Disable new tasks from being submitted 关闭的task的提交
try {
// Wait a while for existing tasks to terminate
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { //等待的60秒运行结束
pool.shutdownNow(); // Cancel currently executing tasks //关闭任务
// Wait a while for tasks to respond to being cancelled
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) //是否terminal
System.err.println("Pool did not terminate");
}
} catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted
pool.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}
本文由 妖言君 创作,采用 知识共享署名4.0 国际许可协议进行许可
本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名
最后编辑时间为: Feb 26, 2021 at 11:31 am