当前位置 博文首页 > 文章内容

    阳阳的博客:还好我接住了面试官对线程池的夺命连环问

    作者:shunshunshun18 栏目:未分类 时间:2021-11-26 13:21:57

    本站于2023年9月4日。收到“大连君*****咨询有限公司”通知
    说我们IIS7站长博客,有一篇博文用了他们的图片。
    要求我们给他们一张图片6000元。要不然法院告我们

    为避免不必要的麻烦,IIS7站长博客,全站内容图片下架、并积极应诉
    博文内容全部不再显示,请需要相关资讯的站长朋友到必应搜索。谢谢!

    另祝:版权碰瓷诈骗团伙,早日弃暗投明。

    相关新闻:借版权之名、行诈骗之实,周某因犯诈骗罪被判处有期徒刑十一年六个月

    叹!百花齐放的时代,渐行渐远!



    1、为什么要使用线程池

    频繁地创建与销毁线程,会给系统带来额外的开销。倘若可以集中化管理与复用线程,将大大地提升系统的吞吐量。

    线程池基于一种“池化”思想,不仅可以提供复用线程的能力,也能提供约束线程并行执行的数量、定时或延时执行等高级功能。


    2、说说线程池的类图结构

    线程池相关的类图结构如下:

    •  Executor,顶层接口,内部就一个execute抽象方法,定义了线程池最根本的动作,即执行任务
    • ExecutorService,提供终止任务以及获取返回结果的submit抽象方法
    • AbstractExecutorService,提供执行任务的一个框架,具体的执行方法需要由子类实现
    • ScheduledExecutorService,提供定时或延时执行任务的抽象方法
    • ThreadPoolExecutor,最复杂的部分,同时维护任务与线程的关系
    • ScheduledThreadPoolExecutor,提供定时或延时执行任务的功能
    • ForkJoinPool,jdk7中出现的一种新的线程池,基于先拆再合的思想。java8中的parallelStream内部使用的线程池就是它。对ForkJoinPool不熟悉的同学,可以移步到我的另外一篇文章中谈谈并行流parallelStream
    • Executors,可以理解为线程池工具类或工厂类,用于生产不同类型的线程池

     3、说说线程池的核心参数

    这些核心参数位于ThreadPoolExecutor的构造方法中:

        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler)
    • corePoolSize               核心线程数,或者说常驻线程数,线程池中最少线程数
    • maximumPoolSize      最大线程数
    • keepAliveTime             空闲线程的存活时间,线程池中当前线程数大于corePoolSize时,那些空闲时间达到keepAliveTime的空闲线程,它们将会被销毁掉
    • TimeUnit                       keepAliveTime的时间单位
    • workQueue                   任务队列,存放未被执行的任务
    • threadFactory               创建线程的工厂
    • handler                          拒绝策略,当前线程数≥最大线程数且任务队列满的时候,对后续任务的拒绝方式

    4、线程池的种类有哪些

    不同的线程池有不同的适用场景,本质上都是在Executors类中实例化一个ThreadPoolExecutor对象,只是传入的参数不一样罢了。

    线程池的种类有以下几种:

    newFixedThreadPool

        public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }

    创建一个固定大小的线程池,即核心线程数等于最大线程数,每个线程的存活时间和线程池的寿命一致,线程池满负荷运作时,多余的任务会加入到无界的阻塞队列中,newFixedThreadPool可以很好的控制线程的并发量

    newCachedThreadPool

        public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }

    创建一个可以无限扩大的线程池,当任务来临时,有空闲线程就去执行,否则立即创建一个线程。当线程的空闲时间超过1分钟时,销毁该线程。适用于执行任务较少且需要快速执行的场景,即短期异步任务。

    newSingleThreadExecutor

        public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }

    创建一个大小为1的线程池,用于顺序执行任务

    newScheduledThreadPool

        public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
            return new ScheduledThreadPoolExecutor(corePoolSize);
        }
    
        public ScheduledThreadPoolExecutor(int corePoolSize) {
            super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
                  new DelayedWorkQueue());
        }

    创建一个初始大小为corePoolSize的线程池,线程池的存活时间没有限制,newScheduledThreadPool中的schedule方法用于延时执行任务,scheduleAtFixedRate用于周期性地执行任务


    5、线程池执行任务的流程是怎么样的

    • 当线程池中线程数小于corePoolSize时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线程。

    • 当线程池中线程数达到corePoolSize时,新提交任务将被放入workQueue中,等待线程池中任务调度执行 。

    • 当workQueue已满,且maximumPoolSize > corePoolSize时,新提交任务会创建新线程执行任务。

    • 当workQueue已满,且提交任务数超过maximumPoolSize,任务由RejectedExecutionHandler处理。

    • 当线程池中线程数超过corePoolSize,且超过这部分的空闲时间达到keepAliveTime时,回收这些线程。

    • 当设置allowCoreThreadTimeOut(true)时,线程池中corePoolSize范围内的线程空闲时间达到keepAliveTime也将回收

    使用更加直观的流程图来描述:

    注:此章节参考通俗易懂,各常用线程池执行的-流程图


    6、工作队列有哪些

    工作队列用来存储提交的任务,工作队列一般使用的都是阻塞队列。阻塞队列可以保证任务队列中没有任务时阻塞获取任务的线程,使得线程进入wait状态,释放cpu资源。当队列中有任务时才唤醒对应线程从队列中取出消息进行执行。

    阻塞队列一般由以下几种:

    LinkedBlockingQueue  

    由单链表实现的无界阻塞队列,遵循FIFO。注意这里的无界是因为其记录队列大小的数据类型是int,那么队列长度的最大值就是恐怖的Integer.MAX_VALUE,这个值已经很大了,因此可以将之称为无界队列。不过该队列也提供了有参构造函数,可以手动指定其队列大小,否则使用默认的int最大值。

    LinkedBlockingQueue只能从head取元素,从tail添加元素。添加元素和获取元素都有独立的锁,也就是说它是读写分离的,读写操作可以并行执行。LinkedBlockingQueue采用可重入锁(ReentrantLock)来保证在并发情况下的线程安全。

    当线程数目达到corePoolSize时,后续的任务会直接加入到LinkedBlockingQueue中,在不指定其队列大小的情况下,该队列永远也不会满,可能内存满了,队列都不会满,此时maximumPoolSize和拒绝策略将不会有任何意义

    ArrayBlockingQueue

    由数组实现的有界阻塞队列,同样遵循FIFO,必须制定队列大小。使用全局独占锁的方式,使得在同一时间只有一个线程能执行入队或出队操作,相比于LinkedBlockingQueue,ArrayBlockingQueue锁的力度很大。

    SynchronousQueue

    是一个没有容量的队列,当然也可以称为单元素队列。会将任务直接传递给消费者,添加任务时,必须等待前一个被添加的任务被消费掉,即take动作等待put动作,put动作等待take动作,put与take是循环往复的

    如果线程拒绝执行该队列中的任务,或者说没有线程来执行。那么旧任务无法被执行,新任务也无法被添加,线程池将陷入一种尴尬的境地。因此,该队列一般需要maximumPoolSize为Integer.MAX_VALUE,有一个任务到来,就立马新起一个线程执行,newCachedThreadPool就是使用的这种组合。

    关于这些阻塞队列的源码解析,可能需要另开篇幅。


    7、为什么阿里巴巴开发手册不建议使用Executors去创建线程池?

    手册中这样写道:

    【强制】线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

    回答这个问题,需要清楚不同类型线程池所用的工作队列以及最大线程数。

    newFixedThreadPool与newSingleThreadExecutor直接使用的LinkedBlockingQueue ,并且没有声明大小,因此是一种无界阻塞队列。当不停地往线程池中提交任务时,会在队列中堆积无数的任务,可能会造成OOM。

    newCachedThreadPool的最大线程数为Integer.MAX_VALUE,如果突然涌入大量的任务,将会瞬间创建大量的线程,也可能会造成OOM。


    8、说说线程工厂

    先看一下,ThreadPoolExecutor构造方法中默认使用的线程工厂

        static class DefaultThreadFactory implements ThreadFactory {
            private static final AtomicInteger poolNumber = new AtomicInteger(1);
            private final ThreadGroup group;
            private final AtomicInteger threadNumber = new AtomicInteger(1);
            private final String namePrefix;
    
            DefaultThreadFactory() {
                SecurityManager s = System.getSecurityManager();
                group = (s != null) ? s.getThreadGroup() :
                                      Thread.currentThread().getThreadGroup();
                namePrefix = "pool-" +
                              poolNumber.getAndIncrement() +
                             "-thread-";
            }
    
            public Thread newThread(Runnable r) {
                Thread t = new Thread(group, r,
                                      namePrefix + threadNumber.getAndIncrement(),
                                      0);
                if (t.isDaemon())
                    t.setDaemon(false);
                if (t.getPriority() != Thread.NORM_PRIORITY)
                    t.setPriority(Thread.NORM_PRIORITY);
                return t;
            }
        }

    defaultThreadFactory对于线程的命名方式为“pool-”+pool的自增序号+"-thread-"+线程的自增序号。

    默认线程工厂给线程的取名没有太多的意义,在实际开发中,我们一般会给线程取个比较有识别度的名称,方便出现问题时的排查。


    9、拒绝策略有哪些

    如果当工作队列已满,且线程数目达到maximumPoolSize后,依然有任务到来,那么此时线程池就会采取拒绝策略。

    ThreadPoolExecutor中提供了4种拒绝策略。

    AbortPolicy

         private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();   
    
         public static class AbortPolicy implements RejectedExecutionHandler {
     
                public AbortPolicy() { }
    
                public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                throw new RejectedExecutionException("Task " + r.toString() +
                                                     " rejected from " +
                                                     e.toString());
                }
        }

    这是线程池的默认拒绝策略,直接会丢弃任务并抛出RejectedExecutionException异常

    DiscardPolicy

        public static class DiscardPolicy implements RejectedExecutionHandler {
    
            public DiscardPolicy() { }
    
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            }
        }

    丢弃后续提交的任务,但不抛出异常。建议在一些无关紧要的场景中使用此拒绝策略,否则无法及时发现系统的异常状态。

    DiscardOldestPolicy

        public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    
            public DiscardOldestPolicy() { }
    
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                    e.getQueue().poll();
                    e.execute(r);
                }
            }
        }

    从源码中可以看到,此拒绝策略会丢弃队列头部的任务,然后将后续提交的任务加入队列中

    CallerRunsPolicy

        public static class CallerRunsPolicy implements RejectedExecutionHandler {
    
            public CallerRunsPolicy() { }
    
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                    r.run();
                }
            }
        }

    由调用线程执行该任务,即提交任务的线程,一般是主线程。


    10、如何配置核心线程数

    CPU密集型任务

    CPU密集指的是需要进行大量的运算,例如排序,一般没有什么阻塞。

    尽量使用较小的线程池,大小一般为CPU核心数+1。因为CPU密集型任务使得CPU使用率很高,若开过多的线程数,会造成CPU过度切换。

    IO密集型任务

    IO密集指的是需要进行大量的IO,例如文件上传与下载、网络请求等。阻塞十分严重,可以挂起被阻塞的线程,开启新的线程干别的事情。

    可以使用稍大的线程池,大小一般为CPU核心数*2。IO密集型任务CPU使用率并不高,因此可以让CPU在等待IO的时候有其他线程去处理别的任务,充分利用CPU时间。

    当然,依据IO密集的程度,可以在两倍的基础上进行相应的扩大与缩小。

    以上只是一个初步的策略,或者说先定一个初始数值,接着需要进行压测,来调整最大线程数。

    压测的同时,可以监控线程池状态,并且动态改变线程池的参数。


    11、submit与execute有什么区别

    先说结论:

    (1)execute没有返回值,而submit可以返回Future,因此可以通过get得到异步执行的结果

    (2)execute方法会打印出异常,但无法捕获该异常;submit通过get方法可以捕获到异常,如果没有调用get方法,则获取不到异常,也不会打印异常

    关于第一点,是大家都知道的。

    下面,我们来实际测试一下第二点:

    线程1直接抛出空指针异常,使用execute方式执行;线程2直接抛出数组越界异常,使用submit方式执行,但没有使用get方法去获取执行结果

        public static void main(String[] args) throws InterruptedException {
            ExecutorService pool = Executors.newFixedThreadPool(2);
    
            Thread t1 = new Thread(() -> {
                throw new NullPointerException();
            });
            pool.execute(t1);
    
            Thread t2 = new Thread(() -> {
                throw new ArrayIndexOutOfBoundsException();
            });
            pool.submit(t2);
    
            pool.shutdown();
        }

    程序运行完可以得到:

    可以发现,只打印出了空指针异常。

    改造一下代码,尝试捕获execute与future.get的异常

        public static void main(String[] args) throws InterruptedException {
            ExecutorService pool = Executors.newFixedThreadPool(2);
    
            Thread t1 = new Thread(() -> {
                throw new NullPointerException();
            });
            try {
                pool.execute(t1);
            } catch (Exception e) {
                System.out.println("捕获到execute异常了");
            }
    
            Thread t2 = new Thread(() -> {
                throw new ArrayIndexOutOfBoundsException();
            });
            Future<?> result = pool.submit(t2);
            try {
                result.get();
            } catch (ExecutionException e) {
                System.out.println("捕获到submit异常了");
            }
    
            pool.shutdown();
        }

    运行结果:

     可以看到,submit提交时,可以捕获到future.get的异常,但还是捕获不到execute中的异常。

    原因在于,execute方式直接向上抛出,并在ThreadGroup.uncaughtException打印出来,之后停止向上抛出,因此不能被外界捕获

    而submit方法,一开始会将异常保存在outcome中,当调用future.get方法时,会将outcome中的异常再抛出来,从而被外界捕获

    另外,线程池中某个线程执行任务出现异常后,线程池会将此线程移除,并重新创建一个新的线程。


    12、怎么去回收核心线程

    线程池构造方法中的keepAliveTime参数,代表非核心线程的存活时间,线程池中当前线程数大于corePoolSize时,那些空闲时间达到keepAliveTime的空闲线程,它们将会被销毁掉,直到线程数等于corePoolSize。

    如果某些时候也想去销毁长时间空闲的核心线程,怎么去做呢?

    ThreadPoolExecutor中提供了allowCoreThreadTimeOut方法,将应用于非核心线程的保活策略也用于核心线程。

        public void allowCoreThreadTimeOut(boolean value) {
            if (value && keepAliveTime <= 0)
                throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
            if (value != allowCoreThreadTimeOut) {
                allowCoreThreadTimeOut = value;
                if (value)
                    interruptIdleWorkers();
            }
        }

    13、怎么进行线程池的预热

    首先需要知道的是,创建一个线程池后,如果没有任务进来的话,线程池是不会去创建线程的。

    如果一开始就有大量的任务涌进来,那么线程池将一直忙于创建核心线程,降低了任务执行的效率。

    那么,线程池存在一种预热机制吗?

    线程池提供了prestartCoreThread方法(仅事先启动一个核心线程)prestartAllCoreThreads(启动所有的核心线程)

        public boolean prestartCoreThread() {
            return workerCountOf(ctl.get()) < corePoolSize &&
                addWorker(null, true);
        }
    
        public int prestartAllCoreThreads() {
            int n = 0;
            while (addWorker(null, true))
                ++n;
            return n;
        }

    这里插一句,prestartAllCoreThreads和allowCoreThreadTimeOut连用,不知道会起到什么意想不到的效果...


    14、怎么监控线程池与动态化线程池参数

    我们要创建什么线程池,其中用到的参数在一创建的时候就定死了。

    有时候,当队列积压较多的任务而这些任务又比较重要的时候,我们希望收到告警,并且自动增大核心线程数以增加处理速度。

    那么首先就需要监控线程池,需要获取到当前队列大小,活跃线程等信息。

    当然,线程池提供了一些方法,例如getQueue().size()可以获取积压在队列中的任务数,getActiveCount()获取活跃线程数等。

    我们可以写一个定时任务,去检查这些参数。如果队列一直积压过度的话,可以暂时增大核心线程数。

    怎么去增大线程数,难不成我先把之前的服务给停了,然后再重新启动,那队列中未执行与正在执行的任务怎么办呢?

    当然,线程池提供了动态修改参数的方法,例如使用setCorePoolSize来修改核心线程数,会覆盖掉之前的核心线程数。


    最后想说的

    以上的知识点,应付面试已经差不多了,但我更希望大家不要浮于表面,最好能深入到线程池源码中来。

    那么对线程池的源码分析,也被列入到今年的博客计划中了!

    cs