Coroutines - makstron/info GitHub Wiki

Essentially, coroutines are light-weight threads.

Include in project

repositories {
    mavenCentral()
}
dependencies {
    ...
    implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.2'
    implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.4.2'
}

suspend функцию Job Scope

coroutineScope

Можно выделить два основных назначения coroutineScope:

  1. создает свой scope, что позволяет ограничить распространение ошибки
  2. позволяет выносить вызов корутин в отдельные (suspend) функции

supervisorScope

withContext

NonCancellable

Job, который всегда active. Он может быть использован, если необходимо, чтобы код в withContext доработал до конца не обращая внимания на отмену родительской корутины.

withContext(Dispatchers.IO + NonCancellable) {
      deleteTabImagePreview(tab)
      tabsDao.deleteTab(tab)
  }

runBlocking

Запускает корутину, которая блокирует текущий поток, пока не завершит свою работу (и пока не дождется завершения дочерних корутин).
В Android разработке такая корутина нужна для написания unit тестов, чтобы удержать поток, в котором выполняется тест. Иначе тест просто не дождется завершения выполнения вызываемых корутин и завершится. Подробнее об этом мы еще поговорим в уроке про тестирование.
Если в runBlocking произойдет ошибка, то все его дочерние корутины поотменяются, а сам он выбросит исключение.
Также он может вернуть результат своей работы.
runBlocking не предполагается к использованию в основном коде Android приложения.

Dispatcher

Default

Если корутина не находит в своем контексте диспетчер, то она использует диспетчер по умолчанию. Этот диспетчер представляет собой пул потоков. Количество потоков равно количеству ядер процессора.
Он не подходит для IO операций, но сгодится для интенсивных вычислений

IO

Использует тот же пул потоков, что и диспетчер по умолчанию. Но его лимит на потоки равен 64 (или числу ядер процессора, если их больше 64).
Этот диспетчер подходит для выполнения IO операций (запросы в сеть, чтение с диска и т.п.).

Main

Main диспетчер запустит корутину в основном потоке.

Executor

Executor с одним потоком

val scope = CoroutineScope(
   Executors.newSingleThreadExecutor().asCoroutineDispatcher()
)
 
repeat(6, {
   scope.launch {
       log("coroutine $it, start")
       TimeUnit.MILLISECONDS.sleep(100)
       log("coroutine $it, end")
   }
})

Unconfined

У диспетчера Unconfined метод isDispatchNeeded возвращает false. Это приводит к тому, что при старте и возобновлении выполнения кода Continuation не происходит смены потока.

Т.е. при старте корутина выполняется в том потоке, где был вызван билдер, который эту корутину создал и запустил. А при возобновлении выполнения из suspend функции, корутина выполняется в потоке, который использовался в suspend функции для выполнения фоновой работы. Ведь именно в этом потоке мы вызываем continuation.resume.

Channel

Канал используется, как средство передачи данных между корутинами. Т.е. он является ячейкой, куда одна корутина может поместить данные, а другая корутина - взять их оттуда.
Канал - потокобезопасен

val channel = Channel<Int>()

launch {
    // ...
    channel.send(5)
    // ...
}

launch {
    // ...
    val i = channel.receive()
    // ...
}

close

Метод close - это явный сигнал о том, что данные больше передаваться не будут. В этом случае в канал помещается специальное служебное значение Closed. Если получатель попробует получить данные после закрытия канала (или уже ждет данные в момент закрытия), то метод receive получит этот Closed и выбросит исключение ClosedReceiveChannelException

Форма получения данных (вместо метода receive):

launch {
   for (element in channel) {
       // ...
   }
}

cancel

Кроме закрытия канала, его можно отменить. Это то же самое, что и закрытие, только с очисткой буфера.

Еще одно отличие cancel от close в том, что при попытке вызвать receive для отмененного канала будет выброшено исключение CancellationException

И цикл for (element in channel) { также выбросит CancellationException
Оно не приведет к крэшу, а просто отменит корутину, в которой оно произошло.

capacity

Фиксированное значение

Указав число, мы получим канал с буфером указанного размера

val channel = Channel<Int>(2)

Conflated

В этом режиме буфера нет. Канал способен хранить в себе только одно значение. Но теперь каждый send не ждет, пока получатель заберет значение, а просто оставляет его в канале. При этом send своими данными перезаписывает данные предыдущего send.

val channel = Channel<Int>(Channel.Factory.CONFLATED)

Unlimited

Размер буфера в таком канале ограничен только количеством доступной памяти.

val channel = Channel<Int>(Channel.Factory.UNLIMITED)

Buffered

Канал с размером буфера = 64 по умолчанию. Это значение можно поменять в JVM.

val channel = Channel<Int>(Channel.Factory.BUFFERED)

produce

consume

BroadcastChannel

trySend

https://startandroid.ru/ru/courses/kotlin/29-course/kotlin/613-urok-18-korutiny-kanaly.html

Flow

Билдеры

flow

В билдер мы передаем блок кода, который будет запущен, когда получатель запустит этот Flow. Методом emit мы отправляем данные получателю.

val flow = flow {
   emit("a")
   emit("b")
   emit("c")
}

asFlow

val flow = listOf("a","b","c").asFlow()

flowOf

val flow = flowOf("a","b","c")

from suspend fun

Если есть suspend функция, которая ничего не принимает на вход и возвращает результат то можно использовать ссылку на нее, чтобы создать Flow

suspend fun getData(): Data
val flow = ::getData.asFlow()

Операторы

  • Intermediate - добавляют в Flow различные преобразования данных, но не запускают его (создаст обертки flow поверх flow)

map, filter, take, zip, combine, withIndex, scan, debounce, distinctUntilChanged, drop, sample, onEach, onStart, OnCompletion, onEmpty, transform, flowOn, buffer, produceIn, catch, retry, retryWhen

  • Terminal - запускают Flow и работают с результатом его работы, результатом их работы является не Flow, а данные, полученные из Flow и обработанные определенным образом.

collect, single, reduce, count, first, toList, toSet, fold

⚠️ **GitHub.com Fallback** ⚠️