Skip to content

Commit 0a2ccba

Browse files
authored
(Attempt to) Fix deadlock between consumer and heartbeat (#1628)
1 parent 9d44e3c commit 0a2ccba

File tree

2 files changed

+2
-4
lines changed

2 files changed

+2
-4
lines changed

kafka/client_async.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -571,9 +571,7 @@ def poll(self, timeout_ms=None, future=None):
571571

572572
self._poll(timeout)
573573

574-
# called without the lock to avoid deadlock potential
575-
# if handlers need to acquire locks
576-
responses.extend(self._fire_pending_completed_requests())
574+
responses.extend(self._fire_pending_completed_requests())
577575

578576
# If all we had was a timeout (future is None) - only do one poll
579577
# If we do have a future, we keep looping until it is done

kafka/coordinator/base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -347,7 +347,7 @@ def _handle_join_failure(self, _):
347347

348348
def ensure_active_group(self):
349349
"""Ensure that the group is active (i.e. joined and synced)"""
350-
with self._lock:
350+
with self._client._lock, self._lock:
351351
if self._heartbeat_thread is None:
352352
self._start_heartbeat_thread()
353353

0 commit comments

Comments
 (0)