Skip to content

Commit 6a49a99

Browse files
committed
Improve failure handling for RESET and ACK_FAILURE
Both RESET and ACK_FAILURE messages will now be immediately flushed to the network to make sure connections are fully cleaned up before being returned to the pool. Outbound messages will only be sent if the connection is not broken. Failure to RESET or ACK_FAILURE will be considered a fatal error, unless ACK_FAILURE was ignored because of a subsequent RESET.
1 parent 1fe5df0 commit 6a49a99

File tree

6 files changed

+271
-68
lines changed

6 files changed

+271
-68
lines changed

src/v1/internal/connection-holder.js

+9-5
Original file line numberDiff line numberDiff line change
@@ -97,12 +97,13 @@ export default class ConnectionHolder {
9797
_releaseConnection() {
9898
this._connectionPromise = this._connectionPromise.then(connection => {
9999
if (connection) {
100-
connection.reset();
101-
connection.sync();
102-
connection._release();
100+
return connection.resetAndFlush()
101+
.catch(ignoreError)
102+
.then(() => connection._release());
103+
} else {
104+
return Promise.resolve();
103105
}
104-
}).catch(ignoredError => {
105-
});
106+
}).catch(ignoreError);
106107

107108
return this._connectionPromise;
108109
}
@@ -127,6 +128,9 @@ class EmptyConnectionHolder extends ConnectionHolder {
127128
}
128129
}
129130

131+
function ignoreError(error) {
132+
}
133+
130134
/**
131135
* Connection holder that does not manage any connections.
132136
* @type {ConnectionHolder}

src/v1/internal/connector.js

+102-59
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import NodeChannel from './ch-node';
2222
import {Chunker, Dechunker} from './chunking';
2323
import packStreamUtil from './packstream-util';
2424
import {alloc} from './buf';
25-
import {newError} from './../error';
25+
import {newError, PROTOCOL_ERROR} from './../error';
2626
import ChannelConfig from './ch-config';
2727
import urlUtil from './url-util';
2828
import StreamObserver from './stream-observer';
@@ -120,7 +120,7 @@ class Connection {
120120
this._packer = packStreamUtil.createLatestPacker(this._chunker);
121121
this._unpacker = packStreamUtil.createLatestUnpacker(disableLosslessIntegers);
122122

123-
this._isHandlingFailure = false;
123+
this._ackFailureMuted = false;
124124
this._currentFailure = null;
125125

126126
this._state = new ConnectionState(this);
@@ -241,25 +241,8 @@ class Connection {
241241
this._currentObserver.onError( this._currentFailure );
242242
} finally {
243243
this._updateCurrentObserver();
244-
// Things are now broken. Pending observers will get FAILURE messages routed until
245-
// We are done handling this failure.
246-
if( !this._isHandlingFailure ) {
247-
this._isHandlingFailure = true;
248-
249-
// isHandlingFailure was false, meaning this is the first failure message
250-
// we see from this failure. We may see several others, one for each message
251-
// we had "optimistically" already sent after whatever it was that failed.
252-
// We only want to and need to ACK the first one, which is why we are tracking
253-
// this _isHandlingFailure thing.
254-
this._ackFailure({
255-
onNext: NO_OP,
256-
onError: NO_OP,
257-
onCompleted: () => {
258-
this._isHandlingFailure = false;
259-
this._currentFailure = null;
260-
}
261-
});
262-
}
244+
// Things are now broken. Pending observers will get FAILURE messages routed until we are done handling this failure.
245+
this._ackFailureIfNeeded();
263246
}
264247
break;
265248
case IGNORED:
@@ -268,7 +251,7 @@ class Connection {
268251
if (this._currentFailure && this._currentObserver.onError)
269252
this._currentObserver.onError(this._currentFailure);
270253
else if(this._currentObserver.onError)
271-
this._currentObserver.onError(payload);
254+
this._currentObserver.onError(newError('Ignored either because of an error or RESET'));
272255
} finally {
273256
this._updateCurrentObserver();
274257
}
@@ -282,72 +265,122 @@ class Connection {
282265
initialize( clientName, token, observer ) {
283266
log("C", "INIT", clientName, token);
284267
const initObserver = this._state.wrap(observer);
285-
this._queueObserver(initObserver);
286-
this._packer.packStruct( INIT, [this._packable(clientName), this._packable(token)],
287-
(err) => this._handleFatalError(err) );
288-
this._chunker.messageBoundary();
289-
this.sync();
268+
const queued = this._queueObserver(initObserver);
269+
if (queued) {
270+
this._packer.packStruct(INIT, [this._packable(clientName), this._packable(token)],
271+
(err) => this._handleFatalError(err));
272+
this._chunker.messageBoundary();
273+
this.sync();
274+
}
290275
}
291276

292277
/** Queue a RUN-message to be sent to the database */
293278
run( statement, params, observer ) {
294279
log("C", "RUN", statement, params);
295-
this._queueObserver(observer);
296-
this._packer.packStruct( RUN, [this._packable(statement), this._packable(params)],
297-
(err) => this._handleFatalError(err) );
298-
this._chunker.messageBoundary();
280+
const queued = this._queueObserver(observer);
281+
if (queued) {
282+
this._packer.packStruct(RUN, [this._packable(statement), this._packable(params)],
283+
(err) => this._handleFatalError(err));
284+
this._chunker.messageBoundary();
285+
}
299286
}
300287

301288
/** Queue a PULL_ALL-message to be sent to the database */
302289
pullAll( observer ) {
303290
log("C", "PULL_ALL");
304-
this._queueObserver(observer);
305-
this._packer.packStruct( PULL_ALL, [], (err) => this._handleFatalError(err) );
306-
this._chunker.messageBoundary();
291+
const queued = this._queueObserver(observer);
292+
if (queued) {
293+
this._packer.packStruct(PULL_ALL, [], (err) => this._handleFatalError(err));
294+
this._chunker.messageBoundary();
295+
}
307296
}
308297

309298
/** Queue a DISCARD_ALL-message to be sent to the database */
310299
discardAll( observer ) {
311300
log("C", "DISCARD_ALL");
312-
this._queueObserver(observer);
313-
this._packer.packStruct( DISCARD_ALL, [], (err) => this._handleFatalError(err) );
314-
this._chunker.messageBoundary();
301+
const queued = this._queueObserver(observer);
302+
if (queued) {
303+
this._packer.packStruct(DISCARD_ALL, [], (err) => this._handleFatalError(err));
304+
this._chunker.messageBoundary();
305+
}
315306
}
316307

317-
/** Queue a RESET-message to be sent to the database. Mutes failure handling. */
318-
reset(observer) {
308+
/**
309+
* Send a RESET-message to the database. Mutes failure handling.
310+
* Message is immediately flushed to the network. Separate {@link Connection#sync()} call is not required.
311+
* @return {Promise<void>} promise resolved when SUCCESS-message response arrives, or failed when other response messages arrives.
312+
*/
313+
resetAndFlush() {
319314
log('C', 'RESET');
320-
this._isHandlingFailure = true;
321-
let self = this;
322-
let wrappedObs = {
323-
onNext: observer ? observer.onNext : NO_OP,
324-
onError: observer ? observer.onError : NO_OP,
325-
onCompleted: () => {
326-
self._isHandlingFailure = false;
327-
if (observer) {
328-
observer.onCompleted();
315+
this._ackFailureMuted = true;
316+
317+
return new Promise((resolve, reject) => {
318+
const observer = {
319+
onNext: record => {
320+
const neo4jError = this._handleProtocolError('Received RECORD as a response for RESET: ' + JSON.stringify(record));
321+
reject(neo4jError);
322+
},
323+
onError: error => {
324+
if (this._isBroken) {
325+
// handling a fatal error, no need to raise a protocol violation
326+
reject(error);
327+
} else {
328+
const neo4jError = this._handleProtocolError('Received FAILURE as a response for RESET: ' + error);
329+
reject(neo4jError);
330+
}
331+
},
332+
onCompleted: () => {
333+
this._ackFailureMuted = false;
334+
resolve();
329335
}
336+
};
337+
const queued = this._queueObserver(observer);
338+
if (queued) {
339+
this._packer.packStruct(RESET, [], err => this._handleFatalError(err));
340+
this._chunker.messageBoundary();
341+
this.sync();
330342
}
331-
};
332-
this._queueObserver(wrappedObs);
333-
this._packer.packStruct(RESET, [], (err) => this._handleFatalError(err));
334-
this._chunker.messageBoundary();
343+
});
335344
}
336345

337-
/** Queue a ACK_FAILURE-message to be sent to the database */
338-
_ackFailure( observer ) {
339-
log("C", "ACK_FAILURE");
340-
this._queueObserver(observer);
341-
this._packer.packStruct( ACK_FAILURE, [], (err) => this._handleFatalError(err) );
342-
this._chunker.messageBoundary();
346+
_ackFailureIfNeeded() {
347+
if (this._ackFailureMuted) {
348+
return;
349+
}
350+
351+
log('C', 'ACK_FAILURE');
352+
353+
const observer = {
354+
onNext: record => {
355+
this._handleProtocolError('Received RECORD as a response for ACK_FAILURE: ' + JSON.stringify(record));
356+
},
357+
onError: error => {
358+
if (!this._isBroken && !this._ackFailureMuted) {
359+
// not handling a fatal error and RESET did not cause the given error - looks like a protocol violation
360+
this._handleProtocolError('Received FAILURE as a response for ACK_FAILURE: ' + error);
361+
} else {
362+
this._currentFailure = null;
363+
}
364+
},
365+
onCompleted: () => {
366+
this._currentFailure = null;
367+
}
368+
};
369+
370+
const queued = this._queueObserver(observer);
371+
if (queued) {
372+
this._packer.packStruct(ACK_FAILURE, [], err => this._handleFatalError(err));
373+
this._chunker.messageBoundary();
374+
this.sync();
375+
}
343376
}
344377

345378
_queueObserver(observer) {
346379
if( this._isBroken ) {
347380
if( observer && observer.onError ) {
348381
observer.onError(this._error);
349382
}
350-
return;
383+
return false;
351384
}
352385
observer = observer || NO_OP_OBSERVER;
353386
observer.onCompleted = observer.onCompleted || NO_OP;
@@ -358,6 +391,7 @@ class Connection {
358391
} else {
359392
this._pendingObservers.push( observer );
360393
}
394+
return true;
361395
}
362396

363397
/**
@@ -419,6 +453,15 @@ class Connection {
419453
}
420454
}
421455
}
456+
457+
_handleProtocolError(message) {
458+
this._ackFailureMuted = false;
459+
this._currentFailure = null;
460+
this._updateCurrentObserver();
461+
const error = newError(message, PROTOCOL_ERROR);
462+
this._handleFatalError(error);
463+
return error;
464+
}
422465
}
423466

424467
class ConnectionState {

0 commit comments

Comments
 (0)