Reactor ‐ 在流上操作 - CCH0124/spring-sandbox GitHub Wiki

Mapping Data

執行轉換

@Test
    public void map_test() {
        List<Integer> eles = new ArrayList<>();
        Flux.just(1, 2, 3, 4)
                .log()
                .map(i -> i * 10)
                .log()
                .subscribe(eles::add);

        Assertions.assertIterableEquals(eles, List.of(10, 20, 30, 40));
    }

onNext() 被呼叫時,map() 將會被應用。

Combining Two Streams

使用 zip() 函數,將另一個流與另一個流結合起來。

    @Test
    public void zip_test() {
        List<String> eles = new ArrayList<>();
        Flux.just(1, 2, 3, 4)
            .log()
            .map(i -> i * 10)
            .zipWith(Flux.range(0, Integer.MAX_VALUE), 
                (one, two) -> String.format("First Flux: %d, Second Flux: %d", 
                one, two))
            .subscribe(eles::add);

        Assertions.assertIterableEquals(eles, List.of("First Flux: 10, Second Flux: 0",
        "First Flux: 20, Second Flux: 1",
        "First Flux: 30, Second Flux: 2",
        "First Flux: 40, Second Flux: 3"));
    }

Hot Streams

對於 Reactor 來說,一個現實的用例可能是無限發生的事情。這些都可稱為熱流。

ConnectableFlux

下面是一個範例,如果將 while 條件改成 true,即可模擬大量資料進來。透過調用 publish() 得到 ConnectableFlux,接著調用 subscribe() 但不會立即開始發送資料,因此可添加多個訂閱。當調用 connect() 時,Flux 才會開始送資料。

    @Test
    public void infinite_stream() {
        ConnectableFlux<Object> publish = Flux.create(sink -> {
            int i = 100_000;
            while (i > 0) {
                sink.next(i--);
            }
        }).publish();

        publish.subscribe(System.out::println); 
        publish.subscribe(System.out::println);
        publish.subscribe(System.out::println);

        publish.connect();
    }

Concurrency

Scheduler 介面提供了一個異步程式碼的抽象,為此提供了許多實現,下面嘗試訂閱與 main 不同的執行續

@Test
    public void test() {
        List<Integer> eles = new ArrayList<>();
        Flux.just(1, 2, 3, 4)
        .log()
        .map(x -> x * x)
        .subscribeOn(Schedulers.parallel())
        .subscribe(eles::add);
    }

parallel 調度器實現訂閱在不同的執行續上運行,可以透過以下日誌來證明這一點。第一行來自主線程,而 Flux 在另一個稱為 parallel-1 的線程中運行:

20:54:05.086 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
20:54:05.105 [parallel-1] INFO  reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
20:54:05.107 [parallel-1] INFO  reactor.Flux.Array.1 - | request(unbounded)
20:54:05.107 [parallel-1] INFO  reactor.Flux.Array.1 - | onNext(1)
20:54:05.107 [parallel-1] INFO  reactor.Flux.Array.1 - | onNext(2)
20:54:05.107 [parallel-1] INFO  reactor.Flux.Array.1 - | onNext(3)
20:54:05.107 [parallel-1] INFO  reactor.Flux.Array.1 - | onNext(4)
20:54:05.107 [parallel-1] INFO  reactor.Flux.Array.1 - | onComplete()
⚠️ **GitHub.com Fallback** ⚠️