Skip to content

Commit 3387038

Browse files
authored
Fix MixedBulkWriteOperation such that it does not leak MongoWriteConcernWithResponseException to users (#1051)
JAVA-4775
1 parent 8e989d4 commit 3387038

File tree

5 files changed

+245
-53
lines changed

5 files changed

+245
-53
lines changed

driver-core/src/main/com/mongodb/internal/operation/MixedBulkWriteOperation.java

Lines changed: 59 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -180,32 +180,22 @@ public BulkWriteResult execute(final WriteBinding binding) {
180180
logRetryExecute(retryState);
181181
return withSourceAndConnection(binding::getWriteConnectionSource, true, (source, connection) -> {
182182
ConnectionDescription connectionDescription = connection.getDescription();
183-
int maxWireVersion = connectionDescription.getMaxWireVersion();
184183
// attach `maxWireVersion` ASAP because it is used to check whether we can retry
185-
retryState.attach(AttachmentKeys.maxWireVersion(), maxWireVersion, true);
186-
BulkWriteTracker bulkWriteTracker = retryState.attachment(AttachmentKeys.bulkWriteTracker())
187-
.orElseThrow(Assertions::fail);
184+
retryState.attach(AttachmentKeys.maxWireVersion(), connectionDescription.getMaxWireVersion(), true);
188185
SessionContext sessionContext = binding.getSessionContext();
189186
WriteConcern writeConcern = getAppliedWriteConcern(sessionContext);
190-
if (!retryState.isFirstAttempt() && !isRetryableWrite(retryWrites, writeConcern, source.getServerDescription(),
191-
connectionDescription, sessionContext)) {
192-
RuntimeException prospectiveFailedResult = (RuntimeException) retryState.exception().orElse(null);
193-
retryState.breakAndThrowIfRetryAnd(() -> !(prospectiveFailedResult instanceof MongoWriteConcernWithResponseException));
194-
bulkWriteTracker.batch().ifPresent(bulkWriteBatch -> {
195-
assertTrue(prospectiveFailedResult instanceof MongoWriteConcernWithResponseException);
196-
bulkWriteBatch.addResult((BsonDocument) ((MongoWriteConcernWithResponseException) prospectiveFailedResult)
197-
.getResponse());
198-
BulkWriteTracker.attachNext(retryState, bulkWriteBatch);
199-
});
187+
if (!isRetryableWrite(retryWrites, getAppliedWriteConcern(sessionContext),
188+
source.getServerDescription(), connectionDescription, sessionContext)) {
189+
handleMongoWriteConcernWithResponseException(retryState, true);
200190
}
201191
validateWriteRequests(connectionDescription, bypassDocumentValidation, writeRequests, writeConcern);
202-
if (!bulkWriteTracker.batch().isPresent()) {
192+
if (!retryState.attachment(AttachmentKeys.bulkWriteTracker()).orElseThrow(Assertions::fail).batch().isPresent()) {
203193
BulkWriteTracker.attachNew(retryState, BulkWriteBatch.createBulkWriteBatch(namespace,
204194
source.getServerDescription(), connectionDescription, ordered, writeConcern,
205195
bypassDocumentValidation, retryWrites, writeRequests, sessionContext, comment, variables));
206196
}
207197
logRetryExecute(retryState);
208-
return executeBulkWriteBatch(retryState, binding, connection, maxWireVersion);
198+
return executeBulkWriteBatch(retryState, binding, connection);
209199
});
210200
});
211201
try {
@@ -226,33 +216,22 @@ public void executeAsync(final AsyncWriteBinding binding, final SingleResultCall
226216
withAsyncSourceAndConnection(binding::getWriteConnectionSource, true, funcCallback,
227217
(source, connection, releasingCallback) -> {
228218
ConnectionDescription connectionDescription = connection.getDescription();
229-
int maxWireVersion = connectionDescription.getMaxWireVersion();
230219
// attach `maxWireVersion` ASAP because it is used to check whether we can retry
231-
retryState.attach(AttachmentKeys.maxWireVersion(), maxWireVersion, true);
232-
BulkWriteTracker bulkWriteTracker = retryState.attachment(AttachmentKeys.bulkWriteTracker())
233-
.orElseThrow(Assertions::fail);
220+
retryState.attach(AttachmentKeys.maxWireVersion(), connectionDescription.getMaxWireVersion(), true);
234221
SessionContext sessionContext = binding.getSessionContext();
235222
WriteConcern writeConcern = getAppliedWriteConcern(sessionContext);
236-
if (!retryState.isFirstAttempt() && !isRetryableWrite(retryWrites, writeConcern, source.getServerDescription(),
237-
connectionDescription, sessionContext)) {
238-
Throwable prospectiveFailedResult = retryState.exception().orElse(null);
239-
if (retryState.breakAndCompleteIfRetryAnd(() ->
240-
!(prospectiveFailedResult instanceof MongoWriteConcernWithResponseException), releasingCallback)) {
241-
return;
242-
}
243-
bulkWriteTracker.batch().ifPresent(bulkWriteBatch -> {
244-
assertTrue(prospectiveFailedResult instanceof MongoWriteConcernWithResponseException);
245-
bulkWriteBatch.addResult((BsonDocument) ((MongoWriteConcernWithResponseException) prospectiveFailedResult)
246-
.getResponse());
247-
BulkWriteTracker.attachNext(retryState, bulkWriteBatch);
248-
});
223+
if (!isRetryableWrite(retryWrites, getAppliedWriteConcern(sessionContext),
224+
source.getServerDescription(),
225+
connectionDescription, sessionContext)
226+
&& handleMongoWriteConcernWithResponseExceptionAsync(retryState, releasingCallback)) {
227+
return;
249228
}
250229
if (validateWriteRequestsAndCompleteIfInvalid(connectionDescription, bypassDocumentValidation, writeRequests,
251230
writeConcern, releasingCallback)) {
252231
return;
253232
}
254233
try {
255-
if (!bulkWriteTracker.batch().isPresent()) {
234+
if (!retryState.attachment(AttachmentKeys.bulkWriteTracker()).orElseThrow(Assertions::fail).batch().isPresent()) {
256235
BulkWriteTracker.attachNew(retryState, BulkWriteBatch.createBulkWriteBatch(namespace,
257236
source.getServerDescription(), connectionDescription, ordered, writeConcern,
258237
bypassDocumentValidation, retryWrites, writeRequests, sessionContext, comment, variables));
@@ -262,17 +241,17 @@ public void executeAsync(final AsyncWriteBinding binding, final SingleResultCall
262241
return;
263242
}
264243
logRetryExecute(retryState);
265-
executeBulkWriteBatchAsync(retryState, binding, connection, maxWireVersion, releasingCallback);
244+
executeBulkWriteBatchAsync(retryState, binding, connection, releasingCallback);
266245
});
267246
}).whenComplete(binding::release);
268247
retryingBulkWrite.get(exceptionTransformingCallback(errorHandlingCallback(callback, LOGGER)));
269248
}
270249

271-
private BulkWriteResult executeBulkWriteBatch(final RetryState retryState, final WriteBinding binding, final Connection connection,
272-
final int maxWireVersion) {
250+
private BulkWriteResult executeBulkWriteBatch(final RetryState retryState, final WriteBinding binding, final Connection connection) {
273251
BulkWriteTracker currentBulkWriteTracker = retryState.attachment(AttachmentKeys.bulkWriteTracker())
274252
.orElseThrow(Assertions::fail);
275253
BulkWriteBatch currentBatch = currentBulkWriteTracker.batch().orElseThrow(Assertions::fail);
254+
int maxWireVersion = connection.getDescription().getMaxWireVersion();
276255
while (currentBatch.shouldProcessBatch()) {
277256
try {
278257
BsonDocument result = executeCommand(connection, currentBatch, binding);
@@ -292,9 +271,10 @@ private BulkWriteResult executeBulkWriteBatch(final RetryState retryState, final
292271
currentBulkWriteTracker = BulkWriteTracker.attachNext(retryState, currentBatch);
293272
currentBatch = currentBulkWriteTracker.batch().orElseThrow(Assertions::fail);
294273
} catch (MongoException exception) {
295-
if (!(retryState.isFirstAttempt() || (exception instanceof MongoWriteConcernWithResponseException))) {
274+
if (!retryState.isFirstAttempt() && !(exception instanceof MongoWriteConcernWithResponseException)) {
296275
addRetryableWriteErrorLabel(exception, maxWireVersion);
297276
}
277+
handleMongoWriteConcernWithResponseException(retryState, false);
298278
throw exception;
299279
}
300280
}
@@ -307,13 +287,14 @@ private BulkWriteResult executeBulkWriteBatch(final RetryState retryState, final
307287
}
308288

309289
private void executeBulkWriteBatchAsync(final RetryState retryState, final AsyncWriteBinding binding, final AsyncConnection connection,
310-
final int maxWireVersion, final SingleResultCallback<BulkWriteResult> callback) {
290+
final SingleResultCallback<BulkWriteResult> callback) {
311291
LoopState loopState = new LoopState();
312292
AsyncCallbackRunnable loop = new AsyncCallbackLoop(loopState, iterationCallback -> {
313293
BulkWriteTracker currentBulkWriteTracker = retryState.attachment(AttachmentKeys.bulkWriteTracker())
314294
.orElseThrow(Assertions::fail);
315295
loopState.attach(AttachmentKeys.bulkWriteTracker(), currentBulkWriteTracker, true);
316296
BulkWriteBatch currentBatch = currentBulkWriteTracker.batch().orElseThrow(Assertions::fail);
297+
int maxWireVersion = connection.getDescription().getMaxWireVersion();
317298
if (loopState.breakAndCompleteIf(() -> !currentBatch.shouldProcessBatch(), iterationCallback)) {
318299
return;
319300
}
@@ -340,9 +321,12 @@ private void executeBulkWriteBatchAsync(final RetryState retryState, final Async
340321
} else {
341322
if (t instanceof MongoException) {
342323
MongoException exception = (MongoException) t;
343-
if (!(retryState.isFirstAttempt() || (exception instanceof MongoWriteConcernWithResponseException))) {
324+
if (!retryState.isFirstAttempt() && !(exception instanceof MongoWriteConcernWithResponseException)) {
344325
addRetryableWriteErrorLabel(exception, maxWireVersion);
345326
}
327+
if (handleMongoWriteConcernWithResponseExceptionAsync(retryState, null)) {
328+
return;
329+
}
346330
}
347331
iterationCallback.onResult(null, t);
348332
}
@@ -368,6 +352,41 @@ private void executeBulkWriteBatchAsync(final RetryState retryState, final Async
368352
});
369353
}
370354

355+
private void handleMongoWriteConcernWithResponseException(final RetryState retryState, final boolean breakAndThrowIfDifferent) {
356+
if (!retryState.isFirstAttempt()) {
357+
RuntimeException prospectiveFailedResult = (RuntimeException) retryState.exception().orElse(null);
358+
boolean prospectiveResultIsWriteConcernException = prospectiveFailedResult instanceof MongoWriteConcernWithResponseException;
359+
retryState.breakAndThrowIfRetryAnd(() -> breakAndThrowIfDifferent && !prospectiveResultIsWriteConcernException);
360+
if (prospectiveResultIsWriteConcernException) {
361+
retryState.attachment(AttachmentKeys.bulkWriteTracker()).orElseThrow(Assertions::fail)
362+
.batch().ifPresent(bulkWriteBatch -> {
363+
bulkWriteBatch.addResult(
364+
(BsonDocument) ((MongoWriteConcernWithResponseException) prospectiveFailedResult).getResponse());
365+
BulkWriteTracker.attachNext(retryState, bulkWriteBatch);
366+
});
367+
}
368+
}
369+
}
370+
371+
private boolean handleMongoWriteConcernWithResponseExceptionAsync(final RetryState retryState,
372+
@Nullable final SingleResultCallback<BulkWriteResult> callback) {
373+
if (!retryState.isFirstAttempt()) {
374+
RuntimeException prospectiveFailedResult = (RuntimeException) retryState.exception().orElse(null);
375+
boolean prospectiveResultIsWriteConcernException = prospectiveFailedResult instanceof MongoWriteConcernWithResponseException;
376+
if (callback != null && retryState.breakAndCompleteIfRetryAnd(() -> !prospectiveResultIsWriteConcernException, callback)) {
377+
return true;
378+
}
379+
if (prospectiveResultIsWriteConcernException) {
380+
retryState.attachment(AttachmentKeys.bulkWriteTracker()).orElseThrow(Assertions::fail)
381+
.batch().ifPresent(bulkWriteBatch -> {
382+
bulkWriteBatch.addResult(
383+
(BsonDocument) ((MongoWriteConcernWithResponseException) prospectiveFailedResult).getResponse());
384+
BulkWriteTracker.attachNext(retryState, bulkWriteBatch);
385+
});
386+
}
387+
}
388+
return false;
389+
}
371390

372391
private BsonDocument executeCommand(final Connection connection, final BulkWriteBatch batch, final WriteBinding binding) {
373392
return connection.command(namespace.getDatabaseName(), batch.getCommand(), NO_OP_FIELD_NAME_VALIDATOR,

driver-core/src/test/functional/com/mongodb/internal/operation/MixedBulkWriteOperationSpecification.groovy

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1151,16 +1151,16 @@ class MixedBulkWriteOperationSpecification extends OperationFunctionalSpecificat
11511151
def originalException = new MongoSocketException('Some failure', new ServerAddress())
11521152

11531153
when:
1154-
testRetryableOperationThrowsOriginalError(operation, [[3, 6, 0], [3, 6, 0]],
1155-
[REPLICA_SET_PRIMARY, STANDALONE], originalException, async)
1154+
testRetryableOperationThrowsOriginalError(operation, [[3, 6, 0], [3, 6, 0], [3, 6, 0]],
1155+
[REPLICA_SET_PRIMARY, REPLICA_SET_PRIMARY, STANDALONE], originalException, async)
11561156

11571157
then:
11581158
Exception commandException = thrown()
11591159
commandException == originalException
11601160

11611161
when:
1162-
testRetryableOperationThrowsOriginalError(operation, [[3, 6, 0]],
1163-
[REPLICA_SET_PRIMARY], originalException, async, 1)
1162+
testRetryableOperationThrowsOriginalError(operation, [[3, 6, 0], [3, 6, 0]],
1163+
[REPLICA_SET_PRIMARY, REPLICA_SET_PRIMARY], originalException, async, 1)
11641164

11651165
then:
11661166
commandException = thrown()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.reactivestreams.client;
18+
19+
import com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient;
20+
import org.junit.Test;
21+
22+
/**
23+
* See {@link com.mongodb.client.MongoWriteConcernWithResponseExceptionTest}.
24+
*/
25+
public class MongoWriteConcernWithResponseExceptionTest {
26+
@Test
27+
public void doesNotLeak() throws InterruptedException {
28+
com.mongodb.client.MongoWriteConcernWithResponseExceptionTest.doesNotLeak(
29+
mongoClientSettings -> new SyncMongoClient(MongoClients.create(mongoClientSettings)));
30+
}
31+
}

0 commit comments

Comments
 (0)