Skip to content

Bolt V3 messaging part #402

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/v1/internal/bolt-protocol-v1.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,15 @@ export default class BoltProtocol {
return this._unpacker;
}

/**
* Transform metadata received in SUCCESS message before it is passed to the handler.
* @param {object} metadata the received metadata.
* @return {object} transformed metadata.
*/
transformMetadata(metadata) {
return metadata;
}

/**
* Perform initialization and authentication of the underlying connection.
* @param {string} clientName the client name.
Expand Down
82 changes: 82 additions & 0 deletions src/v1/internal/bolt-protocol-v3.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/**
* Copyright (c) 2002-2018 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import BoltProtocolV2 from './bolt-protocol-v2';
import RequestMessage from './request-message';

export default class BoltProtocol extends BoltProtocolV2 {

constructor(connection, chunker, disableLosslessIntegers) {
super(connection, chunker, disableLosslessIntegers);
}

transformMetadata(metadata) {
if (metadata.t_first) {
// Bolt V3 uses shorter key 't_first' to represent 'result_available_after'
// adjust the key to be the same as in Bolt V1 so that ResultSummary can retrieve the value
metadata.result_available_after = metadata.t_first;
delete metadata.t_first;
}
if (metadata.t_last) {
// Bolt V3 uses shorter key 't_last' to represent 'result_consumed_after'
// adjust the key to be the same as in Bolt V1 so that ResultSummary can retrieve the value
metadata.result_consumed_after = metadata.t_last;
delete metadata.t_last;
}
return metadata;
}

initialize(userAgent, authToken, observer) {
prepareToHandleSingleResponse(observer);
const message = RequestMessage.hello(userAgent, authToken);
this._connection.write(message, observer, true);
}

beginTransaction(bookmark, observer) {
prepareToHandleSingleResponse(observer);
const message = RequestMessage.begin(bookmark);
this._connection.write(message, observer, true);
}

commitTransaction(observer) {
prepareToHandleSingleResponse(observer);
const message = RequestMessage.commit();
this._connection.write(message, observer, true);
}

rollbackTransaction(observer) {
prepareToHandleSingleResponse(observer);
const message = RequestMessage.rollback();
this._connection.write(message, observer, true);
}

run(statement, parameters, observer) {
const metadata = {};
const runMessage = RequestMessage.runWithMetadata(statement, parameters, metadata);
const pullAllMessage = RequestMessage.pullAll();

this._connection.write(runMessage, observer, false);
this._connection.write(pullAllMessage, observer, true);
}
}

function prepareToHandleSingleResponse(observer) {
if (observer && typeof observer.prepareToHandleSingleResponse === 'function') {
observer.prepareToHandleSingleResponse();
}
}
8 changes: 8 additions & 0 deletions src/v1/internal/bookmark.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ export default class Bookmark {
return this._maxValue;
}

/**
* Get all bookmark values as an array.
* @return {string[]} all values.
*/
values() {
return this._values;
}

/**
* Get this bookmark as an object for begin transaction call.
* @return {object} the value of this bookmark as object.
Expand Down
3 changes: 2 additions & 1 deletion src/v1/internal/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,8 @@ export default class Connection {
this._log.debug(`${this} S: SUCCESS ${JSON.stringify(msg)}`);
}
try {
this._currentObserver.onCompleted( payload );
const metadata = this._protocol.transformMetadata(payload);
this._currentObserver.onCompleted(metadata);
} finally {
this._updateCurrentObserver();
}
Expand Down
24 changes: 14 additions & 10 deletions src/v1/internal/protocol-handshaker.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import {alloc} from './buf';
import {newError} from '../error';
import BoltProtocolV1 from './bolt-protocol-v1';
import BoltProtocolV2 from './bolt-protocol-v2';
import BoltProtocolV3 from './bolt-protocol-v3';

const HTTP_MAGIC_PREAMBLE = 1213486160; // == 0x48545450 == "HTTP"
const BOLT_MAGIC_PREAMBLE = 0x6060B017;
Expand Down Expand Up @@ -69,15 +70,18 @@ export default class ProtocolHandshaker {
* @private
*/
_createProtocolWithVersion(version) {
if (version === 1) {
return new BoltProtocolV1(this._connection, this._chunker, this._disableLosslessIntegers);
} else if (version === 2) {
return new BoltProtocolV2(this._connection, this._chunker, this._disableLosslessIntegers);
} else if (version === HTTP_MAGIC_PREAMBLE) {
throw newError('Server responded HTTP. Make sure you are not trying to connect to the http endpoint ' +
'(HTTP defaults to port 7474 whereas BOLT defaults to port 7687)');
} else {
throw newError('Unknown Bolt protocol version: ' + version);
switch (version) {
case 1:
return new BoltProtocolV1(this._connection, this._chunker, this._disableLosslessIntegers);
case 2:
return new BoltProtocolV2(this._connection, this._chunker, this._disableLosslessIntegers);
case 3:
return new BoltProtocolV3(this._connection, this._chunker, this._disableLosslessIntegers);
case HTTP_MAGIC_PREAMBLE:
throw newError('Server responded HTTP. Make sure you are not trying to connect to the http endpoint ' +
'(HTTP defaults to port 7474 whereas BOLT defaults to port 7687)');
default:
throw newError('Unknown Bolt protocol version: ' + version);
}
}
}
Expand All @@ -93,10 +97,10 @@ function newHandshakeBuffer() {
handshakeBuffer.writeInt32(BOLT_MAGIC_PREAMBLE);

//proposed versions
handshakeBuffer.writeInt32(3);
handshakeBuffer.writeInt32(2);
handshakeBuffer.writeInt32(1);
handshakeBuffer.writeInt32(0);
handshakeBuffer.writeInt32(0);

// reset the reader position
handshakeBuffer.reset();
Expand Down
62 changes: 59 additions & 3 deletions src/v1/internal/request-message.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,17 @@
*/

// Signature bytes for each request message type
const INIT = 0x01; // 0000 0001 // INIT <user_agent>
const INIT = 0x01; // 0000 0001 // INIT <user_agent> <authentication_token>
const ACK_FAILURE = 0x0E; // 0000 1110 // ACK_FAILURE - unused
const RESET = 0x0F; // 0000 1111 // RESET
const RUN = 0x10; // 0001 0000 // RUN <statement> <parameters>
const DISCARD_ALL = 0x2F; // 0010 1111 // DISCARD * - unused
const PULL_ALL = 0x3F; // 0011 1111 // PULL *
const DISCARD_ALL = 0x2F; // 0010 1111 // DISCARD_ALL - unused
const PULL_ALL = 0x3F; // 0011 1111 // PULL_ALL

const HELLO = 0x01; // 0000 0001 // HELLO <metadata>
const BEGIN = 0x11; // 0001 0001 // BEGIN <metadata>
const COMMIT = 0x12; // 0001 0010 // COMMIT
const ROLLBACK = 0x13; // 0001 0011 // ROLLBACK

export default class RequestMessage {

Expand Down Expand Up @@ -68,8 +73,59 @@ export default class RequestMessage {
static reset() {
return RESET_MESSAGE;
}

/**
* Create a new HELLO message.
* @param {string} userAgent the user agent.
* @param {object} authToken the authentication token.
* @return {RequestMessage} new HELLO message.
*/
static hello(userAgent, authToken) {
const metadata = Object.assign({user_agent: userAgent}, authToken);
return new RequestMessage(HELLO, [metadata], () => `HELLO {user_agent: '${userAgent}', ...}`);
}

/**
* Create a new BEGIN message.
* @param {Bookmark} bookmark the bookmark.
* @return {RequestMessage} new BEGIN message.
*/
static begin(bookmark) {
const metadata = {bookmarks: bookmark.values()};
return new RequestMessage(BEGIN, [metadata], () => `BEGIN ${JSON.stringify(metadata)}`);
}

/**
* Get a COMMIT message.
* @return {RequestMessage} the COMMIT message.
*/
static commit() {
return COMMIT_MESSAGE;
}

/**
* Get a ROLLBACK message.
* @return {RequestMessage} the ROLLBACK message.
*/
static rollback() {
return ROLLBACK_MESSAGE;
}

/**
* Create a new RUN message with additional metadata.
* @param {string} statement the cypher statement.
* @param {object} parameters the statement parameters.
* @param {object} metadata the additional metadata.
* @return {RequestMessage} new RUN message with additional metadata.
*/
static runWithMetadata(statement, parameters, metadata) {
return new RequestMessage(RUN, [statement, parameters, metadata],
() => `RUN ${statement} ${JSON.stringify(parameters)} ${JSON.stringify(metadata)}`);
}
}

// constants for messages that never change
const PULL_ALL_MESSAGE = new RequestMessage(PULL_ALL, [], () => 'PULL_ALL');
const RESET_MESSAGE = new RequestMessage(RESET, [], () => 'RESET');
const COMMIT_MESSAGE = new RequestMessage(COMMIT, [], () => 'COMMIT');
const ROLLBACK_MESSAGE = new RequestMessage(ROLLBACK, [], () => 'ROLLBACK');
14 changes: 14 additions & 0 deletions src/v1/internal/stream-observer.js
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,20 @@ class StreamObserver {
this._conn = conn;
}

/**
* Stream observer defaults to handling responses for two messages: RUN + PULL_ALL or RUN + DISCARD_ALL.
* Response for RUN initializes statement keys. Response for PULL_ALL / DISCARD_ALL exposes the result stream.
*
* However, some operations can be represented as a single message which receives full metadata in a single response.
* For example, operations to begin, commit and rollback an explicit transaction use two messages in Bolt V1 but a single message in Bolt V3.
* Messages are `RUN "BEGIN" {}` + `PULL_ALL` in Bolt V1 and `BEGIN` in Bolt V3.
*
* This function prepares the observer to only handle a single response message.
*/
prepareToHandleSingleResponse() {
this._fieldKeys = [];
}

/**
* Will be called on errors.
* If user-provided observer is present, pass the error
Expand Down
9 changes: 9 additions & 0 deletions test/internal/bolt-protocol-v1.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,15 @@ class MessageRecorder {

describe('BoltProtocolV1', () => {

it('should not change metadata', () => {
const metadata = {result_available_after: 1, result_consumed_after: 2, t_first: 3, t_last: 4};
const protocol = new BoltProtocolV1(new MessageRecorder(), null, false);

const transformedMetadata = protocol.transformMetadata(metadata);

expect(transformedMetadata).toEqual({result_available_after: 1, result_consumed_after: 2, t_first: 3, t_last: 4});
});

it('should initialize the connection', () => {
const recorder = new MessageRecorder();
const protocol = new BoltProtocolV1(recorder, null, false);
Expand Down
33 changes: 33 additions & 0 deletions test/internal/bolt-protocol-v3.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/**
* Copyright (c) 2002-2018 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import BoltProtocolV3 from '../../src/v1/internal/bolt-protocol-v3';

describe('BoltProtocolV3', () => {

it('should update metadata', () => {
const metadata = {t_first: 1, t_last: 2, db_hits: 3, some_other_key: 4};
const protocol = new BoltProtocolV3(null, null, false);

const transformedMetadata = protocol.transformMetadata(metadata);

expect(transformedMetadata).toEqual({result_available_after: 1, result_consumed_after: 2, db_hits: 3, some_other_key: 4});
});

});
11 changes: 11 additions & 0 deletions test/internal/bookmark.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,15 @@ describe('Bookmark', () => {
});
});

it('should expose bookmark values', () => {
expect(new Bookmark(undefined).values()).toEqual([]);
expect(new Bookmark(null).values()).toEqual([]);

const bookmarkString = 'neo4j:bookmark:v1:tx123';
expect(new Bookmark(bookmarkString).values()).toEqual([bookmarkString]);

const bookmarkStrings = ['neo4j:bookmark:v1:tx1', 'neo4j:bookmark:v1:tx2', 'neo4j:bookmark:v1:tx3'];
expect(new Bookmark(bookmarkStrings).values()).toEqual(bookmarkStrings);
});

});
17 changes: 9 additions & 8 deletions test/internal/connection.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import {ServerVersion} from '../../src/v1/internal/server-version';
import lolex from 'lolex';
import Logger from '../../src/v1/internal/logger';
import StreamObserver from '../../src/v1/internal/stream-observer';
import RequestMessage from '../../src/v1/internal/request-message';
import ConnectionErrorHandler from '../../src/v1/internal/connection-error-handler';
import testUtils from '../internal/test-utils';

Expand Down Expand Up @@ -88,16 +87,17 @@ describe('Connection', () => {
records.push(record);
},
onCompleted: () => {
expect(records[0][0]).toBe(1);
expect(records[0].get(0)).toBe(1);
done();
}
};
const streamObserver = new StreamObserver();
streamObserver.subscribe(pullAllObserver);

connection._negotiateProtocol().then(() => {
connection.protocol().initialize('mydriver/0.0.0', basicAuthToken());
connection.write(RequestMessage.run('RETURN 1.0', {}), {}, false);
connection.write(RequestMessage.pullAll(), pullAllObserver, true);
});
connection.connect('mydriver/0.0.0', basicAuthToken())
.then(() => {
connection.protocol().run('RETURN 1.0', {}, streamObserver);
});
});

it('should write protocol handshake', () => {
Expand All @@ -107,10 +107,11 @@ describe('Connection', () => {
connection._negotiateProtocol();

const boltMagicPreamble = '60 60 b0 17';
const protocolVersion3 = '00 00 00 03';
const protocolVersion2 = '00 00 00 02';
const protocolVersion1 = '00 00 00 01';
const noProtocolVersion = '00 00 00 00';
expect(observer.instance.toHex()).toBe(`${boltMagicPreamble} ${protocolVersion2} ${protocolVersion1} ${noProtocolVersion} ${noProtocolVersion} `);
expect(observer.instance.toHex()).toBe(`${boltMagicPreamble} ${protocolVersion3} ${protocolVersion2} ${protocolVersion1} ${noProtocolVersion} `);
});

it('should provide error message when connecting to http-port', done => {
Expand Down
3 changes: 2 additions & 1 deletion test/internal/protocol-handshaker.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,12 @@ describe('ProtocolHandshaker', () => {
expect(writtenBuffers.length).toEqual(1);

const boltMagicPreamble = '60 60 b0 17';
const protocolVersion3 = '00 00 00 03';
const protocolVersion2 = '00 00 00 02';
const protocolVersion1 = '00 00 00 01';
const noProtocolVersion = '00 00 00 00';

expect(writtenBuffers[0].toHex()).toEqual(`${boltMagicPreamble} ${protocolVersion2} ${protocolVersion1} ${noProtocolVersion} ${noProtocolVersion} `);
expect(writtenBuffers[0].toHex()).toEqual(`${boltMagicPreamble} ${protocolVersion3} ${protocolVersion2} ${protocolVersion1} ${noProtocolVersion} `);
});

it('should create protocol with valid version', () => {
Expand Down
Loading