Skip to content

Support for transactions in js driver #21

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 17, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 33 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ session
.subscribe({
onNext: function(record) {
console.log(record);
},
},
onCompleted: function() {
// Completed!
session.close();
},
},
onError: function(error) {
console.log(error);
}
Expand All @@ -72,6 +72,31 @@ session
.catch(function(error) {
console.log(error);
});

//run statement in a transaction
var tx = session.beginTransaction();
tx.run("CREATE (alice {name : {nameParam} })", { nameParam:'Alice'}");
tx.run("MATCH (alice {name : {nameParam} }) RETURN alice.age", { nameParam:'Alice'}");
//decide if the transaction should be committed or rolled back
var success = ...
...
if (success) {
tx.commit()
.subscribe({
onCompleted: function() {
// Completed!
session.close();
},
onError: function(error) {
console.log(error);
}
});
} else {
//transaction is rolled black nothing is created in the database
tx.rollback();
}


```

## Building
Expand All @@ -89,25 +114,25 @@ See files under `examples/` on how to use.
This runs the test suite against a fresh download of Neo4j.
Or `npm test` if you already have a running version of a compatible Neo4j server.

For development, you can have the build tool rerun the tests each time you change
For development, you can have the build tool rerun the tests each time you change
the source code:

gulp watch-n-test

### Testing on windows
Running tests on windows requires PhantomJS installed and its bin folder added in windows system variable `Path`.
To run the same test suite, run `.\runTest.ps1` instead in powershell with admin right.
The admin right is required to start/stop Neo4j properly as a system service.
Running tests on windows requires PhantomJS installed and its bin folder added in windows system variable `Path`.
To run the same test suite, run `.\runTest.ps1` instead in powershell with admin right.
The admin right is required to start/stop Neo4j properly as a system service.
While there is no need to grab admin right if you are running tests against an existing Neo4j server using `npm test`.

## A note on numbers and the Integer type
The Neo4j type system includes 64-bit integer values.
However, Javascript can only safely represent integers between `-(2`<sup>`53`</sup>` - 1)` and `(2`<sup>`53`</sup>` - 1)`.
However, Javascript can only safely represent integers between `-(2`<sup>`53`</sup>` - 1)` and `(2`<sup>`53`</sup>` - 1)`.
In order to support the full Neo4j type system, the driver includes an explicit Integer types.
Any time the driver recieves an Integer value from Neo4j, it will be represented with the Integer type by the driver.

### Write integers
Number written directly e.g. `session.run("CREATE (n:Node {age: {age}})", {age: 22})` will be of type `Float` in Neo4j.
Number written directly e.g. `session.run("CREATE (n:Node {age: {age}})", {age: 22})` will be of type `Float` in Neo4j.
To write the `age` as an integer the `neo4j.int` method should be used:

```javascript
Expand Down
16 changes: 15 additions & 1 deletion src/v1/internal/connector.js
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ class Connection {
// this to the dechunker
self._ch.onmessage = (buf) => {
self._dechunker.write(buf);
}
};

if( buf.hasRemaining() ) {
self._dechunker.write(buf.readSlice( buf.remaining() ));
Expand Down Expand Up @@ -219,6 +219,7 @@ class Connection {
}

_handleMessage( msg ) {

switch( msg.signature ) {
case RECORD:
this._currentObserver.onNext( msg.fields[0] );
Expand All @@ -233,6 +234,7 @@ class Connection {
case FAILURE:
try {
this._currentObserver.onError( msg );
this._errorMsg = msg;
} finally {
this._currentObserver = this._pendingObservers.shift();
// Things are now broken. Pending observers will get FAILURE messages routed until
Expand All @@ -256,6 +258,18 @@ class Connection {
}
}
break;
case IGNORED:
try {
if (this._errorMsg)
this._currentObserver.onError(this._errorMsg);
else
this._currentObserver.onError(msg);
} finally {
this._currentObserver = this._pendingObservers.shift();
}
break;
default:
console.log("UNKNOWN MESSAGE: ", msg);
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/v1/internal/stream-observer.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**

/**
* Handles a RUN/PULL_ALL, or RUN/DISCARD_ALL requests, maps the responses
* in a way that a user-provided observer can see these as a clean Stream
* of records.
Expand Down Expand Up @@ -106,7 +106,7 @@ class StreamObserver {
if( this._queuedRecords.length > 0 ) {
for (var i = 0; i < _queuedRecords.length; i++) {
observer.onNext( _queuedRecords[i] );
};
}
}
if( this._tail ) {
observer.onCompleted( this._tail );
Expand Down
10 changes: 4 additions & 6 deletions src/v1/result.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class Result {
this._p = null;
this._statement = statement;
this._parameters = parameters;
this.summary = {}
this.summary = {};
}

/**
Expand All @@ -56,7 +56,7 @@ class Result {
onNext: (record) => { records.push(record); },
onCompleted: () => { resolve(records); },
onError: (error) => { reject(error); }
}
};
self.subscribe(observer);
});
}
Expand Down Expand Up @@ -99,7 +99,7 @@ class Result {
let onCompletedWrapper = (metadata) => {
this.summary = new ResultSummary(this._statement, this._parameters, metadata);
onCompletedOriginal.call(observer);
}
};
observer.onCompleted = onCompletedWrapper;
this._streamObserver.subscribe(observer);
}
Expand All @@ -113,6 +113,4 @@ class Result {
}
}

export default {
Result
}
export default Result
35 changes: 30 additions & 5 deletions src/v1/session.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import StreamObserver from './internal/stream-observer';
import {Result} from './result';
import Result from './result';
import Transaction from './transaction';

/**
* A Session instance is used for handling the connection and
Expand All @@ -35,6 +36,7 @@ class Session {
constructor( conn, onClose ) {
this._conn = conn;
this._onClose = onClose;
this._hasTx = false;
}

/**
Expand All @@ -51,12 +53,34 @@ class Session {
statement = statement.text;
}
let streamObserver = new StreamObserver();
this._conn.run( statement, parameters || {}, streamObserver );
this._conn.pullAll( streamObserver );
this._conn.sync();
if (!this._hasTx) {
this._conn.run(statement, parameters || {}, streamObserver);
this._conn.pullAll(streamObserver);
this._conn.sync();
} else {
streamObserver.onError({error: "Please close the currently open transaction object before running " +
"more statements/transactions in the current session." });
}
return new Result( streamObserver, statement, parameters );
}

/**
* Begin a new transaction in this session. A session can have at most one transaction running at a time, if you
* want to run multiple concurrent transactions, you should use multiple concurrent sessions.
*
* While a transaction is open the session cannot be used to run statements.
*
* @returns {Transaction} - New Transaction
*/
beginTransaction() {
if (this._hasTx) {
throw new Error("Cannot have multiple transactions open for the session. Use multiple sessions or close the transaction before opening a new one.")
}

this._hasTx = true;
return new Transaction(this._conn, () => {this._hasTx = false});
}

/**
* Close connection
* @param {function()} cb - Function to be called on connection close
Expand All @@ -67,4 +91,5 @@ class Session {
this._conn.close(cb);
}
}

export default Session;
Loading