Reactor ‐ 產生數據流 - CCH0124/spring-sandbox GitHub Wiki
- Flux
可以發出 0..n
元素的流
Flux<Integer> just = Flux.just(1, 2, 3, 4);
- Mono
可以發出 0..1
元素的流
Mono<Integer> just = Mono.just(1);
Flux
和 Mono
都是 Reactive Streams Publisher
介面的實現。
Publisher<String> just = Mono.just("Hello");
有了產生流,需要訂閱它才能發出元素。
List<Integer> eles = new ArrayList<>();
Flux.just(1,2,3,4).log().map(i -> {
LOGGER.info("{}:{}", i, Thread.currentThread());
return i * 2;
})
.subscribe(eles::add);
Assertions.assertIterableEquals(eles, List.of(2, 4, 6, 8));
20:40:16.460 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
20:40:16.464 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
20:40:16.465 [main] INFO reactor.Flux.Array.1 - | request(unbounded)
20:40:16.465 [main] INFO reactor.Flux.Array.1 - | onNext(1)
20:40:16.465 [main] DEBUG org.reactor.reactive.SubscribeTest - 1:Thread[#1,main,5,main]
20:40:16.465 [main] INFO reactor.Flux.Array.1 - | onNext(2)
20:40:16.465 [main] DEBUG org.reactor.reactive.SubscribeTest - 2:Thread[#1,main,5,main]
20:40:16.465 [main] INFO reactor.Flux.Array.1 - | onNext(3)
20:40:16.465 [main] DEBUG org.reactor.reactive.SubscribeTest - 3:Thread[#1,main,5,main]
20:40:16.465 [main] INFO reactor.Flux.Array.1 - | onNext(4)
20:40:16.465 [main] DEBUG org.reactor.reactive.SubscribeTest - 4:Thread[#1,main,5,main]
20:40:16.465 [main] INFO reactor.Flux.Array.1 - | onComplete()
在訂閱之前,數據不會開始流動。上面是在一個主線程執行。
- onSubscribe() – 當訂閱流時,會調用此方法
- request(unbounded) - 當調用
subscribe
時,在後台正在創建一個Subscription
,該訂閱從流中請求元素。在這種情況下,它預設為unbounded
,表示它請求每個可用的元素。 - onNext() – 在每個元素上都調用此函數
- onComplete() – 在收到最後一個元素後調用。實際上還有一個
onError()
,如果有異常,它會被調用。
List<Integer> collected = Stream.of(1, 2, 3, 4)
.toList();
核心區別在於 Reactive 是推送模型(push model),而 Java 8 Streams 是拉取模型(Pull model)。在 Reactive 中,事件在訂閱者進入時被推送給訂閱者。
Stream 終端操作員就是一個終端,拉取所有資料並返回結果。使用 Reactive,可以從外部資源獲得無限的流,並臨時連接和刪除多個訂閱者。