Skip to content

Commit cdedd34

Browse files
committed
Transaction support in js driver.
1 parent 293a05e commit cdedd34

File tree

8 files changed

+455
-40
lines changed

8 files changed

+455
-40
lines changed

README.md

+33-8
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,11 @@ session
4747
.subscribe({
4848
onNext: function(record) {
4949
console.log(record);
50-
},
50+
},
5151
onCompleted: function() {
5252
// Completed!
5353
session.close();
54-
},
54+
},
5555
onError: function(error) {
5656
console.log(error);
5757
}
@@ -72,6 +72,31 @@ session
7272
.catch(function(error) {
7373
console.log(error);
7474
});
75+
76+
//run statement in a transaction
77+
var tx = session.beginTransaction();
78+
tx.run("CREATE (alice {name : {nameParam} })", { nameParam:'Alice'}");
79+
tx.run("MATCH (alice {name : {nameParam} }) RETURN alice.age", { nameParam:'Alice'}");
80+
//decide if the transaction should be committed or rolled back
81+
var success = ...
82+
...
83+
if (success) {
84+
tx.commit()
85+
.subscribe({
86+
onCompleted: function() {
87+
// Completed!
88+
session.close();
89+
},
90+
onError: function(error) {
91+
console.log(error);
92+
}
93+
});
94+
} else {
95+
//transaction is rolled black nothing is created in the database
96+
tx.rollback();
97+
}
98+
99+
75100
```
76101

77102
## Building
@@ -89,25 +114,25 @@ See files under `examples/` on how to use.
89114
This runs the test suite against a fresh download of Neo4j.
90115
Or `npm test` if you already have a running version of a compatible Neo4j server.
91116

92-
For development, you can have the build tool rerun the tests each time you change
117+
For development, you can have the build tool rerun the tests each time you change
93118
the source code:
94119

95120
gulp watch-n-test
96121

97122
### Testing on windows
98-
Running tests on windows requires PhantomJS installed and its bin folder added in windows system variable `Path`.
99-
To run the same test suite, run `.\runTest.ps1` instead in powershell with admin right.
100-
The admin right is required to start/stop Neo4j properly as a system service.
123+
Running tests on windows requires PhantomJS installed and its bin folder added in windows system variable `Path`.
124+
To run the same test suite, run `.\runTest.ps1` instead in powershell with admin right.
125+
The admin right is required to start/stop Neo4j properly as a system service.
101126
While there is no need to grab admin right if you are running tests against an existing Neo4j server using `npm test`.
102127

103128
## A note on numbers and the Integer type
104129
The Neo4j type system includes 64-bit integer values.
105-
However, Javascript can only safely represent integers between `-(2`<sup>`53`</sup>` - 1)` and `(2`<sup>`53`</sup>` - 1)`.
130+
However, Javascript can only safely represent integers between `-(2`<sup>`53`</sup>` - 1)` and `(2`<sup>`53`</sup>` - 1)`.
106131
In order to support the full Neo4j type system, the driver includes an explicit Integer types.
107132
Any time the driver recieves an Integer value from Neo4j, it will be represented with the Integer type by the driver.
108133

109134
### Write integers
110-
Number written directly e.g. `session.run("CREATE (n:Node {age: {age}})", {age: 22})` will be of type `Float` in Neo4j.
135+
Number written directly e.g. `session.run("CREATE (n:Node {age: {age}})", {age: 22})` will be of type `Float` in Neo4j.
111136
To write the `age` as an integer the `neo4j.int` method should be used:
112137

113138
```javascript

src/v1/internal/connector.js

+15-2
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ class Connection {
190190
// this to the dechunker
191191
self._ch.onmessage = (buf) => {
192192
self._dechunker.write(buf);
193-
}
193+
};
194194

195195
if( buf.hasRemaining() ) {
196196
self._dechunker.write(buf.readSlice( buf.remaining() ));
@@ -219,7 +219,7 @@ class Connection {
219219
}
220220

221221
_handleMessage( msg ) {
222-
222+
223223
switch( msg.signature ) {
224224
case RECORD:
225225
this._currentObserver.onNext( msg.fields[0] );
@@ -234,6 +234,7 @@ class Connection {
234234
case FAILURE:
235235
try {
236236
this._currentObserver.onError( msg );
237+
this._errorMsg = msg;
237238
} finally {
238239
this._currentObserver = this._pendingObservers.shift();
239240
// Things are now broken. Pending observers will get FAILURE messages routed until
@@ -257,6 +258,18 @@ class Connection {
257258
}
258259
}
259260
break;
261+
case IGNORED:
262+
try {
263+
if (this._errorMsg)
264+
this._currentObserver.onError(this._errorMsg);
265+
else
266+
this._currentObserver.onError(msg);
267+
} finally {
268+
this._currentObserver = this._pendingObservers.shift();
269+
}
270+
break;
271+
default:
272+
console.log("UNKNOWN MESSAGE: ", msg);
260273
}
261274
}
262275

src/v1/internal/stream-observer.js

+3-3
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616
* See the License for the specific language governing permissions and
1717
* limitations under the License.
1818
*/
19-
20-
/**
19+
20+
/**
2121
* Handles a RUN/PULL_ALL, or RUN/DISCARD_ALL requests, maps the responses
2222
* in a way that a user-provided observer can see these as a clean Stream
2323
* of records.
@@ -106,7 +106,7 @@ class StreamObserver {
106106
if( this._queuedRecords.length > 0 ) {
107107
for (var i = 0; i < _queuedRecords.length; i++) {
108108
observer.onNext( _queuedRecords[i] );
109-
};
109+
}
110110
}
111111
if( this._tail ) {
112112
observer.onCompleted( this._tail );

src/v1/result.js

+3-3
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ class Result {
3838
this._p = null;
3939
this._statement = statement;
4040
this._parameters = parameters;
41-
this.summary = {}
41+
this.summary = {};
4242
}
4343

4444
/**
@@ -56,7 +56,7 @@ class Result {
5656
onNext: (record) => { records.push(record); },
5757
onCompleted: () => { resolve(records); },
5858
onError: (error) => { reject(error); }
59-
}
59+
};
6060
self.subscribe(observer);
6161
});
6262
}
@@ -99,7 +99,7 @@ class Result {
9999
let onCompletedWrapper = (metadata) => {
100100
this.summary = new ResultSummary(this._statement, this._parameters, metadata);
101101
onCompletedOriginal.call(observer);
102-
}
102+
};
103103
observer.onCompleted = onCompletedWrapper;
104104
this._streamObserver.subscribe(observer);
105105
}

src/v1/session.js

+27-7
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* See the License for the specific language governing permissions and
1717
* limitations under the License.
1818
*/
19-
19+
2020
import StreamObserver from './internal/stream-observer';
2121
import Result from './result';
2222
import Transaction from './transaction';
@@ -36,12 +36,13 @@ class Session {
3636
constructor( conn, onClose ) {
3737
this._conn = conn;
3838
this._onClose = onClose;
39+
this._hasTx = false;
3940
}
4041

4142
/**
4243
* Run Cypher statement
4344
* Could be called with a statement object i.e.: {statement: "MATCH ...", parameters: {param: 1}}
44-
* or with the statem ent and parameters as separate arguments.
45+
* or with the statement and parameters as separate arguments.
4546
* @param {mixed} statement - Cypher statement to execute
4647
* @param {Object} parameters - Map with parameters to use in statement
4748
* @return {Result} - New Result
@@ -52,16 +53,34 @@ class Session {
5253
statement = statement.text;
5354
}
5455
let streamObserver = new StreamObserver();
55-
this._conn.run( statement, parameters || {}, streamObserver );
56-
this._conn.pullAll( streamObserver );
57-
this._conn.sync();
56+
if (!this._hasTx) {
57+
this._conn.run(statement, parameters || {}, streamObserver);
58+
this._conn.pullAll(streamObserver);
59+
this._conn.sync();
60+
} else {
61+
streamObserver.onError({error: "Please close the currently open transaction object before running " +
62+
"more statements/transactions in the current session." });
63+
}
5864
return new Result( streamObserver, statement, parameters );
5965
}
6066

67+
/**
68+
* Begin a new transaction in this session. A session can have at most one transaction running at a time, if you
69+
* want to run multiple concurrent transactions, you should use multiple concurrent sessions.
70+
*
71+
* While a transaction is open the session cannot be used to run statements.
72+
*
73+
* @returns {Transaction} - New Transaction
74+
*/
6175
beginTransaction() {
62-
return new Transaction(this);
76+
if (this._hasTx) {
77+
throw new Error("Cannot have multiple transactions open for the session. Use multiple sessions or close the transaction before opening a new one.")
78+
}
79+
80+
this._hasTx = true;
81+
return new Transaction(this._conn, () => {this._hasTx = false});
6382
}
64-
83+
6584
/**
6685
* Close connection
6786
* @param {function()} cb - Function to be called on connection close
@@ -72,4 +91,5 @@ class Session {
7291
this._conn.close(cb);
7392
}
7493
}
94+
7595
export default Session;

0 commit comments

Comments
 (0)