Open
Description
This test due to the flaky code:
@Test
fun `CoroutineStart_UNDISPATCHED is flaky when using collectLatest`() = repeat(10) {
runBlocking(Dispatchers.IO) {
println("Iteration $it")
val flow = MutableSharedFlow<Int>(extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
var valueReceived = false
val job = launch(start = CoroutineStart.UNDISPATCHED) {
flow.collectLatest {
valueReceived = true
}
}
check(flow.tryEmit(1))
withTimeout(2_000L) {
while (!valueReceived) {
delay(20L)
}
}
job.cancel()
}
}
The goal is to launch a job and start collecting a flow before the next instructions happen. In the sample it so happens when tryEmit
is called that there sometimes is a subscriber and sometimes there isn't. This behavior is surprising and unintuitive.
I found multiple workarounds to fix that, but ideally the sample would work fine:
- Use
collect
instead ofcollectLatest
. With that the test is green, but in my production code I need the behavior ofcollectLatest
. - Use
replay = 1
for the shared flow. Then the missed value on the flow would be replayed. But this isn't ideal for more values or if there other subscribers that don't want the replay behavior. - Use
Dispatchers.Unconfined
when launching the job. This is a significant change with other side effects. - Add a delay before emitting the value.
Is this behavior expected? What is the recommendation for this use case?