Skip to content

Commit e092c95

Browse files
feat: kill in-flight operations when monitor fails
1 parent 90181b1 commit e092c95

File tree

7 files changed

+107
-20
lines changed

7 files changed

+107
-20
lines changed

src/cmap/connection_pool.ts

Lines changed: 73 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,12 @@ import {
4141
ConnectionPoolReadyEvent,
4242
ConnectionReadyEvent
4343
} from './connection_pool_events';
44-
import { PoolClearedError, PoolClosedError, WaitQueueTimeoutError } from './errors';
44+
import {
45+
PoolClearedError,
46+
PoolClearedOnNetworkError,
47+
PoolClosedError,
48+
WaitQueueTimeoutError
49+
} from './errors';
4550
import { ConnectionPoolMetrics } from './metrics';
4651

4752
/** @internal */
@@ -391,10 +396,10 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
391396
this[kConnections].unshift(connection);
392397
}
393398

394-
this[kCheckedOut].delete(connection);
399+
const wasConnectionDeleted = this[kCheckedOut].delete(connection);
395400
this.emit(ConnectionPool.CONNECTION_CHECKED_IN, new ConnectionCheckedInEvent(this, connection));
396401

397-
if (willDestroy) {
402+
if (wasConnectionDeleted && willDestroy) {
398403
const reason = connection.closed ? 'error' : poolClosed ? 'poolClosed' : 'stale';
399404
this.destroyConnection(connection, reason);
400405
}
@@ -408,8 +413,9 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
408413
* Pool reset is handled by incrementing the pool's generation count. Any existing connection of a
409414
* previous generation will eventually be pruned during subsequent checkouts.
410415
*/
411-
clear(options: { serviceId?: ObjectId } = {}): void {
416+
clear(options: { serviceId?: ObjectId; interruptInUseConnections?: boolean } = {}): void {
412417
const { serviceId } = options;
418+
const interruptInUseConnections = options.interruptInUseConnections ?? false;
413419
if (this.closed) {
414420
return;
415421
}
@@ -433,18 +439,72 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
433439
return;
434440
}
435441

442+
const oldGeneration = this[kGeneration];
443+
436444
// handle non load-balanced case
437445
this[kGeneration] += 1;
438446
const alreadyPaused = this[kPoolState] === PoolState.paused;
439447
this[kPoolState] = PoolState.paused;
440448

441449
this.clearMinPoolSizeTimer();
442450
if (!alreadyPaused) {
443-
this.emit(ConnectionPool.CONNECTION_POOL_CLEARED, new ConnectionPoolClearedEvent(this));
451+
this.emit(
452+
ConnectionPool.CONNECTION_POOL_CLEARED,
453+
new ConnectionPoolClearedEvent(this, { interruptInUseConnections })
454+
);
444455
}
456+
457+
process.nextTick(() =>
458+
this.pruneConnections({ minGeneration: oldGeneration, interruptInUseConnections })
459+
);
460+
445461
this.processWaitQueue();
446462
}
447463

464+
/**
465+
* Closes all checked in perished connections in the pool with a resumable PoolClearedOnNetworkError.
466+
*
467+
* If interruptInUseConnections is `true`, this method attempts to kill checked out connections as well.
468+
* Only connections where `connection.generation <= minGeneration` are killed. Connections are closed with a
469+
* resumable PoolClearedOnNetworkTimeoutError.
470+
*/
471+
private pruneConnections({
472+
interruptInUseConnections,
473+
minGeneration
474+
}: {
475+
interruptInUseConnections: boolean;
476+
minGeneration: number;
477+
}) {
478+
this[kConnections].prune(connection => {
479+
if (connection.generation <= minGeneration) {
480+
connection.onError(new PoolClearedOnNetworkError(this));
481+
this.emit(
482+
ConnectionPool.CONNECTION_CLOSED,
483+
new ConnectionClosedEvent(this, connection, 'stale')
484+
);
485+
486+
return true;
487+
}
488+
return false;
489+
});
490+
491+
if (interruptInUseConnections) {
492+
for (const connection of this[kCheckedOut]) {
493+
if (connection.generation <= minGeneration) {
494+
this[kCheckedOut].delete(connection);
495+
connection.onError(new PoolClearedOnNetworkError(this));
496+
this.emit(
497+
ConnectionPool.CONNECTION_CLOSED,
498+
new ConnectionClosedEvent(this, connection, 'stale')
499+
);
500+
}
501+
}
502+
503+
// TODO(NODE-xxxx): track pending connections and cancel
504+
// this[kCancellationToken].emit('cancel');
505+
}
506+
}
507+
448508
/** Close the pool */
449509
close(callback: Callback<void>): void;
450510
close(options: CloseOptions, callback: Callback<void>): void;
@@ -573,7 +633,12 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
573633
return !!(this.options.maxIdleTimeMS && connection.idleTime > this.options.maxIdleTimeMS);
574634
}
575635

576-
private connectionIsPerished(connection: Connection) {
636+
/**
637+
* Destroys a connection if the connection is perished.
638+
*
639+
* @returns `true` if the connection was destroyed, `false` otherwise.
640+
*/
641+
private destroyConnectionIfPerished(connection: Connection) {
577642
const isStale = this.connectionIsStale(connection);
578643
const isIdle = this.connectionIsIdle(connection);
579644
if (!isStale && !isIdle && !connection.closed) {
@@ -659,7 +724,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
659724
return;
660725
}
661726

662-
this[kConnections].prune(connection => this.connectionIsPerished(connection));
727+
this[kConnections].prune(connection => this.destroyConnectionIfPerished(connection));
663728

664729
if (
665730
this.totalConnectionCount < minPoolSize &&
@@ -735,7 +800,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
735800
break;
736801
}
737802

738-
if (!this.connectionIsPerished(connection)) {
803+
if (!this.destroyConnectionIfPerished(connection)) {
739804
this[kCheckedOut].add(connection);
740805
this.emit(
741806
ConnectionPool.CONNECTION_CHECKED_OUT,

src/cmap/errors.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { MongoDriverError, MongoNetworkError } from '../error';
1+
import { MongoDriverError, MongoErrorLabel, MongoNetworkError } from '../error';
22
import type { ConnectionPool } from './connection_pool';
33

44
/**
@@ -49,6 +49,8 @@ export class PoolClearedOnNetworkError extends MongoNetworkError {
4949
constructor(pool: ConnectionPool) {
5050
super(`Connection to ${pool.address} interrupted due to server monitor timeout`);
5151
this.address = pool.address;
52+
53+
this.addErrorLabel(MongoErrorLabel.RetryableWriteError);
5254
}
5355

5456
override get name(): string {

src/error.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ export const MongoErrorLabel = Object.freeze({
9191
ResumableChangeStreamError: 'ResumableChangeStreamError',
9292
HandshakeError: 'HandshakeError',
9393
ResetPool: 'ResetPool',
94+
InterruptInUseConnections: 'InterruptInUseConnections',
9495
NoWritesPerformed: 'NoWritesPerformed'
9596
} as const);
9697

src/sdam/monitor.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { Document, Long } from '../bson';
44
import { connect } from '../cmap/connect';
55
import { Connection, ConnectionOptions } from '../cmap/connection';
66
import { LEGACY_HELLO_COMMAND } from '../constants';
7-
import { MongoError, MongoErrorLabel } from '../error';
7+
import { MongoError, MongoErrorLabel, MongoNetworkTimeoutError } from '../error';
88
import { CancellationToken, TypedEventEmitter } from '../mongo_types';
99
import type { Callback } from '../utils';
1010
import { calculateDurationInMs, EventEmitterWithState, makeStateMachine, now, ns } from '../utils';
@@ -221,6 +221,9 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
221221

222222
const error = !(err instanceof MongoError) ? new MongoError(err) : err;
223223
error.addErrorLabel(MongoErrorLabel.ResetPool);
224+
if (error instanceof MongoNetworkTimeoutError) {
225+
error.addErrorLabel(MongoErrorLabel.InterruptInUseConnections);
226+
}
224227

225228
monitor.emit('resetServer', error);
226229
callback(err);

src/sdam/topology.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { deserialize, serialize } from '../bson';
66
import type { MongoCredentials } from '../cmap/auth/mongo_credentials';
77
import type { ConnectionEvents, DestroyOptions } from '../cmap/connection';
88
import type { CloseOptions, ConnectionPoolEvents } from '../cmap/connection_pool';
9+
import { PoolClearedOnNetworkError } from '../cmap/errors';
910
import { DEFAULT_OPTIONS, FEATURE_FLAGS } from '../connection_string';
1011
import {
1112
CLOSE,
@@ -839,7 +840,11 @@ function updateServers(topology: Topology, incomingServerDescription?: ServerDes
839840
incomingServerDescription.error instanceof MongoError &&
840841
incomingServerDescription.error.hasErrorLabel(MongoErrorLabel.ResetPool)
841842
) {
842-
server.s.pool.clear();
843+
const interruptInUseConnections = incomingServerDescription.error.hasErrorLabel(
844+
MongoErrorLabel.InterruptInUseConnections
845+
);
846+
847+
server.s.pool.clear({ interruptInUseConnections });
843848
} else if (incomingServerDescription.error == null) {
844849
const newTopologyType = topology.s.description.type;
845850
const shouldMarkPoolReady =

test/integration/connection-monitoring-and-pooling/connection_monitoring_and_pooling.spec.test.ts

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,28 @@ const LB_SKIP_TESTS: SkipDescription[] = [
1919
skipReason: 'cannot run against a load balanced environment'
2020
}));
2121

22+
const INTERRUPT_IN_USE_SKIPPED_TESTS: SkipDescription[] = [
23+
{
24+
description: 'clear with interruptInUseConnections = true closes pending connections',
25+
skipIfCondition: 'always',
26+
skipReason: 'TODO(NODE-xxxx): track and kill pending connections'
27+
}
28+
];
29+
2230
describe('Connection Monitoring and Pooling Spec Tests (Integration)', function () {
2331
const tests: CmapTest[] = loadSpecTests('connection-monitoring-and-pooling');
2432

2533
runCmapTestSuite(tests, {
26-
testsToSkip: LB_SKIP_TESTS.concat([
27-
{
28-
description: 'waiting on maxConnecting is limited by WaitQueueTimeoutMS',
29-
skipIfCondition: 'always',
30-
skipReason:
31-
'not applicable: waitQueueTimeoutMS limits connection establishment time in our driver'
32-
}
33-
])
34+
testsToSkip: LB_SKIP_TESTS.concat(
35+
[
36+
{
37+
description: 'waiting on maxConnecting is limited by WaitQueueTimeoutMS',
38+
skipIfCondition: 'always',
39+
skipReason:
40+
'not applicable: waitQueueTimeoutMS limits connection establishment time in our driver'
41+
}
42+
],
43+
INTERRUPT_IN_USE_SKIPPED_TESTS
44+
)
3445
});
3546
});

test/tools/cmap_spec_runner.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ const getTestOpDefinitions = (threadContext: ThreadContext) => ({
197197

198198
return threadContext.pool.checkIn(connection);
199199
},
200-
clear: function (interruptInUseConnections: boolean) {
200+
clear: function ({ interruptInUseConnections }: { interruptInUseConnections: boolean }) {
201201
return threadContext.pool.clear({ interruptInUseConnections });
202202
},
203203
close: async function () {

0 commit comments

Comments
 (0)