白话RxJS - rxjs-space/notes GitHub Wiki

变量命名Style

(个人约定,非任何best practice)

  • Observable变量以$结尾,如state$
  • Subject变量以$$结尾, 如state$$
  • Subscription变量以_结尾,如ultimate_

写作目的

RxJS包含许多概念,RxJS Manual在介绍RxJS的时候,引入了更多概念,对于初学者而言,不够直白。
比如:

ReactiveX combines the Observer pattern with the Iterator pattern and functional programming with collections to fill the need for an ideal way of managing sequences of events.

再如:

Observable: represents the idea of an invokable collection of future values or events.

本文将尝试用白话来解读RxJS。
比如:

Observable:好似一个Function,有很多return,定义了在哪个时点return什么数据。

部分章节使用了伪代码,用来说明某个类的特性;其余部分,受限于水平,实在写不出伪代码。 难免错漏,敬请指正。

Observer:一个Object,有3种callback

Observer最平易近人,就是一个Object,里面有3种callback,形如:

const observer = {
  next: (value) => console.log(value),
  error: (error) => console.log(error),
  complete: () => console.log('completed')
}

Observable:好似一个Function,有很多return,定义了在哪个时点return什么数据

可以对照Function来看Observable
Function是这样:

const funcX = () => {
  在处理处理处理以后:
    return 'gogogo, but only once';
}

Observable就是这样:

const x$ = () => {
  (无条件的): 
    return 'I\'m the 1st go';
  在'500 ms'以后: 
    return 'I\'m the 2nd go';
  ...
  在http请求出错时:
    return 'something went wrong';
  // 然后就没有然后了
}

Observable可以return很多次,任意多次,每次return可能需要满足一定条件,比如“鼠标点击的时候”。
可是,怎么可能return多于1次呢?
这是伪代码,其实Observable应该是这样:

const x$ = (observer) => {
  (无条件的): 
    observer.next('I\'m the 1st go');
  在'500 ms'以后: 
    observer.next('I\'m the 2nd go');
  ...
  在http请求出错时:
    observer.error('something went wrong');
  // 然后就没有然后了
}

我们需要一个Observer,并用observer.callback来代替return

启动Observable:用Observable.subscribe(observer)

Function在定义好以后,不会自动运行,Observable也是一样。
我们可以通过funX()(即在函数名后加上括号)来调用funX
怎样调用Observable呢?看这里:

const FakeObservableClass = function() {
  this.execution = (observer) => {
      observer.next('I\'m the 1st go');
      observer.next('I\'m the 2nd go');
  }
}

FakeObservableClass.prototype.subscribe = function(observer) {
  const that = this;
  that.execution(observer);
}

const x$ = new FakeObservableClass();
const observer = {next: (value) => console.log(value)};
x$.subscribe(observer); // 启动x$的运行

我们是通过subscribe方法,即x$.subscribe(observer)来启动x$的运行的。

Subscription:一个Object,有一个方法unsubscribe,可以停掉运行中的Observable

Function在启动以后是停不下来的,直到return
Observablesubscribe以后,如果execution的内容是async的(比如setIntervaldom eventshttp response),它是可以停下来的。
IntervalObservable为例:

const FakeIntervalObservableClass = function(interval) {
  this.intervalId = null;

  this.execution = (observer) => {
    let state = 0;
    let execute = (state) => {
      this.intervalId = setInterval(() => {
        observer.next(++state);
      }, interval);
    }
    execute(state);
  }
}

FakeIntervalObservableClass.prototype.subscribe = function(observer) {
  const that = this;
  that.execution(observer); // subscribe的时候会执行setInterval
  // 接下来return一个Obeject,这个Object就是Subscription
  return {
    unsubscribe: () => {
      clearInterval(that.intervalId); // unsubscribe时执行clearInterval
    }
  }
}

const x$ = new FakeIntervalObservableClass(500);
const observer = {next: (value) => console.log(value)};
const x_ = x$.subscribe(observer); 
x_.unsubscribe();

subscribe方法返回一个Object,这个Object就是Subscription(即x_),它唯一的用途就是unsubsribe
unsubscribe会停下Observable的运行,并释放其占用的资源。
上面的例子中,当我们调用x_.unsubscribe()的时候,触发了clearInterval,后面的execution就不再进行下去了。
如果是dom事件触发的Observablesubscribe时会addEventListenerunsubscribe时会removeEventListener
如果是XMLHttpRequest触发的Observable,就是sendabort,等等。

Subject:有nextsubscribe方法,有一个observers列表

Subject是一个Observable,因为它有subscribe方法;Subject又是一个Observer,因为它有next方法。
它维护一个observers列表,当运行subject.subscribe(observerX)的时候,这个observerX就被加到列表里,unsubscribe时从列表中删掉。
Subject像是一个proxy,外部可以调用subject.next(value)时,这个valueforEachSubjectobservers

const FakeSubjectClass = function() {
  this.observers = [];
}

FakeSubjectClass.prototype.subscribe = function(observer) {
  let that = this;
  // 添加新observer,获取其所在的index;添加之前会检查是否observers已经有了这个observer,这里偷懒了没写
  let indexA = (that.observers.push(observer) - 1); 
  return {
    unsubscribe: () => {
      // 从observers列表里将这个位置在indexA的observer删除
      that.observers = that.observers.filter((observer, index) => index !== indexA) 
    }
  }
}

FakeSubjectClass.prototype.next = function(value) {
  let that = this;
  that.observers.forEach(observer => observer.next(value)); // 外部调用subject.next,subject.next马上forEach转出去;这个就是multicast
}

let x$$ = new FakeSubjectClass(); // Subject以$$结尾,Observable以$结尾

let observerB = {next: (value) => {console.log(`另一个logger说:${value}`)}}
x$$.subscribe(observerB); // 这个observerB被加到了x$$的observers列表里。

x$.subscribe(x$$); // 当x$向外推送时,调用的是x$$.next;x$$.next转身马上就forEach转给它的observers。

顺带提一下unicastmulticast的资源消耗。
Observableunicast,而Subjectmulticast,就是因为这个observers列表 -- 在Observable里是没有的这个列表的。
每次运行Observable.subscribe()都相当于一个Function.call(),是一个独立的运行,需要单独消耗资源。
Subject.subscribe()消耗资源很少。
比如:

let y$ = Observable.whatever() // 创建新的Observable
let y1_ = y$.subscribe(observerA); // 运行一次y$里的execution,消耗资源
let y2_ = y$.subscribe(observerB); // 又运行一次...
let y3_ ...                        // 又...
// ===========

let z$ = Observable.whatever();
let z$$ = new Subject();
let z1_ = z$$.subscribe(observerX) // observers列表里加一项,基本不消耗什么资源
let z2_ = z$$.subscribe(observerY) // observers列表里加一项
let z0_ = z$.subscribe(z$$) // 运行一次z$里的execution,仅此一次,然后z$推给z$$,z$$用forEach推给后面

Operator:一半AllSpark(创建Observable),一半滤镜(变更推送时点和推送内容)

Observable类上调用的OperatorStatic Operator, 比如:

Observable.interval(500);  // 每隔500ms,推送一个递增整数,从0开始
Observable.from([1, 2]);   // 连续推送1, 2
Observable.fromEvent(document, 'click');
Observable.fromPromise(fetch('/users'));
Observable.merge(x$, y$);  // 将x$与y$的推送混合在一起
Observable.concat(x$, y$); // 先运行x$,等x$推送observer.complete(),再运行y$
...

Static Operator可以从无到有创建一个Observable(像是变形金刚里的AllSpark),也可以把互不干预的Observable组合起来。
前面的伪代码中用到const x$ = new FakeObservableClass();
实际生活中new Observable()都是由Static Operator来处理的,所以在代码中不会看到new Observable()

===== 分割线 =====

Observable的实例上调用的OperatorInstance Operator, 比如:

const x$ = Observable.interval(500); // 创建一个Observable实例
const y$ = x$
  .map(v => v*3)                     // 每个推送的数值乘以3
  .filter(w => w%5 === 1)            // 只推送除以5余数为1的数值
  .delay(1000)                       // 等待1秒再推送
  .take(10)                          // 推送10个数值以后,调用observer.complete()结束
x$ !== y$                            // x$还是那个x$
...

如果Observable是一幅画,Instance Operator就是滤镜。
经过滤镜处理,我们拿到的一幅新的画,原来的画还在。

Scheduler:控制并发事件

Scheduler的职能是控制并发事件。本人开发经验接近0,实在想不出实际生活中何时会用到Scheduler,也确实在实践中没用过。
如果要观察每种Scheduler对数据推送的影响,可以打开RxJS Manual,开启console,贴入下面的代码,回车。

const x$ = Rx.Observable.create((observer) => {
  observer.next(0);
  observer.next(1);
}) // 这个用的是null Scheduler
const xOnQueue$ = x$.map(v=>'onQueue'+v).observeOn(Rx.Scheduler.queue);
const xOnAsync$ = x$.map(v=>'onAsync'+v).observeOn(Rx.Scheduler.async);
const xOnAsap$ = x$.map(v=>'onAsap'+v).observeOn(Rx.Scheduler.asap);
const xOnAnimationFrame$ = x$.map(v=>'onAnimationFrame'+v).observeOn(Rx.Scheduler.animationFrame);
const merged$ = Rx.Observable.merge(xOnAnimationFrame$, xOnAsync$, xOnAsap$, xOnQueue$, x$);
// 注意Observable.merge时的顺序,再对照console.log中的输出顺序
const merged_ = merged$.subscribe(console.log);

TestScheduler:虚拟时间机器

TestScheduler是测试RxJS代码时用到的一个虚拟时间机器,Observable可以挂靠其上。
我们定义的Observable在真实环境下可能要跑上一段时间才结束。而在TestScheduler里,就是一个同步的执行。
比如Observable.interval(20).take(20),这个observable每隔20 ms推送一个递增数字,一共推20个,需要用时400 ms。
通过Observable.interval(20, testScheduler).take(20),来设定Observable运行在虚拟时间机器上,在测试环境下,瞬间结束。
其中的testSchedulerTestScheduler的一个实例。
关于如何使用TestScheduler,后续文章中会提到。

总结

提到Observable的时候,就想想Function,一个有随意数量returnFunction

Happy coding!

参考

RxJS Source Code RxJS Manual