Skip to content

Commit 8c86e30

Browse files
authored
fix(NODE-6630): read all messages in buffer when chunk arrives (#4512)
1 parent f0b8739 commit 8c86e30

File tree

3 files changed

+154
-12
lines changed

3 files changed

+154
-12
lines changed

src/cmap/connection.ts

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import {
2727
MongoNetworkTimeoutError,
2828
MongoOperationTimeoutError,
2929
MongoParseError,
30+
MongoRuntimeError,
3031
MongoServerError,
3132
MongoUnexpectedServerResponseError
3233
} from '../error';
@@ -791,22 +792,41 @@ export class SizedMessageTransform extends Transform {
791792
}
792793

793794
this.bufferPool.append(chunk);
794-
const sizeOfMessage = this.bufferPool.getInt32();
795795

796-
if (sizeOfMessage == null) {
797-
return callback();
798-
}
796+
while (this.bufferPool.length) {
797+
// While there are any bytes in the buffer
799798

800-
if (sizeOfMessage < 0) {
801-
return callback(new MongoParseError(`Invalid message size: ${sizeOfMessage}, too small`));
802-
}
799+
// Try to fetch a size from the top 4 bytes
800+
const sizeOfMessage = this.bufferPool.getInt32();
801+
802+
if (sizeOfMessage == null) {
803+
// Not even an int32's worth of data. Stop the loop; we need more chunks.
804+
break;
805+
}
806+
807+
if (sizeOfMessage < 0) {
808+
// The size in the message has a negative value, this is probably corruption, throw:
809+
return callback(new MongoParseError(`Message size cannot be negative: ${sizeOfMessage}`));
810+
}
803811

804-
if (sizeOfMessage > this.bufferPool.length) {
805-
return callback();
812+
if (sizeOfMessage > this.bufferPool.length) {
813+
// We do not have enough bytes to make a sizeOfMessage chunk
814+
break;
815+
}
816+
817+
// Add a message to the stream
818+
const message = this.bufferPool.read(sizeOfMessage);
819+
820+
if (!this.push(message)) {
821+
// We only subscribe to data events so we should never get backpressure
822+
// if we do, we do not have the handling for it.
823+
return callback(
824+
new MongoRuntimeError(`SizedMessageTransform does not support backpressure`)
825+
);
826+
}
806827
}
807828

808-
const message = this.bufferPool.read(sizeOfMessage);
809-
return callback(null, message);
829+
callback();
810830
}
811831
}
812832

test/integration/connection-monitoring-and-pooling/connection.test.ts

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import {
2222
} from '../../mongodb';
2323
import * as mock from '../../tools/mongodb-mock/index';
2424
import { skipBrokenAuthTestBeforeEachHook } from '../../tools/runner/hooks/configuration';
25-
import { sleep } from '../../tools/utils';
25+
import { processTick, sleep } from '../../tools/utils';
2626
import { assert as test, setupDatabase } from '../shared';
2727

2828
const commonConnectOptions = {
@@ -249,6 +249,54 @@ describe('Connection', function () {
249249
client.connect();
250250
});
251251

252+
describe(
253+
'when a monitoring Connection receives many hellos in one chunk',
254+
{ requires: { topology: 'replicaset', mongodb: '>=4.4' } }, // need to be on a streaming hello version
255+
function () {
256+
let client: MongoClient;
257+
258+
beforeEach(async function () {
259+
// set heartbeatFrequencyMS just so we don't have to wait so long for a hello
260+
client = this.configuration.newClient({}, { heartbeatFrequencyMS: 10 });
261+
});
262+
263+
afterEach(async function () {
264+
await client.close();
265+
});
266+
267+
// In the future we may want to skip processing concatenated heartbeats.
268+
// This test exists to prevent regression of processing many messages inside one chunk.
269+
it(
270+
'processes all of them and emits heartbeats',
271+
{ requires: { topology: 'replicaset', mongodb: '>=4.4' } },
272+
async function () {
273+
let hbSuccess = 0;
274+
client.on('serverHeartbeatSucceeded', () => (hbSuccess += 1));
275+
expect(hbSuccess).to.equal(0);
276+
277+
await client.db().command({ ping: 1 }); // start monitoring.
278+
const monitor = [...client.topology.s.servers.values()][0].monitor;
279+
280+
// @ts-expect-error: accessing private property
281+
const messageStream = monitor.connection.messageStream;
282+
// @ts-expect-error: accessing private property
283+
const socket = monitor.connection.socket;
284+
285+
const [hello] = (await once(messageStream, 'data')) as [Buffer];
286+
287+
const thousandHellos = Array.from({ length: 1000 }, () => [...hello]).flat(1);
288+
289+
// pretend this came from the server
290+
socket.emit('data', Buffer.from(thousandHellos));
291+
292+
// All of the heartbeats will be emitted synchronously in the next tick as the entire chunk is processed.
293+
await processTick();
294+
expect(hbSuccess).to.be.greaterThan(1000);
295+
}
296+
);
297+
}
298+
);
299+
252300
context(
253301
'when a large message is written to the socket',
254302
{ requires: { topology: 'single', auth: 'disabled' } },

test/unit/cmap/connection.test.ts

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { Socket } from 'node:net';
2+
import { Writable } from 'node:stream';
23

34
import { expect } from 'chai';
45
import * as sinon from 'sinon';
@@ -11,7 +12,9 @@ import {
1112
MongoClientAuthProviders,
1213
MongoDBCollectionNamespace,
1314
MongoNetworkTimeoutError,
15+
MongoRuntimeError,
1416
ns,
17+
promiseWithResolvers,
1518
SizedMessageTransform
1619
} from '../../mongodb';
1720
import * as mock from '../../tools/mongodb-mock/index';
@@ -333,5 +336,76 @@ describe('new Connection()', function () {
333336
expect(stream.read(1)).to.deep.equal(Buffer.from([6, 0, 0, 0, 5, 6]));
334337
expect(stream.read(1)).to.equal(null);
335338
});
339+
340+
it('parses many wire messages when a single chunk arrives', function () {
341+
const stream = new SizedMessageTransform({ connection: {} as any });
342+
343+
let dataCount = 0;
344+
stream.on('data', chunk => {
345+
expect(chunk).to.have.lengthOf(8);
346+
dataCount += 1;
347+
});
348+
349+
// 3 messages of size 8
350+
stream.write(
351+
Buffer.from([
352+
...[8, 0, 0, 0, 0, 0, 0, 0],
353+
...[8, 0, 0, 0, 0, 0, 0, 0],
354+
...[8, 0, 0, 0, 0, 0, 0, 0]
355+
])
356+
);
357+
358+
expect(dataCount).to.equal(3);
359+
});
360+
361+
it('parses many wire messages when a single chunk arrives and processes the remaining partial when it is complete', function () {
362+
const stream = new SizedMessageTransform({ connection: {} as any });
363+
364+
let dataCount = 0;
365+
stream.on('data', chunk => {
366+
expect(chunk).to.have.lengthOf(8);
367+
dataCount += 1;
368+
});
369+
370+
// 3 messages of size 8
371+
stream.write(
372+
Buffer.from([
373+
...[8, 0, 0, 0, 0, 0, 0, 0],
374+
...[8, 0, 0, 0, 0, 0, 0, 0],
375+
...[8, 0, 0, 0, 0, 0, 0, 0],
376+
...[8, 0, 0, 0, 0, 0] // two shy of 8
377+
])
378+
);
379+
380+
expect(dataCount).to.equal(3);
381+
382+
stream.write(Buffer.from([0, 0])); // the rest of the last 8
383+
384+
expect(dataCount).to.equal(4);
385+
});
386+
387+
it('throws an error when backpressure detected', async function () {
388+
const stream = new SizedMessageTransform({ connection: {} as any });
389+
const destination = new Writable({
390+
highWaterMark: 1,
391+
objectMode: true,
392+
write: (chunk, encoding, callback) => {
393+
void stream;
394+
setTimeout(1).then(() => callback());
395+
}
396+
});
397+
398+
// 1000 messages of size 8
399+
stream.write(
400+
Buffer.from(Array.from({ length: 1000 }, () => [8, 0, 0, 0, 0, 0, 0, 0]).flat(1))
401+
);
402+
403+
const { promise, resolve, reject } = promiseWithResolvers();
404+
405+
stream.on('error', reject).pipe(destination).on('error', reject).on('finish', resolve);
406+
407+
const error = await promise.catch(error => error);
408+
expect(error).to.be.instanceOf(MongoRuntimeError);
409+
});
336410
});
337411
});

0 commit comments

Comments
 (0)