Skip to content

Commit eff01da

Browse files
imclemrozza
authored andcommitted
Ensure no NPE is thrown with SerializedFluxSink (#1096)
When the FluxSink is an instance of SerializedFluxSink and the results contains a null value, a NullPointerException is thrown and the whole process stops. This is because SerializedFluxSink.next performs a call to Object.requireNonNull(). To fix this issue we filter out null values from the results. JAVA-4908
1 parent dc39ce6 commit eff01da

File tree

2 files changed

+32
-1
lines changed

2 files changed

+32
-1
lines changed

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/BatchCursorFlux.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import reactor.core.publisher.Mono;
2525
import reactor.util.context.Context;
2626

27+
import java.util.Objects;
2728
import java.util.concurrent.atomic.AtomicBoolean;
2829
import java.util.concurrent.atomic.AtomicLong;
2930

@@ -95,7 +96,10 @@ private void recurseCursor(){
9596
})
9697
.doOnSuccess(results -> {
9798
if (results != null) {
98-
results.forEach(sink::next);
99+
results
100+
.stream()
101+
.filter(Objects::nonNull)
102+
.forEach(sink::next);
99103
calculateDemand(-results.size());
100104
}
101105
if (batchCursor.isClosed()) {

driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/internal/BatchCursorFluxTest.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import reactor.core.publisher.Hooks;
4747
import reactor.core.publisher.Mono;
4848

49+
import java.util.Arrays;
4950
import java.util.List;
5051
import java.util.concurrent.atomic.AtomicBoolean;
5152
import java.util.stream.Collectors;
@@ -328,6 +329,32 @@ public void testBatchCursorDoesNotDropAnError() {
328329
}
329330
}
330331

332+
@Test
333+
@DisplayName("Ensure no NPE is thrown on null in result set")
334+
public void testNoNPEOnNull() {
335+
try {
336+
AtomicBoolean errorDropped = new AtomicBoolean();
337+
Hooks.onErrorDropped(t -> errorDropped.set(true));
338+
339+
Document doc = new Document("x", null);
340+
Document doc2 = new Document("x", "hello");
341+
342+
Mono.from(collection.insertMany(Arrays.asList(doc, doc2))).block();
343+
344+
TestSubscriber<String> subscriber = new TestSubscriber<>();
345+
346+
collection.distinct("x", String.class).subscribe(subscriber);
347+
348+
subscriber.requestMore(1);
349+
350+
subscriber.assertReceivedOnNext(Arrays.asList("hello"));
351+
352+
assertFalse(errorDropped.get());
353+
} finally {
354+
Hooks.resetOnErrorDropped();
355+
}
356+
}
357+
331358
@Test
332359
@DisplayName("Ensure BatchCursor reports cursor errors")
333360
@SuppressWarnings("OptionalGetWithoutIsPresent")

0 commit comments

Comments
 (0)