|
18 | 18 | */
|
19 | 19 | import WebSocketChannel from './ch-websocket';
|
20 | 20 | import NodeChannel from './ch-node';
|
21 |
| -import {Dechunker, Chunker} from "./chunking"; |
| 21 | +import {Chunker, Dechunker} from './chunking'; |
22 | 22 | import hasFeature from './features';
|
23 | 23 | import {Packer, Unpacker} from './packstream';
|
24 | 24 | import {alloc} from './buf';
|
25 |
| -import {Node, Relationship, UnboundRelationship, Path, PathSegment} from '../graph-types' |
| 25 | +import {Node, Path, PathSegment, Relationship, UnboundRelationship} from '../graph-types'; |
26 | 26 | import {newError} from './../error';
|
27 | 27 |
|
28 | 28 | let Channel;
|
@@ -201,7 +201,9 @@ class Connection {
|
201 | 201 | this._chunker = new Chunker( channel );
|
202 | 202 | this._packer = new Packer( this._chunker );
|
203 | 203 | this._unpacker = new Unpacker();
|
| 204 | + |
204 | 205 | this._isHandlingFailure = false;
|
| 206 | + this._currentFailure = null; |
205 | 207 |
|
206 | 208 | // Set to true on fatal errors, to get this out of session pool.
|
207 | 209 | this._isBroken = false;
|
@@ -288,54 +290,56 @@ class Connection {
|
288 | 290 | }
|
289 | 291 |
|
290 | 292 | _handleMessage( msg ) {
|
| 293 | + const payload = msg.fields[0]; |
| 294 | + |
291 | 295 | switch( msg.signature ) {
|
292 | 296 | case RECORD:
|
293 |
| - log("S", "RECORD", msg.fields[0]); |
294 |
| - this._currentObserver.onNext( msg.fields[0] ); |
| 297 | + log("S", "RECORD", msg); |
| 298 | + this._currentObserver.onNext( payload ); |
295 | 299 | break;
|
296 | 300 | case SUCCESS:
|
297 |
| - log("S", "SUCCESS", msg.fields[0]); |
| 301 | + log("S", "SUCCESS", msg); |
298 | 302 | try {
|
299 |
| - this._currentObserver.onCompleted( msg.fields[0] ); |
| 303 | + this._currentObserver.onCompleted( payload ); |
300 | 304 | } finally {
|
301 | 305 | this._currentObserver = this._pendingObservers.shift();
|
302 | 306 | }
|
303 | 307 | break;
|
304 | 308 | case FAILURE:
|
305 | 309 | log("S", "FAILURE", msg);
|
306 | 310 | try {
|
307 |
| - this._currentObserver.onError( msg ); |
308 |
| - this._errorMsg = msg; |
| 311 | + this._currentFailure = newError(payload.message, payload.code); |
| 312 | + this._currentObserver.onError( this._currentFailure ); |
309 | 313 | } finally {
|
310 | 314 | this._currentObserver = this._pendingObservers.shift();
|
311 | 315 | // Things are now broken. Pending observers will get FAILURE messages routed until
|
312 | 316 | // We are done handling this failure.
|
313 | 317 | if( !this._isHandlingFailure ) {
|
314 | 318 | this._isHandlingFailure = true;
|
315 |
| - let self = this; |
316 | 319 |
|
317 | 320 | // isHandlingFailure was false, meaning this is the first failure message
|
318 | 321 | // we see from this failure. We may see several others, one for each message
|
319 | 322 | // we had "optimistically" already sent after whatever it was that failed.
|
320 | 323 | // We only want to and need to ACK the first one, which is why we are tracking
|
321 | 324 | // this _isHandlingFailure thing.
|
322 | 325 | this._ackFailure({
|
323 |
| - onNext: NO_OP, |
324 |
| - onError: NO_OP, |
325 |
| - onCompleted: () => { |
326 |
| - self._isHandlingFailure = false; |
327 |
| - } |
| 326 | + onNext: NO_OP, |
| 327 | + onError: NO_OP, |
| 328 | + onCompleted: () => { |
| 329 | + this._isHandlingFailure = false; |
| 330 | + this._currentFailure = null; |
| 331 | + } |
328 | 332 | });
|
329 | 333 | }
|
330 | 334 | }
|
331 | 335 | break;
|
332 | 336 | case IGNORED:
|
333 |
| - log("S", "IGNORED"); |
| 337 | + log("S", "IGNORED", msg); |
334 | 338 | try {
|
335 |
| - if (this._errorMsg && this._currentObserver.onError) |
336 |
| - this._currentObserver.onError(this._errorMsg); |
| 339 | + if (this._currentFailure && this._currentObserver.onError) |
| 340 | + this._currentObserver.onError(this._currentFailure); |
337 | 341 | else if(this._currentObserver.onError)
|
338 |
| - this._currentObserver.onError(msg); |
| 342 | + this._currentObserver.onError(payload); |
339 | 343 | } finally {
|
340 | 344 | this._currentObserver = this._pendingObservers.shift();
|
341 | 345 | }
|
|
0 commit comments