Skip to content

Pull Results in Batches #495

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Oct 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 43 additions & 4 deletions src/driver.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,16 @@ import {
} from './internal/pool-config'
import Session from './session'
import RxSession from './session-rx'
import { ALL } from './internal/request-message'

const DEFAULT_MAX_CONNECTION_LIFETIME = 60 * 60 * 1000 // 1 hour

/**
* The default record fetch size. This is used in Bolt V4 protocol to pull query execution result in batches.
* @type {number}
*/
const DEFAULT_FETCH_SIZE = 1000

/**
* Constant that represents read session access mode.
* Should be used like this: `driver.session({ defaultAccessMode: neo4j.session.READ })`.
Expand Down Expand Up @@ -132,19 +139,23 @@ class Driver {
* @param {string} param.defaultAccessMode=WRITE - the access mode of this session, allowed values are {@link READ} and {@link WRITE}.
* @param {string|string[]} param.bookmarks - the initial reference or references to some previous
* transactions. Value is optional and absence indicates that that the bookmarks do not exist or are unknown.
* @param {number} param.fetchSize - the record fetch size of each batch of this session.
* Use {@link ALL} to always pull all records in one batch. This will override the config value set on driver config.
* @param {string} param.database - the database this session will operate on.
* @return {Session} new session.
*/
session ({
defaultAccessMode = WRITE,
bookmarks: bookmarkOrBookmarks,
database = ''
database = '',
fetchSize
} = {}) {
return this._newSession({
defaultAccessMode,
bookmarkOrBookmarks,
database,
reactive: false
reactive: false,
fetchSize: validateFetchSizeValue(fetchSize, this._config.fetchSize)
})
}

Expand Down Expand Up @@ -229,7 +240,13 @@ class Driver {
/**
* @private
*/
_newSession ({ defaultAccessMode, bookmarkOrBookmarks, database, reactive }) {
_newSession ({
defaultAccessMode,
bookmarkOrBookmarks,
database,
reactive,
fetchSize
}) {
const sessionMode = Driver._validateSessionMode(defaultAccessMode)
const connectionProvider = this._getOrCreateConnectionProvider()
const bookmark = bookmarkOrBookmarks
Expand All @@ -241,7 +258,8 @@ class Driver {
connectionProvider,
bookmark,
config: this._config,
reactive
reactive,
fetchSize
})
}

Expand Down Expand Up @@ -277,6 +295,10 @@ function sanitizeConfig (config) {
config.connectionAcquisitionTimeout,
DEFAULT_ACQUISITION_TIMEOUT
)
config.fetchSize = validateFetchSizeValue(
config.fetchSize,
DEFAULT_FETCH_SIZE
)
}

/**
Expand All @@ -293,6 +315,23 @@ function sanitizeIntValue (rawValue, defaultWhenAbsent) {
}
}

/**
* @private
*/
function validateFetchSizeValue (rawValue, defaultWhenAbsent) {
const fetchSize = parseInt(rawValue, 10)
if (fetchSize > 0 || fetchSize === ALL) {
return fetchSize
} else if (fetchSize === 0 || fetchSize < 0) {
throw new Error(
'The fetch size can only be a positive value or -1 for ALL. However fetchSize = ' +
fetchSize
)
} else {
return defaultWhenAbsent
}
}

export { Driver, READ, WRITE }

export default Driver
16 changes: 11 additions & 5 deletions src/internal/bolt-protocol-v4.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* limitations under the License.
*/
import BoltProtocolV3 from './bolt-protocol-v3'
import RequestMessage from './request-message'
import RequestMessage, { ALL } from './request-message'
import { ResultStreamObserver } from './stream-observers'
import { BOLT_PROTOCOL_V4 } from './constants'

Expand Down Expand Up @@ -69,14 +69,16 @@ export default class BoltProtocol extends BoltProtocolV3 {
beforeComplete,
afterComplete,
flush = true,
reactive = false
reactive = false,
fetchSize = ALL
} = {}
) {
const observer = new ResultStreamObserver({
connection: this._connection,
reactive: reactive,
moreFunction: reactive ? this._requestMore : this._noOp,
discardFunction: reactive ? this._requestDiscard : this._noOp,
fetchSize: fetchSize,
moreFunction: this._requestMore,
discardFunction: this._requestDiscard,
beforeKeys,
afterKeys,
beforeError,
Expand All @@ -98,7 +100,11 @@ export default class BoltProtocol extends BoltProtocolV3 {
)

if (!reactive) {
this._connection.write(RequestMessage.pull(), observer, flush)
this._connection.write(
RequestMessage.pull({ n: fetchSize }),
observer,
flush
)
}

return observer
Expand Down
14 changes: 7 additions & 7 deletions src/internal/request-message.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ const READ_MODE = 'r'
/* eslint-enable no-unused-vars */

const NO_STATEMENT_ID = -1
const ALL = -1
export const ALL = -1

export default class RequestMessage {
constructor (signature, fields, toString) {
Expand Down Expand Up @@ -220,19 +220,19 @@ export default class RequestMessage {
function buildTxMetadata (bookmark, txConfig, database, mode) {
const metadata = {}
if (!bookmark.isEmpty()) {
metadata['bookmarks'] = bookmark.values()
metadata.bookmarks = bookmark.values()
}
if (txConfig.timeout) {
metadata['tx_timeout'] = txConfig.timeout
metadata.tx_timeout = txConfig.timeout
}
if (txConfig.metadata) {
metadata['tx_metadata'] = txConfig.metadata
metadata.tx_metadata = txConfig.metadata
}
if (database) {
metadata['db'] = assertString(database, 'database')
metadata.db = assertString(database, 'database')
}
if (mode === ACCESS_MODE_READ) {
metadata['mode'] = READ_MODE
metadata.mode = READ_MODE
}
return metadata
}
Expand All @@ -246,7 +246,7 @@ function buildTxMetadata (bookmark, txConfig, database, mode) {
function buildStreamMetadata (stmtId, n) {
const metadata = { n: int(n) }
if (stmtId !== NO_STATEMENT_ID) {
metadata['qid'] = int(stmtId)
metadata.qid = int(stmtId)
}
return metadata
}
Expand Down
25 changes: 13 additions & 12 deletions src/internal/stream-observers.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@
import Record from '../record'
import Connection from './connection'
import { newError, PROTOCOL_ERROR } from '../error'
import { isString } from './util'
import Integer from '../integer'

const DefaultBatchSize = 50
import { ALL } from './request-message'

class StreamObserver {
onNext (rawRecord) {}
Expand Down Expand Up @@ -50,7 +48,7 @@ class ResultStreamObserver extends StreamObserver {
* @param {boolean} param.reactive
* @param {function(connection: Connection, stmtId: number|Integer, n: number|Integer, observer: StreamObserver)} param.moreFunction -
* @param {function(connection: Connection, stmtId: number|Integer, observer: StreamObserver)} param.discardFunction -
* @param {number|Integer} param.batchSize -
* @param {number|Integer} param.fetchSize -
* @param {function(err: Error): Promise|void} param.beforeError -
* @param {function(err: Error): Promise|void} param.afterError -
* @param {function(keys: string[]): Promise|void} param.beforeKeys -
Expand All @@ -63,7 +61,7 @@ class ResultStreamObserver extends StreamObserver {
reactive = false,
moreFunction,
discardFunction,
batchSize = DefaultBatchSize,
fetchSize = ALL,
beforeError,
afterError,
beforeKeys,
Expand Down Expand Up @@ -98,7 +96,8 @@ class ResultStreamObserver extends StreamObserver {
this._moreFunction = moreFunction
this._discardFunction = discardFunction
this._discard = false
this._batchSize = batchSize
this._fetchSize = fetchSize
this._finished = false
}

/**
Expand All @@ -108,7 +107,7 @@ class ResultStreamObserver extends StreamObserver {
* @param {Array} rawRecord - An array with the raw record
*/
onNext (rawRecord) {
let record = new Record(this._fieldKeys, rawRecord, this._fieldLookup)
const record = new Record(this._fieldKeys, rawRecord, this._fieldLookup)
if (this._observers.some(o => o.onNext)) {
this._observers.forEach(o => {
if (o.onNext) {
Expand Down Expand Up @@ -190,6 +189,7 @@ class ResultStreamObserver extends StreamObserver {

delete meta.has_more
} else {
this._finished = true
const completionMetadata = Object.assign(
this._connection ? { server: this._connection.server } : {},
this._meta,
Expand Down Expand Up @@ -229,7 +229,6 @@ class ResultStreamObserver extends StreamObserver {

_handleStreaming () {
if (
this._reactive &&
this._head &&
this._observers.some(o => o.onNext || o.onCompleted) &&
!this._streaming
Expand All @@ -242,7 +241,7 @@ class ResultStreamObserver extends StreamObserver {
this._moreFunction(
this._connection,
this._statementId,
this._batchSize,
this._fetchSize,
this
)
}
Expand Down Expand Up @@ -282,12 +281,13 @@ class ResultStreamObserver extends StreamObserver {
this._head = []
this._fieldKeys = []
this._tail = {}
this._finished = true
}

/**
* Discard pending record stream
* Cancel pending record stream
*/
discard () {
cancel () {
this._discard = true
}

Expand All @@ -302,6 +302,7 @@ class ResultStreamObserver extends StreamObserver {
return
}

this._finished = true
this._hasFailed = true
this._error = error

Expand Down Expand Up @@ -357,7 +358,7 @@ class ResultStreamObserver extends StreamObserver {
}
this._observers.push(observer)

if (this._reactive) {
if (this._reactive && !this._finished) {
this._handleStreaming()
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/result-rx.js
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ export default class RxResult {
})

if (this._records.observers.length === 0) {
result._discard()
result._cancel()
}

result.subscribe({
Expand Down
13 changes: 7 additions & 6 deletions src/result.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,13 @@ class Result {
*/
summary () {
return new Promise((resolve, reject) => {
this._streamObserverPromise.then(o =>
this._streamObserverPromise.then(o => {
o.cancel()
o.subscribe({
onCompleted: metadata => resolve(metadata),
onError: err => reject(err)
})
)
})
})
}

Expand All @@ -102,8 +103,8 @@ class Result {
_getOrCreatePromise () {
if (!this._p) {
this._p = new Promise((resolve, reject) => {
let records = []
let observer = {
const records = []
const observer = {
onNext: record => {
records.push(record)
},
Expand Down Expand Up @@ -192,8 +193,8 @@ class Result {
* @protected
* @since 4.0.0
*/
_discard () {
this._streamObserverPromise.then(o => o.discard())
_cancel () {
this._streamObserverPromise.then(o => o.cancel())
}
}

Expand Down
11 changes: 8 additions & 3 deletions src/session.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,18 +49,21 @@ class Session {
* @param {string} args.database the database name
* @param {Object} args.config={} - this driver configuration.
* @param {boolean} args.reactive - whether this session should create reactive streams
* @param {number} args.fetchSize - defines how many records is pulled in each pulling batch
*/
constructor ({
mode,
connectionProvider,
bookmark,
database,
config,
reactive
reactive,
fetchSize
}) {
this._mode = mode
this._database = database
this._reactive = reactive
this._fetchSize = fetchSize
this._readConnectionHolder = new ConnectionHolder({
mode: ACCESS_MODE_READ,
database,
Expand Down Expand Up @@ -107,7 +110,8 @@ class Session {
mode: this._mode,
database: this._database,
afterComplete: this._onComplete,
reactive: this._reactive
reactive: this._reactive,
fetchSize: this._fetchSize
})
)
}
Expand Down Expand Up @@ -176,7 +180,8 @@ class Session {
connectionHolder,
onClose: this._transactionClosed.bind(this),
onBookmark: this._updateBookmark.bind(this),
reactive: this._reactive
reactive: this._reactive,
fetchSize: this._fetchSize
})
tx._begin(this._lastBookmark, txConfig)
return tx
Expand Down
Loading