Commit 36287cb

Timeout coordinator poll / ensure_coordinator_ready / ensure_active_group (#2526)

1 parent: efb554d
3 files changed: +95 -44 lines

kafka/consumer/group.py (+11 -6)
@@ -694,7 +694,8 @@ def _poll_once(self, timeout_ms, max_records, update_offsets=True):
         Returns:
             dict: Map of topic to list of records (may be empty).
         """
-        self._coordinator.poll()
+        begin = time.time()
+        self._coordinator.poll(timeout_ms=timeout_ms)
 
         # Fetch positions if we have partitions we're subscribed to that we
         # don't know the offset for
@@ -720,7 +721,8 @@ def _poll_once(self, timeout_ms, max_records, update_offsets=True):
         if len(futures):
             self._client.poll(timeout_ms=0)
 
-        timeout_ms = min(timeout_ms, self._coordinator.time_to_next_poll() * 1000)
+        timeout_ms -= (time.time() - begin) * 1000
+        timeout_ms = max(0, min(timeout_ms, self._coordinator.time_to_next_poll() * 1000))
         self._client.poll(timeout_ms=timeout_ms)
         # after the long poll, we should check whether the group needs to rebalance
         # prior to returning data so that the group can stabilize faster
@@ -1134,7 +1136,7 @@ def _update_fetch_positions(self, partitions):
         self._fetcher.update_fetch_positions(partitions)
 
     def _message_generator_v2(self):
-        timeout_ms = 1000 * (self._consumer_timeout - time.time())
+        timeout_ms = 1000 * max(0, self._consumer_timeout - time.time())
         record_map = self.poll(timeout_ms=timeout_ms, update_offsets=False)
         for tp, records in six.iteritems(record_map):
             # Generators are stateful, and it is possible that the tp / records
@@ -1154,17 +1156,20 @@ def _message_generator_v2(self):
 
     def _message_generator(self):
         assert self.assignment() or self.subscription() is not None, 'No topic subscription or manual partition assignment'
+
+        def inner_poll_ms():
+            return max(0, min((1000 * (self._consumer_timeout - time.time())), self.config['retry_backoff_ms']))
+
         while time.time() < self._consumer_timeout:
 
-            self._coordinator.poll()
+            self._coordinator.poll(timeout_ms=inner_poll_ms())
 
             # Fetch offsets for any subscribed partitions that we arent tracking yet
             if not self._subscription.has_all_fetch_positions():
                 partitions = self._subscription.missing_fetch_positions()
                 self._update_fetch_positions(partitions)
 
-            poll_ms = min((1000 * (self._consumer_timeout - time.time())), self.config['retry_backoff_ms'])
-            self._client.poll(timeout_ms=poll_ms)
+            self._client.poll(timeout_ms=inner_poll_ms())
 
             # after the long poll, we should check whether the group needs to rebalance
             # prior to returning data so that the group can stabilize faster
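
Note: the `_poll_once` change above records a start time, hands the caller's full budget to the coordinator, and then deducts whatever the coordinator consumed before the final fetch poll. A minimal sketch of that budgeting idiom outside kafka-python (the names `remaining_ms` and `budget_ms` are illustrative, not part of the library):

    import time

    def remaining_ms(begin, budget_ms):
        # Remaining budget (ms) after work that started at `begin`,
        # floored at zero so a slow step never yields a negative timeout.
        return max(0, budget_ms - (time.time() - begin) * 1000)

    budget_ms = 500
    begin = time.time()
    # ... coordinator work bounded by budget_ms happens here ...
    client_poll_ms = remaining_ms(begin, budget_ms)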

kafka/coordinator/base.py (+44 -12)
@@ -234,10 +234,25 @@ def coordinator(self):
         else:
             return self.coordinator_id
 
-    def ensure_coordinator_ready(self):
-        """Block until the coordinator for this group is known
-        (and we have an active connection -- java client uses unsent queue).
+    def ensure_coordinator_ready(self, timeout_ms=None):
+        """Block until the coordinator for this group is known.
+
+        Keyword Arguments:
+            timeout_ms (numeric, optional): Maximum number of milliseconds to
+                block waiting to find coordinator. Default: None.
+
+        Raises: KafkaTimeoutError if timeout_ms is not None
         """
+        elapsed = 0.0  # noqa: F841
+        begin = time.time()
+        def inner_timeout_ms():
+            if timeout_ms is None:
+                return None
+            elapsed = (time.time() - begin) * 1000
+            if elapsed >= timeout_ms:
+                raise Errors.KafkaTimeoutError('Timeout attempting to find coordinator')
+            return max(0, timeout_ms - elapsed)
+
         with self._client._lock, self._lock:
             while self.coordinator_unknown():
 
@@ -251,16 +266,16 @@ def ensure_coordinator_ready(self):
                     continue
 
                 future = self.lookup_coordinator()
-                self._client.poll(future=future)
+                self._client.poll(future=future, timeout_ms=inner_timeout_ms())
 
                 if future.failed():
                     if future.retriable():
                         if getattr(future.exception, 'invalid_metadata', False):
                             log.debug('Requesting metadata for group coordinator request: %s', future.exception)
                             metadata_update = self._client.cluster.request_update()
-                            self._client.poll(future=metadata_update)
+                            self._client.poll(future=metadata_update, timeout_ms=inner_timeout_ms())
                         else:
-                            time.sleep(self.config['retry_backoff_ms'] / 1000)
+                            time.sleep(min(inner_timeout_ms(), self.config['retry_backoff_ms']) / 1000)
                     else:
                         raise future.exception  # pylint: disable-msg=raising-bad-type
 
@@ -339,14 +354,31 @@ def _handle_join_failure(self, _):
         with self._lock:
             self.state = MemberState.UNJOINED
 
-    def ensure_active_group(self):
-        """Ensure that the group is active (i.e. joined and synced)"""
+    def ensure_active_group(self, timeout_ms=None):
+        """Ensure that the group is active (i.e. joined and synced)
+
+        Keyword Arguments:
+            timeout_ms (numeric, optional): Maximum number of milliseconds to
+                block waiting to join group. Default: None.
+
+        Raises: KafkaTimeoutError if timeout_ms is not None
+        """
         with self._client._lock, self._lock:
             if self._heartbeat_thread is None:
                 self._start_heartbeat_thread()
 
+            elapsed = 0.0  # noqa: F841
+            begin = time.time()
+            def inner_timeout_ms():
+                if timeout_ms is None:
+                    return None
+                elapsed = (time.time() - begin) * 1000
+                if elapsed >= timeout_ms:
+                    raise Errors.KafkaTimeoutError()
+                return max(0, timeout_ms - elapsed)
+
             while self.need_rejoin() or self._rejoin_incomplete():
-                self.ensure_coordinator_ready()
+                self.ensure_coordinator_ready(timeout_ms=inner_timeout_ms())
 
                 # call on_join_prepare if needed. We set a flag
                 # to make sure that we do not call it a second
@@ -367,7 +399,7 @@ def ensure_active_group(self):
                     while not self.coordinator_unknown():
                         if not self._client.in_flight_request_count(self.coordinator_id):
                             break
-                        self._client.poll(timeout_ms=200)
+                        self._client.poll(timeout_ms=min(200, inner_timeout_ms()))
                     else:
                         continue
 
@@ -400,7 +432,7 @@ def ensure_active_group(self):
                 else:
                     future = self.join_future
 
-                self._client.poll(future=future)
+                self._client.poll(future=future, timeout_ms=inner_timeout_ms())
 
                 if future.succeeded():
                     self._on_join_complete(self._generation.generation_id,
@@ -419,7 +451,7 @@ def ensure_active_group(self):
                     continue
                 elif not future.retriable():
                     raise exception  # pylint: disable-msg=raising-bad-type
-                time.sleep(self.config['retry_backoff_ms'] / 1000)
+                time.sleep(min(inner_timeout_ms(), self.config['retry_backoff_ms']) / 1000)
 
     def _rejoin_incomplete(self):
         return self.join_future is not None
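
Note: `ensure_coordinator_ready` and `ensure_active_group` now build the same local `inner_timeout_ms` closure: it returns the remaining budget (or None when no timeout was requested) and raises `KafkaTimeoutError` once the budget is spent, so every blocking call inside the retry loop shares one deadline instead of each getting its own. A self-contained sketch of the pattern, using a stand-in exception class and a hypothetical `make_inner_timeout_ms` factory (the commit itself inlines the closure in each method):

    import time

    class KafkaTimeoutError(Exception):
        """Stand-in for kafka.errors.KafkaTimeoutError."""

    def make_inner_timeout_ms(timeout_ms):
        begin = time.time()
        def inner_timeout_ms():
            # None means "no deadline": callers block as before.
            if timeout_ms is None:
                return None
            elapsed = (time.time() - begin) * 1000
            if elapsed >= timeout_ms:
                # Budget exhausted: abort the surrounding retry loop.
                raise KafkaTimeoutError('Timeout in retry loop')
            return max(0, timeout_ms - elapsed)
        return inner_timeout_ms

    # Each blocking step in a retry loop consumes part of the shared budget:
    inner_timeout_ms = make_inner_timeout_ms(1000)
    first_wait = inner_timeout_ms()   # roughly 1000
    # ... wait / poll for at most first_wait milliseconds ...
    next_wait = inner_timeout_ms()    # whatever is left, or KafkaTimeoutError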

kafka/coordinator/consumer.py (+40 -26)
@@ -258,7 +258,7 @@ def _on_join_complete(self, generation, member_id, protocol,
             self._subscription.listener, self.group_id,
             assigned)
 
-    def poll(self):
+    def poll(self, timeout_ms=None):
         """
         Poll for coordinator events. Only applicable if group_id is set, and
         broker version supports GroupCoordinators. This ensures that the
@@ -269,31 +269,45 @@ def poll(self):
         if self.group_id is None:
             return
 
-        self._invoke_completed_offset_commit_callbacks()
-        self.ensure_coordinator_ready()
-
-        if self.config['api_version'] >= (0, 9) and self._subscription.partitions_auto_assigned():
-            if self.need_rejoin():
-                # due to a race condition between the initial metadata fetch and the
-                # initial rebalance, we need to ensure that the metadata is fresh
-                # before joining initially, and then request the metadata update. If
-                # metadata update arrives while the rebalance is still pending (for
-                # example, when the join group is still inflight), then we will lose
-                # track of the fact that we need to rebalance again to reflect the
-                # change to the topic subscription. Without ensuring that the
-                # metadata is fresh, any metadata update that changes the topic
-                # subscriptions and arrives while a rebalance is in progress will
-                # essentially be ignored. See KAFKA-3949 for the complete
-                # description of the problem.
-                if self._subscription.subscribed_pattern:
-                    metadata_update = self._client.cluster.request_update()
-                    self._client.poll(future=metadata_update)
-
-                self.ensure_active_group()
-
-            self.poll_heartbeat()
-
-        self._maybe_auto_commit_offsets_async()
+        elapsed = 0.0  # noqa: F841
+        begin = time.time()
+        def inner_timeout_ms():
+            if timeout_ms is None:
+                return None
+            elapsed = (time.time() - begin) * 1000
+            if elapsed >= timeout_ms:
+                raise Errors.KafkaTimeoutError()
+            return max(0, timeout_ms - elapsed)
+
+        try:
+            self._invoke_completed_offset_commit_callbacks()
+            self.ensure_coordinator_ready(timeout_ms=inner_timeout_ms())
+
+            if self.config['api_version'] >= (0, 9) and self._subscription.partitions_auto_assigned():
+                if self.need_rejoin():
+                    # due to a race condition between the initial metadata fetch and the
+                    # initial rebalance, we need to ensure that the metadata is fresh
+                    # before joining initially, and then request the metadata update. If
+                    # metadata update arrives while the rebalance is still pending (for
+                    # example, when the join group is still inflight), then we will lose
+                    # track of the fact that we need to rebalance again to reflect the
+                    # change to the topic subscription. Without ensuring that the
+                    # metadata is fresh, any metadata update that changes the topic
+                    # subscriptions and arrives while a rebalance is in progress will
+                    # essentially be ignored. See KAFKA-3949 for the complete
+                    # description of the problem.
+                    if self._subscription.subscribed_pattern:
+                        metadata_update = self._client.cluster.request_update()
+                        self._client.poll(future=metadata_update, timeout_ms=inner_timeout_ms())
+
+                    self.ensure_active_group(timeout_ms=inner_timeout_ms())
+
+                self.poll_heartbeat()
+
+            self._maybe_auto_commit_offsets_async()
+
+        except Errors.KafkaTimeoutError:
+            return
 
     def time_to_next_poll(self):
         """Return seconds (float) remaining until :meth:`.poll` should be called again"""
