400、RXSwift - BMWB/Fastlane_Swift GitHub Wiki

概念

ReactiveX(简写: Rx) 是一个可以帮助我们简化异步编程的框架。

RxSwift 是 Rx 的 Swift 版本。

它尝试将原有的一些概念移植到 iOS/macOS 平台。

你可以在这里找到跨平台文档 ReactiveX.io。

KVO,异步操作 和 流 全部被统一成抽象序列。这就是为什么 Rx 会如此简单,优雅和强大。

CocoaPods

# Podfile
use_frameworks!

target 'YOUR_TARGET_NAME' do
    pod 'RxSwift', '6.5.0'
    pod 'RxCocoa', '6.5.0'
end

# RxTests 和 RxBlocking 将在单元/集成测试中起到重要作用
target 'YOUR_TESTING_TARGET' do
    pod 'RxBlocking', '6.5.0'
    pod 'RxTest', '6.5.0'
end

为什么要使用 RxSwift ?

我们先看一下 RxSwift 能够帮助我们做些什么:

Target Action

传统实现方法:

button.addTarget(self, action: #selector(buttonTapped), for: .touchUpInside)

通过 Rx 来实现:

button.rx.tap
    .subscribe(onNext: {
        print("button Tapped")
    })
    .disposed(by: disposeBag)

你不需要使用 Target Action,这样使得代码逻辑清晰可见。

代理

传统实现方法:

class ViewController: UIViewController {
    ...
    override func viewDidLoad() {
        super.viewDidLoad()
        scrollView.delegate = self
    }
}

extension ViewController: UIScrollViewDelegate {
    func scrollViewDidScroll(_ scrollView: UIScrollView) {
        print("contentOffset: \(scrollView.contentOffset)")
    }
}

通过 Rx 来实现:

class ViewController: UIViewController {
    ...
    override func viewDidLoad() {
        super.viewDidLoad()

        scrollView.rx.contentOffset
            .subscribe(onNext: { contentOffset in
                print("contentOffset: \(contentOffset)")
            })
            .disposed(by: disposeBag)
    }
}

你不需要书写代理的配置代码,就能获得想要的结果。

闭包回调

传统实现方法:

URLSession.shared.dataTask(with: URLRequest(url: url)) {
    (data, response, error) in
    guard error == nil else {
        print("Data Task Error: \(error!)")
        return
    }

    guard let data = data else {
        print("Data Task Error: unknown")
        return
    }

    print("Data Task Success with count: \(data.count)")
}.resume()

通过 Rx 来实现:

URLSession.shared.rx.data(request: URLRequest(url: url))
    .subscribe(onNext: { data in
        print("Data Task Success with count: \(data.count)")
    }, onError: { error in
        print("Data Task Error: \(error)")
    })
    .disposed(by: disposeBag)

回调也变得十分简单

通知

传统实现方法:

var ntfObserver: NSObjectProtocol!

override func viewDidLoad() {
    super.viewDidLoad()

    ntfObserver = NotificationCenter.default.addObserver(
          forName: .UIApplicationWillEnterForeground,
          object: nil, queue: nil) { (notification) in
        print("Application Will Enter Foreground")
    }
}

deinit {
    NotificationCenter.default.removeObserver(ntfObserver)
}

通过 Rx 来实现:

override func viewDidLoad() {
    super.viewDidLoad()

    NotificationCenter.default.rx
        .notification(.UIApplicationWillEnterForeground)
        .subscribe(onNext: { (notification) in
            print("Application Will Enter Foreground")
        })
        .disposed(by: disposeBag)
}

你不需要去管理观察者的生命周期,这样你就有更多精力去关注业务逻辑。

多个任务之间有依赖关系

例如,先通过用户名密码取得 Token 然后通过 Token 取得用户信息,

传统实现方法:

/// 用回调的方式封装接口
enum API {

    /// 通过用户名密码取得一个 token
    static func token(username: String, password: String,
        success: (String) -> Void,
        failure: (Error) -> Void) { ... }

    /// 通过 token 取得用户信息
    static func userinfo(token: String,
        success: (UserInfo) -> Void,
        failure: (Error) -> Void) { ... }
}
/// 通过用户名和密码获取用户信息
API.token(username: "beeth0ven", password: "987654321",
    success: { token in
        API.userInfo(token: token,
            success: { userInfo in
                print("获取用户信息成功: \(userInfo)")
            },
            failure: { error in
                print("获取用户信息失败: \(error)")
        })
    },
    failure: { error in
        print("获取用户信息失败: \(error)")
})

通过 Rx 来实现:

/// 用 Rx 封装接口
enum API {

    /// 通过用户名密码取得一个 token
    static func token(username: String, password: String) -> Observable<String> { ... }

    /// 通过 token 取得用户信息
    static func userInfo(token: String) -> Observable<UserInfo> { ... }
}
/// 通过用户名和密码获取用户信息
API.token(username: "beeth0ven", password: "987654321")
    .flatMapLatest(API.userInfo)
    .subscribe(onNext: { userInfo in
        print("获取用户信息成功: \(userInfo)")
    }, onError: { error in
        print("获取用户信息失败: \(error)")
    })
    .disposed(by: disposeBag)

这样你可以避免回调地狱,从而使得代码易读,易维护。

等待多个并发任务完成后处理结果

例如,需要将两个网络请求合并成一个,

通过 Rx 来实现:

/// 用 Rx 封装接口
enum API {

    /// 取得老师的详细信息
    static func teacher(teacherId: Int) -> Observable<Teacher> { ... }

    /// 取得老师的评论
    static func teacherComments(teacherId: Int) -> Observable<[Comment]> { ... }
}
/// 同时取得老师信息和老师评论
Observable.zip(
      API.teacher(teacherId: teacherId),
      API.teacherComments(teacherId: teacherId)
    ).subscribe(onNext: { (teacher, comments) in
        print("获取老师信息成功: \(teacher)")
        print("获取老师评论成功: \(comments.count) 条")
    }, onError: { error in
        print("获取老师信息或评论失败: \(error)")
    })
    .disposed(by: disposeBag)

这样你可用寥寥几行代码来完成相当复杂的异步操作。

那么为什么要使用 RxSwift ?

  • 复合 - Rx 就是复合的代名词
  • 复用 - 因为它易复合
  • 清晰 - 因为声明都是不可变更的
  • 易用 - 因为它抽象了异步编程,使我们统一了代码风格
  • 稳定 - 因为 Rx 是完全通过单元测试的

RxSwift 核心

这一章主要介绍 RxSwift 的核心内容:

RxSwiftCore

  • Observable - 产生事件
  • Observer - 响应事件
  • Operator - 创建变化组合事件
  • Disposable - 管理绑定(订阅)的生命周期
  • Schedulers - 线程队列调配
// Observable<String>
let text = usernameOutlet.rx.text.orEmpty.asObservable()

// Observable<Bool>
let passwordValid = text
    // Operator
    .map { $0.count >= minimalUsernameLength }

// Observer<Bool>
let observer = passwordValidOutlet.rx.isHidden

// Disposable
let disposable = passwordValid
    // Scheduler 用于控制任务在那个线程队列运行
    .subscribeOn(MainScheduler.instance)
    .observeOn(MainScheduler.instance)
    .bind(to: observer)


...

// 取消绑定,你可以在退出页面时取消绑定
disposable.dispose()

Observable - 可监听序列

Obervable

所有的事物都是序列

之前我们提到,Observable 可以用于描述元素异步产生的序列。这样我们生活中许多事物都可以通过它来表示,例如:

  • Observable 温度

    你可以将温度看作是一个序列,然后监测这个温度值,最后对这个值做出响应。例如:当室温高于 33 度时,打开空调降温。

    Temperature

  • Observable 《海贼王》动漫

    你也可以把《海贼王》的动漫看作是一个序列。然后当《海贼王》更新一集时,我们就立即观看这一集。

    OnePiece

  • Observable JSON

    你可以把网络请求的返回的 JSON 看作是一个序列。然后当取到 JSON 时,将它打印出来。

    JSON

  • Observable 任务回调

    你可以把任务回调看作是一个序列。当任务结束后,提示用户任务已完成。

    Callback

如何创建序列

现在我们已经可以把生活中的许多事物看作是一个序列了。那么我们要怎么创建这些序列呢?

实际上,框架已经帮我们创建好了许多常用的序列。例如:button的点击,textField的当前文本,switch的开关状态,slider的当前数值等等。

另外,有一些自定义的序列是需要我们自己创建的。这里介绍一下创建序列最基本的方法,例如,我们创建一个 [0, 1, ... 8, 9] 的序列:

Obervable (1)

let numbers: Observable<Int> = Observable.create { observer -> Disposable in

    observer.onNext(0)
    observer.onNext(1)
    observer.onNext(2)
    observer.onNext(3)
    observer.onNext(4)
    observer.onNext(5)
    observer.onNext(6)
    observer.onNext(7)
    observer.onNext(8)
    observer.onNext(9)
    observer.onCompleted()

    return Disposables.create()
}

创建序列最直接的方法就是调用 Observable.create,然后在构建函数里面描述元素的产生过程。 observer.onNext(0) 就代表产生了一个元素,他的值是 0。后面又产生了 9 个元素分别是 1, 2, ... 8, 9 。最后,用 observer.onCompleted() 表示元素已经全部产生,没有更多元素了。

你可以用这种方式来封装功能组件,例如,闭包回调:

JSON (1)

typealias JSON = Any

let json: Observable<JSON> = Observable.create { (observer) -> Disposable in

    let task = URLSession.shared.dataTask(with: ...) { data, _, error in

        guard error == nil else {
            observer.onError(error!)
            return
        }

        guard let data = data,
            let jsonObject = try? JSONSerialization.jsonObject(with: data, options: .mutableLeaves)
            else {
            observer.onError(DataError.cantParseJSON)
            return
        }

        observer.onNext(jsonObject)
        observer.onCompleted()
    }

    task.resume()

    return Disposables.create { task.cancel() }
}

在闭包回调中,如果任务失败,就调用 observer.onError(error!)。如果获取到目标元素,就调用 observer.onNext(jsonObject)。由于我们的这个序列只有一个元素,所以在成功获取到元素后,就直接调用 observer.onCompleted() 来表示任务结束。最后 Disposables.create { task.cancel() } 说明如果数据绑定被清除(订阅被取消)的话,就取消网络请求。

这样一来我们就将传统的闭包回调转换成序列了。然后可以用 subscribe 方法来响应这个请求的结果:

json
    .subscribe(onNext: { json in
        print("取得 json 成功: \(json)")
    }, onError: { error in
        print("取得 json 失败 Error: \(error.localizedDescription)")
    }, onCompleted: {
        print("取得 json 任务成功完成")
    })
    .disposed(by: disposeBag)

这里subscribe后面的onNext,onError, onCompleted 分别响应我们创建 json 时,构建函数里面的onNext,onError, onCompleted 事件。我们称这些事件为 Event:

Event - 事件

public enum Event<Element> {
    case next(Element)
    case error(Swift.Error)
    case completed
}
  • next - 序列产生了一个新的元素
  • error - 创建序列时产生了一个错误,导致序列终止
  • completed - 序列的所有元素都已经成功产生,整个序列已经完成

你可以合理的利用这些 Event 来实现业务逻辑。

决策树

现在我们知道如何用最基本的方法创建序列。你还可参考 决策树 来选择其他的方式创建序列。

特征序列

我们都知道 Swift 是一个强类型语言,而强类型语言相对于弱类型语言的一个优点是更加严谨。我们可以通过类型来判断出,实例有哪些特征。同样的在 RxSwift 里面 Observable 也存在一些特征序列,这些特征序列可以帮助我们更准确的描述序列。并且它们还可以给我们提供语法糖,让我们能够用更加优雅的方式书写代码,他们分别是:

  • Single
  • Completable
  • Maybe
  • Driver
  • Signal
  • ControlEvent

Single

Single 是 Observable 的另外一个版本。不像 Observable 可以发出多个元素,它要么只能发出一个元素,要么产生一个 error 事件。

一个比较常见的例子就是执行 HTTP 请求,然后返回一个应答或错误。不过你也可以用 Single 来描述任何只有一个元素的序列。

如何创建 Single

创建 Single 和创建 Observable 非常相似:

func getRepo(_ repo: String) -> Single<[String: Any]> {

    return Single<[String: Any]>.create { single in
        let url = URL(string: "https://api.github.com/repos/\(repo)")!
        let task = URLSession.shared.dataTask(with: url) {
            data, _, error in

            if let error = error {
                single(.error(error))
                return
            }

            guard let data = data,
                  let json = try? JSONSerialization.jsonObject(with: data, options: .mutableLeaves),
                  let result = json as? [String: Any] else {
                single(.error(DataError.cantParseJSON))
                return
            }

            single(.success(result))
        }

        task.resume()

        return Disposables.create { task.cancel() }
    }
}

之后,你可以这样使用 Single:

getRepo("ReactiveX/RxSwift")
    .subscribe(onSuccess: { json in
        print("JSON: ", json)
    }, onError: { error in
        print("Error: ", error)
    })
    .disposed(by: disposeBag)

订阅提供一个 SingleEvent 的枚举:

public enum SingleEvent<Element> {
    case success(Element)
    case error(Swift.Error)
}
  • success - 产生一个单独的元素
  • error - 产生一个错误

你同样可以对 Observable 调用 .asSingle() 方法,将它转换为 Single。

Completable

Completable 是 Observable 的另外一个版本。不像 Observable 可以发出多个元素,它要么只能产生一个 completed 事件,要么产生一个 error 事件。

  • 发出零个元素
  • 发出一个 completed 事件或者一个 error 事件
  • 不会共享附加作用

Completable 适用于那种你只关心任务是否完成,而不需要在意任务返回值的情况。它和 Observable 有点相似。

如何创建 Completable

创建 Completable 和创建 Observable 非常相似:

func cacheLocally() -> Completable {
    return Completable.create { completable in
       // Store some data locally
       ...
       ...

       guard success else {
           completable(.error(CacheError.failedCaching))
           return Disposables.create {}
       }

       completable(.completed)
       return Disposables.create {}
    }
}

之后,你可以这样使用 Completable:

cacheLocally()
    .subscribe(onCompleted: {
        print("Completed with no error")
    }, onError: { error in
        print("Completed with an error: \(error.localizedDescription)")
     })
    .disposed(by: disposeBag)

订阅提供一个 CompletableEvent 的枚举:

public enum CompletableEvent {
    case error(Swift.Error)
    case completed
}
  • completed - 产生完成事件
  • error - 产生一个错误

Maybe

Maybe 是 Observable 的另外一个版本。它介于 Single 和 Completable 之间,它要么只能发出一个元素,要么产生一个 completed 事件,要么产生一个 error 事件。

  • 发出一个元素或者一个 completed 事件或者一个 error 事件
  • 不会共享附加作用

如果你遇到那种可能需要发出一个元素,又可能不需要发出时,就可以使用 Maybe。

如何创建 Maybe

创建 Maybe 和创建 Observable 非常相似:

func generateString() -> Maybe<String> {
    return Maybe<String>.create { maybe in
        maybe(.success("RxSwift"))

        // OR

        maybe(.completed)

        // OR

        maybe(.error(error))

        return Disposables.create {}
    }
}

之后,你可以这样使用 Maybe:

generateString()
    .subscribe(onSuccess: { element in
        print("Completed with element \(element)")
    }, onError: { error in
        print("Completed with an error \(error.localizedDescription)")
    }, onCompleted: {
        print("Completed with no element")
    })
    .disposed(by: disposeBag)

你同样可以对 Observable 调用 .asMaybe() 方法,将它转换为 Maybe。

Driver

Driver(司机?) 是一个精心准备的特征序列。它主要是为了简化 UI 层的代码。不过如果你遇到的序列具有以下特征,你也可以使用它:

  • 不会产生 error 事件
  • 一定在 MainScheduler 监听(主线程监听)
  • 共享附加作用

这些都是驱动 UI 的序列所具有的特征。

为什么要使用 Driver ?

我们举个例子来说明一下,为什么要使用 Driver。

这是文档简介页的例子:

let results = query.rx.text
    .throttle(0.3, scheduler: MainScheduler.instance)
    .flatMapLatest { query in
        fetchAutoCompleteItems(query)
    }

results
    .map { "\($0.count)" }
    .bind(to: resultCount.rx.text)
    .disposed(by: disposeBag)

results
    .bind(to: resultsTableView.rx.items(cellIdentifier: "Cell")) {
      (_, result, cell) in
        cell.textLabel?.text = "\(result)"
    }
    .disposed(by: disposeBag)

这段代码的主要目的是:

  • 取出用户输入稳定后的内容
  • 向服务器请求一组结果
  • 将返回的结果绑定到两个 UI 元素上:tableView 和 显示结果数量的label

那么这里存在什么问题?

  • 如果 fetchAutoCompleteItems 的序列产生了一个错误(网络请求失败),这个错误将取消所有绑定,当用户输入一个新的关键字时,是无法发起新的网络请求。
  • 如果 fetchAutoCompleteItems 在后台返回序列,那么刷新页面也会在后台进行,这样就会出现异常崩溃。
  • 返回的结果被绑定到两个 UI 元素上。那就意味着,每次用户输入一个新的关键字时,就会分别为两个 UI 元素发起 HTTP 请求,这并不是我们想要的结果。

一个更好的方案是这样的:

let results = query.rx.text
    .throttle(0.3, scheduler: MainScheduler.instance)
    .flatMapLatest { query in
        fetchAutoCompleteItems(query)
            .observeOn(MainScheduler.instance)  // 结果在主线程返回
            .catchErrorJustReturn([])           // 错误被处理了,这样至少不会终止整个序列
    }
    .share(replay: 1)                             // HTTP 请求是被共享的

results
    .map { "\($0.count)" }
    .bind(to: resultCount.rx.text)
    .disposed(by: disposeBag)

results
    .bind(to: resultsTableView.rx.items(cellIdentifier: "Cell")) {
      (_, result, cell) in
        cell.textLabel?.text = "\(result)"
    }
    .disposed(by: disposeBag)

在一个大型系统内,要确保每一步不被遗漏是一件不太容易的事情。所以更好的选择是合理运用编译器和特征序列来确保这些必备条件都已经满足。

以下是使用 Driver 优化后的代码:

let results = query.rx.text.asDriver()        // 将普通序列转换为 Driver
    .throttle(0.3, scheduler: MainScheduler.instance)
    .flatMapLatest { query in
        fetchAutoCompleteItems(query)
            .asDriver(onErrorJustReturn: [])  // 仅仅提供发生错误时的备选返回值
    }

results
    .map { "\($0.count)" }
    .drive(resultCount.rx.text)               // 这里改用 `drive` 而不是 `bindTo`
    .disposed(by: disposeBag)                 // 这样可以确保必备条件都已经满足了

results
    .drive(resultsTableView.rx.items(cellIdentifier: "Cell")) {
      (_, result, cell) in
        cell.textLabel?.text = "\(result)"
    }
    .disposed(by: disposeBag)

首先第一个 asDriver 方法将 ControlProperty 转换为 Driver

然后第二个变化是:

.asDriver(onErrorJustReturn: [])

任何可监听序列都可以被转换为 Driver,只要他满足 3 个条件:

  • 不会产生 error 事件
  • 一定在 MainScheduler 监听(主线程监听)
  • 共享附加作用

那么要如何确定条件都被满足?通过 Rx 操作符来进行转换。asDriver(onErrorJustReturn: []) 相当于以下代码:

let safeSequence = xs
  .observeOn(MainScheduler.instance)       // 主线程监听
  .catchErrorJustReturn(onErrorJustReturn) // 无法产生错误
  .share(replay: 1, scope: .whileConnected)// 共享附加作用
return Driver(raw: safeSequence)           // 封装

最后使用 drive 而不是 bindTo

drive 方法只能被 Driver 调用。这意味着,如果你发现代码所存在 drive,那么这个序列不会产生错误事件并且一定在主线程监听。这样你可以安全的绑定 UI 元素。

Signal

Signal 和 Driver 相似,唯一的区别是,Driver 会对新观察者回放(重新发送)上一个元素,而 Signal 不会对新观察者回放上一个元素。

他有如下特性:

  • 不会产生 error 事件
  • 一定在 MainScheduler 监听(主线程监听)
  • 共享附加作用

现在,我们来看看以下代码是否合理:

let textField: UITextField = ...
let nameLabel: UILabel = ...
let nameSizeLabel: UILabel = ...

let state: Driver<String?> = textField.rx.text.asDriver()

let observer = nameLabel.rx.text
state.drive(observer)

// ... 假设以下代码是在用户输入姓名后运行

let newObserver = nameSizeLabel.rx.text
state.map { $0?.count.description }.drive(newObserver)

这个例子只是将用户输入的姓名绑定到对应的标签上。当用户输入姓名后,我们创建了一个新的观察者,用于订阅姓名的字数。那么问题来了,订阅时,展示字数的标签会立即更新吗?

嗯、、、 因为 Driver 会对新观察者回放上一个元素(当前姓名),所以这里是会更新的。在对他进行订阅时,标签的默认文本会被刷新。这是合理的。

那如果我们用 Driver 来描述点击事件呢,这样合理吗?

let button: UIButton = ...
let showAlert: (String) -> Void = ...

let event: Driver<Void> = button.rx.tap.asDriver()

let observer: () -> Void = { showAlert("弹出提示框1") }
event.drive(onNext: observer)

// ... 假设以下代码是在用户点击 button 后运行

let newObserver: () -> Void = { showAlert("弹出提示框2") }
event.drive(onNext: newObserver)

当用户点击一个按钮后,我们创建一个新的观察者,来响应点击事件。此时会发生什么?Driver 会把上一次的点击事件回放给新观察者。所以,这里的 newObserver 在订阅时,就会接受到上次的点击事件,然后弹出提示框。这似乎不太合理。

因此像这类型的事件序列,用 Driver 建模就不合适。于是我们就引入了 Signal:

...

let event: Signal<Void> = button.rx.tap.asSignal()

let observer: () -> Void = { showAlert("弹出提示框1") }
event.emit(onNext: observer)

// ... 假设以下代码是在用户点击 button 后运行

let newObserver: () -> Void = { showAlert("弹出提示框2") }
event.emit(onNext: newObserver)

在同样的场景中,Signal 不会把上一次的点击事件回放给新观察者,而只会将订阅后产生的点击事件,发布给新观察者。这正是我们所需要的。

结论 一般情况下状态序列我们会选用 Driver 这个类型,事件序列我们会选用 Signal 这个类型。

ControlEvent

ControlEvent 专门用于描述 UI 控件所产生的事件,它具有以下特征:

  • 不会产生 error 事件
  • 一定在 MainScheduler 订阅(主线程订阅)
  • 一定在 MainScheduler 监听(主线程监听)
  • 共享附加作用

Observer - 观察者

Observer

观察者 是用来监听事件,然后它需要这个事件做出响应。例如:弹出提示框就是观察者,它对点击按钮这个事件做出响应。

响应事件的都是观察者

在 Observable 章节,我们举了个几个例子来介绍什么是可监听序列。那么我们还是用这几个例子来解释一下什么是观察者:

  • 当室温高于 33 度时,打开空调降温

Temperature (1)

打开空调降温就是观察者 Observer。

  • 当《海贼王》更新一集时,我们就立即观看这一集

OnePiece (1)

观看这一集就是观察者 Observer。

  • 当取到 JSON 时,将它打印出来

JSON

将它打印出来就是观察者 Observer

  • 当任务结束后,提示用户任务已完成

Callback

提示用户任务已完成就是观察者 Observer

如何创建观察者

现在我们已经知道观察者主要是做什么的了。那么我们要怎么创建它们呢?

和 Observable 一样,框架已经帮我们创建好了许多常用的观察者。例如:view 是否隐藏,button 是否可点击, label 的当前文本,imageView 的当前图片等等。

另外,有一些自定义的观察者是需要我们自己创建的。这里介绍一下创建观察者最基本的方法,例如,我们创建一个弹出提示框的的观察者:

Observer

tap.subscribe(onNext: { [weak self] in
    self?.showAlert()
}, onError: { error in
    print("发生错误: \(error.localizedDescription)")
}, onCompleted: {
    print("任务完成")
})

创建观察者最直接的方法就是在 Observable 的 subscribe 方法后面描述,事件发生时,需要如何做出响应。而观察者就是由后面的 onNext,onError,onCompleted的这些闭包构建出来的。

以上是创建观察者最常见的方法。当然你还可以通过其他的方式来创建观察者,可以参考一下 AnyObserver 和 Binder。

特征观察者

和 Observable 一样,观察者也存特征观察者,例如:

  • Binder

AnyObserver

AnyObserver 可以用来描叙任意一种观察者。

例如:

打印网络请求结果:

URLSession.shared.rx.data(request: URLRequest(url: url))
    .subscribe(onNext: { data in
        print("Data Task Success with count: \(data.count)")
    }, onError: { error in
        print("Data Task Error: \(error)")
    })
    .disposed(by: disposeBag)

可以看作是:

let observer: AnyObserver<Data> = AnyObserver { (event) in
    switch event {
    case .next(let data):
        print("Data Task Success with count: \(data.count)")
    case .error(let error):
        print("Data Task Error: \(error)")
    default:
        break
    }
}

URLSession.shared.rx.data(request: URLRequest(url: url))
    .subscribe(observer)
    .disposed(by: disposeBag)

用户名提示语是否隐藏:

usernameValid
    .bind(to: usernameValidOutlet.rx.isHidden)
    .disposed(by: disposeBag)

可以看作是:

let observer: AnyObserver<Bool> = AnyObserver { [weak self] (event) in
    switch event {
    case .next(let isHidden):
        self?.usernameValidOutlet.isHidden = isHidden
    default:
        break
    }
}

usernameValid
    .bind(to: observer)
    .disposed(by: disposeBag)

Binder

Binder 主要有以下两个特征:

  • 不会处理错误事件
  • 确保绑定都是在给定 Scheduler 上执行(默认 MainScheduler)

一旦产生错误事件,在调试环境下将执行 fatalError,在发布环境下将打印错误信息。

示例

在介绍 AnyObserver 时,我们举了这样一个例子:

let observer: AnyObserver<Bool> = AnyObserver { [weak self] (event) in
    switch event {
    case .next(let isHidden):
        self?.usernameValidOutlet.isHidden = isHidden
    default:
        break
    }
}

usernameValid
    .bind(to: observer)
    .disposed(by: disposeBag)

由于这个观察者是一个 UI 观察者,所以它在响应事件时,只会处理 next 事件,并且更新 UI 的操作需要在主线程上执行。

因此一个更好的方案就是使用 Binder:

let observer: Binder<Bool> = Binder(usernameValidOutlet) { (view, isHidden) in
    view.isHidden = isHidden
}

usernameValid
    .bind(to: observer)
    .disposed(by: disposeBag)

Binder 可以只处理 next 事件,并且保证响应 next 事件的代码一定会在给定 Scheduler 上执行,这里采用默认的 MainScheduler。

复用

由于页面是否隐藏是一个常用的观察者,所以应该让所有的 UIView 都提供这种观察者:

extension Reactive where Base: UIView {
  public var isHidden: Binder<Bool> {
      return Binder(self.base) { view, hidden in
          view.isHidden = hidden
      }
  }
}
usernameValid
    .bind(to: usernameValidOutlet.rx.isHidden)
    .disposed(by: disposeBag)

这样你不必为每个 UI 控件单独创建该观察者。这就是 usernameValidOutlet.rx.isHidden 的由来,许多 UI 观察者 都是这样创建的:

  • 按钮是否可点击 button.rx.isEnabled:
extension Reactive where Base: UIControl {
  public var isEnabled: Binder<Bool> {
      return Binder(self.base) { control, value in
          control.isEnabled = value
      }
  }
}
  • label 的当前文本 label.rx.text:
extension Reactive where Base: UILabel {
  public var text: Binder<String?> {
      return Binder(self.base) { label, text in
          label.text = text
      }
  }
}

你也可以用这种方式来创建自定义的 UI 观察者。

Observable & Observer 既是可监听序列也是观察者

ObservableAndObserver

在我们所遇到的事物中,有一部分非常特别。它们既是可监听序列也是观察者。

例如:textField的当前文本。它可以看成是由用户输入,而产生的一个文本序列。也可以是由外部文本序列,来控制当前显示内容的观察者:

// 作为可监听序列
let observable = textField.rx.text
observable.subscribe(onNext: { text in show(text: text) })
// 作为观察者
let observer = textField.rx.text
let text: Observable<String?> = ...
text.bind(to: observer)

有许多 UI 控件都存在这种特性,例如:switch的开关状态,segmentedControl的选中索引号,datePicker的选中日期等等。

AsyncSubject
AsyncSubject

AsyncSubject 将在源 Observable 产生完成事件后,发出最后一个元素(仅仅只有最后一个元素),如果源 Observable 没有发出任何元素,只有一个完成事件。那 AsyncSubject 也只有一个完成事件。

AsyncSubject1

它会对随后的观察者发出最终元素。如果源 Observable 因为产生了一个 error 事件而中止, AsyncSubject 就不会发出任何元素,而是将这个 error 事件发送出来。

演示

let disposeBag = DisposeBag()
let subject = AsyncSubject<String>()

subject
  .subscribe { print("Subscription: 1 Event:", $0) }
  .disposed(by: disposeBag)

subject.onNext("🐶")
subject.onNext("🐱")
subject.onNext("🐹")
subject.onCompleted()

输出结果:

Subscription: 1 Event: next(🐹)
Subscription: 1 Event: completed
PublishSubject
PublishSubject

PublishSubject 将对观察者发送订阅后产生的元素,而在订阅前发出的元素将不会发送给观察者。如果你希望观察者接收到所有的元素,你可以通过使用 Observable 的 create 方法来创建 Observable,或者使用 ReplaySubject。

PublishSubject1

如果源 Observable 因为产生了一个 error 事件而中止, PublishSubject 就不会发出任何元素,而是将这个 error 事件发送出来。

演示

let disposeBag = DisposeBag()
let subject = PublishSubject<String>()

subject
  .subscribe { print("Subscription: 1 Event:", $0) }
  .disposed(by: disposeBag)

subject.onNext("🐶")
subject.onNext("🐱")

subject
  .subscribe { print("Subscription: 2 Event:", $0) }
  .disposed(by: disposeBag)

subject.onNext("🅰️")
subject.onNext("🅱️")

输出结果:

Subscription: 1 Event: next(🐶)
Subscription: 1 Event: next(🐱)
Subscription: 1 Event: next(🅰️)
Subscription: 2 Event: next(🅰️)
Subscription: 1 Event: next(🅱️)
Subscription: 2 Event: next(🅱️)
ReplaySubject
ReplaySubject

ReplaySubject 将对观察者发送全部的元素,无论观察者是何时进行订阅的。

这里存在多个版本的 ReplaySubject,有的只会将最新的 n 个元素发送给观察者,有的只会将限制时间段内最新的元素发送给观察者。

如果把 ReplaySubject 当作观察者来使用,注意不要在多个线程调用 onNext, onError 或 onCompleted。这样会导致无序调用,将造成意想不到的结果。

演示

let disposeBag = DisposeBag()
let subject = ReplaySubject<String>.create(bufferSize: 1)

subject
  .subscribe { print("Subscription: 1 Event:", $0) }
  .disposed(by: disposeBag)

subject.onNext("🐶")
subject.onNext("🐱")

subject
  .subscribe { print("Subscription: 2 Event:", $0) }
  .disposed(by: disposeBag)

subject.onNext("🅰️")
subject.onNext("🅱️")

输出结果:

Subscription: 1 Event: next(🐶)
Subscription: 1 Event: next(🐱)
Subscription: 2 Event: next(🐱)
Subscription: 1 Event: next(🅰️)
Subscription: 2 Event: next(🅰️)
Subscription: 1 Event: next(🅱️)
Subscription: 2 Event: next(🅱️)
BehaviorSubject
BehaviorSubject

当观察者对 BehaviorSubject 进行订阅时,它会将源 Observable 中最新的元素发送出来(如果不存在最新的元素,就发出默认元素)。然后将随后产生的元素发送出来。

BehaviorSubject1

如果源 Observable 因为产生了一个 error 事件而中止, BehaviorSubject 就不会发出任何元素,而是将这个 error 事件发送出来。

演示

let disposeBag = DisposeBag()
let subject = BehaviorSubject(value: "🔴")

subject
  .subscribe { print("Subscription: 1 Event:", $0) }
  .disposed(by: disposeBag)

subject.onNext("🐶")
subject.onNext("🐱")

subject
  .subscribe { print("Subscription: 2 Event:", $0) }
  .disposed(by: disposeBag)

subject.onNext("🅰️")
subject.onNext("🅱️")

subject
  .subscribe { print("Subscription: 3 Event:", $0) }
  .disposed(by: disposeBag)

subject.onNext("🍐")
subject.onNext("🍊")

输出结果:

Subscription: 1 Event: next(🔴)
Subscription: 1 Event: next(🐶)
Subscription: 1 Event: next(🐱)
Subscription: 2 Event: next(🐱)
Subscription: 1 Event: next(🅰️)
Subscription: 2 Event: next(🅰️)
Subscription: 1 Event: next(🅱️)
Subscription: 2 Event: next(🅱️)
Subscription: 3 Event: next(🅱️)
Subscription: 1 Event: next(🍐)
Subscription: 2 Event: next(🍐)
Subscription: 3 Event: next(🍐)
Subscription: 1 Event: next(🍊)
Subscription: 2 Event: next(🍊)
Subscription: 3 Event: next(🍊)
ControlProperty

ControlProperty 专门用于描述 UI 控件属性的,它具有以下特征:

  • 不会产生 error 事件
  • 一定在 MainScheduler 订阅(主线程订阅)
  • 一定在 MainScheduler 监听(主线程监听)
  • 共享附加作用

Operator - 操作符

Operator

操作符可以帮助大家创建新的序列,或者变化组合原有的序列,从而生成一个新的序列。

我们之前在输入验证例子中就多次运用到操作符。例如,通过 map 方法将输入的用户名,转换为用户名是否有效。然后用这个转化后来的序列来控制红色提示语是否隐藏。我们还通过 combineLatest 方法,将用户名是否有效和密码是否有效合并成两者是否同时有效。然后用这个合成后来的序列来控制按钮是否可点击。

这里 map 和 combineLatest 都是操作符,它们可以帮助我们构建所需要的序列。现在,我们再来看几个例子:

filter - 过滤

filter

你可以用 filter 创建一个新的序列。这个序列只发出温度大于 33 度的元素。

map - 转换

map

你可以用 map 创建一个新的序列。这个序列将原有的 JSON 转换成 Model 。这种转换实际上就是解析 JSON 。

zip - 配对

zip

你可以用 zip 来合成一个新的序列。这个序列将汉堡序列的元素和薯条序列的元素配对后,生成一个新的套餐序列。

如何使用操作符

使用操作符是非常容易的。你可以直接调用实例方法,或者静态方法:

温度过滤

// 温度
let rxTemperature: Observable<Double> = ...

// filter 操作符
rxTemperature.filter { temperature in temperature > 33 }
    .subscribe(onNext: { temperature in
        print("高温:\(temperature)度")
    })
    .disposed(by: disposeBag)

解析 JSON

// JSON
let json: Observable<JSON> = ...

// map 操作符
json.map(Model.init)
    .subscribe(onNext: { model in
        print("取得 Model: \(model)")
    })
    .disposed(by: disposeBag)

合成套餐

// 汉堡
let rxHamburg: Observable<Hamburg> = ...
// 薯条
let rxFrenchFries: Observable<FrenchFries> = ...

// zip 操作符
Observable.zip(rxHamburg, rxFrenchFries)
    .subscribe(onNext: { (hamburg, frenchFries) in
        print("取得汉堡: \(hamburg) 和薯条:\(frenchFries)")
    })
    .disposed(by: disposeBag)
决策树

Rx 提供了充分的操作符来帮我们创建序列。当然如果内置操作符无法满足你的需求时,你还可以创建自定义的操作符。

如果你不确定该如何选择操作符,可以参考 决策树。它会引导你找出合适的操作符。

操作符列表

这里提供一个操作符列表,它们就好比是26个英文字母。你如果要将它们的作用全部都发挥出来,是需要学习如何将它们连成一个句子的:

Disposable - 可被清除的资源

Disposable

通常来说,一个序列如果发出了 error 或者 completed 事件,那么所有内部资源都会被释放。如果你需要提前释放这些资源或取消订阅的话,那么你可以对返回的 可被清除的资源(Disposable) 调用 dispose 方法:

var disposable: Disposable?

override func viewWillAppear(_ animated: Bool) {
    super.viewWillAppear(animated)

    self.disposable = textField.rx.text.orEmpty
        .subscribe(onNext: { text in print(text) })
}

override func viewWillDisappear(_ animated: Bool) {
    super.viewWillDisappear(animated)

    self.disposable?.dispose()
}

调用 dispose 方法后,订阅将被取消,并且内部资源都会被释放。通常情况下,你是不需要手动调用 dispose 方法的,这里只是做个演示而已。我们推荐使用 清除包(DisposeBag) 或者 takeUntil 操作符 来管理订阅的生命周期。

DisposeBag - 清除包

DisposeBag

因为我们用的是 Swift ,所以我们更习惯于使用 ARC 来管理内存。那么我们能不能用 ARC 来管理订阅的生命周期了。答案是肯定了,你可以用 清除包(DisposeBag) 来实现这种订阅管理机制:

var disposeBag = DisposeBag()

override func viewWillAppear(_ animated: Bool) {
    super.viewWillAppear(animated)

    textField.rx.text.orEmpty
        .subscribe(onNext: { text in print(text) })
        .disposed(by: self.disposeBag)
}

override func viewWillDisappear(_ animated: Bool) {
    super.viewWillDisappear(animated)

    self.disposeBag = DisposeBag()
}

当 清除包 被释放的时候,清除包 内部所有 可被清除的资源(Disposable) 都将被清除。在输入验证中我们也多次看到 清除包 的身影:

var disposeBag = DisposeBag() // 来自父类 ViewController

override func viewDidLoad() {
    super.viewDidLoad()

    ...

    usernameValid
        .bind(to: passwordOutlet.rx.isEnabled)
        .disposed(by: disposeBag)

    usernameValid
        .bind(to: usernameValidOutlet.rx.isHidden)
        .disposed(by: disposeBag)

    passwordValid
        .bind(to: passwordValidOutlet.rx.isHidden)
        .disposed(by: disposeBag)

    everythingValid
        .bind(to: doSomethingOutlet.rx.isEnabled)
        .disposed(by: disposeBag)

    doSomethingOutlet.rx.tap
        .subscribe(onNext: { [weak self] in self?.showAlert() })
        .disposed(by: disposeBag)
}

这个例子中 disposeBag 和 ViewController 具有相同的生命周期。当退出页面时, ViewController 就被释放,disposeBag 也跟着被释放了,那么这里的 5 次绑定(订阅)也就被取消了。这正是我们所需要的。

takeUntil

TakeUntil

另外一种实现自动取消订阅的方法就是使用 takeUntil 操作符,上面那个输入验证的演示代码也可以通过使用 takeUntil 来实现:

override func viewDidLoad() {
    super.viewDidLoad()

    ...

    _ = usernameValid
        .takeUntil(self.rx.deallocated)
        .bind(to: passwordOutlet.rx.isEnabled)

    _ = usernameValid
        .takeUntil(self.rx.deallocated)
        .bind(to: usernameValidOutlet.rx.isHidden)

    _ = passwordValid
        .takeUntil(self.rx.deallocated)
        .bind(to: passwordValidOutlet.rx.isHidden)

    _ = everythingValid
        .takeUntil(self.rx.deallocated)
        .bind(to: doSomethingOutlet.rx.isEnabled)

    _ = doSomethingOutlet.rx.tap
        .takeUntil(self.rx.deallocated)
        .subscribe(onNext: { [weak self] in self?.showAlert() })
}

这将使得订阅一直持续到控制器的 dealloc 事件产生为止。

Schedulers - 调度器

Scheduler (1)

Schedulers 是 Rx 实现多线程的核心模块,它主要用于控制任务在哪个线程或队列运行。

如果你曾经使用过 GCD, 那你对以下代码应该不会陌生:

// 后台取得数据,主线程处理结果
DispatchQueue.global(qos: .userInitiated).async {
    let data = try? Data(contentsOf: url)
    DispatchQueue.main.async {
        self.data = data
    }
}

如果用 RxSwift 来实现,大致是这样的:

let rxData: Observable<Data> = ...

rxData
    .subscribeOn(ConcurrentDispatchQueueScheduler(qos: .userInitiated))
    .observeOn(MainScheduler.instance)
    .subscribe(onNext: { [weak self] data in
        self?.data = data
    })
    .disposed(by: disposeBag)
使用 subscribeOn

我们用 subscribeOn 来决定数据序列的构建函数在哪个 Scheduler 上运行。以上例子中,由于获取 Data 需要花很长的时间,所以用 subscribeOn 切换到 后台 Scheduler 来获取 Data。这样可以避免主线程被阻塞。

使用 observeOn

我们用 observeOn 来决定在哪个 Scheduler 监听这个数据序列。以上例子中,通过使用 observeOn 方法切换到主线程来监听并且处理结果。

一个比较典型的例子就是,在后台发起网络请求,然后解析数据,最后在主线程刷新页面。你就可以先用 subscribeOn 切到后台去发送请求并解析数据,最后用 observeOn 切换到主线程更新页面。

MainScheduler

MainScheduler 代表主线程。如果你需要执行一些和 UI 相关的任务,就需要切换到该 Scheduler 运行。

SerialDispatchQueueScheduler

SerialDispatchQueueScheduler 抽象了串行 DispatchQueue。如果你需要执行一些串行任务,可以切换到这个 Scheduler 运行。

ConcurrentDispatchQueueScheduler

ConcurrentDispatchQueueScheduler 抽象了并行 DispatchQueue。如果你需要执行一些并发任务,可以切换到这个 Scheduler 运行。

OperationQueueScheduler

OperationQueueScheduler 抽象了 NSOperationQueue。

它具备 NSOperationQueue 的一些特点,例如,你可以通过设置 maxConcurrentOperationCount,来控制同时执行并发任务的最大数量。

Error Handling - 错误处理

一旦序列里面产出了一个 error 事件,整个序列将被终止。RxSwift 主要有两种错误处理机制:

  • retry - 重试
  • catch - 恢复
retry - 重试

retry 可以让序列在发生错误后重试:

// 请求 JSON 失败时,立即重试,
// 重试 3 次后仍然失败,就将错误抛出

let rxJson: Observable<JSON> = ...

rxJson
    .retry(3)
    .subscribe(onNext: { json in
        print("取得 JSON 成功: \(json)")
    }, onError: { error in
        print("取得 JSON 失败: \(error)")
    })
    .disposed(by: disposeBag)

以上的代码非常直接 retry(3) 就是当发生错误时,就进行重试操作,并且最多重试 3 次。

retryWhen

如果我们需要在发生错误时,经过一段延时后重试,那可以这样实现:

// 请求 JSON 失败时,等待 5 秒后重试,

let retryDelay: Double = 5  // 重试延时 5 秒

rxJson
    .retryWhen { (rxError: Observable<Error>) -> Observable<Int> in
        return Observable.timer(retryDelay, scheduler: MainScheduler.instance)
    }
    .subscribe(...)
    .disposed(by: disposeBag)

这里我们需要用到 retryWhen 操作符,这个操作符主要描述应该在何时重试,并且通过闭包里面返回的 Observable 来控制重试的时机:

.retryWhen { (rxError: Observable<Error>) -> Observable<Int> in
    ...
}

闭包里面的参数是 Observable 也就是所产生错误的序列,然后返回值是一个 Observable。当这个返回的 Observable 发出一个元素时,就进行重试操作。当它发出一个 error 或者 completed 事件时,就不会重试,并且将这个事件传递给到后面的观察者。

如果需要加上一个最大重试次数的限制:

// 请求 JSON 失败时,等待 5 秒后重试,
// 重试 4 次后仍然失败,就将错误抛出

let maxRetryCount = 4       // 最多重试 4 次
let retryDelay: Double = 5  // 重试延时 5 秒

rxJson
    .retryWhen { (rxError: Observable<Error>) -> Observable<Int> in
        return rxError.flatMapWithIndex { (error, index) -> Observable<Int> in
            guard index < maxRetryCount else {
                return Observable.error(error)
            }
            return Observable<Int>.timer(retryDelay, scheduler: MainScheduler.instance)
        }
    }
    .subscribe(...)
    .disposed(by: disposeBag)

我们这里要实现的是,如果重试超过 4 次,就将错误抛出。如果错误在 4 次以内时,就等待 5 秒后重试:

...
rxError.flatMapWithIndex { (error, index) -> Observable<Int> in
    guard index < maxRetryCount else {
        return Observable.error(error)
    }
    return Observable<Int>.timer(retryDelay, scheduler: MainScheduler.instance)
}
...

我们用 flatMapWithIndex 这个操作符,因为它可以给我们提供错误的索引数 index。然后用这个索引数判断是否超过最大重试数,如果超过了,就将错误抛出。如果没有超过,就等待 5 秒后重试。

catchError - 恢复

catchError 可以在错误产生时,用一个备用元素或者一组备用元素将错误替换掉:

searchBar.rx.text.orEmpty
    ...
    .flatMapLatest { query -> Observable<[Repository]> in
        ...
        return searchGitHub(query)
            .catchErrorJustReturn([])
    }
    ...
    .bind(to: ...)
    .disposed(by: disposeBag)

我们开头的 Github 搜索就用到了catchErrorJustReturn。当错误产生时,就返回一个空数组,于是就会显示一个空列表页。

你也可以使用 catchError,当错误产生时,将错误事件替换成一个备选序列:

// 先从网络获取数据,如果获取失败了,就从本地缓存获取数据

let rxData: Observable<Data> = ...      // 网络请求的数据
let cahcedData: Observable<Data> = ...  // 之前本地缓存的数据

rxData
    .catchError { _ in cahcedData }
    .subscribe(onNext: { date in
        print("获取数据成功: \(date.count)")
    })
    .disposed(by: disposeBag)

Result

如果我们只是想给用户错误提示,那要如何操作呢?

以下提供一个最为直接的方案,不过这个方案存在一些问题:

// 当用户点击更新按钮时,
// 就立即取出修改后的用户信息。
// 然后发起网络请求,进行更新操作,
// 一旦操作失败就提示用户失败原因

updateUserInfoButton.rx.tap
    .withLatestFrom(rxUserInfo)
    .flatMapLatest { userInfo -> Observable<Void> in
        return update(userInfo)
    }
    .observeOn(MainScheduler.instance)
    .subscribe(onNext: {
        print("用户信息更新成功")
    }, onError: { error in
        print("用户信息更新失败: \(error.localizedDescription)")
    })
    .disposed(by: disposeBag)

这样实现是非常直接的。但是一旦网络请求操作失败了,序列就会终止。整个订阅将被取消。如果用户再次点击更新按钮,就无法再次发起网络请求进行更新操作了。

为了解决这个问题,我们需要选择合适的方案来进行错误处理。例如,使用系统自带的枚举 Result:

public enum Result<Success, Failure> where Failure : Error {
    case success(Success)
    case failure(Failure)
}

然后之前的代码需要修改成:

updateUserInfoButton.rx.tap
    .withLatestFrom(rxUserInfo)
    .flatMapLatest { userInfo -> Observable<Result<Void, Error>> in
        return update(userInfo)
            .map(Result.success)  // 转换成 Result
            .catchError { error in Observable.just(Result.failure(error)) }
    }
    .observeOn(MainScheduler.instance)
    .subscribe(onNext: { result in
        switch result {           // 处理 Result
        case .success:
            print("用户信息更新成功")
        case .failure(let error):
            print("用户信息更新失败: \(error.localizedDescription)")
        }
    })
    .disposed(by: disposeBag)

这样我们的错误事件被包装成了 Result.failure(Error) 元素,就不会终止整个序列。即便网络请求失败了,整个订阅依然存在。如果用户再次点击更新按钮,也是能够发起网络请求进行更新操作的。

另外你也可以使用 materialize 操作符来进行错误处理。这里就不详细介绍了,如你想了解如何使用 materialize 可以参考这篇文章 How to handle errors in RxSwift!

⚠️ **GitHub.com Fallback** ⚠️