Description
Hi, I have been reading the documentation for this repository, and it left me wondering: is there a plan to introduce a feature that allows several tasks to consume the same AsyncSequence?
If this is not the right place to offer ideas, please let me know; I don't think this would fit a formal proposal yet.
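To make the ask concrete, here is a minimal sketch (the stream and values are hypothetical) of the behaviour I would like to avoid: two tasks iterating the same AsyncStream do not both receive every element, because the stream is neither multicast nor replayed.

// A minimal sketch (hypothetical values) of the limitation this issue is about:
// AsyncStream is not multicast, so when two tasks iterate the same instance,
// each element is delivered to at most one consumer and nothing is replayed.
let stream = AsyncStream<Int> { continuation in
    for value in 0..<5 {
        continuation.yield(value)
    }
    continuation.finish()
}

Task {
    for await value in stream {
        print("[A]", value)
    }
}

Task {
    // Not guaranteed to see any of the values consumed by the first task.
    for await value in stream {
        print("[B]", value)
    }
}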
Context
I recently had to stream a file using URLSession, and two pieces of code were interested in the streamed values. Rather than starting the stream twice, which would not be efficient, I wanted to allow the same stream to be consumed by several tasks. The data that had already been streamed would be delivered when a new consumer is set up.
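As a rough sketch of that situation (the URL and the consuming function are hypothetical), the byte sequence returned by URLSession can only be iterated by a single task, even though two pieces of code are interested in it:

import Foundation

// Hypothetical sketch of the scenario: one download, two interested consumers.
func streamFile(from url: URL) async throws {
    // URLSession.bytes(from:) returns an AsyncSequence of the response bytes,
    // but that sequence can only be consumed by a single task.
    let (bytes, _) = try await URLSession.shared.bytes(from: url)

    // One consumer might persist the chunks, another might report progress;
    // both would want to iterate `bytes`, which is what this issue is about.
    for try await _ in bytes {
        // ...
    }
}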
Basic implementation
I tried to implement such a solution (in a gist): it uses a reduce function on the already-emitted values and emits the reduced result when a new task starts consuming the sequence. It's far from perfect, but I think it helps illustrate the idea.
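For the file-streaming case from the context above, the reduce step could simply concatenate the chunks, so a consumer that joins late first receives everything streamed so far as a single element (a sketch, assuming the elements are Data chunks):

import Foundation

// Sketch of a possible Reduce closure when the elements are Data chunks:
// appending each new chunk means a late consumer is first handed the
// accumulated data, then the live values.
let accumulateChunks: (_ partialResult: inout Data, _ nextResult: Data) -> Void = { partial, next in
    partial.append(next)
}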
Implementation
struct ReducedReplayAsyncStream<Element> {
    typealias Reduce = (_ partialResult: inout Element, _ nextResult: Element) -> Void

    private let storage: _Storage
    private var originalStream: AsyncStream<Element>

    init(
        bufferingPolicy limit: AsyncStream<Element>.Continuation.BufferingPolicy = .unbounded,
        initialResult: Element,
        reduce: @escaping Reduce,
        build: (AsyncStream<Element>.Continuation) -> Void
    ) {
        originalStream = AsyncStream(Element.self, bufferingPolicy: limit, build)
        storage = _Storage(stored: initialResult, reduce: reduce)
    }

    private func makeStream() -> AsyncStream<Element> {
        AsyncStream<Element> { continuation in
            Task {
                var isFirst = false
                if await !storage.didStart {
                    // The first consumer kicks off consumption of the original stream.
                    await storage.setDidStart(true)
                    isFirst = true
                    startConsumingOriginalStream()
                }
                if !isFirst {
                    // A late consumer first receives the reduced value of
                    // everything emitted so far.
                    await continuation.yield(storage.stored)
                }
                await storage.appendContinuation(continuation)
            }
        }
    }

    private func startConsumingOriginalStream() {
        Task {
            for await value in originalStream {
                await storage.updateWith(value: value)
            }
            await storage.continuations.forEach { $0.finish() }
        }
    }
}
extension ReducedReplayAsyncStream {
    private actor _Storage {
        private let reduce: ReducedReplayAsyncStream.Reduce

        var didStart = false
        var stored: Element
        var continuations: [AsyncStream<Element>.Continuation] = []

        init(stored: Element, reduce: @escaping Reduce) {
            self.stored = stored
            self.reduce = reduce
        }

        func updateWith(value: Element) {
            reduce(&stored, value)
            continuations.forEach { $0.yield(value) }
        }

        func setDidStart(_ value: Bool) {
            didStart = value
        }

        func appendContinuation(_ continuation: AsyncStream<Element>.Continuation) {
            continuations.append(continuation)
        }
    }
}
extension ReducedReplayAsyncStream: AsyncSequence {
    typealias AsyncIterator = AsyncStream<Element>.AsyncIterator

    func makeAsyncIterator() -> AsyncIterator {
        let stream = makeStream()
        return stream.makeAsyncIterator()
    }
}
Usage
import Combine
import Foundation

var subscriptions: Set<AnyCancellable> = []
var continuation: AsyncStream<Int>.Continuation!

let replayStream = ReducedReplayAsyncStream<Int>(
    initialResult: 0,
    reduce: { partialResult, nextResult in partialResult = partialResult + nextResult },
    build: { continuation = $0 }
)

var counter = 0
Timer.publish(every: 0.4, on: .main, in: .default)
    .autoconnect()
    .sink { _ in
        if counter == 10 {
            continuation.finish()
        }
        continuation.yield(counter)
        counter += 1
    }
    .store(in: &subscriptions)

Task {
    for await value in replayStream {
        print("[A]", value)
    }
}

Task {
    // Starts consuming 3 seconds later: it first receives the reduced sum of
    // the values emitted so far, then the live values.
    try await Task.sleep(nanoseconds: 3_000_000_000)
    for await value in replayStream {
        print("[B]", value)
    }
}
Some considerations about efficiency can be found in the gist.