Skip to content

Commit fa2ce28

Browse files
authored
Merge pull request #238 from lutovich/1.3-read-when-no-writer
Read in absence of viable writer
2 parents 6dc11f5 + cf43106 commit fa2ce28

11 files changed

+442
-162
lines changed

src/v1/index.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ function driver(url, authToken, config = {}) {
132132
return new RoutingDriver(parseUrl(url), routingContext, USER_AGENT, authToken, config);
133133
} else if (scheme === 'bolt://') {
134134
if (!isEmptyObjectOrNull(routingContext)) {
135-
throw new Error(`Routing parameters are not supported with scheme 'bolt'. Given URL: '${url}'`);
135+
throw new Error(`Parameters are not supported with scheme 'bolt'. Given URL: '${url}'`);
136136
}
137137
return new Driver(parseUrl(url), USER_AGENT, authToken, config);
138138
} else {

src/v1/internal/connection-providers.js

+75-34
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
* limitations under the License.
1818
*/
1919

20-
import {newError, SERVICE_UNAVAILABLE} from '../error';
20+
import {newError, SERVICE_UNAVAILABLE, SESSION_EXPIRED} from '../error';
2121
import {READ, WRITE} from '../driver';
2222
import Session from '../session';
2323
import RoundRobinArray from './round-robin-array';
@@ -70,16 +70,17 @@ export class LoadBalancer extends ConnectionProvider {
7070
this._connectionPool = connectionPool;
7171
this._driverOnErrorCallback = driverOnErrorCallback;
7272
this._hostNameResolver = LoadBalancer._createHostNameResolver();
73+
this._useSeedRouter = false;
7374
}
7475

75-
acquireConnection(mode) {
76-
const connectionPromise = this._freshRoutingTable().then(routingTable => {
77-
if (mode === READ) {
76+
acquireConnection(accessMode) {
77+
const connectionPromise = this._freshRoutingTable(accessMode).then(routingTable => {
78+
if (accessMode === READ) {
7879
return this._acquireConnectionToServer(routingTable.readers, 'read');
79-
} else if (mode === WRITE) {
80+
} else if (accessMode === WRITE) {
8081
return this._acquireConnectionToServer(routingTable.writers, 'write');
8182
} else {
82-
throw newError('Illegal mode ' + mode);
83+
throw newError('Illegal mode ' + accessMode);
8384
}
8485
});
8586
return this._withAdditionalOnErrorCallback(connectionPromise, this._driverOnErrorCallback);
@@ -97,15 +98,17 @@ export class LoadBalancer extends ConnectionProvider {
9798
_acquireConnectionToServer(serversRoundRobinArray, serverName) {
9899
const address = serversRoundRobinArray.next();
99100
if (!address) {
100-
return Promise.reject(newError('No ' + serverName + ' servers available', SERVICE_UNAVAILABLE));
101+
return Promise.reject(newError(
102+
`Failed to obtain connection towards ${serverName} server. Known routing table is: ${this._routingTable}`,
103+
SESSION_EXPIRED));
101104
}
102105
return this._connectionPool.acquire(address);
103106
}
104107

105-
_freshRoutingTable() {
108+
_freshRoutingTable(accessMode) {
106109
const currentRoutingTable = this._routingTable;
107110

108-
if (!currentRoutingTable.isStale()) {
111+
if (!currentRoutingTable.isStaleFor(accessMode)) {
109112
return Promise.resolve(currentRoutingTable);
110113
}
111114
return this._refreshRoutingTable(currentRoutingTable);
@@ -114,48 +117,73 @@ export class LoadBalancer extends ConnectionProvider {
114117
_refreshRoutingTable(currentRoutingTable) {
115118
const knownRouters = currentRoutingTable.routers.toArray();
116119

117-
return this._fetchNewRoutingTable(knownRouters, currentRoutingTable).then(newRoutingTable => {
118-
if (LoadBalancer._isValidRoutingTable(newRoutingTable)) {
119-
// one of the known routers returned a valid routing table - use it
120+
if (this._useSeedRouter) {
121+
return this._fetchRoutingTableFromSeedRouterFallbackToKnownRouters(knownRouters, currentRoutingTable);
122+
}
123+
return this._fetchRoutingTableFromKnownRoutersFallbackToSeedRouter(knownRouters, currentRoutingTable);
124+
}
125+
126+
_fetchRoutingTableFromSeedRouterFallbackToKnownRouters(knownRouters, currentRoutingTable) {
127+
// we start with seed router, no routers were probed before
128+
const seenRouters = [];
129+
return this._fetchRoutingTableUsingSeedRouter(seenRouters, this._seedRouter).then(newRoutingTable => {
130+
if (newRoutingTable) {
131+
this._useSeedRouter = false;
120132
return newRoutingTable;
121133
}
122134

123-
if (!newRoutingTable) {
124-
// returned routing table was undefined, this means a connection error happened and the last known
125-
// router did not return a valid routing table, so we need to forget it
126-
const lastRouterIndex = knownRouters.length - 1;
127-
LoadBalancer._forgetRouter(currentRoutingTable, knownRouters, lastRouterIndex);
135+
// seed router did not return a valid routing table - try to use other known routers
136+
return this._fetchRoutingTableUsingKnownRouters(knownRouters, currentRoutingTable);
137+
}).then(newRoutingTable => {
138+
this._applyRoutingTableIfPossible(newRoutingTable);
139+
return newRoutingTable;
140+
});
141+
}
142+
143+
_fetchRoutingTableFromKnownRoutersFallbackToSeedRouter(knownRouters, currentRoutingTable) {
144+
return this._fetchRoutingTableUsingKnownRouters(knownRouters, currentRoutingTable).then(newRoutingTable => {
145+
if (newRoutingTable) {
146+
return newRoutingTable;
128147
}
129148

130149
// none of the known routers returned a valid routing table - try to use seed router address for rediscovery
131-
return this._fetchNewRoutingTableUsingSeedRouterAddress(knownRouters, this._seedRouter);
150+
return this._fetchRoutingTableUsingSeedRouter(knownRouters, this._seedRouter);
132151
}).then(newRoutingTable => {
133-
if (LoadBalancer._isValidRoutingTable(newRoutingTable)) {
134-
this._updateRoutingTable(newRoutingTable);
152+
this._applyRoutingTableIfPossible(newRoutingTable);
153+
return newRoutingTable;
154+
});
155+
}
156+
157+
_fetchRoutingTableUsingKnownRouters(knownRouters, currentRoutingTable) {
158+
return this._fetchRoutingTable(knownRouters, currentRoutingTable).then(newRoutingTable => {
159+
if (newRoutingTable) {
160+
// one of the known routers returned a valid routing table - use it
135161
return newRoutingTable;
136162
}
137163

138-
// none of the existing routers returned valid routing table, throw exception
139-
throw newError('Could not perform discovery. No routing servers available.', SERVICE_UNAVAILABLE);
164+
// returned routing table was undefined, this means a connection error happened and the last known
165+
// router did not return a valid routing table, so we need to forget it
166+
const lastRouterIndex = knownRouters.length - 1;
167+
LoadBalancer._forgetRouter(currentRoutingTable, knownRouters, lastRouterIndex);
168+
169+
return null;
140170
});
141171
}
142172

143-
_fetchNewRoutingTableUsingSeedRouterAddress(knownRouters, seedRouter) {
173+
_fetchRoutingTableUsingSeedRouter(seenRouters, seedRouter) {
144174
return this._hostNameResolver.resolve(seedRouter).then(resolvedRouterAddresses => {
145175
// filter out all addresses that we've already tried
146-
const newAddresses = resolvedRouterAddresses.filter(address => knownRouters.indexOf(address) < 0);
147-
return this._fetchNewRoutingTable(newAddresses, null);
176+
const newAddresses = resolvedRouterAddresses.filter(address => seenRouters.indexOf(address) < 0);
177+
return this._fetchRoutingTable(newAddresses, null);
148178
});
149179
}
150180

151-
_fetchNewRoutingTable(routerAddresses, routingTable) {
181+
_fetchRoutingTable(routerAddresses, routingTable) {
152182
return routerAddresses.reduce((refreshedTablePromise, currentRouter, currentIndex) => {
153183
return refreshedTablePromise.then(newRoutingTable => {
154184
if (newRoutingTable) {
155-
if (!newRoutingTable.writers.isEmpty()) {
156-
// valid routing table was fetched - just return it, try next router otherwise
157-
return newRoutingTable;
158-
}
185+
// valid routing table was fetched - just return it, try next router otherwise
186+
return newRoutingTable;
159187
} else {
160188
// returned routing table was undefined, this means a connection error happened and we need to forget the
161189
// previous router and try the next one
@@ -179,6 +207,23 @@ export class LoadBalancer extends ConnectionProvider {
179207
return new Session(READ, connectionProvider);
180208
}
181209

210+
_applyRoutingTableIfPossible(newRoutingTable) {
211+
if (!newRoutingTable) {
212+
// none of routing servers returned valid routing table, throw exception
213+
throw newError(
214+
`Could not perform discovery. No routing servers available. Known routing table: ${this._routingTable}`,
215+
SERVICE_UNAVAILABLE);
216+
}
217+
218+
if (newRoutingTable.writers.isEmpty()) {
219+
// use seed router next time. this is important when cluster is partitioned. it tries to make sure driver
220+
// does not always get routing table without writers because it talks exclusively to a minority partition
221+
this._useSeedRouter = true;
222+
}
223+
224+
this._updateRoutingTable(newRoutingTable);
225+
}
226+
182227
_updateRoutingTable(newRoutingTable) {
183228
const currentRoutingTable = this._routingTable;
184229

@@ -190,10 +235,6 @@ export class LoadBalancer extends ConnectionProvider {
190235
this._routingTable = newRoutingTable;
191236
}
192237

193-
static _isValidRoutingTable(routingTable) {
194-
return routingTable && !routingTable.writers.isEmpty();
195-
}
196-
197238
static _forgetRouter(routingTable, routersArray, routerIndex) {
198239
const address = routersArray[routerIndex];
199240
if (routingTable && address) {

src/v1/internal/round-robin-array.js

+4
Original file line numberDiff line numberDiff line change
@@ -59,4 +59,8 @@ export default class RoundRobinArray {
5959
remove(item) {
6060
this._items = this._items.filter(element => element !== item);
6161
}
62+
63+
toString() {
64+
return JSON.stringify(this._items);
65+
}
6266
}

src/v1/internal/routing-table.js

+18-3
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
import {int} from '../integer';
2020
import RoundRobinArray from './round-robin-array';
21+
import {READ, WRITE} from '../driver';
2122

2223
const MIN_ROUTERS = 1;
2324

@@ -53,14 +54,28 @@ export default class RoutingTable {
5354
return Array.from(oldServers);
5455
}
5556

56-
isStale() {
57+
/**
58+
* Check if this routing table is fresh to perform the required operation.
59+
* @param {string} accessMode the type of operation. Allowed values are {@link READ} and {@link WRITE}.
60+
* @return {boolean} <code>true</code> when this table contains servers to serve the required operation,
61+
* <code>false</code> otherwise.
62+
*/
63+
isStaleFor(accessMode) {
5764
return this.expirationTime.lessThan(Date.now()) ||
5865
this.routers.size() < MIN_ROUTERS ||
59-
this.readers.isEmpty() ||
60-
this.writers.isEmpty();
66+
accessMode === READ && this.readers.isEmpty() ||
67+
accessMode === WRITE && this.writers.isEmpty();
6168
}
6269

6370
_allServers() {
6471
return [...this.routers.toArray(), ...this.readers.toArray(), ...this.writers.toArray()];
6572
}
73+
74+
toString() {
75+
return `RoutingTable[` +
76+
`expirationTime=${this.expirationTime}, ` +
77+
`routers=${this.routers}, ` +
78+
`readers=${this.readers}, ` +
79+
`writers=${this.writers}]`;
80+
}
6681
}

0 commit comments

Comments
 (0)