Skip to content

Bolt V3 support & transaction configuration #403

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Aug 30, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/v1/driver.js
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ class Driver {
session(mode, bookmarkOrBookmarks) {
const sessionMode = Driver._validateSessionMode(mode);
const connectionProvider = this._getOrCreateConnectionProvider();
const bookmark = new Bookmark(bookmarkOrBookmarks);
const bookmark = bookmarkOrBookmarks ? new Bookmark(bookmarkOrBookmarks) : Bookmark.empty();
return new Session(sessionMode, connectionProvider, bookmark, this._config);
}

Expand Down
45 changes: 41 additions & 4 deletions src/v1/internal/bolt-protocol-v1.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
*/
import RequestMessage from './request-message';
import * as v1 from './packstream-v1';
import {newError} from '../error';
import Bookmark from './bookmark';
import TxConfig from './tx-config';

export default class BoltProtocol {

Expand Down Expand Up @@ -49,6 +52,15 @@ export default class BoltProtocol {
return this._unpacker;
}

/**
* Transform metadata received in SUCCESS message before it is passed to the handler.
* @param {object} metadata the received metadata.
* @return {object} transformed metadata.
*/
transformMetadata(metadata) {
return metadata;
}

/**
* Perform initialization and authentication of the underlying connection.
* @param {string} clientName the client name.
Expand All @@ -63,9 +75,12 @@ export default class BoltProtocol {
/**
* Begin an explicit transaction.
* @param {Bookmark} bookmark the bookmark.
* @param {TxConfig} txConfig the configuration.
* @param {StreamObserver} observer the response observer.
*/
beginTransaction(bookmark, observer) {
beginTransaction(bookmark, txConfig, observer) {
assertTxConfigIsEmpty(txConfig, this._connection, observer);

const runMessage = RequestMessage.run('BEGIN', bookmark.asBeginTransactionParameters());
const pullAllMessage = RequestMessage.pullAll();

Expand All @@ -78,24 +93,29 @@ export default class BoltProtocol {
* @param {StreamObserver} observer the response observer.
*/
commitTransaction(observer) {
this.run('COMMIT', {}, observer);
this.run('COMMIT', {}, Bookmark.empty(), TxConfig.empty(), observer);
}

/**
* Rollback the explicit transaction.
* @param {StreamObserver} observer the response observer.
*/
rollbackTransaction(observer) {
this.run('ROLLBACK', {}, observer);
this.run('ROLLBACK', {}, Bookmark.empty(), TxConfig.empty(), observer);
}

/**
* Send a Cypher statement through the underlying connection.
* @param {string} statement the cypher statement.
* @param {object} parameters the statement parameters.
* @param {Bookmark} bookmark the bookmark.
* @param {TxConfig} txConfig the auto-commit transaction configuration.
* @param {StreamObserver} observer the response observer.
*/
run(statement, parameters, observer) {
run(statement, parameters, bookmark, txConfig, observer) {
// bookmark is ignored in this version of the protocol
assertTxConfigIsEmpty(txConfig, this._connection, observer);

const runMessage = RequestMessage.run(statement, parameters);
const pullAllMessage = RequestMessage.pullAll();

Expand All @@ -120,3 +140,20 @@ export default class BoltProtocol {
return new v1.Unpacker(disableLosslessIntegers);
}
}

/**
* @param {TxConfig} txConfig the auto-commit transaction configuration.
* @param {Connection} connection the connection.
* @param {StreamObserver} observer the response observer.
*/
function assertTxConfigIsEmpty(txConfig, connection, observer) {
if (!txConfig.isEmpty()) {
const error = newError('Driver is connected to the database that does not support transaction configuration. ' +
'Please upgrade to neo4j 3.5.0 or later in order to use this functionality');

// unsupported API was used, consider this a fatal error for the current connection
connection._handleFatalError(error);
observer.onError(error);
throw error;
}
}
81 changes: 81 additions & 0 deletions src/v1/internal/bolt-protocol-v3.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/**
* Copyright (c) 2002-2018 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import BoltProtocolV2 from './bolt-protocol-v2';
import RequestMessage from './request-message';

export default class BoltProtocol extends BoltProtocolV2 {

constructor(connection, chunker, disableLosslessIntegers) {
super(connection, chunker, disableLosslessIntegers);
}

transformMetadata(metadata) {
if (metadata.t_first) {
// Bolt V3 uses shorter key 't_first' to represent 'result_available_after'
// adjust the key to be the same as in Bolt V1 so that ResultSummary can retrieve the value
metadata.result_available_after = metadata.t_first;
delete metadata.t_first;
}
if (metadata.t_last) {
// Bolt V3 uses shorter key 't_last' to represent 'result_consumed_after'
// adjust the key to be the same as in Bolt V1 so that ResultSummary can retrieve the value
metadata.result_consumed_after = metadata.t_last;
delete metadata.t_last;
}
return metadata;
}

initialize(userAgent, authToken, observer) {
prepareToHandleSingleResponse(observer);
const message = RequestMessage.hello(userAgent, authToken);
this._connection.write(message, observer, true);
}

beginTransaction(bookmark, txConfig, observer) {
prepareToHandleSingleResponse(observer);
const message = RequestMessage.begin(bookmark, txConfig);
this._connection.write(message, observer, true);
}

commitTransaction(observer) {
prepareToHandleSingleResponse(observer);
const message = RequestMessage.commit();
this._connection.write(message, observer, true);
}

rollbackTransaction(observer) {
prepareToHandleSingleResponse(observer);
const message = RequestMessage.rollback();
this._connection.write(message, observer, true);
}

run(statement, parameters, bookmark, txConfig, observer) {
const runMessage = RequestMessage.runWithMetadata(statement, parameters, bookmark, txConfig);
const pullAllMessage = RequestMessage.pullAll();

this._connection.write(runMessage, observer, false);
this._connection.write(pullAllMessage, observer, true);
}
}

function prepareToHandleSingleResponse(observer) {
if (observer && typeof observer.prepareToHandleSingleResponse === 'function') {
observer.prepareToHandleSingleResponse();
}
}
14 changes: 14 additions & 0 deletions src/v1/internal/bookmark.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ export default class Bookmark {
this._maxValue = maxBookmark(this._values);
}

static empty() {
return EMPTY_BOOKMARK;
}

/**
* Check if the given bookmark is meaningful and can be send to the database.
* @return {boolean} returns <code>true</code> bookmark has a value, <code>false</code> otherwise.
Expand All @@ -52,6 +56,14 @@ export default class Bookmark {
return this._maxValue;
}

/**
* Get all bookmark values as an array.
* @return {string[]} all values.
*/
values() {
return this._values;
}

/**
* Get this bookmark as an object for begin transaction call.
* @return {object} the value of this bookmark as object.
Expand All @@ -72,6 +84,8 @@ export default class Bookmark {
}
}

const EMPTY_BOOKMARK = new Bookmark(null);

/**
* Converts given value to an array.
* @param {string|string[]} [value=undefined] argument to convert.
Expand Down
4 changes: 2 additions & 2 deletions src/v1/internal/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,6 @@ export default class Connection {
* failing, and the connection getting ejected from the session pool.
*
* @param error an error object, forwarded to all current and future subscribers
* @protected
*/
_handleFatalError(error) {
this._isBroken = true;
Expand Down Expand Up @@ -267,7 +266,8 @@ export default class Connection {
this._log.debug(`${this} S: SUCCESS ${JSON.stringify(msg)}`);
}
try {
this._currentObserver.onCompleted( payload );
const metadata = this._protocol.transformMetadata(payload);
this._currentObserver.onCompleted(metadata);
} finally {
this._updateCurrentObserver();
}
Expand Down
24 changes: 14 additions & 10 deletions src/v1/internal/protocol-handshaker.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import {alloc} from './buf';
import {newError} from '../error';
import BoltProtocolV1 from './bolt-protocol-v1';
import BoltProtocolV2 from './bolt-protocol-v2';
import BoltProtocolV3 from './bolt-protocol-v3';

const HTTP_MAGIC_PREAMBLE = 1213486160; // == 0x48545450 == "HTTP"
const BOLT_MAGIC_PREAMBLE = 0x6060B017;
Expand Down Expand Up @@ -69,15 +70,18 @@ export default class ProtocolHandshaker {
* @private
*/
_createProtocolWithVersion(version) {
if (version === 1) {
return new BoltProtocolV1(this._connection, this._chunker, this._disableLosslessIntegers);
} else if (version === 2) {
return new BoltProtocolV2(this._connection, this._chunker, this._disableLosslessIntegers);
} else if (version === HTTP_MAGIC_PREAMBLE) {
throw newError('Server responded HTTP. Make sure you are not trying to connect to the http endpoint ' +
'(HTTP defaults to port 7474 whereas BOLT defaults to port 7687)');
} else {
throw newError('Unknown Bolt protocol version: ' + version);
switch (version) {
case 1:
return new BoltProtocolV1(this._connection, this._chunker, this._disableLosslessIntegers);
case 2:
return new BoltProtocolV2(this._connection, this._chunker, this._disableLosslessIntegers);
case 3:
return new BoltProtocolV3(this._connection, this._chunker, this._disableLosslessIntegers);
case HTTP_MAGIC_PREAMBLE:
throw newError('Server responded HTTP. Make sure you are not trying to connect to the http endpoint ' +
'(HTTP defaults to port 7474 whereas BOLT defaults to port 7687)');
default:
throw newError('Unknown Bolt protocol version: ' + version);
}
}
}
Expand All @@ -93,10 +97,10 @@ function newHandshakeBuffer() {
handshakeBuffer.writeInt32(BOLT_MAGIC_PREAMBLE);

//proposed versions
handshakeBuffer.writeInt32(3);
handshakeBuffer.writeInt32(2);
handshakeBuffer.writeInt32(1);
handshakeBuffer.writeInt32(0);
handshakeBuffer.writeInt32(0);

// reset the reader position
handshakeBuffer.reset();
Expand Down
85 changes: 82 additions & 3 deletions src/v1/internal/request-message.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,17 @@
*/

// Signature bytes for each request message type
const INIT = 0x01; // 0000 0001 // INIT <user_agent>
const INIT = 0x01; // 0000 0001 // INIT <user_agent> <authentication_token>
const ACK_FAILURE = 0x0E; // 0000 1110 // ACK_FAILURE - unused
const RESET = 0x0F; // 0000 1111 // RESET
const RUN = 0x10; // 0001 0000 // RUN <statement> <parameters>
const DISCARD_ALL = 0x2F; // 0010 1111 // DISCARD * - unused
const PULL_ALL = 0x3F; // 0011 1111 // PULL *
const DISCARD_ALL = 0x2F; // 0010 1111 // DISCARD_ALL - unused
const PULL_ALL = 0x3F; // 0011 1111 // PULL_ALL

const HELLO = 0x01; // 0000 0001 // HELLO <metadata>
const BEGIN = 0x11; // 0001 0001 // BEGIN <metadata>
const COMMIT = 0x12; // 0001 0010 // COMMIT
const ROLLBACK = 0x13; // 0001 0011 // ROLLBACK

export default class RequestMessage {

Expand Down Expand Up @@ -68,8 +73,82 @@ export default class RequestMessage {
static reset() {
return RESET_MESSAGE;
}

/**
* Create a new HELLO message.
* @param {string} userAgent the user agent.
* @param {object} authToken the authentication token.
* @return {RequestMessage} new HELLO message.
*/
static hello(userAgent, authToken) {
const metadata = Object.assign({user_agent: userAgent}, authToken);
return new RequestMessage(HELLO, [metadata], () => `HELLO {user_agent: '${userAgent}', ...}`);
}

/**
* Create a new BEGIN message.
* @param {Bookmark} bookmark the bookmark.
* @param {TxConfig} txConfig the configuration.
* @return {RequestMessage} new BEGIN message.
*/
static begin(bookmark, txConfig) {
const metadata = buildTxMetadata(bookmark, txConfig);
return new RequestMessage(BEGIN, [metadata], () => `BEGIN ${JSON.stringify(metadata)}`);
}

/**
* Get a COMMIT message.
* @return {RequestMessage} the COMMIT message.
*/
static commit() {
return COMMIT_MESSAGE;
}

/**
* Get a ROLLBACK message.
* @return {RequestMessage} the ROLLBACK message.
*/
static rollback() {
return ROLLBACK_MESSAGE;
}

/**
* Create a new RUN message with additional metadata.
* @param {string} statement the cypher statement.
* @param {object} parameters the statement parameters.
* @param {Bookmark} bookmark the bookmark.
* @param {TxConfig} txConfig the configuration.
* @return {RequestMessage} new RUN message with additional metadata.
*/
static runWithMetadata(statement, parameters, bookmark, txConfig) {
const metadata = buildTxMetadata(bookmark, txConfig);
return new RequestMessage(RUN, [statement, parameters, metadata],
() => `RUN ${statement} ${JSON.stringify(parameters)} ${JSON.stringify(metadata)}`);
}
}

/**
* Create an object that represent transaction metadata.
* @param {Bookmark} bookmark the bookmark.
* @param {TxConfig} txConfig the configuration.
* @return {object} a metadata object.
*/
function buildTxMetadata(bookmark, txConfig) {
const metadata = {};
if (!bookmark.isEmpty()) {
metadata['bookmarks'] = bookmark.values();
}
if (txConfig.timeout) {
metadata['tx_timeout'] = txConfig.timeout;
}
if (txConfig.metadata) {
metadata['tx_metadata'] = txConfig.metadata;
}
return metadata;
}

// constants for messages that never change
const PULL_ALL_MESSAGE = new RequestMessage(PULL_ALL, [], () => 'PULL_ALL');
const RESET_MESSAGE = new RequestMessage(RESET, [], () => 'RESET');
const COMMIT_MESSAGE = new RequestMessage(COMMIT, [], () => 'COMMIT');
const ROLLBACK_MESSAGE = new RequestMessage(ROLLBACK, [], () => 'ROLLBACK');
Loading