Reactive Programming ‐ Reactive Streams - thought-corner/Backend-PlayGround GitHub Wiki
리액티브 스트림즈(Reactive Streams)
데이터 스트림을 Non-Blocking이면서 비동기적인 방식으로 처리하기 위한 리액티브 라이브러리의 표준 사양
리액티브 스트림즈의 구성요소
Publisher : 데이터를 생성하고 통지(발행, 게시, 방출)하는 역할을 한다.
Subscriber : 구독한 Publisher부터 통지(발행, 게시, 방출)된 데이터를 전달받아서 처리하는 역할을 한다.
Subscription : Publisher에 요청한 데이터 개수를 지정하고 데이터의 구독을 취소하는 역할을 한다.
Processor : Publisher와 Subscriber의 기능을 모두 가지고 있다. 즉, Subscriber로서 다른 Publisher를 구독할 수 있고 Publisher로서 다른 Subscriber가 구독할 수 있다.
Publisher
/** * A {@link Publisher} is a provider of a potentially unbounded number of sequenced elements, publishing them according to * the demand received from its {@link Subscriber}(s). * <p> * A {@link Publisher} can serve multiple {@link Subscriber}s subscribed {@link Publisher#subscribe(Subscriber)} dynamically * at various points in time. * * @param <T> the type of element signaled*/publicinterfacePublisher<T> {
/** * Request {@link Publisher} to start streaming data. * <p> * This is a "factory method" and can be called multiple times, each time starting a new {@link Subscription}. * <p> * Each {@link Subscription} will work for only a single {@link Subscriber}. * <p> * A {@link Subscriber} should only subscribe once to a single {@link Publisher}. * <p> * If the {@link Publisher} rejects the subscription attempt or otherwise fails it will * signal the error via {@link Subscriber#onError(Throwable)}. * * @param s the {@link Subscriber} that will consume signals from this {@link Publisher}*/public void subscribe(Subscriber<?superT> s);
}
subscribe()를 호출한다고 해서 데이터가 그 자리에서 바로 처리되는 것이 아니라 이제부터 데이터를 받을 준비가 되었으니 준비되면 onNext()로 보내라고 구독자를 등록하는 과정이다.
Subscriber
/** * Will receive call to {@link #onSubscribe(Subscription)} once after passing an instance of {@link Subscriber} to {@link Publisher#subscribe(Subscriber)}. * <p> * No further notifications will be received until {@link Subscription#request(long)} is called. * <p> * After signaling demand: * <ul> * <li>One or more invocations of {@link #onNext(Object)} up to the maximum number defined by {@link Subscription#request(long)}</li> * <li>Single invocation of {@link #onError(Throwable)} or {@link Subscriber#onComplete()} which signals a terminal state after which no further events will be sent. * </ul> * <p> * Demand can be signaled via {@link Subscription#request(long)} whenever the {@link Subscriber} instance is capable of handling more. * * @param <T> the type of element signaled*/publicinterfaceSubscriber<T> {
/** * Invoked after calling {@link Publisher#subscribe(Subscriber)}. * <p> * No data will start flowing until {@link Subscription#request(long)} is invoked. * <p> * It is the responsibility of this {@link Subscriber} instance to call {@link Subscription#request(long)} whenever more data is wanted. * <p> * The {@link Publisher} will send notifications only in response to {@link Subscription#request(long)}. * * @param s the {@link Subscription} that allows requesting data via {@link Subscription#request(long)}*/// Publisher에게 요청할 데이터의 개수를 지정하거나 구독을 해지하는 것을 의미한다.public void onSubscribe(Subscription s);
/** * Data notification sent by the {@link Publisher} in response to requests to {@link Subscription#request(long)}. * * @param t the element signaled*/// Publisher가 통지한 데이터를 처리한다.public void onNext(T t);
/** * Failed terminal state. * <p> * No further events will be sent even if {@link Subscription#request(long)} is invoked again. * * @param t the throwable signaled*/// Publisher가 통지한 데이터를 처리하는 과정에서 에러가 발생했을 때 해당 에러를 처리하는 역할을 한다.public void onError(Throwable t);
/** * Successful terminal state. * <p> * No further events will be sent even if {@link Subscription#request(long)} is invoked again.*/// Publisher가 데이터 통지를 완료했음을 알릴 때 호출되는 메서드로 데이터 통지가 정상적으로 완료될 경우에 어떤 후처리를 해야 한다면 해당 메서드에서 처리 코드를 작성한다.public void onComplete();
}
Subscription
/** * A {@link Subscription} represents a one-to-one lifecycle of a {@link Subscriber} subscribing to a {@link Publisher}. * <p> * It can only be used once by a single {@link Subscriber}. * <p> * It is used to both signal desire for data and cancel demand (and allow resource cleanup).*/publicinterfaceSubscription {
/** * No events will be sent by a {@link Publisher} until demand is signaled via this method. * <p> * It can be called however often and whenever needed—but if the outstanding cumulative demand ever becomes Long.MAX_VALUE or more, * it may be treated by the {@link Publisher} as "effectively unbounded". * <p> * Whatever has been requested can be sent by the {@link Publisher} so only signal demand for what can be safely handled. * <p> * A {@link Publisher} can send less than is requested if the stream ends but * then must emit either {@link Subscriber#onError(Throwable)} or {@link Subscriber#onComplete()}. * * @param n the strictly positive number of elements to requests to the upstream {@link Publisher}*/// Publisher에게 데이터의 개수를 요청한다.public void request(long n);
/** * Request the {@link Publisher} to stop sending data and clean up resources. * <p> * Data may still be sent to meet previously signalled demand after calling cancel.*/// 구독을 해지한다.public void cancel();
}
Processor
/** * A {@link Processor} represents a processing stage—which is both a {@link Subscriber} * and a {@link Publisher} and obeys the contracts of both. * * @param <T> the type of element signaled to the {@link Subscriber} * @param <R> the type of element signaled by the {@link Publisher}*/publicinterfaceProcessor<T, R> extends Subscriber<T>, Publisher<R> {
}