Skip to content

Commit 2c2ae55

Browse files
authored
Merge pull request #376 from lutovich/1.7-ack_failure-and-reset-failures
Improve failure handling for RESET and ACK_FAILURE
2 parents 9929475 + 6a49a99 commit 2c2ae55

File tree

8 files changed

+278
-107
lines changed

8 files changed

+278
-107
lines changed

src/v1/internal/connection-holder.js

+12-15
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,7 @@ export default class ConnectionHolder {
7070

7171
this._referenceCount--;
7272
if (this._referenceCount === 0) {
73-
// release a connection without muting ACK_FAILURE, this is the last action on this connection
74-
return this._releaseConnection(true);
73+
return this._releaseConnection();
7574
}
7675
return this._connectionPromise;
7776
}
@@ -85,9 +84,7 @@ export default class ConnectionHolder {
8584
return this._connectionPromise;
8685
}
8786
this._referenceCount = 0;
88-
// release a connection and mute ACK_FAILURE, this might be called concurrently with other
89-
// operations and thus should ignore failure handling
90-
return this._releaseConnection(false);
87+
return this._releaseConnection();
9188
}
9289

9390
/**
@@ -97,19 +94,16 @@ export default class ConnectionHolder {
9794
* @return {Promise} - promise resolved then connection is returned to the pool.
9895
* @private
9996
*/
100-
_releaseConnection(sync) {
97+
_releaseConnection() {
10198
this._connectionPromise = this._connectionPromise.then(connection => {
10299
if (connection) {
103-
if(sync) {
104-
connection.reset();
105-
} else {
106-
connection.resetAsync();
107-
}
108-
connection.sync();
109-
connection._release();
100+
return connection.resetAndFlush()
101+
.catch(ignoreError)
102+
.then(() => connection._release());
103+
} else {
104+
return Promise.resolve();
110105
}
111-
}).catch(ignoredError => {
112-
});
106+
}).catch(ignoreError);
113107

114108
return this._connectionPromise;
115109
}
@@ -134,6 +128,9 @@ class EmptyConnectionHolder extends ConnectionHolder {
134128
}
135129
}
136130

131+
function ignoreError(error) {
132+
}
133+
137134
/**
138135
* Connection holder that does not manage any connections.
139136
* @type {ConnectionHolder}

src/v1/internal/connector.js

+102-67
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import NodeChannel from './ch-node';
2222
import {Chunker, Dechunker} from './chunking';
2323
import packStreamUtil from './packstream-util';
2424
import {alloc} from './buf';
25-
import {newError} from './../error';
25+
import {newError, PROTOCOL_ERROR} from './../error';
2626
import ChannelConfig from './ch-config';
2727
import urlUtil from './url-util';
2828
import StreamObserver from './stream-observer';
@@ -120,7 +120,7 @@ class Connection {
120120
this._packer = packStreamUtil.createLatestPacker(this._chunker);
121121
this._unpacker = packStreamUtil.createLatestUnpacker(disableLosslessIntegers);
122122

123-
this._isHandlingFailure = false;
123+
this._ackFailureMuted = false;
124124
this._currentFailure = null;
125125

126126
this._state = new ConnectionState(this);
@@ -241,25 +241,8 @@ class Connection {
241241
this._currentObserver.onError( this._currentFailure );
242242
} finally {
243243
this._updateCurrentObserver();
244-
// Things are now broken. Pending observers will get FAILURE messages routed until
245-
// We are done handling this failure.
246-
if( !this._isHandlingFailure ) {
247-
this._isHandlingFailure = true;
248-
249-
// isHandlingFailure was false, meaning this is the first failure message
250-
// we see from this failure. We may see several others, one for each message
251-
// we had "optimistically" already sent after whatever it was that failed.
252-
// We only want to and need to ACK the first one, which is why we are tracking
253-
// this _isHandlingFailure thing.
254-
this._ackFailure({
255-
onNext: NO_OP,
256-
onError: NO_OP,
257-
onCompleted: () => {
258-
this._isHandlingFailure = false;
259-
this._currentFailure = null;
260-
}
261-
});
262-
}
244+
// Things are now broken. Pending observers will get FAILURE messages routed until we are done handling this failure.
245+
this._ackFailureIfNeeded();
263246
}
264247
break;
265248
case IGNORED:
@@ -268,7 +251,7 @@ class Connection {
268251
if (this._currentFailure && this._currentObserver.onError)
269252
this._currentObserver.onError(this._currentFailure);
270253
else if(this._currentObserver.onError)
271-
this._currentObserver.onError(payload);
254+
this._currentObserver.onError(newError('Ignored either because of an error or RESET'));
272255
} finally {
273256
this._updateCurrentObserver();
274257
}
@@ -282,80 +265,122 @@ class Connection {
282265
initialize( clientName, token, observer ) {
283266
log("C", "INIT", clientName, token);
284267
const initObserver = this._state.wrap(observer);
285-
this._queueObserver(initObserver);
286-
this._packer.packStruct( INIT, [this._packable(clientName), this._packable(token)],
287-
(err) => this._handleFatalError(err) );
288-
this._chunker.messageBoundary();
289-
this.sync();
268+
const queued = this._queueObserver(initObserver);
269+
if (queued) {
270+
this._packer.packStruct(INIT, [this._packable(clientName), this._packable(token)],
271+
(err) => this._handleFatalError(err));
272+
this._chunker.messageBoundary();
273+
this.sync();
274+
}
290275
}
291276

292277
/** Queue a RUN-message to be sent to the database */
293278
run( statement, params, observer ) {
294279
log("C", "RUN", statement, params);
295-
this._queueObserver(observer);
296-
this._packer.packStruct( RUN, [this._packable(statement), this._packable(params)],
297-
(err) => this._handleFatalError(err) );
298-
this._chunker.messageBoundary();
280+
const queued = this._queueObserver(observer);
281+
if (queued) {
282+
this._packer.packStruct(RUN, [this._packable(statement), this._packable(params)],
283+
(err) => this._handleFatalError(err));
284+
this._chunker.messageBoundary();
285+
}
299286
}
300287

301288
/** Queue a PULL_ALL-message to be sent to the database */
302289
pullAll( observer ) {
303290
log("C", "PULL_ALL");
304-
this._queueObserver(observer);
305-
this._packer.packStruct( PULL_ALL, [], (err) => this._handleFatalError(err) );
306-
this._chunker.messageBoundary();
291+
const queued = this._queueObserver(observer);
292+
if (queued) {
293+
this._packer.packStruct(PULL_ALL, [], (err) => this._handleFatalError(err));
294+
this._chunker.messageBoundary();
295+
}
307296
}
308297

309298
/** Queue a DISCARD_ALL-message to be sent to the database */
310299
discardAll( observer ) {
311300
log("C", "DISCARD_ALL");
312-
this._queueObserver(observer);
313-
this._packer.packStruct( DISCARD_ALL, [], (err) => this._handleFatalError(err) );
314-
this._chunker.messageBoundary();
301+
const queued = this._queueObserver(observer);
302+
if (queued) {
303+
this._packer.packStruct(DISCARD_ALL, [], (err) => this._handleFatalError(err));
304+
this._chunker.messageBoundary();
305+
}
315306
}
316307

317-
/** Queue a RESET-message to be sent to the database. Mutes failure handling. */
318-
resetAsync( observer ) {
319-
log("C", "RESET_ASYNC");
320-
this._isHandlingFailure = true;
321-
let self = this;
322-
let wrappedObs = {
323-
onNext: observer ? observer.onNext : NO_OP,
324-
onError: observer ? observer.onError : NO_OP,
325-
onCompleted: () => {
326-
self._isHandlingFailure = false;
327-
if (observer) {
328-
observer.onCompleted();
308+
/**
309+
* Send a RESET-message to the database. Mutes failure handling.
310+
* Message is immediately flushed to the network. Separate {@link Connection#sync()} call is not required.
311+
* @return {Promise<void>} promise resolved when SUCCESS-message response arrives, or failed when other response messages arrives.
312+
*/
313+
resetAndFlush() {
314+
log('C', 'RESET');
315+
this._ackFailureMuted = true;
316+
317+
return new Promise((resolve, reject) => {
318+
const observer = {
319+
onNext: record => {
320+
const neo4jError = this._handleProtocolError('Received RECORD as a response for RESET: ' + JSON.stringify(record));
321+
reject(neo4jError);
322+
},
323+
onError: error => {
324+
if (this._isBroken) {
325+
// handling a fatal error, no need to raise a protocol violation
326+
reject(error);
327+
} else {
328+
const neo4jError = this._handleProtocolError('Received FAILURE as a response for RESET: ' + error);
329+
reject(neo4jError);
330+
}
331+
},
332+
onCompleted: () => {
333+
this._ackFailureMuted = false;
334+
resolve();
329335
}
336+
};
337+
const queued = this._queueObserver(observer);
338+
if (queued) {
339+
this._packer.packStruct(RESET, [], err => this._handleFatalError(err));
340+
this._chunker.messageBoundary();
341+
this.sync();
330342
}
331-
};
332-
this._queueObserver(wrappedObs);
333-
this._packer.packStruct( RESET, [], (err) => this._handleFatalError(err) );
334-
this._chunker.messageBoundary();
343+
});
335344
}
336345

337-
/** Queue a RESET-message to be sent to the database */
338-
reset(observer) {
339-
log('C', 'RESET');
340-
this._queueObserver(observer);
341-
this._packer.packStruct(RESET, [], (err) => this._handleFatalError(err));
342-
this._chunker.messageBoundary();
343-
}
346+
_ackFailureIfNeeded() {
347+
if (this._ackFailureMuted) {
348+
return;
349+
}
344350

345-
/** Queue a ACK_FAILURE-message to be sent to the database */
346-
_ackFailure( observer ) {
347-
log("C", "ACK_FAILURE");
348-
this._queueObserver(observer);
349-
this._packer.packStruct( ACK_FAILURE, [], (err) => this._handleFatalError(err) );
350-
this._chunker.messageBoundary();
351+
log('C', 'ACK_FAILURE');
352+
353+
const observer = {
354+
onNext: record => {
355+
this._handleProtocolError('Received RECORD as a response for ACK_FAILURE: ' + JSON.stringify(record));
356+
},
357+
onError: error => {
358+
if (!this._isBroken && !this._ackFailureMuted) {
359+
// not handling a fatal error and RESET did not cause the given error - looks like a protocol violation
360+
this._handleProtocolError('Received FAILURE as a response for ACK_FAILURE: ' + error);
361+
} else {
362+
this._currentFailure = null;
363+
}
364+
},
365+
onCompleted: () => {
366+
this._currentFailure = null;
367+
}
368+
};
369+
370+
const queued = this._queueObserver(observer);
371+
if (queued) {
372+
this._packer.packStruct(ACK_FAILURE, [], err => this._handleFatalError(err));
373+
this._chunker.messageBoundary();
374+
this.sync();
375+
}
351376
}
352377

353378
_queueObserver(observer) {
354379
if( this._isBroken ) {
355380
if( observer && observer.onError ) {
356381
observer.onError(this._error);
357382
}
358-
return;
383+
return false;
359384
}
360385
observer = observer || NO_OP_OBSERVER;
361386
observer.onCompleted = observer.onCompleted || NO_OP;
@@ -366,6 +391,7 @@ class Connection {
366391
} else {
367392
this._pendingObservers.push( observer );
368393
}
394+
return true;
369395
}
370396

371397
/**
@@ -423,6 +449,15 @@ class Connection {
423449
}
424450
}
425451
}
452+
453+
_handleProtocolError(message) {
454+
this._ackFailureMuted = false;
455+
this._currentFailure = null;
456+
this._updateCurrentObserver();
457+
const error = newError(message, PROTOCOL_ERROR);
458+
this._handleFatalError(error);
459+
return error;
460+
}
426461
}
427462

428463
class ConnectionState {

test/internal/connection-holder.test.js

+3-3
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ describe('ConnectionHolder', () => {
168168
connectionHolder.initializeConnection();
169169

170170
connectionHolder.close().then(() => {
171-
expect(connection.isReleasedOnceOnSessionClose()).toBeTruthy();
171+
expect(connection.isReleasedOnce()).toBeTruthy();
172172
done();
173173
});
174174
});
@@ -201,11 +201,11 @@ describe('ConnectionHolder', () => {
201201
connectionHolder.initializeConnection();
202202

203203
connectionHolder.close().then(() => {
204-
expect(connection1.isReleasedOnceOnSessionClose()).toBeTruthy();
204+
expect(connection1.isReleasedOnce()).toBeTruthy();
205205

206206
connectionHolder.initializeConnection();
207207
connectionHolder.close().then(() => {
208-
expect(connection2.isReleasedOnceOnSessionClose()).toBeTruthy();
208+
expect(connection2.isReleasedOnce()).toBeTruthy();
209209
done();
210210
});
211211
});

0 commit comments

Comments
 (0)