Skip to content

Commit 9609890

Browse files
authored
Ensure no NPE is thrown with SerializedFluxSink (#1096)
When the FluxSink is an instance of SerializedFluxSink and the results contains a null value, a NullPointerException is thrown and the whole process stops. This is because SerializedFluxSink.next performs a call to Object.requireNonNull(). To fix this issue we filter out null values from the results. JAVA-4908
1 parent 7bdd1d7 commit 9609890

File tree

2 files changed

+33
-1
lines changed

2 files changed

+33
-1
lines changed

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/BatchCursorFlux.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import reactor.core.publisher.Mono;
2525
import reactor.util.context.Context;
2626

27+
import java.util.Objects;
2728
import java.util.concurrent.atomic.AtomicBoolean;
2829
import java.util.concurrent.atomic.AtomicLong;
2930

@@ -95,7 +96,10 @@ private void recurseCursor(){
9596
})
9697
.doOnSuccess(results -> {
9798
if (results != null) {
98-
results.forEach(sink::next);
99+
results
100+
.stream()
101+
.filter(Objects::nonNull)
102+
.forEach(sink::next);
99103
calculateDemand(-results.size());
100104
}
101105
if (batchCursor.isClosed()) {

driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/internal/BatchCursorFluxTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@
4646
import reactor.core.publisher.Hooks;
4747
import reactor.core.publisher.Mono;
4848

49+
import java.util.Arrays;
50+
import java.util.HashMap;
4951
import java.util.List;
5052
import java.util.concurrent.atomic.AtomicBoolean;
5153
import java.util.stream.Collectors;
@@ -328,6 +330,32 @@ public void testBatchCursorDoesNotDropAnError() {
328330
}
329331
}
330332

333+
@Test
334+
@DisplayName("Ensure no NPE is thrown on null in result set")
335+
public void testNoNPEOnNull() {
336+
try {
337+
AtomicBoolean errorDropped = new AtomicBoolean();
338+
Hooks.onErrorDropped(t -> errorDropped.set(true));
339+
340+
Document doc = new Document("x", null);
341+
Document doc2 = new Document("x", "hello");
342+
343+
Mono.from(collection.insertMany(Arrays.asList(doc, doc2))).block();
344+
345+
TestSubscriber<String> subscriber = new TestSubscriber<>();
346+
347+
collection.distinct("x", String.class).subscribe(subscriber);
348+
349+
subscriber.requestMore(1);
350+
351+
subscriber.assertReceivedOnNext(Arrays.asList("hello"));
352+
353+
assertFalse(errorDropped.get());
354+
} finally {
355+
Hooks.resetOnErrorDropped();
356+
}
357+
}
358+
331359
@Test
332360
@DisplayName("Ensure BatchCursor reports cursor errors")
333361
@SuppressWarnings("OptionalGetWithoutIsPresent")

0 commit comments

Comments
 (0)