Skip to content

Commit a4f0cb8

Browse files
authored
Improve KafkaConsumer join group / only enable Heartbeat Thread during stable group (#1695)
1 parent 34fcb11 commit a4f0cb8

File tree

1 file changed

+23
-11
lines changed

1 file changed

+23
-11
lines changed

kafka/coordinator/base.py

+23-11
Original file line numberDiff line numberDiff line change
@@ -331,18 +331,13 @@ def _handle_join_success(self, member_assignment_bytes):
331331
with self._lock:
332332
log.info("Successfully joined group %s with generation %s",
333333
self.group_id, self._generation.generation_id)
334-
self.join_future = None
335334
self.state = MemberState.STABLE
336-
self.rejoining = False
337-
self._heartbeat_thread.enable()
338-
self._on_join_complete(self._generation.generation_id,
339-
self._generation.member_id,
340-
self._generation.protocol,
341-
member_assignment_bytes)
335+
self.rejoin_needed = False
336+
if self._heartbeat_thread:
337+
self._heartbeat_thread.enable()
342338

343339
def _handle_join_failure(self, _):
344340
with self._lock:
345-
self.join_future = None
346341
self.state = MemberState.UNJOINED
347342

348343
def ensure_active_group(self):
@@ -351,7 +346,7 @@ def ensure_active_group(self):
351346
if self._heartbeat_thread is None:
352347
self._start_heartbeat_thread()
353348

354-
while self.need_rejoin():
349+
while self.need_rejoin() or self._rejoin_incomplete():
355350
self.ensure_coordinator_ready()
356351

357352
# call on_join_prepare if needed. We set a flag
@@ -382,6 +377,12 @@ def ensure_active_group(self):
382377
# This ensures that we do not mistakenly attempt to rejoin
383378
# before the pending rebalance has completed.
384379
if self.join_future is None:
380+
# Fence off the heartbeat thread explicitly so that it cannot
381+
# interfere with the join group. Note that this must come after
382+
# the call to _on_join_prepare since we must be able to continue
383+
# sending heartbeats if that callback takes some time.
384+
self._heartbeat_thread.disable()
385+
385386
self.state = MemberState.REBALANCING
386387
future = self._send_join_group_request()
387388

@@ -402,7 +403,16 @@ def ensure_active_group(self):
402403

403404
self._client.poll(future=future)
404405

405-
if future.failed():
406+
if future.succeeded():
407+
self._on_join_complete(self._generation.generation_id,
408+
self._generation.member_id,
409+
self._generation.protocol,
410+
future.value)
411+
self.join_future = None
412+
self.rejoining = False
413+
414+
else:
415+
self.join_future = None
406416
exception = future.exception
407417
if isinstance(exception, (Errors.UnknownMemberIdError,
408418
Errors.RebalanceInProgressError,
@@ -412,6 +422,9 @@ def ensure_active_group(self):
412422
raise exception # pylint: disable-msg=raising-bad-type
413423
time.sleep(self.config['retry_backoff_ms'] / 1000)
414424

425+
def _rejoin_incomplete(self):
426+
return self.join_future is not None
427+
415428
def _send_join_group_request(self):
416429
"""Join the group and return the assignment for the next generation.
417430
@@ -497,7 +510,6 @@ def _handle_join_group_response(self, future, send_time, response):
497510
self._generation = Generation(response.generation_id,
498511
response.member_id,
499512
response.group_protocol)
500-
self.rejoin_needed = False
501513

502514
if response.leader_id == response.member_id:
503515
log.info("Elected group leader -- performing partition"

0 commit comments

Comments
 (0)