Skip to content

Commit bf42b22

Browse files
committed
Introduce Transaction.close method (#847)
This method closes the transaction and rolled it back if it stills open.
1 parent 4237adf commit bf42b22

File tree

8 files changed

+476
-0
lines changed

8 files changed

+476
-0
lines changed

packages/core/src/transaction.ts

+13
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,19 @@ class Transaction {
210210
return this._state === _states.ACTIVE
211211
}
212212

213+
/**
214+
* Closes the transaction
215+
*
216+
* This method will roll back the transaction if it is not already committed or rolled back.
217+
*
218+
* @returns {Promise<void>} An empty promise if closed successfully or error if any error happened during
219+
*/
220+
async close(): Promise<void> {
221+
if (this.isOpen()) {
222+
await this.rollback()
223+
}
224+
}
225+
213226
_onErrorCallback(err: any): Promise<Connection | void> {
214227
// error will be "acknowledged" by sending a RESET message
215228
// database will then forget about this transaction and cleanup all corresponding resources
+153
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/**
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
import { ConnectionProvider, Transaction } from "../src";
21+
import { Bookmark } from "../src/internal/bookmark";
22+
import { ConnectionHolder } from "../src/internal/connection-holder";
23+
import FakeConnection from "./utils/connection.fake";
24+
25+
describe('Transaction', () => {
26+
27+
describe('.run()', () => {
28+
it('should call run with watermarks', async () => {
29+
const connection = newFakeConnection()
30+
const tx = newTransaction({
31+
connection,
32+
fetchSize: 1000,
33+
highRecordWatermark: 700,
34+
lowRecordWatermark: 300
35+
})
36+
37+
await tx.run('RETURN 1')
38+
39+
expect(connection.seenProtocolOptions[0]).toMatchObject({
40+
fetchSize: 1000,
41+
lowRecordWatermark: 300,
42+
highRecordWatermark: 700
43+
})
44+
})
45+
46+
it('should configure result with watermarks', async () => {
47+
const connection = newFakeConnection()
48+
const tx = newTransaction({
49+
connection,
50+
fetchSize: 1000,
51+
highRecordWatermark: 700,
52+
lowRecordWatermark: 300
53+
})
54+
55+
var result = tx.run('RETURN 1')
56+
57+
// @ts-ignore
58+
expect(result._watermarks).toEqual({ high: 700, low: 300 })
59+
})
60+
61+
})
62+
63+
describe('.close()', () => {
64+
describe('when transaction is open', () => {
65+
it('should roll back the transaction', async () => {
66+
const connection = newFakeConnection()
67+
const tx = newTransaction({ connection })
68+
69+
await tx.run('RETURN 1')
70+
await tx.close()
71+
72+
expect(connection.rollbackInvoked).toEqual(1)
73+
})
74+
75+
it('should surface errors during the rollback', async () => {
76+
const expectedError = new Error('rollback error')
77+
const connection = newFakeConnection().withRollbackError(expectedError)
78+
const tx = newTransaction({ connection })
79+
80+
await tx.run('RETURN 1')
81+
82+
try {
83+
await tx.close()
84+
fail('should have thrown')
85+
} catch (error) {
86+
expect(error).toEqual(expectedError)
87+
}
88+
})
89+
})
90+
91+
describe('when transaction is closed', () => {
92+
const commit = async (tx: Transaction) => tx.commit()
93+
const rollback = async (tx: Transaction) => tx.rollback()
94+
const error = async (tx: Transaction, conn: FakeConnection) => {
95+
conn.withRollbackError(new Error('rollback error'))
96+
return tx.rollback().catch(() => { })
97+
}
98+
99+
it.each([
100+
['commmited', commit],
101+
['rolled back', rollback],
102+
['with error', error]
103+
])('should not roll back the connection', async (_, operation) => {
104+
const connection = newFakeConnection()
105+
const tx = newTransaction({ connection })
106+
107+
await operation(tx, connection)
108+
const rollbackInvokedAfterOperation = connection.rollbackInvoked
109+
110+
await tx.close()
111+
112+
expect(connection.rollbackInvoked).toEqual(rollbackInvokedAfterOperation)
113+
})
114+
})
115+
})
116+
})
117+
118+
function newTransaction({
119+
connection,
120+
fetchSize = 1000,
121+
highRecordWatermark = 700,
122+
lowRecordWatermark = 300
123+
}: {
124+
connection: FakeConnection
125+
fetchSize?: number
126+
highRecordWatermark?: number,
127+
lowRecordWatermark?: number
128+
}): Transaction {
129+
const connectionProvider = new ConnectionProvider()
130+
connectionProvider.acquireConnection = () => Promise.resolve(connection)
131+
connectionProvider.close = () => Promise.resolve()
132+
133+
const connectionHolder = new ConnectionHolder({ connectionProvider })
134+
connectionHolder.initializeConnection()
135+
136+
const transaction = new Transaction({
137+
connectionHolder,
138+
onClose: () => { },
139+
onBookmark: (_: Bookmark) => { },
140+
onConnection: () => { },
141+
reactive: false,
142+
fetchSize,
143+
impersonatedUser: "",
144+
highRecordWatermark,
145+
lowRecordWatermark
146+
})
147+
148+
return transaction
149+
}
150+
151+
function newFakeConnection(): FakeConnection {
152+
return new FakeConnection()
153+
}
+213
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
/**
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
import { Connection, ResultObserver, Record, ResultSummary } from '../../src'
21+
import { ResultStreamObserver } from '../../src/internal/observers'
22+
23+
24+
/**
25+
* This class is like a mock of {@link Connection} that tracks invocations count.
26+
* It tries to maintain same "interface" as {@link Connection}.
27+
* It could be replaced with a proper mock by a library like testdouble.
28+
* At the time of writing such libraries require {@link Proxy} support but browser tests execute in
29+
* PhantomJS which does not support proxies.
30+
*/
31+
export default class FakeConnection extends Connection {
32+
private _open: boolean
33+
private _id: number
34+
private _databaseId: string | null
35+
private _requestRoutingInformationMock: ((params: any) => void) | null
36+
public creationTimestamp: number
37+
public resetInvoked: number
38+
public releaseInvoked: number
39+
public seenQueries: string[]
40+
public seenParameters: any[]
41+
public seenProtocolOptions: any[]
42+
private _server: any
43+
public protocolVersion: number | undefined
44+
public protocolErrorsHandled: number
45+
public seenProtocolErrors: string[]
46+
public seenRequestRoutingInformation: any[]
47+
public rollbackInvoked: number
48+
public _rollbackError: Error | null
49+
50+
constructor() {
51+
super()
52+
53+
this._open = true
54+
this._id = 0
55+
this._databaseId = null
56+
this._requestRoutingInformationMock = null
57+
this.creationTimestamp = Date.now()
58+
59+
this.resetInvoked = 0
60+
this.releaseInvoked = 0
61+
this.seenQueries = []
62+
this.seenParameters = []
63+
this.seenProtocolOptions = []
64+
this._server = {}
65+
this.protocolVersion = undefined
66+
this.protocolErrorsHandled = 0
67+
this.seenProtocolErrors = []
68+
this.seenRequestRoutingInformation = []
69+
this.rollbackInvoked = 0
70+
this._rollbackError = null
71+
}
72+
73+
get id(): string {
74+
return this._id.toString()
75+
}
76+
77+
get databaseId(): string {
78+
return this._databaseId!!
79+
}
80+
81+
set databaseId(value) {
82+
this._databaseId = value
83+
}
84+
85+
get server() {
86+
return this._server
87+
}
88+
89+
get version() {
90+
return this._server.version
91+
}
92+
93+
set version(value) {
94+
this._server.version = value
95+
}
96+
97+
protocol() {
98+
// return fake protocol object that simply records seen queries and parameters
99+
return {
100+
run: (query: string, parameters: any | undefined, protocolOptions: any | undefined): ResultStreamObserver => {
101+
this.seenQueries.push(query)
102+
this.seenParameters.push(parameters)
103+
this.seenProtocolOptions.push(protocolOptions)
104+
return mockResultStreamObserver(query, parameters)
105+
},
106+
commitTransaction: () => {
107+
return mockResultStreamObserver('COMMIT', {})
108+
},
109+
beginTransaction: () => {
110+
return Promise.resolve()
111+
},
112+
rollbackTransaction: () => {
113+
this.rollbackInvoked ++
114+
if (this._rollbackError !== null) {
115+
return mockResultStreamObserverWithError('ROLLBACK', {}, this._rollbackError)
116+
}
117+
return mockResultStreamObserver('ROLLBACK', {})
118+
},
119+
requestRoutingInformation: (params: any | undefined) => {
120+
this.seenRequestRoutingInformation.push(params)
121+
if (this._requestRoutingInformationMock) {
122+
this._requestRoutingInformationMock(params)
123+
}
124+
},
125+
version: this.protocolVersion
126+
}
127+
}
128+
129+
resetAndFlush() {
130+
this.resetInvoked++
131+
return Promise.resolve()
132+
}
133+
134+
_release() {
135+
this.releaseInvoked++
136+
return Promise.resolve()
137+
}
138+
139+
isOpen() {
140+
return this._open
141+
}
142+
143+
isNeverReleased() {
144+
return this.isReleasedTimes(0)
145+
}
146+
147+
isReleasedOnce() {
148+
return this.isReleasedTimes(1)
149+
}
150+
151+
isReleasedTimes(times: number) {
152+
return this.resetInvoked === times && this.releaseInvoked === times
153+
}
154+
155+
_handleProtocolError(message: string) {
156+
this.protocolErrorsHandled++
157+
this.seenProtocolErrors.push(message)
158+
}
159+
160+
withProtocolVersion(version: number) {
161+
this.protocolVersion = version
162+
return this
163+
}
164+
165+
withCreationTimestamp(value: number) {
166+
this.creationTimestamp = value
167+
return this
168+
}
169+
170+
withRequestRoutingInformationMock(requestRoutingInformationMock: (params: any) => void) {
171+
this._requestRoutingInformationMock = requestRoutingInformationMock
172+
return this
173+
}
174+
175+
withRollbackError(error: Error) {
176+
this._rollbackError = error
177+
return this
178+
}
179+
180+
closed() {
181+
this._open = false
182+
return this
183+
}
184+
}
185+
186+
function mockResultStreamObserverWithError (query: string, parameters: any | undefined, error: Error) {
187+
const observer = mockResultStreamObserver(query, parameters)
188+
observer.subscribe = (observer: ResultObserver) => {
189+
if (observer && observer.onError) {
190+
observer.onError(error)
191+
}
192+
}
193+
return observer
194+
}
195+
196+
function mockResultStreamObserver(query: string, parameters: any | undefined): ResultStreamObserver {
197+
return {
198+
onError: (error: any) => { },
199+
onCompleted: () => { },
200+
onNext: (result: any) => { },
201+
cancel: () => { },
202+
prepareToHandleSingleResponse: () => { },
203+
pause: () => { },
204+
resume: () => { },
205+
markCompleted: () => { },
206+
subscribe: (observer: ResultObserver) => {
207+
if (observer && observer.onCompleted) {
208+
observer.onCompleted(new ResultSummary(query, parameters, {}))
209+
}
210+
211+
}
212+
}
213+
}

0 commit comments

Comments
 (0)