Skip to content

Allow retries for statuses other than 429 in streaming_bulk #1005

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions elasticsearch/helpers/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,18 @@ def _process_bulk_chunk(
raise BulkIndexError("%i document(s) failed to index." % len(errors), errors)


def _retry_for_status(status):
if status == 429: return True
return False

def streaming_bulk(
client,
actions,
chunk_size=500,
max_chunk_bytes=100 * 1024 * 1024,
raise_on_error=True,
expand_action_callback=expand_action,
retry_for_status_callback=_retry_for_status,
raise_on_exception=True,
max_retries=0,
initial_backoff=2,
Expand Down Expand Up @@ -198,6 +203,9 @@ def streaming_bulk(
:arg expand_action_callback: callback executed on each action passed in,
should return a tuple containing the action line and the data line
(`None` if data line should be omitted).
:arg retry_for_status_callback: callback executed on each item's status,
should return a True if the status require a retry and False if not.
(if `None` is specified only status 429 will retry).
:arg max_retries: maximum number of times a document will be retried when
``429`` is received, set to 0 (default) for no retries on ``429``
:arg initial_backoff: number of seconds we should wait before the first
Expand Down Expand Up @@ -233,12 +241,12 @@ def streaming_bulk(

if not ok:
action, info = info.popitem()
# retry if retries enabled, we get 429, and we are not
# in the last attempt
# retry if retries enabled, we are not in the last attempt,
# and we get 429 (or retry_for_status_callback is true)
if (
max_retries
and info["status"] == 429
and (attempt + 1) <= max_retries
and retry_for_status_callback(info["status"])
):
# _process_bulk_chunk expects strings so we need to
# re-serialize the data
Expand All @@ -252,8 +260,8 @@ def streaming_bulk(
yield ok, info

except TransportError as e:
# suppress 429 errors since we will retry them
if attempt == max_retries or e.status_code != 429:
# suppress 429 errors (or any status which retry_for_status_callback is true for) since we will retry them
if attempt == max_retries or not retry_for_status_callback(e.status_code):
raise
else:
if not to_retry:
Expand Down