Skip to content

Commit cd47b7c

Browse files
committed
Initial reactive session and transaction
1 parent ce7e252 commit cd47b7c

13 files changed

+2551
-64
lines changed

src/driver.js

+34-13
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,19 @@
1717
* limitations under the License.
1818
*/
1919

20-
import Session from './session'
21-
import Pool from './internal/pool'
22-
import ChannelConnection from './internal/connection-channel'
23-
import { newError, SERVICE_UNAVAILABLE } from './error'
24-
import DirectConnectionProvider from './internal/connection-provider-direct'
20+
import { newError } from './error'
21+
import ConnectionProvider from './internal/connection-provider'
2522
import Bookmark from './internal/bookmark'
23+
import DirectConnectionProvider from './internal/connection-provider-direct'
2624
import ConnectivityVerifier from './internal/connectivity-verifier'
27-
import PoolConfig, {
25+
import { ACCESS_MODE_READ, ACCESS_MODE_WRITE } from './internal/constants'
26+
import Logger from './internal/logger'
27+
import {
2828
DEFAULT_ACQUISITION_TIMEOUT,
2929
DEFAULT_MAX_SIZE
3030
} from './internal/pool-config'
31-
import Logger from './internal/logger'
32-
import ConnectionErrorHandler from './internal/connection-error-handler'
33-
import { ACCESS_MODE_READ, ACCESS_MODE_WRITE } from './internal/constants'
31+
import Session from './session'
32+
import RxSession from './session-rx'
3433

3534
const DEFAULT_MAX_CONNECTION_LIFETIME = 60 * 60 * 1000 // 1 hour
3635

@@ -121,17 +120,38 @@ class Driver {
121120
* it is closed, the underlying connection will be released to the connection
122121
* pool and made available for others to use.
123122
*
124-
* @param {string} [defaultAccessMode=WRITE] the access mode of this session, allowed values are {@link READ} and {@link WRITE}.
125-
* @param {string|string[]} [bookmarks=null] the initial reference or references to some previous
123+
* @param {Object} args -
124+
* @param {string} args.defaultAccessMode='WRITE' - the access mode of this session, allowed values are {@link READ} and {@link WRITE}.
125+
* @param {string|string[]} args.bookmarks - the initial reference or references to some previous
126126
* transactions. Value is optional and absence indicates that that the bookmarks do not exist or are unknown.
127-
* @param {string} [database=''] the database this session will connect to.
127+
* @param {string} args.database='' - the database this session will connect to.
128128
* @return {Session} new session.
129129
*/
130130
session ({
131131
defaultAccessMode = WRITE,
132132
bookmarks: bookmarkOrBookmarks,
133133
database = ''
134134
} = {}) {
135+
return this._newSession({
136+
defaultAccessMode,
137+
bookmarkOrBookmarks,
138+
database,
139+
reactive: false
140+
})
141+
}
142+
143+
rxSession ({ defaultAccessMode = WRITE, bookmarks, database = '' } = {}) {
144+
return new RxSession(
145+
this._newSession({
146+
defaultAccessMode,
147+
bookmarks,
148+
database,
149+
reactive: true
150+
})
151+
)
152+
}
153+
154+
_newSession ({ defaultAccessMode, bookmarkOrBookmarks, database, reactive }) {
135155
const sessionMode = Driver._validateSessionMode(defaultAccessMode)
136156
const connectionProvider = this._getOrCreateConnectionProvider()
137157
const bookmark = bookmarkOrBookmarks
@@ -142,7 +162,8 @@ class Driver {
142162
database,
143163
connectionProvider,
144164
bookmark,
145-
config: this._config
165+
config: this._config,
166+
reactive
146167
})
147168
}
148169

src/internal/bolt-protocol-v4.js

+21-3
Original file line numberDiff line numberDiff line change
@@ -63,11 +63,15 @@ export default class BoltProtocol extends BoltProtocolV3 {
6363
afterError,
6464
beforeComplete,
6565
afterComplete,
66-
flush = true
66+
flush = true,
67+
reactive = false
6768
} = {}
6869
) {
6970
const observer = new ResultStreamObserver({
7071
connection: this._connection,
72+
reactive: reactive,
73+
moreFunction: reactive ? this._requestMore : this._noOp,
74+
discardFunction: reactive ? this._requestDiscard : this._noOp,
7175
beforeKeys,
7276
afterKeys,
7377
beforeError,
@@ -76,6 +80,7 @@ export default class BoltProtocol extends BoltProtocolV3 {
7680
afterComplete
7781
})
7882

83+
const flushRun = reactive
7984
this._connection.write(
8085
RequestMessage.runWithMetadata(statement, parameters, {
8186
bookmark,
@@ -84,10 +89,23 @@ export default class BoltProtocol extends BoltProtocolV3 {
8489
mode
8590
}),
8691
observer,
87-
false
92+
flushRun && flush
8893
)
89-
this._connection.write(RequestMessage.pull(), observer, flush)
94+
95+
if (!reactive) {
96+
this._connection.write(RequestMessage.pull(), observer, flush)
97+
}
9098

9199
return observer
92100
}
101+
102+
_requestMore (connection, stmtId, n, observer) {
103+
connection.write(RequestMessage.pull({ stmtId, n }), observer, true)
104+
}
105+
106+
_requestDiscard (connection, stmtId, observer) {
107+
connection.write(RequestMessage.discard({ stmtId }), observer, true)
108+
}
109+
110+
_noOp () {}
93111
}

src/internal/request-message.js

+3-3
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ export default class RequestMessage {
185185
* @return {RequestMessage} the PULL message.
186186
*/
187187
static pull ({ stmtId = NO_STATEMENT_ID, n = ALL } = {}) {
188-
const metadata = buildStreamMetadata(stmtId, n)
188+
const metadata = buildStreamMetadata(stmtId || NO_STATEMENT_ID, n || ALL)
189189
return new RequestMessage(
190190
PULL,
191191
[metadata],
@@ -200,7 +200,7 @@ export default class RequestMessage {
200200
* @return {RequestMessage} the PULL message.
201201
*/
202202
static discard ({ stmtId = NO_STATEMENT_ID, n = ALL } = {}) {
203-
const metadata = buildStreamMetadata(stmtId, n)
203+
const metadata = buildStreamMetadata(stmtId || NO_STATEMENT_ID, n || ALL)
204204
return new RequestMessage(
205205
DISCARD,
206206
[metadata],
@@ -246,7 +246,7 @@ function buildTxMetadata (bookmark, txConfig, database, mode) {
246246
function buildStreamMetadata (stmtId, n) {
247247
const metadata = { n: int(n) }
248248
if (stmtId !== NO_STATEMENT_ID) {
249-
metadata['stmt_id'] = int(stmtId)
249+
metadata['qid'] = int(stmtId)
250250
}
251251
return metadata
252252
}

src/internal/stream-observers.js

+63-27
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,9 @@ import Record from '../record'
2020
import Connection from './connection'
2121
import { newError, PROTOCOL_ERROR } from '../error'
2222
import { isString } from './util'
23+
import Integer from '../integer'
2324

24-
const DefaultBatchSize = 100
25+
const DefaultBatchSize = 50
2526

2627
class StreamObserver {
2728
onNext (rawRecord) {}
@@ -45,10 +46,11 @@ class ResultStreamObserver extends StreamObserver {
4546
/**
4647
*
4748
* @param {Object} param
48-
* @param {Connection} connection
49-
* @param {} param.moreFunction -
50-
* @param {} param.discardFunction -
51-
* @param {} param.batchSize -
49+
* @param {Connection} param.connection
50+
* @param {boolean} param.reactive
51+
* @param {function(connection: Connection, stmtId: number|Integer, n: number|Integer, observer: StreamObserver)} param.moreFunction -
52+
* @param {function(connection: Connection, stmtId: number|Integer, observer: StreamObserver)} param.discardFunction -
53+
* @param {number|Integer} param.batchSize -
5254
* @param {function(err: Error): Promise|void} param.beforeError -
5355
* @param {function(err: Error): Promise|void} param.afterError -
5456
* @param {function(keys: string[]): Promise|void} param.beforeKeys -
@@ -58,6 +60,7 @@ class ResultStreamObserver extends StreamObserver {
5860
*/
5961
constructor ({
6062
connection,
63+
reactive = false,
6164
moreFunction,
6265
discardFunction,
6366
batchSize = DefaultBatchSize,
@@ -71,6 +74,8 @@ class ResultStreamObserver extends StreamObserver {
7174
super()
7275

7376
this._connection = connection
77+
this._reactive = reactive
78+
this._streaming = false
7479

7580
this._fieldKeys = null
7681
this._fieldLookup = null
@@ -105,7 +110,11 @@ class ResultStreamObserver extends StreamObserver {
105110
onNext (rawRecord) {
106111
let record = new Record(this._fieldKeys, rawRecord, this._fieldLookup)
107112
if (this._observers.some(o => o.onNext)) {
108-
this._observers.forEach(o => (o.onNext ? o.onNext(record) : {}))
113+
this._observers.forEach(o => {
114+
if (o.onNext) {
115+
o.onNext(record)
116+
}
117+
})
109118
} else {
110119
this._queuedRecords.push(record)
111120
}
@@ -149,14 +158,20 @@ class ResultStreamObserver extends StreamObserver {
149158
this._head = this._fieldKeys
150159

151160
if (this._observers.some(o => o.onKeys)) {
152-
this._observers.forEach(o =>
153-
o.onKeys ? o.onKeys(this._fieldKeys) : {}
154-
)
161+
this._observers.forEach(o => {
162+
if (o.onKeys) {
163+
o.onKeys(this._fieldKeys)
164+
}
165+
})
155166
}
156167

157168
if (this._afterKeys) {
158169
this._afterKeys(this._fieldKeys)
159170
}
171+
172+
if (this._reactive) {
173+
this._handleStreaming()
174+
}
160175
}
161176

162177
if (beforeHandlerResult) {
@@ -165,22 +180,13 @@ class ResultStreamObserver extends StreamObserver {
165180
continuation()
166181
}
167182
} else {
183+
this._streaming = false
184+
168185
if (meta.has_more) {
169186
// We've consumed current batch and server notified us that there're more
170187
// records to stream. Let's invoke more or discard function based on whether
171188
// the user wants to discard streaming or not
172-
if (this._discard) {
173-
this._discardFunction({
174-
connection: this._connection,
175-
statementId: this._statementId
176-
})
177-
} else {
178-
this._moreFunction({
179-
connection: this._connection,
180-
statementId: this._statementId,
181-
n: this._batchSize
182-
})
183-
}
189+
this._handleStreaming()
184190

185191
delete meta.has_more
186192
} else {
@@ -200,9 +206,11 @@ class ResultStreamObserver extends StreamObserver {
200206
this._tail = completionMetadata
201207

202208
if (this._observers.some(o => o.onCompleted)) {
203-
this._observers.forEach(o =>
204-
o.onCompleted ? o.onCompleted(completionMetadata) : {}
205-
)
209+
this._observers.forEach(o => {
210+
if (o.onCompleted) {
211+
o.onCompleted(completionMetadata)
212+
}
213+
})
206214
}
207215

208216
if (this._afterComplete) {
@@ -219,6 +227,28 @@ class ResultStreamObserver extends StreamObserver {
219227
}
220228
}
221229

230+
_handleStreaming () {
231+
if (
232+
this._reactive &&
233+
this._head &&
234+
this._observers.some(o => o.onNext || o.onCompleted) &&
235+
!this._streaming
236+
) {
237+
this._streaming = true
238+
239+
if (this._discard) {
240+
this._discardFunction(this._connection, this._statementId, this)
241+
} else {
242+
this._moreFunction(
243+
this._connection,
244+
this._statementId,
245+
this._batchSize,
246+
this
247+
)
248+
}
249+
}
250+
}
251+
222252
_storeMetadataForCompletion (meta) {
223253
const keys = Object.keys(meta)
224254
let index = keys.length
@@ -282,9 +312,11 @@ class ResultStreamObserver extends StreamObserver {
282312

283313
const continuation = () => {
284314
if (this._observers.some(o => o.onError)) {
285-
this._observers.forEach(o =>
286-
o.onError ? o.onError(error) : console.log(error)
287-
)
315+
this._observers.forEach(o => {
316+
if (o.onError) {
317+
o.onError(error)
318+
}
319+
})
288320
}
289321

290322
if (this._afterError) {
@@ -324,6 +356,10 @@ class ResultStreamObserver extends StreamObserver {
324356
observer.onCompleted(this._tail)
325357
}
326358
this._observers.push(observer)
359+
360+
if (this._reactive) {
361+
this._handleStreaming()
362+
}
327363
}
328364

329365
hasFailed () {

0 commit comments

Comments
 (0)