-
-
Notifications
You must be signed in to change notification settings - Fork 80
async/await prepared statement API #390
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
async/await prepared statement API #390
Conversation
public func execute<P: PreparedStatement>( | ||
_ preparedStatement: P, | ||
logger: Logger | ||
) async throws -> AsyncThrowingMapSequence<PostgresRowSequence, P.Row> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This return type isn't exactly nice. Any advice on how I could improve it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
some AsyncSequence where Element == P.Row
?
Sources/PostgresNIO/New/Connection State Machine/PreparedStatementStateMachine.swift
Show resolved
Hide resolved
context: context | ||
) | ||
} | ||
promise.futureResult.whenFailure { error in |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With this I want to propagate failures happening during preparation to all the pending queries we have.
I am not 100% convinced by the way it looks (especially by the need to do error as! PSQLError
) but I cannot find any better alternative.
Do you have any advice?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
First I think we should use whenComplete here instead of whenSuccess and whenFailure. Reason for this is, we can save an allocation...
Second, we must check if the error we get is actually a PSQLError, if it is not, we need to wrap error with a PSQLError.
logger: statement.logger, | ||
promise: statement.promise | ||
))) | ||
self.run(action, with: context) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here (and in a couple other places) I need to execute all the pending statement executions.
To do that I enqueue
and run
many actions at the same time. Is this okay?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
First review pass
public func execute<P: PreparedStatement>( | ||
_ preparedStatement: P, | ||
logger: Logger | ||
) async throws -> AsyncThrowingMapSequence<PostgresRowSequence, P.Row> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
some AsyncSequence where Element == P.Row
?
@@ -448,6 +448,27 @@ extension PostgresConnection { | |||
self.channel.write(task, promise: nil) | |||
} | |||
} | |||
|
|||
/// Execute a prepared statement, taking care of the preparation when necessary | |||
public func execute<P: PreparedStatement>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets rename P
to Statement
Sources/PostgresNIO/New/Connection State Machine/PreparedStatementStateMachine.swift
Show resolved
Hide resolved
public func execute<P: PreparedStatement>( | ||
_ preparedStatement: P, | ||
logger: Logger | ||
) async throws -> AsyncThrowingMapSequence<PostgresRowSequence, P.Row> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also let's overload this method with a method where Statement.Row == ()
where we only return the command tag?
switch self.preparedStatementState.lookup( | ||
name: preparedStatement.name, | ||
context: preparedStatement | ||
) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we assign a variable here and then switch over the variable?
enum Action { | ||
case prepareStatement | ||
case waitForAlreadyInFlightPreparation | ||
case executePendingStatements([PreparedStatementContext], RowDescription?) | ||
case returnError([PreparedStatementContext], PSQLError) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
returning the same Action for all mutations is what I did in the past, but what we generally frown upon by now. Can we get specific Actions for each mutation?
self.preparedStatements[name] = .preparing(statements) | ||
return .waitForAlreadyInFlightPreparation | ||
case .prepared(let rowDescription): | ||
return .executePendingStatements([context], rowDescription) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we shouldn't allocate here :P
/// | ||
/// Structs conforming to this protocol can then be used with `PostgresConnection.execute(_ preparedStatement:, logger:)`, | ||
/// which will take care of preparing the statement on the server side and executing it. | ||
public protocol PreparedStatement { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should require that this is Sendable :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also we should prefix this one with Postgres
. Reason: https://github.com/apple/swift-nio/blob/main/docs/public-api.md#1-no-global-namespace-additions-that-arent-prefixed
Fixes #385 |
f90b4e5
to
4cf7f48
Compare
Codecov Report
Additional details and impacted files@@ Coverage Diff @@
## main #390 +/- ##
==========================================
+ Coverage 47.53% 48.98% +1.44%
==========================================
Files 109 110 +1
Lines 8764 8954 +190
==========================================
+ Hits 4166 4386 +220
+ Misses 4598 4568 -30
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks better and better. For unit test examples have a look at PostgresConnectionTests.testSimpleListen
case prepareStatement | ||
case waitForAlreadyInFlightPreparation | ||
case executeStatement(RowDescription?) | ||
case executePendingStatements([PreparedStatementContext], RowDescription?) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can drop this one, as it isn't used, can't we?
enum PreparationCompleteAction { | ||
case executePendingStatements([PreparedStatementContext], RowDescription?) | ||
} | ||
|
||
mutating func preparationComplete( | ||
name: String, | ||
rowDescription: RowDescription? | ||
) -> PreparationCompleteAction { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would this work better as a simple struct?
self.listenState = ListenStateMachine() | ||
self.preparedStatementState = PreparedStatementStateMachine() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we should just init those two directly in the declaration...
context: context | ||
) | ||
} | ||
promise.futureResult.whenFailure { error in |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
First I think we should use whenComplete here instead of whenSuccess and whenFailure. Reason for this is, we can save an allocation...
Second, we must check if the error we get is actually a PSQLError, if it is not, we need to wrap error with a PSQLError.
let action = self.preparedStatementState.lookup( | ||
name: preparedStatement.name, | ||
context: preparedStatement | ||
) | ||
switch action { | ||
case .prepareStatement: | ||
let promise = self.eventLoop.makePromise(of: RowDescription?.self) | ||
promise.futureResult.whenSuccess { rowDescription in | ||
self.prepareStatementComplete( | ||
name: preparedStatement.name, | ||
rowDescription: rowDescription, | ||
context: context | ||
) | ||
} | ||
promise.futureResult.whenFailure { error in | ||
self.prepareStatementFailed( | ||
name: preparedStatement.name, | ||
error: error as! PSQLError, | ||
context: context | ||
) | ||
} | ||
psqlTask = .extendedQuery(.init( | ||
name: preparedStatement.name, | ||
query: preparedStatement.sql, | ||
logger: preparedStatement.logger, | ||
promise: promise | ||
)) | ||
case .waitForAlreadyInFlightPreparation: | ||
// The state machine already keeps track of this | ||
// and will execute the statement as soon as it's prepared | ||
return | ||
case .executeStatement(let rowDescription): | ||
psqlTask = .extendedQuery(.init( | ||
executeStatement: .init( | ||
name: preparedStatement.name, | ||
binds: preparedStatement.bindings, | ||
rowDescription: rowDescription), | ||
logger: preparedStatement.logger, | ||
promise: preparedStatement.promise | ||
)) | ||
case .executePendingStatements(let pendingStatements, let rowDescription): | ||
for statement in pendingStatements { | ||
let action = self.state.enqueue(task: .extendedQuery(.init( | ||
executeStatement: .init( | ||
name: statement.name, | ||
binds: statement.bindings, | ||
rowDescription: rowDescription), | ||
logger: statement.logger, | ||
promise: statement.promise | ||
))) | ||
self.run(action, with: context) | ||
} | ||
return | ||
case .returnError(let pendingStatements, let error): | ||
for statement in pendingStatements { | ||
statement.promise.fail(error) | ||
} | ||
return | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we move this into its own method? we should likely do the same with all the other longer ones above.
guard case .executePendingStatements(let statements, let rowDescription) = action else { | ||
preconditionFailure("Expected to have pending statements to execute") | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should reconsider the return type from the statemachine here :). No enum needed.
guard case .returnError(let statements, let error) = action else { | ||
preconditionFailure("Expected to have pending statements to execute") | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
see above.
/// | ||
/// func makeBindings() -> PostgresBindings { | ||
/// var bindings = PostgresBindings() | ||
/// bindings.append(.init(string: self.state)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/// bindings.append(.init(string: self.state)) | |
/// bindings.append(self.state) |
does this work as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, I get back an error: Cannot convert value of type 'String' to expected argument type 'PostgresData'
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, we shouldn't create PostgresData
. The problem here is that we better names for those methods, so that users can use them without Interpolation as well.
struct PostgresBindings {
@inlinable
public mutating func appendInterpolation<Value: PostgresEncodable>(_ value: Value) throws {
try self.binds.append(value, context: .default)
self.sql.append(contentsOf: "$\(self.binds.count)")
}
@inlinable
public mutating func appendInterpolation<Value: PostgresEncodable>(_ value: Optional<Value>) throws {
switch value {
case .none:
self.binds.appendNull()
case .some(let value):
try self.binds.append(value, context: .default)
}
self.sql.append(contentsOf: "$\(self.binds.count)")
}
}
name: String, | ||
rowDescription: RowDescription? | ||
) -> PreparationCompleteAction { | ||
guard case .preparing(let statements) = self.preparedStatements[name] else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we switch over all cases here?
} | ||
|
||
mutating func errorHappened(name: String, error: PSQLError) -> ErrorHappenedAction { | ||
guard case .preparing(let statements) = self.preparedStatements[name] else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we switch over all cases here?
/// | ||
/// func makeBindings() -> PostgresBindings { | ||
/// var bindings = PostgresBindings() | ||
/// bindings.append(.init(string: self.state)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, we shouldn't create PostgresData
. The problem here is that we better names for those methods, so that users can use them without Interpolation as well.
struct PostgresBindings {
@inlinable
public mutating func appendInterpolation<Value: PostgresEncodable>(_ value: Value) throws {
try self.binds.append(value, context: .default)
self.sql.append(contentsOf: "$\(self.binds.count)")
}
@inlinable
public mutating func appendInterpolation<Value: PostgresEncodable>(_ value: Optional<Value>) throws {
switch value {
case .none:
self.binds.appendNull()
case .some(let value):
try self.binds.append(value, context: .default)
}
self.sql.append(contentsOf: "$\(self.binds.count)")
}
}
Co-authored-by: Fabian Fett <[email protected]>
759587c
to
a426205
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks so good. A few nits.
} | ||
} | ||
|
||
func testPreparedStatement() async throws { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know that these tests are somewhat painful to write but we should embrace them, as we can produce hard to reproduce scenarios here.
For this reason, please lets try to create those scenarios in extra tests here:
- Sending the a prepared query twice, before the preparation step has succeeded. Is the order of the send statements still correct? Is the query just prepared once?
- Sending the a prepared, after the the query of the same type has already succeeded. Is the query executed right away?
- Failing a prepared statement in preparation are other executes failed in the same way right away?
Happy to help.
Co-authored-by: Fabian Fett <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm extremely happy with this. Let's wait on the go from @gwynne and then let's merge it! Thanks @mariosangiorgio!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Super nits
Co-authored-by: Fabian Fett <[email protected]>
Implementation of async/await API for prepared statements (See also #385).
This PR adds a new
PreparedStatement
protocol to represent prepared statements and anexecute
function onPostgresConnection
to prepare and execute statements.To implement the features the PR also introduces a
PreparedStatementStateMachine
that keeps track of the state of a prepared statement at the connection level. This ensures that, for each connection, each statement is prepared once at time of first use and then subsequent uses are going to skip the preparation step and just execute it.Example usage
First define the struct to represent the prepared statement:
then, assuming you already have a
PostgresConnection
you can execute it: