Skip to content

Fix MixedBulkWriteOperation such that it does not leak MongoWriteConcernWithResponseException to users #1051

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Nov 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -180,32 +180,22 @@ public BulkWriteResult execute(final WriteBinding binding) {
logRetryExecute(retryState);
return withSourceAndConnection(binding::getWriteConnectionSource, true, (source, connection) -> {
ConnectionDescription connectionDescription = connection.getDescription();
int maxWireVersion = connectionDescription.getMaxWireVersion();
// attach `maxWireVersion` ASAP because it is used to check whether we can retry
retryState.attach(AttachmentKeys.maxWireVersion(), maxWireVersion, true);
BulkWriteTracker bulkWriteTracker = retryState.attachment(AttachmentKeys.bulkWriteTracker())
.orElseThrow(Assertions::fail);
retryState.attach(AttachmentKeys.maxWireVersion(), connectionDescription.getMaxWireVersion(), true);
SessionContext sessionContext = binding.getSessionContext();
WriteConcern writeConcern = getAppliedWriteConcern(sessionContext);
if (!retryState.isFirstAttempt() && !isRetryableWrite(retryWrites, writeConcern, source.getServerDescription(),
connectionDescription, sessionContext)) {
RuntimeException prospectiveFailedResult = (RuntimeException) retryState.exception().orElse(null);
retryState.breakAndThrowIfRetryAnd(() -> !(prospectiveFailedResult instanceof MongoWriteConcernWithResponseException));
bulkWriteTracker.batch().ifPresent(bulkWriteBatch -> {
assertTrue(prospectiveFailedResult instanceof MongoWriteConcernWithResponseException);
bulkWriteBatch.addResult((BsonDocument) ((MongoWriteConcernWithResponseException) prospectiveFailedResult)
.getResponse());
BulkWriteTracker.attachNext(retryState, bulkWriteBatch);
});
if (!isRetryableWrite(retryWrites, getAppliedWriteConcern(sessionContext),
source.getServerDescription(), connectionDescription, sessionContext)) {
handleMongoWriteConcernWithResponseException(retryState, true);
}
validateWriteRequests(connectionDescription, bypassDocumentValidation, writeRequests, writeConcern);
if (!bulkWriteTracker.batch().isPresent()) {
if (!retryState.attachment(AttachmentKeys.bulkWriteTracker()).orElseThrow(Assertions::fail).batch().isPresent()) {
BulkWriteTracker.attachNew(retryState, BulkWriteBatch.createBulkWriteBatch(namespace,
source.getServerDescription(), connectionDescription, ordered, writeConcern,
bypassDocumentValidation, retryWrites, writeRequests, sessionContext, comment, variables));
}
logRetryExecute(retryState);
return executeBulkWriteBatch(retryState, binding, connection, maxWireVersion);
return executeBulkWriteBatch(retryState, binding, connection);
});
});
try {
Expand All @@ -226,33 +216,22 @@ public void executeAsync(final AsyncWriteBinding binding, final SingleResultCall
withAsyncSourceAndConnection(binding::getWriteConnectionSource, true, funcCallback,
(source, connection, releasingCallback) -> {
ConnectionDescription connectionDescription = connection.getDescription();
int maxWireVersion = connectionDescription.getMaxWireVersion();
// attach `maxWireVersion` ASAP because it is used to check whether we can retry
retryState.attach(AttachmentKeys.maxWireVersion(), maxWireVersion, true);
BulkWriteTracker bulkWriteTracker = retryState.attachment(AttachmentKeys.bulkWriteTracker())
.orElseThrow(Assertions::fail);
retryState.attach(AttachmentKeys.maxWireVersion(), connectionDescription.getMaxWireVersion(), true);
SessionContext sessionContext = binding.getSessionContext();
WriteConcern writeConcern = getAppliedWriteConcern(sessionContext);
if (!retryState.isFirstAttempt() && !isRetryableWrite(retryWrites, writeConcern, source.getServerDescription(),
connectionDescription, sessionContext)) {
Throwable prospectiveFailedResult = retryState.exception().orElse(null);
if (retryState.breakAndCompleteIfRetryAnd(() ->
!(prospectiveFailedResult instanceof MongoWriteConcernWithResponseException), releasingCallback)) {
return;
}
bulkWriteTracker.batch().ifPresent(bulkWriteBatch -> {
assertTrue(prospectiveFailedResult instanceof MongoWriteConcernWithResponseException);
bulkWriteBatch.addResult((BsonDocument) ((MongoWriteConcernWithResponseException) prospectiveFailedResult)
.getResponse());
BulkWriteTracker.attachNext(retryState, bulkWriteBatch);
});
if (!isRetryableWrite(retryWrites, getAppliedWriteConcern(sessionContext),
source.getServerDescription(),
connectionDescription, sessionContext)
&& handleMongoWriteConcernWithResponseExceptionAsync(retryState, releasingCallback)) {
return;
}
if (validateWriteRequestsAndCompleteIfInvalid(connectionDescription, bypassDocumentValidation, writeRequests,
writeConcern, releasingCallback)) {
return;
}
try {
if (!bulkWriteTracker.batch().isPresent()) {
if (!retryState.attachment(AttachmentKeys.bulkWriteTracker()).orElseThrow(Assertions::fail).batch().isPresent()) {
BulkWriteTracker.attachNew(retryState, BulkWriteBatch.createBulkWriteBatch(namespace,
source.getServerDescription(), connectionDescription, ordered, writeConcern,
bypassDocumentValidation, retryWrites, writeRequests, sessionContext, comment, variables));
Expand All @@ -262,17 +241,17 @@ public void executeAsync(final AsyncWriteBinding binding, final SingleResultCall
return;
}
logRetryExecute(retryState);
executeBulkWriteBatchAsync(retryState, binding, connection, maxWireVersion, releasingCallback);
executeBulkWriteBatchAsync(retryState, binding, connection, releasingCallback);
});
}).whenComplete(binding::release);
retryingBulkWrite.get(exceptionTransformingCallback(errorHandlingCallback(callback, LOGGER)));
}

private BulkWriteResult executeBulkWriteBatch(final RetryState retryState, final WriteBinding binding, final Connection connection,
final int maxWireVersion) {
private BulkWriteResult executeBulkWriteBatch(final RetryState retryState, final WriteBinding binding, final Connection connection) {
BulkWriteTracker currentBulkWriteTracker = retryState.attachment(AttachmentKeys.bulkWriteTracker())
.orElseThrow(Assertions::fail);
BulkWriteBatch currentBatch = currentBulkWriteTracker.batch().orElseThrow(Assertions::fail);
int maxWireVersion = connection.getDescription().getMaxWireVersion();
while (currentBatch.shouldProcessBatch()) {
try {
BsonDocument result = executeCommand(connection, currentBatch, binding);
Expand All @@ -292,9 +271,10 @@ private BulkWriteResult executeBulkWriteBatch(final RetryState retryState, final
currentBulkWriteTracker = BulkWriteTracker.attachNext(retryState, currentBatch);
currentBatch = currentBulkWriteTracker.batch().orElseThrow(Assertions::fail);
} catch (MongoException exception) {
if (!(retryState.isFirstAttempt() || (exception instanceof MongoWriteConcernWithResponseException))) {
if (!retryState.isFirstAttempt() && !(exception instanceof MongoWriteConcernWithResponseException)) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed this because I found the new form easier to understand (I was trying to understand what's going on, not very successfully, though).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should there be a separate catch for MongoWriteConcernWithResponseException to make this easier to understand?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently we have

} catch (MongoException exception) {
    if (!retryState.isFirstAttempt() && !(exception instanceof MongoWriteConcernWithResponseException)) {
        addRetryableWriteErrorLabel(exception, maxWireVersion);
    }
    handleMongoWriteConcernWithResponseException(retryState, false);
    throw exception;
}

with the change you propose we will have

} catch (MongoWriteConcernWithResponseException exception) {
    handleMongoWriteConcernWithResponseException(retryState, false);
    throw exception;
} catch (MongoException exception) {
    if (!retryState.isFirstAttempt()) {
        addRetryableWriteErrorLabel(exception, maxWireVersion);
    }
    handleMongoWriteConcernWithResponseException(retryState, false);
    throw exception;
}

The current code is shorter and does not have code duplication, the proposed code does not have the instanceof check. To me the current approach seems the lesser of two evils.

addRetryableWriteErrorLabel(exception, maxWireVersion);
}
handleMongoWriteConcernWithResponseException(retryState, false);
throw exception;
}
}
Expand All @@ -307,13 +287,14 @@ private BulkWriteResult executeBulkWriteBatch(final RetryState retryState, final
}

private void executeBulkWriteBatchAsync(final RetryState retryState, final AsyncWriteBinding binding, final AsyncConnection connection,
final int maxWireVersion, final SingleResultCallback<BulkWriteResult> callback) {
final SingleResultCallback<BulkWriteResult> callback) {
LoopState loopState = new LoopState();
AsyncCallbackRunnable loop = new AsyncCallbackLoop(loopState, iterationCallback -> {
BulkWriteTracker currentBulkWriteTracker = retryState.attachment(AttachmentKeys.bulkWriteTracker())
.orElseThrow(Assertions::fail);
loopState.attach(AttachmentKeys.bulkWriteTracker(), currentBulkWriteTracker, true);
BulkWriteBatch currentBatch = currentBulkWriteTracker.batch().orElseThrow(Assertions::fail);
int maxWireVersion = connection.getDescription().getMaxWireVersion();
if (loopState.breakAndCompleteIf(() -> !currentBatch.shouldProcessBatch(), iterationCallback)) {
return;
}
Expand All @@ -340,9 +321,12 @@ private void executeBulkWriteBatchAsync(final RetryState retryState, final Async
} else {
if (t instanceof MongoException) {
MongoException exception = (MongoException) t;
if (!(retryState.isFirstAttempt() || (exception instanceof MongoWriteConcernWithResponseException))) {
if (!retryState.isFirstAttempt() && !(exception instanceof MongoWriteConcernWithResponseException)) {
addRetryableWriteErrorLabel(exception, maxWireVersion);
}
if (handleMongoWriteConcernWithResponseExceptionAsync(retryState, null)) {
return;
}
}
iterationCallback.onResult(null, t);
}
Expand All @@ -368,6 +352,41 @@ private void executeBulkWriteBatchAsync(final RetryState retryState, final Async
});
}

private void handleMongoWriteConcernWithResponseException(final RetryState retryState, final boolean breakAndThrowIfDifferent) {
if (!retryState.isFirstAttempt()) {
RuntimeException prospectiveFailedResult = (RuntimeException) retryState.exception().orElse(null);
boolean prospectiveResultIsWriteConcernException = prospectiveFailedResult instanceof MongoWriteConcernWithResponseException;
retryState.breakAndThrowIfRetryAnd(() -> breakAndThrowIfDifferent && !prospectiveResultIsWriteConcernException);
if (prospectiveResultIsWriteConcernException) {
retryState.attachment(AttachmentKeys.bulkWriteTracker()).orElseThrow(Assertions::fail)
.batch().ifPresent(bulkWriteBatch -> {
bulkWriteBatch.addResult(
(BsonDocument) ((MongoWriteConcernWithResponseException) prospectiveFailedResult).getResponse());
BulkWriteTracker.attachNext(retryState, bulkWriteBatch);
});
}
}
}

private boolean handleMongoWriteConcernWithResponseExceptionAsync(final RetryState retryState,
@Nullable final SingleResultCallback<BulkWriteResult> callback) {
if (!retryState.isFirstAttempt()) {
RuntimeException prospectiveFailedResult = (RuntimeException) retryState.exception().orElse(null);
boolean prospectiveResultIsWriteConcernException = prospectiveFailedResult instanceof MongoWriteConcernWithResponseException;
if (callback != null && retryState.breakAndCompleteIfRetryAnd(() -> !prospectiveResultIsWriteConcernException, callback)) {
return true;
}
if (prospectiveResultIsWriteConcernException) {
retryState.attachment(AttachmentKeys.bulkWriteTracker()).orElseThrow(Assertions::fail)
.batch().ifPresent(bulkWriteBatch -> {
bulkWriteBatch.addResult(
(BsonDocument) ((MongoWriteConcernWithResponseException) prospectiveFailedResult).getResponse());
BulkWriteTracker.attachNext(retryState, bulkWriteBatch);
});
}
}
return false;
}

private BsonDocument executeCommand(final Connection connection, final BulkWriteBatch batch, final WriteBinding binding) {
return connection.command(namespace.getDatabaseName(), batch.getCommand(), NO_OP_FIELD_NAME_VALIDATOR,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1151,16 +1151,16 @@ class MixedBulkWriteOperationSpecification extends OperationFunctionalSpecificat
def originalException = new MongoSocketException('Some failure', new ServerAddress())

when:
testRetryableOperationThrowsOriginalError(operation, [[3, 6, 0], [3, 6, 0]],
[REPLICA_SET_PRIMARY, STANDALONE], originalException, async)
testRetryableOperationThrowsOriginalError(operation, [[3, 6, 0], [3, 6, 0], [3, 6, 0]],
[REPLICA_SET_PRIMARY, REPLICA_SET_PRIMARY, STANDALONE], originalException, async)

then:
Exception commandException = thrown()
commandException == originalException

when:
testRetryableOperationThrowsOriginalError(operation, [[3, 6, 0]],
[REPLICA_SET_PRIMARY], originalException, async, 1)
testRetryableOperationThrowsOriginalError(operation, [[3, 6, 0], [3, 6, 0]],
[REPLICA_SET_PRIMARY, REPLICA_SET_PRIMARY], originalException, async, 1)

then:
commandException = thrown()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.mongodb.reactivestreams.client;

import com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient;
import org.junit.Test;

/**
* See {@link com.mongodb.client.MongoWriteConcernWithResponseExceptionTest}.
*/
public class MongoWriteConcernWithResponseExceptionTest {
@Test
public void doesNotLeak() throws InterruptedException {
com.mongodb.client.MongoWriteConcernWithResponseExceptionTest.doesNotLeak(
mongoClientSettings -> new SyncMongoClient(MongoClients.create(mongoClientSettings)));
}
}
Loading