Skip to content

Commit 8659bec

Browse files
committed
Pull stream observers into protocol implementations
1 parent f87463e commit 8659bec

30 files changed

+1890
-1453
lines changed

src/internal/bolt-protocol-util.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,12 @@
1717
* limitations under the License.
1818
*/
1919
import { newError } from '../error'
20+
import { ResultStreamObserver } from './stream-observers'
2021

2122
/**
2223
* @param {TxConfig} txConfig the auto-commit transaction configuration.
2324
* @param {Connection} connection the connection.
24-
* @param {StreamObserver} observer the response observer.
25+
* @param {ResultStreamObserver} observer the response observer.
2526
*/
2627
function assertTxConfigIsEmpty (txConfig, connection, observer) {
2728
if (txConfig && !txConfig.isEmpty()) {
@@ -41,7 +42,6 @@ function assertTxConfigIsEmpty (txConfig, connection, observer) {
4142
* Asserts that the passed-in database name is empty.
4243
* @param {string} database
4344
* @param {Connection} connection
44-
* @param {StreamObserver} observer
4545
*/
4646
function assertDatabaseIsEmpty (database, connection, observer) {
4747
if (database) {

src/internal/bolt-protocol-v1.js

+177-50
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,19 @@ import * as v1 from './packstream-v1'
2121
import Bookmark from './bookmark'
2222
import TxConfig from './tx-config'
2323
import { ACCESS_MODE_WRITE } from './constants'
24+
import Connection from './connection'
25+
import { Chunker } from './chunking'
26+
import { Packer } from './packstream-v1'
2427
import {
2528
assertDatabaseIsEmpty,
2629
assertTxConfigIsEmpty
2730
} from './bolt-protocol-util'
31+
import {
32+
ResultStreamObserver,
33+
LoginObserver,
34+
ResetObserver,
35+
StreamObserver
36+
} from './stream-observers'
2837

2938
export default class BoltProtocol {
3039
/**
@@ -66,99 +75,217 @@ export default class BoltProtocol {
6675

6776
/**
6877
* Perform initialization and authentication of the underlying connection.
69-
* @param {string} clientName the client name.
70-
* @param {object} authToken the authentication token.
71-
* @param {StreamObserver} observer the response observer.
78+
* @param {object} param
79+
* @param {string} param.userAgent the user agent.
80+
* @param {object} param.authToken the authentication token.
81+
* @param {function(err: Error)} param.onError the callback to invoke on error.
82+
* @param {function()} param.onComplete the callback to invoke on completion.
83+
* @returns {StreamObserver} the stream observer that monitors the corresponding server response.
7284
*/
73-
initialize (clientName, authToken, observer) {
74-
const message = RequestMessage.init(clientName, authToken)
75-
this._connection.write(message, observer, true)
85+
initialize ({ userAgent, authToken, onError, onComplete } = {}) {
86+
const observer = new LoginObserver({
87+
connection: this._connection,
88+
afterError: onError,
89+
afterComplete: onComplete
90+
})
91+
92+
this._connection.write(
93+
RequestMessage.init(userAgent, authToken),
94+
observer,
95+
true
96+
)
97+
98+
return observer
7699
}
77100

78-
prepareToClose (observer) {
101+
/**
102+
* Perform protocol related operations for closing this connection
103+
*/
104+
prepareToClose () {
79105
// no need to notify the database in this protocol version
80106
}
81107

82108
/**
83109
* Begin an explicit transaction.
84-
* @param {StreamObserver} observer the response observer.
85-
* @param {Bookmark} bookmark the bookmark.
86-
* @param {TxConfig} txConfig the configuration.
87-
* @param {string} database the target database name.
88-
* @param {string} mode the access mode.
110+
* @param {object} param
111+
* @param {Bookmark} param.bookmark the bookmark.
112+
* @param {TxConfig} param.txConfig the configuration.
113+
* @param {string} param.database the target database name.
114+
* @param {string} param.mode the access mode.
115+
* @param {function(err: Error)} param.beforeError the callback to invoke before handling the error.
116+
* @param {function(err: Error)} param.afterError the callback to invoke after handling the error.
117+
* @param {function()} param.beforeComplete the callback to invoke before handling the completion.
118+
* @param {function()} param.afterComplete the callback to invoke after handling the completion.
119+
* @returns {StreamObserver} the stream observer that monitors the corresponding server response.
89120
*/
90-
beginTransaction (observer, { bookmark, txConfig, database, mode }) {
91-
assertTxConfigIsEmpty(txConfig, this._connection, observer)
92-
assertDatabaseIsEmpty(database, this._connection, observer)
93-
94-
const runMessage = RequestMessage.run(
121+
beginTransaction ({
122+
bookmark,
123+
txConfig,
124+
database,
125+
mode,
126+
beforeError,
127+
afterError,
128+
beforeComplete,
129+
afterComplete
130+
} = {}) {
131+
return this.run(
95132
'BEGIN',
96-
bookmark.asBeginTransactionParameters()
133+
bookmark ? bookmark.asBeginTransactionParameters() : {},
134+
{
135+
bookmark: bookmark,
136+
txConfig: txConfig,
137+
database,
138+
mode,
139+
beforeError,
140+
afterError,
141+
beforeComplete,
142+
afterComplete,
143+
flush: false
144+
}
97145
)
98-
const pullAllMessage = RequestMessage.pullAll()
99-
100-
this._connection.write(runMessage, observer, false)
101-
this._connection.write(pullAllMessage, observer, false)
102146
}
103147

104148
/**
105149
* Commit the explicit transaction.
106-
* @param {StreamObserver} observer the response observer.
150+
* @param {object} param
151+
* @param {function(err: Error)} param.beforeError the callback to invoke before handling the error.
152+
* @param {function(err: Error)} param.afterError the callback to invoke after handling the error.
153+
* @param {function()} param.beforeComplete the callback to invoke before handling the completion.
154+
* @param {function()} param.afterComplete the callback to invoke after handling the completion.
155+
* @returns {StreamObserver} the stream observer that monitors the corresponding server response.
107156
*/
108-
commitTransaction (observer) {
157+
commitTransaction ({
158+
beforeError,
159+
afterError,
160+
beforeComplete,
161+
afterComplete
162+
} = {}) {
109163
// WRITE access mode is used as a place holder here, it has
110164
// no effect on behaviour for Bolt V1 & V2
111-
this.run('COMMIT', {}, observer, {
112-
bookmark: Bookmark.empty(),
113-
txConfig: TxConfig.empty(),
114-
mode: ACCESS_MODE_WRITE
115-
})
165+
return this.run(
166+
'COMMIT',
167+
{},
168+
{
169+
bookmark: Bookmark.empty(),
170+
txConfig: TxConfig.empty(),
171+
mode: ACCESS_MODE_WRITE,
172+
beforeError,
173+
afterError,
174+
beforeComplete,
175+
afterComplete
176+
}
177+
)
116178
}
117179

118180
/**
119181
* Rollback the explicit transaction.
120-
* @param {StreamObserver} observer the response observer.
182+
* @param {object} param
183+
* @param {function(err: Error)} param.beforeError the callback to invoke before handling the error.
184+
* @param {function(err: Error)} param.afterError the callback to invoke after handling the error.
185+
* @param {function()} param.beforeComplete the callback to invoke before handling the completion.
186+
* @param {function()} param.afterComplete the callback to invoke after handling the completion.
187+
* @returns {StreamObserver} the stream observer that monitors the corresponding server response.
121188
*/
122-
rollbackTransaction (observer) {
189+
rollbackTransaction ({
190+
beforeError,
191+
afterError,
192+
beforeComplete,
193+
afterComplete
194+
} = {}) {
123195
// WRITE access mode is used as a place holder here, it has
124196
// no effect on behaviour for Bolt V1 & V2
125-
this.run('ROLLBACK', {}, observer, {
126-
bookmark: Bookmark.empty(),
127-
txConfig: TxConfig.empty(),
128-
mode: ACCESS_MODE_WRITE
129-
})
197+
return this.run(
198+
'ROLLBACK',
199+
{},
200+
{
201+
bookmark: Bookmark.empty(),
202+
txConfig: TxConfig.empty(),
203+
mode: ACCESS_MODE_WRITE,
204+
beforeError,
205+
afterError,
206+
beforeComplete,
207+
afterComplete
208+
}
209+
)
130210
}
131211

132212
/**
133213
* Send a Cypher statement through the underlying connection.
134214
* @param {string} statement the cypher statement.
135215
* @param {object} parameters the statement parameters.
136-
* @param {StreamObserver} observer the response observer.
137-
* @param {Bookmark} bookmark the bookmark.
138-
* @param {TxConfig} txConfig the auto-commit transaction configuration.
139-
* @param {string} database the target database name.
140-
* @param {string} mode the access mode.
216+
* @param {object} param
217+
* @param {Bookmark} param.bookmark the bookmark.
218+
* @param {TxConfig} param.txConfig the transaction configuration.
219+
* @param {string} param.database the target database name.
220+
* @param {string} param.mode the access mode.
221+
* @param {function(keys: string[])} param.beforeKeys the callback to invoke before handling the keys.
222+
* @param {function(keys: string[])} param.afterKeys the callback to invoke after handling the keys.
223+
* @param {function(err: Error)} param.beforeError the callback to invoke before handling the error.
224+
* @param {function(err: Error)} param.afterError the callback to invoke after handling the error.
225+
* @param {function()} param.beforeComplete the callback to invoke before handling the completion.
226+
* @param {function()} param.afterComplete the callback to invoke after handling the completion.
227+
* @param {boolean} param.flush whether to flush the buffered messages.
228+
* @returns {StreamObserver} the stream observer that monitors the corresponding server response.
141229
*/
142-
run (statement, parameters, observer, { bookmark, txConfig, database, mode }) {
230+
run (
231+
statement,
232+
parameters,
233+
{
234+
bookmark,
235+
txConfig,
236+
database,
237+
mode,
238+
beforeKeys,
239+
afterKeys,
240+
beforeError,
241+
afterError,
242+
beforeComplete,
243+
afterComplete,
244+
flush = true
245+
} = {}
246+
) {
247+
const observer = new ResultStreamObserver({
248+
connection: this._connection,
249+
beforeKeys,
250+
afterKeys,
251+
beforeError,
252+
afterError,
253+
beforeComplete,
254+
afterComplete
255+
})
256+
143257
// bookmark and mode are ignored in this version of the protocol
144258
assertTxConfigIsEmpty(txConfig, this._connection, observer)
145259
// passing in a database name on this protocol version throws an error
146260
assertDatabaseIsEmpty(database, this._connection, observer)
147261

148-
const runMessage = RequestMessage.run(statement, parameters)
149-
const pullAllMessage = RequestMessage.pullAll()
262+
this._connection.write(
263+
RequestMessage.run(statement, parameters),
264+
observer,
265+
false
266+
)
267+
this._connection.write(RequestMessage.pullAll(), observer, flush)
150268

151-
this._connection.write(runMessage, observer, false)
152-
this._connection.write(pullAllMessage, observer, true)
269+
return observer
153270
}
154271

155272
/**
156273
* Send a RESET through the underlying connection.
157-
* @param {StreamObserver} observer the response observer.
274+
* @param {object} param
275+
* @param {function(err: Error)} param.onError the callback to invoke on error.
276+
* @param {function()} param.onComplete the callback to invoke on completion.
277+
* @returns {StreamObserver} the stream observer that monitors the corresponding server response.
158278
*/
159-
reset (observer) {
160-
const message = RequestMessage.reset()
161-
this._connection.write(message, observer, true)
279+
reset ({ onError, onComplete } = {}) {
280+
const observer = new ResetObserver({
281+
connection: this._connection,
282+
onError,
283+
onComplete
284+
})
285+
286+
this._connection.write(RequestMessage.reset(), observer, true)
287+
288+
return observer
162289
}
163290

164291
_createPacker (chunker) {

0 commit comments

Comments
 (0)