18
18
*/
19
19
20
20
import Session from './session' ;
21
- import { Pool } from './internal/pool' ;
22
- import { connect } from "./internal/connector" ;
21
+ import Pool from './internal/pool' ;
22
+ import Integer from './integer' ;
23
+ import { connect , scheme } from "./internal/connector" ;
23
24
import StreamObserver from './internal/stream-observer' ;
24
- import { VERSION } from '../version' ;
25
+ import VERSION from '../version' ;
26
+ import "babel-polyfill" ;
25
27
28
+ let READ = 'READ' , WRITE = 'WRITE' ;
26
29
/**
27
30
* A driver maintains one or more {@link Session sessions} with a remote
28
31
* Neo4j instance. Through the {@link Session sessions} you can send statements
@@ -53,7 +56,7 @@ class Driver {
53
56
this . _pool = new Pool (
54
57
this . _createConnection . bind ( this ) ,
55
58
this . _destroyConnection . bind ( this ) ,
56
- this . _validateConnection . bind ( this ) ,
59
+ Driver . _validateConnection . bind ( this ) ,
57
60
config . connectionPoolSize
58
61
) ;
59
62
}
@@ -63,7 +66,7 @@ class Driver {
63
66
* @return {Connection } new connector-api session instance, a low level session API.
64
67
* @access private
65
68
*/
66
- _createConnection ( release ) {
69
+ _createConnection ( release ) {
67
70
let sessionId = this . _sessionIdGenerator ++ ;
68
71
let streamObserver = new _ConnectionStreamObserver ( this ) ;
69
72
let conn = connect ( this . _url , this . _config ) ;
@@ -80,7 +83,7 @@ class Driver {
80
83
* @return {boolean } true if the connection is open
81
84
* @access private
82
85
**/
83
- _validateConnection ( conn ) {
86
+ static _validateConnection ( conn ) {
84
87
return conn . isOpen ( ) ;
85
88
}
86
89
@@ -89,7 +92,7 @@ class Driver {
89
92
* @return {Session } new session.
90
93
* @access private
91
94
*/
92
- _destroyConnection ( conn ) {
95
+ _destroyConnection ( conn ) {
93
96
delete this . _openSessions [ conn . _id ] ;
94
97
conn . close ( ) ;
95
98
}
@@ -109,7 +112,11 @@ class Driver {
109
112
*/
110
113
session ( ) {
111
114
let conn = this . _pool . acquire ( this . _url ) ;
112
- return new Session ( conn , ( cb ) => {
115
+ return this . _createSession ( conn ) ;
116
+ }
117
+
118
+ _createSession ( conn ) {
119
+ return new Session ( conn , ( cb ) => {
113
120
// This gets called on Session#close(), and is where we return
114
121
// the pooled 'connection' instance.
115
122
@@ -126,7 +133,9 @@ class Driver {
126
133
conn . _release ( ) ;
127
134
128
135
// Call user callback
129
- if ( cb ) { cb ( ) ; }
136
+ if ( cb ) {
137
+ cb ( ) ;
138
+ }
130
139
} ) ;
131
140
}
132
141
@@ -144,25 +153,212 @@ class Driver {
144
153
}
145
154
}
146
155
156
+ class RoundRobinArray {
157
+ constructor ( items ) {
158
+ this . _items = items || [ ] ;
159
+ this . _index = 0 ;
160
+ }
161
+
162
+ hop ( ) {
163
+ let elem = this . _items [ this . _index ] ;
164
+ this . _index = ( this . _index + 1 ) % ( this . _items . length - 1 ) ;
165
+ return elem ;
166
+ }
167
+
168
+ push ( elem ) {
169
+ this . _items . push ( elem ) ;
170
+ }
171
+
172
+ pushAll ( elems ) {
173
+ Array . prototype . push . apply ( this . _items , elems ) ;
174
+ }
175
+
176
+ empty ( ) {
177
+ return this . _items . length === 0 ;
178
+ }
179
+
180
+ clear ( ) {
181
+ this . _items = [ ] ;
182
+ this . _index = 0 ;
183
+ }
184
+
185
+ size ( ) {
186
+ return this . _items . length ;
187
+ }
188
+
189
+ toArray ( ) {
190
+ return this . _items ;
191
+ }
192
+
193
+ remove ( item ) {
194
+ let index = this . _items . indexOf ( item ) ;
195
+ while ( index != - 1 ) {
196
+ this . _items . splice ( index , 1 ) ;
197
+ if ( index < this . _index ) {
198
+ this . _index -= 1 ;
199
+ }
200
+ //make sure we are in range
201
+ this . _index %= ( this . _items . length - 1 ) ;
202
+ }
203
+ }
204
+ }
205
+
206
+ let GET_SERVERS = "CALL dbms.cluster.routing.getServers" ;
207
+ class RoutingDriver extends Driver {
208
+
209
+ constructor ( url , userAgent = 'neo4j-javascript/0.0' , token = { } , config = { } ) {
210
+ super ( url , userAgent , token , config ) ;
211
+ this . _routers = new RoundRobinArray ( ) ;
212
+ this . _routers . push ( url ) ;
213
+ this . _readers = new RoundRobinArray ( ) ;
214
+ this . _writers = new RoundRobinArray ( ) ;
215
+ this . _expires = Date . now ( ) ;
216
+ this . _checkServers ( ) ;
217
+ }
218
+
219
+ _checkServers ( ) {
220
+ if ( this . _expires < Date . now ( ) ||
221
+ this . _routers . empty ( ) ||
222
+ this . _readers . empty ( ) ||
223
+ this . _writers . empty ( ) ) {
224
+ this . _callServers ( ) ;
225
+ }
226
+ }
227
+
228
+ async _callServers ( ) {
229
+ let seen = this . _allServers ( ) ;
230
+ //clear writers and readers
231
+ this . _writers . clear ( ) ;
232
+ this . _readers . clear ( ) ;
233
+ //we have to wait to clear routers until
234
+ //we have discovered new ones
235
+ let newRouters = new RoundRobinArray ( ) ;
236
+ let success = false ;
237
+
238
+ while ( ! this . _routers . empty ( ) && ! success ) {
239
+ let url = this . _routers . hop ( ) ;
240
+ try {
241
+ let res = await this . _call ( url ) ;
242
+ if ( res . records . length != 1 ) continue ;
243
+ let record = res . records [ 0 ] ;
244
+ //Note we are loosing precision here but we are not
245
+ //terribly worried since it is only
246
+ //for dates more than 140000 years into the future.
247
+ this . _expires += record . get ( 'ttl' ) . toNumber ( ) ;
248
+ let servers = record . get ( 'servers' ) ;
249
+ for ( let i = 0 ; i <= servers . length ; i ++ ) {
250
+ let server = servers [ i ] ;
251
+ seen . delete ( server ) ;
252
+ let role = server [ 'role' ] ;
253
+ let addresses = server [ 'addresses' ] ;
254
+ if ( role === 'ROUTE' ) {
255
+ newRouters . push ( server ) ;
256
+ } else if ( role === 'WRITE' ) {
257
+ this . _writers . push ( server ) ;
258
+ } else if ( role === 'READ' ) {
259
+ this . _readers . push ( server ) ;
260
+ }
261
+ }
262
+
263
+ if ( newRouters . empty ( ) ) continue ;
264
+ //we have results
265
+ this . _routers = newRouters ( ) ;
266
+ //these are no longer valid according to server
267
+ let self = this ;
268
+ seen . forEach ( ( key ) => {
269
+ self . _pool . purge ( key ) ;
270
+ } ) ;
271
+ success = true ;
272
+ return ;
273
+ } catch ( error ) {
274
+ //continue
275
+ this . _forget ( url ) ;
276
+ console . log ( error ) ;
277
+ }
278
+ }
279
+
280
+ if ( this . onError ) {
281
+ this . onError ( "Server could not perform discovery, please open a new driver with a different seed address." ) ;
282
+ }
283
+ this . close ( ) ;
284
+ }
285
+
286
+ //TODO make nice, expose constants?
287
+ session ( mode ) {
288
+ let conn = this . _aquireConnection ( mode ) ;
289
+ return this . _createSession ( conn ) ;
290
+ }
291
+
292
+ _aquireConnection ( mode ) {
293
+ //make sure we have enough servers
294
+ this . _checkServers ( ) ;
295
+
296
+ let m = mode || WRITE ;
297
+ if ( m === READ ) {
298
+ return this . _pools . acquire ( this . _readers . hop ( ) ) ;
299
+ } else if ( m === WRITE ) {
300
+ return this . _pools . acquire ( this . _writers . hop ( ) ) ;
301
+ } else {
302
+ //TODO fail
303
+ }
304
+ }
305
+
306
+ _allServers ( ) {
307
+ let seen = new Set ( this . _routers . toArray ( ) ) ;
308
+ let writers = this . _writers . toArray ( )
309
+ let readers = this . _readers . toArray ( )
310
+ for ( let i = 0 ; i < writers . length ; i ++ ) {
311
+ seen . add ( writers [ i ] ) ;
312
+ }
313
+ for ( let i = 0 ; i < readers . length ; i ++ ) {
314
+ seen . add ( writers [ i ] ) ;
315
+ }
316
+ return seen ;
317
+ }
318
+
319
+ async _call ( url ) {
320
+ let conn = this . _pool . acquire ( url ) ;
321
+ let session = this . _createSession ( conn ) ;
322
+ console . log ( "calling " + GET_SERVERS ) ;
323
+ return session . run ( GET_SERVERS )
324
+ . then ( ( res ) => {
325
+ session . close ( ) ;
326
+ return res ;
327
+ } ) . catch ( ( err ) => {
328
+ this . _forget ( url ) ;
329
+ return Promise . reject ( err ) ;
330
+ } ) ;
331
+ }
332
+
333
+ _forget ( url ) {
334
+ this . _pools . purge ( url ) ;
335
+ this . _routers . remove ( url ) ;
336
+ this . _readers . remove ( url ) ;
337
+ this . _writers . remove ( url ) ;
338
+ }
339
+ }
340
+
147
341
/** Internal stream observer used for connection state */
148
342
class _ConnectionStreamObserver extends StreamObserver {
149
343
constructor ( driver ) {
150
344
super ( ) ;
151
345
this . _driver = driver ;
152
346
this . _hasFailed = false ;
153
347
}
348
+
154
349
onError ( error ) {
155
350
if ( ! this . _hasFailed ) {
156
351
super . onError ( error ) ;
157
- if ( this . _driver . onError ) {
352
+ if ( this . _driver . onError ) {
158
353
this . _driver . onError ( error ) ;
159
354
}
160
355
this . _hasFailed = true ;
161
356
}
162
357
}
358
+
163
359
onCompleted ( message ) {
164
- if ( this . _driver . onCompleted ) {
165
- this . _driver . onCompleted ( message ) ;
360
+ if ( this . _driver . onCompleted ) {
361
+ this . _driver . onCompleted ( message ) ;
166
362
}
167
363
}
168
364
}
@@ -205,7 +401,8 @@ let USER_AGENT = "neo4j-javascript/" + VERSION;
205
401
* //
206
402
* // TRUST_SYSTEM_CA_SIGNED_CERTIFICATES meand that you trust whatever certificates
207
403
* // are in the default certificate chain of th
208
- * trust: "TRUST_ON_FIRST_USE" | "TRUST_SIGNED_CERTIFICATES" | TRUST_CUSTOM_CA_SIGNED_CERTIFICATES | TRUST_SYSTEM_CA_SIGNED_CERTIFICATES,
404
+ * trust: "TRUST_ON_FIRST_USE" | "TRUST_SIGNED_CERTIFICATES" | TRUST_CUSTOM_CA_SIGNED_CERTIFICATES |
405
+ * TRUST_SYSTEM_CA_SIGNED_CERTIFICATES,
209
406
*
210
407
* // List of one or more paths to trusted encryption certificates. This only
211
408
* // works in the NodeJS bundle, and only matters if you use "TRUST_CUSTOM_CA_SIGNED_CERTIFICATES".
@@ -226,8 +423,13 @@ let USER_AGENT = "neo4j-javascript/" + VERSION;
226
423
* @param {Object } config Configuration object. See the configuration section above for details.
227
424
* @returns {Driver }
228
425
*/
229
- function driver ( url , authToken , config = { } ) {
230
- return new Driver ( url , USER_AGENT , authToken , config ) ;
426
+ function driver ( url , authToken , config = { } ) {
427
+ let sch = scheme ( url ) ;
428
+ if ( sch === "bolt+routing://" ) {
429
+ return new RoutingDriver ( url , USER_AGENT , authToken , config ) ;
430
+ } else {
431
+ return new Driver ( url , USER_AGENT , authToken , config ) ;
432
+ }
231
433
}
232
434
233
435
export { Driver , driver }
0 commit comments