Skip to content

Commit aff3b1d

Browse files
authored
Merge pull request #202 from lutovich/1.1-align-routing-to-spec
Align routing to spec
2 parents c9c4f1a + 79c54f4 commit aff3b1d

27 files changed

+1909
-493
lines changed

package.json

+3
Original file line numberDiff line numberDiff line change
@@ -49,12 +49,15 @@
4949
"gulp-util": "^3.0.6",
5050
"gulp-watch": "^4.3.5",
5151
"jasmine-reporters": "^2.0.7",
52+
"lolex": "^1.5.2",
5253
"merge-stream": "^1.0.0",
5354
"minimist": "^1.2.0",
55+
"mustache": "^2.3.0",
5456
"phantomjs-prebuilt": "^2.1.7 ",
5557
"run-sequence": "^1.1.4",
5658
"semver": "^5.3.0",
5759
"through2": "~2.0.0",
60+
"tmp": "0.0.31",
5861
"vinyl-buffer": "^1.0.0",
5962
"vinyl-source-stream": "^1.1.0"
6063
},

src/v1/driver.js

+33-22
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,13 @@
1717
* limitations under the License.
1818
*/
1919

20-
import Session from './session';
21-
import Pool from './internal/pool';
22-
import Integer from './integer';
20+
import Session from "./session";
21+
import Pool from "./internal/pool";
2322
import {connect} from "./internal/connector";
24-
import StreamObserver from './internal/stream-observer';
23+
import StreamObserver from "./internal/stream-observer";
2524
import {newError, SERVICE_UNAVAILABLE} from "./error";
2625

27-
let READ = 'READ', WRITE = 'WRITE';
26+
const READ = 'READ', WRITE = 'WRITE';
2827
/**
2928
* A driver maintains one or more {@link Session sessions} with a remote
3029
* Neo4j instance. Through the {@link Session sessions} you can send statements
@@ -111,38 +110,50 @@ class Driver {
111110
* @return {Session} new session.
112111
*/
113112
session(mode) {
114-
let connectionPromise = this._acquireConnection(mode);
113+
const sessionMode = Driver._validateSessionMode(mode);
114+
const connectionPromise = this._acquireConnection(sessionMode);
115115
connectionPromise.catch((err) => {
116116
if (this.onError && err.code === SERVICE_UNAVAILABLE) {
117117
this.onError(err);
118118
} else {
119119
//we don't need to tell the driver about this error
120120
}
121121
});
122-
return this._createSession(connectionPromise, (cb) => {
123-
// This gets called on Session#close(), and is where we return
124-
// the pooled 'connection' instance.
125-
126-
// We don't pool Session instances, to avoid users using the Session
127-
// after they've called close. The `Session` object is just a thin
128-
// wrapper around Connection anyway, so it makes little difference.
129-
130-
// Queue up a 'reset', to ensure the next user gets a clean
131-
// session to work with.
122+
return this._createSession(connectionPromise, this._releaseConnection(connectionPromise));
123+
}
132124

133-
connectionPromise.then( (conn) => {
125+
/**
126+
* The returned function gets called on Session#close(), and is where we return the pooled 'connection' instance.
127+
* We don't pool Session instances, to avoid users using the Session after they've called close.
128+
* The `Session` object is just a thin wrapper around Connection anyway, so it makes little difference.
129+
* @param {Promise} connectionPromise - promise resolved with the connection.
130+
* @return {function(*=)} - function that releases the connection and then executes an optional callback.
131+
* @protected
132+
*/
133+
_releaseConnection(connectionPromise) {
134+
return userDefinedCallback => {
135+
connectionPromise.then(conn => {
136+
// Queue up a 'reset', to ensure the next user gets a clean session to work with.
134137
conn.reset();
135138
conn.sync();
136139

137140
// Return connection to the pool
138141
conn._release();
139-
}).catch( () => {/*ignore errors here*/});
142+
}).catch(ignoredError => {
143+
});
140144

141-
// Call user callback
142-
if (cb) {
143-
cb();
145+
if (userDefinedCallback) {
146+
userDefinedCallback();
144147
}
145-
});
148+
};
149+
}
150+
151+
static _validateSessionMode(rawMode) {
152+
const mode = rawMode || WRITE;
153+
if (mode !== READ && mode !== WRITE) {
154+
throw newError('Illegal session mode ' + mode);
155+
}
156+
return mode;
146157
}
147158

148159
//Extension point

src/v1/error.js

+6-3
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@
2020
// A common place for constructing error objects, to keep them
2121
// uniform across the driver surface.
2222

23-
let SERVICE_UNAVAILABLE = 'ServiceUnavailable';
24-
let SESSION_EXPIRED = 'SessionExpired';
23+
const SERVICE_UNAVAILABLE = 'ServiceUnavailable';
24+
const SESSION_EXPIRED = 'SessionExpired';
25+
const PROTOCOL_ERROR = 'ProtocolError';
26+
2527
function newError(message, code="N/A") {
2628
// TODO: Idea is that we can check the code here and throw sub-classes
2729
// of Neo4jError as appropriate
@@ -40,5 +42,6 @@ export {
4042
newError,
4143
Neo4jError,
4244
SERVICE_UNAVAILABLE,
43-
SESSION_EXPIRED
45+
SESSION_EXPIRED,
46+
PROTOCOL_ERROR
4447
}

src/v1/index.js

+3-2
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
import {int, isInt, inSafeRange, toNumber, toString} from './integer';
2121
import {Node, Relationship, UnboundRelationship, PathSegment, Path} from './graph-types'
22-
import {Neo4jError, SERVICE_UNAVAILABLE, SESSION_EXPIRED} from './error';
22+
import {Neo4jError, SERVICE_UNAVAILABLE, SESSION_EXPIRED, PROTOCOL_ERROR} from './error';
2323
import Result from './result';
2424
import ResultSummary from './result-summary';
2525
import Record from './record';
@@ -138,7 +138,8 @@ const session = {
138138
};
139139
const error = {
140140
SERVICE_UNAVAILABLE,
141-
SESSION_EXPIRED
141+
SESSION_EXPIRED,
142+
PROTOCOL_ERROR
142143
};
143144
const integer = {
144145
toNumber,

src/v1/internal/get-servers-util.js

+107
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/**
2+
* Copyright (c) 2002-2017 "Neo Technology,","
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
import RoundRobinArray from "./round-robin-array";
21+
import {newError, PROTOCOL_ERROR, SERVICE_UNAVAILABLE} from "../error";
22+
import Integer, {int} from "../integer";
23+
24+
const PROCEDURE_CALL = 'CALL dbms.cluster.routing.getServers';
25+
const PROCEDURE_NOT_FOUND_CODE = 'Neo.ClientError.Procedure.ProcedureNotFound';
26+
27+
export default class GetServersUtil {
28+
29+
callGetServers(session, routerAddress) {
30+
return session.run(PROCEDURE_CALL).then(result => {
31+
session.close();
32+
return result.records;
33+
}).catch(error => {
34+
if (this._isProcedureNotFoundError(error)) {
35+
// throw when getServers procedure not found because this is clearly a configuration issue
36+
throw newError('Server ' + routerAddress + ' could not perform routing. ' +
37+
'Make sure you are connecting to a causal cluster', SERVICE_UNAVAILABLE);
38+
}
39+
// return nothing when failed to connect because code higher in the callstack is still able to retry with a
40+
// different session towards a different router
41+
return null;
42+
});
43+
}
44+
45+
parseTtl(record, routerAddress) {
46+
try {
47+
const now = int(Date.now());
48+
const expires = record.get('ttl').multiply(1000).add(now);
49+
// if the server uses a really big expire time like Long.MAX_VALUE this may have overflowed
50+
if (expires.lessThan(now)) {
51+
return Integer.MAX_VALUE;
52+
}
53+
return expires;
54+
} catch (error) {
55+
throw newError(
56+
'Unable to parse TTL entry from router ' + routerAddress + ' from record:\n' + JSON.stringify(record),
57+
PROTOCOL_ERROR);
58+
}
59+
}
60+
61+
parseServers(record, routerAddress) {
62+
try {
63+
const servers = record.get('servers');
64+
65+
const routers = new RoundRobinArray();
66+
const readers = new RoundRobinArray();
67+
const writers = new RoundRobinArray();
68+
69+
servers.forEach(server => {
70+
const role = server['role'];
71+
const addresses = server['addresses'];
72+
73+
if (role === 'ROUTE') {
74+
routers.pushAll(addresses);
75+
} else if (role === 'WRITE') {
76+
writers.pushAll(addresses);
77+
} else if (role === 'READ') {
78+
readers.pushAll(addresses);
79+
} else {
80+
throw newError('Unknown server role "' + role + '"', PROTOCOL_ERROR);
81+
}
82+
});
83+
84+
return {
85+
routers: routers,
86+
readers: readers,
87+
writers: writers
88+
}
89+
} catch (ignore) {
90+
throw newError(
91+
'Unable to parse servers entry from router ' + routerAddress + ' from record:\n' + JSON.stringify(record),
92+
PROTOCOL_ERROR);
93+
}
94+
}
95+
96+
_isProcedureNotFoundError(error) {
97+
let errorCode = error.code;
98+
if (!errorCode) {
99+
try {
100+
errorCode = error.fields[0].code;
101+
} catch (e) {
102+
errorCode = 'UNKNOWN';
103+
}
104+
}
105+
return errorCode === PROCEDURE_NOT_FOUND_CODE;
106+
}
107+
}

src/v1/internal/rediscovery.js

+62
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/**
2+
* Copyright (c) 2002-2017 "Neo Technology,","
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
import GetServersUtil from "./get-servers-util";
21+
import RoutingTable from "./routing-table";
22+
import {newError, PROTOCOL_ERROR} from "../error";
23+
24+
export default class Rediscovery {
25+
26+
constructor(getServersUtil) {
27+
this._getServersUtil = getServersUtil || new GetServersUtil();
28+
}
29+
30+
lookupRoutingTableOnRouter(session, routerAddress) {
31+
return this._getServersUtil.callGetServers(session, routerAddress).then(records => {
32+
if (records === null) {
33+
// connection error happened, unable to retrieve routing table from this router, next one should be queried
34+
return null;
35+
}
36+
37+
if (records.length !== 1) {
38+
throw newError('Illegal response from router "' + routerAddress + '". ' +
39+
'Received ' + records.length + ' records but expected only one.\n' + JSON.stringify(records),
40+
PROTOCOL_ERROR);
41+
}
42+
43+
const record = records[0];
44+
45+
const expirationTime = this._getServersUtil.parseTtl(record, routerAddress);
46+
const {routers, readers, writers} = this._getServersUtil.parseServers(record, routerAddress);
47+
48+
Rediscovery._assertNonEmpty(routers, 'routers', routerAddress);
49+
Rediscovery._assertNonEmpty(readers, 'readers', routerAddress);
50+
// case with no writers is processed higher in the promise chain because only RoutingDriver knows
51+
// how to deal with such table and how to treat router that returned such table
52+
53+
return new RoutingTable(routers, readers, writers, expirationTime);
54+
});
55+
}
56+
57+
static _assertNonEmpty(serversRoundRobinArray, serversName, routerAddress) {
58+
if (serversRoundRobinArray.isEmpty()) {
59+
throw newError('Received no ' + serversName + ' from router ' + routerAddress, PROTOCOL_ERROR);
60+
}
61+
}
62+
}

src/v1/internal/round-robin-array.js

+14-34
Original file line numberDiff line numberDiff line change
@@ -20,39 +20,34 @@
2020
/**
2121
* An array that lets you hop through the elements endlessly.
2222
*/
23-
class RoundRobinArray {
23+
export default class RoundRobinArray {
24+
2425
constructor(items) {
2526
this._items = items || [];
26-
this._index = 0;
27+
this._offset = 0;
2728
}
2829

2930
next() {
30-
let elem = this._items[this._index];
31-
if (this._items.length === 0) {
32-
this._index = 0;
33-
} else {
34-
this._index = (this._index + 1) % (this._items.length);
31+
if (this.isEmpty()) {
32+
return null;
3533
}
36-
return elem;
37-
}
38-
39-
push(elem) {
40-
this._items.push(elem);
34+
const index = this._offset % this.size();
35+
this._offset++;
36+
return this._items[index];
4137
}
4238

4339
pushAll(elems) {
40+
if (!Array.isArray(elems)) {
41+
throw new TypeError('Array expected but got: ' + elems);
42+
}
43+
4444
Array.prototype.push.apply(this._items, elems);
4545
}
4646

47-
empty() {
47+
isEmpty() {
4848
return this._items.length === 0;
4949
}
5050

51-
clear() {
52-
this._items = [];
53-
this._index = 0;
54-
}
55-
5651
size() {
5752
return this._items.length;
5853
}
@@ -62,21 +57,6 @@ class RoundRobinArray {
6257
}
6358

6459
remove(item) {
65-
let index = this._items.indexOf(item);
66-
while (index != -1) {
67-
this._items.splice(index, 1);
68-
if (index < this._index) {
69-
this._index -= 1;
70-
}
71-
//make sure we are in range
72-
if (this._items.length === 0) {
73-
this._index = 0;
74-
} else {
75-
this._index %= this._items.length;
76-
}
77-
index = this._items.indexOf(item, index);
78-
}
60+
this._items = this._items.filter(element => element !== item);
7961
}
8062
}
81-
82-
export default RoundRobinArray

0 commit comments

Comments
 (0)