Skip to content

Commit 64fdb3e

Browse files
authored
feat(NODE-6882): eagerly close checked out connections when client is closed (#4499)
1 parent f57c51b commit 64fdb3e

File tree

15 files changed

+268
-66
lines changed

15 files changed

+268
-66
lines changed

src/client-side-encryption/auto_encrypter.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -375,8 +375,8 @@ export class AutoEncrypter {
375375
/**
376376
* Cleans up the `_mongocryptdClient`, if present.
377377
*/
378-
async teardown(force: boolean): Promise<void> {
379-
await this._mongocryptdClient?.close(force);
378+
async close(): Promise<void> {
379+
await this._mongocryptdClient?.close();
380380
}
381381

382382
/**

src/cmap/connection_pool.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import {
1717
} from '../constants';
1818
import {
1919
type AnyError,
20+
MongoClientClosedError,
2021
type MongoError,
2122
MongoInvalidArgumentError,
2223
MongoMissingCredentialsError,
@@ -484,11 +485,17 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
484485
for (const connection of this.checkedOut) {
485486
if (connection.generation <= minGeneration) {
486487
connection.onError(new PoolClearedOnNetworkError(this));
487-
this.checkIn(connection);
488488
}
489489
}
490490
}
491491

492+
/** For MongoClient.close() procedures */
493+
public closeCheckedOutConnections() {
494+
for (const conn of this.checkedOut) {
495+
conn.onError(new MongoClientClosedError());
496+
}
497+
}
498+
492499
/** Close the pool */
493500
close(): void {
494501
if (this.closed) {

src/encrypter.ts

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,8 @@
1-
import { callbackify } from 'util';
2-
31
import { AutoEncrypter, type AutoEncryptionOptions } from './client-side-encryption/auto_encrypter';
42
import { MONGO_CLIENT_EVENTS } from './constants';
53
import { getMongoDBClientEncryption } from './deps';
64
import { MongoInvalidArgumentError, MongoMissingDependencyError } from './error';
75
import { MongoClient, type MongoClientOptions } from './mongo_client';
8-
import { type Callback } from './utils';
96

107
/** @internal */
118
export interface EncrypterOptions {
@@ -98,20 +95,16 @@ export class Encrypter {
9895
}
9996
}
10097

101-
closeCallback(client: MongoClient, force: boolean, callback: Callback<void>) {
102-
callbackify(this.close.bind(this))(client, force, callback);
103-
}
104-
105-
async close(client: MongoClient, force: boolean): Promise<void> {
98+
async close(client: MongoClient): Promise<void> {
10699
let error;
107100
try {
108-
await this.autoEncrypter.teardown(force);
101+
await this.autoEncrypter.close();
109102
} catch (autoEncrypterError) {
110103
error = autoEncrypterError;
111104
}
112105
const internalClient = this.internalClient;
113106
if (internalClient != null && client !== internalClient) {
114-
return await internalClient.close(force);
107+
return await internalClient.close();
115108
}
116109
if (error != null) {
117110
throw error;

src/error.ts

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1018,6 +1018,34 @@ export class MongoTopologyClosedError extends MongoAPIError {
10181018
}
10191019
}
10201020

1021+
/**
1022+
* An error generated when the MongoClient is closed and async
1023+
* operations are interrupted.
1024+
*
1025+
* @public
1026+
* @category Error
1027+
*/
1028+
export class MongoClientClosedError extends MongoAPIError {
1029+
/**
1030+
* **Do not use this constructor!**
1031+
*
1032+
* Meant for internal use only.
1033+
*
1034+
* @remarks
1035+
* This class is only meant to be constructed within the driver. This constructor is
1036+
* not subject to semantic versioning compatibility guarantees and may change at any time.
1037+
*
1038+
* @public
1039+
**/
1040+
constructor() {
1041+
super('Operation interrupted because client was closed');
1042+
}
1043+
1044+
override get name(): string {
1045+
return 'MongoClientClosedError';
1046+
}
1047+
}
1048+
10211049
/** @public */
10221050
export interface MongoNetworkErrorOptions {
10231051
/** Indicates the timeout happened before a connection handshake completed */

src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ export {
5353
MongoClientBulkWriteCursorError,
5454
MongoClientBulkWriteError,
5555
MongoClientBulkWriteExecutionError,
56+
MongoClientClosedError,
5657
MongoCompatibilityError,
5758
MongoCursorExhaustedError,
5859
MongoCursorInUseError,

src/mongo_client.ts

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -641,25 +641,27 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
641641
}
642642

643643
/**
644-
* Cleans up client-side resources used by the MongoCLient and . This includes:
644+
* Cleans up client-side resources used by the MongoClient.
645645
*
646-
* - Closes all open, unused connections (see note).
646+
* This includes:
647+
*
648+
* - Closes in-use connections.
649+
* - Closes all active cursors.
647650
* - Ends all in-use sessions with {@link ClientSession#endSession|ClientSession.endSession()}.
651+
* - aborts in progress transactions if is one related to the session.
648652
* - Ends all unused sessions server-side.
653+
* - Closes all remaining idle connections.
649654
* - Cleans up any resources being used for auto encryption if auto encryption is enabled.
650655
*
651-
* @remarks Any in-progress operations are not killed and any connections used by in progress operations
652-
* will be cleaned up lazily as operations finish.
653-
*
654-
* @param force - Force close, emitting no events
656+
* @param _force - currently an unused flag that has no effect. Defaults to `false`.
655657
*/
656-
async close(force = false): Promise<void> {
658+
async close(_force = false): Promise<void> {
657659
if (this.closeLock) {
658660
return await this.closeLock;
659661
}
660662

661663
try {
662-
this.closeLock = this._close(force);
664+
this.closeLock = this._close();
663665
await this.closeLock;
664666
} finally {
665667
// release
@@ -668,7 +670,7 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
668670
}
669671

670672
/* @internal */
671-
private async _close(force = false): Promise<void> {
673+
private async _close(): Promise<void> {
672674
// There's no way to set hasBeenClosed back to false
673675
Object.defineProperty(this.s, 'hasBeenClosed', {
674676
value: true,
@@ -677,6 +679,8 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
677679
writable: false
678680
});
679681

682+
this.topology?.closeCheckedOutConnections();
683+
680684
const activeCursorCloses = Array.from(this.s.activeCursors, cursor => cursor.close());
681685
this.s.activeCursors.clear();
682686

@@ -722,7 +726,7 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
722726

723727
const { encrypter } = this.options;
724728
if (encrypter) {
725-
await encrypter.close(this, force);
729+
await encrypter.close(this);
726730
}
727731
}
728732

src/sdam/server.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -246,8 +246,12 @@ export class Server extends TypedEventEmitter<ServerEvents> {
246246
}
247247
}
248248

249+
closeCheckedOutConnections() {
250+
return this.pool.closeCheckedOutConnections();
251+
}
252+
249253
/** Destroy the server connection */
250-
destroy(): void {
254+
close(): void {
251255
if (this.s.state === STATE_CLOSED) {
252256
return;
253257
}

src/sdam/topology.ts

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -490,14 +490,20 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
490490
}
491491
}
492492

493+
closeCheckedOutConnections() {
494+
for (const server of this.s.servers.values()) {
495+
return server.closeCheckedOutConnections();
496+
}
497+
}
498+
493499
/** Close this topology */
494500
close(): void {
495501
if (this.s.state === STATE_CLOSED || this.s.state === STATE_CLOSING) {
496502
return;
497503
}
498504

499505
for (const server of this.s.servers.values()) {
500-
destroyServer(server, this);
506+
closeServer(server, this);
501507
}
502508

503509
this.s.servers.clear();
@@ -791,12 +797,12 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
791797
}
792798

793799
/** Destroys a server, and removes all event listeners from the instance */
794-
function destroyServer(server: Server, topology: Topology) {
800+
function closeServer(server: Server, topology: Topology) {
795801
for (const event of LOCAL_SERVER_EVENTS) {
796802
server.removeAllListeners(event);
797803
}
798804

799-
server.destroy();
805+
server.close();
800806
topology.emitAndLog(
801807
Topology.SERVER_CLOSED,
802808
new ServerClosedEvent(topology.s.id, server.description.address)
@@ -903,7 +909,7 @@ function updateServers(topology: Topology, incomingServerDescription?: ServerDes
903909

904910
// prepare server for garbage collection
905911
if (server) {
906-
destroyServer(server, topology);
912+
closeServer(server, topology);
907913
}
908914
}
909915
}

test/integration/change-streams/change_stream.test.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ describe('Change Streams', function () {
6363
await csDb.createCollection('test').catch(() => null);
6464
collection = csDb.collection('test');
6565
changeStream = collection.watch();
66+
changeStream.on('error', () => null);
6667
});
6768

6869
afterEach(async () => {
@@ -702,15 +703,19 @@ describe('Change Streams', function () {
702703

703704
const outStream = new PassThrough({ objectMode: true });
704705

705-
// @ts-expect-error: transform requires a Document return type
706-
changeStream.stream({ transform: JSON.stringify }).pipe(outStream);
706+
const transform = doc => ({ doc: JSON.stringify(doc) });
707+
changeStream
708+
.stream({ transform })
709+
.on('error', () => null)
710+
.pipe(outStream)
711+
.on('error', () => null);
707712

708713
const willBeData = once(outStream, 'data');
709714

710715
await collection.insertMany([{ a: 1 }]);
711716

712717
const [data] = await willBeData;
713-
const parsedEvent = JSON.parse(data);
718+
const parsedEvent = JSON.parse(data.doc);
714719
expect(parsedEvent).to.have.nested.property('fullDocument.a', 1);
715720

716721
outStream.destroy();

test/integration/connection-monitoring-and-pooling/connection_pool.test.ts

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { once } from 'node:events';
33
import { expect } from 'chai';
44

55
import { type ConnectionPoolCreatedEvent, type Db, type MongoClient } from '../../mongodb';
6+
import { clearFailPoint, configureFailPoint, sleep } from '../../tools/utils';
67

78
describe('Connection Pool', function () {
89
let client: MongoClient;
@@ -64,5 +65,89 @@ describe('Connection Pool', function () {
6465
});
6566
});
6667
});
68+
69+
const metadata: MongoDBMetadataUI = { requires: { mongodb: '>=4.4', topology: 'single' } };
70+
71+
describe('ConnectionCheckedInEvent', metadata, function () {
72+
let client: MongoClient;
73+
74+
beforeEach(async function () {
75+
if (!this.configuration.filters.MongoDBVersionFilter.filter({ metadata })) {
76+
return;
77+
}
78+
if (!this.configuration.filters.MongoDBTopologyFilter.filter({ metadata })) {
79+
return;
80+
}
81+
82+
await configureFailPoint(this.configuration, {
83+
configureFailPoint: 'failCommand',
84+
mode: 'alwaysOn',
85+
data: {
86+
failCommands: ['insert'],
87+
blockConnection: true,
88+
blockTimeMS: 500
89+
}
90+
});
91+
92+
client = this.configuration.newClient();
93+
await client.connect();
94+
await Promise.all(Array.from({ length: 100 }, () => client.db().command({ ping: 1 })));
95+
});
96+
97+
afterEach(async function () {
98+
if (this.configuration.filters.MongoDBVersionFilter.filter({ metadata })) {
99+
await clearFailPoint(this.configuration);
100+
}
101+
await client.close();
102+
});
103+
104+
describe('when a MongoClient is closed', function () {
105+
it(
106+
'a connection pool emits checked in events for closed connections',
107+
metadata,
108+
async () => {
109+
const allClientEvents = [];
110+
const pushToClientEvents = e => allClientEvents.push(e);
111+
112+
client
113+
.on('connectionCheckedOut', pushToClientEvents)
114+
.on('connectionCheckedIn', pushToClientEvents)
115+
.on('connectionClosed', pushToClientEvents);
116+
117+
const inserts = Promise.allSettled([
118+
client.db('test').collection('test').insertOne({ a: 1 }),
119+
client.db('test').collection('test').insertOne({ a: 1 }),
120+
client.db('test').collection('test').insertOne({ a: 1 })
121+
]);
122+
123+
// wait until all pings are pending on the server
124+
while (allClientEvents.filter(e => e.name === 'connectionCheckedOut').length < 3) {
125+
await sleep(1);
126+
}
127+
128+
const insertConnectionIds = allClientEvents
129+
.filter(e => e.name === 'connectionCheckedOut')
130+
.map(({ address, connectionId }) => `${address} + ${connectionId}`);
131+
132+
await client.close();
133+
134+
const insertCheckInAndCloses = allClientEvents
135+
.filter(e => e.name === 'connectionCheckedIn' || e.name === 'connectionClosed')
136+
.filter(({ address, connectionId }) =>
137+
insertConnectionIds.includes(`${address} + ${connectionId}`)
138+
);
139+
140+
expect(insertCheckInAndCloses).to.have.lengthOf(6);
141+
142+
// check that each check-in is followed by a close (not proceeded by one)
143+
expect(insertCheckInAndCloses.map(e => e.name)).to.deep.equal(
144+
Array.from({ length: 3 }, () => ['connectionCheckedIn', 'connectionClosed']).flat(1)
145+
);
146+
147+
await inserts;
148+
}
149+
);
150+
});
151+
});
67152
});
68153
});

0 commit comments

Comments
 (0)