-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Fix MixedBulkWriteOperation
such that it does not leak MongoWriteConcernWithResponseException
to users
#1051
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -180,32 +180,22 @@ public BulkWriteResult execute(final WriteBinding binding) { | |
logRetryExecute(retryState); | ||
return withSourceAndConnection(binding::getWriteConnectionSource, true, (source, connection) -> { | ||
ConnectionDescription connectionDescription = connection.getDescription(); | ||
int maxWireVersion = connectionDescription.getMaxWireVersion(); | ||
// attach `maxWireVersion` ASAP because it is used to check whether we can retry | ||
retryState.attach(AttachmentKeys.maxWireVersion(), maxWireVersion, true); | ||
BulkWriteTracker bulkWriteTracker = retryState.attachment(AttachmentKeys.bulkWriteTracker()) | ||
.orElseThrow(Assertions::fail); | ||
retryState.attach(AttachmentKeys.maxWireVersion(), connectionDescription.getMaxWireVersion(), true); | ||
SessionContext sessionContext = binding.getSessionContext(); | ||
WriteConcern writeConcern = getAppliedWriteConcern(sessionContext); | ||
if (!retryState.isFirstAttempt() && !isRetryableWrite(retryWrites, writeConcern, source.getServerDescription(), | ||
connectionDescription, sessionContext)) { | ||
RuntimeException prospectiveFailedResult = (RuntimeException) retryState.exception().orElse(null); | ||
retryState.breakAndThrowIfRetryAnd(() -> !(prospectiveFailedResult instanceof MongoWriteConcernWithResponseException)); | ||
bulkWriteTracker.batch().ifPresent(bulkWriteBatch -> { | ||
assertTrue(prospectiveFailedResult instanceof MongoWriteConcernWithResponseException); | ||
bulkWriteBatch.addResult((BsonDocument) ((MongoWriteConcernWithResponseException) prospectiveFailedResult) | ||
.getResponse()); | ||
BulkWriteTracker.attachNext(retryState, bulkWriteBatch); | ||
}); | ||
if (!isRetryableWrite(retryWrites, getAppliedWriteConcern(sessionContext), | ||
source.getServerDescription(), connectionDescription, sessionContext)) { | ||
handleMongoWriteConcernWithResponseException(retryState, true); | ||
} | ||
validateWriteRequests(connectionDescription, bypassDocumentValidation, writeRequests, writeConcern); | ||
if (!bulkWriteTracker.batch().isPresent()) { | ||
if (!retryState.attachment(AttachmentKeys.bulkWriteTracker()).orElseThrow(Assertions::fail).batch().isPresent()) { | ||
BulkWriteTracker.attachNew(retryState, BulkWriteBatch.createBulkWriteBatch(namespace, | ||
source.getServerDescription(), connectionDescription, ordered, writeConcern, | ||
bypassDocumentValidation, retryWrites, writeRequests, sessionContext, comment, variables)); | ||
} | ||
logRetryExecute(retryState); | ||
return executeBulkWriteBatch(retryState, binding, connection, maxWireVersion); | ||
return executeBulkWriteBatch(retryState, binding, connection); | ||
}); | ||
}); | ||
try { | ||
|
@@ -226,33 +216,22 @@ public void executeAsync(final AsyncWriteBinding binding, final SingleResultCall | |
withAsyncSourceAndConnection(binding::getWriteConnectionSource, true, funcCallback, | ||
(source, connection, releasingCallback) -> { | ||
ConnectionDescription connectionDescription = connection.getDescription(); | ||
int maxWireVersion = connectionDescription.getMaxWireVersion(); | ||
// attach `maxWireVersion` ASAP because it is used to check whether we can retry | ||
retryState.attach(AttachmentKeys.maxWireVersion(), maxWireVersion, true); | ||
BulkWriteTracker bulkWriteTracker = retryState.attachment(AttachmentKeys.bulkWriteTracker()) | ||
.orElseThrow(Assertions::fail); | ||
retryState.attach(AttachmentKeys.maxWireVersion(), connectionDescription.getMaxWireVersion(), true); | ||
SessionContext sessionContext = binding.getSessionContext(); | ||
WriteConcern writeConcern = getAppliedWriteConcern(sessionContext); | ||
if (!retryState.isFirstAttempt() && !isRetryableWrite(retryWrites, writeConcern, source.getServerDescription(), | ||
connectionDescription, sessionContext)) { | ||
Throwable prospectiveFailedResult = retryState.exception().orElse(null); | ||
if (retryState.breakAndCompleteIfRetryAnd(() -> | ||
!(prospectiveFailedResult instanceof MongoWriteConcernWithResponseException), releasingCallback)) { | ||
return; | ||
} | ||
bulkWriteTracker.batch().ifPresent(bulkWriteBatch -> { | ||
assertTrue(prospectiveFailedResult instanceof MongoWriteConcernWithResponseException); | ||
bulkWriteBatch.addResult((BsonDocument) ((MongoWriteConcernWithResponseException) prospectiveFailedResult) | ||
.getResponse()); | ||
BulkWriteTracker.attachNext(retryState, bulkWriteBatch); | ||
}); | ||
if (!isRetryableWrite(retryWrites, getAppliedWriteConcern(sessionContext), | ||
source.getServerDescription(), | ||
connectionDescription, sessionContext) | ||
&& handleMongoWriteConcernWithResponseExceptionAsync(retryState, releasingCallback)) { | ||
return; | ||
} | ||
if (validateWriteRequestsAndCompleteIfInvalid(connectionDescription, bypassDocumentValidation, writeRequests, | ||
writeConcern, releasingCallback)) { | ||
return; | ||
} | ||
try { | ||
if (!bulkWriteTracker.batch().isPresent()) { | ||
if (!retryState.attachment(AttachmentKeys.bulkWriteTracker()).orElseThrow(Assertions::fail).batch().isPresent()) { | ||
BulkWriteTracker.attachNew(retryState, BulkWriteBatch.createBulkWriteBatch(namespace, | ||
source.getServerDescription(), connectionDescription, ordered, writeConcern, | ||
bypassDocumentValidation, retryWrites, writeRequests, sessionContext, comment, variables)); | ||
|
@@ -262,17 +241,17 @@ public void executeAsync(final AsyncWriteBinding binding, final SingleResultCall | |
return; | ||
} | ||
logRetryExecute(retryState); | ||
executeBulkWriteBatchAsync(retryState, binding, connection, maxWireVersion, releasingCallback); | ||
executeBulkWriteBatchAsync(retryState, binding, connection, releasingCallback); | ||
}); | ||
}).whenComplete(binding::release); | ||
retryingBulkWrite.get(exceptionTransformingCallback(errorHandlingCallback(callback, LOGGER))); | ||
} | ||
|
||
private BulkWriteResult executeBulkWriteBatch(final RetryState retryState, final WriteBinding binding, final Connection connection, | ||
final int maxWireVersion) { | ||
private BulkWriteResult executeBulkWriteBatch(final RetryState retryState, final WriteBinding binding, final Connection connection) { | ||
BulkWriteTracker currentBulkWriteTracker = retryState.attachment(AttachmentKeys.bulkWriteTracker()) | ||
.orElseThrow(Assertions::fail); | ||
BulkWriteBatch currentBatch = currentBulkWriteTracker.batch().orElseThrow(Assertions::fail); | ||
int maxWireVersion = connection.getDescription().getMaxWireVersion(); | ||
stIncMale marked this conversation as resolved.
Show resolved
Hide resolved
|
||
while (currentBatch.shouldProcessBatch()) { | ||
try { | ||
BsonDocument result = executeCommand(connection, currentBatch, binding); | ||
|
@@ -292,9 +271,10 @@ private BulkWriteResult executeBulkWriteBatch(final RetryState retryState, final | |
currentBulkWriteTracker = BulkWriteTracker.attachNext(retryState, currentBatch); | ||
currentBatch = currentBulkWriteTracker.batch().orElseThrow(Assertions::fail); | ||
} catch (MongoException exception) { | ||
if (!(retryState.isFirstAttempt() || (exception instanceof MongoWriteConcernWithResponseException))) { | ||
if (!retryState.isFirstAttempt() && !(exception instanceof MongoWriteConcernWithResponseException)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Changed this because I found the new form easier to understand (I was trying to understand what's going on, not very successfully, though). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should there be a separate catch for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Currently we have } catch (MongoException exception) {
if (!retryState.isFirstAttempt() && !(exception instanceof MongoWriteConcernWithResponseException)) {
addRetryableWriteErrorLabel(exception, maxWireVersion);
}
handleMongoWriteConcernWithResponseException(retryState, false);
throw exception;
} with the change you propose we will have } catch (MongoWriteConcernWithResponseException exception) {
handleMongoWriteConcernWithResponseException(retryState, false);
throw exception;
} catch (MongoException exception) {
if (!retryState.isFirstAttempt()) {
addRetryableWriteErrorLabel(exception, maxWireVersion);
}
handleMongoWriteConcernWithResponseException(retryState, false);
throw exception;
} The current code is shorter and does not have code duplication, the proposed code does not have the |
||
addRetryableWriteErrorLabel(exception, maxWireVersion); | ||
} | ||
handleMongoWriteConcernWithResponseException(retryState, false); | ||
throw exception; | ||
} | ||
} | ||
|
@@ -307,13 +287,14 @@ private BulkWriteResult executeBulkWriteBatch(final RetryState retryState, final | |
} | ||
|
||
private void executeBulkWriteBatchAsync(final RetryState retryState, final AsyncWriteBinding binding, final AsyncConnection connection, | ||
final int maxWireVersion, final SingleResultCallback<BulkWriteResult> callback) { | ||
final SingleResultCallback<BulkWriteResult> callback) { | ||
LoopState loopState = new LoopState(); | ||
AsyncCallbackRunnable loop = new AsyncCallbackLoop(loopState, iterationCallback -> { | ||
BulkWriteTracker currentBulkWriteTracker = retryState.attachment(AttachmentKeys.bulkWriteTracker()) | ||
.orElseThrow(Assertions::fail); | ||
loopState.attach(AttachmentKeys.bulkWriteTracker(), currentBulkWriteTracker, true); | ||
BulkWriteBatch currentBatch = currentBulkWriteTracker.batch().orElseThrow(Assertions::fail); | ||
int maxWireVersion = connection.getDescription().getMaxWireVersion(); | ||
if (loopState.breakAndCompleteIf(() -> !currentBatch.shouldProcessBatch(), iterationCallback)) { | ||
return; | ||
} | ||
|
@@ -340,9 +321,12 @@ private void executeBulkWriteBatchAsync(final RetryState retryState, final Async | |
} else { | ||
if (t instanceof MongoException) { | ||
MongoException exception = (MongoException) t; | ||
if (!(retryState.isFirstAttempt() || (exception instanceof MongoWriteConcernWithResponseException))) { | ||
if (!retryState.isFirstAttempt() && !(exception instanceof MongoWriteConcernWithResponseException)) { | ||
addRetryableWriteErrorLabel(exception, maxWireVersion); | ||
} | ||
if (handleMongoWriteConcernWithResponseExceptionAsync(retryState, null)) { | ||
return; | ||
} | ||
} | ||
iterationCallback.onResult(null, t); | ||
} | ||
|
@@ -368,6 +352,41 @@ private void executeBulkWriteBatchAsync(final RetryState retryState, final Async | |
}); | ||
} | ||
|
||
private void handleMongoWriteConcernWithResponseException(final RetryState retryState, final boolean breakAndThrowIfDifferent) { | ||
if (!retryState.isFirstAttempt()) { | ||
RuntimeException prospectiveFailedResult = (RuntimeException) retryState.exception().orElse(null); | ||
boolean prospectiveResultIsWriteConcernException = prospectiveFailedResult instanceof MongoWriteConcernWithResponseException; | ||
retryState.breakAndThrowIfRetryAnd(() -> breakAndThrowIfDifferent && !prospectiveResultIsWriteConcernException); | ||
if (prospectiveResultIsWriteConcernException) { | ||
retryState.attachment(AttachmentKeys.bulkWriteTracker()).orElseThrow(Assertions::fail) | ||
.batch().ifPresent(bulkWriteBatch -> { | ||
bulkWriteBatch.addResult( | ||
(BsonDocument) ((MongoWriteConcernWithResponseException) prospectiveFailedResult).getResponse()); | ||
BulkWriteTracker.attachNext(retryState, bulkWriteBatch); | ||
}); | ||
} | ||
} | ||
} | ||
|
||
private boolean handleMongoWriteConcernWithResponseExceptionAsync(final RetryState retryState, | ||
@Nullable final SingleResultCallback<BulkWriteResult> callback) { | ||
if (!retryState.isFirstAttempt()) { | ||
RuntimeException prospectiveFailedResult = (RuntimeException) retryState.exception().orElse(null); | ||
boolean prospectiveResultIsWriteConcernException = prospectiveFailedResult instanceof MongoWriteConcernWithResponseException; | ||
if (callback != null && retryState.breakAndCompleteIfRetryAnd(() -> !prospectiveResultIsWriteConcernException, callback)) { | ||
return true; | ||
} | ||
if (prospectiveResultIsWriteConcernException) { | ||
retryState.attachment(AttachmentKeys.bulkWriteTracker()).orElseThrow(Assertions::fail) | ||
.batch().ifPresent(bulkWriteBatch -> { | ||
bulkWriteBatch.addResult( | ||
(BsonDocument) ((MongoWriteConcernWithResponseException) prospectiveFailedResult).getResponse()); | ||
BulkWriteTracker.attachNext(retryState, bulkWriteBatch); | ||
}); | ||
} | ||
} | ||
return false; | ||
} | ||
|
||
private BsonDocument executeCommand(final Connection connection, final BulkWriteBatch batch, final WriteBinding binding) { | ||
return connection.command(namespace.getDatabaseName(), batch.getCommand(), NO_OP_FIELD_NAME_VALIDATOR, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
/* | ||
* Copyright 2008-present MongoDB, Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.mongodb.reactivestreams.client; | ||
|
||
import com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient; | ||
import org.junit.Test; | ||
|
||
/** | ||
* See {@link com.mongodb.client.MongoWriteConcernWithResponseExceptionTest}. | ||
*/ | ||
public class MongoWriteConcernWithResponseExceptionTest { | ||
@Test | ||
public void doesNotLeak() throws InterruptedException { | ||
com.mongodb.client.MongoWriteConcernWithResponseExceptionTest.doesNotLeak( | ||
mongoClientSettings -> new SyncMongoClient(MongoClients.create(mongoClientSettings))); | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.