Skip to content

Commit 97db791

Browse files
author
Zhen Li
committed
Commit and rollback shall cancel unfinished streaming
However session.close will send reset which cancels query execution.
1 parent f4f6d6b commit 97db791

14 files changed

+192
-49
lines changed

src/internal/stream-observers.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -282,9 +282,9 @@ class ResultStreamObserver extends StreamObserver {
282282
}
283283

284284
/**
285-
* Discard pending record stream
285+
* Cancel pending record stream
286286
*/
287-
discard () {
287+
cancel () {
288288
this._discard = true
289289
}
290290

src/result-rx.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ export default class RxResult {
134134
})
135135

136136
if (this._records.observers.length === 0) {
137-
result._discard()
137+
result._cancel()
138138
}
139139

140140
result.subscribe({

src/result.js

+3-2
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ class Result {
8585
summary () {
8686
return new Promise((resolve, reject) => {
8787
this._streamObserverPromise.then(o => {
88+
o.cancel()
8889
o.subscribe({
8990
onCompleted: metadata => resolve(metadata),
9091
onError: err => reject(err)
@@ -192,8 +193,8 @@ class Result {
192193
* @protected
193194
* @since 4.0.0
194195
*/
195-
_discard () {
196-
this._streamObserverPromise.then(o => o.discard())
196+
_cancel () {
197+
this._streamObserverPromise.then(o => o.cancel())
197198
}
198199
}
199200

src/transaction.js

+50-23
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ class Transaction {
5454
this._onError = this._onErrorCallback.bind(this)
5555
this._onComplete = this._onCompleteCallback.bind(this)
5656
this._fetchSize = fetchSize
57+
this._results = []
5758
}
5859

5960
_begin (bookmark, txConfig) {
@@ -86,27 +87,30 @@ class Transaction {
8687
parameters
8788
)
8889

89-
return this._state.run(query, params, {
90+
var result = this._state.run(query, params, {
9091
connectionHolder: this._connectionHolder,
9192
onError: this._onError,
9293
onComplete: this._onComplete,
9394
reactive: this._reactive,
9495
fetchSize: this._fetchSize
9596
})
97+
this._results.push(result)
98+
return result
9699
}
97100

98101
/**
99102
* Commits the transaction and returns the result.
100103
*
101104
* After committing the transaction can no longer be used.
102105
*
103-
* @returns {Result} New Result
106+
* @returns {Promise<void>} An empty promise if committed successfully or error if any error happened during commit.
104107
*/
105108
commit () {
106109
const committed = this._state.commit({
107110
connectionHolder: this._connectionHolder,
108111
onError: this._onError,
109-
onComplete: this._onComplete
112+
onComplete: this._onComplete,
113+
pendingResults: this._results
110114
})
111115
this._state = committed.state
112116
// clean up
@@ -124,13 +128,15 @@ class Transaction {
124128
*
125129
* After rolling back, the transaction can no longer be used.
126130
*
127-
* @returns {Result} New Result
131+
* @returns {Promise<void>} An empty promise if rolled back successfully or error if any error happened during
132+
* rollback.
128133
*/
129134
rollback () {
130135
const rolledback = this._state.rollback({
131136
connectionHolder: this._connectionHolder,
132137
onError: this._onError,
133-
onComplete: this._onComplete
138+
onComplete: this._onComplete,
139+
pendingResults: this._results
134140
})
135141
this._state = rolledback.state
136142
// clean up
@@ -170,15 +176,27 @@ class Transaction {
170176
const _states = {
171177
// The transaction is running with no explicit success or failure marked
172178
ACTIVE: {
173-
commit: ({ connectionHolder, onError, onComplete }) => {
179+
commit: ({ connectionHolder, onError, onComplete, pendingResults }) => {
174180
return {
175-
result: finishTransaction(true, connectionHolder, onError, onComplete),
181+
result: finishTransaction(
182+
true,
183+
connectionHolder,
184+
onError,
185+
onComplete,
186+
pendingResults
187+
),
176188
state: _states.SUCCEEDED
177189
}
178190
},
179-
rollback: ({ connectionHolder, onError, onComplete }) => {
191+
rollback: ({ connectionHolder, onError, onComplete, pendingResults }) => {
180192
return {
181-
result: finishTransaction(false, connectionHolder, onError, onComplete),
193+
result: finishTransaction(
194+
false,
195+
connectionHolder,
196+
onError,
197+
onComplete,
198+
pendingResults
199+
),
182200
state: _states.ROLLED_BACK
183201
}
184202
},
@@ -188,14 +206,13 @@ const _states = {
188206
{ connectionHolder, onError, onComplete, reactive, fetchSize }
189207
) => {
190208
// RUN in explicit transaction can't contain bookmarks and transaction configuration
209+
// No need to include mode and database name as it shall be inclued in begin
191210
const observerPromise = connectionHolder
192211
.getConnection()
193212
.then(conn =>
194213
conn.protocol().run(statement, parameters, {
195214
bookmark: Bookmark.empty(),
196215
txConfig: TxConfig.empty(),
197-
mode: connectionHolder.mode(),
198-
database: connectionHolder.database(),
199216
beforeError: onError,
200217
afterComplete: onComplete,
201218
reactive: reactive,
@@ -356,22 +373,32 @@ const _states = {
356373
* @param {ConnectionHolder} connectionHolder
357374
* @param {function(err:Error): any} onError
358375
* @param {function(metadata:object): any} onComplete
376+
* @param {list<Result>>}pendingResults all run results in this transaction
359377
*/
360-
function finishTransaction (commit, connectionHolder, onError, onComplete) {
378+
function finishTransaction (
379+
commit,
380+
connectionHolder,
381+
onError,
382+
onComplete,
383+
pendingResults
384+
) {
361385
const observerPromise = connectionHolder
362386
.getConnection()
363387
.then(connection => {
364-
if (commit) {
365-
return connection.protocol().commitTransaction({
366-
beforeError: onError,
367-
afterComplete: onComplete
368-
})
369-
} else {
370-
return connection.protocol().rollbackTransaction({
371-
beforeError: onError,
372-
afterComplete: onComplete
373-
})
374-
}
388+
pendingResults.forEach(r => r.summary())
389+
return Promise.all(pendingResults).then(results => {
390+
if (commit) {
391+
return connection.protocol().commitTransaction({
392+
beforeError: onError,
393+
afterComplete: onComplete
394+
})
395+
} else {
396+
return connection.protocol().rollbackTransaction({
397+
beforeError: onError,
398+
afterComplete: onComplete
399+
})
400+
}
401+
})
375402
})
376403
.catch(error => new FailedObserver({ error, onError }))
377404

test/internal/node/direct.driver.boltkit.test.js

+93
Original file line numberDiff line numberDiff line change
@@ -585,6 +585,99 @@ describe('#stub-direct direct driver with stub server', () => {
585585
it('v2', () => verifyFailureOnCommit('v2'))
586586
})
587587

588+
describe('should cancel stream with result summary method', () => {
589+
async function verifyFailureOnCommit (version) {
590+
if (!boltStub.supported) {
591+
return
592+
}
593+
594+
const server = await boltStub.start(
595+
`./test/resources/boltstub/${version}/read_discard.script`,
596+
9001
597+
)
598+
599+
const driver = boltStub.newDriver('bolt://127.0.0.1:9001')
600+
const session = driver.session({ defaultAccessMode: READ, fetchSize: 2 })
601+
602+
const result = session.run('MATCH (n) RETURN n.name')
603+
await result.summary()
604+
const records = (await result).records
605+
expect(records.length).toEqual(2)
606+
expect(records[0].get(0)).toBe('Bob')
607+
expect(records[1].get(0)).toBe('Alice')
608+
609+
const connectionKey = Object.keys(openConnections(driver))[0]
610+
expect(connectionKey).toBeTruthy()
611+
612+
const connection = openConnections(driver, connectionKey)
613+
await session.close()
614+
615+
// generate a fake fatal error
616+
connection._handleFatalError(
617+
newError('connection reset', SERVICE_UNAVAILABLE)
618+
)
619+
620+
// expect that the connection to be removed from the pool
621+
expect(connectionPool(driver, '127.0.0.1:9001').length).toEqual(0)
622+
expect(activeResources(driver, '127.0.0.1:9001')).toBeFalsy()
623+
// expect that the connection to be unregistered from the open connections registry
624+
expect(openConnections(driver, connectionKey)).toBeFalsy()
625+
626+
await driver.close()
627+
await server.exit()
628+
}
629+
630+
it('v4', () => verifyFailureOnCommit('v4'))
631+
})
632+
633+
describe('should cancel stream with tx commit', () => {
634+
async function verifyFailureOnCommit (version) {
635+
if (!boltStub.supported) {
636+
return
637+
}
638+
639+
const server = await boltStub.start(
640+
`./test/resources/boltstub/${version}/read_tx_discard.script`,
641+
9001
642+
)
643+
644+
const driver = boltStub.newDriver('bolt://127.0.0.1:9001')
645+
const session = driver.session({ defaultAccessMode: READ, fetchSize: 2 })
646+
const tx = session.beginTransaction()
647+
648+
const result = tx.run('MATCH (n) RETURN n.name')
649+
await tx.commit()
650+
651+
// Client will receive a partial result
652+
const records = (await result).records
653+
expect(records.length).toEqual(2)
654+
expect(records[0].get(0)).toBe('Bob')
655+
expect(records[1].get(0)).toBe('Alice')
656+
657+
const connectionKey = Object.keys(openConnections(driver))[0]
658+
expect(connectionKey).toBeTruthy()
659+
660+
const connection = openConnections(driver, connectionKey)
661+
await session.close()
662+
663+
// generate a fake fatal error
664+
connection._handleFatalError(
665+
newError('connection reset', SERVICE_UNAVAILABLE)
666+
)
667+
668+
// expect that the connection to be removed from the pool
669+
expect(connectionPool(driver, '127.0.0.1:9001').length).toEqual(0)
670+
expect(activeResources(driver, '127.0.0.1:9001')).toBeFalsy()
671+
// expect that the connection to be unregistered from the open connections registry
672+
expect(openConnections(driver, connectionKey)).toBeFalsy()
673+
674+
await driver.close()
675+
await server.exit()
676+
}
677+
678+
fit('v4', () => verifyFailureOnCommit('v4'))
679+
})
680+
588681
function connectionPool (driver, key) {
589682
return driver._connectionProvider._connectionPool._pools[key]
590683
}

test/resources/boltstub/v3/acquire_endpoints_self_as_reader.script

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ S: SUCCESS {"fields": ["ttl", "servers"]}
99
RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9008"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9001","127.0.0.1:9009","127.0.0.1:9010"], "role": "READ"},{"addresses": ["127.0.0.1:9001","127.0.0.1:9011"], "role": "ROUTE"}]]
1010
SUCCESS {}
1111
C: BEGIN {"mode": "r"}
12-
RUN "MATCH (n) RETURN n.name AS name" {} {"mode": "r"}
12+
RUN "MATCH (n) RETURN n.name AS name" {} {}
1313
PULL_ALL
1414
S: SUCCESS {}
1515
SUCCESS {"fields": ["name"]}

test/resources/boltstub/v3/read_tx.script

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
C: BEGIN {"mode": "r"}
77
S: SUCCESS {}
8-
C: RUN "MATCH (n) RETURN n.name" {} {"mode": "r"}
8+
C: RUN "MATCH (n) RETURN n.name" {} {}
99
PULL_ALL
1010
S: SUCCESS {"fields": ["n.name"]}
1111
RECORD ["Bob"]

test/resources/boltstub/v3/read_tx_dead.script

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
!: AUTO RESET
44

55
C: BEGIN {"mode": "r"}
6-
RUN "MATCH (n) RETURN n.name" {} {"mode": "r"}
6+
RUN "MATCH (n) RETURN n.name" {} {}
77
PULL_ALL
88
S: SUCCESS {"fields": []}
99
<EXIT>

test/resources/boltstub/v3/read_tx_with_bookmarks.script

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
!: AUTO GOODBYE
55

66
C: BEGIN {"bookmarks": ["neo4j:bookmark:v1:tx42"], "mode": "r"}
7-
C: RUN "MATCH (n) RETURN n.name AS name" {} {"mode": "r"}
7+
C: RUN "MATCH (n) RETURN n.name AS name" {} {}
88
PULL_ALL
99
S: SUCCESS {}
1010
SUCCESS {"fields": ["name"]}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
!: BOLT 4
2+
!: AUTO HELLO
3+
!: AUTO RESET
4+
!: AUTO GOODBYE
5+
6+
C: RUN "MATCH (n) RETURN n.name" {} {"mode": "r"}
7+
PULL {"n": 2}
8+
S: SUCCESS {"fields": ["n.name"]}
9+
RECORD ["Bob"]
10+
RECORD ["Alice"]
11+
SUCCESS {"has_more":true}
12+
C: DISCARD {"n": -1}
13+
S: SUCCESS {}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
!: BOLT 4
2+
!: AUTO HELLO
3+
!: AUTO RESET
4+
!: AUTO GOODBYE
5+
6+
C: BEGIN {"mode": "r"}
7+
RUN "MATCH (n) RETURN n.name" {} {}
8+
PULL {"n": 2}
9+
S: SUCCESS {}
10+
SUCCESS {"fields": ["n.name"]}
11+
RECORD ["Bob"]
12+
RECORD ["Alice"]
13+
SUCCESS {"has_more":true}
14+
C: DISCARD {"n": -1}
15+
S: SUCCESS {}
16+
C: COMMIT
17+
S: SUCCESS {"bookmark": "neo4j:bookmark:v1:tx4242"}

test/resources/boltstub/v4/read_tx_with_bookmarks.script

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
!: AUTO GOODBYE
55

66
C: BEGIN {"bookmarks": ["neo4j:bookmark:v1:tx42"], "mode": "r"}
7-
C: RUN "MATCH (n) RETURN n.name AS name" {} {"mode": "r"}
7+
C: RUN "MATCH (n) RETURN n.name AS name" {} {}
88
PULL { "n": 1000 }
99
S: SUCCESS {}
1010
SUCCESS {"fields": ["name"]}

test/types/transaction.test.ts

+6-14
Original file line numberDiff line numberDiff line change
@@ -83,18 +83,10 @@ result4.subscribe({
8383
onCompleted: (summary: ResultSummary) => console.log(summary)
8484
})
8585

86-
tx.commit()
87-
.then((res: StatementResult) => {
88-
console.log(res)
89-
})
90-
.catch((error: Error) => {
91-
console.log(error)
92-
})
86+
tx.commit().then(() => {
87+
console.log('transaction committed')
88+
})
9389

94-
tx.rollback()
95-
.then((res: StatementResult) => {
96-
console.log(res)
97-
})
98-
.catch((error: Error) => {
99-
console.log(error)
100-
})
90+
tx.rollback().then(() => {
91+
console.log('transaction rolled back')
92+
})

0 commit comments

Comments
 (0)