Skip to content

Rollback transaction after failure in transaction functions #397

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 20, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 47 additions & 22 deletions src/v1/internal/transaction-executor.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,37 +76,62 @@ export default class TransactionExecutor {
}

_executeTransactionInsidePromise(transactionCreator, transactionWork, resolve, reject) {
let tx;
try {
const tx = transactionCreator();
const transactionWorkResult = transactionWork(tx);
tx = transactionCreator();
} catch (error) {
// failed to create a transaction
reject(error);
return;
}

const resultPromise = this._safeExecuteTransactionWork(tx, transactionWork);

resultPromise
.then(result => this._handleTransactionWorkSuccess(result, tx, resolve, reject))
.catch(error => this._handleTransactionWorkFailure(error, tx, reject));
}

_safeExecuteTransactionWork(tx, transactionWork) {
try {
const result = transactionWork(tx);
// user defined callback is supposed to return a promise, but it might not; so to protect against an
// incorrect API usage we wrap the returned value with a resolved promise; this is effectively a
// validation step without type checks
const resultPromise = Promise.resolve(transactionWorkResult);

resultPromise.then(result => {
if (tx.isOpen()) {
// transaction work returned resolved promise and transaction has not been committed/rolled back
// try to commit the transaction
tx.commit().then(() => {
// transaction was committed, return result to the user
resolve(result);
}).catch(error => {
// transaction failed to commit, propagate the failure
reject(error);
});
} else {
// transaction work returned resolved promise and transaction is already committed/rolled back
// return the result returned by given transaction work
resolve(result);
}
return Promise.resolve(result);
} catch (error) {
return Promise.reject(error);
}
}

_handleTransactionWorkSuccess(result, tx, resolve, reject) {
if (tx.isOpen()) {
// transaction work returned resolved promise and transaction has not been committed/rolled back
// try to commit the transaction
tx.commit().then(() => {
// transaction was committed, return result to the user
resolve(result);
}).catch(error => {
// transaction work returned rejected promise, propagate the failure
// transaction failed to commit, propagate the failure
reject(error);
});
} else {
// transaction work returned resolved promise and transaction is already committed/rolled back
// return the result returned by given transaction work
resolve(result);
}
}

} catch (error) {
_handleTransactionWorkFailure(error, tx, reject) {
if (tx.isOpen()) {
// transaction work failed and the transaction is still open, roll it back and propagate the failure
tx.rollback()
.catch(ignore => {
// ignore the rollback error
})
.then(() => reject(error)); // propagate the original error we got from the transaction work
} else {
// transaction is already rolled back, propagate the error
reject(error);
}
}
Expand Down
98 changes: 74 additions & 24 deletions test/internal/transaction-executor.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,13 @@ describe('TransactionExecutor', () => {

it('should stop retrying when time expires', done => {
const executor = new TransactionExecutor();
let workInvocationCounter = 0;
const usedTransactions = [];
const realWork = transactionWork([SERVICE_UNAVAILABLE, SESSION_EXPIRED, TRANSIENT_ERROR_1, TRANSIENT_ERROR_2], 42);

const result = executor.execute(transactionCreator(), tx => {
expect(tx).toBeDefined();
workInvocationCounter++;
if (workInvocationCounter === 3) {
usedTransactions.push(tx);
if (usedTransactions.length === 3) {
const currentTime = Date.now();
clock = lolex.install();
clock.setSystemTime(currentTime + 30001); // move `Date.now()` call forward by 30 seconds
Expand All @@ -95,7 +95,8 @@ describe('TransactionExecutor', () => {
});

result.catch(error => {
expect(workInvocationCounter).toEqual(3);
expect(usedTransactions.length).toEqual(3);
expectAllTransactionsToBeClosed(usedTransactions);
expect(error.code).toEqual(TRANSIENT_ERROR_1);
done();
});
Expand Down Expand Up @@ -152,6 +153,14 @@ describe('TransactionExecutor', () => {
);
});

it('should retry when transaction work throws and rollback fails', done => {
testRetryWhenTransactionWorkThrowsAndRollbackFails(
[SERVICE_UNAVAILABLE, TRANSIENT_ERROR_2, SESSION_EXPIRED, SESSION_EXPIRED],
[SESSION_EXPIRED, TRANSIENT_ERROR_1],
done
);
});

it('should cancel in-flight timeouts when closed', done => {
const executor = new TransactionExecutor();
// do not execute setTimeout callbacks
Expand Down Expand Up @@ -190,16 +199,16 @@ describe('TransactionExecutor', () => {
function testRetryWhenTransactionCreatorFails(errorCodes, done) {
const executor = new TransactionExecutor();
const transactionCreator = throwingTransactionCreator(errorCodes, new FakeTransaction());
let workInvocationCounter = 0;
const usedTransactions = [];

const result = executor.execute(transactionCreator, tx => {
expect(tx).toBeDefined();
workInvocationCounter++;
usedTransactions.push(tx);
return Promise.resolve(42);
});

result.then(value => {
expect(workInvocationCounter).toEqual(1);
expect(usedTransactions.length).toEqual(1);
expect(value).toEqual(42);
verifyRetryDelays(fakeSetTimeout, errorCodes.length);
done();
Expand All @@ -208,18 +217,19 @@ describe('TransactionExecutor', () => {

function testRetryWhenTransactionWorkReturnsRejectedPromise(errorCodes, done) {
const executor = new TransactionExecutor();
let workInvocationCounter = 0;
const usedTransactions = [];
const realWork = transactionWork(errorCodes, 42);

const result = executor.execute(transactionCreator(), tx => {
expect(tx).toBeDefined();
workInvocationCounter++;
usedTransactions.push(tx);
return realWork();
});

result.then(value => {
// work should have failed 'failures.length' times and succeeded 1 time
expect(workInvocationCounter).toEqual(errorCodes.length + 1);
expect(usedTransactions.length).toEqual(errorCodes.length + 1);
expectAllTransactionsToBeClosed(usedTransactions);
expect(value).toEqual(42);
verifyRetryDelays(fakeSetTimeout, errorCodes.length);
done();
Expand All @@ -228,18 +238,19 @@ describe('TransactionExecutor', () => {

function testRetryWhenTransactionCommitReturnsRejectedPromise(errorCodes, done) {
const executor = new TransactionExecutor();
let workInvocationCounter = 0;
const usedTransactions = [];
const realWork = () => Promise.resolve(4242);

const result = executor.execute(transactionCreator(errorCodes), tx => {
expect(tx).toBeDefined();
workInvocationCounter++;
usedTransactions.push(tx);
return realWork();
});

result.then(value => {
// work should have failed 'failures.length' times and succeeded 1 time
expect(workInvocationCounter).toEqual(errorCodes.length + 1);
expect(usedTransactions.length).toEqual(errorCodes.length + 1);
expectAllTransactionsToBeClosed(usedTransactions);
expect(value).toEqual(4242);
verifyRetryDelays(fakeSetTimeout, errorCodes.length);
done();
Expand All @@ -248,37 +259,60 @@ describe('TransactionExecutor', () => {

function testRetryWhenTransactionWorkThrows(errorCodes, done) {
const executor = new TransactionExecutor();
let workInvocationCounter = 0;
const usedTransactions = [];
const realWork = throwingTransactionWork(errorCodes, 42);

const result = executor.execute(transactionCreator(), tx => {
expect(tx).toBeDefined();
workInvocationCounter++;
usedTransactions.push(tx);
return realWork();
});

result.then(value => {
// work should have failed 'failures.length' times and succeeded 1 time
expect(workInvocationCounter).toEqual(errorCodes.length + 1);
expect(usedTransactions.length).toEqual(errorCodes.length + 1);
expectAllTransactionsToBeClosed(usedTransactions);
expect(value).toEqual(42);
verifyRetryDelays(fakeSetTimeout, errorCodes.length);
done();
});
}

function testRetryWhenTransactionWorkThrowsAndRollbackFails(txWorkErrorCodes, rollbackErrorCodes, done) {
const executor = new TransactionExecutor();
const usedTransactions = [];
const realWork = throwingTransactionWork(txWorkErrorCodes, 424242);

const result = executor.execute(transactionCreator([], rollbackErrorCodes), tx => {
expect(tx).toBeDefined();
usedTransactions.push(tx);
return realWork();
});

result.then(value => {
// work should have failed 'failures.length' times and succeeded 1 time
expect(usedTransactions.length).toEqual(txWorkErrorCodes.length + 1);
expectAllTransactionsToBeClosed(usedTransactions);
expect(value).toEqual(424242);
verifyRetryDelays(fakeSetTimeout, txWorkErrorCodes.length);
done();
});
}

function testNoRetryOnUnknownError(errorCodes, expectedWorkInvocationCount, done) {
const executor = new TransactionExecutor();
let workInvocationCounter = 0;
const usedTransactions = [];
const realWork = transactionWork(errorCodes, 42);

const result = executor.execute(transactionCreator(), tx => {
expect(tx).toBeDefined();
workInvocationCounter++;
usedTransactions.push(tx);
return realWork();
});

result.catch(error => {
expect(workInvocationCounter).toEqual(expectedWorkInvocationCount);
expect(usedTransactions.length).toEqual(expectedWorkInvocationCount);
expectAllTransactionsToBeClosed(usedTransactions);
if (errorCodes.length === 1) {
expect(error.code).toEqual(errorCodes[0]);
} else {
Expand All @@ -290,9 +324,10 @@ describe('TransactionExecutor', () => {

});

function transactionCreator(commitErrorCodes) {
const remainingErrorCodes = (commitErrorCodes || []).slice().reverse();
return () => new FakeTransaction(remainingErrorCodes.pop());
function transactionCreator(commitErrorCodes, rollbackErrorCodes) {
const remainingCommitErrorCodes = (commitErrorCodes || []).slice().reverse();
const remainingRollbackErrorCodes = (rollbackErrorCodes || []).slice().reverse();
return () => new FakeTransaction(remainingCommitErrorCodes.pop(), remainingRollbackErrorCodes.pop());
}

function throwingTransactionCreator(errorCodes, result) {
Expand Down Expand Up @@ -348,20 +383,35 @@ function verifyRetryDelays(fakeSetTimeout, expectedInvocationCount) {
});
}

function expectAllTransactionsToBeClosed(transactions) {
transactions.forEach(tx => expect(tx.isOpen()).toBeFalsy());
}

class FakeTransaction {

constructor(commitErrorCode) {
constructor(commitErrorCode, rollbackErrorCode) {
this._commitErrorCode = commitErrorCode;
this._rollbackErrorCode = rollbackErrorCode;
this._open = true;
}

isOpen() {
return true;
return this._open;
}

commit() {
this._open = false;
if (this._commitErrorCode) {
return Promise.reject(error(this._commitErrorCode));
}
return Promise.resolve();
}

rollback() {
this._open = false;
if (this._rollbackErrorCode) {
return Promise.reject(error(this._rollbackErrorCode));
}
return Promise.resolve();
}
}
70 changes: 70 additions & 0 deletions test/v1/session.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import sharedNeo4j from '../internal/shared-neo4j';
import _ from 'lodash';
import {ServerVersion, VERSION_3_1_0} from '../../src/v1/internal/server-version';
import {isString} from '../../src/v1/internal/util';
import {newError, PROTOCOL_ERROR, SESSION_EXPIRED} from '../../src/v1/error';

describe('session', () => {

Expand Down Expand Up @@ -1092,6 +1093,75 @@ describe('session', () => {
testUnsupportedQueryParameter(new neo4j.types.Path(node1, node2, []), done);
});

it('should retry transaction until success when function throws', done => {
testTransactionRetryUntilSuccess(() => {
throw newError('Error that can be retried', SESSION_EXPIRED);
}, done);
});

it('should retry transaction until success when function returns rejected promise', done => {
testTransactionRetryUntilSuccess(() => Promise.reject(newError('Error that can be retried', SESSION_EXPIRED)), done);
});

it('should not retry transaction when function throws fatal error', done => {
testTransactionRetryOnFatalError(() => {
throw newError('Error that is fatal', PROTOCOL_ERROR);
}, done);
});

it('should not retry transaction when function returns promise rejected with fatal error', done => {
testTransactionRetryOnFatalError(() => Promise.reject(newError('Error that is fatal', 'ReallyFatalErrorCode')), done);
});

function testTransactionRetryUntilSuccess(failureResponseFunction, done) {
const session = driver.session();

const failures = 3;
const usedTransactions = [];

const resultPromise = session.writeTransaction(tx => {
usedTransactions.push(tx);
if (usedTransactions.length < failures) {
return failureResponseFunction();
} else {
return tx.run('RETURN "424242"');
}
});

resultPromise.then(result => {
expect(result.records[0].get(0)).toEqual('424242');
expect(usedTransactions.length).toEqual(3);
usedTransactions.forEach(tx => expect(tx.isOpen()).toBeFalsy());
session.close();
done();
}).catch(error => {
done.fail(error);
});
}

function testTransactionRetryOnFatalError(failureResponseFunction, done) {
const session = driver.session();

const usedTransactions = [];

const resultPromise = session.writeTransaction(tx => {
usedTransactions.push(tx);
return failureResponseFunction();
});

resultPromise.then(result => {
session.close();
done.fail('Retries should not succeed: ' + JSON.stringify(result));
}).catch(error => {
session.close();
expect(error).toBeDefined();
expect(error).not.toBeNull();
expect(usedTransactions.length).toEqual(1);
expect(usedTransactions[0].isOpen()).toBeFalsy();
done();
});
}

function serverIs31OrLater(done) {
if (serverVersion.compareTo(VERSION_3_1_0) < 0) {
done();
Expand Down
Loading