
Consumer deadlock happens after the log “Heartbeat: local member_id was not recognized; this consumer needs to re-join” #1623


Description

@zhgjun

I use kafka-python 1.4.4 with enable_auto_commit=False. One day I found that my consumer had hung and could no longer receive messages.
The log is:

2018-10-29T16:12:44.361+08:00 localhost WARNING [pid:5273] [MainThread][tid:65330864] [base.py:822 _handle_heartbeat_response] Heartbeat: local member_id was not recognized; this consumer needs to re-join
2018-10-29T16:12:44.371+08:00 localhost  INFO [pid:5273] [MainThread][tid:65330864] [consumer.py:341 _on_join_prepare] Revoking previously assigned partitions set([TopicPartition(topic=u'agent', partition=2), TopicPartition(topic=u'agent', partition=3), TopicPartition(topic=u'agent', partition=4), TopicPartition(topic=u'agent', partition=5), TopicPartition(topic=u'agent', partition=6), TopicPartition(topic=u'agent', partition=7), TopicPartition(topic=u'agent', partition=0), TopicPartition(topic=u'agent', partition=1)]) for group agent
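
For context, here is a minimal sketch of a consumer set up the way described above. The topic and group name 'agent' are taken from the log; the broker address, poll loop, and message handler are assumptions for illustration (the real service also runs under eventlet, which is omitted here):

```python
from kafka import KafkaConsumer

# Assumed broker address; topic/group 'agent' come from the log above.
consumer = KafkaConsumer(
    'agent',
    group_id='agent',
    bootstrap_servers=['localhost:9092'],
    enable_auto_commit=False,
)

while True:
    # The thread in stack 2 below is blocked inside this poll() call.
    records = consumer.poll(timeout_ms=1000)
    for tp, messages in records.items():
        for msg in messages:
            handle(msg)        # hypothetical handler
    consumer.commit()          # manual commit, since auto-commit is disabled
```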

and I captured the following stack traces:

Stack 1: the heartbeat thread


/opt/cloud/services/network-agent/venv/lib/python2.7/site-packages/kafka/coordinator/base.py:914 in run
    'self._run_once()'

/opt/cloud/services/network-agent/venv/lib/python2.7/site-packages/kafka/coordinator/base.py:982 in _run_once
    'with self.coordinator._client._lock, self.coordinator._lock:'

/usr/lib64/python2.7/threading.py:285 in __enter__
    'return self.__lock.__enter__()'

/usr/lib64/python2.7/threading.py:173 in acquire
    'rc = self.__block.acquire(blocking)'

/opt/cloud/services/network-agent/venv/lib/python2.7/site-packages/eventlet/semaphore.py:113 in acquire
    'hubs.get_hub().switch()'

/opt/cloud/services/network-agent/venv/lib/python2.7/site-packages/eventlet/hubs/hub.py:294 in switch
    'return self.greenlet.switch()'

Stack 2: the message-polling thread

/opt/cloud/services/network-agent/venv/lib/python2.7/site-packages/kafka/consumer/group.py:609 in poll
    'records = self._poll_once(remaining, max_records)'

/opt/cloud/services/network-agent/venv/lib/python2.7/site-packages/kafka/consumer/group.py:629 in _poll_once
    'self._coordinator.poll()'

/opt/cloud/services/network-agent/venv/lib/python2.7/site-packages/kafka/coordinator/consumer.py:281 in poll
    'self.ensure_active_group()'  **(note: this acquires self.coordinator._lock)**

/opt/cloud/services/network-agent/venv/lib/python2.7/site-packages/kafka/coordinator/base.py:373 in ensure_active_group
    'while not self.coordinator_unknown():'

/opt/cloud/services/network-agent/venv/lib/python2.7/site-packages/kafka/coordinator/base.py:227 in coordinator_unknown
    'return self.coordinator() is None'

/opt/cloud/services/network-agent/venv/lib/python2.7/site-packages/kafka/coordinator/base.py:236 in coordinator
    'elif self._client.is_disconnected(self.coordinator_id):'

/opt/cloud/services/network-agent/venv/lib/python2.7/site-packages/kafka/client_async.py:433 in is_disconnected
    'with self._lock:'  **(note: here self._lock is self.coordinator._client._lock)**

/usr/lib64/python2.7/threading.py:173 in acquire
    'rc = self.__block.acquire(blocking)'

/opt/cloud/services/network-agent/venv/lib/python2.7/site-packages/eventlet/semaphore.py:113 in acquire
    'hubs.get_hub().switch()'

/opt/cloud/services/network-agent/venv/lib/python2.7/site-packages/eventlet/hubs/hub.py:294 in switch
    'return self.greenlet.switch()'

I think the cause may be the following:

1. Stack 2 acquires self.coordinator._lock first, in ensure_active_group():

def ensure_active_group(self):
    """Ensure that the group is active (i.e. joined and synced)"""
    with self._lock:
        if self._heartbeat_thread is None:
            self._start_heartbeat_thread()

2. It then tries to acquire self.coordinator._client._lock, in is_disconnected():

/opt/cloud/services/network-agent/venv/lib/python2.7/site-packages/kafka/client_async.py:433 in is_disconnected
    'with self._lock:'

3. Meanwhile, stack 1 (the heartbeat thread) already holds self.coordinator._client._lock and is waiting for self.coordinator._lock:

with self.coordinator._client._lock, self.coordinator._lock:

4. Each thread is waiting for the lock the other one holds, so a deadlock occurs.
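
To illustrate the ordering problem, here is a generic sketch (plain threading, not kafka-python's actual code) of two threads taking the same pair of locks in opposite order, which is the pattern shown in stacks 1 and 2; running it hangs forever:

```python
import threading
import time

client_lock = threading.Lock()       # stands in for coordinator._client._lock
coordinator_lock = threading.Lock()  # stands in for coordinator._lock

def heartbeat_thread():
    # Like stack 1 (base.py:982): take client._lock first, then coordinator._lock.
    with client_lock:
        time.sleep(0.1)               # widen the race window for the demo
        with coordinator_lock:
            pass

def poll_thread():
    # Like stack 2: ensure_active_group() takes coordinator._lock first,
    # then is_disconnected() tries to take client._lock.
    with coordinator_lock:
        time.sleep(0.1)
        with client_lock:
            pass

t1 = threading.Thread(target=heartbeat_thread)
t2 = threading.Thread(target=poll_thread)
t1.start(); t2.start()
t1.join(); t2.join()   # never returns: classic lock-ordering deadlock
```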
