Skip to content

Commit 591f2a7

Browse files
committed
Basic support for routing.
Support routing on session acquistion.
1 parent 960d125 commit 591f2a7

15 files changed

+526
-194
lines changed

src/v1/driver.js

+114-148
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,10 @@
2020
import Session from './session';
2121
import Pool from './internal/pool';
2222
import Integer from './integer';
23-
import {connect, scheme} from "./internal/connector";
23+
import {connect, parseScheme, parseUrl} from "./internal/connector";
2424
import StreamObserver from './internal/stream-observer';
2525
import VERSION from '../version';
26+
import {newError, SERVICE_UNAVAILABLE, SESSION_EXPIRED} from "./error";
2627
import "babel-polyfill";
2728

2829
let READ = 'READ', WRITE = 'WRITE';
@@ -66,10 +67,10 @@ class Driver {
6667
* @return {Connection} new connector-api session instance, a low level session API.
6768
* @access private
6869
*/
69-
_createConnection(release) {
70+
_createConnection(url, release) {
7071
let sessionId = this._sessionIdGenerator++;
7172
let streamObserver = new _ConnectionStreamObserver(this);
72-
let conn = connect(this._url, this._config);
73+
let conn = connect(url, this._config);
7374
conn.initialize(this._userAgent, this._token, streamObserver);
7475
conn._id = sessionId;
7576
conn._release = () => release(this._url, conn);
@@ -112,11 +113,11 @@ class Driver {
112113
*/
113114
session() {
114115
let conn = this._pool.acquire(this._url);
115-
return this._createSession(conn);
116+
return this._createSession(Promise.resolve(conn));
116117
}
117118

118-
_createSession(conn) {
119-
return new Session(new Promise((resolve, reject) => resolve(conn)), (cb) => {
119+
_createSession(connectionPromise) {
120+
return new Session(connectionPromise, (cb) => {
120121
// This gets called on Session#close(), and is where we return
121122
// the pooled 'connection' instance.
122123

@@ -126,11 +127,14 @@ class Driver {
126127

127128
// Queue up a 'reset', to ensure the next user gets a clean
128129
// session to work with.
129-
conn.reset();
130-
conn.sync();
130+
connectionPromise.then( (conn) => {
131+
conn.reset();
132+
conn.sync();
133+
134+
// Return connection to the pool
135+
conn._release();
136+
});
131137

132-
// Return connection to the pool
133-
conn._release();
134138

135139
// Call user callback
136140
if (cb) {
@@ -161,7 +165,11 @@ class RoundRobinArray {
161165

162166
hop() {
163167
let elem = this._items[this._index];
164-
this._index = (this._index + 1) % (this._items.length - 1);
168+
if (this._items.length === 0) {
169+
this._index = 0;
170+
} else {
171+
this._index = (this._index + 1) % (this._items.length);
172+
}
165173
return elem;
166174
}
167175

@@ -198,36 +206,70 @@ class RoundRobinArray {
198206
this._index -= 1;
199207
}
200208
//make sure we are in range
201-
this._index %= (this._items.length - 1);
209+
if (this._items.length === 0) {
210+
this._index = 0;
211+
} else {
212+
this._index %= this._items.length;
213+
}
202214
}
203215
}
204216
}
205217

206218
let GET_SERVERS = "CALL dbms.cluster.routing.getServers";
207219

208220
class ClusterView {
209-
constructor(expires, routers, readers, writers) {
210-
this.expires = expires;
211-
this.routers = routers;
212-
this.readers = readers;
213-
this.routers = writers;
221+
constructor(routers, readers, writers, expires) {
222+
this.routers = routers || new RoundRobinArray();
223+
this.readers = readers || new RoundRobinArray();
224+
this.writers = writers || new RoundRobinArray();
225+
this._expires = expires || -1;
226+
227+
}
228+
229+
needsUpdate() {
230+
return this._expires < Date.now() ||
231+
this.routers.empty() ||
232+
this.readers.empty() ||
233+
this.writers.empty();
234+
}
235+
236+
all() {
237+
let seen = new Set(this.routers.toArray());
238+
let writers = this.writers.toArray();
239+
let readers = this.readers.toArray();
240+
for (let i = 0; i < writers.length; i++) {
241+
seen.add(writers[i]);
242+
}
243+
for (let i = 0; i < readers.length; i++) {
244+
seen.add(readers[i]);
245+
}
246+
return seen;
247+
}
248+
249+
remove(item) {
250+
this.routers.remove(item);
251+
this.readers.remove(item);
252+
this.writers.remove(item);
214253
}
215254
}
216255

217256
function newClusterView(session) {
218257
return session.run(GET_SERVERS)
219258
.then((res) => {
220259
session.close();
260+
if (res.records.length != 1) {
261+
return Promise.reject(newError("Invalid routing response from server", SERVICE_UNAVAILABLE));
262+
}
221263
let record = res.records[0];
222-
//Note we are loosing precision here but we are not
223-
//terribly worried since it is only
224-
//for dates more than 140000 years into the future.
264+
//Note we are loosing precision here but let's hope that in
265+
//the 140000 years to come before this precision loss
266+
//hits us, that we get native 64 bit integers in javascript
225267
let expires = record.get('ttl').toNumber();
226268
let servers = record.get('servers');
227269
let routers = new RoundRobinArray();
228270
let readers = new RoundRobinArray();
229271
let writers = new RoundRobinArray();
230-
for (let i = 0; i <= servers.length; i++) {
272+
for (let i = 0; i < servers.length; i++) {
231273
let server = servers[i];
232274

233275
let role = server['role'];
@@ -240,151 +282,75 @@ function newClusterView(session) {
240282
readers.pushAll(addresses);
241283
}
242284
}
243-
244-
return new ClusterView(expires, routers, readers, writers);
285+
return new ClusterView(routers, readers, writers, expires);
245286
});
246287
}
247288

248289
class RoutingDriver extends Driver {
249290

250291
constructor(url, userAgent = 'neo4j-javascript/0.0', token = {}, config = {}) {
251292
super(url, userAgent, token, config);
252-
this._routers = new RoundRobinArray();
253-
this._routers.push(url);
254-
this._readers = new RoundRobinArray();
255-
this._writers = new RoundRobinArray();
256-
this._expires = Date.now();
293+
this._clusterView = new ClusterView(new RoundRobinArray([parseUrl(url)]));
257294
}
258295

259-
//TODO make nice, expose constants?
260296
session(mode) {
261-
//Check so that we have servers available
262-
this._checkServers().then( () => {
263-
let conn = this._acquireConnection(mode);
264-
return this._createSession(conn);
265-
});
297+
let conn = this._acquireConnection(mode);
298+
return this._createSession(conn);
266299
}
267300

268-
async _checkServers() {
269-
if (this._expires < Date.now() ||
270-
this._routers.empty() ||
271-
this._readers.empty() ||
272-
this._writers.empty()) {
273-
return await this._callServers();
301+
_updatedClusterView() {
302+
if (!this._clusterView.needsUpdate()) {
303+
return Promise.resolve(this._clusterView);
274304
} else {
275-
return new Promise((resolve, reject) => resolve(false));
276-
}
277-
}
278-
279-
async _callServers() {
280-
let seen = this._allServers();
281-
//clear writers and readers
282-
this._writers.clear();
283-
this._readers.clear();
284-
//we have to wait to clear routers until
285-
//we have discovered new ones
286-
let newRouters = new RoundRobinArray();
287-
let success = false;
288-
289-
while (!this._routers.empty() && !success) {
290-
let url = this._routers.hop();
291-
try {
292-
let res = await this._call(url);
293-
console.log("got result");
294-
if (res.records.length != 1) continue;
295-
let record = res.records[0];
296-
//Note we are loosing precision here but we are not
297-
//terribly worried since it is only
298-
//for dates more than 140000 years into the future.
299-
this._expires += record.get('ttl').toNumber();
300-
let servers = record.get('servers');
301-
console.log(servers);
302-
for (let i = 0; i <= servers.length; i++) {
303-
let server = servers[i];
304-
seen.remove(server);
305-
306-
let role = server['role'];
307-
let addresses = server['addresses'];
308-
if (role === 'ROUTE') {
309-
newRouters.push(server);
310-
} else if (role === 'WRITE') {
311-
this._writers.push(server);
312-
} else if (role === 'READ') {
313-
this._readers.push(server);
314-
}
315-
}
316-
317-
if (newRouters.empty()) continue;
318-
//we have results
319-
this._routers = newRouters();
320-
//these are no longer valid according to server
321-
let self = this;
322-
seen.forEach((key) => {
323-
console.log("remove seen");
324-
self._pools.purge(key);
305+
let routers = this._clusterView.routers;
306+
let acc = Promise.reject();
307+
for (let i = 0; i < routers.size(); i++) {
308+
acc = acc.catch(() => {
309+
let conn = this._pool.acquire(routers.hop());
310+
let session = this._createSession(Promise.resolve(conn));
311+
return newClusterView(session).catch((err) => {
312+
this._forget(conn);
313+
return Promise.reject(err);
314+
});
325315
});
326-
success = true;
327-
return new Promise((resolve, reject) => resolve(true));
328-
} catch (error) {
329-
//continue
330-
console.log(error);
331-
this._forget(url);
332316
}
333-
}
334317

335-
let errorMsg = "Server could not perform discovery, please open a new driver with a different seed address.";
336-
if (this.onError) {
337-
this.onError(errorMsg);
318+
return acc;
338319
}
339-
340-
return new Promise((resolve, reject) => reject(errorMsg));
320+
}
321+
_diff(oldView, updatedView) {
322+
let oldSet = oldView.all();
323+
let newSet = updatedView.all();
324+
newSet.forEach((item) => {
325+
oldSet.delete(item);
326+
});
327+
return oldSet;
341328
}
342329

343330
_acquireConnection(mode) {
344-
//make sure we have enough servers
345331
let m = mode || WRITE;
346-
if (m === READ) {
347-
return this._pool.acquire(this._readers.hop());
348-
} else if (m === WRITE) {
349-
return this._pool.acquire(this._writers.hop());
350-
} else {
351-
//TODO fail
352-
}
353-
}
354-
355-
_allServers() {
356-
let seen = new Set(this._routers.toArray());
357-
let writers = this._writers.toArray();
358-
let readers = this._readers.toArray();
359-
for (let i = 0; i < writers.length; i++) {
360-
seen.add(writers[i]);
361-
}
362-
for (let i = 0; i < readers.length; i++) {
363-
seen.add(writers[i]);
364-
}
365-
return seen;
366-
}
367-
368-
async _call(url) {
369-
let conn = this._pool.acquire(url);
370-
let session = this._createSession(conn);
371-
return session.run(GET_SERVERS)
372-
.then((res) => {
373-
session.close();
374-
return res;
375-
}).catch((err) => {
376-
console.log(err);
377-
this._forget(url);
378-
return Promise.reject(err);
332+
//make sure we have enough servers
333+
return this._updatedClusterView().then((view) => {
334+
let toRemove = this._diff(this._clusterView, view);
335+
let self = this;
336+
toRemove.forEach((url) => {
337+
self._pool.purge(url);
379338
});
339+
//update our cached view
340+
this._clusterView = view;
341+
if (m === READ) {
342+
return this._pool.acquire(view.readers.hop());
343+
} else if (m === WRITE) {
344+
return this._pool.acquire(view.writers.hop());
345+
} else {
346+
return Promise.reject(m + " is not a valid option");
347+
}
348+
});
380349
}
381350

382351
_forget(url) {
383-
console.log("forget");
384-
this._pools.purge(url);
385-
this._routers.remove(url);
386-
this._readers.remove(url);
387-
this._writers.remove(url);
352+
this._pool.purge(url);
353+
this._clusterView.remove(url);
388354
}
389355
}
390356

@@ -474,15 +440,15 @@ let USER_AGENT = "neo4j-javascript/" + VERSION;
474440
* @returns {Driver}
475441
*/
476442
function driver(url, authToken, config = {}) {
477-
let sch = scheme(url);
478-
if (sch === "bolt+routing://") {
479-
return new RoutingDriver(url, USER_AGENT, authToken, config);
480-
} else if (sch === "bolt://") {
481-
return new Driver(url, USER_AGENT, authToken, config);
443+
let scheme = parseScheme(url);
444+
if (scheme === "bolt+routing://") {
445+
return new RoutingDriver(parseUrl(url), USER_AGENT, authToken, config);
446+
} else if (scheme === "bolt://") {
447+
return new Driver(parseUrl(url), USER_AGENT, authToken, config);
482448
} else {
483-
throw new Error("Unknown scheme: " + sch);
449+
throw new Error("Unknown scheme: " + scheme);
484450

485451
}
486452
}
487453

488-
export {Driver, driver}
454+
export {Driver, driver, READ, WRITE}

src/v1/error.js

+5-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
// A common place for constructing error objects, to keep them
2121
// uniform across the driver surface.
2222

23+
let SERVICE_UNAVAILABLE = 'ServiceUnavailable';
24+
let SESSION_EXPIRED = 'SessionExpired';
2325
function newError(message, code="N/A") {
2426
// TODO: Idea is that we can check the cod here and throw sub-classes
2527
// of Neo4jError as appropriate
@@ -36,5 +38,7 @@ class Neo4jError extends Error {
3638

3739
export {
3840
newError,
39-
Neo4jError
41+
Neo4jError,
42+
SERVICE_UNAVAILABLE,
43+
SESSION_EXPIRED
4044
}

0 commit comments

Comments
 (0)