Skip to content

Prioritize electionId over setVersion on 6.0+ servers #1010

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Oct 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import static com.mongodb.connection.ServerType.REPLICA_SET_GHOST;
import static com.mongodb.connection.ServerType.SHARD_ROUTER;
import static com.mongodb.connection.ServerType.STANDALONE;
import static com.mongodb.internal.operation.ServerVersionHelper.SIX_DOT_ZERO_WIRE_VERSION;
import static java.lang.String.format;

public abstract class AbstractMultiServerCluster extends BaseCluster {
Expand Down Expand Up @@ -224,9 +225,7 @@ private boolean handleReplicaSetMemberChanged(final ServerDescription newDescrip
}

if (newDescription.getType() == REPLICA_SET_GHOST) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info(format("Server %s does not appear to be a member of an initiated replica set.", newDescription.getAddress()));
}
LOGGER.info(format("Server %s does not appear to be a member of an initiated replica set.", newDescription.getAddress()));
return true;
}

Expand All @@ -247,64 +246,78 @@ private boolean handleReplicaSetMemberChanged(final ServerDescription newDescrip
if (newDescription.getCanonicalAddress() != null
&& !newDescription.getAddress().equals(new ServerAddress(newDescription.getCanonicalAddress()))
&& !newDescription.isPrimary()) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info(format("Canonical address %s does not match server address. Removing %s from client view of cluster",
newDescription.getCanonicalAddress(), newDescription.getAddress()));
}
LOGGER.info(format("Canonical address %s does not match server address. Removing %s from client view of cluster",
newDescription.getCanonicalAddress(), newDescription.getAddress()));
removeServer(newDescription.getAddress());
return true;
}

if (newDescription.isPrimary()) {
ObjectId electionId = newDescription.getElectionId();
Integer setVersion = newDescription.getSetVersion();
if (setVersion != null && electionId != null) {
if (isStalePrimary(newDescription)) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info(format("Invalidating potential primary %s whose (set version, election id) tuple of (%d, %s) "
+ "is less than one already seen of (%d, %s)",
newDescription.getAddress(),
setVersion, electionId,
maxSetVersion, maxElectionId));
}
addressToServerTupleMap.get(newDescription.getAddress()).server.resetToConnecting();
return false;
}
if (!newDescription.isPrimary()) {
return true;
}

if (!electionId.equals(maxElectionId)) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info(format("Setting max election id to %s from replica set primary %s", electionId,
newDescription.getAddress()));
}
maxElectionId = electionId;
}
}
if (isStalePrimary(newDescription)) {
invalidatePotentialPrimary(newDescription);
return false;
}

if (setVersion != null
&& (maxSetVersion == null || setVersion.compareTo(maxSetVersion) > 0)) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info(format("Setting max set version to %d from replica set primary %s", setVersion,
newDescription.getAddress()));
}
maxSetVersion = setVersion;
}
maxElectionId = nullSafeMax(newDescription.getElectionId(), maxElectionId);
maxSetVersion = nullSafeMax(newDescription.getSetVersion(), maxSetVersion);

if (isNotAlreadyPrimary(newDescription.getAddress())) {
LOGGER.info(format("Discovered replica set primary %s", newDescription.getAddress()));
}
invalidateOldPrimaries(newDescription.getAddress());
invalidateOldPrimaries(newDescription.getAddress());

if (isNotAlreadyPrimary(newDescription.getAddress())) {
LOGGER.info(format("Discovered replica set primary %s with max election id %s and max set version %d",
newDescription.getAddress(), newDescription.getElectionId(), newDescription.getSetVersion()));
}

return true;
}

private boolean isStalePrimary(final ServerDescription newDescription) {
if (maxSetVersion == null || maxElectionId == null) {
return false;
private boolean isStalePrimary(final ServerDescription description) {
ObjectId electionId = description.getElectionId();
Integer setVersion = description.getSetVersion();
if (description.getMaxWireVersion() >= SIX_DOT_ZERO_WIRE_VERSION) {
return nullSafeCompareTo(electionId, maxElectionId) < 0
|| (nullSafeCompareTo(electionId, maxElectionId) == 0 && nullSafeCompareTo(setVersion, maxSetVersion) < 0);
} else {
return setVersion != null && electionId != null
&& (nullSafeCompareTo(setVersion, maxSetVersion) < 0
|| (nullSafeCompareTo(setVersion, maxSetVersion) == 0
&& nullSafeCompareTo(electionId, maxElectionId) < 0));
}
}

private void invalidatePotentialPrimary(final ServerDescription newDescription) {
LOGGER.info(format("Invalidating potential primary %s whose (set version, election id) tuple of (%d, %s) "
+ "is less than one already seen of (%d, %s)",
newDescription.getAddress(), newDescription.getSetVersion(), newDescription.getElectionId(),
maxSetVersion, maxElectionId));
addressToServerTupleMap.get(newDescription.getAddress()).server.resetToConnecting();
}

Integer setVersion = newDescription.getSetVersion();
return (setVersion == null || maxSetVersion.compareTo(setVersion) > 0
|| (maxSetVersion.equals(setVersion) && maxElectionId.compareTo(newDescription.getElectionId()) > 0));
/**
* Implements the same contract as {@link Comparable#compareTo(Object)}, except that a null value is always considers less-than any
* other value (except null, which it considers as equal-to).
*/
private static <T extends Comparable<T>> int nullSafeCompareTo(final T first, final T second) {
if (first == null) {
return second == null ? 0 : -1;
}
if (second == null) {
return 1;
}
return first.compareTo(second);
}

private static <T extends Comparable<T>> T nullSafeMax(final T first, final T second) {
if (first == null) {
return second;
}
if (second == null) {
return first;
}
return first.compareTo(second) >= 0 ? first : second;
}

private boolean isNotAlreadyPrimary(final ServerAddress address) {
Expand All @@ -315,7 +328,7 @@ private boolean isNotAlreadyPrimary(final ServerAddress address) {
private boolean handleShardRouterChanged(final ServerDescription newDescription) {
if (!newDescription.isShardRouter()) {
LOGGER.error(format("Expecting a %s, but found a %s. Removing %s from client view of cluster.",
SHARD_ROUTER, newDescription.getType(), newDescription.getAddress()));
SHARD_ROUTER, newDescription.getType(), newDescription.getAddress()));
removeServer(newDescription.getAddress());
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public final class ServerVersionHelper {
public static final int FOUR_DOT_TWO_WIRE_VERSION = 8;
public static final int FOUR_DOT_FOUR_WIRE_VERSION = 9;
public static final int FIVE_DOT_ZERO_WIRE_VERSION = 12;
public static final int SIX_DOT_ZERO_WIRE_VERSION = 17;

public static boolean serverIsAtLeastVersionFourDotZero(final ConnectionDescription description) {
return description.getMaxWireVersion() >= FOUR_DOT_ZERO_WIRE_VERSION;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
{
"description" : "ElectionId is considered higher precedence than setVersion",
"uri" : "mongodb://a/?replicaSet=rs",
"phases" : [
{
"responses" : [
[
"a:27017",
{
"ok" : 1,
"helloOk" : true,
"isWritablePrimary" : true,
"hosts" : [
"a:27017",
"b:27017"
],
"setName" : "rs",
"setVersion" : 1,
"electionId" : {
"$oid" : "000000000000000000000001"
},
"minWireVersion" : 0,
"maxWireVersion" : 17
}
],
[
"b:27017",
{
"ok" : 1,
"helloOk" : true,
"isWritablePrimary" : true,
"hosts" : [
"a:27017",
"b:27017"
],
"setName" : "rs",
"setVersion" : 2,
"electionId" : {
"$oid" : "000000000000000000000001"
},
"minWireVersion" : 0,
"maxWireVersion" : 17
}
],
[
"a:27017",
{
"ok" : 1,
"helloOk" : true,
"isWritablePrimary" : true,
"hosts" : [
"a:27017",
"b:27017"
],
"setName" : "rs",
"setVersion" : 1,
"electionId" : {
"$oid" : "000000000000000000000002"
},
"minWireVersion" : 0,
"maxWireVersion" : 17
}
]
],
"outcome" : {
"servers" : {
"a:27017" : {
"type" : "RSPrimary",
"setName" : "rs",
"setVersion" : 1,
"electionId" : {
"$oid" : "000000000000000000000002"
}
},
"b:27017" : {
"type" : "Unknown",
"setName" : null,
"setVersion" : null,
"electionId" : null
}
},
"topologyType" : "ReplicaSetWithPrimary",
"logicalSessionTimeoutMinutes" : null,
"setName" : "rs",
"maxSetVersion" : 1,
"maxElectionId" : {
"$oid" : "000000000000000000000002"
}
}
}
]
}
Loading