RxJS Overview - Tuong-Nguyen/Angular-D3-Cometd GitHub Wiki
An Observable is an object or a function that emits a sequence of data over time.
- Hot observables begin emitting items as soon as they are created, so an observer that subscribes later may start observing the sequence somewhere in the middle (e.g. watching a livestream).
- Cold observables wait until an observer subscribes before they begin to emit items, so that observer is guaranteed to see the whole sequence from the beginning (e.g. watching a recorded video).
- Cold observables usually refer to arrays or single values that have been converted to be used within RxJS.
- HTTP Requests.
- UI Events.
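The hot/cold distinction above can be illustrated with a small hand-rolled sketch. It does not use RxJS or RxJava; `ColdSource`, `HotSource`, and `emit` are hypothetical names invented here purely to show the difference in what a late subscriber sees.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class HotColdSketch {
    /** Cold (hypothetical): nothing happens until subscribe(), then the full sequence is replayed. */
    static class ColdSource {
        private final List<String> items;
        ColdSource(List<String> items) { this.items = items; }
        void subscribe(Consumer<String> observer) {
            for (String item : items) observer.accept(item);
        }
    }

    /** Hot (hypothetical): emits regardless of who is listening; late subscribers miss earlier items. */
    static class HotSource {
        private final List<Consumer<String>> observers = new ArrayList<>();
        void subscribe(Consumer<String> observer) { observers.add(observer); }
        void emit(String item) {
            for (Consumer<String> o : observers) o.accept(item);
        }
    }

    static List<String> coldDemo() {
        ColdSource cold = new ColdSource(List.of("a", "b", "c"));
        List<String> seen = new ArrayList<>();
        cold.subscribe(seen::add);   // emission starts here, from the beginning
        return seen;
    }

    static List<String> hotDemo() {
        HotSource hot = new HotSource();
        hot.emit("a");               // emitted before anyone subscribed: lost
        List<String> seen = new ArrayList<>();
        hot.subscribe(seen::add);
        hot.emit("b");
        hot.emit("c");
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("cold: " + coldDemo()); // cold: [a, b, c]
        System.out.println("hot:  " + hotDemo());  // hot:  [b, c]
    }
}
```

The cold subscriber sees the whole sequence, while the hot subscriber joins in the middle, as in the video and livestream analogy.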
```java
// RxJava 1.x: build a cold Observable from a fixed list of strings.
Observable<String> myObservable = Observable.from(Arrays.asList(
        "Hello from RxJava",
        "Welcome...",
        "Goodbye"));
```
- An Observer is an object or a function that knows how to process the sequence of data.
- An Observer must implement three methods:
  - `next()`: here's a new value from the stream
  - `error()`: an error happened in the stream
  - `complete()`: the stream is over
```java
// RxJava 1.x names: onNext() / onError() / onCompleted() correspond to next() / error() / complete().
Subscriber<String> mySubscriber = new Subscriber<String>() {
    @Override
    public void onCompleted() {
        Log.i(TAG, "Rx Java events completed");
    }

    @Override
    public void onError(Throwable e) {
        Log.e(TAG, "Error found processing stream", e);
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "New event -" + s);
    }
};
```
A Subscription is a connection between an observer and an observable.
It is used to unsubscribe an observer from an observable, i.e. the observer no longer receives items from the observable.
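As a rough illustration of the idea, the sketch below models a subscription as a handle that removes the observer from the source. It is hand-rolled rather than taken from RxJS or RxJava; `Source`, `emit`, and this `Subscription` interface are hypothetical names used only for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class SubscriptionSketch {
    /** Hypothetical handle returned by subscribe(); calling it detaches the observer. */
    interface Subscription {
        void unsubscribe();
    }

    /** Hypothetical minimal source that pushes items to its current observers. */
    static class Source<T> {
        private final List<Consumer<T>> observers = new ArrayList<>();

        Subscription subscribe(Consumer<T> observer) {
            observers.add(observer);
            return () -> observers.remove(observer);
        }

        void emit(T item) {
            // Iterate over a copy so observers may unsubscribe while being notified.
            for (Consumer<T> o : new ArrayList<>(observers)) o.accept(item);
        }
    }

    static List<Integer> demo() {
        Source<Integer> source = new Source<>();
        List<Integer> received = new ArrayList<>();
        Subscription subscription = source.subscribe(received::add);
        source.emit(1);
        source.emit(2);
        subscription.unsubscribe();  // the observer is detached from here on
        source.emit(3);              // not delivered
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [1, 2]
    }
}
```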
- Operators are functions that can transform the stream data between the moment the `Observable` sends them and the moment the function passed to `subscribe()` receives them.
- Each operator is a function that takes an `Observable` as an argument, transforms it, and returns another `Observable`.

Examples by category:

- Create an Observable: Interval
- Transform or filter the items of an Observable: `filter()`
- Multiple Observables: Merge operator
- Observable -> Observables -> Items: flatMap (maps each item to an Observable, then flattens their emissions into a single stream)
- Error handling operator: Catch
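To make the "function from Observable to Observable" idea concrete, here is a minimal hand-rolled sketch. It is not the RxJS or RxJava implementation; `Source`, `fromList`, `map`, and `filter` are hypothetical helpers defined only for this example, and error and completion signals are left out for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

public class OperatorSketch {
    /** Hypothetical stand-in for an Observable: something you can subscribe to. */
    interface Source<T> {
        void subscribe(Consumer<T> observer);
    }

    /** Creation helper: a cold source that emits every list element on subscribe. */
    static <T> Source<T> fromList(List<T> items) {
        return observer -> items.forEach(observer);
    }

    /** Operator: takes a source and returns a new source with each item transformed. */
    static <T, R> Source<R> map(Source<T> upstream, Function<T, R> fn) {
        return observer -> upstream.subscribe(item -> observer.accept(fn.apply(item)));
    }

    /** Operator: takes a source and returns a new source that drops non-matching items. */
    static <T> Source<T> filter(Source<T> upstream, Predicate<T> keep) {
        return observer -> upstream.subscribe(item -> {
            if (keep.test(item)) observer.accept(item);
        });
    }

    static List<Integer> demo() {
        Source<Integer> numbers = fromList(List.of(1, 2, 3, 4, 5, 6));
        Source<Integer> evens = filter(numbers, n -> n % 2 == 0);
        Source<Integer> scaled = map(evens, n -> n * 10);
        List<Integer> out = new ArrayList<>();
        scaled.subscribe(out::add);  // nothing runs until this call
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [20, 40, 60]
    }
}
```

Because every operator returns a new source, operators can be chained freely, and the original source is never modified.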
Schedulers allow us to introduce multithreading into our cascade of `Observable` operators, so we can instruct operators (or particular Observables) to run on particular Schedulers.
- The `ObserveOn` operator specifies a different `Scheduler` that an `Observable` will use to send notifications to its observers.
- It affects the thread that the Observable uses below the point where that operator appears.
- It can be called multiple times at various points in the chain of Observable operators in order to change the thread on which certain of those operators run.
- By default, an `Observable` and its chain of operators do their work, and notify observers, on the same thread on which the `Subscribe` method is called. The `SubscribeOn` operator changes this behavior by specifying a different `Scheduler` on which the `Observable` should operate.
- The `SubscribeOn` operator designates which thread the `Observable` will begin operating on, no matter at what point in the chain of operators it is called.
Ref: http://reactivex.io/documentation/scheduler.html
### Scheduler Types:
- Schedulers.immediate(): The default Scheduler; it executes the work immediately on the current thread.
- Schedulers.trampoline(): Returns a Scheduler that queues work on the current thread, to be executed after the current work completes.
- Schedulers.newThread(): Returns a Scheduler that spawns a new thread and executes the work on it.
- Schedulers.computation(): Returns a Scheduler intended for computationally intensive work. It can be used for event loops, processing callbacks, and other computational work. Do not perform blocking IO work on this Scheduler. It uses a fixed-size thread pool whose size depends on the number of CPUs, to optimize CPU usage and minimize context switching.
- Schedulers.io(): Returns a Scheduler that executes the work on a cached pool of threads that grows and shrinks as needed, reusing idle threads to execute the required work. This Scheduler is intended for asynchronously performing blocking IO tasks, such as network or file system reads and writes.
- Schedulers.from(Executor): Creates a Scheduler that executes each unit of work on the java.util.concurrent.Executor passed as an argument.
- AndroidSchedulers.mainThread(): Creates a Scheduler that executes the required work on the Android application main thread. This Android Scheduler, provided by the RxAndroid library, is based on the HandlerThread that runs the unit of work serially.
- HandlerScheduler.from(Handler): Creates a Scheduler that executes work on a specified Handler. AndroidSchedulers.mainThread() is a specialization of this Scheduler that runs on a Handler attached to the Android UI thread.
Example: execute the HTTP request on an IO thread and handle the result on the main thread.
```java
getTextFromNetwork("http://demo1472539.mockable.io/mytext")
        .subscribeOn(Schedulers.io())               // run the network call on an IO thread
        .observeOn(AndroidSchedulers.mainThread())  // deliver results on the Android main thread
        .subscribe(new MySubscriber());
```
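The same "produce on one thread, consume on another" pattern can be approximated with plain JDK executors, which may help when reasoning about what `subscribeOn` and `observeOn` do. This is only an analogy built on `CompletableFuture`, not how RxJava schedulers are implemented; the thread names `io-thread` and `ui-thread` are made up for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SchedulerSketch {
    /** Returns {thread that produced the value, thread that consumed it}. */
    static String[] demo() {
        // Hypothetical stand-ins for Schedulers.io() and AndroidSchedulers.mainThread().
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        ExecutorService ui = Executors.newSingleThreadExecutor(r -> new Thread(r, "ui-thread"));
        try {
            return CompletableFuture
                    // Roughly like subscribeOn(io): the source work runs on the io executor.
                    .supplyAsync(() -> Thread.currentThread().getName(), io)
                    // Roughly like observeOn(ui): everything downstream runs on the ui executor.
                    .thenApplyAsync(
                            producer -> new String[] {producer, Thread.currentThread().getName()},
                            ui)
                    .get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            io.shutdown();
            ui.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(String.join(" -> ", demo())); // io-thread -> ui-thread
    }
}
```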
- AsyncSubject: Emits only the last item emitted by the source Observable, and only when the source completes the stream by calling `onCompleted()`.
- PublishSubject: Delivers to each Observer only the events emitted after that Observer subscribed.
- ReplaySubject: Emits all the events emitted by the source Observable, even those that were emitted before the subscription was made.
- BehaviorSubject: Emits the most recently emitted item of the source Observable when the subscription is made, then continues with any later items emitted by the source Observable.
```java
PublishSubject<Integer> pubSubject = PublishSubject.create();
pubSubject.onNext(1);   // no subscriber yet: not delivered
pubSubject.onNext(2);   // no subscriber yet: not delivered
Subscription subscription = pubSubject.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "New Event received from PublishSubject: " + integer);
    }
});
pubSubject.onNext(3);   // delivered
pubSubject.onNext(4);   // delivered
subscription.unsubscribe();
pubSubject.onNext(5);   // subscriber already unsubscribed: not delivered
pubSubject.onCompleted();
```
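To compare what a late subscriber sees under the different Subject types, the following hand-rolled sketch may help. It does not use RxJava; `SimpleSubject` and `Mode` are hypothetical names for this example, and AsyncSubject is left out because it depends on the completion signal, which the sketch does not model.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class SubjectSketch {
    enum Mode { PUBLISH, REPLAY, BEHAVIOR }

    /** Hypothetical subject whose replay policy is chosen by Mode. */
    static class SimpleSubject<T> {
        private final Mode mode;
        private final List<T> history = new ArrayList<>();
        private final List<Consumer<T>> observers = new ArrayList<>();

        SimpleSubject(Mode mode) { this.mode = mode; }

        void subscribe(Consumer<T> observer) {
            if (mode == Mode.REPLAY) {
                history.forEach(observer);                         // replay everything seen so far
            } else if (mode == Mode.BEHAVIOR && !history.isEmpty()) {
                observer.accept(history.get(history.size() - 1)); // replay only the latest item
            }
            observers.add(observer);                               // PUBLISH: future items only
        }

        void onNext(T item) {
            history.add(item);
            for (Consumer<T> o : observers) o.accept(item);
        }
    }

    /** Emits 1 and 2, subscribes, then emits 3; returns what the late subscriber saw. */
    static List<Integer> lateSubscriber(Mode mode) {
        SimpleSubject<Integer> subject = new SimpleSubject<>(mode);
        subject.onNext(1);
        subject.onNext(2);
        List<Integer> seen = new ArrayList<>();
        subject.subscribe(seen::add);
        subject.onNext(3);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("publish:  " + lateSubscriber(Mode.PUBLISH));  // publish:  [3]
        System.out.println("replay:   " + lateSubscriber(Mode.REPLAY));   // replay:   [1, 2, 3]
        System.out.println("behavior: " + lateSubscriber(Mode.BEHAVIOR)); // behavior: [2, 3]
    }
}
```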
All RxJS documentation accompanies its explanations with marble diagrams, so we need to know their legend.