
An exception occurs when the ConsumerCoordinator object is being deleted #2409

Closed
@tatnguyennguyen

Description


Hi, I am using kafka-python version 2.0.2 on Ubuntu 22.04 with Python 3.10.
My application uses multithreading, and sometimes I see this exception:

Exception ignored in: <function ConsumerCoordinator.__del__ at 0x7fc077562ef0>
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/kafka/coordinator/consumer.py", line 132, in __del__
    super(ConsumerCoordinator, self).__del__()
  File "/usr/local/lib/python3.10/dist-packages/kafka/coordinator/base.py", line 756, in __del__
    self._close_heartbeat_thread()
  File "/usr/local/lib/python3.10/dist-packages/kafka/coordinator/base.py", line 750, in _close_heartbeat_thread
    self._heartbeat_thread.close()
  File "/usr/local/lib/python3.10/dist-packages/kafka/coordinator/base.py", line 927, in close
    self.join(self.coordinator.config['heartbeat_interval_ms'] / 1000)
  File "/usr/lib/python3.10/threading.py", line 1093, in join
    raise RuntimeError("cannot join current thread")
RuntimeError: cannot join current thread

I found that someone else also has this problem: #663 (comment)

To debug this error, I added some logging just before line 927 in base.py:

log.info(f'Current thread id: {threading.get_ident()}')
log.info(f'HeartbeatThread thread id: {self.ident}')

When the exception happens, the identifier of the current thread is the same as the identifier of the HeartbeatThread object. At other times, when the exception does not happen, the log prints two different identifiers.
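The same comparison can be reproduced outside kafka-python. This is a minimal sketch using a hypothetical helper `running_on()` (not part of kafka-python) that compares `threading.get_ident()` with a thread's `ident`, as the logging above does:

```python
import threading

def running_on(thread: threading.Thread) -> bool:
    """Hypothetical helper: True if the caller is executing on `thread` itself."""
    return threading.get_ident() == thread.ident

results = {}

def worker():
    # Inside the worker, the current thread *is* the worker thread.
    results["inside"] = running_on(t)

t = threading.Thread(target=worker)
t.start()
t.join()
# From the main thread, the two identifiers differ.
results["outside"] = running_on(t)
print(results)  # {'inside': True, 'outside': False}
```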

So I think the error happens because, at some point, the garbage collector runs on the very thread managed by the HeartbeatThread object. When the garbage collector calls the __del__ method of ConsumerCoordinator, this eventually calls self.join(self.coordinator.config['heartbeat_interval_ms'] / 1000) inside the close() method of HeartbeatThread. Because a thread cannot join itself, threading.Thread.join() raises the RuntimeError.
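A minimal reproduction of that failure mode, assuming the only relevant detail is that close() unconditionally calls self.join(). `HeartbeatLikeThread` is a hypothetical stand-in, not kafka-python's class, and calling close() from run() simulates the garbage collector finalizing the coordinator on the heartbeat thread:

```python
import threading

class HeartbeatLikeThread(threading.Thread):
    """Hypothetical stand-in for kafka-python's HeartbeatThread."""

    def __init__(self):
        super().__init__(daemon=True)
        self.error = None

    def close(self, timeout=0.1):
        # Like HeartbeatThread.close(), join the thread unconditionally.
        self.join(timeout)

    def run(self):
        # Simulate a finalizer running on this thread and reaching close().
        try:
            self.close()
        except RuntimeError as exc:
            self.error = exc

t = HeartbeatLikeThread()
t.start()
t.join()
print(t.error)  # cannot join current thread
```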

To avoid this error, maybe we should avoid calling self.join inside HeartbeatThread? We could do this by wrapping HeartbeatThread inside another class, or something similar.
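One alternative to wrapping is a guard inside close() that skips the join when it is called on the thread itself. This is only a sketch of that idea, not kafka-python's implementation; `GuardedHeartbeatThread` and `stop_requested` are hypothetical names, and the loop in run() is a placeholder for the real heartbeat work:

```python
import threading

class GuardedHeartbeatThread(threading.Thread):
    """Hypothetical sketch, not kafka-python's HeartbeatThread."""

    def __init__(self):
        super().__init__(daemon=True)
        self.stop_requested = threading.Event()

    def run(self):
        # Placeholder loop standing in for the periodic heartbeat work.
        while not self.stop_requested.wait(0.01):
            pass

    def close(self, timeout=None):
        # Ask the loop in run() to exit.
        self.stop_requested.set()
        # join() is only legal from another thread. If close() is reached on
        # this thread (e.g. from a finalizer run by the garbage collector),
        # skip the join; run() returns on its own once the event is set.
        if threading.current_thread() is not self:
            self.join(timeout)

t = GuardedHeartbeatThread()
t.start()
t.close()
print(t.is_alive())  # False
```

With this guard, reaching close() on the heartbeat thread itself only sets the event instead of raising RuntimeError. A timeout can still be passed, as HeartbeatThread.close() does with heartbeat_interval_ms / 1000.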
