Skip to content

Commit 246f7c5

Browse files
committed
Transaction support in js driver.
1 parent 293a05e commit 246f7c5

File tree

8 files changed

+457
-39
lines changed

8 files changed

+457
-39
lines changed

README.md

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,11 @@ session
4747
.subscribe({
4848
onNext: function(record) {
4949
console.log(record);
50-
},
50+
},
5151
onCompleted: function() {
5252
// Completed!
5353
session.close();
54-
},
54+
},
5555
onError: function(error) {
5656
console.log(error);
5757
}
@@ -72,6 +72,34 @@ session
7272
.catch(function(error) {
7373
console.log(error);
7474
});
75+
76+
//run queries in a transaction
77+
var tx = session.beginTransaction();
78+
tx.run("CREATE (alice {name : {nameParam} })", { nameParam:'Alice'}");
79+
tx.run("MATCH (alice {name : {nameParam} }) RETURN alice.age", { nameParam:'Alice'}");
80+
//decide if the transaction should be committed or rolled back
81+
var success = ...
82+
...
83+
if (success) {
84+
tx.commit()
85+
.subscribe({
86+
onNext: function(record) {
87+
console.log(record);
88+
},
89+
onCompleted: function() {
90+
// Completed!
91+
session.close();
92+
},
93+
onError: function(error) {
94+
console.log(error);
95+
}
96+
});
97+
} else {
98+
//transaction is rolled black nothing is created in the database
99+
tx.rollback();
100+
}
101+
102+
75103
```
76104

77105
## Building
@@ -89,25 +117,25 @@ See files under `examples/` on how to use.
89117
This runs the test suite against a fresh download of Neo4j.
90118
Or `npm test` if you already have a running version of a compatible Neo4j server.
91119

92-
For development, you can have the build tool rerun the tests each time you change
120+
For development, you can have the build tool rerun the tests each time you change
93121
the source code:
94122

95123
gulp watch-n-test
96124

97125
### Testing on windows
98-
Running tests on windows requires PhantomJS installed and its bin folder added in windows system variable `Path`.
99-
To run the same test suite, run `.\runTest.ps1` instead in powershell with admin right.
100-
The admin right is required to start/stop Neo4j properly as a system service.
126+
Running tests on windows requires PhantomJS installed and its bin folder added in windows system variable `Path`.
127+
To run the same test suite, run `.\runTest.ps1` instead in powershell with admin right.
128+
The admin right is required to start/stop Neo4j properly as a system service.
101129
While there is no need to grab admin right if you are running tests against an existing Neo4j server using `npm test`.
102130

103131
## A note on numbers and the Integer type
104132
The Neo4j type system includes 64-bit integer values.
105-
However, Javascript can only safely represent integers between `-(2`<sup>`53`</sup>` - 1)` and `(2`<sup>`53`</sup>` - 1)`.
133+
However, Javascript can only safely represent integers between `-(2`<sup>`53`</sup>` - 1)` and `(2`<sup>`53`</sup>` - 1)`.
106134
In order to support the full Neo4j type system, the driver includes an explicit Integer types.
107135
Any time the driver recieves an Integer value from Neo4j, it will be represented with the Integer type by the driver.
108136

109137
### Write integers
110-
Number written directly e.g. `session.run("CREATE (n:Node {age: {age}})", {age: 22})` will be of type `Float` in Neo4j.
138+
Number written directly e.g. `session.run("CREATE (n:Node {age: {age}})", {age: 22})` will be of type `Float` in Neo4j.
111139
To write the `age` as an integer the `neo4j.int` method should be used:
112140

113141
```javascript

src/v1/internal/connector.js

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ class Connection {
190190
// this to the dechunker
191191
self._ch.onmessage = (buf) => {
192192
self._dechunker.write(buf);
193-
}
193+
};
194194

195195
if( buf.hasRemaining() ) {
196196
self._dechunker.write(buf.readSlice( buf.remaining() ));
@@ -219,7 +219,7 @@ class Connection {
219219
}
220220

221221
_handleMessage( msg ) {
222-
222+
223223
switch( msg.signature ) {
224224
case RECORD:
225225
this._currentObserver.onNext( msg.fields[0] );
@@ -234,6 +234,7 @@ class Connection {
234234
case FAILURE:
235235
try {
236236
this._currentObserver.onError( msg );
237+
this._errorMsg = msg;
237238
} finally {
238239
this._currentObserver = this._pendingObservers.shift();
239240
// Things are now broken. Pending observers will get FAILURE messages routed until
@@ -257,6 +258,18 @@ class Connection {
257258
}
258259
}
259260
break;
261+
case IGNORED:
262+
try {
263+
if (this._errorMsg)
264+
this._currentObserver.onError(this._errorMsg);
265+
else
266+
this._currentObserver.onError(msg);
267+
} finally {
268+
this._currentObserver = this._pendingObservers.shift();
269+
}
270+
break;
271+
default:
272+
console.log("UNKNOWN MESSAGE: ", msg);
260273
}
261274
}
262275

src/v1/internal/stream-observer.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616
* See the License for the specific language governing permissions and
1717
* limitations under the License.
1818
*/
19-
20-
/**
19+
20+
/**
2121
* Handles a RUN/PULL_ALL, or RUN/DISCARD_ALL requests, maps the responses
2222
* in a way that a user-provided observer can see these as a clean Stream
2323
* of records.
@@ -106,7 +106,7 @@ class StreamObserver {
106106
if( this._queuedRecords.length > 0 ) {
107107
for (var i = 0; i < _queuedRecords.length; i++) {
108108
observer.onNext( _queuedRecords[i] );
109-
};
109+
}
110110
}
111111
if( this._tail ) {
112112
observer.onCompleted( this._tail );

src/v1/result.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ class Result {
3838
this._p = null;
3939
this._statement = statement;
4040
this._parameters = parameters;
41-
this.summary = {}
41+
this.summary = {};
4242
}
4343

4444
/**
@@ -56,7 +56,7 @@ class Result {
5656
onNext: (record) => { records.push(record); },
5757
onCompleted: () => { resolve(records); },
5858
onError: (error) => { reject(error); }
59-
}
59+
};
6060
self.subscribe(observer);
6161
});
6262
}
@@ -99,7 +99,7 @@ class Result {
9999
let onCompletedWrapper = (metadata) => {
100100
this.summary = new ResultSummary(this._statement, this._parameters, metadata);
101101
onCompletedOriginal.call(observer);
102-
}
102+
};
103103
observer.onCompleted = onCompletedWrapper;
104104
this._streamObserver.subscribe(observer);
105105
}

src/v1/session.js

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* See the License for the specific language governing permissions and
1717
* limitations under the License.
1818
*/
19-
19+
2020
import StreamObserver from './internal/stream-observer';
2121
import Result from './result';
2222
import Transaction from './transaction';
@@ -36,6 +36,7 @@ class Session {
3636
constructor( conn, onClose ) {
3737
this._conn = conn;
3838
this._onClose = onClose;
39+
this._hasTx = false;
3940
}
4041

4142
/**
@@ -52,16 +53,34 @@ class Session {
5253
statement = statement.text;
5354
}
5455
let streamObserver = new StreamObserver();
55-
this._conn.run( statement, parameters || {}, streamObserver );
56-
this._conn.pullAll( streamObserver );
57-
this._conn.sync();
56+
if (!this._hasTx) {
57+
this._conn.run(statement, parameters || {}, streamObserver);
58+
this._conn.pullAll(streamObserver);
59+
this._conn.sync();
60+
} else {
61+
streamObserver.onError({error: "Please close the currently open transaction object before running " +
62+
"more statements/transactions in the current session." });
63+
}
5864
return new Result( streamObserver, statement, parameters );
5965
}
6066

67+
/**
68+
* Begin a new transaction in this session. A session can have at most one transaction running at a time, if you
69+
* want to run multiple concurrent transactions, you should use multiple concurrent sessions.
70+
*
71+
* While a transaction is open the session cannot be used to run statements.
72+
*
73+
* @returns {Transaction} - New Transaction
74+
*/
6175
beginTransaction() {
62-
return new Transaction(this);
76+
if (this._hasTx) {
77+
throw new Error("Cannot have multiple transactions open for the session. Use multiple sessions or close the transaction before opening a new one.")
78+
}
79+
80+
this._hasTx = true;
81+
return new Transaction(this._conn, () => {this._hasTx = false});
6382
}
64-
83+
6584
/**
6685
* Close connection
6786
* @param {function()} cb - Function to be called on connection close
@@ -72,4 +91,5 @@ class Session {
7291
this._conn.close(cb);
7392
}
7493
}
94+
7595
export default Session;

0 commit comments

Comments
 (0)