Skip to content

Commit fef2873

Browse files
authored
Prioritize electionId over setVersion on 6.0+ servers (#1010)
JAVA-4707
1 parent c86d174 commit fef2873

14 files changed

+1084
-143
lines changed

driver-core/src/main/com/mongodb/internal/connection/AbstractMultiServerCluster.java

Lines changed: 62 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import static com.mongodb.connection.ServerType.REPLICA_SET_GHOST;
4646
import static com.mongodb.connection.ServerType.SHARD_ROUTER;
4747
import static com.mongodb.connection.ServerType.STANDALONE;
48+
import static com.mongodb.internal.operation.ServerVersionHelper.SIX_DOT_ZERO_WIRE_VERSION;
4849
import static java.lang.String.format;
4950

5051
public abstract class AbstractMultiServerCluster extends BaseCluster {
@@ -224,9 +225,7 @@ private boolean handleReplicaSetMemberChanged(final ServerDescription newDescrip
224225
}
225226

226227
if (newDescription.getType() == REPLICA_SET_GHOST) {
227-
if (LOGGER.isInfoEnabled()) {
228-
LOGGER.info(format("Server %s does not appear to be a member of an initiated replica set.", newDescription.getAddress()));
229-
}
228+
LOGGER.info(format("Server %s does not appear to be a member of an initiated replica set.", newDescription.getAddress()));
230229
return true;
231230
}
232231

@@ -247,64 +246,78 @@ private boolean handleReplicaSetMemberChanged(final ServerDescription newDescrip
247246
if (newDescription.getCanonicalAddress() != null
248247
&& !newDescription.getAddress().equals(new ServerAddress(newDescription.getCanonicalAddress()))
249248
&& !newDescription.isPrimary()) {
250-
if (LOGGER.isInfoEnabled()) {
251-
LOGGER.info(format("Canonical address %s does not match server address. Removing %s from client view of cluster",
252-
newDescription.getCanonicalAddress(), newDescription.getAddress()));
253-
}
249+
LOGGER.info(format("Canonical address %s does not match server address. Removing %s from client view of cluster",
250+
newDescription.getCanonicalAddress(), newDescription.getAddress()));
254251
removeServer(newDescription.getAddress());
255252
return true;
256253
}
257254

258-
if (newDescription.isPrimary()) {
259-
ObjectId electionId = newDescription.getElectionId();
260-
Integer setVersion = newDescription.getSetVersion();
261-
if (setVersion != null && electionId != null) {
262-
if (isStalePrimary(newDescription)) {
263-
if (LOGGER.isInfoEnabled()) {
264-
LOGGER.info(format("Invalidating potential primary %s whose (set version, election id) tuple of (%d, %s) "
265-
+ "is less than one already seen of (%d, %s)",
266-
newDescription.getAddress(),
267-
setVersion, electionId,
268-
maxSetVersion, maxElectionId));
269-
}
270-
addressToServerTupleMap.get(newDescription.getAddress()).server.resetToConnecting();
271-
return false;
272-
}
255+
if (!newDescription.isPrimary()) {
256+
return true;
257+
}
273258

274-
if (!electionId.equals(maxElectionId)) {
275-
if (LOGGER.isInfoEnabled()) {
276-
LOGGER.info(format("Setting max election id to %s from replica set primary %s", electionId,
277-
newDescription.getAddress()));
278-
}
279-
maxElectionId = electionId;
280-
}
281-
}
259+
if (isStalePrimary(newDescription)) {
260+
invalidatePotentialPrimary(newDescription);
261+
return false;
262+
}
282263

283-
if (setVersion != null
284-
&& (maxSetVersion == null || setVersion.compareTo(maxSetVersion) > 0)) {
285-
if (LOGGER.isInfoEnabled()) {
286-
LOGGER.info(format("Setting max set version to %d from replica set primary %s", setVersion,
287-
newDescription.getAddress()));
288-
}
289-
maxSetVersion = setVersion;
290-
}
264+
maxElectionId = nullSafeMax(newDescription.getElectionId(), maxElectionId);
265+
maxSetVersion = nullSafeMax(newDescription.getSetVersion(), maxSetVersion);
291266

292-
if (isNotAlreadyPrimary(newDescription.getAddress())) {
293-
LOGGER.info(format("Discovered replica set primary %s", newDescription.getAddress()));
294-
}
295-
invalidateOldPrimaries(newDescription.getAddress());
267+
invalidateOldPrimaries(newDescription.getAddress());
268+
269+
if (isNotAlreadyPrimary(newDescription.getAddress())) {
270+
LOGGER.info(format("Discovered replica set primary %s with max election id %s and max set version %d",
271+
newDescription.getAddress(), newDescription.getElectionId(), newDescription.getSetVersion()));
296272
}
273+
297274
return true;
298275
}
299276

300-
private boolean isStalePrimary(final ServerDescription newDescription) {
301-
if (maxSetVersion == null || maxElectionId == null) {
302-
return false;
277+
private boolean isStalePrimary(final ServerDescription description) {
278+
ObjectId electionId = description.getElectionId();
279+
Integer setVersion = description.getSetVersion();
280+
if (description.getMaxWireVersion() >= SIX_DOT_ZERO_WIRE_VERSION) {
281+
return nullSafeCompareTo(electionId, maxElectionId) < 0
282+
|| (nullSafeCompareTo(electionId, maxElectionId) == 0 && nullSafeCompareTo(setVersion, maxSetVersion) < 0);
283+
} else {
284+
return setVersion != null && electionId != null
285+
&& (nullSafeCompareTo(setVersion, maxSetVersion) < 0
286+
|| (nullSafeCompareTo(setVersion, maxSetVersion) == 0
287+
&& nullSafeCompareTo(electionId, maxElectionId) < 0));
303288
}
289+
}
290+
291+
private void invalidatePotentialPrimary(final ServerDescription newDescription) {
292+
LOGGER.info(format("Invalidating potential primary %s whose (set version, election id) tuple of (%d, %s) "
293+
+ "is less than one already seen of (%d, %s)",
294+
newDescription.getAddress(), newDescription.getSetVersion(), newDescription.getElectionId(),
295+
maxSetVersion, maxElectionId));
296+
addressToServerTupleMap.get(newDescription.getAddress()).server.resetToConnecting();
297+
}
304298

305-
Integer setVersion = newDescription.getSetVersion();
306-
return (setVersion == null || maxSetVersion.compareTo(setVersion) > 0
307-
|| (maxSetVersion.equals(setVersion) && maxElectionId.compareTo(newDescription.getElectionId()) > 0));
299+
/**
300+
* Implements the same contract as {@link Comparable#compareTo(Object)}, except that a null value is always considers less-than any
301+
* other value (except null, which it considers as equal-to).
302+
*/
303+
private static <T extends Comparable<T>> int nullSafeCompareTo(final T first, final T second) {
304+
if (first == null) {
305+
return second == null ? 0 : -1;
306+
}
307+
if (second == null) {
308+
return 1;
309+
}
310+
return first.compareTo(second);
311+
}
312+
313+
private static <T extends Comparable<T>> T nullSafeMax(final T first, final T second) {
314+
if (first == null) {
315+
return second;
316+
}
317+
if (second == null) {
318+
return first;
319+
}
320+
return first.compareTo(second) >= 0 ? first : second;
308321
}
309322

310323
private boolean isNotAlreadyPrimary(final ServerAddress address) {
@@ -315,7 +328,7 @@ private boolean isNotAlreadyPrimary(final ServerAddress address) {
315328
private boolean handleShardRouterChanged(final ServerDescription newDescription) {
316329
if (!newDescription.isShardRouter()) {
317330
LOGGER.error(format("Expecting a %s, but found a %s. Removing %s from client view of cluster.",
318-
SHARD_ROUTER, newDescription.getType(), newDescription.getAddress()));
331+
SHARD_ROUTER, newDescription.getType(), newDescription.getAddress()));
319332
removeServer(newDescription.getAddress());
320333
}
321334
return true;

driver-core/src/main/com/mongodb/internal/operation/ServerVersionHelper.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public final class ServerVersionHelper {
3030
public static final int FOUR_DOT_TWO_WIRE_VERSION = 8;
3131
public static final int FOUR_DOT_FOUR_WIRE_VERSION = 9;
3232
public static final int FIVE_DOT_ZERO_WIRE_VERSION = 12;
33+
public static final int SIX_DOT_ZERO_WIRE_VERSION = 17;
3334

3435
public static boolean serverIsAtLeastVersionFourDotZero(final ConnectionDescription description) {
3536
return description.getMaxWireVersion() >= FOUR_DOT_ZERO_WIRE_VERSION;
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
{
2+
"description" : "ElectionId is considered higher precedence than setVersion",
3+
"uri" : "mongodb://a/?replicaSet=rs",
4+
"phases" : [
5+
{
6+
"responses" : [
7+
[
8+
"a:27017",
9+
{
10+
"ok" : 1,
11+
"helloOk" : true,
12+
"isWritablePrimary" : true,
13+
"hosts" : [
14+
"a:27017",
15+
"b:27017"
16+
],
17+
"setName" : "rs",
18+
"setVersion" : 1,
19+
"electionId" : {
20+
"$oid" : "000000000000000000000001"
21+
},
22+
"minWireVersion" : 0,
23+
"maxWireVersion" : 17
24+
}
25+
],
26+
[
27+
"b:27017",
28+
{
29+
"ok" : 1,
30+
"helloOk" : true,
31+
"isWritablePrimary" : true,
32+
"hosts" : [
33+
"a:27017",
34+
"b:27017"
35+
],
36+
"setName" : "rs",
37+
"setVersion" : 2,
38+
"electionId" : {
39+
"$oid" : "000000000000000000000001"
40+
},
41+
"minWireVersion" : 0,
42+
"maxWireVersion" : 17
43+
}
44+
],
45+
[
46+
"a:27017",
47+
{
48+
"ok" : 1,
49+
"helloOk" : true,
50+
"isWritablePrimary" : true,
51+
"hosts" : [
52+
"a:27017",
53+
"b:27017"
54+
],
55+
"setName" : "rs",
56+
"setVersion" : 1,
57+
"electionId" : {
58+
"$oid" : "000000000000000000000002"
59+
},
60+
"minWireVersion" : 0,
61+
"maxWireVersion" : 17
62+
}
63+
]
64+
],
65+
"outcome" : {
66+
"servers" : {
67+
"a:27017" : {
68+
"type" : "RSPrimary",
69+
"setName" : "rs",
70+
"setVersion" : 1,
71+
"electionId" : {
72+
"$oid" : "000000000000000000000002"
73+
}
74+
},
75+
"b:27017" : {
76+
"type" : "Unknown",
77+
"setName" : null,
78+
"setVersion" : null,
79+
"electionId" : null
80+
}
81+
},
82+
"topologyType" : "ReplicaSetWithPrimary",
83+
"logicalSessionTimeoutMinutes" : null,
84+
"setName" : "rs",
85+
"maxSetVersion" : 1,
86+
"maxElectionId" : {
87+
"$oid" : "000000000000000000000002"
88+
}
89+
}
90+
}
91+
]
92+
}

0 commit comments

Comments
 (0)