I use kafka-python 1.4.4 with enable_auto_commit=False. One day I found that my consumer had hung and could no longer receive messages.
The log is:
2018-10-29T16:12:44.361+08:00 localhost WARNING [pid:5273] [MainThread][tid:65330864] [base.py:822 _handle_heartbeat_response] Heartbeat: local member_id was not recognized; this consumer needs to re-join
2018-10-29T16:12:44.371+08:00 localhost INFO [pid:5273] [MainThread][tid:65330864] [consumer.py:341 _on_join_prepare] Revoking previously assigned partitions set([TopicPartition(topic=u'agent', partition=2), TopicPartition(topic=u'agent', partition=3), TopicPartition(topic=u'agent', partition=4), TopicPartition(topic=u'agent', partition=5), TopicPartition(topic=u'agent', partition=6), TopicPartition(topic=u'agent', partition=7), TopicPartition(topic=u'agent', partition=0), TopicPartition(topic=u'agent', partition=1)]) for group agent
I captured the following stack traces:
stack 1: the heartbeat thread
/opt/cloud/services/network-agent/venv/lib/python2.7/site-packages/kafka/coordinator/base.py:914 in run
'self._run_once()'
/opt/cloud/services/network-agent/venv/lib/python2.7/site-packages/kafka/coordinator/base.py:982 in _run_once
'with self.coordinator._client._lock, self.coordinator._lock:'
/usr/lib64/python2.7/threading.py:285 in __enter__
'return self.__lock.__enter__()'
/usr/lib64/python2.7/threading.py:173 in acquire
'rc = self.__block.acquire(blocking)'
/opt/cloud/services/network-agent/venv/lib/python2.7/site-packages/eventlet/semaphore.py:113 in acquire
'hubs.get_hub().switch()'
/opt/cloud/services/network-agent/venv/lib/python2.7/site-packages/eventlet/hubs/hub.py:294 in switch
'return self.greenlet.switch()'
stack 2: the message polling thread
/opt/cloud/services/network-agent/venv/lib/python2.7/site-packages/kafka/consumer/group.py:609 in poll
'records = self._poll_once(remaining, max_records)'
/opt/cloud/services/network-agent/venv/lib/python2.7/site-packages/kafka/consumer/group.py:629 in _poll_once
'self._coordinator.poll()'
/opt/cloud/services/network-agent/venv/lib/python2.7/site-packages/kafka/coordinator/consumer.py:281 in poll
'self.ensure_active_group()' **(note: this acquires self.coordinator._lock)**
/opt/cloud/services/network-agent/venv/lib/python2.7/site-packages/kafka/coordinator/base.py:373 in ensure_active_group
'while not self.coordinator_unknown():'
/opt/cloud/services/network-agent/venv/lib/python2.7/site-packages/kafka/coordinator/base.py:227 in coordinator_unknown
'return self.coordinator() is None'
/opt/cloud/services/network-agent/venv/lib/python2.7/site-packages/kafka/coordinator/base.py:236 in coordinator
'elif self._client.is_disconnected(self.coordinator_id):'
/opt/cloud/services/network-agent/venv/lib/python2.7/site-packages/kafka/client_async.py:433 in is_disconnected
'with self._lock:' **(note: this self._lock is the same lock as self.coordinator._client._lock)**
/usr/lib64/python2.7/threading.py:173 in acquire
'rc = self.__block.acquire(blocking)'
/opt/cloud/services/network-agent/venv/lib/python2.7/site-packages/eventlet/semaphore.py:113 in acquire
'hubs.get_hub().switch()'
/opt/cloud/services/network-agent/venv/lib/python2.7/site-packages/eventlet/hubs/hub.py:294 in switch
'return self.greenlet.switch()'
I think the reason may be:
1. Stack 2 acquires self.coordinator._lock first, in ensure_active_group:
    def ensure_active_group(self):
        """Ensure that the group is active (i.e. joined and synced)"""
        with self._lock:
            if self._heartbeat_thread is None:
                self._start_heartbeat_thread()
2. It then tries to acquire self.coordinator._client._lock:
/opt/cloud/services/network-agent/venv/lib/python2.7/site-packages/kafka/client_async.py:433 in is_disconnected
'with self._lock:'
3. Meanwhile, stack 1 already holds self.coordinator._client._lock and is trying to acquire self.coordinator._lock:
with self.coordinator._client._lock, self.coordinator._lock:
4. Each thread is now waiting for the lock that the other one holds, so a deadlock occurs.
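
The lock-order inversion I suspect above can be reproduced in isolation. The following is only a minimal sketch, not kafka-python code: `client_lock` and `coordinator_lock` are hypothetical stand-ins for `self.coordinator._client._lock` and `self.coordinator._lock`, plain `threading.Lock` objects are assumed, eventlet is not involved, and it is written for Python 3 so that `acquire(timeout=...)` can be used to report the stall instead of hanging forever.

```python
import threading


def inverted_order_demo(timeout=0.2):
    """Two threads take the same two locks in opposite order.

    Returns a dict saying whether each thread obtained its second lock.
    """
    # Hypothetical stand-ins for the two locks named in the stack traces.
    client_lock = threading.Lock()
    coordinator_lock = threading.Lock()
    results = {}
    # The barrier forces the unlucky interleaving: each thread holds its
    # first lock before either one tries its second lock, and neither
    # releases its first lock until both attempts have finished.
    barrier = threading.Barrier(2)

    def worker(name, first, second):
        with first:
            barrier.wait()
            got_second = second.acquire(timeout=timeout)
            if got_second:
                second.release()
            results[name] = got_second
            barrier.wait()

    threads = [
        # Like stack 2: coordinator lock first, then client lock.
        threading.Thread(target=worker, args=("poll", coordinator_lock, client_lock)),
        # Like stack 1: client lock first, then coordinator lock.
        threading.Thread(target=worker, args=("heartbeat", client_lock, coordinator_lock)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


def consistent_order_demo():
    """Both threads take client_lock first, then coordinator_lock."""
    client_lock = threading.Lock()
    coordinator_lock = threading.Lock()
    results = {}

    def worker(name):
        with client_lock:
            with coordinator_lock:
                results[name] = True

    threads = [threading.Thread(target=worker, args=(n,)) for n in ("poll", "heartbeat")]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


if __name__ == "__main__":
    print("inverted order:", inverted_order_demo())
    print("consistent order:", consistent_order_demo())
```

With the inverted order, neither thread gets its second lock before the timeout; without a timeout, both would block forever, which matches the two stacks above. When both threads take the locks in the same order, both finish. If this analysis is right, one possible fix would be for the polling path to take the client lock before the coordinator lock, the same order the heartbeat thread uses.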