Skip to content

Replace ACK_FAILURE with RESET #399

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Aug 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,039 changes: 1,589 additions & 450 deletions package-lock.json

Large diffs are not rendered by default.

67 changes: 23 additions & 44 deletions src/v1/internal/connector.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ else {
let
// Signature bytes for each message type
INIT = 0x01, // 0000 0001 // INIT <user_agent>
ACK_FAILURE = 0x0E, // 0000 1110 // ACK_FAILURE
ACK_FAILURE = 0x0E, // 0000 1110 // ACK_FAILURE - unused
RESET = 0x0F, // 0000 1111 // RESET
RUN = 0x10, // 0001 0000 // RUN <statement> <parameters>
DISCARD_ALL = 0x2F, // 0010 1111 // DISCARD *
Expand Down Expand Up @@ -106,7 +106,6 @@ class Connection {
this._packer = packStreamUtil.createLatestPacker(this._chunker);
this._unpacker = packStreamUtil.createLatestUnpacker(disableLosslessIntegers);

this._ackFailureMuted = false;
this._currentFailure = null;

this._state = new ConnectionState(this);
Expand Down Expand Up @@ -247,7 +246,7 @@ class Connection {
} finally {
this._updateCurrentObserver();
// Things are now broken. Pending observers will get FAILURE messages routed until we are done handling this failure.
this._ackFailureIfNeeded();
this._resetOnFailure();
}
break;
case IGNORED:
Expand Down Expand Up @@ -279,7 +278,7 @@ class Connection {
this._packer.packStruct(INIT, [this._packable(clientName), this._packable(token)],
(err) => this._handleFatalError(err));
this._chunker.messageBoundary();
this.sync();
this.flush();
}
}

Expand Down Expand Up @@ -322,17 +321,12 @@ class Connection {

/**
* Send a RESET-message to the database. Mutes failure handling.
* Message is immediately flushed to the network. Separate {@link Connection#sync()} call is not required.
* Message is immediately flushed to the network. Separate {@link Connection#flush()} call is not required.
* @return {Promise<void>} promise resolved when SUCCESS-message response arrives, or failed when other response messages arrives.
*/
resetAndFlush() {
if (this._log.isDebugEnabled()) {
this._log.debug(`${this} C: RESET`);
}
this._ackFailureMuted = true;

return new Promise((resolve, reject) => {
const observer = {
this._reset({
onNext: record => {
const neo4jError = this._handleProtocolError('Received RECORD as a response for RESET: ' + JSON.stringify(record));
reject(neo4jError);
Expand All @@ -347,50 +341,37 @@ class Connection {
}
},
onCompleted: () => {
this._ackFailureMuted = false;
resolve();
}
};
const queued = this._queueObserver(observer);
if (queued) {
this._packer.packStruct(RESET, [], err => this._handleFatalError(err));
this._chunker.messageBoundary();
this.sync();
}
});
});
}

_ackFailureIfNeeded() {
if (this._ackFailureMuted) {
return;
}

if (this._log.isDebugEnabled()) {
this._log.debug(`${this} C: ACK_FAILURE`);
}

const observer = {
_resetOnFailure() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wdyt about calling resetAndFlush instead, like resetAndFlush().finally(() => { /* clear failure */ })?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would definitely make the code a bit more concise but then failure will be cleared in a tick after the one that processes the RESET response. I think it might result in some queued callbacks to be failed unnecessarily.

this._reset({
onNext: record => {
this._handleProtocolError('Received RECORD as a response for ACK_FAILURE: ' + JSON.stringify(record));
this._handleProtocolError('Received RECORD as a response for RESET: ' + JSON.stringify(record));
},
onError: error => {
if (!this._isBroken && !this._ackFailureMuted) {
// not handling a fatal error and RESET did not cause the given error - looks like a protocol violation
this._handleProtocolError('Received FAILURE as a response for ACK_FAILURE: ' + error);
} else {
this._currentFailure = null;
}
// clear the current failure when response for RESET is received
onError: () => {
this._currentFailure = null;
},
onCompleted: () => {
this._currentFailure = null;
}
};
});
}

_reset(observer) {
if (this._log.isDebugEnabled()) {
this._log.debug(`${this} C: RESET`);
}

const queued = this._queueObserver(observer);
if (queued) {
this._packer.packStruct(ACK_FAILURE, [], err => this._handleFatalError(err));
this._packer.packStruct(RESET, [], err => this._handleFatalError(err));
this._chunker.messageBoundary();
this.sync();
this.flush();
}
}

Expand Down Expand Up @@ -431,10 +412,9 @@ class Connection {
}

/**
* Synchronize - flush all queued outgoing messages and route their responses
* to their respective handlers.
* Flush all queued outgoing messages.
*/
sync() {
flush() {
this._chunker.flush();
}

Expand Down Expand Up @@ -477,7 +457,6 @@ class Connection {
}

_handleProtocolError(message) {
this._ackFailureMuted = false;
this._currentFailure = null;
this._updateCurrentObserver();
const error = newError(message, PROTOCOL_ERROR);
Expand Down
2 changes: 1 addition & 1 deletion src/v1/session.js
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class Session {
connectionHolder.getConnection(streamObserver).then(connection => {
statementRunner(connection, streamObserver);
connection.pullAll(streamObserver);
connection.sync();
connection.flush();
}).catch(error => streamObserver.onError(error));
} else {
streamObserver.onError(newError('Statements cannot be run directly on a ' +
Expand Down
4 changes: 2 additions & 2 deletions src/v1/transaction.js
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ let _states = {
connectionHolder.getConnection(observer).then(conn => {
conn.run(statement, parameters || {}, observer);
conn.pullAll(observer);
conn.sync();
conn.flush();
}).catch(error => observer.onError(error));

return _newRunResult(observer, statement, parameters, () => observer.serverMetadata());
Expand Down Expand Up @@ -240,7 +240,7 @@ function _runPullAll(msg, connectionHolder, observer) {
connectionHolder.getConnection(observer).then(conn => {
conn.run(msg, {}, observer);
conn.pullAll(observer);
conn.sync();
conn.flush();
}).catch(error => observer.onError(error));

// for commit & rollback we need result that uses real connection holder and notifies it when
Expand Down
36 changes: 6 additions & 30 deletions test/internal/connector.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ describe('connector', () => {
},
onError: console.log
});
connection.sync();
connection.flush();

});

Expand All @@ -94,7 +94,7 @@ describe('connector', () => {
done();
}
});
connection.sync();
connection.flush();
});

it('should use DummyChannel to read what gets written', done => {
Expand All @@ -113,7 +113,7 @@ describe('connector', () => {
// When
connection.initialize('mydriver/0.0.0', basicAuthToken());
connection.run('RETURN 1', {});
connection.sync();
connection.flush();
expect(observer.instance.toHex()).toBe('00 44 b2 01 8e 6d 79 64 72 69 76 65 72 2f 30 2e 30 2e 30 a3 86 73 63 68 65 6d 65 85 62 61 73 69 63 89 70 72 69 6e 63 69 70 61 6c 85 6e 65 6f 34 6a 8b 63 72 65 64 65 6e 74 69 61 6c 73 88 70 61 73 73 77 6f 72 64 00 00 00 0c b2 10 88 52 45 54 55 52 4e 20 31 a0 00 00 ');
done();
});
Expand All @@ -135,7 +135,7 @@ describe('connector', () => {
done();
}
});
connection.sync();
connection.flush();

});

Expand Down Expand Up @@ -270,10 +270,6 @@ describe('connector', () => {
testQueueingOfObserversWithBrokenConnection(resetAction);
});

it('should not queue ACK_FAILURE observer when broken', () => {
testQueueingOfObserversWithBrokenConnection(connection => connection._ackFailureIfNeeded());
});

it('should reset and flush when SUCCESS received', done => {
connection = connect('bolt://localhost');

Expand Down Expand Up @@ -315,36 +311,16 @@ describe('connector', () => {
connection._handleMessage(RECORD_MESSAGE);
});

it('should ACK_FAILURE when SUCCESS received', () => {
it('should acknowledge failure with RESET when SUCCESS received', () => {
connection = connect('bolt://localhost');

connection._currentFailure = newError('Hello');
connection._ackFailureIfNeeded();
connection._resetOnFailure();

connection._handleMessage(SUCCESS_MESSAGE);
expect(connection._currentFailure).toBeNull();
});

it('should fail the connection when ACK_FAILURE receives FAILURE', () => {
connection = connect('bolt://localhost');

connection._ackFailureIfNeeded();

connection._handleMessage(FAILURE_MESSAGE);
expect(connection._isBroken).toBeTruthy();
expect(connection.isOpen()).toBeFalsy();
});

it('should fail the connection when ACK_FAILURE receives RECORD', () => {
connection = connect('bolt://localhost');

connection._ackFailureIfNeeded();

connection._handleMessage(RECORD_MESSAGE);
expect(connection._isBroken).toBeTruthy();
expect(connection.isOpen()).toBeFalsy();
});

function packedHandshakeMessage() {
const result = alloc(4);
result.putInt32(0, 1);
Expand Down
6 changes: 3 additions & 3 deletions test/internal/fake-connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export default class FakeConnection {
this.creationTimestamp = Date.now();

this.resetInvoked = 0;
this.syncInvoked = 0;
this.flushInvoked = 0;
this.releaseInvoked = 0;
this.initializationInvoked = 0;
this.seenStatements = [];
Expand All @@ -58,8 +58,8 @@ export default class FakeConnection {
return Promise.resolve();
}

sync() {
this.syncInvoked++;
flush() {
this.flushInvoked++;
}

_release() {
Expand Down
2 changes: 1 addition & 1 deletion test/internal/shared-neo4j.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ const additionalConfig = {
};

const neoCtrlVersionParam = '-e';
const defaultNeo4jVersion = '3.3.4';
const defaultNeo4jVersion = '3.4.5';
const defaultNeoCtrlArgs = `${neoCtrlVersionParam} ${defaultNeo4jVersion}`;

function neo4jCertPath(dir) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
!: AUTO INIT
!: AUTO RESET
!: AUTO PULL_ALL
!: AUTO ACK_FAILURE
!: AUTO RUN "ROLLBACK" {}
!: AUTO RUN "BEGIN" {}
!: AUTO RUN "COMMIT" {}
Expand Down
11 changes: 11 additions & 0 deletions test/resources/boltstub/query_with_error.script
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
!: AUTO INIT
!: AUTO PULL_ALL

C: RUN "RETURN 10 / 0" {}
C: PULL_ALL
S: FAILURE {"code": "Neo.ClientError.Statement.ArithmeticError", "message": "/ by zero"}
S: IGNORED
C: RESET
S: SUCCESS
C: RESET
S: SUCCESS
29 changes: 29 additions & 0 deletions test/v1/direct.driver.boltkit.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -304,4 +304,33 @@ describe('direct driver with stub server', () => {
});
});

it('should send RESET on error', done => {
if (!boltStub.supported) {
done();
return;
}

const server = boltStub.start('./test/resources/boltstub/query_with_error.script', 9001);

boltStub.run(() => {
const driver = boltStub.newDriver('bolt://127.0.0.1:9001');
const session = driver.session();

session.run('RETURN 10 / 0').then(result => {
done.fail('Should fail but received a result: ' + JSON.stringify(result));
}).catch(error => {
expect(error.code).toEqual('Neo.ClientError.Statement.ArithmeticError');
expect(error.message).toEqual('/ by zero');

session.close(() => {
driver.close();
server.exit(code => {
expect(code).toEqual(0);
done();
});
});
});
});
});

});