Skip to content

Resolve ambiguity issue of stream #748

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 42 additions & 18 deletions Sources/AsyncHTTPClient/HTTPHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -68,30 +68,52 @@ extension HTTPClient {
}
}

public struct Length: Hashable, Sendable {
@usableFromInline
let length: RequestBodyLength

private init(_ length: RequestBodyLength) {
self.length = length
}

public static let unknown = Length(.unknown)

public static func known(_ length: Int64) -> Length {
return Length(.known(length))
}
}

/// Body size. If nil,`Transfer-Encoding` will automatically be set to `chunked`. Otherwise a `Content-Length`
/// header is set with the given `length`.
@available(*, deprecated, renamed: "contentLength")
public var length: Int? {
get {
self.contentLength.flatMap { Int($0) }
if case .known(let count) = self.contentLength.length {
return Int(count)
}
return nil
Comment on lines +91 to +94
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: prefer switching exhaustively over enums

}
set {
self.contentLength = newValue.flatMap { Int64($0) }
if let newValue = newValue {
self.contentLength = Length.known(Int64(newValue))
} else {
self.contentLength = Length.unknown
}
}
}

/// Body size. If nil,`Transfer-Encoding` will automatically be set to `chunked`. Otherwise a `Content-Length`
/// header is set with the given `contentLength`.
public var contentLength: Int64?
public var contentLength: Length
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't make this change, it's an API break. We could work around it by adding a new length property using the new type and have computed getters/setters on contentLength.

However I don't think we should make this change: Length doesn't allow the user to recover the (known) length value. I think if we want to use this type it should only be at the surface level in the stream func. It also makes me wonder whether we should do this at all and just use @_disfavoredOverload. WDYT @FranzBusch?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a new not an API break because we haven't released the API yet afaik but I agree exposing this var is weird. @fabianfett what do you think?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We haven't released this API yet. Therefore this will not be a breaking change. Generally I'm in favor of using @_disfavoredOverload on the Int version of this API as well as the initializers that use trailing closure syntax.


/// Body chunk provider.
public var stream: @Sendable (StreamWriter) -> EventLoopFuture<Void>

@usableFromInline typealias StreamCallback = @Sendable (StreamWriter) -> EventLoopFuture<Void>

@inlinable
init(contentLength: Int64?, stream: @escaping StreamCallback) {
self.contentLength = contentLength.flatMap { $0 }
init(contentLength: Length, stream: @escaping StreamCallback) {
self.contentLength = contentLength
self.stream = stream
}

Expand All @@ -100,7 +122,7 @@ extension HTTPClient {
/// - parameters:
/// - buffer: Body `ByteBuffer` representation.
public static func byteBuffer(_ buffer: ByteBuffer) -> Body {
return Body(contentLength: Int64(buffer.readableBytes)) { writer in
return Body(contentLength: .known(Int64(buffer.readableBytes))) { writer in
writer.write(.byteBuffer(buffer))
}
}
Expand All @@ -114,17 +136,23 @@ extension HTTPClient {
@preconcurrency
@available(*, deprecated, renamed: "stream(contentLength:bodyStream:)")
public static func stream(length: Int? = nil, _ stream: @Sendable @escaping (StreamWriter) -> EventLoopFuture<Void>) -> Body {
return Body(contentLength: length.flatMap { Int64($0) }, stream: stream)
let contentLength: Length
if let length = length {
contentLength = .known(Int64(length))
} else {
contentLength = .unknown
}
return Body(contentLength: contentLength, stream: stream)
}

/// Create and stream body using ``StreamWriter``.
///
/// - parameters:
/// - contentLength: Body size. If nil, `Transfer-Encoding` will automatically be set to `chunked`. Otherwise a `Content-Length`
/// - length: Body size. If nil, `Transfer-Encoding` will automatically be set to `chunked`. Otherwise a `Content-Length`
/// header is set with the given `contentLength`.
/// - bodyStream: Body chunk provider.
public static func stream(contentLength: Int64? = nil, bodyStream: @Sendable @escaping (StreamWriter) -> EventLoopFuture<Void>) -> Body {
return Body(contentLength: contentLength, stream: bodyStream)
/// - stream: Body chunk provider.
public static func stream(length: Length, _ stream: @Sendable @escaping (StreamWriter) -> EventLoopFuture<Void>) -> Body {
return Body(contentLength: length, stream: stream)
}

/// Create and stream body using a collection of bytes.
Expand All @@ -134,7 +162,7 @@ extension HTTPClient {
@preconcurrency
@inlinable
public static func bytes<Bytes>(_ bytes: Bytes) -> Body where Bytes: RandomAccessCollection, Bytes: Sendable, Bytes.Element == UInt8 {
return Body(contentLength: Int64(bytes.count)) { writer in
return Body(contentLength: .known(Int64(bytes.count))) { writer in
if bytes.count <= bagOfBytesToByteBufferConversionChunkSize {
return writer.write(.byteBuffer(ByteBuffer(bytes: bytes)))
} else {
Expand All @@ -148,7 +176,7 @@ extension HTTPClient {
/// - parameters:
/// - string: Body `String` representation.
public static func string(_ string: String) -> Body {
return Body(contentLength: Int64(string.utf8.count)) { writer in
return Body(contentLength: .known(Int64(string.utf8.count))) { writer in
if string.utf8.count <= bagOfBytesToByteBufferConversionChunkSize {
return writer.write(.byteBuffer(ByteBuffer(string: string)))
} else {
Expand Down Expand Up @@ -881,10 +909,6 @@ extension RequestBodyLength {
self = .known(0)
return
}
guard let length = body.contentLength else {
self = .unknown
return
}
self = .known(length)
self = body.contentLength.length
}
}
10 changes: 5 additions & 5 deletions Tests/AsyncHTTPClientTests/HTTP1ClientChannelHandlerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ class HTTP1ClientChannelHandlerTests: XCTestCase {
guard let testUtils = maybeTestUtils else { return XCTFail("Expected connection setup works") }

var maybeRequest: HTTPClient.Request?
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(contentLength: 100) { writer in
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(length: .known(100)) { writer in
testWriter.start(writer: writer)
}))
guard let request = maybeRequest else { return XCTFail("Expected to be able to create a request") }
Expand Down Expand Up @@ -345,7 +345,7 @@ class HTTP1ClientChannelHandlerTests: XCTestCase {
guard let testUtils = maybeTestUtils else { return XCTFail("Expected connection setup works") }

var maybeRequest: HTTPClient.Request?
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(contentLength: 10) { writer in
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(length: .known(10)) { writer in
// Advance time by more than the idle write timeout (that's 1 millisecond) to trigger the timeout.
embedded.embeddedEventLoop.advanceTime(by: .milliseconds(2))
return testWriter.start(writer: writer)
Expand Down Expand Up @@ -384,7 +384,7 @@ class HTTP1ClientChannelHandlerTests: XCTestCase {
guard let testUtils = maybeTestUtils else { return XCTFail("Expected connection setup works") }

var maybeRequest: HTTPClient.Request?
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(contentLength: 10) { writer in
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(length: .known(10)) { writer in
embedded.isWritable = false
embedded.pipeline.fireChannelWritabilityChanged()
// This should not trigger any errors or timeouts, because the timer isn't running
Expand Down Expand Up @@ -432,7 +432,7 @@ class HTTP1ClientChannelHandlerTests: XCTestCase {
guard let testUtils = maybeTestUtils else { return XCTFail("Expected connection setup works") }

var maybeRequest: HTTPClient.Request?
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(contentLength: 2) { writer in
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(length: .known(2)) { writer in
return testWriter.start(writer: writer, expectedErrors: [HTTPClientError.cancelled])
}))
guard let request = maybeRequest else { return XCTFail("Expected to be able to create a request") }
Expand Down Expand Up @@ -595,7 +595,7 @@ class HTTP1ClientChannelHandlerTests: XCTestCase {
guard let testUtils = maybeTestUtils else { return XCTFail("Expected connection setup works") }

var maybeRequest: HTTPClient.Request?
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(contentLength: 10) { writer in
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(length: .known(10)) { writer in
testWriter.start(writer: writer)
}))
guard let request = maybeRequest else { return XCTFail("Expected to be able to create a request") }
Expand Down
2 changes: 1 addition & 1 deletion Tests/AsyncHTTPClientTests/HTTP1ConnectionTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ class HTTP1ConnectionTests: XCTestCase {
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(
url: "http://localhost/hello/swift",
method: .POST,
body: .stream(contentLength: 4) { writer -> EventLoopFuture<Void> in
body: .stream(length: .known(4)) { writer -> EventLoopFuture<Void> in
@Sendable func recursive(count: UInt8, promise: EventLoopPromise<Void>) {
guard count < 4 else {
return promise.succeed(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class HTTP2ClientRequestHandlerTests: XCTestCase {
let testWriter = TestBackpressureWriter(eventLoop: embedded.eventLoop, parts: 50)

var maybeRequest: HTTPClient.Request?
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(contentLength: 100) { writer in
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(length: .known(100)) { writer in
testWriter.start(writer: writer)
}))
guard let request = maybeRequest else { return XCTFail("Expected to be able to create a request") }
Expand Down Expand Up @@ -295,7 +295,7 @@ class HTTP2ClientRequestHandlerTests: XCTestCase {

let testWriter = TestBackpressureWriter(eventLoop: embedded.eventLoop, parts: 5)
var maybeRequest: HTTPClient.Request?
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(contentLength: 10) { writer in
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(length: .known(10)) { writer in
// Advance time by more than the idle write timeout (that's 1 millisecond) to trigger the timeout.
embedded.embeddedEventLoop.advanceTime(by: .milliseconds(2))
return testWriter.start(writer: writer)
Expand Down Expand Up @@ -335,7 +335,7 @@ class HTTP2ClientRequestHandlerTests: XCTestCase {

let testWriter = TestBackpressureWriter(eventLoop: embedded.eventLoop, parts: 5)
var maybeRequest: HTTPClient.Request?
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(contentLength: 10) { writer in
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(length: .known(10)) { writer in
embedded.isWritable = false
embedded.pipeline.fireChannelWritabilityChanged()
// This should not trigger any errors or timeouts, because the timer isn't running
Expand Down Expand Up @@ -385,7 +385,7 @@ class HTTP2ClientRequestHandlerTests: XCTestCase {

let testWriter = TestBackpressureWriter(eventLoop: embedded.eventLoop, parts: 5)
var maybeRequest: HTTPClient.Request?
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(contentLength: 2) { writer in
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(length: .known(2)) { writer in
return testWriter.start(writer: writer, expectedErrors: [HTTPClientError.cancelled])
}))
guard let request = maybeRequest else { return XCTFail("Expected to be able to create a request") }
Expand Down
4 changes: 2 additions & 2 deletions Tests/AsyncHTTPClientTests/HTTP2ClientTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class HTTP2ClientTests: XCTestCase {
let client = self.makeDefaultHTTPClient()
defer { XCTAssertNoThrow(try client.syncShutdown()) }
var response: HTTPClient.Response?
let body = HTTPClient.Body.stream(contentLength: nil) { writer in
let body = HTTPClient.Body.stream(length: .unknown) { writer in
writer.write(.byteBuffer(ByteBuffer(integer: UInt64(0)))).flatMap {
writer.write(.byteBuffer(ByteBuffer(integer: UInt64(0))))
}
Expand All @@ -84,7 +84,7 @@ class HTTP2ClientTests: XCTestCase {
defer { XCTAssertNoThrow(try bin.shutdown()) }
let client = self.makeDefaultHTTPClient()
defer { XCTAssertNoThrow(try client.syncShutdown()) }
let body = HTTPClient.Body.stream(contentLength: 12) { writer in
let body = HTTPClient.Body.stream(length: .known(12)) { writer in
writer.write(.byteBuffer(ByteBuffer(integer: UInt64(0)))).flatMap {
writer.write(.byteBuffer(ByteBuffer(integer: UInt64(0))))
}
Expand Down
10 changes: 5 additions & 5 deletions Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class HTTPClientInternalTests: XCTestCase {
XCTAssertNoThrow(try httpBin.shutdown())
}

let body: HTTPClient.Body = .stream(contentLength: 50) { writer in
let body: HTTPClient.Body = .stream(length: .known(50)) { writer in
do {
var request = try Request(url: "http://localhost:\(httpBin.port)/events/10/1")
request.headers.add(name: "Accept", value: "text/event-stream")
Expand Down Expand Up @@ -81,13 +81,13 @@ class HTTPClientInternalTests: XCTestCase {
XCTAssertNoThrow(try httpBin.shutdown())
}

var body: HTTPClient.Body = .stream(contentLength: 50) { _ in
var body: HTTPClient.Body = .stream(length: .known(50)) { _ in
httpClient.eventLoopGroup.next().makeFailedFuture(HTTPClientError.invalidProxyResponse)
}

XCTAssertThrowsError(try httpClient.post(url: "http://localhost:\(httpBin.port)/post", body: body).wait())

body = .stream(contentLength: 50) { _ in
body = .stream(length: .known(50)) { _ in
do {
var request = try Request(url: "http://localhost:\(httpBin.port)/events/10/1")
request.headers.add(name: "Accept", value: "text/event-stream")
Expand Down Expand Up @@ -223,7 +223,7 @@ class HTTPClientInternalTests: XCTestCase {
XCTAssertNoThrow(try httpClient.syncShutdown(requiresCleanClose: true))
}

let body: HTTPClient.Body = .stream(contentLength: 8) { writer in
let body: HTTPClient.Body = .stream(length: .known(8)) { writer in
let buffer = ByteBuffer(string: "1234")
return writer.write(.byteBuffer(buffer)).flatMap {
let buffer = ByteBuffer(string: "4321")
Expand Down Expand Up @@ -366,7 +366,7 @@ class HTTPClientInternalTests: XCTestCase {
let el2 = group.next()
XCTAssert(el1 !== el2)

let body: HTTPClient.Body = .stream(contentLength: 8) { writer in
let body: HTTPClient.Body = .stream(length: .known(8)) { writer in
XCTAssert(el1.inEventLoop)
let buffer = ByteBuffer(string: "1234")
return writer.write(.byteBuffer(buffer)).flatMap {
Expand Down
Loading