Programowanie reaktywne - rlip/java GitHub Wiki

Spring Webflux

To reaktywny spring mvc. Używa Netty zamiast Tomcata. Kontroller zwraca Mono lub Flux, może też współpracować z typami RxJava: Observable, Single i Completable.

@GetMapping("/recent") 
public Flux<Taco> recentTacos() {
   return Flux.fromIterable(tacoRepo.findAll()).take(12);
}
-----------------------------------
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

RxJava

Observable<String> observable = Obaservable.just("Kowalski Jan");
observable.map(s -> s.toUpperCase())
          .subscribe(s -> log.info(s)

Reactor

Mono (od 0 do 1 elementów) i Flux to implementacje publishera strumienia reaktywnego. Można je tworzyć na postawie tablicy, listy, strumienia, czy wartości po przecinku. Dają większe możliwości od strumieni funkcyjnych:

  • filter, map, flatmap (przekształcanie asynchroniczne),
  • distinct, skip, take,
  • generowanie wartości(Flux.range, Flux.interval),
  • parowanie 2 strumieni(zip, mergeWith),
  • wybór pierwszego (first),
  • operowanie na czasie jak delay, delayElements(Duration.ofSeconds(1))
  • buforowanie, bądź zebranie wszystkiego do obiektu Flux zawierającego listę: fruitFlux.buffer(), fruitFlux.buffer(liczbaEl)
  • sprawdzenie, czy wszystkie, lub którykolwiek spełnia podane kryteria: animalFlux.all(), animalFlux.any()
Mono zawierający listę:
Mono<List<String>> fruitListMono = fruitFlux.collectList(); 

Mono zawierający mapę o kluczu wygenerowanym w lambdzie:
Mono<Map<Character, String>> animalMapMono = animalFlux.collectMap(a -> a.charAt(0));

Pozwala zgrupować listę na listy - po 3 elementy i przetwarzać je równolegle

Flux.just(
     "jabłko", "pomarańcza", "banan", "kiwi", "truskawka")
     .buffer(3)
     .flatMap(x -> 
          Flux.fromIterable(x)
         .map(y -> y.toUpperCase())
         .subscribeOn(Schedulers.parallel())
         .log()
     )
.subscribe();
Mono.just("Janek")
     .map(n -> n.toUpperCase())
     .map(cn -> "Witaj, " + cn + "!")
     .subscribe(System.out::println);
--------------------
<dependency>
   <groupId>io.projectreactor</groupId>
   <artifactId>reactor-core</artifactId>
</dependency>

<dependency>
   <groupId>io.projectreactor</groupId>
   <artifactId>reactor-test</artifactId>
   <scope>test</scope>
</dependency>

<dependencyManagement>
     <dependencies>
         <dependency>
             <groupId>io.projectreactor</groupId>
             <artifactId>reactor-bom</artifactId>
             <version>Bismuth-RELEASE</version>
             <type>pom</type>
             <scope>import</scope>
         </dependency>
     </dependencies>
</dependencyManagement>

flux

  1. Creating a Flux:
import reactor.core.publisher.Flux;

public class FluxExample {
    public static void main(String[] args) {
        Flux<String> stringFlux = Flux.just("Hello", "World", "Reactor");
        stringFlux.subscribe(System.out::println);
    }
}
  1. Transforming elements in a Flux:
import reactor.core.publisher.Flux;

public class FluxExample {
    public static void main(String[] args) {
        Flux<String> stringFlux = Flux.just("Hello", "World", "Reactor");
        Flux<String> transformedFlux = stringFlux.map(String::toUpperCase);
        transformedFlux.subscribe(System.out::println);
    }
}
  1. Filtering elements in a Flux:
import reactor.core.publisher.Flux;

public class FluxExample {
    public static void main(String[] args) {
        Flux<String> stringFlux = Flux.just("Hello", "World", "Reactor");
        Flux<String> filteredFlux = stringFlux.filter(s -> s.length() > 5);
        filteredFlux.subscribe(System.out::println);
    }
}
  1. Combining multiple Flux:
import reactor.core.publisher.Flux;

public class FluxExample {
    public static void main(String[] args) {
        Flux<String> flux1 = Flux.just("Hello", "World");
        Flux<String> flux2 = Flux.just("Reactor", "Rocks");
        Flux<String> combinedFlux = Flux.concat(flux1, flux2);
        combinedFlux.subscribe(System.out::println);
    }
}
  1. Handling errors in a Flux:
import reactor.core.publisher.Flux;

public class FluxExample {
    public static void main(String[] args) {
        Flux<String> stringFlux = Flux.just("Hello", "World", "Reactor")
                                       .concatWith(Flux.error(new RuntimeException("Error occurred")));
        stringFlux.onErrorResume(e -> {
            System.err.println("Error: " + e.getMessage());
            return Flux.just("Fallback");
        }).subscribe(System.out::println);
    }
}
⚠️ **GitHub.com Fallback** ⚠️