Skip to content

Commit ea6c37c

Browse files
authored
Obtain waitQueue and connection timeouts from OperationContext (#1228)
JAVA-5212
1 parent a881808 commit ea6c37c

File tree

39 files changed

+254
-210
lines changed

39 files changed

+254
-210
lines changed

driver-core/src/main/com/mongodb/connection/Stream.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.mongodb.connection;
1818

1919
import com.mongodb.ServerAddress;
20+
import com.mongodb.internal.connection.OperationContext;
2021
import org.bson.ByteBuf;
2122

2223
import java.io.IOException;
@@ -36,14 +37,15 @@ public interface Stream extends BufferProvider{
3637
*
3738
* @throws IOException if an I/O error occurs
3839
*/
39-
void open() throws IOException;
40+
void open(OperationContext operationContext) throws IOException;
4041

4142
/**
4243
* Open the stream asynchronously.
4344
*
44-
* @param handler the completion handler for opening the stream
45+
* @param operationContext
46+
* @param handler the completion handler for opening the stream
4547
*/
46-
void openAsync(AsyncCompletionHandler<Void> handler);
48+
void openAsync(OperationContext operationContext, AsyncCompletionHandler<Void> handler);
4749

4850
/**
4951
* Write each buffer in the list to the stream in order, blocking until all are completely written.

driver-core/src/main/com/mongodb/connection/TlsChannelStreamFactoryFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.mongodb.ServerAddress;
2222
import com.mongodb.internal.connection.AsynchronousChannelStream;
2323
import com.mongodb.internal.connection.ExtendedAsynchronousByteChannel;
24+
import com.mongodb.internal.connection.OperationContext;
2425
import com.mongodb.internal.connection.PowerOfTwoBufferPool;
2526
import com.mongodb.internal.connection.tlschannel.BufferAllocator;
2627
import com.mongodb.internal.connection.tlschannel.ClientTlsChannel;
@@ -201,7 +202,7 @@ public boolean supportsAdditionalTimeout() {
201202

202203
@SuppressWarnings("deprecation")
203204
@Override
204-
public void openAsync(final AsyncCompletionHandler<Void> handler) {
205+
public void openAsync(final OperationContext operationContext, final AsyncCompletionHandler<Void> handler) {
205206
isTrue("unopened", getChannel() == null);
206207
try {
207208
SocketChannel socketChannel = SocketChannel.open();

driver-core/src/main/com/mongodb/connection/netty/NettyStream.java

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.mongodb.connection.SocketSettings;
2929
import com.mongodb.connection.SslSettings;
3030
import com.mongodb.connection.Stream;
31+
import com.mongodb.internal.connection.OperationContext;
3132
import com.mongodb.internal.connection.netty.NettyByteBuf;
3233
import com.mongodb.lang.Nullable;
3334
import io.netty.bootstrap.Bootstrap;
@@ -156,15 +157,15 @@ public ByteBuf getBuffer(final int size) {
156157
}
157158

158159
@Override
159-
public void open() throws IOException {
160+
public void open(final OperationContext operationContext) throws IOException {
160161
FutureAsyncCompletionHandler<Void> handler = new FutureAsyncCompletionHandler<>();
161-
openAsync(handler);
162+
openAsync(operationContext, handler);
162163
handler.get();
163164
}
164165

165166
@SuppressWarnings("deprecation")
166167
@Override
167-
public void openAsync(final AsyncCompletionHandler<Void> handler) {
168+
public void openAsync(final OperationContext operationContext, final AsyncCompletionHandler<Void> handler) {
168169
Queue<SocketAddress> socketAddressQueue;
169170

170171
try {
@@ -174,10 +175,11 @@ public void openAsync(final AsyncCompletionHandler<Void> handler) {
174175
return;
175176
}
176177

177-
initializeChannel(handler, socketAddressQueue);
178+
initializeChannel(operationContext, handler, socketAddressQueue);
178179
}
179180

180-
private void initializeChannel(final AsyncCompletionHandler<Void> handler, final Queue<SocketAddress> socketAddressQueue) {
181+
private void initializeChannel(final OperationContext operationContext, final AsyncCompletionHandler<Void> handler,
182+
final Queue<SocketAddress> socketAddressQueue) {
181183
if (socketAddressQueue.isEmpty()) {
182184
handler.failed(new MongoSocketException("Exception opening socket", getAddress()));
183185
} else {
@@ -186,8 +188,8 @@ private void initializeChannel(final AsyncCompletionHandler<Void> handler, final
186188
Bootstrap bootstrap = new Bootstrap();
187189
bootstrap.group(workerGroup);
188190
bootstrap.channel(socketChannelClass);
189-
190-
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, settings.getConnectTimeout(MILLISECONDS));
191+
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
192+
operationContext.getTimeoutContext().getConnectTimeoutMs());
191193
bootstrap.option(ChannelOption.TCP_NODELAY, true);
192194
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
193195

@@ -221,7 +223,7 @@ public void initChannel(final SocketChannel ch) {
221223
}
222224
});
223225
ChannelFuture channelFuture = bootstrap.connect(nextAddress);
224-
channelFuture.addListener(new OpenChannelFutureListener(socketAddressQueue, channelFuture, handler));
226+
channelFuture.addListener(new OpenChannelFutureListener(operationContext, socketAddressQueue, channelFuture, handler));
225227
}
226228
}
227229

@@ -503,9 +505,12 @@ private class OpenChannelFutureListener implements ChannelFutureListener {
503505
private final Queue<SocketAddress> socketAddressQueue;
504506
private final ChannelFuture channelFuture;
505507
private final AsyncCompletionHandler<Void> handler;
508+
private final OperationContext operationContext;
506509

507-
OpenChannelFutureListener(final Queue<SocketAddress> socketAddressQueue, final ChannelFuture channelFuture,
508-
final AsyncCompletionHandler<Void> handler) {
510+
OpenChannelFutureListener(final OperationContext operationContext,
511+
final Queue<SocketAddress> socketAddressQueue, final ChannelFuture channelFuture,
512+
final AsyncCompletionHandler<Void> handler) {
513+
this.operationContext = operationContext;
509514
this.socketAddressQueue = socketAddressQueue;
510515
this.channelFuture = channelFuture;
511516
this.handler = handler;
@@ -528,7 +533,7 @@ public void operationComplete(final ChannelFuture future) {
528533
} else if (socketAddressQueue.isEmpty()) {
529534
handler.failed(new MongoSocketOpenException("Exception opening socket", getAddress(), future.cause()));
530535
} else {
531-
initializeChannel(handler, socketAddressQueue);
536+
initializeChannel(operationContext, handler, socketAddressQueue);
532537
}
533538
}
534539
});

driver-core/src/main/com/mongodb/internal/TimeoutContext.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,4 +158,13 @@ public Timeout startServerSelectionTimeout() {
158158
long ms = getTimeoutSettings().getServerSelectionTimeoutMS();
159159
return StartTime.now().timeoutAfterOrInfiniteIfNegative(ms, MILLISECONDS);
160160
}
161+
162+
public Timeout startWaitQueueTimeout(final StartTime checkoutStart) {
163+
final long ms = getTimeoutSettings().getMaxWaitTimeMS();
164+
return checkoutStart.timeoutAfterOrInfiniteIfNegative(ms, MILLISECONDS);
165+
}
166+
167+
public int getConnectTimeoutMs() {
168+
return (int) getTimeoutSettings().getConnectTimeoutMS();
169+
}
161170
}

driver-core/src/main/com/mongodb/internal/TimeoutSettings.java

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -37,35 +37,38 @@ public class TimeoutSettings {
3737
@Nullable
3838
private final Long defaultTimeoutMS;
3939

40-
private final long maxAwaitTimeMS;
41-
42-
// Deprecated timeouts
43-
private final long readTimeoutMS;
40+
// Deprecated configuration timeout options
41+
private final long readTimeoutMS; // aka socketTimeoutMS
42+
private final long maxWaitTimeMS; // aka waitQueueTimeoutMS
43+
@Nullable
44+
private final Long wTimeoutMS;
4445

46+
// Deprecated options for CRUD methods
4547
private final long maxTimeMS;
46-
48+
private final long maxAwaitTimeMS;
4749
private final long maxCommitTimeMS;
48-
@Nullable
49-
private final Long wTimeoutMS;
5050

5151
public static final TimeoutSettings DEFAULT = create(MongoClientSettings.builder().build());
5252

5353
public static TimeoutSettings create(final MongoClientSettings settings) {
54-
return new TimeoutSettings(settings.getClusterSettings().getServerSelectionTimeout(TimeUnit.MILLISECONDS),
54+
return new TimeoutSettings(
55+
settings.getClusterSettings().getServerSelectionTimeout(TimeUnit.MILLISECONDS),
5556
settings.getSocketSettings().getConnectTimeout(TimeUnit.MILLISECONDS),
5657
settings.getSocketSettings().getReadTimeout(TimeUnit.MILLISECONDS),
57-
settings.getTimeout(TimeUnit.MILLISECONDS));
58+
settings.getTimeout(TimeUnit.MILLISECONDS),
59+
settings.getConnectionPoolSettings().getMaxWaitTime(TimeUnit.MILLISECONDS));
5860
}
5961

6062
public TimeoutSettings(
61-
final long serverSelectionTimeoutMS, final long connectTimeoutMS, final long readTimeoutMS, @Nullable final Long timeoutMS) {
62-
this(timeoutMS, null, serverSelectionTimeoutMS, connectTimeoutMS, readTimeoutMS, 0, 0, 0, null);
63+
final long serverSelectionTimeoutMS, final long connectTimeoutMS, final long readTimeoutMS,
64+
@Nullable final Long timeoutMS, final long maxWaitTimeMS) {
65+
this(timeoutMS, null, serverSelectionTimeoutMS, connectTimeoutMS, readTimeoutMS, 0, 0, 0, null, maxWaitTimeMS);
6366
}
6467

6568
TimeoutSettings(
6669
@Nullable final Long timeoutMS, @Nullable final Long defaultTimeoutMS, final long serverSelectionTimeoutMS,
6770
final long connectTimeoutMS, final long readTimeoutMS, final long maxAwaitTimeMS, final long maxTimeMS,
68-
final long maxCommitTimeMS, @Nullable final Long wTimeoutMS) {
71+
final long maxCommitTimeMS, @Nullable final Long wTimeoutMS, final long maxWaitTimeMS) {
6972
isTrueArgument("timeoutMS must be >= 0", timeoutMS == null || timeoutMS >= 0);
7073
this.serverSelectionTimeoutMS = serverSelectionTimeoutMS;
7174
this.connectTimeoutMS = connectTimeoutMS;
@@ -76,45 +79,46 @@ public TimeoutSettings(
7679
this.maxTimeMS = maxTimeMS;
7780
this.maxCommitTimeMS = maxCommitTimeMS;
7881
this.wTimeoutMS = wTimeoutMS;
82+
this.maxWaitTimeMS = maxWaitTimeMS;
7983
}
8084

8185
public TimeoutSettings connectionOnly() {
82-
return new TimeoutSettings(serverSelectionTimeoutMS, connectTimeoutMS, readTimeoutMS, null);
86+
return new TimeoutSettings(serverSelectionTimeoutMS, connectTimeoutMS, readTimeoutMS, null, 0);
8387
}
8488

8589
public TimeoutSettings withTimeoutMS(final long timeoutMS) {
8690
return new TimeoutSettings(timeoutMS, defaultTimeoutMS, serverSelectionTimeoutMS, connectTimeoutMS, readTimeoutMS, maxAwaitTimeMS,
87-
maxTimeMS, maxCommitTimeMS, wTimeoutMS);
91+
maxTimeMS, maxCommitTimeMS, wTimeoutMS, maxWaitTimeMS);
8892
}
8993

9094
public TimeoutSettings withDefaultTimeoutMS(final long defaultTimeoutMS) {
9195
return new TimeoutSettings(timeoutMS, defaultTimeoutMS, serverSelectionTimeoutMS, connectTimeoutMS, readTimeoutMS, maxAwaitTimeMS,
92-
maxTimeMS, maxCommitTimeMS, wTimeoutMS);
96+
maxTimeMS, maxCommitTimeMS, wTimeoutMS, maxWaitTimeMS);
9397
}
9498

9599
public TimeoutSettings withMaxTimeMS(final long maxTimeMS) {
96100
return new TimeoutSettings(timeoutMS, defaultTimeoutMS, serverSelectionTimeoutMS, connectTimeoutMS, readTimeoutMS, maxAwaitTimeMS,
97-
maxTimeMS, maxCommitTimeMS, wTimeoutMS);
101+
maxTimeMS, maxCommitTimeMS, wTimeoutMS, maxWaitTimeMS);
98102
}
99103

100104
public TimeoutSettings withMaxAwaitTimeMS(final long maxAwaitTimeMS) {
101105
return new TimeoutSettings(timeoutMS, defaultTimeoutMS, serverSelectionTimeoutMS, connectTimeoutMS, readTimeoutMS, maxAwaitTimeMS,
102-
maxTimeMS, maxCommitTimeMS, wTimeoutMS);
106+
maxTimeMS, maxCommitTimeMS, wTimeoutMS, maxWaitTimeMS);
103107
}
104108

105109
public TimeoutSettings withMaxTimeAndMaxAwaitTimeMS(final long maxTimeMS, final long maxAwaitTimeMS) {
106110
return new TimeoutSettings(timeoutMS, defaultTimeoutMS, serverSelectionTimeoutMS, connectTimeoutMS, readTimeoutMS, maxAwaitTimeMS,
107-
maxTimeMS, maxCommitTimeMS, wTimeoutMS);
111+
maxTimeMS, maxCommitTimeMS, wTimeoutMS, maxWaitTimeMS);
108112
}
109113

110114
public TimeoutSettings withMaxCommitMS(final long maxCommitTimeMS) {
111115
return new TimeoutSettings(timeoutMS, defaultTimeoutMS, serverSelectionTimeoutMS, connectTimeoutMS, readTimeoutMS, maxAwaitTimeMS,
112-
maxTimeMS, maxCommitTimeMS, wTimeoutMS);
116+
maxTimeMS, maxCommitTimeMS, wTimeoutMS, maxWaitTimeMS);
113117
}
114118

115119
public TimeoutSettings withWTimeoutMS(@Nullable final Long wTimeoutMS) {
116120
return new TimeoutSettings(timeoutMS, defaultTimeoutMS, serverSelectionTimeoutMS, connectTimeoutMS, readTimeoutMS, maxAwaitTimeMS,
117-
maxTimeMS, maxCommitTimeMS, wTimeoutMS);
121+
maxTimeMS, maxCommitTimeMS, wTimeoutMS, maxWaitTimeMS);
118122
}
119123

120124
public long getServerSelectionTimeoutMS() {
@@ -156,6 +160,10 @@ public Long getWTimeoutMS() {
156160
return wTimeoutMS;
157161
}
158162

163+
public long getMaxWaitTimeMS() {
164+
return maxWaitTimeMS;
165+
}
166+
159167
@Override
160168
public String toString() {
161169
return "TimeoutSettings{"

driver-core/src/main/com/mongodb/internal/connection/AsynchronousChannelStream.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,9 +125,9 @@ private void readAsync(final int numBytes, final int additionalTimeout, final As
125125
}
126126

127127
@Override
128-
public void open() throws IOException {
128+
public void open(final OperationContext operationContext) throws IOException {
129129
FutureAsyncCompletionHandler<Void> handler = new FutureAsyncCompletionHandler<>();
130-
openAsync(handler);
130+
openAsync(operationContext, handler);
131131
handler.getOpen();
132132
}
133133

driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStream.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public AsynchronousSocketChannelStream(final ServerAddress serverAddress, final
5656

5757
@SuppressWarnings("deprecation")
5858
@Override
59-
public void openAsync(final AsyncCompletionHandler<Void> handler) {
59+
public void openAsync(final OperationContext operationContext, final AsyncCompletionHandler<Void> handler) {
6060
isTrue("unopened", getChannel() == null);
6161
Queue<SocketAddress> socketAddressQueue;
6262

driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -105,26 +105,26 @@ public ServerTuple selectServer(final ServerSelector serverSelector, final Opera
105105

106106
ServerSelector compositeServerSelector = getCompositeServerSelector(serverSelector);
107107
boolean selectionFailureLogged = false;
108-
Timeout timeout = operationContext.getTimeoutContext().startServerSelectionTimeout();
108+
Timeout serverSelectionTimeout = operationContext.getTimeoutContext().startServerSelectionTimeout();
109109

110110
while (true) {
111111
CountDownLatch currentPhaseLatch = phase.get();
112112
ClusterDescription currentDescription = description;
113-
ServerTuple serverTuple = selectServer(compositeServerSelector, currentDescription, timeout);
113+
ServerTuple serverTuple = selectServer(compositeServerSelector, currentDescription, serverSelectionTimeout);
114114

115115
throwIfIncompatible(currentDescription);
116116
if (serverTuple != null) {
117117
return serverTuple;
118118
}
119-
if (timeout.hasExpired()) {
119+
if (serverSelectionTimeout.hasExpired()) {
120120
throw createTimeoutException(serverSelector, currentDescription);
121121
}
122122
if (!selectionFailureLogged) {
123-
logServerSelectionFailure(serverSelector, currentDescription, timeout);
123+
logServerSelectionFailure(serverSelector, currentDescription, serverSelectionTimeout);
124124
selectionFailureLogged = true;
125125
}
126126
connect();
127-
Timeout heartbeatLimitedTimeout = timeout.orEarlier(startMinWaitHeartbeatTimeout());
127+
Timeout heartbeatLimitedTimeout = serverSelectionTimeout.orEarlier(startMinWaitHeartbeatTimeout());
128128
heartbeatLimitedTimeout.awaitOn(currentPhaseLatch,
129129
() -> format("waiting for a server that matches %s", serverSelector));
130130
}

driver-core/src/main/com/mongodb/internal/connection/ConnectionPool.java

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,34 +18,22 @@
1818

1919
import com.mongodb.MongoConnectionPoolClearedException;
2020
import com.mongodb.annotations.ThreadSafe;
21-
import com.mongodb.connection.ConnectionPoolSettings;
2221
import com.mongodb.internal.async.SingleResultCallback;
23-
import com.mongodb.internal.time.StartTime;
24-
import org.bson.types.ObjectId;
2522
import com.mongodb.lang.Nullable;
23+
import org.bson.types.ObjectId;
2624

2725
import java.io.Closeable;
28-
import java.util.concurrent.TimeUnit;
2926

3027
/**
3128
* An instance of an implementation must be created in the {@linkplain #invalidate(Throwable) paused} state.
3229
*/
3330
@ThreadSafe
3431
interface ConnectionPool extends Closeable {
35-
/**
36-
* Is equivalent to {@link #get(OperationContext, long, TimeUnit)} called with {@link ConnectionPoolSettings#getMaxWaitTime(TimeUnit)}.
37-
*/
38-
InternalConnection get(OperationContext operationContext) throws MongoConnectionPoolClearedException;
39-
4032
/**
4133
* @param operationContext the operation context
42-
* @param timeout This is not a timeout for the whole {@link #get(OperationContext, long, TimeUnit)},
43-
* see {@link ConnectionPoolSettings#getMaxWaitTime(TimeUnit)}.
44-
* <p>
45-
* See {@link StartTime#timeoutAfterOrInfiniteIfNegative(long, TimeUnit)}.</p>
4634
* @throws MongoConnectionPoolClearedException If detects that the pool is {@linkplain #invalidate(Throwable) paused}.
4735
*/
48-
InternalConnection get(OperationContext operationContext, long timeout, TimeUnit timeUnit) throws MongoConnectionPoolClearedException;
36+
InternalConnection get(OperationContext operationContext) throws MongoConnectionPoolClearedException;
4937

5038
/**
5139
* Completes the {@code callback} with a {@link MongoConnectionPoolClearedException}

0 commit comments

Comments
 (0)