Timeout coordinator poll / ensure_coordinator_ready / ensure_active_group #2526

Merged · 2 commits · Mar 12, 2025
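This PR threads the caller's timeout_ms budget from KafkaConsumer.poll() down through ConsumerCoordinator.poll(), ensure_coordinator_ready() and ensure_active_group(), so a bounded consumer poll no longer blocks indefinitely while the group coordinator is unknown. A usage sketch of the consumer-facing effect (the broker address, topic and group id are illustrative assumptions, not part of this PR):

```python
from kafka import KafkaConsumer

# Illustrative setup; any reachable broker / topic behaves the same way.
consumer = KafkaConsumer(
    'example-topic',
    bootstrap_servers='localhost:9092',
    group_id='example-group',
)

# With this change the 1000 ms budget also bounds coordinator lookup and the
# group join/sync, not just the fetch long-poll, so poll() returns on time
# (possibly with an empty dict) even if the coordinator is unavailable.
records = consumer.poll(timeout_ms=1000)
for tp, messages in records.items():
    for message in messages:
        print(tp, message.offset, message.value)
```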
kafka/consumer/group.py: 17 changes (11 additions, 6 deletions)
@@ -694,7 +694,8 @@ def _poll_once(self, timeout_ms, max_records, update_offsets=True):
Returns:
dict: Map of topic to list of records (may be empty).
"""
self._coordinator.poll()
begin = time.time()
self._coordinator.poll(timeout_ms=timeout_ms)

# Fetch positions if we have partitions we're subscribed to that we
# don't know the offset for
@@ -720,7 +721,8 @@ def _poll_once(self, timeout_ms, max_records, update_offsets=True):
if len(futures):
self._client.poll(timeout_ms=0)

timeout_ms = min(timeout_ms, self._coordinator.time_to_next_poll() * 1000)
timeout_ms -= (time.time() - begin) * 1000
timeout_ms = max(0, min(timeout_ms, self._coordinator.time_to_next_poll() * 1000))
self._client.poll(timeout_ms=timeout_ms)
# after the long poll, we should check whether the group needs to rebalance
# prior to returning data so that the group can stabilize faster
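The two added lines above implement a simple budget hand-off: whatever time coordinator.poll() consumed is subtracted from the caller's timeout before the fetch long-poll, and the remainder is clamped so it is never negative and never overshoots the coordinator's next required poll. A standalone sketch of that arithmetic (the helper name and sample values are illustrative, not from the PR):

```python
import time

def remaining_fetch_timeout_ms(timeout_ms, begin, time_to_next_poll_s):
    """Mirror the budget computation added to _poll_once.

    timeout_ms          -- the caller's original budget for this poll() call
    begin               -- time.time() captured before coordinator.poll() ran
    time_to_next_poll_s -- seconds until the coordinator needs attention again
    """
    timeout_ms -= (time.time() - begin) * 1000        # spend the elapsed time
    # Never negative, never past the coordinator's own deadline.
    return max(0, min(timeout_ms, time_to_next_poll_s * 1000))

# Example: 1000 ms budget, ~250 ms already spent, next coordinator poll in 0.5 s
begin = time.time() - 0.25
print(remaining_fetch_timeout_ms(1000, begin, 0.5))   # -> roughly 500.0
```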
@@ -1134,7 +1136,7 @@ def _update_fetch_positions(self, partitions):
self._fetcher.update_fetch_positions(partitions)

def _message_generator_v2(self):
timeout_ms = 1000 * (self._consumer_timeout - time.time())
timeout_ms = 1000 * max(0, self._consumer_timeout - time.time())
record_map = self.poll(timeout_ms=timeout_ms, update_offsets=False)
for tp, records in six.iteritems(record_map):
# Generators are stateful, and it is possible that the tp / records
@@ -1154,17 +1156,20 @@ def _message_generator_v2(self):

def _message_generator(self):
assert self.assignment() or self.subscription() is not None, 'No topic subscription or manual partition assignment'

def inner_poll_ms():
return max(0, min((1000 * (self._consumer_timeout - time.time())), self.config['retry_backoff_ms']))

while time.time() < self._consumer_timeout:

self._coordinator.poll()
self._coordinator.poll(timeout_ms=inner_poll_ms())

# Fetch offsets for any subscribed partitions that we arent tracking yet
if not self._subscription.has_all_fetch_positions():
partitions = self._subscription.missing_fetch_positions()
self._update_fetch_positions(partitions)

poll_ms = min((1000 * (self._consumer_timeout - time.time())), self.config['retry_backoff_ms'])
self._client.poll(timeout_ms=poll_ms)
self._client.poll(timeout_ms=inner_poll_ms())

# after the long poll, we should check whether the group needs to rebalance
# prior to returning data so that the group can stabilize faster
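In the legacy iterator path, the same clamping is wrapped in a small closure so both coordinator.poll() and client.poll() use the smaller of the remaining consumer timeout and retry_backoff_ms. A self-contained sketch of that loop shape (the 0.5 s timeout is illustrative; 100 ms matches the library's documented retry_backoff_ms default):

```python
import time

consumer_timeout = time.time() + 0.5   # e.g. consumer_timeout_ms=500 in the config
retry_backoff_ms = 100                 # kafka-python's default retry backoff

def inner_poll_ms():
    # Never negative, never longer than one retry backoff interval.
    return max(0, min(1000 * (consumer_timeout - time.time()), retry_backoff_ms))

while time.time() < consumer_timeout:
    budget = inner_poll_ms()           # 100, 100, ... then tapering toward 0
    # In the real generator: coordinator.poll(timeout_ms=budget) and
    # client.poll(timeout_ms=budget); sleep stands in for those calls here.
    time.sleep(budget / 1000.0)
```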
kafka/coordinator/base.py: 56 changes (44 additions, 12 deletions)
@@ -234,10 +234,25 @@ def coordinator(self):
else:
return self.coordinator_id

def ensure_coordinator_ready(self):
"""Block until the coordinator for this group is known
(and we have an active connection -- java client uses unsent queue).
def ensure_coordinator_ready(self, timeout_ms=None):
"""Block until the coordinator for this group is known.

Keyword Arguments:
timeout_ms (numeric, optional): Maximum number of milliseconds to
block waiting to find coordinator. Default: None.

Raises: KafkaTimeoutError if timeout_ms is not None
"""
elapsed = 0.0 # noqa: F841
begin = time.time()
def inner_timeout_ms():
if timeout_ms is None:
return None
elapsed = (time.time() - begin) * 1000
if elapsed >= timeout_ms:
raise Errors.KafkaTimeoutError('Timeout attempting to find coordinator')
return max(0, timeout_ms - elapsed)

with self._client._lock, self._lock:
while self.coordinator_unknown():

@@ -251,16 +266,16 @@ def ensure_coordinator_ready(self):
continue

future = self.lookup_coordinator()
self._client.poll(future=future)
self._client.poll(future=future, timeout_ms=inner_timeout_ms())

if future.failed():
if future.retriable():
if getattr(future.exception, 'invalid_metadata', False):
log.debug('Requesting metadata for group coordinator request: %s', future.exception)
metadata_update = self._client.cluster.request_update()
self._client.poll(future=metadata_update)
self._client.poll(future=metadata_update, timeout_ms=inner_timeout_ms())
else:
time.sleep(self.config['retry_backoff_ms'] / 1000)
time.sleep(min(inner_timeout_ms(), self.config['retry_backoff_ms']) / 1000)
else:
raise future.exception # pylint: disable-msg=raising-bad-type
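With the new keyword argument, a caller can bound coordinator discovery and treat expiry as a soft failure; passing None keeps the old block-until-found behavior. A hedged usage sketch (the wrapper function is illustrative and not added by this PR; `coordinator` stands for an already constructed coordinator instance):

```python
from kafka.errors import KafkaTimeoutError

def wait_for_coordinator(coordinator, timeout_ms=500):
    """Return True if the group coordinator became known within timeout_ms."""
    try:
        # timeout_ms=None would preserve the previous behavior: block until found.
        coordinator.ensure_coordinator_ready(timeout_ms=timeout_ms)
        return True
    except KafkaTimeoutError:
        # Raised by the inner_timeout_ms() helper once the budget is exhausted.
        return False
```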

@@ -339,14 +354,31 @@ def _handle_join_failure(self, _):
with self._lock:
self.state = MemberState.UNJOINED

def ensure_active_group(self):
"""Ensure that the group is active (i.e. joined and synced)"""
def ensure_active_group(self, timeout_ms=None):
"""Ensure that the group is active (i.e. joined and synced)

Keyword Arguments:
timeout_ms (numeric, optional): Maximum number of milliseconds to
block waiting to join group. Default: None.

Raises: KafkaTimeoutError if timeout_ms is not None
"""
with self._client._lock, self._lock:
if self._heartbeat_thread is None:
self._start_heartbeat_thread()

elapsed = 0.0 # noqa: F841
begin = time.time()
def inner_timeout_ms():
if timeout_ms is None:
return None
elapsed = (time.time() - begin) * 1000
if elapsed >= timeout_ms:
raise Errors.KafkaTimeoutError()
return max(0, timeout_ms - elapsed)

while self.need_rejoin() or self._rejoin_incomplete():
self.ensure_coordinator_ready()
self.ensure_coordinator_ready(timeout_ms=inner_timeout_ms())

# call on_join_prepare if needed. We set a flag
# to make sure that we do not call it a second
@@ -367,7 +399,7 @@ def ensure_active_group(self):
while not self.coordinator_unknown():
if not self._client.in_flight_request_count(self.coordinator_id):
break
self._client.poll(timeout_ms=200)
self._client.poll(timeout_ms=min(200, inner_timeout_ms()))
else:
continue

@@ -400,7 +432,7 @@ def ensure_active_group(self):
else:
future = self.join_future

self._client.poll(future=future)
self._client.poll(future=future, timeout_ms=inner_timeout_ms())

if future.succeeded():
self._on_join_complete(self._generation.generation_id,
@@ -419,7 +451,7 @@ def ensure_active_group(self):
continue
elif not future.retriable():
raise exception # pylint: disable-msg=raising-bad-type
time.sleep(self.config['retry_backoff_ms'] / 1000)
time.sleep(min(inner_timeout_ms(), self.config['retry_backoff_ms']) / 1000)

def _rejoin_incomplete(self):
return self.join_future is not None
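The same inner_timeout_ms() closure appears in ensure_coordinator_ready(), ensure_active_group() and ConsumerCoordinator.poll(): it captures a start time, returns the shrinking remainder in milliseconds, returns None when no deadline was given, and raises once the budget is gone. A generalized, standalone version of the pattern (the deadline() factory is an illustration, not something this PR adds):

```python
import time

from kafka.errors import KafkaTimeoutError

def deadline(timeout_ms, error_message='Timeout expired'):
    """Return a callable yielding the remaining budget in milliseconds.

    None means "no deadline" (block as long as needed), matching the PR's
    default; once the budget is spent the callable raises KafkaTimeoutError,
    which is how the retry loops above bail out.
    """
    begin = time.time()

    def inner_timeout_ms():
        if timeout_ms is None:
            return None
        elapsed = (time.time() - begin) * 1000
        if elapsed >= timeout_ms:
            raise KafkaTimeoutError(error_message)
        return max(0, timeout_ms - elapsed)

    return inner_timeout_ms

# Usage: every blocking call inside a retry loop draws from the same budget.
remaining = deadline(1000)
print(remaining())          # ~1000.0
time.sleep(0.2)
print(remaining())          # ~800.0
```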
kafka/coordinator/consumer.py: 66 changes (40 additions, 26 deletions)
@@ -258,7 +258,7 @@ def _on_join_complete(self, generation, member_id, protocol,
self._subscription.listener, self.group_id,
assigned)

def poll(self):
def poll(self, timeout_ms=None):
"""
Poll for coordinator events. Only applicable if group_id is set, and
broker version supports GroupCoordinators. This ensures that the
@@ -269,31 +269,45 @@ def _on_join_complete(self, generation, member_id, protocol,
if self.group_id is None:
return

self._invoke_completed_offset_commit_callbacks()
self.ensure_coordinator_ready()

if self.config['api_version'] >= (0, 9) and self._subscription.partitions_auto_assigned():
if self.need_rejoin():
# due to a race condition between the initial metadata fetch and the
# initial rebalance, we need to ensure that the metadata is fresh
# before joining initially, and then request the metadata update. If
# metadata update arrives while the rebalance is still pending (for
# example, when the join group is still inflight), then we will lose
# track of the fact that we need to rebalance again to reflect the
# change to the topic subscription. Without ensuring that the
# metadata is fresh, any metadata update that changes the topic
# subscriptions and arrives while a rebalance is in progress will
# essentially be ignored. See KAFKA-3949 for the complete
# description of the problem.
if self._subscription.subscribed_pattern:
metadata_update = self._client.cluster.request_update()
self._client.poll(future=metadata_update)

self.ensure_active_group()

self.poll_heartbeat()

self._maybe_auto_commit_offsets_async()
elapsed = 0.0 # noqa: F841
begin = time.time()
def inner_timeout_ms():
if timeout_ms is None:
return None
elapsed = (time.time() - begin) * 1000
if elapsed >= timeout_ms:
raise Errors.KafkaTimeoutError()
return max(0, timeout_ms - elapsed)

try:
self._invoke_completed_offset_commit_callbacks()
self.ensure_coordinator_ready(timeout_ms=inner_timeout_ms())

if self.config['api_version'] >= (0, 9) and self._subscription.partitions_auto_assigned():
if self.need_rejoin():
# due to a race condition between the initial metadata fetch and the
# initial rebalance, we need to ensure that the metadata is fresh
# before joining initially, and then request the metadata update. If
# metadata update arrives while the rebalance is still pending (for
# example, when the join group is still inflight), then we will lose
# track of the fact that we need to rebalance again to reflect the
# change to the topic subscription. Without ensuring that the
# metadata is fresh, any metadata update that changes the topic
# subscriptions and arrives while a rebalance is in progress will
# essentially be ignored. See KAFKA-3949 for the complete
# description of the problem.
if self._subscription.subscribed_pattern:
metadata_update = self._client.cluster.request_update()
self._client.poll(future=metadata_update, timeout_ms=inner_timeout_ms())

self.ensure_active_group(timeout_ms=inner_timeout_ms())

self.poll_heartbeat()

self._maybe_auto_commit_offsets_async()

except Errors.KafkaTimeoutError:
return

def time_to_next_poll(self):
"""Return seconds (float) remaining until :meth:`.poll` should be called again"""
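ConsumerCoordinator.poll() wraps the whole sequence in a try/except so a KafkaTimeoutError raised anywhere inside it is absorbed and the method simply returns; the consumer then yields no records for that round instead of surfacing an exception to user code. A minimal sketch of that control flow (the helper names and stand-in function are illustrative, not part of the PR):

```python
from kafka.errors import KafkaTimeoutError

def coordinator_poll(ensure_ready, timeout_ms=None):
    """Sketch of the try/except added to ConsumerCoordinator.poll().

    `ensure_ready` stands in for ensure_coordinator_ready / ensure_active_group;
    a timeout inside it is swallowed here so the consumer's poll() returns
    quietly and retries the coordinator work on its next call.
    """
    try:
        ensure_ready(timeout_ms=timeout_ms)
    except KafkaTimeoutError:
        return

def never_ready(timeout_ms=None):
    # Stand-in for a coordinator that cannot be located within the budget.
    raise KafkaTimeoutError('Timeout attempting to find coordinator')

coordinator_poll(never_ready, timeout_ms=100)   # returns instead of raising
```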