Skip to content

Commit 988410e

Browse files
authored
Merge pull request #512 from zhenlineo/1.7-conn-error
Stop accepting more messages after a connection is reset
2 parents 1a9bf99 + 78dbafc commit 988410e

File tree

5 files changed

+81
-34
lines changed

5 files changed

+81
-34
lines changed

src/v1/session.js

+11-3
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,8 @@ class Session {
9696
? new TxConfig(transactionConfig)
9797
: TxConfig.empty()
9898

99-
return this._run(query, params, (connection, streamObserver) =>
99+
return this._run(query, params, (connection, streamObserver) => {
100+
this._assertSessionIsOpen()
100101
connection
101102
.protocol()
102103
.run(
@@ -107,7 +108,7 @@ class Session {
107108
this._mode,
108109
streamObserver
109110
)
110-
)
111+
})
111112
}
112113

113114
_run (statement, parameters, statementRunner) {
@@ -185,12 +186,19 @@ class Session {
185186
const tx = new Transaction(
186187
connectionHolder,
187188
this._transactionClosed.bind(this),
188-
this._updateBookmark.bind(this)
189+
this._updateBookmark.bind(this),
190+
this._assertSessionIsOpen.bind(this)
189191
)
190192
tx._begin(this._lastBookmark, txConfig)
191193
return tx
192194
}
193195

196+
_assertSessionIsOpen () {
197+
if (!this._open) {
198+
throw newError('You cannot run more transactions on a closed session.')
199+
}
200+
}
201+
194202
_transactionClosed () {
195203
this._hasTx = false
196204
}

src/v1/transaction.js

+45-26
Original file line numberDiff line numberDiff line change
@@ -34,20 +34,24 @@ class Transaction {
3434
* @param {ConnectionHolder} connectionHolder - the connection holder to get connection from.
3535
* @param {function()} onClose - Function to be called when transaction is committed or rolled back.
3636
* @param {function(bookmark: Bookmark)} onBookmark callback invoked when new bookmark is produced.
37+
* @param onConnection callback invoked when a connection is obtained from connection holder. This need to be
38+
* called to see if the connection is in the process of being released.
3739
*/
38-
constructor (connectionHolder, onClose, onBookmark) {
40+
constructor (connectionHolder, onClose, onBookmark, onConnection) {
3941
this._connectionHolder = connectionHolder
4042
this._state = _states.ACTIVE
4143
this._onClose = onClose
4244
this._onBookmark = onBookmark
45+
this._onConnection = onConnection
4346
}
4447

4548
_begin (bookmark, txConfig) {
4649
const streamObserver = new _TransactionStreamObserver(this)
4750

4851
this._connectionHolder
4952
.getConnection(streamObserver)
50-
.then(conn =>
53+
.then(conn => {
54+
this._onConnection()
5155
conn
5256
.protocol()
5357
.beginTransaction(
@@ -56,7 +60,7 @@ class Transaction {
5660
this._connectionHolder.mode(),
5761
streamObserver
5862
)
59-
)
63+
})
6064
.catch(error => streamObserver.onError(error))
6165
}
6266

@@ -78,7 +82,8 @@ class Transaction {
7882
this._connectionHolder,
7983
new _TransactionStreamObserver(this),
8084
query,
81-
params
85+
params,
86+
this._onConnection
8287
)
8388
}
8489

@@ -90,9 +95,10 @@ class Transaction {
9095
* @returns {Result} New Result
9196
*/
9297
commit () {
93-
let committed = this._state.commit(
98+
const committed = this._state.commit(
9499
this._connectionHolder,
95-
new _TransactionStreamObserver(this)
100+
new _TransactionStreamObserver(this),
101+
this._onConnection
96102
)
97103
this._state = committed.state
98104
// clean up
@@ -108,9 +114,10 @@ class Transaction {
108114
* @returns {Result} New Result
109115
*/
110116
rollback () {
111-
let committed = this._state.rollback(
117+
const committed = this._state.rollback(
112118
this._connectionHolder,
113-
new _TransactionStreamObserver(this)
119+
new _TransactionStreamObserver(this),
120+
this._onConnection
114121
)
115122
this._state = committed.state
116123
// clean up
@@ -161,29 +168,40 @@ class _TransactionStreamObserver extends StreamObserver {
161168
}
162169

163170
/** internal state machine of the transaction */
164-
let _states = {
171+
const _states = {
165172
// The transaction is running with no explicit success or failure marked
166173
ACTIVE: {
167-
commit: (connectionHolder, observer) => {
174+
commit: (connectionHolder, observer, onConnection) => {
168175
return {
169-
result: finishTransaction(true, connectionHolder, observer),
176+
result: finishTransaction(
177+
true,
178+
connectionHolder,
179+
observer,
180+
onConnection
181+
),
170182
state: _states.SUCCEEDED
171183
}
172184
},
173-
rollback: (connectionHolder, observer) => {
185+
rollback: (connectionHolder, observer, onConnection) => {
174186
return {
175-
result: finishTransaction(false, connectionHolder, observer),
187+
result: finishTransaction(
188+
false,
189+
connectionHolder,
190+
observer,
191+
onConnection
192+
),
176193
state: _states.ROLLED_BACK
177194
}
178195
},
179-
run: (connectionHolder, observer, statement, parameters) => {
196+
run: (connectionHolder, observer, statement, parameters, onConnection) => {
180197
// RUN in explicit transaction can't contain bookmarks and transaction configuration
181198
const bookmark = Bookmark.empty()
182199
const txConfig = TxConfig.empty()
183200

184201
connectionHolder
185202
.getConnection(observer)
186-
.then(conn =>
203+
.then(conn => {
204+
onConnection()
187205
conn
188206
.protocol()
189207
.run(
@@ -194,7 +212,7 @@ let _states = {
194212
connectionHolder.mode(),
195213
observer
196214
)
197-
)
215+
})
198216
.catch(error => observer.onError(error))
199217

200218
return _newRunResult(observer, statement, parameters, () =>
@@ -206,7 +224,7 @@ let _states = {
206224
// An error has occurred, transaction can no longer be used and no more messages will
207225
// be sent for this transaction.
208226
FAILED: {
209-
commit: (connectionHolder, observer) => {
227+
commit: (connectionHolder, observer, onConnection) => {
210228
observer.onError({
211229
error:
212230
'Cannot commit statements in this transaction, because previous statements in the ' +
@@ -218,14 +236,14 @@ let _states = {
218236
state: _states.FAILED
219237
}
220238
},
221-
rollback: (connectionHolder, observer) => {
239+
rollback: (connectionHolder, observer, onConnection) => {
222240
observer.markCompleted()
223241
return {
224242
result: _newDummyResult(observer, 'ROLLBACK', {}),
225243
state: _states.FAILED
226244
}
227245
},
228-
run: (connectionHolder, observer, statement, parameters) => {
246+
run: (connectionHolder, observer, statement, parameters, onConnection) => {
229247
observer.onError({
230248
error:
231249
'Cannot run statement, because previous statements in the ' +
@@ -237,7 +255,7 @@ let _states = {
237255

238256
// This transaction has successfully committed
239257
SUCCEEDED: {
240-
commit: (connectionHolder, observer) => {
258+
commit: (connectionHolder, observer, onConnection) => {
241259
observer.onError({
242260
error:
243261
'Cannot commit statements in this transaction, because commit has already been successfully called on the transaction and transaction has been closed. Please start a new' +
@@ -248,7 +266,7 @@ let _states = {
248266
state: _states.SUCCEEDED
249267
}
250268
},
251-
rollback: (connectionHolder, observer) => {
269+
rollback: (connectionHolder, observer, onConnection) => {
252270
observer.onError({
253271
error:
254272
'Cannot rollback transaction, because transaction has already been successfully closed.'
@@ -258,7 +276,7 @@ let _states = {
258276
state: _states.SUCCEEDED
259277
}
260278
},
261-
run: (connectionHolder, observer, statement, parameters) => {
279+
run: (connectionHolder, observer, statement, parameters, onConnection) => {
262280
observer.onError({
263281
error:
264282
'Cannot run statement, because transaction has already been successfully closed.'
@@ -269,7 +287,7 @@ let _states = {
269287

270288
// This transaction has been rolled back
271289
ROLLED_BACK: {
272-
commit: (connectionHolder, observer) => {
290+
commit: (connectionHolder, observer, onConnection) => {
273291
observer.onError({
274292
error:
275293
'Cannot commit this transaction, because it has already been rolled back.'
@@ -279,7 +297,7 @@ let _states = {
279297
state: _states.ROLLED_BACK
280298
}
281299
},
282-
rollback: (connectionHolder, observer) => {
300+
rollback: (connectionHolder, observer, onConnection) => {
283301
observer.onError({
284302
error:
285303
'Cannot rollback transaction, because transaction has already been rolled back.'
@@ -289,7 +307,7 @@ let _states = {
289307
state: _states.ROLLED_BACK
290308
}
291309
},
292-
run: (connectionHolder, observer, statement, parameters) => {
310+
run: (connectionHolder, observer, statement, parameters, onConnection) => {
293311
observer.onError({
294312
error:
295313
'Cannot run statement, because transaction has already been rolled back.'
@@ -299,10 +317,11 @@ let _states = {
299317
}
300318
}
301319

302-
function finishTransaction (commit, connectionHolder, observer) {
320+
function finishTransaction (commit, connectionHolder, observer, onConnection) {
303321
connectionHolder
304322
.getConnection(observer)
305323
.then(connection => {
324+
onConnection()
306325
if (commit) {
307326
return connection.protocol().commitTransaction(observer)
308327
} else {

test/internal/node/routing.driver.boltkit.test.js

+4-3
Original file line numberDiff line numberDiff line change
@@ -1821,6 +1821,7 @@ describe('routing driver with stub server', () => {
18211821
)
18221822

18231823
boltStub.run(() => {
1824+
const session = driver.session(READ)
18241825
session
18251826
.readTransaction(tx =>
18261827
tx.run('MATCH (n) RETURN n.name AS name')
@@ -2328,7 +2329,7 @@ describe('routing driver with stub server', () => {
23282329
const session = driver.session(WRITE, bookmarks)
23292330
const tx = session.beginTransaction()
23302331

2331-
tx.run(`CREATE (n {name:'Bob'})`).then(() => {
2332+
tx.run('CREATE (n {name:\'Bob\'})').then(() => {
23322333
tx.commit().then(() => {
23332334
expect(session.lastBookmark()).toEqual('neo4j:bookmark:v1:tx95')
23342335

@@ -2348,11 +2349,11 @@ describe('routing driver with stub server', () => {
23482349
})
23492350

23502351
it('should forget writer on database unavailable error', done => {
2351-
testAddressPurgeOnDatabaseError(`CREATE (n {name:'Bob'})`, WRITE, done)
2352+
testAddressPurgeOnDatabaseError('CREATE (n {name:\'Bob\'})', WRITE, done)
23522353
})
23532354

23542355
it('should forget reader on database unavailable error', done => {
2355-
testAddressPurgeOnDatabaseError(`RETURN 1`, READ, done)
2356+
testAddressPurgeOnDatabaseError('RETURN 1', READ, done)
23562357
})
23572358

23582359
it('should use resolver function that returns array during first discovery', done => {

test/v1/temporal-types.test.js

-2
Original file line numberDiff line numberDiff line change
@@ -1480,7 +1480,6 @@ describe('temporal-types', () => {
14801480
const value = records[0].get(0)
14811481
expect(value).toEqual(expectedValue)
14821482

1483-
session.close()
14841483
done()
14851484
})
14861485
.catch(error => {
@@ -1498,7 +1497,6 @@ describe('temporal-types', () => {
14981497
const receivedValue = records[0].get(0)
14991498
expect(receivedValue).toEqual(value)
15001499

1501-
session.close()
15021500
done()
15031501
})
15041502
.catch(error => {

test/v1/transaction.test.js

+21
Original file line numberDiff line numberDiff line change
@@ -572,6 +572,27 @@ describe('transaction', () => {
572572
})
573573
})
574574

575+
it('should reset transaction', done => {
576+
const RetryTimeoutLimit = 10000
577+
const TransactionTimeout = 30000
578+
579+
const driver = neo4j.driver('bolt://localhost', sharedNeo4j.authToken, {
580+
maxTransactionRetryTime: RetryTimeoutLimit,
581+
connectionTimeout: TransactionTimeout
582+
})
583+
const session = driver.session()
584+
const runPromise = session
585+
.readTransaction(transaction => transaction.run('RETURN 1'))
586+
.catch(error => {
587+
expect(error.message).toBe(
588+
'You cannot run more transactions on a closed session.'
589+
)
590+
driver.close()
591+
done()
592+
})
593+
session.close() // This will interrupt runPromise to reset the transaction
594+
})
595+
575596
function expectSyntaxError (error) {
576597
expect(error.code).toBe('Neo.ClientError.Statement.SyntaxError')
577598
}

0 commit comments

Comments
 (0)