Skip to content

Read in absence of viable writer #238

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 27, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/v1/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ function driver(url, authToken, config = {}) {
return new RoutingDriver(parseUrl(url), routingContext, USER_AGENT, authToken, config);
} else if (scheme === 'bolt://') {
if (!isEmptyObjectOrNull(routingContext)) {
throw new Error(`Routing parameters are not supported with scheme 'bolt'. Given URL: '${url}'`);
throw new Error(`Parameters are not supported with scheme 'bolt'. Given URL: '${url}'`);
}
return new Driver(parseUrl(url), USER_AGENT, authToken, config);
} else {
Expand Down
109 changes: 75 additions & 34 deletions src/v1/internal/connection-providers.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* limitations under the License.
*/

import {newError, SERVICE_UNAVAILABLE} from '../error';
import {newError, SERVICE_UNAVAILABLE, SESSION_EXPIRED} from '../error';
import {READ, WRITE} from '../driver';
import Session from '../session';
import RoundRobinArray from './round-robin-array';
Expand Down Expand Up @@ -70,16 +70,17 @@ export class LoadBalancer extends ConnectionProvider {
this._connectionPool = connectionPool;
this._driverOnErrorCallback = driverOnErrorCallback;
this._hostNameResolver = LoadBalancer._createHostNameResolver();
this._useSeedRouter = false;
}

acquireConnection(mode) {
const connectionPromise = this._freshRoutingTable().then(routingTable => {
if (mode === READ) {
acquireConnection(accessMode) {
const connectionPromise = this._freshRoutingTable(accessMode).then(routingTable => {
if (accessMode === READ) {
return this._acquireConnectionToServer(routingTable.readers, 'read');
} else if (mode === WRITE) {
} else if (accessMode === WRITE) {
return this._acquireConnectionToServer(routingTable.writers, 'write');
} else {
throw newError('Illegal mode ' + mode);
throw newError('Illegal mode ' + accessMode);
}
});
return this._withAdditionalOnErrorCallback(connectionPromise, this._driverOnErrorCallback);
Expand All @@ -97,15 +98,17 @@ export class LoadBalancer extends ConnectionProvider {
_acquireConnectionToServer(serversRoundRobinArray, serverName) {
const address = serversRoundRobinArray.next();
if (!address) {
return Promise.reject(newError('No ' + serverName + ' servers available', SERVICE_UNAVAILABLE));
return Promise.reject(newError(
`Failed to obtain connection towards ${serverName} server. Known routing table is: ${this._routingTable}`,
SESSION_EXPIRED));
}
return this._connectionPool.acquire(address);
}

_freshRoutingTable() {
_freshRoutingTable(accessMode) {
const currentRoutingTable = this._routingTable;

if (!currentRoutingTable.isStale()) {
if (!currentRoutingTable.isStaleFor(accessMode)) {
return Promise.resolve(currentRoutingTable);
}
return this._refreshRoutingTable(currentRoutingTable);
Expand All @@ -114,48 +117,73 @@ export class LoadBalancer extends ConnectionProvider {
_refreshRoutingTable(currentRoutingTable) {
const knownRouters = currentRoutingTable.routers.toArray();

return this._fetchNewRoutingTable(knownRouters, currentRoutingTable).then(newRoutingTable => {
if (LoadBalancer._isValidRoutingTable(newRoutingTable)) {
// one of the known routers returned a valid routing table - use it
if (this._useSeedRouter) {
return this._fetchRoutingTableFromSeedRouterFallbackToKnownRouters(knownRouters, currentRoutingTable);
}
return this._fetchRoutingTableFromKnownRoutersFallbackToSeedRouter(knownRouters, currentRoutingTable);
}

_fetchRoutingTableFromSeedRouterFallbackToKnownRouters(knownRouters, currentRoutingTable) {
// we start with seed router, no routers were probed before
const seenRouters = [];
return this._fetchRoutingTableUsingSeedRouter(seenRouters, this._seedRouter).then(newRoutingTable => {
if (newRoutingTable) {
this._useSeedRouter = false;
return newRoutingTable;
}

if (!newRoutingTable) {
// returned routing table was undefined, this means a connection error happened and the last known
// router did not return a valid routing table, so we need to forget it
const lastRouterIndex = knownRouters.length - 1;
LoadBalancer._forgetRouter(currentRoutingTable, knownRouters, lastRouterIndex);
// seed router did not return a valid routing table - try to use other known routers
return this._fetchRoutingTableUsingKnownRouters(knownRouters, currentRoutingTable);
}).then(newRoutingTable => {
this._applyRoutingTableIfPossible(newRoutingTable);
return newRoutingTable;
});
}

_fetchRoutingTableFromKnownRoutersFallbackToSeedRouter(knownRouters, currentRoutingTable) {
return this._fetchRoutingTableUsingKnownRouters(knownRouters, currentRoutingTable).then(newRoutingTable => {
if (newRoutingTable) {
return newRoutingTable;
}

// none of the known routers returned a valid routing table - try to use seed router address for rediscovery
return this._fetchNewRoutingTableUsingSeedRouterAddress(knownRouters, this._seedRouter);
return this._fetchRoutingTableUsingSeedRouter(knownRouters, this._seedRouter);
}).then(newRoutingTable => {
if (LoadBalancer._isValidRoutingTable(newRoutingTable)) {
this._updateRoutingTable(newRoutingTable);
this._applyRoutingTableIfPossible(newRoutingTable);
return newRoutingTable;
});
}

_fetchRoutingTableUsingKnownRouters(knownRouters, currentRoutingTable) {
return this._fetchRoutingTable(knownRouters, currentRoutingTable).then(newRoutingTable => {
if (newRoutingTable) {
// one of the known routers returned a valid routing table - use it
return newRoutingTable;
}

// none of the existing routers returned valid routing table, throw exception
throw newError('Could not perform discovery. No routing servers available.', SERVICE_UNAVAILABLE);
// returned routing table was undefined, this means a connection error happened and the last known
// router did not return a valid routing table, so we need to forget it
const lastRouterIndex = knownRouters.length - 1;
LoadBalancer._forgetRouter(currentRoutingTable, knownRouters, lastRouterIndex);

return null;
});
}

_fetchNewRoutingTableUsingSeedRouterAddress(knownRouters, seedRouter) {
_fetchRoutingTableUsingSeedRouter(seenRouters, seedRouter) {
return this._hostNameResolver.resolve(seedRouter).then(resolvedRouterAddresses => {
// filter out all addresses that we've already tried
const newAddresses = resolvedRouterAddresses.filter(address => knownRouters.indexOf(address) < 0);
return this._fetchNewRoutingTable(newAddresses, null);
const newAddresses = resolvedRouterAddresses.filter(address => seenRouters.indexOf(address) < 0);
return this._fetchRoutingTable(newAddresses, null);
});
}

_fetchNewRoutingTable(routerAddresses, routingTable) {
_fetchRoutingTable(routerAddresses, routingTable) {
return routerAddresses.reduce((refreshedTablePromise, currentRouter, currentIndex) => {
return refreshedTablePromise.then(newRoutingTable => {
if (newRoutingTable) {
if (!newRoutingTable.writers.isEmpty()) {
// valid routing table was fetched - just return it, try next router otherwise
return newRoutingTable;
}
// valid routing table was fetched - just return it, try next router otherwise
return newRoutingTable;
} else {
// returned routing table was undefined, this means a connection error happened and we need to forget the
// previous router and try the next one
Expand All @@ -179,6 +207,23 @@ export class LoadBalancer extends ConnectionProvider {
return new Session(READ, connectionProvider);
}

_applyRoutingTableIfPossible(newRoutingTable) {
if (!newRoutingTable) {
// none of routing servers returned valid routing table, throw exception
throw newError(
`Could not perform discovery. No routing servers available. Known routing table: ${this._routingTable}`,
SERVICE_UNAVAILABLE);
}

if (newRoutingTable.writers.isEmpty()) {
// use seed router next time. this is important when cluster is partitioned. it tries to make sure driver
// does not always get routing table without writers because it talks exclusively to a minority partition
this._useSeedRouter = true;
}

this._updateRoutingTable(newRoutingTable);
}

_updateRoutingTable(newRoutingTable) {
const currentRoutingTable = this._routingTable;

Expand All @@ -190,10 +235,6 @@ export class LoadBalancer extends ConnectionProvider {
this._routingTable = newRoutingTable;
}

static _isValidRoutingTable(routingTable) {
return routingTable && !routingTable.writers.isEmpty();
}

static _forgetRouter(routingTable, routersArray, routerIndex) {
const address = routersArray[routerIndex];
if (routingTable && address) {
Expand Down
4 changes: 4 additions & 0 deletions src/v1/internal/round-robin-array.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,8 @@ export default class RoundRobinArray {
remove(item) {
this._items = this._items.filter(element => element !== item);
}

toString() {
return JSON.stringify(this._items);
}
}
21 changes: 18 additions & 3 deletions src/v1/internal/routing-table.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
import {int} from '../integer';
import RoundRobinArray from './round-robin-array';
import {READ, WRITE} from '../driver';

const MIN_ROUTERS = 1;

Expand Down Expand Up @@ -53,14 +54,28 @@ export default class RoutingTable {
return Array.from(oldServers);
}

isStale() {
/**
* Check if this routing table is fresh to perform the required operation.
* @param {string} accessMode the type of operation. Allowed values are {@link READ} and {@link WRITE}.
* @return {boolean} <code>true</code> when this table contains servers to serve the required operation,
* <code>false</code> otherwise.
*/
isStaleFor(accessMode) {
return this.expirationTime.lessThan(Date.now()) ||
this.routers.size() < MIN_ROUTERS ||
this.readers.isEmpty() ||
this.writers.isEmpty();
accessMode === READ && this.readers.isEmpty() ||
accessMode === WRITE && this.writers.isEmpty();
}

_allServers() {
return [...this.routers.toArray(), ...this.readers.toArray(), ...this.writers.toArray()];
}

toString() {
return `RoutingTable[` +
`expirationTime=${this.expirationTime}, ` +
`routers=${this.routers}, ` +
`readers=${this.readers}, ` +
`writers=${this.writers}]`;
}
}
Loading