2. Reactive Stream - giovany79/reactive GitHub Wiki

Problemas con las Apis reactivas

La lista de librerías como RxJava, características del core de java como los Completable Stages y frameworks especificos como org.springframework.util.concurrent.ListenableFuture complejizaron el entorno de la programación reactiva en java. No habia una integración directa entre los Completable Stages de Java con los Listeneable Future de Spring. Adicionalmente muchas librerías y frameworks crearon sus propias interfaces para la comunicación asíncrona entre componentes. Esto implicaba que no había un único método que permitiera a los vendedores de librerías alinear sus APIs.

Adicionalmente las librerías fueron pensadas para que la data la arrastrara(push) de sus fuentes. Los modelos pull no eran eficientes en algunos casos

Otro problema común con las librerías reactivas es la velocidad en la producción y en el consumo donde los dos principales son:

  • Productor lento, consumidor rápido (No hay métricas para incrementar el throuput de manera automatica)
  • Productor rápido, consumidor lento (normalmente tratado con colas)

En conclusión, las librerías no tenían mecanismos para controlar el backpressure

La solución

A finales del 2013, un grupo de ingenieros de Lightbend, Netflix, Twiter y Pivotal se reunieron para resolver estos problemas y proporcionaron un estandar a la comunidad de JVM llamado Reactive Stream Specification. La idea en sí fue la estandarización de los patrones de programación reactiva. Esta iniciativa creció independiente de cualquier organización y está disponible como un jar separado en el paquete org.reactivestreams

Reactive Stream proporciona un modelo hibrido de push/pull lo cual permite el control apropiado de backpressure.

El objetivo de Reactive Stream es proporcionar un contrato para procesamiento asíncrono, no bloqueante y con capacidades de backpresure. Proporciona un conjunto de interfaces para estandarizar el comportamiento reactivo de las librerías.

No especifica como los datos son manipulados o transformados con operadores como map o filter

Interfaces principales de la especificación Reactive Stream

Contiene 4 interfases primarias:

Publisher

Publica elementos a cualquier número de subscriptores

package org.reactivestreams;
   public interface Publisher<T> {
       void subscribe(Subscriber<? super T> s);
}

Subscriber

Contiene los métodos para controlar el flujo de elementos

package org.reactivestreams;
   public interface Subscriber<T> {
       void onSubscribe(Subscription s); // Notifica si la subscripción es exitosa
       void onNext(T t);
       void onError(Throwable t);
       void onComplete();
}

Subscription

Proporciona lo fundamental para controlar el flujo de elementos

package org.reactivestreams;
   public interface Subscription {
       void request(long n);
       void cancel();
}

Processor

Es una combinación entre publisher y subscriber. Es diseñado para adicionar estados adicionales de procesamiento entre el publisher y el subscriber. Processor puede representar una transformación lógica. facilita la implementación de lógica de negocio entre el publisher y el subscreiber

Package org.reactivestreams;
   public interface Processor<T, R> extends Subscriber<T>,
                                            Publisher<R> {
}

Flujo

  1. El método subscribe() es llamado en el Publisher
Publisher <-----subscribe()----Subscriber
  1. Una subscripción es creada y el método onSubscribe() es ejecutado pasando el objeto subscription
Publisher ----onSubscribe(subscription)-->Subscriber
  1. Para empezar a recibir elementos, el subscribe debe ejecutar el método request indicando cuantos elementos pueden ser procesados. Si no se llama el método explícitamente, un número incontable de elementos es requerido.
Publisher        Subscriber
                      |
                    Request
                      |
                      V
                Subscription
  1. Solo después de esto el subscribe puede recibir via el método onNext()
Publisher ----onNext()--->Subscriber

             Subscription

Recibe elementos hasta que una de los siguientes eventos ocurran:

  • Se hayan enviado todos los elementos (onComplete) - cancela la subscripción
  • Requerir mas elementos (onNext)
  • Si hay error en algun punto del publisher(onError) - Cancela la subscripción
⚠️ **GitHub.com Fallback** ⚠️