Ensure Sink.contextView is propagated #1450
Changes from all commits: 30f3de9, bce9f74, 12083ad, 905202f, 96da0b8, 55cdbf7, 57dbf6a, 2d0db20, 38109c9, cc75540
@@ -40,9 +40,6 @@
 import java.util.Date;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Function;

 import static com.mongodb.ReadPreference.primary;
 import static com.mongodb.assertions.Assertions.notNull;
@@ -106,7 +103,7 @@ public BsonValue getId() {

     @Override
     public void subscribe(final Subscriber<? super Void> s) {
-        Mono.defer(() -> {
+        Mono.deferContextual(ctx -> {
             AtomicBoolean terminated = new AtomicBoolean(false);
             Timeout timeout = TimeoutContext.startTimeout(timeoutMs);
             return createCheckAndCreateIndexesMono(timeout)

@@ -120,7 +117,7 @@ public void subscribe(final Subscriber<? super Void> s) {
                                 return originalError;
                             })
                             .then(Mono.error(originalError)))
-                    .doOnCancel(() -> createCancellationMono(terminated, timeout).subscribe())
+                    .doOnCancel(() -> createCancellationMono(terminated, timeout).contextWrite(ctx).subscribe())
Review comments on this line:
- "Cancellation has a side effect - so we need to propagate context there."
- "Isn't us calling …"
- "On cancellation of the main … The user may have tracing and want to trace from the web app to the driver. Without adding the context here we break that chain and they cannot tie the cleanup operations to a web request."
                    .then();
         }).subscribe(s);
     }

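The sketch below illustrates the behaviour the reviewers describe: a doOnCancel callback starts a brand-new subscription for the cleanup work, and that subscription does not inherit the outer subscriber's Reactor Context unless the captured ContextView is written back with contextWrite(ctx). The class name, the cleanup() helper and the "traceId" key are hypothetical stand-ins for illustration (assumes Reactor 3.4+), not driver code.

import reactor.core.publisher.Mono;

public final class ContextPropagationSketch {

    // Hypothetical cleanup operation that reads a trace id from the subscriber Context.
    static Mono<Void> cleanup() {
        return Mono.deferContextual(ctx -> {
            System.out.println("cleanup traceId=" + ctx.getOrDefault("traceId", "<missing>"));
            return Mono.empty();
        });
    }

    public static void main(String[] args) {
        Mono<Void> operation = Mono.deferContextual(ctx ->
                Mono.never()
                        // doOnCancel starts a *new* subscription for the cleanup work,
                        // which does not inherit the outer subscriber's Context on its own...
                        .doOnCancel(() -> cleanup()
                                // ...so the ContextView captured by deferContextual is re-applied here.
                                .contextWrite(ctx)
                                .subscribe())
                        .then());

        operation
                .contextWrite(ctx -> ctx.put("traceId", "abc-123")) // e.g. set by a tracing framework
                .subscribe()
                .dispose(); // cancels the operation, triggering doOnCancel
    }
}

Run as written, this prints "cleanup traceId=abc-123"; remove the contextWrite(ctx) call and it prints "cleanup traceId=<missing>", which is the broken tracing chain the review comment warns about.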
@@ -149,38 +146,15 @@ public void subscribe(final Subscriber<? super ObjectId> subscriber) {
     }

     private Mono<Void> createCheckAndCreateIndexesMono(@Nullable final Timeout timeout) {
-        AtomicBoolean collectionExists = new AtomicBoolean(false);
-        return Mono.create(sink -> findAllInCollection(filesCollection, timeout).subscribe(
-                d -> collectionExists.set(true),
-                sink::error,
-                () -> {
-                    if (collectionExists.get()) {
-                        sink.success();
-                    } else {
-                        checkAndCreateIndex(filesCollection.withReadPreference(primary()), FILES_INDEX, timeout)
-                                .doOnSuccess(i -> checkAndCreateIndex(chunksCollection.withReadPreference(primary()), CHUNKS_INDEX, timeout)
-                                        .subscribe(unused -> {}, sink::error, sink::success))
-                                .subscribe(unused -> {}, sink::error);
-                    }
-                })
-        );
-    }
-
-    private Mono<Document> findAllInCollection(final MongoCollection<GridFSFile> collection, @Nullable final Timeout timeout) {
-        return collectionWithTimeoutDeferred(collection
-                .withDocumentClass(Document.class)
-                .withReadPreference(primary()), timeout)
-                .flatMap(wrappedCollection -> {
-                    if (clientSession != null) {
-                        return Mono.from(wrappedCollection.find(clientSession)
-                                .projection(PROJECTION)
-                                .first());
-                    } else {
-                        return Mono.from(wrappedCollection.find()
-                                .projection(PROJECTION)
-                                .first());
-                    }
-                });
+        return collectionWithTimeoutDeferred(filesCollection.withDocumentClass(Document.class).withReadPreference(primary()), timeout)
+                .map(collection -> clientSession != null ? collection.find(clientSession) : collection.find())
+                .flatMap(findPublisher -> Mono.from(findPublisher.projection(PROJECTION).first()))
+                .switchIfEmpty(Mono.defer(() ->
+                        checkAndCreateIndex(filesCollection.withReadPreference(primary()), FILES_INDEX, timeout)
+                                .then(checkAndCreateIndex(chunksCollection.withReadPreference(primary()), CHUNKS_INDEX, timeout))
+                                .then(Mono.empty())
+                ))
+                .then();
     }

     private <T> Mono<Boolean> hasIndex(final MongoCollection<T> collection, final Document index, @Nullable final Timeout timeout) {

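For context on why the old Mono.create shape was a problem: an inner Publisher subscribed with a bare subscribe(...) inside the create callback starts from an empty Context, so values written downstream (for example by a tracing framework) never reach it unless they are re-applied explicitly, e.g. from the sink's contextView(). The rewritten method avoids the issue by staying in one flat chain. A minimal sketch of that difference, using a hypothetical readTraceId() helper and "traceId" key (not driver code):

import reactor.core.publisher.Mono;

public final class SinkContextSketch {

    // Hypothetical helper: reads "traceId" from the subscriber Context, with a fallback.
    static Mono<String> readTraceId() {
        return Mono.deferContextual(ctx -> Mono.just(ctx.getOrDefault("traceId", "<missing>")));
    }

    public static void main(String[] args) {
        // Old shape: the nested subscribe(...) inside Mono.create starts from an empty
        // Context, so the traceId written below never reaches it.
        Mono.<Void>create(sink ->
                readTraceId().subscribe(
                        traceId -> System.out.println("nested subscribe sees " + traceId),
                        sink::error,
                        sink::success))
                .contextWrite(ctx -> ctx.put("traceId", "abc-123"))
                .block(); // prints "nested subscribe sees <missing>"

        // New shape: one flat chain, so the Context flows to every operator.
        readTraceId()
                .doOnNext(traceId -> System.out.println("flat chain sees " + traceId))
                .contextWrite(ctx -> ctx.put("traceId", "abc-123"))
                .block(); // prints "flat chain sees abc-123"
    }
}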
@@ -228,40 +202,37 @@ private <T> Mono<String> createIndexMono(final MongoCollection<T> collection, fi
     }

     private Mono<Long> createSaveChunksMono(final AtomicBoolean terminated, @Nullable final Timeout timeout) {
-        return Mono.create(sink -> {
-
-            AtomicLong lengthInBytes = new AtomicLong(0);
-            AtomicInteger chunkIndex = new AtomicInteger(0);
-            new ResizingByteBufferFlux(source, chunkSizeBytes)
-                    .takeUntilOther(createMonoTimer(timeout))
-                    .flatMap((Function<ByteBuffer, Publisher<InsertOneResult>>) byteBuffer -> {
-                        if (terminated.get()) {
-                            return Mono.empty();
-                        }
-                        byte[] byteArray = new byte[byteBuffer.remaining()];
-                        if (byteBuffer.hasArray()) {
-                            System.arraycopy(byteBuffer.array(), byteBuffer.position(), byteArray, 0, byteBuffer.remaining());
-                        } else {
-                            byteBuffer.mark();
-                            byteBuffer.get(byteArray);
-                            byteBuffer.reset();
-                        }
-                        Binary data = new Binary(byteArray);
-                        lengthInBytes.addAndGet(data.length());
+        return new ResizingByteBufferFlux(source, chunkSizeBytes)
+                .takeUntilOther(createMonoTimer(timeout))
+                .index()
+                .flatMap(indexAndBuffer -> {
+                    if (terminated.get()) {
+                        return Mono.empty();
+                    }
+                    Long index = indexAndBuffer.getT1();
+                    ByteBuffer byteBuffer = indexAndBuffer.getT2();
+                    byte[] byteArray = new byte[byteBuffer.remaining()];
+                    if (byteBuffer.hasArray()) {
+                        System.arraycopy(byteBuffer.array(), byteBuffer.position(), byteArray, 0, byteBuffer.remaining());
+                    } else {
+                        byteBuffer.mark();
+                        byteBuffer.get(byteArray);
+                        byteBuffer.reset();
+                    }
+                    Binary data = new Binary(byteArray);

-                        Document chunkDocument = new Document("files_id", fileId)
-                                .append("n", chunkIndex.getAndIncrement())
-                                .append("data", data);
+                    Document chunkDocument = new Document("files_id", fileId)
+                            .append("n", index.intValue())
+                            .append("data", data);

-                        if (clientSession == null) {
-                            return collectionWithTimeout(chunksCollection, timeout, TIMEOUT_ERROR_MESSAGE).insertOne(chunkDocument);
-                        } else {
-                            return collectionWithTimeout(chunksCollection, timeout, TIMEOUT_ERROR_MESSAGE).insertOne(clientSession,
-                                    chunkDocument);
-                        }
-                    })
-                    .subscribe(null, sink::error, () -> sink.success(lengthInBytes.get()));
-        });
+                    Publisher<InsertOneResult> insertOnePublisher = clientSession == null
+                            ? collectionWithTimeout(chunksCollection, timeout, TIMEOUT_ERROR_MESSAGE).insertOne(chunkDocument)
+                            : collectionWithTimeout(chunksCollection, timeout, TIMEOUT_ERROR_MESSAGE)
+                                    .insertOne(clientSession, chunkDocument);
+
+                    return Mono.from(insertOnePublisher).thenReturn(data.length());
+                })
+                .reduce(0L, Long::sum);
     }

     /**
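The createSaveChunksMono rewrite swaps the shared AtomicInteger/AtomicLong counters for Flux.index(), which pairs each buffer with its ordinal, and reduce(0L, Long::sum), which accumulates the total length inside the chain. A standalone sketch of that pattern follows; the class name, chunk sizes and printed output are made up for illustration:

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public final class IndexAndReduceSketch {

    public static void main(String[] args) {
        // Stand-in for the chunked byte stream: three "chunks" of different sizes.
        Flux<byte[]> chunks = Flux.just(new byte[255], new byte[255], new byte[10]);

        // index() numbers each chunk (replacing a shared AtomicInteger);
        // reduce() sums the per-chunk lengths (replacing a shared AtomicLong).
        Mono<Long> totalLength = chunks
                .index()
                .flatMap(indexAndChunk -> {
                    long n = indexAndChunk.getT1();       // the chunk's "n" ordinal
                    byte[] data = indexAndChunk.getT2();
                    System.out.println("would insert chunk n=" + n + " size=" + data.length);
                    return Mono.just((long) data.length); // stand-in for insertOne(...).thenReturn(length)
                })
                .reduce(0L, Long::sum);

        System.out.println("total bytes = " + totalLength.block()); // 520
    }
}

Because the counter and the running total live inside the operator chain, each subscription gets fresh state and no context-losing inner subscribe(...) is needed.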