Skip to content

Commit 24571eb

Browse files
authored
Fix connections not being destroyed when released to purged pool (#823)
This scenario happens when the pull for a given address is purged while a connection stills in use and then another pool for the same address is created. In this case, the connection was being wrongly added to the existing pool. The correct behaviour is destroy this orphan connections.
1 parent beab63c commit 24571eb

File tree

2 files changed

+78
-22
lines changed

2 files changed

+78
-22
lines changed

packages/bolt-connection/src/pool/pool.js

+26-4
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ class Pool {
6262
this._pendingCreates = {}
6363
this._acquireRequests = {}
6464
this._activeResourceCounts = {}
65+
this._poolState = {}
6566
this._release = this._release.bind(this)
6667
this._log = log
6768
this._closed = false
@@ -189,10 +190,13 @@ class Pool {
189190

190191
const key = address.asKey()
191192
let pool = this._pools[key]
193+
let poolState = this._poolState[key]
192194
if (!pool) {
193195
pool = []
196+
poolState = new PoolState()
194197
this._pools[key] = pool
195198
this._pendingCreates[key] = 0
199+
this._poolState[key] = poolState
196200
}
197201
while (pool.length) {
198202
const resource = pool.pop()
@@ -231,7 +235,7 @@ class Pool {
231235
let resource
232236
try {
233237
// Invoke callback that creates actual connection
234-
resource = await this._create(address, this._release)
238+
resource = await this._create(address, (address, resource) => this._release(poolState, address, resource))
235239

236240
resourceAcquired(key, this._activeResourceCounts)
237241
if (this._log.isDebugEnabled()) {
@@ -243,11 +247,11 @@ class Pool {
243247
return resource
244248
}
245249

246-
async _release (address, resource) {
250+
async _release (poolState, address, resource) {
247251
const key = address.asKey()
248252
const pool = this._pools[key]
249253

250-
if (pool) {
254+
if (pool && poolState.isActive()) {
251255
// there exist idle connections for the given key
252256
if (!this._validate(resource)) {
253257
if (this._log.isDebugEnabled()) {
@@ -295,19 +299,23 @@ class Pool {
295299

296300
async _purgeKey (key) {
297301
const pool = this._pools[key] || []
302+
const poolState = this._poolState[key] || new PoolState()
298303
while (pool.length) {
299304
const resource = pool.pop()
300305
if (this._removeIdleObserver) {
301306
this._removeIdleObserver(resource)
302307
}
303308
await this._destroy(resource)
304309
}
310+
poolState.close()
305311
delete this._pools[key]
312+
delete this._poolState[key]
306313
}
307314

308315
_processPendingAcquireRequests (address) {
309316
const key = address.asKey()
310317
const requests = this._acquireRequests[key]
318+
const poolState = this._poolState[key]
311319
if (requests) {
312320
const pendingRequest = requests.shift() // pop a pending acquire request
313321

@@ -326,7 +334,7 @@ class Pool {
326334
if (pendingRequest.isCompleted()) {
327335
// request has been completed, most likely failed by a timeout
328336
// return the acquired resource back to the pool
329-
this._release(address, resource)
337+
this._release(poolState, address, resource)
330338
} else {
331339
// request is still pending and can be resolved with the newly acquired resource
332340
pendingRequest.resolve(resource) // resolve the pending request with the acquired resource
@@ -404,4 +412,18 @@ class PendingRequest {
404412
}
405413
}
406414

415+
class PoolState {
416+
constructor() {
417+
this._active = true;
418+
}
419+
420+
isActive() {
421+
return this._active;
422+
}
423+
424+
close() {
425+
this._active = false;
426+
}
427+
}
428+
407429
export default Pool

packages/neo4j-driver/test/internal/pool.test.js renamed to packages/bolt-connection/test/pool/pool.test.js

+52-18
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
* limitations under the License.
1818
*/
1919

20-
import Pool from '../../../bolt-connection/lib/pool/pool'
21-
import PoolConfig from '../../../bolt-connection/lib/pool/pool-config'
20+
import Pool from '../../src/pool/pool'
21+
import PoolConfig from '../../src/pool/pool-config'
2222
import { newError, error, internal } from 'neo4j-driver-core'
2323

2424
const {
@@ -27,7 +27,7 @@ const {
2727

2828
const { SERVICE_UNAVAILABLE } = error
2929

30-
describe('#unit Pool', async () => {
30+
describe('#unit Pool', () => {
3131
it('allocates if pool is empty', async () => {
3232
// Given
3333
let counter = 0
@@ -237,6 +237,44 @@ describe('#unit Pool', async () => {
237237
expect(r0.destroyed).toBeTruthy()
238238
})
239239

240+
it('destroys resource when pool is purged even if a new pool is created for the same address', async () => {
241+
let counter = 0
242+
const address = ServerAddress.fromUrl('bolt://localhost:7687')
243+
const pool = new Pool({
244+
create: (server, release) =>
245+
Promise.resolve(new Resource(server, counter++, release)),
246+
destroy: res => {
247+
res.destroyed = true
248+
return Promise.resolve()
249+
}
250+
})
251+
252+
// Acquire resource
253+
const r0 = await pool.acquire(address)
254+
expect(pool.has(address)).toBeTruthy()
255+
expect(r0.id).toEqual(0)
256+
257+
// Purging the key
258+
await pool.purge(address)
259+
expect(pool.has(address)).toBeFalsy()
260+
expect(r0.destroyed).toBeFalsy()
261+
262+
// Acquiring second resource should recreate the pool
263+
const r1 = await pool.acquire(address)
264+
expect(pool.has(address)).toBeTruthy()
265+
expect(r1.id).toEqual(1)
266+
267+
// Closing the first resource should destroy it
268+
await r0.close()
269+
expect(pool.has(address)).toBeTruthy()
270+
expect(r0.destroyed).toBeTruthy()
271+
272+
// Closing the second resource should not destroy it
273+
await r1.close()
274+
expect(pool.has(address)).toBeTruthy()
275+
expect(r1.destroyed).toBeFalsy()
276+
})
277+
240278
it('close purges all keys', async () => {
241279
let counter = 0
242280

@@ -282,11 +320,9 @@ describe('#unit Pool', async () => {
282320
// Close the pool
283321
await pool.close()
284322

285-
await expectAsync(pool.acquire(address)).toBeRejectedWith(
286-
jasmine.objectContaining({
287-
message: jasmine.stringMatching(/Pool is closed/)
288-
})
289-
)
323+
await expect(pool.acquire(address)).rejects.toMatchObject({
324+
message: expect.stringMatching('Pool is closed')
325+
})
290326
})
291327

292328
it('should fail to acquire when closed with idle connections', async () => {
@@ -307,11 +343,9 @@ describe('#unit Pool', async () => {
307343
// Close the pool
308344
await pool.close()
309345

310-
await expectAsync(pool.acquire(address)).toBeRejectedWith(
311-
jasmine.objectContaining({
312-
message: jasmine.stringMatching(/Pool is closed/)
313-
})
314-
)
346+
await expect(pool.acquire(address)).rejects.toMatchObject({
347+
message: expect.stringMatching('Pool is closed')
348+
})
315349
})
316350
it('purges keys other than the ones to keep', async () => {
317351
let counter = 0
@@ -561,9 +595,9 @@ describe('#unit Pool', async () => {
561595
await pool.acquire(address)
562596
await pool.acquire(address)
563597

564-
await expectAsync(pool.acquire(address)).toBeRejectedWith(
565-
jasmine.stringMatching('acquisition timed out')
566-
)
598+
await expect(pool.acquire(address)).rejects.toMatchObject({
599+
message: expect.stringMatching('acquisition timed out')
600+
})
567601
expectNumberOfAcquisitionRequests(pool, address, 0)
568602
})
569603

@@ -607,11 +641,11 @@ describe('#unit Pool', async () => {
607641

608642
// Let's fulfill the connect promise belonging to the first request.
609643
conns[0].resolve(conns[0])
610-
await expectAsync(req1).toBeResolved()
644+
await expect(req1).resolves.toBeDefined()
611645

612646
// Release the connection, it should be picked up by the second request.
613647
conns[0].release(address, conns[0])
614-
await expectAsync(req2).toBeResolved()
648+
await expect(req2).resolves.toBeDefined()
615649

616650
// Just to make sure that there hasn't been any new connection.
617651
expect(conns.length).toEqual(1)

0 commit comments

Comments
 (0)