Skip to content

Obtain waitQueue and connection timeouts from OperationContext #1228

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions driver-core/src/main/com/mongodb/connection/Stream.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.mongodb.connection;

import com.mongodb.ServerAddress;
import com.mongodb.internal.connection.OperationContext;
import org.bson.ByteBuf;

import java.io.IOException;
Expand All @@ -36,14 +37,15 @@ public interface Stream extends BufferProvider{
*
* @throws IOException if an I/O error occurs
*/
void open() throws IOException;
void open(OperationContext operationContext) throws IOException;

/**
* Open the stream asynchronously.
*
* @param handler the completion handler for opening the stream
* @param operationContext
* @param handler the completion handler for opening the stream
*/
void openAsync(AsyncCompletionHandler<Void> handler);
void openAsync(OperationContext operationContext, AsyncCompletionHandler<Void> handler);

/**
* Write each buffer in the list to the stream in order, blocking until all are completely written.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.mongodb.ServerAddress;
import com.mongodb.internal.connection.AsynchronousChannelStream;
import com.mongodb.internal.connection.ExtendedAsynchronousByteChannel;
import com.mongodb.internal.connection.OperationContext;
import com.mongodb.internal.connection.PowerOfTwoBufferPool;
import com.mongodb.internal.connection.tlschannel.BufferAllocator;
import com.mongodb.internal.connection.tlschannel.ClientTlsChannel;
Expand Down Expand Up @@ -201,7 +202,7 @@ public boolean supportsAdditionalTimeout() {

@SuppressWarnings("deprecation")
@Override
public void openAsync(final AsyncCompletionHandler<Void> handler) {
public void openAsync(final OperationContext operationContext, final AsyncCompletionHandler<Void> handler) {
isTrue("unopened", getChannel() == null);
try {
SocketChannel socketChannel = SocketChannel.open();
Expand Down
27 changes: 16 additions & 11 deletions driver-core/src/main/com/mongodb/connection/netty/NettyStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.mongodb.connection.SocketSettings;
import com.mongodb.connection.SslSettings;
import com.mongodb.connection.Stream;
import com.mongodb.internal.connection.OperationContext;
import com.mongodb.internal.connection.netty.NettyByteBuf;
import com.mongodb.lang.Nullable;
import io.netty.bootstrap.Bootstrap;
Expand Down Expand Up @@ -156,15 +157,15 @@ public ByteBuf getBuffer(final int size) {
}

@Override
public void open() throws IOException {
public void open(final OperationContext operationContext) throws IOException {
FutureAsyncCompletionHandler<Void> handler = new FutureAsyncCompletionHandler<>();
openAsync(handler);
openAsync(operationContext, handler);
handler.get();
}

@SuppressWarnings("deprecation")
@Override
public void openAsync(final AsyncCompletionHandler<Void> handler) {
public void openAsync(final OperationContext operationContext, final AsyncCompletionHandler<Void> handler) {
Queue<SocketAddress> socketAddressQueue;

try {
Expand All @@ -174,10 +175,11 @@ public void openAsync(final AsyncCompletionHandler<Void> handler) {
return;
}

initializeChannel(handler, socketAddressQueue);
initializeChannel(operationContext, handler, socketAddressQueue);
}

private void initializeChannel(final AsyncCompletionHandler<Void> handler, final Queue<SocketAddress> socketAddressQueue) {
private void initializeChannel(final OperationContext operationContext, final AsyncCompletionHandler<Void> handler,
final Queue<SocketAddress> socketAddressQueue) {
if (socketAddressQueue.isEmpty()) {
handler.failed(new MongoSocketException("Exception opening socket", getAddress()));
} else {
Expand All @@ -186,8 +188,8 @@ private void initializeChannel(final AsyncCompletionHandler<Void> handler, final
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workerGroup);
bootstrap.channel(socketChannelClass);

bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, settings.getConnectTimeout(MILLISECONDS));
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
operationContext.getTimeoutContext().getConnectTimeoutMs());
bootstrap.option(ChannelOption.TCP_NODELAY, true);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);

Expand Down Expand Up @@ -221,7 +223,7 @@ public void initChannel(final SocketChannel ch) {
}
});
ChannelFuture channelFuture = bootstrap.connect(nextAddress);
channelFuture.addListener(new OpenChannelFutureListener(socketAddressQueue, channelFuture, handler));
channelFuture.addListener(new OpenChannelFutureListener(operationContext, socketAddressQueue, channelFuture, handler));
}
}

Expand Down Expand Up @@ -503,9 +505,12 @@ private class OpenChannelFutureListener implements ChannelFutureListener {
private final Queue<SocketAddress> socketAddressQueue;
private final ChannelFuture channelFuture;
private final AsyncCompletionHandler<Void> handler;
private final OperationContext operationContext;

OpenChannelFutureListener(final Queue<SocketAddress> socketAddressQueue, final ChannelFuture channelFuture,
final AsyncCompletionHandler<Void> handler) {
OpenChannelFutureListener(final OperationContext operationContext,
final Queue<SocketAddress> socketAddressQueue, final ChannelFuture channelFuture,
final AsyncCompletionHandler<Void> handler) {
this.operationContext = operationContext;
this.socketAddressQueue = socketAddressQueue;
this.channelFuture = channelFuture;
this.handler = handler;
Expand All @@ -528,7 +533,7 @@ public void operationComplete(final ChannelFuture future) {
} else if (socketAddressQueue.isEmpty()) {
handler.failed(new MongoSocketOpenException("Exception opening socket", getAddress(), future.cause()));
} else {
initializeChannel(handler, socketAddressQueue);
initializeChannel(operationContext, handler, socketAddressQueue);
}
}
});
Expand Down
9 changes: 9 additions & 0 deletions driver-core/src/main/com/mongodb/internal/TimeoutContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -158,4 +158,13 @@ public Timeout startServerSelectionTimeout() {
long ms = getTimeoutSettings().getServerSelectionTimeoutMS();
return StartTime.now().timeoutAfterOrInfiniteIfNegative(ms, MILLISECONDS);
}

public Timeout startWaitQueueTimeout(final StartTime checkoutStart) {
final long ms = getTimeoutSettings().getMaxWaitTimeMS();
return checkoutStart.timeoutAfterOrInfiniteIfNegative(ms, MILLISECONDS);
}

public int getConnectTimeoutMs() {
return (int) getTimeoutSettings().getConnectTimeoutMS();
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This returns an int. I think it would be more consistent to return a Timeout, but it was used in a way where 0 indicated infinite timeout (rather than the usual negative is infinite, see above). I could not find what the user should use to indicate infinite connect timeout specified in our docs.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is because with Java sockets a timeout cannot be negative and 0 means infinite. See: socket.connect

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, though I think that we should probably say the same in our own docs. (I didn't confirm that all paths used an API where 0 meant infinite, but I think this must be the case.)

}
}
48 changes: 28 additions & 20 deletions driver-core/src/main/com/mongodb/internal/TimeoutSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,35 +37,38 @@ public class TimeoutSettings {
@Nullable
private final Long defaultTimeoutMS;

private final long maxAwaitTimeMS;

// Deprecated timeouts
private final long readTimeoutMS;
// Deprecated configuration timeout options
private final long readTimeoutMS; // aka socketTimeoutMS
private final long maxWaitTimeMS; // aka waitQueueTimeoutMS
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@Nullable
private final Long wTimeoutMS;

// Deprecated options for CRUD methods
private final long maxTimeMS;

private final long maxAwaitTimeMS;
private final long maxCommitTimeMS;
@Nullable
private final Long wTimeoutMS;

public static final TimeoutSettings DEFAULT = create(MongoClientSettings.builder().build());

public static TimeoutSettings create(final MongoClientSettings settings) {
return new TimeoutSettings(settings.getClusterSettings().getServerSelectionTimeout(TimeUnit.MILLISECONDS),
return new TimeoutSettings(
settings.getClusterSettings().getServerSelectionTimeout(TimeUnit.MILLISECONDS),
settings.getSocketSettings().getConnectTimeout(TimeUnit.MILLISECONDS),
settings.getSocketSettings().getReadTimeout(TimeUnit.MILLISECONDS),
settings.getTimeout(TimeUnit.MILLISECONDS));
settings.getTimeout(TimeUnit.MILLISECONDS),
settings.getConnectionPoolSettings().getMaxWaitTime(TimeUnit.MILLISECONDS));
}

public TimeoutSettings(
final long serverSelectionTimeoutMS, final long connectTimeoutMS, final long readTimeoutMS, @Nullable final Long timeoutMS) {
this(timeoutMS, null, serverSelectionTimeoutMS, connectTimeoutMS, readTimeoutMS, 0, 0, 0, null);
final long serverSelectionTimeoutMS, final long connectTimeoutMS, final long readTimeoutMS,
@Nullable final Long timeoutMS, final long maxWaitTimeMS) {
this(timeoutMS, null, serverSelectionTimeoutMS, connectTimeoutMS, readTimeoutMS, 0, 0, 0, null, maxWaitTimeMS);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added maxWaitTimeMS, in a way that seemed consistent with readTimeoutMS, but I was also not sure whether we intended to pass these in, since both were deprecated. Should these really be passed in, or should they be set using one of the "with" methods below?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its needed in the constructor, so it can be copied via the with methods.
We should eventually make this constructor private or annotate visibility test only.

}

TimeoutSettings(
@Nullable final Long timeoutMS, @Nullable final Long defaultTimeoutMS, final long serverSelectionTimeoutMS,
final long connectTimeoutMS, final long readTimeoutMS, final long maxAwaitTimeMS, final long maxTimeMS,
final long maxCommitTimeMS, @Nullable final Long wTimeoutMS) {
final long maxCommitTimeMS, @Nullable final Long wTimeoutMS, final long maxWaitTimeMS) {
isTrueArgument("timeoutMS must be >= 0", timeoutMS == null || timeoutMS >= 0);
this.serverSelectionTimeoutMS = serverSelectionTimeoutMS;
this.connectTimeoutMS = connectTimeoutMS;
Expand All @@ -76,45 +79,46 @@ public TimeoutSettings(
this.maxTimeMS = maxTimeMS;
this.maxCommitTimeMS = maxCommitTimeMS;
this.wTimeoutMS = wTimeoutMS;
this.maxWaitTimeMS = maxWaitTimeMS;
}

public TimeoutSettings connectionOnly() {
return new TimeoutSettings(serverSelectionTimeoutMS, connectTimeoutMS, readTimeoutMS, null);
return new TimeoutSettings(serverSelectionTimeoutMS, connectTimeoutMS, readTimeoutMS, null, 0);
}

public TimeoutSettings withTimeoutMS(final long timeoutMS) {
return new TimeoutSettings(timeoutMS, defaultTimeoutMS, serverSelectionTimeoutMS, connectTimeoutMS, readTimeoutMS, maxAwaitTimeMS,
maxTimeMS, maxCommitTimeMS, wTimeoutMS);
maxTimeMS, maxCommitTimeMS, wTimeoutMS, maxWaitTimeMS);
}

public TimeoutSettings withDefaultTimeoutMS(final long defaultTimeoutMS) {
return new TimeoutSettings(timeoutMS, defaultTimeoutMS, serverSelectionTimeoutMS, connectTimeoutMS, readTimeoutMS, maxAwaitTimeMS,
maxTimeMS, maxCommitTimeMS, wTimeoutMS);
maxTimeMS, maxCommitTimeMS, wTimeoutMS, maxWaitTimeMS);
}

public TimeoutSettings withMaxTimeMS(final long maxTimeMS) {
return new TimeoutSettings(timeoutMS, defaultTimeoutMS, serverSelectionTimeoutMS, connectTimeoutMS, readTimeoutMS, maxAwaitTimeMS,
maxTimeMS, maxCommitTimeMS, wTimeoutMS);
maxTimeMS, maxCommitTimeMS, wTimeoutMS, maxWaitTimeMS);
}

public TimeoutSettings withMaxAwaitTimeMS(final long maxAwaitTimeMS) {
return new TimeoutSettings(timeoutMS, defaultTimeoutMS, serverSelectionTimeoutMS, connectTimeoutMS, readTimeoutMS, maxAwaitTimeMS,
maxTimeMS, maxCommitTimeMS, wTimeoutMS);
maxTimeMS, maxCommitTimeMS, wTimeoutMS, maxWaitTimeMS);
}

public TimeoutSettings withMaxTimeAndMaxAwaitTimeMS(final long maxTimeMS, final long maxAwaitTimeMS) {
return new TimeoutSettings(timeoutMS, defaultTimeoutMS, serverSelectionTimeoutMS, connectTimeoutMS, readTimeoutMS, maxAwaitTimeMS,
maxTimeMS, maxCommitTimeMS, wTimeoutMS);
maxTimeMS, maxCommitTimeMS, wTimeoutMS, maxWaitTimeMS);
}

public TimeoutSettings withMaxCommitMS(final long maxCommitTimeMS) {
return new TimeoutSettings(timeoutMS, defaultTimeoutMS, serverSelectionTimeoutMS, connectTimeoutMS, readTimeoutMS, maxAwaitTimeMS,
maxTimeMS, maxCommitTimeMS, wTimeoutMS);
maxTimeMS, maxCommitTimeMS, wTimeoutMS, maxWaitTimeMS);
}

public TimeoutSettings withWTimeoutMS(@Nullable final Long wTimeoutMS) {
return new TimeoutSettings(timeoutMS, defaultTimeoutMS, serverSelectionTimeoutMS, connectTimeoutMS, readTimeoutMS, maxAwaitTimeMS,
maxTimeMS, maxCommitTimeMS, wTimeoutMS);
maxTimeMS, maxCommitTimeMS, wTimeoutMS, maxWaitTimeMS);
}

public long getServerSelectionTimeoutMS() {
Expand Down Expand Up @@ -156,6 +160,10 @@ public Long getWTimeoutMS() {
return wTimeoutMS;
}

public long getMaxWaitTimeMS() {
return maxWaitTimeMS;
}

@Override
public String toString() {
return "TimeoutSettings{"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,9 @@ private void readAsync(final int numBytes, final int additionalTimeout, final As
}

@Override
public void open() throws IOException {
public void open(final OperationContext operationContext) throws IOException {
FutureAsyncCompletionHandler<Void> handler = new FutureAsyncCompletionHandler<>();
openAsync(handler);
openAsync(operationContext, handler);
handler.getOpen();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public AsynchronousSocketChannelStream(final ServerAddress serverAddress, final

@SuppressWarnings("deprecation")
@Override
public void openAsync(final AsyncCompletionHandler<Void> handler) {
public void openAsync(final OperationContext operationContext, final AsyncCompletionHandler<Void> handler) {
isTrue("unopened", getChannel() == null);
Queue<SocketAddress> socketAddressQueue;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,26 +105,26 @@ public ServerTuple selectServer(final ServerSelector serverSelector, final Opera

ServerSelector compositeServerSelector = getCompositeServerSelector(serverSelector);
boolean selectionFailureLogged = false;
Timeout timeout = operationContext.getTimeoutContext().startServerSelectionTimeout();
Timeout serverSelectionTimeout = operationContext.getTimeoutContext().startServerSelectionTimeout();

while (true) {
CountDownLatch currentPhaseLatch = phase.get();
ClusterDescription currentDescription = description;
ServerTuple serverTuple = selectServer(compositeServerSelector, currentDescription, timeout);
ServerTuple serverTuple = selectServer(compositeServerSelector, currentDescription, serverSelectionTimeout);

throwIfIncompatible(currentDescription);
if (serverTuple != null) {
return serverTuple;
}
if (timeout.hasExpired()) {
if (serverSelectionTimeout.hasExpired()) {
throw createTimeoutException(serverSelector, currentDescription);
}
if (!selectionFailureLogged) {
logServerSelectionFailure(serverSelector, currentDescription, timeout);
logServerSelectionFailure(serverSelector, currentDescription, serverSelectionTimeout);
selectionFailureLogged = true;
}
connect();
Timeout heartbeatLimitedTimeout = timeout.orEarlier(startMinWaitHeartbeatTimeout());
Timeout heartbeatLimitedTimeout = serverSelectionTimeout.orEarlier(startMinWaitHeartbeatTimeout());
heartbeatLimitedTimeout.awaitOn(currentPhaseLatch,
() -> format("waiting for a server that matches %s", serverSelector));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,34 +18,22 @@

import com.mongodb.MongoConnectionPoolClearedException;
import com.mongodb.annotations.ThreadSafe;
import com.mongodb.connection.ConnectionPoolSettings;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.time.StartTime;
import org.bson.types.ObjectId;
import com.mongodb.lang.Nullable;
import org.bson.types.ObjectId;

import java.io.Closeable;
import java.util.concurrent.TimeUnit;

/**
* An instance of an implementation must be created in the {@linkplain #invalidate(Throwable) paused} state.
*/
@ThreadSafe
interface ConnectionPool extends Closeable {
/**
* Is equivalent to {@link #get(OperationContext, long, TimeUnit)} called with {@link ConnectionPoolSettings#getMaxWaitTime(TimeUnit)}.
*/
InternalConnection get(OperationContext operationContext) throws MongoConnectionPoolClearedException;

/**
* @param operationContext the operation context
* @param timeout This is not a timeout for the whole {@link #get(OperationContext, long, TimeUnit)},
* see {@link ConnectionPoolSettings#getMaxWaitTime(TimeUnit)}.
* <p>
* See {@link StartTime#timeoutAfterOrInfiniteIfNegative(long, TimeUnit)}.</p>
* @throws MongoConnectionPoolClearedException If detects that the pool is {@linkplain #invalidate(Throwable) paused}.
*/
InternalConnection get(OperationContext operationContext, long timeout, TimeUnit timeUnit) throws MongoConnectionPoolClearedException;
InternalConnection get(OperationContext operationContext) throws MongoConnectionPoolClearedException;

/**
* Completes the {@code callback} with a {@link MongoConnectionPoolClearedException}
Expand Down
Loading