Skip to content

Commit a5a087a

Browse files
committed
Further improvements
1 parent 70edbed commit a5a087a

14 files changed

+358
-113
lines changed

Benchmarks/Benchmarks/ConnectionPoolBenchmarks/ConnectionPoolBenchmarks.swift

+133-8
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,27 @@
11
import _ConnectionPoolModule
22
import _ConnectionPoolTestUtils
33
import Benchmark
4+
import NIOCore
5+
import NIOPosix
46

57
let benchmarks: @Sendable () -> Void = {
6-
Benchmark("Lease/Release 1k requests: 50 parallel", configuration: .init(scalingFactor: .kilo)) { benchmark in
8+
Benchmark("Pool: Lease/Release 1k requests: 50 parallel", configuration: .init(scalingFactor: .kilo)) { benchmark in
79
let clock = MockClock()
8-
let factory = MockConnectionFactory<MockClock>(autoMaxStreams: 1)
10+
let factory = MockConnectionFactory<MockClock, MockExecutor>(autoMaxStreams: 1)
911
var configuration = ConnectionPoolConfiguration()
1012
configuration.maximumConnectionSoftLimit = 50
1113
configuration.maximumConnectionHardLimit = 50
1214

1315
let pool = ConnectionPool(
1416
configuration: configuration,
17+
connectionConfiguration: MockConnectionConfiguration(username: "username", password: "password"),
1518
idGenerator: ConnectionIDGenerator(),
1619
keepAliveBehavior: MockPingPongBehavior(keepAliveFrequency: nil, connectionType: MockConnection.self),
17-
observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self),
20+
executor: MockExecutor(),
21+
observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection<MockExecutor>.ID.self),
1822
clock: clock
1923
) {
20-
try await factory.makeConnection(id: $0, for: $1)
24+
try await factory.makeConnection(id: $0, configuration: $1, for: $2)
2125
}
2226

2327
await withTaskGroup { taskGroup in
@@ -54,21 +58,23 @@ let benchmarks: @Sendable () -> Void = {
5458
}
5559
}
5660

57-
Benchmark("Lease/Release 1k requests: sequential", configuration: .init(scalingFactor: .kilo)) { benchmark in
61+
Benchmark("Pool: Lease/Release 1k requests: sequential", configuration: .init(scalingFactor: .kilo)) { benchmark in
5862
let clock = MockClock()
59-
let factory = MockConnectionFactory<MockClock>(autoMaxStreams: 1)
63+
let factory = MockConnectionFactory<MockClock, MockExecutor>(autoMaxStreams: 1)
6064
var configuration = ConnectionPoolConfiguration()
6165
configuration.maximumConnectionSoftLimit = 50
6266
configuration.maximumConnectionHardLimit = 50
6367

6468
let pool = ConnectionPool(
6569
configuration: configuration,
70+
connectionConfiguration: MockConnectionConfiguration(username: "username", password: "password"),
6671
idGenerator: ConnectionIDGenerator(),
6772
keepAliveBehavior: MockPingPongBehavior(keepAliveFrequency: nil, connectionType: MockConnection.self),
68-
observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self),
73+
executor: MockExecutor(),
74+
observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection<MockExecutor>.ID.self),
6975
clock: clock
7076
) {
71-
try await factory.makeConnection(id: $0, for: $1)
77+
try await factory.makeConnection(id: $0, configuration: $1, for: $2)
7278
}
7379

7480
await withTaskGroup { taskGroup in
@@ -96,4 +102,123 @@ let benchmarks: @Sendable () -> Void = {
96102
taskGroup.cancelAll()
97103
}
98104
}
105+
106+
Benchmark("PoolManager/TaskExecutor: Lease/Release 1k requests: 50 parallel – 10 MockExecutors", configuration: .init(scalingFactor: .kilo)) { benchmark in
107+
let clock = MockClock()
108+
let factory = MockConnectionFactory<MockClock, MockExecutor>(autoMaxStreams: 1)
109+
var configuration = ConnectionPoolManagerConfiguration()
110+
let executorCount = 10
111+
let executors = (0..<executorCount).map { _ in MockExecutor() }
112+
113+
let concurrency = 50
114+
115+
configuration.maximumConnectionPerExecutorSoftLimit = concurrency / executorCount
116+
configuration.maximumConnectionPerExecutorHardLimit = concurrency / executorCount
117+
118+
let pool = ConnectionPoolManager(
119+
configuration: configuration,
120+
connectionConfiguration: MockConnectionConfiguration(username: "username", password: "password"),
121+
idGenerator: ConnectionIDGenerator(),
122+
requestType: ConnectionRequest<MockConnection>.self,
123+
keepAliveBehavior: MockPingPongBehavior(keepAliveFrequency: nil, connectionType: MockConnection.self),
124+
executors: executors,
125+
observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection<MockExecutor>.ID.self),
126+
clock: clock
127+
) {
128+
try await factory.makeConnection(id: $0, configuration: $1, for: $2)
129+
}
130+
131+
await withTaskGroup { taskGroup in
132+
taskGroup.addTask {
133+
await pool.run()
134+
}
135+
136+
let sequential = benchmark.scaledIterations.upperBound / concurrency
137+
138+
benchmark.startMeasurement()
139+
140+
for parallel in 0..<concurrency {
141+
taskGroup.addTask {
142+
for _ in 0..<sequential {
143+
do {
144+
try await pool.withConnection { connection in
145+
blackHole(connection)
146+
}
147+
} catch {
148+
fatalError()
149+
}
150+
}
151+
}
152+
}
153+
154+
for i in 0..<concurrency {
155+
await taskGroup.next()
156+
}
157+
158+
benchmark.stopMeasurement()
159+
160+
taskGroup.cancelAll()
161+
}
162+
}
163+
164+
if #available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *) {
165+
let eventLoops = NIOSingletons.posixEventLoopGroup
166+
let count = eventLoops.makeIterator().reduce(into: 0, { (result, _) in result += 1 })
167+
Benchmark("PoolManager/TaskExecutor: Lease/Release 1k requests: 10 parallel – \(count) NIO executors", configuration: .init(scalingFactor: .kilo)) { benchmark in
168+
let clock = MockClock()
169+
let factory = MockConnectionFactory<MockClock, NIOTaskExecutor>(autoMaxStreams: 1)
170+
var configuration = ConnectionPoolManagerConfiguration()
171+
try await NIOTaskExecutor.withExecutors(eventLoops) { executors in
172+
let concurrency = 50
173+
174+
configuration.maximumConnectionPerExecutorSoftLimit = concurrency / executors.count
175+
configuration.maximumConnectionPerExecutorHardLimit = concurrency / executors.count
176+
177+
let pool = ConnectionPoolManager(
178+
configuration: configuration,
179+
connectionConfiguration: MockConnectionConfiguration(username: "username", password: "password"),
180+
idGenerator: ConnectionIDGenerator(),
181+
requestType: ConnectionRequest<MockConnection>.self,
182+
keepAliveBehavior: MockPingPongBehavior(keepAliveFrequency: nil, connectionType: MockConnection.self),
183+
executors: executors,
184+
observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection<NIOTaskExecutor>.ID.self),
185+
clock: clock
186+
) {
187+
try await factory.makeConnection(id: $0, configuration: $1, for: $2)
188+
}
189+
190+
await withTaskGroup { taskGroup in
191+
taskGroup.addTask {
192+
await pool.run()
193+
}
194+
195+
let sequential = benchmark.scaledIterations.upperBound / executors.count
196+
197+
benchmark.startMeasurement()
198+
199+
for executor in executors {
200+
taskGroup.addTask(executorPreference: executor) {
201+
for _ in 0..<sequential {
202+
do {
203+
try await pool.withConnection { connection in
204+
blackHole(connection)
205+
}
206+
} catch {
207+
fatalError()
208+
}
209+
}
210+
}
211+
}
212+
213+
for _ in executors {
214+
await taskGroup.next()
215+
}
216+
217+
benchmark.stopMeasurement()
218+
219+
taskGroup.cancelAll()
220+
}
221+
}
222+
}
223+
}
99224
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
//
2+
// NIOTaskExecutor.swift
3+
// benchmarks
4+
//
5+
// Created by Fabian Fett on 09.05.25.
6+
//
7+
8+
import NIOCore
9+
import NIOPosix
10+
import _ConnectionPoolModule
11+
12+
@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *)
13+
final class NIOTaskExecutor {
14+
15+
private static let threadSpecificEventLoop = ThreadSpecificVariable<NIOTaskExecutor>()
16+
17+
let eventLoop: any EventLoop
18+
19+
private init(eventLoop: any EventLoop) {
20+
self.eventLoop = eventLoop
21+
}
22+
23+
static func withExecutors(_ eventLoops: MultiThreadedEventLoopGroup, _ body: ([NIOTaskExecutor]) async throws -> ()) async throws {
24+
var executors = [NIOTaskExecutor]()
25+
for eventLoop in eventLoops.makeIterator() {
26+
let executor = NIOTaskExecutor(eventLoop: eventLoop)
27+
try await eventLoop.submit {
28+
NIOTaskExecutor.threadSpecificEventLoop.currentValue = executor
29+
}.get()
30+
executors.append(executor)
31+
}
32+
do {
33+
try await body(executors)
34+
} catch {
35+
36+
}
37+
for eventLoop in eventLoops.makeIterator() {
38+
try await eventLoop.submit {
39+
NIOTaskExecutor.threadSpecificEventLoop.currentValue = nil
40+
}.get()
41+
}
42+
}
43+
}
44+
45+
@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *)
46+
extension NIOTaskExecutor: TaskExecutor {
47+
48+
func enqueue(_ job: consuming ExecutorJob) {
49+
// By default we are just going to use execute to run the job
50+
// this is quite heavy since it allocates the closure for
51+
// every single job.
52+
let unownedJob = UnownedJob(job)
53+
self.eventLoop.execute {
54+
unownedJob.runSynchronously(on: self.asUnownedTaskExecutor())
55+
}
56+
}
57+
58+
func asUnownedTaskExecutor() -> UnownedTaskExecutor {
59+
UnownedTaskExecutor(ordinary: self)
60+
}
61+
}
62+
63+
@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *)
64+
extension NIOTaskExecutor: ConnectionPoolExecutor {
65+
typealias ID = ObjectIdentifier
66+
67+
var id: ObjectIdentifier {
68+
ObjectIdentifier(self)
69+
}
70+
71+
static func getExecutorID() -> ObjectIdentifier? {
72+
self.threadSpecificEventLoop.currentValue?.id
73+
}
74+
}

Benchmarks/Package.swift

+3
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ let package = Package(
1010
dependencies: [
1111
.package(path: "../"),
1212
.package(url: "https://github.com/ordo-one/package-benchmark.git", from: "1.29.0"),
13+
.package(url: "https://github.com/apple/swift-nio.git", from: "2.82.0"),
1314
],
1415
targets: [
1516
.executableTarget(
@@ -18,6 +19,8 @@ let package = Package(
1819
.product(name: "_ConnectionPoolModule", package: "postgres-nio"),
1920
.product(name: "_ConnectionPoolTestUtils", package: "postgres-nio"),
2021
.product(name: "Benchmark", package: "package-benchmark"),
22+
.product(name: "NIOCore", package: "swift-nio"),
23+
.product(name: "NIOPosix", package: "swift-nio"),
2124
],
2225
path: "Benchmarks/ConnectionPoolBenchmarks",
2326
plugins: [

Sources/ConnectionPoolModule/ConnectionLease.swift

+9-6
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,21 @@
55
// Created by Fabian Fett on 05.05.25.
66
//
77

8-
struct ConnectionLease<Connection: PooledConnection> {
8+
public struct ConnectionLease<Connection: PooledConnection>: Sendable {
99

10-
var connection: Connection
11-
var _release: () -> ()
10+
public var connection: Connection
11+
12+
@usableFromInline
13+
let _release: @Sendable () -> ()
1214

13-
init(connection: Connection, release: @escaping () -> Void) {
15+
@inlinable
16+
public init(connection: Connection, release: @escaping @Sendable () -> Void) {
1417
self.connection = connection
1518
self._release = release
1619
}
1720

18-
func release() {
21+
@inlinable
22+
public func release() {
1923
self._release()
2024
}
21-
2225
}

Sources/ConnectionPoolModule/ConnectionPool.swift

+6-2
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public protocol ConnectionRequestProtocol: Sendable {
8888

8989
/// A function that is called with a connection or a
9090
/// `PoolError`.
91-
func complete(with: Result<Connection, ConnectionPoolError>)
91+
func complete(with: Result<ConnectionLease<Connection>, ConnectionPoolError>)
9292
}
9393

9494
@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)
@@ -420,8 +420,11 @@ public final class ConnectionPool<
420420
/*private*/ func runRequestAction(_ action: StateMachine.RequestAction) {
421421
switch action {
422422
case .leaseConnection(let requests, let connection):
423+
let lease = ConnectionLease(connection: connection) {
424+
self.releaseConnection(connection)
425+
}
423426
for request in requests {
424-
request.complete(with: .success(connection))
427+
request.complete(with: .success(lease))
425428
}
426429

427430
case .failRequest(let request, let error):
@@ -521,6 +524,7 @@ public final class ConnectionPool<
521524

522525
@inlinable
523526
/*private*/ func runTimer(_ timer: StateMachine.Timer, in taskGroup: inout some TaskGroupProtocol) {
527+
print("timer: \(timer.connectionID), underlying: \(timer.underlying.usecase)")
524528
self.addTask(into: &taskGroup) { () async -> () in
525529
await withTaskGroup(of: TimerRunResult.self, returning: Void.self) { taskGroup in
526530
taskGroup.addTask {

Sources/ConnectionPoolModule/ConnectionPoolExecutor.swift

+2
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ public protocol ConnectionPoolExecutor: AnyObject, Sendable {
99
public final class NothingConnectionPoolExecutor: ConnectionPoolExecutor {
1010
public typealias ID = ObjectIdentifier
1111

12+
public init() {}
13+
1214
public var id: ObjectIdentifier { ObjectIdentifier(self) }
1315

1416
public static func getExecutorID() -> ObjectIdentifier? { nil }

Sources/ConnectionPoolModule/ConnectionPoolManager.swift

+1
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ public final class ConnectionPoolManager<
8989
@usableFromInline
9090
let eventContinuation: AsyncStream<Actions>.Continuation
9191

92+
@inlinable
9293
public init(
9394
configuration: ConnectionPoolManagerConfiguration,
9495
connectionConfiguration: ConnectionConfiguration,

0 commit comments

Comments
 (0)