Skip to content

Commit 70edbed

Browse files
committed
Started work on a pool manager
1 parent 9ef0611 commit 70edbed

File tree

10 files changed

+380
-20
lines changed

10 files changed

+380
-20
lines changed
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
//
2+
// ConnectionLease.swift
3+
// postgres-nio
4+
//
5+
// Created by Fabian Fett on 05.05.25.
6+
//
7+
8+
struct ConnectionLease<Connection: PooledConnection> {
9+
10+
var connection: Connection
11+
var _release: () -> ()
12+
13+
init(connection: Connection, release: @escaping () -> Void) {
14+
self.connection = connection
15+
self._release = release
16+
}
17+
18+
func release() {
19+
self._release()
20+
}
21+
22+
}

Sources/ConnectionPoolModule/ConnectionPool.swift

Lines changed: 48 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -134,9 +134,11 @@ public final class ConnectionPool<
134134
Connection: PooledConnection,
135135
ConnectionID: Hashable & Sendable,
136136
ConnectionIDGenerator: ConnectionIDGeneratorProtocol,
137+
ConnectionConfiguration: Equatable & Sendable,
137138
Request: ConnectionRequestProtocol,
138139
RequestID: Hashable & Sendable,
139140
KeepAliveBehavior: ConnectionKeepAliveBehavior,
141+
Executor: ConnectionPoolExecutor,
140142
ObservabilityDelegate: ConnectionPoolObservabilityDelegate,
141143
Clock: _Concurrency.Clock
142144
>: Sendable where
@@ -148,14 +150,19 @@ public final class ConnectionPool<
148150
ObservabilityDelegate.ConnectionID == ConnectionID,
149151
Clock.Duration == Duration
150152
{
151-
public typealias ConnectionFactory = @Sendable (ConnectionID, ConnectionPool<Connection, ConnectionID, ConnectionIDGenerator, Request, RequestID, KeepAliveBehavior, ObservabilityDelegate, Clock>) async throws -> ConnectionAndMetadata<Connection>
153+
public typealias ConnectionFactory = @Sendable (ConnectionID, ConnectionConfiguration, ConnectionPool<Connection, ConnectionID, ConnectionIDGenerator, ConnectionConfiguration, Request, RequestID, KeepAliveBehavior, Executor, ObservabilityDelegate, Clock>) async throws -> ConnectionAndMetadata<Connection>
152154

153155
@usableFromInline
154156
typealias StateMachine = PoolStateMachine<Connection, ConnectionIDGenerator, ConnectionID, Request, Request.ID, CheckedContinuation<Void, Never>>
155157

156158
@usableFromInline
157159
let factory: ConnectionFactory
158160

161+
public let executor: Executor
162+
163+
@usableFromInline
164+
let connectionConfiguration: ConnectionConfiguration
165+
159166
@usableFromInline
160167
let keepAliveBehavior: KeepAliveBehavior
161168

@@ -188,18 +195,22 @@ public final class ConnectionPool<
188195

189196
public init(
190197
configuration: ConnectionPoolConfiguration,
198+
connectionConfiguration: ConnectionConfiguration,
191199
idGenerator: ConnectionIDGenerator,
192200
requestType: Request.Type,
193201
keepAliveBehavior: KeepAliveBehavior,
202+
executor: Executor,
194203
observabilityDelegate: ObservabilityDelegate,
195204
clock: Clock,
196205
connectionFactory: @escaping ConnectionFactory
197206
) {
207+
self.executor = executor
198208
self.clock = clock
199209
self.factory = connectionFactory
200210
self.keepAliveBehavior = keepAliveBehavior
201211
self.observabilityDelegate = observabilityDelegate
202212
self.configuration = configuration
213+
self.connectionConfiguration = connectionConfiguration
203214
var stateMachine = StateMachine(
204215
configuration: .init(configuration, keepAliveBehavior: keepAliveBehavior),
205216
generator: idGenerator,
@@ -271,6 +282,13 @@ public final class ConnectionPool<
271282
}
272283
}
273284

285+
@inlinable
286+
public func updateConfiguration(_ configuration: ConnectionConfiguration, forceReconnection: Bool) {
287+
// TODO: Implement connection will close correctly
288+
// If the forceReconnection flag is set, we should gracefully close the connection once they
289+
// are returned the next time.
290+
}
291+
274292
@inlinable
275293
public func run() async {
276294
await withTaskCancellationHandler {
@@ -419,11 +437,11 @@ public final class ConnectionPool<
419437

420438
@inlinable
421439
/*private*/ func makeConnection(for request: StateMachine.ConnectionRequest, in taskGroup: inout some TaskGroupProtocol) {
422-
taskGroup.addTask_ {
440+
self.addTask(into: &taskGroup) {
423441
self.observabilityDelegate.startedConnecting(id: request.connectionID)
424442

425443
do {
426-
let bundle = try await self.factory(request.connectionID, self)
444+
let bundle = try await self.factory(request.connectionID, self.connectionConfiguration, self)
427445
self.connectionEstablished(bundle)
428446

429447
// after the connection has been established, we keep the task open. This ensures
@@ -468,7 +486,7 @@ public final class ConnectionPool<
468486
/*private*/ func runKeepAlive(_ connection: Connection, in taskGroup: inout some TaskGroupProtocol) {
469487
self.observabilityDelegate.keepAliveTriggered(id: connection.id)
470488

471-
taskGroup.addTask_ {
489+
self.addTask(into: &taskGroup) {
472490
do {
473491
try await self.keepAliveBehavior.runKeepAlive(for: connection)
474492

@@ -502,8 +520,8 @@ public final class ConnectionPool<
502520
}
503521

504522
@inlinable
505-
/*private*/ func runTimer(_ timer: StateMachine.Timer, in poolGroup: inout some TaskGroupProtocol) {
506-
poolGroup.addTask_ { () async -> () in
523+
/*private*/ func runTimer(_ timer: StateMachine.Timer, in taskGroup: inout some TaskGroupProtocol) {
524+
self.addTask(into: &taskGroup) { () async -> () in
507525
await withTaskGroup(of: TimerRunResult.self, returning: Void.self) { taskGroup in
508526
taskGroup.addTask {
509527
do {
@@ -554,6 +572,15 @@ public final class ConnectionPool<
554572
token.resume()
555573
}
556574
}
575+
576+
@inlinable
577+
func addTask(into taskGroup: inout some TaskGroupProtocol, operation: @escaping @Sendable () async -> Void) {
578+
if #available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *), let executor = self.executor as? TaskExecutor {
579+
taskGroup.addTask_(executorPreference: executor, operation: operation)
580+
} else {
581+
taskGroup.addTask_(operation: operation)
582+
}
583+
}
557584
}
558585

559586
@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)
@@ -573,6 +600,9 @@ protocol TaskGroupProtocol {
573600
// under exactly this name and others have different attributes. So let's pick
574601
// a name that doesn't clash anywhere and implement it using the standard `addTask`.
575602
mutating func addTask_(operation: @escaping @Sendable () async -> Void)
603+
604+
@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *)
605+
mutating func addTask_(executorPreference: ((any TaskExecutor)?), operation: @escaping @Sendable () async -> Void)
576606
}
577607

578608
@available(macOS 14.0, iOS 17.0, tvOS 17.0, watchOS 10.0, *)
@@ -581,11 +611,23 @@ extension DiscardingTaskGroup: TaskGroupProtocol {
581611
mutating func addTask_(operation: @escaping @Sendable () async -> Void) {
582612
self.addTask(priority: nil, operation: operation)
583613
}
614+
615+
@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *)
616+
@inlinable
617+
mutating func addTask_(executorPreference: (any TaskExecutor)?, operation: @escaping @Sendable () async -> Void) {
618+
self.addTask(executorPreference: executorPreference, operation: operation)
619+
}
584620
}
585621

586622
extension TaskGroup<Void>: TaskGroupProtocol {
587623
@inlinable
588624
mutating func addTask_(operation: @escaping @Sendable () async -> Void) {
589625
self.addTask(priority: nil, operation: operation)
590626
}
627+
628+
@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *)
629+
@inlinable
630+
mutating func addTask_(executorPreference: (any TaskExecutor)?, operation: @escaping @Sendable () async -> Void) {
631+
self.addTask(executorPreference: executorPreference, operation: operation)
632+
}
591633
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
public protocol ConnectionPoolExecutor: AnyObject, Sendable {
2+
associatedtype ID: Hashable, Sendable
3+
4+
var id: ID { get }
5+
6+
static func getExecutorID() -> Self.ID?
7+
}
8+
9+
public final class NothingConnectionPoolExecutor: ConnectionPoolExecutor {
10+
public typealias ID = ObjectIdentifier
11+
12+
public var id: ObjectIdentifier { ObjectIdentifier(self) }
13+
14+
public static func getExecutorID() -> ObjectIdentifier? { nil }
15+
}

0 commit comments

Comments
 (0)