Skip to content

Commit d40a113

Browse files
committed
[fix] Emit at most one event per event loop iteration
Fixes #2216
1 parent b119b41 commit d40a113

File tree

3 files changed

+23
-52
lines changed

3 files changed

+23
-52
lines changed

lib/receiver.js

Lines changed: 3 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,6 @@ const { isValidStatusCode, isValidUTF8 } = require('./validation');
1515
const FastBuffer = Buffer[Symbol.species];
1616
const promise = Promise.resolve();
1717

18-
//
19-
// `queueMicrotask()` is not available in Node.js < 11.
20-
//
21-
const queueTask =
22-
typeof queueMicrotask === 'function' ? queueMicrotask : queueMicrotaskShim;
23-
2418
const GET_INFO = 0;
2519
const GET_PAYLOAD_LENGTH_16 = 1;
2620
const GET_PAYLOAD_LENGTH_64 = 2;
@@ -577,7 +571,7 @@ class Receiver extends Writable {
577571
this._state = GET_INFO;
578572
} else {
579573
this._state = DEFER_EVENT;
580-
queueTask(() => {
574+
setImmediate(() => {
581575
this.emit('message', data, true);
582576
this._state = GET_INFO;
583577
this.startLoop(cb);
@@ -604,7 +598,7 @@ class Receiver extends Writable {
604598
this._state = GET_INFO;
605599
} else {
606600
this._state = DEFER_EVENT;
607-
queueTask(() => {
601+
setImmediate(() => {
608602
this.emit('message', buf, false);
609603
this._state = GET_INFO;
610604
this.startLoop(cb);
@@ -675,7 +669,7 @@ class Receiver extends Writable {
675669
this._state = GET_INFO;
676670
} else {
677671
this._state = DEFER_EVENT;
678-
queueTask(() => {
672+
setImmediate(() => {
679673
this.emit(this._opcode === 0x09 ? 'ping' : 'pong', data);
680674
this._state = GET_INFO;
681675
this.startLoop(cb);
@@ -711,32 +705,3 @@ class Receiver extends Writable {
711705
}
712706

713707
module.exports = Receiver;
714-
715-
/**
716-
* A shim for `queueMicrotask()`.
717-
*
718-
* @param {Function} cb Callback
719-
*/
720-
function queueMicrotaskShim(cb) {
721-
promise.then(cb).catch(throwErrorNextTick);
722-
}
723-
724-
/**
725-
* Throws an error.
726-
*
727-
* @param {Error} err The error to throw
728-
* @private
729-
*/
730-
function throwError(err) {
731-
throw err;
732-
}
733-
734-
/**
735-
* Throws an error in the next tick.
736-
*
737-
* @param {Error} err The error to throw
738-
* @private
739-
*/
740-
function throwErrorNextTick(err) {
741-
process.nextTick(throwError, err);
742-
}

test/receiver.test.js

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1085,17 +1085,21 @@ describe('Receiver', () => {
10851085
receiver.write(Buffer.from([0x88, 0x03, 0x03, 0xe8, 0xf8]));
10861086
});
10871087

1088-
it('emits at most one event per microtask', (done) => {
1088+
it('emits at most one event per event loop iteration', (done) => {
10891089
const actual = [];
10901090
const expected = [
10911091
'1',
1092-
'microtask 1',
1092+
'- 1',
1093+
'-- 1',
10931094
'2',
1094-
'microtask 2',
1095+
'- 2',
1096+
'-- 2',
10951097
'3',
1096-
'microtask 3',
1098+
'- 3',
1099+
'-- 3',
10971100
'4',
1098-
'microtask 4'
1101+
'- 4',
1102+
'-- 4'
10991103
];
11001104

11011105
function listener(data) {
@@ -1104,12 +1108,16 @@ describe('Receiver', () => {
11041108

11051109
// `queueMicrotask()` is not available in Node.js < 11.
11061110
Promise.resolve().then(() => {
1107-
actual.push(`microtask ${message}`);
1111+
actual.push(`- ${message}`);
11081112

1109-
if (actual.length === 8) {
1110-
assert.deepStrictEqual(actual, expected);
1111-
done();
1112-
}
1113+
Promise.resolve().then(() => {
1114+
actual.push(`-- ${message}`);
1115+
1116+
if (actual.length === 12) {
1117+
assert.deepStrictEqual(actual, expected);
1118+
done();
1119+
}
1120+
});
11131121
});
11141122
}
11151123

test/websocket.test.js

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4205,8 +4205,7 @@ describe('WebSocket', () => {
42054205

42064206
if (messages.push(message.toString()) > 1) return;
42074207

4208-
// `queueMicrotask()` is not available in Node.js < 11.
4209-
Promise.resolve().then(() => {
4208+
setImmediate(() => {
42104209
process.nextTick(() => {
42114210
assert.strictEqual(ws._receiver._state, 5);
42124211
ws.close(1000);
@@ -4456,8 +4455,7 @@ describe('WebSocket', () => {
44564455

44574456
if (messages.push(message.toString()) > 1) return;
44584457

4459-
// `queueMicrotask()` is not available in Node.js < 11.
4460-
Promise.resolve().then(() => {
4458+
setImmediate(() => {
44614459
process.nextTick(() => {
44624460
assert.strictEqual(ws._receiver._state, 5);
44634461
ws.terminate();

0 commit comments

Comments
 (0)