Skip to content

Allow initial bookmark in session #216

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Mar 13, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion neokit
Submodule neokit updated 1 files
+2 −2 neoget.py
35 changes: 19 additions & 16 deletions src/v1/driver.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class Driver {
Driver._validateConnection.bind(this),
config.connectionPoolSize
);
this._connectionProvider = this._createConnectionProvider(url, this._pool);
this._connectionProvider = this._createConnectionProvider(url, this._pool, this._driverOnErrorCallback.bind(this));
}

/**
Expand Down Expand Up @@ -108,20 +108,14 @@ class Driver {
* it is returned to the pool, the session will be reset to a clean state and
* made available for others to use.
*
* @param {String} mode of session - optional
* @param {string} [mode=WRITE] the access mode of this session, allowed values are {@link READ} and {@link WRITE}.
* @param {string} [bookmark=null] the initial reference to some previous transaction. Value is optional and
* absence indicates that that the bookmark does not exist or is unknown.
* @return {Session} new session.
*/
session(mode) {
session(mode, bookmark) {
const sessionMode = Driver._validateSessionMode(mode);
const connectionPromise = this._connectionProvider.acquireConnection(sessionMode);
connectionPromise.catch((err) => {
if (this.onError && err.code === SERVICE_UNAVAILABLE) {
this.onError(err);
} else {
//we don't need to tell the driver about this error
}
});
return this._createSession(connectionPromise);
return this._createSession(sessionMode, this._connectionProvider, bookmark);
}

static _validateSessionMode(rawMode) {
Expand All @@ -133,13 +127,22 @@ class Driver {
}

//Extension point
_createConnectionProvider(address, connectionPool) {
return new DirectConnectionProvider(address, connectionPool);
_createConnectionProvider(address, connectionPool, driverOnErrorCallback) {
return new DirectConnectionProvider(address, connectionPool, driverOnErrorCallback);
}

//Extension point
_createSession(connectionPromise) {
return new Session(connectionPromise);
_createSession(mode, connectionProvider, bookmark) {
return new Session(mode, connectionProvider, bookmark);
}

_driverOnErrorCallback(error) {
const userDefinedOnErrorCallback = this.onError;
if (userDefinedOnErrorCallback && error.code === SERVICE_UNAVAILABLE) {
userDefinedOnErrorCallback(error);
} else {
// we don't need to tell the driver about this error
}
}

/**
Expand Down
137 changes: 137 additions & 0 deletions src/v1/internal/connection-holder.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/**
* Copyright (c) 2002-2017 "Neo Technology,","
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import {newError} from '../error';

/**
* Utility to lazily initialize connections and return them back to the pool when unused.
*/
export default class ConnectionHolder {

/**
* @constructor
* @param {string} mode - the access mode for new connection holder.
* @param {ConnectionProvider} connectionProvider - the connection provider to acquire connections from.
*/
constructor(mode, connectionProvider) {
this._mode = mode;
this._connectionProvider = connectionProvider;
this._referenceCount = 0;
this._connectionPromise = Promise.resolve(null);
}

/**
* Make this holder initialize new connection if none exists already.
* @return {undefined}
*/
initializeConnection() {
if (this._referenceCount === 0) {
this._connectionPromise = this._connectionProvider.acquireConnection(this._mode);
}
this._referenceCount++;
}

/**
* Get the current connection promise.
* @return {Promise<Connection>} promise resolved with the current connection.
*/
getConnection() {
return this._connectionPromise;
}

/**
* Notify this holder that single party does not require current connection any more.
* @return {Promise<Connection>} promise resolved with the current connection.
*/
releaseConnection() {
if (this._referenceCount === 0) {
return this._connectionPromise;
}

this._referenceCount--;
if (this._referenceCount === 0) {
// release a connection without muting ACK_FAILURE, this is the last action on this connection
return this._releaseConnection(true);
}
return this._connectionPromise;
}

/**
* Closes this holder and releases current connection (if any) despite any existing users.
* @return {Promise<Connection>} promise resolved when current connection is released to the pool.
*/
close() {
if (this._referenceCount === 0) {
return this._connectionPromise;
}
this._referenceCount = 0;
// release a connection and mute ACK_FAILURE, this might be called concurrently with other
// operations and thus should ignore failure handling
return this._releaseConnection(false);
}

/**
* Return the current pooled connection instance to the connection pool.
* We don't pool Session instances, to avoid users using the Session after they've called close.
* The `Session` object is just a thin wrapper around Connection anyway, so it makes little difference.
* @return {Promise} - promise resolved then connection is returned to the pool.
* @private
*/
_releaseConnection(sync) {
this._connectionPromise = this._connectionPromise.then(connection => {
if (connection) {
if(sync) {
connection.reset();
} else {
connection.resetAsync();
}
connection.sync();
connection._release();
}
}).catch(ignoredError => {
});

return this._connectionPromise;
}
}

class EmptyConnectionHolder extends ConnectionHolder {

initializeConnection() {
// nothing to initialize
}

getConnection() {
return Promise.reject(newError('This connection holder does not serve connections'));
}

releaseConnection() {
return Promise.resolve();
}

close() {
return Promise.resolve();
}
}

/**
* Connection holder that does not manage any connections.
* @type {ConnectionHolder}
*/
export const EMPTY_CONNECTION_HOLDER = new EmptyConnectionHolder();
40 changes: 35 additions & 5 deletions src/v1/internal/connection-providers.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,32 +29,46 @@ class ConnectionProvider {
acquireConnection(mode) {
throw new Error('Abstract method');
}

_withAdditionalOnErrorCallback(connectionPromise, driverOnErrorCallback) {
// install error handler from the driver on the connection promise; this callback is installed separately
// so that it does not handle errors, instead it is just an additional error reporting facility.
connectionPromise.catch(error => {
driverOnErrorCallback(error)
});
// return the original connection promise
return connectionPromise;
}
}

export class DirectConnectionProvider extends ConnectionProvider {

constructor(address, connectionPool) {
constructor(address, connectionPool, driverOnErrorCallback) {
super();
this._address = address;
this._connectionPool = connectionPool;
this._driverOnErrorCallback = driverOnErrorCallback;
}

acquireConnection(mode) {
return Promise.resolve(this._connectionPool.acquire(this._address));
const connection = this._connectionPool.acquire(this._address);
const connectionPromise = Promise.resolve(connection);
return this._withAdditionalOnErrorCallback(connectionPromise, this._driverOnErrorCallback);
}
}

export class LoadBalancer extends ConnectionProvider {

constructor(address, connectionPool) {
constructor(address, connectionPool, driverOnErrorCallback) {
super();
this._routingTable = new RoutingTable(new RoundRobinArray([address]));
this._rediscovery = new Rediscovery();
this._connectionPool = connectionPool;
this._driverOnErrorCallback = driverOnErrorCallback;
}

acquireConnection(mode) {
return this._freshRoutingTable().then(routingTable => {
const connectionPromise = this._freshRoutingTable().then(routingTable => {
if (mode === READ) {
return this._acquireConnectionToServer(routingTable.readers, 'read');
} else if (mode === WRITE) {
Expand All @@ -63,6 +77,7 @@ export class LoadBalancer extends ConnectionProvider {
throw newError('Illegal mode ' + mode);
}
});
return this._withAdditionalOnErrorCallback(connectionPromise, this._driverOnErrorCallback);
}

forget(address) {
Expand Down Expand Up @@ -132,7 +147,8 @@ export class LoadBalancer extends ConnectionProvider {
_createSessionForRediscovery(routerAddress) {
const connection = this._connectionPool.acquire(routerAddress);
const connectionPromise = Promise.resolve(connection);
return new Session(connectionPromise);
const connectionProvider = new SingleConnectionProvider(connectionPromise);
return new Session(READ, connectionProvider);
}

_updateRoutingTable(newRoutingTable) {
Expand All @@ -153,3 +169,17 @@ export class LoadBalancer extends ConnectionProvider {
}
}
}

export class SingleConnectionProvider extends ConnectionProvider {

constructor(connectionPromise) {
super();
this._connectionPromise = connectionPromise;
}

acquireConnection(mode) {
const connectionPromise = this._connectionPromise;
this._connectionPromise = null;
return connectionPromise;
}
}
15 changes: 11 additions & 4 deletions src/v1/internal/connector.js
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ function log(actor, msg) {
}
}


function NO_OP(){}

let NO_OP_OBSERVER = {
Expand Down Expand Up @@ -384,9 +383,9 @@ class Connection {
this._chunker.messageBoundary();
}

/** Queue a RESET-message to be sent to the database */
reset( observer ) {
log("C", "RESET");
/** Queue a RESET-message to be sent to the database. Mutes failure handling. */
resetAsync( observer ) {
log("C", "RESET_ASYNC");
this._isHandlingFailure = true;
let self = this;
let wrappedObs = {
Expand All @@ -404,6 +403,14 @@ class Connection {
this._chunker.messageBoundary();
}

/** Queue a RESET-message to be sent to the database */
reset(observer) {
log('C', 'RESET');
this._queueObserver(observer);
this._packer.packStruct(RESET, [], (err) => this._handleFatalError(err));
this._chunker.messageBoundary();
}

/** Queue a ACK_FAILURE-message to be sent to the database */
_ackFailure( observer ) {
log("C", "ACK_FAILURE");
Expand Down
6 changes: 3 additions & 3 deletions src/v1/internal/get-servers-util.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
* limitations under the License.
*/

import RoundRobinArray from "./round-robin-array";
import {newError, PROTOCOL_ERROR, SERVICE_UNAVAILABLE} from "../error";
import Integer, {int} from "../integer";
import RoundRobinArray from './round-robin-array';
import {newError, PROTOCOL_ERROR, SERVICE_UNAVAILABLE} from '../error';
import Integer, {int} from '../integer';

const PROCEDURE_CALL = 'CALL dbms.cluster.routing.getServers';
const PROCEDURE_NOT_FOUND_CODE = 'Neo.ClientError.Procedure.ProcedureNotFound';
Expand Down
Loading