Skip to content

Commit 8e91765

Browse files
committed
Ensure reactor context is passed to any child BatchCursor Mono/Flux
JAVA-4788
1 parent bcf31b8 commit 8e91765

26 files changed

+536
-150
lines changed

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/BatchCursorFlux.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818

1919
import org.reactivestreams.Publisher;
2020
import org.reactivestreams.Subscriber;
21+
import reactor.core.CoreSubscriber;
2122
import reactor.core.publisher.Flux;
2223
import reactor.core.publisher.FluxSink;
2324
import reactor.core.publisher.Mono;
25+
import reactor.util.context.Context;
2426

2527
import java.util.concurrent.atomic.AtomicBoolean;
2628
import java.util.concurrent.atomic.AtomicLong;
@@ -45,6 +47,8 @@ public void subscribe(final Subscriber<? super T> subscriber) {
4547
if (calculateDemand(demand) > 0 && inProgress.compareAndSet(false, true)) {
4648
if (batchCursor == null) {
4749
int batchSize = calculateBatchSize(sink.requestedFromDownstream());
50+
Context initialContext = subscriber instanceof CoreSubscriber<?>
51+
? ((CoreSubscriber<?>) subscriber).currentContext() : null;
4852
batchCursorPublisher.batchCursor(batchSize).subscribe(bc -> {
4953
batchCursor = bc;
5054
inProgress.set(false);
@@ -55,7 +59,7 @@ public void subscribe(final Subscriber<? super T> subscriber) {
5559
} else {
5660
recurseCursor();
5761
}
58-
}, sink::error);
62+
}, sink::error, null, initialContext);
5963
} else {
6064
inProgress.set(false);
6165
recurseCursor();

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/BatchCursorPublisher.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.mongodb.MongoNamespace;
2020
import com.mongodb.ReadPreference;
21+
import com.mongodb.internal.VisibleForTesting;
2122
import com.mongodb.internal.async.AsyncBatchCursor;
2223
import com.mongodb.internal.operation.AsyncOperations;
2324
import com.mongodb.internal.operation.AsyncReadOperation;
@@ -32,7 +33,8 @@
3233

3334
import static com.mongodb.assertions.Assertions.notNull;
3435

35-
abstract class BatchCursorPublisher<T> implements Publisher<T> {
36+
@VisibleForTesting(otherwise = VisibleForTesting.AccessModifier.PROTECTED)
37+
public abstract class BatchCursorPublisher<T> implements Publisher<T> {
3638
private final ClientSession clientSession;
3739
private final MongoOperationPublisher<T> mongoOperationPublisher;
3840
private Integer batchSize;
@@ -112,6 +114,7 @@ public Publisher<T> first() {
112114
sink.success(results.get(0));
113115
}
114116
})
117+
.contextWrite(sink.contextView())
115118
.subscribe();
116119
}));
117120
}

driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideEncryptionTest.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@
2525
import org.bson.BsonDocument;
2626
import org.junit.After;
2727

28+
import static com.mongodb.reactivestreams.client.syncadapter.ContextHelper.CONTEXT_PROVIDER;
29+
import static com.mongodb.reactivestreams.client.syncadapter.ContextHelper.assertContextPassedThrough;
30+
2831
public class ClientSideEncryptionTest extends AbstractClientSideEncryptionTest {
2932

3033
private MongoClient mongoClient;
@@ -35,15 +38,23 @@ public ClientSideEncryptionTest(final String filename, final String description,
3538
}
3639

3740
@Override
38-
protected void createMongoClient(final MongoClientSettings mongoClientSettings) {
39-
mongoClient = new SyncMongoClient(MongoClients.create(mongoClientSettings));
41+
protected void createMongoClient(final MongoClientSettings settings) {
42+
mongoClient = new SyncMongoClient(MongoClients.create(
43+
MongoClientSettings.builder(settings).contextProvider(CONTEXT_PROVIDER).build()
44+
));
4045
}
4146

4247
@Override
4348
protected MongoDatabase getDatabase(final String databaseName) {
4449
return mongoClient.getDatabase(databaseName);
4550
}
4651

52+
@Override
53+
public void shouldPassAllOutcomes() {
54+
super.shouldPassAllOutcomes();
55+
assertContextPassedThrough(getDefinition());
56+
}
57+
4758
@After
4859
public void cleanUp() {
4960
if (mongoClient != null) {

driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/CrudTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import org.bson.BsonDocument;
2626

2727
import static com.mongodb.client.Fixture.getMongoClientSettingsBuilder;
28+
import static com.mongodb.reactivestreams.client.syncadapter.ContextHelper.CONTEXT_PROVIDER;
29+
import static com.mongodb.reactivestreams.client.syncadapter.ContextHelper.assertContextPassedThrough;
2830

2931
public class CrudTest extends AbstractCrudTest {
3032

@@ -40,6 +42,7 @@ protected void createMongoClient(final CommandListener commandListener) {
4042
mongoClient = new SyncMongoClient(
4143
MongoClients.create(getMongoClientSettingsBuilder()
4244
.addCommandListener(commandListener)
45+
.contextProvider(CONTEXT_PROVIDER)
4346
.build()));
4447
}
4548

@@ -48,10 +51,17 @@ protected MongoDatabase getDatabase(final String databaseName) {
4851
return mongoClient.getDatabase(databaseName);
4952
}
5053

54+
@Override
55+
public void shouldPassAllOutcomes() {
56+
super.shouldPassAllOutcomes();
57+
assertContextPassedThrough();
58+
}
59+
5160
@Override
5261
public void cleanUp() {
5362
if (mongoClient != null) {
5463
mongoClient.close();
5564
}
5665
}
66+
5767
}

driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ExplainTest.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,37 @@
2020
import com.mongodb.client.AbstractExplainTest;
2121
import com.mongodb.client.MongoClient;
2222
import com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient;
23+
import org.junit.Test;
24+
25+
import static com.mongodb.reactivestreams.client.syncadapter.ContextHelper.CONTEXT_PROVIDER;
26+
import static com.mongodb.reactivestreams.client.syncadapter.ContextHelper.assertContextPassedThrough;
2327

2428
public class ExplainTest extends AbstractExplainTest {
2529
@Override
2630
protected MongoClient createMongoClient(final MongoClientSettings settings) {
27-
return new SyncMongoClient(MongoClients.create(settings));
31+
return new SyncMongoClient(MongoClients.create(
32+
MongoClientSettings.builder(settings).contextProvider(CONTEXT_PROVIDER).build()
33+
));
34+
}
35+
36+
@Test
37+
@Override
38+
public void testExplainOfFind() {
39+
super.testExplainOfFind();
40+
assertContextPassedThrough();
41+
}
42+
43+
@Test
44+
@Override
45+
public void testExplainOfAggregateWithNewResponseStructure() {
46+
super.testExplainOfAggregateWithNewResponseStructure();
47+
assertContextPassedThrough();
48+
}
49+
50+
@Test
51+
@Override
52+
public void testExplainOfAggregateWithOldResponseStructure() {
53+
super.testExplainOfAggregateWithOldResponseStructure();
54+
assertContextPassedThrough();
2855
}
2956
}

driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/MainTransactionsTest.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,25 @@
2727
import org.junit.After;
2828
import org.junit.Before;
2929

30+
import java.util.Arrays;
3031
import java.util.Collections;
3132
import java.util.HashSet;
3233
import java.util.Set;
3334

35+
import static com.mongodb.reactivestreams.client.syncadapter.ContextHelper.CONTEXT_PROVIDER;
36+
import static com.mongodb.reactivestreams.client.syncadapter.ContextHelper.assertContextPassedThrough;
37+
3438
public class MainTransactionsTest extends AbstractMainTransactionsTest {
3539
public static final Set<String> SESSION_CLOSE_TIMING_SENSITIVE_TESTS = new HashSet<>(Collections.singletonList(
3640
"implicit abort"));
41+
public static final Set<String> TESTS_THAT_EXECUTE_NO_OPERATIONS = new HashSet<>(Arrays.asList(
42+
"start with implicit unacknowledged write concern",
43+
"start twice",
44+
"commit after no-op abort",
45+
"abort without start",
46+
"abort directly after no-op commit"
47+
));
48+
3749

3850
public MainTransactionsTest(final String filename, final String description, final String databaseName, final String collectionName,
3951
final BsonArray data, final BsonDocument definition, final boolean skipTest) {
@@ -42,14 +54,24 @@ public MainTransactionsTest(final String filename, final String description, fin
4254

4355
@Override
4456
protected MongoClient createMongoClient(final MongoClientSettings settings) {
45-
return new SyncMongoClient(MongoClients.create(settings));
57+
return new SyncMongoClient(MongoClients.create(
58+
MongoClientSettings.builder(settings).contextProvider(CONTEXT_PROVIDER).build()
59+
));
4660
}
4761

4862
@Override
4963
protected StreamFactoryFactory getStreamFactoryFactory() {
5064
return ClusterFixture.getOverriddenStreamFactoryFactory();
5165
}
5266

67+
@Override
68+
public void shouldPassAllOutcomes() {
69+
super.shouldPassAllOutcomes();
70+
if (!TESTS_THAT_EXECUTE_NO_OPERATIONS.contains(getDescription())) {
71+
assertContextPassedThrough(getDefinition());
72+
}
73+
}
74+
5375
@Before
5476
public void before() {
5577
if (SESSION_CLOSE_TIMING_SENSITIVE_TESTS.contains(getDescription())) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.reactivestreams.client;
18+
19+
20+
import com.mongodb.client.MongoClient;
21+
import com.mongodb.client.MongoCollection;
22+
import com.mongodb.client.MongoDatabase;
23+
import com.mongodb.client.model.Aggregates;
24+
import com.mongodb.client.model.Filters;
25+
import com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient;
26+
import org.bson.BsonDocument;
27+
import org.junit.jupiter.api.AfterEach;
28+
import org.junit.jupiter.api.BeforeEach;
29+
import org.junit.jupiter.api.DisplayName;
30+
import org.junit.jupiter.api.DynamicTest;
31+
import org.junit.jupiter.api.TestFactory;
32+
33+
import java.util.List;
34+
import java.util.stream.Collectors;
35+
36+
import static com.mongodb.client.Fixture.getMongoClientSettingsBuilder;
37+
import static com.mongodb.reactivestreams.client.Fixture.getDefaultDatabaseName;
38+
import static com.mongodb.reactivestreams.client.syncadapter.ContextHelper.CONTEXT_PROVIDER;
39+
import static com.mongodb.reactivestreams.client.syncadapter.ContextHelper.assertContextPassedThrough;
40+
import static java.lang.String.format;
41+
import static java.util.Arrays.asList;
42+
import static java.util.Collections.singletonList;
43+
import static java.util.stream.IntStream.rangeClosed;
44+
import static org.junit.jupiter.api.DynamicTest.dynamicTest;
45+
46+
47+
public class ReactiveContextProviderTest {
48+
private MongoClient mongoClient;
49+
50+
@BeforeEach
51+
public void setup() {
52+
getCollection().insertMany(rangeClosed(1, 11)
53+
.boxed()
54+
.map(i -> BsonDocument.parse(format("{a: %s}", i)))
55+
.collect(Collectors.toList()));
56+
}
57+
58+
@AfterEach
59+
public void tearDown() {
60+
if (mongoClient != null) {
61+
getCollection().drop();
62+
mongoClient.close();
63+
mongoClient = null;
64+
}
65+
}
66+
67+
68+
@SuppressWarnings("deprecation")
69+
@TestFactory
70+
@DisplayName("test context passed through when using first")
71+
List<DynamicTest> testMongoIterableFirstPassesTheContext() {
72+
return asList(
73+
dynamicTest("Aggregate Publisher", () -> {
74+
getCollection().aggregate(singletonList(Aggregates.match(Filters.gt("a", 5)))).first();
75+
assertContextPassedThrough();
76+
}),
77+
dynamicTest("Distinct Publisher", () -> {
78+
getCollection().distinct("a", Integer.class).first();
79+
assertContextPassedThrough();
80+
}),
81+
dynamicTest("Find Publisher", () -> {
82+
getCollection().find().first();
83+
assertContextPassedThrough();
84+
}),
85+
dynamicTest("List Collections Publisher", () -> {
86+
getDatabase().listCollections().first();
87+
assertContextPassedThrough();
88+
}),
89+
dynamicTest("List Databases Publisher", () -> {
90+
getMongoClient().listDatabases().first();
91+
assertContextPassedThrough();
92+
}),
93+
dynamicTest("List Indexes Publisher", () -> {
94+
getCollection().listIndexes().first();
95+
assertContextPassedThrough();
96+
}),
97+
dynamicTest("Map Reduce Publisher", () -> {
98+
getCollection().mapReduce(
99+
"function () { emit('a', this.a) }",
100+
"function (k, v) { return Array.sum(v)}").first();
101+
assertContextPassedThrough();
102+
})
103+
);
104+
}
105+
106+
private MongoClient getMongoClient() {
107+
if (mongoClient == null) {
108+
mongoClient = new SyncMongoClient(
109+
MongoClients.create(getMongoClientSettingsBuilder()
110+
.contextProvider(CONTEXT_PROVIDER)
111+
.build()));
112+
}
113+
return mongoClient;
114+
}
115+
116+
private MongoDatabase getDatabase() {
117+
return getMongoClient().getDatabase(getDefaultDatabaseName());
118+
}
119+
120+
private MongoCollection<BsonDocument> getCollection() {
121+
return getDatabase().getCollection("contextViewRegressionTest", BsonDocument.class);
122+
}
123+
124+
}

driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/RetryableReadsTest.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
import org.bson.BsonDocument;
2525
import org.bson.BsonString;
2626

27+
import static com.mongodb.reactivestreams.client.syncadapter.ContextHelper.CONTEXT_PROVIDER;
28+
import static com.mongodb.reactivestreams.client.syncadapter.ContextHelper.assertContextPassedThrough;
29+
2730
public class RetryableReadsTest extends AbstractRetryableReadsTest {
2831
public RetryableReadsTest(final String filename, final String description, final String databaseName, final String collectionName,
2932
final BsonArray data, final BsonString bucketName, final BsonDocument definition, final boolean skipTest) {
@@ -32,6 +35,14 @@ public RetryableReadsTest(final String filename, final String description, final
3235

3336
@Override
3437
protected MongoClient createMongoClient(final MongoClientSettings settings) {
35-
return new SyncMongoClient(MongoClients.create(settings));
38+
return new SyncMongoClient(MongoClients.create(
39+
MongoClientSettings.builder(settings).contextProvider(CONTEXT_PROVIDER).build()
40+
));
41+
}
42+
43+
@Override
44+
public void shouldPassAllOutcomes() {
45+
super.shouldPassAllOutcomes();
46+
assertContextPassedThrough(getDefinition());
3647
}
3748
}

driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/RetryableWritesTest.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@
2323
import org.bson.BsonArray;
2424
import org.bson.BsonDocument;
2525

26+
import static com.mongodb.reactivestreams.client.syncadapter.ContextHelper.CONTEXT_PROVIDER;
27+
import static com.mongodb.reactivestreams.client.syncadapter.ContextHelper.assertContextPassedThrough;
28+
2629
public class RetryableWritesTest extends AbstractRetryableWritesTest {
2730
public RetryableWritesTest(final String filename, final String description, final String databaseName, final BsonArray data,
2831
final BsonDocument definition, final boolean skipTest) {
@@ -31,6 +34,14 @@ public RetryableWritesTest(final String filename, final String description, fina
3134

3235
@Override
3336
public MongoClient createMongoClient(final MongoClientSettings settings) {
34-
return new SyncMongoClient(MongoClients.create(settings));
37+
return new SyncMongoClient(MongoClients.create(
38+
MongoClientSettings.builder(settings).contextProvider(CONTEXT_PROVIDER).build()
39+
));
40+
}
41+
42+
@Override
43+
public void shouldPassAllOutcomes() {
44+
super.shouldPassAllOutcomes();
45+
assertContextPassedThrough(getDefinition());
3546
}
3647
}

0 commit comments

Comments
 (0)