Skip to content

Commit 726ab3f

Browse files
committed
Introduce Transaction.close method (#847)
This method closes the transaction and rolled it back if it stills open.
1 parent 4237adf commit 726ab3f

File tree

8 files changed

+408
-0
lines changed

8 files changed

+408
-0
lines changed

packages/core/src/transaction.ts

+13
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,19 @@ class Transaction {
210210
return this._state === _states.ACTIVE
211211
}
212212

213+
/**
214+
* Closes the transaction
215+
*
216+
* This method will roll back the transaction if it is not already committed or rolled back.
217+
*
218+
* @returns {Promise<void>} An empty promise if closed successfully or error if any error happened during
219+
*/
220+
async close(): Promise<void> {
221+
if (this.isOpen()) {
222+
await this.rollback()
223+
}
224+
}
225+
213226
_onErrorCallback(err: any): Promise<Connection | void> {
214227
// error will be "acknowledged" by sending a RESET message
215228
// database will then forget about this transaction and cleanup all corresponding resources
+115
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/**
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
import { ConnectionProvider, Transaction } from "../src";
21+
import { Bookmark } from "../src/internal/bookmark";
22+
import { ConnectionHolder } from "../src/internal/connection-holder";
23+
import FakeConnection from "./utils/connection.fake";
24+
25+
describe('Transaction', () => {
26+
27+
describe('.close()', () => {
28+
describe('when transaction is open', () => {
29+
it('should roll back the transaction', async () => {
30+
const connection = newFakeConnection()
31+
const tx = newTransaction({ connection })
32+
33+
await tx.run('RETURN 1')
34+
await tx.close()
35+
36+
expect(connection.rollbackInvoked).toEqual(1)
37+
})
38+
39+
it('should surface errors during the rollback', async () => {
40+
const expectedError = new Error('rollback error')
41+
const connection = newFakeConnection().withRollbackError(expectedError)
42+
const tx = newTransaction({ connection })
43+
44+
await tx.run('RETURN 1')
45+
46+
try {
47+
await tx.close()
48+
fail('should have thrown')
49+
} catch (error) {
50+
expect(error).toEqual(expectedError)
51+
}
52+
})
53+
})
54+
55+
describe('when transaction is closed', () => {
56+
const commit = async (tx: Transaction) => tx.commit()
57+
const rollback = async (tx: Transaction) => tx.rollback()
58+
const error = async (tx: Transaction, conn: FakeConnection) => {
59+
conn.withRollbackError(new Error('rollback error'))
60+
return tx.rollback().catch(() => { })
61+
}
62+
63+
it.each([
64+
['commmited', commit],
65+
['rolled back', rollback],
66+
['with error', error]
67+
])('should not roll back the connection', async (_, operation) => {
68+
const connection = newFakeConnection()
69+
const tx = newTransaction({ connection })
70+
71+
await operation(tx, connection)
72+
const rollbackInvokedAfterOperation = connection.rollbackInvoked
73+
74+
await tx.close()
75+
76+
expect(connection.rollbackInvoked).toEqual(rollbackInvokedAfterOperation)
77+
})
78+
})
79+
})
80+
})
81+
82+
function newTransaction({
83+
connection,
84+
fetchSize = 1000,
85+
highRecordWatermark = 700,
86+
lowRecordWatermark = 300
87+
}: {
88+
connection: FakeConnection
89+
fetchSize?: number
90+
highRecordWatermark?: number,
91+
lowRecordWatermark?: number
92+
}): Transaction {
93+
const connectionProvider = new ConnectionProvider()
94+
connectionProvider.acquireConnection = () => Promise.resolve(connection)
95+
connectionProvider.close = () => Promise.resolve()
96+
97+
const connectionHolder = new ConnectionHolder({ connectionProvider })
98+
connectionHolder.initializeConnection()
99+
100+
const transaction = new Transaction({
101+
connectionHolder,
102+
onClose: () => { },
103+
onBookmark: (_: Bookmark) => { },
104+
onConnection: () => { },
105+
reactive: false,
106+
fetchSize,
107+
impersonatedUser: ""
108+
})
109+
110+
return transaction
111+
}
112+
113+
function newFakeConnection(): FakeConnection {
114+
return new FakeConnection()
115+
}
+183
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
/**
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
import { Connection, ResultObserver, Record, ResultSummary } from '../../src'
21+
import { ResultStreamObserver } from '../../src/internal/observers'
22+
23+
24+
/**
25+
* This class is like a mock of {@link Connection} that tracks invocations count.
26+
* It tries to maintain same "interface" as {@link Connection}.
27+
* It could be replaced with a proper mock by a library like testdouble.
28+
* At the time of writing such libraries require {@link Proxy} support but browser tests execute in
29+
* PhantomJS which does not support proxies.
30+
*/
31+
export default class FakeConnection extends Connection {
32+
private _open: boolean
33+
private _requestRoutingInformationMock: ((params: any) => void) | null
34+
public creationTimestamp: number
35+
public resetInvoked: number
36+
public releaseInvoked: number
37+
public seenQueries: string[]
38+
public seenParameters: any[]
39+
public seenProtocolOptions: any[]
40+
private _server: any
41+
public protocolVersion: number | undefined
42+
public protocolErrorsHandled: number
43+
public seenProtocolErrors: string[]
44+
public seenRequestRoutingInformation: any[]
45+
public rollbackInvoked: number
46+
public _rollbackError: Error | null
47+
48+
constructor() {
49+
super()
50+
51+
this._open = true
52+
this._requestRoutingInformationMock = null
53+
this.creationTimestamp = Date.now()
54+
55+
this.resetInvoked = 0
56+
this.releaseInvoked = 0
57+
this.seenQueries = []
58+
this.seenParameters = []
59+
this.seenProtocolOptions = []
60+
this._server = {}
61+
this.protocolVersion = undefined
62+
this.protocolErrorsHandled = 0
63+
this.seenProtocolErrors = []
64+
this.seenRequestRoutingInformation = []
65+
this.rollbackInvoked = 0
66+
this._rollbackError = null
67+
}
68+
69+
protocol() {
70+
// return fake protocol object that simply records seen queries and parameters
71+
return {
72+
run: (query: string, parameters: any | undefined, protocolOptions: any | undefined): ResultStreamObserver => {
73+
this.seenQueries.push(query)
74+
this.seenParameters.push(parameters)
75+
this.seenProtocolOptions.push(protocolOptions)
76+
return mockResultStreamObserver(query, parameters)
77+
},
78+
commitTransaction: () => {
79+
return mockResultStreamObserver('COMMIT', {})
80+
},
81+
beginTransaction: () => {
82+
return Promise.resolve()
83+
},
84+
rollbackTransaction: () => {
85+
this.rollbackInvoked ++
86+
if (this._rollbackError !== null) {
87+
return mockResultStreamObserverWithError('ROLLBACK', {}, this._rollbackError)
88+
}
89+
return mockResultStreamObserver('ROLLBACK', {})
90+
},
91+
requestRoutingInformation: (params: any | undefined) => {
92+
this.seenRequestRoutingInformation.push(params)
93+
if (this._requestRoutingInformationMock) {
94+
this._requestRoutingInformationMock(params)
95+
}
96+
},
97+
version: this.protocolVersion
98+
}
99+
}
100+
101+
resetAndFlush() {
102+
this.resetInvoked++
103+
return Promise.resolve()
104+
}
105+
106+
_release() {
107+
this.releaseInvoked++
108+
return Promise.resolve()
109+
}
110+
111+
isOpen() {
112+
return this._open
113+
}
114+
115+
isNeverReleased() {
116+
return this.isReleasedTimes(0)
117+
}
118+
119+
isReleasedOnce() {
120+
return this.isReleasedTimes(1)
121+
}
122+
123+
isReleasedTimes(times: number) {
124+
return this.resetInvoked === times && this.releaseInvoked === times
125+
}
126+
127+
_handleProtocolError(message: string) {
128+
this.protocolErrorsHandled++
129+
this.seenProtocolErrors.push(message)
130+
}
131+
132+
withProtocolVersion(version: number) {
133+
this.protocolVersion = version
134+
return this
135+
}
136+
137+
withCreationTimestamp(value: number) {
138+
this.creationTimestamp = value
139+
return this
140+
}
141+
142+
withRequestRoutingInformationMock(requestRoutingInformationMock: (params: any) => void) {
143+
this._requestRoutingInformationMock = requestRoutingInformationMock
144+
return this
145+
}
146+
147+
withRollbackError(error: Error) {
148+
this._rollbackError = error
149+
return this
150+
}
151+
152+
closed() {
153+
this._open = false
154+
return this
155+
}
156+
}
157+
158+
function mockResultStreamObserverWithError (query: string, parameters: any | undefined, error: Error) {
159+
const observer = mockResultStreamObserver(query, parameters)
160+
observer.subscribe = (observer: ResultObserver) => {
161+
if (observer && observer.onError) {
162+
observer.onError(error)
163+
}
164+
}
165+
return observer
166+
}
167+
168+
function mockResultStreamObserver(query: string, parameters: any | undefined): ResultStreamObserver {
169+
return {
170+
onError: (error: any) => { },
171+
onCompleted: () => { },
172+
onNext: (result: any) => { },
173+
cancel: () => { },
174+
prepareToHandleSingleResponse: () => { },
175+
markCompleted: () => { },
176+
subscribe: (observer: ResultObserver) => {
177+
if (observer && observer.onCompleted) {
178+
observer.onCompleted(new ResultSummary(query, parameters, {}))
179+
}
180+
181+
}
182+
}
183+
}

packages/neo4j-driver/src/transaction-rx.js

+18
Original file line numberDiff line numberDiff line change
@@ -90,4 +90,22 @@ export default class RxTransaction {
9090
.catch(err => observer.error(err))
9191
})
9292
}
93+
94+
/**
95+
* Closes the transaction
96+
*
97+
* This method will roll back the transaction if it is not already committed or rolled back.
98+
*
99+
* @returns {Observable} - An empty observable
100+
*/
101+
close () {
102+
return new Observable(observer => {
103+
this._txc
104+
.close()
105+
.then(() => {
106+
observer.complete()
107+
})
108+
.catch(err => observer.error(err))
109+
})
110+
}
93111
}

0 commit comments

Comments
 (0)