Skip to content

Commit cb9ca48

Browse files
authored
Merge pull request #399 from lutovich/1.7-ack-failure-to-reset
Replace ACK_FAILURE with RESET
2 parents d7a5158 + b27fa91 commit cb9ca48

10 files changed

+1665
-532
lines changed

package-lock.json

+1,589-450
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/v1/internal/connector.js

+23-44
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ else {
4444
let
4545
// Signature bytes for each message type
4646
INIT = 0x01, // 0000 0001 // INIT <user_agent>
47-
ACK_FAILURE = 0x0E, // 0000 1110 // ACK_FAILURE
47+
ACK_FAILURE = 0x0E, // 0000 1110 // ACK_FAILURE - unused
4848
RESET = 0x0F, // 0000 1111 // RESET
4949
RUN = 0x10, // 0001 0000 // RUN <statement> <parameters>
5050
DISCARD_ALL = 0x2F, // 0010 1111 // DISCARD *
@@ -106,7 +106,6 @@ class Connection {
106106
this._packer = packStreamUtil.createLatestPacker(this._chunker);
107107
this._unpacker = packStreamUtil.createLatestUnpacker(disableLosslessIntegers);
108108

109-
this._ackFailureMuted = false;
110109
this._currentFailure = null;
111110

112111
this._state = new ConnectionState(this);
@@ -247,7 +246,7 @@ class Connection {
247246
} finally {
248247
this._updateCurrentObserver();
249248
// Things are now broken. Pending observers will get FAILURE messages routed until we are done handling this failure.
250-
this._ackFailureIfNeeded();
249+
this._resetOnFailure();
251250
}
252251
break;
253252
case IGNORED:
@@ -279,7 +278,7 @@ class Connection {
279278
this._packer.packStruct(INIT, [this._packable(clientName), this._packable(token)],
280279
(err) => this._handleFatalError(err));
281280
this._chunker.messageBoundary();
282-
this.sync();
281+
this.flush();
283282
}
284283
}
285284

@@ -322,17 +321,12 @@ class Connection {
322321

323322
/**
324323
* Send a RESET-message to the database. Mutes failure handling.
325-
* Message is immediately flushed to the network. Separate {@link Connection#sync()} call is not required.
324+
* Message is immediately flushed to the network. Separate {@link Connection#flush()} call is not required.
326325
* @return {Promise<void>} promise resolved when SUCCESS-message response arrives, or failed when other response messages arrives.
327326
*/
328327
resetAndFlush() {
329-
if (this._log.isDebugEnabled()) {
330-
this._log.debug(`${this} C: RESET`);
331-
}
332-
this._ackFailureMuted = true;
333-
334328
return new Promise((resolve, reject) => {
335-
const observer = {
329+
this._reset({
336330
onNext: record => {
337331
const neo4jError = this._handleProtocolError('Received RECORD as a response for RESET: ' + JSON.stringify(record));
338332
reject(neo4jError);
@@ -347,50 +341,37 @@ class Connection {
347341
}
348342
},
349343
onCompleted: () => {
350-
this._ackFailureMuted = false;
351344
resolve();
352345
}
353-
};
354-
const queued = this._queueObserver(observer);
355-
if (queued) {
356-
this._packer.packStruct(RESET, [], err => this._handleFatalError(err));
357-
this._chunker.messageBoundary();
358-
this.sync();
359-
}
346+
});
360347
});
361348
}
362349

363-
_ackFailureIfNeeded() {
364-
if (this._ackFailureMuted) {
365-
return;
366-
}
367-
368-
if (this._log.isDebugEnabled()) {
369-
this._log.debug(`${this} C: ACK_FAILURE`);
370-
}
371-
372-
const observer = {
350+
_resetOnFailure() {
351+
this._reset({
373352
onNext: record => {
374-
this._handleProtocolError('Received RECORD as a response for ACK_FAILURE: ' + JSON.stringify(record));
353+
this._handleProtocolError('Received RECORD as a response for RESET: ' + JSON.stringify(record));
375354
},
376-
onError: error => {
377-
if (!this._isBroken && !this._ackFailureMuted) {
378-
// not handling a fatal error and RESET did not cause the given error - looks like a protocol violation
379-
this._handleProtocolError('Received FAILURE as a response for ACK_FAILURE: ' + error);
380-
} else {
381-
this._currentFailure = null;
382-
}
355+
// clear the current failure when response for RESET is received
356+
onError: () => {
357+
this._currentFailure = null;
383358
},
384359
onCompleted: () => {
385360
this._currentFailure = null;
386361
}
387-
};
362+
});
363+
}
364+
365+
_reset(observer) {
366+
if (this._log.isDebugEnabled()) {
367+
this._log.debug(`${this} C: RESET`);
368+
}
388369

389370
const queued = this._queueObserver(observer);
390371
if (queued) {
391-
this._packer.packStruct(ACK_FAILURE, [], err => this._handleFatalError(err));
372+
this._packer.packStruct(RESET, [], err => this._handleFatalError(err));
392373
this._chunker.messageBoundary();
393-
this.sync();
374+
this.flush();
394375
}
395376
}
396377

@@ -431,10 +412,9 @@ class Connection {
431412
}
432413

433414
/**
434-
* Synchronize - flush all queued outgoing messages and route their responses
435-
* to their respective handlers.
415+
* Flush all queued outgoing messages.
436416
*/
437-
sync() {
417+
flush() {
438418
this._chunker.flush();
439419
}
440420

@@ -477,7 +457,6 @@ class Connection {
477457
}
478458

479459
_handleProtocolError(message) {
480-
this._ackFailureMuted = false;
481460
this._currentFailure = null;
482461
this._updateCurrentObserver();
483462
const error = newError(message, PROTOCOL_ERROR);

src/v1/session.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ class Session {
7575
connectionHolder.getConnection(streamObserver).then(connection => {
7676
statementRunner(connection, streamObserver);
7777
connection.pullAll(streamObserver);
78-
connection.sync();
78+
connection.flush();
7979
}).catch(error => streamObserver.onError(error));
8080
} else {
8181
streamObserver.onError(newError('Statements cannot be run directly on a ' +

src/v1/transaction.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ let _states = {
162162
connectionHolder.getConnection(observer).then(conn => {
163163
conn.run(statement, parameters || {}, observer);
164164
conn.pullAll(observer);
165-
conn.sync();
165+
conn.flush();
166166
}).catch(error => observer.onError(error));
167167

168168
return _newRunResult(observer, statement, parameters, () => observer.serverMetadata());
@@ -240,7 +240,7 @@ function _runPullAll(msg, connectionHolder, observer) {
240240
connectionHolder.getConnection(observer).then(conn => {
241241
conn.run(msg, {}, observer);
242242
conn.pullAll(observer);
243-
conn.sync();
243+
conn.flush();
244244
}).catch(error => observer.onError(error));
245245

246246
// for commit & rollback we need result that uses real connection holder and notifies it when

test/internal/connector.test.js

+6-30
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ describe('connector', () => {
7373
},
7474
onError: console.log
7575
});
76-
connection.sync();
76+
connection.flush();
7777

7878
});
7979

@@ -94,7 +94,7 @@ describe('connector', () => {
9494
done();
9595
}
9696
});
97-
connection.sync();
97+
connection.flush();
9898
});
9999

100100
it('should use DummyChannel to read what gets written', done => {
@@ -113,7 +113,7 @@ describe('connector', () => {
113113
// When
114114
connection.initialize('mydriver/0.0.0', basicAuthToken());
115115
connection.run('RETURN 1', {});
116-
connection.sync();
116+
connection.flush();
117117
expect(observer.instance.toHex()).toBe('00 44 b2 01 8e 6d 79 64 72 69 76 65 72 2f 30 2e 30 2e 30 a3 86 73 63 68 65 6d 65 85 62 61 73 69 63 89 70 72 69 6e 63 69 70 61 6c 85 6e 65 6f 34 6a 8b 63 72 65 64 65 6e 74 69 61 6c 73 88 70 61 73 73 77 6f 72 64 00 00 00 0c b2 10 88 52 45 54 55 52 4e 20 31 a0 00 00 ');
118118
done();
119119
});
@@ -135,7 +135,7 @@ describe('connector', () => {
135135
done();
136136
}
137137
});
138-
connection.sync();
138+
connection.flush();
139139

140140
});
141141

@@ -270,10 +270,6 @@ describe('connector', () => {
270270
testQueueingOfObserversWithBrokenConnection(resetAction);
271271
});
272272

273-
it('should not queue ACK_FAILURE observer when broken', () => {
274-
testQueueingOfObserversWithBrokenConnection(connection => connection._ackFailureIfNeeded());
275-
});
276-
277273
it('should reset and flush when SUCCESS received', done => {
278274
connection = connect('bolt://localhost');
279275

@@ -315,36 +311,16 @@ describe('connector', () => {
315311
connection._handleMessage(RECORD_MESSAGE);
316312
});
317313

318-
it('should ACK_FAILURE when SUCCESS received', () => {
314+
it('should acknowledge failure with RESET when SUCCESS received', () => {
319315
connection = connect('bolt://localhost');
320316

321317
connection._currentFailure = newError('Hello');
322-
connection._ackFailureIfNeeded();
318+
connection._resetOnFailure();
323319

324320
connection._handleMessage(SUCCESS_MESSAGE);
325321
expect(connection._currentFailure).toBeNull();
326322
});
327323

328-
it('should fail the connection when ACK_FAILURE receives FAILURE', () => {
329-
connection = connect('bolt://localhost');
330-
331-
connection._ackFailureIfNeeded();
332-
333-
connection._handleMessage(FAILURE_MESSAGE);
334-
expect(connection._isBroken).toBeTruthy();
335-
expect(connection.isOpen()).toBeFalsy();
336-
});
337-
338-
it('should fail the connection when ACK_FAILURE receives RECORD', () => {
339-
connection = connect('bolt://localhost');
340-
341-
connection._ackFailureIfNeeded();
342-
343-
connection._handleMessage(RECORD_MESSAGE);
344-
expect(connection._isBroken).toBeTruthy();
345-
expect(connection.isOpen()).toBeFalsy();
346-
});
347-
348324
function packedHandshakeMessage() {
349325
const result = alloc(4);
350326
result.putInt32(0, 1);

test/internal/fake-connection.js

+3-3
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ export default class FakeConnection {
3131
this.creationTimestamp = Date.now();
3232

3333
this.resetInvoked = 0;
34-
this.syncInvoked = 0;
34+
this.flushInvoked = 0;
3535
this.releaseInvoked = 0;
3636
this.initializationInvoked = 0;
3737
this.seenStatements = [];
@@ -58,8 +58,8 @@ export default class FakeConnection {
5858
return Promise.resolve();
5959
}
6060

61-
sync() {
62-
this.syncInvoked++;
61+
flush() {
62+
this.flushInvoked++;
6363
}
6464

6565
_release() {

test/internal/shared-neo4j.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ const additionalConfig = {
109109
};
110110

111111
const neoCtrlVersionParam = '-e';
112-
const defaultNeo4jVersion = '3.3.4';
112+
const defaultNeo4jVersion = '3.4.5';
113113
const defaultNeoCtrlArgs = `${neoCtrlVersionParam} ${defaultNeo4jVersion}`;
114114

115115
function neo4jCertPath(dir) {

test/resources/boltstub/address_unavailable_template.script.mst

-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
!: AUTO INIT
22
!: AUTO RESET
33
!: AUTO PULL_ALL
4-
!: AUTO ACK_FAILURE
54
!: AUTO RUN "ROLLBACK" {}
65
!: AUTO RUN "BEGIN" {}
76
!: AUTO RUN "COMMIT" {}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
!: AUTO INIT
2+
!: AUTO PULL_ALL
3+
4+
C: RUN "RETURN 10 / 0" {}
5+
C: PULL_ALL
6+
S: FAILURE {"code": "Neo.ClientError.Statement.ArithmeticError", "message": "/ by zero"}
7+
S: IGNORED
8+
C: RESET
9+
S: SUCCESS
10+
C: RESET
11+
S: SUCCESS

test/v1/direct.driver.boltkit.test.js

+29
Original file line numberDiff line numberDiff line change
@@ -304,4 +304,33 @@ describe('direct driver with stub server', () => {
304304
});
305305
});
306306

307+
it('should send RESET on error', done => {
308+
if (!boltStub.supported) {
309+
done();
310+
return;
311+
}
312+
313+
const server = boltStub.start('./test/resources/boltstub/query_with_error.script', 9001);
314+
315+
boltStub.run(() => {
316+
const driver = boltStub.newDriver('bolt://127.0.0.1:9001');
317+
const session = driver.session();
318+
319+
session.run('RETURN 10 / 0').then(result => {
320+
done.fail('Should fail but received a result: ' + JSON.stringify(result));
321+
}).catch(error => {
322+
expect(error.code).toEqual('Neo.ClientError.Statement.ArithmeticError');
323+
expect(error.message).toEqual('/ by zero');
324+
325+
session.close(() => {
326+
driver.close();
327+
server.exit(code => {
328+
expect(code).toEqual(0);
329+
done();
330+
});
331+
});
332+
});
333+
});
334+
});
335+
307336
});

0 commit comments

Comments
 (0)