java 多线程之threadpoolexecutor
in java with 0 comment

java 多线程之threadpoolexecutor

in java with 0 comment

threadpoolexecutor 的基础构造方法如下:

  public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

参数作用注释:
corePoolSize核心线程池大小
maximumPoolSize最大线程池大小
keepAliveTime线程池中超过corePoolSize数目的空闲线程最大存活时间;可以allowCoreThreadTimeOut(true)使得核心线程有效时间
TimeUnitkeepAliveTime时间单位
workQueue阻塞任务队列
threadFactory新建线程工厂
RejectedExecutionHandler当提交任务数超过maxmumPoolSize+workQueue之和时,任务会交给RejectedExecutionHandler来处理
大致的流程如下:
92ad4409-2ab4-388b-9fb1-9fc4e0d832cd.jpg

1.当线程池小于corePoolSize时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线程。

2.当线程池达到corePoolSize时,新提交任务将被放入workQueue中,等待线程池中任务调度执行

3.当workQueue已满,且maximumPoolSize>corePoolSize时,新提交任务会创建新线程执行任务

4.当提交任务数超过maximumPoolSize时,新提交任务由RejectedExecutionHandler处理

5.当线程池中超过corePoolSize线程,空闲时间达到keepAliveTime时,关闭空闲线程

6.当设置allowCoreThreadTimeOut(true)时,线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭

jdk提供的RejectedExecutionHandler主要包含以下几个策略:

1 abort 抛出的RejectedExecutionException

2 discardOldest 不能执行的任务会删除任务dui'l队列中最先添加进来的任务

3 discard 直接丢弃

4 callrunsPolicy 使用者运行,既不会丢弃哪个任务,也不会抛出任何异常,把一些任务推回到调用者那里,以此减缓新任务流。它不会在池线程中执行最新提交的任务,但它会在一个调用了execute的线程中执行

接下来看threadpoolexecutor的具体实现:

execute

  public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
      
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) { //小于corepoolSize
            if (addWorker(command, true)) //新生成的线程添加到corePool
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) { //添加成功进入队列
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command)) //检查是否还存活
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false); //不再添加worker
        }
        else if (!addWorker(command, false)) //往maxpool中添加线程
            reject(command); //添加失败执行拒绝策略
    }


public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess(); //检查线程权限
        advanceRunState(SHUTDOWN);//自旋锁设置线程的状态
        interruptIdleWorkers();//中断所有正在的执行的线程
        onShutdown(); // 监听的shutdown的线程
    } finally {
        mainLock.unlock();
    }
    tryTerminate();//尝试停止executor
}

shutdown 不再接受新的任务

shutdownnow()

public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP); //设置为stop状态
            interruptWorkers();
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

主要所做的事情就是切换ThreadPoolExecutor到STOP状态,中断所有worker,并将任务队列中的任务取出来,不再执行。最后尝试修改状态到TERMINATED。

shutdown()和shutdownNow()的区别:

shutdown()新的任务不会再被提交到线程池,但之前的都会依旧执行,通过中断方式停止空闲的(根据没有获取锁来确定)线程。

shutdownNow()则向所有正在执行的线程发出中断信号以尝试终止线程,并将工作队列中的任务以列表方式的结果返回。

一个要将线程池推到SHUTDOWN状态,一个将推到STOP状态

并且对运行的线程处理方式不同,shutdown()只中断空闲线程,而shutdownNow()会尝试中断所有活动线程

还有就是对队列中的任务处理,shutdown()队列中已有任务会继续执行,而shutdownNow()会直接取出不被执行

相同的是都在最后尝试将线程池推到TERMINATED状态。

创建线程池的方法 与对比

 BlockingQueue<Runnable> queue=new ArrayBlockingQueue<>(10);
        ThreadPoolExecutor executor=new ThreadPoolExecutor(5,10,10, TimeUnit.SECONDS,queue);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

cachePool

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

1 这种线程池内部没有核心线程,线程的数量是有没限制的。

2 在创建任务时,若有空闲的线程时则复用空闲的线程,若没有则新建线程。

3 没有工作的线程(闲置状态)在超过了60S还不做事,就会销毁。

newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

该线程池的最大线程数等于核心线程数,所以在默认情况下,该线程池的线程不会因为闲置状态超时而被销毁。

如果当前线程数小于核心线程数,并且也有闲置线程的时候提交了任务,这时也不会去复用之前的闲置线程,会创建新的线程去执行任务。如果当前执行任务数大于了核心线程数,大于的部分就会进入队列等待。等着有闲置的线程来执行这个任务。

SingleThreadPool

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

有且仅有一个工作线程执行任务

所有任务按照指定顺序执行,即遵循队列的入队出队规则

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

//ScheduledThreadPoolExecutor():
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
          new DelayedWorkQueue());
}

根据源码可以看出:

DEFAULT_KEEPALIVE_MILLIS就是默认10L,这里就是10秒。这个线程池有点像是吧CachedThreadPool和FixedThreadPool 结合了一下。

不仅设置了核心线程数,最大线程数也是Integer.MAX_VALUE

这个线程池是上述4个中为唯一个有延迟执行和周期执行任务的线程池。

ThreadPool 关闭的标准代码:

public static void shutdownAndAwaitTermination(ExecutorService pool) {
        pool.shutdown(); // Disable new tasks from being submitted 关闭的task的提交
        try {
            // Wait a while for existing tasks to terminate
            if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { //等待的60秒运行结束
                pool.shutdownNow(); // Cancel currently executing tasks //关闭任务
                // Wait a while for tasks to respond to being cancelled
                if (!pool.awaitTermination(60, TimeUnit.SECONDS)) //是否terminal
                    System.err.println("Pool did not terminate");
            }
        } catch (InterruptedException ie) {
            // (Re-)Cancel if current thread also interrupted
            pool.shutdownNow();
            // Preserve interrupt status
            Thread.currentThread().interrupt();
        }
    }

Responses