|
19 | 19 | import com.mongodb.Function;
|
20 | 20 | import com.mongodb.MongoClientException;
|
21 | 21 | import com.mongodb.MongoClientSettings;
|
| 22 | +import com.mongodb.MongoCommandException; |
22 | 23 | import com.mongodb.MongoException;
|
| 24 | +import com.mongodb.ServerAddress; |
| 25 | +import com.mongodb.assertions.Assertions; |
| 26 | +import com.mongodb.event.CommandFailedEvent; |
| 27 | +import com.mongodb.event.CommandListener; |
23 | 28 | import com.mongodb.event.ConnectionCheckOutFailedEvent;
|
24 | 29 | import com.mongodb.event.ConnectionCheckedOutEvent;
|
25 | 30 | import com.mongodb.event.ConnectionPoolClearedEvent;
|
|
34 | 39 | import org.junit.Before;
|
35 | 40 | import org.junit.Test;
|
36 | 41 |
|
| 42 | +import java.util.List; |
| 43 | +import java.util.concurrent.CompletableFuture; |
37 | 44 | import java.util.concurrent.ExecutionException;
|
38 | 45 | import java.util.concurrent.ExecutorService;
|
39 | 46 | import java.util.concurrent.Executors;
|
40 | 47 | import java.util.concurrent.Future;
|
41 | 48 | import java.util.concurrent.TimeUnit;
|
42 | 49 | import java.util.concurrent.TimeoutException;
|
| 50 | +import java.util.concurrent.atomic.AtomicBoolean; |
| 51 | +import java.util.function.BiFunction; |
| 52 | +import java.util.stream.Collectors; |
43 | 53 |
|
44 | 54 | import static com.mongodb.ClusterFixture.getServerStatus;
|
45 | 55 | import static com.mongodb.ClusterFixture.isDiscoverableReplicaSet;
|
|
55 | 65 | import static java.util.concurrent.TimeUnit.SECONDS;
|
56 | 66 | import static org.junit.Assert.assertEquals;
|
57 | 67 | import static org.junit.Assert.assertTrue;
|
| 68 | +import static org.junit.Assert.fail; |
58 | 69 | import static org.junit.Assume.assumeFalse;
|
59 | 70 | import static org.junit.Assume.assumeTrue;
|
60 | 71 |
|
@@ -138,7 +149,6 @@ public static <R> void poolClearedExceptionMustBeRetryable(
|
138 | 149 | * As a result, the client has to wait for at least its heartbeat delay until it hears back from a server
|
139 | 150 | * (while it waits for a response, calling `ServerMonitor.connect` has no effect).
|
140 | 151 | * Thus, we want to use small heartbeat delay to reduce delays in the test. */
|
141 |
| - .minHeartbeatFrequency(50, TimeUnit.MILLISECONDS) |
142 | 152 | .heartbeatFrequency(50, TimeUnit.MILLISECONDS))
|
143 | 153 | .retryReads(true)
|
144 | 154 | .retryWrites(true)
|
@@ -179,6 +189,64 @@ public static <R> void poolClearedExceptionMustBeRetryable(
|
179 | 189 | }
|
180 | 190 | }
|
181 | 191 |
|
| 192 | + /** |
| 193 | + * Prose test #3. |
| 194 | + */ |
| 195 | + @Test |
| 196 | + public void originalErrorMustBePropagatedIfNoWritesPerformed() throws InterruptedException { |
| 197 | + originalErrorMustBePropagatedIfNoWritesPerformed(MongoClients::create); |
| 198 | + } |
| 199 | + |
| 200 | + public static void originalErrorMustBePropagatedIfNoWritesPerformed( |
| 201 | + final Function<MongoClientSettings, MongoClient> clientCreator) throws InterruptedException { |
| 202 | + assumeTrue(serverVersionAtLeast(6, 0) && isDiscoverableReplicaSet()); |
| 203 | + BiFunction<Integer, List<String>, BsonDocument> configureFailPointDocCreator = (errorCode, errorLabels) -> |
| 204 | + new BsonDocument() |
| 205 | + .append("configureFailPoint", new BsonString("failCommand")) |
| 206 | + .append("mode", new BsonDocument() |
| 207 | + .append("times", new BsonInt32(1))) |
| 208 | + .append("data", new BsonDocument() |
| 209 | + .append("failCommands", new BsonArray(singletonList(new BsonString("insert")))) |
| 210 | + .append("errorCode", new BsonInt32(errorCode)) |
| 211 | + .append("errorLabels", new BsonArray(errorLabels.stream().map(BsonString::new).collect(Collectors.toList())))); |
| 212 | + ServerAddress primaryServerAddress = Fixture.getPrimary(); |
| 213 | + CompletableFuture<FailPoint> futureFailPoint = new CompletableFuture<>(); |
| 214 | + CommandListener commandListener = new CommandListener() { |
| 215 | + private final AtomicBoolean configureFailPoint = new AtomicBoolean(true); |
| 216 | + |
| 217 | + @Override |
| 218 | + public void commandFailed(final CommandFailedEvent event) { |
| 219 | + if (event.getCommandName().equals("insert") && configureFailPoint.compareAndSet(true, false)) { |
| 220 | + Assertions.assertTrue(futureFailPoint.complete(FailPoint.enable( |
| 221 | + configureFailPointDocCreator.apply(10107, asList("RetryableWriteError", "NoWritesPerformed")), |
| 222 | + primaryServerAddress |
| 223 | + ))); |
| 224 | + } |
| 225 | + } |
| 226 | + }; |
| 227 | + try (MongoClient client = clientCreator.apply(getMongoClientSettingsBuilder() |
| 228 | + .retryWrites(true) |
| 229 | + .addCommandListener(commandListener) |
| 230 | + .applyToServerSettings(builder -> |
| 231 | + // see `poolClearedExceptionMustBeRetryable` for the explanation |
| 232 | + builder.heartbeatFrequency(50, TimeUnit.MILLISECONDS)) |
| 233 | + .build()); |
| 234 | + FailPoint ignored = FailPoint.enable(configureFailPointDocCreator.apply(91, singletonList("RetryableWriteError")), client)) { |
| 235 | + MongoCollection<Document> collection = client.getDatabase(getDefaultDatabaseName()) |
| 236 | + .getCollection("originalErrorMustBePropagatedIfErrorWithRetryableWriteErrorLabelHappens"); |
| 237 | + collection.drop(); |
| 238 | + try { |
| 239 | + collection.insertOne(new Document()); |
| 240 | + } catch (MongoCommandException e) { |
| 241 | + assertEquals(e.getErrorCode(), 91); |
| 242 | + return; |
| 243 | + } |
| 244 | + fail("must not reach"); |
| 245 | + } finally { |
| 246 | + futureFailPoint.thenAccept(FailPoint::close); |
| 247 | + } |
| 248 | + } |
| 249 | + |
182 | 250 | private boolean canRunTests() {
|
183 | 251 | Document storageEngine = (Document) getServerStatus().get("storageEngine");
|
184 | 252 |
|
|
0 commit comments