Skip to content

Add BatchCursor interceptor in reactive tests #1390

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.MongoIterable;
import com.mongodb.connection.ClusterDescription;
import com.mongodb.reactivestreams.client.internal.BatchCursor;
import org.bson.BsonDocument;
import org.bson.Document;
import org.bson.conversions.Bson;
Expand All @@ -41,6 +42,7 @@ public class SyncMongoClient implements MongoClient {

private static long sleepAfterCursorCloseMS;
private static long sleepAfterSessionCloseMS;
private static boolean waitForBatchCursorCreation;

/**
* Unfortunately this is the only way to wait for a query to be initiated, since Reactive Streams is asynchronous
Expand Down Expand Up @@ -88,6 +90,27 @@ public static void enableSleepAfterSessionClose(final long sleepMS) {
sleepAfterSessionCloseMS = sleepMS;
}

/**
* Enables behavior for waiting until a reactive {@link BatchCursor} is created.
* <p>
* When enabled, {@link SyncMongoCursor} allows intercepting the result of the cursor creation process.
* If the creation fails, the resulting exception will be propagated; if successful, the
* process will proceed to issue getMore commands.
* <p>
* NOTE: Do not enable when multiple cursors are being iterated concurrently.
*/
public static void enableWaitForBatchCursorCreation() {
waitForBatchCursorCreation = true;
}

public static boolean isWaitForBatchCursorCreationEnabled() {
return waitForBatchCursorCreation;
}

public static void disableWaitForBatchCursorCreation() {
waitForBatchCursorCreation = false;
}

public static void disableSleep() {
sleepAfterCursorOpenMS = 0;
sleepAfterCursorCloseMS = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,36 @@
import com.mongodb.ServerCursor;
import com.mongodb.client.MongoCursor;
import com.mongodb.lang.Nullable;
import com.mongodb.reactivestreams.client.internal.BatchCursor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Operators;
import reactor.util.context.Context;

import java.util.NoSuchElementException;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static com.mongodb.ClusterFixture.TIMEOUT;
import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException;
import static com.mongodb.reactivestreams.client.syncadapter.ContextHelper.CONTEXT;
import static com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient.getSleepAfterCursorClose;
import static com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient.getSleepAfterCursorOpen;
import static com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient.isWaitForBatchCursorCreationEnabled;

class SyncMongoCursor<T> implements MongoCursor<T> {
private static final Object COMPLETED = new Object();
private final BlockingDeque<Object> results = new LinkedBlockingDeque<>();
private final CompletableFuture<Object> batchCursorCompletableFuture = new CompletableFuture<>();
private final Integer batchSize;
private int countToBatchSize;
private Subscription subscription;
Expand All @@ -51,6 +61,15 @@ class SyncMongoCursor<T> implements MongoCursor<T> {
SyncMongoCursor(final Publisher<T> publisher, @Nullable final Integer batchSize) {
this.batchSize = batchSize;
CountDownLatch latch = new CountDownLatch(1);

if (isWaitForBatchCursorCreationEnabled()) {
// This hook allows us to intercept the `onNext` and `onError` signals for any operation to determine
// whether the {@link BatchCursor} was created successfully or if an error occurred during its creation process.
// The result is propagated to a {@link CompletableFuture}, which we use to block until it is completed.
Hooks.onEachOperator(Operators.lift((sc, sub) ->
new BatchCursorInterceptSubscriber(sub, batchCursorCompletableFuture)));
}

//noinspection ReactiveStreamsSubscriberImplementation
Flux.from(publisher).contextWrite(CONTEXT).subscribe(new Subscriber<T>() {
@Override
Expand Down Expand Up @@ -83,9 +102,19 @@ public void onComplete() {
if (!latch.await(TIMEOUT, TimeUnit.SECONDS)) {
throw new MongoTimeoutException("Timeout waiting for subscription");
}
if (isWaitForBatchCursorCreationEnabled()) {
batchCursorCompletableFuture.get(TIMEOUT, TimeUnit.SECONDS);
Hooks.resetOnEachOperator();
}
sleep(getSleepAfterCursorOpen());
} catch (InterruptedException e) {
throw interruptAndCreateMongoInterruptedException("Interrupted waiting for asynchronous cursor establishment", e);
} catch (ExecutionException | TimeoutException e) {
Throwable cause = e.getCause();
if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
}
throw new RuntimeException(e);
}
}

Expand Down Expand Up @@ -181,4 +210,49 @@ private RuntimeException translateError(final Throwable throwable) {
}
return new RuntimeException(throwable);
}


private static final class BatchCursorInterceptSubscriber implements CoreSubscriber<Object> {

private final CoreSubscriber<Object> sub;
private final CompletableFuture<Object> batchCursorCompletableFuture;

BatchCursorInterceptSubscriber(final CoreSubscriber<Object> sub,
final CompletableFuture<Object> batchCursorCompletableFuture) {
this.sub = sub;
this.batchCursorCompletableFuture = batchCursorCompletableFuture;
}

@Override
public Context currentContext() {
return sub.currentContext();
}

@Override
public void onSubscribe(final Subscription s) {
sub.onSubscribe(s);
}

@Override
public void onNext(final Object o) {
if (o instanceof BatchCursor) {
// Interception of a cursor means that it has been created at this point.
batchCursorCompletableFuture.complete(o);
}
sub.onNext(o);
}

@Override
public void onError(final Throwable t) {
if (!batchCursorCompletableFuture.isDone()) { // Cursor has not been created yet but an error occurred.
batchCursorCompletableFuture.completeExceptionally(t);
}
sub.onError(t);
}

@Override
public void onComplete() {
sub.onComplete();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,16 @@
import java.util.List;

import static com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient.disableSleep;
import static com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient.disableWaitForBatchCursorCreation;
import static com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient.enableSleepAfterCursorOpen;
import static com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient.enableWaitForBatchCursorCreation;
import static org.junit.Assume.assumeFalse;

public final class ChangeStreamsTest extends UnifiedReactiveStreamsTest {

private static final List<String> ERROR_REQUIRED_FROM_CHANGE_STREAM_INITIALIZATION_TESTS =
Arrays.asList(
"Test with document comment - pre 4.4",
"Change Stream should error when an invalid aggregation stage is passed in",
"The watch helper must not throw a custom exception when executed against a single server topology, "
+ "but instead depend on a server error"
"Test with document comment - pre 4.4"
);

private static final List<String> EVENT_SENSITIVE_TESTS =
Expand All @@ -48,6 +47,14 @@ public final class ChangeStreamsTest extends UnifiedReactiveStreamsTest {
"Test that comment is not set on getMore - pre 4.4"
);

private static final List<String> REQUIRES_BATCH_CURSOR_CREATION_WAITING =
Arrays.asList(
"Change Stream should error when an invalid aggregation stage is passed in",
"The watch helper must not throw a custom exception when executed against a single server topology, "
+ "but instead depend on a server error"
);


public ChangeStreamsTest(@SuppressWarnings("unused") final String fileDescription,
@SuppressWarnings("unused") final String testDescription,
final String schemaVersion, @Nullable final BsonArray runOnRequirements, final BsonArray entities,
Expand All @@ -58,12 +65,17 @@ public ChangeStreamsTest(@SuppressWarnings("unused") final String fileDescriptio
assumeFalse(EVENT_SENSITIVE_TESTS.contains(testDescription));

enableSleepAfterCursorOpen(256);

if (REQUIRES_BATCH_CURSOR_CREATION_WAITING.contains(testDescription)) {
enableWaitForBatchCursorCreation();
}
}

@After
public void cleanUp() {
super.cleanUp();
disableSleep();
disableWaitForBatchCursorCreation();
}

@Parameterized.Parameters(name = "{0}: {1}")
Expand Down