2022-05-14 11:59

Java线程池详解

wanmatea

JavaEE

(1166)

(0)

收藏

1.线程池简介

在前面的教程中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题:

在一个应用程序中,我们需要多次使用线程,也就意味着,我们需要多次创建并销毁线程。而创建并销毁线程的过程势必会消耗内存。而在Java中,内存资源是及其宝贵的,所以,我们就提出了线程池的概念。

那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务?

在Java中可以通过线程池来达到这样的效果。

2.线程池好处

1、降低资源的消耗。线程本身是一种资源,创建和销毁线程会有CPU开销;创建的线程也会占用一定的内存。

2、提高任务执行的响应速度。任务执行时,可以不必等到线程创建完之后再执行。

3、提高线程的可管理性。线程不能无限制地创建,需要进行统一的分配、调优和监控。

3.线程池的工作原理

image.png

接收任务,放入线程池的任务仓库

工作线程从线程池的任务仓库取,执行

没有任务时,线程阻塞,有任务时唤醒线程

4.实现线程池的方法

4.1.ThreadPoolExecutor类创建线程池 

线程池实现类ThreadPoolExecutor 是 Executor 框架最核心的类。

ThreadPoolExecutor类有四个构造函数:

image.png

我们来看最长的那个,其余三个都是在这个构造方法的基础上产生(其他几个构造方法说白点都是给定某些默认参数的构造方法比如默认制定拒绝策略是什么),这里就不贴代码讲了,比较简单。

public ThreadPoolExecutor(int corePoolSize, //核心线程数量
     int maximumPoolSize, //最大线程数
     long keepAliveTime, //当线程数大于核心线程数时,多余的空闲线程存活的最大时间
     TimeUnit unit, //keepAliveTime参数的时间单位 可选为:NANOSECONDS,MICROSECONDS,MILLISECONDS,SECONDS,MINUTES,HOURS,DAYS等
     BlockingQueue<Runnable> workQueue, //任务队列,用来储存等待执行任务的队列
     ThreadFactory threadFactory, //线程工厂,用来创建线程,一般默认就可以了
     RejectedExecutionHandler handler //拒绝策略 当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
)


4.2.通过Executors工具类创建线程池

Java通过Executors提供四种线程池,分别为: 

1、newSingleThreadExecutor 

ThreadPoolExecutor 参数只有一个核心线程数和一个最大线程数,这个很少用到。它保证了所有线程的执行顺序都是按照提交到线程池的顺序执行。

源码:

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}

示例:

public class Test{
public static void main(String[] args)
{
   ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
  for(int i=0;i<10;i++)
  {
           MyRunnable runnable=new MyRunnable();
       singleThreadExecutor.execute(runnable);
  }
}
}
class MyRunnable implements Runnable {
   @Override
   public void run() {
           System.out.println("线程:"+Thread.currentThread().getName()+"执行");
   }
}

运行结果:

线程:pool-1-thread-1执行

线程:pool-1-thread-1执行

线程:pool-1-thread-1执行

线程:pool-1-thread-1执行

线程:pool-1-thread-1执行


2、newFixedThreadPool 

这个线程池的特点就是线程的数量是固定的,超过这个数量的任务就得在 LinkedBlockingQueue 中排队等候。

源码:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

示例:

public class Test{
public static void main(String[] args)
{
ExecutorService fixedThreadPool= Executors.newFixedThreadPool(3);
            for(int i=0;i<5;i++)
   {
MyRunnable runnable=new MyRunnable();
fixedThreadPool.execute(runnable);
}
}
}
class MyRunnable implements Runnable {
          @Override
          public void run() {
System.out.println("线程:"+Thread.currentThread().getName()+执行");
          }
}


运行结果:

线程:pool-1-thread-1执行

线程:pool-1-thread-1执行

线程:pool-1-thread-1执行

线程:pool-1-thread-3执行

线程:pool-1-thread-2执行


3、newScheduledThreadPool 

还有一种任务,需要定期反复执行,例如,每秒刷新证券价格。这种任务本身固定,需要反复执行的,可以使用ScheduledThreadPool。

ScheduledThreadPool可以创建一个可定期或者延时执行任务的定长线程池,支持定时及周期性任务执行。 

源码:

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
          new DelayedWorkQueue());
}

示例:

public class Test{
public static void main(String[] args)
{
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);
Date date=new Date();
            SimpleDateFormat sf=new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
System.out.println("时间:"+sf.format(date)+"开始执行:");
    for(int i=0;i<5;i++)
{
MyRunnable runnable=new MyRunnable();
//延时3秒执行
scheduledThreadPool.schedule(runnable,3, TimeUnit.SECONDS);
}
      }
}
class MyRunnable implements Runnable {
         @Override
         public void run() {
            Date date=new Date();
            SimpleDateFormat sf=new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
System.out.println("时间:"+sf.format(date)+" 线程"+Thread.currentThread().getName()+"执行");
         }
}

运行结果:

时间:2020-06-02 11:07:52开始执行:

时间:2020-06-02 11:07:55 线程:pool-1-thread-1执行

时间:2020-06-02 11:07:55 线程:pool-1-thread-1执行

时间:2020-06-02 11:07:55 线程:pool-1-thread-1执行

时间:2020-06-02 11:07:55 线程:pool-1-thread-3执行

时间:2020-06-02 11:07:55 线程:pool-1-thread-2执行


定期执行示例代码如下:

//延时3秒后每3秒执行一次

scheduledThreadPool.scheduleAtFixedRate(runnable,3, 3,TimeUnit.SECONDS);

时间:2020-06-02 11:11:32开始执行:

时间:2020-06-02 11:11:35 线程:pool-1-thread-1执行

时间:2020-06-02 11:11:35 线程:pool-1-thread-3执行

时间:2020-06-02 11:11:35 线程:pool-1-thread-2执行

时间:2020-06-02 11:11:35 线程:pool-1-thread-1执行

时间:2020-06-02 11:11:35 线程:pool-1-thread-3执行

时间:2020-06-02 11:11:38 线程:pool-1-thread-2执行

时间:2020-06-02 11:11:38 线程:pool-1-thread-3执行

时间:2020-06-02 11:11:38 线程:pool-1-thread-1执行

时间:2020-06-02 11:11:38 线程:pool-1-thread-3执行

时间:2020-06-02 11:11:38 线程:pool-1-thread-2执行


4、newCachedThreadPool 

创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。

源码:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

可以看到核心线程数量为 0, 表示不会永久保留任何的线程,最大线程的数量是 Integer.MAX_VALUE,可以无限制的创建线程,但是当有大量线程处于空闲状态的时候,超过 60s 就会被销毁。

示例:

public class Test{
public static void main(String[] args)
{
ExecutorService singleThreadExecutor = Executors.newCachedThreadPool();
   for(int i=0;i<5;i++)
{
MyRunnable runnable=new MyRunnable();
singleThreadExecutor.execute(runnable);
}
     }
}
class MyRunnable implements Runnable {
         @Override
         public void run() {
System.out.println("线程"+Thread.currentThread().getName()+"执行");
         }
}

运行结果:

线程:pool-1-thread-2执行

线程:pool-1-thread-3执行

线程:pool-1-thread-4执行

线程:pool-1-thread-1执行

线程:pool-1-thread-5执行


Executors返回线程池对象的弊端如下:

newSingleThreadExecutor方法和newFixedThreadPool方法:

允许请求的队列长度为Integer.MAX_VALUE,可能堆积大量的请求,导致OOM(Out Of Memory)。

newScheduledThreadPool方法和newCachedThreadPool方法:

允许创建的线程数量为Integer.MAX_VALUE,可能会创建大量线程,导致OOM。

《阿里巴巴 Java 开发手册》中强制线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 构造函数的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

线程池有两种执行任务的方式,一种是execute方法,一种是submit方法。区别是什么呢?

1、可以接受的任务类型不同

(1)submit

image.png

(2)execute

image.png

可以看出:

execute只能接受Runnable类型的任务

submit不管是Runnable还是Callable类型的任务都可以接受,但是Runnable返回值均为void,所以使用Future的get()获得的还是null

2、返回值

  由Callable和Runnable的区别可知:

execute没有返回值

submit有返回值,所以需要返回值的时候必须使用submit

3、异常

(1)execute中抛出异常

execute中的是Runnable接口的实现,所以只能使用try、catch来捕获CheckedException,通过实现UncaughtExceptionHande接口处理UncheckedException

即和普通线程的处理方式完全一致

(2)submit中抛出异常

不管提交的是Runnable还是Callable类型的任务,如果不对返回值Future调用get()方法,都会吃掉异常

4.3.线程池的基本结构

下面这张图可以加深对对线程池中各个参数的相互关系的理解:

image.png

从图中,我们可以看出,线程池中的corePoolSize就是线程池中的核心线程数量,这几个核心线程,只是在没有用的时候,也不会被回收,maximumPoolSize就是线程池中可以容纳的最大线程的数量,而keepAliveTime,就是线程池中除了核心线程之外的其他的最长可以保留的时间,因为在线程池中,除了核心线程即使在无任务的情况下也不能被清除,其余的都是有存活时间的,意思就是非核心线程可以保留的最长的空闲时间,而unit,就是计算这个时间的一个单位,workQueue,就是等待队列,任务可以储存在任务队列中等待被执行,执行的是FIFO原则(先进先出)。threadFactory,就是创建线程的线程工厂,最后一个handler,是一种拒绝策略,我们可以在任务满了以后,拒绝执行某些任务。

有四种拒绝策略,分别是: 

1、AbortPolicy (默认):当任务添加到线程池中被拒绝时,它将抛出 RejectedExecutionException 异常。 [əˈbɔːt]  [ˈpɒləsi]  [rɪˈdʒektɪd]

2、CallerRunsPolicy:当任务添加到线程池中被拒绝时,会在线程池当前正在运行的Thread线程池中处理被拒绝的任务。 [ˈkɔːlə(r)]

DiscardOldestPolicy:当任务添加到线程池中被拒绝时,线程池会放弃等待队列中最旧的未处理任务,然后将被拒绝的任务添加到等待队列中

DiscardPolicy:当任务添加到线程池中被拒绝时,线程池将丢弃被拒绝的任务。

4.4.线程池的处理流程

1、先判断线程池中的核心线程们是否空闲,如果空闲,就把这个新的任务指派给某一个空闲线程去执行。如果没有空闲,并且当前线程池中的核心线程数还小于 corePoolSize,那就再创建一个核心线程。

2、如果线程池的线程数已经达到核心线程数,并且这些线程都繁忙,就把这个新来的任务放到等待队列中去。如果等待队列又满了,那么

3、查看一下当前线程数是否到达maximumPoolSize,如果还未到达,就继续创建线程。如果已经到达了,就交给RejectedExecutionHandler来决定怎么处理这个任务。

我们通过代码来验证一下上面讲的内容:

public void execute(Runnable command) {
    //如果任务为null,抛出异常
    if (command == null)
        throw new NullPointerException();
    //ctl中保存线程池当前的一些状态信息
    int c = ctl.get();
    //下面会涉及到3步操作
    //首先判断当前线程池中的任务数量是否小于核心线程数
    //如果小于的话,通过addWorker新建一个核心(true)线程,并将任务(command)添加到该线程中
    //然后,启动该线程,从而执行任务
    if (workerCountOf(c) < corePoolSize) {
        //addWorker这个方法主要用来创建新的工作线程,如果返回true说明创建和启动工作线程成功,否则的话返回的就是false。
if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //如果当前线程池中的任务数量是否大于等于核心线程数
    //通过isRunning方法判断当前线程池状态,线程池处于运行状态,并且队列可以加入任务,就把任务加入队列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        //再次获取线程池状态,如果线程池不是运行状态,就需要从任务队列中移除任务,并尝试判断线程是否全部执行完毕,
        //同时执行相应的拒绝策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        //如果当前线程池为空,就新创建一个线程并运行
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //如果队列中无法添加任务(等待队列已满),通过addWorker新建一个非核心(false)线程,并将任务(command)添加到该线程中
    //然后,启动该线程,从而执行任务
    //如果无法创建非核心线程(已经达到最大线程),就执行相应的拒绝策略
    else if (!addWorker(command, false))
        reject(command);
}


综上所述:处理任务的优先级为:核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler 处理被拒绝的任务。

示例代码:

public class Test {
    public static void main(String[] args) {
        //核心线程数
        int corePoolSize = 5;
        //最大线程数
        int maximumPoolSize = 10;
        ////当线程数大于核心线程数时,多余的空闲线程存活的最大时间
        long keepAliveTime = 10;
        //keepAliveTime的时间单位
        TimeUnit unit = TimeUnit.SECONDS;
        //任务队列,用来储存等待执行任务的队列
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(5);
        ThreadPoolExecutor executor=new ThreadPoolExecutor(
                corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                unit,
                workQueue
        );
        for(int i=0;i<10;i++){
            //创建一个线程对象
            MyRunnable worker=new MyRunnable();
            //执行这个线程
            executor.execute(worker);
//            System.out.println("核心线程数:"+executor.getCorePoolSize());
//            System.out.println("最大线程数:"+executor.getMaximumPoolSize());
//            System.out.println("线程池数:"+executor.getPoolSize());
//            System.out.println("队列任务数:"+executor.getQueue().size());
//            System.out.println("活跃线程数:"+executor.getActiveCount());
//            System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
        }
        //终止线程池
        executor.shutdown();
    }
}

通过下图可以更好地对上面这 3 步做一个展示:

image.png

4.5.BlockingQueue(阻塞队列)简介

BlockingQueue是一个阻塞队列,提供了线程安全的队列访问方式:当阻塞队列进行插入数据时,如果队列已满,线程将会阻塞等待直到队列非满;从阻塞队列取数据时,如果队列已空,线程将会阻塞等待直到队列非空。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。并发包下很多高级同步类的实现都是基于BlockingQueue实现的。

BlockingQueue是一个接口,它的实现类有ArrayBlockingQueue、DelayQueue、 LinkedBlockingDeque、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue等,它们的区别主要体现在存储结构上或对元素操作上的不同,但是对于take与put操作的原理,却是类似的。

image.png

1. ArrayBlockingQueue

基于数组的阻塞队列实现,在ArrayBlockingQueue内部,维护了一个定长数组,以便缓存队列中的数据对象,这是一个常用的阻塞队列,除了一个定长数组外,ArrayBlockingQueue内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。

2.LinkedBlockingQueue

基于链表的阻塞队列,同ArrayListBlockingQueue类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成)。需要注意的是,如果构造一个LinkedBlockingQueue对象,而没有指定其容量大小,LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE),这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了。

ArrayBlockingQueue和LinkedBlockingQueue是两个最普通也是最常用的阻塞队列,一般情况下,在处理多线程间的生产者消费者问题,使用这两个类足以。

3. DelayQueue

DelDelayQueue是一个支持延时获取元素的无界阻塞队列。用于放置实现了Delayed接口的对象,只有在延迟期满时才能从中提取元素。该队列时有序的,即队列的头部是延迟期满后保存时间最长的Delayed 元素。注意:不能将null元素放置到这种队列中。

4. PriorityBlockingQueue

priorityBlockingQueu是一个优先级的阻塞队列,是通过构造函数传入的对象来判断,传入的对象必须实现comparable接口。因此该队列中元素的排序就取决于你自己的 Comparable 实现。

priorityBlockingQueu是一个无界队列,它没有限制,在内存允许的情况下可以无限添加元素;

5. SynchronousQueue

SynchronousQueue是无界的,是一种无缓冲的等待队列,它的内部同时只能够容纳单个元素。如果该队列已有一元素的话,试图向队列中插入一个新元素的线程将会阻塞,直到另一个线程将该元素从队列中抽走。同样,如果该队列为空,试图向队列中抽取一个元素的线程将会阻塞,直到另一个线程向队列中插入了一条新的元素。

0条评论

点击登录参与评论