RxAndroid, RxJava 2 - Tuong-Nguyen/PreparationEduLog GitHub Wiki

Overview

ReactiveX extension for Java has changed a lot since version 2. This wiki will point out some notable differences and how to apply it in Android

Prerequisites:

Dependencies:

compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.1.0'

Java 8 Lambda expression:

To avoid verbose when using RxJava, we should use Java 8 Lambda expression.

  • IDE: Android Studio 3.0 Preview

  • Enable Java 8 in application level Gradle file:

compileOptions {
    sourceCompatibility JavaVersion.VERSION_1_8
    targetCompatibility JavaVersion.VERSION_1_8
}

For more information about Java 8 support in Android Studio, visit https://developer.android.com/studio/preview/features/java8-support.html

Change in RxJava 2:

https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0

Asynchronous I/O with RxJava:

Get list of contributors from a github repository

Retrofit with RxJava 2

Retrofit use RxJava 1 by default, add this line to support RxJava 2:

compile 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'

And add RxJava2CallAdapter when create retrofit from builder:

Retrofit retrofit = new Retrofit.Builder()
        .baseUrl(BASE_URL)
        .addConverterFactory(GsonConverterFactory.create())
        .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
        .build();

Create observable

Observable can be acquired via Retrofit API:

GithubService githubService = retrofit.create(GithubService.class);
String owner = "ReactiveX";
String repo = "RxJava";
Observable<List<User>> observable = githubService.getContributors(owner, repo);

Register an observer to receive notification

mDisposable = observable
        .subscribeOn(Schedulers.io())   // Run I/O task (get list of contributors) on I/O thread
        .observeOn(AndroidSchedulers.mainThread())    // Run observer code on main thread
        .subscribe(
                contributors -> {
                    log(String.valueOf(contributors.size()));
                },   // onNext
                error -> {
                    log("onError: " + error.getMessage());
                },  // onError
                () -> {
                    log("onComplete");
                }    // onComplete
        );

Controlling subscription

  • Unsubscribe using Disposable:

Subscription has been renamed to Disposable since version 2

Disposable disposable = observable.subscribe(observer);
disposable.dispose();
  • Use Subscriber:

We can no longer register a Subscriber to Observable, instead of that, we must use Flowable - which is similar to Observable, but has a few differences:

- Observable is used for UI event, flow of items is short (no more than 1000 elements)
- Flowable is ideally for dealing with 10k+ of elements, handling I/O stuff related to database or networking

Visit https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#observable-and-flowable for more information

ResourceSubscriber is one kind of Subscriber which can unsubscribe (or dispose) itself

Flowable<List<User>> observable = githubService.getContributors(owner, repo);
observable
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new ResourceSubscriber<List<User>>() {
            @Override
            public void onNext(List<User> users) {
                dispose();
            }

            @Override
            public void onError(Throwable t) {

            }

            @Override
            public void onComplete() {

            }
        });

Handle UI event with RxJava

Search suggestion:

When user typing text in the search box, instead of display search suggestion immediately, we will wait for a specific delay interval and only showing the last one

RxTextView.textChangeEvents(inputSearchText)
    .debounce(400, TimeUnit.MILLISECONDS)
    .filter(changes -> isNotNullOrEmpty(changes.text().toString()))
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(getSearchObserver());

RxTextView.textChangeEvents is a handy method provided by RxBinding library to creating observable for EditText onTextChanged event

debounce operator is used to schedule when to emitting message to observer

Form validation

Form validation is extremely easy with RxAndroid

  • First, we create observable for each input field (for example: username and password):
Observable<CharSequence> emailChangeObservable = RxTextView.textChanges(mEmailField).skip(1);
Observable<CharSequence> passwordChangeObservable = RxTextView.textChanges(mPasswordField).skip(1);

Similar to search suggestion example, we use RxTextView.textChanges to create observable that is mapped to EditText onTextChanged event

skip operator will skip the first item (because onTextChanged will be fired the first time when activity opened)

  • Next, combine two observable using Observable.combineLatest
Observable.combineLatest(emailChangeObservable, passwordChangeObservable, new BiFunction<CharSequence, CharSequence, Boolean>() {
    @Override
    public Boolean apply(@NonNull CharSequence email, @NonNull CharSequence password) throws Exception {
        boolean validEmail = Patterns.EMAIL_ADDRESS.matcher(email).matches();
        boolean validPassword = password.length() > 6;
        // Display error message here

        return validEmail && validPassword;
    }
})

combineLatest operator will combine the latest item emitted by each Observable via a specified function and emit items based on the results of this function

  • Finally, register an observer for this observable, get the validation result in onNext()

Some useful operators

  • buffer: gather items in bundles and emit it these bundles
  • map: transform items emitted by Observable by apply a function to that item
  • average: calculate average of all number emitted and emit that average
⚠️ **GitHub.com Fallback** ⚠️