Skip to content

Commit 175ab6c

Browse files
author
Zhen Li
committed
Adding fetchSize as an optional configuration option on driver config and session config.
1 parent f9e528d commit 175ab6c

File tree

7 files changed

+88
-19
lines changed

7 files changed

+88
-19
lines changed

src/driver.js

+43-4
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,16 @@ import {
3030
} from './internal/pool-config'
3131
import Session from './session'
3232
import RxSession from './session-rx'
33+
import { ALL } from './internal/request-message'
3334

3435
const DEFAULT_MAX_CONNECTION_LIFETIME = 60 * 60 * 1000 // 1 hour
3536

37+
/**
38+
* The default record fetch size. This is used in Bolt V4 protocol to pull query execution result in batches.
39+
* @type {number}
40+
*/
41+
const DEFAULT_FETCH_SIZE = 1000
42+
3643
/**
3744
* Constant that represents read session access mode.
3845
* Should be used like this: `driver.session({ defaultAccessMode: neo4j.session.READ })`.
@@ -132,19 +139,23 @@ class Driver {
132139
* @param {string} param.defaultAccessMode=WRITE - the access mode of this session, allowed values are {@link READ} and {@link WRITE}.
133140
* @param {string|string[]} param.bookmarks - the initial reference or references to some previous
134141
* transactions. Value is optional and absence indicates that that the bookmarks do not exist or are unknown.
142+
* @param {number} param.fetchSize - the record fetch size of each batch of this session.
143+
* Use {@link ALL} to always pull all records in one batch. This will override the config value set on driver config.
135144
* @param {string} param.database - the database this session will operate on.
136145
* @return {Session} new session.
137146
*/
138147
session ({
139148
defaultAccessMode = WRITE,
140149
bookmarks: bookmarkOrBookmarks,
141-
database = ''
150+
database = '',
151+
fetchSize
142152
} = {}) {
143153
return this._newSession({
144154
defaultAccessMode,
145155
bookmarkOrBookmarks,
146156
database,
147-
reactive: false
157+
reactive: false,
158+
fetchSize: validateFetchSizeValue(fetchSize, this._config.fetchSize)
148159
})
149160
}
150161

@@ -229,7 +240,13 @@ class Driver {
229240
/**
230241
* @private
231242
*/
232-
_newSession ({ defaultAccessMode, bookmarkOrBookmarks, database, reactive }) {
243+
_newSession ({
244+
defaultAccessMode,
245+
bookmarkOrBookmarks,
246+
database,
247+
reactive,
248+
fetchSize
249+
}) {
233250
const sessionMode = Driver._validateSessionMode(defaultAccessMode)
234251
const connectionProvider = this._getOrCreateConnectionProvider()
235252
const bookmark = bookmarkOrBookmarks
@@ -241,7 +258,8 @@ class Driver {
241258
connectionProvider,
242259
bookmark,
243260
config: this._config,
244-
reactive
261+
reactive,
262+
fetchSize
245263
})
246264
}
247265

@@ -277,6 +295,10 @@ function sanitizeConfig (config) {
277295
config.connectionAcquisitionTimeout,
278296
DEFAULT_ACQUISITION_TIMEOUT
279297
)
298+
config.fetchSize = validateFetchSizeValue(
299+
config.fetchSize,
300+
DEFAULT_FETCH_SIZE
301+
)
280302
}
281303

282304
/**
@@ -293,6 +315,23 @@ function sanitizeIntValue (rawValue, defaultWhenAbsent) {
293315
}
294316
}
295317

318+
/**
319+
* @private
320+
*/
321+
function validateFetchSizeValue (rawValue, defaultWhenAbsent) {
322+
const fetchSize = parseInt(rawValue, 10)
323+
if (fetchSize > 0 || fetchSize === ALL) {
324+
return fetchSize
325+
} else if (fetchSize === 0 || fetchSize < 0) {
326+
throw new Error(
327+
'The fetch size can only be a positive value or -1 for ALL. However fetchSize = ' +
328+
fetchSize
329+
)
330+
} else {
331+
return defaultWhenAbsent
332+
}
333+
}
334+
296335
export { Driver, READ, WRITE }
297336

298337
export default Driver

src/internal/bolt-protocol-v4.js

+3-2
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
* limitations under the License.
1818
*/
1919
import BoltProtocolV3 from './bolt-protocol-v3'
20-
import RequestMessage from './request-message'
20+
import RequestMessage, { ALL } from './request-message'
2121
import { ResultStreamObserver } from './stream-observers'
2222
import { BOLT_PROTOCOL_V4 } from './constants'
2323

@@ -69,7 +69,8 @@ export default class BoltProtocol extends BoltProtocolV3 {
6969
beforeComplete,
7070
afterComplete,
7171
flush = true,
72-
reactive = false
72+
reactive = false,
73+
fetchSize = ALL
7374
} = {}
7475
) {
7576
const observer = new ResultStreamObserver({

src/internal/request-message.js

+7-7
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ const READ_MODE = 'r'
4343
/* eslint-enable no-unused-vars */
4444

4545
const NO_STATEMENT_ID = -1
46-
const ALL = -1
46+
export const ALL = -1
4747

4848
export default class RequestMessage {
4949
constructor (signature, fields, toString) {
@@ -220,19 +220,19 @@ export default class RequestMessage {
220220
function buildTxMetadata (bookmark, txConfig, database, mode) {
221221
const metadata = {}
222222
if (!bookmark.isEmpty()) {
223-
metadata['bookmarks'] = bookmark.values()
223+
metadata.bookmarks = bookmark.values()
224224
}
225225
if (txConfig.timeout) {
226-
metadata['tx_timeout'] = txConfig.timeout
226+
metadata.tx_timeout = txConfig.timeout
227227
}
228228
if (txConfig.metadata) {
229-
metadata['tx_metadata'] = txConfig.metadata
229+
metadata.tx_metadata = txConfig.metadata
230230
}
231231
if (database) {
232-
metadata['db'] = assertString(database, 'database')
232+
metadata.db = assertString(database, 'database')
233233
}
234234
if (mode === ACCESS_MODE_READ) {
235-
metadata['mode'] = READ_MODE
235+
metadata.mode = READ_MODE
236236
}
237237
return metadata
238238
}
@@ -246,7 +246,7 @@ function buildTxMetadata (bookmark, txConfig, database, mode) {
246246
function buildStreamMetadata (stmtId, n) {
247247
const metadata = { n: int(n) }
248248
if (stmtId !== NO_STATEMENT_ID) {
249-
metadata['qid'] = int(stmtId)
249+
metadata.qid = int(stmtId)
250250
}
251251
return metadata
252252
}

src/internal/stream-observers.js

+3-4
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,7 @@ import Connection from './connection'
2121
import { newError, PROTOCOL_ERROR } from '../error'
2222
import { isString } from './util'
2323
import Integer from '../integer'
24-
25-
const DefaultBatchSize = 50
24+
import { ALL } from './request-message'
2625

2726
class StreamObserver {
2827
onNext (rawRecord) {}
@@ -63,7 +62,7 @@ class ResultStreamObserver extends StreamObserver {
6362
reactive = false,
6463
moreFunction,
6564
discardFunction,
66-
batchSize = DefaultBatchSize,
65+
batchSize = ALL,
6766
beforeError,
6867
afterError,
6968
beforeKeys,
@@ -108,7 +107,7 @@ class ResultStreamObserver extends StreamObserver {
108107
* @param {Array} rawRecord - An array with the raw record
109108
*/
110109
onNext (rawRecord) {
111-
let record = new Record(this._fieldKeys, rawRecord, this._fieldLookup)
110+
const record = new Record(this._fieldKeys, rawRecord, this._fieldLookup)
112111
if (this._observers.some(o => o.onNext)) {
113112
this._observers.forEach(o => {
114113
if (o.onNext) {

src/session.js

+6-2
Original file line numberDiff line numberDiff line change
@@ -49,18 +49,21 @@ class Session {
4949
* @param {string} args.database the database name
5050
* @param {Object} args.config={} - this driver configuration.
5151
* @param {boolean} args.reactive - whether this session should create reactive streams
52+
* @param {number} args.fetchSize - defines how many records is pulled in each pulling batch
5253
*/
5354
constructor ({
5455
mode,
5556
connectionProvider,
5657
bookmark,
5758
database,
5859
config,
59-
reactive
60+
reactive,
61+
fetchSize
6062
}) {
6163
this._mode = mode
6264
this._database = database
6365
this._reactive = reactive
66+
this._fetchSize = fetchSize
6467
this._readConnectionHolder = new ConnectionHolder({
6568
mode: ACCESS_MODE_READ,
6669
database,
@@ -107,7 +110,8 @@ class Session {
107110
mode: this._mode,
108111
database: this._database,
109112
afterComplete: this._onComplete,
110-
reactive: this._reactive
113+
reactive: this._reactive,
114+
fetchSize: this._fetchSize
111115
})
112116
)
113117
}

test/driver.test.js

+23
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import {
2626
} from '../src/internal/pool-config'
2727
import { ServerVersion, VERSION_4_0_0 } from '../src/internal/server-version'
2828
import testUtils from './internal/test-utils'
29+
import { AuthToken } from '../types'
2930

3031
describe('#integration driver', () => {
3132
let driver
@@ -253,6 +254,28 @@ describe('#integration driver', () => {
253254
)
254255
})
255256

257+
it('should validate fetch size in the config', async () => {
258+
await validateConfigSanitizing({}, 1000)
259+
await validateConfigSanitizing({ fetchSize: 42 }, 42)
260+
await validateConfigSanitizing({ fetchSize: -1 }, -1)
261+
await validateConfigSanitizing({ fetchSize: '42' }, 42)
262+
await validateConfigSanitizing({ fetchSize: '-1' }, -1)
263+
})
264+
265+
it('should fail when fetch size is negative', () => {
266+
expect(() =>
267+
neo4j.driver('bolt://localhost', sharedNeo4j.authToken, {
268+
fetchSize: -77
269+
})
270+
).toThrow()
271+
})
272+
273+
it('should fail when fetch size is 0', () => {
274+
expect(() =>
275+
neo4j.driver('bolt://localhost', sharedNeo4j.authToken, { fetchSize: 0 })
276+
).toThrow()
277+
})
278+
256279
it('should discard closed connections', async () => {
257280
driver = neo4j.driver('bolt://localhost', sharedNeo4j.authToken)
258281

types/driver.d.ts

+3
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,11 @@ declare interface Config {
4949
trust?: TrustStrategy
5050
trustedCertificates?: string[]
5151
knownHosts?: string
52+
fetchSize?: number
5253
maxConnectionPoolSize?: number
5354
maxTransactionRetryTime?: number
5455
maxConnectionLifetime?: number
56+
connectionAcquisitionTimeout?: number
5557
connectionTimeout?: number
5658
disableLosslessIntegers?: boolean
5759
logging?: LoggingConfig
@@ -71,6 +73,7 @@ declare interface Driver {
7173
}?: {
7274
defaultAccessMode?: SessionMode
7375
bookmarks?: string | string[]
76+
fetchSize?: number
7477
database?: string
7578
}): Session
7679

0 commit comments

Comments
 (0)