RxJS中的Subject和redux - panchaow/blog GitHub Wiki

RxJS中的Subject是一种特殊类型的Observable。但是相比起Observable,Subject却更加平易近人。因为Subject工作起来就和普通的Pub-Sub模式一样,失去了Observable独有的神秘感。我又想到redux不也就是Pub-Sub模式吗,唯独增加了Action、Reducer等概念。那Subject + Action + Reducer,会是什么样呢?

在继续之前,我们先来更加深入的了解以下reducer这个概念。其实,这个概念可以说是从函数式编程借用来的,甚至这个名字也是来自于函数式编程的reduce。因此将这个概念加入Subject可以说是非常自然而然的,甚至RxJS本身就提供了reduce这个operator。于是,很自然地,我们可以写出:

const createStore = <S, A extends Action>(
  reducer: Reducer<S, A>,
  preloadState?: S
) => {
  const action$ = new Subject<A>();

  const state$ = action$.pipe(scan(reducer, preloadState)) as Observable<S>; // 我们用的scan而不是reduce,他们虽然功能类似,但是reduce只会产生一个最终的值

  const dispatch = (action: A) => {
    action$.next(action)
  }

  dispatch({
    type: ActionTypes.INIT
  } as A)

  return {
    subscribe: state$.subscribe.bind(state$),
    dispatch,
  };
};

因为redux禁止在reducer中调用subscribeunsubscribe(理论上,reducer应该是纯函数),我们对代码进行一些调整:

const createStore = <S, A extends Action>(
  reducer: Reducer<S, A>,
  preloadState?: S
) => {
  let isDispatching: boolean = false;

  const action$ = new Subject<A>();

  const state$ = action$.pipe(scan(reducer, preloadState)) as Observable<S>;

  const subscribe = (next: (state: S) => void): (() => void) => {
    if (isDispatching) {
      throw new Error(
        "You may not call store.subscribe() while the reducer is executing. "
      );
    }

    const subscription = state$.subscribe(next);

    return function unsubscribe() {
      if (subscription.closed) {
        return;
      }
      if (isDispatching) {
        throw new Error(
          "You may not unsubscribe from a store listener while the reducer is executing. "
        );
      }
      subscription.unsubscribe();
    };
  };

  const dispatch = (action: A) => {
    try {
      isDispatching = true;
      action$.next(action);
    } finally {
      isDispatching = false;
    }
  };

  dispatch({
    type: ActionTypes.INIT,
  } as A);

  return {
    subscribe,
    dispatch,
  };
};

另外,在next函数中如果调用了subscribe或者unsubscribe,哪些subscribed next函数会被调用也是一个需要关注的地方。redux规定只有dispatch调用之前,已经subscribed的next函数会被调用,也就是说next函数中的subscribe/unsubscribe调用不会影响当前正在处理的action。不过幸好,RxJS的Subject也做了同样的处理,因此关于这个我们不需要做任何处理。

可以发现,这里没有实现getState方法。因为我们修改了subscribe,它接收的参数不再是一个listener,而是observer —— 相比listener,observer多接收了一个state。当然我们可以选择提供和redux提供完全一致的API,这样做还有一个好处 —— 可以兼容redux的所有middlewares和store enhancers。

redux提供两种扩展能力——Store Enhancers和middlewares。Store Enhancers可以增强createStore,让创建出来的store拥有更加强大的能力。 而applyMiddlewares其实就是创建了一个将传入的middlewares应用到dispatch方法的store enhancer,也就是说middleware是用来增强dispatch的。 比如redux-thunk就是一个给dispatch赋予接收函数作为参数的能力的middleware。

相关代码如下:

let currentState: S;

state$.subscribe(state => {
  currentState = state;
})

function getState(): S {
  if (isDispatching) {
    throw new Error(
      'You may not call store.getState() while the reducer is executing. ' +
      'The reducer has already received the state as an argument. ' +
      'Pass it down from the top reducer instead of reading it from the store.'
    )
  }

  return currentState as S;
};

到这里,redux最主要的功能其实已经实现了。

⚠️ **GitHub.com Fallback** ⚠️