Observable - ShenYj/ShenYj.github.io GitHub Wiki

Observable

在 RxSwift 中 Observable 代表可观测序列

  • Observable 的创建 和 订阅

    // 1. 创建可观测序列
    let ob = Observable<Any>.create { (observer) -> Disposable in
        // 3. 发送信号
        observer.onNext("a message")
        // 3.1 发送完一条消息后就不需要它了,所以调用了 `onCompleted`,在此之后这个可观测序列将会被销毁
        observer.onCompleted()
        return Disposables.create()
    }
    
    // 2.订阅这个可观测序列
    let _ = ob.subscribe(onNext: { (text) in
        print("订阅到: \(text)")
    }, onError: { (error) in
        print("error: \(error)")
    }, onCompleted: {
        print("完成")
    }) {
        print("销毁")
    }
  • 序列的创建源码探索

  • 序列的订阅源码探索

序列的创建源码探索

  • create 方法源码

    public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) 
    -> Observable<E> {
        return AnonymousObservable(subscribe)
    }
    • 这里引申出 AnonymousObservable 私有的一个内部类,继承自 Producer

      final private class AnonymousObservable<Element>: Producer<Element>
    • Producer, 继承自 Observable

      class Producer<Element> : Observable<Element>
    • Observable 基类,遵循了 ObservableType 协议

      public class Observable<Element> : ObservableType
    • ObservableType 协议

      • 声明了一个 subscribe 函数
      • 和继承自 ObservableConvertibleType 协议而来的 asObservable() 函数的默认实现
      public protocol ObservableType : ObservableConvertibleType {
          func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E
      }
      extension ObservableType {
          /// Default implementation of converting `ObservableType` to `Observable`.
          public func asObservable() -> Observable<E> {
              // temporary workaround
              //return Observable.create(subscribe: self.subscribe)
              return Observable.create { o in
                  return self.subscribe(o)
              }
          }
      }
    • ObservableConvertibleType 协议

      /// Type that can be converted to observable sequence (`Observable<E>`).
      public protocol ObservableConvertibleType {
          /// Type of elements in sequence.
          associatedtype E
      
          /// Converts `self` to `Observable` sequence.
          ///
          /// - returns: Observable sequence that represents `self`.
          func asObservable() -> Observable<E>
      }

    目前为止顺着继承链和协议的遵循关系引申出来的 classprotocol 都扒出来了

    根据当前已知信息,当我们创建一个可观测序列时

    • 内部创建了一个私有的匿名可观测序列 AnonymousObservable 实例,它的继承关系是:

      • AnonymousObservable -> Producer -> Observable
    • Observable

      Observable 完整源码定义
      public class Observable<Element> : ObservableType {
          /// Type of elements in sequence.
          public typealias E = Element
          
          init() {
      #if TRACE_RESOURCES
              _ = Resources.incrementTotal()
      #endif
          }
          
          public func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
              rxAbstractMethod()
          }
          
          public func asObservable() -> Observable<E> {
              return self
          }
          
          deinit {
      #if TRACE_RESOURCES
              _ = Resources.decrementTotal()
      #endif
          }
      
          // this is kind of ugly I know :(
          // Swift compiler reports "Not supported yet" when trying to override protocol extensions, so ¯\_(ツ)_/¯
      
          /// Optimizations for map operator
          internal func composeMap<R>(_ transform: @escaping (Element) throws -> R) -> Observable<R> {
              return _map(source: self, transform: transform)
          }
      }

      Observable 作为基类遵循了 ObservableType 协议,从而拥有了 subscribe 函数

      • 但是这个函数的具体实现应当交由子类处理,所以给了个默认实现,如果你子类没自己实现,就让你崩溃

        /// Swift does not implement abstract methods. This method is used as a runtime check to ensure that methods which intended to be abstract (i.e., they should be implemented in subclasses) are not called directly on the superclass.
        func rxAbstractMethod(file: StaticString = #file, line: UInt = #line) -> Swift.Never {
            rxFatalError("Abstract method", file: file, line: line)
        }
        func rxFatalError(_ lastMessage: @autoclosure () -> String, file: StaticString = #file, line: UInt = #line) -> Swift.Never  {
            fatalError(lastMessage(), file: file, line: line)
        }
    • Producer
      前面提到 Observable 作为基类遵循了 ObservableType 协议,拥有了 subscribe 函数,而该函数的真正实现由 Producer 这个子类来实现,所以 Producer 有两个作用

      1. 具体实现了 subscribe 函数
      2. 提供了 run 虚方法
      Producer 源码
      class Producer<Element> : Observable<Element> {
          override init() {
              super.init()
          }
      
          override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
              if !CurrentThreadScheduler.isScheduleRequired {
                  // The returned disposable needs to release all references once it was disposed.
                  let disposer = SinkDisposer()
                  let sinkAndSubscription = self.run(observer, cancel: disposer)
                  disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
      
                  return disposer
              }
              else {
                  return CurrentThreadScheduler.instance.schedule(()) { _ in
                      let disposer = SinkDisposer()
                      let sinkAndSubscription = self.run(observer, cancel: disposer)
                      disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
      
                      return disposer
                  }
              }
          }
      
          func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
              rxAbstractMethod()
          }
      }

      上学的时候学 C#,在 iOS 中不管是 OC 还是 Swift,感觉都有点轻概念的意思,在 C# 中有抽象方法和虚方法的概念
      可以简单的了解一下区别: 虚方法有方法体,抽象方法没有方法体
      RxSwift 的很多命名十分接近,RxSwift 5 之前版本中的一些简写对新人更加不友善,如果 Swift 能像 C# 那样有更多的概念,有专门的关键字比如 abstractvirtual 等,在接口设计时会起到醒目的作用,提高阅读效率,起码看到关键字就能醒目的注意到

      • 这里 RxSwift 在设计接口时为其命名为 rxAbstractMethod(), 但在我理解,虚方法来称呼它更贴切
    • AnonymousObservable

      • 源码

        final private class AnonymousObservable<Element>: Producer<Element> {
            typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
        
            let _subscribeHandler: SubscribeHandler
        
            init(_ subscribeHandler: @escaping SubscribeHandler) {
                self._subscribeHandler = subscribeHandler
            }
        
            override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
                let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
                let subscription = sink.run(self)
                return (sink: sink, subscription: subscription)
            }
        }
      • 回到可观测序列的创建,在我们创建一个可观测序列时,定义了一个闭包

        Observable<Any>.create { (observer) -> Disposable in
            // 3. 发送信号
            observer.onNext("a message")
            // 3.1 发送完一条消息后就不需要它了,所以调用了 `onCompleted`,再次之后这个可观测序列将会被销毁
            observer.onCompleted()
            return Disposables.create()
        }
      • 通过 create 函数源码得知,我们 create 的实际上是 AnonymousObservable

        public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
            return AnonymousObservable(subscribe)
        }
      • 由此得知 AnonymousObservable 有两个作用

        • create 函数中定义的闭包,在实例化 AnonymousObservable 的时候,被 _subscribeHandler 保存了起来
        • 具体实现了 run 函数 (从父类 Producer 继承而来)

到底为止,可观测序列从 create 函数开始,引申出的继承链、关键职责基本了解了

序列的订阅源码探索

可观测序列的订阅

  • 点击 subscribe 这个订阅函数, 会跳转到 ObservableType+Extensions.swift 文件
    在探索系列创建时曾阅读过 ObservableType 协议的源码,里面声明了一个 subscribe 函数,但在最终订阅的时候,实际执行的是 ObservableType 的另一个 extension 函数, 这里支持四个阶段的回调闭包,分别是 onNextonErroronCompletedonDisposed

    ObservableType+Extensions.swift 中 subscribe 函数具体实现源码
    extension ObservableType {
        public func subscribe(onNext: ((E) -> Void)? = nil, 
                            onError: ((Swift.Error) -> Void)? = nil, 
                            onCompleted: (() -> Void)? = nil, 
                            onDisposed: (() -> Void)? = nil)
            -> Disposable {
                let disposable: Disposable
                
                if let disposed = onDisposed {
                    disposable = Disposables.create(with: disposed)
                }
                else {
                    disposable = Disposables.create()
                }
                
                #if DEBUG
                    let synchronizationTracker = SynchronizationTracker()
                #endif
                
                let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []
                
                let observer = AnonymousObserver<E> { event in
                    
                    #if DEBUG
                        synchronizationTracker.register(synchronizationErrorMessage: .default)
                        defer { synchronizationTracker.unregister() }
                    #endif
                    
                    switch event {
                    case .next(let value):
                        onNext?(value)
                    case .error(let error):
                        if let onError = onError {
                            onError(error)
                        }
                        else {
                            Hooks.defaultErrorHandler(callStack, error)
                        }
                        disposable.dispose()
                    case .completed:
                        onCompleted?()
                        disposable.dispose()
                    }
                }
                return Disposables.create(
                    self.asObservable().subscribe(observer),
                    disposable
                )
        }
    }
    • 其中关键的部分 AnonymousObserver, 一个匿名观察者,继承自 ObserverBase

      final class AnonymousObserver<ElementType> : ObserverBase<ElementType> {
          typealias Element = ElementType
          
          typealias EventHandler = (Event<Element>) -> Void
          
          private let _eventHandler : EventHandler
          
          init(_ eventHandler: @escaping EventHandler) {
      #if TRACE_RESOURCES
              _ = Resources.incrementTotal()
      #endif
              self._eventHandler = eventHandler
          }
      
          override func onCore(_ event: Event<Element>) {
              return self._eventHandler(event)
          }
          
      #if TRACE_RESOURCES
          deinit {
              _ = Resources.decrementTotal()
          }
      #endif
      }
    • ObserverBase 基类,遵循 DisposableObserverType 这两个协议

      class ObserverBase<ElementType> : Disposable, ObserverType {
          typealias E = ElementType
      
          private let _isStopped = AtomicInt(0)
      
          func on(_ event: Event<E>) {
              switch event {
              case .next:
                  if load(self._isStopped) == 0 {
                      self.onCore(event)
                  }
              case .error, .completed:
                  if fetchOr(self._isStopped, 1) == 0 {
                      self.onCore(event)
                  }
              }
          }
      
          func onCore(_ event: Event<E>) {
              rxAbstractMethod()
          }
      
          func dispose() {
              fetchOr(self._isStopped, 1)
          }
      }
      • Disposable

        /// Represents a disposable resource.
        public protocol Disposable {
            /// Dispose resource.
            func dispose()
        }
      • ObserverType

        ObserverType 源码
        /// Supports push-style iteration over an observable sequence.
        public protocol ObserverType {
            /// The type of elements in sequence that observer can observe.
            associatedtype E
        
            /// Notify observer about sequence event.
            ///
            /// - parameter event: Event that occurred.
            func on(_ event: Event<E>)
        }
        
        /// Convenience API extensions to provide alternate next, error, completed events
        extension ObserverType {
            
            /// Convenience method equivalent to `on(.next(element: E))`
            ///
            /// - parameter element: Next element to send to observer(s)
            public func onNext(_ element: E) {
                self.on(.next(element))
            }
            
            /// Convenience method equivalent to `on(.completed)`
            public func onCompleted() {
                self.on(.completed)
            }
            
            /// Convenience method equivalent to `on(.error(Swift.Error))`
            /// - parameter error: Swift.Error to send to observer(s)
            public func onError(_ error: Swift.Error) {
                self.on(.error(error))
            }
        }

      通过源码得知,ObserverBase 基本上就一个 onCore 虚方法和一个 _isStopped 私有属性,其余是遵循协议而来

      对比序列的创建,思路一致,继承链还少了一层,在 AnonymousObserver 创建的时候还是定义了一个大的闭包 (Event<Element>) -> Void,被 _eventHandler 这个属性保存了起来

    • 再次回到 subscribe 函数, AnonymousObserver 创建完后最后到了返回值部分

      return Disposables.create(
                      self.asObservable().subscribe(observer),
                      disposable
                  )

      关键在于 self.asObservable().subscribe(observer) 这一行代码

      这个 subscribe 函数调用方正是前面创建的可观测序列,也就是 ob 调用了 subscribe 函数,并把刚刚创建的匿名观察者传递到 subscribe 函数内,前面分析过创建序列过程中继承链各自职责, subscribe 函数正是由 Producer 来负责实现的,所以在订阅函数返回前,流程执行到了 Producersubscribe 实现部分

      Producder 最终会调用 run 函数

      self.run(observer, cancel: disposer)

      run 由其子类负责具体实现, 当前环境下子类就是 AnonymousObservable

      RxSwift 之所有那么多的序列类型,正是结合不同场景下的需求,通过这种继承关系重写父类方法,产生不同效果来实现的

      override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
          let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
          let subscription = sink.run(self)
          return (sink: sink, subscription: subscription)
      }
      • 这里引申出 AnonymousObservableSink,继承自 Sink

        AnonymousObservableSink 源码
        final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
            typealias E = O.E
            typealias Parent = AnonymousObservable<E>
        
            // state
            private let _isStopped = AtomicInt(0)
        
            #if DEBUG
                fileprivate let _synchronizationTracker = SynchronizationTracker()
            #endif
        
            override init(observer: O, cancel: Cancelable) {
                super.init(observer: observer, cancel: cancel)
            }
        
            func on(_ event: Event<E>) {
                #if DEBUG
                    self._synchronizationTracker.register(synchronizationErrorMessage: .default)
                    defer { self._synchronizationTracker.unregister() }
                #endif
                switch event {
                case .next:
                    if load(self._isStopped) == 1 {
                        return
                    }
                    self.forwardOn(event)
                case .error, .completed:
                    if fetchOr(self._isStopped, 1) == 0 {
                        self.forwardOn(event)
                        self.dispose()
                    }
                }
            }
        
            func run(_ parent: Parent) -> Disposable {
                return parent._subscribeHandler(AnyObserver(self))
            }
        }
      • 实例化 AnonymousObservableSink 的同时,内部保存了 subscribe 函数中创建的 AnonymousObserver 对象

      • 紧接着会调用 AnonymousObservableSinkrun 函数,并将自己 AnonymousObservable 传递过去

        func run(_ parent: Parent) -> Disposable {
            return parent._subscribeHandler(AnyObserver(self))
        }
        • parent -> AnonymousObservable

        • _subscribeHandler -> 就是创建可观测序列时定义的闭包

        • self -> AnonymousObservableSink

        • AnyObserver(self) 这里还引申出一个 AnyObserver

          public struct AnyObserver<Element> : ObserverType
          • AnyObserver 有两个构造函数,结合当前环境,这里使用的是第二个构造函数, 把 AnonymousObservableSinkon 函数保存了起来, 即 AnyObserverobserver属性 == AnonymousObservableSinkon 函数

            /// Construct an instance whose `on(event)` calls `eventHandler(event)`
            ///
            /// - parameter eventHandler: Event handler that observes sequences events.
            public init(eventHandler: @escaping EventHandler) {
                self.observer = eventHandler
            }
            
            /// Construct an instance whose `on(event)` calls `observer.on(event)`
            ///
            /// - parameter observer: Observer that receives sequence events.
            public init<O : ObserverType>(_ observer: O) where O.E == Element {
                self.observer = observer.on
            }

            AnonymousObservableSink 是一个遵循 ObserverType 协议的类,而第一个构造函数的参数是一个逃逸闭包

          AnonymousObservableSink 设计上是一个 final private class,并不打算对外开放,对其进行了一层包装

      • 执行 parent._subscribeHandler(AnyObserver(self)) 这行代码等同于调用了 创建可观测序列时定义的闭包
        就回到了可观测序列的创建的闭包中, AnyObserver(self) 对应的就是闭包中的 observer

        Observable<Any>.create { (observer) -> Disposable in
            // 3. 发送信号
            observer.onNext("a message")
            // 3.1 发送完一条消息后就不需要它了,所以调用了 `onCompleted`,再次之后这个可观测序列将会被销毁
            observer.onCompleted()
            return Disposables.create()
        }
      • 闭包内,我们调用了 onNextonCompleted 函数
        在当前环境中 observer 代表着 AnyObserver(self: AnonymousObservableSink)AnyObserver 也遵循 ObserverType
        同样拥有 onNextonCompletedonError 函数, ObserverType 这三个扩展方法核心是调用 on 函数

        • e.g. onNext

          public func onNext(_ element: E) {
              self.on(.next(element))
          }
      • AnyObserver 源码

        public struct AnyObserver<Element> : ObserverType {
            /// The type of elements in sequence that observer can observe.
            public typealias E = Element
            
            /// Anonymous event handler type.
            public typealias EventHandler = (Event<Element>) -> Void
        
            private let observer: EventHandler
        
            /// Construct an instance whose `on(event)` calls `eventHandler(event)`
            ///
            /// - parameter eventHandler: Event handler that observes sequences events.
            public init(eventHandler: @escaping EventHandler) {
                self.observer = eventHandler
            }
            
            /// Construct an instance whose `on(event)` calls `observer.on(event)`
            ///
            /// - parameter observer: Observer that receives sequence events.
            public init<O : ObserverType>(_ observer: O) where O.E == Element {
                self.observer = observer.on
            }
            
            /// Send `event` to this observer.
            ///
            /// - parameter event: Event instance.
            public func on(_ event: Event<Element>) {
                return self.observer(event)
            }
        
            /// Erases type of observer and returns canonical observer.
            ///
            /// - returns: type erased observer.
            public func asObserver() -> AnyObserver<E> {
                return self
            }
        }
        • 重点 on 函数

          /// Send `event` to this observer.
          ///
          /// - parameter event: Event instance.
          public func on(_ event: Event<Element>) {
              return self.observer(event)
          }
          • self -> AnyObserver

          • observer -> AnonymousObservableSink.on

            AnyObserverobserver属性 == AnonymousObservableSinkon 函数

          • 这样流程移交给 AnonymousObservableSinkon 函数

            func on(_ event: Event<E>) {
                #if DEBUG
                    self._synchronizationTracker.register(synchronizationErrorMessage: .default)
                    defer { self._synchronizationTracker.unregister() }
                #endif
                switch event {
                case .next:
                    if load(self._isStopped) == 1 {
                        return
                    }
                    self.forwardOn(event)
                case .error, .completed:
                    if fetchOr(self._isStopped, 1) == 0 {
                        self.forwardOn(event)
                        self.dispose()
                    }
                }
            }
          • 现在以 onNext 为例,会进入 .next 分支,并最终调用 self.forwardOn(event)

          • forwardOn 函数由基类 Sink 实现,并且不允许子类继承重写

            final func forwardOn(_ event: Event<O.E>) {
                #if DEBUG
                    self._synchronizationTracker.register(synchronizationErrorMessage: .default)
                    defer { self._synchronizationTracker.unregister() }
                #endif
                if isFlagSet(self._disposed, 1) {
                    return
                }
                self._observer.on(event)
            }
          • 最终调用 self._observer.on(event)

            • self -> AnonymousObservableSink
            • _observer -> 订阅可观测序列时内部创建的匿名可观察者 AnonymousObserver
              • AnonymousObserver 从创建 -> Producersubscribe函数 -> AnonymousObservablerun 函数 -> AnonymousObservableSink 层层传递最终保存起来
            • 最终在 Sink 中的forwardOn 函数内被使用
          • AnonymousObserveron 函数实现继承自 ObserverBase 父类,自己并没有专门的实现

            func on(_ event: Event<E>) {
                switch event {
                case .next:
                    if load(self._isStopped) == 0 {
                        self.onCore(event)
                    }
                case .error, .completed:
                    if fetchOr(self._isStopped, 1) == 0 {
                        self.onCore(event)
                    }
                }
            }
          • 紧接着会执行到 onCore 函数,由 AnonymousObserver 自己实现处理

            override func onCore(_ event: Event<Element>) {
                return self._eventHandler(event)
            }

            _eventHandler 正是创建这个匿名观察者时定义的闭包函数

          • 由此回到了订阅方法内,创建匿名观察者 AnonymousObserver 的地方

            let observer = AnonymousObserver<E> { event in
                            
                #if DEBUG
                    synchronizationTracker.register(synchronizationErrorMessage: .default)
                    defer { synchronizationTracker.unregister() }
                #endif
                
                switch event {
                case .next(let value):
                    onNext?(value)
                case .error(let error):
                    if let onError = onError {
                        onError(error)
                    }
                    else {
                        Hooks.defaultErrorHandler(callStack, error)
                    }
                    disposable.dispose()
                case .completed:
                    onCompleted?()
                    disposable.dispose()
                }
            }
          • 而比包内调用的 onNextonErroronCompleted 对应的是最开始我们订阅可观测序列时闭包的各个实现

            let _ = ob.subscribe(onNext: { (text) in
                print("订阅到:\(text)")
            }, onError: { (error) in
                print("error: \(error)")
            }, onCompleted: {
                print("完成")
            }) {
                print("销毁")
            }

文中摘取的还是 Rx5 的代码,在 f92f8b7 这次提交中将很多下划线的前缀去掉了,所以如果对照当下的 Rx 6 的代码又有少许变化,但大体流程不受影响

⚠️ **GitHub.com Fallback** ⚠️