多线程并发 (七) 线程池

多线程并发 (一) 了解 Java 虚拟机 - JVM 
多线程并发 (二) 了解 Thread
多线程并发 (三) 锁 synchronized、volatile 
多线程并发 (四) 了解原子类 AtomicXX 属性地址偏移量
多线程并发 (五) ReentrantLock 使用和源码 
多线程并发 (六) 了解死锁
多线程并发 (七) 线程池

很高兴坚持学到了多线程并发计划中的最后一个知识点线程池的使用和原理。其实对线程池不陌生,只是简单的会使用,对于具体的功能实现,一直还是没有去看,最近乘着多线程并发的学习,来把线程池给记录下来。

1.线程池引入、优点

如果在一个任务量非常多,但是任务又非常短小,如果我们在处理这种任务时,为每个任务都创建一个线程,这样就会创建并且销毁线程消耗资源、浪费时间。为了充分发挥线程的利用价值,所以在这种情况下线程池是最好的选择。

线程池的优点:

  1. 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  2. 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
  3. 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

2.线程池的使用

  ThreadPoolExecutor threadPoolExecutor =
                new ThreadPoolExecutor(3, 4, 10, TimeUnit.SECONDS,
                        new LinkedTransferQueue<Runnable>());

        for (int i =0;i<20;i++){
            final int finalI = i;
            try {
                threadPoolExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            Thread.sleep(100* finalI);
                            Log.e("wxy","-----" + Thread.currentThread());
                        } catch (InterruptedException e) {

                        }
                    }
                });

            }catch (Exception e){
                Log.e("wxy","--e---");
            }

        }

通过使用的例子,可以看出要了解线程池主要有两个入口、1.构造方法、2.execute() 方法。下面会一一介绍。

3.线程池构造方法参数

 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

各参数含义:

  1. corePoolSize:核心线程数量
  2. maximumPoolSize:最大线程数量
  3. keepAliveTime:当线程池中的线程数量超过corePoolSize的时,如果这时没有新的任务持行,等待keepAliveTime时间之后超过的线程就会销毁

  4. unit:keepAliveTime 时间的单位。有 秒 second 等

  5. workQueue:等待队列,当任务提交时,如果线程池中的线程数量大于等于corePoolSize的时候,把该任务封装成一个Worker对象放入等待队列

  6. threadFactory:用来创建新线程,默认使用Executors.defaultThreadFactory() 来创建线程,线程具有相同的NORM_PRIORITY优先级并且是非守护线程

  7. handler:它是RejectedExecutionHandler类型的变量,表示线程池的饱和策略。如果阻塞队列满了并且没有空闲的线程,这时如果继续提交任务,就需要采取一种策略处理该任务。线程池提供了4种策略:
    1.AbortPolicy:直接抛出异常,这是默认策略;
    2.CallerRunsPolicy:用调用者所在的线程来执行任务;
    3.DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
    4.DiscardPolicy:直接丢弃任务;

这些参数就代表线程池的持行策略、线程池大致的持行策略是:

  1. 有任务提交时先判断当前线程数量 tCountcorePoolSize  直接创建线程提交任务
  2. 如果tCount > corePoolSize 再判断workQueue 是否已经存满,没有存满把当前任务加入等待队列,注意队列有有界队列和无界队列,是情况而定,减少cup消耗。
  3. 如果workQueue 已满再如果 tCount <  maximumPoolSize 创建线程持行任务
  4. 如果 tCount > maximumPoolSize 根据 handler 拒绝策略决定

     过程: corePoolSize ---> workQueue ------>  maximumPoolSize -----> handler 

线程池的持行策略在execute()方法中完全体现, execute代码就是按照这个思路实现的。看execute方法。

4.execute()方法

public void execute(Runnable command) {
        // c 32位 记录线程池状态和线程数量
        int c = ctl.get();
        // 当前线程数量 和 核心数量大小比较
        if (workerCountOf(c) < corePoolSize) {
            // 小于,持行任务
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 否则,提交到 workQueue 队列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 队列满了,判断最大线程数量,超过参数拒绝策略
        else if (!addWorker(command, false))
            reject(command);
    }

这个流程和三中说的一致,里面需要特殊关注的是  int c = ctl.get(); 这个值是 32位 记录线程池状态和线程数量

5. 线程池状态和线程数量

    // 原子类型   
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // Integer.SIZE = 32 整型的位数
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    // 线程池状态
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
   
    // 得到线程数,也就是后29位的数字。 直接跟CAPACITY做一个与操作即可,
      CAPACITY就是的值就 1 << 29 - 1 = 00011111111111111111111111111111。 
      与操作的话前面3位肯定为0,相当于直接取后29位的值
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    // 得到状态,CAPACITY的非操作得到的二进制位11100000000000000000000000000000,
       然后做在一个与操作,相当于直接取前3位的的值
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    // 或操作。相当于更新数量和状态两个操作
    private static int ctlOf(int rs, int wc) { return rs | wc; }

整个线程池几乎全是被这个数控制的,所以要想完全了解线程池,首先要了解一上代码的含义。

  1. 首先,我们知道java中1个整型占4个字节,也就是32位,所以1个整型有32位。
    整型1用二进制表示就是:00000000000000000000000000000001
  2. 整型-1用二进制表示就是:11111111111111111111111111111111(这个是补码,不懂的同学可以看下原码,反码,补码的知识)
  3. 在ThreadPoolExecutor,整型中32位的前3位用来表示线程池状态,后3位表示线程池中有效的线程数。
     
      // 原子类型   
        private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        // Integer.SIZE = 32 整型的位数
        private static final int COUNT_BITS = Integer.SIZE - 3;

    CONT_BITS 就是29

  4. private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    线程池容量大小为 1 << 29 - 1 = 00011111111111111111111111111111(二进制)

  5. private static final int RUNNING    = -1 << COUNT_BITS;

    RUNNING状态 -1 << 29 = 11111111111111111111111111111111 << 29 = 11100000000000000000000000000000(前3位为111)。表示-1的二进制 左移 29位,前3位为111表示线程池状态

  6. private static final int SHUTDOWN   =  0 << COUNT_BITS;

    SHUTDOWN状态 0 << 29 = 00000000000000000000000000000000 << 29 = 00000000000000000000000000000000(前3位为000)

  7. private static final int TERMINATED =  3 << COUNT_BITS;  

    TERMINATED状态 3 << 29 = 00000000000000000000000000000011 << 29 = 01100000000000000000000000000000(前3位为011)

       摘学于:https://fangjian0423.github.io/2016/03/22/java-threadpool-analysis/

了解了线程池的状态和数量之后,看下addWorker方法

6.addWorker方法

addWorker方法有两个参数,1.firstTask参数 用于指定新增的线程执行的第一个任务,2.core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        // 获取运行状态
        int rs = runStateOf(c);
        
        /*
         * 这个if判断
         * 如果rs >= SHUTDOWN,则表示此时不再接收新任务;
         * 接着判断以下3个条件,只要有1个不满足,则返回false:
         * 1. rs == SHUTDOWN,这时表示关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务
         * 2. firsTask为空
         * 3. 阻塞队列不为空
         * 
         * 首先考虑rs == SHUTDOWN的情况
         * 这种情况下不会接受新提交的任务,所以在firstTask不为空的时候会返回false;
         * 然后,如果firstTask为空,并且workQueue也为空,则返回false,
         * 因为队列中已经没有任务了,不需要再添加线程了
         */
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            // 获取线程数
            int wc = workerCountOf(c);
            // 如果wc超过CAPACITY,也就是ctl的低29位的最大值(二进制是29个1),返回false;
            // 这里的core是addWorker方法的第二个参数,如果为true表示根据corePoolSize来比较,
            // 如果为false则根据maximumPoolSize来比较。
            // 
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 尝试增加workerCount,如果成功,则跳出第一个for循环
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // 如果增加workerCount失败,则重新获取ctl的值
            c = ctl.get();  // Re-read ctl
            // 如果当前的运行状态不等于rs,说明状态已被改变,返回第一个for循环继续执行
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 根据firstTask来创建Worker对象
        w = new Worker(firstTask);
        // 每一个Worker对象都会创建一个线程
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                // rs < SHUTDOWN表示是RUNNING状态;
                // 如果rs是RUNNING状态或者rs是SHUTDOWN状态并且firstTask为null,向线程池中添加线程。
                // 因为在SHUTDOWN时不会在添加新的任务,但还是会执行workQueue中的任务
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // workers是一个HashSet
                    workers.add(w);
                    int s = workers.size();
                    // largestPoolSize记录着线程池中出现过的最大线程数量
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 启动线程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

在这个方法中会启动线程,调用start方法,启动之后就调用Worker的run方法,所以先看下Worker类

7.Worker类

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        final Thread thread; 
        Runnable firstTask;      
        volatile long completedTasks;
        Worker(Runnable firstTask) {
            setState(-1); // 修改状态
            this.firstTask = firstTask;
            //传入自己所以线程start后会调用下面的 run 方法
            this.thread = getThreadFactory().newThread(this);
        }
        public void run() {
            runWorker(this);
        }
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
    ..............................................
    }

Worker类是一个继承AQS,实现Runable的类,里面封装了thread 和 runable。

Worker继承了AQS,使用AQS来实现独占锁的功能。为什么不使用ReentrantLock来实现呢?可以看到tryAcquire方法,它是不允许重入的,而ReentrantLock是允许重入的:

  1. lock方法一旦获取了独占锁,表示当前线程正在执行任务中;
  2. 如果正在执行任务,则不应该中断线程;
  3. 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断;
  4. 线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;
  5. 之所以设置为不可重入,是因为我们不希望任务在调用像setCorePoolSize这样的线程池控制方法时重新获取锁。如果使用ReentrantLock,它是可重入的,这样如果在任务中调用了如setCorePoolSize这类线程池控制的方法,会中断正在运行的线程。

所以,Worker继承自AQS,用于判断线程是否空闲以及是否可以被中断。

8.runWorker方法

在addWorker中调用start之后就会调用Worker对象的 run方法,然后就调用了 runWorker 方法

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    // 获取第一个任务
    Runnable task = w.firstTask;
    w.firstTask = null;
    // 允许中断
    w.unlock(); // allow interrupts
    // 是否因为异常退出循环
    boolean completedAbruptly = true;
    try {
        // 如果task为空,则通过getTask来获取任务
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

runWorker方法的执行过程:

  1. while循环不断地通过getTask()方法获取任务;
  2. getTask()方法从阻塞队列中取任务;
  3. 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态;
  4. 调用task.run()执行任务;
  5. 如果task为null则跳出循环,执行processWorkerExit()方法;
  6. runWorker方法执行完毕,也代表着Worker中的run方法执行完毕,销毁线程。

9.getTask方法

private Runnable getTask() {
    // timeOut变量的值表示上次从阻塞队列中取任务时是否超时
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        /*
         * 如果线程池状态rs >= SHUTDOWN,也就是非RUNNING状态,再进行以下判断:
         * 1. rs >= STOP,线程池是否正在stop;
         * 2. 阻塞队列是否为空。
         * 如果以上条件满足,则将workerCount减1并返回null。
         * 因为如果当前线程池状态的值是SHUTDOWN或以上时,不允许再向阻塞队列中添加任务。
         */
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        // timed变量用于判断是否需要进行超时控制。
        // allowCoreThreadTimeOut默认是false,也就是核心线程不允许进行超时;
        // wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量;
        // 对于超过核心线程数量的这些线程,需要进行超时控制
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        
        /*
         * wc > maximumPoolSize的情况是因为可能在此方法执行阶段同时执行了setMaximumPoolSize方法;
         * timed && timedOut 如果为true,表示当前操作需要进行超时控制,并且上次从阻塞队列中获取任务发生了超时
         * 接下来判断,如果有效线程数量大于1,或者阻塞队列是空的,那么尝试将workerCount减1;
         * 如果减1失败,则返回重试。
         * 如果wc == 1时,也就说明当前线程是线程池中唯一的一个线程了。
         */
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            /*
             * 根据timed来判断,如果为true,则通过阻塞队列的poll方法进行超时控制,如果在keepAliveTime时间内没有获取到任务,则返回null;
             * 否则通过take方法,如果这时队列为空,则take方法会阻塞直到队列不为空。
             * 
             */
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            // 如果 r == null,说明已经超时,timedOut设置为true
            timedOut = true;
        } catch (InterruptedException retry) {
            // 如果获取任务时当前线程发生了中断,则设置timedOut为false并返回循环重试
            timedOut = false;
        }
    }
}

10.tryTerminate方法

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        /*
         * 当前线程池的状态为以下几种情况时,直接返回:
         * 1. RUNNING,因为还在运行中,不能停止;
         * 2. TIDYING或TERMINATED,因为线程池中已经没有正在运行的线程了;
         * 3. SHUTDOWN并且等待队列非空,这时要执行完workQueue中的task;
         */
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        // 如果线程数量不为0,则中断一个空闲的工作线程,并返回
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 这里尝试设置状态为TIDYING,如果设置成功,则调用terminated方法
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    // terminated方法默认什么都不做,留给子类实现
                    terminated();
                } finally {
                    // 设置状态为TERMINATED
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

尝试终止线程池。

整个多线程并发涵盖的知识点有了一个总结,从系列一开始到七,大致讲述了整个并发的知识网。对并发到这里就先告一段落。

总的来说也是站在前人文章/书籍的基础上进行学习的,在知识点整理的过程中如有侵犯,请告诉我。因为总的来说是对其他开发者文章/书籍的整理学习。看了大量的文章/书籍,因为有的文章会不够完整,就把看到的学到的有关连的,一点一点的总结。

 

 摘学于:
https://fangjian0423.github.io/2016/03/22/java-threadpool-analysis/
https://www.jianshu.com/p/d2729853c4da

 

  • 2
    点赞
  • 4
    收藏
    觉得还不错? 一键收藏
  • 打赏
    打赏
  • 0
    评论

“相关推荐”对你有帮助么?

  • 非常没帮助
  • 没帮助
  • 一般
  • 有帮助
  • 非常有帮助
提交
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包

打赏作者

WangRain1

你的鼓励将是我创作的最大动力

¥1 ¥2 ¥4 ¥6 ¥10 ¥20
扫码支付:¥1
获取中
扫码支付

您的余额不足,请更换扫码支付或充值

打赏作者

实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值