Skip to content

feat(NODE-6882): close outstanding connections #4499

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
May 2, 2025
4 changes: 2 additions & 2 deletions src/client-side-encryption/auto_encrypter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -375,8 +375,8 @@ export class AutoEncrypter {
/**
* Cleans up the `_mongocryptdClient`, if present.
*/
async teardown(force: boolean): Promise<void> {
await this._mongocryptdClient?.close(force);
async close(): Promise<void> {
await this._mongocryptdClient?.close();
}

/**
Expand Down
9 changes: 8 additions & 1 deletion src/cmap/connection_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
} from '../constants';
import {
type AnyError,
MongoClientClosedError,
type MongoError,
MongoInvalidArgumentError,
MongoMissingCredentialsError,
Expand Down Expand Up @@ -484,11 +485,17 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
for (const connection of this.checkedOut) {
if (connection.generation <= minGeneration) {
connection.onError(new PoolClearedOnNetworkError(this));
this.checkIn(connection);
}
}
}

/** For MongoClient.close() procedures */
public closeCheckedOutConnections() {
for (const conn of this.checkedOut) {
conn.onError(new MongoClientClosedError());
}
}

/** Close the pool */
close(): void {
if (this.closed) {
Expand Down
13 changes: 3 additions & 10 deletions src/encrypter.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
import { callbackify } from 'util';

import { AutoEncrypter, type AutoEncryptionOptions } from './client-side-encryption/auto_encrypter';
import { MONGO_CLIENT_EVENTS } from './constants';
import { getMongoDBClientEncryption } from './deps';
import { MongoInvalidArgumentError, MongoMissingDependencyError } from './error';
import { MongoClient, type MongoClientOptions } from './mongo_client';
import { type Callback } from './utils';

/** @internal */
export interface EncrypterOptions {
Expand Down Expand Up @@ -98,20 +95,16 @@ export class Encrypter {
}
}

closeCallback(client: MongoClient, force: boolean, callback: Callback<void>) {
callbackify(this.close.bind(this))(client, force, callback);
}

async close(client: MongoClient, force: boolean): Promise<void> {
async close(client: MongoClient): Promise<void> {
let error;
try {
await this.autoEncrypter.teardown(force);
await this.autoEncrypter.close();
} catch (autoEncrypterError) {
error = autoEncrypterError;
}
const internalClient = this.internalClient;
if (internalClient != null && client !== internalClient) {
return await internalClient.close(force);
return await internalClient.close();
}
if (error != null) {
throw error;
Expand Down
28 changes: 28 additions & 0 deletions src/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1018,6 +1018,34 @@ export class MongoTopologyClosedError extends MongoAPIError {
}
}

/**
* An error generated when the MongoClient is closed and async
* operations are interrupted.
*
* @public
* @category Error
*/
export class MongoClientClosedError extends MongoAPIError {
/**
* **Do not use this constructor!**
*
* Meant for internal use only.
*
* @remarks
* This class is only meant to be constructed within the driver. This constructor is
* not subject to semantic versioning compatibility guarantees and may change at any time.
*
* @public
**/
constructor() {
super('Operation interrupted because client was closed');
}

override get name(): string {
return 'MongoClientClosedError';
}
}

/** @public */
export interface MongoNetworkErrorOptions {
/** Indicates the timeout happened before a connection handshake completed */
Expand Down
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ export {
MongoClientBulkWriteCursorError,
MongoClientBulkWriteError,
MongoClientBulkWriteExecutionError,
MongoClientClosedError,
MongoCompatibilityError,
MongoCursorExhaustedError,
MongoCursorInUseError,
Expand Down
24 changes: 14 additions & 10 deletions src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -641,25 +641,27 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
}

/**
* Cleans up client-side resources used by the MongoCLient and . This includes:
* Cleans up client-side resources used by the MongoClient.
*
* - Closes all open, unused connections (see note).
* This includes:
*
* - Closes in-use connections.
* - Closes all active cursors.
* - Ends all in-use sessions with {@link ClientSession#endSession|ClientSession.endSession()}.
* - aborts in progress transactions if is one related to the session.
* - Ends all unused sessions server-side.
* - Closes all remaining idle connections.
* - Cleans up any resources being used for auto encryption if auto encryption is enabled.
*
* @remarks Any in-progress operations are not killed and any connections used by in progress operations
* will be cleaned up lazily as operations finish.
*
* @param force - Force close, emitting no events
* @param _force - currently an unused flag that has no effect. Defaults to `false`.
*/
async close(force = false): Promise<void> {
async close(_force = false): Promise<void> {
if (this.closeLock) {
return await this.closeLock;
}

try {
this.closeLock = this._close(force);
this.closeLock = this._close();
await this.closeLock;
} finally {
// release
Expand All @@ -668,7 +670,7 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
}

/* @internal */
private async _close(force = false): Promise<void> {
private async _close(): Promise<void> {
// There's no way to set hasBeenClosed back to false
Object.defineProperty(this.s, 'hasBeenClosed', {
value: true,
Expand All @@ -677,6 +679,8 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
writable: false
});

this.topology?.closeCheckedOutConnections();

const activeCursorCloses = Array.from(this.s.activeCursors, cursor => cursor.close());
this.s.activeCursors.clear();

Expand Down Expand Up @@ -722,7 +726,7 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements

const { encrypter } = this.options;
if (encrypter) {
await encrypter.close(this, force);
await encrypter.close(this);
}
}

Expand Down
6 changes: 5 additions & 1 deletion src/sdam/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,12 @@ export class Server extends TypedEventEmitter<ServerEvents> {
}
}

closeCheckedOutConnections() {
return this.pool.closeCheckedOutConnections();
}

/** Destroy the server connection */
destroy(): void {
close(): void {
if (this.s.state === STATE_CLOSED) {
return;
}
Expand Down
14 changes: 10 additions & 4 deletions src/sdam/topology.ts
Original file line number Diff line number Diff line change
Expand Up @@ -490,14 +490,20 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
}
}

closeCheckedOutConnections() {
for (const server of this.s.servers.values()) {
return server.closeCheckedOutConnections();
}
}

/** Close this topology */
close(): void {
if (this.s.state === STATE_CLOSED || this.s.state === STATE_CLOSING) {
return;
}

for (const server of this.s.servers.values()) {
destroyServer(server, this);
closeServer(server, this);
}

this.s.servers.clear();
Expand Down Expand Up @@ -791,12 +797,12 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
}

/** Destroys a server, and removes all event listeners from the instance */
function destroyServer(server: Server, topology: Topology) {
function closeServer(server: Server, topology: Topology) {
for (const event of LOCAL_SERVER_EVENTS) {
server.removeAllListeners(event);
}

server.destroy();
server.close();
topology.emitAndLog(
Topology.SERVER_CLOSED,
new ServerClosedEvent(topology.s.id, server.description.address)
Expand Down Expand Up @@ -903,7 +909,7 @@ function updateServers(topology: Topology, incomingServerDescription?: ServerDes

// prepare server for garbage collection
if (server) {
destroyServer(server, topology);
closeServer(server, topology);
}
}
}
Expand Down
11 changes: 8 additions & 3 deletions test/integration/change-streams/change_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ describe('Change Streams', function () {
await csDb.createCollection('test').catch(() => null);
collection = csDb.collection('test');
changeStream = collection.watch();
changeStream.on('error', () => null);
});

afterEach(async () => {
Expand Down Expand Up @@ -702,15 +703,19 @@ describe('Change Streams', function () {

const outStream = new PassThrough({ objectMode: true });

// @ts-expect-error: transform requires a Document return type
changeStream.stream({ transform: JSON.stringify }).pipe(outStream);
const transform = doc => ({ doc: JSON.stringify(doc) });
changeStream
.stream({ transform })
.on('error', () => null)
.pipe(outStream)
.on('error', () => null);

const willBeData = once(outStream, 'data');

await collection.insertMany([{ a: 1 }]);

const [data] = await willBeData;
const parsedEvent = JSON.parse(data);
const parsedEvent = JSON.parse(data.doc);
expect(parsedEvent).to.have.nested.property('fullDocument.a', 1);

outStream.destroy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { once } from 'node:events';
import { expect } from 'chai';

import { type ConnectionPoolCreatedEvent, type Db, type MongoClient } from '../../mongodb';
import { clearFailPoint, configureFailPoint, sleep } from '../../tools/utils';

describe('Connection Pool', function () {
let client: MongoClient;
Expand Down Expand Up @@ -64,5 +65,89 @@ describe('Connection Pool', function () {
});
});
});

const metadata: MongoDBMetadataUI = { requires: { mongodb: '>=4.4', topology: 'single' } };

describe('ConnectionCheckedInEvent', metadata, function () {
let client: MongoClient;

beforeEach(async function () {
if (!this.configuration.filters.MongoDBVersionFilter.filter({ metadata })) {
return;
}
if (!this.configuration.filters.MongoDBTopologyFilter.filter({ metadata })) {
return;
}

await configureFailPoint(this.configuration, {
configureFailPoint: 'failCommand',
mode: 'alwaysOn',
data: {
failCommands: ['insert'],
blockConnection: true,
blockTimeMS: 500
}
});

client = this.configuration.newClient();
await client.connect();
await Promise.all(Array.from({ length: 100 }, () => client.db().command({ ping: 1 })));
});

afterEach(async function () {
if (this.configuration.filters.MongoDBVersionFilter.filter({ metadata })) {
await clearFailPoint(this.configuration);
}
await client.close();
});

describe('when a MongoClient is closed', function () {
it(
'a connection pool emits checked in events for closed connections',
metadata,
async () => {
const allClientEvents = [];
const pushToClientEvents = e => allClientEvents.push(e);

client
.on('connectionCheckedOut', pushToClientEvents)
.on('connectionCheckedIn', pushToClientEvents)
.on('connectionClosed', pushToClientEvents);

const inserts = Promise.allSettled([
client.db('test').collection('test').insertOne({ a: 1 }),
client.db('test').collection('test').insertOne({ a: 1 }),
client.db('test').collection('test').insertOne({ a: 1 })
]);

// wait until all pings are pending on the server
while (allClientEvents.filter(e => e.name === 'connectionCheckedOut').length < 3) {
await sleep(1);
}

const insertConnectionIds = allClientEvents
.filter(e => e.name === 'connectionCheckedOut')
.map(({ address, connectionId }) => `${address} + ${connectionId}`);

await client.close();

const insertCheckInAndCloses = allClientEvents
.filter(e => e.name === 'connectionCheckedIn' || e.name === 'connectionClosed')
.filter(({ address, connectionId }) =>
insertConnectionIds.includes(`${address} + ${connectionId}`)
);

expect(insertCheckInAndCloses).to.have.lengthOf(6);

// check that each check-in is followed by a close (not proceeded by one)
expect(insertCheckInAndCloses.map(e => e.name)).to.deep.equal(
Array.from({ length: 3 }, () => ['connectionCheckedIn', 'connectionClosed']).flat(1)
);

await inserts;
}
);
});
});
});
});
Loading