Skip to content

Commit f443751

Browse files
authored
Ensure Sink.contextView is propagated (#1450)
Context view is propagated via the subscriber, so any nested subscribe calls need to have the context passed through. JAVA-5345
1 parent 6740e9b commit f443751

File tree

4 files changed

+56
-85
lines changed

4 files changed: +56 additions, -85 deletions

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/BatchCursorFlux.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,9 @@
1818

1919
import org.reactivestreams.Publisher;
2020
import org.reactivestreams.Subscriber;
21-
import reactor.core.CoreSubscriber;
2221
import reactor.core.publisher.Flux;
2322
import reactor.core.publisher.FluxSink;
2423
import reactor.core.publisher.Mono;
25-
import reactor.util.context.Context;
2624

2725
import java.util.Objects;
2826
import java.util.concurrent.atomic.AtomicBoolean;
@@ -48,9 +46,9 @@ public void subscribe(final Subscriber<? super T> subscriber) {
4846
if (calculateDemand(demand) > 0 && inProgress.compareAndSet(false, true)) {
4947
if (batchCursor == null) {
5048
int batchSize = calculateBatchSize(sink.requestedFromDownstream());
51-
Context initialContext = subscriber instanceof CoreSubscriber<?>
52-
? ((CoreSubscriber<?>) subscriber).currentContext() : null;
53-
batchCursorPublisher.batchCursor(batchSize).subscribe(bc -> {
49+
batchCursorPublisher.batchCursor(batchSize)
50+
.contextWrite(sink.contextView())
51+
.subscribe(bc -> {
5452
batchCursor = bc;
5553
inProgress.set(false);
5654

@@ -60,7 +58,7 @@ public void subscribe(final Subscriber<? super T> subscriber) {
6058
} else {
6159
recurseCursor();
6260
}
63-
}, sink::error, null, initialContext);
61+
}, sink::error);
6462
} else {
6563
inProgress.set(false);
6664
recurseCursor();
@@ -86,6 +84,7 @@ private void recurseCursor(){
8684
} else {
8785
batchCursor.setBatchSize(calculateBatchSize(sink.requestedFromDownstream()));
8886
Mono.from(batchCursor.next(() -> sink.isCancelled()))
87+
.contextWrite(sink.contextView())
8988
.doOnCancel(this::closeCursor)
9089
.subscribe(results -> {
9190
if (!results.isEmpty()) {

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/BatchCursorPublisher.java

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -123,21 +123,17 @@ public TimeoutMode getTimeoutMode() {
123123

124124
public Publisher<T> first() {
125125
return batchCursor(this::asAsyncFirstReadOperation)
126-
.flatMap(batchCursor -> Mono.create(sink -> {
126+
.flatMap(batchCursor -> {
127127
batchCursor.setBatchSize(1);
128-
Mono.from(batchCursor.next())
128+
return Mono.from(batchCursor.next())
129129
.doOnTerminate(batchCursor::close)
130-
.doOnError(sink::error)
131-
.doOnSuccess(results -> {
130+
.flatMap(results -> {
132131
if (results == null || results.isEmpty()) {
133-
sink.success();
134-
} else {
135-
sink.success(results.get(0));
132+
return Mono.empty();
136133
}
137-
})
138-
.contextWrite(sink.contextView())
139-
.subscribe();
140-
}));
134+
return Mono.fromCallable(() -> results.get(0));
135+
});
136+
});
141137
}
142138

143139
@Override

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/crypt/Crypt.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,7 @@ private void collInfo(final MongoCryptContext cryptContext,
306306
sink.error(new IllegalStateException("Missing database name"));
307307
} else {
308308
collectionInfoRetriever.filter(databaseName, cryptContext.getMongoOperation(), operationTimeout)
309+
.contextWrite(sink.contextView())
309310
.doOnSuccess(result -> {
310311
if (result != null) {
311312
cryptContext.addMongoOperationResult(result);
@@ -328,6 +329,7 @@ private void mark(final MongoCryptContext cryptContext,
328329
sink.error(wrapInClientException(new IllegalStateException("Missing database name")));
329330
} else {
330331
commandMarker.mark(databaseName, cryptContext.getMongoOperation(), operationTimeout)
332+
.contextWrite(sink.contextView())
331333
.doOnSuccess(result -> {
332334
cryptContext.addMongoOperationResult(result);
333335
cryptContext.completeMongoOperation();
@@ -343,6 +345,7 @@ private void fetchKeys(final MongoCryptContext cryptContext,
343345
final MonoSink<RawBsonDocument> sink,
344346
@Nullable final Timeout operationTimeout) {
345347
keyRetriever.find(cryptContext.getMongoOperation(), operationTimeout)
348+
.contextWrite(sink.contextView())
346349
.doOnSuccess(results -> {
347350
for (BsonDocument result : results) {
348351
cryptContext.addMongoOperationResult(result);
@@ -361,11 +364,13 @@ private void decryptKeys(final MongoCryptContext cryptContext,
361364
MongoKeyDecryptor keyDecryptor = cryptContext.nextKeyDecryptor();
362365
if (keyDecryptor != null) {
363366
keyManagementService.decryptKey(keyDecryptor, operationTimeout)
367+
.contextWrite(sink.contextView())
364368
.doOnSuccess(r -> decryptKeys(cryptContext, databaseName, sink, operationTimeout))
365369
.doOnError(e -> sink.error(wrapInClientException(e)))
366370
.subscribe();
367371
} else {
368372
Mono.fromRunnable(cryptContext::completeKeyDecryptors)
373+
.contextWrite(sink.contextView())
369374
.doOnSuccess(r -> executeStateMachineWithSink(cryptContext, databaseName, sink, operationTimeout))
370375
.doOnError(e -> sink.error(wrapInClientException(e)))
371376
.subscribe();

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/gridfs/GridFSUploadPublisherImpl.java

Lines changed: 39 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,6 @@
4040
import java.util.Date;
4141
import java.util.Map;
4242
import java.util.concurrent.atomic.AtomicBoolean;
43-
import java.util.concurrent.atomic.AtomicInteger;
44-
import java.util.concurrent.atomic.AtomicLong;
45-
import java.util.function.Function;
4643

4744
import static com.mongodb.ReadPreference.primary;
4845
import static com.mongodb.assertions.Assertions.notNull;
@@ -106,7 +103,7 @@ public BsonValue getId() {
106103

107104
@Override
108105
public void subscribe(final Subscriber<? super Void> s) {
109-
Mono.defer(() -> {
106+
Mono.deferContextual(ctx -> {
110107
AtomicBoolean terminated = new AtomicBoolean(false);
111108
Timeout timeout = TimeoutContext.startTimeout(timeoutMs);
112109
return createCheckAndCreateIndexesMono(timeout)
@@ -120,7 +117,7 @@ public void subscribe(final Subscriber<? super Void> s) {
120117
return originalError;
121118
})
122119
.then(Mono.error(originalError)))
123-
.doOnCancel(() -> createCancellationMono(terminated, timeout).subscribe())
120+
.doOnCancel(() -> createCancellationMono(terminated, timeout).contextWrite(ctx).subscribe())
124121
.then();
125122
}).subscribe(s);
126123
}
@@ -149,38 +146,15 @@ public void subscribe(final Subscriber<? super ObjectId> subscriber) {
149146
}
150147

151148
private Mono<Void> createCheckAndCreateIndexesMono(@Nullable final Timeout timeout) {
152-
AtomicBoolean collectionExists = new AtomicBoolean(false);
153-
return Mono.create(sink -> findAllInCollection(filesCollection, timeout).subscribe(
154-
d -> collectionExists.set(true),
155-
sink::error,
156-
() -> {
157-
if (collectionExists.get()) {
158-
sink.success();
159-
} else {
160-
checkAndCreateIndex(filesCollection.withReadPreference(primary()), FILES_INDEX, timeout)
161-
.doOnSuccess(i -> checkAndCreateIndex(chunksCollection.withReadPreference(primary()), CHUNKS_INDEX, timeout)
162-
.subscribe(unused -> {}, sink::error, sink::success))
163-
.subscribe(unused -> {}, sink::error);
164-
}
165-
})
166-
);
167-
}
168-
169-
private Mono<Document> findAllInCollection(final MongoCollection<GridFSFile> collection, @Nullable final Timeout timeout) {
170-
return collectionWithTimeoutDeferred(collection
171-
.withDocumentClass(Document.class)
172-
.withReadPreference(primary()), timeout)
173-
.flatMap(wrappedCollection -> {
174-
if (clientSession != null) {
175-
return Mono.from(wrappedCollection.find(clientSession)
176-
.projection(PROJECTION)
177-
.first());
178-
} else {
179-
return Mono.from(wrappedCollection.find()
180-
.projection(PROJECTION)
181-
.first());
182-
}
183-
});
149+
return collectionWithTimeoutDeferred(filesCollection.withDocumentClass(Document.class).withReadPreference(primary()), timeout)
150+
.map(collection -> clientSession != null ? collection.find(clientSession) : collection.find())
151+
.flatMap(findPublisher -> Mono.from(findPublisher.projection(PROJECTION).first()))
152+
.switchIfEmpty(Mono.defer(() ->
153+
checkAndCreateIndex(filesCollection.withReadPreference(primary()), FILES_INDEX, timeout)
154+
.then(checkAndCreateIndex(chunksCollection.withReadPreference(primary()), CHUNKS_INDEX, timeout))
155+
.then(Mono.empty())
156+
))
157+
.then();
184158
}
185159

186160
private <T> Mono<Boolean> hasIndex(final MongoCollection<T> collection, final Document index, @Nullable final Timeout timeout) {
@@ -228,40 +202,37 @@ private <T> Mono<String> createIndexMono(final MongoCollection<T> collection, fi
228202
}
229203

230204
private Mono<Long> createSaveChunksMono(final AtomicBoolean terminated, @Nullable final Timeout timeout) {
231-
return Mono.create(sink -> {
232-
AtomicLong lengthInBytes = new AtomicLong(0);
233-
AtomicInteger chunkIndex = new AtomicInteger(0);
234-
new ResizingByteBufferFlux(source, chunkSizeBytes)
235-
.takeUntilOther(createMonoTimer(timeout))
236-
.flatMap((Function<ByteBuffer, Publisher<InsertOneResult>>) byteBuffer -> {
237-
if (terminated.get()) {
238-
return Mono.empty();
239-
}
240-
byte[] byteArray = new byte[byteBuffer.remaining()];
241-
if (byteBuffer.hasArray()) {
242-
System.arraycopy(byteBuffer.array(), byteBuffer.position(), byteArray, 0, byteBuffer.remaining());
243-
} else {
244-
byteBuffer.mark();
245-
byteBuffer.get(byteArray);
246-
byteBuffer.reset();
247-
}
248-
Binary data = new Binary(byteArray);
249-
lengthInBytes.addAndGet(data.length());
205+
return new ResizingByteBufferFlux(source, chunkSizeBytes)
206+
.takeUntilOther(createMonoTimer(timeout))
207+
.index()
208+
.flatMap(indexAndBuffer -> {
209+
if (terminated.get()) {
210+
return Mono.empty();
211+
}
212+
Long index = indexAndBuffer.getT1();
213+
ByteBuffer byteBuffer = indexAndBuffer.getT2();
214+
byte[] byteArray = new byte[byteBuffer.remaining()];
215+
if (byteBuffer.hasArray()) {
216+
System.arraycopy(byteBuffer.array(), byteBuffer.position(), byteArray, 0, byteBuffer.remaining());
217+
} else {
218+
byteBuffer.mark();
219+
byteBuffer.get(byteArray);
220+
byteBuffer.reset();
221+
}
222+
Binary data = new Binary(byteArray);
250223

251-
Document chunkDocument = new Document("files_id", fileId)
252-
.append("n", chunkIndex.getAndIncrement())
253-
.append("data", data);
224+
Document chunkDocument = new Document("files_id", fileId)
225+
.append("n", index.intValue())
226+
.append("data", data);
254227

255-
if (clientSession == null) {
256-
return collectionWithTimeout(chunksCollection, timeout, TIMEOUT_ERROR_MESSAGE).insertOne(chunkDocument);
257-
} else {
258-
return collectionWithTimeout(chunksCollection, timeout, TIMEOUT_ERROR_MESSAGE).insertOne(clientSession,
259-
chunkDocument);
260-
}
228+
Publisher<InsertOneResult> insertOnePublisher = clientSession == null
229+
? collectionWithTimeout(chunksCollection, timeout, TIMEOUT_ERROR_MESSAGE).insertOne(chunkDocument)
230+
: collectionWithTimeout(chunksCollection, timeout, TIMEOUT_ERROR_MESSAGE)
231+
.insertOne(clientSession, chunkDocument);
261232

262-
})
263-
.subscribe(null, sink::error, () -> sink.success(lengthInBytes.get()));
264-
});
233+
return Mono.from(insertOnePublisher).thenReturn(data.length());
234+
})
235+
.reduce(0L, Long::sum);
265236
}
266237

267238
/**

0 commit comments

Comments (0)