book notes java async programming - landon30/Bulls GitHub Wiki
-
异步编程概念与作用
- 在使用同步编程方式时,由于每个线程同时只能发起一个请求并同步等待返回,所以为了提高系统性能,此时我们就需要引入更多的线程来实现并行化处理。但是多线程下对共享资源进行访问时,不可避免会引入资源争用和并发问题
- 操作系统层面对线程的个数是有限制的,不可能通过无限增加线程数来提供系统性能
- 同步阻塞的编程方式还会浪费资源,比如发起网络IO请求时,调用线程就会处于同步阻塞等待响应结果的状态,而这时候调用线程明明可以去做其他事情,等网络IO响应结果返回后再对结果进行处理
- 可见通过增加单机系统线程个数的并行编程方式并不是“灵丹妙药”。通过编写异步、非阻塞的代码,则可以使用相同的底层资源将执行切换到另一个活动任务,然后在异步处理完成后再返回到当前线程继续处理,从而提高系统性能
-
使用异步
-
异步编程是可以让程序并行运行的一种手段,其可以让程序中的一个工作单元与主应用程序线程分开独立运行,并且在工作单元运行结束后,会通知主应用程序线程它的运行结果或者失败原因。使用异步编程可以提高应用程序的性能和响应能力等
landon-异步可以让程序并行
-
比如当调用线程使用异步方式发起网络IO请求后,调用线程就不会同步阻塞等待响应结果,而是在内存保存请求上下文后,马上返回去做其他事情,等网络IO响应结果返回后再使用IO线程通知业务线程响应结果已经返回,由业务线程对结果进行处理。可见,异步调用方式提高了线程的利用率,让系统有更多的线程资源来处理更多的请求
-
比如在移动应用程序中,在用户操作移动设备屏幕发起请求后,如果是同步等待后台服务器返回结果,则当后台服务操作非常耗时时,就会造成用户看到移动设备屏幕冻结(一直处于请求处理中),在结果返回前,用户不能操作移动设备的其他功能,这对用户体验非常不好。而使用异步编程时,当发起请求后,调用线程会马上返回,具体返回结果会通过UI线程异步进行渲染,且在这期间用户可以使用移动设备的其他功能
-
-
例
-
需要异步地处理一些事情,而不需要知道异步任务的结果。比如在调用线程里面异步打日志,为了不让日志打印阻塞调用线程,会把日志设置为异步方式
-
在Java中,每当我们需要执行异步任务时,可以直接开启一个线程来实现,也可以把异步任务封装为任务对象投递到线程池中来执行。在Spring框架中提供了@Async注解
-
有时候我们还需要在主线程等待异步任务的执行结果,这时候Future就派上用场了
- 可以在调用线程内开启一个异步运行单元来执行任务A,开启异步运行单元后调用线程会马上返回一个Future对象(futureB),然后调用线程本身来执行任务B,等任务B执行完毕后,调用线程可以调用futureB的get()方法获取任务A的执行结果,最后再拼接两者的结果
- 由于任务A和任务B是并行运行的,所以整个过程耗时为max(调用线程执行任务B的耗时,异步运行单元执行任务A的耗时)
- 整个过程耗时显著缩短,对于用户来说,页面响应时间缩短,用户体验会更好,其中异步单元的执行一般是由线程池中的线程执行
-
使用Future确实可以获取异步任务的执行结果,但是获取其结果还是会阻塞调用线程的,并没有实现完全异步化处理,所以在JDK8中提供了CompletableFuture来弥补其缺点。CompletableFuture类允许以非阻塞方式和基于通知的方式处理结果,其通过设置回调函数方式,让主线程彻底解放出来,实现了实际意义上的异步处理
- 使用CompletableFuture时,当异步单元返回futureB后,调用线程可以在其上调用whenComplete方法设置一个回调函数action,然后调用线程就会马上返回,等异步任务执行完毕后会使用异步线程来执行回调函数action,而无须调用线程干预
-
JDK8还引入了Stream,旨在有效地处理数据流(包括原始类型),其使用声明式编程让我们可以写出可读性、可维护性很强的代码,并且结合CompletableFuture完美地实现异步编程。但是它产生的流只能使用一次,并且缺少与时间相关的操作(例如RxJava中基于时间窗口的缓存元素),虽然可以执行并行计算,但无法指定要使用的线程池。同时,它也没有设计用于处理延迟的操作(例如RxJava中的defer操作),所以Reactor、RxJava等Reactive API就是为了解决这些问题而生的
- Reactor、RxJava等反应式API也提供Java 8 Stream的运算符,但它们更适用于流序列(不仅仅是集合),并允许定义一个转换操作的管道,该管道将应用于通过它的数据(这要归功于方便的流畅API和Lambda表达式的使用)。Reactive旨在处理同步或异步操作,并允许你对元素进行缓冲(buffer)、合并(merge)、连接(join)等各种转换
-
跨网络的交互
-
线程A同步获取服务B的结果后,再同步调用服务C获取结果,可见在同步调用情况下业务执行语义比较清晰,线程A顺序地对多个服务请求进行调用;但是同步调用意味着当前发起请求的调用线程在远端机器返回结果前必须阻塞等待,这明显很浪费资源
-
好的做法应该是在发起请求的调用线程发起请求后,注册一个回调函数,然后马上返回去执行其他操作,当远端把结果返回后再使用IO线程或框架线程池中的线程执行回调函数
- Netty的异步非阻塞能力与CompletableFuture结合则可以轻松地实现网络请求的异步调用
-
在异步调用情况下,当线程A调用服务B后,会马上返回一个异步的futureB对象,然后线程A可以在futureB上设置一个回调函数;接着线程A可以继续访问服务C,也会马上返回一个futureC对象,然后线程A可以在futureC上设置一个回调函数
-
由于服务B和服务C是并发运行,所以相比同步调用,线程A获取到服务B和服务C结果的时间会缩短很多(同步调用情况下的耗时为服务B和服务C返回结果耗时的和,异步调用情况下耗时为max(服务B耗时,服务C耗时))
-
可以借助CompletableFuture的能力等两次RPC调用都异步返回结果后再执行其他操作
-
用线程A首先发起服务B的远程调用,会马上返回一个futureB对象,然后发起服务C的远程调用,也会马上返回一个futureC对象,最后调用线程A使用代码futureB.thenCombine(futureC, action)等futureB和futureC结果可用时执行回调函数action
landon-futureB.thenCombine(futureC, action)
-
-
web请求
-
Servlet3.0规范前,Servlet容器对Servlet的处理都是每个请求对应一个线程这种1 : 1的模式进行处理的
landon-阻塞处理
-
Servlet 3.0规范中则提供了异步处理的能力,让Servlet容器中的线程可以及时释放,具体Servlet业务处理逻辑是在业务自己的线程池内来处理;虽然Servlet 3.0规范让Servlet的执行变为了异步,但是其IO还是阻塞式的。IO阻塞是说在Servlet处理请求时,从ServletInputStream中读取请求体时是阻塞的,而我们想要的是当数据就绪时直接通知我们去读取就可以了,因为这可以避免占用我们自己的线程来进行阻塞读取
landon-io是阻塞的
-
Servlet 3.1规范提供了非阻塞IO
-
Servlet技术栈的不断发展实现了异步处理与非阻塞IO,但是其异步是不彻底的,因为受制于Servlet规范本身,比如其规范是同步的(Filter, Servlet)或阻塞的(getParameter, getPart)
- 所以新的使用少量线程和较少的硬件资源来处理并发的非阻塞Web技术栈应运而生—WebFlux,其是与Servlet技术栈并行存在的一种新技术
-
-
-
-
在Java中实现异步编程最简单的方式是:每当有异步任务要执行时,使用Tread来创建一个线程来进行异步执行
- 当执行异步任务时,会直接创建一个Thread来执行异步任务,这在生产实践中是不建议使用的,因为线程创建与销毁是有开销的,并且没有限制线程的个数,如果使用不当可能会把系统线程用尽,从而造成错误。在生产环境中一般创建一个线程池,然后使用线程池中的线程来执行异步任务,线程池中的线程是可以被复用的,这可以大大减少线程创建与销毁开销;另外线程池可以有效限制创建的线程个数
- 上面使用Thread执行的异步任务并没有返回值,如果我们想异步执行一个任务,并且需要在任务执行完毕后获取任务执行结果,则上面这个方式是满足不了的,这时候就需要用到JDK中的Future了
- 另外,每当需要异步执行时,我们需要显式地创建线程并启动,这是典型的命令式编程方式,增加了编程者的心智负担。我们需要的是声明式的异步编程方式,即告诉程序我们要异步执行,但是具体怎么实现异步应该对我们透明
-
landon
- 源代码例子在main中调用Thread.currentThread.join会造成程序无法结束,主线程等待主线程执行完毕?
- CallerRunsPolicy,即当线程池任务饱和,执行拒绝策略时不会丢弃新的任务,而是会使用调用线程来执行
-
FutureTask
- FutureTask代表了一个可被取消的异步计算任务,该类实现了Future接口,比如提供了启动和取消任务、查询任务是否完成、获取计算结果的接口
- FutureTask中的任务可以是Callable,也可以是Runnable
- public interface RunnableFuture extends Runnable, Future
- public class FutureTask implements RunnableFuture
- 线程池#submit(Runnable task)
- newTaskFor返回封装的就是FutureTask
- new FutureTask(runnable, value)
- 线程池#submit(Callable task)
- newTaskFor#new FutureTask(callable)
- 可以自定义一个FutureTask,作为Thread的runnable,也可以直接到线程池execute
- 和直接submit到线程池一样
-
FutureTask的局限性
- FutureTask虽然提供了用来检查任务是否执行完成、等待任务执行结果、获取任务执行结果的方法,但是这些特色并不足以让我们写出简洁的并发代码,比如它并不能清楚地表达多个FutureTask之间的关系。另外,为了从Future获取结果,我们必须调用get()方法,而该方法还是会在任务执行完毕前阻塞调用线程
- 我们真正想要
- 可以将两个或者多个异步计算结合在一起变成一个,这包含两个或者多个异步计算是相互独立的情况,也包含第二个异步计算依赖第一个异步计算结果的情况
- 对反应式编程的支持,也就是当任务计算完成后能进行通知,并且可以以计算结果作为一个行为动作的参数进行下一步计算,而不是仅仅提供调用线程以阻塞的方式获取计算结果
- 可以通过编程的方式手动设置(代码的方式)Future的结果;FutureTask不能实现让用户通过函数来设置其计算结果,而是在其任务内部来进行设置
- 可以等多个Future对应的计算结果都出来后做一些事情
-
CompletableFuture
-
CompletableFuture是一个可以通过编程方式显式地设置计算结果和状态以便让任务结束的Future,并且其可以作为一个CompletionStage(计算阶段),当它的计算完成时可以触发一个函数或者行为
-
public class CompletableFuture implements Future, CompletionStage
- 当CompletableFuture任务完成后,同步使用任务执行线程来执行依赖任务结果的函数或者行为
- 所有异步的方法在没有显式指定Executor参数的情形下都是复用ForkJoinPool.commonPool()线程池来执行
- 所有CompletionStage方法的实现都是相互独立的,以便一个方法的行为不会因为重载了其他方法而受影响
-
一个CompletableFuture任务可能有一些依赖其计算结果的行为方法,这些行为方法被收集到一个无锁基于CAS操作来链接起来的链表组成的栈中;当Completable-Future的计算任务完成后,会自动弹出栈中的行为方法并执行。需要注意的是,由于是栈结构,在同一个CompletableFuture对象上行为注册的顺序与行为执行的顺序是相反的
-
默认情况下支撑CompletableFuture异步运行的是ForkJoinPool
-
ForkJoinPool本身也是一种ExecutorService,与其他ExecutorService(比如ThreadPoolExecutor)相比,不同点是它使用了工作窃取算法来提高性能,其内部每个工作线程都关联自己的内存队列,正常情况下每个线程从自己队列里面获取任务并执行,当本身队列没有任务时,当前线程会去其他线程关联的队列里面获取任务来执行。这在很多任务会产生子任务或者有很多小的任务被提交到线程池来执行的情况下非常高效
-
Landon 这里工作窃取算法,是不是可以应用到游戏中的线程模型,当线程负载不均匀的时候,可以窃取
-
-
CompletableFuture是一种可以通过编程显式设置结果的future
- 比如在外部声明一个CompletableFuture,然后主线程get等待结果
- 然后异步线程可以在任务执行完成调用future.complete,通知等待结果,此时主线程从get返回
-
CompletableFuture实现异步计算与结果转换
-
基于runAsync系列方法实现无返回值的异步计算
- 提供runAsync (Runnable runnable,Executor executor)方法允许我们使用自己制定的线程池来执行异步任务
-
基于supplyAsync系列方法实现有返回值的异步计算:当你想异步执行一个任务,并且需要任务的执行结果时可以使用该方法,比如异步对原始数据进行加工,并需要获取到被加工后的结果等
- supplyAsync(Suppliersupplier, Executor executor)
-
基于supplyAsync实现异步任务A,执行完毕后,thenRun激活异步任务B执行,需要注意的是,这种方式激活的异步任务B是拿不到任务A的执行结果的
- 默认情况下oneFuture对应的异步任务和在oneFuture上添加的回调事件都是使用ForkJoinPool.commonPool()中的同一个线程来执行的,大家可以使用thenRunAsync(Runnable action, Executor executor)来指定设置的回调事件使用自定义线程池线程来执行
-
基于supplyAsync实现异步任务A,执行完毕后,thenAccept激活异步任务B执行,需要注意的是,这种方式激活的异步任务B是可以拿到任务A的执行结果的
- oneFuture对应的异步任务和在oneFuture上添加的回调事件都是使用ForkJoinPool.commonPool()中的同一个线程来执行的,大家可以使用thenAcceptAsync(Consumer<? super T> action, Executor executor)来指定设置的回调事件使用自定义线程池线程来执行
-
基于supplyAsync实现异步任务A,执行完毕后,thenApply激活异步任务B执行。需要注意的是,这种方式激活的异步任务B是可以拿到任务A的执行结果的,并且可以获取到异步任务B的执行结果
-
基于whenComplete设置回调函数,当异步任务执行完毕后进行回调,不会阻塞调用线程
-
挂起了main函数所在线程,是因为具体执行异步任务的是ForkJoin的commonPool线程池,其中线程都是Deamon线程,所以,当唯一的用户线程main线程退出后整个JVM进程就退出了,会导致异步任务得不到执行
-
whenCompleteAsync指定回调线程池
-
landon 原程序是用在Thread.currentThread.join,挂起当前main线程,如果不指定超时则无法结束
-
-
-
多个CompletableFuture进行组合运算
- 基于thenCompose实现当一个CompletableFuture执行完毕后,执行另外一个CompletableFuture
- 基于thenCombine实现当两个并发运行的CompletableFuture任务都完成后,使用两者的结果作为参数再执行一个异步任务
- 基于allOf等待多个并发运行的CompletableFuture任务执行完毕
- 基于anyOf等多个并发运行的CompletableFuture任务中有一个执行完毕就返回
-
CompletableFuture异常处理
- CompletableFuture提供了completeExceptionally方法来处理异常情况
- future.exceptionally(t -> "exception-default").get() 可以在抛出异常后设置默认值
-
-
JDK8 Stream & CompletableFuture
-
使用Stream,声明示编程,可读性强
-
中间操作符(filter,map),终端操作符(collect)
-
Stream和CompletableFuture结合
- 如原来顺序同步调用rpc call
- 现在可以异步并发,最后阻塞获取每个异步任务执行完毕
-
landon join和get区别
- 后者抛出的是checked exception,需要try/catch,而前者是unchecked exception
-
-
使用CompletableFuture实现异步编程属于声明式编程,一般情况下不需要我们显式创建线程池并提交任务到线程池,这大大减轻了编程者的负担
- spring异步执行
- @Async注解,并可以指定线程池
- 原理
- 对标注@Async注解的类做了代理
- 使用CompletableFuture.supplyAsync开启了一个异步任务,异步任务内具体调用了注解的方法
-
反应式编程
- Responsive、Resilient(回弹性)、Elastic、Message Driven
- 反应式编程是一种编程理念,作为反应式编程理念实施的第一步,Microsoft在.NET生态系统中创建了Reactive Extensions(Rx)库,然后RxJava在JVM上实现了Java版本的ReactiveExtensions(Rx)库;但是Rx.Net与RxJava的实现并没有遵守一样的规范。为了统一Java中反应式编程规范,后来通过Reactive Streams工作出现了Java的标准化,这一规范定义了在JVM上实现的反应库必须遵守的一组接口和交互规则,RxJava最新版也逐渐向该规范靠拢,在Java中该规范的实现有RxJava与Reactor库,由于RxJava和Reactor库遵循了同一个规范,所以可以很轻易地从一方切换到另一方
- Java8#stream
- 虽然可以执行并行计算(基于ForkJoinPool.commonPool()),但无法指定使用业务本身的线程池
- 两种方式来提高系统的能力
- 并行化:使用更多线程和更多的硬件资源
- 并行化方法不是灵丹妙药,获得硬件的全部功能才是必要的
- 在现有资源的使用方式上寻求更高的效率
- 寻求现有资源更高的使用率,可以解决资源浪费问题。通过编写异步、非阻塞代码,你就可以使用相同的底层资源将执行切换到另一个活动任务,然后在异步处理完成后返回到当前线程继续进行处理
- 并行化:使用更多线程和更多的硬件资源
- 在JVM上编写异步代码?Java提供了如下两种异步编程模型
- CallBacks:异步方法没有返回值,但需要额外的回调函数(Lambda或匿名类),在结果可用时调用它们
- 首先多个Callback难以组合在一起,这将会很快导致代码难以阅读和维护(称为“Callback Hell”)
- Futures:异步方法立即返回Future,异步线程计算任务,并当结果计算出来后设置到Future
- 一起编排多个Future是可行的但不容易
- CallBacks:异步方法没有返回值,但需要额外的回调函数(Lambda或匿名类),在结果可用时调用它们
- 编排任务的能力与代码的可读性和可维护性紧密相关。随着异步过程层数量和复杂性的增加,能够编写和读取代码变得越来越困难。正如我们所看到的,Callback模型很简单,但其主要缺点之一是,对于复杂的处理,你需要从回调执行回调,本身嵌套在另一个回调中,依此类推,这个混乱被称为Callback Hell,这样的代码很难回归并推理业务执行逻辑
-
Reactive Streams规范
-
处理数据流—尤其是体积不可预测的“实时”数据,需要在异步系统中特别小心。最突出的问题是需要小心控制资源消费端,以便快速生产数据的数据源不会压倒流的消费端。为了让多个网络主机进行协作或单个计算机内的多个CPU在核心中并行使用计算资源,异步编程是非常必要的
-
Reactive Streams的主要目标是管理跨异步边界的流数据交换—考虑将元素从一个线程传递到另一个线程或线程池进行处理,同时确保接收方不会强制缓冲任意数量的数据。换句话说,回压是该模型的组成部分,以便允许在线程之间协调的队列有界
-
处理潜在无限数量的元素,并且按顺序进行处理
-
在组件之间异步传递元素
-
具有强制性的非阻塞回压
-
landon:
- 流水线方式 + reactor(事件驱动)
- 发布,订阅/观察者模式
- 之前是嵌套callback,现在改为流,因为大家都是数据流转(先异步回调拿到结果A,再根据A再次异步回调拿到结果)--- 即垂直的方式变为水平的方式
-
在RxJava中,操作运算符不能直接使用Threads或ExecutorServices进行异步处理,而需要使用Schedulers来抽象统一API背后的并发调度线程池。RxJava还可以让我们通过Schedulers.from(Executor)将现有的Executor(及其子类型,如ExecutorService)包装到Scheduler中
- 所以如果流发射元素时有耗时的计算或者阻塞IO,则可以通过使用subscribeOn操作来把阻塞的操作异步化(切换到其他线程来执行)。另外如果一旦数据就绪(数据发射出来),则可以通过使用observeOn来切换使用其他线程(比如前台或者GUI线程)来对数据进行处理
- 内置了io与computation类型的线程池来做同步转异步,但是其允许我们使用业务自定义的线程池来进行处理
- Single.defer(() -> Single.just(count.get())),使得Single.just(count. get())方法不会在编译时执行,而是等到原始流结束后才会执行 -defer
-
Reactor
-
Schedulers.elastic():线程池中的线程是可以复用的,按需创建与空闲回收,该调度器适用于I/O密集型任务
-
Schedulers.parallel():含有固定个数的线程池,该调度器适用于计算密集型任务
-
landon TODO 如果我本身的io本身就是异步的,如mongo的异步驱动
-
-
-
TODO 将callback hell的例子改用reactor实现
-
三方关于reactive
-
响应式编程
-
为什么说电子表格软件是“响应式典范”呢,因为“单价”和“数量”的任何变动,都会被引用(“监听”)它的单元格实时更新计算结果。变化传递
-
Cart购物者添加商品,调用Invoice计算价格 -> 调整为
- Invoice初始化监听Cart,Cart一旦有响应的事件,如添加商品,Invoce就会响应
-
一种生产者只负责生成并发出数据/事件,消费者来监听并负责定义如何处理数据/事件的变化传递方式
-
这些数据/事件在响应式编程里会以数据流的形式发出
- 每次往购物车里添加或移除一种商品,或调整商品的购买数量,这种事件都会像过电一样流过这由公式串起来的多米诺骨牌一次。这一次一次的操作事件连起来就是一串数据流(data stream),如果我们能够及时对数据流的每一个事件做出响应,会有效提高系统的响应水平。基于数据流
-
声明式
- 我们先声明好了对于数据流“将会”进行什么样的处理,当有数据流过来时,就会按照声明好的处理流程逐个进行处理
-
总结
-
我们设想这样一种场景,我们从底层数据库驱动,经过持久层、服务层、MVC层中的model,到用户的前端界面的元素,全部都采用声明式的编程范式,从而搭建一条能够传递变化的管道,这样我们只要更新一下数据库中的数据,用户的界面上就相应的发生变化,岂不美哉?尤其重要的是,一处发生变化,我们不需要各种命令式的调用来传递这种变化,而是由搭建好的“流水线”自动传递。
-
这种场景用在哪呢?比如一个日志监控系统,我们的前端页面将不再需要通过“命令式”的轮询的方式不断向服务器请求数据然后进行更新,而是在建立好通道之后,数据流从系统源源不断流向页面,从而展现实时的指标变化曲线;再比如一个社交平台,朋友的动态、点赞和留言不是手动刷出来的,而是当后台数据变化的时候自动体现到界面上的。
-
landon,其实就是reactive,don't call us,we'll call you
-
-
-
响应式流
-
为啥不用Java Stream来进行数据流的操作
- Web 应用具有I/O密集的特点,I/O阻塞会带来比较大的性能损失或资源浪费,我们需要一种异步非阻塞的响应式的库,而Java Stream是一种同步API
- 假设我们要搭建从数据层到前端的一个变化传递管道,可能会遇到数据层每秒上千次的数据更新,而显然不需要向前端传递每一次更新,这时候就需要一种流量控制能力,就像我们家里的水龙头,可以控制开关流速,而Java Stream不具备完善的对数据流的流量控制的能力
- 具备“异步非阻塞”特性和“流量控制”能力的数据流,我们称之为响应式流(Reactive Stream)
- Java 9版本,引入了java.util.concurrent.Flow,响应式规范
-
阻塞便是性能杀手之一
- i/o太慢
- CPU先生是典型的工作狂,任务多的时候,通宵达旦也毫无怨言,但是有什么事情让它等,那简直要他命了。恰恰一起共事的其他组(尤其是I/O组的磁盘和网卡)相对来说那效率是低的离谱啊
- 对于阻塞造成的性能损失,我们通常有两种思路来解决
- 并行化:使用更多的线程和硬件资源
- 异步化:基于现有的资源来提高执行效率
- 多线程并非银弹
- 上下文切换、内存占用、指令重排等多线程问题等
- 多线程仍然是目前主流的高并发方案
- 直到Node.js的出现,为Java Web开发带来了新的启示。Node.js只需要单线程(引擎内部多线程)就可以应对高并发的请求。Java可不可以这么搞呢?答案是可以!秘诀同Node.js一样——“异步非阻塞”
-
就像Node.js,使用“异步非阻塞”的代码可以在不改变执行线程的情况下切换要执行的任务
- 回调
- CompletableFuture
-
非阻塞回调
-
前端js单线程,很早就是非阻塞,对于需要较长时间才能返回结果的调用通常采用异步方式
-
callback hell
Landon-callback hell的例子来自于
https://projectreactor.io/docs/core/release/reference/index.html#_asynchronicity_to_the_rescue
- Reactor3有timeout操作符,即异步获取数据后如果超时可以执行其他逻辑
- 响应式流的编程方式,不仅有效减少了代码量,还大大提高了代码的可阅读性
-
-
相对于回调和Future来说,CompletableFuture的功能强大了不少
-
比如我们在咖啡店买咖啡,点餐之后我们首先会拿到一张小票,这个小票就是Future,代表你凭此票在咖啡做好之后就可以去拿了。但是Future.get()方法仍然是同步和阻塞的,意味着你拿着票可以去找朋友聊会天,但是并不知道自己的咖啡什么时候做好,可能去柜台拿的时候还是要等一会儿
-
提供CompletableFuture服务的咖啡厅,不仅有小票,还有一个号牌,我们点餐之后找个桌坐下就好,这个订单的咖啡一旦做好就会送到我们手中
-
异步回调,它提供了五十多种方法,其中以Async结尾的方法都可以异步的调用而不会导致阻塞
-
声明式,在CompletableFuture的方法中,多多少少可以看到类似上边Reactor代码的“声明式编程”的感觉
- completableFuture.thenApplyAsync(...).thenApplyAsync(...).thenAcceptAsync
-
landon
一个复杂的例子同样例子于calback hell所在的projectreactor的例子
reactor主要内置了许多组合操作,会让代码更简洁一些
所以说其实CompletableFuture是反应式编程的趋势,但功能不全或者未封装太好
-
-
回调或 CompletableFuture在处理复杂逻辑时会遇到的相似的窘境,反观Reactor3提供的API,却可以显著减少代码量,提高代码可阅读性,尤其是还可以提供一些不错的功能
-
流量控制-回压
- 在响应式流中,数据流的发出者叫做Publisher,监听者叫做Subscriber
- 假如发布者发出数据的速度和订阅者处理数据的速度不同的时候,怎么办呢?订阅者处理速度快的话,那还好说,但是如果处理速度跟不上数据发出的速度
- 如果没有流量控制,那么订阅者会被发布者快速产生的数据流淹没。就像在一个流水线上,如果某个工位处理比较慢,而上游下料比较快的话,这个工位的工人师傅就吃不消了,这个时候他需要一种途径来告诉上游下料慢一些。同样的,订阅者也需要有一种能够向上游反馈流量需求的机制
- 这种能够向上游反馈流量请求的机制就叫做回压(backpressure,也有翻译为“背压”的)
- 缓存的策略
- 订阅者处理完一个元素的时候通过
request(1)跟发布者再请求一个元素。由于发布者的数据不能很快被订阅者处理掉,那么发布者会将未处理的数据元素缓存起来 - 这种处理方式与消息队列有些相似之处,发布者需要维护一个队列用来缓存还没有被处理的元素。通常用于对数据准确性要求比较高的场景,比如发布者这儿是突然到来的数据高峰,都是要保存到数据库的,作为订阅者的数据持久层没有那么快的处理速度,那么发布者就需要将数据暂时缓存起来
- 订阅者处理完一个元素的时候通过
- 丢弃的策略
- 发布者不需要缓存来不及处理的数据,而是直接丢弃,当订阅者请求数据的时候,会拿到发布者那里最近的一个数据元素。比如我们在做一个监控系统,后台的监控数据以每秒10个的速度产生,而前端界面只需要每秒钟更新一下监控数据即可,那作为发布者的后台就不用缓存数据了,因为这种时效性强的场景,用不到的数据直接丢掉即可
-
总结
- 响应式流的两个核心特点:异步非阻塞,以及基于“回压”机制的流量控制
- (响应式流[异步非阻塞,背压机制]) + 变化传递 + 声明式范式
- 响应式编程通常作为面向对象编程中的“观察者模式”(Observer design pattern)的一种扩展
- 响应式流(reactive streams)与“迭代子模式”(Iterator design pattern)也有相通之处, 因为其中也有 Iterable-Iterator 这样的对应关系。主要的区别在于,Iterator 是基于 “拉取”(pull)方式的,而响应式流是基于“推送”(push)方式的
- 使用 iterator 是一种“命令式”(imperative)编程范式,因为什么时候获取下一个元素取决于开发者。在响应式流中,相对应的角色是“发布者 - 订阅者”(Publisher-Subscriber),当有新的值到来的时候,反过来由发布者(Publisher) 通知订阅者(Subscriber),这种“推送”模式是响应式的关键
- 对推送来的数据的操作 是通过一种声明式(declaratively)而不是命令式(imperatively)的方式表达的:开发者通过 描述“处理流程”来定义对数据流的处理逻辑
-
-
Reactor 3快速上手
-
reactor-core 3.3.3.RELEASE
-
Reactor中的发布者(Publisher)由
Flux和Mono两个类定义-
一个Flux对象代表一个包含0..N个元素的响应式序列,而一个Mono对象代表一个包含零/一个(0..1)元素的结果
-
Flux和Mono都可以发出三种“数据信号”:元素值、错误信号、完成信号,错误信号和完成信号都是终止信号,完成信号用于告知下游订阅者该数据流正常结束,错误信号终止数据流的同时将错误传递给下游订阅者
-
举个例子,一个HTTP请求产生一个响应,所以对其进行“count”操作是没有多大意义的。表示这样一个结果的话,应该用Mono,对于的操作通常只用于处理 0/1 个元素。它们从语义上就原生包含着元素个数的信息
-
count操作用于Flux,但是操作返回的结果是Mono
-
ReactiveRepository
-
Mono findById(long id)
-
Flux findAll()
-
landon 以前接口是返回List和Object,现在是返回Flux和Mono
-
-
-
subscribe方法中的lambda表达式作用在了每一个数据元素上- 只有
subscribe()方法调用的时候才会触发数据流 - 订阅前什么都不会发生
- 只有
-
从命令式和同步式编程切换到响应式和异步式编程有时候是令人生畏的。学习曲线中最陡峭的地方就是出错时如何分析和调试
- StepVerifier
-
Operator
-
通常情况下,我们需要对源发布者发出的原始数据流进行多个阶段的处理,并最终得到我们需要的数据。这种感觉就像是一条流水线,从流水线的源头进入传送带的是原料,经过流水线上各个工位的处理,逐渐由原料变成半成品、零件、组件、成品,最终成为消费者需要的包装品。这其中,流水线源头的下料机就相当于源发布者,消费者就相当于订阅者,流水线上的一道道工序就相当于一个一个的操作符(Operator)
-
map
- 元素映射为新元素
-
map接受一个Function的函数式接口为参数,这个函数式的作用是定义转换操作的策略
-
flatMap
-
元素映射为流,然后将这些流合并为一个大的数据流
-
流的合并是异步的,先来先到,并非是严格按照原始序列的顺序
-
flatMap也是接收一个Function的函数式接口为参数,这个函数式的输入为一个T类型数据值,对于Flux来说输出可以是Flux和Mono,对于Mono来说输出只能是Mono-
doOnNext方法是“偷窥式”的方法,不会消费数据流 -
Landon doOnNext,每次发射一项数据就会调用它一次,一般用于在subscribe之前对数据的一些处理
-
-
flatMap通常用于每个元素又会引入数据流的情况 -
比如我们有一串url数据流,需要请求每个url并收集response数据
- Mono requestUrl(String url) {...}
- 而url数据流为一个FluxurlFlux
- 那么为了得到所有的HttpResponse,就需要用到flatMap
- urlFlux.flatMap(url -> requestUrl(url))
-
-
filter
-
filter操作可以对数据元素进行筛选 -
filter接受一个Predicate的函数式接口为参数,这个函数式的作用是进行判断并返回boolean
-
-
zip
- 一对一合并
- 看到
zip这个词可能会联想到拉链,它能够将多个流一对一的合并起来 - 二合一:它对两个Flux/Mono流每次各取一个元素,合并为一个二元组(
Tuple2) -
Flux.interval声明一个每200ms发出一个元素的long数据流 - 在异步条件下,数据流的流速不同,使用zip能够一对一地将两个或多个数据流的元素对齐发出
-
提供了非常丰富的操作符
-
-
调度器与线程模型
-
Reactor让线程管理和任务调度更加“傻瓜”——调度器(Scheduler)帮助我们搞定这件事
-
Schedulers.immediate()
- 当前线程
-
Schedulers.single()
- 这个方法对所有调用者都提供同一个线程来使用
- 如果你想使用独占的线程
- Executors.newSingleThreadExecutor()
-
Schedulers.elastic()
- 它根据需要创建一个线程池,重用空闲线程。线程池如果空闲时间过长 (默认为 60s)就会被废弃。对于 I/O 阻塞的场景比较适用
- Executors.newCachedThreadPool()
-
Schedulers.parallel()
- 所创建线程池的大小与CPU个数等同
- 固定大小线程池
- Executors.newFixedThreadPool()
-
Schedulers.fromExecutorService(ExecutorService)
- 自定义线程池
-
举例
- 将同步的阻塞调用变为异步的
- Mono.fromCallable().subscribeOn(Schedulers.elastic())
-
切换调度器的操作符
- Reactor 提供了两种在响应式链中调整调度器 Scheduler的方法:
publishOn和subscribeOn -
publishOn会影响链中其后的操作符,比如第一个publishOn调整调度器为elastic,则其后的filter的处理操作是在弹性线程池中执行的 -
subscribeOn无论出现在什么位置,都只影响源头的执行环境。比如源头是range,则range方法是执行在subscribeOn指定的single单线程执行。直至被第一个publishOn切换调度器之前,所以range后的map操作(publicshOn之前)也在single单线程执行
- Reactor 提供了两种在响应式链中调整调度器 Scheduler的方法:
-
Landon 线程模型测试补充
TODO 如果已有一个callback耗时接口或者同步接口,如何封装异步的Flux或者Mono返回
- 整体线程模型分析没有问题,但是同步变为异步的时候
- Mono.fromCallable,callable为之前的同步任务,如返回Mono
- Flux.create(fluxSink方式,fluxSink的next产生元素并最后complete
- 然后再通过subscribeOn和publishOn指定线程执行异步任务
CompletableFuture复杂的例子中异步的包装,直接用subscribeOn指定了线程池
-
-
错误处理
- 在响应式流中,错误(error)是终止信号。当有错误发生时,它会导致流序列停止,并且错误信号会沿着操作链条向下传递,直至遇到subscribe中的错误处理方法
- 简单就是订阅一个System.err::println
- 捕获并返回一个静态的缺省值,onErrorReturn,出错就返回了,没有继续执行
- 捕获并执行一个异常处理方法或计算一个候补值来顶替,
onErrorResume方法能够在收到错误信号的时候提供一个新的数据流- Flux.just(endpoint1, endpoint2).flatMap(k -> callExternalService(k)) .onErrorResume(e -> getFromCache(k))
- 调用外部服务
- 如果外部服务异常,则从缓存中取值代替
- 捕获,并再包装为某一个业务相关的异常,然后再抛出业务异常
- onErrorMap
- onErrorMap(original -> new BusinessException("SLA exceeded", original))
- 一功能其实也可以用
onErrorResume实现- onErrorResume(original -> Flux.error(new BusinessException("SLA exceeded", original))
- 捕获,记录错误日志,然后继续抛出
- 如果对于错误你只是想在不改变它的情况下做出响应(如记录日志),并让错误继续传递下去, 那么可以用
doOnError方法 - 前面提到,形如
doOnXxx是只读的,对数据流不会造成影响
- 如果对于错误你只是想在不改变它的情况下做出响应(如记录日志),并让错误继续传递下去, 那么可以用
- 使用 finally 来清理资源,或使用 Java 7 引入的 "try-with-resource"
- Flux.using(resourceSupplier,sourceSupplier,resourceCleanup)
-
doFinally在序列终止(无论是 onComplete、onError还是取消)的时候被执行, 并且能够判断是什么类型的终止事件(完成、错误还是取消),以便进行针对性的清理 -
take(1)能够在发出1个元素后取消流
- 还有一个用于错误处理的操作符你可能会用到,就是
retry,见文知意,用它可以对出现错误的序列进行重试.- 请注意:retry对于上游Flux是采取的重订阅(re-subscribing)的方式,因此重试之后实际上已经一个不同的序列了, 发出错误信号的序列仍然是终止了的
-
retry不过是再一次从新订阅了原始的数据流,从1开始。第二次,由于异常再次出现,便将异常传递到下游了 - retry(2)表示重试2次
-
回压
- 当执行
.subscribe(System.out::println)这样的订阅的时候,直接发起了一个无限的请求(unbounded request),就是对于数据流中的元素无论快慢都“照单全收” - subscribe(Subscriber subscriber),这个是最终的方法,最终都是拼装为这个方法
- 我们可以通过自定义具有流量控制能力的Subscriber进行订阅
- doOnRequest,在每次request时执行
- 扩展BaseSubscriber
-
hookOnSubscribe定义在订阅的时候执行的操作 -
hookOnNext定义每次在收到一个元素的时候的操作
-
- 这6个元素是以每秒1个的速度被处理的。由此可见
range方法生成的Flux采用的是缓存的回压策略,能够缓存下游暂时来不及处理的元素
- 当执行
-
从命令式编程到响应式编程的切换并不是一件容易的事,需要一个适应的过程
- 相对于传统的基于回调和Future的异步开发方式,响应式编程更加具有可编排性和可读性,配合lambda表达式,代码更加简洁,处理逻辑的表达就像装配“流水线”,适用于对数据流的处理
- 在订阅(subscribe)时才触发数据流,这种数据流叫做“冷”数据流,就像插座插上电器才会有电流一样,还有一种数据流不管是否有订阅者订阅它都会一直发出数据,称之为“热”数据流,Reactor中几乎都是“冷”数据流
- 调度器对线程管理进行更高层次的抽象,使得我们可以非常容易地切换线程执行环境
- 灵活的错误处理机制有利于编写健壮的程序
- “回压”机制使得订阅者可以无限接受数据并让它的源头“满负荷”推送所有的数据,也可以通过使用request方法来告知源头它一次最多能够处理 n 个元素,从而将“推送”模式转换为“推送+拉取”混合的模式
-
-
webflux性能测试
-
异步非阻塞的优势体现在I/O操作方面,无论是文件I/O、网络I/O,还是数据库读写,都可能存在阻塞的情况
-
用mvc和flux分别测试
- 分别sleep和使用delayElement
-
mvc
- tomcat最大线程数200
- 用户量在逐渐增加时,线程数达到默认值200
- 线程数达到200后,因为排队,吞吐量放缓,延时开始上升
- 当扩大工作线程数后,可以一定程度下提高吞吐量,降低因阻塞造成的响应延时
- 但是内存成本和更多的线程上下文切换成本
-
flux
- 对于运行在异步IO的Netty之上的WebFlux应用来说,其工作线程数量始终维持在一个固定的数量上
- 异步非阻塞条件下,程序逻辑是由事件驱动的,并不需要多线程并发
- 吞吐量线性增多,并非出现延时
-
非阻塞的处理方式规避了线程排队等待的情况,从而可以用少量而固定的线程处理应对大量请求的处理
landon-传统的处理是1请求1线程,当线程池的线程用完后就开始排队,此时可能是有cpu资源的,但是却被阻塞到了等待io上等
但是异步的,则是不等待,这样能更大限度的利用的cpu资源
-
结论就是相对于Servlet多线程的处理方式来说,Spring WebFlux在应对高并发的请求时,借助于异步IO,能够以少量而稳定的线程处理更高吞吐量的请求,尤其是当请求处理过程如果因为业务复杂或IO阻塞等导致处理时长较长时,对比更加显著
-
此时,我们更加理解了Nodejs的骄傲,不过我们大Java语言也有了Vert.x和现在的Spring WebFlux
-
-
http客户端的性能测试
- WebClient同样能够以少量而固定的线程数处理高并发的Http请求
-
Netflix开发了Zuul 2,它基于Netty,以异步非阻塞的方式来处理请求,一个CPU核心专心处理一个线程,每一个请求的生命周期存在于Event Loop和Callback中
- 从资源成本的角度来说,由于不用为每一个请求开辟独立的线程,能够避免CPU线程切换、大量线程栈内存造成的资源浪费,基本只剩下文件描述符和回调Listener的成本,从而Http连接成本显著降低。此外,由于工作在一个线程上,CPU除了不用来回奔波于成百上千的线程,还能更好地利用一二级CPU缓存,从而进一步提高性能
- 不同于计算密集型的应用,WEB应用通常是高并发和I/O密集型的,尤其是在微服务架构的应用中,CPU执行时间相对于阻塞时间来说通常要短得多,越是如此,异步非阻塞越能发挥出显著的性能提升效果。从这个案例可以看到,以异步非阻塞的方式代替阻塞和多线程方式是提高性能的有效途径
-
异步mongo驱动性能测试
- 相对于同步驱动来说,异步驱动在性能方面略胜一筹
- 在应对大量客户端线程的情况下,异步驱动能够以少量而稳定的连接数应对
-
上边我们分别针对Http服务端、Http客户端以及数据库进行了同步和异步的测试对比,综上来看,基于异步非阻塞的响应式应用或驱动能够以少量且固定的线程应对高并发的请求或调用,对于存在阻塞的场景,能够比多线程的并发方案提供更高的性能。
- 响应式和非阻塞并不是总能让应用跑的更快,况且将代码构建为非阻塞的执行方式本身还会带来少量的成本。但是在类似于WEB应用这样的高并发、少计算且I/O密集的应用中,响应式和非阻塞往往能够发挥出价值。尤其是微服务应用中,网络I/O比较多的情况下,效果会更加惊人
-
响应式流特点
-
具有处理无限数量的元素的能力、按序处理、异步地传递元素、必须实现非阻塞的回压(backpressure)
-
当执行subscribe方法时,发布者会回调订阅者的onSubscribe方法,这个方法中,通常订阅者会借助传入的Subscription向发布者请求n个数据。然后发布者通过不断调用订阅者的onNext方法向订阅者发出最多n个数据。如果数据全部发完,则会调用onComplete告知订阅者流已经发完;如果有错误发生,则通过onError发出错误数据,同样也会终止流
-
订阅后的回调用表达式表示就是onSubscribe onNext (onError | onComplete),即以一个onSubscribe开始,中间有0个或多个onNext,最后有0个或1个onError或onComplete事件
-
Publisher和Subscriber融合了迭代器模式和观察者模式
- 我们经常用到的Iterable和Iterator就是迭代器模式的体现,可以满足上边第1和2个特点关于按需处理数据流的要求;而观察者模式基于事件的回调机制有助于满足第3个特点关于异步传递元素的要求
-
Subscription是Publisher和Subscriber的“中间人”
- 当发布者调用subscribe方法注册订阅者时,会通过订阅者的回调方法onSubscribe传入Subscription对象,之后订阅者就可以使用这个Subscription对象的request方法向发布者“要”数据了。回压机制正是基于此来实现的,因此第4个特点也能够实现了
-
Processor集Publisher和Subscriber于一身
-
Landon,Processor的作用?
Processor既是一种特别的发布者(Publisher)又是一种订阅者(Subscriber)
-
-
向下:很自然地,数据和信号(onSubscribe、onNext、onError、onComplete)是通过每一个操作符向下传递的,传递的过程中进行相应的操作处理。
- 向上:有一个自下而上的“订阅链”,这个订阅链可以用来传递
request,因此回压(backpressure)可以实现从下游向上游的传递
- 向上:有一个自下而上的“订阅链”,这个订阅链可以用来传递
-
-
Reactor调度器与线程模型
- 自从版本 3.1.0,Reactor 引入了一个类似于 ThreadLocal 的高级功能:Context。它作用于一个 Flux 或一个 Mono 上,而不是应用于一个线程(Thread)。也就是其生命周期伴随整个数据流,而不是线程
- 对于一些能够在一个线程中顺序处理的任务,即使调度到ParallelScheduler上,通常也只由一个Worker来执行。有时候,我们确实需要一些任务能够“均匀”分布在不同的工作线程上执行,这时候就需要用到
ParallelFlux。为了配置ParallelFlux如何并行地执行每一个轨道,需要使用runOn(Scheduler),这里,Schedulers.parallel() 是比较推荐的专门用于并行处理的调度器
-
landon
- reactive思想不错,但是1是要切换到反应式编程思维上面去有一定的困难。2是reactor也是一个很大的库,也了解其内部原理以及调试等也比较困难。3是线程模型这块,如何解决线程安全的问题得再次确认一下。个人还是倾向于线程模型比较清晰,知道哪些业务运行在哪个线程下
- 或者可以简单理解是在java8 stream api上做了升级
- 业务这边主要是接口升级,返回Flux或者Mono
- 传统的callback和completable future在某些层面,如果嵌套层数不是特别多,且如果不是全异步的情况下,使用还好
- TODO 要结合游戏具体的业务去想,而非例子或者网上的偏web
- TODO 如果用reactive,还是要深入了解框架原理,httpserver部分是否可以直接考虑webflux
- reactive思想不错,但是1是要切换到反应式编程思维上面去有一定的困难。2是reactor也是一个很大的库,也了解其内部原理以及调试等也比较困难。3是线程模型这块,如何解决线程安全的问题得再次确认一下。个人还是倾向于线程模型比较清晰,知道哪些业务运行在哪个线程下
-
常见的比较经典的Servlet容器实现有Tomcat和Jetty
-
Servlet 3.0提供的异步处理能力
-
Web应用程序中提供异步处理最基本的动机是处理需要很长时间才能完成的请求。这些比较耗时的请求可能是一个缓慢的数据库查询,可能是对外部REST API的调用,也可能是其他一些耗时的I / O操作。这种耗时较长的请求可能会快速耗尽Servlet容器线程池中的线程并影响应用的可伸缩性
-
在Servlet3.0规范前,Servlet容器对Servlet都是以每个请求对应一个线程这种1 : 1的模式进行处理的
-
每当用户发起一个请求时,Tomcat容器就会分配一个线程来运行具体的Servlet。在这种模式下,当在Servlet内执行比较耗时的操作,比如访问了数据库、同步调用了远程rpc,或者进行了比较耗时的计算时,当前分配给Servlet执行任务的线程会一直被该Servlet持有,不能及时释放掉后供其他请求使用,而Tomcat内的容器线程池内线程是有限的,当线程池内线程用尽后就不能再对新来的请求进行及时处理了,所以这大大限制了服务器能提供的并发请求数量
-
在Servlet 3.0规范中引入了异步处理请求的能力,处理线程可以及时返回容器并执行其他任务
-
请求被Servlet容器接收,然后从Servlet容器(例如Tomcat)中获取一个线程来执行,请求被流转到Filter链进行处理,然后查找具体的Servlet进行处理
-
Servlet具体处理请求参数或者请求内容来决定请求的性质
-
Servlet内使用“req.startAsync(); ”开启异步处理,返回异步处理上下文Async-Context对象,然后开启异步线程(可以是Tomcat容器中的其他线程,也可以是业务自己创建的线程)对请求进行具体处理(这可能会发起一个远程rpc调用或者一个数据库请求);开启异步线程后,当前Servlet就返回了(分配给其执行的容器线程也就释放了),并且不对请求方产生响应结果
-
异步线程对请求处理完毕后,会通过持有的AsyncContext对象把结果写回请求方
-
具体处理请求响应的逻辑已经不再是Servlet调用线程来做了,Servlet内开启异步处理后会立刻释放Servlet容器线程,具体对请求进行处理与响应的是业务线程池中的线程
-
Landon 这种目前和游戏服务器类似,不过此时相当于bio,只不过io和逻辑分开了
-
-
-
Servlet 3.1提供的非阻塞io
-
虽然Servlet 3.0规范让Servlet的执行变为了异步,但是其IO还是阻塞式的。IO阻塞是说,在Servlet处理请求时,从ServletInputStream中读取请求体时是阻塞的。而我们想要的是,当数据就绪时通知我们去读取就可以了,因为这可以避免占用Servlet容器线程或者业务线程来进行阻塞读取。阻塞IO会消耗宝贵的线程
-
Servlet容器接收请求后会从容器线程池获取一个线程来执行具体Servlet的Service方法,由Service方法调用StartAsync把请求处理切换到业务线程池内的线程,如果业务线程内调用了ServletInputStream的read方法读取http的请求体内容,则业务线程会以阻塞方式读取IO数据(因为数据还没就绪)
-
在Servlet3.1规范中提供了非阻塞IO处理方式:Web容器中的非阻塞请求处理有助于增加Web容器可同时处理请求的连接数量。Servlet容器的非阻塞IO允许开发人员在数据可用时读取数据或在数据可写时写数据
-
基于内核的能力,Servlet3.1允许我们在ServletInputStream上通过函数setReadListener注册一个监听器,该监听器在发现内核有数据时才会进行回调处理函数
- onAllDataRead的时候切换到业务线程池
-
Servlet容器接收请求后会从容器线程池获取一个线程来执行具体Servlet的Service方法,Service方法内调用StartAsync开启异步处理,然后通过setReadListener注册一个ReadListener到ServletInputStream,最后释放容器线程
-
当内核发现TCP接收缓存有数据时,会回调注册的ReadListener的onData Available方法,这时使用的是容器线程,但是我们可以选择是否在onData Available方法内开启异步线程来对就绪数据进行读取,以便及时释放容器线程
-
当发现http的请求体内容已经被读取完毕后,会调用onAllDataRead方法,在这个方法内我们使用业务线程池对请求进行处理,并把结果写回请求方
-
无论是容器线程还是业务线程,都不会出现阻塞IO的情况。因为当线程被分配来进行处理时,当前数据已经是就绪的,可以马上进行读取,故不会造成线程的阻塞
-
Servlet3.1不仅增加了可以非阻塞读取请求体的ReadListener,还增加了可以避免阻塞写的WriteListener接口,在ServletOutputStream上可以通过set-WriteListener进行设置。
-
landon-TODO 这里为什么需要手动的去InputStream读取数据,容器不是应该已经帮我们做好了吗?感觉是完全实现了一套解析http的?
-
-
-
Spring web mvc的异步处理能力
-
基于DeferredResult的异步处理
-
基于Callable实现异步处理
-
Landon:简单来说,就是异步回调
-
-
spring webflux概述
- Spring框架中包含的原始Web框架Spring Web MVC是专为Servlet API和Servlet容器构建的。反应式栈的Web框架Spring WebFlux则是在Spring 5.0版中才添加的,它是完全无阻塞的
- Servlet API最初是为了通过Filter→Servlet链进行单次传递而构建的。Servlet 3.0规范中添加的异步请求处理允许应用程序及时退出Filter-Servlet链(及时释放容器线程),但保持响应打开以便异步线程进行后续处理
- Spring MVC的异步处理支持是围绕该机制构建的。当controller返回DeferredResult时,将退出Filter-Servlet链,并释放Servlet容器线程。稍后,当设置DeferredResult时,会对请求进行重新分派,使用DeferredResult值(就像controller返回它一样)以恢复处理
- Spring WebFlux既不是基于Servlet API构建的,也不需要额外的异步请求处理功能,因为它在设计上是异步的
- 从编程模型的角度来看,Spring MVC和Spring WebFlux都支持异步和反应式作为controller方法中的返回值。Spring MVC甚至支持流媒体,包括反应性回压功能,但是其对响应的写入仍然是阻塞的(并且在单独的线程上执行), Servlet 3.1确实为非阻塞IO提供了API,但是使用它会远离Servlet API的其余部分,比如其规范是同步的(Filter, Servlet)或阻塞的(getParameter,getPart)。WebFlux则不同,其依赖于非阻塞IO,并且每次写入都不需要额外的线程进行支持
- Spring Boot有一个WebFlux启动器(starter),可以自动启动。另外默认情况下,starter使用Netty作为服务器(基于reactor-netty支持),可以通过更改Maven或Gradle依赖项轻松切换到Tomcat、Jetty或Undertow服务器
-
webflux的并发模型
-
在Spring MVC(及一般的Servlet应用程序)中,假设应用程序可以阻塞当前线程(例如远程过程调用),则Servlet容器一般使用大型线程池来化解请求期间的潜在阻塞问题
-
在Spring WebFlux(以及一般的非阻塞服务器,例如Netty)中,假设应用程序不会阻塞,因此非阻塞服务器使用小的固定大小的线程池(事件循环IO工作线程)来处理请求
-
如果确实需要使用阻塞库,该怎么办? Reactor和RxJava分别提供了publishOn和observeOn运算符将流上的后续操作切换到其他的线程上进行处理。这意味着在阻塞API方案中,有一个简单的适配方案。但请记住,阻塞API不适合这种并发模型
-
在Reactor和RxJava中,可以使用操作符声明逻辑,并且在运行时形成一个反应流,其中数据在不同的阶段按顺序处理。这样做的一个主要好处是它可以使应用程序中的数据处于线程安全的状态,因为该反应流中的应用程序代码永远不会被并发调用
-
landon-是因为一个一个的按顺序发射吗?
-
-
反应式和非阻塞编程通常不会使应用程序运行得更快,虽然在某些情况下它们可以(例如使用WebClient并行执行远程调用)做到更快。相反以非阻塞的方式来执行,需要做更多的额外工作,并且可能会增加处理所需的时间。反应式和非阻塞的关键好处是能够使用少量固定数量的线程和更少的内存实现系统可伸缩性。这使得应用程序在负载下更具弹性,因为它们以更可预测的方式扩展。但是为了得到这些好处,需要付出一些代价(比如不可预测的网络IO
-
-
编程模型
- WebFlux与Spring MVC的不同之处在于,它返回的都是Reactor库中的反应式类型Mono或者Flux对象
- WebFlux默认运行在Netty服务器上
- controller的执行是使用Netty的IO线程进行执行的。如果controller的执行比较耗时,则会把IO线程耗尽,从而不能再处理其他请求
- 为了能够让IO线程及时得到释放,我们可以在反应式类型上施加publishOn运算,让controller逻辑的执行切换到其他线程,以便及时释放IO线程。另外,线程调度器Schedulers也提供了让我们制定自己的线程池来执行异步任务的功能
- 在WebFlux的函数式编程模型中,使用HandlerFunction处理HTTP请求,Handler Function是一个接收ServerRequest并返回延迟写入结果的(delayed)ServerResponse(即Mono)的函数。WebFlux服务器接收请求后,会将请求路由到带有RouterFunction的处理函数
-
原理
- Reactor Netty提供基于Netty框架的无阻塞和回压的TCP / HTTP / UDP客户端和服务器。在WebFlux中主要使用其创建的HTTP服务器,Reactor Netty提供易于使用且易于配置的HttpServer类。它隐藏了创建HTTP服务器所需的大部分Netty功能,并添加了Reactive Streams回压
- NettyReactiveWebServerFactory
- DispatcherHandler#handle 路由
-
关于是选择Spring MVC还是WebFlux
- 如果你的Spring MVC应用程序运行正常,则无须更改。命令式编程是编写、理解和调试代码的最简单方法
- 如果你已使用非阻塞Web栈,则可以考虑使用WebFlux。因为Spring WebFlux提供与此相同的执行模型优势,并且提供了可用的服务器选择,还提供了可选择的编程模型以及可选择的反应库
- 如果你对与Java 8 Lambdas或Kotlin一起使用的轻量级、功能性Web框架感兴趣,则可以使用Spring WebFlux函数式Web端点
- 在微服务架构中,你可以将应用程序与Spring MVC、Spring WebFlux控制器、SpringWebFlux函数式端点混合使用
- 评估应用程序的一种简单方法是检查其依赖性。如果你要使用阻塞持久性API(JPA,JDBC)或网络API,则Spring MVC至少是常见体系结构的最佳选择。从技术上讲,Reactor和RxJava都可以在单独的线程上执行阻塞调用,但是你无法充分利用非阻塞的Web技术栈
- 如果你有一个调用远程服务的Spring MVC应用程序,则可尝试使用反应式WebClient。你可以直接从Spring MVC控制器方法返回反应式类型(Reactor、RxJava或其他)。每次调用的延迟或调用之间的相互依赖性越大,其益处就越大。Spring MVC控制器也可以调用其他反应式组件
-
landon
- 从技术上讲,Reactor和RxJava都可以在单独的线程上执行阻塞调用,但是你无法充分利用非阻塞的Web技术栈
- 但请记住,阻塞API不适合这种并发模型
- 即如果已有的依赖api是阻塞的,则和本身的异步非阻塞理念是冲突的。因为前提就是假设应用程序不会阻塞,所以使用小的固定大小的线程池来处理请求
-
Netty
-
Netty框架将网络编程逻辑与业务逻辑处理分离开来,其内部会自动处理好网络与异步处理逻辑,让我们专心写自己的业务处理逻辑。同时,Netty的异步非阻塞能力与CompletableFuture结合可以让我们轻松实现网络请求的异步调用
-
Netty之所以能提供高性能网络通信,其中一个原因是它使用Reactor线程模型。在Netty中,每个EventLoopGroup本身都是一个线程池,其中包含了自定义个数的NioEventLoop,每个NioEventLoop是一个线程,并且每个NioEventLoop里面持有自己的NIO Selector选择器。在Netty中,客户端持有一个EventLoopGroup用来处理网络IO操作;在服务器端持有两个EventLoopGroup,其中boss组是专门用来接收客户端发来的TCP链接请求的,worker组是专门用来处理完成三次握手的链接套接字的网络IO请求的
-
在Netty中,NioEventLoop是EventLoop的一个实现,每个NioEventLoop中会管理自己的一个selector选择器和监控选择器就绪事件的线程;每个Channel在整个生命周期中固定关联到某一个NioEventLoop;但是,每个NioEventLoop中可以关联多个Channel
-
ChannelPipeline:Netty中的ChannelPipeline类似于Tomcat容器中的Filter链,属于设计模式中的责任链模式,其中链上的每个节点就是一个ChannelHandler。在Netty中,每个Channel有属于自己的ChannelPipeline,管线中的处理器会对从Channel中读取或者要写入Channel中的数据进行依次处理
-
每个NioEventLoopGroup里面包含了多个Nio EventLoop,每个NioEventLoop中包含了一个NIO Selector、一个队列、一个线程;其中线程用来做轮询注册到Selector上的Channel的读写事件和对投递到队列里面的事件进行处理
-
每个NioEventLoop中会管理好多客户端发来的连接,并通过循环轮询处理每个连接的读写事件
-
Netty之所以说是异步非阻塞网络框架,是因为通过NioSocketChannel的write系列方法向连接里面写入数据时是非阻塞的,是可以马上返回的(即使调用写入的线程是我们的业务线程)
- 如果调用线程是IO线程,则会在IO线程上执行写入
- 如果发现调用线程不是IO线程,则会把写入请求封装为WriteTask并投递到与其对应的NioEventLoop中的队列里面,然后等其对应的NioEventLoop中的线程轮询连接套接字的读写事件时捎带从队列里面取出来并执行
- 每个NioSocketChannel对应的读写事件都是在与其对应的NioEvent Loop管理的单线程内执行的,不存在并发,所以无须加锁处理
-
使用Netty框架进行网络通信时,当我们发起请求后请求会马上返回,而不会阻塞我们的业务调用线程;如果我们想要获取请求的响应结果,也不需要业务调用线程使用阻塞的方式来等待,而是当响应结果出来时使用IO线程异步通知业务,由此可知,在整个请求-响应过程中,业务线程不会由于阻塞等待而不能干其他事情
-
完成TCP三次握手的套接字应该注册到worker线程池中的哪一个NioEventLoop的Selector上
- 关于NioEventLoop的分配,采用轮询取模的方式来进行分配
-
如果NioEventLoop中的线程负责监听注册到Selector上的所有连接的读写事件和处理队列里面的消息,那么会不会导致由于处理队列里面任务耗时太长导致来不及处理连接的读写事件
-
Netty默认是采用时间均分策略来避免某一方处于饥饿状态
-
处理所有注册到当前NioEventLoop的Selector上的所有连接套接字的读写事件
-
统计其耗时,默认情况下ioRatio为50
-
使用相同的时间来运行队列里面的任务,也就是处理套接字读写事件与运行队列里面任务是使用时间片轮转方式轮询执行
-
landon TODO runAllTasks中如何保证运行队列任务的时间和io一样?任务执行的时间不固定啊
// 从队列里拿出一个任务 Runnable task = pollTask(); ... // 设定deadline final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0 ... // 无限循环 for (;;) { // 执行任务,任务计数 safeExecute(task); runTasks ++; // landon-这里代码是核心,注释写的也比较清楚,当任务计数到了64个的时候才执行一次检测,如果执行时间超时,则直接break,所以这里如果有某个任务执行时间特别长,则是很有可能超过deadline的 // Check timeout every 64 tasks because nanoTime() is relatively expensive. // XXX: Hard-coded value - will make it configurable if it is really a problem. if ((runTasks & 0x3F) == 0) { lastExecutionTime = ScheduledFutureTask.nanoTime(); if (lastExecutionTime >= deadline) { break; } } // landon-这里就是如果没有到64个任务,队列就没有,直接break.所以从代码上看,如果有很多队列的任务都很耗时,但是又没有超过64个,则肯定会导致指定执行队列的时间过长的,但是从设计上看,应该任务都会很快执行的 task = pollTask(); if (task == null) { lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } }
-
-
多个套接字注册到同一个NioEventLoop的Selector上,使用单线程轮询处理每个套接字上的事件,如果某一个套接字网络请求比较频繁,轮询线程是不是会一直处理该套接字的请求,而使其他套接字请求得不到及时处理
- 默认情况下maxMessagePerRead为16,所以对应NioEventLoop管理的每个NioSocketChannel中的数据,在一次事件循环内最多连续读取16次数据,并不会一直读取,这就有效避免了其他NioSocketChannel的请求事件得不到及时处理的情况
-
基于Netty与CompletableFuture实现RPC异步调用
-
thenCombine
-
两个CompletionStage是并行执行的,它们之间并没有先后依赖顺序,
other并不会等待先前的CompletableFuture执行完毕后再执行 -
其实从功能上来讲,它们的功能更类似
thenAcceptBoth,只不过thenAcceptBoth是纯消费,它的函数参数没有返回值,而thenCombine的函数参数fn有返回值 -
landon,所以合并之后的任务执行默认线程应该是不确定的
-
-
@Sharable注解是让服务端所有接收的链接对应的channel复用同一个NettyServerHandler实例,这里可以使用@Sharable方式是因为NettyServer Handler内的处理是无状态的,不会存在线程安全问题
-
rpcSyncCall
- 创建一个CompletableFuture future
- 创建一个消息,有消息id,异步发送
- 保存future上下文,<id,future>
- future.get同步等待结果
- 在handler收到消息的时候,从上下文拿到future,调用future.complete
-
rpcAsyncCall
- 则不是调用future.get等待结果,而是直接返回
- 然后future.whenComplete指定回调
-
可以把异步调用改造为Reactive编程风格,只需要把返回的CompletableFuture转换为Flowable即可
-
使用defer,当订阅的时候才执行rpc操作
-
future.whenComplete时,发射结果,这里使用了ReplayProcessor创建含有一个元素的流
landon-这里用到了Processor
-
发起rpc调用后马上返回了一个Flowable流对象,但这时真正的rpc调用还没有发出去,等代码3订阅了流对象时才真正发起rpc调用
-
由于CompletableFuture是可以设置回调函数的,所以把其转换为Reactive风格编程很容易
-
-
-
-
Dubbo
-
概述
- Provider为服务提供者集群,服务提供者负责暴露提供的服务,并将服务注册到服务注册中心
- Consumer为服务消费者集群,服务消费者通过RPC远程调用服务提供者提供的服务
- Registry负责服务注册与发现
- Monitor为统计服务的调用次数和调用时间的监控中心
- 服务提供者在启动时会将自己提供的服务注册到服务注册中心
- 服务消费者在启动时会去服务注册中心订阅自己所需服务的地址列表,由服务注册中心向它异步返回其所需服务接口的提供者的地址列表,再由服务消费者根据路由规则和设置的负载均衡算法选择一个服务提供者IP进行调用
- 监控平台主要用来统计服务的调用次数和调用耗时,服务消费者和提供者,在内存中累计调用次数和调用耗时,并定时每分钟发送一次统计数据到监控中心,监控中心则使用数据绘制图表来显示。监控平台不是分布式系统必须有的,但是这些数据有助于系统运维和调优。服务提供者和消费者可以直接配置监控平台的地址,也可以通过服务注册中心来获取
-
dubbo的异步调用
- Dubbo框架中的异步调用是发生在服务消费端的,异步调用实现基于NIO的非阻塞能力实现并行调用,服务消费端不需要启动多线程即可完成并行调用多个远程服务,相比多线程其开销较小
- 当服务消费端发起RPC调用时使用的是用户线程(步骤1),请求会被转换为IO线程(步骤2),具体向远程服务提供方发起远程调用
- 步骤2的IO线程使用NIO发起远程调用,用户线程通过步骤3创建了一个Future对象,然后通过步骤4将其设置到RpcContext中
- 然后用户线程则可以在某个时间从RpcContext中获取设置的Future对象(步骤5),并且通过步骤6设置回调函数,这样用户线程就返回了
- 当服务提供方返回结果(步骤7)后,调用方线程模型中的线程池线程会把结果通过步骤8写入Future,然后就会回调注册的回调函数
- 调用线程异步调用发起后会马上返回一个Future,并在Future上设置一个回调函数,然后调用线程就可以忙自己的事情去了,不需要同步等待服务提供方返回结果。当服务提供方返回结果时,调用方的IO线程会把响应结果传递给Dubbo框架内部线程池中的线程,后者则会回调注册的回调函数,由此可见,在整个过程中,发起异步调用的用户线程是不会被阻塞的
- 首先考虑在一个线程(记为线程A)中通过RPC请求获取服务B和服务C的数据,然后基于两者的结果做一些事情。在同步RPC调用情况下,线程A在调用服务B后需要等待服务B返回结果,才可以对服务C发起调用,等服务C返回结果后才可以结合服务B和服务C的结果做一件事
- 线程A同步获取服务B的结果后,再同步调用服务C获取结果,可见在同步调用的情况下,线程A必须按顺序对多个服务请求进行调用,因而调用线程必须等待,这显然会浪费资源。在Dubbo中,使用异步调用可以避免这个问题。两次异步远程过程调用,并行的
-
dubbo的异步执行
-
Dubbo框架的异步执行是发生在服务提供端的,在Provider端非异步执行时,其对调用方发来的请求的处理是在Dubbo内部线程模型的线程池中的线程来执行的,在Dubbo中服务提供方提供的所有服务接口都使用这一个线程池来执行,所以当一个服务执行比较耗时时,可能会占用线程池中的很多线程,这可能就会影响到其他服务的处理
-
Provider端异步执行则将服务的处理逻辑从Dubbo内部线程池切换到业务自定义线程,避免Dubbo线程池中线程被过度占用,有助于避免不同服务间的互相影响
-
Provider端异步执行对节省资源和提升RPC响应性能是没有效果的,这是因为如果服务处理比较耗时,虽然不是使用Dubbo框架的内部线程,但还是需要业务自己的线程来处理,另外副作用还有会新增一次线程上下文切换(从Dubbo内部线程池线程切换到业务线程)
-
Landon:这里指provider的线程模型,即从io线程到dubbo的内部线程再到业务线程
和游戏服务器模型基本一致
注意:这个切换到业务线程执行对节省资源和提升RPC响应性能是没有效果的,而客户端异步调用这边是有作用的,因为不会阻塞调用线程
-
Dubbo中提供了两种异步处理方法
- 使用AsyncContext实现异步执行
- 用RpcContext.startAsync()开启服务异步执行,然后返回一个asyncContext
- 把服务处理任务提交到业务线程池后sayHello方法就直接返回了null
- 同时也释放了Dubbo内部线程池中的线程
- 具体业务处理逻辑则在自定义业务线程池内执行,任务内首先执行代码2.2切换任务的上下文,这是因为RpcContext.getContext()是ThreadLocal变量,不能跨线程,这里切换上下文就是为了把保存的上下文内容设置到当前线程内
- 最后把任务执行结果写入异步上下文
- 基于CompletableFuture签名的接口实现异步执行
- 基于定义CompletableFuture签名的接口实现异步执行需要接口方法返回值为CompletableFuture
- 方法内部使用CompletableFuture.supplyAsync让本来应由Dubbo内部线程池中线程处理的服务,转为由业务自定义线程池中线程来处理,所以Dubbo内部线程池线程会得到及时释放
- 调用sayHello方法的线程是Dubbo线程模型线程池中的线程,而业务在bizThreadpool中的线程处理,所以代码2.1保存了RpcContext对象(ThreadLocal变量),以便在业务处理线程中使用
- 使用AsyncContext实现异步执行
-
-
Dubbo demo
-
mac安装zookeeper
% brew info zookeeper zookeeper: stable 3.5.7 (bottled), HEAD Centralized server for distributed coordination of services https://zookeeper.apache.org/ Not installed From: https://github.com/Homebrew/homebrew-core/blob/master/Formula/zookeeper.rb ==> Dependencies Build: ant ✘, autoconf ✘, automake ✘, libtool ✘, pkg-config ✘ ==> Options --HEAD Install HEAD version ==> Caveats To have launchd start zookeeper now and restart at login: brew services start zookeeper Or, if you don't want/need a background service you can just run: zkServer start // 注: 2020.03.26 看zk官网zk的版本是3.6.0 % brew install zookeeper /usr/local/Cellar/zookeeper/3.5.7 // 配置文件目录 /usr/local/etc/zookeeper // 启动zk % zkServer start /usr/bin/java ZooKeeper JMX enabled by default Using config: /usr/local/etc/zookeeper/zoo.cfg Starting zookeeper ... STARTED clientPort=2181 % zkServer status /usr/bin/java ZooKeeper JMX enabled by default Using config: /usr/local/etc/zookeeper/zoo.cfg Client port found: 2181. Client address: localhost. Error contacting service. It is probably not running. // provider启动连zk报错,Exception in thread "main" java.lang.IllegalStateException: zookeeper not connected // 查看日志 /usr/local/etc/zookeeper 有log4j.properties log4j.appender.zklog.File = /usr/local/var/log/zookeeper/zookeeper.log 2020-03-26 14:21:11 NIOServerCnxnFactory [ERROR] Thread Thread[main,5,main] died java.lang.NoSuchMethodError: java.nio.ByteBuffer.clear()Ljava/nio/ByteBuffer; at org.apache.jute.BinaryOutputArchive.stringToByteBuffer(BinaryOutputArchive.java:77) 2020-03-26 14:17:27 NIOServerCnxnFactory [ERROR] Thread Thread[NIOWorkerThread-1,5,main] died java.lang.NoSuchMethodError: java.nio.ByteBuffer.flip()Ljava/nio/ByteBuffer; % zkCli /usr/bin/java Connecting to localhost:2181 Welcome to ZooKeeper! % brew upgrade zookeeper // 临时解决办法 1. 手动去下载zookeeper 3.5.7 2. 拷贝zookeeper-3.5.7.jar和zookeeper-jute-3.5.7.jar到/usr/local/Cellar/zookeeper/3.5.7/libexec 3. 删除原有zookeeper-3.5.6-SNAPSHOT.jar - 猜测是是通过brew下载的zookeeper-3.5.6-SNAPSHOT.jar // 成功 % zkServer status /usr/bin/java ZooKeeper JMX enabled by default Using config: /usr/local/etc/zookeeper/zoo.cfg Client port found: 2181. Client address: localhost. Mode: standalone // 顺便装上maven % brew info maven maven: stable 3.6.3 Java-based project management https://maven.apache.org/ Conflicts with: mvnvm (because also installs a 'mvn' executable) Not installed From: https://github.com/Homebrew/homebrew-core/blob/master/Formula/maven.rb ==> Dependencies Required: openjdk ✘ % brew install maven // 配合文件 /usr/local/Cellar/maven/3.6.3_1/libexec/conf -
provider application两种方式
- startWithBootstrap和startWithExport,推荐前者
-
consumer application两种方式
- runWithBootstrap和runWithRefer,推荐前者
- ReferenceConfig对象内部封装了所有通讯细节,对象较重,请缓存复用
- ReferenceConfigCache.getCache().get(reference)
- 传统的dubbo服务面向接口编程,如果需要调用其他服务则需要引入该服务对应的接口
- 比如跨语言接口,所以dubbo支持api泛化调用,即invoke传入接口名字,method和parameter等即可
- ReferenceConfig
-
异步调用
-
最简单的就是服务接口返回CompletableFuture,接口实现则是使用CompletableFuture.supplyAsync,传入自定义线程池
- 第二种接口实现是使用Serverlet 3.0的异步接口
AsyncContext- 注意:Dubbo提供了一个类似Serverlet 3.0的异步接口
AsyncContext,在没有CompletableFuture签名接口的情况下,也可以实现Provider端的异步执行 - 这个是指没有CompletableFuture签名接口
- 注意:Dubbo提供了一个类似Serverlet 3.0的异步接口
- 这种是服务端异步执行。Provider端异步执行将阻塞的业务从Dubbo内部线程池切换到业务自定义线程,避免Dubbo线程池的过度占用,有助于避免不同服务间的互相影响。异步执行无益于节省资源或提升RPC响应性能,因为如果业务执行需要阻塞,则始终还是要有线程来负责执行。
- 第二种接口实现是使用Serverlet 3.0的异步接口
-
对于consumer
- 接口直接返回结果,实现也直接返回
- 但是consumer这边,reference.setAsync(true)
- 这样consumer这边调用接口就直接返回了
- 然后通过RpcContext.getContext().getCompletableFuture()获得future并设置complete回调
- 另外一种是不设置reference.setAsync(true)
- 而是直接context.asyncCall,传入一个callable执行同步方法,并返回CompletableFuture
- 这两种相当于接口的实现是同步的,但是在调用端执行了异步,调用即返回然后回调
-
总结
-
consumer可以异步执行,此时接口可以是同步的,可以直接返回,也可以用CompletableFuture.completedFuture包装
-
provider可以异步执行,接口必须要是CompletableFuture
-
两者解决的问题不同。如果provider端的服务比较耗时,建议切到自定义业务线程池。而consumer端则通常都是异步的,不影响调用端
- 即两端都可以异步,无论接口定义是什么样子的
-
如果你只有这样的同步服务定义,而又不喜欢RpcContext的异步使用方式。那还有一种方式,就是利用Java 8提供的default接口实现,重载一个带有带有CompletableFuture签名的方法
-
这个指的consumer端
-
https://github.com/apache/dubbo-async-processor#compiler-hacker-processer
-
在测试过程中无法找到AsyncSignal这个类,参数用了另外一个字段,测试倒是通过。所以猜测AsyncSignal只是示例,相当于一个标识符,‘specially designed to distinguish async method’
public interface GreetingsService { String sayHi(String name); // AsyncSignal is totally optional, you can use any parameter type as long as java allows your to do that. default CompletableFuture<String> sayHi(String name, AsyncSignal signal) { return CompletableFuture.completedFuture(sayHi(name)); } } -
另外一种实现是Compiler hacker processer,第一种是AsyncSignal
-
就是新写一个方法,重命名,如sayHiAsync,返回CompletableFuture
public interface GreetingsService { String sayHi(String name); // Any name is ok default CompletableFuture<String> sayHiAsync(String name) { return CompletableFuture.completedFuture(sayHi(name)); } } -
缺点是之前的方法就不再起作用
-
The essential part is to overwrite a new async method. Another way would be to generate a new method with a different name, for example,
sayHiAsync, then we can get rid ofAsyncSignal. But there's an obvious flaw of this approach, that is, all method level configurators and routers defined tosayHiwill not take effect anymore.
-
-
-
-
-
Disruptor
-
Disruptor是一个高性能的线程间消息传递库
-
要理解Disruptor是什么,最好的方法是将它与目前你已经很好地理解且与之非常相似的东西进行比较,例如与Java的BlockingQueue进行对比。与队列一样,Disruptor的目的也是在同一进程内的线程之间传递数据(例如消息或事件)
- Disruptor中的同一个消息会向所有消费者发送,即多播能力(Multicast Event)
- 为事件预先分配内存(Event Preallocation),避免运行时因频繁地进行垃圾回收与内存分配而增加开销
- 可选择无锁(Optionally Lock-free),使用两阶段协议,让多个线程可同时修改不同元素
- 缓存行填充,避免伪共享(prevent false sharing)
-
Disruptor使用Sequence作为识别特定组件所在位置的方法。每个消费者(EventProcessor)都像Disruptor本身一样维护一个Sequence
-
Sequencer是Disruptor的真正核心。该接口的2个实现(单生产者和多生产者)实现了所有并发算法,用于在生产者和消费者之间快速、正确地传递数据
-
Wait Strategy:等待策略,确定消费者如何等待生产者将事件放入Disruptor
-
每个消费者持有自己的当前消费序号,由于是环形buffer,因而生产者写入事件时要看序号最小的消费者序号,以避免覆盖还没有被消费的事件
-
Disruptor具有多播能力(Multicast),这是Java中队列和Disruptor之间最大的行为差异。当有多个消费者在同一个Disruptor上监听事件时,所有事件都会发布给所有消费者,而Java队列中的每个事件只会发送给某一个消费者。Disruptor的行为旨在用于需要对同一数据进行独立的多个并行操作的情况
-
Disruptor的目标之一是在低延迟环境中使用。在低延迟系统中,必须减少或移除运行时内存分配;在基于Java的系统中,目的是减少由于垃圾收集导致的系统停顿。为了支持这一点,用户可以预先为Disruptor中的事件分配其所需的存储空间(也就是声明Ring Buffer的大小)。在构造Ring Buffer期间,EventFactory由用户提供,并将在Disruptor的Ring Buffer中每个事件元素创建时被调用。将新数据发布到Disruptor时,API将允许用户获取构造的对象,以便调用方法或更新该存储对象上的字段,Disruptor保证这些操作只要正确实现就是并发安全的
-
低延迟期望推动的另一个关键实现细节是使用无锁算法来实现Disruptor,所有内存可见性和正确性保证都是使用内存屏障(体现为volatile关键字)或CAS操作实现的。在Disruptor的实现中只有一种情况需要实际锁定—当使用BlockingWaitStrategy策略时,这仅仅是为了使用条件变量,以便在等待新事件到达时parked消费线程。许多低延迟系统将使用忙等待(busy-wait)来避免使用条件可能引起的抖动,但是大量在系统繁忙等待的操作可能导致性能显著下降,尤其是在CPU资源严重受限的情况下
-
在JDK的BlockingQueue中添加或取出元素时是需要加独占锁的,通过锁来保证多线程对底层共享的数据结构进行并发读写的线程安全性,使用锁会导致同时只有一个线程可以向队列添加或删除元素。Disruptor则使用两阶段协议,让多个线程可同时修改不同元素,需要注意,消费元素时只能读取到已经提交的元素。在Disruptor中某个线程要访问Ring Buffer中某个序列号下对应的元素时,要先通过CAS操作获取对应元素的所有权(第一阶段),然后通过序列号获取对应的元素对象并对其中的属性进行修改,最后再发布元素(第二阶段),只有发布后的元素才可以被消费者读取。当多个线程写入元素时,它们都会先执行CAS操作,获取到Ring buffer中的某一个元素的所有权,然后可以并发对自己的元素进行修改。注意,只有序列号小的元素发布后,后面的元素才可以发布。可知相比使用锁,使用CAS大大减少了开销,提高了并发度
-
其实在单生产者的情况下Disruptor甚至可以省去CAS的开销,因为在这种情况下,只有一个线程来请求修改Ring Buffer中的数据,生产者的序列号使用普通的long型变量即可。在创建Disruptor时是可以指定是单生产者还是多生产者的,如果你的业务就是单生产者模型,那么创建Disruptor时指定生产者模式为ProducerType.SINGLE效果会更好
-
Disruptor中的Ring Buffer底层是一个地址连续的数组,数组内相邻的元素很容易会被放入同一个Cache行里,从而导致伪共享的出现。Disruptor则通过缓存行填充,让数组中的每个元素独占一个缓存行从而解决了伪共享问题的出现。另外为了避免Ring Buffer中序列号(定位元素的游标)与其他元素共享缓存行,对其也进行了缓存行填充,以提高访问序列号时缓存的命中率
-
关键实现原理
landon
- 目前看线程,每1个consumer handler都是固定线程的,依次排队去消费产生的事件。
- 有多个消费者,每个消费者有自己的sequence。因为是环形buffer,当缓冲期满了,但还有没有消费的元素,那么此时生产者只能等。其实这个和阻塞队列满时是一样的。那么什么时候等?只要判断当前很多消费者的最小sequence,这里指在环里的位置,那么生产的当前在环里的位置一一定小于这个位置。这个通常指生产者速度较快,过了一圈后,消费者还没有消费完之前的元素
- 测试是buffer为4,依次放0,1,2,10,11,12
- 0号元素的消费者handler耗时,当生产11(index 0)时,则阻塞等待0号消耗完毕
- 关于ringbuffer
- 我们实现的ring buffer和大家常用的队列之间的区别是,我们不删除buffer中的数据,也就是说这些数据一直存放在buffer中,直到新的数据覆盖他们
- 为什么要和ArrayBlockingQueue对比呢?这是因为两个底层的数据结构类似,都是通过一个环形数组实现
-
https://colobu.com/2014/12/22/why-is-disruptor-faster-than-ArrayBlockingQueue/
- ArrayBlockingQueue通过ReentrantLock以及它的两个condition来控制并发
- 压入元素时:如果数组已满,则等待notFull,如果消费者取出了元素,则会调用
notFull.signal();: - 这时put方法会被唤醒
- 取出元素时:如果数组为空,则调用
notEmpty.await();等待, enqueue会调用notEmpty.signal();唤醒它: - 这种wait-notify(signal)也就是教科书上标准的处理队列的方式
- 压入元素时:如果数组已满,则等待notFull,如果消费者取出了元素,则会调用
- RingBuffer使用了padding方式来提供CPU cache的命中率
- 如果producer生产的快,追上消费者的时候
可以通过gatingSequences让生产者等待消费者消费。
这个时候是通过
LockSupport.parkNanos(1L);不停的循环,直到有消费者消费掉一个或者多个事件 - 如果消费者消费的快,追上生产者的时候
这个时候由于消费者将自己最后能处理的sequence写回到光标后
sequence.set(availableSequence);, 如果生产者还没有写入一个事件, 那么它就会调用waitStrategy.waitFor等待。 如果生产者publish一个事件,它会更改光标的值:cursor.set(sequence);,然后通知等待的消费者继续处理waitStrategy.signalAllWhenBlocking(); - 在使用BlockingWaitStrategy情况下,其实这和ArrayBlockingQueue类似,因为ArrayBlockingQueue也是通过Lock的方式等待。 性能测试结果显示Disruptor在这种策略下性能比ArrayBlockingQueue要略好一点,但是达不到10倍的显著提升,大概两倍左右。 这大概就是生产者使用不断的LockSupport.parkNanos方式带来的提升吧
- 但是如果换为YieldingWaitStrategy, CPU使用率差别不大,但是却带来了10倍的性能提升。 这是因为消费者不需sleep, 通过spin-yield方式降低延迟率,提高了吞吐率
- 多生产者时在请求下一个sequence时有竞争的情况,所以通过
cursor.compareAndSet(current, next)的spin来实现,直到成功的设置next才返回
- ArrayBlockingQueue通过ReentrantLock以及它的两个condition来控制并发
- 伪共享
- 数据X、Y、Z被加载到同一Cache Line中,线程A在Core1修改X,线程B在Core2上修改Y。根据MESI大法,假设是Core1是第一个发起操作的CPU核,Core1上的L1 Cache Line由S(共享)状态变成M(修改,脏数据)状态,然后告知其他的CPU核,图例则是Core2,引用同一地址的Cache Line已经无效了;当Core2发起写操作时,首先导致Core1将X写回主存,Cache Line状态由M变为I(无效),而后才是Core2从主存重新读取该地址内容,Cache Line状态由I变成E(独占),最后进行修改Y操作, Cache Line从E变成M。可见多个线程操作在同一Cache Line上的不同数据,相互竞争同一Cache Line,导致线程彼此牵制影响,变成了串行程序,降低了并发性。此时我们则需要将共享在多线程间的数据进行隔离,使他们不在同一个Cache Line上,从而提升多线程的性能
-
-
akka
-
Akka是一个工具包,用于在JVM上构建高并发、分布式、弹性、基于消息驱动的应用程序
- Akka基于Actor模型和Streams,让我们可以构建可伸缩的,并且可以高效使用服务器资源,使用多个服务器进行扩展的系统
- 在单台计算机上可以处理高达每秒5000万条消息。内存占用少;每GB堆可以创建约250万个actor(参与者)
-
锁
- 使用锁会严重影响并发度,使用锁在现在CPU架构中是一个比较昂贵的操作,因为当线程获取锁失败后会把线程从用户态切换到内核态把线程挂起,稍后唤醒后又需要从内核态切换到用户态继续运行
- 获取锁失败的调用线程会被阻塞挂起,因此它不能做任何有意义的事情。即使在桌面应用程序中这也是不可取的,我们想要的是即使后台有一个比较耗时的工作在运行,也要保证系统对用户的一部分请求有响应。在后端应用中,阻塞是完全浪费资源的。另外可能有人认为,虽然当前线程阻塞了,但是我们可以通过启动新线程来弥补这一点,需要注意,线程也是一种昂贵的资源,操作系统对线程个数是有限制的
- 锁的存在带来了新的威胁,即死锁问题
- 如果不使用足够多的锁,则不能保证多线程下对象中数据不受到破坏
- 如果在对象中每个数据访问时都加了锁,则会导致系统性能下降,并且很容易导致死锁
- 锁只能在单JVM内(本地锁)很好地工作。当涉及跨多台机协调时,只能使用分布式锁。但是分布式锁的效率比本地锁低几个数量级,这是因为分布式锁协议需要跨多台机在网络上进行多次往返通信,其造成的最大影响就是延迟
-
共享内存
- 在现在计算机硬件架构中,计算机系统中为了解决主内存与CPU运行速度的差距,在CPU与主内存之间添加了一级或多级高速缓冲存储器(Cache),每个Cache由多个Cache行组成,这些Cache一般是集成到CPU内部的,所以也叫CPU Cache。当我们写入变量时,实际是写入当前CPU的Cache中,而不是直接写入主内存中,并且当前CPU核对自己Cache写入的变量对其他CPU核是不可见的,这就是Java内存模型中共享变量的内存不可见问题
-
堆栈
- 主调用线程需要在异步任务执行完毕或者出异常时被通知,但是没有调用堆栈可以传递异常。异步任务执行失败的通知只能通过辅助方式完成,比如Future方式,将错误码写到主调用线程所在的地方。如果没有此通知,则主调用线程将永远不会收到失败通知,并且任务将丢失
- 当真的发生错误时,这种情况会变得更糟,当异步工作线程遇到错误时会导致最终陷入无法恢复的境地。异步线程当前正在执行的实际任务并没有存放起来。实际上,由于到达顶部的异常使所有调用栈退出,任务状态已经完全丢失了
- 为了在当前系统上实现任何有意义的并发性和提高性能,线程必须以高效的方式在彼此之间委派任务,而不会阻塞。使用这种类型的任务委派并发(甚至在网络/分布式计算中更是如此),基于调用堆栈的错误处理会导致崩溃。因此需要引入新的显式错误信令机制,让失败成为域模型的一部分
- 具有工作委派的并发系统需要处理服务故障,并需要具有从故障中恢复的原则与方法。此类服务的客户端需要注意,任务/消息可能会在重新启动期间丢失。即使没有发生损失,由于先前排队的任务(较长的队列)或者垃圾回收导致的延迟等,将会导致响应可能会被任意延迟。面对这些情况,并发系统应以超时的形式处理响应截止日期
-
Actor模型解决了传统编程模型的问题
- 在Actor模型中每个Actor都有自己的地址,Actor之间通过地址相互通过消息通信。Actor的目的是处理消息,这些消息是从其他Actor发送给当前Actor的。连接发送方和接收方Actor的是Actor的邮箱
- Akka中对失败的处理使用了“让它崩溃”的理念,这部分关键代码被监控者监控着(每个Actor实际就是一个监控者),监控者的唯一职责是知道失败后该干什么
- 另外Actor模型并不在意接收消息的是当前JVM内的Actor还是远端机器上的Actor,这允许我们基于许多计算机上构建系统
- 使用消息传递避免锁和阻塞
- Actor模型中组件之间的相互通信不再使用方法调用,而是通过发消息的方式进行通信,使用发消息的方式,不会导致发消息的调用线程的执行权转移到消息接收者。每个Actor可以连续发消息,由于是异步的,不会被阻塞。因此在同等时间内其可以完成更多工作
- 对于对象,当调用其方法返回时,它会释放调用其线程的控制权;Actor的行为与对象类似,当接收者Actor接收到消息后,会对消息进行反应,并在处理完消息后返回,所以Actor的执行符合我们认知中的执行逻辑
- 传递消息和调用方法之间的重要区别是,消息没有返回值。通过发送消息,Actor会将工作委托给另一个Actor。正如我们在调用堆栈误解中看到的那样,如果期望返回值,则发送方Actor调用线程将需要阻塞或调用线程会执行其他Actor的工作。相反,接收方会在回复消息中传递结果
- Actor对消息做出反应,就像对象对在其上的调用方法一样。区别在于,接收消息的Actor是独立于消息发送方Actor执行的,是一次接一个地响应传入的消息,而不是多个线程并发执行,因此不会破坏Actor内部状态和不变量。当每个Actor都按顺序处理发送给它的消息时,不同的Actor会并发工作,因此Actor系统可以同时处理硬件支持的尽可能多的消息
- 由于每个Actor同时最多只能处理一条消息,因而可以保持Actor的不变性,而无须使用锁等进行同步
- 当Actor收到消息时,会发生以下情况
- Actor将消息添加到队列的末尾
- 如果Actor没有被安排执行,则将其标记为准备执行
- Actor系统框架内的调度程序将接收该Actor并开始执行它
- Actor从队列的前面选择消息
- Actor修改内部状态,将消息发送给其他Actor
- Actor处于无调度、空闲状态
- 为了实现上述行为,Actor需要具有下面特性
- 一个邮箱(用于存放发送者发来的消息)
- 行为(Actor的状态、内部变量等)
- 消息(代表信号的数据片段,类似于方法调用及其参数)
- 执行环境(一种使具有消息的Actor响应并调用其消息处理代码的机制)
- 地址(每个Actor有自己的地址)
- 其中,Actor的行为描述了其如何响应消息(例如发送更多消息和/或更改状态)。执行环境则编排了一个线程池,以透明地驱动所有这些动作
- 通过将执行与信号分离(方法调用方式会转换任务的执行权,消息传递则不会)来保留封装性
- 不需要锁。只能通过消息修改Actor的内部状态,而消息是顺序处理的,以试图消除保持不变性时的竞争问题
- 任何地方都没有使用锁,发送者也不会被阻塞。可以在十几个线程上有效地调度数百万个Actor,从而充分发挥现代CPU的潜力
- Actor的状态是本地的而不是共享的,更改和数据通过消息进行传递,这与现代系统中内存的实际工作方式相对应
- 使用Actor优雅地处理错误
- 当目标Actor上运行被代理的任务发生错误时,比如任务内参数校验错误或者执行抛出了NPE异常等。在这种情况下,目标Actor封装的服务是完整的,只是任务执行本身发生了错误。目标Actor应该向消息发送方回复一条消息,提示错误情况
- 当服务本身遇到内部错误时,Akka强制将所有Actor组织成树状层次结构,即创建另一个Actor的Actor成为该新Actor的父节点。这与操作系统将进程组织到树结构中的方式非常相似。就像进程一样,当一个Actor失败时,它的父Actor会收到通知,并且可以对失败做出反应。同样,如果父Actor停止了,则其所有子Actor也将递归停止。这项服务被称为监督(supervisor),它是Akka的核心
-
https://developer.lightbend.com/guides/akka-quickstart-java/
-
demo
-
remote示例要引入akka-remote_2.13
-
https://doc.akka.io/docs/akka/current/remoting.html
- Classic Remoting (Deprecated)
- Artery Remoting instead
-
注意事项
-
默认的配置文件
- 这是以前的remote方式,而新版默认是用的Artery,默认端口是25520
- 即使按照下面的配置文件指定端口也不会,启动的时候看日志会看到是artery方式
- 需要关闭Artery:remote.artery.enabled = false
- 当关闭此artery后,启动会报错'Classic remoting is enabled but Netty is not on the classpath'
- 需要:Classic remoting depends on Netty. This needs to be explicitly added as a dependency so that users not using classic remoting do not have to have Netty on the classpath:
akka { actor { // 远端的actor provider = "akka.remote.RemoteActorRefProvider" } remote { // tcp传输 enabled-transports = ["akka.remote.netty.tcp"] // ip和监听端口 netty.tcp { hostname = "127.0.0.1" port = 2552 } } }
-
-
按照新的配置方式
- artery方式,则启动的actor顺利按照指定的2552端口监听
akka { actor { // 远端的actor provider = "akka.remote.RemoteActorRefProvider" allow-java-serialization = on } remote { artery { transport = tcp canonical.hostname = "127.0.0.1" canonical.port = 2552 } } } -
另外远程要传递消息,做序列化,所以要打开allow-java-serialization,否则序列化报错
-
另外ActorSelection指定远程的path的actor名字是远程通过actorOf创建的actor名字
-
-
Akka中每个Actor都有自己的地址,可以是本地的,也可以是远程的,对于远程的Actor,只需要将其地址配置好,就可以像本地Actor一样使用了
-
-
rocketmq
-
RocketMQ主要由4部分组成,分别为NameServer集群、Broker集群、Producer集群和Consumer集群。每部分都可以进行水平扩展,而不会出现单点问题
- NameServer集群:名称服务集群,提供轻量级的服务发现与路由服务,每个名称服务器记录了全部Broker的路由信息,并且提供相应的读写服务,支持快速存储扩展
- Broker集群:Broker集群,Broker通过提供轻量级的主题和队列机制来维护消息存储。它支持推和拉两种模型,包含容错机制(2个副本或3个副本),并提供了强大的平滑峰值,提供积累数以亿计的消息并保证其在原始时间顺序的被消费能力。此外,Broker也提供灾难恢复、丰富的度量统计和警报机制
- Producer集群:生产者集群,提供分布式部署,分布式的生产者发送消息到Broker集群,具体选择哪一个Broker机器是通过一定的负载均衡策略来决定的,发送消息中支持故障快速恢复,并且具有较低的延迟
- Consumer集群:消费者集群,消费者在推和拉模型中支持分布式部署。它还支持集群消费和消息广播。它提供实时消息订阅机制,可以满足大多数消费者的需求
-
Broker在启动时会去连接NameServer,然后将topic信息注册到NameServer, NameServer维护了所有topic的信息和对应的Broker路由信息。Broker与NameServer之间是有心跳检查的,NameServer发现Broker挂了后,会从注册信息里面删除该Broker,这类似Zookeeper实现的服务注册;Producer则需要配置NameServer的地址,然后定时从NameServer获取对应topic的路由信息(这个topic的消息应该路由到那个Broker)。同时Producer与NameServer、Proudcer与Broker有心跳检查;同样,Consumer需要配置NameServer的地址,然后定时从NameServer获取对应topic的路由信息(应该从那个Broker的消息队列获取消息)。同时Consumer与NameServer、Consumer与Broker有心跳检查。
-
demo
-
rocketmq-all-4.7.0 编译
% pwd /Users/landon30/2020/demo/rocketmq-logging 4.7.0 % mvn -Prelease-all -DskipTests clean install -U // 编译报错 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.5.1:compile (default-compile) on project rocketmq-logging: Compilation failure: Compilation failure: [ERROR] 不再支持源选项 6。请使用 7 或更高版本。 [ERROR] 不再支持目标选项 6。请使用 7 或更高版本。 修改 <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> 同理其他模块 % pwd /Users/landon30/2020/demo/rocketmq-rocketmq-all-4.7.0/distribution/target/rocketmq-4.7.0/rocketmq-4.7.0 // 需要设置JAVA_HOME环境变量 ERROR: Please set the JAVA_HOME variable in your environment, We need java(x64)! !! /Library/Java/JavaVirtualMachines/jdk1.8.0_241.jdk/Contents/Home vim ~/.bash_profile,编辑环境变量 // 启动nameserver % nohup sh bin/mqnamesrv & [1] 17557 appending output to nohup.out % tail -f ~/logs/rocketmqlogs/namesrv.log 2020-04-09 16:28:21 INFO main - tls.client.keyPassword = null 2020-04-09 16:28:21 INFO main - tls.client.certPath = null 2020-04-09 16:28:21 INFO main - tls.client.authServer = false 2020-04-09 16:28:21 INFO main - tls.client.trustCertPath = null 2020-04-09 16:28:21 INFO main - Using OpenSSL provider 2020-04-09 16:28:21 INFO main - SSLContext created for server 2020-04-09 16:28:22 INFO main - Try to start service thread:FileWatchService started:false lastThread:null 2020-04-09 16:28:22 INFO NettyEventExecutor - NettyEventExecutor service started 2020-04-09 16:28:22 INFO FileWatchService - FileWatchService service started 2020-04-09 16:28:22 INFO main - The Name Server boot success. serializeType=JSON 17589 NamesrvStartup // 启动broker 异常 // 为什么总报 java.lang.NoSuchMethodError // 应该是编译问题 比如编译是用jdk9+编译的,但是却运行在小于9的环境中 // 所以应该是netty编译环境的问题 > nohup sh bin/mqbroker -n localhost:9876 & 2020-04-09 18:02:42 WARN brokerOutApi_thread_1 - registerBroker Exception, localhost:9876 org.apache.rocketmq.remoting.exception.RemotingSendRequestException: send request to <ocalhost/127.0.0.1:9876> failed at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.invokeSyncImpl(NettyRemotingAbstract.java:440) ~[rocketmq-remoting-4.7.0.jar:4.7.0] at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:373) ~[rocketmq-remoting-4.7.0.jar:4.7.0] at org.apache.rocketmq.broker.out.BrokerOuterAPI.registerBroker(BrokerOuterAPI.java:194) ~[rocketmq-broker-4.7.0.jar:4.7.0] at org.apache.rocketmq.broker.out.BrokerOuterAPI.access$000(BrokerOuterAPI.java:61) ~[rocketmq-broker-4.7.0.jar:4.7.0] at org.apache.rocketmq.broker.out.BrokerOuterAPI$1.run(BrokerOuterAPI.java:150) ~[rocketmq-broker-4.7.0.jar:4.7.0] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_241] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_241] at java.lang.Thread.run(Thread.java:748) [na:1.8.0_241] Caused by: io.netty.handler.codec.EncoderException: java.lang.NoSuchMethodError: java.nio.ByteBuffer.flip()Ljava/nio/ByteBuffer; at io.netty.handler.codec.MessageToByteEncoder.write(MessageToByteEncoder.java:125) ~[netty-all-4.0.42.Final.jar:4.0.42.Final] https://github.com/eclipse/jetty.project/issues/3244 参考 2020-04-09 18:16:06 INFO main - The broker[landon30deMacBook-Pro.local, 10.2.182.240:10911] boot success. serializeType=JSON and name server is localhost:9876 // 参考 https://github.com/netty/netty/issues?q=java.lang.NoSuchMethodError // 临时解决方案是将pom.xml中netty的版本从4.0.42切换到了4.0.43,解决 -
https://rocketmq.apache.org/docs/quick-start/
> unzip rocketmq-all-4.7.0-source-release.zip > cd rocketmq-all-4.7.0/ > mvn -Prelease-all -DskipTests clean install -U > cd distribution/target/rocketmq-4.7.0/rocketmq-4.7.0 Start Name Server > nohup sh bin/mqnamesrv & > tail -f ~/logs/rocketmqlogs/namesrv.log The Name Server boot success... Start Broker > nohup sh bin/mqbroker -n localhost:9876 & > tail -f ~/logs/rocketmqlogs/broker.log The broker[%s, 172.30.30.233:10911] boot success...
-
-
consumer
- 同一个消费集群的每台机器中的实例名称要一样。然后设置了NameServer的地址为127.0.0.1:9876
- 从第一个消息的偏移量开始消费,指定订阅主题和主题下的tag
- 设置回调的消息处理
- 启动消费实例,连接NameServer获取Broker的地址,并与Broker进行连接
-
producer sync
- 同一个生产者集群实例中的实例名称要一致。然后设置了NameServer的地址为127.0.0.1:9876
- 启动生产者实例,然后实例就会去连接NameServer并获取Broker的地址,然后生产者实例就会与Broker建立连接
- 创建Message消息实体,其中第一个参数为主题名称,这里为TopicTest;第二个参数为Tag类型,这里为TagA;第三个参数为消息体内容,是个二进制数据。代码4.2调用生产者实例的send方法同步发送消息,需要注意,这里同步意味着当消息同步通过底层网络通信投递到TCP发送buffer后才会返回,整个过程中需要阻塞调用线程
- 调用线程调用RocketmqClient的send方法发送消息后,其内部会首先创建一个ResponseFuture对象,并切换到IO线程把请求发送到Broker,接着调用线程会调用ResponseFuture的wait方法阻塞调用线程,等IO线程把请求写入TCP发送Buffer后,IO线程会设置ResponseFuture对象说请求已经完成,然后调用线程就会从wait方法返回。需要注意的是,RocketMQ返回成功是指已经把请求发送到了其TCP发送Buffer中,这时候请求还没到Broker
-
producer.async
- 注意demo中,主线程不能结束,否则抛出异常RemotingConnectException,所以加了一个sleep
- 在发送消息的同时设置了一个CallBack,调用该方法后,该方法会马上返回,然后等真的把消息投递到Broker后,底层IO线程会回调设置的Callback来通知,消息已经发送成功或者消息发送失败的原因
- RocketMQ客户端内部把请求提交到线程池后就返回了。消息发送任务会被在线程池内异步执行,异步发送任务内首先会创建一个ResponseFuture对象,然后切换到IO线程来具体发送请求,等IO线程将请求发送到TCP发送Buffer后,IO线程会设置ResponseFuture对象的值,然后ResponseFuture中保存的CallBack的执行切换到线程池来执行。可知使用异步发送消息方式调用线程不会被阻塞
-
-
Go语言概述
- 传统的编程模型,比如经常使用Java、C++、Python编程时,多线程之间需要通过共享内存(比如在堆上创建的共享变量)来通信。这时为保证线程安全,多线程共享的数据结构需要使用锁来保护,多线程访问共享数据结构时需要竞争获取锁,只有获取到锁的线程才可以存取共享数据
- Go中不仅在语言层面提供了这种低级并发同步原语—锁,比如互斥锁、读写锁、条件变量等,而且Go的并发原语—goroutine和channel提供了一种优雅而独特的结构化开发并发软件的方式。Go鼓励使用channel在goroutine之间传递对共享数据的引用,而不是明确地使用锁来保护对共享数据的访问。这种方法确保在给定时间只有一个goroutine可以访问共享数据。这个理念被总结为:不要通过共享内存来通信,而要通过通信来共享内存
- Go中并发模型采用了通道,体现为CSP的一个变种。之所以选择CSP,一方面是因为Google的开发工程师对它的熟悉程度,另一方面因为CSP具有一种无须对其模型做任何深入的改变就能轻易添加到过程性编程模型中的特性
- 在其他语言,比如Java中线程模型的实现是一个操作系统内核线程对应着一个使用new Thread创建的线程,而由于操作系统线程个数是有限制的,所以限制了创建线程的个数。另外,当线程执行阻塞操作时,线程要从用户态切换到内核态执行,这个开销是比较大的;而在Go中线程模型则是一个操作系统线程对应多个goroutine,用户可以创建的goroutine个数只受内存大小限制,而且上下文切换发生在用户态,切换速度比较快,并且开销比较小,所以Go中一台机器可以创建百万个goroutine
- 在Java中编写并发程序时需要在操作系统线程层面进行考虑,但是在Go中,不需要考虑操作系统线程,而是需要站在goroutine和通道上进行思考,当然有时候也会在共享内存上进行思考
- 在Go中只需要在要异步执行的方法前面加上go关键字,就可以让方法与主goroutine并发运行。另外结合goroutine和channel,可以方便地实现异步非阻塞回压功能
-
Go语言的线程模型
-
一对一模型
- 这种线程模型下用户线程与内核线程是一一对应的,当从程序入口点(比如main函数)启动后,操作系统就创建了一个进程。这个main函数所在的线程就是主线程。在main函数内当我们使用高级语言创建一个用户线程的时候,其实对应创建了一个内核线程
- 这种线程模型的优点是,在多处理器上多个线程可以真正实现并行运行,并且当一个线程由于网络IO等原因被阻塞时,其他线程不受影响
- 缺点是由于一般操作系统会限制内核线程的个数,所以用户线程的个数会受到限制。另外由于用户线程与系统线程一一对应,当用户线程比如执行IO操作(执行系统调用)时,需要从用户态的用户程序执行切换到内核态执行内核操作,然后等执行完毕后又会从内核态切换到用户态执行用户程序,而这个切换操作开销是比较大的
- 另外这里提下,Java的线程模型就是使用的这种一对一的模型,所以Java中多线程对共享变量使用锁同步时会导致获取锁失败的线程进行上下文切换,而JUC包提供的无锁CAS操作则不会产生上下文切换
-
多对一模型
- 多对一模型是指多个用户线程对应一个内核线程,同时同一个用户线程只能对应一个内核线程,这时候对应同一个内核线程的多个用户线程的上下文切换是由用户态的运行时线程库来做的,而不是由操作系统调度系统来做的
- 这种模型的好处是由于上下文切换在用户态,因而切换速度很快,开销很小;另外,可创建的用户线程的数量可以很多,只受内存大小限制
- 这种模型由于多个用户线程对应一个内核线程,当该内核线程对应的一个用户线程被阻塞挂起时,该内核线程对应的其他用户线程也不能运行,因为这时候内核线程已经被阻塞挂起了。另外这种模型并不能很好地利用多核CPU进行并发运行
-
多对多模型
- 多对多模型则结合一对一和多对一模型的特点,让大量的用户线程对应少数几个内核线程
- 这时候每个内核线程对应多个用户线程,每个用户线程又可以对应多个内核线程,当一个用户线程阻塞后,其对应的当前内核线程会被阻塞,但是被阻塞的内核线程对应的其他用户线程可以切换到其他内核线程上继续运行,所以多对多模型是可以充分利用多核CPU提升运行效能的
- 另外多对多模型也对用户线程个数没有限制,理论上只要内存够用可以无限创建
-
Go线程模型属于多对多线程模型
- 每个内核线程对应多个用户线程,每个用户线程又可以对应多个内核线程,当一个用户线程阻塞后,其对应的当前内核线程会被阻塞,但是被阻塞的内核线程对应的其他用户线程可以切换到其他内核线程上继续运行,所以多对多模型是可以充分利用多核CPU提升运行效能的
- Go中使用Go语句创建的goroutine可以认为是轻量级的用户线程。Go线程模型包含3个概念:内核线程(M-Machine)、goroutine(G-Goroutine)和逻辑处理器(P-Processor)。在Go中每个逻辑处理器(P)会绑定到某一个内核线程上,每个逻辑处理器(P)内有一个本地队列,用来存放Go运行时分配的goroutine。在上面介绍的多对多线程模型中是操作系统调度线程在物理CPU上运行,在Go中则是Go的运行时调度goroutine在逻辑处理器(P)上运行
- 在Go中存在两级调度,一级是操作系统的调度系统,该调度系统调度逻辑处理器占用CPU时间片运行;一级是Go的运行时调度系统,该调度系统调度某个goroutine在逻辑处理上运行
- 使用Go语句创建一个goroutine后,创建的goroutine会被放入Go运行时调度器的全局运行队列中,然后Go运行时调度器会把全局队列中的goroutine分配给不同的逻辑处理器(P),分配的goroutine会被放到逻辑处理器(P)的本地队列中,当本地队列中某个goroutine就绪后,待分配到时间片后就可以在逻辑处理器上运行了
- 为了避免某些goroutine出现饥饿现象,被分配到某一个逻辑处理器(P)上的多个goroutine是分时在该逻辑处理器上运行的,而不是独占运行直到结束
- goroutine内部实现与在多个操作系统线程(OS线程)之间复用的协程(coroutine)一样。如果一个goroutine阻塞OS线程,例如等待输入,则该OS线程对应的逻辑处理器(P)中的其他goroutine将迁移到其他OS线程,以便它们继续运行
- 假设goroutine1在执行文件读取操作,则goroutine1会导致内核线程1阻塞,这时候Go运行时调度器会把goroutine1所在的逻辑处理器1迁移到其他内核线程上(这里是内核线程2上),这时候逻辑处理器1上的goroutine2和goroutine3就不会受goroutine1的影响了。等goroutine1文件读取操作完成后,goroutine1又会被Go运行时调度系统重新放入逻辑处理器1的本地队列
-
在Go中,使用go关键字跟上一个函数,就创建了一个goroutine,每个goroutine可以认为是一个轻量级的线程,其占用更少的堆栈空间
-
可以把通道理解为一个并发安全的队列,生产者goroutine可以向通道里放入元素,消费者goroutine可以从通道里获取元素。
- 从队列大小来看,通道可以分为有缓冲通道和无缓冲通道,无缓冲通道里最多有一个元素,有缓冲通道里面可以有很多元素
- 另外,通道还是有方向的
- 通道是可以关闭的
-
Go中以消息进行通信的方式允许程序员安全地协调多个并发任务,并且容易理解语义和控制流,这通常比其他语言(如Java)中的回调函数(callback)或共享内存方式更优雅简单
-
Go的并发原语使构建流式数据管道变得很容易,从而使IO操作和多核CPU更加有效
- 管道是由一系列节点组成,这些节点使用通道连接起来。其中每个节点是一组运行相同功能的goroutine,在每个阶段goroutine从上游通道获取元素,然后对该数据执行某些操作,然后把操作后的结果再写入下游的通道。除了第一个和最后一个节点,每个节点可以有任意多个输入和输出通道,第一个节点有时候被称为数据源或者生产者,最后一个节点被称为数据终点或者消费者
- 如果你对流式编程有经验的话,可能会发现管道和反应式库比如RxJava中的流式编程很相似
- 借助Go中的并发原语goroutine与通道,可以非常方便地构建异步非阻塞、具有回压功能的程序
-