RxJS Operators - panchaow/blog GitHub Wiki
Operators 是 RxJS 最重要最关键的概念。observalbe.pipe
接收若干个OperatorFunction
,创建一个新的 observable。pipe
就类似compose
,只不过compose
最后得到的是一个函数,而pipe
最后调用了得到的函数,并将自身作为参数传入,也就得到了一个新的 observable。
operators 可以通过两种方式创建:1.自己手动创建的类型为Observable a -> Observable b
的函数;2.通过内置函数,比如map
创建。在处理observable.subscribe
的时候,对于new Observable
和pipe
通过内置函数创建的 operator 所得到的 observable 的逻辑会有所不同。关键代码:
class {
subscribe(
observerOrNext?: Partial<Observer<T>> | ((value: T) => void) | null,
error?: ((error: any) => void) | null,
complete?: (() => void) | null
): Subscription {
const subscriber = isSubscriber(observerOrNext) ? observerOrNext : new SafeSubscriber(observerOrNext, error, complete);
const { operator, source } = this; // this 是observable
subscriber.add(
// subscriber既是Observer又是Subscription
operator
? // We're dealing with a subscription in the
// operator chain to one of our lifted operators.
operator.call(subscriber, source)
: source
? // If `source` has a value, but `operator` does not, something that
// had intimate knowledge of our API, like our `Subject`, must have
// set it. We're going to just call `_subscribe` directly.
this._subscribe(subscriber)
: // In all other cases, we're likely wrapping a user-provided initializer
// function, so we need to catch errors and handle them appropriately.
this._trySubscribe(subscriber)
)
return subscriber;
)}
}
通过new Observable
创建的 observable 是没有 opeartor 和 source 的,处理起来也相对比较简单,我们重点来看通过pipe
使用内置函数创建的 operator 创建的 observable 被 subscribe 时的处理。我们首先以内置函数map
为例,来看 operator 是如何创建 observable 的。map
的代码如下:
export function map<T, R>(
project: (value: T, index: number) => R,
thisArg?: any
): OperatorFunction<T, R> {
return operate((source, subscriber) => {
// The index of the value from the source. Used with projection.
let index = 0;
// Subscribe to the source, all errors and completions are sent along
// to the consumer.
source.subscribe(
new OperatorSubscriber(subscriber, (value: T) => {
// Call the projection function with the appropriate this context,
// and send the resulting value to the consumer.
subscriber.next(project.call(thisArg, value, index++));
})
);
});
}
关键在于这个operate
函数,它的作用是创建 operater。我们来看operate
的代码:
export function operate<T, R>(
init: (
liftedSource: Observable<T>,
subscriber: Subscriber<R>
) => (() => void) | void
): OperatorFunction<T, R> {
return (source: Observable<T>) => {
if (hasLift(source)) {
return source.lift(function (
this: Subscriber<R>,
liftedSource: Observable<T>
) {
try {
return init(liftedSource, this);
} catch (err) {
this.error(err);
}
});
}
throw new TypeError("Unable to lift unknown Observable type");
};
}
我们可以看到,创建新Observable
的作用被委托给了source.lift
完成。假设这里的source
是Observable
,我们来看observable.lift
的代码:
class {
lift<R>(operator?: Operator<T, R>): Observable<R> {
const observable = new Observable<R>();
observable.source = this;
observable.operator = operator;
return observable;
}
}
原来observable.lift
做的仅仅是创建一个新的 Observable,并且将其source
设置为调用的 observable,将其operator
设为传入的参数。这里传入的参数,并不是我们通过operate
创建的新 operator。这个名字很容易造成误解。从subscribe
中operator.call(subscriber, source)
可以看到这里设置的source
将被传入设置的operator
。这个observable.operator
又调用了传入operate
的init
函数,并且将subscriber
传入。最后终于,执行到了map
相关代码:
// The index of the value from the source. Used with projection.
let index = 0;
// Subscribe to the source, all errors and completions are sent along
// to the consumer.
source.subscribe(
new OperatorSubscriber(subscriber, (value: T) => {
// Call the projection function with the appropriate this context,
// and send the resulting value to the consumer.
subscriber.next(project.call(thisArg, value, index++));
})
);
代码也比较简单,只是将source
产生的值通过subscriber.next
输出。注意,这里的 subscriber 是由于 subscribe 通过 pipe 得到的 observable 时创建的 subscriber。