What do you think, does the test below pass or fail?
@Test
fun `combine passes or fails`() = runBlockingTest {
    val letterFlow = MutableSharedFlow<String>()
    val digitFlow = MutableSharedFlow<Int>()
    var actual = ""
    val job = launch {
        combine(letterFlow, digitFlow) { letter, digit -> letter + digit }
            .collect { actual += "$it " }
    }
    letterFlow.emit("A")
    digitFlow.emit(1)
    digitFlow.emit(2)
    letterFlow.emit("B")
    advanceUntilIdle()
    assertEquals("A1 A2 B2 ", actual)
    job.cancelAndJoin()
}
Recently I stumbled upon this interesting issue. I was sure that the test must pass, but it failed with the actual execution result A1 B1. Whaaat?! 😮
Subsequent experiments showed that the same issue was reproducible with a MutableStateFlow, and that in some cases it could be solved by applying the buffer() operator to the letter and digit flows before combining them.
In contrast, when I used cold flows instead of the hot flows, the test always passed just fine.
Another interesting observation was that the code in the test did work well in a real app. It only failed in tests. That puzzled me more and more.
The suspect under investigation was the combine operator. A couple of hours and cups of tea later, the combine operator was nominated to be the fairest coroutine operator of them all. The reason for that was the yield() call inside the FlowCollector<R>.combineInternal() function of the coroutines implementation. The comment next to it says that combine yields here to emulate fairness.
for (i in 0 until size) {
    launch {
        flows[i].collect { value ->
            resultChannel.send(Update(i, value))
            yield() // Emulate fairness, giving each flow chance to emit
        }
    }
}
It was exactly this fairness of the combine operator that caused its weird behavior in the test, and here is why.
The test runs in a single-threaded environment. Kotlin coroutines support concurrency even when running on a single thread. To make that work, the combine operator calls yield() inside its collecting for-loop to give other suspended coroutines a chance to resume and do their job. Emitting multiple values into the hot shared flows one after another seemed to abuse that fairness and block the internal event loop, preventing combine from running on each value emission. This caused the combine operator to skip values and misbehave.
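To make the role of yield() on a single thread more tangible, here is a minimal sketch that is separate from the test above. It assumes plain runBlocking rather than runBlockingTest, and the names interleave and log are made up for illustration. Two coroutines share the one thread of runBlocking and take turns only because each of them yields after every step.

```kotlin
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical helper: records the order in which two coroutines
// run when they share the single thread of runBlocking.
fun interleave(): List<String> {
    val log = mutableListOf<String>()
    runBlocking {
        launch {
            repeat(2) { i ->
                log += "first $i"
                yield() // hand the thread over to the other coroutine
            }
        }
        launch {
            repeat(2) { i ->
                log += "second $i"
                yield() // hand the thread back
            }
        }
    }
    return log
}

fun main() {
    println(interleave())
}
```

Removing both yield() calls should let the first coroutine finish all its steps before the second one even starts, because nothing else gives up the thread in between. That is the kind of cooperation combine relies on.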
After it became clear that the excessive fairness of the combine operator was causing the issue, the solution was fairly easy to find. I just had to force the emit operator to emulate fairness in return by yielding after emitting a value, similar to how the combine operator does it. This is how the fairEmit operator was born.
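import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.yield

// Emits the value, then yields so that collectors (such as combine) get a chance to run.
suspend fun <T> MutableSharedFlow<T>.fairEmit(value: T) {
    emit(value)
    yield()
}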
Finally, replacing the emit operator with the new fairEmit one made the test pass just fine.
Despite solving the issue, I wouldn’t be that confident in running coroutines code in a single-threaded environment without prior testing. Who knows what other surprises Kotlin coroutines yield.
I hope that the simple trick described above saved you some time and energy, and that you enjoyed it. Have fun and happy coding! ✌️