Skip to content

Commit a0ef054

Browse files
committed
retrying vars back in bulk
1 parent 0648bcf commit a0ef054

File tree

5 files changed

+66
-77
lines changed

5 files changed

+66
-77
lines changed

pymongo/asynchronous/bulk.py

Lines changed: 25 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,9 @@ def __init__(
112112
self.uses_hint_update = False
113113
self.uses_hint_delete = False
114114
self.uses_sort = False
115+
self.is_retryable = True
116+
self.retrying = False
117+
self.started_retryable_write = False
115118
# Extra state so that we know where to pick up on a retry attempt.
116119
self.current_run = None
117120
self.next_run = None
@@ -127,23 +130,23 @@ def bulk_ctx_class(self) -> Type[_BulkWriteContext]:
127130
self.is_encrypted = False
128131
return _BulkWriteContext
129132

130-
@property
131-
def is_retryable(self) -> bool:
132-
if self.current_run:
133-
return self.current_run.is_retryable
134-
return True
135-
136-
@property
137-
def retrying(self) -> bool:
138-
if self.current_run:
139-
return self.current_run.retrying
140-
return False
141-
142-
@property
143-
def started_retryable_write(self) -> bool:
144-
if self.current_run:
145-
return self.current_run.started_retryable_write
146-
return False
133+
# @property
134+
# def is_retryable(self) -> bool:
135+
# if self.current_run:
136+
# return self.current_run.is_retryable
137+
# return True
138+
#
139+
# @property
140+
# def retrying(self) -> bool:
141+
# if self.current_run:
142+
# return self.current_run.retrying
143+
# return False
144+
#
145+
# @property
146+
# def started_retryable_write(self) -> bool:
147+
# if self.current_run:
148+
# return self.current_run.started_retryable_write
149+
# return False
147150

148151
def add_insert(self, document: _DocumentOut) -> bool:
149152
"""Add an insert document to the list of ops."""
@@ -255,7 +258,7 @@ def gen_ordered(
255258
yield run
256259
run = _Run(op_type)
257260
run.add(idx, operation)
258-
run.is_retryable = run.is_retryable and retryable
261+
self.is_retryable = self.is_retryable and retryable
259262
if run is None:
260263
raise InvalidOperation("No operations to execute")
261264
yield run
@@ -273,7 +276,7 @@ def gen_unordered(
273276
retryable = process(request)
274277
(op_type, operation) = self.ops[idx]
275278
operations[op_type].add(idx, operation)
276-
operations[op_type].is_retryable = operations[op_type].is_retryable and retryable
279+
self.is_retryable = self.is_retryable and retryable
277280
if (
278281
len(operations[_INSERT].ops) == 0
279282
and len(operations[_UPDATE].ops) == 0
@@ -533,7 +536,7 @@ async def _execute_command(
533536
last_run = False
534537

535538
while run:
536-
if not run.retrying:
539+
if not self.retrying:
537540
self.next_run = next(generator, None)
538541
if self.next_run is None:
539542
last_run = True
@@ -567,10 +570,10 @@ async def _execute_command(
567570
if session:
568571
# Start a new retryable write unless one was already
569572
# started for this command.
570-
if run.is_retryable and not run.started_retryable_write:
573+
if self.is_retryable and not self.started_retryable_write:
571574
session._start_retryable_write()
572575
self.started_retryable_write = True
573-
session._apply_to(cmd, run.is_retryable, ReadPreference.PRIMARY, conn)
576+
session._apply_to(cmd, self.is_retryable, ReadPreference.PRIMARY, conn)
574577
conn.send_cluster_time(cmd, session, client)
575578
conn.add_server_api(cmd)
576579
# CSOT: apply timeout before encoding the command.

pymongo/asynchronous/mongo_client.py

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2674,8 +2674,8 @@ def __init__(
26742674
self._operation_id = operation_id
26752675

26762676
def _bulk_retryable(self) -> bool:
2677-
if self._bulk is not None and self._bulk.current_run is not None:
2678-
return self._bulk.current_run.is_retryable
2677+
if self._bulk is not None:
2678+
return self._bulk.is_retryable
26792679
return True
26802680

26812681
async def run(self) -> T:
@@ -2695,8 +2695,8 @@ async def run(self) -> T:
26952695
and not self._is_read
26962696
):
26972697
self._session._start_retryable_write() # type: ignore
2698-
if self._bulk and self._bulk.current_run:
2699-
self._bulk.current_run.started_retryable_write = True
2698+
if self._bulk:
2699+
self._bulk.started_retryable_write = True
27002700

27012701
while True:
27022702
self._check_last_error(check_csot=True)
@@ -2746,10 +2746,7 @@ async def run(self) -> T:
27462746
else:
27472747
raise
27482748
if self._bulk:
2749-
if self._bulk.current_run:
2750-
self._bulk.current_run.retrying = True
2751-
else:
2752-
self._bulk.retrying = True
2749+
self._bulk.retrying = True
27532750
else:
27542751
self._retrying = True
27552752
if not exc.has_error_label("NoWritesPerformed"):
@@ -2770,11 +2767,7 @@ def _is_not_eligible_for_retry(self) -> bool:
27702767

27712768
def _is_retrying(self) -> bool:
27722769
"""Checks if the exchange is currently undergoing a retry"""
2773-
return (
2774-
self._bulk.current_run.retrying
2775-
if self._bulk is not None and self._bulk.current_run is not None
2776-
else self._retrying
2777-
)
2770+
return self._bulk.retrying if self._bulk is not None else self._retrying
27782771

27792772
def _is_session_state_retryable(self) -> bool:
27802773
"""Checks if provided session is eligible for retry
@@ -2834,8 +2827,8 @@ async def _write(self) -> T:
28342827
# not support sessions raise the last error.
28352828
self._check_last_error()
28362829
self._retryable = False
2837-
if self._bulk and self._bulk.current_run:
2838-
self._bulk.current_run.is_retryable = False
2830+
if self._bulk:
2831+
self._bulk.is_retryable = False
28392832
return await self._func(self._session, conn) # type: ignore
28402833
except PyMongoError as exc:
28412834
if not self._retryable or not self._bulk_retryable():

pymongo/bulk_shared.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,6 @@ def __init__(self, op_type: int) -> None:
5050
self.index_map: list[int] = []
5151
self.ops: list[Any] = []
5252
self.idx_offset: int = 0
53-
self.is_retryable = True
54-
self.retrying = False
55-
self.started_retryable_write = False
5653

5754
def index(self, idx: int) -> int:
5855
"""Get the original index of an operation in this run.

pymongo/synchronous/bulk.py

Lines changed: 25 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,9 @@ def __init__(
112112
self.uses_hint_update = False
113113
self.uses_hint_delete = False
114114
self.uses_sort = False
115+
self.is_retryable = True
116+
self.retrying = False
117+
self.started_retryable_write = False
115118
# Extra state so that we know where to pick up on a retry attempt.
116119
self.current_run = None
117120
self.next_run = None
@@ -127,23 +130,23 @@ def bulk_ctx_class(self) -> Type[_BulkWriteContext]:
127130
self.is_encrypted = False
128131
return _BulkWriteContext
129132

130-
@property
131-
def is_retryable(self) -> bool:
132-
if self.current_run:
133-
return self.current_run.is_retryable
134-
return True
135-
136-
@property
137-
def retrying(self) -> bool:
138-
if self.current_run:
139-
return self.current_run.retrying
140-
return False
141-
142-
@property
143-
def started_retryable_write(self) -> bool:
144-
if self.current_run:
145-
return self.current_run.started_retryable_write
146-
return False
133+
# @property
134+
# def is_retryable(self) -> bool:
135+
# if self.current_run:
136+
# return self.current_run.is_retryable
137+
# return True
138+
#
139+
# @property
140+
# def retrying(self) -> bool:
141+
# if self.current_run:
142+
# return self.current_run.retrying
143+
# return False
144+
#
145+
# @property
146+
# def started_retryable_write(self) -> bool:
147+
# if self.current_run:
148+
# return self.current_run.started_retryable_write
149+
# return False
147150

148151
def add_insert(self, document: _DocumentOut) -> bool:
149152
"""Add an insert document to the list of ops."""
@@ -255,7 +258,7 @@ def gen_ordered(
255258
yield run
256259
run = _Run(op_type)
257260
run.add(idx, operation)
258-
run.is_retryable = run.is_retryable and retryable
261+
self.is_retryable = self.is_retryable and retryable
259262
if run is None:
260263
raise InvalidOperation("No operations to execute")
261264
yield run
@@ -273,7 +276,7 @@ def gen_unordered(
273276
retryable = process(request)
274277
(op_type, operation) = self.ops[idx]
275278
operations[op_type].add(idx, operation)
276-
operations[op_type].is_retryable = operations[op_type].is_retryable and retryable
279+
self.is_retryable = self.is_retryable and retryable
277280
if (
278281
len(operations[_INSERT].ops) == 0
279282
and len(operations[_UPDATE].ops) == 0
@@ -533,7 +536,7 @@ def _execute_command(
533536
last_run = False
534537

535538
while run:
536-
if not run.retrying:
539+
if not self.retrying:
537540
self.next_run = next(generator, None)
538541
if self.next_run is None:
539542
last_run = True
@@ -567,10 +570,10 @@ def _execute_command(
567570
if session:
568571
# Start a new retryable write unless one was already
569572
# started for this command.
570-
if run.is_retryable and not run.started_retryable_write:
573+
if self.is_retryable and not self.started_retryable_write:
571574
session._start_retryable_write()
572575
self.started_retryable_write = True
573-
session._apply_to(cmd, run.is_retryable, ReadPreference.PRIMARY, conn)
576+
session._apply_to(cmd, self.is_retryable, ReadPreference.PRIMARY, conn)
574577
conn.send_cluster_time(cmd, session, client)
575578
conn.add_server_api(cmd)
576579
# CSOT: apply timeout before encoding the command.

pymongo/synchronous/mongo_client.py

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2662,8 +2662,8 @@ def __init__(
26622662
self._operation_id = operation_id
26632663

26642664
def _bulk_retryable(self) -> bool:
2665-
if self._bulk is not None and self._bulk.current_run is not None:
2666-
return self._bulk.current_run.is_retryable
2665+
if self._bulk is not None:
2666+
return self._bulk.is_retryable
26672667
return True
26682668

26692669
def run(self) -> T:
@@ -2683,8 +2683,8 @@ def run(self) -> T:
26832683
and not self._is_read
26842684
):
26852685
self._session._start_retryable_write() # type: ignore
2686-
if self._bulk and self._bulk.current_run:
2687-
self._bulk.current_run.started_retryable_write = True
2686+
if self._bulk:
2687+
self._bulk.started_retryable_write = True
26882688

26892689
while True:
26902690
self._check_last_error(check_csot=True)
@@ -2734,10 +2734,7 @@ def run(self) -> T:
27342734
else:
27352735
raise
27362736
if self._bulk:
2737-
if self._bulk.current_run:
2738-
self._bulk.current_run.retrying = True
2739-
else:
2740-
self._bulk.retrying = True
2737+
self._bulk.retrying = True
27412738
else:
27422739
self._retrying = True
27432740
if not exc.has_error_label("NoWritesPerformed"):
@@ -2758,11 +2755,7 @@ def _is_not_eligible_for_retry(self) -> bool:
27582755

27592756
def _is_retrying(self) -> bool:
27602757
"""Checks if the exchange is currently undergoing a retry"""
2761-
return (
2762-
self._bulk.current_run.retrying
2763-
if self._bulk is not None and self._bulk.current_run is not None
2764-
else self._retrying
2765-
)
2758+
return self._bulk.retrying if self._bulk is not None else self._retrying
27662759

27672760
def _is_session_state_retryable(self) -> bool:
27682761
"""Checks if provided session is eligible for retry
@@ -2822,8 +2815,8 @@ def _write(self) -> T:
28222815
# not support sessions raise the last error.
28232816
self._check_last_error()
28242817
self._retryable = False
2825-
if self._bulk and self._bulk.current_run:
2826-
self._bulk.current_run.is_retryable = False
2818+
if self._bulk:
2819+
self._bulk.is_retryable = False
28272820
return self._func(self._session, conn) # type: ignore
28282821
except PyMongoError as exc:
28292822
if not self._retryable or not self._bulk_retryable():

0 commit comments

Comments
 (0)