PYTHON-2834 Direct read/write retries to another mongos if possible #1421
Changes from 5 commits
@@ -1277,6 +1277,7 @@ def _select_server(
         server_selector: Callable[[Selection], Selection],
         session: Optional[ClientSession],
         address: Optional[_Address] = None,
+        deprioritized_servers: Optional[list[Server]] = None,
     ) -> Server:
         """Select a server to run an operation on this client.

@@ -1300,7 +1301,9 @@ def _select_server(
                 if not server:
                     raise AutoReconnect("server %s:%s no longer available" % address)  # noqa: UP031
             else:
                 server = topology.select_server(
                     server_selector, deprioritized_servers=deprioritized_servers
                 )
             return server
         except PyMongoError as exc:
             # Server selection errors in a transaction are transient.
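
For intuition, here is a minimal sketch of what honoring `deprioritized_servers` during selection can look like: prefer candidates that have not already failed this operation, but fall back to the full candidate list if filtering would leave nothing to select. The names (`Server`, `filter_deprioritized`) are illustrative stand-ins, not PyMongo's actual internals.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class Server:
    address: tuple[str, int]


def filter_deprioritized(
    candidates: list[Server], deprioritized: list[Server] | None
) -> list[Server]:
    """Prefer servers that have not already failed this operation."""
    if not deprioritized:
        return candidates
    preferred = [s for s in candidates if s not in deprioritized]
    # If every candidate was deprioritized (e.g. a single-mongos cluster),
    # fall back to the full list so the retry can still be attempted.
    return preferred or candidates


a, b = Server(("mongos1", 27017)), Server(("mongos2", 27017))
assert filter_deprioritized([a, b], [a]) == [b]  # retry goes to the other mongos
assert filter_deprioritized([a], [a]) == [a]  # a lone mongos stays usable
```

The fallback is what lets the single-mongos test further down retry successfully on the same server.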

@@ -2291,6 +2294,7 @@ def __init__(
         )
         self._address = address
         self._server: Server = None  # type: ignore
+        self._deprioritized_servers: list[Server] = []

     def run(self) -> T:
         """Runs the supplied func() and attempts a retry

@@ -2334,6 +2338,8 @@ def run(self) -> T:
                             raise
                         self._retrying = True
                         self._last_error = exc
+                        if self._client.topology_description.topology_type == TOPOLOGY_TYPE.Sharded:
+                            self._deprioritized_servers.append(self._server)
                     else:
                         raise

Review thread on the added lines:
Reviewer: Since this logic is the same for reads and writes, can we do it in one place?
Author: Good catch, thanks!

@@ -2354,6 +2360,8 @@ def run(self) -> T:
                         self._bulk.retrying = True
                     else:
                         self._retrying = True
+                    if self._client.topology_description.topology_type == TOPOLOGY_TYPE.Sharded:
+                        self._deprioritized_servers.append(self._server)
                     if not exc.has_error_label("NoWritesPerformed"):
                         self._last_error = exc
                     if self._last_error is None:
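
The consolidation the reviewer asks for would pull this duplicated sharded-topology check into a single helper on the retryable object. A sketch only; the method name `_deprioritize_current_server` is hypothetical, not necessarily what the PR ends up with:

```python
# Hypothetical helper; both the read and write retry paths would call it
# instead of repeating the conditional.
def _deprioritize_current_server(self) -> None:
    # On sharded clusters, remember the mongos that just failed so the
    # next server selection can prefer a different one.
    if self._client.topology_description.topology_type == TOPOLOGY_TYPE.Sharded:
        self._deprioritized_servers.append(self._server)
```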

@@ -2397,7 +2405,10 @@ def _get_server(self) -> Server:
         Abstraction to connect to server
         """
         return self._client._select_server(
-            self._server_selector, self._session, address=self._address
+            self._server_selector,
+            self._session,
+            address=self._address,
+            deprioritized_servers=self._deprioritized_servers,
         )

     def _write(self) -> T:
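
Taken together, the retry loop now threads its failure history into every selection attempt. A condensed, illustrative version of the flow (an assumed simplification, not the driver's actual `run()` implementation):

```python
from pymongo.errors import ConnectionFailure
from pymongo.topology_description import TOPOLOGY_TYPE


def run_with_retry(client, select_server, attempt, max_attempts=2):
    deprioritized = []  # mongoses that already failed this operation
    last_exc = None
    for _ in range(max_attempts):
        # The accumulated failure history is passed to each selection.
        server = select_server(deprioritized_servers=deprioritized)
        try:
            return attempt(server)
        except ConnectionFailure as exc:
            last_exc = exc
            if client.topology_description.topology_type == TOPOLOGY_TYPE.Sharded:
                deprioritized.append(server)
    assert last_exc is not None
    raise last_exc
```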

@@ -31,6 +31,7 @@
     OvertCommandListener,
     SpecTestCreator,
     rs_or_single_client,
+    set_fail_point,
 )
 from test.utils_spec_runner import SpecRunner
 from test.version import Version

@@ -40,6 +41,7 @@
 from bson.raw_bson import RawBSONDocument
 from bson.son import SON
 from pymongo.errors import (
+    AutoReconnect,
     ConnectionFailure,
     OperationFailure,
     ServerSelectionTimeoutError,

@@ -469,6 +471,78 @@ def test_batch_splitting_retry_fails(self):
         self.assertEqual(final_txn, expected_txn)
         self.assertEqual(coll.find_one(projection={"_id": True}), {"_id": 1})
+
+    @client_context.require_multiple_mongoses
+    @client_context.require_failCommand_fail_point
+    def test_retryable_writes_in_sharded_cluster_multiple_available(self):
+        fail_command = {
+            "configureFailPoint": "failCommand",
+            "mode": {"times": 1},
+            "data": {
+                "failCommands": ["insert"],
+                "closeConnection": True,
+                "appName": "retryableWriteTest",
+            },
+        }
+
+        # Enable the failpoint on every mongos through a direct connection.
+        for mongos in client_context.mongos_seeds().split(","):
+            client = rs_or_single_client(mongos, directConnection=True)
+            set_fail_point(client, fail_command)
+
+        listener = OvertCommandListener()
+        client = rs_or_single_client(
+            client_context.mongos_seeds(),
+            appName="retryableWriteTest",
+            event_listeners=[listener],
+            retryWrites=True,
+        )
+
+        # Both the initial attempt and the retry (directed at the other
+        # mongos) hit the failpoint, so the operation ultimately fails.
+        with self.assertRaises(AutoReconnect):
+            client.t.t.insert_one({"x": 1})
+
+        # Disable failpoints on each mongos
+        for mongos in client_context.mongos_seeds().split(","):
+            client = rs_or_single_client(mongos, directConnection=True)
+            fail_command["mode"] = "off"
+            set_fail_point(client, fail_command)
+
+        self.assertEqual(len(listener.failed_events), 2)
+        self.assertEqual(len(listener.succeeded_events), 0)
+
+    @client_context.require_multiple_mongoses
+    @client_context.require_failCommand_fail_point
+    def test_retryable_writes_in_sharded_cluster_one_available(self):
+        single_mongos = client_context.mongos_seeds().split(",")[0]
+        fail_command = {
+            "configureFailPoint": "failCommand",
+            "mode": {"times": 1},
+            "data": {
+                "failCommands": ["insert"],
+                "closeConnection": True,
+                "appName": "retryableWriteTest",
+            },
+        }
+
+        direct_client = rs_or_single_client(single_mongos, directConnection=True)
+        set_fail_point(direct_client, fail_command)
+
+        listener = OvertCommandListener()
+        client = rs_or_single_client(
+            single_mongos,
+            appName="retryableWriteTest",
+            event_listeners=[listener],
+            retryWrites=True,
+        )
+
+        # With only one mongos available, the retry is sent to the same
+        # server and succeeds once the {"times": 1} failpoint is spent.
+        client.t.t.insert_one({"x": 1})
+
+        # Disable failpoint on the tested mongos
+        fail_command["mode"] = "off"
+        set_fail_point(direct_client, fail_command)
+
+        self.assertEqual(len(listener.failed_events), 1)
+        self.assertEqual(len(listener.succeeded_events), 1)
+
+
 class TestWriteConcernError(IntegrationTest):
     RUN_ON_LOAD_BALANCER = True

@@ -1153,3 +1153,9 @@ def prepare_spec_arguments(spec, arguments, opname, entity_map, with_txn_callbac
             raise AssertionError(f"Unsupported cursorType: {cursor_type}")
         else:
             arguments[c2s] = arguments.pop(arg_name)
+
+
+def set_fail_point(client, command_args):
+    cmd = SON([("configureFailPoint", "failCommand")])
+    cmd.update(command_args)
+    client.admin.command(cmd)

Review thread on set_fail_point:
Reviewer: Can you use the with self.fail_point() helper instead?
Author: We need to set the fail point on each individual mongos to ensure the failure occurs on every node.
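
As the reply explains, the tests need to aim a direct connection at each mongos in turn, which the single-client `self.fail_point()` context manager cannot do. Building on the same test utilities, a sketch of a reusable wrapper (the function `fail_point_on_each_mongos` is an assumption for illustration, not part of this PR):

```python
from contextlib import contextmanager


@contextmanager
def fail_point_on_each_mongos(fail_command):
    """Enable a failpoint on every mongos, disabling it again on exit."""
    clients = [
        rs_or_single_client(mongos, directConnection=True)
        for mongos in client_context.mongos_seeds().split(",")
    ]
    for client in clients:
        set_fail_point(client, fail_command)
    try:
        yield
    finally:
        for client in clients:
            set_fail_point(client, dict(fail_command, mode="off"))
```

This would let the tests above replace their paired enable/disable loops with a single with block.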