Skip to content

Introduce Transaction.close method (#847) #863

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions packages/core/src/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,19 @@ class Transaction {
return this._state === _states.ACTIVE
}

/**
* Closes the transaction
*
* This method will roll back the transaction if it is not already committed or rolled back.
*
* @returns {Promise<void>} An empty promise if closed successfully or error if any error happened during
*/
async close(): Promise<void> {
if (this.isOpen()) {
await this.rollback()
}
}

_onErrorCallback(err: any): Promise<Connection | void> {
// error will be "acknowledged" by sending a RESET message
// database will then forget about this transaction and cleanup all corresponding resources
Expand Down
115 changes: 115 additions & 0 deletions packages/core/test/transaction.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/**
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { ConnectionProvider, Transaction } from "../src";
import { Bookmark } from "../src/internal/bookmark";
import { ConnectionHolder } from "../src/internal/connection-holder";
import FakeConnection from "./utils/connection.fake";

describe('Transaction', () => {

describe('.close()', () => {
describe('when transaction is open', () => {
it('should roll back the transaction', async () => {
const connection = newFakeConnection()
const tx = newTransaction({ connection })

await tx.run('RETURN 1')
await tx.close()

expect(connection.rollbackInvoked).toEqual(1)
})

it('should surface errors during the rollback', async () => {
const expectedError = new Error('rollback error')
const connection = newFakeConnection().withRollbackError(expectedError)
const tx = newTransaction({ connection })

await tx.run('RETURN 1')

try {
await tx.close()
fail('should have thrown')
} catch (error) {
expect(error).toEqual(expectedError)
}
})
})

describe('when transaction is closed', () => {
const commit = async (tx: Transaction) => tx.commit()
const rollback = async (tx: Transaction) => tx.rollback()
const error = async (tx: Transaction, conn: FakeConnection) => {
conn.withRollbackError(new Error('rollback error'))
return tx.rollback().catch(() => { })
}

it.each([
['commmited', commit],
['rolled back', rollback],
['with error', error]
])('should not roll back the connection', async (_, operation) => {
const connection = newFakeConnection()
const tx = newTransaction({ connection })

await operation(tx, connection)
const rollbackInvokedAfterOperation = connection.rollbackInvoked

await tx.close()

expect(connection.rollbackInvoked).toEqual(rollbackInvokedAfterOperation)
})
})
})
})

function newTransaction({
connection,
fetchSize = 1000,
highRecordWatermark = 700,
lowRecordWatermark = 300
}: {
connection: FakeConnection
fetchSize?: number
highRecordWatermark?: number,
lowRecordWatermark?: number
}): Transaction {
const connectionProvider = new ConnectionProvider()
connectionProvider.acquireConnection = () => Promise.resolve(connection)
connectionProvider.close = () => Promise.resolve()

const connectionHolder = new ConnectionHolder({ connectionProvider })
connectionHolder.initializeConnection()

const transaction = new Transaction({
connectionHolder,
onClose: () => { },
onBookmark: (_: Bookmark) => { },
onConnection: () => { },
reactive: false,
fetchSize,
impersonatedUser: ""
})

return transaction
}

function newFakeConnection(): FakeConnection {
return new FakeConnection()
}
183 changes: 183 additions & 0 deletions packages/core/test/utils/connection.fake.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
/**
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { Connection, ResultObserver, Record, ResultSummary } from '../../src'
import { ResultStreamObserver } from '../../src/internal/observers'


/**
* This class is like a mock of {@link Connection} that tracks invocations count.
* It tries to maintain same "interface" as {@link Connection}.
* It could be replaced with a proper mock by a library like testdouble.
* At the time of writing such libraries require {@link Proxy} support but browser tests execute in
* PhantomJS which does not support proxies.
*/
export default class FakeConnection extends Connection {
private _open: boolean
private _requestRoutingInformationMock: ((params: any) => void) | null
public creationTimestamp: number
public resetInvoked: number
public releaseInvoked: number
public seenQueries: string[]
public seenParameters: any[]
public seenProtocolOptions: any[]
private _server: any
public protocolVersion: number | undefined
public protocolErrorsHandled: number
public seenProtocolErrors: string[]
public seenRequestRoutingInformation: any[]
public rollbackInvoked: number
public _rollbackError: Error | null

constructor() {
super()

this._open = true
this._requestRoutingInformationMock = null
this.creationTimestamp = Date.now()

this.resetInvoked = 0
this.releaseInvoked = 0
this.seenQueries = []
this.seenParameters = []
this.seenProtocolOptions = []
this._server = {}
this.protocolVersion = undefined
this.protocolErrorsHandled = 0
this.seenProtocolErrors = []
this.seenRequestRoutingInformation = []
this.rollbackInvoked = 0
this._rollbackError = null
}

protocol() {
// return fake protocol object that simply records seen queries and parameters
return {
run: (query: string, parameters: any | undefined, protocolOptions: any | undefined): ResultStreamObserver => {
this.seenQueries.push(query)
this.seenParameters.push(parameters)
this.seenProtocolOptions.push(protocolOptions)
return mockResultStreamObserver(query, parameters)
},
commitTransaction: () => {
return mockResultStreamObserver('COMMIT', {})
},
beginTransaction: () => {
return Promise.resolve()
},
rollbackTransaction: () => {
this.rollbackInvoked ++
if (this._rollbackError !== null) {
return mockResultStreamObserverWithError('ROLLBACK', {}, this._rollbackError)
}
return mockResultStreamObserver('ROLLBACK', {})
},
requestRoutingInformation: (params: any | undefined) => {
this.seenRequestRoutingInformation.push(params)
if (this._requestRoutingInformationMock) {
this._requestRoutingInformationMock(params)
}
},
version: this.protocolVersion
}
}

resetAndFlush() {
this.resetInvoked++
return Promise.resolve()
}

_release() {
this.releaseInvoked++
return Promise.resolve()
}

isOpen() {
return this._open
}

isNeverReleased() {
return this.isReleasedTimes(0)
}

isReleasedOnce() {
return this.isReleasedTimes(1)
}

isReleasedTimes(times: number) {
return this.resetInvoked === times && this.releaseInvoked === times
}

_handleProtocolError(message: string) {
this.protocolErrorsHandled++
this.seenProtocolErrors.push(message)
}

withProtocolVersion(version: number) {
this.protocolVersion = version
return this
}

withCreationTimestamp(value: number) {
this.creationTimestamp = value
return this
}

withRequestRoutingInformationMock(requestRoutingInformationMock: (params: any) => void) {
this._requestRoutingInformationMock = requestRoutingInformationMock
return this
}

withRollbackError(error: Error) {
this._rollbackError = error
return this
}

closed() {
this._open = false
return this
}
}

function mockResultStreamObserverWithError (query: string, parameters: any | undefined, error: Error) {
const observer = mockResultStreamObserver(query, parameters)
observer.subscribe = (observer: ResultObserver) => {
if (observer && observer.onError) {
observer.onError(error)
}
}
return observer
}

function mockResultStreamObserver(query: string, parameters: any | undefined): ResultStreamObserver {
return {
onError: (error: any) => { },
onCompleted: () => { },
onNext: (result: any) => { },
cancel: () => { },
prepareToHandleSingleResponse: () => { },
markCompleted: () => { },
subscribe: (observer: ResultObserver) => {
if (observer && observer.onCompleted) {
observer.onCompleted(new ResultSummary(query, parameters, {}))
}

}
}
}
18 changes: 18 additions & 0 deletions packages/neo4j-driver/src/transaction-rx.js
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,22 @@ export default class RxTransaction {
.catch(err => observer.error(err))
})
}

/**
* Closes the transaction
*
* This method will roll back the transaction if it is not already committed or rolled back.
*
* @returns {Observable} - An empty observable
*/
close () {
return new Observable(observer => {
this._txc
.close()
.then(() => {
observer.complete()
})
.catch(err => observer.error(err))
})
}
}
Loading