Skip to content

Commit 0f3e411

Browse files
committed
Stay on EL, when connection has become idle
1 parent 115dc40 commit 0f3e411

File tree

8 files changed

+231
-55
lines changed

8 files changed

+231
-55
lines changed

Benchmarks/Package.swift

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ let package = Package(
1111
.package(path: "../"),
1212
.package(url: "https://github.com/ordo-one/package-benchmark.git", from: "1.29.0"),
1313
.package(url: "https://github.com/vapor/postgres-kit.git", from: "2.14.0"),
14-
.package(name: "swift-nio", path: "../../../apple/swift-nio"),
14+
.package(url: "https://github.com/apple/swift-nio.git", from: "2.82.0"),
1515
],
1616
targets: [
1717
.executableTarget(

Benchmarks/Sources/PostgresPerf/PostgresPerf.swift

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import PostgresNIO
1414
@available(macOS 15.0, *)
1515
enum PostgresPerf {
1616

17-
static let maxConnections: Int = 400
17+
static let maxConnections: Int = 50
1818
static let tasks: Int = 400
1919
static let iterationsPerTask: Int = 1000
2020
static let logger = Logger(label: "TestLogger")

Sources/ConnectionPoolModule/ConnectionIDGenerator.swift

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import Atomics
22

33
public struct ConnectionIDGenerator: ConnectionIDGeneratorProtocol {
4-
static let globalGenerator = ConnectionIDGenerator()
4+
public static let globalGenerator = ConnectionIDGenerator()
55

66
private let atomic: ManagedAtomic<Int>
77

Sources/ConnectionPoolModule/ConnectionPool.swift

+11-2
Original file line numberDiff line numberDiff line change
@@ -264,10 +264,19 @@ public final class ConnectionPool<
264264
}
265265
}
266266

267-
public func cancelLeaseConnection(_ requestID: RequestID) {
267+
@discardableResult
268+
public func cancelLeaseConnection(_ requestID: RequestID) -> Bool {
269+
var found = false
268270
self.modifyStateAndRunActions { state in
269-
state.stateMachine.cancelRequest(id: requestID)
271+
let action = state.stateMachine.cancelRequest(id: requestID)
272+
if case .failRequest = action.request {
273+
found = true
274+
} else {
275+
found = false
276+
}
277+
return action
270278
}
279+
return found
271280
}
272281

273282
/// Mark a connection as going away. Connection implementors have to call this method if the connection

Sources/ConnectionPoolModule/ConnectionPoolManager.swift

+10
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,16 @@ public final class ConnectionPoolManager<
146146
self.roundRobinPools[index].leaseConnection(request)
147147
}
148148

149+
@inlinable
150+
public func cancelLeaseConnection(_ requestID: RequestID) {
151+
// TODO: This is expensive!
152+
for pool in self.roundRobinPools {
153+
if pool.cancelLeaseConnection(requestID) {
154+
break
155+
}
156+
}
157+
}
158+
149159
@usableFromInline
150160
enum Actions: Sendable {
151161
case runPool(ConnectionPool)

Sources/PostgresNIO/Pool/ConnectionFactory.swift

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ final class ConnectionFactory: Sendable {
3535
self.logger = logger
3636
}
3737

38-
func makeConnection(_ connectionID: PostgresConnection.ID, pool: PostgresClient.Pool) async throws -> PostgresConnection {
38+
func makeConnection(_ connectionID: PostgresConnection.ID, pool: PostgresClient.ConnectionPool) async throws -> PostgresConnection {
3939
let config = try await self.makeConnectionConfig()
4040

4141
var connectionLogger = self.logger

Sources/PostgresNIO/Pool/PostgresClient.swift

+128-49
Original file line numberDiff line numberDiff line change
@@ -233,33 +233,130 @@ public final class PostgresClient: Sendable, ServiceLifecycle.Service {
233233
}
234234
}
235235

236-
typealias PoolManager = ConnectionPoolManager<
236+
typealias ConnectionPoolManager = _ConnectionPoolModule.ConnectionPoolManager<
237237
PostgresConnection,
238238
PostgresConnection.ID,
239239
ConnectionIDGenerator,
240240
Foo,
241-
ConnectionRequest<PostgresConnection>,
242-
ConnectionRequest.ID,
241+
PostgresConnectionRequest,
242+
PostgresConnectionRequest.ID,
243243
PostgresKeepAliveBehavor,
244244
NothingConnectionPoolExecutor,
245245
PostgresClientMetrics,
246246
ContinuousClock
247247
>
248248

249-
typealias Pool = ConnectionPool<
249+
typealias ConnectionPool = _ConnectionPoolModule.ConnectionPool<
250250
PostgresConnection,
251251
PostgresConnection.ID,
252252
ConnectionIDGenerator,
253253
Foo,
254-
ConnectionRequest<PostgresConnection>,
255-
ConnectionRequest.ID,
254+
PostgresConnectionRequest,
255+
PostgresConnectionRequest.ID,
256256
PostgresKeepAliveBehavor,
257257
NothingConnectionPoolExecutor,
258258
PostgresClientMetrics,
259259
ContinuousClock
260260
>
261261

262-
let pool: PoolManager
262+
enum Pool {
263+
case manager(ConnectionPoolManager)
264+
case pool(ConnectionPool)
265+
266+
init(
267+
configuration: Configuration,
268+
factory: ConnectionFactory,
269+
eventLoopGroup: any EventLoopGroup,
270+
backgroundLogger: Logger
271+
) {
272+
let idGenerator = ConnectionIDGenerator.globalGenerator
273+
274+
if configuration.options.maximumConnections > 50 {
275+
// make as many executors as we have NIO els
276+
let executors = (0..<10).map { _ in NothingConnectionPoolExecutor() }
277+
var poolManagerConfiguration = ConnectionPoolManagerConfiguration()
278+
poolManagerConfiguration.minimumConnectionPerExecutorCount = configuration.options.minimumConnections / executors.count
279+
poolManagerConfiguration.maximumConnectionPerExecutorSoftLimit = configuration.options.maximumConnections / executors.count
280+
poolManagerConfiguration.maximumConnectionPerExecutorHardLimit = configuration.options.maximumConnections / executors.count
281+
282+
self = .manager(
283+
ConnectionPoolManager(
284+
configuration: poolManagerConfiguration,
285+
connectionConfiguration: Foo(),
286+
idGenerator: idGenerator,
287+
requestType: PostgresConnectionRequest.self,
288+
keepAliveBehavior: .init(configuration.options.keepAliveBehavior, logger: backgroundLogger),
289+
executors: executors,
290+
observabilityDelegate: .init(logger: backgroundLogger),
291+
clock: ContinuousClock()
292+
) { (connectionID, connectionConfiguration, pool) in
293+
let connection = try await factory.makeConnection(connectionID, pool: pool)
294+
return ConnectionAndMetadata(connection: connection, maximalStreamsOnConnection: 1)
295+
}
296+
)
297+
} else {
298+
self = .pool(
299+
ConnectionPool(
300+
configuration: .init(configuration),
301+
connectionConfiguration: Foo(),
302+
idGenerator: idGenerator,
303+
requestType: PostgresConnectionRequest.self,
304+
keepAliveBehavior: .init(configuration.options.keepAliveBehavior, logger: backgroundLogger),
305+
executor: NothingConnectionPoolExecutor(),
306+
observabilityDelegate: .init(logger: backgroundLogger),
307+
clock: ContinuousClock()
308+
) { (connectionID, connectionConfiguration, pool) in
309+
let connection = try await factory.makeConnection(connectionID, pool: pool)
310+
return ConnectionAndMetadata(connection: connection, maximalStreamsOnConnection: 1)
311+
}
312+
)
313+
}
314+
}
315+
316+
func run() async {
317+
switch self {
318+
case .manager(let manager):
319+
await manager.run()
320+
case .pool(let pool):
321+
await pool.run()
322+
}
323+
}
324+
325+
func cancelRequest(id: PostgresConnectionRequest.ID) {
326+
switch self {
327+
case .pool(let pool):
328+
pool.cancelLeaseConnection(id)
329+
330+
case .manager(let manager):
331+
manager.cancelLeaseConnection(id)
332+
}
333+
}
334+
335+
func leaseConnection(_ request: PostgresConnectionRequest) {
336+
switch self {
337+
case .pool(let pool):
338+
pool.leaseConnection(request)
339+
case .manager(let manager):
340+
manager.leaseConnection(request)
341+
}
342+
}
343+
344+
func leaseConnection() async throws -> ConnectionLease<PostgresConnection> {
345+
let requestID = PostgresConnectionRequest.idGenerator.next()
346+
347+
return try await withTaskCancellationHandler {
348+
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<ConnectionLease<PostgresConnection>, Error>) in
349+
let request = PostgresConnectionRequest(id: requestID, continuation: continuation)
350+
351+
self.leaseConnection(request)
352+
}
353+
} onCancel: {
354+
self.cancelRequest(id: requestID)
355+
}
356+
}
357+
}
358+
359+
let pool: Pool
263360
let factory: ConnectionFactory
264361
let runningAtomic = ManagedAtomic(false)
265362
let backgroundLogger: Logger
@@ -295,26 +392,12 @@ public final class PostgresClient: Sendable, ServiceLifecycle.Service {
295392
self.factory = factory
296393
self.backgroundLogger = backgroundLogger
297394

298-
let executors = (0..<10).map { _ in NothingConnectionPoolExecutor() }
299-
var poolManagerConfiguration = ConnectionPoolManagerConfiguration()
300-
poolManagerConfiguration.minimumConnectionPerExecutorCount = configuration.options.minimumConnections / executors.count
301-
poolManagerConfiguration.maximumConnectionPerExecutorSoftLimit = configuration.options.maximumConnections / executors.count
302-
poolManagerConfiguration.maximumConnectionPerExecutorHardLimit = configuration.options.maximumConnections / executors.count
303-
304-
self.pool = ConnectionPoolManager(
305-
configuration: poolManagerConfiguration,
306-
connectionConfiguration: Foo(),
307-
idGenerator: ConnectionIDGenerator(),
308-
requestType: ConnectionRequest<PostgresConnection>.self,
309-
keepAliveBehavior: .init(configuration.options.keepAliveBehavior, logger: backgroundLogger),
310-
executors: executors,
311-
observabilityDelegate: .init(logger: backgroundLogger),
312-
clock: ContinuousClock()
313-
) { (connectionID, connectionConfiguration, pool) in
314-
let connection = try await factory.makeConnection(connectionID, pool: pool)
315-
316-
return ConnectionAndMetadata(connection: connection, maximalStreamsOnConnection: 1)
317-
}
395+
self.pool = .init(
396+
configuration: configuration,
397+
factory: factory,
398+
eventLoopGroup: eventLoopGroup,
399+
backgroundLogger: backgroundLogger
400+
)
318401
}
319402

320403
/// Lease a connection for the provided `closure`'s lifetime.
@@ -427,30 +510,22 @@ public final class PostgresClient: Sendable, ServiceLifecycle.Service {
427510
throw PSQLError(code: .tooManyParameters, query: query, file: file, line: line)
428511
}
429512

430-
let lease = try await self.leaseConnection()
431-
let connection = lease.connection
432-
433-
var logger = logger
434-
logger[postgresMetadataKey: .connectionID] = "\(connection.id)"
435-
436-
let promise = connection.channel.eventLoop.makePromise(of: PSQLRowStream.self)
437-
let context = ExtendedQueryContext(
438-
query: query,
439-
logger: logger,
440-
promise: promise
441-
)
513+
let requestID = PostgresConnectionRequest.idGenerator.next()
442514

443-
connection.channel.write(HandlerTask.extendedQuery(context), promise: nil)
515+
return try await withTaskCancellationHandler {
516+
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<PostgresRowSequence, Error>) in
517+
let request = PostgresConnectionRequest(
518+
id: requestID,
519+
query: query,
520+
continuation: continuation,
521+
logger: logger
522+
)
444523

445-
promise.futureResult.whenFailure { _ in
446-
lease.release()
524+
self.pool.leaseConnection(request)
525+
}
526+
} onCancel: {
527+
self.pool.cancelRequest(id: requestID)
447528
}
448-
449-
return try await promise.futureResult.map {
450-
$0.asyncSequence(onFinish: {
451-
lease.release()
452-
})
453-
}.get()
454529
} catch var error as PSQLError {
455530
error.file = file
456531
error.line = line
@@ -543,7 +618,11 @@ public final class PostgresClient: Sendable, ServiceLifecycle.Service {
543618
PostgresConnection.defaultEventLoopGroup
544619
}
545620

546-
static let loggingDisabled = Logger(label: "Postgres-do-not-log", factory: { _ in SwiftLogNoOpLogHandler() })
621+
static let loggingDisabled = {
622+
var logger = Logger(label: "Postgres-do-not-log", factory: { _ in SwiftLogNoOpLogHandler() })
623+
logger.logLevel = .critical
624+
return logger
625+
}()
547626
}
548627

549628
@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
//
2+
// PostgresConnectionRequest.swift
3+
// postgres-nio
4+
//
5+
// Created by Fabian Fett on 15.05.25.
6+
//
7+
8+
import _ConnectionPoolModule
9+
10+
struct PostgresConnectionRequest: ConnectionRequestProtocol {
11+
12+
static let idGenerator = ConnectionIDGenerator()
13+
14+
private enum `Type` {
15+
case connection(CheckedContinuation<ConnectionLease<PostgresConnection>, any Error>)
16+
case query(PostgresQuery, Logger, CheckedContinuation<PostgresRowSequence, any Error>)
17+
}
18+
19+
typealias ID = Int
20+
21+
var id: ID
22+
private var type: `Type`
23+
24+
init(
25+
id: Int,
26+
continuation: CheckedContinuation<ConnectionLease<PostgresConnection>, any Error>
27+
) {
28+
self.id = id
29+
self.type = .connection(continuation)
30+
}
31+
32+
init(
33+
id: Int,
34+
query: PostgresQuery,
35+
continuation: CheckedContinuation<PostgresRowSequence, any Error>,
36+
logger: Logger
37+
) {
38+
self.id = id
39+
self.type = .query(query, logger, continuation)
40+
}
41+
42+
public func complete(with result: Result<ConnectionLease<PostgresConnection>, ConnectionPoolError>) {
43+
switch self.type {
44+
case .connection(let checkedContinuation):
45+
checkedContinuation.resume(with: result)
46+
47+
case .query(let query, var logger, let checkedContinuation):
48+
switch result {
49+
case .success(let lease):
50+
logger[postgresMetadataKey: .connectionID] = "\(lease.connection.id)"
51+
52+
let promise = lease.connection.channel.eventLoop.makePromise(of: PSQLRowStream.self)
53+
let context = ExtendedQueryContext(
54+
query: query,
55+
logger: logger,
56+
promise: promise
57+
)
58+
59+
lease.connection.channel.write(HandlerTask.extendedQuery(context), promise: nil)
60+
promise.futureResult.whenFailure { error in
61+
lease.release()
62+
checkedContinuation.resume(throwing: error)
63+
}
64+
65+
promise.futureResult.whenSuccess { rowSequence in
66+
let asyncSequence = rowSequence.asyncSequence {
67+
lease.release()
68+
}
69+
checkedContinuation.resume(returning: asyncSequence)
70+
}
71+
72+
case .failure(let error):
73+
checkedContinuation.resume(throwing: error)
74+
}
75+
}
76+
77+
}
78+
}

0 commit comments

Comments
 (0)