Kotlin Flow 操作符 - chuwuwang/ReadingNote GitHub Wiki
Flow就是Kotlin 协程与响应式编程模型结合的产物,你会发现它与RxJava非常像,二者之间也有相互转换的API,使用起来非常方便。
- flow { ... } 构建一个Flow类型
- flow { ... } 内可以使用suspend函数
- emit()方法用来发射数据
- collect()方法用来遍历结果
- flowOf()创建一个保护固定数量的flow,类似listOf
- 任意集合类或者squence通过.asFlow()转成一个flow
- channelFlow()
Flow创建后并不返回可以cancel的句柄,但是一个flow的collect()是suspend的,所以可以像取消一个suspend方法一样取消flow的collection。
GlobalScope.launch(myDispatcher) {
intFlow.flowOn(Dispatchers.IO)
.collect { println(it) }
}.join()
Observable.create<Int> {
(1..3).forEach { e ->
it.onNext(e)
}
it.onComplete()
}.subscribeOn(Schedulers.io())
.observeOn(Schedulers.from(myExecutor))
.subscribe {
println(it)
}
其中subscribeOn()指定的调度器影响前面的逻辑,observeOn()影响的是后面的逻辑,因此it.onNext(e)执行在它的io这个调度器上,而最后的println(it)执行在通过myExecutor创建出来的调度器上。
Flow的调度器API中看似只有flowOn()与subscribeOn()对应,其实不然,collect()所在协程的调度器则与observeOn()指定的调度器对应。
在RxJava的学习和使用过程中,subscribeOn()和observeOn()经常容易被混淆。而在Flow当中collect()所在的协程自然就是观察者,它想运行在什么调度器上它自己指定即可,非常容易区分。
一个Flow创建出来之后,不消费则不生产,多次消费则多次生产,生产和消费总是相对应的。Flow像Sequences一样是冷流,即在调用collect()之前,flow { ... } 中的代码不会执行。
Flow是Cold Stream。在没有切换线程的情况下,生产者和消费者是同步非阻塞的。Channel是Hot Stream。而channelFlow实现了生产者和消费者异步非阻塞模型。
flow {
emit(1)
throw ArithmeticException("Div 0")
}.catch { t: Throwable ->
println("caught error: $t")
}.onCompletion { t: Throwable ? ->
println("finally.")
}
onCompletion用起来比较类似于try ... catch ... finally中的finally,无论前面是否存在异常,它都会被调用,参数t则是前面未捕获的异常。
collect是最基本的末端操作符,功能与RxJava的subscribe类似。除了collect之外,还有其他常见的末端操作符,大体分为两类:
- 集合类型转换操作,包括toList、toSet等。
- 聚合操作,包括将Flow规约到单值的reduce、fold等操作,以及获得单个元素的操作包括single、singleOrNull、first等。
实际上,识别是否为末端操作符,还有一个简单方法,由于Flow的消费端一定需要运行在协程当中,因此末端操作符都是挂起函数。
- Flow的API有点类似于Java Stream的API。它也同样拥有Intermediate Operations、Terminal Operations。
- Flow的Terminal运算符可以是suspend函数,如collect、single、reduce、toList等;也可以是launchIn运算符,用于在指定CoroutineScope内使用flow。
- 每一个Flow其内部是按照顺序执行的,这一点跟Sequences很类似。
- Flow跟Sequences之间的区别是Flow不会阻塞主线程的运行,而Sequences会阻塞主线程的运行。