Skip to content

Commit 0a9c7fc

Browse files
authored
Merge pull request #499 from zhenlineo/4.0-summary-to-consume
Ensures `records` cannot be visited after `summary` with reactive API.
2 parents e764306 + 3232435 commit 0a9c7fc

7 files changed

+116
-37
lines changed

src/result-rx.js

+7-7
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ export default class RxResult {
9696
* @public
9797
* @returns {Observable<ResultSummary>} - An observable stream (with exactly one element) of result summary.
9898
*/
99-
summary () {
99+
consume () {
100100
return this._result.pipe(
101101
flatMap(
102102
result =>
@@ -114,17 +114,17 @@ export default class RxResult {
114114
} = {}) {
115115
const subscriptions = []
116116

117-
if (recordsObserver) {
118-
subscriptions.push(this._records.subscribe(recordsObserver))
119-
}
120-
121117
if (summaryObserver) {
122118
subscriptions.push(this._summary.subscribe(summaryObserver))
123119
}
124120

125121
if (this._state < States.STREAMING) {
126122
this._state = States.STREAMING
127123

124+
if (recordsObserver) {
125+
subscriptions.push(this._records.subscribe(recordsObserver))
126+
}
127+
128128
subscriptions.push({
129129
unsubscribe: () => {
130130
if (result._cancel) {
@@ -156,10 +156,10 @@ export default class RxResult {
156156
this._state = States.COMPLETED
157157
}
158158
})
159-
} else if (this._state === States.STREAMING && recordsObserver) {
159+
} else if (recordsObserver) {
160160
recordsObserver.error(
161161
newError(
162-
'Streaming has already started with a previous records or summary subscription.'
162+
'Streaming has already started/consumed with a previous records or summary subscription.'
163163
)
164164
)
165165
}

src/session.js

+11-2
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,13 @@ class Session {
120120
const connectionHolder = this._connectionHolderWithMode(this._mode)
121121

122122
let observerPromise
123-
if (!this._hasTx && connectionHolder.initializeConnection()) {
123+
if (!this._open) {
124+
observerPromise = Promise.resolve(
125+
new FailedObserver({
126+
error: newError('Cannot run statement in a closed session.')
127+
})
128+
)
129+
} else if (!this._hasTx && connectionHolder.initializeConnection()) {
124130
observerPromise = connectionHolder
125131
.getConnection()
126132
.then(connection => customRunner(connection))
@@ -163,6 +169,9 @@ class Session {
163169
}
164170

165171
_beginTransaction (accessMode, txConfig) {
172+
if (!this._open) {
173+
throw newError('Cannot begin a transaction on a closed session.')
174+
}
166175
if (this._hasTx) {
167176
throw newError(
168177
'You cannot begin a transaction on a session with an open transaction; ' +
@@ -193,7 +202,7 @@ class Session {
193202
/**
194203
* Return the bookmark received following the last completed {@link Transaction}.
195204
*
196-
* @return {string|null} a reference to a previous transaction
205+
* @return {string[]} a reference to a previous transaction
197206
*/
198207
lastBookmark () {
199208
return this._lastBookmark.values()

test/internal/node/routing.driver.boltkit.test.js

+2-1
Original file line numberDiff line numberDiff line change
@@ -1477,7 +1477,7 @@ describe('#stub-routing routing driver with stub server', () => {
14771477
const driver = boltStub.newDriver('neo4j://127.0.0.1:9010')
14781478

14791479
// run a dummy query to force routing table initialization
1480-
const session = driver.session({ defaultAccessMode: READ })
1480+
var session = driver.session({ defaultAccessMode: READ })
14811481
const result1 = await session.run('MATCH (n) RETURN n.name')
14821482
expect(result1.records.length).toEqual(3)
14831483
await session.close()
@@ -1491,6 +1491,7 @@ describe('#stub-routing routing driver with stub server', () => {
14911491
9010
14921492
)
14931493

1494+
session = driver.session({ defaultAccessMode: READ })
14941495
const result2 = await session.readTransaction(tx =>
14951496
tx.run('MATCH (n) RETURN n.name AS name')
14961497
)

test/rx/navigation.test.js

+82-11
Original file line numberDiff line numberDiff line change
@@ -100,11 +100,19 @@ describe('#integration-rx navigation', () => {
100100
it('should fail on records when run fails', () =>
101101
shouldFailOnRecordsWhenRunFails(serverVersion, session))
102102

103+
it('should fail on subsequent records differently when run fails', () =>
104+
shouldFailOnSubsequentRecordsWhenRunFails(serverVersion, session))
105+
103106
it('should fail on summary when run fails', () =>
104107
shouldFailOnSummaryWhenRunFails(serverVersion, session))
105108

106109
it('should fail on subsequent summary when run fails', () =>
107-
shouldFailOnSubsequentKeysWhenRunFails(serverVersion, session))
110+
shouldFailOnSubsequentSummaryWhenRunFails(serverVersion, session))
111+
112+
it('should fail on result when closed', () =>
113+
shouldFailOnResultWhenClosed(serverVersion, session, () =>
114+
session.close()
115+
))
108116
})
109117

110118
describe('transaction', () => {
@@ -197,11 +205,20 @@ describe('#integration-rx navigation', () => {
197205
it('should fail on records when run fails', () =>
198206
shouldFailOnRecordsWhenRunFails(serverVersion, txc))
199207

208+
it('should fail on subsequent records differently when run fails', () =>
209+
shouldFailOnSubsequentRecordsWhenRunFails(serverVersion, txc))
210+
200211
it('should fail on summary when run fails', () =>
201212
shouldFailOnSummaryWhenRunFails(serverVersion, txc))
202213

203214
it('should fail on subsequent summary when run fails', () =>
204-
shouldFailOnSubsequentKeysWhenRunFails(serverVersion, txc))
215+
shouldFailOnSubsequentSummaryWhenRunFails(serverVersion, txc))
216+
217+
it('should fail on result when committed', () =>
218+
shouldFailOnResultWhenClosed(serverVersion, txc, () => txc.commit()))
219+
220+
it('should fail on result when rolled back', () =>
221+
shouldFailOnResultWhenClosed(serverVersion, txc, () => txc.rollback()))
205222
})
206223

207224
/**
@@ -310,7 +327,10 @@ describe('#integration-rx navigation', () => {
310327
await collectAndAssertKeys(result)
311328
await collectAndAssertSummary(result)
312329

313-
await collectAndAssertEmpty(result.records())
330+
const expectedError = jasmine.objectContaining({
331+
message: jasmine.stringMatching(/Streaming has already started/)
332+
})
333+
await collectAndAssertError(result.records(), expectedError)
314334
}
315335

316336
/**
@@ -402,9 +422,13 @@ describe('#integration-rx navigation', () => {
402422
)
403423

404424
await collectAndAssertRecords(result)
405-
await collectAndAssertEmpty(result.records())
406-
await collectAndAssertEmpty(result.records())
407-
await collectAndAssertEmpty(result.records())
425+
426+
const expectedError = jasmine.objectContaining({
427+
message: jasmine.stringMatching(/Streaming has already started/)
428+
})
429+
await collectAndAssertError(result.records(), expectedError)
430+
await collectAndAssertError(result.records(), expectedError)
431+
await collectAndAssertError(result.records(), expectedError)
408432
}
409433

410434
/**
@@ -527,6 +551,32 @@ describe('#integration-rx navigation', () => {
527551
)
528552
}
529553

554+
/**
555+
* @param {ServerVersion} version
556+
* @param {RxSession|RxTransaction} runnable
557+
*/
558+
async function shouldFailOnSubsequentRecordsWhenRunFails (version, runnable) {
559+
if (version.compareTo(VERSION_4_0_0) < 0) {
560+
return
561+
}
562+
563+
const result = runnable.run('THIS IS NOT A CYPHER')
564+
565+
await collectAndAssertError(
566+
result.records(),
567+
jasmine.objectContaining({
568+
code: 'Neo.ClientError.Statement.SyntaxError',
569+
message: jasmine.stringMatching(/Invalid input/)
570+
})
571+
)
572+
573+
const expectedError = jasmine.objectContaining({
574+
message: jasmine.stringMatching(/Streaming has already started/)
575+
})
576+
await collectAndAssertError(result.records(), expectedError)
577+
await collectAndAssertError(result.records(), expectedError)
578+
}
579+
530580
/**
531581
* @param {ServerVersion} version
532582
* @param {RxSession|RxTransaction} runnable
@@ -539,7 +589,7 @@ describe('#integration-rx navigation', () => {
539589
const result = runnable.run('THIS IS NOT A CYPHER')
540590

541591
await collectAndAssertError(
542-
result.summary(),
592+
result.consume(),
543593
jasmine.objectContaining({
544594
code: 'Neo.ClientError.Statement.SyntaxError',
545595
message: jasmine.stringMatching(/Invalid input/)
@@ -562,9 +612,30 @@ describe('#integration-rx navigation', () => {
562612
message: jasmine.stringMatching(/Invalid input/)
563613
})
564614

565-
await collectAndAssertError(result.summary(), expectedError)
566-
await collectAndAssertError(result.summary(), expectedError)
567-
await collectAndAssertError(result.summary(), expectedError)
615+
await collectAndAssertError(result.consume(), expectedError)
616+
await collectAndAssertError(result.consume(), expectedError)
617+
await collectAndAssertError(result.consume(), expectedError)
618+
}
619+
620+
/**
621+
* @param {ServerVersion} version
622+
* @param {RxSession|RxTransaction} runnable
623+
* @param {function(): Observable} closeFunc
624+
*/
625+
async function shouldFailOnResultWhenClosed (version, runnable, closeFunc) {
626+
if (version.compareTo(VERSION_4_0_0) < 0) {
627+
return
628+
}
629+
630+
const result = runnable.run('RETURN 1')
631+
await collectAndAssertEmpty(closeFunc())
632+
633+
const expectedError = jasmine.objectContaining({
634+
message: jasmine.stringMatching(/Cannot run statement/)
635+
})
636+
await collectAndAssertError(result.keys(), expectedError)
637+
await collectAndAssertError(result.records(), expectedError)
638+
await collectAndAssertError(result.consume(), expectedError)
568639
}
569640

570641
async function collectAndAssertKeys (result) {
@@ -602,7 +673,7 @@ describe('#integration-rx navigation', () => {
602673

603674
async function collectAndAssertSummary (result, expectedStatementType = 'r') {
604675
const summary = await result
605-
.summary()
676+
.consume()
606677
.pipe(
607678
map(s => s.statementType),
608679
materialize(),

test/rx/summary.test.js

+10-10
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@ describe('#integration-rx summary', () => {
249249

250250
const summary = await runnable
251251
.run('UNWIND RANGE(1,10) AS n RETURN n')
252-
.summary()
252+
.consume()
253253
.toPromise()
254254

255255
expect(summary).toBeDefined()
@@ -553,7 +553,7 @@ describe('#integration-rx summary', () => {
553553

554554
const summary = await runnable
555555
.run('CREATE (n) RETURN n')
556-
.summary()
556+
.consume()
557557
.toPromise()
558558
expect(summary).toBeDefined()
559559
expect(summary.hasPlan()).toBeFalsy()
@@ -573,7 +573,7 @@ describe('#integration-rx summary', () => {
573573

574574
const summary = await runnable
575575
.run('EXPLAIN CREATE (n) RETURN n')
576-
.summary()
576+
.consume()
577577
.toPromise()
578578
expect(summary).toBeDefined()
579579
expect(summary.hasPlan()).toBeTruthy()
@@ -594,7 +594,7 @@ describe('#integration-rx summary', () => {
594594

595595
const summary = await runnable
596596
.run('PROFILE CREATE (n) RETURN n')
597-
.summary()
597+
.consume()
598598
.toPromise()
599599
expect(summary).toBeDefined()
600600
expect(summary.hasPlan()).toBeTruthy()
@@ -616,7 +616,7 @@ describe('#integration-rx summary', () => {
616616

617617
const summary = await runnable
618618
.run('CREATE (n) RETURN n')
619-
.summary()
619+
.consume()
620620
.toPromise()
621621
expect(summary).toBeDefined()
622622
expect(summary.notifications).toBeTruthy()
@@ -634,7 +634,7 @@ describe('#integration-rx summary', () => {
634634

635635
const summary = await runnable
636636
.run('EXPLAIN MATCH (n:ThisLabelDoesNotExist) RETURN n')
637-
.summary()
637+
.consume()
638638
.toPromise()
639639
expect(summary).toBeDefined()
640640
expect(summary.notifications).toBeTruthy()
@@ -664,7 +664,7 @@ describe('#integration-rx summary', () => {
664664
) {
665665
const summary = await runnable
666666
.run(statement, parameters)
667-
.summary()
667+
.consume()
668668
.toPromise()
669669
expect(summary).toBeDefined()
670670
expect(summary.statement).toBeDefined()
@@ -685,7 +685,7 @@ describe('#integration-rx summary', () => {
685685
) {
686686
const summary = await runnable
687687
.run(statement)
688-
.summary()
688+
.consume()
689689
.toPromise()
690690
expect(summary).toBeDefined()
691691
expect(summary.statementType).toBe(expectedStatementType)
@@ -701,7 +701,7 @@ describe('#integration-rx summary', () => {
701701
async function verifyUpdates (runnable, statement, parameters, stats) {
702702
const summary = await runnable
703703
.run(statement, parameters)
704-
.summary()
704+
.consume()
705705
.toPromise()
706706
expect(summary).toBeDefined()
707707
expect(summary.counters.containsUpdates()).toBeTruthy()
@@ -725,7 +725,7 @@ describe('#integration-rx summary', () => {
725725
) {
726726
const summary = await runnable
727727
.run(statement, parameters)
728-
.summary()
728+
.consume()
729729
.toPromise()
730730
expect(summary).toBeDefined()
731731

test/rx/transaction.test.js

+4-4
Original file line numberDiff line numberDiff line change
@@ -535,7 +535,7 @@ describe('#integration-rx transaction', () => {
535535
)
536536
])
537537

538-
const summary = await result.summary().toPromise()
538+
const summary = await result.consume().toPromise()
539539
expect(summary).toBeTruthy()
540540
})
541541

@@ -588,15 +588,15 @@ describe('#integration-rx transaction', () => {
588588

589589
await txc
590590
.run('CREATE (n:Node {id: 1})')
591-
.summary()
591+
.consume()
592592
.toPromise()
593593
await txc
594594
.run('CREATE (n:Node {id: 2})')
595-
.summary()
595+
.consume()
596596
.toPromise()
597597
await txc
598598
.run('CREATE (n:Node {id: 1})')
599-
.summary()
599+
.consume()
600600
.toPromise()
601601

602602
await verifyCanCommitOrRollback(txc, commit)

test/temporal-types.test.js

-2
Original file line numberDiff line numberDiff line change
@@ -1460,8 +1460,6 @@ describe('#integration temporal-types', () => {
14601460

14611461
const receivedValue = records[0].get(0)
14621462
expect(receivedValue).toEqual(value)
1463-
1464-
await session.close()
14651463
}
14661464

14671465
async function testDurationToString (values) {

0 commit comments

Comments
 (0)