Skip to content

Do not perform server selection to determine sessions support #1092

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Mar 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class ConnectionDescription {
private final int maxMessageSize;
private final List<String> compressors;
private final BsonArray saslSupportedMechanisms;
private final Integer logicalSessionTimeoutMinutes;

private static final int DEFAULT_MAX_MESSAGE_SIZE = 0x2000000; // 32MB
private static final int DEFAULT_MAX_WRITE_BATCH_SIZE = 512;
Expand Down Expand Up @@ -99,6 +100,29 @@ public ConnectionDescription(final ConnectionId connectionId, final int maxWireV
saslSupportedMechanisms);
}

/**
* Construct an instance.
*
* @param connectionId the connection id
* @param maxWireVersion the max wire version
* @param serverType the server type
* @param maxBatchCount the max batch count
* @param maxDocumentSize the max document size in bytes
* @param maxMessageSize the max message size in bytes
* @param compressors the available compressors on the connection
* @param saslSupportedMechanisms the supported SASL mechanisms
* @param logicalSessionTimeoutMinutes the logical session timeout, in minutes
* @since 4.10
*/
public ConnectionDescription(final ConnectionId connectionId, final int maxWireVersion,
final ServerType serverType, final int maxBatchCount, final int maxDocumentSize,
final int maxMessageSize, final List<String> compressors,
@Nullable final BsonArray saslSupportedMechanisms,
@Nullable final Integer logicalSessionTimeoutMinutes) {
this(null, connectionId, maxWireVersion, serverType, maxBatchCount, maxDocumentSize, maxMessageSize, compressors,
saslSupportedMechanisms, logicalSessionTimeoutMinutes);
}

/**
* Construct an instance.
*
Expand All @@ -117,6 +141,14 @@ public ConnectionDescription(@Nullable final ObjectId serviceId, final Connectio
final ServerType serverType, final int maxBatchCount, final int maxDocumentSize,
final int maxMessageSize, final List<String> compressors,
@Nullable final BsonArray saslSupportedMechanisms) {
this(serviceId, connectionId, maxWireVersion, serverType, maxBatchCount, maxDocumentSize, maxMessageSize, compressors,
saslSupportedMechanisms, null);
}

private ConnectionDescription(@Nullable final ObjectId serviceId, final ConnectionId connectionId, final int maxWireVersion,
final ServerType serverType, final int maxBatchCount, final int maxDocumentSize,
final int maxMessageSize, final List<String> compressors,
@Nullable final BsonArray saslSupportedMechanisms, @Nullable final Integer logicalSessionTimeoutMinutes) {
this.serviceId = serviceId;
this.connectionId = connectionId;
this.serverType = serverType;
Expand All @@ -126,6 +158,7 @@ public ConnectionDescription(@Nullable final ObjectId serviceId, final Connectio
this.maxWireVersion = maxWireVersion;
this.compressors = notNull("compressors", Collections.unmodifiableList(new ArrayList<>(compressors)));
this.saslSupportedMechanisms = saslSupportedMechanisms;
this.logicalSessionTimeoutMinutes = logicalSessionTimeoutMinutes;
}
/**
* Creates a new connection description with the set connection id
Expand All @@ -137,7 +170,7 @@ public ConnectionDescription(@Nullable final ObjectId serviceId, final Connectio
public ConnectionDescription withConnectionId(final ConnectionId connectionId) {
notNull("connectionId", connectionId);
return new ConnectionDescription(serviceId, connectionId, maxWireVersion, serverType, maxBatchCount, maxDocumentSize,
maxMessageSize, compressors, saslSupportedMechanisms);
maxMessageSize, compressors, saslSupportedMechanisms, logicalSessionTimeoutMinutes);
}

/**
Expand All @@ -150,7 +183,7 @@ public ConnectionDescription withConnectionId(final ConnectionId connectionId) {
public ConnectionDescription withServiceId(final ObjectId serviceId) {
notNull("serviceId", serviceId);
return new ConnectionDescription(serviceId, connectionId, maxWireVersion, serverType, maxBatchCount, maxDocumentSize,
maxMessageSize, compressors, saslSupportedMechanisms);
maxMessageSize, compressors, saslSupportedMechanisms, logicalSessionTimeoutMinutes);
}

/**
Expand Down Expand Up @@ -248,6 +281,17 @@ public BsonArray getSaslSupportedMechanisms() {
return saslSupportedMechanisms;
}

/**
* Gets the session timeout in minutes.
*
* @return the session timeout in minutes, or null if sessions are not supported by this connection
* @mongodb.server.release 3.6
* @since 4.10
*/
@Nullable
public Integer getLogicalSessionTimeoutMinutes() {
return logicalSessionTimeoutMinutes;
}
/**
* Get the default maximum message size.
*
Expand Down Expand Up @@ -302,6 +346,9 @@ public boolean equals(final Object o) {
if (!compressors.equals(that.compressors)) {
return false;
}
if (!Objects.equals(logicalSessionTimeoutMinutes, that.logicalSessionTimeoutMinutes)) {
return false;
}
return Objects.equals(saslSupportedMechanisms, that.saslSupportedMechanisms);
}

Expand All @@ -316,6 +363,7 @@ public int hashCode() {
result = 31 * result + compressors.hashCode();
result = 31 * result + (serviceId != null ? serviceId.hashCode() : 0);
result = 31 * result + (saslSupportedMechanisms != null ? saslSupportedMechanisms.hashCode() : 0);
result = 31 * result + (logicalSessionTimeoutMinutes != null ? logicalSessionTimeoutMinutes.hashCode() : 0);
return result;
}

Expand All @@ -329,6 +377,7 @@ public String toString() {
+ ", maxDocumentSize=" + maxDocumentSize
+ ", maxMessageSize=" + maxMessageSize
+ ", compressors=" + compressors
+ ", logicialSessionTimeoutMinutes=" + logicalSessionTimeoutMinutes
+ ", serviceId=" + serviceId
+ '}';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,14 @@ private List<BsonElement> getExtraElements(final SessionContext sessionContext)
if (sessionContext.getClusterTime() != null) {
extraElements.add(new BsonElement("$clusterTime", sessionContext.getClusterTime()));
}
if (sessionContext.hasSession() && responseExpected) {
extraElements.add(new BsonElement("lsid", sessionContext.getSessionId()));
if (sessionContext.hasSession()) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is really the crux of the whole change. A few notes:

  • All application-initiated messages now have a session, so the only time this is false is for SDAM-related connection
  • This seems a bit late to be checking for unsupported explicit sessions, but it's the only place to put it that wouldn't result in code duplication, since sync and async paths all end up here
  • In practice, almost every server that the driver could actually connect to (3.6+) supports sessions, so the only time isSessionSupported will return false is a 3.6 server in 3.4 FCV mode, or a mongocryptd server.

if (!sessionContext.isImplicitSession() && !getSettings().isSessionSupported()) {
throw new MongoClientException("Attempting to use a ClientSession while connected to a server that doesn't support "
+ "sessions");
}
if (getSettings().isSessionSupported() && responseExpected) {
extraElements.add(new BsonElement("lsid", sessionContext.getSessionId()));
}
}
boolean firstMessageInTransaction = sessionContext.notifyMessageSent();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ static ConnectionDescription createConnectionDescription(final ClusterConnection
ConnectionDescription connectionDescription = new ConnectionDescription(connectionId,
getMaxWireVersion(helloResult), getServerType(helloResult), getMaxWriteBatchSize(helloResult),
getMaxBsonObjectSize(helloResult), getMaxMessageSizeBytes(helloResult), getCompressors(helloResult),
helloResult.getArray("saslSupportedMechs", null));
helloResult.getArray("saslSupportedMechs", null), getLogicalSessionTimeoutMinutes(helloResult));
if (helloResult.containsKey("connectionId")) {
ConnectionId newConnectionId =
connectionDescription.getConnectionId().withServerValue(helloResult.getNumber("connectionId").intValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public final class MessageSettings {
private final int maxBatchCount;
private final int maxWireVersion;
private final ServerType serverType;
private final boolean sessionSupported;

/**
* Gets the builder
Expand All @@ -56,6 +57,7 @@ public static final class Builder {
private int maxBatchCount = DEFAULT_MAX_BATCH_COUNT;
private int maxWireVersion;
private ServerType serverType;
private boolean sessionSupported;

/**
* Build it.
Expand Down Expand Up @@ -108,6 +110,11 @@ public Builder serverType(final ServerType serverType) {
this.serverType = serverType;
return this;
}

public Builder sessionSupported(final boolean sessionSupported) {
this.sessionSupported = sessionSupported;
return this;
}
}

/**
Expand Down Expand Up @@ -145,11 +152,17 @@ public ServerType getServerType() {
return serverType;
}

public boolean isSessionSupported() {
return sessionSupported;
}


private MessageSettings(final Builder builder) {
this.maxDocumentSize = builder.maxDocumentSize;
this.maxMessageSize = builder.maxMessageSize;
this.maxBatchCount = builder.maxBatchCount;
this.maxWireVersion = builder.maxWireVersion;
this.serverType = builder.serverType;
this.sessionSupported = builder.sessionSupported;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ static MessageSettings getMessageSettings(final ConnectionDescription connection
.maxBatchCount(connectionDescription.getMaxBatchCount())
.maxWireVersion(connectionDescription.getMaxWireVersion())
.serverType(connectionDescription.getServerType())
.sessionSupported(connectionDescription.getLogicalSessionTimeoutMinutes() != null)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ private CommandCreator getCommandCreator(final SessionContext sessionContext) {
putIfNotNull(commandDocument, "comment", getComment());
putIfNotNull(commandDocument, "let", getLet());

if (isRetryableWrite(isRetryWrites(), getWriteConcern(), serverDescription, connectionDescription, sessionContext)) {
if (isRetryableWrite(isRetryWrites(), getWriteConcern(), connectionDescription, sessionContext)) {
commandDocument.put("txnNumber", new BsonInt64(sessionContext.advanceTransactionNumber()));
}
return commandDocument;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.mongodb.bulk.BulkWriteUpsert;
import com.mongodb.bulk.WriteConcernError;
import com.mongodb.connection.ConnectionDescription;
import com.mongodb.connection.ServerDescription;
import com.mongodb.internal.bulk.DeleteRequest;
import com.mongodb.internal.bulk.UpdateRequest;
import com.mongodb.internal.bulk.WriteRequest;
Expand Down Expand Up @@ -96,8 +95,7 @@ public final class BulkWriteBatch {
private final BsonDocument variables;

static BulkWriteBatch createBulkWriteBatch(final MongoNamespace namespace,
final ServerDescription serverDescription,
final ConnectionDescription connectionDescription,
final ConnectionDescription connectionDescription,
final boolean ordered, final WriteConcern writeConcern,
final Boolean bypassDocumentValidation, final boolean retryWrites,
final List<? extends WriteRequest> writeRequests,
Expand All @@ -107,7 +105,7 @@ static BulkWriteBatch createBulkWriteBatch(final MongoNamespace namespace,
&& !writeConcern.isAcknowledged()) {
throw new MongoClientException("Unacknowledged writes are not supported when using an explicit session");
}
boolean canRetryWrites = isRetryableWrite(retryWrites, writeConcern, serverDescription, connectionDescription, sessionContext);
boolean canRetryWrites = isRetryableWrite(retryWrites, writeConcern, connectionDescription, sessionContext);
List<WriteRequestWithIndex> writeRequestsWithIndex = new ArrayList<>();
boolean writeRequestsAreRetryable = true;
for (int i = 0; i < writeRequests.size(); i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,8 +407,7 @@ static <T, R> R executeRetryableWrite(
return withSourceAndConnection(binding::getWriteConnectionSource, true, (source, connection) -> {
int maxWireVersion = connection.getDescription().getMaxWireVersion();
try {
retryState.breakAndThrowIfRetryAnd(() -> !canRetryWrite(source.getServerDescription(), connection.getDescription(),
binding.getSessionContext()));
retryState.breakAndThrowIfRetryAnd(() -> !canRetryWrite(connection.getDescription(), binding.getSessionContext()));
BsonDocument command = retryState.attachment(AttachmentKeys.command())
.map(previousAttemptCommand -> {
assertFalse(firstAttempt);
Expand Down Expand Up @@ -462,8 +461,8 @@ static <T, R> void executeRetryableWriteAsync(
SingleResultCallback<R> addingRetryableLabelCallback = firstAttempt
? releasingCallback
: addingRetryableLabelCallback(releasingCallback, maxWireVersion);
if (retryState.breakAndCompleteIfRetryAnd(() -> !canRetryWrite(source.getServerDescription(), connection.getDescription(),
binding.getSessionContext()), addingRetryableLabelCallback)) {
if (retryState.breakAndCompleteIfRetryAnd(() -> !canRetryWrite(connection.getDescription(), binding.getSessionContext()),
addingRetryableLabelCallback)) {
return;
}
BsonDocument command;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,14 +184,13 @@ public BulkWriteResult execute(final WriteBinding binding) {
retryState.attach(AttachmentKeys.maxWireVersion(), connectionDescription.getMaxWireVersion(), true);
SessionContext sessionContext = binding.getSessionContext();
WriteConcern writeConcern = getAppliedWriteConcern(sessionContext);
if (!isRetryableWrite(retryWrites, getAppliedWriteConcern(sessionContext),
source.getServerDescription(), connectionDescription, sessionContext)) {
if (!isRetryableWrite(retryWrites, getAppliedWriteConcern(sessionContext), connectionDescription, sessionContext)) {
handleMongoWriteConcernWithResponseException(retryState, true);
}
validateWriteRequests(connectionDescription, bypassDocumentValidation, writeRequests, writeConcern);
if (!retryState.attachment(AttachmentKeys.bulkWriteTracker()).orElseThrow(Assertions::fail).batch().isPresent()) {
BulkWriteTracker.attachNew(retryState, BulkWriteBatch.createBulkWriteBatch(namespace,
source.getServerDescription(), connectionDescription, ordered, writeConcern,
connectionDescription, ordered, writeConcern,
bypassDocumentValidation, retryWrites, writeRequests, sessionContext, comment, variables));
}
logRetryExecute(retryState);
Expand Down Expand Up @@ -220,9 +219,7 @@ public void executeAsync(final AsyncWriteBinding binding, final SingleResultCall
retryState.attach(AttachmentKeys.maxWireVersion(), connectionDescription.getMaxWireVersion(), true);
SessionContext sessionContext = binding.getSessionContext();
WriteConcern writeConcern = getAppliedWriteConcern(sessionContext);
if (!isRetryableWrite(retryWrites, getAppliedWriteConcern(sessionContext),
source.getServerDescription(),
connectionDescription, sessionContext)
if (!isRetryableWrite(retryWrites, getAppliedWriteConcern(sessionContext), connectionDescription, sessionContext)
&& handleMongoWriteConcernWithResponseExceptionAsync(retryState, releasingCallback)) {
return;
}
Expand All @@ -233,7 +230,7 @@ && handleMongoWriteConcernWithResponseExceptionAsync(retryState, releasingCallba
try {
if (!retryState.attachment(AttachmentKeys.bulkWriteTracker()).orElseThrow(Assertions::fail).batch().isPresent()) {
BulkWriteTracker.attachNew(retryState, BulkWriteBatch.createBulkWriteBatch(namespace,
source.getServerDescription(), connectionDescription, ordered, writeConcern,
connectionDescription, ordered, writeConcern,
bypassDocumentValidation, retryWrites, writeRequests, sessionContext, comment, variables));
}
} catch (Throwable t) {
Expand Down
Loading