Skip to content

Ensures records cannot be visited after summary with reactive API. #499

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions src/result-rx.js
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ export default class RxResult {
* @public
* @returns {Observable<ResultSummary>} - An observable stream (with exactly one element) of result summary.
*/
summary () {
consume () {
return this._result.pipe(
flatMap(
result =>
Expand All @@ -114,17 +114,17 @@ export default class RxResult {
} = {}) {
const subscriptions = []

if (recordsObserver) {
subscriptions.push(this._records.subscribe(recordsObserver))
}

if (summaryObserver) {
subscriptions.push(this._summary.subscribe(summaryObserver))
}

if (this._state < States.STREAMING) {
this._state = States.STREAMING

if (recordsObserver) {
subscriptions.push(this._records.subscribe(recordsObserver))
}

subscriptions.push({
unsubscribe: () => {
if (result._cancel) {
Expand Down Expand Up @@ -156,10 +156,10 @@ export default class RxResult {
this._state = States.COMPLETED
}
})
} else if (this._state === States.STREAMING && recordsObserver) {
} else if (recordsObserver) {
recordsObserver.error(
newError(
'Streaming has already started with a previous records or summary subscription.'
'Streaming has already started/consumed with a previous records or summary subscription.'
)
)
}
Expand Down
13 changes: 11 additions & 2 deletions src/session.js
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,13 @@ class Session {
const connectionHolder = this._connectionHolderWithMode(this._mode)

let observerPromise
if (!this._hasTx && connectionHolder.initializeConnection()) {
if (!this._open) {
observerPromise = Promise.resolve(
new FailedObserver({
error: newError('Cannot run statement in a closed session.')
})
)
} else if (!this._hasTx && connectionHolder.initializeConnection()) {
observerPromise = connectionHolder
.getConnection()
.then(connection => customRunner(connection))
Expand Down Expand Up @@ -163,6 +169,9 @@ class Session {
}

_beginTransaction (accessMode, txConfig) {
if (!this._open) {
throw newError('Cannot begin a transaction on a closed session.')
}
if (this._hasTx) {
throw newError(
'You cannot begin a transaction on a session with an open transaction; ' +
Expand Down Expand Up @@ -193,7 +202,7 @@ class Session {
/**
* Return the bookmark received following the last completed {@link Transaction}.
*
* @return {string|null} a reference to a previous transaction
* @return {string[]} a reference to a previous transaction
*/
lastBookmark () {
return this._lastBookmark.values()
Expand Down
3 changes: 2 additions & 1 deletion test/internal/node/routing.driver.boltkit.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1477,7 +1477,7 @@ describe('#stub-routing routing driver with stub server', () => {
const driver = boltStub.newDriver('neo4j://127.0.0.1:9010')

// run a dummy query to force routing table initialization
const session = driver.session({ defaultAccessMode: READ })
var session = driver.session({ defaultAccessMode: READ })
const result1 = await session.run('MATCH (n) RETURN n.name')
expect(result1.records.length).toEqual(3)
await session.close()
Expand All @@ -1491,6 +1491,7 @@ describe('#stub-routing routing driver with stub server', () => {
9010
)

session = driver.session({ defaultAccessMode: READ })
const result2 = await session.readTransaction(tx =>
tx.run('MATCH (n) RETURN n.name AS name')
)
Expand Down
93 changes: 82 additions & 11 deletions test/rx/navigation.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,19 @@ describe('#integration-rx navigation', () => {
it('should fail on records when run fails', () =>
shouldFailOnRecordsWhenRunFails(serverVersion, session))

it('should fail on subsequent records differently when run fails', () =>
shouldFailOnSubsequentRecordsWhenRunFails(serverVersion, session))

it('should fail on summary when run fails', () =>
shouldFailOnSummaryWhenRunFails(serverVersion, session))

it('should fail on subsequent summary when run fails', () =>
shouldFailOnSubsequentKeysWhenRunFails(serverVersion, session))
shouldFailOnSubsequentSummaryWhenRunFails(serverVersion, session))

it('should fail on result when closed', () =>
shouldFailOnResultWhenClosed(serverVersion, session, () =>
session.close()
))
})

describe('transaction', () => {
Expand Down Expand Up @@ -197,11 +205,20 @@ describe('#integration-rx navigation', () => {
it('should fail on records when run fails', () =>
shouldFailOnRecordsWhenRunFails(serverVersion, txc))

it('should fail on subsequent records differently when run fails', () =>
shouldFailOnSubsequentRecordsWhenRunFails(serverVersion, txc))

it('should fail on summary when run fails', () =>
shouldFailOnSummaryWhenRunFails(serverVersion, txc))

it('should fail on subsequent summary when run fails', () =>
shouldFailOnSubsequentKeysWhenRunFails(serverVersion, txc))
shouldFailOnSubsequentSummaryWhenRunFails(serverVersion, txc))

it('should fail on result when committed', () =>
shouldFailOnResultWhenClosed(serverVersion, txc, () => txc.commit()))

it('should fail on result when rolled back', () =>
shouldFailOnResultWhenClosed(serverVersion, txc, () => txc.rollback()))
})

/**
Expand Down Expand Up @@ -310,7 +327,10 @@ describe('#integration-rx navigation', () => {
await collectAndAssertKeys(result)
await collectAndAssertSummary(result)

await collectAndAssertEmpty(result.records())
const expectedError = jasmine.objectContaining({
message: jasmine.stringMatching(/Streaming has already started/)
})
await collectAndAssertError(result.records(), expectedError)
}

/**
Expand Down Expand Up @@ -402,9 +422,13 @@ describe('#integration-rx navigation', () => {
)

await collectAndAssertRecords(result)
await collectAndAssertEmpty(result.records())
await collectAndAssertEmpty(result.records())
await collectAndAssertEmpty(result.records())

const expectedError = jasmine.objectContaining({
message: jasmine.stringMatching(/Streaming has already started/)
})
await collectAndAssertError(result.records(), expectedError)
await collectAndAssertError(result.records(), expectedError)
await collectAndAssertError(result.records(), expectedError)
}

/**
Expand Down Expand Up @@ -527,6 +551,32 @@ describe('#integration-rx navigation', () => {
)
}

/**
* @param {ServerVersion} version
* @param {RxSession|RxTransaction} runnable
*/
async function shouldFailOnSubsequentRecordsWhenRunFails (version, runnable) {
if (version.compareTo(VERSION_4_0_0) < 0) {
return
}

const result = runnable.run('THIS IS NOT A CYPHER')

await collectAndAssertError(
result.records(),
jasmine.objectContaining({
code: 'Neo.ClientError.Statement.SyntaxError',
message: jasmine.stringMatching(/Invalid input/)
})
)

const expectedError = jasmine.objectContaining({
message: jasmine.stringMatching(/Streaming has already started/)
})
await collectAndAssertError(result.records(), expectedError)
await collectAndAssertError(result.records(), expectedError)
}

/**
* @param {ServerVersion} version
* @param {RxSession|RxTransaction} runnable
Expand All @@ -539,7 +589,7 @@ describe('#integration-rx navigation', () => {
const result = runnable.run('THIS IS NOT A CYPHER')

await collectAndAssertError(
result.summary(),
result.consume(),
jasmine.objectContaining({
code: 'Neo.ClientError.Statement.SyntaxError',
message: jasmine.stringMatching(/Invalid input/)
Expand All @@ -562,9 +612,30 @@ describe('#integration-rx navigation', () => {
message: jasmine.stringMatching(/Invalid input/)
})

await collectAndAssertError(result.summary(), expectedError)
await collectAndAssertError(result.summary(), expectedError)
await collectAndAssertError(result.summary(), expectedError)
await collectAndAssertError(result.consume(), expectedError)
await collectAndAssertError(result.consume(), expectedError)
await collectAndAssertError(result.consume(), expectedError)
}

/**
* @param {ServerVersion} version
* @param {RxSession|RxTransaction} runnable
* @param {function(): Observable} closeFunc
*/
async function shouldFailOnResultWhenClosed (version, runnable, closeFunc) {
if (version.compareTo(VERSION_4_0_0) < 0) {
return
}

const result = runnable.run('RETURN 1')
await collectAndAssertEmpty(closeFunc())

const expectedError = jasmine.objectContaining({
message: jasmine.stringMatching(/Cannot run statement/)
})
await collectAndAssertError(result.keys(), expectedError)
await collectAndAssertError(result.records(), expectedError)
await collectAndAssertError(result.consume(), expectedError)
}

async function collectAndAssertKeys (result) {
Expand Down Expand Up @@ -602,7 +673,7 @@ describe('#integration-rx navigation', () => {

async function collectAndAssertSummary (result, expectedStatementType = 'r') {
const summary = await result
.summary()
.consume()
.pipe(
map(s => s.statementType),
materialize(),
Expand Down
20 changes: 10 additions & 10 deletions test/rx/summary.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ describe('#integration-rx summary', () => {

const summary = await runnable
.run('UNWIND RANGE(1,10) AS n RETURN n')
.summary()
.consume()
.toPromise()

expect(summary).toBeDefined()
Expand Down Expand Up @@ -553,7 +553,7 @@ describe('#integration-rx summary', () => {

const summary = await runnable
.run('CREATE (n) RETURN n')
.summary()
.consume()
.toPromise()
expect(summary).toBeDefined()
expect(summary.hasPlan()).toBeFalsy()
Expand All @@ -573,7 +573,7 @@ describe('#integration-rx summary', () => {

const summary = await runnable
.run('EXPLAIN CREATE (n) RETURN n')
.summary()
.consume()
.toPromise()
expect(summary).toBeDefined()
expect(summary.hasPlan()).toBeTruthy()
Expand All @@ -594,7 +594,7 @@ describe('#integration-rx summary', () => {

const summary = await runnable
.run('PROFILE CREATE (n) RETURN n')
.summary()
.consume()
.toPromise()
expect(summary).toBeDefined()
expect(summary.hasPlan()).toBeTruthy()
Expand All @@ -616,7 +616,7 @@ describe('#integration-rx summary', () => {

const summary = await runnable
.run('CREATE (n) RETURN n')
.summary()
.consume()
.toPromise()
expect(summary).toBeDefined()
expect(summary.notifications).toBeTruthy()
Expand All @@ -634,7 +634,7 @@ describe('#integration-rx summary', () => {

const summary = await runnable
.run('EXPLAIN MATCH (n:ThisLabelDoesNotExist) RETURN n')
.summary()
.consume()
.toPromise()
expect(summary).toBeDefined()
expect(summary.notifications).toBeTruthy()
Expand Down Expand Up @@ -664,7 +664,7 @@ describe('#integration-rx summary', () => {
) {
const summary = await runnable
.run(statement, parameters)
.summary()
.consume()
.toPromise()
expect(summary).toBeDefined()
expect(summary.statement).toBeDefined()
Expand All @@ -685,7 +685,7 @@ describe('#integration-rx summary', () => {
) {
const summary = await runnable
.run(statement)
.summary()
.consume()
.toPromise()
expect(summary).toBeDefined()
expect(summary.statementType).toBe(expectedStatementType)
Expand All @@ -701,7 +701,7 @@ describe('#integration-rx summary', () => {
async function verifyUpdates (runnable, statement, parameters, stats) {
const summary = await runnable
.run(statement, parameters)
.summary()
.consume()
.toPromise()
expect(summary).toBeDefined()
expect(summary.counters.containsUpdates()).toBeTruthy()
Expand All @@ -725,7 +725,7 @@ describe('#integration-rx summary', () => {
) {
const summary = await runnable
.run(statement, parameters)
.summary()
.consume()
.toPromise()
expect(summary).toBeDefined()

Expand Down
8 changes: 4 additions & 4 deletions test/rx/transaction.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ describe('#integration-rx transaction', () => {
)
])

const summary = await result.summary().toPromise()
const summary = await result.consume().toPromise()
expect(summary).toBeTruthy()
})

Expand Down Expand Up @@ -588,15 +588,15 @@ describe('#integration-rx transaction', () => {

await txc
.run('CREATE (n:Node {id: 1})')
.summary()
.consume()
.toPromise()
await txc
.run('CREATE (n:Node {id: 2})')
.summary()
.consume()
.toPromise()
await txc
.run('CREATE (n:Node {id: 1})')
.summary()
.consume()
.toPromise()

await verifyCanCommitOrRollback(txc, commit)
Expand Down
2 changes: 0 additions & 2 deletions test/temporal-types.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1460,8 +1460,6 @@ describe('#integration temporal-types', () => {

const receivedValue = records[0].get(0)
expect(receivedValue).toEqual(value)

await session.close()
}

async function testDurationToString (values) {
Expand Down