【语言学习】java并发的一些实践 - hippowc/hippowc.github.io GitHub Wiki

Executor框架

Executor框架包括:线程池,Executor,Executors,ExecutorService,CompletionService,Future,Callable等。

Executor接口中之定义了一个方法execute(Runnable command),该方法接收一个Runable实例,它用来执行一个任务,即一个实现了Runnable接口的类。

ExecutorService接口继承自Executor接口,它提供了更丰富的实现多线程的方法,比如,ExecutorService提供了关闭自己的方法,以及可为跟踪一个或多个异步任务执行状况而生成 Future 的方法。 可以调用ExecutorService的shutdown()方法来平滑地关闭 ExecutorService,调用该方法后,将导致ExecutorService停止接受任何新的任务且等待已经提交的任务执行完成(已经提交的任务会分两类:一类是已经在执行的,另一类是还没有开始执行的),当所有已经提交的任务执行完毕后将会关闭ExecutorService。因此我们一般用该接口来实现和管理多线程

ExecutorService的生命周期包括三种状态:运行、关闭、终止。创建后便进入运行状态,当调用了shutdown()方法时,便进入关闭状态,此时意味着ExecutorService不再接受新的任务,但它还在执行已经提交了的任务,当素有已经提交了的任务执行完后,便到达终止状态。如果不调用shutdown()方法,ExecutorService会一直处在运行状态,不断接收新的任务,执行新的任务,服务器端一般不需要关闭它,保持一直运行即可。

Executors提供了一系列工厂方法用于创先线程池,返回的线程池都实现了ExecutorService接口。常用的线程池:

  • newCachedThreadPool():缓存型池子,先查看池中有没有以前建立的线程,如果有,就 reuse.如果没有,就建一个新的线程加入池中;缓存型池子通常用于执行一些生存期很短的异步型任务;因此在一些面向连接的daemon型SERVER中用得不多。但对于生存期短的异步任务,它是Executor的首选。能reuse的线程,必须是timeout IDLE内的池中线程,缺省timeout是60s,超过这个IDLE时长,线程实例将被终止及移出池。

  • newFixedThreadPool(int):newFixedThreadPool与cacheThreadPool差不多,也是能reuse就用,但不能随时建新的线程;任意时间点,最多只能有固定数目的活动线程存在,此时如果有新的线程要建立,只能放在另外的队列中等待,直到当前的线程中某个线程终止直接被移出池子;FixedThreadPool没有IDLE机制(可能也有,但既然文档没提,肯定非常长,类似依赖上层的TCP或UDP IDLE机制之类的),所以FixedThreadPool多数针对一些很稳定很固定的正规并发线程,多用于服务器;

  • newScheduledThreadPool(int):这个池子里的线程可以按schedule依次delay执行,或周期执行

  • SingleThreadExecutor():单例线程,任意时间池中只能有一个线程

CachedTheadPool在程序执行过程中通常会创建与所需数量相同的线程,然后在它回收旧线程时停止创建新线程,因此它是合理的Executor的首选,只有当这种方式会引发问题时(比如需要大量长时间面向连接的线程时),才需要考虑用FixedThreadPool。

Callable与Runnable

Runnable是一个接口,在它里面只声明了一个run()方法,由于run()方法返回值为void类型,所以在执行完任务之后无法返回任何结果。

Callable位于java.util.concurrent包下,也是一个接口,在它里面声明了一个方法叫做call(),call()函数返回的类型就是传递进来的V类型。Callable一般情况下配合ExecutorService来使用

Future

Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。

FutureTask

FutureTask类实现了RunnableFuture接口,RunnableFuture继承了Runnable接口和Future接口,而FutureTask实现了RunnableFuture接口。所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。

两个例子:

public class Test {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        Task task = new Task();
        Future<Integer> result = executor.submit(task);
        executor.shutdown();
         
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
         
        System.out.println("主线程在执行任务");
         
        try {
            System.out.println("task运行结果"+result.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
         
        System.out.println("所有任务执行完毕");
    }
}
class Task implements Callable<Integer>{
    @Override
    public Integer call() throws Exception {
        System.out.println("子线程在进行计算");
        Thread.sleep(3000);
        int sum = 0;
        for(int i=0;i<100;i++)
            sum += i;
        return sum;
    }
}
    public static void main(String[] args) {
        //第一种方式
        ExecutorService executor = Executors.newCachedThreadPool();
        Task task = new Task();
        FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
        executor.submit(futureTask);
        executor.shutdown();
         
        //第二种方式,注意这种方式和第一种方式效果是类似的,只不过一个使用的是ExecutorService,一个使用的是Thread
        /*Task task = new Task();
        FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
        Thread thread = new Thread(futureTask);
        thread.start();*/
         
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
         
        System.out.println("主线程在执行任务");
         
        try {
            System.out.println("task运行结果"+futureTask.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
         
        System.out.println("所有任务执行完毕");
    }
}
class Task implements Callable<Integer>{
    @Override
    public Integer call() throws Exception {
        System.out.println("子线程在进行计算");
        Thread.sleep(3000);
        int sum = 0;
        for(int i=0;i<100;i++)
            sum += i;
        return sum;
    }
}

框架使用的最佳实践

创建ThreadFactory

使用谷歌提供的工具类guava

ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("batch-retrieve-data-%d").build();

创建ExecutorService

通过Executors的工厂方式创建:

Executors.newCachedThreadPool();

但是某大厂不建议这么使用,建议直接创建ExecutorsService的实现类:

new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());

主要是为了让使用线程池的同学知道自己使用什么样的线程池参数,防止oom内存溢出。

ThreadPoolExecutor的创建与参数设置

最全面的构造参数如下:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
  • corePoolSize :核心池的大小,如果调用了prestartAllCoreThreads()或者prestartCoreThread()方法,会直接预先创建corePoolSize的线程,否则当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;这样做的好处是,如果任务量很小,那么甚至就不需要缓存任务,corePoolSize的线程就可以应对;
  • maximumPoolSize:线程池最大线程数,表示在线程池中最多能创建多少个线程,如果运行中的线程超过了这个数字,那么相当于线程池已满,新来的任务会使用RejectedExecutionHandler 进行处理;
  • keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止,然后线程池的数目维持在corePoolSize 大小;
  • unit:参数keepAliveTime的时间单位;
  • workQueue:一个阻塞队列,用来存储等待执行的任务,如果当前对线程的需求超过了corePoolSize大小,才会放在这里;
  • threadFactory:线程工厂,主要用来创建线程,比如可以指定线程的名字;
  • handler:如果线程池已满,新的任务的处理方式

线程池的阻塞队列

如果线程数超过了corePoolSize,则开始把线程先放到阻塞队列里,相当于生产者消费者的一个数据通道,有以下一些阻塞队列可供选择:

  • ArrayBlockingQueue是一个有边界的阻塞队列,它的内部实现是一个数组。有边界的意思是它的容量是有限的,我们必须在其初始化的时候指定它的容量大小,容量大小一旦指定就不可改变。
  • DelayQueue阻塞的是其内部元素,DelayQueue中的元素必须实现 java.util.concurrent.Delayed接口,该接口只有一个方法就是long getDelay(TimeUnit unit),返回值就是队列元素被释放前的保持时间,如果返回0或者一个负值,就意味着该元素已经到期需要被释放,此时DelayedQueue会通过其take()方法释放此对象,DelayQueue可应用于定时关闭连接、缓存对象,超时处理等各种场景;
  • LinkedBlockingQueue阻塞队列大小的配置是可选的,如果我们初始化时指定一个大小,它就是有边界的,如果不指定,它就是无边界的。说是无边界,其实是采用了默认大小为Integer.MAX_VALUE的容量 。它的内部实现是一个链表。
  • PriorityBlockingQueue是一个没有边界的队列,它的排序规则和 java.util.PriorityQueue一样。需要注意,PriorityBlockingQueue中允许插入null对象。所有插入PriorityBlockingQueue的对象必须实现 java.lang.Comparable接口,队列优先级的排序规则就是按照我们对这个接口的实现来定义的。
  • SynchronousQueue队列内部仅允许容纳一个元素。当一个线程插入一个元素后会被阻塞,除非这个元素被另一个线程消费。

使用的最多的应该是LinkedBlockingQueue,注意一般情况下要配置一下队列大小,设置成有界队列,否则JVM内存会被撑爆

线程池的饱和策略

线程池已满的定义,是指运行线程数==maximumPoolSize,并且workQueue是有界队列并且已满(如果是无界队列当然永远不会满);这时候再提交任务怎么办呢?线程池会将任务传递给最后一个参数RejectedExecutionHandler来处理,比如打印报错日志、抛出异常、存储到Mysql/redis用于后续处理等等

饱和策略指的就是线程池已满情况下任务的处理策略,默认有以下几种:

  • 默认的 ThreadPoolExecutor.AbortPolicy 中,处理程序遭到拒绝将抛出运行时RejectedExecutionException
  • ThreadPoolExecutor.DiscardPolicy 中,不能执行的任务将被删除。
  • ThreadPoolExecutor.DiscardOldestPolicy 中,如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)

ThreadPoolExecutor的参数设置

一般需要根据任务的类型来配置线程池大小:

  • 如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 NCPU+1
  • 如果是IO密集型任务,参考值可以设置为2*NCPU

其中NCPU的指的是CPU的核心数,可以使用Runtime.getRuntime().availableProcessors()来获取;

多线程异常处理

java线程池处理异常的方式

java中的线程池用的是ThreadPoolExecutor,真正执行代码的部分是runWorker方法:

try {
    beforeExecute(wt, task);  
    Throwable thrown = null;
    try {
        task.run();  //执行程序逻辑
    } catch (RuntimeException x) { //捕获RuntimeException
        thrown = x; throw x;
    } catch (Error x) { //捕获Error
        thrown = x; throw x;
    } catch (Throwable x) {   //捕获Throwable
        thrown = x; throw new Error(x);
    } finally {
        afterExecute(task, thrown);  //运行完成,进行后续处理
    }
} finally {
    task = null;
    w.completedTasks++;
    w.unlock();
}

默认情况下,线程池会捕获任务抛出的所有异常,但是不做任何处理。这样做能够保证我们提交的任务抛出了异常不会影响其他任务的执行,同时也不会对用来执行该任务的线程产生任何影响。 afterExecute方法没有做任何处理,所以如果我们的任务抛出了异常,我们也无法立刻感知到。即使感知到了,也无法查看异常信息。

如何处理

1 提交任务的时候,将所有可能的异常都Catch住,并且自己处理

 @Override
    public void run() {
        try {
            //处理所有的业务逻辑
        } catch (Throwable e) {
            //打印日志等
        } finally {
            //其他处理
        }
    }

2 自定义线程池,继承ThreadPoolExecutor并复写其afterExecute(Runnable r, Throwable t)方法。

3 提交任务的时候使用的方法是submit,那么该方法将返回一个Future对象,所有的异常以及处理结果都可以通过future对象获取。 采用Future模式,将返回结果以及异常放到Future中,在Future中处理

final class PoolService {
  private final ExecutorService pool = Executors.newFixedThreadPool(10);
  public void doSomething() {
    Future<?> future = pool.submit(new Task());
    // ...
    try {
      future.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // Reset interrupted status
    } catch (ExecutionException e) {
      Throwable exception = e.getCause();
      // Forward to exception reporter
    }
  }
}

总结

  1. java线程池会捕获任务抛出的异常和错误,但不做任何处理 好的程序设计应该考虑到对于类异常的处理
  2. 处理线程池中的异常有两种思路: 1)提交到线程池中的任务自己捕获异常并处理,不抛给线程池 2)由线程池统一处理
  3. 对于execute方法提交的线程,有两种处理方式 (ExecutorService可以submit,也可以使用父类Executor的execute方法,无返回值) 1)自定义线程池并实现afterExecute方法 2)给线程池中的每个线程指定一个UncaughtExceptionHandler,由handler来统一处理异常。
  4. 对于submit方法提交的任务,异常处理是通过返回的Future对象进行的。

一些问题的思考

基于BIO实现的server端,当建立了100个链接时,会有多少个线程?如果基于NIO,又会是多少个线程,为什么?

通常基于NIO的server端,会用多少个线程去处理IO事件,为什么?

一个典型的 客户端集群 -》LB -》 服务端集群的架构中,如果客户端采用连接池,长连接的方式,这种设计可能会出现什么问题?如果客户端采用单个长连接的方式呢?有什么问题?该如何解决

cglib和java动态代理相比,具体又什么不同

用Executors.newCachedThreadPool()创建的线程池,在运行过程中可能的风险是?

new ThreadPoolExecutor(10,100,10,TimeUnit.MILLISECONDS,new LinkedBlockingQueue(10));一个这样创建的线程池,当已经有10个任务在运行时,第11个任务提交到此线程池执行的时候会发生什么,为什么?

实现一个自定义的ThreadFactory的作用通常是?

除了用Object.wait和Object.notifyAll来实现线程间的交互外,你还会常用哪些来实现?

为什么ConcurrentHashMap可以在高并发的情况下比HashMap更为高效?

AtomicInteger、AtomicBoolean这些类之所以在高并发时高效,共同的原因是?

请合理的使用Queue来实现一个高并发的生产/消费的场景,给些核心的代码片段。

请实现让10个任务同时并发启动,给些代码片段。

在Java程序运行阶段,可以用什么命令行工具来查看当前Java程序的一些启动参数值,例如Heap Size等。

用什么命令行工具可以查看运行的Java程序的GC状况,请具体写出命令行格式。

用什么工具,可以在Java程序运行的情况下跟踪某个方法的执行时间,请求参数信息等,并请解释下工具实现的原理。

当一个Java程序接收请求,很长时间都没响应的话,通常你会怎么去排查这种问题?

Java进程突然消失了,你会怎么去排查这种问题?

Java程序为什么通常在刚启动的时候会执行的比较慢,而处理了一些请求后会变快,AOT能带来什么帮助?

Parallel GC、CMS GC、ZGC、Azul Pauseless GC最主要的不同是?背后的原理也请简单描述下?

请写一段程序,让其运行时的表现为触发5次ygc,然后3次fgc,然后3次ygc,然后1次fgc,请给出代码以及启动参数。

Go的Coroutine和Java的线程机制最主要的不同是?如果Java语言要透明的实现Coroutine,你觉得主要的难点是?

⚠️ **GitHub.com Fallback** ⚠️