Skip to content

CSOT: Network failure handling and session management #1396

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Jun 11, 2024
Merged
11 changes: 11 additions & 0 deletions driver-core/src/main/com/mongodb/internal/ExceptionUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package com.mongodb.internal;

import com.mongodb.MongoCommandException;
import com.mongodb.MongoOperationTimeoutException;
import com.mongodb.MongoSocketException;
import org.bson.BsonArray;
import org.bson.BsonDocument;
import org.bson.BsonInt32;
Expand All @@ -35,6 +37,15 @@
* <p>This class is not part of the public API and may be removed or changed at any time</p>
*/
public final class ExceptionUtils {

public static boolean isMongoSocketException(final Throwable e) {
return e instanceof MongoSocketException;
}

public static boolean isOperationTimeoutFromSocketException(final Throwable e) {
return e instanceof MongoOperationTimeoutException && e.getCause() instanceof MongoSocketException;
}

public static final class MongoCommandExceptionUtils {
public static int extractErrorCode(final BsonDocument response) {
return extractErrorCodeAsBson(response).intValue();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.mongodb.internal.connection;

import com.mongodb.internal.session.SessionContext;

import static com.mongodb.internal.ExceptionUtils.isMongoSocketException;
import static com.mongodb.internal.ExceptionUtils.isOperationTimeoutFromSocketException;

/**
* <p>This class is not part of the public API and may be removed or changed at any time</p>
*/
public abstract class AbstractProtocolExecutor implements ProtocolExecutor {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this abstract class need to exist? Can these helper methods just be added to ProtocolExecutor?

Copy link
Member Author

@vbabanin vbabanin Jun 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically, the methods could be moved to the interface since they are stateless, but this would make them public. I've opted to keep them protected within the abstract class to maintain better encapsulation and clearer boundaries in the codebase. If you feel that we should move them to the interface, I am open to doing that. Please let me know your thoughts.

I've also relocated the isMongoSocketException and isOperationTimeoutFromSocketException to ExceptionUtils for broader reuse.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is all internal code, it just adds an extra layer for helper style methods. Looks like this abstract class could be removed and shouldMarkSessionDirty be moved to a helper.

However, if you are keen to keep it protected then its ok with me.


protected boolean shouldMarkSessionDirty(final Throwable e, final SessionContext sessionContext) {
if (!sessionContext.hasSession()) {
return false;
}
return isMongoSocketException(e) || isOperationTimeoutFromSocketException(e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import com.mongodb.MongoException;
import com.mongodb.MongoServerUnavailableException;
import com.mongodb.MongoSocketException;
import com.mongodb.ReadPreference;
import com.mongodb.connection.ClusterConnectionMode;
import com.mongodb.connection.ConnectionDescription;
Expand Down Expand Up @@ -197,7 +196,7 @@ ServerId serverId() {
return serverId;
}

private class DefaultServerProtocolExecutor implements ProtocolExecutor {
private class DefaultServerProtocolExecutor extends AbstractProtocolExecutor {

@SuppressWarnings("unchecked")
@Override
Expand All @@ -216,9 +215,9 @@ public <T> T execute(final CommandProtocol<T> protocol, final InternalConnection
if (e instanceof MongoWriteConcernWithResponseException) {
return (T) ((MongoWriteConcernWithResponseException) e).getResponse();
} else {
if (e instanceof MongoSocketException && sessionContext.hasSession()) {
if (shouldMarkSessionDirty(e, sessionContext)) {
sessionContext.markSessionDirty();
}
}
throw e;
}
}
Expand All @@ -239,7 +238,7 @@ public <T> void executeAsync(final CommandProtocol<T> protocol, final InternalCo
if (t instanceof MongoWriteConcernWithResponseException) {
callback.onResult((T) ((MongoWriteConcernWithResponseException) t).getResponse(), null);
} else {
if (t instanceof MongoSocketException && sessionContext.hasSession()) {
if (shouldMarkSessionDirty(t, sessionContext)) {
sessionContext.markSessionDirty();
}
callback.onResult(null, t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import static com.mongodb.assertions.Assertions.isTrue;
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.internal.async.AsyncRunnable.beginAsync;
import static com.mongodb.internal.TimeoutContext.createMongoTimeoutException;
import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback;
import static com.mongodb.internal.connection.Authenticator.shouldAuthenticate;
import static com.mongodb.internal.connection.CommandHelper.HELLO;
Expand Down Expand Up @@ -775,7 +776,7 @@ private void throwTranslatedWriteException(final Throwable e, final OperationCon

private MongoException translateWriteException(final Throwable e, final OperationContext operationContext) {
if (e instanceof MongoSocketWriteTimeoutException && operationContext.getTimeoutContext().hasExpired()) {
return TimeoutContext.createMongoTimeoutException(e);
return createMongoTimeoutException(e);
}

if (e instanceof MongoException) {
Expand All @@ -792,9 +793,12 @@ private MongoException translateWriteException(final Throwable e, final Operatio
}

private MongoException translateReadException(final Throwable e, final OperationContext operationContext) {
if (operationContext.getTimeoutContext().hasTimeoutMS()
&& (e instanceof SocketTimeoutException || e instanceof MongoSocketReadTimeoutException)) {
return TimeoutContext.createMongoTimeoutException(e);
if (operationContext.getTimeoutContext().hasTimeoutMS()) {
if (e instanceof SocketTimeoutException) {
return createMongoTimeoutException(createReadTimeoutException((SocketTimeoutException) e));
} else if (e instanceof MongoSocketReadTimeoutException) {
return createMongoTimeoutException((e));
}
}

if (e instanceof MongoException) {
Expand All @@ -804,7 +808,7 @@ private MongoException translateReadException(final Throwable e, final Operation
if (interruptedException.isPresent()) {
return interruptedException.get();
} else if (e instanceof SocketTimeoutException) {
return new MongoSocketReadTimeoutException("Timeout while receiving message", getServerAddress(), e);
return createReadTimeoutException((SocketTimeoutException) e);
} else if (e instanceof IOException) {
return new MongoSocketReadException("Exception receiving message", getServerAddress(), e);
} else if (e instanceof RuntimeException) {
Expand All @@ -814,6 +818,11 @@ private MongoException translateReadException(final Throwable e, final Operation
}
}

private MongoSocketReadTimeoutException createReadTimeoutException(final SocketTimeoutException e) {
return new MongoSocketReadTimeoutException("Timeout while receiving message",
getServerAddress(), e);
}

private ResponseBuffers receiveResponseBuffers(final OperationContext operationContext) {
try {
ByteBuf messageHeaderBuffer = stream.read(MESSAGE_HEADER_LENGTH, operationContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
* <p>This class is not part of the public API and may be removed or changed at any time</p>
*/
public class InternalStreamConnectionInitializer implements InternalConnectionInitializer {
private static final int INITIAL_MIN_RTT = 0;
private final ClusterConnectionMode clusterConnectionMode;
private final Authenticator authenticator;
private final BsonDocument clientMetadataDocument;
Expand Down Expand Up @@ -160,7 +161,7 @@ private InternalConnectionInitializationDescription createInitializationDescript
helloResult);
ServerDescription serverDescription =
createServerDescription(internalConnection.getDescription().getServerAddress(), helloResult,
System.nanoTime() - startTime, 0);
System.nanoTime() - startTime, INITIAL_MIN_RTT);
return new InternalConnectionInitializationDescription(connectionDescription, serverDescription);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ ConnectionPool getConnectionPool() {
return connectionPool;
}

private class LoadBalancedServerProtocolExecutor implements ProtocolExecutor {
private class LoadBalancedServerProtocolExecutor extends AbstractProtocolExecutor {
@SuppressWarnings("unchecked")
@Override
public <T> T execute(final CommandProtocol<T> protocol, final InternalConnection connection, final SessionContext sessionContext) {
Expand Down Expand Up @@ -191,7 +191,7 @@ public <T> void executeAsync(final CommandProtocol<T> protocol, final InternalCo
private void handleExecutionException(final InternalConnection connection, final SessionContext sessionContext,
final Throwable t) {
invalidate(t, connection.getDescription().getServiceId(), connection.getGeneration());
if (t instanceof MongoSocketException && sessionContext.hasSession()) {
if (shouldMarkSessionDirty(t, sessionContext)) {
sessionContext.markSessionDirty();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.mongodb.MongoCommandException;
import com.mongodb.MongoNamespace;
import com.mongodb.MongoOperationTimeoutException;
import com.mongodb.MongoSocketException;
import com.mongodb.ReadPreference;
import com.mongodb.ServerAddress;
Expand Down Expand Up @@ -286,15 +287,23 @@ <R> void executeWithConnection(final AsyncCallableConnectionWithCallback<R> call
return;
}
callable.call(assertNotNull(connection), (result, t1) -> {
if (t1 instanceof MongoSocketException) {
onCorruptedConnection(connection, (MongoSocketException) t1);
if (t1 != null) {
handleException(connection, t1);
}
connection.release();
callback.onResult(result, t1);
});
});
}

private void handleException(final AsyncConnection connection, final Throwable exception) {
if (exception instanceof MongoOperationTimeoutException && exception.getCause() instanceof MongoSocketException) {
onCorruptedConnection(connection, (MongoSocketException) exception.getCause());
} else if (exception instanceof MongoSocketException) {
onCorruptedConnection(connection, (MongoSocketException) exception);
}
}

private void getConnection(final SingleResultCallback<AsyncConnection> callback) {
assertTrue(getState() != State.IDLE);
AsyncConnection pinnedConnection = getPinnedConnection();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.mongodb.MongoCommandException;
import com.mongodb.MongoException;
import com.mongodb.MongoNamespace;
import com.mongodb.MongoOperationTimeoutException;
import com.mongodb.MongoSocketException;
import com.mongodb.ReadPreference;
import com.mongodb.ServerAddress;
Expand Down Expand Up @@ -334,6 +335,12 @@ void executeWithConnection(final Consumer<Connection> action) {
} catch (MongoSocketException e) {
onCorruptedConnection(connection, e);
throw e;
} catch (MongoOperationTimeoutException e) {
Throwable cause = e.getCause();
if (cause instanceof MongoSocketException) {
onCorruptedConnection(connection, (MongoSocketException) cause);
}
throw e;
} finally {
connection.release();
}
Expand Down
Loading