Executors - JiyangM/spring GitHub Wiki

Executors 是一个工厂,里面定义了很多的静态方法。

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

Executors 创建具体线程的实例:

  • 1.newFixedThreadPool 保持固定的活跃线程的数量。
public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        //线程池的饱和策略
        System.out.println(service.getRejectedExecutionHandler());

        for (int i = 0; i < 10; i++) {
            service.execute(new Task(i));
            System.out.println("ActiveCount: " + service.getActiveCount());
            System.out.println("CompletedTaskCount: " + service.getCompletedTaskCount());
            System.out.println("QueueSize: " + service.getQueue().size());
            System.out.println("*****************************************");
        }
    }

    static class Task implements Runnable {

        int id;

        public Task(int id) {
            this.id = id;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(50L);
                System.out.println("完成:" + id);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

结果:

java.util.concurrent.ThreadPoolExecutor$AbortPolicy@568db2f2
ActiveCount: 1
CompletedTaskCount: 0
QueueSize: 0
*****************************************
ActiveCount: 2
CompletedTaskCount: 0
QueueSize: 0
*****************************************
ActiveCount: 2
CompletedTaskCount: 0
QueueSize: 1
*****************************************
ActiveCount: 2
CompletedTaskCount: 0
QueueSize: 2
*****************************************
ActiveCount: 2
CompletedTaskCount: 0
QueueSize: 3
*****************************************
完成:1
完成:0
完成:3
完成:2
完成:4

结论:

  • 它是一个固定大小的线程池,线程数量始终维持在nThread。
  • 阻塞队列采用了LinkedBlockingQueue,它是一个无界队列。
  • 由于阻塞队列是一个无界队列,因此永远不可能拒绝任务。
  • 由于采用了无界队列,实际线程数量将永远维持在nThreads,因此maximumPoolSize和keepAliveTime将无效。
  • 活跃线程的数量不超过初始值。
  • 当活跃线程达到初始值时,新的任务会放到LinkedBlockingDeque中,LinkedBlockingDeque并未初始化大小,所以是无界的队列。
  • 核心线程数等于最大线程数。
  • newFixedThreadPool 适用于线程数量基本稳定的情况,当线程数量不稳定时,比如大量的线程会不断地进入队列中,导致内存空间被占用。(此处引用一次血案

设想当我们固定LinkedBlockingDeque的长度时,会发生什么变化

LinkedBlockingQueue

LinkedBlockingQueue在入队列和出队列时使用的是不同的Lock,意味着他们之间不存在互斥关系,在多CPU情况下,他们能正在在同一时刻既消费,又生产,真正做到并行。

现在回过头来思考FixedThreadPool,前面说过他的特点在于这种线程池的线程个数是固定的,而且阻塞队列是可以存储任务的,因此这种线程池不会拒绝任务,而且不会开辟新的线程,也不会因为线程的长时间不使用而销毁线程。这是典型的生产者----消费者问题,这种线程池适合用在稳定且固定的并发场景,

   //创建一个固定线程的线程池
        ThreadPoolExecutor service = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(1));
        //饱和策略
        System.out.println(service.getRejectedExecutionHandler());

        for (int i = 0; i < 5; i++) {
            service.execute(new Task(i));
            System.out.println("ActiveCount: " + service.getActiveCount());
            System.out.println("CompletedTaskCount: " + service.getCompletedTaskCount());
            System.out.println("QueueSize: " + service.getQueue().size());
            System.out.println("*****************************************");
        }
    }

结果:

java.util.concurrent.ThreadPoolExecutor$AbortPolicy@568db2f2
ActiveCount: 1
CompletedTaskCount: 0
QueueSize: 0
*****************************************
ActiveCount: 2
CompletedTaskCount: 0
QueueSize: 0
*****************************************
ActiveCount: 2
CompletedTaskCount: 0
QueueSize: 1
*****************************************
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.boot.example.guava.OptionT$Task@2d98a335 rejected from java.util.concurrent.ThreadPoolExecutor@16b98e56[Running, pool size = 2, active threads = 2, queued tasks = 1, completed tasks = 0]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
	at com.boot.example.guava.OptionT.main(OptionT.java:31)
完成:0
完成:1
完成:2

通过上面的输出结果,我们发现当队列已经满时采用AbortPolicy这种饱和策略会抛出异常信息!

  • 2.CachedThreadPool
public static void main(String[] args) {
        //创建一个固定线程的线程池
        ThreadPoolExecutor service =  new ThreadPoolExecutor(0, Integer.MAX_VALUE, 30L, TimeUnit.SECONDS,
                new SynchronousQueue<>());

        //线程池的饱和策略
        System.out.println(service.getRejectedExecutionHandler());


        for (int i = 0; i < 10; i++) {
            service.execute(new Task(i));
            System.out.println("ActiveCount: " + service.getActiveCount());
            System.out.println("CompletedTaskCount: " + service.getCompletedTaskCount());
            System.out.println("QueueSize: " + service.getQueue().size());
            System.out.println("*****************************************");
        }
    }
java.util.concurrent.ThreadPoolExecutor$AbortPolicy@378bf509
ActiveCount: 1
CompletedTaskCount: 0
QueueSize: 0
*****************************************
ActiveCount: 2
CompletedTaskCount: 0
QueueSize: 0
*****************************************
ActiveCount: 3
CompletedTaskCount: 0
QueueSize: 0
*****************************************
ActiveCount: 4
CompletedTaskCount: 0
QueueSize: 0
*****************************************
ActiveCount: 5
CompletedTaskCount: 0
QueueSize: 0
*****************************************
ActiveCount: 6
CompletedTaskCount: 0
QueueSize: 0
*****************************************
ActiveCount: 7
CompletedTaskCount: 0
QueueSize: 0
*****************************************
ActiveCount: 8
CompletedTaskCount: 0
QueueSize: 0
*****************************************
ActiveCount: 9
CompletedTaskCount: 0
QueueSize: 0
*****************************************
ActiveCount: 10
CompletedTaskCount: 0
QueueSize: 0
*****************************************
完成:2
完成:1
完成:0
完成:9
完成:8
完成:7
完成:6
完成:5
完成:4
完成:3

结论:

  • 它是一个可以无限扩大的线程池,不断的创建的新的线程;
  • 使用SynchronousQueue,它是一个没有容量的队列,入队操作必须要等到另一个线程移出。所以,如果主线程提交任务的速度高于线程池中处理任务的速度时,会创建大量的新的线程。

  • 适合耗时较小的任务。

  • corePoolSize为0,maximumPoolSize为无限大,意味着线程数量可以无限大;

对比CachedThreadPool、FixedThreadPool

  • FixedThreadPool设置了核心线程数和最大线程数并且使用LinkedBlockingDeque(无界的队列),它的特点是:活跃的线程数量不超过设置值,新的任务不断的加入队列,可能会导致LinkedBlockingDeque的容量不断增大。corePoolSize和maximumPoolSize的大小是一样的。

  • CachedThreadPool 可以无限的创建新的线程,使用SynchronousQueue没有容量的阻塞队列,新的任务提交时,需要等到SynchronousQueue中的线程移出,当没有可用线程时,则会创建新的线程。当线程池大小超过了处理任务所需的线程,那么就会回收部分空闲(一般是60秒无执行)的线程。首先是无界的线程池,所以我们可以发现maximumPoolSize为big big


为什么要用线程池:

1.减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务。

2.可以根据系统的承受能力,调整线程池中工作线线程的数目,防止因为消耗过多的内存,而把服务器累趴下(每个线程需要大约1MB内存,线程开的越多,消耗的内存也就越大,最后死机)。

线程池主要参数:

  • corePoolSize - 池中所保存的线程数,包括空闲线程。

  • maximumPoolSize-池中允许的最大线程数。

  • keepAliveTime - 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。

  • unit - keepAliveTime 参数的时间单位。

  • workQueue - 执行前用于保持任务的队列。此队列仅保持由 execute方法提交的 Runnable任务。

如果运行的线程少于 corePoolSize,则 Executor始终首选添加新的线程,而不进行排队。(如果当前运行的线程小于corePoolSize,则任务根本不会存放,添加到queue中,而是直接抄家伙(thread)开始运行)

如果运行的线程等于或多于 corePoolSize,则 Executor始终首选将请求加入队列,而不添加新的线程。

如果无法将请求加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝。


线程池参数的问题

https://blog.csdn.net/daiqinge/article/details/51179445

任务拒绝策略

1、直接丢弃(DiscardPolicy)

2、丢弃队列中最老的任务(DiscardOldestPolicy)。

3、抛异常(AbortPolicy)

4、将任务分给调用线程来执行(CallerRunsPolicy)。

⚠️ **GitHub.com Fallback** ⚠️