[图片] 写在前面的话 本次源码阅读的主要类 AbstractExecutorService ThreadPoolExecutor 需要提前了解的相关知识 线程池参数,参考我之前的文章关于线程池参数的理解 位运算 ThreadPoolExecutor使用位运算来做状态标志 Unsafe类的 API BlockQueue ..

线程池源码阅读

写在前面的话

以上的类都在java.util.concurrent包中

AbstractExecutorService

方法名称 功能
invokeAny 执行队列中的所有任务,
当某一个方法完成时则会立即返回,同时取消其余未完成的任务
invokeAll 执行所有任务,知道全部任务都完成时才返回

invokeAny -> doInvokeAny

    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                              boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (tasks == null)
            throw new NullPointerException();
        int ntasks = tasks.size();
        if (ntasks == 0)
            throw new IllegalArgumentException();
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
        ExecutorCompletionService<T> ecs =
            new ExecutorCompletionService<T>(this);

        try {
            ExecutionException ee = null;
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            Iterator<? extends Callable<T>> it = tasks.iterator();

            futures.add(ecs.submit(it.next()));
            --ntasks;
            int active = 1;

            for (;;) {
                Future<T> f = ecs.poll();
		        // #1
                if (f == null) {
		            // #1.1	
                    if (ntasks > 0) {
                        --ntasks;
                        futures.add(ecs.submit(it.next()));
                        ++active;
                    }
		            // #1.2	
                    else if (active == 0)
                        break;
  		            // #1.3	
                    else if (timed) {
                        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                        if (f == null)
                            throw new TimeoutException();
                        nanos = deadline - System.nanoTime();
                    }
	                // #1.4
                    else
                        f = ecs.take();
                }
		        // #2
                if (f != null) {
                    --active;
                    try {
                        return f.get();
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                }
            }

            if (ee == null)
                ee = new ExecutionException();
	        // #3
            throw ee;

        } finally {
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
        }
    }

invokeAll

    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            // #1
            for (Callable<T> t : tasks) {
                RunnableFuture<T> f = newTaskFor(t);
                futures.add(f);
                execute(f);
            }
            // #2
            for (int i = 0, size = futures.size(); i < size; i++) {
                Future<T> f = futures.get(i);
                if (!f.isDone()) {
                    try {
                        f.get();
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    }
                }
            }
            // #3
            done = true;
            return futures;
        } finally {
            if (!done)
                for (int i = 0, size = futures.size(); i < size; i++)
                    futures.get(i).cancel(true);
        }
    }

invokeAll 方法就简单些了,就是单纯的阻塞式的去获取每一个任务的结果然后返回。


ThreadPoolExecutor

主要方法

名称 作用
execute 提交任务
submit 提交任务,同时能够获取执行结果。
该方法在父类AbstractExecutorService
addWorker 启动一个新线程,并且执行任务。
当线程数量大于线程参数的coreSize时就不会再执行了

主要属性

变量名 作用 二进制码
ctl 该变量是一个原子类的 Integer,二进制码一共 32 位,
高 3 位用来标志线程池状态,剩下的 29 为用来记录线程数量
RUNNING 运行中的二进制标志 高 3 位为111
SHUTDOWN 优雅关闭的二进制标志 高 3 位为000
STOP 暴力关闭的二进制标志 高 3 位为001
TIDYING 即将完全关闭的二进制标志 高 3 位为010
TERMINATED 已完全关闭的二进制标志 高 3 位为011
上方表格中,后面的 5 个变量都是用来标志线程池的状态。他们是有顺序的,越往后说明线程池的活跃程度越低。
并且只有当值为负数时线程池才是运行中的状态

复习一下各个状态的含义:

辅助方法

名称 作用
runStateOf 取得 ctl 的高 3 位,即线程池的运行状态
workerCountOf 取得 ctl 的低 29 位,即运行的线程数量
ctlOf 将运行状态和运行线程数量存放在一个变量中

execute 方法解析

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * 1. 如果线程池中的运行线程数小于corePoolSize那么就启动
         * 一个新的线程去执行任务,如果成功那么就退出该方法,不
	 * 功的原因是:处于并发的环境下,在判断的时候线程数小于
	 * corePoolSize,但此时其他地方启动了一个新线程刚好导致
	 * 运行线程数达到corePoolSize,调用addWorker就会失败
	 *
         * 2. 如果任务能存放进队列,那么仍然需要再次检查线程池
         * 状态,因为可能在判断的时候线程池还是运行状态,但是
	 * 进入方法体之后线程池就被关闭了,所以需要再检查一下,
	 * 并且在必要时移除任务或者启动新的线程
         *
         * 3. 如果我们不能将任务放进队列中,说明队列已经满了,
	 * 我们就会尝试新启动一个线程(此时启动的线程就是由
	 * corePoolSize增加到maxPoolSize的过程)。如果启动失败
	 * 则拒绝该任务(启动失败的原因可能是线程数已经大于
	 * maxPoolSize)
         */
        int c = ctl.get();
	// #1
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
	// #2
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
	// #3
        else if (!addWorker(command, false))
            reject(command);
    }

简单来说execute方法主要判断是启动线程执行任务还是将任务放进待执行队列
任务执行的调用主要还是靠addWorker方法来完成

addWorker 方法解析

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // #1
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
	    // #2
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
	// #3
        try {
	    // #3.1
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());
                    // #3.2
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
		// #3.3
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

该方法是用来创建,运行,清理线程的。其两个参数的作用:


执行流程

  1. 首先执行#1
    • 检查一下线程池的状态,如果是 STOP,TIDYING,TERMINATED 状态的话,则直接返回 false 表明启动线程失败。
    • 如果现在状态是 SHUTDOWN,但是 firstTask 不为空或者 workQueue 为空的话,那么也直接返回 false。因为SHUTDOWN状态允许还在运行中的任务继续执行,但是若还想启动线程并且携带一个任务那就不允许了
  2. 接着执行#2
    • 检查线程数量是否太多了,如果过多则直接返回 false
    • 如果线程数量还允许继续增加,那么使用 CAS 添加线程数,添加成功则跳出大循环去执行#3
    • 添加失败了,那就在判断一下线程池的状态和之前是否相同,不同的话说明出现了一点点小问题,那么就从头再来,继续执行#1
  3. 兜兜转转一圈终于申请到了可以添加线程的权限,接下来执行#3,进行真正的创建线程
    • #3.1 创建一个 workerworker内部创建了线程
    • #3.2 检查线程池状态,只有当以下两种情况时才可能说明创建线程真正的成功
      • 情况 1:线程池是 RUNNING
      • 情况 2:线程池是 SHUTDOWN 但是没有携带任务
    • #3.3 上面的 2 个步骤都通过后接着启动线程!终于启动了

主流程大体就是这样,还需要去看一看 Worker 的实现才行

打赏 1 积分后可见
1 积分
  • Java

    Java 是一种可以撰写跨平台应用软件的面向对象的程序设计语言,是由 Sun Microsystems 公司于 1995 年 5 月推出的。Java 技术具有卓越的通用性、高效性、平台移植性和安全性。

    2468 引用 • 7877 回帖 • 864 关注
  • 代码
    236 引用 • 462 回帖 • 3 关注
1 操作
614756773 在 2019-12-04 15:07:15 更新了该帖
回帖
请输入回帖内容...