Недавно я нашел очень хороший пример, который показывает, как работают Kotlin Coroutines. Он содержит минимум кода, но при этом показывает все строительные блоки, необходимые для понимания основ. На работе я воспользовался возможностью, чтобы обсудить это со своей командой, и ответ был ошеломляющим даже для меня. Это было приятное путешествие, потому что я также смог укрепить свои знания о Kotlin Coroutines.

В этом блоге я хочу поделиться с вами тем, что я сделал со своими коллегами. Давайте углубимся, показав вам «почти идентичный оригинальному» код:

class UpdateDiskAfterSomeDependencyCollectUpdater(
    private val someDependency: SomeDependency,
    private val diskUpdater: DiskUpdater
) {
    suspend fun update() {
        someDependency.aflow
            .onEach { diskUpdater.writeToDisk(it) }
            .collect()
    }
}

interface SomeDependency {
    val aflow: Flow<String>
}

interface DiskUpdater {
    fun writeToDisk(textToWrite: String)
}

Нас интересует только один класс, и функция update() также является единственной функцией, которая существует в этом классе. Позвольте мне быстро объяснить, что он делает:

  • Он берет Flow от SomeDependency
  • Как только это Flow emits, мы вызываем DiskUpdater.writeToDisk со строкой, предоставленной Flow
  • Начинаем сбор на Flow, позвонив на него collect()

Чтобы протестировать этот класс, мы написали тест следующим образом:

var someDependencyUpdate = MutableSharedFlow<String>()

val fakeSomeDependency = object : SomeDependency {
    override val aflow: Flow<String> = someDependencyUpdate
}

val fakeDiskUpdater = object : DiskUpdater {
    var writeToDiskCalled = false
    override fun writeToDisk(textToWrite: String) {
        writeToDiskCalled = true
    }
}

@Test
fun `after emit of some dependency diskUpdater should write to disk`() = runTest {
    val updater = UpdateDiskAfterSomeDependencyCollectUpdater(
        someDependency = fakeSomeDependency,
        diskUpdater = fakeDiskUpdater
    )

    updater.update()

    someDependencyUpdate.emit("Saves this please")

    assert(fakeDiskUpdater.writeToDiskCalled)
}

Мы создаем MutableSharedFlow как подделку для SomeDependency.aFlow, подделывая обе зависимости, создаем экземпляр класса Updater, вызываем update() в этом классе, отправляем значение в MutableSharedFlow и, наконец, assert(), что была вызвана функция DiskUpter.writeToDisk.

Если вы уже разобрались, в чем проблема с этим тестом, смело уходите. Если нет, или вам просто интересно, читайте дальше.

Этот тест застрял навсегда

Почему это? Давайте посмотрим, что мы делаем в нашем тесте. Помимо создания всего этого фальшивого шума, мы вызываем runTest{} (это рекомендуемый способ запуска теста с сопрограммами), а затем вызываем функцию suspend update().

Это уже проблема? Да, это!

Что на самом деле означает suspend?
Это означает, что функцию можно приостановить и возобновить позже.

Хорошо, красивая фраза. Но что именно это значит?
Для меня (и в «нормальных словах») это означает следующее:

Я не буду выполнять следующий код, пока не закончу свою работу. Пока я делаю это (свою работу), я не буду блокировать поток, чтобы он тем временем мог заняться другими делами. Как только я закончу свою работу, я снова сообщу об этом потоку, чтобы он мог возобновиться с того места, где меня вызвали для продолжения выполнения следующего кода.

Имея это в виду, мы можем снова исследовать наш (зависший) тест.
Мы называем функцию suspend update(). Эта функция вызывает другую функцию suspendcollect(). Теперь collect() говорит: Я не буду выполнять следующий код, пока не закончу свою работу. Что означает с моей работой выполнено в этом контексте? Это означает, что Flow либо готово (потому что больше не производит значений), либо сопрограмма — canceled, либо scopecanceled.

Посмотрите на код выше, ни один из этих вариантов не подходит. Flow (на самом деле это MutableSharedFlow) не готово, сопрограмма не canceled, а scope (то есть TestScope, предоставляемая функцией runTest) не canceled. Вот почему наш тест застрял.

Исправление на словах

Прежде чем мы обсудим возможные решения, давайте подумаем, как устранить проблему. Что нам нужно сделать, чтобы он больше не застревал?

Как упоминалось выше, у нас есть три варианта:

  • Cancel “a” scope
  • Cancel «а» сопрограмма
  • «Завершение» Flow

Отмена scope бессмысленна, потому что это наш TestScope, который нам нужен для проведения настоящего теста. Поскольку Flow в фактической реализации является бесконечной Flow, я предлагаю сосредоточиться на canceling сопрограмме. Однако в конце блога я предоставлю решение по «доработке» Flow.

Для cancel сопрограммы мы должны вызвать либо job.cancel, либо scope.cancel. Последний также cancel всех своих дочерних сопрограмм. Однако оба решения в нашем случае гоняются за собственным хвостом. Если мы cancel выполним один из них — TestScope, предоставленный функцией runTest, или задание, являющееся частью TestScope, — мы cancel проверим весь наш тест и, следовательно, больше ничего не выполним, верно?

Что, если мы… вместо этого создадим новую сопрограмму?

Создание новой сопрограммы и suspend будет ли это работать вместо сопрограммы, предоставляемой TestScope? Давай выясним…

Первая попытка

Чтобы создать новую сопрограмму, мы можем использовать один из существующих конструкторов сопрограмм, например launch. Следующий код не сработал, но показывает направление, в котором мы движемся:

val updater = UpdateDiskAfterSomeDependencyCollectUpdater(
    someDependency = fakeSomeDependency,
    diskUpdater = fakeDiskUpdater
)

launch {
    updater.update()
}

someDependencyUpdate.emit("Saves this please")

assert(fakeDiskUpdater.writeToDiskCalled)

Почему это не не работает? Потому что новая сопрограмма (код внутри launch) выполняется слишком поздно. На самом деле, он никогда не будет выполнен, потому что ранее тест завершился неудачно с ошибкой «Assertion failed».

Помните, что я писал выше о suspending функциях.

Пока я делаю это (свою работу), я не буду блокировать поток, чтобы он тем временем мог заняться другими делами.

Здесь код внутри блока runTest{} также является функцией suspend, но поток (который представляет собой отдельный поток) никогда не входит в так называемую точку приостановки. Следовательно, этот поток не может «заниматься другими делами в это время» (и, таким образом, выполнять функцию updater.update()). Это будет canceled (из-за ошибки Assertion failed), прежде чем он успеет выполнить другие сопрограммы.

Слава богу, у runTest{} есть хорошая документация, и она дает нам полезную информацию об этом случае. Предлагается использовать либо job.join(), либо от testScope.advanceUntilIdle() до suspend, чтобы можно было выполнить новую сопрограмму.

С другой стороны, первое предложение в kdoc о job.join() гласило:

Приостанавливает сопрограмму до завершения этой работы.

Полный? Недавно созданная сопрограмма является бесконечной. Так что, он снова застрянет? Да, это будет. И так будет testScope.advanceUntilIdle().

Поэтому нам нужно другое решение.
Нам нужно дождаться выполнения сопрограммы, но не до ее завершения.

Как насчет добавления еще одной suspend функции, которая suspends тестовая сопрограмма, чтобы (один) поток мог получить время и выполнить другие (запланированные) сопрограммы, а затем «прыгнуть» назад для выполнения следующего кода?

Вторая (неудачная) попытка

Я слышал, что delay — это фантастическая функция suspending, которая ждет определенное время, а затем решает выполнить следующий код:

val updater = UpdateDiskAfterSomeDependencyCollectUpdater(
    someDependency = fakeSomeDependency,
    diskUpdater = fakeDiskUpdater
)

launch {
    updater.update()
}

delay(1)
someDependencyUpdate.emit("Saves this please")

assert(fakeDiskUpdater.writeToDiskCalled)

Работает 🎉… ну, по крайней мере, сопрограмма внутри launch выполняется.
Но тест снова завис. Почему так?

Прежде чем мы продолжим, давайте быстро рассмотрим, что здесь происходит. Как нить «прыгает» и почему снова застряла.

По умолчанию код выполняется сверху вниз. Это происходит и здесь. Но когда он достигает построителя сопрограммы launch, поток добавит код внутри себя в свой список задач. Вы можете думать об этом так. В теме есть список задач. Как только одна задача выполнена, она выполняет следующую и так далее. Выполнение продолжается, и мы достигаем delay — функции suspend и, следовательно, точки приостановки. Как я писал выше о функциях suspend, поток теперь может делать другие вещи и будет информирован, как только функция suspend подаст сигнал. Вместо того, чтобы простаивать, поток посмотрит в свой список задач и найдет задачу — задачу вызов функции update.update(). Итак, он делает это и… находит другую функцию suspend (саму функцию update()) и, таким образом, еще одну точку приостановки. Теперь у потока есть время для других действий снова. delay сообщает потоку, что это сделано с задержкой (suspending). Таким образом, поток возвращается к delay и продолжает выполнение кода (вызывая someDependencyUpdate.emit и assert).

Итак, почему мы застряли? Потому что вновь созданная сопрограмма является дочерней по отношению к TestScope, которая запускает тестовую сопрограмму. И эта область действия и эта тестовая сопрограмма будут отменены (или завершены) только тогда, когда все их дочерние сопрограммы также будут отменены или завершены. Но помните, Flow никогда не заканчиваются.

Чего ждать? Я только что сказал, что все дочерние сопрограммы должны быть canceled? 💡

Окончательное исправление

Решение состоит в том, чтобы cancel использовать бесконечную suspending сопрограмму, как только она нам больше не понадобится для целей тестирования:

val updater = UpdateDiskAfterSomeDependencyCollectUpdater(
    someDependency = fakeSomeDependency,
    diskUpdater = fakeDiskUpdater
)

val updateJob = launch {
    updater.update()
}

delay(1)
someDependencyUpdate.emit("Saves this please")
updateJob.cancel()

assert(fakeDiskUpdater.writeToDiskCalled)

Поскольку мы явно являемся canceling сопрограммой, TestScope и ее тестовая сопрограмма могут быть завершены, как и ожидалось. Тест больше не зависает.

Почему я выбрал это как хороший пример для демонстрации сопрограмм?

Потому что он показывает, что такое suspend функции. На самом деле они только suspend или приостанавливают текущий код и не блокируют текущий поток. Поток может тем временем заниматься другими делами. Например, выполнение других сопрограмм, которые также могут вызывать другие функции suspend.

В этом примере хорошо видно, как поток «прыгает» с одной точки кода на другую, а затем через какое-то время возобновляется («прыгает обратно») к начальной точке.

Вот почему я думаю, что это хороший пример, и написал этот блог.

Еще три решения — почему бы и нет?

Ниже я покажу вам еще три решения, которые не зависают при выполнении теста. Когда вы читаете, имейте в виду, что я не рекомендую одно другому. Все они имеют свои преимущества и недостатки. Одно может быть «лучше» или приятнее для чтения, чем другое. Но, в конце концов, вы должны решить, что, по вашему мнению, лучше подходит для вашего случая.

val updater = UpdateDiskAfterSomeDependencyCollectUpdater(
    someDependency = fakeSomeDependency,
    diskUpdater = fakeDiskUpdater
)

val job = launch(Dispatchers.Unconfined) {
    updater.update()
}

someDependencyUpdate.emit("Saves this please")
job.cancel()

assert(fakeDiskUpdater.writeToDiskCalled)

Здесь нам не нужно вызывать другую функцию suspend (например, delay, которую мы использовали выше). Это связано с тем, что сопрограмма использует Unconfined Диспетчер. Это означает, что код не добавляется в список задач потоков (и выполняется позже), а выполняется немедленно.

val updater = UpdateDiskAfterSomeDependencyCollectUpdater(
    someDependency = fakeSomeDependency,
    diskUpdater = fakeDiskUpdater
)

val updateJob = launch {
    updater.update()
}

val emitJob = launch {
    someDependencyUpdate.emit("Saves this please")
    updateJob.cancel()
}

emitJob.join()

assert(fakeDiskUpdater.writeToDiskCalled)

Хотите активно использовать сопрограммы? Используйте этот код 🙂. emitJob.join будет suspend потоком, позволяя ему работать со своим списком задач (который в настоящее время содержит две задачи). Сначала он выполнит сопрограмму updater.update(). Поскольку это тоже функция suspending, поток выполнит свою вторую задачу, вызвав someDependencyUpdate.emit и cancel первую сопрограмму. Поскольку эта (вторая) задача была завершена (а первая была canceled), она вернется к функции emitJob.join и возобновит работу оттуда.

val updater = UpdateDiskAfterSomeDependencyCollectUpdater(
    someDependency = fakeSomeDependency,
    diskUpdater = fakeDiskUpdater
)

val updateJob = launch {
    updater.update()
}

suspendCoroutine<Unit> {
    thread {
        Thread.sleep(1)
        it.resume(Unit)
    }
}
someDependencyUpdate.emit("Saves this please")
updateJob.cancel()

assert(fakeDiskUpdater.writeToDiskCalled)

Это решение в основном такое же, как вызов delay, но с использованием самой фундаментальной функции suspendCoroutine. Он ведет себя точно так же, как решение delay. Код внутри suspendCoroutine выполняется немедленно и создает новый поток. Вновь созданный поток будет спать в течение миллисекунды, а затем возобновится. Поскольку работа перемещается в другой поток, функция возвращается и, таким образом, suspend здесь. Это позволяет потоку делать другие вещи (и выполнять другую сопрограмму).

Решение «завершить поток»

Как и было обещано, ниже показано решение для завершения Flow и отказа от каких-либо дочерних сопрограмм.

val someDependencyUpdate = MutableSharedFlow<String>(replay = 1)
val fakeSomeDependency = object : SomeDependency {
    override val aflow: Flow<String> = someDependencyUpdate.take(1)
}
val updater = UpdateDiskAfterSomeDependencyCollectUpdater(
    someDependency = fakeSomeDependency,
    diskUpdater = fakeDiskUpdater
)
someDependencyUpdate.emit("Saves this please")

updater.update()

assert(fakeDiskUpdater.writeToDiskCalled)

Сначала мы добавляем replay = 1 к MutableSharedFlow. Это означает, что любой новый сборщик также получит последнее значение этого SharedFlow. В качестве альтернативы можно было бы использовать MutableStateFlow, который ведет себя в этом случае так же.

Во-вторых, мы сообщаем Flow внутри SomeDependecy, что он должен закончить (на самом деле, cancel — правильная формулировка для него), как только он получит свой первый элемент.

С этими изменениями функция update по-прежнему будет suspended, пока не будет выполнена исходная Flow. Но так как он заканчивается немедленно, он возобновится вскоре после этого.

Я также опубликовал весь код, который я разместил здесь, в моей учетной записи GitHub в этом репозитории. Не стесняйтесь проверить это:



Я надеюсь, что вы немного узнали или, по крайней мере, укрепили свои знания о сопрограммах. Лично я не мог не написать этот блог и не поделиться им со всем миром. Как упоминалось выше, я думаю, что он показывает все, что вам нужно знать об основах Kotlin Coroutines, при этом имея лишь минимальный необходимый код.