Skip to content

Add inner_timeout_ms handler to fetcher; add fallback #2529

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 22 additions & 16 deletions kafka/consumer/fetcher.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from __future__ import absolute_import
from __future__ import absolute_import, division

import collections
import copy
Expand Down Expand Up @@ -246,7 +246,7 @@ def _reset_offset(self, partition):
else:
log.debug("Could not find offset for partition %s since it is probably deleted" % (partition,))

def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")):
def _retrieve_offsets(self, timestamps, timeout_ms=None):
"""Fetch offset for each partition passed in ``timestamps`` map.

Blocks until offsets are obtained, a non-retriable exception is raised
Expand All @@ -266,29 +266,38 @@ def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")):
if not timestamps:
return {}

start_time = time.time()
remaining_ms = timeout_ms
elapsed = 0.0 # noqa: F841
begin = time.time()
def inner_timeout_ms(fallback=None):
if timeout_ms is None:
return fallback
elapsed = (time.time() - begin) * 1000
if elapsed >= timeout_ms:
raise Errors.KafkaTimeoutError('Timeout attempting to find coordinator')
ret = max(0, timeout_ms - elapsed)
if fallback is not None:
return min(ret, fallback)
return ret

timestamps = copy.copy(timestamps)
while remaining_ms > 0:
while True:
if not timestamps:
return {}

future = self._send_list_offsets_requests(timestamps)
self._client.poll(future=future, timeout_ms=remaining_ms)
self._client.poll(future=future, timeout_ms=inner_timeout_ms())

if future.succeeded():
return future.value
if not future.retriable():
raise future.exception # pylint: disable-msg=raising-bad-type

elapsed_ms = (time.time() - start_time) * 1000
remaining_ms = timeout_ms - elapsed_ms
if remaining_ms < 0:
break

if future.exception.invalid_metadata:
refresh_future = self._client.cluster.request_update()
self._client.poll(future=refresh_future, timeout_ms=remaining_ms)
self._client.poll(future=refresh_future, timeout_ms=inner_timeout_ms())

if not future.is_done:
break

# Issue #1780
# Recheck partition existence after a successful metadata refresh
Expand All @@ -299,10 +308,7 @@ def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")):
log.debug("Removed partition %s from offsets retrieval" % (unknown_partition, ))
timestamps.pop(unknown_partition)
else:
time.sleep(self.config['retry_backoff_ms'] / 1000.0)

elapsed_ms = (time.time() - start_time) * 1000
remaining_ms = timeout_ms - elapsed_ms
time.sleep(inner_timeout_ms(self.config['retry_backoff_ms']) / 1000)

raise Errors.KafkaTimeoutError(
"Failed to get offsets by timestamps in %s ms" % (timeout_ms,))
Expand Down
26 changes: 16 additions & 10 deletions kafka/coordinator/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,13 +245,16 @@ def ensure_coordinator_ready(self, timeout_ms=None):
"""
elapsed = 0.0 # noqa: F841
begin = time.time()
def inner_timeout_ms():
def inner_timeout_ms(fallback=None):
if timeout_ms is None:
return None
return fallback
elapsed = (time.time() - begin) * 1000
if elapsed >= timeout_ms:
raise Errors.KafkaTimeoutError('Timeout attempting to find coordinator')
return max(0, timeout_ms - elapsed)
ret = max(0, timeout_ms - elapsed)
if fallback is not None:
return min(ret, fallback)
return ret

with self._client._lock, self._lock:
while self.coordinator_unknown():
Expand All @@ -275,7 +278,7 @@ def inner_timeout_ms():
metadata_update = self._client.cluster.request_update()
self._client.poll(future=metadata_update, timeout_ms=inner_timeout_ms())
else:
time.sleep(min(inner_timeout_ms(), self.config['retry_backoff_ms']) / 1000)
time.sleep(inner_timeout_ms(self.config['retry_backoff_ms']) / 1000)
else:
raise future.exception # pylint: disable-msg=raising-bad-type

Expand Down Expand Up @@ -369,13 +372,16 @@ def ensure_active_group(self, timeout_ms=None):

elapsed = 0.0 # noqa: F841
begin = time.time()
def inner_timeout_ms():
def inner_timeout_ms(fallback=None):
if timeout_ms is None:
return None
return fallback
elapsed = (time.time() - begin) * 1000
if elapsed >= timeout_ms:
raise Errors.KafkaTimeoutError()
return max(0, timeout_ms - elapsed)
raise Errors.KafkaTimeoutError('Timeout attempting to find coordinator')
ret = max(0, timeout_ms - elapsed)
if fallback is not None:
return min(ret, fallback)
return ret

while self.need_rejoin() or self._rejoin_incomplete():
self.ensure_coordinator_ready(timeout_ms=inner_timeout_ms())
Expand All @@ -399,7 +405,7 @@ def inner_timeout_ms():
while not self.coordinator_unknown():
if not self._client.in_flight_request_count(self.coordinator_id):
break
self._client.poll(timeout_ms=min(200, inner_timeout_ms()))
self._client.poll(timeout_ms=inner_timeout_ms(200))
else:
continue

Expand Down Expand Up @@ -451,7 +457,7 @@ def inner_timeout_ms():
continue
elif not future.retriable():
raise exception # pylint: disable-msg=raising-bad-type
time.sleep(min(inner_timeout_ms(), self.config['retry_backoff_ms']) / 1000)
time.sleep(inner_timeout_ms(self.config['retry_backoff_ms']) / 1000)

def _rejoin_incomplete(self):
return self.join_future is not None
Expand Down
Loading