Skip to content

CoroutineStart.UNDISPATCHED flaky with collectLatest #4383

Open
@vRallev

Description

@vRallev

This test due to the flaky code:

@Test
fun `CoroutineStart_UNDISPATCHED is flaky when using collectLatest`() = repeat(10) {
    runBlocking(Dispatchers.IO) {
        println("Iteration $it")

        val flow = MutableSharedFlow<Int>(extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
        var valueReceived = false

        val job = launch(start = CoroutineStart.UNDISPATCHED) {
            flow.collectLatest {
                valueReceived = true
            }
        }

        check(flow.tryEmit(1))

        withTimeout(2_000L) {
            while (!valueReceived) {
                delay(20L)
            }
        }

        job.cancel()
    }
}

The goal is to launch a job and start collecting a flow before the next instructions happen. In the sample it so happens when tryEmit is called that there sometimes is a subscriber and sometimes there isn't. This behavior is surprising and unintuitive.

I found multiple workarounds to fix that, but ideally the sample would work fine:

  1. Use collect instead of collectLatest. With that the test is green, but in my production code I need the behavior of collectLatest.
  2. Use replay = 1 for the shared flow. Then the missed value on the flow would be replayed. But this isn't ideal for more values or if there other subscribers that don't want the replay behavior.
  3. Use Dispatchers.Unconfined when launching the job. This is a significant change with other side effects.
  4. Add a delay before emitting the value.

Is this behavior expected? What is the recommendation for this use case?

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions