Kotlin Coroutines Introducing Flows and Channels - mariamaged/Java-Android-Kotlin GitHub Wiki

Kotlin - Introducing Flows and Channels

A coroutine centered around simple suspend functions works great when either:

  1. You need asynchronous work to be done, but you do not need to receive any sort of results from that work.
  2. You need asynchronous work to be done, and you are expecting a single object that serves as the result.
     
  • However, there are many occasions in programming where you need a stream of results, not just a single result.
  • An ordinary suspend function does not offer that.
  • Instead, Kotlin's coroutine system offers channels and flows for streams of results.

Note: Flows were still an experimental API when this was written. The core Flow API has been stable since the 1.3.0 release of kotlinx.coroutines.

Life is But a Stream

  • Quite a bit of asynchronous work can be modeled as no-result or single-result operations:
    • Database Transactions.
    • Web service calls.
    • Downloading or reading an image file.
    • And so on.

Basically, anything that is transactional in nature - where each result is triggered by a distinct request - can be modeled as a no-result or single-result operation.

  • However, it is also common to have a single routine needing to return a series of results over time:
    • Durable network connections, such as WebSockets or XMPP, where the server can send down content without a fresh client request.
    • GPS readings.
    • Sensor readings from accelerometers, thermometers, etc.
    • Data received from external devices via USB, Bluetooth, etc.
    • And so on.

You're Hot and Cold

In programming terms, a "hot" stream is one where events are available on the stream regardless of whether anyone is paying attention to them.

By contrast, a "cold" stream is one where events are available on the stream only when there is at least one consumer of the stream.

  • The determination of whether a stream is hot or cold can depend on where you look.
  • For example, if you think of GPS:
    • GPS satellites emit their signals regardless of whether any GPS receiver on Earth is powered on. Hence, the satellites have a hot stream of signals.
    • On a smartphone, the GPS radio is usually powered down, to save on battery. It is only powered up when one or more apps request GPS fixes. Hence, the GPS subsystem on a phone has a cold stream of GPS fixes, as it only tries to emit those when there is somebody interested in them.

With Kotlin, a flow usually models a cold stream.

A channel models a hot stream.
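
The cold behavior of a flow can be seen in a minimal sketch. The numbers() function and the starts counter are hypothetical names used only for illustration, and toList() is used here simply as a convenient way to collect everything the flow emits. The point to notice is that the block passed to flow() does not run when the flow is created, and that it runs again from the start for each collector.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Counts how many times the flow's block has started (illustration only).
var starts = 0

// Hypothetical helper: a cold flow of three numbers.
fun numbers(): Flow<Int> = flow {
    starts++                      // runs once per collector, not when numbers() is called
    for (i in 1..3) emit(i)
}

fun main() = runBlocking {
    val flow = numbers()
    println("after creation: starts = $starts")          // the block has not run yet
    println(flow.toList())                               // first collector
    println(flow.toList())                               // second collector, block runs again
    println("after two collections: starts = $starts")
}
```

A channel behaves differently: its producer runs whether or not anything is receiving, which is why the consumer's timing matters for channels.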

Dependencies

  • Channels are part of the current stable version of the kotlinx.coroutines library.
  • Flows, though, were still in a pre-release state when this was written.
    • At that time, you needed a pre-release version of 1.3.0 of the coroutine dependencies, such as 1.3.0-RC, to have access to flows. The 1.3.0 release and later versions include them.
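
As a sketch, assuming a Gradle build that uses the Kotlin DSL (the text does not say which build tool is in use), the dependency could be declared like this. The artifact coordinates are those of the kotlinx.coroutines core module, and the version is the pre-release named in this section.

```kotlin
// build.gradle.kts (assumed build setup)
dependencies {
    // 1.3.0-RC is the pre-release mentioned in this section;
    // later stable releases of kotlinx-coroutines-core also include flows.
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.0-RC")
}
```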

Flow Basics

  • A Flow in Kotlin is represented by a Flow object.
  • A Channel is represented by a Channel object.
     
  • One way to create a Flow is to use the flow() top-level function.
  • flow() is fairly simple:
    • You supply a lambda expression.
    • That expression calls emit() for each item that you want to publish on the stream.
       
  • One typical way to consume a Flow is to call collect() on it.
  • collect() is fairly simple:
    • You supply a lambda expression.
    • It is passed each item that the Flow emits onto its stream.
    • collect() is a suspend function, and so we need to call it from inside of another suspend function or from a coroutine builder like launch().
       
  • emit() is fairly simple:
    • It is a suspend function.
    • The lambda that you pass to flow() is a suspending lambda that runs in the collector's coroutine, so you do not need to worry about setting up a coroutine yourself. But it does mean that emit() might suspend for a bit, until the collector has handled the item.
       
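  • When you exit the lambda expression, the flow is considered to be completed.
  • In the example below, randomPercentages() uses flow() to emit a series of random numbers, with a delay before each one. Then, inside of a launched coroutine, we call collect() on that Flow, printing each number.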
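
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.random.Random

// runBlocking keeps main() alive until the launched coroutine finishes.
fun main() = runBlocking {
    launch {
        // collect() suspends until the flow has emitted all of its items.
        randomPercentages(10, 20).collect { println(it) }
        println("That's all folks!")
    }

    println("...and we're off!")
}

// Emits `count` random numbers from 1 to 99, pausing delayMs before each one.
fun randomPercentages(count: Int, delayMs: Long) = flow {
    for (i in 0 until count) {
        delay(delayMs)
        emit(Random.nextInt(1, 100))
    }
}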

This prints something like the following (the numbers are random, so yours will differ):

...and we're off!
41
29
6
98
49
91
15
62
40
76
That's all folks!
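
To make the suspending nature of emit() more concrete, here is a minimal sketch that records the order of events. The letters() function and the events list are hypothetical names used only for illustration. With a plain flow and no buffering operators, each emit() call does not return until the collector's lambda has finished with that item.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Records the order of events so the interleaving is visible (illustration only).
val events = mutableListOf<String>()

// Hypothetical helper: a flow that logs before and after each emit().
fun letters(): Flow<String> = flow {
    for (letter in listOf("a", "b")) {
        events += "emitting $letter"
        emit(letter)                  // suspends until the collector's lambda returns
        events += "emitted $letter"
    }
}

fun main() = runBlocking {
    letters().collect { letter ->
        delay(50)                     // a deliberately slow consumer
        events += "collected $letter"
    }
    // Each "collected" entry appears between the matching "emitting" and "emitted" entries.
    events.forEach { println(it) }
}
```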

Channel Basics

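import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlin.random.Random

// runBlocking keeps main() alive until the launched coroutine finishes.
fun main() = runBlocking {
    launch {
        // consumeEach() suspends until the channel is closed.
        randomPercentages(10, 200).consumeEach { println(it) }
        println("That's all folks!")
    }

    println("...and we're off!")
}

// produce() is defined on CoroutineScope, hence the extension function.
// The channel is closed when the lambda completes.
fun CoroutineScope.randomPercentages(count: Int, delayMs: Long) = produce {
    for (i in 0 until count) {
        delay(delayMs)
        send(Random.nextInt(1, 100))
    }
}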

This is the same pattern that we used above for a Flow.

  • There are a few differences:
    1. We use produce() instead of flow().
      • Like flow(), produce() takes a lambda expression.
      • When that expression completes, the channel will be closed.
      • However, whereas flow() is a top-level function, produce() is defined on CoroutineScope.
      • One convention for this is to put produce() in an extension function for CoroutineScope, then call that function from inside the coroutine builder.
    2. We use send() rather than emit() to put a value onto the channel's stream.
    3. We use consumeEach() rather than collect() to receive the values from the channel.
      • Like collect(), consumeEach() is a suspend function and needs to be called from within another suspend function or from within a coroutine builder like launch().

This prints something like the following (the numbers are random, so yours will differ):

...and we're off!
69 
18 
21 
51 
74 
60 
57 
14 
49
12 
That's all folks!
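
produce() is a convenience. As a rough sketch of what it saves us, the following creates a Channel object directly, sends values from a launched coroutine, and closes the channel by hand. The squaresViaChannel() function is a hypothetical name used only for illustration. A for loop is used here as another way to receive from a channel until it is closed.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper: sends `count` squares on a Channel that we create ourselves
// and returns everything the consumer side received.
suspend fun squaresViaChannel(count: Int): List<Int> = coroutineScope {
    val channel = Channel<Int>()              // default capacity: no buffer
    launch {
        for (i in 1..count) {
            channel.send(i * i)               // suspends until the consumer receives
        }
        channel.close()                       // produce() does this for us; here we must do it
    }

    val received = mutableListOf<Int>()
    for (value in channel) {                  // the loop ends once the channel is closed
        received += value
    }
    received
}

fun main() = runBlocking {
    println(squaresViaChannel(5))
}
```

Forgetting the close() call would leave the for loop suspended forever, which is one reason to prefer produce() when it fits.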

Hot and Cold Impacts

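import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlin.random.Random

// runBlocking keeps main() alive until the launched coroutine finishes.
fun main() = runBlocking {
    launch {
        // The producer starts running as soon as produce() is called...
        val channel = randomPercentages1(10, 500)
        // ...but we do not start consuming for another second.
        delay(1000L)

        channel.consumeEach { println(it) }
        println("That's all folks!")
    }
    println("...and we're off!")
}

fun CoroutineScope.randomPercentages1(count: Int, delayMs: Long) = produce {
    for (i in 0 until count) {
        delay(delayMs)
        // offer() does not wait for a receiver. It matches the library version
        // used here; kotlinx.coroutines 1.5+ deprecates it in favor of trySend().
        offer(Random.nextInt(1, 100))
    }
}
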
  • send() on a Channel, like emit() on a Flow, is a suspending call.
    • With the unbuffered channel that produce() creates by default, it will not return until something is in position to receive the item that we are placing on the stream.
  • Channel, though, also has offer().
    • offer() will try to put the item on the stream, but if it cannot, it does not wait: it returns false right away.
    • In kotlinx.coroutines 1.5 and later, offer() is deprecated in favor of trySend(), which is likewise non-suspending.
    • Here, our consumer code delays a bit before calling consumeEach().
    • With a send()-based channel, or with a Flow, we still wind up getting all 10 items, despite the delay, because send() and emit() suspend until something can receive their items.
    • In this case, though, we are using offer(), so some of the items will be dropped because nobody is consuming when we make our offer.
    • As a result, we wind up with fewer items in our output than the full set of 10. The exact count depends on timing: with the delays shown, at least the first offer, made after 500 ms while the consumer is still waiting out its 1000 ms delay, is dropped.
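
The dropping behavior can also be shown without relying on random numbers. The sketch below assumes kotlinx.coroutines 1.5 or later, where trySend() is the non-suspending counterpart of send() and returns a result object with an isSuccess property. The trySendTwice() function is a hypothetical name used only for illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper: attempts a non-suspending send before and after a receiver is waiting.
// Returns (first attempt succeeded, second attempt succeeded, value the receiver got).
suspend fun trySendTwice(): Triple<Boolean, Boolean, Int> = coroutineScope {
    val channel = Channel<Int>()                  // unbuffered, like produce() by default
    val early = channel.trySend(1).isSuccess      // nobody is receiving yet, so 1 is dropped

    val receiver = async { channel.receive() }    // start a consumer...
    delay(100)                                    // ...and give it time to suspend in receive()
    val late = channel.trySend(2).isSuccess       // a receiver is waiting, so 2 is delivered

    Triple(early, late, receiver.await())
}

fun main() = runBlocking {
    println(trySendTwice())                       // expected: (false, true, 2)
}
```

The first item is lost for the same reason that the early offers are lost in the example above: at that moment nothing is receiving, and the call does not wait.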