Commit fdf9b22

Add inner_timeout_ms handler to fetcher; add fallback (#2529)
1 parent a621bdf commit fdf9b22

2 files changed (+38 −26)

kafka/consumer/fetcher.py (+22 −16)
@@ -1,4 +1,4 @@
-from __future__ import absolute_import
+from __future__ import absolute_import, division
 
 import collections
 import copy
@@ -246,7 +246,7 @@ def _reset_offset(self, partition):
         else:
             log.debug("Could not find offset for partition %s since it is probably deleted" % (partition,))
 
-    def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")):
+    def _retrieve_offsets(self, timestamps, timeout_ms=None):
         """Fetch offset for each partition passed in ``timestamps`` map.
 
         Blocks until offsets are obtained, a non-retriable exception is raised
@@ -266,29 +266,38 @@ def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")):
         if not timestamps:
             return {}
 
-        start_time = time.time()
-        remaining_ms = timeout_ms
+        elapsed = 0.0  # noqa: F841
+        begin = time.time()
+        def inner_timeout_ms(fallback=None):
+            if timeout_ms is None:
+                return fallback
+            elapsed = (time.time() - begin) * 1000
+            if elapsed >= timeout_ms:
+                raise Errors.KafkaTimeoutError('Timeout attempting to find coordinator')
+            ret = max(0, timeout_ms - elapsed)
+            if fallback is not None:
+                return min(ret, fallback)
+            return ret
+
         timestamps = copy.copy(timestamps)
-        while remaining_ms > 0:
+        while True:
             if not timestamps:
                 return {}
 
             future = self._send_list_offsets_requests(timestamps)
-            self._client.poll(future=future, timeout_ms=remaining_ms)
+            self._client.poll(future=future, timeout_ms=inner_timeout_ms())
 
             if future.succeeded():
                 return future.value
             if not future.retriable():
                 raise future.exception  # pylint: disable-msg=raising-bad-type
 
-            elapsed_ms = (time.time() - start_time) * 1000
-            remaining_ms = timeout_ms - elapsed_ms
-            if remaining_ms < 0:
-                break
-
             if future.exception.invalid_metadata:
                 refresh_future = self._client.cluster.request_update()
-                self._client.poll(future=refresh_future, timeout_ms=remaining_ms)
+                self._client.poll(future=refresh_future, timeout_ms=inner_timeout_ms())
+
+                if not future.is_done:
+                    break
 
                 # Issue #1780
                 # Recheck partition existence after after a successful metadata refresh
@@ -299,10 +308,7 @@ def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")):
                     log.debug("Removed partition %s from offsets retrieval" % (unknown_partition, ))
                     timestamps.pop(unknown_partition)
             else:
-                time.sleep(self.config['retry_backoff_ms'] / 1000.0)
-
-            elapsed_ms = (time.time() - start_time) * 1000
-            remaining_ms = timeout_ms - elapsed_ms
+                time.sleep(inner_timeout_ms(self.config['retry_backoff_ms']) / 1000)
 
         raise Errors.KafkaTimeoutError(
            "Failed to get offsets by timestamps in %s ms" % (timeout_ms,))

kafka/coordinator/base.py (+16 −10)
@@ -245,13 +245,16 @@ def ensure_coordinator_ready(self, timeout_ms=None):
         """
         elapsed = 0.0  # noqa: F841
         begin = time.time()
-        def inner_timeout_ms():
+        def inner_timeout_ms(fallback=None):
             if timeout_ms is None:
-                return None
+                return fallback
             elapsed = (time.time() - begin) * 1000
             if elapsed >= timeout_ms:
                 raise Errors.KafkaTimeoutError('Timeout attempting to find coordinator')
-            return max(0, timeout_ms - elapsed)
+            ret = max(0, timeout_ms - elapsed)
+            if fallback is not None:
+                return min(ret, fallback)
+            return ret
 
         with self._client._lock, self._lock:
             while self.coordinator_unknown():
@@ -275,7 +278,7 @@ def inner_timeout_ms():
                         metadata_update = self._client.cluster.request_update()
                         self._client.poll(future=metadata_update, timeout_ms=inner_timeout_ms())
                     else:
-                        time.sleep(min(inner_timeout_ms(), self.config['retry_backoff_ms']) / 1000)
+                        time.sleep(inner_timeout_ms(self.config['retry_backoff_ms']) / 1000)
                 else:
                     raise future.exception  # pylint: disable-msg=raising-bad-type
 
@@ -369,13 +372,16 @@ def ensure_active_group(self, timeout_ms=None):
 
         elapsed = 0.0  # noqa: F841
         begin = time.time()
-        def inner_timeout_ms():
+        def inner_timeout_ms(fallback=None):
             if timeout_ms is None:
-                return None
+                return fallback
             elapsed = (time.time() - begin) * 1000
             if elapsed >= timeout_ms:
-                raise Errors.KafkaTimeoutError()
-            return max(0, timeout_ms - elapsed)
+                raise Errors.KafkaTimeoutError('Timeout attempting to find coordinator')
+            ret = max(0, timeout_ms - elapsed)
+            if fallback is not None:
+                return min(ret, fallback)
+            return ret
 
         while self.need_rejoin() or self._rejoin_incomplete():
             self.ensure_coordinator_ready(timeout_ms=inner_timeout_ms())
@@ -399,7 +405,7 @@ def inner_timeout_ms():
                 while not self.coordinator_unknown():
                     if not self._client.in_flight_request_count(self.coordinator_id):
                         break
-                    self._client.poll(timeout_ms=min(200, inner_timeout_ms()))
+                    self._client.poll(timeout_ms=inner_timeout_ms(200))
                 else:
                     continue
 
@@ -451,7 +457,7 @@ def inner_timeout_ms():
                     continue
                 elif not future.retriable():
                     raise exception  # pylint: disable-msg=raising-bad-type
-                time.sleep(min(inner_timeout_ms(), self.config['retry_backoff_ms']) / 1000)
+                time.sleep(inner_timeout_ms(self.config['retry_backoff_ms']) / 1000)
 
     def _rejoin_incomplete(self):
         return self.join_future is not None
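
One effect of passing the fallback into the closure, rather than wrapping the call in min(), is that the timeout_ms=None case no longer depends on min() accepting None: on Python 3, min(None, retry_backoff_ms) raises TypeError, whereas inner_timeout_ms(retry_backoff_ms) returns the backoff directly. A small illustration of just that branch (variable names are hypothetical and the deadline math is elided):

import time

timeout_ms = None          # no overall deadline configured
retry_backoff_ms = 100

def old_inner_timeout_ms():
    if timeout_ms is None:
        return None        # old behaviour: caller had to cope with None
    # ... deadline math elided ...

def new_inner_timeout_ms(fallback=None):
    if timeout_ms is None:
        return fallback    # new behaviour: fallback flows straight through
    # ... deadline math elided ...

try:
    time.sleep(min(old_inner_timeout_ms(), retry_backoff_ms) / 1000)   # old call-site pattern
except TypeError as exc:
    print('old pattern breaks without a deadline:', exc)

time.sleep(new_inner_timeout_ms(retry_backoff_ms) / 1000)              # sleeps 0.1 s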
