3. Reactor - giovany79/reactive GitHub Wiki
Los desarrolladores de Spring framework necesitaban un framework con un alto throughput de procesamiento de data con el fin de simplificar el desarrollo de big data pensando en procesamiento asíncrono y no bloqueante. Con esto iniciaron un nuevo proyecto llamado Project Reactor. En la versión 1.X incorporó las mejores practicas para procesamiento de mensajes como el patron Reactor y estilo de programación reactiva
Reactor es la implementación de la especificación Reactive Stream. Reactor es la librería por defecto de Spring Webflux. Proporciona dos implementaciones de la interfase publisher de reactive Stream: Mono y Flux. Actualmente está en la versión 3.0
Publica 0 o 1 elementos. Se usa normalmente cuando retorna un solo elemento o retorno void.
Un mono vacío es útil para emular retornos vacíos en programación tradicional. No retorna un valor pero emite una señal de finalizado para avisarte que el procesamiento ha finalizado
Mono puede ser muy útil para empaquetar operaciones asíncronas como HTTP request o DB queries.
Publica 0 o N elementos. Se usa normalmente cuando retorna una lista. Puede incluso producir cantidades infinitas de elementos. Intentar recolectar todos los elementos emitidos por un stream infinito puede ocasionar OutOfMemoryException.
Flux permite crear streams vacíos o con error
Flux<String> empty = Flux.empty();
Flux<String> never = Flux.never();
Mono<String> error = Mono.error(new RuntimeException("Unknown id"));
Mono y Flux pueden ser transformados uno en el otro. Ejemplo
Flux<T>.collectList() retorna Mono<List<T>>
Mono<T>.flux() retorna Flux<T>
Adicionalmente la librería es lo suficientemente inteligente para evitar cambios semánticos:
Mono -> Flux -> Mono
Mono.from(Flux.from(mono))
Esta sentencia retorna la instancia de mono original
Reactor ofrece muchas implementaciones del método subscribe. Este método es importante hasta que este método sea llamado.
Es la versión simple. Ignora todas las señales. No tiene parámetros y lanza el proceso de subscripción. Es útil para disparar procesamiento de stream
Toma la implementación de la interface consumer para hacer algo con cada valor recibido. No maneja las señales onComplete y onError.
Versión para el manejo de error. La señal onComplete es ignorada
Subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> error errorConsumer Runnable completeConsumer):
Adicional implementa una interfase runnable para correr código cuando el valor publicado es procesado exitosamente
Subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> error errorConsumer, Runnable completeConsumer):
Adicional permite hacer algo con el objeto subscripción.
Cuando trabajamos con secuencias reactivas es vital tener la habilidad de manipular y transformar estas secuencias. El proyecto reactor proporciona instrumentos (métodos y métodos constructores) para casi cualquier transformación reactiva requerida donde se pueden clasificar en las siguientes categorías:
- Transformar secuencias existentes
- Métodos para observar el procesamiento de las secuencias
- Dividir y juntar secuencias Flux
- Trabajar con tiempo
- Retornar data síncrona
https://projectreactor.io/docs/core/release/reference/index.html#which.create
Es la forma más natural de transformar cada elemento de la secuencia en otro elemento. Tanto Mono como Flux tienen el operador map. Este funciona de manera similar al operador map del Api Java Stream. Permite procesar los elementos uno por uno.
Flux se convienrte en un Flux Mono se convienrte en un Mono
Flux.range(1,10)
.map(value -> value * 100)
.log()
.subscribe();
El proyecto Reactor contiene todos los tipos de operadores para filtrar elementos
- Filter: Pasa los elementos que cumplen la condición
- ignoreElements: Retorna un Mono y filtra todos los elementos. La secuencia finaliza solo después que la original finaliza
- take(n): Toma solo los primeros n elementos. Los demas los ignora
- takeLast(n): Toma los últimos n elementos. Los demas los ignora
- takeUntil(Predicate): Toma los elementos hasta que la condición se cumpla
- elementAt(n): Permite tomar el n-esimo elemento de la secuencia
- Skip(Duration): Se salta elementos de la secuencia por duración
- take (Duration): Se salta elementos de la secuencia por duración
Flux.range(1,10)
.filter(value -> (value % 2)!=0)
.log()
.subscribe();
Recolecta todos los elementos en la lista y procesa el resultado de la colección como un stream Mono
- CollectList: Recolecta los elementos
- CollectSortedList: Recolecta y ordena los elementos
Flux.just(1, 6, 2, 8, 3, 1, 5, 1)
.collectSortedList(Comparator.reverseOrder())
.subscribe(System.out::println);
Mono y Flux también tiene funciones repeat() y repeat(times) para hacer loop sobre secuencias ingresadas
Adicionalmente provee la función defaultIfEmpty(T) para aprovisionar valores por defecto en caso que sea vacío
Por último tenemos la función distinc() la cual filtra los elementos repetidos. Elementos en el stream que no hayan aparecido antes
Flux.just(1, 6, 2, 8, 3, 1, 5, 1)
.distinct()
.subscribe(System.out::println);
Permite reducir los elementos de un stream a un valor
- Flux.count(): Cuenta todos los elementos de un stream
- Flux.all: Si todos los elementos cumplen con la propiedad requerida
- Flux.any(): Si al menos un elemento cumple con la propiedad requerida
- Flux.hasElements(): Si el stream tiene elementos
- Flux.reduce(T): Permite reducir los elementos de un stream con alguna lógica personalizada
Flux.range(1, 5)
.reduce(0, (acc, elem) -> acc + elem) .subscribe(result -> log.info("Result: {}", result));
- Flux.then(), Flux.thenMany(), Flux.thenEmpty(): Estos operadores ignoran los valores de los stream ingresados y generan sus propios valores
Flux.just(1, 2, 3)
.thenMany(Flux.just(4, 5))
.subscribe(e -> log.info("onNext: {}", e));
El proyecto reactor permite la combinación de muchos streams en streams salientes
- Concat(): Combina varios streams de manera secuencial
Flux.concat( Flux.range(1, 3),
Flux.range(4, 2),
Flux.range(6, 5)
).subscribe(e -> System.out.println("onNext: " + e));
- Merge: Combina varios streams ordenandolos en orden de llegada
Flux.concat( Flux.range(1, 3),
Flux.range(4, 2),
Flux.range(6, 5)
).subscribe(e -> System.out.println("onNext: " + e));
- Zip: Los streams se reciben de diferentes fuentes. Espera por todos los recursos para emitir un elemento y combinar todos los elementos en un valor de salida. El operador continúa haciendo esto hasta que cualquier fuente de recursos finalice
Flux.zip( Flux.just("A", "B", "C"),
Flux.range(1, 3)
).subscribe(e -> System.out.println("onNext: " + e));
El proyecto reactor soporta batch de elementos de streams Flux
- Buffering: Elementos dentro de contenedores como lista. El stream resultado es Flux<List>
Flux.range(1, 13)
.buffer(4)
.subscribe(e -> System.out.println("onNext: " + e));
- Windowing: Almacena elementos en un stream de streams como Flux<Flux>
Flux<Flux<Integer>> windowedFlux = Flux.range(101, 20)
.windowUntil(e -> (e % 5) == 0);
windowedFlux.subscribe(window -> window .collectList()
.subscribe(e -> System.out.println("window: " + e)));
- Grouping: Agrupa elementos por una llave dentro de un stream que tiene el tipo Flux<GroupedFlux<K,T>>. Cada nueva llave lanza una instancia GroupedFlux y todos los elementos con esa llave son llevados a esa instancia de la clase GroupedFlux
Buffering y Windowing puede pasar basado en lo siguiente:
- El número de elementos procesados. Ejemplo, cada 10 elementos
- Algún periodo de tiempo. Ejemplo, cada 5 segundos
- Basado en algún predicado que cumple una condición. Ejemplo, números divisibles por 5
- Basado en un evento que llega de otro Flux, el cual controla la ejecución
- Flatmap Flatmap consiste de dos operaciones map y flatten(es similar al operador merge). La parte del map transforma cada elemento de entrada en un stream reactivo(T -> Flux) y la parte de flatten hace merge de todas las secuencias reactivas generadas en una nueva secuencia. Flatmap es de tipo eager. No necesariamente preserva el orden original.
En este caso, por cada circulo generamos dos rombos. Se hace merge de las secuencias
El operador flatmap y sus variantes es muy importante en la programación funcional y la programación reactiva porque permite la implementación de flujos complejos en una sola línea.
-
ConcatMap Es una variación de flatmap donde espera que cada completitud interna antes de generar el próximo substream. Concat map preserva el orden de los elementos de entrada
-
FlapMapSecuencial Es una variación de flatmap. Preserva el orden de llegada
Para escenarios de un alto throughput en ocasiones toma sentido procesar solo una fracción de eventos para aplicar técnicas de muestreo. Reactor lo permite realizar con los operadores sample y sample timeout
Flux.range(1, 100)
.delayElements(Duration.ofMillis(1))
.sample(Duration.ofMillis(20))
.subscribe(e -> log.info("onNext: {}", e));
Produce la salida:
onNext: 13
onNext: 28
onNext: 43
onNext: 58
onNext: 73
onNext: 89
onNext: 100
Aun cuando se generan items secuenciales cada milisegundo, el subscribe recibe solo una fracción de eventos dentro del límite deseado.
La librería reactor proporciona un API para transformar una secuencia reactiva en estructuras bloqueantes. Aún cuando cualquier operación bloqueante debe ser omitida en una aplicación reactiva, algunas veces es requerida por la API de alto nivel. Estas son las opciones para bloquear el stream y producir un resultado síncrono:
- toIterable: Transforma un Flux en un Iterable bloqueante
- toStream: Transforma un Flux en un Stream bloqueante
- blockFirst: Bloquea el hilo actual hasta que la señal interna reciba el primer valor o se complete
- blockLast: Bloquea el hilo actual hasta que la señal interna reciba el último valor o se complete. En caso de error lanza la excepción en el hilo bloqueante
Algunas veces es necesario realizar una acción para cada elemento o una señal particular en el medio del procesamiento de un pipeline. Para cubrir esta necesidad, el proyecto Reactor ofrece los siguientes métodos:
- doOnNext(Consumer): Permite realizar acciones en cada elemento en un Flux o Mono
- doOnComplete(): Es invoncado en el evento OnComplete
- doOnError(Throwable): Es invocado en el evento OnError
- doOnSuscribe(Consumer ): Reacciona al ciclo de vida de los eventos de subscription
- doOnRequest(LongConsumer): Reacciona al ciclo de vida de los eventos de subscription
- doOnCancel(Runnable): Reacciona al ciclo de vida de los eventos de subscription
- doOnTerminate(Runnable): Es llamado cuando un stream es terminado sin importar que cause su terminación
- doOnEach(Consumer): Maneja todas las señales que representan el dominio Reactive Stream (OnError, OnSubscribe, OnNext, OnComplete)
Flux.just(1, 2, 3)
.concatWith(Flux.error(new RuntimeException("Conn error")))
.doOnEach(s -> log.info("signal: {}", s)) .subscribe();
La salida es:
signal: doOnEach_onNext(1)
signal: doOnEach_onNext(2)
signal: doOnEach_onNext(3)
signal: onError(java.lang.RuntimeException: Conn error)
Cuando diseñamos aplicaciones reactivas que se comunican con componentes externos(DB, servicios, colas, etc) necesitamos con todas las situaciones excepcionales. Por fortuna la señal OnError es parte integral de la especificación de Reactive Stream, asi una excepción siempre debería tener una forma de propagar al actor que manejará la excepción. Si el subscriber final no define un handler para la señal OnError, onError lanza una UnsupportedOperationException.
Adicionalmente, la semántica de los stream reactivos define que onError es una operación terminal, después de la cual la secuencia reactiva para su ejecución. En este punto podemos reaccionar diferente aplicando alguna de las siguientes estrategias:
- Definir un handler en la señal onError en el operador subscribe
- Catchear y remplazar el error con un valor estatico por defecto o un valor calculado desde la excepción aplicando el operador onErrorReturn
- Catchear la excepción y ejecutar un flujo alternativo aplicando el operador onErrorResume
- Catchear y transformar una excepción en otra excepción que mejor represente la situación aplicando el operador onErrorMap
- Podemos definir un workflow reactivo que en el evento de errores vuelve a lanzar la ejecución . El operador retry se subscribe al secuencia del fuente reactivo si este emite un error
- Adicionalmente un stream vacio no es lo que queremos siempre, por lo cual se puede retornar un valor por defecto si es vacío usando el operador defaultIfEmpty
- Otro operador importante es el de timeout permite limitar el tiempo de espera de la operación lanzando un TimeoutException la cual se puede manejar con alguna estrategia de manejo de errores vista anteriormente
- Libro Handson Reactive Programming in Spring 5. Oleh Dokuka and Igor Lozynski. Capitulo 4
- PluralSight - Curso Spring WebFlux Getting Started. Esteban Herrera
- https://projectreactor.io/docs/core/release/reference/index.html#getting
- https://projectreactor.io/docs/core/release/reference/index.html#which-operator
- https://rxmarbles.com/
- http://reactivex.io/