Multi-consumption of an AsyncSequence #110

Open
@ABridoux

Hi, I have been reading the documentation for this repository, and it left me wondering: is there a plan to introduce a feature that allows several tasks to consume the same AsyncSequence?

If this is not the right place to offer ideas, please let me know. I don't think this would fit in a proposal.

Context

I recently had to stream a file using URLSession, and two pieces of code were interested in the streamed values. Rather than starting the stream twice, which would be inefficient, I wanted several tasks to be able to consume the same stream. Values that had already been streamed would be sent to each new consumer when it starts iterating.

Basic implementation

I tried to implement such a solution (in a gist). It uses a reduce function to combine the values emitted so far and sends that result when a new task starts consuming the sequence. It's far from perfect, but I think it helps illustrate the idea.
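To make the reduce-and-replay idea concrete without any concurrency, here is a minimal synchronous sketch. `ReduceReplayBroadcaster`, `subscribe(_:)` and `send(_:)` are hypothetical names chosen for illustration; they are not taken from this repository or from the gist. One assumption differs from the implementation below: this sketch replays the reduced value to any subscriber that joins after at least one value was sent, rather than to every consumer except the first.

```swift
// Hypothetical, single-threaded sketch of "reduce and replay" broadcasting.
// Every sent value is folded into `reduced`; a subscriber that joins after
// values were sent first receives that reduced value, then each new value.
final class ReduceReplayBroadcaster<Element> {
    private var reduced: Element
    private var hasSentValues = false
    private let reduce: (inout Element, Element) -> Void
    private var subscribers: [(Element) -> Void] = []

    init(initialResult: Element, reduce: @escaping (inout Element, Element) -> Void) {
        self.reduced = initialResult
        self.reduce = reduce
    }

    func subscribe(_ onValue: @escaping (Element) -> Void) {
        // Late subscribers catch up with a single reduced value.
        if hasSentValues {
            onValue(reduced)
        }
        subscribers.append(onValue)
    }

    func send(_ value: Element) {
        hasSentValues = true
        reduce(&reduced, value)
        subscribers.forEach { $0(value) }
    }
}

// Usage: summing Ints, as in the usage example of this issue.
let broadcaster = ReduceReplayBroadcaster<Int>(
    initialResult: 0,
    reduce: { partialResult, nextResult in partialResult += nextResult }
)
var receivedA: [Int] = []
var receivedB: [Int] = []

broadcaster.subscribe { receivedA.append($0) }
for value in 0..<7 { broadcaster.send(value) }  // 0 + 1 + ... + 6 = 21
broadcaster.subscribe { receivedB.append($0) }  // B immediately receives 21
for value in 7..<10 { broadcaster.send(value) }

print(receivedA)  // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
print(receivedB)  // [21, 7, 8, 9]
```

The late subscriber gets one combined value instead of the full history, so the memory cost stays constant regardless of how many values were emitted before it joined.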

Implementation

struct ReducedReplayAsyncStream<Element> {

    typealias Reduce = (_ partialResult: inout Element, _ nextResult: Element) -> Void

    private let storage: _Storage
    private let originalStream: AsyncStream<Element>

    init(
        bufferingPolicy limit: AsyncStream<Element>.Continuation.BufferingPolicy = .unbounded,
        initialResult: Element,
        reduce: @escaping Reduce,
        build: (AsyncStream<Element>.Continuation) -> Void
    ) {
        originalStream = AsyncStream(Element.self, bufferingPolicy: limit, build)
        storage = _Storage(stored: initialResult, reduce: reduce)
    }

    private func makeStream() -> AsyncStream<Element> {
        AsyncStream<Element> { continuation in
            Task {
                // Checking whether consumption started, replaying the reduced value
                // and registering the continuation happen in a single actor call,
                // so no value can slip through between those steps.
                let isFirst = await storage.register(continuation)
                if isFirst {
                    // The first consumer triggers the single pass over the original stream.
                    startConsumingOriginalStream()
                }
            }
        }
    }

    private func startConsumingOriginalStream() {
        Task {
            for await value in originalStream {
                await storage.updateWith(value: value)
            }
            // Finish current consumers and any consumer that subscribes afterwards.
            await storage.finish()
        }
    }
}

extension ReducedReplayAsyncStream {

    private actor _Storage {
        private let reduce: ReducedReplayAsyncStream.Reduce

        private var didStart = false
        private var isFinished = false
        private var stored: Element
        private var continuations: [AsyncStream<Element>.Continuation] = []

        init(stored: Element, reduce: @escaping Reduce) {
            self.stored = stored
            self.reduce = reduce
        }

        /// Registers a consumer and returns `true` if it is the first one.
        /// Later consumers first receive the reduced value of everything emitted so far.
        func register(_ continuation: AsyncStream<Element>.Continuation) -> Bool {
            let isFirst = !didStart
            didStart = true
            if !isFirst {
                continuation.yield(stored)
            }
            if isFinished {
                continuation.finish()
            } else {
                continuations.append(continuation)
            }
            return isFirst
        }

        func updateWith(value: Element) {
            reduce(&stored, value)
            continuations.forEach { $0.yield(value) }
        }

        func finish() {
            isFinished = true
            continuations.forEach { $0.finish() }
            continuations.removeAll()
        }
    }
}

extension ReducedReplayAsyncStream: AsyncSequence {
    typealias AsyncIterator = AsyncStream<Element>.AsyncIterator

    func makeAsyncIterator() -> AsyncIterator {
        let stream = makeStream()
        return stream.makeAsyncIterator()
    }
}

Usage

import Combine
import Foundation

var subscriptions: Set<AnyCancellable> = []
var continuation: AsyncStream<Int>.Continuation!

let replayStream = ReducedReplayAsyncStream<Int>(
    initialResult: 0,
    reduce: { partialResult, nextResult in partialResult += nextResult },
    build: { continuation = $0 }
)

var counter = 0
Timer.publish(every: 0.4, on: .main, in: .default)
    .autoconnect()
    .sink { _ in
        guard counter < 10 else {
            // Emit 0...9, then finish the stream and stop the timer.
            continuation.finish()
            subscriptions.removeAll()
            return
        }
        continuation.yield(counter)
        counter += 1
    }
    .store(in: &subscriptions)

Task {
    for await value in replayStream {
        print("[A]", value)
    }
}

Task {
    // Start a second consumer 3 seconds later: it first receives the sum of the values emitted so far.
    try await Task.sleep(nanoseconds: 3_000_000_000)
    for await value in replayStream {
        print("[B]", value)
    }
}

// Outside a playground, keep the process alive for the timer, e.g. with RunLoop.main.run().

Some considerations about efficiency can be found in the gist.

Metadata

Assignees: No one assigned
Labels: Future (Unscheduled work for future releases)
Type: No type
Projects: No projects
Milestone: No milestone
Relationships: None yet
Development: No branches or pull requests
