RxJS Schedulers - panchaow/blog GitHub Wiki

RxJS中有一个Scheduler的概念,定义如下:

An execution context and a data structure to order tasks and schedule their execution.

也就是说Scheduler的作用是对任务进行排序,并且安排他们的执行时间。其中,任务是用Action来描述的。创建Scheduler时需要传入一个schedulerActionCtor,它将被用来创建Action。Scheduler提供的最重要的接口就是schedule。通过调用这个方法,我们将需要执行的工作告诉Scheduler,Scheduler就会为此创建一个Action。至于这个Action将在什么时候被执行,其实是Action自己控制的(IoC):

class Scheduler {
  public schedule<T>(work: (this: SchedulerAction<T>, state?: T) => void, delay: number = 0, state?: T): Subscription {
    return new this.schedulerActionCtor<T>(this, work).schedule(state, delay);
  }
}

我们来看一个具体的Scheduler —— async scheduler。

async scheduler schedules tasks asynchronously, by putting them on the JavaScript event loop queue. It is best used to delay tasks in time or to schedule tasks repeating in intervals.

使用案例:

import { queueScheduler } from 'rxjs';
 
queueScheduler.schedule(function(state) {
  if (state !== 0) {
    console.log('before', state);
    this.schedule(state - 1); // `this` references currently executing Action,
                              // which we reschedule with new state
    console.log('after', state);
  }
}, 0, 3);

我们来分析以下上面的代码是如何执行的。在调用asyncScheduler.schedule之后,scheduler创建了一个async action,然后调用了该action的schedule方法。action的schedule的代码如下:

class {
  public schedule(state?: T, delay: number = 0): Subscription {
    if (this.closed) {
      return this;
    }

    // Always replace the current state with the new state.
    this.state = state;

    const id = this.id;
    const scheduler = this.scheduler;

    //
    // Important implementation note:
    //
    // Actions only execute once by default, unless rescheduled from within the
    // scheduled callback. This allows us to implement single and repeat
    // actions via the same code path, without adding API surface area, as well
    // as mimic traditional recursion but across asynchronous boundaries.
    //
    // However, JS runtimes and timers distinguish between intervals achieved by
    // serial `setTimeout` calls vs. a single `setInterval` call. An interval of
    // serial `setTimeout` calls can be individually delayed, which delays
    // scheduling the next `setTimeout`, and so on. `setInterval` attempts to
    // guarantee the interval callback will be invoked more precisely to the
    // interval period, regardless of load.
    //
    // Therefore, we use `setInterval` to schedule single and repeat actions.
    // If the action reschedules itself with the same delay, the interval is not
    // canceled. If the action doesn't reschedule, or reschedules with a
    // different delay, the interval will be canceled after scheduled callback
    // execution.
    //
    if (id != null) {
      this.id = this.recycleAsyncId(scheduler, id, delay);
    }

    // Set the pending flag indicating that this action has been scheduled, or
    // has recursively rescheduled itself.
    this.pending = true;

    this.delay = delay;
    // If this action has already an async Id, don't request a new one.
    this.id = this.id || this.requestAsyncId(scheduler, this.id, delay);

    return this;
  }
}

第一次执行的时候,action.pending被设置为true,表示这个action已经scheduled,处于等待被执行的状态。requestAsyncId,顾名思义,是用来申请async id。代码如下:

class {
  protected requestAsyncId(scheduler: AsyncScheduler, _id?: any, delay: number = 0): any {
    return intervalProvider.setInterval(scheduler.flush.bind(scheduler, this), delay);
  }
}

可以看到,这段代码做的就是设置delayms后执行scheduler.flushscheduler.flush的作用有两个,如果当前已经有任务在执行,则让action排队等待;否则,以FIFO的顺序执行所有的action。代码如下:

class {
  public actions: Array<AsyncAction<any>> = [];
  /**
   * A flag to indicate whether the Scheduler is currently executing a batch of
   * queued actions.
   * @type {boolean}
   * @internal
   */
  public _active: boolean = false;
  /**
   * An internal ID used to track the latest asynchronous task such as those
   * coming from `setTimeout`, `setInterval`, `requestAnimationFrame`, and
   * others.
   * @type {any}
   * @internal
   */
  public _scheduled: any = undefined;

  public flush(action: AsyncAction<any>): void {
    const { actions } = this;

    if (this._active) {
      actions.push(action);
      return;
    }

    let error: any;
    this._active = true;

    do {
      if ((error = action.execute(action.state, action.delay))) {
        break;
      }
    } while ((action = actions.shift()!)); // exhaust the scheduler queue

    this._active = false;

    if (error) {
      while ((action = actions.shift()!)) {
        action.unsubscribe();
      }
      throw error;
    }
  }
}

接下来看action.executeaction.execute的作用是执行创建action时传入的work。代码如下:

class {
  public execute(state: T, delay: number): any {
    if (this.closed) {
      return new Error('executing a cancelled action');
    }

    this.pending = false;
    const error = this._execute(state, delay); // could reschedule this action
    if (error) {
      return error;
    } else if (this.pending === false && this.id != null) {
      // Dequeue if the action didn't reschedule itself. Don't call
      // unsubscribe(), because the action could reschedule later.
      // For example:
      // ```
      // scheduler.schedule(function doWork(counter) {
      //   /* ... I'm a busy worker bee ... */
      //   var originalAction = this;
      //   /* wait 100ms before rescheduling the action */
      //   setTimeout(function () {
      //     originalAction.schedule(counter + 1);
      //   }, 100);
      // }, 1000);
      // ```
      this.id = this.recycleAsyncId(this.scheduler, this.id, null);
    }
  }
}

注意action.pending被设置为了false,但是在执行完_execute之后又去判断了pending是否为false。这是因为_execute在执行传入的work时,action.schedule可能会被再次调用,使action被reschedule,又进入了等待执行的状态。如果action没有被reschedule,则收回async Id(也就是clearInterval)。注意,在action被reschedule的时候,recycleAsyncId也被调用了。这也就体现recycleAsyncId的第二个作用,尽可能的复用之前的async Id。

⚠️ **GitHub.com Fallback** ⚠️